You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/05/11 15:21:02 UTC
[nifi-minifi-cpp] 03/04: MINIFICPP-1765 Instantiate kind cluster only once in a test run
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 740fcc7739643fb50e74e19a8d0255224367b938
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed May 11 17:17:49 2022 +0200
MINIFICPP-1765 Instantiate kind cluster only once in a test run
Closes #1322
Signed-off-by: Marton Szasz <sz...@apache.org>
---
.../integration/MiNiFi_integration_test_driver.py | 15 +++------
docker/test/integration/environment.py | 26 ++++++++++++++-
.../test/integration/features/kubernetes.feature | 1 +
.../integration/minifi/core/DockerTestCluster.py | 4 +--
.../minifi/core/DockerTestDirectoryBindings.py | 37 ++++++++++++++--------
.../core/{KindProxy.py => KubernetesProxy.py} | 29 ++++++++++++-----
.../minifi/core/MinifiAsPodInKubernetesCluster.py | 19 ++++-------
.../integration/minifi/core/OutputEventHandler.py | 3 --
.../minifi/core/SingleNodeDockerCluster.py | 7 ++--
docker/test/integration/steps/steps.py | 4 ++-
10 files changed, 92 insertions(+), 53 deletions(-)
diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py
index 363ade89b..f450bf751 100644
--- a/docker/test/integration/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -23,7 +23,6 @@ from pydoc import locate
from minifi.core.InputPort import InputPort
from minifi.core.DockerTestCluster import DockerTestCluster
-from minifi.core.DockerTestDirectoryBindings import DockerTestDirectoryBindings
from minifi.validators.EmptyFilesOutPutValidator import EmptyFilesOutPutValidator
from minifi.validators.NoFileOutPutValidator import NoFileOutPutValidator
@@ -37,18 +36,17 @@ from minifi.validators.SingleJSONFileOutputValidator import SingleJSONFileOutput
from minifi.core.utils import decode_escaped_str
-class MiNiFi_integration_test():
- def __init__(self, image_store):
- self.test_id = str(uuid.uuid4())
- self.cluster = DockerTestCluster(image_store)
+class MiNiFi_integration_test:
+ def __init__(self, context):
+ self.test_id = context.test_id
+ self.cluster = DockerTestCluster(context)
self.connectable_nodes = []
# Remote process groups are not connectables
self.remote_process_groups = []
self.file_system_observer = None
- self.docker_directory_bindings = DockerTestDirectoryBindings()
- self.docker_directory_bindings.create_new_data_directories(self.test_id)
+ self.docker_directory_bindings = context.directory_bindings
self.cluster.set_directory_bindings(self.docker_directory_bindings.get_directory_bindings(self.test_id), self.docker_directory_bindings.get_data_directories(self.test_id))
def __del__(self):
@@ -57,9 +55,6 @@ class MiNiFi_integration_test():
def cleanup(self):
self.cluster.cleanup()
- def docker_path_to_local_path(self, docker_path):
- return self.docker_directory_bindings.docker_path_to_local_path(self.test_id, docker_path)
-
def acquire_container(self, name, engine='minifi-cpp', command=None):
return self.cluster.acquire_container(name, engine, command)
diff --git a/docker/test/integration/environment.py b/docker/test/integration/environment.py
index a53b86aab..ce1b5fd5a 100644
--- a/docker/test/integration/environment.py
+++ b/docker/test/integration/environment.py
@@ -17,11 +17,15 @@
import logging
import datetime
import sys
+import uuid
+import os
sys.path.append('../minifi')
from MiNiFi_integration_test_driver import MiNiFi_integration_test # noqa: E402
from minifi import * # noqa
from minifi.core.ImageStore import ImageStore # noqa
+from minifi.core.DockerTestDirectoryBindings import DockerTestDirectoryBindings # noqa
+from minifi.core.KubernetesProxy import KubernetesProxy # noqa
def before_scenario(context, scenario):
@@ -30,7 +34,7 @@ def before_scenario(context, scenario):
return
logging.info("Integration test setup at {time:%H:%M:%S.%f}".format(time=datetime.datetime.now()))
- context.test = MiNiFi_integration_test(context.image_store)
+ context.test = MiNiFi_integration_test(context)
def after_scenario(context, scenario):
@@ -40,8 +44,28 @@ def after_scenario(context, scenario):
logging.info("Integration test teardown at {time:%H:%M:%S.%f}".format(time=datetime.datetime.now()))
context.test.cleanup()
+ context.directory_bindings.cleanup_io()
+ if context.kubernetes_proxy:
+ context.kubernetes_proxy.delete_pods()
def before_all(context):
+ context.test_id = str(uuid.uuid4())
context.config.setup_logging()
context.image_store = ImageStore()
+ context.directory_bindings = DockerTestDirectoryBindings(context.test_id)
+ context.directory_bindings.create_new_data_directories()
+ context.kubernetes_proxy = None
+
+
+def before_tag(context, tag):
+ if tag == "requires.kubernetes.cluster":
+ context.kubernetes_proxy = KubernetesProxy(context.directory_bindings.get_data_directories(context.test_id)["kubernetes_temp_dir"], os.path.join(os.environ['TEST_DIRECTORY'], 'resources', 'kubernetes', 'pods-etc'))
+ context.kubernetes_proxy.create_config(context.directory_bindings.get_directory_bindings(context.test_id))
+ context.kubernetes_proxy.start_cluster()
+
+
+def after_tag(context, tag):
+ if tag == "requires.kubernetes.cluster" and context.kubernetes_proxy:
+ context.kubernetes_proxy.cleanup()
+ context.kubernetes_proxy = None
diff --git a/docker/test/integration/features/kubernetes.feature b/docker/test/integration/features/kubernetes.feature
index 5d14b1982..5a24755e3 100644
--- a/docker/test/integration/features/kubernetes.feature
+++ b/docker/test/integration/features/kubernetes.feature
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+@requires.kubernetes.cluster
Feature: TailFile can collect logs from Kubernetes pods
Background:
diff --git a/docker/test/integration/minifi/core/DockerTestCluster.py b/docker/test/integration/minifi/core/DockerTestCluster.py
index c29e0baa5..7158b7a45 100644
--- a/docker/test/integration/minifi/core/DockerTestCluster.py
+++ b/docker/test/integration/minifi/core/DockerTestCluster.py
@@ -36,8 +36,8 @@ class DockerTestCluster(SingleNodeDockerCluster):
("DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
"BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;")
- def __init__(self, image_store):
- super(DockerTestCluster, self).__init__(image_store)
+ def __init__(self, context):
+ super(DockerTestCluster, self).__init__(context)
self.segfault = False
@staticmethod
diff --git a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
index 91a13bb2a..3781138a9 100644
--- a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
+++ b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
@@ -20,29 +20,40 @@ import shutil
class DockerTestDirectoryBindings:
- def __init__(self):
+ def __init__(self, test_id):
self.data_directories = {}
+ self.test_id = test_id
def __del__(self):
self.delete_data_directories()
- def create_new_data_directories(self, test_id):
- self.data_directories[test_id] = {
- "input_dir": "/tmp/.nifi-test-input." + test_id,
- "output_dir": "/tmp/.nifi-test-output." + test_id,
- "resources_dir": "/tmp/.nifi-test-resources." + test_id,
- "minifi_config_dir": "/tmp/.nifi-test-minifi-config-dir." + test_id,
- "nifi_config_dir": "/tmp/.nifi-test-nifi-config-dir." + test_id
+ def cleanup_io(self):
+ for folder in [self.data_directories[self.test_id]["input_dir"], self.data_directories[self.test_id]["output_dir"]]:
+ for filename in os.listdir(folder):
+ file_path = os.path.join(folder, filename)
+ if os.path.isfile(file_path) or os.path.islink(file_path):
+ os.unlink(file_path)
+ elif os.path.isdir(file_path):
+ shutil.rmtree(file_path)
+
+ def create_new_data_directories(self):
+ self.data_directories[self.test_id] = {
+ "input_dir": "/tmp/.nifi-test-input." + self.test_id,
+ "output_dir": "/tmp/.nifi-test-output." + self.test_id,
+ "resources_dir": "/tmp/.nifi-test-resources." + self.test_id,
+ "minifi_config_dir": "/tmp/.nifi-test-minifi-config-dir." + self.test_id,
+ "nifi_config_dir": "/tmp/.nifi-test-nifi-config-dir." + self.test_id,
+ "kubernetes_temp_dir": "/tmp/.nifi-test-kubernetes-temp-dir." + self.test_id
}
- [self.create_directory(directory) for directory in self.data_directories[test_id].values()]
+ [self.create_directory(directory) for directory in self.data_directories[self.test_id].values()]
# Add resources
test_dir = os.environ['TEST_DIRECTORY'] # Based on DockerVerify.sh
- shutil.copytree(test_dir + "/resources/kafka_broker/conf/certs", self.data_directories[test_id]["resources_dir"] + "/certs")
- shutil.copytree(test_dir + "/resources/python", self.data_directories[test_id]["resources_dir"] + "/python")
- shutil.copytree(test_dir + "/resources/opcua", self.data_directories[test_id]["resources_dir"] + "/opcua")
- shutil.copytree(test_dir + "/resources/lua", self.data_directories[test_id]["resources_dir"] + "/lua")
+ shutil.copytree(test_dir + "/resources/kafka_broker/conf/certs", self.data_directories[self.test_id]["resources_dir"] + "/certs")
+ shutil.copytree(test_dir + "/resources/python", self.data_directories[self.test_id]["resources_dir"] + "/python")
+ shutil.copytree(test_dir + "/resources/opcua", self.data_directories[self.test_id]["resources_dir"] + "/opcua")
+ shutil.copytree(test_dir + "/resources/lua", self.data_directories[self.test_id]["resources_dir"] + "/lua")
def get_data_directories(self, test_id):
return self.data_directories[test_id]
diff --git a/docker/test/integration/minifi/core/KindProxy.py b/docker/test/integration/minifi/core/KubernetesProxy.py
similarity index 81%
rename from docker/test/integration/minifi/core/KindProxy.py
rename to docker/test/integration/minifi/core/KubernetesProxy.py
index b8da30bd5..0af95fc81 100644
--- a/docker/test/integration/minifi/core/KindProxy.py
+++ b/docker/test/integration/minifi/core/KubernetesProxy.py
@@ -23,7 +23,7 @@ import time
from textwrap import dedent
-class KindProxy:
+class KubernetesProxy:
def __init__(self, temp_directory, resources_directory):
self.temp_directory = temp_directory
self.resources_directory = resources_directory
@@ -33,6 +33,12 @@ class KindProxy:
self.__download_kind()
self.docker_client = docker.from_env()
+ def __del__(self):
+ self.cleanup()
+
+ def cleanup(self):
+ subprocess.run([self.kind_binary_path, 'delete', 'cluster'])
+
def __download_kind(self):
if subprocess.run(['curl', '-Lo', self.kind_binary_path, 'https://kind.sigs.k8s.io/dl/v0.11.1/kind-linux-amd64']).returncode != 0:
raise Exception("Could not download kind")
@@ -78,9 +84,12 @@ class KindProxy:
self.__create_objects_of_type(self.resources_directory, 'clusterrole')
self.__create_objects_of_type(self.resources_directory, 'clusterrolebinding')
+ def delete_pods(self):
+ self.__delete_objects_of_type('pod')
+
def __wait_for_default_service_account(self, namespace):
for _ in range(120):
- (code, output) = self.docker_client.containers.get('kind-control-plane').exec_run(['kubectl', '-n', namespace, 'get', 'serviceaccount', 'default'])
+ (code, _) = self.docker_client.containers.get('kind-control-plane').exec_run(['kubectl', '-n', namespace, 'get', 'serviceaccount', 'default'])
if code == 0:
return
time.sleep(1)
@@ -93,7 +102,7 @@ class KindProxy:
file_name_in_container = os.path.join('/var/tmp', file_name)
self.__copy_file_to_container(full_file_name, 'kind-control-plane', file_name_in_container)
- (code, output) = self.docker_client.containers.get('kind-control-plane').exec_run(['kubectl', 'apply', '-f', file_name_in_container])
+ (code, _) = self.docker_client.containers.get('kind-control-plane').exec_run(['kubectl', 'apply', '-f', file_name_in_container])
if code != 0:
raise Exception("Could not create kubernetes object from file '%s'" % full_file_name)
@@ -101,6 +110,15 @@ class KindProxy:
found_objects.append(object_name)
return found_objects
+ def __delete_objects_of_type(self, type):
+ for full_file_name in glob.iglob(os.path.join(self.resources_directory, f'*.{type}.yml')):
+ file_name = os.path.basename(full_file_name)
+ file_name_in_container = os.path.join('/var/tmp', file_name)
+
+ (code, _) = self.docker_client.containers.get('kind-control-plane').exec_run(['kubectl', 'delete', '-f', file_name_in_container, '--grace-period=0', '--force'])
+ if code != 0:
+ raise Exception("Could not delete kubernetes object from file '%s'" % file_name_in_container)
+
def __copy_file_to_container(self, host_file, container_name, container_file):
if subprocess.run(['docker', 'cp', host_file, container_name + ':' + container_file]).returncode != 0:
raise Exception("Could not copy file '%s' into container '%s' as '%s'" % (host_file, container_name, container_file))
@@ -111,8 +129,3 @@ class KindProxy:
return output
else:
return None
-
- def cleanup(self):
- # cleanup gets called multiple times, also after the temp directories had been removed
- if os.path.exists(self.kind_binary_path):
- subprocess.run([self.kind_binary_path, 'delete', 'cluster'])
diff --git a/docker/test/integration/minifi/core/MinifiAsPodInKubernetesCluster.py b/docker/test/integration/minifi/core/MinifiAsPodInKubernetesCluster.py
index 81ba5b02e..e9c821367 100644
--- a/docker/test/integration/minifi/core/MinifiAsPodInKubernetesCluster.py
+++ b/docker/test/integration/minifi/core/MinifiAsPodInKubernetesCluster.py
@@ -19,7 +19,6 @@ import logging
import os
import shutil
-from .KindProxy import KindProxy
from .LogSource import LogSource
from .MinifiContainer import MinifiContainer
@@ -28,11 +27,9 @@ class MinifiAsPodInKubernetesCluster(MinifiContainer):
MINIFI_IMAGE_NAME = 'apacheminificpp'
MINIFI_IMAGE_TAG = 'docker_test'
- def __init__(self, config_dir, name, vols, network, image_store, command=None):
+ def __init__(self, kubernetes_proxy, config_dir, name, vols, network, image_store, command=None):
super().__init__(config_dir, name, vols, network, image_store, command)
-
- resources_directory = os.path.join(os.environ['TEST_DIRECTORY'], 'resources', 'kubernetes', 'pods-etc')
- self.kind = KindProxy(self.config_dir, resources_directory)
+ self.kubernetes_proxy = kubernetes_proxy
test_dir = os.environ['TEST_DIRECTORY']
shutil.copy(os.path.join(test_dir, os.pardir, os.pardir, os.pardir, 'conf', 'minifi.properties'), self.config_dir)
@@ -50,10 +47,8 @@ class MinifiAsPodInKubernetesCluster(MinifiContainer):
self._create_config()
- self.kind.create_config(self.vols)
- self.kind.start_cluster()
- self.kind.load_docker_image(MinifiAsPodInKubernetesCluster.MINIFI_IMAGE_NAME, MinifiAsPodInKubernetesCluster.MINIFI_IMAGE_TAG)
- self.kind.create_objects()
+ self.kubernetes_proxy.load_docker_image(MinifiAsPodInKubernetesCluster.MINIFI_IMAGE_NAME, MinifiAsPodInKubernetesCluster.MINIFI_IMAGE_TAG)
+ self.kubernetes_proxy.create_objects()
logging.info('Finished setting up container: %s', self.name)
@@ -61,8 +56,8 @@ class MinifiAsPodInKubernetesCluster(MinifiContainer):
return LogSource.FROM_GET_APP_LOG_METHOD
def get_app_log(self):
- return 'OK', self.kind.get_logs('daemon', 'log-collector')
+ return 'OK', self.kubernetes_proxy.get_logs('daemon', 'log-collector')
def cleanup(self):
- logging.info('Cleaning up container: %s', self.name)
- self.kind.cleanup()
+ # cleanup is done through the kubernetes cluster in the environment.py
+ pass
diff --git a/docker/test/integration/minifi/core/OutputEventHandler.py b/docker/test/integration/minifi/core/OutputEventHandler.py
index 735295fd8..f4d570298 100644
--- a/docker/test/integration/minifi/core/OutputEventHandler.py
+++ b/docker/test/integration/minifi/core/OutputEventHandler.py
@@ -68,6 +68,3 @@ class OutputEventHandler(FileSystemEventHandler):
if file_count_modified:
self.done_event.set()
-
- def on_deleted(self, event):
- logging.info("Output file deleted: " + event.src_path)
diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
index 50dc9f3eb..b622685b0 100644
--- a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
+++ b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
@@ -44,12 +44,13 @@ class SingleNodeDockerCluster(Cluster):
testing or use-cases which do not span multiple compute nodes.
"""
- def __init__(self, image_store):
+ def __init__(self, context):
self.vols = {}
self.network = self.create_docker_network()
self.containers = {}
- self.image_store = image_store
+ self.image_store = context.image_store
self.data_directories = {}
+ self.kubernetes_proxy = context.kubernetes_proxy
# Get docker client
self.client = docker.from_env()
@@ -91,7 +92,7 @@ class SingleNodeDockerCluster(Cluster):
elif engine == 'minifi-cpp':
return self.containers.setdefault(name, MinifiContainer(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
elif engine == 'kubernetes':
- return self.containers.setdefault(name, MinifiAsPodInKubernetesCluster(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
+ return self.containers.setdefault(name, MinifiAsPodInKubernetesCluster(self.kubernetes_proxy, self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
elif engine == 'transient-minifi':
return self.containers.setdefault(name, TransientMinifiContainer(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
elif engine == 'minifi-cpp-with-provenance-repo':
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index 8a64acadf..f588c8cc7 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -44,7 +44,7 @@ import os
# Background
@given("the content of \"{directory}\" is monitored")
def step_impl(context, directory):
- context.test.add_file_system_observer(FileSystemObserver(context.test.docker_path_to_local_path(directory)))
+ context.test.add_file_system_observer(FileSystemObserver(context.directory_bindings.docker_path_to_local_path(context.test_id, directory)))
def __create_processor(context, processor_type, processor_name, property_name, property_value, container_name, engine='minifi-cpp'):
@@ -394,12 +394,14 @@ def step_impl(context):
def step_impl(context):
context.test.acquire_container("azure-storage-server", "azure-storage-server")
+
# syslog client
@given(u'a Syslog client with {protocol} protocol is setup to send logs to minifi')
def step_impl(context, protocol):
client_name = "syslog-" + protocol.lower() + "-client"
context.test.acquire_container(client_name, client_name)
+
# google cloud storage setup
@given("a Google Cloud storage server is set up with some test data")
@given("a Google Cloud storage server is set up")