You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/03/28 17:19:10 UTC

[05/16] nifi-minifi-cpp git commit: MINIFI-217: Updates namespaces and removes use of raw pointers for user facing API.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/ConfigurableComponent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp
new file mode 100644
index 0000000..e5703d1
--- /dev/null
+++ b/libminifi/src/core/ConfigurableComponent.cpp
@@ -0,0 +1,143 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/ConfigurableComponent.h"
+
+#include "core/Property.h"
+#include "core/logging/Logger.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+ConfigurableComponent::ConfigurableComponent(std::shared_ptr<logging::Logger> logger)
+    : logger_(logger) {
+
+}
+
+ConfigurableComponent::ConfigurableComponent(
+    const ConfigurableComponent &&other)
+    : properties_(std::move(other.properties_)),
+      logger_(std::move(other.logger_)) {
+
+}
+ConfigurableComponent::~ConfigurableComponent() {
+
+}
+
+/**
+ * Get property using the provided name.
+ * @param name property name.
+ * @param value value passed in by reference
+ * @return result of getting property.
+ */
+bool ConfigurableComponent::getProperty(const std::string name,
+                                        std::string &value) {
+  std::lock_guard<std::mutex> lock(configuration_mutex_);
+
+  auto &&it = properties_.find(name);
+
+  if (it != properties_.end()) {
+    Property item = it->second;
+    value = item.getValue();
+    logger_->log_info("Processor %s property name %s value %s", name.c_str(),
+                      item.getName().c_str(), value.c_str());
+    return true;
+  } else {
+    return false;
+  }
+}
+/**
+ * Sets the property using the provided name
+ * @param property name
+ * @param value property value.
+ * @return result of setting property.
+ */
+bool ConfigurableComponent::setProperty(const std::string name,
+                                        std::string value) {
+  std::lock_guard<std::mutex> lock(configuration_mutex_);
+  auto &&it = properties_.find(name);
+
+  if (it != properties_.end()) {
+    Property item = it->second;
+    item.setValue(value);
+    properties_[item.getName()] = item;
+    logger_->log_info("Component %s property name %s value %s", name.c_str(),
+                      item.getName().c_str(), value.c_str());
+    return true;
+  } else {
+    return false;
+  }
+}
+
+/**
+ * Sets the property using the provided name
+ * @param property name
+ * @param value property value.
+ * @return whether property was set or not
+ */
+bool ConfigurableComponent::setProperty(Property &prop, std::string value) {
+  std::lock_guard<std::mutex> lock(configuration_mutex_);
+  auto it = properties_.find(prop.getName());
+
+  if (it != properties_.end()) {
+    Property item = it->second;
+    item.setValue(value);
+    properties_[item.getName()] = item;
+    logger_->log_info("property name %s value %s", prop.getName().c_str(),
+                      item.getName().c_str(), value.c_str());
+    return true;
+  } else {
+    Property newProp(prop);
+    newProp.setValue(value);
+    properties_.insert(
+        std::pair<std::string, Property>(prop.getName(), newProp));
+    return true;
+
+  }
+  return false;
+}
+
+/**
+ * Sets supported properties for the ConfigurableComponent
+ * @param supported properties
+ * @return result of set operation.
+ */
+bool ConfigurableComponent::setSupportedProperties(
+    std::set<Property> properties) {
+  if (!canEdit()) {
+    return false;
+  }
+
+  std::lock_guard<std::mutex> lock(configuration_mutex_);
+
+  properties_.clear();
+  for (auto item : properties) {
+    properties_[item.getName()] = item;
+  }
+
+  return true;
+}
+
+} /* namespace components */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/ConfigurationFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ConfigurationFactory.cpp b/libminifi/src/core/ConfigurationFactory.cpp
new file mode 100644
index 0000000..52bde69
--- /dev/null
+++ b/libminifi/src/core/ConfigurationFactory.cpp
@@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/ConfigurationFactory.h"
+#include "core/FlowConfiguration.h"
+#include  <type_traits>
+#ifdef YAML_SUPPORT
+#include "core/yaml/YamlConfiguration.h"
+#endif
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+#ifndef YAML_SUPPORT
+  class YamlConfiguration;
+#endif
+
+  std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(
+      std::shared_ptr<core::Repository> repo,
+      std::shared_ptr<core::Repository> flow_file_repo,
+      const std::string configuration_class_name, const std::string path,
+      bool fail_safe) {
+
+    std::string class_name_lc = configuration_class_name;
+    std::transform(class_name_lc.begin(), class_name_lc.end(),
+                   class_name_lc.begin(), ::tolower);
+    try {
+
+      if (class_name_lc == "flowconfiguration") {
+	// load the base configuration.
+        return std::unique_ptr<core::FlowConfiguration>(
+            new core::FlowConfiguration(repo, flow_file_repo, path));
+	
+      } else if (class_name_lc == "yamlconfiguration") {
+	// only load if the class is defined.
+        return std::unique_ptr<core::FlowConfiguration>(instantiate<core::YamlConfiguration>(repo, flow_file_repo, path));
+            
+
+      } else {
+        if (fail_safe) {
+          return std::unique_ptr<core::FlowConfiguration>(
+              new core::FlowConfiguration(repo, flow_file_repo, path));
+        } else {
+          throw std::runtime_error(
+              "Support for the provided configuration class could not be found");
+        }
+      }
+    } catch (const std::runtime_error &r) {
+      if (fail_safe) {
+        return std::unique_ptr<core::FlowConfiguration>(
+            new core::FlowConfiguration(repo, flow_file_repo, path));
+      }
+    }
+
+    throw std::runtime_error(
+        "Support for the provided configuration class could not be found");
+  }
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/Connectable.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Connectable.cpp b/libminifi/src/core/Connectable.cpp
new file mode 100644
index 0000000..ac61568
--- /dev/null
+++ b/libminifi/src/core/Connectable.cpp
@@ -0,0 +1,174 @@
+/*
+ * Connectable.cpp
+ *
+ *  Created on: Feb 27, 2017
+ *      Author: mparisi
+ */
+
+#include "../../include/core/Connectable.h"
+
+#include <uuid/uuid.h>
+#include "core/logging/Logger.h"
+#include "core/Relationship.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+Connectable::Connectable(std::string name, uuid_t uuid)
+    : CoreComponent(name, uuid),
+      max_concurrent_tasks_(1) {
+
+}
+
+Connectable::Connectable(const Connectable &&other)
+    : CoreComponent(std::move(other)),
+      max_concurrent_tasks_(std::move(other.max_concurrent_tasks_)) {
+  has_work_ = other.has_work_.load();
+  strategy_ = other.strategy_.load();
+}
+
+Connectable::~Connectable() {
+
+}
+
+bool Connectable::setSupportedRelationships(
+    std::set<core::Relationship> relationships) {
+  if (isRunning()) {
+    logger_->log_info(
+        "Can not set processor supported relationship while the process %s is running",
+        name_.c_str());
+    return false;
+  }
+
+  std::lock_guard<std::mutex> lock(relationship_mutex_);
+
+  relationships_.clear();
+  for (auto item : relationships) {
+    relationships_[item.getName()] = item;
+    logger_->log_info("Processor %s supported relationship name %s",
+                      name_.c_str(), item.getName().c_str());
+  }
+
+  return true;
+}
+
+// Whether the relationship is supported
+bool Connectable::isSupportedRelationship(core::Relationship relationship) {
+  const bool requiresLock = isRunning();
+
+  const auto conditionalLock =
+      !requiresLock ?
+          std::unique_lock<std::mutex>() :
+          std::unique_lock<std::mutex>(relationship_mutex_);
+
+  const auto &it = relationships_.find(relationship.getName());
+  if (it != relationships_.end()) {
+    return true;
+  } else {
+    return false;
+  }
+}
+
+bool Connectable::setAutoTerminatedRelationships(
+    std::set<Relationship> relationships) {
+  if (isRunning()) {
+    logger_->log_info(
+        "Can not set processor auto terminated relationship while the process %s is running",
+        name_.c_str());
+    return false;
+  }
+
+  std::lock_guard<std::mutex> lock(relationship_mutex_);
+
+  auto_terminated_relationships_.clear();
+  for (auto item : relationships) {
+    auto_terminated_relationships_[item.getName()] = item;
+    logger_->log_info("Processor %s auto terminated relationship name %s",
+                      name_.c_str(), item.getName().c_str());
+  }
+
+  return true;
+}
+
+// Check whether the relationship is auto terminated
+bool Connectable::isAutoTerminated(core::Relationship relationship) {
+  const bool requiresLock = isRunning();
+
+  const auto conditionalLock =
+      !requiresLock ?
+          std::unique_lock<std::mutex>() :
+          std::unique_lock<std::mutex>(relationship_mutex_);
+
+  const auto &it = auto_terminated_relationships_.find(relationship.getName());
+  if (it != auto_terminated_relationships_.end()) {
+    return true;
+  } else {
+    return false;
+  }
+}
+
+void Connectable::waitForWork(uint64_t timeoutMs) {
+  has_work_.store(isWorkAvailable());
+
+  if (!has_work_.load()) {
+    std::unique_lock<std::mutex> lock(work_available_mutex_);
+    work_condition_.wait_for(lock, std::chrono::milliseconds(timeoutMs),
+                             [&] {return has_work_.load();});
+  }
+
+}
+
+void Connectable::notifyWork() {
+  // Do nothing if we are not event-driven
+  if (strategy_ != EVENT_DRIVEN) {
+    return;
+  }
+
+  {
+    has_work_.store(isWorkAvailable());
+
+    if (has_work_.load()) {
+      work_condition_.notify_one();
+    }
+  }
+
+}
+
+std::set<std::shared_ptr<Connectable>> Connectable::getOutGoingConnections(
+    std::string relationship) {
+  std::set<std::shared_ptr<Connectable>> empty;
+
+  auto &&it = _outGoingConnections.find(relationship);
+  if (it != _outGoingConnections.end()) {
+    return _outGoingConnections[relationship];
+  } else {
+    return empty;
+  }
+}
+
+std::shared_ptr<Connectable> Connectable::getNextIncomingConnection() {
+  std::lock_guard<std::mutex> lock(relationship_mutex_);
+
+  if (_incomingConnections.size() == 0)
+    return NULL;
+
+  if (incoming_connections_Iter == _incomingConnections.end())
+    incoming_connections_Iter = _incomingConnections.begin();
+
+  std::shared_ptr<Connectable> ret = *incoming_connections_Iter;
+  incoming_connections_Iter++;
+
+  if (incoming_connections_Iter == _incomingConnections.end())
+    incoming_connections_Iter = _incomingConnections.begin();
+
+  return ret;
+}
+
+} /* namespace components */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/Core.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Core.cpp b/libminifi/src/core/Core.cpp
new file mode 100644
index 0000000..39969f6
--- /dev/null
+++ b/libminifi/src/core/Core.cpp
@@ -0,0 +1,51 @@
+/*
+ * Core.cpp
+ *
+ *  Created on: Mar 10, 2017
+ *      Author: mparisi
+ */
+
+#include "core/core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+// Set UUID
+void CoreComponent::setUUID(uuid_t uuid) {
+  uuid_copy(uuid_, uuid);
+  char uuidStr[37];
+  uuid_unparse_lower(uuid_, uuidStr);
+  uuidStr_ = uuidStr;
+}
+// Get UUID
+bool CoreComponent::getUUID(uuid_t uuid) {
+  if (uuid) {
+    uuid_copy(uuid, uuid_);
+    return true;
+  } else {
+    return false;
+  }
+}
+
+// Get UUID
+unsigned const char *CoreComponent::getUUID() {
+  return uuid_;
+}
+
+// Set Processor Name
+void CoreComponent::setName(const std::string name) {
+  name_ = name;
+
+}
+// Get Process Name
+std::string CoreComponent::getName() {
+  return name_;
+}
+}
+}
+}
+}
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/FlowConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
new file mode 100644
index 0000000..c6472cc
--- /dev/null
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -0,0 +1,110 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/FlowConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+FlowConfiguration::~FlowConfiguration() {
+
+}
+
+std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(
+    std::string name, uuid_t uuid) {
+  std::shared_ptr<core::Processor> processor = nullptr;
+  if (name
+      == org::apache::nifi::minifi::processors::GenerateFlowFile::ProcessorName) {
+    processor = std::make_shared<
+        org::apache::nifi::minifi::processors::GenerateFlowFile>(name, uuid);
+  } else if (name
+      == org::apache::nifi::minifi::processors::LogAttribute::ProcessorName) {
+    processor = std::make_shared<
+        org::apache::nifi::minifi::processors::LogAttribute>(name, uuid);
+  } else if (name
+      == org::apache::nifi::minifi::processors::RealTimeDataCollector::ProcessorName) {
+    processor = std::make_shared<
+        org::apache::nifi::minifi::processors::RealTimeDataCollector>(name,
+                                                                      uuid);
+  } else if (name
+      == org::apache::nifi::minifi::processors::GetFile::ProcessorName) {
+    processor =
+        std::make_shared<org::apache::nifi::minifi::processors::GetFile>(name,
+                                                                         uuid);
+  } else if (name
+      == org::apache::nifi::minifi::processors::PutFile::ProcessorName) {
+    processor =
+        std::make_shared<org::apache::nifi::minifi::processors::PutFile>(name,
+                                                                         uuid);
+  } else if (name
+      == org::apache::nifi::minifi::processors::TailFile::ProcessorName) {
+    processor =
+        std::make_shared<org::apache::nifi::minifi::processors::TailFile>(name,
+                                                                          uuid);
+  } else if (name
+      == org::apache::nifi::minifi::processors::ListenSyslog::ProcessorName) {
+    processor = std::make_shared<
+        org::apache::nifi::minifi::processors::ListenSyslog>(name, uuid);
+  } else if (name
+      == org::apache::nifi::minifi::processors::ListenHTTP::ProcessorName) {
+    processor = std::make_shared<
+        org::apache::nifi::minifi::processors::ListenHTTP>(name, uuid);
+  } else if (name
+      == org::apache::nifi::minifi::processors::ExecuteProcess::ProcessorName) {
+    processor = std::make_shared<
+        org::apache::nifi::minifi::processors::ExecuteProcess>(name, uuid);
+  } else if (name
+      == org::apache::nifi::minifi::processors::AppendHostInfo::ProcessorName) {
+    processor = std::make_shared<
+        org::apache::nifi::minifi::processors::AppendHostInfo>(name, uuid);
+  } else {
+    logger_->log_error("No Processor defined for %s", name.c_str());
+    return nullptr;
+  }
+
+  // initialize the processor
+  processor->initialize();
+
+  return processor;
+}
+
+std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(
+    std::string name, uuid_t uuid) {
+  return std::unique_ptr<core::ProcessGroup>(
+      new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid));
+}
+
+std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRemoteProcessGroup(
+    std::string name, uuid_t uuid) {
+  return std::unique_ptr<core::ProcessGroup>(
+      new core::ProcessGroup(core::REMOTE_PROCESS_GROUP, name, uuid));
+}
+
+std::shared_ptr<minifi::Connection> FlowConfiguration::createConnection(
+    std::string name, uuid_t uuid) {
+  return std::make_shared<minifi::Connection>(flow_file_repo_, name, uuid);
+}
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
new file mode 100644
index 0000000..baa3ebd
--- /dev/null
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -0,0 +1,312 @@
+/**
+ * @file ProcessGroup.cpp
+ * ProcessGroup class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+
+#include "core/ProcessGroup.h"
+#include "core/Processor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid,
+                           ProcessGroup *parent)
+    : name_(name),
+      type_(type),
+      parent_process_group_(parent) {
+  if (!uuid)
+    // Generate the global UUID for the flow record
+    uuid_generate(uuid_);
+  else
+    uuid_copy(uuid_, uuid);
+
+  yield_period_msec_ = 0;
+  transmitting_ = false;
+
+  logger_ = logging::Logger::getLogger();
+  logger_->log_info("ProcessGroup %s created", name_.c_str());
+}
+
+ProcessGroup::~ProcessGroup() {
+  for (auto &&connection : connections_) {
+    connection->drain();
+  }
+
+  for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin();
+      it != child_process_groups_.end(); ++it) {
+    ProcessGroup *processGroup(*it);
+    delete processGroup;
+  }
+
+}
+
+bool ProcessGroup::isRootProcessGroup() {
+  std::lock_guard<std::mutex> lock(mutex_);
+  return (type_ == ROOT_PROCESS_GROUP);
+}
+
+void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) {
+  std::lock_guard<std::mutex> lock(mutex_);
+
+  if (processors_.find(processor) == processors_.end()) {
+    // We do not have the same processor in this process group yet
+    processors_.insert(processor);
+    logger_->log_info("Add processor %s into process group %s",
+                      processor->getName().c_str(), name_.c_str());
+  }
+}
+
+void ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) {
+  std::lock_guard<std::mutex> lock(mutex_);
+
+  if (processors_.find(processor) != processors_.end()) {
+    // We do have the same processor in this process group yet
+    processors_.erase(processor);
+    logger_->log_info("Remove processor %s from process group %s",
+                      processor->getName().c_str(), name_.c_str());
+  }
+}
+
+void ProcessGroup::addProcessGroup(ProcessGroup *child) {
+  std::lock_guard<std::mutex> lock(mutex_);
+
+  if (child_process_groups_.find(child) == child_process_groups_.end()) {
+    // We do not have the same child process group in this process group yet
+    child_process_groups_.insert(child);
+    logger_->log_info("Add child process group %s into process group %s",
+                      child->getName().c_str(), name_.c_str());
+  }
+}
+
+void ProcessGroup::removeProcessGroup(ProcessGroup *child) {
+  std::lock_guard<std::mutex> lock(mutex_);
+
+  if (child_process_groups_.find(child) != child_process_groups_.end()) {
+    // We do have the same child process group in this process group yet
+    child_process_groups_.erase(child);
+    logger_->log_info("Remove child process group %s from process group %s",
+                      child->getName().c_str(), name_.c_str());
+  }
+}
+
+void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler,
+                                   EventDrivenSchedulingAgent *eventScheduler) {
+  std::lock_guard<std::mutex> lock(mutex_);
+
+  try {
+    // Start all the processor node, input and output ports
+    for (auto processor : processors_) {
+      logger_->log_debug("Starting %s", processor->getName().c_str());
+
+      if (!processor->isRunning()
+          && processor->getScheduledState() != DISABLED) {
+        if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
+          timeScheduler->schedule(processor);
+        else if (processor->getSchedulingStrategy() == EVENT_DRIVEN)
+          eventScheduler->schedule(processor);
+      }
+    }
+    // Start processing the group
+    for (auto processGroup : child_process_groups_) {
+      processGroup->startProcessing(timeScheduler, eventScheduler);
+    }
+  } catch (std::exception &exception) {
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
+  } catch (...) {
+    logger_->log_debug(
+        "Caught Exception during process group start processing");
+    throw;
+  }
+}
+
+void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler,
+                                  EventDrivenSchedulingAgent *eventScheduler) {
+  std::lock_guard<std::mutex> lock(mutex_);
+
+  try {
+    // Stop all the processor node, input and output ports
+    for (std::set<std::shared_ptr<Processor> >::iterator it =
+        processors_.begin(); it != processors_.end(); ++it) {
+      std::shared_ptr<Processor> processor(*it);
+      if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
+        timeScheduler->unschedule(processor);
+      else if (processor->getSchedulingStrategy() == EVENT_DRIVEN)
+        eventScheduler->unschedule(processor);
+    }
+
+    for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin();
+        it != child_process_groups_.end(); ++it) {
+      ProcessGroup *processGroup(*it);
+      processGroup->stopProcessing(timeScheduler, eventScheduler);
+    }
+  } catch (std::exception &exception) {
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
+  } catch (...) {
+    logger_->log_debug("Caught Exception during process group stop processing");
+    throw;
+  }
+}
+
+std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) {
+
+  std::shared_ptr<Processor> ret = NULL;
+  // std::lock_guard<std::mutex> lock(mutex_);
+
+  for (auto processor : processors_) {
+    logger_->log_info("find processor %s", processor->getName().c_str());
+    uuid_t processorUUID;
+
+    if (processor->getUUID(processorUUID)) {
+
+      char uuid_str[37];  // ex. "1b4e28ba-2fa1-11d2-883f-0016d3cca427" + "\0"
+      uuid_unparse_lower(processorUUID, uuid_str);
+      std::string processorUUIDstr = uuid_str;
+      uuid_unparse_lower(uuid, uuid_str);
+      std::string uuidStr = uuid_str;
+      if (processorUUIDstr == uuidStr) {
+        return processor;
+      }
+    }
+
+  }
+  for (auto processGroup : child_process_groups_) {
+
+    logger_->log_info("find processor child %s",
+                      processGroup->getName().c_str());
+    std::shared_ptr<Processor> processor = processGroup->findProcessor(uuid);
+    if (processor)
+      return processor;
+  }
+
+  return ret;
+}
+
+std::shared_ptr<Processor> ProcessGroup::findProcessor(
+    const std::string &processorName) {
+  std::shared_ptr<Processor> ret = NULL;
+
+  for (auto processor : processors_) {
+    logger_->log_debug("Current processor is %s", processor->getName().c_str());
+    if (processor->getName() == processorName)
+      return processor;
+  }
+
+  for (auto processGroup : child_process_groups_) {
+    std::shared_ptr<Processor> processor = processGroup->findProcessor(
+        processorName);
+    if (processor)
+      return processor;
+  }
+
+  return ret;
+}
+
+void ProcessGroup::updatePropertyValue(std::string processorName,
+                                       std::string propertyName,
+                                       std::string propertyValue) {
+  std::lock_guard<std::mutex> lock(mutex_);
+
+  for (auto processor : processors_) {
+    if (processor->getName() == processorName) {
+      processor->setProperty(propertyName, propertyValue);
+    }
+  }
+
+  for (auto processGroup : child_process_groups_) {
+    processGroup->updatePropertyValue(processorName, propertyName,
+                                      propertyValue);
+  }
+
+  return;
+}
+
+void ProcessGroup::getConnections(
+    std::map<std::string, std::shared_ptr<Connection>> &connectionMap) {
+  for (auto connection : connections_) {
+    connectionMap[connection->getUUIDStr()] = connection;
+  }
+
+  for (auto processGroup : child_process_groups_) {
+    processGroup->getConnections(connectionMap);
+  }
+}
+
+void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) {
+  std::lock_guard<std::mutex> lock(mutex_);
+
+  if (connections_.find(connection) == connections_.end()) {
+    // We do not have the same connection in this process group yet
+    connections_.insert(connection);
+    logger_->log_info("Add connection %s into process group %s",
+                      connection->getName().c_str(), name_.c_str());
+    uuid_t sourceUUID;
+    std::shared_ptr<Processor> source = NULL;
+    connection->getSourceUUID(sourceUUID);
+    source = this->findProcessor(sourceUUID);
+    if (source)
+      source->addConnection(connection);
+    std::shared_ptr<Processor> destination = NULL;
+    uuid_t destinationUUID;
+    connection->getDestinationUUID(destinationUUID);
+    destination = this->findProcessor(destinationUUID);
+    if (destination && destination != source)
+      destination->addConnection(connection);
+  }
+}
+
+void ProcessGroup::removeConnection(std::shared_ptr<Connection> connection) {
+  std::lock_guard<std::mutex> lock(mutex_);
+
+  if (connections_.find(connection) != connections_.end()) {
+    // We do not have the same connection in this process group yet
+    connections_.erase(connection);
+    logger_->log_info("Remove connection %s into process group %s",
+                      connection->getName().c_str(), name_.c_str());
+    uuid_t sourceUUID;
+    std::shared_ptr<Processor> source = NULL;
+    connection->getSourceUUID(sourceUUID);
+    source = this->findProcessor(sourceUUID);
+    if (source)
+      source->removeConnection(connection);
+    std::shared_ptr<Processor> destination = NULL;
+    uuid_t destinationUUID;
+    connection->getDestinationUUID(destinationUUID);
+    destination = this->findProcessor(destinationUUID);
+    if (destination && destination != source)
+      destination->removeConnection(connection);
+  }
+}
+
+} /* namespace processor */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
new file mode 100644
index 0000000..e6fa7c4
--- /dev/null
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -0,0 +1,941 @@
+/**
+ * @file ProcessSession.cpp
+ * ProcessSession class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <iostream>
+
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+std::shared_ptr<core::FlowFile> ProcessSession::create() {
+  std::map<std::string, std::string> empty;
+  std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(),
+      empty);
+
+  _addedFlowFiles[record->getUUIDStr()] = record;
+  logger_->log_debug("Create FlowFile with UUID %s",
+                     record->getUUIDStr().c_str());
+  std::string details = process_context_->getProcessorNode().getName()
+      + " creates flow record " + record->getUUIDStr();
+  provenance_report_->create(record, details);
+
+  return record;
+}
+
+std::shared_ptr<core::FlowFile> ProcessSession::create(std::shared_ptr<core::FlowFile> &parent) {
+  std::map<std::string, std::string> empty;
+  std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(),
+      empty);
+
+  if (record) {
+    _addedFlowFiles[record->getUUIDStr()] = record;
+    logger_->log_debug("Create FlowFile with UUID %s",
+                       record->getUUIDStr().c_str());
+  }
+
+  if (record) {
+    // Copy attributes
+    std::map<std::string, std::string> parentAttributes =
+        parent->getAttributes();
+    std::map<std::string, std::string>::iterator it;
+    for (it = parentAttributes.begin(); it != parentAttributes.end(); it++) {
+      if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER)
+          || it->first == FlowAttributeKey(DISCARD_REASON)
+          || it->first == FlowAttributeKey(UUID))
+        // Do not copy special attributes from parent
+        continue;
+      record->setAttribute(it->first, it->second);
+    }
+    record->setLineageStartDate(parent->getlineageStartDate());
+    record->setLineageIdentifiers(parent->getlineageIdentifiers());
+    parent->getlineageIdentifiers().insert(parent->getUUIDStr());
+
+  }
+  return record;
+}
+
+std::shared_ptr<core::FlowFile> ProcessSession::clone(
+    std::shared_ptr<core::FlowFile> &parent) {
+  std::shared_ptr<core::FlowFile> record = this->create(parent);
+  if (record) {
+    // Copy Resource Claim
+    std::shared_ptr<ResourceClaim> parent_claim = parent->getResourceClaim();
+    record->setResourceClaim(parent_claim);
+    if (parent_claim != nullptr) {
+      record->setOffset(parent->getOffset());
+      record->setSize(parent->getSize());
+      record->getResourceClaim()->increaseFlowFileRecordOwnedCount();
+      ;
+    }
+    provenance_report_->clone(parent, record);
+  }
+  return record;
+}
+
+std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer(
+    std::shared_ptr<core::FlowFile> &parent) {
+  std::map<std::string, std::string> empty;
+  std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(),
+      empty);
+
+  if (record) {
+    this->_clonedFlowFiles[record->getUUIDStr()] = record;
+    logger_->log_debug("Clone FlowFile with UUID %s during transfer",
+                       record->getUUIDStr().c_str());
+    // Copy attributes
+    std::map<std::string, std::string> parentAttributes =
+        parent->getAttributes();
+    std::map<std::string, std::string>::iterator it;
+    for (it = parentAttributes.begin(); it != parentAttributes.end(); it++) {
+      if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER)
+          || it->first == FlowAttributeKey(DISCARD_REASON)
+          || it->first == FlowAttributeKey(UUID))
+        // Do not copy special attributes from parent
+        continue;
+      record->setAttribute(it->first, it->second);
+    }
+    record->setLineageStartDate(parent->getlineageStartDate());
+
+    record->setLineageIdentifiers(parent->getlineageIdentifiers());
+    record->getlineageIdentifiers().insert(parent->getUUIDStr());
+
+    // Copy Resource Claim
+    std::shared_ptr<ResourceClaim> parent_claim = parent->getResourceClaim();
+    record->setResourceClaim(parent_claim);
+    if (parent_claim != nullptr) {
+      record->setOffset(parent->getOffset());
+      record->setSize(parent->getSize());
+      record->getResourceClaim()->increaseFlowFileRecordOwnedCount();
+      ;
+    }
+    provenance_report_->clone(parent, record);
+  }
+
+  return record;
+}
+
+std::shared_ptr<core::FlowFile> ProcessSession::clone(
+    std::shared_ptr<core::FlowFile> &parent, long offset, long size) {
+  std::shared_ptr<core::FlowFile> record = this->create(parent);
+  if (record) {
+
+    if (parent->getResourceClaim()) {
+      if ((offset + size) > (long) parent->getSize()) {
+        // Set offset and size
+        logger_->log_error("clone offset %d and size %d exceed parent size %d",
+                           offset, size, parent->getSize());
+        // Remove the Add FlowFile for the session
+        std::map<std::string, std::shared_ptr<core::FlowFile> >::iterator it =
+            this->_addedFlowFiles.find(record->getUUIDStr());
+        if (it != this->_addedFlowFiles.end())
+          this->_addedFlowFiles.erase(record->getUUIDStr());
+        return nullptr;
+      }
+      record->setOffset(parent->getOffset() + parent->getOffset());
+      record->setSize(size);
+      // Copy Resource Claim
+      std::shared_ptr<ResourceClaim> parent_claim = parent->getResourceClaim();
+      record->setResourceClaim(parent_claim);
+      if (parent_claim != nullptr) {
+
+        record->getResourceClaim()->increaseFlowFileRecordOwnedCount();
+      }
+    }
+    provenance_report_->clone(parent, record);
+  }
+  return record;
+}
+
+void ProcessSession::remove(std::shared_ptr<core::FlowFile> &flow) {
+  flow->setDeleted(true);
+  _deletedFlowFiles[flow->getUUIDStr()] = flow;
+  std::string reason = process_context_->getProcessorNode().getName()
+      + " drop flow record " + flow->getUUIDStr();
+  provenance_report_->drop(flow, reason);
+}
+
+void ProcessSession::remove(std::shared_ptr<core::FlowFile> &&flow) {
+  flow->setDeleted(true);
+  _deletedFlowFiles[flow->getUUIDStr()] = flow;
+  std::string reason = process_context_->getProcessorNode().getName()
+      + " drop flow record " + flow->getUUIDStr();
+  provenance_report_->drop(flow, reason);
+}
+
+void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &flow,
+                                  std::string key, std::string value) {
+  flow->setAttribute(key, value);
+  std::string details = process_context_->getProcessorNode().getName()
+      + " modify flow record " + flow->getUUIDStr() + " attribute " + key + ":"
+      + value;
+  provenance_report_->modifyAttributes(flow, details);
+}
+
+void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &flow,
+                                     std::string key) {
+  flow->removeAttribute(key);
+  std::string details = process_context_->getProcessorNode().getName()
+      + " remove flow record " + flow->getUUIDStr() + " attribute " + key;
+  provenance_report_->modifyAttributes(flow, details);
+}
+
+void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &&flow,
+                                  std::string key, std::string value) {
+  flow->setAttribute(key, value);
+  std::string details = process_context_->getProcessorNode().getName()
+      + " modify flow record " + flow->getUUIDStr() + " attribute " + key + ":"
+      + value;
+  provenance_report_->modifyAttributes(flow, details);
+}
+
+void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &&flow,
+                                     std::string key) {
+  flow->removeAttribute(key);
+  std::string details = process_context_->getProcessorNode().getName()
+      + " remove flow record " + flow->getUUIDStr() + " attribute " + key;
+  provenance_report_->modifyAttributes(flow, details);
+}
+
+void ProcessSession::penalize(std::shared_ptr<core::FlowFile> &flow) {
+  flow->setPenaltyExpiration(
+      getTimeMillis()
+          + process_context_->getProcessorNode().getPenalizationPeriodMsec());
+}
+
+void ProcessSession::penalize(std::shared_ptr<core::FlowFile> &&flow) {
+  flow->setPenaltyExpiration(
+      getTimeMillis()
+          + process_context_->getProcessorNode().getPenalizationPeriodMsec());
+}
+
+void ProcessSession::transfer(std::shared_ptr<core::FlowFile> &flow,
+                              Relationship relationship) {
+  _transferRelationship[flow->getUUIDStr()] = relationship;
+}
+
+void ProcessSession::transfer(std::shared_ptr<core::FlowFile> &&flow,
+                              Relationship relationship) {
+  _transferRelationship[flow->getUUIDStr()] = relationship;
+}
+
+void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow,
+                           OutputStreamCallback *callback) {
+  std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(
+  DEFAULT_CONTENT_DIRECTORY);
+
+  try {
+    std::ofstream fs;
+    uint64_t startTime = getTimeMillis();
+    fs.open(claim->getContentFullPath().c_str(),
+            std::fstream::out | std::fstream::binary | std::fstream::trunc);
+    if (fs.is_open()) {
+      // Call the callback to write the content
+      callback->process(&fs);
+      if (fs.good() && fs.tellp() >= 0) {
+        flow->setSize(fs.tellp());
+        flow->setOffset(0);
+        std::shared_ptr<ResourceClaim> flow_claim = flow->getResourceClaim();
+        if (flow_claim != nullptr) {
+          // Remove the old claim
+          flow_claim->decreaseFlowFileRecordOwnedCount();
+          flow->clearResourceClaim();
+        }
+        flow->setResourceClaim(claim);
+        claim->increaseFlowFileRecordOwnedCount();
+        /*
+         logger_->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s",
+         flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
+        fs.close();
+        std::string details = process_context_->getProcessorNode().getName()
+            + " modify flow record content " + flow->getUUIDStr();
+        uint64_t endTime = getTimeMillis();
+        provenance_report_->modifyContent(flow, details, endTime - startTime);
+      } else {
+        fs.close();
+        throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
+      }
+    } else {
+      throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+    }
+  } catch (std::exception &exception) {
+    if (flow && flow->getResourceClaim() == claim) {
+      flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+      flow->clearResourceClaim();
+    }
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
+  } catch (...) {
+    if (flow && flow->getResourceClaim() == claim) {
+      flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+      flow->clearResourceClaim();
+    }
+    logger_->log_debug("Caught Exception during process session write");
+    throw;
+  }
+}
+
+void ProcessSession::write(std::shared_ptr<core::FlowFile> &&flow,
+                           OutputStreamCallback *callback) {
+  std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>();
+  try {
+    std::ofstream fs;
+    uint64_t startTime = getTimeMillis();
+    fs.open(claim->getContentFullPath().c_str(),
+            std::fstream::out | std::fstream::binary | std::fstream::trunc);
+    if (fs.is_open()) {
+      // Call the callback to write the content
+      callback->process(&fs);
+      if (fs.good() && fs.tellp() >= 0) {
+        flow->setSize(fs.tellp());
+        flow->setOffset(0);
+        std::shared_ptr<ResourceClaim> flow_claim = flow->getResourceClaim();
+        if (flow_claim != nullptr) {
+          // Remove the old claim
+          flow_claim->decreaseFlowFileRecordOwnedCount();
+          flow->clearResourceClaim();
+        }
+        flow->setResourceClaim(claim);
+        claim->increaseFlowFileRecordOwnedCount();
+        /*
+         logger_->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s",
+         flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
+        fs.close();
+        std::string details = process_context_->getProcessorNode().getName()
+            + " modify flow record content " + flow->getUUIDStr();
+        uint64_t endTime = getTimeMillis();
+        provenance_report_->modifyContent(flow, details, endTime - startTime);
+      } else {
+        fs.close();
+        throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
+      }
+    } else {
+      throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+    }
+  } catch (std::exception &exception) {
+    if (flow && flow->getResourceClaim() == claim) {
+      flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+      flow->clearResourceClaim();
+    }
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
+  } catch (...) {
+    if (flow && flow->getResourceClaim() == claim) {
+      flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+      flow->clearResourceClaim();
+    }
+    logger_->log_debug("Caught Exception during process session write");
+    throw;
+  }
+}
+
+void ProcessSession::append(std::shared_ptr<core::FlowFile> &&flow,
+                            OutputStreamCallback *callback) {
+  std::shared_ptr<ResourceClaim> claim = nullptr;
+
+  if (flow->getResourceClaim() == nullptr) {
+    // No existed claim for append, we need to create new claim
+    return write(flow, callback);
+  }
+
+  claim = flow->getResourceClaim();
+
+  try {
+    std::ofstream fs;
+    uint64_t startTime = getTimeMillis();
+    fs.open(claim->getContentFullPath().c_str(),
+            std::fstream::out | std::fstream::binary | std::fstream::app);
+    if (fs.is_open()) {
+      // Call the callback to write the content
+      std::streampos oldPos = fs.tellp();
+      callback->process(&fs);
+      if (fs.good() && fs.tellp() >= 0) {
+        uint64_t appendSize = fs.tellp() - oldPos;
+        flow->setSize(flow->getSize() + appendSize);
+        /*
+         logger_->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s",
+         flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
+        fs.close();
+        std::string details = process_context_->getProcessorNode().getName()
+            + " modify flow record content " + flow->getUUIDStr();
+        uint64_t endTime = getTimeMillis();
+        provenance_report_->modifyContent(flow, details, endTime - startTime);
+      } else {
+        fs.close();
+        throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
+      }
+    } else {
+      throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+    }
+  } catch (std::exception &exception) {
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
+  } catch (...) {
+    logger_->log_debug("Caught Exception during process session append");
+    throw;
+  }
+}
+
+void ProcessSession::append(std::shared_ptr<core::FlowFile> &flow,
+                            OutputStreamCallback *callback) {
+  std::shared_ptr<ResourceClaim> claim = nullptr;
+
+  if (flow->getResourceClaim() == nullptr) {
+    // No existed claim for append, we need to create new claim
+    return write(flow, callback);
+  }
+
+  claim = flow->getResourceClaim();
+
+  try {
+    std::ofstream fs;
+    uint64_t startTime = getTimeMillis();
+    fs.open(claim->getContentFullPath().c_str(),
+            std::fstream::out | std::fstream::binary | std::fstream::app);
+    if (fs.is_open()) {
+      // Call the callback to write the content
+      std::streampos oldPos = fs.tellp();
+      callback->process(&fs);
+      if (fs.good() && fs.tellp() >= 0) {
+        uint64_t appendSize = fs.tellp() - oldPos;
+        flow->setSize(flow->getSize() + appendSize);
+        /*
+         logger_->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s",
+         flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
+        fs.close();
+        std::string details = process_context_->getProcessorNode().getName()
+            + " modify flow record content " + flow->getUUIDStr();
+        uint64_t endTime = getTimeMillis();
+        provenance_report_->modifyContent(flow, details, endTime - startTime);
+      } else {
+        fs.close();
+        throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
+      }
+    } else {
+      throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+    }
+  } catch (std::exception &exception) {
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
+  } catch (...) {
+    logger_->log_debug("Caught Exception during process session append");
+    throw;
+  }
+}
+
+void ProcessSession::read(std::shared_ptr<core::FlowFile> &flow,
+                          InputStreamCallback *callback) {
+  try {
+    std::shared_ptr<ResourceClaim> claim = nullptr;
+
+    if (flow->getResourceClaim() == nullptr) {
+      // No existed claim for read, we throw exception
+      throw Exception(FILE_OPERATION_EXCEPTION,
+                      "No Content Claim existed for read");
+    }
+
+    claim = flow->getResourceClaim();
+    std::ifstream fs;
+    fs.open(claim->getContentFullPath().c_str(),
+            std::fstream::in | std::fstream::binary);
+    if (fs.is_open()) {
+      fs.seekg(flow->getOffset(), fs.beg);
+
+      if (fs.good()) {
+        callback->process(&fs);
+        /*
+         logger_->log_debug("Read offset %d size %d content %s for FlowFile UUID %s",
+         flow->_offset, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
+        fs.close();
+      } else {
+        fs.close();
+        throw Exception(FILE_OPERATION_EXCEPTION, "File Read Error");
+      }
+    } else {
+      throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+    }
+  } catch (std::exception &exception) {
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
+  } catch (...) {
+    logger_->log_debug("Caught Exception during process session read");
+    throw;
+  }
+}
+
+void ProcessSession::read(std::shared_ptr<core::FlowFile> &&flow,
+                          InputStreamCallback *callback) {
+  try {
+    std::shared_ptr<ResourceClaim> claim = nullptr;
+
+    if (flow->getResourceClaim() == nullptr) {
+      // No existed claim for read, we throw exception
+      throw Exception(FILE_OPERATION_EXCEPTION,
+                      "No Content Claim existed for read");
+    }
+
+    claim = flow->getResourceClaim();
+    std::ifstream fs;
+    fs.open(claim->getContentFullPath().c_str(),
+            std::fstream::in | std::fstream::binary);
+    if (fs.is_open()) {
+      fs.seekg(flow->getOffset(), fs.beg);
+
+      if (fs.good()) {
+        callback->process(&fs);
+        /*
+         logger_->log_debug("Read offset %d size %d content %s for FlowFile UUID %s",
+         flow->_offset, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
+        fs.close();
+      } else {
+        fs.close();
+        throw Exception(FILE_OPERATION_EXCEPTION, "File Read Error");
+      }
+    } else {
+      throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+    }
+  } catch (std::exception &exception) {
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
+  } catch (...) {
+    logger_->log_debug("Caught Exception during process session read");
+    throw;
+  }
+}
+
+void ProcessSession::import(std::string source,
+                            std::shared_ptr<core::FlowFile> &flow,
+                            bool keepSource, uint64_t offset) {
+  std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>();
+  char *buf = NULL;
+  int size = 4096;
+  buf = new char[size];
+
+  try {
+    std::ofstream fs;
+    uint64_t startTime = getTimeMillis();
+    fs.open(claim->getContentFullPath().c_str(),
+            std::fstream::out | std::fstream::binary | std::fstream::trunc);
+    std::ifstream input;
+    input.open(source.c_str(), std::fstream::in | std::fstream::binary);
+
+    if (fs.is_open() && input.is_open()) {
+      // Open the source file and stream to the flow file
+      input.seekg(offset, fs.beg);
+      while (input.good()) {
+        input.read(buf, size);
+        if (input)
+          fs.write(buf, size);
+        else
+          fs.write(buf, input.gcount());
+      }
+
+      if (fs.good() && fs.tellp() >= 0) {
+        flow->setSize(fs.tellp());
+        flow->setOffset(0);
+        if (flow->getResourceClaim() != nullptr) {
+          // Remove the old claim
+          flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+          flow->clearResourceClaim();
+        }
+        flow->setResourceClaim(claim);
+        claim->increaseFlowFileRecordOwnedCount();
+
+        logger_->log_debug(
+            "Import offset %d length %d into content %s for FlowFile UUID %s",
+            flow->getOffset(), flow->getSize(),
+            flow->getResourceClaim()->getContentFullPath().c_str(),
+            flow->getUUIDStr().c_str());
+
+        fs.close();
+        input.close();
+        if (!keepSource)
+          std::remove(source.c_str());
+        std::string details = process_context_->getProcessorNode().getName()
+            + " modify flow record content " + flow->getUUIDStr();
+        uint64_t endTime = getTimeMillis();
+        provenance_report_->modifyContent(flow, details, endTime - startTime);
+      } else {
+        fs.close();
+        input.close();
+        throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+      }
+    } else {
+      throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+    }
+
+    delete[] buf;
+  } catch (std::exception &exception) {
+    if (flow && flow->getResourceClaim() == claim) {
+      flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+      flow->clearResourceClaim();
+    }
+    logger_->log_debug("Caught Exception %s", exception.what());
+    delete[] buf;
+    throw;
+  } catch (...) {
+    if (flow && flow->getResourceClaim() == claim) {
+      flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+      flow->clearResourceClaim();
+    }
+    logger_->log_debug("Caught Exception during process session write");
+    delete[] buf;
+    throw;
+  }
+}
+
+void ProcessSession::import(std::string source,
+                            std::shared_ptr<core::FlowFile> &&flow,
+                            bool keepSource, uint64_t offset) {
+  std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>();
+
+  char *buf = NULL;
+  int size = 4096;
+  buf = new char[size];
+
+  try {
+    std::ofstream fs;
+    uint64_t startTime = getTimeMillis();
+    fs.open(claim->getContentFullPath().c_str(),
+            std::fstream::out | std::fstream::binary | std::fstream::trunc);
+    std::ifstream input;
+    input.open(source.c_str(), std::fstream::in | std::fstream::binary);
+
+    if (fs.is_open() && input.is_open()) {
+      // Open the source file and stream to the flow file
+      input.seekg(offset, fs.beg);
+      while (input.good()) {
+        input.read(buf, size);
+        if (input)
+          fs.write(buf, size);
+        else
+          fs.write(buf, input.gcount());
+      }
+
+      if (fs.good() && fs.tellp() >= 0) {
+        flow->setSize(fs.tellp());
+        flow->setOffset(0);
+        if (flow->getResourceClaim() != nullptr) {
+          // Remove the old claim
+          flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+          flow->clearResourceClaim();
+        }
+        flow->setResourceClaim(claim);
+        claim->increaseFlowFileRecordOwnedCount();
+
+        logger_->log_debug(
+            "Import offset %d length %d into content %s for FlowFile UUID %s",
+            flow->getOffset(), flow->getSize(),
+            flow->getResourceClaim()->getContentFullPath().c_str(),
+            flow->getUUIDStr().c_str());
+
+        fs.close();
+        input.close();
+        if (!keepSource)
+          std::remove(source.c_str());
+        std::string details = process_context_->getProcessorNode().getName()
+            + " modify flow record content " + flow->getUUIDStr();
+        uint64_t endTime = getTimeMillis();
+        provenance_report_->modifyContent(flow, details, endTime - startTime);
+      } else {
+        fs.close();
+        input.close();
+        throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+      }
+    } else {
+      throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+    }
+
+    delete[] buf;
+  } catch (std::exception &exception) {
+    if (flow && flow->getResourceClaim() == claim) {
+      flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+      flow->clearResourceClaim();
+    }
+    logger_->log_debug("Caught Exception %s", exception.what());
+    delete[] buf;
+    throw;
+  } catch (...) {
+    if (flow && flow->getResourceClaim() == claim) {
+      flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+      flow->clearResourceClaim();
+    }
+    logger_->log_debug("Caught Exception during process session write");
+    delete[] buf;
+    throw;
+  }
+}
+
+void ProcessSession::commit() {
+
+  try {
+    // First we clone the flow record based on the transfered relationship for updated flow record
+    for (auto && it : _updatedFlowFiles) {
+      std::shared_ptr<core::FlowFile> record = it.second;
+      if (record->isDeleted())
+        continue;
+      std::map<std::string, Relationship>::iterator itRelationship = this
+          ->_transferRelationship.find(record->getUUIDStr());
+      if (itRelationship != _transferRelationship.end()) {
+        Relationship relationship = itRelationship->second;
+        // Find the relationship, we need to find the connections for that relationship
+        std::set<std::shared_ptr<Connectable>> connections = process_context_
+            ->getProcessorNode().getOutGoingConnections(relationship.getName());
+        if (connections.empty()) {
+          // No connection
+          if (!process_context_->getProcessorNode().isAutoTerminated(
+              relationship)) {
+            // Not autoterminate, we should have the connect
+            std::string message =
+                "Connect empty for non auto terminated relationship"
+                    + relationship.getName();
+            throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
+          } else {
+            // Autoterminated
+            remove(record);
+          }
+        } else {
+          // We connections, clone the flow and assign the connection accordingly
+          for (std::set<std::shared_ptr<Connectable>>::iterator itConnection =
+              connections.begin(); itConnection != connections.end();
+              ++itConnection) {
+            std::shared_ptr<Connectable> connection = *itConnection;
+            if (itConnection == connections.begin()) {
+              // First connection which the flow need be routed to
+              record->setConnection(connection);
+            } else {
+              // Clone the flow file and route to the connection
+              std::shared_ptr<core::FlowFile> cloneRecord;
+              cloneRecord = this->cloneDuringTransfer(record);
+              if (cloneRecord)
+                cloneRecord->setConnection(connection);
+              else
+                throw Exception(PROCESS_SESSION_EXCEPTION,
+                                "Can not clone the flow for transfer");
+            }
+          }
+        }
+      } else {
+        // Can not find relationship for the flow
+        throw Exception(PROCESS_SESSION_EXCEPTION,
+                        "Can not find the transfer relationship for the flow");
+      }
+    }
+
+    // Do the samething for added flow file
+    for (const auto it : _addedFlowFiles) {
+      std::shared_ptr<core::FlowFile> record = it.second;
+      if (record->isDeleted())
+        continue;
+      std::map<std::string, Relationship>::iterator itRelationship = this
+          ->_transferRelationship.find(record->getUUIDStr());
+      if (itRelationship != _transferRelationship.end()) {
+        Relationship relationship = itRelationship->second;
+        // Find the relationship, we need to find the connections for that relationship
+        std::set<std::shared_ptr<Connectable>> connections = process_context_
+            ->getProcessorNode().getOutGoingConnections(relationship.getName());
+        if (connections.empty()) {
+          // No connection
+          if (!process_context_->getProcessorNode().isAutoTerminated(
+              relationship)) {
+            // Not autoterminate, we should have the connect
+            std::string message =
+                "Connect empty for non auto terminated relationship "
+                    + relationship.getName();
+            throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
+          } else {
+            // Autoterminated
+            remove(record);
+          }
+        } else {
+          // We connections, clone the flow and assign the connection accordingly
+          for (std::set<std::shared_ptr<Connectable>>::iterator itConnection =
+              connections.begin(); itConnection != connections.end();
+              ++itConnection) {
+            std::shared_ptr<Connectable> connection(*itConnection);
+            if (itConnection == connections.begin()) {
+              // First connection which the flow need be routed to
+              record->setConnection(connection);
+            } else {
+              // Clone the flow file and route to the connection
+              std::shared_ptr<core::FlowFile> cloneRecord;
+              cloneRecord = this->cloneDuringTransfer(record);
+              if (cloneRecord)
+                cloneRecord->setConnection(connection);
+              else
+                throw Exception(PROCESS_SESSION_EXCEPTION,
+                                "Can not clone the flow for transfer");
+            }
+          }
+        }
+      } else {
+        // Can not find relationship for the flow
+        throw Exception(PROCESS_SESSION_EXCEPTION,
+                        "Can not find the transfer relationship for the flow");
+      }
+    }
+
+    std::shared_ptr<Connection> connection = nullptr;
+    // Complete process the added and update flow files for the session, send the flow file to its queue
+    for (const auto &it : _updatedFlowFiles) {
+      std::shared_ptr<core::FlowFile> record = it.second;
+      if (record->isDeleted()) {
+        continue;
+      }
+
+      connection = std::static_pointer_cast<Connection>(
+          record->getConnection());
+      if ((connection) != nullptr)
+        connection->put(record);
+    }
+    for (const auto &it : _addedFlowFiles) {
+      std::shared_ptr<core::FlowFile> record = it.second;
+      if (record->isDeleted()) {
+        continue;
+      }
+      connection = std::static_pointer_cast<Connection>(
+          record->getConnection());
+      if ((connection) != nullptr)
+        connection->put(record);
+    }
+    // Process the clone flow files
+    for (const auto &it : _clonedFlowFiles) {
+      std::shared_ptr<core::FlowFile> record = it.second;
+      if (record->isDeleted()) {
+        continue;
+      }
+      connection = std::static_pointer_cast<Connection>(
+          record->getConnection());
+      if ((connection) != nullptr)
+        connection->put(record);
+    }
+
+    // All done
+    _updatedFlowFiles.clear();
+    _addedFlowFiles.clear();
+    _clonedFlowFiles.clear();
+    _deletedFlowFiles.clear();
+    _originalFlowFiles.clear();
+    // persistent the provenance report
+    this->provenance_report_->commit();
+    logger_->log_trace("ProcessSession committed for %s",
+                       process_context_->getProcessorNode().getName().c_str());
+  } catch (std::exception &exception) {
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
+  } catch (...) {
+    logger_->log_debug("Caught Exception during process session commit");
+    throw;
+  }
+}
+
+void ProcessSession::rollback() {
+  try {
+    std::shared_ptr<Connection> connection = nullptr;
+    // Requeue the snapshot of the flowfile back
+    for (const auto &it : _originalFlowFiles) {
+      std::shared_ptr<core::FlowFile> record = it.second;
+      connection = std::static_pointer_cast<Connection>(
+          record->getOriginalConnection());
+      if ((connection) != nullptr) {
+        std::shared_ptr<FlowFileRecord> flowf = std::static_pointer_cast<
+            FlowFileRecord>(record);
+        flowf->setSnapShot(false);
+        connection->put(record);
+      }
+
+    }
+    _originalFlowFiles.clear();
+
+    _clonedFlowFiles.clear();
+    _addedFlowFiles.clear();
+    _updatedFlowFiles.clear();
+    _deletedFlowFiles.clear();
+    logger_->log_trace("ProcessSession rollback for %s",
+                       process_context_->getProcessorNode().getName().c_str());
+  } catch (std::exception &exception) {
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
+  } catch (...) {
+    logger_->log_debug("Caught Exception during process session roll back");
+    throw;
+  }
+}
+
+std::shared_ptr<core::FlowFile> ProcessSession::get() {
+  std::shared_ptr<Connectable> first = process_context_->getProcessorNode()
+      .getNextIncomingConnection();
+
+  if (first == NULL)
+    return NULL;
+
+  std::shared_ptr<Connection> current = std::static_pointer_cast<Connection>(
+      first);
+
+  do {
+    std::set<std::shared_ptr<core::FlowFile> > expired;
+    std::shared_ptr<core::FlowFile> ret = current->poll(expired);
+    if (expired.size() > 0) {
+      // Remove expired flow record
+      for (std::set<std::shared_ptr<core::FlowFile> >::iterator it = expired
+          .begin(); it != expired.end(); ++it) {
+        std::shared_ptr<core::FlowFile> record = *it;
+        std::string details = process_context_->getProcessorNode().getName()
+            + " expire flow record " + record->getUUIDStr();
+        provenance_report_->expire(record, details);
+      }
+    }
+    if (ret) {
+      // add the flow record to the current process session update map
+      ret->setDeleted(false);
+      _updatedFlowFiles[ret->getUUIDStr()] = ret;
+      std::map<std::string, std::string> empty;
+      std::shared_ptr<core::FlowFile> snapshot =
+          std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(),empty);
+      logger_->log_debug("Create Snapshot FlowFile with UUID %s",
+                         snapshot->getUUIDStr().c_str());
+      snapshot = ret;
+//      snapshot->duplicate(ret);
+      // save a snapshot
+      _originalFlowFiles[snapshot->getUUIDStr()] = snapshot;
+      return ret;
+    }
+    current = std::static_pointer_cast<Connection>(
+        process_context_->getProcessorNode().getNextIncomingConnection());
+  } while (current != NULL && current != first);
+
+  return NULL;
+}
+
+} /* namespace processor */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/ProcessSessionFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSessionFactory.cpp b/libminifi/src/core/ProcessSessionFactory.cpp
new file mode 100644
index 0000000..445ca58
--- /dev/null
+++ b/libminifi/src/core/ProcessSessionFactory.cpp
@@ -0,0 +1,42 @@
+/**
+ * @file ProcessSessionFactory.cpp
+ * ProcessSessionFactory class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/ProcessSessionFactory.h"
+
+#include <memory>
+
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+std::unique_ptr<ProcessSession> ProcessSessionFactory::createSession()
+{
+	return std::unique_ptr<ProcessSession>(new ProcessSession(process_context_));
+}
+
+
+} /* namespace processor */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
new file mode 100644
index 0000000..ba52c28
--- /dev/null
+++ b/libminifi/src/core/Processor.cpp
@@ -0,0 +1,272 @@
+/**
+ * @file Processor.cpp
+ * Processor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <memory>
+#include <functional>
+
+#include "core/Processor.h"
+
+#include "Connection.h"
+#include "core/Connectable.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessSessionFactory.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+Processor::Processor(std::string name, uuid_t uuid)
+    : Connectable(name, uuid),
+      ConfigurableComponent(logging::Logger::getLogger()) {
+
+  has_work_.store(false);
+  // Setup the default values
+  state_ = DISABLED;
+  strategy_ = TIMER_DRIVEN;
+  loss_tolerant_ = false;
+  _triggerWhenEmpty = false;
+  scheduling_period_nano_ = MINIMUM_SCHEDULING_NANOS;
+  run_durantion_nano_ = 0;
+  yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
+  _penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000;
+  max_concurrent_tasks_ = 1;
+  active_tasks_ = 0;
+  yield_expiration_ = 0;
+  incoming_connections_Iter = this->_incomingConnections.begin();
+  logger_ = logging::Logger::getLogger();
+  logger_->log_info("Processor %s created UUID %s", name_.c_str(),
+                    uuidStr_.c_str());
+}
+
+bool Processor::isRunning() {
+  return (state_ == RUNNING && active_tasks_ > 0);
+}
+
+void Processor::setScheduledState(ScheduledState state) {
+  state_ = state;
+}
+
+bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
+  
+  bool ret = false;
+
+  if (isRunning()) {
+    logger_->log_info("Can not add connection while the process %s is running",
+                      name_.c_str());
+    return false;
+  }
+  std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
+  std::lock_guard<std::mutex> lock(mutex_);
+
+  uuid_t srcUUID;
+  uuid_t destUUID;
+
+  connection->getSourceUUID(srcUUID);
+  connection->getDestinationUUID(destUUID);
+  char uuid_str[37];
+
+  uuid_unparse_lower(uuid_, uuid_str);
+  std::string my_uuid = uuid_str;
+  uuid_unparse_lower(destUUID, uuid_str);
+  std::string destination_uuid = uuid_str;
+  if (my_uuid == destination_uuid) {
+    // Connection is destination to the current processor
+    if (_incomingConnections.find(connection) == _incomingConnections.end()) {
+      _incomingConnections.insert(connection);
+      connection->setDestination(shared_from_this());
+      logger_->log_info(
+          "Add connection %s into Processor %s incoming connection",
+          connection->getName().c_str(), name_.c_str());
+      incoming_connections_Iter = this->_incomingConnections.begin();
+      ret = true;
+    }
+  }
+  uuid_unparse_lower(srcUUID, uuid_str);
+  std::string source_uuid = uuid_str;
+  if (my_uuid == source_uuid) {
+    std::string relationship = connection->getRelationship().getName();
+    // Connection is source from the current processor
+    auto &&it = _outGoingConnections.find(relationship);
+    if (it != _outGoingConnections.end()) {
+      // We already has connection for this relationship
+      std::set<std::shared_ptr<Connectable>> existedConnection = it->second;
+      if (existedConnection.find(connection) == existedConnection.end()) {
+        // We do not have the same connection for this relationship yet
+        existedConnection.insert(connection);
+        connection->setSource(shared_from_this());
+        _outGoingConnections[relationship] = existedConnection;
+        logger_->log_info(
+            "Add connection %s into Processor %s outgoing connection for relationship %s",
+            connection->getName().c_str(), name_.c_str(), relationship.c_str());
+        ret = true;
+      }
+    } else {
+
+      // We do not have any outgoing connection for this relationship yet
+      std::set<std::shared_ptr<Connectable>> newConnection;
+      newConnection.insert(connection);
+      connection->setSource(shared_from_this());
+      _outGoingConnections[relationship] = newConnection;
+      logger_->log_info(
+          "Add connection %s into Processor %s outgoing connection for relationship %s",
+          connection->getName().c_str(), name_.c_str(), relationship.c_str());
+      ret = true;
+    }
+  }
+
+  return ret;
+}
+
+void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
+  if (isRunning()) {
+    logger_->log_info(
+        "Can not remove connection while the process %s is running",
+        name_.c_str());
+    return;
+  }
+
+  std::lock_guard<std::mutex> lock(mutex_);
+
+  uuid_t srcUUID;
+  uuid_t destUUID;
+  
+  std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
+
+  connection->getSourceUUID(srcUUID);
+  connection->getDestinationUUID(destUUID);
+
+  if (uuid_compare(uuid_, destUUID) == 0) {
+    // Connection is destination to the current processor
+    if (_incomingConnections.find(connection) != _incomingConnections.end()) {
+      _incomingConnections.erase(connection);
+      connection->setDestination(NULL);
+      logger_->log_info(
+          "Remove connection %s into Processor %s incoming connection",
+          connection->getName().c_str(), name_.c_str());
+      incoming_connections_Iter = this->_incomingConnections.begin();
+    }
+  }
+
+  if (uuid_compare(uuid_, srcUUID) == 0) {
+    std::string relationship = connection->getRelationship().getName();
+    // Connection is source from the current processor
+    auto &&it = _outGoingConnections.find(relationship);
+    if (it == _outGoingConnections.end()) {
+      return;
+    } else {
+      if (_outGoingConnections[relationship].find(connection)
+          != _outGoingConnections[relationship].end()) {
+        _outGoingConnections[relationship].erase(connection);
+        connection->setSource(NULL);
+        logger_->log_info(
+            "Remove connection %s into Processor %s outgoing connection for relationship %s",
+            connection->getName().c_str(), name_.c_str(), relationship.c_str());
+      }
+    }
+  }
+}
+
+
+
+bool Processor::flowFilesQueued() {
+  std::lock_guard<std::mutex> lock(mutex_);
+
+  if (_incomingConnections.size() == 0)
+    return false;
+
+  for (auto &&conn : _incomingConnections) {
+    std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
+    if (connection->getQueueSize() > 0)
+      return true;
+  }
+
+  return false;
+}
+
+bool Processor::flowFilesOutGoingFull() {
+  std::lock_guard<std::mutex> lock(mutex_);
+
+  for (auto &&connection : _outGoingConnections) {
+    // We already has connection for this relationship
+    std::set<std::shared_ptr<Connectable>> existedConnection = connection.second;
+    for (const auto conn : existedConnection) {
+      std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
+      if (connection->isFull())
+        return true;
+    }
+  }
+
+  return false;
+}
+
+void Processor::onTrigger(ProcessContext *context,
+                          ProcessSessionFactory *sessionFactory) {
+  auto session = sessionFactory->createSession();
+
+  try {
+    // Call the virtual trigger function
+    onTrigger(context, session.get());
+    session->commit();
+  } catch (std::exception &exception) {
+    logger_->log_debug("Caught Exception %s", exception.what());
+    session->rollback();
+    throw;
+  } catch (...) {
+    logger_->log_debug("Caught Exception Processor::onTrigger");
+    session->rollback();
+    throw;
+  }
+}
+
+bool Processor::isWorkAvailable() {
+  // We have work if any incoming connection has work
+  bool hasWork = false;
+
+  try {
+    for (const auto &conn : _incomingConnections) {
+      std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
+      if (connection->getQueueSize() > 0) {
+        hasWork = true;
+        break;
+      }
+    }
+  } catch (...) {
+    logger_->log_error(
+        "Caught an exception while checking if work is available; unless it was positively determined that work is available, assuming NO work is available!");
+  }
+
+  return hasWork;
+}
+
+} /* namespace processor */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/ProcessorNode.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessorNode.cpp b/libminifi/src/core/ProcessorNode.cpp
new file mode 100644
index 0000000..44491d3
--- /dev/null
+++ b/libminifi/src/core/ProcessorNode.cpp
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/ProcessorNode.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+ProcessorNode::ProcessorNode(const std::shared_ptr<Connectable> processor)
+    : processor_(processor),
+      Connectable(processor->getName(),0),
+      ConfigurableComponent(logging::Logger::getLogger()) {
+	
+	uuid_t copy;
+	processor->getUUID(copy);
+	setUUID( copy );
+
+
+}
+
+ProcessorNode::ProcessorNode(const ProcessorNode &other)
+    : processor_(other.processor_),
+      Connectable(other.getName(), 0),
+      ConfigurableComponent(logging::Logger::getLogger()) {
+	
+	uuid_t copy;
+	processor_->getUUID(copy);
+	setUUID( copy );
+
+}
+
+ProcessorNode::~ProcessorNode() {
+
+}
+
+bool ProcessorNode::isWorkAvailable() {
+  return processor_->isWorkAvailable();
+}
+
+bool ProcessorNode::isRunning() {
+  return processor_->isRunning();
+}
+
+} /* namespace processor */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/Property.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Property.cpp b/libminifi/src/core/Property.cpp
new file mode 100644
index 0000000..287b7ec
--- /dev/null
+++ b/libminifi/src/core/Property.cpp
@@ -0,0 +1,57 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/Property.h"
+
+namespace org {
+namespace apache {
+namespace nifi {namespace minifi {
+namespace core {
+
+// Get Name for the property
+std::string Property::getName() const {
+  return name_;
+}
+// Get Description for the property
+std::string Property::getDescription() {
+  return description_;
+}
+// Get value for the property
+std::string Property::getValue() const {
+  return value_;
+}
+// Set value for the property
+void Property::setValue(std::string value) {
+  value_ = value;
+}
+// Compare
+bool Property::operator <(const Property & right) const {
+  return name_ < right.name_;
+}
+
+const Property &Property::operator=(const Property &other) {
+  name_ = other.name_;
+  value_ = other.value_;
+  return *this;
+}
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/Record.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Record.cpp b/libminifi/src/core/Record.cpp
new file mode 100644
index 0000000..dbf0102
--- /dev/null
+++ b/libminifi/src/core/Record.cpp
@@ -0,0 +1,223 @@
+/*
+ * Copyright 2017 <copyright holder> <email>
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ */
+
+#include "core/FlowFile.h"
+#include "core/logging/Logger.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+FlowFile::FlowFile()
+    : size_(0),
+      id_(0),
+      stored(false),
+      offset_(0),
+      last_queue_date_(0),
+      penaltyExpiration_ms_(0),
+      claim_(nullptr),
+      marked_delete_(false),
+      connection_(nullptr),
+      original_connection_() {
+  entry_date_ = getTimeMillis();
+  lineage_start_date_ = entry_date_;
+
+  char uuidStr[37];
+
+  // Generate the global UUID for the flow record
+  uuid_generate(uuid_);
+
+  uuid_unparse_lower(uuid_, uuidStr);
+  uuid_str_ = uuidStr;
+  
+  logger_ = logging::Logger::getLogger();
+
+}
+
+FlowFile::~FlowFile() {
+
+}
+
+FlowFile& FlowFile::operator=(const FlowFile& other) {
+
+  uuid_copy(uuid_, other.uuid_);
+  stored = other.stored;
+  marked_delete_ = other.marked_delete_;
+  entry_date_ = other.entry_date_;
+  lineage_start_date_ = other.lineage_start_date_;
+  lineage_Identifiers_ = other.lineage_Identifiers_;
+  last_queue_date_ = other.last_queue_date_;
+  size_ = other.size_;
+  penaltyExpiration_ms_ = other.penaltyExpiration_ms_;
+  attributes_ = other.attributes_;
+  claim_ = other.claim_;
+  if (claim_ != nullptr)
+    this->claim_->increaseFlowFileRecordOwnedCount();
+  uuid_str_ = other.uuid_str_;
+  connection_ = other.connection_;
+  original_connection_ = other.original_connection_;
+
+  return *this;
+}
+
+/**
+ * Returns whether or not this flow file record
+ * is marked as deleted.
+ * @return marked deleted
+ */
+bool FlowFile::isDeleted() {
+  return marked_delete_;
+}
+
+/**
+ * Sets whether to mark this flow file record
+ * as deleted
+ * @param deleted deleted flag
+ */
+void FlowFile::setDeleted(const bool deleted) {
+  marked_delete_ = deleted;
+}
+
+std::shared_ptr<ResourceClaim> FlowFile::getResourceClaim() {
+  return claim_;
+}
+
+void FlowFile::clearResourceClaim() {
+  claim_ = nullptr;
+}
+void FlowFile::setResourceClaim(std::shared_ptr<ResourceClaim> &claim) {
+  claim_ = claim;
+}
+
+// ! Get Entry Date
+uint64_t FlowFile::getEntryDate() {
+  return entry_date_;
+}
+uint64_t FlowFile::getEventTime() {
+  return event_time_;
+}
+// ! Get Lineage Start Date
+uint64_t FlowFile::getlineageStartDate() {
+  return lineage_start_date_;
+}
+
+std::set<std::string> &FlowFile::getlineageIdentifiers() {
+  return lineage_Identifiers_;
+}
+
+bool FlowFile::getAttribute(std::string key, std::string &value) {
+  auto it = attributes_.find(key);
+  if (it != attributes_.end()) {
+    value = it->second;
+    return true;
+  } else {
+    return false;
+  }
+}
+
+// Get Size
+uint64_t FlowFile::getSize() {
+  return size_;
+}
+// ! Get Offset
+uint64_t FlowFile::getOffset() {
+  return offset_;
+}
+
+bool FlowFile::removeAttribute(const std::string key) {
+  auto it = attributes_.find(key);
+  if (it != attributes_.end()) {
+    attributes_.erase(key);
+    return true;
+  } else {
+    return false;
+  }
+}
+
+bool FlowFile::updateAttribute(const std::string key, const std::string value) {
+  auto it = attributes_.find(key);
+  if (it != attributes_.end()) {
+    attributes_[key] = value;
+    return true;
+  } else {
+    return false;
+  }
+}
+
+bool FlowFile::addAttribute(const std::string &key, const std::string &value) {
+  auto it = attributes_.find(key);
+  if (it != attributes_.end()) {
+    // attribute already there in the map
+    return false;
+  } else {
+    attributes_[key] = value;
+    return true;
+  }
+}
+
+void FlowFile::setLineageStartDate(const uint64_t date) {
+  lineage_start_date_ = date;
+}
+
+/**
+ * Sets the original connection with a shared pointer.
+ * @param connection shared connection.
+ */
+void FlowFile::setOriginalConnection(
+    std::shared_ptr<core::Connectable> &connection) {
+  original_connection_ = connection;
+}
+
+/**
+ * Sets the connection with a shared pointer.
+ * @param connection shared connection.
+ */
+void FlowFile::setConnection(std::shared_ptr<core::Connectable> &connection) {
+  connection_ = connection;
+}
+
+/**
+ * Sets the connection with a shared pointer.
+ * @param connection shared connection.
+ */
+void FlowFile::setConnection(std::shared_ptr<core::Connectable> &&connection) {
+  connection_ = connection;
+}
+
+/**
+ * Returns the connection referenced by this record.
+ * @return shared connection pointer.
+ */
+std::shared_ptr<core::Connectable> FlowFile::getConnection() {
+  return connection_;
+}
+
+/**
+ * Returns the original connection referenced by this record.
+ * @return shared original connection pointer.
+ */
+std::shared_ptr<core::Connectable> FlowFile::getOriginalConnection() {
+  return original_connection_;
+}
+
+}
+}
+}
+}
+}