You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/01/10 11:11:46 UTC

[04/19] nifi-minifi-cpp git commit: MINIFICPP-342: MQTT extension

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPacket.c
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTPacket.c b/thirdparty/paho.mqtt.c/src/MQTTPacket.c
new file mode 100644
index 0000000..c21a432
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTPacket.c
@@ -0,0 +1,755 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2014 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Ian Craggs, Allan Stockdill-Mander - SSL updates
+ *    Ian Craggs - MQTT 3.1.1 support
+ *******************************************************************************/
+
+/**
+ * @file
+ * \brief functions to deal with reading and writing of MQTT packets from and to sockets
+ *
+ * Some other related functions are in the MQTTPacketOut module
+ */
+
+#include "MQTTPacket.h"
+#include "Log.h"
+#if !defined(NO_PERSISTENCE)
+	#include "MQTTPersistence.h"
+#endif
+#include "Messages.h"
+#include "StackTrace.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "Heap.h"
+
+#if !defined(min)
+#define min(A,B) ( (A) < (B) ? (A):(B))
+#endif
+
+/**
+ * List of the predefined MQTT v3 packet names, indexed by packet type code.
+ * Index 0 is "RESERVED"; MQTTPacket_name() indexes this table with values
+ * from 0 up to DISCONNECT.
+ */
+static const char *packet_names[] =
+{
+	"RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL",
+	"PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK",
+	"PINGREQ", "PINGRESP", "DISCONNECT"
+};
+
+/* non-static alias so the packet_names table can be reached from other modules */
+const char** MQTTClient_packet_names = packet_names;
+
+
+/**
+ * Looks up the printable name of an MQTT packet type.
+ * @param ptype packet code
+ * @return the matching entry of packet_names, or "UNKNOWN" when ptype is
+ * negative or greater than DISCONNECT
+ */
+const char* MQTTPacket_name(int ptype)
+{
+	if (ptype < 0 || ptype > DISCONNECT)
+		return "UNKNOWN";
+	return packet_names[ptype];
+}
+
+/**
+ * Array of functions to build packets, indexed according to packet code.
+ * A NULL entry means no parser is registered for that packet type;
+ * MQTTPacket_Factory() tests for NULL before calling through this table.
+ */
+pf new_packets[] =
+{
+	NULL,	/**< reserved */
+	NULL,	/**< CONNECT - no parser registered (MQTTPacket_connect) */
+	MQTTPacket_connack, /**< CONNACK */
+	MQTTPacket_publish,	/**< PUBLISH */
+	MQTTPacket_ack, /**< PUBACK */
+	MQTTPacket_ack, /**< PUBREC */
+	MQTTPacket_ack, /**< PUBREL */
+	MQTTPacket_ack, /**< PUBCOMP */
+	NULL, /**< SUBSCRIBE - no parser registered (MQTTPacket_subscribe) */
+	MQTTPacket_suback, /**< SUBACK */
+	NULL, /**< UNSUBSCRIBE - no parser registered (MQTTPacket_unsubscribe) */
+	MQTTPacket_ack, /**< UNSUBACK */
+	MQTTPacket_header_only, /**< PINGREQ */
+	MQTTPacket_header_only, /**< PINGRESP */
+	MQTTPacket_header_only  /**< DISCONNECT */
+};
+
+
+static char* readUTFlen(char** pptr, char* enddata, int* len);
+static int MQTTPacket_send_ack(int type, int msgid, int dup, networkHandles *net);
+
+/**
+ * Reads one MQTT packet from a socket.
+ * The fixed header byte is read into a function-static Header, so that
+ * storage is shared by every call to this function.
+ * @param net the network handles to read from (net->socket, read through
+ * net->ssl when built with OPENSSL and net->ssl is set)
+ * @param error pointer to the error code which is completed if no packet is returned;
+ * for a received QoS 2 PUBLISH it is overwritten with the MQTTPersistence_put()
+ * result even though a packet is returned
+ * @return the packet structure or NULL if there was an error
+ */
+void* MQTTPacket_Factory(networkHandles* net, int* error)
+{
+	char* data = NULL;
+	static Header header;
+	size_t remaining_length;
+	int ptype;
+	void* pack = NULL;
+	size_t actual_len = 0;
+
+	FUNC_ENTRY;
+	*error = SOCKET_ERROR;  /* indicate whether an error occurred, or not */
+
+	/* read the packet data from the socket */
+#if defined(OPENSSL)
+	*error = (net->ssl) ? SSLSocket_getch(net->ssl, net->socket, &header.byte) : Socket_getch(net->socket, &header.byte); 
+#else
+	*error = Socket_getch(net->socket, &header.byte);
+#endif
+	if (*error != TCPSOCKET_COMPLETE)   /* first byte is the header byte */
+		goto exit; /* packet not read, *error indicates whether SOCKET_ERROR occurred */
+
+	/* now read the remaining length, so we know how much more to read */
+	if ((*error = MQTTPacket_decode(net, &remaining_length)) != TCPSOCKET_COMPLETE)
+		goto exit; /* packet not read, *error indicates whether SOCKET_ERROR occurred */
+
+	/* now read the rest, the variable header and payload */
+#if defined(OPENSSL)
+	data = (net->ssl) ? SSLSocket_getdata(net->ssl, net->socket, remaining_length, &actual_len) : 
+						Socket_getdata(net->socket, remaining_length, &actual_len);
+#else
+	data = Socket_getdata(net->socket, remaining_length, &actual_len);
+#endif
+	if (data == NULL)
+	{
+		*error = SOCKET_ERROR;
+		goto exit; /* socket error */
+	}
+
+	/* fewer bytes than the remaining length arrived: report an interrupted read and build nothing */
+	if (actual_len != remaining_length)
+		*error = TCPSOCKET_INTERRUPTED;
+	else
+	{
+		ptype = header.bits.type;
+		/* a type with no parser is only logged: pack stays NULL and *error keeps
+		 * the TCPSOCKET_COMPLETE value left by the length decode above */
+		if (ptype < CONNECT || ptype > DISCONNECT || new_packets[ptype] == NULL)
+			Log(TRACE_MIN, 2, NULL, ptype);
+		else
+		{
+			if ((pack = (*new_packets[ptype])(header.byte, data, remaining_length)) == NULL)
+				*error = BAD_MQTT_PACKET;
+#if !defined(NO_PERSISTENCE)
+			else if (header.bits.type == PUBLISH && header.bits.qos == 2)
+			{
+				/* hand the received QoS 2 PUBLISH to MQTTPersistence_put(): buf holds the
+				 * header byte followed by the re-encoded remaining length.
+				 * NOTE(review): the malloc result is not checked before use */
+				int buf0len;
+				char *buf = malloc(10);
+				buf[0] = header.byte;
+				buf0len = 1 + MQTTPacket_encode(&buf[1], remaining_length);
+				*error = MQTTPersistence_put(net->socket, buf, buf0len, 1,
+					&data, &remaining_length, header.bits.type, ((Publish *)pack)->msgId, 1);
+				free(buf);
+			}
+#endif
+		}
+	}
+	/* lastReceived is refreshed only when a packet structure was built */
+	if (pack)
+		time(&(net->lastReceived));
+exit:
+	FUNC_EXIT_RC(*error);
+	return pack;
+}
+
+
+/**
+ * Sends an MQTT packet in one system call write
+ * @param net the network handles to write to (net->socket, written through
+ * net->ssl when built with OPENSSL and net->ssl is set)
+ * @param header the one-byte MQTT header
+ * @param buffer the rest of the buffer to write (not including remaining length)
+ * @param buflen the length of the data in buffer to be written
+ * @param freeData flag for buffer, passed by address as the last argument of the
+ * socket write call (the matching argument of MQTTPacket_sends is named frees)
+ * @return the completion code (TCPSOCKET_COMPLETE etc)
+ */
+int MQTTPacket_send(networkHandles* net, Header header, char* buffer, size_t buflen, int freeData)
+{
+	int rc;
+	size_t buf0len;
+	char *buf;
+
+	FUNC_ENTRY;
+	/* buf holds the header byte followed by the encoded remaining length.
+	 * NOTE(review): the malloc result is not checked before use */
+	buf = malloc(10);
+	buf[0] = header.byte;
+	buf0len = 1 + MQTTPacket_encode(&buf[1], buflen);
+#if !defined(NO_PERSISTENCE)
+	if (header.bits.type == PUBREL)
+	{
+		/* persist an outgoing PUBREL; its msgid is the first two bytes of buffer.
+		 * NOTE(review): the rc assigned here is overwritten by the socket write below */
+		char* ptraux = buffer;
+		int msgId = readInt(&ptraux);
+		rc = MQTTPersistence_put(net->socket, buf, buf0len, 1, &buffer, &buflen,
+			header.bits.type, msgId, 0);
+	}
+#endif
+
+#if defined(OPENSSL)
+	if (net->ssl)
+		rc = SSLSocket_putdatas(net->ssl, net->socket, buf, buf0len, 1, &buffer, &buflen, &freeData);
+	else
+#endif
+		rc = Socket_putdatas(net->socket, buf, buf0len, 1, &buffer, &buflen, &freeData);
+		
+	if (rc == TCPSOCKET_COMPLETE)
+		time(&(net->lastSent));
+	
+	/* buf is kept when the write was interrupted - presumably the socket layer
+	 * still refers to it; confirm against Socket_putdatas */
+	if (rc != TCPSOCKET_INTERRUPTED)
+	  free(buf);
+
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Sends an MQTT packet from multiple buffers in one system call write
+ * @param net the network handles to write to (net->socket, written through
+ * net->ssl when built with OPENSSL and net->ssl is set)
+ * @param header the one-byte MQTT header
+ * @param count the number of buffers
+ * @param buffers the rest of the buffers to write (not including remaining length)
+ * @param buflens the lengths of the data in the array of buffers to be written
+ * @param frees one flag per buffer, passed through unchanged to the socket write call
+ * @return the completion code (TCPSOCKET_COMPLETE etc)
+ */
+int MQTTPacket_sends(networkHandles* net, Header header, int count, char** buffers, size_t* buflens, int* frees)
+{
+	int i, rc;
+	size_t buf0len, total = 0;
+	char *buf;
+
+	FUNC_ENTRY;
+	/* buf holds the header byte followed by the encoded total length of all buffers.
+	 * NOTE(review): the malloc result is not checked before use */
+	buf = malloc(10);
+	buf[0] = header.byte;
+	for (i = 0; i < count; i++)
+		total += buflens[i];
+	buf0len = 1 + MQTTPacket_encode(&buf[1], total);
+#if !defined(NO_PERSISTENCE)
+	if (header.bits.type == PUBLISH && header.bits.qos != 0)
+	{   /* persist PUBLISH QoS1 and QoS2 */
+		/* the msgid is taken from buffers[2], matching the {topic length, topic,
+		 * msgid, payload} layout MQTTPacket_send_publish builds for QoS > 0.
+		 * NOTE(review): the rc assigned here is overwritten by the socket write below */
+		char *ptraux = buffers[2];
+		int msgId = readInt(&ptraux);
+		rc = MQTTPersistence_put(net->socket, buf, buf0len, count, buffers, buflens,
+			header.bits.type, msgId, 0);
+	}
+#endif
+#if defined(OPENSSL)
+	if (net->ssl)
+		rc = SSLSocket_putdatas(net->ssl, net->socket, buf, buf0len, count, buffers, buflens, frees);
+	else
+#endif
+		rc = Socket_putdatas(net->socket, buf, buf0len, count, buffers, buflens, frees);
+		
+	if (rc == TCPSOCKET_COMPLETE)
+		time(&(net->lastSent));
+	
+	/* buf is kept when the write was interrupted - presumably the socket layer
+	 * still refers to it; confirm against Socket_putdatas */
+	if (rc != TCPSOCKET_INTERRUPTED)
+	  free(buf);
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Encodes a length in the MQTT variable-length format: base-128 digits,
+ * least significant first, with the top bit set on every digit except the last.
+ * @param buf the buffer into which the encoded data is written
+ * @param length the length to be encoded
+ * @return the number of bytes written to buffer
+ */
+int MQTTPacket_encode(char* buf, size_t length)
+{
+	int count = 0;
+
+	FUNC_ENTRY;
+	for (;;)
+	{
+		char digit = length % 128;
+		length /= 128;
+		if (length == 0)
+		{
+			/* final digit: top bit left clear */
+			buf[count++] = digit;
+			break;
+		}
+		/* more digits follow, so flag this one with the continuation bit */
+		buf[count++] = digit | 0x80;
+	}
+	FUNC_EXIT_RC(count);
+	return count;
+}
+
+
+/**
+ * Decodes the message length according to the MQTT algorithm: each byte read
+ * contributes its low 7 bits, scaled by successive powers of 128, and a set
+ * top bit means another length byte follows.
+ * @param net the network handles to read the bytes from (net->socket, read
+ * through net->ssl when built with OPENSSL and net->ssl is set)
+ * @param value the decoded length returned
+ * @return the completion code of the last socket read (TCPSOCKET_COMPLETE when
+ * the whole length was decoded), or SOCKET_ERROR when more than
+ * MAX_NO_OF_REMAINING_LENGTH_BYTES length bytes are encountered
+ */
+int MQTTPacket_decode(networkHandles* net, size_t* value)
+{
+	int rc = SOCKET_ERROR;
+	char c;
+	int multiplier = 1;
+	int len = 0;
+#define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
+
+	FUNC_ENTRY;
+	*value = 0;
+	do
+	{
+		/* refuse to read a fifth length byte */
+		if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
+		{
+			rc = SOCKET_ERROR;	/* bad data */
+			goto exit;
+		}
+#if defined(OPENSSL)
+		rc = (net->ssl) ? SSLSocket_getch(net->ssl, net->socket, &c) : Socket_getch(net->socket, &c);
+#else
+		rc = Socket_getch(net->socket, &c);
+#endif
+		if (rc != TCPSOCKET_COMPLETE)
+				goto exit;
+		/* low 7 bits are the digit; the top bit (tested below) flags a following byte */
+		*value += (c & 127) * multiplier;
+		multiplier *= 128;
+	} while ((c & 128) != 0);
+exit:
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Reads a two-byte, most-significant-byte-first integer from the input buffer.
+ * No bounds checking is done here; the caller must know two bytes are available.
+ * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
+ * @return the integer value calculated
+ */
+int readInt(char** pptr)
+{
+	const unsigned char* bytes = (const unsigned char*)*pptr;
+	int value = (bytes[0] << 8) | bytes[1];
+
+	*pptr += 2;
+	return value;
+}
+
+
+/**
+ * Reads a "UTF" string from the input buffer.  UTF as in the MQTT v3 spec which really means
+ * a length delimited string.  So it reads the two byte length then the data according to
+ * that length.  The end of the buffer is provided too, so we can prevent buffer overruns caused
+ * by an incorrect length.
+ * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
+ * @param enddata pointer to the end of the buffer not to be read beyond
+ * @param len returns the calculated value of the length bytes read; it is left untouched
+ * when fewer than two bytes are available
+ * @return an allocated C string holding the characters read, or NULL if the length read would
+ * have caused an overrun or the string could not be allocated.  When NULL is returned after
+ * the length was read, *pptr has still been advanced past the two length bytes.
+ *
+ */
+static char* readUTFlen(char** pptr, char* enddata, int* len)
+{
+	char* string = NULL;
+
+	FUNC_ENTRY;
+	if (enddata - (*pptr) > 1) /* enough length to read the integer? */
+	{
+		*len = readInt(pptr);
+		/* compare lengths instead of forming &(*pptr)[*len]: with a bad length that
+		 * pointer would lie beyond the buffer, which is undefined behaviour */
+		if (enddata - (*pptr) >= *len)
+		{
+			string = malloc(*len+1);
+			if (string != NULL)
+			{
+				memcpy(string, *pptr, *len);
+				string[*len] = '\0';
+				*pptr += *len;
+			}
+		}
+	}
+	FUNC_EXIT;
+	return string;
+}
+
+
+/**
+ * Reads a length-delimited ("UTF" in MQTT v3 terms) string from the input buffer:
+ * a two byte length followed by that many bytes of data.  This is readUTFlen()
+ * with the decoded length discarded.
+ * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
+ * @param enddata pointer to the end of the buffer not to be read beyond
+ * @return an allocated C string holding the characters read, or NULL if the length read would
+ * have caused an overrun.
+ */
+char* readUTF(char** pptr, char* enddata)
+{
+	int ignored_len;
+
+	return readUTFlen(pptr, enddata, &ignored_len);
+}
+
+
+/**
+ * Reads a single byte from the input buffer and steps the buffer pointer past it.
+ * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
+ * @return the character read
+ */
+unsigned char readChar(char** pptr)
+{
+	unsigned char value = (unsigned char)*(*pptr)++;
+
+	return value;
+}
+
+
+/**
+ * Stores a single byte in an output buffer and steps the buffer pointer past it.
+ * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
+ * @param c the character to write
+ */
+void writeChar(char** pptr, char c)
+{
+	*(*pptr)++ = c;
+}
+
+
+/**
+ * Writes an integer to an output buffer as 2 bytes, most significant byte first.
+ * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
+ * @param anInt the integer to write
+ */
+void writeInt(char** pptr, int anInt)
+{
+	char* out = *pptr;
+
+	out[0] = (char)(anInt / 256);
+	out[1] = (char)(anInt % 256);
+	*pptr = out + 2;
+}
+
+
+/**
+ * Writes a C string to an output buffer in length-delimited ("UTF") form:
+ * a two byte length followed by the string's bytes, without the terminating NUL.
+ * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
+ * @param string the C string to write
+ */
+void writeUTF(char** pptr, const char* string)
+{
+	size_t nbytes = strlen(string);
+	char* out;
+
+	writeInt(pptr, (int)nbytes);
+	out = *pptr;
+	memcpy(out, string, nbytes);
+	*pptr = out + nbytes;
+}
+
+
+/**
+ * Writes a block of data to an output buffer, preceded by its two byte length.
+ * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
+ * @param data the data to write
+ * @param datalen the length of the data to write
+ */
+void writeData(char** pptr, const void* data, int datalen)
+{
+	char* out;
+
+	writeInt(pptr, datalen);
+	out = *pptr;
+	memcpy(out, data, datalen);
+	*pptr = out + datalen;
+}
+
+
+/**
+ * Function used in the new packets table to create packets which have only a header.
+ * The header byte is kept in a single function-static variable, so the result
+ * refers to static storage (not heap memory) that the next call overwrites.
+ * @param aHeader the MQTT header byte
+ * @param data the rest of the packet (not used)
+ * @param datalen the length of the rest of the packet (not used)
+ * @return pointer to the stored header byte
+ */
+void* MQTTPacket_header_only(unsigned char aHeader, char* data, size_t datalen)
+{
+	static unsigned char saved_header = 0;
+
+	saved_header = aHeader;
+	return &saved_header;
+}
+
+
+/**
+ * Send an MQTT disconnect packet down a socket.  The packet consists of the
+ * header alone: no buffer and a zero length are passed to MQTTPacket_send().
+ * @param net the network handles to send the data to
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE)
+ */
+int MQTTPacket_send_disconnect(networkHandles *net, const char* clientID)
+{
+	int rc;
+	Header header;
+
+	FUNC_ENTRY;
+	header.byte = 0;
+	header.bits.type = DISCONNECT;
+
+	rc = MQTTPacket_send(net, header, NULL, 0, 0);
+	Log(LOG_PROTOCOL, 28, NULL, net->socket, clientID, rc);
+
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Function used in the new packets table to create publish packets.
+ * The topic is copied into newly allocated storage; the payload is not copied,
+ * pack->payload points into the data buffer passed in.
+ * @param aHeader the MQTT header byte
+ * @param data the rest of the packet
+ * @param datalen the length of the rest of the packet
+ * @return pointer to the packet structure, or NULL if the topic could not be read,
+ * the packet is too short to hold the message id required for QoS 1 or 2, or the
+ * structure could not be allocated
+ */
+void* MQTTPacket_publish(unsigned char aHeader, char* data, size_t datalen)
+{
+	Publish* pack = malloc(sizeof(Publish));
+	char* curdata = data;
+	char* enddata = &data[datalen];
+
+	FUNC_ENTRY;
+	if (pack == NULL)
+		goto exit;
+	pack->header.byte = aHeader;
+	if ((pack->topic = readUTFlen(&curdata, enddata, &pack->topiclen)) == NULL) /* Topic name on which to publish */
+	{
+		free(pack);
+		pack = NULL;
+		goto exit;
+	}
+	if (pack->header.bits.qos > 0)  /* Msgid only exists for QoS 1 or 2 */
+	{
+		if (enddata - curdata < 2)  /* truncated packet: no room left for the msgid */
+		{
+			free(pack->topic);
+			free(pack);
+			pack = NULL;
+			goto exit;
+		}
+		pack->msgId = readInt(&curdata);
+	}
+	else
+		pack->msgId = 0;
+	/* whatever follows the topic (and msgid, if any) is the payload */
+	pack->payload = curdata;
+	pack->payloadlen = (int)(datalen-(curdata-data));
+exit:
+	FUNC_EXIT;
+	return pack;
+}
+
+
+/**
+ * Releases a publish packet: the topic string, when one is attached, and then
+ * the structure itself.  The payload pointer is not freed here.
+ * @param pack pointer to the publish packet structure
+ */
+void MQTTPacket_freePublish(Publish* pack)
+{
+	char* topic = pack->topic;
+
+	FUNC_ENTRY;
+	if (topic)
+		free(topic);
+	free(pack);
+	FUNC_EXIT;
+}
+
+
+/**
+ * Send an MQTT acknowledgement packet down a socket.  The packet body is just
+ * the two byte message id; for PUBREL the header QoS field is set to 1.
+ * @param type the MQTT packet type e.g. SUBACK
+ * @param msgid the MQTT message id to use
+ * @param dup boolean - whether to set the MQTT DUP flag
+ * @param net the network handle to send the data to
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE)
+ */
+static int MQTTPacket_send_ack(int type, int msgid, int dup, networkHandles *net)
+{
+	Header header;
+	int rc;
+	char *msgid_buf = malloc(2);
+	char *cursor = msgid_buf;
+
+	FUNC_ENTRY;
+	header.byte = 0;
+	header.bits.type = type;
+	header.bits.dup = dup;
+	if (type == PUBREL)
+		header.bits.qos = 1;
+	writeInt(&cursor, msgid);
+
+	rc = MQTTPacket_send(net, header, msgid_buf, 2, 1);
+	/* after an interrupted write the buffer is left alone rather than freed here */
+	if (rc != TCPSOCKET_INTERRUPTED)
+		free(msgid_buf);
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Send an MQTT PUBACK packet down a socket, and log the outcome.
+ * @param msgid the MQTT message id to use
+ * @param net the network handles to send the data to
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE)
+ */
+int MQTTPacket_send_puback(int msgid, networkHandles* net, const char* clientID)
+{
+	int rc;
+
+	FUNC_ENTRY;
+	rc = MQTTPacket_send_ack(PUBACK, msgid, 0, net);
+	Log(LOG_PROTOCOL, 12, NULL, net->socket, clientID, msgid, rc);
+
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Releases a suback packet: the list of QoS values, when one is attached
+ * (via ListFree), and then the structure itself.
+ * @param pack pointer to the suback packet structure
+ */
+void MQTTPacket_freeSuback(Suback* pack)
+{
+	List* granted = pack->qoss;
+
+	FUNC_ENTRY;
+	if (granted)
+		ListFree(granted);
+	free(pack);
+	FUNC_EXIT;
+}
+
+
+/**
+ * Send an MQTT PUBREC packet down a socket, and log the outcome.
+ * @param msgid the MQTT message id to use
+ * @param net the network handles to send the data to
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE)
+ */
+int MQTTPacket_send_pubrec(int msgid, networkHandles* net, const char* clientID)
+{
+	int rc;
+
+	FUNC_ENTRY;
+	rc = MQTTPacket_send_ack(PUBREC, msgid, 0, net);
+	Log(LOG_PROTOCOL, 13, NULL, net->socket, clientID, msgid, rc);
+
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Send an MQTT PUBREL packet down a socket, and log the outcome.
+ * @param msgid the MQTT message id to use
+ * @param dup boolean - whether to set the MQTT DUP flag
+ * @param net the network handles to send the data to
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE)
+ */
+int MQTTPacket_send_pubrel(int msgid, int dup, networkHandles* net, const char* clientID)
+{
+	int rc;
+
+	FUNC_ENTRY;
+	rc = MQTTPacket_send_ack(PUBREL, msgid, dup, net);
+	Log(LOG_PROTOCOL, 16, NULL, net->socket, clientID, msgid, rc);
+
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Send an MQTT PUBCOMP packet down a socket, and log the outcome.
+ * @param msgid the MQTT message id to use
+ * @param net the network handles to send the data to
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE)
+ */
+int MQTTPacket_send_pubcomp(int msgid, networkHandles* net, const char* clientID)
+{
+	int rc;
+
+	FUNC_ENTRY;
+	rc = MQTTPacket_send_ack(PUBCOMP, msgid, 0, net);
+	Log(LOG_PROTOCOL, 18, NULL, net->socket, clientID, msgid, rc);
+
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Function used in the new packets table to create acknowledgement packets.
+ * The message id is taken from the first two bytes of the packet data.
+ * @param aHeader the MQTT header byte
+ * @param data the rest of the packet
+ * @param datalen the length of the rest of the packet
+ * @return pointer to the packet structure, or NULL if the packet is too short to
+ * hold a message id or the structure could not be allocated
+ */
+void* MQTTPacket_ack(unsigned char aHeader, char* data, size_t datalen)
+{
+	Ack* pack = NULL;
+	char* curdata = data;
+
+	FUNC_ENTRY;
+	if (datalen < 2)  /* truncated packet: no room for the msgid, so do not read it */
+		goto exit;
+	if ((pack = malloc(sizeof(Ack))) == NULL)
+		goto exit;
+	pack->header.byte = aHeader;
+	pack->msgId = readInt(&curdata);
+exit:
+	FUNC_EXIT;
+	return pack;
+}
+
+
+/**
+ * Send an MQTT PUBLISH packet down a socket.
+ * The packet is sent as separate buffers: a two byte topic length, the topic,
+ * a two byte message id (QoS 1 and 2 only) and the payload.
+ * @param pack a structure from which to get some values to use, e.g topic, payload
+ * @param dup boolean - whether to set the MQTT DUP flag
+ * @param qos the value to use for the MQTT QoS setting
+ * @param retained boolean - whether to set the MQTT retained flag
+ * @param net the network handles to send the data to
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE)
+ */
+int MQTTPacket_send_publish(Publish* pack, int dup, int qos, int retained, networkHandles* net, const char* clientID)
+{
+	Header header;
+	char *topiclen;
+	int rc = -1;
+
+	FUNC_ENTRY;
+	topiclen = malloc(2);
+
+	/* start from a fully defined header byte, as the other send functions in this file do */
+	header.byte = 0;
+	header.bits.type = PUBLISH;
+	header.bits.dup = dup;
+	header.bits.qos = qos;
+	header.bits.retain = retained;
+	if (qos > 0)
+	{
+		char *buf = malloc(2);
+		char *ptr = buf;
+		char* bufs[4] = {topiclen, pack->topic, buf, pack->payload};
+		size_t lens[4] = {2, strlen(pack->topic), 2, pack->payloadlen};
+		int frees[4] = {1, 0, 1, 0};
+
+		/* buf carries the message id, topiclen the length of the topic */
+		writeInt(&ptr, pack->msgId);
+		ptr = topiclen;
+		writeInt(&ptr, (int)lens[1]);
+		rc = MQTTPacket_sends(net, header, 4, bufs, lens, frees);
+		/* after an interrupted write the buffer is left alone rather than freed here */
+		if (rc != TCPSOCKET_INTERRUPTED)
+			free(buf);
+	}
+	else
+	{
+		/* QoS 0: no message id buffer is sent */
+		char* ptr = topiclen;
+		char* bufs[3] = {topiclen, pack->topic, pack->payload};
+		size_t lens[3] = {2, strlen(pack->topic), pack->payloadlen};
+		int frees[3] = {1, 0, 0};
+
+		writeInt(&ptr, (int)lens[1]);
+		rc = MQTTPacket_sends(net, header, 3, bufs, lens, frees);
+	}
+	if (rc != TCPSOCKET_INTERRUPTED)
+		free(topiclen);
+	if (qos == 0)
+		Log(LOG_PROTOCOL, 27, NULL, net->socket, clientID, retained, rc);
+	else
+		Log(LOG_PROTOCOL, 10, NULL, net->socket, clientID, pack->msgId, qos, retained, rc,
+				min(20, pack->payloadlen), pack->payload);
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Free allocated storage for various packet types.  PUBLISH packets go through
+ * MQTTPacket_freePublish(); every other packet is released with a plain free().
+ * No dedicated handling of SUBSCRIBE or UNSUBSCRIBE packets is performed here.
+ * @param pack pointer to the packet structure, of any type
+ */
+void MQTTPacket_free_packet(MQTTPacket* pack)
+{
+	FUNC_ENTRY;
+	if (pack->header.bits.type != PUBLISH)
+		free(pack);
+	else
+		MQTTPacket_freePublish((Publish*)pack);
+	FUNC_EXIT;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPacket.h
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTPacket.h b/thirdparty/paho.mqtt.c/src/MQTTPacket.h
new file mode 100644
index 0000000..8bad955
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTPacket.h
@@ -0,0 +1,262 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2017 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Ian Craggs, Allan Stockdill-Mander - SSL updates
+ *    Ian Craggs - MQTT 3.1.1 support
+ *    Ian Craggs - big endian Linux reversed definition
+ *******************************************************************************/
+
+#if !defined(MQTTPACKET_H)
+#define MQTTPACKET_H
+
+#include "Socket.h"
+#if defined(OPENSSL)
+#include "SSLSocket.h"
+#endif
+#include "LinkedList.h"
+#include "Clients.h"
+
+/*BE
+include "Socket"
+include "LinkedList"
+include "Clients"
+BE*/
+
+/* project-local boolean type; used as the base type of the one-bit flag fields below */
+typedef unsigned int bool;
+/* packet builder signature: (header byte, packet data, data length) -> packet structure */
+typedef void* (*pf)(unsigned char, char*, size_t);
+
+/* error code for a packet whose contents could not be parsed */
+#define BAD_MQTT_PACKET -4
+
+/**
+ * MQTT packet type codes, numbered consecutively from CONNECT (1) to DISCONNECT.
+ */
+enum msgTypes
+{
+	CONNECT = 1, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL,
+	PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK,
+	PINGREQ, PINGRESP, DISCONNECT
+};
+
+#if defined(__linux__)
+#include <endian.h>
+#if __BYTE_ORDER == __BIG_ENDIAN
+	#define REVERSED 1
+#endif
+#endif
+
+/**
+ * Bitfields for the MQTT header byte.
+ * The same byte can be accessed whole (byte) or field by field (bits).  The
+ * declaration order of the bit-fields is flipped when REVERSED is defined.
+ */
+typedef union
+{
+	/*unsigned*/ char byte;	/**< the whole byte */
+#if defined(REVERSED)
+	struct
+	{
+		unsigned int type : 4;	/**< message type nibble */
+		bool dup : 1;			/**< DUP flag bit */
+		unsigned int qos : 2;	/**< QoS value, 0, 1 or 2 */
+		bool retain : 1;		/**< retained flag bit */
+	} bits;
+#else
+	struct
+	{
+		bool retain : 1;		/**< retained flag bit */
+		unsigned int qos : 2;	/**< QoS value, 0, 1 or 2 */
+		bool dup : 1;			/**< DUP flag bit */
+		unsigned int type : 4;	/**< message type nibble */
+	} bits;
+#endif
+} Header;
+
+
+/**
+ * Data for a connect packet.
+ * The connect flags can be accessed as one byte (flags.all) or field by field
+ * (flags.bits); the declaration order of the bit-fields is flipped when
+ * REVERSED is defined.
+ */
+typedef struct
+{
+	Header header;	/**< MQTT header byte */
+	union
+	{
+		unsigned char all;	/**< all connect flags */
+#if defined(REVERSED)
+		struct
+		{
+			bool username : 1;			/**< 3.1 user name */
+			bool password : 1; 			/**< 3.1 password */
+			bool willRetain : 1;		/**< will retain setting */
+			unsigned int willQoS : 2;	/**< will QoS value */
+			bool will : 1;			/**< will flag */
+			bool cleanstart : 1;	/**< cleansession flag */
+			int : 1;	/**< unused */
+		} bits;
+#else
+		struct
+		{
+			int : 1;	/**< unused */
+			bool cleanstart : 1;	/**< cleansession flag */
+			bool will : 1;			/**< will flag */
+			unsigned int willQoS : 2;	/**< will QoS value */
+			bool willRetain : 1;		/**< will retain setting */
+			bool password : 1; 			/**< 3.1 password */
+			bool username : 1;			/**< 3.1 user name */
+		} bits;
+#endif
+	} flags;	/**< connect flags byte */
+
+	char *Protocol, /**< MQTT protocol name */
+		*clientID,	/**< string client id */
+        *willTopic,	/**< will topic */
+        *willMsg;	/**< will payload */
+
+	int keepAliveTimer;		/**< keepalive timeout value in seconds */
+	unsigned char version;	/**< MQTT version number */
+} Connect;
+
+
+/**
+ * Data for a connack packet.
+ * The connack flags can be accessed as one byte (flags.all) or field by field
+ * (flags.bits); the declaration order of the bit-fields is flipped when
+ * REVERSED is defined.
+ */
+typedef struct
+{
+	Header header; /**< MQTT header byte */
+	union
+	{
+		unsigned char all;	/**< all connack flags */
+#if defined(REVERSED)
+		struct
+		{
+			unsigned int reserved : 7;	/**< the seven reserved flag bits */
+			bool sessionPresent : 1;    /**< was a session found on the server? */
+		} bits;
+#else
+		struct
+		{
+			bool sessionPresent : 1;    /**< was a session found on the server? */
+			unsigned int reserved : 7;	/**< the seven reserved flag bits */
+		} bits;
+#endif
+	} flags;	 /**< connack flags byte */
+	char rc; /**< connack return code */
+} Connack;
+
+
+/**
+ * Data for a packet with header only.
+ * MQTTPacket_free_packet() also takes a pointer of this type as a generic view
+ * of a packet: it reads header.bits.type to decide how to free it.
+ */
+typedef struct
+{
+	Header header;	/**< MQTT header byte */
+} MQTTPacket;
+
+
+/**
+ * Data for a subscribe packet.
+ * Like the other packet structures in this header, it begins with the Header member.
+ */
+typedef struct
+{
+	Header header;	/**< MQTT header byte */
+	int msgId;		/**< MQTT message id */
+	List* topics;	/**< list of topic strings */
+	List* qoss;		/**< list of corresponding QoSs */
+	int noTopics;	/**< topic and qos count */
+} Subscribe;
+
+
+/**
+ * Data for a suback packet.
+ * Released with MQTTPacket_freeSuback(), which frees the qoss list as well.
+ */
+typedef struct
+{
+	Header header;	/**< MQTT header byte */
+	int msgId;		/**< MQTT message id */
+	List* qoss;		/**< list of granted QoSs */
+} Suback;
+
+
+/**
+ * Data for an unsubscribe packet.
+ * Like the other packet structures in this header, it begins with the Header member.
+ */
+typedef struct
+{
+	Header header;	/**< MQTT header byte */
+	int msgId;		/**< MQTT message id */
+	List* topics;	/**< list of topic strings */
+	int noTopics;	/**< topic count */
+} Unsubscribe;
+
+
+/**
+ * Data for a publish packet.
+ * Released with MQTTPacket_freePublish(), which frees the topic but not the payload.
+ */
+typedef struct
+{
+	Header header;	/**< MQTT header byte */
+	char* topic;	/**< topic string */
+	int topiclen;	/**< topic length, as read from the packet's two byte length prefix */
+	int msgId;		/**< MQTT message id */
+	char* payload;	/**< binary payload, length delimited */
+	int payloadlen;	/**< payload length */
+} Publish;
+
+
+/**
+ * Data for one of the ack packets.
+ * The same layout serves every acknowledgement type that carries only a
+ * message id; the typedefs below name each of them.
+ */
+typedef struct
+{
+	Header header;	/**< MQTT header byte */
+	int msgId;		/**< MQTT message id */
+} Ack;
+
+typedef Ack Puback;
+typedef Ack Pubrec;
+typedef Ack Pubrel;
+typedef Ack Pubcomp;
+typedef Ack Unsuback;
+
+int MQTTPacket_encode(char* buf, size_t length);
+int MQTTPacket_decode(networkHandles* net, size_t* value);
+int readInt(char** pptr);
+char* readUTF(char** pptr, char* enddata);
+unsigned char readChar(char** pptr);
+void writeChar(char** pptr, char c);
+void writeInt(char** pptr, int anInt);
+void writeUTF(char** pptr, const char* string);
+void writeData(char** pptr, const void* data, int datalen);
+
+const char* MQTTPacket_name(int ptype);
+
+void* MQTTPacket_Factory(networkHandles* net, int* error);
+int MQTTPacket_send(networkHandles* net, Header header, char* buffer, size_t buflen, int free);
+int MQTTPacket_sends(networkHandles* net, Header header, int count, char** buffers, size_t* buflens, int* frees);
+
+void* MQTTPacket_header_only(unsigned char aHeader, char* data, size_t datalen);
+int MQTTPacket_send_disconnect(networkHandles* net, const char* clientID);
+
+void* MQTTPacket_publish(unsigned char aHeader, char* data, size_t datalen);
+void MQTTPacket_freePublish(Publish* pack);
+int MQTTPacket_send_publish(Publish* pack, int dup, int qos, int retained, networkHandles* net, const char* clientID);
+int MQTTPacket_send_puback(int msgid, networkHandles* net, const char* clientID);
+void* MQTTPacket_ack(unsigned char aHeader, char* data, size_t datalen);
+
+void MQTTPacket_freeSuback(Suback* pack);
+int MQTTPacket_send_pubrec(int msgid, networkHandles* net, const char* clientID);
+int MQTTPacket_send_pubrel(int msgid, int dup, networkHandles* net, const char* clientID);
+int MQTTPacket_send_pubcomp(int msgid, networkHandles* net, const char* clientID);
+
+void MQTTPacket_free_packet(MQTTPacket* pack);
+
+#if !defined(NO_BRIDGE)
+	#include "MQTTPacketOut.h"
+#endif
+
+#endif /* MQTTPACKET_H */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPacketOut.c
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTPacketOut.c b/thirdparty/paho.mqtt.c/src/MQTTPacketOut.c
new file mode 100644
index 0000000..b924085
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTPacketOut.c
@@ -0,0 +1,269 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2017 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Ian Craggs, Allan Stockdill-Mander - SSL updates
+ *    Ian Craggs - MQTT 3.1.1 support
+ *    Rong Xiang, Ian Craggs - C++ compatibility
+ *    Ian Craggs - binary password and will payload
+ *******************************************************************************/
+
+/**
+ * @file
+ * \brief functions to deal with reading and writing of MQTT packets from and to sockets
+ *
+ * Some other related functions are in the MQTTPacket module
+ */
+
+
+#include "MQTTPacketOut.h"
+#include "Log.h"
+#include "StackTrace.h"
+
+#include <string.h>
+#include <stdlib.h>
+
+#include "Heap.h"
+
+
+/**
+ * Send an MQTT CONNECT packet down a socket.
+ * @param client a structure from which to get all the required values
+ * @param MQTTVersion the MQTT version to connect with: 3 (protocol name "MQIsdp") or
+ * 4 (protocol name "MQTT"); any other value makes the call fail without sending
+ * @return the completion code from MQTTPacket_send (e.g. TCPSOCKET_COMPLETE), or -1 if
+ * the version is not supported or the packet buffer could not be allocated
+ */
+int MQTTPacket_send_connect(Clients* client, int MQTTVersion)
+{
+	char *buf = NULL, *ptr;
+	Connect packet;
+	int rc = -1, len;
+
+	FUNC_ENTRY;
+	/* reject unknown protocol versions before any memory is allocated */
+	if (MQTTVersion != 3 && MQTTVersion != 4)
+		goto exit;
+
+	packet.header.byte = 0;
+	packet.header.bits.type = CONNECT;
+
+	/* protocol name + version + flags + keep alive take 12 bytes for version 3 and 10 for
+	 * version 4; every string or binary field that follows carries a 2 byte length prefix */
+	len = ((MQTTVersion == 3) ? 12 : 10) + (int)strlen(client->clientID)+2;
+	if (client->will)
+		len += (int)strlen(client->will->topic)+2 + client->will->payloadlen+2;
+	if (client->username)
+		len += (int)strlen(client->username)+2;
+	if (client->password)
+		len += client->passwordlen+2;
+
+	ptr = buf = malloc(len);
+	if (buf == NULL)
+		goto exit;
+
+	if (MQTTVersion == 3)
+	{
+		writeUTF(&ptr, "MQIsdp");
+		writeChar(&ptr, (char)3);
+	}
+	else /* MQTTVersion == 4 */
+	{
+		writeUTF(&ptr, "MQTT");
+		writeChar(&ptr, (char)4);
+	}
+
+	packet.flags.all = 0;
+	packet.flags.bits.cleanstart = client->cleansession;
+	packet.flags.bits.will = (client->will) ? 1 : 0;
+	if (packet.flags.bits.will)
+	{
+		packet.flags.bits.willQoS = client->will->qos;
+		packet.flags.bits.willRetain = client->will->retained;
+	}
+
+	if (client->username)
+		packet.flags.bits.username = 1;
+	if (client->password)
+		packet.flags.bits.password = 1;
+
+	/* the fields below must be written in the same order as they were counted in len */
+	writeChar(&ptr, packet.flags.all);
+	writeInt(&ptr, client->keepAliveInterval);
+	writeUTF(&ptr, client->clientID);
+	if (client->will)
+	{
+		writeUTF(&ptr, client->will->topic);
+		writeData(&ptr, client->will->payload, client->will->payloadlen);
+	}
+	if (client->username)
+		writeUTF(&ptr, client->username);
+	if (client->password)
+		writeData(&ptr, client->password, client->passwordlen);
+
+	rc = MQTTPacket_send(&client->net, packet.header, buf, len, 1);
+	Log(LOG_PROTOCOL, 0, NULL, client->net.socket, client->clientID, client->cleansession, rc);
+exit:
+	/* after an interrupted send the buffer is left alone - presumably the socket layer
+	 * still needs it to finish the write (confirm against MQTTPacket_send) */
+	if (buf != NULL && rc != TCPSOCKET_INTERRUPTED)
+		free(buf);
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Function used in the new packets table to create connack packets.
+ * @param aHeader the MQTT header byte
+ * @param data the rest of the packet
+ * @param datalen the length of the rest of the packet
+ * @return pointer to the newly allocated packet structure, or NULL if the packet is
+ * shorter than the two bytes read here (flags and return code) or memory is exhausted
+ */
+void* MQTTPacket_connack(unsigned char aHeader, char* data, size_t datalen)
+{
+	Connack* pack = NULL;
+	char* curdata = data;
+
+	FUNC_ENTRY;
+	/* a CONNACK body holds one flags byte and one return code byte */
+	if (datalen < 2)
+		goto exit;
+	if ((pack = malloc(sizeof(Connack))) == NULL)
+		goto exit;
+	pack->header.byte = aHeader;
+	pack->flags.all = readChar(&curdata);
+	pack->rc = readChar(&curdata);
+exit:
+	FUNC_EXIT;
+	return pack;
+}
+
+
+/**
+ * Send an MQTT PINGREQ packet down a socket.
+ * @param net the network handles of the open connection to send the data to
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE)
+ */
+int MQTTPacket_send_pingreq(networkHandles* net, const char* clientID)
+{
+	Header pingHeader;
+	int rc;
+
+	FUNC_ENTRY;
+	pingHeader.byte = 0;
+	pingHeader.bits.type = PINGREQ;
+	/* a PINGREQ consists of the fixed header only: no buffer, zero length, nothing to free */
+	rc = MQTTPacket_send(net, pingHeader, NULL, (size_t)0, 0);
+	Log(LOG_PROTOCOL, 20, NULL, net->socket, clientID, rc);
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Send an MQTT subscribe packet down a socket.
+ * @param topics list of topics (NUL terminated strings)
+ * @param qoss list of corresponding QoSs (pointers to int), at least as long as topics
+ * @param msgid the MQTT message id to use
+ * @param dup boolean - whether to set the MQTT DUP flag
+ * @param net the network handles of the open connection to send the data to
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE), or -1 if there are fewer QoS
+ * entries than topics or the packet buffer could not be allocated
+ */
+int MQTTPacket_send_subscribe(List* topics, List* qoss, int msgid, int dup, networkHandles* net, const char* clientID)
+{
+	Header header;
+	char *data = NULL, *ptr;
+	int rc = -1;
+	ListElement *elem = NULL, *qosElem = NULL;
+	int datalen;
+
+	FUNC_ENTRY;
+	/* each topic needs its own QoS entry, otherwise the packet cannot be built */
+	if (qoss->count < topics->count)
+		goto exit;
+
+	header.byte = 0;
+	header.bits.type = SUBSCRIBE;
+	header.bits.dup = dup;
+	header.bits.qos = 1;
+	header.bits.retain = 0;
+
+	datalen = 2 + topics->count * 3; /* utf length + char qos == 3 */
+	while (ListNextElement(topics, &elem))
+		datalen += (int)strlen((char*)(elem->content));
+	ptr = data = malloc(datalen);
+	if (data == NULL)
+		goto exit;
+
+	writeInt(&ptr, msgid);
+	elem = NULL;
+	while (ListNextElement(topics, &elem))
+	{
+		/* walk the QoS list in step with the topic list */
+		ListNextElement(qoss, &qosElem);
+		writeUTF(&ptr, (char*)(elem->content));
+		writeChar(&ptr, *(int*)(qosElem->content));
+	}
+	rc = MQTTPacket_send(net, header, data, datalen, 1);
+	Log(LOG_PROTOCOL, 22, NULL, net->socket, clientID, msgid, rc);
+	/* after an interrupted send the buffer is left alone - presumably it is still
+	 * needed to complete the write (confirm against MQTTPacket_send) */
+	if (rc != TCPSOCKET_INTERRUPTED)
+		free(data);
+exit:
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Function used in the new packets table to create suback packets.
+ * @param aHeader the MQTT header byte
+ * @param data the rest of the packet
+ * @param datalen the length of the rest of the packet
+ * @return pointer to the newly allocated packet structure, or NULL if the packet is too
+ * short to hold a message id or memory is exhausted
+ */
+void* MQTTPacket_suback(unsigned char aHeader, char* data, size_t datalen)
+{
+	Suback* pack = NULL;
+	char* curdata = data;
+
+	FUNC_ENTRY;
+	/* the message id read below occupies the first two bytes */
+	if (datalen < 2)
+		goto exit;
+	if ((pack = malloc(sizeof(Suback))) == NULL)
+		goto exit;
+	pack->header.byte = aHeader;
+	pack->msgId = readInt(&curdata);
+	if ((pack->qoss = ListInitialize()) == NULL)
+	{
+		free(pack);
+		pack = NULL;
+		goto exit;
+	}
+	/* every remaining byte is one granted QoS, stored as a separately allocated int */
+	while ((size_t)(curdata - data) < datalen)
+	{
+		int* newint;
+		newint = malloc(sizeof(int));
+		if (newint == NULL)
+		{
+			/* NOTE(review): relies on MQTTPacket_freeSuback releasing the list and
+			 * the packet - confirm against its definition in MQTTPacket.c */
+			MQTTPacket_freeSuback(pack);
+			pack = NULL;
+			goto exit;
+		}
+		*newint = (int)readChar(&curdata);
+		ListAppend(pack->qoss, newint, sizeof(int));
+	}
+exit:
+	FUNC_EXIT;
+	return pack;
+}
+
+
+/**
+ * Send an MQTT unsubscribe packet down a socket.
+ * @param topics list of topics (NUL terminated strings)
+ * @param msgid the MQTT message id to use
+ * @param dup boolean - whether to set the MQTT DUP flag
+ * @param net the network handles of the open connection to send the data to
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE), or -1 if the packet buffer
+ * could not be allocated
+ */
+int MQTTPacket_send_unsubscribe(List* topics, int msgid, int dup, networkHandles* net, const char* clientID)
+{
+	Header header;
+	char *data = NULL, *ptr;
+	int rc = -1;
+	ListElement *elem = NULL;
+	int datalen;
+
+	FUNC_ENTRY;
+	header.byte = 0;
+	header.bits.type = UNSUBSCRIBE;
+	header.bits.dup = dup;
+	header.bits.qos = 1;
+	header.bits.retain = 0;
+
+	datalen = 2 + topics->count * 2; /* utf length == 2 */
+	while (ListNextElement(topics, &elem))
+		datalen += (int)strlen((char*)(elem->content));
+	ptr = data = malloc(datalen);
+	if (data == NULL)
+		goto exit;
+
+	writeInt(&ptr, msgid);
+	elem = NULL;
+	while (ListNextElement(topics, &elem))
+		writeUTF(&ptr, (char*)(elem->content));
+	rc = MQTTPacket_send(net, header, data, datalen, 1);
+	Log(LOG_PROTOCOL, 25, NULL, net->socket, clientID, msgid, rc);
+	/* after an interrupted send the buffer is left alone - presumably it is still
+	 * needed to complete the write (confirm against MQTTPacket_send) */
+	if (rc != TCPSOCKET_INTERRUPTED)
+		free(data);
+exit:
+	FUNC_EXIT_RC(rc);
+	return rc;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPacketOut.h
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTPacketOut.h b/thirdparty/paho.mqtt.c/src/MQTTPacketOut.h
new file mode 100644
index 0000000..700db77
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTPacketOut.h
@@ -0,0 +1,34 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2014 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Ian Craggs, Allan Stockdill-Mander - SSL updates
+ *    Ian Craggs - MQTT 3.1.1 support
+ *******************************************************************************/
+
+#if !defined(MQTTPACKETOUT_H)
+#define MQTTPACKETOUT_H
+
+#include "MQTTPacket.h"
+
+int MQTTPacket_send_connect(Clients* client, int MQTTVersion);
+void* MQTTPacket_connack(unsigned char aHeader, char* data, size_t datalen);
+
+int MQTTPacket_send_pingreq(networkHandles* net, const char* clientID);
+
+int MQTTPacket_send_subscribe(List* topics, List* qoss, int msgid, int dup, networkHandles* net, const char* clientID);
+void* MQTTPacket_suback(unsigned char aHeader, char* data, size_t datalen);
+
+int MQTTPacket_send_unsubscribe(List* topics, int msgid, int dup, networkHandles* net, const char* clientID);
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPersistence.c
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTPersistence.c b/thirdparty/paho.mqtt.c/src/MQTTPersistence.c
new file mode 100644
index 0000000..24efb6d
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTPersistence.c
@@ -0,0 +1,654 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Ian Craggs - async client updates
+ *    Ian Craggs - fix for bug 432903 - queue persistence
+ *******************************************************************************/
+
+/**
+ * @file
+ * \brief Functions that apply to persistence operations.
+ *
+ */
+
+#include <stdio.h>
+#include <string.h>
+
+#include "MQTTPersistence.h"
+#include "MQTTPersistenceDefault.h"
+#include "MQTTProtocolClient.h"
+#include "Heap.h"
+
+
+static MQTTPersistence_qEntry* MQTTPersistence_restoreQueueEntry(char* buffer, size_t buflen);
+static void MQTTPersistence_insertInSeqOrder(List* list, MQTTPersistence_qEntry* qEntry, size_t size);
+
+/**
+ * Creates a ::MQTTClient_persistence structure representing a persistence implementation.
+ * @param persistence the ::MQTTClient_persistence structure.
+ * @param type the type of the persistence implementation. See ::MQTTClient_create.
+ * @param pcontext the context for this persistence implementation. See ::MQTTClient_create.
+ * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
+ */
+#include "StackTrace.h"
+
+int MQTTPersistence_create(MQTTClient_persistence** persistence, int type, void* pcontext)
+{
+	int rc = 0;
+	MQTTClient_persistence* per = NULL;
+
+	FUNC_ENTRY;
+#if !defined(NO_PERSISTENCE)
+	switch (type)
+	{
+		case MQTTCLIENT_PERSISTENCE_NONE :
+			per = NULL;
+			break;
+		case MQTTCLIENT_PERSISTENCE_DEFAULT :
+			per = malloc(sizeof(MQTTClient_persistence));
+			if ( per != NULL )
+			{
+				/* Always keep a private heap copy of the directory name, even for the
+				 * default, so that MQTTPersistence_close can release it unconditionally. */
+				const char* directory = (pcontext != NULL) ? (const char*)pcontext : ".";  /* working directory */
+
+				per->context = malloc(strlen(directory) + 1);
+				if ( per->context == NULL )
+				{
+					free(per);
+					per = NULL;
+					rc = MQTTCLIENT_PERSISTENCE_ERROR;
+					break;
+				}
+				strcpy(per->context, directory);
+				/* file system functions */
+				per->popen        = pstopen;
+				per->pclose       = pstclose;
+				per->pput         = pstput;
+				per->pget         = pstget;
+				per->premove      = pstremove;
+				per->pkeys        = pstkeys;
+				per->pclear       = pstclear;
+				per->pcontainskey = pstcontainskey;
+			}
+			else
+				rc = MQTTCLIENT_PERSISTENCE_ERROR;
+			break;
+		case MQTTCLIENT_PERSISTENCE_USER :
+			/* the caller supplies the whole structure; it is only accepted if the
+			 * context and every callback are set */
+			per = (MQTTClient_persistence *)pcontext;
+			if ( per == NULL || (per != NULL && (per->context == NULL || per->pclear == NULL ||
+				per->pclose == NULL || per->pcontainskey == NULL || per->pget == NULL || per->pkeys == NULL ||
+				per->popen == NULL || per->pput == NULL || per->premove == NULL)) )
+				rc = MQTTCLIENT_PERSISTENCE_ERROR;
+			break;
+		default:
+			rc = MQTTCLIENT_PERSISTENCE_ERROR;
+			break;
+	}
+#endif
+
+	*persistence = per;
+
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Open persistent store and restore any persisted messages.
+ * Does nothing if the client has no persistence implementation.
+ * @param c the client as ::Clients.
+ * @param serverURI the URI of the remote end.
+ * @return 0 if success, otherwise the non-zero code returned by the persistence
+ * open callback or by ::MQTTPersistence_restore.
+ */
+int MQTTPersistence_initialize(Clients *c, const char *serverURI)
+{
+	int rc = 0;
+
+	FUNC_ENTRY;
+	if ( c->persistence != NULL )
+	{
+		rc = c->persistence->popen(&(c->phandle), c->clientID, serverURI, c->persistence->context);
+		if ( rc == 0 )
+			rc = MQTTPersistence_restore(c);
+	}
+
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Close persistent store and detach it from the client.
+ * For the default (file system) implementation the structure and its context string,
+ * both allocated by ::MQTTPersistence_create, are released; a user supplied
+ * structure is left untouched.
+ * @param c the client as ::Clients.
+ * @return 0 if success, otherwise the non-zero code returned by the close callback.
+ */
+int MQTTPersistence_close(Clients *c)
+{
+	int rc =0;
+
+	FUNC_ENTRY;
+	if (c->persistence != NULL)
+	{
+		rc = c->persistence->pclose(c->phandle);
+		c->phandle = NULL;
+#if !defined(NO_PERSISTENCE)
+		/* the default implementation is recognised by its open callback */
+		if ( c->persistence->popen == pstopen )
+		{
+			if ( c->persistence->context != NULL )
+				free(c->persistence->context);
+			free(c->persistence);
+		}
+#endif
+		c->persistence = NULL;
+	}
+
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+/**
+ * Clears the persistent store.
+ * Does nothing (and reports success) when the client has no persistence implementation.
+ * @param c the client as ::Clients.
+ * @return 0 if success, otherwise the code returned by the persistence clear callback.
+ */
+int MQTTPersistence_clear(Clients *c)
+{
+	int rc = 0;
+	MQTTClient_persistence* store = NULL;
+
+	FUNC_ENTRY;
+	store = c->persistence;
+	if (store != NULL)
+	{
+		rc = store->pclear(c->phandle);
+	}
+
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Restores the persisted records to the outbound and inbound message queues of the
+ * client.
+ * Keys starting with the command or queue stems are skipped here. Records that cannot
+ * be turned back into a packet are removed from the store.
+ * @param c the client as ::Clients.
+ * @return 0 if success, otherwise the first non-zero code returned by a persistence callback.
+ */
+int MQTTPersistence_restore(Clients *c)
+{
+	int rc = 0;
+	char **msgkeys = NULL,
+		 *buffer = NULL;
+	int nkeys, buflen;
+	int i = 0;
+	int msgs_sent = 0;
+	int msgs_rcvd = 0;
+
+	FUNC_ENTRY;
+	if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0)
+	{
+		while (rc == 0 && i < nkeys)
+		{
+			if (strncmp(msgkeys[i], PERSISTENCE_COMMAND_KEY, strlen(PERSISTENCE_COMMAND_KEY)) == 0)
+			{
+				; /* async client command: not handled by this function */
+			}
+			else if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) == 0)
+			{
+				; /* queued message: see MQTTPersistence_restoreMessageQueue */
+			}
+			else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0)
+			{
+				MQTTPacket* pack = MQTTPersistence_restorePacket(buffer, buflen);
+				if ( pack != NULL )
+				{
+					/* NOTE(review): a packet whose key matches none of the three stems
+					 * below is never released - confirm whether that can happen */
+					if ( strstr(msgkeys[i],PERSISTENCE_PUBLISH_RECEIVED) != NULL )
+					{
+						Publish* publish = (Publish*)pack;
+						Messages* msg = NULL;
+						msg = MQTTProtocol_createMessage(publish, &msg, publish->header.bits.qos, publish->header.bits.retain);
+						msg->nextMessageType = PUBREL;
+						/* order does not matter for persisted received messages */
+						ListAppend(c->inboundMsgs, msg, msg->len);
+						/* clear the topic before freeing the packet - presumably the
+						 * message now owns it (confirm in MQTTProtocol_createMessage) */
+						publish->topic = NULL;
+						MQTTPacket_freePublish(publish);
+						msgs_rcvd++;
+					}
+					else if ( strstr(msgkeys[i],PERSISTENCE_PUBLISH_SENT) != NULL )
+					{
+						Publish* publish = (Publish*)pack;
+						Messages* msg = NULL;
+						char *key = malloc(MESSAGE_FILENAME_LENGTH + 1);
+						sprintf(key, "%s%d", PERSISTENCE_PUBREL, publish->msgId);
+						msg = MQTTProtocol_createMessage(publish, &msg, publish->header.bits.qos, publish->header.bits.retain);
+						if ( c->persistence->pcontainskey(c->phandle, key) == 0 )
+							/* PUBLISH Qo2 and PUBREL sent */
+							msg->nextMessageType = PUBCOMP;
+						/* else: PUBLISH QoS1, or PUBLISH QoS2 and PUBREL not sent */
+						/* retry at the first opportunity */
+						msg->lastTouch = 0;
+						MQTTPersistence_insertInOrder(c->outboundMsgs, msg, msg->len);
+						publish->topic = NULL;
+						MQTTPacket_freePublish(publish);
+						free(key);
+						msgs_sent++;
+					}
+					else if ( strstr(msgkeys[i],PERSISTENCE_PUBREL) != NULL )
+					{
+						/* orphaned PUBRELs ? */
+						Pubrel* pubrel = (Pubrel*)pack;
+						char *key = malloc(MESSAGE_FILENAME_LENGTH + 1);
+						sprintf(key, "%s%d", PERSISTENCE_PUBLISH_SENT, pubrel->msgId);
+						/* a PUBREL without its PUBLISH record is dropped from the store */
+						if ( c->persistence->pcontainskey(c->phandle, key) != 0 )
+							rc = c->persistence->premove(c->phandle, msgkeys[i]);
+						free(pubrel);
+						free(key);
+					}
+				}
+				else  /* pack == NULL -> bad persisted record */
+					rc = c->persistence->premove(c->phandle, msgkeys[i]);
+			}
+			if (buffer)
+			{
+				free(buffer);
+				buffer = NULL;
+			}
+			if (msgkeys[i])
+				free(msgkeys[i]);
+			i++;
+		}
+		/* when the loop stops early because of an error, the keys that were not
+		 * visited still have to be released */
+		for (; i < nkeys; i++)
+		{
+			if (msgkeys[i])
+				free(msgkeys[i]);
+		}
+		if (msgkeys)
+			free(msgkeys);
+	}
+	Log(TRACE_MINIMUM, -1, "%d sent messages and %d received messages restored for client %s\n", 
+		msgs_sent, msgs_rcvd, c->clientID);
+	MQTTPersistence_wrapMsgID(c);
+
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Returns a MQTT packet restored from persisted data.
+ * @param buffer the persisted data: the fixed header byte, the encoded remaining
+ * length, then the rest of the packet.
+ * @param buflen the number of bytes of the data buffer.
+ * @return the packet built by the matching entry of the new packets table, or NULL if
+ * the record is truncated, its length field is malformed or does not match buflen, or
+ * there is no constructor for the packet type.
+ */
+void* MQTTPersistence_restorePacket(char* buffer, size_t buflen)
+{
+	void* pack = NULL;
+	Header header;
+	int fixed_header_length = 1, ptype, remaining_length = 0;
+	char c;
+	int multiplier = 1;
+	extern pf new_packets[];
+
+	FUNC_ENTRY;
+	/* the smallest valid record is the header byte plus one length byte */
+	if (buffer == NULL || buflen < 2)
+		goto exit;
+
+	header.byte = buffer[0];
+	/* decode the message length according to the MQTT algorithm */
+	do
+	{
+		/* MQTT uses at most 4 length bytes; never read beyond the end of the record */
+		if (fixed_header_length > 4 || (size_t)fixed_header_length >= buflen)
+			goto exit;
+		c = *(++buffer);
+		remaining_length += (c & 127) * multiplier;
+		multiplier *= 128;
+		fixed_header_length++;
+	} while ((c & 128) != 0);
+
+	/* only accept the record if the decoded length accounts for every byte of it */
+	if ( (size_t)(fixed_header_length + remaining_length) == buflen )
+	{
+		ptype = header.bits.type;
+		if (ptype >= CONNECT && ptype <= DISCONNECT && new_packets[ptype] != NULL)
+			pack = (*new_packets[ptype])(header.byte, ++buffer, remaining_length);
+	}
+
+exit:
+	FUNC_EXIT;
+	return pack;
+}
+
+
+/**
+ * Inserts the specified message into the list, maintaining message ID order.
+ * The message is placed in front of the first element whose message ID is greater
+ * than its own; if there is none, a NULL position is handed to ListInsert.
+ * @param list the list to insert the message into.
+ * @param content the message to add (a ::Messages structure).
+ * @param size size of the message.
+ */
+void MQTTPersistence_insertInOrder(List* list, void* content, size_t size)
+{
+	ListElement* insertBefore = NULL;
+	ListElement* cursor = NULL;
+	Messages* newMsg = (Messages*)content;
+
+	FUNC_ENTRY;
+	while (ListNextElement(list, &cursor) != NULL)
+	{
+		if (insertBefore != NULL)
+			break;  /* position already found on the previous step */
+		if ( newMsg->msgid < ((Messages*)cursor->content)->msgid )
+			insertBefore = cursor;
+	}
+
+	ListInsert(list, content, size, insertBefore);
+	FUNC_EXIT;
+}
+
+
+/**
+ * Adds a record to the persistent store. This function must not be called for QoS0
+ * messages.
+ * @param socket the socket of the client.
+ * @param buf0 fixed header.
+ * @param buf0len length of the fixed header.
+ * @param count number of buffers representing the variable header and/or the payload.
+ * @param buffers the buffers representing the variable header and/or the payload.
+ * @param buflens length of the buffers representing the variable header and/or the payload.
+ * @param htype the MQTT packet type; PUBLISH and PUBREL are the types handled when sending.
+ * @param msgId the message ID.
+ * @param scr 0 indicates message in the sending direction; 1 indicates message in the
+ * receiving direction.
+ * @return 0 if success or if the client has no persistence, #MQTTCLIENT_PERSISTENCE_ERROR
+ * if no client owns the socket, no key can be formed for htype/scr, or memory is
+ * exhausted; otherwise the code returned by the persistence put callback.
+ */
+int MQTTPersistence_put(int socket, char* buf0, size_t buf0len, int count,
+								 char** buffers, size_t* buflens, int htype, int msgId, int scr )
+{
+	int rc = 0;
+	extern ClientStates* bstate;
+	int nbufs, i;
+	int* lens = NULL;
+	char** bufs = NULL;
+	char *key = NULL;
+	const char *stem = NULL;
+	ListElement* found = NULL;
+	Clients* client = NULL;
+
+	FUNC_ENTRY;
+	found = ListFindItem(bstate->clients, &socket, clientSocketCompare);
+	if (found == NULL)
+	{
+		rc = MQTTCLIENT_PERSISTENCE_ERROR;
+		goto exit;
+	}
+	client = (Clients*)(found->content);
+	if (client->persistence == NULL)
+		goto exit;
+
+	/* key stem: depends on the direction and the packet type */
+	if ( scr == 0 )
+	{  /* sending */
+		if (htype == PUBLISH)   /* PUBLISH QoS1 and QoS2*/
+			stem = PERSISTENCE_PUBLISH_SENT;
+		if (htype == PUBREL)  /* PUBREL */
+			stem = PERSISTENCE_PUBREL;
+	}
+	if ( scr == 1 )  /* receiving PUBLISH QoS2 */
+		stem = PERSISTENCE_PUBLISH_RECEIVED;
+	if (stem == NULL)
+	{
+		/* nothing is stored rather than using a key that was never filled in */
+		rc = MQTTCLIENT_PERSISTENCE_ERROR;
+		goto exit;
+	}
+
+	/* the fixed header becomes buffer 0, followed by the caller's buffers */
+	nbufs = 1 + count;
+	key = malloc(MESSAGE_FILENAME_LENGTH + 1);
+	lens = (int *)malloc(nbufs * sizeof(int));
+	bufs = (char **)malloc(nbufs * sizeof(char *));
+	if (key == NULL || lens == NULL || bufs == NULL)
+	{
+		rc = MQTTCLIENT_PERSISTENCE_ERROR;
+		goto exit;
+	}
+	lens[0] = (int)buf0len;
+	bufs[0] = buf0;
+	for (i = 0; i < count; i++)
+	{
+		lens[i+1] = (int)buflens[i];
+		bufs[i+1] = buffers[i];
+	}
+
+	sprintf(key, "%s%d", stem, msgId);
+
+	rc = client->persistence->pput(client->phandle, key, nbufs, bufs, lens);
+
+exit:
+	if (key)
+		free(key);
+	if (lens)
+		free(lens);
+	if (bufs)
+		free(bufs);
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Deletes a record from the persistent store.
+ * For a sent QoS 2 PUBLISH both the PUBLISH record and the PUBREL record are removed;
+ * the value returned is then the result of removing the PUBREL record.
+ * @param c the client as ::Clients.
+ * @param type the type of the persisted record: #PERSISTENCE_PUBLISH_SENT, #PERSISTENCE_PUBREL
+ * or #PERSISTENCE_PUBLISH_RECEIVED.
+ * @param qos the qos field of the message.
+ * @param msgId the message ID.
+ * @return 0 if success or if the client has no persistence, #MQTTCLIENT_PERSISTENCE_ERROR
+ * if the key cannot be allocated, otherwise the code returned by the remove callback.
+ */
+int MQTTPersistence_remove(Clients* c, char *type, int qos, int msgId)
+{
+	int rc = 0;
+
+	FUNC_ENTRY;
+	if (c->persistence != NULL)
+	{
+		char *key = malloc(MESSAGE_FILENAME_LENGTH + 1);
+		if ( key == NULL )
+		{
+			rc = MQTTCLIENT_PERSISTENCE_ERROR;
+			goto exit;
+		}
+		if ( (strcmp(type,PERSISTENCE_PUBLISH_SENT) == 0) && qos == 2 )
+		{
+			sprintf(key, "%s%d", PERSISTENCE_PUBLISH_SENT, msgId) ;
+			rc = c->persistence->premove(c->phandle, key);
+			sprintf(key, "%s%d", PERSISTENCE_PUBREL, msgId) ;
+			rc = c->persistence->premove(c->phandle, key);
+		}
+		else /* PERSISTENCE_PUBLISH_SENT && qos == 1 */
+		{    /* or PERSISTENCE_PUBLISH_RECEIVED */
+			sprintf(key, "%s%d", type, msgId) ;
+			rc = c->persistence->premove(c->phandle, key);
+		}
+		free(key);
+	}
+
+exit:
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Checks whether the message IDs wrapped by looking for the largest gap between two consecutive
+ * message IDs in the outboundMsgs queue. If a gap inside the queue is larger than the
+ * gap from the last ID round to the first one, the queue is rotated so that the element
+ * following that gap becomes the first element.
+ * @param client the client as ::Clients.
+ */
+void MQTTPersistence_wrapMsgID(Clients *client)
+{
+	ListElement* wrapel = NULL;   /* element that should become the new head, if any */
+	ListElement* current = NULL;
+
+	FUNC_ENTRY;
+	if ( client->outboundMsgs->count > 0 )
+	{
+		int firstMsgID = ((Messages*)client->outboundMsgs->first->content)->msgid;
+		int lastMsgID = ((Messages*)client->outboundMsgs->last->content)->msgid;
+		/* distance from the last ID to the first one when counting through MAX_MSG_ID */
+		int gap = MAX_MSG_ID - lastMsgID + firstMsgID;
+		/* step onto the first element so that the loop below starts at the second one,
+		 * which is the first element that has a predecessor to compare against */
+		current = ListNextElement(client->outboundMsgs, &current);
+
+		while(ListNextElement(client->outboundMsgs, &current) != NULL)
+		{
+			int curMsgID = ((Messages*)current->content)->msgid;
+			int curPrevMsgID = ((Messages*)current->prev->content)->msgid;
+			int curgap = curMsgID - curPrevMsgID;
+			/* remember the element that follows the largest gap seen so far */
+			if ( curgap > gap )
+			{
+				gap = curgap;
+				wrapel = current;
+			}
+		}
+	}
+
+	if ( wrapel != NULL )
+	{
+		/* put wrapel at the beginning of the queue */
+		/* first join the two ends into a ring ... */
+		client->outboundMsgs->first->prev = client->outboundMsgs->last;
+		client->outboundMsgs->last->next = client->outboundMsgs->first;
+		/* ... then cut the ring open just in front of wrapel */
+		client->outboundMsgs->first = wrapel;
+		client->outboundMsgs->last = wrapel->prev;
+		client->outboundMsgs->first->prev = NULL;
+		client->outboundMsgs->last->next = NULL;
+	}
+	FUNC_EXIT;
+}
+
+
+#if !defined(NO_PERSISTENCE)
+/**
+ * Removes a queued message from the persistent store.
+ * @param client the client as ::Clients; its persistence must not be NULL
+ * @param qe the queue entry whose sequence number identifies the record
+ * @return 0 if success, otherwise the code returned by the persistence remove callback
+ */
+int MQTTPersistence_unpersistQueueEntry(Clients* client, MQTTPersistence_qEntry* qe)
+{
+	int rc = 0;
+	/* room for the stem, any unsigned int in decimal (at most 3 digits per byte) and the
+	 * terminating NUL; PERSISTENCE_MAX_KEY_LENGTH is too small from seqno 1000000 on */
+	char key[sizeof(PERSISTENCE_QUEUE_KEY) + 3 * sizeof(unsigned int)];
+	
+	FUNC_ENTRY;
+	sprintf(key, "%s%u", PERSISTENCE_QUEUE_KEY, qe->seqno);
+	if ((rc = client->persistence->premove(client->phandle, key)) != 0)
+		Log(LOG_ERROR, 0, "Error %d removing qEntry from persistence", rc);
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Writes a queued message to the persistent store under the next sequence number.
+ * The record is the concatenation of: payloadlen, payload, qos, retained, dup, msgid,
+ * the NUL terminated topic name and topicLen, in that order.
+ * @param aclient the client as ::Clients; its persistence must not be NULL
+ * @param qe the queue entry to store; its seqno is set to the sequence number used
+ * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR if memory is exhausted, otherwise
+ * the code returned by the persistence put callback
+ */
+int MQTTPersistence_persistQueueEntry(Clients* aclient, MQTTPersistence_qEntry* qe)
+{
+	int rc = 0;
+	int nbufs = 8;
+	int bufindex = 0;
+	/* room for the stem, any unsigned int in decimal (at most 3 digits per byte) and the
+	 * terminating NUL; PERSISTENCE_MAX_KEY_LENGTH is too small from seqno 1000000 on */
+	char key[sizeof(PERSISTENCE_QUEUE_KEY) + 3 * sizeof(unsigned int)];
+	int* lens = NULL;
+	void** bufs = NULL;
+		
+	FUNC_ENTRY;
+	lens = (int*)malloc(nbufs * sizeof(int));
+	bufs = malloc(nbufs * sizeof(void *));
+	if (lens == NULL || bufs == NULL)
+	{
+		rc = MQTTCLIENT_PERSISTENCE_ERROR;
+		goto exit;
+	}
+						
+	bufs[bufindex] = &qe->msg->payloadlen;
+	lens[bufindex++] = sizeof(qe->msg->payloadlen);
+				
+	bufs[bufindex] = qe->msg->payload;
+	lens[bufindex++] = qe->msg->payloadlen;
+		
+	bufs[bufindex] = &qe->msg->qos;
+	lens[bufindex++] = sizeof(qe->msg->qos);
+		
+	bufs[bufindex] = &qe->msg->retained;
+	lens[bufindex++] = sizeof(qe->msg->retained);
+		
+	bufs[bufindex] = &qe->msg->dup;
+	lens[bufindex++] = sizeof(qe->msg->dup);
+				
+	bufs[bufindex] = &qe->msg->msgid;
+	lens[bufindex++] = sizeof(qe->msg->msgid);
+						
+	bufs[bufindex] = qe->topicName;
+	lens[bufindex++] = (int)strlen(qe->topicName) + 1;
+				
+	bufs[bufindex] = &qe->topicLen;
+	lens[bufindex++] = sizeof(qe->topicLen);			
+		
+	/* format the key exactly as MQTTPersistence_unpersistQueueEntry does (unsigned),
+	 * so that the record can always be found again for removal */
+	qe->seqno = ++aclient->qentry_seqno;
+	sprintf(key, "%s%u", PERSISTENCE_QUEUE_KEY, qe->seqno);
+
+	if ((rc = aclient->persistence->pput(aclient->phandle, key, nbufs, (char**)bufs, lens)) != 0)
+		Log(LOG_ERROR, 0, "Error persisting queue entry, rc %d", rc);
+
+exit:
+	if (lens)
+		free(lens);
+	if (bufs)
+		free(bufs);
+
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Copies one int out of a persisted record if enough bytes remain.
+ * @param pptr in/out: the read position, advanced past the int on success
+ * @param end one past the last byte of the record
+ * @param value receives the int
+ * @return 0 if success, -1 if fewer than sizeof(int) bytes remain
+ */
+static int MQTTPersistence_restoreInt(char** pptr, char* end, int* value)
+{
+	if ((size_t)(end - *pptr) < sizeof(int))
+		return -1;
+	/* fields follow variable length data, so they may be unaligned: copy, do not cast */
+	memcpy(value, *pptr, sizeof(int));
+	*pptr += sizeof(int);
+	return 0;
+}
+
+
+/**
+ * Rebuilds a queue entry from a record written by ::MQTTPersistence_persistQueueEntry.
+ * The entry owns fresh copies of the payload and the topic name, so the caller keeps
+ * ownership of buffer.
+ * @param buffer the persisted record
+ * @param buflen the number of bytes in buffer
+ * @return the new queue entry, or NULL if the record is truncated or inconsistent, or
+ * memory is exhausted
+ */
+static MQTTPersistence_qEntry* MQTTPersistence_restoreQueueEntry(char* buffer, size_t buflen)
+{
+	MQTTPersistence_qEntry* qe = NULL;
+	char* ptr = buffer;
+	char* end = NULL;
+	char* terminator = NULL;
+	size_t topic_size;
+	
+	FUNC_ENTRY;
+	if (buffer == NULL)
+		goto exit;
+	end = buffer + buflen;
+
+	if ((qe = malloc(sizeof(MQTTPersistence_qEntry))) == NULL)
+		goto exit;
+	memset(qe, '\0', sizeof(MQTTPersistence_qEntry));
+	
+	if ((qe->msg = malloc(sizeof(MQTTPersistence_message))) == NULL)
+		goto error;
+	memset(qe->msg, '\0', sizeof(MQTTPersistence_message));
+	
+	/* payload length, then that many payload bytes */
+	if (MQTTPersistence_restoreInt(&ptr, end, &qe->msg->payloadlen) != 0 ||
+		qe->msg->payloadlen < 0 || (size_t)(end - ptr) < (size_t)qe->msg->payloadlen)
+		goto error;
+	qe->msg->payload = malloc(qe->msg->payloadlen);
+	if (qe->msg->payloadlen > 0)
+	{
+		if (qe->msg->payload == NULL)
+			goto error;
+		memcpy(qe->msg->payload, ptr, qe->msg->payloadlen);
+	}
+	ptr += qe->msg->payloadlen;
+	
+	if (MQTTPersistence_restoreInt(&ptr, end, &qe->msg->qos) != 0 ||
+		MQTTPersistence_restoreInt(&ptr, end, &qe->msg->retained) != 0 ||
+		MQTTPersistence_restoreInt(&ptr, end, &qe->msg->dup) != 0 ||
+		MQTTPersistence_restoreInt(&ptr, end, &qe->msg->msgid) != 0)
+		goto error;
+	
+	/* the topic name is stored with its terminating NUL, which must lie inside the record */
+	terminator = memchr(ptr, '\0', (size_t)(end - ptr));
+	if (terminator == NULL)
+		goto error;
+	topic_size = (size_t)(terminator - ptr) + 1;
+	if ((qe->topicName = malloc(topic_size)) == NULL)
+		goto error;
+	memcpy(qe->topicName, ptr, topic_size);
+	ptr += topic_size;
+	
+	if (MQTTPersistence_restoreInt(&ptr, end, &qe->topicLen) != 0)
+		goto error;
+	goto exit;
+
+error:
+	/* release whatever was built so far and report the record as unusable */
+	if (qe->msg)
+	{
+		if (qe->msg->payload)
+			free(qe->msg->payload);
+		free(qe->msg);
+	}
+	if (qe->topicName)
+		free(qe->topicName);
+	free(qe);
+	qe = NULL;
+
+exit:
+	FUNC_EXIT;
+	return qe;
+}
+
+
+/**
+ * Inserts a queue entry into the list in ascending sequence number order.
+ * The entry is placed in front of the first element with a greater sequence number;
+ * if there is none, a NULL position is handed to ListInsert.
+ * @param list the list to insert the entry into
+ * @param qEntry the queue entry to add
+ * @param size size of the entry
+ */
+static void MQTTPersistence_insertInSeqOrder(List* list, MQTTPersistence_qEntry* qEntry, size_t size)
+{
+	ListElement* insertBefore = NULL;
+	ListElement* cursor = NULL;
+
+	FUNC_ENTRY;
+	while (ListNextElement(list, &cursor) != NULL)
+	{
+		MQTTPersistence_qEntry* queued;
+
+		if (insertBefore != NULL)
+			break;  /* position already found on the previous step */
+		queued = (MQTTPersistence_qEntry*)cursor->content;
+		if (qEntry->seqno < queued->seqno)
+			insertBefore = cursor;
+	}
+	ListInsert(list, qEntry, size, insertBefore);
+	FUNC_EXIT;
+}
+
+
+/**
+ * Restores a queue of messages from persistence to memory
+ * Only keys starting with the queue stem are considered. Each restored entry takes the
+ * sequence number found in its key, and the client's sequence counter is raised to the
+ * largest one seen.
+ * @param c the client as ::Clients - the client object to restore the messages to
+ * @return return code, 0 if successful
+ */
+int MQTTPersistence_restoreMessageQueue(Clients* c)
+{
+	int rc = 0;
+	char **msgkeys = NULL;
+	int nkeys = 0;
+	int i = 0;
+	int entries_restored = 0;
+
+	FUNC_ENTRY;
+	if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0)
+	{
+		while (rc == 0 && i < nkeys)
+		{
+			char *buffer = NULL;
+			int buflen;
+					
+			if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) != 0)
+			{
+				; /* not a queued message */
+			}
+			else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0)
+			{
+				MQTTPersistence_qEntry* qe = MQTTPersistence_restoreQueueEntry(buffer, buflen);
+				
+				if (qe)
+				{	
+					/* the digits after the stem are the sequence number */
+					qe->seqno = atoi(msgkeys[i]+strlen(PERSISTENCE_QUEUE_KEY));
+					MQTTPersistence_insertInSeqOrder(c->messageQueue, qe, sizeof(MQTTPersistence_qEntry));
+					c->qentry_seqno = max(c->qentry_seqno, qe->seqno);
+					entries_restored++;
+				}
+				/* the entry holds its own copies of the data, so the raw record is
+				 * released whether or not it could be decoded */
+				if (buffer)
+					free(buffer);
+			}
+			if (msgkeys[i])
+			{
+				free(msgkeys[i]);
+			}
+			i++;
+		}
+		/* when the loop stops early because of an error, the keys that were not
+		 * visited still have to be released */
+		for (; i < nkeys; i++)
+		{
+			if (msgkeys[i])
+				free(msgkeys[i]);
+		}
+		if (msgkeys != NULL)
+			free(msgkeys);
+	}
+	Log(TRACE_MINIMUM, -1, "%d queued messages restored for client %s", entries_restored, c->clientID);
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPersistence.h
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTPersistence.h b/thirdparty/paho.mqtt.c/src/MQTTPersistence.h
new file mode 100644
index 0000000..9a938ba
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTPersistence.h
@@ -0,0 +1,74 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Ian Craggs - async client updates
+ *    Ian Craggs - fix for bug 432903 - queue persistence
+ *******************************************************************************/
+
+#if defined(__cplusplus)
+ extern "C" {
+#endif
+
+#include "Clients.h"
+
+/** Stem of the key for a sent PUBLISH QoS1 or QoS2 */
+#define PERSISTENCE_PUBLISH_SENT "s-"
+/** Stem of the key for a sent PUBREL */
+#define PERSISTENCE_PUBREL "sc-"
+/** Stem of the key for a received PUBLISH QoS2 */
+#define PERSISTENCE_PUBLISH_RECEIVED "r-"
+/** Stem of the key for an async client command */
+#define PERSISTENCE_COMMAND_KEY "c-"
+/** Stem of the key for an async client message queue */
+#define PERSISTENCE_QUEUE_KEY "q-"
+#define PERSISTENCE_MAX_KEY_LENGTH 8
+
+int MQTTPersistence_create(MQTTClient_persistence** per, int type, void* pcontext);
+int MQTTPersistence_initialize(Clients* c, const char* serverURI);
+int MQTTPersistence_close(Clients* c);
+int MQTTPersistence_clear(Clients* c);
+int MQTTPersistence_restore(Clients* c);
+void* MQTTPersistence_restorePacket(char* buffer, size_t buflen);
+void MQTTPersistence_insertInOrder(List* list, void* content, size_t size);
+int MQTTPersistence_put(int socket, char* buf0, size_t buf0len, int count, 
+								 char** buffers, size_t* buflens, int htype, int msgId, int scr);
+int MQTTPersistence_remove(Clients* c, char* type, int qos, int msgId);
+void MQTTPersistence_wrapMsgID(Clients *c);
+
+/**
+ * In-memory form of a queued message. Of these fields, payloadlen, payload, qos,
+ * retained, dup and msgid are written to and read from the persistent store by the
+ * queue entry functions declared below.
+ */
+typedef struct
+{
+	char struct_id[4];    /* not persisted - presumably mirrors the public message structure; TODO confirm */
+	int struct_version;   /* not persisted - see struct_id */
+	int payloadlen;       /* number of bytes in payload */
+	void* payload;        /* the message payload */
+	int qos;              /* quality of service of the message */
+	int retained;         /* the retained flag */
+	int dup;              /* the duplicate flag */
+	int msgid;            /* the MQTT message ID */
+} MQTTPersistence_message;
+
+/**
+ * One entry of a client's message queue as stored by the persistence layer: the
+ * message itself plus the topic it belongs to.
+ */
+typedef struct
+{
+	MQTTPersistence_message* msg;   /* the message */
+	char* topicName;                /* NUL terminated topic name; persisted including the NUL */
+	int topicLen;                   /* persisted as is - presumably the topic length reported to the application; TODO confirm */
+	unsigned int seqno; /* only used on restore */
+} MQTTPersistence_qEntry;
+
+int MQTTPersistence_unpersistQueueEntry(Clients* client, MQTTPersistence_qEntry* qe);
+int MQTTPersistence_persistQueueEntry(Clients* aclient, MQTTPersistence_qEntry* qe);
+int MQTTPersistence_restoreMessageQueue(Clients* c);
+#ifdef __cplusplus
+     }
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.c
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.c b/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.c
new file mode 100644
index 0000000..35c1f53
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.c
@@ -0,0 +1,841 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2016 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Ian Craggs - async client updates
+ *    Ian Craggs - fix for bug 484496
+ *******************************************************************************/
+
+/**
+ * @file
+ * \brief A file system based persistence implementation.
+ *
+ * A directory is specified when the MQTT client is created. When the persistence is then
+ * opened (see ::Persistence_open), a sub-directory is made beneath the base for this
+ * particular client ID and connection key. This allows one persistence base directory to
+ * be shared by multiple clients.
+ *
+ */
+
+#if !defined(NO_PERSISTENCE)
+
+#include "OsWrapper.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+
+#if defined(WIN32) || defined(WIN64)
+	#include <direct.h>
+	/* Windows doesn't have strtok_r, so remap it to strtok */
+	#define strtok_r( A, B, C ) strtok( A, B )
+	int keysWin32(char *, char ***, int *);
+	int clearWin32(char *);
+	int containskeyWin32(char *, char *);
+#else
+	#include <sys/stat.h>
+	#include <dirent.h>
+	#include <unistd.h>
+	int keysUnix(char *, char ***, int *);
+	int clearUnix(char *);
+	int containskeyUnix(char *, char *);
+#endif
+
+#include "MQTTClientPersistence.h"
+#include "MQTTPersistenceDefault.h"
+#include "StackTrace.h"
+#include "Heap.h"
+
+/** Create persistence directory for the client: context/clientID-serverURI.
+ *  See ::Persistence_open
+ *
+ *  Every ':' in serverURI is replaced by '-' because ':' is not allowed in
+ *  Windows directory names. The path is created one level at a time, so missing
+ *  parent directories are created too. A leading path separator in the base
+ *  directory is preserved, so an absolute base directory stays absolute.
+ *
+ *  @param handle    receives the heap-allocated client directory path (NULL when
+ *                   memory could not be allocated); it is released by pstclose()
+ *  @param clientID  the client identifier
+ *  @param serverURI the server address, address:port
+ *  @param context   the base directory, used as a NUL-terminated string
+ *  @return 0 on success, MQTTCLIENT_PERSISTENCE_ERROR if a directory could not
+ *          be created or memory could not be allocated
+ */
+
+int pstopen(void **handle, const char* clientID, const char* serverURI, void* context)
+{
+	int rc = 0;
+	char *dataDir = context;
+	char *clientDir = NULL;
+	char *pToken = NULL;
+	char *save_ptr = NULL;
+	char *pCrtDirName = NULL;
+	char *pTokDirName = NULL;
+	char *pTokStart = NULL;
+	char *perserverURI = NULL, *ptraux;
+
+	FUNC_ENTRY;
+	/* Note that serverURI=address:port, but ":" not allowed in Windows directories */
+	perserverURI = malloc(strlen(serverURI) + 1);
+	if (perserverURI == NULL)
+	{
+		rc = MQTTCLIENT_PERSISTENCE_ERROR;
+		goto exit;
+	}
+	strcpy(perserverURI, serverURI);
+	while ((ptraux = strstr(perserverURI, ":")) != NULL)
+		*ptraux = '-' ;
+
+	/* consider '/'  +  '-'  +  '\0' */
+	clientDir = malloc(strlen(dataDir) + strlen(clientID) + strlen(perserverURI) + 3);
+	if (clientDir == NULL)
+	{
+		rc = MQTTCLIENT_PERSISTENCE_ERROR;
+		goto exit;
+	}
+	sprintf(clientDir, "%s/%s-%s", dataDir, clientID, perserverURI);
+
+	/* create clientDir directory */
+
+	/* pCrtDirName - holds the directory name we are currently trying to create.           */
+	/*               This gets built up level by level until the full path name is created.*/
+	/* pTokDirName - holds the directory name that gets used by strtok.         */
+	pCrtDirName = (char*)malloc( strlen(clientDir) + 1 );
+	pTokDirName = (char*)malloc( strlen(clientDir) + 1 );
+	if (pCrtDirName == NULL || pTokDirName == NULL)
+	{
+		free(clientDir);
+		clientDir = NULL;
+		rc = MQTTCLIENT_PERSISTENCE_ERROR;
+		goto exit;
+	}
+	strcpy( pTokDirName, clientDir );
+
+	/* strtok discards leading separators; copy one across by hand so that an */
+	/* absolute path is not silently turned into a relative one.              */
+	pTokStart = pTokDirName;
+	pCrtDirName[0] = '\0';
+	if ( *pTokStart == '/' || *pTokStart == '\\' )
+	{
+		pCrtDirName[0] = *pTokStart;
+		pCrtDirName[1] = '\0';
+		pTokStart++;
+	}
+
+	pToken = strtok_r( pTokStart, "\\/", &save_ptr );
+	if ( pToken != NULL )
+	{
+		strcat( pCrtDirName, pToken );
+		rc = pstmkdir( pCrtDirName );
+		pToken = strtok_r( NULL, "\\/", &save_ptr );
+	}
+	while ( (pToken != NULL) && (rc == 0) )
+	{
+		/* Append the next directory level and try to create it */
+		strcat( pCrtDirName, "/" );
+		strcat( pCrtDirName, pToken );
+		rc = pstmkdir( pCrtDirName );
+		pToken = strtok_r( NULL, "\\/", &save_ptr );
+	}
+
+exit:
+	/* as before, the path is handed out even when a directory could not be created */
+	*handle = clientDir;
+
+	free(pTokDirName);
+	free(pCrtDirName);
+	free(perserverURI);
+
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+/** Create a single directory level.
+ * An already existing directory is not treated as an error.
+ * @param pPathname the directory to create
+ * @return 0 on success or if the directory already exists,
+ *         MQTTCLIENT_PERSISTENCE_ERROR otherwise
+ */
+int pstmkdir( char *pPathname )
+{
+	int rc = 0;
+	int mkdirResult;
+
+	FUNC_ENTRY;
+#if defined(WIN32) || defined(WIN64)
+	mkdirResult = _mkdir( pPathname );
+#elif !defined(_WRS_KERNEL)
+	/* read, write and execute access for the owner and read access for the group */
+	mkdirResult = mkdir( pPathname, S_IRWXU | S_IRGRP );
+#else
+	mkdirResult = mkdir( pPathname );
+#endif
+
+	if ( mkdirResult != 0 && errno != EEXIST )
+		rc = MQTTCLIENT_PERSISTENCE_ERROR;
+
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+
+/** Write wire message to the client persistence directory.
+ *  See ::Persistence_put
+ *
+ *  The buffers are written back to back into the file
+ *  <clientDir>/<key><MESSAGE_FILENAME_EXTENSION>. If the file was opened but
+ *  could not be written or flushed completely, it is removed again.
+ *
+ *  @param handle   the client directory path obtained from pstopen()
+ *  @param key      the key, used as the file name without extension
+ *  @param bufcount number of entries in buffers and buflens
+ *  @param buffers  the data to write
+ *  @param buflens  the length of each entry in buffers
+ *  @return 0 on success, MQTTCLIENT_PERSISTENCE_ERROR otherwise
+ */
+int pstput(void* handle, char* key, int bufcount, char* buffers[], int buflens[])
+{
+	int rc = 0;
+	char *clientDir = handle;
+	char *file;
+	FILE *fp;
+	size_t bytesWritten = 0,
+	       bytesTotal = 0;
+	int incomplete = 0;
+	int i;
+
+	FUNC_ENTRY;
+	if (clientDir == NULL)
+	{
+		rc = MQTTCLIENT_PERSISTENCE_ERROR;
+		goto exit;
+	}
+
+	/* consider '/' + '\0' */
+	file = malloc(strlen(clientDir) + strlen(key) + strlen(MESSAGE_FILENAME_EXTENSION) + 2 );
+	if (file == NULL)
+	{
+		rc = MQTTCLIENT_PERSISTENCE_ERROR;
+		goto exit;
+	}
+	sprintf(file, "%s/%s%s", clientDir, key, MESSAGE_FILENAME_EXTENSION);
+
+	fp = fopen(file, "wb");
+	if ( fp != NULL )
+	{
+		for(i=0; i<bufcount; i++)
+		{
+			bytesTotal += buflens[i];
+			bytesWritten += fwrite(buffers[i], sizeof(char), buflens[i], fp );
+		}
+		/* buffered data is only flushed here, so a failing fclose means lost data */
+		if ( fclose(fp) != 0 || bytesWritten != bytesTotal )
+			incomplete = 1;
+		fp = NULL;
+	} else
+		rc = MQTTCLIENT_PERSISTENCE_ERROR;
+
+	if (incomplete)
+	{
+		pstremove(handle, key);
+		rc = MQTTCLIENT_PERSISTENCE_ERROR;
+	}
+
+	free(file);
+
+exit:
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/** Retrieve a wire message from the client persistence directory.
+ *  See ::Persistence_get
+ *
+ *  Reads the whole file <clientDir>/<key><MESSAGE_FILENAME_EXTENSION> into a
+ *  newly allocated buffer.
+ *
+ *  @param handle the client directory path obtained from pstopen()
+ *  @param key    the key, used as the file name without extension
+ *  @param buffer receives the allocated buffer; the caller must free it. It is
+ *                also set when the file could only be read partially (an error
+ *                is returned in that case). It is left untouched when the file
+ *                cannot be opened or sized, or when memory cannot be allocated.
+ *  @param buflen receives the number of bytes read, under the same conditions
+ *  @return 0 on success, MQTTCLIENT_PERSISTENCE_ERROR otherwise
+ */
+int pstget(void* handle, char* key, char** buffer, int* buflen)
+{
+	int rc = 0;
+	FILE *fp;
+	char *clientDir = handle;
+	char *file;
+	char *buf;
+	long endPos = -1;
+	unsigned long fileLen = 0;
+	unsigned long bytesRead = 0;
+
+	FUNC_ENTRY;
+	if (clientDir == NULL)
+	{
+		rc = MQTTCLIENT_PERSISTENCE_ERROR;
+		goto exit;
+	}
+
+	/* consider '/' + '\0' */
+	file = malloc(strlen(clientDir) + strlen(key) + strlen(MESSAGE_FILENAME_EXTENSION) + 2);
+	if (file == NULL)
+	{
+		rc = MQTTCLIENT_PERSISTENCE_ERROR;
+		goto exit;
+	}
+	sprintf(file, "%s/%s%s", clientDir, key, MESSAGE_FILENAME_EXTENSION);
+
+	fp = fopen(file, "rb");
+	if ( fp != NULL )
+	{
+		/* ftell reports failure as -1, which must not be used as a size */
+		if ( fseek(fp, 0, SEEK_END) == 0 )
+			endPos = ftell(fp);
+		if ( endPos < 0 || fseek(fp, 0, SEEK_SET) != 0 )
+			rc = MQTTCLIENT_PERSISTENCE_ERROR;
+		else
+		{
+			fileLen = (unsigned long)endPos;
+			/* never request 0 bytes: malloc(0) may legitimately return NULL */
+			buf = (char *)malloc(fileLen > 0 ? fileLen : 1);
+			if ( buf == NULL )
+				rc = MQTTCLIENT_PERSISTENCE_ERROR;
+			else
+			{
+				bytesRead = (unsigned long)fread(buf, sizeof(char), fileLen, fp);
+				*buffer = buf;
+				*buflen = (int)bytesRead;
+				if ( bytesRead != fileLen )
+					rc = MQTTCLIENT_PERSISTENCE_ERROR;
+			}
+		}
+		fclose(fp);
+		fp = NULL;
+	} else
+		rc = MQTTCLIENT_PERSISTENCE_ERROR;
+
+	free(file);
+	/* the caller must free buf */
+
+exit:
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+
+/** Delete a persisted message from the client persistence directory.
+ *  See ::Persistence_remove
+ *
+ *  Removes the file <clientDir>/<key><MESSAGE_FILENAME_EXTENSION>. A file that
+ *  does not exist is not treated as an error.
+ *
+ *  @param handle the client directory path obtained from pstopen()
+ *  @param key    the key, used as the file name without extension
+ *  @return 0 on success, MQTTCLIENT_PERSISTENCE_ERROR otherwise
+ */
+int pstremove(void* handle, char* key)
+{
+	int rc = 0;
+	char *clientDir = handle;
+	char *file;
+
+	FUNC_ENTRY;
+	if (clientDir == NULL)
+	{
+		/* leave through the common exit so that FUNC_ENTRY is always matched by FUNC_EXIT_RC */
+		rc = MQTTCLIENT_PERSISTENCE_ERROR;
+		goto exit;
+	}
+
+	/* consider '/' + '\0' */
+	file = malloc(strlen(clientDir) + strlen(key) + strlen(MESSAGE_FILENAME_EXTENSION) + 2);
+	if (file == NULL)
+	{
+		rc = MQTTCLIENT_PERSISTENCE_ERROR;
+		goto exit;
+	}
+	sprintf(file, "%s/%s%s", clientDir, key, MESSAGE_FILENAME_EXTENSION);
+
+#if defined(WIN32) || defined(WIN64)
+	if ( _unlink(file) != 0 )
+	{
+#else
+	if ( unlink(file) != 0 )
+	{
+#endif
+		if ( errno != ENOENT )
+			rc = MQTTCLIENT_PERSISTENCE_ERROR;
+	}
+
+	free(file);
+
+exit:
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/** Delete the client persistence directory (if empty) and release the handle.
+ *  See ::Persistence_close
+ *
+ *  A directory that is missing or still contains files is not treated as an
+ *  error. The handle is freed whenever it is not NULL, so it must not be used
+ *  after this call.
+ *
+ *  @param handle the client directory path obtained from pstopen()
+ *  @return 0 on success, MQTTCLIENT_PERSISTENCE_ERROR otherwise
+ */
+int pstclose(void* handle)
+{
+	int rc = MQTTCLIENT_PERSISTENCE_ERROR;
+	int rmdirResult;
+	char *clientDir = handle;
+
+	FUNC_ENTRY;
+	if (clientDir != NULL)
+	{
+		rc = 0;
+#if defined(WIN32) || defined(WIN64)
+		rmdirResult = _rmdir(clientDir);
+#else
+		rmdirResult = rmdir(clientDir);
+#endif
+		if ( rmdirResult != 0 && !(errno == ENOENT || errno == ENOTEMPTY) )
+			rc = MQTTCLIENT_PERSISTENCE_ERROR;
+
+		free(clientDir);
+	}
+
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/** Reports whether a wire message is persisted in the client persistence directory.
+ * See ::Persistence_containskey
+ *
+ * Delegates to the platform specific directory scan.
+ *
+ * @param handle the client directory path obtained from pstopen()
+ * @param key    the key to look for
+ * @return 0 if the key is present, MQTTCLIENT_PERSISTENCE_ERROR if it is not
+ *         or if handle is NULL
+ */
+int pstcontainskey(void *handle, char *key)
+{
+	char *clientDir = handle;
+	int rc = MQTTCLIENT_PERSISTENCE_ERROR;
+
+	FUNC_ENTRY;
+	if (clientDir != NULL)
+	{
+#if defined(WIN32) || defined(WIN64)
+		rc = containskeyWin32(clientDir, key);
+#else
+		rc = containskeyUnix(clientDir, key);
+#endif
+	}
+
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+#if defined(WIN32) || defined(WIN64)
+/** Windows directory scan behind pstcontainskey().
+ * Each entry with the archive attribute has its name cut at the first
+ * occurrence of MESSAGE_FILENAME_EXTENSION and is then compared with key.
+ * @param dirname the directory to scan
+ * @param key     the key to look for
+ * @return 0 if the key was found, MQTTCLIENT_PERSISTENCE_ERROR otherwise
+ *         (including a directory name too long for the search pattern)
+ */
+int containskeyWin32(char *dirname, char *key)
+{
+	int notFound = MQTTCLIENT_PERSISTENCE_ERROR;
+	int fFinished = 0;
+	char *filekey, *ptraux;
+	char dir[MAX_PATH+1];
+	WIN32_FIND_DATAA FileData;
+	HANDLE hDir;
+
+	FUNC_ENTRY;
+	/* the pattern appends two characters and a terminator to dirname */
+	if (strlen(dirname) + 3 > sizeof(dir))
+		goto exit;
+	sprintf(dir, "%s/*", dirname);
+
+	hDir = FindFirstFileA(dir, &FileData);
+	if (hDir != INVALID_HANDLE_VALUE)
+	{
+		while (!fFinished)
+		{
+			if (FileData.dwFileAttributes & FILE_ATTRIBUTE_ARCHIVE)
+			{
+				filekey = malloc(strlen(FileData.cFileName) + 1);
+				if (filekey == NULL)
+					break;
+				strcpy(filekey, FileData.cFileName);
+				ptraux = strstr(filekey, MESSAGE_FILENAME_EXTENSION);
+				if ( ptraux != NULL )
+					*ptraux = '\0' ;
+				if(strcmp(filekey, key) == 0)
+				{
+					notFound = 0;
+					fFinished = 1;
+				}
+				free(filekey);
+			}
+			/* stop on any failure, not only ERROR_NO_MORE_FILES, otherwise the */
+			/* loop would spin forever on the same stale entry                  */
+			if (!FindNextFileA(hDir, &FileData))
+				fFinished = 1;
+		}
+		FindClose(hDir);
+	}
+
+exit:
+	FUNC_EXIT_RC(notFound);
+	return notFound;
+}
+#else
+/** Unix directory scan behind pstcontainskey().
+ * Each regular file has its name cut at the first occurrence of
+ * MESSAGE_FILENAME_EXTENSION and is then compared with key. Entries that
+ * cannot be examined with lstat are skipped.
+ * @param dirname the directory to scan
+ * @param key     the key to look for
+ * @return 0 if the key was found, MQTTCLIENT_PERSISTENCE_ERROR otherwise
+ */
+int containskeyUnix(char *dirname, char *key)
+{
+	int notFound = MQTTCLIENT_PERSISTENCE_ERROR;
+	char *filekey, *ptraux;
+	DIR *dp;
+	struct dirent *dir_entry;
+	struct stat stat_info;
+
+	FUNC_ENTRY;
+	if((dp = opendir(dirname)) != NULL)
+	{
+		while((dir_entry = readdir(dp)) != NULL && notFound)
+		{
+			int isRegular;
+			char* filename = malloc(strlen(dirname) + strlen(dir_entry->d_name) + 2);
+
+			if (filename == NULL)
+				break;
+			sprintf(filename, "%s/%s", dirname, dir_entry->d_name);
+			/* stat_info is only meaningful when lstat succeeded */
+			isRegular = (lstat(filename, &stat_info) == 0 && S_ISREG(stat_info.st_mode));
+			free(filename);
+			if (!isRegular)
+				continue;
+
+			filekey = malloc(strlen(dir_entry->d_name) + 1);
+			if (filekey == NULL)
+				break;
+			strcpy(filekey, dir_entry->d_name);
+			ptraux = strstr(filekey, MESSAGE_FILENAME_EXTENSION);
+			if ( ptraux != NULL )
+				*ptraux = '\0' ;
+			if(strcmp(filekey, key) == 0)
+				notFound = 0;
+			free(filekey);
+		}
+		closedir(dp);
+	}
+
+	FUNC_EXIT_RC(notFound);
+	return notFound;
+}
+#endif
+
+
+/** Delete all the persisted messages in the client persistence directory.
+ * See ::Persistence_clear
+ *
+ * Delegates to the platform specific implementation.
+ *
+ * @param handle the client directory path obtained from pstopen()
+ * @return the result of the platform specific implementation, or
+ *         MQTTCLIENT_PERSISTENCE_ERROR if handle is NULL
+ */
+int pstclear(void *handle)
+{
+	char *clientDir = handle;
+	int rc = MQTTCLIENT_PERSISTENCE_ERROR;
+
+	FUNC_ENTRY;
+	if (clientDir != NULL)
+	{
+#if defined(WIN32) || defined(WIN64)
+		rc = clearWin32(clientDir);
+#else
+		rc = clearUnix(clientDir);
+#endif
+	}
+
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+#if defined(WIN32) || defined(WIN64)
+/** Windows implementation behind pstclear().
+ * Removes every entry of dirname that has the archive attribute and stops at
+ * the first entry that cannot be removed.
+ * @param dirname the directory to empty
+ * @return 0 on success, MQTTCLIENT_PERSISTENCE_ERROR otherwise
+ */
+int clearWin32(char *dirname)
+{
+	int rc = 0;
+	int fFinished = 0;
+	char *file;
+	char dir[MAX_PATH+1];
+	WIN32_FIND_DATAA FileData;
+	HANDLE hDir;
+
+	FUNC_ENTRY;
+	/* the pattern appends two characters and a terminator to dirname */
+	if (strlen(dirname) + 3 > sizeof(dir))
+	{
+		rc = MQTTCLIENT_PERSISTENCE_ERROR;
+		goto exit;
+	}
+	sprintf(dir, "%s/*", dirname);
+
+	hDir = FindFirstFileA(dir, &FileData);
+	if (hDir != INVALID_HANDLE_VALUE)
+	{
+		while (!fFinished)
+		{
+			if (FileData.dwFileAttributes & FILE_ATTRIBUTE_ARCHIVE)
+			{
+				file = malloc(strlen(dirname) + strlen(FileData.cFileName) + 2);
+				if (file == NULL)
+				{
+					rc = MQTTCLIENT_PERSISTENCE_ERROR;
+					break;
+				}
+				sprintf(file, "%s/%s", dirname, FileData.cFileName);
+				rc = remove(file);
+				free(file);
+				if ( rc != 0 )
+				{
+					rc = MQTTCLIENT_PERSISTENCE_ERROR;
+					break;
+				}
+			}
+			/* stop on any failure, not only ERROR_NO_MORE_FILES, otherwise the */
+			/* loop would spin forever on the same stale entry                  */
+			if (!FindNextFileA(hDir, &FileData))
+				fFinished = 1;
+		}
+		FindClose(hDir);
+	} else
+		rc = MQTTCLIENT_PERSISTENCE_ERROR;
+
+exit:
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+#else
+/** Unix implementation behind pstclear().
+ * Removes every regular file found in dirname and stops at the first file
+ * that cannot be removed. Entries that cannot be examined with lstat are
+ * skipped.
+ * @param dirname the directory to empty
+ * @return 0 on success, MQTTCLIENT_PERSISTENCE_ERROR otherwise
+ */
+int clearUnix(char *dirname)
+{
+	int rc = 0;
+	DIR *dp;
+	struct dirent *dir_entry;
+	struct stat stat_info;
+
+	FUNC_ENTRY;
+	if((dp = opendir(dirname)) != NULL)
+	{
+		while((dir_entry = readdir(dp)) != NULL && rc == 0)
+		{
+			/* d_name is relative to dirname, not to the current working */
+			/* directory, so the full path has to be built first         */
+			char* path = malloc(strlen(dirname) + strlen(dir_entry->d_name) + 2);
+
+			if (path == NULL)
+			{
+				rc = MQTTCLIENT_PERSISTENCE_ERROR;
+				break;
+			}
+			sprintf(path, "%s/%s", dirname, dir_entry->d_name);
+			if (lstat(path, &stat_info) == 0 && S_ISREG(stat_info.st_mode))
+			{
+				if ( remove(path) != 0 )
+					rc = MQTTCLIENT_PERSISTENCE_ERROR;
+			}
+			free(path);
+		}
+		closedir(dp);
+	} else
+		rc = MQTTCLIENT_PERSISTENCE_ERROR;
+
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+#endif
+
+
+/** Returns the keys (file names without the extension) in the client persistence directory.
+ *  See ::Persistence_keys
+ *
+ *  Delegates to the platform specific implementation.
+ *
+ *  @param handle the client directory path obtained from pstopen()
+ *  @param keys   receives the array of keys; the caller must free it
+ *  @param nkeys  receives the number of keys
+ *  @return the result of the platform specific implementation, or
+ *          MQTTCLIENT_PERSISTENCE_ERROR if handle is NULL
+ */
+int pstkeys(void *handle, char ***keys, int *nkeys)
+{
+	char *clientDir = handle;
+	int rc = MQTTCLIENT_PERSISTENCE_ERROR;
+
+	FUNC_ENTRY;
+	if (clientDir != NULL)
+	{
+#if defined(WIN32) || defined(WIN64)
+		rc = keysWin32(clientDir, keys, nkeys);
+#else
+		rc = keysUnix(clientDir, keys, nkeys);
+#endif
+	}
+
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+#if defined(WIN32) || defined(WIN64)
+/** Windows implementation behind pstkeys().
+ * Scans dirname twice: once to count the entries with the archive attribute
+ * and once to copy their names, each cut at the first occurrence of
+ * MESSAGE_FILENAME_EXTENSION.
+ * @param dirname the directory to list
+ * @param keys    receives the array of keys (NULL when there are none); the
+ *                caller must free the array and every key. Untouched on error.
+ * @param nkeys   receives the number of keys stored. Untouched on error.
+ * @return 0 on success, MQTTCLIENT_PERSISTENCE_ERROR otherwise
+ */
+int keysWin32(char *dirname, char ***keys, int *nkeys)
+{
+	int rc = 0;
+	char **fkeys = NULL;
+	int nfkeys = 0;
+	char dir[MAX_PATH+1];
+	WIN32_FIND_DATAA FileData;
+	HANDLE hDir;
+	int fFinished = 0;
+	char *ptraux;
+	int i = 0;
+
+	FUNC_ENTRY;
+	/* the pattern appends two characters and a terminator to dirname */
+	if (strlen(dirname) + 3 > sizeof(dir))
+	{
+		rc = MQTTCLIENT_PERSISTENCE_ERROR;
+		goto exit;
+	}
+	sprintf(dir, "%s/*", dirname);
+
+	/* get number of keys */
+	hDir = FindFirstFileA(dir, &FileData);
+	if (hDir == INVALID_HANDLE_VALUE)
+	{
+		rc = MQTTCLIENT_PERSISTENCE_ERROR;
+		goto exit;
+	}
+	while (!fFinished)
+	{
+		if (FileData.dwFileAttributes & FILE_ATTRIBUTE_ARCHIVE)
+			nfkeys++;
+		/* stop on any failure, not only ERROR_NO_MORE_FILES */
+		if (!FindNextFileA(hDir, &FileData))
+			fFinished = 1;
+	}
+	FindClose(hDir);
+
+	if (nfkeys != 0)
+	{
+		fkeys = (char **)malloc(nfkeys * sizeof(char *));
+		if (fkeys == NULL)
+		{
+			rc = MQTTCLIENT_PERSISTENCE_ERROR;
+			goto exit;
+		}
+
+		/* copy the keys */
+		hDir = FindFirstFileA(dir, &FileData);
+		if (hDir == INVALID_HANDLE_VALUE)
+		{
+			free(fkeys);
+			rc = MQTTCLIENT_PERSISTENCE_ERROR;
+			goto exit;
+		}
+		fFinished = 0;
+		/* i < nfkeys: files may have been added since the counting pass */
+		while (!fFinished && i < nfkeys)
+		{
+			if (FileData.dwFileAttributes & FILE_ATTRIBUTE_ARCHIVE)
+			{
+				fkeys[i] = malloc(strlen(FileData.cFileName) + 1);
+				if (fkeys[i] == NULL)
+				{
+					rc = MQTTCLIENT_PERSISTENCE_ERROR;
+					break;
+				}
+				strcpy(fkeys[i], FileData.cFileName);
+				ptraux = strstr(fkeys[i], MESSAGE_FILENAME_EXTENSION);
+				if ( ptraux != NULL )
+					*ptraux = '\0' ;
+				i++;
+			}
+			if (!FindNextFileA(hDir, &FileData))
+				fFinished = 1;
+		}
+		FindClose(hDir);
+
+		if (rc != 0)
+		{
+			while (i > 0)
+				free(fkeys[--i]);
+			free(fkeys);
+			goto exit;
+		}
+	}
+
+	/* report what was actually stored; files may have vanished between passes */
+	*nkeys = i;
+	*keys = fkeys;
+	/* the caller must free keys */
+
+exit:
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+#else
+/** Unix implementation behind pstkeys().
+ * Scans dirname twice: once to count the regular files and once to copy their
+ * names, each cut at the first occurrence of MESSAGE_FILENAME_EXTENSION.
+ * @param dirname the directory to list
+ * @param keys    receives the array of keys (NULL when there are none); the
+ *                caller must free the array and every key. Untouched on error.
+ * @param nkeys   receives the number of keys stored. Untouched on error.
+ * @return 0 on success, MQTTCLIENT_PERSISTENCE_ERROR otherwise
+ */
+int keysUnix(char *dirname, char ***keys, int *nkeys)
+{
+	int rc = 0;
+	char **fkeys = NULL;
+	int nfkeys = 0;
+	char *ptraux;
+	int i = 0;
+	DIR *dp;
+	struct dirent *dir_entry;
+	struct stat stat_info;
+
+	FUNC_ENTRY;
+	/* get number of keys */
+	if((dp = opendir(dirname)) == NULL)
+	{
+		rc = MQTTCLIENT_PERSISTENCE_ERROR;
+		goto exit;
+	}
+	while((dir_entry = readdir(dp)) != NULL)
+	{
+		char* temp = malloc(strlen(dirname)+strlen(dir_entry->d_name)+2);
+
+		if (temp == NULL)
+		{
+			rc = MQTTCLIENT_PERSISTENCE_ERROR;
+			break;
+		}
+		sprintf(temp, "%s/%s", dirname, dir_entry->d_name);
+		if (lstat(temp, &stat_info) == 0 && S_ISREG(stat_info.st_mode))
+			nfkeys++;
+		free(temp);
+	}
+	closedir(dp);
+	if (rc != 0)
+		goto exit;
+
+	if (nfkeys != 0)
+	{
+		fkeys = (char **)malloc(nfkeys * sizeof(char *));
+		if (fkeys == NULL)
+		{
+			rc = MQTTCLIENT_PERSISTENCE_ERROR;
+			goto exit;
+		}
+
+		/* copy the keys */
+		if((dp = opendir(dirname)) == NULL)
+		{
+			free(fkeys);
+			rc = MQTTCLIENT_PERSISTENCE_ERROR;
+			goto exit;
+		}
+		/* i < nfkeys: files may have been added since the counting pass */
+		while(i < nfkeys && (dir_entry = readdir(dp)) != NULL)
+		{
+			char* temp = malloc(strlen(dirname)+strlen(dir_entry->d_name)+2);
+
+			if (temp == NULL)
+			{
+				rc = MQTTCLIENT_PERSISTENCE_ERROR;
+				break;
+			}
+			sprintf(temp, "%s/%s", dirname, dir_entry->d_name);
+			if (lstat(temp, &stat_info) == 0 && S_ISREG(stat_info.st_mode))
+			{
+				fkeys[i] = malloc(strlen(dir_entry->d_name) + 1);
+				if (fkeys[i] == NULL)
+				{
+					free(temp);
+					rc = MQTTCLIENT_PERSISTENCE_ERROR;
+					break;
+				}
+				strcpy(fkeys[i], dir_entry->d_name);
+				ptraux = strstr(fkeys[i], MESSAGE_FILENAME_EXTENSION);
+				if ( ptraux != NULL )
+					*ptraux = '\0' ;
+				i++;
+			}
+			free(temp);
+		}
+		closedir(dp);
+
+		if (rc != 0)
+		{
+			while (i > 0)
+				free(fkeys[--i]);
+			free(fkeys);
+			goto exit;
+		}
+	}
+
+	/* report what was actually stored; files may have vanished between passes */
+	*nkeys = i;
+	*keys = fkeys;
+	/* the caller must free keys */
+
+exit:
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+#endif
+
+
+
+#if defined(UNIT_TESTS)
+/* Unit test driver: exercises open, put, keys, containskey, get, remove,
+ * clear and close of the default file system persistence against the
+ * current directory, printing the outcome of every step.
+ */
+int main (int argc, char *argv[])
+{
+#define MSTEM "m-"
+#define NMSGS 10
+#define NBUFS 4
+#define NDEL 2
+#define RC !rc ? "(Success)" : "(Failed) "
+
+	int rc;
+	char *handle = NULL;
+	char *perdir = ".";
+	const char *clientID = "TheUTClient";
+	const char *serverURI = "127.0.0.1:1883";
+
+	char *stem = MSTEM;
+	int msgId, i;
+	int nm[NDEL] = {5 , 8};  /* msgIds to get and remove */
+
+	char key[MESSAGE_FILENAME_LENGTH + 1];
+	char **keys = NULL;
+	int nkeys = 0;
+	char *buffer = NULL;
+	int buflen = 0;
+
+	int nbufs = NBUFS;
+	char *bufs[NBUFS] = {"m0", "mm1", "mmm2" , "mmmm3"};  /* message content */
+	int buflens[NBUFS];
+	for(i=0;i<nbufs;i++)
+		buflens[i]=strlen(bufs[i]);
+
+	/* open */
+	/* printf("Persistence directory : %s\n", perdir); */
+	rc = pstopen((void**)&handle, clientID, serverURI, perdir);
+	if (handle == NULL)
+	{
+		printf("%s Persistence directory for client %s\n", RC, clientID);
+		return 1;
+	}
+	printf("%s Persistence directory for client %s : %s\n", RC, clientID, handle);
+
+	/* put */
+	for(msgId=0;msgId<NMSGS;msgId++)
+	{
+		sprintf(key, "%s%d", stem, msgId);
+		rc = pstput(handle, key, nbufs, bufs, buflens);
+		printf("%s Adding message %s\n", RC, key);
+	}
+
+	/* keys ,ie, list keys added */
+	keys = NULL;
+	nkeys = 0;
+	rc = pstkeys(handle, &keys, &nkeys);
+	printf("%s Found %d messages persisted in %s\n", RC, nkeys, handle);
+	for(i=0;i<nkeys;i++)
+	{
+		printf("%13s\n", keys[i]);
+		free(keys[i]);  /* each key is allocated separately */
+	}
+	free(keys);
+
+	/* containskey */
+	for(i=0;i<NDEL;i++)
+	{
+		sprintf(key, "%s%d", stem, nm[i]);
+		rc = pstcontainskey(handle, key);
+		printf("%s Message %s is persisted ?\n", RC, key);
+	}
+
+	/* get && remove*/
+	for(i=0;i<NDEL;i++)
+	{
+		sprintf(key, "%s%d", stem, nm[i]);
+		buffer = NULL;
+		buflen = 0;
+		rc = pstget(handle, key, &buffer, &buflen);
+		if (rc == 0 && buffer != NULL)
+			printf("%s Retrieving message %s : %.*s\n", RC, key, buflen, buffer);
+		else
+			printf("%s Retrieving message %s\n", RC, key);
+		free(buffer);
+		rc = pstremove(handle, key);
+		printf("%s Removing message %s\n", RC, key);
+	}
+
+	/* containskey */
+	for(i=0;i<NDEL;i++)
+	{
+		sprintf(key, "%s%d", stem, nm[i]);
+		rc = pstcontainskey(handle, key);
+		printf("%s Message %s is persisted ?\n", RC, key);
+	}
+
+	/* keys ,ie, list keys added */
+	keys = NULL;
+	nkeys = 0;
+	rc = pstkeys(handle, &keys, &nkeys);
+	printf("%s Found %d messages persisted in %s\n", RC, nkeys, handle);
+	for(i=0;i<nkeys;i++)
+	{
+		printf("%13s\n", keys[i]);
+		free(keys[i]);
+	}
+	free(keys);
+
+	/* No close is attempted while the directory still holds files: pstclose */
+	/* frees the handle even when the directory is not empty, and the handle */
+	/* is still needed by the steps below.                                   */
+
+	/* clear */
+	rc = pstclear(handle);
+	printf("%s Deleting all persisted messages in %s\n", RC, handle);
+
+	/* keys ,ie, list keys added */
+	keys = NULL;
+	nkeys = 0;
+	rc = pstkeys(handle, &keys, &nkeys);
+	printf("%s Found %d messages persisted in %s\n", RC, nkeys, handle);
+	for(i=0;i<nkeys;i++)
+	{
+		printf("%13s\n", keys[i]);
+		free(keys[i]);
+	}
+	free(keys);
+
+	/* close: pass the handle itself, not its address; it is freed by pstclose */
+	rc = pstclose(handle);
+	handle = NULL;
+	printf("%s Closing client persistence directory for client %s\n", RC, clientID);
+
+	return 0;
+}
+#endif
+
+
+#endif /* NO_PERSISTENCE */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.h
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.h b/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.h
new file mode 100644
index 0000000..27fedd6
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.h
@@ -0,0 +1,33 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *******************************************************************************/
+
+#if !defined(MQTTPERSISTENCEDEFAULT_H)
+#define MQTTPERSISTENCEDEFAULT_H
+
+/** Length of the file name part of a persisted message file (8.3 filesystem) */
+#define MESSAGE_FILENAME_LENGTH 8
+/** Extension of the filename */
+#define MESSAGE_FILENAME_EXTENSION ".msg"
+
+/* prototypes of the functions for the default file system persistence */
+int pstopen(void** handle, const char* clientID, const char* serverURI, void* context);
+int pstclose(void* handle);
+int pstput(void* handle, char* key, int bufcount, char* buffers[], int buflens[]);
+int pstget(void* handle, char* key, char** buffer, int* buflen);
+int pstremove(void* handle, char* key);
+int pstkeys(void* handle, char*** keys, int* nkeys);
+int pstclear(void* handle);
+int pstcontainskey(void* handle, char* key);
+
+/* helper that creates one directory level; an existing directory is not an error */
+int pstmkdir(char *pPathname);
+
+#endif /* MQTTPERSISTENCEDEFAULT_H */
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTProtocol.h
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTProtocol.h b/thirdparty/paho.mqtt.c/src/MQTTProtocol.h
new file mode 100644
index 0000000..7478103
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTProtocol.h
@@ -0,0 +1,46 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2014 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Ian Craggs - MQTT 3.1.1 updates
+ *******************************************************************************/
+
+#if !defined(MQTTPROTOCOL_H)
+#define MQTTPROTOCOL_H
+
+#include "LinkedList.h"
+#include "MQTTPacket.h"
+#include "Clients.h"
+
+/* Both limits equal the largest value of an unsigned 16-bit field. */
+/* NOTE(review): presumably the upper bounds for message ids and client id lengths - confirm at the points of use. */
+#define MAX_MSG_ID 65535
+#define MAX_CLIENTID_LEN 65535
+
+/**
+ * Pairs a socket with a publication.
+ * NOTE(review): presumably one write that has not completed yet, as kept in
+ * the pending_writes list of MQTTProtocol - confirm against the users.
+ */
+typedef struct
+{
+	int socket;
+	Publications* p;
+} pending_write;
+
+
+/**
+ * Protocol level state: the list of publications, message counters and the
+ * list of pending writes.
+ * NOTE(review): the counters are presumably totals of messages received and
+ * sent - confirm where they are updated.
+ */
+typedef struct
+{
+	List publications;
+	unsigned int msgs_received;
+	unsigned int msgs_sent;
+	List pending_writes; /* for qos 0 writes not complete */
+} MQTTProtocol;
+
+
+#include "MQTTProtocolOut.h"
+
+#endif