You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/04/05 16:11:47 UTC

[3/4] nifi-minifi-cpp git commit: MINIFI-254: Incremental update for linter changes

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/utils/StringUtils.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index 82459db..bf802e2 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -35,96 +35,97 @@ namespace utils {
  * Purpose: Houses many useful string utilities.
  */
 class StringUtils {
-public:
-	/**
-	 * Converts a string to a boolean
-	 * Better handles mixed case.
-	 * @param input input string
-	 * @param output output string.
-	 */
-	static bool StringToBool(std::string input, bool &output) {
-
-		std::transform(input.begin(), input.end(), input.begin(), ::tolower);
-		std::istringstream(input) >> std::boolalpha >> output;
-		return output;
-	}
-
-	// Trim String utils
-
-	/**
-	 * Trims a string left to right
-	 * @param s incoming string
-	 * @returns modified string
-	 */
-	static std::string trim(std::string s) {
-		return trimRight(trimLeft(s));
-	}
-
-	/**
-	 * Trims left most part of a string
-	 * @param s incoming string
-	 * @returns modified string
-	 */
-	static inline std::string trimLeft(std::string s) {
-		s.erase(s.begin(),
-				std::find_if(s.begin(), s.end(),
-						std::not1(
-								std::pointer_to_unary_function<int, int>(
-										std::isspace))));
-		return s;
-	}
-
-	/**
-	 * Trims a string on the right
-	 * @param s incoming string
-	 * @returns modified string
-	 */
-
-	static inline std::string trimRight(std::string s) {
-		s.erase(
-				std::find_if(s.rbegin(), s.rend(),
-						std::not1(
-								std::pointer_to_unary_function<int, int>(
-										std::isspace))).base(), s.end());
-		return s;
-	}
-
-	/**
-	 * Converts a string to a float
-	 * @param input input string
-	 * @param output output float
-	 * @param cp failure policy
-	 */
-	static bool StringToFloat(std::string input, float &output,
-			FailurePolicy cp = RETURN) {
-		try {
-			output = std::stof(input);
-		} catch (const std::invalid_argument &ie) {
-			switch (cp) {
-			case RETURN:
-			case NOTHING:
-				return false;
-			case EXIT:
-				exit(1);
-			case EXCEPT:
-				throw ie;
-			}
-		} catch (const std::out_of_range &ofr) {
-			switch (cp) {
-			case RETURN:
-			case NOTHING:
-				return false;
-			case EXIT:
-				exit(1);
-			case EXCEPT:
-				throw ofr;
-
-			}
-		}
-
-		return true;
-
-	}
+ public:
+  /**
+   * Converts a string to a boolean
+   * Better handles mixed case.
+   * @param input input string
+   * @param output output string.
+   */
+  static bool StringToBool(std::string input, bool &output) {
+
+    std::transform(input.begin(), input.end(), input.begin(), ::tolower);
+    std::istringstream(input) >> std::boolalpha >> output;
+    return output;
+  }
+
+  // Trim String utils
+
+  /**
+   * Trims a string left to right
+   * @param s incoming string
+   * @returns modified string
+   */
+  static std::string trim(std::string s) {
+    return trimRight(trimLeft(s));
+  }
+
+  /**
+   * Trims left most part of a string
+   * @param s incoming string
+   * @returns modified string
+   */
+  static inline std::string trimLeft(std::string s) {
+    s.erase(
+        s.begin(),
+        std::find_if(
+            s.begin(), s.end(),
+            std::not1(std::pointer_to_unary_function<int, int>(std::isspace))));
+    return s;
+  }
+
+  /**
+   * Trims a string on the right
+   * @param s incoming string
+   * @returns modified string
+   */
+
+  static inline std::string trimRight(std::string s) {
+    s.erase(
+        std::find_if(
+            s.rbegin(), s.rend(),
+            std::not1(std::pointer_to_unary_function<int, int>(std::isspace)))
+            .base(),
+        s.end());
+    return s;
+  }
+
+  /**
+   * Converts a string to a float
+   * @param input input string
+   * @param output output float
+   * @param cp failure policy
+   */
+  static bool StringToFloat(std::string input, float &output, FailurePolicy cp =
+                                RETURN) {
+    try {
+      output = std::stof(input);
+    } catch (const std::invalid_argument &ie) {
+      switch (cp) {
+        case RETURN:
+        case NOTHING:
+          return false;
+        case EXIT:
+          exit(1);
+        case EXCEPT:
+          throw ie;
+      }
+    } catch (const std::out_of_range &ofr) {
+      switch (cp) {
+        case RETURN:
+        case NOTHING:
+          return false;
+        case EXIT:
+          exit(1);
+        case EXCEPT:
+          throw ofr;
+
+      }
+    }
+
+    return true;
+
+  }
 
 };
 
@@ -134,5 +135,4 @@ public:
 } /* namespace apache */
 } /* namespace org */
 
-
 #endif /* LIBMINIFI_INCLUDE_IO_STRINGUTILS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/utils/TimeUtil.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/TimeUtil.h b/libminifi/include/utils/TimeUtil.h
index 76b334b..6805419 100644
--- a/libminifi/include/utils/TimeUtil.h
+++ b/libminifi/include/utils/TimeUtil.h
@@ -33,8 +33,8 @@
  * @returns milliseconds since epoch
  */
 inline uint64_t getTimeMillis() {
-	return std::chrono::duration_cast<std::chrono::milliseconds>(
-			std::chrono::system_clock::now().time_since_epoch()).count();
+  return std::chrono::duration_cast<std::chrono::milliseconds>(
+      std::chrono::system_clock::now().time_since_epoch()).count();
 }
 
 /**
@@ -43,8 +43,8 @@ inline uint64_t getTimeMillis() {
  */
 inline uint64_t getTimeNano() {
 
-	return std::chrono::duration_cast<std::chrono::nanoseconds>(
-			std::chrono::system_clock::now().time_since_epoch()).count();
+  return std::chrono::duration_cast<std::chrono::nanoseconds>(
+      std::chrono::system_clock::now().time_since_epoch()).count();
 
 }
 
@@ -54,21 +54,19 @@ inline uint64_t getTimeNano() {
  * @param msec milliseconds since epoch
  * @returns string representing the time
  */
-inline std::string getTimeStr(uint64_t msec, bool enforce_locale = false)
-{
-	char date[120];
-	time_t second = (time_t) (msec/1000);
-	msec = msec % 1000;
-	strftime(date, sizeof(date) / sizeof(*date), TIME_FORMAT,
-	             ( enforce_locale==true ? gmtime(&second) : localtime(&second)));
+inline std::string getTimeStr(uint64_t msec, bool enforce_locale = false) {
+  char date[120];
+  time_t second = (time_t) (msec / 1000);
+  msec = msec % 1000;
+  strftime(date, sizeof(date) / sizeof(*date), TIME_FORMAT,
+           (enforce_locale == true ? gmtime(&second) : localtime(&second)));
 
-	std::string ret = date;
-	date[0] = '\0';
-	sprintf(date, ".%03llu", (unsigned long long) msec);
+  std::string ret = date;
+  date[0] = '\0';
+  sprintf(date, ".%03llu", (unsigned long long) msec);
 
-	ret += date;
-	return ret;
+  ret += date;
+  return ret;
 }
 
-
 #endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/Configure.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index 96ed7c7..f70686d 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -16,8 +16,9 @@
  * limitations under the License.
  */
 #include "properties/Configure.h"
+#include <string>
 #include "utils/StringUtils.h"
-#include "core/core.h"
+#include "core/Core.h"
 
 namespace org {
 namespace apache {
@@ -25,147 +26,149 @@ namespace nifi {
 namespace minifi {
 
 Configure *Configure::configure_(NULL);
-const char *Configure::nifi_flow_configuration_file = "nifi.flow.configuration.file";
-const char *Configure::nifi_administrative_yield_duration = "nifi.administrative.yield.duration";
+const char *Configure::nifi_flow_configuration_file =
+    "nifi.flow.configuration.file";
+const char *Configure::nifi_administrative_yield_duration =
+    "nifi.administrative.yield.duration";
 const char *Configure::nifi_bored_yield_duration = "nifi.bored.yield.duration";
-const char *Configure::nifi_graceful_shutdown_seconds  = "nifi.flowcontroller.graceful.shutdown.period";
+const char *Configure::nifi_graceful_shutdown_seconds =
+    "nifi.flowcontroller.graceful.shutdown.period";
 const char *Configure::nifi_log_level = "nifi.log.level";
 const char *Configure::nifi_server_name = "nifi.server.name";
-const char *Configure::nifi_configuration_class_name = "nifi.flow.configuration.class.name";
-const char *Configure::nifi_flow_repository_class_name = "nifi.flow.repository.class.name";
-const char *Configure::nifi_provenance_repository_class_name = "nifi.provenance.repository.class.name";
+const char *Configure::nifi_configuration_class_name =
+    "nifi.flow.configuration.class.name";
+const char *Configure::nifi_flow_repository_class_name =
+    "nifi.flow.repository.class.name";
+const char *Configure::nifi_provenance_repository_class_name =
+    "nifi.provenance.repository.class.name";
 const char *Configure::nifi_server_port = "nifi.server.port";
-const char *Configure::nifi_server_report_interval= "nifi.server.report.interval";
-const char *Configure::nifi_provenance_repository_max_storage_size = "nifi.provenance.repository.max.storage.size";
-const char *Configure::nifi_provenance_repository_max_storage_time = "nifi.provenance.repository.max.storage.time";
-const char *Configure::nifi_provenance_repository_directory_default = "nifi.provenance.repository.directory.default";
-const char *Configure::nifi_flowfile_repository_max_storage_size = "nifi.flowfile.repository.max.storage.size";
-const char *Configure::nifi_flowfile_repository_max_storage_time = "nifi.flowfile.repository.max.storage.time";
-const char *Configure::nifi_flowfile_repository_directory_default = "nifi.flowfile.repository.directory.default";
+const char *Configure::nifi_server_report_interval =
+    "nifi.server.report.interval";
+const char *Configure::nifi_provenance_repository_max_storage_size =
+    "nifi.provenance.repository.max.storage.size";
+const char *Configure::nifi_provenance_repository_max_storage_time =
+    "nifi.provenance.repository.max.storage.time";
+const char *Configure::nifi_provenance_repository_directory_default =
+    "nifi.provenance.repository.directory.default";
+const char *Configure::nifi_flowfile_repository_max_storage_size =
+    "nifi.flowfile.repository.max.storage.size";
+const char *Configure::nifi_flowfile_repository_max_storage_time =
+    "nifi.flowfile.repository.max.storage.time";
+const char *Configure::nifi_flowfile_repository_directory_default =
+    "nifi.flowfile.repository.directory.default";
 const char *Configure::nifi_remote_input_secure = "nifi.remote.input.secure";
-const char *Configure::nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth";
-const char *Configure::nifi_security_client_certificate = "nifi.security.client.certificate";
-const char *Configure::nifi_security_client_private_key = "nifi.security.client.private.key";
-const char *Configure::nifi_security_client_pass_phrase = "nifi.security.client.pass.phrase";
-const char *Configure::nifi_security_client_ca_certificate = "nifi.security.client.ca.certificate";
+const char *Configure::nifi_security_need_ClientAuth =
+    "nifi.security.need.ClientAuth";
+const char *Configure::nifi_security_client_certificate =
+    "nifi.security.client.certificate";
+const char *Configure::nifi_security_client_private_key =
+    "nifi.security.client.private.key";
+const char *Configure::nifi_security_client_pass_phrase =
+    "nifi.security.client.pass.phrase";
+const char *Configure::nifi_security_client_ca_certificate =
+    "nifi.security.client.ca.certificate";
+
+#define BUFFER_SIZE 512
 
 // Get the config value
-bool Configure::get(std::string key, std::string &value)
-{
-	std::lock_guard<std::mutex> lock(mutex_);
-	auto it = properties_.find(key);
-
-	if (it != properties_.end())
-	{
-		value = it->second;
-		return true;
-	}
-	else
-	{
-		return false;
-	}
+bool Configure::get(std::string key, std::string &value) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  auto it = properties_.find(key);
+
+  if (it != properties_.end()) {
+    value = it->second;
+    return true;
+  } else {
+    return false;
+  }
 }
 
-
 // Parse one line in configure file like key=value
-void Configure::parseConfigureFileLine(char *buf)
-{
-	char *line = buf;
-
-    while ((line[0] == ' ') || (line[0] =='\t'))
-    	++line;
-
-    char first = line[0];
-    if ((first == '\0') || (first == '#')  || (first == '\r') || (first == '\n') || (first == '='))
-    {
-    	return;
-    }
-
-    char *equal = strchr(line, '=');
-    if (equal == NULL)
-    {
-    	return;
-    }
-
-    equal[0] = '\0';
-    std::string key = line;
-
-    equal++;
-    while ((equal[0] == ' ') || (equal[0] == '\t'))
-    	++equal;
-
-    first = equal[0];
-    if ((first == '\0') || (first == '\r') || (first== '\n'))
-    {
-    	return;
-    }
-
-    std::string value = equal;
-    key = org::apache::nifi::minifi::utils::StringUtils::trimRight(key);
-    value = org::apache::nifi::minifi::utils::StringUtils::trimRight(value);
-    set(key, value);
+void Configure::parseConfigureFileLine(char *buf) {
+  char *line = buf;
+
+  while ((line[0] == ' ') || (line[0] == '\t'))
+    ++line;
+
+  char first = line[0];
+  if ((first == '\0') || (first == '#') || (first == '\r') || (first == '\n')
+      || (first == '=')) {
+    return;
+  }
+
+  char *equal = strchr(line, '=');
+  if (equal == NULL) {
+    return;
+  }
+
+  equal[0] = '\0';
+  std::string key = line;
+
+  equal++;
+  while ((equal[0] == ' ') || (equal[0] == '\t'))
+    ++equal;
+
+  first = equal[0];
+  if ((first == '\0') || (first == '\r') || (first == '\n')) {
+    return;
+  }
+
+  std::string value = equal;
+  key = org::apache::nifi::minifi::utils::StringUtils::trimRight(key);
+  value = org::apache::nifi::minifi::utils::StringUtils::trimRight(value);
+  set(key, value);
 }
 
 // Load Configure File
-void Configure::loadConfigureFile(const char *fileName)
-{
-
-    std::string adjustedFilename;
-    if (fileName)
-    {
-        // perform a naive determination if this is a relative path
-        if (fileName[0] != '/')
-        {
-            adjustedFilename = adjustedFilename + configure_->getHome() + "/" + fileName;
-        }
-        else
-        {
-            adjustedFilename += fileName;
-        }
-    }
-    char *path = NULL;
-    char full_path[PATH_MAX];
-    path = realpath(adjustedFilename.c_str(), full_path);
-    logger_->log_info("Using configuration file located at %s", path);
-
-    std::ifstream file(path, std::ifstream::in);
-    if (!file.good())
-    {
-        logger_->log_error("load configure file failed %s", path);
-        return;
-    }
-    this->clear();
-    const unsigned int bufSize = 512;
-    char buf[bufSize];
-    for (file.getline(buf, bufSize); file.good(); file.getline(buf, bufSize))
-    {
-        parseConfigureFileLine(buf);
+void Configure::loadConfigureFile(const char *fileName) {
+  std::string adjustedFilename;
+  if (fileName) {
+    // perform a naive determination if this is a relative path
+    if (fileName[0] != '/') {
+      adjustedFilename = adjustedFilename + configure_->getHome() + "/"
+          + fileName;
+    } else {
+      adjustedFilename += fileName;
     }
+  }
+  char *path = NULL;
+  char full_path[PATH_MAX];
+  path = realpath(adjustedFilename.c_str(), full_path);
+  logger_->log_info("Using configuration file located at %s", path);
+
+  std::ifstream file(path, std::ifstream::in);
+  if (!file.good()) {
+    logger_->log_error("load configure file failed %s", path);
+    return;
+  }
+  this->clear();
+
+  char buf[BUFFER_SIZE];
+  for (file.getline(buf, BUFFER_SIZE); file.good();
+      file.getline(buf, BUFFER_SIZE)) {
+    parseConfigureFileLine(buf);
+  }
 }
 
 // Parse Command Line
-void Configure::parseCommandLine(int argc, char **argv)
-{
-	int i;
-	bool keyFound = false;
-	std::string key, value;
-
-	for (i = 1; i < argc; i++)
-	{
-		if (argv[i][0] == '-' && argv[i][1] != '\0')
-		{
-			keyFound = true;
-			key = &argv[i][1];
-			continue;
-		}
-		if (keyFound)
-		{
-			value = argv[i];
-			set(key,value);
-			keyFound = false;
-		}
-	}
-	return;
+void Configure::parseCommandLine(int argc, char **argv) {
+  int i;
+  bool keyFound = false;
+  std::string key, value;
+
+  for (i = 1; i < argc; i++) {
+    if (argv[i][0] == '-' && argv[i][1] != '\0') {
+      keyFound = true;
+      key = &argv[i][1];
+      continue;
+    }
+    if (keyFound) {
+      value = argv[i];
+      set(key, value);
+      keyFound = false;
+    }
+  }
+  return;
 }
 
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/Connection.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index 6f64ff3..ab300ba 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -17,16 +17,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "Connection.h"
+#include <sys/time.h>
+#include <time.h>
 #include <vector>
 #include <queue>
+#include <memory>
+#include <string>
 #include <map>
 #include <set>
-#include <sys/time.h>
-#include <time.h>
 #include <chrono>
 #include <thread>
 #include <iostream>
-
 #include "core/FlowFile.h"
 #include "Connection.h"
 #include "core/Processor.h"
@@ -95,7 +97,7 @@ void Connection::put(std::shared_ptr<core::FlowFile> flow) {
 
   if (!flow->isStored()) {
     // Save to the flowfile repo
-    FlowFileRecord event(flow_repository_,flow,this->uuidStr_);
+    FlowFileRecord event(flow_repository_, flow, this->uuidStr_);
     if (event.Serialize()) {
       flow->setStoredToRepository(true);
     }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/EventDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp
index 0484139..cbb60ea 100644
--- a/libminifi/src/EventDrivenSchedulingAgent.cpp
+++ b/libminifi/src/EventDrivenSchedulingAgent.cpp
@@ -17,10 +17,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "EventDrivenSchedulingAgent.h"
 #include <chrono>
+#include <memory>
 #include <thread>
 #include <iostream>
-#include "EventDrivenSchedulingAgent.h"
 #include "core/Processor.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSessionFactory.h"
@@ -31,7 +32,6 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 
-
 void EventDrivenSchedulingAgent::run(
     std::shared_ptr<core::Processor> processor,
     core::ProcessContext *processContext,

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/FlowControlProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowControlProtocol.cpp b/libminifi/src/FlowControlProtocol.cpp
index 50fc0e2..69f482f 100644
--- a/libminifi/src/FlowControlProtocol.cpp
+++ b/libminifi/src/FlowControlProtocol.cpp
@@ -17,17 +17,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "FlowControlProtocol.h"
 #include <sys/time.h>
 #include <stdio.h>
 #include <time.h>
+#include <netinet/tcp.h>
 #include <chrono>
 #include <thread>
+#include <string>
 #include <random>
-#include <netinet/tcp.h>
 #include <iostream>
 #include "FlowController.h"
-#include "FlowControlProtocol.h"
-#include "core/core.h"
+#include "core/Core.h"
 namespace org {
 namespace apache {
 namespace nifi {
@@ -45,7 +46,7 @@ int FlowControlProtocol::connectServer(const char *host, uint16_t port) {
   int hh_errno;
   gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno);
 #endif
-  memcpy((char *) &addr, h->h_addr_list[0], h->h_length);
+  memcpy(reinterpret_cast<char*>(&addr), h->h_addr_list[0], h->h_length);
   sock = socket(AF_INET, SOCK_STREAM, 0);
   if (sock < 0) {
     logger_->log_error("Could not create socket to hostName %s", host);
@@ -56,30 +57,26 @@ int FlowControlProtocol::connectServer(const char *host, uint16_t port) {
   int opt = 1;
   bool nagle_off = true;
 
-  if (nagle_off)
-  {
-    if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0)
-    {
+  if (nagle_off) {
+    if (setsockopt(sock, SOL_TCP, TCP_NODELAY, reinterpret_cast<void*>(&opt), sizeof(opt)) < 0) {
       logger_->log_error("setsockopt() TCP_NODELAY failed");
       close(sock);
       return 0;
     }
     if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
-            (char *)&opt, sizeof(opt)) < 0)
-    {
-      logger_->log_error("setsockopt() SO_REUSEADDR failed");
-      close(sock);
-      return 0;
-    }
-  }
+            reinterpret_cast<char*>(&opt), sizeof(opt)) < 0) {
+          logger_->log_error("setsockopt() SO_REUSEADDR failed");
+          close(sock);
+          return 0;
+        }
+      }
 
-  int sndsize = 256*1024;
-  if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0)
-  {
-    logger_->log_error("setsockopt() SO_SNDBUF failed");
-    close(sock);
-    return 0;
-  }
+      int sndsize = 256*1024;
+      if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char*>(&sndsize), sizeof(sndsize)) < 0) {
+        logger_->log_error("setsockopt() SO_SNDBUF failed");
+        close(sock);
+        return 0;
+      }
 #endif
 
   struct sockaddr_in sa;
@@ -123,7 +120,7 @@ int FlowControlProtocol::sendData(uint8_t *buf, int buflen) {
 
   while (bytes < buflen) {
     ret = send(_socket, buf + bytes, buflen - bytes, 0);
-    //check for errors
+    // check for errors
     if (ret == -1) {
       return ret;
     }
@@ -232,8 +229,9 @@ void FlowControlProtocol::run(FlowControlProtocol *protocol) {
     if (!protocol->_registered) {
       // if it is not register yet
       protocol->sendRegisterReq();
-    } else
+    } else {
       protocol->sendReportReq();
+    }
   }
   return;
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 433387a..1a163ea 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -17,23 +17,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
+#include "FlowController.h"
 #include <sys/time.h>
 #include <time.h>
-#include <chrono>
-#include <thread>
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <unistd.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <chrono>
 #include <future>
-#include "FlowController.h"
+#include <thread>
+#include <utility>
+#include <memory>
+#include <string>
 #include "core/ProcessContext.h"
 #include "core/ProcessGroup.h"
 #include "utils/StringUtils.h"
-#include "core/core.h"
+#include "core/Core.h"
 #include "core/repository/FlowFileRepository.h"
 
 namespace org {
@@ -105,7 +108,6 @@ FlowController::FlowController(
 
     initializePaths(adjustedFilename);
   }
-
 }
 
 void FlowController::initializePaths(const std::string &adjustedFilename) {
@@ -125,12 +127,12 @@ void FlowController::initializePaths(const std::string &adjustedFilename) {
   // Create the content repo directory if needed
   struct stat contentDirStat;
 
-  if (stat(ResourceClaim::default_directory_path.c_str(), &contentDirStat)
+  if (stat(ResourceClaim::default_directory_path, &contentDirStat)
       != -1&& S_ISDIR(contentDirStat.st_mode)) {
-    path = realpath(ResourceClaim::default_directory_path.c_str(), full_path);
+    path = realpath(ResourceClaim::default_directory_path, full_path);
     logger_->log_info("FlowController content directory %s", full_path);
   } else {
-    if (mkdir(ResourceClaim::default_directory_path.c_str(), 0777) == -1) {
+    if (mkdir(ResourceClaim::default_directory_path, 0777) == -1) {
       logger_->log_error("FlowController content directory creation failed");
       exit(1);
     }
@@ -144,7 +146,6 @@ void FlowController::initializePaths(const std::string &adjustedFilename) {
         full_path);
     exit(1);
   }
-
 }
 
 FlowController::~FlowController() {
@@ -154,7 +155,6 @@ FlowController::~FlowController() {
     delete protocol_;
   flow_file_repo_ = nullptr;
   provenance_repo_ = nullptr;
-
 }
 
 void FlowController::stop(bool force) {
@@ -173,7 +173,6 @@ void FlowController::stop(bool force) {
     if (this->root_)
       this->root_->stopProcessing(&this->_timerScheduler,
                                   &this->_eventScheduler);
-
   }
 }
 
@@ -200,7 +199,6 @@ void FlowController::waitUnload(const uint64_t timeToWaitMs) {
     if (std::future_status::ready == unload_task.wait_until(wait_time)) {
       running_ = false;
     }
-
   }
 }
 
@@ -278,7 +276,6 @@ bool FlowController::start() {
         "Can not start Flow Controller because it has not been initialized");
     return false;
   } else {
-
     if (!running_) {
       logger_->log_info("Starting Flow Controller");
       this->_timerScheduler.start();

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/FlowFileRecord.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 562a685..de682b0 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -17,16 +17,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "FlowFileRecord.h"
+#include <sys/time.h>
+#include <time.h>
+#include <cstdio>
 #include <vector>
 #include <queue>
 #include <map>
-#include <sys/time.h>
-#include <time.h>
+#include <memory>
+#include <string>
 #include <iostream>
 #include <fstream>
-#include <cstdio>
-
-#include "FlowFileRecord.h"
 #include "core/logging/Logger.h"
 #include "core/Relationship.h"
 #include "core/Repository.h"
@@ -73,18 +74,18 @@ FlowFileRecord::FlowFileRecord(
     : FlowFile(),
       snapshot_(""),
       flow_repository_(flow_repository) {
-	entry_date_ = event->getEntryDate();
-	lineage_start_date_ = event->getlineageStartDate();
-	lineage_Identifiers_ = event->getlineageIdentifiers();
-	uuid_str_ = event->getUUIDStr();
-	attributes_ = event->getAttributes();
-	size_ = event->getSize();
-	offset_ = event->getOffset();
-	event->getUUID(uuid_);
-	uuid_connection_ = uuidConnection;
-	if (event->getResourceClaim()) {
-	  content_full_fath_ = event->getResourceClaim()->getContentFullPath();
-	}
+  entry_date_ = event->getEntryDate();
+  lineage_start_date_ = event->getlineageStartDate();
+  lineage_Identifiers_ = event->getlineageIdentifiers();
+  uuid_str_ = event->getUUIDStr();
+  attributes_ = event->getAttributes();
+  size_ = event->getSize();
+  offset_ = event->getOffset();
+  event->getUUID(uuid_);
+  uuid_connection_ = uuidConnection;
+  if (event->getResourceClaim()) {
+    content_full_fath_ = event->getResourceClaim()->getContentFullPath();
+  }
 }
 
 FlowFileRecord::FlowFileRecord(
@@ -94,7 +95,6 @@ FlowFileRecord::FlowFileRecord(
       uuid_connection_(""),
       snapshot_(""),
       flow_repository_(flow_repository) {
-
 }
 
 FlowFileRecord::~FlowFileRecord() {
@@ -175,10 +175,10 @@ bool FlowFileRecord::DeSerialize(std::string key) {
     logger_->log_error("NiFi FlowFile Store event %s can not found",
                        key.c_str());
     return false;
-  } else
+  } else {
     logger_->log_debug("NiFi FlowFile Read event %s length %d", key.c_str(),
                        value.length());
-
+  }
   io::DataStream stream((const uint8_t*) value.data(), value.length());
 
   ret = DeSerialize(stream);
@@ -197,14 +197,12 @@ bool FlowFileRecord::DeSerialize(std::string key) {
 }
 
 bool FlowFileRecord::Serialize() {
-
   io::DataStream outStream;
 
   int ret;
 
   ret = write(this->event_time_, &outStream);
   if (ret != 8) {
-
     return false;
   }
 
@@ -215,57 +213,48 @@ bool FlowFileRecord::Serialize() {
 
   ret = write(this->lineage_start_date_, &outStream);
   if (ret != 8) {
-
     return false;
   }
 
   ret = writeUTF(this->uuid_str_, &outStream);
   if (ret <= 0) {
-
     return false;
   }
 
   ret = writeUTF(this->uuid_connection_, &outStream);
   if (ret <= 0) {
-
     return false;
   }
   // write flow attributes
   uint32_t numAttributes = this->attributes_.size();
   ret = write(numAttributes, &outStream);
   if (ret != 4) {
-
     return false;
   }
 
   for (auto itAttribute : attributes_) {
     ret = writeUTF(itAttribute.first, &outStream, true);
     if (ret <= 0) {
-
       return false;
     }
     ret = writeUTF(itAttribute.second, &outStream, true);
     if (ret <= 0) {
-
       return false;
     }
   }
 
   ret = writeUTF(this->content_full_fath_, &outStream);
   if (ret <= 0) {
-
     return false;
   }
 
   ret = write(this->size_, &outStream);
   if (ret != 8) {
-
     return false;
   }
 
   ret = write(this->offset_, &outStream);
   if (ret != 8) {
-
     return false;
   }
 
@@ -281,13 +270,10 @@ bool FlowFileRecord::Serialize() {
     return false;
   }
 
-  // cleanup
-
   return true;
 }
 
 bool FlowFileRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) {
-
   int ret;
 
   io::DataStream outStream(buffer, bufferSize);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
index 9790256..33f0cb2 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -17,18 +17,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "RemoteProcessorGroupPort.h"
+#include <sys/time.h>
+#include <string.h>
+#include <time.h>
 #include <vector>
 #include <queue>
 #include <map>
 #include <set>
-#include <sys/time.h>
-#include <time.h>
+#include <string>
+#include <utility>
+#include <memory>
 #include <sstream>
-#include <string.h>
 #include <iostream>
-
-#include "RemoteProcessorGroupPort.h"
-
 #include "../include/io/StreamFactory.h"
 #include "io/ClientSocket.h"
 #include "utils/TimeUtil.h"
@@ -48,13 +49,13 @@ core::Property RemoteProcessorGroupPort::hostName("Host Name",
 core::Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999");
 core::Relationship RemoteProcessorGroupPort::relation;
 
-
 std::unique_ptr<Site2SiteClientProtocol> RemoteProcessorGroupPort::getNextProtocol() {
   std::lock_guard<std::mutex> protocol_lock_(protocol_mutex_);
   if (available_protocols_.empty())
     return nullptr;
 
-  std::unique_ptr<Site2SiteClientProtocol> return_pointer = std::move(available_protocols_.top());
+  std::unique_ptr<Site2SiteClientProtocol> return_pointer = std::move(
+      available_protocols_.top());
   available_protocols_.pop();
   return std::move(return_pointer);
 }
@@ -66,7 +67,6 @@ void RemoteProcessorGroupPort::returnProtocol(
 }
 
 void RemoteProcessorGroupPort::initialize() {
-
   // Set the supported properties
   std::set<core::Property> properties;
   properties.insert(hostName);
@@ -76,7 +76,6 @@ void RemoteProcessorGroupPort::initialize() {
   std::set<core::Relationship> relationships;
   relationships.insert(relation);
   setSupportedRelationships(relationships);
-
 }
 
 void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context,
@@ -90,7 +89,6 @@ void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context,
 
   // Peer Connection
   if (protocol_ == nullptr) {
-
     protocol_ = std::unique_ptr<Site2SiteClientProtocol>(
         new Site2SiteClientProtocol(0));
     protocol_->setPortId(protocol_uuid_);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/ResourceClaim.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ResourceClaim.cpp b/libminifi/src/ResourceClaim.cpp
index 826ca1d..cbe7712 100644
--- a/libminifi/src/ResourceClaim.cpp
+++ b/libminifi/src/ResourceClaim.cpp
@@ -20,6 +20,7 @@
 #include <vector>
 #include <queue>
 #include <map>
+#include <string>
 
 #include "ResourceClaim.h"
 
@@ -30,7 +31,7 @@ namespace minifi {
 
 std::atomic<uint64_t> ResourceClaim::_localResourceClaimNumber(0);
 
-std::string ResourceClaim::default_directory_path = DEFAULT_CONTENT_DIRECTORY;
+char *ResourceClaim::default_directory_path = const_cast<char*>(DEFAULT_CONTENT_DIRECTORY);
 
 ResourceClaim::ResourceClaim(const std::string contentDirectory)
     : _id(_localResourceClaimNumber.load()),

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/SchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index 8cb88e0..d69ba00 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -17,11 +17,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "SchedulingAgent.h"
 #include <chrono>
 #include <thread>
+#include <memory>
 #include <iostream>
 #include "Exception.h"
-#include "SchedulingAgent.h"
 #include "core/Processor.h"
 
 namespace org {
@@ -29,8 +30,7 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 
-bool SchedulingAgent::hasWorkToDo(
-    std::shared_ptr<core::Processor> processor) {
+bool SchedulingAgent::hasWorkToDo(std::shared_ptr<core::Processor> processor) {
   // Whether it has work to do
   if (processor->getTriggerWhenEmpty() || !processor->hasIncomingConnections()
       || processor->flowFilesQueued())
@@ -44,10 +44,9 @@ bool SchedulingAgent::hasTooMuchOutGoing(
   return processor->flowFilesOutGoingFull();
 }
 
-bool SchedulingAgent::onTrigger(
-    std::shared_ptr<core::Processor> processor,
-    core::ProcessContext *processContext,
-    core::ProcessSessionFactory *sessionFactory) {
+bool SchedulingAgent::onTrigger(std::shared_ptr<core::Processor> processor,
+                                core::ProcessContext *processContext,
+                                core::ProcessSessionFactory *sessionFactory) {
   if (processor->isYield())
     return false;
 
@@ -62,8 +61,6 @@ bool SchedulingAgent::onTrigger(
     // need to apply backpressure
     return true;
 
-  //TODO runDuration
-
   processor->incrementActiveTasks();
   try {
     processor->onTrigger(processContext, sessionFactory);
@@ -85,7 +82,6 @@ bool SchedulingAgent::onTrigger(
   return false;
 }
 
-
 } /* namespace minifi */
 } /* namespace nifi */
 } /* namespace apache */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/Site2SiteClientProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Site2SiteClientProtocol.cpp b/libminifi/src/Site2SiteClientProtocol.cpp
index e0265bb..52a0a02 100644
--- a/libminifi/src/Site2SiteClientProtocol.cpp
+++ b/libminifi/src/Site2SiteClientProtocol.cpp
@@ -21,9 +21,11 @@
 #include <stdio.h>
 #include <time.h>
 #include <chrono>
+#include <map>
+#include <string>
+#include <memory>
 #include <thread>
 #include <random>
-#include <netinet/tcp.h>
 #include <iostream>
 #include "io/CRCStream.h"
 #include "Site2SitePeer.h"
@@ -356,7 +358,7 @@ int Site2SiteClientProtocol::readRequestType(RequestType &type) {
   if (ret <= 0)
     return ret;
 
-  for (int i = (int) NEGOTIATE_FLOWFILE_CODEC; i <= (int) SHUTDOWN; i++) {
+  for (int i = NEGOTIATE_FLOWFILE_CODEC; i <= SHUTDOWN; i++) {
     if (RequestTypeStr[i] == requestTypeStr) {
       type = (RequestType) i;
       return ret;
@@ -426,12 +428,14 @@ int Site2SiteClientProtocol::writeRespond(RespondCode code,
 
   if (resCode->hasDescription) {
     ret = peer_->writeUTF(message);
-    if (ret > 0)
+    if (ret > 0) {
       return (3 + ret);
-    else
+    } else {
       return ret;
-  } else
+    }
+  } else {
     return 3;
+  }
 }
 
 bool Site2SiteClientProtocol::negotiateCodec() {
@@ -518,7 +522,8 @@ Transaction* Site2SiteClientProtocol::createTransaction(
       return NULL;
     }
 
-    org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcstream(peer_.get());
+    org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcstream(
+        peer_.get());
     switch (code) {
       case MORE_DATA:
         dataAvailable = true;
@@ -553,7 +558,8 @@ Transaction* Site2SiteClientProtocol::createTransaction(
       // tearDown();
       return NULL;
     } else {
-      org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcstream(peer_.get());
+      org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcstream(
+          peer_.get());
       transaction = new Transaction(direction, crcstream);
       _transactionMap[transaction->getUUIDStr()] = transaction;
       transactionID = transaction->getUUIDStr();
@@ -680,9 +686,10 @@ bool Site2SiteClientProtocol::receive(std::string transactionID,
   return true;
 }
 
-bool Site2SiteClientProtocol::send(
-    std::string transactionID, DataPacket *packet, std::shared_ptr<FlowFileRecord> flowFile,
-    core::ProcessSession *session) {
+bool Site2SiteClientProtocol::send(std::string transactionID,
+                                   DataPacket *packet,
+                                   std::shared_ptr<FlowFileRecord> flowFile,
+                                   core::ProcessSession *session) {
   int ret;
   Transaction *transaction = NULL;
 
@@ -772,9 +779,8 @@ bool Site2SiteClientProtocol::send(
   return true;
 }
 
-void Site2SiteClientProtocol::receiveFlowFiles(
-    core::ProcessContext *context,
-    core::ProcessSession *session) {
+void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context,
+                                               core::ProcessSession *session) {
   uint64_t bytes = 0;
   int transfers = 0;
   Transaction *transaction = NULL;
@@ -817,7 +823,9 @@ void Site2SiteClientProtocol::receiveFlowFiles(
         // transaction done
         break;
       }
-      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());;
+      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
+          FlowFileRecord>(session->create());
+
       if (!flowFile) {
         throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed");
         return;
@@ -930,7 +938,7 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) {
     // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
     // Critical Section involved in this transaction so that rather than the Critical Section being the
     // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
-    long crcValue = transaction->getCRC();
+    int64_t crcValue = transaction->getCRC();
     std::string crc = std::to_string(crcValue);
     logger_->log_info("Site2Site Send confirm with CRC %d to transaction %s",
                       transaction->getCRC(), transactionID.c_str());
@@ -978,7 +986,7 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) {
           "Site2Site transaction %s peer confirm transaction with CRC %s",
           transactionID.c_str(), message.c_str());
       if (this->_currentVersion > 3) {
-        long crcValue = transaction->getCRC();
+        int64_t crcValue = transaction->getCRC();
         std::string crc = std::to_string(crcValue);
         if (message == crc) {
           logger_->log_info("Site2Site transaction %s CRC matched",
@@ -1113,9 +1121,9 @@ bool Site2SiteClientProtocol::complete(std::string transactionID) {
       logger_->log_info("Site2Site transaction %s send finished",
                         transactionID.c_str());
       ret = this->writeRespond(TRANSACTION_FINISHED, "Finished");
-      if (ret <= 0)
+      if (ret <= 0) {
         return false;
-      else {
+      } else {
         transaction->_state = TRANSACTION_COMPLETED;
         return true;
       }
@@ -1143,10 +1151,11 @@ bool Site2SiteClientProtocol::complete(std::string transactionID) {
   }
 }
 
-void Site2SiteClientProtocol::transferFlowFiles(
-    core::ProcessContext *context,
-    core::ProcessSession *session) {
-  std::shared_ptr<FlowFileRecord> flow = std::static_pointer_cast<FlowFileRecord>(session->get());;
+void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context,
+                                                core::ProcessSession *session) {
+  std::shared_ptr<FlowFileRecord> flow =
+      std::static_pointer_cast<FlowFileRecord>(session->get());
+
   Transaction *transaction = NULL;
 
   if (!flow)
@@ -1201,7 +1210,8 @@ void Site2SiteClientProtocol::transferFlowFiles(
       if (transferNanos > _batchSendNanos)
         break;
 
-      flow = std::static_pointer_cast<FlowFileRecord>(session->get());;
+      flow = std::static_pointer_cast<FlowFileRecord>(session->get());
+
       if (!flow) {
         continueTransaction = false;
       }
@@ -1240,7 +1250,6 @@ void Site2SiteClientProtocol::transferFlowFiles(
   return;
 }
 
-
 } /* namespace minifi */
 } /* namespace nifi */
 } /* namespace apache */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/Site2SitePeer.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Site2SitePeer.cpp b/libminifi/src/Site2SitePeer.cpp
index 64732ac..551d466 100644
--- a/libminifi/src/Site2SitePeer.cpp
+++ b/libminifi/src/Site2SitePeer.cpp
@@ -17,6 +17,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "Site2SitePeer.h"
 #include <sys/time.h>
 #include <stdio.h>
 #include <time.h>
@@ -24,43 +25,39 @@
 #include <thread>
 #include <random>
 #include <memory>
-#include <netinet/tcp.h>
 #include <iostream>
 #include "io/ClientSocket.h"
 #include "io/validation.h"
-#include "Site2SitePeer.h"
 #include "FlowController.h"
 
-
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 
 bool Site2SitePeer::Open() {
+  if (IsNullOrEmpty(host_))
+    return false;
 
-	if (IsNullOrEmpty (host_))
-		return false;
+  if (stream_->initialize() < 0)
+    return false;
 
-	if (stream_->initialize() < 0)
-		return false;
+  uint16_t data_size = sizeof MAGIC_BYTES;
 
-	uint16_t data_size = sizeof MAGIC_BYTES;
+  if (stream_->writeData(
+      reinterpret_cast<uint8_t *>(const_cast<char*>(MAGIC_BYTES)), data_size)
+      != data_size) {
+    return false;
+  }
 
-	if (stream_->writeData((uint8_t *) MAGIC_BYTES, data_size) != data_size) {
-		return false;
-	}
-
-	return true;
+  return true;
 }
 
 void Site2SitePeer::Close() {
-    if (stream_ != nullptr)
-	stream_->closeStream();
+  if (stream_ != nullptr)
+    stream_->closeStream();
 }
 
-
-
 } /* namespace minifi */
 } /* namespace nifi */
 } /* namespace apache */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/ThreadedSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp
index 6c04281..65e7531 100644
--- a/libminifi/src/ThreadedSchedulingAgent.cpp
+++ b/libminifi/src/ThreadedSchedulingAgent.cpp
@@ -17,11 +17,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "ThreadedSchedulingAgent.h"
+#include <memory>
+#include <string>
+#include <vector>
+#include <map>
 #include <thread>
 #include <iostream>
-
-#include "ThreadedSchedulingAgent.h"
-
 #include "core/Connectable.h"
 #include "core/ProcessorNode.h"
 #include "core/ProcessContext.h"
@@ -35,7 +37,7 @@ namespace minifi {
 
 void ThreadedSchedulingAgent::schedule(
     std::shared_ptr<core::Processor> processor) {
-  std::lock_guard < std::mutex > lock(mutex_);
+  std::lock_guard<std::mutex> lock(mutex_);
 
   _administrativeYieldDuration = 0;
   std::string yieldValue;
@@ -43,10 +45,11 @@ void ThreadedSchedulingAgent::schedule(
   if (configure_->get(Configure::nifi_administrative_yield_duration,
                       yieldValue)) {
     core::TimeUnit unit;
-    if (core::Property::StringToTime(
-        yieldValue, _administrativeYieldDuration, unit)
-        && core::Property::ConvertTimeUnitToMS(
-            _administrativeYieldDuration, unit, _administrativeYieldDuration)) {
+    if (core::Property::StringToTime(yieldValue, _administrativeYieldDuration,
+                                     unit)
+        && core::Property::ConvertTimeUnitToMS(_administrativeYieldDuration,
+                                               unit,
+                                               _administrativeYieldDuration)) {
       logger_->log_debug("nifi_administrative_yield_duration: [%d] ms",
                          _administrativeYieldDuration);
     }
@@ -55,10 +58,9 @@ void ThreadedSchedulingAgent::schedule(
   _boredYieldDuration = 0;
   if (configure_->get(Configure::nifi_bored_yield_duration, yieldValue)) {
     core::TimeUnit unit;
-    if (core::Property::StringToTime(
-        yieldValue, _boredYieldDuration, unit)
-        && core::Property::ConvertTimeUnitToMS(
-            _boredYieldDuration, unit, _boredYieldDuration)) {
+    if (core::Property::StringToTime(yieldValue, _boredYieldDuration, unit)
+        && core::Property::ConvertTimeUnitToMS(_boredYieldDuration, unit,
+                                               _boredYieldDuration)) {
       logger_->log_debug("nifi_bored_yield_duration: [%d] ms",
                          _boredYieldDuration);
     }
@@ -80,11 +82,10 @@ void ThreadedSchedulingAgent::schedule(
   }
 
   core::ProcessorNode processor_node(processor);
-  auto processContext = std::make_shared
-      < core::ProcessContext > (processor_node,repo_);
-  auto sessionFactory = std::make_shared
-      < core::ProcessSessionFactory
-      > (processContext.get());
+  auto processContext = std::make_shared<core::ProcessContext>(processor_node,
+                                                               repo_);
+  auto sessionFactory = std::make_shared<core::ProcessSessionFactory>(
+      processContext.get());
 
   processor->onSchedule(processContext.get(), sessionFactory.get());
 
@@ -105,9 +106,9 @@ void ThreadedSchedulingAgent::schedule(
   return;
 }
 
-void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> processor) {
-  std::lock_guard < std::mutex > lock(mutex_);
-
+void ThreadedSchedulingAgent::unschedule(
+    std::shared_ptr<core::Processor> processor) {
+  std::lock_guard<std::mutex> lock(mutex_);
   logger_->log_info("Shutting down threads for processor %s/%s",
                     processor->getName().c_str(),
                     processor->getUUIDStr().c_str());
@@ -139,7 +140,6 @@ void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> proces
   processor->clearActiveTask();
 }
 
-
 } /* namespace minifi */
 } /* namespace nifi */
 } /* namespace apache */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/TimerDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp
index 3895e81..8d10658 100644
--- a/libminifi/src/TimerDrivenSchedulingAgent.cpp
+++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp
@@ -17,10 +17,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "TimerDrivenSchedulingAgent.h"
 #include <chrono>
 #include <thread>
+#include <memory>
 #include <iostream>
-#include "TimerDrivenSchedulingAgent.h"
 #include "core/Property.h"
 
 namespace org {
@@ -35,7 +36,6 @@ void TimerDrivenSchedulingAgent::run(
   while (this->running_) {
     bool shouldYield = this->onTrigger(processor, processContext,
                                        sessionFactory);
-
     if (processor->isYield()) {
       // Honor the yield
       std::this_thread::sleep_for(

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/ConfigurableComponent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp
index 67a43dd..fa5ff7d 100644
--- a/libminifi/src/core/ConfigurableComponent.cpp
+++ b/libminifi/src/core/ConfigurableComponent.cpp
@@ -17,7 +17,10 @@
  */
 
 #include "core/ConfigurableComponent.h"
-
+#include <memory>
+#include <utility>
+#include <string>
+#include <set>
 #include "core/Property.h"
 #include "core/logging/Logger.h"
 
@@ -27,19 +30,17 @@ namespace nifi {
 namespace minifi {
 namespace core {
 
-ConfigurableComponent::ConfigurableComponent(std::shared_ptr<logging::Logger> logger)
+ConfigurableComponent::ConfigurableComponent(
+    std::shared_ptr<logging::Logger> logger)
     : my_logger_(logger) {
-
 }
 
 ConfigurableComponent::ConfigurableComponent(
     const ConfigurableComponent &&other)
     : properties_(std::move(other.properties_)),
       my_logger_(std::move(other.my_logger_)) {
-
 }
 ConfigurableComponent::~ConfigurableComponent() {
-
 }
 
 /**
@@ -58,7 +59,7 @@ bool ConfigurableComponent::getProperty(const std::string name,
     Property item = it->second;
     value = item.getValue();
     my_logger_->log_info("Processor %s property name %s value %s", name.c_str(),
-                      item.getName().c_str(), value.c_str());
+                         item.getName().c_str(), value.c_str());
     return true;
   } else {
     return false;
@@ -80,7 +81,7 @@ bool ConfigurableComponent::setProperty(const std::string name,
     item.setValue(value);
     properties_[item.getName()] = item;
     my_logger_->log_info("Component %s property name %s value %s", name.c_str(),
-                      item.getName().c_str(), value.c_str());
+                         item.getName().c_str(), value.c_str());
     return true;
   } else {
     return false;
@@ -102,7 +103,7 @@ bool ConfigurableComponent::setProperty(Property &prop, std::string value) {
     item.setValue(value);
     properties_[item.getName()] = item;
     my_logger_->log_info("property name %s value %s", prop.getName().c_str(),
-                      item.getName().c_str(), value.c_str());
+                         item.getName().c_str(), value.c_str());
     return true;
   } else {
     Property newProp(prop);
@@ -110,7 +111,6 @@ bool ConfigurableComponent::setProperty(Property &prop, std::string value) {
     properties_.insert(
         std::pair<std::string, Property>(prop.getName(), newProp));
     return true;
-
   }
   return false;
 }
@@ -132,11 +132,10 @@ bool ConfigurableComponent::setSupportedProperties(
   for (auto item : properties) {
     properties_[item.getName()] = item;
   }
-
   return true;
 }
 
-} /* namespace components */
+} /* namespace core */
 } /* namespace minifi */
 } /* namespace nifi */
 } /* namespace apache */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/ConfigurationFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ConfigurationFactory.cpp b/libminifi/src/core/ConfigurationFactory.cpp
index 52bde69..e009b1b 100644
--- a/libminifi/src/core/ConfigurationFactory.cpp
+++ b/libminifi/src/core/ConfigurationFactory.cpp
@@ -15,10 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
+#include <type_traits>
+#include <utility>
+#include <string>
+#include <memory>
+#include <algorithm>
+#include <set>
 #include "core/ConfigurationFactory.h"
 #include "core/FlowConfiguration.h"
-#include  <type_traits>
+
 #ifdef YAML_SUPPORT
 #include "core/yaml/YamlConfiguration.h"
 #endif
@@ -29,50 +34,49 @@ namespace nifi {
 namespace minifi {
 namespace core {
 #ifndef YAML_SUPPORT
-  class YamlConfiguration;
+class YamlConfiguration;
 #endif
 
-  std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(
-      std::shared_ptr<core::Repository> repo,
-      std::shared_ptr<core::Repository> flow_file_repo,
-      const std::string configuration_class_name, const std::string path,
-      bool fail_safe) {
+std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(
+    std::shared_ptr<core::Repository> repo,
+    std::shared_ptr<core::Repository> flow_file_repo,
+    const std::string configuration_class_name, const std::string path,
+    bool fail_safe) {
 
-    std::string class_name_lc = configuration_class_name;
-    std::transform(class_name_lc.begin(), class_name_lc.end(),
-                   class_name_lc.begin(), ::tolower);
-    try {
+  std::string class_name_lc = configuration_class_name;
+  std::transform(class_name_lc.begin(), class_name_lc.end(),
+                 class_name_lc.begin(), ::tolower);
+  try {
+    if (class_name_lc == "flowconfiguration") {
+      // load the base configuration.
+      return std::unique_ptr<core::FlowConfiguration>(
+          new core::FlowConfiguration(repo, flow_file_repo, path));
 
-      if (class_name_lc == "flowconfiguration") {
-	// load the base configuration.
-        return std::unique_ptr<core::FlowConfiguration>(
-            new core::FlowConfiguration(repo, flow_file_repo, path));
-	
-      } else if (class_name_lc == "yamlconfiguration") {
-	// only load if the class is defined.
-        return std::unique_ptr<core::FlowConfiguration>(instantiate<core::YamlConfiguration>(repo, flow_file_repo, path));
-            
+    } else if (class_name_lc == "yamlconfiguration") {
+      // only load if the class is defined.
+      return std::unique_ptr<core::FlowConfiguration>(
+          instantiate<core::YamlConfiguration>(repo, flow_file_repo, path));
 
-      } else {
-        if (fail_safe) {
-          return std::unique_ptr<core::FlowConfiguration>(
-              new core::FlowConfiguration(repo, flow_file_repo, path));
-        } else {
-          throw std::runtime_error(
-              "Support for the provided configuration class could not be found");
-        }
-      }
-    } catch (const std::runtime_error &r) {
+    } else {
       if (fail_safe) {
         return std::unique_ptr<core::FlowConfiguration>(
             new core::FlowConfiguration(repo, flow_file_repo, path));
+      } else {
+        throw std::runtime_error(
+            "Support for the provided configuration class could not be found");
       }
     }
-
-    throw std::runtime_error(
-        "Support for the provided configuration class could not be found");
+  } catch (const std::runtime_error &r) {
+    if (fail_safe) {
+      return std::unique_ptr<core::FlowConfiguration>(
+          new core::FlowConfiguration(repo, flow_file_repo, path));
+    }
   }
 
+  throw std::runtime_error(
+      "Support for the provided configuration class could not be found");
+}
+
 } /* namespace core */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/Connectable.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Connectable.cpp b/libminifi/src/core/Connectable.cpp
index ac61568..e234a06 100644
--- a/libminifi/src/core/Connectable.cpp
+++ b/libminifi/src/core/Connectable.cpp
@@ -1,13 +1,26 @@
-/*
- * Connectable.cpp
+/**
  *
- *  Created on: Feb 27, 2017
- *      Author: mparisi
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
-
-#include "../../include/core/Connectable.h"
-
+#include "core/Connectable.h"
 #include <uuid/uuid.h>
+#include <utility>
+#include <memory>
+#include <string>
+#include <set>
 #include "core/logging/Logger.h"
 #include "core/Relationship.h"
 
@@ -20,7 +33,6 @@ namespace core {
 Connectable::Connectable(std::string name, uuid_t uuid)
     : CoreComponent(name, uuid),
       max_concurrent_tasks_(1) {
-
 }
 
 Connectable::Connectable(const Connectable &&other)
@@ -31,7 +43,6 @@ Connectable::Connectable(const Connectable &&other)
 }
 
 Connectable::~Connectable() {
-
 }
 
 bool Connectable::setSupportedRelationships(
@@ -51,7 +62,6 @@ bool Connectable::setSupportedRelationships(
     logger_->log_info("Processor %s supported relationship name %s",
                       name_.c_str(), item.getName().c_str());
   }
-
   return true;
 }
 
@@ -89,7 +99,6 @@ bool Connectable::setAutoTerminatedRelationships(
     logger_->log_info("Processor %s auto terminated relationship name %s",
                       name_.c_str(), item.getName().c_str());
   }
-
   return true;
 }
 
@@ -118,7 +127,6 @@ void Connectable::waitForWork(uint64_t timeoutMs) {
     work_condition_.wait_for(lock, std::chrono::milliseconds(timeoutMs),
                              [&] {return has_work_.load();});
   }
-
 }
 
 void Connectable::notifyWork() {
@@ -134,7 +142,6 @@ void Connectable::notifyWork() {
       work_condition_.notify_one();
     }
   }
-
 }
 
 std::set<std::shared_ptr<Connectable>> Connectable::getOutGoingConnections(
@@ -167,7 +174,7 @@ std::shared_ptr<Connectable> Connectable::getNextIncomingConnection() {
   return ret;
 }
 
-} /* namespace components */
+} /* namespace core */
 } /* namespace minifi */
 } /* namespace nifi */
 } /* namespace apache */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/Core.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Core.cpp b/libminifi/src/core/Core.cpp
index 39969f6..cc6445d 100644
--- a/libminifi/src/core/Core.cpp
+++ b/libminifi/src/core/Core.cpp
@@ -1,12 +1,23 @@
-/*
- * Core.cpp
+/**
  *
- *  Created on: Mar 10, 2017
- *      Author: mparisi
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
-#include "core/core.h"
-
+#include "core/Core.h"
+#include <string>
 namespace org {
 namespace apache {
 namespace nifi {
@@ -38,14 +49,13 @@ unsigned const char *CoreComponent::getUUID() {
 // Set Processor Name
 void CoreComponent::setName(const std::string name) {
   name_ = name;
-
 }
 // Get Process Name
 std::string CoreComponent::getName() {
   return name_;
 }
-}
-}
-}
-}
-}
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/FlowConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index f2dda0d..68aaf5c 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -17,6 +17,8 @@
  */
 
 #include "core/FlowConfiguration.h"
+#include <memory>
+#include <string>
 
 namespace org {
 namespace apache {
@@ -25,7 +27,6 @@ namespace minifi {
 namespace core {
 
 FlowConfiguration::~FlowConfiguration() {
-
 }
 
 std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(
@@ -40,11 +41,6 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(
     processor = std::make_shared<
         org::apache::nifi::minifi::processors::LogAttribute>(name, uuid);
   } else if (name
-      == org::apache::nifi::minifi::processors::RealTimeDataCollector::ProcessorName) {
-    processor = std::make_shared<
-        org::apache::nifi::minifi::processors::RealTimeDataCollector>(name,
-                                                                      uuid);
-  } else if (name
       == org::apache::nifi::minifi::processors::GetFile::ProcessorName) {
     processor =
         std::make_shared<org::apache::nifi::minifi::processors::GetFile>(name,

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/FlowFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp
new file mode 100644
index 0000000..b3ab741
--- /dev/null
+++ b/libminifi/src/core/FlowFile.cpp
@@ -0,0 +1,223 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/FlowFile.h"
+#include <memory>
+#include <string>
+#include <set>
+#include "core/logging/Logger.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+FlowFile::FlowFile()
+    : size_(0),
+      id_(0),
+      stored(false),
+      offset_(0),
+      last_queue_date_(0),
+      penaltyExpiration_ms_(0),
+      event_time_(0),
+      claim_(nullptr),
+      marked_delete_(false),
+      connection_(nullptr),
+      original_connection_() {
+  entry_date_ = getTimeMillis();
+  lineage_start_date_ = entry_date_;
+
+  char uuidStr[37];
+
+  // Generate the global UUID for the flow record
+  uuid_generate(uuid_);
+
+  uuid_unparse_lower(uuid_, uuidStr);
+  uuid_str_ = uuidStr;
+
+  logger_ = logging::Logger::getLogger();
+}
+
+FlowFile::~FlowFile() {
+}
+
+FlowFile& FlowFile::operator=(const FlowFile& other) {
+  uuid_copy(uuid_, other.uuid_);
+  stored = other.stored;
+  marked_delete_ = other.marked_delete_;
+  entry_date_ = other.entry_date_;
+  lineage_start_date_ = other.lineage_start_date_;
+  lineage_Identifiers_ = other.lineage_Identifiers_;
+  last_queue_date_ = other.last_queue_date_;
+  size_ = other.size_;
+  penaltyExpiration_ms_ = other.penaltyExpiration_ms_;
+  attributes_ = other.attributes_;
+  claim_ = other.claim_;
+  if (claim_ != nullptr)
+    this->claim_->increaseFlowFileRecordOwnedCount();
+  uuid_str_ = other.uuid_str_;
+  connection_ = other.connection_;
+  original_connection_ = other.original_connection_;
+  return *this;
+}
+
+/**
+ * Returns whether or not this flow file record
+ * is marked as deleted.
+ * @return marked deleted
+ */
+bool FlowFile::isDeleted() {
+  return marked_delete_;
+}
+
+/**
+ * Sets whether to mark this flow file record
+ * as deleted
+ * @param deleted deleted flag
+ */
+void FlowFile::setDeleted(const bool deleted) {
+  marked_delete_ = deleted;
+}
+
+std::shared_ptr<ResourceClaim> FlowFile::getResourceClaim() {
+  return claim_;
+}
+
+void FlowFile::clearResourceClaim() {
+  claim_ = nullptr;
+}
+void FlowFile::setResourceClaim(std::shared_ptr<ResourceClaim> &claim) {
+  claim_ = claim;
+}
+
+// ! Get Entry Date
+uint64_t FlowFile::getEntryDate() {
+  return entry_date_;
+}
+uint64_t FlowFile::getEventTime() {
+  return event_time_;
+}
+// ! Get Lineage Start Date
+uint64_t FlowFile::getlineageStartDate() {
+  return lineage_start_date_;
+}
+
+std::set<std::string> &FlowFile::getlineageIdentifiers() {
+  return lineage_Identifiers_;
+}
+
+bool FlowFile::getAttribute(std::string key, std::string &value) {
+  auto it = attributes_.find(key);
+  if (it != attributes_.end()) {
+    value = it->second;
+    return true;
+  } else {
+    return false;
+  }
+}
+
+// Get Size
+uint64_t FlowFile::getSize() {
+  return size_;
+}
+// ! Get Offset
+uint64_t FlowFile::getOffset() {
+  return offset_;
+}
+
+bool FlowFile::removeAttribute(const std::string key) {
+  auto it = attributes_.find(key);
+  if (it != attributes_.end()) {
+    attributes_.erase(key);
+    return true;
+  } else {
+    return false;
+  }
+}
+
+bool FlowFile::updateAttribute(const std::string key, const std::string value) {
+  auto it = attributes_.find(key);
+  if (it != attributes_.end()) {
+    attributes_[key] = value;
+    return true;
+  } else {
+    return false;
+  }
+}
+
+bool FlowFile::addAttribute(const std::string &key, const std::string &value) {
+  auto it = attributes_.find(key);
+  if (it != attributes_.end()) {
+    // attribute already there in the map
+    return false;
+  } else {
+    attributes_[key] = value;
+    return true;
+  }
+}
+
+void FlowFile::setLineageStartDate(const uint64_t date) {
+  lineage_start_date_ = date;
+}
+
+/**
+ * Sets the original connection with a shared pointer.
+ * @param connection shared connection.
+ */
+void FlowFile::setOriginalConnection(
+    std::shared_ptr<core::Connectable> &connection) {
+  original_connection_ = connection;
+}
+
+/**
+ * Sets the connection with a shared pointer.
+ * @param connection shared connection.
+ */
+void FlowFile::setConnection(std::shared_ptr<core::Connectable> &connection) {
+  connection_ = connection;
+}
+
+/**
+ * Sets the connection with a shared pointer.
+ * @param connection shared connection.
+ */
+void FlowFile::setConnection(std::shared_ptr<core::Connectable> &&connection) {
+  connection_ = connection;
+}
+
+/**
+ * Returns the connection referenced by this record.
+ * @return shared connection pointer.
+ */
+std::shared_ptr<core::Connectable> FlowFile::getConnection() {
+  return connection_;
+}
+
+/**
+ * Returns the original connection referenced by this record.
+ * @return shared original connection pointer.
+ */
+std::shared_ptr<core::Connectable> FlowFile::getOriginalConnection() {
+  return original_connection_;
+}
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index baa3ebd..1a6e729 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -17,16 +17,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "core/ProcessGroup.h"
+#include <sys/time.h>
+#include <time.h>
 #include <vector>
+#include <memory>
+#include <string>
 #include <queue>
 #include <map>
 #include <set>
-#include <sys/time.h>
-#include <time.h>
 #include <chrono>
 #include <thread>
-
-#include "core/ProcessGroup.h"
 #include "core/Processor.h"
 
 namespace org {
@@ -63,7 +64,6 @@ ProcessGroup::~ProcessGroup() {
     ProcessGroup *processGroup(*it);
     delete processGroup;
   }
-
 }
 
 bool ProcessGroup::isRootProcessGroup() {
@@ -176,16 +176,12 @@ void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler,
 }
 
 std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) {
-
   std::shared_ptr<Processor> ret = NULL;
-  // std::lock_guard<std::mutex> lock(mutex_);
-
   for (auto processor : processors_) {
     logger_->log_info("find processor %s", processor->getName().c_str());
     uuid_t processorUUID;
 
     if (processor->getUUID(processorUUID)) {
-
       char uuid_str[37];  // ex. "1b4e28ba-2fa1-11d2-883f-0016d3cca427" + "\0"
       uuid_unparse_lower(processorUUID, uuid_str);
       std::string processorUUIDstr = uuid_str;
@@ -195,37 +191,31 @@ std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) {
         return processor;
       }
     }
-
   }
   for (auto processGroup : child_process_groups_) {
-
     logger_->log_info("find processor child %s",
                       processGroup->getName().c_str());
     std::shared_ptr<Processor> processor = processGroup->findProcessor(uuid);
     if (processor)
       return processor;
   }
-
   return ret;
 }
 
 std::shared_ptr<Processor> ProcessGroup::findProcessor(
     const std::string &processorName) {
   std::shared_ptr<Processor> ret = NULL;
-
   for (auto processor : processors_) {
     logger_->log_debug("Current processor is %s", processor->getName().c_str());
     if (processor->getName() == processorName)
       return processor;
   }
-
   for (auto processGroup : child_process_groups_) {
     std::shared_ptr<Processor> processor = processGroup->findProcessor(
         processorName);
     if (processor)
       return processor;
   }
-
   return ret;
 }
 
@@ -233,18 +223,15 @@ void ProcessGroup::updatePropertyValue(std::string processorName,
                                        std::string propertyName,
                                        std::string propertyValue) {
   std::lock_guard<std::mutex> lock(mutex_);
-
   for (auto processor : processors_) {
     if (processor->getName() == processorName) {
       processor->setProperty(propertyName, propertyValue);
     }
   }
-
   for (auto processGroup : child_process_groups_) {
     processGroup->updatePropertyValue(processorName, propertyName,
                                       propertyValue);
   }
-
   return;
 }
 
@@ -253,7 +240,6 @@ void ProcessGroup::getConnections(
   for (auto connection : connections_) {
     connectionMap[connection->getUUIDStr()] = connection;
   }
-
   for (auto processGroup : child_process_groups_) {
     processGroup->getConnections(connectionMap);
   }
@@ -305,7 +291,7 @@ void ProcessGroup::removeConnection(std::shared_ptr<Connection> connection) {
   }
 }
 
-} /* namespace processor */
+} /* namespace core */
 } /* namespace minifi */
 } /* namespace nifi */
 } /* namespace apache */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 09c3fa3..70de3f6 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -17,18 +17,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "core/ProcessSession.h"
+#include <sys/time.h>
+#include <time.h>
 #include <vector>
 #include <queue>
 #include <map>
+#include <memory>
+#include <string>
 #include <set>
-#include <sys/time.h>
-#include <time.h>
 #include <chrono>
 #include <thread>
 #include <iostream>
 
-#include "core/ProcessSession.h"
-
 namespace org {
 namespace apache {
 namespace nifi {
@@ -37,8 +38,8 @@ namespace core {
 
 std::shared_ptr<core::FlowFile> ProcessSession::create() {
   std::map<std::string, std::string> empty;
-  std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(),
-      empty);
+  std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(
+      process_context_->getProvenanceRepository(), empty);
 
   _addedFlowFiles[record->getUUIDStr()] = record;
   logger_->log_debug("Create FlowFile with UUID %s",
@@ -50,10 +51,11 @@ std::shared_ptr<core::FlowFile> ProcessSession::create() {
   return record;
 }
 
-std::shared_ptr<core::FlowFile> ProcessSession::create(std::shared_ptr<core::FlowFile> &&parent) {
+std::shared_ptr<core::FlowFile> ProcessSession::create(
+    std::shared_ptr<core::FlowFile> &&parent) {
   std::map<std::string, std::string> empty;
-  std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(),
-      empty);
+  std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(
+      process_context_->getProvenanceRepository(), empty);
 
   if (record) {
     _addedFlowFiles[record->getUUIDStr()] = record;
@@ -77,7 +79,6 @@ std::shared_ptr<core::FlowFile> ProcessSession::create(std::shared_ptr<core::Flo
     record->setLineageStartDate(parent->getlineageStartDate());
     record->setLineageIdentifiers(parent->getlineageIdentifiers());
     parent->getlineageIdentifiers().insert(parent->getUUIDStr());
-
   }
   return record;
 }
@@ -93,7 +94,6 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(
       record->setOffset(parent->getOffset());
       record->setSize(parent->getSize());
       record->getResourceClaim()->increaseFlowFileRecordOwnedCount();
-      ;
     }
     provenance_report_->clone(parent, record);
   }
@@ -103,8 +103,8 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(
 std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer(
     std::shared_ptr<core::FlowFile> &parent) {
   std::map<std::string, std::string> empty;
-  std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(),
-      empty);
+  std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(
+      process_context_->getProvenanceRepository(), empty);
 
   if (record) {
     this->_clonedFlowFiles[record->getUUIDStr()] = record;
@@ -134,7 +134,6 @@ std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer(
       record->setOffset(parent->getOffset());
       record->setSize(parent->getSize());
       record->getResourceClaim()->increaseFlowFileRecordOwnedCount();
-      ;
     }
     provenance_report_->clone(parent, record);
   }
@@ -143,12 +142,11 @@ std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer(
 }
 
 std::shared_ptr<core::FlowFile> ProcessSession::clone(
-    std::shared_ptr<core::FlowFile> &parent, long offset, long size) {
+    std::shared_ptr<core::FlowFile> &parent, int64_t offset, int64_t size) {
   std::shared_ptr<core::FlowFile> record = this->create(parent);
   if (record) {
-
     if (parent->getResourceClaim()) {
-      if ((offset + size) > (long) parent->getSize()) {
+      if ((offset + size) > parent->getSize()) {
         // Set offset and size
         logger_->log_error("clone offset %d and size %d exceed parent size %d",
                            offset, size, parent->getSize());
@@ -165,7 +163,6 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(
       std::shared_ptr<ResourceClaim> parent_claim = parent->getResourceClaim();
       record->setResourceClaim(parent_claim);
       if (parent_claim != nullptr) {
-
         record->getResourceClaim()->increaseFlowFileRecordOwnedCount();
       }
     }
@@ -531,13 +528,13 @@ void ProcessSession::read(std::shared_ptr<core::FlowFile> &&flow,
 }
 
 /**
-  * Imports a file from the data stream
-  * @param stream incoming data stream that contains the data to store into a file
-  * @param flow flow file
-  *
-  */
+ * Imports a file from the data stream
+ * @param stream incoming data stream that contains the data to store into a file
+ * @param flow flow file
+ *
+ */
 void ProcessSession::importFrom(io::DataStream &stream,
-                            std::shared_ptr<core::FlowFile> &&flow) {
+                                std::shared_ptr<core::FlowFile> &&flow) {
   std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>();
 
   int max_read = getpagesize();
@@ -550,27 +547,21 @@ void ProcessSession::importFrom(io::DataStream &stream,
     fs.open(claim->getContentFullPath().c_str(),
             std::fstream::out | std::fstream::binary | std::fstream::trunc);
 
-
-    if (fs.is_open() ) {
-
+    if (fs.is_open()) {
       size_t position = 0;
       const size_t max_size = stream.getSize();
       size_t read_size = max_read;
-      while(position < max_size)
-      {
-        if ((max_size - position) > max_read)
-        {
+      while (position < max_size) {
+        if ((max_size - position) > max_read) {
           read_size = max_read;
-        }
-        else
-        {
+        } else {
           read_size = max_size - position;
         }
         charBuffer.clear();
-        stream.readData(charBuffer,read_size);
+        stream.readData(charBuffer, read_size);
 
-        fs.write((const char*)charBuffer.data(),read_size);
-        position+=read_size;
+        fs.write((const char*) charBuffer.data(), read_size);
+        position += read_size;
       }
       // Open the source file and stream to the flow file
 
@@ -603,7 +594,6 @@ void ProcessSession::importFrom(io::DataStream &stream,
     } else {
       throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
     }
-
   } catch (std::exception &exception) {
     if (flow && flow->getResourceClaim() == claim) {
       flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
@@ -730,7 +720,6 @@ void ProcessSession::import(std::string source,
           fs.write(buf, input.gcount());
       }
 
-
       if (fs.good() && fs.tellp() >= 0) {
         flow->setSize(fs.tellp());
         flow->setOffset(0);
@@ -786,7 +775,6 @@ void ProcessSession::import(std::string source,
 }
 
 void ProcessSession::commit() {
-
   try {
     // First we clone the flow record based on the transfered relationship for updated flow record
     for (auto && it : _updatedFlowFiles) {
@@ -962,7 +950,6 @@ void ProcessSession::rollback() {
         flowf->setSnapShot(false);
         connection->put(record);
       }
-
     }
     _originalFlowFiles.clear();
 
@@ -1010,7 +997,8 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() {
       _updatedFlowFiles[ret->getUUIDStr()] = ret;
       std::map<std::string, std::string> empty;
       std::shared_ptr<core::FlowFile> snapshot =
-          std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(),empty);
+          std::make_shared<FlowFileRecord>(
+              process_context_->getProvenanceRepository(), empty);
       logger_->log_debug("Create Snapshot FlowFile with UUID %s",
                          snapshot->getUUIDStr().c_str());
       snapshot = ret;
@@ -1026,7 +1014,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() {
   return NULL;
 }
 
-} /* namespace processor */
+} /* namespace core */
 } /* namespace minifi */
 } /* namespace nifi */
 } /* namespace apache */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/ProcessSessionFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSessionFactory.cpp b/libminifi/src/core/ProcessSessionFactory.cpp
index 445ca58..31b7481 100644
--- a/libminifi/src/core/ProcessSessionFactory.cpp
+++ b/libminifi/src/core/ProcessSessionFactory.cpp
@@ -19,23 +19,19 @@
  */
 
 #include "core/ProcessSessionFactory.h"
-
 #include <memory>
 
-
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace core {
 
-std::unique_ptr<ProcessSession> ProcessSessionFactory::createSession()
-{
-	return std::unique_ptr<ProcessSession>(new ProcessSession(process_context_));
+std::unique_ptr<ProcessSession> ProcessSessionFactory::createSession() {
+  return std::unique_ptr<ProcessSession>(new ProcessSession(process_context_));
 }
 
-
-} /* namespace processor */
+} /* namespace core */
 } /* namespace minifi */
 } /* namespace nifi */
 } /* namespace apache */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index ba52c28..9a0898a 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -17,19 +17,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "core/Processor.h"
+#include <sys/time.h>
+#include <time.h>
 #include <vector>
 #include <queue>
 #include <map>
 #include <set>
-#include <sys/time.h>
-#include <time.h>
 #include <chrono>
+#include <string>
 #include <thread>
 #include <memory>
 #include <functional>
-
-#include "core/Processor.h"
-
 #include "Connection.h"
 #include "core/Connectable.h"
 #include "core/ProcessContext.h"
@@ -45,7 +44,6 @@ namespace core {
 Processor::Processor(std::string name, uuid_t uuid)
     : Connectable(name, uuid),
       ConfigurableComponent(logging::Logger::getLogger()) {
-
   has_work_.store(false);
   // Setup the default values
   state_ = DISABLED;
@@ -74,7 +72,6 @@ void Processor::setScheduledState(ScheduledState state) {
 }
 
 bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
-  
   bool ret = false;
 
   if (isRunning()) {
@@ -82,7 +79,8 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
                       name_.c_str());
     return false;
   }
-  std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
+  std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(
+      conn);
   std::lock_guard<std::mutex> lock(mutex_);
 
   uuid_t srcUUID;
@@ -128,7 +126,6 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
         ret = true;
       }
     } else {
-
       // We do not have any outgoing connection for this relationship yet
       std::set<std::shared_ptr<Connectable>> newConnection;
       newConnection.insert(connection);
@@ -156,8 +153,9 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
 
   uuid_t srcUUID;
   uuid_t destUUID;
-  
-  std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
+
+  std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(
+      conn);
 
   connection->getSourceUUID(srcUUID);
   connection->getDestinationUUID(destUUID);
@@ -193,8 +191,6 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
   }
 }
 
-
-
 bool Processor::flowFilesQueued() {
   std::lock_guard<std::mutex> lock(mutex_);
 
@@ -202,7 +198,8 @@ bool Processor::flowFilesQueued() {
     return false;
 
   for (auto &&conn : _incomingConnections) {
-    std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
+    std::shared_ptr<Connection> connection =
+        std::static_pointer_cast<Connection>(conn);
     if (connection->getQueueSize() > 0)
       return true;
   }
@@ -217,7 +214,8 @@ bool Processor::flowFilesOutGoingFull() {
     // We already has connection for this relationship
     std::set<std::shared_ptr<Connectable>> existedConnection = connection.second;
     for (const auto conn : existedConnection) {
-      std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
+      std::shared_ptr<Connection> connection = std::static_pointer_cast<
+          Connection>(conn);
       if (connection->isFull())
         return true;
     }
@@ -251,7 +249,8 @@ bool Processor::isWorkAvailable() {
 
   try {
     for (const auto &conn : _incomingConnections) {
-      std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
+      std::shared_ptr<Connection> connection = std::static_pointer_cast<
+          Connection>(conn);
       if (connection->getQueueSize() > 0) {
         hasWork = true;
         break;
@@ -259,13 +258,14 @@ bool Processor::isWorkAvailable() {
     }
   } catch (...) {
     logger_->log_error(
-        "Caught an exception while checking if work is available; unless it was positively determined that work is available, assuming NO work is available!");
+        "Caught an exception while checking if work is available;"
+        " unless it was positively determined that work is available, assuming NO work is available!");
   }
 
   return hasWork;
 }
 
-} /* namespace processor */
+} /* namespace core */
 } /* namespace minifi */
 } /* namespace nifi */
 } /* namespace apache */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/ProcessorNode.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessorNode.cpp b/libminifi/src/core/ProcessorNode.cpp
index 44491d3..8979e32 100644
--- a/libminifi/src/core/ProcessorNode.cpp
+++ b/libminifi/src/core/ProcessorNode.cpp
@@ -16,7 +16,7 @@
  */
 
 #include "core/ProcessorNode.h"
-
+#include <memory>
 namespace org {
 namespace apache {
 namespace nifi {
@@ -25,29 +25,23 @@ namespace core {
 
 ProcessorNode::ProcessorNode(const std::shared_ptr<Connectable> processor)
     : processor_(processor),
-      Connectable(processor->getName(),0),
+      Connectable(processor->getName(), 0),
       ConfigurableComponent(logging::Logger::getLogger()) {
-	
-	uuid_t copy;
-	processor->getUUID(copy);
-	setUUID( copy );
-
-
+  uuid_t copy;
+  processor->getUUID(copy);
+  setUUID(copy);
 }
 
 ProcessorNode::ProcessorNode(const ProcessorNode &other)
     : processor_(other.processor_),
       Connectable(other.getName(), 0),
       ConfigurableComponent(logging::Logger::getLogger()) {
-	
-	uuid_t copy;
-	processor_->getUUID(copy);
-	setUUID( copy );
-
+  uuid_t copy;
+  processor_->getUUID(copy);
+  setUUID(copy);
 }
 
 ProcessorNode::~ProcessorNode() {
-
 }
 
 bool ProcessorNode::isWorkAvailable() {
@@ -58,7 +52,7 @@ bool ProcessorNode::isRunning() {
   return processor_->isRunning();
 }
 
-} /* namespace processor */
+} /* namespace core */
 } /* namespace minifi */
 } /* namespace nifi */
 } /* namespace apache */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/Property.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Property.cpp b/libminifi/src/core/Property.cpp
index 287b7ec..aa002af 100644
--- a/libminifi/src/core/Property.cpp
+++ b/libminifi/src/core/Property.cpp
@@ -17,10 +17,11 @@
  */
 
 #include "core/Property.h"
-
+#include <string>
 namespace org {
 namespace apache {
-namespace nifi {namespace minifi {
+namespace nifi {
+namespace minifi {
 namespace core {
 
 // Get Name for the property