You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ad...@apache.org on 2022/03/03 15:46:28 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1734 Add integration test for Kubernetes log collection

This is an automated email from the ASF dual-hosted git repository.

adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new f811c79  MINIFICPP-1734 Add integration test for Kubernetes log collection
f811c79 is described below

commit f811c79c5b6df20ecb5eb2cbdf0f1ad2c77df328
Author: Ferenc Gerlits <fg...@gmail.com>
AuthorDate: Thu Mar 3 16:44:55 2022 +0100

    MINIFICPP-1734 Add integration test for Kubernetes log collection
    
    Signed-off-by: Adam Debreceni <ad...@apache.org>
    
    This closes #1270
---
 .github/workflows/ci.yml                           |   2 +-
 cmake/DockerConfig.cmake                           |   1 +
 docker/Dockerfile                                  |   3 +
 .../test/integration/features/kubernetes.feature   |  58 ++++++++++
 .../controllers/KubernetesControllerService.py     |  28 +++++
 docker/test/integration/minifi/core/Container.py   |  21 ++++
 .../integration/minifi/core/DockerTestCluster.py   |  26 +++++
 docker/test/integration/minifi/core/ImageStore.py  |  27 +++++
 docker/test/integration/minifi/core/KindProxy.py   | 124 +++++++++++++++++++++
 docker/test/integration/minifi/core/LogSource.py   |  19 ++++
 .../minifi/core/MinifiAsPodInKubernetesCluster.py  |  59 ++++++++++
 .../integration/minifi/core/MinifiContainer.py     |  20 +++-
 .../minifi/core/SingleNodeDockerCluster.py         |  19 ++++
 .../kubernetes/minifi-conf/minifi-log.properties   |   3 +
 .../kubernetes/pods-etc/daemon.namespace.yml       |   6 +
 .../kubernetes/pods-etc/hello-world-one.pod.yml    |  12 ++
 .../kubernetes/pods-etc/hello-world-two.pod.yml    |  12 ++
 .../kubernetes/pods-etc/log-collector.pod.yml      |  29 +++++
 .../pods-etc/namespace-reader.clusterrole.yml      |   9 ++
 .../namespace-reader.clusterrolebinding.yml        |  12 ++
 .../kubernetes/pods-etc/pod-reader.clusterrole.yml |   9 ++
 .../pods-etc/pod-reader.clusterrolebinding.yml     |  12 ++
 docker/test/integration/steps/steps.py             | 105 +++++++++++------
 23 files changed, 581 insertions(+), 35 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 60628c6..1c2e75e 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -202,7 +202,7 @@ jobs:
           if [ -d ~/.ccache ]; then mv ~/.ccache .; fi
           mkdir build
           cd build
-          cmake -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DENABLE_JNI=OFF -DDISABLE_JEMALLOC=ON -DENABLE_AWS=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_MQTT=ON -DENABLE_AZURE=ON -DENABLE_SQL=ON -DENABLE_SPLUNK=ON -DENABLE_OPC=ON -DENABLE_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DDOCKER_BUILD_ONLY=ON -DDOCKER_CCACHE_DUMP_LOCATION=$HOME/.ccache ..
+          cmake -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DENABLE_JNI=OFF -DDISABLE_JEMALLOC=ON -DENABLE_AWS=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_MQTT=ON -DENABLE_AZURE=ON -DENABLE_SQL=ON -DENABLE_SPLUNK=ON -DENABLE_OPC=ON -DENABLE_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_KUBERNETES=ON -DDOCKER_BUILD_ONLY=ON -DDOCKER_CCACHE_DUMP_LOCATION=$HOME/.ccache ..
           make docker
       - id: install_deps
         run: |
diff --git a/cmake/DockerConfig.cmake b/cmake/DockerConfig.cmake
index dd05f65..226439f 100644
--- a/cmake/DockerConfig.cmake
+++ b/cmake/DockerConfig.cmake
@@ -47,6 +47,7 @@ add_custom_target(
         -c ENABLE_SPLUNK=${ENABLE_SPLUNK}
         -c ENABLE_SCRIPTING=${ENABLE_SCRIPTING}
         -c ENABLE_LUA_SCRIPTING=${ENABLE_LUA_SCRIPTING}
+        -c ENABLE_KUBERNETES=${ENABLE_KUBERNETES}
         -c DISABLE_CURL=${DISABLE_CURL}
         -c DISABLE_JEMALLOC=${DISABLE_JEMALLOC}
         -c DISABLE_CIVET=${DISABLE_CIVET}
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 9a8f2e1..3ec9856 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -62,6 +62,7 @@ ARG DISABLE_BZIP2=OFF
 ARG ENABLE_SCRIPTING=OFF
 ARG DISABLE_PYTHON_SCRIPTING=
 ARG ENABLE_LUA_SCRIPTING=
+ARG ENABLE_KUBERNETES=OFF
 ARG DISABLE_CONTROLLER=OFF
 ARG CMAKE_BUILD_TYPE=Release
 
@@ -121,6 +122,7 @@ RUN cmake -DSTATIC_BUILD= -DSKIP_TESTS=true -DENABLE_ALL="${ENABLE_ALL}" -DENABL
     -DDISABLE_EXPRESSION_LANGUAGE="${DISABLE_EXPRESSION_LANGUAGE}" -DDISABLE_ROCKSDB="${DISABLE_ROCKSDB}" \
     -DDISABLE_LIBARCHIVE="${DISABLE_LIBARCHIVE}" -DDISABLE_LZMA="${DISABLE_LZMA}" -DDISABLE_BZIP2="${DISABLE_BZIP2}" \
     -DENABLE_SCRIPTING="${ENABLE_SCRIPTING}" -DDISABLE_PYTHON_SCRIPTING="${DISABLE_PYTHON_SCRIPTING}" -DENABLE_LUA_SCRIPTING="${ENABLE_LUA_SCRIPTING}" \
+    -DENABLE_KUBERNETES="${ENABLE_KUBERNETES}" \
     -DDISABLE_CONTROLLER="${DISABLE_CONTROLLER}" -DENABLE_ENCRYPT_CONFIG="${ENABLE_ENCRYPT_CONFIG}" -DAWS_ENABLE_UNITY_BUILD=OFF -DEXCLUDE_BOOST=ON -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" .. && \
     make -j "$(nproc)" package && \
     tar -xzvf "${MINIFI_BASE_DIR}/build/nifi-minifi-cpp-${MINIFI_VERSION}.tar.gz" -C "${MINIFI_BASE_DIR}"
@@ -144,6 +146,7 @@ ARG ENABLE_BUSTACHE=OFF
 ARG ENABLE_SCRIPTING=OFF
 ARG DISABLE_PYTHON_SCRIPTING=
 ARG ENABLE_LUA_SCRIPTING=
+ARG ENABLE_KUBERNETES=OFF
 
 # Add testing repo for rocksdb
 RUN echo 'http://dl-cdn.alpinelinux.org/alpine/edge/testing' >> /etc/apk/repositories
diff --git a/docker/test/integration/features/kubernetes.feature b/docker/test/integration/features/kubernetes.feature
new file mode 100644
index 0000000..af7ac29
--- /dev/null
+++ b/docker/test/integration/features/kubernetes.feature
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Feature: TailFile can collect logs from Kubernetes pods
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario: Collect all logs from the default namespace
+    Given a TailFile processor in a Kubernetes cluster
+    And the "tail-mode" property of the TailFile processor is set to "Multiple file"
+    And the "tail-base-directory" property of the TailFile processor is set to "/var/log/pods/${namespace}_${pod}_${uid}/${container}"
+    And the "File to Tail" property of the TailFile processor is set to ".*\.log"
+    And the "Lookup frequency" property of the TailFile processor is set to "1s"
+    And the TailFile processor has an Attribute Provider Service which is a Kubernetes Controller Service
+    And a PutFile processor in the Kubernetes cluster
+    And the "Directory" property of the PutFile processor is set to "/tmp/output"
+    And the "success" relationship of the TailFile processor is connected to the PutFile
+    When the MiNiFi instance starts up
+    Then two flowfiles with the contents "Hello World!" and "Hello again, World!" are placed in the monitored directory in less than 30 seconds
+
+  Scenario: Collect logs from selected pods
+    Given a TailFile processor in a Kubernetes cluster
+    And the "tail-mode" property of the TailFile processor is set to "Multiple file"
+    And the "tail-base-directory" property of the TailFile processor is set to "/var/log/pods/${namespace}_${pod}_${uid}/${container}"
+    And the "File to Tail" property of the TailFile processor is set to ".*\.log"
+    And the "Lookup frequency" property of the TailFile processor is set to "1s"
+    And the TailFile processor has an Attribute Provider Service which is a Kubernetes Controller Service with the "Pod Name Filter" property set to ".*one"
+    And a PutFile processor in the Kubernetes cluster
+    And the "Directory" property of the PutFile processor is set to "/tmp/output"
+    And the "success" relationship of the TailFile processor is connected to the PutFile
+    When the MiNiFi instance starts up
+    Then one flowfile with the contents "Hello World!" is placed in the monitored directory in less than 30 seconds
+
+  Scenario: Collect logs from selected containers
+    Given a TailFile processor in a Kubernetes cluster
+    And the "tail-mode" property of the TailFile processor is set to "Multiple file"
+    And the "tail-base-directory" property of the TailFile processor is set to "/var/log/pods/${namespace}_${pod}_${uid}/${container}"
+    And the "File to Tail" property of the TailFile processor is set to ".*\.log"
+    And the "Lookup frequency" property of the TailFile processor is set to "1s"
+    And the TailFile processor has an Attribute Provider Service which is a Kubernetes Controller Service with the "Container Name Filter" property set to "echo-[^o].."
+    And a PutFile processor in the Kubernetes cluster
+    And the "Directory" property of the PutFile processor is set to "/tmp/output"
+    And the "success" relationship of the TailFile processor is connected to the PutFile
+    When the MiNiFi instance starts up
+    Then one flowfile with the contents "Hello again, World!" is placed in the monitored directory in less than 30 seconds
diff --git a/docker/test/integration/minifi/controllers/KubernetesControllerService.py b/docker/test/integration/minifi/controllers/KubernetesControllerService.py
new file mode 100644
index 0000000..5b024d8
--- /dev/null
+++ b/docker/test/integration/minifi/controllers/KubernetesControllerService.py
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ..core.ControllerService import ControllerService
+
+
+class KubernetesControllerService(ControllerService):
+    def __init__(self, name=None, properties=None):
+        super(KubernetesControllerService, self).__init__(name=name)
+
+        self.service_class = 'KubernetesControllerService'
+
+        if properties is not None:
+            for key, value in properties.items():
+                self.properties[key] = value
diff --git a/docker/test/integration/minifi/core/Container.py b/docker/test/integration/minifi/core/Container.py
index 1457230..700dddb 100644
--- a/docker/test/integration/minifi/core/Container.py
+++ b/docker/test/integration/minifi/core/Container.py
@@ -1,6 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
 import docker
 import logging
 
+from .LogSource import LogSource
+
 
 class Container:
     def __init__(self, name, engine, vols, network, image_store, command):
@@ -41,6 +59,9 @@ class Container:
     def deploy(self):
         raise NotImplementedError()
 
+    def log_source(self):
+        return LogSource.FROM_DOCKER_CONTAINER
+
     def get_startup_finished_log_entry(self):
         raise NotImplementedError()
 
diff --git a/docker/test/integration/minifi/core/DockerTestCluster.py b/docker/test/integration/minifi/core/DockerTestCluster.py
index ed5983a..be2a32b 100644
--- a/docker/test/integration/minifi/core/DockerTestCluster.py
+++ b/docker/test/integration/minifi/core/DockerTestCluster.py
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
 import json
 import logging
 import subprocess
@@ -9,6 +25,7 @@ import tarfile
 import io
 import tempfile
 
+from .LogSource import LogSource
 from .SingleNodeDockerCluster import SingleNodeDockerCluster
 from .utils import retry_check
 from azure.storage.blob import BlobServiceClient
@@ -34,6 +51,15 @@ class DockerTestCluster(SingleNodeDockerCluster):
         return encoding
 
     def get_app_log(self, container_name):
+        log_source = self.containers[container_name].log_source()
+        if log_source == LogSource.FROM_DOCKER_CONTAINER:
+            return self.__get_app_log_from_docker_container(container_name)
+        elif log_source == LogSource.FROM_GET_APP_LOG_METHOD:
+            return self.containers[container_name].get_app_log()
+        else:
+            raise Exception("Unexpected log source '%s'" % log_source)
+
+    def __get_app_log_from_docker_container(self, container_name):
         try:
             container = self.client.containers.get(container_name)
         except Exception:
diff --git a/docker/test/integration/minifi/core/ImageStore.py b/docker/test/integration/minifi/core/ImageStore.py
index c602344..28e003c 100644
--- a/docker/test/integration/minifi/core/ImageStore.py
+++ b/docker/test/integration/minifi/core/ImageStore.py
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
 from .NifiContainer import NifiContainer
 from .MinifiContainer import MinifiContainer
 import logging
@@ -29,6 +45,8 @@ class ImageStore:
 
         if container_engine == "minifi-cpp":
             image = self.__build_minifi_cpp_image()
+        elif container_engine == "minifi-cpp-in-kubernetes":
+            image = self.__build_simple_minifi_cpp_image_with_root()
         elif container_engine == "http-proxy":
             image = self.__build_http_proxy_image()
         elif container_engine == "nifi":
@@ -84,6 +102,15 @@ class ImageStore:
 
         return self.__build_image(dockerfile)
 
+    def __build_simple_minifi_cpp_image_with_root(self):
+        dockerfile = dedent(r"""\
+                FROM {base_image}
+                USER root
+                CMD ["/bin/sh", "-c", "cp /tmp/minifi_config/config.yml /tmp/minifi_config/minifi-log.properties ./conf/ && ./bin/minifi.sh run"]
+                """.format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_VERSION))
+
+        return self.__build_image(dockerfile)
+
     def __build_http_proxy_image(self):
         dockerfile = dedent("""FROM {base_image}
                 RUN apt -y update && apt install -y apache2-utils
diff --git a/docker/test/integration/minifi/core/KindProxy.py b/docker/test/integration/minifi/core/KindProxy.py
new file mode 100644
index 0000000..d304cc8
--- /dev/null
+++ b/docker/test/integration/minifi/core/KindProxy.py
@@ -0,0 +1,124 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import docker
+import glob
+import os
+import stat
+import subprocess
+import time
+from textwrap import dedent
+
+
+class KindProxy:
+    def __init__(self, temp_directory, resources_directory, image_name, image_repository, image_tag):
+        self.temp_directory = temp_directory
+        self.resources_directory = resources_directory
+        self.image_name = image_name
+        self.image_repository = image_repository
+        self.image_tag = image_tag
+
+        self.kind_binary_path = os.path.join(self.temp_directory, 'kind')
+        self.kind_config_path = os.path.join(self.temp_directory, 'kind-config.yml')
+        self.__download_kind()
+        self.docker_client = docker.from_env()
+
+    def __download_kind(self):
+        if subprocess.run(['curl', '-Lo', self.kind_binary_path, 'https://kind.sigs.k8s.io/dl/v0.11.1/kind-linux-amd64']).returncode != 0:
+            raise Exception("Could not download kind")
+        os.chmod(self.kind_binary_path, stat.S_IXUSR)
+
+    def create_config(self, volumes):
+        kind_config = dedent("""\
+                apiVersion: kind.x-k8s.io/v1alpha4
+                kind: Cluster
+                nodes:
+                   - role: control-plane
+                """)
+
+        if volumes:
+            kind_config += "     extraMounts:\n"
+
+        for host_path, container_path in volumes.items():
+            kind_config += "      - hostPath: {path}\n".format(path=host_path)
+            kind_config += "        containerPath: {path}\n".format(path=container_path['bind'])
+            if container_path['mode'] != 'rw':
+                kind_config += "        readOnly: true\n"
+
+        with open(self.kind_config_path, 'wb') as config_file:
+            config_file.write(kind_config.encode('utf-8'))
+
+    def start_cluster(self):
+        subprocess.run([self.kind_binary_path, 'delete', 'cluster'])
+
+        if subprocess.run([self.kind_binary_path, 'create', 'cluster', '--config=' + self.kind_config_path]).returncode != 0:
+            raise Exception("Could not start the kind cluster")
+
+    def load_docker_image(self, image_store):
+        image = image_store.get_image(self.image_name)
+        image.tag(repository=self.image_repository, tag=self.image_tag)
+
+        if subprocess.run([self.kind_binary_path, 'load', 'docker-image', self.image_repository + ':' + self.image_tag]).returncode != 0:
+            raise Exception("Could not load the %s docker image (%s:%s) into the kind cluster" % (self.image_name, self.image_repository, self.image_tag))
+
+    def create_objects(self):
+        self.__wait_for_default_service_account('default')
+        namespaces = self.__create_objects_of_type(self.resources_directory, 'namespace')
+        for namespace in namespaces:
+            self.__wait_for_default_service_account(namespace)
+
+        self.__create_objects_of_type(self.resources_directory, 'pod')
+        self.__create_objects_of_type(self.resources_directory, 'clusterrole')
+        self.__create_objects_of_type(self.resources_directory, 'clusterrolebinding')
+
+    def __wait_for_default_service_account(self, namespace):
+        for _ in range(120):
+            (code, output) = self.docker_client.containers.get('kind-control-plane').exec_run(['kubectl', '-n', namespace, 'get', 'serviceaccount', 'default'])
+            if code == 0:
+                return
+            time.sleep(1)
+        raise Exception("Default service account for namespace '%s' not found" % namespace)
+
+    def __create_objects_of_type(self, directory, type):
+        found_objects = []
+        for full_file_name in glob.iglob(os.path.join(directory, f'*.{type}.yml')):
+            file_name = os.path.basename(full_file_name)
+            file_name_in_container = os.path.join('/var/tmp', file_name)
+            self.__copy_file_to_container(full_file_name, 'kind-control-plane', file_name_in_container)
+
+            (code, output) = self.docker_client.containers.get('kind-control-plane').exec_run(['kubectl', 'apply', '-f', file_name_in_container])
+            if code != 0:
+                raise Exception("Could not create kubernetes object from file '%s'" % full_file_name)
+
+            object_name = file_name.replace(f'.{type}.yml', '')
+            found_objects.append(object_name)
+        return found_objects
+
+    def __copy_file_to_container(self, host_file, container_name, container_file):
+        if subprocess.run(['docker', 'cp', host_file, container_name + ':' + container_file]).returncode != 0:
+            raise Exception("Could not copy file '%s' into container '%s' as '%s'" % (host_file, container_name, container_file))
+
+    def get_logs(self, namespace, pod_name):
+        (code, output) = self.docker_client.containers.get('kind-control-plane').exec_run(['kubectl', '-n', namespace, 'logs', pod_name])
+        if code == 0:
+            return output
+        else:
+            return None
+
+    def cleanup(self):
+        # cleanup gets called multiple times, also after the temp directories had been removed
+        if os.path.exists(self.kind_binary_path):
+            subprocess.run([self.kind_binary_path, 'delete', 'cluster'])
diff --git a/docker/test/integration/minifi/core/LogSource.py b/docker/test/integration/minifi/core/LogSource.py
new file mode 100644
index 0000000..c5ff698
--- /dev/null
+++ b/docker/test/integration/minifi/core/LogSource.py
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class LogSource:
+    FROM_DOCKER_CONTAINER = "from docker container"
+    FROM_GET_APP_LOG_METHOD = "from get_app_log() method"
diff --git a/docker/test/integration/minifi/core/MinifiAsPodInKubernetesCluster.py b/docker/test/integration/minifi/core/MinifiAsPodInKubernetesCluster.py
new file mode 100644
index 0000000..43132b2
--- /dev/null
+++ b/docker/test/integration/minifi/core/MinifiAsPodInKubernetesCluster.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+import os
+import shutil
+
+from .KindProxy import KindProxy
+from .LogSource import LogSource
+from .MinifiContainer import MinifiContainer
+
+
+class MinifiAsPodInKubernetesCluster(MinifiContainer):
+    def __init__(self, config_dir, name, vols, network, image_store, command=None):
+        super().__init__(config_dir, name, vols, network, image_store, command)
+
+        test_dir = os.environ['TEST_DIRECTORY']
+        shutil.copy(os.path.join(test_dir, 'resources', 'kubernetes', 'minifi-conf', 'minifi-log.properties'), self.config_dir)
+
+    def deploy(self):
+        if not self.set_deployed():
+            return
+
+        logging.info('Setting up container: %s', self.name)
+
+        self._create_config()
+
+        resources_directory = os.path.join(os.environ['TEST_DIRECTORY'], 'resources', 'kubernetes', 'pods-etc')
+
+        self.kind = KindProxy(self.config_dir, resources_directory, 'minifi-cpp-in-kubernetes', 'minifi-kubernetes-test', 'v1')
+        self.kind.create_config(self.vols)
+        self.kind.start_cluster()
+        self.kind.load_docker_image(self.image_store)
+        self.kind.create_objects()
+
+        logging.info('Finished setting up container: %s', self.name)
+
+    def log_source(self):
+        return LogSource.FROM_GET_APP_LOG_METHOD
+
+    def get_app_log(self):
+        return 'OK', self.kind.get_logs('daemon', 'log-collector')
+
+    def cleanup(self):
+        logging.info('Cleaning up container: %s', self.name)
+        self.kind.cleanup()
diff --git a/docker/test/integration/minifi/core/MinifiContainer.py b/docker/test/integration/minifi/core/MinifiContainer.py
index 8b971bf..770bf75 100644
--- a/docker/test/integration/minifi/core/MinifiContainer.py
+++ b/docker/test/integration/minifi/core/MinifiContainer.py
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
 import os
 import logging
 from .FlowContainer import FlowContainer
@@ -19,7 +35,7 @@ class MinifiContainer(FlowContainer):
     def get_log_file_path(self):
         return MinifiContainer.MINIFI_ROOT + '/logs/minifi-app.log'
 
-    def __create_config(self):
+    def _create_config(self):
         serializer = Minifi_flow_yaml_serializer()
         test_flow_yaml = serializer.serialize(self.start_nodes)
         logging.info('Using generated flow config yml:\n%s', test_flow_yaml)
@@ -31,7 +47,7 @@ class MinifiContainer(FlowContainer):
             return
 
         logging.info('Creating and running minifi docker container...')
-        self.__create_config()
+        self._create_config()
 
         self.client.containers.run(
             self.image_store.get_image(self.get_engine()),
diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
index acc375c..d1cbb6b 100644
--- a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
+++ b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
 import docker
 import logging
 import uuid
@@ -14,6 +30,7 @@ from .PostgreSQLServerContainer import PostgreSQLServerContainer
 from .MqttBrokerContainer import MqttBrokerContainer
 from .OPCUAServerContainer import OPCUAServerContainer
 from .SplunkContainer import SplunkContainer
+from .MinifiAsPodInKubernetesCluster import MinifiAsPodInKubernetesCluster
 
 
 class SingleNodeDockerCluster(Cluster):
@@ -68,6 +85,8 @@ class SingleNodeDockerCluster(Cluster):
             return self.containers.setdefault(name, NifiContainer(self.data_directories["nifi_config_dir"], name, self.vols, self.network, self.image_store, command))
         elif engine == 'minifi-cpp':
             return self.containers.setdefault(name, MinifiContainer(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
+        elif engine == 'kubernetes':
+            return self.containers.setdefault(name, MinifiAsPodInKubernetesCluster(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
         elif engine == 'kafka-broker':
             if 'zookeeper' not in self.containers:
                 self.containers.setdefault('zookeeper', ZookeeperContainer('zookeeper', self.vols, self.network, self.image_store, command))
diff --git a/docker/test/integration/resources/kubernetes/minifi-conf/minifi-log.properties b/docker/test/integration/resources/kubernetes/minifi-conf/minifi-log.properties
new file mode 100644
index 0000000..f350eb8
--- /dev/null
+++ b/docker/test/integration/resources/kubernetes/minifi-conf/minifi-log.properties
@@ -0,0 +1,3 @@
+spdlog.pattern=[%Y-%m-%d %H:%M:%S.%e] [%n] [%l] %v
+appender.stdout=stdout
+logger.root=INFO,stdout
diff --git a/docker/test/integration/resources/kubernetes/pods-etc/daemon.namespace.yml b/docker/test/integration/resources/kubernetes/pods-etc/daemon.namespace.yml
new file mode 100644
index 0000000..f628b21
--- /dev/null
+++ b/docker/test/integration/resources/kubernetes/pods-etc/daemon.namespace.yml
@@ -0,0 +1,6 @@
+apiVersion: v1
+kind: Namespace
+metadata:
+  name: daemon
+  labels:
+    name: daemon
diff --git a/docker/test/integration/resources/kubernetes/pods-etc/hello-world-one.pod.yml b/docker/test/integration/resources/kubernetes/pods-etc/hello-world-one.pod.yml
new file mode 100644
index 0000000..22a6634
--- /dev/null
+++ b/docker/test/integration/resources/kubernetes/pods-etc/hello-world-one.pod.yml
@@ -0,0 +1,12 @@
+apiVersion: v1
+kind: Pod
+metadata:
+  name: hello-world-one
+spec:
+  containers:
+  - name: echo-one
+    image: busybox
+    args:
+    - /bin/sh
+    - -c
+    - 'echo "Hello World!"; sleep 10000'
diff --git a/docker/test/integration/resources/kubernetes/pods-etc/hello-world-two.pod.yml b/docker/test/integration/resources/kubernetes/pods-etc/hello-world-two.pod.yml
new file mode 100644
index 0000000..14b0c22
--- /dev/null
+++ b/docker/test/integration/resources/kubernetes/pods-etc/hello-world-two.pod.yml
@@ -0,0 +1,12 @@
+apiVersion: v1
+kind: Pod
+metadata:
+  name: hello-world-two
+spec:
+  containers:
+  - name: echo-two
+    image: busybox
+    args:
+    - /bin/sh
+    - -c
+    - 'echo "Hello again, World!"; sleep 10000'
diff --git a/docker/test/integration/resources/kubernetes/pods-etc/log-collector.pod.yml b/docker/test/integration/resources/kubernetes/pods-etc/log-collector.pod.yml
new file mode 100644
index 0000000..2ea287c
--- /dev/null
+++ b/docker/test/integration/resources/kubernetes/pods-etc/log-collector.pod.yml
@@ -0,0 +1,29 @@
+apiVersion: v1
+kind: Pod
+metadata:
+  namespace: daemon
+  name: log-collector
+spec:
+  containers:
+  - name: minifi
+    image: minifi-kubernetes-test:v1
+    imagePullPolicy: Never
+    volumeMounts:
+    - name: var-log-pods
+      mountPath: /var/log/pods
+      readOnly: true
+    - name: tmp-minifi-config
+      mountPath: /tmp/minifi_config
+      readOnly: true
+    - name: tmp-output
+      mountPath: /tmp/output
+  volumes:
+  - name: var-log-pods
+    hostPath:
+      path: /var/log/pods
+  - name: tmp-minifi-config
+    hostPath:
+      path: /tmp/minifi_config
+  - name: tmp-output
+    hostPath:
+      path: /tmp/output
diff --git a/docker/test/integration/resources/kubernetes/pods-etc/namespace-reader.clusterrole.yml b/docker/test/integration/resources/kubernetes/pods-etc/namespace-reader.clusterrole.yml
new file mode 100644
index 0000000..863439a
--- /dev/null
+++ b/docker/test/integration/resources/kubernetes/pods-etc/namespace-reader.clusterrole.yml
@@ -0,0 +1,9 @@
+kind: ClusterRole
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  namespace: default
+  name: namespace-reader
+rules:
+- apiGroups: [""]  # "" indicates the core API group
+  resources: ["namespaces"]
+  verbs: ["get", "watch", "list"]
diff --git a/docker/test/integration/resources/kubernetes/pods-etc/namespace-reader.clusterrolebinding.yml b/docker/test/integration/resources/kubernetes/pods-etc/namespace-reader.clusterrolebinding.yml
new file mode 100644
index 0000000..98de6c4
--- /dev/null
+++ b/docker/test/integration/resources/kubernetes/pods-etc/namespace-reader.clusterrolebinding.yml
@@ -0,0 +1,12 @@
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: namespace-reader-binding
+subjects:
+- kind: ServiceAccount
+  name: default
+  namespace: daemon
+roleRef:
+  kind: ClusterRole
+  name: namespace-reader
+  apiGroup: rbac.authorization.k8s.io
diff --git a/docker/test/integration/resources/kubernetes/pods-etc/pod-reader.clusterrole.yml b/docker/test/integration/resources/kubernetes/pods-etc/pod-reader.clusterrole.yml
new file mode 100644
index 0000000..a832314
--- /dev/null
+++ b/docker/test/integration/resources/kubernetes/pods-etc/pod-reader.clusterrole.yml
@@ -0,0 +1,9 @@
+kind: ClusterRole
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  namespace: default
+  name: pod-reader
+rules:
+- apiGroups: [""]  # "" indicates the core API group
+  resources: ["pods"]
+  verbs: ["get", "watch", "list"]
diff --git a/docker/test/integration/resources/kubernetes/pods-etc/pod-reader.clusterrolebinding.yml b/docker/test/integration/resources/kubernetes/pods-etc/pod-reader.clusterrolebinding.yml
new file mode 100644
index 0000000..1d32a19
--- /dev/null
+++ b/docker/test/integration/resources/kubernetes/pods-etc/pod-reader.clusterrolebinding.yml
@@ -0,0 +1,12 @@
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: pod-reader-binding
+subjects:
+- kind: ServiceAccount
+  name: default
+  namespace: daemon
+roleRef:
+  kind: ClusterRole
+  name: pod-reader
+  apiGroup: rbac.authorization.k8s.io
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index 4ac41e4..cc56a70 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
 from minifi.core.FileSystemObserver import FileSystemObserver
 from minifi.core.RemoteProcessGroup import RemoteProcessGroup
 from minifi.core.SSL_cert_utils import gen_cert, rsa_gen_key_callback, make_ca, make_cert, dump_certificate, dump_privatekey
@@ -5,6 +21,7 @@ from minifi.core.Funnel import Funnel
 
 from minifi.controllers.SSLContextService import SSLContextService
 from minifi.controllers.ODBCService import ODBCService
+from minifi.controllers.KubernetesControllerService import KubernetesControllerService
 
 from behave import given, then, when
 from behave.model_describe import ModelDescriptor
@@ -29,63 +46,56 @@ def step_impl(context, directory):
     context.test.add_file_system_observer(FileSystemObserver(context.test.docker_path_to_local_path(directory)))
 
 
-# MiNiFi cluster setups
-@given("a {processor_type} processor with the name \"{processor_name}\" and the \"{property}\" property set to \"{property_value}\" in a \"{minifi_container_name}\" flow")
-@given("a {processor_type} processor with the name \"{processor_name}\" and the \"{property}\" property set to \"{property_value}\" in the \"{minifi_container_name}\" flow")
-def step_impl(context, processor_type, processor_name, property, property_value, minifi_container_name):
-    container = context.test.acquire_container(minifi_container_name)
+def __create_processor(context, processor_type, processor_name, property_name, property_value, container_name, engine='minifi-cpp'):
+    container = context.test.acquire_container(container_name, engine)
     processor = locate("minifi.processors." + processor_type + "." + processor_type)()
     processor.set_name(processor_name)
-    if property:
-        processor.set_property(property, property_value)
+    if property_name is not None:
+        processor.set_property(property_name, property_value)
     context.test.add_node(processor)
     # Assume that the first node declared is primary unless specified otherwise
     if not container.get_start_nodes():
         container.add_start_node(processor)
 
 
-@given("a {processor_type} processor with the \"{property}\" property set to \"{property_value}\" in a \"{minifi_container_name}\" flow")
-@given("a {processor_type} processor with the \"{property}\" property set to \"{property_value}\" in the \"{minifi_container_name}\" flow")
-def step_impl(context, processor_type, property, property_value, minifi_container_name):
-    context.execute_steps("given a {processor_type} processor with the name \"{processor_name}\" and the \"{property}\" property set to \"{property_value}\" in the \"{minifi_container_name}\" flow".
-                          format(processor_type=processor_type, property=property, property_value=property_value, minifi_container_name=minifi_container_name, processor_name=processor_type))
+# MiNiFi cluster setups
+@given("a {processor_type} processor with the name \"{processor_name}\" and the \"{property_name}\" property set to \"{property_value}\" in a \"{minifi_container_name}\" flow")
+@given("a {processor_type} processor with the name \"{processor_name}\" and the \"{property_name}\" property set to \"{property_value}\" in the \"{minifi_container_name}\" flow")
+def step_impl(context, processor_type, processor_name, property_name, property_value, minifi_container_name):
+    __create_processor(context, processor_type, processor_name, property_name, property_value, minifi_container_name)
+
+
+@given("a {processor_type} processor with the \"{property_name}\" property set to \"{property_value}\" in a \"{minifi_container_name}\" flow")
+@given("a {processor_type} processor with the \"{property_name}\" property set to \"{property_value}\" in the \"{minifi_container_name}\" flow")
+def step_impl(context, processor_type, property_name, property_value, minifi_container_name):
+    __create_processor(context, processor_type, processor_type, property_name, property_value, minifi_container_name)
 
 
-@given("a {processor_type} processor with the \"{property}\" property set to \"{property_value}\"")
-def step_impl(context, processor_type, property, property_value):
-    context.execute_steps("given a {processor_type} processor with the \"{property}\" property set to \"{property_value}\" in the \"{minifi_container_name}\" flow".
-                          format(processor_type=processor_type, property=property, property_value=property_value, minifi_container_name="minifi-cpp-flow"))
+@given("a {processor_type} processor with the \"{property_name}\" property set to \"{property_value}\"")
+def step_impl(context, processor_type, property_name, property_value):
+    __create_processor(context, processor_type, processor_type, property_name, property_value, "minifi-cpp-flow")
 
 
-@given("a {processor_type} processor with the name \"{processor_name}\" and the \"{property}\" property set to \"{property_value}\"")
-def step_impl(context, processor_type, property, property_value, processor_name):
-    context.execute_steps("given a {processor_type} processor with the name \"{processor_name}\" and the \"{property}\" property set to \"{property_value}\" in a \"{minifi_container_name}\" flow".
-                          format(processor_type=processor_type, property=property, property_value=property_value, minifi_container_name="minifi-cpp-flow", processor_name=processor_name))
+@given("a {processor_type} processor with the name \"{processor_name}\" and the \"{property_name}\" property set to \"{property_value}\"")
+def step_impl(context, processor_type, property_name, property_value, processor_name):
+    __create_processor(context, processor_type, processor_name, property_name, property_value, "minifi-cpp-flow")
 
 
 @given("a {processor_type} processor with the name \"{processor_name}\" in the \"{minifi_container_name}\" flow")
 def step_impl(context, processor_type, processor_name, minifi_container_name):
-    container = context.test.acquire_container(minifi_container_name)
-    processor = locate("minifi.processors." + processor_type + "." + processor_type)()
-    processor.set_name(processor_name)
-    context.test.add_node(processor)
-    # Assume that the first node declared is primary unless specified otherwise
-    if not container.get_start_nodes():
-        container.add_start_node(processor)
+    __create_processor(context, processor_type, processor_name, None, None, minifi_container_name)
 
 
 @given("a {processor_type} processor with the name \"{processor_name}\"")
 def step_impl(context, processor_type, processor_name):
-    context.execute_steps("given a {processor_type} processor with the name \"{processor_name}\" in the \"{minifi_container_name}\" flow".
-                          format(processor_type=processor_type, processor_name=processor_name, minifi_container_name="minifi-cpp-flow"))
+    __create_processor(context, processor_type, processor_name, None, None, "minifi-cpp-flow")
 
 
 @given("a {processor_type} processor in the \"{minifi_container_name}\" flow")
 @given("a {processor_type} processor in a \"{minifi_container_name}\" flow")
 @given("a {processor_type} processor set up in a \"{minifi_container_name}\" flow")
 def step_impl(context, processor_type, minifi_container_name):
-    context.execute_steps("given a {processor_type} processor with the name \"{processor_name}\" in the \"{minifi_container_name}\" flow".
-                          format(processor_type=processor_type, processor_name=processor_type, minifi_container_name=minifi_container_name))
+    __create_processor(context, processor_type, processor_type, None, None, minifi_container_name)
 
 
 @given("a {processor_type} processor")
@@ -96,7 +106,13 @@ def step_impl(context, processor_type, minifi_container_name):
 @given("a {processor_type} processor set up to communicate with an MQTT broker instance")
 @given("a {processor_type} processor set up to communicate with the Splunk HEC instance")
 def step_impl(context, processor_type):
-    context.execute_steps("given a {processor_type} processor in the \"{minifi_container_name}\" flow".format(processor_type=processor_type, minifi_container_name="minifi-cpp-flow"))
+    __create_processor(context, processor_type, processor_type, None, None, "minifi-cpp-flow")
+
+
+@given("a {processor_type} processor in a Kubernetes cluster")
+@given("a {processor_type} processor in the Kubernetes cluster")
+def step_impl(context, processor_type):
+    __create_processor(context, processor_type, processor_type, None, None, "kubernetes", "kubernetes")
 
 
 @given("a set of processors in the \"{minifi_container_name}\" flow")
@@ -309,6 +325,26 @@ def step_impl(context, producer_name):
     producer.set_property("SSL Context Service", ssl_context_service.name)
 
 
+# Kubernetes
+def __set_up_the_kubernetes_controller_service(context, processor_name, service_property_name, properties):
+    kubernetes_controller_service = KubernetesControllerService("Kubernetes Controller Service", properties)
+    processor = context.test.get_node_by_name(processor_name)
+    processor.controller_services.append(kubernetes_controller_service)
+    processor.set_property(service_property_name, kubernetes_controller_service.name)
+
+
+@given("the {processor_name} processor has a {service_property_name} which is a Kubernetes Controller Service")
+@given("the {processor_name} processor has an {service_property_name} which is a Kubernetes Controller Service")
+def step_impl(context, processor_name, service_property_name):
+    __set_up_the_kubernetes_controller_service(context, processor_name, service_property_name, {})
+
+
+@given("the {processor_name} processor has a {service_property_name} which is a Kubernetes Controller Service with the \"{property_name}\" property set to \"{property_value}\"")
+@given("the {processor_name} processor has an {service_property_name} which is a Kubernetes Controller Service with the \"{property_name}\" property set to \"{property_value}\"")
+def step_impl(context, processor_name, service_property_name, property_name, property_value):
+    __set_up_the_kubernetes_controller_service(context, processor_name, service_property_name, {property_name: property_value})
+
+
 # Kafka setup
 @given("a kafka broker is set up in correspondence with the PublishKafka")
 @given("a kafka broker is set up in correspondence with the third-party kafka publisher")
@@ -553,6 +589,11 @@ def step_impl(context, num_flowfiles, duration):
     context.test.check_for_num_files_generated(int(num_flowfiles), timeparse(duration))
 
 
+@then("one flowfile with the contents \"{content}\" is placed in the monitored directory in less than {duration}")
+def step_impl(context, content, duration):
+    context.test.check_for_multiple_files_generated(1, timeparse(duration), [content])
+
+
 @then("two flowfiles with the contents \"{content_1}\" and \"{content_2}\" are placed in the monitored directory in less than {duration}")
 def step_impl(context, content_1, content_2, duration):
     context.test.check_for_multiple_files_generated(2, timeparse(duration), [content_1, content_2])