You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/09/10 13:57:37 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1104 Add SSL docker tests for PublishKafka and automate them in CI

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new e99d31a  MINIFICPP-1104 Add SSL docker tests for PublishKafka and automate them in CI
e99d31a is described below

commit e99d31a9fc035a7d91044320746fa7e089a734a5
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Mon Aug 24 15:39:07 2020 +0200

    MINIFICPP-1104 Add SSL docker tests for PublishKafka and automate them in CI
    
    MINIFICPP-1104 Fix exception handling of py::eval
    
    If pybind11 exceptions are not handled in the scope of the GIL
    it causes a segmentation fault. In out case it caused a segmentation
    fault if the to be evaluated python processor could not be loaded (e.g
    missing dependency in imports).
    
    See https://github.com/pybind/pybind11/issues/1803
    
    MINIFICPP-1104 Fix broken integration tests
    
    - Remove Windows specific DNS name
    - Change default scheduling strategy to TIMER_DRIVEN in tests to bypass
      incoming connection tests for EVENT_DRIVEN strategy
    - Change message timeout to 12 seconds to fix configuration error of
      linger.ms being higher than message.timeout.ms
    
    MINIFICPP-1104 Switch spotify/kafka image to newer kafka image
    
    - Separate wurstmeister/zookeeper and wurstmeister/kafka images are used
      now instead of spotify/kafka image for SSL testing.
    
    MINIFICPP-1104 - SSL docker tests for PublishKafka
    
    MINIFICPP-1104 Add docker tests to github workflows
    
    - Add docker tests to ci.yml
    - Change permissions of test files as github actions can only run docker
      containers as root
    - Change timeouts to adapt to github's environment
    
    MINIFICPP-1104 Speed up kafka integration tests
    
    In kafka tests the observer was not working correctly, ignoring the
    given subdirectory so it did not validate the output correctly. The
    correct validation was only done after the timeout was reached.
    Additionally the observer was never restarted when used multiple times
    and the observer thread failed on directory changes as well.
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #878
---
 .github/workflows/ci.yml                           |  13 ++
 docker/DockerVerify.sh                             |   6 +
 docker/test/integration/minifi/__init__.py         |  87 ++++++++----
 docker/test/integration/minifi/test/__init__.py    | 115 ++++++++-------
 .../integration/resources/kafka_broker/Dockerfile  |   3 +
 .../conf/certs/broker_kafka-broker_cert-file       |  18 +++
 .../conf/certs/broker_kafka-broker_cert-signed     |  20 +++
 .../certs/broker_kafka-broker_server.keystore.jks  | Bin 0 -> 3981 bytes
 .../broker_kafka-broker_server.truststore.jks      | Bin 0 -> 933 bytes
 .../conf/certs/broker_localhost_cert-file          |  17 +++
 .../conf/certs/broker_localhost_cert-signed        |  20 +++
 .../certs/broker_localhost_server.keystore.jks     | Bin 0 -> 3978 bytes
 .../certs/broker_localhost_server.truststore.jks   | Bin 0 -> 933 bytes
 .../resources/kafka_broker/conf/certs/ca-cert      |  21 +++
 .../resources/kafka_broker/conf/certs/ca-cert.key  |  30 ++++
 .../resources/kafka_broker/conf/certs/ca-cert.srl  |   1 +
 .../kafka_broker/conf/certs/client_LMN_cert-file   |  17 +++
 .../kafka_broker/conf/certs/client_LMN_cert-signed |  20 +++
 .../kafka_broker/conf/certs/client_LMN_client.key  |  30 ++++
 .../conf/certs/client_LMN_client.keystore.jks      | Bin 0 -> 3971 bytes
 .../kafka_broker/conf/certs/client_LMN_client.pem  |  20 +++
 .../kafka_broker/conf/certs/client_LMN_client.req  |  17 +++
 .../conf/certs/client_LMN_client.truststore.jks    | Bin 0 -> 933 bytes
 .../kafka_broker/conf/client-ssl-java.properties   |   7 +
 .../kafka_broker/conf/client-ssl.properties        |  14 ++
 .../kafka_broker/conf/server-ssl.properties        | 157 +++++++++++++++++++++
 .../resources/kafka_broker/conf/server.properties  | 136 ++++++++++++++++++
 docker/test/integration/test_rdkafka.py            |  44 ++++--
 extensions/script/python/PythonScriptEngine.cpp    |  13 +-
 29 files changed, 724 insertions(+), 102 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 349cd7d..e281fef 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -264,3 +264,16 @@ jobs:
           echo "::set-env name=PATH::/usr/lib/ccache:$PATH"
       - id: build
         run: ./bootstrap.sh -e -t && cd build  && cmake -DUSE_SHARED_LIBS=ON -DSTRICT_GSL_CHECKS=AUDIT .. &&  cmake --build . --parallel 4  && make test ARGS="--timeout 300 -j4 --output-on-failure"
+  docker_integration_tests:
+    name: "Docker integration tests"
+    runs-on: ubuntu-20.04
+    timeout-minutes: 60
+    steps:
+      - id: checkout
+        uses: actions/checkout@v2
+      - id: build
+        run: ./bootstrap.sh -e -t && cd build  && cmake -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT .. && make docker
+      - id: install_deps
+        run: sudo apt install -y python3-virtualenv
+      - id: test
+        run: cd build && make docker-verify
diff --git a/docker/DockerVerify.sh b/docker/DockerVerify.sh
index 592de3c..006e96f 100755
--- a/docker/DockerVerify.sh
+++ b/docker/DockerVerify.sh
@@ -17,6 +17,12 @@
 
 set -e
 
+if [[ $# -lt 1 ]]; then
+  echo "Usage:"
+  echo "  ./DockerVerify.sh <MINIFI_VERSION>"
+  exit 1
+fi
+
 docker_dir="$( cd ${0%/*} && pwd )"
 
 export MINIFI_VERSION=$1
diff --git a/docker/test/integration/minifi/__init__.py b/docker/test/integration/minifi/__init__.py
index 3e55d5b..876ca21 100644
--- a/docker/test/integration/minifi/__init__.py
+++ b/docker/test/integration/minifi/__init__.py
@@ -30,7 +30,6 @@ from copy import copy
 import time
 from collections import OrderedDict
 
-
 class Cluster(object):
     """
     Base Cluster class. This is intended to be a generic interface
@@ -96,15 +95,7 @@ class SingleNodeDockerCluster(Cluster):
         if self.network is None:
             net_name = 'nifi-' + str(uuid.uuid4())
             logging.info('Creating network: %s', net_name)
-            # Set IP
-            ipam_pool = docker.types.IPAMPool(
-                subnet='192.168.42.0/24',
-                gateway='192.168.42.1'
-            )
-            ipam_config = docker.types.IPAMConfig(
-                pool_configs=[ipam_pool]
-            )
-            self.network = self.client.networks.create(net_name, ipam=ipam_config)
+            self.network = self.client.networks.create(net_name)
 
         if engine == 'nifi':
             self.deploy_nifi_flow(flow, name, vols)
@@ -216,32 +207,40 @@ class SingleNodeDockerCluster(Cluster):
         self.containers[container.name] = container
 
     def deploy_kafka_broker(self, name):
-        dockerfile = dedent("""FROM {base_image}
-                USER root
-                CMD $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server host.docker.internal:9092 --topic test > heaven_signal.txt
-                """.format(base_image='spotify/kafka:latest'))
-
-        logging.info('Creating and running docker container for kafka broker...')
+        logging.info('Creating and running docker containers for kafka broker...')
+        zookeeper = self.client.containers.run(
+                    self.client.images.pull("wurstmeister/zookeeper:latest"),
+                    detach=True,
+                    name='zookeeper',
+                    network=self.network.name,
+                    ports={'2181/tcp': 2181},
+                    )
+        self.containers[zookeeper.name] = zookeeper
 
+        test_dir = os.environ['PYTHONPATH'].split(':')[-1] # Based on DockerVerify.sh
+        broker_image = self.build_image_by_path(test_dir + "/resources/kafka_broker", 'minifi-kafka')
         broker = self.client.containers.run(
-                    self.client.images.pull("spotify/kafka:latest"),
+                    broker_image[0],
                     detach=True,
                     name='kafka-broker',
-                    ports={'2181/tcp': 2181, '9092/tcp': 9092},
-                    environment=["ADVERTISED_HOST=192.168.42.4", "ADVERTISED_PORT=9092"]
+                    network=self.network.name,
+                    ports={'9092/tcp': 9092},
+                    environment=["KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093", "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181"],
                     )
-        self.network.connect(broker, ipv4_address='192.168.42.4')
+        self.containers[broker.name] = broker
 
+        dockerfile = dedent("""FROM {base_image}
+                USER root
+                CMD $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka-broker:9092 --topic test > heaven_signal.txt
+                """.format(base_image='wurstmeister/kafka:2.12-2.5.0'))
         configured_image = self.build_image(dockerfile, [])
         consumer = self.client.containers.run(
                     configured_image[0],
                     detach=True,
                     name='kafka-consumer',
-                    network=self.network.name
+                    network=self.network.name,
                     )
-
         self.containers[consumer.name] = consumer
-        self.containers[broker.name] = broker
 
     def deploy_http_proxy(self):
         logging.info('Creating and running http-proxy docker container...')
@@ -301,6 +300,20 @@ class SingleNodeDockerCluster(Cluster):
 
         return configured_image
 
+    def build_image_by_path(self, dir, name=None):
+        try:
+            logging.info('Creating configured image...')
+            configured_image = self.client.images.build(path=dir,
+                                                        tag=name,
+                                                        rm=True,
+                                                        forcerm=True)
+            logging.info('Created image with id: %s', configured_image[0].id)
+            self.images.append(configured_image)
+            return configured_image
+        except Exception as e:
+            logging.info(e)
+            raise
+
     def __enter__(self):
         """
         Allocate ephemeral cluster resources.
@@ -318,7 +331,7 @@ class SingleNodeDockerCluster(Cluster):
             container.remove(v=True, force=True)
 
         # Clean up images
-        for image in self.images:
+        for image in reversed(self.images):
             logging.info('Cleaning up image: %s', image[0].id)
             self.client.images.remove(image[0].id, force=True)
 
@@ -437,7 +450,7 @@ class Processor(Connectable):
         self.controller_services = controller_services
 
         self.schedule = {
-            'scheduling strategy': 'EVENT_DRIVEN',
+            'scheduling strategy': 'TIMER_DRIVEN',
             'scheduling period': '1 sec',
             'penalization period': '30 sec',
             'yield period': '1 sec',
@@ -501,8 +514,8 @@ class LogAttribute(Processor):
     def __init__(self, ):
         super(LogAttribute, self).__init__('LogAttribute',
                                            auto_terminate=['success'])
-        
-        
+
+
 class DebugFlow(Processor):
     def __init__(self, ):
         super(DebugFlow, self).__init__('DebugFlow')
@@ -541,15 +554,31 @@ class PutFile(Processor):
         else:
             return key
 
+
 class PublishKafka(Processor):
     def __init__(self):
         super(PublishKafka, self).__init__('PublishKafka',
-                                           properties={'Client Name': 'nghiaxlee', 'Known Brokers': '192.168.42.4:9092', 'Topic Name': 'test',
+                                           properties={'Client Name': 'nghiaxlee', 'Known Brokers': 'kafka-broker:9092', 'Topic Name': 'test',
                                                        'Batch Size': '10', 'Compress Codec': 'none', 'Delivery Guarantee': '1',
-                                                       'Request Timeout': '10 sec', 'Message Timeout': '5 sec'},
+                                                       'Request Timeout': '10 sec', 'Message Timeout': '12 sec'},
                                            auto_terminate=['success'])
 
 
+class PublishKafkaSSL(Processor):
+    def __init__(self):
+        super(PublishKafkaSSL, self).__init__('PublishKafka',
+                                              properties={'Client Name': 'LMN', 'Known Brokers': 'kafka-broker:9093',
+                                                          'Topic Name': 'test', 'Batch Size': '10',
+                                                          'Compress Codec': 'none', 'Delivery Guarantee': '1',
+                                                          'Request Timeout': '10 sec', 'Message Timeout': '12 sec',
+                                                          'Security CA': '/tmp/resources/certs/ca-cert',
+                                                          'Security Cert': '/tmp/resources/certs/client_LMN_client.pem',
+                                                          'Security Pass Phrase': 'abcdefgh',
+                                                          'Security Private Key': '/tmp/resources/certs/client_LMN_client.key',
+                                                          'Security Protocol': 'ssl'},
+                                              auto_terminate=['success'])
+
+
 class InputPort(Connectable):
     def __init__(self, name=None, remote_process_group=None):
         super(InputPort, self).__init__(name=name)
diff --git a/docker/test/integration/minifi/test/__init__.py b/docker/test/integration/minifi/test/__init__.py
index 2a4218f..fd0466c 100644
--- a/docker/test/integration/minifi/test/__init__.py
+++ b/docker/test/integration/minifi/test/__init__.py
@@ -19,6 +19,8 @@ import uuid
 import tarfile
 import subprocess
 import sys
+import time
+import subprocess
 from io import BytesIO
 from threading import Event
 
@@ -56,6 +58,13 @@ class DockerTestCluster(SingleNodeDockerCluster):
         os.makedirs(self.tmp_test_output_dir)
         logging.info('Creating tmp test resource dir: %s', self.tmp_test_resources_dir)
         os.makedirs(self.tmp_test_resources_dir)
+        os.chmod(self.tmp_test_output_dir, 0o777)
+        os.chmod(self.tmp_test_input_dir, 0o777)
+        os.chmod(self.tmp_test_resources_dir, 0o777)
+
+        # Add resources
+        test_dir = os.environ['PYTHONPATH'].split(':')[-1] # Based on DockerVerify.sh
+        shutil.copytree(test_dir + "/resources/kafka_broker/conf/certs", self.tmp_test_resources_dir + "/certs")
 
         # Point output validator to ephemeral output dir
         self.output_validator = output_validator
@@ -64,15 +73,14 @@ class DockerTestCluster(SingleNodeDockerCluster):
 
         # Start observing output dir
         self.done_event = Event()
-        event_handler = OutputEventHandler(output_validator, self.done_event)
+        self.event_handler = OutputEventHandler(self.output_validator, self.done_event)
         self.observer = Observer()
-        self.observer.schedule(event_handler, self.tmp_test_output_dir)
+        self.observer.schedule(self.event_handler, self.tmp_test_output_dir)
         self.observer.start()
 
         super(DockerTestCluster, self).__init__()
 
-        if isinstance(output_validator, KafkaValidator):
-            output_validator.set_containers(self.containers)
+
 
     def deploy_flow(self,
                     flow,
@@ -135,8 +143,18 @@ class DockerTestCluster(SingleNodeDockerCluster):
         file_abs_path = join(self.tmp_test_resources_dir, file_name)
         put_file_contents(contents, file_abs_path)
 
+    def restart_observer_if_needed(self):
+        if self.observer.is_alive():
+            return
+
+        self.observer = Observer()
+        self.done_event.clear()
+        self.observer.schedule(self.event_handler, self.tmp_test_output_dir)
+        self.observer.start()
+
     def wait_for_output(self, timeout_seconds):
         logging.info('Waiting up to %d seconds for test output...', timeout_seconds)
+        self.restart_observer_if_needed()
         self.done_event.wait(timeout_seconds)
         self.observer.stop()
         self.observer.join()
@@ -168,16 +186,16 @@ class DockerTestCluster(SingleNodeDockerCluster):
             stats = container.stats(stream=False)
             logging.info('Container stats:\n%s', stats)
 
-    def check_output(self, timeout=5, **kwargs):
+    def check_output(self, timeout=5, subdir=''):
         """
         Wait for flow output, validate it, and log minifi output.
         """
+        if subdir:
+            self.output_validator.subdir = subdir
         self.wait_for_output(timeout)
         self.log_nifi_output()
         if self.segfault:
-            return false
-        if isinstance(self.output_validator, FileOutputValidator):
-            return self.output_validator.validate(dir=kwargs.get('dir', ''))
+            return False
         return self.output_validator.validate()
 
     def check_http_proxy_access(self):
@@ -189,8 +207,20 @@ class DockerTestCluster(SingleNodeDockerCluster):
         return False
 
     def rm_out_child(self, dir):
-        logging.info('Removing %s from output folder', self.tmp_test_output_dir + dir)
-        shutil.rmtree(self.tmp_test_output_dir + dir)
+        logging.info('Removing %s from output folder', os.path.join(self.tmp_test_output_dir, dir))
+        shutil.rmtree(os.path.join(self.tmp_test_output_dir, dir))
+
+    def wait_for_container_logs(self, container_name, log, timeout, count=1):
+        logging.info('Waiting for logs `%s` in container `%s`', log, container_name)
+        container = self.containers[container_name]
+        check_count = 0
+        while check_count <= timeout:
+            if container.logs().decode("utf-8").count(log) == count:
+                return True
+            else:
+                check_count += 1
+                time.sleep(1)
+        return False
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         """
@@ -254,25 +284,31 @@ class SingleFileOutputValidator(FileOutputValidator):
     Validates the content of a single file in the given directory.
     """
 
-    def __init__(self, expected_content):
+    def __init__(self, expected_content, subdir=''):
         self.valid = False
         self.expected_content = expected_content
+        self.subdir = subdir
 
-    def validate(self, dir=''):
-
+    def validate(self):
         self.valid = False
-
-        full_dir = self.output_dir + dir
+        full_dir = os.path.join(self.output_dir, self.subdir)
         logging.info("Output folder: %s", full_dir)
+        if "GITHUB_WORKSPACE" in os.environ:
+            subprocess.call(['sudo', 'chmod', '-R', '0777', full_dir])
 
-        listing = listdir(full_dir)
+        if not os.path.isdir(full_dir):
+            return self.valid
 
+        listing = listdir(full_dir)
         if listing:
             for l in listing:
                 logging.info("name:: %s", l)
             out_file_name = listing[0]
+            full_path = join(full_dir, out_file_name)
+            if not os.path.isfile(full_path):
+                return self.valid
 
-            with open(join(full_dir, out_file_name), 'r') as out_file:
+            with open(full_path, 'r') as out_file:
                 contents = out_file.read()
                 logging.info("dir %s -- name %s", full_dir, out_file_name)
                 logging.info("expected %s -- content %s", self.expected_content, contents)
@@ -282,47 +318,6 @@ class SingleFileOutputValidator(FileOutputValidator):
 
         return self.valid
 
-class KafkaValidator(OutputValidator):
-    """
-    Validates PublishKafka
-    """
-
-    def __init__(self, expected_content):
-        self.valid = False
-        self.expected_content = expected_content
-        self.containers = None
-
-    def set_containers(self, containers):
-        self.containers = containers
-
-    def validate(self):
-
-        if self.valid:
-            return True
-        if self.containers is None:
-            return self.valid
-
-        if 'kafka-consumer' not in self.containers:
-            logging.info('Not found kafka container.')
-            return False
-        else:
-            kafka_container = self.containers['kafka-consumer']
-
-        output, stat = kafka_container.get_archive('/heaven_signal.txt')
-        file_obj = BytesIO()
-        for i in output:
-            file_obj.write(i)
-        file_obj.seek(0)
-        tar = tarfile.open(mode='r', fileobj=file_obj)
-        contents = tar.extractfile('heaven_signal.txt').read()
-        logging.info("expected %s -- content %s", self.expected_content, contents)
-
-        contents = contents.decode("utf-8")
-        if self.expected_content in contents:
-            self.valid = True
-
-        logging.info("expected %s -- content %s", self.expected_content, contents)
-        return self.valid
 
 class EmptyFilesOutPutValidator(FileOutputValidator):
     """
@@ -338,6 +333,8 @@ class EmptyFilesOutPutValidator(FileOutputValidator):
 
         full_dir = self.output_dir + dir
         logging.info("Output folder: %s", full_dir)
+        if "GITHUB_WORKSPACE" in os.environ:
+            subprocess.call(['sudo', 'chmod', '-R', '0777', full_dir])
 
         listing = listdir(full_dir)
         if listing:
@@ -359,6 +356,8 @@ class NoFileOutPutValidator(FileOutputValidator):
 
         full_dir = self.output_dir + dir
         logging.info("Output folder: %s", full_dir)
+        if "GITHUB_WORKSPACE" in os.environ:
+            subprocess.call(['sudo', 'chmod', '-R', '0777', full_dir])
 
         listing = listdir(full_dir)
 
diff --git a/docker/test/integration/resources/kafka_broker/Dockerfile b/docker/test/integration/resources/kafka_broker/Dockerfile
new file mode 100644
index 0000000..31472c9
--- /dev/null
+++ b/docker/test/integration/resources/kafka_broker/Dockerfile
@@ -0,0 +1,3 @@
+FROM wurstmeister/kafka:2.12-2.5.0
+ADD conf/server-ssl.properties $KAFKA_HOME/config/server.properties
+ADD conf/ /usr/local/etc/kafka/
diff --git a/docker/test/integration/resources/kafka_broker/conf/certs/broker_kafka-broker_cert-file b/docker/test/integration/resources/kafka_broker/conf/certs/broker_kafka-broker_cert-file
new file mode 100644
index 0000000..663cca4
--- /dev/null
+++ b/docker/test/integration/resources/kafka_broker/conf/certs/broker_kafka-broker_cert-file
@@ -0,0 +1,18 @@
+-----BEGIN NEW CERTIFICATE REQUEST-----
+MIICzTCCAbUCAQAwWDELMAkGA1UEBhMCTk4xCzAJBgNVBAgTAk5OMQswCQYDVQQH
+EwJOTjELMAkGA1UEChMCTk4xCzAJBgNVBAsTAk5OMRUwEwYDVQQDEwxrYWZrYS1i
+cm9rZXIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCM90IcrcqdIAjO
+/MzZaE6unuZZz86XFWH4kZjW6yFpaSysWqN4tkWvpAsFgyiZaVsXj8jw9zU0eD57
+Qsbty+ex9gKvFa+Q7JqRiUb06H8/hgiFyypbV7PCFxCzvqTu4NePRFtvE5MAV/yh
+MNJ5dmvp1Aq2s+FR14rVDN768W3q8Fs30u1sMEU7IEy/43OQUcwQ8T03IheyESVQ
+8etOekJyXnM//p4WoITMH7QxN/AGNxSUZu1/rA+nBkACKW2b03SdHLn+3+CFeqfM
+ChY6LnHIYTjx+M9qAXaWSkt19vbcAtm2aiv/YgVvdvuBz9tNerO61+efMKUf/DDG
+wNwu5PuLAgMBAAGgMDAuBgkqhkiG9w0BCQ4xITAfMB0GA1UdDgQWBBQpWkZxaI+9
+pTExKjSCcG+WkRdo1jANBgkqhkiG9w0BAQsFAAOCAQEAhLmPORoF3olr94Le906R
+F1RAwYZPgc1H10zDPQruJq5ykp+zta15cUvhZ0g1ZeHEZyIcazobeC3dk4EBGwKh
+41l1pY/BM0gY/s55Clk7pzmxmOaDi1xz3cNA7EDxd0tgJZjR/SMFlRhAZfe5bEuU
+hg6VcePE6BVhNqyuvGj5XTkjL2l7iV9etICXZn/RGMgqCT6syK9qu2cpIOKQbva1
+xiaeO42x4DhHB74nagOZa04G8Ck/hhPGtVPkYcMNd4v4Tw0s99hKh9fMEZgjr11x
+RiDmiUbyGQ6tZjfFF77wJSyNVibizn0iRGtCXyoYe5lQzwg+cY4IPJEACfDKFSYM
+FA==
+-----END NEW CERTIFICATE REQUEST-----
diff --git a/docker/test/integration/resources/kafka_broker/conf/certs/broker_kafka-broker_cert-signed b/docker/test/integration/resources/kafka_broker/conf/certs/broker_kafka-broker_cert-signed
new file mode 100644
index 0000000..3ca47dd
--- /dev/null
+++ b/docker/test/integration/resources/kafka_broker/conf/certs/broker_kafka-broker_cert-signed
@@ -0,0 +1,20 @@
+-----BEGIN CERTIFICATE-----
+MIIDRzCCAi8CCQCwdBfFbVp5aTANBgkqhkiG9w0BAQUFADBzMQswCQYDVQQGEwJO
+TjELMAkGA1UECAwCTk4xCzAJBgNVBAcMAk5OMQswCQYDVQQKDAJOTjELMAkGA1UE
+CwwCTk4xDjAMBgNVBAMMBU5naGlhMSAwHgYJKoZIhvcNAQkBFhFsZW1pbmhuZ2hp
+YUBOZ2hpYTAeFw0xOTEyMTMxNTAzMzJaFw00NzA0MzAxNTAzMzJaMFgxCzAJBgNV
+BAYTAk5OMQswCQYDVQQIEwJOTjELMAkGA1UEBxMCTk4xCzAJBgNVBAoTAk5OMQsw
+CQYDVQQLEwJOTjEVMBMGA1UEAxMMa2Fma2EtYnJva2VyMIIBIjANBgkqhkiG9w0B
+AQEFAAOCAQ8AMIIBCgKCAQEAjPdCHK3KnSAIzvzM2WhOrp7mWc/OlxVh+JGY1ush
+aWksrFqjeLZFr6QLBYMomWlbF4/I8Pc1NHg+e0LG7cvnsfYCrxWvkOyakYlG9Oh/
+P4YIhcsqW1ezwhcQs76k7uDXj0RbbxOTAFf8oTDSeXZr6dQKtrPhUdeK1Qze+vFt
+6vBbN9LtbDBFOyBMv+NzkFHMEPE9NyIXshElUPHrTnpCcl5zP/6eFqCEzB+0MTfw
+BjcUlGbtf6wPpwZAAiltm9N0nRy5/t/ghXqnzAoWOi5xyGE48fjPagF2lkpLdfb2
+3ALZtmor/2IFb3b7gc/bTXqzutfnnzClH/wwxsDcLuT7iwIDAQABMA0GCSqGSIb3
+DQEBBQUAA4IBAQCPqXvGrIlV4Rt+7X19IkSJGmnz7YtAFGCpdh4aA8l5zoo40TKW
+vPHhYJtFiE34pK8weciOzCXZelfhbIslbmJYohgHU8VX8cnYFRnKkiOksvUVKaMC
+msZSgsWvbPy+E/N9wm42KzOG7FzEz4jOQVlZKyvykFjw16m5biI/cIy7anHYjbqH
+Zbbhz36WYltmz0WQYhm0g2KH2uJCfm6pZpabZieCyuzgOkQycxZaQHNiYIsLh8LU
+djLD4bfBIRrj+HUS/eeB6CE3aM9mXIsRiXXd5ruA6wHUfTdkblAfVnUehgczLy1I
+HuMs3Zf/tt1E2B1Ok664q0NaAbKvLFd8EftS
+-----END CERTIFICATE-----
diff --git a/docker/test/integration/resources/kafka_broker/conf/certs/broker_kafka-broker_server.keystore.jks b/docker/test/integration/resources/kafka_broker/conf/certs/broker_kafka-broker_server.keystore.jks
new file mode 100644
index 0000000..e728493
Binary files /dev/null and b/docker/test/integration/resources/kafka_broker/conf/certs/broker_kafka-broker_server.keystore.jks differ
diff --git a/docker/test/integration/resources/kafka_broker/conf/certs/broker_kafka-broker_server.truststore.jks b/docker/test/integration/resources/kafka_broker/conf/certs/broker_kafka-broker_server.truststore.jks
new file mode 100644
index 0000000..2645006
Binary files /dev/null and b/docker/test/integration/resources/kafka_broker/conf/certs/broker_kafka-broker_server.truststore.jks differ
diff --git a/docker/test/integration/resources/kafka_broker/conf/certs/broker_localhost_cert-file b/docker/test/integration/resources/kafka_broker/conf/certs/broker_localhost_cert-file
new file mode 100644
index 0000000..e1baf62
--- /dev/null
+++ b/docker/test/integration/resources/kafka_broker/conf/certs/broker_localhost_cert-file
@@ -0,0 +1,17 @@
+-----BEGIN NEW CERTIFICATE REQUEST-----
+MIICyjCCAbICAQAwVTELMAkGA1UEBhMCTk4xCzAJBgNVBAgTAk5OMQswCQYDVQQH
+EwJOTjELMAkGA1UEChMCTk4xCzAJBgNVBAsTAk5OMRIwEAYDVQQDEwlsb2NhbGhv
+c3QwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCvT0NaJwg2M3UCIi1s
+xetBkFVKDjHnTd3bsUeOMPG31y0P7MTVxPjYkX7Jgk33IH3xyZStitiMm3of3lfr
+/Me0lbJ3OYwZB+pOBg4Ly7SQXGjXYIF3BI4VDvkYv5iTF4fOsTk5dN5Xsgss3XtY
+lONHCGk+NWbdwk9+/dFArVcXiE/fGm3qe2a4jijRm/75MKWpXSDmlyXj/EkxSPD0
+SN0mmR3irKdUxhvZeBk70DztLp/TQZZVYmSB7u8OJ+FRHji1kFAn9sFypf4ipDt7
+zMW/P9GzI7a18bNbfBXtBXajC1vrhHJapxOfen6cCNaahHJmb12bQDHc+xYKUkR6
+UU4fAgMBAAGgMDAuBgkqhkiG9w0BCQ4xITAfMB0GA1UdDgQWBBTtvxK6nA1N6cJs
+lbsi0j/m6eyMpzANBgkqhkiG9w0BAQsFAAOCAQEAJf506p6hsgH7sBVG7P6DYroH
+EaWB2e+ug2sHYeALvcYrUdT+Fn0p/ZxZHJdXa8l3k9R9qOxvspaG/G4tl6MJQJGa
+3tBtwJ2HveZF4LttWhcpZW1L1zpItSWs7y/wG2iw+AO6uShBIi6ndTP8jTmq6sbE
+UyktvBoAdinpxn3wGODVX1/UgNt70zSkTtdnFFAexVoTY2hQ1FNYiuztul7bYBN+
+DpMYFZ3xXK1Q9lhTkfBQp3zoxrDhOCBluovHzCsiXKzMEx9HXkIAoiTlkr+TuwxG
+agq2OIXJ0s/JVXpH5bX8F05n+Fg1Bk4Yddch645YcmZfGAUn49prup6YdNWEMw==
+-----END NEW CERTIFICATE REQUEST-----
diff --git a/docker/test/integration/resources/kafka_broker/conf/certs/broker_localhost_cert-signed b/docker/test/integration/resources/kafka_broker/conf/certs/broker_localhost_cert-signed
new file mode 100644
index 0000000..5fab447
--- /dev/null
+++ b/docker/test/integration/resources/kafka_broker/conf/certs/broker_localhost_cert-signed
@@ -0,0 +1,20 @@
+-----BEGIN CERTIFICATE-----
+MIIDRDCCAiwCCQCwdBfFbVp5ZTANBgkqhkiG9w0BAQUFADBzMQswCQYDVQQGEwJO
+TjELMAkGA1UECAwCTk4xCzAJBgNVBAcMAk5OMQswCQYDVQQKDAJOTjELMAkGA1UE
+CwwCTk4xDjAMBgNVBAMMBU5naGlhMSAwHgYJKoZIhvcNAQkBFhFsZW1pbmhuZ2hp
+YUBOZ2hpYTAeFw0xOTExMjYxMjMwMTJaFw00NzA0MTMxMjMwMTJaMFUxCzAJBgNV
+BAYTAk5OMQswCQYDVQQIEwJOTjELMAkGA1UEBxMCTk4xCzAJBgNVBAoTAk5OMQsw
+CQYDVQQLEwJOTjESMBAGA1UEAxMJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEF
+AAOCAQ8AMIIBCgKCAQEAr09DWicINjN1AiItbMXrQZBVSg4x503d27FHjjDxt9ct
+D+zE1cT42JF+yYJN9yB98cmUrYrYjJt6H95X6/zHtJWydzmMGQfqTgYOC8u0kFxo
+12CBdwSOFQ75GL+YkxeHzrE5OXTeV7ILLN17WJTjRwhpPjVm3cJPfv3RQK1XF4hP
+3xpt6ntmuI4o0Zv++TClqV0g5pcl4/xJMUjw9EjdJpkd4qynVMYb2XgZO9A87S6f
+00GWVWJkge7vDifhUR44tZBQJ/bBcqX+IqQ7e8zFvz/RsyO2tfGzW3wV7QV2owtb
+64RyWqcTn3p+nAjWmoRyZm9dm0Ax3PsWClJEelFOHwIDAQABMA0GCSqGSIb3DQEB
+BQUAA4IBAQBP+Xf8gEC+jtDwRCnv8PutvJbeHM0x7usT7itTQbmZuSprHF55zk6b
+EDxpYc9ey2mssenRS7RCgm19+nAM/D3M47pD0QxTVkHgW0Wv0vZotjXp1jcENNW7
+GwyenDgrtrZqFnl0+b3xYtj1jcC4W/pCeQLXrag6LMHK7C5cX1f6vCONxipPwNRy
+HVaWFR6hULXTHYZ1zuGVbL4QQCUURUrytIRLL8Hns4Glya7L1Hxkl1oer/J7cm7w
+F+Q4fhPgmQtC8zLh4XZVQ4fclFk4EqHLdioh4fi5TRoWbcrHl144H1dTGEOKjHfo
+k/5yivmWTwNOAZryhKn+4M2BJOFHF8xz
+-----END CERTIFICATE-----
diff --git a/docker/test/integration/resources/kafka_broker/conf/certs/broker_localhost_server.keystore.jks b/docker/test/integration/resources/kafka_broker/conf/certs/broker_localhost_server.keystore.jks
new file mode 100644
index 0000000..dc15f5d
Binary files /dev/null and b/docker/test/integration/resources/kafka_broker/conf/certs/broker_localhost_server.keystore.jks differ
diff --git a/docker/test/integration/resources/kafka_broker/conf/certs/broker_localhost_server.truststore.jks b/docker/test/integration/resources/kafka_broker/conf/certs/broker_localhost_server.truststore.jks
new file mode 100644
index 0000000..b54a9e9
Binary files /dev/null and b/docker/test/integration/resources/kafka_broker/conf/certs/broker_localhost_server.truststore.jks differ
diff --git a/docker/test/integration/resources/kafka_broker/conf/certs/ca-cert b/docker/test/integration/resources/kafka_broker/conf/certs/ca-cert
new file mode 100644
index 0000000..43a9f42
--- /dev/null
+++ b/docker/test/integration/resources/kafka_broker/conf/certs/ca-cert
@@ -0,0 +1,21 @@
+-----BEGIN CERTIFICATE-----
+MIIDYjCCAkoCCQDS47Sky1hfVzANBgkqhkiG9w0BAQsFADBzMQswCQYDVQQGEwJO
+TjELMAkGA1UECAwCTk4xCzAJBgNVBAcMAk5OMQswCQYDVQQKDAJOTjELMAkGA1UE
+CwwCTk4xDjAMBgNVBAMMBU5naGlhMSAwHgYJKoZIhvcNAQkBFhFsZW1pbmhuZ2hp
+YUBOZ2hpYTAeFw0xOTExMTkwOTIwNDhaFw00NzA0MDYwOTIwNDhaMHMxCzAJBgNV
+BAYTAk5OMQswCQYDVQQIDAJOTjELMAkGA1UEBwwCTk4xCzAJBgNVBAoMAk5OMQsw
+CQYDVQQLDAJOTjEOMAwGA1UEAwwFTmdoaWExIDAeBgkqhkiG9w0BCQEWEWxlbWlu
+aG5naGlhQE5naGlhMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuO7f
+uTupDTk/qhCTvjotdvmaWuJoleEZf9Orl3eD+fJd86sxdwfe5OxJMwSBZcmRWoSL
+8IZlzugsvNVadXxdIeHsqTudIZs5sn4MiwPzwfsjVfkhzW7qrIbQlr4yr6oh+iya
+0fK2vaYtfD/TMvxQ/tjSwmRESIZlDG8521AXq4qHNwgYb1S7oLn+RY2qPeamH0l/
+4vJghRWIMtyOG6Ezb1semyRjTr692fAdqoVK5xwi681taxbhtleL/qYqiqHN7v+X
+rOTIZhdNSyGqgCUPPY5CQCtT33G8QOiZxOHzOI4evd4CIiDmqLHe4St7ES3kav1z
+RDRooeZUWjxDJDrOfwIDAQABMA0GCSqGSIb3DQEBCwUAA4IBAQBYjQM7CWnpn7Wl
+t6yqwxTvqKoPyjPUfp6NgXH73WbC2XIUr+RtI8UooA0RKcb37L4iVuI7xqYbhLbR
+Ad38JeU9DZfufo+PMyFa+AT9qVW5G7rwYSVD9AZeD2ksPsIXrFyQUAp+mBD1vrn7
+N0iT7MbKBG1CvC3QrPvf5Kv2+DUTdb0GDCxzqM30f0BgckA78irTjYdo3c9gpP7R
+XU2erda5Ec5cStKLcpWk9Rntbqx72btxW3j+xRGDmh+4lcTYALeio0g7FlXot+qs
+fk6/ohKP7BCYwXuFeNeDk7CBfzN9M2lr/zZa/POEUUc1uC5XoEDEf9e+Ff+dCmiA
++wJWpjTz
+-----END CERTIFICATE-----
diff --git a/docker/test/integration/resources/kafka_broker/conf/certs/ca-cert.key b/docker/test/integration/resources/kafka_broker/conf/certs/ca-cert.key
new file mode 100644
index 0000000..35f2070
--- /dev/null
+++ b/docker/test/integration/resources/kafka_broker/conf/certs/ca-cert.key
@@ -0,0 +1,30 @@
+-----BEGIN ENCRYPTED PRIVATE KEY-----
+MIIFHzBJBgkqhkiG9w0BBQ0wPDAbBgkqhkiG9w0BBQwwDgQIwWGIq3k/AtsCAggA
+MB0GCWCGSAFlAwQBKgQQ5GwoeX1PJFk4Tzdd9LeqvgSCBNDZtUnoDzxV5njiHdW8
+Mn5h2V58jcDLvWzkrhsfjqevWWbdMtnvK9IzY3wGndxk00hSuGxz9awC0GeE1OLC
+I05e3PjFgpLdYx/6GltDijnoozVNo+pXgjMyEnGl5DsKsJcxOF3Bf9EvC0lwRB+y
+4CnHKWZl0bPddpRR/IDFbSie9+7X2EnSRm5KaB5q1uTXGTI8KZTYuzxID4CxxTff
+s2GvTiqZ7PSVLXjyMTSuLLrfQxuPFNfRWJG1A3LSkxzAoRfv2wbfbqPAmIPHIQCL
+yKxUK/H5RqiJkWhM/wQFXnKieBftjtcS4yQ+JOW4LDwsHI05sYryhl5X4zVwhe7b
+SlotRCQzgOVCO6SRW1WvlJA+oz426kpOSnH3LNUaOxYxPwL3NYGZTUsR9+Bn1qRc
+XAoSdceuxPkmweNSWI0vDLR8neP548IoXsPD5pDM/O8VbjhBO67huYc2JfwfixZ7
+BlZGsoCPu7W0pfn1lXumgcBDotUPdumpiQNo4pMmcvyFU2kKzMaKg27Xf/FqZkQT
+uOBk760iamUEM7PrFNAOu6fxJ9MUGNDx5FgsgzgC2VKOyshysRopQgeWNoNmkY8j
+6ItkJdFGnbc9r1b93mpDT6yuawKOcc8KCnNUAVfoa9ZSEc6hu8W9j0rRfsfJDqH3
+OxczxlCYs4gAhOOVMsNrUBeEmT8Rq88kCSo1EvdyNSRPXwJ/6YdQPNEHEFFlMU/G
+1BPRZwDfe/aKNb1Y+OTTT8ix9L1fbGxShgcMG3zrqZTqJ1pGkN9iZqDXEHY1HTwq
+oqX0QfpgSD5p+Ja7d1KmhDYeM5uXAHS0O/lC3AQyH4PTk4DeTRM9JVpi6WuIxBcP
+T16ZTYzAkdmeqtbDYiRIUZxWLS4hs/UXKlMIm03nNiURMPul/9azJ2bahN34++NY
+DSBWy3X9nCRmlNJb8vdZl1W5MzOg8XlHzJYmFZ4Qip4C8Ec6fZ+yGjqrPRzbz1NV
+nDoOpu/0DQAlka68pZQyUBYbBZf8MmIoYvBmzvQVP50elM5TB/BlX6s92K+mDLfI
+l4JdtVMoCaszvuCHcqph869WbRMdmq39AgacteqH+xMOe9F2DblTpKCoJva6rk0V
+yxisBbG0ycI3KNcpYPiZZ5mzu6bzzuQIH72/LAGizS52A+evW3QK+OKx5C6t727r
+BQ1314VxkvJ4JRgKdIUiGzZWprhC6X59YyFYG1PUpaAq4mFQqonB81pTpjbTjjW6
+87D+pKEmY0GcTxjdqyuqL5tns6H4MjqByEgSGhfxaMWausKYbIANhrfB4Lk9YLXm
+icGESQz+spXKGeCRUbsFkYA5CLQpdXQoFKCJyzHLvjXZAFrE+Y0dfTdnxqO93+cr
+VvpxMjE5XYsLduBwLr47wXT5XZMbKkoVJiQPqo3dl3OHFJVyZRyI3Wc+6Xq20B+F
+HDb1cXDXHXof1wGswP5HPSJlUhPkHRGxtuk3r6muf44jLNH8cvfAG9bq+dAUdMJK
+rjcwrFmZQnvO14jSIY5hh5khugT+3/LPJWHpXpjD6uj36ddxKllZfEA3SX2uXdRy
+HtF+XdC7RhXfFOiNpLVK7Dj8w5lNXBpMK3107u6pnsjbNE/Z3IggW6ESSgjN8VGq
+ZVKnlFJAdMoNTsjyrDuO9nZDPw==
+-----END ENCRYPTED PRIVATE KEY-----
diff --git a/docker/test/integration/resources/kafka_broker/conf/certs/ca-cert.srl b/docker/test/integration/resources/kafka_broker/conf/certs/ca-cert.srl
new file mode 100644
index 0000000..3349fd9
--- /dev/null
+++ b/docker/test/integration/resources/kafka_broker/conf/certs/ca-cert.srl
@@ -0,0 +1 @@
+B07417C56D5A7969
diff --git a/docker/test/integration/resources/kafka_broker/conf/certs/client_LMN_cert-file b/docker/test/integration/resources/kafka_broker/conf/certs/client_LMN_cert-file
new file mode 100644
index 0000000..ba10df1
--- /dev/null
+++ b/docker/test/integration/resources/kafka_broker/conf/certs/client_LMN_cert-file
@@ -0,0 +1,17 @@
+-----BEGIN NEW CERTIFICATE REQUEST-----
+MIICxDCCAawCAQAwTzELMAkGA1UEBhMCTk4xCzAJBgNVBAgTAk5OMQswCQYDVQQH
+EwJOTjELMAkGA1UEChMCTk4xCzAJBgNVBAsTAk5OMQwwCgYDVQQDEwNMTU4wggEi
+MA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCWgW4KeS/27ZZNBitvuid3lGoA
+3Solx4xdykmqe1hQGFkAt6hEdvEhFqfyMpC/uZeeZNaJNVFeT8kE6Hr6jyEqMUru
+7OA6mOwRAwB22W78YPhkWvYka1yhuyIB2xGePsbjTPNZHBIuDGJGjfzrzfmzveeX
+NOOwUTOyykgDWksJn05H6MmROI7qUxijg//gNfIuhYI+gaNuz1qgCka72/QTYLZx
+MxgjgNeS2bL/RGxmBBYaHBqPQ3MLyWG1MpLhG1kOBqbwmZPvceBZFHizeWe0vVbV
+/p4ztlUNd3txC4OnFp907AqwF2USqBNLkepdILlqZxJcMfseikOpR9+Q90xnAgMB
+AAGgMDAuBgkqhkiG9w0BCQ4xITAfMB0GA1UdDgQWBBT5IGAkhj6drjx2a1a1g+XZ
+snpX2zANBgkqhkiG9w0BAQsFAAOCAQEAKdudV5dk5IA62yZdUNhS8koI1qSncclw
+2Mh3zfRiT/bDnBGmomkGvNoyx+zFgI3JbA0SihWOy3RfNZRIX5YdRI+samWU0iu0
+Y4WImSVgJelhn75V6cZfAhIsfNpLSmiX3Katd9JON58FWEm9brdmV9TtLVqxhf4h
+26B+6lwVOpdJumpw+i5+3ODF9knClQEkg6t/AeijiXYZkYpqoHTS4ICPaPdn3uUw
+snvP0eQGvUfORugOoeXF9yIYbtMmDKPGwvg6OdffRWPp8l+SqLD3abUrbTIgqMX9
+fr4hixcmwIwuovSvVgX9hQBVc8vTuvNIpAmsnw9O02g89/0N17AheQ==
+-----END NEW CERTIFICATE REQUEST-----
diff --git a/docker/test/integration/resources/kafka_broker/conf/certs/client_LMN_cert-signed b/docker/test/integration/resources/kafka_broker/conf/certs/client_LMN_cert-signed
new file mode 100644
index 0000000..1fad2c6
--- /dev/null
+++ b/docker/test/integration/resources/kafka_broker/conf/certs/client_LMN_cert-signed
@@ -0,0 +1,20 @@
+-----BEGIN CERTIFICATE-----
+MIIDPjCCAiYCCQCwdBfFbVp5YzANBgkqhkiG9w0BAQUFADBzMQswCQYDVQQGEwJO
+TjELMAkGA1UECAwCTk4xCzAJBgNVBAcMAk5OMQswCQYDVQQKDAJOTjELMAkGA1UE
+CwwCTk4xDjAMBgNVBAMMBU5naGlhMSAwHgYJKoZIhvcNAQkBFhFsZW1pbmhuZ2hp
+YUBOZ2hpYTAeFw0xOTExMjAxMzExMTNaFw00NzA0MDcxMzExMTNaME8xCzAJBgNV
+BAYTAk5OMQswCQYDVQQIEwJOTjELMAkGA1UEBxMCTk4xCzAJBgNVBAoTAk5OMQsw
+CQYDVQQLEwJOTjEMMAoGA1UEAxMDTE1OMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A
+MIIBCgKCAQEAloFuCnkv9u2WTQYrb7ond5RqAN0qJceMXcpJqntYUBhZALeoRHbx
+IRan8jKQv7mXnmTWiTVRXk/JBOh6+o8hKjFK7uzgOpjsEQMAdtlu/GD4ZFr2JGtc
+obsiAdsRnj7G40zzWRwSLgxiRo386835s73nlzTjsFEzsspIA1pLCZ9OR+jJkTiO
+6lMYo4P/4DXyLoWCPoGjbs9aoApGu9v0E2C2cTMYI4DXktmy/0RsZgQWGhwaj0Nz
+C8lhtTKS4RtZDgam8JmT73HgWRR4s3lntL1W1f6eM7ZVDXd7cQuDpxafdOwKsBdl
+EqgTS5HqXSC5amcSXDH7HopDqUffkPdMZwIDAQABMA0GCSqGSIb3DQEBBQUAA4IB
+AQAyHq8xcPyfW+WACdWLjCHzJf9D92JwTNdQotMJMqxHw86OoSGvTnc7KeoeWdbt
+xlDaBWvLsV2M6DAjAvYgYHk0KG+EFU1xVTAe8+lOWWR+8a9CMDKq6IuwRywHnnEJ
+n9+YTI9cuJcGhYj9HFqXM4imR6rM1++tHjrLkiWqvtP9KWEK431xj27lhhcuOd2J
+RAn3xvapn55A2awAzDYzpq8VvG02JGetevDEwAQYfyZ+9i97+5GobGjfhVbt32TE
+R/yrVOzYACYhaZeUz5l0E9cChj+XDQxFIAtNYJpIzgxVCCr0ZMIyM66CD8Ii8h2B
+BRN6vYHUHccMh+B+7B7iZD4m
+-----END CERTIFICATE-----
diff --git a/docker/test/integration/resources/kafka_broker/conf/certs/client_LMN_client.key b/docker/test/integration/resources/kafka_broker/conf/certs/client_LMN_client.key
new file mode 100644
index 0000000..ed0b490
--- /dev/null
+++ b/docker/test/integration/resources/kafka_broker/conf/certs/client_LMN_client.key
@@ -0,0 +1,30 @@
+-----BEGIN RSA PRIVATE KEY-----
+Proc-Type: 4,ENCRYPTED
+DEK-Info: DES-EDE3-CBC,F0F92F59DA33CB2A
+
+Ytg3vlVSrBnqhz/x5HymCPeEpPY3fXQ56aw6wDjE71SKOoDLJWWbt3WVIEr9VOBQ
+1AcPBzJCJZJ2+qWV45/QoYnOWf+RO3WvVwcKBlCcCksSb1oCVZ+7fMlkEGFuppWi
+/EWBft71Uv0bCP3B6VAknOaKT5IYWW3aW6l5varYKaFL3+fK4U9ohK5ZbvEHQ7eg
+IfEt+jSlRwSxsfRLA45fLjlTSZqErg01rbiLdKZfOwOSvKXAYxTWTneWO57uQYS+
+VPwrkmKW4cn3IY73MskSpYpsqP7Hl7DhpMQCE0MeRw+x0rgeN7El9HtpXquqsOj1
+zYaju5WfmkkzY7VVZYsgJJABzCh179tTLBI/UxlOZuHKhYJPGndy3txkTvetD4go
+x0XJhujxZ0dAgysUwgCFKR+452WJafm8c5iTrwPIzQ4Zm3/KzTk65++06dq9EvY/
+4XydnElYw2qBuVYT6L293N+TfJ448k3jukkOznJyKHXf1VovPestpeGy5XYv9wWC
+dpzurMB8elpjVJ9aH8N7OV1+uOgMpnVMK6GSgFpxGM1sRWiC/svjmpXmQpkgw9/H
+RN1gJA6zkflutjUmnzVHUY3TOErrwTBeMMRP0hiSC3X1NdyL+8hB6cZumD/B2KQs
+tJTlV2Eld41iDcwvzqkLS6jOH3AR6pA3HFg1Gse3IqwBqfiEP6cm/VE4OF5YKuSS
+gxakGN8zibx+Hwtk7st7CoB8UxxVnsWZY90gY0kuXCXVIbqGmJAQMGwcGqm7q71x
+gOdifWd+j7LBtNbOizd4M9jfK+OHzPIr0RPAFN75jclh+IYNMQK30mGWZtnb4Elj
+kaj2t8AhTodwSxQU+cI8yML72ewIamtcl/4FiB4jg6KkEqOY8Asxiat4SD+tXKzk
+8GXJoa/qZ2Br4+zNOqmT8T7CfrP08l+WtCcRW1W2vhbhYzksU38eVWKoLpfytRgC
+t8Pthwcb5uS/rN/wzqIZ67uQFb/FMRe487MgMhJ0bHxfV1UslLKUr/q1CBUoi+Tw
+V9zWLGX4fy9SmmT9S1a1INMIphw9YZS1oiQk544tROF9WbxL6d+iMdO3lxzNdpGY
+8Dzj2rr7wLYSU2LM87JJhNvz5diQNvrikYadSXVOMPQ8AWSCZb8YrRnRkLMar5TB
+gH1+2cR2oN1nhbXeUPpybdeQwththikRD2mZ3kAZ7vbMcnqJt9uM32TmI2UIapiR
+Vzt+TGEoOiLGrqMBHe1zUNtZKPCi1e72zd9kTInLgZoJPoJ37MU1mI4lW5qHOD7l
+J6dP+gYjSdXnxpr1ZbrJzqxDdMsvhh6cYUaXjfgfI7FrImfsdOMBOlH+5n15XMXr
+K9pqiS8UTz/a6UyX5z/LMiKIXKBkPYkhdE9FezSkZ5ieFr2smu2CgYDx8V7gmp42
+o0wT5DmAuj4gY2JEQnol6TwHQhUC99MZOC8e6F7dYTda2AfQpiEb66GCPSoKhzVn
+xLTwgYxfCgEro281kgOan80w0OqF1lMLvRRSMfvumM3wAyP8en5pu9X1giCNgjQV
+2X5sGkiLT0Ckw1UHr0932+5/3uCJCldEubAfsGYCN17pQQFiZzI7hA==
+-----END RSA PRIVATE KEY-----
diff --git a/docker/test/integration/resources/kafka_broker/conf/certs/client_LMN_client.keystore.jks b/docker/test/integration/resources/kafka_broker/conf/certs/client_LMN_client.keystore.jks
new file mode 100644
index 0000000..f39edf4
Binary files /dev/null and b/docker/test/integration/resources/kafka_broker/conf/certs/client_LMN_client.keystore.jks differ
diff --git a/docker/test/integration/resources/kafka_broker/conf/certs/client_LMN_client.pem b/docker/test/integration/resources/kafka_broker/conf/certs/client_LMN_client.pem
new file mode 100644
index 0000000..92c6ae5
--- /dev/null
+++ b/docker/test/integration/resources/kafka_broker/conf/certs/client_LMN_client.pem
@@ -0,0 +1,20 @@
+-----BEGIN CERTIFICATE-----
+MIIDPjCCAiYCCQCwdBfFbVp5YjANBgkqhkiG9w0BAQUFADBzMQswCQYDVQQGEwJO
+TjELMAkGA1UECAwCTk4xCzAJBgNVBAcMAk5OMQswCQYDVQQKDAJOTjELMAkGA1UE
+CwwCTk4xDjAMBgNVBAMMBU5naGlhMSAwHgYJKoZIhvcNAQkBFhFsZW1pbmhuZ2hp
+YUBOZ2hpYTAeFw0xOTExMjAxMjU0MzNaFw00NzA0MDcxMjU0MzNaME8xCzAJBgNV
+BAYTAk5OMQswCQYDVQQIDAJOTjELMAkGA1UEBwwCTk4xCzAJBgNVBAoMAk5OMQsw
+CQYDVQQLDAJOTjEMMAoGA1UEAwwDTE1OMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A
+MIIBCgKCAQEA/EwAPj3UOb9gQDR+9/teK0+C27j0pfrd0G14kB24k+lQRl2cykjJ
+mbTkfWT8P9/8re0+EfM7IXNInqrTjwHFjZZihODqGz/5f9dLJyncsKBFjUYkz9so
+taL8xO5JT017DqS4KGilwF5aJuLzWCsJn4TQnqyeynG27toKqw8QQmQDRe/fN7Qj
+gSrpKIwHq/aD8BuJYKFCdcr1Jl+0GoT5Gobpm0Z5n6yp2MR+ElW7qUf3BvxRZ8uY
+JFWPaokYWDgXDjYWAT+xA8ZD5YBDOfF/DrsYxdEEotJUyc6A/S3o+liw7hwWffqh
+dx92RpJzU8IGCjh8SFk7vp9JRSWRsOIhgQIDAQABMA0GCSqGSIb3DQEBBQUAA4IB
+AQB4I/7F6Wef3fL+a4gflayGJ1MZulXhWbxRqsKak2KPEQP+t3I5LOqmwl+d5dyz
+KoyHvlc4npIhuGTo0gY1M0iSAkID/+qJ9z7hSIwjCOzZFpZqqdKfZQneghQpo2dm
+3u1j6kG4xmDPbUcppEO10bqZRKJQghH6vcessws8Iq9E2Xwyb8cSUuSEOn2KX29A
+LDhsPHMufLu9SZTXLR09GbsdMDnotwY00UjsALbTiEoa+Zu090frRKT558s7TDGN
+5HW5Jap/sSI5J+PKgA0PWZmQgvZ3gwO3POvTA90NmfHLtW7SYM0WDpbJOC08Jm9+
+/bSUo6rjwrZqC7W3Uv83mryf
+-----END CERTIFICATE-----
diff --git a/docker/test/integration/resources/kafka_broker/conf/certs/client_LMN_client.req b/docker/test/integration/resources/kafka_broker/conf/certs/client_LMN_client.req
new file mode 100644
index 0000000..5ba3b91
--- /dev/null
+++ b/docker/test/integration/resources/kafka_broker/conf/certs/client_LMN_client.req
@@ -0,0 +1,17 @@
+-----BEGIN CERTIFICATE REQUEST-----
+MIICrTCCAZUCAQAwTzELMAkGA1UEBhMCTk4xCzAJBgNVBAgMAk5OMQswCQYDVQQH
+DAJOTjELMAkGA1UECgwCTk4xCzAJBgNVBAsMAk5OMQwwCgYDVQQDDANMTU4wggEi
+MA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQD8TAA+PdQ5v2BANH73+14rT4Lb
+uPSl+t3QbXiQHbiT6VBGXZzKSMmZtOR9ZPw/3/yt7T4R8zshc0ieqtOPAcWNlmKE
+4OobP/l/10snKdywoEWNRiTP2yi1ovzE7klPTXsOpLgoaKXAXlom4vNYKwmfhNCe
+rJ7Kcbbu2gqrDxBCZANF7983tCOBKukojAer9oPwG4lgoUJ1yvUmX7QahPkahumb
+RnmfrKnYxH4SVbupR/cG/FFny5gkVY9qiRhYOBcONhYBP7EDxkPlgEM58X8OuxjF
+0QSi0lTJzoD9Lej6WLDuHBZ9+qF3H3ZGknNTwgYKOHxIWTu+n0lFJZGw4iGBAgMB
+AAGgGTAXBgkqhkiG9w0BCQcxCgwIYWJjZGVmZ2gwDQYJKoZIhvcNAQELBQADggEB
+AAs5FzufAfDl5FcxJjcjcBSmB+yQzmHyWPJ6T8XNwjQ+PwTV1o8iM42TjRX2cjZw
+CQeRJh2FNEuxFDbzNQ1AZ1omsbaDewEvwthqEj8SMXGpC+VHxJXTjtGB5nrUlsm/
+tosLrKE4Xn8HTgiHtOLaziAdCsyK0kVbvmCYfB/gEZa7R48KST7DMVAX9mkmY4CB
+5E2Jn14tQlrgHzw/XH1fjRlJul+I6XLOeoY5LlKkA7dF0jgwbTZIlXAXjQIgPzz5
+o4rPawsozcHaJRisZeIRaJg6tCKMx2pNxldyQ5qct5AjGmy7D12xHY3upuzUPqwa
+qkvc2sqtEQaWWQrefTbAZuA=
+-----END CERTIFICATE REQUEST-----
diff --git a/docker/test/integration/resources/kafka_broker/conf/certs/client_LMN_client.truststore.jks b/docker/test/integration/resources/kafka_broker/conf/certs/client_LMN_client.truststore.jks
new file mode 100644
index 0000000..bd71ba0
Binary files /dev/null and b/docker/test/integration/resources/kafka_broker/conf/certs/client_LMN_client.truststore.jks differ
diff --git a/docker/test/integration/resources/kafka_broker/conf/client-ssl-java.properties b/docker/test/integration/resources/kafka_broker/conf/client-ssl-java.properties
new file mode 100644
index 0000000..e4940c6
--- /dev/null
+++ b/docker/test/integration/resources/kafka_broker/conf/client-ssl-java.properties
@@ -0,0 +1,7 @@
+bootstrap.servers=kafka-broker:9093
+security.protocol=SSL
+ssl.truststore.location=/usr/local/etc/kafka/certs/client_LMN_client.truststore.jks
+ssl.truststore.password=abcdefgh
+ssl.keystore.location=/usr/local/etc/kafka/certs/client_LMN_client.keystore.jks
+ssl.keystore.password=abcdefgh
+ssl.key.password=abcdefgh
\ No newline at end of file
diff --git a/docker/test/integration/resources/kafka_broker/conf/client-ssl.properties b/docker/test/integration/resources/kafka_broker/conf/client-ssl.properties
new file mode 100644
index 0000000..f21dd91
--- /dev/null
+++ b/docker/test/integration/resources/kafka_broker/conf/client-ssl.properties
@@ -0,0 +1,14 @@
+bootstrap.servers=kafka-broker:9093
+security.protocol=SSL
+
+# CA certificate file for verifying the broker's certificate.
+ssl.ca.location=certs/ca-cert
+
+# Client's certificate
+ssl.certificate.location=certs/client_LMN_client.pem
+
+# Client's key
+ssl.key.location=certs/client_LMN_client.key
+
+# Key password, if any.
+ssl.key.password=abcdefgh
\ No newline at end of file
diff --git a/docker/test/integration/resources/kafka_broker/conf/server-ssl.properties b/docker/test/integration/resources/kafka_broker/conf/server-ssl.properties
new file mode 100644
index 0000000..2a5e601
--- /dev/null
+++ b/docker/test/integration/resources/kafka_broker/conf/server-ssl.properties
@@ -0,0 +1,157 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=0
+
+############################# Socket Server Settings #############################
+
+# The address the socket server listens on. It will get the value returned from
+# java.net.InetAddress.getCanonicalHostName() if not configured.
+#   FORMAT:
+#     listeners = listener_name://host_name:port
+#   EXAMPLE:
+#     listeners = PLAINTEXT://your.host.name:9092
+#listeners=PLAINTEXT://:9092
+
+# Hostname and port the broker will advertise to producers and consumers. If not set,
+# it uses the value for "listeners" if configured.  Otherwise, it will use the value
+# returned from java.net.InetAddress.getCanonicalHostName().
+#advertised.listeners=PLAINTEXT://your.host.name:9092
+
+# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
+#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
+
+# The number of threads that the server uses for receiving requests from the network and sending responses to the network
+num.network.threads=3
+
+# The number of threads that the server uses for processing requests, which may include disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs=/tmp/kafka-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Internal Topic Settings  #############################
+# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
+# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
+offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion due to age
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
+# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=300000
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2181
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=6000
+
+
+############################# Group Coordinator Settings #############################
+
+# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
+# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
+# The default value for this is 3 seconds.
+# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
+# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
+group.initial.rebalance.delay.ms=0
+
+security.inter.broker.protocol=SSL
+listeners=PLAINTEXT://:9092,SSL://:9093
+
+# SSL
+ssl.protocol = TLS
+ssl.enabled.protocols=TLSv1.2
+ssl.keystore.type = JKS
+ssl.keystore.location = /usr/local/etc/kafka/certs/broker_kafka-broker_server.keystore.jks
+# ssl.keystore.location = /usr/local/etc/kafka/certs/broker_localhost_server.keystore.jks
+ssl.keystore.password = abcdefgh
+ssl.key.password = abcdefgh
+ssl.truststore.type = JKS
+ssl.truststore.location = /usr/local/etc/kafka/certs/broker_kafka-broker_server.truststore.jks
+# ssl.truststore.location = /usr/local/etc/kafka/certs/broker_localhost_server.truststore.jks
+ssl.truststore.password = abcdefgh
+# To require authentication of clients use "require", else "none" or "request"
+ssl.client.auth = required
+
+
+
diff --git a/docker/test/integration/resources/kafka_broker/conf/server.properties b/docker/test/integration/resources/kafka_broker/conf/server.properties
new file mode 100644
index 0000000..20d9095
--- /dev/null
+++ b/docker/test/integration/resources/kafka_broker/conf/server.properties
@@ -0,0 +1,136 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=0
+
+############################# Socket Server Settings #############################
+
+# The address the socket server listens on. It will get the value returned from 
+# java.net.InetAddress.getCanonicalHostName() if not configured.
+#   FORMAT:
+#     listeners = listener_name://host_name:port
+#   EXAMPLE:
+#     listeners = PLAINTEXT://your.host.name:9092
+#listeners=PLAINTEXT://:9092
+
+# Hostname and port the broker will advertise to producers and consumers. If not set, 
+# it uses the value for "listeners" if configured.  Otherwise, it will use the value
+# returned from java.net.InetAddress.getCanonicalHostName().
+#advertised.listeners=PLAINTEXT://your.host.name:9092
+
+# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
+#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
+
+# The number of threads that the server uses for receiving requests from the network and sending responses to the network
+num.network.threads=3
+
+# The number of threads that the server uses for processing requests, which may include disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs=/usr/local/var/lib/kafka-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Internal Topic Settings  #############################
+# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
+# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
+offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion due to age
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
+# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=300000
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2181
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=6000
+
+
+############################# Group Coordinator Settings #############################
+
+# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
+# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
+# The default value for this is 3 seconds.
+# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
+# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
+group.initial.rebalance.delay.ms=0
\ No newline at end of file
diff --git a/docker/test/integration/test_rdkafka.py b/docker/test/integration/test_rdkafka.py
index 5a5c696..980e66f 100644
--- a/docker/test/integration/test_rdkafka.py
+++ b/docker/test/integration/test_rdkafka.py
@@ -21,14 +21,16 @@ def test_publish_kafka():
     """
     Verify delivery of message to kafka broker
     """
-    producer_flow = GetFile('/tmp/input') >> PublishKafka() >> ('success', LogAttribute())
+    producer_flow = GetFile('/tmp/input') >> PublishKafka() \
+                        >> (('failure', LogAttribute()),
+                            ('success', PutFile('/tmp/output/success')))
 
-    with DockerTestCluster(KafkaValidator('test')) as cluster:
+    with DockerTestCluster(SingleFileOutputValidator('test', subdir='success')) as cluster:
         cluster.put_test_data('test')
         cluster.deploy_flow(None, engine='kafka-broker')
         cluster.deploy_flow(producer_flow, name='minifi-producer', engine='minifi-cpp')
 
-        assert cluster.check_output(10)
+        assert cluster.check_output(30)
 
 def test_no_broker():
     """
@@ -42,7 +44,7 @@ def test_no_broker():
         cluster.put_test_data('no broker')
         cluster.deploy_flow(producer_flow, name='minifi-producer', engine='minifi-cpp')
 
-        assert cluster.check_output(30)
+        assert cluster.check_output(60)
 
 def test_broker_on_off():
     """
@@ -56,21 +58,43 @@ def test_broker_on_off():
         cluster.put_test_data('test')
         cluster.deploy_flow(None, engine='kafka-broker')
         cluster.deploy_flow(producer_flow, name='minifi-producer', engine='minifi-cpp')
+        start_count = 1
+        stop_count = 0
 
         def start_kafka():
+            nonlocal start_count
             assert cluster.start_flow('kafka-broker')
             assert cluster.start_flow('kafka-consumer')
+            start_count += 1
+            assert cluster.wait_for_container_logs('zookeeper', 'Established session', 30, start_count)
         def stop_kafka():
+            nonlocal stop_count
             assert cluster.stop_flow('kafka-consumer')
             assert cluster.stop_flow('kafka-broker')
+            stop_count += 1
+            assert cluster.wait_for_container_logs('zookeeper', 'Processed session termination for sessionid', 30, stop_count)
 
-        assert cluster.check_output(10, dir='/success')
+        assert cluster.check_output(30, subdir='success')
         stop_kafka()
-        assert cluster.check_output(30, dir='/failure')
+        assert cluster.check_output(60, subdir='failure')
         start_kafka()
-        cluster.rm_out_child('/success')
-        assert cluster.check_output(30, dir='/success')
+        cluster.rm_out_child('success')
+        assert cluster.check_output(60, subdir='success')
         stop_kafka()
-        cluster.rm_out_child('/failure')
-        assert cluster.check_output(30, dir='/failure')
+        cluster.rm_out_child('failure')
+        assert cluster.check_output(60, subdir='failure')
+
+def test_ssl():
+    """
+    Verify security connection
+    """
+    producer_flow = GetFile('/tmp/input') >> PublishKafkaSSL() \
+                    >> (('failure', LogAttribute()),
+                        ('success', PutFile('/tmp/output/ssl')))
 
+    with DockerTestCluster(SingleFileOutputValidator('test', subdir='ssl')) as cluster:
+        cluster.put_test_data('test')
+        cluster.deploy_flow(None, engine='kafka-broker')
+        cluster.deploy_flow(producer_flow, name='minifi-producer', engine='minifi-cpp')
+
+        assert cluster.check_output(30)
diff --git a/extensions/script/python/PythonScriptEngine.cpp b/extensions/script/python/PythonScriptEngine.cpp
index c6fe7f4..98e1191 100644
--- a/extensions/script/python/PythonScriptEngine.cpp
+++ b/extensions/script/python/PythonScriptEngine.cpp
@@ -42,11 +42,14 @@ PythonScriptEngine::PythonScriptEngine() {
 
 void PythonScriptEngine::eval(const std::string &script) {
   py::gil_scoped_acquire gil { };
-
-  if (script[0] == '\n') {
-    py::eval<py::eval_statements>(py::module::import("textwrap").attr("dedent")(script), *bindings_, *bindings_);
-  } else {
-    py::eval<py::eval_statements>(script, *bindings_, *bindings_);
+  try {
+    if (script[0] == '\n') {
+      py::eval<py::eval_statements>(py::module::import("textwrap").attr("dedent")(script), *bindings_, *bindings_);
+    } else {
+      py::eval<py::eval_statements>(script, *bindings_, *bindings_);
+    }
+  } catch (std::exception& e) {
+     throw minifi::script::ScriptException(e.what());
   }
 }