You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/07/27 16:43:42 UTC

[3/6] nifi-minifi-cpp git commit: MINIFI-249: Update prov repo to better abstract deser.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/repository/FlowFileRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/FlowFileRepository.cpp b/libminifi/src/core/repository/FlowFileRepository.cpp
index e6d561a..ac092ea 100644
--- a/libminifi/src/core/repository/FlowFileRepository.cpp
+++ b/libminifi/src/core/repository/FlowFileRepository.cpp
@@ -18,6 +18,7 @@
 #include "core/repository/FlowFileRepository.h"
 #include <memory>
 #include <string>
+#include <utility>
 #include <vector>
 #include "FlowFileRecord.h"
 
@@ -36,24 +37,36 @@ void FlowFileRepository::run() {
     uint64_t curTime = getTimeMillis();
     uint64_t size = repoSize();
     if (size >= purgeThreshold) {
-      std::vector<std::string> purgeList;
+      std::vector<std::shared_ptr<FlowFileRecord>> purgeList;
+      std::vector<std::pair<std::string, uint64_t>> keyRemovalList;
       leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
 
       for (it->SeekToFirst(); it->Valid(); it->Next()) {
-        std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this());
+        std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
         std::string key = it->key().ToString();
         if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) {
-          if ((curTime - eventRead->getEventTime()) > max_partition_millis_)
-            purgeList.push_back(key);
+          if ((curTime - eventRead->getEventTime()) > max_partition_millis_) {
+            purgeList.push_back(eventRead);
+            keyRemovalList.push_back(std::make_pair(key, it->value().size()));
+          }
         } else {
           logger_->log_debug("NiFi %s retrieve event %s fail", name_.c_str(), key.c_str());
-          purgeList.push_back(key);
+          keyRemovalList.push_back(std::make_pair(key, it->value().size()));
         }
       }
       delete it;
-      for (auto eventId : purgeList) {
-        logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), eventId.c_str());
-        Delete(eventId);
+      for (auto eventId : keyRemovalList) {
+        logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), eventId.first.c_str());
+        if (Delete(eventId.first)) {
+          repo_size_ -= eventId.second;
+        }
+      }
+
+      for (const auto &ffr : purgeList) {
+        auto claim = ffr->getResourceClaim();
+        if (claim != nullptr) {
+          content_repo_->remove(claim);
+        }
       }
     }
     if (size > max_partition_bytes_)
@@ -61,22 +74,23 @@ void FlowFileRepository::run() {
     else
       repo_full_ = false;
   }
-  return;
 }
 
-void FlowFileRepository::loadComponent() {
-  std::vector<std::string> purgeList;
+void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) {
+  content_repo_ = content_repo;
+  std::vector<std::pair<std::string, uint64_t>> purgeList;
   leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
 
   for (it->SeekToFirst(); it->Valid(); it->Next()) {
-    std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this());
+    std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
     std::string key = it->key().ToString();
+    repo_size_ += it->value().size();
     if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) {
       auto search = connectionMap.find(eventRead->getConnectionUuid());
       if (search != connectionMap.end()) {
         // we find the connection for the persistent flowfile, create the flowfile and enqueue that
         std::shared_ptr<core::FlowFile> flow_file_ref = std::static_pointer_cast<core::FlowFile>(eventRead);
-        std::shared_ptr<FlowFileRecord> record = std::make_shared<FlowFileRecord>(shared_from_this(), flow_file_ref);
+        std::shared_ptr<FlowFileRecord> record = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
         // set store to repo to true so that we do need to persistent again in enqueue
         record->setStoredToRepository(true);
         search->second->put(record);
@@ -84,19 +98,19 @@ void FlowFileRepository::loadComponent() {
         if (eventRead->getContentFullPath().length() > 0) {
           std::remove(eventRead->getContentFullPath().c_str());
         }
-        purgeList.push_back(key);
+        purgeList.push_back(std::make_pair(key, it->value().size()));
       }
     } else {
-      purgeList.push_back(key);
+      purgeList.push_back(std::make_pair(key, it->value().size()));
     }
   }
 
   delete it;
-  std::vector<std::string>::iterator itPurge;
-  for (itPurge = purgeList.begin(); itPurge != purgeList.end(); itPurge++) {
-    std::string eventId = *itPurge;
-    logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), eventId.c_str());
-    Delete(eventId);
+  for (auto eventId : purgeList) {
+    logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), eventId.first.c_str());
+    if (Delete(eventId.first)) {
+      repo_size_ -= eventId.second;
+    }
   }
 
   return;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/repository/VolatileContentRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp
new file mode 100644
index 0000000..ac575c5
--- /dev/null
+++ b/libminifi/src/core/repository/VolatileContentRepository.cpp
@@ -0,0 +1,183 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/repository/VolatileContentRepository.h"
+#include <cstdio>
+#include <string>
+#include <memory>
+#include <thread>
+#include "utils/StringUtils.h"
+#include "io/FileStream.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace repository {
+
+const char *VolatileContentRepository::minimal_locking = "minimal.locking";
+
+bool VolatileContentRepository::initialize(const std::shared_ptr<Configure> &configure) {  // Installs the claim callbacks, reads the optional minimal-locking flag, then calls start(); always returns true.
+  VolatileRepository::initialize(configure);  // base-class setup runs first; its return value is not checked here
+  resource_claim_comparator_ = [](std::shared_ptr<minifi::ResourceClaim> lhsPtr, std::shared_ptr<minifi::ResourceClaim> rhsPtr) {  // two claims match when both are non-null and their content paths are equal
+    if (lhsPtr == nullptr || rhsPtr == nullptr) {
+      return false;
+    }
+    return lhsPtr->getContentFullPath() == rhsPtr->getContentFullPath();};
+  resource_claim_check_ = [](std::shared_ptr<minifi::ResourceClaim> claim) {  // true once the claim's flow-file-record owner count is <= 0
+    return claim->getFlowFileRecordOwnedCount() <= 0;};
+  claim_reclaimer_ = [&](std::shared_ptr<minifi::ResourceClaim> claim) {if (claim->getFlowFileRecordOwnedCount() <= 0) {  // calls remove() on a claim whose owner count is <= 0; captures this object by reference
+      remove(claim);
+    }
+  };
+
+  if (configure != nullptr) {
+    bool minimize_locking = false;
+    std::string value;
+    std::stringstream strstream;
+    strstream << Configure::nifi_volatile_repository_options << getName() << "." << minimal_locking;  // property key: <options prefix><repository name>.minimal.locking
+    if (configure->get(strstream.str(), value)) {
+      utils::StringUtils::StringToBool(value, minimize_locking);
+      minimize_locking_ = minimize_locking;  // only overridden when the property is present in the configuration
+    }
+  }
+  if (!minimize_locking_) {
+    for (auto ent : value_vector_) {  // without minimal locking, write() allocates entries on demand, so the entries held in value_vector_ are freed here
+      delete ent;
+    }
+    value_vector_.clear();
+  }
+  start();
+
+  return true;
+}
+
+void VolatileContentRepository::stop() {  // Clears the running_ flag; nothing else is torn down in this method.
+  running_ = false;
+}
+
+void VolatileContentRepository::run() {  // Empty body: the thread launched by start() enters this method and returns immediately.
+}
+
+void VolatileContentRepository::start() {  // Launches a detached thread executing run(), unless the purge period is non-positive or running_ is already set.
+  if (this->purge_period_ <= 0)  // a non-positive purge period means no thread is started
+    return;
+  if (running_)
+    return;
+  thread_ = std::thread(&VolatileContentRepository::run, shared_from_parent<VolatileContentRepository>());  // presumably passes a shared_ptr so the object stays alive while run() executes — confirm shared_from_parent semantics
+  thread_.detach();
+  running_ = true;  // NOTE(review): set after the thread is launched, so two concurrent start() calls could both pass the check above — confirm callers serialize
+  logger_->log_info("%s Repository Monitor Thread Start", name_);
+}
+
+std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const std::shared_ptr<minifi::ResourceClaim> &claim) {  // Returns a stream over the entry for claim, claiming or creating an entry if none is mapped yet; returns nullptr on failure.
+  logger_->log_debug("enter write");
+  {
+    std::lock_guard<std::mutex> lock(map_mutex_);
+    auto claim_check = master_list_.find(claim->getContentFullPath());  // entries are keyed by the claim's content path
+    if (claim_check != master_list_.end()) {
+      logger_->log_debug("Creating copy of atomic entry");
+      auto ent = claim_check->second->takeOwnership();
+      if (ent == nullptr) {
+        logger_->log_debug("write returns nullptr");
+        return nullptr;
+      }
+      return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent);
+    }
+  }  // map_mutex_ is released at the end of this scope
+
+  int size = 0;  // counts value_vector_ entries that rejected the claim; only used in the final log message
+  if (__builtin_expect(minimize_locking_ == true, 1)) {
+    logger_->log_debug("Minimize locking");
+    for (auto ent : value_vector_) {
+      if (ent->testAndSetKey(claim, nullptr, nullptr, resource_claim_comparator_)) {  // the loop stops at the first entry for which testAndSetKey succeeds
+        std::lock_guard<std::mutex> lock(map_mutex_);
+        master_list_[claim->getContentFullPath()] = ent;
+        return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent);
+      }
+      size++;
+    }
+  } else {
+    std::lock_guard < std::mutex > lock(map_mutex_);
+    auto claim_check = master_list_.find(claim->getContentFullPath());  // second lookup, under a newly acquired lock
+    if (claim_check != master_list_.end()) {
+      logger_->log_debug("Creating copy of atomic entry");
+      return std::make_shared < io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, claim_check->second);  // unlike the first lookup, takeOwnership() is not called on this path
+    } else {
+      logger_->log_debug("Creating new atomic entry");
+      AtomicEntry<std::shared_ptr<minifi::ResourceClaim>> *ent = new AtomicEntry<std::shared_ptr<minifi::ResourceClaim>>(&current_size_, &max_size_);
+      if (ent->testAndSetKey(claim, nullptr, nullptr, resource_claim_comparator_)) {
+        master_list_[claim->getContentFullPath()] = ent;
+        return std::make_shared< io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent);
+      }  // NOTE(review): when testAndSetKey fails, ent is neither stored nor deleted in this function — looks like a leak; confirm
+    }
+  }
+  logger_->log_debug("write returns nullptr %d", size);
+  return nullptr;
+}
+
+std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) {  // Returns a stream over the entry mapped to claim's content path, or nullptr if none is mapped or takeOwnership() yields nullptr.
+  logger_->log_debug("enter read");
+  int size = 0;  // never modified; only echoed in the log message below
+  {
+    std::lock_guard<std::mutex> lock(map_mutex_);
+    auto claim_check = master_list_.find(claim->getContentFullPath());  // entries are keyed by the claim's content path
+    if (claim_check != master_list_.end()) {
+      auto ent = claim_check->second->takeOwnership();
+      if (ent == nullptr) {
+        return nullptr;
+      }
+      return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent);
+    }
+  }
+  logger_->log_debug("enter read for %s after %d", claim->getContentFullPath(), size);  // reached only when no entry is mapped for the claim
+  return nullptr;
+}
+
+bool VolatileContentRepository::remove(const std::shared_ptr<minifi::ResourceClaim> &claim) {  // Releases the entry mapped to claim's content path; the return value reports whether the release succeeded.
+  logger_->log_debug("enter remove for %s, reducing %d", claim->getContentFullPath(), current_size_.load());
+  if (__builtin_expect(minimize_locking_ == true, 1)) {
+    std::lock_guard<std::mutex> lock(map_mutex_);
+    auto ent = master_list_.find(claim->getContentFullPath());
+    if (ent != master_list_.end()) {
+      // if we cannot remove the entry we will let the owner's destructor
+      // decrement the reference count and free it
+      if (ent->second->freeValue(claim)) {
+        logger_->log_debug("removed %s", claim->getContentFullPath());
+        return true;  // NOTE(review): the map entry is kept on success and erased only when freeValue fails — confirm this is intended
+      }
+      master_list_.erase(claim->getContentFullPath());
+    }
+  } else {
+    std::lock_guard<std::mutex> lock(map_mutex_);
+    delete master_list_[claim->getContentFullPath()];  // presumably master_list_ is a std::map-like container of raw pointers, so an unknown path default-inserts nullptr and the delete is a no-op — confirm
+    master_list_.erase(claim->getContentFullPath());
+    return true;  // this branch reports success even when no entry was mapped
+  }
+
+  logger_->log_debug("could not remove %s", claim->getContentFullPath());
+  return false;
+}
+
+} /* namespace repository */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/repository/VolatileRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/VolatileRepository.cpp b/libminifi/src/core/repository/VolatileRepository.cpp
index a7e3a51..6e3e1c6 100644
--- a/libminifi/src/core/repository/VolatileRepository.cpp
+++ b/libminifi/src/core/repository/VolatileRepository.cpp
@@ -16,7 +16,9 @@
  * limitations under the License.
  */
 #include "core/repository/VolatileRepository.h"
+#include <map>
 #include <memory>
+#include <limits>
 #include <string>
 #include <vector>
 #include "FlowFileRecord.h"
@@ -28,33 +30,6 @@ namespace minifi {
 namespace core {
 namespace repository {
 
-const char *VolatileRepository::volatile_repo_max_count = "max.count";
-
-void VolatileRepository::run() {
-  repo_full_ = false;
-}
-
-/**
- * Purge
- */
-void VolatileRepository::purge() {
-  while (current_size_ > max_size_) {
-    for (auto ent : value_vector_) {
-      // let the destructor do the cleanup
-      RepoValue value;
-      if (ent->getValue(value)) {
-        current_size_ -= value.size();
-        logger_->log_info("VolatileRepository -- purge %s %d %d %d", value.getKey(), current_size_.load(), max_size_, current_index_.load());
-      }
-      if (current_size_ < max_size_)
-        break;
-    }
-  }
-}
-
-void VolatileRepository::loadComponent() {
-}
-
 } /* namespace repository */
 } /* namespace core */
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/yaml/YamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 4ce944e..b5d9a8f 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -35,7 +35,8 @@ core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node root
   uuid_t uuid;
   int64_t version = 0;
 
-  checkRequiredField(&rootFlowNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+  checkRequiredField(&rootFlowNode, "name",
+  CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
   std::string flowName = rootFlowNode["name"].as<std::string>();
   std::string id = getOrGenerateId(&rootFlowNode);
   uuid_parse(id.c_str(), uuid);
@@ -47,10 +48,8 @@ core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node root
     }
   }
 
-  logger_->log_debug(
-      "parseRootProcessGroup: id => [%s], name => [%s]", id, flowName);
-  std::unique_ptr<core::ProcessGroup> group =
-      FlowConfiguration::createRootProcessGroup(flowName, uuid, version);
+  logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", id, flowName);
+  std::unique_ptr<core::ProcessGroup> group = FlowConfiguration::createRootProcessGroup(flowName, uuid, version);
 
   this->name_ = flowName;
 
@@ -77,7 +76,8 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::
         core::ProcessorConfig procCfg;
         YAML::Node procNode = iter->as<YAML::Node>();
 
-        checkRequiredField(&procNode, "name", CONFIG_YAML_PROCESSORS_KEY);
+        checkRequiredField(&procNode, "name",
+        CONFIG_YAML_PROCESSORS_KEY);
         procCfg.name = procNode["name"].as<std::string>();
         procCfg.id = getOrGenerateId(&procNode);
         uuid_parse(procCfg.id.c_str(), uuid);
@@ -101,11 +101,13 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::
         }
         processor->setName(procCfg.name);
 
-        checkRequiredField(&procNode, "scheduling strategy", CONFIG_YAML_PROCESSORS_KEY);
+        checkRequiredField(&procNode, "scheduling strategy",
+        CONFIG_YAML_PROCESSORS_KEY);
         procCfg.schedulingStrategy = procNode["scheduling strategy"].as<std::string>();
         logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", procCfg.schedulingStrategy);
 
-        checkRequiredField(&procNode, "scheduling period", CONFIG_YAML_PROCESSORS_KEY);
+        checkRequiredField(&procNode, "scheduling period",
+        CONFIG_YAML_PROCESSORS_KEY);
         procCfg.schedulingPeriod = procNode["scheduling period"].as<std::string>();
         logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod);
 
@@ -224,13 +226,15 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P
       for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end(); ++iter) {
         YAML::Node currRpgNode = iter->as<YAML::Node>();
 
-        checkRequiredField(&currRpgNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+        checkRequiredField(&currRpgNode, "name",
+        CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
         auto name = currRpgNode["name"].as<std::string>();
         id = getOrGenerateId(&currRpgNode);
 
         logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => [%s]", name, id);
 
-        checkRequiredField(&currRpgNode, "url", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+        checkRequiredField(&currRpgNode, "url",
+        CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
         std::string url = currRpgNode["url"].as<std::string>();
         logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url);
 
@@ -266,7 +270,8 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P
         group->setTransmitting(true);
         group->setURL(url);
 
-        checkRequiredField(&currRpgNode, "Input Ports", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+        checkRequiredField(&currRpgNode, "Input Ports",
+        CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
         YAML::Node inputPorts = currRpgNode["Input Ports"].as<YAML::Node>();
         if (inputPorts && inputPorts.IsSequence()) {
           for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) {
@@ -312,9 +317,11 @@ void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, cor
 
   YAML::Node node = reportNode->as<YAML::Node>();
 
-  checkRequiredField(&node, "scheduling strategy", CONFIG_YAML_PROVENANCE_REPORT_KEY);
+  checkRequiredField(&node, "scheduling strategy",
+  CONFIG_YAML_PROVENANCE_REPORT_KEY);
   auto schedulingStrategyStr = node["scheduling strategy"].as<std::string>();
-  checkRequiredField(&node, "scheduling period", CONFIG_YAML_PROVENANCE_REPORT_KEY);
+  checkRequiredField(&node, "scheduling period",
+  CONFIG_YAML_PROVENANCE_REPORT_KEY);
   auto schedulingPeriodStr = node["scheduling period"].as<std::string>();
 
   core::TimeUnit unit;
@@ -423,7 +430,8 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
 
         // Configure basic connection
         uuid_t uuid;
-        checkRequiredField(&connectionNode, "name", CONFIG_YAML_CONNECTIONS_KEY);
+        checkRequiredField(&connectionNode, "name",
+        CONFIG_YAML_CONNECTIONS_KEY);
         std::string name = connectionNode["name"].as<std::string>();
         std::string id = getOrGenerateId(&connectionNode);
         uuid_parse(id.c_str(), uuid);
@@ -431,7 +439,8 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
         logger_->log_debug("Created connection with UUID %s and name %s", id, name);
 
         // Configure connection source
-        checkRequiredField(&connectionNode, "source relationship name", CONFIG_YAML_CONNECTIONS_KEY);
+        checkRequiredField(&connectionNode, "source relationship name",
+        CONFIG_YAML_CONNECTIONS_KEY);
         auto rawRelationship = connectionNode["source relationship name"].as<std::string>();
         core::Relationship relationship(rawRelationship, "");
         logger_->log_debug("parseConnection: relationship => [%s]", rawRelationship);
@@ -441,6 +450,24 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
 
         uuid_t srcUUID;
 
+        if (connectionNode["max work queue size"]) {
+          auto max_work_queue_str = connectionNode["max work queue size"].as<std::string>();
+          int64_t max_work_queue_size = 0;
+          if (core::Property::StringToInt(max_work_queue_str, max_work_queue_size)) {
+            connection->setMaxQueueSize(max_work_queue_size);
+          }
+          logger_->log_debug("Setting %d as the max queue size for %s", max_work_queue_size, name);
+        }
+
+        if (connectionNode["max work queue data size"]) {
+          auto max_work_queue_str = connectionNode["max work queue data size"].as<std::string>();
+          int64_t max_work_queue_data_size = 0;
+          if (core::Property::StringToInt(max_work_queue_str, max_work_queue_data_size)) {
+            connection->setMaxQueueDataSize(max_work_queue_data_size);
+          }
+          logger_->log_debug("Setting %d as the max queue data size for %s", max_work_queue_data_size, name);
+        }
+
         if (connectionNode["source id"]) {
           std::string connectionSrcProcId = connectionNode["source id"].as<std::string>();
           uuid_parse(connectionSrcProcId.c_str(), srcUUID);
@@ -449,7 +476,8 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
                              name, connectionSrcProcId);
         } else {
           // if we don't have a source id, try to resolve using source name. config schema v2 will make this unnecessary
-          checkRequiredField(&connectionNode, "source name", CONFIG_YAML_CONNECTIONS_KEY);
+          checkRequiredField(&connectionNode, "source name",
+          CONFIG_YAML_CONNECTIONS_KEY);
           std::string connectionSrcProcName = connectionNode["source name"].as<std::string>();
           uuid_t tmpUUID;
           if (!uuid_parse(connectionSrcProcName.c_str(), tmpUUID) && NULL != parent->findProcessor(tmpUUID)) {
@@ -486,7 +514,8 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
         } else {
           // we use the same logic as above for resolving the source processor
           // for looking up the destination processor in absence of a processor id
-          checkRequiredField(&connectionNode, "destination name", CONFIG_YAML_CONNECTIONS_KEY);
+          checkRequiredField(&connectionNode, "destination name",
+          CONFIG_YAML_CONNECTIONS_KEY);
           std::string connectionDestProcName = connectionNode["destination name"].as<std::string>();
           uuid_t tmpUUID;
           if (!uuid_parse(connectionDestProcName.c_str(), tmpUUID) &&
@@ -534,7 +563,8 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup *
   YAML::Node inputPortsObj = portNode->as<YAML::Node>();
 
   // Check for required fields
-  checkRequiredField(&inputPortsObj, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+  checkRequiredField(&inputPortsObj, "name",
+  CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
   auto nameStr = inputPortsObj["name"].as<std::string>();
   checkRequiredField(&inputPortsObj, "id",
   CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY,

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/io/AtomicEntryStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/AtomicEntryStream.cpp b/libminifi/src/io/AtomicEntryStream.cpp
new file mode 100644
index 0000000..aac9723
--- /dev/null
+++ b/libminifi/src/io/AtomicEntryStream.cpp
@@ -0,0 +1,34 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "io/AtomicEntryStream.h"
+#include <vector>
+#include <mutex>
+#include <string>
+#include "io/Serializable.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/io/ClientSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp
index 57d6f03..5f7f5f4 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -170,6 +170,9 @@ int16_t Socket::initialize() {
   int hh_errno;
   gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno);
 #endif
+  if (h == nullptr) {
+    return -1;
+  }
   memcpy(reinterpret_cast<char*>(&addr), h->h_addr_list[0], h->h_length);
 
   auto p = addr_info_;
@@ -197,7 +200,7 @@ int16_t Socket::select_descriptor(const uint16_t msec) {
   tv.tv_sec = msec / 1000;
   tv.tv_usec = (msec % 1000) * 1000;
 
-  std::lock_guard<std::recursive_mutex> guard(selection_mutex_);
+  std::lock_guard < std::recursive_mutex > guard(selection_mutex_);
 
   if (msec > 0)
     retval = select(socket_max_ + 1, &read_fds_, NULL, NULL, &tv);
@@ -241,14 +244,12 @@ int16_t Socket::setSocketOptions(const int sock) {
   bool nagle_off = true;
 #ifndef __MACH__
   if (nagle_off) {
-    if (setsockopt(sock, SOL_TCP, TCP_NODELAY, static_cast<void*>(&opt), sizeof(opt))
-        < 0) {
+    if (setsockopt(sock, SOL_TCP, TCP_NODELAY, static_cast<void*>(&opt), sizeof(opt)) < 0) {
       logger_->log_error("setsockopt() TCP_NODELAY failed");
       close(sock);
       return -1;
     }
-    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&opt),
-            sizeof(opt)) < 0) {
+    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&opt), sizeof(opt)) < 0) {
       logger_->log_error("setsockopt() SO_REUSEADDR failed");
       close(sock);
       return -1;
@@ -256,8 +257,7 @@ int16_t Socket::setSocketOptions(const int sock) {
   }
 
   int sndsize = 256 * 1024;
-  if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char *>( &sndsize),
-          sizeof(sndsize)) < 0) {
+  if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char *>(&sndsize), sizeof(sndsize)) < 0) {
     logger_->log_error("setsockopt() SO_SNDBUF failed");
     close(sock);
     return -1;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/io/FileStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp
new file mode 100644
index 0000000..3b2bfe1
--- /dev/null
+++ b/libminifi/src/io/FileStream.cpp
@@ -0,0 +1,160 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "io/FileStream.h"
+#include <fstream>
+#include <vector>
+#include <memory>
+#include <string>
+#include "io/validation.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+FileStream::FileStream(const std::string &path)  // Opens path for binary output, records the stream length, and positions the stream at offset 0.
+    : logger_(logging::LoggerFactory<FileStream>::getLogger()),
+      path_(path),
+      offset_(0) {
+  file_stream_ = std::unique_ptr<std::fstream>(new std::fstream());
+  file_stream_->open(path.c_str(), std::fstream::out | std::fstream::binary);  // out without in/app truncates an existing file; NOTE(review): open failure is not checked here
+  file_stream_->seekg(0, file_stream_->end);
+  file_stream_->seekp(0, file_stream_->end);
+  int len = file_stream_->tellg();  // tellg() yields -1 on failure, which the check below maps to length 0
+  if (len > 0) {
+    length_ = len;
+  } else {
+    length_ = 0;
+  }
+  seek(offset_);  // seek() clears the stream state and rewinds both positions to offset_ (0)
+}
+
+FileStream::FileStream(const std::string &path, uint32_t offset, bool write_enable)  // Opens path for binary reading (and writing when write_enable is set), records its length, then seeks to offset.
+    : logger_(logging::LoggerFactory<FileStream>::getLogger()),
+      path_(path) {  // offset_ is not initialised here; it is assigned by the seek(offset) call at the end
+  file_stream_ = std::unique_ptr<std::fstream>(new std::fstream());
+  if (write_enable) {
+    file_stream_->open(path.c_str(), std::fstream::in | std::fstream::out | std::fstream::binary);  // in|out opens an existing file without truncating it
+  } else {
+    file_stream_->open(path.c_str(), std::fstream::in | std::fstream::binary);
+  }
+  file_stream_->seekg(0, file_stream_->end);  // move to the end so tellg() reports the file length
+  file_stream_->seekp(0, file_stream_->end);
+  int len = file_stream_->tellg();  // tellg() yields -1 on failure, which the check below maps to length 0
+  if (len > 0) {
+    length_ = len;
+  } else {
+    length_ = 0;
+  }
+  seek(offset);
+}
+
+void FileStream::closeStream() {  // Closes and releases the underlying fstream under file_lock_; a second call is a no-op.
+  std::lock_guard<std::recursive_mutex> lock(file_lock_);
+  if (file_stream_ != nullptr) {
+    file_stream_->close();
+    file_stream_ = nullptr;  // resetting the unique_ptr destroys the fstream object
+  }
+}
+
+void FileStream::seek(uint64_t offset) {  // Records offset in offset_ and moves both the read and write positions to it, under file_lock_.
+  std::lock_guard<std::recursive_mutex> lock(file_lock_);
+  offset_ = offset;
+  file_stream_->clear();  // reset eof/fail bits first, otherwise the seeks below would be ignored; NOTE(review): file_stream_ is not null-checked, so calling this after closeStream() dereferences nullptr — confirm callers
+  file_stream_->seekg(offset_);
+  file_stream_->seekp(offset_);
+}
+
+int FileStream::writeData(std::vector<uint8_t> &buf, int buflen) {  // Writes the first buflen bytes of buf via the pointer overload; returns -1 if buf's capacity is smaller than buflen.
+  if (buf.capacity() < buflen) {  // NOTE(review): checks capacity rather than size, so bytes past size() may be written out — confirm intended
+    return -1;
+  }
+  return writeData(reinterpret_cast<uint8_t *>(&buf[0]), buflen);
+}
+
+// data stream overrides
+
+int FileStream::writeData(uint8_t *value, int size) {  // Writes size bytes from value at the current position; returns size on success and -1 on failure.
+  if (!IsNullOrEmpty(value)) {  // presumably rejects a null pointer — confirm against io/validation.h
+    std::lock_guard<std::recursive_mutex> lock(file_lock_);
+    if (file_stream_->write(reinterpret_cast<const char*>(value), size)) {
+      offset_ += size;
+      if (offset_ > length_) {  // the write extended the stream, so grow the recorded length
+        length_ = offset_;
+      }
+      file_stream_->seekg(offset_);  // move the read position to the new offset as well
+      file_stream_->flush();
+      return size;
+    } else {
+      return -1;  // stream write failed; offset_ and length_ are left unchanged
+    }
+  } else {
+    return -1;
+  }
+}
+
+template<typename T>
+inline std::vector<uint8_t> FileStream::readBuffer(const T& t) {  // Reads sizeof(t) bytes from the stream into a new byte vector; t is used only for its size.
+  std::vector<uint8_t> buf;
+  buf.resize(sizeof t);
+  readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));  // the return value of readData is ignored, so a short or failed read is not reported to the caller
+  return buf;
+}
+
+int FileStream::readData(std::vector<uint8_t> &buf, int buflen) {  // Reads up to buflen bytes into buf via the pointer overload and returns that overload's result.
+  if (buf.capacity() < buflen) {  // NOTE(review): checks capacity rather than size, so buf is not resized when capacity >= buflen > size() — confirm intended
+    buf.resize(buflen);
+  }
+  int ret = readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen);
+
+  if (ret < buflen) {  // NOTE(review): ret can be -1 from the pointer overload; resize(-1) would convert to a huge size — confirm
+    buf.resize(ret);
+  }
+  return ret;
+}
+
+int FileStream::readData(uint8_t *buf, int buflen) {  // Reads buflen bytes into buf at the current position; returns buflen on a full read and -1 when buf is rejected.
+  if (!IsNullOrEmpty(buf)) {  // presumably rejects a null pointer — confirm against io/validation.h
+    std::lock_guard<std::recursive_mutex> lock(file_lock_);
+    file_stream_->read(reinterpret_cast<char*>(buf), buflen);
+    if ((file_stream_->rdstate() & (file_stream_->eofbit | file_stream_->failbit)) != 0) {  // short or failed read
+      file_stream_->clear();  // reset the error bits so the seeks below take effect
+      file_stream_->seekg(0, file_stream_->end);
+      file_stream_->seekp(0, file_stream_->end);
+      int len = file_stream_->tellg();
+      offset_ = len;
+      length_ = len;
+      return offset_;  // NOTE(review): this is the absolute end-of-stream position, not the bytes read by this call, yet the vector overload treats it as a byte count — confirm
+    } else {
+      offset_ += buflen;
+      file_stream_->seekp(offset_);  // move the write position to the new offset as well
+      return buflen;
+    }
+
+  } else {
+    return -1;
+  }
+}
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/io/StreamFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/StreamFactory.cpp b/libminifi/src/io/StreamFactory.cpp
index 7990edd..288f4d1 100644
--- a/libminifi/src/io/StreamFactory.cpp
+++ b/libminifi/src/io/StreamFactory.cpp
@@ -47,11 +47,11 @@ class SocketCreator : public AbstractStreamFactory {
  public:
   template<typename Q = V>
   ContextTypeCheck<true, std::shared_ptr<Q>> create(const std::shared_ptr<Configure> &configure) {
-    return std::make_shared<V>(configure);
+    return std::make_shared < V > (configure);
   }
   template<typename Q = V>
   ContextTypeCheck<false, std::shared_ptr<Q>> create(const std::shared_ptr<Configure> &configure) {
-    return std::make_shared<SocketContext>(configure);
+    return std::make_shared < SocketContext > (configure);
   }
 
   SocketCreator<T, V>(std::shared_ptr<Configure> configure) {
@@ -69,7 +69,7 @@ class SocketCreator : public AbstractStreamFactory {
 
   std::unique_ptr<Socket> createSocket(const std::string &host, const uint16_t port) {
     T *socket = create(host, port);
-    return std::unique_ptr<Socket>(socket);
+    return std::unique_ptr < Socket > (socket);
   }
 
  private:

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/processors/ExecuteProcess.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ExecuteProcess.cpp b/libminifi/src/processors/ExecuteProcess.cpp
index 323d69a..9c6f732 100644
--- a/libminifi/src/processors/ExecuteProcess.cpp
+++ b/libminifi/src/processors/ExecuteProcess.cpp
@@ -145,7 +145,7 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
               break;
             logger_->log_info("Execute Command Respond %d", numRead);
             ExecuteProcess::WriteCallback callback(buffer, numRead);
-            std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+            std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
             if (!flowFile)
               continue;
             flowFile->addAttribute("command", _command.c_str());
@@ -167,7 +167,7 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
                 // child exits and close the pipe
                 ExecuteProcess::WriteCallback callback(buffer, totalRead);
                 if (!flowFile) {
-                  flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+                  flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
                   if (!flowFile)
                     break;
                   flowFile->addAttribute("command", _command.c_str());
@@ -185,7 +185,7 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
                 logger_->log_info("Execute Command Max Respond %d", sizeof(buffer));
                 ExecuteProcess::WriteCallback callback(buffer, sizeof(buffer));
                 if (!flowFile) {
-                  flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+                  flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
                   if (!flowFile)
                     continue;
                   flowFile->addAttribute("command", _command.c_str());

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/processors/GenerateFlowFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GenerateFlowFile.cpp b/libminifi/src/processors/GenerateFlowFile.cpp
index 3741a8f..2fee3f2 100644
--- a/libminifi/src/processors/GenerateFlowFile.cpp
+++ b/libminifi/src/processors/GenerateFlowFile.cpp
@@ -91,7 +91,7 @@ void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSes
     }
     for (int i = 0; i < batchSize; i++) {
       // For each batch
-      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
       if (!flowFile)
         return;
       if (fileSize > 0)
@@ -114,7 +114,7 @@ void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSes
     GenerateFlowFile::WriteCallback callback(_data, _dataSize);
     for (int i = 0; i < batchSize; i++) {
       // For each batch
-      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
       if (!flowFile)
         return;
       if (fileSize > 0)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/processors/GetFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GetFile.cpp b/libminifi/src/processors/GetFile.cpp
index f1dbb21..723d461 100644
--- a/libminifi/src/processors/GetFile.cpp
+++ b/libminifi/src/processors/GetFile.cpp
@@ -150,7 +150,7 @@ void GetFile::onTrigger(core::ProcessContext *context, core::ProcessSession *ses
         std::string fileName = list.front();
         list.pop();
         logger_->log_info("GetFile process %s", fileName.c_str());
-        std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+        std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
         if (flowFile == nullptr)
           return;
         std::size_t found = fileName.find_last_of("/\\");
@@ -172,19 +172,19 @@ void GetFile::onTrigger(core::ProcessContext *context, core::ProcessSession *ses
 }
 
 bool GetFile::isListingEmpty() {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard < std::mutex > lock(mutex_);
 
   return _dirList.empty();
 }
 
 void GetFile::putListing(std::string fileName) {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard < std::mutex > lock(mutex_);
 
   _dirList.push(fileName);
 }
 
 void GetFile::pollListing(std::queue<std::string> &list, const GetFileRequest &request) {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard < std::mutex > lock(mutex_);
 
   while (!_dirList.empty() && (request.maxSize == 0 || list.size() < request.maxSize)) {
     std::string fileName = _dirList.front();

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/processors/InvokeHTTP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/InvokeHTTP.cpp b/libminifi/src/processors/InvokeHTTP.cpp
index d5097bb..7dc75d2 100644
--- a/libminifi/src/processors/InvokeHTTP.cpp
+++ b/libminifi/src/processors/InvokeHTTP.cpp
@@ -42,6 +42,8 @@
 #include "io/StreamFactory.h"
 #include "ResourceClaim.h"
 #include "utils/StringUtils.h"
+#include "utils/ByteInputCallBack.h"
+#include "utils/HTTPUtils.h"
 
 namespace org {
 namespace apache {
@@ -113,7 +115,7 @@ void InvokeHTTP::set_request_method(CURL *curl, const std::string &method) {
   if (my_method == "POST") {
     curl_easy_setopt(curl, CURLOPT_POST, 1);
   } else if (my_method == "PUT") {
-    curl_easy_setopt(curl, CURLOPT_UPLOAD, 1);
+    curl_easy_setopt(curl, CURLOPT_PUT, 1);
   } else if (my_method == "GET") {
   } else {
     curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, my_method.c_str());
@@ -210,7 +212,7 @@ void InvokeHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionF
   if (context->getProperty(SSLContext.getName(), context_name) && !IsNullOrEmpty(context_name)) {
     std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(context_name);
     if (nullptr != service) {
-      ssl_context_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
+      ssl_context_service_ = std::static_pointer_cast < minifi::controllers::SSLContextService > (service);
     }
   }
 }
@@ -283,14 +285,14 @@ void InvokeHTTP::configure_secure_connection(CURL *http_session) {
 }
 
 void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
-  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get());
+  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->get());
 
   logger_->log_info("onTrigger InvokeHTTP with  %s", method_.c_str());
 
   if (flowFile == nullptr) {
     if (!emitFlowFile(method_)) {
       logger_->log_info("InvokeHTTP -- create flow file with  %s", method_.c_str());
-      flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+      flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
     } else {
       logger_->log_info("exiting because method is %s", method_.c_str());
       return;
@@ -317,9 +319,9 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *
   if (read_timeout_ > 0) {
     curl_easy_setopt(http_session, CURLOPT_TIMEOUT, read_timeout_);
   }
+
   utils::HTTPRequestResponse content;
-  curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION,
-                   &utils::HTTPRequestResponse::recieve_write);
+  curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION, &utils::HTTPRequestResponse::recieve_write);
 
   curl_easy_setopt(http_session, CURLOPT_WRITEDATA, static_cast<void*>(&content));
 
@@ -333,13 +335,10 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *
       callbackObj->ptr = callback;
       callbackObj->pos = 0;
       logger_->log_info("InvokeHTTP -- Setting callback");
-      curl_easy_setopt(http_session, CURLOPT_UPLOAD, 1L);
-      curl_easy_setopt(http_session, CURLOPT_INFILESIZE_LARGE,
-                       (curl_off_t)callback->getBufferSize());
-      curl_easy_setopt(http_session, CURLOPT_READFUNCTION,
-                       &utils::HTTPRequestResponse::send_write);
-      curl_easy_setopt(http_session, CURLOPT_READDATA,
-                       static_cast<void*>(callbackObj));
+      curl_easy_setopt(http_session, CURLOPT_INFILESIZE_LARGE, (curl_off_t)callback->getBufferSize());
+      curl_easy_setopt(http_session, CURLOPT_READFUNCTION, &utils::HTTPRequestResponse::send_write);
+      curl_easy_setopt(http_session, CURLOPT_READDATA, static_cast<void*>(callbackObj));
+
     } else {
       logger_->log_error("InvokeHTTP -- no resource claim");
     }
@@ -377,14 +376,14 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *
     bool output_body_to_content = isSuccess && !putToAttribute;
     bool body_empty = IsNullOrEmpty(content.data);
 
-    logger_->log_info("isSuccess: %d", isSuccess);
+    logger_->log_info("isSuccess: %d, response code %d ", isSuccess, http_code);
     std::shared_ptr<FlowFileRecord> response_flow = nullptr;
 
     if (output_body_to_content) {
       if (flowFile != nullptr) {
-        response_flow = std::static_pointer_cast<FlowFileRecord>(session->create(flowFile));
+        response_flow = std::static_pointer_cast < FlowFileRecord > (session->create(flowFile));
       } else {
-        response_flow = std::static_pointer_cast<FlowFileRecord>(session->create());
+        response_flow = std::static_pointer_cast < FlowFileRecord > (session->create());
       }
 
       std::string ct = content_type;
@@ -398,7 +397,7 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *
       session->importFrom(stream, response_flow);
     } else {
       logger_->log_info("Cannot output body to content");
-      response_flow = std::static_pointer_cast<FlowFileRecord>(session->create());
+      response_flow = std::static_pointer_cast < FlowFileRecord > (session->create());
     }
     route(flowFile, response_flow, session, context, isSuccess, http_code);
   } else {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/processors/ListenHTTP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ListenHTTP.cpp b/libminifi/src/processors/ListenHTTP.cpp
index c26b41d..d410547 100644
--- a/libminifi/src/processors/ListenHTTP.cpp
+++ b/libminifi/src/processors/ListenHTTP.cpp
@@ -201,7 +201,7 @@ ListenHTTP::~ListenHTTP() {
 }
 
 void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
-  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get());
+  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->get());
 
   // Do nothing if there are no incoming files
   if (!flowFile) {
@@ -243,7 +243,7 @@ bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection *
 
   auto session = _processSessionFactory->createSession();
   ListenHTTP::WriteCallback callback(conn, req_info);
-  auto flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+  auto flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
 
   if (!flowFile) {
     sendErrorResponse(conn);
@@ -295,11 +295,11 @@ ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const struc
   _reqInfo = reqInfo;
 }
 
-void ListenHTTP::WriteCallback::process(std::ofstream *stream) {
+int64_t ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
   int64_t rlen;
   int64_t nlen = 0;
   int64_t tlen = _reqInfo->content_length;
-  char buf[16384];
+  uint8_t buf[16384];
 
   while (nlen < tlen) {
     rlen = tlen - nlen;
@@ -320,6 +320,8 @@ void ListenHTTP::WriteCallback::process(std::ofstream *stream) {
 
     nlen += rlen;
   }
+
+  return nlen;
 }
 
 } /* namespace processors */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/processors/ListenSyslog.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ListenSyslog.cpp b/libminifi/src/processors/ListenSyslog.cpp
index 054d585..a5fdf28 100644
--- a/libminifi/src/processors/ListenSyslog.cpp
+++ b/libminifi/src/processors/ListenSyslog.cpp
@@ -279,7 +279,7 @@ void ListenSyslog::onTrigger(core::ProcessContext *context, core::ProcessSession
     SysLogEvent event = eventQueue.front();
     eventQueue.pop();
     if (firstEvent) {
-      flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+      flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
       if (!flowFile)
         return;
       ListenSyslog::WriteCallback callback(event.payload, event.len);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/processors/LogAttribute.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/LogAttribute.cpp b/libminifi/src/processors/LogAttribute.cpp
index e308901..ad8e664 100644
--- a/libminifi/src/processors/LogAttribute.cpp
+++ b/libminifi/src/processors/LogAttribute.cpp
@@ -71,8 +71,9 @@ void LogAttribute::onTrigger(core::ProcessContext *context, core::ProcessSession
 
   std::shared_ptr<core::FlowFile> flow = session->get();
 
-  if (!flow)
+  if (!flow) {
     return;
+  }
 
   std::string value;
   if (context->getProperty(LogLevel.getName(), value)) {
@@ -107,10 +108,10 @@ void LogAttribute::onTrigger(core::ProcessContext *context, core::ProcessSession
     message << "\n" << "Payload:" << "\n";
     ReadCallback callback(flow->getSize());
     session->read(flow, &callback);
-    for (unsigned int i = 0, j = 0; i < callback._readSize; i++) {
-      message << std::hex << callback._buffer[i];
+    for (unsigned int i = 0, j = 0; i < callback.read_size_; i++) {
+      message << std::hex << callback.buffer_[i];
       j++;
-      if (j == 16) {
+      if (j == 80) {
         message << '\n';
         j = 0;
       }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/processors/PutFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/PutFile.cpp b/libminifi/src/processors/PutFile.cpp
index d72c56a..b425ae9 100644
--- a/libminifi/src/processors/PutFile.cpp
+++ b/libminifi/src/processors/PutFile.cpp
@@ -17,20 +17,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "processors/PutFile.h"
-#include <stdio.h>
+
+#include "../../include/processors/PutFile.h"
+
+#include <sys/stat.h>
+#include <unistd.h>
 #include <uuid/uuid.h>
-#include <sstream>
-#include <string>
+#include <cstdint>
+#include <cstdio>
 #include <iostream>
 #include <memory>
 #include <set>
-#include <fstream>
-#include "io/validation.h"
-#include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
+#include <string>
+
+#include "../../include/core/logging/Logger.h"
+#include "../../include/core/ProcessContext.h"
+#include "../../include/core/Property.h"
+#include "../../include/core/Relationship.h"
+#include "../../include/io/BaseStream.h"
+#include "../../include/io/DataStream.h"
+#include "../../include/io/validation.h"
 
 namespace org {
 namespace apache {
@@ -76,7 +82,7 @@ void PutFile::onTrigger(core::ProcessContext *context, core::ProcessSession *ses
     return;
   }
 
-  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get());
+  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->get());
 
   // Do nothing if there are no incoming files
   if (!flowFile) {
@@ -142,10 +148,23 @@ PutFile::ReadCallback::ReadCallback(const std::string &tmpFile, const std::strin
 }
 
 // Copy the entire file contents to the temporary file
-void PutFile::ReadCallback::process(std::ifstream *stream) {
+int64_t PutFile::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {  // returns bytes copied, or -1 on a read error
   // Copy file contents into tmp file
   _writeSucceeded = false;
-  _tmpFileOs << stream->rdbuf();
+  size_t size = 0;
+  uint8_t buffer[1024];
+  do {
+    int read = stream->read(buffer, 1024);
+    if (read < 0) {
+      return -1;  // leaves _writeSucceeded false
+    }
+    if (read == 0) {
+      break;  // nothing more to read
+    }
+    _tmpFileOs.write(reinterpret_cast<char*>(buffer), read);
+    size += read;
+  } while (size < stream->getSize());
   _writeSucceeded = true;
+  return size;  // must follow the success flag; returning first left the flag permanently false
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/processors/TailFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/TailFile.cpp b/libminifi/src/processors/TailFile.cpp
index 46ed1fb..f87f4ec 100644
--- a/libminifi/src/processors/TailFile.cpp
+++ b/libminifi/src/processors/TailFile.cpp
@@ -51,7 +51,8 @@ core::Property TailFile::StateFile("State File", "Specifies the file that should
                                    " what data has been ingested so that upon restart NiFi can resume from where it left off",
                                    "TailFileState");
 core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character that should be used for delimiting the data being tailed"
-                                    "from the incoming file.", "");
+                                   "from the incoming file.",
+                                   "");
 core::Relationship TailFile::Success("success", "All files are routed to success");
 
 void TailFile::initialize() {
@@ -240,10 +241,9 @@ void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *se
       context->yield();
       return;
     }
-
-      std::size_t found = _currentTailFileName.find_last_of(".");
-      std::string baseName = _currentTailFileName.substr(0, found);
-      std::string extension = _currentTailFileName.substr(found + 1);
+    std::size_t found = _currentTailFileName.find_last_of(".");
+    std::string baseName = _currentTailFileName.substr(0, found);
+    std::string extension = _currentTailFileName.substr(found + 1);
 
     if (!this->_delimiter.empty()) {
       char delim = this->_delimiter.c_str()[0];
@@ -254,20 +254,20 @@ void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *se
       for (std::shared_ptr<FlowFileRecord> ffr : flowFiles) {
         logger_->log_info("TailFile %s for %d bytes", _currentTailFileName, ffr->getSize());
         std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + std::to_string(_currentTailFilePosition + ffr->getSize()) + "." + extension;
-          ffr->updateKeyedAttribute(PATH, fileLocation);
-          ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
+        ffr->updateKeyedAttribute(PATH, fileLocation);
+        ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
         ffr->updateKeyedAttribute(FILENAME, logName);
-          session->transfer(ffr, Success);
+        session->transfer(ffr, Success);
         this->_currentTailFilePosition += ffr->getSize() + 1;
         storeState();
       }
 
     } else {
-        std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
-        if (!flowFile)
-            return;
-        flowFile->updateKeyedAttribute(PATH, fileLocation);
-        flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
+      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+      if (!flowFile)
+        return;
+      flowFile->updateKeyedAttribute(PATH, fileLocation);
+      flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
       session->import(fullPath, flowFile, true, this->_currentTailFilePosition);
       session->transfer(flowFile, Success);
       logger_->log_info("TailFile %s for %d bytes", _currentTailFileName, flowFile->getSize());

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/provenance/Provenance.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp
index 8686a58..3e42a5a 100644
--- a/libminifi/src/provenance/Provenance.cpp
+++ b/libminifi/src/provenance/Provenance.cpp
@@ -17,13 +17,12 @@
  */
 
 #include "provenance/Provenance.h"
-
 #include <arpa/inet.h>
 #include <cstdint>
 #include <memory>
 #include <string>
 #include <vector>
-
+#include "core/Repository.h"
 #include "io/DataStream.h"
 #include "io/Serializable.h"
 #include "core/logging/Logger.h"
@@ -42,30 +41,35 @@ std::shared_ptr<logging::Logger> ProvenanceEventRecord::logger_ = logging::Logge
 const char *ProvenanceEventRecord::ProvenanceEventTypeStr[REPLAY + 1] = { "CREATE", "RECEIVE", "FETCH", "SEND", "DOWNLOAD", "DROP", "EXPIRE", "FORK", "JOIN", "CLONE", "CONTENT_MODIFIED",
     "ATTRIBUTES_MODIFIED", "ROUTE", "ADDINFO", "REPLAY" };
 
-ProvenanceEventRecord::ProvenanceEventRecord(ProvenanceEventRecord::ProvenanceEventType event, std::string componentId, std::string componentType) {
+ProvenanceEventRecord::ProvenanceEventRecord(ProvenanceEventRecord::ProvenanceEventType event, std::string componentId, std::string componentType)
+    : core::SerializableComponent(core::getClassName<ProvenanceEventRecord>()),
+      _eventDuration(0),
+      _entryDate(0),
+      _lineageStartDate(0) {
   _eventType = event;
   _componentId = componentId;
   _componentType = componentType;
   _eventTime = getTimeMillis();
-  char eventIdStr[37];
-  // Generate the global UUID for th event
-  id_generator_->generate(_eventId);
-  uuid_unparse_lower(_eventId, eventIdStr);
-  _eventIdStr = eventIdStr;
 }
 
 // DeSerialize
-bool ProvenanceEventRecord::DeSerialize(const std::shared_ptr<core::Repository> &repo, std::string key) {
+bool ProvenanceEventRecord::DeSerialize(const std::shared_ptr<core::SerializableComponent> &store) {
   std::string value;
   bool ret;
 
-  ret = repo->Get(key, value);
+  const std::shared_ptr<core::Repository> repo = std::dynamic_pointer_cast<core::Repository>(store);
+
+  if (nullptr == repo || IsNullOrEmpty(uuidStr_)) {
+    logger_->log_error("Repo could not be assigned");
+    return false;
+  }
+  ret = repo->Get(uuidStr_, value);
 
   if (!ret) {
-    logger_->log_error("NiFi Provenance Store event %s can not found", key.c_str());
+    logger_->log_error("NiFi Provenance Store event %s can not be found", uuidStr_);
     return false;
   } else {
-    logger_->log_debug("NiFi Provenance Read event %s length %d", key.c_str(), value.length());
+    logger_->log_debug("NiFi Provenance Read event %s length %d", uuidStr_, value.length());
   }
 
   org::apache::nifi::minifi::io::DataStream stream((const uint8_t*) value.data(), value.length());
@@ -73,20 +77,20 @@ bool ProvenanceEventRecord::DeSerialize(const std::shared_ptr<core::Repository>
   ret = DeSerialize(stream);
 
   if (ret) {
-    logger_->log_debug("NiFi Provenance retrieve event %s size %d eventType %d success", _eventIdStr.c_str(), stream.getSize(), _eventType);
+    logger_->log_debug("NiFi Provenance retrieve event %s size %d eventType %d success", uuidStr_, stream.getSize(), _eventType);
   } else {
-    logger_->log_debug("NiFi Provenance retrieve event %s size %d eventType %d fail", _eventIdStr.c_str(), stream.getSize(), _eventType);
+    logger_->log_debug("NiFi Provenance retrieve event %s size %d eventType %d fail", uuidStr_, stream.getSize(), _eventType);
   }
 
   return ret;
 }
 
-bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::Repository> &repo) {
+bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::SerializableComponent> &repo) {
   org::apache::nifi::minifi::io::DataStream outStream;
 
   int ret;
 
-  ret = writeUTF(this->_eventIdStr, &outStream);
+  ret = writeUTF(this->uuidStr_, &outStream);
   if (ret <= 0) {
     return false;
   }
@@ -127,7 +131,7 @@ bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::Repository> &r
     return false;
   }
 
-  ret = writeUTF(this->uuid_, &outStream);
+  ret = writeUTF(this->flow_uuid_, &outStream);
   if (ret <= 0) {
     return false;
   }
@@ -215,20 +219,20 @@ bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::Repository> &r
     }
   }
   // Persistent to the DB
-  if (repo->Put(_eventIdStr, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
-    logger_->log_debug("NiFi Provenance Store event %s size %d success", _eventIdStr.c_str(), outStream.getSize());
+  if (repo->Serialize(uuidStr_, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
+    logger_->log_debug("NiFi Provenance Store event %s size %d success", uuidStr_, outStream.getSize());
   } else {
-    logger_->log_error("NiFi Provenance Store event %s size %d fail", _eventIdStr.c_str(), outStream.getSize());
+    logger_->log_error("NiFi Provenance Store event %s size %d fail", uuidStr_, outStream.getSize());
   }
   return true;
 }
 
-bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) {
+bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t bufferSize) {
   int ret;
 
   org::apache::nifi::minifi::io::DataStream outStream(buffer, bufferSize);
 
-  ret = readUTF(this->_eventIdStr, &outStream);
+  ret = readUTF(this->uuidStr_, &outStream);
 
   if (ret <= 0) {
     return false;
@@ -271,7 +275,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const int bufferS
     return false;
   }
 
-  ret = readUTF(this->uuid_, &outStream);
+  ret = readUTF(this->flow_uuid_, &outStream);
   if (ret <= 0) {
     return false;
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/provenance/ProvenanceRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/provenance/ProvenanceRepository.cpp b/libminifi/src/provenance/ProvenanceRepository.cpp
index e4a8ffa..ce19fe4 100644
--- a/libminifi/src/provenance/ProvenanceRepository.cpp
+++ b/libminifi/src/provenance/ProvenanceRepository.cpp
@@ -31,31 +31,23 @@ void ProvenanceRepository::run() {
   uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4;
   while (running_) {
     std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
-    std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
     uint64_t curTime = getTimeMillis();
     uint64_t size = repoSize();
     if (size >= purgeThreshold) {
-      std::vector<std::string> purgeList;
       leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
       for (it->SeekToFirst(); it->Valid(); it->Next()) {
         ProvenanceEventRecord eventRead;
         std::string key = it->key().ToString();
-        if (eventRead.DeSerialize(reinterpret_cast<uint8_t*>(const_cast<char*>(it->value().data())), it->value().size())) {
-          if ((curTime - eventRead.getEventTime()) > max_partition_millis_)
-            purgeList.push_back(key);
+        uint64_t eventTime = eventRead.getEventTime(reinterpret_cast<uint8_t*>(const_cast<char*>(it->value().data())), it->value().size());
+        if (eventTime > 0) {
+          if ((curTime - eventTime) > max_partition_millis_)
+            Delete(key);
         } else {
           logger_->log_debug("NiFi Provenance retrieve event %s fail", key.c_str());
-          purgeList.push_back(key);
+          Delete(key);
         }
       }
       delete it;
-      std::vector<std::string>::iterator itPurge;
-
-      for (itPurge = purgeList.begin(); itPurge != purgeList.end(); itPurge++) {
-        std::string eventId = *itPurge;
-        logger_->log_info("ProvenanceRepository Repo Purge %s", eventId.c_str());
-        Delete(eventId);
-      }
     }
     if (size > max_partition_bytes_)
       repo_full_ = true;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/CPPLINT.cfg
----------------------------------------------------------------------
diff --git a/libminifi/test/CPPLINT.cfg b/libminifi/test/CPPLINT.cfg
index beed48a..a1e22ad 100644
--- a/libminifi/test/CPPLINT.cfg
+++ b/libminifi/test/CPPLINT.cfg
@@ -1,3 +1,4 @@
 set noparent
 filter=-build/include_order,-build/include_alpha
 exclude_files=Server.cpp
+exclude_files=TestBase.cpp
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/TestBase.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
new file mode 100644
index 0000000..4c0814d
--- /dev/null
+++ b/libminifi/test/TestBase.cpp
@@ -0,0 +1,211 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "./TestBase.h"
+#include <memory>
+#include <vector>
+#include <set>
+#include <string>
+
+// Builds an empty, not yet finalized plan on top of the given repositories.
+TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo)
+    : finalized(false),  // initializers are listed in member declaration order (finalized is declared first)
+      content_repo_(content_repo),
+      flow_repo_(flow_repo),
+      prov_repo_(prov_repo),
+      location(-1),  // -1: no processor has been run yet
+      current_flowfile_(nullptr) {
+}
+
+// Registers an already constructed processor with the plan and builds its node and context.
+// When linkToPrevious is set, a connection for relationship is created from the previously
+// added processor; otherwise relationship is only stored as the terminating relationship.
+// name is not used by this overload. Returns nullptr once the plan has been finalized.
+std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship,
+bool linkToPrevious) {
+  if (finalized) {
+    return nullptr;
+  }
+  std::lock_guard<std::recursive_mutex> guard(mutex);
+
+  // initialize the processor
+  processor->initialize();
+  processor_mapping_[processor->getUUIDStr()] = processor;
+
+  if (!linkToPrevious) {
+    termination_ = relationship;
+  } else {
+    // back() on an empty vector is undefined behaviour, so test for emptiness first
+    std::shared_ptr<core::Processor> last = processor_queue_.empty() ? std::shared_ptr<core::Processor>() : processor_queue_.back();
+    if (last == nullptr) {
+      // first processor of the plan: it gets connected to itself
+      last = processor;
+      termination_ = relationship;
+    }
+
+    std::stringstream connection_name;
+    connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr();
+    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str());
+    connection->setRelationship(relationship);
+
+    // link the connections so that we can test results at the end for this
+    connection->setSource(last);
+    connection->setDestination(processor);
+
+    uuid_t uuid_copy, uuid_copy_next;
+    last->getUUID(uuid_copy);
+    connection->setSourceUUID(uuid_copy);
+    processor->getUUID(uuid_copy_next);
+    connection->setDestinationUUID(uuid_copy_next);
+    last->addConnection(connection);
+    if (last != processor) {
+      processor->addConnection(connection);
+    }
+    relationships_.push_back(connection);
+  }
+
+  std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+  processor_nodes_.push_back(node);
+
+  std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(*(node.get()), controller_services_provider_, flow_repo_, prov_repo_, content_repo_);
+  processor_contexts_.push_back(context);
+
+  processor_queue_.push_back(processor);
+
+  return processor;
+}
+
+// Instantiates the processor class processor_name through the default class loader, names it
+// and hands it to the shared_ptr overload. Throws std::exception if instantiation yields null.
+std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship,
+bool linkToPrevious) {
+  if (finalized) {
+    return nullptr;
+  }
+  std::lock_guard<std::recursive_mutex> guard(mutex);
+
+  uuid_t generated_id;
+  uuid_generate(generated_id);
+  auto instance = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, generated_id);
+  if (instance == nullptr) {
+    throw std::exception();
+  }
+
+  std::shared_ptr<core::Processor> created = std::static_pointer_cast<core::Processor>(instance);
+  created->setName(name);
+  return addProcessor(created, name, relationship, linkToPrevious);
+}
+
+// Sets prop to value on the context that belongs to proc. Returns false when proc is not part
+// of this plan (or has no context), otherwise whatever the context's setProperty reports.
+bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value) {
+  std::lock_guard<std::recursive_mutex> guard(mutex);
+  // an unsigned index avoids the signed/unsigned comparisons an int counter causes
+  std::size_t index = 0;
+  while (index < processor_queue_.size() && processor_queue_[index] != proc) {
+    ++index;
+  }
+  if (index >= processor_queue_.size() || index >= processor_contexts_.size()) {
+    return false;
+  }
+  return processor_contexts_[index]->setProperty(prop, value);
+}
+
+// Rewinds the plan: forgets sessions and factories, resets location and drains active tasks.
+void TestPlan::reset() {
+  std::lock_guard<std::recursive_mutex> guard(mutex);
+  process_sessions_.clear();
+  factories_.clear();
+  location = -1;
+  for (const auto &queued : processor_queue_) {
+    while (queued->getActiveTasks() > 0)
+      queued->decrementActiveTask();
+  }
+}
+
+// Finalizes the plan on first use, then runs the next processor in the queue: it is scheduled
+// once, and either verify or its onTrigger is invoked. Returns true while processors remain.
+bool TestPlan::runNextProcessor(std::function<void(core::ProcessContext*, core::ProcessSession*)> verify) {
+  if (!finalized)
+    finalize();
+  std::lock_guard<std::recursive_mutex> guard(mutex);
+  std::shared_ptr<core::Processor> current = processor_queue_.at(++location);
+  std::shared_ptr<core::ProcessContext> current_context = processor_contexts_.at(location);
+  auto session_factory = std::make_shared<core::ProcessSessionFactory>(current_context.get());
+  factories_.push_back(session_factory);
+  const bool scheduled_before = std::find(configured_processors_.begin(), configured_processors_.end(), current) != configured_processors_.end();
+  if (!scheduled_before) {
+    current->onSchedule(current_context.get(), session_factory.get());
+    configured_processors_.push_back(current);
+  }
+  auto session = std::make_shared<core::ProcessSession>(current_context.get());
+  process_sessions_.push_back(session);
+  current->incrementActiveTasks();
+  current->setScheduledState(core::ScheduledState::RUNNING);
+  if (verify != nullptr)
+    verify(current_context.get(), session.get());
+  else
+    current->onTrigger(current_context.get(), session.get());
+  session->commit();
+  current_flowfile_ = session->get();
+  return location + 1 < processor_queue_.size();
+}
+
+std::set<provenance::ProvenanceEventRecord*> TestPlan::getProvenanceRecords() {  // events reported by the session at the current location
+  return process_sessions_.at(location)->getProvenanceReporter()->getEvents();  // at() throws std::out_of_range while location is still -1
+}
+
+std::shared_ptr<core::FlowFile> TestPlan::getCurrentFlowFile() {  // flow file stored by the most recent runNextProcessor call
+  return current_flowfile_;  // nullptr until runNextProcessor has stored one
+}
+
+// Creates the terminating connection for processor: processor is its source and, when setDest
+// is true, also its destination. The connection carries the termination_ relationship.
+std::shared_ptr<minifi::Connection> TestPlan::buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest) {
+  std::stringstream label;
+  label << processor->getUUIDStr() << "-to-" << processor->getUUIDStr();
+  auto final_connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, label.str());
+  final_connection->setRelationship(termination_);
+
+  // link the connection so that results can be tested at the end of the run
+  final_connection->setSource(processor);
+  if (setDest)
+    final_connection->setDestination(processor);
+
+  uuid_t processor_id;
+  processor->getUUID(processor_id);
+  final_connection->setSourceUUID(processor_id);
+  if (setDest)
+    final_connection->setDestinationUUID(processor_id);
+  processor->addConnection(final_connection);
+  return final_connection;
+}
+
+// Adds the terminating connection(s) and marks the plan finalized: a plan that already has
+// connections terminates its last processor only, otherwise every processor gets its own.
+void TestPlan::finalize() {
+  std::lock_guard<std::recursive_mutex> guard(mutex);
+  if (relationships_.empty()) {
+    for (const auto &queued : processor_queue_)
+      relationships_.push_back(buildFinalConnection(queued, true));
+  } else {
+    relationships_.push_back(buildFinalConnection(processor_queue_.back()));
+  }
+  finalized = true;
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/TestBase.h
----------------------------------------------------------------------
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 331df08..47db4c3 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -25,6 +25,8 @@
 #include "ResourceClaim.h"
 #include "catch.hpp"
 #include <vector>
+#include <set>
+#include <map>
 #include "core/logging/Logger.h"
 #include "core/Core.h"
 #include "properties/Configure.h"
@@ -33,6 +35,14 @@
 #include "utils/Id.h"
 #include "spdlog/sinks/ostream_sink.h"
 #include "spdlog/sinks/dist_sink.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "core/Core.h"
+#include "core/FlowFile.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessorNode.h"
+#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
 
 class LogTestController {
  public:
@@ -105,7 +115,7 @@ class LogTestController {
   std::ostringstream log_output;
 
   std::shared_ptr<logging::Logger> logger_;
- private:
+   private:
   class TestBootstrapLogger : public logging::Logger {
    public:
     TestBootstrapLogger(std::shared_ptr<spdlog::logger> logger)
@@ -138,6 +148,73 @@ class LogTestController {
   std::vector<std::string> modified_loggers;
 };
 
+class TestPlan {
+ public:
+  // Creates an empty plan that uses the given content, flow file and provenance repositories.
+  explicit TestPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo);
+  // Registers processor; linkToPrevious connects it to the previously added one. nullptr once finalized.
+  std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name,
+                                                core::Relationship relationship = core::Relationship("success", "description"),
+                                                bool linkToPrevious = false);
+  // Same, but first instantiates processor_name through the default class loader and names it name.
+  std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship = core::Relationship("success", "description"),
+  bool linkToPrevious = false);
+  // Sets prop on the context of proc; false when proc was never added to this plan.
+  bool setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value);
+  // Drops sessions and factories, rewinds location to -1 and drains the processors' active tasks.
+  void reset();
+  // Runs the next queued processor (verify replaces onTrigger when given); true while more remain.
+  bool runNextProcessor(std::function<void(core::ProcessContext*, core::ProcessSession*)> verify = nullptr);
+  // Provenance events reported by the session at the current location.
+  std::set<provenance::ProvenanceEventRecord*> getProvenanceRecords();
+  // Flow file fetched from the session of the most recent runNextProcessor call.
+  std::shared_ptr<core::FlowFile> getCurrentFlowFile();
+
+  std::shared_ptr<core::Repository> getFlowRepo() {
+    return flow_repo_;
+  }
+
+  std::shared_ptr<core::Repository> getProvenanceRepo() {
+    return prov_repo_;
+  }
+
+  std::shared_ptr<core::ContentRepository> getContentRepo() {
+    return content_repo_;
+  }
+
+ protected:
+  // Adds the terminating connection(s) and sets finalized; runNextProcessor calls it on first use.
+  void finalize();
+  // Builds a connection with processor as source (and as destination when setDest) for termination_.
+  std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest = false);
+  // Set by finalize(); addProcessor returns nullptr afterwards.
+  std::atomic<bool> finalized;
+
+  std::shared_ptr<core::ContentRepository> content_repo_;
+
+  std::shared_ptr<core::Repository> flow_repo_;
+  std::shared_ptr<core::Repository> prov_repo_;
+
+  std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider_;
+  // Recursive because addProcessor(name) calls addProcessor(processor) while holding the lock.
+  std::recursive_mutex mutex;
+  // Index into processor_queue_ of the processor that was run last; -1 before the first run.
+  int location;
+  // NOTE(review): current_session_ is not assigned anywhere in TestBase.cpp -- confirm it is needed.
+  std::shared_ptr<core::ProcessSession> current_session_;
+  std::shared_ptr<core::FlowFile> current_flowfile_;
+  // processor_queue_, processor_nodes_ and processor_contexts_ each grow by one entry per addProcessor.
+  std::map<std::string, std::shared_ptr<core::Processor>> processor_mapping_;
+  std::vector<std::shared_ptr<core::Processor>> processor_queue_;
+  std::vector<std::shared_ptr<core::Processor>> configured_processors_;
+  std::vector<std::shared_ptr<core::ProcessorNode>> processor_nodes_;
+  std::vector<std::shared_ptr<core::ProcessContext>> processor_contexts_;
+  std::vector<std::shared_ptr<core::ProcessSession>> process_sessions_;
+  std::vector<std::shared_ptr<core::ProcessSessionFactory>> factories_;
+  std::vector<std::shared_ptr<minifi::Connection>> relationships_;
+  core::Relationship termination_;
+};
+
 class TestController {
  public:
 
@@ -148,6 +225,25 @@ class TestController {
     utils::IdGenerator::getIdGenerator()->initialize(std::make_shared<minifi::Properties>());
   }
 
+  // Builds a TestPlan backed by a freshly initialized volatile content repository and a single
+  // TestRepository instance that is used as both the flow file and the provenance repository.
+  std::shared_ptr<TestPlan> createPlan() {
+    auto config = std::make_shared<minifi::Configure>();
+    std::shared_ptr<core::ContentRepository> content = std::make_shared<core::repository::VolatileContentRepository>();
+    content->initialize(config);
+
+    std::shared_ptr<core::Repository> shared_repo = std::make_shared<TestRepository>();
+    return std::make_shared<TestPlan>(content, shared_repo, shared_repo);
+  }
+
+  // Runs the plan's next processor; with runToCompletion set it keeps going for as long as
+  // runNextProcessor returns true. verify is forwarded unchanged to every run.
+  void runSession(std::shared_ptr<TestPlan> &plan, bool runToCompletion = true, std::function<void(core::ProcessContext*, core::ProcessSession*)> verify = nullptr) {
+    bool more = plan->runNextProcessor(verify);
+    while (more && runToCompletion)
+      more = plan->runNextProcessor(verify);
+  }
+
   ~TestController() {
     for (auto dir : directories) {
       DIR *created_dir;
@@ -176,6 +272,10 @@ class TestController {
   }
 
  protected:
+
+  std::mutex test_mutex;
+  //std::map<std::string,>
+
   LogTestController &log;
   std::vector<char*> directories;
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/TestServer.h
----------------------------------------------------------------------
diff --git a/libminifi/test/TestServer.h b/libminifi/test/TestServer.h
new file mode 100644
index 0000000..263a6b3
--- /dev/null
+++ b/libminifi/test/TestServer.h
@@ -0,0 +1,137 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_TEST_TESTSERVER_H_
+#define LIBMINIFI_TEST_TESTSERVER_H_
+#include <regex.h>
+#include <string>
+#include <iostream>
+#include "civetweb.h"
+
+/* Server context handle */
+static struct mg_context *ctx;
+static std::string resp_str;
+
+// Request handler: answers every request with a 200 response whose body is resp_str.
+// The response argument is not used; the body is read from the file-level resp_str.
+static int responder(struct mg_connection *conn, void *response) {
+  // %lu expects unsigned long, which size_t is not on every platform, hence the cast
+  mg_printf(conn, "HTTP/1.1 200 OK\r\n"
+            "Content-Length: %lu\r\n"
+            "Content-Type: text/plain\r\n"
+            "Connection: close\r\n\r\n",
+            static_cast<unsigned long>(resp_str.size()));
+
+  mg_write(conn, resp_str.c_str(), resp_str.size());
+
+  return 200;
+}
+
+static void init_webserver() {  // static, like responder/stop_webserver: the definition lives in a header
+  mg_init_library(0);
+}
+
+// Starts a civetweb server on port with SSL options (certificate cert) and answers requests
+// under rooturi with response. Exits the process if SSL is unavailable or startup fails.
+static void start_webserver(std::string &port, std::string &rooturi, const std::string &response, struct mg_callbacks *callbacks, std::string &cert) {
+  std::cout << "root uri is " << rooturi << ":" << port << "/" << std::endl;
+  resp_str = response;
+  const char *options[] = { "listening_ports", port.c_str(), "ssl_certificate", cert.c_str(), "ssl_protocol_version", "3", "ssl_cipher_list",
+      "ECDHE-RSA-AES256-GCM-SHA384:DES-CBC3-SHA:AES128-SHA:AES128-GCM-SHA256", 0 };
+
+  // feature 2 is the SSL support of the civetweb library (see the error text below)
+  if (!mg_check_feature(2)) {
+    std::cerr << "Error: Embedded example built with SSL support, " << "but civetweb library build without" << std::endl;
+    exit(1);
+  }
+
+  ctx = mg_start(callbacks, 0, options);
+  if (ctx == nullptr) {
+    std::cerr << "Cannot start CivetWeb - mg_start failed." << std::endl;
+    exit(1);
+  }
+  mg_set_request_handler(ctx, rooturi.c_str(), responder, static_cast<void*>(&resp_str));
+}
+
+// Starts a civetweb server without SSL options on port and answers requests under rooturi
+// with response. Exits the process when the server cannot be started.
+static void start_webserver(std::string &port, std::string &rooturi, const std::string &response) {
+  std::cout << "root uri is " << rooturi << ":" << port << "/" << std::endl;
+  resp_str = response;
+
+  const char *options[] = { "listening_ports", port.c_str(), 0 };
+  ctx = mg_start(nullptr, 0, options);
+  if (ctx == nullptr) {
+    std::cerr << "Cannot start CivetWeb - mg_start failed." << std::endl;
+    exit(1);
+  }
+
+  // the pointer is handed over as handler data, but responder reads resp_str directly
+  mg_set_request_handler(ctx, rooturi.c_str(), responder, static_cast<void*>(&resp_str));
+}
+
+// Splits url of the form http(s)://localhost:<port>/<path> into scheme, port and path.
+// Only matched groups are written to the output arguments. Returns true when scheme, port
+// and path are all non-empty afterwards, false otherwise (or when the pattern fails to compile).
+static bool parse_http_components(const std::string &url, std::string &port, std::string &scheme, std::string &path) {
+  const char *regexstr = "^(http|https)://(localhost:)([0-9]+)?(/.*)$";
+  // whole match plus the four sub-expressions of regexstr; replaces a non-standard VLA
+  const size_t kGroupCount = 5;
+
+  regex_t regex;
+  if (regcomp(&regex, regexstr, REG_EXTENDED) != 0) {
+    return false;
+  }
+
+  regmatch_t groups[kGroupCount];
+  if (regexec(&regex, url.c_str(), kGroupCount, groups, 0) == 0) {
+    for (size_t i = 0; i < kGroupCount; i++) {
+      // an unmatched group ends the scan, so a missing port also leaves path unset
+      if (groups[i].rm_so == -1)
+        break;
+
+      std::string str(url.data() + groups[i].rm_so, groups[i].rm_eo - groups[i].rm_so);
+      switch (i) {
+        case 1:
+          scheme = str;
+          break;
+        case 3:
+          port = str;
+          break;
+        case 4:
+          path = str;
+          break;
+        default:
+          break;
+      }
+    }
+  }
+  // release the compiled pattern on every path; the early return used to skip this
+  regfree(&regex);
+
+  return !(path.empty() || scheme.empty() || port.empty());
+}
+
+static void stop_webserver() {
+  /* Stop the server whose context start_webserver stored in ctx */
+  mg_stop(ctx);
+
+  /* Un-initialize the library (counterpart of mg_init_library in init_webserver) */
+  mg_exit_library();
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/integration/ControllerServiceIntegrationTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/ControllerServiceIntegrationTests.cpp b/libminifi/test/integration/ControllerServiceIntegrationTests.cpp
index 3f27b66..15720eb 100644
--- a/libminifi/test/integration/ControllerServiceIntegrationTests.cpp
+++ b/libminifi/test/integration/ControllerServiceIntegrationTests.cpp
@@ -80,18 +80,21 @@ int main(int argc, char **argv) {
   configuration->set(minifi::Configure::nifi_default_directory, key_dir);
 
   std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
-
-  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(new core::YamlConfiguration(test_repo, test_repo, stream_factory, configuration, test_file_location));
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  content_repo->initialize(configuration);
+  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
+      new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
   std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
 
   std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr),
-  DEFAULT_ROOT_GROUP_NAME,
+                                                                                                content_repo,
+                                                                                                DEFAULT_ROOT_GROUP_NAME,
                                                                                                 true);
 
   disabled = false;
   std::shared_ptr<core::controller::ControllerServiceMap> map = std::make_shared<core::controller::ControllerServiceMap>();
 
-  core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory, configuration, test_file_location);
+  core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
 
   std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
   std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/integration/HttpConfigurationListenerTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/HttpConfigurationListenerTest.cpp b/libminifi/test/integration/HttpConfigurationListenerTest.cpp
index a86b884..b559f41 100644
--- a/libminifi/test/integration/HttpConfigurationListenerTest.cpp
+++ b/libminifi/test/integration/HttpConfigurationListenerTest.cpp
@@ -46,7 +46,7 @@ void waitToVerifyProcessor() {
   std::this_thread::sleep_for(std::chrono::seconds(10));
 }
 
-class ConfigHandler: public CivetHandler {
+class ConfigHandler : public CivetHandler {
  public:
   bool handleGet(CivetServer *server, struct mg_connection *conn) {
     std::ifstream myfile(test_file_location_.c_str());
@@ -57,8 +57,8 @@ class ConfigHandler: public CivetHandler {
       std::string str = buffer.str();
       myfile.close();
       mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
-          "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
-          str.length());
+                "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+                str.length());
       mg_printf(conn, "%s", str.c_str());
     } else {
       mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
@@ -75,7 +75,7 @@ int main(int argc, char **argv) {
   LogTestController::getInstance().setInfo<minifi::HttpConfigurationListener>();
 
   const char *options[] = { "document_root", ".", "listening_ports", "9090", 0 };
-  std::vector < std::string > cpp_options;
+  std::vector<std::string> cpp_options;
   for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
     cpp_options.push_back(options[i]);
   }
@@ -89,45 +89,32 @@ int main(int argc, char **argv) {
     h_ex.test_file_location_ = test_file_location = argv[1];
     key_dir = argv[2];
   }
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<
-      minifi::Configure>();
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
   configuration->set(minifi::Configure::nifi_default_directory, key_dir);
-  configuration->set(minifi::Configure::nifi_configuration_listener_type,
-      "http");
-  configuration->set(
-      minifi::Configure::nifi_configuration_listener_pull_interval, "1 sec");
-  configuration->set(minifi::Configure::nifi_configuration_listener_http_url,
-      "http://localhost:9090/config");
+  configuration->set(minifi::Configure::nifi_configuration_listener_type, "http");
+  configuration->set(minifi::Configure::nifi_configuration_listener_pull_interval, "1 sec");
+  configuration->set(minifi::Configure::nifi_configuration_listener_http_url, "http://localhost:9090/config");
   mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
 
-  std::shared_ptr<core::Repository> test_repo =
-      std::make_shared<TestRepository>();
-  std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<
-      TestFlowRepository>();
-
-  configuration->set(minifi::Configure::nifi_flow_configuration_file,
-      test_file_location);
-
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared
-      < minifi::io::StreamFactory > (configuration);
-  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr
-      < core::YamlConfiguration
-      > (new core::YamlConfiguration(test_repo, test_repo, stream_factory,
-          configuration, test_file_location));
-  std::shared_ptr<TestRepository> repo = std::static_pointer_cast
-      < TestRepository > (test_repo);
-
-  std::shared_ptr<minifi::FlowController> controller =
-      std::make_shared < minifi::FlowController
-          > (test_repo, test_flow_repo, configuration, std::move(yaml_ptr), DEFAULT_ROOT_GROUP_NAME, true);
-
-  core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory,
-      configuration, test_file_location);
-
-  std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(
-      test_file_location);
-  std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr < core::ProcessGroup
-      > (ptr.get());
+  std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
+  std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
+
+  configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
+
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared < minifi::io::StreamFactory > (configuration);
+
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr < core::YamlConfiguration
+      > (new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
+  std::shared_ptr<TestRepository> repo = std::static_pointer_cast < TestRepository > (test_repo);
+
+  std::shared_ptr<minifi::FlowController> controller = std::make_shared < minifi::FlowController
+      > (test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, true);
+
+  core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
+
+  std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
+  std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr < core::ProcessGroup > (ptr.get());
   ptr.release();
 
   controller->load();