You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2019/09/23 18:59:23 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-1034 - MQTT
processors doesnt work
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 8a2bb1b MINIFICPP-1034 - MQTT processors doesnt work
8a2bb1b is described below
commit 8a2bb1bd96ab24740a993b54a432fdea71cbd663
Author: Arpad Boda <ar...@gmail.com>
AuthorDate: Thu Sep 19 16:26:56 2019 +0000
MINIFICPP-1034 - MQTT processors doesnt work
Signed-off-by: Arpad Boda <ab...@apache.org>
Approved by dbakai on GH
This closes #650
---
.../mqtt/controllerservice/MQTTControllerService.h | 8 ++--
.../mqtt/processors/AbstractMQTTProcessor.cpp | 45 ++++++++--------------
extensions/mqtt/processors/AbstractMQTTProcessor.h | 29 +++++---------
extensions/mqtt/processors/ConsumeMQTT.cpp | 28 ++++++--------
extensions/mqtt/processors/ConsumeMQTT.h | 11 ++++--
extensions/mqtt/processors/PublishMQTT.cpp | 35 +++++++----------
extensions/mqtt/processors/PublishMQTT.h | 9 +++--
7 files changed, 68 insertions(+), 97 deletions(-)
diff --git a/extensions/mqtt/controllerservice/MQTTControllerService.h b/extensions/mqtt/controllerservice/MQTTControllerService.h
index 1633a38..cdc4ed8 100644
--- a/extensions/mqtt/controllerservice/MQTTControllerService.h
+++ b/extensions/mqtt/controllerservice/MQTTControllerService.h
@@ -31,16 +31,16 @@
#include "concurrentqueue.h"
#include "MQTTClient.h"
-#define MQTT_QOS_0 "0"
-#define MQTT_QOS_1 "1"
-#define MQTT_QOS_2 "2"
-
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace controllers {
+static constexpr const char* const MQTT_QOS_0 = "0";
+static constexpr const char* const MQTT_QOS_1 = "1";
+static constexpr const char* const MQTT_QOS_2 = "2";
+
class Message {
public:
// empty constructor facilitates moves
diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
index 345c6c5..501746d 100644
--- a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
+++ b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
@@ -46,32 +46,15 @@ core::Property AbstractMQTTProcessor::SecurityCA("Security CA", "File or directo
core::Property AbstractMQTTProcessor::SecurityCert("Security Cert", "Path to client's public key (PEM) used for authentication", "");
core::Property AbstractMQTTProcessor::SecurityPrivateKey("Security Private Key", "Path to client's private key (PEM) used for authentication", "");
core::Property AbstractMQTTProcessor::SecurityPrivateKeyPassWord("Security Pass Phrase", "Private key passphrase", "");
-core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship");
-core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship");
-void AbstractMQTTProcessor::initialize() {
- // Set the supported properties
- std::set<core::Property> properties;
- properties.insert(BrokerURL);
- properties.insert(CleanSession);
- properties.insert(ClientID);
- properties.insert(UserName);
- properties.insert(PassWord);
- properties.insert(KeepLiveInterval);
- properties.insert(ConnectionTimeOut);
- properties.insert(QOS);
- properties.insert(Topic);
- setSupportedProperties(properties);
- // Set the supported relationships
- std::set<core::Relationship> relationships;
- relationships.insert(Success);
- relationships.insert(Failure);
- setSupportedRelationships(relationships);
- MQTTClient_SSLOptions sslopts_ = MQTTClient_SSLOptions_initializer;
- sslEnabled_ = false;
+const std::set<core::Property> AbstractMQTTProcessor::getSupportedProperties() {
+ return {BrokerURL, CleanSession, ClientID, UserName, PassWord, KeepLiveInterval, ConnectionTimeOut, QOS, Topic};
}
-void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+void AbstractMQTTProcessor::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+ sslEnabled_ = false;
+ sslopts_ = MQTTClient_SSLOptions_initializer;
+
std::string value;
int64_t valInt;
value = "";
@@ -131,7 +114,6 @@ void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::Proc
if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) {
if (value == MQTT_SECURITY_PROTOCOL_SSL) {
sslEnabled_ = true;
- logger_->log_debug("AbstractMQTTProcessor: ssl enable");
value = "";
if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) {
logger_->log_debug("AbstractMQTTProcessor: trustStore [%s]", value);
@@ -180,14 +162,21 @@ bool AbstractMQTTProcessor::reconnect() {
conn_opts.username = userName_.c_str();
conn_opts.password = passWord_.c_str();
}
- if (sslEnabled_)
+ if (sslEnabled_) {
conn_opts.ssl = &sslopts_;
- if (MQTTClient_connect(client_, &conn_opts) != MQTTCLIENT_SUCCESS) {
- logger_->log_error("Failed to connect to MQTT broker %s", uri_);
+ }
+ int ret = MQTTClient_connect(client_, &conn_opts);
+ if (ret != MQTTCLIENT_SUCCESS) {
+ logger_->log_error("Failed to connect to MQTT broker %s (%d)", uri_, ret);
return false;
}
if (isSubscriber_) {
- MQTTClient_subscribe(client_, topic_.c_str(), qos_);
+ ret = MQTTClient_subscribe(client_, topic_.c_str(), qos_);
+ if(ret != MQTTCLIENT_SUCCESS) {
+ logger_->log_error("Failed to subscribe to MQTT topic %s (%d)", topic_, ret);
+ return false;
+ }
+ logger_->log_debug("Successfully subscribed to MQTT topic: %s", topic_);
}
return true;
}
diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.h b/extensions/mqtt/processors/AbstractMQTTProcessor.h
index cabdae4..03c118b 100644
--- a/extensions/mqtt/processors/AbstractMQTTProcessor.h
+++ b/extensions/mqtt/processors/AbstractMQTTProcessor.h
@@ -28,18 +28,18 @@
#include "core/logging/LoggerConfiguration.h"
#include "MQTTClient.h"
-#define MQTT_QOS_0 "0"
-#define MQTT_QOS_1 "1"
-#define MQTT_QOS_2 "2"
-
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
-#define MQTT_SECURITY_PROTOCOL_PLAINTEXT "plaintext"
-#define MQTT_SECURITY_PROTOCOL_SSL "ssl"
+static constexpr const char* const MQTT_QOS_0 = "0";
+static constexpr const char* const MQTT_QOS_1 = "1";
+static constexpr const char* const MQTT_QOS_2 = "2";
+
+static constexpr const char* const MQTT_SECURITY_PROTOCOL_PLAINTEXT = "plaintext";
+static constexpr const char* const MQTT_SECURITY_PROTOCOL_SSL = "ssl";
// AbstractMQTTProcessor Class
class AbstractMQTTProcessor : public core::Processor {
@@ -87,10 +87,6 @@ class AbstractMQTTProcessor : public core::Processor {
static core::Property SecurityPrivateKey;
static core::Property SecurityPrivateKeyPassWord;
- // Supported Relationships
- static core::Relationship Failure;
- static core::Relationship Success;
-
public:
/**
* Function that's executed when the processor is scheduled.
@@ -98,15 +94,8 @@ class AbstractMQTTProcessor : public core::Processor {
* @param sessionFactory process session factory that is used when creating
* ProcessSession objects.
*/
- virtual void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
- // OnTrigger method, implemented by NiFi AbstractMQTTProcessor
- virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
- }
- // OnTrigger method, implemented by NiFi AbstractMQTTProcessor
- virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
- }
- // Initialize, over write by NiFi AbstractMQTTProcessor
- virtual void initialize(void);
+ virtual void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override;
+
// MQTT async callbacks
static void msgDelivered(void *context, MQTTClient_deliveryToken dt) {
AbstractMQTTProcessor *processor = (AbstractMQTTProcessor *) context;
@@ -134,6 +123,8 @@ class AbstractMQTTProcessor : public core::Processor {
}
protected:
+ static const std::set<core::Property> getSupportedProperties();
+
MQTTClient client_;
MQTTClient_deliveryToken delivered_token_;
std::string uri_;
diff --git a/extensions/mqtt/processors/ConsumeMQTT.cpp b/extensions/mqtt/processors/ConsumeMQTT.cpp
index 472d35f..3c8f37b 100644
--- a/extensions/mqtt/processors/ConsumeMQTT.cpp
+++ b/extensions/mqtt/processors/ConsumeMQTT.cpp
@@ -38,30 +38,21 @@ namespace processors {
core::Property ConsumeMQTT::MaxFlowSegSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", "");
core::Property ConsumeMQTT::QueueBufferMaxMessage("Queue Max Message", "Maximum number of messages allowed on the received MQTT queue", "");
+core::Relationship ConsumeMQTT::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship");
+
void ConsumeMQTT::initialize() {
// Set the supported properties
- std::set<core::Property> properties;
- properties.insert(BrokerURL);
- properties.insert(CleanSession);
- properties.insert(ClientID);
- properties.insert(UserName);
- properties.insert(PassWord);
- properties.insert(KeepLiveInterval);
- properties.insert(ConnectionTimeOut);
- properties.insert(QOS);
- properties.insert(Topic);
+ std::set<core::Property> properties(AbstractMQTTProcessor::getSupportedProperties());
properties.insert(MaxFlowSegSize);
properties.insert(QueueBufferMaxMessage);
setSupportedProperties(properties);
// Set the supported relationships
- std::set<core::Relationship> relationships;
- relationships.insert(Success);
- setSupportedRelationships(relationships);
+ setSupportedRelationships({Success});
}
bool ConsumeMQTT::enqueueReceiveMQTTMsg(MQTTClient_message *message) {
if (queue_.size_approx() >= maxQueueSize_) {
- logger_->log_debug("MQTT queue full");
+ logger_->log_warn("MQTT queue full");
return false;
} else {
if (message->payloadlen > maxSegSize_)
@@ -72,8 +63,8 @@ bool ConsumeMQTT::enqueueReceiveMQTTMsg(MQTTClient_message *message) {
}
}
-void ConsumeMQTT::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
- AbstractMQTTProcessor::onSchedule(context, sessionFactory);
+void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+ AbstractMQTTProcessor::onSchedule(context, factory);
std::string value;
int64_t valInt;
value = "";
@@ -90,7 +81,10 @@ void ConsumeMQTT::onSchedule(core::ProcessContext *context, core::ProcessSession
void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
// reconnect if necessary
- reconnect();
+ if(!reconnect()) {
+ yield();
+ }
+
std::deque<MQTTClient_message *> msg_queue;
getReceivedMQTTMsg(msg_queue);
while (!msg_queue.empty()) {
diff --git a/extensions/mqtt/processors/ConsumeMQTT.h b/extensions/mqtt/processors/ConsumeMQTT.h
index a1ea13d..18c0b33 100644
--- a/extensions/mqtt/processors/ConsumeMQTT.h
+++ b/extensions/mqtt/processors/ConsumeMQTT.h
@@ -68,6 +68,9 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
// Supported Properties
static core::Property MaxFlowSegSize;
static core::Property QueueBufferMaxMessage;
+
+ static core::Relationship Success;
+
// Nest Callback Class for write stream
class WriteCallback : public OutputStreamCallback {
public:
@@ -92,12 +95,12 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
* @param sessionFactory process session factory that is used when creating
* ProcessSession objects.
*/
- void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+ void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override;
// OnTrigger method, implemented by NiFi ConsumeMQTT
- virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
+ void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
// Initialize, over write by NiFi ConsumeMQTT
- virtual void initialize(void);
- virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message);
+ void initialize(void) override;
+ bool enqueueReceiveMQTTMsg(MQTTClient_message *message) override;
protected:
void getReceivedMQTTMsg(std::deque<MQTTClient_message *> &msg_queue) {
diff --git a/extensions/mqtt/processors/PublishMQTT.cpp b/extensions/mqtt/processors/PublishMQTT.cpp
index 411cc2d..24ba49d 100644
--- a/extensions/mqtt/processors/PublishMQTT.cpp
+++ b/extensions/mqtt/processors/PublishMQTT.cpp
@@ -38,30 +38,21 @@ namespace processors {
core::Property PublishMQTT::Retain("Retain", "Retain MQTT published record in broker", "false");
core::Property PublishMQTT::MaxFlowSegSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", "");
+core::Relationship PublishMQTT::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship");
+core::Relationship PublishMQTT::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship");
+
void PublishMQTT::initialize() {
// Set the supported properties
- std::set<core::Property> properties;
- properties.insert(BrokerURL);
- properties.insert(CleanSession);
- properties.insert(ClientID);
- properties.insert(UserName);
- properties.insert(PassWord);
- properties.insert(KeepLiveInterval);
- properties.insert(ConnectionTimeOut);
- properties.insert(QOS);
- properties.insert(Topic);
+ std::set<core::Property> properties(AbstractMQTTProcessor::getSupportedProperties());
properties.insert(Retain);
properties.insert(MaxFlowSegSize);
setSupportedProperties(properties);
// Set the supported relationships
- std::set<core::Relationship> relationships;
- relationships.insert(Success);
- relationships.insert(Failure);
- setSupportedRelationships(relationships);
+ setSupportedRelationships({Success, Failure});
}
-void PublishMQTT::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
- AbstractMQTTProcessor::onSchedule(context, sessionFactory);
+void PublishMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+ AbstractMQTTProcessor::onSchedule(context, factory);
std::string value;
int64_t valInt;
value = "";
@@ -76,15 +67,15 @@ void PublishMQTT::onSchedule(core::ProcessContext *context, core::ProcessSession
}
void PublishMQTT::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
- std::shared_ptr<core::FlowFile> flowFile = session->get();
-
- if (!flowFile) {
+ if (!reconnect()) {
+ logger_->log_error("MQTT connect to %s failed", uri_);
+ yield();
return;
}
+
+ std::shared_ptr<core::FlowFile> flowFile = session->get();
- if (!reconnect()) {
- logger_->log_error("MQTT connect to %s failed", uri_);
- session->transfer(flowFile, Failure);
+ if (!flowFile) {
return;
}
diff --git a/extensions/mqtt/processors/PublishMQTT.h b/extensions/mqtt/processors/PublishMQTT.h
index f6c01ab..6d6c834 100644
--- a/extensions/mqtt/processors/PublishMQTT.h
+++ b/extensions/mqtt/processors/PublishMQTT.h
@@ -58,6 +58,9 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
static core::Property Retain;
static core::Property MaxFlowSegSize;
+ static core::Relationship Failure;
+ static core::Relationship Success;
+
// Nest Callback Class for read stream
class ReadCallback : public InputStreamCallback {
public:
@@ -123,11 +126,11 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
* @param sessionFactory process session factory that is used when creating
* ProcessSession objects.
*/
- void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+ void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override;
// OnTrigger method, implemented by NiFi PublishMQTT
- virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
+ void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
// Initialize, over write by NiFi PublishMQTT
- virtual void initialize(void);
+ void initialize(void) override;
protected: