Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/11/24 16:24:33 UTC

[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

lordgamez commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1031637892


##########
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##########
@@ -137,19 +148,45 @@ void AbstractMQTTProcessor::onSchedule(const std::shared_ptr<core::ProcessContex
 
 void AbstractMQTTProcessor::reconnect() {
   if (!client_) {
-    logger_->log_error("MQTT client is not existing while trying to reconnect");
-    return;
+    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT client is not existing while trying to reconnect");
   }
   if (MQTTAsync_isConnected(client_)) {
-    logger_->log_info("Already connected to %s, no need to reconnect", uri_);
+    logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
     return;
   }
-  MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
+
+  MQTTAsync_connectOptions conn_opts;
+  MQTTProperties connect_props = MQTTProperties_initializer;
+  MQTTProperties will_props = MQTTProperties_initializer;
+
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+    conn_opts = MQTTAsync_connectOptions_initializer5;
+    conn_opts.onSuccess5 = connectionSuccess5;
+    conn_opts.onFailure5 = connectionFailure5;
+    conn_opts.connectProperties = &connect_props;
+  } else {
+    conn_opts = MQTTAsync_connectOptions_initializer;
+    conn_opts.onSuccess = connectionSuccess;
+    conn_opts.onFailure = connectionFailure;
+  }
+
+  if (mqtt_version_.value() == MqttVersions::V_3_1_0) {
+    conn_opts.MQTTVersion = MQTTVERSION_3_1;
+  } else if (mqtt_version_.value() == MqttVersions::V_3_1_1) {
+    conn_opts.MQTTVersion = MQTTVERSION_3_1_1;
+  }
+
   conn_opts.keepAliveInterval = gsl::narrow<int>(keep_alive_interval_.count());
-  conn_opts.cleansession = getCleanSession();
-  conn_opts.context = this;
-  conn_opts.onSuccess = connectionSuccess;
-  conn_opts.onFailure = connectionFailure;
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+    setMqtt5ConnectOptions(conn_opts, connect_props, will_props);

Review Comment:
   I think we could refactor this to have a single `setConnectOptions` function that calls either `setMqtt5ConnectOptions` or `setMqtt3ConnectOptions` depending on the version, and move all `conn_opts` operations there. That would make the version-specific option settings easier to read.
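
   A minimal sketch of the shape this refactor could take. `ConnectOptions`, `ConnectOptionsBuilder` and the `MqttVersion` enum below are hypothetical stand-ins so the example compiles on its own; the PR itself works with Paho's `MQTTAsync_connectOptions` and the `MqttVersions` smart enum, and the values set here are placeholders.

```cpp
#include <cassert>

// Hypothetical stand-ins for illustration only; not the types used in the PR.
enum class MqttVersion { V_3X_AUTO, V_3_1_0, V_3_1_1, V_5_0 };

struct ConnectOptions {
  int keep_alive_interval_s = 0;
  bool clean_session = false;      // only meaningful for MQTT 3.x
  bool clean_start = false;        // only meaningful for MQTT 5
  bool uses_v5_callbacks = false;  // stands in for onSuccess5/onFailure5
};

class ConnectOptionsBuilder {
 public:
  ConnectOptionsBuilder(MqttVersion version, int keep_alive_interval_s)
      : version_(version), keep_alive_interval_s_(keep_alive_interval_s) {
    assert(keep_alive_interval_s >= 0);
  }

  // Single entry point: shared settings first, then the version-specific ones.
  ConnectOptions setConnectOptions() const {
    ConnectOptions opts;
    opts.keep_alive_interval_s = keep_alive_interval_s_;
    if (version_ == MqttVersion::V_5_0) {
      setMqtt5ConnectOptions(opts);
    } else {
      setMqtt3ConnectOptions(opts);
    }
    return opts;
  }

 private:
  // Everything that applies only to MQTT 3.x lives here.
  void setMqtt3ConnectOptions(ConnectOptions& opts) const {
    opts.clean_session = true;
    opts.uses_v5_callbacks = false;
  }

  // Everything that applies only to MQTT 5 lives here.
  void setMqtt5ConnectOptions(ConnectOptions& opts) const {
    opts.clean_start = true;
    opts.uses_v5_callbacks = true;
  }

  MqttVersion version_;
  int keep_alive_interval_s_;
};
```

   With this layout, `reconnect()` would only call `setConnectOptions()` and pass the result on, so a reader looking for MQTT 5 behavior has one function to check.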



##########
PROCESSORS.md:
##########
@@ -337,28 +337,34 @@ This Processor gets the contents of a FlowFile from a MQTT broker for a specifie
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name                  | Default Value | Allowable Values | Description                                                                                                                 |
-|-----------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------|
-| **Broker URI**        |               |                  | The URI to use to connect to the MQTT broker                                                                                |
-| **Topic**             |               |                  | The topic to subscribe to                                                                                                   |
-| Client ID             |               |                  | MQTT client ID to use                                                                                                       |
-| Quality of Service    | 0             |                  | The Quality of Service (QoS) to receive the message with. Accepts three values '0', '1' and '2'                             |
-| Connection Timeout    | 30 sec        |                  | Maximum time interval the client will wait for the network connection to the MQTT broker                                    |
-| Keep Alive Interval   | 60 sec        |                  | Defines the maximum time interval between messages being sent to the broker                                                 |
-| Max Flow Segment Size |               |                  | Maximum flow content payload segment size for the MQTT record                                                               |
-| Last Will Topic       |               |                  | The topic to send the client's Last Will to. If the Last Will topic is not set then a Last Will will not be sent            |
-| Last Will Message     |               |                  | The message to send as the client's Last Will. If the Last Will Message is empty, Last Will will be deleted from the broker |
-| Last Will QoS         | 0             |                  | The Quality of Service (QoS) to send the last will with. Accepts three values '0', '1' and '2'                              |
-| Last Will Retain      | false         |                  | Whether to retain the client's Last Will                                                                                    |
-| Security Protocol     |               |                  | Protocol used to communicate with brokers                                                                                   |
-| Security CA           |               |                  | File or directory path to CA certificate(s) for verifying the broker's key                                                  |
-| Security Cert         |               |                  | Path to client's public key (PEM) used for authentication                                                                   |
-| Security Private Key  |               |                  | Path to client's private key (PEM) used for authentication                                                                  |
-| Security Pass Phrase  |               |                  | Private key passphrase                                                                                                      |
-| Username              |               |                  | Username to use when connecting to the broker                                                                               |
-| Password              |               |                  | Password to use when connecting to the broker                                                                               |
-| Clean Session         | true          |                  | Whether to start afresh rather than remembering previous subscriptions                                                      |
-| Queue Max Message     | 1000          |                  | Maximum number of messages allowed on the received MQTT queue                                                               |
+| Name                        | Default Value | Allowable Values            | Description                                                                                                                                                 |
+|-----------------------------|---------------|-----------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Broker URI**              |               |                             | The URI to use to connect to the MQTT broker                                                                                                                |
+| Client ID                   |               |                             | MQTT client ID to use. WARNING: Must not be empty when using MQTT 3.1.0!                                                                                    |
+| MQTT Version                | 3.x AUTO      | 3.x AUTO, 3.1.0, 3.1.1, 5.0 | The MQTT specification version when connecting to the broker.                                                                                               |
+| **Topic**                   |               |                             | The topic to subscribe to.                                                                                                                                  |
+| Clean Session               | true          |                             | Whether to start afresh rather than remembering previous subscriptions. Also make broker remember subscriptions after disconnected. WARNING: MQTT 3.x only. |
+| Clean Start                 | true          |                             | Whether to start afresh rather than remembering previous subscriptions. WARNING: MQTT 5.x only.                                                             |

Review Comment:
   Could these two properties be merged into one that is applied appropriately for each MQTT version?



##########
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##########
@@ -163,23 +200,259 @@ void AbstractMQTTProcessor::reconnect() {
   }
 
   logger_->log_info("Reconnecting to %s", uri_);
-  int ret = MQTTAsync_connect(client_, &conn_opts);
+  if (MQTTAsync_isConnected(client_)) {
+    logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+    return;
+  }
+  const int ret = MQTTAsync_connect(client_, &conn_opts);
+  MQTTProperties_free(&connect_props);
   if (ret != MQTTASYNC_SUCCESS) {
-    logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, ret);
+    logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error code [%d]", uri_, ret);
+    return;
   }
+
+  // wait until connection succeeds or fails
+  connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const {
+  conn_opts.cleanstart = getCleanStart();
+
+  {
+    MQTTProperty property;
+    property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
+    property.value.integer4 = gsl::narrow<int>(getSessionExpiryInterval().count());
+    MQTTProperties_add(&connect_props, &property);
+  }
+
+  if (!last_will_content_type_.empty()) {
+    MQTTProperty property;
+    property.identifier = MQTTPROPERTY_CODE_CONTENT_TYPE;
+    property.value.data.len = last_will_content_type_.length();
+    property.value.data.data = const_cast<char*>(last_will_content_type_.data());
+    MQTTProperties_add(&will_props, &property);
+  }
+
+  conn_opts.willProperties = &will_props;
+
+  setMqtt5ConnectOptionsImpl(connect_props);
+}
+
+void AbstractMQTTProcessor::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  // read lock

Review Comment:
   I don't think we can be sure that this is always a read lock, as that depends on the `onTriggerImpl` implementations in the subclasses. We can remove the comment.



##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> message) {
-  if (queue_.size_approx() >= maxQueueSize_) {
-    logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+  if (queue_.size_approx() >= max_queue_size_) {
+    logger_->log_error("MQTT queue full");
     return;
   }
 
-  if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) {
-    logger_->log_debug("MQTT message was truncated while enqueuing, original length: %d", message->payloadlen);
-    message->payloadlen = gsl::narrow<int>(max_seg_size_);
-  }
-
-  logger_->log_debug("enqueuing MQTT message with length %d", message->payloadlen);
+  logger_->log_debug("enqueuing MQTT message with length %d", message.contents->payloadlen);
   queue_.enqueue(std::move(message));
 }
 
-void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void ConsumeMQTT::readProperties(const std::shared_ptr<core::ProcessContext>& context) {
+  if (auto value = context->getProperty(Topic)) {
+    topic_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
   if (const auto value = context->getProperty<bool>(CleanSession)) {
-    cleanSession_ = *value;
-    logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+    clean_session_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+  if (const auto value = context->getProperty<bool>(CleanStart)) {
+    clean_start_ = *value;
+  }
+  logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+  if (const auto session_expiry_interval = context->getProperty<core::TimePeriodValue>(SessionExpiryInterval)) {
+    session_expiry_interval_ = std::chrono::duration_cast<std::chrono::seconds>(session_expiry_interval->getMilliseconds());
+  }
+  logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", int64_t{session_expiry_interval_.count()});
 
   if (const auto value = context->getProperty<uint64_t>(QueueBufferMaxMessage)) {
-    maxQueueSize_ = *value;
-    logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", maxQueueSize_);
+    max_queue_size_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", max_queue_size_);
 
-  // this connects to broker, so properties of this processor must be read before
-  AbstractMQTTProcessor::onSchedule(context, factory);
-}
+  if (auto value = context->getProperty(AttributeFromContentType)) {
+    attribute_from_content_type_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", attribute_from_content_type_);
 
-void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession> &session) {
-  // reconnect if needed
-  reconnect();
+  if (const auto topic_alias_maximum = context->getProperty<uint32_t>(TopicAliasMaximum)) {
+    topic_alias_maximum_ = gsl::narrow<uint16_t>(*topic_alias_maximum);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", topic_alias_maximum_);
 
-  if (!MQTTAsync_isConnected(client_)) {
-    logger_->log_error("Could not consume from MQTT broker because disconnected to %s", uri_);
-    yield();
-    return;
+  if (const auto receive_maximum = context->getProperty<uint32_t>(ReceiveMaximum)) {
+    receive_maximum_ = gsl::narrow<uint16_t>(*receive_maximum);
   }
+  logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", receive_maximum_);
+}
 
-  std::deque<std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter>> msg_queue;
-  getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession>& session) {
+  std::queue<SmartMessage> msg_queue = getReceivedMqttMessages();
   while (!msg_queue.empty()) {
     const auto& message = msg_queue.front();
-    std::shared_ptr<core::FlowFile> processFlowFile = session->create();
-    int write_status{};
-    session->write(processFlowFile, [&message, &write_status](const std::shared_ptr<io::OutputStream>& stream) -> int64_t {
-      if (message->payloadlen < 0) {
-        write_status = -1;
-        return -1;
-      }
-      const auto len = stream->write(reinterpret_cast<uint8_t*>(message->payload), gsl::narrow<size_t>(message->payloadlen));
-      if (io::isError(len)) {
-        write_status = -1;
-        return -1;
-      }
-      return gsl::narrow<int64_t>(len);
-    });
-    if (write_status < 0) {
-      logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", processFlowFile->getUUIDStr());
-      session->remove(processFlowFile);
+    std::shared_ptr<core::FlowFile> flow_file = session->create();
+    WriteCallback write_callback(message, logger_);
+    try {
+      session->write(flow_file, write_callback);
+    } catch (const Exception& ex) {
+      logger_->log_error("Error when processing message queue: %s", ex.what());
+    }
+    if (!write_callback.getSuccessStatus()) {
+      logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", flow_file->getUUIDStr());
+      session->remove(flow_file);
     } else {
-      session->putAttribute(processFlowFile, MQTT_BROKER_ATTRIBUTE, uri_);
-      session->putAttribute(processFlowFile, MQTT_TOPIC_ATTRIBUTE, topic_);
-      logger_->log_debug("ConsumeMQTT processing success for the flow with UUID %s topic %s", processFlowFile->getUUIDStr(), topic_);
-      session->transfer(processFlowFile, Success);
+      putUserPropertiesAsAttributes(message, flow_file, session);
+      session->putAttribute(flow_file, MQTT_BROKER_ATTRIBUTE, uri_);
+      session->putAttribute(flow_file, MQTT_TOPIC_ATTRIBUTE, message.topic);
+      fillAttributeFromContentType(message, flow_file, session);
+      logger_->log_debug("ConsumeMQTT processing success for the flow with UUID %s topic %s", flow_file->getUUIDStr(), message.topic);
+      session->transfer(flow_file, Success);
     }
-    msg_queue.pop_front();
+    msg_queue.pop();
+  }
+}
+
+std::queue<ConsumeMQTT::SmartMessage> ConsumeMQTT::getReceivedMqttMessages() {
+  std::queue<SmartMessage> msg_queue;
+  SmartMessage message;
+  while (queue_.try_dequeue(message)) {
+    msg_queue.push(std::move(message));
+  }
+  return msg_queue;
+}
+
+int64_t ConsumeMQTT::WriteCallback::operator() (const std::shared_ptr<io::OutputStream>& stream) {
+  if (message_.contents->payloadlen < 0) {
+    success_status_ = false;
+    logger_->log_error("Payload length of message is negative, value is [%d]", message_.contents->payloadlen);
+    return -1;
+  }
+
+  const auto len = stream->write(reinterpret_cast<uint8_t*>(message_.contents->payload), gsl::narrow<size_t>(message_.contents->payloadlen));
+  if (io::isError(len)) {
+    success_status_ = false;
+    logger_->log_error("Stream writing error when processing message");
+    return -1;
+  }
+
+  return len;
+}
+
+void ConsumeMQTT::putUserPropertiesAsAttributes(const SmartMessage& message, const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const {
+  if (mqtt_version_.value() != MqttVersions::V_5_0) {

Review Comment:
   I think we rely too much on `mqtt_version_`: many of the function implementations are if-else branches on this version. A cleaner solution would be to collect the version-dependent functionality into a separate abstract class with two derived implementations, one for each of the two versions. Once we have the version in `onSchedule`, we can instantiate the proper one and call it wherever the behavior differs (strategy pattern). If needed, the common parts can also be moved to the base class, with only the differences implemented in the derived classes (template method pattern).
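
   A rough sketch of the strategy approach, to make the suggestion concrete. All names here (`MqttVersionStrategy`, `Mqtt3Strategy`, `Mqtt5Strategy`, `makeVersionStrategy`, and the simplified `Message` and `Attributes` types) are hypothetical and only illustrate the structure; they are not taken from the PR.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>

// Hypothetical simplified types; the PR uses SmartMessage and core::FlowFile.
enum class MqttVersion { V_3X_AUTO, V_3_1_0, V_3_1_1, V_5_0 };
using Attributes = std::map<std::string, std::string>;

struct Message {
  std::map<std::string, std::string> user_properties;  // MQTT 5 only
};

// Abstract interface collecting the behavior that differs between versions.
class MqttVersionStrategy {
 public:
  virtual ~MqttVersionStrategy() = default;
  virtual void putUserPropertiesAsAttributes(const Message& message, Attributes& attributes) const = 0;
};

// MQTT 3.x has no user properties, so there is nothing to copy.
class Mqtt3Strategy final : public MqttVersionStrategy {
 public:
  void putUserPropertiesAsAttributes(const Message&, Attributes&) const override {}
};

// MQTT 5 copies every user property into the attributes.
class Mqtt5Strategy final : public MqttVersionStrategy {
 public:
  void putUserPropertiesAsAttributes(const Message& message, Attributes& attributes) const override {
    for (const auto& [key, value] : message.user_properties) {
      attributes[key] = value;
    }
  }
};

// Would be called once, e.g. from onSchedule, after the version is known.
std::unique_ptr<MqttVersionStrategy> makeVersionStrategy(MqttVersion version) {
  if (version == MqttVersion::V_5_0) {
    return std::make_unique<Mqtt5Strategy>();
  }
  return std::make_unique<Mqtt3Strategy>();
}
```

   The processor would then hold the returned pointer as a member and call through it, so a check like `mqtt_version_.value() != MqttVersions::V_5_0` would no longer be needed inside each function.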



##########
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##########
@@ -163,23 +200,259 @@ void AbstractMQTTProcessor::reconnect() {
   }
 
   logger_->log_info("Reconnecting to %s", uri_);
-  int ret = MQTTAsync_connect(client_, &conn_opts);
+  if (MQTTAsync_isConnected(client_)) {
+    logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+    return;
+  }
+  const int ret = MQTTAsync_connect(client_, &conn_opts);
+  MQTTProperties_free(&connect_props);
   if (ret != MQTTASYNC_SUCCESS) {
-    logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, ret);
+    logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error code [%d]", uri_, ret);
+    return;
   }
+
+  // wait until connection succeeds or fails
+  connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const {
+  conn_opts.cleanstart = getCleanStart();
+
+  {
+    MQTTProperty property;
+    property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
+    property.value.integer4 = gsl::narrow<int>(getSessionExpiryInterval().count());
+    MQTTProperties_add(&connect_props, &property);
+  }
+
+  if (!last_will_content_type_.empty()) {
+    MQTTProperty property;
+    property.identifier = MQTTPROPERTY_CODE_CONTENT_TYPE;
+    property.value.data.len = last_will_content_type_.length();
+    property.value.data.data = const_cast<char*>(last_will_content_type_.data());
+    MQTTProperties_add(&will_props, &property);
+  }
+
+  conn_opts.willProperties = &will_props;
+
+  setMqtt5ConnectOptionsImpl(connect_props);
+}
+
+void AbstractMQTTProcessor::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  // read lock
+  std::shared_lock client_lock{client_mutex_};
+  if (client_ == nullptr) {
+    // we are shutting down
+    return;
+  }
+
+  // reconnect if needed

Review Comment:
   I'm not sure this comment adds value.



##########
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##########
@@ -163,23 +200,259 @@ void AbstractMQTTProcessor::reconnect() {
   }
 
   logger_->log_info("Reconnecting to %s", uri_);
-  int ret = MQTTAsync_connect(client_, &conn_opts);
+  if (MQTTAsync_isConnected(client_)) {
+    logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+    return;
+  }
+  const int ret = MQTTAsync_connect(client_, &conn_opts);
+  MQTTProperties_free(&connect_props);
   if (ret != MQTTASYNC_SUCCESS) {
-    logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, ret);
+    logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error code [%d]", uri_, ret);
+    return;
   }
+
+  // wait until connection succeeds or fails
+  connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const {
+  conn_opts.cleanstart = getCleanStart();
+
+  {
+    MQTTProperty property;
+    property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
+    property.value.integer4 = gsl::narrow<int>(getSessionExpiryInterval().count());
+    MQTTProperties_add(&connect_props, &property);
+  }
+
+  if (!last_will_content_type_.empty()) {
+    MQTTProperty property;
+    property.identifier = MQTTPROPERTY_CODE_CONTENT_TYPE;
+    property.value.data.len = last_will_content_type_.length();
+    property.value.data.data = const_cast<char*>(last_will_content_type_.data());
+    MQTTProperties_add(&will_props, &property);
+  }
+
+  conn_opts.willProperties = &will_props;
+
+  setMqtt5ConnectOptionsImpl(connect_props);
+}
+
+void AbstractMQTTProcessor::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  // read lock
+  std::shared_lock client_lock{client_mutex_};
+  if (client_ == nullptr) {
+    // we are shutting down

Review Comment:
   Can we add a trace/debug log instead of this comment?



##########
extensions/mqtt/processors/PublishMQTT.h:
##########
@@ -62,72 +68,116 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
 
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
-  class ReadCallback {
+  void readProperties(const std::shared_ptr<core::ProcessContext>& context) override;
+  void onTriggerImpl(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override;
+  void initialize() override;
+
+ private:
+  /**
+   * Counts unacknowledged QoS 1 and QoS 2 messages to respect broker's Receive Maximum
+   */
+  class InFlightMessageCounter {
    public:
-    ReadCallback(PublishMQTT* processor, uint64_t flow_size, uint64_t max_seg_size, std::string topic, MQTTAsync client, int qos, bool retain)
-        : processor_(processor),
-          flow_size_(flow_size),
-          max_seg_size_(max_seg_size),
-          topic_(std::move(topic)),
-          client_(client),
-          qos_(qos),
-          retain_(retain) {
+    void setMqttVersion(const MqttVersions mqtt_version) {
+      mqtt_version_ = mqtt_version;
+    }
+
+    void setQoS(const MqttQoS qos) {
+      qos_ = qos;
+    }
+
+    void setMax(const uint16_t new_limit) {
+      limit_ = new_limit;
     }
 
-    int64_t operator()(const std::shared_ptr<io::InputStream>& stream);
+    // increase on sending, wait if limit is reached
+    void increase();
 
-    size_t read_size_ = 0;
-    int status_ = 0;
+    // decrease on success or failure, notify
+    void decrease();
 
    private:
-    PublishMQTT* processor_;
-    uint64_t flow_size_;
-    uint64_t max_seg_size_;
-    std::string topic_;
-    MQTTAsync client_;
-
-    int qos_;
-    bool retain_;
+    std::mutex mutex_;
+    std::condition_variable cv_;
+    uint16_t counter_{0};
+    uint16_t limit_{MQTT_MAX_RECEIVE_MAXIMUM};
+    MqttVersions mqtt_version_;
+    MqttQoS qos_;
   };
 
-  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override;
-  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
-  void initialize() override;
+  // MQTT static async callbacks, calling their notify with context being pointer to a packaged_task to notify()
+  static void sendSuccess(void* context, MQTTAsync_successData* response);
+  static void sendSuccess5(void* context, MQTTAsync_successData5* response);
+  static void sendFailure(void* context, MQTTAsync_failureData* response);
+  static void sendFailure5(void* context, MQTTAsync_failureData5* response);
+
+  /**
+   * Resolves topic from expression language
+   */
+  std::string getTopic(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) const;
+
+  /**
+   * Resolves content type from expression language
+   */
+  std::string getContentType(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) const;
+
+  /**
+   * Sends an MQTT message asynchronously
+   * @param buffer contents of the message
+   * @param topic topic of the message
+   * @param content_type Content Type for MQTT 5
+   * @param flow_file Flow File being processed
+   * @return success of message sending
+   */
+  bool sendMessage(const std::vector<std::byte>& buffer, const std::string& topic, const std::string& content_type, const std::shared_ptr<core::FlowFile>& flow_file);
+
+  /**
+   * Callback for asynchronous message sending
+   * @param success if message sending was successful
+   * @param response_code response code for failure only
+   * @param reason_code MQTT 5 reason code
+   * @return if message sending was successful
+   */
+  bool notify(bool success, std::optional<int> response_code, std::optional<MQTTReasonCodes> reason_code);
+
+  /**
+   * Set MQTT 5-exclusive properties
+   * @param message message object
+   * @param content_type content type
+   * @param flow_file Flow File being processed
+   */
+  void setMqtt5Properties(MQTTAsync_message& message, const std::string& content_type, const std::shared_ptr<core::FlowFile>& flow_file) const;
+
+  /**
+   * Adds flow file attributes as user properties to an MQTT 5 message
+   * @param message message object
+   * @param flow_file Flow File being processed
+   */
+  static void addAttributesAsUserProperties(MQTTAsync_message& message, const std::shared_ptr<core::FlowFile>& flow_file);
 
- private:
-  // MQTT async callback
-  static void sendSuccess(void* context, MQTTAsync_successData* response) {
-    auto* processor = reinterpret_cast<PublishMQTT*>(context);
-    processor->onSendSuccess(response);
-  }
-
-  // MQTT async callback
-  static void sendFailure(void* context, MQTTAsync_failureData* response) {
-    auto* processor = reinterpret_cast<PublishMQTT*>(context);
-    processor->onSendFailure(response);
+  bool getCleanSession() const override {
+    return true;
   }
 
-  void onSendSuccess(MQTTAsync_successData* /*response*/) {
-    logger_->log_debug("Successfully sent message to MQTT topic %s on broker %s", topic_, uri_);
+  bool getCleanStart() const override {
+    return true;
   }
 
-  void onSendFailure(MQTTAsync_failureData* response) {
-    logger_->log_error("Sending message failed on topic %s to MQTT broker %s (%d)", topic_, uri_, response->code);
-    if (response->message != nullptr) {
-      logger_->log_error("Detailed reason for sending failure: %s", response->message);
-    }
+  std::chrono::seconds getSessionExpiryInterval() const override {
+    // non-persistent session as we only publish
+    return std::chrono::seconds{0};
   }
 
-  bool getCleanSession() const override {
-    return true;
+  void startupClient() override {
+    // there is no need to do anything like subscribe in the beginning
   }
 
-  bool startupClient() override {
-    // there is no need to do anything like subscribe on the beginning
-    return true;
-  }
+  void checkProperties() override;
+  void checkBrokerLimitsImpl() override;
 
   bool retain_ = false;
+  std::optional<std::chrono::seconds> message_expiry_interval_;
+  InFlightMessageCounter in_flight_message_counter_;

Review Comment:
   This may be a good candidate for a processor-specific metric.
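
   As an illustration of what could be exposed, here is a simplified sketch of such a counter with a read accessor. It is an assumption-laden reduction of the class declared in the diff: the MQTT version and QoS checks are omitted, the limit is passed to a constructor instead of `setMax`, and `inFlight()` is a hypothetical accessor that a metric could read.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Hypothetical simplification; not the implementation from the PR.
class InFlightMessageCounter {
 public:
  explicit InFlightMessageCounter(std::uint16_t limit) : limit_(limit) {}

  // Called before sending: blocks while the limit is reached.
  void increase() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return counter_ < limit_; });
    ++counter_;
  }

  // Called on send success or failure: frees one slot and wakes one waiter.
  void decrease() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      assert(counter_ > 0);
      --counter_;
    }
    cv_.notify_one();
  }

  // Hypothetical accessor: the current number of unacknowledged messages.
  std::uint16_t inFlight() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return counter_;
  }

 private:
  mutable std::mutex mutex_;
  std::condition_variable cv_;
  std::uint16_t counter_{0};
  std::uint16_t limit_;
};
```

   Note that in this sketch a limit of 0 would make `increase()` block forever, so a real implementation would need to rule that value out.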



##########
extensions/mqtt/processors/AbstractMQTTProcessor.h:
##########
@@ -46,150 +44,178 @@ class AbstractMQTTProcessor : public core::Processor {
     freeResources();
   }
 
+  SMART_ENUM(MqttVersions,
+    (V_3X_AUTO, "3.x AUTO"),
+    (V_3_1_0, "3.1.0"),
+    (V_3_1_1, "3.1.1"),
+    (V_5_0, "5.0"));
+
+  SMART_ENUM(MqttQoS,
+    (LEVEL_0, "0"),
+    (LEVEL_1, "1"),
+    (LEVEL_2, "2"));
+
   EXTENSIONAPI static const core::Property BrokerURI;
   EXTENSIONAPI static const core::Property ClientID;
+  EXTENSIONAPI static const core::Property QoS;
+  EXTENSIONAPI static const core::Property MqttVersion;
+  EXTENSIONAPI static const core::Property ConnectionTimeout;
+  EXTENSIONAPI static const core::Property KeepAliveInterval;
+  EXTENSIONAPI static const core::Property LastWillTopic;
+  EXTENSIONAPI static const core::Property LastWillMessage;
+  EXTENSIONAPI static const core::Property LastWillQoS;
+  EXTENSIONAPI static const core::Property LastWillRetain;
+  EXTENSIONAPI static const core::Property LastWillContentType;
   EXTENSIONAPI static const core::Property Username;
   EXTENSIONAPI static const core::Property Password;
-  EXTENSIONAPI static const core::Property KeepAliveInterval;
-  EXTENSIONAPI static const core::Property MaxFlowSegSize;
-  EXTENSIONAPI static const core::Property ConnectionTimeout;
-  EXTENSIONAPI static const core::Property Topic;
-  EXTENSIONAPI static const core::Property QoS;
   EXTENSIONAPI static const core::Property SecurityProtocol;
   EXTENSIONAPI static const core::Property SecurityCA;
   EXTENSIONAPI static const core::Property SecurityCert;
   EXTENSIONAPI static const core::Property SecurityPrivateKey;
   EXTENSIONAPI static const core::Property SecurityPrivateKeyPassword;
-  EXTENSIONAPI static const core::Property LastWillTopic;
-  EXTENSIONAPI static const core::Property LastWillMessage;
-  EXTENSIONAPI static const core::Property LastWillQoS;
-  EXTENSIONAPI static const core::Property LastWillRetain;
 
-  EXTENSIONAPI static auto properties() {
+
+  static auto basicProperties() {
+    return std::array{
+      BrokerURI,
+      ClientID,
+      MqttVersion
+    };
+  }
+
+  static auto advancedProperties() {
     return std::array{
-            BrokerURI,
-            Topic,
-            ClientID,
-            QoS,
-            ConnectionTimeout,
-            KeepAliveInterval,
-            MaxFlowSegSize,
-            LastWillTopic,
-            LastWillMessage,
-            LastWillQoS,
-            LastWillRetain,
-            Username,
-            Password,
-            SecurityProtocol,
-            SecurityCA,
-            SecurityCert,
-            SecurityPrivateKey,
-            SecurityPrivateKeyPassword
+      QoS,
+      ConnectionTimeout,
+      KeepAliveInterval,
+      LastWillTopic,
+      LastWillMessage,
+      LastWillQoS,
+      LastWillRetain,
+      LastWillContentType,
+      Username,
+      Password,
+      SecurityProtocol,
+      SecurityCA,
+      SecurityCert,
+      SecurityPrivateKey,
+      SecurityPrivateKeyPassword
     };
   }
 
   void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& factory) override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override;
 
   void notifyStop() override {
     freeResources();
   }
 
  protected:
+  struct MQTTMessageDeleter {
+    void operator()(MQTTAsync_message* message) {
+      MQTTAsync_freeMessage(&message);
+    }
+  };
+
+  struct SmartMessage {
+    std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> contents;
+    std::string topic;
+  };
+
+  // defined by Paho MQTT C library
+  static constexpr int PAHO_MQTT_C_FAILURE_CODE = -9999999;
+  static constexpr int MQTT_MAX_RECEIVE_MAXIMUM = 65535;
+
+  /**
+   * Connect to MQTT broker. Synchronously waits until connection succeeds or fails.
+   */
   void reconnect();
 
+  /**
+   * Checks property consistency before connecting to broker
+   */
+  virtual void checkProperties() {
+  }
+
+  /**
+   * Checks broker limits and supported features vs our desired features after connecting to broker
+   */
+  void checkBrokerLimits();
+  virtual void checkBrokerLimitsImpl() = 0;
+
+  // variables being used for a synchronous connection and disconnection
+  std::shared_mutex client_mutex_;
+
   MQTTAsync client_ = nullptr;
   std::string uri_;
-  std::string topic_;
   std::chrono::seconds keep_alive_interval_{60};
-  uint64_t max_seg_size_ = std::numeric_limits<uint64_t>::max();
-  std::chrono::seconds connection_timeout_{30};
-  uint32_t qos_ = MQTT_QOS_1;
+  std::chrono::seconds connection_timeout_{10};
+  MqttQoS qos_{MqttQoS::LEVEL_0};
   std::string clientID_;
   std::string username_;
   std::string password_;
+  MqttVersions mqtt_version_{MqttVersions::V_3X_AUTO};
 
- private:
-  // MQTT async callback
-  static int msgReceived(void *context, char* topic_name, int topic_len, MQTTAsync_message* message) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onMessageReceived(topic_name, topic_len, message);
-    return 1;
-  }
-
-  // MQTT async callback
-  static void connectionLost(void *context, char* cause) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onConnectionLost(cause);
-  }
-
-  // MQTT async callback
-  static void connectionSuccess(void* context, MQTTAsync_successData* response) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onConnectionSuccess(response);
-  }
+  // Supported operations
+  std::optional<bool> retain_available_;
+  std::optional<bool> wildcard_subscription_available_;
+  std::optional<bool> shared_subscription_available_;
 
-  // MQTT async callback
-  static void connectionFailure(void* context, MQTTAsync_failureData* response) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onConnectionFailure(response);
-  }
+  std::optional<uint16_t> broker_topic_alias_maximum_;
+  std::optional<uint16_t> broker_receive_maximum_;
+  std::optional<uint8_t> maximum_qos_;
+  std::optional<uint32_t> maximum_packet_size_;
 
-  // MQTT async callback
-  static void disconnectionSuccess(void* context, MQTTAsync_successData* response) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onDisconnectionSuccess(response);
-  }
-
-  // MQTT async callback
-  static void disconnectionFailure(void* context, MQTTAsync_failureData* response) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onDisconnectionFailure(response);
-  }
+  std::optional<std::chrono::seconds> maximum_session_expiry_interval_;
+  std::optional<std::chrono::seconds> server_keep_alive_;
 
-  virtual void onMessageReceived(char* topic_name, int /*topic_len*/, MQTTAsync_message* message) {
-    MQTTAsync_freeMessage(&message);
-    MQTTAsync_free(topic_name);
-  }
-
-  void onConnectionLost(char* cause) {
-    logger_->log_error("Connection lost to MQTT broker %s", uri_);
-    if (cause != nullptr) {
-      logger_->log_error("Cause for connection loss: %s", cause);
-    }
-  }
-
-  void onConnectionSuccess(MQTTAsync_successData* /*response*/) {
-    logger_->log_info("Successfully connected to MQTT broker %s", uri_);
-    startupClient();
-  }
+ private:
+  /**
+   * Initializes local MQTT client and connects to broker.
+   */
+  void initializeClient();
 
-  void onConnectionFailure(MQTTAsync_failureData* response) {
-    logger_->log_error("Connection failed to MQTT broker %s (%d)", uri_, response->code);
-    if (response->message != nullptr) {
-      logger_->log_error("Detailed reason for connection failure: %s", response->message);
-    }
-  }
+  /**
+   * Calls disconnect() and releases local MQTT client
+   */
+  void freeResources();
 
-  void onDisconnectionSuccess(MQTTAsync_successData* /*response*/) {
-    logger_->log_info("Successfully disconnected from MQTT broker %s", uri_);
-  }
+  /**
+   * Disconnect from MQTT broker. Synchronously waits until disconnection succeeds or fails.
+   */
+  void disconnect();
+
+  virtual void readProperties(const std::shared_ptr<core::ProcessContext>& context) = 0;
+  virtual void onTriggerImpl(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) = 0;
+  virtual void startupClient() = 0;
+  void setBrokerLimits(MQTTAsync_successData5* response);
+
+  // MQTT static async callbacks, calling their non-static counterparts with context being pointer to "this"
+  static void connectionLost(void *context, char* cause);
+  static void connectionSuccess(void* context, MQTTAsync_successData* response);
+  static void connectionSuccess5(void* context, MQTTAsync_successData5* response);
+  static void connectionFailure(void* context, MQTTAsync_failureData* response);
+  static void connectionFailure5(void* context, MQTTAsync_failureData5* response);
+  static int msgReceived(void *context, char* topic_name, int topic_len, MQTTAsync_message* message);
+
+  // MQTT async callback methods
+  void onConnectionLost(char* cause);
+  void onConnectFinished(MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5);
+  void onDisconnectFinished(MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5);
 
-  void onDisconnectionFailure(MQTTAsync_failureData* response) {
-    logger_->log_error("Disconnection failed from MQTT broker %s (%d)", uri_, response->code);
-    if (response->message != nullptr) {
-      logger_->log_error("Detailed reason for disconnection failure: %s", response->message);
-    }
+  /**
+   * Called if message is received. This is default implementation, to be overridden if subclass wants to use the message.
+   * @param topic topic of message
+   * @param message MQTT message
+   */
+  virtual void onMessageReceived(SmartMessage /*smartmessage*/) {
   }
 
   virtual bool getCleanSession() const = 0;
-  virtual bool startupClient() = 0;
-
-  void freeResources();
-
-  /**
-   * Checks property consistency before connecting to broker
-   */
-  virtual void checkProperties() {
+  virtual bool getCleanStart() const = 0;
+  virtual std::chrono::seconds getSessionExpiryInterval() const = 0;
+  void setMqtt5ConnectOptions(MQTTAsync_connectOptions& conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const;
+  virtual void setMqtt5ConnectOptionsImpl(MQTTProperties& /*connect_props*/) const {

Review Comment:
   Could `setMqtt5ConnectOptionsImpl` be removed and `setMqtt5ConnectOptions` made virtual instead? Subclasses could then override it where needed, calling the base class's implementation first and then adding their own options. The same could work for `onTrigger` and `onTriggerImpl` as well.
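
A minimal sketch of the suggested shape, to make the idea concrete. It is not the PR's code: `ConnectProps` is a hypothetical stand-in for Paho's `MQTTProperties`, the class names and the option strings are placeholders, and the real method takes more parameters than shown here.

```cpp
#include <string>
#include <vector>

// Hypothetical stand-in for MQTTProperties, used only for illustration.
struct ConnectProps {
  std::vector<std::string> entries;
};

class BaseProcessor {
 public:
  virtual ~BaseProcessor() = default;

  // Virtual itself, so no separate ...Impl hook is needed.
  virtual void setMqtt5ConnectOptions(ConnectProps& props) const {
    props.entries.push_back("session-expiry-interval");  // placeholder for the shared options
  }
};

class DerivedProcessor : public BaseProcessor {
 public:
  void setMqtt5ConnectOptions(ConnectProps& props) const override {
    BaseProcessor::setMqtt5ConnectOptions(props);  // shared options first
    props.entries.push_back("receive-maximum");    // then the subclass-specific option
  }
};

// Hypothetical helper: collects the options a processor would set.
std::vector<std::string> collectConnectOptions(const BaseProcessor& processor) {
  ConnectProps props;
  processor.setMqtt5ConnectOptions(props);
  return props.entries;
}
```

One trade-off to keep in mind: with the current `...Impl` split the base class always runs its own part, whereas with a plain virtual override each subclass has to remember to call the base implementation.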


