You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/03/28 17:19:18 UTC

[13/16] nifi-minifi-cpp git commit: MINIFI-217: Updates namespaces and removes use of raw pointers for user facing API.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/Site2SitePeer.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Site2SitePeer.h b/libminifi/include/Site2SitePeer.h
index e89bb74..de3a42f 100644
--- a/libminifi/include/Site2SitePeer.h
+++ b/libminifi/include/Site2SitePeer.h
@@ -29,242 +29,261 @@
 #include <mutex>
 #include <atomic>
 #include <memory>
-#include "Logger.h"
-#include "Configure.h"
-#include "Property.h"
+
+#include "core/Property.h"
+#include "core/logging/Logger.h"
+#include "properties/Configure.h"
 #include "io/ClientSocket.h"
 #include "io/BaseStream.h"
 #include "utils/TimeUtil.h"
 
 
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
 
 static const char MAGIC_BYTES[] = { 'N', 'i', 'F', 'i' };
 
-//! Site2SitePeer Class
-class Site2SitePeer : public BaseStream{
-public:
-	
+// Site2SitePeer Class
+class Site2SitePeer : public org::apache::nifi::minifi::io::BaseStream {
+ public:
+
+  Site2SitePeer()
+      : stream_(nullptr),
+        host_(""),
+        port_(-1) {
+
+  }
+  /*
+   * Create a new site2site peer
+   */
+  explicit Site2SitePeer(
+      std::unique_ptr<org::apache::nifi::minifi::io::DataStream> injected_socket,
+      const std::string host_, uint16_t port_)
+      : host_(host_),
+        port_(port_),
+        stream_(injected_socket.release()) {
+    logger_ = logging::Logger::getLogger();
+    configure_ = Configure::getConfigure();
+    _yieldExpiration = 0;
+    _timeOut = 30000;  // 30 seconds
+    _url = "nifi://" + host_ + ":" + std::to_string(port_);
+  }
 
-	Site2SitePeer() : stream_(nullptr),host_(""),port_(-1){
-	  
-	}
-	/*
-	 * Create a new site2site peer
-	 */
-	explicit Site2SitePeer(std::unique_ptr<DataStream> injected_socket, const std::string host_, uint16_t port_ ) :
-		host_(host_), port_(port_), stream_(injected_socket.release()){
-		logger_ = Logger::getLogger();
-		configure_ = Configure::getConfigure();
-		_yieldExpiration = 0;
-		_timeOut = 30000; // 30 seconds
-		_url = "nifi://" + host_ + ":" + std::to_string(port_);
-	}
-	
-	explicit Site2SitePeer(Site2SitePeer &&ss) : stream_( ss.stream_.release()), host_( std::move(ss.host_) ), port_ (std::move(ss.port_) )
-	{
-	  logger_ = Logger::getLogger();
-	  configure_ = Configure::getConfigure();
-	  _yieldExpiration.store(ss._yieldExpiration);
-	  _timeOut.store(ss._timeOut);
-	  _url = std::move(ss._url);
-	}
-	//! Destructor
-	virtual ~Site2SitePeer() {
-		Close();
-	}
-	//! Set Processor yield period in MilliSecond
-	void setYieldPeriodMsec(uint64_t period) {
-		_yieldPeriodMsec = period;
-	}
-	//! get URL
-	std::string getURL() {
-		return _url;
-	}
-	//! Get Processor yield period in MilliSecond
-	uint64_t getYieldPeriodMsec(void) {
-		return (_yieldPeriodMsec);
-	}
-	//! Yield based on the yield period
-	void yield() {
-		_yieldExpiration = (getTimeMillis() + _yieldPeriodMsec);
-	}
-	//! setHostName
-	void setHostName(std::string host_) {
-		this->host_ = host_;
-		_url = "nifi://" + host_ + ":" + std::to_string(port_);
-	}
-	//! setPort
-	void setPort(uint16_t port_) {
-		this->port_ = port_;
-		_url = "nifi://" + host_ + ":" + std::to_string(port_);
-	}
-	//! getHostName
-	std::string getHostName() {
-		return host_;
-	}
-	//! getPort
-	uint16_t getPort() {
-		return port_;
-	}
-	//! Yield based on the input time
-	void yield(uint64_t time) {
-		_yieldExpiration = (getTimeMillis() + time);
-	}
-	//! whether need be to yield
-	bool isYield() {
-		if (_yieldExpiration > 0)
-			return (_yieldExpiration >= getTimeMillis());
-		else
-			return false;
-	}
-	// clear yield expiration
-	void clearYield() {
-		_yieldExpiration = 0;
-	}
-	//! Yield based on the yield period
-	void yield(std::string portId) {
-		std::lock_guard<std::mutex> lock(_mtx);
-		uint64_t yieldExpiration = (getTimeMillis() + _yieldPeriodMsec);
-		_yieldExpirationPortIdMap[portId] = yieldExpiration;
-	}
-	//! Yield based on the input time
-	void yield(std::string portId, uint64_t time) {
-		std::lock_guard<std::mutex> lock(_mtx);
-		uint64_t yieldExpiration = (getTimeMillis() + time);
-		_yieldExpirationPortIdMap[portId] = yieldExpiration;
-	}
-	//! whether need be to yield
-	bool isYield(std::string portId) {
-		std::lock_guard<std::mutex> lock(_mtx);
-		std::map<std::string, uint64_t>::iterator it =
-				this->_yieldExpirationPortIdMap.find(portId);
-		if (it != _yieldExpirationPortIdMap.end()) {
-			uint64_t yieldExpiration = it->second;
-			return (yieldExpiration >= getTimeMillis());
-		} else {
-			return false;
-		}
-	}
-	//! clear yield expiration
-	void clearYield(std::string portId) {
-		std::lock_guard<std::mutex> lock(_mtx);
-		std::map<std::string, uint64_t>::iterator it =
-				this->_yieldExpirationPortIdMap.find(portId);
-		if (it != _yieldExpirationPortIdMap.end()) {
-			_yieldExpirationPortIdMap.erase(portId);
-		}
-	}
-	//! setTimeOut
-	void setTimeOut(uint64_t time) {
-		_timeOut = time;
-	}
-	//! getTimeOut
-	uint64_t getTimeOut() {
-		return _timeOut;
-	}
-	int write(uint8_t value) {
-		return Serializable::write(value,stream_.get());
-	}
-	int write(char value) {
-		return Serializable::write(value,stream_.get());
-	}
-	int write(uint32_t value) {
+  explicit Site2SitePeer(Site2SitePeer &&ss)
+      : stream_(ss.stream_.release()),
+        host_(std::move(ss.host_)),
+        port_(std::move(ss.port_)) {
+    logger_ = logging::Logger::getLogger();
+    configure_ = Configure::getConfigure();
+    _yieldExpiration.store(ss._yieldExpiration);
+    _timeOut.store(ss._timeOut);
+    _url = std::move(ss._url);
+  }
+  // Destructor
+  virtual ~Site2SitePeer() {
+    Close();
+  }
+  // Set Processor yield period in MilliSecond
+  void setYieldPeriodMsec(uint64_t period) {
+    _yieldPeriodMsec = period;
+  }
+  // get URL
+  std::string getURL() {
+    return _url;
+  }
+  // Get Processor yield period in MilliSecond
+  uint64_t getYieldPeriodMsec(void) {
+    return (_yieldPeriodMsec);
+  }
+  // Yield based on the yield period
+  void yield() {
+    _yieldExpiration = (getTimeMillis() + _yieldPeriodMsec);
+  }
+  // setHostName
+  void setHostName(std::string host_) {
+    this->host_ = host_;
+    _url = "nifi://" + host_ + ":" + std::to_string(port_);
+  }
+  // setPort
+  void setPort(uint16_t port_) {
+    this->port_ = port_;
+    _url = "nifi://" + host_ + ":" + std::to_string(port_);
+  }
+  // getHostName
+  std::string getHostName() {
+    return host_;
+  }
+  // getPort
+  uint16_t getPort() {
+    return port_;
+  }
+  // Yield based on the input time
+  void yield(uint64_t time) {
+    _yieldExpiration = (getTimeMillis() + time);
+  }
+  // whether need be to yield
+  bool isYield() {
+    if (_yieldExpiration > 0)
+      return (_yieldExpiration >= getTimeMillis());
+    else
+      return false;
+  }
+  // clear yield expiration
+  void clearYield() {
+    _yieldExpiration = 0;
+  }
+  // Yield based on the yield period
+  void yield(std::string portId) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    uint64_t yieldExpiration = (getTimeMillis() + _yieldPeriodMsec);
+    _yieldExpirationPortIdMap[portId] = yieldExpiration;
+  }
+  // Yield based on the input time
+  void yield(std::string portId, uint64_t time) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    uint64_t yieldExpiration = (getTimeMillis() + time);
+    _yieldExpirationPortIdMap[portId] = yieldExpiration;
+  }
+  // whether need be to yield
+  bool isYield(std::string portId) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    std::map<std::string, uint64_t>::iterator it = this
+        ->_yieldExpirationPortIdMap.find(portId);
+    if (it != _yieldExpirationPortIdMap.end()) {
+      uint64_t yieldExpiration = it->second;
+      return (yieldExpiration >= getTimeMillis());
+    } else {
+      return false;
+    }
+  }
+  // clear yield expiration
+  void clearYield(std::string portId) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    std::map<std::string, uint64_t>::iterator it = this
+        ->_yieldExpirationPortIdMap.find(portId);
+    if (it != _yieldExpirationPortIdMap.end()) {
+      _yieldExpirationPortIdMap.erase(portId);
+    }
+  }
+  // setTimeOut
+  void setTimeOut(uint64_t time) {
+    _timeOut = time;
+  }
+  // getTimeOut
+  uint64_t getTimeOut() {
+    return _timeOut;
+  }
+  int write(uint8_t value) {
+    return Serializable::write(value, stream_.get());
+  }
+  int write(char value) {
+    return Serializable::write(value, stream_.get());
+  }
+  int write(uint32_t value) {
 
-		return Serializable::write(value,stream_.get());
+    return Serializable::write(value, stream_.get());
 
-	}
-	int write(uint16_t value) {
-		return Serializable::write(value,stream_.get());
-	}
-	int write(uint8_t *value, int len) {
-		return Serializable::write(value,len,stream_.get());
-	}
-	int write(uint64_t value) {
-		return Serializable::write(value,stream_.get());
-	}
-	int write(bool value) {
-		uint8_t temp = value;
-		return Serializable::write(temp,stream_.get());
-	}
-	int writeUTF(std::string str, bool widen = false){
-		return Serializable::writeUTF(str,stream_.get(),widen);
-	}
-	int read(uint8_t &value) {
-		return Serializable::read(value,stream_.get());
-	}
-	int read(uint16_t &value) {
-		return Serializable::read(value,stream_.get());
-	}
-	int read(char &value) {
-		return Serializable::read(value,stream_.get());
-	}
-	int read(uint8_t *value, int len) {
-		return Serializable::read(value,len,stream_.get());
-	}
-	int read(uint32_t &value) {
-		return Serializable::read(value,stream_.get());
-	}
-	int read(uint64_t &value) {
-		return Serializable::read(value,stream_.get());
-	}
-	int readUTF(std::string &str, bool widen = false)
-	{
-		return Serializable::readUTF(str,stream_.get(),widen);
-	}
-	//! open connection to the peer
-	bool Open();
-	//! close connection to the peer
-	void Close();
-	
-	/**
-	 * Move assignment operator.
-	 */
-	Site2SitePeer& operator=(Site2SitePeer&& other)
-	{
-	  stream_ = std::unique_ptr<DataStream>( other.stream_.release());
-	  host_ = std::move(other.host_);
-	  port_ = std::move(other.port_);
-	  logger_ = Logger::getLogger();
-	  configure_ = Configure::getConfigure();
-	  _yieldExpiration = 0;
-	  _timeOut = 30000; // 30 seconds
-	  _url = "nifi://" + host_ + ":" + std::to_string(port_);
-	  
-	  return *this;
-	}
+  }
+  int write(uint16_t value) {
+    return Serializable::write(value, stream_.get());
+  }
+  int write(uint8_t *value, int len) {
+    return Serializable::write(value, len, stream_.get());
+  }
+  int write(uint64_t value) {
+    return Serializable::write(value, stream_.get());
+  }
+  int write(bool value) {
+    uint8_t temp = value;
+    return Serializable::write(temp, stream_.get());
+  }
+  int writeUTF(std::string str, bool widen = false) {
+    return Serializable::writeUTF(str, stream_.get(), widen);
+  }
+  int read(uint8_t &value) {
+    return Serializable::read(value, stream_.get());
+  }
+  int read(uint16_t &value) {
+    return Serializable::read(value, stream_.get());
+  }
+  int read(char &value) {
+    return Serializable::read(value, stream_.get());
+  }
+  int read(uint8_t *value, int len) {
+    return Serializable::read(value, len, stream_.get());
+  }
+  int read(uint32_t &value) {
+    return Serializable::read(value, stream_.get());
+  }
+  int read(uint64_t &value) {
+    return Serializable::read(value, stream_.get());
+  }
+  int readUTF(std::string &str, bool widen = false) {
+    return org::apache::nifi::minifi::io::Serializable::readUTF(str,
+                                                                stream_.get(),
+                                                                widen);
+  }
+  // open connection to the peer
+  bool Open();
+  // close connection to the peer
+  void Close();
 
-	Site2SitePeer(const Site2SitePeer &parent) = delete;
-	Site2SitePeer &operator=(const Site2SitePeer &parent) = delete;
+  /**
+   * Move assignment operator.
+   */
+  Site2SitePeer& operator=(Site2SitePeer&& other) {
+    stream_ = std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(
+        other.stream_.release());
+    host_ = std::move(other.host_);
+    port_ = std::move(other.port_);
+    logger_ = logging::Logger::getLogger();
+    configure_ = Configure::getConfigure();
+    _yieldExpiration = 0;
+    _timeOut = 30000;  // 30 seconds
+    _url = "nifi://" + host_ + ":" + std::to_string(port_);
 
-protected:
+    return *this;
+  }
 
-private:
+  Site2SitePeer(const Site2SitePeer &parent) = delete;
+  Site2SitePeer &operator=(const Site2SitePeer &parent) = delete;
 
-	std::unique_ptr<DataStream> stream_;
+ protected:
 
-	std::string host_;
-	uint16_t port_;
+ private:
 
-	//! Mutex for protection
-	std::mutex _mtx;
-	//! URL
-	std::string _url;
-	//! socket timeout;
-	std::atomic<uint64_t> _timeOut;
-	//! Logger
-	std::shared_ptr<Logger> logger_;
-	//! Configure
-	Configure *configure_;
-	//! Yield Period in Milliseconds
-	std::atomic<uint64_t> _yieldPeriodMsec;
-	//! Yield Expiration
-	std::atomic<uint64_t> _yieldExpiration;
-	//! Yield Expiration per destination PortID
-	std::map<std::string, uint64_t> _yieldExpirationPortIdMap;
-	//! OpenSSL connection state
-	// Prevent default copy constructor and assignment operation
-	// Only support pass by reference or pointer
+  std::unique_ptr<org::apache::nifi::minifi::io::DataStream> stream_;
+
+  std::string host_;
+  uint16_t port_;
+
+  // Mutex for protection
+  std::mutex mutex_;
+  // URL
+  std::string _url;
+  // socket timeout;
+  std::atomic<uint64_t> _timeOut;
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+  // Configure
+  Configure *configure_;
+  // Yield Period in Milliseconds
+  std::atomic<uint64_t> _yieldPeriodMsec;
+  // Yield Expiration
+  std::atomic<uint64_t> _yieldExpiration;
+  // Yield Expiration per destination PortID
+  std::map<std::string, uint64_t> _yieldExpirationPortIdMap;
+  // OpenSSL connection state
+  // Prevent default copy constructor and assignment operation
+  // Only support pass by reference or pointer
 
 };
 
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
 #endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/TailFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/TailFile.h b/libminifi/include/TailFile.h
deleted file mode 100644
index d68748e..0000000
--- a/libminifi/include/TailFile.h
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * @file TailFile.h
- * TailFile class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __TAIL_FILE_H__
-#define __TAIL_FILE_H__
-
-#include "FlowFileRecord.h"
-#include "Processor.h"
-#include "ProcessSession.h"
-
-//! TailFile Class
-class TailFile : public Processor
-{
-public:
-	//! Constructor
-	/*!
-	 * Create a new processor
-	 */
-	TailFile(std::string name, uuid_t uuid = NULL)
-	: Processor(name, uuid)
-	{
-		logger_ = Logger::getLogger();
-		_stateRecovered = false;
-	}
-	//! Destructor
-	virtual ~TailFile()
-	{
-		storeState();
-	}
-	//! Processor Name
-	static const std::string ProcessorName;
-	//! Supported Properties
-	static Property FileName;
-	static Property StateFile;
-	//! Supported Relationships
-	static Relationship Success;
-
-public:
-	//! OnTrigger method, implemented by NiFi TailFile
-	virtual void onTrigger(ProcessContext *context, ProcessSession *session);
-	//! Initialize, over write by NiFi TailFile
-	virtual void initialize(void);
-	//! recoverState
-	void recoverState();
-	//! storeState
-	void storeState();
-
-protected:
-
-private:
-	//! Logger
-	std::shared_ptr<Logger> logger_;
-	std::string _fileLocation;
-	//! Property Specified Tailed File Name
-	std::string _fileName;
-	//! File to save state
-	std::string _stateFile;
-	//! State related to the tailed file
-	std::string _currentTailFileName;
-	uint64_t _currentTailFilePosition;
-	bool _stateRecovered;
-	uint64_t _currentTailFileCreatedTime;
-	//! Utils functions for parse state file
-	std::string trimLeft(const std::string& s);
-	std::string trimRight(const std::string& s);
-	void parseStateFileLine(char *buf);
-	void checkRollOver();
-
-};
-
-//! Matched File Item for Roll over check
-typedef struct {
-	std::string fileName;
-	uint64_t modifiedTime;
-} TailMatchedFileItem;
-
-#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/ThreadedSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h
index 5eb5d8a..4e39da3 100644
--- a/libminifi/include/ThreadedSchedulingAgent.h
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -20,51 +20,60 @@
 #ifndef __THREADED_SCHEDULING_AGENT_H__
 #define __THREADED_SCHEDULING_AGENT_H__
 
-#include "Configure.h"
-#include "Logger.h"
-#include "Processor.h"
-#include "ProcessContext.h"
+#include "properties/Configure.h"
+#include "core/logging/Logger.h"
+#include "core/Processor.h"
+#include "core/Repository.h"
+#include "core/ProcessContext.h"
 #include "SchedulingAgent.h"
 
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
 /**
  * An abstract scheduling agent which creates and manages a pool of threads for
  * each processor scheduled.
  */
-class ThreadedSchedulingAgent : public SchedulingAgent
-{
-public:
-	//! Constructor
-	/*!
-	 * Create a new processor
-	 */
-	ThreadedSchedulingAgent()
-	: SchedulingAgent()
-	{
-	}
-	//! Destructor
-	virtual ~ThreadedSchedulingAgent()
-	{
-	}
+class ThreadedSchedulingAgent : public SchedulingAgent {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  ThreadedSchedulingAgent(std::shared_ptr<core::Repository> repo)
+      : SchedulingAgent(repo) {
+  }
+  // Destructor
+  virtual ~ThreadedSchedulingAgent() {
+  }
 
-	//! Run function for the thread
-	virtual void run(Processor *processor, ProcessContext *processContext, ProcessSessionFactory *sessionFactory) = 0;
+  // Run function for the thread
+  virtual void run(std::shared_ptr<core::Processor> processor,
+                   core::ProcessContext *processContext,
+                   core::ProcessSessionFactory *sessionFactory) = 0;
 
-public:
-	//! schedule, overwritten by different DrivenTimerDrivenSchedulingAgent
-	virtual void schedule(Processor *processor);
-	//! unschedule, overwritten by different DrivenTimerDrivenSchedulingAgent
-	virtual void unschedule(Processor *processor);
+ public:
+  // schedule, overwritten by different DrivenTimerDrivenSchedulingAgent
+  virtual void schedule(std::shared_ptr<core::Processor> processor);
+  // unschedule, overwritten by different DrivenTimerDrivenSchedulingAgent
+  virtual void unschedule(std::shared_ptr<core::Processor> processor);
 
-protected:
-	//! Threads
-	std::map<std::string, std::vector<std::thread *>> _threads;
+ protected:
+  // Threads
+  std::map<std::string, std::vector<std::thread *>> _threads;
 
-private:
-	// Prevent default copy constructor and assignment operation
-	// Only support pass by reference or pointer
-	ThreadedSchedulingAgent(const ThreadedSchedulingAgent &parent);
-	ThreadedSchedulingAgent &operator=(const ThreadedSchedulingAgent &parent);
+ private:
+  // Prevent default copy constructor and assignment operation
+  // Only support pass by reference or pointer
+  ThreadedSchedulingAgent(const ThreadedSchedulingAgent &parent);
+  ThreadedSchedulingAgent &operator=(const ThreadedSchedulingAgent &parent);
 
 };
 
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
 #endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/TimerDrivenSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h
index 389ccf6..7da2abd 100644
--- a/libminifi/include/TimerDrivenSchedulingAgent.h
+++ b/libminifi/include/TimerDrivenSchedulingAgent.h
@@ -20,36 +20,47 @@
 #ifndef __TIMER_DRIVEN_SCHEDULING_AGENT_H__
 #define __TIMER_DRIVEN_SCHEDULING_AGENT_H__
 
-#include "Logger.h"
-#include "Processor.h"
-#include "ProcessContext.h"
+#include "core/logging/Logger.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/Repository.h"
 #include "ThreadedSchedulingAgent.h"
 
-//! TimerDrivenSchedulingAgent Class
-class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent
-{
-public:
-	//! Constructor
-	/*!
-	 * Create a new processor
-	 */
-	TimerDrivenSchedulingAgent()
-	: ThreadedSchedulingAgent()
-	{
-	}
-	//! Destructor
-	virtual ~TimerDrivenSchedulingAgent()
-	{
-	}
-	//! Run function for the thread
-	void run(Processor *processor, ProcessContext *processContext, ProcessSessionFactory *sessionFactory);
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+//  TimerDrivenSchedulingAgent Class
+class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent {
+ public:
+  //  Constructor
+  /*!
+   * Create a new processor
+   */
+  TimerDrivenSchedulingAgent(std::shared_ptr<core::Repository> repo)
+      : ThreadedSchedulingAgent(repo) {
+  }
+  //  Destructor
+  virtual ~TimerDrivenSchedulingAgent() {
+  }
+  /**
+   * Run function that accepts the processor, context and session factory.
+   */
+  void run(std::shared_ptr<core::Processor> processor,
+           core::ProcessContext *processContext,
+           core::ProcessSessionFactory *sessionFactory);
 
-private:
-	// Prevent default copy constructor and assignment operation
-	// Only support pass by reference or pointer
-	TimerDrivenSchedulingAgent(const TimerDrivenSchedulingAgent &parent);
-	TimerDrivenSchedulingAgent &operator=(const TimerDrivenSchedulingAgent &parent);
+ private:
+  // Prevent default copy constructor and assignment operation
+  // Only support pass by reference or pointer
+  TimerDrivenSchedulingAgent(const TimerDrivenSchedulingAgent &parent);
+  TimerDrivenSchedulingAgent &operator=(
+      const TimerDrivenSchedulingAgent &parent);
 
 };
 
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
 #endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/ConfigurableComponent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ConfigurableComponent.h b/libminifi/include/core/ConfigurableComponent.h
new file mode 100644
index 0000000..c0cc623
--- /dev/null
+++ b/libminifi/include/core/ConfigurableComponent.h
@@ -0,0 +1,104 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBMINIFI_INCLUDE_CORE_CONFIGURABLECOMPONENT_H_
+#define LIBMINIFI_INCLUDE_CORE_CONFIGURABLECOMPONENT_H_
+
+#include "Property.h"
+#include "core/logging/Logger.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+/**
+ * Represents a configurable component
+ * Purpose: Extracts configuration items for all components and localized them
+ */
+class ConfigurableComponent {
+ public:
+
+
+  ConfigurableComponent() = delete;
+
+
+  explicit ConfigurableComponent(std::shared_ptr<logging::Logger> logger);
+
+  explicit ConfigurableComponent(const ConfigurableComponent &&other);
+
+  /**
+   * Get property using the provided name.
+   * @param name property name.
+   * @param value value passed in by reference
+   * @return result of getting property.
+   */
+  bool getProperty(const std::string name, std::string &value);
+  /**
+   * Sets the property using the provided name
+   * @param property name
+   * @param value property value.
+   * @return result of setting property.
+   */
+  bool setProperty(const std::string name, std::string value);
+  /**
+   * Sets the property using the provided name
+   * @param property name
+   * @param value property value.
+   * @return whether property was set or not
+   */
+  bool setProperty(Property &prop, std::string value);
+
+  /**
+   * Sets supported properties for the ConfigurableComponent
+   * @param supported properties
+   * @return result of set operation.
+   */
+  bool setSupportedProperties(std::set<Property> properties);
+  /**
+   * Sets supported properties for the ConfigurableComponent
+   * @param supported properties
+   * @return result of set operation.
+   */
+
+  virtual ~ConfigurableComponent();
+
+ protected:
+
+
+  /**
+   * Returns true if the instance can be edited.
+   * @return true/false
+   */
+  virtual bool canEdit()= 0;
+
+  std::mutex configuration_mutex_;
+  std::shared_ptr<logging::Logger> logger_;
+  // Supported properties
+  std::map<std::string, Property> properties_;
+
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_CONFIGURABLECOMPONENT_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/ConfigurationFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ConfigurationFactory.h b/libminifi/include/core/ConfigurationFactory.h
new file mode 100644
index 0000000..19ed5f4
--- /dev/null
+++ b/libminifi/include/core/ConfigurationFactory.h
@@ -0,0 +1,65 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBMINIFI_INCLUDE_CORE_CONFIGURATIONFACTORY_H_
+#define LIBMINIFI_INCLUDE_CORE_CONFIGURATIONFACTORY_H_
+
+#include "FlowConfiguration.h"
+#include  <type_traits>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+
+
+
+template<typename T>
+typename std::enable_if<!class_operations<T>::value, T*>::type instantiate(
+      std::shared_ptr<core::Repository> repo,
+      std::shared_ptr<core::Repository> flow_file_repo,const std::string path ) {
+  throw std::runtime_error("Cannot instantiate class");
+}
+
+template<typename T>
+typename std::enable_if<class_operations<T>::value, T*>::type instantiate(
+      std::shared_ptr<core::Repository> repo,
+      std::shared_ptr<core::Repository> flow_file_repo,const std::string path ) {
+  return new T(repo,flow_file_repo,path);
+}
+
+  
+/**
+ * Configuration factory is used to create a new FlowConfiguration
+ * object.
+ */
+ std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(
+      std::shared_ptr<core::Repository> repo,
+      std::shared_ptr<core::Repository> flow_file_repo,
+      const std::string configuration_class_name, const std::string path = "",
+      bool fail_safe = false);
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_CONFIGURATIONFACTORY_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/Connectable.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Connectable.h b/libminifi/include/core/Connectable.h
new file mode 100644
index 0000000..15e618f
--- /dev/null
+++ b/libminifi/include/core/Connectable.h
@@ -0,0 +1,165 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBMINIFI_INCLUDE_CORE_CONNECTABLE_H_
+#define LIBMINIFI_INCLUDE_CORE_CONNECTABLE_H_
+
+#include <set>
+#include "core.h"
+#include <condition_variable>
+#include "core/logging/Logger.h"
+#include "Relationship.h"
+#include "Scheduling.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+/**
+ * Represents the base connectable component
+ * Purpose: As in NiFi, this represents a connection point and allows the derived
+ * object to be connected to other connectables.
+ */
+class Connectable : public CoreComponent {
+ public:
+
+  explicit Connectable(std::string name, uuid_t uuid);
+
+  explicit Connectable(const Connectable &&other);
+
+  bool setSupportedRelationships(std::set<Relationship> relationships);
+
+  // Whether the relationship is supported
+  bool isSupportedRelationship(Relationship relationship);
+
+  /**
+   * Sets auto terminated relationships
+   * @param relationships
+   * @return result of set operation.
+   */
+  bool setAutoTerminatedRelationships(std::set<Relationship> relationships);
+
+  // Check whether the relationship is auto terminated
+  bool isAutoTerminated(Relationship relationship);
+
+  // Get Processor penalization period in MilliSecond
+  uint64_t getPenalizationPeriodMsec(void) {
+    return (_penalizationPeriodMsec);
+  }
+
+  /**
+   * Get outgoing connection based on relationship
+   * @return set of outgoing connections.
+   */
+  std::set<std::shared_ptr<Connectable>> getOutGoingConnections(
+      std::string relationship);
+
+  /**
+   * Get next incoming connection
+   * @return next incoming connection
+   */
+  std::shared_ptr<Connectable> getNextIncomingConnection();
+
+  /**
+   * @return true if incoming connections > 0
+   */
+  bool hasIncomingConnections() {
+    return (_incomingConnections.size() > 0);
+  }
+
+  uint8_t getMaxConcurrentTasks() {
+    return max_concurrent_tasks_;
+  }
+
+  void setMaxConcurrentTasks(const uint8_t tasks) {
+    max_concurrent_tasks_ = tasks;
+  }
+  /**
+   * Yield
+   */
+  virtual void yield() = 0;
+
+  virtual ~Connectable();
+
+  /**
+   * Determines if we are connected and operating
+   */
+  virtual bool isRunning() = 0;
+
+  /**
+   * Block until work is available on any input connection, or the given duration elapses
+   * @param timeoutMs timeout in milliseconds
+   */
+  void waitForWork(uint64_t timeoutMs);
+  /**
+   * Notify this processor that work may be available
+   */
+
+  void notifyWork();
+
+  /**
+   * Determines if work is available by this connectable
+   * @return boolean if work is available.
+   */
+  virtual bool isWorkAvailable() = 0;
+
+ protected:
+
+  // Penalization Period in MilliSecond
+  std::atomic<uint64_t> _penalizationPeriodMsec;
+
+  uint8_t max_concurrent_tasks_;
+
+  // Supported relationships
+  std::map<std::string, core::Relationship> relationships_;
+  // Autoterminated relationships
+  std::map<std::string, core::Relationship> auto_terminated_relationships_;
+
+  // Incoming connection Iterator
+  std::set<std::shared_ptr<Connectable>>::iterator incoming_connections_Iter;
+  // Incoming connections
+  std::set<std::shared_ptr<Connectable>> _incomingConnections;
+  // Outgoing connections map based on Relationship name
+  std::map<std::string, std::set<std::shared_ptr<Connectable>>>_outGoingConnections;
+
+  // Mutex for protection
+  std::mutex relationship_mutex_;
+
+  ///// work conditionals and locking mechanisms
+
+  // Concurrent condition mutex for whether there is incoming work to do
+  std::mutex work_available_mutex_;
+  // Condition for whether there is incoming work to do
+  std::atomic<bool> has_work_;
+  // Scheduling Strategy
+  std::atomic<SchedulingStrategy> strategy_;
+  // Concurrent condition variable for whether there is incoming work to do
+  std::condition_variable work_condition_;
+
+};
+
+}
+/* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_CONNECTABLE_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/FlowConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h
new file mode 100644
index 0000000..c7eedd2
--- /dev/null
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -0,0 +1,118 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_FLOWCONFIGURATION_H_
+#define LIBMINIFI_INCLUDE_CORE_FLOWCONFIGURATION_H_
+
+#include "core/core.h"
+#include "Connection.h"
+#include "RemoteProcessorGroupPort.h"
+#include "provenance/Provenance.h"
+#include "processors/GetFile.h"
+#include "processors/PutFile.h"
+#include "processors/TailFile.h"
+#include "processors/ListenSyslog.h"
+#include "processors/GenerateFlowFile.h"
+#include "processors/RealTimeDataCollector.h"
+#include "processors/ListenHTTP.h"
+#include "processors/LogAttribute.h"
+#include "processors/ExecuteProcess.h"
+#include "processors/AppendHostInfo.h"
+#include "core/Processor.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessGroup.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+/**
+ * Purpose: Flow configuration defines the mechanism
+ * by which we will configure our flow controller
+ */
+class FlowConfiguration : public CoreComponent {
+ public:
+  /**
+   * Constructor that will be used for configuring
+   * the flow controller.
+   */
+  FlowConfiguration(std::shared_ptr<core::Repository> repo,
+                    std::shared_ptr<core::Repository> flow_file_repo,
+                    const std::string path)
+      : CoreComponent(core::getClassName<FlowConfiguration>()),
+        flow_file_repo_(flow_file_repo),
+        config_path_(path) {
+
+  }
+
+  virtual ~FlowConfiguration();
+
+  // Create Processor (Node/Input/Output Port) based on the name
+  std::shared_ptr<core::Processor> createProcessor(std::string name,
+                                                   uuid_t uuid);
+  // Create Root Processor Group
+  std::unique_ptr<core::ProcessGroup> createRootProcessGroup(std::string name,
+                                                             uuid_t uuid);
+  // Create Remote Processor Group
+  std::unique_ptr<core::ProcessGroup> createRemoteProcessGroup(std::string name,
+                                                               uuid_t uuid);
+  // Create Connection
+  std::shared_ptr<minifi::Connection> createConnection(std::string name,
+                                                       uuid_t uuid);
+
+  /**
+   * Returns the configuration path string
+   * @return config_path_
+   */
+  const std::string &getConfigurationPath() {
+    return config_path_;
+  }
+
+  virtual std::unique_ptr<core::ProcessGroup> getRoot() {
+    return getRoot(config_path_);
+  }
+
+  /**
+   * Base implementation that returns a null root pointer.
+   * @return Extensions should return a non-null pointer in order to
+   * properly configure flow controller.
+   */
+  virtual std::unique_ptr<core::ProcessGroup> getRoot(
+      const std::string &from_config) {
+    return nullptr;
+  }
+
+ protected:
+  // configuration path
+  std::string config_path_;
+  // flow file repo
+  std::shared_ptr<core::Repository> flow_file_repo_;
+
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_FLOWCONFIGURATION_H_ */
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/FlowFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/FlowFile.h b/libminifi/include/core/FlowFile.h
new file mode 100644
index 0000000..247ad26
--- /dev/null
+++ b/libminifi/include/core/FlowFile.h
@@ -0,0 +1,283 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef RECORD_H
+#define RECORD_H
+
+#include "utils/TimeUtil.h"
+#include "ResourceClaim.h"
+#include "Connectable.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+class FlowFile {
+ public:
+  FlowFile();
+  ~FlowFile();
+  FlowFile& operator=(const FlowFile& other);
+
+  /**
+   * Returns a pointer to this flow file record's
+   * claim
+   */
+  std::shared_ptr<ResourceClaim> getResourceClaim();
+  /**
+   * Sets _claim to the inbound claim argument
+   */
+  void setResourceClaim(std::shared_ptr<ResourceClaim> &claim);
+
+  /**
+   * clear the resource claim
+   */
+  void clearResourceClaim();
+
+  /**
+   * Get lineage identifiers
+   */
+  std::set<std::string> &getlineageIdentifiers();
+
+  /**
+   * Returns whether or not this flow file record
+   * is marked as deleted.
+   * @return marked deleted
+   */
+  bool isDeleted();
+
+  /**
+   * Sets whether to mark this flow file record
+   * as deleted
+   * @param deleted deleted flag
+   */
+  void setDeleted(const bool deleted);
+
+  /**
+   * Get entry date for this record
+   * @return entry date uint64_t
+   */
+  uint64_t getEntryDate();
+
+  /**
+   * Gets the event time.
+   * @return event time.
+   */
+  uint64_t getEventTime();
+  /**
+   * Get lineage start date
+   * @return lineage start date uint64_t
+   */
+  uint64_t getlineageStartDate();
+
+  /**
+   * Sets the lineage start date
+   * @param date new lineage start date
+   */
+  void setLineageStartDate(const uint64_t date);
+
+  void setLineageIdentifiers(std::set<std::string> lineage_Identifiers) {
+    lineage_Identifiers_ = lineage_Identifiers;
+  }
+  /**
+   * Obtains an attribute if it exists. If it does the value is
+   * copied into value
+   * @param key key to look for
+   * @param value value to set
+   * @return result of finding key
+   */
+  bool getAttribute(std::string key, std::string &value);
+
+  /**
+   * Updates the value in the attribute map that corresponds
+   * to key
+   * @param key attribute name
+   * @param value value to set to attribute name
+   * @return result of finding key
+   */
+  bool updateAttribute(const std::string key, const std::string value);
+
+  /**
+   * Removes the attribute
+   * @param key attribute name to remove
+   * @return result of finding key
+   */
+  bool removeAttribute(const std::string key);
+
+  /**
+   * setAttribute, if attribute already there, update it, else, add it
+   */
+  void setAttribute(const std::string &key, const std::string &value) {
+    attributes_[key] = value;
+  }
+
+  /**
+   * Returns the map of attributes
+   * @return attributes.
+   */
+  std::map<std::string, std::string> getAttributes() {
+    return attributes_;
+  }
+
+  /**
+   * adds an attribute if it does not exist
+   *
+   */
+  bool addAttribute(const std::string &key, const std::string &value);
+
+  /**
+   * Set the size of this record.
+   * @param size size of record to set.�
+   */
+  void setSize(const uint64_t size) {
+    size_ = size;
+  }
+  /**
+   * Returns the size of corresponding flow file
+   * @return size as a uint64_t
+   */
+  uint64_t getSize();
+
+  /**
+   * Sets the offset
+   * @param offset offset to apply to this record.
+   */
+  void setOffset(const uint64_t offset) {
+    offset_ = offset;
+  }
+
+  /**
+   * Sets the penalty expiration
+   * @param penaltyExp new penalty expiration
+   */
+  void setPenaltyExpiration(const uint64_t penaltyExp) {
+    penaltyExpiration_ms_ = penaltyExp;
+  }
+
+  uint64_t getPenaltyExpiration() {
+    return penaltyExpiration_ms_;
+  }
+
+  /**
+   * Gets the offset within the flow file
+   * @return size as a uint64_t
+   */
+  uint64_t getOffset();
+
+  // Get the UUID as string
+  std::string getUUIDStr() {
+    return uuid_str_;
+  }
+  
+  bool getUUID(uuid_t other)
+  {
+    uuid_copy(other,uuid_);
+    return true;
+  }
+
+  // Check whether it is still being penalized
+  bool isPenalized() {
+    return (
+        penaltyExpiration_ms_ > 0 ?
+            penaltyExpiration_ms_ > getTimeMillis() : false);
+  }
+
+  /**
+   * Sets the original connection with a shared pointer.
+   * @param connection shared connection.
+   */
+  void setConnection(std::shared_ptr<core::Connectable> &connection);
+
+  /**
+   * Sets the original connection with a shared pointer.
+   * @param connection shared connection.
+   */
+  void setConnection(std::shared_ptr<core::Connectable> &&connection);
+
+  /**
+   * Returns the connection referenced by this record.
+   * @return shared connection pointer.
+   */
+  std::shared_ptr<core::Connectable> getConnection();
+  /**
+   * Sets the original connection with a shared pointer.
+   * @param connection shared connection.
+   */
+  void setOriginalConnection(std::shared_ptr<core::Connectable> &connection);
+  /**
+   * Returns the original connection referenced by this record.
+   * @return shared original connection pointer.
+   */
+  std::shared_ptr<core::Connectable> getOriginalConnection();
+
+  void setStoredToRepository(bool storedInRepository) {
+    stored = storedInRepository;
+  }
+
+  bool isStored() {
+    return stored;
+  }
+
+ protected:
+  bool stored;
+  // Mark for deletion
+  bool marked_delete_;
+  // Date at which the flow file entered the flow
+  uint64_t entry_date_;
+  // event time
+  uint64_t event_time_;
+  // Date at which the origin of this flow file entered the flow
+  uint64_t lineage_start_date_;
+  // Date at which the flow file was queued
+  uint64_t last_queue_date_;
+  // Size in bytes of the data corresponding to this flow file
+  uint64_t size_;
+  // A global unique identifier
+  uuid_t uuid_;
+  // A local unique identifier
+  uint64_t id_;
+  // Offset to the content
+  uint64_t offset_;
+  // Penalty expiration
+  uint64_t penaltyExpiration_ms_;
+  // Attributes key/values pairs for the flow record
+  std::map<std::string, std::string> attributes_;
+  // Pointer to the associated content resource claim
+  std::shared_ptr<ResourceClaim> claim_;
+  // UUID string
+  std::string uuid_str_;
+  // UUID string for all parents
+  std::set<std::string> lineage_Identifiers_;
+
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+  
+  // Connection queue that this flow file will be transfer or current in
+  std::shared_ptr<core::Connectable> connection_;
+  // Orginal connection queue that this flow file was dequeued from
+  std::shared_ptr<core::Connectable> original_connection_;
+
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif // RECORD_H

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/ProcessContext.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h
new file mode 100644
index 0000000..1da85cd
--- /dev/null
+++ b/libminifi/include/core/ProcessContext.h
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROCESS_CONTEXT_H__
+#define __PROCESS_CONTEXT_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <algorithm>
+
+#include "Property.h"
+#include "core/logging/Logger.h"
+#include "ProcessorNode.h"
+#include "core/Repository.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+// ProcessContext Class
+class ProcessContext {
+ public:
+  // Constructor
+  /*!
+   * Create a new process context associated with the processor/controller service/state manager
+   */
+  ProcessContext(ProcessorNode &processor,
+                 std::shared_ptr<core::Repository> repo)
+      : processor_node_(processor) {
+    logger_ = logging::Logger::getLogger();
+    repo_ = repo;
+  }
+  // Destructor
+  virtual ~ProcessContext() {
+  }
+  // Get Processor associated with the Process Context
+  ProcessorNode &getProcessorNode() {
+    return processor_node_;
+  }
+  bool getProperty(std::string name, std::string &value) {
+    return processor_node_.getProperty(name, value);
+  }
+  // Sets the property value using the property's string name
+  bool setProperty(std::string name, std::string value) {
+    return processor_node_.setProperty(name, value);
+  }
+  // Sets the property value using the Property object
+  bool setProperty(Property prop, std::string value) {
+    return processor_node_.setProperty(prop, value);
+  }
+  // Whether the relationship is supported
+  bool isSupportedRelationship(Relationship relationship) {
+    return processor_node_.isSupportedRelationship(relationship);
+  }
+
+  // Check whether the relationship is auto terminated
+  bool isAutoTerminated(Relationship relationship) {
+    return processor_node_.isAutoTerminated(relationship);
+  }
+  // Get ProcessContext Maximum Concurrent Tasks
+  uint8_t getMaxConcurrentTasks(void) {
+    return processor_node_.getMaxConcurrentTasks();
+  }
+  // Yield based on the yield period
+  void yield() {
+    processor_node_.yield();
+  }
+
+  std::shared_ptr<core::Repository> getProvenanceRepository() {
+    return repo_;
+  }
+
+  // Prevent default copy constructor and assignment operation
+  // Only support pass by reference or pointer
+  ProcessContext(const ProcessContext &parent) = delete;
+  ProcessContext &operator=(const ProcessContext &parent) = delete;
+
+ private:
+
+  // repository shared pointer.
+  std::shared_ptr<core::Repository> repo_;
+  // Processor
+  ProcessorNode processor_node_;
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/ProcessGroup.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
new file mode 100644
index 0000000..75bb0ba
--- /dev/null
+++ b/libminifi/include/core/ProcessGroup.h
@@ -0,0 +1,190 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROCESS_GROUP_H__
+#define __PROCESS_GROUP_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <algorithm>
+#include <set>
+
+#include "Processor.h"
+#include "Exception.h"
+#include "TimerDrivenSchedulingAgent.h"
+#include "EventDrivenSchedulingAgent.h"
+#include "core/logging/Logger.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+// Process Group Type
+enum ProcessGroupType {
+  ROOT_PROCESS_GROUP = 0,
+  REMOTE_PROCESS_GROUP,
+  MAX_PROCESS_GROUP_TYPE
+};
+
+// ProcessGroup Class
+class ProcessGroup {
+ public:
+  // Constructor
+  /*!
+   * Create a new process group
+   */
+  ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid = NULL,
+               ProcessGroup *parent = NULL);
+  // Destructor
+  virtual ~ProcessGroup();
+  // Set Processor Name
+  void setName(std::string name) {
+    name_ = name;
+  }
+  // Get Process Name
+  std::string getName(void) {
+    return (name_);
+  }
+  // Set URL
+  void setURL(std::string url) {
+    url_ = url;
+  }
+  // Get URL
+  std::string getURL(void) {
+    return (url_);
+  }
+  // SetTransmitting
+  void setTransmitting(bool val) {
+    transmitting_ = val;
+  }
+  // Get Transmitting
+  bool getTransmitting() {
+    return transmitting_;
+  }
+  // setTimeOut
+  void setTimeOut(uint64_t time) {
+    timeOut_ = time;
+  }
+  uint64_t getTimeOut() {
+    return timeOut_;
+  }
+  // Set Processor yield period in MilliSecond
+  void setYieldPeriodMsec(uint64_t period) {
+    yield_period_msec_ = period;
+  }
+  // Get Processor yield period in MilliSecond
+  uint64_t getYieldPeriodMsec(void) {
+    return (yield_period_msec_);
+  }
+  // Set UUID
+  void setUUID(uuid_t uuid) {
+    uuid_copy(uuid_, uuid);
+  }
+  // Get UUID
+  bool getUUID(uuid_t uuid) {
+    if (uuid) {
+      uuid_copy(uuid, uuid_);
+      return true;
+    } else
+      return false;
+  }
+  // Start Processing
+  void startProcessing(TimerDrivenSchedulingAgent *timeScheduler,
+                       EventDrivenSchedulingAgent *eventScheduler);
+  // Stop Processing
+  void stopProcessing(TimerDrivenSchedulingAgent *timeScheduler,
+                      EventDrivenSchedulingAgent *eventScheduler);
+  // Whether it is root process group
+  bool isRootProcessGroup();
+  // set parent process group
+  void setParent(ProcessGroup *parent) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    parent_process_group_ = parent;
+  }
+  // get parent process group
+  ProcessGroup *getParent(void) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return parent_process_group_;
+  }
+  // Add processor
+  void addProcessor(std::shared_ptr<Processor> processor);
+  // Remove processor
+  void removeProcessor(std::shared_ptr<Processor> processor);
+  // Add child processor group
+  void addProcessGroup(ProcessGroup *child);
+  // Remove child processor group
+  void removeProcessGroup(ProcessGroup *child);
+  // ! Add connections
+  void addConnection(std::shared_ptr<Connection> connection);
+  // findProcessor based on UUID
+  std::shared_ptr<Processor> findProcessor(uuid_t uuid);
+  // findProcessor based on name
+  std::shared_ptr<Processor> findProcessor(const std::string &processorName);
+  // removeConnection
+  void removeConnection(std::shared_ptr<Connection> connection);
+  // update property value
+  void updatePropertyValue(std::string processorName, std::string propertyName,
+                           std::string propertyValue);
+
+  void getConnections(
+      std::map<std::string, std::shared_ptr<Connection>> &connectionMap);
+
+ protected:
+  // A global unique identifier
+  uuid_t uuid_;
+  // Processor Group Name
+  std::string name_;
+  // Process Group Type
+  ProcessGroupType type_;
+  // Processors (ProcessNode) inside this process group which include Input/Output Port, Remote Process Group input/Output port
+  std::set<std::shared_ptr<Processor> > processors_;
+  std::set<ProcessGroup *> child_process_groups_;
+  // Connections between the processor inside the group;
+  std::set<std::shared_ptr<Connection> > connections_;
+  // Parent Process Group
+  ProcessGroup* parent_process_group_;
+  // Yield Period in Milliseconds
+  std::atomic<uint64_t> yield_period_msec_;
+  std::atomic<uint64_t> timeOut_;
+  // URL
+  std::string url_;
+  // Transmitting
+  std::atomic<bool> transmitting_;
+
+ private:
+
+  // Mutex for protection
+  std::mutex mutex_;
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+  // Prevent default copy constructor and assignment operation
+  // Only support pass by reference or pointer
+  ProcessGroup(const ProcessGroup &parent);
+  ProcessGroup &operator=(const ProcessGroup &parent);
+};
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/ProcessSession.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
new file mode 100644
index 0000000..b516817
--- /dev/null
+++ b/libminifi/include/core/ProcessSession.h
@@ -0,0 +1,167 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROCESS_SESSION_H__
+#define __PROCESS_SESSION_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <algorithm>
+#include <set>
+
+#include "ProcessContext.h"
+#include "FlowFileRecord.h"
+#include "Exception.h"
+#include "core/logging/Logger.h"
+#include "FlowFile.h"
+#include "provenance/Provenance.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+// ProcessSession Class
+class ProcessSession {
+ public:
+  // Constructor
+  /*!
+   * Create a new process session
+   */
+  ProcessSession(ProcessContext *processContext = NULL)
+      : process_context_(processContext) {
+    logger_ = logging::Logger::getLogger();
+    logger_->log_trace("ProcessSession created for %s",
+                       process_context_->getProcessorNode().getName().c_str());
+    auto repo = processContext->getProvenanceRepository();
+    provenance_report_ = new provenance::ProvenanceReporter(
+        repo, process_context_->getProcessorNode().getUUIDStr(),
+        process_context_->getProcessorNode().getName());
+  }
+
+// Destructor
+  virtual ~ProcessSession() {
+    if (provenance_report_)
+      delete provenance_report_;
+  }
+// Commit the session
+  void commit();
+// Roll Back the session
+  void rollback();
+// Get Provenance Report
+  provenance::ProvenanceReporter *getProvenanceReporter() {
+    return provenance_report_;
+  }
+//
+// Get the FlowFile from the highest priority queue
+  std::shared_ptr<core::FlowFile> get();
+// Create a new UUID FlowFile with no content resource claim and without parent
+  std::shared_ptr<core::FlowFile> create();
+// Create a new UUID FlowFile with no content resource claim and inherit all attributes from parent
+  std::shared_ptr<core::FlowFile> create(std::shared_ptr<core::FlowFile> &parent);
+// Clone a new UUID FlowFile from parent both for content resource claim and attributes
+  std::shared_ptr<core::FlowFile> clone(
+      std::shared_ptr<core::FlowFile> &parent);
+// Clone a new UUID FlowFile from parent for attributes and sub set of parent content resource claim
+  std::shared_ptr<core::FlowFile> clone(std::shared_ptr<core::FlowFile> &parent,
+                                        long offset, long size);
+// Duplicate a FlowFile with the same UUID and all attributes and content resource claim for the roll back of the session
+  std::shared_ptr<core::FlowFile> duplicate(
+      std::shared_ptr<core::FlowFile> &original);
+// Transfer the FlowFile to the relationship
+  void transfer(std::shared_ptr<core::FlowFile> &flow,
+                Relationship relationship);
+  void transfer(std::shared_ptr<core::FlowFile> &&flow,
+                Relationship relationship);
+// Put Attribute
+  void putAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key,
+                    std::string value);
+  void putAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key,
+                    std::string value);
+// Remove Attribute
+  void removeAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key);
+  void removeAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key);
+// Remove Flow File
+  void remove(std::shared_ptr<core::FlowFile> &flow);
+  void remove(std::shared_ptr<core::FlowFile> &&flow);
+// Execute the given read callback against the content
+  void read(std::shared_ptr<core::FlowFile> &flow,
+            InputStreamCallback *callback);
+  void read(std::shared_ptr<core::FlowFile> &&flow,
+            InputStreamCallback *callback);
+// Execute the given write callback against the content
+  void write(std::shared_ptr<core::FlowFile> &flow,
+             OutputStreamCallback *callback);
+  void write(std::shared_ptr<core::FlowFile> &&flow,
+             OutputStreamCallback *callback);
+// Execute the given write/append callback against the content
+  void append(std::shared_ptr<core::FlowFile> &flow,
+              OutputStreamCallback *callback);
+  void append(std::shared_ptr<core::FlowFile> &&flow,
+              OutputStreamCallback *callback);
+// Penalize the flow
+  void penalize(std::shared_ptr<core::FlowFile> &flow);
+  void penalize(std::shared_ptr<core::FlowFile> &&flow);
+// Import the existed file into the flow
+  void import(std::string source, std::shared_ptr<core::FlowFile> &flow,
+              bool keepSource = true, uint64_t offset = 0);
+  void import(std::string source, std::shared_ptr<core::FlowFile> &&flow,
+              bool keepSource = true, uint64_t offset = 0);
+
+// Prevent default copy constructor and assignment operation
+// Only support pass by reference or pointer
+  ProcessSession(const ProcessSession &parent) = delete;
+  ProcessSession &operator=(const ProcessSession &parent) = delete;
+
+ protected:
+// FlowFiles being modified by current process session
+  std::map<std::string, std::shared_ptr<core::FlowFile> > _updatedFlowFiles;
+// Copy of the original FlowFiles being modified by current process session as above
+  std::map<std::string, std::shared_ptr<core::FlowFile> > _originalFlowFiles;
+// FlowFiles being added by current process session
+  std::map<std::string, std::shared_ptr<core::FlowFile> > _addedFlowFiles;
+// FlowFiles being deleted by current process session
+  std::map<std::string, std::shared_ptr<core::FlowFile> > _deletedFlowFiles;
+// FlowFiles being transfered to the relationship
+  std::map<std::string, Relationship> _transferRelationship;
+// FlowFiles being cloned for multiple connections per relationship
+  std::map<std::string, std::shared_ptr<core::FlowFile> > _clonedFlowFiles;
+
+ private:
+// Clone the flow file during transfer to multiple connections for a relationship
+  std::shared_ptr<core::FlowFile> cloneDuringTransfer(
+      std::shared_ptr<core::FlowFile> &parent);
+// ProcessContext
+  ProcessContext *process_context_;
+// Logger
+  std::shared_ptr<logging::Logger> logger_;
+// Provenance Report
+  provenance::ProvenanceReporter *provenance_report_;
+
+}
+;
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/ProcessSessionFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessSessionFactory.h b/libminifi/include/core/ProcessSessionFactory.h
new file mode 100644
index 0000000..e0ebe18
--- /dev/null
+++ b/libminifi/include/core/ProcessSessionFactory.h
@@ -0,0 +1,64 @@
+/**
+ * @file ProcessSessionFactory.h
+ * ProcessSessionFactory class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROCESS_SESSION_FACTORY_H__
+#define __PROCESS_SESSION_FACTORY_H__
+
+#include <memory>
+
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+// ProcessSessionFactory Class
+class ProcessSessionFactory {
+ public:
+  // Constructor
+  /*!
+   * Create a new process session factory
+   */
+  explicit ProcessSessionFactory(ProcessContext *processContext)
+      : process_context_(processContext) {
+  }
+
+  // Create the session
+  std::unique_ptr<ProcessSession> createSession();
+
+  // Prevent default copy constructor and assignment operation
+  // Only support pass by reference or pointer
+  ProcessSessionFactory(const ProcessSessionFactory &parent) = delete;
+  ProcessSessionFactory &operator=(const ProcessSessionFactory &parent) = delete;
+
+ private:
+  // ProcessContext
+  ProcessContext *process_context_;
+
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/Processor.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
new file mode 100644
index 0000000..fd0411f
--- /dev/null
+++ b/libminifi/include/core/Processor.h
@@ -0,0 +1,270 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROCESSOR_H__
+#define __PROCESSOR_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <memory>
+#include <condition_variable>
+#include <atomic>
+#include <algorithm>
+#include <set>
+#include <chrono>
+#include <functional>
+
+#include "Connectable.h"
+#include "ConfigurableComponent.h"
+#include "Property.h"
+#include "utils/TimeUtil.h"
+#include "Relationship.h"
+#include "Connection.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+#include "ProcessSessionFactory.h"
+#include "Scheduling.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+// Minimum scheduling period in Nano Second
+#define MINIMUM_SCHEDULING_NANOS 30000
+
+// Default yield period in second
+#define DEFAULT_YIELD_PERIOD_SECONDS 1
+
+// Default penalization period in second
+#define DEFAULT_PENALIZATION_PERIOD_SECONDS 30
+
+// Processor Class
+class Processor : public Connectable, public ConfigurableComponent,
+    public std::enable_shared_from_this<Processor> {
+
+ public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  Processor(std::string name, uuid_t uuid = NULL);
+  // Destructor
+  virtual ~Processor() {
+  }
+
+  bool isRunning();
+  // Set Processor Scheduled State
+  void setScheduledState(ScheduledState state);
+  // Get Processor Scheduled State
+  ScheduledState getScheduledState(void) {
+    return state_;
+  }
+  // Set Processor Scheduling Strategy
+  void setSchedulingStrategy(SchedulingStrategy strategy) {
+    strategy_ = strategy;
+  }
+  // Get Processor Scheduling Strategy
+  SchedulingStrategy getSchedulingStrategy(void) {
+    return strategy_;
+  }
+  // Set Processor Loss Tolerant
+  void setlossTolerant(bool lossTolerant) {
+    loss_tolerant_ = lossTolerant;
+  }
+  // Get Processor Loss Tolerant
+  bool getlossTolerant(void) {
+    return loss_tolerant_;
+  }
+  // Set Processor Scheduling Period in Nano Second
+  void setSchedulingPeriodNano(uint64_t period) {
+    uint64_t minPeriod = MINIMUM_SCHEDULING_NANOS;
+    scheduling_period_nano_ = std::max(period, minPeriod);
+  }
+  // Get Processor Scheduling Period in Nano Second
+  uint64_t getSchedulingPeriodNano(void) {
+    return scheduling_period_nano_;
+  }
+  // Set Processor Run Duration in Nano Second
+  void setRunDurationNano(uint64_t period) {
+    run_durantion_nano_ = period;
+  }
+  // Get Processor Run Duration in Nano Second
+  uint64_t getRunDurationNano(void) {
+    return (run_durantion_nano_);
+  }
+  // Set Processor yield period in MilliSecond
+  void setYieldPeriodMsec(uint64_t period) {
+    yield_period_msec_ = period;
+  }
+  // Get Processor yield period in MilliSecond
+  uint64_t getYieldPeriodMsec(void) {
+    return (yield_period_msec_);
+  }
+  // Set Processor penalization period in MilliSecond
+  void setPenalizationPeriodMsec(uint64_t period) {
+    _penalizationPeriodMsec = period;
+  }
+  
+  
+  // Set Processor Maximum Concurrent Tasks
+  void setMaxConcurrentTasks(uint8_t tasks) {
+    max_concurrent_tasks_ = tasks;
+  }
+  // Get Processor Maximum Concurrent Tasks
+  uint8_t getMaxConcurrentTasks(void) {
+    return (max_concurrent_tasks_);
+  }
+  // Set Trigger when empty
+  void setTriggerWhenEmpty(bool value) {
+    _triggerWhenEmpty = value;
+  }
+  // Get Trigger when empty
+  bool getTriggerWhenEmpty(void) {
+    return (_triggerWhenEmpty);
+  }
+  // Get Active Task Counts
+  uint8_t getActiveTasks(void) {
+    return (active_tasks_);
+  }
+  // Increment Active Task Counts
+  void incrementActiveTasks(void) {
+    active_tasks_++;
+  }
+  // decrement Active Task Counts
+  void decrementActiveTask(void) {
+    active_tasks_--;
+  }
+  void clearActiveTask(void) {
+    active_tasks_ = 0;
+  }
+  // Yield based on the yield period
+  void yield() {
+    yield_expiration_ = (getTimeMillis() + yield_period_msec_);
+  }
+  // Yield based on the input time
+  void yield(uint64_t time) {
+    yield_expiration_ = (getTimeMillis() + time);
+  }
+  // whether need be to yield
+  bool isYield() {
+    if (yield_expiration_ > 0)
+      return (yield_expiration_ >= getTimeMillis());
+    else
+      return false;
+  }
+  // clear yield expiration
+  void clearYield() {
+    yield_expiration_ = 0;
+  }
+  // get yield time
+  uint64_t getYieldTime() {
+    uint64_t curTime = getTimeMillis();
+    if (yield_expiration_ > curTime)
+      return (yield_expiration_ - curTime);
+    else
+      return 0;;
+  }
+  // Whether flow file queued in incoming connection
+  bool flowFilesQueued();
+  // Whether flow file queue full in any of the outgoin connection
+  bool flowFilesOutGoingFull();
+
+  // Get outgoing connections based on relationship name
+  std::set<std::shared_ptr<Connection> > getOutGoingConnections(
+      std::string relationship);
+  // Add connection
+  bool addConnection(std::shared_ptr<Connectable> connection);
+  // Remove connection
+  void removeConnection(std::shared_ptr<Connectable> connection);
+  // Get the UUID as string
+  std::string getUUIDStr() {
+    return uuidStr_;
+  }
+  // Get the Next RoundRobin incoming connection
+  std::shared_ptr<Connection> getNextIncomingConnection();
+  // On Trigger
+  void onTrigger(ProcessContext *context,
+                 ProcessSessionFactory *sessionFactory);
+
+  virtual bool canEdit() {
+    return !isRunning();
+  }
+
+ public:
+
+
+  // OnTrigger method, implemented by NiFi Processor Designer
+  virtual void onTrigger(ProcessContext *context, ProcessSession *session) = 0;
+  // Initialize, overridden by NiFi Process Designer
+  virtual void initialize() {
+  }
+  // Scheduled event hook, overridden by NiFi Process Designer
+  virtual void onSchedule(ProcessContext *context,
+                          ProcessSessionFactory *sessionFactory) {
+  }
+
+ protected:
+
+  // Processor state
+  std::atomic<ScheduledState> state_;
+
+  // lossTolerant
+  std::atomic<bool> loss_tolerant_;
+  // SchedulePeriod in Nano Seconds
+  std::atomic<uint64_t> scheduling_period_nano_;
+  // Run Duration in Nano Seconds
+  std::atomic<uint64_t> run_durantion_nano_;
+  // Yield Period in Milliseconds
+  std::atomic<uint64_t> yield_period_msec_;
+  
+  // Active Tasks
+  std::atomic<uint8_t> active_tasks_;
+  // Trigger the Processor even if the incoming connection is empty
+  std::atomic<bool> _triggerWhenEmpty;
+
+private:
+
+  // Mutex for protection
+  std::mutex mutex_;
+  // Yield Expiration
+  std::atomic<uint64_t> yield_expiration_;
+  
+
+  // Check all incoming connections for work
+  bool isWorkAvailable();
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+  // Prevent default copy constructor and assignment operation
+  // Only support pass by reference or pointer
+  Processor(const Processor &parent);
+  Processor &operator=(const Processor &parent);
+
+};
+
+}
+/* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/ProcessorConfig.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessorConfig.h b/libminifi/include/core/ProcessorConfig.h
new file mode 100644
index 0000000..6b4a00a
--- /dev/null
+++ b/libminifi/include/core/ProcessorConfig.h
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_
+#define LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_
+
+#include "core/core.h"
+#include "core/Property.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+
+struct ProcessorConfig {
+  std::string id;
+  std::string name;
+  std::string javaClass;
+  std::string maxConcurrentTasks;
+  std::string schedulingStrategy;
+  std::string schedulingPeriod;
+  std::string penalizationPeriod;
+  std::string yieldPeriod;
+  std::string runDurationNanos;
+  std::vector<std::string> autoTerminatedRelationships;
+  std::vector<core::Property> properties;
+};
+
+
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+
+#endif /* LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/ProcessorNode.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessorNode.h b/libminifi/include/core/ProcessorNode.h
new file mode 100644
index 0000000..8836f62
--- /dev/null
+++ b/libminifi/include/core/ProcessorNode.h
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBMINIFI_INCLUDE_PROCESSOR_PROCESSORNODE_H_
+#define LIBMINIFI_INCLUDE_PROCESSOR_PROCESSORNODE_H_
+
+#include "ConfigurableComponent.h"
+#include "Connectable.h"
+#include "Property.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+/**
+ * Processor node functions as a pass through to the implementing Connectables
+ * ProcessorNode can be used by itself or with a pass through object, in which case
+ * we need to function as a passthrough or not.
+ */
+class ProcessorNode : public ConfigurableComponent, public Connectable {
+ public:
+  explicit ProcessorNode(const std::shared_ptr<Connectable> processor);
+
+  explicit ProcessorNode(const ProcessorNode &other);
+
+  /**
+   * Get property using the provided name.
+   * @param name property name.
+   * @param value value passed in by reference
+   * @return result of getting property.
+   */
+  std::shared_ptr<Connectable> getProcessor() const {
+    return processor_;
+  }
+
+  void yield() {
+    processor_->yield();
+  }
+
+  /**
+   * Get property using the provided name.
+   * @param name property name.
+   * @param value value passed in by reference
+   * @return result of getting property.
+   */
+  bool getProperty(const std::string name, std::string &value) {
+    const std::shared_ptr<ConfigurableComponent> processor_cast =
+        std::dynamic_pointer_cast<ConfigurableComponent>(processor_);
+    if (nullptr != processor_cast)
+      return processor_cast->getProperty(name, value);
+    else {
+      return ConfigurableComponent::getProperty(name, value);
+    }
+  }
+  /**
+   * Sets the property using the provided name
+   * @param property name
+   * @param value property value.
+   * @return result of setting property.
+   */
+  bool setProperty(const std::string name, std::string value) {
+    const std::shared_ptr<ConfigurableComponent> processor_cast =
+        std::dynamic_pointer_cast<ConfigurableComponent>(processor_);
+    bool ret = ConfigurableComponent::setProperty(name, value);
+    if (nullptr != processor_cast)
+      ret = processor_cast->setProperty(name, value);
+
+    return ret;
+
+  }
+
+  /**
+   * Sets the property using the provided name
+   * @param property name
+   * @param value property value.
+   * @return whether property was set or not
+   */
+  bool setProperty(Property &prop, std::string value) {
+    const std::shared_ptr<ConfigurableComponent> processor_cast =
+        std::dynamic_pointer_cast<ConfigurableComponent>(processor_);
+    bool ret = ConfigurableComponent::setProperty(prop, value);
+    if (nullptr != processor_cast)
+      ret = processor_cast->setProperty(prop, value);
+
+    return ret;
+  }
+
+  /**
+   * Sets supported properties for the ConfigurableComponent
+   * @param supported properties
+   * @return result of set operation.
+   */
+  bool setSupportedProperties(std::set<Property> properties) {
+    const std::shared_ptr<ConfigurableComponent> processor_cast =
+        std::dynamic_pointer_cast<ConfigurableComponent>(processor_);
+    bool ret = ConfigurableComponent::setSupportedProperties(properties);
+    if (nullptr != processor_cast)
+      ret = processor_cast->setSupportedProperties(properties);
+
+    return ret;
+  }
+  /**
+   * Sets supported properties for the ConfigurableComponent
+   * @param supported properties
+   * @return result of set operation.
+   */
+
+  bool setAutoTerminatedRelationships(std::set<Relationship> relationships) {
+    return processor_->setAutoTerminatedRelationships(relationships);
+  }
+
+  bool isAutoTerminated(Relationship relationship) {
+    return processor_->isAutoTerminated(relationship);
+  }
+
+  bool setSupportedRelationships(std::set<Relationship> relationships) {
+    return processor_->setSupportedRelationships(relationships);
+  }
+
+  bool isSupportedRelationship(Relationship relationship) {
+    return processor_->isSupportedRelationship(relationship);
+  }
+
+  /**
+   * Set name.
+   * @param name
+   */
+  void setName(const std::string name) {
+    Connectable::setName(name);
+    processor_->setName(name);
+  }
+
+  /**
+   * Set UUID in this instance
+   * @param uuid uuid to apply to the internal representation.
+   */
+  void setUUID(uuid_t uuid) {
+    Connectable::setUUID(uuid);
+    processor_->setUUID(uuid);
+  }
+
+// Get Processor penalization period in MilliSecond
+  uint64_t getPenalizationPeriodMsec(void) {
+    return processor_->getPenalizationPeriodMsec();
+  }
+
+  /**
+   * Get outgoing connection based on relationship
+   * @return set of outgoing connections.
+   */
+  std::set<std::shared_ptr<Connectable>> getOutGoingConnections(
+      std::string relationship) {
+    return processor_->getOutGoingConnections(relationship);
+  }
+
+  /**
+   * Get next incoming connection
+   * @return next incoming connection
+   */
+  std::shared_ptr<Connectable> getNextIncomingConnection() {
+    return processor_->getNextIncomingConnection();
+  }
+
+  /**
+   * @return true if incoming connections > 0
+   */
+  bool hasIncomingConnections() {
+    return processor_->hasIncomingConnections();
+  }
+
+  /**
+   * Returns the UUID through the provided object.
+   * @param uuid uuid struct to which we will copy the memory
+   * @return success of request
+   */
+  bool getUUID(uuid_t uuid) {
+    return processor_->getUUID(uuid);
+  }
+
+  unsigned const char *getUUID() {
+    return processor_->getUUID();
+  }
+  /**
+   * Return the UUID string
+   * @param constant reference to the UUID str
+   */
+  const std::string & getUUIDStr() const {
+    return processor_->getUUIDStr();
+  }
+
+// Get Process Name
+  std::string getName() const {
+    return processor_->getName();
+  }
+
+  uint8_t getMaxConcurrentTasks() {
+    return processor_->getMaxConcurrentTasks();
+  }
+
+  void setMaxConcurrentTasks(const uint8_t tasks) {
+    processor_->setMaxConcurrentTasks(tasks);
+  }
+
+  virtual bool isRunning();
+
+  virtual bool isWorkAvailable();
+
+  virtual ~ProcessorNode();
+
+ protected:
+
+  virtual bool canEdit() {
+    return !processor_->isRunning();
+  }
+
+  /**
+   * internal connectable.
+   */
+  std::shared_ptr<Connectable> processor_;
+
+}
+;
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_PROCESSOR_PROCESSORNODE_H_ */