You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/05/11 15:21:02 UTC

[nifi-minifi-cpp] 03/04: MINIFICPP-1765 Instantiate kind cluster only once in a test run

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 740fcc7739643fb50e74e19a8d0255224367b938
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed May 11 17:17:49 2022 +0200

    MINIFICPP-1765 Instantiate kind cluster only once in a test run
    
    Closes #1322
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 .../integration/MiNiFi_integration_test_driver.py  | 15 +++------
 docker/test/integration/environment.py             | 26 ++++++++++++++-
 .../test/integration/features/kubernetes.feature   |  1 +
 .../integration/minifi/core/DockerTestCluster.py   |  4 +--
 .../minifi/core/DockerTestDirectoryBindings.py     | 37 ++++++++++++++--------
 .../core/{KindProxy.py => KubernetesProxy.py}      | 29 ++++++++++++-----
 .../minifi/core/MinifiAsPodInKubernetesCluster.py  | 19 ++++-------
 .../integration/minifi/core/OutputEventHandler.py  |  3 --
 .../minifi/core/SingleNodeDockerCluster.py         |  7 ++--
 docker/test/integration/steps/steps.py             |  4 ++-
 10 files changed, 92 insertions(+), 53 deletions(-)

diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py
index 363ade89b..f450bf751 100644
--- a/docker/test/integration/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -23,7 +23,6 @@ from pydoc import locate
 from minifi.core.InputPort import InputPort
 
 from minifi.core.DockerTestCluster import DockerTestCluster
-from minifi.core.DockerTestDirectoryBindings import DockerTestDirectoryBindings
 
 from minifi.validators.EmptyFilesOutPutValidator import EmptyFilesOutPutValidator
 from minifi.validators.NoFileOutPutValidator import NoFileOutPutValidator
@@ -37,18 +36,17 @@ from minifi.validators.SingleJSONFileOutputValidator import SingleJSONFileOutput
 from minifi.core.utils import decode_escaped_str
 
 
-class MiNiFi_integration_test():
-    def __init__(self, image_store):
-        self.test_id = str(uuid.uuid4())
-        self.cluster = DockerTestCluster(image_store)
+class MiNiFi_integration_test:
+    def __init__(self, context):
+        self.test_id = context.test_id
+        self.cluster = DockerTestCluster(context)
 
         self.connectable_nodes = []
         # Remote process groups are not connectables
         self.remote_process_groups = []
         self.file_system_observer = None
 
-        self.docker_directory_bindings = DockerTestDirectoryBindings()
-        self.docker_directory_bindings.create_new_data_directories(self.test_id)
+        self.docker_directory_bindings = context.directory_bindings
         self.cluster.set_directory_bindings(self.docker_directory_bindings.get_directory_bindings(self.test_id), self.docker_directory_bindings.get_data_directories(self.test_id))
 
     def __del__(self):
@@ -57,9 +55,6 @@ class MiNiFi_integration_test():
     def cleanup(self):
         self.cluster.cleanup()
 
-    def docker_path_to_local_path(self, docker_path):
-        return self.docker_directory_bindings.docker_path_to_local_path(self.test_id, docker_path)
-
     def acquire_container(self, name, engine='minifi-cpp', command=None):
         return self.cluster.acquire_container(name, engine, command)
 
diff --git a/docker/test/integration/environment.py b/docker/test/integration/environment.py
index a53b86aab..ce1b5fd5a 100644
--- a/docker/test/integration/environment.py
+++ b/docker/test/integration/environment.py
@@ -17,11 +17,15 @@
 import logging
 import datetime
 import sys
+import uuid
+import os
 sys.path.append('../minifi')
 
 from MiNiFi_integration_test_driver import MiNiFi_integration_test  # noqa: E402
 from minifi import *  # noqa
 from minifi.core.ImageStore import ImageStore # noqa
+from minifi.core.DockerTestDirectoryBindings import DockerTestDirectoryBindings # noqa
+from minifi.core.KubernetesProxy import KubernetesProxy # noqa
 
 
 def before_scenario(context, scenario):
@@ -30,7 +34,7 @@ def before_scenario(context, scenario):
         return
 
     logging.info("Integration test setup at {time:%H:%M:%S.%f}".format(time=datetime.datetime.now()))
-    context.test = MiNiFi_integration_test(context.image_store)
+    context.test = MiNiFi_integration_test(context)
 
 
 def after_scenario(context, scenario):
@@ -40,8 +44,28 @@ def after_scenario(context, scenario):
 
     logging.info("Integration test teardown at {time:%H:%M:%S.%f}".format(time=datetime.datetime.now()))
     context.test.cleanup()
+    context.directory_bindings.cleanup_io()
+    if context.kubernetes_proxy:
+        context.kubernetes_proxy.delete_pods()
 
 
 def before_all(context):
+    context.test_id = str(uuid.uuid4())
     context.config.setup_logging()
     context.image_store = ImageStore()
+    context.directory_bindings = DockerTestDirectoryBindings(context.test_id)
+    context.directory_bindings.create_new_data_directories()
+    context.kubernetes_proxy = None
+
+
+def before_tag(context, tag):
+    if tag == "requires.kubernetes.cluster":
+        context.kubernetes_proxy = KubernetesProxy(context.directory_bindings.get_data_directories(context.test_id)["kubernetes_temp_dir"], os.path.join(os.environ['TEST_DIRECTORY'], 'resources', 'kubernetes', 'pods-etc'))
+        context.kubernetes_proxy.create_config(context.directory_bindings.get_directory_bindings(context.test_id))
+        context.kubernetes_proxy.start_cluster()
+
+
+def after_tag(context, tag):
+    if tag == "requires.kubernetes.cluster" and context.kubernetes_proxy:
+        context.kubernetes_proxy.cleanup()
+        context.kubernetes_proxy = None
diff --git a/docker/test/integration/features/kubernetes.feature b/docker/test/integration/features/kubernetes.feature
index 5d14b1982..5a24755e3 100644
--- a/docker/test/integration/features/kubernetes.feature
+++ b/docker/test/integration/features/kubernetes.feature
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+@requires.kubernetes.cluster
 Feature: TailFile can collect logs from Kubernetes pods
 
   Background:
diff --git a/docker/test/integration/minifi/core/DockerTestCluster.py b/docker/test/integration/minifi/core/DockerTestCluster.py
index c29e0baa5..7158b7a45 100644
--- a/docker/test/integration/minifi/core/DockerTestCluster.py
+++ b/docker/test/integration/minifi/core/DockerTestCluster.py
@@ -36,8 +36,8 @@ class DockerTestCluster(SingleNodeDockerCluster):
         ("DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
          "BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;")
 
-    def __init__(self, image_store):
-        super(DockerTestCluster, self).__init__(image_store)
+    def __init__(self, context):
+        super(DockerTestCluster, self).__init__(context)
         self.segfault = False
 
     @staticmethod
diff --git a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
index 91a13bb2a..3781138a9 100644
--- a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
+++ b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
@@ -20,29 +20,40 @@ import shutil
 
 
 class DockerTestDirectoryBindings:
-    def __init__(self):
+    def __init__(self, test_id):
         self.data_directories = {}
+        self.test_id = test_id
 
     def __del__(self):
         self.delete_data_directories()
 
-    def create_new_data_directories(self, test_id):
-        self.data_directories[test_id] = {
-            "input_dir": "/tmp/.nifi-test-input." + test_id,
-            "output_dir": "/tmp/.nifi-test-output." + test_id,
-            "resources_dir": "/tmp/.nifi-test-resources." + test_id,
-            "minifi_config_dir": "/tmp/.nifi-test-minifi-config-dir." + test_id,
-            "nifi_config_dir": "/tmp/.nifi-test-nifi-config-dir." + test_id
+    def cleanup_io(self):
+        for folder in [self.data_directories[self.test_id]["input_dir"], self.data_directories[self.test_id]["output_dir"]]:
+            for filename in os.listdir(folder):
+                file_path = os.path.join(folder, filename)
+                if os.path.isfile(file_path) or os.path.islink(file_path):
+                    os.unlink(file_path)
+                elif os.path.isdir(file_path):
+                    shutil.rmtree(file_path)
+
+    def create_new_data_directories(self):
+        self.data_directories[self.test_id] = {
+            "input_dir": "/tmp/.nifi-test-input." + self.test_id,
+            "output_dir": "/tmp/.nifi-test-output." + self.test_id,
+            "resources_dir": "/tmp/.nifi-test-resources." + self.test_id,
+            "minifi_config_dir": "/tmp/.nifi-test-minifi-config-dir." + self.test_id,
+            "nifi_config_dir": "/tmp/.nifi-test-nifi-config-dir." + self.test_id,
+            "kubernetes_temp_dir": "/tmp/.nifi-test-kubernetes-temp-dir." + self.test_id
         }
 
-        [self.create_directory(directory) for directory in self.data_directories[test_id].values()]
+        [self.create_directory(directory) for directory in self.data_directories[self.test_id].values()]
 
         # Add resources
         test_dir = os.environ['TEST_DIRECTORY']  # Based on DockerVerify.sh
-        shutil.copytree(test_dir + "/resources/kafka_broker/conf/certs", self.data_directories[test_id]["resources_dir"] + "/certs")
-        shutil.copytree(test_dir + "/resources/python", self.data_directories[test_id]["resources_dir"] + "/python")
-        shutil.copytree(test_dir + "/resources/opcua", self.data_directories[test_id]["resources_dir"] + "/opcua")
-        shutil.copytree(test_dir + "/resources/lua", self.data_directories[test_id]["resources_dir"] + "/lua")
+        shutil.copytree(test_dir + "/resources/kafka_broker/conf/certs", self.data_directories[self.test_id]["resources_dir"] + "/certs")
+        shutil.copytree(test_dir + "/resources/python", self.data_directories[self.test_id]["resources_dir"] + "/python")
+        shutil.copytree(test_dir + "/resources/opcua", self.data_directories[self.test_id]["resources_dir"] + "/opcua")
+        shutil.copytree(test_dir + "/resources/lua", self.data_directories[self.test_id]["resources_dir"] + "/lua")
 
     def get_data_directories(self, test_id):
         return self.data_directories[test_id]
diff --git a/docker/test/integration/minifi/core/KindProxy.py b/docker/test/integration/minifi/core/KubernetesProxy.py
similarity index 81%
rename from docker/test/integration/minifi/core/KindProxy.py
rename to docker/test/integration/minifi/core/KubernetesProxy.py
index b8da30bd5..0af95fc81 100644
--- a/docker/test/integration/minifi/core/KindProxy.py
+++ b/docker/test/integration/minifi/core/KubernetesProxy.py
@@ -23,7 +23,7 @@ import time
 from textwrap import dedent
 
 
-class KindProxy:
+class KubernetesProxy:
     def __init__(self, temp_directory, resources_directory):
         self.temp_directory = temp_directory
         self.resources_directory = resources_directory
@@ -33,6 +33,12 @@ class KindProxy:
         self.__download_kind()
         self.docker_client = docker.from_env()
 
+    def __del__(self):
+        self.cleanup()
+
+    def cleanup(self):
+        subprocess.run([self.kind_binary_path, 'delete', 'cluster'])
+
     def __download_kind(self):
         if subprocess.run(['curl', '-Lo', self.kind_binary_path, 'https://kind.sigs.k8s.io/dl/v0.11.1/kind-linux-amd64']).returncode != 0:
             raise Exception("Could not download kind")
@@ -78,9 +84,12 @@ class KindProxy:
         self.__create_objects_of_type(self.resources_directory, 'clusterrole')
         self.__create_objects_of_type(self.resources_directory, 'clusterrolebinding')
 
+    def delete_pods(self):
+        self.__delete_objects_of_type('pod')
+
     def __wait_for_default_service_account(self, namespace):
         for _ in range(120):
-            (code, output) = self.docker_client.containers.get('kind-control-plane').exec_run(['kubectl', '-n', namespace, 'get', 'serviceaccount', 'default'])
+            (code, _) = self.docker_client.containers.get('kind-control-plane').exec_run(['kubectl', '-n', namespace, 'get', 'serviceaccount', 'default'])
             if code == 0:
                 return
             time.sleep(1)
@@ -93,7 +102,7 @@ class KindProxy:
             file_name_in_container = os.path.join('/var/tmp', file_name)
             self.__copy_file_to_container(full_file_name, 'kind-control-plane', file_name_in_container)
 
-            (code, output) = self.docker_client.containers.get('kind-control-plane').exec_run(['kubectl', 'apply', '-f', file_name_in_container])
+            (code, _) = self.docker_client.containers.get('kind-control-plane').exec_run(['kubectl', 'apply', '-f', file_name_in_container])
             if code != 0:
                 raise Exception("Could not create kubernetes object from file '%s'" % full_file_name)
 
@@ -101,6 +110,15 @@ class KindProxy:
             found_objects.append(object_name)
         return found_objects
 
+    def __delete_objects_of_type(self, type):
+        for full_file_name in glob.iglob(os.path.join(self.resources_directory, f'*.{type}.yml')):
+            file_name = os.path.basename(full_file_name)
+            file_name_in_container = os.path.join('/var/tmp', file_name)
+
+            (code, _) = self.docker_client.containers.get('kind-control-plane').exec_run(['kubectl', 'delete', '-f', file_name_in_container, '--grace-period=0', '--force'])
+            if code != 0:
+                raise Exception("Could not delete kubernetes object from file '%s'" % file_name_in_container)
+
     def __copy_file_to_container(self, host_file, container_name, container_file):
         if subprocess.run(['docker', 'cp', host_file, container_name + ':' + container_file]).returncode != 0:
             raise Exception("Could not copy file '%s' into container '%s' as '%s'" % (host_file, container_name, container_file))
@@ -111,8 +129,3 @@ class KindProxy:
             return output
         else:
             return None
-
-    def cleanup(self):
-        # cleanup gets called multiple times, also after the temp directories had been removed
-        if os.path.exists(self.kind_binary_path):
-            subprocess.run([self.kind_binary_path, 'delete', 'cluster'])
diff --git a/docker/test/integration/minifi/core/MinifiAsPodInKubernetesCluster.py b/docker/test/integration/minifi/core/MinifiAsPodInKubernetesCluster.py
index 81ba5b02e..e9c821367 100644
--- a/docker/test/integration/minifi/core/MinifiAsPodInKubernetesCluster.py
+++ b/docker/test/integration/minifi/core/MinifiAsPodInKubernetesCluster.py
@@ -19,7 +19,6 @@ import logging
 import os
 import shutil
 
-from .KindProxy import KindProxy
 from .LogSource import LogSource
 from .MinifiContainer import MinifiContainer
 
@@ -28,11 +27,9 @@ class MinifiAsPodInKubernetesCluster(MinifiContainer):
     MINIFI_IMAGE_NAME = 'apacheminificpp'
     MINIFI_IMAGE_TAG = 'docker_test'
 
-    def __init__(self, config_dir, name, vols, network, image_store, command=None):
+    def __init__(self, kubernetes_proxy, config_dir, name, vols, network, image_store, command=None):
         super().__init__(config_dir, name, vols, network, image_store, command)
-
-        resources_directory = os.path.join(os.environ['TEST_DIRECTORY'], 'resources', 'kubernetes', 'pods-etc')
-        self.kind = KindProxy(self.config_dir, resources_directory)
+        self.kubernetes_proxy = kubernetes_proxy
 
         test_dir = os.environ['TEST_DIRECTORY']
         shutil.copy(os.path.join(test_dir, os.pardir, os.pardir, os.pardir, 'conf', 'minifi.properties'), self.config_dir)
@@ -50,10 +47,8 @@ class MinifiAsPodInKubernetesCluster(MinifiContainer):
 
         self._create_config()
 
-        self.kind.create_config(self.vols)
-        self.kind.start_cluster()
-        self.kind.load_docker_image(MinifiAsPodInKubernetesCluster.MINIFI_IMAGE_NAME, MinifiAsPodInKubernetesCluster.MINIFI_IMAGE_TAG)
-        self.kind.create_objects()
+        self.kubernetes_proxy.load_docker_image(MinifiAsPodInKubernetesCluster.MINIFI_IMAGE_NAME, MinifiAsPodInKubernetesCluster.MINIFI_IMAGE_TAG)
+        self.kubernetes_proxy.create_objects()
 
         logging.info('Finished setting up container: %s', self.name)
 
@@ -61,8 +56,8 @@ class MinifiAsPodInKubernetesCluster(MinifiContainer):
         return LogSource.FROM_GET_APP_LOG_METHOD
 
     def get_app_log(self):
-        return 'OK', self.kind.get_logs('daemon', 'log-collector')
+        return 'OK', self.kubernetes_proxy.get_logs('daemon', 'log-collector')
 
     def cleanup(self):
-        logging.info('Cleaning up container: %s', self.name)
-        self.kind.cleanup()
+        # Cleanup is handled in environment.py: after_scenario deletes the pods and after_tag deletes the kind cluster.
+        pass
diff --git a/docker/test/integration/minifi/core/OutputEventHandler.py b/docker/test/integration/minifi/core/OutputEventHandler.py
index 735295fd8..f4d570298 100644
--- a/docker/test/integration/minifi/core/OutputEventHandler.py
+++ b/docker/test/integration/minifi/core/OutputEventHandler.py
@@ -68,6 +68,3 @@ class OutputEventHandler(FileSystemEventHandler):
 
             if file_count_modified:
                 self.done_event.set()
-
-    def on_deleted(self, event):
-        logging.info("Output file deleted: " + event.src_path)
diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
index 50dc9f3eb..b622685b0 100644
--- a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
+++ b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
@@ -44,12 +44,13 @@ class SingleNodeDockerCluster(Cluster):
     testing or use-cases which do not span multiple compute nodes.
     """
 
-    def __init__(self, image_store):
+    def __init__(self, context):
         self.vols = {}
         self.network = self.create_docker_network()
         self.containers = {}
-        self.image_store = image_store
+        self.image_store = context.image_store
         self.data_directories = {}
+        self.kubernetes_proxy = context.kubernetes_proxy
 
         # Get docker client
         self.client = docker.from_env()
@@ -91,7 +92,7 @@ class SingleNodeDockerCluster(Cluster):
         elif engine == 'minifi-cpp':
             return self.containers.setdefault(name, MinifiContainer(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
         elif engine == 'kubernetes':
-            return self.containers.setdefault(name, MinifiAsPodInKubernetesCluster(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
+            return self.containers.setdefault(name, MinifiAsPodInKubernetesCluster(self.kubernetes_proxy, self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
         elif engine == 'transient-minifi':
             return self.containers.setdefault(name, TransientMinifiContainer(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
         elif engine == 'minifi-cpp-with-provenance-repo':
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index 8a64acadf..f588c8cc7 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -44,7 +44,7 @@ import os
 # Background
 @given("the content of \"{directory}\" is monitored")
 def step_impl(context, directory):
-    context.test.add_file_system_observer(FileSystemObserver(context.test.docker_path_to_local_path(directory)))
+    context.test.add_file_system_observer(FileSystemObserver(context.directory_bindings.docker_path_to_local_path(context.test_id, directory)))
 
 
 def __create_processor(context, processor_type, processor_name, property_name, property_value, container_name, engine='minifi-cpp'):
@@ -394,12 +394,14 @@ def step_impl(context):
 def step_impl(context):
     context.test.acquire_container("azure-storage-server", "azure-storage-server")
 
+
 # syslog client
 @given(u'a Syslog client with {protocol} protocol is setup to send logs to minifi')
 def step_impl(context, protocol):
     client_name = "syslog-" + protocol.lower() + "-client"
     context.test.acquire_container(client_name, client_name)
 
+
 # google cloud storage setup
 @given("a Google Cloud storage server is set up with some test data")
 @given("a Google Cloud storage server is set up")