You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/06/30 13:29:06 UTC

[GitHub] [nifi-minifi-cpp] adam-markovics opened a new pull request, #1363: Add support and tests for advanced MQTT features

adam-markovics opened a new pull request, #1363:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1363

   https://issues.apache.org/jira/browse/MINIFICPP-1680
   
   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically main)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1363: Add support and tests for advanced MQTT features

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on code in PR #1363:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1363#discussion_r921170250


##########
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##########
@@ -15,147 +15,167 @@
  * limitations under the License.
  */
 #include "AbstractMQTTProcessor.h"
-#include <cstdio>
 #include <memory>
 #include <string>
-#include <cinttypes>
-#include <vector>
+#include <utility>
 
-#include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
 
 namespace org::apache::nifi::minifi::processors {
 
 void AbstractMQTTProcessor::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory>& /*factory*/) {
-  sslEnabled_ = false;
-  sslopts_ = MQTTClient_SSLOptions_initializer;
-
-  std::string value;
-  int64_t valInt;
-  value = "";
-  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
-    uri_ = value;
-    logger_->log_debug("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
-  }
-  value = "";
-  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
-    clientID_ = value;
+  if (auto value = context->getProperty(BrokerURI)) {
+    uri_ = std::move(*value);
+    logger_->log_debug("AbstractMQTTProcessor: BrokerURI [%s]", uri_);
+  }
+  if (auto value = context->getProperty(ClientID)) {
+    clientID_ = std::move(*value);
     logger_->log_debug("AbstractMQTTProcessor: ClientID [%s]", clientID_);
   }
-  value = "";
-  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
-    topic_ = value;
+  if (auto value = context->getProperty(Topic)) {
+    topic_ = std::move(*value);
     logger_->log_debug("AbstractMQTTProcessor: Topic [%s]", topic_);
   }
-  value = "";
-  if (context->getProperty(UserName.getName(), value) && !value.empty()) {
-    userName_ = value;
-    logger_->log_debug("AbstractMQTTProcessor: UserName [%s]", userName_);
+  if (auto value = context->getProperty(Username)) {
+    username_ = std::move(*value);
+    logger_->log_debug("AbstractMQTTProcessor: UserName [%s]", username_);
   }
-  value = "";
-  if (context->getProperty(PassWord.getName(), value) && !value.empty()) {
-    passWord_ = value;
-    logger_->log_debug("AbstractMQTTProcessor: PassWord [%s]", passWord_);
+  if (auto value = context->getProperty(Password)) {
+    password_ = std::move(*value);
+    logger_->log_debug("AbstractMQTTProcessor: Password [%s]", password_);
   }
 
-  const auto cleanSession_parsed = [&] () -> std::optional<bool> {
-    std::string property_value;
-    if (!context->getProperty(CleanSession.getName(), property_value)) return std::nullopt;
-    return utils::StringUtils::toBool(property_value);
-  }();
-  if ( cleanSession_parsed ) {
-    cleanSession_ = *cleanSession_parsed;
-    logger_->log_debug("AbstractMQTTProcessor: CleanSession [%d]", cleanSession_);
+  if (const auto keep_alive_interval = context->getProperty<core::TimePeriodValue>(KeepAliveInterval)) {
+    keep_alive_interval_ = std::chrono::duration_cast<std::chrono::seconds>(keep_alive_interval->getMilliseconds());
+    logger_->log_debug("AbstractMQTTProcessor: KeepAliveInterval [%" PRId64 "] s", int64_t{keep_alive_interval_.count()});
   }
 
-  if (auto keep_alive_interval = context->getProperty<core::TimePeriodValue>(KeepLiveInterval)) {
-    keepAliveInterval_ = keep_alive_interval->getMilliseconds();
-    logger_->log_debug("AbstractMQTTProcessor: KeepLiveInterval [%" PRId64 "] ms", int64_t{keepAliveInterval_.count()});
+  if (const auto value = context->getProperty<uint64_t>(MaxFlowSegSize)) {
+    max_seg_size_ = {*value};
+    logger_->log_debug("PublishMQTT: max flow segment size [%" PRIu64 "]", max_seg_size_);
   }
 
-  if (auto connection_timeout = context->getProperty<core::TimePeriodValue>(ConnectionTimeout)) {
-    connectionTimeout_ = connection_timeout->getMilliseconds();
-    logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeout [%" PRId64 "] ms", int64_t{connectionTimeout_.count()});
+  if (const auto connection_timeout = context->getProperty<core::TimePeriodValue>(ConnectionTimeout)) {
+    connection_timeout_ = std::chrono::duration_cast<std::chrono::seconds>(connection_timeout->getMilliseconds());
+    logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeout [%" PRId64 "] s", int64_t{connection_timeout_.count()});
   }
 
-  value = "";
-  if (context->getProperty(QOS.getName(), value) && !value.empty() && (value == MQTT_QOS_0 || value == MQTT_QOS_1 || MQTT_QOS_2) &&
-      core::Property::StringToInt(value, valInt)) {
-    qos_ = valInt;
-    logger_->log_debug("AbstractMQTTProcessor: QOS [%" PRId64 "]", qos_);
+  if (const auto value = context->getProperty<uint32_t>(QoS); value && (*value == MQTT_QOS_0 || *value == MQTT_QOS_1 || *value == MQTT_QOS_2)) {
+    qos_ = {*value};
+    logger_->log_debug("AbstractMQTTProcessor: QoS [%" PRIu32 "]", qos_);
   }
-  value = "";
 
-  if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) {
-    if (value == MQTT_SECURITY_PROTOCOL_SSL) {
-      sslEnabled_ = true;
-      value = "";
-      if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: trustStore [%s]", value);
-        securityCA_ = value;
-        sslopts_.trustStore = securityCA_.c_str();
+  if (const auto security_protocol = context->getProperty(SecurityProtocol)) {
+    if (*security_protocol == MQTT_SECURITY_PROTOCOL_SSL) {
+      sslOpts_ = MQTTAsync_SSLOptions_initializer;
+      if (auto value = context->getProperty(SecurityCA)) {
+        logger_->log_debug("AbstractMQTTProcessor: trustStore [%s]", *value);
+        securityCA_ = std::move(*value);
+        sslOpts_->trustStore = securityCA_.c_str();
       }
-      value = "";
-      if (context->getProperty(SecurityCert.getName(), value) && !value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: keyStore [%s]", value);
-        securityCert_ = value;
-        sslopts_.keyStore = securityCert_.c_str();
+      if (auto value = context->getProperty(SecurityCert)) {
+        logger_->log_debug("AbstractMQTTProcessor: keyStore [%s]", *value);
+        securityCert_ = std::move(*value);
+        sslOpts_->keyStore = securityCert_.c_str();
       }
-      value = "";
-      if (context->getProperty(SecurityPrivateKey.getName(), value) && !value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: privateKey [%s]", value);
-        securityPrivateKey_ = value;
-        sslopts_.privateKey = securityPrivateKey_.c_str();
+      if (auto value = context->getProperty(SecurityPrivateKey)) {
+        logger_->log_debug("AbstractMQTTProcessor: privateKey [%s]", *value);
+        securityPrivateKey_ = std::move(*value);
+        sslOpts_->privateKey = securityPrivateKey_.c_str();
       }
-      value = "";
-      if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) && !value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: privateKeyPassword [%s]", value);
-        securityPrivateKeyPassWord_ = value;
-        sslopts_.privateKeyPassword = securityPrivateKeyPassWord_.c_str();
+      if (auto value = context->getProperty(SecurityPrivateKeyPassword)) {
+        logger_->log_debug("AbstractMQTTProcessor: privateKeyPassword [%s]", *value);
+        securityPrivateKeyPassword_ = std::move(*value);
+        sslOpts_->privateKeyPassword = securityPrivateKeyPassword_.c_str();
       }
     }
   }
+
+  if (auto last_will_topic = context->getProperty(LastWillTopic); last_will_topic.has_value() && !last_will_topic->empty()) {
+    last_will_ = MQTTAsync_willOptions_initializer;
+
+    logger_->log_debug("AbstractMQTTProcessor: Last Will Topic [%s]", *last_will_topic);
+    last_will_topic_ = std::move(*last_will_topic);
+    last_will_->topicName = last_will_topic_.c_str();
+
+    if (auto value = context->getProperty(LastWillMessage)) {
+      logger_->log_debug("AbstractMQTTProcessor: Last Will Message [%s]", *value);
+      last_will_message_ = std::move(*value);
+      last_will_->message = last_will_message_.c_str();
+    }
+
+    if (const auto value = context->getProperty<uint32_t>(LastWillQoS); value && (*value == MQTT_QOS_0 || *value == MQTT_QOS_1 || *value == MQTT_QOS_2)) {
+      logger_->log_debug("AbstractMQTTProcessor: Last Will QoS [%" PRIu32 "]", *value);
+      last_will_qos_ = {*value};
+      last_will_->qos = gsl::narrow<int>(last_will_qos_);
+    }
+
+    if (const auto value = context->getProperty<bool>(LastWillRetain)) {
+      logger_->log_debug("AbstractMQTTProcessor: Last Will Retain [%d]", *value);
+      last_will_retain_ = {*value};
+      last_will_->retained = last_will_retain_;
+    }
+  }
+
   if (!client_) {
-    MQTTClient_create(&client_, uri_.c_str(), clientID_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr);
+    if (MQTTAsync_create(&client_, uri_.c_str(), clientID_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr) != MQTTASYNC_SUCCESS) {
+      logger_->log_error("Creating MQTT client failed");
+    }
   }
   if (client_) {
-    MQTTClient_setCallbacks(client_, this, connectionLost, msgReceived, msgDelivered);
+    MQTTAsync_setCallbacks(client_, this, connectionLost, msgReceived, nullptr);

Review Comment:
   What do you mean? Checking return value of `MQTTAsync_setCallbacks`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez closed pull request #1363: MINIFICPP-1680 Add support and tests for advanced MQTT features

Posted by GitBox <gi...@apache.org>.
lordgamez closed pull request #1363: MINIFICPP-1680 Add support and tests for advanced MQTT features
URL: https://github.com/apache/nifi-minifi-cpp/pull/1363


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1363: Add support and tests for advanced MQTT features

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1363:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1363#discussion_r934278703


##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -37,48 +34,64 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-bool ConsumeMQTT::enqueueReceiveMQTTMsg(MQTTClient_message *message) {
+void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> message) {
   if (queue_.size_approx() >= maxQueueSize_) {
     logger_->log_warn("MQTT queue full");
-    return false;
-  } else {
-    if (gsl::narrow<uint64_t>(message->payloadlen) > maxSegSize_)
-      message->payloadlen = maxSegSize_;
-    queue_.enqueue(message);
-    logger_->log_debug("enqueue MQTT message length %d", message->payloadlen);
-    return true;
+    return;
+  }
+
+  if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) {
+    logger_->log_debug("MQTT message was truncated while enqueuing, original length: %d", message->payloadlen);
+    message->payloadlen = gsl::narrow<int>(max_seg_size_);
   }
+
+  logger_->log_debug("enqueuing MQTT message with length %d", message->payloadlen);
+  queue_.enqueue(std::move(message));
 }
 
 void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
-  AbstractMQTTProcessor::onSchedule(context, factory);
   std::string value;
   int64_t valInt;
   value = "";
+  if (context->getProperty(CleanSession.getName(), value)) {
+    cleanSession_ = utils::StringUtils::toBool(value).value_or(cleanSession_);
+    logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+  }
+  value = "";
   if (context->getProperty(QueueBufferMaxMessage.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
     maxQueueSize_ = valInt;
     logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", maxQueueSize_);
   }
-  value = "";
-  if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
-    maxSegSize_ = valInt;
-    logger_->log_debug("ConsumeMQTT: Max Flow Segment Size [%" PRIu64 "]", maxSegSize_);
+
+  AbstractMQTTProcessor::onSchedule(context, factory);

Review Comment:
   would it make sense to move the "Check properties" section after this to the end of `AbstractMQQTTProcessor::onSchedule`? IMO it would be really great if we could have the `base::onSchedule` call either be at the beginning or the end of this (or any) onSchedule



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1363: Add support and tests for advanced MQTT features

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1363:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1363#discussion_r924386946


##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -37,48 +34,64 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-bool ConsumeMQTT::enqueueReceiveMQTTMsg(MQTTClient_message *message) {
+void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> message) {
   if (queue_.size_approx() >= maxQueueSize_) {
     logger_->log_warn("MQTT queue full");
-    return false;
-  } else {
-    if (gsl::narrow<uint64_t>(message->payloadlen) > maxSegSize_)
-      message->payloadlen = maxSegSize_;
-    queue_.enqueue(message);
-    logger_->log_debug("enqueue MQTT message length %d", message->payloadlen);
-    return true;
+    return;
+  }
+
+  if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) {
+    logger_->log_debug("MQTT message was truncated while enqueuing, original length: %d", message->payloadlen);
+    message->payloadlen = gsl::narrow<int>(max_seg_size_);
   }
+
+  logger_->log_debug("enqueuing MQTT message with length %d", message->payloadlen);
+  queue_.enqueue(std::move(message));
 }
 
 void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
-  AbstractMQTTProcessor::onSchedule(context, factory);
   std::string value;
   int64_t valInt;
   value = "";
+  if (context->getProperty(CleanSession.getName(), value)) {
+    cleanSession_ = utils::StringUtils::toBool(value).value_or(cleanSession_);
+    logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+  }
+  value = "";
   if (context->getProperty(QueueBufferMaxMessage.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
     maxQueueSize_ = valInt;
     logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", maxQueueSize_);
   }
-  value = "";
-  if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
-    maxSegSize_ = valInt;
-    logger_->log_debug("ConsumeMQTT: Max Flow Segment Size [%" PRIu64 "]", maxSegSize_);
+
+  AbstractMQTTProcessor::onSchedule(context, factory);

Review Comment:
   I see, should we extend the parent `onSchedule` (create an overload) to take these parameters? it would make these requirements explicit and impossible to accidentally omit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1363: Add support and tests for advanced MQTT features

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on code in PR #1363:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1363#discussion_r925533498


##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -37,48 +34,64 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-bool ConsumeMQTT::enqueueReceiveMQTTMsg(MQTTClient_message *message) {
+void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> message) {
   if (queue_.size_approx() >= maxQueueSize_) {
     logger_->log_warn("MQTT queue full");
-    return false;
-  } else {
-    if (gsl::narrow<uint64_t>(message->payloadlen) > maxSegSize_)
-      message->payloadlen = maxSegSize_;
-    queue_.enqueue(message);
-    logger_->log_debug("enqueue MQTT message length %d", message->payloadlen);
-    return true;
+    return;
+  }
+
+  if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) {
+    logger_->log_debug("MQTT message was truncated while enqueuing, original length: %d", message->payloadlen);
+    message->payloadlen = gsl::narrow<int>(max_seg_size_);
   }
+
+  logger_->log_debug("enqueuing MQTT message with length %d", message->payloadlen);
+  queue_.enqueue(std::move(message));
 }
 
 void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
-  AbstractMQTTProcessor::onSchedule(context, factory);
   std::string value;
   int64_t valInt;
   value = "";
+  if (context->getProperty(CleanSession.getName(), value)) {
+    cleanSession_ = utils::StringUtils::toBool(value).value_or(cleanSession_);
+    logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+  }
+  value = "";
   if (context->getProperty(QueueBufferMaxMessage.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
     maxQueueSize_ = valInt;
     logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", maxQueueSize_);
   }
-  value = "";
-  if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
-    maxSegSize_ = valInt;
-    logger_->log_debug("ConsumeMQTT: Max Flow Segment Size [%" PRIu64 "]", maxSegSize_);
+
+  AbstractMQTTProcessor::onSchedule(context, factory);

Review Comment:
   We could extend parent's `onSchedule` with `CleanSession`, but not with `QueueBufferMaxMessage`. The former is used during connection, and that's okay. But the latter is specific to this processor, not in its sibling class `PublishMQTT`. It is used used asynchronously when receiving messages, and that could be right after connection, so its value has to be read before that. I added a comment for now.



##########
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##########
@@ -15,147 +15,167 @@
  * limitations under the License.
  */
 #include "AbstractMQTTProcessor.h"
-#include <cstdio>
 #include <memory>
 #include <string>
-#include <cinttypes>
-#include <vector>
+#include <utility>
 
-#include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
 
 namespace org::apache::nifi::minifi::processors {
 
 void AbstractMQTTProcessor::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory>& /*factory*/) {
-  sslEnabled_ = false;
-  sslopts_ = MQTTClient_SSLOptions_initializer;
-
-  std::string value;
-  int64_t valInt;
-  value = "";
-  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
-    uri_ = value;
-    logger_->log_debug("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
-  }
-  value = "";
-  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
-    clientID_ = value;
+  if (auto value = context->getProperty(BrokerURI)) {
+    uri_ = std::move(*value);
+    logger_->log_debug("AbstractMQTTProcessor: BrokerURI [%s]", uri_);
+  }
+  if (auto value = context->getProperty(ClientID)) {
+    clientID_ = std::move(*value);
     logger_->log_debug("AbstractMQTTProcessor: ClientID [%s]", clientID_);
   }
-  value = "";
-  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
-    topic_ = value;
+  if (auto value = context->getProperty(Topic)) {
+    topic_ = std::move(*value);
     logger_->log_debug("AbstractMQTTProcessor: Topic [%s]", topic_);
   }
-  value = "";
-  if (context->getProperty(UserName.getName(), value) && !value.empty()) {
-    userName_ = value;
-    logger_->log_debug("AbstractMQTTProcessor: UserName [%s]", userName_);
+  if (auto value = context->getProperty(Username)) {
+    username_ = std::move(*value);
+    logger_->log_debug("AbstractMQTTProcessor: UserName [%s]", username_);
   }
-  value = "";
-  if (context->getProperty(PassWord.getName(), value) && !value.empty()) {
-    passWord_ = value;
-    logger_->log_debug("AbstractMQTTProcessor: PassWord [%s]", passWord_);
+  if (auto value = context->getProperty(Password)) {
+    password_ = std::move(*value);
+    logger_->log_debug("AbstractMQTTProcessor: Password [%s]", password_);
   }
 
-  const auto cleanSession_parsed = [&] () -> std::optional<bool> {
-    std::string property_value;
-    if (!context->getProperty(CleanSession.getName(), property_value)) return std::nullopt;
-    return utils::StringUtils::toBool(property_value);
-  }();
-  if ( cleanSession_parsed ) {
-    cleanSession_ = *cleanSession_parsed;
-    logger_->log_debug("AbstractMQTTProcessor: CleanSession [%d]", cleanSession_);
+  if (const auto keep_alive_interval = context->getProperty<core::TimePeriodValue>(KeepAliveInterval)) {
+    keep_alive_interval_ = std::chrono::duration_cast<std::chrono::seconds>(keep_alive_interval->getMilliseconds());
+    logger_->log_debug("AbstractMQTTProcessor: KeepAliveInterval [%" PRId64 "] s", int64_t{keep_alive_interval_.count()});
   }
 
-  if (auto keep_alive_interval = context->getProperty<core::TimePeriodValue>(KeepLiveInterval)) {
-    keepAliveInterval_ = keep_alive_interval->getMilliseconds();
-    logger_->log_debug("AbstractMQTTProcessor: KeepLiveInterval [%" PRId64 "] ms", int64_t{keepAliveInterval_.count()});
+  if (const auto value = context->getProperty<uint64_t>(MaxFlowSegSize)) {
+    max_seg_size_ = {*value};
+    logger_->log_debug("PublishMQTT: max flow segment size [%" PRIu64 "]", max_seg_size_);
   }
 
-  if (auto connection_timeout = context->getProperty<core::TimePeriodValue>(ConnectionTimeout)) {
-    connectionTimeout_ = connection_timeout->getMilliseconds();
-    logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeout [%" PRId64 "] ms", int64_t{connectionTimeout_.count()});
+  if (const auto connection_timeout = context->getProperty<core::TimePeriodValue>(ConnectionTimeout)) {
+    connection_timeout_ = std::chrono::duration_cast<std::chrono::seconds>(connection_timeout->getMilliseconds());
+    logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeout [%" PRId64 "] s", int64_t{connection_timeout_.count()});
   }
 
-  value = "";
-  if (context->getProperty(QOS.getName(), value) && !value.empty() && (value == MQTT_QOS_0 || value == MQTT_QOS_1 || MQTT_QOS_2) &&
-      core::Property::StringToInt(value, valInt)) {
-    qos_ = valInt;
-    logger_->log_debug("AbstractMQTTProcessor: QOS [%" PRId64 "]", qos_);
+  if (const auto value = context->getProperty<uint32_t>(QoS); value && (*value == MQTT_QOS_0 || *value == MQTT_QOS_1 || *value == MQTT_QOS_2)) {
+    qos_ = {*value};
+    logger_->log_debug("AbstractMQTTProcessor: QoS [%" PRIu32 "]", qos_);
   }
-  value = "";
 
-  if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) {
-    if (value == MQTT_SECURITY_PROTOCOL_SSL) {
-      sslEnabled_ = true;
-      value = "";
-      if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: trustStore [%s]", value);
-        securityCA_ = value;
-        sslopts_.trustStore = securityCA_.c_str();
+  if (const auto security_protocol = context->getProperty(SecurityProtocol)) {
+    if (*security_protocol == MQTT_SECURITY_PROTOCOL_SSL) {
+      sslOpts_ = MQTTAsync_SSLOptions_initializer;
+      if (auto value = context->getProperty(SecurityCA)) {
+        logger_->log_debug("AbstractMQTTProcessor: trustStore [%s]", *value);
+        securityCA_ = std::move(*value);
+        sslOpts_->trustStore = securityCA_.c_str();
       }
-      value = "";
-      if (context->getProperty(SecurityCert.getName(), value) && !value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: keyStore [%s]", value);
-        securityCert_ = value;
-        sslopts_.keyStore = securityCert_.c_str();
+      if (auto value = context->getProperty(SecurityCert)) {
+        logger_->log_debug("AbstractMQTTProcessor: keyStore [%s]", *value);
+        securityCert_ = std::move(*value);
+        sslOpts_->keyStore = securityCert_.c_str();
       }
-      value = "";
-      if (context->getProperty(SecurityPrivateKey.getName(), value) && !value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: privateKey [%s]", value);
-        securityPrivateKey_ = value;
-        sslopts_.privateKey = securityPrivateKey_.c_str();
+      if (auto value = context->getProperty(SecurityPrivateKey)) {
+        logger_->log_debug("AbstractMQTTProcessor: privateKey [%s]", *value);
+        securityPrivateKey_ = std::move(*value);
+        sslOpts_->privateKey = securityPrivateKey_.c_str();
       }
-      value = "";
-      if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) && !value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: privateKeyPassword [%s]", value);
-        securityPrivateKeyPassWord_ = value;
-        sslopts_.privateKeyPassword = securityPrivateKeyPassWord_.c_str();
+      if (auto value = context->getProperty(SecurityPrivateKeyPassword)) {
+        logger_->log_debug("AbstractMQTTProcessor: privateKeyPassword [%s]", *value);
+        securityPrivateKeyPassword_ = std::move(*value);
+        sslOpts_->privateKeyPassword = securityPrivateKeyPassword_.c_str();
       }
     }
   }
+
+  if (auto last_will_topic = context->getProperty(LastWillTopic); last_will_topic.has_value() && !last_will_topic->empty()) {
+    last_will_ = MQTTAsync_willOptions_initializer;
+
+    logger_->log_debug("AbstractMQTTProcessor: Last Will Topic [%s]", *last_will_topic);
+    last_will_topic_ = std::move(*last_will_topic);
+    last_will_->topicName = last_will_topic_.c_str();
+
+    if (auto value = context->getProperty(LastWillMessage)) {
+      logger_->log_debug("AbstractMQTTProcessor: Last Will Message [%s]", *value);
+      last_will_message_ = std::move(*value);
+      last_will_->message = last_will_message_.c_str();
+    }
+
+    if (const auto value = context->getProperty<uint32_t>(LastWillQoS); value && (*value == MQTT_QOS_0 || *value == MQTT_QOS_1 || *value == MQTT_QOS_2)) {
+      logger_->log_debug("AbstractMQTTProcessor: Last Will QoS [%" PRIu32 "]", *value);
+      last_will_qos_ = {*value};
+      last_will_->qos = gsl::narrow<int>(last_will_qos_);
+    }
+
+    if (const auto value = context->getProperty<bool>(LastWillRetain)) {
+      logger_->log_debug("AbstractMQTTProcessor: Last Will Retain [%d]", *value);
+      last_will_retain_ = {*value};
+      last_will_->retained = last_will_retain_;
+    }
+  }
+
   if (!client_) {
-    MQTTClient_create(&client_, uri_.c_str(), clientID_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr);
+    if (MQTTAsync_create(&client_, uri_.c_str(), clientID_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr) != MQTTASYNC_SUCCESS) {
+      logger_->log_error("Creating MQTT client failed");
+    }
   }
   if (client_) {
-    MQTTClient_setCallbacks(client_, this, connectionLost, msgReceived, msgDelivered);
+    MQTTAsync_setCallbacks(client_, this, connectionLost, msgReceived, nullptr);

Review Comment:
   Check added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1363: Add support and tests for advanced MQTT features

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1363:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1363#discussion_r919969810


##########
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##########
@@ -15,147 +15,167 @@
  * limitations under the License.
  */
 #include "AbstractMQTTProcessor.h"
-#include <cstdio>
 #include <memory>
 #include <string>
-#include <cinttypes>
-#include <vector>
+#include <utility>
 
-#include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
 
 namespace org::apache::nifi::minifi::processors {
 
 void AbstractMQTTProcessor::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory>& /*factory*/) {
-  sslEnabled_ = false;
-  sslopts_ = MQTTClient_SSLOptions_initializer;
-
-  std::string value;
-  int64_t valInt;
-  value = "";
-  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
-    uri_ = value;
-    logger_->log_debug("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
-  }
-  value = "";
-  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
-    clientID_ = value;
+  if (auto value = context->getProperty(BrokerURI)) {
+    uri_ = std::move(*value);
+    logger_->log_debug("AbstractMQTTProcessor: BrokerURI [%s]", uri_);
+  }
+  if (auto value = context->getProperty(ClientID)) {
+    clientID_ = std::move(*value);
     logger_->log_debug("AbstractMQTTProcessor: ClientID [%s]", clientID_);
   }
-  value = "";
-  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
-    topic_ = value;
+  if (auto value = context->getProperty(Topic)) {
+    topic_ = std::move(*value);
     logger_->log_debug("AbstractMQTTProcessor: Topic [%s]", topic_);
   }
-  value = "";
-  if (context->getProperty(UserName.getName(), value) && !value.empty()) {
-    userName_ = value;
-    logger_->log_debug("AbstractMQTTProcessor: UserName [%s]", userName_);
+  if (auto value = context->getProperty(Username)) {
+    username_ = std::move(*value);
+    logger_->log_debug("AbstractMQTTProcessor: UserName [%s]", username_);
   }
-  value = "";
-  if (context->getProperty(PassWord.getName(), value) && !value.empty()) {
-    passWord_ = value;
-    logger_->log_debug("AbstractMQTTProcessor: PassWord [%s]", passWord_);
+  if (auto value = context->getProperty(Password)) {
+    password_ = std::move(*value);
+    logger_->log_debug("AbstractMQTTProcessor: Password [%s]", password_);
   }
 
-  const auto cleanSession_parsed = [&] () -> std::optional<bool> {
-    std::string property_value;
-    if (!context->getProperty(CleanSession.getName(), property_value)) return std::nullopt;
-    return utils::StringUtils::toBool(property_value);
-  }();
-  if ( cleanSession_parsed ) {
-    cleanSession_ = *cleanSession_parsed;
-    logger_->log_debug("AbstractMQTTProcessor: CleanSession [%d]", cleanSession_);
+  if (const auto keep_alive_interval = context->getProperty<core::TimePeriodValue>(KeepAliveInterval)) {
+    keep_alive_interval_ = std::chrono::duration_cast<std::chrono::seconds>(keep_alive_interval->getMilliseconds());
+    logger_->log_debug("AbstractMQTTProcessor: KeepAliveInterval [%" PRId64 "] s", int64_t{keep_alive_interval_.count()});
   }
 
-  if (auto keep_alive_interval = context->getProperty<core::TimePeriodValue>(KeepLiveInterval)) {
-    keepAliveInterval_ = keep_alive_interval->getMilliseconds();
-    logger_->log_debug("AbstractMQTTProcessor: KeepLiveInterval [%" PRId64 "] ms", int64_t{keepAliveInterval_.count()});
+  if (const auto value = context->getProperty<uint64_t>(MaxFlowSegSize)) {
+    max_seg_size_ = {*value};
+    logger_->log_debug("PublishMQTT: max flow segment size [%" PRIu64 "]", max_seg_size_);
   }
 
-  if (auto connection_timeout = context->getProperty<core::TimePeriodValue>(ConnectionTimeout)) {
-    connectionTimeout_ = connection_timeout->getMilliseconds();
-    logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeout [%" PRId64 "] ms", int64_t{connectionTimeout_.count()});
+  if (const auto connection_timeout = context->getProperty<core::TimePeriodValue>(ConnectionTimeout)) {
+    connection_timeout_ = std::chrono::duration_cast<std::chrono::seconds>(connection_timeout->getMilliseconds());
+    logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeout [%" PRId64 "] s", int64_t{connection_timeout_.count()});
   }
 
-  value = "";
-  if (context->getProperty(QOS.getName(), value) && !value.empty() && (value == MQTT_QOS_0 || value == MQTT_QOS_1 || MQTT_QOS_2) &&
-      core::Property::StringToInt(value, valInt)) {
-    qos_ = valInt;
-    logger_->log_debug("AbstractMQTTProcessor: QOS [%" PRId64 "]", qos_);
+  if (const auto value = context->getProperty<uint32_t>(QoS); value && (*value == MQTT_QOS_0 || *value == MQTT_QOS_1 || *value == MQTT_QOS_2)) {
+    qos_ = {*value};
+    logger_->log_debug("AbstractMQTTProcessor: QoS [%" PRIu32 "]", qos_);
   }
-  value = "";
 
-  if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) {
-    if (value == MQTT_SECURITY_PROTOCOL_SSL) {
-      sslEnabled_ = true;
-      value = "";
-      if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: trustStore [%s]", value);
-        securityCA_ = value;
-        sslopts_.trustStore = securityCA_.c_str();
+  if (const auto security_protocol = context->getProperty(SecurityProtocol)) {
+    if (*security_protocol == MQTT_SECURITY_PROTOCOL_SSL) {
+      sslOpts_ = MQTTAsync_SSLOptions_initializer;
+      if (auto value = context->getProperty(SecurityCA)) {
+        logger_->log_debug("AbstractMQTTProcessor: trustStore [%s]", *value);
+        securityCA_ = std::move(*value);
+        sslOpts_->trustStore = securityCA_.c_str();
       }
-      value = "";
-      if (context->getProperty(SecurityCert.getName(), value) && !value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: keyStore [%s]", value);
-        securityCert_ = value;
-        sslopts_.keyStore = securityCert_.c_str();
+      if (auto value = context->getProperty(SecurityCert)) {
+        logger_->log_debug("AbstractMQTTProcessor: keyStore [%s]", *value);
+        securityCert_ = std::move(*value);
+        sslOpts_->keyStore = securityCert_.c_str();
       }
-      value = "";
-      if (context->getProperty(SecurityPrivateKey.getName(), value) && !value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: privateKey [%s]", value);
-        securityPrivateKey_ = value;
-        sslopts_.privateKey = securityPrivateKey_.c_str();
+      if (auto value = context->getProperty(SecurityPrivateKey)) {
+        logger_->log_debug("AbstractMQTTProcessor: privateKey [%s]", *value);
+        securityPrivateKey_ = std::move(*value);
+        sslOpts_->privateKey = securityPrivateKey_.c_str();
       }
-      value = "";
-      if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) && !value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: privateKeyPassword [%s]", value);
-        securityPrivateKeyPassWord_ = value;
-        sslopts_.privateKeyPassword = securityPrivateKeyPassWord_.c_str();
+      if (auto value = context->getProperty(SecurityPrivateKeyPassword)) {
+        logger_->log_debug("AbstractMQTTProcessor: privateKeyPassword [%s]", *value);
+        securityPrivateKeyPassword_ = std::move(*value);
+        sslOpts_->privateKeyPassword = securityPrivateKeyPassword_.c_str();
       }
     }
   }
+
+  if (auto last_will_topic = context->getProperty(LastWillTopic); last_will_topic.has_value() && !last_will_topic->empty()) {
+    last_will_ = MQTTAsync_willOptions_initializer;
+
+    logger_->log_debug("AbstractMQTTProcessor: Last Will Topic [%s]", *last_will_topic);
+    last_will_topic_ = std::move(*last_will_topic);
+    last_will_->topicName = last_will_topic_.c_str();
+
+    if (auto value = context->getProperty(LastWillMessage)) {
+      logger_->log_debug("AbstractMQTTProcessor: Last Will Message [%s]", *value);
+      last_will_message_ = std::move(*value);
+      last_will_->message = last_will_message_.c_str();
+    }
+
+    if (const auto value = context->getProperty<uint32_t>(LastWillQoS); value && (*value == MQTT_QOS_0 || *value == MQTT_QOS_1 || *value == MQTT_QOS_2)) {
+      logger_->log_debug("AbstractMQTTProcessor: Last Will QoS [%" PRIu32 "]", *value);
+      last_will_qos_ = {*value};
+      last_will_->qos = gsl::narrow<int>(last_will_qos_);
+    }
+
+    if (const auto value = context->getProperty<bool>(LastWillRetain)) {
+      logger_->log_debug("AbstractMQTTProcessor: Last Will Retain [%d]", *value);
+      last_will_retain_ = {*value};
+      last_will_->retained = last_will_retain_;
+    }
+  }
+
   if (!client_) {
-    MQTTClient_create(&client_, uri_.c_str(), clientID_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr);
+    if (MQTTAsync_create(&client_, uri_.c_str(), clientID_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr) != MQTTASYNC_SUCCESS) {
+      logger_->log_error("Creating MQTT client failed");
+    }
   }
   if (client_) {
-    MQTTClient_setCallbacks(client_, this, connectionLost, msgReceived, msgDelivered);
+    MQTTAsync_setCallbacks(client_, this, connectionLost, msgReceived, nullptr);

Review Comment:
   should we check the return code for each MQTT function calls?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1363: Add support and tests for advanced MQTT features

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on code in PR #1363:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1363#discussion_r934650630


##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -37,48 +34,64 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-bool ConsumeMQTT::enqueueReceiveMQTTMsg(MQTTClient_message *message) {
+void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> message) {
   if (queue_.size_approx() >= maxQueueSize_) {
     logger_->log_warn("MQTT queue full");
-    return false;
-  } else {
-    if (gsl::narrow<uint64_t>(message->payloadlen) > maxSegSize_)
-      message->payloadlen = maxSegSize_;
-    queue_.enqueue(message);
-    logger_->log_debug("enqueue MQTT message length %d", message->payloadlen);
-    return true;
+    return;
+  }
+
+  if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) {
+    logger_->log_debug("MQTT message was truncated while enqueuing, original length: %d", message->payloadlen);
+    message->payloadlen = gsl::narrow<int>(max_seg_size_);
   }
+
+  logger_->log_debug("enqueuing MQTT message with length %d", message->payloadlen);
+  queue_.enqueue(std::move(message));
 }
 
 void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
-  AbstractMQTTProcessor::onSchedule(context, factory);
   std::string value;
   int64_t valInt;
   value = "";
+  if (context->getProperty(CleanSession.getName(), value)) {
+    cleanSession_ = utils::StringUtils::toBool(value).value_or(cleanSession_);
+    logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+  }
+  value = "";
   if (context->getProperty(QueueBufferMaxMessage.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
     maxQueueSize_ = valInt;
     logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", maxQueueSize_);
   }
-  value = "";
-  if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
-    maxSegSize_ = valInt;
-    logger_->log_debug("ConsumeMQTT: Max Flow Segment Size [%" PRIu64 "]", maxSegSize_);
+
+  AbstractMQTTProcessor::onSchedule(context, factory);

Review Comment:
   Done. It could only be made with a virtual function due to `ConsumeMQTT::cleanSession_`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1363: Add support and tests for advanced MQTT features

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1363:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1363#discussion_r919960389


##########
docker/test/integration/minifi/core/SingleNodeDockerCluster.py:
##########
@@ -142,6 +142,30 @@ def deploy(self, name):
 
         self.containers[name].deploy()
 
-    def deploy_flow(self):
+    def deploy_flow(self, container_name=None):
+        if container_name is not None:
+            if container_name not in self.containers:
+                logging.error('Could not start container because it is not found: \'%s\'', container_name)
+                return
+            self.containers[container_name].deploy()
+            return
         for container in self.containers.values():
             container.deploy()
+
+    def stop_flow(self, container_name):
+        if container_name not in self.containers:
+            logging.error('Could not stop container because it is not found: \'%s\'', container_name)
+            return
+        self.containers[container_name].stop()
+
+    def kill_flow(self, container_name):
+        if container_name not in self.containers:
+            logging.error('Could not kill container because it is not found: \'%s\'', container_name)
+            return
+        self.containers[container_name].kill()
+
+    def restart_flow(self, container_name):
+        if container_name not in self.containers:
+            logging.error('Could not stop restart because it is not found: \'%s\'', container_name)

Review Comment:
   possible typo? `Could not ~stop~ restart ...`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1363: Add support and tests for advanced MQTT features

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1363:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1363#discussion_r924387965


##########
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##########
@@ -15,147 +15,167 @@
  * limitations under the License.
  */
 #include "AbstractMQTTProcessor.h"
-#include <cstdio>
 #include <memory>
 #include <string>
-#include <cinttypes>
-#include <vector>
+#include <utility>
 
-#include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
 
 namespace org::apache::nifi::minifi::processors {
 
 void AbstractMQTTProcessor::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory>& /*factory*/) {
-  sslEnabled_ = false;
-  sslopts_ = MQTTClient_SSLOptions_initializer;
-
-  std::string value;
-  int64_t valInt;
-  value = "";
-  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
-    uri_ = value;
-    logger_->log_debug("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
-  }
-  value = "";
-  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
-    clientID_ = value;
+  if (auto value = context->getProperty(BrokerURI)) {
+    uri_ = std::move(*value);
+    logger_->log_debug("AbstractMQTTProcessor: BrokerURI [%s]", uri_);
+  }
+  if (auto value = context->getProperty(ClientID)) {
+    clientID_ = std::move(*value);
     logger_->log_debug("AbstractMQTTProcessor: ClientID [%s]", clientID_);
   }
-  value = "";
-  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
-    topic_ = value;
+  if (auto value = context->getProperty(Topic)) {
+    topic_ = std::move(*value);
     logger_->log_debug("AbstractMQTTProcessor: Topic [%s]", topic_);
   }
-  value = "";
-  if (context->getProperty(UserName.getName(), value) && !value.empty()) {
-    userName_ = value;
-    logger_->log_debug("AbstractMQTTProcessor: UserName [%s]", userName_);
+  if (auto value = context->getProperty(Username)) {
+    username_ = std::move(*value);
+    logger_->log_debug("AbstractMQTTProcessor: UserName [%s]", username_);
   }
-  value = "";
-  if (context->getProperty(PassWord.getName(), value) && !value.empty()) {
-    passWord_ = value;
-    logger_->log_debug("AbstractMQTTProcessor: PassWord [%s]", passWord_);
+  if (auto value = context->getProperty(Password)) {
+    password_ = std::move(*value);
+    logger_->log_debug("AbstractMQTTProcessor: Password [%s]", password_);
   }
 
-  const auto cleanSession_parsed = [&] () -> std::optional<bool> {
-    std::string property_value;
-    if (!context->getProperty(CleanSession.getName(), property_value)) return std::nullopt;
-    return utils::StringUtils::toBool(property_value);
-  }();
-  if ( cleanSession_parsed ) {
-    cleanSession_ = *cleanSession_parsed;
-    logger_->log_debug("AbstractMQTTProcessor: CleanSession [%d]", cleanSession_);
+  if (const auto keep_alive_interval = context->getProperty<core::TimePeriodValue>(KeepAliveInterval)) {
+    keep_alive_interval_ = std::chrono::duration_cast<std::chrono::seconds>(keep_alive_interval->getMilliseconds());
+    logger_->log_debug("AbstractMQTTProcessor: KeepAliveInterval [%" PRId64 "] s", int64_t{keep_alive_interval_.count()});
   }
 
-  if (auto keep_alive_interval = context->getProperty<core::TimePeriodValue>(KeepLiveInterval)) {
-    keepAliveInterval_ = keep_alive_interval->getMilliseconds();
-    logger_->log_debug("AbstractMQTTProcessor: KeepLiveInterval [%" PRId64 "] ms", int64_t{keepAliveInterval_.count()});
+  if (const auto value = context->getProperty<uint64_t>(MaxFlowSegSize)) {
+    max_seg_size_ = {*value};
+    logger_->log_debug("PublishMQTT: max flow segment size [%" PRIu64 "]", max_seg_size_);
   }
 
-  if (auto connection_timeout = context->getProperty<core::TimePeriodValue>(ConnectionTimeout)) {
-    connectionTimeout_ = connection_timeout->getMilliseconds();
-    logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeout [%" PRId64 "] ms", int64_t{connectionTimeout_.count()});
+  if (const auto connection_timeout = context->getProperty<core::TimePeriodValue>(ConnectionTimeout)) {
+    connection_timeout_ = std::chrono::duration_cast<std::chrono::seconds>(connection_timeout->getMilliseconds());
+    logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeout [%" PRId64 "] s", int64_t{connection_timeout_.count()});
   }
 
-  value = "";
-  if (context->getProperty(QOS.getName(), value) && !value.empty() && (value == MQTT_QOS_0 || value == MQTT_QOS_1 || MQTT_QOS_2) &&
-      core::Property::StringToInt(value, valInt)) {
-    qos_ = valInt;
-    logger_->log_debug("AbstractMQTTProcessor: QOS [%" PRId64 "]", qos_);
+  if (const auto value = context->getProperty<uint32_t>(QoS); value && (*value == MQTT_QOS_0 || *value == MQTT_QOS_1 || *value == MQTT_QOS_2)) {
+    qos_ = {*value};
+    logger_->log_debug("AbstractMQTTProcessor: QoS [%" PRIu32 "]", qos_);
   }
-  value = "";
 
-  if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) {
-    if (value == MQTT_SECURITY_PROTOCOL_SSL) {
-      sslEnabled_ = true;
-      value = "";
-      if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: trustStore [%s]", value);
-        securityCA_ = value;
-        sslopts_.trustStore = securityCA_.c_str();
+  if (const auto security_protocol = context->getProperty(SecurityProtocol)) {
+    if (*security_protocol == MQTT_SECURITY_PROTOCOL_SSL) {
+      sslOpts_ = MQTTAsync_SSLOptions_initializer;
+      if (auto value = context->getProperty(SecurityCA)) {
+        logger_->log_debug("AbstractMQTTProcessor: trustStore [%s]", *value);
+        securityCA_ = std::move(*value);
+        sslOpts_->trustStore = securityCA_.c_str();
       }
-      value = "";
-      if (context->getProperty(SecurityCert.getName(), value) && !value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: keyStore [%s]", value);
-        securityCert_ = value;
-        sslopts_.keyStore = securityCert_.c_str();
+      if (auto value = context->getProperty(SecurityCert)) {
+        logger_->log_debug("AbstractMQTTProcessor: keyStore [%s]", *value);
+        securityCert_ = std::move(*value);
+        sslOpts_->keyStore = securityCert_.c_str();
       }
-      value = "";
-      if (context->getProperty(SecurityPrivateKey.getName(), value) && !value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: privateKey [%s]", value);
-        securityPrivateKey_ = value;
-        sslopts_.privateKey = securityPrivateKey_.c_str();
+      if (auto value = context->getProperty(SecurityPrivateKey)) {
+        logger_->log_debug("AbstractMQTTProcessor: privateKey [%s]", *value);
+        securityPrivateKey_ = std::move(*value);
+        sslOpts_->privateKey = securityPrivateKey_.c_str();
       }
-      value = "";
-      if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) && !value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: privateKeyPassword [%s]", value);
-        securityPrivateKeyPassWord_ = value;
-        sslopts_.privateKeyPassword = securityPrivateKeyPassWord_.c_str();
+      if (auto value = context->getProperty(SecurityPrivateKeyPassword)) {
+        logger_->log_debug("AbstractMQTTProcessor: privateKeyPassword [%s]", *value);
+        securityPrivateKeyPassword_ = std::move(*value);
+        sslOpts_->privateKeyPassword = securityPrivateKeyPassword_.c_str();
       }
     }
   }
+
+  if (auto last_will_topic = context->getProperty(LastWillTopic); last_will_topic.has_value() && !last_will_topic->empty()) {
+    last_will_ = MQTTAsync_willOptions_initializer;
+
+    logger_->log_debug("AbstractMQTTProcessor: Last Will Topic [%s]", *last_will_topic);
+    last_will_topic_ = std::move(*last_will_topic);
+    last_will_->topicName = last_will_topic_.c_str();
+
+    if (auto value = context->getProperty(LastWillMessage)) {
+      logger_->log_debug("AbstractMQTTProcessor: Last Will Message [%s]", *value);
+      last_will_message_ = std::move(*value);
+      last_will_->message = last_will_message_.c_str();
+    }
+
+    if (const auto value = context->getProperty<uint32_t>(LastWillQoS); value && (*value == MQTT_QOS_0 || *value == MQTT_QOS_1 || *value == MQTT_QOS_2)) {
+      logger_->log_debug("AbstractMQTTProcessor: Last Will QoS [%" PRIu32 "]", *value);
+      last_will_qos_ = {*value};
+      last_will_->qos = gsl::narrow<int>(last_will_qos_);
+    }
+
+    if (const auto value = context->getProperty<bool>(LastWillRetain)) {
+      logger_->log_debug("AbstractMQTTProcessor: Last Will Retain [%d]", *value);
+      last_will_retain_ = {*value};
+      last_will_->retained = last_will_retain_;
+    }
+  }
+
   if (!client_) {
-    MQTTClient_create(&client_, uri_.c_str(), clientID_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr);
+    if (MQTTAsync_create(&client_, uri_.c_str(), clientID_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr) != MQTTASYNC_SUCCESS) {
+      logger_->log_error("Creating MQTT client failed");
+    }
   }
   if (client_) {
-    MQTTClient_setCallbacks(client_, this, connectionLost, msgReceived, msgDelivered);
+    MQTTAsync_setCallbacks(client_, this, connectionLost, msgReceived, nullptr);

Review Comment:
   yes, I don't know which `MQTT*` functions return a "success/error", but this one seems to do so



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1363: Add support and tests for advanced MQTT features

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1363:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1363#discussion_r919960389


##########
docker/test/integration/minifi/core/SingleNodeDockerCluster.py:
##########
@@ -142,6 +142,30 @@ def deploy(self, name):
 
         self.containers[name].deploy()
 
-    def deploy_flow(self):
+    def deploy_flow(self, container_name=None):
+        if container_name is not None:
+            if container_name not in self.containers:
+                logging.error('Could not start container because it is not found: \'%s\'', container_name)
+                return
+            self.containers[container_name].deploy()
+            return
         for container in self.containers.values():
             container.deploy()
+
+    def stop_flow(self, container_name):
+        if container_name not in self.containers:
+            logging.error('Could not stop container because it is not found: \'%s\'', container_name)
+            return
+        self.containers[container_name].stop()
+
+    def kill_flow(self, container_name):
+        if container_name not in self.containers:
+            logging.error('Could not kill container because it is not found: \'%s\'', container_name)
+            return
+        self.containers[container_name].kill()
+
+    def restart_flow(self, container_name):
+        if container_name not in self.containers:
+            logging.error('Could not stop restart because it is not found: \'%s\'', container_name)

Review Comment:
   possible typo? 
   > Could not ~stop~ restart ...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1363: Add support and tests for advanced MQTT features

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1363:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1363#discussion_r920201760


##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -37,48 +34,64 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-bool ConsumeMQTT::enqueueReceiveMQTTMsg(MQTTClient_message *message) {
+void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> message) {
   if (queue_.size_approx() >= maxQueueSize_) {
     logger_->log_warn("MQTT queue full");
-    return false;
-  } else {
-    if (gsl::narrow<uint64_t>(message->payloadlen) > maxSegSize_)
-      message->payloadlen = maxSegSize_;
-    queue_.enqueue(message);
-    logger_->log_debug("enqueue MQTT message length %d", message->payloadlen);
-    return true;
+    return;
+  }
+
+  if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) {
+    logger_->log_debug("MQTT message was truncated while enqueuing, original length: %d", message->payloadlen);
+    message->payloadlen = gsl::narrow<int>(max_seg_size_);
   }
+
+  logger_->log_debug("enqueuing MQTT message with length %d", message->payloadlen);
+  queue_.enqueue(std::move(message));
 }
 
 void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
-  AbstractMQTTProcessor::onSchedule(context, factory);
   std::string value;
   int64_t valInt;
   value = "";
+  if (context->getProperty(CleanSession.getName(), value)) {
+    cleanSession_ = utils::StringUtils::toBool(value).value_or(cleanSession_);
+    logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+  }
+  value = "";
   if (context->getProperty(QueueBufferMaxMessage.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
     maxQueueSize_ = valInt;
     logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", maxQueueSize_);
   }
-  value = "";
-  if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
-    maxSegSize_ = valInt;
-    logger_->log_debug("ConsumeMQTT: Max Flow Segment Size [%" PRIu64 "]", maxSegSize_);
+
+  AbstractMQTTProcessor::onSchedule(context, factory);

Review Comment:
   why move the parent onSchedule here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1363: Add support and tests for advanced MQTT features

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on code in PR #1363:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1363#discussion_r921169394


##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -37,48 +34,64 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-bool ConsumeMQTT::enqueueReceiveMQTTMsg(MQTTClient_message *message) {
+void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> message) {
   if (queue_.size_approx() >= maxQueueSize_) {
     logger_->log_warn("MQTT queue full");
-    return false;
-  } else {
-    if (gsl::narrow<uint64_t>(message->payloadlen) > maxSegSize_)
-      message->payloadlen = maxSegSize_;
-    queue_.enqueue(message);
-    logger_->log_debug("enqueue MQTT message length %d", message->payloadlen);
-    return true;
+    return;
+  }
+
+  if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) {
+    logger_->log_debug("MQTT message was truncated while enqueuing, original length: %d", message->payloadlen);
+    message->payloadlen = gsl::narrow<int>(max_seg_size_);
   }
+
+  logger_->log_debug("enqueuing MQTT message with length %d", message->payloadlen);
+  queue_.enqueue(std::move(message));
 }
 
 void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
-  AbstractMQTTProcessor::onSchedule(context, factory);
   std::string value;
   int64_t valInt;
   value = "";
+  if (context->getProperty(CleanSession.getName(), value)) {
+    cleanSession_ = utils::StringUtils::toBool(value).value_or(cleanSession_);
+    logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+  }
+  value = "";
   if (context->getProperty(QueueBufferMaxMessage.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
     maxQueueSize_ = valInt;
     logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", maxQueueSize_);
   }
-  value = "";
-  if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
-    maxSegSize_ = valInt;
-    logger_->log_debug("ConsumeMQTT: Max Flow Segment Size [%" PRIu64 "]", maxSegSize_);
+
+  AbstractMQTTProcessor::onSchedule(context, factory);

Review Comment:
   Because in parent `onSchedule` we connect to the broker and also messages can arrive from that point. So the properties before the call (`CleanSession` and `QueueBufferMaxMessage`) need to be read before connecting.



##########
docker/test/integration/minifi/core/SingleNodeDockerCluster.py:
##########
@@ -142,6 +142,30 @@ def deploy(self, name):
 
         self.containers[name].deploy()
 
-    def deploy_flow(self):
+    def deploy_flow(self, container_name=None):
+        if container_name is not None:
+            if container_name not in self.containers:
+                logging.error('Could not start container because it is not found: \'%s\'', container_name)
+                return
+            self.containers[container_name].deploy()
+            return
         for container in self.containers.values():
             container.deploy()
+
+    def stop_flow(self, container_name):
+        if container_name not in self.containers:
+            logging.error('Could not stop container because it is not found: \'%s\'', container_name)
+            return
+        self.containers[container_name].stop()
+
+    def kill_flow(self, container_name):
+        if container_name not in self.containers:
+            logging.error('Could not kill container because it is not found: \'%s\'', container_name)
+            return
+        self.containers[container_name].kill()
+
+    def restart_flow(self, container_name):
+        if container_name not in self.containers:
+            logging.error('Could not stop restart because it is not found: \'%s\'', container_name)

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1363: Add support and tests for advanced MQTT features

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on code in PR #1363:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1363#discussion_r921169496


##########
docker/test/integration/minifi/core/SingleNodeDockerCluster.py:
##########
@@ -142,6 +142,30 @@ def deploy(self, name):
 
         self.containers[name].deploy()
 
-    def deploy_flow(self):
+    def deploy_flow(self, container_name=None):
+        if container_name is not None:
+            if container_name not in self.containers:
+                logging.error('Could not start container because it is not found: \'%s\'', container_name)
+                return
+            self.containers[container_name].deploy()
+            return
         for container in self.containers.values():
             container.deploy()
+
+    def stop_flow(self, container_name):
+        if container_name not in self.containers:
+            logging.error('Could not stop container because it is not found: \'%s\'', container_name)
+            return
+        self.containers[container_name].stop()
+
+    def kill_flow(self, container_name):
+        if container_name not in self.containers:
+            logging.error('Could not kill container because it is not found: \'%s\'', container_name)
+            return
+        self.containers[container_name].kill()
+
+    def restart_flow(self, container_name):
+        if container_name not in self.containers:
+            logging.error('Could not stop restart because it is not found: \'%s\'', container_name)

Review Comment:
   Good point! Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org