You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/12/08 13:24:49 UTC

[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

lordgamez commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1043349754


##########
extensions/mqtt/processors/AbstractMQTTProcessor.h:
##########
@@ -46,150 +44,178 @@ class AbstractMQTTProcessor : public core::Processor {
     freeResources();
   }
 
+  SMART_ENUM(MqttVersions,
+    (V_3X_AUTO, "3.x AUTO"),
+    (V_3_1_0, "3.1.0"),
+    (V_3_1_1, "3.1.1"),
+    (V_5_0, "5.0"));
+
+  SMART_ENUM(MqttQoS,
+    (LEVEL_0, "0"),
+    (LEVEL_1, "1"),
+    (LEVEL_2, "2"));
+
   EXTENSIONAPI static const core::Property BrokerURI;
   EXTENSIONAPI static const core::Property ClientID;
+  EXTENSIONAPI static const core::Property QoS;
+  EXTENSIONAPI static const core::Property MqttVersion;
+  EXTENSIONAPI static const core::Property ConnectionTimeout;
+  EXTENSIONAPI static const core::Property KeepAliveInterval;
+  EXTENSIONAPI static const core::Property LastWillTopic;
+  EXTENSIONAPI static const core::Property LastWillMessage;
+  EXTENSIONAPI static const core::Property LastWillQoS;
+  EXTENSIONAPI static const core::Property LastWillRetain;
+  EXTENSIONAPI static const core::Property LastWillContentType;
   EXTENSIONAPI static const core::Property Username;
   EXTENSIONAPI static const core::Property Password;
-  EXTENSIONAPI static const core::Property KeepAliveInterval;
-  EXTENSIONAPI static const core::Property MaxFlowSegSize;
-  EXTENSIONAPI static const core::Property ConnectionTimeout;
-  EXTENSIONAPI static const core::Property Topic;
-  EXTENSIONAPI static const core::Property QoS;
   EXTENSIONAPI static const core::Property SecurityProtocol;
   EXTENSIONAPI static const core::Property SecurityCA;
   EXTENSIONAPI static const core::Property SecurityCert;
   EXTENSIONAPI static const core::Property SecurityPrivateKey;
   EXTENSIONAPI static const core::Property SecurityPrivateKeyPassword;
-  EXTENSIONAPI static const core::Property LastWillTopic;
-  EXTENSIONAPI static const core::Property LastWillMessage;
-  EXTENSIONAPI static const core::Property LastWillQoS;
-  EXTENSIONAPI static const core::Property LastWillRetain;
 
-  EXTENSIONAPI static auto properties() {
+
+  static auto basicProperties() {
+    return std::array{
+      BrokerURI,
+      ClientID,
+      MqttVersion
+    };
+  }
+
+  static auto advancedProperties() {
     return std::array{
-            BrokerURI,
-            Topic,
-            ClientID,
-            QoS,
-            ConnectionTimeout,
-            KeepAliveInterval,
-            MaxFlowSegSize,
-            LastWillTopic,
-            LastWillMessage,
-            LastWillQoS,
-            LastWillRetain,
-            Username,
-            Password,
-            SecurityProtocol,
-            SecurityCA,
-            SecurityCert,
-            SecurityPrivateKey,
-            SecurityPrivateKeyPassword
+      QoS,
+      ConnectionTimeout,
+      KeepAliveInterval,
+      LastWillTopic,
+      LastWillMessage,
+      LastWillQoS,
+      LastWillRetain,
+      LastWillContentType,
+      Username,
+      Password,
+      SecurityProtocol,
+      SecurityCA,
+      SecurityCert,
+      SecurityPrivateKey,
+      SecurityPrivateKeyPassword
     };
   }
 
   void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& factory) override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override;
 
   void notifyStop() override {
     freeResources();
   }
 
  protected:
+  struct MQTTMessageDeleter {
+    void operator()(MQTTAsync_message* message) {
+      MQTTAsync_freeMessage(&message);
+    }
+  };
+
+  struct SmartMessage {
+    std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> contents;
+    std::string topic;
+  };
+
+  // defined by Paho MQTT C library
+  static constexpr int PAHO_MQTT_C_FAILURE_CODE = -9999999;
+  static constexpr int MQTT_MAX_RECEIVE_MAXIMUM = 65535;
+
+  /**
+   * Connect to MQTT broker. Synchronously waits until connection succeeds or fails.
+   */
   void reconnect();
 
+  /**
+   * Checks property consistency before connecting to broker
+   */
+  virtual void checkProperties() {
+  }
+
+  /**
+   * Checks broker limits and supported features vs our desired features after connecting to broker
+   */
+  void checkBrokerLimits();
+  virtual void checkBrokerLimitsImpl() = 0;
+
+  // variables being used for a synchronous connection and disconnection
+  std::shared_mutex client_mutex_;
+
   MQTTAsync client_ = nullptr;
   std::string uri_;
-  std::string topic_;
   std::chrono::seconds keep_alive_interval_{60};
-  uint64_t max_seg_size_ = std::numeric_limits<uint64_t>::max();
-  std::chrono::seconds connection_timeout_{30};
-  uint32_t qos_ = MQTT_QOS_1;
+  std::chrono::seconds connection_timeout_{10};
+  MqttQoS qos_{MqttQoS::LEVEL_0};
   std::string clientID_;
   std::string username_;
   std::string password_;
+  MqttVersions mqtt_version_{MqttVersions::V_3X_AUTO};
 
- private:
-  // MQTT async callback
-  static int msgReceived(void *context, char* topic_name, int topic_len, MQTTAsync_message* message) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onMessageReceived(topic_name, topic_len, message);
-    return 1;
-  }
-
-  // MQTT async callback
-  static void connectionLost(void *context, char* cause) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onConnectionLost(cause);
-  }
-
-  // MQTT async callback
-  static void connectionSuccess(void* context, MQTTAsync_successData* response) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onConnectionSuccess(response);
-  }
+  // Supported operations
+  std::optional<bool> retain_available_;
+  std::optional<bool> wildcard_subscription_available_;
+  std::optional<bool> shared_subscription_available_;
 
-  // MQTT async callback
-  static void connectionFailure(void* context, MQTTAsync_failureData* response) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onConnectionFailure(response);
-  }
+  std::optional<uint16_t> broker_topic_alias_maximum_;
+  std::optional<uint16_t> broker_receive_maximum_;
+  std::optional<uint8_t> maximum_qos_;
+  std::optional<uint32_t> maximum_packet_size_;
 
-  // MQTT async callback
-  static void disconnectionSuccess(void* context, MQTTAsync_successData* response) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onDisconnectionSuccess(response);
-  }
-
-  // MQTT async callback
-  static void disconnectionFailure(void* context, MQTTAsync_failureData* response) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onDisconnectionFailure(response);
-  }
+  std::optional<std::chrono::seconds> maximum_session_expiry_interval_;
+  std::optional<std::chrono::seconds> server_keep_alive_;
 
-  virtual void onMessageReceived(char* topic_name, int /*topic_len*/, MQTTAsync_message* message) {
-    MQTTAsync_freeMessage(&message);
-    MQTTAsync_free(topic_name);
-  }
-
-  void onConnectionLost(char* cause) {
-    logger_->log_error("Connection lost to MQTT broker %s", uri_);
-    if (cause != nullptr) {
-      logger_->log_error("Cause for connection loss: %s", cause);
-    }
-  }
-
-  void onConnectionSuccess(MQTTAsync_successData* /*response*/) {
-    logger_->log_info("Successfully connected to MQTT broker %s", uri_);
-    startupClient();
-  }
+ private:
+  /**
+   * Initializes local MQTT client and connects to broker.
+   */
+  void initializeClient();
 
-  void onConnectionFailure(MQTTAsync_failureData* response) {
-    logger_->log_error("Connection failed to MQTT broker %s (%d)", uri_, response->code);
-    if (response->message != nullptr) {
-      logger_->log_error("Detailed reason for connection failure: %s", response->message);
-    }
-  }
+  /**
+   * Calls disconnect() and releases local MQTT client
+   */
+  void freeResources();
 
-  void onDisconnectionSuccess(MQTTAsync_successData* /*response*/) {
-    logger_->log_info("Successfully disconnected from MQTT broker %s", uri_);
-  }
+  /**
+   * Disconnect from MQTT broker. Synchronously waits until disconnection succeeds or fails.
+   */
+  void disconnect();
+
+  virtual void readProperties(const std::shared_ptr<core::ProcessContext>& context) = 0;
+  virtual void onTriggerImpl(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) = 0;
+  virtual void startupClient() = 0;
+  void setBrokerLimits(MQTTAsync_successData5* response);
+
+  // MQTT static async callbacks, calling their non-static counterparts with context being pointer to "this"
+  static void connectionLost(void *context, char* cause);
+  static void connectionSuccess(void* context, MQTTAsync_successData* response);
+  static void connectionSuccess5(void* context, MQTTAsync_successData5* response);
+  static void connectionFailure(void* context, MQTTAsync_failureData* response);
+  static void connectionFailure5(void* context, MQTTAsync_failureData5* response);
+  static int msgReceived(void *context, char* topic_name, int topic_len, MQTTAsync_message* message);
+
+  // MQTT async callback methods
+  void onConnectionLost(char* cause);
+  void onConnectFinished(MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5);
+  void onDisconnectFinished(MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5);
 
-  void onDisconnectionFailure(MQTTAsync_failureData* response) {
-    logger_->log_error("Disconnection failed from MQTT broker %s (%d)", uri_, response->code);
-    if (response->message != nullptr) {
-      logger_->log_error("Detailed reason for disconnection failure: %s", response->message);
-    }
+  /**
+   * Called if message is received. This is default implementation, to be overridden if subclass wants to use the message.
+   * @param topic topic of message
+   * @param message MQTT message
+   */
+  virtual void onMessageReceived(SmartMessage /*smartmessage*/) {
   }
 
   virtual bool getCleanSession() const = 0;
-  virtual bool startupClient() = 0;
-
-  void freeResources();
-
-  /**
-   * Checks property consistency before connecting to broker
-   */
-  virtual void checkProperties() {
+  virtual bool getCleanStart() const = 0;
+  virtual std::chrono::seconds getSessionExpiryInterval() const = 0;
+  void setMqtt5ConnectOptions(MQTTAsync_connectOptions& conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const;
+  virtual void setMqtt5ConnectOptionsImpl(MQTTProperties& /*connect_props*/) const {

Review Comment:
   True, although maybe it's the naming that doesn't really fit here; perhaps it could be something like `setProcessorSpecificMqtt5ConnectOptions`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org