You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2017/06/07 17:55:45 UTC
nifi-minifi-cpp git commit: MINIFI-262: Configuration Listener
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master 77a20dbe3 -> 1a1716dc6
MINIFI-262: Configuration Listener
This closes #112.
Signed-off-by: Marc Parisi <ph...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/1a1716dc
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/1a1716dc
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/1a1716dc
Branch: refs/heads/master
Commit: 1a1716dc675886689217860a299f7a8d36850adf
Parents: 77a20db
Author: Bin Qiu <be...@gmail.com>
Authored: Wed Jun 7 09:26:11 2017 -0700
Committer: Marc Parisi <ph...@apache.org>
Committed: Wed Jun 7 13:55:28 2017 -0400
----------------------------------------------------------------------
README.md | 18 ++
cmake/BuildTests.cmake | 3 +
libminifi/include/ConfigurationListener.h | 122 ++++++++++++++
libminifi/include/FlowControlProtocol.h | 4 -
libminifi/include/FlowController.h | 39 ++++-
libminifi/include/HttpConfigurationListener.h | 82 +++++++++
libminifi/include/core/FlowConfiguration.h | 9 +-
libminifi/include/core/ProcessGroup.h | 9 +-
.../core/repository/FlowFileRepository.h | 2 +-
libminifi/include/core/yaml/YamlConfiguration.h | 16 ++
libminifi/include/processors/InvokeHTTP.h | 58 +------
libminifi/include/properties/Configure.h | 11 ++
libminifi/include/utils/HTTPUtils.h | 97 +++++++++++
libminifi/src/ConfigurationListener.cpp | 131 +++++++++++++++
libminifi/src/Configure.cpp | 34 +++-
libminifi/src/FlowController.cpp | 37 +++-
libminifi/src/HttpConfigurationListener.cpp | 167 +++++++++++++++++++
libminifi/src/core/FlowConfiguration.cpp | 6 +-
libminifi/src/core/ProcessGroup.cpp | 4 +-
libminifi/src/core/yaml/YamlConfiguration.cpp | 14 +-
libminifi/src/processors/InvokeHTTP.cpp | 16 +-
.../HttpConfigurationListenerTest.cpp | 144 ++++++++++++++++
22 files changed, 938 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index acaf8dd..0fb25f3 100644
--- a/README.md
+++ b/README.md
@@ -323,6 +323,24 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc
host: localhost
port uuid: 471deef6-2a6e-4a7d-912a-81cc17e3a204
batch size: 100
+
+### Http Configuration Listener
+
+ Http Configuration Listener will pull flow file configuration from the remote command control server,
+ validate and apply the new flow configuration
+
+ in minifi.properties
+
+ nifi.configuration.listener.type=http
+ nifi.configuration.listener.http.url=https://localhost:8080
+ nifi.configuration.listener.pull.interval=1 sec
+ nifi.configuration.listener.client.ca.certificate=./conf/nifi-cert.pem
+
+ if you want to enable client certificate
+ nifi.configuration.listener.need.ClientAuth=true
+ nifi.configuration.listener.client.certificate=./conf/client.pem
+ nifi.configuration.listener.client.private.key=./conf/client.key
+ nifi.configuration.listener.client.pass.phrase=./conf/password
### Controller Services
If you need to reference a controller service in your config.yml file, use the following template. In the example, below, ControllerServiceClass is the name of the class defining the controller Service. ControllerService1
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/cmake/BuildTests.cmake
----------------------------------------------------------------------
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index bad01d3..aedae10 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -36,6 +36,7 @@ function(createTests testName)
target_include_directories(${testName} PRIVATE BEFORE "thirdparty/spdlog-0.13.0/include")
target_include_directories(${testName} PRIVATE BEFORE "thirdparty/yaml-cpp-yaml-cpp-0.5.3/include")
target_include_directories(${testName} PRIVATE BEFORE "thirdparty/jsoncpp/include")
+ target_include_directories(${testName} PRIVATE BEFORE "thirdparty/civetweb-1.9.1/include")
target_include_directories(${testName} PRIVATE BEFORE ${LEVELDB_INCLUDE_DIRS})
target_include_directories(${testName} PRIVATE BEFORE "include")
target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/")
@@ -87,6 +88,8 @@ add_test(NAME ControllerServiceIntegrationTests COMMAND ControllerServiceIntegra
add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
+add_test(NAME HttpConfigurationListenerTest COMMAND HttpConfigurationListenerTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
+
add_test(NAME HttpGetIntegrationTestSecure COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGetSecure.yml" "${TEST_RESOURCES}/")
add_test(NAME HttpPostIntegrationTest COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPost.yml" )
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/ConfigurationListener.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ConfigurationListener.h b/libminifi/include/ConfigurationListener.h
new file mode 100644
index 0000000..5574226
--- /dev/null
+++ b/libminifi/include/ConfigurationListener.h
@@ -0,0 +1,122 @@
+/**
+ * ConfigurationListener class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __CONFIGURATION_LISTENER__
+#define __CONFIGURATION_LISTENER__
+
+#include <memory>
+#include <atomic>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <string>
+#include <thread>
+
+#include "yaml-cpp/yaml.h"
+#include "core/Core.h"
+#include "core/Property.h"
+#include "properties/Configure.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+// Forwarder declaration
+class FlowController;
+// ConfigurationListener Class
+class ConfigurationListener {
+public:
+
+ // Constructor
+ /*!
+ * Create a new processor
+ */
+ ConfigurationListener(std::shared_ptr<FlowController> controller,
+ std::shared_ptr<Configure> configure, std::string type) :
+ connect_timeout_(20000), read_timeout_(20000), type_(type), configure_(
+ configure), controller_(controller), need_client_certificate_(false) {
+ logger_ = logging::LoggerFactory<ConfigurationListener>::getLogger();
+ running_ = false;
+ }
+ // Destructor
+ virtual ~ConfigurationListener() {
+ stop();
+ }
+
+ // Start the thread
+ void start();
+ // Stop the thread
+ void stop();
+ // whether the thread is enable
+ bool isRunning() {
+ return running_;
+ }
+ // pull the new configuration from the remote host
+ virtual bool pullConfiguration(std::string &configuration) {
+ return false;
+ }
+
+protected:
+
+ // Run function for the thread
+ void run();
+
+ // Run function for the thread
+ void threadExecutor() {
+ run();
+ }
+
+ // Mutex for protection
+ std::mutex mutex_;
+ // thread
+ std::thread thread_;
+ // whether the thread is running
+ std::atomic<bool> running_;
+
+ // url
+ std::string url_;
+ // connection timeout
+ int64_t connect_timeout_;
+ // read timeout.
+ int64_t read_timeout_;
+ // pull interval
+ int64_t pull_interval_;
+ // type (http/rest)
+ std::string type_;
+ // last applied configuration
+ std::string lastAppliedConfiguration;
+
+ std::shared_ptr<Configure> configure_;
+ std::shared_ptr<logging::Logger> logger_;
+ std::shared_ptr<FlowController> controller_;
+
+ bool need_client_certificate_;
+ std::string certificate_;
+ std::string private_key_;
+ std::string passphrase_;
+ std::string ca_certificate_;
+};
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/FlowControlProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowControlProtocol.h b/libminifi/include/FlowControlProtocol.h
index 8992049..c0781b8 100644
--- a/libminifi/include/FlowControlProtocol.h
+++ b/libminifi/include/FlowControlProtocol.h
@@ -218,10 +218,6 @@ class FlowControlProtocol {
}
// Run function for the thread
static void run(FlowControlProtocol *protocol);
- // set 8 bytes SerialNumber
- void setSerialNumber(uint8_t *number) {
- memcpy(_serialNumber, number, 8);
- }
protected:
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/FlowController.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index dc0d610..6ea802c 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -21,6 +21,7 @@
#define __FLOW_CONTROLLER_H__
#include <uuid/uuid.h>
+#include <stdio.h>
#include <vector>
#include <queue>
#include <map>
@@ -43,6 +44,8 @@
#include "TimerDrivenSchedulingAgent.h"
#include "EventDrivenSchedulingAgent.h"
#include "FlowControlProtocol.h"
+#include "ConfigurationListener.h"
+#include "HttpConfigurationListener.h"
#include "core/Property.h"
@@ -127,9 +130,36 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
root_->updatePropertyValue(processorName, propertyName, propertyValue);
}
- // set 8 bytes SerialNumber
- virtual void setSerialNumber(uint8_t *number) {
- protocol_->setSerialNumber(number);
+ // set SerialNumber
+ void setSerialNumber(std::string number) {
+ serial_number_ = number;
+ }
+
+ // get serial number as string
+ std::string getSerialNumber() {
+ return serial_number_;
+ }
+
+ // validate and apply passing yaml configuration payload
+ // first it will validate the payload with the current root node config for flowController
+ // like FlowController id/name is the same and new version is greater than the current version
+ // after that, it will apply the configuration
+ bool applyConfiguration(std::string &configurePayload);
+
+ // get name
+ std::string getName() {
+ if (root_ != nullptr)
+ return root_->getName();
+ else
+ return "";
+ }
+
+ // get version
+ int getVersion() {
+ if (root_ != nullptr)
+ return root_->getVersion();
+ else
+ return 0;
}
/**
@@ -292,6 +322,9 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
private:
std::shared_ptr<logging::Logger> logger_;
+ // http configuration listener object.
+ std::unique_ptr<HttpConfigurationListener> http_configuration_listener_;
+ std::string serial_number_;
};
} /* namespace minifi */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/HttpConfigurationListener.h
----------------------------------------------------------------------
diff --git a/libminifi/include/HttpConfigurationListener.h b/libminifi/include/HttpConfigurationListener.h
new file mode 100644
index 0000000..72d4728
--- /dev/null
+++ b/libminifi/include/HttpConfigurationListener.h
@@ -0,0 +1,82 @@
+/**
+ * HttpConfigurationListener class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __HTTP_CONFIGURATION_LISTENER__
+#define __HTTP_CONFIGURATION_LISTENER__
+
+#include <curl/curl.h>
+#include "core/Core.h"
+#include "core/Property.h"
+#include "ConfigurationListener.h"
+#include "utils/HTTPUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+// HttpConfigurationListener Class
+class HttpConfigurationListener: public ConfigurationListener {
+public:
+
+ // Constructor
+ /*!
+ * Create a new processor
+ */
+ HttpConfigurationListener(std::shared_ptr<FlowController> controller,
+ std::shared_ptr<Configure> configure) :
+ minifi::ConfigurationListener(controller, configure, "http") {
+ std::string value;
+
+ if (configure->get(Configure::nifi_configuration_listener_http_url, value)) {
+ url_ = value;
+ logger_->log_info("Http configuration listener URL %s", url_.c_str());
+ } else {
+ url_ = "";
+ }
+
+ curl_global_init(CURL_GLOBAL_DEFAULT);
+ this->start();
+ }
+
+ bool pullConfiguration(std::string &configuration);
+
+ /**
+ * Configures a secure connection
+ */
+ void configureSecureConnection(CURL *http_session);
+
+ static CURLcode configureSSLContext(CURL *curl, void *ctx, void *param);
+ static int pemPassWordCb(char *buf, int size, int rwflag, void *param);
+
+ // Destructor
+ virtual ~HttpConfigurationListener() {
+ this->stop();
+ curl_global_cleanup();
+ }
+
+protected:
+
+};
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/core/FlowConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h
index edcb2b6..6e2b700 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -75,8 +75,8 @@ class FlowConfiguration : public CoreComponent {
// Create Processor (Node/Input/Output Port) based on the name
std::shared_ptr<core::Processor> createProcessor(std::string name, uuid_t uuid);
// Create Root Processor Group
- std::unique_ptr<core::ProcessGroup> createRootProcessGroup(std::string name, uuid_t uuid);
-
+ std::unique_ptr<core::ProcessGroup> createRootProcessGroup(std::string name,
+ uuid_t uuid, int version);
std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(const std::string &class_name, const std::string &name, uuid_t uuid);
// Create Remote Processor Group
@@ -98,6 +98,11 @@ class FlowConfiguration : public CoreComponent {
return getRoot(config_path_);
}
+ virtual std::unique_ptr<core::ProcessGroup> getRootFromPayload(
+ std::string &yamlConfigPayload) {
+ return nullptr;
+ }
+
/**
* Base implementation that returns a null root pointer.
* @return Extensions should return a non-null pointer in order to
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/core/ProcessGroup.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
index f54f5b4..4978886 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -55,7 +55,8 @@ class ProcessGroup {
/*!
* Create a new process group
*/
- ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid = NULL, ProcessGroup *parent = NULL);
+ ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid = NULL, int version = 0,
+ ProcessGroup *parent = NULL);
// Destructor
virtual ~ProcessGroup();
// Set Processor Name
@@ -109,6 +110,10 @@ class ProcessGroup {
} else
return false;
}
+ // getVersion
+ int getVersion() {
+ return version_;
+ }
// Start Processing
void startProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler);
// Stop Processing
@@ -165,6 +170,8 @@ class ProcessGroup {
uuid_t uuid_;
// Processor Group Name
std::string name_;
+ // version
+ int version_;
// Process Group Type
ProcessGroupType type_;
// Processors (ProcessNode) inside this process group which include Input/Output Port, Remote Process Group input/Output port
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/core/repository/FlowFileRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/FlowFileRepository.h b/libminifi/include/core/repository/FlowFileRepository.h
index b7c43b2..2e19286 100644
--- a/libminifi/include/core/repository/FlowFileRepository.h
+++ b/libminifi/include/core/repository/FlowFileRepository.h
@@ -37,7 +37,7 @@ namespace repository {
#define FLOWFILE_REPOSITORY_DIRECTORY "./flowfile_repository"
#define MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE (10*1024*1024) // 10M
#define MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute
-#define FLOWFILE_REPOSITORY_PURGE_PERIOD (2500) // 2500 msec
+#define FLOWFILE_REPOSITORY_PURGE_PERIOD (2000) // 2000 msec
/**
* Flow File repository
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/core/yaml/YamlConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h
index 3bfaefd..61bf271 100644
--- a/libminifi/include/core/yaml/YamlConfiguration.h
+++ b/libminifi/include/core/yaml/YamlConfiguration.h
@@ -91,6 +91,22 @@ class YamlConfiguration : public FlowConfiguration {
return getRoot(&rootYamlNode);
}
+ /**
+ * Returns a shared pointer to a ProcessGroup object containing the
+ * flow configuration. The yamlConfigPayload argument must be
+ * a payload for the raw YAML configuration.
+ *
+ * @param yamlConfigPayload an input payload for the raw YAML configuration
+ * to be parsed and loaded into the flow
+ * configuration tree
+ * @return the root ProcessGroup node of the flow
+ * configuration tree
+ */
+ std::unique_ptr<core::ProcessGroup> getRootFromPayload(std::string &yamlConfigPayload) {
+ YAML::Node rootYamlNode = YAML::Load(yamlConfigPayload);
+ return getRoot(&rootYamlNode);
+ }
+
protected:
/**
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/processors/InvokeHTTP.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/InvokeHTTP.h b/libminifi/include/processors/InvokeHTTP.h
index ab78fd5..c8e0c10 100644
--- a/libminifi/include/processors/InvokeHTTP.h
+++ b/libminifi/include/processors/InvokeHTTP.h
@@ -32,6 +32,7 @@
#include "controllers/SSLContextService.h"
#include "utils/ByteInputCallBack.h"
#include "core/logging/LoggerConfiguration.h"
+#include "utils/HTTPUtils.h"
namespace org {
namespace apache {
@@ -39,63 +40,6 @@ namespace nifi {
namespace minifi {
namespace processors {
-struct CallBackPosition {
- utils::ByteInputCallBack *ptr;
- size_t pos;
-};
-
-/**
- * HTTP Response object
- */
-struct HTTPRequestResponse {
- std::vector<char> data;
-
- /**
- * Receive HTTP Response.
- */
- static size_t recieve_write(char * data, size_t size, size_t nmemb, void * p) {
- return static_cast<HTTPRequestResponse*>(p)->write_content(data, size, nmemb);
- }
-
- /**
- * Callback for post, put, and patch operations
- * @param buffer
- * @param size size of buffer
- * @param nitems items to add
- * @param insteam input stream object.
- */
-
- static size_t send_write(char * data, size_t size, size_t nmemb, void * p) {
- if (p != 0) {
- CallBackPosition *callback = (CallBackPosition*) p;
- if (callback->pos <= callback->ptr->getBufferSize()) {
- char *ptr = callback->ptr->getBuffer();
- int len = callback->ptr->getBufferSize() - callback->pos;
- if (len <= 0) {
- delete callback->ptr;
- delete callback;
- return 0;
- }
- if (len > size * nmemb)
- len = size * nmemb;
- memcpy(data, callback->ptr->getBuffer() + callback->pos, len);
- callback->pos += len;
- return len;
- }
- } else {
- return CURL_READFUNC_ABORT;
- }
-
- return 0;
- }
-
- size_t write_content(char* ptr, size_t size, size_t nmemb) {
- data.insert(data.end(), ptr, ptr + size * nmemb);
- return size * nmemb;
- }
-
-};
-
// InvokeHTTP Class
class InvokeHTTP : public core::Processor {
public:
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/properties/Configure.h
----------------------------------------------------------------------
diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h
index 58f6679..fa19a18 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -54,10 +54,21 @@ class Configure : public Properties {
static const char *nifi_flowfile_repository_enable;
static const char *nifi_remote_input_secure;
static const char *nifi_security_need_ClientAuth;
+ // site2site security config
static const char *nifi_security_client_certificate;
static const char *nifi_security_client_private_key;
static const char *nifi_security_client_pass_phrase;
static const char *nifi_security_client_ca_certificate;
+ static const char *nifi_configuration_listener_pull_interval;
+ static const char *nifi_configuration_listener_http_url;
+ static const char *nifi_configuration_listener_rest_url;
+ static const char *nifi_configuration_listener_type; // http or rest
+ // configuration listener security config
+ static const char *nifi_configuration_listener_need_ClientAuth;
+ static const char *nifi_configuration_listener_client_certificate;
+ static const char *nifi_configuration_listener_private_key;
+ static const char *nifi_configuration_listener_client_pass_phrase;
+ static const char *nifi_configuration_listener_client_ca_certificate;
};
} /* namespace minifi */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/utils/HTTPUtils.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/HTTPUtils.h b/libminifi/include/utils/HTTPUtils.h
new file mode 100644
index 0000000..3f20f5e
--- /dev/null
+++ b/libminifi/include/utils/HTTPUtils.h
@@ -0,0 +1,97 @@
+/**
+ * HTTPUtils class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __HTTP_UTILS_H__
+#define __HTTP_UTILS_H__
+
+#include <curl/curl.h>
+#include <vector>
+#include "ByteInputCallBack.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+struct CallBackPosition {
+ ByteInputCallBack *ptr;
+ size_t pos;
+};
+
+/**
+ * HTTP Response object
+ */
+struct HTTPRequestResponse {
+ std::vector<char> data;
+
+ /**
+ * Receive HTTP Response.
+ */
+ static size_t recieve_write(char * data, size_t size, size_t nmemb,
+ void * p) {
+ return static_cast<HTTPRequestResponse*>(p)->write_content(data, size,
+ nmemb);
+ }
+
+ /**
+ * Callback for post, put, and patch operations
+ * @param buffer
+ * @param size size of buffer
+ * @param nitems items to add
+ * @param insteam input stream object.
+ */
+
+ static size_t send_write(char * data, size_t size, size_t nmemb, void * p) {
+ if (p != 0) {
+ CallBackPosition *callback = (CallBackPosition*) p;
+ if (callback->pos <= callback->ptr->getBufferSize()) {
+ char *ptr = callback->ptr->getBuffer();
+ int len = callback->ptr->getBufferSize() - callback->pos;
+ if (len <= 0) {
+ delete callback->ptr;
+ delete callback;
+ return 0;
+ }
+ if (len > size * nmemb)
+ len = size * nmemb;
+ memcpy(data, callback->ptr->getBuffer() + callback->pos, len);
+ callback->pos += len;
+ return len;
+ }
+ } else {
+ return CURL_READFUNC_ABORT;
+ }
+
+ return 0;
+ }
+
+ size_t write_content(char* ptr, size_t size, size_t nmemb) {
+ data.insert(data.end(), ptr, ptr + size * nmemb);
+ return size * nmemb;
+ }
+
+};
+
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/src/ConfigurationListener.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ConfigurationListener.cpp b/libminifi/src/ConfigurationListener.cpp
new file mode 100644
index 0000000..d52a088
--- /dev/null
+++ b/libminifi/src/ConfigurationListener.cpp
@@ -0,0 +1,131 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ConfigurationListener.h"
+#include "FlowController.h"
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+#include <string>
+#include <memory>
+#include <utility>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+void ConfigurationListener::start() {
+ if (running_)
+ return;
+
+ pull_interval_ = 60 * 1000;
+ std::string value;
+ // grab the value for configuration
+ if (configure_->get(Configure::nifi_configuration_listener_pull_interval,
+ value)) {
+ core::TimeUnit unit;
+ if (core::Property::StringToTime(value, pull_interval_, unit)
+ && core::Property::ConvertTimeUnitToMS(pull_interval_, unit,
+ pull_interval_)) {
+ logger_->log_info("Configuration Listener pull interval: [%d] ms",
+ pull_interval_);
+ }
+ }
+
+ std::string clientAuthStr;
+ if (configure_->get(Configure::nifi_configuration_listener_need_ClientAuth, clientAuthStr)) {
+ org::apache::nifi::minifi::utils::StringUtils::StringToBool(clientAuthStr, this->need_client_certificate_);
+ }
+
+ if (configure_->get(
+ Configure::nifi_configuration_listener_client_ca_certificate,
+ this->ca_certificate_)) {
+ logger_->log_info("Configuration Listener CA certificates: [%s]",
+ this->ca_certificate_);
+ }
+
+ if (this->need_client_certificate_) {
+ std::string passphrase_file;
+
+ if (!(configure_->get(
+ Configure::nifi_configuration_listener_client_certificate, this->certificate_)
+ && configure_->get(Configure::nifi_configuration_listener_private_key,
+ this->private_key_))) {
+ logger_->log_error(
+ "Certificate and Private Key PEM file not configured for configuration listener, error: %s.",
+ std::strerror(errno));
+ }
+
+ if (configure_->get(
+ Configure::nifi_configuration_listener_client_pass_phrase,
+ passphrase_file)) {
+ // load the passphase from file
+ std::ifstream file(passphrase_file.c_str(), std::ifstream::in);
+ if (file.good()) {
+ this->passphrase_.assign((std::istreambuf_iterator<char>(file)),
+ std::istreambuf_iterator<char>());
+ file.close();
+ }
+ }
+
+ logger_->log_info("Configuration Listener certificate: [%s], private key: [%s], passphrase file: [%s]",
+ this->certificate_, this->private_key_, passphrase_file);
+ }
+
+ thread_ = std::thread(&ConfigurationListener::threadExecutor, this);
+ thread_.detach();
+ running_ = true;
+ logger_->log_info("%s ConfigurationListener Thread Start", type_);
+}
+
+void ConfigurationListener::stop() {
+ if (!running_)
+ return;
+ running_ = false;
+ if (thread_.joinable())
+ thread_.join();
+ logger_->log_info("%s ConfigurationListener Thread Stop", type_);
+}
+
+void ConfigurationListener::run() {
+ std::unique_lock<std::mutex> lk(mutex_);
+ std::condition_variable cv;
+ int64_t interval = 0;
+ while (!cv.wait_for(lk, std::chrono::milliseconds(100), [this] {return (running_ == false);})) {
+ interval += 100;
+ if (interval >= pull_interval_) {
+ std::string payload;
+ bool ret = false;
+ ret = pullConfiguration(payload);
+ if (ret) {
+ if (payload.empty() || payload == lastAppliedConfiguration) {
+ interval = 0;
+ continue;
+ }
+ ret = this->controller_->applyConfiguration(payload);
+ if (ret)
+ this->lastAppliedConfiguration = payload;
+ }
+ interval = 0;
+ }
+ }
+}
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/src/Configure.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index d8e049c..e1bc225 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -43,11 +43,35 @@ const char *Configure::nifi_flowfile_repository_max_storage_size = "nifi.flowfil
const char *Configure::nifi_flowfile_repository_max_storage_time = "nifi.flowfile.repository.max.storage.time";
const char *Configure::nifi_flowfile_repository_directory_default = "nifi.flowfile.repository.directory.default";
const char *Configure::nifi_remote_input_secure = "nifi.remote.input.secure";
-const char *Configure::nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth";
-const char *Configure::nifi_security_client_certificate = "nifi.security.client.certificate";
-const char *Configure::nifi_security_client_private_key = "nifi.security.client.private.key";
-const char *Configure::nifi_security_client_pass_phrase = "nifi.security.client.pass.phrase";
-const char *Configure::nifi_security_client_ca_certificate = "nifi.security.client.ca.certificate";
+const char *Configure::nifi_security_need_ClientAuth =
+ "nifi.security.need.ClientAuth";
+const char *Configure::nifi_security_client_certificate =
+ "nifi.security.client.certificate";
+const char *Configure::nifi_security_client_private_key =
+ "nifi.security.client.private.key";
+const char *Configure::nifi_security_client_pass_phrase =
+ "nifi.security.client.pass.phrase";
+const char *Configure::nifi_security_client_ca_certificate =
+ "nifi.security.client.ca.certificate";
+const char *Configure::nifi_configuration_listener_pull_interval =
+ "nifi.configuration.listener.pull.interval";
+const char *Configure::nifi_configuration_listener_http_url =
+ "nifi.configuration.listener.http.url";
+const char *Configure::nifi_configuration_listener_rest_url =
+ "nifi.configuration.listener.rest.url";
+const char *Configure::nifi_configuration_listener_type =
+ "nifi.configuration.listener.type";
+const char *Configure::nifi_configuration_listener_need_ClientAuth =
+ "nifi.configuration.listener.need.ClientAuth";
+const char *Configure::nifi_configuration_listener_client_certificate =
+ "nifi.configuration.listener.client.certificate";
+const char *Configure::nifi_configuration_listener_private_key =
+ "nifi.configuration.listener.client.private.key";
+const char *Configure::nifi_configuration_listener_client_pass_phrase =
+ "nifi.configuration.listener.client.pass.phrase";
+const char *Configure::nifi_configuration_listener_client_ca_certificate =
+ "nifi.configuration.listener.client.ca.certificate";
+
} /* namespace minifi */
} /* namespace nifi */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 62cf21c..fd75fdd 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -33,6 +33,7 @@
#include <utility>
#include <memory>
#include <string>
+#include "yaml-cpp/yaml.h"
#include "core/ProcessContext.h"
#include "core/ProcessGroup.h"
#include "utils/StringUtils.h"
@@ -152,6 +153,31 @@ FlowController::~FlowController() {
provenance_repo_ = nullptr;
}
+bool FlowController::applyConfiguration(std::string &configurePayload) {
+ std::unique_ptr<core::ProcessGroup> newRoot;
+ try {
+ newRoot = std::move(flow_configuration_->getRootFromPayload(configurePayload));
+ }
+ catch (const YAML::Exception& e) {
+ logger_->log_error("Invalid configuration payload");
+ return false;
+ }
+
+ if (newRoot == nullptr)
+ return false;
+
+ logger_->log_info("Starting to reload Flow Controller with flow control name %s, version %d",
+ newRoot->getName().c_str(), newRoot->getVersion());
+
+ std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
+ stop(true);
+ waitUnload(30000);
+ this->root_ = std::move(newRoot);
+ loadFlowRepo();
+ initialized_ = true;
+ return start();
+}
+
void FlowController::stop(bool force) {
std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
if (running_) {
@@ -164,7 +190,7 @@ void FlowController::stop(bool force) {
this->flow_file_repo_->stop();
this->provenance_repo_->stop();
// Wait for sometime for thread stop
- std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ std::this_thread::sleep_for(std::chrono::milliseconds(3000));
if (this->root_)
this->root_->stopProcessing(this->timer_scheduler_.get(), this->event_scheduler_.get());
}
@@ -214,6 +240,15 @@ void FlowController::load() {
stop(true);
}
if (!initialized_) {
+ std::string listenerType;
+ // grab the value for configuration
+ if (this->http_configuration_listener_ == nullptr && configuration_->get(Configure::nifi_configuration_listener_type, listenerType)) {
+ if (listenerType == "http") {
+ this->http_configuration_listener_ =
+ std::unique_ptr<minifi::HttpConfigurationListener>(new minifi::HttpConfigurationListener(shared_from_this(), configuration_));
+ }
+ }
+
logger_->log_info("Initializing timers");
if (nullptr == timer_scheduler_) {
timer_scheduler_ = std::make_shared<TimerDrivenSchedulingAgent>(std::static_pointer_cast<core::controller::ControllerServiceProvider>(shared_from_this()), provenance_repo_, configuration_);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/src/HttpConfigurationListener.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/HttpConfigurationListener.cpp b/libminifi/src/HttpConfigurationListener.cpp
new file mode 100644
index 0000000..70d5793
--- /dev/null
+++ b/libminifi/src/HttpConfigurationListener.cpp
@@ -0,0 +1,167 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "HttpConfigurationListener.h"
+#include "FlowController.h"
+#include <curl/curlbuild.h>
+#include <curl/easy.h>
+#include <iostream>
+#include <iterator>
+#include <string>
+#include <vector>
+#include <utility>
+
+#include "core/logging/Logger.h"
+#include "core/ProcessContext.h"
+#include "core/Relationship.h"
+#include "io/DataStream.h"
+#include "io/StreamFactory.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+int HttpConfigurationListener::pemPassWordCb(char *buf, int size, int rwflag,
+ void *param) {
+ minifi::HttpConfigurationListener *listener =
+ static_cast<minifi::HttpConfigurationListener*>(param);
+
+ if (listener->passphrase_.length() > 0) {
+ memset(buf, 0x00, size);
+ memcpy(buf, listener->passphrase_.c_str(),
+ listener->passphrase_.length() - 1);
+ return listener->passphrase_.length() - 1;
+ }
+ return 0;
+}
+
+CURLcode HttpConfigurationListener::configureSSLContext(CURL *curl, void *ctx,
+ void *param) {
+ minifi::HttpConfigurationListener *listener =
+ static_cast<minifi::HttpConfigurationListener*>(param);
+ SSL_CTX* sslCtx = static_cast<SSL_CTX*>(ctx);
+
+ SSL_CTX_load_verify_locations(sslCtx, listener->ca_certificate_.c_str(), 0);
+ SSL_CTX_use_certificate_file(sslCtx, listener->certificate_.c_str(),
+ SSL_FILETYPE_PEM);
+ SSL_CTX_set_default_passwd_cb(sslCtx,
+ HttpConfigurationListener::pemPassWordCb);
+ SSL_CTX_set_default_passwd_cb_userdata(sslCtx, param);
+ SSL_CTX_use_PrivateKey_file(sslCtx, listener->private_key_.c_str(),
+ SSL_FILETYPE_PEM);
+ // verify private key
+ if (!SSL_CTX_check_private_key(sslCtx)) {
+ listener->logger_->log_error(
+ "Private key does not match the public certificate, error : %s",
+ std::strerror(errno));
+ return CURLE_FAILED_INIT;
+ }
+
+ listener->logger_->log_debug(
+ "HttpConfigurationListener load Client Certificates OK");
+ return CURLE_OK;
+}
+
+void HttpConfigurationListener::configureSecureConnection(CURL *http_session) {
+ curl_easy_setopt(http_session, CURLOPT_VERBOSE, 1L);
+ curl_easy_setopt(http_session, CURLOPT_CAINFO, this->ca_certificate_.c_str());
+ curl_easy_setopt(http_session, CURLOPT_SSLCERTTYPE, "PEM");
+ curl_easy_setopt(http_session, CURLOPT_SSL_VERIFYPEER, 1L);
+ if (this->need_client_certificate_) {
+ CURLcode ret;
+ ret = curl_easy_setopt(http_session, CURLOPT_SSL_CTX_FUNCTION,
+ &HttpConfigurationListener::configureSSLContext);
+ if (ret != CURLE_OK)
+ logger_->log_error("CURLOPT_SSL_CTX_FUNCTION not supported %d", ret);
+ curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA,
+ static_cast<void*>(this));
+ curl_easy_setopt(http_session, CURLOPT_SSLKEYTYPE, "PEM");
+ }
+}
+
+bool HttpConfigurationListener::pullConfiguration(std::string &configuration) {
+ if (url_.empty())
+ return false;
+
+ bool ret = false;
+
+ std::string fullUrl = url_;
+
+ CURL *http_session = curl_easy_init();
+
+ curl_easy_setopt(http_session, CURLOPT_URL, fullUrl.c_str());
+
+ if (connect_timeout_ > 0) {
+ curl_easy_setopt(http_session, CURLOPT_TIMEOUT, connect_timeout_);
+ }
+
+ if (read_timeout_ > 0) {
+ curl_easy_setopt(http_session, CURLOPT_TIMEOUT, read_timeout_);
+ }
+
+ if (fullUrl.find("https") != std::string::npos) {
+ configureSecureConnection(http_session);
+ }
+
+ utils::HTTPRequestResponse content;
+ curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION,
+ &utils::HTTPRequestResponse::recieve_write);
+
+ curl_easy_setopt(http_session, CURLOPT_WRITEDATA,
+ static_cast<void*>(&content));
+
+ CURLcode res = curl_easy_perform(http_session);
+
+ if (res == CURLE_OK) {
+ logger_->log_debug("HttpConfigurationListener -- curl successful to %s",
+ fullUrl.c_str());
+
+ std::string response_body(content.data.begin(), content.data.end());
+ int64_t http_code = 0;
+ curl_easy_getinfo(http_session, CURLINFO_RESPONSE_CODE, &http_code);
+ char *content_type;
+ /* ask for the content-type */
+ curl_easy_getinfo(http_session, CURLINFO_CONTENT_TYPE, &content_type);
+
+ bool isSuccess = ((int32_t) (http_code / 100)) == 2
+ && res != CURLE_ABORTED_BY_CALLBACK;
+ bool body_empty = IsNullOrEmpty(content.data);
+
+ if (isSuccess && !body_empty) {
+ configuration = std::move(response_body);
+ logger_->log_debug("config %s", configuration.c_str());
+ ret = true;
+ } else {
+ logger_->log_error("Cannot output body to content");
+ }
+ } else {
+ logger_->log_error(
+ "HttpConfigurationListener -- curl_easy_perform() failed %s\n",
+ curl_easy_strerror(res));
+ }
+ curl_easy_cleanup(http_session);
+
+ return ret;
+}
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/src/core/FlowConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index 6635701..cc6e0e5 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -53,8 +53,10 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProvenanceReportTask()
return processor;
}
-std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(std::string name, uuid_t uuid) {
- return std::unique_ptr<core::ProcessGroup>(new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid));
+std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(
+ std::string name, uuid_t uuid, int version) {
+ return std::unique_ptr<core::ProcessGroup>(
+ new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid, version));
}
std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRemoteProcessGroup(std::string name, uuid_t uuid) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/src/core/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index 9e6778c..7ac139b 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -37,10 +37,12 @@ namespace nifi {
namespace minifi {
namespace core {
-ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, ProcessGroup *parent)
+ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, int version,
+ ProcessGroup *parent)
: logger_(logging::LoggerFactory<ProcessGroup>::getLogger()),
name_(name),
type_(type),
+ version_(version),
parent_process_group_(parent) {
if (!uuid)
// Generate the global UUID for the flow record
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/src/core/yaml/YamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index a11db2b..44aec12 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -31,14 +31,24 @@ namespace core {
core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node rootFlowNode) {
uuid_t uuid;
+ int64_t version = 0;
checkRequiredField(&rootFlowNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
std::string flowName = rootFlowNode["name"].as<std::string>();
std::string id = getOrGenerateId(&rootFlowNode);
uuid_parse(id.c_str(), uuid);
- logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", id, flowName);
- std::unique_ptr<core::ProcessGroup> group = FlowConfiguration::createRootProcessGroup(flowName, uuid);
+ if (rootFlowNode["version"]) {
+ std::string value = rootFlowNode["version"].as<std::string>();
+ if (core::Property::StringToInt(value, version)) {
+ logger_->log_debug("parseRootProcessorGroup: version => [%d]", version);
+ }
+ }
+
+ logger_->log_debug(
+ "parseRootProcessGroup: id => [%s], name => [%s]", id, flowName);
+ std::unique_ptr<core::ProcessGroup> group =
+ FlowConfiguration::createRootProcessGroup(flowName, uuid, version);
this->name_ = flowName;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/src/processors/InvokeHTTP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/InvokeHTTP.cpp b/libminifi/src/processors/InvokeHTTP.cpp
index fd39a64..c636201 100644
--- a/libminifi/src/processors/InvokeHTTP.cpp
+++ b/libminifi/src/processors/InvokeHTTP.cpp
@@ -315,8 +315,9 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *
if (read_timeout_ > 0) {
curl_easy_setopt(http_session, CURLOPT_TIMEOUT, read_timeout_);
}
- HTTPRequestResponse content;
- curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION, &HTTPRequestResponse::recieve_write);
+ utils::HTTPRequestResponse content;
+ curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION,
+ &utils::HTTPRequestResponse::recieve_write);
curl_easy_setopt(http_session, CURLOPT_WRITEDATA, static_cast<void*>(&content));
@@ -326,14 +327,17 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *
if (claim) {
utils::ByteInputCallBack *callback = new utils::ByteInputCallBack();
session->read(flowFile, callback);
- CallBackPosition *callbackObj = new CallBackPosition;
+ utils::CallBackPosition *callbackObj = new utils::CallBackPosition;
callbackObj->ptr = callback;
callbackObj->pos = 0;
logger_->log_info("InvokeHTTP -- Setting callback");
curl_easy_setopt(http_session, CURLOPT_UPLOAD, 1L);
- curl_easy_setopt(http_session, CURLOPT_INFILESIZE_LARGE, (curl_off_t)callback->getBufferSize());
- curl_easy_setopt(http_session, CURLOPT_READFUNCTION, &HTTPRequestResponse::send_write);
- curl_easy_setopt(http_session, CURLOPT_READDATA, static_cast<void*>(callbackObj));
+ curl_easy_setopt(http_session, CURLOPT_INFILESIZE_LARGE,
+ (curl_off_t)callback->getBufferSize());
+ curl_easy_setopt(http_session, CURLOPT_READFUNCTION,
+ &utils::HTTPRequestResponse::send_write);
+ curl_easy_setopt(http_session, CURLOPT_READDATA,
+ static_cast<void*>(callbackObj));
} else {
logger_->log_error("InvokeHTTP -- no resource claim");
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/test/integration/HttpConfigurationListenerTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/HttpConfigurationListenerTest.cpp b/libminifi/test/integration/HttpConfigurationListenerTest.cpp
new file mode 100644
index 0000000..a86b884
--- /dev/null
+++ b/libminifi/test/integration/HttpConfigurationListenerTest.cpp
@@ -0,0 +1,144 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "../TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "../include/core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "HttpConfigurationListener.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "../unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "CivetServer.h"
+#include <cstring>
+
+void waitToVerifyProcessor() {
+ std::this_thread::sleep_for(std::chrono::seconds(10));
+}
+
+class ConfigHandler: public CivetHandler {
+ public:
+ bool handleGet(CivetServer *server, struct mg_connection *conn) {
+ std::ifstream myfile(test_file_location_.c_str());
+
+ if (myfile.is_open()) {
+ std::stringstream buffer;
+ buffer << myfile.rdbuf();
+ std::string str = buffer.str();
+ myfile.close();
+ mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+ "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+ str.length());
+ mg_printf(conn, "%s", str.c_str());
+ } else {
+ mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
+ }
+
+ return true;
+ }
+ std::string test_file_location_;
+};
+
+int main(int argc, char **argv) {
+ LogTestController::getInstance().setInfo<minifi::ConfigurationListener>();
+ LogTestController::getInstance().setInfo<minifi::FlowController>();
+ LogTestController::getInstance().setInfo<minifi::HttpConfigurationListener>();
+
+ const char *options[] = { "document_root", ".", "listening_ports", "9090", 0 };
+ std::vector < std::string > cpp_options;
+ for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
+ cpp_options.push_back(options[i]);
+ }
+
+ CivetServer server(cpp_options);
+ ConfigHandler h_ex;
+ server.addHandler("/config", h_ex);
+ LogTestController::getInstance().setDebug<minifi::ConfigurationListener>();
+ std::string key_dir, test_file_location;
+ if (argc > 1) {
+ h_ex.test_file_location_ = test_file_location = argv[1];
+ key_dir = argv[2];
+ }
+ std::shared_ptr<minifi::Configure> configuration = std::make_shared<
+ minifi::Configure>();
+ configuration->set(minifi::Configure::nifi_default_directory, key_dir);
+ configuration->set(minifi::Configure::nifi_configuration_listener_type,
+ "http");
+ configuration->set(
+ minifi::Configure::nifi_configuration_listener_pull_interval, "1 sec");
+ configuration->set(minifi::Configure::nifi_configuration_listener_http_url,
+ "http://localhost:9090/config");
+ mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
+
+ std::shared_ptr<core::Repository> test_repo =
+ std::make_shared<TestRepository>();
+ std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<
+ TestFlowRepository>();
+
+ configuration->set(minifi::Configure::nifi_flow_configuration_file,
+ test_file_location);
+
+ std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared
+ < minifi::io::StreamFactory > (configuration);
+ std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr
+ < core::YamlConfiguration
+ > (new core::YamlConfiguration(test_repo, test_repo, stream_factory,
+ configuration, test_file_location));
+ std::shared_ptr<TestRepository> repo = std::static_pointer_cast
+ < TestRepository > (test_repo);
+
+ std::shared_ptr<minifi::FlowController> controller =
+ std::make_shared < minifi::FlowController
+ > (test_repo, test_flow_repo, configuration, std::move(yaml_ptr), DEFAULT_ROOT_GROUP_NAME, true);
+
+ core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory,
+ configuration, test_file_location);
+
+ std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(
+ test_file_location);
+ std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr < core::ProcessGroup
+ > (ptr.get());
+ ptr.release();
+
+ controller->load();
+ controller->start();
+ waitToVerifyProcessor();
+
+ controller->waitUnload(60000);
+ std::string logs = LogTestController::getInstance().log_output.str();
+ assert(logs.find("HttpConfigurationListener -- curl successful to http://localhost:9090/config") != std::string::npos);
+ assert(logs.find("Starting to reload Flow Controller with flow control name MiNiFi Flow, version 0") != std::string::npos);
+ LogTestController::getInstance().reset();
+ rmdir("./content_repository");
+ return 0;
+}