You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2023/02/07 12:52:31 UTC

[nifi-minifi-cpp] 04/04: MINIFICPP-1840 - Add support for MQTT 5 Closes #1432

This is an automated email from the ASF dual-hosted git repository.

martinzink pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit e9ca206f91a09476e2b83bf2a484736ec2ce2f70
Author: Adam Markovics <nu...@gmail.com>
AuthorDate: Tue Feb 7 11:19:50 2023 +0100

    MINIFICPP-1840 - Add support for MQTT 5
    Closes #1432
    
    Co-authored-by: Ferenc Gerlits <fg...@gmail.com>
    Signed-off-by: Martin Zink <ma...@apache.org>
---
 PROCESSORS.md                                      | 104 +++---
 docker/test/integration/features/mqtt.feature      | 253 +++++++++++--
 docker/test/integration/steps/steps.py             |  11 +-
 .../mqtt/processors/AbstractMQTTProcessor.cpp      | 394 ++++++++++++++++++---
 extensions/mqtt/processors/AbstractMQTTProcessor.h | 249 +++++++------
 .../AbstractMQTTProcessorStaticDefinitions.cpp     |  66 +++-
 extensions/mqtt/processors/ConsumeMQTT.cpp         | 356 +++++++++++++++----
 extensions/mqtt/processors/ConsumeMQTT.h           | 145 +++++---
 extensions/mqtt/processors/PublishMQTT.cpp         | 287 ++++++++++++---
 extensions/mqtt/processors/PublishMQTT.h           | 149 +++++---
 extensions/mqtt/tests/ConsumeMQTTTests.cpp         | 106 +++++-
 extensions/mqtt/tests/PublishMQTTTests.cpp         |  29 +-
 libminifi/include/core/ConfigurableComponent.h     |  17 +-
 libminifi/include/utils/Enum.h                     |   5 +-
 libminifi/src/core/ConfigurableComponent.cpp       |  25 +-
 15 files changed, 1699 insertions(+), 497 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 97b659854..2e91293ee 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -338,28 +338,34 @@ This Processor gets the contents of a FlowFile from a MQTT broker for a specifie
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name                  | Default Value | Allowable Values | Description                                                                                                                 |
-|-----------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------|
-| **Broker URI**        |               |                  | The URI to use to connect to the MQTT broker                                                                                |
-| **Topic**             |               |                  | The topic to subscribe to                                                                                                   |
-| Client ID             |               |                  | MQTT client ID to use                                                                                                       |
-| Quality of Service    | 0             |                  | The Quality of Service (QoS) to receive the message with. Accepts three values '0', '1' and '2'                             |
-| Connection Timeout    | 30 sec        |                  | Maximum time interval the client will wait for the network connection to the MQTT broker                                    |
-| Keep Alive Interval   | 60 sec        |                  | Defines the maximum time interval between messages being sent to the broker                                                 |
-| Max Flow Segment Size |               |                  | Maximum flow content payload segment size for the MQTT record                                                               |
-| Last Will Topic       |               |                  | The topic to send the client's Last Will to. If the Last Will topic is not set then a Last Will will not be sent            |
-| Last Will Message     |               |                  | The message to send as the client's Last Will. If the Last Will Message is empty, Last Will will be deleted from the broker |
-| Last Will QoS         | 0             |                  | The Quality of Service (QoS) to send the last will with. Accepts three values '0', '1' and '2'                              |
-| Last Will Retain      | false         |                  | Whether to retain the client's Last Will                                                                                    |
-| Security Protocol     |               |                  | Protocol used to communicate with brokers                                                                                   |
-| Security CA           |               |                  | File or directory path to CA certificate(s) for verifying the broker's key                                                  |
-| Security Cert         |               |                  | Path to client's public key (PEM) used for authentication                                                                   |
-| Security Private Key  |               |                  | Path to client's private key (PEM) used for authentication                                                                  |
-| Security Pass Phrase  |               |                  | Private key passphrase                                                                                                      |
-| Username              |               |                  | Username to use when connecting to the broker                                                                               |
-| Password              |               |                  | Password to use when connecting to the broker                                                                               |
-| Clean Session         | true          |                  | Whether to start afresh rather than remembering previous subscriptions                                                      |
-| Queue Max Message     | 1000          |                  | Maximum number of messages allowed on the received MQTT queue                                                               |
+| Name                        | Default Value | Allowable Values            | Description                                                                                                                                               |
+|-----------------------------|---------------|-----------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Broker URI**              |               |                             | The URI to use to connect to the MQTT broker                                                                                                              |
+| Client ID                   |               |                             | MQTT client ID to use. WARNING: Must not be empty when using MQTT 3.1.0!                                                                                  |
+| MQTT Version                | 3.x AUTO      | 3.x AUTO, 3.1.0, 3.1.1, 5.0 | The MQTT specification version when connecting to the broker.                                                                                             |
+| **Topic**                   |               |                             | The topic to subscribe to.                                                                                                                                |
+| Clean Session               | true          |                             | Whether to start afresh rather than remembering previous subscriptions. If true, the broker forgets subscriptions after disconnection. MQTT 3.x only.     |
+| Clean Start                 | true          |                             | Whether to start afresh rather than remembering previous subscriptions. MQTT 5.x only.                                                                    |
+| Session Expiry Interval     | 0 s           |                             | Time to delete session on broker after client is disconnected. MQTT 5.x only.                                                                             |
+| Queue Max Message           | 1000          |                             | Maximum number of messages allowed on the received MQTT queue                                                                                             |
+| Attribute From Content Type |               |                             | Name of FlowFile attribute to be filled from content type of received message. MQTT 5.x only.                                                             |
+| Topic Alias Maximum         | 0             |                             | Maximum number of topic aliases to use. If set to 0, then topic aliases cannot be used. MQTT 5.x only.                                                    |
+| Receive Maximum             | 65535         |                             | Maximum number of unacknowledged messages allowed. MQTT 5.x only.                                                                                         |
+| Quality of Service          | 0             | 0, 1 and 2                  | The Quality of Service (QoS) to receive the message with.                                                                                                 |
+| Connection Timeout          | 10 sec        |                             | Maximum time interval the client will wait for the network connection to the MQTT broker                                                                  |
+| Keep Alive Interval         | 60 sec        |                             | Defines the maximum time interval between messages being received from the broker                                                                         |
+| Last Will Topic             |               |                             | The topic to send the client's Last Will to. If the Last Will topic is not set then a Last Will will not be sent                                          |
+| Last Will Message           |               |                             | The message to send as the client's Last Will. If the Last Will Message is empty, Last Will will be deleted from the broker                               |
+| Last Will QoS               | 0             | 0, 1 and 2                  | The Quality of Service (QoS) to send the last will with.                                                                                                  |
+| Last Will Retain            | false         |                             | Whether to retain the client's Last Will                                                                                                                  |
+| Last Will Content Type      |               |                             | Content type of the client's Last Will. MQTT 5.x only.                                                                                                    |
+| Username                    |               |                             | Username to use when connecting to the broker                                                                                                             |
+| Password                    |               |                             | Password to use when connecting to the broker                                                                                                             |
+| Security Protocol           |               |                             | Protocol used to communicate with brokers                                                                                                                 |
+| Security CA                 |               |                             | File or directory path to CA certificate(s) for verifying the broker's key                                                                                |
+| Security Cert               |               |                             | Path to client's public key (PEM) used for authentication                                                                                                 |
+| Security Private Key        |               |                             | Path to client's private key (PEM) used for authentication                                                                                                |
+| Security Pass Phrase        |               |                             | Private key passphrase                                                                                                                                    |
 
 
 ### Relationships
@@ -369,6 +375,13 @@ In the list below, the names of required properties appear in bold. Any other pr
 | success | FlowFiles that are sent successfully to the destination are transferred to this relationship |
 
 
+### Output Attributes
+
+| Attribute     | Description               |
+|---------------|---------------------------|
+| _mqtt.broker_ | URI of the sending broker |
+| _mqtt.topic_  | Topic of the message      |
+
 ## ConsumeWindowsEventLog
 
 ### Description
@@ -1842,27 +1855,30 @@ PublishMQTT serializes FlowFile content as an MQTT payload, sending the message
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name                  | Default Value | Allowable Values | Description                                                                                                                 |
-|-----------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------|
-| **Broker URI**        |               |                  | The URI to use to connect to the MQTT broker                                                                                |
-| **Topic**             |               |                  | The topic to publish to                                                                                                     |
-| Client ID             |               |                  | MQTT client ID to use                                                                                                       |
-| Quality of Service    | 0             |                  | The Quality of Service (QoS) to send the message with. Accepts three values '0', '1' and '2'                                |
-| Connection Timeout    | 30 sec        |                  | Maximum time interval the client will wait for the network connection to the MQTT broker                                    |
-| Keep Alive Interval   | 60 sec        |                  | Defines the maximum time interval between messages being sent to the broker                                                 |
-| Max Flow Segment Size |               |                  | Maximum flow content payload segment size for the MQTT record                                                               |
-| Last Will Topic       |               |                  | The topic to send the client's Last Will to. If the Last Will topic is not set then a Last Will will not be sent            |
-| Last Will Message     |               |                  | The message to send as the client's Last Will. If the Last Will Message is empty, Last Will will be deleted from the broker |
-| Last Will QoS         | 0             |                  | The Quality of Service (QoS) to send the last will with. Accepts three values '0', '1' and '2'                              |
-| Last Will Retain      | false         |                  | Whether to retain the client's Last Will                                                                                    |
-| Security Protocol     |               |                  | Protocol used to communicate with brokers                                                                                   |
-| Security CA           |               |                  | File or directory path to CA certificate(s) for verifying the broker's key                                                  |
-| Security Cert         |               |                  | Path to client's public key (PEM) used for authentication                                                                   |
-| Security Private Key  |               |                  | Path to client's private key (PEM) used for authentication                                                                  |
-| Security Pass Phrase  |               |                  | Private key passphrase                                                                                                      |
-| Username              |               |                  | Username to use when connecting to the broker                                                                               |
-| Password              |               |                  | Password to use when connecting to the broker                                                                               |
-| Retain                | false         |                  | Retain published message in broker                                                                                          |
+| Name                    | Default Value | Allowable Values            | Description                                                                                                                 |
+|-------------------------|---------------|-----------------------------|-----------------------------------------------------------------------------------------------------------------------------|
+| **Broker URI**          |               |                             | The URI to use to connect to the MQTT broker                                                                                |
+| Client ID               |               |                             | MQTT client ID to use. WARNING: Must not be empty when using MQTT 3.1.0!                                                    |
+| MQTT Version            | 3.x AUTO      | 3.x AUTO, 3.1.0, 3.1.1, 5.0 | The MQTT specification version when connecting to the broker.                                                               |
+| **Topic**               |               |                             | The topic to publish to. <br/>**Supports Expression Language: true**                                                        |
+| Retain                  | false         |                             | Retain published message in broker                                                                                          |
+| Message Expiry Interval |               |                             | Time while message is valid and will be forwarded by broker. MQTT 5.x only.                                                 |
+| Content Type            |               |                             | Content type of the message. MQTT 5.x only. <br/>**Supports Expression Language: true**                                     |
+| Quality of Service      | 0             | 0, 1 and 2                  | The Quality of Service (QoS) to send the message with.                                                                      |
+| Connection Timeout      | 10 sec        |                             | Maximum time interval the client will wait for the network connection to the MQTT broker                                    |
+| Keep Alive Interval     | 60 sec        |                             | Defines the maximum time interval between messages being sent to the broker                                                 |
+| Last Will Topic         |               |                             | The topic to send the client's Last Will to. If the Last Will topic is not set then a Last Will will not be sent            |
+| Last Will Message       |               |                             | The message to send as the client's Last Will. If the Last Will Message is empty, Last Will will be deleted from the broker |
+| Last Will QoS           | 0             | 0, 1 and 2                  | The Quality of Service (QoS) to send the last will with.                                                                    |
+| Last Will Retain        | false         |                             | Whether to retain the client's Last Will                                                                                    |
+| Last Will Content Type  |               |                             | Content type of the client's Last Will. MQTT 5.x only.                                                                      |
+| Username                |               |                             | Username to use when connecting to the broker                                                                               |
+| Password                |               |                             | Password to use when connecting to the broker                                                                               |
+| Security Protocol       |               |                             | Protocol used to communicate with brokers                                                                                   |
+| Security CA             |               |                             | File or directory path to CA certificate(s) for verifying the broker's key                                                  |
+| Security Cert           |               |                             | Path to client's public key (PEM) used for authentication                                                                   |
+| Security Private Key    |               |                             | Path to client's private key (PEM) used for authentication                                                                  |
+| Security Pass Phrase    |               |                             | Private key passphrase                                                                                                      |
 
 
 ### Relationships
@@ -2400,6 +2416,8 @@ In the list below, the names of required properties appear in bold. Any other pr
 ### Description
 
 Routes FlowFiles based on their Attributes using the Attribute Expression Language.
+Any number of user-defined dynamic properties can be added, which all support the Attribute Expression Language. Relationships matching the name of the properties will be added.
+FlowFiles will be routed to all the relationships whose matching property evaluates to "true". Unmatched FlowFiles will be routed to the "unmatched" relationship, while failed ones to "failure".
 ### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
diff --git a/docker/test/integration/features/mqtt.feature b/docker/test/integration/features/mqtt.feature
index 1f5204875..2d64c04af 100644
--- a/docker/test/integration/features/mqtt.feature
+++ b/docker/test/integration/features/mqtt.feature
@@ -21,10 +21,11 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
   Background:
     Given the content of "/tmp/output" is monitored
 
-  Scenario: A MiNiFi instance transfers data to an MQTT broker
+  Scenario Outline: A MiNiFi instance transfers data to an MQTT broker
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
     And a file with the content "test" is present in "/tmp/input"
     And a PublishMQTT processor set up to communicate with an MQTT broker instance
+    And the "MQTT Version" property of the PublishMQTT processor is set to "<version>"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
     And the "success" relationship of the GetFile processor is connected to the PublishMQTT
     And the "success" relationship of the PublishMQTT processor is connected to the PutFile
@@ -35,10 +36,16 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
     Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
     And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)"
 
-  Scenario: If the MQTT broker does not exist, then no flow files are processed
+    Examples: MQTT versions
+    | version  |
+    | 3.x AUTO |
+    | 5.0      |
+
+  Scenario Outline: If the MQTT broker does not exist, then no flow files are processed
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
     And a file with the content "test" is present in "/tmp/input"
     And a PublishMQTT processor set up to communicate with an MQTT broker instance
+    And the "MQTT Version" property of the PublishMQTT processor is set to "<version>"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
     And the "success" relationship of the GetFile processor is connected to the PublishMQTT
     And the "success" relationship of the PublishMQTT processor is connected to the PutFile
@@ -47,10 +54,16 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
     When the MiNiFi instance starts up
     Then no files are placed in the monitored directory in 30 seconds of running time
 
-  Scenario: Verify delivery of message when MQTT broker is unstable
+    Examples: MQTT versions
+    | version  |
+    | 3.x AUTO |
+    | 5.0      |
+
+  Scenario Outline: Verify delivery of message when MQTT broker is unstable
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
     And a file with the content "test" is present in "/tmp/input"
     And a PublishMQTT processor set up to communicate with an MQTT broker instance
+    And the "MQTT Version" property of the PublishMQTT processor is set to "<version>"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
     And the "success" relationship of the GetFile processor is connected to the PublishMQTT
     And the "success" relationship of the PublishMQTT processor is connected to the PutFile
@@ -62,12 +75,19 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
     And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
     And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)"
 
-  Scenario: A MiNiFi instance publishes and consumes data to/from an MQTT broker
+    Examples: MQTT versions
+    | version  |
+    | 3.x AUTO |
+    | 5.0      |
+
+  Scenario Outline: A MiNiFi instance publishes and consumes data to/from an MQTT broker
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
     And a PublishMQTT processor set up to communicate with an MQTT broker instance
+    And the "MQTT Version" property of the PublishMQTT processor is set to "<version>"
     And the "success" relationship of the GetFile processor is connected to the PublishMQTT
 
     And a ConsumeMQTT processor set up to communicate with an MQTT broker instance
+    And the "MQTT Version" property of the ConsumeMQTT processor is set to "<version>"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
     And "ConsumeMQTT" processor is a start node
     And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
@@ -81,14 +101,23 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
     And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
     And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)"
 
+    Examples: MQTT versions
+    | version  |
+    | 3.x AUTO |
+    | 3.1.0    |
+    | 3.1.1    |
+    | 5.0      |
+
   Scenario Outline: Subscription to topics with wildcards
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
     And a PublishMQTT processor set up to communicate with an MQTT broker instance
     And the "Topic" property of the PublishMQTT processor is set to "test/my/topic"
+    And the "MQTT Version" property of the PublishMQTT processor is set to "<version>"
     And the "success" relationship of the GetFile processor is connected to the PublishMQTT
 
     And a ConsumeMQTT processor set up to communicate with an MQTT broker instance
     And the "Topic" property of the ConsumeMQTT processor is set to "<topic wildcard pattern>"
+    And the "MQTT Version" property of the ConsumeMQTT processor is set to "<version>"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
     And "ConsumeMQTT" processor is a start node
     And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
@@ -103,21 +132,26 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
     And the MQTT broker has a log line matching "Received PUBLISH from .*test/my/topic.*\(4 bytes\)"
 
     Examples: Topic wildcard patterns
-    | topic wildcard pattern |
-    | test/#                 |
-    | test/+/topic           |
+    | topic wildcard pattern | version  |
+    | test/#                 | 3.x AUTO |
+    | test/+/topic           | 3.x AUTO |
+    | test/#                 | 5.0      |
+    | test/+/topic           | 5.0      |
 
-  Scenario: Subscription and publishing with disconnecting clients in persistent sessions
+  Scenario Outline: Subscription and publishing with disconnecting clients in persistent sessions
     # publishing MQTT client
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow
     And a PublishMQTT processor in the "publisher-client" flow
+    And the "MQTT Version" property of the PublishMQTT processor is set to "<version>"
     And the "Quality of Service" property of the PublishMQTT processor is set to "1"
     And the "success" relationship of the GetFile processor is connected to the PublishMQTT
 
     # consuming MQTT client
     And a ConsumeMQTT processor in the "consumer-client" flow
+    And the "MQTT Version" property of the ConsumeMQTT processor is set to "<version>"
     And the "Quality of Service" property of the ConsumeMQTT processor is set to "1"
-    And the "Clean Session" property of the ConsumeMQTT processor is set to "false"
+    And the "<persistent_session_property_1>" property of the ConsumeMQTT processor is set to "<persistent_session_property_1_value>"
+    And the "<persistent_session_property_2>" property of the ConsumeMQTT processor is set to "<persistent_session_property_2_value>"
     And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow
     And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
 
@@ -135,13 +169,20 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
     And the MQTT broker has 2 log lines matching "New client connected from .* as consumer-client"
     And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
 
+    Examples: MQTT versions
+    | version  | persistent_session_property_1 | persistent_session_property_1_value | persistent_session_property_2 | persistent_session_property_2_value |
+    | 3.x AUTO | Clean Session                 |  false                              | Clean Session                 | false                               |
+    | 5.0      | Clean Start                   |  false                              | Session Expiry Interval       | 1 h                                 |
+
   Scenario Outline: UTF-8 topics and messages
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
     And a PublishMQTT processor set up to communicate with an MQTT broker instance
+    And the "MQTT Version" property of the PublishMQTT processor is set to "<version>"
     And the "Topic" property of the PublishMQTT processor is set to "<topic>"
     And the "success" relationship of the GetFile processor is connected to the PublishMQTT
 
     And a ConsumeMQTT processor set up to communicate with an MQTT broker instance
+    And the "MQTT Version" property of the ConsumeMQTT processor is set to "<version>"
     And the "Topic" property of the ConsumeMQTT processor is set to "<topic>"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
     And "ConsumeMQTT" processor is a start node
@@ -156,21 +197,24 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
     And a flowfile with the content "<message>" is placed in the monitored directory in less than 60 seconds
     And the MQTT broker has a log line matching "Received PUBLISH from .*<topic>"
 
-    Examples: Topic wildcard patterns
-    | topic                  | message     |
-    | Лев Николаевич Толстой | Война и мир |
-    | 孙子                    | 孫子兵法     |
-    | محمد بن موسی خوارزمی   | ٱلْجَبْر       |
-    | תַּלְמוּד                  | תּוֹרָה        |
+    Examples: Topics, messages and version
+    | topic                  | message     | version  |
+    | Лев Николаевич Толстой | Война и мир | 3.x AUTO |
+    | 孙子                    | 孫子兵法     | 3.x AUTO |
+    | 孙子                    | 孫子兵法     | 5.0      |
+    | محمد بن موسی خوارزمی   | ٱلْجَبْر       | 3.x AUTO |
+    | תַּלְמוּד                  | תּוֹרָה        | 3.x AUTO |
 
 
-  Scenario: QoS 0 message flow is correct
+  Scenario Outline: QoS 0 message flow is correct
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
     And a PublishMQTT processor set up to communicate with an MQTT broker instance
+    And the "MQTT Version" property of the PublishMQTT processor is set to "<version>"
     And the "Quality of Service" property of the PublishMQTT processor is set to "0"
     And the "success" relationship of the GetFile processor is connected to the PublishMQTT
 
     And a ConsumeMQTT processor set up to communicate with an MQTT broker instance
+    And the "MQTT Version" property of the ConsumeMQTT processor is set to "<version>"
     And the "Quality of Service" property of the ConsumeMQTT processor is set to "0"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
     And "ConsumeMQTT" processor is a start node
@@ -186,13 +230,20 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
     And the MQTT broker has a log line matching "Received PUBLISH from publisher-client \(d0, q0, r0, m0, 'testtopic'.*\(4 bytes\)"
     And the MQTT broker has a log line matching "Sending PUBLISH to consumer-client \(d0, q0, r0, m0, 'testtopic',.*\(4 bytes\)\)"
 
-  Scenario: QoS 1 Subscriber sends PUBACK on a PUBLISH message, with correct packet ID
+    Examples: MQTT versions
+    | version  |
+    | 3.x AUTO |
+    | 5.0      |
+
+  Scenario Outline: QoS 1 Subscriber sends PUBACK on a PUBLISH message, with correct packet ID
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
     And a PublishMQTT processor set up to communicate with an MQTT broker instance
+    And the "MQTT Version" property of the PublishMQTT processor is set to "<version>"
     And the "Quality of Service" property of the PublishMQTT processor is set to "1"
     And the "success" relationship of the GetFile processor is connected to the PublishMQTT
 
     And a ConsumeMQTT processor set up to communicate with an MQTT broker instance
+    And the "MQTT Version" property of the ConsumeMQTT processor is set to "<version>"
     And the "Quality of Service" property of the ConsumeMQTT processor is set to "1"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
     And "ConsumeMQTT" processor is a start node
@@ -210,14 +261,20 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
     And the MQTT broker has a log line matching "Sending PUBLISH to consumer-client \(d0, q1, r0, m1, 'testtopic',.*\(4 bytes\)\)"
     And the MQTT broker has a log line matching "Received PUBACK from consumer-client \(Mid: 1, RC:0\)"
 
+    Examples: MQTT versions
+    | version  |
+    | 3.x AUTO |
+    | 5.0      |
 
-  Scenario: QoS 2 message flow is correct
+  Scenario Outline: QoS 2 message flow is correct
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
     And a PublishMQTT processor set up to communicate with an MQTT broker instance
+    And the "MQTT Version" property of the PublishMQTT processor is set to "<version>"
     And the "Quality of Service" property of the PublishMQTT processor is set to "2"
     And the "success" relationship of the GetFile processor is connected to the PublishMQTT
 
     And a ConsumeMQTT processor set up to communicate with an MQTT broker instance
+    And the "MQTT Version" property of the ConsumeMQTT processor is set to "<version>"
     And the "Quality of Service" property of the ConsumeMQTT processor is set to "2"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
     And "ConsumeMQTT" processor is a start node
@@ -239,16 +296,23 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
     And the MQTT broker has a log line matching "Sending PUBREL to consumer-client \(m1\)"
     And the MQTT broker has a log line matching "Received PUBCOMP from consumer-client \(Mid: 1, RC:0\)"
 
-  Scenario: Retained message
+    Examples: MQTT versions
+    | version  |
+    | 3.x AUTO |
+    | 5.0      |
+
+  Scenario Outline: Retained message
     # publishing MQTT client
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow
     And a file with the content "test" is present in "/tmp/input"
     And a PublishMQTT processor in the "publisher-client" flow
+    And the "MQTT Version" property of the PublishMQTT processor is set to "<version>"
     And the "Retain" property of the PublishMQTT processor is set to "true"
     And the "success" relationship of the GetFile processor is connected to the PublishMQTT
 
     # consuming MQTT client
     And a ConsumeMQTT processor in the "consumer-client" flow
+    And the "MQTT Version" property of the ConsumeMQTT processor is set to "<version>"
     And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow
     And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
 
@@ -264,16 +328,23 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
 
     And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
 
-  Scenario: Last will
+    Examples: MQTT versions
+    | version  |
+    | 3.x AUTO |
+    | 5.0      |
+
+  Scenario Outline: Last will
     # publishing MQTT client with last will
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow
     And a PublishMQTT processor in the "publisher-client" flow
+    And the "MQTT Version" property of the PublishMQTT processor is set to "<version>"
     And the "Last Will Topic" property of the PublishMQTT processor is set to "last_will_topic"
     And the "Last Will Message" property of the PublishMQTT processor is set to "last_will_message"
     And the "success" relationship of the GetFile processor is connected to the PublishMQTT
 
     # consuming MQTT client set to consume last will topic
     And a ConsumeMQTT processor in the "consumer-client" flow
+    And the "MQTT Version" property of the ConsumeMQTT processor is set to "<version>"
     And the "Topic" property of the ConsumeMQTT processor is set to "last_will_topic"
     And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow
     And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
@@ -282,13 +353,19 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
 
     When all instances start up
     Then the MQTT broker has a log line matching "Sending CONNACK to publisher-client"
-    Then the MQTT broker has a log line matching "Sending CONNACK to consumer-client"
+    And the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
     And "publisher-client" flow is killed
     And the MQTT broker has a log line matching "Sending PUBLISH to consumer-client"
     And a flowfile with the content "last_will_message" is placed in the monitored directory in less than 60 seconds
 
-  Scenario: Keep Alive
+    Examples: MQTT versions
+    | version  |
+    | 3.x AUTO |
+    | 5.0      |
+
+  Scenario Outline: Keep Alive
     Given a ConsumeMQTT processor set up to communicate with an MQTT broker instance
+    And the "MQTT Version" property of the ConsumeMQTT processor is set to "<version>"
     And the "Keep Alive Interval" property of the ConsumeMQTT processor is set to "1 sec"
 
     And an MQTT broker is set up in correspondence with the ConsumeMQTT
@@ -296,3 +373,135 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
     When both instances start up
     Then the MQTT broker has a log line matching "Received PINGREQ from consumer-client"
     Then the MQTT broker has a log line matching "Sending PINGRESP to consumer-client"
+
+    Examples: MQTT versions
+    | version  |
+    | 3.x AUTO |
+    | 5.0      |
+
+  Scenario Outline: Message Expiry Interval - MQTT 5
+    # publishing MQTT client
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow
+    And a PublishMQTT processor in the "publisher-client" flow
+    And the "MQTT Version" property of the PublishMQTT processor is set to "5.0"
+    And the "Message Expiry Interval" property of the PublishMQTT processor is set to "<message_expiry_interval>"
+    And the "Quality of Service" property of the PublishMQTT processor is set to "1"
+    And the "success" relationship of the GetFile processor is connected to the PublishMQTT
+
+    # consuming MQTT client
+    And a ConsumeMQTT processor in the "consumer-client" flow
+    And the "MQTT Version" property of the ConsumeMQTT processor is set to "5.0"
+    And the "Quality of Service" property of the ConsumeMQTT processor is set to "1"
+    And the "Clean Start" property of the ConsumeMQTT processor is set to "false"
+    And the "Session Expiry Interval" property of the ConsumeMQTT processor is set to "1 h"
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow
+    And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
+
+    And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
+
+    When all instances start up
+    Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
+    And "consumer-client" flow is stopped
+    And the MQTT broker has a log line matching "Received DISCONNECT from consumer-client"
+
+    And a file with the content "test" is placed in "/tmp/input"
+    And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)"
+
+    And 2 seconds later
+    And "consumer-client" flow is restarted
+    And the MQTT broker has 2 log lines matching "New client connected from .* as consumer-client"
+    And <expectation_num_files> placed in the monitored directory in <expectation_time_limit>
+
+    Examples: Message Expiry Intervals
+    | message_expiry_interval | expectation_num_files                  | expectation_time_limit     |
+    | 1 h                     | a flowfile with the content "test" is  | less than 60 seconds       |
+    | 1 s                     | no files are                           | 30 seconds of running time |
+
+  Scenario: User properties - MQTT 5
+    # publishing MQTT client: GetFile -> UpdateAttribute (my_attr1:true, my_attr2:true) -> PublishMQTT
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow
+    And a UpdateAttribute processor in the "publisher-client" flow
+    And the "my_attr1" property of the UpdateAttribute processor is set to "true"
+    And the "my_attr2" property of the UpdateAttribute processor is set to "true"
+    And a PublishMQTT processor in the "publisher-client" flow
+    And the "MQTT Version" property of the PublishMQTT processor is set to "5.0"
+    And the "success" relationship of the GetFile processor is connected to the UpdateAttribute
+    And the "success" relationship of the UpdateAttribute processor is connected to the PublishMQTT
+
+    # consuming MQTT client: ConsumeMQTT -> RouteOnAttribute (my_attr1) -> RouteOnAttribute (my_attr2) -> PutFile
+    And a ConsumeMQTT processor in the "consumer-client" flow
+    And the "MQTT Version" property of the ConsumeMQTT processor is set to "5.0"
+    And a RouteOnAttribute processor with the name "RouteAttr1" in the "consumer-client" flow
+    And the "matched_my_attr1" property of the RouteAttr1 processor is set to "${my_attr1}"
+    And a RouteOnAttribute processor with the name "RouteAttr2" in the "consumer-client" flow
+    And the "matched_my_attr2" property of the RouteAttr2 processor is set to "${my_attr2}"
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow
+    And the "success" relationship of the ConsumeMQTT processor is connected to the RouteAttr1
+    And the "matched_my_attr1" relationship of the RouteAttr1 processor is connected to the RouteAttr2
+    And the "matched_my_attr2" relationship of the RouteAttr2 processor is connected to the PutFile
+
+    And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
+
+    When both instances start up
+    Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
+    And the MQTT broker has a log line matching "New client connected from .* as publisher-client"
+    And a file with the content "test" is placed in "/tmp/input"
+    And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+    And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)"
+
+  Scenario: Content type - MQTT 5
+    # publishing MQTT client: GetFile -> PublishMQTT (Content Type: text/plain)
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow
+    And a PublishMQTT processor in the "publisher-client" flow
+    And the "MQTT Version" property of the PublishMQTT processor is set to "5.0"
+    And the "Content Type" property of the PublishMQTT processor is set to "text/plain"
+    And the "success" relationship of the GetFile processor is connected to the PublishMQTT
+
+    # consuming MQTT client: ConsumeMQTT (Attribute From Content Type: content_type) -> RouteOnAttribute (content_type = text/plain) -> PutFile
+    And a ConsumeMQTT processor in the "consumer-client" flow
+    And the "MQTT Version" property of the ConsumeMQTT processor is set to "5.0"
+    And the "Attribute From Content Type" property of the ConsumeMQTT processor is set to "content_type"
+    And a RouteOnAttribute processor in the "consumer-client" flow
+    And the "matched_content_type" property of the RouteOnAttribute processor is set to "${content_type:equals('text/plain')}"
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow
+    And the "success" relationship of the ConsumeMQTT processor is connected to the RouteOnAttribute
+    And the "matched_content_type" relationship of the RouteOnAttribute processor is connected to the PutFile
+
+    And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
+
+    When both instances start up
+    Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
+    And the MQTT broker has a log line matching "New client connected from .* as publisher-client"
+    And a file with the content "test" is placed in "/tmp/input"
+    And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+    And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)"
+
+  Scenario: Will content type - MQTT 5
+    # publishing MQTT client: GetFile -> PublishMQTT (Last Will Content Type: text/plain)
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow
+    And a PublishMQTT processor in the "publisher-client" flow
+    And the "MQTT Version" property of the PublishMQTT processor is set to "5.0"
+    And the "Last Will Topic" property of the PublishMQTT processor is set to "last_will_topic"
+    And the "Last Will Message" property of the PublishMQTT processor is set to "last_will_message"
+    And the "Last Will Content Type" property of the PublishMQTT processor is set to "text/plain"
+    And the "success" relationship of the GetFile processor is connected to the PublishMQTT
+
+    # consuming MQTT client: ConsumeMQTT (Attribute From Content Type: content_type, Topic: last_will_topic) -> RouteOnAttribute (content_type = text/plain) -> PutFile
+    And a ConsumeMQTT processor in the "consumer-client" flow
+    And the "MQTT Version" property of the ConsumeMQTT processor is set to "5.0"
+    And the "Topic" property of the ConsumeMQTT processor is set to "last_will_topic"
+    And the "Attribute From Content Type" property of the ConsumeMQTT processor is set to "content_type"
+    And a RouteOnAttribute processor in the "consumer-client" flow
+    And the "matched_content_type" property of the RouteOnAttribute processor is set to "${content_type:equals('text/plain')}"
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow
+    And the "success" relationship of the ConsumeMQTT processor is connected to the RouteOnAttribute
+    And the "matched_content_type" relationship of the RouteOnAttribute processor is connected to the PutFile
+
+    And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
+
+    When all instances start up
+    Then the MQTT broker has a log line matching "Sending CONNACK to publisher-client"
+    And the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
+    And "publisher-client" flow is killed
+    And the MQTT broker has a log line matching "Sending PUBLISH to consumer-client"
+    And a flowfile with the content "last_will_message" is placed in the monitored directory in less than 60 seconds
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index ecc4b7661..686c33ebe 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -224,11 +224,11 @@ def step_impl(context, property_name, processor_name, attribute_key, attribute_v
     processor.set_property(property_name, filtering)
 
 
-@given("the scheduling period of the {processor_name} processor is set to \"{sceduling_period}\"")
-def step_impl(context, processor_name, sceduling_period):
+@given("the scheduling period of the {processor_name} processor is set to \"{scheduling_period}\"")
+def step_impl(context, processor_name, scheduling_period):
     processor = context.test.get_node_by_name(processor_name)
     processor.set_scheduling_strategy("TIMER_DRIVEN")
-    processor.set_scheduling_period(sceduling_period)
+    processor.set_scheduling_period(scheduling_period)
 
 
 @given("these processor properties are set")
@@ -634,6 +634,11 @@ def step_impl(context, container_name):
     context.test.start(container_name)
 
 
+@then("{duration} later")
+def step_impl(context, duration):
+    time.sleep(humanfriendly.parse_timespan(duration))
+
+
 @when("content \"{content}\" is added to file \"{file_name}\" present in directory \"{path}\" {seconds:d} seconds later")
 def step_impl(context, content, file_name, path, seconds):
     time.sleep(seconds)
diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
index 713b68be4..46b6024a3 100644
--- a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
+++ b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
@@ -20,6 +20,7 @@
 #include <utility>
 
 #include "utils/StringUtils.h"
+#include "utils/ProcessorConfigUtils.h"
 #include "core/ProcessContext.h"
 
 namespace org::apache::nifi::minifi::processors {
@@ -27,44 +28,41 @@ namespace org::apache::nifi::minifi::processors {
 void AbstractMQTTProcessor::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory>& /*factory*/) {
   if (auto value = context->getProperty(BrokerURI)) {
     uri_ = std::move(*value);
-    logger_->log_debug("AbstractMQTTProcessor: BrokerURI [%s]", uri_);
   }
+  logger_->log_debug("AbstractMQTTProcessor: BrokerURI [%s]", uri_);
+
+  mqtt_version_ = utils::parseEnumProperty<MqttVersions>(*context, MqttVersion);
+  logger_->log_debug("AbstractMQTTProcessor: MQTT Specification Version: %s", mqtt_version_.toString());
+
   if (auto value = context->getProperty(ClientID)) {
     clientID_ = std::move(*value);
-    logger_->log_debug("AbstractMQTTProcessor: ClientID [%s]", clientID_);
-  }
-  if (auto value = context->getProperty(Topic)) {
-    topic_ = std::move(*value);
-    logger_->log_debug("AbstractMQTTProcessor: Topic [%s]", topic_);
+  } else if (mqtt_version_ == MqttVersions::V_3_1_0) {
+    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT 3.1.0 specification does not support empty client IDs");
   }
+  logger_->log_debug("AbstractMQTTProcessor: ClientID [%s]", clientID_);
+
   if (auto value = context->getProperty(Username)) {
     username_ = std::move(*value);
-    logger_->log_debug("AbstractMQTTProcessor: UserName [%s]", username_);
   }
+  logger_->log_debug("AbstractMQTTProcessor: Username [%s]", username_);
+
   if (auto value = context->getProperty(Password)) {
     password_ = std::move(*value);
-    logger_->log_debug("AbstractMQTTProcessor: Password [%s]", password_);
   }
+  logger_->log_debug("AbstractMQTTProcessor: Password [%s]", password_);
 
   if (const auto keep_alive_interval = context->getProperty<core::TimePeriodValue>(KeepAliveInterval)) {
     keep_alive_interval_ = std::chrono::duration_cast<std::chrono::seconds>(keep_alive_interval->getMilliseconds());
-    logger_->log_debug("AbstractMQTTProcessor: KeepAliveInterval [%" PRId64 "] s", int64_t{keep_alive_interval_.count()});
-  }
-
-  if (const auto value = context->getProperty<uint64_t>(MaxFlowSegSize)) {
-    max_seg_size_ = {*value};
-    logger_->log_debug("PublishMQTT: max flow segment size [%" PRIu64 "]", max_seg_size_);
   }
+  logger_->log_debug("AbstractMQTTProcessor: KeepAliveInterval [%" PRId64 "] s", int64_t{keep_alive_interval_.count()});
 
   if (const auto connection_timeout = context->getProperty<core::TimePeriodValue>(ConnectionTimeout)) {
     connection_timeout_ = std::chrono::duration_cast<std::chrono::seconds>(connection_timeout->getMilliseconds());
-    logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeout [%" PRId64 "] s", int64_t{connection_timeout_.count()});
   }
+  logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeout [%" PRId64 "] s", int64_t{connection_timeout_.count()});
 
-  if (const auto value = context->getProperty<uint32_t>(QoS); value && (*value == MQTT_QOS_0 || *value == MQTT_QOS_1 || *value == MQTT_QOS_2)) {
-    qos_ = {*value};
-    logger_->log_debug("AbstractMQTTProcessor: QoS [%" PRIu32 "]", qos_);
-  }
+  qos_ = utils::parseEnumProperty<MqttQoS>(*context, QoS);
+  logger_->log_debug("AbstractMQTTProcessor: QoS [%d]", qos_.value());
 
   if (const auto security_protocol = context->getProperty(SecurityProtocol)) {
     if (*security_protocol == MQTT_SECURITY_PROTOCOL_SSL) {
@@ -105,30 +103,43 @@ void AbstractMQTTProcessor::onSchedule(const std::shared_ptr<core::ProcessContex
       last_will_->message = last_will_message_.c_str();
     }
 
-    if (const auto value = context->getProperty<uint32_t>(LastWillQoS); value && (*value == MQTT_QOS_0 || *value == MQTT_QOS_1 || *value == MQTT_QOS_2)) {
-      logger_->log_debug("AbstractMQTTProcessor: Last Will QoS [%" PRIu32 "]", *value);
-      last_will_qos_ = {*value};
-      last_will_->qos = gsl::narrow<int>(last_will_qos_);
-    }
+    last_will_qos_ = utils::parseEnumProperty<MqttQoS>(*context, LastWillQoS);
+    logger_->log_debug("AbstractMQTTProcessor: Last Will QoS [%d]", last_will_qos_.value());
+    last_will_->qos = last_will_qos_.value();
 
     if (const auto value = context->getProperty<bool>(LastWillRetain)) {
       logger_->log_debug("AbstractMQTTProcessor: Last Will Retain [%d]", *value);
       last_will_retain_ = {*value};
       last_will_->retained = last_will_retain_;
     }
+
+    if (auto value = context->getProperty(LastWillContentType)) {
+      logger_->log_debug("AbstractMQTTProcessor: Last Will Content Type [%s]", *value);
+      last_will_content_type_ = std::move(*value);
+    }
   }
 
+  readProperties(context);
   checkProperties();
+  initializeClient();
+}
+
+void AbstractMQTTProcessor::initializeClient() {
+  // write lock
+  std::lock_guard client_lock{client_mutex_};
 
   if (!client_) {
-    if (MQTTAsync_create(&client_, uri_.c_str(), clientID_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr) != MQTTASYNC_SUCCESS) {
-      logger_->log_error("Creating MQTT client failed");
+    MQTTAsync_createOptions options = MQTTAsync_createOptions_initializer;
+    if (mqtt_version_.value() == MqttVersions::V_5_0) {
+      options.MQTTVersion = MQTTVERSION_5;
+    }
+    if (MQTTAsync_createWithOptions(&client_, uri_.c_str(), clientID_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr, &options) != MQTTASYNC_SUCCESS) {
+      throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Creating MQTT client failed");
     }
   }
   if (client_) {
     if (MQTTAsync_setCallbacks(client_, this, connectionLost, msgReceived, nullptr) == MQTTASYNC_FAILURE) {
-      logger_->log_error("Setting MQTT client callbacks failed");
-      return;
+      throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Setting MQTT client callbacks failed");
     }
     // call reconnect to bootstrap
     reconnect();
@@ -137,49 +148,324 @@ void AbstractMQTTProcessor::onSchedule(const std::shared_ptr<core::ProcessContex
 
 void AbstractMQTTProcessor::reconnect() {
   if (!client_) {
-    logger_->log_error("MQTT client is not existing while trying to reconnect");
+    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT client is not existing while trying to reconnect");
+  }
+  if (MQTTAsync_isConnected(client_)) {
+    logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
     return;
   }
+
+  MQTTProperties connect_properties = MQTTProperties_initializer;
+  MQTTProperties will_properties = MQTTProperties_initializer;
+  // free both property lists on every way out of this function: early return, connect failure, or an exception rethrown by the future
+  const auto free_properties = gsl::finally([&connect_properties, &will_properties] { MQTTProperties_free(&connect_properties); MQTTProperties_free(&will_properties); });
+
+  ConnectFinishedTask connect_finished_task(
+          [this] (MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5) {
+            onConnectFinished(success_data, success_data_5, failure_data, failure_data_5);
+          });
+
+  const MQTTAsync_connectOptions connect_options = createConnectOptions(connect_properties, will_properties, connect_finished_task);
+  logger_->log_info("Reconnecting to %s", uri_);
   if (MQTTAsync_isConnected(client_)) {
-    logger_->log_info("Already connected to %s, no need to reconnect", uri_);
+    logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+    return;
+  }
+
+  const int ret = MQTTAsync_connect(client_, &connect_options);
+  if (ret != MQTTASYNC_SUCCESS) {
+    logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error code [%d]", uri_, ret);
     return;
   }
-  MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
-  conn_opts.keepAliveInterval = gsl::narrow<int>(keep_alive_interval_.count());
-  conn_opts.cleansession = getCleanSession();
-  conn_opts.context = this;
-  conn_opts.onSuccess = connectionSuccess;
-  conn_opts.onFailure = connectionFailure;
-  conn_opts.connectTimeout = gsl::narrow<int>(connection_timeout_.count());
+
+  // wait until connection succeeds or fails
+  connect_finished_task.get_future().get();
+}
+
+MQTTAsync_connectOptions AbstractMQTTProcessor::createConnectOptions(MQTTProperties& connect_properties, MQTTProperties& will_properties, ConnectFinishedTask& connect_finished_task) {
+  MQTTAsync_connectOptions connect_options = [this, &connect_properties, &will_properties] {
+    if (mqtt_version_.value() == MqttVersions::V_5_0) {
+      return createMqtt5ConnectOptions(connect_properties, will_properties);
+    } else {
+      return createMqtt3ConnectOptions();
+    }
+  }();
+
+  connect_options.context = &connect_finished_task;
+  connect_options.connectTimeout = gsl::narrow<int>(connection_timeout_.count());
+  connect_options.keepAliveInterval = gsl::narrow<int>(keep_alive_interval_.count());
   if (!username_.empty()) {
-    conn_opts.username = username_.c_str();
-    conn_opts.password = password_.c_str();
+    connect_options.username = username_.c_str();
+    connect_options.password = password_.c_str();
   }
   if (sslOpts_) {
-    conn_opts.ssl = &*sslOpts_;
+    connect_options.ssl = &*sslOpts_;
   }
   if (last_will_) {
-    conn_opts.will = &*last_will_;
+    connect_options.will = &*last_will_;
   }
 
-  logger_->log_info("Reconnecting to %s", uri_);
-  int ret = MQTTAsync_connect(client_, &conn_opts);
-  if (ret != MQTTASYNC_SUCCESS) {
-    logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, ret);
+  return connect_options;
+}
+
+MQTTAsync_connectOptions AbstractMQTTProcessor::createMqtt3ConnectOptions() const {
+  MQTTAsync_connectOptions connect_options = MQTTAsync_connectOptions_initializer;
+  connect_options.onSuccess = connectionSuccess;
+  connect_options.onFailure = connectionFailure;
+  connect_options.cleansession = getCleanSession();
+
+  if (mqtt_version_.value() == MqttVersions::V_3_1_0) {
+    connect_options.MQTTVersion = MQTTVERSION_3_1;
+  } else if (mqtt_version_.value() == MqttVersions::V_3_1_1) {
+    connect_options.MQTTVersion = MQTTVERSION_3_1_1;
   }
+
+  return connect_options;
+}
+
+MQTTAsync_connectOptions AbstractMQTTProcessor::createMqtt5ConnectOptions(MQTTProperties& connect_properties, MQTTProperties& will_properties) const {
+  MQTTAsync_connectOptions connect_options = MQTTAsync_connectOptions_initializer5;
+  connect_options.onSuccess5 = connectionSuccess5;
+  connect_options.onFailure5 = connectionFailure5;
+  connect_options.connectProperties = &connect_properties;
+
+  connect_options.cleanstart = getCleanStart();
+
+  {
+    MQTTProperty property;
+    property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
+    property.value.integer4 = gsl::narrow<unsigned int>(getSessionExpiryInterval().count());
+    MQTTProperties_add(&connect_properties, &property);
+  }
+
+  if (!last_will_content_type_.empty()) {
+    MQTTProperty property;
+    property.identifier = MQTTPROPERTY_CODE_CONTENT_TYPE;
+    property.value.data.len = gsl::narrow<int>(last_will_content_type_.length());
+    property.value.data.data = const_cast<char*>(last_will_content_type_.data());
+    MQTTProperties_add(&will_properties, &property);
+  }
+
+  connect_options.willProperties = &will_properties;
+
+  setProcessorSpecificMqtt5ConnectOptions(connect_properties);
+
+  return connect_options;
+}
+
+void AbstractMQTTProcessor::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  std::shared_lock client_lock{client_mutex_};
+  if (client_ == nullptr) {
+    logger_->log_debug("Null-op in onTrigger, processor is shutting down.");
+    return;
+  }
+
+  reconnect();
+
+  if (!MQTTAsync_isConnected(client_)) {
+    logger_->log_error("Could not work with MQTT broker because disconnected to %s", uri_);
+    yield();
+    return;
+  }
+
+  onTriggerImpl(context, session);
 }
 
 void AbstractMQTTProcessor::freeResources() {
-  if (client_ && MQTTAsync_isConnected(client_)) {
-    MQTTAsync_disconnectOptions disconnect_options = MQTTAsync_disconnectOptions_initializer;
-    disconnect_options.context = this;
-    disconnect_options.onSuccess = disconnectionSuccess;
-    disconnect_options.onFailure = disconnectionFailure;
-    disconnect_options.timeout = gsl::narrow<int>(std::chrono::milliseconds{connection_timeout_}.count());
-    MQTTAsync_disconnect(client_, &disconnect_options);
+  // write lock
+  std::lock_guard client_lock{client_mutex_};
+
+  if (!client_) {
+    return;
   }
-  if (client_) {
-    MQTTAsync_destroy(&client_);
+
+  disconnect();
+
+  MQTTAsync_destroy(&client_);
+}
+
+void AbstractMQTTProcessor::disconnect() {
+  if (!MQTTAsync_isConnected(client_)) {
+    return;
+  }
+
+  MQTTAsync_disconnectOptions disconnect_options = MQTTAsync_disconnectOptions_initializer;
+  ConnectFinishedTask disconnect_finished_task(
+          [this] (MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5) {
+            onDisconnectFinished(success_data, success_data_5, failure_data, failure_data_5);
+          });
+  disconnect_options.context = &disconnect_finished_task;
+
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+    disconnect_options.onSuccess5 = connectionSuccess5;
+    disconnect_options.onFailure5 = connectionFailure5;
+  } else {
+    disconnect_options.onSuccess = connectionSuccess;
+    disconnect_options.onFailure = connectionFailure;
+  }
+
+  disconnect_options.timeout = gsl::narrow<int>(std::chrono::milliseconds{connection_timeout_}.count());
+
+  const int ret = MQTTAsync_disconnect(client_, &disconnect_options);
+  if (ret != MQTTASYNC_SUCCESS) {
+    logger_->log_error("MQTTAsync_disconnect failed to MQTT broker %s with error code [%d]", uri_, ret);
+    return;
+  }
+
+  // wait until disconnection succeeds or fails
+  disconnect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setBrokerLimits(MQTTAsync_successData5* response) {
+  auto readProperty = [response] (MQTTPropertyCodes property_code, auto& out_var) {
+    const int value = MQTTProperties_getNumericValue(&response->properties, property_code);
+    if (value != PAHO_MQTT_C_FAILURE_CODE) {
+      if constexpr (std::is_same_v<decltype(out_var), std::optional<std::chrono::seconds>&>) {
+        out_var = std::chrono::seconds(value);
+      } else {
+        out_var = gsl::narrow<typename std::remove_reference_t<decltype(out_var)>::value_type>(value);
+      }
+    } else {
+      out_var.reset();
+    }
+  };
+
+  readProperty(MQTTPROPERTY_CODE_RETAIN_AVAILABLE, retain_available_);
+  readProperty(MQTTPROPERTY_CODE_WILDCARD_SUBSCRIPTION_AVAILABLE, wildcard_subscription_available_);
+  readProperty(MQTTPROPERTY_CODE_SHARED_SUBSCRIPTION_AVAILABLE, shared_subscription_available_);
+
+  readProperty(MQTTPROPERTY_CODE_TOPIC_ALIAS_MAXIMUM, broker_topic_alias_maximum_);
+  readProperty(MQTTPROPERTY_CODE_RECEIVE_MAXIMUM, broker_receive_maximum_);
+  readProperty(MQTTPROPERTY_CODE_MAXIMUM_QOS, maximum_qos_);
+  readProperty(MQTTPROPERTY_CODE_MAXIMUM_PACKET_SIZE, maximum_packet_size_);
+
+  readProperty(MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL, maximum_session_expiry_interval_);
+  readProperty(MQTTPROPERTY_CODE_SERVER_KEEP_ALIVE, server_keep_alive_);
+}
+
+void AbstractMQTTProcessor::checkBrokerLimits() {
+  try {
+    if (server_keep_alive_.has_value() && server_keep_alive_ < keep_alive_interval_) {
+      std::ostringstream os;
+      os << "Set Keep Alive Interval (" << keep_alive_interval_.count() << " s) is longer than the maximum supported by the broker (" << server_keep_alive_->count() << " s)";
+      throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, os.str());
+    }
+
+    if (maximum_qos_.has_value() && qos_.value() > maximum_qos_) {
+      std::ostringstream os;
+      os << "Set QoS (" << qos_.value() << ") is higher than the maximum supported by the broker (" << static_cast<int>(*maximum_qos_) << ")";  // uint8_t would print as a char
+      throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, os.str());
+    }
+
+    checkBrokerLimitsImpl();
+  }
+  catch (...) {
+    disconnect();
+    throw;
+  }
+}
+
+void AbstractMQTTProcessor::connectionLost(void *context, char* cause) {
+  auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
+  processor->onConnectionLost(cause);
+}
+
+
+void AbstractMQTTProcessor::connectionSuccess(void* context, MQTTAsync_successData* response) {
+  auto* task = reinterpret_cast<ConnectFinishedTask*>(context);
+  (*task)(response, nullptr, nullptr, nullptr);
+}
+
+void AbstractMQTTProcessor::connectionSuccess5(void* context, MQTTAsync_successData5* response) {
+  auto* task = reinterpret_cast<ConnectFinishedTask*>(context);
+  (*task)(nullptr, response, nullptr, nullptr);
+}
+
+void AbstractMQTTProcessor::connectionFailure(void* context, MQTTAsync_failureData* response) {
+  auto* task = reinterpret_cast<ConnectFinishedTask*>(context);
+  (*task)(nullptr, nullptr, response, nullptr);
+}
+
+void AbstractMQTTProcessor::connectionFailure5(void* context, MQTTAsync_failureData5* response) {
+  auto* task = reinterpret_cast<ConnectFinishedTask*>(context);
+  (*task)(nullptr, nullptr, nullptr, response);
+}
+
+int AbstractMQTTProcessor::msgReceived(void *context, char* topic_name, int topic_len, MQTTAsync_message* message) {
+  auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
+  processor->onMessageReceived(SmartMessage{std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter>(message), topic_len > 0 ? std::string(topic_name, topic_len) : std::string(topic_name)});
+  MQTTAsync_free(topic_name);
+  return 1;
+}
+
+void AbstractMQTTProcessor::onConnectionLost(char* cause) {
+  logger_->log_error("Connection lost to MQTT broker %s", uri_);
+  if (cause != nullptr) {
+    logger_->log_error("Cause for connection loss: %s", cause);
+  }
+}
+
+void AbstractMQTTProcessor::onConnectFinished(MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5,
+                                              MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5) {
+  if (success_data) {
+    logger_->log_info("Successfully connected to MQTT broker %s", uri_);
+    startupClient();
+    return;
+  }
+
+  if (success_data_5) {
+    logger_->log_info("Successfully connected to MQTT broker %s", uri_);
+    logger_->log_info("Reason code for connection success: %d: %s", success_data_5->reasonCode, MQTTReasonCode_toString(success_data_5->reasonCode));
+    setBrokerLimits(success_data_5);
+    checkBrokerLimits();
+    startupClient();
+    return;
+  }
+
+  if (failure_data) {
+    logger_->log_error("Connection failed to MQTT broker %s (%d)", uri_, failure_data->code);
+    if (failure_data->message != nullptr) {
+      logger_->log_error("Detailed reason for connection failure: %s", failure_data->message);
+    }
+    return;
+  }
+
+  if (failure_data_5) {
+    logger_->log_error("Connection failed to MQTT broker %s (%d)", uri_, failure_data_5->code);
+    if (failure_data_5->message != nullptr) {
+      logger_->log_error("Detailed reason for connection failure: %s", failure_data_5->message);
+    }
+    logger_->log_error("Reason code for connection failure: %d: %s", failure_data_5->reasonCode, MQTTReasonCode_toString(failure_data_5->reasonCode));
+  }
+}
+
+void AbstractMQTTProcessor::onDisconnectFinished(MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5,
+                                                 MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5) {
+  if (success_data) {
+    logger_->log_info("Successfully disconnected from MQTT broker %s", uri_);
+    return;
+  }
+
+  if (success_data_5) {
+    logger_->log_info("Successfully disconnected from MQTT broker %s", uri_);
+    logger_->log_info("Reason code for disconnection success: %d: %s", success_data_5->reasonCode, MQTTReasonCode_toString(success_data_5->reasonCode));
+    return;
+  }
+
+  if (failure_data) {
+    logger_->log_error("Disconnection failed from MQTT broker %s (%d)", uri_, failure_data->code);
+    if (failure_data->message != nullptr) {
+      logger_->log_error("Detailed reason for disconnection failure: %s", failure_data->message);
+    }
+    return;
+  }
+
+  if (failure_data_5) {
+    logger_->log_error("Disconnection failed from MQTT broker %s (%d)", uri_, failure_data_5->code);
+    if (failure_data_5->message != nullptr) {
+      logger_->log_error("Detailed reason for disconnection failure: %s", failure_data_5->message);
+    }
+    logger_->log_error("Reason code for disconnection failure: %d: %s", failure_data_5->reasonCode, MQTTReasonCode_toString(failure_data_5->reasonCode));
   }
 }
 
diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.h b/extensions/mqtt/processors/AbstractMQTTProcessor.h
index d9c1458df..58aca34c0 100644
--- a/extensions/mqtt/processors/AbstractMQTTProcessor.h
+++ b/extensions/mqtt/processors/AbstractMQTTProcessor.h
@@ -21,19 +21,17 @@
 #include <string>
 #include <utility>
 #include <vector>
+#include <shared_mutex>
 
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
 #include "core/Core.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "utils/Enum.h"
 #include "MQTTAsync.h"
 
 namespace org::apache::nifi::minifi::processors {
 
-static constexpr uint8_t MQTT_QOS_0 = 0;
-static constexpr uint8_t MQTT_QOS_1 = 1;
-static constexpr uint8_t MQTT_QOS_2 = 2;
-
 static constexpr const char* const MQTT_SECURITY_PROTOCOL_SSL = "ssl";
 
 class AbstractMQTTProcessor : public core::Processor {
@@ -46,150 +44,182 @@ class AbstractMQTTProcessor : public core::Processor {
     freeResources();
   }
 
+  SMART_ENUM(MqttVersions,
+    (V_3X_AUTO, "3.x AUTO"),
+    (V_3_1_0, "3.1.0"),
+    (V_3_1_1, "3.1.1"),
+    (V_5_0, "5.0"));
+
+  SMART_ENUM(MqttQoS,
+    (LEVEL_0, "0"),
+    (LEVEL_1, "1"),
+    (LEVEL_2, "2"));
+
   EXTENSIONAPI static const core::Property BrokerURI;
   EXTENSIONAPI static const core::Property ClientID;
+  EXTENSIONAPI static const core::Property QoS;
+  EXTENSIONAPI static const core::Property MqttVersion;
+  EXTENSIONAPI static const core::Property ConnectionTimeout;
+  EXTENSIONAPI static const core::Property KeepAliveInterval;
+  EXTENSIONAPI static const core::Property LastWillTopic;
+  EXTENSIONAPI static const core::Property LastWillMessage;
+  EXTENSIONAPI static const core::Property LastWillQoS;
+  EXTENSIONAPI static const core::Property LastWillRetain;
+  EXTENSIONAPI static const core::Property LastWillContentType;
   EXTENSIONAPI static const core::Property Username;
   EXTENSIONAPI static const core::Property Password;
-  EXTENSIONAPI static const core::Property KeepAliveInterval;
-  EXTENSIONAPI static const core::Property MaxFlowSegSize;
-  EXTENSIONAPI static const core::Property ConnectionTimeout;
-  EXTENSIONAPI static const core::Property Topic;
-  EXTENSIONAPI static const core::Property QoS;
   EXTENSIONAPI static const core::Property SecurityProtocol;
   EXTENSIONAPI static const core::Property SecurityCA;
   EXTENSIONAPI static const core::Property SecurityCert;
   EXTENSIONAPI static const core::Property SecurityPrivateKey;
   EXTENSIONAPI static const core::Property SecurityPrivateKeyPassword;
-  EXTENSIONAPI static const core::Property LastWillTopic;
-  EXTENSIONAPI static const core::Property LastWillMessage;
-  EXTENSIONAPI static const core::Property LastWillQoS;
-  EXTENSIONAPI static const core::Property LastWillRetain;
 
-  EXTENSIONAPI static auto properties() {
+
+  static auto basicProperties() {
     return std::array{
-            BrokerURI,
-            Topic,
-            ClientID,
-            QoS,
-            ConnectionTimeout,
-            KeepAliveInterval,
-            MaxFlowSegSize,
-            LastWillTopic,
-            LastWillMessage,
-            LastWillQoS,
-            LastWillRetain,
-            Username,
-            Password,
-            SecurityProtocol,
-            SecurityCA,
-            SecurityCert,
-            SecurityPrivateKey,
-            SecurityPrivateKeyPassword
+      BrokerURI,
+      ClientID,
+      MqttVersion
+    };
+  }
+
+  static auto advancedProperties() {
+    return std::array{
+      QoS,
+      ConnectionTimeout,
+      KeepAliveInterval,
+      LastWillTopic,
+      LastWillMessage,
+      LastWillQoS,
+      LastWillRetain,
+      LastWillContentType,
+      Username,
+      Password,
+      SecurityProtocol,
+      SecurityCA,
+      SecurityCert,
+      SecurityPrivateKey,
+      SecurityPrivateKeyPassword
     };
   }
 
   void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& factory) override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override;
 
   void notifyStop() override {
     freeResources();
   }
 
  protected:
+  struct MQTTMessageDeleter {
+    void operator()(MQTTAsync_message* message) {
+      MQTTAsync_freeMessage(&message);
+    }
+  };
+
+  struct SmartMessage {
+    std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> contents;
+    std::string topic;
+  };
+
+  // defined by Paho MQTT C library
+  static constexpr int PAHO_MQTT_C_FAILURE_CODE = -9999999;
+  static constexpr int MQTT_MAX_RECEIVE_MAXIMUM = 65535;
+
+  /**
+   * Connect to MQTT broker. Synchronously waits until connection succeeds or fails.
+   */
   void reconnect();
 
+  /**
+   * Checks property consistency before connecting to broker
+   */
+  virtual void checkProperties() {
+  }
+
+  /**
+   * Checks broker limits and supported features vs our desired features after connecting to broker
+   */
+  void checkBrokerLimits();
+  virtual void checkBrokerLimitsImpl() = 0;
+
+  // variables being used for a synchronous connection and disconnection
+  std::shared_mutex client_mutex_;
+
   MQTTAsync client_ = nullptr;
   std::string uri_;
-  std::string topic_;
   std::chrono::seconds keep_alive_interval_{60};
-  uint64_t max_seg_size_ = std::numeric_limits<uint64_t>::max();
-  std::chrono::seconds connection_timeout_{30};
-  uint32_t qos_ = MQTT_QOS_1;
+  std::chrono::seconds connection_timeout_{10};
+  MqttQoS qos_{MqttQoS::LEVEL_0};
   std::string clientID_;
   std::string username_;
   std::string password_;
+  MqttVersions mqtt_version_{MqttVersions::V_3X_AUTO};
 
- private:
-  // MQTT async callback
-  static int msgReceived(void *context, char* topic_name, int topic_len, MQTTAsync_message* message) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onMessageReceived(topic_name, topic_len, message);
-    return 1;
-  }
-
-  // MQTT async callback
-  static void connectionLost(void *context, char* cause) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onConnectionLost(cause);
-  }
-
-  // MQTT async callback
-  static void connectionSuccess(void* context, MQTTAsync_successData* response) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onConnectionSuccess(response);
-  }
+  // Supported operations
+  std::optional<bool> retain_available_;
+  std::optional<bool> wildcard_subscription_available_;
+  std::optional<bool> shared_subscription_available_;
 
-  // MQTT async callback
-  static void connectionFailure(void* context, MQTTAsync_failureData* response) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onConnectionFailure(response);
-  }
+  std::optional<uint16_t> broker_topic_alias_maximum_;
+  std::optional<uint16_t> broker_receive_maximum_;
+  std::optional<uint8_t> maximum_qos_;
+  std::optional<uint32_t> maximum_packet_size_;
 
-  // MQTT async callback
-  static void disconnectionSuccess(void* context, MQTTAsync_successData* response) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onDisconnectionSuccess(response);
-  }
+  std::optional<std::chrono::seconds> maximum_session_expiry_interval_;
+  std::optional<std::chrono::seconds> server_keep_alive_;
 
-  // MQTT async callback
-  static void disconnectionFailure(void* context, MQTTAsync_failureData* response) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onDisconnectionFailure(response);
-  }
-
-  virtual void onMessageReceived(char* topic_name, int /*topic_len*/, MQTTAsync_message* message) {
-    MQTTAsync_freeMessage(&message);
-    MQTTAsync_free(topic_name);
-  }
-
-  void onConnectionLost(char* cause) {
-    logger_->log_error("Connection lost to MQTT broker %s", uri_);
-    if (cause != nullptr) {
-      logger_->log_error("Cause for connection loss: %s", cause);
-    }
-  }
+ private:
+  using ConnectFinishedTask = std::packaged_task<void(MQTTAsync_successData*, MQTTAsync_successData5*, MQTTAsync_failureData*, MQTTAsync_failureData5*)>;
 
-  void onConnectionSuccess(MQTTAsync_successData* /*response*/) {
-    logger_->log_info("Successfully connected to MQTT broker %s", uri_);
-    startupClient();
-  }
+  /**
+   * Initializes local MQTT client and connects to broker.
+   */
+  void initializeClient();
 
-  void onConnectionFailure(MQTTAsync_failureData* response) {
-    logger_->log_error("Connection failed to MQTT broker %s (%d)", uri_, response->code);
-    if (response->message != nullptr) {
-      logger_->log_error("Detailed reason for connection failure: %s", response->message);
-    }
-  }
+  /**
+   * Calls disconnect() and releases local MQTT client
+   */
+  void freeResources();
 
-  void onDisconnectionSuccess(MQTTAsync_successData* /*response*/) {
-    logger_->log_info("Successfully disconnected from MQTT broker %s", uri_);
-  }
+  /**
+   * Disconnect from MQTT broker. Synchronously waits until disconnection succeeds or fails.
+   */
+  void disconnect();
+
+  virtual void readProperties(const std::shared_ptr<core::ProcessContext>& context) = 0;
+  virtual void onTriggerImpl(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) = 0;
+  virtual void startupClient() = 0;
+  void setBrokerLimits(MQTTAsync_successData5* response);
+
+  // MQTT static async callbacks: connectionLost and msgReceived get "this" as context, the connection success/failure callbacks get a pointer to the ConnectFinishedTask to run
+  static void connectionLost(void *context, char* cause);
+  static void connectionSuccess(void* context, MQTTAsync_successData* response);
+  static void connectionSuccess5(void* context, MQTTAsync_successData5* response);
+  static void connectionFailure(void* context, MQTTAsync_failureData* response);
+  static void connectionFailure5(void* context, MQTTAsync_failureData5* response);
+  static int msgReceived(void *context, char* topic_name, int topic_len, MQTTAsync_message* message);
+
+  // MQTT async callback methods
+  void onConnectionLost(char* cause);
+  void onConnectFinished(MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5);
+  void onDisconnectFinished(MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5);
 
-  void onDisconnectionFailure(MQTTAsync_failureData* response) {
-    logger_->log_error("Disconnection failed from MQTT broker %s (%d)", uri_, response->code);
-    if (response->message != nullptr) {
-      logger_->log_error("Detailed reason for disconnection failure: %s", response->message);
-    }
+  /**
+   * Called if message is received. This is default implementation, to be overridden if subclass wants to use the message.
+   * @param smartmessage owning wrapper holding the received MQTT message (contents)
+   *                     and the topic it arrived on (topic)
+   */
+  virtual void onMessageReceived(SmartMessage /*smartmessage*/) {
   }
 
   virtual bool getCleanSession() const = 0;
-  virtual bool startupClient() = 0;
-
-  void freeResources();
-
-  /**
-   * Checks property consistency before connecting to broker
-   */
-  virtual void checkProperties() {
+  virtual bool getCleanStart() const = 0;
+  virtual std::chrono::seconds getSessionExpiryInterval() const = 0;
+  MQTTAsync_connectOptions createConnectOptions(MQTTProperties& connect_properties, MQTTProperties& will_properties, ConnectFinishedTask& connect_finished_task);
+  MQTTAsync_connectOptions createMqtt3ConnectOptions() const;
+  MQTTAsync_connectOptions createMqtt5ConnectOptions(MQTTProperties& connect_properties, MQTTProperties& will_properties) const;
+  virtual void setProcessorSpecificMqtt5ConnectOptions(MQTTProperties& /*connect_props*/) const {
   }
 
   // SSL
@@ -203,8 +233,9 @@ class AbstractMQTTProcessor : public core::Processor {
   std::optional<MQTTAsync_willOptions> last_will_;
   std::string last_will_topic_;
   std::string last_will_message_;
-  uint32_t last_will_qos_ = MQTT_QOS_1;
+  MqttQoS last_will_qos_{MqttQoS::LEVEL_0};
   bool last_will_retain_ = false;
+  std::string last_will_content_type_;
 
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<AbstractMQTTProcessor>::getLogger(uuid_);
 };
diff --git a/extensions/mqtt/processors/AbstractMQTTProcessorStaticDefinitions.cpp b/extensions/mqtt/processors/AbstractMQTTProcessorStaticDefinitions.cpp
index cfc7ee7ea..c97e2094e 100644
--- a/extensions/mqtt/processors/AbstractMQTTProcessorStaticDefinitions.cpp
+++ b/extensions/mqtt/processors/AbstractMQTTProcessorStaticDefinitions.cpp
@@ -31,27 +31,22 @@ const core::Property AbstractMQTTProcessor::BrokerURI(
   core::PropertyBuilder::createProperty("Broker URI")->
     withDescription("The URI to use to connect to the MQTT broker")->
     isRequired(true)->
-    supportsExpressionLanguage(true)->
     build());
 
 const core::Property AbstractMQTTProcessor::ClientID(
         core::PropertyBuilder::createProperty("Client ID")->
-        withDescription("MQTT client ID to use")->
-        supportsExpressionLanguage(true)->
+        withDescription("MQTT client ID to use. WARNING: Must not be empty when using MQTT 3.1.0!")->
         build());
 
-const core::Property AbstractMQTTProcessor::Topic(
-  core::PropertyBuilder::createProperty("Topic")->
-    withDescription("The topic to publish or subscribe to")->
-    isRequired(true)->
-    supportsExpressionLanguage(true)->
-    build());
+const core::Property AbstractMQTTProcessor::QoS(
+        core::PropertyBuilder::createProperty("Quality of Service")->
+                withDescription("The Quality of Service (QoS) of messages. Accepts values '0', '1' and '2'")->
+                withDefaultValue(toString(MqttQoS::LEVEL_0))->
+                withAllowableValues(MqttQoS::values())->
+                build());
 
-const core::Property AbstractMQTTProcessor::QoS("Quality of Service",
-                                                "The Quality of Service (QoS) to send or receive the message with. Accepts three values '0', '1' and '2'", std::to_string(MQTT_QOS_0));
 const core::Property AbstractMQTTProcessor::KeepAliveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec");
-const core::Property AbstractMQTTProcessor::ConnectionTimeout("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT broker", "30 sec");
-const core::Property AbstractMQTTProcessor::MaxFlowSegSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", "");
+const core::Property AbstractMQTTProcessor::ConnectionTimeout("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT broker", "10 sec");
 const core::Property AbstractMQTTProcessor::Username("Username", "Username to use when connecting to the broker", "");
 const core::Property AbstractMQTTProcessor::Password("Password", "Password to use when connecting to the broker", "");
 const core::Property AbstractMQTTProcessor::SecurityProtocol("Security Protocol", "Protocol used to communicate with brokers", "");
@@ -62,13 +57,40 @@ const core::Property AbstractMQTTProcessor::SecurityPrivateKeyPassword("Security
 const core::Property AbstractMQTTProcessor::LastWillTopic("Last Will Topic", "The topic to send the client's Last Will to. If the Last Will topic is not set then a Last Will will not be sent", "");
 const core::Property AbstractMQTTProcessor::LastWillMessage("Last Will Message",
                                                             "The message to send as the client's Last Will. If the Last Will Message is empty, Last Will will be deleted from the broker", "");
-const core::Property AbstractMQTTProcessor::LastWillQoS("Last Will QoS", "The Quality of Service (QoS) to send the last will with. Accepts three values '0', '1' and '2'", std::to_string(MQTT_QOS_0));
+
+const core::Property AbstractMQTTProcessor::LastWillQoS(
+        core::PropertyBuilder::createProperty("Last Will QoS")->
+                withDescription("The Quality of Service (QoS) to send the last will with. Accepts values '0', '1' and '2'")->
+                withDefaultValue(toString(MqttQoS::LEVEL_0))->
+                withAllowableValues(MqttQoS::values())->
+                build());
+
 const core::Property AbstractMQTTProcessor::LastWillRetain("Last Will Retain", "Whether to retain the client's Last Will", "false");
+const core::Property AbstractMQTTProcessor::LastWillContentType("Last Will Content Type", "Content type of the client's Last Will. MQTT 5.x only.", "");
+
+const core::Property AbstractMQTTProcessor::MqttVersion(
+        core::PropertyBuilder::createProperty("MQTT Version")->
+                withDescription("The MQTT specification version when connecting to the broker. See the allowable value descriptions for more details.")->
+                withDefaultValue(toString(MqttVersions::V_3X_AUTO))->
+                withAllowableValues(MqttVersions::values())->
+                build());
 
 // ConsumeMQTT
 
-const core::Property ConsumeMQTT::CleanSession("Clean Session", "Whether to start afresh rather than remembering previous subscriptions.", "true");
+const core::Property ConsumeMQTT::Topic(
+        core::PropertyBuilder::createProperty("Topic")->
+                withDescription("The topic to subscribe to")->
+                isRequired(true)->
+                build());
+
+const core::Property ConsumeMQTT::CleanSession("Clean Session", "Whether to start afresh rather than remembering previous subscriptions. "
+                                                                "Also make broker remember subscriptions after disconnected. MQTT 3.x only.", "true");
+const core::Property ConsumeMQTT::CleanStart("Clean Start", "Whether to start afresh rather than remembering previous subscriptions. MQTT 5.x only.", "true");
+const core::Property ConsumeMQTT::SessionExpiryInterval("Session Expiry Interval", "Time to delete session on broker after client is disconnected. MQTT 5.x only.", "0 s");
 const core::Property ConsumeMQTT::QueueBufferMaxMessage("Queue Max Message", "Maximum number of messages allowed on the received MQTT queue", "1000");
+const core::Property ConsumeMQTT::AttributeFromContentType("Attribute From Content Type", "Name of FlowFile attribute to be filled from content type of received message. MQTT 5.x only.", "");
+const core::Property ConsumeMQTT::TopicAliasMaximum("Topic Alias Maximum", "Maximum number of topic aliases to use. If set to 0, then topic aliases cannot be used. MQTT 5.x only.", "0");
+const core::Property ConsumeMQTT::ReceiveMaximum("Receive Maximum", "Maximum number of unacknowledged messages allowed. MQTT 5.x only.", std::to_string(MQTT_MAX_RECEIVE_MAXIMUM));
 
 const core::Relationship ConsumeMQTT::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship");
 
@@ -77,7 +99,21 @@ REGISTER_RESOURCE(ConsumeMQTT, Processor);
 
 // PublishMQTT
 
+const core::Property PublishMQTT::Topic(  // mandatory publish topic; may contain expression language
+        core::PropertyBuilder::createProperty("Topic")->
+                withDescription("The topic to publish to")->
+                isRequired(true)->
+                supportsExpressionLanguage(true)->
+                build());
+
 const core::Property PublishMQTT::Retain("Retain", "Retain published message in broker", "false");
+const core::Property PublishMQTT::MessageExpiryInterval("Message Expiry Interval", "Time while message is valid and will be forwarded by broker. MQTT 5.x only.", "");
+
+const core::Property PublishMQTT::ContentType(  // not marked required; may contain expression language
+        core::PropertyBuilder::createProperty("Content Type")->
+                withDescription("Content type of the message. MQTT 5.x only.")->
+                supportsExpressionLanguage(true)->
+                build());
 
 const core::Relationship PublishMQTT::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship");
 const core::Relationship PublishMQTT::Failure("failure", "FlowFiles that failed to be sent to the destination are transferred to this relationship");
diff --git a/extensions/mqtt/processors/ConsumeMQTT.cpp b/extensions/mqtt/processors/ConsumeMQTT.cpp
index ba82871ff..21166aa32 100644
--- a/extensions/mqtt/processors/ConsumeMQTT.cpp
+++ b/extensions/mqtt/processors/ConsumeMQTT.cpp
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> message) {
-  if (queue_.size_approx() >= maxQueueSize_) {
-    logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {  // takes the message by value and moves it into queue_
+  if (queue_.size_approx() >= max_queue_size_) {  // size_approx() is presumably an estimate, so the cap is best-effort — TODO confirm
+    logger_->log_error("MQTT queue full");  // the message is dropped: we return without enqueuing it
     return;
   }
 
-  if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) {
-    logger_->log_debug("MQTT message was truncated while enqueuing, original length: %d", message->payloadlen);
-    message->payloadlen = gsl::narrow<int>(max_seg_size_);
-  }
-
-  logger_->log_debug("enqueuing MQTT message with length %d", message->payloadlen);
+  logger_->log_debug("enqueuing MQTT message with length %d", message.contents->payloadlen);  // logged before the move below empties 'message'
   queue_.enqueue(std::move(message));
 }
 
-void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void ConsumeMQTT::readProperties(const std::shared_ptr<core::ProcessContext>& context) {  // loads ConsumeMQTT's own properties into members
+  if (auto value = context->getProperty(Topic)) {  // each member keeps its previous value when its property yields nothing
+    topic_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
   if (const auto value = context->getProperty<bool>(CleanSession)) {
-    cleanSession_ = *value;
-    logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+    clean_session_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);  // effective value is logged whether or not the property was read
+
+  if (const auto value = context->getProperty<bool>(CleanStart)) {
+    clean_start_ = *value;
+  }
+  logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+  if (const auto session_expiry_interval = context->getProperty<core::TimePeriodValue>(SessionExpiryInterval)) {
+    session_expiry_interval_ = std::chrono::duration_cast<std::chrono::seconds>(session_expiry_interval->getMilliseconds());  // truncates any sub-second part
+  }
+  logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", int64_t{session_expiry_interval_.count()});
 
   if (const auto value = context->getProperty<uint64_t>(QueueBufferMaxMessage)) {
-    maxQueueSize_ = *value;
-    logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", maxQueueSize_);
+    max_queue_size_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", max_queue_size_);
 
-  // this connects to broker, so properties of this processor must be read before
-  AbstractMQTTProcessor::onSchedule(context, factory);
-}
+  if (auto value = context->getProperty(AttributeFromContentType)) {
+    attribute_from_content_type_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", attribute_from_content_type_);
 
-void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession> &session) {
-  // reconnect if needed
-  reconnect();
+  if (const auto topic_alias_maximum = context->getProperty<uint32_t>(TopicAliasMaximum)) {
+    topic_alias_maximum_ = gsl::narrow<uint16_t>(*topic_alias_maximum);  // read as uint32_t; gsl::narrow presumably throws if it does not fit 16 bits — TODO confirm
+  }
+  logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", topic_alias_maximum_);
 
-  if (!MQTTAsync_isConnected(client_)) {
-    logger_->log_error("Could not consume from MQTT broker because disconnected to %s", uri_);
-    yield();
-    return;
+  if (const auto receive_maximum = context->getProperty<uint32_t>(ReceiveMaximum)) {
+    receive_maximum_ = gsl::narrow<uint16_t>(*receive_maximum);  // same 32-bit to 16-bit narrowing as for the topic alias maximum
   }
+  logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", receive_maximum_);
+}
 
-  std::deque<std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter>> msg_queue;
-  getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession>& session) {
+  std::queue<SmartMessage> msg_queue = getReceivedMqttMessages();  // every message buffered so far becomes its own flow file below
   while (!msg_queue.empty()) {
     const auto& message = msg_queue.front();
-    std::shared_ptr<core::FlowFile> processFlowFile = session->create();
-    int write_status{};
-    session->write(processFlowFile, [&message, &write_status](const std::shared_ptr<io::OutputStream>& stream) -> int64_t {
-      if (message->payloadlen < 0) {
-        write_status = -1;
-        return -1;
-      }
-      const auto len = stream->write(reinterpret_cast<uint8_t*>(message->payload), gsl::narrow<size_t>(message->payloadlen));
-      if (io::isError(len)) {
-        write_status = -1;
-        return -1;
-      }
-      return gsl::narrow<int64_t>(len);
-    });
-    if (write_status < 0) {
-      logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", processFlowFile->getUUIDStr());
-      session->remove(processFlowFile);
+    std::shared_ptr<core::FlowFile> flow_file = session->create();
+    WriteCallback write_callback(message, logger_);
+    try {
+      session->write(flow_file, std::ref(write_callback));  // std::ref: this very object is invoked, so its status can be inspected afterwards
+    } catch (const Exception& ex) {
+      logger_->log_error("Error when processing message queue: %s", ex.what());  // NOTE(review): if write() throws without the callback failing, status presumably stays true — confirm
+    }
+    if (!write_callback.getSuccessStatus()) {
+      logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", flow_file->getUUIDStr());
+      session->remove(flow_file);  // failed writes are discarded rather than routed to a relationship
     } else {
-      session->putAttribute(processFlowFile, MQTT_BROKER_ATTRIBUTE, uri_);
-      session->putAttribute(processFlowFile, MQTT_TOPIC_ATTRIBUTE, topic_);
-      logger_->log_debug("ConsumeMQTT processing success for the flow with UUID %s topic %s", processFlowFile->getUUIDStr(), topic_);
-      session->transfer(processFlowFile, Success);
+      putUserPropertiesAsAttributes(message, flow_file, session);
+      session->putAttribute(flow_file, MQTT_BROKER_ATTRIBUTE, uri_);  // set after the user properties; presumably wins on a key clash — TODO confirm putAttribute semantics
+      session->putAttribute(flow_file, MQTT_TOPIC_ATTRIBUTE, message.topic);  // topic of this message, not the subscribed topic_
+      fillAttributeFromContentType(message, flow_file, session);
+      logger_->log_debug("ConsumeMQTT processing success for the flow with UUID %s topic %s", flow_file->getUUIDStr(), message.topic);
+      session->transfer(flow_file, Success);
     }
-    msg_queue.pop_front();
+    msg_queue.pop();  // popped last: 'message' refers to the front element until here
+  }
+}
+
+std::queue<ConsumeMQTT::SmartMessage> ConsumeMQTT::getReceivedMqttMessages() {
+  // Drains whatever queue_ currently yields into a local FIFO, preserving dequeue order.
+  std::queue<SmartMessage> drained;
+  for (SmartMessage next; queue_.try_dequeue(next);) {
+    drained.push(std::move(next));
+  }
+  return drained;
+}
+
+int64_t ConsumeMQTT::WriteCallback::operator() (const std::shared_ptr<io::OutputStream>& stream) {
+  // Copies the MQTT payload into the stream; on failure clears success_status_ and returns -1, otherwise returns the write result.
+  const auto& mqtt_message = *message_.contents;
+  if (mqtt_message.payloadlen < 0) {
+    success_status_ = false;
+    logger_->log_error("Payload length of message is negative, value is [%d]", mqtt_message.payloadlen);
+    return -1;
+  }
+  const auto bytes_written = stream->write(reinterpret_cast<uint8_t*>(mqtt_message.payload), gsl::narrow<size_t>(mqtt_message.payloadlen));
+  if (io::isError(bytes_written)) {
+    success_status_ = false;
+    logger_->log_error("Stream writing error when processing message");
+    return -1;
+  }
+  return bytes_written;
+}
+
+void ConsumeMQTT::putUserPropertiesAsAttributes(const SmartMessage& message, const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const {
+  if (mqtt_version_.value() != MqttVersions::V_5_0) {  // nothing is done for any version other than MQTT 5.0
+    return;
   }
+
+  const auto property_count = MQTTProperties_propertyCount(&message.contents->properties, MQTTPROPERTY_CODE_USER_PROPERTY);
+  for (int i=0; i < property_count; ++i) {  // one flow file attribute per user property, in index order
+    MQTTProperty* property = MQTTProperties_getPropertyAt(&message.contents->properties, MQTTPROPERTY_CODE_USER_PROPERTY, i);  // NOTE(review): dereferenced unchecked; presumably non-null for i < property_count — confirm
+    std::string key(property->value.data.data, property->value.data.len);  // built from (pointer, length), not as a C string
+    std::string value(property->value.value.data, property->value.value.len);
+    session->putAttribute(flow_file, key, value);
+  }
+}
+
+void ConsumeMQTT::fillAttributeFromContentType(const SmartMessage& message, const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const {
+  if (mqtt_version_.value() != MqttVersions::V_5_0 || attribute_from_content_type_.empty()) {  // skipped unless MQTT 5.0 and a target attribute name is set
+    return;
+  }
+
+  MQTTProperty* property = MQTTProperties_getProperty(&message.contents->properties, MQTTPROPERTY_CODE_CONTENT_TYPE);
+  if (property == nullptr) {  // no content type on this message: the attribute is simply not added
+    return;
+  }
+
+  std::string content_type(property->value.data.data, property->value.data.len);  // built from (pointer, length), not as a C string
+  session->putAttribute(flow_file, attribute_from_content_type_, content_type);
 }
 
-bool ConsumeMQTT::startupClient() {
+void ConsumeMQTT::startupClient() {  // issues the subscribe call for topic_; failures are only logged, nothing is returned
   MQTTAsync_responseOptions response_options = MQTTAsync_responseOptions_initializer;
   response_options.context = this;
-  response_options.onSuccess = subscriptionSuccess;
-  response_options.onFailure = subscriptionFailure;
-  const int ret = MQTTAsync_subscribe(client_, topic_.c_str(), gsl::narrow<int>(qos_), &response_options);
+
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {  // MQTT 5 uses the *5 callback slots, whose handlers take the *Data5 structures
+    response_options.onSuccess5 = subscriptionSuccess5;
+    response_options.onFailure5 = subscriptionFailure5;
+  } else {
+    response_options.onSuccess = subscriptionSuccess;
+    response_options.onFailure = subscriptionFailure;
+  }
+
+  const int ret = MQTTAsync_subscribe(client_, topic_.c_str(), gsl::narrow<int>(qos_.value()), &response_options);
   if (ret != MQTTASYNC_SUCCESS) {
     logger_->log_error("Failed to subscribe to MQTT topic %s (%d)", topic_, ret);
-    return false;
+    return;  // NOTE(review): the debug line below is based on the call's return code only; the callbacks report the final outcome — confirm
   }
   logger_->log_debug("Successfully subscribed to MQTT topic: %s", topic_);
-  return true;
 }
 
-void ConsumeMQTT::onMessageReceived(char* topic_name, int /*topic_len*/, MQTTAsync_message* message) {
-  MQTTAsync_free(topic_name);
+void ConsumeMQTT::onMessageReceived(SmartMessage smart_message) {  // receives the message by value and hands it on to the internal queue
+  if (mqtt_version_ == MqttVersions::V_5_0) {  // alias resolution is attempted only for MQTT 5.0
+    resolveTopicFromAlias(smart_message);
+  }
+
+  if (smart_message.topic.empty()) {  // still no topic after the optional alias lookup: the message is dropped
+    logger_->log_error("Received message without topic");
+    return;
+  }
+
+  enqueueReceivedMQTTMsg(std::move(smart_message));
+}
+
+void ConsumeMQTT::resolveTopicFromAlias(SmartMessage& smart_message) {  // may fill smart_message.topic and/or record a new alias mapping
+  auto raw_alias = MQTTProperties_getNumericValue(&smart_message.contents->properties, MQTTPROPERTY_CODE_TOPIC_ALIAS);
+
+  std::optional<uint16_t> alias;
+  if (raw_alias != PAHO_MQTT_C_FAILURE_CODE) {  // the failure code is treated as "message carries no alias"
+    alias = gsl::narrow<uint16_t>(raw_alias);
+  }
+
+  auto& topic = smart_message.topic;
 
-  const auto* msgPayload = reinterpret_cast<const char*>(message->payload);
-  const size_t msgLen = message->payloadlen;
-  const std::string messageText(msgPayload, msgLen);
-  logger_->log_debug("Received message \"%s\" to MQTT topic %s on broker %s", messageText, topic_, uri_);
+  if (alias.has_value()) {
+    if (*alias > topic_alias_maximum_) {
+      logger_->log_error("Broker does not respect client's Topic Alias Maximum, sent a greater value: %" PRIu16 " > %" PRIu16, *alias, topic_alias_maximum_);
+      return;  // topic is left untouched and the alias is not stored
+    }
 
-  std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> smartMessage(message);
-  enqueueReceivedMQTTMsg(std::move(smartMessage));
+    // if topic is empty, this is just a usage of a previously stored alias (look it up), otherwise a new one (store it)
+    if (topic.empty()) {
+      const auto iter = alias_to_topic_.find(*alias);
+      if (iter == alias_to_topic_.end()) {
+        logger_->log_error("Broker sent an alias that was not known to client before: %" PRIu16, *alias);  // topic stays empty in this case
+      } else {
+        topic = iter->second;
+      }
+    } else {
+      alias_to_topic_[*alias] = topic;  // overwrites any topic previously stored under this alias
+    }
+  } else if (topic.empty()) {
+    logger_->log_error("Received message without topic and alias");
+  }
 }
 
 void ConsumeMQTT::checkProperties() {
-  if (!cleanSession_ && clientID_.empty()) {
-    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Processor must have a Client ID for durable (non-clean) sessions");
+  if (mqtt_version_ == MqttVersions::V_3_1_0 || mqtt_version_ == MqttVersions::V_3_1_1 || mqtt_version_ == MqttVersions::V_3X_AUTO) {  // 3.x: MQTT 5-only properties only trigger warnings
+    if (isPropertyExplicitlySet(CleanStart)) {
+      logger_->log_warn("MQTT 3.x specification does not support Clean Start. Property is not used.");
+    }
+    if (isPropertyExplicitlySet(SessionExpiryInterval)) {
+      logger_->log_warn("MQTT 3.x specification does not support Session Expiry Intervals. Property is not used.");
+    }
+    if (isPropertyExplicitlySet(AttributeFromContentType)) {
+      logger_->log_warn("MQTT 3.x specification does not support Content Types and thus attributes cannot be created from them. Property is not used.");
+    }
+    if (isPropertyExplicitlySet(TopicAliasMaximum)) {
+      logger_->log_warn("MQTT 3.x specification does not support Topic Alias Maximum. Property is not used.");
+    }
+    if (isPropertyExplicitlySet(ReceiveMaximum)) {
+      logger_->log_warn("MQTT 3.x specification does not support Receive Maximum. Property is not used.");
+    }
+  }
+
+  if (mqtt_version_.value() == MqttVersions::V_5_0 && isPropertyExplicitlySet(CleanSession)) {  // the mirror case: a 3.x-only property set on MQTT 5.0
+    logger_->log_warn("MQTT 5.0 specification does not support Clean Session. Property is not used.");
+  }
+
+  if (clientID_.empty()) {  // the only checks in this function that throw: a durable session without a Client ID
+    if (mqtt_version_.value() == MqttVersions::V_5_0) {
+      if (session_expiry_interval_ > std::chrono::seconds(0)) {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Processor must have a Client ID for durable (Session Expiry Interval > 0) sessions");
+      }
+    } else if (!clean_session_) {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Processor must have a Client ID for durable (non-clean) sessions");
+    }
+  }
+
+  if (qos_ == MqttQoS::LEVEL_0) {  // durable session combined with QoS 0 is allowed but warned about
+    if (mqtt_version_.value() == MqttVersions::V_5_0) {
+      if (session_expiry_interval_ > std::chrono::seconds(0)) {
+        logger_->log_warn("Messages are not preserved during client disconnection "
+                          "by the broker when QoS is less than 1 for durable (Session Expiry Interval > 0) sessions. Only subscriptions are preserved.");
+      }
+    } else if (!clean_session_) {
+      logger_->log_warn("Messages are not preserved during client disconnection "
+                        "by the broker when QoS is less than 1 for durable (non-clean) sessions. Only subscriptions are preserved.");
+    }
+  }
+}
+
+void ConsumeMQTT::checkBrokerLimitsImpl() {
+  // Checks topic_ and session_expiry_interval_ against the broker capability members; throws PROCESS_SCHEDULE_EXCEPTION on a conflict.
+  auto hasWildcards = [] (std::string_view topic) {
+    return std::any_of(topic.begin(), topic.end(), [] (const char ch) {return ch == '+' || ch == '#';});
+  };
+  if (wildcard_subscription_available_ == false && hasWildcards(topic_)) {  // explicit "== false": presumably an optional that may be unset — confirm
+    std::ostringstream os;
+    os << "Broker does not support wildcards but topic \"" << topic_ <<"\" has them";
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, os.str());
+  }
+
+  if (maximum_session_expiry_interval_.has_value() && session_expiry_interval_ > maximum_session_expiry_interval_) {
+    std::ostringstream os;
+    os << "Set Session Expiry Interval (" << session_expiry_interval_.count() <<" s) is longer than the maximum supported by the broker (" << maximum_session_expiry_interval_->count() << " s).";
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, os.str());
+  }
+
+  if (utils::StringUtils::startsWith(topic_, "$share/")) {
+    if (mqtt_version_.value() == MqttVersions::V_5_0) {
+      // shared topic are supported on MQTT 5, unless explicitly denied by broker
+      if (shared_subscription_available_ == false) {
+        std::ostringstream os;
+        os << "Shared topic feature with topic \"" << topic_ << "\" is not supported by broker";
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, os.str());
+      }
+    } else {
+      logger_->log_warn("Shared topic feature with topic \"%s\" might not be supported by broker on MQTT 3.x", topic_);  // the %s conversion needs topic_ as its argument
+    }
+  }
+}
+
+void ConsumeMQTT::setProcessorSpecificMqtt5ConnectOptions(MQTTProperties& connect_props) const {
+  // Appends one two-byte-integer property to connect_props; the result of MQTTProperties_add is not inspected.
+  const auto add_uint16 = [&connect_props](auto code, uint16_t number) {
+    MQTTProperty entry;
+    entry.identifier = code;
+    entry.value.integer2 = number;
+    MQTTProperties_add(&connect_props, &entry);
+  };
+  if (topic_alias_maximum_ > 0) {
+    add_uint16(MQTTPROPERTY_CODE_TOPIC_ALIAS_MAXIMUM, topic_alias_maximum_);
+  }
+  if (receive_maximum_ < MQTT_MAX_RECEIVE_MAXIMUM) {
+    add_uint16(MQTTPROPERTY_CODE_RECEIVE_MAXIMUM, receive_maximum_);
+  }
+}
+
+void ConsumeMQTT::subscriptionSuccess(void* context, MQTTAsync_successData* /*response*/) {
+  // context is treated as the ConsumeMQTT instance; the response payload is not used
+  reinterpret_cast<ConsumeMQTT*>(context)->onSubscriptionSuccess();
+}
+
+void ConsumeMQTT::subscriptionSuccess5(void* context, MQTTAsync_successData5* /*response*/) {
+  // MQTT 5 flavour of the success callback; it reports through the same member handler
+  reinterpret_cast<ConsumeMQTT*>(context)->onSubscriptionSuccess();
+}
+
+void ConsumeMQTT::subscriptionFailure(void* context, MQTTAsync_failureData* response) {
+  // context is treated as the ConsumeMQTT instance; the failure data is passed through unchanged
+  reinterpret_cast<ConsumeMQTT*>(context)->onSubscriptionFailure(response);
+}
+
+void ConsumeMQTT::subscriptionFailure5(void* context, MQTTAsync_failureData5* response) {
+  // MQTT 5 flavour of the failure callback; the failure data is passed through unchanged
+  reinterpret_cast<ConsumeMQTT*>(context)->onSubscriptionFailure5(response);
+}
+
+void ConsumeMQTT::onSubscriptionSuccess() {  // called from both the MQTT 3.x and the MQTT 5 static success callbacks
+  logger_->log_info("Successfully subscribed to MQTT topic %s on broker %s", topic_, uri_);
+}
+
+void ConsumeMQTT::onSubscriptionFailure(MQTTAsync_failureData* response) {  // only logs the failure; no retry or state change happens here
+  logger_->log_error("Subscription failed on topic %s to MQTT broker %s (%d)", topic_, uri_, response->code);  // NOTE(review): response is dereferenced unchecked — confirm it is never null
+  if (response->message != nullptr) {  // the detail text is optional
+    logger_->log_error("Detailed reason for subscription failure: %s", response->message);
   }
-  if (!cleanSession_ && qos_ == 0) {
-    logger_->log_warn("Messages are not preserved during client disconnection "
-                      "by the broker when QoS is less than 1 for durable (non-clean) sessions. Only subscriptions are preserved.");
+}
+
+void ConsumeMQTT::onSubscriptionFailure5(MQTTAsync_failureData5* response) {  // only logs the failure; no retry or state change happens here
+  logger_->log_error("Subscription failed on topic %s to MQTT broker %s (%d)", topic_, uri_, response->code);
+  if (response->message != nullptr) {  // the detail text is optional
+    logger_->log_error("Detailed reason for subscription failure: %s", response->message);
   }
+  logger_->log_error("Reason code for subscription failure: %d: %s", response->reasonCode, MQTTReasonCode_toString(response->reasonCode));  // MQTT 5 adds a reason code with a printable name
 }
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/mqtt/processors/ConsumeMQTT.h b/extensions/mqtt/processors/ConsumeMQTT.h
index 3486c526c..5c332a8c4 100644
--- a/extensions/mqtt/processors/ConsumeMQTT.h
+++ b/extensions/mqtt/processors/ConsumeMQTT.h
@@ -16,10 +16,11 @@
  */
 #pragma once
 
-#include <deque>
 #include <limits>
 #include <memory>
+#include <queue>
 #include <string>
+#include <unordered_map>
 #include <utility>
 
 #include "FlowFileRecord.h"
@@ -35,27 +36,35 @@
 
 namespace org::apache::nifi::minifi::processors {
 
-#define MQTT_TOPIC_ATTRIBUTE "mqtt.topic"
-#define MQTT_BROKER_ATTRIBUTE "mqtt.broker"
-
 class ConsumeMQTT : public processors::AbstractMQTTProcessor {
  public:
   explicit ConsumeMQTT(std::string name, const utils::Identifier& uuid = {})
       : processors::AbstractMQTTProcessor(std::move(name), uuid) {
-    maxQueueSize_ = 100;
   }
 
   EXTENSIONAPI static constexpr const char* Description = "This Processor gets the contents of a FlowFile from a MQTT broker for a specified topic. "
       "The the payload of the MQTT message becomes content of a FlowFile";
 
+  EXTENSIONAPI static const core::Property Topic;
   EXTENSIONAPI static const core::Property CleanSession;
+  EXTENSIONAPI static const core::Property CleanStart;
+  EXTENSIONAPI static const core::Property SessionExpiryInterval;
   EXTENSIONAPI static const core::Property QueueBufferMaxMessage;
+  EXTENSIONAPI static const core::Property AttributeFromContentType;
+  EXTENSIONAPI static const core::Property TopicAliasMaximum;
+  EXTENSIONAPI static const core::Property ReceiveMaximum;
 
   static auto properties() {
-    return utils::array_cat(AbstractMQTTProcessor::properties(), std::array{
+    return utils::array_cat(AbstractMQTTProcessor::basicProperties(), std::array{  // order: base basic, ConsumeMQTT's own, then base advanced
+      Topic,
       CleanSession,
-      QueueBufferMaxMessage
-    });
+      CleanStart,
+      SessionExpiryInterval,
+      QueueBufferMaxMessage,
+      AttributeFromContentType,
+      TopicAliasMaximum,
+      ReceiveMaximum
+    }, AbstractMQTTProcessor::advancedProperties());
   }
 
   EXTENSIONAPI static const core::Relationship Success;
@@ -68,63 +77,109 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
 
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
-  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override;
-  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+  static constexpr const char* const MQTT_TOPIC_ATTRIBUTE = "mqtt.topic";
+  static constexpr const char* const MQTT_BROKER_ATTRIBUTE = "mqtt.broker";
+
+  void readProperties(const std::shared_ptr<core::ProcessContext>& context) override;
+  void onTriggerImpl(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override;
   void initialize() override;
 
  private:
-  struct MQTTMessageDeleter {
-    void operator()(MQTTAsync_message* message) {
-      MQTTAsync_freeMessage(&message);
+  class WriteCallback {  // callable that writes one message payload into an output stream and remembers whether that worked
+   public:
+    explicit WriteCallback(const SmartMessage& message, std::shared_ptr<core::logging::Logger> logger)
+      : message_(message)
+      , logger_(std::move(logger)) {
     }
-  };
 
-  void getReceivedMQTTMsg(std::deque<std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter>>& msg_queue) {
-    std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> message;
-    while (queue_.try_dequeue(message)) {
-      msg_queue.push_back(std::move(message));
+    int64_t operator() (const std::shared_ptr<io::OutputStream>& stream);  // returns -1 on failure, otherwise the stream write result
+
+    [[nodiscard]] bool getSuccessStatus() const {
+      return success_status_;
     }
-  }
 
-  // MQTT async callback
-  static void subscriptionSuccess(void* context, MQTTAsync_successData* response) {
-    auto* processor = reinterpret_cast<ConsumeMQTT*>(context);
-    processor->onSubscriptionSuccess(response);
-  }
+   private:
+    const SmartMessage& message_;  // held by reference: the message must outlive this callback
+    std::shared_ptr<core::logging::Logger> logger_;
+    bool success_status_ = true;  // set to false by operator() on a negative payload length or a stream write error
+  };
 
-  // MQTT async callback
-  static void subscriptionFailure(void* context, MQTTAsync_failureData* response) {
-    auto* processor = reinterpret_cast<ConsumeMQTT*>(context);
-    processor->onSubscriptionFailure(response);
-  }
+  // MQTT static async callbacks, calling their non-static counterparts with context being pointer to "this"
+  static void subscriptionSuccess(void* context, MQTTAsync_successData* response);
+  static void subscriptionSuccess5(void* context, MQTTAsync_successData5* response);
+  static void subscriptionFailure(void* context, MQTTAsync_failureData* response);
+  static void subscriptionFailure5(void* context, MQTTAsync_failureData5* response);
+
+  // MQTT non-static async callbacks
+  void onSubscriptionSuccess();
+  void onSubscriptionFailure(MQTTAsync_failureData* response);
+  void onSubscriptionFailure5(MQTTAsync_failureData5* response);
+  void onMessageReceived(SmartMessage smart_message) override;
+
+  /**
+   * Enqueues received MQTT message into internal message queue.
+   * Called as a callback on a separate thread than onTrigger, as a reaction to message incoming.
+   * @param message message to put to queue
+   */
+  void enqueueReceivedMQTTMsg(SmartMessage message);
+
+  /**
+   * Called in onTrigger to return the whole internal message queue
+   * @return message queue of messages received since previous onTrigger
+   */
+  std::queue<SmartMessage> getReceivedMqttMessages();
+
+  /**
+   * Subscribes to topic
+   */
+  void startupClient() override;
 
-  void onSubscriptionSuccess(MQTTAsync_successData* /*response*/) {
-    logger_->log_info("Successfully subscribed to MQTT topic %s on broker %s", topic_, uri_);
+  void checkProperties() override;
+  void checkBrokerLimitsImpl() override;
+
+  /**
+   * Resolve topic name if it was sent with an alias instead of a regular topic name
+   * @param smart_message message to process
+   */
+  void resolveTopicFromAlias(SmartMessage& smart_message);
+
+  bool getCleanSession() const override {  // value loaded from the "Clean Session" property (true unless configured otherwise)
+    return clean_session_;
   }
 
-  void onSubscriptionFailure(MQTTAsync_failureData* response) {
-    logger_->log_error("Subscription failed on topic %s to MQTT broker %s (%d)", topic_, uri_, response->code);
-    if (response->message != nullptr) {
-      logger_->log_error("Detailed reason for subscription failure: %s", response->message);
-    }
+  bool getCleanStart() const override {  // value loaded from the "Clean Start" property (true unless configured otherwise)
+    return clean_start_;
   }
 
-  bool getCleanSession() const override {
-    return cleanSession_;
+  std::chrono::seconds getSessionExpiryInterval() const override {  // whole seconds; zero unless the property was configured
+    return session_expiry_interval_;
   }
 
-  void onMessageReceived(char* topic_name, int /*topic_len*/, MQTTAsync_message* message) override;
+  /**
+   * Turn MQTT 5 User Properties to Flow File attributes
+   */
+  void putUserPropertiesAsAttributes(const SmartMessage& message, const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const;
 
-  void enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> message);
+  /**
+   * Fill a user-requested Flow File attribute from content type
+   */
+  void fillAttributeFromContentType(const SmartMessage& message, const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const;
 
-  bool startupClient() override;
+  void setProcessorSpecificMqtt5ConnectOptions(MQTTProperties& connect_props) const override;
 
-  void checkProperties() override;
+  std::string topic_;  // topic passed to MQTTAsync_subscribe
+  bool clean_session_ = true;
+  bool clean_start_ = true;
+  std::chrono::seconds session_expiry_interval_{0};
+  uint64_t max_queue_size_ = 1000;  // messages arriving while queue_ holds about this many are dropped
+  std::string attribute_from_content_type_;  // empty means: do not copy the content type into an attribute
+
+  uint16_t topic_alias_maximum_{0};  // sent in CONNECT only when greater than 0
+  uint16_t receive_maximum_{MQTT_MAX_RECEIVE_MAXIMUM};  // sent in CONNECT only when below MQTT_MAX_RECEIVE_MAXIMUM
+  std::unordered_map<uint16_t, std::string> alias_to_topic_;  // alias -> topic mappings learned from received messages
 
+  moodycamel::ConcurrentQueue<SmartMessage> queue_;  // filled by enqueueReceivedMQTTMsg, drained by getReceivedMqttMessages
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ConsumeMQTT>::getLogger(uuid_);
-  bool cleanSession_ = true;
-  uint64_t maxQueueSize_;
-  moodycamel::ConcurrentQueue<std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter>> queue_;
 };
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/mqtt/processors/PublishMQTT.cpp b/extensions/mqtt/processors/PublishMQTT.cpp
index 218a4220a..37e453eaa 100644
--- a/extensions/mqtt/processors/PublishMQTT.cpp
+++ b/extensions/mqtt/processors/PublishMQTT.cpp
@@ -16,10 +16,10 @@
  */
 #include "PublishMQTT.h"
 
+#include <algorithm>
 #include <cinttypes>
 #include <memory>
 #include <optional>
-#include <string>
 #include <vector>
 
 #include "utils/StringUtils.h"
@@ -27,83 +27,266 @@
 #include "core/ProcessSession.h"
 #include "core/Resource.h"
 
+namespace {
+// Exception type local to this translation unit; it is thrown by
+// PublishMQTT::InFlightMessageCounter::increase() when waiting for a free in-flight slot times out.
+// It inherits the std::runtime_error constructors, so it is built from a message string.
+class RetriableError : public std::runtime_error {
+  using std::runtime_error::runtime_error;
+};
+}  // namespace
+
 namespace org::apache::nifi::minifi::processors {
 
+using SendFinishedTask = std::packaged_task<bool(bool, std::optional<int>, std::optional<MQTTReasonCodes>)>;
+
 void PublishMQTT::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void PublishMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+/**
+ * Reads the PublishMQTT-specific properties (Topic presence, Retain, Message Expiry Interval)
+ * and enables the in-flight message counter when it is needed.
+ * @throws Exception (PROCESS_SCHEDULE_EXCEPTION) if the Topic property has no value
+ */
+void PublishMQTT::readProperties(const std::shared_ptr<core::ProcessContext>& context) {
+  // only presence is checked here; the actual topic is resolved per flow file in getTopic()
+  if (!context->getProperty(Topic).has_value()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "PublishMQTT: Topic is required");
+  }
+
   if (const auto retain_opt = context->getProperty<bool>(Retain)) {
     retain_ = *retain_opt;
   }
   logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
 
-  AbstractMQTTProcessor::onSchedule(context, factory);
+  // the configured time period is stored with whole-second granularity
+  if (const auto message_expiry_interval = context->getProperty<core::TimePeriodValue>(MessageExpiryInterval)) {
+    message_expiry_interval_ = std::chrono::duration_cast<std::chrono::seconds>(message_expiry_interval->getMilliseconds());
+    logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s", int64_t{message_expiry_interval_->count()});
+  }
+
+  // in-flight counting is switched on only for MQTT 5 with a QoS other than level 0
+  in_flight_message_counter_.setEnabled(mqtt_version_ == MqttVersions::V_5_0 && qos_ != MqttQoS::LEVEL_0);
 }
 
-void PublishMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession> &session) {
-  // reconnect if needed
-  reconnect();
+/**
+ * Takes one flow file from the session, publishes its whole content as a single MQTT message
+ * and routes the flow file to Success or Failure depending on the outcome.
+ * Yields when there is no flow file to process.
+ */
+void PublishMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
 
-  if (!MQTTAsync_isConnected(client_)) {
-    logger_->log_error("Could not publish to MQTT broker because disconnected to %s", uri_);
+  if (!flow_file) {
     yield();
     return;
   }
 
-  std::shared_ptr<core::FlowFile> flowFile = session->get();
+  // broker's Receive Maximum can change after reconnect
+  in_flight_message_counter_.setMax(broker_receive_maximum_.value_or(MQTT_MAX_RECEIVE_MAXIMUM));
 
-  if (!flowFile) {
-    return;
+  // getTopic() throws when the topic cannot be resolved; that happens outside the try block below
+  const auto topic = getTopic(context, flow_file);
+  try {
+    const auto result = session->readBuffer(flow_file);
+    // a negative read status and a failed send are both routed to Failure
+    if (result.status < 0 || !sendMessage(result.buffer, topic, getContentType(context, flow_file), flow_file)) {
+      logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on broker %s", flow_file->getUUIDStr(), topic, uri_);
+      session->transfer(flow_file, Failure);
+      return;
+    }
+    logger_->log_debug("Sent flow file [%s] with length %" PRId64 " to MQTT topic '%s' on broker %s", flow_file->getUUIDStr(), result.status, topic, uri_);
+    session->transfer(flow_file, Success);
+  // NOTE(review): RetriableError (thrown while waiting for an in-flight slot) derives from std::runtime_error only,
+  // so presumably it is not caught here and propagates to the caller - confirm against Exception's definition
+  } catch (const Exception& ex) {
+    logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on broker %s, exception string: '%s'", flow_file->getUUIDStr(), topic, uri_, ex.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+/**
+ * Publishes one message and blocks until the MQTT library reports the outcome through one of the send callbacks.
+ * @param buffer payload of the message
+ * @param topic topic to publish to
+ * @param content_type MQTT 5 Content Type, ignored when empty or when not using MQTT 5
+ * @param flow_file flow file whose attributes are sent as MQTT 5 User Properties
+ * @return true if the send callback reported success; false (after logging) if the payload is too large,
+ *         the send could not be started, or the send callback reported failure
+ * @throws RetriableError (from the in-flight counter) if no in-flight slot became free in time
+ */
+bool PublishMQTT::sendMessage(const std::vector<std::byte>& buffer, const std::string& topic, const std::string& content_type, const std::shared_ptr<core::FlowFile>& flow_file) {
+  static constexpr size_t max_packet_size = 256_MiB - 1;
+  if (buffer.size() > max_packet_size) {
+    // both values are size_t, so both are printed with %zu
+    logger_->log_error("Sending message failed because MQTT limit maximum packet size [%zu] is exceeded by FlowFile of [%zu]", max_packet_size, buffer.size());
+    return false;
   }
 
-  PublishMQTT::ReadCallback callback(this, flowFile->getSize(), max_seg_size_, topic_, client_, gsl::narrow<int>(qos_), retain_);
-  session->read(flowFile, std::ref(callback));
-  if (callback.status_ < 0) {
-    logger_->log_error("Failed to send flow to MQTT topic %s", topic_);
-    session->transfer(flowFile, Failure);
+  if (maximum_packet_size_.has_value() && buffer.size() > *(maximum_packet_size_)) {
+    logger_->log_error("Sending message failed because broker-requested maximum packet size [%" PRIu32 "] is exceeded by FlowFile of [%zu]",
+                                   *maximum_packet_size_, buffer.size());
+    return false;
+  }
+
+  MQTTAsync_message message_to_publish = MQTTAsync_message_initializer;
+  message_to_publish.payload = const_cast<std::byte*>(buffer.data());
+  message_to_publish.payloadlen = buffer.size();
+  message_to_publish.qos = qos_.value();
+  message_to_publish.retained = retain_;
+
+  // MQTTProperties_add() allocates memory for the property list; release it on every exit path
+  // (normal return, early failure, exception). The guard does not own the struct itself, it only
+  // calls MQTTProperties_free() on it, which is a no-op for an empty list.
+  const std::unique_ptr<MQTTProperties, decltype(&MQTTProperties_free)> properties_guard{&message_to_publish.properties, &MQTTProperties_free};
+  setMqtt5Properties(message_to_publish, content_type, flow_file);
+
+  MQTTAsync_responseOptions response_options = MQTTAsync_responseOptions_initializer;
+  if (mqtt_version_ == MqttVersions::V_5_0) {
+    response_options.onSuccess5 = sendSuccess5;
+    response_options.onFailure5 = sendFailure5;
   } else {
-    logger_->log_debug("Sent flow with length %d to MQTT topic %s", callback.read_size_, topic_);
-    session->transfer(flowFile, Success);
-  }
-}
-
-int64_t PublishMQTT::ReadCallback::operator()(const std::shared_ptr<io::InputStream>& stream) {
-  if (flow_size_ < max_seg_size_)
-    max_seg_size_ = flow_size_;
-  gsl_Expects(max_seg_size_ < gsl::narrow<uint64_t>(std::numeric_limits<int>::max()));
-  std::vector<std::byte> buffer(max_seg_size_);
-  read_size_ = 0;
-  status_ = 0;
-  while (read_size_ < flow_size_) {
-    // MQTTClient_message::payloadlen is int, so we can't handle 2GB+
-    const auto readRet = stream->read(buffer);
-    if (io::isError(readRet)) {
-      status_ = -1;
-      return gsl::narrow<int64_t>(read_size_);
+    response_options.onSuccess = sendSuccess;
+    response_options.onFailure = sendFailure;
+  }
+
+  // save context for callback
+  SendFinishedTask send_finished_task(
+          [this] (const bool success, const std::optional<int> response_code, const std::optional<MQTTReasonCodes> reason_code) {
+            return notify(success, response_code, reason_code);
+          });
+  // take the future before the send is started, so this thread never touches the task object
+  // while a callback may already be invoking it
+  auto send_finished_future = send_finished_task.get_future();
+  response_options.context = &send_finished_task;
+
+  in_flight_message_counter_.increase();
+
+  const int error_code = MQTTAsync_sendMessage(client_, topic.c_str(), &message_to_publish, &response_options);
+  if (error_code != MQTTASYNC_SUCCESS) {
+    logger_->log_error("MQTTAsync_sendMessage failed on topic '%s', MQTT broker %s with error code [%d]", topic, uri_, error_code);
+    // early fail, sending attempt did not succeed, no need to wait for callback
+    in_flight_message_counter_.decrease();
+    return false;
+  }
+
+  // blocks until one of the send callbacks has run the task; the task and the message outlive the callback
+  return send_finished_future.get();
+}
+
+/**
+ * Warns about explicitly set properties that have no effect with the selected MQTT 3.x protocol version.
+ */
+void PublishMQTT::checkProperties() {
+  const bool is_mqtt_3x = mqtt_version_ == MqttVersions::V_3_1_0
+      || mqtt_version_ == MqttVersions::V_3_1_1
+      || mqtt_version_ == MqttVersions::V_3X_AUTO;
+  if (!is_mqtt_3x) {
+    return;
+  }
+
+  if (isPropertyExplicitlySet(MessageExpiryInterval)) {
+    logger_->log_warn("MQTT 3.x specification does not support Message Expiry Intervals. Property is not used.");
+  }
+  if (isPropertyExplicitlySet(ContentType)) {
+    logger_->log_warn("MQTT 3.x specification does not support Content Types. Property is not used.");
+  }
+}
+
+/**
+ * Rejects the configuration when Retain is requested but the broker reported that retain is not available.
+ * @throws Exception (PROCESS_SCHEDULE_EXCEPTION) in that case
+ */
+void PublishMQTT::checkBrokerLimitsImpl() {
+  // compared against false explicitly, exactly as before: only an explicit "not available" is a conflict
+  const bool broker_refuses_retain = (retain_available_ == false);
+  if (broker_refuses_retain && retain_) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Retain was set but broker does not support it");
+  }
+}
+
+// MQTT 3.x success callback: context points to the SendFinishedTask created in sendMessage()
+void PublishMQTT::sendSuccess(void* context, MQTTAsync_successData* /*response*/) {
+  auto& on_send_finished = *static_cast<SendFinishedTask*>(context);
+  on_send_finished(true, std::nullopt, std::nullopt);
+}
+
+// MQTT 5 success callback: forwards the reason code carried by the response
+void PublishMQTT::sendSuccess5(void* context, MQTTAsync_successData5* response) {
+  auto& on_send_finished = *static_cast<SendFinishedTask*>(context);
+  on_send_finished(true, std::nullopt, response->reasonCode);
+}
+
+// MQTT 3.x failure callback: forwards the response code carried by the response
+void PublishMQTT::sendFailure(void* context, MQTTAsync_failureData* response) {
+  auto& on_send_finished = *static_cast<SendFinishedTask*>(context);
+  on_send_finished(false, response->code, std::nullopt);
+}
+
+// MQTT 5 failure callback: forwards both the response code and the reason code
+void PublishMQTT::sendFailure5(void* context, MQTTAsync_failureData5* response) {
+  auto& on_send_finished = *static_cast<SendFinishedTask*>(context);
+  on_send_finished(false, response->code, response->reasonCode);
+}
+
+/**
+ * Runs when a send callback fires: releases the in-flight slot and logs the outcome.
+ * @param success whether the message was sent successfully
+ * @param response_code response code, provided by the failure callbacks
+ * @param reason_code MQTT 5 reason code, provided by the MQTT 5 callbacks
+ * @return success, unchanged
+ */
+bool PublishMQTT::notify(const bool success, const std::optional<int> response_code, const std::optional<MQTTReasonCodes> reason_code) {
+  in_flight_message_counter_.decrease();
+
+  if (success) {
+    logger_->log_debug("Successfully sent message to MQTT broker %s", uri_);
+    if (reason_code.has_value()) {
+      // the MQTT 5 success callback always supplies a reason code, so this is not an error condition
+      logger_->log_debug("Additional reason code for sending success: %d: %s", *reason_code, MQTTReasonCode_toString(*reason_code));
     }
-    if (readRet > 0) {
-      MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
-      pubmsg.payload = buffer.data();
-      pubmsg.payloadlen = gsl::narrow<int>(readRet);
-      pubmsg.qos = qos_;
-      pubmsg.retained = retain_;
-      MQTTAsync_responseOptions response_options = MQTTAsync_responseOptions_initializer;
-      response_options.context = processor_;
-      response_options.onSuccess = PublishMQTT::sendSuccess;
-      response_options.onFailure = PublishMQTT::sendFailure;
-      if (MQTTAsync_sendMessage(client_, topic_.c_str(), &pubmsg, &response_options) != MQTTASYNC_SUCCESS) {
-        status_ = -1;
-        return -1;
-      }
-      read_size_ += gsl::narrow<size_t>(readRet);
-    } else {
-      break;
+  } else {
+    // do not dereference an empty optional; fall back to the generic failure code if none was given
+    logger_->log_error("Sending message failed to MQTT broker %s with response code %d", uri_, response_code.value_or(MQTTASYNC_FAILURE));
+    if (reason_code.has_value()) {
+      logger_->log_error("Reason code for sending failure: %d: %s", *reason_code, MQTTReasonCode_toString(*reason_code));
     }
   }
-  return gsl::narrow<int64_t>(read_size_);
+
+  return success;
+}
+
+/**
+ * Evaluates the Topic property against the given flow file.
+ * @throws minifi::Exception (GENERAL_EXCEPTION) if the property yields no value
+ */
+std::string PublishMQTT::getTopic(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  const auto resolved_topic = context->getProperty(Topic, flow_file);
+  if (!resolved_topic) {
+    throw minifi::Exception(ExceptionType::GENERAL_EXCEPTION, "Could not resolve required property Topic");
+  }
+  logger_->log_debug("PublishMQTT: Topic resolved as \"%s\"", *resolved_topic);
+  return *resolved_topic;
+}
+
+/**
+ * Evaluates the Content Type property against the given flow file.
+ * @return the resolved value, or an empty string if the property yields no value
+ */
+std::string PublishMQTT::getContentType(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  const auto resolved_content_type = context->getProperty(ContentType, flow_file);
+  if (!resolved_content_type) {
+    return "";
+  }
+  logger_->log_debug("PublishMQTT: Content Type resolved as \"%s\"", *resolved_content_type);
+  return *resolved_content_type;
+}
+
+/**
+ * Adds the MQTT 5-only properties (Message Expiry Interval, Content Type, User Properties) to the message.
+ * Does nothing unless the configured protocol version is MQTT 5.
+ * @param message message whose property list is extended
+ * @param content_type content type to attach; skipped when empty
+ * @param flow_file flow file whose attributes are attached as User Properties
+ */
+void PublishMQTT::setMqtt5Properties(MQTTAsync_message& message, const std::string& content_type, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  if (mqtt_version_ != MqttVersions::V_5_0) {
+    return;
+  }
+
+  if (message_expiry_interval_.has_value()) {
+    // the interval is carried in a 32-bit unsigned field: saturate instead of silently wrapping around
+    const auto expiry_seconds = std::clamp<std::chrono::seconds::rep>(
+        message_expiry_interval_->count(), 0, std::numeric_limits<uint32_t>::max());
+    MQTTProperty property;
+    property.identifier = MQTTPROPERTY_CODE_MESSAGE_EXPIRY_INTERVAL;
+    property.value.integer4 = static_cast<unsigned int>(expiry_seconds);
+    MQTTProperties_add(&message.properties, &property);
+  }
+
+  if (!content_type.empty()) {
+    MQTTProperty property;
+    property.identifier = MQTTPROPERTY_CODE_CONTENT_TYPE;
+    property.value.data.len = content_type.length();
+    // const_cast only satisfies the C struct's non-const pointer member
+    property.value.data.data = const_cast<char*>(content_type.data());
+    MQTTProperties_add(&message.properties, &property);
+  }
+
+  addAttributesAsUserProperties(message, flow_file);
+}
+
+/**
+ * Appends every attribute of the flow file to the message as an MQTT 5 User Property (name/value pair).
+ */
+void PublishMQTT::addAttributesAsUserProperties(MQTTAsync_message& message, const std::shared_ptr<core::FlowFile>& flow_file) {
+  for (const auto& attribute : *flow_file->getAttributesPtr()) {
+    const auto& attribute_name = attribute.first;
+    const auto& attribute_value = attribute.second;
+
+    MQTTProperty user_property;
+    user_property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY;
+
+    // "data" holds the name of the pair, "value" holds its value
+    user_property.value.data.len = attribute_name.length();
+    user_property.value.data.data = const_cast<char*>(attribute_name.data());
+    user_property.value.value.len = attribute_value.length();
+    user_property.value.value.data = const_cast<char*>(attribute_value.data());
+
+    MQTTProperties_add(&message.properties, &user_property);
+  }
+}
+
+// set a new upper limit for in-flight messages; no-op while the counter is disabled
+void PublishMQTT::InFlightMessageCounter::setMax(const uint16_t new_limit) {
+  if (!enabled_) {
+    return;
+  }
+
+  {
+    std::lock_guard lock{mutex_};
+    limit_ = new_limit;
+  }
+  // a raised limit can free more than one slot at once, so wake every waiter and let each
+  // of them re-check the predicate in increase()
+  cv_.notify_all();
+}
+
+// take one in-flight slot before sending; waits up to 5 seconds for a free slot when the limit is reached
+// and throws RetriableError if none becomes free in that time; no-op while the counter is disabled
+void PublishMQTT::InFlightMessageCounter::increase() {
+  if (!enabled_) {
+    return;
+  }
+
+  const auto has_free_slot = [this] { return counter_ < limit_; };
+
+  std::unique_lock lock{mutex_};
+  if (!cv_.wait_for(lock, std::chrono::seconds{5}, has_free_slot)) {
+    throw RetriableError{"Timed out while waiting for a free upload slot on the MQTT server"};
+  }
+  ++counter_;
+}
+
+// give back one in-flight slot after a send finished (successfully or not) and wake one waiter;
+// no-op while the counter is disabled
+void PublishMQTT::InFlightMessageCounter::decrease() {
+  if (!enabled_) {
+    return;
+  }
+
+  std::unique_lock lock{mutex_};
+  --counter_;
+  // release the mutex before notifying, as before
+  lock.unlock();
+  cv_.notify_one();
 }
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/mqtt/processors/PublishMQTT.h b/extensions/mqtt/processors/PublishMQTT.h
index 12acbd524..396810a9d 100644
--- a/extensions/mqtt/processors/PublishMQTT.h
+++ b/extensions/mqtt/processors/PublishMQTT.h
@@ -16,12 +16,12 @@
  */
 #pragma once
 
-#include <vector>
-#include <string>
+#include <limits>
 #include <memory>
+#include <string>
+#include <unordered_map>
 #include <utility>
-
-#include <limits>
+#include <vector>
 
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
@@ -43,12 +43,18 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
 
   EXTENSIONAPI static constexpr const char* Description = "PublishMQTT serializes FlowFile content as an MQTT payload, sending the message to the configured topic and broker.";
 
+  EXTENSIONAPI static const core::Property Topic;
   EXTENSIONAPI static const core::Property Retain;
+  EXTENSIONAPI static const core::Property MessageExpiryInterval;
+  EXTENSIONAPI static const core::Property ContentType;
 
   static auto properties() {
-    return utils::array_cat(AbstractMQTTProcessor::properties(), std::array{
-      Retain
-    });
+    return utils::array_cat(AbstractMQTTProcessor::basicProperties(), std::array{
+      Topic,
+      Retain,
+      MessageExpiryInterval,
+      ContentType
+    }, AbstractMQTTProcessor::advancedProperties());
   }
 
   EXTENSIONAPI static const core::Relationship Success;
@@ -62,72 +68,103 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
 
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
-  class ReadCallback {
-   public:
-    ReadCallback(PublishMQTT* processor, uint64_t flow_size, uint64_t max_seg_size, std::string topic, MQTTAsync client, int qos, bool retain)
-        : processor_(processor),
-          flow_size_(flow_size),
-          max_seg_size_(max_seg_size),
-          topic_(std::move(topic)),
-          client_(client),
-          qos_(qos),
-          retain_(retain) {
-    }
+  void readProperties(const std::shared_ptr<core::ProcessContext>& context) override;
+  void onTriggerImpl(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override;
+  void initialize() override;
 
-    int64_t operator()(const std::shared_ptr<io::InputStream>& stream);
+ private:
+  /**
+   * Counts unacknowledged QoS 1 and QoS 2 messages to respect broker's Receive Maximum
+   */
+  class InFlightMessageCounter {
+   public:
+    void setEnabled(bool status) { enabled_ = status; }
 
-    size_t read_size_ = 0;
-    int status_ = 0;
+    void setMax(uint16_t new_limit);
+    void increase();
+    void decrease();
 
    private:
-    PublishMQTT* processor_;
-    uint64_t flow_size_;
-    uint64_t max_seg_size_;
-    std::string topic_;
-    MQTTAsync client_;
-
-    int qos_;
-    bool retain_;
+    bool enabled_ = false;
+    std::mutex mutex_;
+    std::condition_variable cv_;
+    uint16_t counter_{0};
+    uint16_t limit_{MQTT_MAX_RECEIVE_MAXIMUM};
   };
 
-  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override;
-  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
-  void initialize() override;
-
- private:
-  // MQTT async callback
-  static void sendSuccess(void* context, MQTTAsync_successData* response) {
-    auto* processor = reinterpret_cast<PublishMQTT*>(context);
-    processor->onSendSuccess(response);
-  }
+  // MQTT static async callbacks, calling their notify with context being pointer to a packaged_task to notify()
+  static void sendSuccess(void* context, MQTTAsync_successData* response);
+  static void sendSuccess5(void* context, MQTTAsync_successData5* response);
+  static void sendFailure(void* context, MQTTAsync_failureData* response);
+  static void sendFailure5(void* context, MQTTAsync_failureData5* response);
+
+  /**
+   * Resolves topic from expression language
+   */
+  std::string getTopic(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) const;
+
+  /**
+   * Resolves content type from expression language
+   */
+  std::string getContentType(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) const;
+
+  /**
+   * Sends an MQTT message asynchronously
+   * @param buffer contents of the message
+   * @param topic topic of the message
+   * @param content_type Content Type for MQTT 5
+   * @param flow_file Flow File being processed
+   * @return success of message sending
+   */
+  bool sendMessage(const std::vector<std::byte>& buffer, const std::string& topic, const std::string& content_type, const std::shared_ptr<core::FlowFile>& flow_file);
+
+  /**
+   * Callback for asynchronous message sending
+   * @param success if message sending was successful
+   * @param response_code response code for failure only
+   * @param reason_code MQTT 5 reason code
+   * @return if message sending was successful
+   */
+  bool notify(bool success, std::optional<int> response_code, std::optional<MQTTReasonCodes> reason_code);
+
+  /**
+   * Set MQTT 5-exclusive properties
+   * @param message message object
+   * @param content_type content type
+   * @param flow_file Flow File being processed
+   */
+  void setMqtt5Properties(MQTTAsync_message& message, const std::string& content_type, const std::shared_ptr<core::FlowFile>& flow_file) const;
+
+  /**
+   * Adds flow file attributes as user properties to an MQTT 5 message
+   * @param message message object
+   * @param flow_file Flow File being processed
+   */
+  static void addAttributesAsUserProperties(MQTTAsync_message& message, const std::shared_ptr<core::FlowFile>& flow_file);
 
-  // MQTT async callback
-  static void sendFailure(void* context, MQTTAsync_failureData* response) {
-    auto* processor = reinterpret_cast<PublishMQTT*>(context);
-    processor->onSendFailure(response);
+  bool getCleanSession() const override {
+    return true;
   }
 
-  void onSendSuccess(MQTTAsync_successData* /*response*/) {
-    logger_->log_debug("Successfully sent message to MQTT topic %s on broker %s", topic_, uri_);
+  bool getCleanStart() const override {
+    return true;
   }
 
-  void onSendFailure(MQTTAsync_failureData* response) {
-    logger_->log_error("Sending message failed on topic %s to MQTT broker %s (%d)", topic_, uri_, response->code);
-    if (response->message != nullptr) {
-      logger_->log_error("Detailed reason for sending failure: %s", response->message);
-    }
+  std::chrono::seconds getSessionExpiryInterval() const override {
+    // non-persistent session as we only publish
+    return std::chrono::seconds{0};
   }
 
-  bool getCleanSession() const override {
-    return true;
+  void startupClient() override {
+    // there is no need to do anything like subscribe in the beginning
   }
 
-  bool startupClient() override {
-    // there is no need to do anything like subscribe on the beginning
-    return true;
-  }
+  void checkProperties() override;
+  void checkBrokerLimitsImpl() override;
 
   bool retain_ = false;
+  std::optional<std::chrono::seconds> message_expiry_interval_;
+  InFlightMessageCounter in_flight_message_counter_;
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<PublishMQTT>::getLogger(uuid_);
 };
 
diff --git a/extensions/mqtt/tests/ConsumeMQTTTests.cpp b/extensions/mqtt/tests/ConsumeMQTTTests.cpp
index ac18118f7..1aef43572 100644
--- a/extensions/mqtt/tests/ConsumeMQTTTests.cpp
+++ b/extensions/mqtt/tests/ConsumeMQTTTests.cpp
@@ -38,19 +38,21 @@ struct Fixture {
 };
 }  // namespace
 
+using namespace std::literals::chrono_literals;
+
 TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_EmptyTopic", "[consumeMQTTTest]") {
   consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI, "127.0.0.1:1883");
   REQUIRE_THROWS_WITH(plan_->scheduleProcessor(consumeMqttProcessor_), Catch::EndsWith("Required property is empty: Topic"));
 }
 
 TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_EmptyBrokerURI", "[consumeMQTTTest]") {
-  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::Topic, "mytopic");
+  consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic, "mytopic");
   REQUIRE_THROWS_WITH(plan_->scheduleProcessor(consumeMqttProcessor_), Catch::EndsWith("Required property is empty: Broker URI"));
 }
 
 TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_DurableSessionWithoutID", "[consumeMQTTTest]") {
   consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI, "127.0.0.1:1883");
-  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::Topic, "mytopic");
+  consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic, "mytopic");
   consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::QoS, "1");
   consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::CleanSession, "false");
 
@@ -58,27 +60,119 @@ TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_DurableSessionWithoutID", "[consumeMQ
     Catch::EndsWith("Processor must have a Client ID for durable (non-clean) sessions"));
 }
 
+TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_DurableSessionWithoutID_V_5", "[consumeMQTTTest]") {
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI, "127.0.0.1:1883");
+  consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic, "mytopic");
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::QoS, "1");
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::MqttVersion, toString(minifi::processors::AbstractMQTTProcessor::MqttVersions::V_5_0));
+  consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::SessionExpiryInterval, "1 h");
+
+  REQUIRE_THROWS_WITH(plan_->scheduleProcessor(consumeMqttProcessor_),
+                      Catch::EndsWith("Processor must have a Client ID for durable (Session Expiry Interval > 0) sessions"));
+}
+
 TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_DurableSessionWithID", "[consumeMQTTTest]") {
   consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI, "127.0.0.1:1883");
   consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::ClientID, "subscriber");
-  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::Topic, "mytopic");
+  consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic, "mytopic");
   consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::QoS, "1");
   consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::CleanSession, "false");
 
   REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
   REQUIRE_FALSE(LogTestController::getInstance().contains("[warning] Messages are not preserved during client disconnection "
-    "by the broker when QoS is less than 1 for durable (non-clean) sessions. Only subscriptions are preserved."));
+    "by the broker when QoS is less than 1 for durable (non-clean) sessions. Only subscriptions are preserved.", 1s));
 }
 
 TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_DurableSessionWithQoS0", "[consumeMQTTTest]") {
   consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI, "127.0.0.1:1883");
   consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::ClientID, "subscriber");
-  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::Topic, "mytopic");
+  consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic, "mytopic");
   consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::QoS, "0");
   consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::CleanSession, "false");
 
   REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
 
   REQUIRE(LogTestController::getInstance().contains("[warning] Messages are not preserved during client disconnection "
-    "by the broker when QoS is less than 1 for durable (non-clean) sessions. Only subscriptions are preserved."));
+    "by the broker when QoS is less than 1 for durable (non-clean) sessions. Only subscriptions are preserved.", 1s));
+}
+
+TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_DurableSessionWithID_V_5", "[consumeMQTTTest]") {
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI, "127.0.0.1:1883");
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::ClientID, "subscriber");
+  consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic, "mytopic");
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::QoS, "1");
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::MqttVersion, toString(minifi::processors::AbstractMQTTProcessor::MqttVersions::V_5_0));
+  consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::SessionExpiryInterval, "1 h");
+
+  REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+  REQUIRE_FALSE(LogTestController::getInstance().contains("[warning] Messages are not preserved during client disconnection "
+                                                          "by the broker when QoS is less than 1 for durable (Session Expiry Interval > 0) sessions. Only subscriptions are preserved.", 1s));
+}
+
+TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_DurableSessionWithQoS0_V_5", "[consumeMQTTTest]") {
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI, "127.0.0.1:1883");
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::ClientID, "subscriber");
+  consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic, "mytopic");
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::QoS, "0");
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::MqttVersion, toString(minifi::processors::AbstractMQTTProcessor::MqttVersions::V_5_0));
+  consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::SessionExpiryInterval, "1 h");
+
+  REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+
+  REQUIRE(LogTestController::getInstance().contains("[warning] Messages are not preserved during client disconnection "
+                                                    "by the broker when QoS is less than 1 for durable (Session Expiry Interval > 0) sessions. Only subscriptions are preserved.", 1s));
+}
+
+TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_EmptyClientID_V_3_1_0", "[consumeMQTTTest]") {
+  consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic, "mytopic");
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI, "127.0.0.1:1883");
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::MqttVersion, toString(minifi::processors::AbstractMQTTProcessor::MqttVersions::V_3_1_0));
+  REQUIRE_THROWS_WITH(plan_->scheduleProcessor(consumeMqttProcessor_), Catch::EndsWith("MQTT 3.1.0 specification does not support empty client IDs"));
+}
+
+TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_CleanStart_V_3", "[consumeMQTTTest]") {
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI, "127.0.0.1:1883");
+  consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic, "mytopic");
+  consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::CleanStart, "true");
+
+  REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+  REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 3.x specification does not support Clean Start. Property is not used.", 1s));
+}
+
+TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_SessionExpiryInterval_V_3", "[consumeMQTTTest]") {
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI, "127.0.0.1:1883");
+  consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic, "mytopic");
+  consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::SessionExpiryInterval, "1 h");
+
+  REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+  REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 3.x specification does not support Session Expiry Intervals. Property is not used.", 1s));
+}
+
+TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_CleanSession_V_5", "[consumeMQTTTest]") {
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI, "127.0.0.1:1883");
+  consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic, "mytopic");
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::MqttVersion, toString(minifi::processors::AbstractMQTTProcessor::MqttVersions::V_5_0));
+  consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::SessionExpiryInterval, "0 s");
+  consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::CleanSession, "true");
+
+  REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+  REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 5.0 specification does not support Clean Session. Property is not used.", 1s));
+}
+
+TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_TopicAliasMaximum_V_3", "[consumeMQTTTest]") {
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI, "127.0.0.1:1883");
+  consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic, "mytopic");
+  consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::TopicAliasMaximum, "1");
+
+  REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+  REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 3.x specification does not support Topic Alias Maximum. Property is not used.", 1s));
+}
+
+TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_ReceiveMaximum_V_3", "[consumeMQTTTest]") {
+  consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI, "127.0.0.1:1883");
+  consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic, "mytopic");
+  consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::ReceiveMaximum, "1");
+
+  REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+  REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 3.x specification does not support Receive Maximum. Property is not used.", 1s));
 }
diff --git a/extensions/mqtt/tests/PublishMQTTTests.cpp b/extensions/mqtt/tests/PublishMQTTTests.cpp
index 40ab25a4a..0111da0a8 100644
--- a/extensions/mqtt/tests/PublishMQTTTests.cpp
+++ b/extensions/mqtt/tests/PublishMQTTTests.cpp
@@ -20,6 +20,8 @@
 #include "TestBase.h"
 #include "../processors/PublishMQTT.h"
 
+using namespace std::literals::chrono_literals;
+
 namespace {
 struct Fixture {
   Fixture() {
@@ -41,11 +43,32 @@ struct Fixture {
 TEST_CASE_METHOD(Fixture, "PublishMQTTTest_EmptyTopic", "[publishMQTTTest]") {
   publishMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI, "127.0.0.1:1883");
   REQUIRE_THROWS_WITH(plan_->scheduleProcessor(publishMqttProcessor_), Catch::EndsWith("Required property is empty: Topic"));
-  LogTestController::getInstance().reset();
 }
 
 TEST_CASE_METHOD(Fixture, "PublishMQTTTest_EmptyBrokerURI", "[publishMQTTTest]") {
-  publishMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::Topic, "mytopic");
+  publishMqttProcessor_->setProperty(minifi::processors::PublishMQTT::Topic, "mytopic");
   REQUIRE_THROWS_WITH(plan_->scheduleProcessor(publishMqttProcessor_), Catch::EndsWith("Required property is empty: Broker URI"));
-  LogTestController::getInstance().reset();
+}
+
+TEST_CASE_METHOD(Fixture, "PublishMQTTTest_EmptyClientID_V_3_1_0", "[publishMQTTTest]") {  // scheduling must be rejected when no client ID is given for 3.1.0
+  publishMqttProcessor_->setProperty(minifi::processors::PublishMQTT::Topic, "mytopic");
+  publishMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI, "127.0.0.1:1883");  // no Client ID property is set anywhere in this test
+  publishMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::MqttVersion, toString(minifi::processors::AbstractMQTTProcessor::MqttVersions::V_3_1_0));
+  REQUIRE_THROWS_WITH(plan_->scheduleProcessor(publishMqttProcessor_), Catch::EndsWith("MQTT 3.1.0 specification does not support empty client IDs"));
+}
+
+TEST_CASE_METHOD(Fixture, "PublishMQTTTest_MessageExpiryInterval_V_3", "[publishMQTTTest]") {
+  publishMqttProcessor_->setProperty(minifi::processors::PublishMQTT::Topic, "mytopic");
+  publishMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI, "127.0.0.1:1883");
+  publishMqttProcessor_->setProperty(minifi::processors::PublishMQTT::MessageExpiryInterval, "60 sec");  // property under test; MQTT Version is not set in this test
+  REQUIRE_NOTHROW(plan_->scheduleProcessor(publishMqttProcessor_));  // the extra property must not make scheduling fail
+  REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 3.x specification does not support Message Expiry Intervals. Property is not used.", 1s));
+}
+
+TEST_CASE_METHOD(Fixture, "PublishMQTTTest_ContentType_V_3", "[publishMQTTTest]") {  // Content Type must only produce a warning here
+  publishMqttProcessor_->setProperty(minifi::processors::PublishMQTT::Topic, "mytopic");
+  publishMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI, "127.0.0.1:1883");
+  publishMqttProcessor_->setProperty(minifi::processors::PublishMQTT::ContentType, "text/plain");  // property under test; MQTT Version is not set in this test
+  REQUIRE_NOTHROW(plan_->scheduleProcessor(publishMqttProcessor_));  // the extra property must not make scheduling fail
+  REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 3.x specification does not support Content Types. Property is not used.", 1s));
 }
diff --git a/libminifi/include/core/ConfigurableComponent.h b/libminifi/include/core/ConfigurableComponent.h
index b909ca71c..895b6f49b 100644
--- a/libminifi/include/core/ConfigurableComponent.h
+++ b/libminifi/include/core/ConfigurableComponent.h
@@ -57,7 +57,7 @@ class ConfigurableComponent {
    * @return result of getting property.
    */
   template<typename T>
-  bool getProperty(const std::string name, T &value) const;
+  bool getProperty(const std::string& name, T &value) const;
 
   template<typename T = std::string>
   std::enable_if_t<std::is_default_constructible<T>::value, std::optional<T>>
@@ -81,7 +81,7 @@ class ConfigurableComponent {
    * @param value property value.
    * @return result of setting property.
    */
-  bool setProperty(const std::string name, std::string value);
+  bool setProperty(const std::string& name, const std::string& value);
 
   /**
    * Updates the Property from the key (name), adding value
@@ -94,7 +94,7 @@ class ConfigurableComponent {
    * @param value property value.
    * @return whether property was set or not
    */
-  bool setProperty(const Property& prop, std::string value);
+  bool setProperty(const Property& prop, const std::string& value);
 
   /**
      * Sets the property using the provided name
@@ -127,7 +127,7 @@ class ConfigurableComponent {
    * @param value
    * @return
    */
-  bool getDynamicProperty(const std::string name, std::string &value) const;
+  bool getDynamicProperty(const std::string& name, std::string &value) const;
 
   /**
    * Sets the value of a new dynamic property.
@@ -136,7 +136,7 @@ class ConfigurableComponent {
    * @param value
    * @return
    */
-  bool setDynamicProperty(const std::string name, std::string value);
+  bool setDynamicProperty(const std::string& name, const std::string& value);
 
   /**
    * Updates the value of an existing dynamic property.
@@ -173,6 +173,11 @@ class ConfigurableComponent {
    */
   std::map<std::string, Property> getProperties() const;
 
+  /**
+   * @return true if the property exists and is explicitly set, not just falling back to its default value
+   */
+  bool isPropertyExplicitlySet(const Property&) const;
+
   virtual ~ConfigurableComponent();
 
   virtual void initialize() {
@@ -206,7 +211,7 @@ class ConfigurableComponent {
 };
 
 template<typename T>
-bool ConfigurableComponent::getProperty(const std::string name, T &value) const {
+bool ConfigurableComponent::getProperty(const std::string& name, T &value) const {
   std::lock_guard<std::mutex> lock(configuration_mutex_);
 
   const auto property_name_and_object = properties_.find(name);
diff --git a/libminifi/include/utils/Enum.h b/libminifi/include/utils/Enum.h
index 9921030e7..88126ef46 100644
--- a/libminifi/include/utils/Enum.h
+++ b/libminifi/include/utils/Enum.h
@@ -39,6 +39,7 @@ namespace utils {
     constexpr Clazz(Type value = static_cast<Type>(-1)) : value_{value} {} \
     explicit Clazz(const std::string& str) : value_{parse(str.c_str()).value_} {} \
     explicit Clazz(const char* str) : value_{parse(str).value_} {} \
+    explicit Clazz(std::nullptr_t) = delete; \
    private: \
     Type value_; \
    public: \
@@ -127,7 +128,7 @@ namespace utils {
 #define SMART_ENUM(Clazz, ...) \
   struct Clazz { \
     using Base = ::org::apache::nifi::minifi::utils::EnumBase; \
-    enum Type { \
+    enum Type : int { \
       FOR_EACH(FIRST, COMMA, (__VA_ARGS__)) \
     }; \
     SMART_ENUM_BODY(Clazz, __VA_ARGS__) \
@@ -136,7 +137,7 @@ namespace utils {
 #define SMART_ENUM_EXTEND(Clazz, base, base_fields, ...) \
   struct Clazz { \
     using Base = base; \
-    enum Type { \
+    enum Type : int { \
       FOR_EACH(INCLUDE_BASE_FIELD, COMMA, base_fields), \
       FOR_EACH(FIRST, COMMA, (__VA_ARGS__)) \
     }; \
diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp
index 157306a20..135e9229b 100644
--- a/libminifi/src/core/ConfigurableComponent.cpp
+++ b/libminifi/src/core/ConfigurableComponent.cpp
@@ -25,11 +25,7 @@
 #include "core/logging/LoggerConfiguration.h"
 #include "utils/gsl.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
+namespace org::apache::nifi::minifi::core {
 
 ConfigurableComponent::ConfigurableComponent()
     : accept_all_properties_(false),
@@ -57,7 +53,7 @@ bool ConfigurableComponent::getProperty(const std::string &name, Property &prop)
  * @param value property value.
  * @return result of setting property.
  */
-bool ConfigurableComponent::setProperty(const std::string name, std::string value) {
+bool ConfigurableComponent::setProperty(const std::string& name, const std::string& value) {
   std::lock_guard<std::mutex> lock(configuration_mutex_);
   auto it = properties_.find(name);
 
@@ -114,7 +110,7 @@ bool ConfigurableComponent::updateProperty(const std::string &name, const std::s
  * @param value property value.
  * @return whether property was set or not
  */
-bool ConfigurableComponent::setProperty(const Property& prop, std::string value) {
+bool ConfigurableComponent::setProperty(const Property& prop, const std::string& value) {
   std::lock_guard<std::mutex> lock(configuration_mutex_);
   auto it = properties_.find(prop.getName());
 
@@ -185,7 +181,7 @@ void ConfigurableComponent::setSupportedProperties(gsl::span<const core::Propert
   }
 }
 
-bool ConfigurableComponent::getDynamicProperty(const std::string name, std::string &value) const {
+bool ConfigurableComponent::getDynamicProperty(const std::string& name, std::string &value) const {
   std::lock_guard<std::mutex> lock(configuration_mutex_);
 
   auto &&it = dynamic_properties_.find(name);
@@ -224,7 +220,7 @@ bool ConfigurableComponent::createDynamicProperty(const std::string &name, const
   return true;
 }
 
-bool ConfigurableComponent::setDynamicProperty(const std::string name, std::string value) {
+bool ConfigurableComponent::setDynamicProperty(const std::string& name, const std::string& value) {
   std::lock_guard<std::mutex> lock(configuration_mutex_);
   auto &&it = dynamic_properties_.find(name);
 
@@ -290,8 +286,9 @@ std::map<std::string, Property> ConfigurableComponent::getProperties() const {
   return result;
 }
 
-} /* namespace core */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+bool ConfigurableComponent::isPropertyExplicitlySet(const Property& searched_prop) const {
+  Property stored_prop;  // output argument for the by-name lookup below
+  return getProperty(searched_prop.getName(), stored_prop) ? !stored_prop.getValues().empty() : false;
+}
+
+}  // namespace org::apache::nifi::minifi::core