You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/10/13 15:07:24 UTC

[06/18] nifi-minifi-cpp git commit: MINIFI-34 Establishing CMake build system to provide build functionality equivalent to pre-existing Makefile.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/ListenSyslog.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ListenSyslog.cpp b/libminifi/src/ListenSyslog.cpp
new file mode 100644
index 0000000..ace37d7
--- /dev/null
+++ b/libminifi/src/ListenSyslog.cpp
@@ -0,0 +1,342 @@
+/**
+ * @file ListenSyslog.cpp
+ * ListenSyslog class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <queue>
+#include <stdio.h>
+#include <string>
+#include "TimeUtil.h"
+#include "ListenSyslog.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+//! Name under which this processor type is known
+const std::string ListenSyslog::ProcessorName("ListenSyslog");
+
+//! Supported properties; the arguments appear to be (name, description, default value)
+Property ListenSyslog::RecvBufSize(
+		"Receive Buffer Size",
+		"The size of each buffer used to receive Syslog messages.",
+		"65507 B");
+Property ListenSyslog::MaxSocketBufSize(
+		"Max Size of Socket Buffer",
+		"The maximum size of the socket buffer that should be used.",
+		"1 MB");
+Property ListenSyslog::MaxConnections(
+		"Max Number of TCP Connections",
+		"The maximum number of concurrent connections to accept Syslog messages in TCP mode.",
+		"2");
+Property ListenSyslog::MaxBatchSize(
+		"Max Batch Size",
+		"The maximum number of Syslog events to add to a single FlowFile.",
+		"1");
+Property ListenSyslog::MessageDelimiter(
+		"Message Delimiter",
+		"Specifies the delimiter to place between Syslog messages when multiple messages are bundled together (see <Max Batch Size> property).",
+		"\n");
+// NOTE(review): the description below looks truncated ("will only."); confirm the intended wording
+Property ListenSyslog::ParseMessages(
+		"Parse Messages",
+		"Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only.",
+		"false");
+Property ListenSyslog::Protocol(
+		"Protocol",
+		"The protocol for Syslog communication.",
+		"UDP");
+Property ListenSyslog::Port(
+		"Port",
+		"The port for Syslog communication.",
+		"514");
+
+//! Supported relationships
+Relationship ListenSyslog::Success(
+		"success",
+		"All files are routed to success");
+Relationship ListenSyslog::Invalid(
+		"invalid",
+		"SysLog message format invalid");
+
+/**
+ * Registers the properties and relationships supported by ListenSyslog
+ * by handing them to setSupportedProperties()/setSupportedRelationships().
+ */
+void ListenSyslog::initialize()
+{
+	//! Supported properties
+	std::set<Property> supportedProperties = {
+		RecvBufSize,
+		MaxSocketBufSize,
+		MaxConnections,
+		MaxBatchSize,
+		MessageDelimiter,
+		ParseMessages,
+		Protocol,
+		Port
+	};
+	setSupportedProperties(supportedProperties);
+
+	//! Supported relationships
+	std::set<Relationship> supportedRelationships = { Success, Invalid };
+	setSupportedRelationships(supportedRelationships);
+}
+
+/**
+ * Starts the detached socket service thread the first time it is called.
+ * Later calls are no-ops because _thread is already set.
+ */
+void ListenSyslog::startSocketThread()
+{
+	if (_thread == NULL)
+	{
+		_logger->log_info("ListenSysLog Socket Thread Start");
+		// Raise the run flag before the thread exists so runThread() enters its loop
+		_serverTheadRunning = true;
+		_thread = new std::thread(run, this);
+		_thread->detach();
+	}
+}
+
+/**
+ * Thread entry point handed to std::thread by startSocketThread();
+ * simply forwards to the instance's runThread().
+ */
+void ListenSyslog::run(ListenSyslog *process)
+{
+	ListenSyslog &listener = *process;
+	listener.runThread();
+}
+
+/**
+ * Socket service loop, executed on the thread created by startSocketThread().
+ *
+ * Each iteration:
+ *  1. tears down all sockets when _resetServerSocket has been raised,
+ *  2. (re)creates and binds the server socket when none is open,
+ *  3. waits up to 100 msec in select() on the server and client sockets,
+ *  4. accepts a TCP connection or reads one UDP datagram from the server socket,
+ *  5. reads one line from every client socket that is ready.
+ *
+ * Received data is copied into a heap buffer and queued with putEvent(),
+ * as long as the queued byte count stays within _recvBufSize.
+ * The loop ends when _serverTheadRunning is cleared, or when socket
+ * creation, bind, listen or select fails.
+ */
+void ListenSyslog::runThread()
+{
+	while (_serverTheadRunning)
+	{
+		if (_resetServerSocket)
+		{
+			_resetServerSocket = false;
+			// need to reset the socket: drop every client first, then the server socket
+			std::vector<int>::iterator it;
+			for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it)
+			{
+				int clientSocket = *it;
+				close(clientSocket);
+			}
+			_clientSockets.clear();
+			if (_serverSocket > 0)
+			{
+				close(_serverSocket);
+				_serverSocket = 0;
+			}
+		}
+
+		if (_serverSocket <= 0)
+		{
+			// No server socket yet (first pass or after a reset): create and bind one
+			uint16_t portno = _port;
+			struct sockaddr_in serv_addr;
+			int sockfd;
+			if (_protocol == "TCP")
+				sockfd = socket(AF_INET, SOCK_STREAM, 0);
+			else
+				sockfd = socket(AF_INET, SOCK_DGRAM, 0);
+			if (sockfd < 0)
+			{
+				_logger->log_error("ListenSysLog Server socket creation failed");
+				break;
+			}
+			bzero((char *) &serv_addr, sizeof(serv_addr));
+			serv_addr.sin_family = AF_INET;
+			serv_addr.sin_addr.s_addr = INADDR_ANY;
+			serv_addr.sin_port = htons(portno);
+			if (bind(sockfd, (struct sockaddr *) &serv_addr,
+					sizeof(serv_addr)) < 0)
+			{
+				_logger->log_error("ListenSysLog Server socket bind failed");
+				// Release the descriptor; it is not stored anywhere else
+				close(sockfd);
+				break;
+			}
+			if (_protocol == "TCP" && listen(sockfd, 5) < 0)
+			{
+				_logger->log_error("ListenSysLog Server socket listen failed");
+				close(sockfd);
+				break;
+			}
+			_serverSocket = sockfd;
+			_logger->log_info("ListenSysLog Server socket %d bind OK to port %d", _serverSocket, portno);
+		}
+
+		// Rebuild the read set: server socket plus every connected client
+		FD_ZERO(&_readfds);
+		FD_SET(_serverSocket, &_readfds);
+		_maxFds = _serverSocket;
+		std::vector<int>::iterator it;
+		for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it)
+		{
+			int clientSocket = *it;
+			if (clientSocket >= _maxFds)
+				_maxFds = clientSocket;
+			FD_SET(clientSocket, &_readfds);
+		}
+		fd_set fds;
+		struct timeval tv;
+		int retval;
+		// select() modifies the set it is given, so work on a copy
+		fds = _readfds;
+		tv.tv_sec = 0;
+		// 100 msec
+		tv.tv_usec = 100000;
+		retval = select(_maxFds+1, &fds, NULL, NULL, &tv);
+		if (retval < 0)
+			break;
+		if (retval == 0)
+			continue;
+		if (FD_ISSET(_serverSocket, &fds))
+		{
+			// server socket, either we have UDP datagram or TCP connection request
+			if (_protocol == "TCP")
+			{
+				socklen_t clilen;
+				struct sockaddr_in cli_addr;
+				clilen = sizeof(cli_addr);
+				int newsockfd = accept(_serverSocket,
+						(struct sockaddr *) &cli_addr,
+						&clilen);
+				if (newsockfd > 0)
+				{
+					if (_clientSockets.size() < _maxConnections)
+					{
+						_clientSockets.push_back(newsockfd);
+						_logger->log_info("ListenSysLog new client socket %d connection", newsockfd);
+						continue;
+					}
+					else
+					{
+						// Connection limit reached: refuse the new client
+						close(newsockfd);
+					}
+				}
+			}
+			else
+			{
+				socklen_t clilen;
+				struct sockaddr_in cli_addr;
+				clilen = sizeof(cli_addr);
+				int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0,
+						(struct sockaddr *)&cli_addr, &clilen);
+				if (recvlen > 0 && (recvlen + getEventQueueByteSize()) <= _recvBufSize)
+				{
+					// The payload buffer is released by the consumer in onTrigger()
+					uint8_t *payload = new uint8_t[recvlen];
+					memcpy(payload, _buffer, recvlen);
+					putEvent(payload, recvlen);
+				}
+			}
+		}
+		it = _clientSockets.begin();
+		while (it != _clientSockets.end())
+		{
+			int clientSocket = *it;
+			if (!FD_ISSET(clientSocket, &fds))
+			{
+				// Nothing pending on this connection; the iterator must still
+				// advance, otherwise this loop would never terminate
+				++it;
+				continue;
+			}
+			int recvlen = readline(clientSocket, (char *)_buffer, sizeof(_buffer));
+			if (recvlen <= 0)
+			{
+				// Read error, oversized line or peer closed: drop the connection
+				close(clientSocket);
+				_logger->log_info("ListenSysLog client socket %d close", clientSocket);
+				it = _clientSockets.erase(it);
+				continue;
+			}
+			if ((recvlen + getEventQueueByteSize()) <= _recvBufSize)
+			{
+				uint8_t *payload = new uint8_t[recvlen];
+				memcpy(payload, _buffer, recvlen);
+				putEvent(payload, recvlen);
+			}
+			++it;
+		}
+	}
+	return;
+}
+
+
+/**
+ * Reads one newline-terminated line from a socket into the caller's buffer.
+ *
+ * Data is pulled from the socket in chunks of up to 2048 bytes into a static
+ * staging buffer and then handed out one character at a time.
+ *
+ * @param fd      socket descriptor to read from
+ * @param bufptr  destination buffer
+ * @param len     capacity of the destination buffer in bytes
+ * @return  number of bytes stored, counting the line's own newline plus one
+ *          extra '\n' byte written after it; 0 when recv() reports that the
+ *          peer closed the connection; -1 on a recv() error, when len is 0,
+ *          or when no newline was found within len - 1 bytes
+ *
+ * NOTE(review): bp/cnt/b are function-level statics shared by every fd, so
+ * bytes staged while reading one connection can be returned for another;
+ * the function is also not reentrant. Confirm whether per-connection state
+ * is needed.
+ */
+int ListenSyslog::readline( int fd, char *bufptr, size_t len )
+{
+	char *bufx = bufptr;
+	static char *bp;
+	static int cnt = 0;
+	static char b[ 2048 ];
+	char c;
+
+	// len is unsigned: a zero length would wrap around in the --len below
+	// and let the loop write far past the end of the caller's buffer
+	if ( len == 0 )
+		return -1;
+
+	while ( --len > 0 )
+    {
+      if ( --cnt <= 0 )
+      {
+    	  // Staging buffer exhausted: refill it from the socket
+    	  cnt = recv( fd, b, sizeof( b ), 0 );
+    	  if ( cnt < 0 )
+    	  {
+    		  if ( errno == EINTR )
+    		  {
+    			  len++;		/* the while will decrement */
+    			  continue;
+    		  }
+    		  return -1;
+    	  }
+    	  if ( cnt == 0 )
+    		  return 0;
+    	  bp = b;
+      }
+      c = *bp++;
+      *bufptr++ = c;
+      if ( c == '\n' )
+      {
+    	  // One slot is always left because the loop stops at len - 1 bytes
+    	  *bufptr = '\n';
+    	  return bufptr - bufx + 1;
+      }
+    }
+	// Line did not fit into the destination buffer
+	return -1;
+}
+
+/**
+ * Scheduled entry point of the processor.
+ *
+ * Refreshes the cached configuration from the context, requests a server
+ * socket reset when the protocol or port changed, makes sure the socket
+ * thread is running, then drains up to _maxBatchSize queued events into a
+ * single FlowFile that is routed to Success.
+ *
+ * @param context  source of the property values; yielded when there is nothing to do
+ * @param session  session used to create, write and transfer the FlowFile
+ */
+void ListenSyslog::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+	std::string value;
+	bool needResetServerSocket = false;
+	if (context->getProperty(Protocol.getName(), value))
+	{
+		if (_protocol != value)
+			needResetServerSocket = true;
+		_protocol = value;
+	}
+	if (context->getProperty(RecvBufSize.getName(), value))
+	{
+		Property::StringToInt(value, _recvBufSize);
+	}
+	if (context->getProperty(MaxSocketBufSize.getName(), value))
+	{
+		Property::StringToInt(value, _maxSocketBufSize);
+	}
+	if (context->getProperty(MaxConnections.getName(), value))
+	{
+		Property::StringToInt(value, _maxConnections);
+	}
+	if (context->getProperty(MessageDelimiter.getName(), value))
+	{
+		_messageDelimiter = value;
+	}
+	if (context->getProperty(ParseMessages.getName(), value))
+	{
+		Property::StringToBool(value, _parseMessages);
+	}
+	if (context->getProperty(Port.getName(), value))
+	{
+		int64_t oldPort = _port;
+		Property::StringToInt(value, _port);
+		if (_port != oldPort)
+			needResetServerSocket = true;
+	}
+	if (context->getProperty(MaxBatchSize.getName(), value))
+	{
+		Property::StringToInt(value, _maxBatchSize);
+	}
+
+	// The socket thread performs the actual reset when it sees this flag
+	if (needResetServerSocket)
+		_resetServerSocket = true;
+
+	startSocketThread();
+
+	// read from the event queue
+	if (isEventQueueEmpty())
+	{
+		context->yield();
+		return;
+	}
+
+	std::queue<SysLogEvent> eventQueue;
+	pollEvent(eventQueue, _maxBatchSize);
+	FlowFileRecord *flowFile = NULL;
+	while(!eventQueue.empty())
+	{
+		SysLogEvent event = eventQueue.front();
+		eventQueue.pop();
+		if (!flowFile)
+		{
+			// First event of the batch: it creates the FlowFile
+			flowFile = session->create();
+			if (!flowFile)
+			{
+				// The polled payloads are owned here; free them all before giving up
+				delete[] event.payload;
+				while (!eventQueue.empty())
+				{
+					delete[] eventQueue.front().payload;
+					eventQueue.pop();
+				}
+				return;
+			}
+			ListenSyslog::WriteCallback callback((char *)event.payload, event.len);
+			session->write(flowFile, &callback);
+		}
+		else
+		{
+			// Later events are appended to the same FlowFile
+			ListenSyslog::WriteCallback callback((char *)event.payload, event.len);
+			session->append(flowFile, &callback);
+		}
+		delete[] event.payload;
+	}
+
+	if (!flowFile)
+	{
+		// pollEvent() handed back nothing (e.g. a batch size of 0): no FlowFile to route
+		context->yield();
+		return;
+	}
+
+	flowFile->addAttribute("syslog.protocol", _protocol);
+	flowFile->addAttribute("syslog.port", std::to_string(_port));
+	session->transfer(flowFile, Success);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/LogAttribute.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/LogAttribute.cpp b/libminifi/src/LogAttribute.cpp
new file mode 100644
index 0000000..82130f8
--- /dev/null
+++ b/libminifi/src/LogAttribute.cpp
@@ -0,0 +1,158 @@
+/**
+ * @file LogAttribute.cpp
+ * LogAttribute class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <sstream>
+#include <string.h>
+#include <iostream>
+
+#include "TimeUtil.h"
+#include "LogAttribute.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+//! Name under which this processor type is known
+const std::string LogAttribute::ProcessorName("LogAttribute");
+
+//! Supported properties; the arguments appear to be (name, description, default value)
+Property LogAttribute::LogLevel(
+		"Log Level",
+		"The Log Level to use when logging the Attributes",
+		"info");
+Property LogAttribute::AttributesToLog(
+		"Attributes to Log",
+		"A comma-separated list of Attributes to Log. If not specified, all attributes will be logged.",
+		"");
+Property LogAttribute::AttributesToIgnore(
+		"Attributes to Ignore",
+		"A comma-separated list of Attributes to ignore. If not specified, no attributes will be ignored.",
+		"");
+Property LogAttribute::LogPayload(
+		"Log Payload",
+		"If true, the FlowFile's payload will be logged, in addition to its attributes; otherwise, just the Attributes will be logged.",
+		"false");
+Property LogAttribute::LogPrefix(
+		"Log prefix",
+		"Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.",
+		"");
+
+//! Supported relationships
+Relationship LogAttribute::Success(
+		"success",
+		"success operational on the flow record");
+
+/**
+ * Registers the properties and the single relationship supported by
+ * LogAttribute.
+ */
+void LogAttribute::initialize()
+{
+	//! Supported properties
+	std::set<Property> supportedProperties = {
+		LogLevel,
+		AttributesToLog,
+		AttributesToIgnore,
+		LogPayload,
+		LogPrefix
+	};
+	setSupportedProperties(supportedProperties);
+
+	//! Supported relationships
+	std::set<Relationship> supportedRelationships = { Success };
+	setSupportedRelationships(supportedRelationships);
+}
+
+/**
+ * Takes one FlowFile from the session, logs a textual dump of it and
+ * passes it on to Success.
+ *
+ * The dump lists the standard fields (UUID, dates, size, offset), every
+ * attribute, the content claim path when a claim exists and, when
+ * "Log Payload" is true and the FlowFile is at most 1 MiB, a hex dump of the
+ * content with 16 bytes per row. It is emitted at the configured
+ * "Log Level".
+ *
+ * @param context  source of the property values
+ * @param session  session the FlowFile is taken from and transferred through
+ */
+void LogAttribute::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+	std::string separator = "--------------------------------------------------";
+	LogAttrLevel logLevel = LogAttrLevelInfo;
+	bool includePayload = false;
+	std::ostringstream text;
+
+	FlowFileRecord *flow = session->get();
+	if (flow == NULL)
+		return;
+
+	// Pick up the configuration for this invocation
+	std::string propertyValue;
+	if (context->getProperty(LogLevel.getName(), propertyValue))
+		logLevelStringToEnum(propertyValue, logLevel);
+	if (context->getProperty(LogPrefix.getName(), propertyValue))
+		separator = "-----" + propertyValue + "-----";
+	if (context->getProperty(LogPayload.getName(), propertyValue))
+		Property::StringToBool(propertyValue, includePayload);
+
+	// Header and standard fields
+	text << "Logging for flow file " << "\n" << separator;
+	text << "\nStandard FlowFile Attributes";
+	text << "\n" << "UUID:" << flow->getUUIDStr();
+	text << "\n" << "EntryDate:" << getTimeStr(flow->getEntryDate());
+	text << "\n" << "lineageStartDate:" << getTimeStr(flow->getlineageStartDate());
+	text << "\n" << "Size:" << flow->getSize();
+	text << " Offset:" << flow->getOffset();
+
+	// Attribute map, one line per entry
+	text << "\nFlowFile Attributes Map Content";
+	std::map<std::string, std::string> attributes = flow->getAttributes();
+	for (const std::pair<const std::string, std::string> &entry : attributes)
+	{
+		text << "\n" << "key:" << entry.first << " value:" << entry.second;
+	}
+
+	// Content claim, when the FlowFile has one
+	text << "\nFlowFile Resource Claim Content";
+	ResourceClaim *claim = flow->getResourceClaim();
+	if (claim != NULL)
+		text << "\n" << "Content Claim:" << claim->getContentFullPath();
+
+	// Optional hex dump of the content, limited to FlowFiles of at most 1 MiB
+	if (includePayload && flow->getSize() <= 1024*1024)
+	{
+		text << "\n" << "Payload:" << "\n";
+		ReadCallback callback(flow->getSize());
+		session->read(flow, &callback);
+		for (unsigned int i = 0; i < callback._readSize; i++)
+		{
+			char hexByte[8];
+			sprintf(hexByte, "%02x ", (unsigned char) (callback._buffer[i]));
+			text << hexByte;
+			// Break the row after every 16th byte
+			if ((i + 1) % 16 == 0)
+				text << '\n';
+		}
+	}
+	text << "\n" << separator << std::ends;
+	std::string output = text.str();
+
+	// Emit the dump at the requested level
+	switch (logLevel)
+	{
+	case LogAttrLevelTrace:
+		_logger->log_trace("%s", output.c_str());
+		break;
+	case LogAttrLevelDebug:
+		_logger->log_debug("%s", output.c_str());
+		break;
+	case LogAttrLevelInfo:
+		_logger->log_info("%s", output.c_str());
+		break;
+	case LogAttrLevelWarn:
+		_logger->log_warn("%s", output.c_str());
+		break;
+	case LogAttrLevelError:
+		_logger->log_error("%s", output.c_str());
+		break;
+	default:
+		break;
+	}
+
+	// Test Import
+	/*
+	FlowFileRecord *importRecord = session->create();
+	session->import(claim->getContentFullPath(), importRecord);
+	session->transfer(importRecord, Success); */
+
+	// Transfer to the relationship
+	session->transfer(flow, Success);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/Logger.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Logger.cpp b/libminifi/src/Logger.cpp
new file mode 100644
index 0000000..984f609
--- /dev/null
+++ b/libminifi/src/Logger.cpp
@@ -0,0 +1,27 @@
+/**
+ * @file Logger.cpp
+ * Logger class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+
+#include "Logger.h"
+
+//! Definition of the static Logger pointer; starts out as NULL
+Logger *Logger::_logger = NULL;
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ProcessGroup.cpp b/libminifi/src/ProcessGroup.cpp
new file mode 100644
index 0000000..70ee9d7
--- /dev/null
+++ b/libminifi/src/ProcessGroup.cpp
@@ -0,0 +1,314 @@
+/**
+ * @file ProcessGroup.cpp
+ * ProcessGroup class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+
+#include "ProcessGroup.h"
+#include "Processor.h"
+
+/**
+ * Creates a process group.
+ *
+ * @param type    kind of group (compared against ROOT_PROCESS_GROUP elsewhere)
+ * @param name    group name, used in log messages
+ * @param uuid    identifier to copy; when null a fresh UUID is generated
+ * @param parent  enclosing process group
+ */
+ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, ProcessGroup *parent)
+: _name(name),
+  _type(type),
+  _parentProcessGroup(parent)
+{
+	if (uuid)
+		// Caller supplied an identifier: take it over
+		uuid_copy(_uuid, uuid);
+	else
+		// No identifier given: generate one for this group
+		uuid_generate(_uuid);
+
+	_transmitting = false;
+	_yieldPeriodMsec = 0;
+
+	_logger = Logger::getLogger();
+	_logger->log_info("ProcessGroup %s created", _name.c_str());
+}
+
+/**
+ * Destroys everything held by the group: connections (drained first),
+ * then child process groups, then processors.
+ */
+ProcessGroup::~ProcessGroup()
+{
+	for (Connection *connection : _connections)
+	{
+		connection->drain();
+		delete connection;
+	}
+
+	for (ProcessGroup *childGroup : _childProcessGroups)
+		delete childGroup;
+
+	for (Processor *processor : _processors)
+		delete processor;
+}
+
+/**
+ * @return true when this group's type is ROOT_PROCESS_GROUP
+ */
+bool ProcessGroup::isRootProcessGroup()
+{
+	std::lock_guard<std::mutex> lock(_mtx);
+	const bool isRoot = (_type == ROOT_PROCESS_GROUP);
+	return isRoot;
+}
+
+/**
+ * Adds a processor to this group; a processor that is already a member is
+ * left alone and nothing is logged.
+ */
+void ProcessGroup::addProcessor(Processor *processor)
+{
+	std::lock_guard<std::mutex> lock(_mtx);
+
+	// insert() reports through .second whether the element was newly added
+	const bool newlyAdded = _processors.insert(processor).second;
+	if (!newlyAdded)
+		return;
+
+	_logger->log_info("Add processor %s into process group %s",
+			processor->getName().c_str(), _name.c_str());
+}
+
+/**
+ * Removes a processor from this group; does nothing when the processor is
+ * not a member. The processor object itself is not deleted.
+ */
+void ProcessGroup::removeProcessor(Processor *processor)
+{
+	std::lock_guard<std::mutex> lock(_mtx);
+
+	// erase(key) returns the number of elements removed (0 when absent)
+	if (_processors.erase(processor) == 0)
+		return;
+
+	_logger->log_info("Remove processor %s from process group %s",
+			processor->getName().c_str(), _name.c_str());
+}
+
+/**
+ * Adds a child process group; a child that is already registered is left
+ * alone and nothing is logged.
+ */
+void ProcessGroup::addProcessGroup(ProcessGroup *child)
+{
+	std::lock_guard<std::mutex> lock(_mtx);
+
+	// insert() reports through .second whether the element was newly added
+	const bool newlyAdded = _childProcessGroups.insert(child).second;
+	if (!newlyAdded)
+		return;
+
+	_logger->log_info("Add child process group %s into process group %s",
+			child->getName().c_str(), _name.c_str());
+}
+
+/**
+ * Removes a child process group; does nothing when the child is not
+ * registered. The child object itself is not deleted.
+ */
+void ProcessGroup::removeProcessGroup(ProcessGroup *child)
+{
+	std::lock_guard<std::mutex> lock(_mtx);
+
+	// erase(key) returns the number of elements removed (0 when absent)
+	if (_childProcessGroups.erase(child) == 0)
+		return;
+
+	_logger->log_info("Remove child process group %s from process group %s",
+			child->getName().c_str(), _name.c_str());
+}
+
+/**
+ * Schedules every timer-driven processor of this group that is neither
+ * running nor disabled, then does the same recursively for all child groups.
+ * Exceptions are logged at debug level and rethrown.
+ *
+ * @param timeScheduler  scheduling agent the processors are handed to
+ */
+void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler)
+{
+	std::lock_guard<std::mutex> lock(_mtx);
+
+	try
+	{
+		// Start all the processor node, input and output ports
+		for (Processor *processor : _processors)
+		{
+			const bool startable = !processor->isRunning()
+					&& processor->getScheduledState() != DISABLED;
+			if (startable && processor->getSchedulingStrategy() == TIMER_DRIVEN)
+				timeScheduler->schedule(processor);
+		}
+
+		// Descend into the child groups
+		for (ProcessGroup *childGroup : _childProcessGroups)
+			childGroup->startProcessing(timeScheduler);
+	}
+	catch (std::exception &exception)
+	{
+		_logger->log_debug("Caught Exception %s", exception.what());
+		throw;
+	}
+	catch (...)
+	{
+		_logger->log_debug("Caught Exception during process group start processing");
+		throw;
+	}
+}
+
+/**
+ * Unschedules every timer-driven processor of this group, then does the
+ * same recursively for all child groups.
+ * Exceptions are logged at debug level and rethrown.
+ *
+ * @param timeScheduler  scheduling agent the processors are removed from
+ */
+void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler)
+{
+	std::lock_guard<std::mutex> lock(_mtx);
+
+	try
+	{
+		// Stop all the processor node, input and output ports
+		for (Processor *processor : _processors)
+		{
+			if (processor->getSchedulingStrategy() != TIMER_DRIVEN)
+				continue;
+			timeScheduler->unschedule(processor);
+		}
+
+		// Descend into the child groups
+		for (ProcessGroup *childGroup : _childProcessGroups)
+			childGroup->stopProcessing(timeScheduler);
+	}
+	catch (std::exception &exception)
+	{
+		_logger->log_debug("Caught Exception %s", exception.what());
+		throw;
+	}
+	catch (...)
+	{
+		_logger->log_debug("Caught Exception during process group stop processing");
+		throw;
+	}
+}
+
+/**
+ * Looks up a processor by UUID in this group first, then depth-first in
+ * the child groups.
+ *
+ * @param uuid  identifier to search for
+ * @return the matching processor, or NULL when none is found
+ *
+ * No lock is taken here: addConnection()/removeConnection() call this
+ * while already holding _mtx.
+ */
+Processor *ProcessGroup::findProcessor(uuid_t uuid)
+{
+	// std::lock_guard<std::mutex> lock(_mtx);
+
+	for (Processor *candidate : _processors)
+	{
+		uuid_t candidateUUID;
+		if (!candidate->getUUID(candidateUUID))
+			continue;
+		if (uuid_compare(candidateUUID, uuid) == 0)
+			return candidate;
+	}
+
+	for (ProcessGroup *childGroup : _childProcessGroups)
+	{
+		Processor *found = childGroup->findProcessor(uuid);
+		if (found)
+			return found;
+	}
+
+	return NULL;
+}
+
+/**
+ * Looks up a processor by name in this group first, then depth-first in
+ * the child groups. Every processor inspected in this group is logged at
+ * debug level.
+ *
+ * @param processorName  name to search for
+ * @return the first matching processor, or NULL when none is found
+ */
+Processor *ProcessGroup::findProcessor(std::string processorName)
+{
+	for (Processor *candidate : _processors)
+	{
+		_logger->log_debug("Current processor is %s", candidate->getName().c_str());
+		if (candidate->getName() == processorName)
+			return candidate;
+	}
+
+	for (ProcessGroup *childGroup : _childProcessGroups)
+	{
+		Processor *found = childGroup->findProcessor(processorName);
+		if (found)
+			return found;
+	}
+
+	return NULL;
+}
+
+/**
+ * Sets a property on every processor with the given name, in this group
+ * and recursively in all child groups.
+ *
+ * @param processorName  name of the processor(s) to update
+ * @param propertyName   property to set
+ * @param propertyValue  new value
+ */
+void ProcessGroup::updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue)
+{
+	std::lock_guard<std::mutex> lock(_mtx);
+
+	for (Processor *processor : _processors)
+	{
+		if (processor->getName() != processorName)
+			continue;
+		processor->setProperty(propertyName, propertyValue);
+	}
+
+	// Propagate the update to the child groups
+	for (ProcessGroup *childGroup : _childProcessGroups)
+		childGroup->updatePropertyValue(processorName, propertyName, propertyValue);
+}
+
+/**
+ * Adds a connection to this group and attaches it to its source and
+ * destination processors when those can be found via findProcessor().
+ * A connection that is already registered is left alone.
+ */
+void ProcessGroup::addConnection(Connection *connection)
+{
+	std::lock_guard<std::mutex> lock(_mtx);
+
+	// insert() reports through .second whether the element was newly added
+	if (!_connections.insert(connection).second)
+		return;
+
+	_logger->log_info("Add connection %s into process group %s",
+			connection->getName().c_str(), _name.c_str());
+
+	// Attach to the source processor
+	uuid_t sourceUUID;
+	connection->getSourceProcessorUUID(sourceUUID);
+	Processor *source = this->findProcessor(sourceUUID);
+	if (source)
+		source->addConnection(connection);
+
+	// Attach to the destination processor, unless it is the source itself
+	uuid_t destinationUUID;
+	connection->getDestinationProcessorUUID(destinationUUID);
+	Processor *destination = this->findProcessor(destinationUUID);
+	if (destination && destination != source)
+		destination->addConnection(connection);
+}
+
+/**
+ * Removes a connection from this group and detaches it from its source and
+ * destination processors when those can be found via findProcessor().
+ * Does nothing when the connection is not registered. The connection object
+ * itself is not deleted.
+ */
+void ProcessGroup::removeConnection(Connection *connection)
+{
+	std::lock_guard<std::mutex> lock(_mtx);
+
+	if (_connections.find(connection) != _connections.end())
+	{
+		// The connection belongs to this process group: take it out
+		_connections.erase(connection);
+		_logger->log_info("Remove connection %s from process group %s",
+				connection->getName().c_str(), _name.c_str());
+		// Detach from the source processor
+		uuid_t sourceUUID;
+		Processor *source = NULL;
+		connection->getSourceProcessorUUID(sourceUUID);
+		source = this->findProcessor(sourceUUID);
+		if (source)
+			source->removeConnection(connection);
+		// Detach from the destination processor, unless it is the source itself
+		Processor *destination = NULL;
+		uuid_t destinationUUID;
+		connection->getDestinationProcessorUUID(destinationUUID);
+		destination = this->findProcessor(destinationUUID);
+		if (destination && destination != source)
+			destination->removeConnection(connection);
+	}
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ProcessSession.cpp b/libminifi/src/ProcessSession.cpp
new file mode 100644
index 0000000..4f526c3
--- /dev/null
+++ b/libminifi/src/ProcessSession.cpp
@@ -0,0 +1,731 @@
+/**
+ * @file ProcessSession.cpp
+ * ProcessSession class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <iostream>
+
+#include "ProcessSession.h"
+
+/**
+ * Create a new flow file record without attributes and register it in
+ * the session's map of added flow files, keyed by its UUID string.
+ *
+ * @return the newly created record
+ */
+FlowFileRecord* ProcessSession::create()
+{
+	std::map<std::string, std::string> noAttributes;
+	FlowFileRecord *flowFile = new FlowFileRecord(noAttributes);
+	if (flowFile == NULL)
+		return NULL;
+
+	_addedFlowFiles[flowFile->getUUIDStr()] = flowFile;
+	_logger->log_debug("Create FlowFile with UUID %s", flowFile->getUUIDStr().c_str());
+	return flowFile;
+}
+
+/**
+ * Create a new flow file record derived from a parent record.
+ *
+ * The child receives the parent's attributes except the alternate
+ * identifier, discard reason and UUID keys, plus the parent's lineage
+ * start date and lineage identifiers extended with the parent's UUID.
+ * No content claim is copied here.
+ *
+ * @param parent record to derive from
+ * @return the new record, or NULL when no record could be created
+ */
+FlowFileRecord* ProcessSession::create(FlowFileRecord *parent)
+{
+	FlowFileRecord *child = this->create();
+	if (!child)
+		return child;
+
+	// Work on a local copy of the parent's attribute map
+	std::map<std::string, std::string> inherited = parent->getAttributes();
+	for (auto &attribute : inherited)
+	{
+		// These keys are specific to a single flow file and are never inherited
+		const bool reserved =
+				attribute.first == FlowAttributeKey(ALTERNATE_IDENTIFIER) ||
+				attribute.first == FlowAttributeKey(DISCARD_REASON) ||
+				attribute.first == FlowAttributeKey(UUID);
+		if (!reserved)
+			child->setAttribute(attribute.first, attribute.second);
+	}
+
+	// Carry the lineage forward and add the parent to it
+	child->_lineageStartDate = parent->_lineageStartDate;
+	child->_lineageIdentifiers = parent->_lineageIdentifiers;
+	child->_lineageIdentifiers.insert(parent->_uuidStr);
+
+	return child;
+}
+
+/**
+ * Clone a flow file record including its content reference.
+ *
+ * The copy is built with create(parent) and then points at the parent's
+ * resource claim with the same offset and size; the claim's owner count
+ * is increased for the additional record.
+ *
+ * @param parent record to clone
+ * @return the clone, or NULL when no record could be created
+ */
+FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent)
+{
+	FlowFileRecord *copy = this->create(parent);
+	if (!copy)
+		return copy;
+
+	copy->_claim = parent->_claim;
+	if (!copy->_claim)
+		// Parent carries no content, nothing further to share
+		return copy;
+
+	copy->_offset = parent->_offset;
+	copy->_size = parent->_size;
+	copy->_claim->increaseFlowFileRecordOwnedCount();
+	return copy;
+}
+
+/**
+ * Clone a flow file record while the session is routing flow files.
+ *
+ * Unlike clone(parent) the copy is registered in the session's map of
+ * cloned flow files rather than the map of added ones.  Attributes
+ * (except the alternate identifier, discard reason and UUID keys),
+ * lineage and the content claim are taken from the parent.
+ *
+ * @param parent record to clone
+ * @return the clone
+ */
+FlowFileRecord* ProcessSession::cloneDuringTransfer(FlowFileRecord *parent)
+{
+	std::map<std::string, std::string> noAttributes;
+	FlowFileRecord *copy = new FlowFileRecord(noAttributes);
+	if (!copy)
+		return copy;
+
+	this->_clonedFlowFiles[copy->getUUIDStr()] = copy;
+	_logger->log_debug("Clone FlowFile with UUID %s during transfer", copy->getUUIDStr().c_str());
+
+	// Inherit every attribute that is not specific to a single flow file
+	std::map<std::string, std::string> inherited = parent->getAttributes();
+	for (auto &attribute : inherited)
+	{
+		const bool reserved =
+				attribute.first == FlowAttributeKey(ALTERNATE_IDENTIFIER) ||
+				attribute.first == FlowAttributeKey(DISCARD_REASON) ||
+				attribute.first == FlowAttributeKey(UUID);
+		if (!reserved)
+			copy->setAttribute(attribute.first, attribute.second);
+	}
+
+	// Carry the lineage forward and add the parent to it
+	copy->_lineageStartDate = parent->_lineageStartDate;
+	copy->_lineageIdentifiers = parent->_lineageIdentifiers;
+	copy->_lineageIdentifiers.insert(parent->_uuidStr);
+
+	// Share the parent's content claim, if it has one
+	copy->_claim = parent->_claim;
+	if (copy->_claim)
+	{
+		copy->_offset = parent->_offset;
+		copy->_size = parent->_size;
+		copy->_claim->increaseFlowFileRecordOwnedCount();
+	}
+
+	return copy;
+}
+
+/**
+ * Clone a slice of a flow file record.
+ *
+ * The copy is built with create(parent).  When the parent owns a content
+ * claim, the copy shares that claim and covers `size` bytes starting
+ * `offset` bytes into the parent's content.  When the parent has no claim
+ * the copy is returned without content and offset/size are not used.
+ *
+ * @param parent record to clone from
+ * @param offset start of the slice, relative to the parent's content
+ * @param size   length of the slice in bytes
+ * @return the clone, or NULL when no record could be created or the
+ *         requested range is negative or reaches past the parent's size
+ */
+FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent, long offset, long size)
+{
+	FlowFileRecord *record = this->create(parent);
+	if (record)
+	{
+		if (parent->_claim)
+		{
+			if (offset < 0 || size < 0 || (offset + size) > (long) parent->_size)
+			{
+				// Format specifiers match the argument types (long / unsigned long long)
+				_logger->log_error("clone offset %ld and size %ld exceed parent size %llu",
+						offset, size, (unsigned long long) parent->_size);
+				// Undo the registration done by create() before discarding the record;
+				// erasing by key is a no-op when the entry is absent
+				this->_addedFlowFiles.erase(record->getUUIDStr());
+				delete record;
+				return NULL;
+			}
+			// The slice starts `offset` bytes after the parent's own start in the claim
+			record->_offset = parent->_offset + offset;
+			record->_size = size;
+			// Copy Resource Claim
+			record->_claim = parent->_claim;
+			record->_claim->increaseFlowFileRecordOwnedCount();
+		}
+	}
+	return record;
+}
+
+/**
+ * Mark a flow file record as deleted and remember it in the session's
+ * map of deleted flow files, keyed by its UUID string.
+ *
+ * @param flow record to remove
+ */
+void ProcessSession::remove(FlowFileRecord *flow)
+{
+	// Flag first, then track the record for later cleanup
+	flow->_markedDelete = true;
+	this->_deletedFlowFiles[flow->getUUIDStr()] = flow;
+}
+
+/**
+ * Set an attribute on a flow file record.
+ *
+ * @param flow  record to modify
+ * @param key   attribute name
+ * @param value attribute value
+ */
+void ProcessSession::putAttribute(FlowFileRecord *flow, std::string key, std::string value)
+{
+	// Plain delegation to the record
+	flow->setAttribute(key, value);
+}
+
+/**
+ * Remove an attribute from a flow file record.
+ *
+ * @param flow record to modify
+ * @param key  attribute name
+ */
+void ProcessSession::removeAttribute(FlowFileRecord *flow, std::string key)
+{
+	// Plain delegation to the record
+	flow->removeAttribute(key);
+}
+
+/**
+ * Penalize a flow file record: its penalty expiration is set to the
+ * current time plus the penalization period of the session's processor.
+ *
+ * @param flow record to penalize
+ */
+void ProcessSession::penalize(FlowFileRecord *flow)
+{
+	const auto now = getTimeMillis();
+	const auto penaltyPeriod = this->_processContext->getProcessor()->getPenalizationPeriodMsec();
+	flow->_penaltyExpirationMs = now + penaltyPeriod;
+}
+
+/**
+ * Record the relationship a flow file record is transferred to.
+ *
+ * Only the mapping from the record's UUID string to the relationship is
+ * stored here; a later call for the same record replaces the entry.
+ *
+ * @param flow         record being transferred
+ * @param relationship relationship chosen for the record
+ */
+void ProcessSession::transfer(FlowFileRecord *flow, Relationship relationship)
+{
+	this->_transferRelationship[flow->getUUIDStr()] = relationship;
+}
+
+/**
+ * Replace the content of a flow file record.
+ *
+ * A new resource claim is created, its content file is opened truncated
+ * and handed to the callback.  On success the record points at the new
+ * claim with offset 0 and the size taken from the stream position; the
+ * owner count of a previous claim is decreased.  On failure the new
+ * claim is deleted and the exception is rethrown.
+ *
+ * @param flow     record receiving the content
+ * @param callback writer invoked with the open output stream
+ */
+void ProcessSession::write(FlowFileRecord *flow, OutputStreamCallback *callback)
+{
+	// Content always goes into a brand new claim
+	ResourceClaim *freshClaim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY);
+
+	try
+	{
+		std::ofstream out;
+		out.open(freshClaim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
+		if (!out.is_open())
+			throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+
+		// Let the caller produce the content
+		callback->process(&out);
+		if (!out.good() || out.tellp() < 0)
+		{
+			out.close();
+			throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
+		}
+
+		flow->_size = out.tellp();
+		flow->_offset = 0;
+		if (flow->_claim)
+		{
+			// Give up the previous content
+			flow->_claim->decreaseFlowFileRecordOwnedCount();
+			flow->_claim = NULL;
+		}
+		flow->_claim = freshClaim;
+		freshClaim->increaseFlowFileRecordOwnedCount();
+		out.close();
+	}
+	catch (std::exception &exception)
+	{
+		// Detach the new claim again if it was already attached, then drop it
+		if (flow && flow->_claim == freshClaim)
+		{
+			flow->_claim->decreaseFlowFileRecordOwnedCount();
+			flow->_claim = NULL;
+		}
+		if (freshClaim)
+			delete freshClaim;
+		_logger->log_debug("Caught Exception %s", exception.what());
+		throw;
+	}
+	catch (...)
+	{
+		// Same cleanup for anything that is not a std::exception
+		if (flow && flow->_claim == freshClaim)
+		{
+			flow->_claim->decreaseFlowFileRecordOwnedCount();
+			flow->_claim = NULL;
+		}
+		if (freshClaim)
+			delete freshClaim;
+		_logger->log_debug("Caught Exception during process session write");
+		throw;
+	}
+}
+
+/**
+ * Append content to a flow file record.
+ *
+ * Without an existing claim this falls back to write().  Otherwise the
+ * claim's content file is opened in append mode, the callback writes to
+ * it, and the record's size grows by the number of bytes the stream
+ * position advanced.  Failures are logged and rethrown.
+ *
+ * @param flow     record receiving the additional content
+ * @param callback writer invoked with the open output stream
+ */
+void ProcessSession::append(FlowFileRecord *flow, OutputStreamCallback *callback)
+{
+	ResourceClaim *claim = NULL;
+
+	if (flow->_claim == NULL)
+	{
+		// No existing claim to append to, create a new one instead
+		return write(flow, callback);
+	}
+
+	claim = flow->_claim;
+
+	try
+	{
+		std::ofstream fs;
+		fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::app);
+		if (fs.is_open())
+		{
+			// Where the put position starts out in append mode is implementation
+			// defined (it may read as 0 until the first write).  Move it to the end
+			// explicitly so the delta computed below covers only the appended bytes.
+			fs.seekp(0, std::ios_base::end);
+			std::streampos oldPos = fs.tellp();
+			// Call the callback to write the content
+			callback->process(&fs);
+			if (fs.good() && fs.tellp() >= 0)
+			{
+				uint64_t appendSize = fs.tellp() - oldPos;
+				flow->_size += appendSize;
+				/*
+				_logger->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s",
+						flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
+				fs.close();
+			}
+			else
+			{
+				fs.close();
+				throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
+			}
+		}
+		else
+		{
+			throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+		}
+	}
+	catch (std::exception &exception)
+	{
+		_logger->log_debug("Caught Exception %s", exception.what());
+		throw;
+	}
+	catch (...)
+	{
+		_logger->log_debug("Caught Exception during process session append");
+		throw;
+	}
+}
+
+/**
+ * Read the content of a flow file record.
+ *
+ * The content file of the record's claim is opened for binary input,
+ * positioned at the record's offset and handed to the callback.  A
+ * missing claim, a failed open or a failed seek raises an Exception;
+ * every exception is logged and rethrown.
+ *
+ * @param flow     record whose content is read
+ * @param callback reader invoked with the positioned input stream
+ */
+void ProcessSession::read(FlowFileRecord *flow, InputStreamCallback *callback)
+{
+	try
+	{
+		// Reading needs existing content
+		if (flow->_claim == NULL)
+			throw Exception(FILE_OPERATION_EXCEPTION, "No Content Claim existed for read");
+
+		ResourceClaim *content = flow->_claim;
+		std::ifstream in;
+		in.open(content->getContentFullPath().c_str(), std::fstream::in | std::fstream::binary);
+		if (!in.is_open())
+			throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+
+		// Skip to where this record's content starts inside the claim
+		in.seekg(flow->_offset, in.beg);
+		if (!in.good())
+		{
+			in.close();
+			throw Exception(FILE_OPERATION_EXCEPTION, "File Read Error");
+		}
+
+		callback->process(&in);
+		in.close();
+	}
+	catch (std::exception &exception)
+	{
+		_logger->log_debug("Caught Exception %s", exception.what());
+		throw;
+	}
+	catch (...)
+	{
+		_logger->log_debug("Caught Exception during process session read");
+		throw;
+	}
+}
+
+/**
+ * Import a file from disk as the content of a flow file record.
+ *
+ * The source file is copied, starting at `offset`, into the content file
+ * of a newly created resource claim.  On success the record points at the
+ * new claim with offset 0 and the copied size, the owner count of a
+ * previous claim is decreased, and the source file is removed unless
+ * keepSource is set.  On failure the new claim is deleted and the
+ * exception is rethrown.
+ *
+ * @param source     path of the file to import
+ * @param flow       record receiving the content
+ * @param keepSource when false the source file is deleted after a successful import
+ * @param offset     position in the source file where copying starts
+ */
+void ProcessSession::import(std::string source, FlowFileRecord *flow, bool keepSource, uint64_t offset)
+{
+	// The copy buffer is owned by a vector so every exit path releases it.
+	// It is allocated before the claim: a failed allocation leaves no claim behind.
+	std::vector<char> buf(4096);
+
+	ResourceClaim *claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY);
+
+	try
+	{
+		std::ofstream fs;
+		fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
+		std::ifstream input;
+		input.open(source.c_str(), std::fstream::in | std::fstream::binary);
+
+		if (!fs.is_open() || !input.is_open())
+			throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+
+		// Stream the source file into the content file
+		input.seekg(offset, input.beg);
+		while (input.good())
+		{
+			input.read(buf.data(), buf.size());
+			// gcount() equals the buffer size except for the final, short read
+			fs.write(buf.data(), input.gcount());
+		}
+
+		// A read error on the source must not pass as a complete import,
+		// otherwise a truncated copy is kept and the source may be deleted
+		if (input.bad() || !fs.good() || fs.tellp() < 0)
+		{
+			fs.close();
+			input.close();
+			throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+		}
+
+		flow->_size = fs.tellp();
+		flow->_offset = 0;
+		if (flow->_claim)
+		{
+			// Remove the old claim
+			flow->_claim->decreaseFlowFileRecordOwnedCount();
+			flow->_claim = NULL;
+		}
+		flow->_claim = claim;
+		claim->increaseFlowFileRecordOwnedCount();
+		fs.close();
+		input.close();
+		if (!keepSource)
+			std::remove(source.c_str());
+	}
+	catch (std::exception &exception)
+	{
+		// Detach the new claim again if it was already attached, then drop it
+		if (flow && flow->_claim == claim)
+		{
+			flow->_claim->decreaseFlowFileRecordOwnedCount();
+			flow->_claim = NULL;
+		}
+		if (claim)
+			delete claim;
+		_logger->log_debug("Caught Exception %s", exception.what());
+		throw;
+	}
+	catch (...)
+	{
+		if (flow && flow->_claim == claim)
+		{
+			flow->_claim->decreaseFlowFileRecordOwnedCount();
+			flow->_claim = NULL;
+		}
+		if (claim)
+			delete claim;
+		_logger->log_debug("Caught Exception during process session import");
+		throw;
+	}
+}
+
+/**
+ * Commit the session.
+ *
+ * Phase 1 routes every updated and added flow file that is not marked
+ * deleted: the relationship recorded by transfer() is looked up, the
+ * first outgoing connection of that relationship is assigned to the
+ * record and one clone is created for each further connection.  A
+ * relationship without connections removes the record when it is auto
+ * terminated and raises an Exception otherwise; a record without a
+ * recorded relationship also raises an Exception.
+ *
+ * Phase 2 puts updated, added and cloned records into their connection
+ * (records without one are deleted).  Phase 3 deletes the removed records
+ * and the snapshots, then all session maps are cleared.
+ *
+ * Exceptions are logged and rethrown.
+ */
+void ProcessSession::commit()
+{
+	typedef std::map<std::string, FlowFileRecord *> FlowFileMap;
+
+	try
+	{
+		// Phase 1: decide the target connection(s), updated flow files first, then added ones
+		FlowFileMap *toRoute[] = { &_updatedFlowFiles, &_addedFlowFiles };
+		for (size_t i = 0; i < sizeof(toRoute) / sizeof(toRoute[0]); ++i)
+		{
+			for (FlowFileMap::iterator it = toRoute[i]->begin(); it != toRoute[i]->end(); ++it)
+			{
+				FlowFileRecord *record = it->second;
+				if (record->_markedDelete)
+					continue;
+
+				std::map<std::string, Relationship>::iterator itRelationship =
+						this->_transferRelationship.find(record->getUUIDStr());
+				if (itRelationship == _transferRelationship.end())
+				{
+					// Can not find relationship for the flow
+					throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow");
+				}
+
+				Relationship relationship = itRelationship->second;
+				// Find the connections for that relationship
+				std::set<Connection *> connections =
+						_processContext->getProcessor()->getOutGoingConnections(relationship.getName());
+				if (connections.empty())
+				{
+					if (!_processContext->getProcessor()->isAutoTerminated(relationship))
+					{
+						// Not auto terminated, so a connection is required
+						std::string message = "Connect empty for non auto terminated relationship " + relationship.getName();
+						throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
+					}
+					// Auto terminated
+					remove(record);
+					continue;
+				}
+
+				for (std::set<Connection *>::iterator itConnection = connections.begin(); itConnection != connections.end(); ++itConnection)
+				{
+					Connection *connection(*itConnection);
+					if (itConnection == connections.begin())
+					{
+						// First connection which the flow need be routed to
+						record->_connection = connection;
+					}
+					else
+					{
+						// Every further connection receives its own clone
+						FlowFileRecord *cloneRecord = this->cloneDuringTransfer(record);
+						if (cloneRecord)
+							cloneRecord->_connection = connection;
+						else
+							throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer");
+					}
+				}
+			}
+		}
+
+		// Phase 2: send updated, added and cloned flow files to their queue
+		FlowFileMap *toDeliver[] = { &_updatedFlowFiles, &_addedFlowFiles, &_clonedFlowFiles };
+		for (size_t i = 0; i < sizeof(toDeliver) / sizeof(toDeliver[0]); ++i)
+		{
+			for (FlowFileMap::iterator it = toDeliver[i]->begin(); it != toDeliver[i]->end(); ++it)
+			{
+				FlowFileRecord *record = it->second;
+				if (record->_markedDelete)
+				{
+					// Freed together with the deleted flow files below
+					continue;
+				}
+				if (record->_connection)
+					record->_connection->put(record);
+				else
+					delete record;
+			}
+		}
+
+		// Phase 3: delete the deleted flow files and the snapshots
+		FlowFileMap *toDestroy[] = { &_deletedFlowFiles, &_originalFlowFiles };
+		for (size_t i = 0; i < sizeof(toDestroy) / sizeof(toDestroy[0]); ++i)
+		{
+			for (FlowFileMap::iterator it = toDestroy[i]->begin(); it != toDestroy[i]->end(); ++it)
+				delete it->second;
+		}
+
+		// All done
+		_updatedFlowFiles.clear();
+		_addedFlowFiles.clear();
+		_clonedFlowFiles.clear();
+		_deletedFlowFiles.clear();
+		_originalFlowFiles.clear();
+		_logger->log_trace("ProcessSession committed for %s", _processContext->getProcessor()->getName().c_str());
+	}
+	catch (std::exception &exception)
+	{
+		_logger->log_debug("Caught Exception %s", exception.what());
+		throw;
+	}
+	catch (...)
+	{
+		_logger->log_debug("Caught Exception during process session commit");
+		throw;
+	}
+}
+
+
+/**
+ * Roll the session back.
+ *
+ * Every snapshot is put back into its original connection (snapshots
+ * without one are deleted).  Cloned, added and updated flow files are
+ * deleted, and all session maps are cleared.  Exceptions are logged and
+ * rethrown.
+ */
+void ProcessSession::rollback()
+{
+	typedef std::map<std::string, FlowFileRecord *> FlowFileMap;
+
+	try
+	{
+		// Requeue the snapshots taken when the flow files were fetched
+		for (FlowFileMap::iterator it = _originalFlowFiles.begin(); it != _originalFlowFiles.end(); ++it)
+		{
+			FlowFileRecord *snapshot = it->second;
+			if (!snapshot->_orginalConnection)
+			{
+				delete snapshot;
+				continue;
+			}
+			snapshot->_snapshot = false;
+			snapshot->_orginalConnection->put(snapshot);
+		}
+		_originalFlowFiles.clear();
+
+		// Everything the session produced or modified is thrown away
+		FlowFileMap *discarded[] = { &_clonedFlowFiles, &_addedFlowFiles, &_updatedFlowFiles };
+		for (size_t i = 0; i < sizeof(discarded) / sizeof(discarded[0]); ++i)
+		{
+			for (FlowFileMap::iterator it = discarded[i]->begin(); it != discarded[i]->end(); ++it)
+				delete it->second;
+			discarded[i]->clear();
+		}
+
+		// The map of deleted flow files is only cleared, its records are not deleted here
+		_deletedFlowFiles.clear();
+		_logger->log_trace("ProcessSession rollback for %s", _processContext->getProcessor()->getName().c_str());
+	}
+	catch (std::exception &exception)
+	{
+		_logger->log_debug("Caught Exception %s", exception.what());
+		throw;
+	}
+	catch (...)
+	{
+		_logger->log_debug("Caught Exception during process session roll back");
+		throw;
+	}
+}
+
+/**
+ * Fetch the next flow file record from the processor's incoming
+ * connections.
+ *
+ * Connections are polled in the order the processor hands them out until
+ * one yields a record or the first connection comes round again.  Expired
+ * records reported by a poll are deleted.  A fetched record is stored in
+ * the map of updated flow files and a snapshot of it is stored in the map
+ * of original flow files.
+ *
+ * @return the fetched record, or NULL when no connection had one
+ */
+FlowFileRecord *ProcessSession::get()
+{
+	Connection *const first = _processContext->getProcessor()->getNextIncomingConnection();
+	if (!first)
+		return NULL;
+
+	Connection *current = first;
+	do
+	{
+		std::set<FlowFileRecord *> expired;
+		FlowFileRecord *polled = current->poll(expired);
+
+		// Drop whatever expired while sitting in the queue
+		for (FlowFileRecord *stale : expired)
+			delete stale;
+
+		if (polled)
+		{
+			// Track the record as updated by this session
+			polled->_markedDelete = false;
+			_updatedFlowFiles[polled->getUUIDStr()] = polled;
+
+			// Keep a snapshot of the record as it was fetched
+			std::map<std::string, std::string> noAttributes;
+			FlowFileRecord *snapshot = new FlowFileRecord(noAttributes);
+			_logger->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr().c_str());
+			snapshot->duplicate(polled);
+			_originalFlowFiles[snapshot->getUUIDStr()] = snapshot;
+			return polled;
+		}
+
+		current = _processContext->getProcessor()->getNextIncomingConnection();
+	}
+	while (current != NULL && current != first);
+
+	return NULL;
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Processor.cpp b/libminifi/src/Processor.cpp
new file mode 100644
index 0000000..cc136dc
--- /dev/null
+++ b/libminifi/src/Processor.cpp
@@ -0,0 +1,451 @@
+/**
+ * @file Processor.cpp
+ * Processor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+
+#include "Processor.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+/**
+ * Construct a processor.
+ *
+ * @param name processor name
+ * @param uuid UUID to adopt; when none is passed a new one is generated
+ */
+Processor::Processor(std::string name, uuid_t uuid)
+: _name(name)
+{
+	// Adopt the supplied UUID, or generate one when none was passed
+	if (uuid)
+		uuid_copy(_uuid, uuid);
+	else
+		uuid_generate(_uuid);
+
+	// Cache the textual form of the UUID
+	char text[37];
+	uuid_unparse(_uuid, text);
+	_uuidStr = text;
+
+	// Scheduling defaults
+	_state = DISABLED;
+	_strategy = TIMER_DRIVEN;
+	_schedulingPeriodNano = MINIMUM_SCHEDULING_NANOS;
+	_runDurantionNano = 0;
+	_maxConcurrentTasks = 1;
+	_activeTasks = 0;
+
+	// Behavioral defaults
+	_lossTolerant = false;
+	_triggerWhenEmpty = false;
+	_yieldPeriodMsec = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
+	_penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000;
+	_yieldExpiration = 0;
+
+	// Start the round robin at the beginning of the incoming connections
+	_incomingConnectionsIter = this->_incomingConnections.begin();
+
+	_logger = Logger::getLogger();
+	_logger->log_info("Processor %s created UUID %s", _name.c_str(), _uuidStr.c_str());
+}
+
+/**
+ * Destructor; performs no explicit cleanup.
+ */
+Processor::~Processor()
+{
+}
+
+/**
+ * @return true when the processor state is RUNNING and at least one
+ *         task is active
+ */
+bool Processor::isRunning()
+{
+	if (_state != RUNNING)
+		return false;
+	return _activeTasks > 0;
+}
+
+/**
+ * Replace the set of properties supported by this processor.
+ *
+ * @param properties new supported properties, stored by property name
+ * @return false when the processor is running (nothing is changed),
+ *         true otherwise
+ */
+bool Processor::setSupportedProperties(std::set<Property> properties)
+{
+	if (isRunning())
+	{
+		_logger->log_info("Can not set processor property while the process %s is running",
+				_name.c_str());
+		return false;
+	}
+
+	std::lock_guard<std::mutex> lock(_mtx);
+
+	// Start from scratch, then index every property by its name
+	_properties.clear();
+	for (Property item : properties)
+	{
+		_properties[item.getName()] = item;
+		_logger->log_info("Processor %s supported property name %s", _name.c_str(), item.getName().c_str());
+	}
+
+	return true;
+}
+
+/**
+ * Replace the set of relationships supported by this processor.
+ *
+ * @param relationships new supported relationships, stored by name
+ * @return false when the processor is running (nothing is changed),
+ *         true otherwise
+ */
+bool Processor::setSupportedRelationships(std::set<Relationship> relationships)
+{
+	if (isRunning())
+	{
+		_logger->log_info("Can not set processor supported relationship while the process %s is running",
+				_name.c_str());
+		return false;
+	}
+
+	std::lock_guard<std::mutex> lock(_mtx);
+
+	// Start from scratch, then index every relationship by its name
+	_relationships.clear();
+	for (Relationship item : relationships)
+	{
+		_relationships[item.getName()] = item;
+		_logger->log_info("Processor %s supported relationship name %s", _name.c_str(), item.getName().c_str());
+	}
+
+	return true;
+}
+
+/**
+ * Replace the set of auto terminated relationships of this processor.
+ *
+ * @param relationships new auto terminated relationships, stored by name
+ * @return false when the processor is running (nothing is changed),
+ *         true otherwise
+ */
+bool Processor::setAutoTerminatedRelationships(std::set<Relationship> relationships)
+{
+	if (isRunning())
+	{
+		_logger->log_info("Can not set processor auto terminated relationship while the process %s is running",
+				_name.c_str());
+		return false;
+	}
+
+	std::lock_guard<std::mutex> lock(_mtx);
+
+	// Start from scratch, then index every relationship by its name
+	_autoTerminatedRelationships.clear();
+	for (Relationship item : relationships)
+	{
+		_autoTerminatedRelationships[item.getName()] = item;
+		_logger->log_info("Processor %s auto terminated relationship name %s", _name.c_str(), item.getName().c_str());
+	}
+
+	return true;
+}
+
+/**
+ * Check whether a relationship is auto terminated.
+ *
+ * The mutex is only taken while the processor is not running.
+ *
+ * @param relationship relationship to look up by name
+ * @return true when the relationship is registered as auto terminated
+ */
+bool Processor::isAutoTerminated(Relationship relationship)
+{
+	const bool needLock = !isRunning();
+
+	if (needLock)
+		_mtx.lock();
+
+	const bool found =
+			_autoTerminatedRelationships.find(relationship.getName()) != _autoTerminatedRelationships.end();
+
+	if (needLock)
+		_mtx.unlock();
+
+	return found;
+}
+
+/**
+ * Check whether a relationship is supported by this processor.
+ *
+ * The mutex is only taken while the processor is not running.
+ *
+ * @param relationship relationship to look up by name
+ * @return true when the relationship is registered as supported
+ */
+bool Processor::isSupportedRelationship(Relationship relationship)
+{
+	const bool needLock = !isRunning();
+
+	if (needLock)
+		_mtx.lock();
+
+	const bool found = _relationships.find(relationship.getName()) != _relationships.end();
+
+	if (needLock)
+		_mtx.unlock();
+
+	return found;
+}
+
+/**
+ * Look up the value of a supported property.
+ *
+ * @param name  property name
+ * @param value receives the property value when the property exists;
+ *              left untouched otherwise
+ * @return true when the property exists
+ */
+bool Processor::getProperty(std::string name, std::string &value)
+{
+	// Properties are only set while the processor is not running, so the
+	// lock is only taken in that state to avoid a race condition
+	const bool needLock = !isRunning();
+
+	if (needLock)
+		_mtx.lock();
+
+	std::map<std::string, Property>::iterator entry = _properties.find(name);
+	const bool found = (entry != _properties.end());
+	if (found)
+		value = entry->second.getValue();
+
+	if (needLock)
+		_mtx.unlock();
+
+	return found;
+}
+
+/**
+ * Set the value of a supported property.
+ *
+ * @param name  property name
+ * @param value new value
+ * @return false when no property of that name is supported, true otherwise
+ */
+bool Processor::setProperty(std::string name, std::string value)
+{
+	std::lock_guard<std::mutex> lock(_mtx);
+
+	std::map<std::string, Property>::iterator entry = _properties.find(name);
+	if (entry == _properties.end())
+		return false;
+
+	// Update a copy and store it back under the property's own name
+	Property updated = entry->second;
+	updated.setValue(value);
+	_properties[updated.getName()] = updated;
+	_logger->log_info("Processor %s property name %s value %s", _name.c_str(), updated.getName().c_str(), value.c_str());
+	return true;
+}
+
+/**
+ * Get the outgoing connections registered for a relationship.
+ *
+ * @param relationship relationship name
+ * @return a copy of the connection set for that relationship, or an
+ *         empty set when the relationship has no entry
+ */
+std::set<Connection *> Processor::getOutGoingConnections(std::string relationship)
+{
+	std::map<std::string, std::set<Connection *>>::iterator entry = _outGoingConnections.find(relationship);
+	if (entry == _outGoingConnections.end())
+		return std::set<Connection *>();
+
+	return entry->second;
+}
+
+/**
+ * Attach a connection to this processor.
+ *
+ * The connection is registered as incoming when its destination UUID
+ * matches this processor and as outgoing (under its relationship name)
+ * when its source UUID matches; both can apply to the same connection.
+ * Nothing is changed while the processor is running.
+ *
+ * @param connection connection to attach
+ * @return true when the connection was newly registered on at least one side
+ */
+bool Processor::addConnection(Connection *connection)
+{
+	if (isRunning())
+	{
+		_logger->log_info("Can not add connection while the process %s is running",
+				_name.c_str());
+		return false;
+	}
+
+	std::lock_guard<std::mutex> lock(_mtx);
+
+	uuid_t sourceId;
+	uuid_t destinationId;
+	connection->getSourceProcessorUUID(sourceId);
+	connection->getDestinationProcessorUUID(destinationId);
+
+	bool added = false;
+
+	// Incoming side: this processor is the destination of the connection
+	if (uuid_compare(_uuid, destinationId) == 0 && _incomingConnections.insert(connection).second)
+	{
+		connection->setDestinationProcessor(this);
+		_logger->log_info("Add connection %s into Processor %s incoming connection",
+				connection->getName().c_str(), _name.c_str());
+		// The set changed, restart the round robin
+		_incomingConnectionsIter = this->_incomingConnections.begin();
+		added = true;
+	}
+
+	// Outgoing side: this processor is the source of the connection
+	if (uuid_compare(_uuid, sourceId) == 0)
+	{
+		std::string relationship = connection->getRelationship().getName();
+		// Creates the set for this relationship on first use
+		std::set<Connection *> &outgoing = _outGoingConnections[relationship];
+		if (outgoing.insert(connection).second)
+		{
+			connection->setSourceProcessor(this);
+			_logger->log_info("Add connection %s into Processor %s outgoing connection for relationship %s",
+					connection->getName().c_str(), _name.c_str(), relationship.c_str());
+			added = true;
+		}
+	}
+
+	return added;
+}
+
+/**
+ * Detach a connection from this processor.
+ *
+ * The connection is dropped from the incoming set when its destination
+ * UUID matches this processor and from the outgoing set of its
+ * relationship when its source UUID matches.  Nothing is changed while
+ * the processor is running.
+ *
+ * @param connection connection to detach
+ */
+void Processor::removeConnection(Connection *connection)
+{
+	if (isRunning())
+	{
+		_logger->log_info("Can not remove connection while the process %s is running",
+				_name.c_str());
+		return;
+	}
+
+	std::lock_guard<std::mutex> lock(_mtx);
+
+	uuid_t sourceId;
+	uuid_t destinationId;
+	connection->getSourceProcessorUUID(sourceId);
+	connection->getDestinationProcessorUUID(destinationId);
+
+	// Incoming side: erase() reports whether the connection was registered
+	if (uuid_compare(_uuid, destinationId) == 0 && _incomingConnections.erase(connection) > 0)
+	{
+		connection->setDestinationProcessor(NULL);
+		_logger->log_info("Remove connection %s into Processor %s incoming connection",
+				connection->getName().c_str(), _name.c_str());
+		// The set changed, restart the round robin
+		_incomingConnectionsIter = this->_incomingConnections.begin();
+	}
+
+	// Outgoing side
+	if (uuid_compare(_uuid, sourceId) == 0)
+	{
+		std::string relationship = connection->getRelationship().getName();
+		std::map<std::string, std::set<Connection *>>::iterator entry =
+				_outGoingConnections.find(relationship);
+		if (entry != _outGoingConnections.end() && entry->second.erase(connection) > 0)
+		{
+			connection->setSourceProcessor(NULL);
+			_logger->log_info("Remove connection %s into Processor %s outgoing connection for relationship %s",
+					connection->getName().c_str(), _name.c_str(), relationship.c_str());
+		}
+	}
+}
+
+/**
+ * Hand out the incoming connections one after another, wrapping round
+ * to the first one after the last.
+ *
+ * @return the next incoming connection, or NULL when there is none
+ */
+Connection *Processor::getNextIncomingConnection()
+{
+	std::lock_guard<std::mutex> lock(_mtx);
+
+	if (_incomingConnections.empty())
+		return NULL;
+
+	// Wrap round before reading in case the cursor sits at the end
+	if (_incomingConnectionsIter == _incomingConnections.end())
+		_incomingConnectionsIter = _incomingConnections.begin();
+
+	Connection *next = *_incomingConnectionsIter;
+
+	// Advance the cursor and wrap round again for the following call
+	if (++_incomingConnectionsIter == _incomingConnections.end())
+		_incomingConnectionsIter = _incomingConnections.begin();
+
+	return next;
+}
+
+/**
+ * @return true when at least one incoming connection reports a queue
+ *         size greater than zero
+ */
+bool Processor::flowFilesQueued()
+{
+	std::lock_guard<std::mutex> lock(_mtx);
+
+	// An empty set simply skips the loop
+	for (Connection *incoming : _incomingConnections)
+	{
+		if (incoming->getQueueSize() > 0)
+			return true;
+	}
+
+	return false;
+}
+
+/**
+ * @return true when at least one outgoing connection, of any
+ *         relationship, reports that it is full
+ */
+bool Processor::flowFilesOutGoingFull()
+{
+	std::lock_guard<std::mutex> lock(_mtx);
+
+	// Walk every relationship, then every connection registered for it
+	for (const auto &byRelationship : _outGoingConnections)
+	{
+		for (Connection *outgoing : byRelationship.second)
+		{
+			if (outgoing->isFull())
+				return true;
+		}
+	}
+
+	return false;
+}
+
+/**
+ * Run one trigger of the processor.
+ *
+ * A process context and a process session are created for the call, the
+ * two-argument onTrigger is invoked and the session is committed.  When
+ * an exception escapes, the session is rolled back and the exception is
+ * rethrown.  Session and context are deleted on every path.
+ */
+void Processor::onTrigger()
+{
+	ProcessContext *triggerContext = new ProcessContext(this);
+	ProcessSession *triggerSession = new ProcessSession(triggerContext);
+
+	// The session is released before the context it was created with
+	auto release = [&]()
+	{
+		delete triggerSession;
+		delete triggerContext;
+	};
+
+	try
+	{
+		// Call the child onTrigger function
+		this->onTrigger(triggerContext, triggerSession);
+		triggerSession->commit();
+		release();
+	}
+	catch (std::exception &exception)
+	{
+		_logger->log_debug("Caught Exception %s", exception.what());
+		triggerSession->rollback();
+		release();
+		throw;
+	}
+	catch (...)
+	{
+		_logger->log_debug("Caught Exception Processor::onTrigger");
+		triggerSession->rollback();
+		release();
+		throw;
+	}
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/RealTimeDataCollector.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RealTimeDataCollector.cpp b/libminifi/src/RealTimeDataCollector.cpp
new file mode 100644
index 0000000..c7118ff
--- /dev/null
+++ b/libminifi/src/RealTimeDataCollector.cpp
@@ -0,0 +1,482 @@
+/**
+ * @file RealTimeDataCollector.cpp
+ * RealTimeDataCollector class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <random>
+#include <netinet/tcp.h>
+
+#include "RealTimeDataCollector.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+//! Static definitions of the processor name, its configurable properties and its relationship.
+//! Each Property is built from three strings; the first two read as display name and
+//! description, the third looks like the default value (TODO confirm against Property's constructor).
+const std::string RealTimeDataCollector::ProcessorName("RealTimeDataCollector");
+Property RealTimeDataCollector::FILENAME("File Name", "File Name for the real time processor to process", "data.osp");
+Property RealTimeDataCollector::REALTIMESERVERNAME("Real Time Server Name", "Real Time Server Name", "localhost");
+Property RealTimeDataCollector::REALTIMESERVERPORT("Real Time Server Port", "Real Time Server Port", "10000");
+Property RealTimeDataCollector::BATCHSERVERNAME("Batch Server Name", "Batch Server Name", "localhost");
+Property RealTimeDataCollector::BATCHSERVERPORT("Batch Server Port", "Batch Server Port", "10001");
+Property RealTimeDataCollector::ITERATION("Iteration",
+		"If true, sample osp file will be iterated", "true");
+//! Message ID lists are comma separated; they are split on ',' when read in onTrigger/onTriggerRealTime
+Property RealTimeDataCollector::REALTIMEMSGID("Real Time Message ID", "Real Time Message ID", "41");
+Property RealTimeDataCollector::BATCHMSGID("Batch Message ID", "Batch Message ID", "172, 30, 48");
+//! Intervals carry a unit suffix; onTrigger converts them to nanoseconds
+Property RealTimeDataCollector::REALTIMEINTERVAL("Real Time Interval", "Real Time Data Collection Interval in msec", "10 ms");
+Property RealTimeDataCollector::BATCHINTERVAL("Batch Time Interval", "Batch Processing Interval in msec", "100 ms");
+Property RealTimeDataCollector::BATCHMAXBUFFERSIZE("Batch Max Buffer Size", "Batch Buffer Maximum size in bytes", "262144");
+//! The only relationship this processor registers in initialize()
+Relationship RealTimeDataCollector::Success("success", "success operational on the flow record");
+
+//! Register the properties and relationships supported by this processor
+void RealTimeDataCollector::initialize()
+{
+	//! Supported properties, listed in the order they are declared
+	std::set<Property> supportedProperties = {
+		FILENAME,
+		REALTIMESERVERNAME,
+		REALTIMESERVERPORT,
+		BATCHSERVERNAME,
+		BATCHSERVERPORT,
+		ITERATION,
+		REALTIMEMSGID,
+		BATCHMSGID,
+		REALTIMEINTERVAL,
+		BATCHINTERVAL,
+		BATCHMAXBUFFERSIZE
+	};
+	setSupportedProperties(supportedProperties);
+
+	//! Supported relationships: only Success
+	std::set<Relationship> supportedRelationships = { Success };
+	setSupportedRelationships(supportedRelationships);
+}
+
+//! Resolve host, create a TCP socket and connect it to host:port.
+//! @param host server host name to resolve (IPv4 only)
+//! @param port server TCP port in host byte order
+//! @return the connected socket descriptor, or 0 on any failure
+//! NOTE(review): 0 doubles as the failure value, so a legitimately returned
+//! descriptor 0 would be treated as a failure by callers.
+int RealTimeDataCollector::connectServer(const char *host, uint16_t port)
+{
+	in_addr_t addr;
+	int sock = 0;
+	struct hostent *h = NULL;
+#ifdef __MACH__
+	h = gethostbyname(host);
+#else
+	char buf[1024];
+	struct hostent he;
+	int hh_errno;
+	// A non-zero return means the lookup itself failed; h may also be left
+	// NULL on a zero return when the host is simply not found
+	if (gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno) != 0)
+		h = NULL;
+#endif
+	// Never dereference an unresolved entry, and only copy an address that
+	// exactly fits the IPv4 in_addr_t destination
+	if (h == NULL || h->h_addrtype != AF_INET || h->h_addr_list[0] == NULL ||
+			h->h_length != (int) sizeof(addr))
+	{
+		_logger->log_error("Could not resolve hostName %s", host);
+		return 0;
+	}
+	memcpy((char *) &addr, h->h_addr_list[0], h->h_length);
+	sock = socket(AF_INET, SOCK_STREAM, 0);
+	if (sock < 0)
+	{
+		_logger->log_error("Could not create socket to hostName %s", host);
+		return 0;
+	}
+
+#ifndef __MACH__
+	int opt = 1;
+
+	// Disable Nagle so small real time records are sent immediately
+	if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0)
+	{
+		_logger->log_error("setsockopt() TCP_NODELAY failed");
+		close(sock);
+		return 0;
+	}
+	if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+			(char *)&opt, sizeof(opt)) < 0)
+	{
+		_logger->log_error("setsockopt() SO_REUSEADDR failed");
+		close(sock);
+		return 0;
+	}
+
+	int sndsize = 256*1024;
+	if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0)
+	{
+		_logger->log_error("setsockopt() SO_SNDBUF failed");
+		close(sock);
+		return 0;
+	}
+#endif
+
+	struct sockaddr_in sa;
+	socklen_t socklen;
+	int status;
+
+	//TODO bind socket to the interface
+	// Bind to any local address and an ephemeral port
+	memset(&sa, 0, sizeof(sa));
+	sa.sin_family = AF_INET;
+	sa.sin_addr.s_addr = htonl(INADDR_ANY);
+	sa.sin_port = htons(0);
+	socklen = sizeof(sa);
+	if (bind(sock, (struct sockaddr *)&sa, socklen) < 0)
+	{
+		_logger->log_error("socket bind failed");
+		close(sock);
+		return 0;
+	}
+
+	// Remote endpoint: the resolved address is already in network byte order
+	memset(&sa, 0, sizeof(sa));
+	sa.sin_family = AF_INET;
+	sa.sin_addr.s_addr = addr;
+	sa.sin_port = htons(port);
+	socklen = sizeof(sa);
+
+	status = connect(sock, (struct sockaddr *)&sa, socklen);
+
+	if (status < 0)
+	{
+		_logger->log_error("socket connect failed to %s %d", host, port);
+		close(sock);
+		return 0;
+	}
+
+	_logger->log_info("socket %d connect to server %s port %d success", sock, host, port);
+
+	return sock;
+}
+
+//! Send the whole buffer over the socket, looping over partial sends.
+//! @return -1 as soon as a send fails; otherwise the byte count of the last
+//!         send call (0 when buflen is not positive and nothing was sent)
+int RealTimeDataCollector::sendData(int socket, const char *buf, int buflen)
+{
+	int lastSent = 0;
+
+	// Advance by whatever the previous send() accepted until everything is out
+	for (int offset = 0; offset < buflen; offset += lastSent)
+	{
+		lastSent = send(socket, buf + offset, buflen - offset, 0);
+		if (lastSent == -1)
+			return lastSent;
+	}
+
+	if (lastSent != 0)
+		_logger->log_debug("Send data size %d over socket %d", buflen, socket);
+
+	return lastSent;
+}
+
+//! Real time thread body. Once the accumulated scheduling time reaches the
+//! real time interval, re-read the message ID lists, then read the data file
+//! line by line: lines whose first comma separated cell matches a batch ID are
+//! queued for the batch thread, lines matching a real time ID are sent over the
+//! real time socket (falling back to the batch queue if that fails). Reading
+//! stops after the first real time line; the file is closed at end of file and
+//! reopened on a later trigger.
+void RealTimeDataCollector::onTriggerRealTime(ProcessContext *context, ProcessSession *session)
+{
+	if (_realTimeAccumulated >= this->_realTimeInterval)
+	{
+		// Append one record to the batch queue, evicting the oldest records
+		// until it fits. The caller must hold _mtx. The empty() guard matters:
+		// a record larger than the whole buffer used to call front() on an
+		// empty queue; now it is queued on its own once the queue is drained.
+		auto appendToBatchQueue = [this](const std::string &record)
+		{
+			while (!_queue.empty() && (_queuedDataSize + record.size()) > _batchMaxBufferSize)
+			{
+				const std::string::size_type evicted = _queue.front().size();
+				_queuedDataSize -= evicted;
+				// cast so the argument matches the %d conversion
+				_logger->log_debug("Pop item size %d from batch queue, queue buffer size %d", (int) evicted, _queuedDataSize);
+				_queue.pop();
+			}
+			_queue.push(record);
+			_queuedDataSize += record.size();
+		};
+
+		std::string value;
+		if (this->getProperty(REALTIMEMSGID.getName(), value))
+		{
+			this->_realTimeMsgID.clear();
+			this->_logger->log_info("Real Time Msg IDs %s", value.c_str());
+			std::stringstream lineStream(value);
+			std::string cell;
+
+			while (std::getline(lineStream, cell, ','))
+			{
+				// Trim like the batch IDs so "41, 42" matches the trimmed file cell
+				cell = Property::trim(cell);
+				this->_realTimeMsgID.push_back(cell);
+			}
+		}
+		if (this->getProperty(BATCHMSGID.getName(), value))
+		{
+			this->_batchMsgID.clear();
+			this->_logger->log_info("Batch Msg IDs %s", value.c_str());
+			std::stringstream lineStream(value);
+			std::string cell;
+
+			while (std::getline(lineStream, cell, ','))
+			{
+				cell = Property::trim(cell);
+				this->_batchMsgID.push_back(cell);
+			}
+		}
+		// Open the file (it is closed again each time end of file is reached)
+		if (!this->_fileStream.is_open())
+		{
+			_fileStream.open(this->_fileName.c_str(), std::ifstream::in);
+			if (this->_fileStream.is_open())
+				_logger->log_debug("open %s", _fileName.c_str());
+		}
+		if (!_fileStream.good())
+		{
+			_logger->log_error("load data file failed %s", _fileName.c_str());
+			return;
+		}
+		if (this->_fileStream.is_open())
+		{
+			std::string line;
+
+			while (std::getline(_fileStream, line))
+			{
+				line += "\n";
+				std::stringstream lineStream(line);
+				std::string cell;
+				if (std::getline(lineStream, cell, ','))
+				{
+					// The first cell of the record is its message ID
+					cell = Property::trim(cell);
+					// Check whether it match to the batch traffic
+					for (std::vector<std::string>::iterator it = _batchMsgID.begin(); it != _batchMsgID.end(); ++it)
+					{
+						if (cell == *it)
+						{
+							// push the batch data to the queue
+							std::lock_guard<std::mutex> lock(_mtx);
+							appendToBatchQueue(line);
+							_logger->log_debug("Push batch msg ID %s into batch queue, queue buffer size %d", cell.c_str(), _queuedDataSize);
+						}
+					}
+					bool findRealTime = false;
+					// Check whether it match to the real time traffic
+					for (std::vector<std::string>::iterator it = _realTimeMsgID.begin(); it != _realTimeMsgID.end(); ++it)
+					{
+						if (cell == *it)
+						{
+							int status = 0;
+							if (this->_realTimeSocket <= 0)
+							{
+								// Connect the LTE socket
+								uint16_t port = _realTimeServerPort;
+								this->_realTimeSocket = connectServer(_realTimeServerName.c_str(), port);
+							}
+							if (this->_realTimeSocket)
+							{
+								// try to send the data
+								status = sendData(_realTimeSocket, line.data(), line.size());
+								if (status < 0)
+								{
+									close(_realTimeSocket);
+									_realTimeSocket = 0;
+								}
+							}
+							if (this->_realTimeSocket <= 0 || status < 0)
+							{
+								// real time delivery failed: fall back to the batch queue
+								std::lock_guard<std::mutex> lock(_mtx);
+								appendToBatchQueue(line);
+								_logger->log_debug("Push real time msg ID %s into batch queue, queue buffer size %d", cell.c_str(), _queuedDataSize);
+							}
+							// find real time
+							findRealTime = true;
+						} // cell
+					} // for real time pattern
+					if (findRealTime)
+						// we break the while once we find the first real time
+						break;
+				}  // if get line
+			} // while
+			if (_fileStream.eof())
+			{
+				_fileStream.close();
+			}
+		} // if open
+		_realTimeAccumulated = 0;
+	}
+	_realTimeAccumulated += context->getProcessor()->getSchedulingPeriodNano();
+}
+
+//! Batch thread body. Once the accumulated scheduling time reaches the batch
+//! interval, (re)connect the batch socket if needed and drain the batch queue
+//! over it. A record is removed from the queue only after it was sent
+//! successfully, so a failed send no longer drops it.
+void RealTimeDataCollector::onTriggerBatch(ProcessContext *context, ProcessSession *session)
+{
+	if (_batchAcccumulated >= this->_batchInterval)
+	{
+		// dequeue the batch and send over WIFI
+		int status = 0;
+		if (this->_batchSocket <= 0)
+		{
+			// Connect the WIFI socket
+			uint16_t port = _batchServerPort;
+			this->_batchSocket = connectServer(_batchServerName.c_str(), port);
+		}
+		if (this->_batchSocket)
+		{
+			std::lock_guard<std::mutex> lock(_mtx);
+
+			while (!_queue.empty())
+			{
+				// Reference, not copy: the record stays in the queue until it is sent
+				const std::string &line = _queue.front();
+				status = sendData(_batchSocket, line.data(), line.size());
+				if (status < 0)
+				{
+					// Keep the unsent record queued and retry on a new
+					// connection at the next batch interval
+					close(_batchSocket);
+					_batchSocket = 0;
+					break;
+				}
+				// Update the accounting before pop() invalidates the reference
+				_queuedDataSize -= line.size();
+				_queue.pop();
+			}
+		}
+		_batchAcccumulated = 0;
+	}
+	_batchAcccumulated += context->getProcessor()->getSchedulingPeriodNano();
+}
+
+//! Entry point called by the scheduler threads. The thread recorded as the
+//! real time thread runs onTriggerRealTime, the one recorded as the batch
+//! thread runs onTriggerBatch. Any other thread either performs the one-off
+//! initialization (reads properties, connects both sockets, opens the data
+//! file) and becomes the real time thread, or, if initialization was already
+//! done, registers itself as the batch thread.
+void RealTimeDataCollector::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+	std::thread::id id = std::this_thread::get_id();
+
+	if (id == _realTimeThreadId)
+		return onTriggerRealTime(context, session);
+	else if (id == _batchThreadId)
+		return onTriggerBatch(context, session);
+	else
+	{
+		std::lock_guard<std::mutex> lock(_mtx);
+		if (!this->_firstInvoking)
+		{
+			this->_fileName = "data.osp";
+			std::string value;
+			if (this->getProperty(FILENAME.getName(), value))
+			{
+				this->_fileName = value;
+				this->_logger->log_info("Data Collector File Name %s", _fileName.c_str());
+			}
+			this->_realTimeServerName = "localhost";
+			if (this->getProperty(REALTIMESERVERNAME.getName(), value))
+			{
+				this->_realTimeServerName = value;
+				this->_logger->log_info("Real Time Server Name %s", this->_realTimeServerName.c_str());
+			}
+			this->_realTimeServerPort = 10000;
+			if (this->getProperty(REALTIMESERVERPORT.getName(), value))
+			{
+				Property::StringToInt(value, _realTimeServerPort);
+				this->_logger->log_info("Real Time Server Port %d", _realTimeServerPort);
+			}
+			// Same fallback as the real time server, so the name is never left unset
+			this->_batchServerName = "localhost";
+			if (this->getProperty(BATCHSERVERNAME.getName(), value))
+			{
+				this->_batchServerName = value;
+				this->_logger->log_info("Batch Server Name %s", this->_batchServerName.c_str());
+			}
+			this->_batchServerPort = 10001;
+			if (this->getProperty(BATCHSERVERPORT.getName(), value))
+			{
+				Property::StringToInt(value, _batchServerPort);
+				this->_logger->log_info("Batch Server Port %d", _batchServerPort);
+			}
+			if (this->getProperty(ITERATION.getName(), value))
+			{
+				Property::StringToBool(value, this->_iteration);
+				_logger->log_info("Iteration %d", _iteration);
+			}
+			this->_realTimeInterval = 10000000; //10 msec
+			if (this->getProperty(REALTIMEINTERVAL.getName(), value))
+			{
+				TimeUnit unit;
+				if (Property::StringToTime(value, _realTimeInterval, unit) &&
+								Property::ConvertTimeUnitToNS(_realTimeInterval, unit, _realTimeInterval))
+				{
+					_logger->log_info("Real Time Interval: [%d] ns", _realTimeInterval);
+				}
+			}
+			this->_batchInterval = 100000000; //100 msec
+			if (this->getProperty(BATCHINTERVAL.getName(), value))
+			{
+				TimeUnit unit;
+				if (Property::StringToTime(value, _batchInterval, unit) &&
+								Property::ConvertTimeUnitToNS(_batchInterval, unit, _batchInterval))
+				{
+					_logger->log_info("Batch Time Interval: [%d] ns", _batchInterval);
+				}
+			}
+			this->_batchMaxBufferSize = 256*1024;
+			if (this->getProperty(BATCHMAXBUFFERSIZE.getName(), value))
+			{
+				Property::StringToInt(value, _batchMaxBufferSize);
+				this->_logger->log_info("Batch Max Buffer Size %d", _batchMaxBufferSize);
+			}
+			if (this->getProperty(REALTIMEMSGID.getName(), value))
+			{
+				// Initialization can run more than once (e.g. after a failed
+				// file open below), so start from an empty list
+				this->_realTimeMsgID.clear();
+				this->_logger->log_info("Real Time Msg IDs %s", value.c_str());
+				std::stringstream lineStream(value);
+				std::string cell;
+
+				while (std::getline(lineStream, cell, ','))
+				{
+					// Trim like the batch IDs so the IDs match the trimmed file cells
+					cell = Property::trim(cell);
+					this->_realTimeMsgID.push_back(cell);
+					this->_logger->log_info("Real Time Msg ID %s", cell.c_str());
+				}
+			}
+			if (this->getProperty(BATCHMSGID.getName(), value))
+			{
+				this->_batchMsgID.clear();
+				this->_logger->log_info("Batch Msg IDs %s", value.c_str());
+				std::stringstream lineStream(value);
+				std::string cell;
+
+				while (std::getline(lineStream, cell, ','))
+				{
+					cell = Property::trim(cell);
+					this->_batchMsgID.push_back(cell);
+					this->_logger->log_info("Batch Msg ID %s", cell.c_str());
+				}
+			}
+			// Connect the LTE socket
+			uint16_t port = _realTimeServerPort;
+
+			this->_realTimeSocket = connectServer(_realTimeServerName.c_str(), port);
+
+			// Connect the WIFI socket
+			port = _batchServerPort;
+
+			this->_batchSocket = connectServer(_batchServerName.c_str(), port);
+
+			// Open the file
+			_fileStream.open(this->_fileName.c_str(), std::ifstream::in);
+			if (!_fileStream.good())
+			{
+				_logger->log_error("load data file failed %s", _fileName.c_str());
+				// _firstInvoking stays false, so initialization is retried on the
+				// next trigger and reconnects; release the sockets opened above
+				// so the retry does not leak descriptors
+				if (_realTimeSocket > 0)
+				{
+					close(_realTimeSocket);
+					_realTimeSocket = 0;
+				}
+				if (_batchSocket > 0)
+				{
+					close(_batchSocket);
+					_batchSocket = 0;
+				}
+				return;
+			}
+			else
+			{
+				_logger->log_debug("open %s", _fileName.c_str());
+			}
+			// The initializing thread becomes the real time thread
+			_realTimeThreadId = id;
+			this->_firstInvoking = true;
+		}
+		else
+		{
+			// Initialization already done by another thread: this one takes the batch role
+			if (id != _realTimeThreadId)
+				_batchThreadId = id;
+			this->_firstInvoking = false;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
new file mode 100644
index 0000000..9d849ae
--- /dev/null
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -0,0 +1,100 @@
+/**
+ * @file RemoteProcessorGroupPort.cpp
+ * RemoteProcessorGroupPort class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <sstream>
+#include <string.h>
+#include <iostream>
+
+#include "TimeUtil.h"
+#include "RemoteProcessorGroupPort.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+//! Static definitions of the processor name, its two properties and its relationship.
+//! The third Property argument looks like the default value (TODO confirm against Property's constructor).
+const std::string RemoteProcessorGroupPort::ProcessorName("RemoteProcessorGroupPort");
+Property RemoteProcessorGroupPort::hostName("Host Name", "Remote Host Name.", "localhost");
+Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999");
+//! Default-constructed relationship; it is the only one registered in initialize()
+Relationship RemoteProcessorGroupPort::relation;
+
+//! Register the properties and the relationship supported by this port
+void RemoteProcessorGroupPort::initialize()
+{
+	//! Supported properties: remote host name and remote port
+	std::set<Property> supportedProperties = { hostName, port };
+	setSupportedProperties(supportedProperties);
+
+	//! Supported relationships: the single port relationship
+	std::set<Relationship> supportedRelationships = { relation };
+	setSupportedRelationships(supportedRelationships);
+}
+
+//! Run one Site2Site exchange. Does nothing unless the port is transmitting.
+//! Picks up host/port changes from the context properties (tearing the
+//! protocol down when the peer changes), bootstraps the protocol, then
+//! receives or transfers flow files depending on the port direction.
+void RemoteProcessorGroupPort::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+	std::string value;
+
+	if (!_transmitting)
+		return;
+
+	std::string host = _peer->getHostName();
+	uint16_t sport = _peer->getPort();
+	int64_t lvalue = 0;
+	bool needReset = false;
+
+	if (context->getProperty(hostName.getName(), value))
+	{
+		host = value;
+	}
+	if (context->getProperty(port.getName(), value) && Property::StringToInt(value, lvalue))
+	{
+		// Only accept values that fit a TCP port; a plain cast would silently
+		// truncate e.g. 70000 to a different, unintended port
+		if (lvalue > 0 && lvalue <= 65535)
+			sport = (uint16_t) lvalue;
+		else
+			_logger->log_error("Invalid Site2Site port %s, keeping current peer port", value.c_str());
+	}
+	if (host != _peer->getHostName())
+	{
+		_peer->setHostName(host);
+		needReset = true;
+	}
+	if (sport != _peer->getPort())
+	{
+		_peer->setPort(sport);
+		needReset = true;
+	}
+	// The peer changed: drop the current protocol state so bootstrap() starts over
+	if (needReset)
+		_protocol->tearDown();
+
+	// bootstrap the client protocol if needed; yield when it fails
+	if (!_protocol->bootstrap())
+	{
+		context->yield();
+		_logger->log_error("Site2Site bootstrap failed yield period %d peer timeout %d", context->getProcessor()->getYieldPeriodMsec(), _protocol->getPeer()->getTimeOut());
+		return;
+	}
+
+	if (_direction == RECEIVE)
+		_protocol->receiveFlowFiles(context, session);
+	else
+		_protocol->transferFlowFiles(context, session);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/ResourceClaim.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ResourceClaim.cpp b/libminifi/src/ResourceClaim.cpp
new file mode 100644
index 0000000..3c22ac9
--- /dev/null
+++ b/libminifi/src/ResourceClaim.cpp
@@ -0,0 +1,45 @@
+/**
+ * @file ResourceClaim.cpp
+ * ResourceClaim class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+
+#include "ResourceClaim.h"
+
+//! Counter shared by all ResourceClaim instances; the constructor uses it to assign each claim's local id
+std::atomic<uint64_t> ResourceClaim::_localResourceClaimNumber(0);
+
+//! Create a resource claim with a fresh local id and UUID.
+//! @param contentDirectory directory under which the claim's content file
+//!        path (contentDirectory + "/" + UUID string) is formed
+ResourceClaim::ResourceClaim(const std::string contentDirectory)
+// Take the id and advance the counter in one atomic step; a separate load()
+// followed by ++ let two concurrently constructed claims read the same id
+: _id(_localResourceClaimNumber.fetch_add(1)),
+  _flowFileRecordOwnedCount(0)
+{
+	// 36 characters of textual UUID plus the terminating NUL
+	char uuidStr[37];
+
+	// Generate the global UUID for the resource claim
+	uuid_generate(_uuid);
+	uuid_unparse(_uuid, uuidStr);
+	// Create the full content path for the content
+	_contentFullPath = contentDirectory + "/" + uuidStr;
+
+	_configure = Configure::getConfigure();
+	_logger = Logger::getLogger();
+	_logger->log_debug("Resource Claim created %s", _contentFullPath.c_str());
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/SchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
new file mode 100644
index 0000000..211c328
--- /dev/null
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -0,0 +1,86 @@
+/**
+ * @file SchedulingAgent.cpp
+ * SchedulingAgent class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <chrono>
+#include <thread>
+#include <iostream>
+#include "Exception.h"
+#include "SchedulingAgent.h"
+
+//! A processor has work to do when it triggers even when empty, when it has
+//! no incoming connections at all, or when flow files are queued for it
+bool SchedulingAgent::hasWorkToDo(Processor *processor)
+{
+	return processor->getTriggerWhenEmpty()
+			|| !processor->hasIncomingConnections()
+			|| processor->flowFilesQueued();
+}
+
+//! True when the processor reports that one of its outgoing connections is full
+bool SchedulingAgent::hasTooMuchOutGoing(Processor *processor)
+{
+	const bool outgoingFull = processor->flowFilesOutGoingFull();
+	return outgoingFull;
+}
+
+//! Trigger the processor once if it is eligible to run.
+//! @return true when the processor was skipped because it has no work or its
+//!         outgoing connections are full; false when it is still yielding or
+//!         when it was actually triggered (whether or not it threw)
+bool SchedulingAgent::onTrigger(Processor *processor)
+{
+	if (processor->isYield())
+	{
+		return false;
+	}
+
+	// No need to yield, reset yield expiration to 0
+	processor->clearYield();
+
+	// Skip when there is nothing to do, or when backpressure must be applied;
+	// the work check is evaluated first and short-circuits the second
+	if (!hasWorkToDo(processor) || hasTooMuchOutGoing(processor))
+	{
+		return true;
+	}
+
+	//TODO runDuration
+
+	processor->incrementActiveTasks();
+	try
+	{
+		processor->onTrigger();
+		processor->decrementActiveTask();
+	}
+	catch (Exception &flowException)
+	{
+		// Normal exception: logged only, no administrative yield
+		_logger->log_debug("Caught Exception %s", flowException.what());
+		processor->decrementActiveTask();
+	}
+	catch (std::exception &stdException)
+	{
+		// Unexpected failure: back the processor off for the administrative yield duration
+		_logger->log_debug("Caught Exception %s", stdException.what());
+		processor->yield(_administrativeYieldDuration);
+		processor->decrementActiveTask();
+	}
+	catch (...)
+	{
+		_logger->log_debug("Caught Exception during SchedulingAgent::onTrigger");
+		processor->yield(_administrativeYieldDuration);
+		processor->decrementActiveTask();
+	}
+
+	return false;
+}
+