You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ad...@apache.org on 2022/03/03 15:46:28 UTC
[nifi-minifi-cpp] branch main updated: MINIFICPP-1734 Add integration test for Kubernetes log collection
This is an automated email from the ASF dual-hosted git repository.
adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new f811c79 MINIFICPP-1734 Add integration test for Kubernetes log collection
f811c79 is described below
commit f811c79c5b6df20ecb5eb2cbdf0f1ad2c77df328
Author: Ferenc Gerlits <fg...@gmail.com>
AuthorDate: Thu Mar 3 16:44:55 2022 +0100
MINIFICPP-1734 Add integration test for Kubernetes log collection
Signed-off-by: Adam Debreceni <ad...@apache.org>
This closes #1270
---
.github/workflows/ci.yml | 2 +-
cmake/DockerConfig.cmake | 1 +
docker/Dockerfile | 3 +
.../test/integration/features/kubernetes.feature | 58 ++++++++++
.../controllers/KubernetesControllerService.py | 28 +++++
docker/test/integration/minifi/core/Container.py | 21 ++++
.../integration/minifi/core/DockerTestCluster.py | 26 +++++
docker/test/integration/minifi/core/ImageStore.py | 27 +++++
docker/test/integration/minifi/core/KindProxy.py | 124 +++++++++++++++++++++
docker/test/integration/minifi/core/LogSource.py | 19 ++++
.../minifi/core/MinifiAsPodInKubernetesCluster.py | 59 ++++++++++
.../integration/minifi/core/MinifiContainer.py | 20 +++-
.../minifi/core/SingleNodeDockerCluster.py | 19 ++++
.../kubernetes/minifi-conf/minifi-log.properties | 3 +
.../kubernetes/pods-etc/daemon.namespace.yml | 6 +
.../kubernetes/pods-etc/hello-world-one.pod.yml | 12 ++
.../kubernetes/pods-etc/hello-world-two.pod.yml | 12 ++
.../kubernetes/pods-etc/log-collector.pod.yml | 29 +++++
.../pods-etc/namespace-reader.clusterrole.yml | 9 ++
.../namespace-reader.clusterrolebinding.yml | 12 ++
.../kubernetes/pods-etc/pod-reader.clusterrole.yml | 9 ++
.../pods-etc/pod-reader.clusterrolebinding.yml | 12 ++
docker/test/integration/steps/steps.py | 105 +++++++++++------
23 files changed, 581 insertions(+), 35 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 60628c6..1c2e75e 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -202,7 +202,7 @@ jobs:
if [ -d ~/.ccache ]; then mv ~/.ccache .; fi
mkdir build
cd build
- cmake -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DENABLE_JNI=OFF -DDISABLE_JEMALLOC=ON -DENABLE_AWS=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_MQTT=ON -DENABLE_AZURE=ON -DENABLE_SQL=ON -DENABLE_SPLUNK=ON -DENABLE_OPC=ON -DENABLE_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DDOCKER_BUILD_ONLY=ON -DDOCKER_CCACHE_DUMP_LOCATION=$HOME/.ccache ..
+ cmake -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DENABLE_JNI=OFF -DDISABLE_JEMALLOC=ON -DENABLE_AWS=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_MQTT=ON -DENABLE_AZURE=ON -DENABLE_SQL=ON -DENABLE_SPLUNK=ON -DENABLE_OPC=ON -DENABLE_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_KUBERNETES=ON -DDOCKER_BUILD_ONLY=ON -DDOCKER_CCACHE_DUMP_LOCATION=$HOME/.ccache ..
make docker
- id: install_deps
run: |
diff --git a/cmake/DockerConfig.cmake b/cmake/DockerConfig.cmake
index dd05f65..226439f 100644
--- a/cmake/DockerConfig.cmake
+++ b/cmake/DockerConfig.cmake
@@ -47,6 +47,7 @@ add_custom_target(
-c ENABLE_SPLUNK=${ENABLE_SPLUNK}
-c ENABLE_SCRIPTING=${ENABLE_SCRIPTING}
-c ENABLE_LUA_SCRIPTING=${ENABLE_LUA_SCRIPTING}
+ -c ENABLE_KUBERNETES=${ENABLE_KUBERNETES}
-c DISABLE_CURL=${DISABLE_CURL}
-c DISABLE_JEMALLOC=${DISABLE_JEMALLOC}
-c DISABLE_CIVET=${DISABLE_CIVET}
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 9a8f2e1..3ec9856 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -62,6 +62,7 @@ ARG DISABLE_BZIP2=OFF
ARG ENABLE_SCRIPTING=OFF
ARG DISABLE_PYTHON_SCRIPTING=
ARG ENABLE_LUA_SCRIPTING=
+ARG ENABLE_KUBERNETES=OFF
ARG DISABLE_CONTROLLER=OFF
ARG CMAKE_BUILD_TYPE=Release
@@ -121,6 +122,7 @@ RUN cmake -DSTATIC_BUILD= -DSKIP_TESTS=true -DENABLE_ALL="${ENABLE_ALL}" -DENABL
-DDISABLE_EXPRESSION_LANGUAGE="${DISABLE_EXPRESSION_LANGUAGE}" -DDISABLE_ROCKSDB="${DISABLE_ROCKSDB}" \
-DDISABLE_LIBARCHIVE="${DISABLE_LIBARCHIVE}" -DDISABLE_LZMA="${DISABLE_LZMA}" -DDISABLE_BZIP2="${DISABLE_BZIP2}" \
-DENABLE_SCRIPTING="${ENABLE_SCRIPTING}" -DDISABLE_PYTHON_SCRIPTING="${DISABLE_PYTHON_SCRIPTING}" -DENABLE_LUA_SCRIPTING="${ENABLE_LUA_SCRIPTING}" \
+ -DENABLE_KUBERNETES="${ENABLE_KUBERNETES}" \
-DDISABLE_CONTROLLER="${DISABLE_CONTROLLER}" -DENABLE_ENCRYPT_CONFIG="${ENABLE_ENCRYPT_CONFIG}" -DAWS_ENABLE_UNITY_BUILD=OFF -DEXCLUDE_BOOST=ON -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" .. && \
make -j "$(nproc)" package && \
tar -xzvf "${MINIFI_BASE_DIR}/build/nifi-minifi-cpp-${MINIFI_VERSION}.tar.gz" -C "${MINIFI_BASE_DIR}"
@@ -144,6 +146,7 @@ ARG ENABLE_BUSTACHE=OFF
ARG ENABLE_SCRIPTING=OFF
ARG DISABLE_PYTHON_SCRIPTING=
ARG ENABLE_LUA_SCRIPTING=
+ARG ENABLE_KUBERNETES=OFF
# Add testing repo for rocksdb
RUN echo 'http://dl-cdn.alpinelinux.org/alpine/edge/testing' >> /etc/apk/repositories
diff --git a/docker/test/integration/features/kubernetes.feature b/docker/test/integration/features/kubernetes.feature
new file mode 100644
index 0000000..af7ac29
--- /dev/null
+++ b/docker/test/integration/features/kubernetes.feature
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Feature: TailFile can collect logs from Kubernetes pods
+
+ Background:
+ Given the content of "/tmp/output" is monitored
+
+ Scenario: Collect all logs from the default namespace
+ Given a TailFile processor in a Kubernetes cluster
+ And the "tail-mode" property of the TailFile processor is set to "Multiple file"
+ And the "tail-base-directory" property of the TailFile processor is set to "/var/log/pods/${namespace}_${pod}_${uid}/${container}"
+ And the "File to Tail" property of the TailFile processor is set to ".*\.log"
+ And the "Lookup frequency" property of the TailFile processor is set to "1s"
+ And the TailFile processor has an Attribute Provider Service which is a Kubernetes Controller Service
+ And a PutFile processor in the Kubernetes cluster
+ And the "Directory" property of the PutFile processor is set to "/tmp/output"
+ And the "success" relationship of the TailFile processor is connected to the PutFile
+ When the MiNiFi instance starts up
+ Then two flowfiles with the contents "Hello World!" and "Hello again, World!" are placed in the monitored directory in less than 30 seconds
+
+ Scenario: Collect logs from selected pods
+ Given a TailFile processor in a Kubernetes cluster
+ And the "tail-mode" property of the TailFile processor is set to "Multiple file"
+ And the "tail-base-directory" property of the TailFile processor is set to "/var/log/pods/${namespace}_${pod}_${uid}/${container}"
+ And the "File to Tail" property of the TailFile processor is set to ".*\.log"
+ And the "Lookup frequency" property of the TailFile processor is set to "1s"
+ And the TailFile processor has an Attribute Provider Service which is a Kubernetes Controller Service with the "Pod Name Filter" property set to ".*one"
+ And a PutFile processor in the Kubernetes cluster
+ And the "Directory" property of the PutFile processor is set to "/tmp/output"
+ And the "success" relationship of the TailFile processor is connected to the PutFile
+ When the MiNiFi instance starts up
+ Then one flowfile with the contents "Hello World!" is placed in the monitored directory in less than 30 seconds
+
+ Scenario: Collect logs from selected containers
+ Given a TailFile processor in a Kubernetes cluster
+ And the "tail-mode" property of the TailFile processor is set to "Multiple file"
+ And the "tail-base-directory" property of the TailFile processor is set to "/var/log/pods/${namespace}_${pod}_${uid}/${container}"
+ And the "File to Tail" property of the TailFile processor is set to ".*\.log"
+ And the "Lookup frequency" property of the TailFile processor is set to "1s"
+ And the TailFile processor has an Attribute Provider Service which is a Kubernetes Controller Service with the "Container Name Filter" property set to "echo-[^o].."
+ And a PutFile processor in the Kubernetes cluster
+ And the "Directory" property of the PutFile processor is set to "/tmp/output"
+ And the "success" relationship of the TailFile processor is connected to the PutFile
+ When the MiNiFi instance starts up
+ Then one flowfile with the contents "Hello again, World!" is placed in the monitored directory in less than 30 seconds
diff --git a/docker/test/integration/minifi/controllers/KubernetesControllerService.py b/docker/test/integration/minifi/controllers/KubernetesControllerService.py
new file mode 100644
index 0000000..5b024d8
--- /dev/null
+++ b/docker/test/integration/minifi/controllers/KubernetesControllerService.py
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ..core.ControllerService import ControllerService
+
+
+class KubernetesControllerService(ControllerService):
+ def __init__(self, name=None, properties=None):
+ super(KubernetesControllerService, self).__init__(name=name)
+
+ self.service_class = 'KubernetesControllerService'
+
+ if properties is not None:
+ for key, value in properties.items():
+ self.properties[key] = value
diff --git a/docker/test/integration/minifi/core/Container.py b/docker/test/integration/minifi/core/Container.py
index 1457230..700dddb 100644
--- a/docker/test/integration/minifi/core/Container.py
+++ b/docker/test/integration/minifi/core/Container.py
@@ -1,6 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
import docker
import logging
+from .LogSource import LogSource
+
class Container:
def __init__(self, name, engine, vols, network, image_store, command):
@@ -41,6 +59,9 @@ class Container:
def deploy(self):
raise NotImplementedError()
+ def log_source(self):
+ return LogSource.FROM_DOCKER_CONTAINER
+
def get_startup_finished_log_entry(self):
raise NotImplementedError()
diff --git a/docker/test/integration/minifi/core/DockerTestCluster.py b/docker/test/integration/minifi/core/DockerTestCluster.py
index ed5983a..be2a32b 100644
--- a/docker/test/integration/minifi/core/DockerTestCluster.py
+++ b/docker/test/integration/minifi/core/DockerTestCluster.py
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
import json
import logging
import subprocess
@@ -9,6 +25,7 @@ import tarfile
import io
import tempfile
+from .LogSource import LogSource
from .SingleNodeDockerCluster import SingleNodeDockerCluster
from .utils import retry_check
from azure.storage.blob import BlobServiceClient
@@ -34,6 +51,15 @@ class DockerTestCluster(SingleNodeDockerCluster):
return encoding
def get_app_log(self, container_name):
+ log_source = self.containers[container_name].log_source()
+ if log_source == LogSource.FROM_DOCKER_CONTAINER:
+ return self.__get_app_log_from_docker_container(container_name)
+ elif log_source == LogSource.FROM_GET_APP_LOG_METHOD:
+ return self.containers[container_name].get_app_log()
+ else:
+ raise Exception("Unexpected log source '%s'" % log_source)
+
+ def __get_app_log_from_docker_container(self, container_name):
try:
container = self.client.containers.get(container_name)
except Exception:
diff --git a/docker/test/integration/minifi/core/ImageStore.py b/docker/test/integration/minifi/core/ImageStore.py
index c602344..28e003c 100644
--- a/docker/test/integration/minifi/core/ImageStore.py
+++ b/docker/test/integration/minifi/core/ImageStore.py
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
from .NifiContainer import NifiContainer
from .MinifiContainer import MinifiContainer
import logging
@@ -29,6 +45,8 @@ class ImageStore:
if container_engine == "minifi-cpp":
image = self.__build_minifi_cpp_image()
+ elif container_engine == "minifi-cpp-in-kubernetes":
+ image = self.__build_simple_minifi_cpp_image_with_root()
elif container_engine == "http-proxy":
image = self.__build_http_proxy_image()
elif container_engine == "nifi":
@@ -84,6 +102,15 @@ class ImageStore:
return self.__build_image(dockerfile)
+ def __build_simple_minifi_cpp_image_with_root(self):
+ dockerfile = dedent(r"""\
+ FROM {base_image}
+ USER root
+ CMD ["/bin/sh", "-c", "cp /tmp/minifi_config/config.yml /tmp/minifi_config/minifi-log.properties ./conf/ && ./bin/minifi.sh run"]
+ """.format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_VERSION))
+
+ return self.__build_image(dockerfile)
+
def __build_http_proxy_image(self):
dockerfile = dedent("""FROM {base_image}
RUN apt -y update && apt install -y apache2-utils
diff --git a/docker/test/integration/minifi/core/KindProxy.py b/docker/test/integration/minifi/core/KindProxy.py
new file mode 100644
index 0000000..d304cc8
--- /dev/null
+++ b/docker/test/integration/minifi/core/KindProxy.py
@@ -0,0 +1,124 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import docker
+import glob
+import os
+import stat
+import subprocess
+import time
+from textwrap import dedent
+
+
+class KindProxy:
+ def __init__(self, temp_directory, resources_directory, image_name, image_repository, image_tag):
+ self.temp_directory = temp_directory
+ self.resources_directory = resources_directory
+ self.image_name = image_name
+ self.image_repository = image_repository
+ self.image_tag = image_tag
+
+ self.kind_binary_path = os.path.join(self.temp_directory, 'kind')
+ self.kind_config_path = os.path.join(self.temp_directory, 'kind-config.yml')
+ self.__download_kind()
+ self.docker_client = docker.from_env()
+
+ def __download_kind(self):
+ if subprocess.run(['curl', '-Lo', self.kind_binary_path, 'https://kind.sigs.k8s.io/dl/v0.11.1/kind-linux-amd64']).returncode != 0:
+ raise Exception("Could not download kind")
+ os.chmod(self.kind_binary_path, stat.S_IXUSR)
+
+ def create_config(self, volumes):
+ kind_config = dedent("""\
+ apiVersion: kind.x-k8s.io/v1alpha4
+ kind: Cluster
+ nodes:
+ - role: control-plane
+ """)
+
+ if volumes:
+ kind_config += " extraMounts:\n"
+
+ for host_path, container_path in volumes.items():
+ kind_config += " - hostPath: {path}\n".format(path=host_path)
+ kind_config += " containerPath: {path}\n".format(path=container_path['bind'])
+ if container_path['mode'] != 'rw':
+ kind_config += " readOnly: true\n"
+
+ with open(self.kind_config_path, 'wb') as config_file:
+ config_file.write(kind_config.encode('utf-8'))
+
+ def start_cluster(self):
+ subprocess.run([self.kind_binary_path, 'delete', 'cluster'])
+
+ if subprocess.run([self.kind_binary_path, 'create', 'cluster', '--config=' + self.kind_config_path]).returncode != 0:
+ raise Exception("Could not start the kind cluster")
+
+ def load_docker_image(self, image_store):
+ image = image_store.get_image(self.image_name)
+ image.tag(repository=self.image_repository, tag=self.image_tag)
+
+ if subprocess.run([self.kind_binary_path, 'load', 'docker-image', self.image_repository + ':' + self.image_tag]).returncode != 0:
+ raise Exception("Could not load the %s docker image (%s:%s) into the kind cluster" % (self.image_name, self.image_repository, self.image_tag))
+
+ def create_objects(self):
+ self.__wait_for_default_service_account('default')
+ namespaces = self.__create_objects_of_type(self.resources_directory, 'namespace')
+ for namespace in namespaces:
+ self.__wait_for_default_service_account(namespace)
+
+ self.__create_objects_of_type(self.resources_directory, 'pod')
+ self.__create_objects_of_type(self.resources_directory, 'clusterrole')
+ self.__create_objects_of_type(self.resources_directory, 'clusterrolebinding')
+
+ def __wait_for_default_service_account(self, namespace):
+ for _ in range(120):
+ (code, output) = self.docker_client.containers.get('kind-control-plane').exec_run(['kubectl', '-n', namespace, 'get', 'serviceaccount', 'default'])
+ if code == 0:
+ return
+ time.sleep(1)
+ raise Exception("Default service account for namespace '%s' not found" % namespace)
+
+ def __create_objects_of_type(self, directory, type):
+ found_objects = []
+ for full_file_name in glob.iglob(os.path.join(directory, f'*.{type}.yml')):
+ file_name = os.path.basename(full_file_name)
+ file_name_in_container = os.path.join('/var/tmp', file_name)
+ self.__copy_file_to_container(full_file_name, 'kind-control-plane', file_name_in_container)
+
+ (code, output) = self.docker_client.containers.get('kind-control-plane').exec_run(['kubectl', 'apply', '-f', file_name_in_container])
+ if code != 0:
+ raise Exception("Could not create kubernetes object from file '%s'" % full_file_name)
+
+ object_name = file_name.replace(f'.{type}.yml', '')
+ found_objects.append(object_name)
+ return found_objects
+
+ def __copy_file_to_container(self, host_file, container_name, container_file):
+ if subprocess.run(['docker', 'cp', host_file, container_name + ':' + container_file]).returncode != 0:
+ raise Exception("Could not copy file '%s' into container '%s' as '%s'" % (host_file, container_name, container_file))
+
+ def get_logs(self, namespace, pod_name):
+ (code, output) = self.docker_client.containers.get('kind-control-plane').exec_run(['kubectl', '-n', namespace, 'logs', pod_name])
+ if code == 0:
+ return output
+ else:
+ return None
+
+ def cleanup(self):
+ # cleanup gets called multiple times, including after the temp directories have been removed
+ if os.path.exists(self.kind_binary_path):
+ subprocess.run([self.kind_binary_path, 'delete', 'cluster'])
diff --git a/docker/test/integration/minifi/core/LogSource.py b/docker/test/integration/minifi/core/LogSource.py
new file mode 100644
index 0000000..c5ff698
--- /dev/null
+++ b/docker/test/integration/minifi/core/LogSource.py
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class LogSource:
+ FROM_DOCKER_CONTAINER = "from docker container"
+ FROM_GET_APP_LOG_METHOD = "from get_app_log() method"
diff --git a/docker/test/integration/minifi/core/MinifiAsPodInKubernetesCluster.py b/docker/test/integration/minifi/core/MinifiAsPodInKubernetesCluster.py
new file mode 100644
index 0000000..43132b2
--- /dev/null
+++ b/docker/test/integration/minifi/core/MinifiAsPodInKubernetesCluster.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+import os
+import shutil
+
+from .KindProxy import KindProxy
+from .LogSource import LogSource
+from .MinifiContainer import MinifiContainer
+
+
+class MinifiAsPodInKubernetesCluster(MinifiContainer):
+ def __init__(self, config_dir, name, vols, network, image_store, command=None):
+ super().__init__(config_dir, name, vols, network, image_store, command)
+
+ test_dir = os.environ['TEST_DIRECTORY']
+ shutil.copy(os.path.join(test_dir, 'resources', 'kubernetes', 'minifi-conf', 'minifi-log.properties'), self.config_dir)
+
+ def deploy(self):
+ if not self.set_deployed():
+ return
+
+ logging.info('Setting up container: %s', self.name)
+
+ self._create_config()
+
+ resources_directory = os.path.join(os.environ['TEST_DIRECTORY'], 'resources', 'kubernetes', 'pods-etc')
+
+ self.kind = KindProxy(self.config_dir, resources_directory, 'minifi-cpp-in-kubernetes', 'minifi-kubernetes-test', 'v1')
+ self.kind.create_config(self.vols)
+ self.kind.start_cluster()
+ self.kind.load_docker_image(self.image_store)
+ self.kind.create_objects()
+
+ logging.info('Finished setting up container: %s', self.name)
+
+ def log_source(self):
+ return LogSource.FROM_GET_APP_LOG_METHOD
+
+ def get_app_log(self):
+ return 'OK', self.kind.get_logs('daemon', 'log-collector')
+
+ def cleanup(self):
+ logging.info('Cleaning up container: %s', self.name)
+ self.kind.cleanup()
diff --git a/docker/test/integration/minifi/core/MinifiContainer.py b/docker/test/integration/minifi/core/MinifiContainer.py
index 8b971bf..770bf75 100644
--- a/docker/test/integration/minifi/core/MinifiContainer.py
+++ b/docker/test/integration/minifi/core/MinifiContainer.py
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
import os
import logging
from .FlowContainer import FlowContainer
@@ -19,7 +35,7 @@ class MinifiContainer(FlowContainer):
def get_log_file_path(self):
return MinifiContainer.MINIFI_ROOT + '/logs/minifi-app.log'
- def __create_config(self):
+ def _create_config(self):
serializer = Minifi_flow_yaml_serializer()
test_flow_yaml = serializer.serialize(self.start_nodes)
logging.info('Using generated flow config yml:\n%s', test_flow_yaml)
@@ -31,7 +47,7 @@ class MinifiContainer(FlowContainer):
return
logging.info('Creating and running minifi docker container...')
- self.__create_config()
+ self._create_config()
self.client.containers.run(
self.image_store.get_image(self.get_engine()),
diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
index acc375c..d1cbb6b 100644
--- a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
+++ b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
import docker
import logging
import uuid
@@ -14,6 +30,7 @@ from .PostgreSQLServerContainer import PostgreSQLServerContainer
from .MqttBrokerContainer import MqttBrokerContainer
from .OPCUAServerContainer import OPCUAServerContainer
from .SplunkContainer import SplunkContainer
+from .MinifiAsPodInKubernetesCluster import MinifiAsPodInKubernetesCluster
class SingleNodeDockerCluster(Cluster):
@@ -68,6 +85,8 @@ class SingleNodeDockerCluster(Cluster):
return self.containers.setdefault(name, NifiContainer(self.data_directories["nifi_config_dir"], name, self.vols, self.network, self.image_store, command))
elif engine == 'minifi-cpp':
return self.containers.setdefault(name, MinifiContainer(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
+ elif engine == 'kubernetes':
+ return self.containers.setdefault(name, MinifiAsPodInKubernetesCluster(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
elif engine == 'kafka-broker':
if 'zookeeper' not in self.containers:
self.containers.setdefault('zookeeper', ZookeeperContainer('zookeeper', self.vols, self.network, self.image_store, command))
diff --git a/docker/test/integration/resources/kubernetes/minifi-conf/minifi-log.properties b/docker/test/integration/resources/kubernetes/minifi-conf/minifi-log.properties
new file mode 100644
index 0000000..f350eb8
--- /dev/null
+++ b/docker/test/integration/resources/kubernetes/minifi-conf/minifi-log.properties
@@ -0,0 +1,3 @@
+spdlog.pattern=[%Y-%m-%d %H:%M:%S.%e] [%n] [%l] %v
+appender.stdout=stdout
+logger.root=INFO,stdout
diff --git a/docker/test/integration/resources/kubernetes/pods-etc/daemon.namespace.yml b/docker/test/integration/resources/kubernetes/pods-etc/daemon.namespace.yml
new file mode 100644
index 0000000..f628b21
--- /dev/null
+++ b/docker/test/integration/resources/kubernetes/pods-etc/daemon.namespace.yml
@@ -0,0 +1,6 @@
+apiVersion: v1
+kind: Namespace
+metadata:
+ name: daemon
+ labels:
+ name: daemon
diff --git a/docker/test/integration/resources/kubernetes/pods-etc/hello-world-one.pod.yml b/docker/test/integration/resources/kubernetes/pods-etc/hello-world-one.pod.yml
new file mode 100644
index 0000000..22a6634
--- /dev/null
+++ b/docker/test/integration/resources/kubernetes/pods-etc/hello-world-one.pod.yml
@@ -0,0 +1,12 @@
+apiVersion: v1
+kind: Pod
+metadata:
+ name: hello-world-one
+spec:
+ containers:
+ - name: echo-one
+ image: busybox
+ args:
+ - /bin/sh
+ - -c
+ - 'echo "Hello World!"; sleep 10000'
diff --git a/docker/test/integration/resources/kubernetes/pods-etc/hello-world-two.pod.yml b/docker/test/integration/resources/kubernetes/pods-etc/hello-world-two.pod.yml
new file mode 100644
index 0000000..14b0c22
--- /dev/null
+++ b/docker/test/integration/resources/kubernetes/pods-etc/hello-world-two.pod.yml
@@ -0,0 +1,12 @@
+apiVersion: v1
+kind: Pod
+metadata:
+ name: hello-world-two
+spec:
+ containers:
+ - name: echo-two
+ image: busybox
+ args:
+ - /bin/sh
+ - -c
+ - 'echo "Hello again, World!"; sleep 10000'
diff --git a/docker/test/integration/resources/kubernetes/pods-etc/log-collector.pod.yml b/docker/test/integration/resources/kubernetes/pods-etc/log-collector.pod.yml
new file mode 100644
index 0000000..2ea287c
--- /dev/null
+++ b/docker/test/integration/resources/kubernetes/pods-etc/log-collector.pod.yml
@@ -0,0 +1,29 @@
+apiVersion: v1
+kind: Pod
+metadata:
+ namespace: daemon
+ name: log-collector
+spec:
+ containers:
+ - name: minifi
+ image: minifi-kubernetes-test:v1
+ imagePullPolicy: Never
+ volumeMounts:
+ - name: var-log-pods
+ mountPath: /var/log/pods
+ readOnly: true
+ - name: tmp-minifi-config
+ mountPath: /tmp/minifi_config
+ readOnly: true
+ - name: tmp-output
+ mountPath: /tmp/output
+ volumes:
+ - name: var-log-pods
+ hostPath:
+ path: /var/log/pods
+ - name: tmp-minifi-config
+ hostPath:
+ path: /tmp/minifi_config
+ - name: tmp-output
+ hostPath:
+ path: /tmp/output
diff --git a/docker/test/integration/resources/kubernetes/pods-etc/namespace-reader.clusterrole.yml b/docker/test/integration/resources/kubernetes/pods-etc/namespace-reader.clusterrole.yml
new file mode 100644
index 0000000..863439a
--- /dev/null
+++ b/docker/test/integration/resources/kubernetes/pods-etc/namespace-reader.clusterrole.yml
@@ -0,0 +1,9 @@
+kind: ClusterRole
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ namespace: default
+ name: namespace-reader
+rules:
+- apiGroups: [""] # "" indicates the core API group
+ resources: ["namespaces"]
+ verbs: ["get", "watch", "list"]
diff --git a/docker/test/integration/resources/kubernetes/pods-etc/namespace-reader.clusterrolebinding.yml b/docker/test/integration/resources/kubernetes/pods-etc/namespace-reader.clusterrolebinding.yml
new file mode 100644
index 0000000..98de6c4
--- /dev/null
+++ b/docker/test/integration/resources/kubernetes/pods-etc/namespace-reader.clusterrolebinding.yml
@@ -0,0 +1,12 @@
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+ name: namespace-reader-binding
+subjects:
+- kind: ServiceAccount
+ name: default
+ namespace: daemon
+roleRef:
+ kind: ClusterRole
+ name: namespace-reader
+ apiGroup: rbac.authorization.k8s.io
diff --git a/docker/test/integration/resources/kubernetes/pods-etc/pod-reader.clusterrole.yml b/docker/test/integration/resources/kubernetes/pods-etc/pod-reader.clusterrole.yml
new file mode 100644
index 0000000..a832314
--- /dev/null
+++ b/docker/test/integration/resources/kubernetes/pods-etc/pod-reader.clusterrole.yml
@@ -0,0 +1,9 @@
+kind: ClusterRole
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ namespace: default
+ name: pod-reader
+rules:
+- apiGroups: [""] # "" indicates the core API group
+ resources: ["pods"]
+ verbs: ["get", "watch", "list"]
diff --git a/docker/test/integration/resources/kubernetes/pods-etc/pod-reader.clusterrolebinding.yml b/docker/test/integration/resources/kubernetes/pods-etc/pod-reader.clusterrolebinding.yml
new file mode 100644
index 0000000..1d32a19
--- /dev/null
+++ b/docker/test/integration/resources/kubernetes/pods-etc/pod-reader.clusterrolebinding.yml
@@ -0,0 +1,12 @@
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+ name: pod-reader-binding
+subjects:
+- kind: ServiceAccount
+ name: default
+ namespace: daemon
+roleRef:
+ kind: ClusterRole
+ name: pod-reader
+ apiGroup: rbac.authorization.k8s.io
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index 4ac41e4..cc56a70 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
from minifi.core.FileSystemObserver import FileSystemObserver
from minifi.core.RemoteProcessGroup import RemoteProcessGroup
from minifi.core.SSL_cert_utils import gen_cert, rsa_gen_key_callback, make_ca, make_cert, dump_certificate, dump_privatekey
@@ -5,6 +21,7 @@ from minifi.core.Funnel import Funnel
from minifi.controllers.SSLContextService import SSLContextService
from minifi.controllers.ODBCService import ODBCService
+from minifi.controllers.KubernetesControllerService import KubernetesControllerService
from behave import given, then, when
from behave.model_describe import ModelDescriptor
@@ -29,63 +46,56 @@ def step_impl(context, directory):
context.test.add_file_system_observer(FileSystemObserver(context.test.docker_path_to_local_path(directory)))
-# MiNiFi cluster setups
-@given("a {processor_type} processor with the name \"{processor_name}\" and the \"{property}\" property set to \"{property_value}\" in a \"{minifi_container_name}\" flow")
-@given("a {processor_type} processor with the name \"{processor_name}\" and the \"{property}\" property set to \"{property_value}\" in the \"{minifi_container_name}\" flow")
-def step_impl(context, processor_type, processor_name, property, property_value, minifi_container_name):
- container = context.test.acquire_container(minifi_container_name)
+def __create_processor(context, processor_type, processor_name, property_name, property_value, container_name, engine='minifi-cpp'):
+ container = context.test.acquire_container(container_name, engine)
processor = locate("minifi.processors." + processor_type + "." + processor_type)()
processor.set_name(processor_name)
- if property:
- processor.set_property(property, property_value)
+ if property_name is not None:
+ processor.set_property(property_name, property_value)
context.test.add_node(processor)
# Assume that the first node declared is primary unless specified otherwise
if not container.get_start_nodes():
container.add_start_node(processor)
-@given("a {processor_type} processor with the \"{property}\" property set to \"{property_value}\" in a \"{minifi_container_name}\" flow")
-@given("a {processor_type} processor with the \"{property}\" property set to \"{property_value}\" in the \"{minifi_container_name}\" flow")
-def step_impl(context, processor_type, property, property_value, minifi_container_name):
- context.execute_steps("given a {processor_type} processor with the name \"{processor_name}\" and the \"{property}\" property set to \"{property_value}\" in the \"{minifi_container_name}\" flow".
- format(processor_type=processor_type, property=property, property_value=property_value, minifi_container_name=minifi_container_name, processor_name=processor_type))
+# MiNiFi cluster setups
+@given("a {processor_type} processor with the name \"{processor_name}\" and the \"{property_name}\" property set to \"{property_value}\" in a \"{minifi_container_name}\" flow")
+@given("a {processor_type} processor with the name \"{processor_name}\" and the \"{property_name}\" property set to \"{property_value}\" in the \"{minifi_container_name}\" flow")
+def step_impl(context, processor_type, processor_name, property_name, property_value, minifi_container_name):
+ __create_processor(context, processor_type, processor_name, property_name, property_value, minifi_container_name)
+
+
+@given("a {processor_type} processor with the \"{property_name}\" property set to \"{property_value}\" in a \"{minifi_container_name}\" flow")
+@given("a {processor_type} processor with the \"{property_name}\" property set to \"{property_value}\" in the \"{minifi_container_name}\" flow")
+def step_impl(context, processor_type, property_name, property_value, minifi_container_name):
+ __create_processor(context, processor_type, processor_type, property_name, property_value, minifi_container_name)
-@given("a {processor_type} processor with the \"{property}\" property set to \"{property_value}\"")
-def step_impl(context, processor_type, property, property_value):
- context.execute_steps("given a {processor_type} processor with the \"{property}\" property set to \"{property_value}\" in the \"{minifi_container_name}\" flow".
- format(processor_type=processor_type, property=property, property_value=property_value, minifi_container_name="minifi-cpp-flow"))
+@given("a {processor_type} processor with the \"{property_name}\" property set to \"{property_value}\"")
+def step_impl(context, processor_type, property_name, property_value):
+ __create_processor(context, processor_type, processor_type, property_name, property_value, "minifi-cpp-flow")
-@given("a {processor_type} processor with the name \"{processor_name}\" and the \"{property}\" property set to \"{property_value}\"")
-def step_impl(context, processor_type, property, property_value, processor_name):
- context.execute_steps("given a {processor_type} processor with the name \"{processor_name}\" and the \"{property}\" property set to \"{property_value}\" in a \"{minifi_container_name}\" flow".
- format(processor_type=processor_type, property=property, property_value=property_value, minifi_container_name="minifi-cpp-flow", processor_name=processor_name))
+@given("a {processor_type} processor with the name \"{processor_name}\" and the \"{property_name}\" property set to \"{property_value}\"")
+def step_impl(context, processor_type, property_name, property_value, processor_name):
+ __create_processor(context, processor_type, processor_name, property_name, property_value, "minifi-cpp-flow")
@given("a {processor_type} processor with the name \"{processor_name}\" in the \"{minifi_container_name}\" flow")
def step_impl(context, processor_type, processor_name, minifi_container_name):
- container = context.test.acquire_container(minifi_container_name)
- processor = locate("minifi.processors." + processor_type + "." + processor_type)()
- processor.set_name(processor_name)
- context.test.add_node(processor)
- # Assume that the first node declared is primary unless specified otherwise
- if not container.get_start_nodes():
- container.add_start_node(processor)
+ __create_processor(context, processor_type, processor_name, None, None, minifi_container_name)
@given("a {processor_type} processor with the name \"{processor_name}\"")
def step_impl(context, processor_type, processor_name):
- context.execute_steps("given a {processor_type} processor with the name \"{processor_name}\" in the \"{minifi_container_name}\" flow".
- format(processor_type=processor_type, processor_name=processor_name, minifi_container_name="minifi-cpp-flow"))
+ __create_processor(context, processor_type, processor_name, None, None, "minifi-cpp-flow")
@given("a {processor_type} processor in the \"{minifi_container_name}\" flow")
@given("a {processor_type} processor in a \"{minifi_container_name}\" flow")
@given("a {processor_type} processor set up in a \"{minifi_container_name}\" flow")
def step_impl(context, processor_type, minifi_container_name):
- context.execute_steps("given a {processor_type} processor with the name \"{processor_name}\" in the \"{minifi_container_name}\" flow".
- format(processor_type=processor_type, processor_name=processor_type, minifi_container_name=minifi_container_name))
+ __create_processor(context, processor_type, processor_type, None, None, minifi_container_name)
@given("a {processor_type} processor")
@@ -96,7 +106,13 @@ def step_impl(context, processor_type, minifi_container_name):
@given("a {processor_type} processor set up to communicate with an MQTT broker instance")
@given("a {processor_type} processor set up to communicate with the Splunk HEC instance")
def step_impl(context, processor_type):
- context.execute_steps("given a {processor_type} processor in the \"{minifi_container_name}\" flow".format(processor_type=processor_type, minifi_container_name="minifi-cpp-flow"))
+ __create_processor(context, processor_type, processor_type, None, None, "minifi-cpp-flow")
+
+
+@given("a {processor_type} processor in a Kubernetes cluster")
+@given("a {processor_type} processor in the Kubernetes cluster")
+def step_impl(context, processor_type):
+ __create_processor(context, processor_type, processor_type, None, None, "kubernetes", "kubernetes")
@given("a set of processors in the \"{minifi_container_name}\" flow")
@@ -309,6 +325,26 @@ def step_impl(context, producer_name):
producer.set_property("SSL Context Service", ssl_context_service.name)
+# Kubernetes
+def __set_up_the_kubernetes_controller_service(context, processor_name, service_property_name, properties):
+ kubernetes_controller_service = KubernetesControllerService("Kubernetes Controller Service", properties)
+ processor = context.test.get_node_by_name(processor_name)
+ processor.controller_services.append(kubernetes_controller_service)
+ processor.set_property(service_property_name, kubernetes_controller_service.name)
+
+
+@given("the {processor_name} processor has a {service_property_name} which is a Kubernetes Controller Service")
+@given("the {processor_name} processor has an {service_property_name} which is a Kubernetes Controller Service")
+def step_impl(context, processor_name, service_property_name):
+ __set_up_the_kubernetes_controller_service(context, processor_name, service_property_name, {})
+
+
+@given("the {processor_name} processor has a {service_property_name} which is a Kubernetes Controller Service with the \"{property_name}\" property set to \"{property_value}\"")
+@given("the {processor_name} processor has an {service_property_name} which is a Kubernetes Controller Service with the \"{property_name}\" property set to \"{property_value}\"")
+def step_impl(context, processor_name, service_property_name, property_name, property_value):
+ __set_up_the_kubernetes_controller_service(context, processor_name, service_property_name, {property_name: property_value})
+
+
# Kafka setup
@given("a kafka broker is set up in correspondence with the PublishKafka")
@given("a kafka broker is set up in correspondence with the third-party kafka publisher")
@@ -553,6 +589,11 @@ def step_impl(context, num_flowfiles, duration):
context.test.check_for_num_files_generated(int(num_flowfiles), timeparse(duration))
+@then("one flowfile with the contents \"{content}\" is placed in the monitored directory in less than {duration}")
+def step_impl(context, content, duration):
+ context.test.check_for_multiple_files_generated(1, timeparse(duration), [content])
+
+
@then("two flowfiles with the contents \"{content_1}\" and \"{content_2}\" are placed in the monitored directory in less than {duration}")
def step_impl(context, content_1, content_2, duration):
context.test.check_for_multiple_files_generated(2, timeparse(duration), [content_1, content_2])