You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/10/13 15:07:20 UTC

[02/18] nifi-minifi-cpp git commit: MINIFI-34 Establishing CMake build system to provide build functionality equivalent to pre-existing Makefile.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/Site2SiteClientProtocol.cpp
----------------------------------------------------------------------
diff --git a/src/Site2SiteClientProtocol.cpp b/src/Site2SiteClientProtocol.cpp
deleted file mode 100644
index 88ea78a..0000000
--- a/src/Site2SiteClientProtocol.cpp
+++ /dev/null
@@ -1,1313 +0,0 @@
-/**
- * @file Site2SiteProtocol.cpp
- * Site2SiteProtocol class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <sys/time.h>
-#include <stdio.h>
-#include <time.h>
-#include <chrono>
-#include <thread>
-#include <random>
-#include <netinet/tcp.h>
-#include <iostream>
-#include "Site2SitePeer.h"
-#include "Site2SiteClientProtocol.h"
-
-bool Site2SiteClientProtocol::establish()
-{
-	if (_peerState != IDLE)
-	{
-		_logger->log_error("Site2Site peer state is not idle while try to establish");
-		return false;
-	}
-
-	bool ret = _peer->Open();
-
-	if (!ret)
-	{
-		_logger->log_error("Site2Site peer socket open failed");
-		return false;
-	}
-
-	// Negotiate the version
-	ret = initiateResourceNegotiation();
-
-	if (!ret)
-	{
-		_logger->log_error("Site2Site Protocol Version Negotiation failed");
-		/*
-		_peer->yield();
-		tearDown(); */
-		return false;
-	}
-
-	_logger->log_info("Site2Site socket established");
-	_peerState = ESTABLISHED;
-
-	return true;
-}
-
-bool Site2SiteClientProtocol::initiateResourceNegotiation()
-{
-	// Negotiate the version
-	if (_peerState != IDLE)
-	{
-		_logger->log_error("Site2Site peer state is not idle while initiateResourceNegotiation");
-		return false;
-	}
-
-	_logger->log_info("Negotiate protocol version with destination port %s current version %d", _portIdStr.c_str(), _currentVersion);
-
-	int ret = _peer->writeUTF(this->getResourceName());
-
-	if (ret <= 0)
-	{
-		// tearDown();
-		return false;
-	}
-
-	ret = _peer->write(_currentVersion);
-
-	if (ret <= 0)
-	{
-		// tearDown();
-		return false;
-	}
-
-	uint8_t statusCode;
-	ret = _peer->read(statusCode);
-
-	if (ret <= 0)
-	{
-		// tearDown();
-		return false;
-	}
-
-	switch (statusCode)
-	{
-	case RESOURCE_OK:
-		_logger->log_info("Site2Site Protocol Negotiate protocol version OK");
-		return true;
-	case DIFFERENT_RESOURCE_VERSION:
-		uint32_t serverVersion;
-		ret = _peer->read(serverVersion);
-		if (ret <= 0)
-		{
-			// tearDown();
-			return false;
-		}
-		_logger->log_info("Site2Site Server Response asked for a different protocol version %d", serverVersion);
-		for (unsigned int i = (_currentVersionIndex + 1); i < sizeof(_supportedVersion)/sizeof(uint32_t); i++)
-		{
-			if (serverVersion >= _supportedVersion[i])
-			{
-				_currentVersion = _supportedVersion[i];
-				_currentVersionIndex = i;
-				return initiateResourceNegotiation();
-			}
-		}
-		ret = -1;
-		// tearDown();
-		return false;
-	case NEGOTIATED_ABORT:
-		_logger->log_info("Site2Site Negotiate protocol response ABORT");
-		ret = -1;
-		// tearDown();
-		return false;
-	default:
-		_logger->log_info("Negotiate protocol response unknown code %d", statusCode);
-		return true;
-	}
-
-	return true;
-}
-
-bool Site2SiteClientProtocol::initiateCodecResourceNegotiation()
-{
-	// Negotiate the version
-	if (_peerState != HANDSHAKED)
-	{
-		_logger->log_error("Site2Site peer state is not handshaked while initiateCodecResourceNegotiation");
-		return false;
-	}
-
-	_logger->log_info("Negotiate Codec version with destination port %s current version %d", _portIdStr.c_str(), _currentCodecVersion);
-
-	int ret = _peer->writeUTF(this->getCodecResourceName());
-
-	if (ret <= 0)
-	{
-		// tearDown();
-		return false;
-	}
-
-	ret = _peer->write(_currentCodecVersion);
-
-	if (ret <= 0)
-	{
-		// tearDown();
-		return false;
-	}
-
-	uint8_t statusCode;
-	ret = _peer->read(statusCode);
-
-	if (ret <= 0)
-	{
-		// tearDown();
-		return false;
-	}
-
-	switch (statusCode)
-	{
-	case RESOURCE_OK:
-		_logger->log_info("Site2Site Codec Negotiate version OK");
-		return true;
-	case DIFFERENT_RESOURCE_VERSION:
-		uint32_t serverVersion;
-		ret = _peer->read(serverVersion);
-		if (ret <= 0)
-		{
-			// tearDown();
-			return false;
-		}
-		_logger->log_info("Site2Site Server Response asked for a different codec version %d", serverVersion);
-		for (unsigned int i = (_currentCodecVersionIndex + 1); i < sizeof(_supportedCodecVersion)/sizeof(uint32_t); i++)
-		{
-			if (serverVersion >= _supportedCodecVersion[i])
-			{
-				_currentCodecVersion = _supportedCodecVersion[i];
-				_currentCodecVersionIndex = i;
-				return initiateCodecResourceNegotiation();
-			}
-		}
-		ret = -1;
-		// tearDown();
-		return false;
-	case NEGOTIATED_ABORT:
-		_logger->log_info("Site2Site Codec Negotiate response ABORT");
-		ret = -1;
-		// tearDown();
-		return false;
-	default:
-		_logger->log_info("Negotiate Codec response unknown code %d", statusCode);
-		return true;
-	}
-
-	return true;
-}
-
-bool Site2SiteClientProtocol::handShake()
-{
-	if (_peerState != ESTABLISHED)
-	{
-		_logger->log_error("Site2Site peer state is not established while handshake");
-		return false;
-	}
-	_logger->log_info("Site2Site Protocol Perform hand shake with destination port %s", _portIdStr.c_str());
-	uuid_t uuid;
-	// Generate the global UUID for the com identify
-	uuid_generate(uuid);
-	char uuidStr[37];
-	uuid_unparse(uuid, uuidStr);
-	_commsIdentifier = uuidStr;
-
-	int ret = _peer->writeUTF(_commsIdentifier);
-
-	if (ret <= 0)
-	{
-		// tearDown();
-		return false;
-	}
-
-	std::map<std::string, std::string> properties;
-	properties[HandShakePropertyStr[GZIP]] = "false";
-	properties[HandShakePropertyStr[PORT_IDENTIFIER]] = _portIdStr;
-	properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(this->_timeOut);
-	if (this->_currentVersion >= 5)
-	{
-		if (this->_batchCount > 0)
-			properties[HandShakePropertyStr[BATCH_COUNT]] = std::to_string(this->_batchCount);
-		if (this->_batchSize > 0)
-			properties[HandShakePropertyStr[BATCH_SIZE]] = std::to_string(this->_batchSize);
-		if (this->_batchDuration > 0)
-			properties[HandShakePropertyStr[BATCH_DURATION]] = std::to_string(this->_batchDuration);
-	}
-
-	if (_currentVersion >= 3)
-	{
-		ret = _peer->writeUTF(_peer->getURL());
-		if (ret <= 0)
-		{
-			// tearDown();
-			return false;
-		}
-	}
-
-	uint32_t size = properties.size();
-	ret = _peer->write(size);
-	if (ret <= 0)
-	{
-		// tearDown();
-		return false;
-	}
-
-	std::map<std::string, std::string>::iterator it;
-	for (it = properties.begin(); it!= properties.end(); it++)
-	{
-		ret = _peer->writeUTF(it->first);
-		if (ret <= 0)
-		{
-			// tearDown();
-			return false;
-		}
-		ret = _peer->writeUTF(it->second);
-		if (ret <= 0)
-		{
-			// tearDown();
-			return false;
-		}
-		_logger->log_info("Site2Site Protocol Send handshake properties %s %s", it->first.c_str(), it->second.c_str());
-	}
-
-	RespondCode code;
-	std::string message;
-
-	ret = this->readRespond(code, message);
-
-	if (ret <= 0)
-	{
-		// tearDown();
-		return false;
-	}
-
-	switch (code)
-	{
-	case PROPERTIES_OK:
-		_logger->log_info("Site2Site HandShake Completed");
-		_peerState = HANDSHAKED;
-		return true;
-	case PORT_NOT_IN_VALID_STATE:
-    case UNKNOWN_PORT:
-    case PORTS_DESTINATION_FULL:
-    	_logger->log_error("Site2Site HandShake Failed because destination port is either invalid or full");
-		ret = -1;
-		/*
-		_peer->yield();
-		tearDown(); */
-		return false;
-	default:
-		_logger->log_info("HandShake Failed because of unknown respond code %d", code);
-		ret = -1;
-		/*
-		_peer->yield();
-		tearDown(); */
-		return false;
-	}
-
-	return false;
-}
-
-void Site2SiteClientProtocol::tearDown()
-{
-	if (_peerState >= ESTABLISHED)
-	{
-		_logger->log_info("Site2Site Protocol tearDown");
-		// need to write shutdown request
-		writeRequestType(SHUTDOWN);
-	}
-
-	std::map<std::string, Transaction *>::iterator it;
-	for (it = _transactionMap.begin(); it!= _transactionMap.end(); it++)
-	{
-		delete it->second;
-	}
-	_transactionMap.clear();
-	_peer->Close();
-	_peerState = IDLE;
-}
-
-int Site2SiteClientProtocol::writeRequestType(RequestType type)
-{
-	if (type >= MAX_REQUEST_TYPE)
-		return -1;
-
-	return _peer->writeUTF(RequestTypeStr[type]);
-}
-
-int Site2SiteClientProtocol::readRequestType(RequestType &type)
-{
-	std::string requestTypeStr;
-
-	int ret = _peer->readUTF(requestTypeStr);
-
-	if (ret <= 0)
-		return ret;
-
-	for (int i = (int) NEGOTIATE_FLOWFILE_CODEC; i <= (int) SHUTDOWN; i++)
-	{
-		if (RequestTypeStr[i] == requestTypeStr)
-		{
-			type = (RequestType) i;
-			return ret;
-		}
-	}
-
-	return -1;
-}
-
-int Site2SiteClientProtocol::readRespond(RespondCode &code, std::string &message)
-{
-	uint8_t firstByte;
-
-	int ret = _peer->read(firstByte);
-
-	if (ret <= 0 || firstByte != CODE_SEQUENCE_VALUE_1)
-		return -1;
-
-	uint8_t secondByte;
-
-	ret = _peer->read(secondByte);
-
-	if (ret <= 0 || secondByte != CODE_SEQUENCE_VALUE_2)
-		return -1;
-
-	uint8_t thirdByte;
-
-	ret = _peer->read(thirdByte);
-
-	if (ret <= 0)
-		return ret;
-
-	code = (RespondCode) thirdByte;
-
-	RespondCodeContext *resCode = this->getRespondCodeContext(code);
-
-	if ( resCode == NULL)
-	{
-		// Not a valid respond code
-		return -1;
-	}
-	if (resCode->hasDescription)
-	{
-		ret = _peer->readUTF(message);
-		if (ret <= 0)
-			return -1;
-	}
-	return 3 + message.size();
-}
-
-int Site2SiteClientProtocol::writeRespond(RespondCode code, std::string message)
-{
-	RespondCodeContext *resCode = this->getRespondCodeContext(code);
-
-	if (resCode == NULL)
-	{
-		// Not a valid respond code
-		return -1;
-	}
-
-	uint8_t codeSeq[3];
-	codeSeq[0] = CODE_SEQUENCE_VALUE_1;
-	codeSeq[1] = CODE_SEQUENCE_VALUE_2;
-	codeSeq[2] = (uint8_t) code;
-
-	int ret = _peer->write(codeSeq, 3);
-
-	if (ret != 3)
-		return -1;
-
-	if (resCode->hasDescription)
-	{
-		ret = _peer->writeUTF(message);
-		if (ret > 0)
-			return (3 + ret);
-		else
-			return ret;
-	}
-	else
-		return 3;
-}
-
-bool Site2SiteClientProtocol::negotiateCodec()
-{
-	if (_peerState != HANDSHAKED)
-	{
-		_logger->log_error("Site2Site peer state is not handshaked while negotiate codec");
-		return false;
-	}
-
-	_logger->log_info("Site2Site Protocol Negotiate Codec with destination port %s", _portIdStr.c_str());
-
-	int status = this->writeRequestType(NEGOTIATE_FLOWFILE_CODEC);
-
-	if (status <= 0)
-	{
-		// tearDown();
-		return false;
-	}
-
-	// Negotiate the codec version
-	bool ret = initiateCodecResourceNegotiation();
-
-	if (!ret)
-	{
-		_logger->log_error("Site2Site Codec Version Negotiation failed");
-		/*
-		_peer->yield();
-		tearDown(); */
-		return false;
-	}
-
-	_logger->log_info("Site2Site Codec Completed and move to READY state for data transfer");
-	_peerState = READY;
-
-	return true;
-}
-
-bool Site2SiteClientProtocol::bootstrap()
-{
-	if (_peerState == READY)
-		return true;
-
-	tearDown();
-
-	if (establish() && handShake() && negotiateCodec())
-	{
-		_logger->log_info("Site2Site Ready For data transaction");
-		return true;
-	}
-	else
-	{
-		_peer->yield();
-		tearDown();
-		return false;
-	}
-}
-
-Transaction* Site2SiteClientProtocol::createTransaction(std::string &transactionID, TransferDirection direction)
-{
-	int ret;
-	bool dataAvailable;
-	Transaction *transaction = NULL;
-
-	if (_peerState != READY)
-	{
-		bootstrap();
-	}
-
-	if (_peerState != READY)
-	{
-		return NULL;
-	}
-
-	if (direction == RECEIVE)
-	{
-		ret = writeRequestType(RECEIVE_FLOWFILES);
-
-		if (ret <= 0)
-		{
-			// tearDown();
-			return NULL;
-		}
-
-		RespondCode code;
-		std::string message;
-
-		ret = readRespond(code, message);
-
-		if (ret <= 0)
-		{
-			// tearDown();
-			return NULL;
-		}
-
-		switch (code)
-		{
-		case MORE_DATA:
-			dataAvailable = true;
-			_logger->log_info("Site2Site peer indicates that data is available");
-			transaction = new Transaction(direction);
-			_transactionMap[transaction->getUUIDStr()] = transaction;
-			transactionID = transaction->getUUIDStr();
-			transaction->setDataAvailable(dataAvailable);
-			_logger->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str());
-			return transaction;
-		case NO_MORE_DATA:
-			dataAvailable = false;
-			_logger->log_info("Site2Site peer indicates that no data is available");
-			transaction = new Transaction(direction);
-			_transactionMap[transaction->getUUIDStr()] = transaction;
-			transactionID = transaction->getUUIDStr();
-			transaction->setDataAvailable(dataAvailable);
-			_logger->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str());
-			return transaction;
-		default:
-			_logger->log_info("Site2Site got unexpected response %d when asking for data", code);
-			// tearDown();
-			return NULL;
-		}
-	}
-	else
-	{
-		ret = writeRequestType(SEND_FLOWFILES);
-
-		if (ret <= 0)
-		{
-			// tearDown();
-			return NULL;
-		}
-		else
-		{
-			transaction = new Transaction(direction);
-			_transactionMap[transaction->getUUIDStr()] = transaction;
-			transactionID = transaction->getUUIDStr();
-			_logger->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str());
-			return transaction;
-		}
-	}
-}
-
-bool Site2SiteClientProtocol::receive(std::string transactionID, DataPacket *packet, bool &eof)
-{
-	int ret;
-	Transaction *transaction = NULL;
-
-	if (_peerState != READY)
-	{
-		bootstrap();
-	}
-
-	if (_peerState != READY)
-	{
-		return false;
-	}
-
-	std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID);
-
-	if (it == _transactionMap.end())
-	{
-		return false;
-	}
-	else
-	{
-		transaction = it->second;
-	}
-
-	if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED)
-	{
-		_logger->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str());
-		return false;
-	}
-
-	if (transaction->getDirection() != RECEIVE)
-	{
-		_logger->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str());
-		return false;
-	}
-
-	if (!transaction->isDataAvailable())
-	{
-		eof = true;
-		return true;
-	}
-
-	if (transaction->_transfers > 0)
-	{
-		// if we already has transfer before, check to see whether another one is available
-		RespondCode code;
-		std::string message;
-
-		ret = readRespond(code, message);
-
-		if (ret <= 0)
-		{
-			return false;
-		}
-		if (code == CONTINUE_TRANSACTION)
-		{
-			_logger->log_info("Site2Site transaction %s peer indicate continue transaction", transactionID.c_str());
-			transaction->_dataAvailable = true;
-		}
-		else if (code == FINISH_TRANSACTION)
-		{
-			_logger->log_info("Site2Site transaction %s peer indicate finish transaction", transactionID.c_str());
-			transaction->_dataAvailable = false;
-		}
-		else
-		{
-			_logger->log_info("Site2Site transaction %s peer indicate wrong respond code %d", transactionID.c_str(), code);
-			return false;
-		}
-	}
-
-	if (!transaction->isDataAvailable())
-	{
-		eof = true;
-		return true;
-	}
-
-	// start to read the packet
-	uint32_t numAttributes;
-	ret = _peer->read(numAttributes, &transaction->_crc);
-	if (ret <= 0 || numAttributes > MAX_NUM_ATTRIBUTES)
-	{
-		return false;
-	}
-
-	// read the attributes
-	for (unsigned int i = 0; i < numAttributes; i++)
-	{
-		std::string key;
-		std::string value;
-		ret = _peer->readUTF(key, true, &transaction->_crc);
-		if (ret <= 0)
-		{
-			return false;
-		}
-		ret = _peer->readUTF(value, true, &transaction->_crc);
-		if (ret <= 0)
-		{
-			return false;
-		}
-		packet->_attributes[key] = value;
-		_logger->log_info("Site2Site transaction %s receives attribute key %s value %s", transactionID.c_str(), key.c_str(), value.c_str());
-	}
-
-	uint64_t len;
-	ret = _peer->read(len, &transaction->_crc);
-	if (ret <= 0)
-	{
-		return false;
-	}
-
-	packet->_size = len;
-	transaction->_transfers++;
-	transaction->_state = DATA_EXCHANGED;
-	transaction->_bytes += len;
-	_logger->log_info("Site2Site transaction %s receives flow record %d, total length %d", transactionID.c_str(),
-			transaction->_transfers, transaction->_bytes);
-
-	return true;
-}
-
-bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet, FlowFileRecord *flowFile, ProcessSession *session)
-{
-	int ret;
-	Transaction *transaction = NULL;
-
-	if (_peerState != READY)
-	{
-		bootstrap();
-	}
-
-	if (_peerState != READY)
-	{
-		return false;
-	}
-
-	std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID);
-
-	if (it == _transactionMap.end())
-	{
-		return false;
-	}
-	else
-	{
-		transaction = it->second;
-	}
-
-	if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED)
-	{
-		_logger->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str());
-		return false;
-	}
-
-	if (transaction->getDirection() != SEND)
-	{
-		_logger->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str());
-		return false;
-	}
-
-	if (transaction->_transfers > 0)
-	{
-		ret = writeRespond(CONTINUE_TRANSACTION, "CONTINUE_TRANSACTION");
-		if (ret <= 0)
-		{
-			return false;
-		}
-	}
-
-	// start to read the packet
-	uint32_t numAttributes = packet->_attributes.size();
-	ret = _peer->write(numAttributes, &transaction->_crc);
-	if (ret != 4)
-	{
-		return false;
-	}
-
-	std::map<std::string, std::string>::iterator itAttribute;
-	for (itAttribute = packet->_attributes.begin(); itAttribute!= packet->_attributes.end(); itAttribute++)
-	{
-		ret = _peer->writeUTF(itAttribute->first, true, &transaction->_crc);
-		if (ret <= 0)
-		{
-			return false;
-		}
-		ret = _peer->writeUTF(itAttribute->second, true, &transaction->_crc);
-		if (ret <= 0)
-		{
-			return false;
-		}
-		_logger->log_info("Site2Site transaction %s send attribute key %s value %s", transactionID.c_str(),
-				itAttribute->first.c_str(), itAttribute->second.c_str());
-	}
-
-	uint64_t len = flowFile->getSize() ;
-	ret = _peer->write(len, &transaction->_crc);
-	if (ret != 8)
-	{
-		return false;
-	}
-
-	if (flowFile->getSize())
-	{
-		Site2SiteClientProtocol::ReadCallback callback(packet);
-		session->read(flowFile, &callback);
-		if (flowFile->getSize() != packet->_size)
-		{
-			return false;
-		}
-	}
-
-	transaction->_transfers++;
-	transaction->_state = DATA_EXCHANGED;
-	transaction->_bytes += len;
-	_logger->log_info("Site2Site transaction %s send flow record %d, total length %d", transactionID.c_str(),
-				transaction->_transfers, transaction->_bytes);
-
-	return true;
-}
-
-void Site2SiteClientProtocol::receiveFlowFiles(ProcessContext *context, ProcessSession *session)
-{
-	uint64_t bytes = 0;
-	int transfers = 0;
-	Transaction *transaction = NULL;
-
-	if (_peerState != READY)
-	{
-		bootstrap();
-	}
-
-	if (_peerState != READY)
-	{
-		context->yield();
-		tearDown();
-		throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer");
-		return;
-	}
-
-	// Create the transaction
-	std::string transactionID;
-	transaction = createTransaction(transactionID, RECEIVE);
-
-	if (transaction == NULL)
-	{
-		context->yield();
-		tearDown();
-		throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
-		return;
-	}
-
-	try
-	{
-		while (true)
-		{
-			std::map<std::string, std::string> empty;
-			DataPacket packet(this, transaction, empty);
-			bool eof = false;
-
-			if (!receive(transactionID, &packet, eof))
-			{
-				throw Exception(SITE2SITE_EXCEPTION, "Receive Failed");
-				return;
-			}
-			if (eof)
-			{
-				// transaction done
-				break;
-			}
-			FlowFileRecord *flowFile = session->create();
-			if (!flowFile)
-			{
-				throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed");
-				return;
-			}
-			std::map<std::string, std::string>::iterator it;
-			for (it = packet._attributes.begin(); it!= packet._attributes.end(); it++)
-			{
-				flowFile->addAttribute(it->first, it->second);
-			}
-
-			if (packet._size > 0)
-			{
-				Site2SiteClientProtocol::WriteCallback callback(&packet);
-				session->write(flowFile, &callback);
-				if (flowFile->getSize() != packet._size)
-				{
-					throw Exception(SITE2SITE_EXCEPTION, "Receive Size Not Right");
-					return;
-				}
-			}
-			Relationship relation; // undefined relationship
-			session->transfer(flowFile, relation);
-			// receive the transfer for the flow record
-			bytes += packet._size;
-			transfers++;
-		} // while true
-
-		if (!confirm(transactionID))
-		{
-			throw Exception(SITE2SITE_EXCEPTION, "Confirm Transaction Failed");
-			return;
-		}
-		if (!complete(transactionID))
-		{
-			throw Exception(SITE2SITE_EXCEPTION, "Complete Transaction Failed");
-			return;
-		}
-		_logger->log_info("Site2Site transaction %s successfully receive flow record %d, content bytes %d",
-				transactionID.c_str(), transfers, bytes);
-		// we yield the receive if we did not get anything
-		if (transfers == 0)
-			context->yield();
-	}
-	catch (std::exception &exception)
-	{
-		if (transaction)
-			deleteTransaction(transactionID);
-		context->yield();
-		tearDown();
-		_logger->log_debug("Caught Exception %s", exception.what());
-		throw;
-	}
-	catch (...)
-	{
-		if (transaction)
-			deleteTransaction(transactionID);
-		context->yield();
-		tearDown();
-		_logger->log_debug("Caught Exception during Site2SiteClientProtocol::receiveFlowFiles");
-		throw;
-	}
-
-	deleteTransaction(transactionID);
-
-	return;
-}
-
-bool Site2SiteClientProtocol::confirm(std::string transactionID)
-{
-	int ret;
-	Transaction *transaction = NULL;
-
-	if (_peerState != READY)
-	{
-		bootstrap();
-	}
-
-	if (_peerState != READY)
-	{
-		return false;
-	}
-
-	std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID);
-
-	if (it == _transactionMap.end())
-	{
-		return false;
-	}
-	else
-	{
-		transaction = it->second;
-	}
-
-	if (transaction->getState() == TRANSACTION_STARTED && !transaction->isDataAvailable() &&
-			transaction->getDirection() == RECEIVE)
-	{
-		transaction->_state = TRANSACTION_CONFIRMED;
-		return true;
-	}
-
-	if (transaction->getState() != DATA_EXCHANGED)
-		return false;
-
-	if (transaction->getDirection() == RECEIVE)
-	{
-		if (transaction->isDataAvailable())
-			return false;
-		// we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
-		// to peer so that we can verify that the connection is still open. This is a two-phase commit,
-		// which helps to prevent the chances of data duplication. Without doing this, we may commit the
-		// session and then when we send the response back to the peer, the peer may have timed out and may not
-		// be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
-		// Critical Section involved in this transaction so that rather than the Critical Section being the
-		// time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
-		long crcValue = transaction->getCRC();
-		std::string crc = std::to_string(crcValue);
-		_logger->log_info("Site2Site Send confirm with CRC %d to transaction %s", transaction->getCRC(),
-						transactionID.c_str());
-		ret = writeRespond(CONFIRM_TRANSACTION, crc);
-		if (ret <= 0)
-			return false;
-		RespondCode code;
-		std::string message;
-		readRespond(code, message);
-		if (ret <= 0)
-			return false;
-
-		if (code == CONFIRM_TRANSACTION)
-		{
-			_logger->log_info("Site2Site transaction %s peer confirm transaction", transactionID.c_str());
-			transaction->_state = TRANSACTION_CONFIRMED;
-			return true;
-		}
-		else if (code == BAD_CHECKSUM)
-		{
-			_logger->log_info("Site2Site transaction %s peer indicate bad checksum", transactionID.c_str());
-			/*
-			transaction->_state = TRANSACTION_CONFIRMED;
-			return true; */
-			return false;
-		}
-		else
-		{
-			_logger->log_info("Site2Site transaction %s peer unknown respond code %d",
-					transactionID.c_str(), code);
-			return false;
-		}
-	}
-	else
-	{
-		_logger->log_info("Site2Site Send FINISH TRANSACTION for transaction %s",
-								transactionID.c_str());
-		ret = writeRespond(FINISH_TRANSACTION, "FINISH_TRANSACTION");
-		if (ret <= 0)
-			return false;
-		RespondCode code;
-		std::string message;
-		readRespond(code, message);
-		if (ret <= 0)
-			return false;
-
-		// we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
-		if (code == CONFIRM_TRANSACTION)
-		{
-			_logger->log_info("Site2Site transaction %s peer confirm transaction with CRC %s", transactionID.c_str(), message.c_str());
-			if (this->_currentVersion > 3)
-			{
-				long crcValue = transaction->getCRC();
-				std::string crc = std::to_string(crcValue);
-				if (message == crc)
-				{
-					_logger->log_info("Site2Site transaction %s CRC matched", transactionID.c_str());
-					ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION");
-					if (ret <= 0)
-						return false;
-					transaction->_state = TRANSACTION_CONFIRMED;
-					return true;
-				}
-				else
-				{
-					_logger->log_info("Site2Site transaction %s CRC not matched %s", transactionID.c_str(), crc.c_str());
-					ret = writeRespond(BAD_CHECKSUM, "BAD_CHECKSUM");
-					/*
-					ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION");
-										if (ret <= 0)
-											return false;
-										transaction->_state = TRANSACTION_CONFIRMED;
-					return true; */
-					return false;
-				}
-			}
-			ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION");
-			if (ret <= 0)
-				return false;
-			transaction->_state = TRANSACTION_CONFIRMED;
-			return true;
-		}
-		else
-		{
-			_logger->log_info("Site2Site transaction %s peer unknown respond code %d",
-					transactionID.c_str(), code);
-			return false;
-		}
-		return false;
-	}
-}
-
-void Site2SiteClientProtocol::cancel(std::string transactionID)
-{
-	Transaction *transaction = NULL;
-
-	if (_peerState != READY)
-	{
-		return;
-	}
-
-	std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID);
-
-	if (it == _transactionMap.end())
-	{
-		return;
-	}
-	else
-	{
-		transaction = it->second;
-	}
-
-	if (transaction->getState() == TRANSACTION_CANCELED || transaction->getState() == TRANSACTION_COMPLETED
-			|| transaction->getState() == TRANSACTION_ERROR)
-	{
-		return;
-	}
-
-	this->writeRespond(CANCEL_TRANSACTION, "Cancel");
-	transaction->_state = TRANSACTION_CANCELED;
-
-	tearDown();
-	return;
-}
-
-void Site2SiteClientProtocol::deleteTransaction(std::string transactionID)
-{
-	Transaction *transaction = NULL;
-
-	std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID);
-
-	if (it == _transactionMap.end())
-	{
-		return;
-	}
-	else
-	{
-		transaction = it->second;
-	}
-
-	_logger->log_info("Site2Site delete transaction %s", transaction->getUUIDStr().c_str());
-	delete transaction;
-	_transactionMap.erase(transactionID);
-}
-
-void Site2SiteClientProtocol::error(std::string transactionID)
-{
-	Transaction *transaction = NULL;
-
-	std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID);
-
-	if (it == _transactionMap.end())
-	{
-		return;
-	}
-	else
-	{
-		transaction = it->second;
-	}
-
-	transaction->_state = TRANSACTION_ERROR;
-	tearDown();
-	return;
-}
-
-//! Complete the transaction
-bool Site2SiteClientProtocol::complete(std::string transactionID)
-{
-	int ret;
-	Transaction *transaction = NULL;
-
-	if (_peerState != READY)
-	{
-		bootstrap();
-	}
-
-	if (_peerState != READY)
-	{
-		return false;
-	}
-
-	std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID);
-
-	if (it == _transactionMap.end())
-	{
-		return false;
-	}
-	else
-	{
-		transaction = it->second;
-	}
-
-	if (transaction->getState() != TRANSACTION_CONFIRMED)
-	{
-		return false;
-	}
-
-	if (transaction->getDirection() == RECEIVE)
-	{
-		if (transaction->_transfers == 0)
-		{
-			transaction->_state = TRANSACTION_COMPLETED;
-			return true;
-		}
-		else
-		{
-			_logger->log_info("Site2Site transaction %s send finished", transactionID.c_str());
-			ret = this->writeRespond(TRANSACTION_FINISHED, "Finished");
-			if (ret <= 0)
-				return false;
-			else
-			{
-				transaction->_state = TRANSACTION_COMPLETED;
-				return true;
-			}
-		}
-	}
-	else
-	{
-		RespondCode code;
-		std::string message;
-		int ret;
-
-		ret = readRespond(code, message);
-
-		if (ret <= 0)
-			return false;
-
-		if (code == TRANSACTION_FINISHED)
-		{
-			_logger->log_info("Site2Site transaction %s peer finished transaction", transactionID.c_str());
-			transaction->_state = TRANSACTION_COMPLETED;
-			return true;
-		}
-		else
-		{
-			_logger->log_info("Site2Site transaction %s peer unknown respond code %d",
-					transactionID.c_str(), code);
-			return false;
-		}
-	}
-}
-
-void Site2SiteClientProtocol::transferFlowFiles(ProcessContext *context, ProcessSession *session)
-{
-	FlowFileRecord *flow = session->get();
-	Transaction *transaction = NULL;
-
-	if (!flow)
-		return;
-
-	if (_peerState != READY)
-	{
-		bootstrap();
-	}
-
-	if (_peerState != READY)
-	{
-		context->yield();
-		tearDown();
-		throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer");
-		return;
-	}
-
-	// Create the transaction
-	std::string transactionID;
-	transaction = createTransaction(transactionID, SEND);
-
-	if (transaction == NULL)
-	{
-		context->yield();
-		tearDown();
-		throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
-		return;
-	}
-
-	bool continueTransaction = true;
-	uint64_t startSendingNanos = getTimeNano();
-
-	try
-	{
-		while (continueTransaction)
-		{
-			DataPacket packet(this, transaction, flow->getAttributes());
-
-			if (!send(transactionID, &packet, flow, session))
-			{
-				throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
-				return;
-			}
-			_logger->log_info("Site2Site transaction %s send flow record %s",
-							transactionID.c_str(), flow->getUUIDStr().c_str());
-			session->remove(flow);
-
-			uint64_t transferNanos = getTimeNano() - startSendingNanos;
-			if (transferNanos > _batchSendNanos)
-				break;
-
-			flow = session->get();
-			if (!flow)
-			{
-				continueTransaction = false;
-			}
-		} // while true
-
-		if (!confirm(transactionID))
-		{
-			throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed");
-			return;
-		}
-		if (!complete(transactionID))
-		{
-			throw Exception(SITE2SITE_EXCEPTION, "Complete Failed");
-			return;
-		}
-		_logger->log_info("Site2Site transaction %s successfully send flow record %d, content bytes %d",
-				transactionID.c_str(), transaction->_transfers, transaction->_bytes);
-	}
-	catch (std::exception &exception)
-	{
-		if (transaction)
-			deleteTransaction(transactionID);
-		context->yield();
-		tearDown();
-		_logger->log_debug("Caught Exception %s", exception.what());
-		throw;
-	}
-	catch (...)
-	{
-		if (transaction)
-			deleteTransaction(transactionID);
-		context->yield();
-		tearDown();
-		_logger->log_debug("Caught Exception during Site2SiteClientProtocol::transferFlowFiles");
-		throw;
-	}
-
-	deleteTransaction(transactionID);
-
-	return;
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/Site2SitePeer.cpp
----------------------------------------------------------------------
diff --git a/src/Site2SitePeer.cpp b/src/Site2SitePeer.cpp
deleted file mode 100644
index 48e19d0..0000000
--- a/src/Site2SitePeer.cpp
+++ /dev/null
@@ -1,435 +0,0 @@
-/**
- * @file Site2SitePeer.cpp
- * Site2SitePeer class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <sys/time.h>
-#include <stdio.h>
-#include <time.h>
-#include <chrono>
-#include <thread>
-#include <random>
-#include <netinet/tcp.h>
-#include <iostream>
-#include "Site2SitePeer.h"
-
-//! CRC tables
-std::atomic<bool> CRC32::tableInit(false);
-unsigned int CRC32::table[256];
-
-bool Site2SitePeer::Open()
-{
-	in_addr_t addr;
-	int sock = 0;
-	struct hostent *h;
-	const char *host;
-	uint16_t port;
-
-	host = this->_host.c_str();
-	port = this->_port;
-
-	if (strlen(host) == 0)
-		return false;
-
-#ifdef __MACH__
-	h = gethostbyname(host);
-#else
-	char buf[1024];
-	struct hostent he;
-	int hh_errno;
-	gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno);
-#endif
-	memcpy((char *) &addr, h->h_addr_list[0], h->h_length);
-	sock = socket(AF_INET, SOCK_STREAM, 0);
-	if (sock < 0)
-	{
-		_logger->log_error("Could not create socket to hostName %s", host);
-		this->yield();
-		return false;
-	}
-
-#ifndef __MACH__
-	int opt = 1;
-	bool nagle_off = true;
-
-	if (nagle_off)
-	{
-		if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0)
-		{
-			_logger->log_error("setsockopt() TCP_NODELAY failed");
-			close(sock);
-			this->yield();
-			return false;
-		}
-		if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
-				(char *)&opt, sizeof(opt)) < 0)
-		{
-			_logger->log_error("setsockopt() SO_REUSEADDR failed");
-			close(sock);
-			this->yield();
-			return false;
-		}
-	}
-
-	int sndsize = 256*1024;
-	if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0)
-	{
-		_logger->log_error("setsockopt() SO_SNDBUF failed");
-		close(sock);
-		this->yield();
-		return false;
-	}
-	int rcvsize = 256*1024;
-	if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *)&rcvsize, (int)sizeof(rcvsize)) < 0)
-	{
-		_logger->log_error("setsockopt() SO_RCVBUF failed");
-		close(sock);
-		this->yield();
-		return false;
-	}
-#endif
-
-	struct sockaddr_in sa;
-	socklen_t socklen;
-	int status;
-
-	memset(&sa, 0, sizeof(sa));
-	sa.sin_family = AF_INET;
-	sa.sin_addr.s_addr = htonl(INADDR_ANY);
-	sa.sin_port = htons(0);
-	socklen = sizeof(sa);
-	if (bind(sock, (struct sockaddr *)&sa, socklen) < 0)
-	{
-		_logger->log_error("socket bind failed");
-		close(sock);
-		this->yield();
-		return false;
-	}
-
-	memset(&sa, 0, sizeof(sa));
-	sa.sin_family = AF_INET;
-	sa.sin_addr.s_addr = addr;
-	sa.sin_port = htons(port);
-	socklen = sizeof(sa);
-
-	status = connect(sock, (struct sockaddr *)&sa, socklen);
-
-	if (status < 0)
-	{
-		_logger->log_error("socket connect failed to %s %d", host, port);
-		close(sock);
-		this->yield();
-		return false;
-	}
-
-	_logger->log_info("Site2Site Peer socket %d connect to server %s port %d success", sock, host, port);
-
-	_socket = sock;
-
-	status = sendData((uint8_t *) MAGIC_BYTES, sizeof(MAGIC_BYTES));
-
-	if (status <= 0)
-	{
-		Close();
-		return false;
-	}
-
-	return true;
-}
-
-void Site2SitePeer::Close()
-{
-	if (_socket)
-	{
-		_logger->log_info("Site2Site Peer socket %d close", _socket);
-		close(_socket);
-		_socket = 0;
-	}
-}
-
-int Site2SitePeer::sendData(uint8_t *buf, int buflen, CRC32 *crc)
-{
-	int ret = 0, bytes = 0;
-
-	if (_socket <= 0)
-	{
-		// this->yield();
-		return -1;
-	}
-
-	while (bytes < buflen)
-	{
-		ret = send(_socket, buf+bytes, buflen-bytes, 0);
-		//check for errors
-		if (ret == -1)
-		{
-			_logger->log_error("Site2Site Peer socket %d send failed %s", _socket, strerror(errno));
-			Close();
-			// this->yield();
-			return ret;
-		}
-		bytes+=ret;
-	}
-
-	if (crc)
-		crc->update(buf, buflen);
-
-	return bytes;
-}
-
-int Site2SitePeer::Select(int msec)
-{
-	fd_set fds;
-	struct timeval tv;
-    int retval;
-    int fd = _socket;
-
-    FD_ZERO(&fds);
-    FD_SET(fd, &fds);
-
-    tv.tv_sec = msec/1000;
-    tv.tv_usec = (msec % 1000) * 1000;
-
-    if (msec > 0)
-       retval = select(fd+1, &fds, NULL, NULL, &tv);
-    else
-       retval = select(fd+1, &fds, NULL, NULL, NULL);
-
-    if (retval <= 0)
-      return retval;
-    if (FD_ISSET(fd, &fds))
-      return retval;
-    else
-      return 0;
-}
-
-int Site2SitePeer::readData(uint8_t *buf, int buflen, CRC32 *crc)
-{
-	int sendSize = buflen;
-	uint8_t *start = buf;
-
-	if (_socket <= 0)
-	{
-		// this->yield();
-		return -1;
-	}
-
-	while (buflen)
-	{
-		int status;
-		status = Select((int) _timeOut);
-		if (status <= 0)
-		{
-			Close();
-			return status;
-		}
-		status = recv(_socket, buf, buflen, 0);
-		if (status <= 0)
-		{
-			Close();
-			// this->yield();
-			return status;
-		}
-		buflen -= status;
-		buf += status;
-	}
-
-	if (crc)
-		crc->update(start, sendSize);
-
-	return sendSize;
-}
-
-int Site2SitePeer::writeUTF(std::string str, bool widen, CRC32 *crc)
-{
-	int strlen = str.length();
-	int utflen = 0;
-	int c, count = 0;
-
-	/* use charAt instead of copying String to char array */
-	for (int i = 0; i < strlen; i++) {
-		c = str.at(i);
-		if ((c >= 0x0001) && (c <= 0x007F)) {
-			utflen++;
-		} else if (c > 0x07FF) {
-			utflen += 3;
-		} else {
-			utflen += 2;
-		}
-	}
-
-	if (utflen > 65535)
-		return -1;
-
-	uint8_t *bytearr = NULL;
-	if (!widen)
-	{
-		bytearr = new uint8_t[utflen+2];
-		bytearr[count++] = (uint8_t) ((utflen >> 8) & 0xFF);
-		bytearr[count++] = (uint8_t) ((utflen >> 0) & 0xFF);
-	}
-	else
-	{
-		bytearr = new uint8_t[utflen+4];
-		bytearr[count++] = (uint8_t) ((utflen >> 24) & 0xFF);
-		bytearr[count++] = (uint8_t) ((utflen >> 16) & 0xFF);
-		bytearr[count++] = (uint8_t) ((utflen >> 8) & 0xFF);
-		bytearr[count++] = (uint8_t) ((utflen >> 0) & 0xFF);
-	}
-
-	int i=0;
-	for (i=0; i<strlen; i++) {
-		c = str.at(i);
-		if (!((c >= 0x0001) && (c <= 0x007F))) break;
-		bytearr[count++] = (uint8_t) c;
-	}
-
-	for (;i < strlen; i++){
-		c = str.at(i);
-		if ((c >= 0x0001) && (c <= 0x007F)) {
-			bytearr[count++] = (uint8_t) c;
-		} else if (c > 0x07FF) {
-			bytearr[count++] = (uint8_t) (0xE0 | ((c >> 12) & 0x0F));
-			bytearr[count++] = (uint8_t) (0x80 | ((c >>  6) & 0x3F));
-			bytearr[count++] = (uint8_t) (0x80 | ((c >>  0) & 0x3F));
-		} else {
-			bytearr[count++] = (uint8_t) (0xC0 | ((c >>  6) & 0x1F));
-			bytearr[count++] = (uint8_t) (0x80 | ((c >>  0) & 0x3F));
-		}
-	}
-	int ret;
-	if (!widen)
-	{
-		ret = sendData(bytearr, utflen+2, crc);
-	}
-	else
-	{
-		ret = sendData(bytearr, utflen+4, crc);
-	}
-	delete[] bytearr;
-	return ret;
-}
-
-int Site2SitePeer::readUTF(std::string &str, bool widen, CRC32 *crc)
-{
-    uint16_t utflen;
-    int ret;
-
-    if (!widen)
-    {
-    	ret = read(utflen, crc);
-    	if (ret <= 0)
-    		return ret;
-    }
-    else
-    {
-    	uint32_t len;
-       	ret = read(len, crc);
-        if (ret <= 0)
-        	return ret;
-        utflen = len;
-    }
-
-    uint8_t *bytearr = NULL;
-    char *chararr = NULL;
-    bytearr = new uint8_t[utflen];
-    chararr = new char[utflen];
-    memset(chararr, 0, utflen);
-
-    int c, char2, char3;
-    int count = 0;
-    int chararr_count=0;
-
-    ret = read(bytearr, utflen, crc);
-    if (ret <= 0)
-    {
-    	delete[] bytearr;
-    	delete[] chararr;
-    	return ret;
-    }
-
-    while (count < utflen) {
-        c = (int) bytearr[count] & 0xff;
-        if (c > 127) break;
-        count++;
-        chararr[chararr_count++]=(char)c;
-    }
-
-    while (count < utflen) {
-        c = (int) bytearr[count] & 0xff;
-        switch (c >> 4) {
-            case 0: case 1: case 2: case 3: case 4: case 5: case 6: case 7:
-                /* 0xxxxxxx*/
-                count++;
-                chararr[chararr_count++]=(char)c;
-                break;
-            case 12: case 13:
-                /* 110x xxxx   10xx xxxx*/
-                count += 2;
-                if (count > utflen)
-                {
-                	delete[] bytearr;
-                	delete[] chararr;
-                	return -1;
-                }
-                char2 = (int) bytearr[count-1];
-                if ((char2 & 0xC0) != 0x80)
-                {
-                	delete[] bytearr;
-                	delete[] chararr;
-                	return -1;
-                }
-                chararr[chararr_count++]=(char)(((c & 0x1F) << 6) |
-                                                (char2 & 0x3F));
-                break;
-            case 14:
-                /* 1110 xxxx  10xx xxxx  10xx xxxx */
-                count += 3;
-                if (count > utflen)
-                {
-                	delete[] bytearr;
-                	delete[] chararr;
-                	return -1;
-                }
-                char2 = (int) bytearr[count-2];
-                char3 = (int) bytearr[count-1];
-                if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80))
-                {
-                	delete[] bytearr;
-                	delete[] chararr;
-                	return -1;
-                }
-                chararr[chararr_count++]=(char)(((c     & 0x0F) << 12) |
-                                                ((char2 & 0x3F) << 6)  |
-                                                ((char3 & 0x3F) << 0));
-                break;
-            default:
-            	delete[] bytearr;
-            	delete[] chararr;
-            	return -1;
-        }
-    }
-    // The number of chars produced may be less than utflen
-    std::string value(chararr, chararr_count);
-    str = value;
-    delete[] bytearr;
-    delete[] chararr;
-    if (!widen)
-    	return (2 + utflen);
-    else
-    	return (4 + utflen);
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/TailFile.cpp
----------------------------------------------------------------------
diff --git a/src/TailFile.cpp b/src/TailFile.cpp
deleted file mode 100644
index 445255b..0000000
--- a/src/TailFile.cpp
+++ /dev/null
@@ -1,272 +0,0 @@
-/**
- * @file TailFile.cpp
- * TailFile class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <time.h>
-#include <sstream>
-#include <stdio.h>
-#include <string>
-#include <iostream>
-#include <dirent.h>
-#include <limits.h>
-#include <unistd.h>
-
-#include "TimeUtil.h"
-#include "TailFile.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-
-const std::string TailFile::ProcessorName("TailFile");
-Property TailFile::FileName("File to Tail", "Fully-qualified filename of the file that should be tailed", "");
-Property TailFile::StateFile("State File",
-		"Specifies the file that should be used for storing state about what data has been ingested so that upon restart NiFi can resume from where it left off", "");
-Relationship TailFile::Success("success", "All files are routed to success");
-
-void TailFile::initialize()
-{
-	//! Set the supported properties
-	std::set<Property> properties;
-	properties.insert(FileName);
-	properties.insert(StateFile);
-	setSupportedProperties(properties);
-	//! Set the supported relationships
-	std::set<Relationship> relationships;
-	relationships.insert(Success);
-	setSupportedRelationships(relationships);
-}
-
-std::string TailFile::trimLeft(const std::string& s)
-{
-	const char *WHITESPACE = " \n\r\t";
-    size_t startpos = s.find_first_not_of(WHITESPACE);
-    return (startpos == std::string::npos) ? "" : s.substr(startpos);
-}
-
-std::string TailFile::trimRight(const std::string& s)
-{
-	const char *WHITESPACE = " \n\r\t";
-    size_t endpos = s.find_last_not_of(WHITESPACE);
-    return (endpos == std::string::npos) ? "" : s.substr(0, endpos+1);
-}
-
-void TailFile::parseStateFileLine(char *buf)
-{
-	char *line = buf;
-
-    while ((line[0] == ' ') || (line[0] =='\t'))
-    	++line;
-
-    char first = line[0];
-    if ((first == '\0') || (first == '#')  || (first == '\r') || (first == '\n') || (first == '='))
-    {
-    	return;
-    }
-
-    char *equal = strchr(line, '=');
-    if (equal == NULL)
-    {
-    	return;
-    }
-
-    equal[0] = '\0';
-    std::string key = line;
-
-    equal++;
-    while ((equal[0] == ' ') || (equal[0] == '\t'))
-    	++equal;
-
-    first = equal[0];
-    if ((first == '\0') || (first == '\r') || (first== '\n'))
-    {
-    	return;
-    }
-
-    std::string value = equal;
-    key = trimRight(key);
-    value = trimRight(value);
-
-    if (key == "FILENAME")
-    	this->_currentTailFileName = value;
-    if (key == "POSITION")
-    	this->_currentTailFilePosition = std::stoi(value);
-
-    return;
-}
-
-void TailFile::recoverState()
-{
-	std::ifstream file(_stateFile.c_str(), std::ifstream::in);
-	if (!file.good())
-	{
-		_logger->log_error("load state file failed %s", _stateFile.c_str());
-		return;
-	}
-	const unsigned int bufSize = 512;
-	char buf[bufSize];
-	for (file.getline(buf,bufSize); file.good(); file.getline(buf,bufSize))
-	{
-		parseStateFileLine(buf);
-	}
-}
-
-void TailFile::storeState()
-{
-	std::ofstream file(_stateFile.c_str());
-	if (!file.is_open())
-	{
-		_logger->log_error("store state file failed %s", _stateFile.c_str());
-		return;
-	}
-	file << "FILENAME=" << this->_currentTailFileName << "\n";
-	file << "POSITION=" << this->_currentTailFilePosition << "\n";
-	file.close();
-}
-
-static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem j)
-{
-	return (i.modifiedTime < j.modifiedTime);
-}
-void TailFile::checkRollOver()
-{
-	struct stat statbuf;
-	std::vector<TailMatchedFileItem> matchedFiles;
-	std::string fullPath = this->_fileLocation + "/" + _currentTailFileName;
-
-	if (stat(fullPath.c_str(), &statbuf) == 0)
-	{
-		if (statbuf.st_size > this->_currentTailFilePosition)
-			// there are new input for the current tail file
-			return;
-
-		uint64_t modifiedTimeCurrentTailFile = ((uint64_t) (statbuf.st_mtime) * 1000);
-		std::string pattern = _fileName;
-		std::size_t found = _fileName.find_last_of(".");
-		if (found != std::string::npos)
-			pattern = _fileName.substr(0,found);
-		DIR *d;
-		d = opendir(this->_fileLocation.c_str());
-		if (!d)
-			return;
-		while (1)
-		{
-			struct dirent *entry;
-			entry = readdir(d);
-			if (!entry)
-				break;
-			std::string d_name = entry->d_name;
-			if (!(entry->d_type & DT_DIR))
-			{
-				std::string fileName = d_name;
-				std::string fileFullName = this->_fileLocation + "/" + d_name;
-				if (fileFullName.find(pattern) != std::string::npos && stat(fileFullName.c_str(), &statbuf) == 0)
-				{
-					if (((uint64_t) (statbuf.st_mtime) * 1000) >= modifiedTimeCurrentTailFile)
-					{
-						TailMatchedFileItem item;
-						item.fileName = fileName;
-						item.modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000);
-						matchedFiles.push_back(item);
-					}
-				}
-			}
-		}
-		closedir(d);
-
-		// Sort the list based on modified time
-		std::sort(matchedFiles.begin(), matchedFiles.end(), sortTailMatchedFileItem);
-		for (std::vector<TailMatchedFileItem>::iterator it = matchedFiles.begin(); it!=matchedFiles.end(); ++it)
-		{
-			TailMatchedFileItem item = *it;
-			if (item.fileName == _currentTailFileName)
-			{
-				++it;
-				if (it!=matchedFiles.end())
-				{
-					TailMatchedFileItem nextItem = *it;
-					_logger->log_info("TailFile File Roll Over from %s to %s", _currentTailFileName.c_str(), nextItem.fileName.c_str());
-					_currentTailFileName = nextItem.fileName;
-					_currentTailFilePosition = 0;
-					storeState();
-				}
-				break;
-			}
-		}
-	}
-	else
-		return;
-}
-
-
-void TailFile::onTrigger(ProcessContext *context, ProcessSession *session)
-{
-	std::string value;
-	if (context->getProperty(FileName.getName(), value))
-	{
-		std::size_t found = value.find_last_of("/\\");
-		this->_fileLocation = value.substr(0,found);
-		this->_fileName = value.substr(found+1);
-	}
-	if (context->getProperty(StateFile.getName(), value))
-	{
-		_stateFile = value;
-	}
-	if (!this->_stateRecovered)
-	{
-		_stateRecovered = true;
-		this->_currentTailFileName = _fileName;
-		this->_currentTailFilePosition = 0;
-		// recover the state if we have not done so
-		this->recoverState();
-	}
-	checkRollOver();
-	std::string fullPath = this->_fileLocation + "/" + _currentTailFileName;
-	struct stat statbuf;
-	if (stat(fullPath.c_str(), &statbuf) == 0)
-	{
-		if (statbuf.st_size <= this->_currentTailFilePosition)
-			// there are no new input for the current tail file
-		{
-			context->yield();
-			return;
-		}
-		FlowFileRecord *flowFile = session->create();
-		if (!flowFile)
-			return;
-		std::size_t found = _currentTailFileName.find_last_of(".");
-		std::string baseName = _currentTailFileName.substr(0,found);
-		std::string extension = _currentTailFileName.substr(found+1);
-		flowFile->updateAttribute(PATH, _fileLocation);
-		flowFile->addAttribute(ABSOLUTE_PATH, fullPath);
-		session->import(fullPath, flowFile, true, this->_currentTailFilePosition);
-		session->transfer(flowFile, Success);
-		_logger->log_info("TailFile %s for %d bytes", _currentTailFileName.c_str(), flowFile->getSize());
-		std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" +
-				std::to_string(_currentTailFilePosition + flowFile->getSize()) + "." + extension;
-		flowFile->updateAttribute(FILENAME, logName);
-		this->_currentTailFilePosition += flowFile->getSize();
-		storeState();
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/TimerDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/src/TimerDrivenSchedulingAgent.cpp b/src/TimerDrivenSchedulingAgent.cpp
deleted file mode 100644
index 3ce57ae..0000000
--- a/src/TimerDrivenSchedulingAgent.cpp
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * @file TimerDrivenSchedulingAgent.cpp
- * TimerDrivenSchedulingAgent class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <chrono>
-#include <thread>
-#include <iostream>
-#include "Property.h"
-#include "TimerDrivenSchedulingAgent.h"
-
-void TimerDrivenSchedulingAgent::schedule(Processor *processor)
-{
-	std::lock_guard<std::mutex> lock(_mtx);
-
-	_administrativeYieldDuration = 0;
-	std::string yieldValue;
-
-	if (_configure->get(Configure::nifi_administrative_yield_duration, yieldValue))
-	{
-		TimeUnit unit;
-		if (Property::StringToTime(yieldValue, _administrativeYieldDuration, unit) &&
-					Property::ConvertTimeUnitToMS(_administrativeYieldDuration, unit, _administrativeYieldDuration))
-		{
-			_logger->log_debug("nifi_administrative_yield_duration: [%d] ms", _administrativeYieldDuration);
-		}
-	}
-
-	_boredYieldDuration = 0;
-	if (_configure->get(Configure::nifi_bored_yield_duration, yieldValue))
-	{
-		TimeUnit unit;
-		if (Property::StringToTime(yieldValue, _boredYieldDuration, unit) &&
-					Property::ConvertTimeUnitToMS(_boredYieldDuration, unit, _boredYieldDuration))
-		{
-			_logger->log_debug("nifi_bored_yield_duration: [%d] ms", _boredYieldDuration);
-		}
-	}
-
-	if (processor->getScheduledState() != RUNNING)
-	{
-		_logger->log_info("Can not schedule threads for processor %s because it is not running", processor->getName().c_str());
-		return;
-	}
-
-	std::map<std::string, std::vector<std::thread *>>::iterator it =
-			_threads.find(processor->getUUIDStr());
-	if (it != _threads.end())
-	{
-		_logger->log_info("Can not schedule threads for processor %s because there are existed thread running");
-		return;
-	}
-
-	std::vector<std::thread *> threads;
-	for (int i = 0; i < processor->getMaxConcurrentTasks(); i++)
-	{
-		std::thread *thread = new std::thread(run, this, processor);
-		thread->detach();
-		threads.push_back(thread);
-		_logger->log_info("Scheduled Time Driven thread %d running for process %s", thread->get_id(),
-				processor->getName().c_str());
-	}
-	_threads[processor->getUUIDStr().c_str()] = threads;
-
-	return;
-}
-
-void TimerDrivenSchedulingAgent::unschedule(Processor *processor)
-{
-	std::lock_guard<std::mutex> lock(_mtx);
-
-	if (processor->getScheduledState() != RUNNING)
-	{
-		_logger->log_info("Can not unschedule threads for processor %s because it is not running", processor->getName().c_str());
-		return;
-	}
-
-	std::map<std::string, std::vector<std::thread *>>::iterator it =
-			_threads.find(processor->getUUIDStr());
-
-	if (it == _threads.end())
-	{
-		_logger->log_info("Can not unschedule threads for processor %s because there are no existed thread running");
-		return;
-	}
-	for (std::vector<std::thread *>::iterator itThread = it->second.begin(); itThread != it->second.end(); ++itThread)
-	{
-		std::thread *thread = *itThread;
-		_logger->log_info("Scheduled Time Driven thread %d deleted for process %s", thread->get_id(),
-				processor->getName().c_str());
-		delete thread;
-	}
-	_threads.erase(processor->getUUIDStr());
-	processor->clearActiveTask();
-
-	return;
-}
-
-void TimerDrivenSchedulingAgent::run(TimerDrivenSchedulingAgent *agent, Processor *processor)
-{
-	while (agent->_running)
-	{
-		bool shouldYield = agent->onTrigger(processor);
-
-		if (processor->isYield())
-		{
-			// Honor the yield
-			std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime()));
-		}
-		else if (shouldYield && agent->_boredYieldDuration > 0)
-		{
-			// No work to do or need to apply back pressure
-			std::this_thread::sleep_for(std::chrono::milliseconds(agent->_boredYieldDuration));
-		}
-		std::this_thread::sleep_for(std::chrono::nanoseconds(processor->getSchedulingPeriodNano()));
-	}
-	return;
-}
-
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/test/FlowFileRecordTest.cpp
----------------------------------------------------------------------
diff --git a/test/FlowFileRecordTest.cpp b/test/FlowFileRecordTest.cpp
deleted file mode 100644
index 09a3d33..0000000
--- a/test/FlowFileRecordTest.cpp
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * @file MiNiFiMain.cpp 
- * MiNiFiMain implementation 
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-
-#include "FlowFileRecord.h"
-
-int main(int argc, char **argv)
-{
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/test/Server.cpp
----------------------------------------------------------------------
diff --git a/test/Server.cpp b/test/Server.cpp
deleted file mode 100644
index e7b3452..0000000
--- a/test/Server.cpp
+++ /dev/null
@@ -1,607 +0,0 @@
-/* A simple server in the internet domain using TCP
-   The port number is passed as an argument */
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-#include <sys/types.h> 
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <errno.h>
-#include <arpa/inet.h>
-#include <fcntl.h>
-#include <netdb.h>
-#include <string>
-#include <errno.h>
-#include <chrono>
-#include <thread>
-#include <iostream>     // std::cout
-#include <fstream>      // std::ifstream
-#include <signal.h>
-
-#define DEFAULT_NIFI_SERVER_PORT 9000
-#define DEFAULT_REPORT_INTERVAL 1000 // 1 sec
-#define MAX_READ_TIMEOUT 30000 // 30 seconds
-
-//! FlowControl Protocol Msg Type
-typedef enum {
-	REGISTER_REQ, // Device Register Request from device to server which contain device serial number, current running flow xml version
-	REGISTER_RESP, // Device Register Respond from server to device, may contain new flow.xml from server ask device to apply and also device report interval
-	REPORT_REQ, // Period Device Report from device to server which contain device serial number, current running flow xml name/version and other period report info
-	REPORT_RESP, // Report Respond from server to device, may ask device to update flow xml or processor property
-	MAX_FLOW_CONTROL_MSG_TYPE
-} FlowControlMsgType;
-
-//! FlowControl Protocol Msg Type String
-static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] =
-{
-		"REGISTER_REQ",
-		"REGISTER_RESP",
-		"REPORT_REQ",
-		"REPORT_RESP"
-};
-
-//! Flow Control Msg Type to String
-inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type)
-{
-	if (type < MAX_FLOW_CONTROL_MSG_TYPE)
-		return FlowControlMsgTypeStr[type];
-	else
-		return NULL;
-}
-
-//! FlowControll Protocol Msg ID (Some Messages are fix length, Some are variable length (TLV)
-typedef enum {
-	//Fix length 8 bytes: client to server in register request, required field
-	FLOW_SERIAL_NUMBER,
-	// Flow XML name TLV: client to server in register request and report request, required field
-	FLOW_XML_NAME,
-	// Flow XML content, TLV: server to client in register respond, option field in case server want to ask client to load xml from server
-	FLOW_XML_CONTENT,
-	// Fix length, 4 bytes Report interval in msec: server to client in register respond, option field
-	REPORT_INTERVAL,
-	// Processor Name TLV:  server to client in report respond, option field in case server want to ask client to update processor property
-	PROCESSOR_NAME,
-	// Processor Property Name TLV: server to client in report respond, option field in case server want to ask client to update processor property
-	PROPERTY_NAME,
-	// Processor Property Value TLV: server to client in report respond, option field in case server want to ask client to update processor property
-	PROPERTY_VALUE,
-	// Report Blob TLV: client to server in report request, option field in case client want to pickyback the report blob in report request to server
-	REPORT_BLOB,
-	MAX_FLOW_MSG_ID
-} FlowControlMsgID;
-
-//! FlowControl Protocol Msg ID String
-static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] =
-{
-		"FLOW_SERIAL_NUMBER",
-		"FLOW_XML_NAME",
-		"FLOW_XML_CONTENT",
-		"REPORT_INTERVAL",
-		"PROCESSOR_NAME"
-		"PROPERTY_NAME",
-		"PROPERTY_VALUE",
-		"REPORT_BLOB"
-};
-
-#define TYPE_HDR_LEN 4 // Fix Hdr Type
-#define TLV_HDR_LEN 8 // Type 4 bytes and Len 4 bytes
-
-//! FlowControl Protocol Msg Len
-inline int FlowControlMsgIDEncodingLen(FlowControlMsgID id, int payLoadLen)
-{
-	if (id == FLOW_SERIAL_NUMBER)
-		return (TYPE_HDR_LEN + 8);
-	else if (id == REPORT_INTERVAL)
-		return (TYPE_HDR_LEN + 4);
-	else if (id < MAX_FLOW_MSG_ID)
-		return (TLV_HDR_LEN + payLoadLen);
-	else
-		return -1;
-}
-
-//! Flow Control Msg Id to String
-inline const char *FlowControlMsgIdToStr(FlowControlMsgID id)
-{
-	if (id < MAX_FLOW_MSG_ID)
-		return FlowControlMsgIDStr[id];
-	else
-		return NULL;
-}
-
-//! Flow Control Respond status code
-typedef enum {
-	RESP_SUCCESS,
-	RESP_TRIGGER_REGISTER, // Server respond to client report to re trigger register
-	RESP_START_FLOW_CONTROLLER, // Server respond to client to start flow controller
-	RESP_STOP_FLOW_CONTROLLER, // Server respond to client to stop flow controller
-	RESP_FAILURE,
-	MAX_RESP_CODE
-} FlowControlRespCode;
-
-//! FlowControl Resp Code str
-static const char *FlowControlRespCodeStr[MAX_RESP_CODE] =
-{
-		"RESP_SUCCESS",
-		"RESP_TRIGGER_REGISTER",
-		"RESP_START_FLOW_CONTROLLER",
-		"RESP_STOP_FLOW_CONTROLLER",
-		"RESP_FAILURE"
-};
-
-//! Flow Control Resp Code to String
-inline const char *FlowControlRespCodeToStr(FlowControlRespCode code)
-{
-	if (code < MAX_RESP_CODE)
-		return FlowControlRespCodeStr[code];
-	else
-		return NULL;
-}
-
-//! Common FlowControlProtocol Header
-typedef struct {
-	uint32_t msgType; //! Msg Type
-	uint32_t seqNumber; //! Seq Number to match Req with Resp
-	uint32_t status; //! Resp Code, see FlowControlRespCode
-	uint32_t payloadLen; //! Msg Payload length
-} FlowControlProtocolHeader;
-
-
-//! encode uint32_t
-uint8_t *encode(uint8_t *buf, uint32_t value)
-{
-		*buf++ = (value & 0xFF000000) >> 24;
-		*buf++ = (value & 0x00FF0000) >> 16;
-		*buf++ = (value & 0x0000FF00) >> 8;
-		*buf++ = (value & 0x000000FF);
-		return buf;
-}
-
-//! encode uint32_t
-uint8_t *decode(uint8_t *buf, uint32_t &value)
-{
-		value = ((buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|(buf[3]));
-		return (buf + 4);
-}
-
-//! encode byte array
-uint8_t *encode(uint8_t *buf, uint8_t *bufArray, int size)
-{
-		memcpy(buf, bufArray, size);
-		buf += size;
-		return buf;
-}
-
-//! encode std::string
-uint8_t *encode(uint8_t *buf, std::string value)
-{
-		// add the \0 for size
-		buf = encode(buf, value.size()+1);
-		buf = encode(buf, (uint8_t *) value.c_str(), value.size()+1);
-		return buf;
-}
-
-int sendData(int socket, uint8_t *buf, int buflen)
-{
-	int ret = 0, bytes = 0;
-
-	while (bytes < buflen)
-	{
-		ret = send(socket, buf+bytes, buflen-bytes, 0);
-		//check for errors
-		if (ret == -1)
-		{
-			return ret;
-		}
-		bytes+=ret;
-	}
-
-	return bytes;
-}
-
-void error(const char *msg)
-{
-    perror(msg);
-    exit(1);
-}
-
-/* readline - read a '\n' terminated line from socket fd 
-              into buffer bufptr of size len. The line in the
-              buffer is terminated with '\0'.
-              It returns -1 in case of error or if
-              the capacity of the buffer is exceeded.
-	      It returns 0 if EOF is encountered before reading '\n'.
- */
-int readline( int fd, char *bufptr, size_t len )
-{
-  /* Note that this function is very tricky.  It uses the
-     static variables bp, cnt, and b to establish a local buffer.
-     The recv call requests large chunks of data (the size of the buffer).
-     Then if the recv call reads more than one line, the overflow
-     remains in the buffer and it is made available to the next call
-     to readline. 
-     Notice also that this routine reads up to '\n' and overwrites
-     it with '\0'. Thus if the line is really terminated with
-     "\r\n", the '\r' will remain unchanged.
-  */
-  char *bufx = bufptr;
-  static char *bp;
-  static int cnt = 0;
-  static char b[ 4096 ];
-  char c;
-  
-  while ( --len > 0 )
-    {
-      if ( --cnt <= 0 )
-	{
-	  cnt = recv( fd, b, sizeof( b ), 0 );
-	  if ( cnt < 0 )
-	    {
-	      if ( errno == EINTR )
-		{
-		  len++;		/* the while will decrement */
-		  continue;
-		}
-	      return -1;
-	    }
-	  if ( cnt == 0 )
-	    return 0;
-	  bp = b;
-	}
-      c = *bp++;
-      *bufptr++ = c;
-      if ( c == '\n' )
-	{
-	  *bufptr = '\0';
-	  return bufptr - bufx;
-	}
-    }
-  return -1;
-}
-
-int readData(int socket, uint8_t *buf, int buflen)
-{
-	int sendSize = buflen;
-	int status;
-
-	while (buflen)
-	{
-#ifndef __MACH__
-		status = read(socket, buf, buflen);
-#else
-		status = recv(socket, buf, buflen, 0);
-#endif
-		if (status <= 0)
-		{
-			return status;
-		}
-		buflen -= status;
-		buf += status;
-	}
-
-	return sendSize;
-}
-
-int readHdr(int socket, FlowControlProtocolHeader *hdr)
-{
-	uint8_t buffer[sizeof(FlowControlProtocolHeader)];
-
-	uint8_t *data = buffer;
-
-	int status = readData(socket, buffer, sizeof(FlowControlProtocolHeader));
-	if (status <= 0)
-		return status;
-
-	uint32_t value;
-	data = decode(data, value);
-	hdr->msgType = value;
-
-	data = decode(data, value);
-	hdr->seqNumber = value;
-
-	data = decode(data, value);
-	hdr->status = value;
-
-	data = decode(data, value);
-	hdr->payloadLen = value;
-
-	return sizeof(FlowControlProtocolHeader);
-}
-
-int readXML(char **xmlContent)
-{
-	  std::ifstream is ("conf/flowServer.xml", std::ifstream::binary);
-	  if (is) {
-	    // get length of file:
-	    is.seekg (0, is.end);
-	    int length = is.tellg();
-	    is.seekg (0, is.beg);
-
-	    char * buffer = new char [length];
-
-	    printf("Reading %s len %d\n", "conf/flowServer.xml", length);
-	    // read data as a block:
-	    is.read (buffer,length);
-
-	    is.close();
-
-	    // ...buffer contains the entire file...
-	    *xmlContent = buffer;
-
-	    return length;
-	  }
-	  return 0;
-}
-
-static int sockfd = 0, newsockfd = 0;
-void sigHandler(int signal)
-{
-	if (signal == SIGINT || signal == SIGTERM)
-	{
-		close(newsockfd);
-		close(sockfd);
-		exit(1);
-	}
-}
-
-int main(int argc, char *argv[])
-{
-     int portno;
-     socklen_t clilen;
-     struct sockaddr_in serv_addr, cli_addr;
-     char buffer[4096];
-     int flag = 0;
-     int number = 0;
-     
-     int n;
-     if (argc < 2) {
-         fprintf(stderr,"ERROR, no port provided\n");
-         exit(1);
-     }
-
- 	 if (signal(SIGINT, sigHandler) == SIG_ERR || signal(SIGTERM, sigHandler) == SIG_ERR)
- 	 {
-
- 		return -1;
- 	 }
-     sockfd = socket(AF_INET, SOCK_STREAM, 0);
-     if (sockfd < 0) 
-        error("ERROR opening socket");
-     bzero((char *) &serv_addr, sizeof(serv_addr));
-     portno = atoi(argv[1]);
-     serv_addr.sin_family = AF_INET;
-     serv_addr.sin_addr.s_addr = INADDR_ANY;
-     serv_addr.sin_port = htons(portno);
-     if (bind(sockfd, (struct sockaddr *) &serv_addr,
-              sizeof(serv_addr)) < 0) 
-              error("ERROR on binding");
-     listen(sockfd,5);
-     if (portno == DEFAULT_NIFI_SERVER_PORT)
-     {
-    	 while (true)
-    	 {
-    		 clilen = sizeof(cli_addr);
-    		 newsockfd = accept(sockfd,
-                 (struct sockaddr *) &cli_addr, 
-                 &clilen);
-    		 if (newsockfd < 0)
-    		 {
-    			 error("ERROR on accept");
-    			 break;
-    		 }
-    		 // process request
-    		 FlowControlProtocolHeader hdr;
-    		 int status = readHdr(newsockfd, &hdr);
-    		 if (status > 0)
-    		 {
-    			 printf("Flow Control Protocol receive MsgType %s\n", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
-    		     printf("Flow Control Protocol receive Seq Num %d\n", hdr.seqNumber);
-    		     printf("Flow Control Protocol receive Resp Code %s\n", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
-    		     printf("Flow Control Protocol receive Payload len %d\n", hdr.payloadLen);
-    		 	 if (((FlowControlMsgType) hdr.msgType) == REGISTER_REQ)
-    		 	 {
-    		 		printf("Flow Control Protocol Register Req receive\n");
-    		 		uint8_t *payload = new uint8_t[hdr.payloadLen];
-    		 		uint8_t *payloadPtr = payload;
-    		 		status = readData(newsockfd, payload, hdr.payloadLen);
-    		 		while (status > 0 && payloadPtr < (payload + hdr.payloadLen))
-    		 		{
-    		 			uint32_t msgID = 0xFFFFFFFF;
-    		 			payloadPtr = decode(payloadPtr, msgID);
-    		 			if (((FlowControlMsgID) msgID) == FLOW_SERIAL_NUMBER)
-    		 			{
-    		 				// Fixed 8 bytes
-    		 				uint8_t seqNum[8];
-    		 				memcpy(seqNum, payloadPtr, 8);
-    		 				printf("Flow Control Protocol Register Req receive serial num\n");
-    		 				payloadPtr += 8;
-    		 			}
-    		 			else if (((FlowControlMsgID) msgID) == FLOW_XML_NAME)
-    		 			{
-    		 				uint32_t len;
-    		 				payloadPtr = decode(payloadPtr, len);
-    		 				printf("Flow Control Protocol receive XML name length %d\n", len);
-    		 				std::string flowName = (const char *) payloadPtr;
-    		 				payloadPtr += len;
-    		 				printf("Flow Control Protocol receive XML name %s\n", flowName.c_str());
-    		 			}
-    		 			else
-    		 			{
-    		 				break;
-    		 			}
-    		 		}
-    		 		delete[] payload;
-    		 		// Send Register Respond
-    		 		// Calculate the total payload msg size
-    		 		char *xmlContent;
-    		 		uint32_t xmlLen = readXML(&xmlContent);
-    		 		uint32_t payloadSize = FlowControlMsgIDEncodingLen(REPORT_INTERVAL, 0);
-    		 		if (xmlLen > 0)
-    		 			payloadSize += FlowControlMsgIDEncodingLen(FLOW_XML_CONTENT, xmlLen);
-
-    		 		uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
-    		 		uint8_t *data = new uint8_t[size];
-    		 		uint8_t *start = data;
-
-    		 		// encode the HDR
-    		 		hdr.msgType = REGISTER_RESP;
-    		 		hdr.payloadLen = payloadSize;
-    		 		hdr.status = RESP_SUCCESS;
-    		 		data = encode(data, hdr.msgType);
-    		 		data = encode(data, hdr.seqNumber);
-    		 		data = encode(data, hdr.status);
-    		 		data = encode(data, hdr.payloadLen);
-
-    		 		// encode the report interval
-    		 		data = encode(data, REPORT_INTERVAL);
-    		 		data = encode(data, DEFAULT_REPORT_INTERVAL);
-
-    		 		// encode the XML content
-    		 		if (xmlLen > 0)
-    		 		{
-    		 			data = encode(data, FLOW_XML_CONTENT);
-    		 			data = encode(data, xmlLen);
-    		 			data = encode(data, (uint8_t *) xmlContent, xmlLen);
-    		 			delete[] xmlContent;
-    		 		}
-
-    		 		// send it
-    		 		status = sendData(newsockfd, start, size);
-    		 		delete[] start;
-    		 	 }
-    		 	 else if (((FlowControlMsgType) hdr.msgType) == REPORT_REQ)
-        		 {
-        		 		printf("Flow Control Protocol Report Req receive\n");
-        		 		uint8_t *payload = new uint8_t[hdr.payloadLen];
-        		 		uint8_t *payloadPtr = payload;
-        		 		status = readData(newsockfd, payload, hdr.payloadLen);
-        		 		while (status > 0 && payloadPtr < (payload + hdr.payloadLen))
-        		 		{
-        		 			uint32_t msgID = 0xFFFFFFFF;
-        		 			payloadPtr = decode(payloadPtr, msgID);
-        		 			if (((FlowControlMsgID) msgID) == FLOW_XML_NAME)
-        		 			{
-        		 				uint32_t len;
-        		 				payloadPtr = decode(payloadPtr, len);
-        		 				printf("Flow Control Protocol receive XML name length %d\n", len);
-        		 				std::string flowName = (const char *) payloadPtr;
-        		 				payloadPtr += len;
-        		 				printf("Flow Control Protocol receive XML name %s\n", flowName.c_str());
-        		 			}
-        		 			else
-        		 			{
-        		 				break;
-        		 			}
-        		 		}
-        		 		delete[] payload;
-        		 		// Send Register Respond
-        		 		// Calculate the total payload msg size
-        		 		std::string processor = "RealTimeDataCollector";
-        		 		std::string propertyName1 = "real Time Message ID";
-        		 		std::string propertyValue1 = "41";
-        		 		std::string propertyName2 = "Batch Message ID";
-        		 		std::string propertyValue2 = "172,30,48";
-        		 		if (flag == 0)
-        		 		{
-              		 		propertyName1 = "Real Time Message ID";
-              		 		propertyValue1 = "41";
-                		    propertyName2 = "Batch Message ID";
-                		    propertyValue2 = "172,48";
-        		 			flag = 1;
-        		 		}
-        		 		else if (flag == 1)
-        		 		{
-        		 			propertyName1 = "Real Time Message ID";
-        		 			propertyValue1 = "172,48";
-        		 			propertyName2 = "Batch Message ID";
-        		 			propertyValue2 = "41";
-        		 			flag = 0;
-        		 		}
-        		 		uint32_t payloadSize = FlowControlMsgIDEncodingLen(PROCESSOR_NAME, processor.size()+1);
-        		 		payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName1.size()+1);
-        		 		payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue1.size()+1);
-        		 		payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName2.size()+1);
-        		 		payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue2.size()+1);
-
-        		 		uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
-        		 		uint8_t *data = new uint8_t[size];
-        		 		uint8_t *start = data;
-
-        		 		// encode the HDR
-        		 		hdr.msgType = REPORT_RESP;
-        		 		hdr.payloadLen = payloadSize;
-        		 		hdr.status = RESP_SUCCESS;
-
-        		 		if (number >= 10 && number < 20)
-        		 	    {
-        		 	        // After 10 second report, stop the flow controller for 10 second
-        		 	       hdr.status = RESP_STOP_FLOW_CONTROLLER;
-        		 	    }
-        		 		else if (number == 20)
-        		 		{
-        		 			// restart the flow controller after 10 second
-        		 			hdr.status = RESP_START_FLOW_CONTROLLER;
-        		 		}
-        		 		else if (number == 30)
-        		 		{
-        		 			// retrigger register
-        		 			hdr.status = RESP_TRIGGER_REGISTER;
-        		 			number = 0;
-        		 		}
-
-        		 	    number++;
-
-        		 		data = encode(data, hdr.msgType);
-        		 		data = encode(data, hdr.seqNumber);
-        		 		data = encode(data, hdr.status);
-        		 		data = encode(data, hdr.payloadLen);
-
-        		 		// encode the processorName
-        		 		data = encode(data, PROCESSOR_NAME);
-        		 		data = encode(data, processor);
-
-        		 		// encode the propertyName and value TLV
-        		 		data = encode(data, PROPERTY_NAME);
-            		 	data = encode(data, propertyName1);
-            		 	data = encode(data, PROPERTY_VALUE);
-            		 	data = encode(data, propertyValue1);
-            		 	data = encode(data, PROPERTY_NAME);
-            		 	data = encode(data, propertyName2);
-            		 	data = encode(data, PROPERTY_VALUE);
-            		 	data = encode(data, propertyValue2);
-        		 		// send it
-        		 		status = sendData(newsockfd, start, size);
-        		 		delete[] start;
-        		 	 }
-    		 }
-    		 close(newsockfd);
-    	 }
-    	 close(sockfd);
-     }
-     else
-     {
-    	 clilen = sizeof(cli_addr);
-    	 newsockfd = accept(sockfd,
-    	                (struct sockaddr *) &cli_addr,
-    	                 &clilen);
-    	 if (newsockfd < 0)
-    	    	error("ERROR on accept");
-    	 while (1)
-    	 {
-    	    	bzero(buffer,4096);
-    	    	n = readline(newsockfd,buffer,4095);
-    	    	if (n <= 0 )
-    	        {
-    	    		close(newsockfd);
-    	    		newsockfd = accept(sockfd,
-    	    		    	                (struct sockaddr *) &cli_addr,
-    	    		    	                 &clilen);
-    	    		continue;
-    	    	}
-    	    	printf("%s",buffer);
-    	  }
-    	  close(newsockfd);
-    	  close(sockfd);
-     }
-     return 0; 
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/thirdparty/yaml-cpp-yaml-cpp-0.5.3/Makefile
----------------------------------------------------------------------
diff --git a/thirdparty/yaml-cpp-yaml-cpp-0.5.3/Makefile b/thirdparty/yaml-cpp-yaml-cpp-0.5.3/Makefile
deleted file mode 100644
index f23f477..0000000
--- a/thirdparty/yaml-cpp-yaml-cpp-0.5.3/Makefile
+++ /dev/null
@@ -1,40 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License
-
-CFLAGS = -Wall
-INCLUDES = -I./include
-
-CPP_FILES := $(wildcard src/*.cpp)
-OBJ_FILES := $(addprefix build/,$(notdir $(CPP_FILES:.cpp=.o)))
-
-all: lib/libyaml-cpp.a
-
-lib:
-	mkdir -p ./lib
-
-build:
-	mkdir -p ./build
-
-lib/libyaml-cpp.a: $(OBJ_FILES)
-	mkdir -p ./lib
-	ar crs $@ $^
-
-build/%.o: src/%.cpp
-	mkdir -p ./build
-	g++ -Os $(INCLUDES) $(CC_FLAGS) -c -o $@ $<
-
-clean:
-	rm -rf ./lib ./build