Posted to commits@nifi.apache.org by fg...@apache.org on 2023/01/24 13:36:11 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1983 Mount files in minifi test containers

This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 2dc589158 MINIFICPP-1983 Mount files in minifi test containers
2dc589158 is described below

commit 2dc589158c77bc9f0f8171026940be0f17745a55
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Thu Nov 17 17:27:25 2022 +0100

    MINIFICPP-1983 Mount files in minifi test containers
    
    Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
    This closes #1461
---
 .../integration/MiNiFi_integration_test_driver.py  | 146 ++++-----
 .../ContainerStore.py}                             | 134 ++++----
 .../test/integration/cluster/DockerCommunicator.py |  69 ++++
 .../test/integration/cluster/DockerTestCluster.py  | 267 ++++++++++++++++
 .../DockerTestDirectoryBindings.py                 |   5 +-
 .../{minifi/core => cluster}/ImageStore.py         |  65 +---
 .../{minifi/core => cluster}/KubernetesProxy.py    |   0
 .../{minifi/core => cluster}/LogSource.py          |   0
 docker/test/integration/cluster/__init__.py        |   0
 .../integration/cluster/checkers/AwsChecker.py     |  49 +++
 .../integration/cluster/checkers/AzureChecker.py   |  59 ++++
 .../cluster/checkers/ElasticSearchChecker.py       |  46 +++
 .../checkers/GcsChecker.py}                        |  17 +-
 .../checkers/PostgresChecker.py}                   |  14 +-
 .../core => cluster/checkers}/PrometheusChecker.py |  30 +-
 .../integration/cluster/checkers/SplunkChecker.py  |  80 +++++
 .../test/integration/cluster/checkers/__init__.py  |   0
 .../containers}/AzureStorageServerContainer.py     |   0
 .../core => cluster/containers}/Container.py       |  20 +-
 .../containers}/ElasticsearchContainer.py          |   0
 .../containers/FakeGcsServerContainer.py}          |  19 +-
 .../core => cluster/containers}/FlowContainer.py   |   0
 .../containers}/HttpProxyContainer.py              |   0
 .../containers}/KafkaBrokerContainer.py            |   0
 .../containers}/MinifiAsPodInKubernetesCluster.py  |  19 +-
 .../containers}/MinifiC2ServerContainer.py         |   0
 .../cluster/containers/MinifiContainer.py          | 125 ++++++++
 .../containers}/MqttBrokerContainer.py             |   0
 .../core => cluster/containers}/NifiContainer.py   |   2 +-
 .../containers}/OPCUAServerContainer.py            |   0
 .../containers}/OpensearchContainer.py             |   0
 .../containers}/PostgreSQLServerContainer.py       |   0
 .../containers}/PrometheusContainer.py             |   0
 .../containers}/S3ServerContainer.py               |   0
 .../core => cluster/containers}/SplunkContainer.py |   0
 .../containers/SyslogTcpClientContainer.py}        |  15 +-
 .../containers/SyslogUdpClientContainer.py}        |  15 +-
 .../containers/TcpClientContainer.py}              |  11 +-
 .../containers}/ZookeeperContainer.py              |   0
 .../integration/cluster/containers/__init__.py     |   0
 docker/test/integration/environment.py             |   6 +-
 .../features/attributes_to_json.feature            |  15 +
 .../integration/features/azure_storage.feature     |  15 +
 .../features/core_functionality.feature            |  21 +-
 .../features/defragtextflowfiles.feature           |  15 +
 .../features/google_cloud_storage.feature          |  15 +
 .../test/integration/features/hashcontent.feature  |  15 +
 docker/test/integration/features/http.feature      |  15 +
 docker/test/integration/features/https.feature     |  15 +
 docker/test/integration/features/kafka.feature     |  15 +
 .../integration/features/minifi_c2_server.feature  |  16 +
 .../integration/features/network_listener.feature  |  15 +
 .../test/integration/features/prometheus.feature   |  17 +
 .../integration/features/syslog_listener.feature   |  15 +
 .../FileSystemObserver.py                          |   0
 .../OutputEventHandler.py                          |   2 +-
 .../integration/filesystem_validation/__init__.py  |   0
 .../controllers/ElasticsearchCredentialsService.py |  14 +
 .../controllers/GCPCredentialsControllerService.py |  14 +
 docker/test/integration/minifi/core/Cluster.py     |  53 ----
 .../integration/minifi/core/DockerTestCluster.py   | 347 ---------------------
 .../minifi/core/FakeGcsServerContainer.py          |  27 --
 .../integration/minifi/core/MinifiContainer.py     |  74 -----
 .../minifi/core/SyslogTcpClientContainer.py        |  23 --
 .../minifi/core/SyslogUdpClientContainer.py        |  23 --
 .../integration/minifi/core/TcpClientContainer.py  |  23 --
 .../minifi/core/TransientMinifiContainer.py        |  24 --
 .../minifi/processors/DeleteGCSObject.py           |  14 +
 .../minifi/processors/FetchGCSObject.py            |  14 +
 .../minifi/processors/ListAzureBlobStorage.py      |  14 +
 .../integration/minifi/processors/ListGCSBucket.py |  14 +
 .../integration/minifi/processors/ListenSyslog.py  |  14 +
 .../integration/minifi/processors/ListenTCP.py     |  14 +
 .../minifi/processors/LogOnDestructionProcessor.py |  14 +
 .../integration/minifi/processors/PutGCSObject.py  |  14 +
 .../minifi/validators/FileOutputValidator.py       |   2 +-
 .../minifi/validators/MultiFileOutputValidator.py  |   2 +-
 .../validators/SingleJSONFileOutputValidator.py    |   2 +-
 .../kubernetes/pods-etc/minifi.test-pod.yml        |   2 +-
 .../resources/minifi/minifi-log.properties         |   4 +
 .../integration/resources/minifi/minifi.properties |  17 +
 .../minifi.properties                              |  29 --
 .../{minifi/core => ssl_utils}/SSL_cert_utils.py   |   0
 docker/test/integration/ssl_utils/__init__.py      |   0
 docker/test/integration/steps/steps.py             |  30 +-
 docker/test/integration/{minifi/core => }/utils.py |  12 +
 86 files changed, 1340 insertions(+), 913 deletions(-)

diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py
index f16a29d32..e3153dffe 100644
--- a/docker/test/integration/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -22,7 +22,7 @@ from pydoc import locate
 
 from minifi.core.InputPort import InputPort
 
-from minifi.core.DockerTestCluster import DockerTestCluster
+from cluster.DockerTestCluster import DockerTestCluster
 
 from minifi.validators.EmptyFilesOutPutValidator import EmptyFilesOutPutValidator
 from minifi.validators.NoFileOutPutValidator import NoFileOutPutValidator
@@ -32,8 +32,7 @@ from minifi.validators.SingleOrMultiFileOutputValidator import SingleOrMultiFile
 from minifi.validators.NoContentCheckFileNumberValidator import NoContentCheckFileNumberValidator
 from minifi.validators.NumFileRangeValidator import NumFileRangeValidator
 from minifi.validators.SingleJSONFileOutputValidator import SingleJSONFileOutputValidator
-
-from minifi.core.utils import decode_escaped_str, get_minifi_pid, get_peak_memory_usage
+from utils import decode_escaped_str, get_minifi_pid, get_peak_memory_usage
 
 
 class MiNiFi_integration_test:
@@ -58,57 +57,49 @@ class MiNiFi_integration_test:
     def acquire_container(self, name, engine='minifi-cpp', command=None):
         return self.cluster.acquire_container(name, engine, command)
 
-    def wait_for_container_startup_to_finish(self, container_name):
-        startup_success = self.cluster.wait_for_startup_log(container_name, 120)
-        if not startup_success:
-            logging.error("Cluster startup failed for %s", container_name)
-            self.cluster.log_app_output()
-        return startup_success
-
     def start_kafka_broker(self):
         self.cluster.acquire_container('kafka-broker', 'kafka-broker')
-        self.cluster.deploy('zookeeper')
-        self.cluster.deploy('kafka-broker')
-        assert self.wait_for_container_startup_to_finish('kafka-broker')
+        self.cluster.deploy_container('zookeeper')
+        self.cluster.deploy_container('kafka-broker')
+        assert self.cluster.wait_for_container_startup_to_finish('kafka-broker')
 
     def start_splunk(self):
         self.cluster.acquire_container('splunk', 'splunk')
-        self.cluster.deploy('splunk')
-        assert self.wait_for_container_startup_to_finish('splunk')
+        self.cluster.deploy_container('splunk')
+        assert self.cluster.wait_for_container_startup_to_finish('splunk')
         assert self.cluster.enable_splunk_hec_indexer('splunk', 'splunk_hec_token')
 
     def start_elasticsearch(self):
         self.cluster.acquire_container('elasticsearch', 'elasticsearch')
-        self.cluster.deploy('elasticsearch')
-        assert self.wait_for_container_startup_to_finish('elasticsearch')
+        self.cluster.deploy_container('elasticsearch')
+        assert self.cluster.wait_for_container_startup_to_finish('elasticsearch')
 
     def start_opensearch(self):
         self.cluster.acquire_container('opensearch', 'opensearch')
-        self.cluster.deploy('opensearch')
-        assert self.wait_for_container_startup_to_finish('opensearch')
+        self.cluster.deploy_container('opensearch')
+        assert self.cluster.wait_for_container_startup_to_finish('opensearch')
 
     def start(self, container_name=None):
         if container_name is not None:
             logging.info("Starting container %s", container_name)
-            self.cluster.deploy_flow(container_name)
-            assert self.wait_for_container_startup_to_finish(container_name)
+            self.cluster.deploy_container(container_name)
+            assert self.cluster.wait_for_container_startup_to_finish(container_name)
             return
         logging.info("MiNiFi_integration_test start")
-        self.cluster.deploy_flow()
-        for container_name in self.cluster.containers:
-            assert self.wait_for_container_startup_to_finish(container_name)
+        self.cluster.deploy_all()
+        assert self.cluster.wait_for_all_containers_to_finish_startup()
 
     def stop(self, container_name):
         logging.info("Stopping container %s", container_name)
-        self.cluster.stop_flow(container_name)
+        self.cluster.stop_container(container_name)
 
     def kill(self, container_name):
         logging.info("Killing container %s", container_name)
-        self.cluster.kill_flow(container_name)
+        self.cluster.kill_container(container_name)
 
     def restart(self, container_name):
         logging.info("Restarting container %s", container_name)
-        self.cluster.restart_flow(container_name)
+        self.cluster.restart_container(container_name)
 
     def add_node(self, processor):
         if processor.get_name() in (elem.get_name() for elem in self.connectable_nodes):
@@ -210,116 +201,117 @@ class MiNiFi_integration_test:
 
     def __check_output(self, timeout_seconds, output_validator, max_files=0):
         result = self.file_system_observer.validate_output(timeout_seconds, output_validator, max_files)
-        self.cluster.log_app_output()
-        assert not self.cluster.segfault_happened()
-        assert result
+        assert not self.cluster.segfault_happened() or self.cluster.log_app_output()
+        assert result or self.cluster.log_app_output()
 
     def __validate(self, validator):
-        self.cluster.log_app_output()
-        assert not self.cluster.segfault_happened()
-        assert validator.validate()
+        assert not self.cluster.segfault_happened() or self.cluster.log_app_output()
+        assert validator.validate() or self.cluster.log_app_output()
 
     def check_s3_server_object_data(self, s3_container_name, object_data):
-        assert self.cluster.check_s3_server_object_data(s3_container_name, object_data)
+        assert self.cluster.check_s3_server_object_data(s3_container_name, object_data) or self.cluster.log_app_output()
 
     def check_s3_server_object_metadata(self, s3_container_name, content_type):
-        assert self.cluster.check_s3_server_object_metadata(s3_container_name, content_type)
+        assert self.cluster.check_s3_server_object_metadata(s3_container_name, content_type) or self.cluster.log_app_output()
 
     def check_empty_s3_bucket(self, s3_container_name):
-        assert self.cluster.is_s3_bucket_empty(s3_container_name)
+        assert self.cluster.is_s3_bucket_empty(s3_container_name) or self.cluster.log_app_output()
 
     def check_http_proxy_access(self, http_proxy_container_name, url):
-        assert self.cluster.check_http_proxy_access(http_proxy_container_name, url)
+        assert self.cluster.check_http_proxy_access(http_proxy_container_name, url) or self.cluster.log_app_output()
 
     def check_azure_storage_server_data(self, azure_container_name, object_data):
-        assert self.cluster.check_azure_storage_server_data(azure_container_name, object_data)
+        assert self.cluster.check_azure_storage_server_data(azure_container_name, object_data) or self.cluster.log_app_output()
 
     def wait_for_kafka_consumer_to_be_registered(self, kafka_container_name):
-        assert self.cluster.wait_for_kafka_consumer_to_be_registered(kafka_container_name)
+        assert self.cluster.wait_for_kafka_consumer_to_be_registered(kafka_container_name) or self.cluster.log_app_output()
 
     def check_splunk_event(self, splunk_container_name, query):
-        assert self.cluster.check_splunk_event(splunk_container_name, query)
+        assert self.cluster.check_splunk_event(splunk_container_name, query) or self.cluster.log_app_output()
 
     def check_splunk_event_with_attributes(self, splunk_container_name, query, attributes):
-        assert self.cluster.check_splunk_event_with_attributes(splunk_container_name, query, attributes)
+        assert self.cluster.check_splunk_event_with_attributes(splunk_container_name, query, attributes) or self.cluster.log_app_output()
 
     def check_google_cloud_storage(self, gcs_container_name, content):
-        assert self.cluster.check_google_cloud_storage(gcs_container_name, content)
+        assert self.cluster.check_google_cloud_storage(gcs_container_name, content) or self.cluster.log_app_output()
 
     def check_empty_gcs_bucket(self, gcs_container_name):
-        assert self.cluster.is_gcs_bucket_empty(gcs_container_name)
+        assert self.cluster.is_gcs_bucket_empty(gcs_container_name) or self.cluster.log_app_output()
 
     def check_empty_elastic(self, elastic_container_name):
-        assert self.cluster.is_elasticsearch_empty(elastic_container_name)
+        assert self.cluster.is_elasticsearch_empty(elastic_container_name) or self.cluster.log_app_output()
 
     def elastic_generate_apikey(self, elastic_container_name):
-        return self.cluster.elastic_generate_apikey(elastic_container_name)
+        return self.cluster.elastic_generate_apikey(elastic_container_name) or self.cluster.log_app_output()
 
     def create_doc_elasticsearch(self, elastic_container_name, index_name, doc_id):
-        assert self.cluster.create_doc_elasticsearch(elastic_container_name, index_name, doc_id)
+        assert self.cluster.create_doc_elasticsearch(elastic_container_name, index_name, doc_id) or self.cluster.log_app_output()
 
     def check_elastic_field_value(self, elastic_container_name, index_name, doc_id, field_name, field_value):
-        assert self.cluster.check_elastic_field_value(elastic_container_name, index_name, doc_id, field_name, field_value)
+        assert self.cluster.check_elastic_field_value(elastic_container_name, index_name, doc_id, field_name, field_value) or self.cluster.log_app_output()
 
     def add_elastic_user_to_opensearch(self, container_name):
-        assert self.cluster.add_elastic_user_to_opensearch(container_name)
+        assert self.cluster.add_elastic_user_to_opensearch(container_name) or self.cluster.log_app_output()
 
     def check_minifi_log_contents(self, line, timeout_seconds=60, count=1):
         self.check_container_log_contents("minifi-cpp", line, timeout_seconds, count)
 
     def check_minifi_log_matches_regex(self, regex, timeout_seconds=60, count=1):
-        for container in self.cluster.containers.values():
-            if container.get_engine() == "minifi-cpp":
-                line_found = self.cluster.wait_for_app_logs_regex(container.get_name(), regex, timeout_seconds, count)
-                if line_found:
-                    return
-        assert False
+        assert self.cluster.check_minifi_log_matches_regex(regex, timeout_seconds, count) or self.cluster.log_app_output()
 
     def check_container_log_contents(self, container_engine, line, timeout_seconds=60, count=1):
-        for container in self.cluster.containers.values():
-            if container.get_engine() == container_engine:
-                line_found = self.cluster.wait_for_app_logs(container.get_name(), line, timeout_seconds, count)
-                if line_found:
-                    return
-        assert False
+        assert self.cluster.check_container_log_contents(container_engine, line, timeout_seconds, count) or self.cluster.log_app_output()
 
     def check_minifi_log_does_not_contain(self, line, wait_time_seconds):
-        time.sleep(wait_time_seconds)
-        for container in self.cluster.containers.values():
-            if container.get_engine() == "minifi-cpp":
-                _, logs = self.cluster.get_app_log(container.get_name())
-                if logs is not None and 1 <= logs.decode("utf-8").count(line):
-                    assert False
+        assert self.cluster.check_minifi_log_does_not_contain(line, wait_time_seconds) or self.cluster.log_app_output()
 
     def check_query_results(self, postgresql_container_name, query, number_of_rows, timeout_seconds):
-        assert self.cluster.check_query_results(postgresql_container_name, query, number_of_rows, timeout_seconds)
+        assert self.cluster.check_query_results(postgresql_container_name, query, number_of_rows, timeout_seconds) or self.cluster.log_app_output()
 
     def check_container_log_matches_regex(self, container_name, log_pattern, timeout_seconds, count=1):
-        assert self.cluster.wait_for_app_logs_regex(container_name, log_pattern, timeout_seconds, count)
+        assert self.cluster.wait_for_app_logs_regex(container_name, log_pattern, timeout_seconds, count) or self.cluster.log_app_output()
 
     def add_test_blob(self, blob_name, content, with_snapshot):
         self.cluster.add_test_blob(blob_name, content, with_snapshot)
 
     def check_azure_blob_storage_is_empty(self, timeout_seconds):
-        assert self.cluster.check_azure_blob_storage_is_empty(timeout_seconds)
+        assert self.cluster.check_azure_blob_storage_is_empty(timeout_seconds) or self.cluster.log_app_output()
 
     def check_azure_blob_and_snapshot_count(self, blob_and_snapshot_count, timeout_seconds):
-        assert self.cluster.check_azure_blob_and_snapshot_count(blob_and_snapshot_count, timeout_seconds)
+        assert self.cluster.check_azure_blob_and_snapshot_count(blob_and_snapshot_count, timeout_seconds) or self.cluster.log_app_output()
 
     def check_metric_class_on_prometheus(self, metric_class, timeout_seconds):
-        assert self.cluster.wait_for_metric_class_on_prometheus(metric_class, timeout_seconds)
+        assert self.cluster.wait_for_metric_class_on_prometheus(metric_class, timeout_seconds) or self.cluster.log_app_output()
 
     def check_processor_metric_on_prometheus(self, metric_class, timeout_seconds, processor_name):
-        assert self.cluster.wait_for_processor_metric_on_prometheus(metric_class, timeout_seconds, processor_name)
+        assert self.cluster.wait_for_processor_metric_on_prometheus(metric_class, timeout_seconds, processor_name) or self.cluster.log_app_output()
 
     def check_if_peak_memory_usage_exceeded(self, minimum_peak_memory_usage: int, timeout_seconds: int) -> None:
-        assert self.cluster.wait_for_peak_memory_usage_to_exceed(minimum_peak_memory_usage, timeout_seconds)
+        assert self.cluster.wait_for_peak_memory_usage_to_exceed(minimum_peak_memory_usage, timeout_seconds) or self.cluster.log_app_output()
 
     def check_if_memory_usage_is_below(self, maximum_memory_usage: int, timeout_seconds: int) -> None:
-        assert self.cluster.wait_for_memory_usage_to_drop_below(maximum_memory_usage, timeout_seconds)
+        assert self.cluster.wait_for_memory_usage_to_drop_below(maximum_memory_usage, timeout_seconds) or self.cluster.log_app_output()
 
     def check_memory_usage_compared_to_peak(self, peak_multiplier: float, timeout_seconds: int) -> None:
         peak_memory = get_peak_memory_usage(get_minifi_pid())
-        assert (peak_memory is not None)
-        assert (1.0 > peak_multiplier > 0.0)
-        assert self.cluster.wait_for_memory_usage_to_drop_below(peak_memory * peak_multiplier, timeout_seconds)
+        assert (peak_memory is not None) or self.cluster.log_app_output()
+        assert (1.0 > peak_multiplier > 0.0) or self.cluster.log_app_output()
+        assert self.cluster.wait_for_memory_usage_to_drop_below(peak_memory * peak_multiplier, timeout_seconds) or self.cluster.log_app_output()
+
+    def enable_provenance_repository_in_minifi(self):
+        self.cluster.enable_provenance_repository_in_minifi()
+
+    def enable_c2_in_minifi(self):
+        self.cluster.enable_c2_in_minifi()
+
+    def enable_c2_with_ssl_in_minifi(self):
+        self.cluster.enable_c2_with_ssl_in_minifi()
+
+    def enable_prometheus_in_minifi(self):
+        self.cluster.enable_prometheus_in_minifi()
+
+    def enable_splunk_hec_ssl(self, container_name, splunk_cert_pem, splunk_key_pem, root_ca_cert_pem):
+        self.cluster.enable_splunk_hec_ssl(container_name, splunk_cert_pem, splunk_key_pem, root_ca_cert_pem)
+
+    def enable_sql_in_minifi(self):
+        self.cluster.enable_sql_in_minifi()
diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py b/docker/test/integration/cluster/ContainerStore.py
similarity index 65%
rename from docker/test/integration/minifi/core/SingleNodeDockerCluster.py
rename to docker/test/integration/cluster/ContainerStore.py
index aca62ee05..867e41076 100644
--- a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
+++ b/docker/test/integration/cluster/ContainerStore.py
@@ -12,54 +12,39 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
-import docker
-import logging
 import uuid
-
-from .Cluster import Cluster
-from .MinifiContainer import MinifiContainer
-from .TransientMinifiContainer import TransientMinifiContainer
-from .MinifiWithProvenanceRepoContainer import MinifiWithProvenanceRepoContainer
-from .NifiContainer import NifiContainer
-from .ZookeeperContainer import ZookeeperContainer
-from .KafkaBrokerContainer import KafkaBrokerContainer
-from .S3ServerContainer import S3ServerContainer
-from .AzureStorageServerContainer import AzureStorageServerContainer
-from .FakeGcsServerContainer import FakeGcsServerContainer
-from .HttpProxyContainer import HttpProxyContainer
-from .PostgreSQLServerContainer import PostgreSQLServerContainer
-from .MqttBrokerContainer import MqttBrokerContainer
-from .OPCUAServerContainer import OPCUAServerContainer
-from .SplunkContainer import SplunkContainer
-from .ElasticsearchContainer import ElasticsearchContainer
-from .OpensearchContainer import OpensearchContainer
-from .SyslogUdpClientContainer import SyslogUdpClientContainer
-from .SyslogTcpClientContainer import SyslogTcpClientContainer
-from .MinifiAsPodInKubernetesCluster import MinifiAsPodInKubernetesCluster
-from .TcpClientContainer import TcpClientContainer
-from .PrometheusContainer import PrometheusContainer
-from .MinifiC2ServerContainer import MinifiC2ServerContainer
-from .MinifiWithHttpsC2Config import MinifiWithHttpsC2Config
-
-
-class SingleNodeDockerCluster(Cluster):
-    """
-    A "cluster" which consists of a single docker node. Useful for
-    testing or use-cases which do not span multiple compute nodes.
-    """
-
-    def __init__(self, context):
-        self.vols = {}
-        self.network = self.create_docker_network()
+import logging
+from .containers.MinifiContainer import MinifiOptions
+from .containers.MinifiContainer import MinifiContainer
+from .containers.NifiContainer import NifiContainer
+from .containers.ZookeeperContainer import ZookeeperContainer
+from .containers.KafkaBrokerContainer import KafkaBrokerContainer
+from .containers.S3ServerContainer import S3ServerContainer
+from .containers.AzureStorageServerContainer import AzureStorageServerContainer
+from .containers.FakeGcsServerContainer import FakeGcsServerContainer
+from .containers.HttpProxyContainer import HttpProxyContainer
+from .containers.PostgreSQLServerContainer import PostgreSQLServerContainer
+from .containers.MqttBrokerContainer import MqttBrokerContainer
+from .containers.OPCUAServerContainer import OPCUAServerContainer
+from .containers.SplunkContainer import SplunkContainer
+from .containers.ElasticsearchContainer import ElasticsearchContainer
+from .containers.OpensearchContainer import OpensearchContainer
+from .containers.SyslogUdpClientContainer import SyslogUdpClientContainer
+from .containers.SyslogTcpClientContainer import SyslogTcpClientContainer
+from .containers.MinifiAsPodInKubernetesCluster import MinifiAsPodInKubernetesCluster
+from .containers.TcpClientContainer import TcpClientContainer
+from .containers.PrometheusContainer import PrometheusContainer
+from .containers.MinifiC2ServerContainer import MinifiC2ServerContainer
+
+
+class ContainerStore:
+    def __init__(self, network, image_store, kubernetes_proxy):
+        self.minifi_options = MinifiOptions()
         self.containers = {}
-        self.image_store = context.image_store
         self.data_directories = {}
-        self.kubernetes_proxy = context.kubernetes_proxy
-
-        # Get docker client
-        self.client = docker.from_env()
+        self.network = network
+        self.image_store = image_store
+        self.kubernetes_proxy = kubernetes_proxy
 
     def __del__(self):
         self.cleanup()
@@ -79,12 +64,6 @@ class SingleNodeDockerCluster(Cluster):
         for container in self.containers.values():
             container.vols = self.vols
 
-    @staticmethod
-    def create_docker_network():
-        net_name = 'minifi_integration_test_network-' + str(uuid.uuid4())
-        logging.debug('Creating network: %s', net_name)
-        return docker.from_env().networks.create(net_name)
-
     def acquire_container(self, name, engine='minifi-cpp', command=None):
         if name is not None and name in self.containers:
             return self.containers[name]
@@ -96,15 +75,9 @@ class SingleNodeDockerCluster(Cluster):
         if engine == 'nifi':
             return self.containers.setdefault(name, NifiContainer(self.data_directories["nifi_config_dir"], name, self.vols, self.network, self.image_store, command))
         elif engine == 'minifi-cpp':
-            return self.containers.setdefault(name, MinifiContainer(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
+            return self.containers.setdefault(name, MinifiContainer(self.data_directories["minifi_config_dir"], self.minifi_options, name, self.vols, self.network, self.image_store, command))
         elif engine == 'kubernetes':
-            return self.containers.setdefault(name, MinifiAsPodInKubernetesCluster(self.kubernetes_proxy, self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
-        elif engine == 'transient-minifi':
-            return self.containers.setdefault(name, TransientMinifiContainer(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
-        elif engine == 'minifi-cpp-with-provenance-repo':
-            return self.containers.setdefault(name, MinifiWithProvenanceRepoContainer(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
-        elif engine == 'minifi-cpp-with-https-c2-config':
-            return self.containers.setdefault(name, MinifiWithHttpsC2Config(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
+            return self.containers.setdefault(name, MinifiAsPodInKubernetesCluster(self.kubernetes_proxy, self.data_directories["kubernetes_config_dir"], self.minifi_options, name, self.vols, self.network, self.image_store, command))
         elif engine == 'kafka-broker':
             if 'zookeeper' not in self.containers:
                 self.containers.setdefault('zookeeper', ZookeeperContainer('zookeeper', self.vols, self.network, self.image_store, command))
@@ -144,36 +117,57 @@ class SingleNodeDockerCluster(Cluster):
         else:
             raise Exception('invalid flow engine: \'%s\'' % engine)
 
-    def deploy(self, name):
+    def deploy_container(self, name):
         if name is None or name not in self.containers:
             raise Exception('Invalid container to deploy: \'%s\'' % name)
 
         self.containers[name].deploy()
 
-    def deploy_flow(self, container_name=None):
-        if container_name is not None:
-            if container_name not in self.containers:
-                logging.error('Could not start container because it is not found: \'%s\'', container_name)
-                return
-            self.containers[container_name].deploy()
-            return
+    def deploy_all(self):
         for container in self.containers.values():
             container.deploy()
 
-    def stop_flow(self, container_name):
+    def stop_container(self, container_name):
         if container_name not in self.containers:
             logging.error('Could not stop container because it is not found: \'%s\'', container_name)
             return
         self.containers[container_name].stop()
 
-    def kill_flow(self, container_name):
+    def kill_container(self, container_name):
         if container_name not in self.containers:
             logging.error('Could not kill container because it is not found: \'%s\'', container_name)
             return
         self.containers[container_name].kill()
 
-    def restart_flow(self, container_name):
+    def restart_container(self, container_name):
         if container_name not in self.containers:
             logging.error('Could not restart container because it is not found: \'%s\'', container_name)
             return
         self.containers[container_name].restart()
+
+    def enable_provenance_repository_in_minifi(self):
+        self.minifi_options.enable_provenance = True
+
+    def enable_c2_in_minifi(self):
+        self.minifi_options.enable_c2 = True
+
+    def enable_c2_with_ssl_in_minifi(self):
+        self.minifi_options.enable_c2_with_ssl = True
+
+    def enable_prometheus_in_minifi(self):
+        self.minifi_options.enable_prometheus = True
+
+    def enable_sql_in_minifi(self):
+        self.minifi_options.enable_sql = True
+
+    def get_startup_finished_log_entry(self, container_name):
+        return self.containers[container_name].get_startup_finished_log_entry()
+
+    def log_source(self, container_name):
+        return self.containers[container_name].log_source()
+
+    def get_app_log(self, container_name):
+        return self.containers[container_name].get_app_log()
+
+    def get_container_names(self, engine=None):
+        return [key for key in self.containers.keys() if not engine or self.containers[key].get_engine() == engine]
diff --git a/docker/test/integration/cluster/DockerCommunicator.py b/docker/test/integration/cluster/DockerCommunicator.py
new file mode 100644
index 000000000..e0fa305d0
--- /dev/null
+++ b/docker/test/integration/cluster/DockerCommunicator.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import docker
+import uuid
+import logging
+import sys
+import tempfile
+import tarfile
+import os
+import io
+
+
+class DockerCommunicator:
+    def __init__(self):
+        self.client = docker.from_env()
+
+    def create_docker_network(self):
+        net_name = 'minifi_integration_test_network-' + str(uuid.uuid4())
+        logging.debug('Creating network: %s', net_name)
+        return self.client.networks.create(net_name)
+
+    @staticmethod
+    def get_stdout_encoding():
+        # Fall back to UTF-8 when sys.stdout has no usable encoding (e.g. explicitly
+        # piped output, and some CI environments such as GitHub Actions).
+        encoding = getattr(sys.stdout, "encoding", None)
+        if encoding is None:
+            encoding = "utf8"
+        return encoding
+
+    def execute_command(self, container_name, command):
+        (code, output) = self.client.containers.get(container_name).exec_run(command)
+        return (code, output.decode(self.get_stdout_encoding()))
+
+    def get_app_log_from_docker_container(self, container_name):
+        try:
+            container = self.client.containers.get(container_name)
+        except Exception:
+            return 'not started', None
+
+        if b'Segmentation fault' in container.logs():
+            logging.warning('Container segfaulted: %s', container.name)
+            self.segfault = True
+
+        return container.status, container.logs()
+
+    def __put_archive(self, container_name, path, data):
+        return self.client.containers.get(container_name).put_archive(path, data)
+
+    def write_content_to_container(self, content, container_name, dst_path):
+        with tempfile.TemporaryDirectory() as td:
+            with tarfile.open(os.path.join(td, 'content.tar'), mode='w') as tar:
+                info = tarfile.TarInfo(name=os.path.basename(dst_path))
+                info.size = len(content)
+                tar.addfile(info, io.BytesIO(content.encode('utf-8')))
+            with open(os.path.join(td, 'content.tar'), 'rb') as data:
+                return self.__put_archive(container_name, os.path.dirname(dst_path), data.read())
diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py
new file mode 100644
index 000000000..9688c732e
--- /dev/null
+++ b/docker/test/integration/cluster/DockerTestCluster.py
@@ -0,0 +1,267 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import time
+import re
+
+from .LogSource import LogSource
+from .ContainerStore import ContainerStore
+from .DockerCommunicator import DockerCommunicator
+from .checkers.AwsChecker import AwsChecker
+from .checkers.AzureChecker import AzureChecker
+from .checkers.ElasticSearchChecker import ElasticSearchChecker
+from .checkers.GcsChecker import GcsChecker
+from .checkers.PostgresChecker import PostgresChecker
+from .checkers.PrometheusChecker import PrometheusChecker
+from .checkers.SplunkChecker import SplunkChecker
+from utils import get_peak_memory_usage, get_minifi_pid, get_memory_usage
+
+
+class DockerTestCluster:
+    def __init__(self, context):
+        self.segfault = False
+        self.vols = {}
+        self.container_communicator = DockerCommunicator()
+        self.container_store = ContainerStore(self.container_communicator.create_docker_network(), context.image_store, context.kubernetes_proxy)
+        self.aws_checker = AwsChecker(self.container_communicator)
+        self.azure_checker = AzureChecker(self.container_communicator)
+        self.elastic_search_checker = ElasticSearchChecker(self.container_communicator)
+        self.gcs_checker = GcsChecker(self.container_communicator)
+        self.postgres_checker = PostgresChecker(self.container_communicator)
+        self.splunk_checker = SplunkChecker(self.container_communicator)
+        self.prometheus_checker = PrometheusChecker()
+
+    def __del__(self):
+        self.cleanup()
+
+    def cleanup(self):
+        self.container_store.cleanup()
+
+    def set_directory_bindings(self, volumes, data_directories):
+        self.container_store.set_directory_bindings(volumes, data_directories)
+
+    def acquire_container(self, name, engine='minifi-cpp', command=None):
+        return self.container_store.acquire_container(name, engine, command)
+
+    def deploy_container(self, name):
+        self.container_store.deploy_container(name)
+
+    def deploy_all(self):
+        self.container_store.deploy_all()
+
+    def stop_container(self, container_name):
+        self.container_store.stop_container(container_name)
+
+    def kill_container(self, container_name):
+        self.container_store.kill_container(container_name)
+
+    def restart_container(self, container_name):
+        self.container_store.restart_container(container_name)
+
+    def enable_provenance_repository_in_minifi(self):
+        self.container_store.enable_provenance_repository_in_minifi()
+
+    def enable_c2_in_minifi(self):
+        self.container_store.enable_c2_in_minifi()
+
+    def enable_c2_with_ssl_in_minifi(self):
+        self.container_store.enable_c2_with_ssl_in_minifi()
+
+    def enable_prometheus_in_minifi(self):
+        self.container_store.enable_prometheus_in_minifi()
+
+    def enable_sql_in_minifi(self):
+        self.container_store.enable_sql_in_minifi()
+
+    def get_app_log(self, container_name):
+        log_source = self.container_store.log_source(container_name)
+        if log_source == LogSource.FROM_DOCKER_CONTAINER:
+            return self.container_communicator.get_app_log_from_docker_container(container_name)
+        elif log_source == LogSource.FROM_GET_APP_LOG_METHOD:
+            return self.container_store.get_app_log(container_name)
+        else:
+            raise Exception("Unexpected log source '%s'" % log_source)
+
+    def __wait_for_app_logs_impl(self, container_name, log_entry, timeout_seconds, count, use_regex):
+        wait_start_time = time.perf_counter()
+        while True:
+            logging.info('Waiting for app-logs `%s` in container `%s`', log_entry, container_name)
+            status, logs = self.get_app_log(container_name)
+            if logs is not None:
+                if not use_regex and logs.decode("utf-8").count(log_entry) >= count:
+                    return True
+                elif use_regex and len(re.findall(log_entry, logs.decode("utf-8"))) >= count:
+                    return True
+            elif status == 'exited':
+                return False
+            time.sleep(1)
+            if timeout_seconds < (time.perf_counter() - wait_start_time):
+                break
+        return False
+
+    def wait_for_app_logs_regex(self, container_name, log_entry, timeout_seconds, count=1):
+        return self.__wait_for_app_logs_impl(container_name, log_entry, timeout_seconds, count, True)
+
+    def wait_for_app_logs(self, container_name, log_entry, timeout_seconds, count=1):
+        return self.__wait_for_app_logs_impl(container_name, log_entry, timeout_seconds, count, False)
+
+    def wait_for_startup_log(self, container_name, timeout_seconds):
+        return self.wait_for_app_logs_regex(container_name, self.container_store.get_startup_finished_log_entry(container_name), timeout_seconds, 1)
+
+    def log_app_output(self):
+        for container_name in self.container_store.get_container_names():
+            _, logs = self.get_app_log(container_name)
+            if logs is not None:
+                logging.info("Logs of container '%s':", container_name)
+                for line in logs.decode("utf-8").splitlines():
+                    logging.info(line)
+
+    def check_http_proxy_access(self, container_name, url):
+        (code, output) = self.container_communicator.execute_command(container_name, ["cat", "/var/log/squid/access.log"])
+        return code == 0 and url in output \
+            and ((output.count("TCP_DENIED") != 0
+                 and output.count("TCP_MISS") >= output.count("TCP_DENIED"))
+                 or output.count("TCP_DENIED") == 0 and "TCP_MISS" in output)
+
+    def check_s3_server_object_data(self, container_name, test_data):
+        return self.aws_checker.check_s3_server_object_data(container_name, test_data)
+
+    def check_s3_server_object_metadata(self, container_name, content_type="application/octet-stream", metadata=dict()):
+        return self.aws_checker.check_s3_server_object_metadata(container_name, content_type, metadata)
+
+    def is_s3_bucket_empty(self, container_name):
+        return self.aws_checker.is_s3_bucket_empty(container_name)
+
+    def check_azure_storage_server_data(self, container_name, test_data):
+        return self.azure_checker.check_azure_storage_server_data(container_name, test_data)
+
+    def add_test_blob(self, blob_name, content="", with_snapshot=False):
+        return self.azure_checker.add_test_blob(blob_name, content, with_snapshot)
+
+    def check_azure_blob_and_snapshot_count(self, blob_and_snapshot_count, timeout_seconds):
+        return self.azure_checker.check_azure_blob_and_snapshot_count(blob_and_snapshot_count, timeout_seconds)
+
+    def check_azure_blob_storage_is_empty(self, timeout_seconds):
+        return self.azure_checker.check_azure_blob_storage_is_empty(timeout_seconds)
+
+    def check_splunk_event(self, container_name, query):
+        return self.splunk_checker.check_splunk_event(container_name, query)
+
+    def check_splunk_event_with_attributes(self, container_name, query, attributes):
+        return self.splunk_checker.check_splunk_event_with_attributes(container_name, query, attributes)
+
+    def enable_splunk_hec_indexer(self, container_name, hec_name):
+        return self.splunk_checker.enable_splunk_hec_indexer(container_name, hec_name)
+
+    def enable_splunk_hec_ssl(self, container_name, splunk_cert_pem, splunk_key_pem, root_ca_cert_pem):
+        return self.splunk_checker.enable_splunk_hec_ssl(container_name, splunk_cert_pem, splunk_key_pem, root_ca_cert_pem)
+
+    def check_google_cloud_storage(self, gcs_container_name, content):
+        return self.gcs_checker.check_google_cloud_storage(gcs_container_name, content)
+
+    def is_gcs_bucket_empty(self, container_name):
+        return self.gcs_checker.is_gcs_bucket_empty(container_name)
+
+    def is_elasticsearch_empty(self, container_name):
+        return self.elastic_search_checker.is_elasticsearch_empty(container_name)
+
+    def create_doc_elasticsearch(self, container_name, index_name, doc_id):
+        return self.elastic_search_checker.create_doc_elasticsearch(container_name, index_name, doc_id)
+
+    def check_elastic_field_value(self, container_name, index_name, doc_id, field_name, field_value):
+        return self.elastic_search_checker.check_elastic_field_value(container_name, index_name, doc_id, field_name, field_value)
+
+    def elastic_generate_apikey(self, elastic_container_name):
+        return self.elastic_search_checker.elastic_generate_apikey(elastic_container_name)
+
+    def add_elastic_user_to_opensearch(self, container_name):
+        return self.elastic_search_checker.add_elastic_user_to_opensearch(container_name)
+
+    def check_query_results(self, postgresql_container_name, query, number_of_rows, timeout_seconds):
+        return self.postgres_checker.check_query_results(postgresql_container_name, query, number_of_rows, timeout_seconds)
+
+    def segfault_happened(self):
+        return self.segfault
+
+    def wait_for_kafka_consumer_to_be_registered(self, kafka_container_name):
+        return self.wait_for_app_logs(kafka_container_name, "Assignment received from leader for group docker_test_group", 60)
+
+    def wait_for_metric_class_on_prometheus(self, metric_class, timeout_seconds):
+        return self.prometheus_checker.wait_for_metric_class_on_prometheus(metric_class, timeout_seconds)
+
+    def wait_for_processor_metric_on_prometheus(self, metric_class, timeout_seconds, processor_name):
+        return self.prometheus_checker.wait_for_processor_metric_on_prometheus(metric_class, timeout_seconds, processor_name)
+
+    def check_minifi_log_matches_regex(self, regex, timeout_seconds=60, count=1):
+        for container_name in self.container_store.get_container_names("minifi-cpp"):
+            line_found = self.wait_for_app_logs_regex(container_name, regex, timeout_seconds, count)
+            if line_found:
+                return True
+        return False
+
+    def check_container_log_contents(self, container_engine, line, timeout_seconds=60, count=1):
+        for container_name in self.container_store.get_container_names(container_engine):
+            line_found = self.wait_for_app_logs(container_name, line, timeout_seconds, count)
+            if line_found:
+                return True
+        return False
+
+    def check_minifi_log_does_not_contain(self, line, wait_time_seconds):
+        time.sleep(wait_time_seconds)
+        for container_name in self.container_store.get_container_names("minifi-cpp"):
+            _, logs = self.get_app_log(container_name)
+            if logs is not None and 1 <= logs.decode("utf-8").count(line):
+                return False
+        return True
+
+    def wait_for_container_startup_to_finish(self, container_name):
+        startup_success = self.wait_for_startup_log(container_name, 120)
+        if not startup_success:
+            logging.error("Cluster startup failed for %s", container_name)
+            self.log_app_output()
+        return startup_success
+
+    def wait_for_all_containers_to_finish_startup(self):
+        for container_name in self.container_store.get_container_names():
+            if not self.wait_for_container_startup_to_finish(container_name):
+                return False
+        return True
+
+    def wait_for_peak_memory_usage_to_exceed(self, minimum_peak_memory_usage: int, timeout_seconds: int) -> bool:
+        start_time = time.perf_counter()
+        while (time.perf_counter() - start_time) < timeout_seconds:
+            current_peak_memory_usage = get_peak_memory_usage(get_minifi_pid())
+            if current_peak_memory_usage is None:
+                logging.warning("Failed to determine peak memory usage")
+                return False
+            if current_peak_memory_usage > minimum_peak_memory_usage:
+                return True
+            time.sleep(1)
+        logging.warning(f"Peak memory usage ({current_peak_memory_usage}) didnt exceed minimum asserted peak memory usage {minimum_peak_memory_usage}")
+        return False
+
+    def wait_for_memory_usage_to_drop_below(self, max_memory_usage: int, timeout_seconds: int) -> bool:
+        start_time = time.perf_counter()
+        while (time.perf_counter() - start_time) < timeout_seconds:
+            current_memory_usage = get_memory_usage(get_minifi_pid())
+            if current_memory_usage is None:
+                logging.warning("Failed to determine memory usage")
+                return False
+            if current_memory_usage < max_memory_usage:
+                return True
+            current_memory_usage = get_memory_usage(get_minifi_pid())
+            time.sleep(1)
+        logging.warning(f"Memory usage ({current_memory_usage}) is more than the maximum asserted memory usage ({max_memory_usage})")
+        return False
diff --git a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py b/docker/test/integration/cluster/DockerTestDirectoryBindings.py
similarity index 94%
rename from docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
rename to docker/test/integration/cluster/DockerTestDirectoryBindings.py
index be39110d2..228e3837a 100644
--- a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
+++ b/docker/test/integration/cluster/DockerTestDirectoryBindings.py
@@ -43,7 +43,8 @@ class DockerTestDirectoryBindings:
             "resources_dir": "/tmp/.nifi-test-resources." + self.test_id,
             "minifi_config_dir": "/tmp/.nifi-test-minifi-config-dir." + self.test_id,
             "nifi_config_dir": "/tmp/.nifi-test-nifi-config-dir." + self.test_id,
-            "kubernetes_temp_dir": "/tmp/.nifi-test-kubernetes-temp-dir." + self.test_id
+            "kubernetes_temp_dir": "/tmp/.nifi-test-kubernetes-temp-dir." + self.test_id,
+            "kubernetes_config_dir": "/tmp/.nifi-test-kubernetes-config-dir." + self.test_id
         }
 
         [self.create_directory(directory) for directory in self.data_directories[self.test_id].values()]
@@ -57,6 +58,7 @@ class DockerTestDirectoryBindings:
         shutil.copytree(test_dir + "/resources/elasticsearch/certs", self.data_directories[self.test_id]["resources_dir"] + "/elasticsearch")
         shutil.copytree(test_dir + "/resources/opensearch/certs", self.data_directories[self.test_id]["resources_dir"] + "/opensearch")
         shutil.copytree(test_dir + "/resources/minifi-c2-server-ssl/certs", self.data_directories[self.test_id]["resources_dir"] + "/minifi-c2-server-ssl")
+        shutil.copytree(test_dir + "/resources/minifi", self.data_directories[self.test_id]["minifi_config_dir"], dirs_exist_ok=True)
 
     def get_data_directories(self, test_id):
         return self.data_directories[test_id]
@@ -89,6 +91,7 @@ class DockerTestDirectoryBindings:
         vols[self.data_directories[test_id]["resources_dir"]] = {"bind": "/tmp/resources", "mode": "rw"}
         vols[self.data_directories[test_id]["minifi_config_dir"]] = {"bind": "/tmp/minifi_config", "mode": "rw"}
         vols[self.data_directories[test_id]["nifi_config_dir"]] = {"bind": "/tmp/nifi_config", "mode": "rw"}
+        vols[self.data_directories[test_id]["kubernetes_config_dir"]] = {"bind": "/tmp/kubernetes_config", "mode": "rw"}
         return vols
 
     @staticmethod
diff --git a/docker/test/integration/minifi/core/ImageStore.py b/docker/test/integration/cluster/ImageStore.py
similarity index 71%
rename from docker/test/integration/minifi/core/ImageStore.py
rename to docker/test/integration/cluster/ImageStore.py
index 20d212c69..4a012c867 100644
--- a/docker/test/integration/minifi/core/ImageStore.py
+++ b/docker/test/integration/cluster/ImageStore.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 
-from .MinifiContainer import MinifiContainer
+from .containers.MinifiContainer import MinifiContainer
 import logging
 import tarfile
 import docker
@@ -42,12 +42,8 @@ class ImageStore:
         if container_engine in self.images:
             return self.images[container_engine]
 
-        if container_engine == "minifi-cpp" or container_engine == "transient-minifi":
-            image = self.__build_minifi_cpp_image()
-        elif container_engine == "minifi-cpp-with-provenance-repo":
-            image = self.__build_minifi_cpp_image_with_provenance_repo()
-        elif container_engine == "minifi-cpp-with-https-c2-config":
-            image = self.__build_minifi_cpp_image_with_https_c2_config()
+        if container_engine == "minifi-cpp-sql":
+            image = self.__build_minifi_cpp_sql_image()
         elif container_engine == "http-proxy":
             image = self.__build_http_proxy_image()
         elif container_engine == "postgresql-server":
@@ -76,7 +72,7 @@ class ImageStore:
         self.images[container_engine] = image
         return image
 
-    def __build_minifi_cpp_image(self):
+    def __build_minifi_cpp_sql_image(self):
         dockerfile = dedent("""\
                 FROM {base_image}
                 USER root
@@ -106,59 +102,8 @@ class ImageStore:
                     echo "UserName = postgres" >> /etc/odbc.ini && \
                     echo "Password = password" >> /etc/odbc.ini && \
                     echo "Database = postgres" >> /etc/odbc.ini
-                RUN sed -i -e 's/INFO/TRACE/g' {minifi_root}/conf/minifi-log.properties
-                RUN echo nifi.flow.engine.threads=5 >> {minifi_root}/conf/minifi.properties
-                RUN echo nifi.metrics.publisher.agent.identifier=Agent1 >> {minifi_root}/conf/minifi.properties
-                RUN echo nifi.metrics.publisher.class=PrometheusMetricsPublisher >> {minifi_root}/conf/minifi.properties
-                RUN echo nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936 >> {minifi_root}/conf/minifi.properties
-                RUN echo nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,PutFileMetrics,processorMetrics/Get.*,FlowInformation,DeviceInfoNode,AgentStatus >> {minifi_root}/conf/minifi.properties
-                RUN echo nifi.c2.enable=true  >> {minifi_root}/conf/minifi.properties
-                RUN echo nifi.c2.rest.url=http://minifi-c2-server:10090/c2/config/heartbeat  >> {minifi_root}/conf/minifi.properties
-                RUN echo nifi.c2.rest.url.ack=http://minifi-c2-server:10090/c2/config/acknowledge  >> {minifi_root}/conf/minifi.properties
-                RUN echo nifi.c2.flow.base.url=http://minifi-c2-server:10090/c2/config/  >> {minifi_root}/conf/minifi.properties
-                RUN echo nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation  >> {minifi_root}/conf/minifi.properties
-                RUN echo nifi.c2.full.heartbeat=false  >> {minifi_root}/conf/minifi.properties
-                RUN echo nifi.c2.agent.class=minifi-test-class  >> {minifi_root}/conf/minifi.properties
-                RUN echo nifi.c2.agent.identifier=minifi-test-id  >> {minifi_root}/conf/minifi.properties
                 USER minificpp
-                """.format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_VERSION,
-                           minifi_root=MinifiContainer.MINIFI_ROOT))
-
-        return self.__build_image(dockerfile)
-
-    def __build_minifi_cpp_image_with_provenance_repo(self):
-        dockerfile = dedent("""\
-                FROM {base_image}
-                USER root
-                COPY minifi.properties {minifi_root}/conf/minifi.properties
-                USER minificpp
-                """.format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_VERSION,
-                           minifi_root=MinifiContainer.MINIFI_ROOT))
-
-        properties_path = self.test_dir + "/resources/minifi_cpp_with_provenance_repo/minifi.properties"
-        properties_context = {'name': 'minifi.properties', 'size': os.path.getsize(properties_path)}
-
-        with open(properties_path, 'rb') as properties_file:
-            properties_context['file_obj'] = properties_file
-            image = self.__build_image(dockerfile, [properties_context])
-        return image
-
-    def __build_minifi_cpp_image_with_https_c2_config(self):
-        dockerfile = dedent("""\
-                FROM {base_image}
-                USER root
-                RUN sed -i -e 's/INFO/DEBUG/g' {minifi_root}/conf/minifi-log.properties
-                RUN echo nifi.c2.enable=true  >> {minifi_root}/conf/minifi.properties
-                RUN echo nifi.c2.rest.url=https://minifi-c2-server:10090/c2/config/heartbeat  >> {minifi_root}/conf/minifi.properties
-                RUN echo nifi.c2.rest.url.ack=https://minifi-c2-server:10090/c2/config/acknowledge  >> {minifi_root}/conf/minifi.properties
-                RUN echo nifi.c2.rest.ssl.context.service=SSLContextService  >> {minifi_root}/conf/minifi.properties
-                RUN echo nifi.c2.flow.base.url=https://minifi-c2-server:10090/c2/config/  >> {minifi_root}/conf/minifi.properties
-                RUN echo nifi.c2.full.heartbeat=false  >> {minifi_root}/conf/minifi.properties
-                RUN echo nifi.c2.agent.class=minifi-test-class  >> {minifi_root}/conf/minifi.properties
-                RUN echo nifi.c2.agent.identifier=minifi-test-id  >> {minifi_root}/conf/minifi.properties
-                USER minificpp
-                """.format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_VERSION,
-                           minifi_root=MinifiContainer.MINIFI_ROOT))
+                """.format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_VERSION))
 
         return self.__build_image(dockerfile)
 
diff --git a/docker/test/integration/minifi/core/KubernetesProxy.py b/docker/test/integration/cluster/KubernetesProxy.py
similarity index 100%
rename from docker/test/integration/minifi/core/KubernetesProxy.py
rename to docker/test/integration/cluster/KubernetesProxy.py
diff --git a/docker/test/integration/minifi/core/LogSource.py b/docker/test/integration/cluster/LogSource.py
similarity index 100%
rename from docker/test/integration/minifi/core/LogSource.py
rename to docker/test/integration/cluster/LogSource.py
diff --git a/docker/test/integration/cluster/__init__.py b/docker/test/integration/cluster/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/docker/test/integration/cluster/checkers/AwsChecker.py b/docker/test/integration/cluster/checkers/AwsChecker.py
new file mode 100644
index 000000000..272335a85
--- /dev/null
+++ b/docker/test/integration/cluster/checkers/AwsChecker.py
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import json
+from utils import retry_check
+
+
+class AwsChecker:
+    def __init__(self, container_communicator):
+        self.container_communicator = container_communicator
+
+    @retry_check()
+    def check_s3_server_object_data(self, container_name, test_data):
+        (code, output) = self.container_communicator.execute_command(container_name, ["find", "/tmp/", "-type", "d", "-name", "s3mock*"])
+        if code != 0:
+            return False
+        s3_mock_dir = output.strip()
+        (code, file_data) = self.container_communicator.execute_command(container_name, ["cat", s3_mock_dir + "/test_bucket/test_object_key/fileData"])
+        return code == 0 and file_data == test_data
+
+    @retry_check()
+    def check_s3_server_object_metadata(self, container_name, content_type="application/octet-stream", metadata=dict()):
+        (code, output) = self.container_communicator.execute_command(container_name, ["find", "/tmp/", "-type", "d", "-name", "s3mock*"])
+        if code != 0:
+            return False
+        s3_mock_dir = output.strip()
+        (code, output) = self.container_communicator.execute_command(container_name, ["cat", s3_mock_dir + "/test_bucket/test_object_key/metadata"])
+        server_metadata = json.loads(output)
+        return code == 0 and server_metadata["contentType"] == content_type and metadata == server_metadata["userMetadata"]
+
+    @retry_check()
+    def is_s3_bucket_empty(self, container_name):
+        (code, output) = self.container_communicator.execute_command(container_name, ["find", "/tmp/", "-type", "d", "-name", "s3mock*"])
+        if code != 0:
+            return False
+        s3_mock_dir = output.strip()
+        (code, ls_result) = self.container_communicator.execute_command(container_name, ["ls", s3_mock_dir + "/test_bucket/"])
+        return code == 0 and not ls_result
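
The new checker classes all rely on a retry_check decorator imported from utils, which is not part of this diff. A minimal sketch of what such a decorator could look like, assuming it simply re-runs the wrapped boolean check a few times with a short pause; the argument names and defaults below are placeholders, not the repository's actual helper:

    # Sketch only: re-run a boolean check until it passes or the tries run out.
    # max_tries and wait_time_seconds are assumed names, not the real helper's API.
    import functools
    import time

    def retry_check(max_tries=5, wait_time_seconds=2):
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                for _ in range(max_tries):
                    if func(*args, **kwargs):
                        return True
                    time.sleep(wait_time_seconds)
                return False
            return wrapper
        return decorator
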
diff --git a/docker/test/integration/cluster/checkers/AzureChecker.py b/docker/test/integration/cluster/checkers/AzureChecker.py
new file mode 100644
index 000000000..ddeb1c934
--- /dev/null
+++ b/docker/test/integration/cluster/checkers/AzureChecker.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from azure.storage.blob import BlobServiceClient
+from azure.core.exceptions import ResourceExistsError
+from utils import retry_check, wait_for
+
+
+class AzureChecker:
+    AZURE_CONNECTION_STRING = \
+        ("DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
+         "BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;")
+
+    def __init__(self, container_communicator):
+        self.container_communicator = container_communicator
+        self.blob_service_client = BlobServiceClient.from_connection_string(AzureChecker.AZURE_CONNECTION_STRING)
+
+    @retry_check()
+    def check_azure_storage_server_data(self, container_name, test_data):
+        (code, output) = self.container_communicator.execute_command(container_name, ["find", "/data/__blobstorage__", "-type", "f"])
+        if code != 0:
+            return False
+        data_file = output.strip()
+        (code, file_data) = self.container_communicator.execute_command(container_name, ["cat", data_file])
+        return code == 0 and test_data in file_data
+
+    def add_test_blob(self, blob_name, content="", with_snapshot=False):
+        try:
+            self.blob_service_client.create_container("test-container")
+        except ResourceExistsError:
+            logging.debug('test-container already exists')
+
+        blob_client = self.blob_service_client.get_blob_client(container="test-container", blob=blob_name)
+        blob_client.upload_blob(content)
+
+        if with_snapshot:
+            blob_client.create_snapshot()
+
+    def __get_blob_and_snapshot_count(self):
+        container_client = self.blob_service_client.get_container_client("test-container")
+        return len(list(container_client.list_blobs(include=['deleted'])))
+
+    def check_azure_blob_and_snapshot_count(self, blob_and_snapshot_count, timeout_seconds):
+        return wait_for(lambda: self.__get_blob_and_snapshot_count() == blob_and_snapshot_count, timeout_seconds)
+
+    def check_azure_blob_storage_is_empty(self, timeout_seconds):
+        return wait_for(lambda: self.__get_blob_and_snapshot_count() == 0, timeout_seconds)
diff --git a/docker/test/integration/cluster/checkers/ElasticSearchChecker.py b/docker/test/integration/cluster/checkers/ElasticSearchChecker.py
new file mode 100644
index 000000000..7149545ac
--- /dev/null
+++ b/docker/test/integration/cluster/checkers/ElasticSearchChecker.py
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import json
+
+
+class ElasticSearchChecker:
+    def __init__(self, container_communicator):
+        self.container_communicator = container_communicator
+
+    def is_elasticsearch_empty(self, container_name):
+        (code, output) = self.container_communicator.execute_command(container_name, ["curl", "-u", "elastic:password", "-k", "-XGET", "https://localhost:9200/_search"])
+        return code == 0 and '"hits":[]' in output
+
+    def create_doc_elasticsearch(self, container_name, index_name, doc_id):
+        (code, output) = self.container_communicator.execute_command(container_name, ["/bin/bash", "-c",
+                                                                                      "curl -u elastic:password -k -XPUT https://localhost:9200/" + index_name + "/_doc/" + doc_id + " -H Content-Type:application/json -d'{\"field1\":\"value1\"}'"])
+        return code == 0 and ('"_id":"' + doc_id + '"').encode() in output
+
+    def check_elastic_field_value(self, container_name, index_name, doc_id, field_name, field_value):
+        (code, output) = self.container_communicator.execute_command(container_name, ["/bin/bash", "-c",
+                                                                                      "curl -u elastic:password -k -XGET https://localhost:9200/" + index_name + "/_doc/" + doc_id])
+        return code == 0 and (field_name + '":"' + field_value).encode() in output
+
+    def elastic_generate_apikey(self, elastic_container_name):
+        (_, output) = self.container_communicator.execute_command(elastic_container_name, ["/bin/bash", "-c",
+                                                                                           "curl -u elastic:password -k -XPOST https://localhost:9200/_security/api_key -H Content-Type:application/json -d'{\"name\":\"my-api-key\",\"expiration\":\"1d\",\"role_descriptors\":{\"role-a\": {\"cluster\": [\"all\"],\"index\": [{\"names\": [\"my_index\"],\"privileges\": [\"all\"]}]}}}'"])
+        output_lines = output.splitlines()
+        result = json.loads(output_lines[-1])
+        return result["encoded"]
+
+    def add_elastic_user_to_opensearch(self, container_name):
+        (code, output) = self.container_communicator.execute_command(container_name, ["/bin/bash", "-c",
+                                                                                      'curl -u admin:admin -k -XPUT https://opensearch:9200/_plugins/_security/api/internalusers/elastic -H Content-Type:application/json -d\'{"password":"password","backend_roles":["admin"]}\''])
+        return code == 0 and '"status":"CREATED"'.encode() in output
diff --git a/docker/test/integration/minifi/core/MinifiWithHttpsC2Config.py b/docker/test/integration/cluster/checkers/GcsChecker.py
similarity index 55%
rename from docker/test/integration/minifi/core/MinifiWithHttpsC2Config.py
rename to docker/test/integration/cluster/checkers/GcsChecker.py
index 79a4d44b8..923c07936 100644
--- a/docker/test/integration/minifi/core/MinifiWithHttpsC2Config.py
+++ b/docker/test/integration/cluster/checkers/GcsChecker.py
@@ -12,10 +12,19 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from utils import retry_check
 
-from .MinifiContainer import MinifiContainer
 
+class GcsChecker:
+    def __init__(self, container_communicator):
+        self.container_communicator = container_communicator
 
-class MinifiWithHttpsC2Config(MinifiContainer):
-    def __init__(self, config_dir, name, vols, network, image_store, command=None):
-        super().__init__(config_dir, name, vols, network, image_store, command, engine='minifi-cpp-with-https-c2-config')
+    @retry_check()
+    def check_google_cloud_storage(self, gcs_container_name, content):
+        (code, _) = self.container_communicator.execute_command(gcs_container_name, ["grep", "-r", content, "/storage"])
+        return code == 0
+
+    @retry_check()
+    def is_gcs_bucket_empty(self, container_name):
+        (code, output) = self.container_communicator.execute_command(container_name, ["ls", "/storage/test-bucket"])
+        return code == 0 and output == ""
diff --git a/docker/test/integration/minifi/core/MinifiWithProvenanceRepoContainer.py b/docker/test/integration/cluster/checkers/PostgresChecker.py
similarity index 53%
rename from docker/test/integration/minifi/core/MinifiWithProvenanceRepoContainer.py
rename to docker/test/integration/cluster/checkers/PostgresChecker.py
index dd8529b03..80b84e89f 100644
--- a/docker/test/integration/minifi/core/MinifiWithProvenanceRepoContainer.py
+++ b/docker/test/integration/cluster/checkers/PostgresChecker.py
@@ -12,10 +12,16 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from utils import wait_for
 
-from .MinifiContainer import MinifiContainer
 
+class PostgresChecker:
+    def __init__(self, container_communicator):
+        self.container_communicator = container_communicator
 
-class MinifiWithProvenanceRepoContainer(MinifiContainer):
-    def __init__(self, config_dir, name, vols, network, image_store, command=None):
-        super().__init__(config_dir, name, vols, network, image_store, command, engine='minifi-cpp-with-provenance-repo')
+    def __query_postgres_server(self, postgresql_container_name, query, number_of_rows):
+        (code, output) = self.container_communicator.execute_command(postgresql_container_name, ["psql", "-U", "postgres", "-c", query])
+        return code == 0 and str(number_of_rows) + " rows" in output
+
+    def check_query_results(self, postgresql_container_name, query, number_of_rows, timeout_seconds):
+        return wait_for(lambda: self.__query_postgres_server(postgresql_container_name, query, number_of_rows), timeout_seconds)
diff --git a/docker/test/integration/minifi/core/PrometheusChecker.py b/docker/test/integration/cluster/checkers/PrometheusChecker.py
similarity index 83%
rename from docker/test/integration/minifi/core/PrometheusChecker.py
rename to docker/test/integration/cluster/checkers/PrometheusChecker.py
index 14dc8eda6..8df4bf565 100644
--- a/docker/test/integration/minifi/core/PrometheusChecker.py
+++ b/docker/test/integration/cluster/checkers/PrometheusChecker.py
@@ -1,5 +1,19 @@
-import time
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from prometheus_api_client import PrometheusConnect
+from utils import wait_for
 
 
 class PrometheusChecker:
@@ -7,20 +21,10 @@ class PrometheusChecker:
         self.prometheus_client = PrometheusConnect(url="http://localhost:9090", disable_ssl=True)
 
     def wait_for_metric_class_on_prometheus(self, metric_class, timeout_seconds):
-        start_time = time.perf_counter()
-        while (time.perf_counter() - start_time) < timeout_seconds:
-            if self.verify_metric_class(metric_class):
-                return True
-            time.sleep(1)
-        return False
+        return wait_for(lambda: self.verify_metric_class(metric_class), timeout_seconds)
 
     def wait_for_processor_metric_on_prometheus(self, metric_class, timeout_seconds, processor_name):
-        start_time = time.perf_counter()
-        while (time.perf_counter() - start_time) < timeout_seconds:
-            if self.verify_processor_metric(metric_class, processor_name):
-                return True
-            time.sleep(1)
-        return False
+        return wait_for(lambda: self.verify_processor_metric(metric_class, processor_name), timeout_seconds)
 
     def verify_processor_metric(self, metric_class, processor_name):
         if metric_class == "GetFileMetrics":
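
The hand-rolled polling loops removed above are replaced by a shared wait_for helper imported from utils. The helper itself is not shown in this diff, but judging from the loops it replaces it behaves roughly like the sketch below; the sleep interval and parameter names are assumptions:

    # Sketch inferred from the replaced loops: poll a condition until it returns
    # True or the timeout elapses; the real utils.wait_for may differ in detail.
    import time

    def wait_for(condition, timeout_seconds, interval_seconds=1):
        start_time = time.perf_counter()
        while (time.perf_counter() - start_time) < timeout_seconds:
            if condition():
                return True
            time.sleep(interval_seconds)
        return False
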
diff --git a/docker/test/integration/cluster/checkers/SplunkChecker.py b/docker/test/integration/cluster/checkers/SplunkChecker.py
new file mode 100644
index 000000000..8cc2e2758
--- /dev/null
+++ b/docker/test/integration/cluster/checkers/SplunkChecker.py
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import json
+from utils import retry_check
+
+
+class SplunkChecker:
+    def __init__(self, container_communicator):
+        self.container_communicator = container_communicator
+
+    @retry_check()
+    def check_splunk_event(self, container_name, query):
+        (code, output) = self.container_communicator.execute_command(container_name, ["sudo", "/opt/splunk/bin/splunk", "search", query, "-auth", "admin:splunkadmin"])
+        if code != 0:
+            return False
+        return query in output.decode("utf-8")
+
+    @retry_check()
+    def check_splunk_event_with_attributes(self, container_name, query, attributes):
+        (code, output) = self.container_communicator.execute_command(container_name, ["sudo", "/opt/splunk/bin/splunk", "search", query, "-output", "json", "-auth", "admin:splunkadmin"])
+        if code != 0:
+            return False
+        result_lines = output.splitlines()
+        for result_line in result_lines:
+            try:
+                result_line_json = json.loads(result_line)
+            except json.decoder.JSONDecodeError:
+                continue
+            if "result" not in result_line_json:
+                continue
+            if "host" in attributes:
+                if result_line_json["result"]["host"] != attributes["host"]:
+                    continue
+            if "source" in attributes:
+                if result_line_json["result"]["source"] != attributes["source"]:
+                    continue
+            if "sourcetype" in attributes:
+                if result_line_json["result"]["sourcetype"] != attributes["sourcetype"]:
+                    continue
+            if "index" in attributes:
+                if result_line_json["result"]["index"] != attributes["index"]:
+                    continue
+            return True
+        return False
+
+    def enable_splunk_hec_indexer(self, container_name, hec_name):
+        (code, _) = self.container_communicator.execute_command(container_name, ["sudo",
+                                                                                 "/opt/splunk/bin/splunk", "http-event-collector",
+                                                                                 "update", hec_name,
+                                                                                 "-uri", "https://localhost:8089",
+                                                                                 "-use-ack", "1",
+                                                                                 "-disabled", "0",
+                                                                                 "-auth", "admin:splunkadmin"])
+        return code == 0
+
+    def enable_splunk_hec_ssl(self, container_name, splunk_cert_pem, splunk_key_pem, root_ca_cert_pem):
+        assert self.container_communicator.write_content_to_container(splunk_cert_pem.decode() + splunk_key_pem.decode() + root_ca_cert_pem.decode(), container_name, '/opt/splunk/etc/auth/splunk_cert.pem')
+        assert self.container_communicator.write_content_to_container(root_ca_cert_pem.decode(), container_name, '/opt/splunk/etc/auth/root_ca.pem')
+        (code, _) = self.container_communicator.execute_command(container_name, ["sudo",
+                                                                                 "/opt/splunk/bin/splunk", "http-event-collector",
+                                                                                 "update",
+                                                                                 "-uri", "https://localhost:8089",
+                                                                                 "-enable-ssl", "1",
+                                                                                 "-server-cert", "/opt/splunk/etc/auth/splunk_cert.pem",
+                                                                                 "-ca-cert-file", "/opt/splunk/etc/auth/root_ca.pem",
+                                                                                 "-require-client-cert", "1",
+                                                                                 "-auth", "admin:splunkadmin"])
+        return code == 0
diff --git a/docker/test/integration/cluster/checkers/__init__.py b/docker/test/integration/cluster/checkers/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/docker/test/integration/minifi/core/AzureStorageServerContainer.py b/docker/test/integration/cluster/containers/AzureStorageServerContainer.py
similarity index 100%
rename from docker/test/integration/minifi/core/AzureStorageServerContainer.py
rename to docker/test/integration/cluster/containers/AzureStorageServerContainer.py
diff --git a/docker/test/integration/minifi/core/Container.py b/docker/test/integration/cluster/containers/Container.py
similarity index 73%
rename from docker/test/integration/minifi/core/Container.py
rename to docker/test/integration/cluster/containers/Container.py
index d027fd696..d1eb087c8 100644
--- a/docker/test/integration/minifi/core/Container.py
+++ b/docker/test/integration/cluster/containers/Container.py
@@ -17,7 +17,7 @@
 import docker
 import logging
 
-from .LogSource import LogSource
+from ..LogSource import LogSource
 
 
 class Container:
@@ -63,13 +63,25 @@ class Container:
         return LogSource.FROM_DOCKER_CONTAINER
 
     def stop(self):
-        raise NotImplementedError()
+        logging.info('Stopping docker container "%s"...', self.name)
+        self.client.containers.get(self.name).stop()
+        logging.info('Successfully stopped docker container "%s"', self.name)
+        self.deployed = False
 
     def kill(self):
-        raise NotImplementedError()
+        logging.info('Killing docker container "%s"...', self.name)
+        self.client.containers.get(self.name).kill()
+        logging.info('Successfully killed docker container "%s"', self.name)
+        self.deployed = False
 
     def restart(self):
-        raise NotImplementedError()
+        logging.info('Restarting docker container "%s"...', self.name)
+        self.client.containers.get(self.name).restart()
+        logging.info('Successfully restarted docker container "%s"', self.name)
+        self.deployed = True
 
     def get_startup_finished_log_entry(self):
         raise NotImplementedError()
+
+    def get_app_log(self):
+        raise NotImplementedError()
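
The lifecycle methods above call straight into the docker SDK for Python. As a standalone illustration of the same calls (the container name is a placeholder), containers.get raises docker.errors.NotFound when no container with that name exists, so callers are expected to have deployed the container first:

    # Standalone sketch of the SDK calls used above; "minifi-cpp-flow" is a
    # placeholder container name.
    import docker
    import docker.errors

    client = docker.from_env()
    try:
        client.containers.get("minifi-cpp-flow").restart()
    except docker.errors.NotFound:
        pass  # nothing to restart if the container was never deployed
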
diff --git a/docker/test/integration/minifi/core/ElasticsearchContainer.py b/docker/test/integration/cluster/containers/ElasticsearchContainer.py
similarity index 100%
rename from docker/test/integration/minifi/core/ElasticsearchContainer.py
rename to docker/test/integration/cluster/containers/ElasticsearchContainer.py
diff --git a/docker/test/integration/minifi/core/HttpProxyContainer.py b/docker/test/integration/cluster/containers/FakeGcsServerContainer.py
similarity index 69%
copy from docker/test/integration/minifi/core/HttpProxyContainer.py
copy to docker/test/integration/cluster/containers/FakeGcsServerContainer.py
index 17ebed547..e3858a401 100644
--- a/docker/test/integration/minifi/core/HttpProxyContainer.py
+++ b/docker/test/integration/cluster/containers/FakeGcsServerContainer.py
@@ -12,29 +12,30 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
 import logging
+import os
 from .Container import Container
 
 
-class HttpProxyContainer(Container):
+class FakeGcsServerContainer(Container):
     def __init__(self, name, vols, network, image_store, command=None):
-        super().__init__(name, 'http-proxy', vols, network, image_store, command)
+        super().__init__(name, 'fake-gcs-server', vols, network, image_store, command)
 
     def get_startup_finished_log_entry(self):
-        return "Accepting HTTP Socket connections at"
+        return "server started at http"
 
     def deploy(self):
         if not self.set_deployed():
             return
 
-        logging.info('Creating and running http-proxy docker container...')
+        logging.info('Creating and running google cloud storage server docker container...')
         self.client.containers.run(
-            self.image_store.get_image(self.get_engine()),
+            "fsouza/fake-gcs-server:latest",
             detach=True,
             name=self.name,
             network=self.network.name,
-            ports={'3128/tcp': 3128},
-            entrypoint=self.command)
+            entrypoint=self.command,
+            ports={'4443/tcp': 4443},
+            volumes=[os.environ['TEST_DIRECTORY'] + "/resources/fake-gcs-server-data:/data"],
+            command='-scheme http -host fake-gcs-server')
         logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/minifi/core/FlowContainer.py b/docker/test/integration/cluster/containers/FlowContainer.py
similarity index 100%
rename from docker/test/integration/minifi/core/FlowContainer.py
rename to docker/test/integration/cluster/containers/FlowContainer.py
diff --git a/docker/test/integration/minifi/core/HttpProxyContainer.py b/docker/test/integration/cluster/containers/HttpProxyContainer.py
similarity index 100%
copy from docker/test/integration/minifi/core/HttpProxyContainer.py
copy to docker/test/integration/cluster/containers/HttpProxyContainer.py
diff --git a/docker/test/integration/minifi/core/KafkaBrokerContainer.py b/docker/test/integration/cluster/containers/KafkaBrokerContainer.py
similarity index 100%
rename from docker/test/integration/minifi/core/KafkaBrokerContainer.py
rename to docker/test/integration/cluster/containers/KafkaBrokerContainer.py
diff --git a/docker/test/integration/minifi/core/MinifiAsPodInKubernetesCluster.py b/docker/test/integration/cluster/containers/MinifiAsPodInKubernetesCluster.py
similarity index 83%
rename from docker/test/integration/minifi/core/MinifiAsPodInKubernetesCluster.py
rename to docker/test/integration/cluster/containers/MinifiAsPodInKubernetesCluster.py
index 66bc734a0..b404a3234 100644
--- a/docker/test/integration/minifi/core/MinifiAsPodInKubernetesCluster.py
+++ b/docker/test/integration/cluster/containers/MinifiAsPodInKubernetesCluster.py
@@ -19,7 +19,7 @@ import logging
 import os
 import shutil
 
-from .LogSource import LogSource
+from ..LogSource import LogSource
 from .MinifiContainer import MinifiContainer
 
 
@@ -27,26 +27,29 @@ class MinifiAsPodInKubernetesCluster(MinifiContainer):
     MINIFI_IMAGE_NAME = 'apacheminificpp'
     MINIFI_IMAGE_TAG = 'docker_test'
 
-    def __init__(self, kubernetes_proxy, config_dir, name, vols, network, image_store, command=None):
-        super().__init__(config_dir, name, vols, network, image_store, command)
-        self.kubernetes_proxy = kubernetes_proxy
-
+    def __init__(self, kubernetes_proxy, config_dir, minifi_options, name, vols, network, image_store, command=None):
         test_dir = os.environ['TEST_DIRECTORY']
-        shutil.copy(os.path.join(test_dir, os.pardir, os.pardir, os.pardir, 'conf', 'minifi.properties'), self.config_dir)
-        shutil.copy(os.path.join(test_dir, 'resources', 'kubernetes', 'minifi-conf', 'minifi-log.properties'), self.config_dir)
+        shutil.copy(os.path.join(test_dir, os.pardir, os.pardir, os.pardir, 'conf', 'minifi.properties'), config_dir)
+        shutil.copy(os.path.join(test_dir, 'resources', 'kubernetes', 'minifi-conf', 'minifi-log.properties'), config_dir)
+        super().__init__(config_dir, minifi_options, name, vols, network, image_store, command)
+
+        self.kubernetes_proxy = kubernetes_proxy
 
         docker_client = docker.from_env()
         minifi_image = docker_client.images.get(MinifiAsPodInKubernetesCluster.MINIFI_IMAGE_NAME + ':' + os.environ['MINIFI_VERSION'])
         minifi_image.tag(MinifiAsPodInKubernetesCluster.MINIFI_IMAGE_NAME, MinifiAsPodInKubernetesCluster.MINIFI_IMAGE_TAG)
 
+    def _create_container_config_dir(self, config_dir):
+        return config_dir
+
     def deploy(self):
         if not self.set_deployed():
             return
 
         logging.info('Setting up container: %s', self.name)
 
-        self.kubernetes_proxy.create_helper_objects()
         self._create_config()
+        self.kubernetes_proxy.create_helper_objects()
         self.kubernetes_proxy.load_docker_image(MinifiAsPodInKubernetesCluster.MINIFI_IMAGE_NAME, MinifiAsPodInKubernetesCluster.MINIFI_IMAGE_TAG)
         self.kubernetes_proxy.create_minifi_pod()
 
diff --git a/docker/test/integration/minifi/core/MinifiC2ServerContainer.py b/docker/test/integration/cluster/containers/MinifiC2ServerContainer.py
similarity index 100%
rename from docker/test/integration/minifi/core/MinifiC2ServerContainer.py
rename to docker/test/integration/cluster/containers/MinifiC2ServerContainer.py
diff --git a/docker/test/integration/cluster/containers/MinifiContainer.py b/docker/test/integration/cluster/containers/MinifiContainer.py
new file mode 100644
index 000000000..f5dde6a62
--- /dev/null
+++ b/docker/test/integration/cluster/containers/MinifiContainer.py
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import logging
+import uuid
+import shutil
+import copy
+from .FlowContainer import FlowContainer
+from minifi.flow_serialization.Minifi_flow_yaml_serializer import Minifi_flow_yaml_serializer
+
+
+class MinifiOptions:
+    def __init__(self):
+        self.enable_c2 = False
+        self.enable_c2_with_ssl = False
+        self.enable_provenance = False
+        self.enable_prometheus = False
+        self.enable_sql = False
+
+
+class MinifiContainer(FlowContainer):
+    MINIFI_VERSION = os.environ['MINIFI_VERSION']
+    MINIFI_ROOT = '/opt/minifi/nifi-minifi-cpp-' + MINIFI_VERSION
+
+    def __init__(self, config_dir, options, name, vols, network, image_store, command=None):
+        if not command:
+            command = ["/bin/sh", "-c", "/opt/minifi/minifi-current/bin/minifi.sh run"]
+        self.options = options
+
+        super().__init__(config_dir, name, 'minifi-cpp', copy.copy(vols), network, image_store, command)
+        self.container_specific_config_dir = self._create_container_config_dir(self.config_dir)
+
+    def _create_container_config_dir(self, config_dir):
+        container_config_dir = os.path.join(config_dir, str(uuid.uuid4()))
+        os.makedirs(container_config_dir)
+        for file_name in os.listdir(config_dir):
+            source = os.path.join(config_dir, file_name)
+            destination = os.path.join(container_config_dir, file_name)
+            if os.path.isfile(source):
+                shutil.copy(source, destination)
+        return container_config_dir
+
+    def get_startup_finished_log_entry(self):
+        return "Starting Flow Controller"
+
+    def _create_config(self):
+        serializer = Minifi_flow_yaml_serializer()
+        test_flow_yaml = serializer.serialize(self.start_nodes, self.controllers)
+        logging.info('Using generated flow config yml:\n%s', test_flow_yaml)
+        with open(os.path.join(self.container_specific_config_dir, "config.yml"), 'wb') as config_file:
+            config_file.write(test_flow_yaml.encode('utf-8'))
+
+    def _create_properties(self):
+        properties_file_path = os.path.join(self.config_dir, 'minifi.properties')
+        with open(properties_file_path, 'a') as f:
+            if self.options.enable_c2:
+                f.write("nifi.c2.enable=true\n")
+                f.write("nifi.c2.rest.url=http://minifi-c2-server:10090/c2/config/heartbeat\n")
+                f.write("nifi.c2.rest.url.ack=http://minifi-c2-server:10090/c2/config/acknowledge\n")
+                f.write("nifi.c2.flow.base.url=http://minifi-c2-server:10090/c2/config/\n")
+                f.write("nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation\n")
+                f.write("nifi.c2.full.heartbeat=false\n")
+                f.write("nifi.c2.agent.class=minifi-test-class\n")
+                f.write("nifi.c2.agent.identifier=minifi-test-id\n")
+            elif self.options.enable_c2_with_ssl:
+                f.write("nifi.c2.enable=true\n")
+                f.write("nifi.c2.rest.url=https://minifi-c2-server:10090/c2/config/heartbeat\n")
+                f.write("nifi.c2.rest.url.ack=https://minifi-c2-server:10090/c2/config/acknowledge\n")
+                f.write("nifi.c2.rest.ssl.context.service=SSLContextService\n")
+                f.write("nifi.c2.flow.base.url=https://minifi-c2-server:10090/c2/config/\n")
+                f.write("nifi.c2.full.heartbeat=false\n")
+                f.write("nifi.c2.agent.class=minifi-test-class\n")
+                f.write("nifi.c2.agent.identifier=minifi-test-id\n")
+
+            if not self.options.enable_provenance:
+                f.write("nifi.provenance.repository.class.name=NoOpRepository\n")
+
+            if self.options.enable_prometheus:
+                f.write("nifi.metrics.publisher.agent.identifier=Agent1\n")
+                f.write("nifi.metrics.publisher.class=PrometheusMetricsPublisher\n")
+                f.write("nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936\n")
+                f.write("nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,PutFileMetrics,processorMetrics/Get.*,FlowInformation,DeviceInfoNode,AgentStatus\n")
+
+    def _setup_config(self):
+        self._create_config()
+        self._create_properties()
+
+        self.vols[os.path.join(self.config_dir, 'minifi.properties')] = {"bind": os.path.join(MinifiContainer.MINIFI_ROOT, 'conf', 'minifi.properties'), "mode": "rw"}
+        self.vols[os.path.join(self.container_specific_config_dir, 'config.yml')] = {"bind": os.path.join(MinifiContainer.MINIFI_ROOT, 'conf', 'config.yml'), "mode": "rw"}
+        self.vols[os.path.join(self.config_dir, 'minifi-log.properties')] = {"bind": os.path.join(MinifiContainer.MINIFI_ROOT, 'conf', 'minifi-log.properties'), "mode": "rw"}
+
+    def deploy(self):
+        if not self.set_deployed():
+            return
+
+        logging.info('Creating and running minifi docker container...')
+        self._setup_config()
+
+        if self.options.enable_sql:
+            image = self.image_store.get_image('minifi-cpp-sql')
+        else:
+            image = 'apacheminificpp:' + MinifiContainer.MINIFI_VERSION
+
+        self.client.containers.run(
+            image,
+            detach=True,
+            name=self.name,
+            network=self.network.name,
+            entrypoint=self.command,
+            volumes=self.vols)
+        logging.info('Added container \'%s\'', self.name)
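
This class is the core of the change: instead of baking minifi.properties, config.yml and minifi-log.properties into per-scenario images, MinifiContainer now writes them on the host and bind-mounts them into the container at deploy time. A minimal standalone illustration of the same docker SDK bind-mount form used in deploy(); the image tag, container name and paths below are examples only:

    # Sketch of the volumes dict form used in deploy(); tag and paths are placeholders.
    import docker

    client = docker.from_env()
    client.containers.run(
        "apacheminificpp:latest",         # placeholder tag
        detach=True,
        name="minifi-cpp-flow-example",   # placeholder name
        volumes={
            "/tmp/minifi_config/minifi.properties": {
                "bind": "/opt/minifi/minifi-current/conf/minifi.properties",
                "mode": "rw"}})
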
diff --git a/docker/test/integration/minifi/core/MqttBrokerContainer.py b/docker/test/integration/cluster/containers/MqttBrokerContainer.py
similarity index 100%
rename from docker/test/integration/minifi/core/MqttBrokerContainer.py
rename to docker/test/integration/cluster/containers/MqttBrokerContainer.py
diff --git a/docker/test/integration/minifi/core/NifiContainer.py b/docker/test/integration/cluster/containers/NifiContainer.py
similarity index 96%
rename from docker/test/integration/minifi/core/NifiContainer.py
rename to docker/test/integration/cluster/containers/NifiContainer.py
index 0aadad37f..e2abbb455 100644
--- a/docker/test/integration/minifi/core/NifiContainer.py
+++ b/docker/test/integration/cluster/containers/NifiContainer.py
@@ -17,7 +17,7 @@
 import logging
 
 from .FlowContainer import FlowContainer
-from ..flow_serialization.Nifi_flow_xml_serializer import Nifi_flow_xml_serializer
+from minifi.flow_serialization.Nifi_flow_xml_serializer import Nifi_flow_xml_serializer
 import gzip
 import os
 
diff --git a/docker/test/integration/minifi/core/OPCUAServerContainer.py b/docker/test/integration/cluster/containers/OPCUAServerContainer.py
similarity index 100%
rename from docker/test/integration/minifi/core/OPCUAServerContainer.py
rename to docker/test/integration/cluster/containers/OPCUAServerContainer.py
diff --git a/docker/test/integration/minifi/core/OpensearchContainer.py b/docker/test/integration/cluster/containers/OpensearchContainer.py
similarity index 100%
rename from docker/test/integration/minifi/core/OpensearchContainer.py
rename to docker/test/integration/cluster/containers/OpensearchContainer.py
diff --git a/docker/test/integration/minifi/core/PostgreSQLServerContainer.py b/docker/test/integration/cluster/containers/PostgreSQLServerContainer.py
similarity index 100%
rename from docker/test/integration/minifi/core/PostgreSQLServerContainer.py
rename to docker/test/integration/cluster/containers/PostgreSQLServerContainer.py
diff --git a/docker/test/integration/minifi/core/PrometheusContainer.py b/docker/test/integration/cluster/containers/PrometheusContainer.py
similarity index 100%
rename from docker/test/integration/minifi/core/PrometheusContainer.py
rename to docker/test/integration/cluster/containers/PrometheusContainer.py
diff --git a/docker/test/integration/minifi/core/S3ServerContainer.py b/docker/test/integration/cluster/containers/S3ServerContainer.py
similarity index 100%
rename from docker/test/integration/minifi/core/S3ServerContainer.py
rename to docker/test/integration/cluster/containers/S3ServerContainer.py
diff --git a/docker/test/integration/minifi/core/SplunkContainer.py b/docker/test/integration/cluster/containers/SplunkContainer.py
similarity index 100%
rename from docker/test/integration/minifi/core/SplunkContainer.py
rename to docker/test/integration/cluster/containers/SplunkContainer.py
diff --git a/docker/test/integration/minifi/core/HttpProxyContainer.py b/docker/test/integration/cluster/containers/SyslogTcpClientContainer.py
similarity index 73%
copy from docker/test/integration/minifi/core/HttpProxyContainer.py
copy to docker/test/integration/cluster/containers/SyslogTcpClientContainer.py
index 17ebed547..699676f77 100644
--- a/docker/test/integration/minifi/core/HttpProxyContainer.py
+++ b/docker/test/integration/cluster/containers/SyslogTcpClientContainer.py
@@ -12,29 +12,26 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
 import logging
 from .Container import Container
 
 
-class HttpProxyContainer(Container):
+class SyslogTcpClientContainer(Container):
     def __init__(self, name, vols, network, image_store, command=None):
-        super().__init__(name, 'http-proxy', vols, network, image_store, command)
+        super().__init__(name, 'syslog-tcp-client', vols, network, image_store, command)
 
     def get_startup_finished_log_entry(self):
-        return "Accepting HTTP Socket connections at"
+        return "Syslog TCP client started"
 
     def deploy(self):
         if not self.set_deployed():
             return
 
-        logging.info('Creating and running http-proxy docker container...')
+        logging.info('Creating and running a Syslog tcp client docker container...')
         self.client.containers.run(
-            self.image_store.get_image(self.get_engine()),
+            "ubuntu:20.04",
             detach=True,
             name=self.name,
             network=self.network.name,
-            ports={'3128/tcp': 3128},
-            entrypoint=self.command)
+            entrypoint='/bin/bash -c "echo Syslog TCP client started; while true; do logger --tcp -n minifi-cpp-flow -P 514 sample_log; sleep 1; done"')
         logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/minifi/core/HttpProxyContainer.py b/docker/test/integration/cluster/containers/SyslogUdpClientContainer.py
similarity index 73%
copy from docker/test/integration/minifi/core/HttpProxyContainer.py
copy to docker/test/integration/cluster/containers/SyslogUdpClientContainer.py
index 17ebed547..f1f4a9bfd 100644
--- a/docker/test/integration/minifi/core/HttpProxyContainer.py
+++ b/docker/test/integration/cluster/containers/SyslogUdpClientContainer.py
@@ -12,29 +12,26 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
 import logging
 from .Container import Container
 
 
-class HttpProxyContainer(Container):
+class SyslogUdpClientContainer(Container):
     def __init__(self, name, vols, network, image_store, command=None):
-        super().__init__(name, 'http-proxy', vols, network, image_store, command)
+        super().__init__(name, 'syslog-udp-client', vols, network, image_store, command)
 
     def get_startup_finished_log_entry(self):
-        return "Accepting HTTP Socket connections at"
+        return "Syslog UDP client started"
 
     def deploy(self):
         if not self.set_deployed():
             return
 
-        logging.info('Creating and running http-proxy docker container...')
+        logging.info('Creating and running a Syslog udp client docker container...')
         self.client.containers.run(
-            self.image_store.get_image(self.get_engine()),
+            "ubuntu:20.04",
             detach=True,
             name=self.name,
             network=self.network.name,
-            ports={'3128/tcp': 3128},
-            entrypoint=self.command)
+            entrypoint='/bin/bash -c "echo Syslog UDP client started; while true; do logger --udp -n minifi-cpp-flow -P 514 sample_log; sleep 1; done"')
         logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/minifi/core/HttpProxyContainer.py b/docker/test/integration/cluster/containers/TcpClientContainer.py
similarity index 83%
rename from docker/test/integration/minifi/core/HttpProxyContainer.py
rename to docker/test/integration/cluster/containers/TcpClientContainer.py
index 17ebed547..c29c00fa2 100644
--- a/docker/test/integration/minifi/core/HttpProxyContainer.py
+++ b/docker/test/integration/cluster/containers/TcpClientContainer.py
@@ -12,29 +12,26 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
 import logging
 from .Container import Container
 
 
-class HttpProxyContainer(Container):
+class TcpClientContainer(Container):
     def __init__(self, name, vols, network, image_store, command=None):
-        super().__init__(name, 'http-proxy', vols, network, image_store, command)
+        super().__init__(name, 'tcp-client', vols, network, image_store, command)
 
     def get_startup_finished_log_entry(self):
-        return "Accepting HTTP Socket connections at"
+        return "TCP client container started"
 
     def deploy(self):
         if not self.set_deployed():
             return
 
-        logging.info('Creating and running http-proxy docker container...')
+        logging.info('Creating and running a tcp client docker container...')
         self.client.containers.run(
             self.image_store.get_image(self.get_engine()),
             detach=True,
             name=self.name,
             network=self.network.name,
-            ports={'3128/tcp': 3128},
             entrypoint=self.command)
         logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/minifi/core/ZookeeperContainer.py b/docker/test/integration/cluster/containers/ZookeeperContainer.py
similarity index 100%
rename from docker/test/integration/minifi/core/ZookeeperContainer.py
rename to docker/test/integration/cluster/containers/ZookeeperContainer.py
diff --git a/docker/test/integration/cluster/containers/__init__.py b/docker/test/integration/cluster/containers/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/docker/test/integration/environment.py b/docker/test/integration/environment.py
index ce1b5fd5a..8f245afd2 100644
--- a/docker/test/integration/environment.py
+++ b/docker/test/integration/environment.py
@@ -23,9 +23,9 @@ sys.path.append('../minifi')
 
 from MiNiFi_integration_test_driver import MiNiFi_integration_test  # noqa: E402
 from minifi import *  # noqa
-from minifi.core.ImageStore import ImageStore # noqa
-from minifi.core.DockerTestDirectoryBindings import DockerTestDirectoryBindings # noqa
-from minifi.core.KubernetesProxy import KubernetesProxy # noqa
+from cluster.ImageStore import ImageStore # noqa
+from cluster.DockerTestDirectoryBindings import DockerTestDirectoryBindings # noqa
+from cluster.KubernetesProxy import KubernetesProxy # noqa
 
 
 def before_scenario(context, scenario):
diff --git a/docker/test/integration/features/attributes_to_json.feature b/docker/test/integration/features/attributes_to_json.feature
index fdbb17931..5d63f8fd4 100644
--- a/docker/test/integration/features/attributes_to_json.feature
+++ b/docker/test/integration/features/attributes_to_json.feature
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 Feature: Writing attribute data using AttributesToJSON processor
   Background:
     Given the content of "/tmp/output" is monitored
diff --git a/docker/test/integration/features/azure_storage.feature b/docker/test/integration/features/azure_storage.feature
index efded934e..c18e29be9 100644
--- a/docker/test/integration/features/azure_storage.feature
+++ b/docker/test/integration/features/azure_storage.feature
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 Feature: Sending data from MiNiFi-C++ to an Azure storage server
   In order to transfer data to interact with Azure servers
   As a user of MiNiFi
diff --git a/docker/test/integration/features/core_functionality.feature b/docker/test/integration/features/core_functionality.feature
index ce5ce667f..8802c8355 100644
--- a/docker/test/integration/features/core_functionality.feature
+++ b/docker/test/integration/features/core_functionality.feature
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 Feature: Core flow functionalities
   Test core flow configuration functionalities
 
@@ -32,12 +47,14 @@ Feature: Core flow functionalities
 
 
   Scenario: Processors are destructed when agent is stopped
-    Given a LogOnDestructionProcessor processor with the name "logOnDestruction" in the "transient-minifi" flow with engine "transient-minifi"
+    Given a transient MiNiFi flow with the name "transient-minifi" is set up
+    And a LogOnDestructionProcessor processor with the name "logOnDestruction" in the "transient-minifi" flow
     When the MiNiFi instance starts up
     Then the Minifi logs contain the following message: "LogOnDestructionProcessor is being destructed" in less than 100 seconds
 
   Scenario: Agent does not crash when using provenance repositories
-    Given a GenerateFlowFile processor with the name "generateFlowFile" in the "minifi-cpp-with-provenance-repo" flow with engine "minifi-cpp-with-provenance-repo"
+    Given a GenerateFlowFile processor with the name "generateFlowFile" in the "minifi-cpp-with-provenance-repo" flow
+    And the provenance repository is enabled in MiNiFi
     When the MiNiFi instance starts up
     Then the "minifi-cpp-with-provenance-repo" flow has a log line matching "MiNiFi started" in less than 30 seconds
 
diff --git a/docker/test/integration/features/defragtextflowfiles.feature b/docker/test/integration/features/defragtextflowfiles.feature
index 04220cce9..65fd31664 100644
--- a/docker/test/integration/features/defragtextflowfiles.feature
+++ b/docker/test/integration/features/defragtextflowfiles.feature
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 Feature: DefragmentText can defragment fragmented data from TailFile
   Background:
     Given the content of "/tmp/output" is monitored
diff --git a/docker/test/integration/features/google_cloud_storage.feature b/docker/test/integration/features/google_cloud_storage.feature
index ad2b6f1fc..e69d5cd5c 100644
--- a/docker/test/integration/features/google_cloud_storage.feature
+++ b/docker/test/integration/features/google_cloud_storage.feature
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 Feature: Sending data to Google Cloud Storage using PutGCSObject
 
   Background:
diff --git a/docker/test/integration/features/hashcontent.feature b/docker/test/integration/features/hashcontent.feature
index bc7a0a513..e25545657 100644
--- a/docker/test/integration/features/hashcontent.feature
+++ b/docker/test/integration/features/hashcontent.feature
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 Feature: Hash value is added to Flowfiles by HashContent processor
   In order to avoid duplication of content of Flowfiles
   As a user of MiNiFi
diff --git a/docker/test/integration/features/http.feature b/docker/test/integration/features/http.feature
index 62844dcaa..1acb19efc 100644
--- a/docker/test/integration/features/http.feature
+++ b/docker/test/integration/features/http.feature
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 Feature: Sending data using InvokeHTTP to a receiver using ListenHTTP
   In order to send and receive data via HTTP
   As a user of MiNiFi
diff --git a/docker/test/integration/features/https.feature b/docker/test/integration/features/https.feature
index c07127467..8997ed929 100644
--- a/docker/test/integration/features/https.feature
+++ b/docker/test/integration/features/https.feature
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 Feature: Using SSL context service to send data with TLS
   In order to send data via HTTPS
   As a user of MiNiFi
diff --git a/docker/test/integration/features/kafka.feature b/docker/test/integration/features/kafka.feature
index fb512c88c..40a83cfe9 100644
--- a/docker/test/integration/features/kafka.feature
+++ b/docker/test/integration/features/kafka.feature
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 Feature: Sending data to a Kafka streaming platform using PublishKafka
   In order to send data to a Kafka stream
   As a user of MiNiFi
diff --git a/docker/test/integration/features/minifi_c2_server.feature b/docker/test/integration/features/minifi_c2_server.feature
index f9bc3f5dd..2a32260f2 100644
--- a/docker/test/integration/features/minifi_c2_server.feature
+++ b/docker/test/integration/features/minifi_c2_server.feature
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 Feature: MiNiFi can communicate with Apache NiFi MiNiFi C2 server
 
   Background:
@@ -5,6 +20,7 @@ Feature: MiNiFi can communicate with Apache NiFi MiNiFi C2 server
 
   Scenario: MiNiFi flow config is updated from MiNiFi C2 server
     Given a GetFile processor with the name "GetFile1" and the "Input Directory" property set to "/tmp/non-existent"
+    And C2 is enabled in MiNiFi
     And a file with the content "test" is present in "/tmp/input"
     And a MiNiFi C2 server is set up
     When all instances start up
diff --git a/docker/test/integration/features/network_listener.feature b/docker/test/integration/features/network_listener.feature
index 63de76f98..ffefaa466 100644
--- a/docker/test/integration/features/network_listener.feature
+++ b/docker/test/integration/features/network_listener.feature
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 Feature: Minifi C++ can act as a network listener
 
   Background:
diff --git a/docker/test/integration/features/prometheus.feature b/docker/test/integration/features/prometheus.feature
index e1af51ede..c98946ee4 100644
--- a/docker/test/integration/features/prometheus.feature
+++ b/docker/test/integration/features/prometheus.feature
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 Feature: MiNiFi can publish metrics to Prometheus server
 
   Background:
@@ -8,6 +23,7 @@ Feature: MiNiFi can publish metrics to Prometheus server
     And a file with the content "test" is present in "/tmp/input"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
     And the "success" relationship of the GetFile1 processor is connected to the PutFile
+    And Prometheus is enabled in MiNiFi
     And a Prometheus server is set up
     When all instances start up
     Then "RepositoryMetrics" is published to the Prometheus server in less than 60 seconds
@@ -28,6 +44,7 @@ Feature: MiNiFi can publish metrics to Prometheus server
     And a PutFile processor with the "Directory" property set to "/tmp/output"
     And the "success" relationship of the GetFile1 processor is connected to the PutFile
     And the "success" relationship of the GetFile2 processor is connected to the PutFile
+    And Prometheus is enabled in MiNiFi
     And a Prometheus server is set up
     When all instances start up
     Then "GetFileMetrics" processor metric is published to the Prometheus server in less than 60 seconds for "GetFile1" processor
diff --git a/docker/test/integration/features/syslog_listener.feature b/docker/test/integration/features/syslog_listener.feature
index 5683728f6..7ffe70e9a 100644
--- a/docker/test/integration/features/syslog_listener.feature
+++ b/docker/test/integration/features/syslog_listener.feature
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 Feature: Minifi C++ can act as a syslog listener
 
   Background:
diff --git a/docker/test/integration/minifi/core/FileSystemObserver.py b/docker/test/integration/filesystem_validation/FileSystemObserver.py
similarity index 100%
rename from docker/test/integration/minifi/core/FileSystemObserver.py
rename to docker/test/integration/filesystem_validation/FileSystemObserver.py
diff --git a/docker/test/integration/minifi/core/OutputEventHandler.py b/docker/test/integration/filesystem_validation/OutputEventHandler.py
similarity index 98%
rename from docker/test/integration/minifi/core/OutputEventHandler.py
rename to docker/test/integration/filesystem_validation/OutputEventHandler.py
index f4d570298..6a9309ab4 100644
--- a/docker/test/integration/minifi/core/OutputEventHandler.py
+++ b/docker/test/integration/filesystem_validation/OutputEventHandler.py
@@ -17,7 +17,7 @@
 import logging
 import threading
 import os
-from .utils import is_temporary_output_file
+from utils import is_temporary_output_file
 
 from watchdog.events import FileSystemEventHandler
 
diff --git a/docker/test/integration/filesystem_validation/__init__.py b/docker/test/integration/filesystem_validation/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/docker/test/integration/minifi/controllers/ElasticsearchCredentialsService.py b/docker/test/integration/minifi/controllers/ElasticsearchCredentialsService.py
index 7923a4abb..99592bf2a 100644
--- a/docker/test/integration/minifi/controllers/ElasticsearchCredentialsService.py
+++ b/docker/test/integration/minifi/controllers/ElasticsearchCredentialsService.py
@@ -1,3 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from ..core.ControllerService import ControllerService
 
 
diff --git a/docker/test/integration/minifi/controllers/GCPCredentialsControllerService.py b/docker/test/integration/minifi/controllers/GCPCredentialsControllerService.py
index 56b260f26..18428e6ec 100644
--- a/docker/test/integration/minifi/controllers/GCPCredentialsControllerService.py
+++ b/docker/test/integration/minifi/controllers/GCPCredentialsControllerService.py
@@ -1,3 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from ..core.ControllerService import ControllerService
 
 
diff --git a/docker/test/integration/minifi/core/Cluster.py b/docker/test/integration/minifi/core/Cluster.py
deleted file mode 100644
index 76c79df0d..000000000
--- a/docker/test/integration/minifi/core/Cluster.py
+++ /dev/null
@@ -1,53 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-class Cluster(object):
-    """
-    Base Cluster class. This is intended to be a generic interface
-    to different types of clusters. Clusters could be Kubernetes clusters,
-    Docker swarms, or cloud compute/container services.
-    """
-
-    def deploy_flow(self, name=None):
-        """
-        Deploys a flow to the cluster.
-        """
-
-    def stop_flow(self, container_name):
-        """
-        Stops a flow in the cluster.
-        """
-
-    def kill_flow(self, container_name):
-        """
-        Kills (ungracefully stops) a flow in the cluster.
-        """
-
-    def restart_flow(self, container_name):
-        """
-        Stops a flow in the cluster.
-        """
-
-    def __enter__(self):
-        """
-        Allocate ephemeral cluster resources.
-        """
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        """
-        Clean up ephemeral cluster resources.
-        """
diff --git a/docker/test/integration/minifi/core/DockerTestCluster.py b/docker/test/integration/minifi/core/DockerTestCluster.py
deleted file mode 100644
index 0d03f8037..000000000
--- a/docker/test/integration/minifi/core/DockerTestCluster.py
+++ /dev/null
@@ -1,347 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import json
-import logging
-import sys
-import time
-import os
-import re
-import tarfile
-import io
-import tempfile
-
-from .LogSource import LogSource
-from .SingleNodeDockerCluster import SingleNodeDockerCluster
-from .PrometheusChecker import PrometheusChecker
-from .utils import retry_check, get_peak_memory_usage, get_minifi_pid, get_memory_usage
-from azure.storage.blob import BlobServiceClient
-from azure.core.exceptions import ResourceExistsError
-
-
-class DockerTestCluster(SingleNodeDockerCluster):
-    AZURE_CONNECTION_STRING = \
-        ("DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
-         "BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;")
-
-    def __init__(self, context):
-        super(DockerTestCluster, self).__init__(context)
-        self.segfault = False
-
-    @staticmethod
-    def get_stdout_encoding():
-        # Use UTF-8 both when sys.stdout present but set to None (explicitly piped output
-        # and also some CI such as GitHub Actions).
-        encoding = getattr(sys.stdout, "encoding", None)
-        if encoding is None:
-            encoding = "utf8"
-        return encoding
-
-    def get_app_log(self, container_name):
-        log_source = self.containers[container_name].log_source()
-        if log_source == LogSource.FROM_DOCKER_CONTAINER:
-            return self.__get_app_log_from_docker_container(container_name)
-        elif log_source == LogSource.FROM_GET_APP_LOG_METHOD:
-            return self.containers[container_name].get_app_log()
-        else:
-            raise Exception("Unexpected log source '%s'" % log_source)
-
-    def __get_app_log_from_docker_container(self, container_name):
-        try:
-            container = self.client.containers.get(container_name)
-        except Exception:
-            return 'not started', None
-
-        if b'Segmentation fault' in container.logs():
-            logging.warning('Container segfaulted: %s', container.name)
-            self.segfault = True
-
-        return container.status, container.logs()
-
-    def __wait_for_app_logs_impl(self, container_name, log_entry, timeout_seconds, count, use_regex):
-        wait_start_time = time.perf_counter()
-        while (time.perf_counter() - wait_start_time) < timeout_seconds:
-            logging.info('Waiting for app-logs `%s` in container `%s`', log_entry, container_name)
-            status, logs = self.get_app_log(container_name)
-            if logs is not None:
-                if not use_regex and logs.decode("utf-8").count(log_entry) >= count:
-                    return True
-                elif use_regex and len(re.findall(log_entry, logs.decode("utf-8"))) >= count:
-                    return True
-            elif status == 'exited':
-                return False
-            time.sleep(1)
-        return False
-
-    def wait_for_app_logs_regex(self, container_name, log_entry, timeout_seconds, count=1):
-        return self.__wait_for_app_logs_impl(container_name, log_entry, timeout_seconds, count, True)
-
-    def wait_for_app_logs(self, container_name, log_entry, timeout_seconds, count=1):
-        return self.__wait_for_app_logs_impl(container_name, log_entry, timeout_seconds, count, False)
-
-    def wait_for_startup_log(self, container_name, timeout_seconds):
-        return self.wait_for_app_logs_regex(container_name, self.containers[container_name].get_startup_finished_log_entry(), timeout_seconds, 1)
-
-    def log_app_output(self):
-        for container_name in self.containers:
-            _, logs = self.get_app_log(container_name)
-            if logs is not None:
-                logging.info("Logs of container '%s':", container_name)
-                for line in logs.decode("utf-8").splitlines():
-                    logging.info(line)
-
-    def check_http_proxy_access(self, container_name, url):
-        (code, output) = self.client.containers.get(container_name).exec_run(["cat", "/var/log/squid/access.log"])
-        output = output.decode(self.get_stdout_encoding())
-        return code == 0 and url in output \
-            and ((output.count("TCP_DENIED") != 0
-                 and output.count("TCP_MISS") >= output.count("TCP_DENIED"))
-                 or output.count("TCP_DENIED") == 0 and "TCP_MISS" in output)
-
-    @retry_check()
-    def check_s3_server_object_data(self, container_name, test_data):
-        (code, output) = self.client.containers.get(container_name).exec_run(["find", "/tmp/", "-type", "d", "-name", "s3mock*"])
-        if code != 0:
-            return False
-        s3_mock_dir = output.decode(self.get_stdout_encoding()).strip()
-        (code, output) = self.client.containers.get(container_name).exec_run(["cat", s3_mock_dir + "/test_bucket/test_object_key/fileData"])
-        file_data = output.decode(self.get_stdout_encoding())
-        return code == 0 and file_data == test_data
-
-    @retry_check()
-    def check_s3_server_object_metadata(self, container_name, content_type="application/octet-stream", metadata=dict()):
-        (code, output) = self.client.containers.get(container_name).exec_run(["find", "/tmp/", "-type", "d", "-name", "s3mock*"])
-        if code != 0:
-            return False
-        s3_mock_dir = output.decode(self.get_stdout_encoding()).strip()
-        (code, output) = self.client.containers.get(container_name).exec_run(["cat", s3_mock_dir + "/test_bucket/test_object_key/metadata"])
-        server_metadata = json.loads(output.decode(self.get_stdout_encoding()))
-        return code == 0 and server_metadata["contentType"] == content_type and metadata == server_metadata["userMetadata"]
-
-    @retry_check()
-    def check_azure_storage_server_data(self, container_name, test_data):
-        (code, output) = self.client.containers.get(container_name).exec_run(["find", "/data/__blobstorage__", "-type", "f"])
-        if code != 0:
-            return False
-        data_file = output.decode(self.get_stdout_encoding()).strip()
-        (code, output) = self.client.containers.get(container_name).exec_run(["cat", data_file])
-        file_data = output.decode(self.get_stdout_encoding())
-        return code == 0 and test_data in file_data
-
-    def add_test_blob(self, blob_name, content="", with_snapshot=False):
-        blob_service_client = BlobServiceClient.from_connection_string(DockerTestCluster.AZURE_CONNECTION_STRING)
-        try:
-            blob_service_client.create_container("test-container")
-        except ResourceExistsError:
-            logging.debug('test-container already exists')
-
-        blob_client = blob_service_client.get_blob_client(container="test-container", blob=blob_name)
-        blob_client.upload_blob(content)
-
-        if with_snapshot:
-            blob_client.create_snapshot()
-
-    def get_blob_and_snapshot_count(self):
-        blob_service_client = BlobServiceClient.from_connection_string(DockerTestCluster.AZURE_CONNECTION_STRING)
-        container_client = blob_service_client.get_container_client("test-container")
-        return len(list(container_client.list_blobs(include=['deleted'])))
-
-    def check_azure_blob_and_snapshot_count(self, blob_and_snapshot_count, timeout_seconds):
-        start_time = time.perf_counter()
-        while (time.perf_counter() - start_time) < timeout_seconds:
-            if self.get_blob_and_snapshot_count() == blob_and_snapshot_count:
-                return True
-            time.sleep(1)
-        return False
-
-    def is_azure_blob_storage_empty(self):
-        return self.get_blob_and_snapshot_count() == 0
-
-    def check_azure_blob_storage_is_empty(self, timeout_seconds):
-        start_time = time.perf_counter()
-        while (time.perf_counter() - start_time) < timeout_seconds:
-            if self.is_azure_blob_storage_empty():
-                return True
-            time.sleep(1)
-        return False
-
-    @retry_check()
-    def is_s3_bucket_empty(self, container_name):
-        (code, output) = self.client.containers.get(container_name).exec_run(["find", "/tmp/", "-type", "d", "-name", "s3mock*"])
-        if code != 0:
-            return False
-        s3_mock_dir = output.decode(self.get_stdout_encoding()).strip()
-        (code, output) = self.client.containers.get(container_name).exec_run(["ls", s3_mock_dir + "/test_bucket/"])
-        ls_result = output.decode(self.get_stdout_encoding())
-        return code == 0 and not ls_result
-
-    @retry_check()
-    def check_splunk_event(self, container_name, query):
-        (code, output) = self.client.containers.get(container_name).exec_run(["sudo", "/opt/splunk/bin/splunk", "search", query, "-auth", "admin:splunkadmin"])
-        if code != 0:
-            return False
-        return query in output.decode("utf-8")
-
-    @retry_check()
-    def check_splunk_event_with_attributes(self, container_name, query, attributes):
-        (code, output) = self.client.containers.get(container_name).exec_run(["sudo", "/opt/splunk/bin/splunk", "search", query, "-output", "json", "-auth", "admin:splunkadmin"])
-        if code != 0:
-            return False
-        result_str = output.decode("utf-8")
-        result_lines = result_str.splitlines()
-        for result_line in result_lines:
-            try:
-                result_line_json = json.loads(result_line)
-            except json.decoder.JSONDecodeError:
-                continue
-            if "result" not in result_line_json:
-                continue
-            if "host" in attributes:
-                if result_line_json["result"]["host"] != attributes["host"]:
-                    continue
-            if "source" in attributes:
-                if result_line_json["result"]["source"] != attributes["source"]:
-                    continue
-            if "sourcetype" in attributes:
-                if result_line_json["result"]["sourcetype"] != attributes["sourcetype"]:
-                    continue
-            if "index" in attributes:
-                if result_line_json["result"]["index"] != attributes["index"]:
-                    continue
-            return True
-        return False
-
-    def enable_splunk_hec_indexer(self, container_name, hec_name):
-        (code, output) = self.client.containers.get(container_name).exec_run(["sudo",
-                                                                              "/opt/splunk/bin/splunk", "http-event-collector",
-                                                                              "update", hec_name,
-                                                                              "-uri", "https://localhost:8089",
-                                                                              "-use-ack", "1",
-                                                                              "-disabled", "0",
-                                                                              "-auth", "admin:splunkadmin"])
-        return code == 0
-
-    def enable_splunk_hec_ssl(self, container_name, splunk_cert_pem, splunk_key_pem, root_ca_cert_pem):
-        assert self.write_content_to_container(splunk_cert_pem.decode() + splunk_key_pem.decode() + root_ca_cert_pem.decode(), dst=container_name + ':/opt/splunk/etc/auth/splunk_cert.pem')
-        assert self.write_content_to_container(root_ca_cert_pem.decode(), dst=container_name + ':/opt/splunk/etc/auth/root_ca.pem')
-        (code, output) = self.client.containers.get(container_name).exec_run(["sudo",
-                                                                              "/opt/splunk/bin/splunk", "http-event-collector",
-                                                                              "update",
-                                                                              "-uri", "https://localhost:8089",
-                                                                              "-enable-ssl", "1",
-                                                                              "-server-cert", "/opt/splunk/etc/auth/splunk_cert.pem",
-                                                                              "-ca-cert-file", "/opt/splunk/etc/auth/root_ca.pem",
-                                                                              "-require-client-cert", "1",
-                                                                              "-auth", "admin:splunkadmin"])
-        return code == 0
-
-    @retry_check()
-    def check_google_cloud_storage(self, gcs_container_name, content):
-        (code, _) = self.client.containers.get(gcs_container_name).exec_run(["grep", "-r", content, "/storage"])
-        return code == 0
-
-    @retry_check()
-    def is_gcs_bucket_empty(self, container_name):
-        (code, output) = self.client.containers.get(container_name).exec_run(["ls", "/storage/test-bucket"])
-        return code == 0 and output == b''
-
-    def is_elasticsearch_empty(self, container_name):
-        (code, output) = self.client.containers.get(container_name).exec_run(["curl", "-u", "elastic:password", "-k", "-XGET", "https://localhost:9200/_search"])
-        return code == 0 and b'"hits":[]' in output
-
-    def create_doc_elasticsearch(self, container_name, index_name, doc_id):
-        (code, output) = self.client.containers.get(container_name).exec_run(["/bin/bash", "-c",
-                                                                              "curl -u elastic:password -k -XPUT https://localhost:9200/" + index_name + "/_doc/" + doc_id + " -H Content-Type:application/json -d'{\"field1\":\"value1\"}'"])
-        return code == 0 and ('"_id":"' + doc_id + '"').encode() in output
-
-    def check_elastic_field_value(self, container_name, index_name, doc_id, field_name, field_value):
-        (code, output) = self.client.containers.get(container_name).exec_run(["/bin/bash", "-c",
-                                                                              "curl -u elastic:password -k -XGET https://localhost:9200/" + index_name + "/_doc/" + doc_id])
-        return code == 0 and (field_name + '":"' + field_value).encode() in output
-
-    def elastic_generate_apikey(self, elastic_container_name):
-        (code, output) = self.client.containers.get(elastic_container_name).exec_run(["/bin/bash", "-c",
-                                                                                      "curl -u elastic:password -k -XPOST https://localhost:9200/_security/api_key -H Content-Type:application/json -d'{\"name\":\"my-api-key\",\"expiration\":\"1d\",\"role_descriptors\":{\"role-a\": {\"cluster\": [\"all\"],\"index\": [{\"names\": [\"my_index\"],\"privileges\": [\"all\"]}]}}}'"])
-        output = output.decode(self.get_stdout_encoding())
-        output_lines = output.splitlines()
-        result = json.loads(output_lines[-1])
-        return result["encoded"]
-
-    def add_elastic_user_to_opensearch(self, container_name):
-        (code, output) = self.client.containers.get(container_name).exec_run(["/bin/bash", "-c",
-                                                                              'curl -u admin:admin -k -XPUT https://opensearch:9200/_plugins/_security/api/internalusers/elastic -H Content-Type:application/json -d\'{"password":"password","backend_roles":["admin"]}\''])
-        return code == 0 and '"status":"CREATED"'.encode() in output
-
-    def query_postgres_server(self, postgresql_container_name, query, number_of_rows):
-        (code, output) = self.client.containers.get(postgresql_container_name).exec_run(["psql", "-U", "postgres", "-c", query])
-        output = output.decode(self.get_stdout_encoding())
-        return code == 0 and str(number_of_rows) + " rows" in output
-
-    def check_query_results(self, postgresql_container_name, query, number_of_rows, timeout_seconds):
-        start_time = time.perf_counter()
-        while (time.perf_counter() - start_time) < timeout_seconds:
-            if self.query_postgres_server(postgresql_container_name, query, number_of_rows):
-                return True
-            time.sleep(2)
-        return False
-
-    def segfault_happened(self):
-        return self.segfault
-
-    def wait_for_kafka_consumer_to_be_registered(self, kafka_container_name):
-        return self.wait_for_app_logs(kafka_container_name, "Assignment received from leader for group docker_test_group", 60)
-
-    def write_content_to_container(self, content, dst):
-        container_name, dst_path = dst.split(':')
-        container = self.client.containers.get(container_name)
-        with tempfile.TemporaryDirectory() as td:
-            with tarfile.open(os.path.join(td, 'content.tar'), mode='w') as tar:
-                info = tarfile.TarInfo(name=os.path.basename(dst_path))
-                info.size = len(content)
-                tar.addfile(info, io.BytesIO(content.encode('utf-8')))
-            with open(os.path.join(td, 'content.tar'), 'rb') as data:
-                return container.put_archive(os.path.dirname(dst_path), data.read())
-
-    def wait_for_metric_class_on_prometheus(self, metric_class, timeout_seconds):
-        return PrometheusChecker().wait_for_metric_class_on_prometheus(metric_class, timeout_seconds)
-
-    def wait_for_processor_metric_on_prometheus(self, metric_class, timeout_seconds, processor_name):
-        return PrometheusChecker().wait_for_processor_metric_on_prometheus(metric_class, timeout_seconds, processor_name)
-
-    def wait_for_peak_memory_usage_to_exceed(self, minimum_peak_memory_usage: int, timeout_seconds: int) -> bool:
-        start_time = time.perf_counter()
-        while (time.perf_counter() - start_time) < timeout_seconds:
-            current_peak_memory_usage = get_peak_memory_usage(get_minifi_pid())
-            if current_peak_memory_usage is None:
-                logging.warning("Failed to determine peak memory usage")
-                return False
-            if current_peak_memory_usage > minimum_peak_memory_usage:
-                return True
-            time.sleep(1)
-        logging.warning(f"Peak memory usage ({current_peak_memory_usage}) didnt exceed minimum asserted peak memory usage {minimum_peak_memory_usage}")
-        return False
-
-    def wait_for_memory_usage_to_drop_below(self, max_memory_usage: int, timeout_seconds: int) -> bool:
-        start_time = time.perf_counter()
-        while (time.perf_counter() - start_time) < timeout_seconds:
-            current_memory_usage = get_memory_usage(get_minifi_pid())
-            if current_memory_usage is None:
-                logging.warning("Failed to determine memory usage")
-                return False
-            if current_memory_usage < max_memory_usage:
-                return True
-            current_memory_usage = get_memory_usage(get_minifi_pid())
-            time.sleep(1)
-        logging.warning(f"Memory usage ({current_memory_usage}) is more than the maximum asserted memory usage ({max_memory_usage})")
-        return False
diff --git a/docker/test/integration/minifi/core/FakeGcsServerContainer.py b/docker/test/integration/minifi/core/FakeGcsServerContainer.py
deleted file mode 100644
index 41af00406..000000000
--- a/docker/test/integration/minifi/core/FakeGcsServerContainer.py
+++ /dev/null
@@ -1,27 +0,0 @@
-import logging
-import os
-from .Container import Container
-
-
-class FakeGcsServerContainer(Container):
-    def __init__(self, name, vols, network, image_store, command=None):
-        super().__init__(name, 'fake-gcs-server', vols, network, image_store, command)
-
-    def get_startup_finished_log_entry(self):
-        return "server started at http"
-
-    def deploy(self):
-        if not self.set_deployed():
-            return
-
-        logging.info('Creating and running google cloud storage server docker container...')
-        self.client.containers.run(
-            "fsouza/fake-gcs-server:latest",
-            detach=True,
-            name=self.name,
-            network=self.network.name,
-            entrypoint=self.command,
-            ports={'4443/tcp': 4443},
-            volumes=[os.environ['TEST_DIRECTORY'] + "/resources/fake-gcs-server-data:/data"],
-            command='-scheme http -host fake-gcs-server')
-        logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/minifi/core/MinifiContainer.py b/docker/test/integration/minifi/core/MinifiContainer.py
deleted file mode 100644
index 8bd376ca5..000000000
--- a/docker/test/integration/minifi/core/MinifiContainer.py
+++ /dev/null
@@ -1,74 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import os
-import logging
-from .FlowContainer import FlowContainer
-from ..flow_serialization.Minifi_flow_yaml_serializer import Minifi_flow_yaml_serializer
-
-
-class MinifiContainer(FlowContainer):
-    MINIFI_VERSION = os.environ['MINIFI_VERSION']
-    MINIFI_ROOT = '/opt/minifi/nifi-minifi-cpp-' + MINIFI_VERSION
-
-    def __init__(self, config_dir, name, vols, network, image_store, command=None, engine='minifi-cpp'):
-        if not command:
-            command = ["/bin/sh", "-c", "cp /tmp/minifi_config/config.yml " + MinifiContainer.MINIFI_ROOT + "/conf && /opt/minifi/minifi-current/bin/minifi.sh run"]
-        super().__init__(config_dir, name, engine, vols, network, image_store, command)
-
-    def get_startup_finished_log_entry(self):
-        return "Starting Flow Controller"
-
-    def _create_config(self):
-        serializer = Minifi_flow_yaml_serializer()
-        test_flow_yaml = serializer.serialize(self.start_nodes, self.controllers)
-        logging.info('Using generated flow config yml:\n%s', test_flow_yaml)
-        with open(os.path.join(self.config_dir, "config.yml"), 'wb') as config_file:
-            config_file.write(test_flow_yaml.encode('utf-8'))
-
-    def deploy(self):
-        if not self.set_deployed():
-            return
-
-        logging.info('Creating and running minifi docker container...')
-        self._create_config()
-
-        self.client.containers.run(
-            self.image_store.get_image(self.get_engine()),
-            detach=True,
-            name=self.name,
-            network=self.network.name,
-            entrypoint=self.command,
-            volumes=self.vols)
-        logging.info('Added container \'%s\'', self.name)
-
-    def stop(self):
-        logging.info('Stopping minifi docker container "%s"...', self.name)
-        self.client.containers.get(self.name).stop()
-        logging.info('Successfully stopped minifi docker container "%s"', self.name)
-        self.deployed = False
-
-    def kill(self):
-        logging.info('Killing minifi docker container "%s"...', self.name)
-        self.client.containers.get(self.name).kill()
-        logging.info('Successfully killed minifi docker container "%s"', self.name)
-        self.deployed = False
-
-    def restart(self):
-        logging.info('Restarting minifi docker container "%s"...', self.name)
-        self.client.containers.get(self.name).restart()
-        logging.info('Successfully restarted minifi docker container "%s"', self.name)
-        self.deployed = True
diff --git a/docker/test/integration/minifi/core/SyslogTcpClientContainer.py b/docker/test/integration/minifi/core/SyslogTcpClientContainer.py
deleted file mode 100644
index 648cc7f66..000000000
--- a/docker/test/integration/minifi/core/SyslogTcpClientContainer.py
+++ /dev/null
@@ -1,23 +0,0 @@
-import logging
-from .Container import Container
-
-
-class SyslogTcpClientContainer(Container):
-    def __init__(self, name, vols, network, image_store, command=None):
-        super().__init__(name, 'syslog-tcp-client', vols, network, image_store, command)
-
-    def get_startup_finished_log_entry(self):
-        return "Syslog TCP client started"
-
-    def deploy(self):
-        if not self.set_deployed():
-            return
-
-        logging.info('Creating and running a Syslog tcp client docker container...')
-        self.client.containers.run(
-            "ubuntu:20.04",
-            detach=True,
-            name=self.name,
-            network=self.network.name,
-            entrypoint='/bin/bash -c "echo Syslog TCP client started; while true; do logger --tcp -n minifi-cpp-flow -P 514 sample_log; sleep 1; done"')
-        logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/minifi/core/SyslogUdpClientContainer.py b/docker/test/integration/minifi/core/SyslogUdpClientContainer.py
deleted file mode 100644
index f765dfeda..000000000
--- a/docker/test/integration/minifi/core/SyslogUdpClientContainer.py
+++ /dev/null
@@ -1,23 +0,0 @@
-import logging
-from .Container import Container
-
-
-class SyslogUdpClientContainer(Container):
-    def __init__(self, name, vols, network, image_store, command=None):
-        super().__init__(name, 'syslog-udp-client', vols, network, image_store, command)
-
-    def get_startup_finished_log_entry(self):
-        return "Syslog UDP client started"
-
-    def deploy(self):
-        if not self.set_deployed():
-            return
-
-        logging.info('Creating and running a Syslog udp client docker container...')
-        self.client.containers.run(
-            "ubuntu:20.04",
-            detach=True,
-            name=self.name,
-            network=self.network.name,
-            entrypoint='/bin/bash -c "echo Syslog UDP client started; while true; do logger --udp -n minifi-cpp-flow -P 514 sample_log; sleep 1; done"')
-        logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/minifi/core/TcpClientContainer.py b/docker/test/integration/minifi/core/TcpClientContainer.py
deleted file mode 100644
index 0287263af..000000000
--- a/docker/test/integration/minifi/core/TcpClientContainer.py
+++ /dev/null
@@ -1,23 +0,0 @@
-import logging
-from .Container import Container
-
-
-class TcpClientContainer(Container):
-    def __init__(self, name, vols, network, image_store, command=None):
-        super().__init__(name, 'tcp-client', vols, network, image_store, command)
-
-    def get_startup_finished_log_entry(self):
-        return "TCP client container started"
-
-    def deploy(self):
-        if not self.set_deployed():
-            return
-
-        logging.info('Creating and running a tcp client docker container...')
-        self.client.containers.run(
-            self.image_store.get_image(self.get_engine()),
-            detach=True,
-            name=self.name,
-            network=self.network.name,
-            entrypoint=self.command)
-        logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/minifi/core/TransientMinifiContainer.py b/docker/test/integration/minifi/core/TransientMinifiContainer.py
deleted file mode 100644
index eb8fce447..000000000
--- a/docker/test/integration/minifi/core/TransientMinifiContainer.py
+++ /dev/null
@@ -1,24 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from .MinifiContainer import MinifiContainer
-
-
-class TransientMinifiContainer(MinifiContainer):
-    def __init__(self, config_dir, name, vols, network, image_store, command=None):
-        if not command:
-            command = ["/bin/sh", "-c",
-                       "cp /tmp/minifi_config/config.yml ./conf/ && ./bin/minifi.sh start && sleep 10 && ./bin/minifi.sh stop && sleep 100"]
-        super().__init__(config_dir, name, vols, network, image_store, command)
diff --git a/docker/test/integration/minifi/processors/DeleteGCSObject.py b/docker/test/integration/minifi/processors/DeleteGCSObject.py
index f816c6f93..90d304212 100644
--- a/docker/test/integration/minifi/processors/DeleteGCSObject.py
+++ b/docker/test/integration/minifi/processors/DeleteGCSObject.py
@@ -1,3 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from ..core.Processor import Processor
 
 
diff --git a/docker/test/integration/minifi/processors/FetchGCSObject.py b/docker/test/integration/minifi/processors/FetchGCSObject.py
index 557e56517..176cf687b 100644
--- a/docker/test/integration/minifi/processors/FetchGCSObject.py
+++ b/docker/test/integration/minifi/processors/FetchGCSObject.py
@@ -1,3 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from ..core.Processor import Processor
 
 
diff --git a/docker/test/integration/minifi/processors/ListAzureBlobStorage.py b/docker/test/integration/minifi/processors/ListAzureBlobStorage.py
index 801241686..f28801481 100644
--- a/docker/test/integration/minifi/processors/ListAzureBlobStorage.py
+++ b/docker/test/integration/minifi/processors/ListAzureBlobStorage.py
@@ -1,3 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from ..core.Processor import Processor
 
 
diff --git a/docker/test/integration/minifi/processors/ListGCSBucket.py b/docker/test/integration/minifi/processors/ListGCSBucket.py
index 5c66254fd..5e3ed024e 100644
--- a/docker/test/integration/minifi/processors/ListGCSBucket.py
+++ b/docker/test/integration/minifi/processors/ListGCSBucket.py
@@ -1,3 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from ..core.Processor import Processor
 
 
diff --git a/docker/test/integration/minifi/processors/ListenSyslog.py b/docker/test/integration/minifi/processors/ListenSyslog.py
index b590ca402..5d805f7c8 100644
--- a/docker/test/integration/minifi/processors/ListenSyslog.py
+++ b/docker/test/integration/minifi/processors/ListenSyslog.py
@@ -1,3 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from ..core.Processor import Processor
 
 
diff --git a/docker/test/integration/minifi/processors/ListenTCP.py b/docker/test/integration/minifi/processors/ListenTCP.py
index 4781ca9ee..8e737be3e 100644
--- a/docker/test/integration/minifi/processors/ListenTCP.py
+++ b/docker/test/integration/minifi/processors/ListenTCP.py
@@ -1,3 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from ..core.Processor import Processor
 
 
diff --git a/docker/test/integration/minifi/processors/LogOnDestructionProcessor.py b/docker/test/integration/minifi/processors/LogOnDestructionProcessor.py
index 79c1dafa6..52e20bc67 100644
--- a/docker/test/integration/minifi/processors/LogOnDestructionProcessor.py
+++ b/docker/test/integration/minifi/processors/LogOnDestructionProcessor.py
@@ -1,3 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from ..core.Processor import Processor
 
 
diff --git a/docker/test/integration/minifi/processors/PutGCSObject.py b/docker/test/integration/minifi/processors/PutGCSObject.py
index 3dd5c7c51..f4ffdddac 100644
--- a/docker/test/integration/minifi/processors/PutGCSObject.py
+++ b/docker/test/integration/minifi/processors/PutGCSObject.py
@@ -1,3 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from ..core.Processor import Processor
 
 
diff --git a/docker/test/integration/minifi/validators/FileOutputValidator.py b/docker/test/integration/minifi/validators/FileOutputValidator.py
index 390a520f3..f4d59003d 100644
--- a/docker/test/integration/minifi/validators/FileOutputValidator.py
+++ b/docker/test/integration/minifi/validators/FileOutputValidator.py
@@ -19,7 +19,7 @@ import os
 
 from os import listdir
 from os.path import join
-from ..core.utils import is_temporary_output_file
+from utils import is_temporary_output_file
 
 from .OutputValidator import OutputValidator
 
diff --git a/docker/test/integration/minifi/validators/MultiFileOutputValidator.py b/docker/test/integration/minifi/validators/MultiFileOutputValidator.py
index 76547b024..cbe454788 100644
--- a/docker/test/integration/minifi/validators/MultiFileOutputValidator.py
+++ b/docker/test/integration/minifi/validators/MultiFileOutputValidator.py
@@ -19,7 +19,7 @@ import os
 
 from os import listdir
 from os.path import join
-from ..core.utils import is_temporary_output_file
+from utils import is_temporary_output_file
 
 from .FileOutputValidator import FileOutputValidator
 
diff --git a/docker/test/integration/minifi/validators/SingleJSONFileOutputValidator.py b/docker/test/integration/minifi/validators/SingleJSONFileOutputValidator.py
index a84377537..438a0d888 100644
--- a/docker/test/integration/minifi/validators/SingleJSONFileOutputValidator.py
+++ b/docker/test/integration/minifi/validators/SingleJSONFileOutputValidator.py
@@ -19,7 +19,7 @@ import os
 import json
 
 from .FileOutputValidator import FileOutputValidator
-from ..core.utils import is_temporary_output_file
+from utils import is_temporary_output_file
 
 
 class SingleJSONFileOutputValidator(FileOutputValidator):
diff --git a/docker/test/integration/resources/kubernetes/pods-etc/minifi.test-pod.yml b/docker/test/integration/resources/kubernetes/pods-etc/minifi.test-pod.yml
index c6f980647..deec73f56 100644
--- a/docker/test/integration/resources/kubernetes/pods-etc/minifi.test-pod.yml
+++ b/docker/test/integration/resources/kubernetes/pods-etc/minifi.test-pod.yml
@@ -27,7 +27,7 @@ spec:
       path: /var/log/pods
   - name: tmp-minifi-config
     hostPath:
-      path: /tmp/minifi_config
+      path: /tmp/kubernetes_config
   - name: tmp-output
     hostPath:
       path: /tmp/output
diff --git a/docker/test/integration/resources/minifi/minifi-log.properties b/docker/test/integration/resources/minifi/minifi-log.properties
new file mode 100644
index 000000000..88b580cf5
--- /dev/null
+++ b/docker/test/integration/resources/minifi/minifi-log.properties
@@ -0,0 +1,4 @@
+spdlog.pattern=[%Y-%m-%d %H:%M:%S.%e] [%n] [%l] %v
+appender.stderr=stderr
+logger.root=TRACE,stderr
+logger.org::apache::nifi::minifi=TRACE,stderr
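
The new minifi-log.properties routes all MiNiFi logging to stderr at TRACE level, so the test harness can read agent logs straight from the container's log stream instead of copying a log file out of the container. As a rough illustration (not part of this commit; the container name and search pattern below are placeholders), reading those stderr-routed logs with docker-py could look like this:

    # Illustrative only: the container name and search pattern are assumptions.
    import docker


    def minifi_log_contains(container_name: str, pattern: str) -> bool:
        client = docker.from_env()
        container = client.containers.get(container_name)
        # With logger.root=TRACE,stderr every MiNiFi log line ends up in the
        # container's log stream, so no file needs to be copied out.
        return pattern in container.logs().decode("utf-8", errors="replace")


    print(minifi_log_contains("minifi-cpp-flow", "Starting Flow Controller"))
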
diff --git a/docker/test/integration/resources/minifi/minifi.properties b/docker/test/integration/resources/minifi/minifi.properties
new file mode 100644
index 000000000..22cd584b4
--- /dev/null
+++ b/docker/test/integration/resources/minifi/minifi.properties
@@ -0,0 +1,17 @@
+nifi.flow.configuration.file=./conf/config.yml
+nifi.administrative.yield.duration=30 sec
+nifi.bored.yield.duration=100 millis
+nifi.extension.path=../extensions/*
+nifi.provenance.repository.directory.default=${MINIFI_HOME}/provenance_repository
+nifi.provenance.repository.max.storage.time=1 MIN
+nifi.provenance.repository.max.storage.size=1 MB
+nifi.flowfile.repository.directory.default=${MINIFI_HOME}/flowfile_repository
+nifi.database.content.repository.directory.default=${MINIFI_HOME}/content_repository
+nifi.content.repository.class.name=DatabaseContentRepository
+nifi.framework.dir=${MINIFI_HOME}/minifi-jni/lib
+nifi.nar.directory=${MINIFI_HOME}/minifi-jni/nars
+nifi.nar.deploy.directory=${MINIFI_HOME}/minifi-jni/nardeploy
+nifi.nar.docs.directory=${MINIFI_HOME}/minifi-jni/nardocs
+nifi.jvm.options=-Xmx1G
+nifi.python.processor.dir=${MINIFI_HOME}/minifi-python/
+nifi.flow.engine.threads=5
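
This file appears to replace the per-scenario property directories with a single baseline configuration that gets mounted into the MiNiFi test containers, while scenario-specific settings are switched on through the new enable_*_in_minifi steps. A minimal sketch of such a bind mount with docker-py, assuming the standard /opt/minifi/minifi-current home directory and an illustrative host path and image tag:

    # A sketch only: host paths, container paths and the image tag are assumptions.
    import docker

    client = docker.from_env()
    container = client.containers.run(
        "apacheminificpp:latest",
        detach=True,
        name="minifi-cpp-flow",
        volumes={
            "/tmp/minifi_config/minifi.properties": {
                "bind": "/opt/minifi/minifi-current/conf/minifi.properties",
                "mode": "ro"},
            "/tmp/minifi_config/minifi-log.properties": {
                "bind": "/opt/minifi/minifi-current/conf/minifi-log.properties",
                "mode": "ro"},
        },
    )
    print(container.status)
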
diff --git a/docker/test/integration/resources/minifi_cpp_with_provenance_repo/minifi.properties b/docker/test/integration/resources/minifi_cpp_with_provenance_repo/minifi.properties
deleted file mode 100644
index 4b3763f7e..000000000
--- a/docker/test/integration/resources/minifi_cpp_with_provenance_repo/minifi.properties
+++ /dev/null
@@ -1,29 +0,0 @@
-nifi.flow.configuration.file=./conf/config.yml
-nifi.administrative.yield.duration=30 sec
-nifi.bored.yield.duration=10 millis
-nifi.extension.path=../extensions/*
-nifi.provenance.repository.directory.default=${MINIFI_HOME}/provenance_repository
-nifi.provenance.repository.max.storage.time=1 MIN
-nifi.provenance.repository.max.storage.size=1 MB
-nifi.flowfile.repository.directory.default=${MINIFI_HOME}/flowfile_repository
-nifi.database.content.repository.directory.default=${MINIFI_HOME}/content_repository
-nifi.remote.input.secure=false
-nifi.security.need.ClientAuth=false
-nifi.security.client.certificate=/tmp/shared/identity.pem
-nifi.security.client.private.key=/tmp/shared/identity.pem
-nifi.security.client.ca.certificate=/tmp/shared/nifi-cert.pem
-nifi.c2.enable=true
-nifi.c2.agent.class=minifi-cpp-latest
-nifi.c2.agent.listen=false
-nifi.c2.agent.heartbeat.period=30 sec
-nifi.c2.agent.heartbeat.reporter.classes=RESTReceiver
-nifi.c2.agent.protocol.class=RESTSender
-nifi.c2.full.heartbeat=false
-nifi.c2.rest.url=http://local-cem-efm:10091/efm/api/c2-protocol/heartbeat
-nifi.c2.rest.url.ack=http://local-cem-efm:10091/efm/api/c2-protocol/acknowledge
-nifi.c2.rest.listener.port=8765
-nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation,RepositoryMetrics
-nifi.c2.root.class.definitions.DeviceInfo.name=deviceinfo
-nifi.c2.root.class.definitions.DeviceInfo.classes=DeviceInfoNode
-nifi.flow.metrics.class.definitions=15
-nifi.flow.engine.threads=10
diff --git a/docker/test/integration/minifi/core/SSL_cert_utils.py b/docker/test/integration/ssl_utils/SSL_cert_utils.py
similarity index 100%
rename from docker/test/integration/minifi/core/SSL_cert_utils.py
rename to docker/test/integration/ssl_utils/SSL_cert_utils.py
diff --git a/docker/test/integration/ssl_utils/__init__.py b/docker/test/integration/ssl_utils/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index 52b80bcb0..ecc4b7661 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -14,9 +14,9 @@
 # limitations under the License.
 
 
-from minifi.core.FileSystemObserver import FileSystemObserver
+from filesystem_validation.FileSystemObserver import FileSystemObserver
 from minifi.core.RemoteProcessGroup import RemoteProcessGroup
-from minifi.core.SSL_cert_utils import make_ca, make_cert, dump_certificate, dump_privatekey
+from ssl_utils.SSL_cert_utils import make_ca, make_cert, dump_certificate, dump_privatekey
 from minifi.core.Funnel import Funnel
 
 from minifi.controllers.SSLContextService import SSLContextService
@@ -327,6 +327,26 @@ def step_impl(context, flow_name):
     context.test.acquire_container(flow_name, 'nifi')
 
 
+@given("a transient MiNiFi flow with the name \"{flow_name}\" is set up")
+def step_impl(context, flow_name):
+    context.test.acquire_container(flow_name, 'minifi-cpp', ["/bin/sh", "-c", "./bin/minifi.sh start && sleep 10 && ./bin/minifi.sh stop && sleep 100"])
+
+
+@given("the provenance repository is enabled in MiNiFi")
+def step_impl(context):
+    context.test.enable_provenance_repository_in_minifi()
+
+
+@given("C2 is enabled in MiNiFi")
+def step_impl(context):
+    context.test.enable_c2_in_minifi()
+
+
+@given("Prometheus is enabled in MiNiFi")
+def step_impl(context):
+    context.test.enable_prometheus_in_minifi()
+
+
 # HTTP proxy setup
 @given("the http proxy server is set up")
 @given("a http proxy server is set up accordingly")
@@ -518,7 +538,7 @@ def step_impl(context):
     query_splunk_indexing_status = context.test.get_node_by_name("QuerySplunkIndexingStatus")
     query_splunk_indexing_status.controller_services.append(ssl_context_service)
     query_splunk_indexing_status.set_property("SSL Context Service", ssl_context_service.name)
-    context.test.cluster.enable_splunk_hec_ssl('splunk', dump_certificate(splunk_cert), dump_privatekey(splunk_key), dump_certificate(root_ca_cert))
+    context.test.enable_splunk_hec_ssl('splunk', dump_certificate(splunk_cert), dump_privatekey(splunk_key), dump_certificate(root_ca_cert))
 
 
 @given(u'the {processor_one} processor is set up with a GCPCredentialsControllerService to communicate with the Google Cloud storage server')
@@ -570,6 +590,7 @@ def step_impl(context, processor_name, service_name):
 
 @given("a PostgreSQL server is set up")
 def step_impl(context):
+    context.test.enable_sql_in_minifi()
     context.test.acquire_container("postgresql-server", "postgresql-server")
 
 
@@ -972,8 +993,9 @@ def step_impl(context, doc_id, index, value, field):
 def step_impl(context):
     ssl_context_service = SSLContextService(cert="/tmp/resources/minifi-c2-server-ssl/minifi-cpp-flow.crt", ca_cert="/tmp/resources/minifi-c2-server-ssl/root-ca.pem", key="/tmp/resources/minifi-c2-server-ssl/minifi-cpp-flow.key", passphrase="abcdefgh")
     ssl_context_service.name = "SSLContextService"
-    container = context.test.acquire_container("minifi-cpp-flow", "minifi-cpp-with-https-c2-config")
+    container = context.test.acquire_container("minifi-cpp-flow")
     container.add_controller(ssl_context_service)
+    context.test.enable_c2_with_ssl_in_minifi()
 
 
 @given(u'a MiNiFi C2 server is set up')
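
The new Given steps replace dedicated container flavours (for example the removed minifi_cpp_with_provenance_repo configuration and the minifi-cpp-with-https-c2-config image name) with toggles on the test driver that adjust the mounted minifi.properties before the agent starts. The snippet below is only a sketch of what such a toggle could look like; the property values are taken from the deleted C2 configuration or invented for illustration, and are not copied from the actual driver code:

    # Sketch of a driver-side toggle; values are illustrative, not the real driver.
    class MinifiOptions:
        def __init__(self):
            self.extra_properties = {}

        def enable_c2(self):
            # Mirrors the kind of settings that used to live in the deleted
            # per-scenario minifi.properties files.
            self.extra_properties.update({
                "nifi.c2.enable": "true",
                "nifi.c2.agent.heartbeat.period": "10 sec",
                "nifi.c2.rest.url": "http://minifi-c2-server:10090/c2/config/heartbeat"})

        def render(self, base_properties_path: str) -> str:
            with open(base_properties_path) as base_file:
                lines = base_file.read().splitlines()
            lines.extend(f"{key}={value}" for key, value in self.extra_properties.items())
            return "\n".join(lines) + "\n"
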
diff --git a/docker/test/integration/minifi/core/utils.py b/docker/test/integration/utils.py
similarity index 88%
rename from docker/test/integration/minifi/core/utils.py
rename to docker/test/integration/utils.py
index c0a5d34c1..07e388431 100644
--- a/docker/test/integration/minifi/core/utils.py
+++ b/docker/test/integration/utils.py
@@ -81,3 +81,15 @@ def get_memory_usage(pid: int) -> Optional[int]:
                 resident_set_size = [int(s) for s in line.split() if s.isdigit()].pop()
                 return resident_set_size * 1024
     return None
+
+
+def wait_for(action, timeout_seconds, check_period=1, *args, **kwargs):
+    start_time = time.perf_counter()
+    while True:
+        result = action(*args, **kwargs)
+        if result:
+            return result
+        time.sleep(check_period)
+        if timeout_seconds < (time.perf_counter() - start_time):
+            break
+    return False
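
The wait_for helper added to the relocated utils module polls an action until it returns a truthy value, sleeping check_period seconds between attempts and giving up with False once timeout_seconds have elapsed; any extra positional or keyword arguments are forwarded to the action, and the module is expected to import time at the top of the file (outside this hunk). A small usage example, with a hypothetical path standing in for a real test condition:

    # Usage example; the polled path is hypothetical.
    import os

    from utils import wait_for

    # Returns the first truthy result of the lambda, or False after 30 seconds.
    found = wait_for(lambda: os.path.exists("/tmp/output/flowfile"), 30, check_period=1)
    print(found)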