You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by lo...@apache.org on 2022/08/16 15:03:31 UTC

[nifi-minifi-cpp] branch main updated (ba15abcd4 -> 2abcb71ce)

This is an automated email from the ASF dual-hosted git repository.

lordgamez pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


    from ba15abcd4 MINIFICPP-1885 Mark extensions not in a static list as disabled
     new faa16d683 MINIFICPP-1907 Add description to monadic operation wrappers
     new e31ee031b MINIFICPP-1905 - Enumerate only relevant subdir contents
     new 2abcb71ce MINIFICPP-1680 - Add support and tests for advanced MQTT features

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 C2.md                                              | 154 +---------
 PROCESSORS.md                                      |  73 +++--
 bin/minifi.sh                                      |   4 +
 cmake/PahoMqttC.cmake                              |   4 +-
 .../integration/MiNiFi_integration_test_driver.py  |  19 +-
 docker/test/integration/features/mqtt.feature      | 228 +++++++++++++-
 docker/test/integration/minifi/core/Cluster.py     |  15 +
 docker/test/integration/minifi/core/Container.py   |   9 +
 docker/test/integration/minifi/core/ImageStore.py  |   2 +-
 .../integration/minifi/core/MinifiContainer.py     |  18 ++
 .../minifi/core/SingleNodeDockerCluster.py         |  26 +-
 docker/test/integration/steps/steps.py             |  35 ++-
 extensions/mqtt/CMakeLists.txt                     |   6 +-
 .../controllerservice/MQTTControllerService.cpp    |  81 -----
 .../mqtt/controllerservice/MQTTControllerService.h | 327 ---------------------
 .../mqtt/processors/AbstractMQTTProcessor.cpp      | 231 ++++++++-------
 extensions/mqtt/processors/AbstractMQTTProcessor.h | 226 +++++++++-----
 .../AbstractMQTTProcessorStaticDefinitions.cpp     |  54 +++-
 extensions/mqtt/processors/ConsumeMQTT.cpp         |  95 ++++--
 extensions/mqtt/processors/ConsumeMQTT.h           |  75 +++--
 extensions/mqtt/processors/ConvertBase.cpp         |  44 ---
 extensions/mqtt/processors/ConvertBase.h           |  67 -----
 .../processors/ConvertBaseStaticDefinitions.cpp    |  51 ----
 extensions/mqtt/processors/ConvertHeartBeat.cpp    |  62 ----
 extensions/mqtt/processors/ConvertHeartBeat.h      |  52 ----
 extensions/mqtt/processors/ConvertJSONAck.cpp      |  84 ------
 extensions/mqtt/processors/ConvertJSONAck.h        |  58 ----
 extensions/mqtt/processors/ConvertUpdate.cpp       |  92 ------
 extensions/mqtt/processors/ConvertUpdate.h         |  66 -----
 extensions/mqtt/processors/PublishMQTT.cpp         |  70 +++--
 extensions/mqtt/processors/PublishMQTT.h           | 106 ++++---
 extensions/mqtt/protocol/MQTTC2Protocol.cpp        |  98 ------
 extensions/mqtt/protocol/MQTTC2Protocol.h          |  81 -----
 extensions/mqtt/tests/CMakeLists.txt               |  35 +++
 extensions/mqtt/tests/ConsumeMQTTTests.cpp         |  84 ++++++
 extensions/mqtt/tests/PublishMQTTTests.cpp         |  51 ++++
 extensions/pdh/tests/CMakeLists.txt                |   1 -
 libminifi/include/core/Property.h                  |  18 +-
 .../utils/detail/MonadicOperationWrappers.h        |  29 ++
 libminifi/include/utils/file/FileUtils.h           |  21 +-
 libminifi/test/mqtt-tests/CMakeLists.txt           |  36 ---
 libminifi/test/unit/FilePatternTests.cpp           |  33 +++
 42 files changed, 1178 insertions(+), 1743 deletions(-)
 delete mode 100644 extensions/mqtt/controllerservice/MQTTControllerService.cpp
 delete mode 100644 extensions/mqtt/controllerservice/MQTTControllerService.h
 delete mode 100644 extensions/mqtt/processors/ConvertBase.cpp
 delete mode 100644 extensions/mqtt/processors/ConvertBase.h
 delete mode 100644 extensions/mqtt/processors/ConvertBaseStaticDefinitions.cpp
 delete mode 100644 extensions/mqtt/processors/ConvertHeartBeat.cpp
 delete mode 100644 extensions/mqtt/processors/ConvertHeartBeat.h
 delete mode 100644 extensions/mqtt/processors/ConvertJSONAck.cpp
 delete mode 100644 extensions/mqtt/processors/ConvertJSONAck.h
 delete mode 100644 extensions/mqtt/processors/ConvertUpdate.cpp
 delete mode 100644 extensions/mqtt/processors/ConvertUpdate.h
 delete mode 100644 extensions/mqtt/protocol/MQTTC2Protocol.cpp
 delete mode 100644 extensions/mqtt/protocol/MQTTC2Protocol.h
 create mode 100644 extensions/mqtt/tests/CMakeLists.txt
 create mode 100644 extensions/mqtt/tests/ConsumeMQTTTests.cpp
 create mode 100644 extensions/mqtt/tests/PublishMQTTTests.cpp
 delete mode 100644 libminifi/test/mqtt-tests/CMakeLists.txt


[nifi-minifi-cpp] 03/03: MINIFICPP-1680 - Add support and tests for advanced MQTT features

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lordgamez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 2abcb71ce34d80a6ee16b027e96872cb59cec8e9
Author: Adam Markovics <nu...@gmail.com>
AuthorDate: Thu Nov 4 16:43:12 2021 +0100

    MINIFICPP-1680 - Add support and tests for advanced MQTT features
    
    - Durable sessions working
    - Remove unused MQTT code
    - Support for disconnection and durable sessions
    - UTF-8 topics and messages
    - QoS implementation
    - Last Will implementation
    - Keep Alive
    - Bump versions of Mosquitto docker container
    
    Signed-off-by: Gabor Gyimesi <ga...@gmail.com>
    
    This closes #1363
---
 C2.md                                              | 154 +---------
 PROCESSORS.md                                      |  73 +++--
 bin/minifi.sh                                      |   4 +
 cmake/PahoMqttC.cmake                              |   4 +-
 .../integration/MiNiFi_integration_test_driver.py  |  19 +-
 docker/test/integration/features/mqtt.feature      | 228 +++++++++++++-
 docker/test/integration/minifi/core/Cluster.py     |  15 +
 docker/test/integration/minifi/core/Container.py   |   9 +
 docker/test/integration/minifi/core/ImageStore.py  |   2 +-
 .../integration/minifi/core/MinifiContainer.py     |  18 ++
 .../minifi/core/SingleNodeDockerCluster.py         |  26 +-
 docker/test/integration/steps/steps.py             |  35 ++-
 extensions/mqtt/CMakeLists.txt                     |   6 +-
 .../controllerservice/MQTTControllerService.cpp    |  81 -----
 .../mqtt/controllerservice/MQTTControllerService.h | 327 ---------------------
 .../mqtt/processors/AbstractMQTTProcessor.cpp      | 231 ++++++++-------
 extensions/mqtt/processors/AbstractMQTTProcessor.h | 226 +++++++++-----
 .../AbstractMQTTProcessorStaticDefinitions.cpp     |  54 +++-
 extensions/mqtt/processors/ConsumeMQTT.cpp         |  95 ++++--
 extensions/mqtt/processors/ConsumeMQTT.h           |  75 +++--
 extensions/mqtt/processors/ConvertBase.cpp         |  44 ---
 extensions/mqtt/processors/ConvertBase.h           |  67 -----
 .../processors/ConvertBaseStaticDefinitions.cpp    |  51 ----
 extensions/mqtt/processors/ConvertHeartBeat.cpp    |  62 ----
 extensions/mqtt/processors/ConvertHeartBeat.h      |  52 ----
 extensions/mqtt/processors/ConvertJSONAck.cpp      |  84 ------
 extensions/mqtt/processors/ConvertJSONAck.h        |  58 ----
 extensions/mqtt/processors/ConvertUpdate.cpp       |  92 ------
 extensions/mqtt/processors/ConvertUpdate.h         |  66 -----
 extensions/mqtt/processors/PublishMQTT.cpp         |  70 +++--
 extensions/mqtt/processors/PublishMQTT.h           | 106 ++++---
 extensions/mqtt/protocol/MQTTC2Protocol.cpp        |  98 ------
 extensions/mqtt/protocol/MQTTC2Protocol.h          |  81 -----
 extensions/mqtt/tests/CMakeLists.txt               |  35 +++
 extensions/mqtt/tests/ConsumeMQTTTests.cpp         |  84 ++++++
 extensions/mqtt/tests/PublishMQTTTests.cpp         |  51 ++++
 extensions/pdh/tests/CMakeLists.txt                |   1 -
 libminifi/include/core/Property.h                  |  18 +-
 libminifi/test/mqtt-tests/CMakeLists.txt           |  36 ---
 39 files changed, 1111 insertions(+), 1727 deletions(-)

diff --git a/C2.md b/C2.md
index 4e2a815ae..2f2af37d5 100644
--- a/C2.md
+++ b/C2.md
@@ -70,8 +70,6 @@ be requested via C2 DESCRIBE manifest command.
 
 	# specify C2 protocol -- default is RESTSender if not specified
 	nifi.c2.agent.protocol.class=RESTSender
-	# may also use MQTT or CoapProtocol
-	# nifi.c2.agent.protocol.class=MQTTC2Protocol
 	# nifi.c2.agent.protocol.class=CoapProtocol
 
 	# control c2 heartbeat interval in milliseconds
@@ -208,8 +206,7 @@ configuration produces the following JSON:
 
 ### Protocols
 
-The default protocol is a RESTFul service; however, there is an MQTT protocol with a translation to use the
-RESTFul C2 server and a CoAP Protocol implementation. The CoAP protocol requires that COAP be enabled either
+The default protocol is a RESTful service. The CoAP protocol requires that COAP be enabled either
 through the bootstrap or the cmake flag -DENABLE_COAP=ON .
 
 Once configured, COAP uses a controller service within the flow OR minifi properties entries: nifi.c2.agent.coap.host and nifi.c2.agent.coap.port.
@@ -232,155 +229,6 @@ below. Note that Max Queue Size is the only non-required property:
 	    Remote Port: port
 	    Max Queue Size: 1000
 
-As defined, above, MQTTC2Protocol can be used for the agent protocol class. If you wish to communicate with a RESTFul C2 server,
-you may use the ConvertBase, ConvertHeartBeat, ConvertJSONAack, and ConvertUpdate classes on an agent to perform the translation.
-State is not kept with an intermediate agent other than the broker. The broker is not embedded with the agent to simplify the agent.
-
-An example configuration, below, defines an agent that receives and forward MQTT C2 requests to a C2 server. Additionally, this agent
-will forward responses and updates to the heartbeating agents.
-
-	MiNiFi Config Version: 3
-	Flow Controller:
-	  name: GetFile
-	Core Properties:
-	  flow controller graceful shutdown period: 10 sec
-	  flow service write delay interval: 500 ms
-	  administrative yield duration: 30 sec
-	  bored yield duration: 10 millis
-	  max concurrent threads: 1
-	  variable registry properties: ''
-	FlowFile Repository:
-	  partitions: 256
-	  checkpoint interval: 2 mins
-	  always sync: false
-	  Swap:
-	    threshold: 20000
-	    in period: 5 sec
-	    in threads: 1
-	    out period: 5 sec
-	    out threads: 4
-	Content Repository:
-	  content claim max appendable size: 10 MB
-	  content claim max flow files: 100
-	  always sync: false
-	Provenance Repository:
-	  provenance rollover time: 1 min
-	Component Status Repository:
-	  buffer size: 1440
-	  snapshot frequency: 1 min
-	Security Properties:
-	  keystore: ''
-	  keystore type: ''
-	  keystore password: ''
-	  key password: ''
-	  truststore: ''
-	  truststore type: ''
-	  truststore password: ''
-	  ssl protocol: ''
-	  Sensitive Props:
-	    key:
-	    algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
-	    provider: BC
-	Processors:
-	- id: 24493a28-015a-1000-0000-000000000000
-	  name: convert
-	  class: org.apache.nifi.processors.standard.ConvertHeartBeat
-	  max concurrent tasks: 1
-	  scheduling strategy: TIMER_DRIVEN
-	  scheduling period: 10 msec
-	  penalization period: 30 sec
-	  yield period: 2 sec
-	  run duration nanos: 10 msec
-	  auto-terminated relationships list:
-	  Properties:
-	    MQTT Controller Service: mqttservice
-	    Listening Topic: heartbeats
-	- id: 24493a28-015a-1000-0000-000000000006
-	  name: convertJSON
-	  class: org.apache.nifi.processors.standard.ConvertJSONAck
-	  max concurrent tasks: 1
-	  scheduling strategy: TIMER_DRIVEN
-	  scheduling period: 10 msec
-	  penalization period: 30 sec
-	  yield period: 1 sec
-	  run duration nanos: 10 msec
-	  auto-terminated relationships list:
-	  - success
-	  Properties:
-	    MQTT Controller Service: mqttservice
-	- id: 24493a28-015a-1000-0000-000000000007
-	  name: convertupdate
-	  class: org.apache.nifi.processors.standard.ConvertUpdate
-	  max concurrent tasks: 1
-	  scheduling strategy: TIMER_DRIVEN
-	  scheduling period: 10 msec
-	  penalization period: 30 sec
-	  yield period: 1 sec
-	  run duration nanos: 10 msec
-	  auto-terminated relationships list:
-	  - success
-	  Properties:
-	    MQTT Controller Service: mqttservice
-	    Listening Topic: updates
-	- id: 24493a28-015a-1000-0000-000000000021
-	  name: httpheartbeat
-	  class: org.apache.nifi.processors.standard.InvokeHTTP
-	  max concurrent tasks: 1
-	  scheduling strategy: TIMER_DRIVEN
-	  scheduling period: 10 msec
-	  penalization period: 30 sec
-	  yield period: 1 sec
-	  run duration nanos: 10 msec
-	  auto-terminated relationships list:
-	  Properties:
-	    HTTP Method: POST
-	    Remote URL: http://localhost:10080/minifi-c2-api/c2-protocol/heartbeat
-	    Content-type: application/json
-	- id: 24493a28-015a-1000-0000-000000000022
-	  name: log
-	  class: org.apache.nifi.processors.standard.LogAttribute
-	  max concurrent tasks: 1
-	  scheduling strategy: TIMER_DRIVEN
-	  scheduling period: 100 msec
-	  penalization period: 30 sec
-	  yield period: 1 sec
-	  run duration nanos: 1 msec
-	  auto-terminated relationships list:
-	  - success
-	  Properties:
-	Controller Services:
-	- id: 94491a38-015a-1000-0000-000000000001
-	  name: mqttservice
-	  class: MQTTContextService
-	  Properties:
-	    Broker URI: localhost:1883
-	    Client ID: hiblajl
-	    Quality of Service: 2
-	Process Groups: []
-	Input Ports: []
-	Output Ports: []
-	Funnels: []
-	Connections:
-	- id: 1d09c015-015d-1000-0000-000000000000
-	  name: convert/success/httpheartbeat
-	  source id: 24493a28-015a-1000-0000-000000000000
-	  source relationship name: success
-	  destination id: 24493a28-015a-1000-0000-000000000021
-	  max work queue size: 10000
-	  max work queue data size: 1 GB
-	  flowfile expiration: 0 sec
-	  queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
-	- id: 1d09c015-015d-1000-0000-000000000002
-	  name: httpheartbeat/success/convertJSON
-	  source id: 24493a28-015a-1000-0000-000000000021
-	  source relationship name: success
-	  destination id: 24493a28-015a-1000-0000-000000000006
-	  max work queue size: 10000
-	  max work queue data size: 1 GB
-	  flowfile expiration: 0 sec
-	  queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
-	Remote Process Groups: []
-	NiFi Properties Overrides: {}
 
 ### UpdatePolicies
 
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 20a393c57..853b4c82a 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -336,19 +336,30 @@ This Processor gets the contents of a FlowFile from a MQTT broker for a specifie
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name                  | Default Value | Allowable Values | Description                                                                                             |
-|-----------------------|---------------|------------------|---------------------------------------------------------------------------------------------------------|
-| Broker URI            |               |                  | The URI to use to connect to the MQTT broker                                                            |
-| Client ID             |               |                  | MQTT client ID to use                                                                                   |
-| Connection Timeout    | 30 sec        |                  | Maximum time interval the client will wait for the network connection to the MQTT server                |
-| Keep Alive Interval   | 60 sec        |                  | Defines the maximum time interval between messages sent or received                                     |
-| Max Flow Segment Size |               |                  | Maximum flow content payload segment size for the MQTT record                                           |
-| Password              |               |                  | Password to use when connecting to the broker                                                           |
-| Quality of Service    | MQTT_QOS_0    |                  | The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'             |
-| Queue Max Message     |               |                  | Maximum number of messages allowed on the received MQTT queue                                           |
-| Session state         | true          |                  | Whether to start afresh or resume previous flows. See the allowable value descriptions for more details |
-| Topic                 |               |                  | The topic to publish the message to                                                                     |
-| Username              |               |                  | Username to use when connecting to the broker                                                           |
+| Name                  | Default Value | Allowable Values | Description                                                                                                                 |
+|-----------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------|
+| **Broker URI**        |               |                  | The URI to use to connect to the MQTT broker                                                                                |
+| **Topic**             |               |                  | The topic to subscribe to                                                                                                   |
+| Client ID             |               |                  | MQTT client ID to use                                                                                                       |
+| Quality of Service    | 0             |                  | The Quality of Service (QoS) to receive the message with. Accepts three values '0', '1' and '2'                             |
+| Connection Timeout    | 30 sec        |                  | Maximum time interval the client will wait for the network connection to the MQTT broker                                    |
+| Keep Alive Interval   | 60 sec        |                  | Defines the maximum time interval between messages being sent to the broker                                                 |
+| Max Flow Segment Size |               |                  | Maximum flow content payload segment size for the MQTT record                                                               |
+| Last Will Topic       |               |                  | The topic to send the client's Last Will to. If the Last Will topic is not set then a Last Will will not be sent            |
+| Last Will Message     |               |                  | The message to send as the client's Last Will. If the Last Will Message is empty, Last Will will be deleted from the broker |
+| Last Will QoS         | 0             |                  | The Quality of Service (QoS) to send the Last Will with. Accepts three values '0', '1' and '2'                              |
+| Last Will Retain      | false         |                  | Whether to retain the client's Last Will                                                                                    |
+| Security Protocol     |               |                  | Protocol used to communicate with brokers                                                                                   |
+| Security CA           |               |                  | File or directory path to CA certificate(s) for verifying the broker's key                                                  |
+| Security Cert         |               |                  | Path to client's public key (PEM) used for authentication                                                                   |
+| Security Private Key  |               |                  | Path to client's private key (PEM) used for authentication                                                                  |
+| Security Pass Phrase  |               |                  | Private key passphrase                                                                                                      |
+| Username              |               |                  | Username to use when connecting to the broker                                                                               |
+| Password              |               |                  | Password to use when connecting to the broker                                                                               |
+| Clean Session         | true          |                  | Whether to start afresh rather than remembering previous subscriptions                                                      |
+| Queue Max Message     | 1000          |                  | Maximum number of messages allowed on the received MQTT queue                                                               |
+
+
 ### Relationships
 
 | Name    | Description                                                                                  |
@@ -1770,19 +1781,29 @@ PublishMQTT serializes FlowFile content as an MQTT payload, sending the message
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name                  | Default Value | Allowable Values | Description                                                                                             |
-|-----------------------|---------------|------------------|---------------------------------------------------------------------------------------------------------|
-| Broker URI            |               |                  | The URI to use to connect to the MQTT broker                                                            |
-| Client ID             |               |                  | MQTT client ID to use                                                                                   |
-| Connection Timeout    | 30 sec        |                  | Maximum time interval the client will wait for the network connection to the MQTT server                |
-| Keep Alive Interval   | 60 sec        |                  | Defines the maximum time interval between messages sent or received                                     |
-| Max Flow Segment Size |               |                  | Maximum flow content payload segment size for the MQTT record                                           |
-| Password              |               |                  | Password to use when connecting to the broker                                                           |
-| Quality of Service    | MQTT_QOS_0    |                  | The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'             |
-| Retain                | false         |                  | Retain MQTT published record in broker                                                                  |
-| Session state         | true          |                  | Whether to start afresh or resume previous flows. See the allowable value descriptions for more details |
-| Topic                 |               |                  | The topic to publish the message to                                                                     |
-| Username              |               |                  | Username to use when connecting to the broker                                                           |
+| Name                  | Default Value | Allowable Values | Description                                                                                                                 |
+|-----------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------|
+| **Broker URI**        |               |                  | The URI to use to connect to the MQTT broker                                                                                |
+| **Topic**             |               |                  | The topic to publish to                                                                                                     |
+| Client ID             |               |                  | MQTT client ID to use                                                                                                       |
+| Quality of Service    | 0             |                  | The Quality of Service (QoS) to send the message with. Accepts three values '0', '1' and '2'                                |
+| Connection Timeout    | 30 sec        |                  | Maximum time interval the client will wait for the network connection to the MQTT broker                                    |
+| Keep Alive Interval   | 60 sec        |                  | Defines the maximum time interval between messages being sent to the broker                                                 |
+| Max Flow Segment Size |               |                  | Maximum flow content payload segment size for the MQTT record                                                               |
+| Last Will Topic       |               |                  | The topic to send the client's Last Will to. If the Last Will topic is not set then a Last Will will not be sent            |
+| Last Will Message     |               |                  | The message to send as the client's Last Will. If the Last Will Message is empty, Last Will will be deleted from the broker |
+| Last Will QoS         | 0             |                  | The Quality of Service (QoS) to send the Last Will with. Accepts three values '0', '1' and '2'                              |
+| Last Will Retain      | false         |                  | Whether to retain the client's Last Will                                                                                    |
+| Security Protocol     |               |                  | Protocol used to communicate with brokers                                                                                   |
+| Security CA           |               |                  | File or directory path to CA certificate(s) for verifying the broker's key                                                  |
+| Security Cert         |               |                  | Path to client's public key (PEM) used for authentication                                                                   |
+| Security Private Key  |               |                  | Path to client's private key (PEM) used for authentication                                                                  |
+| Security Pass Phrase  |               |                  | Private key passphrase                                                                                                      |
+| Username              |               |                  | Username to use when connecting to the broker                                                                               |
+| Password              |               |                  | Password to use when connecting to the broker                                                                               |
+| Retain                | false         |                  | Retain published message in broker                                                                                          |
+
+
 ### Relationships
 
 | Name    | Description                                                                                  |
diff --git a/bin/minifi.sh b/bin/minifi.sh
index b9ed692f1..2e074e4c5 100755
--- a/bin/minifi.sh
+++ b/bin/minifi.sh
@@ -78,6 +78,8 @@ active_pid() {
   pid=${1}
   if [ "${pid}" -eq -1 ]; then
     echo 1
+  elif [ "${pid}" -eq $$ ]; then
+    echo 1
   else
     kill -s 0 "${pid}" > /dev/null 2>&1
     echo $?
@@ -155,6 +157,8 @@ active_pid() {
   pid=\${1}
   if [ \${pid} -eq -1 ]; then
     echo 1
+  elif [ "${pid}" -eq $$ ]; then
+    echo 1
   else
     kill -s 0 \${pid} > /dev/null 2>&1
     echo \$?
diff --git a/cmake/PahoMqttC.cmake b/cmake/PahoMqttC.cmake
index d6b2870f9..85932daac 100644
--- a/cmake/PahoMqttC.cmake
+++ b/cmake/PahoMqttC.cmake
@@ -38,8 +38,8 @@ FetchContent_MakeAvailable(paho.mqtt.c-external)
 
 # Set dependencies and target to link to
 if (NOT OPENSSL_OFF)
-    add_library(paho.mqtt.c ALIAS paho-mqtt3cs-static)
+    add_library(paho.mqtt.c ALIAS paho-mqtt3as-static)
     add_dependencies(common_ssl_obj_static OpenSSL::SSL OpenSSL::Crypto)
 else()
-    add_library(paho.mqtt.c ALIAS paho-mqtt3c-static)
+    add_library(paho.mqtt.c ALIAS paho-mqtt3a-static)
 endif()
diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py
index ec392aeb0..a67679106 100644
--- a/docker/test/integration/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -87,12 +87,29 @@ class MiNiFi_integration_test:
         self.cluster.deploy('opensearch')
         assert self.wait_for_container_startup_to_finish('opensearch')
 
-    def start(self):
+    def start(self, container_name=None):
+        if container_name is not None:
+            logging.info("Starting container %s", container_name)
+            self.cluster.deploy_flow(container_name)
+            assert self.wait_for_container_startup_to_finish(container_name)
+            return
         logging.info("MiNiFi_integration_test start")
         self.cluster.deploy_flow()
         for container_name in self.cluster.containers:
             assert self.wait_for_container_startup_to_finish(container_name)
 
+    def stop(self, container_name):
+        logging.info("Stopping container %s", container_name)
+        self.cluster.stop_flow(container_name)
+
+    def kill(self, container_name):
+        logging.info("Killing container %s", container_name)
+        self.cluster.kill_flow(container_name)
+
+    def restart(self, container_name):
+        logging.info("Restarting container %s", container_name)
+        self.cluster.restart_flow(container_name)
+
     def add_node(self, processor):
         if processor.get_name() in (elem.get_name() for elem in self.connectable_nodes):
             raise Exception("Trying to register processor with an already registered name: \"%s\"" % processor.get_name())
diff --git a/docker/test/integration/features/mqtt.feature b/docker/test/integration/features/mqtt.feature
index bcdcaec22..d83605b57 100644
--- a/docker/test/integration/features/mqtt.feature
+++ b/docker/test/integration/features/mqtt.feature
@@ -64,7 +64,6 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
 
   Scenario: A MiNiFi instance publishes and consumes data to/from an MQTT broker
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
-    And a file with the content "test" is present in "/tmp/input"
     And a PublishMQTT processor set up to communicate with an MQTT broker instance
     And the "success" relationship of the GetFile processor is connected to the PublishMQTT
 
@@ -76,6 +75,231 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
     And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
 
     When both instances start up
-    Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+    Then a file with the content "test" is placed in "/tmp/input"
+    And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+    And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)"
+    And the MQTT broker has a log line matching "Received SUBSCRIBE from"
+
+  Scenario Outline: Subscription to topics with wildcards
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a PublishMQTT processor set up to communicate with an MQTT broker instance
+    And the "Topic" property of the PublishMQTT processor is set to "test/my/topic"
+    And the "success" relationship of the GetFile processor is connected to the PublishMQTT
+
+    And a ConsumeMQTT processor set up to communicate with an MQTT broker instance
+    And the "Topic" property of the ConsumeMQTT processor is set to "<topic wildcard pattern>"
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And "ConsumeMQTT" processor is a start node
+    And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
+
+    And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
+
+    When both instances start up
+    Then a file with the content "test" is placed in "/tmp/input"
+    And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+    And the MQTT broker has a log line matching "Received PUBLISH from .*test/my/topic.*\(4 bytes\)"
+    And the MQTT broker has a log line matching "Received SUBSCRIBE from"
+
+    Examples: Topic wildcard patterns
+    | topic wildcard pattern |
+    | test/#                 |
+    | test/+/topic           |
+
+  Scenario: Subscription and publishing with disconnecting clients in persistent sessions
+    # publishing MQTT client
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow
+    And a PublishMQTT processor in the "publisher-client" flow
+    And the "Quality of Service" property of the PublishMQTT processor is set to "1"
+    And the "Client ID" property of the PublishMQTT processor is set to "publisher-client"
+    And the "success" relationship of the GetFile processor is connected to the PublishMQTT
+
+    # consuming MQTT client
+    And a ConsumeMQTT processor in the "consumer-client" flow
+    And the "Quality of Service" property of the ConsumeMQTT processor is set to "1"
+    And the "Clean Session" property of the ConsumeMQTT processor is set to "false"
+    And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client"
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow
+    And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
+
+    And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
+
+    When all instances start up
+    Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
+    And "consumer-client" flow is stopped
+    And the MQTT broker has a log line matching "Received DISCONNECT from consumer-client"
+
+    And a file with the content "test" is placed in "/tmp/input"
     And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)"
+
+    And "consumer-client" flow is restarted
+    And the MQTT broker has 2 log lines matching "New client connected from .* as consumer-client"
+    And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+
+  Scenario Outline: UTF-8 topics and messages
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a PublishMQTT processor set up to communicate with an MQTT broker instance
+    And the "Topic" property of the PublishMQTT processor is set to "<topic>"
+    And the "success" relationship of the GetFile processor is connected to the PublishMQTT
+
+    And a ConsumeMQTT processor set up to communicate with an MQTT broker instance
+    And the "Topic" property of the ConsumeMQTT processor is set to "<topic>"
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And "ConsumeMQTT" processor is a start node
+    And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
+
+    And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
+
+    When both instances start up
+    Then a file with the content "<message>" is placed in "/tmp/input"
+    And a flowfile with the content "<message>" is placed in the monitored directory in less than 60 seconds
+    And the MQTT broker has a log line matching "Received PUBLISH from .*<topic>"
     And the MQTT broker has a log line matching "Received SUBSCRIBE from"
+
+    Examples: UTF-8 topics and messages
+    | topic                  | message     |
+    | Лев Николаевич Толстой | Война и мир |
+    | 孙子                    | 孫子兵法     |
+    | محمد بن موسی خوارزمی   | ٱلْجَبْر       |
+    | תַּלְמוּד                  | תּוֹרָה        |
+
+
+  Scenario: QoS 0 message flow is correct
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a PublishMQTT processor set up to communicate with an MQTT broker instance
+    And the "Quality of Service" property of the PublishMQTT processor is set to "0"
+    And the "Client ID" property of the PublishMQTT processor is set to "publisher-client"
+    And the "success" relationship of the GetFile processor is connected to the PublishMQTT
+
+    And a ConsumeMQTT processor set up to communicate with an MQTT broker instance
+    And the "Quality of Service" property of the ConsumeMQTT processor is set to "0"
+    And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client"
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And "ConsumeMQTT" processor is a start node
+    And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
+
+    And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
+
+    When both instances start up
+    Then a file with the content "test" is placed in "/tmp/input"
+    And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+    And the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
+    And the MQTT broker has a log line matching "Received PUBLISH from publisher-client \(d0, q0, r0, m0, 'testtopic'.*\(4 bytes\)"
+    And the MQTT broker has a log line matching "Sending PUBLISH to consumer-client \(d0, q0, r0, m0, 'testtopic',.*\(4 bytes\)\)"
+
+  Scenario: QoS 1 Subscriber sends PUBACK on a PUBLISH message, with correct packet ID
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a PublishMQTT processor set up to communicate with an MQTT broker instance
+    And the "Quality of Service" property of the PublishMQTT processor is set to "1"
+    And the "Client ID" property of the PublishMQTT processor is set to "publisher-client"
+    And the "success" relationship of the GetFile processor is connected to the PublishMQTT
+
+    And a ConsumeMQTT processor set up to communicate with an MQTT broker instance
+    And the "Quality of Service" property of the ConsumeMQTT processor is set to "1"
+    And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client"
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And "ConsumeMQTT" processor is a start node
+    And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
+
+    And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
+
+    When both instances start up
+    Then a file with the content "test" is placed in "/tmp/input"
+    And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+    And the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
+    And the MQTT broker has a log line matching "Received PUBLISH from publisher-client.*m1, 'testtopic'.*\(4 bytes\)"
+    And the MQTT broker has a log line matching "Sending PUBACK to publisher-client \(m1, rc0\)"
+    And the MQTT broker has a log line matching "Sending PUBLISH to consumer-client \(d0, q1, r0, m1, 'testtopic',.*\(4 bytes\)\)"
+    And the MQTT broker has a log line matching "Received PUBACK from consumer-client \(Mid: 1, RC:0\)"
+
+
+  Scenario: QoS 2 message flow is correct
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a PublishMQTT processor set up to communicate with an MQTT broker instance
+    And the "Quality of Service" property of the PublishMQTT processor is set to "2"
+    And the "Client ID" property of the PublishMQTT processor is set to "publisher-client"
+    And the "success" relationship of the GetFile processor is connected to the PublishMQTT
+
+    And a ConsumeMQTT processor set up to communicate with an MQTT broker instance
+    And the "Quality of Service" property of the ConsumeMQTT processor is set to "2"
+    And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client"
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And "ConsumeMQTT" processor is a start node
+    And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
+
+    And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
+
+    When both instances start up
+    Then a file with the content "test" is placed in "/tmp/input"
+    And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+    And the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
+    And the MQTT broker has a log line matching "Received PUBLISH from publisher-client.*m1, 'testtopic'.*\(4 bytes\)"
+    And the MQTT broker has a log line matching "Sending PUBREC to publisher-client \(m1, rc0\)"
+    And the MQTT broker has a log line matching "Received PUBREL from publisher-client \(Mid: 1\)"
+    And the MQTT broker has a log line matching "Sending PUBLISH to consumer-client \(d0, q2, r0, m1, 'testtopic',.*\(4 bytes\)\)"
+    And the MQTT broker has a log line matching "Sending PUBCOMP to publisher-client \(m1\)"
+    And the MQTT broker has a log line matching "Received PUBREC from consumer-client \(Mid: 1\)"
+    And the MQTT broker has a log line matching "Sending PUBREL to consumer-client \(m1\)"
+    And the MQTT broker has a log line matching "Received PUBCOMP from consumer-client \(Mid: 1, RC:0\)"
+
+  Scenario: Retained message
+    # publishing MQTT client
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishMQTT processor in the "publisher-client" flow
+    And the "Client ID" property of the PublishMQTT processor is set to "publisher-client"
+    And the "Retain" property of the PublishMQTT processor is set to "true"
+    And the "success" relationship of the GetFile processor is connected to the PublishMQTT
+
+    # consuming MQTT client
+    And a ConsumeMQTT processor in the "consumer-client" flow
+    And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client"
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow
+    And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
+
+    And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
+
+    When the MQTT broker is started
+    And "publisher-client" flow is started
+    Then the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)"
+
+    # consumer is joining late, but it will still see the retained message
+    And "consumer-client" flow is started
+    And the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
+
+    And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+
+  Scenario: Last will
+    # publishing MQTT client with last will
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow
+    And a PublishMQTT processor in the "publisher-client" flow
+    And the "Client ID" property of the PublishMQTT processor is set to "publisher-client"
+    And the "Last Will Topic" property of the PublishMQTT processor is set to "last_will_topic"
+    And the "Last Will Message" property of the PublishMQTT processor is set to "last_will_message"
+    And the "success" relationship of the GetFile processor is connected to the PublishMQTT
+
+    # consuming MQTT client set to consume last will topic
+    And a ConsumeMQTT processor in the "consumer-client" flow
+    And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client"
+    And the "Topic" property of the ConsumeMQTT processor is set to "last_will_topic"
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow
+    And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
+
+    And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
+
+    When all instances start up
+    Then the MQTT broker has a log line matching "Sending CONNACK to publisher-client"
+    Then the MQTT broker has a log line matching "Sending CONNACK to consumer-client"
+    And "publisher-client" flow is killed
+    And the MQTT broker has a log line matching "Sending PUBLISH to consumer-client"
+    And a flowfile with the content "last_will_message" is placed in the monitored directory in less than 60 seconds
+
+  Scenario: Keep Alive
+    Given a ConsumeMQTT processor set up to communicate with an MQTT broker instance
+    And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client"
+    And the "Keep Alive Interval" property of the ConsumeMQTT processor is set to "1 sec"
+
+    And an MQTT broker is set up in correspondence with the ConsumeMQTT
+
+    When both instances start up
+    Then the MQTT broker has a log line matching "Received PINGREQ from consumer-client"
+    Then the MQTT broker has a log line matching "Sending PINGRESP to consumer-client"
diff --git a/docker/test/integration/minifi/core/Cluster.py b/docker/test/integration/minifi/core/Cluster.py
index d13a2f2aa..76c79df0d 100644
--- a/docker/test/integration/minifi/core/Cluster.py
+++ b/docker/test/integration/minifi/core/Cluster.py
@@ -26,6 +26,21 @@ class Cluster(object):
         Deploys a flow to the cluster.
         """
 
+    def stop_flow(self, container_name):
+        """
+        Stops a flow in the cluster.
+        """
+
+    def kill_flow(self, container_name):
+        """
+        Kills (ungracefully stops) a flow in the cluster.
+        """
+
+    def restart_flow(self, container_name):
+        """
+        Restarts a flow in the cluster.
+        """
+
     def __enter__(self):
         """
         Allocate ephemeral cluster resources.
diff --git a/docker/test/integration/minifi/core/Container.py b/docker/test/integration/minifi/core/Container.py
index 2e0c40641..d027fd696 100644
--- a/docker/test/integration/minifi/core/Container.py
+++ b/docker/test/integration/minifi/core/Container.py
@@ -62,5 +62,14 @@ class Container:
     def log_source(self):
         return LogSource.FROM_DOCKER_CONTAINER
 
+    def stop(self):
+        raise NotImplementedError()
+
+    def kill(self):
+        raise NotImplementedError()
+
+    def restart(self):
+        raise NotImplementedError()
+
     def get_startup_finished_log_entry(self):
         raise NotImplementedError()
diff --git a/docker/test/integration/minifi/core/ImageStore.py b/docker/test/integration/minifi/core/ImageStore.py
index ee87b3332..d5ec680cc 100644
--- a/docker/test/integration/minifi/core/ImageStore.py
+++ b/docker/test/integration/minifi/core/ImageStore.py
@@ -169,7 +169,7 @@ class ImageStore:
             FROM {base_image}
             RUN echo 'log_dest stderr' >> /mosquitto-no-auth.conf
             CMD ["/usr/sbin/mosquitto", "--verbose", "--config-file", "/mosquitto-no-auth.conf"]
-            """.format(base_image='eclipse-mosquitto:2.0.12'))
+            """.format(base_image='eclipse-mosquitto:2.0.14'))
 
         return self.__build_image(dockerfile)
 
diff --git a/docker/test/integration/minifi/core/MinifiContainer.py b/docker/test/integration/minifi/core/MinifiContainer.py
index 17783eec6..31d728005 100644
--- a/docker/test/integration/minifi/core/MinifiContainer.py
+++ b/docker/test/integration/minifi/core/MinifiContainer.py
@@ -54,3 +54,21 @@ class MinifiContainer(FlowContainer):
             entrypoint=self.command,
             volumes=self.vols)
         logging.info('Added container \'%s\'', self.name)
+
+    def stop(self):
+        logging.info('Stopping minifi docker container "%s"...', self.name)
+        self.client.containers.get(self.name).stop()
+        logging.info('Successfully stopped minifi docker container "%s"', self.name)
+        self.deployed = False
+
+    def kill(self):
+        logging.info('Killing minifi docker container "%s"...', self.name)
+        self.client.containers.get(self.name).kill()
+        logging.info('Successfully killed minifi docker container "%s"', self.name)
+        self.deployed = False
+
+    def restart(self):
+        logging.info('Restarting minifi docker container "%s"...', self.name)
+        self.client.containers.get(self.name).restart()
+        logging.info('Successfully restarted minifi docker container "%s"', self.name)
+        self.deployed = True
diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
index fd99e61ea..5ee368cfb 100644
--- a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
+++ b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
@@ -142,6 +142,30 @@ class SingleNodeDockerCluster(Cluster):
 
         self.containers[name].deploy()
 
-    def deploy_flow(self):
+    def deploy_flow(self, container_name=None):
+        if container_name is not None:
+            if container_name not in self.containers:
+                logging.error('Could not start container because it is not found: \'%s\'', container_name)
+                return
+            self.containers[container_name].deploy()
+            return
         for container in self.containers.values():
             container.deploy()
+
+    def stop_flow(self, container_name):
+        if container_name not in self.containers:
+            logging.error('Could not stop container because it is not found: \'%s\'', container_name)
+            return
+        self.containers[container_name].stop()
+
+    def kill_flow(self, container_name):
+        if container_name not in self.containers:
+            logging.error('Could not kill container because it is not found: \'%s\'', container_name)
+            return
+        self.containers[container_name].kill()
+
+    def restart_flow(self, container_name):
+        if container_name not in self.containers:
+            logging.error('Could not restart container because it is not found: \'%s\'', container_name)
+            return
+        self.containers[container_name].restart()
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index 3b4f43ace..8fad64466 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -267,6 +267,7 @@ def step_impl(context):
 
 
 @given("a file with the content \"{content}\" is present in \"{path}\"")
+@then("a file with the content \"{content}\" is placed in \"{path}\"")
 def step_impl(context, content, path):
     context.test.add_test_data(path, content)
 
@@ -378,6 +379,7 @@ def step_impl(context):
 
 # MQTT setup
 @given("an MQTT broker is set up in correspondence with the PublishMQTT")
+@given("an MQTT broker is set up in correspondence with the ConsumeMQTT")
 @given("an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT")
 def step_impl(context):
     context.test.acquire_container("mqtt-broker", "mqtt-broker")
@@ -572,6 +574,27 @@ def step_impl(context):
     context.test.start()
 
 
+@then("\"{container_name}\" flow is stopped")
+def step_impl(context, container_name):
+    context.test.stop(container_name)
+
+
+@then("\"{container_name}\" flow is killed")
+def step_impl(context, container_name):
+    context.test.kill(container_name)
+
+
+@then("\"{container_name}\" flow is restarted")
+def step_impl(context, container_name):
+    context.test.restart(container_name)
+
+
+@when("\"{container_name}\" flow is started")
+@then("\"{container_name}\" flow is started")
+def step_impl(context, container_name):
+    context.test.start(container_name)
+
+
 @when("content \"{content}\" is added to file \"{file_name}\" present in directory \"{path}\" {seconds:d} seconds later")
 def step_impl(context, content, file_name, path, seconds):
     time.sleep(seconds)
@@ -841,7 +864,12 @@ def step_impl(context, log_message, duration):
 # MQTT
 @then("the MQTT broker has a log line matching \"{log_pattern}\"")
 def step_impl(context, log_pattern):
-    context.test.check_container_log_matches_regex('mqtt-broker', log_pattern, 30, count=1)
+    context.test.check_container_log_matches_regex('mqtt-broker', log_pattern, 10, count=1)
+
+
+@then("the MQTT broker has {log_count} log lines matching \"{log_pattern}\"")
+def step_impl(context, log_count, log_pattern):
+    context.test.check_container_log_matches_regex('mqtt-broker', log_pattern, 10, count=int(log_count))
 
 
 @then("the \"{minifi_container_name}\" flow has a log line matching \"{log_pattern}\" in less than {duration}")
@@ -855,6 +883,11 @@ def step_impl(context):
     context.test.start()
 
 
+@when("the MQTT broker is started")
+def step_impl(context):
+    context.test.start('mqtt-broker')
+
+
 # Google Cloud Storage
 @then('an object with the content \"{content}\" is present in the Google Cloud storage')
 def step_imp(context, content):
diff --git a/extensions/mqtt/CMakeLists.txt b/extensions/mqtt/CMakeLists.txt
index a7738c322..c39500ab1 100644
--- a/extensions/mqtt/CMakeLists.txt
+++ b/extensions/mqtt/CMakeLists.txt
@@ -22,9 +22,9 @@ if(NOT (ENABLE_ALL OR ENABLE_MQTT))
 endif()
 
 include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
-include_directories(./controllerservice ./processors ./protocol ../../libminifi/include  ../../libminifi/include/core)
+include_directories(./processors ../../libminifi/include  ../../libminifi/include/core)
 
-file(GLOB SOURCES "*.cpp" "protocol/*.cpp" "processors/*.cpp" "controllerservice/*.cpp")
+file(GLOB SOURCES "processors/*.cpp")
 
 add_library(minifi-mqtt-extensions SHARED ${SOURCES})
 
@@ -33,5 +33,5 @@ target_link_libraries(minifi-mqtt-extensions ${LIBMINIFI})
 include(PahoMqttC)
 target_link_libraries(minifi-mqtt-extensions paho.mqtt.c)
 
-register_extension(minifi-mqtt-extensions "MQTT EXTENSIONS" MQTT-EXTENSIONS "This Enables MQTT functionality including PublishMQTT/ConsumeMQTT" "${TEST_DIR}/mqtt-tests")
+register_extension(minifi-mqtt-extensions "MQTT EXTENSIONS" MQTT-EXTENSIONS "This Enables MQTT functionality including PublishMQTT/ConsumeMQTT" "${CMAKE_CURRENT_SOURCE_DIR}/tests")
 register_extension_linter(minifi-mqtt-extensions-linter)
diff --git a/extensions/mqtt/controllerservice/MQTTControllerService.cpp b/extensions/mqtt/controllerservice/MQTTControllerService.cpp
deleted file mode 100644
index aeb82bcc7..000000000
--- a/extensions/mqtt/controllerservice/MQTTControllerService.cpp
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "MQTTControllerService.h"
-
-#include <openssl/err.h>
-#include <openssl/ssl.h>
-#include <string>
-#include <memory>
-#include "core/Property.h"
-#include "core/Resource.h"
-#include "io/validation.h"
-#include "properties/Configure.h"
-
-namespace org::apache::nifi::minifi::controllers {
-
-const core::Property MQTTControllerService::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", "");
-const core::Property MQTTControllerService::ClientID("Client ID", "MQTT client ID to use", "");
-const core::Property MQTTControllerService::UserName("Username", "Username to use when connecting to the broker", "");
-const core::Property MQTTControllerService::Password("Password", "Password to use when connecting to the broker", "");
-const core::Property MQTTControllerService::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec");
-const core::Property MQTTControllerService::ConnectionTimeout("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec");
-const core::Property MQTTControllerService::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0");
-const core::Property MQTTControllerService::Topic("Topic", "The topic to publish the message to", "");
-const core::Property MQTTControllerService::SecurityProtocol("Security Protocol", "Protocol used to communicate with brokers", "");
-
-void MQTTControllerService::initialize() {
-  std::lock_guard<std::mutex> lock(initialization_mutex_);
-  if (initialized_) {
-    return;
-  }
-
-  ControllerService::initialize();
-
-  initializeProperties();
-
-  initialized_ = true;
-}
-
-void MQTTControllerService::onEnable() {
-  for (auto &linked_service : linked_services_) {
-    std::shared_ptr<controllers::SSLContextService> ssl_service = std::dynamic_pointer_cast<controllers::SSLContextService>(linked_service);
-    if (nullptr != ssl_service) {
-      // security is enabled.
-      ssl_context_service_ = ssl_service;
-    }
-  }
-  if (getProperty(BrokerURL.getName(), uri_) && getProperty(ClientID.getName(), clientID_)) {
-    if (!client_) {
-      MQTTClient_create(&client_, uri_.c_str(), clientID_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL);
-    }
-
-    if (client_) {
-      MQTTClient_setCallbacks(client_, this, reconnectCallback, receiveCallback, deliveryCallback);
-      // call reconnect to bootstrap
-      this->reconnect();
-    }
-  }
-}
-
-void MQTTControllerService::initializeProperties() {
-  setSupportedProperties(properties());
-}
-
-REGISTER_RESOURCE_AS(MQTTControllerService, InternalResource, ("MQTTContextService"));
-
-}  // namespace org::apache::nifi::minifi::controllers
diff --git a/extensions/mqtt/controllerservice/MQTTControllerService.h b/extensions/mqtt/controllerservice/MQTTControllerService.h
deleted file mode 100644
index 6fee09bab..000000000
--- a/extensions/mqtt/controllerservice/MQTTControllerService.h
+++ /dev/null
@@ -1,327 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <openssl/err.h>
-#include <openssl/ssl.h>
-
-#include <iostream>
-#include <limits>
-#include <map>
-#include <memory>
-#include <string>
-#include <utility>
-#include <vector>
-
-#include "utils/StringUtils.h"
-#include "core/controller/ControllerService.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "controllers/SSLContextService.h"
-#include "concurrentqueue.h"
-#include "MQTTClient.h"
-
-namespace org::apache::nifi::minifi::controllers {
-
-static constexpr const char* const MQTT_QOS_0 = "0";
-static constexpr const char* const MQTT_QOS_1 = "1";
-static constexpr const char* const MQTT_QOS_2 = "2";
-
-class Message {
- public:
-  // empty constructor facilitates moves
-  Message() = default;
-  explicit Message(std::string topic, void *data, size_t dataLen)
-      : topic_(std::move(topic)),
-        data_(reinterpret_cast<std::byte*>(data), (reinterpret_cast<std::byte*>(data) + dataLen)) {
-  }
-
-  Message(const Message &other) = default;
-  Message(Message &&other) = default;
-
-  ~Message() = default;
-
-  Message &operator=(const Message &other) = default;
-  Message &operator=(Message &&other) = default;
-
-  std::string topic_;
-  std::vector<std::byte> data_;
-};
-
-/**
- * MQTTContextService provides a controller service for MQTT connectivity.
- *
- */
-class MQTTControllerService : public core::controller::ControllerService {
- public:
-  explicit MQTTControllerService(const std::string &name, const utils::Identifier &uuid = {})
-      : ControllerService(name, uuid),
-        initialized_(false),
-        client_(nullptr),
-        qos_(2),
-        ssl_context_service_(nullptr) {
-  }
-
-  explicit MQTTControllerService(const std::string &name, const std::shared_ptr<Configure> &configuration)
-      : ControllerService(name),
-        initialized_(false),
-        client_(nullptr),
-        qos_(2),
-        ssl_context_service_(nullptr) {
-    setConfiguration(configuration);
-    initialize();
-  }
-
-  EXTENSIONAPI static const core::Property BrokerURL;
-  EXTENSIONAPI static const core::Property ClientID;
-  EXTENSIONAPI static const core::Property UserName;
-  EXTENSIONAPI static const core::Property Password;
-  EXTENSIONAPI static const core::Property KeepLiveInterval;
-  EXTENSIONAPI static const core::Property ConnectionTimeout;
-  EXTENSIONAPI static const core::Property Topic;
-  EXTENSIONAPI static const core::Property QOS;
-  EXTENSIONAPI static const core::Property SecurityProtocol;
-  static auto properties() {
-    return std::array{
-      BrokerURL,
-      ClientID,
-      UserName,
-      Password,
-      KeepLiveInterval,
-      ConnectionTimeout,
-      Topic,
-      QOS,
-      SecurityProtocol
-    };
-  }
-
-  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
-  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES
-
-  void initialize() override;
-
-  void yield() override {
-  }
-
-  int send(const std::string &topic, const std::vector<uint8_t> &data) {
-    int token;
-    MQTTClient_message pubmsg = MQTTClient_message_initializer;
-    const uint8_t *d = data.data();
-    pubmsg.payload = const_cast<uint8_t*>(d);
-    pubmsg.payloadlen = data.size();
-    pubmsg.qos = qos_;
-    pubmsg.retained = 0;
-
-    auto resp = MQTTClient_publishMessage(client_, topic.c_str(), &pubmsg, &token);
-    if (resp != MQTTCLIENT_SUCCESS) {
-      return -1;
-    }
-    if (qos_ == 0) {
-      std::unique_lock<std::mutex> lock(delivery_mutex_);
-      delivered_[token] = true;
-    }
-    return token;
-  }
-
-  int send(const std::string &topic, gsl::span<const std::byte> data) {
-    gsl_Expects(data.size() < std::numeric_limits<int>::max());
-    int token;
-
-    std::vector<std::byte> copy_because_const_correctness{std::begin(data), std::end(data)};
-    MQTTClient_message pubmsg = MQTTClient_message_initializer;
-    pubmsg.payload = copy_because_const_correctness.data();
-    pubmsg.payloadlen = gsl::narrow<int>(copy_because_const_correctness.size());
-    pubmsg.qos = qos_;
-    pubmsg.retained = 0;
-
-    auto resp = MQTTClient_publishMessage(client_, topic.c_str(), &pubmsg, &token);
-    if (resp != MQTTCLIENT_SUCCESS) {
-      return -1;
-    }
-
-    if (qos_ == 0) {
-      std::unique_lock<std::mutex> lock(delivery_mutex_);
-      delivered_[token] = true;
-    }
-    return token;
-  }
-
-  bool isRunning() override {
-    return getState() == core::controller::ControllerServiceState::ENABLED;
-  }
-
-  bool isWorkAvailable() override {
-    return false;
-  }
-
-  void onEnable() override;
-
-  void subscribeToTopic(const std::string newTopic) {
-    std::lock_guard<std::mutex> lock(initialization_mutex_);
-    if (topics_.find(newTopic) == topics_.end()) {
-      MQTTClient_subscribe(client_, newTopic.c_str(), qos_);
-      topics_[newTopic].size_approx();
-    }
-  }
-
-  bool waitForDelivery(const uint64_t millisToWait, int token) {
-    std::unique_lock<std::mutex> lock(delivery_mutex_);
-    if (delivery_notification_.wait_for(lock, std::chrono::milliseconds(millisToWait), [&] {return delivered_[token] == true;})) {
-      bool delivered = delivered_[token];
-      delivered_.erase(token);
-      return delivered;
-    } else {
-      delivered_.erase(token);
-      return false;
-    }
-  }
-
-  bool get(const uint64_t millisToWait, const std::string &topic, std::vector<std::byte> &data) {
-    std::unique_lock<std::mutex> lock(delivery_mutex_);
-    if (delivery_notification_.wait_for(lock, std::chrono::milliseconds(millisToWait), [&] {return topics_[topic].size_approx() > 0;})) {
-      Message resp;
-      if (topics_[topic].try_dequeue(resp)) {
-        data = std::move(resp.data_);
-        return true;
-      } else {
-        return false;
-      }
-    } else {
-      return false;
-    }
-  }
-
-  bool awaitResponse(const uint64_t millisToWait, int token, const std::string &topic, std::vector<std::byte> &data) {
-    std::unique_lock<std::mutex> lock(delivery_mutex_);
-    if (delivery_notification_.wait_for(lock, std::chrono::milliseconds(millisToWait), [&] {
-      return
-      delivered_[token] == true;
-    })) {
-      bool delivered = delivered_[token];
-      if (delivered) {
-        if (delivery_notification_.wait_for(lock, std::chrono::milliseconds(millisToWait), [&] {return topics_[topic].size_approx() > 0;})) {
-          Message resp;
-          if (topics_[topic].try_dequeue(resp)) {
-            data = std::move(resp.data_);
-            return true;
-          } else {
-            return false;
-          }
-        } else {
-          return false;
-        }
-      }
-      delivered_.erase(token);
-      return delivered;
-    } else {
-      delivered_.erase(token);
-      return false;
-    }
-  }
-
- protected:
-  void acknowledgeDelivery(MQTTClient_deliveryToken token) {
-    std::lock_guard<std::mutex> lock(delivery_mutex_);
-    // locked the mutex
-    auto finder = delivered_.find(token);
-    // only acknowledge delivery if we expect the delivery to occur, otherwise
-    // we won't have any waiters.
-    if (finder != delivered_.end()) {
-      delivered_[token] = true;
-    }
-  }
-
-  void enqueue(const std::string &topic, Message &&message) {
-    std::unique_lock<std::mutex> lock(delivery_mutex_);
-    topics_[topic].enqueue(std::move(message));
-    delivery_notification_.notify_one();
-  }
-
-  static void deliveryCallback(void *context, MQTTClient_deliveryToken dt) {
-    MQTTControllerService *service = reinterpret_cast<MQTTControllerService *>(context);
-    service->acknowledgeDelivery(dt);
-  }
-
-  static int receiveCallback(void *context, char *topicName, int topicLen, MQTTClient_message *message) {
-    MQTTControllerService *service = reinterpret_cast<MQTTControllerService *>(context);
-    std::string topic(topicName, topicLen == 0 ? strlen(topicName) : topicLen);
-    Message queueMessage(topic, message->payload, message->payloadlen);
-    service->enqueue(topic, std::move(queueMessage));
-    MQTTClient_freeMessage(&message);
-    MQTTClient_free(topicName);
-    return 1;
-  }
-  static void reconnectCallback(void *context, char* /*cause*/) {
-    MQTTControllerService *service = reinterpret_cast<MQTTControllerService *>(context);
-    service->reconnect();
-  }
-
-  bool reconnect() {
-    if (!client_)
-      return false;
-    if (MQTTClient_isConnected(client_))
-      return true;
-    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
-    conn_opts.keepAliveInterval = std::chrono::duration_cast<std::chrono::seconds>(keepAliveInterval_).count();
-    conn_opts.cleansession = 1;
-    if (!userName_.empty()) {
-      conn_opts.username = userName_.c_str();
-      conn_opts.password = passWord_.c_str();
-    }
-    if (ssl_context_service_ != nullptr)
-      conn_opts.ssl = &sslopts_;
-    if (MQTTClient_connect(client_, &conn_opts) != MQTTCLIENT_SUCCESS) {
-      logger_->log_error("Failed to connect to MQTT broker %s", uri_);
-      return false;
-    }
-
-    if (!topic_.empty()) {
-      std::unique_lock<std::mutex> lock(delivery_mutex_);
-      MQTTClient_subscribe(client_, topic_.c_str(), qos_);
-    }
-    return true;
-  }
-
-  virtual void initializeProperties();
-
-  std::mutex initialization_mutex_;
-  bool initialized_;
-
-  MQTTClient client_;
-  std::string uri_;
-  std::string topic_;
-  std::chrono::milliseconds keepAliveInterval_{0};
-  std::chrono::milliseconds connectionTimeout_{0};
-  int64_t qos_;
-  std::string clientID_;
-  std::string userName_;
-  std::string passWord_;
-
- private:
-  std::map<int, bool> delivered_;
-  std::map<std::string, moodycamel::ConcurrentQueue<Message> > topics_;
-
-  std::mutex delivery_mutex_;
-  std::condition_variable delivery_notification_;
-
-  MQTTClient_SSLOptions sslopts_;
-
-  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
-
-  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<MQTTControllerService>::getLogger();
-};
-
-}  // namespace org::apache::nifi::minifi::controllers
diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
index 95bcc55a8..713b68be4 100644
--- a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
+++ b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
@@ -15,147 +15,172 @@
  * limitations under the License.
  */
 #include "AbstractMQTTProcessor.h"
-#include <cstdio>
 #include <memory>
 #include <string>
-#include <cinttypes>
-#include <vector>
+#include <utility>
 
-#include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
 
 namespace org::apache::nifi::minifi::processors {
 
 void AbstractMQTTProcessor::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory>& /*factory*/) {
-  sslEnabled_ = false;
-  sslopts_ = MQTTClient_SSLOptions_initializer;
-
-  std::string value;
-  int64_t valInt;
-  value = "";
-  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
-    uri_ = value;
-    logger_->log_debug("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
-  }
-  value = "";
-  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
-    clientID_ = value;
+  if (auto value = context->getProperty(BrokerURI)) {
+    uri_ = std::move(*value);
+    logger_->log_debug("AbstractMQTTProcessor: BrokerURI [%s]", uri_);
+  }
+  if (auto value = context->getProperty(ClientID)) {
+    clientID_ = std::move(*value);
     logger_->log_debug("AbstractMQTTProcessor: ClientID [%s]", clientID_);
   }
-  value = "";
-  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
-    topic_ = value;
+  if (auto value = context->getProperty(Topic)) {
+    topic_ = std::move(*value);
     logger_->log_debug("AbstractMQTTProcessor: Topic [%s]", topic_);
   }
-  value = "";
-  if (context->getProperty(UserName.getName(), value) && !value.empty()) {
-    userName_ = value;
-    logger_->log_debug("AbstractMQTTProcessor: UserName [%s]", userName_);
+  if (auto value = context->getProperty(Username)) {
+    username_ = std::move(*value);
+    logger_->log_debug("AbstractMQTTProcessor: UserName [%s]", username_);
   }
-  value = "";
-  if (context->getProperty(PassWord.getName(), value) && !value.empty()) {
-    passWord_ = value;
-    logger_->log_debug("AbstractMQTTProcessor: PassWord [%s]", passWord_);
+  if (auto value = context->getProperty(Password)) {
+    password_ = std::move(*value);
+    logger_->log_debug("AbstractMQTTProcessor: Password [%s]", password_);
   }
 
-  const auto cleanSession_parsed = [&] () -> std::optional<bool> {
-    std::string property_value;
-    if (!context->getProperty(CleanSession.getName(), property_value)) return std::nullopt;
-    return utils::StringUtils::toBool(property_value);
-  }();
-  if ( cleanSession_parsed ) {
-    cleanSession_ = *cleanSession_parsed;
-    logger_->log_debug("AbstractMQTTProcessor: CleanSession [%d]", cleanSession_);
+  if (const auto keep_alive_interval = context->getProperty<core::TimePeriodValue>(KeepAliveInterval)) {
+    keep_alive_interval_ = std::chrono::duration_cast<std::chrono::seconds>(keep_alive_interval->getMilliseconds());
+    logger_->log_debug("AbstractMQTTProcessor: KeepAliveInterval [%" PRId64 "] s", int64_t{keep_alive_interval_.count()});
   }
 
-  if (auto keep_alive_interval = context->getProperty<core::TimePeriodValue>(KeepLiveInterval)) {
-    keepAliveInterval_ = keep_alive_interval->getMilliseconds();
-    logger_->log_debug("AbstractMQTTProcessor: KeepLiveInterval [%" PRId64 "] ms", int64_t{keepAliveInterval_.count()});
+  if (const auto value = context->getProperty<uint64_t>(MaxFlowSegSize)) {
+    max_seg_size_ = {*value};
+    logger_->log_debug("PublishMQTT: max flow segment size [%" PRIu64 "]", max_seg_size_);
   }
 
-  if (auto connection_timeout = context->getProperty<core::TimePeriodValue>(ConnectionTimeout)) {
-    connectionTimeout_ = connection_timeout->getMilliseconds();
-    logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeout [%" PRId64 "] ms", int64_t{connectionTimeout_.count()});
+  if (const auto connection_timeout = context->getProperty<core::TimePeriodValue>(ConnectionTimeout)) {
+    connection_timeout_ = std::chrono::duration_cast<std::chrono::seconds>(connection_timeout->getMilliseconds());
+    logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeout [%" PRId64 "] s", int64_t{connection_timeout_.count()});
   }
 
-  value = "";
-  if (context->getProperty(QOS.getName(), value) && !value.empty() && (value == MQTT_QOS_0 || value == MQTT_QOS_1 || MQTT_QOS_2) &&
-      core::Property::StringToInt(value, valInt)) {
-    qos_ = valInt;
-    logger_->log_debug("AbstractMQTTProcessor: QOS [%" PRId64 "]", qos_);
+  if (const auto value = context->getProperty<uint32_t>(QoS); value && (*value == MQTT_QOS_0 || *value == MQTT_QOS_1 || *value == MQTT_QOS_2)) {
+    qos_ = {*value};
+    logger_->log_debug("AbstractMQTTProcessor: QoS [%" PRIu32 "]", qos_);
   }
-  value = "";
 
-  if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) {
-    if (value == MQTT_SECURITY_PROTOCOL_SSL) {
-      sslEnabled_ = true;
-      value = "";
-      if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: trustStore [%s]", value);
-        securityCA_ = value;
-        sslopts_.trustStore = securityCA_.c_str();
+  if (const auto security_protocol = context->getProperty(SecurityProtocol)) {
+    if (*security_protocol == MQTT_SECURITY_PROTOCOL_SSL) {
+      sslOpts_ = MQTTAsync_SSLOptions_initializer;
+      if (auto value = context->getProperty(SecurityCA)) {
+        logger_->log_debug("AbstractMQTTProcessor: trustStore [%s]", *value);
+        securityCA_ = std::move(*value);
+        sslOpts_->trustStore = securityCA_.c_str();
       }
-      value = "";
-      if (context->getProperty(SecurityCert.getName(), value) && !value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: keyStore [%s]", value);
-        securityCert_ = value;
-        sslopts_.keyStore = securityCert_.c_str();
+      if (auto value = context->getProperty(SecurityCert)) {
+        logger_->log_debug("AbstractMQTTProcessor: keyStore [%s]", *value);
+        securityCert_ = std::move(*value);
+        sslOpts_->keyStore = securityCert_.c_str();
       }
-      value = "";
-      if (context->getProperty(SecurityPrivateKey.getName(), value) && !value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: privateKey [%s]", value);
-        securityPrivateKey_ = value;
-        sslopts_.privateKey = securityPrivateKey_.c_str();
+      if (auto value = context->getProperty(SecurityPrivateKey)) {
+        logger_->log_debug("AbstractMQTTProcessor: privateKey [%s]", *value);
+        securityPrivateKey_ = std::move(*value);
+        sslOpts_->privateKey = securityPrivateKey_.c_str();
       }
-      value = "";
-      if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) && !value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: privateKeyPassword [%s]", value);
-        securityPrivateKeyPassWord_ = value;
-        sslopts_.privateKeyPassword = securityPrivateKeyPassWord_.c_str();
+      if (auto value = context->getProperty(SecurityPrivateKeyPassword)) {
+        logger_->log_debug("AbstractMQTTProcessor: privateKeyPassword [%s]", *value);
+        securityPrivateKeyPassword_ = std::move(*value);
+        sslOpts_->privateKeyPassword = securityPrivateKeyPassword_.c_str();
       }
     }
   }
+
+  if (auto last_will_topic = context->getProperty(LastWillTopic); last_will_topic.has_value() && !last_will_topic->empty()) {
+    last_will_ = MQTTAsync_willOptions_initializer;
+
+    logger_->log_debug("AbstractMQTTProcessor: Last Will Topic [%s]", *last_will_topic);
+    last_will_topic_ = std::move(*last_will_topic);
+    last_will_->topicName = last_will_topic_.c_str();
+
+    if (auto value = context->getProperty(LastWillMessage)) {
+      logger_->log_debug("AbstractMQTTProcessor: Last Will Message [%s]", *value);
+      last_will_message_ = std::move(*value);
+      last_will_->message = last_will_message_.c_str();
+    }
+
+    if (const auto value = context->getProperty<uint32_t>(LastWillQoS); value && (*value == MQTT_QOS_0 || *value == MQTT_QOS_1 || *value == MQTT_QOS_2)) {
+      logger_->log_debug("AbstractMQTTProcessor: Last Will QoS [%" PRIu32 "]", *value);
+      last_will_qos_ = {*value};
+      last_will_->qos = gsl::narrow<int>(last_will_qos_);
+    }
+
+    if (const auto value = context->getProperty<bool>(LastWillRetain)) {
+      logger_->log_debug("AbstractMQTTProcessor: Last Will Retain [%d]", *value);
+      last_will_retain_ = {*value};
+      last_will_->retained = last_will_retain_;
+    }
+  }
+
+  checkProperties();
+
   if (!client_) {
-    MQTTClient_create(&client_, uri_.c_str(), clientID_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr);
+    if (MQTTAsync_create(&client_, uri_.c_str(), clientID_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr) != MQTTASYNC_SUCCESS) {
+      logger_->log_error("Creating MQTT client failed");
+    }
   }
   if (client_) {
-    MQTTClient_setCallbacks(client_, this, connectionLost, msgReceived, msgDelivered);
+    if (MQTTAsync_setCallbacks(client_, this, connectionLost, msgReceived, nullptr) == MQTTASYNC_FAILURE) {
+      logger_->log_error("Setting MQTT client callbacks failed");
+      return;
+    }
     // call reconnect to bootstrap
-    this->reconnect();
+    reconnect();
   }
 }
 
-bool AbstractMQTTProcessor::reconnect() {
-  if (!client_)
-    return false;
-  if (MQTTClient_isConnected(client_))
-    return true;
-  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
-  conn_opts.keepAliveInterval = std::chrono::duration_cast<std::chrono::seconds>(keepAliveInterval_).count();
-  conn_opts.cleansession = cleanSession_;
-  if (!userName_.empty()) {
-    conn_opts.username = userName_.c_str();
-    conn_opts.password = passWord_.c_str();
-  }
-  if (sslEnabled_) {
-    conn_opts.ssl = &sslopts_;
-  }
-  int ret = MQTTClient_connect(client_, &conn_opts);
-  if (ret != MQTTCLIENT_SUCCESS) {
-    logger_->log_error("Failed to connect to MQTT broker %s (%d)", uri_, ret);
-    return false;
-  }
-  if (isSubscriber_) {
-    ret = MQTTClient_subscribe(client_, topic_.c_str(), qos_);
-    if (ret != MQTTCLIENT_SUCCESS) {
-      logger_->log_error("Failed to subscribe to MQTT topic %s (%d)", topic_, ret);
-      return false;
-    }
-    logger_->log_debug("Successfully subscribed to MQTT topic: %s", topic_);
+void AbstractMQTTProcessor::reconnect() {
+  if (!client_) {
+    logger_->log_error("MQTT client is not existing while trying to reconnect");
+    return;
+  }
+  if (MQTTAsync_isConnected(client_)) {
+    logger_->log_info("Already connected to %s, no need to reconnect", uri_);
+    return;
+  }
+  MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
+  conn_opts.keepAliveInterval = gsl::narrow<int>(keep_alive_interval_.count());
+  conn_opts.cleansession = getCleanSession();
+  conn_opts.context = this;
+  conn_opts.onSuccess = connectionSuccess;
+  conn_opts.onFailure = connectionFailure;
+  conn_opts.connectTimeout = gsl::narrow<int>(connection_timeout_.count());
+  if (!username_.empty()) {
+    conn_opts.username = username_.c_str();
+    conn_opts.password = password_.c_str();
+  }
+  if (sslOpts_) {
+    conn_opts.ssl = &*sslOpts_;
+  }
+  if (last_will_) {
+    conn_opts.will = &*last_will_;
+  }
+
+  logger_->log_info("Reconnecting to %s", uri_);
+  int ret = MQTTAsync_connect(client_, &conn_opts);
+  if (ret != MQTTASYNC_SUCCESS) {
+    logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, ret);
+  }
+}
+
+void AbstractMQTTProcessor::freeResources() {
+  if (client_ && MQTTAsync_isConnected(client_)) {
+    MQTTAsync_disconnectOptions disconnect_options = MQTTAsync_disconnectOptions_initializer;
+    disconnect_options.context = this;
+    disconnect_options.onSuccess = disconnectionSuccess;
+    disconnect_options.onFailure = disconnectionFailure;
+    disconnect_options.timeout = gsl::narrow<int>(std::chrono::milliseconds{connection_timeout_}.count());
+    MQTTAsync_disconnect(client_, &disconnect_options);
+  }
+  if (client_) {
+    MQTTAsync_destroy(&client_);
   }
-  return true;
 }
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.h b/extensions/mqtt/processors/AbstractMQTTProcessor.h
index 4da1f2559..24063dc73 100644
--- a/extensions/mqtt/processors/AbstractMQTTProcessor.h
+++ b/extensions/mqtt/processors/AbstractMQTTProcessor.h
@@ -16,130 +16,196 @@
  */
 #pragma once
 
-#include <string>
+#include <limits>
 #include <memory>
+#include <string>
 #include <vector>
 
-#include "FlowFileRecord.h"
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
 #include "core/Core.h"
 #include "core/logging/LoggerConfiguration.h"
-#include "MQTTClient.h"
+#include "MQTTAsync.h"
 
 namespace org::apache::nifi::minifi::processors {
 
-static constexpr const char* const MQTT_QOS_0 = "0";
-static constexpr const char* const MQTT_QOS_1 = "1";
-static constexpr const char* const MQTT_QOS_2 = "2";
+static constexpr uint8_t MQTT_QOS_0 = 0;
+static constexpr uint8_t MQTT_QOS_1 = 1;
+static constexpr uint8_t MQTT_QOS_2 = 2;
 
-static constexpr const char* const MQTT_SECURITY_PROTOCOL_PLAINTEXT = "plaintext";
 static constexpr const char* const MQTT_SECURITY_PROTOCOL_SSL = "ssl";
 
 class AbstractMQTTProcessor : public core::Processor {
  public:
   explicit AbstractMQTTProcessor(const std::string& name, const utils::Identifier& uuid = {})
       : core::Processor(name, uuid) {
-    client_ = nullptr;
-    cleanSession_ = false;
-    qos_ = 0;
-    isSubscriber_ = false;
   }
 
   ~AbstractMQTTProcessor() override {
-    if (isSubscriber_) {
-      MQTTClient_unsubscribe(client_, topic_.c_str());
-    }
-    if (client_ && MQTTClient_isConnected(client_)) {
-      MQTTClient_disconnect(client_, std::chrono::milliseconds{connectionTimeout_}.count());
-    }
-    if (client_)
-      MQTTClient_destroy(&client_);
+    freeResources();
   }
 
-  EXTENSIONAPI static const core::Property BrokerURL;
+  EXTENSIONAPI static const core::Property BrokerURI;
   EXTENSIONAPI static const core::Property ClientID;
-  EXTENSIONAPI static const core::Property UserName;
-  EXTENSIONAPI static const core::Property PassWord;
-  EXTENSIONAPI static const core::Property CleanSession;
-  EXTENSIONAPI static const core::Property KeepLiveInterval;
+  EXTENSIONAPI static const core::Property Username;
+  EXTENSIONAPI static const core::Property Password;
+  EXTENSIONAPI static const core::Property KeepAliveInterval;
+  EXTENSIONAPI static const core::Property MaxFlowSegSize;
   EXTENSIONAPI static const core::Property ConnectionTimeout;
   EXTENSIONAPI static const core::Property Topic;
-  EXTENSIONAPI static const core::Property QOS;
+  EXTENSIONAPI static const core::Property QoS;
   EXTENSIONAPI static const core::Property SecurityProtocol;
   EXTENSIONAPI static const core::Property SecurityCA;
   EXTENSIONAPI static const core::Property SecurityCert;
   EXTENSIONAPI static const core::Property SecurityPrivateKey;
-  EXTENSIONAPI static const core::Property SecurityPrivateKeyPassWord;
-  static auto properties() {
+  EXTENSIONAPI static const core::Property SecurityPrivateKeyPassword;
+  EXTENSIONAPI static const core::Property LastWillTopic;
+  EXTENSIONAPI static const core::Property LastWillMessage;
+  EXTENSIONAPI static const core::Property LastWillQoS;
+  EXTENSIONAPI static const core::Property LastWillRetain;
+
+  EXTENSIONAPI static auto properties() {
     return std::array{
-      BrokerURL,
-      ClientID,
-      UserName,
-      PassWord,
-      CleanSession,
-      KeepLiveInterval,
-      ConnectionTimeout,
-      Topic,
-      QOS,
-      SecurityProtocol,
-      SecurityCA,
-      SecurityCert,
-      SecurityPrivateKey,
-      SecurityPrivateKeyPassWord
+            BrokerURI,
+            Topic,
+            ClientID,
+            QoS,
+            ConnectionTimeout,
+            KeepAliveInterval,
+            MaxFlowSegSize,
+            LastWillTopic,
+            LastWillMessage,
+            LastWillQoS,
+            LastWillRetain,
+            Username,
+            Password,
+            SecurityProtocol,
+            SecurityCA,
+            SecurityCert,
+            SecurityPrivateKey,
+            SecurityPrivateKeyPassword
     };
   }
 
- public:
-  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override;
-
-  // MQTT async callbacks
-  static void msgDelivered(void *context, MQTTClient_deliveryToken dt) {
-    AbstractMQTTProcessor *processor = reinterpret_cast<AbstractMQTTProcessor *>(context);
-    processor->delivered_token_ = dt;
-  }
-  static int msgReceived(void *context, char *topicName, int /*topicLen*/, MQTTClient_message *message) {
-    AbstractMQTTProcessor *processor = reinterpret_cast<AbstractMQTTProcessor *>(context);
-    if (processor->isSubscriber_) {
-      if (!processor->enqueueReceiveMQTTMsg(message))
-        MQTTClient_freeMessage(&message);
-    } else {
-      MQTTClient_freeMessage(&message);
-    }
-    MQTTClient_free(topicName);
-    return 1;
-  }
-  static void connectionLost(void *context, char* /*cause*/) {
-    AbstractMQTTProcessor *processor = reinterpret_cast<AbstractMQTTProcessor *>(context);
-    processor->reconnect();
-  }
-  bool reconnect();
-  virtual bool enqueueReceiveMQTTMsg(MQTTClient_message* /*message*/) {
-    return false;
+  void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& factory) override;
+
+  void notifyStop() override {
+    freeResources();
   }
 
  protected:
-  MQTTClient client_;
-  MQTTClient_deliveryToken delivered_token_;
+  void reconnect();
+
+  MQTTAsync client_ = nullptr;
   std::string uri_;
   std::string topic_;
-  std::chrono::milliseconds keepAliveInterval_ = std::chrono::seconds(60);
-  std::chrono::milliseconds connectionTimeout_ = std::chrono::seconds(30);
-  int64_t qos_;
-  bool cleanSession_;
+  std::chrono::seconds keep_alive_interval_{60};
+  uint64_t max_seg_size_ = std::numeric_limits<uint64_t>::max();
+  std::chrono::seconds connection_timeout_{30};
+  uint32_t qos_ = MQTT_QOS_1;
   std::string clientID_;
-  std::string userName_;
-  std::string passWord_;
-  bool isSubscriber_;
+  std::string username_;
+  std::string password_;
 
  private:
-  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<AbstractMQTTProcessor>::getLogger();
-  MQTTClient_SSLOptions sslopts_;
-  bool sslEnabled_;
+  // MQTT async callback
+  static int msgReceived(void *context, char* topic_name, int topic_len, MQTTAsync_message* message) {
+    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
+    processor->onMessageReceived(topic_name, topic_len, message);
+    return 1;
+  }
+
+  // MQTT async callback
+  static void connectionLost(void *context, char* cause) {
+    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
+    processor->onConnectionLost(cause);
+  }
+
+  // MQTT async callback
+  static void connectionSuccess(void* context, MQTTAsync_successData* response) {
+    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
+    processor->onConnectionSuccess(response);
+  }
+
+  // MQTT async callback
+  static void connectionFailure(void* context, MQTTAsync_failureData* response) {
+    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
+    processor->onConnectionFailure(response);
+  }
+
+  // MQTT async callback
+  static void disconnectionSuccess(void* context, MQTTAsync_successData* response) {
+    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
+    processor->onDisconnectionSuccess(response);
+  }
+
+  // MQTT async callback
+  static void disconnectionFailure(void* context, MQTTAsync_failureData* response) {
+    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
+    processor->onDisconnectionFailure(response);
+  }
+
+  virtual void onMessageReceived(char* topic_name, int /*topic_len*/, MQTTAsync_message* message) {
+    MQTTAsync_freeMessage(&message);
+    MQTTAsync_free(topic_name);
+  }
+
+  void onConnectionLost(char* cause) {
+    logger_->log_error("Connection lost to MQTT broker %s", uri_);
+    if (cause != nullptr) {
+      logger_->log_error("Cause for connection loss: %s", cause);
+    }
+  }
+
+  void onConnectionSuccess(MQTTAsync_successData* /*response*/) {
+    logger_->log_info("Successfully connected to MQTT broker %s", uri_);
+    startupClient();
+  }
+
+  void onConnectionFailure(MQTTAsync_failureData* response) {
+    logger_->log_error("Connection failed to MQTT broker %s (%d)", uri_, response->code);
+    if (response->message != nullptr) {
+      logger_->log_error("Detailed reason for connection failure: %s", response->message);
+    }
+  }
+
+  void onDisconnectionSuccess(MQTTAsync_successData* /*response*/) {
+    logger_->log_info("Successfully disconnected from MQTT broker %s", uri_);
+  }
+
+  void onDisconnectionFailure(MQTTAsync_failureData* response) {
+    logger_->log_error("Disconnection failed from MQTT broker %s (%d)", uri_, response->code);
+    if (response->message != nullptr) {
+      logger_->log_error("Detailed reason for disconnection failure: %s", response->message);
+    }
+  }
+
+  virtual bool getCleanSession() const = 0;
+  virtual bool startupClient() = 0;
+
+  void freeResources();
+
+  /**
+   * Checks property consistency before connecting to broker
+   */
+  virtual void checkProperties() {
+  }
+
+  // SSL
+  std::optional<MQTTAsync_SSLOptions> sslOpts_;
   std::string securityCA_;
   std::string securityCert_;
   std::string securityPrivateKey_;
-  std::string securityPrivateKeyPassWord_;
+  std::string securityPrivateKeyPassword_;
+
+  // Last Will
+  std::optional<MQTTAsync_willOptions> last_will_;
+  std::string last_will_topic_;
+  std::string last_will_message_;
+  uint32_t last_will_qos_ = MQTT_QOS_1;
+  bool last_will_retain_ = false;
+
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<AbstractMQTTProcessor>::getLogger();
 };
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/mqtt/processors/AbstractMQTTProcessorStaticDefinitions.cpp b/extensions/mqtt/processors/AbstractMQTTProcessorStaticDefinitions.cpp
index 3c2bf433d..cfc7ee7ea 100644
--- a/extensions/mqtt/processors/AbstractMQTTProcessorStaticDefinitions.cpp
+++ b/extensions/mqtt/processors/AbstractMQTTProcessorStaticDefinitions.cpp
@@ -17,6 +17,7 @@
 #include "AbstractMQTTProcessor.h"
 #include "ConsumeMQTT.h"
 #include "PublishMQTT.h"
+#include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
 // FIXME(fgerlits): we need to put all these static definitions in a single file so that they are executed in this order at runtime
@@ -26,26 +27,48 @@ namespace org::apache::nifi::minifi::processors {
 
 // AbstractMQTTProcessor
 
-const core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", "");
-const core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. See the allowable value descriptions for more details", "true");
-const core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", "");
-const core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", "");
-const core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", "");
-const core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec");
-const core::Property AbstractMQTTProcessor::ConnectionTimeout("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec");
-const core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0");
-const core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", "");
+const core::Property AbstractMQTTProcessor::BrokerURI(
+  core::PropertyBuilder::createProperty("Broker URI")->
+    withDescription("The URI to use to connect to the MQTT broker")->
+    isRequired(true)->
+    supportsExpressionLanguage(true)->
+    build());
+
+const core::Property AbstractMQTTProcessor::ClientID(
+  core::PropertyBuilder::createProperty("Client ID")->
+    withDescription("MQTT client ID to use")->
+    supportsExpressionLanguage(true)->
+    build());
+
+const core::Property AbstractMQTTProcessor::Topic(
+  core::PropertyBuilder::createProperty("Topic")->
+    withDescription("The topic to publish or subscribe to")->
+    isRequired(true)->
+    supportsExpressionLanguage(true)->
+    build());
+
+const core::Property AbstractMQTTProcessor::QoS("Quality of Service",
+                                                "The Quality of Service (QoS) to send or receive the message with. Accepts three values '0', '1' and '2'", std::to_string(MQTT_QOS_0));
+const core::Property AbstractMQTTProcessor::KeepAliveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec");
+const core::Property AbstractMQTTProcessor::ConnectionTimeout("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT broker", "30 sec");
+const core::Property AbstractMQTTProcessor::MaxFlowSegSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", "");
+const core::Property AbstractMQTTProcessor::Username("Username", "Username to use when connecting to the broker", "");
+const core::Property AbstractMQTTProcessor::Password("Password", "Password to use when connecting to the broker", "");
 const core::Property AbstractMQTTProcessor::SecurityProtocol("Security Protocol", "Protocol used to communicate with brokers", "");
 const core::Property AbstractMQTTProcessor::SecurityCA("Security CA", "File or directory path to CA certificate(s) for verifying the broker's key", "");
 const core::Property AbstractMQTTProcessor::SecurityCert("Security Cert", "Path to client's public key (PEM) used for authentication", "");
 const core::Property AbstractMQTTProcessor::SecurityPrivateKey("Security Private Key", "Path to client's private key (PEM) used for authentication", "");
-const core::Property AbstractMQTTProcessor::SecurityPrivateKeyPassWord("Security Pass Phrase", "Private key passphrase", "");
-
+const core::Property AbstractMQTTProcessor::SecurityPrivateKeyPassword("Security Pass Phrase", "Private key passphrase", "");
+const core::Property AbstractMQTTProcessor::LastWillTopic("Last Will Topic", "The topic to send the client's Last Will to. If the Last Will topic is not set then a Last Will will not be sent", "");
+const core::Property AbstractMQTTProcessor::LastWillMessage("Last Will Message",
+                                                            "The message to send as the client's Last Will. If the Last Will Message is empty, Last Will will be deleted from the broker", "");
+const core::Property AbstractMQTTProcessor::LastWillQoS("Last Will QoS", "The Quality of Service (QoS) to send the last will with. Accepts three values '0', '1' and '2'", std::to_string(MQTT_QOS_0));
+const core::Property AbstractMQTTProcessor::LastWillRetain("Last Will Retain", "Whether to retain the client's Last Will", "false");
 
 // ConsumeMQTT
 
-const core::Property ConsumeMQTT::MaxFlowSegSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", "");
-const core::Property ConsumeMQTT::QueueBufferMaxMessage("Queue Max Message", "Maximum number of messages allowed on the received MQTT queue", "");
+const core::Property ConsumeMQTT::CleanSession("Clean Session", "Whether to start afresh rather than remembering previous subscriptions.", "true");
+const core::Property ConsumeMQTT::QueueBufferMaxMessage("Queue Max Message", "Maximum number of messages allowed on the received MQTT queue", "1000");
 
 const core::Relationship ConsumeMQTT::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship");
 
@@ -54,11 +77,10 @@ REGISTER_RESOURCE(ConsumeMQTT, Processor);
 
 // PublishMQTT
 
-const core::Property PublishMQTT::Retain("Retain", "Retain MQTT published record in broker", "false");
-const core::Property PublishMQTT::MaxFlowSegSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", "");
+const core::Property PublishMQTT::Retain("Retain", "Retain published message in broker", "false");
 
 const core::Relationship PublishMQTT::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship");
-const core::Relationship PublishMQTT::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship");
+const core::Relationship PublishMQTT::Failure("failure", "FlowFiles that failed to be sent to the destination are transferred to this relationship");
 
 REGISTER_RESOURCE(PublishMQTT, Processor);
 
diff --git a/extensions/mqtt/processors/ConsumeMQTT.cpp b/extensions/mqtt/processors/ConsumeMQTT.cpp
index fe24538f0..2de3fdf08 100644
--- a/extensions/mqtt/processors/ConsumeMQTT.cpp
+++ b/extensions/mqtt/processors/ConsumeMQTT.cpp
@@ -16,15 +16,12 @@
  */
 #include "ConsumeMQTT.h"
 
-#include <cstdio>
-#include <algorithm>
 #include <memory>
 #include <string>
-#include <map>
+#include <set>
 #include <cinttypes>
 #include <vector>
 
-#include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
@@ -37,48 +34,53 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-bool ConsumeMQTT::enqueueReceiveMQTTMsg(MQTTClient_message *message) {
+void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> message) {  // stores the message in queue_ unless the queue is at capacity
   if (queue_.size_approx() >= maxQueueSize_) {
     logger_->log_warn("MQTT queue full");
-    return false;
-  } else {
-    if (gsl::narrow<uint64_t>(message->payloadlen) > maxSegSize_)
-      message->payloadlen = maxSegSize_;
-    queue_.enqueue(message);
-    logger_->log_debug("enqueue MQTT message length %d", message->payloadlen);
-    return true;
+    return;  // dropped: the by-value unique_ptr releases the message through MQTTMessageDeleter on exit
+  }
+
+  if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) {
+    logger_->log_debug("MQTT message was truncated while enqueuing, original length: %d", message->payloadlen);
+    message->payloadlen = gsl::narrow<int>(max_seg_size_);  // only the recorded length is reduced; the payload buffer is left as is
   }
+
+  logger_->log_debug("enqueuing MQTT message with length %d", message->payloadlen);
+  queue_.enqueue(std::move(message));  // ownership passes to the queue
 }
 
 void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
-  AbstractMQTTProcessor::onSchedule(context, factory);
-  std::string value;
-  int64_t valInt;
-  value = "";
-  if (context->getProperty(QueueBufferMaxMessage.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
-    maxQueueSize_ = valInt;
-    logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", maxQueueSize_);
+  if (const auto value = context->getProperty<bool>(CleanSession)) {  // when nothing is returned, cleanSession_ keeps its current value
+    cleanSession_ = *value;
+    logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
   }
-  value = "";
-  if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
-    maxSegSize_ = valInt;
-    logger_->log_debug("ConsumeMQTT: Max Flow Segment Size [%" PRIu64 "]", maxSegSize_);
+
+  if (const auto value = context->getProperty<uint64_t>(QueueBufferMaxMessage)) {  // when nothing is returned, maxQueueSize_ keeps its current value
+    maxQueueSize_ = *value;
+    logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", maxQueueSize_);
   }
+
+  // the base-class onSchedule connects to the broker, so this processor's own properties must already have been read
+  AbstractMQTTProcessor::onSchedule(context, factory);
 }
 
 void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession> &session) {
-  // reconnect if necessary
-  if (!reconnect()) {
+  // reconnect if needed
+  reconnect();
+
+  if (!MQTTAsync_isConnected(client_)) {
+    logger_->log_error("Could not consume from MQTT broker because disconnected to %s", uri_);
     yield();
+    return;
   }
 
-  std::deque<MQTTClient_message *> msg_queue;
+  std::deque<std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter>> msg_queue;
   getReceivedMQTTMsg(msg_queue);
   while (!msg_queue.empty()) {
-    MQTTClient_message *message = msg_queue.front();
+    const auto& message = msg_queue.front();
     std::shared_ptr<core::FlowFile> processFlowFile = session->create();
     int write_status{};
-    session->write(processFlowFile, [message, &write_status](const std::shared_ptr<io::BaseStream>& stream) -> int64_t {
+    session->write(processFlowFile, [&message, &write_status](const std::shared_ptr<io::BaseStream>& stream) -> int64_t {
       if (message->payloadlen < 0) {
         write_status = -1;
         return -1;
@@ -99,9 +101,44 @@ void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& /*conte
       logger_->log_debug("ConsumeMQTT processing success for the flow with UUID %s topic %s", processFlowFile->getUUIDStr(), topic_);
       session->transfer(processFlowFile, Success);
     }
-    MQTTClient_freeMessage(&message);
     msg_queue.pop_front();
   }
 }
 
+bool ConsumeMQTT::startupClient() {  // requests the topic subscription; returns false when the request itself is rejected
+  MQTTAsync_responseOptions response_options = MQTTAsync_responseOptions_initializer;
+  response_options.context = this;
+  response_options.onSuccess = subscriptionSuccess;
+  response_options.onFailure = subscriptionFailure;
+  const int ret = MQTTAsync_subscribe(client_, topic_.c_str(), gsl::narrow<int>(qos_), &response_options);
+  if (ret != MQTTASYNC_SUCCESS) {
+    logger_->log_error("Failed to subscribe to MQTT topic %s (%d)", topic_, ret);
+    return false;
+  }
+  logger_->log_debug("Subscription request sent for MQTT topic: %s", topic_);  // the outcome arrives later via subscriptionSuccess / subscriptionFailure
+  return true;
+}
+
+void ConsumeMQTT::onMessageReceived(char* topic_name, int /*topic_len*/, MQTTAsync_message* message) {
+  MQTTAsync_free(topic_name);
+
+  // take ownership before anything below can throw; clamp the length so a null payload or negative payloadlen never reaches std::string
+  std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> smartMessage(message);
+  const size_t msgLen = (smartMessage->payload != nullptr && smartMessage->payloadlen > 0) ? static_cast<size_t>(smartMessage->payloadlen) : 0;
+  const std::string messageText(msgLen > 0 ? static_cast<const char*>(smartMessage->payload) : "", msgLen);
+  logger_->log_debug("Received message \"%s\" to MQTT topic %s on broker %s", messageText, topic_, uri_);
+
+  enqueueReceivedMQTTMsg(std::move(smartMessage));
+}
+
+void ConsumeMQTT::checkProperties() {
+  if (cleanSession_) return;  // everything below concerns durable (non-clean) sessions only
+  if (clientID_.empty()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Processor must have a Client ID for durable (non-clean) sessions");
+  }
+  if (qos_ != 0) return;
+  logger_->log_warn("Messages are not preserved during client disconnection "
+                    "by the broker when QoS is less than 1 for durable (non-clean) sessions. Only subscriptions are preserved.");
+}
+
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/mqtt/processors/ConsumeMQTT.h b/extensions/mqtt/processors/ConsumeMQTT.h
index c49884d81..bed71bc27 100644
--- a/extensions/mqtt/processors/ConsumeMQTT.h
+++ b/extensions/mqtt/processors/ConsumeMQTT.h
@@ -16,11 +16,12 @@
  */
 #pragma once
 
+#include <deque>
 #include <limits>
-#include <string>
 #include <memory>
+#include <string>
+#include <utility>
 
-#include <deque>
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
@@ -28,7 +29,6 @@
 #include "core/Property.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "concurrentqueue.h"
-#include "MQTTClient.h"
 #include "AbstractMQTTProcessor.h"
 #include "utils/ArrayUtils.h"
 #include "utils/gsl.h"
@@ -42,25 +42,18 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
  public:
   explicit ConsumeMQTT(const std::string& name, const utils::Identifier& uuid = {})
       : processors::AbstractMQTTProcessor(name, uuid) {
-    isSubscriber_ = true;
     maxQueueSize_ = 100;
-    maxSegSize_ = ULLONG_MAX;
-  }
-  ~ConsumeMQTT() override {
-    MQTTClient_message *message;
-    while (queue_.try_dequeue(message)) {
-      MQTTClient_freeMessage(&message);
-    }
   }
 
   EXTENSIONAPI static constexpr const char* Description = "This Processor gets the contents of a FlowFile from a MQTT broker for a specified topic. "
       "The the payload of the MQTT message becomes content of a FlowFile";
 
-  EXTENSIONAPI static const core::Property MaxFlowSegSize;
+  EXTENSIONAPI static const core::Property CleanSession;
   EXTENSIONAPI static const core::Property QueueBufferMaxMessage;
+
   static auto properties() {
     return utils::array_cat(AbstractMQTTProcessor::properties(), std::array{
-      MaxFlowSegSize,
+      CleanSession,
       QueueBufferMaxMessage
     });
   }
@@ -78,22 +71,60 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override;
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
   void initialize() override;
-  bool enqueueReceiveMQTTMsg(MQTTClient_message *message) override;
 
- protected:
-  void getReceivedMQTTMsg(std::deque<MQTTClient_message *> &msg_queue) {
-    MQTTClient_message *message;
+ private:
+  struct MQTTMessageDeleter {  // unique_ptr deleter that hands the message to MQTTAsync_freeMessage
+    void operator()(MQTTAsync_message* message) const noexcept {
+      MQTTAsync_freeMessage(&message);
+    }
+  };
+
+  void getReceivedMQTTMsg(std::deque<std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter>>& msg_queue) {
+    std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> message;
     while (queue_.try_dequeue(message)) {
-      msg_queue.push_back(message);
+      msg_queue.push_back(std::move(message));
     }
   }
 
- private:
+  // MQTT async callback; `context` is the ConsumeMQTT instance stored in the response options by startupClient
+  static void subscriptionSuccess(void* context, MQTTAsync_successData* response) {
+    auto* const self = static_cast<ConsumeMQTT*>(context);
+    self->onSubscriptionSuccess(response);
+  }
+
+  // MQTT async callback; `context` is the ConsumeMQTT instance stored in the response options by startupClient
+  static void subscriptionFailure(void* context, MQTTAsync_failureData* response) {
+    auto* const self = static_cast<ConsumeMQTT*>(context);
+    self->onSubscriptionFailure(response);
+  }
+
+  void onSubscriptionSuccess(MQTTAsync_successData* /*response*/) {  // logging only; the success record is not inspected
+    logger_->log_info("Successfully subscribed to MQTT topic %s on broker %s", topic_, uri_);
+  }
+
+  void onSubscriptionFailure(MQTTAsync_failureData* response) {  // logs a failed subscription; tolerates a null failure record
+    logger_->log_error("Subscription failed on topic %s to MQTT broker %s (%d)", topic_, uri_, response != nullptr ? response->code : MQTTASYNC_FAILURE);
+    if (response != nullptr && response->message != nullptr) {
+      logger_->log_error("Detailed reason for subscription failure: %s", response->message);
+    }
+  }
+
+  bool getCleanSession() const override {  // reports the flag that onSchedule reads from the "Clean Session" property
+    return cleanSession_;
+  }
+
+  void onMessageReceived(char* topic_name, int /*topic_len*/, MQTTAsync_message* message) override;
+
+  void enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> message);
+
+  bool startupClient() override;
+
+  void checkProperties() override;
+
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ConsumeMQTT>::getLogger();
-  std::mutex mutex_;
+  bool cleanSession_ = true;
   uint64_t maxQueueSize_;
-  uint64_t maxSegSize_;
-  moodycamel::ConcurrentQueue<MQTTClient_message *> queue_;
+  moodycamel::ConcurrentQueue<std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter>> queue_;
 };
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/mqtt/processors/ConvertBase.cpp b/extensions/mqtt/processors/ConvertBase.cpp
deleted file mode 100644
index db9e2be98..000000000
--- a/extensions/mqtt/processors/ConvertBase.cpp
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <memory>
-#include <string>
-#include "core/ProcessContext.h"
-#include "ConvertBase.h"
-#include "c2/PayloadSerializer.h"
-
-namespace org::apache::nifi::minifi::processors {
-
-const core::Relationship ConvertBase::Success("success", "All files are routed to success");
-
-void ConvertBase::initialize() {
-  setSupportedProperties(properties());
-  setSupportedRelationships(relationships());
-}
-
-void ConvertBase::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
-  std::string controller_service_name;
-  if (context->getProperty(MQTTControllerService.getName(), controller_service_name) && !controller_service_name.empty()) {
-    auto service = context->getControllerService(controller_service_name);
-    mqtt_service_ = std::static_pointer_cast<controllers::MQTTControllerService>(service);
-  }
-  context->getProperty(ListeningTopic.getName(), listening_topic);
-  if (!listening_topic.empty()) {
-    mqtt_service_->subscribeToTopic(listening_topic);
-  }
-}
-
-}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/mqtt/processors/ConvertBase.h b/extensions/mqtt/processors/ConvertBase.h
deleted file mode 100644
index 392dfe831..000000000
--- a/extensions/mqtt/processors/ConvertBase.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <string>
-#include <memory>
-
-#include "MQTTControllerService.h"
-#include "FlowFileRecord.h"
-#include "core/Processor.h"
-#include "core/ProcessSession.h"
-#include "core/Core.h"
-#include "core/Property.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "MQTTClient.h"
-#include "c2/protocols/RESTProtocol.h"
-
-namespace org::apache::nifi::minifi::processors {
-
-/**
- * Purpose: Provides base functionality for mqtt conversion classes.
- * At a minimum we need a controller service and listening topic.
- */
-class ConvertBase : public core::Processor, public minifi::c2::RESTProtocol {
- public:
-  explicit ConvertBase(const std::string& name, const utils::Identifier& uuid = {})
-      : core::Processor(name, uuid) {
-  }
-  virtual ~ConvertBase() = default;
-
-  EXTENSIONAPI static const core::Property MQTTControllerService;
-  EXTENSIONAPI static const core::Property ListeningTopic;
-  static auto properties() {
-    return std::array{
-      MQTTControllerService,
-      ListeningTopic
-    };
-  }
-
-  static const core::Relationship Success;
-  static auto relationships() { return std::array{Success}; }
-
- public:
-  void initialize() override;
-  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
-
- protected:
-  std::shared_ptr<controllers::MQTTControllerService> mqtt_service_;
-
-  std::string listening_topic;
-};
-
-}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/mqtt/processors/ConvertBaseStaticDefinitions.cpp b/extensions/mqtt/processors/ConvertBaseStaticDefinitions.cpp
deleted file mode 100644
index a31b0a1c2..000000000
--- a/extensions/mqtt/processors/ConvertBaseStaticDefinitions.cpp
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "ConvertBase.h"
-#include "ConvertHeartBeat.h"
-#include "ConvertJSONAck.h"
-#include "ConvertUpdate.h"
-#include "core/Resource.h"
-
-// FIXME(fgerlits): we need to put all these static definitions in a single file so that they are executed in this order at runtime
-// when https://issues.apache.org/jira/browse/MINIFICPP-1825 is closed, these definitions should be moved back to the cpp file of the class to which they belong
-
-namespace org::apache::nifi::minifi::processors {
-
-// ConvertBase
-
-const core::Property ConvertBase::MQTTControllerService("MQTT Controller Service", "Name of controller service that will be used for MQTT interactivity", "");
-const core::Property ConvertBase::ListeningTopic("Listening Topic", "Name of topic to listen to", "");
-
-
-// ConvertHeartBeat
-
-REGISTER_RESOURCE(ConvertHeartBeat, InternalResource);
-
-
-// ConvertJSONAck
-
-REGISTER_RESOURCE(ConvertJSONAck, InternalResource);
-
-
-// ConvertUpdate
-
-core::Property ConvertUpdate::SSLContext("SSL Context Service", "The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.", "");
-
-REGISTER_RESOURCE(ConvertUpdate, InternalResource);
-
-}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/mqtt/processors/ConvertHeartBeat.cpp b/extensions/mqtt/processors/ConvertHeartBeat.cpp
deleted file mode 100644
index 1324fb127..000000000
--- a/extensions/mqtt/processors/ConvertHeartBeat.cpp
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <memory>
-#include <vector>
-
-#include "utils/StringUtils.h"
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
-#include "core/Resource.h"
-#include "ConvertHeartBeat.h"
-#include "c2/PayloadSerializer.h"
-#include "utils/ByteArrayCallback.h"
-namespace org::apache::nifi::minifi::processors {
-
-void ConvertHeartBeat::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  auto ff = session.get();
-  if (ff != nullptr) {
-    logger_->log_error("ConvertHeartBeat does not receive flow files");
-    session->rollback();
-  }
-  if (nullptr == mqtt_service_) {
-    context->yield();
-    return;
-  }
-  std::vector<std::byte> heartbeat;
-  bool received_heartbeat = false;
-  // while we have heartbeats we can continue to loop.
-  while (mqtt_service_->get(100, listening_topic, heartbeat)) {
-    if (heartbeat.size() > 0) {
-      c2::C2Payload payload = c2::PayloadSerializer::deserialize(heartbeat);
-      auto serialized = serializeJsonRootPayload(payload);
-      logger_->log_debug("Converted JSON output %s", serialized);
-      minifi::utils::StreamOutputCallback byteCallback(serialized.size() + 1);
-      byteCallback.write(const_cast<char*>(serialized.c_str()), serialized.size());
-      auto newff = session->create();
-      session->write(newff, std::ref(byteCallback));
-      session->transfer(newff, Success);
-      received_heartbeat = true;
-    } else {
-      break;
-    }
-  }
-  if (!received_heartbeat) {
-    context->yield();
-  }
-}
-
-}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/mqtt/processors/ConvertHeartBeat.h b/extensions/mqtt/processors/ConvertHeartBeat.h
deleted file mode 100644
index 8a63543b3..000000000
--- a/extensions/mqtt/processors/ConvertHeartBeat.h
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <string>
-#include <memory>
-
-#include "core/ProcessSession.h"
-#include "core/Core.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "ConvertBase.h"
-
-namespace org::apache::nifi::minifi::processors {
-
-/*
- * Purpose: ConvertHeartBeat converts heatbeats into MQTT messages.
- */
-class ConvertHeartBeat: public ConvertBase {
- public:
-  explicit ConvertHeartBeat(const std::string& name, const utils::Identifier& uuid = {})
-    : ConvertBase(name, uuid) {
-  }
-  ~ConvertHeartBeat() override = default;
-
-  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
-  EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
-  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED;
-  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
-
-  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
-
-  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
-
- private:
-  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ConvertHeartBeat>::getLogger();
-};
-
-}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/mqtt/processors/ConvertJSONAck.cpp b/extensions/mqtt/processors/ConvertJSONAck.cpp
deleted file mode 100644
index 10d47c897..000000000
--- a/extensions/mqtt/processors/ConvertJSONAck.cpp
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "ConvertJSONAck.h"
-
-#include <memory>
-#include <sstream>
-#include <string>
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
-#include "core/Resource.h"
-#include "c2/PayloadSerializer.h"
-
-namespace org::apache::nifi::minifi::processors {
-
-std::string ConvertJSONAck::parseTopicName(const std::string &json) {
-  std::string topic;
-  rapidjson::Document root;
-
-  try {
-    rapidjson::ParseResult ok = root.Parse(json.c_str());
-    if (ok && root.HasMember("agentInfo") && root["agentInfo"].HasMember("identifier")) {
-      std::stringstream topicStr;
-      topicStr << root["agentInfo"]["identifier"].GetString() << "/in";
-      return topicStr.str();
-    }
-  } catch (...) {
-  }
-  return topic;
-}
-
-void ConvertJSONAck::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  if (nullptr == mqtt_service_) {
-    context->yield();
-    return;
-  }
-  auto flow = session->get();
-  if (!flow) {
-    return;
-  }
-
-  /**
-   * This processor expects a JSON response from InvokeHTTP and thus we expect a heartbeat ack following that.
-   * Since we are trailing InvokeHTTP
-   */
-  std::string topic;
-  {
-    // expect JSON response from InvokeHTTP and thus we expect a heartbeat and then the output from the HTTP
-    c2::C2Payload response_payload(c2::Operation::HEARTBEAT, state::UpdateState::READ_COMPLETE, true);
-    const auto read_result = session->readBuffer(flow);
-    topic = parseTopicName(to_string(read_result));
-    session->transfer(flow, Success);
-  }
-
-  flow = session->get();
-  if (!flow) {
-    return;
-  }
-
-  if (!topic.empty()) {
-    const auto read_result = session->readBuffer(flow);
-    c2::C2Payload response_payload(c2::Operation::HEARTBEAT, state::UpdateState::READ_COMPLETE, true);
-    const auto payload = parseJsonResponse(response_payload, read_result.buffer);
-    const auto stream = c2::PayloadSerializer::serialize(1, payload);
-    mqtt_service_->send(topic, stream->getBuffer());
-  }
-
-  session->transfer(flow, Success);
-}
-
-}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/mqtt/processors/ConvertJSONAck.h b/extensions/mqtt/processors/ConvertJSONAck.h
deleted file mode 100644
index 9496e1461..000000000
--- a/extensions/mqtt/processors/ConvertJSONAck.h
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <vector>
-#include <string>
-#include <memory>
-
-#include "FlowFileRecord.h"
-#include "core/ProcessSession.h"
-#include "core/Core.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "ConvertBase.h"
-
-namespace org::apache::nifi::minifi::processors {
-
-class ConvertJSONAck : public ConvertBase {
- public:
-  explicit ConvertJSONAck(const std::string& name, const utils::Identifier& uuid = {})
-      : ConvertBase(name, uuid) {
-  }
-  ~ConvertJSONAck() override = default;
-
-  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
-  EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
-  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED;
-  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
-
-  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
-
-  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
-
- protected:
-  /**
-   * Parse Topic name from the json -- given a known structure that we expect.
-   * @param json json representation defined by the restful protocol
-   */
-  static std::string parseTopicName(const std::string &json);
-
- private:
-  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ConvertJSONAck>::getLogger();
-};
-
-}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/mqtt/processors/ConvertUpdate.cpp b/extensions/mqtt/processors/ConvertUpdate.cpp
deleted file mode 100644
index dcbef511b..000000000
--- a/extensions/mqtt/processors/ConvertUpdate.cpp
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <string>
-#include <memory>
-#include <vector>
-#include <algorithm>
-
-#include "ConvertUpdate.h"
-#include "utils/HTTPClient.h"
-#include "io/BaseStream.h"
-#include "io/BufferStream.h"
-#include "core/Resource.h"
-
-namespace org::apache::nifi::minifi::processors {
-
-void ConvertUpdate::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession>& /*session*/) {
-  if (nullptr == mqtt_service_) {
-    context->yield();
-    return;
-  }
-  std::vector<std::byte> update;
-  bool received_update = false;
-  while (mqtt_service_->get(100, listening_topic, update)) {
-    // first we have the input topic string followed by the update URI
-    if (!update.empty()) {
-      io::BufferStream stream(update);
-
-      std::string returnTopic;
-      std::string url;
-
-      if (returnTopic.empty() || url.empty()) {
-        logger_->log_debug("topic and/or URL are empty");
-        break;
-      }
-
-      stream.read(returnTopic);
-      stream.read(url);
-
-      /**
-       * Not having curl support is actually okay for MQTT to be built, but running the update processor requires
-       * that we have curl available.
-       */
-      auto client_ptr = core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient", "HTTPClient");
-      if (nullptr == client_ptr) {
-        logger_->log_error("Could not locate HTTPClient. You do not have cURL support!");
-        return;
-      }
-      std::unique_ptr<utils::BaseHTTPClient> client = std::unique_ptr<utils::BaseHTTPClient>(dynamic_cast<utils::BaseHTTPClient*>(client_ptr));
-      client->initialize("GET");
-      client->setConnectionTimeout(std::chrono::milliseconds(2000));
-      client->setReadTimeout(std::chrono::milliseconds(2000));
-
-      if (client->submit()) {
-        auto data = client->getResponseBody();
-        std::vector<uint8_t> raw_data;
-        std::transform(std::begin(data), std::end(data), std::back_inserter(raw_data), [](char c) {
-          return (uint8_t)c;
-        });
-        mqtt_service_->send(returnTopic, raw_data);
-      }
-
-      received_update = true;
-    } else {
-      break;
-    }
-  }
-
-  if (!received_update) {
-    context->yield();
-  }
-}
-
-void ConvertUpdate::initialize() {
-  setSupportedProperties(properties());
-  setSupportedRelationships(relationships());
-}
-
-}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/mqtt/processors/ConvertUpdate.h b/extensions/mqtt/processors/ConvertUpdate.h
deleted file mode 100644
index 8a50e1b6e..000000000
--- a/extensions/mqtt/processors/ConvertUpdate.h
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <string>
-#include <memory>
-
-#include "MQTTControllerService.h"
-#include "core/ProcessSession.h"
-#include "core/Core.h"
-#include "core/Property.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "ConvertBase.h"
-#include "utils/ArrayUtils.h"
-
-namespace org::apache::nifi::minifi::processors {
-
-/**
- * Purpose: Converts update messages into the appropriate Restful call
- *
- * Justification: The other protocol classes are responsible for standard messaging, which carries most
- * heartbeat related activity; however, updates generally connect to a different source. This processor will
- * retrieve the updates and respond via MQTT.
- */
-class ConvertUpdate : public ConvertBase {
- public:
-  explicit ConvertUpdate(const std::string& name, const utils::Identifier& uuid = {})
-    : ConvertBase(name, uuid) {
-  }
-  ~ConvertUpdate() override = default;
-
-  EXTENSIONAPI static core::Property SSLContext;
-  static auto properties() { return utils::array_cat(ConvertBase::properties(), std::array{SSLContext}); }
-
-  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
-  EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
-  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED;
-  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
-
-  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
-
-  void initialize() override;
-  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
-
- protected:
-  std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
-
- private:
-  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ConvertUpdate>::getLogger();
-};
-
-}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/mqtt/processors/PublishMQTT.cpp b/extensions/mqtt/processors/PublishMQTT.cpp
index bf0afbe6b..e73544016 100644
--- a/extensions/mqtt/processors/PublishMQTT.cpp
+++ b/extensions/mqtt/processors/PublishMQTT.cpp
@@ -16,16 +16,12 @@
  */
 #include "PublishMQTT.h"
 
-#include <algorithm>
 #include <cinttypes>
-#include <cstdio>
-#include <map>
 #include <memory>
 #include <optional>
 #include <string>
 #include <vector>
 
-#include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
@@ -39,39 +35,31 @@ void PublishMQTT::initialize() {
 }
 
 void PublishMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
-  AbstractMQTTProcessor::onSchedule(context, factory);
-  std::string value;
-  int64_t valInt;
-  value = "";
-  if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
-    max_seg_size_ = valInt;
-    logger_->log_debug("PublishMQTT: max flow segment size [%" PRIu64 "]", max_seg_size_);
+  if (const auto retain_opt = context->getProperty<bool>(Retain)) {
+    retain_ = *retain_opt;
   }
+  logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
 
-  const auto retain_parsed = [&] () -> std::optional<bool> {
-    std::string property_value;
-    if (!context->getProperty(CleanSession.getName(), property_value)) return std::nullopt;
-    return utils::StringUtils::toBool(property_value);
-  }();
-  if ( retain_parsed ) {
-    retain_ = *retain_parsed;
-    logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
-  }
+  AbstractMQTTProcessor::onSchedule(context, factory);
 }
 
 void PublishMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession> &session) {
-  if (!reconnect()) {
-    logger_->log_error("MQTT connect to %s failed", uri_);
+  // reconnect if needed
+  reconnect();
+
+  if (!MQTTAsync_isConnected(client_)) {
+    logger_->log_error("Could not publish to MQTT broker because disconnected to %s", uri_);
     yield();
     return;
   }
+
   std::shared_ptr<core::FlowFile> flowFile = session->get();
 
   if (!flowFile) {
     return;
   }
 
-  PublishMQTT::ReadCallback callback(flowFile->getSize(), max_seg_size_, topic_, client_, qos_, retain_, delivered_token_);
+  PublishMQTT::ReadCallback callback(this, flowFile->getSize(), max_seg_size_, topic_, client_, gsl::narrow<int>(qos_), retain_);
   session->read(flowFile, std::ref(callback));
   if (callback.status_ < 0) {
     logger_->log_error("Failed to send flow to MQTT topic %s", topic_);
@@ -82,4 +70,40 @@ void PublishMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& /*conte
   }
 }
 
+int64_t PublishMQTT::ReadCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) {  // sends the stream content as one MQTT message per chunk read
+  if (flow_size_ < max_seg_size_)  // the read buffer never needs to be larger than the flow file itself
+    max_seg_size_ = flow_size_;
+  gsl_Expects(max_seg_size_ < gsl::narrow<uint64_t>(std::numeric_limits<int>::max()));
+  std::vector<std::byte> buffer(max_seg_size_);
+  read_size_ = 0;
+  status_ = 0;
+  while (read_size_ < flow_size_) {
+    // MQTTAsync_message::payloadlen is int (hence the gsl_Expects above), so a single chunk can't hold 2GB+
+    const auto readRet = stream->read(buffer);
+    if (io::isError(readRet)) {
+      status_ = -1;  // read error: flag the failure but still report the bytes already handed to the client
+      return gsl::narrow<int64_t>(read_size_);
+    }
+    if (readRet > 0) {
+      MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
+      pubmsg.payload = buffer.data();
+      pubmsg.payloadlen = gsl::narrow<int>(readRet);
+      pubmsg.qos = qos_;
+      pubmsg.retained = retain_;
+      MQTTAsync_responseOptions response_options = MQTTAsync_responseOptions_initializer;
+      response_options.context = processor_;  // the static callbacks below cast this context back to PublishMQTT*
+      response_options.onSuccess = PublishMQTT::sendSuccess;
+      response_options.onFailure = PublishMQTT::sendFailure;
+      if (MQTTAsync_sendMessage(client_, topic_.c_str(), &pubmsg, &response_options) != MQTTASYNC_SUCCESS) {
+        status_ = -1;  // the send request was rejected: unlike the read-error path, this returns -1
+        return -1;
+      }
+      read_size_ += gsl::narrow<size_t>(readRet);
+    } else {
+      break;  // zero-byte read: stop even though flow_size_ has not been reached
+    }
+  }
+  return gsl::narrow<int64_t>(read_size_);
+}
+
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/mqtt/processors/PublishMQTT.h b/extensions/mqtt/processors/PublishMQTT.h
index 0b0e2711b..f940b73b5 100644
--- a/extensions/mqtt/processors/PublishMQTT.h
+++ b/extensions/mqtt/processors/PublishMQTT.h
@@ -19,6 +19,7 @@
 #include <vector>
 #include <string>
 #include <memory>
+#include <utility>
 
 #include <limits>
 
@@ -28,7 +29,6 @@
 #include "core/Core.h"
 #include "core/Property.h"
 #include "core/logging/LoggerConfiguration.h"
-#include "MQTTClient.h"
 #include "AbstractMQTTProcessor.h"
 #include "utils/ArrayUtils.h"
 #include "utils/gsl.h"
@@ -39,19 +39,15 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
  public:
   explicit PublishMQTT(const std::string& name, const utils::Identifier& uuid = {})
       : processors::AbstractMQTTProcessor(name, uuid) {
-    retain_ = false;
-    max_seg_size_ = ULLONG_MAX;
   }
-  ~PublishMQTT() override = default;
 
   EXTENSIONAPI static constexpr const char* Description = "PublishMQTT serializes FlowFile content as an MQTT payload, sending the message to the configured topic and broker.";
 
   EXTENSIONAPI static const core::Property Retain;
-  EXTENSIONAPI static const core::Property MaxFlowSegSize;
+
   static auto properties() {
     return utils::array_cat(AbstractMQTTProcessor::properties(), std::array{
-      Retain,
-      MaxFlowSegSize
+      Retain
     });
   }
 
@@ -68,68 +64,70 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
 
   class ReadCallback {
    public:
-    ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const std::string &key, MQTTClient client, int qos, bool retain, MQTTClient_deliveryToken &token)
-        : flow_size_(flow_size),
+    ReadCallback(PublishMQTT* processor, uint64_t flow_size, uint64_t max_seg_size, std::string topic, MQTTAsync client, int qos, bool retain)
+        : processor_(processor),
+          flow_size_(flow_size),
           max_seg_size_(max_seg_size),
-          key_(key),
+          topic_(std::move(topic)),
           client_(client),
           qos_(qos),
-          retain_(retain),
-          token_(token) {
-      status_ = 0;
-      read_size_ = 0;
-    }
-    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
-      if (flow_size_ < max_seg_size_)
-        max_seg_size_ = flow_size_;
-      gsl_Expects(max_seg_size_ < gsl::narrow<uint64_t>(std::numeric_limits<int>::max()));
-      std::vector<std::byte> buffer(max_seg_size_);
-      read_size_ = 0;
-      status_ = 0;
-      while (read_size_ < flow_size_) {
-        // MQTTClient_message::payloadlen is int, so we can't handle 2GB+
-        const auto readRet = stream->read(buffer);
-        if (io::isError(readRet)) {
-          status_ = -1;
-          return gsl::narrow<int64_t>(read_size_);
-        }
-        if (readRet > 0) {
-          MQTTClient_message pubmsg = MQTTClient_message_initializer;
-          pubmsg.payload = buffer.data();
-          pubmsg.payloadlen = gsl::narrow<int>(readRet);
-          pubmsg.qos = qos_;
-          pubmsg.retained = retain_;
-          if (MQTTClient_publishMessage(client_, key_.c_str(), &pubmsg, &token_) != MQTTCLIENT_SUCCESS) {
-            status_ = -1;
-            return -1;
-          }
-          read_size_ += gsl::narrow<size_t>(readRet);
-        } else {
-          break;
-        }
-      }
-      return gsl::narrow<int64_t>(read_size_);
+          retain_(retain) {
     }
+
+    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream);
+
+    size_t read_size_ = 0;
+    int status_ = 0;
+
+   private:
+    PublishMQTT* processor_;
     uint64_t flow_size_;
     uint64_t max_seg_size_;
-    std::string key_;
-    MQTTClient client_;
+    std::string topic_;
+    MQTTAsync client_;
 
-    int status_;
-    size_t read_size_;
     int qos_;
-    int retain_;
-    MQTTClient_deliveryToken &token_;
+    bool retain_;
   };
 
- public:
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override;
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
   void initialize() override;
 
  private:
-  uint64_t max_seg_size_;
-  bool retain_;
+  // MQTT async callback: context is the PublishMQTT* registered as the response context when the message was sent
+  static void sendSuccess(void* context, MQTTAsync_successData* response) {
+    auto* processor = static_cast<PublishMQTT*>(context);
+    processor->onSendSuccess(response);
+  }
+
+  // MQTT async callback: context is the PublishMQTT* registered as the response context when the message was sent
+  static void sendFailure(void* context, MQTTAsync_failureData* response) {
+    auto* processor = static_cast<PublishMQTT*>(context);
+    processor->onSendFailure(response);
+  }
+
+  void onSendSuccess(MQTTAsync_successData* /*response*/) {
+    logger_->log_debug("Successfully sent message to MQTT topic %s on broker %s", topic_, uri_);
+  }
+
+  void onSendFailure(MQTTAsync_failureData* response) {  // logs the failure; tolerates a null response (code is then logged as 0)
+    logger_->log_error("Sending message failed on topic %s to MQTT broker %s (%d)", topic_, uri_, response != nullptr ? response->code : 0);
+    if (response != nullptr && response->message != nullptr) {
+      logger_->log_error("Detailed reason for sending failure: %s", response->message);
+    }
+  }
+
+  bool getCleanSession() const override {
+    return true;
+  }
+
+  bool startupClient() override {
+    // no startup action (such as subscribing to a topic) is needed for publishing
+    return true;
+  }
+
+  bool retain_ = false;
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<PublishMQTT>::getLogger();
 };
 
diff --git a/extensions/mqtt/protocol/MQTTC2Protocol.cpp b/extensions/mqtt/protocol/MQTTC2Protocol.cpp
deleted file mode 100644
index 4c3132edf..000000000
--- a/extensions/mqtt/protocol/MQTTC2Protocol.cpp
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "MQTTC2Protocol.h"
-#include "core/Resource.h"
-#include "properties/Configuration.h"
-
-namespace org::apache::nifi::minifi::c2 {
-
-MQTTC2Protocol::MQTTC2Protocol(const std::string& name, const utils::Identifier& uuid)
-    : C2Protocol(name, uuid) {
-}
-
-MQTTC2Protocol::~MQTTC2Protocol() = default;
-
-void MQTTC2Protocol::initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<Configure> &configure) {
-  if (configure->get(minifi::Configuration::nifi_c2_mqtt_connector_service, controller_service_name_)) {
-    auto service = controller->getControllerService(controller_service_name_);
-    mqtt_service_ = std::static_pointer_cast<controllers::MQTTControllerService>(service);
-  } else {
-    mqtt_service_ = nullptr;
-  }
-
-  agent_identifier_ = configure->getAgentIdentifier();
-
-  std::stringstream outputStream;
-  std::string updateTopicOpt;
-  std::string heartbeatTopicOpt;
-  if (configure->get(minifi::Configuration::nifi_c2_mqtt_heartbeat_topic, heartbeatTopicOpt)) {
-    heartbeat_topic_ = heartbeatTopicOpt;
-  } else {
-    heartbeat_topic_ = "heartbeats";  // outputStream.str();
-  }
-  if (configure->get(minifi::Configuration::nifi_c2_mqtt_update_topic, updateTopicOpt)) {
-    update_topic_ = updateTopicOpt;
-  } else {
-    update_topic_ = "updates";
-  }
-
-  std::stringstream inputStream;
-  inputStream << agent_identifier_ << "/in";
-  in_topic_ = inputStream.str();
-
-  if (mqtt_service_) {
-    mqtt_service_->subscribeToTopic(in_topic_);
-  }
-}
-
-C2Payload MQTTC2Protocol::consumePayload(const std::string &url, const C2Payload &payload, Direction /*direction*/, bool /*async*/) {
-  // we are getting an update.
-  std::lock_guard<std::mutex> lock(input_mutex_);
-  io::BufferStream stream;
-  stream.write(in_topic_);
-  stream.write(url);
-  std::vector<std::byte> response;
-  auto transmit_id = mqtt_service_->send(update_topic_, stream.getBuffer());
-  if (transmit_id > 0 && mqtt_service_->awaitResponse(5000, transmit_id, in_topic_, response)) {
-    C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true);
-    response_payload.setRawData(response);
-    return response_payload;
-  } else {
-    return {payload.getOperation(), state::UpdateState::READ_COMPLETE};
-  }
-}
-
-C2Payload MQTTC2Protocol::serialize(const C2Payload &payload) {
-  if (mqtt_service_ == nullptr || !mqtt_service_->isRunning()) {
-    return {payload.getOperation(), state::UpdateState::READ_ERROR};
-  }
-
-  std::lock_guard<std::mutex> lock(input_mutex_);
-
-  auto stream = c2::PayloadSerializer::serialize(0x00, payload);
-
-  auto transmit_id = mqtt_service_->send(heartbeat_topic_, stream->getBuffer());
-  std::vector<std::byte> response;
-  if (transmit_id > 0 && mqtt_service_->awaitResponse(5000, transmit_id, in_topic_, response)) {
-    return c2::PayloadSerializer::deserialize(response);
-  }
-  return {payload.getOperation(), state::UpdateState::READ_ERROR};
-}
-
-REGISTER_RESOURCE(MQTTC2Protocol, InternalResource);
-
-}  // namespace org::apache::nifi::minifi::c2
diff --git a/extensions/mqtt/protocol/MQTTC2Protocol.h b/extensions/mqtt/protocol/MQTTC2Protocol.h
deleted file mode 100644
index 096f98275..000000000
--- a/extensions/mqtt/protocol/MQTTC2Protocol.h
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <algorithm>
-#include <iostream>
-#include <memory>
-#include <utility>
-#include <map>
-#include <string>
-#include <vector>
-
-#include "../controllerservice/MQTTControllerService.h"
-#include "c2/C2Protocol.h"
-#include "io/BaseStream.h"
-#include "agent/agent_version.h"
-#include "c2/PayloadSerializer.h"
-
-namespace org::apache::nifi::minifi::c2 {
-
-/**
- * Purpose: Implementation of the MQTT C2 protocol. Serializes messages to and from
- * and mqtt server.
- */
-class MQTTC2Protocol : public C2Protocol {
- public:
-  explicit MQTTC2Protocol(const std::string& name, const utils::Identifier& uuid = {});
-
-  ~MQTTC2Protocol() override;
-
-  static auto properties() { return std::array<core::Property, 0>{}; }
-  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
-  EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
-
-  C2Payload consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) override;
-
-  C2Payload consumePayload(const C2Payload &payload, Direction /*direction*/, bool /*async*/) override {
-    return serialize(payload);
-  }
-
-  void update(const std::shared_ptr<Configure>& /*configure*/) override {
-    // no op.
-  }
-
-  void initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<Configure> &configure) override;
-
- protected:
-  C2Payload serialize(const C2Payload &payload);
-
-  std::mutex input_mutex_;
-  // input topic on which we will listen.
-  std::string in_topic_;
-  // agent identifier
-  std::string agent_identifier_;
-  // heartbeat topic name.
-  std::string heartbeat_topic_;
-  // update topic name.
-  std::string update_topic_;
-
-  // mqtt controller service reference.
-  std::shared_ptr<controllers::MQTTControllerService> mqtt_service_;
-  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<MQTTC2Protocol>::getLogger();
-  // mqtt controller serviec name.
-  std::string controller_service_name_;
-};
-
-}  // namespace org::apache::nifi::minifi::c2
diff --git a/extensions/mqtt/tests/CMakeLists.txt b/extensions/mqtt/tests/CMakeLists.txt
new file mode 100644
index 000000000..081867aea
--- /dev/null
+++ b/extensions/mqtt/tests/CMakeLists.txt
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+file(GLOB MQTT_TESTS  "*.cpp")  # every .cpp in this directory becomes its own test executable
+
+FOREACH(testfile ${MQTT_TESTS})
+	get_filename_component(testfilename "${testfile}" NAME_WE)
+	add_executable("${testfilename}" "${testfile}")
+	target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_CURRENT_SOURCE_DIR}/../../standard-processors")  # extensions/standard-processors, relative to extensions/mqtt/tests
+	target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_CURRENT_SOURCE_DIR}/../../../libminifi/test/")
+	createTests("${testfilename}")
+	target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
+	target_link_libraries(${testfilename} minifi-mqtt-extensions)
+	target_link_libraries(${testfilename} minifi-standard-processors)
+	add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
+ENDFOREACH()
+
+list(LENGTH MQTT_TESTS TEST_COUNT)
+message("-- Finished building ${TEST_COUNT} MQTT related test file(s)...")
diff --git a/extensions/mqtt/tests/ConsumeMQTTTests.cpp b/extensions/mqtt/tests/ConsumeMQTTTests.cpp
new file mode 100644
index 000000000..ac18118f7
--- /dev/null
+++ b/extensions/mqtt/tests/ConsumeMQTTTests.cpp
@@ -0,0 +1,84 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Catch.h"
+#include "TestBase.h"
+#include "../processors/ConsumeMQTT.h"
+
+namespace {
+struct Fixture {  // per-test-case setup used by the TEST_CASE_METHODs in this file
+  Fixture() {
+    LogTestController::getInstance().setDebug<minifi::processors::ConsumeMQTT>();  // enable debug-level logging for ConsumeMQTT
+    plan_ = testController_.createPlan();
+    consumeMqttProcessor_ = plan_->addProcessor("ConsumeMQTT", "consumeMqttProcessor");  // the only processor added to the plan
+  }
+
+  ~Fixture() {
+    LogTestController::getInstance().reset();  // reset the log controller when the test case ends
+  }
+
+  TestController testController_;
+  std::shared_ptr<TestPlan> plan_;
+  std::shared_ptr<core::Processor> consumeMqttProcessor_;  // processor under test; properties are set per test case
+};
+}  // namespace
+
+TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_EmptyTopic", "[consumeMQTTTest]") {
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI, "127.0.0.1:1883");
+  REQUIRE_THROWS_WITH(plan_->scheduleProcessor(consumeMqttProcessor_), Catch::EndsWith("Required property is empty: Topic"));
+}
+
+TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_EmptyBrokerURI", "[consumeMQTTTest]") {
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::Topic, "mytopic");
+  REQUIRE_THROWS_WITH(plan_->scheduleProcessor(consumeMqttProcessor_), Catch::EndsWith("Required property is empty: Broker URI"));
+}
+
+TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_DurableSessionWithoutID", "[consumeMQTTTest]") {
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI, "127.0.0.1:1883");
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::Topic, "mytopic");
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::QoS, "1");
+  consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::CleanSession, "false");
+
+  REQUIRE_THROWS_WITH(plan_->scheduleProcessor(consumeMqttProcessor_),
+    Catch::EndsWith("Processor must have a Client ID for durable (non-clean) sessions"));
+}
+
+TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_DurableSessionWithID", "[consumeMQTTTest]") {
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI, "127.0.0.1:1883");
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::ClientID, "subscriber");
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::Topic, "mytopic");
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::QoS, "1");
+  consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::CleanSession, "false");
+
+  REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+  REQUIRE_FALSE(LogTestController::getInstance().contains("[warning] Messages are not preserved during client disconnection "
+    "by the broker when QoS is less than 1 for durable (non-clean) sessions. Only subscriptions are preserved."));
+}
+
+TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_DurableSessionWithQoS0", "[consumeMQTTTest]") {
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI, "127.0.0.1:1883");
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::ClientID, "subscriber");
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::Topic, "mytopic");
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::QoS, "0");
+  consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::CleanSession, "false");
+
+  REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+
+  REQUIRE(LogTestController::getInstance().contains("[warning] Messages are not preserved during client disconnection "
+    "by the broker when QoS is less than 1 for durable (non-clean) sessions. Only subscriptions are preserved."));
+}
diff --git a/extensions/mqtt/tests/PublishMQTTTests.cpp b/extensions/mqtt/tests/PublishMQTTTests.cpp
new file mode 100644
index 000000000..40ab25a4a
--- /dev/null
+++ b/extensions/mqtt/tests/PublishMQTTTests.cpp
@@ -0,0 +1,51 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Catch.h"
+#include "TestBase.h"
+#include "../processors/PublishMQTT.h"
+
+namespace {
+struct Fixture {
+  Fixture() {
+    LogTestController::getInstance().setDebug<minifi::processors::PublishMQTT>();
+    plan_ = testController_.createPlan();
+    publishMqttProcessor_ = plan_->addProcessor("PublishMQTT", "publishMqttProcessor");
+  }
+
+  ~Fixture() {
+    LogTestController::getInstance().reset();
+  }
+
+  TestController testController_;
+  std::shared_ptr<TestPlan> plan_;
+  std::shared_ptr<core::Processor> publishMqttProcessor_;
+};
+}  // namespace
+
+TEST_CASE_METHOD(Fixture, "PublishMQTTTest_EmptyTopic", "[publishMQTTTest]") {
+  publishMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI, "127.0.0.1:1883");
+  REQUIRE_THROWS_WITH(plan_->scheduleProcessor(publishMqttProcessor_), Catch::EndsWith("Required property is empty: Topic"));
+  LogTestController::getInstance().reset();
+}
+
+TEST_CASE_METHOD(Fixture, "PublishMQTTTest_EmptyBrokerURI", "[publishMQTTTest]") {
+  publishMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::Topic, "mytopic");
+  REQUIRE_THROWS_WITH(plan_->scheduleProcessor(publishMqttProcessor_), Catch::EndsWith("Required property is empty: Broker URI"));
+  LogTestController::getInstance().reset();
+}
diff --git a/extensions/pdh/tests/CMakeLists.txt b/extensions/pdh/tests/CMakeLists.txt
index f8de542c0..3ffd79ca4 100644
--- a/extensions/pdh/tests/CMakeLists.txt
+++ b/extensions/pdh/tests/CMakeLists.txt
@@ -26,7 +26,6 @@ FOREACH(testfile ${PDH_TESTS})
     target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/standard-processors")
     target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/pdh")
     target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/libminifi/test/")
-    target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/standard-processors")
     createTests("${testfilename}")
     target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
     target_link_libraries(${testfilename} minifi-pdh)
diff --git a/libminifi/include/core/Property.h b/libminifi/include/core/Property.h
index 5ba79d1b4..205151b84 100644
--- a/libminifi/include/core/Property.h
+++ b/libminifi/include/core/Property.h
@@ -41,7 +41,7 @@ class Property {
    * as they will get coerced to the bool true and false, causing
    * further overwrites to inherit the bool validator.
    */
-  Property(std::string name, std::string description, std::string value, bool is_required, std::string valid_regex, std::vector<std::string> dependent_properties,
+  Property(std::string name, std::string description, const std::string& value, bool is_required, std::string valid_regex, std::vector<std::string> dependent_properties,
            std::vector<std::pair<std::string, std::string>> exclusive_of_properties)
       : name_(std::move(name)),
         description_(std::move(description)),
@@ -55,9 +55,9 @@ class Property {
     default_value_ = coerceDefaultValue(value);
   }
 
-  Property(const std::string name, const std::string description, std::string value)
-      : name_(name),
-        description_(description),
+  Property(std::string name, std::string description, const std::string& value)
+      : name_(std::move(name)),
+        description_(std::move(description)),
         is_required_(false),
         is_collection_(false),
         supports_el_(false),
@@ -65,9 +65,9 @@ class Property {
     default_value_ = coerceDefaultValue(value);
   }
 
-  Property(const std::string name, const std::string description)
-      : name_(name),
-        description_(description),
+  Property(std::string name, std::string description)
+      : name_(std::move(name)),
+        description_(std::move(description)),
         is_required_(false),
         is_collection_(true),
         supports_el_(false),
@@ -78,9 +78,7 @@ class Property {
   Property(const Property &other) = default;
 
   Property()
-      : name_(""),
-        description_(""),
-        is_required_(false),
+      : is_required_(false),
         is_collection_(false),
         supports_el_(false),
         is_transient_(false) {}
diff --git a/libminifi/test/mqtt-tests/CMakeLists.txt b/libminifi/test/mqtt-tests/CMakeLists.txt
deleted file mode 100644
index 55e4437b1..000000000
--- a/libminifi/test/mqtt-tests/CMakeLists.txt
+++ /dev/null
@@ -1,36 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-file(GLOB KAFKA_INTEGRATION_TESTS  "*.cpp")
-
-SET(EXTENSIONS_TEST_COUNT 0)
-FOREACH(testfile ${KAFKA_INTEGRATION_TESTS})
-    get_filename_component(testfilename "${testfile}" NAME_WE)
-    add_executable("${testfilename}" "${testfile}")
-    target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/librdkafka")
-    target_include_directories(${testfilename} SYSTEM BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/librdkafka-0.11.1/src")
-    target_include_directories(${testfilename} SYSTEM BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/librdkafka-0.11.1/src-cpp")
-    createTests("${testfilename}")
-    target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
-    target_link_libraries(${testfilename} minifi-mqtt-extensions)
-    target_link_libraries(${testfilename} minifi-standard-processors)
-    MATH(EXPR EXTENSIONS_TEST_COUNT "${EXTENSIONS_TEST_COUNT}+1")
-    add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
-ENDFOREACH()
-message("-- Finished building ${KAFKA-EXTENSIONS_TEST_COUNT} Lib Kafka related test file(s)...")


[nifi-minifi-cpp] 01/03: MINIFICPP-1907 Add description to monadic operation wrappers

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lordgamez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit faa16d683a9e2298dbda9e794d2edf46d7d876ef
Author: Marton Szasz <sz...@apache.org>
AuthorDate: Tue Aug 16 02:16:54 2022 +0200

    MINIFICPP-1907 Add description to monadic operation wrappers
    
    Signed-off-by: Gabor Gyimesi <ga...@gmail.com>
    
    This closes #1389
---
 .../utils/detail/MonadicOperationWrappers.h        | 29 ++++++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/libminifi/include/utils/detail/MonadicOperationWrappers.h b/libminifi/include/utils/detail/MonadicOperationWrappers.h
index 6548ef4ed..f8e088621 100644
--- a/libminifi/include/utils/detail/MonadicOperationWrappers.h
+++ b/libminifi/include/utils/detail/MonadicOperationWrappers.h
@@ -46,18 +46,47 @@ struct filter_wrapper {
 };
 }  // namespace detail
 
+/**
+ * Transforms a wrapped value of T by calling the provided function T -> U, returning wrapped U.
+ * @param func Function that takes the unwrapped value (T) and returns a new value (U) to be wrapped.
+ * @see flatMap if your transformation function itself returns a wrapped value
+ * @return Wrapped result of func
+ */
 template<typename T>
 detail::map_wrapper<T&&> map(T&& func) noexcept { return {std::forward<T>(func)}; }
 
+/**
+ * Transforms a wrapped value of T by calling the provided function T -> wrapped U, returning wrapped U.
+ * @param func Function that takes the unwrapped value (T) and itself returns a new wrapped value (wrapped U).
+ * @return The wrapped U produced by func, without an additional layer of wrapping
+ */
 template<typename T>
 detail::flat_map_wrapper<T&&> flatMap(T&& func) noexcept { return {std::forward<T>(func)}; }
 
+/**
+ * For optional-like types, possibly provides a value for an empty object to be replaced with.
+ * @param func A value (function with no parameters) to replace a missing value with, or an action to be executed if the value is missing.
+ *     The value must be wrapped, and of the same type as the contained value.
+ * @see valueOrElse if the new value is always present, i.e. the function returns an unwrapped value.
+ * @return The old value if present, or the new value if missing.
+ */
 template<typename T>
 detail::or_else_wrapper<T&&> orElse(T&& func) noexcept { return {std::forward<T>(func)}; }
 
+/**
+ * For optional-like types, returns the present value or the provided value if missing.
+ * @param func A value (function with no parameters) to replace a missing value with. The value must not be wrapped.
+ * @see orElse if the new value may not be present, i.e. the function returns a wrapped value.
+ * @return The old value if present, or the new value if missing. Like std::optional::value_or, but lazily evaluated
+ */
 template<typename T>
 detail::value_or_else_wrapper<T&&> valueOrElse(T&& func) noexcept { return {std::forward<T>(func)}; }
 
+/**
+ * For optional-like types, only keeps the present value if it satisfies the predicate.
+ * @param func A predicate to filter on
+ * @return The value if it was present and satisfied the predicate, empty otherwise.
+ */
 template<typename T>
 detail::filter_wrapper<T&&> filter(T&& func) noexcept { return {std::forward<T>(func)}; }
 }  // namespace org::apache::nifi::minifi::utils


[nifi-minifi-cpp] 02/03: MINIFICPP-1905 - Enumerate only relevant subdir contents

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lordgamez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit e31ee031bf43fb220ea62b7b6b4f09f45f09668d
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Mon Aug 15 11:28:30 2022 +0200

    MINIFICPP-1905 - Enumerate only relevant subdir contents
    
    Signed-off-by: Gabor Gyimesi <ga...@gmail.com>
    
    This closes #1386
---
 libminifi/include/utils/file/FileUtils.h | 21 +++++---------------
 libminifi/test/unit/FilePatternTests.cpp | 33 ++++++++++++++++++++++++++++++++
 2 files changed, 38 insertions(+), 16 deletions(-)

diff --git a/libminifi/include/utils/file/FileUtils.h b/libminifi/include/utils/file/FileUtils.h
index 6ba8a3654..92a6884b2 100644
--- a/libminifi/include/utils/file/FileUtils.h
+++ b/libminifi/include/utils/file/FileUtils.h
@@ -14,8 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_UTILS_FILE_FILEUTILS_H_
-#define LIBMINIFI_INCLUDE_UTILS_FILE_FILEUTILS_H_
+
+#pragma once
 
 #include <filesystem>
 #include <fstream>
@@ -80,12 +80,7 @@
 #endif
 
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace utils {
-namespace file {
+namespace org::apache::nifi::minifi::utils::file {
 
 namespace FileUtils = ::org::apache::nifi::minifi::utils::file;
 
@@ -373,7 +368,7 @@ inline void list_dir(const std::string& dir,
     std::string path = entry.path().string();
 
     if (utils::file::is_directory(path)) {  // if this is a directory
-      if (dir_callback(dir)) {
+      if (dir_callback(path)) {
         list_dir(path, callback, logger, dir_callback);
       }
     } else {
@@ -756,10 +751,4 @@ inline std::optional<std::string> get_relative_path(const std::string& path, con
   return std::filesystem::relative(path, base_path).string();
 }
 
-}  // namespace file
-}  // namespace utils
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-#endif  // LIBMINIFI_INCLUDE_UTILS_FILE_FILEUTILS_H_
+}  // namespace org::apache::nifi::minifi::utils::file
diff --git a/libminifi/test/unit/FilePatternTests.cpp b/libminifi/test/unit/FilePatternTests.cpp
index 7ce45cb19..e0c9fd70c 100644
--- a/libminifi/test/unit/FilePatternTests.cpp
+++ b/libminifi/test/unit/FilePatternTests.cpp
@@ -250,3 +250,36 @@ TEST_CASE("Excluding with directory wildcard exclusion") {
   REQUIRE_NOT_MATCHING(pattern.match(root / "one" / "six/"));
   REQUIRE_EXCLUDE(pattern.match(root / "one" / "ten" / "file.txt"));
 }
+
+TEST_CASE("Check only relevant subdir contents") {
+  TestController controller;
+  std::filesystem::path root{controller.createTempDirectory()};
+  // root
+  //   |- file1.txt
+  //   |- one
+  //   |   |- file2.txt
+  //   |
+  //   |- two
+  //       |- file3.txt
+  minifi::utils::file::create_dir((root / "one").string(), true);
+  minifi::utils::file::create_dir((root / "two").string(), true);
+
+  auto file1 = root / "file1.txt";
+  auto file2 = root / "one" / "file2.txt";
+  auto file3 = root / "two" / "file3.txt";
+
+  for (const auto& file : {file1, file2, file3}) {std::ofstream{file};}
+
+  std::set<std::filesystem::path> checked_files;
+  auto file_cb = [&] (const std::string& dir, const std::string& file) -> bool {
+    checked_files.insert(utils::file::concat_path(dir, file));
+    return true;
+  };
+  auto dir_cb = [&] (const std::string& dir) -> bool {
+    // only check the "one" subdir
+    return std::filesystem::path{dir} == root / "one";
+  };
+  utils::file::list_dir(root.string(), file_cb, controller.getLogger(), dir_cb);
+
+  REQUIRE((checked_files == std::set<std::filesystem::path>{file1, file2}));
+}