You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/10/12 19:24:06 UTC

nifi-minifi-cpp git commit: MINIFICPP-618: Add C2 triggers, first of which monitors a local file for changes

Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master e0d45609b -> ed8221b14


MINIFICPP-618: Add C2 triggers, first of which monitors a local file for changes

MINIFICPP-624: Add alternate names for C2 configuration items and
support both

This closes #415.

Signed-off-by: Aldrin Piri <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/ed8221b1
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/ed8221b1
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/ed8221b1

Branch: refs/heads/master
Commit: ed8221b14d8ba9b4d1713cb618bca6f194e6bcf7
Parents: e0d4560
Author: Marc Parisi <ph...@apache.org>
Authored: Tue Sep 25 17:45:07 2018 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Fri Oct 12 15:22:30 2018 -0400

----------------------------------------------------------------------
 C2.md                                           |  54 ++++++--
 extensions/http-curl/protocols/RESTReceiver.cpp |   4 +-
 extensions/http-curl/protocols/RESTSender.cpp   |  12 +-
 libminifi/CMakeLists.txt                        |   2 +-
 libminifi/include/c2/C2Agent.h                  |  18 ++-
 libminifi/include/c2/C2Trigger.h                |  82 ++++++++++++
 .../include/c2/triggers/FileUpdateTrigger.h     | 126 +++++++++++++++++++
 libminifi/include/properties/Configure.h        |   5 +-
 libminifi/include/properties/Properties.h       |  20 ++-
 libminifi/src/Configure.cpp                     |   1 +
 libminifi/src/FlowController.cpp                |   5 +-
 libminifi/src/Properties.cpp                    |  21 +++-
 libminifi/src/c2/C2Agent.cpp                    |  82 +++++++++---
 libminifi/src/c2/triggers/FileUpdateTrigger.cpp |  49 ++++++++
 libminifi/test/unit/FileTriggerTests.cpp        |  99 +++++++++++++++
 main/MiNiFiMain.cpp                             |   1 -
 16 files changed, 536 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/C2.md
----------------------------------------------------------------------
diff --git a/C2.md b/C2.md
index b41c29c..9d9068a 100644
--- a/C2.md
+++ b/C2.md
@@ -25,6 +25,10 @@ options defined are located in minifi.properties.
 - [Configuration](#configuration)
   - [Base Options](#base-options)
   - [Metrics](#metrics)
+  - [Protocols](#protocols)
+  - [Triggers](#triggers)
+  - [UpdatePolicies](#updatepolicies)
+ - [Documentation](#documentation)
 
 ## Description
 
@@ -41,6 +45,10 @@ will be explained in greater detail in the metrics section.
 For more more insight into the API used within the C2 agent, please visit:
 https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal
 
+Release 0.6.0: Please note that all c2 properties now exist as nifi.c2.* . If your configuration properties
+files contain the former naming convention of c2.*, we will continue to support that as
+an alternate key, but you are encouraged to switch your configuration options as soon as possible.
+
 
     in minifi.properties
 
@@ -51,19 +59,19 @@ https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal
 	nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation
 	
 	# specify C2 protocol -- default is RESTSender if this is not specified
-	c2.agent.protocol.class=RESTSender
+	nifi.c2.agent.protocol.class=RESTSender
 	# may also use MQTT
-	# c2.agent.protocol.class=MQTTC2Protocol
+	# nifi.c2.agent.protocol.class=MQTTC2Protocol
 	
 	# control c2 heartbeat interval in millisecocnds
-	c2.agent.heartbeat.period=3000
+	nifi.c2.agent.heartbeat.period=3000
 	
 	# enable reporter classes
-	c2.agent.heartbeat.reporter.class=RESTReciver
+	nifi.c2.agent.heartbeat.reporter.class=RESTReciver
 	
 	# specify the rest URIs if using RESTSender
-	c2.rest.url=http://localhost:10080/minifi-c2-api/c2-protocol/heartbeat
-	c2.rest.url.ack=http://localhost:10080/minifi-c2-api/c2-protocol/acknowledge
+	nifi.c2.rest.url=http://localhost:10080/minifi-c2-api/c2-protocol/heartbeat
+	nifi.c2.rest.url.ack=http://localhost:10080/minifi-c2-api/c2-protocol/acknowledge
 	
 	# c2 agent identifier
 	nifi.c2.agent.identifier=<your identifier>
@@ -72,7 +80,7 @@ https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal
 	nifi.c2.agent.class=<your agent class>
 	
 	# configure SSL Context service for REST Protocol
-	c2.rest.ssl.context.service
+	nifi.c2.rest.ssl.context.service
 
 
 ### Metrics
@@ -287,7 +295,7 @@ will forward responses and updates to the heartbeating agents.
 	Remote Process Groups: []
 	NiFi Properties Overrides: {}
 
-### Update Policies
+### UpdatePolicies
 
 Updates to MiNiFi C++ properties can be controlled through an UpdatePolicyControllerService named
 C2UpdatePolicy. The service supports several configuration options. They are defined in the following example:   
@@ -308,7 +316,31 @@ C2UpdatePolicy. The service supports several configuration options. They are def
 	    		 Property_3:true
 	             Property_4:true
 	
-### Update Type Descriptions
+ 	
+### Triggers
+  
+  C2 Triggers can be activated to perform some C2 activity via a local event. Currently only FileUpdateTrigger exists, which monitors
+  for C2 File triggers to update the flow configuration. Classes can be defined as a comma separated list of classes to load via the option
+  nifi.c2.agent.trigger.classes
+  
+  
+#### C2 File triggers
+
+C2 updates can be triggered with updates to a flow configuration file. It doesn't have to be the same base configuration file. It
+will be copied into place. A new property, nifi.c2.file.watch, can be placed into minifi.properties to monitor. If the update time
+changes while the agent is running, it will be copied into place of the file defined by nifi.flow.configuration.file. The agent
+will then be restarted with the new flow configuration. If a failure occurs in reading that file or it is an invalid YAML file, the 
+update process will be halted.
+
+    in minifi.properties to activate the file update trigger specify
+
+	# specifying a trigger
+	nifi.c2.agent.trigger.classes=FileUpdateTrigger
+	nifi.c2.file.watch=<full path of file to monitor>
+	
+
+	
+## Documentation
 
 Type descriptions ( class descriptions entered in PROCESSORS.md ) can be automatically placed within C2 by building cmake with
 the following flag:
@@ -320,6 +352,4 @@ the following flag:
  When cmake is instantiated with this, a build will re-generate the type descriptions from PROCESSORS.md. Once this is finished
  you may re-build the project with the following command from the build directory, running the build as you normally would: 
 
-	cmake -DBOOTSTRAP= ..  
- 	
- 	
\ No newline at end of file
+	cmake -DBOOTSTRAP= ..  
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/extensions/http-curl/protocols/RESTReceiver.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTReceiver.cpp b/extensions/http-curl/protocols/RESTReceiver.cpp
index d2c17b6..babc983 100644
--- a/extensions/http-curl/protocols/RESTReceiver.cpp
+++ b/extensions/http-curl/protocols/RESTReceiver.cpp
@@ -49,8 +49,8 @@ void RESTReceiver::initialize(const std::shared_ptr<core::controller::Controller
   logger_->log_trace("Initializing rest receiver");
   if (nullptr != configuration_) {
     std::string listeningPort,rootUri="/", caCert;
-    configuration_->get("c2.rest.listener.port", listeningPort);
-    configuration_->get("c2.rest.listener.cacert", caCert);
+    configuration_->get("nifi.c2.rest.listener.port","c2.rest.listener.port", listeningPort);
+    configuration_->get("nifi.c2.rest.listener.cacert","c2.rest.listener.cacert", caCert);
     if (!listeningPort.empty() && !rootUri.empty()) {
       handler = std::unique_ptr<ListeningProtocol>(new ListeningProtocol());
       if (!caCert.empty()) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/extensions/http-curl/protocols/RESTSender.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp
index 7db998a..9b4ce5e 100644
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -46,15 +46,15 @@ void RESTSender::initialize(const std::shared_ptr<core::controller::ControllerSe
   // base URL when one is not specified.
   if (nullptr != configure) {
     std::string update_str, ssl_context_service_str;
-    configure->get("c2.rest.url", rest_uri_);
-    configure->get("c2.rest.url.ack", ack_uri_);
-    if (configure->get("c2.rest.ssl.context.service", ssl_context_service_str)) {
+    configure->get("nifi.c2.rest.url","c2.rest.url", rest_uri_);
+    configure->get("nifi.c2.rest.url.ack","c2.rest.url.ack", ack_uri_);
+    if (configure->get("nifi.c2.rest.ssl.context.service","c2.rest.ssl.context.service", ssl_context_service_str)) {
       auto service = controller->getControllerService(ssl_context_service_str);
       if (nullptr != service) {
         ssl_context_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
       }
     }
-    configure->get("c2.rest.heartbeat.minimize.updates", update_str);
+    configure->get("nifi.c2.rest.heartbeat.minimize.updates","c2.rest.heartbeat.minimize.updates", update_str);
     utils::StringUtils::StringToBool(update_str, minimize_updates_);
   }
   logger_->log_debug("Submitting to %s", rest_uri_);
@@ -78,8 +78,8 @@ C2Payload RESTSender::consumePayload(const C2Payload &payload, Direction directi
 
 void RESTSender::update(const std::shared_ptr<Configure> &configure) {
   std::string url;
-  configure->get("c2.rest.url", url);
-  configure->get("c2.rest.url.ack", url);
+  configure->get("nifi.c2.rest.url","c2.rest.url", url);
+  configure->get("nifi.c2.rest.url.ack","c2.rest.url.ack", url);
 }
 
 const C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 1f556d0..11eec1c 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -97,7 +97,7 @@ find_package(OpenSSL)
 if (OPENSSL_FOUND)
 	set(TLS_SOURCES "src/io/tls/*.cpp")
 endif(OPENSSL_FOUND)
-file(GLOB SOURCES  "src/sitetosite/*.cpp"  "src/core/logging/*.cpp"  "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp"  "src/provenance/*.cpp" "src/utils/*.cpp" "src/*.cpp")
+file(GLOB SOURCES  "src/sitetosite/*.cpp"  "src/core/logging/*.cpp"  "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp"  "src/provenance/*.cpp" "src/utils/*.cpp" "src/*.cpp")
 
 file(GLOB PROCESSOR_SOURCES  "src/processors/*.cpp" )
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/include/c2/C2Agent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index e9ff4e4..cc575a2 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -30,6 +30,7 @@
 #include "controllers/UpdatePolicyControllerService.h"
 #include "core/state/Value.h"
 #include "C2Payload.h"
+#include "C2Trigger.h"
 #include "C2Protocol.h"
 #include "io/validation.h"
 #include "HeartBeatReporter.h"
@@ -90,11 +91,23 @@ class C2Agent : public state::UpdateController, public state::response::Response
 
  protected:
 
+  /**
+   * Restarts this agent.
+   */
   void restart_agent();
 
+  /**
+   * Update agent per the provided C2 update from c2 server or triggers
+   */
   void update_agent();
 
   /**
+   * Check the collection of triggers for any updates that need to be handled.
+   * This is an optional step
+   */
+  void checkTriggers();
+
+  /**
    * Configure the C2 agent
    */
   void configure(const std::shared_ptr<Configure> &configure, bool reconfigure = true);
@@ -212,6 +225,8 @@ class C2Agent : public state::UpdateController, public state::response::Response
 
   std::vector<std::shared_ptr<HeartBeatReporter>> heartbeat_protocols_;
 
+  std::vector<std::shared_ptr<C2Trigger>> triggers_;
+
   std::atomic<C2Protocol*> protocol_;
 
   bool allow_updates_;
@@ -223,8 +238,7 @@ class C2Agent : public state::UpdateController, public state::response::Response
   std::string bin_location_;
 
   std::shared_ptr<logging::Logger> logger_;
-}
-;
+};
 
 } /* namesapce c2 */
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/include/c2/C2Trigger.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/C2Trigger.h b/libminifi/include/c2/C2Trigger.h
new file mode 100644
index 0000000..87f33f9
--- /dev/null
+++ b/libminifi/include/c2/C2Trigger.h
@@ -0,0 +1,82 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_C2TRIGGER_H_
+#define LIBMINIFI_INCLUDE_C2_C2TRIGGER_H_
+
+#include "core/Connectable.h"
+#include "c2/C2Payload.h"
+#include "properties/Configure.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Purpose: Defines basic triggering mechanism for command and control interfaces
+ *
+ * Design: Extends Connectable so that we can instantiate with the class name
+ *
+ * The state machine expects triggered (yes ) -> getAction -> reset(optional)
+ */
+class C2Trigger : public core::Connectable{
+ public:
+
+  C2Trigger(std::string name, utils::Identifier uuid)
+        : core::Connectable(name, uuid){
+
+  }
+  virtual ~C2Trigger() {
+  }
+
+
+  /**
+   * initializes trigger with minifi configuration.
+   */
+  virtual void initialize(const std::shared_ptr<minifi::Configure> &configuration) = 0;
+  /**
+   * returns true if triggered, false otherwise. calling this function multiple times
+   * may change internal state.
+   */
+  virtual bool triggered() = 0;
+
+  /**
+   * Resets actions once they have been triggered. The flow of events does not require
+   * this to occur after an action has been triggered. Instead this is optional
+   * and a feature available to potential triggers that require a reset.
+   *
+   * This will occur because there are times in which the C2Action may take a significant
+   * amount of time and a reset is in order to avoid continual triggering.
+   */
+  virtual void reset() = 0;
+
+  /**
+   * Returns a payload implementing a C2 action. May or may not reset the action.
+   * @return C2Payload of the action to perform.
+   */
+  virtual C2Payload getAction() = 0;
+};
+
+} /* namesapce c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_C2TRIGGER_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/include/c2/triggers/FileUpdateTrigger.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/triggers/FileUpdateTrigger.h b/libminifi/include/c2/triggers/FileUpdateTrigger.h
new file mode 100644
index 0000000..031a245
--- /dev/null
+++ b/libminifi/include/c2/triggers/FileUpdateTrigger.h
@@ -0,0 +1,126 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_TRIGGERS_FILESYSTEMTRIGGER_H_
+#define LIBMINIFI_INCLUDE_C2_TRIGGERS_FILESYSTEMTRIGGER_H_
+#include <atomic>
+#include "c2/C2Trigger.h"
+#include "utils/StringUtils.h"
+#include "utils/file/FileUtils.h"
+#include "core/Resource.h"
+#include "c2/C2Payload.h"
+#include "properties/Configure.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Purpose: Defines a file update trigger when the last write time of a file has been changed.
+ * Design: Extends C2Trigger, and implements a trigger, action, reset state machine. Calling
+ * triggered will check the file.
+ */
+class FileUpdateTrigger : public C2Trigger {
+ public:
+
+  FileUpdateTrigger(std::string name, utils::Identifier uuid = utils::Identifier())
+      : C2Trigger(name, uuid),
+        last_update_(0),
+        update_(false),
+        logger_(logging::LoggerFactory<FileUpdateTrigger>::getLogger()) {
+  }
+
+  void initialize(const std::shared_ptr<minifi::Configure> &configuration) {
+    if (nullptr != configuration) {
+      if (configuration->get(minifi::Configure::nifi_c2_file_watch, "c2.file.watch", file_)) {
+        last_update_ = utils::file::FileUtils::last_write_time(file_);
+      } else {
+        logger_->log_trace("Could not configure file");
+      }
+
+    }
+  }
+
+  virtual bool triggered() {
+    if (last_update_ == 0) {
+      logger_->log_trace("Last Update is zero");
+      return false;
+    }
+    auto update_time = utils::file::FileUtils::last_write_time(file_);
+    logger_->log_trace("Last Update is %d and update time is %d", last_update_.load(), update_time);
+    if (update_time > last_update_) {
+      last_update_ = update_time;
+      update_ = true;
+      return true;
+    }
+    return false;
+  }
+
+  virtual void reset() {
+    // reset the last write time
+    last_update_ = utils::file::FileUtils::last_write_time(file_);
+    update_ = false;
+  }
+
+  /**
+   * Returns an update payload implementing a C2 action
+   */
+  virtual C2Payload getAction();
+
+  /**
+   * Determines if we are connected and operating
+   */
+  virtual bool isRunning() {
+    return true;
+  }
+
+  /**
+   * Block until work is available on any input connection, or the given duration elapses
+   * @param timeoutMs timeout in milliseconds
+   */
+
+  virtual void yield() {
+
+  }
+
+  /**
+   * Determines if work is available by this connectable
+   * @return boolean if work is available.
+   */
+  virtual bool isWorkAvailable() {
+    return true;
+  }
+
+ protected:
+  std::string file_;
+  std::atomic<uint64_t> last_update_;
+  std::atomic<bool> update_;
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+};
+// add the trigger to the known resources.
+REGISTER_RESOURCE(FileUpdateTrigger)
+
+} /* namesapce c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_TRIGGERS_FILESYSTEMTRIGGER_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/include/properties/Configure.h
----------------------------------------------------------------------
diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h
index 872c35d..4fb68dc 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -41,7 +41,6 @@ class Configure : public Properties {
   }
   // nifi.flow.configuration.file
   static const char *nifi_default_directory;
-  static const char *nifi_c2_enable;
   static const char *nifi_flow_configuration_file;
   static const char *nifi_flow_configuration_file_backup_update;
   static const char *nifi_flow_engine_threads;
@@ -78,6 +77,10 @@ class Configure : public Properties {
   // nifi rest api user name and password
   static const char *nifi_rest_api_user_name;
   static const char *nifi_rest_api_password;
+  // c2 options
+
+  static const char *nifi_c2_enable;
+  static const char *nifi_c2_file_watch;
 
  private:
   std::string agent_identifier_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/include/properties/Properties.h
----------------------------------------------------------------------
diff --git a/libminifi/include/properties/Properties.h b/libminifi/include/properties/Properties.h
index ec0ca5d..eadb77d 100644
--- a/libminifi/include/properties/Properties.h
+++ b/libminifi/include/properties/Properties.h
@@ -57,8 +57,24 @@ class Properties {
     std::lock_guard<std::mutex> lock(mutex_);
     return (properties_.find(key) != properties_.end());
   }
-  // Get the config value
-  bool get(std::string key, std::string &value);
+  /**
+   * Returns the config value by placing it into the referenced param value
+   * @param key key to look up
+   * @param value value in which to place the map's stored property value
+   * @returns true if found, false otherwise.
+   */
+  bool get(const std::string &key, std::string &value);
+
+  /**
+   * Returns the config value by placing it into the referenced param value
+   * Uses alternate_key if key is not found within the map.
+   *
+   * @param key key to look up
+   * @param alternate_key is the secondary lookup key if key is not found
+   * @param value value in which to place the map's stored property value
+   * @returns true if found, false otherwise.
+   */
+  bool get(const std::string &key, const std::string &alternate_key, std::string &value);
 
   /**
    * Returns the configuration value or an empty string.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/src/Configure.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index 21cce95..4bcf315 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -55,6 +55,7 @@ const char *Configure::nifi_security_client_pass_phrase = "nifi.security.client.
 const char *Configure::nifi_security_client_ca_certificate = "nifi.security.client.ca.certificate";
 const char *Configure::nifi_rest_api_user_name = "nifi.rest.api.user.name";
 const char *Configure::nifi_rest_api_password = "nifi.rest.api.password";
+const char *Configure::nifi_c2_file_watch = "nifi.c2.file.watch";
 
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index c840f14..9206f41 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -54,7 +54,6 @@
 #include "core/Connectable.h"
 #include "utils/HTTPClient.h"
 
-
 #ifdef _MSC_VER
 #ifndef PATH_MAX
 #define PATH_MAX 260
@@ -189,7 +188,7 @@ bool FlowController::applyConfiguration(const std::string &source, const std::st
   this->root_ = std::move(newRoot);
   loadFlowRepo();
   initialized_ = true;
-  bool started = start();
+  bool started = start() == 0;
 
   updating_ = false;
 
@@ -358,7 +357,7 @@ void FlowController::initializeC2() {
 
   std::string c2_enable_str;
 
-  if (configuration_->get(Configure::nifi_c2_enable, c2_enable_str)) {
+  if (configuration_->get(Configure::nifi_c2_enable, "c2.enable", c2_enable_str)) {
     bool enable_c2 = true;
     utils::StringUtils::StringToBool(c2_enable_str, enable_c2);
     c2_enabled_ = enable_c2;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/src/Properties.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Properties.cpp b/libminifi/src/Properties.cpp
index f5537fc..e64a92f 100644
--- a/libminifi/src/Properties.cpp
+++ b/libminifi/src/Properties.cpp
@@ -33,7 +33,7 @@ Properties::Properties()
 }
 
 // Get the config value
-bool Properties::get(std::string key, std::string &value) {
+bool Properties::get(const std::string &key, std::string &value) {
   std::lock_guard<std::mutex> lock(mutex_);
   auto it = properties_.find(key);
 
@@ -45,6 +45,25 @@ bool Properties::get(std::string key, std::string &value) {
   }
 }
 
+bool Properties::get(const std::string &key, const std::string &alternate_key, std::string &value) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  auto it = properties_.find(key);
+
+  if (it == properties_.end()) {
+    it = properties_.find(alternate_key);
+    if (it != properties_.end()) {
+      logger_->log_warn("%s is an alternate property that may not be supported in future releases. Please use %s instead.", alternate_key, key);
+    }
+  }
+
+  if (it != properties_.end()) {
+    value = it->second;
+    return true;
+  } else {
+    return false;
+  }
+}
+
 int Properties::getInt(const std::string &key, int default_value) {
   std::lock_guard<std::mutex> lock(mutex_);
   auto it = properties_.find(key);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/src/c2/C2Agent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 10d8d29..8db6894 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -62,7 +62,7 @@ C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvid
     auto time_since = std::chrono::duration_cast<std::chrono::milliseconds>(now - last_run_).count();
 
     // place priority on messages to send to the c2 server
-      if ( request_mutex.try_lock_until(now + std::chrono::seconds(1)) ) {
+      if ( protocol_ != nullptr && request_mutex.try_lock_until(now + std::chrono::seconds(1)) ) {
         if (requests.size() > 0) {
           int count = 0;
           do {
@@ -80,6 +80,8 @@ C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvid
         performHeartBeat();
       }
 
+      checkTriggers();
+
       std::this_thread::sleep_for(std::chrono::milliseconds(500));
       return state::Update(state::UpdateStatus(state::UpdateState::READ_COMPLETE, false));
     };
@@ -102,11 +104,31 @@ C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvid
   functions_.push_back(c2_consumer_);
 }
 
+void C2Agent::checkTriggers() {
+  logger_->log_info("Checking %d triggers", triggers_.size());
+  for (const auto &trigger : triggers_) {
+    if (trigger->triggered()) {
+      /**
+       * Action was triggered, so extract it.
+       */
+      C2Payload &&triggerAction = trigger->getAction();
+      logger_->log_trace("%s action triggered", trigger->getName());
+      // handle the response the same way. This means that
+      // acknowledgements will be sent to the c2 server for every trigger action.
+      // this is expected
+      extractPayload(std::move(triggerAction));
+      // call reset if the trigger supports this activity
+      trigger->reset();
+    } else {
+      logger_->log_trace("%s action not triggered", trigger->getName());
+    }
+  }
+}
 void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconfigure) {
   std::string clazz, heartbeat_period, device;
 
   if (!reconfigure) {
-    if (!configure->get("c2.agent.protocol.class", clazz)) {
+    if (!configure->get("nifi.c2.agent.protocol.class", "c2.agent.protocol.class", clazz)) {
       clazz = "RESTSender";
     }
     logger_->log_info("Class is %s", clazz);
@@ -132,7 +154,7 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf
     protocol_.load()->update(configure);
   }
 
-  if (configure->get("c2.agent.heartbeat.period", heartbeat_period)) {
+  if (configure->get("nifi.c2.agent.heartbeat.period", "c2.agent.heartbeat.period", heartbeat_period)) {
     try {
       heart_beat_period_ = std::stoi(heartbeat_period);
     } catch (const std::invalid_argument &ie) {
@@ -144,12 +166,12 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf
   }
 
   std::string update_settings;
-  if (configure->get("c2.agent.update.allow", update_settings) && utils::StringUtils::StringToBool(update_settings, allow_updates_)) {
+  if (configure->get("nifi.c2.agent.update.allow", "c2.agent.update.allow", update_settings) && utils::StringUtils::StringToBool(update_settings, allow_updates_)) {
     // allow the agent to be updated. we then need to get an update command to execute after
   }
 
   if (allow_updates_) {
-    if (!configure->get("c2.agent.update.command", update_command_)) {
+    if (!configure->get("nifi.c2.agent.update.command", "c2.agent.update.command", update_command_)) {
       char cwd[1024];
       getcwd(cwd, sizeof(cwd));
 
@@ -158,7 +180,7 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf
       update_command_ = command.str();
     }
 
-    if (!configure->get("c2.agent.update.temp.location", update_location_)) {
+    if (!configure->get("nifi.c2.agent.update.temp.location", "c2.agent.update.temp.location", update_location_)) {
       char cwd[1024];
       getcwd(cwd, sizeof(cwd));
 
@@ -169,10 +191,10 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf
     }
 
     // if not defined we won't beable to update
-    configure->get("c2.agent.bin.location", bin_location_);
+    configure->get("nifi.c2.agent.bin.location", "c2.agent.bin.location", bin_location_);
   }
   std::string heartbeat_reporters;
-  if (configure->get("c2.agent.heartbeat.reporter.classes", heartbeat_reporters)) {
+  if (configure->get("nifi.c2.agent.heartbeat.reporter.classes", "c2.agent.heartbeat.reporter.classes", heartbeat_reporters)) {
     std::vector<std::string> reporters = utils::StringUtils::split(heartbeat_reporters, ",");
     std::lock_guard<std::mutex> lock(heartbeat_mutex);
     for (auto reporter : reporters) {
@@ -187,6 +209,22 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf
     }
   }
 
+  std::string trigger_classes;
+  if (configure->get("nifi.c2.agent.trigger.classes", "c2.agent.trigger.classes", trigger_classes)) {
+    std::vector<std::string> triggers = utils::StringUtils::split(trigger_classes, ",");
+    std::lock_guard<std::mutex> lock(heartbeat_mutex);
+    for (auto trigger : triggers) {
+      auto trigger_obj = core::ClassLoader::getDefaultClassLoader().instantiate(trigger, trigger);
+      if (trigger_obj == nullptr) {
+        logger_->log_debug("Could not instantiate %s", trigger);
+      } else {
+        std::shared_ptr<C2Trigger> trg_impl = std::static_pointer_cast<C2Trigger>(trigger_obj);
+        trg_impl->initialize(configuration_);
+        triggers_.push_back(trg_impl);
+      }
+    }
+  }
+
   auto base_reporter = "ControllerSocketProtocol";
   auto heartbeat_reporter_obj = core::ClassLoader::getDefaultClassLoader().instantiate(base_reporter, base_reporter);
   if (heartbeat_reporter_obj == nullptr) {
@@ -514,23 +552,38 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
       // just get the raw data.
       C2Payload payload(Operation::TRANSFER, false, true);
 
-      C2Payload &&response = protocol_.load()->consumePayload(url->second.to_string(), payload, RECEIVE, false);
+      auto urlStr = url->second.to_string();
 
-      auto raw_data = response.getRawData();
-      std::string file_path = std::string(raw_data.data(), raw_data.size());
+      std::string file_path = urlStr;
+      if (nullptr != protocol_ && file_path.find("http") != std::string::npos) {
+        C2Payload &&response = protocol_.load()->consumePayload(urlStr, payload, RECEIVE, false);
+
+        auto raw_data = response.getRawData();
+        file_path = std::string(raw_data.data(), raw_data.size());
+      }
 
       std::ifstream new_conf(file_path);
       std::string raw_data_str((std::istreambuf_iterator<char>(new_conf)), std::istreambuf_iterator<char>());
       unlink(file_path.c_str());
       // if we can apply the update, we will acknowledge it and then backup the configuration file.
-      if (update_sink_->applyUpdate(url->second.to_string(), raw_data_str)) {
+      if (update_sink_->applyUpdate(urlStr, raw_data_str)) {
         C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
         enqueue_c2_response(std::move(response));
 
         if (persist != resp.operation_arguments.end() && utils::StringUtils::equalsIgnoreCase(persist->second.to_string(), "true")) {
           // update nifi.flow.configuration.file=./conf/config.yml
           std::string config_file;
+
           configuration_->get(minifi::Configure::nifi_flow_configuration_file, config_file);
+          std::string adjustedFilename;
+          if (config_file[0] != '/') {
+            adjustedFilename = adjustedFilename + configuration_->getHome() + "/" + config_file;
+          } else {
+            adjustedFilename += config_file;
+          }
+
+          config_file = adjustedFilename;
+
           std::stringstream config_file_backup;
           config_file_backup << config_file << ".bak";
           // we must be able to successfuly copy the file.
@@ -540,14 +593,15 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
 
           if (configuration_->get(minifi::Configure::nifi_flow_configuration_file_backup_update, backup_config) && utils::StringUtils::StringToBool(backup_config, backup_file)) {
             if (utils::file::FileUtils::copy_file(config_file, config_file_backup.str()) != 0) {
+              logger_->log_debug("Cannot copy %s to %s", config_file, config_file_backup.str());
               persist_config = false;
             }
           }
+          logger_->log_debug("Copy %s to %s %d", config_file, config_file_backup.str(), persist_config);
           if (persist_config) {
             std::ofstream writer(config_file);
             if (writer.is_open()) {
-              auto output = response.getRawData();
-              writer.write(output.data(), output.size());
+              writer.write(raw_data_str.data(), raw_data_str.size());
             }
             writer.close();
           }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/src/c2/triggers/FileUpdateTrigger.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/triggers/FileUpdateTrigger.cpp b/libminifi/src/c2/triggers/FileUpdateTrigger.cpp
new file mode 100644
index 0000000..84cf0d3
--- /dev/null
+++ b/libminifi/src/c2/triggers/FileUpdateTrigger.cpp
@@ -0,0 +1,49 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "c2/triggers/FileUpdateTrigger.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Returns a payload implementing a C2 action
+ */
+C2Payload FileUpdateTrigger::getAction() {
+  if (update_) {
+    C2Payload response_payload(Operation::UPDATE, state::UpdateState::READ_COMPLETE, true, true);
+    C2ContentResponse resp(Operation::UPDATE);
+    resp.ident = "triggered";
+    resp.name = "configuration";
+    resp.operation_arguments["location"] = file_;
+    resp.operation_arguments["persist"] = "true";
+    response_payload.addContent(std::move(resp));
+    update_ = false;
+    return response_payload;
+  }
+  C2Payload response_payload(Operation::HEARTBEAT, state::UpdateState::READ_COMPLETE, true, true);
+  return response_payload;
+}
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/test/unit/FileTriggerTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/FileTriggerTests.cpp b/libminifi/test/unit/FileTriggerTests.cpp
new file mode 100644
index 0000000..c05327c
--- /dev/null
+++ b/libminifi/test/unit/FileTriggerTests.cpp
@@ -0,0 +1,99 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <chrono>
+#include <thread>
+#include <uuid/uuid.h>
+#include <memory>
+
+#include "c2/triggers/FileUpdateTrigger.h"
+#include "../TestBase.h"
+#include "io/ClientSocket.h"
+#include "core/Processor.h"
+#include "core/ClassLoader.h"
+#include "core/yaml/YamlConfiguration.h"
+
+TEST_CASE("Empty file", "[t1]") {
+  minifi::c2::FileUpdateTrigger trigger("test");
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  trigger.initialize(configuration);
+
+  REQUIRE(false == trigger.triggered());
+  REQUIRE(minifi::c2::Operation::HEARTBEAT == trigger.getAction().getOperation());
+}
+
+TEST_CASE("invalidfile file", "[t2]") {
+  minifi::c2::FileUpdateTrigger trigger("test");
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_c2_file_watch, "/tmp/blahblahblhalbha");
+  trigger.initialize(configuration);
+
+  REQUIRE(false == trigger.triggered());
+  REQUIRE(minifi::c2::Operation::HEARTBEAT == trigger.getAction().getOperation());
+}
+
+TEST_CASE("test valid  file no update", "[t3]") {
+  TestController testController;
+
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  std::fstream file;
+  std::stringstream ss;
+  ss << dir << "/" << "tstFile.ext";
+  std::string path = ss.str();
+  file.open(path, std::ios::out);
+  file << "tempFile";
+  file.close();
+
+  minifi::c2::FileUpdateTrigger trigger("test");
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_c2_file_watch, path);
+  trigger.initialize(configuration);
+
+  REQUIRE(false == trigger.triggered());
+  REQUIRE(minifi::c2::Operation::HEARTBEAT == trigger.getAction().getOperation());
+}
+
+TEST_CASE("test valid file update", "[t4]") {
+  TestController testController;
+
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  std::fstream file;
+  std::stringstream ss;
+  ss << dir << "/" << "tstFile.ext";
+  std::string path = ss.str();
+  file.open(path, std::ios::out);
+  file << "tempFile";
+  file.close();
+
+  minifi::c2::FileUpdateTrigger trigger("test");
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_c2_file_watch, path);
+  trigger.initialize(configuration);
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+  file.open(path, std::ios::out);
+  file << "tempFiles";
+  file.close();
+
+  REQUIRE(true == trigger.triggered());
+
+  REQUIRE(minifi::c2::Operation::UPDATE == trigger.getAction().getOperation());
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/main/MiNiFiMain.cpp
----------------------------------------------------------------------
diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp
index a99c219..23c7e70 100644
--- a/main/MiNiFiMain.cpp
+++ b/main/MiNiFiMain.cpp
@@ -36,7 +36,6 @@
 #include <vector>
 #include <queue>
 #include <map>
-#include <yaml-cpp/yaml.h>
 #include <iostream>
 #include "ResourceClaim.h"
 #include "core/Core.h"