You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/07/21 18:45:09 UTC

[50/52] [abbrv] [partial] nifi-minifi-cpp git commit: MINIFI-6: More infra works

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/FlowControlProtocol.cpp
----------------------------------------------------------------------
diff --git a/src/FlowControlProtocol.cpp b/src/FlowControlProtocol.cpp
new file mode 100644
index 0000000..6aaa969
--- /dev/null
+++ b/src/FlowControlProtocol.cpp
@@ -0,0 +1,540 @@
+/**
+ * @file FlowControlProtocol.cpp
+ * FlowControlProtocol class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <sys/time.h>
+#include <stdio.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <random>
+#include <netinet/tcp.h>
+#include <iostream>
+#include "FlowController.h"
+#include "FlowControlProtocol.h"
+
+int FlowControlProtocol::connectServer(const char *host, uint16_t port)
+{
+	in_addr_t addr;
+	int sock = 0;
+	struct hostent *h;
+#ifdef __MACH__
+	h = gethostbyname(host);
+#else
+	char buf[1024];
+	struct hostent he;
+	int hh_errno;
+	gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno);
+#endif
+	memcpy((char *) &addr, h->h_addr_list[0], h->h_length);
+	sock = socket(AF_INET, SOCK_STREAM, 0);
+	if (sock < 0)
+	{
+		_logger->log_error("Could not create socket to hostName %s", host);
+		return 0;
+	}
+
+#ifndef __MACH__
+	int opt = 1;
+	bool nagle_off = true;
+
+	if (nagle_off)
+	{
+		if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0)
+		{
+			_logger->log_error("setsockopt() TCP_NODELAY failed");
+			close(sock);
+			return 0;
+		}
+		if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+				(char *)&opt, sizeof(opt)) < 0)
+		{
+			_logger->log_error("setsockopt() SO_REUSEADDR failed");
+			close(sock);
+			return 0;
+		}
+	}
+
+	int sndsize = 256*1024;
+	if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0)
+	{
+		_logger->log_error("setsockopt() SO_SNDBUF failed");
+		close(sock);
+		return 0;
+	}
+#endif
+
+	struct sockaddr_in sa;
+	socklen_t socklen;
+	int status;
+
+	memset(&sa, 0, sizeof(sa));
+	sa.sin_family = AF_INET;
+	sa.sin_addr.s_addr = htonl(INADDR_ANY);
+	sa.sin_port = htons(0);
+	socklen = sizeof(sa);
+	if (bind(sock, (struct sockaddr *)&sa, socklen) < 0)
+	{
+		_logger->log_error("socket bind failed");
+		close(sock);
+		return 0;
+	}
+
+	memset(&sa, 0, sizeof(sa));
+	sa.sin_family = AF_INET;
+	sa.sin_addr.s_addr = addr;
+	sa.sin_port = htons(port);
+	socklen = sizeof(sa);
+
+	status = connect(sock, (struct sockaddr *)&sa, socklen);
+
+	if (status < 0)
+	{
+		_logger->log_error("socket connect failed to %s %d", host, port);
+		close(sock);
+		return 0;
+	}
+
+	_logger->log_info("Flow Control Protocol socket %d connect to server %s port %d success", sock, host, port);
+
+	return sock;
+}
+
+int FlowControlProtocol::sendData(uint8_t *buf, int buflen)
+{
+	int ret = 0, bytes = 0;
+
+	while (bytes < buflen)
+	{
+		ret = send(_socket, buf+bytes, buflen-bytes, 0);
+		//check for errors
+		if (ret == -1)
+		{
+			return ret;
+		}
+		bytes+=ret;
+	}
+
+	return bytes;
+}
+
+int FlowControlProtocol::selectClient(int msec)
+{
+	fd_set fds;
+	struct timeval tv;
+    int retval;
+    int fd = _socket;
+
+    FD_ZERO(&fds);
+    FD_SET(fd, &fds);
+
+    tv.tv_sec = msec/1000;
+    tv.tv_usec = (msec % 1000) * 1000;
+
+    if (msec > 0)
+       retval = select(fd+1, &fds, NULL, NULL, &tv);
+    else
+       retval = select(fd+1, &fds, NULL, NULL, NULL);
+
+    if (retval <= 0)
+      return retval;
+    if (FD_ISSET(fd, &fds))
+      return retval;
+    else
+      return 0;
+}
+
+int FlowControlProtocol::readData(uint8_t *buf, int buflen)
+{
+	int sendSize = buflen;
+
+	while (buflen)
+	{
+		int status;
+		status = selectClient(MAX_READ_TIMEOUT);
+		if (status <= 0)
+		{
+			return status;
+		}
+#ifndef __MACH__
+		status = read(_socket, buf, buflen);
+#else
+		status = recv(_socket, buf, buflen, 0);
+#endif
+		if (status <= 0)
+		{
+			return status;
+		}
+		buflen -= status;
+		buf += status;
+	}
+
+	return sendSize;
+}
+
+int FlowControlProtocol::readHdr(FlowControlProtocolHeader *hdr)
+{
+	uint8_t buffer[sizeof(FlowControlProtocolHeader)];
+
+	uint8_t *data = buffer;
+
+	int status = readData(buffer, sizeof(FlowControlProtocolHeader));
+	if (status <= 0)
+		return status;
+
+	uint32_t value;
+	data = this->decode(data, value);
+	hdr->msgType = value;
+
+	data = this->decode(data, value);
+	hdr->seqNumber = value;
+
+	data = this->decode(data, value);
+	hdr->status = value;
+
+	data = this->decode(data, value);
+	hdr->payloadLen = value;
+
+	return sizeof(FlowControlProtocolHeader);
+}
+
+void FlowControlProtocol::start()
+{
+	if (_running)
+		return;
+	_running = true;
+	_logger->log_info("FlowControl Protocol Start");
+	_thread = new std::thread(run, this);
+	_thread->detach();
+}
+
+void FlowControlProtocol::stop()
+{
+	if (!_running)
+		return;
+	_running = false;
+	_logger->log_info("FlowControl Protocol Stop");
+	delete _thread;
+}
+
+void FlowControlProtocol::run(FlowControlProtocol *protocol)
+{
+	while (protocol->_running)
+	{
+		std::this_thread::sleep_for(std::chrono::milliseconds(protocol->_reportInterval));
+		if (!protocol->_registered)
+		{
+			// if it is not register yet
+			protocol->sendRegisterReq();
+			// protocol->_controller->reload("flow.xml");
+		}
+		else
+			protocol->sendReportReq();
+	}
+	return;
+}
+
+int FlowControlProtocol::sendRegisterReq()
+{
+	if (_registered)
+	{
+		_logger->log_info("Already registered");
+		return -1;
+	}
+
+	uint16_t port = this->_serverPort;
+
+	if (this->_socket <= 0)
+		this->_socket = connectServer(_serverName.c_str(), port);
+
+	if (this->_socket <= 0)
+		return -1;
+
+	// Calculate the total payload msg size
+	uint32_t payloadSize = FlowControlMsgIDEncodingLen(FLOW_SERIAL_NUMBER, 0) +
+			FlowControlMsgIDEncodingLen(FLOW_XML_NAME, this->_controller->getName().size()+1);
+	uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
+
+	uint8_t *data = new uint8_t[size];
+	uint8_t *start = data;
+
+	// encode the HDR
+	FlowControlProtocolHeader hdr;
+	hdr.msgType = REGISTER_REQ;
+	hdr.payloadLen = payloadSize;
+	hdr.seqNumber  = this->_seqNumber;
+	hdr.status = RESP_SUCCESS;
+	data = this->encode(data, hdr.msgType);
+	data = this->encode(data, hdr.seqNumber);
+	data = this->encode(data, hdr.status);
+	data = this->encode(data, hdr.payloadLen);
+
+	// encode the serial number
+	data = this->encode(data, FLOW_SERIAL_NUMBER);
+	data = this->encode(data, this->_serialNumber, 8);
+
+	// encode the XML name
+	data = this->encode(data, FLOW_XML_NAME);
+	data = this->encode(data, this->_controller->getName());
+
+	// send it
+	int status = sendData(start, size);
+	delete[] start;
+	if (status <= 0)
+	{
+		close(_socket);
+		_socket = 0;
+		_logger->log_error("Flow Control Protocol Send Register Req failed");
+		return -1;
+	}
+
+	// Looking for register respond
+	status = readHdr(&hdr);
+
+	if (status <= 0)
+	{
+		close(_socket);
+		_socket = 0;
+		_logger->log_error("Flow Control Protocol Read Register Resp header failed");
+		return -1;
+	}
+	_logger->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
+	_logger->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber);
+	_logger->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
+	_logger->log_info("Flow Control Protocol receive Payload len %d", hdr.payloadLen);
+
+	if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber)
+	{
+		this->_registered = true;
+		this->_seqNumber++;
+		_logger->log_info("Flow Control Protocol Register success");
+		uint8_t *payload = new uint8_t[hdr.payloadLen];
+		uint8_t *payloadPtr = payload;
+		status = readData(payload, hdr.payloadLen);
+		if (status <= 0)
+		{
+			delete[] payload;
+			_logger->log_info("Flow Control Protocol Register Read Payload fail");
+			close(_socket);
+			_socket = 0;
+			return -1;
+		}
+		while (payloadPtr < (payload + hdr.payloadLen))
+		{
+			uint32_t msgID;
+			payloadPtr = this->decode(payloadPtr, msgID);
+			if (((FlowControlMsgID) msgID) == REPORT_INTERVAL)
+			{
+				// Fixed 4 bytes
+				uint32_t reportInterval;
+				payloadPtr = this->decode(payloadPtr, reportInterval);
+				_logger->log_info("Flow Control Protocol receive report interval %d ms", reportInterval);
+				this->_reportInterval = reportInterval;
+			}
+			else if (((FlowControlMsgID) msgID) == FLOW_XML_CONTENT)
+			{
+				uint32_t xmlLen;
+				payloadPtr = this->decode(payloadPtr, xmlLen);
+				_logger->log_info("Flow Control Protocol receive XML content length %d", xmlLen);
+				time_t rawtime;
+				struct tm *timeinfo;
+				time(&rawtime);
+				timeinfo = localtime(&rawtime);
+				std::string xmlFileName = "flow.";
+				xmlFileName += asctime(timeinfo);
+				xmlFileName += ".xml";
+				std::ofstream fs;
+				fs.open(xmlFileName.c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
+				if (fs.is_open())
+				{
+					fs.write((const char *)payloadPtr, xmlLen);
+					fs.close();
+					this->_controller->reload(xmlFileName.c_str());
+				}
+			}
+			else
+			{
+				break;
+			}
+		}
+		delete[] payload;
+		close(_socket);
+		_socket = 0;
+		return 0;
+	}
+	else
+	{
+		_logger->log_info("Flow Control Protocol Register fail");
+		close(_socket);
+		_socket = 0;
+		return -1;
+	}
+}
+
+
+int FlowControlProtocol::sendReportReq()
+{
+	uint16_t port = this->_serverPort;
+
+	if (this->_socket <= 0)
+		this->_socket = connectServer(_serverName.c_str(), port);
+
+	if (this->_socket <= 0)
+		return -1;
+
+	// Calculate the total payload msg size
+	uint32_t payloadSize =
+			FlowControlMsgIDEncodingLen(FLOW_XML_NAME, this->_controller->getName().size()+1);
+	uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
+
+	uint8_t *data = new uint8_t[size];
+	uint8_t *start = data;
+
+	// encode the HDR
+	FlowControlProtocolHeader hdr;
+	hdr.msgType = REPORT_REQ;
+	hdr.payloadLen = payloadSize;
+	hdr.seqNumber  = this->_seqNumber;
+	hdr.status = RESP_SUCCESS;
+	data = this->encode(data, hdr.msgType);
+	data = this->encode(data, hdr.seqNumber);
+	data = this->encode(data, hdr.status);
+	data = this->encode(data, hdr.payloadLen);
+
+	// encode the XML name
+	data = this->encode(data, FLOW_XML_NAME);
+	data = this->encode(data, this->_controller->getName());
+
+	// send it
+	int status = sendData(start, size);
+	delete[] start;
+	if (status <= 0)
+	{
+		close(_socket);
+		_socket = 0;
+		_logger->log_error("Flow Control Protocol Send Report Req failed");
+		return -1;
+	}
+
+	// Looking for report respond
+	status = readHdr(&hdr);
+
+	if (status <= 0)
+	{
+		close(_socket);
+		_socket = 0;
+		_logger->log_error("Flow Control Protocol Read Report Resp header failed");
+		return -1;
+	}
+	_logger->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
+	_logger->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber);
+	_logger->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
+	_logger->log_info("Flow Control Protocol receive Payload len %d", hdr.payloadLen);
+
+	if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber)
+	{
+		this->_seqNumber++;
+		uint8_t *payload = new uint8_t[hdr.payloadLen];
+		uint8_t *payloadPtr = payload;
+		status = readData(payload, hdr.payloadLen);
+		if (status <= 0)
+		{
+			delete[] payload;
+			_logger->log_info("Flow Control Protocol Report Resp Read Payload fail");
+			close(_socket);
+			_socket = 0;
+			return -1;
+		}
+		std::string processor;
+		std::string propertyName;
+		std::string propertyValue;
+		while (payloadPtr < (payload + hdr.payloadLen))
+		{
+			uint32_t msgID;
+			payloadPtr = this->decode(payloadPtr, msgID);
+			if (((FlowControlMsgID) msgID) == PROCESSOR_NAME)
+			{
+				uint32_t len;
+				payloadPtr = this->decode(payloadPtr, len);
+				processor = (const char *) payloadPtr;
+				payloadPtr += len;
+				_logger->log_info("Flow Control Protocol receive report resp processor %s", processor.c_str());
+			}
+			else if (((FlowControlMsgID) msgID) == PROPERTY_NAME)
+			{
+				uint32_t len;
+				payloadPtr = this->decode(payloadPtr, len);
+				propertyName = (const char *) payloadPtr;
+				payloadPtr += len;
+				_logger->log_info("Flow Control Protocol receive report resp property name %s", propertyName.c_str());
+			}
+			else if (((FlowControlMsgID) msgID) == PROPERTY_VALUE)
+			{
+				uint32_t len;
+				payloadPtr = this->decode(payloadPtr, len);
+				propertyValue = (const char *) payloadPtr;
+				payloadPtr += len;
+				_logger->log_info("Flow Control Protocol receive report resp property value %s", propertyValue.c_str());
+				this->_controller->updatePropertyValue(processor, propertyName, propertyValue);
+			}
+			else
+			{
+				break;
+			}
+		}
+		delete[] payload;
+		close(_socket);
+		_socket = 0;
+		return 0;
+	}
+	else if (hdr.status == RESP_TRIGGER_REGISTER && hdr.seqNumber == this->_seqNumber)
+	{
+		_logger->log_info("Flow Control Protocol trigger reregister");
+		this->_registered = false;
+		this->_seqNumber++;
+		close(_socket);
+		_socket = 0;
+		return 0;
+	}
+	else if (hdr.status == RESP_STOP_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber)
+	{
+		_logger->log_info("Flow Control Protocol stop flow controller");
+		this->_controller->stop(true);
+		this->_seqNumber++;
+		close(_socket);
+		_socket = 0;
+		return 0;
+	}
+	else if (hdr.status == RESP_START_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber)
+	{
+		_logger->log_info("Flow Control Protocol start flow controller");
+		this->_controller->start();
+		this->_seqNumber++;
+		close(_socket);
+		_socket = 0;
+		return 0;
+	}
+	else
+	{
+		_logger->log_info("Flow Control Protocol Report fail");
+		close(_socket);
+		_socket = 0;
+		return -1;
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/src/FlowController.cpp b/src/FlowController.cpp
index 0525ccc..a2cafbc 100644
--- a/src/FlowController.cpp
+++ b/src/FlowController.cpp
@@ -44,6 +44,7 @@ FlowController::FlowController(std::string name)
 	_initialized = false;
 	_root = NULL;
 	_logger = Logger::getLogger();
+	_protocol = new FlowControlProtocol(this);
 
 	// NiFi config properties
 	_configure = Configure::getConfigure();
@@ -58,6 +59,7 @@ FlowController::~FlowController()
 {
 	stop(true);
 	unload();
+	delete _protocol;
 }
 
 bool FlowController::isRunning()
@@ -75,6 +77,11 @@ void FlowController::stop(bool force)
 	if (_running)
 	{
 		_logger->log_info("Stop Flow Controller");
+		this->_timerScheduler.stop();
+		// Wait for sometime for thread stop
+		std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+		if (this->_root)
+			this->_root->stopProcessing(&this->_timerScheduler);
 		_running = false;
 	}
 }
@@ -88,12 +95,36 @@ void FlowController::unload()
 	if (_initialized)
 	{
 		_logger->log_info("Unload Flow Controller");
+		if (_root)
+			delete _root;
+		_root = NULL;
 		_initialized = false;
+		_name = "";
 	}
 
 	return;
 }
 
+void FlowController::reload(std::string xmlFile)
+{
+	_logger->log_info("Starting to reload Flow Controller with xml %s", xmlFile.c_str());
+	stop(true);
+	unload();
+	std::string oldxmlFile = this->_xmlFileName;
+	this->_xmlFileName = xmlFile;
+	load();
+	start();
+	if (!this->_root)
+	{
+		this->_xmlFileName = oldxmlFile;
+		_logger->log_info("Rollback Flow Controller to xml %s", oldxmlFile.c_str());
+		stop(true);
+		unload();
+		load();
+		start();
+	}
+}
+
 Processor *FlowController::createProcessor(std::string name, uuid_t uuid)
 {
 	Processor *processor = NULL;
@@ -291,6 +322,7 @@ void FlowController::parseRootProcessGroup(xmlDoc *doc, xmlNode *node)
 					}
 					// Set the root process group
 					this->_root = group;
+					this->_name = name;
 					xmlFree(name);
 				}
 			}
@@ -504,6 +536,20 @@ void FlowController::parseProcessorNode(xmlDoc *doc, xmlNode *processorNode, Pro
 					xmlFree(temp);
 				}
 			}
+			else if (xmlStrcmp(currentNode->name, BAD_CAST "maxConcurrentTasks") == 0)
+			{
+				char *temp = (char *) xmlNodeGetContent(currentNode);
+				if (temp)
+				{
+					int64_t maxConcurrentTasks;
+					if (Property::StringToInt(temp, maxConcurrentTasks))
+					{
+						_logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
+						processor->setMaxConcurrentTasks(maxConcurrentTasks);
+					}
+					xmlFree(temp);
+				}
+			}
 			else if (xmlStrcmp(currentNode->name, BAD_CAST "runDurationNanos") == 0)
 			{
 				char *temp = (char *) xmlNodeGetContent(currentNode);
@@ -548,12 +594,13 @@ void FlowController::load()
 	}
 	if (!_initialized)
 	{
-		_logger->log_info("Load Flow Controller");
+		_logger->log_info("Load Flow Controller from file %s", _xmlFileName.c_str());
 
 		xmlDoc *doc = xmlReadFile(_xmlFileName.c_str(), NULL, XML_PARSE_NONET);
 		if (doc == NULL)
 		{
 			_logger->log_error("xmlReadFile returned NULL when reading [%s]", _xmlFileName.c_str());
+			_initialized = true;
 			return;
 		}
 
@@ -637,6 +684,7 @@ bool FlowController::start()
 			if (this->_root)
 				this->_root->startProcessing(&this->_timerScheduler);
 			_running = true;
+			this->_protocol->start();
 		}
 		return true;
 	}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/FlowFileRecord.cpp
----------------------------------------------------------------------
diff --git a/src/FlowFileRecord.cpp b/src/FlowFileRecord.cpp
index 601dcde..2dda47a 100644
--- a/src/FlowFileRecord.cpp
+++ b/src/FlowFileRecord.cpp
@@ -22,6 +22,9 @@
 #include <map>
 #include <sys/time.h>
 #include <time.h>
+#include <iostream>
+#include <fstream>
+#include <cstdio>
 
 #include "FlowFileRecord.h"
 #include "Relationship.h"
@@ -83,6 +86,7 @@ FlowFileRecord::~FlowFileRecord()
 		if (_claim->getFlowFileRecordOwnedCount() == 0)
 		{
 			_logger->log_debug("Delete Resource Claim %s", _claim->getContentFullPath().c_str());
+			std::remove(_claim->getContentFullPath().c_str());
 			delete _claim;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/LogAttribute.cpp
----------------------------------------------------------------------
diff --git a/src/LogAttribute.cpp b/src/LogAttribute.cpp
index f4548af..67ed74e 100644
--- a/src/LogAttribute.cpp
+++ b/src/LogAttribute.cpp
@@ -25,6 +25,7 @@
 #include <time.h>
 #include <sstream>
 #include <string.h>
+#include <iostream>
 
 #include "TimeUtil.h"
 #include "LogAttribute.h"
@@ -103,6 +104,24 @@ void LogAttribute::onTrigger(ProcessContext *context, ProcessSession *session)
     {
     	message << "\n" << "Content Claim:" << claim->getContentFullPath();
     }
+    if (logPayload && flow->getSize() <= 1024*1024)
+    {
+    	message << "\n" << "Payload:" << "\n";
+    	ReadCallback callback(flow->getSize());
+    	session->read(flow, &callback);
+    	for (int i = 0, j = 0; i < callback._readSize; i++)
+    	{
+    		char temp[8];
+    		sprintf(temp, "%02x ", (unsigned char) (callback._buffer[i]));
+    		message << temp;
+    		j++;
+    		if (j == 16)
+    		{
+    			message << '\n';
+    			j = 0;
+    		}
+    	}
+    }
     message << "\n" << dashLine << std::ends;
     std::string output = message.str();
 
@@ -127,6 +146,12 @@ void LogAttribute::onTrigger(ProcessContext *context, ProcessSession *session)
     	break;
     }
 
+    // Test Import
+    FlowFileRecord *importRecord = session->create();
+    session->import(claim->getContentFullPath(), importRecord);
+    session->transfer(importRecord, Success);
+
+
     // Transfer to the relationship
     session->transfer(flow, Success);
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/src/ProcessGroup.cpp b/src/ProcessGroup.cpp
index 78ae1a4..a5fd773 100644
--- a/src/ProcessGroup.cpp
+++ b/src/ProcessGroup.cpp
@@ -46,7 +46,24 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid,
 
 ProcessGroup::~ProcessGroup()
 {
+	for (std::set<Connection *>::iterator it = _connections.begin(); it != _connections.end(); ++it)
+	{
+		Connection *connection = *it;
+		connection->drain();
+		delete connection;
+	}
+
+	for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it)
+	{
+		ProcessGroup *processGroup(*it);
+		delete processGroup;
+	}
 
+	for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it)
+	{
+		Processor *processor(*it);
+		delete processor;
+	}
 }
 
 bool ProcessGroup::isRootProcessGroup()
@@ -198,6 +215,28 @@ Processor *ProcessGroup::findProcessor(uuid_t uuid)
 	return ret;
 }
 
+void ProcessGroup::updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue)
+{
+	std::lock_guard<std::mutex> lock(_mtx);
+
+	for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it)
+	{
+		Processor *processor(*it);
+		if (processor->getName() == processorName)
+		{
+			processor->setProperty(propertyName, propertyValue);
+		}
+	}
+
+	for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it)
+	{
+		ProcessGroup *processGroup(*it);
+		processGroup->updatePropertyValue(processorName, propertyName, propertyValue);
+	}
+
+	return;
+}
+
 void ProcessGroup::addConnection(Connection *connection)
 {
 	std::lock_guard<std::mutex> lock(_mtx);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/src/ProcessSession.cpp b/src/ProcessSession.cpp
index cf8249d..32d8920 100644
--- a/src/ProcessSession.cpp
+++ b/src/ProcessSession.cpp
@@ -207,8 +207,9 @@ void ProcessSession::write(FlowFileRecord *flow, OutputStreamCallback *callback)
 				}
 				flow->_claim = claim;
 				claim->increaseFlowFileRecordOwnedCount();
+				/*
 				_logger->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s",
-						flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str());
+						flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
 				fs.close();
 			}
 			else
@@ -273,8 +274,9 @@ void ProcessSession::append(FlowFileRecord *flow, OutputStreamCallback *callback
 			{
 				uint64_t appendSize = fs.tellp() - oldPos;
 				flow->_size += appendSize;
+				/*
 				_logger->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s",
-						flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str());
+						flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
 				fs.close();
 			}
 			else
@@ -321,8 +323,9 @@ void ProcessSession::read(FlowFileRecord *flow, InputStreamCallback *callback)
 			if (fs.good())
 			{
 				callback->process(&fs);
+				/*
 				_logger->log_debug("Read offset %d size %d content %s for FlowFile UUID %s",
-						flow->_offset, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str());
+						flow->_offset, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
 				fs.close();
 			}
 			else
@@ -348,6 +351,94 @@ void ProcessSession::read(FlowFileRecord *flow, InputStreamCallback *callback)
 	}
 }
 
+void ProcessSession::import(std::string source, FlowFileRecord *flow)
+{
+	ResourceClaim *claim = NULL;
+
+	claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY);
+	char *buf = NULL;
+	int size = 4096;
+	buf = new char [size];
+
+	try
+	{
+		std::ofstream fs;
+		fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
+		std::ifstream input;
+		input.open(source.c_str(), std::fstream::in | std::fstream::binary);
+
+		if (fs.is_open() && input.is_open())
+		{
+			// Open the source file and stream to the flow file
+			while (input.good())
+			{
+				input.read(buf, size);
+				if (input)
+					fs.write(buf, size);
+				else
+					fs.write(buf, input.gcount());
+			}
+
+			if (fs.good() && fs.tellp() >= 0)
+			{
+				flow->_size = fs.tellp();
+				flow->_offset = 0;
+				if (flow->_claim)
+				{
+					// Remove the old claim
+					flow->_claim->decreaseFlowFileRecordOwnedCount();
+					flow->_claim = NULL;
+				}
+				flow->_claim = claim;
+				claim->increaseFlowFileRecordOwnedCount();
+				/*
+				_logger->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s",
+						flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
+				fs.close();
+				input.close();
+			}
+			else
+			{
+				fs.close();
+				input.close();
+				throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+			}
+		}
+		else
+		{
+			throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+		}
+
+		delete[] buf;
+	}
+	catch (std::exception &exception)
+	{
+		if (flow && flow->_claim == claim)
+		{
+			flow->_claim->decreaseFlowFileRecordOwnedCount();
+			flow->_claim = NULL;
+		}
+		if (claim)
+			delete claim;
+		_logger->log_debug("Caught Exception %s", exception.what());
+		delete[] buf;
+		throw;
+	}
+	catch (...)
+	{
+		if (flow && flow->_claim == claim)
+		{
+			flow->_claim->decreaseFlowFileRecordOwnedCount();
+			flow->_claim = NULL;
+		}
+		if (claim)
+			delete claim;
+		_logger->log_debug("Caught Exception during process session write");
+		delete[] buf;
+		throw;
+	}
+}
+
 void ProcessSession::commit()
 {
 	try

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/Processor.cpp
----------------------------------------------------------------------
diff --git a/src/Processor.cpp b/src/Processor.cpp
index 5c695af..d5e1a6a 100644
--- a/src/Processor.cpp
+++ b/src/Processor.cpp
@@ -40,7 +40,7 @@ Processor::Processor(std::string name, uuid_t uuid)
 		uuid_copy(_uuid, uuid);
 
 	char uuidStr[37];
-	uuid_parse(uuidStr, _uuid);
+	uuid_unparse(_uuid, uuidStr);
 	_uuidStr = uuidStr;
 
 	// Setup the default values
@@ -58,7 +58,7 @@ Processor::Processor(std::string name, uuid_t uuid)
 	_incomingConnectionsIter = this->_incomingConnections.begin();
 	_logger = Logger::getLogger();
 
-	_logger->log_info("Processor %s created", _name.c_str());
+	_logger->log_info("Processor %s created UUID %s", _name.c_str(), _uuidStr.c_str());
 }
 
 Processor::~Processor()
@@ -208,13 +208,6 @@ bool Processor::getProperty(std::string name, std::string &value)
 
 bool Processor::setProperty(std::string name, std::string value)
 {
-	if (isRunning())
-	{
-		_logger->log_info("Can not set processor property while the process %s is running",
-				_name.c_str());
-		return false;
-	}
-
 	std::lock_guard<std::mutex> lock(_mtx);
 	std::map<std::string, Property>::iterator it = _properties.find(name);
 	if (it != _properties.end())

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/SchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/src/SchedulingAgent.cpp b/src/SchedulingAgent.cpp
index 45f5d10..211c328 100644
--- a/src/SchedulingAgent.cpp
+++ b/src/SchedulingAgent.cpp
@@ -60,6 +60,7 @@ bool SchedulingAgent::onTrigger(Processor *processor)
 	try
 	{
 		processor->onTrigger();
+		processor->decrementActiveTask();
 	}
 	catch (Exception &exception)
 	{

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/TimerDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/src/TimerDrivenSchedulingAgent.cpp b/src/TimerDrivenSchedulingAgent.cpp
index a5f8f3c..3ce57ae 100644
--- a/src/TimerDrivenSchedulingAgent.cpp
+++ b/src/TimerDrivenSchedulingAgent.cpp
@@ -74,7 +74,7 @@ void TimerDrivenSchedulingAgent::schedule(Processor *processor)
 		_logger->log_info("Scheduled Time Driven thread %d running for process %s", thread->get_id(),
 				processor->getName().c_str());
 	}
-	_threads[processor->getName().c_str()] = threads;
+	_threads[processor->getUUIDStr().c_str()] = threads;
 
 	return;
 }
@@ -105,6 +105,7 @@ void TimerDrivenSchedulingAgent::unschedule(Processor *processor)
 		delete thread;
 	}
 	_threads.erase(processor->getUUIDStr());
+	processor->clearActiveTask();
 
 	return;
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/target/conf/nifi.properties
----------------------------------------------------------------------
diff --git a/target/conf/nifi.properties b/target/conf/nifi.properties
index c4f7dff..b1902b1 100644
--- a/target/conf/nifi.properties
+++ b/target/conf/nifi.properties
@@ -183,3 +183,9 @@ nifi.cluster.manager.safemode.duration=0 sec
 
 # kerberos #
 nifi.kerberos.krb5.file=
+
+# Server
+nifi.server.name=localhost
+nifi.server.port=9000
+nifi.server.report.interval=1000 ms
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/test/Server.cpp
----------------------------------------------------------------------
diff --git a/test/Server.cpp b/test/Server.cpp
new file mode 100644
index 0000000..e7b3452
--- /dev/null
+++ b/test/Server.cpp
@@ -0,0 +1,607 @@
+/* A simple server in the internet domain using TCP
+   The port number is passed as an argument */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/types.h> 
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <errno.h>
+#include <arpa/inet.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <string>
+#include <errno.h>
+#include <chrono>
+#include <thread>
+#include <iostream>     // std::cout
+#include <fstream>      // std::ifstream
+#include <signal.h>
+
+#define DEFAULT_NIFI_SERVER_PORT 9000
+#define DEFAULT_REPORT_INTERVAL 1000 // 1 sec
+#define MAX_READ_TIMEOUT 30000 // 30 seconds
+
+//! FlowControl Protocol Msg Type
+typedef enum {
+	REGISTER_REQ, // Device Register Request from device to server which contain device serial number, current running flow xml version
+	REGISTER_RESP, // Device Register Respond from server to device, may contain new flow.xml from server ask device to apply and also device report interval
+	REPORT_REQ, // Period Device Report from device to server which contain device serial number, current running flow xml name/version and other period report info
+	REPORT_RESP, // Report Respond from server to device, may ask device to update flow xml or processor property
+	MAX_FLOW_CONTROL_MSG_TYPE
+} FlowControlMsgType;
+
+//! FlowControl Protocol Msg Type String
+static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] =
+{
+		"REGISTER_REQ",
+		"REGISTER_RESP",
+		"REPORT_REQ",
+		"REPORT_RESP"
+};
+
+//! Flow Control Msg Type to String
+inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type)
+{
+	if (type < MAX_FLOW_CONTROL_MSG_TYPE)
+		return FlowControlMsgTypeStr[type];
+	else
+		return NULL;
+}
+
+//! FlowControll Protocol Msg ID (Some Messages are fix length, Some are variable length (TLV)
+typedef enum {
+	//Fix length 8 bytes: client to server in register request, required field
+	FLOW_SERIAL_NUMBER,
+	// Flow XML name TLV: client to server in register request and report request, required field
+	FLOW_XML_NAME,
+	// Flow XML content, TLV: server to client in register respond, option field in case server want to ask client to load xml from server
+	FLOW_XML_CONTENT,
+	// Fix length, 4 bytes Report interval in msec: server to client in register respond, option field
+	REPORT_INTERVAL,
+	// Processor Name TLV:  server to client in report respond, option field in case server want to ask client to update processor property
+	PROCESSOR_NAME,
+	// Processor Property Name TLV: server to client in report respond, option field in case server want to ask client to update processor property
+	PROPERTY_NAME,
+	// Processor Property Value TLV: server to client in report respond, option field in case server want to ask client to update processor property
+	PROPERTY_VALUE,
+	// Report Blob TLV: client to server in report request, option field in case client want to pickyback the report blob in report request to server
+	REPORT_BLOB,
+	MAX_FLOW_MSG_ID
+} FlowControlMsgID;
+
+//! FlowControl Protocol Msg ID String
+static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] =
+{
+		"FLOW_SERIAL_NUMBER",
+		"FLOW_XML_NAME",
+		"FLOW_XML_CONTENT",
+		"REPORT_INTERVAL",
+		"PROCESSOR_NAME"
+		"PROPERTY_NAME",
+		"PROPERTY_VALUE",
+		"REPORT_BLOB"
+};
+
+#define TYPE_HDR_LEN 4 // Fix Hdr Type
+#define TLV_HDR_LEN 8 // Type 4 bytes and Len 4 bytes
+
+//! FlowControl Protocol Msg Len
+inline int FlowControlMsgIDEncodingLen(FlowControlMsgID id, int payLoadLen)
+{
+	if (id == FLOW_SERIAL_NUMBER)
+		return (TYPE_HDR_LEN + 8);
+	else if (id == REPORT_INTERVAL)
+		return (TYPE_HDR_LEN + 4);
+	else if (id < MAX_FLOW_MSG_ID)
+		return (TLV_HDR_LEN + payLoadLen);
+	else
+		return -1;
+}
+
+//! Flow Control Msg Id to String
+inline const char *FlowControlMsgIdToStr(FlowControlMsgID id)
+{
+	if (id < MAX_FLOW_MSG_ID)
+		return FlowControlMsgIDStr[id];
+	else
+		return NULL;
+}
+
+//! Flow Control Respond status code
+typedef enum {
+	RESP_SUCCESS,
+	RESP_TRIGGER_REGISTER, // Server respond to client report to re trigger register
+	RESP_START_FLOW_CONTROLLER, // Server respond to client to start flow controller
+	RESP_STOP_FLOW_CONTROLLER, // Server respond to client to stop flow controller
+	RESP_FAILURE,
+	MAX_RESP_CODE
+} FlowControlRespCode;
+
+//! FlowControl Resp Code str
+static const char *FlowControlRespCodeStr[MAX_RESP_CODE] =
+{
+		"RESP_SUCCESS",
+		"RESP_TRIGGER_REGISTER",
+		"RESP_START_FLOW_CONTROLLER",
+		"RESP_STOP_FLOW_CONTROLLER",
+		"RESP_FAILURE"
+};
+
+//! Flow Control Resp Code to String
+inline const char *FlowControlRespCodeToStr(FlowControlRespCode code)
+{
+	if (code < MAX_RESP_CODE)
+		return FlowControlRespCodeStr[code];
+	else
+		return NULL;
+}
+
+//! Common FlowControlProtocol Header
+typedef struct {
+	uint32_t msgType; //! Msg Type
+	uint32_t seqNumber; //! Seq Number to match Req with Resp
+	uint32_t status; //! Resp Code, see FlowControlRespCode
+	uint32_t payloadLen; //! Msg Payload length
+} FlowControlProtocolHeader;
+
+
+//! encode uint32_t
+uint8_t *encode(uint8_t *buf, uint32_t value)
+{
+		*buf++ = (value & 0xFF000000) >> 24;
+		*buf++ = (value & 0x00FF0000) >> 16;
+		*buf++ = (value & 0x0000FF00) >> 8;
+		*buf++ = (value & 0x000000FF);
+		return buf;
+}
+
+//! encode uint32_t
+uint8_t *decode(uint8_t *buf, uint32_t &value)
+{
+		value = ((buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|(buf[3]));
+		return (buf + 4);
+}
+
+//! encode byte array
+uint8_t *encode(uint8_t *buf, uint8_t *bufArray, int size)
+{
+		memcpy(buf, bufArray, size);
+		buf += size;
+		return buf;
+}
+
+//! encode std::string
+uint8_t *encode(uint8_t *buf, std::string value)
+{
+		// add the \0 for size
+		buf = encode(buf, value.size()+1);
+		buf = encode(buf, (uint8_t *) value.c_str(), value.size()+1);
+		return buf;
+}
+
+int sendData(int socket, uint8_t *buf, int buflen)
+{
+	int ret = 0, bytes = 0;
+
+	while (bytes < buflen)
+	{
+		ret = send(socket, buf+bytes, buflen-bytes, 0);
+		//check for errors
+		if (ret == -1)
+		{
+			return ret;
+		}
+		bytes+=ret;
+	}
+
+	return bytes;
+}
+
+void error(const char *msg)
+{
+    perror(msg);
+    exit(1);
+}
+
+/* readline - read a '\n' terminated line from socket fd 
+              into buffer bufptr of size len. The line in the
+              buffer is terminated with '\0'.
+              It returns -1 in case of error or if
+              the capacity of the buffer is exceeded.
+	      It returns 0 if EOF is encountered before reading '\n'.
+ */
+int readline( int fd, char *bufptr, size_t len )
+{
+  /* Note that this function is very tricky.  It uses the
+     static variables bp, cnt, and b to establish a local buffer.
+     The recv call requests large chunks of data (the size of the buffer).
+     Then if the recv call reads more than one line, the overflow
+     remains in the buffer and it is made available to the next call
+     to readline. 
+     Notice also that this routine reads up to '\n' and overwrites
+     it with '\0'. Thus if the line is really terminated with
+     "\r\n", the '\r' will remain unchanged.
+  */
+  char *bufx = bufptr;
+  static char *bp;
+  static int cnt = 0;
+  static char b[ 4096 ];
+  char c;
+  
+  while ( --len > 0 )
+    {
+      if ( --cnt <= 0 )
+	{
+	  cnt = recv( fd, b, sizeof( b ), 0 );
+	  if ( cnt < 0 )
+	    {
+	      if ( errno == EINTR )
+		{
+		  len++;		/* the while will decrement */
+		  continue;
+		}
+	      return -1;
+	    }
+	  if ( cnt == 0 )
+	    return 0;
+	  bp = b;
+	}
+      c = *bp++;
+      *bufptr++ = c;
+      if ( c == '\n' )
+	{
+	  *bufptr = '\0';
+	  return bufptr - bufx;
+	}
+    }
+  return -1;
+}
+
+int readData(int socket, uint8_t *buf, int buflen)
+{
+	int sendSize = buflen;
+	int status;
+
+	while (buflen)
+	{
+#ifndef __MACH__
+		status = read(socket, buf, buflen);
+#else
+		status = recv(socket, buf, buflen, 0);
+#endif
+		if (status <= 0)
+		{
+			return status;
+		}
+		buflen -= status;
+		buf += status;
+	}
+
+	return sendSize;
+}
+
+int readHdr(int socket, FlowControlProtocolHeader *hdr)
+{
+	uint8_t buffer[sizeof(FlowControlProtocolHeader)];
+
+	uint8_t *data = buffer;
+
+	int status = readData(socket, buffer, sizeof(FlowControlProtocolHeader));
+	if (status <= 0)
+		return status;
+
+	uint32_t value;
+	data = decode(data, value);
+	hdr->msgType = value;
+
+	data = decode(data, value);
+	hdr->seqNumber = value;
+
+	data = decode(data, value);
+	hdr->status = value;
+
+	data = decode(data, value);
+	hdr->payloadLen = value;
+
+	return sizeof(FlowControlProtocolHeader);
+}
+
+int readXML(char **xmlContent)
+{
+	  std::ifstream is ("conf/flowServer.xml", std::ifstream::binary);
+	  if (is) {
+	    // get length of file:
+	    is.seekg (0, is.end);
+	    int length = is.tellg();
+	    is.seekg (0, is.beg);
+
+	    char * buffer = new char [length];
+
+	    printf("Reading %s len %d\n", "conf/flowServer.xml", length);
+	    // read data as a block:
+	    is.read (buffer,length);
+
+	    is.close();
+
+	    // ...buffer contains the entire file...
+	    *xmlContent = buffer;
+
+	    return length;
+	  }
+	  return 0;
+}
+
+static int sockfd = 0, newsockfd = 0;
+void sigHandler(int signal)
+{
+	if (signal == SIGINT || signal == SIGTERM)
+	{
+		close(newsockfd);
+		close(sockfd);
+		exit(1);
+	}
+}
+
+int main(int argc, char *argv[])
+{
+     int portno;
+     socklen_t clilen;
+     struct sockaddr_in serv_addr, cli_addr;
+     char buffer[4096];
+     int flag = 0;
+     int number = 0;
+     
+     int n;
+     if (argc < 2) {
+         fprintf(stderr,"ERROR, no port provided\n");
+         exit(1);
+     }
+
+ 	 if (signal(SIGINT, sigHandler) == SIG_ERR || signal(SIGTERM, sigHandler) == SIG_ERR)
+ 	 {
+
+ 		return -1;
+ 	 }
+     sockfd = socket(AF_INET, SOCK_STREAM, 0);
+     if (sockfd < 0) 
+        error("ERROR opening socket");
+     bzero((char *) &serv_addr, sizeof(serv_addr));
+     portno = atoi(argv[1]);
+     serv_addr.sin_family = AF_INET;
+     serv_addr.sin_addr.s_addr = INADDR_ANY;
+     serv_addr.sin_port = htons(portno);
+     if (bind(sockfd, (struct sockaddr *) &serv_addr,
+              sizeof(serv_addr)) < 0) 
+              error("ERROR on binding");
+     listen(sockfd,5);
+     if (portno == DEFAULT_NIFI_SERVER_PORT)
+     {
+    	 while (true)
+    	 {
+    		 clilen = sizeof(cli_addr);
+    		 newsockfd = accept(sockfd,
+                 (struct sockaddr *) &cli_addr, 
+                 &clilen);
+    		 if (newsockfd < 0)
+    		 {
+    			 error("ERROR on accept");
+    			 break;
+    		 }
+    		 // process request
+    		 FlowControlProtocolHeader hdr;
+    		 int status = readHdr(newsockfd, &hdr);
+    		 if (status > 0)
+    		 {
+    			 printf("Flow Control Protocol receive MsgType %s\n", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
+    		     printf("Flow Control Protocol receive Seq Num %d\n", hdr.seqNumber);
+    		     printf("Flow Control Protocol receive Resp Code %s\n", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
+    		     printf("Flow Control Protocol receive Payload len %d\n", hdr.payloadLen);
+    		 	 if (((FlowControlMsgType) hdr.msgType) == REGISTER_REQ)
+    		 	 {
+    		 		printf("Flow Control Protocol Register Req receive\n");
+    		 		uint8_t *payload = new uint8_t[hdr.payloadLen];
+    		 		uint8_t *payloadPtr = payload;
+    		 		status = readData(newsockfd, payload, hdr.payloadLen);
+    		 		while (status > 0 && payloadPtr < (payload + hdr.payloadLen))
+    		 		{
+    		 			uint32_t msgID = 0xFFFFFFFF;
+    		 			payloadPtr = decode(payloadPtr, msgID);
+    		 			if (((FlowControlMsgID) msgID) == FLOW_SERIAL_NUMBER)
+    		 			{
+    		 				// Fixed 8 bytes
+    		 				uint8_t seqNum[8];
+    		 				memcpy(seqNum, payloadPtr, 8);
+    		 				printf("Flow Control Protocol Register Req receive serial num\n");
+    		 				payloadPtr += 8;
+    		 			}
+    		 			else if (((FlowControlMsgID) msgID) == FLOW_XML_NAME)
+    		 			{
+    		 				uint32_t len;
+    		 				payloadPtr = decode(payloadPtr, len);
+    		 				printf("Flow Control Protocol receive XML name length %d\n", len);
+    		 				std::string flowName = (const char *) payloadPtr;
+    		 				payloadPtr += len;
+    		 				printf("Flow Control Protocol receive XML name %s\n", flowName.c_str());
+    		 			}
+    		 			else
+    		 			{
+    		 				break;
+    		 			}
+    		 		}
+    		 		delete[] payload;
+    		 		// Send Register Respond
+    		 		// Calculate the total payload msg size
+    		 		char *xmlContent;
+    		 		uint32_t xmlLen = readXML(&xmlContent);
+    		 		uint32_t payloadSize = FlowControlMsgIDEncodingLen(REPORT_INTERVAL, 0);
+    		 		if (xmlLen > 0)
+    		 			payloadSize += FlowControlMsgIDEncodingLen(FLOW_XML_CONTENT, xmlLen);
+
+    		 		uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
+    		 		uint8_t *data = new uint8_t[size];
+    		 		uint8_t *start = data;
+
+    		 		// encode the HDR
+    		 		hdr.msgType = REGISTER_RESP;
+    		 		hdr.payloadLen = payloadSize;
+    		 		hdr.status = RESP_SUCCESS;
+    		 		data = encode(data, hdr.msgType);
+    		 		data = encode(data, hdr.seqNumber);
+    		 		data = encode(data, hdr.status);
+    		 		data = encode(data, hdr.payloadLen);
+
+    		 		// encode the report interval
+    		 		data = encode(data, REPORT_INTERVAL);
+    		 		data = encode(data, DEFAULT_REPORT_INTERVAL);
+
+    		 		// encode the XML content
+    		 		if (xmlLen > 0)
+    		 		{
+    		 			data = encode(data, FLOW_XML_CONTENT);
+    		 			data = encode(data, xmlLen);
+    		 			data = encode(data, (uint8_t *) xmlContent, xmlLen);
+    		 			delete[] xmlContent;
+    		 		}
+
+    		 		// send it
+    		 		status = sendData(newsockfd, start, size);
+    		 		delete[] start;
+    		 	 }
+    		 	 else if (((FlowControlMsgType) hdr.msgType) == REPORT_REQ)
+        		 {
+        		 		printf("Flow Control Protocol Report Req receive\n");
+        		 		uint8_t *payload = new uint8_t[hdr.payloadLen];
+        		 		uint8_t *payloadPtr = payload;
+        		 		status = readData(newsockfd, payload, hdr.payloadLen);
+        		 		while (status > 0 && payloadPtr < (payload + hdr.payloadLen))
+        		 		{
+        		 			uint32_t msgID = 0xFFFFFFFF;
+        		 			payloadPtr = decode(payloadPtr, msgID);
+        		 			if (((FlowControlMsgID) msgID) == FLOW_XML_NAME)
+        		 			{
+        		 				uint32_t len;
+        		 				payloadPtr = decode(payloadPtr, len);
+        		 				printf("Flow Control Protocol receive XML name length %d\n", len);
+        		 				std::string flowName = (const char *) payloadPtr;
+        		 				payloadPtr += len;
+        		 				printf("Flow Control Protocol receive XML name %s\n", flowName.c_str());
+        		 			}
+        		 			else
+        		 			{
+        		 				break;
+        		 			}
+        		 		}
+        		 		delete[] payload;
+        		 		// Send Register Respond
+        		 		// Calculate the total payload msg size
+        		 		std::string processor = "RealTimeDataCollector";
+        		 		std::string propertyName1 = "real Time Message ID";
+        		 		std::string propertyValue1 = "41";
+        		 		std::string propertyName2 = "Batch Message ID";
+        		 		std::string propertyValue2 = "172,30,48";
+        		 		if (flag == 0)
+        		 		{
+              		 		propertyName1 = "Real Time Message ID";
+              		 		propertyValue1 = "41";
+                		    propertyName2 = "Batch Message ID";
+                		    propertyValue2 = "172,48";
+        		 			flag = 1;
+        		 		}
+        		 		else if (flag == 1)
+        		 		{
+        		 			propertyName1 = "Real Time Message ID";
+        		 			propertyValue1 = "172,48";
+        		 			propertyName2 = "Batch Message ID";
+        		 			propertyValue2 = "41";
+        		 			flag = 0;
+        		 		}
+        		 		uint32_t payloadSize = FlowControlMsgIDEncodingLen(PROCESSOR_NAME, processor.size()+1);
+        		 		payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName1.size()+1);
+        		 		payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue1.size()+1);
+        		 		payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName2.size()+1);
+        		 		payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue2.size()+1);
+
+        		 		uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
+        		 		uint8_t *data = new uint8_t[size];
+        		 		uint8_t *start = data;
+
+        		 		// encode the HDR
+        		 		hdr.msgType = REPORT_RESP;
+        		 		hdr.payloadLen = payloadSize;
+        		 		hdr.status = RESP_SUCCESS;
+
+        		 		if (number >= 10 && number < 20)
+        		 	    {
+        		 	        // After 10 second report, stop the flow controller for 10 second
+        		 	       hdr.status = RESP_STOP_FLOW_CONTROLLER;
+        		 	    }
+        		 		else if (number == 20)
+        		 		{
+        		 			// restart the flow controller after 10 second
+        		 			hdr.status = RESP_START_FLOW_CONTROLLER;
+        		 		}
+        		 		else if (number == 30)
+        		 		{
+        		 			// retrigger register
+        		 			hdr.status = RESP_TRIGGER_REGISTER;
+        		 			number = 0;
+        		 		}
+
+        		 	    number++;
+
+        		 		data = encode(data, hdr.msgType);
+        		 		data = encode(data, hdr.seqNumber);
+        		 		data = encode(data, hdr.status);
+        		 		data = encode(data, hdr.payloadLen);
+
+        		 		// encode the processorName
+        		 		data = encode(data, PROCESSOR_NAME);
+        		 		data = encode(data, processor);
+
+        		 		// encode the propertyName and value TLV
+        		 		data = encode(data, PROPERTY_NAME);
+            		 	data = encode(data, propertyName1);
+            		 	data = encode(data, PROPERTY_VALUE);
+            		 	data = encode(data, propertyValue1);
+            		 	data = encode(data, PROPERTY_NAME);
+            		 	data = encode(data, propertyName2);
+            		 	data = encode(data, PROPERTY_VALUE);
+            		 	data = encode(data, propertyValue2);
+        		 		// send it
+        		 		status = sendData(newsockfd, start, size);
+        		 		delete[] start;
+        		 	 }
+    		 }
+    		 close(newsockfd);
+    	 }
+    	 close(sockfd);
+     }
+     else
+     {
+    	 clilen = sizeof(cli_addr);
+    	 newsockfd = accept(sockfd,
+    	                (struct sockaddr *) &cli_addr,
+    	                 &clilen);
+    	 if (newsockfd < 0)
+    	    	error("ERROR on accept");
+    	 while (1)
+    	 {
+    	    	bzero(buffer,4096);
+    	    	n = readline(newsockfd,buffer,4095);
+    	    	if (n <= 0 )
+    	        {
+    	    		close(newsockfd);
+    	    		newsockfd = accept(sockfd,
+    	    		    	                (struct sockaddr *) &cli_addr,
+    	    		    	                 &clilen);
+    	    		continue;
+    	    	}
+    	    	printf("%s",buffer);
+    	  }
+    	  close(newsockfd);
+    	  close(sockfd);
+     }
+     return 0; 
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/thirdparty/libxml2/AUTHORS
----------------------------------------------------------------------
diff --git a/thirdparty/libxml2/AUTHORS b/thirdparty/libxml2/AUTHORS
new file mode 100644
index 0000000..cf2e9a6
--- /dev/null
+++ b/thirdparty/libxml2/AUTHORS
@@ -0,0 +1,5 @@
+Daniel Veillard <da...@veillard.com>
+Bjorn Reese <br...@users.sourceforge.net>
+William Brack <wb...@mmm.com.hk>
+Igor Zlatkovic <ig...@zlatkovic.com> for the Windows port
+Aleksey Sanin <al...@aleksey.com>