You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/07/27 16:43:43 UTC

[4/6] nifi-minifi-cpp git commit: MINIFI-249: Update provenance repository to better abstract deserialization.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/SchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index 24ba146..1060830 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -42,7 +42,7 @@ bool SchedulingAgent::hasWorkToDo(std::shared_ptr<core::Processor> processor) {
 void SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
   logger_->log_trace("Enabling CSN in SchedulingAgent %s", serviceNode->getName());
   // reference the enable function from serviceNode
-  std::function<bool()> f_ex = [serviceNode] {
+  std::function < bool() > f_ex = [serviceNode] {
     return serviceNode->enable();
   };
   // create a functor that will be submitted to the thread pool.
@@ -55,7 +55,7 @@ void SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::
 
 void SchedulingAgent::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
   // reference the disable function from serviceNode
-  std::function<bool()> f_ex = [serviceNode] {
+  std::function < bool() > f_ex = [serviceNode] {
     return serviceNode->disable();
   };
   // create a functor that will be submitted to the thread pool.
@@ -77,13 +77,15 @@ bool SchedulingAgent::onTrigger(std::shared_ptr<core::Processor> processor, core
   // No need to yield, reset yield expiration to 0
   processor->clearYield();
 
-  if (!hasWorkToDo(processor))
+  if (!hasWorkToDo(processor)) {
     // No work to do, yield
     return true;
-
-  if (hasTooMuchOutGoing(processor))
+  }
+  if (hasTooMuchOutGoing(processor)) {
+    logger_->log_debug("backpressure applied because too much outgoing");
     // need to apply backpressure
     return true;
+  }
 
   processor->incrementActiveTasks();
   try {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/Site2SiteClientProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Site2SiteClientProtocol.cpp b/libminifi/src/Site2SiteClientProtocol.cpp
index 7d6e3f3..024bd35 100644
--- a/libminifi/src/Site2SiteClientProtocol.cpp
+++ b/libminifi/src/Site2SiteClientProtocol.cpp
@@ -726,13 +726,19 @@ bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet
     if (ret != 8) {
       return false;
     }
-    if (flowFile->getSize()) {
+    if (flowFile->getSize() > 0) {
       Site2SiteClientProtocol::ReadCallback callback(packet);
       session->read(flowFile, &callback);
       if (flowFile->getSize() != packet->_size) {
         return false;
       }
     }
+    if (packet->payload_.length() == 0 && len == 0) {
+      if (flowFile->getResourceClaim() == nullptr)
+        logger_->log_debug("no claim");
+      else
+        logger_->log_debug("Flowfile empty %s", flowFile->getResourceClaim()->getContentFullPath());
+    }
   } else if (packet->payload_.length() > 0) {
     len = packet->payload_.length();
 
@@ -1101,8 +1107,9 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, c
 
   Transaction *transaction = NULL;
 
-  if (!flow)
+  if (!flow) {
     return;
+  }
 
   if (_peerState != READY) {
     bootstrap();
@@ -1158,11 +1165,15 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, c
     }  // while true
 
     if (!confirm(transactionID)) {
-      throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed");
+      std::stringstream ss;
+      ss << "Confirm Failed for " << transactionID;
+      throw Exception(SITE2SITE_EXCEPTION, ss.str().c_str());
       return;
     }
     if (!complete(transactionID)) {
-      throw Exception(SITE2SITE_EXCEPTION, "Complete Failed");
+      std::stringstream ss;
+      ss << "Complete Failed for " << transactionID;
+      throw Exception(SITE2SITE_EXCEPTION, ss.str().c_str());
       return;
     }
     logger_->log_info("Site2Site transaction %s successfully send flow record %d, content bytes %d", transactionID.c_str(), transaction->_transfers, transaction->_bytes);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/ThreadedSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp
index 46a4710..7b4ce85 100644
--- a/libminifi/src/ThreadedSchedulingAgent.cpp
+++ b/libminifi/src/ThreadedSchedulingAgent.cpp
@@ -36,7 +36,7 @@ namespace nifi {
 namespace minifi {
 
 void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processor) {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard < std::mutex > lock(mutex_);
 
   admin_yield_duration_ = 0;
   std::string yieldValue;
@@ -68,8 +68,8 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processo
   }
 
   core::ProcessorNode processor_node(processor);
-  auto processContext = std::make_shared<core::ProcessContext>(processor_node, controller_service_provider_, repo_);
-  auto sessionFactory = std::make_shared<core::ProcessSessionFactory>(processContext.get());
+  auto processContext = std::make_shared < core::ProcessContext > (processor_node, controller_service_provider_, repo_, flow_repo_, content_repo_);
+  auto sessionFactory = std::make_shared < core::ProcessSessionFactory > (processContext.get());
 
   processor->onSchedule(processContext.get(), sessionFactory.get());
 
@@ -89,7 +89,7 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processo
 }
 
 void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> processor) {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard < std::mutex > lock(mutex_);
   logger_->log_info("Shutting down threads for processor %s/%s", processor->getName().c_str(), processor->getUUIDStr().c_str());
 
   if (processor->getScheduledState() != core::RUNNING) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/controllers/SSLContextService.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/controllers/SSLContextService.cpp b/libminifi/src/controllers/SSLContextService.cpp
index a9450f6..73c9e35 100644
--- a/libminifi/src/controllers/SSLContextService.cpp
+++ b/libminifi/src/controllers/SSLContextService.cpp
@@ -35,7 +35,7 @@ void SSLContextService::initialize() {
   if (initialized_)
     return;
 
-  std::lock_guard<std::mutex> lock(initialization_mutex_);
+  std::lock_guard < std::mutex > lock(initialization_mutex_);
 
   ControllerService::initialize();
 
@@ -75,31 +75,31 @@ std::unique_ptr<SSLContext> SSLContextService::createSSLContext() {
   if (retp == 0) {
     logger_->log_error("Can not load CA certificate, Exiting, error : %s", std::strerror(errno));
   }
-  return std::unique_ptr<SSLContext>(new SSLContext(ctx));
+  return std::unique_ptr < SSLContext > (new SSLContext(ctx));
 }
 
 const std::string &SSLContextService::getCertificateFile() {
-  std::lock_guard<std::mutex> lock(initialization_mutex_);
+  std::lock_guard < std::mutex > lock(initialization_mutex_);
   return certificate;
 }
 
 const std::string &SSLContextService::getPassphrase() {
-  std::lock_guard<std::mutex> lock(initialization_mutex_);
+  std::lock_guard < std::mutex > lock(initialization_mutex_);
   return passphrase_;
 }
 
 const std::string &SSLContextService::getPassphraseFile() {
-  std::lock_guard<std::mutex> lock(initialization_mutex_);
+  std::lock_guard < std::mutex > lock(initialization_mutex_);
   return passphrase_file_;
 }
 
 const std::string &SSLContextService::getPrivateKeyFile() {
-  std::lock_guard<std::mutex> lock(initialization_mutex_);
+  std::lock_guard < std::mutex > lock(initialization_mutex_);
   return private_key_;
 }
 
 const std::string &SSLContextService::getCACertificate() {
-  std::lock_guard<std::mutex> lock(initialization_mutex_);
+  std::lock_guard < std::mutex > lock(initialization_mutex_);
   return ca_certificate_;
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/ClassLoader.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ClassLoader.cpp b/libminifi/src/core/ClassLoader.cpp
index 9bead0e..fbd46f6 100644
--- a/libminifi/src/core/ClassLoader.cpp
+++ b/libminifi/src/core/ClassLoader.cpp
@@ -43,7 +43,7 @@ uint16_t ClassLoader::registerResource(const std::string &resource) {
     logger_->log_error("Cannot load library: %s", dlerror());
     return RESOURCE_FAILURE;
   } else {
-    std::lock_guard<std::mutex> lock(internal_mutex_);
+    std::lock_guard < std::mutex > lock(internal_mutex_);
     dl_handles_.push_back(resource_ptr);
   }
 
@@ -60,9 +60,9 @@ uint16_t ClassLoader::registerResource(const std::string &resource) {
 
   ObjectFactory *factory = create_factory_func();
 
-  std::lock_guard<std::mutex> lock(internal_mutex_);
+  std::lock_guard < std::mutex > lock(internal_mutex_);
 
-  loaded_factories_[factory->getClassName()] = std::unique_ptr<ObjectFactory>(factory);
+  loaded_factories_[factory->getClassName()] = std::unique_ptr < ObjectFactory > (factory);
 
   return RESOURCE_SUCCESS;
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/ConfigurableComponent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp
index f5247ac..62a08db 100644
--- a/libminifi/src/core/ConfigurableComponent.cpp
+++ b/libminifi/src/core/ConfigurableComponent.cpp
@@ -29,6 +29,7 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 namespace core {
+
 ConfigurableComponent::ConfigurableComponent()
     : logger_(logging::LoggerFactory<ConfigurableComponent>::getLogger()) {
 }
@@ -42,7 +43,7 @@ ConfigurableComponent::~ConfigurableComponent() {
 }
 
 bool ConfigurableComponent::getProperty(const std::string &name, Property &prop) {
-  std::lock_guard<std::mutex> lock(configuration_mutex_);
+  std::lock_guard < std::mutex > lock(configuration_mutex_);
 
   auto &&it = properties_.find(name);
 
@@ -61,7 +62,7 @@ bool ConfigurableComponent::getProperty(const std::string &name, Property &prop)
  * @return result of getting property.
  */
 bool ConfigurableComponent::getProperty(const std::string name, std::string &value) {
-  std::lock_guard<std::mutex> lock(configuration_mutex_);
+  std::lock_guard < std::mutex > lock(configuration_mutex_);
 
   auto &&it = properties_.find(name);
   if (it != properties_.end()) {
@@ -80,7 +81,7 @@ bool ConfigurableComponent::getProperty(const std::string name, std::string &val
  * @return result of setting property.
  */
 bool ConfigurableComponent::setProperty(const std::string name, std::string value) {
-  std::lock_guard<std::mutex> lock(configuration_mutex_);
+  std::lock_guard < std::mutex > lock(configuration_mutex_);
   auto &&it = properties_.find(name);
 
   if (it != properties_.end()) {
@@ -101,7 +102,7 @@ bool ConfigurableComponent::setProperty(const std::string name, std::string valu
  * @return result of setting property.
  */
 bool ConfigurableComponent::updateProperty(const std::string &name, const std::string &value) {
-  std::lock_guard<std::mutex> lock(configuration_mutex_);
+  std::lock_guard < std::mutex > lock(configuration_mutex_);
   auto &&it = properties_.find(name);
 
   if (it != properties_.end()) {
@@ -122,7 +123,7 @@ bool ConfigurableComponent::updateProperty(const std::string &name, const std::s
  * @return whether property was set or not
  */
 bool ConfigurableComponent::setProperty(Property &prop, std::string value) {
-  std::lock_guard<std::mutex> lock(configuration_mutex_);
+  std::lock_guard < std::mutex > lock(configuration_mutex_);
   auto it = properties_.find(prop.getName());
 
   if (it != properties_.end()) {
@@ -150,7 +151,7 @@ bool ConfigurableComponent::setSupportedProperties(std::set<Property> properties
     return false;
   }
 
-  std::lock_guard<std::mutex> lock(configuration_mutex_);
+  std::lock_guard < std::mutex > lock(configuration_mutex_);
 
   properties_.clear();
   for (auto item : properties) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/ConfigurationFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ConfigurationFactory.cpp b/libminifi/src/core/ConfigurationFactory.cpp
index ea2ed5c..0a0e911 100644
--- a/libminifi/src/core/ConfigurationFactory.cpp
+++ b/libminifi/src/core/ConfigurationFactory.cpp
@@ -39,7 +39,8 @@ namespace core {
 class YamlConfiguration;
 #endif
 
-std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure,
+std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo,
+                                                                 std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configure,
                                                                  std::shared_ptr<io::StreamFactory> stream_factory, const std::string configuration_class_name, const std::string path,
                                                                  bool fail_safe) {
   std::string class_name_lc = configuration_class_name;
@@ -47,22 +48,23 @@ std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(std::shared_ptr
   try {
     if (class_name_lc == "flowconfiguration") {
       // load the base configuration.
-      return std::unique_ptr<core::FlowConfiguration>(new core::FlowConfiguration(repo, flow_file_repo, stream_factory, configure, path));
+
+      return std::unique_ptr < core::FlowConfiguration > (new core::FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configure, path));
 
     } else if (class_name_lc == "yamlconfiguration") {
       // only load if the class is defined.
-      return std::unique_ptr<core::FlowConfiguration>(instantiate<core::YamlConfiguration>(repo, flow_file_repo, stream_factory, configure, path));
+      return std::unique_ptr < core::FlowConfiguration > (instantiate<core::YamlConfiguration>(repo, flow_file_repo, content_repo, stream_factory, configure, path));
 
     } else {
       if (fail_safe) {
-        return std::unique_ptr<core::FlowConfiguration>(new core::FlowConfiguration(repo, flow_file_repo, stream_factory, configure, path));
+        return std::unique_ptr < core::FlowConfiguration > (new core::FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configure, path));
       } else {
         throw std::runtime_error("Support for the provided configuration class could not be found");
       }
     }
   } catch (const std::runtime_error &r) {
     if (fail_safe) {
-      return std::unique_ptr<core::FlowConfiguration>(new core::FlowConfiguration(repo, flow_file_repo, stream_factory, configure, path));
+      return std::unique_ptr < core::FlowConfiguration > (new core::FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configure, path));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/Connectable.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Connectable.cpp b/libminifi/src/core/Connectable.cpp
index cf01f0c..9c3b26a 100644
--- a/libminifi/src/core/Connectable.cpp
+++ b/libminifi/src/core/Connectable.cpp
@@ -53,7 +53,7 @@ bool Connectable::setSupportedRelationships(std::set<core::Relationship> relatio
     return false;
   }
 
-  std::lock_guard<std::mutex> lock(relationship_mutex_);
+  std::lock_guard < std::mutex > lock(relationship_mutex_);
 
   relationships_.clear();
   for (auto item : relationships) {
@@ -67,7 +67,7 @@ bool Connectable::setSupportedRelationships(std::set<core::Relationship> relatio
 bool Connectable::isSupportedRelationship(core::Relationship relationship) {
   const bool requiresLock = isRunning();
 
-  const auto conditionalLock = !requiresLock ? std::unique_lock<std::mutex>() : std::unique_lock<std::mutex>(relationship_mutex_);
+  const auto conditionalLock = !requiresLock ? std::unique_lock<std::mutex>() : std::unique_lock < std::mutex > (relationship_mutex_);
 
   const auto &it = relationships_.find(relationship.getName());
   if (it != relationships_.end()) {
@@ -83,7 +83,7 @@ bool Connectable::setAutoTerminatedRelationships(std::set<Relationship> relation
     return false;
   }
 
-  std::lock_guard<std::mutex> lock(relationship_mutex_);
+  std::lock_guard < std::mutex > lock(relationship_mutex_);
 
   auto_terminated_relationships_.clear();
   for (auto item : relationships) {
@@ -97,7 +97,7 @@ bool Connectable::setAutoTerminatedRelationships(std::set<Relationship> relation
 bool Connectable::isAutoTerminated(core::Relationship relationship) {
   const bool requiresLock = isRunning();
 
-  const auto conditionalLock = !requiresLock ? std::unique_lock<std::mutex>() : std::unique_lock<std::mutex>(relationship_mutex_);
+  const auto conditionalLock = !requiresLock ? std::unique_lock<std::mutex>() : std::unique_lock < std::mutex > (relationship_mutex_);
 
   const auto &it = auto_terminated_relationships_.find(relationship.getName());
   if (it != auto_terminated_relationships_.end()) {
@@ -111,7 +111,7 @@ void Connectable::waitForWork(uint64_t timeoutMs) {
   has_work_.store(isWorkAvailable());
 
   if (!has_work_.load()) {
-    std::unique_lock<std::mutex> lock(work_available_mutex_);
+    std::unique_lock < std::mutex > lock(work_available_mutex_);
     work_condition_.wait_for(lock, std::chrono::milliseconds(timeoutMs), [&] {return has_work_.load();});
   }
 }
@@ -143,7 +143,7 @@ std::set<std::shared_ptr<Connectable>> Connectable::getOutGoingConnections(std::
 }
 
 std::shared_ptr<Connectable> Connectable::getNextIncomingConnection() {
-  std::lock_guard<std::mutex> lock(relationship_mutex_);
+  std::lock_guard < std::mutex > lock(relationship_mutex_);
 
   if (_incomingConnections.size() == 0)
     return NULL;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/Core.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Core.cpp b/libminifi/src/core/Core.cpp
index 304d4ce..995c001 100644
--- a/libminifi/src/core/Core.cpp
+++ b/libminifi/src/core/Core.cpp
@@ -35,6 +35,11 @@ void CoreComponent::setUUID(uuid_t uuid) {
   uuid_unparse_lower(uuid_, uuidStr);
   uuidStr_ = uuidStr;
 }
+
+void CoreComponent::setUUIDStr(const std::string uuidStr) {
+  uuid_parse(uuidStr.c_str(), uuid_);
+  uuidStr_ = uuidStr;
+}
 // Get UUID
 bool CoreComponent::getUUID(uuid_t uuid) {
   if (uuid) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/FlowConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index c32add6..e8e7462 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -35,7 +35,7 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string
   if (nullptr == ptr) {
     logger_->log_error("No Processor defined for %s", name.c_str());
   }
-  std::shared_ptr<core::Processor> processor = std::static_pointer_cast<core::Processor>(ptr);
+  std::shared_ptr<core::Processor> processor = std::static_pointer_cast < core::Processor > (ptr);
 
   // initialize the processor
   processor->initialize();
@@ -53,18 +53,16 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProvenanceReportTask()
   return processor;
 }
 
-std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(
-    std::string name, uuid_t uuid, int version) {
-  return std::unique_ptr<core::ProcessGroup>(
-      new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid, version));
+std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(std::string name, uuid_t uuid, int version) {
+  return std::unique_ptr < core::ProcessGroup > (new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid, version));
 }
 
 std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRemoteProcessGroup(std::string name, uuid_t uuid) {
-  return std::unique_ptr<core::ProcessGroup>(new core::ProcessGroup(core::REMOTE_PROCESS_GROUP, name, uuid));
+  return std::unique_ptr < core::ProcessGroup > (new core::ProcessGroup(core::REMOTE_PROCESS_GROUP, name, uuid));
 }
 
 std::shared_ptr<minifi::Connection> FlowConfiguration::createConnection(std::string name, uuid_t uuid) {
-  return std::make_shared<minifi::Connection>(flow_file_repo_, name, uuid);
+  return std::make_shared < minifi::Connection > (flow_file_repo_, content_repo_, name, uuid);
 }
 
 std::shared_ptr<core::controller::ControllerServiceNode> FlowConfiguration::createControllerService(const std::string &class_name, const std::string &name, uuid_t uuid) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/FlowFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp
index d9057c5..6afd0fe 100644
--- a/libminifi/src/core/FlowFile.cpp
+++ b/libminifi/src/core/FlowFile.cpp
@@ -47,7 +47,7 @@ FlowFile::FlowFile()
   entry_date_ = getTimeMillis();
   lineage_start_date_ = entry_date_;
 
-  char uuidStr[37];
+  char uuidStr[37] = { 0 };
 
   // Generate the global UUID for the flow record
   id_generator_->generate(uuid_);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index 2cf3db0..db0fe08 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -39,8 +39,7 @@ namespace core {
 
 std::shared_ptr<utils::IdGenerator> ProcessGroup::id_generator_ = utils::IdGenerator::getIdGenerator();
 
-ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, int version,
-                           ProcessGroup *parent)
+ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, int version, ProcessGroup *parent)
     : logger_(logging::LoggerFactory<ProcessGroup>::getLogger()),
       name_(name),
       type_(type),
@@ -55,7 +54,7 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid,
   yield_period_msec_ = 0;
   transmitting_ = false;
 
-  logger_->log_info("ProcessGroup %s created", name_.c_str());
+  logger_->log_info("ProcessGroup %s created", name_);
 }
 
 ProcessGroup::~ProcessGroup() {
@@ -70,12 +69,12 @@ ProcessGroup::~ProcessGroup() {
 }
 
 bool ProcessGroup::isRootProcessGroup() {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard < std::recursive_mutex > lock(mutex_);
   return (type_ == ROOT_PROCESS_GROUP);
 }
 
 void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard < std::recursive_mutex > lock(mutex_);
 
   if (processors_.find(processor) == processors_.end()) {
     // We do not have the same processor in this process group yet
@@ -85,7 +84,7 @@ void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) {
 }
 
 void ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard < std::recursive_mutex > lock(mutex_);
 
   if (processors_.find(processor) != processors_.end()) {
     // We do have the same processor in this process group yet
@@ -95,7 +94,7 @@ void ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) {
 }
 
 void ProcessGroup::addProcessGroup(ProcessGroup *child) {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard < std::recursive_mutex > lock(mutex_);
 
   if (child_process_groups_.find(child) == child_process_groups_.end()) {
     // We do not have the same child process group in this process group yet
@@ -105,7 +104,7 @@ void ProcessGroup::addProcessGroup(ProcessGroup *child) {
 }
 
 void ProcessGroup::removeProcessGroup(ProcessGroup *child) {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard < std::recursive_mutex > lock(mutex_);
 
   if (child_process_groups_.find(child) != child_process_groups_.end()) {
     // We do have the same child process group in this process group yet
@@ -115,7 +114,7 @@ void ProcessGroup::removeProcessGroup(ProcessGroup *child) {
 }
 
 void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard < std::recursive_mutex > lock(mutex_);
 
   try {
     // Start all the processor node, input and output ports
@@ -143,7 +142,7 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, Ev
 }
 
 void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard < std::recursive_mutex > lock(mutex_);
 
   try {
     // Stop all the processor node, input and output ports
@@ -169,7 +168,7 @@ void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, Eve
 }
 
 std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard < std::recursive_mutex > lock(mutex_);
   std::shared_ptr<Processor> ret = NULL;
   for (auto processor : processors_) {
     logger_->log_info("find processor %s", processor->getName().c_str());
@@ -209,7 +208,7 @@ std::shared_ptr<core::controller::ControllerServiceNode> ProcessGroup::findContr
 }
 
 std::shared_ptr<Processor> ProcessGroup::findProcessor(const std::string &processorName) {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard < std::recursive_mutex > lock(mutex_);
   std::shared_ptr<Processor> ret = NULL;
   for (auto processor : processors_) {
     logger_->log_debug("Current processor is %s", processor->getName().c_str());
@@ -225,7 +224,7 @@ std::shared_ptr<Processor> ProcessGroup::findProcessor(const std::string &proces
 }
 
 void ProcessGroup::updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard < std::recursive_mutex > lock(mutex_);
   for (auto processor : processors_) {
     if (processor->getName() == processorName) {
       processor->setProperty(propertyName, propertyValue);
@@ -247,7 +246,7 @@ void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connecti
 }
 
 void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard < std::recursive_mutex > lock(mutex_);
 
   if (connections_.find(connection) == connections_.end()) {
     // We do not have the same connection in this process group yet
@@ -269,7 +268,7 @@ void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) {
 }
 
 void ProcessGroup::removeConnection(std::shared_ptr<Connection> connection) {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard < std::recursive_mutex > lock(mutex_);
 
   if (connections_.find(connection) != connections_.end()) {
     // We do not have the same connection in this process group yet

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index df21a34..c69b361 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -38,19 +38,21 @@ namespace core {
 
 std::shared_ptr<core::FlowFile> ProcessSession::create() {
   std::map<std::string, std::string> empty;
-  std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), empty);
+
+  std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty);
 
   _addedFlowFiles[record->getUUIDStr()] = record;
   logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str());
-  std::string details = process_context_->getProcessorNode().getName() + " creates flow record " + record->getUUIDStr();
-  provenance_report_->create(record, details);
+  std::stringstream details;
+  details << process_context_->getProcessorNode().getName() << " creates flow record " << record->getUUIDStr();
+  provenance_report_->create(record, details.str());
 
   return record;
 }
 
 std::shared_ptr<core::FlowFile> ProcessSession::create(std::shared_ptr<core::FlowFile> &&parent) {
   std::map<std::string, std::string> empty;
-  std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), empty);
+  std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty);
 
   if (record) {
     _addedFlowFiles[record->getUUIDStr()] = record;
@@ -92,7 +94,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(std::shared_ptr<core::Flow
 
 std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer(std::shared_ptr<core::FlowFile> &parent) {
   std::map<std::string, std::string> empty;
-  std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), empty);
+  std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty);
 
   if (record) {
     this->_clonedFlowFiles[record->getUUIDStr()] = record;
@@ -168,26 +170,30 @@ void ProcessSession::remove(std::shared_ptr<core::FlowFile> &&flow) {
 
 void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key, std::string value) {
   flow->setAttribute(key, value);
-  std::string details = process_context_->getProcessorNode().getName() + " modify flow record " + flow->getUUIDStr() + " attribute " + key + ":" + value;
-  provenance_report_->modifyAttributes(flow, details);
+  std::stringstream details;  // detail text: processor name, flow UUID and the key:value pair just set
+  details << process_context_->getProcessorNode().getName() << " modify flow record " << flow->getUUIDStr() << " attribute " << key << ":" << value;
+  provenance_report_->modifyAttributes(flow, details.str());  // pass the finished message to the provenance reporter
 }
 
 void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key) {
   flow->removeAttribute(key);
-  std::string details = process_context_->getProcessorNode().getName() + " remove flow record " + flow->getUUIDStr() + " attribute " + key;
-  provenance_report_->modifyAttributes(flow, details);
+  std::stringstream details;  // detail text: processor name, flow UUID and the removed attribute key
+  details << process_context_->getProcessorNode().getName() << " remove flow record " << flow->getUUIDStr() << " attribute " << key;
+  provenance_report_->modifyAttributes(flow, details.str());  // pass the finished message to the provenance reporter
 }
 
 void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key, std::string value) {
   flow->setAttribute(key, value);
-  std::string details = process_context_->getProcessorNode().getName() + " modify flow record " + flow->getUUIDStr() + " attribute " + key + ":" + value;
-  provenance_report_->modifyAttributes(flow, details);
+  std::stringstream details;  // rvalue-reference overload: builds the same detail text as the lvalue overload
+  details << process_context_->getProcessorNode().getName() << " modify flow record " << flow->getUUIDStr() << " attribute " << key << ":" << value;
+  provenance_report_->modifyAttributes(flow, details.str());  // pass the finished message to the provenance reporter
 }
 
 void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key) {
   flow->removeAttribute(key);
-  std::string details = process_context_->getProcessorNode().getName() + " remove flow record " + flow->getUUIDStr() + " attribute " + key;
-  provenance_report_->modifyAttributes(flow, details);
+  std::stringstream details;  // rvalue-reference overload: detail text names the processor, flow UUID and removed key
+  details << process_context_->getProcessorNode().getName() << " remove flow record " << flow->getUUIDStr() << " attribute " << key;
+  provenance_report_->modifyAttributes(flow, details.str());  // pass the finished message to the provenance reporter
 }
 
 void ProcessSession::penalize(std::shared_ptr<core::FlowFile> &flow) {
@@ -207,41 +213,41 @@ void ProcessSession::transfer(std::shared_ptr<core::FlowFile> &&flow, Relationsh
 }
 
 void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback) {
-  std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(
-  DEFAULT_CONTENT_DIRECTORY);
+  std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
 
   try {
-    std::ofstream fs;
     uint64_t startTime = getTimeMillis();
-    fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
-    if (fs.is_open()) {
-      // Call the callback to write the content
-      callback->process(&fs);
-      if (fs.good() && fs.tellp() >= 0) {
-        flow->setSize(fs.tellp());
-        flow->setOffset(0);
-        std::shared_ptr<ResourceClaim> flow_claim = flow->getResourceClaim();
-        if (flow_claim != nullptr) {
-          // Remove the old claim
-          flow_claim->decreaseFlowFileRecordOwnedCount();
-          flow->clearResourceClaim();
-        }
-        flow->setResourceClaim(claim);
-        claim->increaseFlowFileRecordOwnedCount();
-        /*
-         logger_->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s",
-         flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
-        fs.close();
-        std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
-        uint64_t endTime = getTimeMillis();
-        provenance_report_->modifyContent(flow, details, endTime - startTime);
-      } else {
-        fs.close();
-        throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
-      }
-    } else {
-      throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+    claim->increaseFlowFileRecordOwnedCount();
+//    fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
+    std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
+    // Call the callback to write the content
+    if (nullptr == stream) {
+      rollback();
+      return;
+    }
+    if (callback->process(stream) < 0) {
+      rollback();
+      return;
+    }
+
+    flow->setSize(stream->getSize());
+    flow->setOffset(0);
+    std::shared_ptr<ResourceClaim> flow_claim = flow->getResourceClaim();
+    if (flow_claim != nullptr) {
+      // Remove the old claim
+      flow_claim->decreaseFlowFileRecordOwnedCount();
+      flow->clearResourceClaim();
     }
+    flow->setResourceClaim(claim);
+
+    /*
+     logger_->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s",
+     flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
+    stream->closeStream();
+    std::stringstream details;
+    details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr();
+    uint64_t endTime = getTimeMillis();
+    provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
   } catch (std::exception &exception) {
     if (flow && flow->getResourceClaim() == claim) {
       flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
@@ -260,39 +266,34 @@ void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow, OutputStreamCa
 }
 
 void ProcessSession::write(std::shared_ptr<core::FlowFile> &&flow, OutputStreamCallback *callback) {
-  std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>();
+  std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
   try {
-    std::ofstream fs;
     uint64_t startTime = getTimeMillis();
-    fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
-    if (fs.is_open()) {
-      // Call the callback to write the content
-      callback->process(&fs);
-      if (fs.good() && fs.tellp() >= 0) {
-        flow->setSize(fs.tellp());
-        flow->setOffset(0);
-        std::shared_ptr<ResourceClaim> flow_claim = flow->getResourceClaim();
-        if (flow_claim != nullptr) {
-          // Remove the old claim
-          flow_claim->decreaseFlowFileRecordOwnedCount();
-          flow->clearResourceClaim();
-        }
-        flow->setResourceClaim(claim);
-        claim->increaseFlowFileRecordOwnedCount();
-        /*
-         logger_->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s",
-         flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
-        fs.close();
-        std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
-        uint64_t endTime = getTimeMillis();
-        provenance_report_->modifyContent(flow, details, endTime - startTime);
-      } else {
-        fs.close();
-        throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
-      }
-    } else {
-      throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+    claim->increaseFlowFileRecordOwnedCount();
+    std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
+    if (nullptr == stream) {
+      rollback();
+      return;
+    }
+    // Call the callback to write the content
+    if (callback->process(stream) < 0) {
+      rollback();
+      return;
+    }
+    flow->setSize(stream->getSize());
+    flow->setOffset(0);
+    std::shared_ptr<ResourceClaim> flow_claim = flow->getResourceClaim();
+    if (flow_claim != nullptr) {
+      // Remove the old claim
+      flow_claim->decreaseFlowFileRecordOwnedCount();
+      flow->clearResourceClaim();
     }
+    flow->setResourceClaim(claim);
+
+    std::stringstream details;
+    details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr();
+    uint64_t endTime = getTimeMillis();
+    provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
   } catch (std::exception &exception) {
     if (flow && flow->getResourceClaim() == claim) {
       flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
@@ -321,30 +322,25 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &&flow, OutputStream
   claim = flow->getResourceClaim();
 
   try {
-    std::ofstream fs;
     uint64_t startTime = getTimeMillis();
-    fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::app);
-    if (fs.is_open()) {
-      // Call the callback to write the content
-      std::streampos oldPos = fs.tellp();
-      callback->process(&fs);
-      if (fs.good() && fs.tellp() >= 0) {
-        uint64_t appendSize = fs.tellp() - oldPos;
-        flow->setSize(flow->getSize() + appendSize);
-        /*
-         logger_->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s",
-         flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
-        fs.close();
-        std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
-        uint64_t endTime = getTimeMillis();
-        provenance_report_->modifyContent(flow, details, endTime - startTime);
-      } else {
-        fs.close();
-        throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
-      }
-    } else {
-      throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
-    }
+    std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
+    if (nullptr == stream) {
+      rollback();
+      return;
+    }
+    // Call the callback to write the content
+    size_t oldPos = stream->getSize();
+    stream->seek(oldPos + 1);
+    if (callback->process(stream) < 0) {
+      rollback();
+      return;
+    }
+    uint64_t appendSize = stream->getSize() - oldPos;
+    flow->setSize(flow->getSize() + appendSize);
+    std::stringstream details;
+    details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr();
+    uint64_t endTime = getTimeMillis();
+    provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
   } catch (std::exception &exception) {
     logger_->log_debug("Caught Exception %s", exception.what());
     throw;
@@ -365,30 +361,26 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &flow, OutputStreamC
   claim = flow->getResourceClaim();
 
   try {
-    std::ofstream fs;
     uint64_t startTime = getTimeMillis();
-    fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::app);
-    if (fs.is_open()) {
-      // Call the callback to write the content
-      std::streampos oldPos = fs.tellp();
-      callback->process(&fs);
-      if (fs.good() && fs.tellp() >= 0) {
-        uint64_t appendSize = fs.tellp() - oldPos;
-        flow->setSize(flow->getSize() + appendSize);
-        /*
-         logger_->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s",
-         flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
-        fs.close();
-        std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
-        uint64_t endTime = getTimeMillis();
-        provenance_report_->modifyContent(flow, details, endTime - startTime);
-      } else {
-        fs.close();
-        throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
-      }
-    } else {
-      throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
-    }
+    std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
+    if (nullptr == stream) {
+      rollback();
+      return;
+    }
+    // Call the callback to write the content
+    size_t oldPos = stream->getSize();
+    stream->seek(oldPos + 1);
+    if (callback->process(stream) < 0) {
+      rollback();
+      return;
+    }
+    uint64_t appendSize = stream->getSize() - oldPos;
+    flow->setSize(flow->getSize() + appendSize);
+
+    std::stringstream details;
+    details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr();
+    uint64_t endTime = getTimeMillis();
+    provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
   } catch (std::exception &exception) {
     logger_->log_debug("Caught Exception %s", exception.what());
     throw;
@@ -408,23 +400,19 @@ void ProcessSession::read(std::shared_ptr<core::FlowFile> &flow, InputStreamCall
     }
 
     claim = flow->getResourceClaim();
-    std::ifstream fs;
-    fs.open(claim->getContentFullPath().c_str(), std::fstream::in | std::fstream::binary);
-    if (fs.is_open()) {
-      fs.seekg(flow->getOffset(), fs.beg);
-
-      if (fs.good()) {
-        callback->process(&fs);
-        /*
-         logger_->log_debug("Read offset %d size %d content %s for FlowFile UUID %s",
-         flow->_offset, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
-        fs.close();
-      } else {
-        fs.close();
-        throw Exception(FILE_OPERATION_EXCEPTION, "File Read Error");
-      }
-    } else {
-      throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+
+    std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->read(claim);
+
+    if (nullptr == stream) {
+      rollback();
+      return;
+    }
+
+    stream->seek(flow->getOffset());
+
+    if (callback->process(stream) < 0) {
+      rollback();
+      return;
     }
   } catch (std::exception &exception) {
     logger_->log_debug("Caught Exception %s", exception.what());
@@ -445,23 +433,17 @@ void ProcessSession::read(std::shared_ptr<core::FlowFile> &&flow, InputStreamCal
     }
 
     claim = flow->getResourceClaim();
-    std::ifstream fs;
-    fs.open(claim->getContentFullPath().c_str(), std::fstream::in | std::fstream::binary);
-    if (fs.is_open()) {
-      fs.seekg(flow->getOffset(), fs.beg);
-
-      if (fs.good()) {
-        callback->process(&fs);
-        /*
-         logger_->log_debug("Read offset %d size %d content %s for FlowFile UUID %s",
-         flow->_offset, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
-        fs.close();
-      } else {
-        fs.close();
-        throw Exception(FILE_OPERATION_EXCEPTION, "File Read Error");
-      }
-    } else {
-      throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+    std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->read(claim);
+
+    if (nullptr == stream) {
+      rollback();
+      return;
+    }
+    stream->seek(flow->getOffset());
+
+    if (callback->process(stream) < 0) {
+      rollback();
+      return;
     }
   } catch (std::exception &exception) {
     logger_->log_debug("Caught Exception %s", exception.what());
@@ -479,60 +461,55 @@ void ProcessSession::read(std::shared_ptr<core::FlowFile> &&flow, InputStreamCal
  *
  */
 void ProcessSession::importFrom(io::DataStream &stream, std::shared_ptr<core::FlowFile> &&flow) {
-  std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>();
-
+  std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
   int max_read = getpagesize();
   std::vector<uint8_t> charBuffer;
   charBuffer.resize(max_read);
 
   try {
-    std::ofstream fs;
     uint64_t startTime = getTimeMillis();
-    fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
-
-    if (fs.is_open()) {
-      size_t position = 0;
-      const size_t max_size = stream.getSize();
-      size_t read_size = max_read;
-      while (position < max_size) {
-        if ((max_size - position) > max_read) {
-          read_size = max_read;
-        } else {
-          read_size = max_size - position;
-        }
-        charBuffer.clear();
-        stream.readData(charBuffer, read_size);
-
-        fs.write((const char*) charBuffer.data(), read_size);
-        position += read_size;
+    claim->increaseFlowFileRecordOwnedCount();
+    std::shared_ptr<io::BaseStream> content_stream = process_context_->getContentRepository()->write(claim);
+
+    if (nullptr == content_stream) {
+      logger_->log_debug("Could not obtain claim for %s", claim->getContentFullPath());
+      rollback();
+      return;
+    }
+    size_t position = 0;
+    const size_t max_size = stream.getSize();
+    size_t read_size = max_read;
+    while (position < max_size) {
+      if ((max_size - position) > max_read) {
+        read_size = max_read;
+      } else {
+        read_size = max_size - position;
       }
-      // Open the source file and stream to the flow file
+      charBuffer.clear();
+      stream.readData(charBuffer, read_size);
 
-      if (fs.good() && fs.tellp() >= 0) {
-        flow->setSize(fs.tellp());
-        flow->setOffset(0);
-        if (flow->getResourceClaim() != nullptr) {
-          // Remove the old claim
-          flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-          flow->clearResourceClaim();
-        }
-        flow->setResourceClaim(claim);
-        claim->increaseFlowFileRecordOwnedCount();
-
-        logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(),
-                           flow->getUUIDStr().c_str());
+      content_stream->write(charBuffer.data(), read_size);
+      position += read_size;
+    }
+    // Open the source file and stream to the flow file
 
-        fs.close();
-        std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
-        uint64_t endTime = getTimeMillis();
-        provenance_report_->modifyContent(flow, details, endTime - startTime);
-      } else {
-        fs.close();
-        throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
-      }
-    } else {
-      throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+    flow->setSize(content_stream->getSize());
+    flow->setOffset(0);
+    if (flow->getResourceClaim() != nullptr) {
+      // Remove the old claim
+      flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+      flow->clearResourceClaim();
     }
+    flow->setResourceClaim(claim);
+
+    logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(),
+                       flow->getUUIDStr().c_str());
+
+    content_stream->closeStream();
+    std::stringstream details;
+    details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr();
+    uint64_t endTime = getTimeMillis();
+    provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
   } catch (std::exception &exception) {
     if (flow && flow->getResourceClaim() == claim) {
       flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
@@ -550,34 +527,44 @@ void ProcessSession::importFrom(io::DataStream &stream, std::shared_ptr<core::Fl
   }
 }
 
-void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> &flow,
-bool keepSource,
-                            uint64_t offset) {
-  std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>();
+void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> &flow, bool keepSource, uint64_t offset) {
+  std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
   char *buf = NULL;
   int size = 4096;
   buf = new char[size];
 
   try {
-    std::ofstream fs;
+    //  std::ofstream fs;
     uint64_t startTime = getTimeMillis();
-    fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
     std::ifstream input;
     input.open(source.c_str(), std::fstream::in | std::fstream::binary);
-
-    if (fs.is_open() && input.is_open()) {
+    claim->increaseFlowFileRecordOwnedCount();
+    std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
+    if (nullptr == stream) {
+      rollback();
+      return;
+    }
+    if (input.is_open()) {
       // Open the source file and stream to the flow file
-      input.seekg(offset, fs.beg);
+      input.seekg(offset);
+      bool invalidWrite = false;
       while (input.good()) {
         input.read(buf, size);
-        if (input)
-          fs.write(buf, size);
-        else
-          fs.write(buf, input.gcount());
+        if (input) {
+          if (stream->write(reinterpret_cast<uint8_t*>(buf), size) < 0) {
+            invalidWrite = true;
+            break;
+          }
+        } else {
+          if (stream->write(reinterpret_cast<uint8_t*>(buf), input.gcount()) < 0) {
+            invalidWrite = true;
+            break;
+          }
+        }
       }
 
-      if (fs.good() && fs.tellp() >= 0) {
-        flow->setSize(fs.tellp());
+      if (!invalidWrite) {
+        flow->setSize(stream->getSize());
         flow->setOffset(0);
         if (flow->getResourceClaim() != nullptr) {
           // Remove the old claim
@@ -585,20 +572,20 @@ bool keepSource,
           flow->clearResourceClaim();
         }
         flow->setResourceClaim(claim);
-        claim->increaseFlowFileRecordOwnedCount();
 
         logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(),
                            flow->getUUIDStr().c_str());
 
-        fs.close();
+        stream->closeStream();
         input.close();
         if (!keepSource)
           std::remove(source.c_str());
-        std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
+        std::stringstream details;
+        details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr();
         uint64_t endTime = getTimeMillis();
-        provenance_report_->modifyContent(flow, details, endTime - startTime);
+        provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
       } else {
-        fs.close();
+        stream->closeStream();
         input.close();
         throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
       }
@@ -626,8 +613,7 @@ bool keepSource,
   }
 }
 
-void ProcessSession::import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> flows,
-  bool keepSource, uint64_t offset, char inputDelimiter) {
+void ProcessSession::import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> flows, bool keepSource, uint64_t offset, char inputDelimiter) {
   std::shared_ptr<ResourceClaim> claim;
 
   std::shared_ptr<FlowFileRecord> flowFile;
@@ -639,48 +625,61 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow
   try {
     // Open the input file and seek to the appropriate location.
     std::ifstream input;
+    logger_->log_debug("Opening %s", source);
     input.open(source.c_str(), std::fstream::in | std::fstream::binary);
     if (input.is_open()) {
       input.seekg(offset, input.beg);
       while (input.good()) {
+        bool invalidWrite = false;
         flowFile = std::static_pointer_cast<FlowFileRecord>(create());
-        claim = std::make_shared<ResourceClaim>();
+        claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
         uint64_t startTime = getTimeMillis();
         input.getline(buf, size, inputDelimiter);
-        std::ofstream fs;
-        fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
-
-        if (fs.is_open()) {
-          if (input)
-            fs.write(buf, strlen(buf));
-          else
-            fs.write(buf, input.gcount());
-
-          if (fs.good() && fs.tellp() >= 0) {
-            flowFile->setSize(fs.tellp());
-            flowFile->setOffset(0);
-            if (flowFile->getResourceClaim() != nullptr) {
-              // Remove the old claim
-              flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-              flowFile->clearResourceClaim();
-            }
-            flowFile->setResourceClaim(claim);
-            claim->increaseFlowFileRecordOwnedCount();
 
-            logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flowFile->getOffset(),
-                               flowFile->getSize(), flowFile->getResourceClaim()->getContentFullPath().c_str(),
-                               flowFile->getUUIDStr().c_str());
+        std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);  // content stream for the new claim
+        if (nullptr == stream) {
+          logger_->log_debug("Stream is null");
+          rollback();
+          return;
+        }
 
-            fs.close();
-            std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flowFile->getUUIDStr();
-            uint64_t endTime = getTimeMillis();
-            provenance_report_->modifyContent(flowFile, details, endTime - startTime);
-            flows.push_back(flowFile);
+        if (input) {
+          if (stream->write(reinterpret_cast<uint8_t*>(buf), strlen(buf)) < 0) {  // write only the line just read, as the replaced fs.write(buf, strlen(buf)) did
+            invalidWrite = true;
+            break;  // NOTE(review): exits the read loop, so the error branch below (close + throw) is never reached
+          }
+        } else {
+          if (stream->write(reinterpret_cast<uint8_t*>(buf), input.gcount()) < 0) {
+            invalidWrite = true;
+            break;
+          }
+        }
 
-          } else {
-            fs.close();
-            throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
+        if (!invalidWrite) {
+          flowFile->setSize(stream->getSize());
+          flowFile->setOffset(0);
+          if (flowFile->getResourceClaim() != nullptr) {
+            // Remove the old claim
+            flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+            flowFile->clearResourceClaim();
           }
+          flowFile->setResourceClaim(claim);
+          claim->increaseFlowFileRecordOwnedCount();
+
+          logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flowFile->getOffset(),
+                             flowFile->getSize(),
+                             flowFile->getResourceClaim()->getContentFullPath().c_str(),
+                             flowFile->getUUIDStr().c_str());
+
+          stream->closeStream();
+          std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flowFile->getUUIDStr();
+          uint64_t endTime = getTimeMillis();
+          provenance_report_->modifyContent(flowFile, details, endTime - startTime);
+          flows.push_back(flowFile);
+        } else {
+          logger_->log_debug("Error while writing");
+          stream->closeStream();
+          throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
         }
       }
       input.close();
@@ -711,35 +710,44 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow
   }
 }
 
-void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> &&flow,
-bool keepSource,
-                            uint64_t offset) {
-  std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>();
-
+void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> &&flow, bool keepSource, uint64_t offset) {
+  std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
   char *buf = NULL;
   int size = 4096;
   buf = new char[size];
 
   try {
-    std::ofstream fs;
+    //  std::ofstream fs;
     uint64_t startTime = getTimeMillis();
-    fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
     std::ifstream input;
     input.open(source.c_str(), std::fstream::in | std::fstream::binary);
-
-    if (fs.is_open() && input.is_open()) {
+    claim->increaseFlowFileRecordOwnedCount();
+    std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
+    if (nullptr == stream) {
+      rollback();
+      return;
+    }
+    if (input.is_open()) {
       // Open the source file and stream to the flow file
-      input.seekg(offset, fs.beg);
+      input.seekg(offset);
+      int sizeWritten = 0;
+      bool invalidWrite = false;
       while (input.good()) {
         input.read(buf, size);
-        if (input)
-          fs.write(buf, size);
-        else
-          fs.write(buf, input.gcount());
+        if (input) {
+          if (stream->write(reinterpret_cast<uint8_t*>(buf), size) < 0) {
+            invalidWrite = true;
+            break;
+          }
+        } else {
+          if (stream->write(reinterpret_cast<uint8_t*>(buf), input.gcount()) < 0) {
+            invalidWrite = true;
+            break;
+          }
+        }
       }
-
-      if (fs.good() && fs.tellp() >= 0) {
-        flow->setSize(fs.tellp());
+      if (!invalidWrite) {
+        flow->setSize(stream->getSize());
         flow->setOffset(0);
         if (flow->getResourceClaim() != nullptr) {
           // Remove the old claim
@@ -747,20 +755,20 @@ bool keepSource,
           flow->clearResourceClaim();
         }
         flow->setResourceClaim(claim);
-        claim->increaseFlowFileRecordOwnedCount();
 
         logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(),
                            flow->getUUIDStr().c_str());
 
-        fs.close();
+        stream->closeStream();
         input.close();
         if (!keepSource)
           std::remove(source.c_str());
-        std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
+        std::stringstream details;
+        details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr();
         uint64_t endTime = getTimeMillis();
-        provenance_report_->modifyContent(flow, details, endTime - startTime);
+        provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
       } else {
-        fs.close();
+        stream->closeStream();
         input.close();
         throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
       }
@@ -834,7 +842,7 @@ void ProcessSession::commit() {
       }
     }
 
-    // Do the samething for added flow file
+    // Do the same thing for added flow file
     for (const auto it : _addedFlowFiles) {
       std::shared_ptr<core::FlowFile> record = it.second;
       if (record->isDeleted())
@@ -851,6 +859,7 @@ void ProcessSession::commit() {
             std::string message = "Connect empty for non auto terminated relationship " + relationship.getName();
             throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
           } else {
+            logger_->log_debug("added flow file is auto terminated");
             // Autoterminated
             remove(record);
           }
@@ -947,7 +956,7 @@ void ProcessSession::rollback() {
     _addedFlowFiles.clear();
     _updatedFlowFiles.clear();
     _deletedFlowFiles.clear();
-    logger_->log_trace("ProcessSession rollback for %s", process_context_->getProcessorNode().getName().c_str());
+    logger_->log_debug("ProcessSession rollback for %s", process_context_->getProcessorNode().getName().c_str());
   } catch (std::exception &exception) {
     logger_->log_debug("Caught Exception %s", exception.what());
     throw;
@@ -960,8 +969,10 @@ void ProcessSession::rollback() {
 std::shared_ptr<core::FlowFile> ProcessSession::get() {
   std::shared_ptr<Connectable> first = process_context_->getProcessorNode().getNextIncomingConnection();
 
-  if (first == NULL)
+  if (first == NULL) {
+    logger_->log_debug("Get is null for %s", process_context_->getProcessorNode().getName());
     return NULL;
+  }
 
   std::shared_ptr<Connection> current = std::static_pointer_cast<Connection>(first);
 
@@ -972,8 +983,9 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() {
       // Remove expired flow record
       for (std::set<std::shared_ptr<core::FlowFile> >::iterator it = expired.begin(); it != expired.end(); ++it) {
         std::shared_ptr<core::FlowFile> record = *it;
-        std::string details = process_context_->getProcessorNode().getName() + " expire flow record " + record->getUUIDStr();
-        provenance_report_->expire(record, details);
+        std::stringstream details;
+        details << process_context_->getProcessorNode().getName() << " expire flow record " << record->getUUIDStr();
+        provenance_report_->expire(record, details.str());
       }
     }
     if (ret) {
@@ -981,10 +993,9 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() {
       ret->setDeleted(false);
       _updatedFlowFiles[ret->getUUIDStr()] = ret;
       std::map<std::string, std::string> empty;
-      std::shared_ptr<core::FlowFile> snapshot = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), empty);
+      std::shared_ptr<core::FlowFile> snapshot = std::make_shared<FlowFileRecord>(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty);
       logger_->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr().c_str());
       snapshot = ret;
-//      snapshot->duplicate(ret);
       // save a snapshot
       _originalFlowFiles[snapshot->getUUIDStr()] = snapshot;
       return ret;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/ProcessSessionFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSessionFactory.cpp b/libminifi/src/core/ProcessSessionFactory.cpp
index 31b7481..570d895 100644
--- a/libminifi/src/core/ProcessSessionFactory.cpp
+++ b/libminifi/src/core/ProcessSessionFactory.cpp
@@ -28,7 +28,7 @@ namespace minifi {
 namespace core {
 
 std::unique_ptr<ProcessSession> ProcessSessionFactory::createSession() {
-  return std::unique_ptr<ProcessSession>(new ProcessSession(process_context_));
+  return std::unique_ptr < ProcessSession > (new ProcessSession(process_context_));
 }
 
 } /* namespace core */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index 7b07638..0c2e7cf 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -62,7 +62,7 @@ Processor::Processor(std::string name, uuid_t uuid)
   active_tasks_ = 0;
   yield_expiration_ = 0;
   incoming_connections_Iter = this->_incomingConnections.begin();
-  logger_->log_info("Processor %s created UUID %s", name_.c_str(), uuidStr_.c_str());
+  logger_->log_info("Processor %s created UUID %s", name_, uuidStr_);
 }
 
 bool Processor::isRunning() {
@@ -80,8 +80,8 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
     logger_->log_info("Can not add connection while the process %s is running", name_.c_str());
     return false;
   }
-  std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection > (conn);
+  std::lock_guard < std::mutex > lock(mutex_);
 
   uuid_t srcUUID;
   uuid_t destUUID;
@@ -141,12 +141,12 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
     return;
   }
 
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard < std::mutex > lock(mutex_);
 
   uuid_t srcUUID;
   uuid_t destUUID;
 
-  std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
+  std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection > (conn);
 
   connection->getSourceUUID(srcUUID);
   connection->getDestinationUUID(destUUID);
@@ -178,13 +178,13 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
 }
 
 bool Processor::flowFilesQueued() {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard < std::mutex > lock(mutex_);
 
   if (_incomingConnections.size() == 0)
     return false;
 
   for (auto &&conn : _incomingConnections) {
-    std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
+    std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection > (conn);
     if (connection->getQueueSize() > 0)
       return true;
   }
@@ -193,13 +193,13 @@ bool Processor::flowFilesQueued() {
 }
 
 bool Processor::flowFilesOutGoingFull() {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard < std::mutex > lock(mutex_);
 
   for (auto &&connection : out_going_connections_) {
     // We already has connection for this relationship
     std::set<std::shared_ptr<Connectable>> existedConnection = connection.second;
     for (const auto conn : existedConnection) {
-      std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
+      std::shared_ptr < Connection > connection = std::static_pointer_cast < Connection > (conn);
       if (connection->isFull())
         return true;
     }
@@ -232,7 +232,7 @@ bool Processor::isWorkAvailable() {
 
   try {
     for (const auto &conn : _incomingConnections) {
-      std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
+      std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection > (conn);
       if (connection->getQueueSize() > 0) {
         hasWork = true;
         break;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/Repository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Repository.cpp b/libminifi/src/core/Repository.cpp
index 50e8cd2..cf26a0d 100644
--- a/libminifi/src/core/Repository.cpp
+++ b/libminifi/src/core/Repository.cpp
@@ -19,13 +19,14 @@
 #include <arpa/inet.h>
 #include <cstdint>
 #include <vector>
+
+#include "../../include/core/repository/FlowFileRepository.h"
 #include "io/DataStream.h"
 #include "io/Serializable.h"
 #include "core/Relationship.h"
 #include "core/logging/Logger.h"
 #include "FlowController.h"
 #include "provenance/Provenance.h"
-#include "core/repository/FlowFileRepository.h"
 
 namespace org {
 namespace apache {
@@ -38,9 +39,8 @@ void Repository::start() {
     return;
   if (running_)
     return;
-  thread_ = std::thread(&Repository::threadExecutor, this);
-  thread_.detach();
   running_ = true;
+  thread_ = std::thread(&Repository::threadExecutor, this);
   logger_->log_info("%s Repository Monitor Thread Start", name_.c_str());
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/RepositoryFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/RepositoryFactory.cpp b/libminifi/src/core/RepositoryFactory.cpp
index cf18601..9e99718 100644
--- a/libminifi/src/core/RepositoryFactory.cpp
+++ b/libminifi/src/core/RepositoryFactory.cpp
@@ -18,13 +18,17 @@
 #include <memory>
 #include <string>
 #include <algorithm>
+#include "core/ContentRepository.h"
+#include "core/repository/FileSystemRepository.h"
+#include "core/repository/VolatileContentRepository.h"
 #include "core/Repository.h"
 #ifdef LEVELDB_SUPPORT
 #include "core/repository/FlowFileRepository.h"
 #include "provenance/ProvenanceRepository.h"
 #endif
 
-#include "core/repository/VolatileRepository.h"
+#include "core/repository/VolatileProvenanceRepository.h"
+#include "core/repository/VolatileFlowFileRepository.h"
 
 namespace org {
 namespace apache {
@@ -48,14 +52,14 @@ std::shared_ptr<core::Repository> createRepository(const std::string configurati
   try {
     std::shared_ptr<core::Repository> return_obj = nullptr;
     if (class_name_lc == "flowfilerepository") {
-      std::cout << "creating flow" << std::endl;
       return_obj = instantiate<core::repository::FlowFileRepository>(repo_name);
     } else if (class_name_lc == "provenancerepository") {
       return_obj = instantiate<provenance::ProvenanceRepository>(repo_name);
-    } else if (class_name_lc == "volatilerepository") {
-      return_obj = instantiate<repository::VolatileRepository>(repo_name);
+    } else if (class_name_lc == "volatileflowfilerepository") {
+      return_obj = instantiate<repository::VolatileFlowFileRepository>(repo_name);
+    } else if (class_name_lc == "volatileprovenancefilerepository") {
+      return_obj = instantiate<repository::VolatileProvenanceRepository>(repo_name);
     } else if (class_name_lc == "nooprepository") {
-      std::cout << "creating noop" << std::endl;
       return_obj = instantiate<core::Repository>(repo_name);
     }
 
@@ -63,13 +67,42 @@ std::shared_ptr<core::Repository> createRepository(const std::string configurati
       return return_obj;
     }
     if (fail_safe) {
-      return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1, 1);
+      return std::make_shared < core::Repository > ("fail_safe", "fail_safe", 1, 1, 1);
     } else {
       throw std::runtime_error("Support for the provided configuration class could not be found");
     }
   } catch (const std::runtime_error &r) {
     if (fail_safe) {
-      return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1, 1);
+      return std::make_shared < core::Repository > ("fail_safe", "fail_safe", 1, 1, 1);
+    }
+  }
+
+  throw std::runtime_error("Support for the provided configuration class could not be found");
+}
+
+std::shared_ptr<core::ContentRepository> createContentRepository(const std::string configuration_class_name, bool fail_safe, const std::string repo_name) {
+  std::shared_ptr<core::ContentRepository> return_obj = nullptr;
+  std::string class_name_lc = configuration_class_name;
+  std::transform(class_name_lc.begin(), class_name_lc.end(), class_name_lc.begin(), ::tolower);
+  try {
+    std::shared_ptr<core::ContentRepository> return_obj = nullptr;
+    if (class_name_lc == "volatilecontentrepository") {
+      return_obj = instantiate<core::repository::VolatileContentRepository>(repo_name);
+    } else {
+      return_obj = instantiate<core::repository::FileSystemRepository>(repo_name);
+    }
+
+    if (return_obj) {
+      return return_obj;
+    }
+    if (fail_safe) {
+      return std::make_shared < core::repository::FileSystemRepository > ("fail_safe");
+    } else {
+      throw std::runtime_error("Support for the provided configuration class could not be found");
+    }
+  } catch (const std::runtime_error &r) {
+    if (fail_safe) {
+      return std::make_shared < core::repository::FileSystemRepository > ("fail_safe");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/controller/StandardControllerServiceNode.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/controller/StandardControllerServiceNode.cpp b/libminifi/src/core/controller/StandardControllerServiceNode.cpp
index 5c4aa70..69004c1 100644
--- a/libminifi/src/core/controller/StandardControllerServiceNode.cpp
+++ b/libminifi/src/core/controller/StandardControllerServiceNode.cpp
@@ -27,12 +27,12 @@ namespace minifi {
 namespace core {
 namespace controller {
 std::shared_ptr<core::ProcessGroup> &StandardControllerServiceNode::getProcessGroup() {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard < std::mutex > lock(mutex_);
   return process_group_;
 }
 
 void StandardControllerServiceNode::setProcessGroup(std::shared_ptr<ProcessGroup> &processGroup) {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard < std::mutex > lock(mutex_);
   process_group_ = processGroup;
 }
 
@@ -45,7 +45,7 @@ bool StandardControllerServiceNode::enable() {
     for (auto linked_service : property.getValues()) {
       std::shared_ptr<ControllerServiceNode> csNode = provider->getControllerServiceNode(linked_service);
       if (nullptr != csNode) {
-        std::lock_guard<std::mutex> lock(mutex_);
+        std::lock_guard < std::mutex > lock(mutex_);
         linked_controller_services_.push_back(csNode);
       }
     }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/logging/LoggerConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/logging/LoggerConfiguration.cpp b/libminifi/src/core/logging/LoggerConfiguration.cpp
index c06239b..4b97055 100644
--- a/libminifi/src/core/logging/LoggerConfiguration.cpp
+++ b/libminifi/src/core/logging/LoggerConfiguration.cpp
@@ -56,19 +56,19 @@ std::vector<std::string> LoggerProperties::get_keys_of_type(const std::string &t
 LoggerConfiguration::LoggerConfiguration()
     : root_namespace_(create_default_root()),
       loggers(std::vector<std::shared_ptr<LoggerImpl>>()),
-      formatter_(std::make_shared<spdlog::pattern_formatter>(spdlog_default_pattern)) {
-  logger_ = std::shared_ptr<LoggerImpl>(new LoggerImpl(core::getClassName<LoggerConfiguration>(), get_logger(nullptr, root_namespace_, core::getClassName<LoggerConfiguration>(), formatter_)));
+      formatter_(std::make_shared < spdlog::pattern_formatter > (spdlog_default_pattern)) {
+  logger_ = std::shared_ptr < LoggerImpl > (new LoggerImpl(core::getClassName<LoggerConfiguration>(), get_logger(nullptr, root_namespace_, core::getClassName<LoggerConfiguration>(), formatter_)));
   loggers.push_back(logger_);
 }
 
 void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &logger_properties) {
-  std::lock_guard<std::mutex> lock(mutex);
+  std::lock_guard < std::mutex > lock(mutex);
   root_namespace_ = initialize_namespaces(logger_properties);
   std::string spdlog_pattern;
   if (!logger_properties->get("spdlog.pattern", spdlog_pattern)) {
     spdlog_pattern = spdlog_default_pattern;
   }
-  formatter_ = std::make_shared<spdlog::pattern_formatter>(spdlog_pattern);
+  formatter_ = std::make_shared < spdlog::pattern_formatter > (spdlog_pattern);
   std::map<std::string, std::shared_ptr<spdlog::logger>> spdloggers;
   for (auto const & logger_impl : loggers) {
     std::shared_ptr<spdlog::logger> spdlogger;
@@ -85,8 +85,8 @@ void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &lo
 }
 
 std::shared_ptr<Logger> LoggerConfiguration::getLogger(const std::string &name) {
-  std::lock_guard<std::mutex> lock(mutex);
-  std::shared_ptr<LoggerImpl> result = std::make_shared<LoggerImpl>(name, get_logger(logger_, root_namespace_, name, formatter_));
+  std::lock_guard < std::mutex > lock(mutex);
+  std::shared_ptr<LoggerImpl> result = std::make_shared < LoggerImpl > (name, get_logger(logger_, root_namespace_, name, formatter_));
   loggers.push_back(result);
   return result;
 }
@@ -130,7 +130,7 @@ std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration::initialize_names
         } catch (const std::out_of_range &oor) {
         }
       }
-      sink_map[appender_name] = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(file_name, max_file_size, max_files);
+      sink_map[appender_name] = std::make_shared < spdlog::sinks::rotating_file_sink_mt > (file_name, max_file_size, max_files);
     } else if ("stdout" == appender_type) {
       sink_map[appender_name] = spdlog::sinks::stdout_sink_mt::instance();
     } else {
@@ -227,7 +227,7 @@ std::shared_ptr<spdlog::logger> LoggerConfiguration::get_logger(std::shared_ptr<
   if (logger != nullptr) {
     logger->log_debug("%s logger got sinks from namespace %s and level %s from namespace %s", name, sink_namespace_str, spdlog::level::level_names[level], level_namespace_str);
   }
-  spdlogger = std::make_shared<spdlog::logger>(name, begin(sinks), end(sinks));
+  spdlogger = std::make_shared < spdlog::logger > (name, begin(sinks), end(sinks));
   spdlogger->set_level(level);
   spdlogger->set_formatter(formatter);
   spdlogger->flush_on(std::max(spdlog::level::info, current_namespace->level));

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
index 02ddb52..d4059d6 100644
--- a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
+++ b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
@@ -24,8 +24,10 @@
 #include <string>
 #include <memory>
 #include <sstream>
+#include <functional>
 #include <iostream>
 #include <utility>
+#include "core/Repository.h"
 #include "core/reporting/SiteToSiteProvenanceReportingTask.h"
 #include "../include/io/StreamFactory.h"
 #include "io/ClientSocket.h"
@@ -51,10 +53,14 @@ void SiteToSiteProvenanceReportingTask::initialize() {
   RemoteProcessorGroupPort::initialize();
 }
 
-void SiteToSiteProvenanceReportingTask::getJsonReport(core::ProcessContext *context, core::ProcessSession *session, std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> &records,
+void SiteToSiteProvenanceReportingTask::getJsonReport(core::ProcessContext *context, core::ProcessSession *session, std::vector<std::shared_ptr<core::SerializableComponent>> &records,
                                                       std::string &report) {
   Json::Value array;
-  for (auto record : records) {
+  for (auto sercomp : records) {
+    std::shared_ptr<provenance::ProvenanceEventRecord> record = std::dynamic_pointer_cast < provenance::ProvenanceEventRecord > (sercomp);
+    if (nullptr == record) {
+      break;
+    }
     Json::Value recordJson;
     Json::Value updatedAttributesJson;
     Json::Value parentUuidJson;
@@ -108,23 +114,32 @@ void SiteToSiteProvenanceReportingTask::onTrigger(core::ProcessContext *context,
     return;
   }
 
+  logger_->log_debug("SiteToSiteProvenanceReportingTask -- onTrigger");
+
   if (!protocol_->bootstrap()) {
     // bootstrap the client protocol if needeed
     context->yield();
-    std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(context->getProcessorNode().getProcessor());
+    std::shared_ptr<Processor> processor = std::static_pointer_cast < Processor > (context->getProcessorNode().getProcessor());
     logger_->log_error("Site2Site bootstrap failed yield period %d peer ", processor->getYieldPeriodMsec());
     returnProtocol(std::move(protocol_));
     return;
   }
 
-  std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> records;
-  std::shared_ptr<provenance::ProvenanceRepository> repo = std::static_pointer_cast<provenance::ProvenanceRepository>(context->getProvenanceRepository());
-  repo->getProvenanceRecord(records, batch_size_);
-  if (records.size() <= 0) {
+  std::vector<std::shared_ptr<core::SerializableComponent>> records;
+
+  logger_->log_debug("batch size %d records", batch_size_);
+  size_t deserialized = batch_size_;
+  std::shared_ptr<core::Repository> repo = context->getProvenanceRepository();
+  std::function < std::shared_ptr<core::SerializableComponent>() > constructor = []() {return std::make_shared<provenance::ProvenanceEventRecord>();};
+  if (!repo->DeSerialize(records, deserialized, constructor) && deserialized == 0) {
+    logger_->log_debug("Not sending because deserialized is %d", deserialized);
     returnProtocol(std::move(protocol_));
     return;
   }
 
+  logger_->log_debug("batch size %d records", batch_size_, deserialized);
+
+  logger_->log_debug("Captured %d records", deserialized);
   std::string jsonStr;
   this->getJsonReport(context, session, records, jsonStr);
   if (jsonStr.length() <= 0) {
@@ -141,7 +156,7 @@ void SiteToSiteProvenanceReportingTask::onTrigger(core::ProcessContext *context,
   }
 
   // we transfer the record, purge the record from DB
-  repo->purgeProvenanceRecord(records);
+  repo->Delete(records);
   returnProtocol(std::move(protocol_));
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/repository/FileSystemRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp b/libminifi/src/core/repository/FileSystemRepository.cpp
new file mode 100644
index 0000000..fba1fe3
--- /dev/null
+++ b/libminifi/src/core/repository/FileSystemRepository.cpp
@@ -0,0 +1,86 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/repository/FileSystemRepository.h"
+#include <cstdio>
+#include <memory>
+#include "io/FileStream.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace repository {
+
+/**
+ * Initializes the repository. The supplied configuration is not consulted
+ * here; the call always reports success.
+ * @param configuration configuration object (unused by this implementation).
+ * @return always true.
+ */
+bool FileSystemRepository::initialize(const std::shared_ptr<minifi::Configure> &configuration) {
+  return true;
+}
+
+/**
+ * Stops the repository. The body is empty, so this is a no-op.
+ */
+void FileSystemRepository::stop() {
+}
+
+/**
+ * Creates a file-backed stream for the content referenced by a claim.
+ * @param claim resource claim whose getContentFullPath() names the backing file.
+ * @return a new io::FileStream constructed from that path alone.
+ * NOTE(review): presumably this constructor opens the file for writing --
+ * confirm against io/FileStream.h.
+ */
+std::shared_ptr<io::BaseStream> FileSystemRepository::write(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+  return std::make_shared<io::FileStream>(claim->getContentFullPath());
+}
+
+/**
+ * Creates a file-backed stream over the content referenced by a claim.
+ * @param claim resource claim whose getContentFullPath() names the backing file.
+ * @return a new io::FileStream constructed with (path, 0, false).
+ * NOTE(review): the trailing arguments look like a start offset of 0 and
+ * write access disabled -- confirm against io/FileStream.h.
+ */
+std::shared_ptr<io::BaseStream> FileSystemRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+  return std::make_shared<io::FileStream>(claim->getContentFullPath(), 0, false);
+}
+
+/**
+ * Deletes the file backing a claim via std::remove.
+ * The result of std::remove is not inspected, so failures (for example a
+ * file that does not exist) are not reported to the caller.
+ * @param claim resource claim whose getContentFullPath() names the file.
+ * @return always true.
+ */
+bool FileSystemRepository::remove(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+  std::remove(claim->getContentFullPath().c_str());
+  return true;
+}
+
+} /* namespace repository */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */