You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/11/14 01:48:24 UTC

[10/25] nifi-minifi-cpp git commit: MINIFICPP-250: Initial implementation of CapturePacket Processor that uses libpcap.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c0a788b3/thirdparty/pcap++/Packet++/src/TcpReassembly.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/pcap++/Packet++/src/TcpReassembly.cpp b/thirdparty/pcap++/Packet++/src/TcpReassembly.cpp
new file mode 100644
index 0000000..76391c8
--- /dev/null
+++ b/thirdparty/pcap++/Packet++/src/TcpReassembly.cpp
@@ -0,0 +1,695 @@
+#define LOG_MODULE PacketLogModuleTcpReassembly
+
+#include <TcpReassembly.h>
+#include <TcpLayer.h>
+#include <IPv4Layer.h>
+#include <PacketUtils.h>
+#include <IpAddress.h>
+#include <Logger.h>
+#include <sstream>
+#ifdef WIN32 //for using ntohl, ntohs, etc.
+#include <winsock2.h>
+#elif LINUX
+#include <in.h> //for using ntohl, ntohs, etc.
+#elif MAC_OS_X
+#include <arpa/inet.h> //for using ntohl, ntohs, etc.
+#endif
+
+
+namespace pcpp
+{
+
+TcpStreamData::TcpStreamData() // builds an empty object that holds no payload buffer
+{
+	m_DeleteDataOnDestruction = false; // no buffer is attached, so the destructor has nothing to free
+	m_DataLen = 0;
+	m_Data = NULL;
+}
+
+TcpStreamData::TcpStreamData(uint8_t* tcpData, size_t tcpDataLength, ConnectionData connData)
+{
+	m_DeleteDataOnDestruction = true; // the buffer is stored as-is (no copy) and, unless this flag is cleared later, freed with delete[] by the destructor
+	m_Connection = connData;
+	m_DataLen = tcpDataLength;
+	m_Data = tcpData;
+}
+
+TcpStreamData::~TcpStreamData()
+{
+	// free the buffer only when this object is flagged as responsible for it
+	// (delete[] on a NULL pointer is a no-op, so no separate NULL test is needed)
+	if (m_DeleteDataOnDestruction)
+		delete [] m_Data;
+}
+
+TcpStreamData::TcpStreamData(TcpStreamData& other) // copy c'tor
+{
+	copyData(other); // duplicates other's buffer (if any) and connection data into this object
+}
+
+TcpStreamData& TcpStreamData::operator=(const TcpStreamData& other)
+{
+	if (this == &other) // self-assignment: nothing to do
+		return *this;
+
+	if (m_DeleteDataOnDestruction) // release the buffer this object is responsible for (delete[] of NULL is a no-op)
+		delete [] m_Data;
+	m_Data = NULL; // never keep a dangling pointer: if the allocation inside copyData() throws, the destructor must not free the old buffer again
+	copyData(other); // duplicates other's buffer (if any) and connection data into this object
+	return *this;
+}
+
+void TcpStreamData::copyData(const TcpStreamData& other)
+{
+	// deep copy: a buffer held by 'other' is duplicated, and this object is flagged to free whatever it ends up holding
+	m_DataLen = other.m_DataLen;
+
+	uint8_t* clonedBuffer = NULL;
+	if (other.m_Data != NULL)
+	{
+		clonedBuffer = new uint8_t[m_DataLen];
+		memcpy(clonedBuffer, other.m_Data, m_DataLen);
+	}
+	m_Data = clonedBuffer;
+	m_Connection = other.m_Connection;
+	m_DeleteDataOnDestruction = true;
+}
+
+
+TcpReassembly::TcpReassembly(OnTcpMessageReady onMessageReadyCallback, void* userCookie, OnTcpConnectionStart onConnectionStartCallback, OnTcpConnectionEnd onConnectionEndCallback)
+{
+	m_UserCookie = userCookie; // opaque pointer that is passed back as the last argument of every callback invocation
+	m_OnConnEnd = onConnectionEndCallback;
+	m_OnConnStart = onConnectionStartCallback;
+	m_OnMessageReadyCallback = onMessageReadyCallback;
+}
+
+TcpReassembly::~TcpReassembly()
+{
+	// free the reassembly state of every connection that is still open, then drop the map entries (no callback is fired here)
+	std::map<uint32_t, TcpReassemblyData*>::iterator connIter;
+	for (connIter = m_ConnectionList.begin(); connIter != m_ConnectionList.end(); ++connIter)
+		delete connIter->second;
+	m_ConnectionList.clear();
+}
+
+// Feeds one parsed packet into the reassembly logic. Only IPv4/TCP packets are handled; packets of flows that were already closed,
+// payload-less packets without SYN/FIN/RST and packets arriving on a side that already sent FIN/RST are dropped. Payload at the expected
+// sequence is passed to the message callback right away, payload whose sequence is ahead of the expected one is stored in the side's
+// out-of-order list, and FIN/RST packets mark their side as closed (the flow itself is closed once both sides are marked).
+void TcpReassembly::ReassemblePacket(Packet& tcpData)
+{
+	// TCP reassembly doesn't support IPv6 for now
+	IPv4Layer* ipLayer = tcpData.getLayerOfType<IPv4Layer>();
+	if (ipLayer == NULL)
+		return;
+
+	// Ignore non-TCP packets
+	TcpLayer* tcpLayer = tcpData.getLayerOfType<TcpLayer>();
+	if (tcpLayer == NULL)
+		return;
+
+	// calculate the TCP payload size from the value of IPV4's "total length" field.
+	// The reason we don't take the TCP payload size is because sometimes there is padding on the packet that doesn't belong to the TCP layer
+	size_t tcpPayloadSize = ntohs(ipLayer->getIPv4Header()->totalLength) - ipLayer->getHeaderLen() - tcpLayer->getHeaderLen();
+
+	// calculate if this packet has FIN or RST flags
+	bool isFin = (tcpLayer->getTcpHeader()->finFlag == 1);
+	bool isRst = (tcpLayer->getTcpHeader()->rstFlag == 1);
+	bool isFinOrRst = isFin || isRst;
+
+	// ignore ACK packets or TCP packets with no payload (except for SYN, FIN or RST packets which we'll later need)
+	if (tcpPayloadSize == 0 && tcpLayer->getTcpHeader()->synFlag == 0 && !isFinOrRst)
+		return;
+
+	// if the actual TCP payload is smaller than the value written in IPV4's "total length" field then adjust tcpPayloadSize to avoid buffer overflow
+	if (tcpLayer->getLayerPayloadSize() < tcpPayloadSize)
+	{
+		LOG_DEBUG("Got a packet where actual TCP payload size is smaller then the value written in IPv4's 'total length' header. Adjusting tcpPayloadSize to avoid buffer overflow");
+		tcpPayloadSize = tcpLayer->getLayerPayloadSize();
+	}
+
+	TcpReassemblyData* tcpReassemblyData = NULL;
+
+	// calculate flow key for this packet
+	uint32_t flowKey = hash5Tuple(&tcpData);
+
+	// if this packet belongs to a connection that was already closed (for example: data packet that comes after FIN), ignore it
+	if (m_ClosedConnectionList.find(flowKey) != m_ClosedConnectionList.end())
+	{
+		LOG_DEBUG("Ignoring packet of already closed flow [0x%X]", flowKey);
+		return;
+	}
+
+	// find the connection in the connection map
+	std::map<uint32_t, TcpReassemblyData*>::iterator iter = m_ConnectionList.find(flowKey);
+	if (iter == m_ConnectionList.end())
+	{
+		// if it's a packet of a new connection, create a TcpReassemblyData object and add it to the active connection list
+		tcpReassemblyData = new TcpReassemblyData();
+		tcpReassemblyData->connData.srcIP = ipLayer->getSrcIpAddress();
+		tcpReassemblyData->connData.dstIP = ipLayer->getDstIpAddress();
+		tcpReassemblyData->connData.srcPort = ntohs(tcpLayer->getTcpHeader()->portSrc);
+		tcpReassemblyData->connData.dstPort = ntohs(tcpLayer->getTcpHeader()->portDst);
+		tcpReassemblyData->connData.flowKey = flowKey;
+
+		m_ConnectionList[flowKey] = tcpReassemblyData;
+
+		m_ConnectionInfo.push_back(tcpReassemblyData->connData);
+
+		// fire connection start callback
+		if (m_OnConnStart != NULL)
+			m_OnConnStart(tcpReassemblyData->connData, m_UserCookie);
+	}
+	else // connection already exists
+		tcpReassemblyData = iter->second;
+
+	int sideIndex = -1;
+	bool first = false;
+
+	// calculate packet's source IP and source port (the port is left in network byte order - below it is only compared for equality)
+	uint32_t srcIP = ipLayer->getSrcIpAddress().toInt();
+	uint16_t srcPort = tcpLayer->getTcpHeader()->portSrc;
+
+	// if this is a new connection and it's the first packet we see on that connection
+	if (tcpReassemblyData->numOfSides == 0)
+	{
+		LOG_DEBUG("Setting side for new connection");
+
+		// open the first side of the connection, side index is 0
+		sideIndex = 0;
+		tcpReassemblyData->twoSides[sideIndex].srcIP = srcIP;
+		tcpReassemblyData->twoSides[sideIndex].srcPort = srcPort;
+		tcpReassemblyData->numOfSides++;
+		first = true;
+	}
+	// if there is already one side in this connection (which will be at side index 0)
+	else if (tcpReassemblyData->numOfSides == 1)
+	{
+		// check if packet belongs to that side
+		if (tcpReassemblyData->twoSides[0].srcIP == srcIP && tcpReassemblyData->twoSides[0].srcPort == srcPort)
+		{
+			sideIndex = 0;
+		}
+		else
+		{
+			// this means packet belong to the second side which doesn't yet exist. Open a second side with side index 1
+			LOG_DEBUG("Setting second side of a connection");
+			sideIndex = 1;
+			tcpReassemblyData->twoSides[sideIndex].srcIP = srcIP;
+			tcpReassemblyData->twoSides[sideIndex].srcPort = srcPort;
+			tcpReassemblyData->numOfSides++;
+			first = true;
+		}
+	}
+	// if there are already 2 sides open for this connection
+	else if (tcpReassemblyData->numOfSides == 2)
+	{
+		// check if packet matches side 0
+		if (tcpReassemblyData->twoSides[0].srcIP == srcIP && tcpReassemblyData->twoSides[0].srcPort == srcPort)
+		{
+			sideIndex = 0;
+		}
+		// check if packet matches side 1
+		else if (tcpReassemblyData->twoSides[1].srcIP == srcIP && tcpReassemblyData->twoSides[1].srcPort == srcPort)
+		{
+			sideIndex = 1;
+		}
+		// packet doesn't match either side. This case doesn't make sense but it's handled anyway. Packet will be ignored
+		else
+		{
+			LOG_ERROR("Error occurred - packet doesn't match either side of the connection!!");
+			return;
+		}
+	}
+	// there are more than 2 side - this case doesn't make sense and shouldn't happen, but handled anyway. Packet will be ignored
+	else
+	{
+		LOG_ERROR("Error occurred - connection has more than 2 sides!!");
+		return;
+	}
+
+	// if this side already got FIN or RST packet before, ignore this packet as this side is considered closed
+	if (tcpReassemblyData->twoSides[sideIndex].gotFinOrRst)
+	{
+		LOG_DEBUG("Got a packet after FIN or RST were already seen on this side (%d). Ignoring this packet", sideIndex);
+		return;
+	}
+
+	// handle FIN/RST packets that don't contain additional TCP data
+	if (isFinOrRst && tcpPayloadSize == 0)
+	{
+		LOG_DEBUG("Got FIN or RST packet without data on side %d", sideIndex);
+
+		handleFinOrRst(tcpReassemblyData, sideIndex, flowKey);
+		return;
+	}
+
+	// check if this packet contains data from a different side than the side seen before.
+	// If this is the case then treat the out-of-order packet list as missing data and send them to the user (callback) together with an indication that some data was missing.
+	// Why? because a new packet from the other side means the previous message was probably already received and a new message is starting.
+	// In this case out-of-order packets are probably actually missing data
+	// For example: let's assume these are HTTP messages. If we're seeing the first packet of a response this means the server has already received the full request and is now starting
+	// to send the response. So if we still have out-of-order packets from the request it probably means that some packets were lost during the capture. So we don't expect the client to
+	// continue sending packets of the previous request, so we'll treat the out-of-order packets as missing data
+	//
+	// I'm aware that there are edge cases where the situation I described above is not true, but at some point we must clean the out-of-order packet list to avoid memory leak.
+	// I decided to do what Wireshark does and clean this list when starting to see a message from the other side
+	if (!first && tcpPayloadSize > 0 && tcpReassemblyData->prevSide != -1 && tcpReassemblyData->prevSide != sideIndex &&
+			tcpReassemblyData->twoSides[tcpReassemblyData->prevSide].tcpFragmentList.size() > 0)
+	{
+		LOG_DEBUG("Seeing a first data packet from a different side. Previous side was %d, current side is %d", tcpReassemblyData->prevSide, sideIndex);
+		checkOutOfOrderFragments(tcpReassemblyData, tcpReassemblyData->prevSide, true);
+	}
+	tcpReassemblyData->prevSide = sideIndex;
+
+	// extract sequence value from packet
+	uint32_t sequence = ntohl(tcpLayer->getTcpHeader()->sequenceNumber);
+
+	// if it's the first packet we see on this side of the connection
+	if (first)
+	{
+		LOG_DEBUG("First data from this side of the connection");
+
+		// set initial sequence
+		tcpReassemblyData->twoSides[sideIndex].sequence = sequence + tcpPayloadSize;
+		if (tcpLayer->getTcpHeader()->synFlag != 0)
+			tcpReassemblyData->twoSides[sideIndex].sequence++;
+
+		// send data to the callback
+		if (tcpPayloadSize != 0 && m_OnMessageReadyCallback != NULL)
+		{
+			TcpStreamData streamData(tcpLayer->getLayerPayload(), tcpPayloadSize, tcpReassemblyData->connData);
+			streamData.setDeleteDataOnDestruction(false);
+			m_OnMessageReadyCallback(sideIndex, streamData, m_UserCookie);
+		}
+
+		// handle case where this packet is FIN or RST (although it's unlikely)
+		if (isFinOrRst)
+			handleFinOrRst(tcpReassemblyData, sideIndex, flowKey);
+
+		// return - nothing else to do here
+		return;
+	}
+	// signed distance of this packet from the expected sequence. TCP sequence numbers wrap around at 2^32, so they must not be compared with a plain '<' or '>'
+	int32_t seqDelta = (int32_t)(sequence - tcpReassemblyData->twoSides[sideIndex].sequence);
+	if (seqDelta < 0) // packet sequence is smaller than expected - this means that part or all of the TCP data is being re-transmitted
+	{
+		LOG_DEBUG("Found new data with the sequence lower than expected");
+
+		// calculate the sequence after this packet to see if this TCP payload contains also new data
+		uint32_t newSequence = sequence + tcpPayloadSize;
+
+		// this means that some of payload is new (signed distance again, so a payload that crosses the wrap-around point is handled too)
+		if ((int32_t)(newSequence - tcpReassemblyData->twoSides[sideIndex].sequence) > 0)
+		{
+			// despite its name, newLength is the number of leading payload bytes that were already seen - the new data starts right after them
+			uint32_t newLength = tcpReassemblyData->twoSides[sideIndex].sequence - sequence;
+
+			LOG_DEBUG("Although sequence is lower than expected payload is long enough to contain new data. Calling the callback with the new data");
+
+			// update the sequence for this side to include the new data that was seen
+			tcpReassemblyData->twoSides[sideIndex].sequence += tcpPayloadSize - newLength;
+
+			// send only the new data to the callback
+			if (m_OnMessageReadyCallback != NULL)
+			{
+				TcpStreamData streamData(tcpLayer->getLayerPayload() + newLength, tcpPayloadSize - newLength, tcpReassemblyData->connData);
+				streamData.setDeleteDataOnDestruction(false);
+				m_OnMessageReadyCallback(sideIndex, streamData, m_UserCookie);
+			}
+		}
+
+		// handle case where this packet is FIN or RST
+		if (isFinOrRst)
+			handleFinOrRst(tcpReassemblyData, sideIndex, flowKey);
+
+		// return - nothing else to do here
+		return;
+	}
+
+	// if packet sequence is exactly as expected - this is the "good" case and the most common one
+	else if (seqDelta == 0)
+	{
+		// if TCP data size is 0 - nothing to do
+		if (tcpPayloadSize == 0)
+		{
+			LOG_DEBUG("Payload length is 0, doing nothing");
+
+			// handle case where this packet is FIN or RST
+			if (isFinOrRst)
+				handleFinOrRst(tcpReassemblyData, sideIndex, flowKey);
+
+			return;
+		}
+
+		LOG_DEBUG("Found new data with expected sequence. Calling the callback");
+
+		// update the sequence for this side to include TCP data from this packet
+		tcpReassemblyData->twoSides[sideIndex].sequence += tcpPayloadSize;
+
+		// if this is a SYN packet - add +1 to the sequence
+		if (tcpLayer->getTcpHeader()->synFlag != 0)
+			tcpReassemblyData->twoSides[sideIndex].sequence++;
+
+		// send the data to the callback
+		if (m_OnMessageReadyCallback != NULL)
+		{
+			TcpStreamData streamData(tcpLayer->getLayerPayload(), tcpPayloadSize, tcpReassemblyData->connData);
+			streamData.setDeleteDataOnDestruction(false);
+			m_OnMessageReadyCallback(sideIndex, streamData, m_UserCookie);
+		}
+
+		// now that we've seen new data, go over the list of out-of-order packets and see if one or more of them fits now
+		checkOutOfOrderFragments(tcpReassemblyData, sideIndex, false);
+
+		// handle case where this packet is FIN or RST
+		if (isFinOrRst)
+			handleFinOrRst(tcpReassemblyData, sideIndex, flowKey);
+
+		// return - nothing else to do here
+		return;
+	}
+
+	// this case means sequence size of the packet is higher than expected which means the packet is out-of-order or some packets were lost (missing data).
+	// we don't know which of the 2 cases it is at this point so we just add this data to the out-of-order packet list
+	else
+	{
+		// if TCP data size is 0 - nothing to do
+		if (tcpPayloadSize == 0)
+		{
+			LOG_DEBUG("Payload length is 0, doing nothing");
+
+			// handle case where this packet is FIN or RST
+			if (isFinOrRst)
+				handleFinOrRst(tcpReassemblyData, sideIndex, flowKey);
+
+			return;
+		}
+
+		// create a new TcpFragment, copy the TCP data to it and add this packet to the the out-of-order packet list
+		TcpFragment* newTcpFrag = new TcpFragment();
+		newTcpFrag->data = new uint8_t[tcpPayloadSize];
+		newTcpFrag->dataLength = tcpPayloadSize;
+		newTcpFrag->sequence = sequence;
+		memcpy(newTcpFrag->data, tcpLayer->getLayerPayload(), tcpPayloadSize);
+		tcpReassemblyData->twoSides[sideIndex].tcpFragmentList.pushBack(newTcpFrag);
+
+		LOG_DEBUG("Found out-of-order packet and added a new TCP fragment with size %d to the out-of-order list of side %d", (int)tcpPayloadSize, sideIndex);
+
+		// handle case where this packet is FIN or RST
+		if (isFinOrRst)
+		{
+			handleFinOrRst(tcpReassemblyData, sideIndex, flowKey);
+			return;
+		}
+	}
+}
+
+void TcpReassembly::ReassemblePacket(RawPacket* tcpRawData) // overload for callers that hold a raw (not yet parsed) packet
+{
+	Packet parsedPacket(tcpRawData, false); // NOTE(review): 'false' presumably tells Packet not to free tcpRawData - confirm against Packet's c'tor
+	ReassemblePacket(parsedPacket); // continue with the overload that takes a parsed packet
+}
+
+std::string TcpReassembly::prepareMissingDataMessage(uint32_t missingDataLen)
+{
+	std::ostringstream placeholderText; // builds the marker text "[<N> bytes missing]" that stands in for a gap in the stream
+	placeholderText << "[" << missingDataLen << " bytes missing]";
+	return placeholderText.str();
+}
+
+void TcpReassembly::handleFinOrRst(TcpReassemblyData* tcpReassemblyData, int sideIndex, uint32_t flowKey)
+{
+	// a FIN or RST was already recorded for this side - nothing more to do
+	if (tcpReassemblyData->twoSides[sideIndex].gotFinOrRst)
+		return;
+
+	LOG_DEBUG("Handling FIN or RST packet on side %d", sideIndex);
+
+	tcpReassemblyData->twoSides[sideIndex].gotFinOrRst = true;
+
+	// If the peer side was closed earlier the whole flow is closed now (note: this deletes tcpReassemblyData).
+	// Otherwise only this side is done: its out-of-order fragments are flushed to the user and removed from the list.
+	const bool peerAlreadyClosed = tcpReassemblyData->twoSides[1 - sideIndex].gotFinOrRst;
+	if (!peerAlreadyClosed)
+		checkOutOfOrderFragments(tcpReassemblyData, sideIndex, true);
+	else
+		closeConnectionInternal(flowKey, TcpReassembly::TcpReassemblyConnectionClosedByFIN_RST);
+}
+
+// Goes over the out-of-order fragment list of one side of a connection and passes to the message callback every fragment that continues
+// the stream at the currently expected sequence (fragments that hold only already-seen data are removed from the list without a callback).
+// When cleanWholeFragList is true the fragments that remain are treated as following missing data: the one closest to the expected
+// sequence is delivered prefixed with a "[N bytes missing]" text, the expected sequence jumps past it and the search starts over until the
+// list is empty. Sequence numbers are compared by their 32-bit distance from the expected sequence, so the logic survives wrap-around.
+void TcpReassembly::checkOutOfOrderFragments(TcpReassemblyData* tcpReassemblyData, int sideIndex, bool cleanWholeFragList)
+{
+	bool foundSomething = false;
+
+	do
+	{
+		LOG_DEBUG("Starting first iteration of checkOutOfOrderFragments - looking for fragments that match the current sequence or have smaller sequence");
+
+		int index = 0;
+		foundSomething = false;
+
+		do
+		{
+			index = 0;
+			foundSomething = false;
+
+			// first fragment list iteration - go over the whole fragment list and see if can find fragments that match the current sequence
+			// or have smaller sequence but have big enough payload to get new data
+			while (index < (int)tcpReassemblyData->twoSides[sideIndex].tcpFragmentList.size())
+			{
+				TcpFragment* curTcpFrag = tcpReassemblyData->twoSides[sideIndex].tcpFragmentList.at(index);
+
+				// if fragment sequence matches the current sequence
+				if (curTcpFrag->sequence == tcpReassemblyData->twoSides[sideIndex].sequence)
+				{
+					// update sequence
+					tcpReassemblyData->twoSides[sideIndex].sequence += curTcpFrag->dataLength;
+					if (curTcpFrag->data != NULL)
+					{
+						LOG_DEBUG("Found an out-of-order packet matching to the current sequence with size %d on side %d. Pulling it out of the list and sending the data to the callback", (int)curTcpFrag->dataLength, sideIndex);
+
+						// send new data to callback
+						if (m_OnMessageReadyCallback != NULL)
+						{
+							TcpStreamData streamData(curTcpFrag->data, curTcpFrag->dataLength, tcpReassemblyData->connData);
+							streamData.setDeleteDataOnDestruction(false);
+							m_OnMessageReadyCallback(sideIndex, streamData, m_UserCookie);
+						}
+					}
+
+					// remove fragment from list
+					tcpReassemblyData->twoSides[sideIndex].tcpFragmentList.erase(tcpReassemblyData->twoSides[sideIndex].tcpFragmentList.begin() + index);
+
+					foundSomething = true;
+
+					continue;
+				}
+
+				// if fragment sequence has lower sequence than the current sequence (signed distance - a plain '<' breaks when the sequence wraps around)
+				if ((int32_t)(curTcpFrag->sequence - tcpReassemblyData->twoSides[sideIndex].sequence) < 0)
+				{
+					// check if it still has new data
+					uint32_t newSequence = curTcpFrag->sequence + curTcpFrag->dataLength;
+
+					// it has new data
+					if ((int32_t)(newSequence - tcpReassemblyData->twoSides[sideIndex].sequence) > 0)
+					{
+						// despite its name, newLength is the number of leading fragment bytes that were already seen - the new data starts right after them
+						uint32_t newLength = tcpReassemblyData->twoSides[sideIndex].sequence - curTcpFrag->sequence;
+
+						LOG_DEBUG("Found a fragment in the out-of-order list which its sequence is lower than expected but its payload is long enough to contain new data. "
+							"Calling the callback with the new data. Fragment size is %d on side %d, new data size is %d", (int)curTcpFrag->dataLength, sideIndex, (int)(curTcpFrag->dataLength - newLength));
+
+						// update current sequence with the delta new data size
+						tcpReassemblyData->twoSides[sideIndex].sequence += curTcpFrag->dataLength - newLength;
+
+						// send only the new data to the callback
+						if (m_OnMessageReadyCallback != NULL)
+						{
+							TcpStreamData streamData(curTcpFrag->data + newLength, curTcpFrag->dataLength - newLength, tcpReassemblyData->connData);
+							streamData.setDeleteDataOnDestruction(false);
+							m_OnMessageReadyCallback(sideIndex, streamData, m_UserCookie);
+						}
+
+						foundSomething = true;
+					}
+					else
+					{
+						LOG_DEBUG("Found a fragment in the out-of-order list which doesn't contain any new data, ignoring it. Fragment size is %d on side %d", (int)curTcpFrag->dataLength, sideIndex);
+					}
+
+					// delete fragment from list
+					tcpReassemblyData->twoSides[sideIndex].tcpFragmentList.erase(tcpReassemblyData->twoSides[sideIndex].tcpFragmentList.begin() + index);
+
+					continue;
+				}
+
+				//if got to here it means the fragment has higher sequence than current sequence, increment index and continue
+				index++;
+			}
+
+			// if managed to find new segment, do the search all over again
+		} while (foundSomething);
+
+		// if got here it means we're left only with fragments that have higher sequence than current sequence. This means out-of-order packets or
+		// missing data. If we don't want to clear the frag list yet, assume it's out-of-order and return
+		if (!cleanWholeFragList)
+			return;
+
+		LOG_DEBUG("Starting second  iteration of checkOutOfOrderFragments - handle missing data");
+
+		// second fragment list iteration - now we're left only with fragments that have higher sequence than current sequence. This means missing data.
+		// Search for the fragment whose sequence is the closest one ahead of the current sequence (measured as a distance, so wrap-around is handled)
+
+		uint32_t closestDistance = 0xffffffff;
+		int closestSequenceFragIndex = -1;
+		index = 0;
+
+		while (index < (int)tcpReassemblyData->twoSides[sideIndex].tcpFragmentList.size())
+		{
+			// extract segment at current index
+			TcpFragment* curTcpFrag = tcpReassemblyData->twoSides[sideIndex].tcpFragmentList.at(index);
+
+			// check if it is closer than the closest fragment found so far (the first fragment examined always qualifies, whatever its sequence is)
+			if (closestSequenceFragIndex == -1 || (uint32_t)(curTcpFrag->sequence - tcpReassemblyData->twoSides[sideIndex].sequence) < closestDistance)
+			{
+				closestDistance = (uint32_t)(curTcpFrag->sequence - tcpReassemblyData->twoSides[sideIndex].sequence);
+				closestSequenceFragIndex = index;
+			}
+
+			index++;
+		}
+
+		// this means fragment list is not empty at this stage
+		if (closestSequenceFragIndex > -1)
+		{
+			// get the fragment with the closest sequence
+			TcpFragment* curTcpFrag = tcpReassemblyData->twoSides[sideIndex].tcpFragmentList.at(closestSequenceFragIndex);
+
+			// calculate number of missing bytes
+			uint32_t missingDataLen = curTcpFrag->sequence - tcpReassemblyData->twoSides[sideIndex].sequence;
+
+			// update sequence
+			tcpReassemblyData->twoSides[sideIndex].sequence = curTcpFrag->sequence + curTcpFrag->dataLength;
+			if (curTcpFrag->data != NULL)
+			{
+				// send new data to callback
+				if (m_OnMessageReadyCallback != NULL)
+				{
+					// prepare missing data text
+					std::string missingDataTextStr = prepareMissingDataMessage(missingDataLen);
+
+					// add missing data text to the data that will be sent to the callback. This means that the data will look something like:
+					// "[xx bytes missing]<original_data>"
+					size_t dataWithMissingDataTextLen = missingDataTextStr.length() + curTcpFrag->dataLength;
+					uint8_t* dataWithMissingDataText = new uint8_t[dataWithMissingDataTextLen];
+					memcpy(dataWithMissingDataText, missingDataTextStr.c_str(), missingDataTextStr.length());
+					memcpy(dataWithMissingDataText + missingDataTextStr.length(), curTcpFrag->data, curTcpFrag->dataLength);
+
+					TcpStreamData streamData(dataWithMissingDataText, dataWithMissingDataTextLen, tcpReassemblyData->connData); // streamData frees the temporary buffer when it goes out of scope
+					m_OnMessageReadyCallback(sideIndex, streamData, m_UserCookie);
+
+					LOG_DEBUG("Found missing data on side %d: %d byte are missing. Sending the closest fragment which is in size %d + missing text message which size is %d",
+						sideIndex, missingDataLen, (int)curTcpFrag->dataLength, (int)missingDataTextStr.length());
+				}
+			}
+
+			// remove fragment from list
+			tcpReassemblyData->twoSides[sideIndex].tcpFragmentList.erase(tcpReassemblyData->twoSides[sideIndex].tcpFragmentList.begin() + closestSequenceFragIndex);
+
+			LOG_DEBUG("Calling checkOutOfOrderFragments again from the start");
+
+			// run the outer loop again from the start to do the whole search again (both iterations).
+			// the stop condition is when the list is empty (so closestSequenceFragIndex == -1)
+			foundSomething = true;
+		}
+
+	} while (foundSomething);
+}
+
+void TcpReassembly::closeConnection(uint32_t flowKey) // closes the single connection identified by flowKey
+{
+	closeConnectionInternal(flowKey, TcpReassembly::TcpReassemblyConnectionClosedManually); // the end reason passed on to the connection-end callback is "closed manually"
+}
+
+void TcpReassembly::closeConnectionInternal(uint32_t flowKey, ConnectionEndReason reason)
+{
+	// only flows that are currently open can be closed - an unknown flow key is reported and ignored
+	std::map<uint32_t, TcpReassemblyData*>::iterator connIter = m_ConnectionList.find(flowKey);
+	if (connIter == m_ConnectionList.end())
+	{
+		LOG_ERROR("Cannot close flow with key 0x%X: cannot find flow", flowKey);
+		return;
+	}
+
+	LOG_DEBUG("Closing connection with flow key 0x%X", flowKey);
+	TcpReassemblyData* connState = connIter->second;
+
+	// flush whatever still waits in the out-of-order lists of both sides
+	LOG_DEBUG("Calling checkOutOfOrderFragments on side 0");
+	checkOutOfOrderFragments(connState, 0, true);
+	LOG_DEBUG("Calling checkOutOfOrderFragments on side 1");
+	checkOutOfOrderFragments(connState, 1, true);
+
+	// notify the user while the connection data is still alive
+	if (m_OnConnEnd != NULL)
+		m_OnConnEnd(connState->connData, reason, m_UserCookie);
+
+	// free the state and move the flow key from the open map to the closed one
+	delete connState;
+	m_ConnectionList.erase(connIter);
+	m_ClosedConnectionList[flowKey] = true;
+	LOG_DEBUG("Connection with flow key 0x%X is closed", flowKey);
+}
+
+// Closes every connection that is still open. Each connection goes through the same path that
+// closeConnection() uses, which means that for every one of them:
+//  - the out-of-order fragments still held for both sides are flushed to the message callback,
+//    with "[N bytes missing]" markers in front of fragments that follow missing data
+//  - the connection-end callback (if set) is fired with the reason TcpReassemblyConnectionClosedManually
+//  - the reassembly state is freed and the flow key is recorded in the closed-connection list, which
+//    makes ReassemblePacket() ignore any further packet of that flow
+// All callbacks are invoked from within this call.
+void TcpReassembly::closeAllConnections()
+{
+	LOG_DEBUG("Closing all flows");
+
+	// the map shrinks by one entry on every iteration, so always take
+	// whatever entry happens to be first at that moment
+	while (!m_ConnectionList.empty())
+	{
+		// The key is read from the map entry itself (ReassemblePacket() stores every connection under its
+		// connData.flowKey, so both values are the same). This guarantees that the lookup inside
+		// closeConnectionInternal() succeeds and the entry is erased, i.e. that this loop terminates.
+		uint32_t flowKey = m_ConnectionList.begin()->first;
+
+		// Delegate instead of repeating the closing sequence here, so the two ways of closing a
+		// connection cannot drift apart. closeConnectionInternal() flushes both sides, fires the
+		// callback, frees the state, updates both lists and writes the per-connection log messages.
+		closeConnectionInternal(flowKey, TcpReassemblyConnectionClosedManually);
+	}
+}
+
+const std::vector<ConnectionData>& TcpReassembly::getConnectionInformation() const
+{
+	return m_ConnectionInfo; // ReassemblePacket() appends one entry per newly seen connection; the closing code visible in this file does not remove entries
+}
+
+int TcpReassembly::isConnectionOpen(const ConnectionData& connection)
+{
+	// 1 = flow key is in the open-connection map, 0 = it is in the closed-connection list, -1 = it is in neither of them
+	const bool isOpen = m_ConnectionList.find(connection.flowKey) != m_ConnectionList.end();
+	if (isOpen)
+		return 1;
+
+	const bool wasClosed = m_ClosedConnectionList.find(connection.flowKey) != m_ClosedConnectionList.end();
+	return wasClosed ? 0 : -1;
+}
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c0a788b3/thirdparty/pcap++/Packet++/src/TextBasedProtocol.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/pcap++/Packet++/src/TextBasedProtocol.cpp b/thirdparty/pcap++/Packet++/src/TextBasedProtocol.cpp
new file mode 100644
index 0000000..b1b6ff7
--- /dev/null
+++ b/thirdparty/pcap++/Packet++/src/TextBasedProtocol.cpp
@@ -0,0 +1,651 @@
+#include "TextBasedProtocol.h"
+#include "Logger.h"
+#include "PayloadLayer.h"
+#include <string.h>
+#include <algorithm>
+#include <stdlib.h>
+
+namespace pcpp
+{
+
+// this implementation of strnlen is required since mingw doesn't have strnlen.
+// Returns the number of chars that precede the first NUL in s, examining no more than n chars
+// (so the result is n when no NUL is found within the first n chars). s must not be NULL
+size_t tbp_my_own_strnlen(const char *s, size_t n)
+{
+	const char *p = s;
+	/* We don't check here for NULL pointers.  */
+	// the remaining length must be tested before dereferencing p, otherwise s[n] is read
+	// whenever the first n chars contain no NUL (e.g. a packet buffer that isn't NUL terminated)
+	for (; n > 0 && *p != 0; p++, n--)
+		;
+	return (size_t) (p - s);
+}
+
+
+// -------- Class TextBasedProtocolMessage -----------------
+
+
+// Builds a message on top of existing data: all arguments are forwarded to the Layer c'tor and the field list
+// starts empty with a fields offset of 0. No field is parsed in this c'tor
+TextBasedProtocolMessage::TextBasedProtocolMessage(uint8_t* data, size_t dataLen, Layer* prevLayer, Packet* packet) : Layer(data, dataLen, prevLayer, packet),
+						m_FieldList(NULL), m_LastField(NULL), m_FieldsOffset(0) {}
+
+// Copy c'tor: the Layer part is copied by the Layer copy c'tor, then copyDataFrom() builds this message's own
+// copy of other's field list, fields offset and name-to-field map
+TextBasedProtocolMessage::TextBasedProtocolMessage(const TextBasedProtocolMessage& other) : Layer(other)
+{
+	copyDataFrom(other);
+
+}
+
+// Assignment operator: replaces the layer data and the header fields of this message with copies of other's.
+// Returns a reference to this message
+TextBasedProtocolMessage& TextBasedProtocolMessage::operator=(const TextBasedProtocolMessage& other)
+{
+	// self-assignment: nothing to do. Without this guard the fields below would be freed and then read again through 'other'
+	if (this == &other)
+		return *this;
+
+	Layer::operator=(other);
+
+	// free the fields currently owned by this message
+	HeaderField* curField = m_FieldList;
+	while (curField != NULL)
+	{
+		HeaderField* temp = curField;
+		curField = curField->getNextField();
+		delete temp;
+	}
+
+	// the list pointers and the name-to-field map still refer to the fields that were just deleted.
+	// Drop these stale references before the new fields are copied in
+	m_FieldList = NULL;
+	m_LastField = NULL;
+	m_FieldNameToFieldMap.clear();
+
+	copyDataFrom(other);
+
+	return *this;
+}
+
+// Makes this message hold its own copy of other's header fields: every field of other is duplicated, attached to
+// this message at the same offset and linked in the same order. m_LastField, m_FieldsOffset and the
+// name-to-field map are then set accordingly. Fields previously owned by this message are NOT freed here
+void TextBasedProtocolMessage::copyDataFrom(const TextBasedProtocolMessage& other)
+{
+	// copy field list
+	if (other.m_FieldList != NULL)
+	{
+		m_FieldList = new HeaderField(*(other.m_FieldList));
+		HeaderField* curField = m_FieldList;
+		curField->attachToTextBasedProtocolMessage(this, other.m_FieldList->m_NameOffsetInMessage);
+		HeaderField* curOtherField = other.m_FieldList;
+		while (curOtherField->getNextField() != NULL)
+		{
+			HeaderField* newField = new HeaderField(*(curOtherField->getNextField()));
+			newField->attachToTextBasedProtocolMessage(this, curOtherField->getNextField()->m_NameOffsetInMessage);
+			curField->setNextField(newField);
+			curField = curField->getNextField();
+			curOtherField = curOtherField->getNextField();
+		}
+
+		m_LastField = curField;
+	}
+	else
+	{
+		m_FieldList = NULL;
+		m_LastField = NULL;
+	}
+
+	m_FieldsOffset = other.m_FieldsOffset;
+
+	// rebuild the map from scratch so it only refers to the fields created above
+	m_FieldNameToFieldMap.clear();
+	for(HeaderField* field = m_FieldList; field != NULL; field = field->getNextField())
+	{
+		// keys are stored in lower case, this is what getFieldByName() and removeField() search for
+		std::string fieldName = field->getFieldName();
+		std::transform(fieldName.begin(), fieldName.end(), fieldName.begin(), ::tolower);
+		m_FieldNameToFieldMap.insert(std::pair<std::string, HeaderField*>(fieldName, field));
+	}
+
+}
+
+
+// Parses the header fields out of the message data, starting at m_FieldsOffset. Every field found is linked
+// into the field list and registered in the name-to-field map under its lower-cased name. Parsing stops at
+// the end-of-header field or when the next field would start at or beyond the end of the data
+void TextBasedProtocolMessage::parseFields()
+{
+	char nameValueSeperator = getHeaderFieldNameValueSeparator();
+	bool spacesAllowedBetweenNameAndValue = spacesAllowedBetweenHeaderFieldNameAndValue();
+
+	// the first field is parsed unconditionally
+	// NOTE(review): this reads the data at m_FieldsOffset, so it assumes m_FieldsOffset < m_DataLen - confirm with callers
+	HeaderField* firstField = new HeaderField(this, m_FieldsOffset, nameValueSeperator, spacesAllowedBetweenNameAndValue);
+	LOG_DEBUG("Added new field: name='%s'; offset in packet=%d; length=%d", firstField->getFieldName().c_str(), firstField->m_NameOffsetInMessage, (int)firstField->getFieldSize());
+	LOG_DEBUG("     Field value = %s", firstField->getFieldValue().c_str());
+
+	if (m_FieldList == NULL)
+		m_FieldList = firstField;
+	else
+		m_FieldList->setNextField(firstField);
+
+	// map keys are always lower case
+	std::string fieldName = firstField->getFieldName();
+	std::transform(fieldName.begin(), fieldName.end(), fieldName.begin(), ::tolower);
+	m_FieldNameToFieldMap.insert(std::pair<std::string, HeaderField*>(fieldName, firstField));
+
+	// Last field will be empty and contain just "\n" or "\r\n". This field will mark the end of the header
+	HeaderField* curField = m_FieldList;
+	int curOffset = m_FieldsOffset;
+	// last field can be one of:
+	// a.) \r\n\r\n or \n\n marking the end of the header
+	// b.) the end of the packet
+	while (!curField->isEndOfHeader() && curOffset + curField->getFieldSize() < m_DataLen)
+	{
+		// the next field begins right after the current one
+		curOffset += curField->getFieldSize();
+		HeaderField* newField = new HeaderField(this, curOffset, nameValueSeperator, spacesAllowedBetweenNameAndValue);
+		LOG_DEBUG("Added new field: name='%s'; offset in packet=%d; length=%d", newField->getFieldName().c_str(), newField->m_NameOffsetInMessage, (int)newField->getFieldSize());
+		LOG_DEBUG("     Field value = %s", newField->getFieldValue().c_str());
+		curField->setNextField(newField);
+		curField = curField->getNextField();
+		fieldName = newField->getFieldName();
+		std::transform(fieldName.begin(), fieldName.end(), fieldName.begin(), ::tolower);
+		m_FieldNameToFieldMap.insert(std::pair<std::string, HeaderField*>(fieldName, newField));
+	}
+
+	m_LastField = curField;
+}
+
+
+// D'tor: frees every field in the field list
+TextBasedProtocolMessage::~TextBasedProtocolMessage()
+{
+	HeaderField* field = m_FieldList;
+	while (field != NULL)
+	{
+		// fetch the successor before the current field is destroyed
+		HeaderField* next = field->getNextField();
+		delete field;
+		field = next;
+	}
+
+	m_FieldList = NULL;
+}
+
+
+// Creates a field from a name and a value, using this protocol's separator settings, and adds it after the
+// current last field. Returns whatever addField(const HeaderField&) returns
+HeaderField* TextBasedProtocolMessage::addField(const std::string& fieldName, const std::string& fieldValue)
+{
+	return addField(HeaderField(fieldName, fieldValue, getHeaderFieldNameValueSeparator(), spacesAllowedBetweenHeaderFieldNameAndValue()));
+}
+
+// Adds a copy of newField after the current last field (m_LastField). Returns the result of insertField()
+HeaderField* TextBasedProtocolMessage::addField(const HeaderField& newField)
+{
+	return insertField(m_LastField, newField);
+}
+
+// Adds the end-of-header field after the current last field. Returns the result of insertField()
+HeaderField* TextBasedProtocolMessage::addEndOfHeader()
+{
+	return insertField(m_LastField, HeaderField(PCPP_END_OF_TEXT_BASED_PROTOCOL_HEADER, "", '\0', false));
+}
+
+
+// Creates a field from a name and a value, using this protocol's separator settings, and inserts it right
+// after prevField. Returns the result of insertField(HeaderField*, const HeaderField&)
+HeaderField* TextBasedProtocolMessage::insertField(HeaderField* prevField, const std::string& fieldName, const std::string& fieldValue)
+{
+	return insertField(prevField, HeaderField(fieldName, fieldValue, getHeaderFieldNameValueSeparator(), spacesAllowedBetweenHeaderFieldNameAndValue()));
+}
+
+// Inserts a new field after the field named prevFieldName. An empty prevFieldName means "no previous field"
+// (NULL is passed on as prevField). Returns NULL when prevFieldName isn't empty and no such field exists
+HeaderField* TextBasedProtocolMessage::insertField(std::string prevFieldName, const std::string& fieldName, const std::string& fieldValue)
+{
+	if (prevFieldName.empty())
+		return insertField(NULL, fieldName, fieldValue);
+
+	HeaderField* anchorField = getFieldByName(prevFieldName);
+	return (anchorField == NULL) ? NULL : insertField(anchorField, fieldName, fieldValue);
+}
+
+
+// Inserts a copy of newField into this message, right after prevField (or as the first field when prevField
+// is NULL). The layer is extended to make room, the offsets of all following fields are shifted, and the new
+// field is linked into the field list and registered in the name-to-field map.
+// Returns the newly created field, or NULL when newField already belongs to a message, when prevField is the
+// end-of-header field or when the layer could not be extended
+HeaderField* TextBasedProtocolMessage::insertField(HeaderField* prevField, const HeaderField& newField)
+{
+	if (newField.m_TextBasedProtocolMessage != NULL)
+	{
+		LOG_ERROR("This field is already associated with another message");
+		return NULL;
+	}
+
+	if (prevField != NULL && prevField->getFieldName() == PCPP_END_OF_TEXT_BASED_PROTOCOL_HEADER)
+	{
+		LOG_ERROR("Cannot add a field after end of header");
+		return NULL;
+	}
+
+	HeaderField* newFieldToAdd = new HeaderField(newField);
+
+	// the new field starts where prevField ends, or where the fields begin if there is no prevField
+	int newFieldOffset = m_FieldsOffset;
+	if (prevField != NULL)
+		newFieldOffset = prevField->m_NameOffsetInMessage + prevField->getFieldSize();
+
+	// extend layer to make room for the new field. If this fails nothing may be written to the message:
+	// the copy below would overflow the data buffer, so release the new field and give up
+	if (!extendLayer(newFieldOffset, newFieldToAdd->getFieldSize()))
+	{
+		LOG_ERROR("Cannot extend layer to insert the header");
+		delete newFieldToAdd;
+		return NULL;
+	}
+
+	HeaderField* curField = m_FieldList;
+	if (prevField != NULL)
+		curField = prevField->getNextField();
+
+	// go over all fields after prevField and update their offsets
+	shiftFieldsOffset(curField, newFieldToAdd->getFieldSize());
+
+	// copy new field data to message
+	memcpy(m_Data + newFieldOffset, newFieldToAdd->m_NewFieldData, newFieldToAdd->getFieldSize());
+
+	// attach new field to message
+	newFieldToAdd->attachToTextBasedProtocolMessage(this, newFieldOffset);
+
+	// insert field into fields link list
+	if (prevField == NULL)
+	{
+		newFieldToAdd->setNextField(m_FieldList);
+		m_FieldList = newFieldToAdd;
+	}
+	else
+	{
+		newFieldToAdd->setNextField(prevField->getNextField());
+		prevField->setNextField(newFieldToAdd);
+	}
+
+	// if newField is the last field, update m_LastField
+	if (newFieldToAdd->getNextField() == NULL)
+		m_LastField = newFieldToAdd;
+
+	// insert the new field into name to field map (keys are lower case)
+	std::string fieldName = newFieldToAdd->getFieldName();
+	std::transform(fieldName.begin(), fieldName.end(), fieldName.begin(), ::tolower);
+	m_FieldNameToFieldMap.insert(std::pair<std::string, HeaderField*>(fieldName, newFieldToAdd));
+
+	return newFieldToAdd;
+}
+
+// Removes the index-th field (counting from 0) among the fields whose name equals fieldName, case-insensitively.
+// Returns the result of removeField(HeaderField*), or false (after logging an error) when there is no such field
+bool TextBasedProtocolMessage::removeField(std::string fieldName, int index)
+{
+	// map keys are lower case
+	std::transform(fieldName.begin(), fieldName.end(), fieldName.begin(), ::tolower);
+
+	typedef std::multimap<std::string,HeaderField*>::iterator FieldMapIter;
+	std::pair<FieldMapIter, FieldMapIter> matches = m_FieldNameToFieldMap.equal_range(fieldName);
+
+	// walk the fields that carry this name until the requested occurrence is reached
+	HeaderField* target = NULL;
+	int position = 0;
+	for (FieldMapIter cur = matches.first; cur != matches.second; ++cur, ++position)
+	{
+		if (position == index)
+		{
+			target = cur->second;
+			break;
+		}
+	}
+
+	if (target == NULL)
+	{
+		LOG_ERROR("Cannot find field '%s'", fieldName.c_str());
+		return false;
+	}
+
+	return removeField(target);
+}
+
+// Removes fieldToRemove from this message and deletes it: the field's bytes are cut out of the layer, the
+// offsets of the fields that follow are moved back, and the field list, m_LastField and the name-to-field
+// map are updated. Returns true on success and also when fieldToRemove is NULL. Returns false when the field
+// belongs to another message or when the layer cannot be shortened
+bool TextBasedProtocolMessage::removeField(HeaderField* fieldToRemove)
+{
+	if (fieldToRemove == NULL)
+		return true;
+
+	if (fieldToRemove->m_TextBasedProtocolMessage != this)
+	{
+		LOG_ERROR("Field isn't associated with this message");
+		return false;
+	}
+
+	// the name is read now, while the field's bytes are still part of the message data
+	std::string fieldName = fieldToRemove->getFieldName();
+
+	// shorten layer and delete this field
+	if (!shortenLayer(fieldToRemove->m_NameOffsetInMessage, fieldToRemove->getFieldSize()))
+	{
+		LOG_ERROR("Cannot shorten layer");
+		return false;
+	}
+
+	// update offsets of all fields after this field (shift by minus the size of the removed field)
+	HeaderField* curField = fieldToRemove->getNextField();
+	shiftFieldsOffset(curField, 0-fieldToRemove->getFieldSize());
+//	while (curField != NULL)
+//	{
+//		curField->m_NameOffsetInMessage -= fieldToRemove->getFieldSize();
+//		if (curField->m_ValueOffsetInMessage != -1)
+//			curField->m_ValueOffsetInMessage -= fieldToRemove->getFieldSize();
+//
+//		curField = curField->getNextField();
+//	}
+
+	// update fields link list
+	if (fieldToRemove == m_FieldList)
+		m_FieldList = m_FieldList->getNextField();
+	else
+	{
+		// find the field that precedes fieldToRemove and make it skip over fieldToRemove
+		curField = m_FieldList;
+		while (curField->getNextField() != fieldToRemove)
+			curField = curField->getNextField();
+
+		curField->setNextField(fieldToRemove->getNextField());
+	}
+
+	// re-calculate m_LastField if needed
+	if (fieldToRemove == m_LastField)
+	{
+		if (m_FieldList == NULL)
+			m_LastField = NULL;
+		else
+		{
+			// the new last field is found by walking the list to its end
+			curField = m_FieldList;
+			while (curField->getNextField() != NULL)
+				curField = curField->getNextField();
+			m_LastField = curField;
+		}
+	}
+
+	// remove the hash entry for this field. Several fields may share a name, so the entry is matched by pointer
+	std::transform(fieldName.begin(), fieldName.end(), fieldName.begin(), ::tolower);
+	std::pair <std::multimap<std::string,HeaderField*>::iterator, std::multimap<std::string,HeaderField*>::iterator> range;
+	range = m_FieldNameToFieldMap.equal_range(fieldName);
+    for (std::multimap<std::string,HeaderField*>::iterator iter = range.first; iter != range.second; ++iter)
+    {
+    	if (iter->second == fieldToRemove)
+    	{
+    		m_FieldNameToFieldMap.erase(iter);
+    		break;
+    	}
+    }
+
+	// finally - delete this field
+	delete fieldToRemove;
+
+	return true;
+}
+
+// Returns true when the message has a last field and its name equals the end-of-header marker
+bool TextBasedProtocolMessage::isHeaderComplete()
+{
+	return m_LastField != NULL && m_LastField->getFieldName() == PCPP_END_OF_TEXT_BASED_PROTOCOL_HEADER;
+}
+
+// Adds numOfBytesToShift (may be negative) to the name offset of fromField and of every field after it.
+// Value offsets are shifted too, except for fields whose value offset is unknown (-1)
+void TextBasedProtocolMessage::shiftFieldsOffset(HeaderField* fromField, int numOfBytesToShift)
+{
+	for (HeaderField* field = fromField; field != NULL; field = field->getNextField())
+	{
+		field->m_NameOffsetInMessage += numOfBytesToShift;
+
+		const bool hasValueOffset = (field->m_ValueOffsetInMessage != -1);
+		if (hasValueOffset)
+			field->m_ValueOffsetInMessage += numOfBytesToShift;
+	}
+}
+
+// Returns the index-th field (counting from 0) among the fields whose name equals fieldName, case-insensitively,
+// or NULL when there is no such field
+HeaderField* TextBasedProtocolMessage::getFieldByName(std::string fieldName, int index)
+{
+	// map keys are lower case
+	std::transform(fieldName.begin(), fieldName.end(), fieldName.begin(), ::tolower);
+
+	typedef std::multimap<std::string,HeaderField*>::iterator FieldMapIter;
+	std::pair<FieldMapIter, FieldMapIter> matches = m_FieldNameToFieldMap.equal_range(fieldName);
+
+	int position = 0;
+	for (FieldMapIter cur = matches.first; cur != matches.second; ++cur, ++position)
+	{
+		if (position == index)
+			return cur->second;
+	}
+
+	return NULL;
+}
+
+// Returns the number of fields in the message, not counting end-of-header fields
+int TextBasedProtocolMessage::getFieldCount()
+{
+	int count = 0;
+
+	for (HeaderField* field = getFirstField(); field != NULL; field = field->getNextField())
+	{
+		if (field->isEndOfHeader())
+			continue;
+
+		count++;
+	}
+
+	return count;
+}
+
+// Whatever follows the header is handed to a PayloadLayer. No next layer is created when the data ends
+// with the header
+void TextBasedProtocolMessage::parseNextLayer()
+{
+	const size_t headerLen = getHeaderLen();
+
+	if (m_DataLen > headerLen)
+		m_NextLayer = new PayloadLayer(m_Data + headerLen, m_DataLen - headerLen, this, m_Packet);
+}
+
+// Returns the length of the header, which is the offset at which the last field ends. When the message has
+// no fields at all (m_LastField is NULL, e.g. after every field was removed) the header ends where the
+// fields would have started
+size_t TextBasedProtocolMessage::getHeaderLen()
+{
+	// removeField() sets m_LastField to NULL once the field list is empty, it must not be dereferenced then
+	if (m_LastField == NULL)
+		return m_FieldsOffset;
+
+	return m_LastField->m_NameOffsetInMessage + m_LastField->m_FieldSize;
+}
+
+// Intentionally empty: this class has no calculated fields to update
+void TextBasedProtocolMessage::computeCalculateFields()
+{
+	//nothing to do for now
+}
+
+
+
+
+
+// -------- Class HeaderField -----------------
+
+
+// Parses one header field out of the data of an existing message, starting at offsetInMessage.
+// The field extends up to and including the first '\n'. If there is none it extends up to the first NUL or to the
+// end of the message data. Name size, value offset and value size are derived from the position of
+// nameValueSeperator inside the field. Sizes/offsets that cannot be determined are set to -1
+HeaderField::HeaderField(TextBasedProtocolMessage* TextBasedProtocolMessage, int offsetInMessage, char nameValueSeperator, bool spacesAllowedBetweenNameAndValue) :
+		m_NewFieldData(NULL), m_TextBasedProtocolMessage(TextBasedProtocolMessage), m_NameOffsetInMessage(offsetInMessage), m_NextField(NULL),
+		m_NameValueSeperator(nameValueSeperator), m_SpacesAllowedBetweenNameAndValue(spacesAllowedBetweenNameAndValue)
+{
+	char* fieldData = (char*)(m_TextBasedProtocolMessage->m_Data + m_NameOffsetInMessage);
+	// number of bytes between the beginning of this field and the end of the message data
+	size_t bytesLeftInMessage = m_TextBasedProtocolMessage->m_DataLen-(size_t)m_NameOffsetInMessage;
+
+	//char* fieldEndPtr = strchr(fieldData, '\n');
+	char* fieldEndPtr = (char *)memchr(fieldData, '\n', bytesLeftInMessage);
+	if (fieldEndPtr == NULL)
+		m_FieldSize = tbp_my_own_strnlen(fieldData, bytesLeftInMessage);
+	else
+		m_FieldSize = fieldEndPtr - fieldData + 1;
+
+	// a line that begins with '\r' or '\n' marks the end of the header. A zero-length field is treated the same
+	// way: there is nothing to parse in it and a parser could never advance past it. The size is tested first
+	// so that fieldData isn't dereferenced when no bytes are left
+	if (m_FieldSize == 0 || (*fieldData) == '\r' || (*fieldData) == '\n')
+	{
+		m_FieldNameSize = -1;
+		m_ValueOffsetInMessage = -1;
+		m_FieldValueSize = -1;
+		m_IsEndOfHeaderField = true;
+		return;
+	}
+	else
+		m_IsEndOfHeaderField = false;
+
+//	char* fieldValuePtr = strchr(fieldData, ':');
+	char* fieldValuePtr = (char *)memchr(fieldData, nameValueSeperator, bytesLeftInMessage);
+	// could not find the position of the separator, meaning field value position is unknown.
+	// The search above covers the rest of the message, so a separator located at or after the end of this
+	// line belongs to a later field and counts as "not found"
+	if (fieldValuePtr == NULL || (fieldEndPtr != NULL && fieldValuePtr >= fieldEndPtr))
+	{
+		m_ValueOffsetInMessage = -1;
+		m_FieldValueSize = -1;
+		m_FieldNameSize = m_FieldSize;
+	}
+	else
+	{
+		m_FieldNameSize = fieldValuePtr - fieldData;
+		// Header field looks like this: <field_name>[separator]<zero or more spaces><field_Value>
+		// So fieldValuePtr give us the position of the separator. Value offset is the first non-space byte forward
+		fieldValuePtr++;
+
+		if (spacesAllowedBetweenNameAndValue)
+		{
+			// advance fieldValuePtr 1 byte forward while didn't get to end of packet and fieldValuePtr points to a space char.
+			// The bound is strict: the byte at index getDataLen() is already outside the message data
+			while ((size_t)(fieldValuePtr - (char*)m_TextBasedProtocolMessage->m_Data) < m_TextBasedProtocolMessage->getDataLen() && (*fieldValuePtr) == ' ')
+				fieldValuePtr++;
+		}
+
+		// reached the end of the packet and value start offset wasn't found
+		if ((size_t)(fieldValuePtr - (char*)(m_TextBasedProtocolMessage->m_Data)) > m_TextBasedProtocolMessage->getDataLen())
+		{
+			m_ValueOffsetInMessage = -1;
+			m_FieldValueSize = -1;
+		}
+		else
+		{
+			m_ValueOffsetInMessage = fieldValuePtr - (char*)m_TextBasedProtocolMessage->m_Data;
+			// couldn't find the end of the field, so assuming the field value length is from m_ValueOffsetInMessage until the end of the packet
+			if (fieldEndPtr == NULL)
+				m_FieldValueSize = (char*)(m_TextBasedProtocolMessage->m_Data + m_TextBasedProtocolMessage->getDataLen()) - fieldValuePtr;
+			else
+			{
+				m_FieldValueSize = fieldEndPtr - fieldValuePtr;
+				// if field ends with \r\n, decrease the value length by 1
+				if ((*(--fieldEndPtr)) == '\r')
+					m_FieldValueSize--;
+			}
+		}
+	}
+}
+
+// Creates a stand-alone field (not attached to any message) from a name and a value. The separator settings
+// are stored first because initNewField() uses them to build the field data
+HeaderField::HeaderField(std::string name, std::string value, char nameValueSeperator, bool spacesAllowedBetweenNameAndValue)
+{
+	m_NameValueSeperator = nameValueSeperator;
+	m_SpacesAllowedBetweenNameAndValue = spacesAllowedBetweenNameAndValue;
+	initNewField(name, value);
+}
+
+// (Re)initializes this field as a stand-alone field that isn't attached to any message: allocates m_NewFieldData
+// and fills it with "<name><separator>[space]<value>\r\n", or with just "\r\n" when name is the end-of-header
+// marker. A buffer previously held in m_NewFieldData is NOT freed here
+void HeaderField::initNewField(std::string name, std::string value)
+{
+	const bool isEndOfHeader = (name == PCPP_END_OF_TEXT_BASED_PROTOCOL_HEADER);
+
+	// a stand-alone field has no owning message, no successor, and its data starts at offset 0 of its own buffer
+	m_TextBasedProtocolMessage = NULL;
+	m_NextField = NULL;
+	m_NameOffsetInMessage = 0;
+
+	// what goes between name and value: the separator char, followed by a single space if spaces are allowed
+	std::string separator(1, m_NameValueSeperator);
+	if (m_SpacesAllowedBetweenNameAndValue)
+		separator += " ";
+
+	// the raw bytes of the field. An end-of-header field is just \r\n (2B)
+	std::string rawField = "\r\n";
+	if (!isEndOfHeader)
+		rawField = name + separator + value + "\r\n";
+
+	m_FieldSize = rawField.length();
+	m_NewFieldData = new uint8_t[m_FieldSize];
+	memcpy(m_NewFieldData, rawField.c_str(), m_FieldSize);
+
+	// the value begins right after the name and the separator
+	if (isEndOfHeader)
+		m_ValueOffsetInMessage = 0;
+	else
+		m_ValueOffsetInMessage = name.length() + separator.length();
+
+	m_FieldNameSize = name.length();
+	m_FieldValueSize = value.length();
+	m_IsEndOfHeaderField = isEndOfHeader;
+}
+
+// D'tor: releases the stand-alone field data, if this field still holds any
+HeaderField::~HeaderField()
+{
+	// deleting a NULL pointer is a no-op, so no explicit check is needed
+	delete [] m_NewFieldData;
+}
+
+// Copy c'tor: the copy is always a stand-alone field (built by initNewField() from other's name and value),
+// even when other is attached to a message
+HeaderField::HeaderField(const HeaderField& other) : m_NameValueSeperator('\0'), m_SpacesAllowedBetweenNameAndValue(false)
+{
+	m_NameValueSeperator = other.m_NameValueSeperator;
+	m_SpacesAllowedBetweenNameAndValue = other.m_SpacesAllowedBetweenNameAndValue;
+	initNewField(other.getFieldName(), other.getFieldValue());
+}
+
+// Returns the buffer that the name/value offsets of this field refer to: the field's own data while it is
+// stand-alone, or the data of the owning message once it is attached
+char* HeaderField::getData()
+{
+	const bool isStandAlone = (m_TextBasedProtocolMessage == NULL);
+	return isStandAlone ? (char*)m_NewFieldData : (char*)(m_TextBasedProtocolMessage->m_Data);
+}
+
+// Returns the field name, or an empty string when the name size is unknown (-1)
+std::string HeaderField::getFieldName() const
+{
+	if (m_FieldNameSize == (size_t)-1)
+		return std::string();
+
+	// getData() isn't a const method, hence the cast
+	const char* namePtr = (const char*)(((HeaderField*)this)->getData() + m_NameOffsetInMessage);
+
+	std::string name;
+	name.assign(namePtr, m_FieldNameSize);
+	return name;
+}
+
+// Returns the field value, or an empty string when the value offset is unknown (-1)
+std::string HeaderField::getFieldValue() const
+{
+	if (m_ValueOffsetInMessage == -1)
+		return std::string();
+
+	// getData() isn't a const method, hence the cast
+	const char* valuePtr = (const char*)(((HeaderField*)this)->getData() + m_ValueOffsetInMessage);
+
+	std::string value;
+	value.assign(valuePtr, m_FieldValueSize);
+	return value;
+}
+
+// Replaces the value of this field with newValue.
+// For a stand-alone field the field data is simply rebuilt. For a field attached to a message the layer is
+// extended or shortened by the length difference, the offsets of the following fields are shifted and the
+// new value is written into the message data.
+// Returns true on success, false when the field has no value position in the message or when the layer
+// could not be resized
+bool HeaderField::setFieldValue(std::string newValue)
+{
+	// Field isn't linked with any message yet
+	if (m_TextBasedProtocolMessage == NULL)
+	{
+		std::string name = getFieldName();
+		delete [] m_NewFieldData;
+		initNewField(name, newValue);
+		return true;
+	}
+
+	// a field that was parsed without a separator (or an end-of-header field) has no value position in the
+	// message. Going on would resize the layer and write at offset -1, i.e. before the message data
+	if (m_ValueOffsetInMessage == -1)
+	{
+		LOG_ERROR("Cannot set field value: the position of the value in the message is unknown");
+		return false;
+	}
+
+	std::string curValue = getFieldValue();
+	int lengthDifference = newValue.length() - curValue.length();
+	// new value is longer than current value
+	if (lengthDifference > 0)
+	{
+		if (!m_TextBasedProtocolMessage->extendLayer(m_ValueOffsetInMessage, lengthDifference))
+		{
+			LOG_ERROR("Could not extend layer");
+			return false;
+		}
+	}
+	// new value is shorter than current value
+	else if (lengthDifference < 0)
+	{
+		if (!m_TextBasedProtocolMessage->shortenLayer(m_ValueOffsetInMessage, 0-lengthDifference))
+		{
+			LOG_ERROR("Could not shorten layer");
+			return false;
+		}
+	}
+
+	// the fields that follow moved together with the data
+	if (lengthDifference != 0)
+		m_TextBasedProtocolMessage->shiftFieldsOffset(getNextField(), lengthDifference);
+
+	// update sizes
+	m_FieldValueSize += lengthDifference;
+	m_FieldSize += lengthDifference;
+
+	// write new value to field data. getData() is read again here, after the layer was possibly resized
+	memcpy(getData() + m_ValueOffsetInMessage, newValue.c_str(), newValue.length());
+
+	return true;
+}
+
+// Turns a stand-alone field into a field of 'message' located at fieldOffsetInMessage: the field's own data
+// buffer is released and the name/value offsets are re-based onto the message. The field bytes themselves are
+// not copied here. Logs an error and changes nothing when the field already belongs to a different
+// message or when it has no stand-alone data
+void HeaderField::attachToTextBasedProtocolMessage(TextBasedProtocolMessage* message, int fieldOffsetInMessage)
+{
+	if (m_TextBasedProtocolMessage != NULL && m_TextBasedProtocolMessage != message)
+	{
+		LOG_ERROR("Header field already associated with another message");
+		return;
+	}
+
+	if (m_NewFieldData == NULL)
+	{
+		LOG_ERROR("Header field doesn't have new field data");
+		return;
+	}
+
+	// from now on getData() returns the message data instead of the field's own buffer
+	delete [] m_NewFieldData;
+	m_NewFieldData = NULL;
+	m_TextBasedProtocolMessage = message;
+
+	// keep the distance between the name and the value while moving the field to its offset in the message
+	int valueAndNameDifference = m_ValueOffsetInMessage - m_NameOffsetInMessage;
+	m_NameOffsetInMessage = fieldOffsetInMessage;
+	m_ValueOffsetInMessage = m_NameOffsetInMessage + valueAndNameDifference;
+}
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c0a788b3/thirdparty/pcap++/Packet++/src/UdpLayer.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/pcap++/Packet++/src/UdpLayer.cpp b/thirdparty/pcap++/Packet++/src/UdpLayer.cpp
new file mode 100644
index 0000000..1451421
--- /dev/null
+++ b/thirdparty/pcap++/Packet++/src/UdpLayer.cpp
@@ -0,0 +1,122 @@
+#define LOG_MODULE PacketLogModuleUdpLayer
+
+#include <UdpLayer.h>
+#include <IpUtils.h>
+#include <PayloadLayer.h>
+#include <IPv4Layer.h>
+#include <IPv6Layer.h>
+#include <DnsLayer.h>
+#include <DhcpLayer.h>
+#include <VxlanLayer.h>
+#include <SipLayer.h>
+#include <Logger.h>
+#include <string.h>
+#include <sstream>
+
+namespace pcpp
+{
+
+// Creates a new UDP layer that holds only a zeroed UDP header, with the source and destination ports set
+// (stored in network byte order)
+UdpLayer::UdpLayer(uint16_t portSrc, uint16_t portDst)
+{
+	m_Protocol = UDP;
+
+	// allocate the header and zero it, so every field that isn't set below is 0
+	m_DataLen = sizeof(udphdr);
+	m_Data = new uint8_t[m_DataLen];
+	memset(m_Data, 0, m_DataLen);
+
+	udphdr* header = (udphdr*)m_Data;
+	header->portSrc = htons(portSrc);
+	header->portDst = htons(portDst);
+}
+
+// Calculates the UDP checksum over this layer's data plus a pseudo header built from the previous (IPv4 or
+// IPv6) layer. The result is 0 when there is no previous layer or when it is neither IPv4 nor IPv6.
+// writeResultToPacket - when true the result is written (in network byte order) into the checksum field of
+// the header, otherwise the field is restored to the value it had before the call.
+// Returns the calculated checksum
+uint16_t UdpLayer::calculateChecksum(bool writeResultToPacket)
+{
+	udphdr* udpHdr = (udphdr*)m_Data;
+	uint16_t checksumRes = 0;
+	// remember the current value so it can be restored if the result isn't written to the packet
+	uint16_t currChecksumValue = udpHdr->headerChecksum;
+
+	if (m_PrevLayer != NULL)
+	{
+		// the checksum field is zeroed while the checksum is being calculated
+		udpHdr->headerChecksum = 0;
+		// vec[0] is this layer's data, vec[1] is the pseudo header
+		ScalarBuffer<uint16_t> vec[2];
+		LOG_DEBUG("data len =  %d", (int)m_DataLen);
+		vec[0].buffer = (uint16_t*)m_Data;
+		vec[0].len = m_DataLen;
+
+		if (m_PrevLayer->getProtocol() == IPv4)
+		{
+			// IPv4 pseudo header (12 bytes): source address, destination address, UDP length, protocol
+			uint32_t srcIP = ((IPv4Layer*)m_PrevLayer)->getSrcIpAddress().toInt();
+			uint32_t dstIP = ((IPv4Layer*)m_PrevLayer)->getDstIpAddress().toInt();
+			uint16_t pseudoHeader[6];
+			pseudoHeader[0] = srcIP >> 16;
+			pseudoHeader[1] = srcIP & 0xFFFF;
+			pseudoHeader[2] = dstIP >> 16;
+			pseudoHeader[3] = dstIP & 0xFFFF;
+			pseudoHeader[4] = 0xffff & udpHdr->length;
+			pseudoHeader[5] = htons(0x00ff & PACKETPP_IPPROTO_UDP);
+			vec[1].buffer = pseudoHeader;
+			vec[1].len = 12;
+			checksumRes = compute_checksum(vec, 2);
+			LOG_DEBUG("calculated checksum = 0x%4X", checksumRes);
+		}
+		else if (m_PrevLayer->getProtocol() == IPv6)
+		{
+			// IPv6 pseudo header (36 bytes): source address (words 0-7), destination address (words 8-15), UDP length, protocol
+			uint16_t pseudoHeader[18];
+			((IPv6Layer*)m_PrevLayer)->getSrcIpAddress().copyTo((uint8_t*)pseudoHeader);
+			((IPv6Layer*)m_PrevLayer)->getDstIpAddress().copyTo((uint8_t*)(pseudoHeader+8));
+			pseudoHeader[16] = 0xffff & udpHdr->length;
+			pseudoHeader[17] = htons(0x00ff & PACKETPP_IPPROTO_UDP);
+			vec[1].buffer = pseudoHeader;
+			vec[1].len = 36;
+			checksumRes = compute_checksum(vec, 2);
+			LOG_DEBUG("calculated checksum = 0x%4X", checksumRes);
+		}
+	}
+
+	if(writeResultToPacket)
+		udpHdr->headerChecksum = htons(checksumRes);
+	else
+		udpHdr->headerChecksum = currChecksumValue;
+
+	return checksumRes;
+}
+
+// Chooses the layer that follows the UDP header according to the UDP ports and, for DNS and SIP, the payload
+// itself. The checks are made in this order: DHCP, VXLAN, DNS, SIP request, SIP response. Anything else becomes a
+// generic payload layer. No next layer is created when nothing follows the UDP header
+void UdpLayer::parseNextLayer()
+{
+	if (m_DataLen <= sizeof(udphdr))
+		return;
+
+	// everything that comes after the UDP header
+	uint8_t* payload = m_Data + sizeof(udphdr);
+	size_t payloadLen = m_DataLen - sizeof(udphdr);
+
+	udphdr* header = getUdpHeader();
+	uint16_t dstPort = ntohs(header->portDst);
+	uint16_t srcPort = ntohs(header->portSrc);
+
+	bool isDhcpPorts = (srcPort == 68 && dstPort == 67) || (srcPort == 67 && dstPort == 68) || (srcPort == 67 && dstPort == 67);
+	// SIP is only recognized by its destination port
+	bool isSipDstPort = (dstPort == 5060) || (dstPort == 5061);
+
+	if (isDhcpPorts)
+	{
+		m_NextLayer = new DhcpLayer(payload, payloadLen, this, m_Packet);
+	}
+	else if (dstPort == 4789)
+	{
+		m_NextLayer = new VxlanLayer(payload, payloadLen, this, m_Packet);
+	}
+	// DNS: the payload must be able to hold a DNS header and one of the ports must be a known DNS port
+	else if ((payloadLen >= sizeof(dnshdr)) && (DnsLayer::getDNSPortMap()->find(dstPort) != DnsLayer::getDNSPortMap()->end() || DnsLayer::getDNSPortMap()->find(srcPort) != DnsLayer::getDNSPortMap()->end()))
+	{
+		m_NextLayer = new DnsLayer(payload, payloadLen, this, m_Packet);
+	}
+	else if (isSipDstPort && (SipRequestFirstLine::parseMethod((char*)payload, payloadLen) != SipRequestLayer::SipMethodUnknown))
+	{
+		m_NextLayer = new SipRequestLayer(payload, payloadLen, this, m_Packet);
+	}
+	else if (isSipDstPort && (SipResponseFirstLine::parseStatusCode((char*)payload, payloadLen) != SipResponseLayer::SipStatusCodeUnknown))
+	{
+		m_NextLayer = new SipResponseLayer(payload, payloadLen, this, m_Packet);
+	}
+	else
+	{
+		m_NextLayer = new PayloadLayer(payload, payloadLen, this, m_Packet);
+	}
+}
+
+// Updates the calculated header fields: the length field is set to the length of this layer's data (in
+// network byte order) and then the checksum is re-calculated and written into the header
+void UdpLayer::computeCalculateFields()
+{
+	udphdr* udpHdr = (udphdr*)m_Data;
+	// the length must be set first, calculateChecksum() reads it when building the pseudo header
+	udpHdr->length = htons(m_DataLen);
+	calculateChecksum(true);
+}
+
+// Returns a one-line description of the layer: "UDP Layer, Src port: <src>, Dst port: <dst>",
+// with both ports printed in host byte order
+std::string UdpLayer::toString()
+{
+	std::ostringstream description;
+	description << "UDP Layer, Src port: " << ntohs(getUdpHeader()->portSrc);
+	description << ", Dst port: " << ntohs(getUdpHeader()->portDst);
+
+	return description.str();
+}
+
+} // namespace pcpp

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c0a788b3/thirdparty/pcap++/Packet++/src/VlanLayer.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/pcap++/Packet++/src/VlanLayer.cpp b/thirdparty/pcap++/Packet++/src/VlanLayer.cpp
new file mode 100644
index 0000000..8617b6c
--- /dev/null
+++ b/thirdparty/pcap++/Packet++/src/VlanLayer.cpp
@@ -0,0 +1,81 @@
+#define LOG_MODULE PacketLogModuleVlanLayer
+
+#include <VlanLayer.h>
+#include <IPv4Layer.h>
+#include <IPv6Layer.h>
+#include <PayloadLayer.h>
+#include <ArpLayer.h>
+#include <PPPoELayer.h>
+#include <MplsLayer.h>
+#include <string.h>
+#include <sstream>
+#if defined(WIN32) || defined(WINx64)
+#include <winsock2.h>
+#elif LINUX
+#include <in.h>
+#endif
+
+namespace pcpp
+{
+
+// Creates a new VLAN layer that holds only a zeroed VLAN header, then sets the VLAN ID, the CFI bit, the
+// priority and the EtherType (stored in network byte order)
+VlanLayer::VlanLayer(const uint16_t vlanID, bool cfi, uint8_t priority, uint16_t etherType)
+{
+	m_DataLen = sizeof(vlan_header);
+	m_Data = new uint8_t[m_DataLen];
+	// every field that isn't set below stays 0
+	memset(m_Data, 0, m_DataLen);
+	m_Protocol = VLAN;
+
+	vlan_header* vlanHeader = getVlanHeader();
+	setVlanID(vlanID);
+	setCFI(cfi);
+	setPriority(priority);
+	vlanHeader->etherType = htons(etherType);
+}
+
+// Chooses the layer that follows the VLAN header according to the header's EtherType. An EtherType that isn't
+// handled here yields a generic payload layer. No next layer is created when nothing follows the VLAN header
+void VlanLayer::parseNextLayer()
+{
+	if (m_DataLen <= sizeof(vlan_header))
+		return;
+
+	// everything that comes after the VLAN header
+	uint8_t* payload = m_Data + sizeof(vlan_header);
+	size_t payloadLen = m_DataLen - sizeof(vlan_header);
+
+	switch (ntohs(getVlanHeader()->etherType))
+	{
+	case PCPP_ETHERTYPE_IP:
+		m_NextLayer = new IPv4Layer(payload, payloadLen, this, m_Packet);
+		break;
+	case PCPP_ETHERTYPE_IPV6:
+		m_NextLayer = new IPv6Layer(payload, payloadLen, this, m_Packet);
+		break;
+	case PCPP_ETHERTYPE_ARP:
+		m_NextLayer = new ArpLayer(payload, payloadLen, this, m_Packet);
+		break;
+	case PCPP_ETHERTYPE_VLAN:
+		// nested VLAN tag
+		m_NextLayer = new VlanLayer(payload, payloadLen, this, m_Packet);
+		break;
+	case PCPP_ETHERTYPE_PPPOES:
+		m_NextLayer = new PPPoESessionLayer(payload, payloadLen, this, m_Packet);
+		break;
+	case PCPP_ETHERTYPE_PPPOED:
+		m_NextLayer = new PPPoEDiscoveryLayer(payload, payloadLen, this, m_Packet);
+		break;
+	case PCPP_ETHERTYPE_MPLS:
+		m_NextLayer = new MplsLayer(payload, payloadLen, this, m_Packet);
+		break;
+	default:
+		m_NextLayer = new PayloadLayer(payload, payloadLen, this, m_Packet);
+		break;
+	}
+}
+
+// Returns a one-line description of the layer: "VLAN Layer, Priority: <pri>, Vlan ID: <id>, CFI: <cfi>"
+std::string VlanLayer::toString()
+{
+	// CFI and priority are converted to int so they are printed as numbers
+	const int cfi = (int)getCFI();
+	const int priority = (int)getPriority();
+
+	std::ostringstream description;
+	description << "VLAN Layer, Priority: " << priority;
+	description << ", Vlan ID: " << getVlanID();
+	description << ", CFI: " << cfi;
+
+	return description.str();
+}
+
+} // namespace pcpp

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c0a788b3/thirdparty/pcap++/Packet++/src/VxlanLayer.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/pcap++/Packet++/src/VxlanLayer.cpp b/thirdparty/pcap++/Packet++/src/VxlanLayer.cpp
new file mode 100644
index 0000000..231ccce
--- /dev/null
+++ b/thirdparty/pcap++/Packet++/src/VxlanLayer.cpp
@@ -0,0 +1,64 @@
+#include <VxlanLayer.h>
+#include <EthLayer.h>
+#include <string.h>
+#if defined(WIN32) || defined(WINx64) //for using ntohl, ntohs, etc.
+#include <winsock2.h>
+#elif LINUX
+#include <in.h> //for using ntohl, ntohs, etc.
+#elif MAC_OS_X
+#include <arpa/inet.h> //for using ntohl, ntohs, etc.
+#endif
+
+
+namespace pcpp
+{
+
+// Creates a new VXLAN layer that holds only a zeroed VXLAN header and then fills it in: the VNI and the group
+// policy ID are written only when they aren't 0, the VNI-present flag is always set, and each of the other
+// three flags is set only when the matching argument is true
+VxlanLayer::VxlanLayer(uint32_t vni, uint16_t groupPolicyID, bool setGbpFlag, bool setPolicyAppliedFlag, bool setDontLearnFlag)
+{
+	m_DataLen = sizeof(vxlan_header);
+	m_Data = new uint8_t[m_DataLen];
+	// every field that isn't set below stays 0
+	memset(m_Data, 0, m_DataLen);
+	m_Protocol = VXLAN;
+
+	if (vni != 0)
+		setVNI(vni);
+
+	vxlan_header* vxlanHeader = getVxlanHeader();
+
+	// the group policy ID is stored in network byte order
+	if (groupPolicyID != 0)
+		vxlanHeader->groupPolicyID = htons(groupPolicyID);
+
+	vxlanHeader->vniPresentFlag = 1;
+
+	if (setGbpFlag)
+		vxlanHeader->gbpFlag = 1;
+	if (setPolicyAppliedFlag)
+		vxlanHeader->policyAppliedFlag = 1;
+	if (setDontLearnFlag)
+		vxlanHeader->dontLearnFlag = 1;
+}
+
+// Returns the VNI: the header's vni field converted from network to host byte order and shifted right by 8 bits
+// (the inverse of what setVNI() stores)
+uint32_t VxlanLayer::getVNI()
+{
+	return (ntohl(getVxlanHeader()->vni) >> 8);
+}
+
+// Sets the VNI: the value is shifted left by 8 bits and converted to network byte order before it is stored in
+// the header's vni field
+void VxlanLayer::setVNI(uint32_t vni)
+{
+	getVxlanHeader()->vni = htonl(vni << 8);
+}
+
+// Returns a fixed description of the layer. No header field is included in it
+std::string VxlanLayer::toString()
+{
+	return "VXLAN Layer";
+}
+
+// Whatever follows the VXLAN header is parsed as an Ethernet layer. No next layer is created when nothing
+// follows the header
+void VxlanLayer::parseNextLayer()
+{
+	const size_t headerLen = sizeof(vxlan_header);
+
+	if (m_DataLen > headerLen)
+		m_NextLayer = new EthLayer(m_Data + headerLen, m_DataLen - headerLen, this, m_Packet);
+}
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c0a788b3/thirdparty/pcap++/Pcap++/Makefile
----------------------------------------------------------------------
diff --git a/thirdparty/pcap++/Pcap++/Makefile b/thirdparty/pcap++/Pcap++/Makefile
new file mode 100755
index 0000000..6a1a2d5
--- /dev/null
+++ b/thirdparty/pcap++/Pcap++/Makefile
@@ -0,0 +1,95 @@
+# Both helper makefiles below must exist before this Makefile can be used (the error text points at the
+# configure script). Each one is probed with $(wildcard) right before it is included, so that a missing file
+# stops make with an explicit message.
+$(if $(wildcard ../mk/platform.mk),,$(error platform.mk not found! Please run configure script first))
+include ../mk/platform.mk
+
+$(if $(wildcard ../mk/PcapPlusPlus.mk),,$(error PcapPlusPlus.mk not found! Please run configure script first))
+include ../mk/PcapPlusPlus.mk
+
+# Every C++ file under src/ maps to an object with the same base name under Obj/
+SOURCES := $(wildcard src/*.cpp)
+OBJS_FILENAMES := $(SOURCES:src/%.cpp=Obj/%.o)
+
+# Locations of the sibling libraries, relative to this directory
+COMMONPP_HOME := ../Common++
+PACKETPP_HOME := ../Packet++
+LIGHT_PCAPNG_PCPP_HOME := ../3rdParty/LightPcapNg
+LIGHT_PCAPNG_HOME := $(LIGHT_PCAPNG_PCPP_HOME)/LightPcapNg
+
+# LightPcapNg C sources and the object names expected for them under $(LIGHT_PCAPNG_PCPP_HOME)/Obj.
+# This Makefile only lists those objects; presumably they are produced by the light_pcapng_sources sub-make.
+LIGHT_PCAPNG_SOURCES := $(wildcard $(LIGHT_PCAPNG_HOME)/src/*.c)
+LIGHT_PCAPNG_OBJS_FILENAMES := $(LIGHT_PCAPNG_SOURCES:$(LIGHT_PCAPNG_HOME)/src/%.c=$(LIGHT_PCAPNG_PCPP_HOME)/Obj/%.o)
+
+# Preprocessor defines handed to the compiler through DEPS.
+#
+# The three platform sections assign DEPS with ':=' and therefore discard whatever it held before, so they all
+# come first. The optional features only append with '+=' and must follow them: if the MAC_OS_X section came
+# after the feature sections, its ':=' would silently wipe -DUSE_PF_RING / -DUSE_DPDK.
+ifdef WIN32
+DEPS := -DWPCAP -DHAVE_REMOTE -DHAVE_STRUCT_TIMESPEC
+endif
+ifdef LINUX
+DEPS := -DLINUX
+endif
+ifdef MAC_OS_X
+DEPS := -DMAC_OS_X
+endif
+
+# Optional features, enabled by defining the matching make variable
+ifdef PF_RING_HOME
+DEPS += -DUSE_PF_RING
+endif
+ifdef USE_DPDK
+DEPS += -DUSE_DPDK
+endif
+
+# Header search path, built up one directory per line: this library first, then its sibling libraries
+INCLUDES := -I"./src"
+INCLUDES += -I"./header"
+INCLUDES += -I"$(COMMONPP_HOME)/header"
+INCLUDES += -I"$(PACKETPP_HOME)/header"
+INCLUDES += -I"$(LIGHT_PCAPNG_HOME)/include"
+
+# Platform and feature specific additions
+ifdef WIN32
+INCLUDES += -I$(MINGW_HOME)/include/ddk
+INCLUDES += -I$(WINPCAP_HOME)/Include
+endif
+ifdef LINUX
+# presumably so that sources can include the netinet headers without the directory prefix
+INCLUDES += -I/usr/include/netinet
+endif
+ifdef PF_RING_HOME
+INCLUDES += -I$(PF_RING_HOME)/userland/lib
+INCLUDES += -I$(PF_RING_HOME)/kernel
+endif
+ifdef USE_DPDK
+INCLUDES += -I"$(RTE_SDK)/build/include"
+endif
+
+# Forward HAS_PCAP_IMMEDIATE_MODE to the compiler as a preprocessor define.
+# NOTE(review): the variable is not set in this file - presumably it comes from one of the included .mk files
+# or from the configure step; confirm.
+ifdef HAS_PCAP_IMMEDIATE_MODE
+DEPS += -DHAS_PCAP_IMMEDIATE_MODE
+endif
+
+# SSE instruction-set switches, assigned only for DPDK builds. FLAGS is not assigned anywhere else in the
+# visible part of this Makefile, so without USE_DPDK it keeps whatever value it already had (possibly none).
+ifdef USE_DPDK
+FLAGS := -msse -msse2 -msse3 -mssse3
+endif		
+	
+# Compiles one C++ source from src/ into the object of the same base name under Obj/, using the defines,
+# include path and extra flags collected in DEPS, INCLUDES and FLAGS.
+# -MMD/-MP/-MF additionally make the compiler write a dependency file Obj/<name>.d next to the object.
+# NOTE(review): -MT names the .d file rather than the object as the target of the generated dependency rule,
+# and nothing visible in this Makefile includes the .d files - confirm whether header dependency tracking
+# is actually intended to work.
+Obj/%.o: src/%.cpp
+	@echo 'Building file: $<'
+	@$(G++) $(DEPS) $(INCLUDES) $(FLAGS) -O2 -g -Wall -c -fmessage-length=0 -MMD -MP -MF"$(@:Obj/%.o=Obj/%.d)" -MT"$(@:Obj/%.o=Obj/%.d)" -o "$@" "$<"
+
+# Name of the directory make runs in; used only in the progress messages
+CUR_TARGET := $(notdir $(shell pwd))
+
+# Run every recipe without echoing its commands
+.SILENT:
+
+# None of these targets creates a file carrying its own name (the archive is written to Lib/), so they are
+# declared phony: a stray file or directory called e.g. 'clean' or 'start' must not make them look up to date.
+.PHONY: all start create-directories Pcap++.lib clean
+
+all: Pcap++.lib
+
+start:
+	@echo '==> Building target: $(CUR_TARGET)'
+
+create-directories:
+	@$(MKDIR) -p Obj
+	@$(MKDIR) -p Lib
+
+# Objects and their .d files are written into Obj/, so the directory has to exist before anything is compiled.
+# The order-only prerequisite guarantees that under parallel builds (make -j) without forcing a recompile.
+$(OBJS_FILENAMES): | create-directories
+
+# Once the objects of this directory are built, runs the light_pcapng_sources target of the LightPcapNg
+# makefile and archives both sets of objects into one static library under Lib/.
+Pcap++.lib: start create-directories $(OBJS_FILENAMES)
+	@cd $(LIGHT_PCAPNG_PCPP_HOME) && $(MAKE) light_pcapng_sources
+	@$(AR) -r  "Lib/$(LIB_PREFIX)Pcap++$(LIB_EXT)" $(OBJS_FILENAMES) $(LIGHT_PCAPNG_OBJS_FILENAMES)
+	@echo 'Finished successfully building: $(CUR_TARGET)'
+	@echo ' '
+
+# Cleans the LightPcapNg build first, then empties the Obj/ and Lib/ directories of this library
+clean:
+	@cd $(LIGHT_PCAPNG_PCPP_HOME) && $(MAKE) clean
+	@$(RM) -rf ./Obj/*
+	@$(RM) -rf ./Lib/*
+	@echo 'Clean finished: $(CUR_TARGET)'