You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/01/10 11:11:45 UTC

[03/19] nifi-minifi-cpp git commit: MINIFICPP-342: MQTT extension

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTProtocolClient.c
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTProtocolClient.c b/thirdparty/paho.mqtt.c/src/MQTTProtocolClient.c
new file mode 100644
index 0000000..fa3ff63
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTProtocolClient.c
@@ -0,0 +1,769 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Ian Craggs, Allan Stockdill-Mander - SSL updates
+ *    Ian Craggs - fix for bug 413429 - connectionLost not called
+ *    Ian Craggs - fix for bug 421103 - trying to write to same socket, in retry
+ *    Rong Xiang, Ian Craggs - C++ compatibility
+ *    Ian Craggs - turn off DUP flag for PUBREL - MQTT 3.1.1
+ *******************************************************************************/
+
+/**
+ * @file
+ * \brief Functions dealing with the MQTT protocol exchanges
+ *
+ * Some other related functions are in the MQTTProtocolOut module
+ * */
+
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "MQTTProtocolClient.h"
+#if !defined(NO_PERSISTENCE)
+#include "MQTTPersistence.h"
+#endif
+#include "SocketBuffer.h"
+#include "StackTrace.h"
+#include "Heap.h"
+
+#if !defined(min)
+#define min(A,B) ( (A) < (B) ? (A):(B))
+#endif
+
+extern MQTTProtocol state;
+extern ClientStates* bstate;
+
+
+static void MQTTProtocol_storeQoS0(Clients* pubclient, Publish* publish);
+static int MQTTProtocol_startPublishCommon(
+		Clients* pubclient,
+		Publish* publish,
+		int qos,
+		int retained);
+static void MQTTProtocol_retries(time_t now, Clients* client, int regardless);
+
+/**
+ * List callback function for comparing Message structures by message id
+ * @param a first integer value
+ * @param b second integer value
+ * @return boolean indicating whether a and b are equal
+ */
+int messageIDCompare(void* a, void* b)
+{
+	Messages* msg = (Messages*)a;
+	return msg->msgid == *(int*)b;
+}
+
+
+/**
+ * Assign a new message id for a client.  Make sure it isn't already being used and does
+ * not exceed the maximum.
+ * @param client a client structure
+ * @return the next message id to use, or 0 if none available
+ */
+int MQTTProtocol_assignMsgId(Clients* client)
+{
+	int start_msgid = client->msgID;
+	int msgid = start_msgid;
+
+	FUNC_ENTRY;
+	msgid = (msgid == MAX_MSG_ID) ? 1 : msgid + 1;
+	while (ListFindItem(client->outboundMsgs, &msgid, messageIDCompare) != NULL)
+	{
+		msgid = (msgid == MAX_MSG_ID) ? 1 : msgid + 1;
+		if (msgid == start_msgid) 
+		{ /* we've tried them all - none free */
+			msgid = 0;
+			break;
+		}
+	}
+	if (msgid != 0)
+		client->msgID = msgid;
+	FUNC_EXIT_RC(msgid);
+	return msgid;
+}
+
+
+static void MQTTProtocol_storeQoS0(Clients* pubclient, Publish* publish)
+{
+	int len;
+	pending_write* pw = NULL;
+
+	FUNC_ENTRY;
+	/* store the publication until the write is finished */
+	pw = malloc(sizeof(pending_write));
+	Log(TRACE_MIN, 12, NULL);
+	pw->p = MQTTProtocol_storePublication(publish, &len);
+	pw->socket = pubclient->net.socket;
+	ListAppend(&(state.pending_writes), pw, sizeof(pending_write)+len);
+	/* we don't copy QoS 0 messages unless we have to, so now we have to tell the socket buffer where
+	the saved copy is */
+	if (SocketBuffer_updateWrite(pw->socket, pw->p->topic, pw->p->payload) == NULL)
+		Log(LOG_SEVERE, 0, "Error updating write");
+	FUNC_EXIT;
+}
+
+
+/**
+ * Utility function to start a new publish exchange.
+ * @param pubclient the client to send the publication to
+ * @param publish the publication data
+ * @param qos the MQTT QoS to use
+ * @param retained boolean - whether to set the MQTT retained flag
+ * @return the completion code
+ */
+static int MQTTProtocol_startPublishCommon(Clients* pubclient, Publish* publish, int qos, int retained)
+{
+	int rc = TCPSOCKET_COMPLETE;
+
+	FUNC_ENTRY;
+	rc = MQTTPacket_send_publish(publish, 0, qos, retained, &pubclient->net, pubclient->clientID);
+	if (qos == 0 && rc == TCPSOCKET_INTERRUPTED)
+		MQTTProtocol_storeQoS0(pubclient, publish);
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Start a new publish exchange.  Store any state necessary and try to send the packet
+ * @param pubclient the client to send the publication to
+ * @param publish the publication data
+ * @param qos the MQTT QoS to use
+ * @param retained boolean - whether to set the MQTT retained flag
+ * @param mm - pointer to the message to send
+ * @return the completion code
+ */
+int MQTTProtocol_startPublish(Clients* pubclient, Publish* publish, int qos, int retained, Messages** mm)
+{
+	Publish p = *publish;
+	int rc = 0;
+
+	FUNC_ENTRY;
+	if (qos > 0)
+	{
+		*mm = MQTTProtocol_createMessage(publish, mm, qos, retained);
+		ListAppend(pubclient->outboundMsgs, *mm, (*mm)->len);
+		/* we change these pointers to the saved message location just in case the packet could not be written
+		entirely; the socket buffer will use these locations to finish writing the packet */
+		p.payload = (*mm)->publish->payload;
+		p.topic = (*mm)->publish->topic;
+	}
+	rc = MQTTProtocol_startPublishCommon(pubclient, &p, qos, retained);
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Copy and store message data for retries
+ * @param publish the publication data
+ * @param mm - pointer to the message data to store
+ * @param qos the MQTT QoS to use
+ * @param retained boolean - whether to set the MQTT retained flag
+ * @return pointer to the message data stored
+ */
+Messages* MQTTProtocol_createMessage(Publish* publish, Messages **mm, int qos, int retained)
+{
+	Messages* m = malloc(sizeof(Messages));
+
+	FUNC_ENTRY;
+	m->len = sizeof(Messages);
+	if (*mm == NULL || (*mm)->publish == NULL)
+	{
+		int len1;
+		*mm = m;
+		m->publish = MQTTProtocol_storePublication(publish, &len1);
+		m->len += len1;
+	}
+	else
+	{
+		++(((*mm)->publish)->refcount);
+		m->publish = (*mm)->publish;
+	}
+	m->msgid = publish->msgId;
+	m->qos = qos;
+	m->retain = retained;
+	time(&(m->lastTouch));
+	if (qos == 2)
+		m->nextMessageType = PUBREC;
+	FUNC_EXIT;
+	return m;
+}
+
+
+/**
+ * Store message data for possible retry
+ * @param publish the publication data
+ * @param len returned length of the data stored
+ * @return the publication stored
+ */
+Publications* MQTTProtocol_storePublication(Publish* publish, int* len)
+{
+	Publications* p = malloc(sizeof(Publications));
+
+	FUNC_ENTRY;
+	p->refcount = 1;
+
+	*len = (int)strlen(publish->topic)+1;
+	if (Heap_findItem(publish->topic))
+		p->topic = publish->topic;
+	else
+	{
+		p->topic = malloc(*len);
+		strcpy(p->topic, publish->topic);
+	}
+	*len += sizeof(Publications);
+
+	p->topiclen = publish->topiclen;
+	p->payloadlen = publish->payloadlen;
+	p->payload = malloc(publish->payloadlen);
+	memcpy(p->payload, publish->payload, p->payloadlen);
+	*len += publish->payloadlen;
+
+	ListAppend(&(state.publications), p, *len);
+	FUNC_EXIT;
+	return p;
+}
+
+/**
+ * Remove stored message data.  Opposite of storePublication
+ * @param p stored publication to remove
+ */
+void MQTTProtocol_removePublication(Publications* p)
+{
+	FUNC_ENTRY;
+	if (--(p->refcount) == 0)
+	{
+		free(p->payload);
+		free(p->topic);
+		ListRemove(&(state.publications), p);
+	}
+	FUNC_EXIT;
+}
+
+/**
+ * Process an incoming publish packet for a socket
+ * @param pack pointer to the publish packet
+ * @param sock the socket on which the packet was received
+ * @return completion code
+ */
+int MQTTProtocol_handlePublishes(void* pack, int sock)
+{
+	Publish* publish = (Publish*)pack;
+	Clients* client = NULL;
+	char* clientid = NULL;
+	int rc = TCPSOCKET_COMPLETE;
+
+	FUNC_ENTRY;
+	client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
+	clientid = client->clientID;
+	Log(LOG_PROTOCOL, 11, NULL, sock, clientid, publish->msgId, publish->header.bits.qos,
+					publish->header.bits.retain, min(20, publish->payloadlen), publish->payload);
+
+	if (publish->header.bits.qos == 0)
+		Protocol_processPublication(publish, client);
+	else if (publish->header.bits.qos == 1)
+	{
+		/* send puback before processing the publications because a lot of return publications could fill up the socket buffer */
+		rc = MQTTPacket_send_puback(publish->msgId, &client->net, client->clientID);
+		/* if we get a socket error from sending the puback, should we ignore the publication? */
+		Protocol_processPublication(publish, client);
+	}
+	else if (publish->header.bits.qos == 2)
+	{
+		/* store publication in inbound list */
+		int len;
+		ListElement* listElem = NULL;
+		Messages* m = malloc(sizeof(Messages));
+		Publications* p = MQTTProtocol_storePublication(publish, &len);
+		m->publish = p;
+		m->msgid = publish->msgId;
+		m->qos = publish->header.bits.qos;
+		m->retain = publish->header.bits.retain;
+		m->nextMessageType = PUBREL;
+		if ( ( listElem = ListFindItem(client->inboundMsgs, &(m->msgid), messageIDCompare) ) != NULL )
+		{   /* discard queued publication with same msgID that the current incoming message */
+			Messages* msg = (Messages*)(listElem->content);
+			MQTTProtocol_removePublication(msg->publish);
+			ListInsert(client->inboundMsgs, m, sizeof(Messages) + len, listElem);
+			ListRemove(client->inboundMsgs, msg);
+		} else
+			ListAppend(client->inboundMsgs, m, sizeof(Messages) + len);
+		rc = MQTTPacket_send_pubrec(publish->msgId, &client->net, client->clientID);
+		publish->topic = NULL;
+	}
+	MQTTPacket_freePublish(publish);
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+/**
+ * Process an incoming puback packet for a socket
+ * @param pack pointer to the publish packet
+ * @param sock the socket on which the packet was received
+ * @return completion code
+ */
+int MQTTProtocol_handlePubacks(void* pack, int sock)
+{
+	Puback* puback = (Puback*)pack;
+	Clients* client = NULL;
+	int rc = TCPSOCKET_COMPLETE;
+
+	FUNC_ENTRY;
+	client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
+	Log(LOG_PROTOCOL, 14, NULL, sock, client->clientID, puback->msgId);
+
+	/* look for the message by message id in the records of outbound messages for this client */
+	if (ListFindItem(client->outboundMsgs, &(puback->msgId), messageIDCompare) == NULL)
+		Log(TRACE_MIN, 3, NULL, "PUBACK", client->clientID, puback->msgId);
+	else
+	{
+		Messages* m = (Messages*)(client->outboundMsgs->current->content);
+		if (m->qos != 1)
+			Log(TRACE_MIN, 4, NULL, "PUBACK", client->clientID, puback->msgId, m->qos);
+		else
+		{
+			Log(TRACE_MIN, 6, NULL, "PUBACK", client->clientID, puback->msgId);
+			#if !defined(NO_PERSISTENCE)
+				rc = MQTTPersistence_remove(client, PERSISTENCE_PUBLISH_SENT, m->qos, puback->msgId);
+			#endif
+			MQTTProtocol_removePublication(m->publish);
+			ListRemove(client->outboundMsgs, m);
+		}
+	}
+	free(pack);
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Process an incoming pubrec packet for a socket
+ * @param pack pointer to the publish packet
+ * @param sock the socket on which the packet was received
+ * @return completion code
+ */
+int MQTTProtocol_handlePubrecs(void* pack, int sock)
+{
+	Pubrec* pubrec = (Pubrec*)pack;
+	Clients* client = NULL;
+	int rc = TCPSOCKET_COMPLETE;
+
+	FUNC_ENTRY;
+	client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
+	Log(LOG_PROTOCOL, 15, NULL, sock, client->clientID, pubrec->msgId);
+
+	/* look for the message by message id in the records of outbound messages for this client */
+	client->outboundMsgs->current = NULL;
+	if (ListFindItem(client->outboundMsgs, &(pubrec->msgId), messageIDCompare) == NULL)
+	{
+		if (pubrec->header.bits.dup == 0)
+			Log(TRACE_MIN, 3, NULL, "PUBREC", client->clientID, pubrec->msgId);
+	}
+	else
+	{
+		Messages* m = (Messages*)(client->outboundMsgs->current->content);
+		if (m->qos != 2)
+		{
+			if (pubrec->header.bits.dup == 0)
+				Log(TRACE_MIN, 4, NULL, "PUBREC", client->clientID, pubrec->msgId, m->qos);
+		}
+		else if (m->nextMessageType != PUBREC)
+		{
+			if (pubrec->header.bits.dup == 0)
+				Log(TRACE_MIN, 5, NULL, "PUBREC", client->clientID, pubrec->msgId);
+		}
+		else
+		{
+			rc = MQTTPacket_send_pubrel(pubrec->msgId, 0, &client->net, client->clientID);
+			m->nextMessageType = PUBCOMP;
+			time(&(m->lastTouch));
+		}
+	}
+	free(pack);
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Process an incoming pubrel packet for a socket
+ * @param pack pointer to the publish packet
+ * @param sock the socket on which the packet was received
+ * @return completion code
+ */
+int MQTTProtocol_handlePubrels(void* pack, int sock)
+{
+	Pubrel* pubrel = (Pubrel*)pack;
+	Clients* client = NULL;
+	int rc = TCPSOCKET_COMPLETE;
+
+	FUNC_ENTRY;
+	client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
+	Log(LOG_PROTOCOL, 17, NULL, sock, client->clientID, pubrel->msgId);
+
+	/* look for the message by message id in the records of inbound messages for this client */
+	if (ListFindItem(client->inboundMsgs, &(pubrel->msgId), messageIDCompare) == NULL)
+	{
+		if (pubrel->header.bits.dup == 0)
+			Log(TRACE_MIN, 3, NULL, "PUBREL", client->clientID, pubrel->msgId);
+		else
+			/* Apparently this is "normal" behaviour, so we don't need to issue a warning */
+			rc = MQTTPacket_send_pubcomp(pubrel->msgId, &client->net, client->clientID);
+	}
+	else
+	{
+		Messages* m = (Messages*)(client->inboundMsgs->current->content);
+		if (m->qos != 2)
+			Log(TRACE_MIN, 4, NULL, "PUBREL", client->clientID, pubrel->msgId, m->qos);
+		else if (m->nextMessageType != PUBREL)
+			Log(TRACE_MIN, 5, NULL, "PUBREL", client->clientID, pubrel->msgId);
+		else
+		{
+			Publish publish;
+
+			/* send pubcomp before processing the publications because a lot of return publications could fill up the socket buffer */
+			rc = MQTTPacket_send_pubcomp(pubrel->msgId, &client->net, client->clientID);
+			publish.header.bits.qos = m->qos;
+			publish.header.bits.retain = m->retain;
+			publish.msgId = m->msgid;
+			publish.topic = m->publish->topic;
+			publish.topiclen = m->publish->topiclen;
+			publish.payload = m->publish->payload;
+			publish.payloadlen = m->publish->payloadlen;
+			Protocol_processPublication(&publish, client);
+			#if !defined(NO_PERSISTENCE)
+				rc += MQTTPersistence_remove(client, PERSISTENCE_PUBLISH_RECEIVED, m->qos, pubrel->msgId);
+			#endif
+			ListRemove(&(state.publications), m->publish);
+			ListRemove(client->inboundMsgs, m);
+			++(state.msgs_received);
+		}
+	}
+	free(pack);
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Process an incoming pubcomp packet for a socket
+ * @param pack pointer to the publish packet
+ * @param sock the socket on which the packet was received
+ * @return completion code
+ */
+int MQTTProtocol_handlePubcomps(void* pack, int sock)
+{
+	Pubcomp* pubcomp = (Pubcomp*)pack;
+	Clients* client = NULL;
+	int rc = TCPSOCKET_COMPLETE;
+
+	FUNC_ENTRY;
+	client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
+	Log(LOG_PROTOCOL, 19, NULL, sock, client->clientID, pubcomp->msgId);
+
+	/* look for the message by message id in the records of outbound messages for this client */
+	if (ListFindItem(client->outboundMsgs, &(pubcomp->msgId), messageIDCompare) == NULL)
+	{
+		if (pubcomp->header.bits.dup == 0)
+			Log(TRACE_MIN, 3, NULL, "PUBCOMP", client->clientID, pubcomp->msgId);
+	}
+	else
+	{
+		Messages* m = (Messages*)(client->outboundMsgs->current->content);
+		if (m->qos != 2)
+			Log(TRACE_MIN, 4, NULL, "PUBCOMP", client->clientID, pubcomp->msgId, m->qos);
+		else
+		{
+			if (m->nextMessageType != PUBCOMP)
+				Log(TRACE_MIN, 5, NULL, "PUBCOMP", client->clientID, pubcomp->msgId);
+			else
+			{
+				Log(TRACE_MIN, 6, NULL, "PUBCOMP", client->clientID, pubcomp->msgId);
+				#if !defined(NO_PERSISTENCE)
+					rc = MQTTPersistence_remove(client, PERSISTENCE_PUBLISH_SENT, m->qos, pubcomp->msgId);
+				#endif
+				MQTTProtocol_removePublication(m->publish);
+				ListRemove(client->outboundMsgs, m);
+				(++state.msgs_sent);
+			}
+		}
+	}
+	free(pack);
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * MQTT protocol keepAlive processing.  Sends PINGREQ packets as required.
+ * @param now current time
+ */
+void MQTTProtocol_keepalive(time_t now)
+{
+	ListElement* current = NULL;
+
+	FUNC_ENTRY;
+	ListNextElement(bstate->clients, &current);
+	while (current)
+	{
+		Clients* client =	(Clients*)(current->content);
+		ListNextElement(bstate->clients, &current); 
+		if (client->connected && client->keepAliveInterval > 0 &&
+			(difftime(now, client->net.lastSent) >= client->keepAliveInterval ||
+					difftime(now, client->net.lastReceived) >= client->keepAliveInterval))
+		{
+			if (client->ping_outstanding == 0)
+			{
+				if (Socket_noPendingWrites(client->net.socket))
+				{
+					if (MQTTPacket_send_pingreq(&client->net, client->clientID) != TCPSOCKET_COMPLETE)
+					{
+						Log(TRACE_PROTOCOL, -1, "Error sending PINGREQ for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
+						MQTTProtocol_closeSession(client, 1);
+					}
+					else
+					{
+						client->net.lastSent = now;
+						client->ping_outstanding = 1;
+					}
+				}
+			}
+			else
+			{
+				Log(TRACE_PROTOCOL, -1, "PINGRESP not received in keepalive interval for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
+				MQTTProtocol_closeSession(client, 1);
+			}
+		}
+	}
+	FUNC_EXIT;
+}
+
+
+/**
+ * MQTT retry processing per client
+ * @param now current time
+ * @param client - the client to which to apply the retry processing
+ * @param regardless boolean - retry packets regardless of retry interval (used on reconnect)
+ */
+static void MQTTProtocol_retries(time_t now, Clients* client, int regardless)
+{
+	ListElement* outcurrent = NULL;
+
+	FUNC_ENTRY;
+
+	if (!regardless && client->retryInterval <= 0) /* 0 or -ive retryInterval turns off retry except on reconnect */
+		goto exit;
+
+	while (client && ListNextElement(client->outboundMsgs, &outcurrent) &&
+		   client->connected && client->good &&        /* client is connected and has no errors */
+		   Socket_noPendingWrites(client->net.socket)) /* there aren't any previous packets still stacked up on the socket */
+	{
+		Messages* m = (Messages*)(outcurrent->content);
+		if (regardless || difftime(now, m->lastTouch) > max(client->retryInterval, 10))
+		{
+			if (m->qos == 1 || (m->qos == 2 && m->nextMessageType == PUBREC))
+			{
+				Publish publish;
+				int rc;
+
+				Log(TRACE_MIN, 7, NULL, "PUBLISH", client->clientID, client->net.socket, m->msgid);
+				publish.msgId = m->msgid;
+				publish.topic = m->publish->topic;
+				publish.payload = m->publish->payload;
+				publish.payloadlen = m->publish->payloadlen;
+				rc = MQTTPacket_send_publish(&publish, 1, m->qos, m->retain, &client->net, client->clientID);
+				if (rc == SOCKET_ERROR)
+				{
+					client->good = 0;
+					Log(TRACE_PROTOCOL, 29, NULL, client->clientID, client->net.socket,
+												Socket_getpeer(client->net.socket));
+					MQTTProtocol_closeSession(client, 1);
+					client = NULL;
+				}
+				else
+				{
+					if (m->qos == 0 && rc == TCPSOCKET_INTERRUPTED)
+						MQTTProtocol_storeQoS0(client, &publish);
+					time(&(m->lastTouch));
+				}
+			}
+			else if (m->qos && m->nextMessageType == PUBCOMP)
+			{
+				Log(TRACE_MIN, 7, NULL, "PUBREL", client->clientID, client->net.socket, m->msgid);
+				if (MQTTPacket_send_pubrel(m->msgid, 0, &client->net, client->clientID) != TCPSOCKET_COMPLETE)
+				{
+					client->good = 0;
+					Log(TRACE_PROTOCOL, 29, NULL, client->clientID, client->net.socket,
+							Socket_getpeer(client->net.socket));
+					MQTTProtocol_closeSession(client, 1);
+					client = NULL;
+				}
+				else
+					time(&(m->lastTouch));
+			}
+			/* break; why not do all retries at once? */
+		}
+	}
+exit:
+	FUNC_EXIT;
+}
+
+
+/**
+ * MQTT retry protocol and socket pending writes processing.
+ * @param now current time
+ * @param doRetry boolean - retries as well as pending writes?
+ * @param regardless boolean - retry packets regardless of retry interval (used on reconnect)
+ */
+void MQTTProtocol_retry(time_t now, int doRetry, int regardless)
+{
+	ListElement* current = NULL;
+
+	FUNC_ENTRY;
+	ListNextElement(bstate->clients, &current);
+	/* look through the outbound message list of each client, checking to see if a retry is necessary */
+	while (current)
+	{
+		Clients* client = (Clients*)(current->content);
+		ListNextElement(bstate->clients, &current);
+		if (client->connected == 0)
+			continue;
+		if (client->good == 0)
+		{
+			MQTTProtocol_closeSession(client, 1);
+			continue;
+		}
+		if (Socket_noPendingWrites(client->net.socket) == 0)
+			continue;
+		if (doRetry)
+			MQTTProtocol_retries(now, client, regardless);
+	}
+	FUNC_EXIT;
+}
+
+
+/**
+ * Free a client structure
+ * @param client the client data to free
+ */
+void MQTTProtocol_freeClient(Clients* client)
+{
+	FUNC_ENTRY;
+	/* free up pending message lists here, and any other allocated data */
+	MQTTProtocol_freeMessageList(client->outboundMsgs);
+	MQTTProtocol_freeMessageList(client->inboundMsgs);
+	ListFree(client->messageQueue);
+	free(client->clientID);
+	if (client->will)
+	{
+		free(client->will->payload);
+		free(client->will->topic);
+		free(client->will);
+	}
+#if defined(OPENSSL)
+	if (client->sslopts)
+	{
+		if (client->sslopts->trustStore)
+			free((void*)client->sslopts->trustStore);
+		if (client->sslopts->keyStore)
+			free((void*)client->sslopts->keyStore);
+		if (client->sslopts->privateKey)
+			free((void*)client->sslopts->privateKey);
+		if (client->sslopts->privateKeyPassword)
+			free((void*)client->sslopts->privateKeyPassword);
+		if (client->sslopts->enabledCipherSuites)
+			free((void*)client->sslopts->enabledCipherSuites);
+		free(client->sslopts);
+	}
+#endif
+	/* don't free the client structure itself... this is done elsewhere */
+	FUNC_EXIT;
+}
+
+
+/**
+ * Empty a message list, leaving it able to accept new messages
+ * @param msgList the message list to empty
+ */
+void MQTTProtocol_emptyMessageList(List* msgList)
+{
+	ListElement* current = NULL;
+
+	FUNC_ENTRY;
+	while (ListNextElement(msgList, &current))
+	{
+		Messages* m = (Messages*)(current->content);
+		MQTTProtocol_removePublication(m->publish);
+	}
+	ListEmpty(msgList);
+	FUNC_EXIT;
+}
+
+
+/**
+ * Empty and free up all storage used by a message list
+ * @param msgList the message list to empty and free
+ */
+void MQTTProtocol_freeMessageList(List* msgList)
+{
+	FUNC_ENTRY;
+	MQTTProtocol_emptyMessageList(msgList);
+	ListFree(msgList);
+	FUNC_EXIT;
+}
+
+
+/**
+* Copy no more than dest_size -1 characters from the string pointed to by src to the array pointed to by dest.
+* The destination string will always be null-terminated.
+* @param dest the array which characters copy to
+* @param src the source string which characters copy from
+* @param dest_size the size of the memory pointed to by dest: copy no more than this -1 (allow for null).  Must be >= 1
+* @return the destination string pointer
+*/
+char* MQTTStrncpy(char *dest, const char *src, size_t dest_size)
+{
+  size_t count = dest_size;
+  char *temp = dest;
+
+  FUNC_ENTRY; 
+  if (dest_size < strlen(src))
+    Log(TRACE_MIN, -1, "the src string is truncated");
+
+  /* We must copy only the first (dest_size - 1) bytes */
+  while (count > 1 && (*temp++ = *src++))
+    count--;
+
+  *temp = '\0';
+
+  FUNC_EXIT;
+  return dest;
+}
+
+
+/**
+* Duplicate a string, safely, allocating space on the heap
+* @param src the source string which characters copy from
+* @return the duplicated, allocated string
+*/
+char* MQTTStrdup(const char* src)
+{
+	size_t mlen = strlen(src) + 1;
+	char* temp = malloc(mlen);
+	MQTTStrncpy(temp, src, mlen);
+	return temp;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTProtocolClient.h
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTProtocolClient.h b/thirdparty/paho.mqtt.c/src/MQTTProtocolClient.h
new file mode 100644
index 0000000..36c2dd2
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTProtocolClient.h
@@ -0,0 +1,55 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Ian Craggs, Allan Stockdill-Mander - SSL updates
+ *    Ian Craggs - MQTT 3.1.1 updates
+ *    Rong Xiang, Ian Craggs - C++ compatibility
+ *******************************************************************************/
+
+#if !defined(MQTTPROTOCOLCLIENT_H)
+#define MQTTPROTOCOLCLIENT_H
+
+#include "LinkedList.h"
+#include "MQTTPacket.h"
+#include "Log.h"
+#include "MQTTProtocol.h"
+#include "Messages.h"
+
+#define MAX_MSG_ID 65535
+#define MAX_CLIENTID_LEN 65535
+
+int MQTTProtocol_startPublish(Clients* pubclient, Publish* publish, int qos, int retained, Messages** m);
+Messages* MQTTProtocol_createMessage(Publish* publish, Messages** mm, int qos, int retained);
+Publications* MQTTProtocol_storePublication(Publish* publish, int* len);
+int messageIDCompare(void* a, void* b);
+int MQTTProtocol_assignMsgId(Clients* client);
+void MQTTProtocol_removePublication(Publications* p);
+void Protocol_processPublication(Publish* publish, Clients* client);
+
+int MQTTProtocol_handlePublishes(void* pack, int sock);
+int MQTTProtocol_handlePubacks(void* pack, int sock);
+int MQTTProtocol_handlePubrecs(void* pack, int sock);
+int MQTTProtocol_handlePubrels(void* pack, int sock);
+int MQTTProtocol_handlePubcomps(void* pack, int sock);
+
+void MQTTProtocol_closeSession(Clients* c, int sendwill);
+void MQTTProtocol_keepalive(time_t);
+void MQTTProtocol_retry(time_t, int, int);
+void MQTTProtocol_freeClient(Clients* client);
+void MQTTProtocol_emptyMessageList(List* msgList);
+void MQTTProtocol_freeMessageList(List* msgList);
+
+char* MQTTStrncpy(char *dest, const char* src, size_t num);
+char* MQTTStrdup(const char* src);
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTProtocolOut.c
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTProtocolOut.c b/thirdparty/paho.mqtt.c/src/MQTTProtocolOut.c
new file mode 100644
index 0000000..90d38bf
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTProtocolOut.c
@@ -0,0 +1,242 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2017 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Ian Craggs, Allan Stockdill-Mander - SSL updates
+ *    Ian Craggs - fix for buffer overflow in addressPort bug #433290
+ *    Ian Craggs - MQTT 3.1.1 support
+ *    Rong Xiang, Ian Craggs - C++ compatibility
+ *    Ian Craggs - fix for bug 479376
+ *    Ian Craggs - SNI support
+ *    Ian Craggs - fix for issue #164
+ *    Ian Craggs - fix for issue #179
+ *******************************************************************************/
+
+/**
+ * @file
+ * \brief Functions dealing with the MQTT protocol exchanges
+ *
+ * Some other related functions are in the MQTTProtocolClient module
+ */
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "MQTTProtocolOut.h"
+#include "StackTrace.h"
+#include "Heap.h"
+
+extern ClientStates* bstate;
+
+
+
+/**
+ * Separates an address:port into two separate values
+ * @param uri the input string - hostname:port
+ * @param port the returned port integer
+ * @return the address string
+ */
+char* MQTTProtocol_addressPort(const char* uri, int* port)
+{
+	char* colon_pos = strrchr(uri, ':'); /* reverse find to allow for ':' in IPv6 addresses */
+	char* buf = (char*)uri;
+	size_t len;
+
+	FUNC_ENTRY;
+	if (uri[0] == '[')
+	{  /* ip v6 */
+		if (colon_pos < strrchr(uri, ']'))
+			colon_pos = NULL;  /* means it was an IPv6 separator, not for host:port */
+	}
+
+	if (colon_pos) /* have to strip off the port */
+	{
+		size_t addr_len = colon_pos - uri;
+		buf = malloc(addr_len + 1);
+		*port = atoi(colon_pos + 1);
+		MQTTStrncpy(buf, uri, addr_len+1);
+	}
+	else
+		*port = DEFAULT_PORT;
+
+	len = strlen(buf);
+	if (buf[len - 1] == ']')
+	{
+		if (buf == (char*)uri)
+		{
+			buf = malloc(len);  /* we are stripping off the final ], so length is 1 shorter */
+			MQTTStrncpy(buf, uri, len);
+		}
+		else
+			buf[len - 1] = '\0';
+	}
+	FUNC_EXIT;
+	return buf;
+}
+
+
+/**
+ * MQTT outgoing connect processing for a client
+ * @param ip_address the TCP address:port to connect to
+ * @param aClient a structure with all MQTT data needed
+ * @param int ssl
+ * @param int MQTTVersion the MQTT version to connect with (3 or 4)
+ * @return return code
+ */
+#if defined(OPENSSL)
+int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int ssl, int MQTTVersion)
+#else
+int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int MQTTVersion)
+#endif
+{
+	int rc, port;
+	char* addr;
+
+	FUNC_ENTRY;
+	aClient->good = 1;
+
+	addr = MQTTProtocol_addressPort(ip_address, &port);
+	rc = Socket_new(addr, port, &(aClient->net.socket));
+	if (rc == EINPROGRESS || rc == EWOULDBLOCK)
+		aClient->connect_state = 1; /* TCP connect called - wait for connect completion */
+	else if (rc == 0)
+	{	/* TCP connect completed. If SSL, send SSL connect */
+#if defined(OPENSSL)
+		if (ssl)
+		{
+			if (SSLSocket_setSocketForSSL(&aClient->net, aClient->sslopts, addr) == 1)
+			{
+				rc = SSLSocket_connect(aClient->net.ssl, aClient->net.socket);
+				if (rc == TCPSOCKET_INTERRUPTED)
+					aClient->connect_state = 2; /* SSL connect called - wait for completion */
+			}
+			else
+				rc = SOCKET_ERROR;
+		}
+#endif
+		
+		if (rc == 0)
+		{
+			/* Now send the MQTT connect packet */
+			if ((rc = MQTTPacket_send_connect(aClient, MQTTVersion)) == 0)
+				aClient->connect_state = 3; /* MQTT Connect sent - wait for CONNACK */ 
+			else
+				aClient->connect_state = 0;
+		}
+	}
+	if (addr != ip_address)
+		free(addr);
+
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Process an incoming pingresp packet for a socket
+ * @param pack pointer to the publish packet
+ * @param sock the socket on which the packet was received
+ * @return completion code
+ */
+int MQTTProtocol_handlePingresps(void* pack, int sock)
+{
+	Clients* client = NULL;
+	int rc = TCPSOCKET_COMPLETE;
+
+	FUNC_ENTRY;
+	client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
+	Log(LOG_PROTOCOL, 21, NULL, sock, client->clientID);
+	client->ping_outstanding = 0;
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * MQTT outgoing subscribe processing for a client
+ * @param client the client structure
+ * @param topics list of topics
+ * @param qoss corresponding list of QoSs
+ * @return completion code
+ */
+int MQTTProtocol_subscribe(Clients* client, List* topics, List* qoss, int msgID)
+{
+	int rc = 0;
+
+	FUNC_ENTRY;
+	/* we should stack this up for retry processing too */
+	rc = MQTTPacket_send_subscribe(topics, qoss, msgID, 0, &client->net, client->clientID);
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Process an incoming suback packet for a socket
+ * @param pack pointer to the publish packet
+ * @param sock the socket on which the packet was received
+ * @return completion code
+ */
+int MQTTProtocol_handleSubacks(void* pack, int sock)
+{
+	Suback* suback = (Suback*)pack;
+	Clients* client = NULL;
+	int rc = TCPSOCKET_COMPLETE;
+
+	FUNC_ENTRY;
+	client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
+	Log(LOG_PROTOCOL, 23, NULL, sock, client->clientID, suback->msgId);
+	MQTTPacket_freeSuback(suback);
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * MQTT outgoing unsubscribe processing for a client
+ * @param client the client structure
+ * @param topics list of topics
+ * @return completion code
+ */
+int MQTTProtocol_unsubscribe(Clients* client, List* topics, int msgID)
+{
+	int rc = 0;
+
+	FUNC_ENTRY;
+	/* we should stack this up for retry processing too? */
+	rc = MQTTPacket_send_unsubscribe(topics, msgID, 0, &client->net, client->clientID);
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/**
+ * Process an incoming unsuback packet for a socket
+ * @param pack pointer to the publish packet
+ * @param sock the socket on which the packet was received
+ * @return completion code
+ */
+int MQTTProtocol_handleUnsubacks(void* pack, int sock)
+{
+	Unsuback* unsuback = (Unsuback*)pack;
+	Clients* client = NULL;
+	int rc = TCPSOCKET_COMPLETE;
+
+	FUNC_ENTRY;
+	client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
+	Log(LOG_PROTOCOL, 24, NULL, sock, client->clientID, unsuback->msgId);
+	free(unsuback);
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTProtocolOut.h
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTProtocolOut.h b/thirdparty/paho.mqtt.c/src/MQTTProtocolOut.h
new file mode 100644
index 0000000..3b890e7
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTProtocolOut.h
@@ -0,0 +1,46 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2017 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Ian Craggs, Allan Stockdill-Mander - SSL updates
+ *    Ian Craggs - MQTT 3.1.1 support
+ *    Ian Craggs - SNI support
+ *******************************************************************************/
+
+#if !defined(MQTTPROTOCOLOUT_H)
+#define MQTTPROTOCOLOUT_H
+
+#include "LinkedList.h"
+#include "MQTTPacket.h"
+#include "Clients.h"
+#include "Log.h"
+#include "Messages.h"
+#include "MQTTProtocol.h"
+#include "MQTTProtocolClient.h"
+
+#define DEFAULT_PORT 1883
+
+char* MQTTProtocol_addressPort(const char* uri, int* port);
+void MQTTProtocol_reconnect(const char* ip_address, Clients* client);
+#if defined(OPENSSL)
+int MQTTProtocol_connect(const char* ip_address, Clients* acClients, int ssl, int MQTTVersion);
+#else
+int MQTTProtocol_connect(const char* ip_address, Clients* acClients, int MQTTVersion);
+#endif
+int MQTTProtocol_handlePingresps(void* pack, int sock);
+int MQTTProtocol_subscribe(Clients* client, List* topics, List* qoss, int msgID);
+int MQTTProtocol_handleSubacks(void* pack, int sock);
+int MQTTProtocol_unsubscribe(Clients* client, List* topics, int msgID);
+int MQTTProtocol_handleUnsubacks(void* pack, int sock);
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTVersion.c
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTVersion.c b/thirdparty/paho.mqtt.c/src/MQTTVersion.c
new file mode 100644
index 0000000..382033a
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTVersion.c
@@ -0,0 +1,230 @@
+/*******************************************************************************
+ * Copyright (c) 2012, 2015 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *******************************************************************************/
+
+#include <stdio.h>
+
+#if !defined(_WRS_KERNEL)
+
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <ctype.h>
+#include "MQTTAsync.h"
+
+#if defined(WIN32) || defined(WIN64)
+#include <windows.h>
+#include <tchar.h>
+#include <io.h>
+#include <sys/stat.h>
+#else
+#include <dlfcn.h>
+#include <sys/mman.h>
+#include <unistd.h>
+#endif
+
+
+/**
+ *
+ * @file
+ * \brief MQTTVersion - display the version and build information strings for a library.
+ *
+ * With no arguments, we try to load and call the version string for the libraries we 
+ * know about: mqttv3c, mqttv3cs, mqttv3a, mqttv3as.
+ * With an argument:
+ *   1) we try to load the named library, call getVersionInfo and display those values. 
+ *   2) If that doesn't work, we look through the binary for eyecatchers, and display those.  
+ *      This will work if the library is not executable in the current environment.
+ *
+ * */
+ 
+ 
+ static const char* libraries[] = {"paho-mqtt3c", "paho-mqtt3cs", "paho-mqtt3a", "paho-mqtt3as"};
+ static const char* eyecatchers[] = {"MQTTAsyncV3_Version", "MQTTAsyncV3_Timestamp",
+ 					 "MQTTClientV3_Version", "MQTTClientV3_Timestamp"};
+ 
+
+char* FindString(char* filename, const char* eyecatcher_input);
+int printVersionInfo(MQTTAsync_nameValue* info);
+int loadandcall(char* libname);
+void printEyecatchers(char* filename);
+
+
+/**
+ * Finds an eyecatcher in a binary file and returns the following value.
+ * @param filename the name of the file
+ * @param eyecatcher_input the eyecatcher string to look for
+ * @return the value found - "" if not found 
+ */
+char* FindString(char* filename, const char* eyecatcher_input)
+{
+	FILE* infile = NULL;
+	static char value[100];
+	const char* eyecatcher = eyecatcher_input;
+	
+	memset(value, 0, 100);
+	if ((infile = fopen(filename, "rb")) != NULL)
+	{
+		size_t buflen = strlen(eyecatcher);
+		char* buffer = (char*) malloc(buflen);
+
+		if (buffer != NULL)
+		{
+			int c = fgetc(infile);
+
+			while (feof(infile) == 0)
+			{
+				int count = 0;
+				buffer[count++] = c;
+				if (memcmp(eyecatcher, buffer, buflen) == 0)
+				{
+					char* ptr = value;
+					c = fgetc(infile); /* skip space */
+					c = fgetc(infile);
+					while (isprint(c))
+					{
+						*ptr++ = c;
+						c = fgetc(infile);
+					}
+					break;
+				}
+				if (count == buflen)
+				{
+					memmove(buffer, &buffer[1], buflen - 1);
+					count--;
+				}
+				c = fgetc(infile);
+			}
+			free(buffer);
+		}
+
+		fclose(infile);
+	}
+	return value;
+}
+
+
+int printVersionInfo(MQTTAsync_nameValue* info)
+{
+	int rc = 0;
+	
+	while (info->name)
+	{
+		printf("%s: %s\n", info->name, info->value);
+		info++;
+		rc = 1;  /* at least one value printed */
+	}
+	return rc;
+}
+
+typedef MQTTAsync_nameValue* (*func_type)(void);
+
+int loadandcall(char* libname)
+{
+	int rc = 0;
+	MQTTAsync_nameValue* (*func_address)(void) = NULL;
+#if defined(WIN32) || defined(WIN64)
+	wchar_t wlibname[30];
+	HMODULE APILibrary;
+
+	mbstowcs(wlibname, libname, strlen(libname) + 1);
+	if ((APILibrary = LoadLibrary(wlibname)) == NULL)
+		printf("Error loading library %s, error code %d\n", libname, GetLastError());
+	else
+	{
+		func_address = (func_type)GetProcAddress(APILibrary, "MQTTAsync_getVersionInfo");
+		if (func_address == NULL) 
+			func_address = (func_type)GetProcAddress(APILibrary, "MQTTClient_getVersionInfo");
+		if (func_address)
+			rc = printVersionInfo((*func_address)());
+		FreeLibrary(APILibrary);
+	}
+#else
+	void* APILibrary = dlopen(libname, RTLD_LAZY); /* Open the Library in question */
+	char* ErrorOutput = dlerror(); 	               /* Check it opened properly */
+	if (ErrorOutput != NULL)
+		printf("Error loading library %s, error %s\n", libname, ErrorOutput);
+	else
+	{	
+		*(void **) (&func_address) = dlsym(APILibrary, "MQTTAsync_getVersionInfo");
+		if (func_address == NULL)
+			func_address = dlsym(APILibrary, "MQTTClient_getVersionInfo");
+		if (func_address)
+			rc = printVersionInfo((*func_address)());
+		dlclose(APILibrary);
+	}
+#endif
+	return rc;
+}
+ 
+	
+#if !defined(ARRAY_SIZE)
+#define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
+#endif
+
+void printEyecatchers(char* filename)
+{
+	int i = 0;
+	
+	for (i = 0; i < ARRAY_SIZE(eyecatchers); ++i)
+	{
+		char* value = FindString(filename, eyecatchers[i]);
+		if (value[0]) 
+			printf("%s: %s\n", eyecatchers[i], value);
+	}
+}
+
+
+int main(int argc, char** argv)
+{
+	printf("MQTTVersion: print the version strings of an MQTT client library\n"); 
+	printf("Copyright (c) 2012, 2015 IBM Corp.\n");
+	
+	if (argc == 1)
+	{
+		int i = 0;
+		char namebuf[60];
+		
+		printf("Specify a particular library name if it is not in the current directory, or not executable on this platform\n");
+		 
+		for (i = 0; i < ARRAY_SIZE(libraries); ++i)
+		{
+#if defined(WIN32) || defined(WIN64)
+			sprintf(namebuf, "%s.dll", libraries[i]);
+#else
+			sprintf(namebuf, "lib%s.so.1", libraries[i]);
+#endif
+			printf("--- Trying library %s ---\n", libraries[i]);
+			if (!loadandcall(namebuf))
+				printEyecatchers(namebuf);
+		}
+	}
+	else
+	{
+		if (!loadandcall(argv[1]))
+			printEyecatchers(argv[1]);
+	}
+
+	return 0;
+}
+#else
+int main(void)
+{
+    fprintf(stderr, "This tool is not supported on this platform yet.\n");
+    return 1;
+}
+#endif /* !defined(_WRS_KERNEL) */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/Messages.c
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/Messages.c b/thirdparty/paho.mqtt.c/src/Messages.c
new file mode 100644
index 0000000..63bd193
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/Messages.c
@@ -0,0 +1,104 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *******************************************************************************/
+
+/**
+ * @file
+ * \brief Trace messages
+ *
+ */
+
+
+#include "Messages.h"
+#include "Log.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "Heap.h"
+
+#define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
+
+#define max_msg_len 120
+
+static const char *protocol_message_list[] =
+{
+	"%d %s -> CONNECT cleansession: %d (%d)", /* 0, was 131, 68 and 69 */
+	"%d %s <- CONNACK rc: %d", /* 1, was 132 */
+	"%d %s -> CONNACK rc: %d (%d)", /* 2, was 138 */
+	"%d %s <- PINGREQ", /* 3, was 35 */
+	"%d %s -> PINGRESP (%d)", /* 4 */
+	"%d %s <- DISCONNECT", /* 5 */
+	"%d %s <- SUBSCRIBE msgid: %d", /* 6, was 39 */
+	"%d %s -> SUBACK msgid: %d (%d)", /* 7, was 40 */
+	"%d %s <- UNSUBSCRIBE msgid: %d", /* 8, was 41 */
+	"%d %s -> UNSUBACK msgid: %d (%d)", /* 9 */
+	"%d %s -> PUBLISH msgid: %d qos: %d retained: %d (%d) payload: %.*s", /* 10, was 42 */
+	"%d %s <- PUBLISH msgid: %d qos: %d retained: %d payload: %.*s", /* 11, was 46 */
+	"%d %s -> PUBACK msgid: %d (%d)", /* 12, was 47 */
+	"%d %s -> PUBREC msgid: %d (%d)", /* 13, was 48 */
+	"%d %s <- PUBACK msgid: %d", /* 14, was 49 */
+	"%d %s <- PUBREC msgid: %d", /* 15, was 53 */
+	"%d %s -> PUBREL msgid: %d (%d)", /* 16, was 57 */
+	"%d %s <- PUBREL msgid %d", /* 17, was 58 */
+	"%d %s -> PUBCOMP msgid %d (%d)", /* 18, was 62 */
+	"%d %s <- PUBCOMP msgid:%d", /* 19, was 63 */
+	"%d %s -> PINGREQ (%d)", /* 20, was 137 */
+	"%d %s <- PINGRESP", /* 21, was 70 */
+	"%d %s -> SUBSCRIBE msgid: %d (%d)", /* 22, was 72 */
+	"%d %s <- SUBACK msgid: %d", /* 23, was 73 */
+	"%d %s <- UNSUBACK msgid: %d", /* 24, was 74 */
+	"%d %s -> UNSUBSCRIBE msgid: %d (%d)", /* 25, was 106 */
+	"%d %s <- CONNECT", /* 26 */
+	"%d %s -> PUBLISH qos: 0 retained: %d (%d)", /* 27 */
+	"%d %s -> DISCONNECT (%d)", /* 28 */
+	"Socket error for client identifier %s, socket %d, peer address %s; ending connection", /* 29 */
+};
+
+static const char *trace_message_list[] =
+{
+	"Failed to remove client from bstate->clients", /* 0 */
+	"Removed client %s from bstate->clients, socket %d", /* 1 */
+	"Packet_Factory: unhandled packet type %d", /* 2 */
+	"Packet %s received from client %s for message identifier %d, but no record of that message identifier found", /* 3 */
+	"Packet %s received from client %s for message identifier %d, but message is wrong QoS, %d", /* 4 */
+	"Packet %s received from client %s for message identifier %d, but message is in wrong state", /* 5 */
+	"%s received from client %s for message id %d - removing publication", /* 6 */
+	"Trying %s again for client %s, socket %d, message identifier %d", /* 7 */
+	"", /* 8 */
+	"(%lu) %*s(%d)> %s:%d", /* 9 */
+	"(%lu) %*s(%d)< %s:%d", /* 10 */
+	"(%lu) %*s(%d)< %s:%d (%d)", /* 11 */
+	"Storing unsent QoS 0 message", /* 12 */
+};
+
+/**
+ * Get a log message by its index
+ * @param index the integer index
+ * @param log_level the log level, used to determine which message list to use
+ * @return the message format string
+ */
+const char* Messages_get(int index, enum LOG_LEVELS log_level)
+{
+	const char *msg = NULL;
+
+	if (log_level == TRACE_PROTOCOL)
+		msg = (index >= 0 && index < ARRAY_SIZE(protocol_message_list)) ? protocol_message_list[index] : NULL;
+	else
+		msg = (index >= 0 && index < ARRAY_SIZE(trace_message_list)) ? trace_message_list[index] : NULL;
+	return msg;
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/Messages.h
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/Messages.h b/thirdparty/paho.mqtt.c/src/Messages.h
new file mode 100644
index 0000000..08f292f
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/Messages.h
@@ -0,0 +1,24 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *******************************************************************************/
+
+#if !defined(MESSAGES_H)
+#define MESSAGES_H
+
+#include "Log.h"
+
+const char* Messages_get(int, enum LOG_LEVELS);
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/OsWrapper.c
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/OsWrapper.c b/thirdparty/paho.mqtt.c/src/OsWrapper.c
new file mode 100644
index 0000000..6d2f97c
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/OsWrapper.c
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Copyright (c) 2016, 2017 logi.cals GmbH
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Gunter Raidl - timer support for VxWorks
+ *    Rainer Poisel - reusability
+ *******************************************************************************/
+
+#include "OsWrapper.h"
+
+#if defined(_WRS_KERNEL)
+void usleep(useconds_t useconds)
+{
+	struct timespec tv;
+	tv.tv_sec = useconds / 1000000;
+	tv.tv_nsec = (useconds % 1000000) * 1000;
+	nanosleep(&tv, NULL);
+}
+#endif /* defined(_WRS_KERNEL) */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/OsWrapper.h
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/OsWrapper.h b/thirdparty/paho.mqtt.c/src/OsWrapper.h
new file mode 100644
index 0000000..f657ab1
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/OsWrapper.h
@@ -0,0 +1,42 @@
+/*******************************************************************************
+ * Copyright (c) 2016, 2017 logi.cals GmbH
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Gunter Raidl - timer support for VxWorks
+ *    Rainer Poisel - reusability
+ *******************************************************************************/
+
+#if !defined(OSWRAPPER_H)
+#define OSWRAPPER_H
+
+#if defined(_WRS_KERNEL)
+#include <time.h>
+
+#define lstat stat
+
+typedef unsigned long useconds_t;
+void usleep(useconds_t useconds);
+
+#define timersub(a, b, result) \
+	do \
+	{ \
+		(result)->tv_sec = (a)->tv_sec - (b)->tv_sec; \
+		(result)->tv_usec = (a)->tv_usec - (b)->tv_usec; \
+		if ((result)->tv_usec < 0) \
+		{ \
+			--(result)->tv_sec; \
+			(result)->tv_usec += 1000000L; \
+		} \
+	} while (0)
+#endif /* defined(_WRS_KERNEL) */
+
+#endif /* OSWRAPPER_H */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/SSLSocket.c
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/SSLSocket.c b/thirdparty/paho.mqtt.c/src/SSLSocket.c
new file mode 100644
index 0000000..d17c8bc
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/SSLSocket.c
@@ -0,0 +1,917 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2017 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs, Allan Stockdill-Mander - initial implementation
+ *    Ian Craggs - fix for bug #409702
+ *    Ian Craggs - allow compilation for OpenSSL < 1.0
+ *    Ian Craggs - fix for bug #453883
+ *    Ian Craggs - fix for bug #480363, issue 13
+ *    Ian Craggs - SNI support
+ *    Ian Craggs - fix for issues #155, #160
+ *******************************************************************************/
+
+/**
+ * @file
+ * \brief SSL  related functions
+ *
+ */
+
+#if defined(OPENSSL)
+
+#include "SocketBuffer.h"
+#include "MQTTClient.h"
+#include "SSLSocket.h"
+#include "Log.h"
+#include "StackTrace.h"
+#include "Socket.h"
+
+#include "Heap.h"
+
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+#include <openssl/crypto.h>
+
+extern Sockets s;
+
+int SSLSocket_error(char* aString, SSL* ssl, int sock, int rc);
+char* SSL_get_verify_result_string(int rc);
+void SSL_CTX_info_callback(const SSL* ssl, int where, int ret);
+char* SSLSocket_get_version_string(int version);
+void SSL_CTX_msg_callback(
+		int write_p,
+		int version,
+		int content_type,
+		const void* buf, size_t len,
+		SSL* ssl, void* arg);
+int pem_passwd_cb(char* buf, int size, int rwflag, void* userdata);
+int SSL_create_mutex(ssl_mutex_type* mutex);
+int SSL_lock_mutex(ssl_mutex_type* mutex);
+int SSL_unlock_mutex(ssl_mutex_type* mutex);
+void SSL_destroy_mutex(ssl_mutex_type* mutex);
+#if (OPENSSL_VERSION_NUMBER >= 0x010000000)
+extern void SSLThread_id(CRYPTO_THREADID *id);
+#else
+extern unsigned long SSLThread_id(void);
+#endif
+extern void SSLLocks_callback(int mode, int n, const char *file, int line);
+int SSLSocket_createContext(networkHandles* net, MQTTClient_SSLOptions* opts);
+void SSLSocket_destroyContext(networkHandles* net);
+void SSLSocket_addPendingRead(int sock);
+
+/* 1 ~ we are responsible for initializing openssl; 0 ~ openssl init is done externally */
+static int handle_openssl_init = 1;
+static ssl_mutex_type* sslLocks = NULL;
+static ssl_mutex_type sslCoreMutex;
+
+#if defined(WIN32) || defined(WIN64)
+#define iov_len len
+#define iov_base buf
+#endif
+
+/**
+ * Gets the specific error corresponding to SOCKET_ERROR
+ * @param aString the function that was being used when the error occurred
+ * @param sock the socket on which the error occurred
+ * @return the specific TCP error code
+ */
+int SSLSocket_error(char* aString, SSL* ssl, int sock, int rc)
+{
+    int error;
+
+    FUNC_ENTRY;
+    if (ssl)
+        error = SSL_get_error(ssl, rc);
+    else
+        error = ERR_get_error();
+    if (error == SSL_ERROR_WANT_READ || error == SSL_ERROR_WANT_WRITE)
+    {
+		Log(TRACE_MIN, -1, "SSLSocket error WANT_READ/WANT_WRITE");
+    }
+    else
+    {
+        static char buf[120];
+
+        if (strcmp(aString, "shutdown") != 0)
+        	Log(TRACE_MIN, -1, "SSLSocket error %s(%d) in %s for socket %d rc %d errno %d %s\n", buf, error, aString, sock, rc, errno, strerror(errno));
+         ERR_print_errors_fp(stderr);
+		if (error == SSL_ERROR_SSL || error == SSL_ERROR_SYSCALL)
+			error = SSL_FATAL;
+    }
+    FUNC_EXIT_RC(error);
+    return error;
+}
+
+static struct
+{
+	int code;
+	char* string;
+}
+X509_message_table[] =
+{
+	{ X509_V_OK, "X509_V_OK" },
+	{ X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT, "X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT" },
+	{ X509_V_ERR_UNABLE_TO_GET_CRL, "X509_V_ERR_UNABLE_TO_GET_CRL" },
+	{ X509_V_ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE, "X509_V_ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE" },
+	{ X509_V_ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE, "X509_V_ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE" },
+	{ X509_V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY, "X509_V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY" },
+	{ X509_V_ERR_CERT_SIGNATURE_FAILURE, "X509_V_ERR_CERT_SIGNATURE_FAILURE" },
+	{ X509_V_ERR_CRL_SIGNATURE_FAILURE, "X509_V_ERR_CRL_SIGNATURE_FAILURE" },
+	{ X509_V_ERR_CERT_NOT_YET_VALID, "X509_V_ERR_CERT_NOT_YET_VALID" },
+	{ X509_V_ERR_CERT_HAS_EXPIRED, "X509_V_ERR_CERT_HAS_EXPIRED" },
+	{ X509_V_ERR_CRL_NOT_YET_VALID, "X509_V_ERR_CRL_NOT_YET_VALID" },
+	{ X509_V_ERR_CRL_HAS_EXPIRED, "X509_V_ERR_CRL_HAS_EXPIRED" },
+	{ X509_V_ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD, "X509_V_ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD" },
+	{ X509_V_ERR_ERROR_IN_CERT_NOT_AFTER_FIELD, "X509_V_ERR_ERROR_IN_CERT_NOT_AFTER_FIELD" },
+	{ X509_V_ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD, "X509_V_ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD" },
+	{ X509_V_ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD, "X509_V_ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD" },
+	{ X509_V_ERR_OUT_OF_MEM, "X509_V_ERR_OUT_OF_MEM" },
+	{ X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT, "X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT" },
+	{ X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN, "X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN" },
+	{ X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY, "X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY" },
+	{ X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE, "X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE" },
+	{ X509_V_ERR_CERT_CHAIN_TOO_LONG, "X509_V_ERR_CERT_CHAIN_TOO_LONG" },
+	{ X509_V_ERR_CERT_REVOKED, "X509_V_ERR_CERT_REVOKED" },
+	{ X509_V_ERR_INVALID_CA, "X509_V_ERR_INVALID_CA" },
+	{ X509_V_ERR_PATH_LENGTH_EXCEEDED, "X509_V_ERR_PATH_LENGTH_EXCEEDED" },
+	{ X509_V_ERR_INVALID_PURPOSE, "X509_V_ERR_INVALID_PURPOSE" },
+	{ X509_V_ERR_CERT_UNTRUSTED, "X509_V_ERR_CERT_UNTRUSTED" },
+	{ X509_V_ERR_CERT_REJECTED, "X509_V_ERR_CERT_REJECTED" },
+	{ X509_V_ERR_SUBJECT_ISSUER_MISMATCH, "X509_V_ERR_SUBJECT_ISSUER_MISMATCH" },
+	{ X509_V_ERR_AKID_SKID_MISMATCH, "X509_V_ERR_AKID_SKID_MISMATCH" },
+	{ X509_V_ERR_AKID_ISSUER_SERIAL_MISMATCH, "X509_V_ERR_AKID_ISSUER_SERIAL_MISMATCH" },
+	{ X509_V_ERR_KEYUSAGE_NO_CERTSIGN, "X509_V_ERR_KEYUSAGE_NO_CERTSIGN" },
+	{ X509_V_ERR_UNABLE_TO_GET_CRL_ISSUER, "X509_V_ERR_UNABLE_TO_GET_CRL_ISSUER" },
+	{ X509_V_ERR_UNHANDLED_CRITICAL_EXTENSION, "X509_V_ERR_UNHANDLED_CRITICAL_EXTENSION" },
+	{ X509_V_ERR_KEYUSAGE_NO_CRL_SIGN, "X509_V_ERR_KEYUSAGE_NO_CRL_SIGN" },
+	{ X509_V_ERR_UNHANDLED_CRITICAL_CRL_EXTENSION, "X509_V_ERR_UNHANDLED_CRITICAL_CRL_EXTENSION" },
+	{ X509_V_ERR_INVALID_NON_CA, "X509_V_ERR_INVALID_NON_CA" },
+	{ X509_V_ERR_PROXY_PATH_LENGTH_EXCEEDED, "X509_V_ERR_PROXY_PATH_LENGTH_EXCEEDED" },
+	{ X509_V_ERR_KEYUSAGE_NO_DIGITAL_SIGNATURE, "X509_V_ERR_KEYUSAGE_NO_DIGITAL_SIGNATURE" },
+	{ X509_V_ERR_PROXY_CERTIFICATES_NOT_ALLOWED, "X509_V_ERR_PROXY_CERTIFICATES_NOT_ALLOWED" },
+	{ X509_V_ERR_INVALID_EXTENSION, "X509_V_ERR_INVALID_EXTENSION" },
+	{ X509_V_ERR_INVALID_POLICY_EXTENSION, "X509_V_ERR_INVALID_POLICY_EXTENSION" },
+	{ X509_V_ERR_NO_EXPLICIT_POLICY, "X509_V_ERR_NO_EXPLICIT_POLICY" },
+	{ X509_V_ERR_UNNESTED_RESOURCE, "X509_V_ERR_UNNESTED_RESOURCE" },
+#if defined(X509_V_ERR_DIFFERENT_CRL_SCOPE)
+	{ X509_V_ERR_DIFFERENT_CRL_SCOPE, "X509_V_ERR_DIFFERENT_CRL_SCOPE" },
+	{ X509_V_ERR_UNSUPPORTED_EXTENSION_FEATURE, "X509_V_ERR_UNSUPPORTED_EXTENSION_FEATURE" },
+	{ X509_V_ERR_PERMITTED_VIOLATION, "X509_V_ERR_PERMITTED_VIOLATION" },
+	{ X509_V_ERR_EXCLUDED_VIOLATION, "X509_V_ERR_EXCLUDED_VIOLATION" },
+	{ X509_V_ERR_SUBTREE_MINMAX, "X509_V_ERR_SUBTREE_MINMAX" },
+	{ X509_V_ERR_UNSUPPORTED_CONSTRAINT_TYPE, "X509_V_ERR_UNSUPPORTED_CONSTRAINT_TYPE" },
+	{ X509_V_ERR_UNSUPPORTED_CONSTRAINT_SYNTAX, "X509_V_ERR_UNSUPPORTED_CONSTRAINT_SYNTAX" },
+	{ X509_V_ERR_UNSUPPORTED_NAME_SYNTAX, "X509_V_ERR_UNSUPPORTED_NAME_SYNTAX" },
+#endif
+};
+
+#if !defined(ARRAY_SIZE)
+/**
+ * Macro to calculate the number of entries in an array
+ */
+#define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
+#endif
+
+char* SSL_get_verify_result_string(int rc)
+{
+	int i;
+	char* retstring = "undef";
+
+	for (i = 0; i < ARRAY_SIZE(X509_message_table); ++i)
+	{
+		if (X509_message_table[i].code == rc)
+		{
+			retstring = X509_message_table[i].string;
+			break;
+		}
+	}
+	return retstring;
+}
+
+
+void SSL_CTX_info_callback(const SSL* ssl, int where, int ret)
+{
+	if (where & SSL_CB_LOOP)
+	{
+		Log(TRACE_PROTOCOL, 1, "SSL state %s:%s:%s", 
+                  (where & SSL_ST_CONNECT) ? "connect" : (where & SSL_ST_ACCEPT) ? "accept" : "undef", 
+                    SSL_state_string_long(ssl), SSL_get_cipher_name(ssl));
+	}
+	else if (where & SSL_CB_EXIT)
+	{
+		Log(TRACE_PROTOCOL, 1, "SSL %s:%s",
+                  (where & SSL_ST_CONNECT) ? "connect" : (where & SSL_ST_ACCEPT) ? "accept" : "undef",
+                    SSL_state_string_long(ssl));
+	}
+	else if (where & SSL_CB_ALERT)
+	{
+		Log(TRACE_PROTOCOL, 1, "SSL alert %s:%s:%s",
+                  (where & SSL_CB_READ) ? "read" : "write", 
+                    SSL_alert_type_string_long(ret), SSL_alert_desc_string_long(ret));
+	}
+	else if (where & SSL_CB_HANDSHAKE_START)
+	{
+		Log(TRACE_PROTOCOL, 1, "SSL handshake started %s:%s:%s",
+                  (where & SSL_CB_READ) ? "read" : "write", 
+                    SSL_alert_type_string_long(ret), SSL_alert_desc_string_long(ret));
+	}
+	else if (where & SSL_CB_HANDSHAKE_DONE)
+	{
+		Log(TRACE_PROTOCOL, 1, "SSL handshake done %s:%s:%s", 
+                  (where & SSL_CB_READ) ? "read" : "write",
+                    SSL_alert_type_string_long(ret), SSL_alert_desc_string_long(ret));
+		Log(TRACE_PROTOCOL, 1, "SSL certificate verification: %s", 
+                    SSL_get_verify_result_string(SSL_get_verify_result(ssl)));
+	}
+	else
+	{
+		Log(TRACE_PROTOCOL, 1, "SSL state %s:%s:%s", SSL_state_string_long(ssl), 
+                   SSL_alert_type_string_long(ret), SSL_alert_desc_string_long(ret));
+	}
+}
+
+
+char* SSLSocket_get_version_string(int version)
+{
+	int i;
+	static char buf[20];
+	char* retstring = NULL;
+	static struct
+	{
+		int code;
+		char* string;
+	}
+	version_string_table[] =
+	{
+		{ SSL2_VERSION, "SSL 2.0" },
+		{ SSL3_VERSION, "SSL 3.0" },
+		{ TLS1_VERSION, "TLS 1.0" },
+#if defined(TLS2_VERSION)
+		{ TLS2_VERSION, "TLS 1.1" },
+#endif
+#if defined(TLS3_VERSION)
+		{ TLS3_VERSION, "TLS 1.2" },
+#endif
+	};
+
+	for (i = 0; i < ARRAY_SIZE(version_string_table); ++i)
+	{
+		if (version_string_table[i].code == version)
+		{
+			retstring = version_string_table[i].string;
+			break;
+		}
+	}
+	
+	if (retstring == NULL)
+	{
+		sprintf(buf, "%i", version);
+		retstring = buf;
+	}
+	return retstring;
+}
+
+
+void SSL_CTX_msg_callback(int write_p, int version, int content_type, const void* buf, size_t len, 
+        SSL* ssl, void* arg)
+{  
+
+/*  
+called by the SSL/TLS library for a protocol message, the function arguments have the following meaning:
+
+write_p
+This flag is 0 when a protocol message has been received and 1 when a protocol message has been sent.
+
+version
+The protocol version according to which the protocol message is interpreted by the library. Currently, this is one of SSL2_VERSION, SSL3_VERSION and TLS1_VERSION (for SSL 2.0, SSL 3.0 and TLS 1.0, respectively).
+
+content_type
+In the case of SSL 2.0, this is always 0. In the case of SSL 3.0 or TLS 1.0, this is one of the ContentType values defined in the protocol specification (change_cipher_spec(20), alert(21), handshake(22); but never application_data(23) because the callback will only be called for protocol messages).
+
+buf, len
+buf points to a buffer containing the protocol message, which consists of len bytes. The buffer is no longer valid after the callback function has returned.
+
+ssl
+The SSL object that received or sent the message.
+
+arg
+The user-defined argument optionally defined by SSL_CTX_set_msg_callback_arg() or SSL_set_msg_callback_arg().
+
+*/
+
+	Log(TRACE_PROTOCOL, -1, "%s %s %d buflen %d", (write_p ? "sent" : "received"), 
+		SSLSocket_get_version_string(version),
+		content_type, (int)len);	
+}
+
+
+int pem_passwd_cb(char* buf, int size, int rwflag, void* userdata)
+{
+	int rc = 0;
+
+	FUNC_ENTRY;
+	if (!rwflag)
+	{
+		strncpy(buf, (char*)(userdata), size);
+		buf[size-1] = '\0';
+		rc = (int)strlen(buf);
+	}
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+int SSL_create_mutex(ssl_mutex_type* mutex)
+{
+	int rc = 0;
+
+	FUNC_ENTRY;
+#if defined(WIN32) || defined(WIN64)
+	*mutex = CreateMutex(NULL, 0, NULL);
+#else
+	rc = pthread_mutex_init(mutex, NULL);
+#endif
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+int SSL_lock_mutex(ssl_mutex_type* mutex)
+{
+	int rc = -1;
+
+	/* don't add entry/exit trace points, as trace gets lock too, and it might happen quite frequently  */
+#if defined(WIN32) || defined(WIN64)
+	if (WaitForSingleObject(*mutex, INFINITE) != WAIT_FAILED)
+#else
+	if ((rc = pthread_mutex_lock(mutex)) == 0)
+#endif
+	rc = 0;
+
+	return rc;
+}
+
+int SSL_unlock_mutex(ssl_mutex_type* mutex)
+{
+	int rc = -1;
+
+	/* don't add entry/exit trace points, as trace gets lock too, and it might happen quite frequently  */
+#if defined(WIN32) || defined(WIN64)
+	if (ReleaseMutex(*mutex) != 0)
+#else
+	if ((rc = pthread_mutex_unlock(mutex)) == 0)
+#endif
+	rc = 0;
+
+	return rc;
+}
+
+void SSL_destroy_mutex(ssl_mutex_type* mutex)
+{
+	int rc = 0;
+
+	FUNC_ENTRY;
+#if defined(WIN32) || defined(WIN64)
+	rc = CloseHandle(*mutex);
+#else
+	rc = pthread_mutex_destroy(mutex);
+#endif
+	FUNC_EXIT_RC(rc);
+}
+
+
+
+#if (OPENSSL_VERSION_NUMBER >= 0x010000000)
+extern void SSLThread_id(CRYPTO_THREADID *id)
+{
+#if defined(WIN32) || defined(WIN64)
+	CRYPTO_THREADID_set_numeric(id, (unsigned long)GetCurrentThreadId());
+#else
+	CRYPTO_THREADID_set_numeric(id, (unsigned long)pthread_self());
+#endif
+}
+#else
+extern unsigned long SSLThread_id(void)
+{
+#if defined(WIN32) || defined(WIN64)
+	return (unsigned long)GetCurrentThreadId();
+#else
+	return (unsigned long)pthread_self();
+#endif
+}
+#endif
+
+extern void SSLLocks_callback(int mode, int n, const char *file, int line)
+{
+	if (sslLocks)
+	{
+		if (mode & CRYPTO_LOCK)
+			SSL_lock_mutex(&sslLocks[n]);
+		else
+			SSL_unlock_mutex(&sslLocks[n]);
+	}
+}
+
+
+void SSLSocket_handleOpensslInit(int bool_value)
+{
+	handle_openssl_init = bool_value;
+}
+
+
+int SSLSocket_initialize(void)
+{
+	int rc = 0;
+	/*int prc;*/
+	int i;
+	int lockMemSize;
+	
+	FUNC_ENTRY;
+
+	if (handle_openssl_init)
+	{
+		if ((rc = SSL_library_init()) != 1)
+			rc = -1;
+			
+		ERR_load_crypto_strings();
+		SSL_load_error_strings();
+		
+		/* OpenSSL 0.9.8o and 1.0.0a and later added SHA2 algorithms to SSL_library_init(). 
+		Applications which need to use SHA2 in earlier versions of OpenSSL should call 
+		OpenSSL_add_all_algorithms() as well. */
+		
+		OpenSSL_add_all_algorithms();
+		
+		lockMemSize = CRYPTO_num_locks() * sizeof(ssl_mutex_type);
+
+		sslLocks = malloc(lockMemSize);
+		if (!sslLocks)
+		{
+			rc = -1;
+			goto exit;
+		}
+		else
+			memset(sslLocks, 0, lockMemSize);
+
+		for (i = 0; i < CRYPTO_num_locks(); i++)
+		{
+			/* prc = */SSL_create_mutex(&sslLocks[i]);
+		}
+
+#if (OPENSSL_VERSION_NUMBER >= 0x010000000)
+		CRYPTO_THREADID_set_callback(SSLThread_id);
+#else
+		CRYPTO_set_id_callback(SSLThread_id);
+#endif
+		CRYPTO_set_locking_callback(SSLLocks_callback);
+		
+	}
+	
+	SSL_create_mutex(&sslCoreMutex);
+
+exit:
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+void SSLSocket_terminate(void)
+{
+	FUNC_ENTRY;
+	
+	if (handle_openssl_init)
+	{
+		EVP_cleanup();
+		ERR_free_strings();
+		CRYPTO_set_locking_callback(NULL);
+		if (sslLocks)
+		{
+			int i = 0;
+
+			for (i = 0; i < CRYPTO_num_locks(); i++)
+			{
+				SSL_destroy_mutex(&sslLocks[i]);
+			}
+			free(sslLocks);
+		}
+	}
+	
+	SSL_destroy_mutex(&sslCoreMutex);
+	
+	FUNC_EXIT;
+}
+
+int SSLSocket_createContext(networkHandles* net, MQTTClient_SSLOptions* opts)
+{
+	int rc = 1;
+	const char* ciphers = NULL;
+	
+	FUNC_ENTRY;
+	if (net->ctx == NULL)
+	{
+		int sslVersion = MQTT_SSL_VERSION_DEFAULT;
+		if (opts->struct_version >= 1) sslVersion = opts->sslVersion;
+/* SSL_OP_NO_TLSv1_1 is defined in ssl.h if the library version supports TLSv1.1.
+ * OPENSSL_NO_TLS1 is defined in opensslconf.h or on the compiler command line
+ * if TLS1.x was removed at OpenSSL library build time via Configure options.
+ */
+		switch (sslVersion)
+		{
+		case MQTT_SSL_VERSION_DEFAULT:
+			net->ctx = SSL_CTX_new(SSLv23_client_method()); /* SSLv23 for compatibility with SSLv2, SSLv3 and TLSv1 */
+			break;
+#if defined(SSL_OP_NO_TLSv1) && !defined(OPENSSL_NO_TLS1)
+		case MQTT_SSL_VERSION_TLS_1_0:
+			net->ctx = SSL_CTX_new(TLSv1_client_method());
+			break;
+#endif
+#if defined(SSL_OP_NO_TLSv1_1) && !defined(OPENSSL_NO_TLS1)
+		case MQTT_SSL_VERSION_TLS_1_1:
+			net->ctx = SSL_CTX_new(TLSv1_1_client_method());
+			break;
+#endif
+#if defined(SSL_OP_NO_TLSv1_2) && !defined(OPENSSL_NO_TLS1)
+		case MQTT_SSL_VERSION_TLS_1_2:
+			net->ctx = SSL_CTX_new(TLSv1_2_client_method());
+			break;
+#endif
+		default:
+			break;
+		}
+		if (net->ctx == NULL)
+		{
+			SSLSocket_error("SSL_CTX_new", NULL, net->socket, rc);
+			goto exit;
+		}
+	}
+	
+	if (opts->keyStore)
+	{
+		if ((rc = SSL_CTX_use_certificate_chain_file(net->ctx, opts->keyStore)) != 1)
+		{
+			SSLSocket_error("SSL_CTX_use_certificate_chain_file", NULL, net->socket, rc);
+			goto free_ctx; /*If we can't load the certificate (chain) file then loading the privatekey won't work either as it needs a matching cert already loaded */
+		}	
+			
+		if (opts->privateKey == NULL)
+			opts->privateKey = opts->keyStore;   /* the privateKey can be included in the keyStore */
+
+		if (opts->privateKeyPassword != NULL)
+		{
+			SSL_CTX_set_default_passwd_cb(net->ctx, pem_passwd_cb);
+			SSL_CTX_set_default_passwd_cb_userdata(net->ctx, (void*)opts->privateKeyPassword);
+		}
+		
+		/* support for ASN.1 == DER format? DER can contain only one certificate? */
+		rc = SSL_CTX_use_PrivateKey_file(net->ctx, opts->privateKey, SSL_FILETYPE_PEM);
+		if (opts->privateKey == opts->keyStore)
+			opts->privateKey = NULL;
+		if (rc != 1)
+		{
+			SSLSocket_error("SSL_CTX_use_PrivateKey_file", NULL, net->socket, rc);
+			goto free_ctx;
+		}  
+	}
+
+	if (opts->trustStore)
+	{
+		if ((rc = SSL_CTX_load_verify_locations(net->ctx, opts->trustStore, NULL)) != 1)
+		{
+			SSLSocket_error("SSL_CTX_load_verify_locations", NULL, net->socket, rc);
+			goto free_ctx;
+		}                               
+	}
+	else if ((rc = SSL_CTX_set_default_verify_paths(net->ctx)) != 1)
+	{
+		SSLSocket_error("SSL_CTX_set_default_verify_paths", NULL, net->socket, rc);
+		goto free_ctx;
+	}
+
+	if (opts->enabledCipherSuites == NULL)
+		ciphers = "DEFAULT"; 
+	else
+		ciphers = opts->enabledCipherSuites;
+
+	if ((rc = SSL_CTX_set_cipher_list(net->ctx, ciphers)) != 1)
+	{
+		SSLSocket_error("SSL_CTX_set_cipher_list", NULL, net->socket, rc);
+		goto free_ctx;
+	}       
+	
+	SSL_CTX_set_mode(net->ctx, SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER);
+
+	goto exit;
+free_ctx:
+	SSL_CTX_free(net->ctx);
+	net->ctx = NULL;
+	
+exit:
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+int SSLSocket_setSocketForSSL(networkHandles* net, MQTTClient_SSLOptions* opts, char* hostname)
+{
+	int rc = 1;
+	
+	FUNC_ENTRY;
+	
+	if (net->ctx != NULL || (rc = SSLSocket_createContext(net, opts)) == 1)
+	{
+		int i;
+
+		SSL_CTX_set_info_callback(net->ctx, SSL_CTX_info_callback);
+		SSL_CTX_set_msg_callback(net->ctx, SSL_CTX_msg_callback);
+   		if (opts->enableServerCertAuth) 
+			SSL_CTX_set_verify(net->ctx, SSL_VERIFY_PEER, NULL);
+	
+		net->ssl = SSL_new(net->ctx);
+
+		/* Log all ciphers available to the SSL sessions (loaded in ctx) */
+		for (i = 0; ;i++)
+		{
+			const char* cipher = SSL_get_cipher_list(net->ssl, i);
+			if (cipher == NULL)
+				break;
+			Log(TRACE_PROTOCOL, 1, "SSL cipher available: %d:%s", i, cipher);
+	    	}	
+		if ((rc = SSL_set_fd(net->ssl, net->socket)) != 1)
+			SSLSocket_error("SSL_set_fd", net->ssl, net->socket, rc);
+
+		if ((rc = SSL_set_tlsext_host_name(net->ssl, hostname)) != 1)
+			SSLSocket_error("SSL_set_tlsext_host_name", NULL, net->socket, rc);
+	}
+		
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+int SSLSocket_connect(SSL* ssl, int sock)      
+{
+	int rc = 0;
+
+	FUNC_ENTRY;
+
+	rc = SSL_connect(ssl);
+	if (rc != 1)
+	{
+		int error;
+		error = SSLSocket_error("SSL_connect", ssl, sock, rc);
+		if (error == SSL_FATAL)
+			rc = error;
+		if (error == SSL_ERROR_WANT_READ || error == SSL_ERROR_WANT_WRITE)
+			rc = TCPSOCKET_INTERRUPTED;
+	}
+
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+
+/**
+ *  Reads one byte from a socket
+ *  @param socket the socket to read from
+ *  @param c the character read, returned
+ *  @return completion code
+ */
+int SSLSocket_getch(SSL* ssl, int socket, char* c)
+{
+	int rc = SOCKET_ERROR;
+
+	FUNC_ENTRY;
+	if ((rc = SocketBuffer_getQueuedChar(socket, c)) != SOCKETBUFFER_INTERRUPTED)
+		goto exit;
+
+	if ((rc = SSL_read(ssl, c, (size_t)1)) < 0)
+	{
+		int err = SSLSocket_error("SSL_read - getch", ssl, socket, rc);
+		if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE)
+		{
+			rc = TCPSOCKET_INTERRUPTED;
+			SocketBuffer_interrupted(socket, 0);
+		}
+	}
+	else if (rc == 0)
+		rc = SOCKET_ERROR; 	/* The return value from recv is 0 when the peer has performed an orderly shutdown. */
+	else if (rc == 1)
+	{
+		SocketBuffer_queueChar(socket, *c);
+		rc = TCPSOCKET_COMPLETE;
+	}
+exit:
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+
+/**
+ *  Attempts to read a number of bytes from a socket, non-blocking. If a previous read did not
+ *  finish, then retrieve that data.
+ *  @param socket the socket to read from
+ *  @param bytes the number of bytes to read
+ *  @param actual_len the actual number of bytes read
+ *  @return completion code
+ */
+char *SSLSocket_getdata(SSL* ssl, int socket, size_t bytes, size_t* actual_len)
+{
+	int rc;
+	char* buf;
+
+	FUNC_ENTRY;
+	if (bytes == 0)
+	{
+		buf = SocketBuffer_complete(socket);
+		goto exit;
+	}
+
+	buf = SocketBuffer_getQueuedData(socket, bytes, actual_len);
+
+	if ((rc = SSL_read(ssl, buf + (*actual_len), (int)(bytes - (*actual_len)))) < 0)
+	{
+		rc = SSLSocket_error("SSL_read - getdata", ssl, socket, rc);
+		if (rc != SSL_ERROR_WANT_READ && rc != SSL_ERROR_WANT_WRITE)
+		{
+			buf = NULL;
+			goto exit;
+		}
+	}
+	else if (rc == 0) /* rc 0 means the other end closed the socket */
+	{
+		buf = NULL;
+		goto exit;
+	}
+	else
+		*actual_len += rc;
+
+	if (*actual_len == bytes)
+	{
+		SocketBuffer_complete(socket);
+		/* if we read the whole packet, there might still be data waiting in the SSL buffer, which
+		isn't picked up by select.  So here we should check for any data remaining in the SSL buffer, and
+		if so, add this socket to a new "pending SSL reads" list.
+		*/
+		if (SSL_pending(ssl) > 0) /* return no of bytes pending */
+			SSLSocket_addPendingRead(socket);
+	}
+	else /* we didn't read the whole packet */
+	{
+		SocketBuffer_interrupted(socket, *actual_len);
+		Log(TRACE_MAX, -1, "SSL_read: %d bytes expected but %d bytes now received", bytes, *actual_len);
+	}
+exit:
+	FUNC_EXIT;
+	return buf;
+}
+
+void SSLSocket_destroyContext(networkHandles* net)
+{
+	FUNC_ENTRY;
+	if (net->ctx)
+		SSL_CTX_free(net->ctx);
+	net->ctx = NULL;
+	FUNC_EXIT;
+}
+
+
+int SSLSocket_close(networkHandles* net)
+{
+	int rc = 1;
+	FUNC_ENTRY;
+	if (net->ssl) {
+		rc = SSL_shutdown(net->ssl);
+		SSL_free(net->ssl);
+		net->ssl = NULL;
+	}
+	SSLSocket_destroyContext(net);
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+
+/* No SSL_writev() provided by OpenSSL. Boo. */  
+int SSLSocket_putdatas(SSL* ssl, int socket, char* buf0, size_t buf0len, int count, char** buffers, size_t* buflens, int* frees)
+{
+	int rc = 0;
+	int i;
+	char *ptr;
+	iobuf iovec;
+	int sslerror;
+
+	FUNC_ENTRY;
+	iovec.iov_len = (ULONG)buf0len;
+	for (i = 0; i < count; i++)
+		iovec.iov_len += (ULONG)buflens[i];
+
+	ptr = iovec.iov_base = (char *)malloc(iovec.iov_len);  
+	memcpy(ptr, buf0, buf0len);
+	ptr += buf0len;
+	for (i = 0; i < count; i++)
+	{
+		memcpy(ptr, buffers[i], buflens[i]);
+		ptr += buflens[i];
+	}
+
+	SSL_lock_mutex(&sslCoreMutex);
+	if ((rc = SSL_write(ssl, iovec.iov_base, iovec.iov_len)) == iovec.iov_len)
+		rc = TCPSOCKET_COMPLETE;
+	else 
+	{ 
+		sslerror = SSLSocket_error("SSL_write", ssl, socket, rc);
+		
+		if (sslerror == SSL_ERROR_WANT_WRITE)
+		{
+			int* sockmem = (int*)malloc(sizeof(int));
+			int free = 1;
+
+			Log(TRACE_MIN, -1, "Partial write: incomplete write of %d bytes on SSL socket %d",
+				iovec.iov_len, socket);
+			SocketBuffer_pendingWrite(socket, ssl, 1, &iovec, &free, iovec.iov_len, 0);
+			*sockmem = socket;
+			ListAppend(s.write_pending, sockmem, sizeof(int));
+			FD_SET(socket, &(s.pending_wset));
+			rc = TCPSOCKET_INTERRUPTED;
+		}
+		else 
+			rc = SOCKET_ERROR;
+	}
+	SSL_unlock_mutex(&sslCoreMutex);
+
+	if (rc != TCPSOCKET_INTERRUPTED)
+		free(iovec.iov_base);
+	else
+	{
+		int i;
+		free(buf0);
+		for (i = 0; i < count; ++i)
+		{
+			if (frees[i])
+				free(buffers[i]);
+		}	
+	}
+	FUNC_EXIT_RC(rc); 
+	return rc;
+}
+
+static List pending_reads = {NULL, NULL, NULL, 0, 0};
+
+void SSLSocket_addPendingRead(int sock)
+{
+	FUNC_ENTRY;
+	if (ListFindItem(&pending_reads, &sock, intcompare) == NULL) /* make sure we don't add the same socket twice */
+	{
+		int* psock = (int*)malloc(sizeof(sock));
+		*psock = sock;
+		ListAppend(&pending_reads, psock, sizeof(sock));
+	}
+	else
+		Log(TRACE_MIN, -1, "SSLSocket_addPendingRead: socket %d already in the list", sock);
+
+	FUNC_EXIT;
+}
+
+
+int SSLSocket_getPendingRead(void)
+{
+	int sock = -1;
+	
+	if (pending_reads.count > 0)
+	{
+		sock = *(int*)(pending_reads.first->content);
+		ListRemoveHead(&pending_reads);
+	}
+	return sock;
+}
+
+
+int SSLSocket_continueWrite(pending_writes* pw)
+{
+	int rc = 0; 
+	
+	FUNC_ENTRY;
+	if ((rc = SSL_write(pw->ssl, pw->iovecs[0].iov_base, pw->iovecs[0].iov_len)) == pw->iovecs[0].iov_len)
+	{
+		/* topic and payload buffers are freed elsewhere, when all references to them have been removed */
+		free(pw->iovecs[0].iov_base);
+		Log(TRACE_MIN, -1, "SSL continueWrite: partial write now complete for socket %d", pw->socket);
+		rc = 1;
+	}
+	else
+	{
+		int sslerror = SSLSocket_error("SSL_write", pw->ssl, pw->socket, rc);
+		if (sslerror == SSL_ERROR_WANT_WRITE)
+			rc = 0; /* indicate we haven't finished writing the payload yet */
+	}
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/SSLSocket.h
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/SSLSocket.h b/thirdparty/paho.mqtt.c/src/SSLSocket.h
new file mode 100644
index 0000000..ca18f62
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/SSLSocket.h
@@ -0,0 +1,51 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2017 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs, Allan Stockdill-Mander - initial implementation 
+ *    Ian Craggs - SNI support
+ *******************************************************************************/
+#if !defined(SSLSOCKET_H)
+#define SSLSOCKET_H
+
+#if defined(WIN32) || defined(WIN64)
+	#define ssl_mutex_type HANDLE
+#else
+	#include <pthread.h>
+	#include <semaphore.h>
+	#define ssl_mutex_type pthread_mutex_t
+#endif
+
+#include <openssl/ssl.h>
+#include "SocketBuffer.h"
+#include "Clients.h"
+
+#define URI_SSL "ssl://"
+
+/** if we should handle openssl initialization (bool_value == 1) or depend on it to be initalized externally (bool_value == 0) */
+void SSLSocket_handleOpensslInit(int bool_value);
+
+int SSLSocket_initialize(void);
+void SSLSocket_terminate(void);
+int SSLSocket_setSocketForSSL(networkHandles* net, MQTTClient_SSLOptions* opts, char* hostname);
+
+int SSLSocket_getch(SSL* ssl, int socket, char* c);
+char *SSLSocket_getdata(SSL* ssl, int socket, size_t bytes, size_t* actual_len);
+
+int SSLSocket_close(networkHandles* net);
+int SSLSocket_putdatas(SSL* ssl, int socket, char* buf0, size_t buf0len, int count, char** buffers, size_t* buflens, int* frees);
+int SSLSocket_connect(SSL* ssl, int socket);
+
+int SSLSocket_getPendingRead(void);
+int SSLSocket_continueWrite(pending_writes* pw);
+
+#endif