You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/08/22 20:13:06 UTC

[nifi-minifi-cpp] 04/05: MINIFICPP-1912 Fix occasionally failing MQTT tests

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit c73c904893a71944b41f67b2e3c848dde0af10a2
Author: Adam Markovics <nu...@gmail.com>
AuthorDate: Mon Aug 22 22:02:04 2022 +0200

    MINIFICPP-1912 Fix occasionally failing MQTT tests
    
    Closes #1395
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 docker/test/integration/features/mqtt.feature      | 43 +++++++++-------------
 .../integration/minifi/processors/ConsumeMQTT.py   |  3 +-
 .../integration/minifi/processors/PublishMQTT.py   |  3 +-
 3 files changed, 22 insertions(+), 27 deletions(-)

diff --git a/docker/test/integration/features/mqtt.feature b/docker/test/integration/features/mqtt.feature
index d83605b57..1f5204875 100644
--- a/docker/test/integration/features/mqtt.feature
+++ b/docker/test/integration/features/mqtt.feature
@@ -75,10 +75,11 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
     And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
 
     When both instances start up
-    Then a file with the content "test" is placed in "/tmp/input"
+    Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
+    And the MQTT broker has a log line matching "New client connected from .* as publisher-client"
+    And a file with the content "test" is placed in "/tmp/input"
     And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
     And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)"
-    And the MQTT broker has a log line matching "Received SUBSCRIBE from"
 
   Scenario Outline: Subscription to topics with wildcards
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
@@ -95,10 +96,11 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
     And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
 
     When both instances start up
-    Then a file with the content "test" is placed in "/tmp/input"
+    Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
+    And the MQTT broker has a log line matching "New client connected from .* as publisher-client"
+    And a file with the content "test" is placed in "/tmp/input"
     And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
     And the MQTT broker has a log line matching "Received PUBLISH from .*test/my/topic.*\(4 bytes\)"
-    And the MQTT broker has a log line matching "Received SUBSCRIBE from"
 
     Examples: Topic wildcard patterns
     | topic wildcard pattern |
@@ -110,14 +112,12 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow
     And a PublishMQTT processor in the "publisher-client" flow
     And the "Quality of Service" property of the PublishMQTT processor is set to "1"
-    And the "Client ID" property of the PublishMQTT processor is set to "publisher-client"
     And the "success" relationship of the GetFile processor is connected to the PublishMQTT
 
     # consuming MQTT client
     And a ConsumeMQTT processor in the "consumer-client" flow
     And the "Quality of Service" property of the ConsumeMQTT processor is set to "1"
     And the "Clean Session" property of the ConsumeMQTT processor is set to "false"
-    And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client"
     And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow
     And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
 
@@ -150,10 +150,11 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
     And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
 
     When both instances start up
-    Then a file with the content "<message>" is placed in "/tmp/input"
+    Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
+    And the MQTT broker has a log line matching "New client connected from .* as publisher-client"
+    And a file with the content "<message>" is placed in "/tmp/input"
     And a flowfile with the content "<message>" is placed in the monitored directory in less than 60 seconds
     And the MQTT broker has a log line matching "Received PUBLISH from .*<topic>"
-    And the MQTT broker has a log line matching "Received SUBSCRIBE from"
 
     Examples: Topic wildcard patterns
     | topic                  | message     |
@@ -167,12 +168,10 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
     And a PublishMQTT processor set up to communicate with an MQTT broker instance
     And the "Quality of Service" property of the PublishMQTT processor is set to "0"
-    And the "Client ID" property of the PublishMQTT processor is set to "publisher-client"
     And the "success" relationship of the GetFile processor is connected to the PublishMQTT
 
     And a ConsumeMQTT processor set up to communicate with an MQTT broker instance
     And the "Quality of Service" property of the ConsumeMQTT processor is set to "0"
-    And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
     And "ConsumeMQTT" processor is a start node
     And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
@@ -180,9 +179,10 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
     And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
 
     When both instances start up
-    Then a file with the content "test" is placed in "/tmp/input"
+    Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
+    And the MQTT broker has a log line matching "New client connected from .* as publisher-client"
+    And a file with the content "test" is placed in "/tmp/input"
     And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
-    And the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
     And the MQTT broker has a log line matching "Received PUBLISH from publisher-client \(d0, q0, r0, m0, 'testtopic'.*\(4 bytes\)"
     And the MQTT broker has a log line matching "Sending PUBLISH to consumer-client \(d0, q0, r0, m0, 'testtopic',.*\(4 bytes\)\)"
 
@@ -190,12 +190,10 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
     And a PublishMQTT processor set up to communicate with an MQTT broker instance
     And the "Quality of Service" property of the PublishMQTT processor is set to "1"
-    And the "Client ID" property of the PublishMQTT processor is set to "publisher-client"
     And the "success" relationship of the GetFile processor is connected to the PublishMQTT
 
     And a ConsumeMQTT processor set up to communicate with an MQTT broker instance
     And the "Quality of Service" property of the ConsumeMQTT processor is set to "1"
-    And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
     And "ConsumeMQTT" processor is a start node
     And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
@@ -203,9 +201,10 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
     And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
 
     When both instances start up
-    Then a file with the content "test" is placed in "/tmp/input"
+    Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
+    And the MQTT broker has a log line matching "New client connected from .* as publisher-client"
+    And a file with the content "test" is placed in "/tmp/input"
     And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
-    And the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
     And the MQTT broker has a log line matching "Received PUBLISH from publisher-client.*m1, 'testtopic'.*\(4 bytes\)"
     And the MQTT broker has a log line matching "Sending PUBACK to publisher-client \(m1, rc0\)"
     And the MQTT broker has a log line matching "Sending PUBLISH to consumer-client \(d0, q1, r0, m1, 'testtopic',.*\(4 bytes\)\)"
@@ -216,12 +215,10 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
     And a PublishMQTT processor set up to communicate with an MQTT broker instance
     And the "Quality of Service" property of the PublishMQTT processor is set to "2"
-    And the "Client ID" property of the PublishMQTT processor is set to "publisher-client"
     And the "success" relationship of the GetFile processor is connected to the PublishMQTT
 
     And a ConsumeMQTT processor set up to communicate with an MQTT broker instance
     And the "Quality of Service" property of the ConsumeMQTT processor is set to "2"
-    And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
     And "ConsumeMQTT" processor is a start node
     And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
@@ -229,9 +226,10 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
     And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
 
     When both instances start up
-    Then a file with the content "test" is placed in "/tmp/input"
+    Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
+    And the MQTT broker has a log line matching "New client connected from .* as publisher-client"
+    And a file with the content "test" is placed in "/tmp/input"
     And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
-    And the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
     And the MQTT broker has a log line matching "Received PUBLISH from publisher-client.*m1, 'testtopic'.*\(4 bytes\)"
     And the MQTT broker has a log line matching "Sending PUBREC to publisher-client \(m1, rc0\)"
     And the MQTT broker has a log line matching "Received PUBREL from publisher-client \(Mid: 1\)"
@@ -246,13 +244,11 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow
     And a file with the content "test" is present in "/tmp/input"
     And a PublishMQTT processor in the "publisher-client" flow
-    And the "Client ID" property of the PublishMQTT processor is set to "publisher-client"
     And the "Retain" property of the PublishMQTT processor is set to "true"
     And the "success" relationship of the GetFile processor is connected to the PublishMQTT
 
     # consuming MQTT client
     And a ConsumeMQTT processor in the "consumer-client" flow
-    And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client"
     And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow
     And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
 
@@ -272,14 +268,12 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
     # publishing MQTT client with last will
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow
     And a PublishMQTT processor in the "publisher-client" flow
-    And the "Client ID" property of the PublishMQTT processor is set to "publisher-client"
     And the "Last Will Topic" property of the PublishMQTT processor is set to "last_will_topic"
     And the "Last Will Message" property of the PublishMQTT processor is set to "last_will_message"
     And the "success" relationship of the GetFile processor is connected to the PublishMQTT
 
     # consuming MQTT client set to consume last will topic
     And a ConsumeMQTT processor in the "consumer-client" flow
-    And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client"
     And the "Topic" property of the ConsumeMQTT processor is set to "last_will_topic"
     And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow
     And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
@@ -295,7 +289,6 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
 
   Scenario: Keep Alive
     Given a ConsumeMQTT processor set up to communicate with an MQTT broker instance
-    And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client"
     And the "Keep Alive Interval" property of the ConsumeMQTT processor is set to "1 sec"
 
     And an MQTT broker is set up in correspondence with the ConsumeMQTT
diff --git a/docker/test/integration/minifi/processors/ConsumeMQTT.py b/docker/test/integration/minifi/processors/ConsumeMQTT.py
index 6b2c444c6..d32b5bb40 100644
--- a/docker/test/integration/minifi/processors/ConsumeMQTT.py
+++ b/docker/test/integration/minifi/processors/ConsumeMQTT.py
@@ -23,6 +23,7 @@ class ConsumeMQTT(Processor):
             'ConsumeMQTT',
             properties={
                 'Broker URI': 'mqtt-broker:1883',
-                'Topic': 'testtopic'},
+                'Topic': 'testtopic',
+                'Client ID': 'consumer-client'},
             auto_terminate=['success'],
             schedule=schedule)
diff --git a/docker/test/integration/minifi/processors/PublishMQTT.py b/docker/test/integration/minifi/processors/PublishMQTT.py
index d4840fee0..8506d3812 100644
--- a/docker/test/integration/minifi/processors/PublishMQTT.py
+++ b/docker/test/integration/minifi/processors/PublishMQTT.py
@@ -23,6 +23,7 @@ class PublishMQTT(Processor):
             'PublishMQTT',
             properties={
                 'Broker URI': 'mqtt-broker:1883',
-                'Topic': 'testtopic'},
+                'Topic': 'testtopic',
+                'Client ID': 'publisher-client'},
             auto_terminate=['success', 'failure'],
             schedule=schedule)