Posted to commits@nifi.apache.org by sz...@apache.org on 2021/06/29 10:33:07 UTC

[nifi-minifi-cpp] branch main updated (31fd099 -> 7f7e5c1)

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git.


    from 31fd099  MINIFICPP-1494 Allow InvokeHTTP GET requests without incoming flowfile
     new b638edc  MINIFICPP-1373 - Add integration tests for ConsumeKafka, fix docker test cleanup issues
     new 7f7e5c1  MINIFICPP-1597 Downgrade CentOS base image for CentOS 7 support

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 PROCESSORS.md                                      |   4 +-
 README.md                                          |   4 +-
 docker/DockerVerify.sh                             |  26 +--
 docker/centos/Dockerfile                           |   6 +-
 docker/requirements.txt                            |   2 +
 .../integration/MiNiFi_integration_test_driver.py  | 183 ++++++++++-------
 docker/test/integration/environment.py             |  30 ++-
 docker/test/integration/features/http.feature      |  22 +-
 docker/test/integration/features/https.feature     |   4 +-
 docker/test/integration/features/kafka.feature     | 195 ++++++++++++++++++
 docker/test/integration/features/s3.feature        |   4 +-
 .../integration/minifi/core/DockerTestCluster.py   |  17 +-
 .../minifi/core/DockerTestDirectoryBindings.py     |   4 +-
 .../integration/minifi/core/FileSystemObserver.py  |  17 +-
 .../integration/minifi/core/OutputEventHandler.py  |  22 +-
 docker/test/integration/minifi/core/Processor.py   |   8 +-
 .../minifi/core/SingleNodeDockerCluster.py         |  33 ++-
 .../integration/minifi/processors/ConsumeKafka.py  |  20 ++
 .../test/integration/minifi/processors/GetFile.py  |   1 -
 .../minifi/processors/RouteOnAttribute.py          |  10 +
 .../minifi/validators/EmptyFilesOutPutValidator.py |   2 +-
 .../minifi/validators/FileOutputValidator.py       |  38 ++++
 .../minifi/validators/MultiFileOutputValidator.py  |  18 +-
 .../NoContentCheckFileNumberValidator.py           |  22 ++
 .../minifi/validators/NoFileOutPutValidator.py     |  19 +-
 .../minifi/validators/NumFileRangeValidator.py     |  21 ++
 .../minifi/validators/SingleFileOutputValidator.py |  31 +--
 .../validators/SingleOrMultiFileOutputValidator.py |  22 ++
 docker/test/integration/steps/steps.py             | 221 ++++++++++++++++++++-
 extensions/librdkafka/ConsumeKafka.cpp             |  30 +--
 30 files changed, 796 insertions(+), 240 deletions(-)
 create mode 100644 docker/test/integration/minifi/processors/ConsumeKafka.py
 create mode 100644 docker/test/integration/minifi/processors/RouteOnAttribute.py
 create mode 100644 docker/test/integration/minifi/validators/NoContentCheckFileNumberValidator.py
 create mode 100644 docker/test/integration/minifi/validators/NumFileRangeValidator.py
 create mode 100644 docker/test/integration/minifi/validators/SingleOrMultiFileOutputValidator.py

[nifi-minifi-cpp] 02/02: MINIFICPP-1597 Downgrade CentOS base image for CentOS 7 support

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 7f7e5c15e6d5a2e82b6d4ba9e34df941d4100e27
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Tue Jun 29 12:26:26 2021 +0200

    MINIFICPP-1597 Downgrade CentOS base image for CentOS 7 support
    
    Closes #1117
    
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 README.md                | 4 ++--
 docker/centos/Dockerfile | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/README.md b/README.md
index 5fef5f1..39c5261 100644
--- a/README.md
+++ b/README.md
@@ -490,13 +490,13 @@ $ make docker-verify
 ```
 
 ### Building For Other Distros
-If you have docker installed on your machine you can build for CentOS 8, Fedora 34, Ubuntu 18.04, Ubuntu 20.04, and Debian 10 via our make docker commands. The following table
+If you have docker installed on your machine you can build for CentOS 7, Fedora 34, Ubuntu 18.04, Ubuntu 20.04, and Debian 10 via our make docker commands. The following table
 provides the command to build your distro and the output file in your build directory. Since the versions are limited ( except for Ubuntu ) we output the archive based on the distro's name.
 
 
 | Distro         | command           | Output File  |
 | ------------- |:-------------| :-----|
-| CentOS 8  | make centos | nifi-minifi-cpp-centos-$VERSION-bin.tar.gz
+| CentOS 7  | make centos | nifi-minifi-cpp-centos-$VERSION-bin.tar.gz
 | Debian 10 (buster)  | make debian | nifi-minifi-cpp-debian-$VERSION-bin.tar.gz
 | Fedora 34  | make fedora | nifi-minifi-cpp-fedora-$VERSION-bin.tar.gz
 | Ubuntu 18.04 (bionic)  | make u18 | nifi-minifi-cpp-bionic-$VERSION-bin.tar.gz
diff --git a/docker/centos/Dockerfile b/docker/centos/Dockerfile
index b6284c1..adaba9d 100644
--- a/docker/centos/Dockerfile
+++ b/docker/centos/Dockerfile
@@ -17,7 +17,7 @@
 #
 
 # First stage: the build environment
-FROM centos:8 AS build_deps
+FROM centos:7 AS build_deps
 LABEL maintainer="Apache NiFi <de...@nifi.apache.org>"
 
 ARG MINIFI_VERSION
@@ -27,7 +27,7 @@ ARG MINIFI_VERSION
 ENV MINIFI_BASE_DIR /opt/minifi
 ENV MINIFI_HOME $MINIFI_BASE_DIR/nifi-minifi-cpp-$MINIFI_VERSION
 
-RUN yum -y install java-1.8.0-openjdk java-1.8.0-openjdk-devel python36-devel gcc gcc-c++ sudo git which maven make cmake libarchive
+RUN yum -y install java-1.8.0-openjdk java-1.8.0-openjdk-devel python36-devel gcc gcc-c++ sudo git which maven make libarchive
 
 RUN mkdir -p $MINIFI_BASE_DIR
 COPY . ${MINIFI_BASE_DIR}
@@ -40,6 +40,6 @@ RUN cd $MINIFI_BASE_DIR \
 	&& rm -rf build \
 	&& mkdir build \
 	&& cd build \
-	&& cmake -DUSE_SHARED_LIBS=  -DENABLE_MQTT=ON -DENABLE_LIBRDKAFKA=ON -DPORTABLE=ON -DENABLE_COAP=ON -DCMAKE_BUILD_TYPE=Release -DSKIP_TESTS=true -DENABLE_JNI=$ENABLE_JNI .. \
+	&& cmake3 -DUSE_SHARED_LIBS=  -DENABLE_MQTT=ON -DENABLE_LIBRDKAFKA=ON -DPORTABLE=ON -DENABLE_COAP=ON -DCMAKE_BUILD_TYPE=Release -DSKIP_TESTS=true -DENABLE_JNI=$ENABLE_JNI .. \
 	&& make -j$(nproc) package
 

[nifi-minifi-cpp] 01/02: MINIFICPP-1373 - Add integration tests for ConsumeKafka, fix docker test cleanup issues

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit b638edc1318227c3066967e3dd6a89748b173b7a
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Tue Jun 29 12:21:51 2021 +0200

    MINIFICPP-1373 - Add integration tests for ConsumeKafka, fix docker test cleanup issues
    
    - Add missing test for Offset Reset property
    - Fix naming and description of validators
    - Remove unused behave definition
    - Handle possible curl timeout in tests
    - Remove individual test running, skip failing tests
    - Remove unused subdir parameter in tests
    - Fix flake8 issues
    - Fix multiple issues with tests and remove manual offset reset
        - Remove the sleep for Kafka, which required removing the manual offset
          reset; the manual reset could poll some messages from the Kafka broker
          before the first onTrigger call, so the first messages were missed.
        - Fix some validators to shorten the test runtimes
        - Add debug messages to minifi logs
        - Remove unnecessary kafka consumer container
        - Change the GetFile processor in Python tests to not keep source files
          and to be triggered only once, so that assertions can be more precise
    - Fix assertions on multiple flow files generated
    - Remove extra wait in ConsumeKafka integration tests, improve readability
    
    Closes #1076
    
    Signed-off-by: Marton Szasz <sz...@apache.org>
    
    Co-authored-by: Adam Hunyadi <hu...@gmail.com>
---
 PROCESSORS.md                                      |   4 +-
 docker/DockerVerify.sh                             |  26 +--
 docker/requirements.txt                            |   2 +
 .../integration/MiNiFi_integration_test_driver.py  | 183 ++++++++++-------
 docker/test/integration/environment.py             |  30 ++-
 docker/test/integration/features/http.feature      |  22 +-
 docker/test/integration/features/https.feature     |   4 +-
 docker/test/integration/features/kafka.feature     | 195 ++++++++++++++++++
 docker/test/integration/features/s3.feature        |   4 +-
 .../integration/minifi/core/DockerTestCluster.py   |  17 +-
 .../minifi/core/DockerTestDirectoryBindings.py     |   4 +-
 .../integration/minifi/core/FileSystemObserver.py  |  17 +-
 .../integration/minifi/core/OutputEventHandler.py  |  22 +-
 docker/test/integration/minifi/core/Processor.py   |   8 +-
 .../minifi/core/SingleNodeDockerCluster.py         |  33 ++-
 .../integration/minifi/processors/ConsumeKafka.py  |  20 ++
 .../test/integration/minifi/processors/GetFile.py  |   1 -
 .../minifi/processors/RouteOnAttribute.py          |  10 +
 .../minifi/validators/EmptyFilesOutPutValidator.py |   2 +-
 .../minifi/validators/FileOutputValidator.py       |  38 ++++
 .../minifi/validators/MultiFileOutputValidator.py  |  18 +-
 .../NoContentCheckFileNumberValidator.py           |  22 ++
 .../minifi/validators/NoFileOutPutValidator.py     |  19 +-
 .../minifi/validators/NumFileRangeValidator.py     |  21 ++
 .../minifi/validators/SingleFileOutputValidator.py |  31 +--
 .../validators/SingleOrMultiFileOutputValidator.py |  22 ++
 docker/test/integration/steps/steps.py             | 221 ++++++++++++++++++++-
 extensions/librdkafka/ConsumeKafka.cpp             |  30 +--
 28 files changed, 791 insertions(+), 235 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 4fe1f06..e2f7b30 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -234,13 +234,13 @@ In the list below, the names of required properties appear in bold. Any other pr
 |**Offset Reset**|latest|earliest<br>latest<br>none<br>|Allows you to manage the condition when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted). Corresponds to Kafka's 'auto.offset.reset' property.|
 |**Security Protocol**|PLAINTEXT|PLAINTEXT<br>|This property is currently not supported. Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.|
 |Session Timeout|60 seconds||Client group session and failure detection timeout. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no hearts are received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. The allowed range is configured with the broker configuration properties group.min.session.timeout.ms and group.max.session.timeout.ms.|
-|**Topic Name Format**|Names|Names<br>Patterns<br>|Specifies whether the Topic(s) provided are a comma separated list of names or a single regular expression.|
+|**Topic Name Format**|Names|Names<br>Patterns<br>|Specifies whether the Topic(s) provided are a comma separated list of names or a single regular expression. Using regular expressions does not automatically discover Kafka topics created after the processor started.|
 |**Topic Names**|||The name of the Kafka Topic(s) to pull from. Multiple topic names are supported as a comma separated list.<br/>**Supports Expression Language: true**|
 ### Properties
 
 | Name | Description |
 | - | - |
-|success|Incoming kafka messages as flowfiles. Depending on the demarcation strategy, this can be one or multiple flowfiles per message.|
+|success|Incoming Kafka messages as flowfiles. Depending on the demarcation strategy, this can be one or multiple flowfiles per message.|
 
 ## ConsumeMQTT
 
diff --git a/docker/DockerVerify.sh b/docker/DockerVerify.sh
index cbe2b82..e1e70d8 100755
--- a/docker/DockerVerify.sh
+++ b/docker/DockerVerify.sh
@@ -66,28 +66,12 @@ export PATH
 PYTHONPATH="${PYTHONPATH}:${docker_dir}/test/integration"
 export PYTHONPATH
 
+# Add --no-logcapture to see logs interleaved with the test output
 BEHAVE_OPTS=(-f pretty --logging-level INFO --logging-clear-handlers)
 
+# Specify feature or scenario to run a specific test e.g.:
+# behave "${BEHAVE_OPTS[@]}" "features/file_system_operations.feature"
+# behave "${BEHAVE_OPTS[@]}" "features/file_system_operations.feature" -n "Get and put operations run in a simple flow"
 cd "${docker_dir}/test/integration"
 exec
-  behave "${BEHAVE_OPTS[@]}" "features/file_system_operations.feature" -n "Get and put operations run in a simple flow" &&
-  behave "${BEHAVE_OPTS[@]}" "features/file_system_operations.feature" -n "PutFile does not overwrite a file that already exists" &&
-  behave "${BEHAVE_OPTS[@]}" "features/s2s.feature" -n "A MiNiFi instance produces and transfers data to a NiFi instance via s2s" &&
-  behave "${BEHAVE_OPTS[@]}" "features/s2s.feature" -n "Zero length files are transfered between via s2s if the \"drop empty\" connection property is false" &&
-  behave "${BEHAVE_OPTS[@]}" "features/s2s.feature" -n "Zero length files are not transfered between via s2s if the \"drop empty\" connection property is true" &&
-  behave "${BEHAVE_OPTS[@]}" "features/http.feature" -n "A MiNiFi instance transfers data to another MiNiFi instance" &&
-  behave "${BEHAVE_OPTS[@]}" "features/http.feature" -n "A MiNiFi instance sends data through a HTTP proxy and another one listens" &&
-  behave "${BEHAVE_OPTS[@]}" "features/http.feature" -n "A MiNiFi instance and transfers hashed data to another MiNiFi instance" &&
-  behave "${BEHAVE_OPTS[@]}" "features/http.feature" -n "A MiNiFi instance transfers data to another MiNiFi instance without message body" &&
-  behave "${BEHAVE_OPTS[@]}" "features/kafka.feature" -n "A MiNiFi instance transfers data to a kafka broker" &&
-  behave "${BEHAVE_OPTS[@]}" "features/kafka.feature" -n "PublishKafka sends flowfiles to failure when the broker is not available" &&
-  behave "${BEHAVE_OPTS[@]}" "features/kafka.feature" -n "PublishKafka sends can use SSL connect" &&
-  behave "${BEHAVE_OPTS[@]}" "features/s3.feature" -n "A MiNiFi instance transfers encoded data to s3" &&
-  behave "${BEHAVE_OPTS[@]}" "features/s3.feature" -n "A MiNiFi instance transfers encoded data through a http proxy to s3" &&
-  behave "${BEHAVE_OPTS[@]}" "features/s3.feature" -n "A MiNiFi instance can remove s3 bucket objects" &&
-  behave "${BEHAVE_OPTS[@]}" "features/s3.feature" -n "Deletion of a s3 object through a proxy-server succeeds" &&
-  behave "${BEHAVE_OPTS[@]}" "features/s3.feature" -n "A MiNiFi instance can download s3 bucket objects directly" &&
-  behave "${BEHAVE_OPTS[@]}" "features/s3.feature" -n "A MiNiFi instance can download s3 bucket objects via a http-proxy" &&
-  behave "${BEHAVE_OPTS[@]}" "features/s3.feature" -n "A MiNiFi instance can list an S3 bucket directly" &&
-  behave "${BEHAVE_OPTS[@]}" "features/s3.feature" -n "A MiNiFi instance can list an S3 bucket objects via a http-proxy" &&
-  behave "${BEHAVE_OPTS[@]}" "features/azure_storage.feature" -n "A MiNiFi instance can upload data to Azure blob storage"
+  behave "${BEHAVE_OPTS[@]}"
diff --git a/docker/requirements.txt b/docker/requirements.txt
index 4abac40..f6a1785 100644
--- a/docker/requirements.txt
+++ b/docker/requirements.txt
@@ -1,6 +1,8 @@
 behave==1.2.6
 pytimeparse==1.1.8
 docker==5.0.0
+kafka-python==2.0.2
+confluent-kafka==1.7.0
 PyYAML==5.4.1
 m2crypto==0.37.1
 watchdog==2.1.2
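
The two new requirements back the Kafka integration test steps: confluent-kafka and
kafka-python, presumably used to publish test messages to the broker and to inspect it
from the consumer side. A minimal sketch of how a test step might publish a message with
confluent-kafka follows; the broker address, topic name, and key are illustrative
assumptions, not values taken from this commit.

    # Minimal sketch, assuming the test steps publish via confluent-kafka's Producer.
    # The broker address, topic name, and key below are illustrative assumptions.
    from confluent_kafka import Producer

    producer = Producer({"bootstrap.servers": "localhost:29092"})

    def delivery_report(err, msg):
        # Surface delivery failures instead of dropping them silently.
        if err is not None:
            print("Delivery failed: {}".format(err))

    producer.produce("ConsumeKafkaTest", value=b"some test message",
                     key=b"consume_kafka_test_key", on_delivery=delivery_report)
    producer.flush()  # block until the message is delivered or fails
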
diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py
index d347e61..43178d6 100644
--- a/docker/test/integration/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -1,5 +1,6 @@
 import docker
 import logging
+import threading
 import time
 import uuid
 
@@ -15,6 +16,9 @@ from minifi.validators.EmptyFilesOutPutValidator import EmptyFilesOutPutValidato
 from minifi.validators.NoFileOutPutValidator import NoFileOutPutValidator
 from minifi.validators.SingleFileOutputValidator import SingleFileOutputValidator
 from minifi.validators.MultiFileOutputValidator import MultiFileOutputValidator
+from minifi.validators.SingleOrMultiFileOutputValidator import SingleOrMultiFileOutputValidator
+from minifi.validators.NoContentCheckFileNumberValidator import NoContentCheckFileNumberValidator
+from minifi.validators.NumFileRangeValidator import NumFileRangeValidator
 
 
 class MiNiFi_integration_test():
@@ -28,33 +32,41 @@ class MiNiFi_integration_test():
         self.file_system_observer = None
 
         self.docker_network = None
+        self.cleanup_lock = threading.Lock()
 
         self.docker_directory_bindings = DockerTestDirectoryBindings()
         self.docker_directory_bindings.create_new_data_directories(self.test_id)
 
     def __del__(self):
-        logging.info("MiNiFi_integration_test cleanup")
-
-        # Clean up network, for some reason only this order of events work for cleanup
-        if self.docker_network is not None:
-            logging.info('Cleaning up network network: %s', self.docker_network.name)
-            while len(self.docker_network.containers) != 0:
-                for container in self.docker_network.containers:
-                    self.docker_network.disconnect(container, force=True)
-                self.docker_network.reload()
-            self.docker_network.remove()
-
-        container_ids = []
-        for cluster in self.clusters.values():
-            for container in cluster.containers.values():
-                container_ids.append(container.id)
-            del cluster
-
-        # The cluster deleter is not reliable for cleaning up
-        for container_id in container_ids:
-            self.delete_docker_container_by_id(container_id)
-
-        del self.docker_directory_bindings
+        self.cleanup()
+
+    def cleanup(self):
+        with self.cleanup_lock:
+            logging.info("MiNiFi_integration_test cleanup")
+            # Clean up network, for some reason only this order of events work for cleanup
+            if self.docker_network is not None:
+                logging.info('Cleaning up network network: %s', self.docker_network.name)
+                while len(self.docker_network.containers) != 0:
+                    for container in self.docker_network.containers:
+                        self.docker_network.disconnect(container, force=True)
+                    self.docker_network.reload()
+                self.docker_network.remove()
+                self.docker_network = None
+
+            container_ids = []
+            for cluster in self.clusters.values():
+                for container in cluster.containers.values():
+                    container_ids.append(container.id)
+                del cluster
+
+            # The cluster deleter is not reliable for cleaning up
+            logging.info("%d containers left for integration tests.", len(container_ids))
+            for container_id in container_ids:
+                self.delete_docker_container_by_id(container_id)
+
+            if self.docker_directory_bindings is not None:
+                del self.docker_directory_bindings
+                self.docker_directory_bindings = None
 
     def delete_docker_container_by_id(self, container_id):
         docker_client = docker.from_env()
@@ -85,36 +97,52 @@ class MiNiFi_integration_test():
         return self.clusters.setdefault(name, DockerTestCluster())
 
     def set_up_cluster_network(self):
-        self.docker_network = SingleNodeDockerCluster.create_docker_network()
-        for cluster in self.clusters.values():
-            cluster.set_network(self.docker_network)
+        if self.docker_network is None:
+            logging.info("Setting up new network.")
+            self.docker_network = SingleNodeDockerCluster.create_docker_network()
+            for cluster in self.clusters.values():
+                cluster.set_network(self.docker_network)
+        else:
+            logging.info("Network is already set.")
+
+    def wait_for_cluster_startup_finish(self, cluster):
+        startup_success = True
+        logging.info("Engine: %s", cluster.get_engine())
+        if cluster.get_engine() == "minifi-cpp":
+            startup_success = cluster.wait_for_app_logs("Starting Flow Controller", 120)
+        elif cluster.get_engine() == "nifi":
+            startup_success = cluster.wait_for_app_logs("Starting Flow Controller...", 120)
+        elif cluster.get_engine() == "kafka-broker":
+            startup_success = cluster.wait_for_app_logs("Kafka startTimeMs", 120)
+        elif cluster.get_engine() == "http-proxy":
+            startup_success = cluster.wait_for_app_logs("Accepting HTTP Socket connections at", 120)
+        elif cluster.get_engine() == "s3-server":
+            startup_success = cluster.wait_for_app_logs("Started S3MockApplication", 120)
+        elif cluster.get_engine() == "azure-storage-server":
+            startup_success = cluster.wait_for_app_logs("Azurite Queue service is successfully listening at", 120)
+        if not startup_success:
+            logging.error("Cluster startup failed for %s", cluster.get_name())
+            cluster.log_app_output()
+        return startup_success
+
+    def start_single_cluster(self, cluster_name):
+        self.set_up_cluster_network()
+        cluster = self.clusters[cluster_name]
+        cluster.deploy_flow()
+        assert self.wait_for_cluster_startup_finish(cluster)
 
     def start(self):
         logging.info("MiNiFi_integration_test start")
         self.set_up_cluster_network()
         for cluster in self.clusters.values():
-            logging.info("Starting cluster %s with an engine of %s", cluster.get_name(), cluster.get_engine())
-            cluster.set_directory_bindings(self.docker_directory_bindings.get_directory_bindings(self.test_id))
-            cluster.deploy_flow()
-        for _, cluster in self.clusters.items():
-            startup_success = True
-            logging.info("Engine: %s", cluster.get_engine())
-            if cluster.get_engine() == "minifi-cpp":
-                startup_success = cluster.wait_for_app_logs("Starting Flow Controller", 120)
-            elif cluster.get_engine() == "nifi":
-                startup_success = cluster.wait_for_app_logs("Starting Flow Controller...", 120)
-            elif cluster.get_engine() == "kafka-broker":
-                startup_success = cluster.wait_for_app_logs("Startup complete.", 120)
-            elif cluster.get_engine() == "http-proxy":
-                startup_success = cluster.wait_for_app_logs("Accepting HTTP Socket connections at", 120)
-            elif cluster.get_engine() == "s3-server":
-                startup_success = cluster.wait_for_app_logs("Started S3MockApplication", 120)
-            elif cluster.get_engine() == "azure-storage-server":
-                startup_success = cluster.wait_for_app_logs("Azurite Queue service is successfully listening at", 120)
-            if not startup_success:
-                logging.error("Cluster startup failed for %s", cluster.get_name())
-                cluster.log_app_output()
-            assert startup_success
+            if len(cluster.containers) == 0:
+                logging.info("Starting cluster %s with an engine of %s", cluster.get_name(), cluster.get_engine())
+                cluster.set_directory_bindings(self.docker_directory_bindings.get_directory_bindings(self.test_id))
+                cluster.deploy_flow()
+            else:
+                logging.info("Container %s is already started with an engine of %s", cluster.get_name(), cluster.get_engine())
+        for cluster in self.clusters.values():
+            assert self.wait_for_cluster_startup_finish(cluster)
 
     def add_node(self, processor):
         if processor.get_name() in (elem.get_name() for elem in self.connectable_nodes):
@@ -135,7 +163,7 @@ class MiNiFi_integration_test():
         for node in self.connectable_nodes:
             if name == node.get_name():
                 return node
-        raise Exception("Trying to fetch unknow node: \"%s\"" % name)
+        raise Exception("Trying to fetch unknown node: \"%s\"" % name)
 
     def add_remote_process_group(self, remote_process_group):
         if remote_process_group.get_name() in (elem.get_name() for elem in self.remote_process_groups):
@@ -161,43 +189,60 @@ class MiNiFi_integration_test():
     def put_test_resource(self, file_name, contents):
         self.docker_directory_bindings.put_test_resource(self.test_id, file_name, contents)
 
-    def get_out_subdir(self, subdir):
-        return self.docker_directory_bindings.get_out_subdir(self.test_id, subdir)
-
-    def rm_out_child(self, subdir):
-        self.docker_directory_bindings.rm_out_child(self.test_id, subdir)
+    def rm_out_child(self):
+        self.docker_directory_bindings.rm_out_child(self.test_id)
 
     def add_file_system_observer(self, file_system_observer):
         self.file_system_observer = file_system_observer
 
-    def check_for_no_files_generated(self, timeout_seconds, subdir=''):
+    def check_for_no_files_generated(self, timeout_seconds):
         output_validator = NoFileOutPutValidator()
         output_validator.set_output_dir(self.file_system_observer.get_output_dir())
-        self.check_output(timeout_seconds, output_validator, 1, subdir)
+        self.check_output(timeout_seconds, output_validator, 1)
 
-    def check_for_file_with_content_generated(self, content, timeout_seconds, subdir=''):
+    def check_for_single_file_with_content_generated(self, content, timeout_seconds):
         output_validator = SingleFileOutputValidator(content)
         output_validator.set_output_dir(self.file_system_observer.get_output_dir())
-        self.check_output(timeout_seconds, output_validator, 1, subdir)
+        self.check_output(timeout_seconds, output_validator, 1)
 
-    def check_for_multiple_files_generated(self, file_count, timeout_seconds, subdir=''):
-        output_validator = MultiFileOutputValidator(file_count, subdir)
+    def check_for_multiple_files_generated(self, file_count, timeout_seconds, expected_content=[]):
+        output_validator = MultiFileOutputValidator(file_count, expected_content)
         output_validator.set_output_dir(self.file_system_observer.get_output_dir())
-        self.check_output(timeout_seconds, output_validator, file_count, subdir)
+        self.check_output(timeout_seconds, output_validator, file_count)
 
-    def check_for_multiple_empty_files_generated(self, timeout_seconds, subdir=''):
+    def check_for_at_least_one_file_with_content_generated(self, content, timeout_seconds):
+        output_validator = SingleOrMultiFileOutputValidator(content)
+        output_validator.set_output_dir(self.file_system_observer.get_output_dir())
+        self.check_output(timeout_seconds, output_validator, 1)
+
+    def check_for_num_files_generated(self, num_flowfiles, timeout_seconds):
+        output_validator = NoContentCheckFileNumberValidator(num_flowfiles)
+        output_validator.set_output_dir(self.file_system_observer.get_output_dir())
+        self.check_output(timeout_seconds, output_validator, max(1, num_flowfiles))
+
+    def check_for_num_file_range_generated(self, min_files, max_files, timeout_seconds):
+        output_validator = NumFileRangeValidator(min_files, max_files)
+        output_validator.set_output_dir(self.file_system_observer.get_output_dir())
+        self.check_output_force_wait(timeout_seconds, output_validator)
+
+    def check_for_an_empty_file_generated(self, timeout_seconds):
         output_validator = EmptyFilesOutPutValidator()
         output_validator.set_output_dir(self.file_system_observer.get_output_dir())
-        self.check_output(timeout_seconds, output_validator, 2, subdir)
+        self.check_output(timeout_seconds, output_validator, 1)
+
+    def check_output_force_wait(self, timeout_seconds, output_validator):
+        time.sleep(timeout_seconds)
+        self.validate(output_validator)
+
+    def check_output(self, timeout_seconds, output_validator, max_files):
+        self.file_system_observer.wait_for_output(timeout_seconds, max_files)
+        self.validate(output_validator)
 
-    def check_output(self, timeout_seconds, output_validator, max_files, subdir):
-        if subdir:
-            output_validator.subdir = subdir
-        self.file_system_observer.wait_for_output(timeout_seconds, output_validator, max_files)
+    def validate(self, validator):
         for cluster in self.clusters.values():
             cluster.log_app_output()
             assert not cluster.segfault_happened()
-        assert output_validator.validate()
+        assert validator.validate()
 
     def check_s3_server_object_data(self, cluster_name, object_data):
         cluster = self.acquire_cluster(cluster_name)
@@ -217,3 +262,7 @@ class MiNiFi_integration_test():
     def check_azure_storage_server_data(self, cluster_name, object_data):
         cluster = self.acquire_cluster(cluster_name)
         assert cluster.check_azure_storage_server_data(object_data)
+
+    def wait_for_kafka_consumer_to_be_registered(self, cluster_name):
+        cluster = self.acquire_cluster(cluster_name)
+        assert cluster.wait_for_kafka_consumer_to_be_registered()
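
The driver now builds on several new validators (NoContentCheckFileNumberValidator,
NumFileRangeValidator, SingleOrMultiFileOutputValidator) whose files are created by this
commit but not included in this excerpt. A hypothetical sketch of what a range-based
validator along the lines of NumFileRangeValidator could look like, for orientation only:

    # Hypothetical sketch of a range-based validator similar to NumFileRangeValidator;
    # the actual file added by this commit is not shown in this excerpt.
    import logging
    import os


    class NumFileRangeValidator(object):
        def __init__(self, min_files, max_files):
            self.min_files = min_files
            self.max_files = max_files
            self.output_dir = None

        def set_output_dir(self, output_dir):
            self.output_dir = output_dir

        def validate(self):
            # Count regular files in the monitored output directory and check that
            # the count falls inside the expected [min_files, max_files] range.
            if self.output_dir is None or not os.path.isdir(self.output_dir):
                return False
            num_files = sum(1 for name in os.listdir(self.output_dir)
                            if os.path.isfile(os.path.join(self.output_dir, name)))
            logging.info("Number of files generated: %d", num_files)
            return self.min_files <= num_files <= self.max_files
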
diff --git a/docker/test/integration/environment.py b/docker/test/integration/environment.py
index 46c68c3..120790f 100644
--- a/docker/test/integration/environment.py
+++ b/docker/test/integration/environment.py
@@ -1,8 +1,9 @@
-from behave import fixture, use_fixture
 import logging
+import datetime
 import sys
 sys.path.append('../minifi')
 
+
 from MiNiFi_integration_test_driver import MiNiFi_integration_test  # noqa: E402
 from minifi import *  # noqa
 
@@ -11,20 +12,31 @@ def raise_exception(exception):
     raise exception
 
 
-@fixture
-def test_driver_fixture(context):
-    context.test = MiNiFi_integration_test(context)
-    yield context.test
-    logging.info("Integration test teardown...")
-    del context.test
+def integration_test_cleanup(test):
+    logging.info("Integration test cleanup...")
+    del test
 
 
 def before_scenario(context, scenario):
-    use_fixture(test_driver_fixture, context)
+    if "skip" in scenario.effective_tags:
+        scenario.skip("Marked with @skip")
+        return
+
+    logging.info("Integration test setup at {time:%H:%M:%S.%f}".format(time=datetime.datetime.now()))
+    context.test = MiNiFi_integration_test(context)
 
 
 def after_scenario(context, scenario):
-    pass
+    if "skip" in scenario.effective_tags:
+        logging.info("Scenario was skipped, no need for clean up.")
+        return
+
+    logging.info("Integration test teardown at {time:%H:%M:%S.%f}".format(time=datetime.datetime.now()))
+    if context is not None and hasattr(context, "test"):
+        context.test.cleanup()  # force invocation
+        del context.test
+    else:
+        raise Exception("Unable to manually clean up test context. Might already be deleted?")
 
 
 def before_all(context):
diff --git a/docker/test/integration/features/http.feature b/docker/test/integration/features/http.feature
index 2e5b479..d34d36f 100644
--- a/docker/test/integration/features/http.feature
+++ b/docker/test/integration/features/http.feature
@@ -6,11 +6,12 @@ Feature: Sending data using InvokeHTTP to a receiver using ListenHTTP
   Background:
     Given the content of "/tmp/output" is monitored
 
-  Scenario: A MiNiFi instance transfers data to another MiNiFi instance
+  Scenario: A MiNiFi instance transfers data to another MiNiFi instance with message body
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And the "Keep Source File" property of the GetFile processor is set to "true"
     And a file with the content "test" is present in "/tmp/input"
     And a InvokeHTTP processor with the "Remote URL" property set to "http://secondary:8080/contentListener"
-    And the "HTTP Method" of the InvokeHTTP processor is set to "POST"
+    And the "HTTP Method" property of the InvokeHTTP processor is set to "POST"
     And the "success" relationship of the GetFile processor is connected to the InvokeHTTP
 
     And a ListenHTTP processor with the "Listening Port" property set to "8080" in a "secondary" flow
@@ -18,10 +19,11 @@ Feature: Sending data using InvokeHTTP to a receiver using ListenHTTP
     And the "success" relationship of the ListenHTTP processor is connected to the PutFile
 
     When both instances start up
-    Then a flowfile with the content "test" is placed in the monitored directory in less than 30 seconds
+    Then at least one flowfile with the content "test" is placed in the monitored directory in less than 120 seconds
 
   Scenario: A MiNiFi instance sends data through a HTTP proxy and another one listens
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And the "Keep Source File" property of the GetFile processor is set to "true"
     And a file with the content "test" is present in "/tmp/input"
     And a InvokeHTTP processor with the "Remote URL" property set to "http://minifi-listen:8080/contentListener"
     And these processor properties are set to match the http proxy:
@@ -40,15 +42,16 @@ Feature: Sending data using InvokeHTTP to a receiver using ListenHTTP
     And the "success" relationship of the ListenHTTP processor is connected to the PutFile
 
     When all instances start up
-    Then a flowfile with the content "test" is placed in the monitored directory in less than 120 seconds
+    Then at least one flowfile with the content "test" is placed in the monitored directory in less than 120 seconds
     And no errors were generated on the "http-proxy" regarding "http://minifi-listen:8080/contentListener"
 
   Scenario: A MiNiFi instance and transfers hashed data to another MiNiFi instance
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And the "Keep Source File" property of the GetFile processor is set to "true"
     And a file with the content "test" is present in "/tmp/input"
     And a HashContent processor with the "Hash Attribute" property set to "hash"
     And a InvokeHTTP processor with the "Remote URL" property set to "http://secondary:8080/contentListener"
-    And the "HTTP Method" of the InvokeHTTP processor is set to "POST"
+    And the "HTTP Method" property of the InvokeHTTP processor is set to "POST"
     And the "success" relationship of the GetFile processor is connected to the HashContent
     And the "success" relationship of the HashContent processor is connected to the InvokeHTTP
 
@@ -57,14 +60,15 @@ Feature: Sending data using InvokeHTTP to a receiver using ListenHTTP
     And the "success" relationship of the ListenHTTP processor is connected to the PutFile
 
     When both instances start up
-    Then a flowfile with the content "test" is placed in the monitored directory in less than 30 seconds
+    Then at least one flowfile with the content "test" is placed in the monitored directory in less than 120 seconds
 
   Scenario: A MiNiFi instance transfers data to another MiNiFi instance without message body
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And the "Keep Source File" property of the GetFile processor is set to "true"
     And a file with the content "test" is present in "/tmp/input"
     And a InvokeHTTP processor with the "Remote URL" property set to "http://secondary:8080/contentListener"
-    And the "HTTP Method" of the InvokeHTTP processor is set to "POST"
-    And the "Send Message Body" of the InvokeHTTP processor is set to "false"
+    And the "HTTP Method" property of the InvokeHTTP processor is set to "POST"
+    And the "Send Message Body" property of the InvokeHTTP processor is set to "false"
     And the "success" relationship of the GetFile processor is connected to the InvokeHTTP
 
     And a ListenHTTP processor with the "Listening Port" property set to "8080" in a "secondary" flow
@@ -72,4 +76,4 @@ Feature: Sending data using InvokeHTTP to a receiver using ListenHTTP
     And the "success" relationship of the ListenHTTP processor is connected to the PutFile
 
     When both instances start up
-    Then at least one empty flowfile is placed in the monitored directory in less than 30 seconds
+    Then at least one empty flowfile is placed in the monitored directory in less than 120 seconds
diff --git a/docker/test/integration/features/https.feature b/docker/test/integration/features/https.feature
index de13b05..9d6c6a1 100644
--- a/docker/test/integration/features/https.feature
+++ b/docker/test/integration/features/https.feature
@@ -1,3 +1,5 @@
+# FIXME: Failing feature, needs to be fixed
+@skip
 Feature: Using SSL context service to send data with TLS
   In order to send data via HTTPS
   As a user of MiNiFi
@@ -10,7 +12,7 @@ Feature: Using SSL context service to send data with TLS
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
     And a file with the content "test" is present in "/tmp/input"
     And a InvokeHTTP processor with the "Remote URL" property set to "https://secondary:4430/contentListener"
-    And the "HTTP Method" of the InvokeHTTP processor is set to "POST"
+    And the "HTTP Method" property of the InvokeHTTP processor is set to "POST"
 
     And the "success" relationship of the GetFile processor is connected to the InvokeHTTP
 
diff --git a/docker/test/integration/features/kafka.feature b/docker/test/integration/features/kafka.feature
index 2395115..32e784b 100644
--- a/docker/test/integration/features/kafka.feature
+++ b/docker/test/integration/features/kafka.feature
@@ -57,3 +57,198 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
 
     When both instances start up
     Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+
+  Scenario: MiNiFi consumes data from a kafka topic
+    Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "kafka-consumer-flow" flow
+    And the "success" relationship of the ConsumeKafka processor is connected to the PutFile
+
+    And a kafka broker "broker" is set up in correspondence with the third-party kafka publisher
+
+    When all instances start up
+    And a message with content "some test message" is published to the "ConsumeKafkaTest" topic
+
+    Then at least one flowfile with the content "some test message" is placed in the monitored directory in less than 60 seconds
+
+  Scenario Outline: ConsumeKafka parses and uses kafka topics and topic name formats
+    Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
+    And the "Topic Names" property of the ConsumeKafka processor is set to "<topic names>"
+    And the "Topic Name Format" property of the ConsumeKafka processor is set to "<topic name format>"
+    And the "Offset Reset" property of the ConsumeKafka processor is set to "earliest"
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "kafka-consumer-flow" flow
+    And the "success" relationship of the ConsumeKafka processor is connected to the PutFile
+
+    And a kafka broker "broker" is set up in correspondence with the third-party kafka publisher
+    And the kafka broker "broker" is started
+    And the topic "ConsumeKafkaTest" is initialized on the kafka broker
+
+    When a message with content "<message 1>" is published to the "ConsumeKafkaTest" topic
+    And all other processes start up
+    And a message with content "<message 2>" is published to the "ConsumeKafkaTest" topic
+
+    Then two flowfiles with the contents "<message 1>" and "<message 2>" are placed in the monitored directory in less than 90 seconds
+
+  Examples: Topic names and formats to test
+    | message 1            | message 2           | topic names              | topic name format |
+    | Ulysses              | James Joyce         | ConsumeKafkaTest         | (not set)         |
+    | The Great Gatsby     | F. Scott Fitzgerald | ConsumeKafkaTest         | Names             |
+    | War and Peace        | Lev Tolstoy         | a,b,c,ConsumeKafkaTest,d | Names             |
+    | Nineteen Eighty Four | George Orwell       | ConsumeKafkaTest         | Patterns          |
+    | Hamlet               | William Shakespeare | Cons[emu]*KafkaTest      | Patterns          |
+
+  Scenario: ConsumeKafka consumes only messages after MiNiFi startup when "Offset Reset" property is set to latest
+    Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
+    And the "Topic Names" property of the ConsumeKafka processor is set to "ConsumeKafkaTest"
+    And the "Offset Reset" property of the ConsumeKafka processor is set to "latest"
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "kafka-consumer-flow" flow
+    And the "success" relationship of the ConsumeKafka processor is connected to the PutFile
+
+    And a kafka broker "broker" is set up in correspondence with the third-party kafka publisher
+    And the kafka broker "broker" is started
+    And the topic "ConsumeKafkaTest" is initialized on the kafka broker
+
+    When a message with content "Ulysses" is published to the "ConsumeKafkaTest" topic
+    And all other processes start up
+    And the Kafka consumer is registered in kafka broker "broker"
+    And a message with content "James Joyce" is published to the "ConsumeKafkaTest" topic
+
+    Then a flowfile with the content "James Joyce" is placed in the monitored directory in less than 60 seconds
+
+  Scenario Outline: ConsumeKafka key attribute is encoded according to the "Key Attribute Encoding" property
+    Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
+    And the "Key Attribute Encoding" property of the ConsumeKafka processor is set to "<key attribute encoding>"
+    And a RouteOnAttribute processor in the "kafka-consumer-flow" flow
+    And a LogAttribute processor in the "kafka-consumer-flow" flow
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "kafka-consumer-flow" flow
+    And the "success" property of the RouteOnAttribute processor is set to match <key attribute encoding> encoded kafka message key "consume_kafka_test_key"
+
+    And the "success" relationship of the ConsumeKafka processor is connected to the LogAttribute
+    And the "success" relationship of the LogAttribute processor is connected to the RouteOnAttribute
+    And the "success" relationship of the RouteOnAttribute processor is connected to the PutFile
+
+    And a kafka broker "broker" is set up in correspondence with the third-party kafka publisher
+
+    When all instances start up
+    And a message with content "<message 1>" is published to the "ConsumeKafkaTest" topic with key "consume_kafka_test_key"
+    And a message with content "<message 2>" is published to the "ConsumeKafkaTest" topic with key "consume_kafka_test_key"
+
+    Then two flowfiles with the contents "<message 1>" and "<message 2>" are placed in the monitored directory in less than 45 seconds
+
+  Examples: Key attribute encoding values
+    | message 1            | message 2                     | key attribute encoding |
+    | The Odyssey          | Ὅμηρος                        | (not set)              |
+    | Lolita               | Владимир Владимирович Набоков | utf-8                  |
+    | Crime and Punishment | Фёдор Михайлович Достоевский  | hex                    |
+    | Paradise Lost        | John Milton                   | hEX                    |
+
+  Scenario Outline: ConsumeKafka transactional behaviour is supported
+    Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
+    And the "Topic Names" property of the ConsumeKafka processor is set to "ConsumeKafkaTest"
+    And the "Honor Transactions" property of the ConsumeKafka processor is set to "<honor transactions>"
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "kafka-consumer-flow" flow
+    And the "success" relationship of the ConsumeKafka processor is connected to the PutFile
+
+    And a kafka broker "broker" is set up in correspondence with the third-party kafka publisher
+
+    When all instances start up
+    And the publisher performs a <transaction type> transaction publishing to the "ConsumeKafkaTest" topic these messages: <messages sent>
+
+    Then <number of flowfiles expected> flowfiles are placed in the monitored directory in less than 30 seconds
+
+  Examples: Transaction descriptions
+    | messages sent                     | transaction type             | honor transactions | number of flowfiles expected |
+    | Pride and Prejudice, Jane Austen  | SINGLE_COMMITTED_TRANSACTION | (not set)          | 2                            |
+    | Dune, Frank Herbert               | TWO_SEPARATE_TRANSACTIONS    | (not set)          | 2                            |
+    | The Black Sheep, Honore De Balzac | NON_COMMITTED_TRANSACTION    | (not set)          | 0                            |
+    | Gospel of Thomas                  | CANCELLED_TRANSACTION        | (not set)          | 0                            |
+    | Operation Dark Heart              | CANCELLED_TRANSACTION        | true               | 0                            |
+    | Brexit                            | CANCELLED_TRANSACTION        | false              | 1                            |
+
+  Scenario Outline: Headers on consumed kafka messages are extracted into attributes if requested on ConsumeKafka
+    Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
+    And the "Headers To Add As Attributes" property of the ConsumeKafka processor is set to "<headers to add as attributes>"
+    And the "Duplicate Header Handling" property of the ConsumeKafka processor is set to "<duplicate header handling>"
+    And a RouteOnAttribute processor in the "kafka-consumer-flow" flow
+    And a LogAttribute processor in the "kafka-consumer-flow" flow
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "kafka-consumer-flow" flow
+    And the "success" property of the RouteOnAttribute processor is set to match the attribute "<headers to add as attributes>" to "<expected value>"
+
+    And the "success" relationship of the ConsumeKafka processor is connected to the LogAttribute
+    And the "success" relationship of the LogAttribute processor is connected to the RouteOnAttribute
+    And the "success" relationship of the RouteOnAttribute processor is connected to the PutFile
+
+    And a kafka broker "broker" is set up in correspondence with the third-party kafka publisher
+
+    When all instances start up
+    And a message with content "<message 1>" is published to the "ConsumeKafkaTest" topic with headers "<message headers sent>"
+    And a message with content "<message 2>" is published to the "ConsumeKafkaTest" topic with headers "<message headers sent>"
+
+    Then two flowfiles with the contents "<message 1>" and "<message 2>" are placed in the monitored directory in less than 45 seconds
+
+  Examples: Messages with headers
+    | message 1             | message 2         | message headers sent        | headers to add as attributes | expected value       | duplicate header handling |
+    | Homeland              | R. A. Salvatore   | Contains dark elves: yes    | (not set)                    | (not set)            | (not set)                 |
+    | Magician              | Raymond E. Feist  | Rating: 10/10               | Rating                       | 10/10                | (not set)                 |
+    | Mistborn              | Brandon Sanderson | Metal: Copper; Metal: Iron  | Metal                        | Copper               | Keep First                |
+    | Mistborn              | Brandon Sanderson | Metal: Copper; Metal: Iron  | Metal                        | Iron                 | Keep Latest               |
+    | Mistborn              | Brandon Sanderson | Metal: Copper; Metal: Iron  | Metal                        | Copper, Iron         | Comma-separated Merge     |
+    | The Lord of the Rings | J. R. R. Tolkien  | Parts: First, second, third | Parts                        | First, second, third | (not set)                 |
+
+  Scenario: Messages are separated into multiple flowfiles if the message demarcator is present in the message
+    Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
+    And the "Message Demarcator" property of the ConsumeKafka processor is set to "a"
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "kafka-consumer-flow" flow
+
+    And the "success" relationship of the ConsumeKafka processor is connected to the PutFile
+
+    And a kafka broker "broker" is set up in correspondence with the third-party kafka publisher
+
+    When all instances start up
+    And a message with content "Barbapapa" is published to the "ConsumeKafkaTest" topic
+    And a message with content "Anette Tison and Talus Taylor" is published to the "ConsumeKafkaTest" topic
+
+    Then flowfiles with these contents are placed in the monitored directory in less than 45 seconds: "B,rb,p,Anette Tison ,nd T,lus T,ylor"
+
+  Scenario Outline: The ConsumeKafka "Maximum Poll Records" property sets a limit on the messages processed in a single batch
+    Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
+    And a LogAttribute processor in the "kafka-consumer-flow" flow
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "kafka-consumer-flow" flow
+
+    And the "Max Poll Records" property of the ConsumeKafka processor is set to "<max poll records>"
+    And the scheduling period of the ConsumeKafka processor is set to "<scheduling period>"
+    And the scheduling period of the LogAttribute processor is set to "<scheduling period>"
+    And the "FlowFiles To Log" property of the LogAttribute processor is set to "<max poll records>"
+
+    And the "success" relationship of the ConsumeKafka processor is connected to the LogAttribute
+    And the "success" relationship of the LogAttribute processor is connected to the PutFile
+
+    And a kafka broker "broker" is set up in correspondence with the third-party kafka publisher
+
+    When all instances start up
+    And 1000 kafka messages are sent to the topic "ConsumeKafkaTest"
+
+    Then after a wait of <polling time>, at least <min expected messages> and at most <max expected messages> flowfiles are produced and placed in the monitored directory
+
+  Examples: Message batching
+    | max poll records | scheduling period | polling time | min expected messages | max expected messages |
+    | 3                | 5 sec             | 15 seconds   | 6                     | 12                    |
+    | 6                | 5 sec             | 15 seconds   | 12                    | 24                    |
+
+  Scenario Outline: Unsupported encoding attributes for ConsumeKafka throw scheduling errors
+    Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
+    And the "<property name>" property of the ConsumeKafka processor is set to "<property value>"
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "kafka-consumer-flow" flow
+    And the "success" relationship of the ConsumeKafka processor is connected to the PutFile
+
+    And a kafka broker "broker" is set up in correspondence with the third-party kafka publisher
+
+    When all instances start up
+    And a message with content "<message 1>" is published to the "ConsumeKafkaTest" topic
+    And a message with content "<message 2>" is published to the "ConsumeKafkaTest" topic
+
+    Then no files are placed in the monitored directory in 30 seconds of running time
+
+    Examples: Unsupported property values
+      | message 1        | message 2      | property name           | property value |
+      | Miyamoto Musashi | Eiji Yoshikawa | Key Attribute Encoding  | UTF-32         |
+      | Shogun           | James Clavell  | Message Header Encoding | UTF-32         |
diff --git a/docker/test/integration/features/s3.feature b/docker/test/integration/features/s3.feature
index 4f6ee74..571b982 100644
--- a/docker/test/integration/features/s3.feature
+++ b/docker/test/integration/features/s3.feature
@@ -156,10 +156,10 @@ Feature: Sending data from MiNiFi-C++ to an AWS server
 
   Scenario: A MiNiFi instance can list an S3 bucket directly
     Given a TailFile processor with the "File to Tail" property set to "/tmp/input/test_file.log"
-    And the "Input Delimiter" of the TailFile processor is set to "%"
+    And the "Input Delimiter" property of the TailFile processor is set to "%"
     And a file with filename "test_file.log" and content "test_data%" is present in "/tmp/input"
     And a PutS3Object processor set up to communicate with an s3 server
-    And the "Object Key" of the PutS3Object processor is set to "${filename}"
+    And the "Object Key" property of the PutS3Object processor is set to "${filename}"
     And the "success" relationship of the TailFile processor is connected to the PutS3Object
 
     Given a ListS3 processor in the "secondary" flow
diff --git a/docker/test/integration/minifi/core/DockerTestCluster.py b/docker/test/integration/minifi/core/DockerTestCluster.py
index 0131a72..c5c4e01 100644
--- a/docker/test/integration/minifi/core/DockerTestCluster.py
+++ b/docker/test/integration/minifi/core/DockerTestCluster.py
@@ -55,7 +55,7 @@ class DockerTestCluster(SingleNodeDockerCluster):
             self.segfault = True
 
         try:
-            apps = [("MiNiFi", self.minifi_root + '/logs/minifi-app.log'), ("NiFi", self.nifi_root + '/logs/nifi-app.log'), ("Kafka", self.kafka_broker_root + '/logs/server.log')]
+            apps = [("MiNiFi", self.minifi_root + '/logs/minifi-app.log'), ("NiFi", self.nifi_root + '/logs/nifi-app.log')]
             if container.status == 'running':
                 for app in apps:
                     app_log_status, app_log = container.exec_run('/bin/sh -c \'cat ' + app[1] + '\'')
@@ -134,17 +134,8 @@ class DockerTestCluster(SingleNodeDockerCluster):
         ls_result = subprocess.check_output(["docker", "exec", "s3-server", "ls", s3_mock_dir + "/test_bucket/"]).decode(self.get_stdout_encoding())
         return not ls_result
 
-    def wait_for_container_logs(self, container_name, log, timeout, count=1):
-        logging.info('Waiting for logs `%s` in container `%s`', log, container_name)
-        container = self.containers[container_name]
-        check_count = 0
-        while check_count <= timeout:
-            if count <= container.logs().decode("utf-8").count(log):
-                return True
-            else:
-                check_count += 1
-                time.sleep(1)
-        return False
-
     def segfault_happened(self):
         return self.segfault
+
+    def wait_for_kafka_consumer_to_be_registered(self):
+        return self.wait_for_app_logs("Assignment received from leader for group docker_test_group", 60)
diff --git a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
index c1dd3c9..c66e62f 100644
--- a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
+++ b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
@@ -77,6 +77,7 @@ class DockerTestDirectoryBindings:
         logging.info('Writing %d bytes of content to file: %s', len(contents), file_abs_path)
         with open(file_abs_path, 'ab') as test_input_file:
             test_input_file.write(contents)
+        os.chmod(file_abs_path, 0o0777)
 
     def put_test_resource(self, test_id, file_name, contents):
         """
@@ -95,9 +96,6 @@ class DockerTestDirectoryBindings:
         file_abs_path = os.path.join(self.docker_path_to_local_path(test_id, path), file_name)
         self.put_file_contents(file_abs_path, contents)
 
-    def get_out_subdir(self, test_id, dir):
-        return os.path.join(self.data_directories[test_id]["output_dir"], dir)
-
     def rm_out_child(self, test_id, dir):
         child = os.path.join(self.data_directories[test_id]["output_dir"], dir)
         logging.info('Removing %s from output folder', child)
diff --git a/docker/test/integration/minifi/core/FileSystemObserver.py b/docker/test/integration/minifi/core/FileSystemObserver.py
index f73c5a3..0411bf5 100644
--- a/docker/test/integration/minifi/core/FileSystemObserver.py
+++ b/docker/test/integration/minifi/core/FileSystemObserver.py
@@ -31,16 +31,21 @@ class FileSystemObserver(object):
         self.observer.schedule(self.event_handler, self.test_output_dir, recursive=True)
         self.observer.start()
 
-    def wait_for_output(self, timeout_seconds, output_validator, max_files):
+    def wait_for_output(self, timeout_seconds, max_files):
         logging.info('Waiting up to %d seconds for %d test outputs...', timeout_seconds, max_files)
         self.restart_observer_if_needed()
+        if max_files <= self.event_handler.get_num_files_created():
+            return
         wait_start_time = time.perf_counter()
-        for i in range(0, max_files):
+        for _ in range(0, max_files):
             # Note: The timing on Event.wait() is inaccurate
-            self.done_event.wait(timeout_seconds)
-            self.done_event.clear()
-            current_time = time.perf_counter()
-            if timeout_seconds < (current_time - wait_start_time) or output_validator.validate():
+            self.done_event.wait(timeout_seconds - (time.perf_counter() - wait_start_time))
+            if self.done_event.isSet():
+                self.done_event.clear()
+                if max_files <= self.event_handler.get_num_files_created():
+                    self.done_event.set()
+                    return
+            if timeout_seconds < (time.perf_counter() - wait_start_time):
                 break
         self.observer.stop()
         self.observer.join()
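
The key change above is that each iteration waits only for the time remaining from the overall deadline
instead of the full timeout. A stripped-down sketch of that pattern with a plain threading.Event (names
here are illustrative, not the test framework's API):

    import time

    def wait_for_n_events(done_event, get_count, expected, timeout_seconds):
        # Re-wait only for the time still left of the overall deadline,
        # mirroring the remaining-time arithmetic in wait_for_output above.
        start = time.perf_counter()
        while get_count() < expected:
            remaining = timeout_seconds - (time.perf_counter() - start)
            if remaining <= 0:
                return False
            if done_event.wait(remaining):
                done_event.clear()
        return True

Here done_event would be a threading.Event set by the file-system event handler and get_count its
get_num_files_created method.
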
diff --git a/docker/test/integration/minifi/core/OutputEventHandler.py b/docker/test/integration/minifi/core/OutputEventHandler.py
index d6986fe..c245e6d 100644
--- a/docker/test/integration/minifi/core/OutputEventHandler.py
+++ b/docker/test/integration/minifi/core/OutputEventHandler.py
@@ -1,4 +1,6 @@
 import logging
+import threading
+import os
 
 from watchdog.events import FileSystemEventHandler
 
@@ -6,13 +8,27 @@ from watchdog.events import FileSystemEventHandler
 class OutputEventHandler(FileSystemEventHandler):
     def __init__(self, done_event):
         self.done_event = done_event
+        self.files_created_lock = threading.Lock()
+        self.files_created = 0
+
+    def get_num_files_created(self):
+        with self.files_created_lock:
+            return self.files_created
 
     def on_created(self, event):
-        logging.info('Output file created: ' + event.src_path)
+        if os.path.isfile(event.src_path):
+            logging.info("Output file created: " + event.src_path)
+            with open(os.path.abspath(event.src_path), "r") as out_file:
+                logging.info("Contents: %s", out_file.read())
+            with self.files_created_lock:
+                self.files_created += 1
         self.done_event.set()
 
     def on_modified(self, event):
-        logging.info('Output file modified: ' + event.src_path)
+        if os.path.isfile(event.src_path):
+            logging.info("Output file modified: " + event.src_path)
+            with open(os.path.abspath(event.src_path), "r") as out_file:
+                logging.info("Contents: %s", out_file.read())
 
     def on_deleted(self, event):
-        logging.info('Output file modified: ' + event.src_path)
+        logging.info("Output file deleted: " + event.src_path)
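
A handler like this is normally attached to a watchdog Observer; a minimal usage sketch, assuming
docker/test/integration is on PYTHONPATH and an illustrative /tmp/output directory:

    import threading

    from watchdog.observers import Observer

    from minifi.core.OutputEventHandler import OutputEventHandler

    done_event = threading.Event()
    handler = OutputEventHandler(done_event)

    observer = Observer()
    observer.schedule(handler, "/tmp/output", recursive=True)
    observer.start()

    done_event.wait(10)  # block until the first file event, at most 10 seconds
    print(handler.get_num_files_created(), "output file(s) created so far")

    observer.stop()
    observer.join()
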
diff --git a/docker/test/integration/minifi/core/Processor.py b/docker/test/integration/minifi/core/Processor.py
index 8b7eb10..217d0aa 100644
--- a/docker/test/integration/minifi/core/Processor.py
+++ b/docker/test/integration/minifi/core/Processor.py
@@ -44,8 +44,14 @@ class Processor(Connectable):
         else:
             self.properties[key] = value
 
+    def unset_property(self, key):
+        self.properties.pop(key, None)
+
+    def set_scheduling_strategy(self, value):
+        self.schedule["scheduling strategy"] = value
+
     def set_scheduling_period(self, value):
-        self.schedule['scheduling period'] = value
+        self.schedule["scheduling period"] = value
 
     def nifi_property_key(self, key):
         """
diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
index 79ca7dc..3d30a90 100644
--- a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
+++ b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
@@ -124,6 +124,7 @@ class SingleNodeDockerCluster(Cluster):
                 USER root
                 ADD config.yml {minifi_root}/conf/config.yml
                 RUN chown minificpp:minificpp {minifi_root}/conf/config.yml
+                RUN sed -i -e 's/INFO/DEBUG/g' {minifi_root}/conf/minifi-log.properties
                 USER minificpp
                 """.format(base_image='apacheminificpp:' + self.minifi_version,
                            minifi_root=self.minifi_root))
@@ -160,8 +161,7 @@ class SingleNodeDockerCluster(Cluster):
             volumes=self.vols)
         self.network.reload()
 
-        logging.info('Started container \'%s\'', container.name)
-
+        logging.info('Adding container \'%s\'', container.name)
         self.containers[container.name] = container
 
     def deploy_nifi_flow(self):
@@ -211,8 +211,7 @@ class SingleNodeDockerCluster(Cluster):
             network=self.network.name,
             volumes=self.vols)
 
-        logging.info('Started container \'%s\'', container.name)
-
+        logging.info('Adding container \'%s\'', container.name)
         self.containers[container.name] = container
 
     def deploy_kafka_broker(self):
@@ -223,6 +222,7 @@ class SingleNodeDockerCluster(Cluster):
             name='zookeeper',
             network=self.network.name,
             ports={'2181/tcp': 2181})
+        logging.info('Adding container \'%s\'', zookeeper.name)
         self.containers[zookeeper.name] = zookeeper
 
         test_dir = os.environ['PYTHONPATH'].split(':')[-1]  # Based on DockerVerify.sh
@@ -232,26 +232,21 @@ class SingleNodeDockerCluster(Cluster):
             detach=True,
             name='kafka-broker',
             network=self.network.name,
-            ports={'9092/tcp': 9092},
-            environment=["KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093", "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181"])
+            ports={'9092/tcp': 9092, '29092/tcp': 29092},
+            environment=[
+                "KAFKA_BROKER_ID=1",
+                "ALLOW_PLAINTEXT_LISTENER=yes",
+                "KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093,PLAINTEXT_HOST://0.0.0.0:29092",
+                "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL",
+                "KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093,PLAINTEXT_HOST://localhost:29092",
+                "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181"])
+        logging.info('Adding container \'%s\'', broker.name)
         self.containers[broker.name] = broker
 
-        dockerfile = dedent("""FROM {base_image}
-                USER root
-                CMD $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka-broker:9092 --topic test > heaven_signal.txt
-                """.format(base_image='wurstmeister/kafka:2.12-2.5.0'))
-        configured_image = self.build_image(dockerfile, [])
-        consumer = self.client.containers.run(
-            configured_image[0],
-            detach=True,
-            name='kafka-consumer',
-            network=self.network.name)
-        self.containers[consumer.name] = consumer
-
     def deploy_http_proxy(self):
         logging.info('Creating and running http-proxy docker container...')
         dockerfile = dedent("""FROM {base_image}
-                RUN apt update && apt install -y apache2-utils
+                RUN apt -y update && apt install -y apache2-utils
                 RUN htpasswd -b -c /etc/squid/.squid_users {proxy_username} {proxy_password}
                 RUN echo 'auth_param basic program /usr/lib/squid3/basic_ncsa_auth /etc/squid/.squid_users'  > /etc/squid/squid.conf && \
                     echo 'auth_param basic realm proxy' >> /etc/squid/squid.conf && \
diff --git a/docker/test/integration/minifi/processors/ConsumeKafka.py b/docker/test/integration/minifi/processors/ConsumeKafka.py
new file mode 100644
index 0000000..35fabd9
--- /dev/null
+++ b/docker/test/integration/minifi/processors/ConsumeKafka.py
@@ -0,0 +1,20 @@
+from ..core.Processor import Processor
+
+
+class ConsumeKafka(Processor):
+    def __init__(self, schedule=None):
+        super(ConsumeKafka, self).__init__(
+            "ConsumeKafka",
+            properties={
+                "Kafka Brokers": "kafka-broker:9092",
+                "Topic Names": "ConsumeKafkaTest",
+                "Topic Name Format": "Names",
+                "Honor Transactions": "true",
+                "Group ID": "docker_test_group",
+                "Offset Reset": "earliest",
+                "Key Attribute Encoding": "UTF-8",
+                "Message Header Encoding": "UTF-8",
+                "Max Poll Time": "4 sec",
+                "Session Timeout": "60 sec"},
+            auto_terminate=["success"],
+            schedule=schedule)
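
Scenarios can override these defaults through the Processor helpers added earlier in this change; a short
usage sketch (the topic name is illustrative):

    from minifi.processors.ConsumeKafka import ConsumeKafka

    consume_kafka = ConsumeKafka()
    consume_kafka.set_name("ConsumeKafka")
    consume_kafka.set_property("Topic Names", "my_topic")     # override a default
    consume_kafka.unset_property("Message Header Encoding")   # drop a property entirely
    consume_kafka.set_scheduling_strategy("TIMER_DRIVEN")
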
diff --git a/docker/test/integration/minifi/processors/GetFile.py b/docker/test/integration/minifi/processors/GetFile.py
index 70d7bb7..de7b182 100644
--- a/docker/test/integration/minifi/processors/GetFile.py
+++ b/docker/test/integration/minifi/processors/GetFile.py
@@ -7,7 +7,6 @@ class GetFile(Processor):
             'GetFile',
             properties={
                 'Input Directory': input_dir,
-                'Keep Source File': 'true'
             },
             schedule=schedule,
             auto_terminate=['success'])
diff --git a/docker/test/integration/minifi/processors/RouteOnAttribute.py b/docker/test/integration/minifi/processors/RouteOnAttribute.py
new file mode 100644
index 0000000..c7a9158
--- /dev/null
+++ b/docker/test/integration/minifi/processors/RouteOnAttribute.py
@@ -0,0 +1,10 @@
+from ..core.Processor import Processor
+
+
+class RouteOnAttribute(Processor):
+    def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
+        super(RouteOnAttribute, self).__init__(
+            'RouteOnAttribute',
+            properties={},
+            schedule=schedule,
+            auto_terminate=['unmatched', "failure"])
diff --git a/docker/test/integration/minifi/validators/EmptyFilesOutPutValidator.py b/docker/test/integration/minifi/validators/EmptyFilesOutPutValidator.py
index 07eac6e..521a7e9 100644
--- a/docker/test/integration/minifi/validators/EmptyFilesOutPutValidator.py
+++ b/docker/test/integration/minifi/validators/EmptyFilesOutPutValidator.py
@@ -23,6 +23,6 @@ class EmptyFilesOutPutValidator(FileOutputValidator):
         logging.info("Output folder: %s", full_dir)
         listing = listdir(full_dir)
         if listing:
-            self.valid = all(os.path.getsize(os.path.join(full_dir, x)) == 0 for x in listing)
+            self.valid = 0 < self.get_num_files(full_dir) and all(os.path.getsize(os.path.join(full_dir, x)) == 0 for x in listing)
 
         return self.valid
diff --git a/docker/test/integration/minifi/validators/FileOutputValidator.py b/docker/test/integration/minifi/validators/FileOutputValidator.py
index caaf789..7e16a17 100644
--- a/docker/test/integration/minifi/validators/FileOutputValidator.py
+++ b/docker/test/integration/minifi/validators/FileOutputValidator.py
@@ -1,3 +1,9 @@
+import logging
+import os
+
+from os import listdir
+from os.path import join
+
 from .OutputValidator import OutputValidator
 
 
@@ -5,5 +11,37 @@ class FileOutputValidator(OutputValidator):
     def set_output_dir(self, output_dir):
         self.output_dir = output_dir
 
+    @staticmethod
+    def num_files_matching_content_in_dir(dir_path, expected_content):
+        listing = listdir(dir_path)
+        if not listing:
+            return 0
+        files_of_matching_content_found = 0
+        for file_name in listing:
+            full_path = join(dir_path, file_name)
+            if not os.path.isfile(full_path):
+                continue
+            with open(full_path, 'r') as out_file:
+                contents = out_file.read()
+                logging.info("dir %s -- name %s", dir_path, file_name)
+                logging.info("expected content: %s -- actual: %s, match: %r", expected_content, contents, expected_content in contents)
+                if expected_content in contents:
+                    files_of_matching_content_found += 1
+        return files_of_matching_content_found
+
+    @staticmethod
+    def get_num_files(dir_path):
+        listing = listdir(dir_path)
+        logging.info("Num files in %s: %d", dir_path, len(listing))
+        if not listing:
+            return 0
+        files_found = 0
+        for file_name in listing:
+            full_path = join(dir_path, file_name)
+            if os.path.isfile(full_path):
+                logging.info("Found output file in %s: %s", dir_path, file_name)
+                files_found += 1
+        return files_found
+
     def validate(self, dir=''):
         pass
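
The two static helpers are intended to be shared by the concrete validators below; a hypothetical
subclass, not part of this change, showing the intended pattern:

    from .FileOutputValidator import FileOutputValidator

    class ExactlyNMatchingFilesValidator(FileOutputValidator):
        """
        Hypothetical example: passes when exactly `expected_count` output files
        contain `expected_content`.
        """
        def __init__(self, expected_count, expected_content):
            self.expected_count = expected_count
            self.expected_content = expected_content

        def validate(self):
            # self.output_dir is injected by the test harness via set_output_dir()
            return self.num_files_matching_content_in_dir(self.output_dir, self.expected_content) == self.expected_count
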
diff --git a/docker/test/integration/minifi/validators/MultiFileOutputValidator.py b/docker/test/integration/minifi/validators/MultiFileOutputValidator.py
index 05b439d..abd2326 100644
--- a/docker/test/integration/minifi/validators/MultiFileOutputValidator.py
+++ b/docker/test/integration/minifi/validators/MultiFileOutputValidator.py
@@ -12,15 +12,25 @@ class MultiFileOutputValidator(FileOutputValidator):
     Validates the content of multiple files in the given directory, also verifying that the old files are not rewritten.
     """
 
-    def __init__(self, expected_file_count, subdir=''):
+    def __init__(self, expected_file_count, expected_content=[]):
         self.valid = False
         self.expected_file_count = expected_file_count
-        self.subdir = subdir
         self.file_timestamps = dict()
+        self.expected_content = expected_content
+
+    def check_expected_content(self, full_dir):
+        if not self.expected_content:
+            return True
+
+        for content in self.expected_content:
+            if self.num_files_matching_content_in_dir(full_dir, content) == 0:
+                return False
+
+        return True
 
     def validate(self):
         self.valid = False
-        full_dir = os.path.join(self.output_dir, self.subdir)
+        full_dir = os.path.join(self.output_dir)
         logging.info("Output folder: %s", full_dir)
 
         if not os.path.isdir(full_dir):
@@ -49,7 +59,7 @@ class MultiFileOutputValidator(FileOutputValidator):
             logging.info("New file added %s", full_path)
 
             if len(self.file_timestamps) == self.expected_file_count:
-                self.valid = True
+                self.valid = self.check_expected_content(full_dir)
                 return self.valid
 
         return self.valid
diff --git a/docker/test/integration/minifi/validators/NoContentCheckFileNumberValidator.py b/docker/test/integration/minifi/validators/NoContentCheckFileNumberValidator.py
new file mode 100644
index 0000000..f91336a
--- /dev/null
+++ b/docker/test/integration/minifi/validators/NoContentCheckFileNumberValidator.py
@@ -0,0 +1,22 @@
+import logging
+import os
+
+from .FileOutputValidator import FileOutputValidator
+
+
+class NoContentCheckFileNumberValidator(FileOutputValidator):
+    """
+    Validates the number of files created without content validation.
+    """
+
+    def __init__(self, num_files_expected):
+        self.num_files_expected = num_files_expected
+
+    def validate(self):
+        full_dir = os.path.join(self.output_dir)
+        logging.info("Output folder: %s", full_dir)
+
+        if not os.path.isdir(full_dir):
+            return False
+
+        return self.num_files_expected == self.get_num_files(full_dir)
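
Validators like this one are normally driven by the test harness, which calls set_output_dir before
validate; used standalone, the pattern is roughly (directory and count are illustrative):

    from minifi.validators.NoContentCheckFileNumberValidator import NoContentCheckFileNumberValidator

    validator = NoContentCheckFileNumberValidator(3)
    validator.set_output_dir("/tmp/output")  # normally done by the test driver
    print("valid:", validator.validate())
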
diff --git a/docker/test/integration/minifi/validators/NoFileOutPutValidator.py b/docker/test/integration/minifi/validators/NoFileOutPutValidator.py
index 7383fc0..f4f06f7 100644
--- a/docker/test/integration/minifi/validators/NoFileOutPutValidator.py
+++ b/docker/test/integration/minifi/validators/NoFileOutPutValidator.py
@@ -1,7 +1,6 @@
 import logging
+import os
 
-
-from os import listdir
 from .FileOutputValidator import FileOutputValidator
 
 
@@ -9,18 +8,8 @@ class NoFileOutPutValidator(FileOutputValidator):
     """
     Validates if no flowfiles were transferred
     """
-    def __init__(self):
-        self.valid = False
-
-    def validate(self, dir=''):
-
-        if self.valid:
-            return True
-
-        full_dir = self.output_dir + dir
+    def validate(self):
+        full_dir = os.path.join(self.output_dir)
         logging.info("Output folder: %s", full_dir)
-        listing = listdir(full_dir)
-
-        self.valid = not bool(listing)
 
-        return self.valid
+        return os.path.isdir(full_dir) and 0 == self.get_num_files(full_dir)
diff --git a/docker/test/integration/minifi/validators/NumFileRangeValidator.py b/docker/test/integration/minifi/validators/NumFileRangeValidator.py
new file mode 100644
index 0000000..686011b
--- /dev/null
+++ b/docker/test/integration/minifi/validators/NumFileRangeValidator.py
@@ -0,0 +1,21 @@
+import logging
+import os
+
+from .FileOutputValidator import FileOutputValidator
+
+
+class NumFileRangeValidator(FileOutputValidator):
+
+    def __init__(self, min_files, max_files):
+        self.min_files = min_files
+        self.max_files = max_files
+
+    def validate(self):
+        full_dir = os.path.join(self.output_dir)
+        logging.info("Output folder: %s", full_dir)
+
+        if not os.path.isdir(full_dir):
+            return False
+
+        num_files = self.get_num_files(full_dir)
+        return self.min_files <= num_files <= self.max_files
diff --git a/docker/test/integration/minifi/validators/SingleFileOutputValidator.py b/docker/test/integration/minifi/validators/SingleFileOutputValidator.py
index 4f53bf7..b8669bd 100644
--- a/docker/test/integration/minifi/validators/SingleFileOutputValidator.py
+++ b/docker/test/integration/minifi/validators/SingleFileOutputValidator.py
@@ -1,9 +1,6 @@
 import logging
 import os
 
-from os import listdir
-from os.path import join
-
 from .FileOutputValidator import FileOutputValidator
 
 
@@ -12,34 +9,14 @@ class SingleFileOutputValidator(FileOutputValidator):
     Validates the content of a single file in the given directory.
     """
 
-    def __init__(self, expected_content, subdir=''):
-        self.valid = False
+    def __init__(self, expected_content):
         self.expected_content = expected_content
-        self.subdir = subdir
 
     def validate(self):
-        self.valid = False
-        full_dir = os.path.join(self.output_dir, self.subdir)
+        full_dir = os.path.join(self.output_dir)
         logging.info("Output folder: %s", full_dir)
 
         if not os.path.isdir(full_dir):
-            return self.valid
-
-        listing = listdir(full_dir)
-        if listing:
-            for listed in listing:
-                logging.info("name:: %s", listed)
-            out_file_name = listing[0]
-            full_path = join(full_dir, out_file_name)
-            if not os.path.isfile(full_path):
-                return self.valid
-
-            with open(full_path, 'r') as out_file:
-                contents = out_file.read()
-                logging.info("dir %s -- name %s", full_dir, out_file_name)
-                logging.info("expected %s -- content %s", self.expected_content, contents)
-
-                if self.expected_content in contents:
-                    self.valid = True
+            return False
 
-        return self.valid
+        return self.get_num_files(full_dir) == 1 and self.num_files_matching_content_in_dir(full_dir, self.expected_content) == 1
diff --git a/docker/test/integration/minifi/validators/SingleOrMultiFileOutputValidator.py b/docker/test/integration/minifi/validators/SingleOrMultiFileOutputValidator.py
new file mode 100644
index 0000000..3c09066
--- /dev/null
+++ b/docker/test/integration/minifi/validators/SingleOrMultiFileOutputValidator.py
@@ -0,0 +1,22 @@
+import logging
+import os
+
+from .FileOutputValidator import FileOutputValidator
+
+
+class SingleOrMultiFileOutputValidator(FileOutputValidator):
+    """
+    Validates the content of a single or multiple files in the given directory.
+    """
+
+    def __init__(self, expected_content):
+        self.expected_content = expected_content
+
+    def validate(self):
+        full_dir = os.path.join(self.output_dir)
+        logging.info("Output folder: %s", full_dir)
+
+        if not os.path.isdir(full_dir):
+            return False
+
+        return 0 < self.num_files_matching_content_in_dir(full_dir, self.expected_content)
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index a78dc51..cb8bce7 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -3,11 +3,12 @@ from minifi.core.RemoteProcessGroup import RemoteProcessGroup
 from minifi.core.SSLContextService import SSLContextService
 from minifi.core.SSL_cert_utils import gen_cert, rsa_gen_key_callback
 
-from minifi.processors.PublishKafka import PublishKafka
-from minifi.processors.PutS3Object import PutS3Object
+from minifi.processors.ConsumeKafka import ConsumeKafka
 from minifi.processors.DeleteS3Object import DeleteS3Object
 from minifi.processors.FetchS3Object import FetchS3Object
 from minifi.processors.PutAzureBlobStorage import PutAzureBlobStorage
+from minifi.processors.PublishKafka import PublishKafka
+from minifi.processors.PutS3Object import PutS3Object
 
 from behave import given, then, when
 from behave.model_describe import ModelDescriptor
@@ -16,6 +17,13 @@ from pytimeparse.timeparse import timeparse
 
 import logging
 import time
+import uuid
+import binascii
+
+from kafka import KafkaProducer
+from confluent_kafka.admin import AdminClient, NewTopic
+from confluent_kafka import Producer
+import socket
 
 
 # Background
@@ -31,9 +39,9 @@ def step_impl(context, processor_type, property, property_value, cluster_name):
     logging.info("Acquiring " + cluster_name)
     cluster = context.test.acquire_cluster(cluster_name)
     processor = locate("minifi.processors." + processor_type + "." + processor_type)()
+    processor.set_name(processor_type)
     if property:
         processor.set_property(property, property_value)
-    processor.set_name(processor_type)
     context.test.add_node(processor)
     # Assume that the first node declared is primary unless specified otherwise
     if cluster.get_flow() is None:
@@ -54,6 +62,11 @@ def step_impl(context, processor_type, cluster_name):
                           format(processor_type=processor_type, property=None, property_value=None, cluster_name=cluster_name))
 
 
+@given("a {processor_type} processor")
+def step_impl(context, processor_type):
+    context.execute_steps("given a {processor_type} processor in the \"{cluster_name}\" flow".format(processor_type=processor_type, cluster_name="primary_cluster"))
+
+
 @given("a set of processors in the \"{cluster_name}\" flow")
 def step_impl(context, cluster_name):
     cluster = context.test.acquire_cluster(cluster_name)
@@ -84,7 +97,6 @@ def step_impl(context, address):
 
 @given("a PutS3Object processor set up to communicate with an s3 server")
 def step_impl(context):
-    # PublishKafka is never the first node of a flow potential cluster-flow setup is omitted
     put_s3 = PutS3Object()
     put_s3.set_name("PutS3Object")
     context.test.add_node(put_s3)
@@ -121,15 +133,71 @@ def step_impl(context):
     context.test.add_node(publish_kafka)
 
 
-@given("the \"{property_name}\" of the {processor_name} processor is set to \"{property_value}\"")
+@given("a kafka producer workflow publishing files placed in \"{directory}\" to a broker exactly once")
+def step_impl(context, directory):
+    context.execute_steps("""
+        given a GetFile processor with the \"Input Directory\" property set to \"{directory}\"
+        and the \"Keep Source File\" property of the GetFile processor is set to \"false\"
+        and a PublishKafka processor set up to communicate with a kafka broker instance
+        and the "success" relationship of the GetFile processor is connected to the PublishKafka""".format(directory=directory))
+
+
+@given("a ConsumeKafka processor set up in a \"{cluster_name}\" flow")
+def step_impl(context, cluster_name):
+    consume_kafka = ConsumeKafka()
+    consume_kafka.set_name("ConsumeKafka")
+    context.test.add_node(consume_kafka)
+    logging.info("Acquiring " + cluster_name)
+    cluster = context.test.acquire_cluster(cluster_name)
+    # Assume that the first node declared is primary unless specified otherwise
+    if cluster.get_flow() is None:
+        cluster.set_name(cluster_name)
+        cluster.set_flow(consume_kafka)
+
+
+@given("the \"{property_name}\" property of the {processor_name} processor is set to \"{property_value}\"")
 def step_impl(context, property_name, processor_name, property_value):
     processor = context.test.get_node_by_name(processor_name)
-    processor.set_property(property_name, property_value)
+    if property_value == "(not set)":
+        processor.unset_property(property_name)
+    else:
+        processor.set_property(property_name, property_value)
+
+
+@given("the \"{property_name}\" property of the {processor_name} processor is set to match {key_attribute_encoding} encoded kafka message key \"{message_key}\"")
+def step_impl(context, property_name, processor_name, key_attribute_encoding, message_key):
+    encoded_key = ""
+    if(key_attribute_encoding.lower() == "hex"):
+        # Hex is presented upper-case to be in sync with NiFi
+        encoded_key = binascii.hexlify(message_key.encode("utf-8")).upper()
+    elif(key_attribute_encoding.lower() == "(not set)"):
+        encoded_key = message_key.encode("utf-8")
+    else:
+        encoded_key = message_key.encode(key_attribute_encoding)
+    logging.info("%s processor is set up to match encoded key \"%s\"", processor_name, encoded_key)
+    filtering = "${kafka.key:equals('" + encoded_key.decode("utf-8") + "')}"
+    logging.info("Filter: \"%s\"", filtering)
+    processor = context.test.get_node_by_name(processor_name)
+    processor.set_property(property_name, filtering)
+
+
+@given("the \"{property_name}\" property of the {processor_name} processor is set to match the attribute \"{attribute_key}\" to \"{attribute_value}\"")
+def step_impl(context, property_name, processor_name, attribute_key, attribute_value):
+    processor = context.test.get_node_by_name(processor_name)
+    if attribute_value == "(not set)":
+        # Ignore filtering
+        processor.set_property(property_name, "true")
+        return
+    filtering = "${" + attribute_key + ":equals('" + attribute_value + "')}"
+    logging.info("Filter: \"%s\"", filtering)
+    logging.info("Key: \"%s\", value: \"%s\"", attribute_key, attribute_value)
+    processor.set_property(property_name, filtering)
 
 
 @given("the scheduling period of the {processor_name} processor is set to \"{sceduling_period}\"")
 def step_impl(context, processor_name, sceduling_period):
     processor = context.test.get_node_by_name(processor_name)
+    processor.set_scheduling_strategy("TIMER_DRIVEN")
     processor.set_scheduling_period(sceduling_period)
 
 
@@ -231,6 +299,8 @@ def step_impl(context, producer_name, consumer_name):
 
 # Kafka setup
 @given("a kafka broker \"{cluster_name}\" is set up in correspondence with the PublishKafka")
+@given("a kafka broker \"{cluster_name}\" is set up in correspondence with the third-party kafka publisher")
+@given("a kafka broker \"{cluster_name}\" is set up in correspondence with the publisher flow")
 def step_impl(context, cluster_name):
     cluster = context.test.acquire_cluster(cluster_name)
     cluster.set_name(cluster_name)
@@ -257,9 +327,29 @@ def step_impl(context, cluster_name):
     cluster.set_flow(None)
 
 
+@given("the kafka broker \"{cluster_name}\" is started")
+def step_impl(context, cluster_name):
+    context.test.start_single_cluster(cluster_name)
+
+
+@given("the topic \"{topic_name}\" is initialized on the kafka broker")
+def step_impl(context, topic_name):
+    admin = AdminClient({'bootstrap.servers': "localhost:29092"})
+    new_topics = [NewTopic(topic_name, num_partitions=3, replication_factor=1)]
+    futures = admin.create_topics(new_topics)
+    # Block until the topic is created
+    for topic, future in futures.items():
+        try:
+            future.result()
+            print("Topic {} created".format(topic))
+        except Exception as e:
+            print("Failed to create topic {}: {}".format(topic, e))
+
+
 @when("the MiNiFi instance starts up")
 @when("both instances start up")
 @when("all instances start up")
+@when("all other processes start up")
 def step_impl(context):
     context.test.start()
 
@@ -270,9 +360,124 @@ def step_impl(context, content, file_name, path, seconds):
     context.test.add_test_data(path, content, file_name)
 
 
+# Kafka
+def delivery_report(err, msg):
+    if err is not None:
+        logging.info('Message delivery failed: {}'.format(err))
+    else:
+        logging.info('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
+
+
+@when("a message with content \"{content}\" is published to the \"{topic_name}\" topic")
+def step_impl(context, content, topic_name):
+    producer = Producer({"bootstrap.servers": "localhost:29092", "client.id": socket.gethostname()})
+    producer.produce(topic_name, content.encode("utf-8"), callback=delivery_report)
+    producer.flush(10)
+
+
+# Used for testing transactional message consumption
+@when("the publisher performs a {transaction_type} transaction publishing to the \"{topic_name}\" topic these messages: {messages}")
+def step_impl(context, transaction_type, topic_name, messages):
+    producer = Producer({"bootstrap.servers": "localhost:29092", "transactional.id": "1001"})
+    producer.init_transactions()
+    logging.info("Transaction type: %s", transaction_type)
+    logging.info("Messages: %s", messages.split(", "))
+    if transaction_type == "SINGLE_COMMITTED_TRANSACTION":
+        producer.begin_transaction()
+        for content in messages.split(", "):
+            producer.produce(topic_name, content.encode("utf-8"), callback=delivery_report)
+        producer.commit_transaction()
+        producer.flush(10)
+    elif transaction_type == "TWO_SEPARATE_TRANSACTIONS":
+        for content in messages.split(", "):
+            producer.begin_transaction()
+            producer.produce(topic_name, content.encode("utf-8"), callback=delivery_report)
+            producer.commit_transaction()
+        producer.flush(10)
+    elif transaction_type == "NON_COMMITTED_TRANSACTION":
+        producer.begin_transaction()
+        for content in messages.split(", "):
+            producer.produce(topic_name, content.encode("utf-8"), callback=delivery_report)
+        producer.flush(10)
+    elif transaction_type == "CANCELLED_TRANSACTION":
+        producer.begin_transaction()
+        for content in messages.split(", "):
+            producer.produce(topic_name, content.encode("utf-8"), callback=delivery_report)
+        producer.flush(10)
+        producer.abort_transaction()
+    else:
+        raise Exception("Unknown transaction type.")
+
+
+@when("a message with content \"{content}\" is published to the \"{topic_name}\" topic with key \"{message_key}\"")
+def step_impl(context, content, topic_name, message_key):
+    producer = Producer({"bootstrap.servers": "localhost:29092", "client.id": socket.gethostname()})
+    # Asynchronously produce a message; the delivery report callback
+    # will be triggered from flush() below once the message has been
+    # successfully delivered or has failed permanently.
+    producer.produce(topic_name, content.encode("utf-8"), callback=delivery_report, key=message_key.encode("utf-8"))
+    # Wait for any outstanding messages to be delivered and delivery report
+    # callbacks to be triggered.
+    producer.flush(10)
+
+
+@when("{number_of_messages} kafka messages are sent to the topic \"{topic_name}\"")
+def step_impl(context, number_of_messages, topic_name):
+    producer = Producer({"bootstrap.servers": "localhost:29092", "client.id": socket.gethostname()})
+    for i in range(0, int(number_of_messages)):
+        producer.produce(topic_name, str(uuid.uuid4()).encode("utf-8"))
+    producer.flush(10)
+
+
+@when("a message with content \"{content}\" is published to the \"{topic_name}\" topic with headers \"{semicolon_separated_headers}\"")
+def step_impl(context, content, topic_name, semicolon_separated_headers):
+    # Confluent Kafka does not support multiple headers with the same key, so another API (kafka-python) is used here.
+    headers = []
+    for header in semicolon_separated_headers.split(";"):
+        kv = header.split(":")
+        headers.append((kv[0].strip(), kv[1].strip().encode("utf-8")))
+    producer = KafkaProducer(bootstrap_servers='localhost:29092')
+    future = producer.send(topic_name, content.encode("utf-8"), headers=headers)
+    assert future.get(timeout=60)
+
+
+@when("the Kafka consumer is registered in kafka broker \"{cluster_name}\"")
+def step_impl(context, cluster_name):
+    context.test.wait_for_kafka_consumer_to_be_registered(cluster_name)
+
+
 @then("a flowfile with the content \"{content}\" is placed in the monitored directory in less than {duration}")
 def step_impl(context, content, duration):
-    context.test.check_for_file_with_content_generated(content, timeparse(duration))
+    context.test.check_for_single_file_with_content_generated(content, timeparse(duration))
+
+
+@then("at least one flowfile with the content \"{content}\" is placed in the monitored directory in less than {duration}")
+def step_impl(context, content, duration):
+    context.test.check_for_at_least_one_file_with_content_generated(content, timeparse(duration))
+
+
+@then("{num_flowfiles} flowfiles are placed in the monitored directory in less than {duration}")
+def step_impl(context, num_flowfiles, duration):
+    if int(num_flowfiles) == 0:
+        context.execute_steps("""then no files are placed in the monitored directory in {duration} of running time""".format(duration=duration))
+        return
+    context.test.check_for_num_files_generated(int(num_flowfiles), timeparse(duration))
+
+
+@then("two flowfiles with the contents \"{content_1}\" and \"{content_2}\" are placed in the monitored directory in less than {duration}")
+def step_impl(context, content_1, content_2, duration):
+    context.test.check_for_multiple_files_generated(2, timeparse(duration), [content_1, content_2])
+
+
+@then("flowfiles with these contents are placed in the monitored directory in less than {duration}: \"{contents}\"")
+def step_impl(context, duration, contents):
+    contents_arr = contents.split(",")
+    context.test.check_for_multiple_files_generated(len(contents_arr), timeparse(duration), contents_arr)
+
+
+@then("after a wait of {duration}, at least {lower_bound:d} and at most {upper_bound:d} flowfiles are produced and placed in the monitored directory")
+def step_impl(context, lower_bound, upper_bound, duration):
+    context.test.check_for_num_file_range_generated(lower_bound, upper_bound, timeparse(duration))
 
 
 @then("{number_of_files:d} flowfiles are placed in the monitored directory in {duration}")
@@ -283,7 +488,7 @@ def step_impl(context, number_of_files, duration):
 
 @then("at least one empty flowfile is placed in the monitored directory in less than {duration}")
 def step_impl(context, duration):
-    context.test.check_for_multiple_empty_files_generated(timeparse(duration))
+    context.test.check_for_an_empty_file_generated(timeparse(duration))
 
 
 @then("no files are placed in the monitored directory in {duration} of running time")
diff --git a/extensions/librdkafka/ConsumeKafka.cpp b/extensions/librdkafka/ConsumeKafka.cpp
index 38f7f99..a79e7b6 100644
--- a/extensions/librdkafka/ConsumeKafka.cpp
+++ b/extensions/librdkafka/ConsumeKafka.cpp
@@ -78,7 +78,8 @@ core::Property ConsumeKafka::TopicNames(core::PropertyBuilder::createProperty("T
   ->build());
 
 core::Property ConsumeKafka::TopicNameFormat(core::PropertyBuilder::createProperty("Topic Name Format")
-  ->withDescription("Specifies whether the Topic(s) provided are a comma separated list of names or a single regular expression.")
+  ->withDescription("Specifies whether the Topic(s) provided are a comma separated list of names or a single regular expression. "
+                    "Using regular expressions does not automatically discover Kafka topics created after the processor started.")
   ->withAllowableValues<std::string>({TOPIC_FORMAT_NAMES, TOPIC_FORMAT_PATTERNS})
   ->withDefaultValue(TOPIC_FORMAT_NAMES)
   ->isRequired(true)
@@ -166,7 +167,7 @@ core::Property ConsumeKafka::SessionTimeout(core::PropertyBuilder::createPropert
   ->withDefaultValue<core::TimePeriodValue>("60 seconds")
   ->build());
 
-const core::Relationship ConsumeKafka::Success("success", "Incoming kafka messages as flowfiles. Depending on the demarcation strategy, this can be one or multiple flowfiles per message.");
+const core::Relationship ConsumeKafka::Success("success", "Incoming Kafka messages as flowfiles. Depending on the demarcation strategy, this can be one or multiple flowfiles per message.");
 
 void ConsumeKafka::initialize() {
   setSupportedProperties({
@@ -318,10 +319,12 @@ void ConsumeKafka::configure_new_connection(const core::ProcessContext& context)
   rd_kafka_conf_set_rebalance_cb(conf_.get(), rebalance_cb);
 
   // Uncomment this for librdkafka debug logs:
-  // setKafkaConfigurationField(conf_.get(), "debug", "all");
+  // logger_->log_info("Enabling all debug logs for kafka consumer.");
+  // setKafkaConfigurationField(*conf_, "debug", "all");
 
   setKafkaConfigurationField(*conf_, "bootstrap.servers", kafka_brokers_);
-  setKafkaConfigurationField(*conf_, "auto.offset.reset", "latest");
+  setKafkaConfigurationField(*conf_, "allow.auto.create.topics", "true");
+  setKafkaConfigurationField(*conf_, "auto.offset.reset", offset_reset_);
   setKafkaConfigurationField(*conf_, "enable.auto.commit", "false");
   setKafkaConfigurationField(*conf_, "enable.auto.offset.store", "false");
   setKafkaConfigurationField(*conf_, "isolation.level", honor_transactions_ ? "read_committed" : "read_uncommitted");
@@ -359,25 +362,6 @@ void ConsumeKafka::configure_new_connection(const core::ProcessContext& context)
   if (RD_KAFKA_RESP_ERR_NO_ERROR != poll_set_consumer_response) {
     logger_->log_error("rd_kafka_poll_set_consumer error %d: %s", poll_set_consumer_response, rd_kafka_err2str(poll_set_consumer_response));
   }
-
-  // There is no rd_kafka_seek alternative for rd_kafka_topic_partition_list_t, only rd_kafka_topic_t
-  // rd_kafka_topic_partition_list_set_offset should reset the offsets to the latest (or whatever is set in the config),
-  // Also, rd_kafka_committed should also fetch and set latest the latest offset
-  // In reality, neither of them seem to work (not even with calling rd_kafka_position())
-  logger_->log_info("Resetting offset manually.");
-  while (true) {
-    std::unique_ptr<rd_kafka_message_t, utils::rd_kafka_message_deleter>
-        message_wrapper{ rd_kafka_consumer_poll(consumer_.get(), max_poll_time_milliseconds_.count()), utils::rd_kafka_message_deleter() };
-
-    if (!message_wrapper || RD_KAFKA_RESP_ERR_NO_ERROR != message_wrapper->err) {
-      break;
-    }
-    utils::print_kafka_message(*message_wrapper, *logger_);
-    // Commit offsets on broker for the provided list of partitions
-    logger_->log_info("Committing offset: %" PRId64 ".", message_wrapper->offset);
-    rd_kafka_commit_message(consumer_.get(), message_wrapper.get(), /* async = */ 0);
-  }
-  logger_->log_info("Done resetting offset manually.");
 }
 
 std::string ConsumeKafka::extract_message(const rd_kafka_message_t& rkmessage) const {