You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by fg...@apache.org on 2021/10/28 12:03:26 UTC

[nifi-minifi-cpp] 03/04: MINIFICPP-977 MQTT tests added

This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 0f9150730c9909f4618772b8a948b2ac5249a8fa
Author: Adam Markovics <am...@cloudera.com>
AuthorDate: Mon Sep 13 11:45:22 2021 +0200

    MINIFICPP-977 MQTT tests added
    
    Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
    
    This closes #1172
---
 .github/workflows/ci.yml                           |  2 +-
 docker/DockerVerify.sh                             |  4 +-
 .../integration/MiNiFi_integration_test_driver.py  |  3 +
 docker/test/integration/features/mqtt.feature      | 66 ++++++++++++++++++
 .../integration/minifi/core/DockerTestCluster.py   |  2 +-
 .../minifi/core/DockerTestDirectoryBindings.py     |  2 +-
 docker/test/integration/minifi/core/ImageStore.py  | 78 +++++++++++-----------
 .../integration/minifi/core/MqttBrokerContainer.py | 23 +++++++
 .../minifi/core/SingleNodeDockerCluster.py         |  3 +
 .../integration/minifi/processors/ConsumeMQTT.py   | 12 ++++
 .../integration/minifi/processors/PublishMQTT.py   | 12 ++++
 docker/test/integration/steps/steps.py             | 22 +++++-
 12 files changed, 183 insertions(+), 46 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 3d4a19e..23ec814 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -265,7 +265,7 @@ jobs:
           if [ -d ~/.ccache ]; then mv ~/.ccache .; fi
           mkdir build
           cd build
-          cmake -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DENABLE_JNI=OFF -DDISABLE_JEMALLOC=ON -DENABLE_AWS=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_AZURE=ON -DENABLE_SQL=ON -DDOCKER_BUILD_ONLY=ON -DDOCKER_CCACHE_DUMP_LOCATION=$HOME/.ccache ..
+          cmake -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DENABLE_JNI=OFF -DDISABLE_JEMALLOC=ON -DENABLE_AWS=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_MQTT=ON -DENABLE_AZURE=ON -DENABLE_SQL=ON -DDOCKER_BUILD_ONLY=ON -DDOCKER_CCACHE_DUMP_LOCATION=$HOME/.ccache ..
           make docker
       - id: install_deps
         run: |
diff --git a/docker/DockerVerify.sh b/docker/DockerVerify.sh
index e1e70d8..2f89e48 100755
--- a/docker/DockerVerify.sh
+++ b/docker/DockerVerify.sh
@@ -63,8 +63,8 @@ export JAVA_HOME
 PATH="$PATH:/usr/lib/jvm/default-jvm/bin"
 export PATH
 
-PYTHONPATH="${PYTHONPATH}:${docker_dir}/test/integration"
-export PYTHONPATH
+TEST_DIRECTORY="${docker_dir}/test/integration"  # read by the Python test harness via os.environ['TEST_DIRECTORY']
+export TEST_DIRECTORY
 
 # Add --no-logcapture to see logs interleaved with the test output
 BEHAVE_OPTS=(-f pretty --logging-level INFO --logging-clear-handlers)
diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py
index 98956f5..a1bbb53 100644
--- a/docker/test/integration/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -204,3 +204,6 @@ class MiNiFi_integration_test():
 
     def check_query_results(self, postgresql_container_name, query, number_of_rows, timeout_seconds):
         assert self.cluster.check_query_results(postgresql_container_name, query, number_of_rows, timeout_seconds)
+
+    def wait_for_container_logs(self, container_name, log_pattern, timeout_seconds, count=1):
+        assert self.cluster.wait_for_app_logs_regex(container_name, log_pattern, timeout_seconds, count)  # fail the step if the pattern never shows up
diff --git a/docker/test/integration/features/mqtt.feature b/docker/test/integration/features/mqtt.feature
new file mode 100644
index 0000000..a606565
--- /dev/null
+++ b/docker/test/integration/features/mqtt.feature
@@ -0,0 +1,66 @@
+Feature: Sending data to MQTT streaming platform using PublishMQTT
+  In order to send and receive data via MQTT
+  As a user of MiNiFi
+  I need to have PublishMQTT and ConsumeMQTT processors
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario: A MiNiFi instance transfers data to an MQTT broker
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishMQTT processor set up to communicate with an MQTT broker instance
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PublishMQTT
+    And the "success" relationship of the PublishMQTT processor is connected to the PutFile
+
+    And an MQTT broker is set up in correspondence with the PublishMQTT
+
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+    And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)"
+
+  Scenario: If the MQTT broker does not exist, then no flow files are processed
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishMQTT processor set up to communicate with an MQTT broker instance
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PublishMQTT
+    And the "success" relationship of the PublishMQTT processor is connected to the PutFile
+    And the "failure" relationship of the PublishMQTT processor is connected to the PutFile
+
+    When the MiNiFi instance starts up
+    Then no files are placed in the monitored directory in 30 seconds of running time
+
+  Scenario: Verify delivery of message when MQTT broker is unstable
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishMQTT processor set up to communicate with an MQTT broker instance
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PublishMQTT
+    And the "success" relationship of the PublishMQTT processor is connected to the PutFile
+
+    When the MiNiFi instance starts up
+    Then no files are placed in the monitored directory in 30 seconds of running time
+
+    And an MQTT broker is deployed in correspondence with the PublishMQTT
+    And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+    And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)"
+
+  Scenario: A MiNiFi instance publishes and consumes data to/from an MQTT broker
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishMQTT processor set up to communicate with an MQTT broker instance
+    And the "success" relationship of the GetFile processor is connected to the PublishMQTT
+
+    And a ConsumeMQTT processor set up to communicate with an MQTT broker instance
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And "ConsumeMQTT" processor is a start node
+    And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
+
+    And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
+
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+    And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)"
+    And the MQTT broker has a log line matching "Received SUBSCRIBE from"
diff --git a/docker/test/integration/minifi/core/DockerTestCluster.py b/docker/test/integration/minifi/core/DockerTestCluster.py
index 4f465b9..6a25ed7 100644
--- a/docker/test/integration/minifi/core/DockerTestCluster.py
+++ b/docker/test/integration/minifi/core/DockerTestCluster.py
@@ -77,7 +77,7 @@ class DockerTestCluster(SingleNodeDockerCluster):
         return self.__wait_for_app_logs_impl(container_name, log_entry, timeout_seconds, count, False)
 
     def wait_for_startup_log(self, container_name, timeout_seconds):
-        return self.wait_for_app_logs(container_name, self.containers[container_name].get_startup_finished_log_entry(), timeout_seconds, 1)
+        return self.wait_for_app_logs_regex(container_name, self.containers[container_name].get_startup_finished_log_entry(), timeout_seconds, 1)  # NOTE: every container's startup entry is now treated as a regex
 
     def log_app_output(self):
         for container_name in self.containers:
diff --git a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
index ca769a3..1aed63a 100644
--- a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
+++ b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
@@ -22,7 +22,7 @@ class DockerTestDirectoryBindings:
         [self.create_directory(directory) for directory in self.data_directories[test_id].values()]
 
         # Add resources
-        test_dir = os.environ['PYTHONPATH'].split(':')[-1]  # Based on DockerVerify.sh
+        test_dir = os.environ['TEST_DIRECTORY']  # exported by DockerVerify.sh as ${docker_dir}/test/integration
         shutil.copytree(test_dir + "/resources/kafka_broker/conf/certs", self.data_directories[test_id]["resources_dir"] + "/certs")
         shutil.copytree(test_dir + "/resources/python", self.data_directories[test_id]["resources_dir"] + "/python")
 
diff --git a/docker/test/integration/minifi/core/ImageStore.py b/docker/test/integration/minifi/core/ImageStore.py
index 4044574..c1ef051 100644
--- a/docker/test/integration/minifi/core/ImageStore.py
+++ b/docker/test/integration/minifi/core/ImageStore.py
@@ -12,6 +12,7 @@ class ImageStore:
     def __init__(self):
         self.client = docker.from_env()
         self.images = dict()
+        self.test_dir = os.environ['TEST_DIRECTORY']  # exported by DockerVerify.sh as ${docker_dir}/test/integration
 
     def __del__(self):
         self.cleanup()
@@ -23,21 +24,28 @@ class ImageStore:
             self.client.images.remove(image.id, force=True)
 
     def get_image(self, container_engine):
-        if container_engine == "minifi-cpp":
-            return self.__get_minifi_cpp_image()
-        if container_engine == "http-proxy":
-            return self.__get_http_proxy_image()
-        if container_engine == "nifi":
-            return self.__get_nifi_image()
-        if container_engine == "postgresql-server":
-            return self.__get_postgresql_server_image()
-        if container_engine == "kafka-broker":
-            return self.__get_kafka_broker_image()
-
-    def __get_minifi_cpp_image(self):
-        if "minifi-cpp" in self.images:
-            return self.images["minifi-cpp"]
+        if container_engine in self.images:  # each engine's image is built once, then served from this cache
+            return self.images[container_engine]
 
+        if container_engine == "minifi-cpp":
+            image = self.__build_minifi_cpp_image()
+        elif container_engine == "http-proxy":
+            image = self.__build_http_proxy_image()
+        elif container_engine == "nifi":
+            image = self.__build_nifi_image()
+        elif container_engine == "postgresql-server":
+            image = self.__build_postgresql_server_image()
+        elif container_engine == "kafka-broker":
+            image = self.__build_kafka_broker_image()
+        elif container_engine == "mqtt-broker":
+            image = self.__build_mqtt_broker_image()
+        else:
+            raise Exception("There is no associated image for " + container_engine)
+
+        self.images[container_engine] = image
+        return image
+
+    def __build_minifi_cpp_image(self):
         dockerfile = dedent("""FROM {base_image}
                 USER root
                 RUN apk --update --no-cache add psqlodbc
@@ -71,13 +79,9 @@ class ImageStore:
                 """.format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_VERSION,
                            minifi_root=MinifiContainer.MINIFI_ROOT))
 
-        self.images["minifi-cpp"] = self.__build_image(dockerfile)
-        return self.images["minifi-cpp"]
-
-    def __get_http_proxy_image(self):
-        if "http-proxy" in self.images:
-            return self.images["http-proxy"]
+        return self.__build_image(dockerfile)
 
+    def __build_http_proxy_image(self):
         dockerfile = dedent("""FROM {base_image}
                 RUN apt -y update && apt install -y apache2-utils
                 RUN htpasswd -b -c /etc/squid/.squid_users {proxy_username} {proxy_password}
@@ -89,13 +93,9 @@ class ImageStore:
                 ENTRYPOINT ["/sbin/entrypoint.sh"]
                 """.format(base_image='sameersbn/squid:3.5.27-2', proxy_username='admin', proxy_password='test101', proxy_port='3128'))
 
-        self.images["http-proxy"] = self.__build_image(dockerfile)
-        return self.images["http-proxy"]
-
-    def __get_nifi_image(self):
-        if "nifi" in self.images:
-            return self.images["nifi"]
+        return self.__build_image(dockerfile)
 
+    def __build_nifi_image(self):
         dockerfile = dedent(r"""FROM {base_image}
                 USER root
                 RUN sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' {nifi_root}/conf/nifi.properties
@@ -105,13 +105,9 @@ class ImageStore:
                            base_image='apache/nifi:' + NifiContainer.NIFI_VERSION,
                            nifi_root=NifiContainer.NIFI_ROOT))
 
-        self.images["nifi"] = self.__build_image(dockerfile)
-        return self.images["nifi"]
-
-    def __get_postgresql_server_image(self):
-        if "postgresql-server" in self.images:
-            return self.images["postgresql-server"]
+        return self.__build_image(dockerfile)
 
+    def __build_postgresql_server_image(self):
         dockerfile = dedent("""FROM {base_image}
                 RUN mkdir -p /docker-entrypoint-initdb.d
                 RUN echo "#!/bin/bash" > /docker-entrypoint-initdb.d/init-user-db.sh && \
@@ -123,16 +119,18 @@ class ImageStore:
                     echo "    INSERT INTO test_table (int_col, text_col) VALUES (3, 'pear');" >> /docker-entrypoint-initdb.d/init-user-db.sh && \
                     echo "EOSQL" >> /docker-entrypoint-initdb.d/init-user-db.sh
                 """.format(base_image='postgres:13.2'))
-        self.images["postgresql-server"] = self.__build_image(dockerfile)
-        return self.images["postgresql-server"]
+        return self.__build_image(dockerfile)
+
+    def __build_kafka_broker_image(self):
+        return self.__build_image_by_path(self.test_dir + "/resources/kafka_broker", 'minifi-kafka')
 
-    def __get_kafka_broker_image(self):
-        if "kafka-broker" in self.images:
-            return self.images["kafka-broker"]
+    def __build_mqtt_broker_image(self):
+        dockerfile = dedent("""FROM {base_image}
+            RUN echo 'log_dest stderr' >> /mosquitto-no-auth.conf
+            CMD ["/usr/sbin/mosquitto", "--verbose", "--config-file", "/mosquitto-no-auth.conf"]
+            """.format(base_image='eclipse-mosquitto:2.0.12'))
 
-        test_dir = os.environ['PYTHONPATH'].split(':')[-1]  # Based on DockerVerify.sh
-        self.images["kafka-broker"] = self.__build_image_by_path(test_dir + "/resources/kafka_broker", 'minifi-kafka')
-        return self.images["kafka-broker"]
+        return self.__build_image(dockerfile)
 
     def __build_image(self, dockerfile, context_files=[]):
         conf_dockerfile_buffer = BytesIO()
diff --git a/docker/test/integration/minifi/core/MqttBrokerContainer.py b/docker/test/integration/minifi/core/MqttBrokerContainer.py
new file mode 100644
index 0000000..51632b4
--- /dev/null
+++ b/docker/test/integration/minifi/core/MqttBrokerContainer.py
@@ -0,0 +1,23 @@
+import logging
+from .Container import Container
+
+
+class MqttBrokerContainer(Container):
+    def __init__(self, name, vols, network, image_store):
+        super().__init__(name, 'mqtt-broker', vols, network, image_store)
+
+    def get_startup_finished_log_entry(self):
+        return "mosquitto version [0-9\\.]+ running"  # a regex: wait_for_startup_log matches it via wait_for_app_logs_regex
+
+    def deploy(self):
+        if not self.set_deployed():
+            return
+
+        logging.info('Creating and running MQTT broker docker container...')
+        self.client.containers.run(
+            self.image_store.get_image(self.get_engine()),
+            detach=True,
+            name=self.name,
+            network=self.network.name,
+            ports={'1883/tcp': 1883})  # container port 1883/tcp is also published on host port 1883
+        logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
index 78fc3de..631c7b3 100644
--- a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
+++ b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
@@ -11,6 +11,7 @@ from .S3ServerContainer import S3ServerContainer
 from .AzureStorageServerContainer import AzureStorageServerContainer
 from .HttpProxyContainer import HttpProxyContainer
 from .PostgreSQLServerContainer import PostgreSQLServerContainer
+from .MqttBrokerContainer import MqttBrokerContainer
 
 
 class SingleNodeDockerCluster(Cluster):
@@ -77,6 +78,8 @@ class SingleNodeDockerCluster(Cluster):
             return self.containers.setdefault(name, AzureStorageServerContainer(name, self.vols, self.network, self.image_store))
         elif engine == 'postgresql-server':
             return self.containers.setdefault(name, PostgreSQLServerContainer(name, self.vols, self.network, self.image_store))
+        elif engine == 'mqtt-broker':  # eclipse-mosquitto based broker, see MqttBrokerContainer
+            return self.containers.setdefault(name, MqttBrokerContainer(name, self.vols, self.network, self.image_store))
         else:
             raise Exception('invalid flow engine: \'%s\'' % engine)
 
diff --git a/docker/test/integration/minifi/processors/ConsumeMQTT.py b/docker/test/integration/minifi/processors/ConsumeMQTT.py
new file mode 100644
index 0000000..8c5721f
--- /dev/null
+++ b/docker/test/integration/minifi/processors/ConsumeMQTT.py
@@ -0,0 +1,12 @@
+from ..core.Processor import Processor
+
+
+class ConsumeMQTT(Processor):
+    def __init__(self, schedule=None):  # None -> a fresh default schedule dict per instance (no shared mutable default)
+        super(ConsumeMQTT, self).__init__(
+            'ConsumeMQTT',
+            properties={
+                'Broker URI': 'mqtt-broker:1883',
+                'Topic': 'testtopic'},
+            auto_terminate=['success'],
+            schedule=schedule if schedule is not None else {'scheduling strategy': 'TIMER_DRIVEN'})
diff --git a/docker/test/integration/minifi/processors/PublishMQTT.py b/docker/test/integration/minifi/processors/PublishMQTT.py
new file mode 100644
index 0000000..4f84fa8
--- /dev/null
+++ b/docker/test/integration/minifi/processors/PublishMQTT.py
@@ -0,0 +1,12 @@
+from ..core.Processor import Processor
+
+
+class PublishMQTT(Processor):
+    def __init__(self, schedule=None):  # None -> a fresh default schedule dict per instance (no shared mutable default)
+        super(PublishMQTT, self).__init__(
+            'PublishMQTT',
+            properties={
+                'Broker URI': 'mqtt-broker:1883',
+                'Topic': 'testtopic'},
+            auto_terminate=['success', 'failure'],
+            schedule=schedule if schedule is not None else {'scheduling strategy': 'EVENT_DRIVEN'})
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index 110ca60..c2fef16 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -81,6 +81,7 @@ def step_impl(context, processor_type, minifi_container_name):
 @given("a {processor_type} processor set up to communicate with the same s3 server")
 @given("a {processor_type} processor set up to communicate with an Azure blob storage")
 @given("a {processor_type} processor set up to communicate with a kafka broker instance")
+@given("a {processor_type} processor set up to communicate with an MQTT broker instance")
 def step_impl(context, processor_type):
     context.execute_steps("given a {processor_type} processor in the \"{minifi_container_name}\" flow".format(processor_type=processor_type, minifi_container_name="minifi-cpp-flow"))
 
@@ -290,6 +291,13 @@ def step_impl(context):
     context.test.acquire_container("kafka-broker", "kafka-broker")
 
 
+# MQTT setup
+@given("an MQTT broker is set up in correspondence with the PublishMQTT")
+@given("an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT")
+def step_impl(context):
+    context.test.acquire_container("mqtt-broker", "mqtt-broker")
+
+
 # s3 setup
 @given("a s3 server is set up in correspondence with the PutS3Object")
 @given("a s3 server is set up in correspondence with the DeleteS3Object")
@@ -367,7 +375,7 @@ def step_impl(context, content, topic_name):
 
 @when("a message with content \"{content}\" is published to the \"{topic_name}\" topic using an ssl connection")
 def step_impl(context, content, topic_name):
-    test_dir = os.environ['PYTHONPATH'].split(':')[-1]  # Based on DockerVerify.sh
+    test_dir = os.environ['TEST_DIRECTORY']  # exported by DockerVerify.sh as ${docker_dir}/test/integration
     producer = Producer({
         "bootstrap.servers": "localhost:29093",
         "security.protocol": "ssl",
@@ -551,3 +559,15 @@ def step_impl(context, log_message, duration):
 @then("the Minifi logs match the following regex: \"{regex}\" in less than {duration}")
 def step_impl(context, regex, duration):
     context.test.check_minifi_log_matches_regex(regex, timeparse(duration))
+
+
+# MQTT
+@then("the MQTT broker has a log line matching \"{log_pattern}\"")
+def step_impl(context, log_pattern):
+    context.test.wait_for_container_logs('mqtt-broker', log_pattern, 30, count=1)  # log_pattern is a regex; wait up to 30 s for one match
+
+
+@then("an MQTT broker is deployed in correspondence with the PublishMQTT")
+def step_impl(context):
+    context.test.acquire_container("mqtt-broker", "mqtt-broker")
+    context.test.start()  # NOTE(review): relies on start() deploying containers acquired after the first start