You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/08/22 20:13:06 UTC
[nifi-minifi-cpp] 04/05: MINIFICPP-1912 Fix occasionally failing MQTT tests
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit c73c904893a71944b41f67b2e3c848dde0af10a2
Author: Adam Markovics <nu...@gmail.com>
AuthorDate: Mon Aug 22 22:02:04 2022 +0200
MINIFICPP-1912 Fix occasionally failing MQTT tests
Closes #1395
Signed-off-by: Marton Szasz <sz...@apache.org>
---
docker/test/integration/features/mqtt.feature | 43 +++++++++-------------
.../integration/minifi/processors/ConsumeMQTT.py | 3 +-
.../integration/minifi/processors/PublishMQTT.py | 3 +-
3 files changed, 22 insertions(+), 27 deletions(-)
diff --git a/docker/test/integration/features/mqtt.feature b/docker/test/integration/features/mqtt.feature
index d83605b57..1f5204875 100644
--- a/docker/test/integration/features/mqtt.feature
+++ b/docker/test/integration/features/mqtt.feature
@@ -75,10 +75,11 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
When both instances start up
- Then a file with the content "test" is placed in "/tmp/input"
+ Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
+ And the MQTT broker has a log line matching "New client connected from .* as publisher-client"
+ And a file with the content "test" is placed in "/tmp/input"
And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)"
- And the MQTT broker has a log line matching "Received SUBSCRIBE from"
Scenario Outline: Subscription to topics with wildcards
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
@@ -95,10 +96,11 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
When both instances start up
- Then a file with the content "test" is placed in "/tmp/input"
+ Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
+ And the MQTT broker has a log line matching "New client connected from .* as publisher-client"
+ And a file with the content "test" is placed in "/tmp/input"
And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
And the MQTT broker has a log line matching "Received PUBLISH from .*test/my/topic.*\(4 bytes\)"
- And the MQTT broker has a log line matching "Received SUBSCRIBE from"
Examples: Topic wildcard patterns
| topic wildcard pattern |
@@ -110,14 +112,12 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow
And a PublishMQTT processor in the "publisher-client" flow
And the "Quality of Service" property of the PublishMQTT processor is set to "1"
- And the "Client ID" property of the PublishMQTT processor is set to "publisher-client"
And the "success" relationship of the GetFile processor is connected to the PublishMQTT
# consuming MQTT client
And a ConsumeMQTT processor in the "consumer-client" flow
And the "Quality of Service" property of the ConsumeMQTT processor is set to "1"
And the "Clean Session" property of the ConsumeMQTT processor is set to "false"
- And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client"
And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow
And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
@@ -150,10 +150,11 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
When both instances start up
- Then a file with the content "<message>" is placed in "/tmp/input"
+ Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
+ And the MQTT broker has a log line matching "New client connected from .* as publisher-client"
+ And a file with the content "<message>" is placed in "/tmp/input"
And a flowfile with the content "<message>" is placed in the monitored directory in less than 60 seconds
And the MQTT broker has a log line matching "Received PUBLISH from .*<topic>"
- And the MQTT broker has a log line matching "Received SUBSCRIBE from"
Examples: Topic wildcard patterns
| topic | message |
@@ -167,12 +168,10 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a PublishMQTT processor set up to communicate with an MQTT broker instance
And the "Quality of Service" property of the PublishMQTT processor is set to "0"
- And the "Client ID" property of the PublishMQTT processor is set to "publisher-client"
And the "success" relationship of the GetFile processor is connected to the PublishMQTT
And a ConsumeMQTT processor set up to communicate with an MQTT broker instance
And the "Quality of Service" property of the ConsumeMQTT processor is set to "0"
- And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client"
And a PutFile processor with the "Directory" property set to "/tmp/output"
And "ConsumeMQTT" processor is a start node
And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
@@ -180,9 +179,10 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
When both instances start up
- Then a file with the content "test" is placed in "/tmp/input"
+ Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
+ And the MQTT broker has a log line matching "New client connected from .* as publisher-client"
+ And a file with the content "test" is placed in "/tmp/input"
And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
- And the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
And the MQTT broker has a log line matching "Received PUBLISH from publisher-client \(d0, q0, r0, m0, 'testtopic'.*\(4 bytes\)"
And the MQTT broker has a log line matching "Sending PUBLISH to consumer-client \(d0, q0, r0, m0, 'testtopic',.*\(4 bytes\)\)"
@@ -190,12 +190,10 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a PublishMQTT processor set up to communicate with an MQTT broker instance
And the "Quality of Service" property of the PublishMQTT processor is set to "1"
- And the "Client ID" property of the PublishMQTT processor is set to "publisher-client"
And the "success" relationship of the GetFile processor is connected to the PublishMQTT
And a ConsumeMQTT processor set up to communicate with an MQTT broker instance
And the "Quality of Service" property of the ConsumeMQTT processor is set to "1"
- And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client"
And a PutFile processor with the "Directory" property set to "/tmp/output"
And "ConsumeMQTT" processor is a start node
And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
@@ -203,9 +201,10 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
When both instances start up
- Then a file with the content "test" is placed in "/tmp/input"
+ Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
+ And the MQTT broker has a log line matching "New client connected from .* as publisher-client"
+ And a file with the content "test" is placed in "/tmp/input"
And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
- And the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
And the MQTT broker has a log line matching "Received PUBLISH from publisher-client.*m1, 'testtopic'.*\(4 bytes\)"
And the MQTT broker has a log line matching "Sending PUBACK to publisher-client \(m1, rc0\)"
And the MQTT broker has a log line matching "Sending PUBLISH to consumer-client \(d0, q1, r0, m1, 'testtopic',.*\(4 bytes\)\)"
@@ -216,12 +215,10 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a PublishMQTT processor set up to communicate with an MQTT broker instance
And the "Quality of Service" property of the PublishMQTT processor is set to "2"
- And the "Client ID" property of the PublishMQTT processor is set to "publisher-client"
And the "success" relationship of the GetFile processor is connected to the PublishMQTT
And a ConsumeMQTT processor set up to communicate with an MQTT broker instance
And the "Quality of Service" property of the ConsumeMQTT processor is set to "2"
- And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client"
And a PutFile processor with the "Directory" property set to "/tmp/output"
And "ConsumeMQTT" processor is a start node
And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
@@ -229,9 +226,10 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
When both instances start up
- Then a file with the content "test" is placed in "/tmp/input"
+ Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
+ And the MQTT broker has a log line matching "New client connected from .* as publisher-client"
+ And a file with the content "test" is placed in "/tmp/input"
And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
- And the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
And the MQTT broker has a log line matching "Received PUBLISH from publisher-client.*m1, 'testtopic'.*\(4 bytes\)"
And the MQTT broker has a log line matching "Sending PUBREC to publisher-client \(m1, rc0\)"
And the MQTT broker has a log line matching "Received PUBREL from publisher-client \(Mid: 1\)"
@@ -246,13 +244,11 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow
And a file with the content "test" is present in "/tmp/input"
And a PublishMQTT processor in the "publisher-client" flow
- And the "Client ID" property of the PublishMQTT processor is set to "publisher-client"
And the "Retain" property of the PublishMQTT processor is set to "true"
And the "success" relationship of the GetFile processor is connected to the PublishMQTT
# consuming MQTT client
And a ConsumeMQTT processor in the "consumer-client" flow
- And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client"
And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow
And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
@@ -272,14 +268,12 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
# publishing MQTT client with last will
Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow
And a PublishMQTT processor in the "publisher-client" flow
- And the "Client ID" property of the PublishMQTT processor is set to "publisher-client"
And the "Last Will Topic" property of the PublishMQTT processor is set to "last_will_topic"
And the "Last Will Message" property of the PublishMQTT processor is set to "last_will_message"
And the "success" relationship of the GetFile processor is connected to the PublishMQTT
# consuming MQTT client set to consume last will topic
And a ConsumeMQTT processor in the "consumer-client" flow
- And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client"
And the "Topic" property of the ConsumeMQTT processor is set to "last_will_topic"
And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow
And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
@@ -295,7 +289,6 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
Scenario: Keep Alive
Given a ConsumeMQTT processor set up to communicate with an MQTT broker instance
- And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client"
And the "Keep Alive Interval" property of the ConsumeMQTT processor is set to "1 sec"
And an MQTT broker is set up in correspondence with the ConsumeMQTT
diff --git a/docker/test/integration/minifi/processors/ConsumeMQTT.py b/docker/test/integration/minifi/processors/ConsumeMQTT.py
index 6b2c444c6..d32b5bb40 100644
--- a/docker/test/integration/minifi/processors/ConsumeMQTT.py
+++ b/docker/test/integration/minifi/processors/ConsumeMQTT.py
@@ -23,6 +23,7 @@ class ConsumeMQTT(Processor):
'ConsumeMQTT',
properties={
'Broker URI': 'mqtt-broker:1883',
- 'Topic': 'testtopic'},
+ 'Topic': 'testtopic',
+ 'Client ID': 'consumer-client'},
auto_terminate=['success'],
schedule=schedule)
diff --git a/docker/test/integration/minifi/processors/PublishMQTT.py b/docker/test/integration/minifi/processors/PublishMQTT.py
index d4840fee0..8506d3812 100644
--- a/docker/test/integration/minifi/processors/PublishMQTT.py
+++ b/docker/test/integration/minifi/processors/PublishMQTT.py
@@ -23,6 +23,7 @@ class PublishMQTT(Processor):
'PublishMQTT',
properties={
'Broker URI': 'mqtt-broker:1883',
- 'Topic': 'testtopic'},
+ 'Topic': 'testtopic',
+ 'Client ID': 'publisher-client'},
auto_terminate=['success', 'failure'],
schedule=schedule)