You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/11/14 01:48:24 UTC
[10/25] nifi-minifi-cpp git commit: MINIFICPP-250: Initial
implementation fo CapturePacket Processor that uses lipcap.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c0a788b3/thirdparty/pcap++/Packet++/src/TcpReassembly.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/pcap++/Packet++/src/TcpReassembly.cpp b/thirdparty/pcap++/Packet++/src/TcpReassembly.cpp
new file mode 100644
index 0000000..76391c8
--- /dev/null
+++ b/thirdparty/pcap++/Packet++/src/TcpReassembly.cpp
@@ -0,0 +1,695 @@
+#define LOG_MODULE PacketLogModuleTcpReassembly
+
+#include <TcpReassembly.h>
+#include <TcpLayer.h>
+#include <IPv4Layer.h>
+#include <PacketUtils.h>
+#include <IpAddress.h>
+#include <Logger.h>
+#include <sstream>
+#ifdef WIN32 //for using ntohl, ntohs, etc.
+#include <winsock2.h>
+#elif LINUX
+#include <in.h> //for using ntohl, ntohs, etc.
+#elif MAC_OS_X
+#include <arpa/inet.h> //for using ntohl, ntohs, etc.
+#endif
+
+
+namespace pcpp
+{
+
+TcpStreamData::TcpStreamData()
+{
+ m_Data = NULL;
+ m_DataLen = 0;
+ m_DeleteDataOnDestruction = false;
+}
+
+TcpStreamData::TcpStreamData(uint8_t* tcpData, size_t tcpDataLength, ConnectionData connData)
+{
+ m_Data = tcpData;
+ m_DataLen = tcpDataLength;
+ m_Connection = connData;
+ m_DeleteDataOnDestruction = true;
+}
+
+TcpStreamData::~TcpStreamData()
+{
+ if (m_DeleteDataOnDestruction && m_Data != NULL)
+ {
+ delete [] m_Data;
+ }
+}
+
+TcpStreamData::TcpStreamData(TcpStreamData& other)
+{
+ copyData(other);
+}
+
+TcpStreamData& TcpStreamData::operator=(const TcpStreamData& other)
+{
+ if (this == &other)
+ return *this;
+
+ if (m_DeleteDataOnDestruction && m_Data != NULL)
+ delete [] m_Data;
+
+ copyData(other);
+ return *this;
+}
+
+void TcpStreamData::copyData(const TcpStreamData& other)
+{
+ m_DataLen = other.m_DataLen;
+
+ if (other.m_Data != NULL)
+ {
+ m_Data = new uint8_t[m_DataLen];
+ memcpy(m_Data, other.m_Data, m_DataLen);
+ }
+ else
+ m_Data = NULL;
+
+ m_Connection = other.m_Connection;
+ m_DeleteDataOnDestruction = true;
+}
+
+
+TcpReassembly::TcpReassembly(OnTcpMessageReady onMessageReadyCallback, void* userCookie, OnTcpConnectionStart onConnectionStartCallback, OnTcpConnectionEnd onConnectionEndCallback)
+{
+ m_OnMessageReadyCallback = onMessageReadyCallback;
+ m_UserCookie = userCookie;
+ m_OnConnStart = onConnectionStartCallback;
+ m_OnConnEnd = onConnectionEndCallback;
+}
+
+TcpReassembly::~TcpReassembly()
+{
+ while (!m_ConnectionList.empty())
+ {
+ delete m_ConnectionList.begin()->second;
+ m_ConnectionList.erase(m_ConnectionList.begin());
+ }
+}
+
+void TcpReassembly::ReassemblePacket(Packet& tcpData)
+{
+ // TCP reassembly doesn't support IPv6 for now
+ IPv4Layer* ipLayer = tcpData.getLayerOfType<IPv4Layer>();
+ if (ipLayer == NULL)
+ return;
+
+ // Ignore non-TCP packets
+ TcpLayer* tcpLayer = tcpData.getLayerOfType<TcpLayer>();
+ if (tcpLayer == NULL)
+ return;
+
+ // calculate the TCP payload size from the value of IPV4's "total length" field.
+ // The reason we don't take the TCP payload size is because sometimes there is padding on the packet that doesn't belong to the TCP layer
+ size_t tcpPayloadSize = ntohs(ipLayer->getIPv4Header()->totalLength) - ipLayer->getHeaderLen() - tcpLayer->getHeaderLen();
+
+ // calculate if this packet has FIN or RST flags
+ bool isFin = (tcpLayer->getTcpHeader()->finFlag == 1);
+ bool isRst = (tcpLayer->getTcpHeader()->rstFlag == 1);
+ bool isFinOrRst = isFin || isRst;
+
+ // ignore ACK packets or TCP packets with no payload (except for SYN, FIN or RST packets which we'll later need)
+ if (tcpPayloadSize == 0 && tcpLayer->getTcpHeader()->synFlag == 0 && !isFinOrRst)
+ return;
+
+ // if the actual TCP payload is smaller than the value written in IPV4's "total length" field then adjust tcpPayloadSize to avoid buffer overflow
+ if (tcpLayer->getLayerPayloadSize() < tcpPayloadSize)
+ {
+ LOG_DEBUG("Got a packet where actual TCP payload size is smaller then the value written in IPv4's 'total length' header. Adjusting tcpPayloadSize to avoid buffer overflow");
+ tcpPayloadSize = tcpLayer->getLayerPayloadSize();
+ }
+
+
+ TcpReassemblyData* tcpReassemblyData = NULL;
+
+ // calculate flow key for this packet
+ uint32_t flowKey = hash5Tuple(&tcpData);
+
+ // if this packet belongs to a connection that was already closed (for example: data packet that comes after FIN), ignore it
+ if (m_ClosedConnectionList.find(flowKey) != m_ClosedConnectionList.end())
+ {
+ LOG_DEBUG("Ignoring packet of already closed flow [0x%X]", flowKey);
+ return;
+ }
+
+ // find the connection in the connection map
+ std::map<uint32_t, TcpReassemblyData*>::iterator iter = m_ConnectionList.find(flowKey);
+ if (iter == m_ConnectionList.end())
+ {
+ // if it's a packet of a new connection, create a TcpReassemblyData object and add it to the active connection list
+ tcpReassemblyData = new TcpReassemblyData();
+ tcpReassemblyData->connData.srcIP = ipLayer->getSrcIpAddress();
+ tcpReassemblyData->connData.dstIP = ipLayer->getDstIpAddress();
+ tcpReassemblyData->connData.srcPort = ntohs(tcpLayer->getTcpHeader()->portSrc);
+ tcpReassemblyData->connData.dstPort = ntohs(tcpLayer->getTcpHeader()->portDst);
+ tcpReassemblyData->connData.flowKey = flowKey;
+
+ m_ConnectionList[flowKey] = tcpReassemblyData;
+
+ m_ConnectionInfo.push_back(tcpReassemblyData->connData);
+
+ // fire connection start callback
+ if (m_OnConnStart != NULL)
+ m_OnConnStart(tcpReassemblyData->connData, m_UserCookie);
+ }
+ else // connection already exists
+ tcpReassemblyData = iter->second;
+
+ int sideIndex = -1;
+ bool first = false;
+
+ // calcualte packet's source IP and source port
+ uint32_t srcIP = ipLayer->getSrcIpAddress().toInt();
+ uint16_t srcPort = tcpLayer->getTcpHeader()->portSrc;
+
+ // if this is a new connection and it's the first packet we see on that connection
+ if (tcpReassemblyData->numOfSides == 0)
+ {
+ LOG_DEBUG("Setting side for new connection");
+
+ // open the first side of the connection, side index is 0
+ sideIndex = 0;
+ tcpReassemblyData->twoSides[sideIndex].srcIP = srcIP;
+ tcpReassemblyData->twoSides[sideIndex].srcPort = srcPort;
+ tcpReassemblyData->numOfSides++;
+ first = true;
+ }
+ // if there is already one side in this connection (which will be at side index 0)
+ else if (tcpReassemblyData->numOfSides == 1)
+ {
+ // check if packet belongs to that side
+ if (tcpReassemblyData->twoSides[0].srcIP == srcIP && tcpReassemblyData->twoSides[0].srcPort == srcPort)
+ {
+ sideIndex = 0;
+ }
+ else
+ {
+ // this means packet belong to the second side which doesn't yet exist. Open a second side with side index 1
+ LOG_DEBUG("Setting second side of a connection");
+ sideIndex = 1;
+ tcpReassemblyData->twoSides[sideIndex].srcIP = srcIP;
+ tcpReassemblyData->twoSides[sideIndex].srcPort = srcPort;
+ tcpReassemblyData->numOfSides++;
+ first = true;
+ }
+ }
+ // if there are already 2 sides open for this connection
+ else if (tcpReassemblyData->numOfSides == 2)
+ {
+ // check if packet matches side 0
+ if (tcpReassemblyData->twoSides[0].srcIP == srcIP && tcpReassemblyData->twoSides[0].srcPort == srcPort)
+ {
+ sideIndex = 0;
+ }
+ // check if packet matches side 1
+ else if (tcpReassemblyData->twoSides[1].srcIP == srcIP && tcpReassemblyData->twoSides[1].srcPort == srcPort)
+ {
+ sideIndex = 1;
+ }
+ // packet doesn't match either side. This case doesn't make sense but it's handled anyway. Packet will be ignored
+ else
+ {
+ LOG_ERROR("Error occurred - packet doesn't match either side of the connection!!");
+ return;
+ }
+ }
+ // there are more than 2 side - this case doesn't make sense and shouldn't happen, but handled anyway. Packet will be ignored
+ else
+ {
+ LOG_ERROR("Error occurred - connection has more than 2 sides!!");
+ return;
+ }
+
+ // if this side already got FIN or RST packet before, ignore this packet as this side is considered closed
+ if (tcpReassemblyData->twoSides[sideIndex].gotFinOrRst)
+ {
+ LOG_DEBUG("Got a packet after FIN or RST were already seen on this side (%d). Ignoring this packet", sideIndex);
+ return;
+ }
+
+ // handle FIN/RST packets that don't contain additional TCP data
+ if (isFinOrRst && tcpPayloadSize == 0)
+ {
+ LOG_DEBUG("Got FIN or RST packet without data on side %d", sideIndex);
+
+ handleFinOrRst(tcpReassemblyData, sideIndex, flowKey);
+ return;
+ }
+
+ // check if this packet contains data from a different side than the side seen before.
+ // If this is the case then treat the out-of-order packet list as missing data and send them to the user (callback) together with an indication that some data was missing.
+ // Why? because a new packet from the other side means the previous message was probably already received and a new message is starting.
+ // In this case out-of-order packets are probably actually missing data
+ // For example: let's assume these are HTTP messages. If we're seeing the first packet of a response this means the server has already received the full request and is now starting
+ // to send the response. So if we still have out-of-order packets from the request it probably means that some packets were lost during the capture. So we don't expect the client to
+ // continue sending packets of the previous request, so we'll treat the out-of-order packets as missing data
+ //
+ // I'm aware that there are edge cases where the situation I described above is not true, but at some point we must clean the out-of-order packet list to avoid memory leak.
+ // I decided to do what Wireshark does and clean this list when starting to see a message from the other side
+ if (!first && tcpPayloadSize > 0 && tcpReassemblyData->prevSide != -1 && tcpReassemblyData->prevSide != sideIndex &&
+ tcpReassemblyData->twoSides[tcpReassemblyData->prevSide].tcpFragmentList.size() > 0)
+ {
+ LOG_DEBUG("Seeing a first data packet from a different side. Previous side was %d, current side is %d", tcpReassemblyData->prevSide, sideIndex);
+ checkOutOfOrderFragments(tcpReassemblyData, tcpReassemblyData->prevSide, true);
+ }
+ tcpReassemblyData->prevSide = sideIndex;
+
+ // extract sequence value from packet
+ uint32_t sequence = ntohl(tcpLayer->getTcpHeader()->sequenceNumber);
+
+ // if it's the first packet we see on this side of the connection
+ if (first)
+ {
+ LOG_DEBUG("First data from this side of the connection");
+
+ // set initial sequence
+ tcpReassemblyData->twoSides[sideIndex].sequence = sequence + tcpPayloadSize;
+ if (tcpLayer->getTcpHeader()->synFlag != 0)
+ tcpReassemblyData->twoSides[sideIndex].sequence++;
+
+ // send data to the callback
+ if (tcpPayloadSize != 0 && m_OnMessageReadyCallback != NULL)
+ {
+ TcpStreamData streamData(tcpLayer->getLayerPayload(), tcpPayloadSize, tcpReassemblyData->connData);
+ streamData.setDeleteDataOnDestruction(false);
+ m_OnMessageReadyCallback(sideIndex, streamData, m_UserCookie);
+ }
+
+ // handle case where this packet is FIN or RST (although it's unlikely)
+ if (isFinOrRst)
+ handleFinOrRst(tcpReassemblyData, sideIndex, flowKey);
+
+ // return - nothing else to do here
+ return;
+ }
+
+ // if packet sequence is smaller than expected - this means that part or all of the TCP data is being re-transmitted
+ if (sequence < tcpReassemblyData->twoSides[sideIndex].sequence)
+ {
+ LOG_DEBUG("Found new data with the sequence lower than expected");
+
+ // calculate the sequence after this packet to see if this TCP payload contains also new data
+ uint32_t newSequence = sequence + tcpPayloadSize;
+
+ // this means that some of payload is new
+ if (newSequence > tcpReassemblyData->twoSides[sideIndex].sequence)
+ {
+ // calculate the size of the new data
+ uint32_t newLength = tcpReassemblyData->twoSides[sideIndex].sequence - sequence;
+
+ LOG_DEBUG("Although sequence is lower than expected payload is long enough to contain new data. Calling the callback with the new data");
+
+ // update the sequence for this side to include the new data that was seen
+ tcpReassemblyData->twoSides[sideIndex].sequence += tcpPayloadSize - newLength;
+
+ // send only the new data to the callback
+ if (m_OnMessageReadyCallback != NULL)
+ {
+ TcpStreamData streamData(tcpLayer->getLayerPayload() + newLength, tcpPayloadSize - newLength, tcpReassemblyData->connData);
+ streamData.setDeleteDataOnDestruction(false);
+ m_OnMessageReadyCallback(sideIndex, streamData, m_UserCookie);
+ }
+ }
+
+ // handle case where this packet is FIN or RST
+ if (isFinOrRst)
+ handleFinOrRst(tcpReassemblyData, sideIndex, flowKey);
+
+ // return - nothing else to do here
+ return;
+ }
+
+ // if packet sequence is exactly as expected - this is the "good" case and the most common one
+ else if (sequence == tcpReassemblyData->twoSides[sideIndex].sequence)
+ {
+ // if TCP data size is 0 - nothing to do
+ if (tcpPayloadSize == 0)
+ {
+ LOG_DEBUG("Payload length is 0, doing nothing");
+
+ // handle case where this packet is FIN or RST
+ if (isFinOrRst)
+ handleFinOrRst(tcpReassemblyData, sideIndex, flowKey);
+
+ return;
+ }
+
+ LOG_DEBUG("Found new data with expected sequence. Calling the callback");
+
+ // update the sequence for this side to include TCP data from this packet
+ tcpReassemblyData->twoSides[sideIndex].sequence += tcpPayloadSize;
+
+ // if this is a SYN packet - add +1 to the sequence
+ if (tcpLayer->getTcpHeader()->synFlag != 0)
+ tcpReassemblyData->twoSides[sideIndex].sequence++;
+
+ // send the data to the callback
+ if (m_OnMessageReadyCallback != NULL)
+ {
+ TcpStreamData streamData(tcpLayer->getLayerPayload(), tcpPayloadSize, tcpReassemblyData->connData);
+ streamData.setDeleteDataOnDestruction(false);
+ m_OnMessageReadyCallback(sideIndex, streamData, m_UserCookie);
+ }
+
+ //while (checkOutOfOrderFragments(tcpReassemblyData, sideIndex)) {}
+
+ // now that we've seen new data, go over the list of out-of-order packets and see if one or more of them fits now
+ checkOutOfOrderFragments(tcpReassemblyData, sideIndex, false);
+
+ // handle case where this packet is FIN or RST
+ if (isFinOrRst)
+ handleFinOrRst(tcpReassemblyData, sideIndex, flowKey);
+
+ // return - nothing else to do here
+ return;
+ }
+
+ // this case means sequence size of the packet is higher than expected which means the packet is out-of-order or some packets were lost (missing data).
+ // we don't know which of the 2 cases it is at this point so we just add this data to the out-of-order packet list
+ else
+ {
+ // if TCP data size is 0 - nothing to do
+ if (tcpPayloadSize == 0)
+ {
+ LOG_DEBUG("Payload length is 0, doing nothing");
+
+ // handle case where this packet is FIN or RST
+ if (isFinOrRst)
+ handleFinOrRst(tcpReassemblyData, sideIndex, flowKey);
+
+ return;
+ }
+
+ // create a new TcpFragment, copy the TCP data to it and add this packet to the the out-of-order packet list
+ TcpFragment* newTcpFrag = new TcpFragment();
+ newTcpFrag->data = new uint8_t[tcpPayloadSize];
+ newTcpFrag->dataLength = tcpPayloadSize;
+ newTcpFrag->sequence = sequence;
+ memcpy(newTcpFrag->data, tcpLayer->getLayerPayload(), tcpPayloadSize);
+ tcpReassemblyData->twoSides[sideIndex].tcpFragmentList.pushBack(newTcpFrag);
+
+ LOG_DEBUG("Found out-of-order packet and added a new TCP fragment with size %d to the out-of-order list of side %d", (int)tcpPayloadSize, sideIndex);
+
+ // handle case where this packet is FIN or RST
+ if (isFinOrRst)
+ {
+ handleFinOrRst(tcpReassemblyData, sideIndex, flowKey);
+ return;
+ }
+
+ }
+}
+
+void TcpReassembly::ReassemblePacket(RawPacket* tcpRawData)
+{
+ Packet parsedPacket(tcpRawData, false);
+ ReassemblePacket(parsedPacket);
+}
+
+std::string TcpReassembly::prepareMissingDataMessage(uint32_t missingDataLen)
+{
+ std::stringstream missingDataTextStream;
+ missingDataTextStream << "[" << missingDataLen << " bytes missing]";
+ return missingDataTextStream.str();
+}
+
+void TcpReassembly::handleFinOrRst(TcpReassemblyData* tcpReassemblyData, int sideIndex, uint32_t flowKey)
+{
+ // if this side already saw a FIN or RST packet, do nothing and return
+ if (tcpReassemblyData->twoSides[sideIndex].gotFinOrRst)
+ return;
+
+ LOG_DEBUG("Handling FIN or RST packet on side %d", sideIndex);
+
+ // set FIN/RST flag for this side
+ tcpReassemblyData->twoSides[sideIndex].gotFinOrRst = true;
+
+ // check if the other side also sees FIN or RST packet. If so - close the flow. Otherwise - only clear the out-of-order packets for this side
+ int otherSideIndex = 1 - sideIndex;
+ if (tcpReassemblyData->twoSides[otherSideIndex].gotFinOrRst)
+ closeConnectionInternal(flowKey, TcpReassembly::TcpReassemblyConnectionClosedByFIN_RST);
+ else
+ checkOutOfOrderFragments(tcpReassemblyData, sideIndex, true);
+}
+
+void TcpReassembly::checkOutOfOrderFragments(TcpReassemblyData* tcpReassemblyData, int sideIndex, bool cleanWholeFragList)
+{
+ bool foundSomething = false;
+
+ do
+ {
+ LOG_DEBUG("Starting first iteration of checkOutOfOrderFragments - looking for fragments that match the current sequence or have smaller sequence");
+
+ int index = 0;
+ foundSomething = false;
+
+ do
+ {
+ index = 0;
+ foundSomething = false;
+
+ // first fragment list iteration - go over the whole fragment list and see if can find fragments that match the current sequence
+ // or have smaller sequence but have big enough payload to get new data
+ while (index < (int)tcpReassemblyData->twoSides[sideIndex].tcpFragmentList.size())
+ {
+ TcpFragment* curTcpFrag = tcpReassemblyData->twoSides[sideIndex].tcpFragmentList.at(index);
+
+ // if fragment sequence matches the current sequence
+ if (curTcpFrag->sequence == tcpReassemblyData->twoSides[sideIndex].sequence)
+ {
+ // update sequence
+ tcpReassemblyData->twoSides[sideIndex].sequence += curTcpFrag->dataLength;
+ if (curTcpFrag->data != NULL)
+ {
+ LOG_DEBUG("Found an out-of-order packet matching to the current sequence with size %d on side %d. Pulling it out of the list and sending the data to the callback", (int)curTcpFrag->dataLength, sideIndex);
+
+ // send new data to callback
+
+ if (m_OnMessageReadyCallback != NULL)
+ {
+ TcpStreamData streamData(curTcpFrag->data, curTcpFrag->dataLength, tcpReassemblyData->connData);
+ streamData.setDeleteDataOnDestruction(false);
+ m_OnMessageReadyCallback(sideIndex, streamData, m_UserCookie);
+ }
+ }
+
+
+ // remove fragment from list
+ tcpReassemblyData->twoSides[sideIndex].tcpFragmentList.erase(tcpReassemblyData->twoSides[sideIndex].tcpFragmentList.begin() + index);
+
+ foundSomething = true;
+
+ continue;
+ }
+
+ // if fragment sequence has lower sequence than the current sequence
+ if (curTcpFrag->sequence < tcpReassemblyData->twoSides[sideIndex].sequence)
+ {
+ // check if it still has new data
+ uint32_t newSequence = curTcpFrag->sequence + curTcpFrag->dataLength;
+
+ // it has new data
+ if (newSequence > tcpReassemblyData->twoSides[sideIndex].sequence)
+ {
+ // calculate the delta new data size
+ uint32_t newLength = tcpReassemblyData->twoSides[sideIndex].sequence - curTcpFrag->sequence;
+
+ LOG_DEBUG("Found a fragment in the out-of-order list which its sequence is lower than expected but its payload is long enough to contain new data. "
+ "Calling the callback with the new data. Fragment size is %d on side %d, new data size is %d", (int)curTcpFrag->dataLength, sideIndex, (int)(curTcpFrag->dataLength - newLength));
+
+ // update current sequence with the delta new data size
+ tcpReassemblyData->twoSides[sideIndex].sequence += curTcpFrag->dataLength - newLength;
+
+ // send only the new data to the callback
+ if (m_OnMessageReadyCallback != NULL)
+ {
+ TcpStreamData streamData(curTcpFrag->data + newLength, curTcpFrag->dataLength - newLength, tcpReassemblyData->connData);
+ streamData.setDeleteDataOnDestruction(false);
+ m_OnMessageReadyCallback(sideIndex, streamData, m_UserCookie);
+ }
+
+ foundSomething = true;
+ }
+ else
+ {
+ LOG_DEBUG("Found a fragment in the out-of-order list which doesn't contain any new data, ignoring it. Fragment size is %d on side %d", (int)curTcpFrag->dataLength, sideIndex);
+ }
+
+ // delete fragment from list
+ tcpReassemblyData->twoSides[sideIndex].tcpFragmentList.erase(tcpReassemblyData->twoSides[sideIndex].tcpFragmentList.begin() + index);
+
+ continue;
+ }
+
+ //if got to here it means the fragment has higher sequence than current sequence, increment index and continue
+ index++;
+ }
+
+ // if managed to find new segment, do the search all over again
+ } while (foundSomething);
+
+
+ // if got here it means we're left only with fragments that have higher sequence than current sequence. This means out-of-order packets or
+ // missing data. If we don't want to clear the frag list yet, assume it's out-of-order and return
+ if (!cleanWholeFragList)
+ return;
+
+ LOG_DEBUG("Starting second iteration of checkOutOfOrderFragments - handle missing data");
+
+ // second fragment list iteration - now we're left only with fragments that have higher sequence than current sequence. This means missing data.
+ // Search for the fragment with the closest sequence to the current one
+
+ uint32_t closestSequence = 0xffffffff;
+ int closestSequenceFragIndex = -1;
+ index = 0;
+
+ while (index < (int)tcpReassemblyData->twoSides[sideIndex].tcpFragmentList.size())
+ {
+ // extract segment at current index
+ TcpFragment* curTcpFrag = tcpReassemblyData->twoSides[sideIndex].tcpFragmentList.at(index);
+
+ // check if its sequence is closer than current closest sequence
+ if (curTcpFrag->sequence < closestSequence)
+ {
+ closestSequence = curTcpFrag->sequence;
+ closestSequenceFragIndex = index;
+ }
+
+ index++;
+ }
+
+ // this means fragment list is not empty at this stage
+ if (closestSequenceFragIndex > -1)
+ {
+ // get the fragment with the closest sequence
+ TcpFragment* curTcpFrag = tcpReassemblyData->twoSides[sideIndex].tcpFragmentList.at(closestSequenceFragIndex);
+
+ // calculate number of missing bytes
+ uint32_t missingDataLen = curTcpFrag->sequence - tcpReassemblyData->twoSides[sideIndex].sequence;
+
+ // update sequence
+ tcpReassemblyData->twoSides[sideIndex].sequence = curTcpFrag->sequence + curTcpFrag->dataLength;
+ if (curTcpFrag->data != NULL)
+ {
+ // send new data to callback
+ if (m_OnMessageReadyCallback != NULL)
+ {
+ // prepare missing data text
+ std::string missingDataTextStr = prepareMissingDataMessage(missingDataLen);
+
+ // add missing data text to the data that will be sent to the callback. This means that the data will look something like:
+ // "[xx bytes missing]<original_data>"
+ size_t dataWithMissingDataTextLen = missingDataTextStr.length() + curTcpFrag->dataLength;
+ uint8_t* dataWithMissingDataText = new uint8_t[dataWithMissingDataTextLen];
+ memcpy(dataWithMissingDataText, missingDataTextStr.c_str(), missingDataTextStr.length());
+ memcpy(dataWithMissingDataText + missingDataTextStr.length(), curTcpFrag->data, curTcpFrag->dataLength);
+
+ //TcpStreamData streamData(curTcpFrag->data, curTcpFrag->dataLength, tcpReassemblyData->connData);
+ //streamData.setDeleteDataOnDestruction(false);
+ TcpStreamData streamData(dataWithMissingDataText, dataWithMissingDataTextLen, tcpReassemblyData->connData);
+ m_OnMessageReadyCallback(sideIndex, streamData, m_UserCookie);
+
+ LOG_DEBUG("Found missing data on side %d: %d byte are missing. Sending the closest fragment which is in size %d + missing text message which size is %d",
+ sideIndex, missingDataLen, (int)curTcpFrag->dataLength, (int)missingDataTextStr.length());
+ }
+ }
+
+ // remove fragment from list
+ tcpReassemblyData->twoSides[sideIndex].tcpFragmentList.erase(tcpReassemblyData->twoSides[sideIndex].tcpFragmentList.begin() + closestSequenceFragIndex);
+
+ LOG_DEBUG("Calling checkOutOfOrderFragments again from the start");
+
+ // call the method again from the start to do the whole search again (both iterations).
+ // the stop condition is when the list is empty (so closestSequenceFragIndex == -1)
+ foundSomething = true;
+ }
+
+ } while (foundSomething);
+}
+
+void TcpReassembly::closeConnection(uint32_t flowKey)
+{
+ closeConnectionInternal(flowKey, TcpReassembly::TcpReassemblyConnectionClosedManually);
+}
+
+void TcpReassembly::closeConnectionInternal(uint32_t flowKey, ConnectionEndReason reason)
+{
+ TcpReassemblyData* tcpReassemblyData = NULL;
+ std::map<uint32_t, TcpReassemblyData*>::iterator iter = m_ConnectionList.find(flowKey);
+ if (iter == m_ConnectionList.end())
+ {
+ LOG_ERROR("Cannot close flow with key 0x%X: cannot find flow", flowKey);
+ return;
+ }
+
+ LOG_DEBUG("Closing connection with flow key 0x%X", flowKey);
+
+ tcpReassemblyData = iter->second;
+
+ LOG_DEBUG("Calling checkOutOfOrderFragments on side 0");
+ checkOutOfOrderFragments(tcpReassemblyData, 0, true);
+
+ LOG_DEBUG("Calling checkOutOfOrderFragments on side 1");
+ checkOutOfOrderFragments(tcpReassemblyData, 1, true);
+
+ if (m_OnConnEnd != NULL)
+ m_OnConnEnd(tcpReassemblyData->connData, reason, m_UserCookie);
+
+ delete tcpReassemblyData;
+ m_ConnectionList.erase(iter);
+ m_ClosedConnectionList[flowKey] = true;
+
+ LOG_DEBUG("Connection with flow key 0x%X is closed", flowKey);
+}
+
+void TcpReassembly::closeAllConnections()
+{
+ LOG_DEBUG("Closing all flows");
+
+ while (!m_ConnectionList.empty())
+ {
+ TcpReassemblyData* tcpReassemblyData = m_ConnectionList.begin()->second;
+
+ uint32_t flowKey = tcpReassemblyData->connData.flowKey;
+ LOG_DEBUG("Closing connection with flow key 0x%X", flowKey);
+
+ LOG_DEBUG("Calling checkOutOfOrderFragments on side 0");
+ checkOutOfOrderFragments(tcpReassemblyData, 0, true);
+
+ LOG_DEBUG("Calling checkOutOfOrderFragments on side 1");
+ checkOutOfOrderFragments(tcpReassemblyData, 1, true);
+
+ if (m_OnConnEnd != NULL)
+ m_OnConnEnd(tcpReassemblyData->connData, TcpReassemblyConnectionClosedManually, m_UserCookie);
+
+ delete tcpReassemblyData;
+ m_ConnectionList.erase(m_ConnectionList.begin());
+ m_ClosedConnectionList[flowKey] = true;
+
+ LOG_DEBUG("Connection with flow key 0x%X is closed", flowKey);
+ }
+}
+
+const std::vector<ConnectionData>& TcpReassembly::getConnectionInformation() const
+{
+ return m_ConnectionInfo;
+}
+
+int TcpReassembly::isConnectionOpen(const ConnectionData& connection)
+{
+ if (m_ConnectionList.find(connection.flowKey) != m_ConnectionList.end())
+ return 1;
+
+ if (m_ClosedConnectionList.find(connection.flowKey) != m_ClosedConnectionList.end())
+ return 0;
+
+ return -1;
+}
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c0a788b3/thirdparty/pcap++/Packet++/src/TextBasedProtocol.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/pcap++/Packet++/src/TextBasedProtocol.cpp b/thirdparty/pcap++/Packet++/src/TextBasedProtocol.cpp
new file mode 100644
index 0000000..b1b6ff7
--- /dev/null
+++ b/thirdparty/pcap++/Packet++/src/TextBasedProtocol.cpp
@@ -0,0 +1,651 @@
+#include "TextBasedProtocol.h"
+#include "Logger.h"
+#include "PayloadLayer.h"
+#include <string.h>
+#include <algorithm>
+#include <stdlib.h>
+
+namespace pcpp
+{
+
+// this implementation of strnlen is required since mingw doesn't have strnlen
+size_t tbp_my_own_strnlen(const char *s, size_t n)
+{
+ const char *p = s;
+ /* We don't check here for NULL pointers. */
+ for (;*p != 0 && n > 0; p++, n--)
+ ;
+ return (size_t) (p - s);
+}
+
+
+// -------- Class TextBasedProtocolMessage -----------------
+
+
+TextBasedProtocolMessage::TextBasedProtocolMessage(uint8_t* data, size_t dataLen, Layer* prevLayer, Packet* packet) : Layer(data, dataLen, prevLayer, packet),
+ m_FieldList(NULL), m_LastField(NULL), m_FieldsOffset(0) {}
+
+TextBasedProtocolMessage::TextBasedProtocolMessage(const TextBasedProtocolMessage& other) : Layer(other)
+{
+ copyDataFrom(other);
+
+}
+
+TextBasedProtocolMessage& TextBasedProtocolMessage::operator=(const TextBasedProtocolMessage& other)
+{
+ Layer::operator=(other);
+ HeaderField* curField = m_FieldList;
+ while (curField != NULL)
+ {
+ HeaderField* temp = curField;
+ curField = curField->getNextField();
+ delete temp;
+ }
+
+ copyDataFrom(other);
+
+ return *this;
+}
+
+void TextBasedProtocolMessage::copyDataFrom(const TextBasedProtocolMessage& other)
+{
+ // copy field list
+ if (other.m_FieldList != NULL)
+ {
+ m_FieldList = new HeaderField(*(other.m_FieldList));
+ HeaderField* curField = m_FieldList;
+ curField->attachToTextBasedProtocolMessage(this, other.m_FieldList->m_NameOffsetInMessage);
+ HeaderField* curOtherField = other.m_FieldList;
+ while (curOtherField->getNextField() != NULL)
+ {
+ HeaderField* newField = new HeaderField(*(curOtherField->getNextField()));
+ newField->attachToTextBasedProtocolMessage(this, curOtherField->getNextField()->m_NameOffsetInMessage);
+ curField->setNextField(newField);
+ curField = curField->getNextField();
+ curOtherField = curOtherField->getNextField();
+ }
+
+ m_LastField = curField;
+ }
+ else
+ {
+ m_FieldList = NULL;
+ m_LastField = NULL;
+ }
+
+ m_FieldsOffset = other.m_FieldsOffset;
+
+ // copy map
+ for(HeaderField* field = m_FieldList; field != NULL; field = field->getNextField())
+ {
+ m_FieldNameToFieldMap.insert(std::pair<std::string, HeaderField*>(field->getFieldName(), field));
+ }
+
+}
+
+
+void TextBasedProtocolMessage::parseFields()
+{
+ char nameValueSeperator = getHeaderFieldNameValueSeparator();
+ bool spacesAllowedBetweenNameAndValue = spacesAllowedBetweenHeaderFieldNameAndValue();
+
+ HeaderField* firstField = new HeaderField(this, m_FieldsOffset, nameValueSeperator, spacesAllowedBetweenNameAndValue);
+ LOG_DEBUG("Added new field: name='%s'; offset in packet=%d; length=%d", firstField->getFieldName().c_str(), firstField->m_NameOffsetInMessage, (int)firstField->getFieldSize());
+ LOG_DEBUG(" Field value = %s", firstField->getFieldValue().c_str());
+
+ if (m_FieldList == NULL)
+ m_FieldList = firstField;
+ else
+ m_FieldList->setNextField(firstField);
+
+ std::string fieldName = firstField->getFieldName();
+ std::transform(fieldName.begin(), fieldName.end(), fieldName.begin(), ::tolower);
+ m_FieldNameToFieldMap.insert(std::pair<std::string, HeaderField*>(fieldName, firstField));
+
+ // Last field will be empty and contain just "\n" or "\r\n". This field will mark the end of the header
+ HeaderField* curField = m_FieldList;
+ int curOffset = m_FieldsOffset;
+ // last field can be one of:
+ // a.) \r\n\r\n or \n\n marking the end of the header
+ // b.) the end of the packet
+ while (!curField->isEndOfHeader() && curOffset + curField->getFieldSize() < m_DataLen)
+ {
+ curOffset += curField->getFieldSize();
+ HeaderField* newField = new HeaderField(this, curOffset, nameValueSeperator, spacesAllowedBetweenNameAndValue);
+ LOG_DEBUG("Added new field: name='%s'; offset in packet=%d; length=%d", newField->getFieldName().c_str(), newField->m_NameOffsetInMessage, (int)newField->getFieldSize());
+ LOG_DEBUG(" Field value = %s", newField->getFieldValue().c_str());
+ curField->setNextField(newField);
+ curField = curField->getNextField();
+ fieldName = newField->getFieldName();
+ std::transform(fieldName.begin(), fieldName.end(), fieldName.begin(), ::tolower);
+ m_FieldNameToFieldMap.insert(std::pair<std::string, HeaderField*>(fieldName, newField));
+ }
+
+ m_LastField = curField;
+}
+
+
+TextBasedProtocolMessage::~TextBasedProtocolMessage()
+{
+ while (m_FieldList != NULL)
+ {
+ HeaderField* temp = m_FieldList;
+ m_FieldList = m_FieldList->getNextField();
+ delete temp;
+ }
+}
+
+
+HeaderField* TextBasedProtocolMessage::addField(const std::string& fieldName, const std::string& fieldValue)
+{
+ HeaderField newField(fieldName, fieldValue, getHeaderFieldNameValueSeparator(), spacesAllowedBetweenHeaderFieldNameAndValue());
+ return addField(newField);
+}
+
+HeaderField* TextBasedProtocolMessage::addField(const HeaderField& newField)
+{
+ return insertField(m_LastField, newField);
+}
+
+HeaderField* TextBasedProtocolMessage::addEndOfHeader()
+{
+ HeaderField endOfHeaderField(PCPP_END_OF_TEXT_BASED_PROTOCOL_HEADER, "", '\0', false);
+ return insertField(m_LastField, endOfHeaderField);
+}
+
+
+HeaderField* TextBasedProtocolMessage::insertField(HeaderField* prevField, const std::string& fieldName, const std::string& fieldValue)
+{
+ HeaderField newField(fieldName, fieldValue, getHeaderFieldNameValueSeparator(), spacesAllowedBetweenHeaderFieldNameAndValue());
+ return insertField(prevField, newField);
+}
+
+HeaderField* TextBasedProtocolMessage::insertField(std::string prevFieldName, const std::string& fieldName, const std::string& fieldValue)
+{
+ if (prevFieldName == "")
+ {
+ return insertField(NULL, fieldName, fieldValue);
+ }
+ else
+ {
+ HeaderField* prevField = getFieldByName(prevFieldName);
+ if (prevField == NULL)
+ return NULL;
+
+ return insertField(prevField, fieldName, fieldValue);
+ }
+}
+
+
+HeaderField* TextBasedProtocolMessage::insertField(HeaderField* prevField, const HeaderField& newField)
+{
+ if (newField.m_TextBasedProtocolMessage != NULL)
+ {
+ LOG_ERROR("This field is already associated with another message");
+ return NULL;
+ }
+
+ if (prevField != NULL && prevField->getFieldName() == PCPP_END_OF_TEXT_BASED_PROTOCOL_HEADER)
+ {
+ LOG_ERROR("Cannot add a field after end of header");
+ return NULL;
+ }
+
+ HeaderField* newFieldToAdd = new HeaderField(newField);
+
+ int newFieldOffset = m_FieldsOffset;
+ if (prevField != NULL)
+ newFieldOffset = prevField->m_NameOffsetInMessage + prevField->getFieldSize();
+
+ // extend layer to make room for the new field. Field will be added just before the last field
+ extendLayer(newFieldOffset, newFieldToAdd->getFieldSize());
+
+ HeaderField* curField = m_FieldList;
+ if (prevField != NULL)
+ curField = prevField->getNextField();
+
+ // go over all fields after prevField and update their offsets
+ shiftFieldsOffset(curField, newFieldToAdd->getFieldSize());
+
+ // copy new field data to message
+ memcpy(m_Data + newFieldOffset, newFieldToAdd->m_NewFieldData, newFieldToAdd->getFieldSize());
+
+ // attach new field to message
+ newFieldToAdd->attachToTextBasedProtocolMessage(this, newFieldOffset);
+
+ // insert field into fields link list
+ if (prevField == NULL)
+ {
+ newFieldToAdd->setNextField(m_FieldList);
+ m_FieldList = newFieldToAdd;
+ }
+ else
+ {
+ newFieldToAdd->setNextField(prevField->getNextField());
+ prevField->setNextField(newFieldToAdd);
+ }
+
+ // if newField is the last field, update m_LastField
+ if (newFieldToAdd->getNextField() == NULL)
+ m_LastField = newFieldToAdd;
+
+ // insert the new field into name to field map
+ std::string fieldName = newFieldToAdd->getFieldName();
+ std::transform(fieldName.begin(), fieldName.end(), fieldName.begin(), ::tolower);
+ m_FieldNameToFieldMap.insert(std::pair<std::string, HeaderField*>(fieldName, newFieldToAdd));
+
+ return newFieldToAdd;
+}
+
+bool TextBasedProtocolMessage::removeField(std::string fieldName, int index)
+{
+ std::transform(fieldName.begin(), fieldName.end(), fieldName.begin(), ::tolower);
+
+ HeaderField* fieldToRemove = NULL;
+
+ std::pair <std::multimap<std::string,HeaderField*>::iterator, std::multimap<std::string,HeaderField*>::iterator> range;
+ range = m_FieldNameToFieldMap.equal_range(fieldName);
+ int i = 0;
+ for (std::multimap<std::string,HeaderField*>::iterator iter = range.first; iter != range.second; ++iter)
+ {
+ if (i == index)
+ {
+ fieldToRemove = iter->second;
+ break;
+ }
+
+ i++;
+ }
+
+ if (fieldToRemove != NULL)
+ return removeField(fieldToRemove);
+ else
+ {
+ LOG_ERROR("Cannot find field '%s'", fieldName.c_str());
+ return false;
+ }
+}
+
+bool TextBasedProtocolMessage::removeField(HeaderField* fieldToRemove)
+{
+ if (fieldToRemove == NULL)
+ return true;
+
+ if (fieldToRemove->m_TextBasedProtocolMessage != this)
+ {
+ LOG_ERROR("Field isn't associated with this message");
+ return false;
+ }
+
+ std::string fieldName = fieldToRemove->getFieldName();
+
+ // shorten layer and delete this field
+ if (!shortenLayer(fieldToRemove->m_NameOffsetInMessage, fieldToRemove->getFieldSize()))
+ {
+ LOG_ERROR("Cannot shorten layer");
+ return false;
+ }
+
+ // update offsets of all fields after this field
+ HeaderField* curField = fieldToRemove->getNextField();
+ shiftFieldsOffset(curField, 0-fieldToRemove->getFieldSize());
+// while (curField != NULL)
+// {
+// curField->m_NameOffsetInMessage -= fieldToRemove->getFieldSize();
+// if (curField->m_ValueOffsetInMessage != -1)
+// curField->m_ValueOffsetInMessage -= fieldToRemove->getFieldSize();
+//
+// curField = curField->getNextField();
+// }
+
+ // update fields link list
+ if (fieldToRemove == m_FieldList)
+ m_FieldList = m_FieldList->getNextField();
+ else
+ {
+ curField = m_FieldList;
+ while (curField->getNextField() != fieldToRemove)
+ curField = curField->getNextField();
+
+ curField->setNextField(fieldToRemove->getNextField());
+ }
+
+ // re-calculate m_LastField if needed
+ if (fieldToRemove == m_LastField)
+ {
+ if (m_FieldList == NULL)
+ m_LastField = NULL;
+ else
+ {
+ curField = m_FieldList;
+ while (curField->getNextField() != NULL)
+ curField = curField->getNextField();
+ m_LastField = curField;
+ }
+ }
+
+ // remove the hash entry for this field
+ std::transform(fieldName.begin(), fieldName.end(), fieldName.begin(), ::tolower);
+ std::pair <std::multimap<std::string,HeaderField*>::iterator, std::multimap<std::string,HeaderField*>::iterator> range;
+ range = m_FieldNameToFieldMap.equal_range(fieldName);
+ for (std::multimap<std::string,HeaderField*>::iterator iter = range.first; iter != range.second; ++iter)
+ {
+ if (iter->second == fieldToRemove)
+ {
+ m_FieldNameToFieldMap.erase(iter);
+ break;
+ }
+ }
+
+ // finally - delete this field
+ delete fieldToRemove;
+
+ return true;
+}
+
+bool TextBasedProtocolMessage::isHeaderComplete()
+{
+ if (m_LastField == NULL)
+ return false;
+
+ return (m_LastField->getFieldName() == PCPP_END_OF_TEXT_BASED_PROTOCOL_HEADER);
+}
+
+void TextBasedProtocolMessage::shiftFieldsOffset(HeaderField* fromField, int numOfBytesToShift)
+{
+ while (fromField != NULL)
+ {
+ fromField->m_NameOffsetInMessage += numOfBytesToShift;
+ if (fromField->m_ValueOffsetInMessage != -1)
+ fromField->m_ValueOffsetInMessage += numOfBytesToShift;
+ fromField = fromField->getNextField();
+ }
+}
+
+HeaderField* TextBasedProtocolMessage::getFieldByName(std::string fieldName, int index)
+{
+ std::transform(fieldName.begin(), fieldName.end(), fieldName.begin(), ::tolower);
+
+ std::pair <std::multimap<std::string,HeaderField*>::iterator, std::multimap<std::string,HeaderField*>::iterator> range;
+ range = m_FieldNameToFieldMap.equal_range(fieldName);
+ int i = 0;
+ for (std::multimap<std::string,HeaderField*>::iterator iter = range.first; iter != range.second; ++iter)
+ {
+ if (i == index)
+ return iter->second;
+
+ i++;
+ }
+
+ return NULL;
+}
+
+int TextBasedProtocolMessage::getFieldCount()
+{
+ int result = 0;
+
+ HeaderField* curField = getFirstField();
+ while (curField != NULL)
+ {
+ if (!curField->isEndOfHeader())
+ result++;
+ curField = curField->getNextField();
+ }
+
+ return result;
+}
+
+void TextBasedProtocolMessage::parseNextLayer()
+{
+ size_t headerLen = getHeaderLen();
+ if (m_DataLen <= headerLen)
+ return;
+
+ m_NextLayer = new PayloadLayer(m_Data + headerLen, m_DataLen - headerLen, this, m_Packet);
+}
+
+size_t TextBasedProtocolMessage::getHeaderLen()
+{
+ return m_LastField->m_NameOffsetInMessage + m_LastField->m_FieldSize;
+}
+
+void TextBasedProtocolMessage::computeCalculateFields()
+{
+ //nothing to do for now
+}
+
+
+
+
+
+// -------- Class HeaderField -----------------
+
+
+HeaderField::HeaderField(TextBasedProtocolMessage* TextBasedProtocolMessage, int offsetInMessage, char nameValueSeperator, bool spacesAllowedBetweenNameAndValue) :
+ m_NewFieldData(NULL), m_TextBasedProtocolMessage(TextBasedProtocolMessage), m_NameOffsetInMessage(offsetInMessage), m_NextField(NULL),
+ m_NameValueSeperator(nameValueSeperator), m_SpacesAllowedBetweenNameAndValue(spacesAllowedBetweenNameAndValue)
+{
+ char* fieldData = (char*)(m_TextBasedProtocolMessage->m_Data + m_NameOffsetInMessage);
+ //char* fieldEndPtr = strchr(fieldData, '\n');
+ char* fieldEndPtr = (char *)memchr(fieldData, '\n',m_TextBasedProtocolMessage->m_DataLen-(size_t)m_NameOffsetInMessage);
+ if (fieldEndPtr == NULL)
+ m_FieldSize = tbp_my_own_strnlen(fieldData, m_TextBasedProtocolMessage->m_DataLen-(size_t)m_NameOffsetInMessage);
+ else
+ m_FieldSize = fieldEndPtr - fieldData + 1;
+
+ if ((*fieldData) == '\r' || (*fieldData) == '\n')
+ {
+ m_FieldNameSize = -1;
+ m_ValueOffsetInMessage = -1;
+ m_FieldValueSize = -1;
+ m_FieldNameSize = -1;
+ m_IsEndOfHeaderField = true;
+ return;
+ }
+ else
+ m_IsEndOfHeaderField = false;
+
+// char* fieldValuePtr = strchr(fieldData, ':');
+ char* fieldValuePtr = (char *)memchr(fieldData, nameValueSeperator, m_TextBasedProtocolMessage->m_DataLen-(size_t)m_NameOffsetInMessage);
+ // could not find the position of the separator, meaning field value position is unknown
+ if (fieldValuePtr == NULL)
+ {
+ m_ValueOffsetInMessage = -1;
+ m_FieldValueSize = -1;
+ m_FieldNameSize = m_FieldSize;
+ }
+ else
+ {
+ m_FieldNameSize = fieldValuePtr - fieldData;
+ // Header field looks like this: <field_name>[separator]<zero or more spaces><field_Value>
+ // So fieldValuePtr give us the position of the separator. Value offset is the first non-space byte forward
+ fieldValuePtr++;
+
+ if (spacesAllowedBetweenNameAndValue)
+ {
+ // advance fieldValuePtr 1 byte forward while didn't get to end of packet and fieldValuePtr points to a space char
+ while ((size_t)(fieldValuePtr - (char*)m_TextBasedProtocolMessage->m_Data) <= m_TextBasedProtocolMessage->getDataLen() && (*fieldValuePtr) == ' ')
+ fieldValuePtr++;
+ }
+
+ // reached the end of the packet and value start offset wasn't found
+ if ((size_t)(fieldValuePtr - (char*)(m_TextBasedProtocolMessage->m_Data)) > m_TextBasedProtocolMessage->getDataLen())
+ {
+ m_ValueOffsetInMessage = -1;
+ m_FieldValueSize = -1;
+ }
+ else
+ {
+ m_ValueOffsetInMessage = fieldValuePtr - (char*)m_TextBasedProtocolMessage->m_Data;
+ // couldn't find the end of the field, so assuming the field value length is from m_ValueOffsetInMessage until the end of the packet
+ if (fieldEndPtr == NULL)
+ m_FieldValueSize = (char*)(m_TextBasedProtocolMessage->m_Data + m_TextBasedProtocolMessage->getDataLen()) - fieldValuePtr;
+ else
+ {
+ m_FieldValueSize = fieldEndPtr - fieldValuePtr;
+ // if field ends with \r\n, decrease the value length by 1
+ if ((*(--fieldEndPtr)) == '\r')
+ m_FieldValueSize--;
+ }
+ }
+ }
+}
+
+HeaderField::HeaderField(std::string name, std::string value, char nameValueSeperator, bool spacesAllowedBetweenNameAndValue)
+{
+ m_NameValueSeperator = nameValueSeperator;
+ m_SpacesAllowedBetweenNameAndValue = spacesAllowedBetweenNameAndValue;
+ initNewField(name, value);
+}
+
+void HeaderField::initNewField(std::string name, std::string value)
+{
+ m_TextBasedProtocolMessage = NULL;
+ m_NameOffsetInMessage = 0;
+ m_NextField = NULL;
+
+ // first building the name-value separator
+ std::string nameValueSeparation(1, m_NameValueSeperator);
+ if (m_SpacesAllowedBetweenNameAndValue)
+ nameValueSeparation += " ";
+
+ // Field size is: name_length + separator_len + value_length + '\r\n'
+ if (name != PCPP_END_OF_TEXT_BASED_PROTOCOL_HEADER)
+ m_FieldSize = name.length() + nameValueSeparation.length() + value.length() + 2;
+ else
+ // Field is \r\n (2B)
+ m_FieldSize = 2;
+
+ m_NewFieldData = new uint8_t[m_FieldSize];
+ std::string fieldData;
+
+ if (name != PCPP_END_OF_TEXT_BASED_PROTOCOL_HEADER)
+ fieldData = name + nameValueSeparation + value + "\r\n";
+ else
+ fieldData = "\r\n";
+
+ // copy field data to m_NewFieldData
+ memcpy(m_NewFieldData, fieldData.c_str(), m_FieldSize);
+
+ // calculate value offset
+ if (name != PCPP_END_OF_TEXT_BASED_PROTOCOL_HEADER)
+ m_ValueOffsetInMessage = name.length() + nameValueSeparation.length();
+ else
+ m_ValueOffsetInMessage = 0;
+ m_FieldNameSize = name.length();
+ m_FieldValueSize = value.length();
+
+ if (name != PCPP_END_OF_TEXT_BASED_PROTOCOL_HEADER)
+ m_IsEndOfHeaderField = false;
+ else
+ m_IsEndOfHeaderField = true;
+}
+
+HeaderField::~HeaderField()
+{
+ if (m_NewFieldData != NULL)
+ delete [] m_NewFieldData;
+}
+
+HeaderField::HeaderField(const HeaderField& other) : m_NameValueSeperator('\0'), m_SpacesAllowedBetweenNameAndValue(false)
+{
+ m_NameValueSeperator = other.m_NameValueSeperator;
+ m_SpacesAllowedBetweenNameAndValue = other.m_SpacesAllowedBetweenNameAndValue;
+ initNewField(other.getFieldName(), other.getFieldValue());
+}
+
+char* HeaderField::getData()
+{
+ if (m_TextBasedProtocolMessage == NULL)
+ return (char*)m_NewFieldData;
+ else
+ return (char*)(m_TextBasedProtocolMessage->m_Data);
+}
+
+std::string HeaderField::getFieldName() const
+{
+ std::string result;
+
+ if (m_FieldNameSize != (size_t)-1)
+ result.assign((const char*)(((HeaderField*)this)->getData() + m_NameOffsetInMessage), m_FieldNameSize);
+
+ return result;
+}
+
+std::string HeaderField::getFieldValue() const
+{
+ std::string result;
+ if (m_ValueOffsetInMessage != -1)
+ result.assign((const char*)(((HeaderField*)this)->getData() + m_ValueOffsetInMessage), m_FieldValueSize);
+ return result;
+}
+
+bool HeaderField::setFieldValue(std::string newValue)
+{
+ // Field isn't linked with any message yet
+ if (m_TextBasedProtocolMessage == NULL)
+ {
+ std::string name = getFieldName();
+ delete [] m_NewFieldData;
+ initNewField(name, newValue);
+ return true;
+ }
+
+ std::string curValue = getFieldValue();
+ int lengthDifference = newValue.length() - curValue.length();
+ // new value is longer than current value
+ if (lengthDifference > 0)
+ {
+ if (!m_TextBasedProtocolMessage->extendLayer(m_ValueOffsetInMessage, lengthDifference))
+ {
+ LOG_ERROR("Could not extend layer");
+ return false;
+ }
+ }
+ // new value is shorter than current value
+ else if (lengthDifference < 0)
+ {
+ if (!m_TextBasedProtocolMessage->shortenLayer(m_ValueOffsetInMessage, 0-lengthDifference))
+ {
+ LOG_ERROR("Could not shorten layer");
+ return false;
+ }
+ }
+
+ if (lengthDifference != 0)
+ m_TextBasedProtocolMessage->shiftFieldsOffset(getNextField(), lengthDifference);
+
+ // update sizes
+ m_FieldValueSize += lengthDifference;
+ m_FieldSize += lengthDifference;
+
+ // write new value to field data
+ memcpy(getData() + m_ValueOffsetInMessage, newValue.c_str(), newValue.length());
+
+ return true;
+}
+
+void HeaderField::attachToTextBasedProtocolMessage(TextBasedProtocolMessage* message, int fieldOffsetInMessage)
+{
+ if (m_TextBasedProtocolMessage != NULL && m_TextBasedProtocolMessage != message)
+ {
+ LOG_ERROR("Header field already associated with another message");
+ return;
+ }
+
+ if (m_NewFieldData == NULL)
+ {
+ LOG_ERROR("Header field doesn't have new field data");
+ return;
+ }
+
+ delete [] m_NewFieldData;
+ m_NewFieldData = NULL;
+ m_TextBasedProtocolMessage = message;
+
+ int valueAndNameDifference = m_ValueOffsetInMessage - m_NameOffsetInMessage;
+ m_NameOffsetInMessage = fieldOffsetInMessage;
+ m_ValueOffsetInMessage = m_NameOffsetInMessage + valueAndNameDifference;
+}
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c0a788b3/thirdparty/pcap++/Packet++/src/UdpLayer.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/pcap++/Packet++/src/UdpLayer.cpp b/thirdparty/pcap++/Packet++/src/UdpLayer.cpp
new file mode 100644
index 0000000..1451421
--- /dev/null
+++ b/thirdparty/pcap++/Packet++/src/UdpLayer.cpp
@@ -0,0 +1,122 @@
+#define LOG_MODULE PacketLogModuleUdpLayer
+
+#include <UdpLayer.h>
+#include <IpUtils.h>
+#include <PayloadLayer.h>
+#include <IPv4Layer.h>
+#include <IPv6Layer.h>
+#include <DnsLayer.h>
+#include <DhcpLayer.h>
+#include <VxlanLayer.h>
+#include <SipLayer.h>
+#include <Logger.h>
+#include <string.h>
+#include <sstream>
+
+namespace pcpp
+{
+
+UdpLayer::UdpLayer(uint16_t portSrc, uint16_t portDst)
+{
+ m_DataLen = sizeof(udphdr);
+ m_Data = new uint8_t[m_DataLen];
+ memset(m_Data, 0, m_DataLen);
+ udphdr* udpHdr = (udphdr*)m_Data;
+ udpHdr->portDst = htons(portDst);
+ udpHdr->portSrc = htons(portSrc);
+ m_Protocol = UDP;
+}
+
+uint16_t UdpLayer::calculateChecksum(bool writeResultToPacket)
+{
+ udphdr* udpHdr = (udphdr*)m_Data;
+ uint16_t checksumRes = 0;
+ uint16_t currChecksumValue = udpHdr->headerChecksum;
+
+ if (m_PrevLayer != NULL)
+ {
+ udpHdr->headerChecksum = 0;
+ ScalarBuffer<uint16_t> vec[2];
+ LOG_DEBUG("data len = %d", (int)m_DataLen);
+ vec[0].buffer = (uint16_t*)m_Data;
+ vec[0].len = m_DataLen;
+
+ if (m_PrevLayer->getProtocol() == IPv4)
+ {
+ uint32_t srcIP = ((IPv4Layer*)m_PrevLayer)->getSrcIpAddress().toInt();
+ uint32_t dstIP = ((IPv4Layer*)m_PrevLayer)->getDstIpAddress().toInt();
+ uint16_t pseudoHeader[6];
+ pseudoHeader[0] = srcIP >> 16;
+ pseudoHeader[1] = srcIP & 0xFFFF;
+ pseudoHeader[2] = dstIP >> 16;
+ pseudoHeader[3] = dstIP & 0xFFFF;
+ pseudoHeader[4] = 0xffff & udpHdr->length;
+ pseudoHeader[5] = htons(0x00ff & PACKETPP_IPPROTO_UDP);
+ vec[1].buffer = pseudoHeader;
+ vec[1].len = 12;
+ checksumRes = compute_checksum(vec, 2);
+ LOG_DEBUG("calculated checksum = 0x%4X", checksumRes);
+ }
+ else if (m_PrevLayer->getProtocol() == IPv6)
+ {
+ uint16_t pseudoHeader[18];
+ ((IPv6Layer*)m_PrevLayer)->getSrcIpAddress().copyTo((uint8_t*)pseudoHeader);
+ ((IPv6Layer*)m_PrevLayer)->getDstIpAddress().copyTo((uint8_t*)(pseudoHeader+8));
+ pseudoHeader[16] = 0xffff & udpHdr->length;
+ pseudoHeader[17] = htons(0x00ff & PACKETPP_IPPROTO_UDP);
+ vec[1].buffer = pseudoHeader;
+ vec[1].len = 36;
+ checksumRes = compute_checksum(vec, 2);
+ LOG_DEBUG("calculated checksum = 0x%4X", checksumRes);
+ }
+ }
+
+ if(writeResultToPacket)
+ udpHdr->headerChecksum = htons(checksumRes);
+ else
+ udpHdr->headerChecksum = currChecksumValue;
+
+ return checksumRes;
+}
+
+void UdpLayer::parseNextLayer()
+{
+ if (m_DataLen <= sizeof(udphdr))
+ return;
+
+ udphdr* udpHder = getUdpHeader();
+ uint16_t portDst = ntohs(udpHder->portDst);
+ uint16_t portSrc = ntohs(udpHder->portSrc);
+
+ if ((portSrc == 68 && portDst == 67) || (portSrc == 67 && portDst == 68) || (portSrc == 67 && portDst == 67))
+ m_NextLayer = new DhcpLayer(m_Data + sizeof(udphdr), m_DataLen - sizeof(udphdr), this, m_Packet);
+ else if (portDst == 4789)
+ m_NextLayer = new VxlanLayer(m_Data + sizeof(udphdr), m_DataLen - sizeof(udphdr), this, m_Packet);
+ else if ((m_DataLen - sizeof(udphdr) >= sizeof(dnshdr)) && (DnsLayer::getDNSPortMap()->find(portDst) != DnsLayer::getDNSPortMap()->end() || DnsLayer::getDNSPortMap()->find(portSrc) != DnsLayer::getDNSPortMap()->end()))
+ m_NextLayer = new DnsLayer(m_Data + sizeof(udphdr), m_DataLen - sizeof(udphdr), this, m_Packet);
+ else if (((portDst == 5060) || (portDst == 5061)) && (SipRequestFirstLine::parseMethod((char*)(m_Data + sizeof(udphdr)), m_DataLen - sizeof(udphdr)) != SipRequestLayer::SipMethodUnknown))
+ m_NextLayer = new SipRequestLayer(m_Data + sizeof(udphdr), m_DataLen - sizeof(udphdr), this, m_Packet);
+ else if (((portDst == 5060) || (portDst == 5061)) && (SipResponseFirstLine::parseStatusCode((char*)(m_Data + sizeof(udphdr)), m_DataLen - sizeof(udphdr)) != SipResponseLayer::SipStatusCodeUnknown))
+ m_NextLayer = new SipResponseLayer(m_Data + sizeof(udphdr), m_DataLen - sizeof(udphdr), this, m_Packet);
+ else
+ m_NextLayer = new PayloadLayer(m_Data + sizeof(udphdr), m_DataLen - sizeof(udphdr), this, m_Packet);
+}
+
+void UdpLayer::computeCalculateFields()
+{
+ udphdr* udpHdr = (udphdr*)m_Data;
+ udpHdr->length = htons(m_DataLen);
+ calculateChecksum(true);
+}
+
+std::string UdpLayer::toString()
+{
+ std::ostringstream srcPortStream;
+ srcPortStream << ntohs(getUdpHeader()->portSrc);
+ std::ostringstream dstPortStream;
+ dstPortStream << ntohs(getUdpHeader()->portDst);
+
+ return "UDP Layer, Src port: " + srcPortStream.str() + ", Dst port: " + dstPortStream.str();
+}
+
+} // namespace pcpp
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c0a788b3/thirdparty/pcap++/Packet++/src/VlanLayer.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/pcap++/Packet++/src/VlanLayer.cpp b/thirdparty/pcap++/Packet++/src/VlanLayer.cpp
new file mode 100644
index 0000000..8617b6c
--- /dev/null
+++ b/thirdparty/pcap++/Packet++/src/VlanLayer.cpp
@@ -0,0 +1,81 @@
+#define LOG_MODULE PacketLogModuleVlanLayer
+
+#include <VlanLayer.h>
+#include <IPv4Layer.h>
+#include <IPv6Layer.h>
+#include <PayloadLayer.h>
+#include <ArpLayer.h>
+#include <PPPoELayer.h>
+#include <MplsLayer.h>
+#include <string.h>
+#include <sstream>
+#if defined(WIN32) || defined(WINx64)
+#include <winsock2.h>
+#elif LINUX
+#include <in.h>
+#endif
+
+namespace pcpp
+{
+
+VlanLayer::VlanLayer(const uint16_t vlanID, bool cfi, uint8_t priority, uint16_t etherType)
+{
+ m_DataLen = sizeof(vlan_header);
+ m_Data = new uint8_t[m_DataLen];
+ memset(m_Data, 0, m_DataLen);
+ m_Protocol = VLAN;
+
+ vlan_header* vlanHeader = getVlanHeader();
+ setVlanID(vlanID);
+ setCFI(cfi);
+ setPriority(priority);
+ vlanHeader->etherType = htons(etherType);
+}
+
+void VlanLayer::parseNextLayer()
+{
+ if (m_DataLen <= sizeof(vlan_header))
+ return;
+
+ vlan_header* hdr = getVlanHeader();
+ switch (ntohs(hdr->etherType))
+ {
+ case PCPP_ETHERTYPE_IP:
+ m_NextLayer = new IPv4Layer(m_Data + sizeof(vlan_header), m_DataLen - sizeof(vlan_header), this, m_Packet);
+ break;
+ case PCPP_ETHERTYPE_IPV6:
+ m_NextLayer = new IPv6Layer(m_Data + sizeof(vlan_header), m_DataLen - sizeof(vlan_header), this, m_Packet);
+ break;
+ case PCPP_ETHERTYPE_ARP:
+ m_NextLayer = new ArpLayer(m_Data + sizeof(vlan_header), m_DataLen - sizeof(vlan_header), this, m_Packet);
+ break;
+ case PCPP_ETHERTYPE_VLAN:
+ m_NextLayer = new VlanLayer(m_Data + sizeof(vlan_header), m_DataLen - sizeof(vlan_header), this, m_Packet);
+ break;
+ case PCPP_ETHERTYPE_PPPOES:
+ m_NextLayer = new PPPoESessionLayer(m_Data + sizeof(vlan_header), m_DataLen - sizeof(vlan_header), this, m_Packet);
+ break;
+ case PCPP_ETHERTYPE_PPPOED:
+ m_NextLayer = new PPPoEDiscoveryLayer(m_Data + sizeof(vlan_header), m_DataLen - sizeof(vlan_header), this, m_Packet);
+ break;
+ case PCPP_ETHERTYPE_MPLS:
+ m_NextLayer = new MplsLayer(m_Data + sizeof(vlan_header), m_DataLen - sizeof(vlan_header), this, m_Packet);
+ break;
+ default:
+ m_NextLayer = new PayloadLayer(m_Data + sizeof(vlan_header), m_DataLen - sizeof(vlan_header), this, m_Packet);
+ }
+}
+
+std::string VlanLayer::toString()
+{
+ std::ostringstream cfiStream;
+ cfiStream << (int)getCFI();
+ std::ostringstream priStream;
+ priStream << (int)getPriority();
+ std::ostringstream idStream;
+ idStream << getVlanID();
+
+ return "VLAN Layer, Priority: " + priStream.str() + ", Vlan ID: " + idStream.str() + ", CFI: " + cfiStream.str();
+}
+
+} // namespace pcpp
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c0a788b3/thirdparty/pcap++/Packet++/src/VxlanLayer.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/pcap++/Packet++/src/VxlanLayer.cpp b/thirdparty/pcap++/Packet++/src/VxlanLayer.cpp
new file mode 100644
index 0000000..231ccce
--- /dev/null
+++ b/thirdparty/pcap++/Packet++/src/VxlanLayer.cpp
@@ -0,0 +1,64 @@
+#include <VxlanLayer.h>
+#include <EthLayer.h>
+#include <string.h>
+#if defined(WIN32) || defined(WINx64) //for using ntohl, ntohs, etc.
+#include <winsock2.h>
+#elif LINUX
+#include <in.h> //for using ntohl, ntohs, etc.
+#elif MAC_OS_X
+#include <arpa/inet.h> //for using ntohl, ntohs, etc.
+#endif
+
+
+namespace pcpp
+{
+
+VxlanLayer::VxlanLayer(uint32_t vni, uint16_t groupPolicyID, bool setGbpFlag, bool setPolicyAppliedFlag, bool setDontLearnFlag)
+{
+ m_DataLen = sizeof(vxlan_header);
+ m_Data = new uint8_t[m_DataLen];
+ memset(m_Data, 0, m_DataLen);
+ m_Protocol = VXLAN;
+
+ if (vni != 0)
+ setVNI(vni);
+
+ vxlan_header* vxlanHeader = getVxlanHeader();
+
+ if (groupPolicyID != 0)
+ vxlanHeader->groupPolicyID = htons(groupPolicyID);
+
+ vxlanHeader->vniPresentFlag = 1;
+
+ if (setGbpFlag)
+ vxlanHeader->gbpFlag = 1;
+ if (setPolicyAppliedFlag)
+ vxlanHeader->policyAppliedFlag = 1;
+ if (setDontLearnFlag)
+ vxlanHeader->dontLearnFlag = 1;
+}
+
+uint32_t VxlanLayer::getVNI()
+{
+ return (ntohl(getVxlanHeader()->vni) >> 8);
+}
+
+void VxlanLayer::setVNI(uint32_t vni)
+{
+ getVxlanHeader()->vni = htonl(vni << 8);
+}
+
+std::string VxlanLayer::toString()
+{
+ return "VXLAN Layer";
+}
+
+void VxlanLayer::parseNextLayer()
+{
+ if (m_DataLen <= sizeof(vxlan_header))
+ return;
+
+ m_NextLayer = new EthLayer(m_Data + sizeof(vxlan_header), m_DataLen - sizeof(vxlan_header), this, m_Packet);
+}
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c0a788b3/thirdparty/pcap++/Pcap++/Makefile
----------------------------------------------------------------------
diff --git a/thirdparty/pcap++/Pcap++/Makefile b/thirdparty/pcap++/Pcap++/Makefile
new file mode 100755
index 0000000..6a1a2d5
--- /dev/null
+++ b/thirdparty/pcap++/Pcap++/Makefile
@@ -0,0 +1,95 @@
+ifeq ($(wildcard ../mk/platform.mk),)
+ $(error platform.mk not found! Please run configure script first)
+endif
+
+include ../mk/platform.mk
+
+ifeq ($(wildcard ../mk/PcapPlusPlus.mk),)
+ $(error PcapPlusPlus.mk not found! Please run configure script first)
+endif
+
+include ../mk/PcapPlusPlus.mk
+
+SOURCES := $(wildcard src/*.cpp)
+OBJS_FILENAMES := $(patsubst src/%.cpp,Obj/%.o,$(SOURCES))
+
+COMMONPP_HOME := ../Common++
+PACKETPP_HOME := ../Packet++
+LIGHT_PCAPNG_PCPP_HOME := ../3rdParty/LightPcapNg
+LIGHT_PCAPNG_HOME := $(LIGHT_PCAPNG_PCPP_HOME)/LightPcapNg
+
+LIGHT_PCAPNG_SOURCES := $(wildcard $(LIGHT_PCAPNG_HOME)/src/*.c)
+LIGHT_PCAPNG_OBJS_FILENAMES := $(patsubst $(LIGHT_PCAPNG_HOME)/src/%.c,$(LIGHT_PCAPNG_PCPP_HOME)/Obj/%.o,$(LIGHT_PCAPNG_SOURCES))
+
+ifdef WIN32
+DEPS := -DWPCAP -DHAVE_REMOTE -DHAVE_STRUCT_TIMESPEC
+endif
+ifdef LINUX
+DEPS := -DLINUX
+endif
+ifdef PF_RING_HOME
+DEPS += -DUSE_PF_RING
+endif
+ifdef USE_DPDK
+DEPS += -DUSE_DPDK
+endif
+ifdef MAC_OS_X
+DEPS := -DMAC_OS_X
+endif
+
+INCLUDES := -I"./src" \
+ -I"./header" \
+ -I"$(COMMONPP_HOME)/header" \
+ -I"$(PACKETPP_HOME)/header" \
+ -I"$(LIGHT_PCAPNG_HOME)/include"
+
+ifdef WIN32
+INCLUDES += -I$(MINGW_HOME)/include/ddk \
+ -I$(WINPCAP_HOME)/Include
+endif
+ifdef LINUX
+INCLUDES += -I/usr/include/netinet
+endif
+ifdef PF_RING_HOME
+INCLUDES += -I$(PF_RING_HOME)/userland/lib -I$(PF_RING_HOME)/kernel
+endif
+ifdef USE_DPDK
+INCLUDES += -I"$(RTE_SDK)/build/include"
+endif
+
+ifdef HAS_PCAP_IMMEDIATE_MODE
+DEPS += -DHAS_PCAP_IMMEDIATE_MODE
+endif
+
+ifdef USE_DPDK
+FLAGS := -msse -msse2 -msse3 -mssse3
+endif
+
+Obj/%.o: src/%.cpp
+ @echo 'Building file: $<'
+ @$(G++) $(DEPS) $(INCLUDES) $(FLAGS) -O2 -g -Wall -c -fmessage-length=0 -MMD -MP -MF"$(@:Obj/%.o=Obj/%.d)" -MT"$(@:Obj/%.o=Obj/%.d)" -o "$@" "$<"
+
+CUR_TARGET := $(notdir $(shell pwd))
+
+.SILENT:
+
+all: Pcap++.lib
+
+start:
+ @echo '==> Building target: $(CUR_TARGET)'
+
+create-directories:
+ @$(MKDIR) -p Obj
+ @$(MKDIR) -p Lib
+
+Pcap++.lib: start create-directories $(OBJS_FILENAMES)
+ @cd $(LIGHT_PCAPNG_PCPP_HOME) && $(MAKE) light_pcapng_sources
+ @$(AR) -r "Lib/$(LIB_PREFIX)Pcap++$(LIB_EXT)" $(OBJS_FILENAMES) $(LIGHT_PCAPNG_OBJS_FILENAMES)
+ @echo 'Finished successfully building: $(CUR_TARGET)'
+ @echo ' '
+
+clean:
+ @cd $(LIGHT_PCAPNG_PCPP_HOME) && $(MAKE) clean
+ @$(RM) -rf ./Obj/*
+ @$(RM) -rf ./Lib/*
+ @echo 'Clean finished: $(CUR_TARGET)'