You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2019/09/23 18:59:23 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-1034 - MQTT processors doesnt work

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a2bb1b  MINIFICPP-1034 - MQTT processors doesnt work
8a2bb1b is described below

commit 8a2bb1bd96ab24740a993b54a432fdea71cbd663
Author: Arpad Boda <ar...@gmail.com>
AuthorDate: Thu Sep 19 16:26:56 2019 +0000

    MINIFICPP-1034 - MQTT processors doesnt work
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    Approved by dbakai on GH
    
    This closes #650
---
 .../mqtt/controllerservice/MQTTControllerService.h |  8 ++--
 .../mqtt/processors/AbstractMQTTProcessor.cpp      | 45 ++++++++--------------
 extensions/mqtt/processors/AbstractMQTTProcessor.h | 29 +++++---------
 extensions/mqtt/processors/ConsumeMQTT.cpp         | 28 ++++++--------
 extensions/mqtt/processors/ConsumeMQTT.h           | 11 ++++--
 extensions/mqtt/processors/PublishMQTT.cpp         | 35 +++++++----------
 extensions/mqtt/processors/PublishMQTT.h           |  9 +++--
 7 files changed, 68 insertions(+), 97 deletions(-)

diff --git a/extensions/mqtt/controllerservice/MQTTControllerService.h b/extensions/mqtt/controllerservice/MQTTControllerService.h
index 1633a38..cdc4ed8 100644
--- a/extensions/mqtt/controllerservice/MQTTControllerService.h
+++ b/extensions/mqtt/controllerservice/MQTTControllerService.h
@@ -31,16 +31,16 @@
 #include "concurrentqueue.h"
 #include "MQTTClient.h"
 
-#define MQTT_QOS_0 "0"
-#define MQTT_QOS_1 "1"
-#define MQTT_QOS_2 "2"
-
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace controllers {
 
+static constexpr const char* const MQTT_QOS_0 = "0";
+static constexpr const char* const MQTT_QOS_1 = "1";
+static constexpr const char* const MQTT_QOS_2 = "2";
+
 class Message {
  public:
   // empty constructor facilitates moves
diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
index 345c6c5..501746d 100644
--- a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
+++ b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
@@ -46,32 +46,15 @@ core::Property AbstractMQTTProcessor::SecurityCA("Security CA", "File or directo
 core::Property AbstractMQTTProcessor::SecurityCert("Security Cert", "Path to client's public key (PEM) used for authentication", "");
 core::Property AbstractMQTTProcessor::SecurityPrivateKey("Security Private Key", "Path to client's private key (PEM) used for authentication", "");
 core::Property AbstractMQTTProcessor::SecurityPrivateKeyPassWord("Security Pass Phrase", "Private key passphrase", "");
-core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship");
-core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship");
 
-void AbstractMQTTProcessor::initialize() {
-  // Set the supported properties
-  std::set<core::Property> properties;
-  properties.insert(BrokerURL);
-  properties.insert(CleanSession);
-  properties.insert(ClientID);
-  properties.insert(UserName);
-  properties.insert(PassWord);
-  properties.insert(KeepLiveInterval);
-  properties.insert(ConnectionTimeOut);
-  properties.insert(QOS);
-  properties.insert(Topic);
-  setSupportedProperties(properties);
-  // Set the supported relationships
-  std::set<core::Relationship> relationships;
-  relationships.insert(Success);
-  relationships.insert(Failure);
-  setSupportedRelationships(relationships);
-  MQTTClient_SSLOptions sslopts_ = MQTTClient_SSLOptions_initializer;
-  sslEnabled_ = false;
+const std::set<core::Property> AbstractMQTTProcessor::getSupportedProperties() {
+  return {BrokerURL, CleanSession, ClientID, UserName, PassWord, KeepLiveInterval, ConnectionTimeOut, QOS, Topic};
 }
 
-void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+void AbstractMQTTProcessor::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+  sslEnabled_ = false;
+  sslopts_ = MQTTClient_SSLOptions_initializer;
+
   std::string value;
   int64_t valInt;
   value = "";
@@ -131,7 +114,6 @@ void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::Proc
   if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) {
     if (value == MQTT_SECURITY_PROTOCOL_SSL) {
       sslEnabled_ = true;
-      logger_->log_debug("AbstractMQTTProcessor: ssl enable");
       value = "";
       if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) {
         logger_->log_debug("AbstractMQTTProcessor: trustStore [%s]", value);
@@ -180,14 +162,21 @@ bool AbstractMQTTProcessor::reconnect() {
     conn_opts.username = userName_.c_str();
     conn_opts.password = passWord_.c_str();
   }
-  if (sslEnabled_)
+  if (sslEnabled_) {
     conn_opts.ssl = &sslopts_;
-  if (MQTTClient_connect(client_, &conn_opts) != MQTTCLIENT_SUCCESS) {
-    logger_->log_error("Failed to connect to MQTT broker %s", uri_);
+  }
+  int ret = MQTTClient_connect(client_, &conn_opts);
+  if (ret != MQTTCLIENT_SUCCESS) {
+    logger_->log_error("Failed to connect to MQTT broker %s (%d)", uri_, ret);
     return false;
   }
   if (isSubscriber_) {
-    MQTTClient_subscribe(client_, topic_.c_str(), qos_);
+    ret = MQTTClient_subscribe(client_, topic_.c_str(), qos_);
+    if(ret != MQTTCLIENT_SUCCESS) {
+      logger_->log_error("Failed to subscribe to MQTT topic %s (%d)", topic_, ret);
+      return false;
+    }
+    logger_->log_debug("Successfully subscribed to MQTT topic: %s", topic_);
   }
   return true;
 }
diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.h b/extensions/mqtt/processors/AbstractMQTTProcessor.h
index cabdae4..03c118b 100644
--- a/extensions/mqtt/processors/AbstractMQTTProcessor.h
+++ b/extensions/mqtt/processors/AbstractMQTTProcessor.h
@@ -28,18 +28,18 @@
 #include "core/logging/LoggerConfiguration.h"
 #include "MQTTClient.h"
 
-#define MQTT_QOS_0 "0"
-#define MQTT_QOS_1 "1"
-#define MQTT_QOS_2 "2"
-
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace processors {
 
-#define MQTT_SECURITY_PROTOCOL_PLAINTEXT "plaintext"
-#define MQTT_SECURITY_PROTOCOL_SSL "ssl"
+static constexpr const char* const MQTT_QOS_0 = "0";
+static constexpr const char* const MQTT_QOS_1 = "1";
+static constexpr const char* const MQTT_QOS_2 = "2";
+
+static constexpr const char* const MQTT_SECURITY_PROTOCOL_PLAINTEXT = "plaintext";
+static constexpr const char* const MQTT_SECURITY_PROTOCOL_SSL = "ssl";
 
 // AbstractMQTTProcessor Class
 class AbstractMQTTProcessor : public core::Processor {
@@ -87,10 +87,6 @@ class AbstractMQTTProcessor : public core::Processor {
   static core::Property SecurityPrivateKey;
   static core::Property SecurityPrivateKeyPassWord;
 
-  // Supported Relationships
-  static core::Relationship Failure;
-  static core::Relationship Success;
-
  public:
   /**
    * Function that's executed when the processor is scheduled.
@@ -98,15 +94,8 @@ class AbstractMQTTProcessor : public core::Processor {
    * @param sessionFactory process session factory that is used when creating
    * ProcessSession objects.
    */
-  virtual void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
-  // OnTrigger method, implemented by NiFi AbstractMQTTProcessor
-  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
-  }
-  // OnTrigger method, implemented by NiFi AbstractMQTTProcessor
-  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  }
-  // Initialize, over write by NiFi AbstractMQTTProcessor
-  virtual void initialize(void);
+  virtual void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override;
+
   // MQTT async callbacks
   static void msgDelivered(void *context, MQTTClient_deliveryToken dt) {
     AbstractMQTTProcessor *processor = (AbstractMQTTProcessor *) context;
@@ -134,6 +123,8 @@ class AbstractMQTTProcessor : public core::Processor {
   }
 
  protected:
+  static const std::set<core::Property> getSupportedProperties();
+
   MQTTClient client_;
   MQTTClient_deliveryToken delivered_token_;
   std::string uri_;
diff --git a/extensions/mqtt/processors/ConsumeMQTT.cpp b/extensions/mqtt/processors/ConsumeMQTT.cpp
index 472d35f..3c8f37b 100644
--- a/extensions/mqtt/processors/ConsumeMQTT.cpp
+++ b/extensions/mqtt/processors/ConsumeMQTT.cpp
@@ -38,30 +38,21 @@ namespace processors {
 core::Property ConsumeMQTT::MaxFlowSegSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", "");
 core::Property ConsumeMQTT::QueueBufferMaxMessage("Queue Max Message", "Maximum number of messages allowed on the received MQTT queue", "");
 
+core::Relationship ConsumeMQTT::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship");
+
 void ConsumeMQTT::initialize() {
   // Set the supported properties
-  std::set<core::Property> properties;
-  properties.insert(BrokerURL);
-  properties.insert(CleanSession);
-  properties.insert(ClientID);
-  properties.insert(UserName);
-  properties.insert(PassWord);
-  properties.insert(KeepLiveInterval);
-  properties.insert(ConnectionTimeOut);
-  properties.insert(QOS);
-  properties.insert(Topic);
+  std::set<core::Property> properties(AbstractMQTTProcessor::getSupportedProperties());
   properties.insert(MaxFlowSegSize);
   properties.insert(QueueBufferMaxMessage);
   setSupportedProperties(properties);
   // Set the supported relationships
-  std::set<core::Relationship> relationships;
-  relationships.insert(Success);
-  setSupportedRelationships(relationships);
+  setSupportedRelationships({Success});
 }
 
 bool ConsumeMQTT::enqueueReceiveMQTTMsg(MQTTClient_message *message) {
   if (queue_.size_approx() >= maxQueueSize_) {
-    logger_->log_debug("MQTT queue full");
+    logger_->log_warn("MQTT queue full");
     return false;
   } else {
     if (message->payloadlen > maxSegSize_)
@@ -72,8 +63,8 @@ bool ConsumeMQTT::enqueueReceiveMQTTMsg(MQTTClient_message *message) {
   }
 }
 
-void ConsumeMQTT::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
-  AbstractMQTTProcessor::onSchedule(context, sessionFactory);
+void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+  AbstractMQTTProcessor::onSchedule(context, factory);
   std::string value;
   int64_t valInt;
   value = "";
@@ -90,7 +81,10 @@ void ConsumeMQTT::onSchedule(core::ProcessContext *context, core::ProcessSession
 
 void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
   // reconnect if necessary
-  reconnect();
+  if(!reconnect()) {
+    yield();
+  }
+
   std::deque<MQTTClient_message *> msg_queue;
   getReceivedMQTTMsg(msg_queue);
   while (!msg_queue.empty()) {
diff --git a/extensions/mqtt/processors/ConsumeMQTT.h b/extensions/mqtt/processors/ConsumeMQTT.h
index a1ea13d..18c0b33 100644
--- a/extensions/mqtt/processors/ConsumeMQTT.h
+++ b/extensions/mqtt/processors/ConsumeMQTT.h
@@ -68,6 +68,9 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
   // Supported Properties
   static core::Property MaxFlowSegSize;
   static core::Property QueueBufferMaxMessage;
+
+  static core::Relationship Success;
+
   // Nest Callback Class for write stream
   class WriteCallback : public OutputStreamCallback {
    public:
@@ -92,12 +95,12 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
    * @param sessionFactory process session factory that is used when creating
    * ProcessSession objects.
    */
-  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override;
   // OnTrigger method, implemented by NiFi ConsumeMQTT
-  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
   // Initialize, over write by NiFi ConsumeMQTT
-  virtual void initialize(void);
-  virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message);
+  void initialize(void) override;
+  bool enqueueReceiveMQTTMsg(MQTTClient_message *message) override;
 
  protected:
   void getReceivedMQTTMsg(std::deque<MQTTClient_message *> &msg_queue) {
diff --git a/extensions/mqtt/processors/PublishMQTT.cpp b/extensions/mqtt/processors/PublishMQTT.cpp
index 411cc2d..24ba49d 100644
--- a/extensions/mqtt/processors/PublishMQTT.cpp
+++ b/extensions/mqtt/processors/PublishMQTT.cpp
@@ -38,30 +38,21 @@ namespace processors {
 core::Property PublishMQTT::Retain("Retain", "Retain MQTT published record in broker", "false");
 core::Property PublishMQTT::MaxFlowSegSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", "");
 
+core::Relationship PublishMQTT::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship");
+core::Relationship PublishMQTT::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship");
+
 void PublishMQTT::initialize() {
   // Set the supported properties
-  std::set<core::Property> properties;
-  properties.insert(BrokerURL);
-  properties.insert(CleanSession);
-  properties.insert(ClientID);
-  properties.insert(UserName);
-  properties.insert(PassWord);
-  properties.insert(KeepLiveInterval);
-  properties.insert(ConnectionTimeOut);
-  properties.insert(QOS);
-  properties.insert(Topic);
+  std::set<core::Property> properties(AbstractMQTTProcessor::getSupportedProperties());
   properties.insert(Retain);
   properties.insert(MaxFlowSegSize);
   setSupportedProperties(properties);
   // Set the supported relationships
-  std::set<core::Relationship> relationships;
-  relationships.insert(Success);
-  relationships.insert(Failure);
-  setSupportedRelationships(relationships);
+  setSupportedRelationships({Success, Failure});
 }
 
-void PublishMQTT::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
-  AbstractMQTTProcessor::onSchedule(context, sessionFactory);
+void PublishMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+  AbstractMQTTProcessor::onSchedule(context, factory);
   std::string value;
   int64_t valInt;
   value = "";
@@ -76,15 +67,15 @@ void PublishMQTT::onSchedule(core::ProcessContext *context, core::ProcessSession
 }
 
 void PublishMQTT::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  std::shared_ptr<core::FlowFile> flowFile = session->get();
-
-  if (!flowFile) {
+  if (!reconnect()) {
+    logger_->log_error("MQTT connect to %s failed", uri_);
+    yield();
     return;
   }
+	
+  std::shared_ptr<core::FlowFile> flowFile = session->get();
 
-  if (!reconnect()) {
-    logger_->log_error("MQTT connect to %s failed", uri_);
-    session->transfer(flowFile, Failure);
+  if (!flowFile) {
     return;
   }
 
diff --git a/extensions/mqtt/processors/PublishMQTT.h b/extensions/mqtt/processors/PublishMQTT.h
index f6c01ab..6d6c834 100644
--- a/extensions/mqtt/processors/PublishMQTT.h
+++ b/extensions/mqtt/processors/PublishMQTT.h
@@ -58,6 +58,9 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
   static core::Property Retain;
   static core::Property MaxFlowSegSize;
 
+  static core::Relationship Failure;
+  static core::Relationship Success;
+
   // Nest Callback Class for read stream
   class ReadCallback : public InputStreamCallback {
    public:
@@ -123,11 +126,11 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
    * @param sessionFactory process session factory that is used when creating
    * ProcessSession objects.
    */
-  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override;
   // OnTrigger method, implemented by NiFi PublishMQTT
-  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
   // Initialize, over write by NiFi PublishMQTT
-  virtual void initialize(void);
+  void initialize(void) override;
 
  protected: