You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by minifirocks <gi...@git.apache.org> on 2018/01/04 16:58:45 UTC

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

GitHub user minifirocks opened a pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/228

    MINIFICPP-342: MQTT extension

    Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
         in the commit message?
    
    - [ ] Does your PR title start with MINIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [ ] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
    - [ ] If applicable, have you updated the LICENSE file?
    - [ ] If applicable, have you updated the NOTICE file?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/minifirocks/nifi-minifi-cpp mqtt_dev

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi-minifi-cpp/pull/228.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #228
    
----
commit cc47483e04164102b2382a78bf68d10f4e8f5efe
Author: Bin Qiu <be...@...>
Date:   2018-01-04T16:23:14Z

    MINIFICPP-342: MQTT extension

----


---

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794068
  
    --- Diff: extensions/mqtt/ConsumeMQTT.h ---
    @@ -0,0 +1,125 @@
    +/**
    + * @file ConsumeMQTT.h
    + * ConsumeMQTT class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __CONSUME_MQTT_H__
    +#define __CONSUME_MQTT_H__
    +
    +#include <climits>
    +#include <deque>
    +#include "FlowFileRecord.h"
    +#include "core/Processor.h"
    +#include "core/ProcessSession.h"
    +#include "core/Core.h"
    +#include "core/Resource.h"
    +#include "core/Property.h"
    +#include "core/logging/LoggerConfiguration.h"
    +#include "MQTTClient.h"
    +#include "AbstractMQTTProcessor.h"
    +
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
    +namespace processors {
    +
    +#define MQTT_TOPIC_ATTRIBUTE "mqtt.topic"
    +#define MQTT_BROKER_ATTRIBUTE "mqtt.broker"
    +
    +// ConsumeMQTT Class
    +class ConsumeMQTT: public processors::AbstractMQTTProcessor {
    +public:
    +  // Constructor
    +  /*!
    +   * Create a new processor
    +   */
    +  explicit ConsumeMQTT(std::string name, uuid_t uuid = NULL)
    +    : processors::AbstractMQTTProcessor(name, uuid), logger_(logging::LoggerFactory<ConsumeMQTT>::getLogger()) {
    +    isSubscriber_ = true;
    +    maxQueueSize_ = 100;
    +  }
    +  // Destructor
    +  virtual ~ConsumeMQTT() {
    +    std::lock_guard < std::mutex > lock(mutex_);
    +    while (!queue_.empty()) {
    +      MQTTClient_message *message = queue_.front();
    +      MQTTClient_freeMessage(&message);
    +      queue_.pop_front();
    +    }
    +  }
    +  // Processor Name
    +  static constexpr char const* ProcessorName = "ConsumeMQTT";
    +  // Supported Properties
    +  static core::Property MaxQueueSize;
    +  // Nest Callback Class for write stream
    +  class WriteCallback: public OutputStreamCallback {
    +  public:
    +    WriteCallback(MQTTClient_message *message) :
    +      message_(message) {
    +      status_ = 0;
    +    }
    +    MQTTClient_message *message_;
    +    int64_t process(std::shared_ptr<io::BaseStream> stream) {
    +      int64_t len = stream->write(reinterpret_cast<uint8_t*>(message_->payload), message_->payloadlen);
    +      if (len < 0)
    +        status_ = -1;
    +      return len;
    +    }
    +    int status_;
    +  };
    +
    +public:
    +  /**
    +   * Function that's executed when the processor is scheduled.
    +   * @param context process context.
    +   * @param sessionFactory process session factory that is used when creating
    +   * ProcessSession objects.
    +   */
    +  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
    +  // OnTrigger method, implemented by NiFi ConsumeMQTT
    +  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
    +  // Initialize, over write by NiFi ConsumeMQTT
    +  virtual void initialize(void);
    +  virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message);
    +
    +protected:
    +  void getReceivedMQTTMsg(std::deque<MQTTClient_message *> &msg_queue) {
    +    std::lock_guard < std::mutex > lock(mutex_);
    --- End diff --
    
    Could you use a lock free queue here? might not save much but may increase throughput and we have one in our code base already.


---

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159793508
  
    --- Diff: extensions/mqtt/AbstractMQTTProcessor.h ---
    @@ -0,0 +1,154 @@
    +/**
    + * @file AbstractMQTTProcessor.h
    + * AbstractMQTTProcessor class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __ABSTRACTMQTT_H__
    +#define __ABSTRACTMQTT_H__
    +
    +#include "FlowFileRecord.h"
    +#include "core/Processor.h"
    +#include "core/ProcessSession.h"
    +#include "core/Core.h"
    +#include "core/Resource.h"
    +#include "core/logging/LoggerConfiguration.h"
    +#include "MQTTClient.h"
    +
    +#define MQTT_QOS_0 "0"
    +#define MQTT_QOS_1 "1"
    +#define MQTT_QOS_2 "2"
    +
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
    +namespace processors {
    +
    +// AbstractMQTTProcessor Class
    +class AbstractMQTTProcessor : public core::Processor {
    + public:
    +  // Constructor
    +  /*!
    +   * Create a new processor
    +   */
    +  explicit AbstractMQTTProcessor(std::string name, uuid_t uuid = NULL)
    +      : core::Processor(name, uuid),
    +        logger_(logging::LoggerFactory<AbstractMQTTProcessor>::getLogger()) {
    +    client_ = nullptr;
    +    cleanSession_ = false;
    +    keepAliveInterval_ = 60;
    +    connectionTimeOut_ = 30;
    +    qos_ = 0;
    +    isSubscriber_ = false;
    +  }
    +  // Destructor
    +  virtual ~AbstractMQTTProcessor() {
    +    if (isSubscriber_) {
    +      MQTTClient_unsubscribe(client_, topic_.c_str());
    --- End diff --
    
    what happens if unsubscribe is not called due to failure? Is that eventually okay?


---

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159792117
  
    --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
    @@ -0,0 +1,158 @@
    +/**
    + * @file AbstractMQTTProcessor.cpp
    + * AbstractMQTTProcessor class implementation
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#include "AbstractMQTTProcessor.h"
    +#include <stdio.h>
    +#include <memory>
    +#include <string>
    +#include "utils/TimeUtil.h"
    +#include "utils/StringUtils.h"
    +#include "core/ProcessContext.h"
    +#include "core/ProcessSession.h"
    +
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
    +namespace processors {
    +
    +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", "");
    +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. See the allowable value descriptions for more details", "true");
    +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", "");
    +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", "");
    +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", "");
    +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec");
    +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec");
    +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0");
    +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", "");
    +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship");
    +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship");
    +
    +void AbstractMQTTProcessor::initialize() {
    +  // Set the supported properties
    +  std::set<core::Property> properties;
    +  properties.insert(BrokerURL);
    +  properties.insert(CleanSession);
    +  properties.insert(ClientID);
    +  properties.insert(UserName);
    +  properties.insert(PassWord);
    +  properties.insert(KeepLiveInterval);
    +  properties.insert(ConnectionTimeOut);
    +  properties.insert(QOS);
    +  properties.insert(Topic);
    +  setSupportedProperties(properties);
    +  // Set the supported relationships
    +  std::set<core::Relationship> relationships;
    +  relationships.insert(Success);
    +  relationships.insert(Failure);
    +  setSupportedRelationships(relationships);
    +}
    +
    +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
    +  std::string value;
    +  int64_t valInt;
    +  value = "";
    +  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
    +    uri_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
    +  }
    +  value = "";
    +  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
    +    clientID_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
    +  }
    +  value = "";
    +  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
    +    topic_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
    +  }
    +  value = "";
    +  if (context->getProperty(UserName.getName(), value) && !value.empty()) {
    +    userName_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: UserName [%s]", userName_);
    +  }
    +  value = "";
    +  if (context->getProperty(PassWord.getName(), value) && !value.empty()) {
    +    passWord_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: PassWord [%s]", passWord_);
    +  }
    +  value = "";
    +  if (context->getProperty(CleanSession.getName(), value) && !value.empty() &&
    +      org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, cleanSession_)) {
    +    logger_->log_info("AbstractMQTTProcessor: CleanSession [%d]", cleanSession_);
    +  }
    +  value = "";
    +  if (context->getProperty(KeepLiveInterval.getName(), value) && !value.empty()) {
    +    core::TimeUnit unit;
    +    if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
    +      keepAliveInterval_ = valInt/1000;
    +      logger_->log_info("AbstractMQTTProcessor: KeepLiveInterval [%d]", keepAliveInterval_);
    +    }
    +  }
    +  value = "";
    +  if (context->getProperty(ConnectionTimeOut.getName(), value) && !value.empty()) {
    +    core::TimeUnit unit;
    +    if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
    +      connectionTimeOut_ = valInt/1000;
    +      logger_->log_info("AbstractMQTTProcessor: ConnectionTimeOut [%d]", connectionTimeOut_);
    --- End diff --
    
    In the cases where you are using int64, please use %ll instead of %d 


---

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794693
  
    --- Diff: thirdparty/paho.mqtt.c/CMakeLists.txt ---
    @@ -0,0 +1,86 @@
    +#*******************************************************************************
    +#  Copyright (c) 2015, 2017 logi.cals GmbH and others
    +#
    +#  All rights reserved. This program and the accompanying materials
    +#  are made available under the terms of the Eclipse Public License v1.0
    +#  and Eclipse Distribution License v1.0 which accompany this distribution.
    +#
    +#  The Eclipse Public License is available at
    +#     http://www.eclipse.org/legal/epl-v10.html
    +#  and the Eclipse Distribution License is available at
    +#    http://www.eclipse.org/org/documents/edl-v10.php.
    +#
    +#  Contributors:
    +#     Rainer Poisel - initial version
    +#     Genis Riera Perez - Add support for building debian package
    +#*******************************************************************************/
    +
    +# Note: on OS X you should install XCode and the associated command-line tools
    +
    +CMAKE_MINIMUM_REQUIRED(VERSION 2.8.4)
    +PROJECT("paho" C)
    +MESSAGE(STATUS "CMake version: " ${CMAKE_VERSION})
    +MESSAGE(STATUS "CMake system name: " ${CMAKE_SYSTEM_NAME})
    +
    +SET(CMAKE_SCRIPTS "${CMAKE_SOURCE_DIR}/cmake")
    +SET(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/modules")
    +
    +## build settings
    +SET(PAHO_VERSION_MAJOR 1)
    +SET(PAHO_VERSION_MINOR 2)
    +SET(PAHO_VERSION_PATCH 0)
    +SET(CLIENT_VERSION ${PAHO_VERSION_MAJOR}.${PAHO_VERSION_MINOR}.${PAHO_VERSION_PATCH})
    +
    +INCLUDE(GNUInstallDirs)
    +
    +STRING(TIMESTAMP BUILD_TIMESTAMP UTC)
    +MESSAGE(STATUS "Timestamp is ${BUILD_TIMESTAMP}")
    +
    +IF(WIN32)
    +  ADD_DEFINITIONS(-D_CRT_SECURE_NO_DEPRECATE -DWIN32_LEAN_AND_MEAN -MD)
    +ELSEIF(${CMAKE_SYSTEM_NAME} STREQUAL "Darwin")
    +  ADD_DEFINITIONS(-DOSX)
    +ENDIF()
    +
    +## build options
    +SET(PAHO_WITH_SSL TRUE CACHE BOOL "Flag that defines whether to build ssl-enabled binaries too. ")
    +SET(PAHO_BUILD_STATIC FALSE CACHE BOOL "Build static library")
    +SET(PAHO_BUILD_DOCUMENTATION FALSE CACHE BOOL "Create and install the HTML based API documentation (requires Doxygen)")
    +SET(PAHO_BUILD_SAMPLES FALSE CACHE BOOL "Build sample programs")
    +SET(PAHO_BUILD_DEB_PACKAGE FALSE CACHE BOOL "Build debian package")
    +SET(PAHO_ENABLE_TESTING FALSE CACHE BOOL "Build tests and run")
    +
    +ADD_SUBDIRECTORY(src)
    +IF(PAHO_BUILD_SAMPLES)
    +    ADD_SUBDIRECTORY(src/samples)
    +ENDIF()
    +
    +IF(PAHO_BUILD_DOCUMENTATION)
    +    ADD_SUBDIRECTORY(doc)
    +ENDIF()
    +
    +### packaging settings
    +IF (WIN32)
    +    SET(CPACK_GENERATOR "ZIP")
    +ELSEIF(PAHO_BUILD_DEB_PACKAGE)
    +    SET(CPACK_GENERATOR "DEB")
    +    CONFIGURE_FILE(${CMAKE_SCRIPTS}/CPackDebConfig.cmake.in
    +        ${CMAKE_BINARY_DIR}/CPackDebConfig.cmake @ONLY)
    +    SET(CPACK_PROJECT_CONFIG_FILE ${CMAKE_BINARY_DIR}/CPackDebConfig.cmake)
    +    ADD_SUBDIRECTORY(debian)
    +ELSE()
    +    SET(CPACK_GENERATOR "TGZ")
    +ENDIF()
    +
    +SET(CPACK_PACKAGE_VERSION_MAJOR ${PAHO_VERSION_MAJOR})
    +SET(CPACK_PACKAGE_VERSION_MINOR ${PAHO_VERSION_MINOR})
    +SET(CPACK_PACKAGE_VERSION_PATCH ${PAHO_VERSION_PATCH})
    +INCLUDE(CPack)
    +
    +IF(PAHO_ENABLE_TESTING)
    --- End diff --
    
    probably don't need tests, right?


---

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

Posted by minifirocks <gi...@git.apache.org>.
Github user minifirocks commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807097
  
    --- Diff: thirdparty/paho.mqtt.c/CMakeLists.txt ---
    @@ -0,0 +1,86 @@
    +#*******************************************************************************
    +#  Copyright (c) 2015, 2017 logi.cals GmbH and others
    +#
    +#  All rights reserved. This program and the accompanying materials
    +#  are made available under the terms of the Eclipse Public License v1.0
    +#  and Eclipse Distribution License v1.0 which accompany this distribution.
    +#
    +#  The Eclipse Public License is available at
    +#     http://www.eclipse.org/legal/epl-v10.html
    +#  and the Eclipse Distribution License is available at
    +#    http://www.eclipse.org/org/documents/edl-v10.php.
    +#
    +#  Contributors:
    +#     Rainer Poisel - initial version
    +#     Genis Riera Perez - Add support for building debian package
    +#*******************************************************************************/
    +
    +# Note: on OS X you should install XCode and the associated command-line tools
    +
    +CMAKE_MINIMUM_REQUIRED(VERSION 2.8.4)
    +PROJECT("paho" C)
    +MESSAGE(STATUS "CMake version: " ${CMAKE_VERSION})
    +MESSAGE(STATUS "CMake system name: " ${CMAKE_SYSTEM_NAME})
    +
    +SET(CMAKE_SCRIPTS "${CMAKE_SOURCE_DIR}/cmake")
    +SET(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/modules")
    +
    +## build settings
    +SET(PAHO_VERSION_MAJOR 1)
    +SET(PAHO_VERSION_MINOR 2)
    +SET(PAHO_VERSION_PATCH 0)
    +SET(CLIENT_VERSION ${PAHO_VERSION_MAJOR}.${PAHO_VERSION_MINOR}.${PAHO_VERSION_PATCH})
    +
    +INCLUDE(GNUInstallDirs)
    +
    +STRING(TIMESTAMP BUILD_TIMESTAMP UTC)
    +MESSAGE(STATUS "Timestamp is ${BUILD_TIMESTAMP}")
    +
    +IF(WIN32)
    +  ADD_DEFINITIONS(-D_CRT_SECURE_NO_DEPRECATE -DWIN32_LEAN_AND_MEAN -MD)
    +ELSEIF(${CMAKE_SYSTEM_NAME} STREQUAL "Darwin")
    +  ADD_DEFINITIONS(-DOSX)
    +ENDIF()
    +
    +## build options
    +SET(PAHO_WITH_SSL TRUE CACHE BOOL "Flag that defines whether to build ssl-enabled binaries too. ")
    +SET(PAHO_BUILD_STATIC FALSE CACHE BOOL "Build static library")
    +SET(PAHO_BUILD_DOCUMENTATION FALSE CACHE BOOL "Create and install the HTML based API documentation (requires Doxygen)")
    +SET(PAHO_BUILD_SAMPLES FALSE CACHE BOOL "Build sample programs")
    +SET(PAHO_BUILD_DEB_PACKAGE FALSE CACHE BOOL "Build debian package")
    +SET(PAHO_ENABLE_TESTING FALSE CACHE BOOL "Build tests and run")
    +
    +ADD_SUBDIRECTORY(src)
    +IF(PAHO_BUILD_SAMPLES)
    +    ADD_SUBDIRECTORY(src/samples)
    +ENDIF()
    +
    +IF(PAHO_BUILD_DOCUMENTATION)
    +    ADD_SUBDIRECTORY(doc)
    +ENDIF()
    +
    +### packaging settings
    +IF (WIN32)
    +    SET(CPACK_GENERATOR "ZIP")
    +ELSEIF(PAHO_BUILD_DEB_PACKAGE)
    +    SET(CPACK_GENERATOR "DEB")
    +    CONFIGURE_FILE(${CMAKE_SCRIPTS}/CPackDebConfig.cmake.in
    +        ${CMAKE_BINARY_DIR}/CPackDebConfig.cmake @ONLY)
    +    SET(CPACK_PROJECT_CONFIG_FILE ${CMAKE_BINARY_DIR}/CPackDebConfig.cmake)
    +    ADD_SUBDIRECTORY(debian)
    +ELSE()
    +    SET(CPACK_GENERATOR "TGZ")
    +ENDIF()
    +
    +SET(CPACK_PACKAGE_VERSION_MAJOR ${PAHO_VERSION_MAJOR})
    +SET(CPACK_PACKAGE_VERSION_MINOR ${PAHO_VERSION_MINOR})
    +SET(CPACK_PACKAGE_VERSION_PATCH ${PAHO_VERSION_PATCH})
    +INCLUDE(CPack)
    +
    +IF(PAHO_ENABLE_TESTING)
    --- End diff --
    
    will remove


---

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794354
  
    --- Diff: extensions/mqtt/PublishMQTT.h ---
    @@ -0,0 +1,142 @@
    +/**
    + * @file PublishMQTT.h
    + * PublishMQTT class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __PUBLISH_MQTT_H__
    +#define __PUBLISH_MQTT_H__
    +
    +#include "FlowFileRecord.h"
    +#include "core/Processor.h"
    +#include "core/ProcessSession.h"
    +#include "core/Core.h"
    +#include "core/Resource.h"
    +#include "core/Property.h"
    +#include "core/logging/LoggerConfiguration.h"
    +#include "MQTTClient.h"
    +#include "AbstractMQTTProcessor.h"
    +
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
    +namespace processors {
    +
    +// PublishMQTT Class
    +class PublishMQTT: public processors::AbstractMQTTProcessor {
    +public:
    +  // Constructor
    +  /*!
    +   * Create a new processor
    +   */
    +  explicit PublishMQTT(std::string name, uuid_t uuid = NULL)
    +    : processors::AbstractMQTTProcessor(name, uuid), logger_(logging::LoggerFactory<PublishMQTT>::getLogger()) {
    +    retain_ = false;
    +    max_seg_size_ = ULLONG_MAX;
    +  }
    +  // Destructor
    +  virtual ~PublishMQTT() {
    +  }
    +  // Processor Name
    +  static constexpr char const* ProcessorName = "PublishMQTT";
    +  // Supported Properties
    +  static core::Property Retain;
    +  static core::Property MaxFlowSegSize;
    +
    +  // Nest Callback Class for read stream
    +  class ReadCallback: public InputStreamCallback {
    +  public:
    +    ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const std::string &key, MQTTClient client,
    +        int qos, bool retain, MQTTClient_deliveryToken &token) :
    +        flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), client_(client),
    +        qos_(qos), retain_(retain), token_(token) {
    +      status_ = 0;
    +      read_size_ = 0;
    +    }
    +    ~ReadCallback() {
    +    }
    +    int64_t process(std::shared_ptr<io::BaseStream> stream) {
    +      if (flow_size_ < max_seg_size_)
    +        max_seg_size_ = flow_size_;
    +      std::vector<unsigned char> buffer;
    +      buffer.reserve(max_seg_size_);
    +      read_size_ = 0;
    +      status_ = 0;
    +      while (read_size_ < flow_size_) {
    +        int readRet = stream->read(&buffer[0], max_seg_size_);
    +        if (readRet < 0) {
    +          status_ = -1;
    +          return read_size_;
    +        }
    +        if (readRet > 0) {
    +          MQTTClient_message pubmsg = MQTTClient_message_initializer;
    +          pubmsg.payload = &buffer[0];
    +          pubmsg.payloadlen = readRet;
    +          pubmsg.qos = qos_;
    +          pubmsg.retained = retain_;
    +          if (MQTTClient_publishMessage(client_, key_.c_str(), &pubmsg, &token_) != MQTTCLIENT_SUCCESS) {
    --- End diff --
    
    does the publish copy the buffer? If not, does it finish entirely or will it create a callback ? I ask because you're passing in the buffer as the payload, but if there is a callback, we could possibly have memory that's freed when this function exits with the MQTTClient still in progress. 


---

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794850
  
    --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
    @@ -0,0 +1,158 @@
    +/**
    + * @file AbstractMQTTProcessor.cpp
    + * AbstractMQTTProcessor class implementation
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#include "AbstractMQTTProcessor.h"
    +#include <stdio.h>
    +#include <memory>
    +#include <string>
    +#include "utils/TimeUtil.h"
    +#include "utils/StringUtils.h"
    +#include "core/ProcessContext.h"
    +#include "core/ProcessSession.h"
    +
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
    +namespace processors {
    +
    +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", "");
    +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. See the allowable value descriptions for more details", "true");
    +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", "");
    +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", "");
    +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", "");
    +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec");
    +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec");
    +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0");
    +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", "");
    +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship");
    +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship");
    +
    +void AbstractMQTTProcessor::initialize() {
    +  // Set the supported properties
    +  std::set<core::Property> properties;
    +  properties.insert(BrokerURL);
    +  properties.insert(CleanSession);
    +  properties.insert(ClientID);
    +  properties.insert(UserName);
    +  properties.insert(PassWord);
    +  properties.insert(KeepLiveInterval);
    +  properties.insert(ConnectionTimeOut);
    +  properties.insert(QOS);
    +  properties.insert(Topic);
    +  setSupportedProperties(properties);
    +  // Set the supported relationships
    +  std::set<core::Relationship> relationships;
    +  relationships.insert(Success);
    +  relationships.insert(Failure);
    +  setSupportedRelationships(relationships);
    +}
    +
    +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
    +  std::string value;
    +  int64_t valInt;
    +  value = "";
    +  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
    +    uri_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
    +  }
    +  value = "";
    +  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
    +    clientID_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
    +  }
    +  value = "";
    +  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
    +    topic_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
    +  }
    +  value = "";
    +  if (context->getProperty(UserName.getName(), value) && !value.empty()) {
    +    userName_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: UserName [%s]", userName_);
    +  }
    +  value = "";
    +  if (context->getProperty(PassWord.getName(), value) && !value.empty()) {
    +    passWord_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: PassWord [%s]", passWord_);
    +  }
    +  value = "";
    +  if (context->getProperty(CleanSession.getName(), value) && !value.empty() &&
    +      org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, cleanSession_)) {
    +    logger_->log_info("AbstractMQTTProcessor: CleanSession [%d]", cleanSession_);
    +  }
    +  value = "";
    +  if (context->getProperty(KeepLiveInterval.getName(), value) && !value.empty()) {
    +    core::TimeUnit unit;
    +    if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
    +      keepAliveInterval_ = valInt/1000;
    +      logger_->log_info("AbstractMQTTProcessor: KeepLiveInterval [%d]", keepAliveInterval_);
    +    }
    +  }
    +  value = "";
    +  if (context->getProperty(ConnectionTimeOut.getName(), value) && !value.empty()) {
    +    core::TimeUnit unit;
    +    if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
    +      connectionTimeOut_ = valInt/1000;
    +      logger_->log_info("AbstractMQTTProcessor: ConnectionTimeOut [%d]", connectionTimeOut_);
    --- End diff --
    
    FYI This can cause issues on arm


---

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

Posted by minifirocks <gi...@git.apache.org>.
Github user minifirocks commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159804409
  
    --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
    @@ -0,0 +1,158 @@
    +/**
    + * @file AbstractMQTTProcessor.cpp
    + * AbstractMQTTProcessor class implementation
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#include "AbstractMQTTProcessor.h"
    +#include <stdio.h>
    +#include <memory>
    +#include <string>
    +#include "utils/TimeUtil.h"
    +#include "utils/StringUtils.h"
    +#include "core/ProcessContext.h"
    +#include "core/ProcessSession.h"
    +
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
    +namespace processors {
    +
    +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", "");
    +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. See the allowable value descriptions for more details", "true");
    +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", "");
    +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", "");
    +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", "");
    +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec");
    +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec");
    +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0");
    +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", "");
    +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship");
    +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship");
    +
    +void AbstractMQTTProcessor::initialize() {
    +  // Set the supported properties
    +  std::set<core::Property> properties;
    +  properties.insert(BrokerURL);
    +  properties.insert(CleanSession);
    +  properties.insert(ClientID);
    +  properties.insert(UserName);
    +  properties.insert(PassWord);
    +  properties.insert(KeepLiveInterval);
    +  properties.insert(ConnectionTimeOut);
    +  properties.insert(QOS);
    +  properties.insert(Topic);
    +  setSupportedProperties(properties);
    +  // Set the supported relationships
    +  std::set<core::Relationship> relationships;
    +  relationships.insert(Success);
    +  relationships.insert(Failure);
    +  setSupportedRelationships(relationships);
    +}
    +
    +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
    +  std::string value;
    +  int64_t valInt;
    +  value = "";
    +  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
    +    uri_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
    +  }
    +  value = "";
    +  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
    +    clientID_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
    +  }
    +  value = "";
    +  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
    +    topic_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
    +  }
    +  value = "";
    +  if (context->getProperty(UserName.getName(), value) && !value.empty()) {
    +    userName_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: UserName [%s]", userName_);
    +  }
    +  value = "";
    +  if (context->getProperty(PassWord.getName(), value) && !value.empty()) {
    +    passWord_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: PassWord [%s]", passWord_);
    +  }
    +  value = "";
    +  if (context->getProperty(CleanSession.getName(), value) && !value.empty() &&
    +      org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, cleanSession_)) {
    +    logger_->log_info("AbstractMQTTProcessor: CleanSession [%d]", cleanSession_);
    +  }
    +  value = "";
    +  if (context->getProperty(KeepLiveInterval.getName(), value) && !value.empty()) {
    +    core::TimeUnit unit;
    +    if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
    +      keepAliveInterval_ = valInt/1000;
    +      logger_->log_info("AbstractMQTTProcessor: KeepLiveInterval [%d]", keepAliveInterval_);
    +    }
    +  }
    +  value = "";
    +  if (context->getProperty(ConnectionTimeOut.getName(), value) && !value.empty()) {
    +    core::TimeUnit unit;
    +    if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
    +      connectionTimeOut_ = valInt/1000;
    +      logger_->log_info("AbstractMQTTProcessor: ConnectionTimeOut [%d]", connectionTimeOut_);
    --- End diff --
    
    will change


---

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi-minifi-cpp/pull/228


---

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

Posted by minifirocks <gi...@git.apache.org>.
Github user minifirocks commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159806405
  
    --- Diff: extensions/mqtt/ConsumeMQTT.h ---
    @@ -0,0 +1,125 @@
    +/**
    + * @file ConsumeMQTT.h
    + * ConsumeMQTT class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __CONSUME_MQTT_H__
    +#define __CONSUME_MQTT_H__
    +
    +#include <climits>
    +#include <deque>
    +#include "FlowFileRecord.h"
    +#include "core/Processor.h"
    +#include "core/ProcessSession.h"
    +#include "core/Core.h"
    +#include "core/Resource.h"
    +#include "core/Property.h"
    +#include "core/logging/LoggerConfiguration.h"
    +#include "MQTTClient.h"
    +#include "AbstractMQTTProcessor.h"
    +
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
    +namespace processors {
    +
    +#define MQTT_TOPIC_ATTRIBUTE "mqtt.topic"
    +#define MQTT_BROKER_ATTRIBUTE "mqtt.broker"
    +
    +// ConsumeMQTT Class
    +class ConsumeMQTT: public processors::AbstractMQTTProcessor {
    +public:
    +  // Constructor
    +  /*!
    +   * Create a new processor
    +   */
    +  explicit ConsumeMQTT(std::string name, uuid_t uuid = NULL)
    +    : processors::AbstractMQTTProcessor(name, uuid), logger_(logging::LoggerFactory<ConsumeMQTT>::getLogger()) {
    +    isSubscriber_ = true;
    +    maxQueueSize_ = 100;
    +  }
    +  // Destructor
    +  virtual ~ConsumeMQTT() {
    +    std::lock_guard < std::mutex > lock(mutex_);
    +    while (!queue_.empty()) {
    +      MQTTClient_message *message = queue_.front();
    +      MQTTClient_freeMessage(&message);
    +      queue_.pop_front();
    +    }
    +  }
    +  // Processor Name
    +  static constexpr char const* ProcessorName = "ConsumeMQTT";
    +  // Supported Properties
    +  static core::Property MaxQueueSize;
    +  // Nest Callback Class for write stream
    +  class WriteCallback: public OutputStreamCallback {
    +  public:
    +    WriteCallback(MQTTClient_message *message) :
    +      message_(message) {
    +      status_ = 0;
    +    }
    +    MQTTClient_message *message_;
    +    int64_t process(std::shared_ptr<io::BaseStream> stream) {
    +      int64_t len = stream->write(reinterpret_cast<uint8_t*>(message_->payload), message_->payloadlen);
    +      if (len < 0)
    +        status_ = -1;
    +      return len;
    +    }
    +    int status_;
    +  };
    +
    +public:
    +  /**
    +   * Function that's executed when the processor is scheduled.
    +   * @param context process context.
    +   * @param sessionFactory process session factory that is used when creating
    +   * ProcessSession objects.
    +   */
    +  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
    +  // OnTrigger method, implemented by NiFi ConsumeMQTT
    +  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
    +  // Initialize, over write by NiFi ConsumeMQTT
    +  virtual void initialize(void);
    +  virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message);
    +
    +protected:
    +  void getReceivedMQTTMsg(std::deque<MQTTClient_message *> &msg_queue) {
    +    std::lock_guard < std::mutex > lock(mutex_);
    --- End diff --
    
    will do


---

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

Posted by minifirocks <gi...@git.apache.org>.
Github user minifirocks commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159804421
  
    --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
    @@ -0,0 +1,158 @@
    +/**
    + * @file AbstractMQTTProcessor.cpp
    + * AbstractMQTTProcessor class implementation
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#include "AbstractMQTTProcessor.h"
    +#include <stdio.h>
    +#include <memory>
    +#include <string>
    +#include "utils/TimeUtil.h"
    +#include "utils/StringUtils.h"
    +#include "core/ProcessContext.h"
    +#include "core/ProcessSession.h"
    +
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
    +namespace processors {
    +
    +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", "");
    +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. See the allowable value descriptions for more details", "true");
    +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", "");
    +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", "");
    +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", "");
    +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec");
    +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec");
    +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0");
    +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", "");
    +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship");
    +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship");
    +
    +void AbstractMQTTProcessor::initialize() {
    +  // Set the supported properties
    +  std::set<core::Property> properties;
    +  properties.insert(BrokerURL);
    +  properties.insert(CleanSession);
    +  properties.insert(ClientID);
    +  properties.insert(UserName);
    +  properties.insert(PassWord);
    +  properties.insert(KeepLiveInterval);
    +  properties.insert(ConnectionTimeOut);
    +  properties.insert(QOS);
    +  properties.insert(Topic);
    +  setSupportedProperties(properties);
    +  // Set the supported relationships
    +  std::set<core::Relationship> relationships;
    +  relationships.insert(Success);
    +  relationships.insert(Failure);
    +  setSupportedRelationships(relationships);
    +}
    +
    +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
    +  std::string value;
    +  int64_t valInt;
    +  value = "";
    +  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
    +    uri_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
    +  }
    +  value = "";
    +  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
    +    clientID_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
    +  }
    +  value = "";
    +  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
    +    topic_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
    +  }
    +  value = "";
    +  if (context->getProperty(UserName.getName(), value) && !value.empty()) {
    +    userName_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: UserName [%s]", userName_);
    +  }
    +  value = "";
    +  if (context->getProperty(PassWord.getName(), value) && !value.empty()) {
    +    passWord_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: PassWord [%s]", passWord_);
    +  }
    +  value = "";
    +  if (context->getProperty(CleanSession.getName(), value) && !value.empty() &&
    +      org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, cleanSession_)) {
    +    logger_->log_info("AbstractMQTTProcessor: CleanSession [%d]", cleanSession_);
    +  }
    +  value = "";
    +  if (context->getProperty(KeepLiveInterval.getName(), value) && !value.empty()) {
    +    core::TimeUnit unit;
    +    if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
    +      keepAliveInterval_ = valInt/1000;
    +      logger_->log_info("AbstractMQTTProcessor: KeepLiveInterval [%d]", keepAliveInterval_);
    +    }
    +  }
    +  value = "";
    +  if (context->getProperty(ConnectionTimeOut.getName(), value) && !value.empty()) {
    +    core::TimeUnit unit;
    +    if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
    +      connectionTimeOut_ = valInt/1000;
    +      logger_->log_info("AbstractMQTTProcessor: ConnectionTimeOut [%d]", connectionTimeOut_);
    +    }
    +  }
    +  value = "";
    +  if (context->getProperty(QOS.getName(), value) && !value.empty() && (value == MQTT_QOS_0 || value == MQTT_QOS_1 || MQTT_QOS_2) &&
    +      core::Property::StringToInt(value, valInt)) {
    +    qos_ = valInt;
    +    logger_->log_info("PublishKafka: QOS [%d]", qos_);
    --- End diff --
    
    will change.


---

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159792503
  
    --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
    @@ -0,0 +1,158 @@
    +/**
    + * @file AbstractMQTTProcessor.cpp
    + * AbstractMQTTProcessor class implementation
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#include "AbstractMQTTProcessor.h"
    +#include <stdio.h>
    +#include <memory>
    +#include <string>
    +#include "utils/TimeUtil.h"
    +#include "utils/StringUtils.h"
    +#include "core/ProcessContext.h"
    +#include "core/ProcessSession.h"
    +
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
    +namespace processors {
    +
    +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", "");
    +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. See the allowable value descriptions for more details", "true");
    +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", "");
    +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", "");
    +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", "");
    +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec");
    +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec");
    +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0");
    +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", "");
    +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship");
    +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship");
    +
    +void AbstractMQTTProcessor::initialize() {
    +  // Set the supported properties
    +  std::set<core::Property> properties;
    +  properties.insert(BrokerURL);
    +  properties.insert(CleanSession);
    +  properties.insert(ClientID);
    +  properties.insert(UserName);
    +  properties.insert(PassWord);
    +  properties.insert(KeepLiveInterval);
    +  properties.insert(ConnectionTimeOut);
    +  properties.insert(QOS);
    +  properties.insert(Topic);
    +  setSupportedProperties(properties);
    +  // Set the supported relationships
    +  std::set<core::Relationship> relationships;
    +  relationships.insert(Success);
    +  relationships.insert(Failure);
    +  setSupportedRelationships(relationships);
    +}
    +
    +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
    +  std::string value;
    +  int64_t valInt;
    +  value = "";
    +  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
    +    uri_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
    +  }
    +  value = "";
    +  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
    +    clientID_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
    +  }
    +  value = "";
    +  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
    +    topic_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
    +  }
    +  value = "";
    +  if (context->getProperty(UserName.getName(), value) && !value.empty()) {
    +    userName_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: UserName [%s]", userName_);
    +  }
    +  value = "";
    +  if (context->getProperty(PassWord.getName(), value) && !value.empty()) {
    +    passWord_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: PassWord [%s]", passWord_);
    +  }
    +  value = "";
    +  if (context->getProperty(CleanSession.getName(), value) && !value.empty() &&
    +      org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, cleanSession_)) {
    +    logger_->log_info("AbstractMQTTProcessor: CleanSession [%d]", cleanSession_);
    +  }
    +  value = "";
    +  if (context->getProperty(KeepLiveInterval.getName(), value) && !value.empty()) {
    +    core::TimeUnit unit;
    +    if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
    +      keepAliveInterval_ = valInt/1000;
    +      logger_->log_info("AbstractMQTTProcessor: KeepLiveInterval [%d]", keepAliveInterval_);
    +    }
    +  }
    +  value = "";
    +  if (context->getProperty(ConnectionTimeOut.getName(), value) && !value.empty()) {
    +    core::TimeUnit unit;
    +    if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
    +      connectionTimeOut_ = valInt/1000;
    +      logger_->log_info("AbstractMQTTProcessor: ConnectionTimeOut [%d]", connectionTimeOut_);
    +    }
    +  }
    +  value = "";
    +  if (context->getProperty(QOS.getName(), value) && !value.empty() && (value == MQTT_QOS_0 || value == MQTT_QOS_1 || MQTT_QOS_2) &&
    +      core::Property::StringToInt(value, valInt)) {
    +    qos_ = valInt;
    +    logger_->log_info("PublishKafka: QOS [%d]", qos_);
    --- End diff --
    
    statement copied from kafka


---

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159792523
  
    --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
    @@ -0,0 +1,158 @@
    +/**
    + * @file AbstractMQTTProcessor.cpp
    + * AbstractMQTTProcessor class implementation
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#include "AbstractMQTTProcessor.h"
    +#include <stdio.h>
    +#include <memory>
    +#include <string>
    +#include "utils/TimeUtil.h"
    +#include "utils/StringUtils.h"
    +#include "core/ProcessContext.h"
    +#include "core/ProcessSession.h"
    +
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
    +namespace processors {
    +
    +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", "");
    +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. See the allowable value descriptions for more details", "true");
    +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", "");
    +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", "");
    +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", "");
    +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec");
    +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec");
    +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0");
    +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", "");
    +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship");
    +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship");
    +
    +void AbstractMQTTProcessor::initialize() {
    +  // Set the supported properties
    +  std::set<core::Property> properties;
    +  properties.insert(BrokerURL);
    +  properties.insert(CleanSession);
    +  properties.insert(ClientID);
    +  properties.insert(UserName);
    +  properties.insert(PassWord);
    +  properties.insert(KeepLiveInterval);
    +  properties.insert(ConnectionTimeOut);
    +  properties.insert(QOS);
    +  properties.insert(Topic);
    +  setSupportedProperties(properties);
    +  // Set the supported relationships
    +  std::set<core::Relationship> relationships;
    +  relationships.insert(Success);
    +  relationships.insert(Failure);
    +  setSupportedRelationships(relationships);
    +}
    +
    +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
    +  std::string value;
    +  int64_t valInt;
    +  value = "";
    +  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
    +    uri_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
    +  }
    +  value = "";
    +  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
    +    clientID_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
    +  }
    +  value = "";
    +  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
    +    topic_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
    +  }
    +  value = "";
    +  if (context->getProperty(UserName.getName(), value) && !value.empty()) {
    +    userName_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: UserName [%s]", userName_);
    +  }
    +  value = "";
    +  if (context->getProperty(PassWord.getName(), value) && !value.empty()) {
    +    passWord_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: PassWord [%s]", passWord_);
    +  }
    +  value = "";
    +  if (context->getProperty(CleanSession.getName(), value) && !value.empty() &&
    +      org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, cleanSession_)) {
    +    logger_->log_info("AbstractMQTTProcessor: CleanSession [%d]", cleanSession_);
    +  }
    +  value = "";
    +  if (context->getProperty(KeepLiveInterval.getName(), value) && !value.empty()) {
    +    core::TimeUnit unit;
    +    if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
    +      keepAliveInterval_ = valInt/1000;
    +      logger_->log_info("AbstractMQTTProcessor: KeepLiveInterval [%d]", keepAliveInterval_);
    +    }
    +  }
    +  value = "";
    +  if (context->getProperty(ConnectionTimeOut.getName(), value) && !value.empty()) {
    +    core::TimeUnit unit;
    +    if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
    +      connectionTimeOut_ = valInt/1000;
    +      logger_->log_info("AbstractMQTTProcessor: ConnectionTimeOut [%d]", connectionTimeOut_);
    +    }
    +  }
    +  value = "";
    +  if (context->getProperty(QOS.getName(), value) && !value.empty() && (value == MQTT_QOS_0 || value == MQTT_QOS_1 || MQTT_QOS_2) &&
    +      core::Property::StringToInt(value, valInt)) {
    +    qos_ = valInt;
    +    logger_->log_info("PublishKafka: QOS [%d]", qos_);
    +  }
    +  if (!client_) {
    +    MQTTClient_create(&client_, uri_.c_str(), clientID_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL);
    --- End diff --
    
    what's the thread safety of this function call?


---

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

Posted by minifirocks <gi...@git.apache.org>.
Github user minifirocks commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159806371
  
    --- Diff: extensions/mqtt/AbstractMQTTProcessor.h ---
    @@ -0,0 +1,154 @@
    +/**
    + * @file AbstractMQTTProcessor.h
    + * AbstractMQTTProcessor class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __ABSTRACTMQTT_H__
    +#define __ABSTRACTMQTT_H__
    +
    +#include "FlowFileRecord.h"
    +#include "core/Processor.h"
    +#include "core/ProcessSession.h"
    +#include "core/Core.h"
    +#include "core/Resource.h"
    +#include "core/logging/LoggerConfiguration.h"
    +#include "MQTTClient.h"
    +
    +#define MQTT_QOS_0 "0"
    +#define MQTT_QOS_1 "1"
    +#define MQTT_QOS_2 "2"
    +
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
    +namespace processors {
    +
    +// AbstractMQTTProcessor Class
    +class AbstractMQTTProcessor : public core::Processor {
    + public:
    +  // Constructor
    +  /*!
    +   * Create a new processor
    +   */
    +  explicit AbstractMQTTProcessor(std::string name, uuid_t uuid = NULL)
    +      : core::Processor(name, uuid),
    +        logger_(logging::LoggerFactory<AbstractMQTTProcessor>::getLogger()) {
    +    client_ = nullptr;
    +    cleanSession_ = false;
    +    keepAliveInterval_ = 60;
    +    connectionTimeOut_ = 30;
    +    qos_ = 0;
    +    isSubscriber_ = false;
    +  }
    +  // Destructor
    +  virtual ~AbstractMQTTProcessor() {
    +    if (isSubscriber_) {
    +      MQTTClient_unsubscribe(client_, topic_.c_str());
    --- End diff --
    
    it should be. only side effect is if app halt and did not unsub. The broker will buffer the msgs to the topic while the client is go away. if the app restart with the same topic and same client ID, it will rx all msg buffer for that client buffered in the broker.



---

[GitHub] nifi-minifi-cpp issue #228: MINIFICPP-342: MQTT extension

Posted by minifirocks <gi...@git.apache.org>.
Github user minifirocks commented on the issue:

    https://github.com/apache/nifi-minifi-cpp/pull/228
  
    @phrocker addressed  comments, please review.


---

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

Posted by minifirocks <gi...@git.apache.org>.
Github user minifirocks commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807086
  
    --- Diff: thirdparty/paho.mqtt.c/.travis.yml ---
    @@ -0,0 +1,47 @@
    +sudo: true
    --- End diff --
    
    will remove


---

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

Posted by minifirocks <gi...@git.apache.org>.
Github user minifirocks commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807078
  
    --- Diff: extensions/mqtt/PublishMQTT.h ---
    @@ -0,0 +1,142 @@
    +/**
    + * @file PublishMQTT.h
    + * PublishMQTT class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __PUBLISH_MQTT_H__
    +#define __PUBLISH_MQTT_H__
    +
    +#include "FlowFileRecord.h"
    +#include "core/Processor.h"
    +#include "core/ProcessSession.h"
    +#include "core/Core.h"
    +#include "core/Resource.h"
    +#include "core/Property.h"
    +#include "core/logging/LoggerConfiguration.h"
    +#include "MQTTClient.h"
    +#include "AbstractMQTTProcessor.h"
    +
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
    +namespace processors {
    +
    +// PublishMQTT Class
    +class PublishMQTT: public processors::AbstractMQTTProcessor {
    +public:
    +  // Constructor
    +  /*!
    +   * Create a new processor
    +   */
    +  explicit PublishMQTT(std::string name, uuid_t uuid = NULL)
    +    : processors::AbstractMQTTProcessor(name, uuid), logger_(logging::LoggerFactory<PublishMQTT>::getLogger()) {
    +    retain_ = false;
    +    max_seg_size_ = ULLONG_MAX;
    +  }
    +  // Destructor
    +  virtual ~PublishMQTT() {
    +  }
    +  // Processor Name
    +  static constexpr char const* ProcessorName = "PublishMQTT";
    +  // Supported Properties
    +  static core::Property Retain;
    +  static core::Property MaxFlowSegSize;
    +
    +  // Nest Callback Class for read stream
    +  class ReadCallback: public InputStreamCallback {
    +  public:
    +    ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const std::string &key, MQTTClient client,
    +        int qos, bool retain, MQTTClient_deliveryToken &token) :
    +        flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), client_(client),
    +        qos_(qos), retain_(retain), token_(token) {
    +      status_ = 0;
    +      read_size_ = 0;
    +    }
    +    ~ReadCallback() {
    +    }
    +    int64_t process(std::shared_ptr<io::BaseStream> stream) {
    +      if (flow_size_ < max_seg_size_)
    +        max_seg_size_ = flow_size_;
    +      std::vector<unsigned char> buffer;
    +      buffer.reserve(max_seg_size_);
    +      read_size_ = 0;
    +      status_ = 0;
    +      while (read_size_ < flow_size_) {
    +        int readRet = stream->read(&buffer[0], max_seg_size_);
    +        if (readRet < 0) {
    +          status_ = -1;
    +          return read_size_;
    +        }
    +        if (readRet > 0) {
    +          MQTTClient_message pubmsg = MQTTClient_message_initializer;
    +          pubmsg.payload = &buffer[0];
    +          pubmsg.payloadlen = readRet;
    +          pubmsg.qos = qos_;
    +          pubmsg.retained = retain_;
    +          if (MQTTClient_publishMessage(client_, key_.c_str(), &pubmsg, &token_) != MQTTCLIENT_SUCCESS) {
    --- End diff --
    
    it add the MQTT header and call socket write.
    the deliverable callback is for QOS.
    /**
     * This is a callback function. The client application
     * must provide an implementation of this function to enable asynchronous
     * notification of delivery of messages. The function is registered with the
     * client library by passing it as an argument to MQTTClient_setCallbacks().
     * It is called by the client library after the client application has
     * published a message to the server. It indicates that the necessary
     * handshaking and acknowledgements for the requested quality of service (see
     * MQTTClient_message.qos) have been completed. This function is executed on a
     * separate thread to the one on which the client application is running.
     * <b>Note:</b>MQTTClient_deliveryComplete() is not called when messages are
     * published at QoS0.
     * @param context A pointer to the <i>context</i> value originally passed to
     * MQTTClient_setCallbacks(), which contains any application-specific context.
     * @param dt The ::MQTTClient_deliveryToken associated with
     * the published message. Applications can check that all messages have been
     * correctly published by matching the delivery tokens returned from calls to
     * MQTTClient_publish() and MQTTClient_publishMessage() with the tokens passed
     * to this callback.
     */
    typedef void MQTTClient_deliveryComplete(void* context, MQTTClient_deliveryToken dt);


---

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

Posted by minifirocks <gi...@git.apache.org>.
Github user minifirocks commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159805956
  
    --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
    @@ -0,0 +1,158 @@
    +/**
    + * @file AbstractMQTTProcessor.cpp
    + * AbstractMQTTProcessor class implementation
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#include "AbstractMQTTProcessor.h"
    +#include <stdio.h>
    +#include <memory>
    +#include <string>
    +#include "utils/TimeUtil.h"
    +#include "utils/StringUtils.h"
    +#include "core/ProcessContext.h"
    +#include "core/ProcessSession.h"
    +
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
    +namespace processors {
    +
    +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", "");
    +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. See the allowable value descriptions for more details", "true");
    +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", "");
    +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", "");
    +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", "");
    +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec");
    +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec");
    +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0");
    +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", "");
    +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship");
    +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship");
    +
    +void AbstractMQTTProcessor::initialize() {
    +  // Set the supported properties
    +  std::set<core::Property> properties;
    +  properties.insert(BrokerURL);
    +  properties.insert(CleanSession);
    +  properties.insert(ClientID);
    +  properties.insert(UserName);
    +  properties.insert(PassWord);
    +  properties.insert(KeepLiveInterval);
    +  properties.insert(ConnectionTimeOut);
    +  properties.insert(QOS);
    +  properties.insert(Topic);
    +  setSupportedProperties(properties);
    +  // Set the supported relationships
    +  std::set<core::Relationship> relationships;
    +  relationships.insert(Success);
    +  relationships.insert(Failure);
    +  setSupportedRelationships(relationships);
    +}
    +
    +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
    +  std::string value;
    +  int64_t valInt;
    +  value = "";
    +  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
    +    uri_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
    +  }
    +  value = "";
    +  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
    +    clientID_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
    +  }
    +  value = "";
    +  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
    +    topic_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
    +  }
    +  value = "";
    +  if (context->getProperty(UserName.getName(), value) && !value.empty()) {
    +    userName_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: UserName [%s]", userName_);
    +  }
    +  value = "";
    +  if (context->getProperty(PassWord.getName(), value) && !value.empty()) {
    +    passWord_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: PassWord [%s]", passWord_);
    +  }
    +  value = "";
    +  if (context->getProperty(CleanSession.getName(), value) && !value.empty() &&
    +      org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, cleanSession_)) {
    +    logger_->log_info("AbstractMQTTProcessor: CleanSession [%d]", cleanSession_);
    +  }
    +  value = "";
    +  if (context->getProperty(KeepLiveInterval.getName(), value) && !value.empty()) {
    +    core::TimeUnit unit;
    +    if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
    +      keepAliveInterval_ = valInt/1000;
    +      logger_->log_info("AbstractMQTTProcessor: KeepLiveInterval [%d]", keepAliveInterval_);
    +    }
    +  }
    +  value = "";
    +  if (context->getProperty(ConnectionTimeOut.getName(), value) && !value.empty()) {
    +    core::TimeUnit unit;
    +    if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
    +      connectionTimeOut_ = valInt/1000;
    +      logger_->log_info("AbstractMQTTProcessor: ConnectionTimeOut [%d]", connectionTimeOut_);
    +    }
    +  }
    +  value = "";
    +  if (context->getProperty(QOS.getName(), value) && !value.empty() && (value == MQTT_QOS_0 || value == MQTT_QOS_1 || MQTT_QOS_2) &&
    +      core::Property::StringToInt(value, valInt)) {
    +    qos_ = valInt;
    +    logger_->log_info("PublishKafka: QOS [%d]", qos_);
    +  }
    +  if (!client_) {
    +    MQTTClient_create(&client_, uri_.c_str(), clientID_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL);
    --- End diff --
    
    it is thread safeint 
    MQTTClient_create(MQTTClient* handle, const char* serverURI, const char* clientId,
    		int persistence_type, void* persistence_context)
    {
    	int rc = 0;
    	MQTTClients *m = NULL;
    
    	FUNC_ENTRY;
    	rc = Thread_lock_mutex(mqttclient_mutex);


---

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794541
  
    --- Diff: thirdparty/paho.mqtt.c/.travis.yml ---
    @@ -0,0 +1,47 @@
    +sudo: true
    --- End diff --
    
    this file doesn't seem like it's needed. 


---