Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/11/21 16:35:39 UTC

[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

fgerlits commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1024188173


##########
PROCESSORS.md:
##########
@@ -337,28 +337,34 @@ This Processor gets the contents of a FlowFile from a MQTT broker for a specifie
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name                  | Default Value | Allowable Values | Description                                                                                                                 |
-|-----------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------|
-| **Broker URI**        |               |                  | The URI to use to connect to the MQTT broker                                                                                |
-| **Topic**             |               |                  | The topic to subscribe to                                                                                                   |
-| Client ID             |               |                  | MQTT client ID to use                                                                                                       |
-| Quality of Service    | 0             |                  | The Quality of Service (QoS) to receive the message with. Accepts three values '0', '1' and '2'                             |
-| Connection Timeout    | 30 sec        |                  | Maximum time interval the client will wait for the network connection to the MQTT broker                                    |
-| Keep Alive Interval   | 60 sec        |                  | Defines the maximum time interval between messages being sent to the broker                                                 |
-| Max Flow Segment Size |               |                  | Maximum flow content payload segment size for the MQTT record                                                               |
-| Last Will Topic       |               |                  | The topic to send the client's Last Will to. If the Last Will topic is not set then a Last Will will not be sent            |
-| Last Will Message     |               |                  | The message to send as the client's Last Will. If the Last Will Message is empty, Last Will will be deleted from the broker |
-| Last Will QoS         | 0             |                  | The Quality of Service (QoS) to send the last will with. Accepts three values '0', '1' and '2'                              |
-| Last Will Retain      | false         |                  | Whether to retain the client's Last Will                                                                                    |
-| Security Protocol     |               |                  | Protocol used to communicate with brokers                                                                                   |
-| Security CA           |               |                  | File or directory path to CA certificate(s) for verifying the broker's key                                                  |
-| Security Cert         |               |                  | Path to client's public key (PEM) used for authentication                                                                   |
-| Security Private Key  |               |                  | Path to client's private key (PEM) used for authentication                                                                  |
-| Security Pass Phrase  |               |                  | Private key passphrase                                                                                                      |
-| Username              |               |                  | Username to use when connecting to the broker                                                                               |
-| Password              |               |                  | Password to use when connecting to the broker                                                                               |
-| Clean Session         | true          |                  | Whether to start afresh rather than remembering previous subscriptions                                                      |
-| Queue Max Message     | 1000          |                  | Maximum number of messages allowed on the received MQTT queue                                                               |
+| Name                        | Default Value | Allowable Values            | Description                                                                                                                                                 |
+|-----------------------------|---------------|-----------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Broker URI**              |               |                             | The URI to use to connect to the MQTT broker                                                                                                                |
+| Client ID                   |               |                             | MQTT client ID to use. WARNING: Must not be empty when using MQTT 3.1.0!                                                                                    |
+| MQTT Version                | 3.x AUTO      | 3.x AUTO, 3.1.0, 3.1.1, 5.0 | The MQTT specification version when connecting to the broker.                                                                                               |
+| **Topic**                   |               |                             | The topic to subscribe to.                                                                                                                                  |
+| Clean Session               | true          |                             | Whether to start afresh rather than remembering previous subscriptions. Also make broker remember subscriptions after disconnected. WARNING: MQTT 3.x only. |
+| Clean Start                 | true          |                             | Whether to start afresh rather than remembering previous subscriptions. WARNING: MQTT 5.x only.                                                             |

Review Comment:
   I think all-caps WARNING is too much:
   ```suggestion
   | Clean Start                 | true          |                             | Whether to start afresh rather than remembering previous subscriptions. (MQTT 5.x only)                                                                      |
   ```
   (also elsewhere)



##########
libminifi/include/utils/Enum.h:
##########
@@ -39,6 +39,7 @@ namespace utils {
     constexpr Clazz(Type value = static_cast<Type>(-1)) : value_{value} {} \
     explicit Clazz(const std::string& str) : value_{parse(str.c_str()).value_} {} \
     explicit Clazz(const char* str) : value_{parse(str).value_} {} \
+    explicit Clazz(std::nullptr_t) = delete; \

Review Comment:
   what does this do?
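
For readers following along, here is a minimal sketch of what a deleted `std::nullptr_t` constructor does in general C++ terms. `Color` is a hypothetical stand-in, not the class generated by the macro in `Enum.h`. Without the deleted overload, `Color{nullptr}` would pick the `const char*` constructor and build a `std::string` from a null pointer. With it, overload resolution selects the exact `std::nullptr_t` match, and because that overload is deleted the call is rejected at compile time.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <type_traits>

// Hypothetical stand-in for a macro-generated enum wrapper (assumption, not the real Enum.h class).
class Color {
 public:
  explicit Color(const std::string& str) : name_{str} {}
  explicit Color(const char* str) : name_{str} {}
  // Exact match for a literal nullptr; being deleted, it turns Color{nullptr} into a compile error.
  explicit Color(std::nullptr_t) = delete;

  const std::string& name() const { return name_; }

 private:
  std::string name_;
};

// Construction from a C string is still allowed...
static_assert(std::is_constructible<Color, const char*>::value, "const char* must be accepted");
// ...but construction from nullptr is rejected by the deleted overload.
static_assert(!std::is_constructible<Color, std::nullptr_t>::value, "nullptr must be rejected");

// Small usage example: ordinary string construction is unaffected.
inline void colorDemo() {
  Color c{"red"};
  assert(c.name() == "red");
}
```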



##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> message) {
-  if (queue_.size_approx() >= maxQueueSize_) {
-    logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+  if (queue_.size_approx() >= max_queue_size_) {
+    logger_->log_error("MQTT queue full");
     return;
   }
 
-  if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) {
-    logger_->log_debug("MQTT message was truncated while enqueuing, original length: %d", message->payloadlen);
-    message->payloadlen = gsl::narrow<int>(max_seg_size_);
-  }
-
-  logger_->log_debug("enqueuing MQTT message with length %d", message->payloadlen);
+  logger_->log_debug("enqueuing MQTT message with length %d", message.contents->payloadlen);
   queue_.enqueue(std::move(message));
 }
 
-void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void ConsumeMQTT::readProperties(const std::shared_ptr<core::ProcessContext>& context) {
+  if (auto value = context->getProperty(Topic)) {
+    topic_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
   if (const auto value = context->getProperty<bool>(CleanSession)) {
-    cleanSession_ = *value;
-    logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+    clean_session_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+  if (const auto value = context->getProperty<bool>(CleanStart)) {
+    clean_start_ = *value;
+  }
+  logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+  if (const auto session_expiry_interval = context->getProperty<core::TimePeriodValue>(SessionExpiryInterval)) {
+    session_expiry_interval_ = std::chrono::duration_cast<std::chrono::seconds>(session_expiry_interval->getMilliseconds());
+  }
+  logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", int64_t{session_expiry_interval_.count()});
 
   if (const auto value = context->getProperty<uint64_t>(QueueBufferMaxMessage)) {
-    maxQueueSize_ = *value;
-    logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", maxQueueSize_);
+    max_queue_size_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", max_queue_size_);
 
-  // this connects to broker, so properties of this processor must be read before
-  AbstractMQTTProcessor::onSchedule(context, factory);
-}
+  if (auto value = context->getProperty(AttributeFromContentType)) {
+    attribute_from_content_type_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", attribute_from_content_type_);
 
-void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession> &session) {
-  // reconnect if needed
-  reconnect();
+  if (const auto topic_alias_maximum = context->getProperty<uint32_t>(TopicAliasMaximum)) {
+    topic_alias_maximum_ = gsl::narrow<uint16_t>(*topic_alias_maximum);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", topic_alias_maximum_);
 
-  if (!MQTTAsync_isConnected(client_)) {
-    logger_->log_error("Could not consume from MQTT broker because disconnected to %s", uri_);
-    yield();
-    return;
+  if (const auto receive_maximum = context->getProperty<uint32_t>(ReceiveMaximum)) {
+    receive_maximum_ = gsl::narrow<uint16_t>(*receive_maximum);
   }
+  logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", receive_maximum_);
+}
 
-  std::deque<std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter>> msg_queue;
-  getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession>& session) {
+  std::queue<SmartMessage> msg_queue = getReceivedMqttMessages();
   while (!msg_queue.empty()) {
     const auto& message = msg_queue.front();
-    std::shared_ptr<core::FlowFile> processFlowFile = session->create();
-    int write_status{};
-    session->write(processFlowFile, [&message, &write_status](const std::shared_ptr<io::OutputStream>& stream) -> int64_t {
-      if (message->payloadlen < 0) {
-        write_status = -1;
-        return -1;
-      }
-      const auto len = stream->write(reinterpret_cast<uint8_t*>(message->payload), gsl::narrow<size_t>(message->payloadlen));
-      if (io::isError(len)) {
-        write_status = -1;
-        return -1;
-      }
-      return gsl::narrow<int64_t>(len);
-    });
-    if (write_status < 0) {
-      logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", processFlowFile->getUUIDStr());
-      session->remove(processFlowFile);
+    std::shared_ptr<core::FlowFile> flow_file = session->create();
+    WriteCallback write_callback(message, logger_);
+    try {
+      session->write(flow_file, write_callback);

Review Comment:
   I don't think this works, as `write()` will work on a temporary `std::function` containing a copy of `write_callback`, so `success_status_` won't be set.
   ```suggestion
         session->write(flow_file, std::ref(write_callback));
   ```
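
The copy-versus-reference behaviour described above can be reproduced with a small sketch. It assumes, as the comment does, that the callee takes its callback as a `std::function`. `CountingCallback` and `invoke` are hypothetical stand-ins, not the real `WriteCallback` or `ProcessSession::write()`.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>

// Hypothetical stateful callback: records how many times it was called.
struct CountingCallback {
  int calls = 0;
  std::int64_t operator()(std::int64_t bytes) {
    ++calls;
    return bytes;
  }
};

// Hypothetical API that accepts the callback as a std::function (assumption).
inline void invoke(const std::function<std::int64_t(std::int64_t)>& fn) {
  fn(42);
}

inline int callsSeenWhenPassedDirectly() {
  CountingCallback cb;
  invoke(cb);  // the temporary std::function stores a copy, so cb.calls is not updated
  return cb.calls;
}

inline int callsSeenWhenPassedByRef() {
  CountingCallback cb;
  invoke(std::ref(cb));  // std::reference_wrapper forwards the call to cb itself
  return cb.calls;
}

// Usage example: only the std::ref variant observes the state change.
inline void callbackDemo() {
  assert(callsSeenWhenPassedDirectly() == 0);
  assert(callsSeenWhenPassedByRef() == 1);
}
```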



##########
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##########
@@ -163,23 +200,259 @@ void AbstractMQTTProcessor::reconnect() {
   }
 
   logger_->log_info("Reconnecting to %s", uri_);
-  int ret = MQTTAsync_connect(client_, &conn_opts);
+  if (MQTTAsync_isConnected(client_)) {
+    logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+    return;
+  }
+  const int ret = MQTTAsync_connect(client_, &conn_opts);
+  MQTTProperties_free(&connect_props);
   if (ret != MQTTASYNC_SUCCESS) {
-    logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, ret);
+    logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error code [%d]", uri_, ret);
+    return;
   }
+
+  // wait until connection succeeds or fails
+  connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const {
+  conn_opts.cleanstart = getCleanStart();
+
+  {
+    MQTTProperty property;
+    property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
+    property.value.integer4 = gsl::narrow<int>(getSessionExpiryInterval().count());
+    MQTTProperties_add(&connect_props, &property);
+  }

Review Comment:
   I don't think `MQTTProperties_add()` copies the property object: it just copies the `&property` pointer into `connect_props->array`.  So `connect_props` will contain a pointer to this stack object, which will have gone out of scope by the time we use `connect_props`.
   
   (The same problem happens elsewhere, too, e.g. in `ConsumeMQTT::setMqtt5ConnectOptionsImpl()` and `PublishMQTT::setMqtt5Properties()`.)



##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> message) {
-  if (queue_.size_approx() >= maxQueueSize_) {
-    logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+  if (queue_.size_approx() >= max_queue_size_) {
+    logger_->log_error("MQTT queue full");
     return;
   }
 
-  if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) {
-    logger_->log_debug("MQTT message was truncated while enqueuing, original length: %d", message->payloadlen);
-    message->payloadlen = gsl::narrow<int>(max_seg_size_);
-  }
-
-  logger_->log_debug("enqueuing MQTT message with length %d", message->payloadlen);
+  logger_->log_debug("enqueuing MQTT message with length %d", message.contents->payloadlen);
   queue_.enqueue(std::move(message));
 }
 
-void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void ConsumeMQTT::readProperties(const std::shared_ptr<core::ProcessContext>& context) {
+  if (auto value = context->getProperty(Topic)) {
+    topic_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
   if (const auto value = context->getProperty<bool>(CleanSession)) {
-    cleanSession_ = *value;
-    logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+    clean_session_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+  if (const auto value = context->getProperty<bool>(CleanStart)) {
+    clean_start_ = *value;
+  }
+  logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+  if (const auto session_expiry_interval = context->getProperty<core::TimePeriodValue>(SessionExpiryInterval)) {
+    session_expiry_interval_ = std::chrono::duration_cast<std::chrono::seconds>(session_expiry_interval->getMilliseconds());
+  }
+  logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", int64_t{session_expiry_interval_.count()});
 
   if (const auto value = context->getProperty<uint64_t>(QueueBufferMaxMessage)) {
-    maxQueueSize_ = *value;
-    logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", maxQueueSize_);
+    max_queue_size_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", max_queue_size_);
 
-  // this connects to broker, so properties of this processor must be read before
-  AbstractMQTTProcessor::onSchedule(context, factory);
-}
+  if (auto value = context->getProperty(AttributeFromContentType)) {
+    attribute_from_content_type_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", attribute_from_content_type_);
 
-void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession> &session) {
-  // reconnect if needed
-  reconnect();
+  if (const auto topic_alias_maximum = context->getProperty<uint32_t>(TopicAliasMaximum)) {
+    topic_alias_maximum_ = gsl::narrow<uint16_t>(*topic_alias_maximum);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", topic_alias_maximum_);
 
-  if (!MQTTAsync_isConnected(client_)) {
-    logger_->log_error("Could not consume from MQTT broker because disconnected to %s", uri_);
-    yield();
-    return;
+  if (const auto receive_maximum = context->getProperty<uint32_t>(ReceiveMaximum)) {
+    receive_maximum_ = gsl::narrow<uint16_t>(*receive_maximum);
   }
+  logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", receive_maximum_);
+}
 
-  std::deque<std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter>> msg_queue;
-  getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession>& session) {
+  std::queue<SmartMessage> msg_queue = getReceivedMqttMessages();
   while (!msg_queue.empty()) {
     const auto& message = msg_queue.front();
-    std::shared_ptr<core::FlowFile> processFlowFile = session->create();
-    int write_status{};
-    session->write(processFlowFile, [&message, &write_status](const std::shared_ptr<io::OutputStream>& stream) -> int64_t {
-      if (message->payloadlen < 0) {
-        write_status = -1;
-        return -1;
-      }
-      const auto len = stream->write(reinterpret_cast<uint8_t*>(message->payload), gsl::narrow<size_t>(message->payloadlen));
-      if (io::isError(len)) {
-        write_status = -1;
-        return -1;
-      }
-      return gsl::narrow<int64_t>(len);
-    });
-    if (write_status < 0) {
-      logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", processFlowFile->getUUIDStr());
-      session->remove(processFlowFile);
+    std::shared_ptr<core::FlowFile> flow_file = session->create();
+    WriteCallback write_callback(message, logger_);
+    try {
+      session->write(flow_file, write_callback);
+    } catch (const Exception& ex) {
+      logger_->log_error("Error when processing message queue: %s", ex.what());
+    }
+    if (!write_callback.getSuccessStatus()) {
+      logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", flow_file->getUUIDStr());
+      session->remove(flow_file);
     } else {
-      session->putAttribute(processFlowFile, MQTT_BROKER_ATTRIBUTE, uri_);
-      session->putAttribute(processFlowFile, MQTT_TOPIC_ATTRIBUTE, topic_);
-      logger_->log_debug("ConsumeMQTT processing success for the flow with UUID %s topic %s", processFlowFile->getUUIDStr(), topic_);
-      session->transfer(processFlowFile, Success);
+      putUserPropertiesAsAttributes(message, flow_file, session);
+      session->putAttribute(flow_file, MQTT_BROKER_ATTRIBUTE, uri_);
+      session->putAttribute(flow_file, MQTT_TOPIC_ATTRIBUTE, message.topic);
+      fillAttributeFromContentType(message, flow_file, session);
+      logger_->log_debug("ConsumeMQTT processing success for the flow with UUID %s topic %s", flow_file->getUUIDStr(), message.topic);
+      session->transfer(flow_file, Success);
     }
-    msg_queue.pop_front();
+    msg_queue.pop();
+  }
+}
+
+std::queue<ConsumeMQTT::SmartMessage> ConsumeMQTT::getReceivedMqttMessages() {
+  std::queue<SmartMessage> msg_queue;
+  SmartMessage message;
+  while (queue_.try_dequeue(message)) {
+    msg_queue.push(std::move(message));
+  }
+  return msg_queue;
+}
+
+int64_t ConsumeMQTT::WriteCallback::operator() (const std::shared_ptr<io::OutputStream>& stream) {
+  if (message_.contents->payloadlen < 0) {
+    success_status_ = false;
+    logger_->log_error("Payload length of message is negative, value is [%d]", message_.contents->payloadlen);
+    return -1;
+  }
+
+  const auto len = stream->write(reinterpret_cast<uint8_t*>(message_.contents->payload), gsl::narrow<size_t>(message_.contents->payloadlen));
+  if (io::isError(len)) {
+    success_status_ = false;
+    logger_->log_error("Stream writing error when processing message");
+    return -1;
+  }
+
+  return len;
+}
+
+void ConsumeMQTT::putUserPropertiesAsAttributes(const SmartMessage& message, const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const {
+  if (mqtt_version_.value() != MqttVersions::V_5_0) {
+    return;
   }
+
+  const auto property_count = MQTTProperties_propertyCount(&message.contents->properties, MQTTPROPERTY_CODE_USER_PROPERTY);
+  for (int i=0; i < property_count; ++i) {
+    MQTTProperty* property = MQTTProperties_getPropertyAt(&message.contents->properties, MQTTPROPERTY_CODE_USER_PROPERTY, i);

Review Comment:
   This can return null if the `i`th property is not a user property (i.e., it's an integer property). Are we sure that can never happen?
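
If a null result turns out to be possible, one defensive option is to skip the entry instead of dereferencing it. The sketch below shows only that pattern. `FakeProperty`, `findPropertyAt` and `collectAttributes` are hypothetical stand-ins and deliberately do not model the Paho API or its actual lookup semantics.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical key/value property (assumption, not the Paho MQTTProperty type).
struct FakeProperty {
  std::string key;
  std::string value;
};

// Hypothetical lookup that returns nullptr when there is no property at the index.
inline const FakeProperty* findPropertyAt(const std::vector<FakeProperty>& props, std::size_t index) {
  return index < props.size() ? &props[index] : nullptr;
}

// Collects key/value pairs, skipping any index for which the lookup yields nullptr.
inline std::vector<std::pair<std::string, std::string>> collectAttributes(
    const std::vector<FakeProperty>& props, std::size_t claimed_count) {
  std::vector<std::pair<std::string, std::string>> attributes;
  for (std::size_t i = 0; i < claimed_count; ++i) {
    const FakeProperty* property = findPropertyAt(props, i);
    if (property == nullptr) {
      continue;  // guard: never dereference a failed lookup
    }
    attributes.emplace_back(property->key, property->value);
  }
  return attributes;
}

// Usage example: a claimed count larger than the real one does not crash.
inline void attributesDemo() {
  const std::vector<FakeProperty> props{{"k1", "v1"}, {"k2", "v2"}};
  const auto attributes = collectAttributes(props, 3);
  assert(attributes.size() == 2);
  assert(attributes[0].first == "k1");
}
```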



##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> message) {
-  if (queue_.size_approx() >= maxQueueSize_) {
-    logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+  if (queue_.size_approx() >= max_queue_size_) {
+    logger_->log_error("MQTT queue full");
     return;
   }
 
-  if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) {
-    logger_->log_debug("MQTT message was truncated while enqueuing, original length: %d", message->payloadlen);
-    message->payloadlen = gsl::narrow<int>(max_seg_size_);
-  }
-
-  logger_->log_debug("enqueuing MQTT message with length %d", message->payloadlen);
+  logger_->log_debug("enqueuing MQTT message with length %d", message.contents->payloadlen);
   queue_.enqueue(std::move(message));
 }
 
-void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void ConsumeMQTT::readProperties(const std::shared_ptr<core::ProcessContext>& context) {
+  if (auto value = context->getProperty(Topic)) {
+    topic_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
   if (const auto value = context->getProperty<bool>(CleanSession)) {
-    cleanSession_ = *value;
-    logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+    clean_session_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+  if (const auto value = context->getProperty<bool>(CleanStart)) {
+    clean_start_ = *value;
+  }
+  logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+  if (const auto session_expiry_interval = context->getProperty<core::TimePeriodValue>(SessionExpiryInterval)) {
+    session_expiry_interval_ = std::chrono::duration_cast<std::chrono::seconds>(session_expiry_interval->getMilliseconds());
+  }
+  logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", int64_t{session_expiry_interval_.count()});
 
   if (const auto value = context->getProperty<uint64_t>(QueueBufferMaxMessage)) {
-    maxQueueSize_ = *value;
-    logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", maxQueueSize_);
+    max_queue_size_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", max_queue_size_);
 
-  // this connects to broker, so properties of this processor must be read before
-  AbstractMQTTProcessor::onSchedule(context, factory);
-}
+  if (auto value = context->getProperty(AttributeFromContentType)) {
+    attribute_from_content_type_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", attribute_from_content_type_);
 
-void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession> &session) {
-  // reconnect if needed
-  reconnect();
+  if (const auto topic_alias_maximum = context->getProperty<uint32_t>(TopicAliasMaximum)) {
+    topic_alias_maximum_ = gsl::narrow<uint16_t>(*topic_alias_maximum);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", topic_alias_maximum_);
 
-  if (!MQTTAsync_isConnected(client_)) {
-    logger_->log_error("Could not consume from MQTT broker because disconnected to %s", uri_);
-    yield();
-    return;
+  if (const auto receive_maximum = context->getProperty<uint32_t>(ReceiveMaximum)) {
+    receive_maximum_ = gsl::narrow<uint16_t>(*receive_maximum);
   }
+  logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", receive_maximum_);
+}
 
-  std::deque<std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter>> msg_queue;
-  getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession>& session) {
+  std::queue<SmartMessage> msg_queue = getReceivedMqttMessages();
   while (!msg_queue.empty()) {
     const auto& message = msg_queue.front();
-    std::shared_ptr<core::FlowFile> processFlowFile = session->create();
-    int write_status{};
-    session->write(processFlowFile, [&message, &write_status](const std::shared_ptr<io::OutputStream>& stream) -> int64_t {
-      if (message->payloadlen < 0) {
-        write_status = -1;
-        return -1;
-      }
-      const auto len = stream->write(reinterpret_cast<uint8_t*>(message->payload), gsl::narrow<size_t>(message->payloadlen));
-      if (io::isError(len)) {
-        write_status = -1;
-        return -1;
-      }
-      return gsl::narrow<int64_t>(len);
-    });
-    if (write_status < 0) {
-      logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", processFlowFile->getUUIDStr());
-      session->remove(processFlowFile);
+    std::shared_ptr<core::FlowFile> flow_file = session->create();
+    WriteCallback write_callback(message, logger_);
+    try {
+      session->write(flow_file, write_callback);
+    } catch (const Exception& ex) {
+      logger_->log_error("Error when processing message queue: %s", ex.what());
+    }
+    if (!write_callback.getSuccessStatus()) {
+      logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", flow_file->getUUIDStr());
+      session->remove(flow_file);
     } else {
-      session->putAttribute(processFlowFile, MQTT_BROKER_ATTRIBUTE, uri_);
-      session->putAttribute(processFlowFile, MQTT_TOPIC_ATTRIBUTE, topic_);
-      logger_->log_debug("ConsumeMQTT processing success for the flow with UUID %s topic %s", processFlowFile->getUUIDStr(), topic_);
-      session->transfer(processFlowFile, Success);
+      putUserPropertiesAsAttributes(message, flow_file, session);
+      session->putAttribute(flow_file, MQTT_BROKER_ATTRIBUTE, uri_);
+      session->putAttribute(flow_file, MQTT_TOPIC_ATTRIBUTE, message.topic);
+      fillAttributeFromContentType(message, flow_file, session);
+      logger_->log_debug("ConsumeMQTT processing success for the flow with UUID %s topic %s", flow_file->getUUIDStr(), message.topic);
+      session->transfer(flow_file, Success);
     }
-    msg_queue.pop_front();
+    msg_queue.pop();
+  }
+}
+
+std::queue<ConsumeMQTT::SmartMessage> ConsumeMQTT::getReceivedMqttMessages() {
+  std::queue<SmartMessage> msg_queue;
+  SmartMessage message;
+  while (queue_.try_dequeue(message)) {
+    msg_queue.push(std::move(message));
+  }
+  return msg_queue;
+}
+
+int64_t ConsumeMQTT::WriteCallback::operator() (const std::shared_ptr<io::OutputStream>& stream) {
+  if (message_.contents->payloadlen < 0) {
+    success_status_ = false;
+    logger_->log_error("Payload length of message is negative, value is [%d]", message_.contents->payloadlen);
+    return -1;
+  }
+
+  const auto len = stream->write(reinterpret_cast<uint8_t*>(message_.contents->payload), gsl::narrow<size_t>(message_.contents->payloadlen));
+  if (io::isError(len)) {
+    success_status_ = false;
+    logger_->log_error("Stream writing error when processing message");
+    return -1;
+  }
+
+  return len;
+}
+
+void ConsumeMQTT::putUserPropertiesAsAttributes(const SmartMessage& message, const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const {
+  if (mqtt_version_.value() != MqttVersions::V_5_0) {
+    return;
   }
+
+  const auto property_count = MQTTProperties_propertyCount(&message.contents->properties, MQTTPROPERTY_CODE_USER_PROPERTY);
+  for (int i=0; i < property_count; ++i) {
+    MQTTProperty* property = MQTTProperties_getPropertyAt(&message.contents->properties, MQTTPROPERTY_CODE_USER_PROPERTY, i);
+    std::string key(property->value.data.data, property->value.data.len);
+    std::string value(property->value.value.data, property->value.value.len);
+    session->putAttribute(flow_file, key, value);
+  }
+}
+
+void ConsumeMQTT::fillAttributeFromContentType(const SmartMessage& message, const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const {
+  if (mqtt_version_.value() != MqttVersions::V_5_0 || attribute_from_content_type_.empty()) {
+    return;
+  }
+
+  MQTTProperty* property = MQTTProperties_getProperty(&message.contents->properties, MQTTPROPERTY_CODE_CONTENT_TYPE);
+  if (property == nullptr) {
+    return;
+  }
+
+  std::string content_type(property->value.data.data, property->value.data.len);
+  session->putAttribute(flow_file, attribute_from_content_type_, content_type);
 }
 
-bool ConsumeMQTT::startupClient() {
+void ConsumeMQTT::startupClient() {
   MQTTAsync_responseOptions response_options = MQTTAsync_responseOptions_initializer;
   response_options.context = this;
-  response_options.onSuccess = subscriptionSuccess;
-  response_options.onFailure = subscriptionFailure;
-  const int ret = MQTTAsync_subscribe(client_, topic_.c_str(), gsl::narrow<int>(qos_), &response_options);
+
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+    response_options.onSuccess5 = subscriptionSuccess5;
+    response_options.onFailure5 = subscriptionFailure5;
+  } else {
+    response_options.onSuccess = subscriptionSuccess;
+    response_options.onFailure = subscriptionFailure;
+  }
+
+  const int ret = MQTTAsync_subscribe(client_, topic_.c_str(), gsl::narrow<int>(qos_.value()), &response_options);
   if (ret != MQTTASYNC_SUCCESS) {
     logger_->log_error("Failed to subscribe to MQTT topic %s (%d)", topic_, ret);
-    return false;
+    return;
   }
   logger_->log_debug("Successfully subscribed to MQTT topic: %s", topic_);
-  return true;
 }
 
-void ConsumeMQTT::onMessageReceived(char* topic_name, int /*topic_len*/, MQTTAsync_message* message) {
-  MQTTAsync_free(topic_name);
+void ConsumeMQTT::onMessageReceived(SmartMessage smart_message) {
+  if (mqtt_version_ == MqttVersions::V_5_0) {
+    resolveTopicFromAlias(smart_message);
+  }
+
+  if (smart_message.topic.empty()) {
+    logger_->log_error("Received message without topic");
+    return;
+  }
+
+  enqueueReceivedMQTTMsg(std::move(smart_message));
+}
+
+void ConsumeMQTT::resolveTopicFromAlias(SmartMessage& smart_message) {
+  auto raw_alias = MQTTProperties_getNumericValue(&smart_message.contents->properties, MQTTPROPERTY_CODE_TOPIC_ALIAS);
+
+  std::optional<uint16_t> alias;
+  if (raw_alias != PAHO_MQTT_C_FAILURE_CODE) {
+    alias = gsl::narrow<uint16_t>(raw_alias);
+  }
+
+  auto& topic = smart_message.topic;
 
-  const auto* msgPayload = reinterpret_cast<const char*>(message->payload);
-  const size_t msgLen = message->payloadlen;
-  const std::string messageText(msgPayload, msgLen);
-  logger_->log_debug("Received message \"%s\" to MQTT topic %s on broker %s", messageText, topic_, uri_);
+  if (alias.has_value()) {
+    if (*alias > topic_alias_maximum_) {
+      logger_->log_error("Broker does not respect client's Topic Alias Maximum, sent a greater value: %" PRIu16 " > %" PRIu16, *alias, topic_alias_maximum_);
+      return;
+    }
 
-  std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> smartMessage(message);
-  enqueueReceivedMQTTMsg(std::move(smartMessage));
+    // if topic is empty, this is just a usage of a previously stored alias (look it up), otherwise a new one (store it)
+    if (topic.empty()) {
+      const auto iter = alias_to_topic_.find(*alias);
+      if (iter == alias_to_topic_.end()) {
+        logger_->log_error("Broker sent an alias that was not known to client before: %" PRIu16, *alias);
+      } else {
+        topic = iter->second;
+      }
+    } else {
+      alias_to_topic_[*alias] = topic;
+    }
+  } else if (topic.empty()) {
+    logger_->log_error("Received message without topic and alias");
+  }
 }
 
 void ConsumeMQTT::checkProperties() {
-  if (!cleanSession_ && clientID_.empty()) {
-    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Processor must have a Client ID for durable (non-clean) sessions");
+  if (mqtt_version_ == MqttVersions::V_3_1_0 || mqtt_version_ == MqttVersions::V_3_1_1 || mqtt_version_ == MqttVersions::V_3X_AUTO) {
+    if (isPropertyExplicitlySet(CleanStart)) {
+      logger_->log_warn("MQTT 3.x specification does not support Clean Start. Property is not used.");
+    }
+    if (isPropertyExplicitlySet(SessionExpiryInterval)) {
+      logger_->log_warn("MQTT 3.x specification does not support Session Expiry Intervals. Property is not used.");
+    }
+    if (isPropertyExplicitlySet(AttributeFromContentType)) {
+      logger_->log_warn("MQTT 3.x specification does not support Content Types and thus attributes cannot be created from them. Property is not used.");
+    }
+    if (isPropertyExplicitlySet(TopicAliasMaximum)) {
+      logger_->log_warn("MQTT 3.x specification does not support Topic Alias Maximum. Property is not used.");
+    }
+    if (isPropertyExplicitlySet(ReceiveMaximum)) {
+      logger_->log_warn("MQTT 3.x specification does not support Receive Maximum. Property is not used.");
+    }
+  }
+
+  if (mqtt_version_.value() == MqttVersions::V_5_0 && isPropertyExplicitlySet(CleanSession)) {
+    logger_->log_warn("MQTT 5.0 specification does not support Clean Session. Property is not used.");
+  }
+
+  if (clientID_.empty()) {
+    if (mqtt_version_.value() == MqttVersions::V_5_0) {
+      if (session_expiry_interval_ > std::chrono::seconds(0)) {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Processor must have a Client ID for durable (Session Expiry Interval > 0) sessions");
+      }
+    } else if (!clean_session_) {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Processor must have a Client ID for durable (non-clean) sessions");
+    }
+  }
+
+  if (qos_ == MqttQoS::LEVEL_0) {
+    if (mqtt_version_.value() == MqttVersions::V_5_0) {
+      if (session_expiry_interval_ > std::chrono::seconds(0)) {
+        logger_->log_warn("Messages are not preserved during client disconnection "
+                          "by the broker when QoS is less than 1 for durable (Session Expiry Interval > 0) sessions. Only subscriptions are preserved.");
+      }
+    } else if (!clean_session_) {
+      logger_->log_warn("Messages are not preserved during client disconnection "
+                        "by the broker when QoS is less than 1 for durable (non-clean) sessions. Only subscriptions are preserved.");
+    }
+  }
+}
+
+void ConsumeMQTT::checkBrokerLimitsImpl() {
+  auto hasWildcards = [] (std::string_view topic) {
+    return std::any_of(topic.begin(), topic.end(), [] (const char ch) {return ch == '+' || ch == '#';});
+  };
+
+  if (wildcard_subscription_available_ == false && hasWildcards(topic_)) {
+    std::ostringstream os;
+    os << "Broker does not support wildcards but topic \"" << topic_ <<"\" has them";
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, os.str());
+  }
+
+  if (maximum_session_expiry_interval_.has_value() && session_expiry_interval_ > maximum_session_expiry_interval_) {
+    std::ostringstream os;
+    os << "Set Session Expiry Interval (" << session_expiry_interval_.count() <<" s) is longer then maximum supported by broker (" << maximum_session_expiry_interval_->count() << " s).";
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, os.str());
+  }
+
+  if (utils::StringUtils::startsWith(topic_, "$share/")) {
+    if (mqtt_version_.value() == MqttVersions::V_5_0) {
+      // shared topic are supported on MQTT 5, unless explicitly denied by broker
+      if (shared_subscription_available_.has_value() && !*shared_subscription_available_) {

Review Comment:
   Only a suggestion, but I would find this more readable:
   ```suggestion
         if (shared_subscription_available_ == false) {
   ```
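
The two forms are equivalent because comparing a `std::optional<bool>` with a plain `bool` yields `false` when the optional is empty. A minimal sketch of that equivalence; the function names `explicitlyDeniedVerbose` and `explicitlyDeniedShort` are made up for illustration and do not appear in the PR:

```cpp
#include <cassert>
#include <optional>

// Form currently in the diff: true only when the optional holds a value
// and that value is false.
inline bool explicitlyDeniedVerbose(const std::optional<bool>& flag) {
  return flag.has_value() && !*flag;
}

// Suggested form: operator== between an optional and a plain value is
// false for an empty optional, so no separate has_value() check is needed.
inline bool explicitlyDeniedShort(const std::optional<bool>& flag) {
  return flag == false;
}
```

An unset broker capability is therefore still treated as "not denied", which matches the existing `wildcard_subscription_available_ == false` check earlier in the same function.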



##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> message) {
-  if (queue_.size_approx() >= maxQueueSize_) {
-    logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+  if (queue_.size_approx() >= max_queue_size_) {
+    logger_->log_error("MQTT queue full");
     return;
   }
 
-  if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) {
-    logger_->log_debug("MQTT message was truncated while enqueuing, original length: %d", message->payloadlen);
-    message->payloadlen = gsl::narrow<int>(max_seg_size_);
-  }
-
-  logger_->log_debug("enqueuing MQTT message with length %d", message->payloadlen);
+  logger_->log_debug("enqueuing MQTT message with length %d", message.contents->payloadlen);
   queue_.enqueue(std::move(message));
 }
 
-void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void ConsumeMQTT::readProperties(const std::shared_ptr<core::ProcessContext>& context) {
+  if (auto value = context->getProperty(Topic)) {
+    topic_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
   if (const auto value = context->getProperty<bool>(CleanSession)) {
-    cleanSession_ = *value;
-    logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+    clean_session_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+  if (const auto value = context->getProperty<bool>(CleanStart)) {
+    clean_start_ = *value;
+  }
+  logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+  if (const auto session_expiry_interval = context->getProperty<core::TimePeriodValue>(SessionExpiryInterval)) {
+    session_expiry_interval_ = std::chrono::duration_cast<std::chrono::seconds>(session_expiry_interval->getMilliseconds());
+  }
+  logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", int64_t{session_expiry_interval_.count()});
 
   if (const auto value = context->getProperty<uint64_t>(QueueBufferMaxMessage)) {
-    maxQueueSize_ = *value;
-    logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", maxQueueSize_);
+    max_queue_size_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", max_queue_size_);
 
-  // this connects to broker, so properties of this processor must be read before
-  AbstractMQTTProcessor::onSchedule(context, factory);
-}
+  if (auto value = context->getProperty(AttributeFromContentType)) {
+    attribute_from_content_type_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", attribute_from_content_type_);
 
-void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession> &session) {
-  // reconnect if needed
-  reconnect();
+  if (const auto topic_alias_maximum = context->getProperty<uint32_t>(TopicAliasMaximum)) {
+    topic_alias_maximum_ = gsl::narrow<uint16_t>(*topic_alias_maximum);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", topic_alias_maximum_);
 
-  if (!MQTTAsync_isConnected(client_)) {
-    logger_->log_error("Could not consume from MQTT broker because disconnected to %s", uri_);
-    yield();
-    return;
+  if (const auto receive_maximum = context->getProperty<uint32_t>(ReceiveMaximum)) {
+    receive_maximum_ = gsl::narrow<uint16_t>(*receive_maximum);
   }
+  logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", receive_maximum_);
+}
 
-  std::deque<std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter>> msg_queue;
-  getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession>& session) {
+  std::queue<SmartMessage> msg_queue = getReceivedMqttMessages();
   while (!msg_queue.empty()) {
     const auto& message = msg_queue.front();
-    std::shared_ptr<core::FlowFile> processFlowFile = session->create();
-    int write_status{};
-    session->write(processFlowFile, [&message, &write_status](const std::shared_ptr<io::OutputStream>& stream) -> int64_t {
-      if (message->payloadlen < 0) {
-        write_status = -1;
-        return -1;
-      }
-      const auto len = stream->write(reinterpret_cast<uint8_t*>(message->payload), gsl::narrow<size_t>(message->payloadlen));
-      if (io::isError(len)) {
-        write_status = -1;
-        return -1;
-      }
-      return gsl::narrow<int64_t>(len);
-    });
-    if (write_status < 0) {
-      logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", processFlowFile->getUUIDStr());
-      session->remove(processFlowFile);
+    std::shared_ptr<core::FlowFile> flow_file = session->create();
+    WriteCallback write_callback(message, logger_);
+    try {
+      session->write(flow_file, write_callback);
+    } catch (const Exception& ex) {
+      logger_->log_error("Error when processing message queue: %s", ex.what());
+    }
+    if (!write_callback.getSuccessStatus()) {
+      logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", flow_file->getUUIDStr());
+      session->remove(flow_file);
     } else {
-      session->putAttribute(processFlowFile, MQTT_BROKER_ATTRIBUTE, uri_);
-      session->putAttribute(processFlowFile, MQTT_TOPIC_ATTRIBUTE, topic_);
-      logger_->log_debug("ConsumeMQTT processing success for the flow with UUID %s topic %s", processFlowFile->getUUIDStr(), topic_);
-      session->transfer(processFlowFile, Success);
+      putUserPropertiesAsAttributes(message, flow_file, session);
+      session->putAttribute(flow_file, MQTT_BROKER_ATTRIBUTE, uri_);
+      session->putAttribute(flow_file, MQTT_TOPIC_ATTRIBUTE, message.topic);
+      fillAttributeFromContentType(message, flow_file, session);
+      logger_->log_debug("ConsumeMQTT processing success for the flow with UUID %s topic %s", flow_file->getUUIDStr(), message.topic);
+      session->transfer(flow_file, Success);
     }
-    msg_queue.pop_front();
+    msg_queue.pop();
+  }
+}
+
+std::queue<ConsumeMQTT::SmartMessage> ConsumeMQTT::getReceivedMqttMessages() {
+  std::queue<SmartMessage> msg_queue;
+  SmartMessage message;
+  while (queue_.try_dequeue(message)) {
+    msg_queue.push(std::move(message));
+  }
+  return msg_queue;
+}
+
+int64_t ConsumeMQTT::WriteCallback::operator() (const std::shared_ptr<io::OutputStream>& stream) {
+  if (message_.contents->payloadlen < 0) {
+    success_status_ = false;
+    logger_->log_error("Payload length of message is negative, value is [%d]", message_.contents->payloadlen);
+    return -1;
+  }
+
+  const auto len = stream->write(reinterpret_cast<uint8_t*>(message_.contents->payload), gsl::narrow<size_t>(message_.contents->payloadlen));
+  if (io::isError(len)) {
+    success_status_ = false;
+    logger_->log_error("Stream writing error when processing message");
+    return -1;
+  }
+
+  return len;
+}
+
+void ConsumeMQTT::putUserPropertiesAsAttributes(const SmartMessage& message, const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const {
+  if (mqtt_version_.value() != MqttVersions::V_5_0) {
+    return;
   }
+
+  const auto property_count = MQTTProperties_propertyCount(&message.contents->properties, MQTTPROPERTY_CODE_USER_PROPERTY);
+  for (int i=0; i < property_count; ++i) {
+    MQTTProperty* property = MQTTProperties_getPropertyAt(&message.contents->properties, MQTTPROPERTY_CODE_USER_PROPERTY, i);
+    std::string key(property->value.data.data, property->value.data.len);
+    std::string value(property->value.value.data, property->value.value.len);
+    session->putAttribute(flow_file, key, value);
+  }
+}
+
+void ConsumeMQTT::fillAttributeFromContentType(const SmartMessage& message, const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const {
+  if (mqtt_version_.value() != MqttVersions::V_5_0 || attribute_from_content_type_.empty()) {
+    return;
+  }
+
+  MQTTProperty* property = MQTTProperties_getProperty(&message.contents->properties, MQTTPROPERTY_CODE_CONTENT_TYPE);
+  if (property == nullptr) {
+    return;
+  }
+
+  std::string content_type(property->value.data.data, property->value.data.len);
+  session->putAttribute(flow_file, attribute_from_content_type_, content_type);
 }
 
-bool ConsumeMQTT::startupClient() {
+void ConsumeMQTT::startupClient() {
   MQTTAsync_responseOptions response_options = MQTTAsync_responseOptions_initializer;
   response_options.context = this;
-  response_options.onSuccess = subscriptionSuccess;
-  response_options.onFailure = subscriptionFailure;
-  const int ret = MQTTAsync_subscribe(client_, topic_.c_str(), gsl::narrow<int>(qos_), &response_options);
+
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+    response_options.onSuccess5 = subscriptionSuccess5;
+    response_options.onFailure5 = subscriptionFailure5;
+  } else {
+    response_options.onSuccess = subscriptionSuccess;
+    response_options.onFailure = subscriptionFailure;
+  }
+
+  const int ret = MQTTAsync_subscribe(client_, topic_.c_str(), gsl::narrow<int>(qos_.value()), &response_options);
   if (ret != MQTTASYNC_SUCCESS) {
     logger_->log_error("Failed to subscribe to MQTT topic %s (%d)", topic_, ret);
-    return false;
+    return;
   }
   logger_->log_debug("Successfully subscribed to MQTT topic: %s", topic_);
-  return true;
 }
 
-void ConsumeMQTT::onMessageReceived(char* topic_name, int /*topic_len*/, MQTTAsync_message* message) {
-  MQTTAsync_free(topic_name);
+void ConsumeMQTT::onMessageReceived(SmartMessage smart_message) {
+  if (mqtt_version_ == MqttVersions::V_5_0) {
+    resolveTopicFromAlias(smart_message);
+  }
+
+  if (smart_message.topic.empty()) {
+    logger_->log_error("Received message without topic");
+    return;
+  }
+
+  enqueueReceivedMQTTMsg(std::move(smart_message));
+}
+
+void ConsumeMQTT::resolveTopicFromAlias(SmartMessage& smart_message) {
+  auto raw_alias = MQTTProperties_getNumericValue(&smart_message.contents->properties, MQTTPROPERTY_CODE_TOPIC_ALIAS);
+
+  std::optional<uint16_t> alias;
+  if (raw_alias != PAHO_MQTT_C_FAILURE_CODE) {
+    alias = gsl::narrow<uint16_t>(raw_alias);
+  }
+
+  auto& topic = smart_message.topic;
 
-  const auto* msgPayload = reinterpret_cast<const char*>(message->payload);
-  const size_t msgLen = message->payloadlen;
-  const std::string messageText(msgPayload, msgLen);
-  logger_->log_debug("Received message \"%s\" to MQTT topic %s on broker %s", messageText, topic_, uri_);
+  if (alias.has_value()) {
+    if (*alias > topic_alias_maximum_) {
+      logger_->log_error("Broker does not respect client's Topic Alias Maximum, sent a greater value: %" PRIu16 " > %" PRIu16, *alias, topic_alias_maximum_);
+      return;
+    }
 
-  std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> smartMessage(message);
-  enqueueReceivedMQTTMsg(std::move(smartMessage));
+    // if topic is empty, this is just a usage of a previously stored alias (look it up), otherwise a new one (store it)
+    if (topic.empty()) {
+      const auto iter = alias_to_topic_.find(*alias);
+      if (iter == alias_to_topic_.end()) {
+        logger_->log_error("Broker sent an alias that was not known to client before: %" PRIu16, *alias);
+      } else {
+        topic = iter->second;
+      }
+    } else {
+      alias_to_topic_[*alias] = topic;
+    }
+  } else if (topic.empty()) {
+    logger_->log_error("Received message without topic and alias");
+  }
 }
 
 void ConsumeMQTT::checkProperties() {
-  if (!cleanSession_ && clientID_.empty()) {
-    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Processor must have a Client ID for durable (non-clean) sessions");
+  if (mqtt_version_ == MqttVersions::V_3_1_0 || mqtt_version_ == MqttVersions::V_3_1_1 || mqtt_version_ == MqttVersions::V_3X_AUTO) {
+    if (isPropertyExplicitlySet(CleanStart)) {
+      logger_->log_warn("MQTT 3.x specification does not support Clean Start. Property is not used.");
+    }
+    if (isPropertyExplicitlySet(SessionExpiryInterval)) {
+      logger_->log_warn("MQTT 3.x specification does not support Session Expiry Intervals. Property is not used.");
+    }
+    if (isPropertyExplicitlySet(AttributeFromContentType)) {
+      logger_->log_warn("MQTT 3.x specification does not support Content Types and thus attributes cannot be created from them. Property is not used.");
+    }
+    if (isPropertyExplicitlySet(TopicAliasMaximum)) {
+      logger_->log_warn("MQTT 3.x specification does not support Topic Alias Maximum. Property is not used.");
+    }
+    if (isPropertyExplicitlySet(ReceiveMaximum)) {
+      logger_->log_warn("MQTT 3.x specification does not support Receive Maximum. Property is not used.");
+    }
+  }
+
+  if (mqtt_version_.value() == MqttVersions::V_5_0 && isPropertyExplicitlySet(CleanSession)) {
+    logger_->log_warn("MQTT 5.0 specification does not support Clean Session. Property is not used.");
+  }
+
+  if (clientID_.empty()) {
+    if (mqtt_version_.value() == MqttVersions::V_5_0) {
+      if (session_expiry_interval_ > std::chrono::seconds(0)) {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Processor must have a Client ID for durable (Session Expiry Interval > 0) sessions");
+      }
+    } else if (!clean_session_) {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Processor must have a Client ID for durable (non-clean) sessions");
+    }
+  }
+
+  if (qos_ == MqttQoS::LEVEL_0) {
+    if (mqtt_version_.value() == MqttVersions::V_5_0) {
+      if (session_expiry_interval_ > std::chrono::seconds(0)) {
+        logger_->log_warn("Messages are not preserved during client disconnection "
+                          "by the broker when QoS is less than 1 for durable (Session Expiry Interval > 0) sessions. Only subscriptions are preserved.");
+      }
+    } else if (!clean_session_) {
+      logger_->log_warn("Messages are not preserved during client disconnection "
+                        "by the broker when QoS is less than 1 for durable (non-clean) sessions. Only subscriptions are preserved.");
+    }
+  }
+}
+
+void ConsumeMQTT::checkBrokerLimitsImpl() {
+  auto hasWildcards = [] (std::string_view topic) {
+    return std::any_of(topic.begin(), topic.end(), [] (const char ch) {return ch == '+' || ch == '#';});
+  };
+
+  if (wildcard_subscription_available_ == false && hasWildcards(topic_)) {
+    std::ostringstream os;
+    os << "Broker does not support wildcards but topic \"" << topic_ <<"\" has them";
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, os.str());
+  }
+
+  if (maximum_session_expiry_interval_.has_value() && session_expiry_interval_ > maximum_session_expiry_interval_) {
+    std::ostringstream os;
+    os << "Set Session Expiry Interval (" << session_expiry_interval_.count() <<" s) is longer then maximum supported by broker (" << maximum_session_expiry_interval_->count() << " s).";

Review Comment:
   nitpicking:
   ```suggestion
       os << "Set Session Expiry Interval (" << session_expiry_interval_.count() <<" s) is longer than the maximum supported by the broker (" << maximum_session_expiry_interval_->count() << " s).";
   ```



##########
PROCESSORS.md:
##########
@@ -2364,6 +2380,8 @@ In the list below, the names of required properties appear in bold. Any other pr
 ### Description
 
 Routes FlowFiles based on their Attributes using the Attribute Expression Language.
+Any number of user-defined dynamic properties can be added, which all support the Attribute Expression Language. Relationships matching the name of the properties will be added.
+FlowFiles will be routed to all the relationships whose matching property evaluates to "true". Unmatched FlowFiles will be routed for "unmatched" relationship, while failed ones to "failure".

Review Comment:
   very minor, but:
   ```suggestion
   FlowFiles will be routed to all the relationships whose matching property evaluates to "true". Unmatched FlowFiles will be routed to the "unmatched" relationship, while failed ones to "failure".
   ```
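
The routing rule described in that sentence can be sketched as follows. This is only a model of the documented behaviour, not the processor's code: `targetRelationships` is a hypothetical helper, the expressions are assumed to be already evaluated to booleans, and the "failure" case is left out.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical helper: keys are dynamic property names, values are the
// already-evaluated results of their expressions for one FlowFile.
inline std::vector<std::string> targetRelationships(const std::map<std::string, bool>& evaluated) {
  std::vector<std::string> targets;
  for (const auto& [name, matched] : evaluated) {
    if (matched) {
      targets.push_back(name);  // one relationship per property that evaluated to true
    }
  }
  if (targets.empty()) {
    targets.push_back("unmatched");  // nothing matched
  }
  return targets;
}
```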



##########
extensions/mqtt/processors/AbstractMQTTProcessor.h:
##########
@@ -46,150 +44,178 @@ class AbstractMQTTProcessor : public core::Processor {
     freeResources();
   }
 
+  SMART_ENUM(MqttVersions,
+    (V_3X_AUTO, "3.x AUTO"),
+    (V_3_1_0, "3.1.0"),
+    (V_3_1_1, "3.1.1"),
+    (V_5_0, "5.0"));
+
+  SMART_ENUM(MqttQoS,
+    (LEVEL_0, "0"),
+    (LEVEL_1, "1"),
+    (LEVEL_2, "2"));
+
   EXTENSIONAPI static const core::Property BrokerURI;
   EXTENSIONAPI static const core::Property ClientID;
+  EXTENSIONAPI static const core::Property QoS;
+  EXTENSIONAPI static const core::Property MqttVersion;
+  EXTENSIONAPI static const core::Property ConnectionTimeout;
+  EXTENSIONAPI static const core::Property KeepAliveInterval;
+  EXTENSIONAPI static const core::Property LastWillTopic;
+  EXTENSIONAPI static const core::Property LastWillMessage;
+  EXTENSIONAPI static const core::Property LastWillQoS;
+  EXTENSIONAPI static const core::Property LastWillRetain;
+  EXTENSIONAPI static const core::Property LastWillContentType;
   EXTENSIONAPI static const core::Property Username;
   EXTENSIONAPI static const core::Property Password;
-  EXTENSIONAPI static const core::Property KeepAliveInterval;
-  EXTENSIONAPI static const core::Property MaxFlowSegSize;
-  EXTENSIONAPI static const core::Property ConnectionTimeout;
-  EXTENSIONAPI static const core::Property Topic;
-  EXTENSIONAPI static const core::Property QoS;
   EXTENSIONAPI static const core::Property SecurityProtocol;
   EXTENSIONAPI static const core::Property SecurityCA;
   EXTENSIONAPI static const core::Property SecurityCert;
   EXTENSIONAPI static const core::Property SecurityPrivateKey;
   EXTENSIONAPI static const core::Property SecurityPrivateKeyPassword;
-  EXTENSIONAPI static const core::Property LastWillTopic;
-  EXTENSIONAPI static const core::Property LastWillMessage;
-  EXTENSIONAPI static const core::Property LastWillQoS;
-  EXTENSIONAPI static const core::Property LastWillRetain;
 
-  EXTENSIONAPI static auto properties() {
+
+  static auto basicProperties() {
+    return std::array{
+      BrokerURI,
+      ClientID,
+      MqttVersion
+    };
+  }
+
+  static auto advancedProperties() {
     return std::array{
-            BrokerURI,
-            Topic,
-            ClientID,
-            QoS,
-            ConnectionTimeout,
-            KeepAliveInterval,
-            MaxFlowSegSize,
-            LastWillTopic,
-            LastWillMessage,
-            LastWillQoS,
-            LastWillRetain,
-            Username,
-            Password,
-            SecurityProtocol,
-            SecurityCA,
-            SecurityCert,
-            SecurityPrivateKey,
-            SecurityPrivateKeyPassword
+      QoS,
+      ConnectionTimeout,
+      KeepAliveInterval,
+      LastWillTopic,
+      LastWillMessage,
+      LastWillQoS,
+      LastWillRetain,
+      LastWillContentType,
+      Username,
+      Password,
+      SecurityProtocol,
+      SecurityCA,
+      SecurityCert,
+      SecurityPrivateKey,
+      SecurityPrivateKeyPassword
     };
   }
 
   void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& factory) override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override;
 
   void notifyStop() override {
     freeResources();
   }
 
  protected:
+  struct MQTTMessageDeleter {
+    void operator()(MQTTAsync_message* message) {
+      MQTTAsync_freeMessage(&message);

Review Comment:
   I don't think the `&` is needed here
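
Whether the `&` can go depends on the signature of the free function: a `std::unique_ptr` deleter receives the raw pointer by value, and to my knowledge Paho declares `MQTTAsync_freeMessage` with an `MQTTAsync_message**` parameter, in which case the address-of is required. A minimal sketch of that shape with stand-in types; `FakeMessage`, `fakeFreeMessage` and `FakeMessageDeleter` are hypothetical and only mirror the pointer-to-pointer pattern:

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for a C message type; not the real Paho struct.
struct FakeMessage {
  int payloadlen = 0;
};

// Counts how many messages have been released, for demonstration only.
inline int& freedCount() {
  static int count = 0;
  return count;
}

// Hypothetical stand-in for a C free function that takes a pointer to the
// caller's pointer, frees the object, and nulls the caller's pointer.
inline void fakeFreeMessage(FakeMessage** message) {
  delete *message;
  *message = nullptr;
  ++freedCount();
}

// The deleter gets the raw pointer by value, so it must pass its address
// to a free function with a pointer-to-pointer parameter.
struct FakeMessageDeleter {
  void operator()(FakeMessage* message) const {
    fakeFreeMessage(&message);
  }
};

using FakeMessagePtr = std::unique_ptr<FakeMessage, FakeMessageDeleter>;

// Creates a message, lets the unique_ptr go out of scope, and reports
// how many frees happened in between.
inline int freesAfterScopeExit() {
  const int before = freedCount();
  {
    FakeMessagePtr message(new FakeMessage{});
  }
  return freedCount() - before;
}
```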



##########
extensions/mqtt/processors/PublishMQTT.cpp:
##########
@@ -34,76 +34,229 @@ void PublishMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void PublishMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void PublishMQTT::readProperties(const std::shared_ptr<core::ProcessContext>& context) {
+  if (!context->getProperty(Topic).has_value()) {
+    logger_->log_error("PublishMQTT: could not get Topic");
+  }

Review Comment:
   How can the `Topic` property become valid later? If it cannot, then throwing here instead of in `onTrigger()` would make more sense. (And if it can, then a missing value at this point is not an error and should not be logged as one.)
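
If the answer is that it cannot, the fail-fast variant could look roughly like the sketch below. `FakeContext` and `readTopicOrThrow` are hypothetical stand-ins, and `std::runtime_error` is used in place of the project's own exception type:

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-in for a process context; not the MiNiFi class.
struct FakeContext {
  std::map<std::string, std::string> properties;

  std::optional<std::string> getProperty(const std::string& name) const {
    const auto iter = properties.find(name);
    if (iter == properties.end()) {
      return std::nullopt;
    }
    return iter->second;
  }
};

// Reads a required property at schedule time and throws if it is missing
// or empty, so a misconfigured processor never reaches the trigger phase.
inline std::string readTopicOrThrow(const FakeContext& context) {
  auto topic = context.getProperty("Topic");
  if (!topic || topic->empty()) {
    throw std::runtime_error("PublishMQTT: Topic is required but not set");
  }
  return std::move(*topic);
}
```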



##########
libminifi/include/utils/Enum.h:
##########
@@ -127,7 +128,7 @@ namespace utils {
 #define SMART_ENUM(Clazz, ...) \
   struct Clazz { \
     using Base = ::org::apache::nifi::minifi::utils::EnumBase; \
-    enum Type { \
+    enum Type : int { \

Review Comment:
   I'm not against this, but what is the reason for it?
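
As background for the question, and not as a statement of the author's motivation: fixing the underlying type of an unscoped enum pins its representation and makes conversion from any `int` well-defined. A minimal sketch with hypothetical enums `Plain` and `Fixed` (not the `SMART_ENUM` expansion):

```cpp
#include <cassert>
#include <type_traits>

// Hypothetical enums for illustration only.
enum Plain { PLAIN_A, PLAIN_B };
enum Fixed : int { FIXED_A, FIXED_B };

// With an explicit underlying type, the representation is exactly int.
static_assert(std::is_same<std::underlying_type<Fixed>::type, int>::value,
              "Fixed is represented as int");

// Without one, the underlying type is implementation-defined; it is only
// guaranteed to be an integral type able to hold all enumerator values.
static_assert(std::is_integral<std::underlying_type<Plain>::type>::value,
              "Plain has some integral underlying type");

// Converting an arbitrary int is well-defined for Fixed because every int
// value is within the range of its underlying type.
inline Fixed fromInt(int raw) {
  return static_cast<Fixed>(raw);
}
```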



##########
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##########
@@ -163,23 +200,259 @@ void AbstractMQTTProcessor::reconnect() {
   }
 
   logger_->log_info("Reconnecting to %s", uri_);
-  int ret = MQTTAsync_connect(client_, &conn_opts);
+  if (MQTTAsync_isConnected(client_)) {
+    logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+    return;
+  }
+  const int ret = MQTTAsync_connect(client_, &conn_opts);
+  MQTTProperties_free(&connect_props);
   if (ret != MQTTASYNC_SUCCESS) {
-    logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, ret);
+    logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error code [%d]", uri_, ret);
+    return;
   }
+
+  // wait until connection succeeds or fails
+  connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const {
+  conn_opts.cleanstart = getCleanStart();
+
+  {
+    MQTTProperty property;
+    property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
+    property.value.integer4 = gsl::narrow<int>(getSessionExpiryInterval().count());
+    MQTTProperties_add(&connect_props, &property);
+  }
+
+  if (!last_will_content_type_.empty()) {
+    MQTTProperty property;
+    property.identifier = MQTTPROPERTY_CODE_CONTENT_TYPE;
+    property.value.data.len = last_will_content_type_.length();
+    property.value.data.data = const_cast<char*>(last_will_content_type_.data());
+    MQTTProperties_add(&will_props, &property);
+  }
+
+  conn_opts.willProperties = &will_props;
+
+  setMqtt5ConnectOptionsImpl(connect_props);
+}
+
+void AbstractMQTTProcessor::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  // read lock
+  std::shared_lock client_lock{client_mutex_};
+  if (client_ == nullptr) {
+    // we are shutting down
+    return;
+  }
+
+  // reconnect if needed
+  reconnect();
+
+  if (!MQTTAsync_isConnected(client_)) {
+    logger_->log_error("Could not work with MQTT broker because disconnected to %s", uri_);
+    yield();
+    return;
+  }
+
+  onTriggerImpl(context, session);
 }
 
 void AbstractMQTTProcessor::freeResources() {
-  if (client_ && MQTTAsync_isConnected(client_)) {
-    MQTTAsync_disconnectOptions disconnect_options = MQTTAsync_disconnectOptions_initializer;
-    disconnect_options.context = this;
-    disconnect_options.onSuccess = disconnectionSuccess;
-    disconnect_options.onFailure = disconnectionFailure;
-    disconnect_options.timeout = gsl::narrow<int>(std::chrono::milliseconds{connection_timeout_}.count());
-    MQTTAsync_disconnect(client_, &disconnect_options);
+  // write lock
+  std::lock_guard client_lock{client_mutex_};
+
+  if (!client_) {
+    return;
   }
-  if (client_) {
-    MQTTAsync_destroy(&client_);
+
+  disconnect();
+
+  MQTTAsync_destroy(&client_);
+}
+
+void AbstractMQTTProcessor::disconnect() {
+  if (!MQTTAsync_isConnected(client_)) {
+    return;
+  }
+
+  MQTTAsync_disconnectOptions disconnect_options = MQTTAsync_disconnectOptions_initializer;
+  std::packaged_task<void(MQTTAsync_successData*, MQTTAsync_successData5*, MQTTAsync_failureData*, MQTTAsync_failureData5*)> disconnect_finished_task(
+          [this] (MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5) {
+            onDisconnectFinished(success_data, success_data_5, failure_data, failure_data_5);
+          });
+  disconnect_options.context = &disconnect_finished_task;
+
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+    disconnect_options.onSuccess5 = connectionSuccess5;
+    disconnect_options.onFailure5 = connectionFailure5;
+  } else {
+    disconnect_options.onSuccess = connectionSuccess;
+    disconnect_options.onFailure = connectionFailure;
+  }
+
+  disconnect_options.timeout = gsl::narrow<int>(std::chrono::milliseconds{connection_timeout_}.count());
+
+  const int ret = MQTTAsync_disconnect(client_, &disconnect_options);
+  if (ret != MQTTASYNC_SUCCESS) {
+    logger_->log_error("MQTTAsync_disconnect failed to MQTT broker %s with error code [%d]", uri_, ret);
+    return;
+  }
+
+  // wait until connection succeeds or fails
+  disconnect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setBrokerLimits(MQTTAsync_successData5* response) {
+  auto readProperty = [response] (MQTTPropertyCodes property_code, auto& out_var) {
+    const int value = MQTTProperties_getNumericValue(&response->properties, property_code);
+    if (value != PAHO_MQTT_C_FAILURE_CODE) {
+      if constexpr (std::is_same_v<decltype(out_var), std::optional<std::chrono::seconds>&>) {
+        out_var = std::chrono::seconds(value);
+      } else {
+        out_var = gsl::narrow<typename std::remove_reference_t<decltype(out_var)>::value_type>(value);
+      }
+    } else {
+      out_var.reset();
+    }
+  };
+
+  readProperty(MQTTPROPERTY_CODE_RETAIN_AVAILABLE, retain_available_);
+  readProperty(MQTTPROPERTY_CODE_WILDCARD_SUBSCRIPTION_AVAILABLE, wildcard_subscription_available_);
+  readProperty(MQTTPROPERTY_CODE_SHARED_SUBSCRIPTION_AVAILABLE, shared_subscription_available_);
+
+  readProperty(MQTTPROPERTY_CODE_TOPIC_ALIAS_MAXIMUM, broker_topic_alias_maximum_);
+  readProperty(MQTTPROPERTY_CODE_RECEIVE_MAXIMUM, broker_receive_maximum_);
+  readProperty(MQTTPROPERTY_CODE_MAXIMUM_QOS, maximum_qos_);
+  readProperty(MQTTPROPERTY_CODE_MAXIMUM_PACKET_SIZE, maximum_packet_size_);
+
+  readProperty(MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL, maximum_session_expiry_interval_);
+  readProperty(MQTTPROPERTY_CODE_SERVER_KEEP_ALIVE, server_keep_alive_);
+}
+
+void AbstractMQTTProcessor::checkBrokerLimits() {
+  try {
+    if (server_keep_alive_.has_value() && server_keep_alive_ < keep_alive_interval_) {
+      std::ostringstream os;
+      os << "Set Keep Alive Interval (" << keep_alive_interval_.count() << " s) is longer then maximum supported by broker (" << server_keep_alive_->count() << " s)";
+      throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, os.str());
+    }
+
+    if (maximum_qos_.has_value() && qos_.value() > maximum_qos_) {
+      std::ostringstream os;
+      os << "Set QoS (" << qos_.value() << ") is higher than maximum supported by broker (" << *maximum_qos_ << ")";
+      throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, os.str());
+    }
+
+    checkBrokerLimitsImpl();
+  }
+  catch (...) {
+    disconnect();
+    throw;
+  }
+}
+
+void AbstractMQTTProcessor::connectionLost(void *context, char* cause) {
+  auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
+  processor->onConnectionLost(cause);
+}
+
+
+void AbstractMQTTProcessor::connectionSuccess(void* context, MQTTAsync_successData* response) {
+  auto* task = reinterpret_cast<std::packaged_task<void(MQTTAsync_successData*, MQTTAsync_successData5*, MQTTAsync_failureData*, MQTTAsync_failureData5*)>*>(context);

Review Comment:
   I think this `packaged_task<...>` type is long enough and used frequently enough to merit a typedef (or a `using` alias). Please check with @szaszm first, though, because I don't want you to change this and then have to revert the change.
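
   For illustration, an alias for that type could look roughly like the sketch below. The alias name `ConnectFinishedTask`, the empty stand-in structs (used only so the snippet compiles without the Paho headers), the helper `callbackReceivedSuccessData`, and the body of `connectionSuccess` are all assumptions of mine, not code from the PR:

   ```cpp
   #include <cassert>
   #include <future>

   // Stand-ins for the Paho MQTT C types, so the sketch compiles on its own.
   struct MQTTAsync_successData {};
   struct MQTTAsync_successData5 {};
   struct MQTTAsync_failureData {};
   struct MQTTAsync_failureData5 {};

   // Hypothetical alias name; the PR may pick a different one.
   using ConnectFinishedTask = std::packaged_task<void(MQTTAsync_successData*, MQTTAsync_successData5*,
                                                       MQTTAsync_failureData*, MQTTAsync_failureData5*)>;

   // Same signature as the static callback in the diff: the task is recovered
   // from the void* context. Forwarding the arguments like this is an assumption.
   void connectionSuccess(void* context, MQTTAsync_successData* response) {
     auto* task = reinterpret_cast<ConnectFinishedTask*>(context);
     (*task)(response, nullptr, nullptr, nullptr);
   }

   // Hypothetical helper: passes a task through the callback and reports
   // whether the lambda saw non-null success data.
   bool callbackReceivedSuccessData() {
     bool got_success = false;
     ConnectFinishedTask task(
         [&got_success](MQTTAsync_successData* success_data, MQTTAsync_successData5*,
                        MQTTAsync_failureData*, MQTTAsync_failureData5*) {
           got_success = (success_data != nullptr);
         });
     auto finished = task.get_future();
     MQTTAsync_successData data;
     connectionSuccess(&task, &data);
     finished.get();  // returns once the task has run
     return got_success;
   }
   ```

   With an alias like this, each `reinterpret_cast` in the callbacks and each task declaration shrinks to a single short type name.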



##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> message) {
-  if (queue_.size_approx() >= maxQueueSize_) {
-    logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+  if (queue_.size_approx() >= max_queue_size_) {
+    logger_->log_error("MQTT queue full");
     return;
   }
 
-  if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) {
-    logger_->log_debug("MQTT message was truncated while enqueuing, original length: %d", message->payloadlen);
-    message->payloadlen = gsl::narrow<int>(max_seg_size_);
-  }
-
-  logger_->log_debug("enqueuing MQTT message with length %d", message->payloadlen);
+  logger_->log_debug("enqueuing MQTT message with length %d", message.contents->payloadlen);
   queue_.enqueue(std::move(message));
 }
 
-void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void ConsumeMQTT::readProperties(const std::shared_ptr<core::ProcessContext>& context) {
+  if (auto value = context->getProperty(Topic)) {
+    topic_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
   if (const auto value = context->getProperty<bool>(CleanSession)) {
-    cleanSession_ = *value;
-    logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+    clean_session_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+  if (const auto value = context->getProperty<bool>(CleanStart)) {
+    clean_start_ = *value;
+  }
+  logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+  if (const auto session_expiry_interval = context->getProperty<core::TimePeriodValue>(SessionExpiryInterval)) {
+    session_expiry_interval_ = std::chrono::duration_cast<std::chrono::seconds>(session_expiry_interval->getMilliseconds());
+  }
+  logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", int64_t{session_expiry_interval_.count()});
 
   if (const auto value = context->getProperty<uint64_t>(QueueBufferMaxMessage)) {
-    maxQueueSize_ = *value;
-    logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", maxQueueSize_);
+    max_queue_size_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", max_queue_size_);
 
-  // this connects to broker, so properties of this processor must be read before
-  AbstractMQTTProcessor::onSchedule(context, factory);
-}
+  if (auto value = context->getProperty(AttributeFromContentType)) {
+    attribute_from_content_type_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", attribute_from_content_type_);
 
-void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession> &session) {
-  // reconnect if needed
-  reconnect();
+  if (const auto topic_alias_maximum = context->getProperty<uint32_t>(TopicAliasMaximum)) {
+    topic_alias_maximum_ = gsl::narrow<uint16_t>(*topic_alias_maximum);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", topic_alias_maximum_);
 
-  if (!MQTTAsync_isConnected(client_)) {
-    logger_->log_error("Could not consume from MQTT broker because disconnected to %s", uri_);
-    yield();
-    return;
+  if (const auto receive_maximum = context->getProperty<uint32_t>(ReceiveMaximum)) {
+    receive_maximum_ = gsl::narrow<uint16_t>(*receive_maximum);
   }
+  logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", receive_maximum_);
+}
 
-  std::deque<std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter>> msg_queue;
-  getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession>& session) {
+  std::queue<SmartMessage> msg_queue = getReceivedMqttMessages();
   while (!msg_queue.empty()) {
     const auto& message = msg_queue.front();
-    std::shared_ptr<core::FlowFile> processFlowFile = session->create();
-    int write_status{};
-    session->write(processFlowFile, [&message, &write_status](const std::shared_ptr<io::OutputStream>& stream) -> int64_t {
-      if (message->payloadlen < 0) {
-        write_status = -1;
-        return -1;
-      }
-      const auto len = stream->write(reinterpret_cast<uint8_t*>(message->payload), gsl::narrow<size_t>(message->payloadlen));
-      if (io::isError(len)) {
-        write_status = -1;
-        return -1;
-      }
-      return gsl::narrow<int64_t>(len);
-    });
-    if (write_status < 0) {
-      logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", processFlowFile->getUUIDStr());
-      session->remove(processFlowFile);
+    std::shared_ptr<core::FlowFile> flow_file = session->create();
+    WriteCallback write_callback(message, logger_);
+    try {
+      session->write(flow_file, write_callback);
+    } catch (const Exception& ex) {
+      logger_->log_error("Error when processing message queue: %s", ex.what());
+    }
+    if (!write_callback.getSuccessStatus()) {
+      logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", flow_file->getUUIDStr());
+      session->remove(flow_file);
     } else {
-      session->putAttribute(processFlowFile, MQTT_BROKER_ATTRIBUTE, uri_);
-      session->putAttribute(processFlowFile, MQTT_TOPIC_ATTRIBUTE, topic_);
-      logger_->log_debug("ConsumeMQTT processing success for the flow with UUID %s topic %s", processFlowFile->getUUIDStr(), topic_);
-      session->transfer(processFlowFile, Success);
+      putUserPropertiesAsAttributes(message, flow_file, session);
+      session->putAttribute(flow_file, MQTT_BROKER_ATTRIBUTE, uri_);
+      session->putAttribute(flow_file, MQTT_TOPIC_ATTRIBUTE, message.topic);
+      fillAttributeFromContentType(message, flow_file, session);
+      logger_->log_debug("ConsumeMQTT processing success for the flow with UUID %s topic %s", flow_file->getUUIDStr(), message.topic);
+      session->transfer(flow_file, Success);
     }
-    msg_queue.pop_front();
+    msg_queue.pop();
+  }
+}
+
+std::queue<ConsumeMQTT::SmartMessage> ConsumeMQTT::getReceivedMqttMessages() {
+  std::queue<SmartMessage> msg_queue;
+  SmartMessage message;
+  while (queue_.try_dequeue(message)) {
+    msg_queue.push(std::move(message));
+  }
+  return msg_queue;
+}
+
+int64_t ConsumeMQTT::WriteCallback::operator() (const std::shared_ptr<io::OutputStream>& stream) {
+  if (message_.contents->payloadlen < 0) {
+    success_status_ = false;
+    logger_->log_error("Payload length of message is negative, value is [%d]", message_.contents->payloadlen);
+    return -1;
+  }
+
+  const auto len = stream->write(reinterpret_cast<uint8_t*>(message_.contents->payload), gsl::narrow<size_t>(message_.contents->payloadlen));
+  if (io::isError(len)) {
+    success_status_ = false;
+    logger_->log_error("Stream writing error when processing message");
+    return -1;
+  }
+
+  return len;
+}
+
+void ConsumeMQTT::putUserPropertiesAsAttributes(const SmartMessage& message, const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const {
+  if (mqtt_version_.value() != MqttVersions::V_5_0) {
+    return;
   }
+
+  const auto property_count = MQTTProperties_propertyCount(&message.contents->properties, MQTTPROPERTY_CODE_USER_PROPERTY);
+  for (int i=0; i < property_count; ++i) {
+    MQTTProperty* property = MQTTProperties_getPropertyAt(&message.contents->properties, MQTTPROPERTY_CODE_USER_PROPERTY, i);
+    std::string key(property->value.data.data, property->value.data.len);
+    std::string value(property->value.value.data, property->value.value.len);
+    session->putAttribute(flow_file, key, value);
+  }
+}
+
+void ConsumeMQTT::fillAttributeFromContentType(const SmartMessage& message, const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const {
+  if (mqtt_version_.value() != MqttVersions::V_5_0 || attribute_from_content_type_.empty()) {
+    return;
+  }
+
+  MQTTProperty* property = MQTTProperties_getProperty(&message.contents->properties, MQTTPROPERTY_CODE_CONTENT_TYPE);
+  if (property == nullptr) {
+    return;
+  }
+
+  std::string content_type(property->value.data.data, property->value.data.len);
+  session->putAttribute(flow_file, attribute_from_content_type_, content_type);
 }
 
-bool ConsumeMQTT::startupClient() {
+void ConsumeMQTT::startupClient() {
   MQTTAsync_responseOptions response_options = MQTTAsync_responseOptions_initializer;
   response_options.context = this;
-  response_options.onSuccess = subscriptionSuccess;
-  response_options.onFailure = subscriptionFailure;
-  const int ret = MQTTAsync_subscribe(client_, topic_.c_str(), gsl::narrow<int>(qos_), &response_options);
+
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+    response_options.onSuccess5 = subscriptionSuccess5;
+    response_options.onFailure5 = subscriptionFailure5;
+  } else {
+    response_options.onSuccess = subscriptionSuccess;
+    response_options.onFailure = subscriptionFailure;
+  }
+
+  const int ret = MQTTAsync_subscribe(client_, topic_.c_str(), gsl::narrow<int>(qos_.value()), &response_options);
   if (ret != MQTTASYNC_SUCCESS) {
     logger_->log_error("Failed to subscribe to MQTT topic %s (%d)", topic_, ret);
-    return false;
+    return;
   }
   logger_->log_debug("Successfully subscribed to MQTT topic: %s", topic_);
-  return true;
 }
 
-void ConsumeMQTT::onMessageReceived(char* topic_name, int /*topic_len*/, MQTTAsync_message* message) {
-  MQTTAsync_free(topic_name);
+void ConsumeMQTT::onMessageReceived(SmartMessage smart_message) {
+  if (mqtt_version_ == MqttVersions::V_5_0) {
+    resolveTopicFromAlias(smart_message);
+  }
+
+  if (smart_message.topic.empty()) {
+    logger_->log_error("Received message without topic");
+    return;
+  }
+
+  enqueueReceivedMQTTMsg(std::move(smart_message));
+}
+
+void ConsumeMQTT::resolveTopicFromAlias(SmartMessage& smart_message) {
+  auto raw_alias = MQTTProperties_getNumericValue(&smart_message.contents->properties, MQTTPROPERTY_CODE_TOPIC_ALIAS);
+
+  std::optional<uint16_t> alias;
+  if (raw_alias != PAHO_MQTT_C_FAILURE_CODE) {
+    alias = gsl::narrow<uint16_t>(raw_alias);
+  }
+
+  auto& topic = smart_message.topic;
 
-  const auto* msgPayload = reinterpret_cast<const char*>(message->payload);
-  const size_t msgLen = message->payloadlen;
-  const std::string messageText(msgPayload, msgLen);
-  logger_->log_debug("Received message \"%s\" to MQTT topic %s on broker %s", messageText, topic_, uri_);
+  if (alias.has_value()) {
+    if (*alias > topic_alias_maximum_) {

Review Comment:
   ```suggestion
       if (topic_alias_maximum_ != 0 && *alias > topic_alias_maximum_) {
   ```
   ?



##########
PROCESSORS.md:
##########
@@ -337,28 +337,34 @@ This Processor gets the contents of a FlowFile from a MQTT broker for a specifie
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name                  | Default Value | Allowable Values | Description                                                                                                                 |
-|-----------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------|
-| **Broker URI**        |               |                  | The URI to use to connect to the MQTT broker                                                                                |
-| **Topic**             |               |                  | The topic to subscribe to                                                                                                   |
-| Client ID             |               |                  | MQTT client ID to use                                                                                                       |
-| Quality of Service    | 0             |                  | The Quality of Service (QoS) to receive the message with. Accepts three values '0', '1' and '2'                             |
-| Connection Timeout    | 30 sec        |                  | Maximum time interval the client will wait for the network connection to the MQTT broker                                    |
-| Keep Alive Interval   | 60 sec        |                  | Defines the maximum time interval between messages being sent to the broker                                                 |
-| Max Flow Segment Size |               |                  | Maximum flow content payload segment size for the MQTT record                                                               |
-| Last Will Topic       |               |                  | The topic to send the client's Last Will to. If the Last Will topic is not set then a Last Will will not be sent            |
-| Last Will Message     |               |                  | The message to send as the client's Last Will. If the Last Will Message is empty, Last Will will be deleted from the broker |
-| Last Will QoS         | 0             |                  | The Quality of Service (QoS) to send the last will with. Accepts three values '0', '1' and '2'                              |
-| Last Will Retain      | false         |                  | Whether to retain the client's Last Will                                                                                    |
-| Security Protocol     |               |                  | Protocol used to communicate with brokers                                                                                   |
-| Security CA           |               |                  | File or directory path to CA certificate(s) for verifying the broker's key                                                  |
-| Security Cert         |               |                  | Path to client's public key (PEM) used for authentication                                                                   |
-| Security Private Key  |               |                  | Path to client's private key (PEM) used for authentication                                                                  |
-| Security Pass Phrase  |               |                  | Private key passphrase                                                                                                      |
-| Username              |               |                  | Username to use when connecting to the broker                                                                               |
-| Password              |               |                  | Password to use when connecting to the broker                                                                               |
-| Clean Session         | true          |                  | Whether to start afresh rather than remembering previous subscriptions                                                      |
-| Queue Max Message     | 1000          |                  | Maximum number of messages allowed on the received MQTT queue                                                               |
+| Name                        | Default Value | Allowable Values            | Description                                                                                                                                                 |
+|-----------------------------|---------------|-----------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Broker URI**              |               |                             | The URI to use to connect to the MQTT broker                                                                                                                |
+| Client ID                   |               |                             | MQTT client ID to use. WARNING: Must not be empty when using MQTT 3.1.0!                                                                                    |
+| MQTT Version                | 3.x AUTO      | 3.x AUTO, 3.1.0, 3.1.1, 5.0 | The MQTT specification version when connecting to the broker.                                                                                               |
+| **Topic**                   |               |                             | The topic to subscribe to.                                                                                                                                  |
+| Clean Session               | true          |                             | Whether to start afresh rather than remembering previous subscriptions. Also make broker remember subscriptions after disconnected. WARNING: MQTT 3.x only. |

Review Comment:
   The second sentence is not clear to me: is the broker going to remember subscriptions after a disconnect when this is set to `true`, or when it is set to `false`?



##########
extensions/mqtt/processors/PublishMQTT.cpp:
##########
@@ -34,76 +34,229 @@ void PublishMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void PublishMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void PublishMQTT::readProperties(const std::shared_ptr<core::ProcessContext>& context) {
+  if (!context->getProperty(Topic).has_value()) {
+    logger_->log_error("PublishMQTT: could not get Topic");
+  }
+
   if (const auto retain_opt = context->getProperty<bool>(Retain)) {
     retain_ = *retain_opt;
   }
   logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
 
-  AbstractMQTTProcessor::onSchedule(context, factory);
+  if (const auto message_expiry_interval = context->getProperty<core::TimePeriodValue>(MessageExpiryInterval)) {
+    message_expiry_interval_ = std::chrono::duration_cast<std::chrono::seconds>(message_expiry_interval->getMilliseconds());
+    logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s", int64_t{message_expiry_interval_->count()});
+  }
+
+  in_flight_message_counter_.setMqttVersion(mqtt_version_);
+  in_flight_message_counter_.setQoS(qos_);
 }
 
-void PublishMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession> &session) {
-  // reconnect if needed
-  reconnect();
+void PublishMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession> &session) {
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
 
-  if (!MQTTAsync_isConnected(client_)) {
-    logger_->log_error("Could not publish to MQTT broker because disconnected to %s", uri_);
-    yield();
+  if (!flow_file) {
     return;
   }
 
-  std::shared_ptr<core::FlowFile> flowFile = session->get();
+  // broker's Receive Maximum can change after reconnect
+  in_flight_message_counter_.setMax(broker_receive_maximum_.value_or(MQTT_MAX_RECEIVE_MAXIMUM));
 
-  if (!flowFile) {
-    return;
+  const auto topic = getTopic(context, flow_file);
+  try {
+    const auto result = session->readBuffer(flow_file);
+    if (result.status < 0 || !sendMessage(result.buffer, topic, getContentType(context, flow_file), flow_file)) {
+      logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on broker %s", flow_file->getUUIDStr(), topic, uri_);
+      session->transfer(flow_file, Failure);
+      return;
+    }
+    logger_->log_debug("Sent flow file [%s] with length %" PRId64 " to MQTT topic '%s' on broker %s", flow_file->getUUIDStr(), result.status, topic, uri_);
+    session->transfer(flow_file, Success);
+  } catch (const Exception& ex) {
+    logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on broker %s, exception string: '%s'", flow_file->getUUIDStr(), topic, uri_, ex.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+bool PublishMQTT::sendMessage(const std::vector<std::byte>& buffer, const std::string& topic, const std::string& content_type, const std::shared_ptr<core::FlowFile>& flow_file) {
+  if (buffer.size() > 268'435'455) {
+    logger_->log_error("Sending message failed because MQTT limit maximum packet size [268'435'455] is exceeded by FlowFile of [%zu]", buffer.size());
   }
 
-  PublishMQTT::ReadCallback callback(this, flowFile->getSize(), max_seg_size_, topic_, client_, gsl::narrow<int>(qos_), retain_);
-  session->read(flowFile, std::ref(callback));
-  if (callback.status_ < 0) {
-    logger_->log_error("Failed to send flow to MQTT topic %s", topic_);
-    session->transfer(flowFile, Failure);
+  if (maximum_packet_size_.has_value() && buffer.size() > *(maximum_packet_size_)) {
+    logger_->log_error("Sending message failed because broker-requested maximum packet size [%" PRIu32 "] is exceeded by FlowFile of [%zu]",
+                                   *maximum_packet_size_, buffer.size());
+  }
+
+  MQTTAsync_message message_to_publish = MQTTAsync_message_initializer;
+  message_to_publish.payload = const_cast<std::byte*>(buffer.data());
+  message_to_publish.payloadlen = buffer.size();
+  message_to_publish.qos = qos_.value();
+  message_to_publish.retained = retain_;
+
+  setMqtt5Properties(message_to_publish, content_type, flow_file);
+
+  MQTTAsync_responseOptions response_options = MQTTAsync_responseOptions_initializer;
+  if (mqtt_version_ == MqttVersions::V_5_0) {
+    response_options.onSuccess5 = sendSuccess5;
+    response_options.onFailure5 = sendFailure5;
   } else {
-    logger_->log_debug("Sent flow with length %d to MQTT topic %s", callback.read_size_, topic_);
-    session->transfer(flowFile, Success);
-  }
-}
-
-int64_t PublishMQTT::ReadCallback::operator()(const std::shared_ptr<io::InputStream>& stream) {
-  if (flow_size_ < max_seg_size_)
-    max_seg_size_ = flow_size_;
-  gsl_Expects(max_seg_size_ < gsl::narrow<uint64_t>(std::numeric_limits<int>::max()));
-  std::vector<std::byte> buffer(max_seg_size_);
-  read_size_ = 0;
-  status_ = 0;
-  while (read_size_ < flow_size_) {
-    // MQTTClient_message::payloadlen is int, so we can't handle 2GB+
-    const auto readRet = stream->read(buffer);
-    if (io::isError(readRet)) {
-      status_ = -1;
-      return gsl::narrow<int64_t>(read_size_);
+    response_options.onSuccess = sendSuccess;
+    response_options.onFailure = sendFailure;
+  }
+
+  // save context for callback
+  std::packaged_task<bool(bool, std::optional<int>, std::optional<MQTTReasonCodes>)> send_finished_task(
+          [this] (const bool success, const std::optional<int> response_code, const std::optional<MQTTReasonCodes> reason_code) {
+            return notify(success, response_code, reason_code);
+          });
+  response_options.context = &send_finished_task;
+
+  in_flight_message_counter_.increase();
+
+  const int error_code = MQTTAsync_sendMessage(client_, topic.c_str(), &message_to_publish, &response_options);
+  if (error_code != MQTTASYNC_SUCCESS) {
+    logger_->log_error("MQTTAsync_sendMessage failed on topic '%s', MQTT broker %s with error code [%d]", topic, uri_, error_code);
+    // early fail, sending attempt did not succeed, no need to wait for callback
+    in_flight_message_counter_.decrease();
+    return false;
+  }
+
+  return send_finished_task.get_future().get();
+}
+
+void PublishMQTT::checkProperties() {
+  if ((mqtt_version_ == MqttVersions::V_3_1_0 || mqtt_version_ == MqttVersions::V_3_1_1 || mqtt_version_ == MqttVersions::V_3X_AUTO)) {
+    if (isPropertyExplicitlySet(MessageExpiryInterval)) {
+      logger_->log_warn("MQTT 3.x specification does not support Message Expiry Intervals. Property is not used.");
+    }
+    if (isPropertyExplicitlySet(ContentType)) {
+      logger_->log_warn("MQTT 3.x specification does not support Content Types. Property is not used.");
+    }
+  }
+}
+
+void PublishMQTT::checkBrokerLimitsImpl() {
+  if (retain_available_.has_value() && !*retain_available_ && retain_) {

Review Comment:
   I would do
   ```suggestion
     if (retain_available_ == false && retain_) {
   ```
   here, too
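
   To show why the two forms are equivalent, here is a small standalone sketch. It assumes `retain_available_` is a `std::optional<bool>` (I have not checked the header), and the free functions are hypothetical names used only for the comparison:

   ```cpp
   #include <cassert>
   #include <optional>

   // Verbose form, as currently written in the diff.
   bool retainRejectedVerbose(const std::optional<bool>& retain_available, bool retain) {
     return retain_available.has_value() && !*retain_available && retain;
   }

   // Compact form from the suggestion: comparing an optional with a value
   // yields false when the optional is empty, so no has_value() check is needed.
   bool retainRejectedCompact(const std::optional<bool>& retain_available, bool retain) {
     return retain_available == false && retain;
   }
   ```

   Both functions return `true` only when the broker explicitly reported that retain is unavailable and the processor wants to retain messages.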



##########
extensions/mqtt/processors/PublishMQTT.cpp:
##########
@@ -34,76 +34,229 @@ void PublishMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void PublishMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void PublishMQTT::readProperties(const std::shared_ptr<core::ProcessContext>& context) {
+  if (!context->getProperty(Topic).has_value()) {
+    logger_->log_error("PublishMQTT: could not get Topic");
+  }
+
   if (const auto retain_opt = context->getProperty<bool>(Retain)) {
     retain_ = *retain_opt;
   }
   logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
 
-  AbstractMQTTProcessor::onSchedule(context, factory);
+  if (const auto message_expiry_interval = context->getProperty<core::TimePeriodValue>(MessageExpiryInterval)) {
+    message_expiry_interval_ = std::chrono::duration_cast<std::chrono::seconds>(message_expiry_interval->getMilliseconds());
+    logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s", int64_t{message_expiry_interval_->count()});
+  }
+
+  in_flight_message_counter_.setMqttVersion(mqtt_version_);
+  in_flight_message_counter_.setQoS(qos_);
 }
 
-void PublishMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession> &session) {
-  // reconnect if needed
-  reconnect();
+void PublishMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession> &session) {
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
 
-  if (!MQTTAsync_isConnected(client_)) {
-    logger_->log_error("Could not publish to MQTT broker because disconnected to %s", uri_);
-    yield();
+  if (!flow_file) {
     return;
   }
 
-  std::shared_ptr<core::FlowFile> flowFile = session->get();
+  // broker's Receive Maximum can change after reconnect
+  in_flight_message_counter_.setMax(broker_receive_maximum_.value_or(MQTT_MAX_RECEIVE_MAXIMUM));
 
-  if (!flowFile) {
-    return;
+  const auto topic = getTopic(context, flow_file);
+  try {
+    const auto result = session->readBuffer(flow_file);
+    if (result.status < 0 || !sendMessage(result.buffer, topic, getContentType(context, flow_file), flow_file)) {
+      logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on broker %s", flow_file->getUUIDStr(), topic, uri_);
+      session->transfer(flow_file, Failure);
+      return;
+    }
+    logger_->log_debug("Sent flow file [%s] with length %" PRId64 " to MQTT topic '%s' on broker %s", flow_file->getUUIDStr(), result.status, topic, uri_);
+    session->transfer(flow_file, Success);
+  } catch (const Exception& ex) {
+    logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on broker %s, exception string: '%s'", flow_file->getUUIDStr(), topic, uri_, ex.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+bool PublishMQTT::sendMessage(const std::vector<std::byte>& buffer, const std::string& topic, const std::string& content_type, const std::shared_ptr<core::FlowFile>& flow_file) {
+  if (buffer.size() > 268'435'455) {
+    logger_->log_error("Sending message failed because MQTT limit maximum packet size [268'435'455] is exceeded by FlowFile of [%zu]", buffer.size());
   }
 
-  PublishMQTT::ReadCallback callback(this, flowFile->getSize(), max_seg_size_, topic_, client_, gsl::narrow<int>(qos_), retain_);
-  session->read(flowFile, std::ref(callback));
-  if (callback.status_ < 0) {
-    logger_->log_error("Failed to send flow to MQTT topic %s", topic_);
-    session->transfer(flowFile, Failure);
+  if (maximum_packet_size_.has_value() && buffer.size() > *(maximum_packet_size_)) {
+    logger_->log_error("Sending message failed because broker-requested maximum packet size [%" PRIu32 "] is exceeded by FlowFile of [%zu]",
+                                   *maximum_packet_size_, buffer.size());
+  }
+
+  MQTTAsync_message message_to_publish = MQTTAsync_message_initializer;
+  message_to_publish.payload = const_cast<std::byte*>(buffer.data());
+  message_to_publish.payloadlen = buffer.size();
+  message_to_publish.qos = qos_.value();
+  message_to_publish.retained = retain_;
+
+  setMqtt5Properties(message_to_publish, content_type, flow_file);
+
+  MQTTAsync_responseOptions response_options = MQTTAsync_responseOptions_initializer;
+  if (mqtt_version_ == MqttVersions::V_5_0) {
+    response_options.onSuccess5 = sendSuccess5;
+    response_options.onFailure5 = sendFailure5;
   } else {
-    logger_->log_debug("Sent flow with length %d to MQTT topic %s", callback.read_size_, topic_);
-    session->transfer(flowFile, Success);
-  }
-}
-
-int64_t PublishMQTT::ReadCallback::operator()(const std::shared_ptr<io::InputStream>& stream) {
-  if (flow_size_ < max_seg_size_)
-    max_seg_size_ = flow_size_;
-  gsl_Expects(max_seg_size_ < gsl::narrow<uint64_t>(std::numeric_limits<int>::max()));
-  std::vector<std::byte> buffer(max_seg_size_);
-  read_size_ = 0;
-  status_ = 0;
-  while (read_size_ < flow_size_) {
-    // MQTTClient_message::payloadlen is int, so we can't handle 2GB+
-    const auto readRet = stream->read(buffer);
-    if (io::isError(readRet)) {
-      status_ = -1;
-      return gsl::narrow<int64_t>(read_size_);
+    response_options.onSuccess = sendSuccess;
+    response_options.onFailure = sendFailure;
+  }
+
+  // save context for callback
+  std::packaged_task<bool(bool, std::optional<int>, std::optional<MQTTReasonCodes>)> send_finished_task(
+          [this] (const bool success, const std::optional<int> response_code, const std::optional<MQTTReasonCodes> reason_code) {
+            return notify(success, response_code, reason_code);
+          });
+  response_options.context = &send_finished_task;
+
+  in_flight_message_counter_.increase();
+
+  const int error_code = MQTTAsync_sendMessage(client_, topic.c_str(), &message_to_publish, &response_options);
+  if (error_code != MQTTASYNC_SUCCESS) {
+    logger_->log_error("MQTTAsync_sendMessage failed on topic '%s', MQTT broker %s with error code [%d]", topic, uri_, error_code);
+    // early fail, sending attempt did not succeed, no need to wait for callback
+    in_flight_message_counter_.decrease();
+    return false;
+  }
+
+  return send_finished_task.get_future().get();
+}
+
+void PublishMQTT::checkProperties() {
+  if ((mqtt_version_ == MqttVersions::V_3_1_0 || mqtt_version_ == MqttVersions::V_3_1_1 || mqtt_version_ == MqttVersions::V_3X_AUTO)) {
+    if (isPropertyExplicitlySet(MessageExpiryInterval)) {
+      logger_->log_warn("MQTT 3.x specification does not support Message Expiry Intervals. Property is not used.");
+    }
+    if (isPropertyExplicitlySet(ContentType)) {
+      logger_->log_warn("MQTT 3.x specification does not support Content Types. Property is not used.");
+    }
+  }
+}
+
+void PublishMQTT::checkBrokerLimitsImpl() {
+  if (retain_available_.has_value() && !*retain_available_ && retain_) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Retain was set but broker does not support it");
+  }
+}
+
+void PublishMQTT::sendSuccess(void* context, MQTTAsync_successData* /*response*/) {
+  auto send_finished_task = reinterpret_cast<std::packaged_task<void(bool, std::optional<int>, std::optional<MQTTReasonCodes>)>*>(context);
+  (*send_finished_task)(true, std::nullopt, std::nullopt);
+}
+
+void PublishMQTT::sendSuccess5(void* context, MQTTAsync_successData5* response) {
+  auto send_finished_task = reinterpret_cast<std::packaged_task<void(bool, std::optional<int>, std::optional<MQTTReasonCodes>)>*>(context);
+  (*send_finished_task)(true, std::nullopt, response->reasonCode);
+}
+
+void PublishMQTT::sendFailure(void* context, MQTTAsync_failureData* response) {
+  auto send_finished_task = reinterpret_cast<std::packaged_task<void(bool, std::optional<int>, std::optional<MQTTReasonCodes>)>*>(context);
+  (*send_finished_task)(false, response->code, std::nullopt);
+}
+
+void PublishMQTT::sendFailure5(void* context, MQTTAsync_failureData5* response) {
+  auto send_finished_task = reinterpret_cast<std::packaged_task<void(bool, std::optional<int>, std::optional<MQTTReasonCodes>)>*>(context);
+  (*send_finished_task)(false, response->code, response->reasonCode);
+}
+
+bool PublishMQTT::notify(const bool success, const std::optional<int> response_code, const std::optional<MQTTReasonCodes> reason_code) {
+  in_flight_message_counter_.decrease();
+
+  if (success) {
+    logger_->log_debug("Successfully sent message to MQTT broker %s", uri_);
+    if (reason_code.has_value()) {
+      logger_->log_error("Additional reason code for sending success: %d: %s", *reason_code, MQTTReasonCode_toString(*reason_code));

Review Comment:
   In `ConsumeMQTT`, we only use `MQTTReasonCode_toString()` in the v5.0 case; is it OK to use it here in the v3.x case, too?



##########
extensions/mqtt/processors/PublishMQTT.cpp:
##########
@@ -34,76 +34,229 @@ void PublishMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void PublishMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void PublishMQTT::readProperties(const std::shared_ptr<core::ProcessContext>& context) {
+  if (!context->getProperty(Topic).has_value()) {
+    logger_->log_error("PublishMQTT: could not get Topic");
+  }
+
   if (const auto retain_opt = context->getProperty<bool>(Retain)) {
     retain_ = *retain_opt;
   }
   logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
 
-  AbstractMQTTProcessor::onSchedule(context, factory);
+  if (const auto message_expiry_interval = context->getProperty<core::TimePeriodValue>(MessageExpiryInterval)) {
+    message_expiry_interval_ = std::chrono::duration_cast<std::chrono::seconds>(message_expiry_interval->getMilliseconds());
+    logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s", int64_t{message_expiry_interval_->count()});
+  }
+
+  in_flight_message_counter_.setMqttVersion(mqtt_version_);
+  in_flight_message_counter_.setQoS(qos_);
 }
 
-void PublishMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession> &session) {
-  // reconnect if needed
-  reconnect();
+void PublishMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession> &session) {
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
 
-  if (!MQTTAsync_isConnected(client_)) {
-    logger_->log_error("Could not publish to MQTT broker because disconnected to %s", uri_);
-    yield();
+  if (!flow_file) {
     return;
   }
 
-  std::shared_ptr<core::FlowFile> flowFile = session->get();
+  // broker's Receive Maximum can change after reconnect
+  in_flight_message_counter_.setMax(broker_receive_maximum_.value_or(MQTT_MAX_RECEIVE_MAXIMUM));
 
-  if (!flowFile) {
-    return;
+  const auto topic = getTopic(context, flow_file);
+  try {
+    const auto result = session->readBuffer(flow_file);
+    if (result.status < 0 || !sendMessage(result.buffer, topic, getContentType(context, flow_file), flow_file)) {
+      logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on broker %s", flow_file->getUUIDStr(), topic, uri_);
+      session->transfer(flow_file, Failure);
+      return;
+    }
+    logger_->log_debug("Sent flow file [%s] with length %" PRId64 " to MQTT topic '%s' on broker %s", flow_file->getUUIDStr(), result.status, topic, uri_);
+    session->transfer(flow_file, Success);
+  } catch (const Exception& ex) {
+    logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on broker %s, exception string: '%s'", flow_file->getUUIDStr(), topic, uri_, ex.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+bool PublishMQTT::sendMessage(const std::vector<std::byte>& buffer, const std::string& topic, const std::string& content_type, const std::shared_ptr<core::FlowFile>& flow_file) {
+  if (buffer.size() > 268'435'455) {

Review Comment:
   Can you give this constant a name which explains where it comes from, please?  Also, `0x0FFF'FFFF` would be better than `268'435'455`.
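
   Something along these lines, as a rough sketch only. The constant and helper names are illustrative and not taken from the PR; the value itself follows from MQTT encoding Remaining Length as a Variable Byte Integer of at most four bytes with 7 data bits each. The check mirrors the PR in comparing only the payload size against the limit.

```cpp
#include <cstddef>

// Largest value a 4-byte MQTT Variable Byte Integer can hold: 4 * 7 = 28 data bits.
// Hypothetical name, chosen here to say where the number comes from.
inline constexpr std::size_t MQTT_MAX_REMAINING_LENGTH = 0x0FFF'FFFF;

// Compile-time checks that the hex form equals the decimal literal used in the diff.
static_assert(MQTT_MAX_REMAINING_LENGTH == 268'435'455);
static_assert(MQTT_MAX_REMAINING_LENGTH == (std::size_t{1} << 28) - 1);

// Hypothetical helper: true if a payload of this size is over the protocol limit.
constexpr bool exceedsMqttPacketLimit(std::size_t payload_size) {
  return payload_size > MQTT_MAX_REMAINING_LENGTH;
}
```

   With that in place, the condition in `sendMessage` could read `if (exceedsMqttPacketLimit(buffer.size()))`, and the log message could print the named constant instead of repeating the literal.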



##########
extensions/mqtt/processors/PublishMQTT.cpp:
##########
@@ -34,76 +34,229 @@ void PublishMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void PublishMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void PublishMQTT::readProperties(const std::shared_ptr<core::ProcessContext>& context) {
+  if (!context->getProperty(Topic).has_value()) {
+    logger_->log_error("PublishMQTT: could not get Topic");
+  }
+
   if (const auto retain_opt = context->getProperty<bool>(Retain)) {
     retain_ = *retain_opt;
   }
   logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
 
-  AbstractMQTTProcessor::onSchedule(context, factory);
+  if (const auto message_expiry_interval = context->getProperty<core::TimePeriodValue>(MessageExpiryInterval)) {
+    message_expiry_interval_ = std::chrono::duration_cast<std::chrono::seconds>(message_expiry_interval->getMilliseconds());
+    logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s", int64_t{message_expiry_interval_->count()});
+  }
+
+  in_flight_message_counter_.setMqttVersion(mqtt_version_);
+  in_flight_message_counter_.setQoS(qos_);
 }
 
-void PublishMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession> &session) {
-  // reconnect if needed
-  reconnect();
+void PublishMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession> &session) {
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
 
-  if (!MQTTAsync_isConnected(client_)) {
-    logger_->log_error("Could not publish to MQTT broker because disconnected to %s", uri_);
-    yield();
+  if (!flow_file) {
     return;

Review Comment:
   We usually `yield()` in cases like this; why not here?


