You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/10/12 19:24:06 UTC
nifi-minifi-cpp git commit: MINIFICPP-618: Add C2 triggers,
first of which monitors a local file for changes
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master e0d45609b -> ed8221b14
MINIFICPP-618: Add C2 triggers, first of which monitors a local file for changes
MINIFICPP-624: Add alternate names for C2 configuration items and
support both
This closes #415.
Signed-off-by: Aldrin Piri <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/ed8221b1
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/ed8221b1
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/ed8221b1
Branch: refs/heads/master
Commit: ed8221b14d8ba9b4d1713cb618bca6f194e6bcf7
Parents: e0d4560
Author: Marc Parisi <ph...@apache.org>
Authored: Tue Sep 25 17:45:07 2018 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Fri Oct 12 15:22:30 2018 -0400
----------------------------------------------------------------------
C2.md | 54 ++++++--
extensions/http-curl/protocols/RESTReceiver.cpp | 4 +-
extensions/http-curl/protocols/RESTSender.cpp | 12 +-
libminifi/CMakeLists.txt | 2 +-
libminifi/include/c2/C2Agent.h | 18 ++-
libminifi/include/c2/C2Trigger.h | 82 ++++++++++++
.../include/c2/triggers/FileUpdateTrigger.h | 126 +++++++++++++++++++
libminifi/include/properties/Configure.h | 5 +-
libminifi/include/properties/Properties.h | 20 ++-
libminifi/src/Configure.cpp | 1 +
libminifi/src/FlowController.cpp | 5 +-
libminifi/src/Properties.cpp | 21 +++-
libminifi/src/c2/C2Agent.cpp | 82 +++++++++---
libminifi/src/c2/triggers/FileUpdateTrigger.cpp | 49 ++++++++
libminifi/test/unit/FileTriggerTests.cpp | 99 +++++++++++++++
main/MiNiFiMain.cpp | 1 -
16 files changed, 536 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/C2.md
----------------------------------------------------------------------
diff --git a/C2.md b/C2.md
index b41c29c..9d9068a 100644
--- a/C2.md
+++ b/C2.md
@@ -25,6 +25,10 @@ options defined are located in minifi.properties.
- [Configuration](#configuration)
- [Base Options](#base-options)
- [Metrics](#metrics)
+ - [Protocols](#protocols)
+ - [Triggers](#triggers)
+ - [UpdatePolicies](#updatepolicies)
+ - [Documentation](#documentation)
## Description
@@ -41,6 +45,10 @@ will be explained in greater detail in the metrics section.
For more more insight into the API used within the C2 agent, please visit:
https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal
+Release 0.6.0: Please note that all c2 properties now exist as nifi.c2.* . If your configuration properties
+files contain the former naming convention of c2.*, we will continue to support that as
+an alternate key, but you are encouraged to switch your configuration options as soon as possible.
+
in minifi.properties
@@ -51,19 +59,19 @@ https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal
nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation
# specify C2 protocol -- default is RESTSender if this is not specified
- c2.agent.protocol.class=RESTSender
+ nifi.c2.agent.protocol.class=RESTSender
# may also use MQTT
- # c2.agent.protocol.class=MQTTC2Protocol
+ # nifi.c2.agent.protocol.class=MQTTC2Protocol
# control c2 heartbeat interval in millisecocnds
- c2.agent.heartbeat.period=3000
+ nifi.c2.agent.heartbeat.period=3000
# enable reporter classes
- c2.agent.heartbeat.reporter.class=RESTReciver
+ nifi.c2.agent.heartbeat.reporter.class=RESTReciver
# specify the rest URIs if using RESTSender
- c2.rest.url=http://localhost:10080/minifi-c2-api/c2-protocol/heartbeat
- c2.rest.url.ack=http://localhost:10080/minifi-c2-api/c2-protocol/acknowledge
+ nifi.c2.rest.url=http://localhost:10080/minifi-c2-api/c2-protocol/heartbeat
+ nifi.c2.rest.url.ack=http://localhost:10080/minifi-c2-api/c2-protocol/acknowledge
# c2 agent identifier
nifi.c2.agent.identifier=<your identifier>
@@ -72,7 +80,7 @@ https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal
nifi.c2.agent.class=<your agent class>
# configure SSL Context service for REST Protocol
- c2.rest.ssl.context.service
+ nifi.c2.rest.ssl.context.service
### Metrics
@@ -287,7 +295,7 @@ will forward responses and updates to the heartbeating agents.
Remote Process Groups: []
NiFi Properties Overrides: {}
-### Update Policies
+### UpdatePolicies
Updates to MiNiFi C++ properties can be controlled through an UpdatePolicyControllerService named
C2UpdatePolicy. The service supports several configuration options. They are defined in the following example:
@@ -308,7 +316,31 @@ C2UpdatePolicy. The service supports several configuration options. They are def
Property_3:true
Property_4:true
-### Update Type Descriptions
+
+### Triggers
+
+ C2 Triggers can be activated to perform some C2 activity via a local event. Currently only FileUpdateTrigger exists, which monitors
+ for C2 File triggers to update the flow configuration. Classes can be defined as a comma separated list of classes to load via the option
+ nifi.c2.agent.trigger.classes
+
+
+#### C2 File triggers
+
+C2 updates can be triggered with updates to a flow configuration file. It doesn't have to be the same base configuration file. It
+will be copied into place. A new property, nifi.c2.file.watch, can be placed into minifi.properties to monitor. If the update time
+changes while the agent is running, it will be copied into place of the file defined by nifi.flow.configuration.file. The agent
+will then be restarted with the new flow configuration. If a failure occurs in reading that file or it is an invalid YAML file, the
+update process will be halted.
+
+ in minifi.properties to activate the file update trigger specify
+
+ # specifying a trigger
+ nifi.c2.agent.trigger.classes=FileUpdateTrigger
+ nifi.c2.file.watch=<full path of file to monitor>
+
+
+
+## Documentation
Type descriptions ( class descriptions entered in PROCESSORS.md ) can be automatically placed within C2 by building cmake with
the following flag:
@@ -320,6 +352,4 @@ the following flag:
When cmake is instantiated with this, a build will re-generate the type descriptions from PROCESSORS.md. Once this is finished
you may re-build the project with the following command from the build directory, running the build as you normally would:
- cmake -DBOOTSTRAP= ..
-
-
\ No newline at end of file
+ cmake -DBOOTSTRAP= ..
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/extensions/http-curl/protocols/RESTReceiver.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTReceiver.cpp b/extensions/http-curl/protocols/RESTReceiver.cpp
index d2c17b6..babc983 100644
--- a/extensions/http-curl/protocols/RESTReceiver.cpp
+++ b/extensions/http-curl/protocols/RESTReceiver.cpp
@@ -49,8 +49,8 @@ void RESTReceiver::initialize(const std::shared_ptr<core::controller::Controller
logger_->log_trace("Initializing rest receiver");
if (nullptr != configuration_) {
std::string listeningPort,rootUri="/", caCert;
- configuration_->get("c2.rest.listener.port", listeningPort);
- configuration_->get("c2.rest.listener.cacert", caCert);
+ configuration_->get("nifi.c2.rest.listener.port","c2.rest.listener.port", listeningPort);
+ configuration_->get("nifi.c2.rest.listener.cacert","c2.rest.listener.cacert", caCert);
if (!listeningPort.empty() && !rootUri.empty()) {
handler = std::unique_ptr<ListeningProtocol>(new ListeningProtocol());
if (!caCert.empty()) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/extensions/http-curl/protocols/RESTSender.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp
index 7db998a..9b4ce5e 100644
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -46,15 +46,15 @@ void RESTSender::initialize(const std::shared_ptr<core::controller::ControllerSe
// base URL when one is not specified.
if (nullptr != configure) {
std::string update_str, ssl_context_service_str;
- configure->get("c2.rest.url", rest_uri_);
- configure->get("c2.rest.url.ack", ack_uri_);
- if (configure->get("c2.rest.ssl.context.service", ssl_context_service_str)) {
+ configure->get("nifi.c2.rest.url","c2.rest.url", rest_uri_);
+ configure->get("nifi.c2.rest.url.ack","c2.rest.url.ack", ack_uri_);
+ if (configure->get("nifi.c2.rest.ssl.context.service","c2.rest.ssl.context.service", ssl_context_service_str)) {
auto service = controller->getControllerService(ssl_context_service_str);
if (nullptr != service) {
ssl_context_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
}
}
- configure->get("c2.rest.heartbeat.minimize.updates", update_str);
+ configure->get("nifi.c2.rest.heartbeat.minimize.updates","c2.rest.heartbeat.minimize.updates", update_str);
utils::StringUtils::StringToBool(update_str, minimize_updates_);
}
logger_->log_debug("Submitting to %s", rest_uri_);
@@ -78,8 +78,8 @@ C2Payload RESTSender::consumePayload(const C2Payload &payload, Direction directi
void RESTSender::update(const std::shared_ptr<Configure> &configure) {
std::string url;
- configure->get("c2.rest.url", url);
- configure->get("c2.rest.url.ack", url);
+ configure->get("nifi.c2.rest.url","c2.rest.url", url);
+ configure->get("nifi.c2.rest.url.ack","c2.rest.url.ack", url);
}
const C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 1f556d0..11eec1c 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -97,7 +97,7 @@ find_package(OpenSSL)
if (OPENSSL_FOUND)
set(TLS_SOURCES "src/io/tls/*.cpp")
endif(OPENSSL_FOUND)
-file(GLOB SOURCES "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/provenance/*.cpp" "src/utils/*.cpp" "src/*.cpp")
+file(GLOB SOURCES "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/provenance/*.cpp" "src/utils/*.cpp" "src/*.cpp")
file(GLOB PROCESSOR_SOURCES "src/processors/*.cpp" )
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/include/c2/C2Agent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index e9ff4e4..cc575a2 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -30,6 +30,7 @@
#include "controllers/UpdatePolicyControllerService.h"
#include "core/state/Value.h"
#include "C2Payload.h"
+#include "C2Trigger.h"
#include "C2Protocol.h"
#include "io/validation.h"
#include "HeartBeatReporter.h"
@@ -90,11 +91,23 @@ class C2Agent : public state::UpdateController, public state::response::Response
protected:
+ /**
+ * Restarts this agent.
+ */
void restart_agent();
+ /**
+ * Update agent per the provided C2 update from c2 server or triggers
+ */
void update_agent();
/**
+ * Check the collection of triggers for any updates that need to be handled.
+ * This is an optional step
+ */
+ void checkTriggers();
+
+ /**
* Configure the C2 agent
*/
void configure(const std::shared_ptr<Configure> &configure, bool reconfigure = true);
@@ -212,6 +225,8 @@ class C2Agent : public state::UpdateController, public state::response::Response
std::vector<std::shared_ptr<HeartBeatReporter>> heartbeat_protocols_;
+ std::vector<std::shared_ptr<C2Trigger>> triggers_;
+
std::atomic<C2Protocol*> protocol_;
bool allow_updates_;
@@ -223,8 +238,7 @@ class C2Agent : public state::UpdateController, public state::response::Response
std::string bin_location_;
std::shared_ptr<logging::Logger> logger_;
-}
-;
+};
} /* namesapce c2 */
} /* namespace minifi */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/include/c2/C2Trigger.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/C2Trigger.h b/libminifi/include/c2/C2Trigger.h
new file mode 100644
index 0000000..87f33f9
--- /dev/null
+++ b/libminifi/include/c2/C2Trigger.h
@@ -0,0 +1,82 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_C2TRIGGER_H_
+#define LIBMINIFI_INCLUDE_C2_C2TRIGGER_H_
+
+#include "core/Connectable.h"
+#include "c2/C2Payload.h"
+#include "properties/Configure.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Purpose: Defines basic triggering mechanism for command and control interfaces
+ *
+ * Design: Extends Connectable so that we can instantiate with the class name
+ *
+ * The state machine expects triggered (yes ) -> getAction -> reset(optional)
+ */
+class C2Trigger : public core::Connectable{
+ public:
+
+ C2Trigger(std::string name, utils::Identifier uuid)
+ : core::Connectable(name, uuid){
+
+ }
+ virtual ~C2Trigger() {
+ }
+
+
+ /**
+ * initializes trigger with minifi configuration.
+ */
+ virtual void initialize(const std::shared_ptr<minifi::Configure> &configuration) = 0;
+ /**
+ * returns true if triggered, false otherwise. calling this function multiple times
+ * may change internal state.
+ */
+ virtual bool triggered() = 0;
+
+ /**
+ * Resets actions once they have been triggered. The flow of events does not require
+ * this to occur after an action has been triggered. Instead this is optional
+ * and a feature available to potential triggers that require a reset.
+ *
+ * This will occur because there are times in which the C2Action may take a significant
+ * amount of time and a reset is in order to avoid continual triggering.
+ */
+ virtual void reset() = 0;
+
+ /**
+ * Returns a payload implementing a C2 action. May or may not reset the action.
+ * @return C2Payload of the action to perform.
+ */
+ virtual C2Payload getAction() = 0;
+};
+
+} /* namesapce c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_C2TRIGGER_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/include/c2/triggers/FileUpdateTrigger.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/triggers/FileUpdateTrigger.h b/libminifi/include/c2/triggers/FileUpdateTrigger.h
new file mode 100644
index 0000000..031a245
--- /dev/null
+++ b/libminifi/include/c2/triggers/FileUpdateTrigger.h
@@ -0,0 +1,126 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_TRIGGERS_FILESYSTEMTRIGGER_H_
+#define LIBMINIFI_INCLUDE_C2_TRIGGERS_FILESYSTEMTRIGGER_H_
+#include <atomic>
+#include "c2/C2Trigger.h"
+#include "utils/StringUtils.h"
+#include "utils/file/FileUtils.h"
+#include "core/Resource.h"
+#include "c2/C2Payload.h"
+#include "properties/Configure.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Purpose: Defines a file update trigger when the last write time of a file has been changed.
+ * Design: Extends C2Trigger, and implements a trigger, action, reset state machine. Calling
+ * triggered will check the file.
+ */
+class FileUpdateTrigger : public C2Trigger {
+ public:
+
+ FileUpdateTrigger(std::string name, utils::Identifier uuid = utils::Identifier())
+ : C2Trigger(name, uuid),
+ last_update_(0),
+ update_(false),
+ logger_(logging::LoggerFactory<FileUpdateTrigger>::getLogger()) {
+ }
+
+ void initialize(const std::shared_ptr<minifi::Configure> &configuration) {
+ if (nullptr != configuration) {
+ if (configuration->get(minifi::Configure::nifi_c2_file_watch, "c2.file.watch", file_)) {
+ last_update_ = utils::file::FileUtils::last_write_time(file_);
+ } else {
+ logger_->log_trace("Could not configure file");
+ }
+
+ }
+ }
+
+ virtual bool triggered() {
+ if (last_update_ == 0) {
+ logger_->log_trace("Last Update is zero");
+ return false;
+ }
+ auto update_time = utils::file::FileUtils::last_write_time(file_);
+ logger_->log_trace("Last Update is %d and update time is %d", last_update_.load(), update_time);
+ if (update_time > last_update_) {
+ last_update_ = update_time;
+ update_ = true;
+ return true;
+ }
+ return false;
+ }
+
+ virtual void reset() {
+ // reset the last write time
+ last_update_ = utils::file::FileUtils::last_write_time(file_);
+ update_ = false;
+ }
+
+ /**
+ * Returns an update payload implementing a C2 action
+ */
+ virtual C2Payload getAction();
+
+ /**
+ * Determines if we are connected and operating
+ */
+ virtual bool isRunning() {
+ return true;
+ }
+
+ /**
+ * Block until work is available on any input connection, or the given duration elapses
+ * @param timeoutMs timeout in milliseconds
+ */
+
+ virtual void yield() {
+
+ }
+
+ /**
+ * Determines if work is available by this connectable
+ * @return boolean if work is available.
+ */
+ virtual bool isWorkAvailable() {
+ return true;
+ }
+
+ protected:
+ std::string file_;
+ std::atomic<uint64_t> last_update_;
+ std::atomic<bool> update_;
+ private:
+ std::shared_ptr<logging::Logger> logger_;
+};
+// add the trigger to the known resources.
+REGISTER_RESOURCE(FileUpdateTrigger)
+
+} /* namesapce c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_TRIGGERS_FILESYSTEMTRIGGER_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/include/properties/Configure.h
----------------------------------------------------------------------
diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h
index 872c35d..4fb68dc 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -41,7 +41,6 @@ class Configure : public Properties {
}
// nifi.flow.configuration.file
static const char *nifi_default_directory;
- static const char *nifi_c2_enable;
static const char *nifi_flow_configuration_file;
static const char *nifi_flow_configuration_file_backup_update;
static const char *nifi_flow_engine_threads;
@@ -78,6 +77,10 @@ class Configure : public Properties {
// nifi rest api user name and password
static const char *nifi_rest_api_user_name;
static const char *nifi_rest_api_password;
+ // c2 options
+
+ static const char *nifi_c2_enable;
+ static const char *nifi_c2_file_watch;
private:
std::string agent_identifier_;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/include/properties/Properties.h
----------------------------------------------------------------------
diff --git a/libminifi/include/properties/Properties.h b/libminifi/include/properties/Properties.h
index ec0ca5d..eadb77d 100644
--- a/libminifi/include/properties/Properties.h
+++ b/libminifi/include/properties/Properties.h
@@ -57,8 +57,24 @@ class Properties {
std::lock_guard<std::mutex> lock(mutex_);
return (properties_.find(key) != properties_.end());
}
- // Get the config value
- bool get(std::string key, std::string &value);
+ /**
+ * Returns the config value by placing it into the referenced param value
+ * @param key key to look up
+ * @param value value in which to place the map's stored property value
+ * @returns true if found, false otherwise.
+ */
+ bool get(const std::string &key, std::string &value);
+
+ /**
+ * Returns the config value by placing it into the referenced param value
+ * Uses alternate_key if key is not found within the map.
+ *
+ * @param key key to look up
+ * @param alternate_key is the secondary lookup key if key is not found
+ * @param value value in which to place the map's stored property value
+ * @returns true if found, false otherwise.
+ */
+ bool get(const std::string &key, const std::string &alternate_key, std::string &value);
/**
* Returns the configuration value or an empty string.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/src/Configure.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index 21cce95..4bcf315 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -55,6 +55,7 @@ const char *Configure::nifi_security_client_pass_phrase = "nifi.security.client.
const char *Configure::nifi_security_client_ca_certificate = "nifi.security.client.ca.certificate";
const char *Configure::nifi_rest_api_user_name = "nifi.rest.api.user.name";
const char *Configure::nifi_rest_api_password = "nifi.rest.api.password";
+const char *Configure::nifi_c2_file_watch = "nifi.c2.file.watch";
} /* namespace minifi */
} /* namespace nifi */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index c840f14..9206f41 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -54,7 +54,6 @@
#include "core/Connectable.h"
#include "utils/HTTPClient.h"
-
#ifdef _MSC_VER
#ifndef PATH_MAX
#define PATH_MAX 260
@@ -189,7 +188,7 @@ bool FlowController::applyConfiguration(const std::string &source, const std::st
this->root_ = std::move(newRoot);
loadFlowRepo();
initialized_ = true;
- bool started = start();
+ bool started = start() == 0;
updating_ = false;
@@ -358,7 +357,7 @@ void FlowController::initializeC2() {
std::string c2_enable_str;
- if (configuration_->get(Configure::nifi_c2_enable, c2_enable_str)) {
+ if (configuration_->get(Configure::nifi_c2_enable, "c2.enable", c2_enable_str)) {
bool enable_c2 = true;
utils::StringUtils::StringToBool(c2_enable_str, enable_c2);
c2_enabled_ = enable_c2;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/src/Properties.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Properties.cpp b/libminifi/src/Properties.cpp
index f5537fc..e64a92f 100644
--- a/libminifi/src/Properties.cpp
+++ b/libminifi/src/Properties.cpp
@@ -33,7 +33,7 @@ Properties::Properties()
}
// Get the config value
-bool Properties::get(std::string key, std::string &value) {
+bool Properties::get(const std::string &key, std::string &value) {
std::lock_guard<std::mutex> lock(mutex_);
auto it = properties_.find(key);
@@ -45,6 +45,25 @@ bool Properties::get(std::string key, std::string &value) {
}
}
+bool Properties::get(const std::string &key, const std::string &alternate_key, std::string &value) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ auto it = properties_.find(key);
+
+ if (it == properties_.end()) {
+ it = properties_.find(alternate_key);
+ if (it != properties_.end()) {
+ logger_->log_warn("%s is an alternate property that may not be supported in future releases. Please use %s instead.", alternate_key, key);
+ }
+ }
+
+ if (it != properties_.end()) {
+ value = it->second;
+ return true;
+ } else {
+ return false;
+ }
+}
+
int Properties::getInt(const std::string &key, int default_value) {
std::lock_guard<std::mutex> lock(mutex_);
auto it = properties_.find(key);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/src/c2/C2Agent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 10d8d29..8db6894 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -62,7 +62,7 @@ C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvid
auto time_since = std::chrono::duration_cast<std::chrono::milliseconds>(now - last_run_).count();
// place priority on messages to send to the c2 server
- if ( request_mutex.try_lock_until(now + std::chrono::seconds(1)) ) {
+ if ( protocol_ != nullptr && request_mutex.try_lock_until(now + std::chrono::seconds(1)) ) {
if (requests.size() > 0) {
int count = 0;
do {
@@ -80,6 +80,8 @@ C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvid
performHeartBeat();
}
+ checkTriggers();
+
std::this_thread::sleep_for(std::chrono::milliseconds(500));
return state::Update(state::UpdateStatus(state::UpdateState::READ_COMPLETE, false));
};
@@ -102,11 +104,31 @@ C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvid
functions_.push_back(c2_consumer_);
}
+void C2Agent::checkTriggers() {
+ logger_->log_info("Checking %d triggers", triggers_.size());
+ for (const auto &trigger : triggers_) {
+ if (trigger->triggered()) {
+ /**
+ * Action was triggered, so extract it.
+ */
+ C2Payload &&triggerAction = trigger->getAction();
+ logger_->log_trace("%s action triggered", trigger->getName());
+ // handle the response the same way. This means that
+ // acknowledgements will be sent to the c2 server for every trigger action.
+ // this is expected
+ extractPayload(std::move(triggerAction));
+ // call reset if the trigger supports this activity
+ trigger->reset();
+ } else {
+ logger_->log_trace("%s action not triggered", trigger->getName());
+ }
+ }
+}
void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconfigure) {
std::string clazz, heartbeat_period, device;
if (!reconfigure) {
- if (!configure->get("c2.agent.protocol.class", clazz)) {
+ if (!configure->get("nifi.c2.agent.protocol.class", "c2.agent.protocol.class", clazz)) {
clazz = "RESTSender";
}
logger_->log_info("Class is %s", clazz);
@@ -132,7 +154,7 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf
protocol_.load()->update(configure);
}
- if (configure->get("c2.agent.heartbeat.period", heartbeat_period)) {
+ if (configure->get("nifi.c2.agent.heartbeat.period", "c2.agent.heartbeat.period", heartbeat_period)) {
try {
heart_beat_period_ = std::stoi(heartbeat_period);
} catch (const std::invalid_argument &ie) {
@@ -144,12 +166,12 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf
}
std::string update_settings;
- if (configure->get("c2.agent.update.allow", update_settings) && utils::StringUtils::StringToBool(update_settings, allow_updates_)) {
+ if (configure->get("nifi.c2.agent.update.allow", "c2.agent.update.allow", update_settings) && utils::StringUtils::StringToBool(update_settings, allow_updates_)) {
// allow the agent to be updated. we then need to get an update command to execute after
}
if (allow_updates_) {
- if (!configure->get("c2.agent.update.command", update_command_)) {
+ if (!configure->get("nifi.c2.agent.update.command", "c2.agent.update.command", update_command_)) {
char cwd[1024];
getcwd(cwd, sizeof(cwd));
@@ -158,7 +180,7 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf
update_command_ = command.str();
}
- if (!configure->get("c2.agent.update.temp.location", update_location_)) {
+ if (!configure->get("nifi.c2.agent.update.temp.location", "c2.agent.update.temp.location", update_location_)) {
char cwd[1024];
getcwd(cwd, sizeof(cwd));
@@ -169,10 +191,10 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf
}
// if not defined we won't beable to update
- configure->get("c2.agent.bin.location", bin_location_);
+ configure->get("nifi.c2.agent.bin.location", "c2.agent.bin.location", bin_location_);
}
std::string heartbeat_reporters;
- if (configure->get("c2.agent.heartbeat.reporter.classes", heartbeat_reporters)) {
+ if (configure->get("nifi.c2.agent.heartbeat.reporter.classes", "c2.agent.heartbeat.reporter.classes", heartbeat_reporters)) {
std::vector<std::string> reporters = utils::StringUtils::split(heartbeat_reporters, ",");
std::lock_guard<std::mutex> lock(heartbeat_mutex);
for (auto reporter : reporters) {
@@ -187,6 +209,22 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf
}
}
+ std::string trigger_classes;
+ if (configure->get("nifi.c2.agent.trigger.classes", "c2.agent.trigger.classes", trigger_classes)) {
+ std::vector<std::string> triggers = utils::StringUtils::split(trigger_classes, ",");
+ std::lock_guard<std::mutex> lock(heartbeat_mutex);
+ for (auto trigger : triggers) {
+ auto trigger_obj = core::ClassLoader::getDefaultClassLoader().instantiate(trigger, trigger);
+ if (trigger_obj == nullptr) {
+ logger_->log_debug("Could not instantiate %s", trigger);
+ } else {
+ std::shared_ptr<C2Trigger> trg_impl = std::static_pointer_cast<C2Trigger>(trigger_obj);
+ trg_impl->initialize(configuration_);
+ triggers_.push_back(trg_impl);
+ }
+ }
+ }
+
auto base_reporter = "ControllerSocketProtocol";
auto heartbeat_reporter_obj = core::ClassLoader::getDefaultClassLoader().instantiate(base_reporter, base_reporter);
if (heartbeat_reporter_obj == nullptr) {
@@ -514,23 +552,38 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
// just get the raw data.
C2Payload payload(Operation::TRANSFER, false, true);
- C2Payload &&response = protocol_.load()->consumePayload(url->second.to_string(), payload, RECEIVE, false);
+ auto urlStr = url->second.to_string();
- auto raw_data = response.getRawData();
- std::string file_path = std::string(raw_data.data(), raw_data.size());
+ std::string file_path = urlStr;
+ if (nullptr != protocol_ && file_path.find("http") != std::string::npos) {
+ C2Payload &&response = protocol_.load()->consumePayload(urlStr, payload, RECEIVE, false);
+
+ auto raw_data = response.getRawData();
+ file_path = std::string(raw_data.data(), raw_data.size());
+ }
std::ifstream new_conf(file_path);
std::string raw_data_str((std::istreambuf_iterator<char>(new_conf)), std::istreambuf_iterator<char>());
unlink(file_path.c_str());
// if we can apply the update, we will acknowledge it and then backup the configuration file.
- if (update_sink_->applyUpdate(url->second.to_string(), raw_data_str)) {
+ if (update_sink_->applyUpdate(urlStr, raw_data_str)) {
C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
enqueue_c2_response(std::move(response));
if (persist != resp.operation_arguments.end() && utils::StringUtils::equalsIgnoreCase(persist->second.to_string(), "true")) {
// update nifi.flow.configuration.file=./conf/config.yml
std::string config_file;
+
configuration_->get(minifi::Configure::nifi_flow_configuration_file, config_file);
+ std::string adjustedFilename;
+ if (config_file[0] != '/') {
+ adjustedFilename = adjustedFilename + configuration_->getHome() + "/" + config_file;
+ } else {
+ adjustedFilename += config_file;
+ }
+
+ config_file = adjustedFilename;
+
std::stringstream config_file_backup;
config_file_backup << config_file << ".bak";
// we must be able to successfuly copy the file.
@@ -540,14 +593,15 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
if (configuration_->get(minifi::Configure::nifi_flow_configuration_file_backup_update, backup_config) && utils::StringUtils::StringToBool(backup_config, backup_file)) {
if (utils::file::FileUtils::copy_file(config_file, config_file_backup.str()) != 0) {
+ logger_->log_debug("Cannot copy %s to %s", config_file, config_file_backup.str());
persist_config = false;
}
}
+ logger_->log_debug("Copy %s to %s %d", config_file, config_file_backup.str(), persist_config);
if (persist_config) {
std::ofstream writer(config_file);
if (writer.is_open()) {
- auto output = response.getRawData();
- writer.write(output.data(), output.size());
+ writer.write(raw_data_str.data(), raw_data_str.size());
}
writer.close();
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/src/c2/triggers/FileUpdateTrigger.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/triggers/FileUpdateTrigger.cpp b/libminifi/src/c2/triggers/FileUpdateTrigger.cpp
new file mode 100644
index 0000000..84cf0d3
--- /dev/null
+++ b/libminifi/src/c2/triggers/FileUpdateTrigger.cpp
@@ -0,0 +1,49 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "c2/triggers/FileUpdateTrigger.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Returns a payload implementing a C2 action
+ */
+C2Payload FileUpdateTrigger::getAction() {
+ if (update_) {
+ C2Payload response_payload(Operation::UPDATE, state::UpdateState::READ_COMPLETE, true, true);
+ C2ContentResponse resp(Operation::UPDATE);
+ resp.ident = "triggered";
+ resp.name = "configuration";
+ resp.operation_arguments["location"] = file_;
+ resp.operation_arguments["persist"] = "true";
+ response_payload.addContent(std::move(resp));
+ update_ = false;
+ return response_payload;
+ }
+ C2Payload response_payload(Operation::HEARTBEAT, state::UpdateState::READ_COMPLETE, true, true);
+ return response_payload;
+}
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/test/unit/FileTriggerTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/FileTriggerTests.cpp b/libminifi/test/unit/FileTriggerTests.cpp
new file mode 100644
index 0000000..c05327c
--- /dev/null
+++ b/libminifi/test/unit/FileTriggerTests.cpp
@@ -0,0 +1,99 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <chrono>
+#include <thread>
+#include <uuid/uuid.h>
+#include <memory>
+
+#include "c2/triggers/FileUpdateTrigger.h"
+#include "../TestBase.h"
+#include "io/ClientSocket.h"
+#include "core/Processor.h"
+#include "core/ClassLoader.h"
+#include "core/yaml/YamlConfiguration.h"
+
+TEST_CASE("Empty file", "[t1]") {
+ minifi::c2::FileUpdateTrigger trigger("test");
+ std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+ trigger.initialize(configuration);
+
+ REQUIRE(false == trigger.triggered());
+ REQUIRE(minifi::c2::Operation::HEARTBEAT == trigger.getAction().getOperation());
+}
+
+TEST_CASE("invalidfile file", "[t2]") {
+ minifi::c2::FileUpdateTrigger trigger("test");
+ std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+ configuration->set(minifi::Configure::nifi_c2_file_watch, "/tmp/blahblahblhalbha");
+ trigger.initialize(configuration);
+
+ REQUIRE(false == trigger.triggered());
+ REQUIRE(minifi::c2::Operation::HEARTBEAT == trigger.getAction().getOperation());
+}
+
+TEST_CASE("test valid file no update", "[t3]") {
+ TestController testController;
+
+ char format[] = "/tmp/gt.XXXXXX";
+ char *dir = testController.createTempDirectory(format);
+
+ std::fstream file;
+ std::stringstream ss;
+ ss << dir << "/" << "tstFile.ext";
+ std::string path = ss.str();
+ file.open(path, std::ios::out);
+ file << "tempFile";
+ file.close();
+
+ minifi::c2::FileUpdateTrigger trigger("test");
+ std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+ configuration->set(minifi::Configure::nifi_c2_file_watch, path);
+ trigger.initialize(configuration);
+
+ REQUIRE(false == trigger.triggered());
+ REQUIRE(minifi::c2::Operation::HEARTBEAT == trigger.getAction().getOperation());
+}
+
+TEST_CASE("test valid file update", "[t4]") {
+ TestController testController;
+
+ char format[] = "/tmp/gt.XXXXXX";
+ char *dir = testController.createTempDirectory(format);
+
+ std::fstream file;
+ std::stringstream ss;
+ ss << dir << "/" << "tstFile.ext";
+ std::string path = ss.str();
+ file.open(path, std::ios::out);
+ file << "tempFile";
+ file.close();
+
+ minifi::c2::FileUpdateTrigger trigger("test");
+ std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+ configuration->set(minifi::Configure::nifi_c2_file_watch, path);
+ trigger.initialize(configuration);
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ file.open(path, std::ios::out);
+ file << "tempFiles";
+ file.close();
+
+ REQUIRE(true == trigger.triggered());
+
+ REQUIRE(minifi::c2::Operation::UPDATE == trigger.getAction().getOperation());
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/main/MiNiFiMain.cpp
----------------------------------------------------------------------
diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp
index a99c219..23c7e70 100644
--- a/main/MiNiFiMain.cpp
+++ b/main/MiNiFiMain.cpp
@@ -36,7 +36,6 @@
#include <vector>
#include <queue>
#include <map>
-#include <yaml-cpp/yaml.h>
#include <iostream>
#include "ResourceClaim.h"
#include "core/Core.h"