You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/01/10 11:11:46 UTC
[04/19] nifi-minifi-cpp git commit: MINIFICPP-342: MQTT extension
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPacket.c
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTPacket.c b/thirdparty/paho.mqtt.c/src/MQTTPacket.c
new file mode 100644
index 0000000..c21a432
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTPacket.c
@@ -0,0 +1,755 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2014 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Ian Craggs - initial API and implementation and/or initial documentation
+ * Ian Craggs, Allan Stockdill-Mander - SSL updates
+ * Ian Craggs - MQTT 3.1.1 support
+ *******************************************************************************/
+
+/**
+ * @file
+ * \brief functions to deal with reading and writing of MQTT packets from and to sockets
+ *
+ * Some other related functions are in the MQTTPacketOut module
+ */
+
+#include "MQTTPacket.h"
+#include "Log.h"
+#if !defined(NO_PERSISTENCE)
+ #include "MQTTPersistence.h"
+#endif
+#include "Messages.h"
+#include "StackTrace.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "Heap.h"
+
+#if !defined(min)
+#define min(A,B) ( (A) < (B) ? (A):(B))
+#endif
+
+/**
+ * List of the predefined MQTT v3 packet names, indexed by packet type code
+ * (MQTTPacket_name uses the code directly as the index; entry 0 is "RESERVED").
+ */
+static const char *packet_names[] =
+{
+ "RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL",
+ "PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK",
+ "PINGREQ", "PINGRESP", "DISCONNECT"
+};
+
+const char** MQTTClient_packet_names = packet_names; /* non-static pointer to the same table */
+
+
+/**
+ * Looks up the printable name of an MQTT packet type.
+ * @param ptype packet type code
+ * @return the packet_names entry for ptype, or "UNKNOWN" when ptype is
+ * negative or greater than DISCONNECT
+ */
+const char* MQTTPacket_name(int ptype)
+{
+    if (ptype < 0 || ptype > DISCONNECT)
+        return "UNKNOWN";
+    return packet_names[ptype];
+}
+
+/**
+ * Array of functions to build packets, indexed according to packet code.
+ * MQTTPacket_Factory calls the entry for the type it has read; a NULL entry
+ * means no builder is registered here, so the factory only logs the type and
+ * returns no packet.
+ */
+pf new_packets[] =
+{
+ NULL, /**< reserved code 0: no builder */
+ NULL, /**< CONNECT: no builder */
+ MQTTPacket_connack, /**< CONNACK */
+ MQTTPacket_publish, /**< PUBLISH */
+ MQTTPacket_ack, /**< PUBACK */
+ MQTTPacket_ack, /**< PUBREC */
+ MQTTPacket_ack, /**< PUBREL */
+ MQTTPacket_ack, /**< PUBCOMP */
+ NULL, /**< SUBSCRIBE: no builder */
+ MQTTPacket_suback, /**< SUBACK */
+ NULL, /**< UNSUBSCRIBE: no builder */
+ MQTTPacket_ack, /**< UNSUBACK */
+ MQTTPacket_header_only, /**< PINGREQ */
+ MQTTPacket_header_only, /**< PINGRESP */
+ MQTTPacket_header_only /**< DISCONNECT */
+};
+
+
+static char* readUTFlen(char** pptr, char* enddata, int* len); /* file-local: readUTF variant that also reports the length */
+static int MQTTPacket_send_ack(int type, int msgid, int dup, networkHandles *net); /* file-local: shared by the PUBACK/PUBREC/PUBREL/PUBCOMP senders */
+
+/**
+ * Reads one MQTT packet from a socket.
+ *
+ * Reads the fixed header byte, then the remaining-length field, then the rest of
+ * the packet, and passes that data to the builder registered in new_packets for
+ * the packet type. When persistence is compiled in, an incoming QoS 2 PUBLISH is
+ * also handed to MQTTPersistence_put and *error then carries that call's result.
+ * net->lastReceived is updated whenever a packet structure was built.
+ * @param net the network handles to read from (net->ssl is used when OPENSSL is
+ * defined and it is set, otherwise net->socket)
+ * @param error pointer to the error code which is completed if no packet is returned
+ * @return the packet structure or NULL if there was an error
+ */
+void* MQTTPacket_Factory(networkHandles* net, int* error)
+{
+    char* data = NULL;
+    static Header header;
+    size_t remaining_length;
+    int ptype;
+    void* pack = NULL;
+    size_t actual_len = 0;
+
+    FUNC_ENTRY;
+    *error = SOCKET_ERROR;  /* indicate whether an error occurred, or not */
+
+    /* read the packet data from the socket */
+#if defined(OPENSSL)
+    *error = (net->ssl) ? SSLSocket_getch(net->ssl, net->socket, &header.byte) : Socket_getch(net->socket, &header.byte);
+#else
+    *error = Socket_getch(net->socket, &header.byte);
+#endif
+    if (*error != TCPSOCKET_COMPLETE)   /* first byte is the header byte */
+        goto exit; /* packet not read, *error indicates whether SOCKET_ERROR occurred */
+
+    /* now read the remaining length, so we know how much more to read */
+    if ((*error = MQTTPacket_decode(net, &remaining_length)) != TCPSOCKET_COMPLETE)
+        goto exit; /* packet not read, *error indicates whether SOCKET_ERROR occurred */
+
+    /* now read the rest, the variable header and payload */
+#if defined(OPENSSL)
+    data = (net->ssl) ? SSLSocket_getdata(net->ssl, net->socket, remaining_length, &actual_len) :
+                        Socket_getdata(net->socket, remaining_length, &actual_len);
+#else
+    data = Socket_getdata(net->socket, remaining_length, &actual_len);
+#endif
+    if (data == NULL)
+    {
+        *error = SOCKET_ERROR;
+        goto exit; /* socket error */
+    }
+
+    if (actual_len != remaining_length)
+        *error = TCPSOCKET_INTERRUPTED;
+    else
+    {
+        ptype = header.bits.type;
+        if (ptype < CONNECT || ptype > DISCONNECT || new_packets[ptype] == NULL)
+            Log(TRACE_MIN, 2, NULL, ptype);
+        else
+        {
+            if ((pack = (*new_packets[ptype])(header.byte, data, remaining_length)) == NULL)
+                *error = BAD_MQTT_PACKET;
+#if !defined(NO_PERSISTENCE)
+            else if (header.bits.type == PUBLISH && header.bits.qos == 2)
+            {
+                int buf0len;
+                char *buf = malloc(10);
+
+                if (buf == NULL)
+                    *error = SOCKET_ERROR; /* cannot build the header to persist; report it like a failed put */
+                else
+                {
+                    buf[0] = header.byte;
+                    buf0len = 1 + MQTTPacket_encode(&buf[1], remaining_length);
+                    *error = MQTTPersistence_put(net->socket, buf, buf0len, 1,
+                        &data, &remaining_length, header.bits.type, ((Publish *)pack)->msgId, 1);
+                    free(buf);
+                }
+            }
+#endif
+        }
+    }
+    if (pack)
+        time(&(net->lastReceived));
+exit:
+    FUNC_EXIT_RC(*error);
+    return pack;
+}
+
+
+/**
+ * Sends an MQTT packet in one system call write
+ *
+ * Builds the fixed header (header byte plus encoded remaining length) in a
+ * temporary buffer and writes it together with the caller's buffer. With
+ * persistence compiled in, a PUBREL is first passed to MQTTPersistence_put; the
+ * result of that call is overwritten by the result of the write.
+ * The temporary header buffer is not freed here when the write returns
+ * TCPSOCKET_INTERRUPTED.
+ * @param net the network handles to write to (net->ssl is used when OPENSSL is
+ * defined and it is set, otherwise net->socket)
+ * @param header the one-byte MQTT header
+ * @param buffer the rest of the buffer to write (not including remaining length)
+ * @param buflen the length of the data in buffer to be written
+ * @param freeData passed to the socket layer as the "free" flag for buffer
+ * (NOTE(review): presumably whether it should free buffer once a pending write
+ * finishes - confirm against Socket_putdatas)
+ * @return the completion code (TCPSOCKET_COMPLETE etc), or SOCKET_ERROR if the
+ * header buffer cannot be allocated
+ */
+int MQTTPacket_send(networkHandles* net, Header header, char* buffer, size_t buflen, int freeData)
+{
+    int rc = SOCKET_ERROR;
+    size_t buf0len;
+    char *buf;
+
+    FUNC_ENTRY;
+    if ((buf = malloc(10)) == NULL)
+        goto exit; /* nothing has been written yet */
+    buf[0] = header.byte;
+    buf0len = 1 + MQTTPacket_encode(&buf[1], buflen);
+#if !defined(NO_PERSISTENCE)
+    if (header.bits.type == PUBREL)
+    {
+        char* ptraux = buffer;
+        int msgId = readInt(&ptraux);
+        rc = MQTTPersistence_put(net->socket, buf, buf0len, 1, &buffer, &buflen,
+            header.bits.type, msgId, 0);
+    }
+#endif
+
+#if defined(OPENSSL)
+    if (net->ssl)
+        rc = SSLSocket_putdatas(net->ssl, net->socket, buf, buf0len, 1, &buffer, &buflen, &freeData);
+    else
+#endif
+        rc = Socket_putdatas(net->socket, buf, buf0len, 1, &buffer, &buflen, &freeData);
+
+    if (rc == TCPSOCKET_COMPLETE)
+        time(&(net->lastSent));
+
+    if (rc != TCPSOCKET_INTERRUPTED)
+        free(buf);
+
+exit:
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+/**
+ * Sends an MQTT packet from multiple buffers in one system call write
+ *
+ * The remaining length written in the fixed header is the sum of buflens. With
+ * persistence compiled in, a PUBLISH with QoS 1 or 2 is first passed to
+ * MQTTPersistence_put; its message id is read from buffers[2], which is where
+ * MQTTPacket_send_publish places it. The result of that call is overwritten by
+ * the result of the write.
+ * The temporary header buffer is not freed here when the write returns
+ * TCPSOCKET_INTERRUPTED.
+ * @param net the network handles to write to (net->ssl is used when OPENSSL is
+ * defined and it is set, otherwise net->socket)
+ * @param header the one-byte MQTT header
+ * @param count the number of buffers
+ * @param buffers the rest of the buffers to write (not including remaining length)
+ * @param buflens the lengths of the data in the array of buffers to be written
+ * @param frees one flag per buffer, passed through to the socket layer
+ * (NOTE(review): presumably which buffers it should free once a pending write
+ * finishes - confirm against Socket_putdatas)
+ * @return the completion code (TCPSOCKET_COMPLETE etc), or SOCKET_ERROR if the
+ * header buffer cannot be allocated
+ */
+int MQTTPacket_sends(networkHandles* net, Header header, int count, char** buffers, size_t* buflens, int* frees)
+{
+    int i, rc = SOCKET_ERROR;
+    size_t buf0len, total = 0;
+    char *buf;
+
+    FUNC_ENTRY;
+    if ((buf = malloc(10)) == NULL)
+        goto exit; /* nothing has been written yet */
+    buf[0] = header.byte;
+    for (i = 0; i < count; i++)
+        total += buflens[i];
+    buf0len = 1 + MQTTPacket_encode(&buf[1], total);
+#if !defined(NO_PERSISTENCE)
+    if (header.bits.type == PUBLISH && header.bits.qos != 0)
+    {   /* persist PUBLISH QoS1 and Qo2 */
+        char *ptraux = buffers[2];
+        int msgId = readInt(&ptraux);
+        rc = MQTTPersistence_put(net->socket, buf, buf0len, count, buffers, buflens,
+            header.bits.type, msgId, 0);
+    }
+#endif
+#if defined(OPENSSL)
+    if (net->ssl)
+        rc = SSLSocket_putdatas(net->ssl, net->socket, buf, buf0len, count, buffers, buflens, frees);
+    else
+#endif
+        rc = Socket_putdatas(net->socket, buf, buf0len, count, buffers, buflens, frees);
+
+    if (rc == TCPSOCKET_COMPLETE)
+        time(&(net->lastSent));
+
+    if (rc != TCPSOCKET_INTERRUPTED)
+        free(buf);
+exit:
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+/**
+ * Writes a length in the MQTT "remaining length" form: base-128 digits, least
+ * significant first, with the top bit set on every digit except the last.
+ * @param buf the buffer that receives the encoded bytes
+ * @param length the length to be encoded
+ * @return the number of bytes written to buf (at least one, even for length 0)
+ */
+int MQTTPacket_encode(char* buf, size_t length)
+{
+    int count = 0;
+
+    FUNC_ENTRY;
+    for (;;)
+    {
+        char digit = length % 128;
+
+        length /= 128;
+        if (length > 0)
+            digit |= 0x80; /* more digits follow */
+        buf[count++] = digit;
+        if (length == 0)
+            break;
+    }
+    FUNC_EXIT_RC(count);
+    return count;
+}
+
+
+/**
+ * Decodes the message length according to the MQTT algorithm: bytes are read
+ * one at a time, the low 7 bits of each are added to the value (first byte is
+ * the least significant) and a set top bit means another byte follows.
+ * @param net the network handles to read from (net->ssl is used when OPENSSL is
+ * defined and it is set, otherwise net->socket)
+ * @param value the decoded length returned; it is zeroed first and holds a
+ * partial sum if the function fails part-way
+ * @return TCPSOCKET_COMPLETE when the whole length was read; otherwise the code
+ * returned by the failing byte read, or SOCKET_ERROR when a fifth length byte
+ * would be needed
+ */
+int MQTTPacket_decode(networkHandles* net, size_t* value)
+{
+ int rc = SOCKET_ERROR;
+ char c;
+ int multiplier = 1;
+ int len = 0;
+#define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
+
+ FUNC_ENTRY;
+ *value = 0;
+ do
+ {
+ if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) /* checked before reading, so at most 4 bytes are consumed */
+ {
+ rc = SOCKET_ERROR; /* bad data */
+ goto exit;
+ }
+#if defined(OPENSSL)
+ rc = (net->ssl) ? SSLSocket_getch(net->ssl, net->socket, &c) : Socket_getch(net->socket, &c);
+#else
+ rc = Socket_getch(net->socket, &c);
+#endif
+ if (rc != TCPSOCKET_COMPLETE)
+ goto exit;
+ *value += (c & 127) * multiplier; /* low 7 bits carry the digit */
+ multiplier *= 128;
+ } while ((c & 128) != 0); /* top bit set: another length byte follows */
+exit:
+ FUNC_EXIT_RC(rc);
+ return rc;
+}
+
+
+/**
+ * Reads a two-byte, most-significant-byte-first integer from the input buffer.
+ * No bounds are checked here: the caller must know two bytes are available.
+ * @param pptr pointer to the input buffer - advanced past the two bytes read
+ * @return the value read, in the range 0 to 65535
+ */
+int readInt(char** pptr)
+{
+    const unsigned char* bytes = (const unsigned char*)*pptr;
+    int value = 256 * bytes[0] + bytes[1];
+
+    *pptr += 2;
+    return value;
+}
+
+
+/**
+ * Reads a "UTF" string from the input buffer. UTF as in the MQTT v3 spec which really means
+ * a length delimited string. So it reads the two byte length then the data according to
+ * that length. The end of the buffer is provided too, so we can prevent buffer overruns caused
+ * by an incorrect length.
+ * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned.
+ * If the length bytes were read but the string was not, it has still moved past the length.
+ * @param enddata pointer to the end of the buffer not to be read beyond
+ * @param len returns the calculated value of the length bytes read (left untouched when
+ * fewer than two bytes are available)
+ * @return an allocated C string holding the characters read (the caller frees it), or NULL
+ * if the length read would have caused an overrun or the string could not be allocated.
+ *
+ */
+static char* readUTFlen(char** pptr, char* enddata, int* len)
+{
+    char* string = NULL;
+
+    FUNC_ENTRY;
+    if (enddata - (*pptr) > 1) /* enough length to read the integer? */
+    {
+        *len = readInt(pptr);
+        /* compare lengths instead of forming a pointer that may lie beyond the buffer */
+        if (enddata - (*pptr) >= *len && (string = malloc(*len+1)) != NULL)
+        {
+            memcpy(string, *pptr, *len);
+            string[*len] = '\0';
+            *pptr += *len;
+        }
+    }
+    FUNC_EXIT;
+    return string;
+}
+
+
+/**
+ * Reads a length-delimited ("UTF" in MQTT v3 terms) string from the input buffer:
+ * a two byte length followed by that many bytes. This is readUTFlen with the
+ * decoded length discarded.
+ * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
+ * @param enddata pointer to the end of the buffer not to be read beyond
+ * @return an allocated C string holding the characters read, or NULL if the length read would
+ * have caused an overrun.
+ */
+char* readUTF(char** pptr, char* enddata)
+{
+    int unused_len;
+
+    return readUTFlen(pptr, enddata, &unused_len);
+}
+
+
+/**
+ * Reads a single byte from the input buffer.
+ * @param pptr pointer to the input buffer - advanced by one byte
+ * @return the byte read, as an unsigned char
+ */
+unsigned char readChar(char** pptr)
+{
+    return (unsigned char)*(*pptr)++;
+}
+
+
+/**
+ * Stores a single byte in an output buffer.
+ * @param pptr pointer to the output buffer - advanced by one byte
+ * @param c the character to write
+ */
+void writeChar(char** pptr, char c)
+{
+    *(*pptr)++ = c;
+}
+
+
+/**
+ * Stores an integer in an output buffer as two bytes, most significant first.
+ * @param pptr pointer to the output buffer - advanced by the two bytes written
+ * @param anInt the integer to write
+ */
+void writeInt(char** pptr, int anInt)
+{
+    char* out = *pptr;
+
+    out[0] = (char)(anInt / 256);
+    out[1] = (char)(anInt % 256);
+    *pptr = out + 2;
+}
+
+
+/**
+ * Writes a C string to an output buffer in length-delimited form: the two byte
+ * length from writeInt followed by the characters, without the terminating NUL.
+ * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
+ * @param string the C string to write
+ */
+void writeUTF(char** pptr, const char* string)
+{
+    const size_t nbytes = strlen(string);
+
+    writeInt(pptr, (int)nbytes);
+    memcpy(*pptr, string, nbytes);
+    *pptr = *pptr + nbytes;
+}
+
+
+/**
+ * Writes a block of data to an output buffer in length-delimited form: the two
+ * byte length from writeInt followed by the data bytes.
+ * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
+ * @param data the data to write
+ * @param datalen the length of the data to write
+ */
+void writeData(char** pptr, const void* data, int datalen)
+{
+    char* dest;
+
+    writeInt(pptr, datalen);
+    dest = *pptr;
+    memcpy(dest, data, datalen);
+    *pptr = dest + datalen;
+}
+
+
+/**
+ * Function used in the new packets table to create packets which have only a header.
+ * The header byte is kept in a single function-static variable, so every call
+ * returns the same address and overwrites the value stored by the previous call.
+ * @param aHeader the MQTT header byte
+ * @param data the rest of the packet (not used)
+ * @param datalen the length of the rest of the packet (not used)
+ * @return pointer to the static copy of the header byte
+ */
+void* MQTTPacket_header_only(unsigned char aHeader, char* data, size_t datalen)
+{
+    static unsigned char stored = 0;
+    unsigned char* slot = &stored;
+
+    *slot = aHeader;
+    return slot;
+}
+
+
+/**
+ * Send an MQTT disconnect packet down a socket. The packet is a header byte of
+ * type DISCONNECT with no further data.
+ * @param net the network handles to send the data to
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE)
+ */
+int MQTTPacket_send_disconnect(networkHandles *net, const char* clientID)
+{
+    Header header;
+    int rc;
+
+    FUNC_ENTRY;
+    header.byte = 0;
+    header.bits.type = DISCONNECT;
+
+    rc = MQTTPacket_send(net, header, NULL, 0, 0);
+    Log(LOG_PROTOCOL, 28, NULL, net->socket, clientID, rc);
+
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+/**
+ * Function used in the new packets table to create publish packets.
+ *
+ * The topic is copied into newly allocated storage; the payload is NOT copied,
+ * pack->payload points into data.
+ * @param aHeader the MQTT header byte
+ * @param data the rest of the packet
+ * @param datalen the length of the rest of the packet
+ * @return pointer to the packet structure, or NULL if it could not be allocated,
+ * the topic could not be read, or a QoS 1/2 packet is too short to hold a message id
+ */
+void* MQTTPacket_publish(unsigned char aHeader, char* data, size_t datalen)
+{
+    Publish* pack = NULL;
+    char* curdata = data;
+    char* enddata = &data[datalen];
+
+    FUNC_ENTRY;
+    if ((pack = malloc(sizeof(Publish))) == NULL)
+        goto exit;
+    pack->header.byte = aHeader;
+    if ((pack->topic = readUTFlen(&curdata, enddata, &pack->topiclen)) == NULL) /* Topic name on which to publish */
+    {
+        free(pack);
+        pack = NULL;
+        goto exit;
+    }
+    if (pack->header.bits.qos > 0)  /* Msgid only exists for QoS 1 or 2 */
+    {
+        if (enddata - curdata < 2)  /* is there enough data for the msgid? */
+        {
+            free(pack->topic);
+            free(pack);
+            pack = NULL;
+            goto exit;
+        }
+        pack->msgId = readInt(&curdata);
+    }
+    else
+        pack->msgId = 0;
+    pack->payload = curdata;
+    pack->payloadlen = (int)(datalen-(curdata-data));
+exit:
+    FUNC_EXIT;
+    return pack;
+}
+
+
+/**
+ * Releases a publish packet: its topic string (when one is set) and then the
+ * structure itself. The payload pointer is not freed here.
+ * @param pack pointer to the publish packet structure
+ */
+void MQTTPacket_freePublish(Publish* pack)
+{
+    char* topic;
+
+    FUNC_ENTRY;
+    topic = pack->topic;
+    if (topic != NULL)
+        free(topic);
+    free(pack);
+    FUNC_EXIT;
+}
+
+
+/**
+ * Send an MQTT acknowledgement packet down a socket. The packet body is the
+ * two-byte message id; for PUBREL the QoS bits of the header are set to 1.
+ * @param type the MQTT packet type e.g. SUBACK
+ * @param msgid the MQTT message id to use
+ * @param dup boolean - whether to set the MQTT DUP flag
+ * @param net the network handle to send the data to
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE), or SOCKET_ERROR if the
+ * packet buffer cannot be allocated
+ */
+static int MQTTPacket_send_ack(int type, int msgid, int dup, networkHandles *net)
+{
+    Header header;
+    int rc = SOCKET_ERROR;
+    char *buf;
+    char *ptr;
+
+    FUNC_ENTRY;
+    if ((buf = malloc(2)) == NULL)
+        goto exit; /* nothing has been sent */
+    ptr = buf;
+    header.byte = 0;
+    header.bits.type = type;
+    header.bits.dup = dup;
+    if (type == PUBREL)
+        header.bits.qos = 1;
+    writeInt(&ptr, msgid);
+    /* on TCPSOCKET_INTERRUPTED the buffer is left alone, not freed here */
+    if ((rc = MQTTPacket_send(net, header, buf, 2, 1)) != TCPSOCKET_INTERRUPTED)
+        free(buf);
+exit:
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+/**
+ * Send an MQTT PUBACK packet down a socket (DUP flag clear).
+ * @param msgid the MQTT message id to use
+ * @param net the network handles to send the data to
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE)
+ */
+int MQTTPacket_send_puback(int msgid, networkHandles* net, const char* clientID)
+{
+    int rc;
+
+    FUNC_ENTRY;
+    rc = MQTTPacket_send_ack(PUBACK, msgid, 0, net);
+    Log(LOG_PROTOCOL, 12, NULL, net->socket, clientID, msgid, rc);
+
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+/**
+ * Releases a suback packet: its list of QoS values (when one is set) and then
+ * the structure itself.
+ * @param pack pointer to the suback packet structure
+ */
+void MQTTPacket_freeSuback(Suback* pack)
+{
+    List* granted;
+
+    FUNC_ENTRY;
+    granted = pack->qoss;
+    if (granted != NULL)
+        ListFree(granted);
+    free(pack);
+    FUNC_EXIT;
+}
+
+
+/**
+ * Send an MQTT PUBREC packet down a socket (DUP flag clear).
+ * @param msgid the MQTT message id to use
+ * @param net the network handles to send the data to
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE)
+ */
+int MQTTPacket_send_pubrec(int msgid, networkHandles* net, const char* clientID)
+{
+    int rc;
+
+    FUNC_ENTRY;
+    rc = MQTTPacket_send_ack(PUBREC, msgid, 0, net);
+    Log(LOG_PROTOCOL, 13, NULL, net->socket, clientID, msgid, rc);
+
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+/**
+ * Send an MQTT PUBREL packet down a socket.
+ * @param msgid the MQTT message id to use
+ * @param dup boolean - whether to set the MQTT DUP flag
+ * @param net the network handles to send the data to
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE)
+ */
+int MQTTPacket_send_pubrel(int msgid, int dup, networkHandles* net, const char* clientID)
+{
+    int rc;
+
+    FUNC_ENTRY;
+    rc = MQTTPacket_send_ack(PUBREL, msgid, dup, net);
+    Log(LOG_PROTOCOL, 16, NULL, net->socket, clientID, msgid, rc);
+
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+/**
+ * Send an MQTT PUBCOMP packet down a socket (DUP flag clear).
+ * @param msgid the MQTT message id to use
+ * @param net the network handles to send the data to
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE)
+ */
+int MQTTPacket_send_pubcomp(int msgid, networkHandles* net, const char* clientID)
+{
+    int rc;
+
+    FUNC_ENTRY;
+    rc = MQTTPacket_send_ack(PUBCOMP, msgid, 0, net);
+    Log(LOG_PROTOCOL, 18, NULL, net->socket, clientID, msgid, rc);
+
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+/**
+ * Function used in the new packets table to create acknowledgement packets.
+ * The packet body must hold at least the two-byte message id.
+ * @param aHeader the MQTT header byte
+ * @param data the rest of the packet
+ * @param datalen the length of the rest of the packet
+ * @return pointer to the packet structure, or NULL if the packet is too short
+ * to contain a message id or the structure could not be allocated
+ */
+void* MQTTPacket_ack(unsigned char aHeader, char* data, size_t datalen)
+{
+    Ack* pack = NULL;
+    char* curdata = data;
+
+    FUNC_ENTRY;
+    if (datalen < 2)    /* is there enough data for the msgid? */
+        goto exit;
+    if ((pack = malloc(sizeof(Ack))) == NULL)
+        goto exit;
+    pack->header.byte = aHeader;
+    pack->msgId = readInt(&curdata);
+exit:
+    FUNC_EXIT;
+    return pack;
+}
+
+
+/**
+ * Send an MQTT PUBLISH packet down a socket.
+ *
+ * The packet is sent from separate buffers: the two-byte topic length, the
+ * topic, the two-byte message id (QoS 1 and 2 only) and the payload. The topic
+ * and payload are sent from pack directly; the two small buffers are allocated
+ * here and are not freed here when the send returns TCPSOCKET_INTERRUPTED.
+ * @param pack a structure from which to get some values to use, e.g topic, payload
+ * @param dup boolean - whether to set the MQTT DUP flag
+ * @param qos the value to use for the MQTT QoS setting
+ * @param retained boolean - whether to set the MQTT retained flag
+ * @param net the network handles to send the data to
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE), or -1 without sending
+ * or logging anything if one of the small buffers cannot be allocated
+ */
+int MQTTPacket_send_publish(Publish* pack, int dup, int qos, int retained, networkHandles* net, const char* clientID)
+{
+    Header header;
+    char *topiclen;
+    int rc = -1;
+
+    FUNC_ENTRY;
+    if ((topiclen = malloc(2)) == NULL)
+        goto exit;
+
+    header.byte = 0; /* start from a cleared header, as the other senders do */
+    header.bits.type = PUBLISH;
+    header.bits.dup = dup;
+    header.bits.qos = qos;
+    header.bits.retain = retained;
+    if (qos > 0)
+    {
+        char *buf = malloc(2);
+
+        if (buf == NULL)
+        {
+            free(topiclen);
+            goto exit;
+        }
+        else
+        {
+            char *ptr = buf;
+            char* bufs[4] = {topiclen, pack->topic, buf, pack->payload};
+            size_t lens[4] = {2, strlen(pack->topic), 2, pack->payloadlen};
+            int frees[4] = {1, 0, 1, 0};
+
+            writeInt(&ptr, pack->msgId);
+            ptr = topiclen;
+            writeInt(&ptr, (int)lens[1]);
+            rc = MQTTPacket_sends(net, header, 4, bufs, lens, frees);
+            if (rc != TCPSOCKET_INTERRUPTED)
+                free(buf);
+        }
+    }
+    else
+    {
+        char* ptr = topiclen;
+        char* bufs[3] = {topiclen, pack->topic, pack->payload};
+        size_t lens[3] = {2, strlen(pack->topic), pack->payloadlen};
+        int frees[3] = {1, 0, 0};
+
+        writeInt(&ptr, (int)lens[1]);
+        rc = MQTTPacket_sends(net, header, 3, bufs, lens, frees);
+    }
+    if (rc != TCPSOCKET_INTERRUPTED)
+        free(topiclen);
+    if (qos == 0)
+        Log(LOG_PROTOCOL, 27, NULL, net->socket, clientID, retained, rc);
+    else
+        Log(LOG_PROTOCOL, 10, NULL, net->socket, clientID, pack->msgId, qos, retained, rc,
+            min(20, pack->payloadlen), pack->payload);
+exit:
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+/**
+ * Free allocated storage for various packet types. A PUBLISH packet is released
+ * through MQTTPacket_freePublish; any other packet is released with a plain free.
+ * @param pack pointer to the packet structure
+ */
+void MQTTPacket_free_packet(MQTTPacket* pack)
+{
+    FUNC_ENTRY;
+    if (pack->header.bits.type != PUBLISH)
+        free(pack);
+    else
+        MQTTPacket_freePublish((Publish*)pack);
+    FUNC_EXIT;
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPacket.h
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTPacket.h b/thirdparty/paho.mqtt.c/src/MQTTPacket.h
new file mode 100644
index 0000000..8bad955
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTPacket.h
@@ -0,0 +1,262 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2017 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Ian Craggs - initial API and implementation and/or initial documentation
+ * Ian Craggs, Allan Stockdill-Mander - SSL updates
+ * Ian Craggs - MQTT 3.1.1 support
+ * Ian Craggs - big endian Linux reversed definition
+ *******************************************************************************/
+
+#if !defined(MQTTPACKET_H)
+#define MQTTPACKET_H
+
+#include "Socket.h"
+#if defined(OPENSSL)
+#include "SSLSocket.h"
+#endif
+#include "LinkedList.h"
+#include "Clients.h"
+
+/*BE
+include "Socket"
+include "LinkedList"
+include "Clients"
+BE*/
+
+typedef unsigned int bool; /* project-local boolean, used below as the type of single-bit flag fields */
+typedef void* (*pf)(unsigned char, char*, size_t); /* packet builder: (header byte, packet data, data length) -> packet structure */
+
+#define BAD_MQTT_PACKET -4 /* error code MQTTPacket_Factory reports when a builder returns NULL */
+
+enum msgTypes /* MQTT packet type codes; numbering starts at 1, so 0 is not a named type */
+{
+ CONNECT = 1, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL,
+ PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK,
+ PINGREQ, PINGRESP, DISCONNECT
+};
+
+#if defined(__linux__) /* byte order is only probed on Linux; REVERSED may still be defined some other way elsewhere */
+#include <endian.h>
+#if __BYTE_ORDER == __BIG_ENDIAN
+ #define REVERSED 1 /* selects the alternative bit-field ordering in the unions below */
+#endif
+#endif
+
+/**
+ * Bitfields for the MQTT header byte.
+ * The header can be accessed whole (byte) or field by field (bits). The fields
+ * are declared in the opposite order when REVERSED is defined.
+ */
+typedef union
+{
+ /*unsigned*/ char byte; /**< the whole byte */
+#if defined(REVERSED)
+ struct
+ {
+ unsigned int type : 4; /**< message type nibble */
+ bool dup : 1; /**< DUP flag bit */
+ unsigned int qos : 2; /**< QoS value, 0, 1 or 2 */
+ bool retain : 1; /**< retained flag bit */
+ } bits;
+#else
+ struct
+ {
+ bool retain : 1; /**< retained flag bit */
+ unsigned int qos : 2; /**< QoS value, 0, 1 or 2 */
+ bool dup : 1; /**< DUP flag bit */
+ unsigned int type : 4; /**< message type nibble */
+ } bits;
+#endif
+} Header;
+
+
+/**
+ * Data for a connect packet.
+ * The connect flags can be accessed whole (flags.all) or field by field
+ * (flags.bits); the fields are declared in the opposite order when REVERSED is
+ * defined.
+ */
+typedef struct
+{
+ Header header; /**< MQTT header byte */
+ union
+ {
+ unsigned char all; /**< all connect flags */
+#if defined(REVERSED)
+ struct
+ {
+ bool username : 1; /**< 3.1 user name */
+ bool password : 1; /**< 3.1 password */
+ bool willRetain : 1; /**< will retain setting */
+ unsigned int willQoS : 2; /**< will QoS value */
+ bool will : 1; /**< will flag */
+ bool cleanstart : 1; /**< cleansession flag */
+ int : 1; /**< unused */
+ } bits;
+#else
+ struct
+ {
+ int : 1; /**< unused */
+ bool cleanstart : 1; /**< cleansession flag */
+ bool will : 1; /**< will flag */
+ unsigned int willQoS : 2; /**< will QoS value */
+ bool willRetain : 1; /**< will retain setting */
+ bool password : 1; /**< 3.1 password */
+ bool username : 1; /**< 3.1 user name */
+ } bits;
+#endif
+ } flags; /**< connect flags byte */
+
+ char *Protocol, /**< MQTT protocol name */
+ *clientID, /**< string client id */
+ *willTopic, /**< will topic */
+ *willMsg; /**< will payload */
+
+ int keepAliveTimer; /**< keepalive timeout value in seconds */
+ unsigned char version; /**< MQTT version number */
+} Connect;
+
+
+/**
+ * Data for a connack packet.
+ * The connack flags can be accessed whole (flags.all) or field by field
+ * (flags.bits); the fields are declared in the opposite order when REVERSED is
+ * defined.
+ */
+typedef struct
+{
+ Header header; /**< MQTT header byte */
+ union
+ {
+ unsigned char all; /**< all connack flags */
+#if defined(REVERSED)
+ struct
+ {
+ unsigned int reserved : 7; /**< the seven remaining (reserved) flag bits */
+ bool sessionPresent : 1; /**< was a session found on the server? */
+ } bits;
+#else
+ struct
+ {
+ bool sessionPresent : 1; /**< was a session found on the server? */
+ unsigned int reserved : 7; /**< the seven remaining (reserved) flag bits */
+ } bits;
+#endif
+ } flags; /**< connack flags byte */
+ char rc; /**< connack return code */
+} Connack;
+
+
+/**
+ * Data for a packet with header only.
+ * MQTTPacket_free_packet also takes this type and reads the header to find the
+ * packet type before freeing.
+ */
+typedef struct
+{
+ Header header; /**< MQTT header byte */
+} MQTTPacket;
+
+
+/**
+ * Data for a subscribe packet: a message id plus two lists holding the topics
+ * and their QoS values.
+ */
+typedef struct
+{
+ Header header; /**< MQTT header byte */
+ int msgId; /**< MQTT message id */
+ List* topics; /**< list of topic strings */
+ List* qoss; /**< list of corresponding QoSs */
+ int noTopics; /**< topic and qos count */
+} Subscribe;
+
+
+/**
+ * Data for a suback packet.
+ * MQTTPacket_freeSuback releases qoss (when set) together with the structure.
+ */
+typedef struct
+{
+ Header header; /**< MQTT header byte */
+ int msgId; /**< MQTT message id */
+ List* qoss; /**< list of granted QoSs */
+} Suback;
+
+
+/**
+ * Data for an unsubscribe packet: a message id plus the list of topics.
+ */
+typedef struct
+{
+ Header header; /**< MQTT header byte */
+ int msgId; /**< MQTT message id */
+ List* topics; /**< list of topic strings */
+ int noTopics; /**< topic count */
+} Unsubscribe;
+
+
+/**
+ * Data for a publish packet.
+ * In a packet built by MQTTPacket_publish, topic is separately allocated
+ * (MQTTPacket_freePublish frees it) while payload points into the data buffer
+ * that was passed in.
+ */
+typedef struct
+{
+ Header header; /**< MQTT header byte */
+ char* topic; /**< topic string */
+ int topiclen; /**< topic length as read from the packet's two length bytes */
+ int msgId; /**< MQTT message id */
+ char* payload; /**< binary payload, length delimited */
+ int payloadlen; /**< payload length */
+} Publish;
+
+
+/**
+ * Data for one of the ack packets.
+ * The same layout is used for every acknowledgement type aliased below.
+ */
+typedef struct
+{
+ Header header; /**< MQTT header byte */
+ int msgId; /**< MQTT message id */
+} Ack;
+
+typedef Ack Puback;
+typedef Ack Pubrec;
+typedef Ack Pubrel;
+typedef Ack Pubcomp;
+typedef Ack Unsuback;
+
+int MQTTPacket_encode(char* buf, size_t length); /* returns the number of bytes written to buf */
+int MQTTPacket_decode(networkHandles* net, size_t* value); /* returns a completion code, not a byte count */
+int readInt(char** pptr);
+char* readUTF(char** pptr, char* enddata); /* returns an allocated string, or NULL */
+unsigned char readChar(char** pptr);
+void writeChar(char** pptr, char c);
+void writeInt(char** pptr, int anInt);
+void writeUTF(char** pptr, const char* string);
+void writeData(char** pptr, const void* data, int datalen);
+
+const char* MQTTPacket_name(int ptype);
+
+void* MQTTPacket_Factory(networkHandles* net, int* error);
+int MQTTPacket_send(networkHandles* net, Header header, char* buffer, size_t buflen, int free); /* the last parameter is named freeData in MQTTPacket.c */
+int MQTTPacket_sends(networkHandles* net, Header header, int count, char** buffers, size_t* buflens, int* frees);
+
+void* MQTTPacket_header_only(unsigned char aHeader, char* data, size_t datalen); /* returns the address of a static byte, not allocated storage */
+int MQTTPacket_send_disconnect(networkHandles* net, const char* clientID);
+
+void* MQTTPacket_publish(unsigned char aHeader, char* data, size_t datalen);
+void MQTTPacket_freePublish(Publish* pack);
+int MQTTPacket_send_publish(Publish* pack, int dup, int qos, int retained, networkHandles* net, const char* clientID);
+int MQTTPacket_send_puback(int msgid, networkHandles* net, const char* clientID);
+void* MQTTPacket_ack(unsigned char aHeader, char* data, size_t datalen);
+
+void MQTTPacket_freeSuback(Suback* pack);
+int MQTTPacket_send_pubrec(int msgid, networkHandles* net, const char* clientID);
+int MQTTPacket_send_pubrel(int msgid, int dup, networkHandles* net, const char* clientID);
+int MQTTPacket_send_pubcomp(int msgid, networkHandles* net, const char* clientID);
+
+void MQTTPacket_free_packet(MQTTPacket* pack);
+
+#if !defined(NO_BRIDGE)
+ #include "MQTTPacketOut.h"
+#endif
+
+#endif /* MQTTPACKET_H */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPacketOut.c
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTPacketOut.c b/thirdparty/paho.mqtt.c/src/MQTTPacketOut.c
new file mode 100644
index 0000000..b924085
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTPacketOut.c
@@ -0,0 +1,269 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2017 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Ian Craggs - initial API and implementation and/or initial documentation
+ * Ian Craggs, Allan Stockdill-Mander - SSL updates
+ * Ian Craggs - MQTT 3.1.1 support
+ * Rong Xiang, Ian Craggs - C++ compatibility
+ * Ian Craggs - binary password and will payload
+ *******************************************************************************/
+
+/**
+ * @file
+ * \brief functions to deal with reading and writing of MQTT packets from and to sockets
+ *
+ * Some other related functions are in the MQTTPacket module
+ */
+
+
+#include "MQTTPacketOut.h"
+#include "Log.h"
+#include "StackTrace.h"
+
+#include <string.h>
+#include <stdlib.h>
+
+#include "Heap.h"
+
+
+/**
+ * Send an MQTT CONNECT packet down a socket.
+ *
+ * The packet body is built in one heap buffer in this order: protocol name and
+ * level, connect flags, keep alive interval, client id, then the optional will
+ * topic and payload, username and password.
+ * @param client a structure from which to get all the required values
+ * @param MQTTVersion the MQTT version to connect with: 3 selects protocol name
+ * "MQIsdp", 4 selects "MQTT"; any other value is rejected
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE), or -1 if MQTTVersion is
+ * not 3 or 4 or the packet buffer could not be allocated
+ */
+int MQTTPacket_send_connect(Clients* client, int MQTTVersion)
+{
+    char *buf = NULL, *ptr = NULL;
+    Connect packet;
+    int rc = -1, len;
+
+    FUNC_ENTRY;
+    /* validate the version first so nothing is allocated for an unsupported one */
+    if (MQTTVersion != 3 && MQTTVersion != 4)
+        goto exit;
+
+    packet.header.byte = 0;
+    packet.header.bits.type = CONNECT;
+
+    /* 12 (v3) or 10 (v4) = length-prefixed protocol name + level + flags + keep alive */
+    len = ((MQTTVersion == 3) ? 12 : 10) + (int)strlen(client->clientID)+2;
+    if (client->will)
+        len += (int)strlen(client->will->topic)+2 + client->will->payloadlen+2;
+    if (client->username)
+        len += (int)strlen(client->username)+2;
+    if (client->password)
+        len += client->passwordlen+2;
+
+    ptr = buf = malloc(len);
+    if (buf == NULL)
+        goto exit; /* out of memory: report failure instead of writing through NULL */
+
+    if (MQTTVersion == 3)
+    {
+        writeUTF(&ptr, "MQIsdp");
+        writeChar(&ptr, (char)3);
+    }
+    else
+    {
+        writeUTF(&ptr, "MQTT");
+        writeChar(&ptr, (char)4);
+    }
+
+    packet.flags.all = 0;
+    packet.flags.bits.cleanstart = client->cleansession;
+    packet.flags.bits.will = (client->will) ? 1 : 0;
+    if (packet.flags.bits.will)
+    {
+        packet.flags.bits.willQoS = client->will->qos;
+        packet.flags.bits.willRetain = client->will->retained;
+    }
+
+    if (client->username)
+        packet.flags.bits.username = 1;
+    if (client->password)
+        packet.flags.bits.password = 1;
+
+    writeChar(&ptr, packet.flags.all);
+    writeInt(&ptr, client->keepAliveInterval);
+    writeUTF(&ptr, client->clientID);
+    if (client->will)
+    {
+        writeUTF(&ptr, client->will->topic);
+        writeData(&ptr, client->will->payload, client->will->payloadlen);
+    }
+    if (client->username)
+        writeUTF(&ptr, client->username);
+    if (client->password)
+        writeData(&ptr, client->password, client->passwordlen);
+
+    rc = MQTTPacket_send(&client->net, packet.header, buf, len, 1);
+    Log(LOG_PROTOCOL, 0, NULL, client->net.socket, client->clientID, client->cleansession, rc);
+exit:
+    /* the buffer is deliberately not freed here when the send was interrupted;
+     * NOTE(review): presumably the socket layer still owns it then - confirm */
+    if (buf != NULL && rc != TCPSOCKET_INTERRUPTED)
+        free(buf);
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+/**
+ * Function used in the new packets table to create connack packets.
+ *
+ * A CONNACK body is two bytes (flags, return code); both are read from data.
+ * @param aHeader the MQTT header byte
+ * @param data the rest of the packet
+ * @param datalen the length of the rest of the packet
+ * @return pointer to the packet structure, or NULL if datalen is less than 2
+ * or the structure could not be allocated
+ */
+void* MQTTPacket_connack(unsigned char aHeader, char* data, size_t datalen)
+{
+    Connack* pack = NULL;
+    char* curdata = data;
+
+    FUNC_ENTRY;
+    /* both bytes are read unconditionally below, so refuse a short packet */
+    if (datalen < 2)
+        goto exit;
+    if ((pack = malloc(sizeof(Connack))) == NULL)
+        goto exit;
+    pack->header.byte = aHeader;
+    pack->flags.all = readChar(&curdata);
+    pack->rc = readChar(&curdata);
+exit:
+    FUNC_EXIT;
+    return pack;
+}
+
+
+/**
+ * Send an MQTT PINGREQ packet down a socket.
+ *
+ * PINGREQ consists of the fixed header only, so no data buffer is passed on.
+ * @param net the network handles of the connection to send the packet on
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE)
+ */
+int MQTTPacket_send_pingreq(networkHandles* net, const char* clientID)
+{
+    Header pingHeader;
+    int result;
+
+    FUNC_ENTRY;
+    pingHeader.byte = 0;
+    pingHeader.bits.type = PINGREQ;
+    /* NULL buffer of length zero, and nothing for the send routine to free */
+    result = MQTTPacket_send(net, pingHeader, NULL, (size_t)0, 0);
+    Log(LOG_PROTOCOL, 20, NULL, net->socket, clientID, result);
+    FUNC_EXIT_RC(result);
+    return result;
+}
+
+
+/**
+ * Send an MQTT subscribe packet down a socket.
+ *
+ * The body is the message id followed by one (length-prefixed topic, QoS byte)
+ * pair per entry in topics; the QoS for the n-th topic is the n-th entry of qoss.
+ * @param topics list of topics
+ * @param qoss list of corresponding QoSs (each element's content is read as an int)
+ * @param msgid the MQTT message id to use
+ * @param dup boolean - whether to set the MQTT DUP flag
+ * @param net the network handles of the connection to send the packet on
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE), or -1 if the packet
+ * buffer could not be allocated or qoss has fewer entries than topics
+ */
+int MQTTPacket_send_subscribe(List* topics, List* qoss, int msgid, int dup, networkHandles* net, const char* clientID)
+{
+    Header header;
+    char *data = NULL, *ptr = NULL;
+    int rc = -1;
+    ListElement *elem = NULL, *qosElem = NULL;
+    int datalen;
+
+    FUNC_ENTRY;
+    header.byte = 0;
+    header.bits.type = SUBSCRIBE;
+    header.bits.dup = dup;
+    header.bits.qos = 1;
+    header.bits.retain = 0;
+
+    datalen = 2 + topics->count * 3; /* utf length + char qos == 3 */
+    while (ListNextElement(topics, &elem))
+        datalen += (int)strlen((char*)(elem->content));
+    ptr = data = malloc(datalen);
+    if (data == NULL)
+        goto exit; /* out of memory */
+
+    writeInt(&ptr, msgid);
+    elem = NULL;
+    while (ListNextElement(topics, &elem))
+    {
+        /* a topic without a matching QoS entry cannot be encoded */
+        if (ListNextElement(qoss, &qosElem) == NULL)
+            goto exit;
+        writeUTF(&ptr, (char*)(elem->content));
+        writeChar(&ptr, (char)*(int*)(qosElem->content));
+    }
+    rc = MQTTPacket_send(net, header, data, datalen, 1);
+    Log(LOG_PROTOCOL, 22, NULL, net->socket, clientID, msgid, rc);
+exit:
+    /* the buffer is deliberately kept when the send was interrupted */
+    if (data != NULL && rc != TCPSOCKET_INTERRUPTED)
+        free(data);
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+/**
+ * Function used in the new packets table to create suback packets.
+ *
+ * The body is a two byte message id followed by one granted-QoS byte per
+ * subscription; each QoS byte is stored as a heap-allocated int in pack->qoss.
+ * @param aHeader the MQTT header byte
+ * @param data the rest of the packet
+ * @param datalen the length of the rest of the packet
+ * @return pointer to the packet structure, or NULL if datalen is too short to
+ * hold the message id or memory could not be allocated
+ */
+void* MQTTPacket_suback(unsigned char aHeader, char* data, size_t datalen)
+{
+    Suback* pack = NULL;
+    char* curdata = data;
+
+    FUNC_ENTRY;
+    /* the message id is read unconditionally, so it must be present */
+    if (datalen < 2)
+        goto exit;
+    if ((pack = malloc(sizeof(Suback))) == NULL)
+        goto exit;
+    pack->header.byte = aHeader;
+    pack->msgId = readInt(&curdata);
+    pack->qoss = ListInitialize();
+    while ((size_t)(curdata - data) < datalen)
+    {
+        int* newint = malloc(sizeof(int));
+
+        if (newint == NULL)
+        {
+            /* NOTE(review): relies on MQTTPacket_freeSuback releasing the
+             * partially built qoss list together with the packet - confirm */
+            MQTTPacket_freeSuback(pack);
+            pack = NULL;
+            break;
+        }
+        *newint = (int)readChar(&curdata);
+        ListAppend(pack->qoss, newint, sizeof(int));
+    }
+exit:
+    FUNC_EXIT;
+    return pack;
+}
+
+
+/**
+ * Send an MQTT unsubscribe packet down a socket.
+ *
+ * The body is the message id followed by each topic as a length-prefixed string.
+ * @param topics list of topics
+ * @param msgid the MQTT message id to use
+ * @param dup boolean - whether to set the MQTT DUP flag
+ * @param net the network handles of the connection to send the packet on
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE), or -1 if the packet
+ * buffer could not be allocated
+ */
+int MQTTPacket_send_unsubscribe(List* topics, int msgid, int dup, networkHandles* net, const char* clientID)
+{
+    Header header;
+    char *data = NULL, *ptr = NULL;
+    int rc = -1;
+    ListElement *elem = NULL;
+    int datalen;
+
+    FUNC_ENTRY;
+    header.byte = 0;
+    header.bits.type = UNSUBSCRIBE;
+    header.bits.dup = dup;
+    header.bits.qos = 1;
+    header.bits.retain = 0;
+
+    datalen = 2 + topics->count * 2; /* utf length == 2 */
+    while (ListNextElement(topics, &elem))
+        datalen += (int)strlen((char*)(elem->content));
+    ptr = data = malloc(datalen);
+    if (data == NULL)
+        goto exit; /* out of memory */
+
+    writeInt(&ptr, msgid);
+    elem = NULL;
+    while (ListNextElement(topics, &elem))
+        writeUTF(&ptr, (char*)(elem->content));
+    rc = MQTTPacket_send(net, header, data, datalen, 1);
+    Log(LOG_PROTOCOL, 25, NULL, net->socket, clientID, msgid, rc);
+exit:
+    /* the buffer is deliberately kept when the send was interrupted */
+    if (data != NULL && rc != TCPSOCKET_INTERRUPTED)
+        free(data);
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPacketOut.h
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTPacketOut.h b/thirdparty/paho.mqtt.c/src/MQTTPacketOut.h
new file mode 100644
index 0000000..700db77
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTPacketOut.h
@@ -0,0 +1,34 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2014 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Ian Craggs - initial API and implementation and/or initial documentation
+ * Ian Craggs, Allan Stockdill-Mander - SSL updates
+ * Ian Craggs - MQTT 3.1.1 support
+ *******************************************************************************/
+
+#if !defined(MQTTPACKETOUT_H)
+#define MQTTPACKETOUT_H
+
+#include "MQTTPacket.h"
+
+int MQTTPacket_send_connect(Clients* client, int MQTTVersion);
+void* MQTTPacket_connack(unsigned char aHeader, char* data, size_t datalen);
+
+int MQTTPacket_send_pingreq(networkHandles* net, const char* clientID);
+
+int MQTTPacket_send_subscribe(List* topics, List* qoss, int msgid, int dup, networkHandles* net, const char* clientID);
+void* MQTTPacket_suback(unsigned char aHeader, char* data, size_t datalen);
+
+int MQTTPacket_send_unsubscribe(List* topics, int msgid, int dup, networkHandles* net, const char* clientID);
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPersistence.c
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTPersistence.c b/thirdparty/paho.mqtt.c/src/MQTTPersistence.c
new file mode 100644
index 0000000..24efb6d
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTPersistence.c
@@ -0,0 +1,654 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Ian Craggs - initial API and implementation and/or initial documentation
+ * Ian Craggs - async client updates
+ * Ian Craggs - fix for bug 432903 - queue persistence
+ *******************************************************************************/
+
+/**
+ * @file
+ * \brief Functions that apply to persistence operations.
+ *
+ */
+
+#include <stdio.h>
+#include <string.h>
+
+#include "MQTTPersistence.h"
+#include "MQTTPersistenceDefault.h"
+#include "MQTTProtocolClient.h"
+#include "Heap.h"
+
+
+static MQTTPersistence_qEntry* MQTTPersistence_restoreQueueEntry(char* buffer, size_t buflen);
+static void MQTTPersistence_insertInSeqOrder(List* list, MQTTPersistence_qEntry* qEntry, size_t size);
+
+#include "StackTrace.h"
+
+/**
+ * Creates a ::MQTTClient_persistence structure representing a persistence implementation.
+ *
+ * For #MQTTCLIENT_PERSISTENCE_DEFAULT a new structure is allocated, wired to the
+ * file system functions (pstopen, pstclose, ...) and given a private copy of the
+ * directory name in pcontext, or "." when pcontext is NULL. For
+ * #MQTTCLIENT_PERSISTENCE_USER pcontext itself is the structure and is only
+ * validated: every callback and the context must be non-NULL.
+ * When NO_PERSISTENCE is defined no type is examined and NULL is stored.
+ * @param persistence receives the ::MQTTClient_persistence structure (NULL for
+ * #MQTTCLIENT_PERSISTENCE_NONE or on allocation failure).
+ * @param type the type of the persistence implementation. See ::MQTTClient_create.
+ * @param pcontext the context for this persistence implementation. See ::MQTTClient_create.
+ * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
+ */
+int MQTTPersistence_create(MQTTClient_persistence** persistence, int type, void* pcontext)
+{
+    int rc = 0;
+    MQTTClient_persistence* per = NULL;
+
+    FUNC_ENTRY;
+#if !defined(NO_PERSISTENCE)
+    switch (type)
+    {
+        case MQTTCLIENT_PERSISTENCE_NONE :
+            per = NULL;
+            break;
+        case MQTTCLIENT_PERSISTENCE_DEFAULT :
+            per = malloc(sizeof(MQTTClient_persistence));
+            if ( per == NULL )
+            {
+                rc = MQTTCLIENT_PERSISTENCE_ERROR;
+                break;
+            }
+            if ( pcontext != NULL )
+            {
+                per->context = malloc(strlen(pcontext) + 1);
+                if ( per->context == NULL )
+                {
+                    /* cannot copy the directory name: undo the allocation and fail */
+                    free(per);
+                    per = NULL;
+                    rc = MQTTCLIENT_PERSISTENCE_ERROR;
+                    break;
+                }
+                strcpy(per->context, pcontext);
+            }
+            else
+                per->context = "."; /* working directory */
+            /* file system functions */
+            per->popen = pstopen;
+            per->pclose = pstclose;
+            per->pput = pstput;
+            per->pget = pstget;
+            per->premove = pstremove;
+            per->pkeys = pstkeys;
+            per->pclear = pstclear;
+            per->pcontainskey = pstcontainskey;
+            break;
+        case MQTTCLIENT_PERSISTENCE_USER :
+            per = (MQTTClient_persistence *)pcontext;
+            /* a user supplied implementation must provide the complete interface */
+            if ( per == NULL || per->context == NULL || per->pclear == NULL ||
+                per->pclose == NULL || per->pcontainskey == NULL || per->pget == NULL || per->pkeys == NULL ||
+                per->popen == NULL || per->pput == NULL || per->premove == NULL )
+                rc = MQTTCLIENT_PERSISTENCE_ERROR;
+            break;
+        default:
+            rc = MQTTCLIENT_PERSISTENCE_ERROR;
+            break;
+    }
+#endif
+
+    *persistence = per;
+
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+/**
+ * Open persistent store and restore any persisted messages.
+ *
+ * Nothing is done (and 0 is returned) when the client has no persistence
+ * implementation; messages are restored only if opening the store succeeded.
+ * @param c the client as ::Clients.
+ * @param serverURI the URI of the remote end.
+ * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
+ */
+int MQTTPersistence_initialize(Clients *c, const char *serverURI)
+{
+    int rc = 0;
+
+    FUNC_ENTRY;
+    if ( c->persistence == NULL )
+        goto exit;
+
+    rc = c->persistence->popen(&(c->phandle), c->clientID, serverURI, c->persistence->context);
+    if ( rc != 0 )
+        goto exit;
+
+    rc = MQTTPersistence_restore(c);
+
+exit:
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+/**
+ * Close persistent store.
+ *
+ * The store handle is closed and cleared. The persistence structure itself is
+ * freed only when its open function is pstopen, i.e. when it is the default
+ * file system implementation; in every case the client's pointer to it is reset.
+ * @param c the client as ::Clients.
+ * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
+ */
+int MQTTPersistence_close(Clients *c)
+{
+    int rc = 0;
+    MQTTClient_persistence* store = NULL;
+
+    FUNC_ENTRY;
+    store = c->persistence;
+    if (store == NULL)
+        goto exit;
+
+    rc = store->pclose(c->phandle);
+    c->phandle = NULL;
+#if !defined(NO_PERSISTENCE)
+    /* NOTE(review): store->context is not released here - confirm who owns it */
+    if ( store->popen == pstopen )
+        free(store);
+#endif
+    c->persistence = NULL;
+
+exit:
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+/**
+ * Clears the persistent store.
+ *
+ * Delegates to the pclear callback; a client without persistence yields 0.
+ * @param c the client as ::Clients.
+ * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
+ */
+int MQTTPersistence_clear(Clients *c)
+{
+    int rc;
+
+    FUNC_ENTRY;
+    rc = (c->persistence == NULL) ? 0 : c->persistence->pclear(c->phandle);
+
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+/**
+ * Restores the persisted records to the outbound and inbound message queues of the
+ * client.
+ *
+ * Every key returned by the pkeys callback is visited. Keys starting with
+ * PERSISTENCE_COMMAND_KEY or PERSISTENCE_QUEUE_KEY are skipped here. Any other
+ * record is fetched and rebuilt with MQTTPersistence_restorePacket; a record that
+ * cannot be rebuilt is removed from the store. Received PUBLISHes are appended to
+ * inboundMsgs, sent PUBLISHes are inserted into outboundMsgs in message id order,
+ * and a PUBREL whose sent PUBLISH is no longer in the store is removed.
+ * Each key string, and the key array itself, is freed as it is consumed.
+ * The loop stops at the first callback that returns non-zero.
+ * @param c the client as ::Clients.
+ * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
+ */
+int MQTTPersistence_restore(Clients *c)
+{
+ int rc = 0;
+ char **msgkeys = NULL,
+ *buffer = NULL;
+ int nkeys, buflen;
+ int i = 0;
+ int msgs_sent = 0;
+ int msgs_rcvd = 0;
+
+ FUNC_ENTRY;
+ if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0)
+ {
+ while (rc == 0 && i < nkeys)
+ {
+ if (strncmp(msgkeys[i], PERSISTENCE_COMMAND_KEY, strlen(PERSISTENCE_COMMAND_KEY)) == 0)
+ {
+ ; /* async client command record: not handled by this function */
+ }
+ else if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) == 0)
+ {
+ ; /* message queue record: not handled by this function */
+ }
+ else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0)
+ {
+ MQTTPacket* pack = MQTTPersistence_restorePacket(buffer, buflen);
+ if ( pack != NULL )
+ {
+ if ( strstr(msgkeys[i],PERSISTENCE_PUBLISH_RECEIVED) != NULL )
+ {
+ Publish* publish = (Publish*)pack;
+ Messages* msg = NULL;
+ msg = MQTTProtocol_createMessage(publish, &msg, publish->header.bits.qos, publish->header.bits.retain);
+ msg->nextMessageType = PUBREL;
+ /* order does not matter for persisted received messages */
+ ListAppend(c->inboundMsgs, msg, msg->len);
+ publish->topic = NULL; /* cleared before freeing; NOTE(review): presumably the topic now belongs to msg - confirm */
+ MQTTPacket_freePublish(publish);
+ msgs_rcvd++;
+ }
+ else if ( strstr(msgkeys[i],PERSISTENCE_PUBLISH_SENT) != NULL )
+ {
+ Publish* publish = (Publish*)pack;
+ Messages* msg = NULL;
+ char *key = malloc(MESSAGE_FILENAME_LENGTH + 1);
+ sprintf(key, "%s%d", PERSISTENCE_PUBREL, publish->msgId); /* key of the PUBREL with the same message id */
+ msg = MQTTProtocol_createMessage(publish, &msg, publish->header.bits.qos, publish->header.bits.retain);
+ if ( c->persistence->pcontainskey(c->phandle, key) == 0 )
+ /* PUBLISH QoS2 and PUBREL sent */
+ msg->nextMessageType = PUBCOMP;
+ /* else: PUBLISH QoS1, or PUBLISH QoS2 and PUBREL not sent */
+ /* retry at the first opportunity */
+ msg->lastTouch = 0;
+ MQTTPersistence_insertInOrder(c->outboundMsgs, msg, msg->len);
+ publish->topic = NULL; /* cleared before freeing, as in the received case above */
+ MQTTPacket_freePublish(publish);
+ free(key);
+ msgs_sent++;
+ }
+ else if ( strstr(msgkeys[i],PERSISTENCE_PUBREL) != NULL )
+ {
+ /* orphaned PUBRELs ? */
+ Pubrel* pubrel = (Pubrel*)pack;
+ char *key = malloc(MESSAGE_FILENAME_LENGTH + 1);
+ sprintf(key, "%s%d", PERSISTENCE_PUBLISH_SENT, pubrel->msgId); /* key of the matching sent PUBLISH */
+ if ( c->persistence->pcontainskey(c->phandle, key) != 0 )
+ rc = c->persistence->premove(c->phandle, msgkeys[i]); /* no matching PUBLISH: drop the PUBREL record */
+ free(pubrel);
+ free(key);
+ }
+ }
+ else /* pack == NULL -> bad persisted record */
+ rc = c->persistence->premove(c->phandle, msgkeys[i]);
+ }
+ if (buffer)
+ {
+ free(buffer);
+ buffer = NULL; /* reset so the next iteration does not free it again */
+ }
+ if (msgkeys[i])
+ free(msgkeys[i]);
+ i++;
+ }
+ if (msgkeys)
+ free(msgkeys); /* NOTE(review): keys after index i are not freed when the loop stops early on an error */
+ }
+ Log(TRACE_MINIMUM, -1, "%d sent messages and %d received messages restored for client %s\n",
+ msgs_sent, msgs_rcvd, c->clientID);
+ MQTTPersistence_wrapMsgID(c); /* rotate outboundMsgs if the restored message ids have wrapped */
+
+ FUNC_EXIT_RC(rc);
+ return rc;
+}
+
+
+/**
+ * Returns a MQTT packet restored from persisted data.
+ *
+ * The buffer must hold exactly one serialized packet: the header byte, the
+ * remaining length encoded in 1 to 4 bytes (7 bits per byte, high bit set on
+ * every byte but the last) and then that many bytes of data. The data is handed
+ * to the constructor registered in new_packets for the packet type.
+ * @param buffer the persisted data.
+ * @param buflen the number of bytes of the data buffer.
+ * @return the packet structure, or NULL if the buffer is missing, truncated,
+ * its length does not match the encoded length, or the packet type has no
+ * constructor.
+ */
+void* MQTTPersistence_restorePacket(char* buffer, size_t buflen)
+{
+    void* pack = NULL;
+    Header header;
+    int ptype, remaining_length = 0;
+    int length_bytes = 0;
+    size_t pos = 0;
+    char c;
+    int multiplier = 1;
+    extern pf new_packets[];
+
+    FUNC_ENTRY;
+    /* need at least the header byte and one length byte */
+    if (buffer == NULL || buflen < 2)
+        goto exit;
+
+    header.byte = buffer[pos++];
+    /* decode the message length according to the MQTT algorithm */
+    do
+    {
+        /* never read past the record, and accept at most 4 length bytes so
+         * that remaining_length and multiplier cannot overflow an int */
+        if (pos >= buflen || ++length_bytes > 4)
+            goto exit;
+        c = buffer[pos++];
+        remaining_length += (c & 127) * multiplier;
+        multiplier *= 128;
+    } while ((c & 128) != 0);
+
+    /* pos is now the fixed header length */
+    if ( pos + (size_t)remaining_length == buflen )
+    {
+        ptype = header.bits.type;
+        if (ptype >= CONNECT && ptype <= DISCONNECT && new_packets[ptype] != NULL)
+            pack = (*new_packets[ptype])(header.byte, buffer + pos, remaining_length);
+    }
+
+exit:
+    FUNC_EXIT;
+    return pack;
+}
+
+
+/**
+ * Inserts the specified message into the list, maintaining message ID order.
+ *
+ * The message is placed before the first element whose msgid is greater than
+ * its own. If there is no such element the insertion index stays NULL;
+ * NOTE(review): ListInsert presumably appends in that case - confirm.
+ * @param list the list to insert the message into.
+ * @param content the message to add (read as a Messages structure).
+ * @param size size of the message.
+ */
+void MQTTPersistence_insertInOrder(List* list, void* content, size_t size)
+{
+    ListElement* index = NULL;
+    ListElement* current = NULL;
+
+    FUNC_ENTRY;
+    while (ListNextElement(list, &current) != NULL && index == NULL)
+    {
+        if ( ((Messages*)content)->msgid < ((Messages*)current->content)->msgid )
+            index = current;
+    }
+
+    ListInsert(list, content, size, index);
+    FUNC_EXIT;
+}
+
+
+/**
+ * Adds a record to the persistent store. This function must not be called for QoS0
+ * messages.
+ *
+ * The record is the fixed header followed by the given buffers; its key is a stem
+ * chosen from the direction and packet type, followed by the message id.
+ * Nothing is stored (and 0 is returned) when the client has no persistence.
+ * @param socket the socket of the client.
+ * @param buf0 fixed header.
+ * @param buf0len length of the fixed header.
+ * @param count number of buffers representing the variable header and/or the payload.
+ * @param buffers the buffers representing the variable header and/or the payload.
+ * @param buflens length of the buffers representing the variable header and/or the payload.
+ * @param htype the MQTT packet type (PUBLISH or PUBREL when sending).
+ * @param msgId the message ID.
+ * @param scr 0 indicates message in the sending direction; 1 indicates message in the
+ * receiving direction.
+ * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise (including: no
+ * client owns the socket, no key exists for this scr/htype combination, or
+ * memory could not be allocated).
+ */
+int MQTTPersistence_put(int socket, char* buf0, size_t buf0len, int count,
+        char** buffers, size_t* buflens, int htype, int msgId, int scr )
+{
+    int rc = 0;
+    extern ClientStates* bstate;
+    int nbufs, i;
+    int* lens = NULL;
+    char** bufs = NULL;
+    char *key = NULL;
+    const char* stem = NULL;
+    ListElement* found = NULL;
+    Clients* client = NULL;
+
+    FUNC_ENTRY;
+    found = ListFindItem(bstate->clients, &socket, clientSocketCompare);
+    if (found == NULL)
+    {
+        /* no client is registered for this socket */
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+        goto exit;
+    }
+    client = (Clients*)(found->content);
+    if (client->persistence == NULL)
+        goto exit;
+
+    /* key stem */
+    if ( scr == 1 ) /* receiving PUBLISH QoS2 */
+        stem = PERSISTENCE_PUBLISH_RECEIVED;
+    else if ( scr == 0 && htype == PUBLISH ) /* sending PUBLISH QoS1 and QoS2 */
+        stem = PERSISTENCE_PUBLISH_SENT;
+    else if ( scr == 0 && htype == PUBREL ) /* sending PUBREL */
+        stem = PERSISTENCE_PUBREL;
+    if (stem == NULL)
+    {
+        /* no key is defined for this combination: refuse rather than store
+         * the record under an uninitialized key */
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+        goto exit;
+    }
+
+    nbufs = 1 + count;
+    key = malloc(MESSAGE_FILENAME_LENGTH + 1);
+    lens = (int *)malloc(nbufs * sizeof(int));
+    bufs = (char **)malloc(nbufs * sizeof(char *));
+    if (key == NULL || lens == NULL || bufs == NULL)
+    {
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+        goto exit;
+    }
+
+    /* the fixed header comes first, then the caller's buffers in order */
+    lens[0] = (int)buf0len;
+    bufs[0] = buf0;
+    for (i = 0; i < count; i++)
+    {
+        lens[i+1] = (int)buflens[i];
+        bufs[i+1] = buffers[i];
+    }
+
+    sprintf(key, "%s%d", stem, msgId);
+    rc = client->persistence->pput(client->phandle, key, nbufs, bufs, lens);
+
+exit:
+    if (key)
+        free(key);
+    if (lens)
+        free(lens);
+    if (bufs)
+        free(bufs);
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+/**
+ * Deletes a record from the persistent store.
+ *
+ * For a sent QoS 2 PUBLISH both the PUBLISH record and the PUBREL record with
+ * the same message id are removed, and the result of the second removal is the
+ * one returned. In all other cases the single record named by type and msgId
+ * is removed. Without persistence nothing is done and 0 is returned.
+ * @param c the client as ::Clients.
+ * @param type the type of the persisted record: #PERSISTENCE_PUBLISH_SENT, #PERSISTENCE_PUBREL
+ * or #PERSISTENCE_PUBLISH_RECEIVED.
+ * @param qos the qos field of the message.
+ * @param msgId the message ID.
+ * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
+ */
+int MQTTPersistence_remove(Clients* c, char *type, int qos, int msgId)
+{
+    int rc = 0;
+
+    FUNC_ENTRY;
+    if (c->persistence != NULL)
+    {
+        char *recordKey = malloc(MESSAGE_FILENAME_LENGTH + 1);
+        int sentQos2 = (strcmp(type,PERSISTENCE_PUBLISH_SENT) == 0) && (qos == 2);
+
+        if ( !sentQos2 )
+        {
+            /* PERSISTENCE_PUBLISH_SENT && qos == 1, or PERSISTENCE_PUBLISH_RECEIVED */
+            sprintf(recordKey, "%s%d", type, msgId);
+            rc = c->persistence->premove(c->phandle, recordKey);
+        }
+        else
+        {
+            /* the PUBLISH first, then the PUBREL that followed it */
+            sprintf(recordKey, "%s%d", PERSISTENCE_PUBLISH_SENT, msgId);
+            rc = c->persistence->premove(c->phandle, recordKey);
+            sprintf(recordKey, "%s%d", PERSISTENCE_PUBREL, msgId);
+            rc = c->persistence->premove(c->phandle, recordKey);
+        }
+        free(recordKey);
+    }
+
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+/**
+ * Checks whether the message IDs wrapped by looking for the largest gap between two consecutive
+ * message IDs in the outboundMsgs queue.
+ *
+ * The gap across the wrap point (from the last id up to MAX_MSG_ID and on to the
+ * first id) is the baseline. If the gap in front of some element is larger than
+ * that, the list is rotated so that this element becomes the first one.
+ * @param client the client as ::Clients.
+ */
+void MQTTPersistence_wrapMsgID(Clients *client)
+{
+    ListElement* wrapel = NULL;
+    ListElement* current = NULL;
+
+    FUNC_ENTRY;
+    if ( client->outboundMsgs->count > 0 )
+    {
+        int firstMsgID = ((Messages*)client->outboundMsgs->first->content)->msgid;
+        int lastMsgID = ((Messages*)client->outboundMsgs->last->content)->msgid;
+        int gap = MAX_MSG_ID - lastMsgID + firstMsgID;
+
+        /* step onto the first element so the loop starts at the second one,
+         * which is the first to have a predecessor */
+        current = ListNextElement(client->outboundMsgs, &current);
+
+        while (ListNextElement(client->outboundMsgs, &current) != NULL)
+        {
+            int curMsgID = ((Messages*)current->content)->msgid;
+            int curPrevMsgID = ((Messages*)current->prev->content)->msgid;
+            int curgap = curMsgID - curPrevMsgID;
+            if ( curgap > gap )
+            {
+                gap = curgap;
+                wrapel = current;
+            }
+        }
+    }
+
+    if ( wrapel != NULL )
+    {
+        /* put wrapel at the beginning of the queue: join the ends into a ring,
+         * then cut the ring open just in front of wrapel */
+        client->outboundMsgs->first->prev = client->outboundMsgs->last;
+        client->outboundMsgs->last->next = client->outboundMsgs->first;
+        client->outboundMsgs->first = wrapel;
+        client->outboundMsgs->last = wrapel->prev;
+        client->outboundMsgs->first->prev = NULL;
+        client->outboundMsgs->last->next = NULL;
+    }
+    FUNC_EXIT;
+}
+
+
+#if !defined(NO_PERSISTENCE)
+/**
+ * Removes a queue entry from the persistent store.
+ *
+ * The key is PERSISTENCE_QUEUE_KEY followed by the entry's sequence number.
+ * @param client the client as ::Clients; its persistence must not be NULL.
+ * @param qe the queue entry whose seqno identifies the record.
+ * @return 0 if success, otherwise the error returned by the premove callback.
+ */
+int MQTTPersistence_unpersistQueueEntry(Clients* client, MQTTPersistence_qEntry* qe)
+{
+    int rc = 0;
+    /* Room for the stem, every decimal digit an unsigned int can have (at most
+     * 3 per byte) and the NUL already counted by sizeof on the literal.
+     * PERSISTENCE_MAX_KEY_LENGTH + 1 is too small once seqno has 7 digits. */
+    char key[sizeof(PERSISTENCE_QUEUE_KEY) + 3 * sizeof(unsigned int)];
+
+    FUNC_ENTRY;
+    sprintf(key, "%s%u", PERSISTENCE_QUEUE_KEY, qe->seqno);
+    if ((rc = client->persistence->premove(client->phandle, key)) != 0)
+        Log(LOG_ERROR, 0, "Error %d removing qEntry from persistence", rc);
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+/**
+ * Writes a queue entry to the persistent store.
+ *
+ * The record consists of eight pieces, in this order: payloadlen, payload, qos,
+ * retained, dup, msgid, topicName (including its terminating NUL) and topicLen.
+ * Integers are written in their in-memory representation.
+ * The client's qentry_seqno is incremented, used to form the key
+ * (PERSISTENCE_QUEUE_KEY followed by the number) and stored in qe->seqno.
+ * @param aclient the client as ::Clients; its persistence must not be NULL.
+ * @param qe the queue entry to store.
+ * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR if memory could not be
+ * allocated, otherwise the error returned by the pput callback.
+ */
+int MQTTPersistence_persistQueueEntry(Clients* aclient, MQTTPersistence_qEntry* qe)
+{
+    int rc = 0;
+    int nbufs = 8;
+    int bufindex = 0;
+    /* Room for the stem, a sign, every decimal digit an int can have (at most
+     * 3 per byte) and the NUL already counted by sizeof on the literal.
+     * PERSISTENCE_MAX_KEY_LENGTH + 1 is too small once the number has 7 digits. */
+    char key[sizeof(PERSISTENCE_QUEUE_KEY) + 1 + 3 * sizeof(int)];
+    int* lens = NULL;
+    void** bufs = NULL;
+
+    FUNC_ENTRY;
+    lens = (int*)malloc(nbufs * sizeof(int));
+    bufs = malloc(nbufs * sizeof(void*));
+    if (lens == NULL || bufs == NULL)
+    {
+        /* fail before the sequence number is consumed */
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+        goto exit;
+    }
+
+    bufs[bufindex] = &qe->msg->payloadlen;
+    lens[bufindex++] = sizeof(qe->msg->payloadlen);
+
+    bufs[bufindex] = qe->msg->payload;
+    lens[bufindex++] = qe->msg->payloadlen;
+
+    bufs[bufindex] = &qe->msg->qos;
+    lens[bufindex++] = sizeof(qe->msg->qos);
+
+    bufs[bufindex] = &qe->msg->retained;
+    lens[bufindex++] = sizeof(qe->msg->retained);
+
+    bufs[bufindex] = &qe->msg->dup;
+    lens[bufindex++] = sizeof(qe->msg->dup);
+
+    bufs[bufindex] = &qe->msg->msgid;
+    lens[bufindex++] = sizeof(qe->msg->msgid);
+
+    bufs[bufindex] = qe->topicName;
+    lens[bufindex++] = (int)strlen(qe->topicName) + 1;
+
+    bufs[bufindex] = &qe->topicLen;
+    lens[bufindex++] = sizeof(qe->topicLen);
+
+    /* NOTE(review): the key is formatted with %d here but with %u when the entry
+     * is removed; confirm the declared type of qentry_seqno */
+    sprintf(key, "%s%d", PERSISTENCE_QUEUE_KEY, ++aclient->qentry_seqno);
+    qe->seqno = aclient->qentry_seqno;
+
+    if ((rc = aclient->persistence->pput(aclient->phandle, key, nbufs, (char**)bufs, lens)) != 0)
+        Log(LOG_ERROR, 0, "Error persisting queue entry, rc %d", rc);
+
+exit:
+    if (lens)
+        free(lens);
+    if (bufs)
+        free(bufs);
+
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+/**
+ * Copies one int out of a persisted record and advances the read position.
+ * memcpy is used because the position is not necessarily aligned for an int.
+ * @param pptr the current read position, advanced by sizeof(int) on success.
+ * @param end one past the last byte of the record.
+ * @param out receives the value.
+ * @return 1 if the value was read, 0 if fewer than sizeof(int) bytes remain.
+ */
+static int MQTTPersistence_restoreInt(char** pptr, char* end, int* out)
+{
+    if ((size_t)(end - *pptr) < sizeof(int))
+        return 0;
+    memcpy(out, *pptr, sizeof(int));
+    *pptr += sizeof(int);
+    return 1;
+}
+
+
+/**
+ * Rebuilds a queue entry from a persisted record.
+ *
+ * The record layout is: payloadlen, payload bytes, qos, retained, dup, msgid,
+ * NUL-terminated topic name, topicLen - integers in their in-memory
+ * representation. Every read is checked against buflen.
+ * @param buffer the persisted data.
+ * @param buflen the number of bytes of the data buffer.
+ * @return a newly allocated entry (seqno left at 0), or NULL if the record is
+ * truncated or malformed or memory could not be allocated.
+ */
+static MQTTPersistence_qEntry* MQTTPersistence_restoreQueueEntry(char* buffer, size_t buflen)
+{
+    MQTTPersistence_qEntry* qe = NULL;
+    char* ptr = buffer;
+    char* end = buffer + buflen;
+    char* nul = NULL;
+    int data_size;
+
+    FUNC_ENTRY;
+    qe = malloc(sizeof(MQTTPersistence_qEntry));
+    if (qe == NULL)
+        goto exit;
+    memset(qe, '\0', sizeof(MQTTPersistence_qEntry));
+
+    qe->msg = malloc(sizeof(MQTTPersistence_message));
+    if (qe->msg == NULL)
+        goto error;
+    memset(qe->msg, '\0', sizeof(MQTTPersistence_message));
+
+    if (!MQTTPersistence_restoreInt(&ptr, end, &qe->msg->payloadlen))
+        goto error;
+
+    /* the payload must lie completely inside the record */
+    data_size = qe->msg->payloadlen;
+    if (data_size < 0 || (size_t)(end - ptr) < (size_t)data_size)
+        goto error;
+    qe->msg->payload = malloc(data_size);
+    if (qe->msg->payload == NULL && data_size > 0)
+        goto error;
+    if (data_size > 0)
+        memcpy(qe->msg->payload, ptr, data_size);
+    ptr += data_size;
+
+    if (!MQTTPersistence_restoreInt(&ptr, end, &qe->msg->qos) ||
+        !MQTTPersistence_restoreInt(&ptr, end, &qe->msg->retained) ||
+        !MQTTPersistence_restoreInt(&ptr, end, &qe->msg->dup) ||
+        !MQTTPersistence_restoreInt(&ptr, end, &qe->msg->msgid))
+        goto error;
+
+    /* the topic name must be terminated inside the record */
+    nul = memchr(ptr, '\0', (size_t)(end - ptr));
+    if (nul == NULL)
+        goto error;
+    data_size = (int)(nul - ptr) + 1;
+    qe->topicName = malloc(data_size);
+    if (qe->topicName == NULL)
+        goto error;
+    memcpy(qe->topicName, ptr, data_size);
+    ptr += data_size;
+
+    if (!MQTTPersistence_restoreInt(&ptr, end, &qe->topicLen))
+        goto error;
+    goto exit;
+
+error:
+    /* release whatever was built so far */
+    if (qe->topicName)
+        free(qe->topicName);
+    if (qe->msg)
+    {
+        if (qe->msg->payload)
+            free(qe->msg->payload);
+        free(qe->msg);
+    }
+    free(qe);
+    qe = NULL;
+
+exit:
+    FUNC_EXIT;
+    return qe;
+}
+
+
+/**
+ * Inserts a queue entry into the list, maintaining sequence number order.
+ *
+ * The entry is placed before the first element whose seqno is greater than
+ * its own. If there is no such element the insertion index stays NULL;
+ * NOTE(review): ListInsert presumably appends in that case - confirm.
+ * @param list the list to insert the entry into.
+ * @param qEntry the queue entry to add.
+ * @param size size of the entry.
+ */
+static void MQTTPersistence_insertInSeqOrder(List* list, MQTTPersistence_qEntry* qEntry, size_t size)
+{
+    ListElement* index = NULL;
+    ListElement* current = NULL;
+
+    FUNC_ENTRY;
+    while (ListNextElement(list, &current) != NULL && index == NULL)
+    {
+        if (qEntry->seqno < ((MQTTPersistence_qEntry*)current->content)->seqno)
+            index = current;
+    }
+    ListInsert(list, qEntry, size, index);
+    FUNC_EXIT;
+}
+
+
+/**
+ * Restores a queue of messages from persistence to memory
+ *
+ * Only records whose key starts with PERSISTENCE_QUEUE_KEY are considered. Each
+ * is rebuilt, given the sequence number that follows the stem in its key, and
+ * inserted into c->messageQueue in sequence number order; c->qentry_seqno is
+ * raised to the highest sequence number seen. The record buffer, every key
+ * string and the key array are freed before returning, also when the loop stops
+ * early because a callback failed.
+ * @param c the client as ::Clients - the client object to restore the messages to
+ * @return return code, 0 if successful
+ */
+int MQTTPersistence_restoreMessageQueue(Clients* c)
+{
+    int rc = 0;
+    char **msgkeys = NULL;
+    int nkeys = 0;
+    int i = 0;
+    int entries_restored = 0;
+
+    FUNC_ENTRY;
+    if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0)
+    {
+        while (rc == 0 && i < nkeys)
+        {
+            char *buffer = NULL;
+            int buflen;
+
+            if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) != 0)
+            {
+                ; /* not a queue entry */
+            }
+            else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0)
+            {
+                MQTTPersistence_qEntry* qe = MQTTPersistence_restoreQueueEntry(buffer, buflen);
+
+                if (qe)
+                {
+                    /* the digits after the two character stem are the sequence number */
+                    qe->seqno = atoi(msgkeys[i]+2);
+                    MQTTPersistence_insertInSeqOrder(c->messageQueue, qe, sizeof(MQTTPersistence_qEntry));
+                    c->qentry_seqno = max(c->qentry_seqno, qe->seqno);
+                    entries_restored++;
+                }
+            }
+            /* the record buffer is no longer needed whether or not an entry was built */
+            if (buffer)
+                free(buffer);
+            if (msgkeys[i])
+            {
+                free(msgkeys[i]);
+            }
+            i++;
+        }
+        /* release the keys that were not visited because the loop stopped on an error */
+        for (; i < nkeys; i++)
+        {
+            if (msgkeys[i])
+                free(msgkeys[i]);
+        }
+        if (msgkeys != NULL)
+            free(msgkeys);
+    }
+    Log(TRACE_MINIMUM, -1, "%d queued messages restored for client %s", entries_restored, c->clientID);
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPersistence.h
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTPersistence.h b/thirdparty/paho.mqtt.c/src/MQTTPersistence.h
new file mode 100644
index 0000000..9a938ba
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTPersistence.h
@@ -0,0 +1,74 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Ian Craggs - initial API and implementation and/or initial documentation
+ * Ian Craggs - async client updates
+ * Ian Craggs - fix for bug 432903 - queue persistence
+ *******************************************************************************/
+
+#if defined(__cplusplus)
+ extern "C" {
+#endif
+
+#include "Clients.h"
+
+/** Stem of the key for a sent PUBLISH QoS1 or QoS2 */
+#define PERSISTENCE_PUBLISH_SENT "s-"
+/** Stem of the key for a sent PUBREL */
+#define PERSISTENCE_PUBREL "sc-"
+/** Stem of the key for a received PUBLISH QoS2 */
+#define PERSISTENCE_PUBLISH_RECEIVED "r-"
+/** Stem of the key for an async client command */
+#define PERSISTENCE_COMMAND_KEY "c-"
+/** Stem of the key for an async client message queue */
+#define PERSISTENCE_QUEUE_KEY "q-"
+#define PERSISTENCE_MAX_KEY_LENGTH 8
+
+int MQTTPersistence_create(MQTTClient_persistence** per, int type, void* pcontext);
+int MQTTPersistence_initialize(Clients* c, const char* serverURI);
+int MQTTPersistence_close(Clients* c);
+int MQTTPersistence_clear(Clients* c);
+int MQTTPersistence_restore(Clients* c);
+void* MQTTPersistence_restorePacket(char* buffer, size_t buflen);
+void MQTTPersistence_insertInOrder(List* list, void* content, size_t size);
+int MQTTPersistence_put(int socket, char* buf0, size_t buf0len, int count,
+ char** buffers, size_t* buflens, int htype, int msgId, int scr);
+int MQTTPersistence_remove(Clients* c, char* type, int qos, int msgId);
+void MQTTPersistence_wrapMsgID(Clients *c);
+
+typedef struct /* message part of a persisted queue entry */
+{
+ char struct_id[4]; /* NOTE(review): not set or read by the queue persistence code in MQTTPersistence.c - presumably an eyecatcher, confirm */
+ int struct_version; /* NOTE(review): likewise not referenced by MQTTPersistence.c - confirm its meaning */
+ int payloadlen; /* number of bytes in payload; first field of a persisted record */
+ void* payload; /* payload bytes; allocated with malloc when an entry is restored */
+ int qos; /* persisted and restored as a raw int */
+ int retained; /* persisted and restored as a raw int */
+ int dup; /* persisted and restored as a raw int */
+ int msgid; /* persisted and restored as a raw int */
+} MQTTPersistence_message;
+
+typedef struct /* one entry of a client's persisted message queue */
+{
+ MQTTPersistence_message* msg; /* the message fields; allocated on restore */
+ char* topicName; /* persisted as a string including its terminating NUL */
+ int topicLen; /* persisted as a raw int after the topic name */
+ unsigned int seqno; /* number appended to PERSISTENCE_QUEUE_KEY to form the record key; set when the entry is persisted and when it is restored */
+} MQTTPersistence_qEntry;
+
+int MQTTPersistence_unpersistQueueEntry(Clients* client, MQTTPersistence_qEntry* qe);
+int MQTTPersistence_persistQueueEntry(Clients* aclient, MQTTPersistence_qEntry* qe);
+int MQTTPersistence_restoreMessageQueue(Clients* c);
+#ifdef __cplusplus
+ }
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.c
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.c b/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.c
new file mode 100644
index 0000000..35c1f53
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.c
@@ -0,0 +1,841 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2016 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Ian Craggs - initial API and implementation and/or initial documentation
+ * Ian Craggs - async client updates
+ * Ian Craggs - fix for bug 484496
+ *******************************************************************************/
+
+/**
+ * @file
+ * \brief A file system based persistence implementation.
+ *
+ * A directory is specified when the MQTT client is created. When the persistence is then
+ * opened (see ::Persistence_open), a sub-directory is made beneath the base for this
+ * particular client ID and connection key. This allows one persistence base directory to
+ * be shared by multiple clients.
+ *
+ */
+
+#if !defined(NO_PERSISTENCE)
+
+#include "OsWrapper.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+
+#if defined(WIN32) || defined(WIN64)
+ #include <direct.h>
+ /* Windows doesn't have strtok_r, so remap it to strtok */
+ #define strtok_r( A, B, C ) strtok( A, B )
+ int keysWin32(char *, char ***, int *);
+ int clearWin32(char *);
+ int containskeyWin32(char *, char *);
+#else
+ #include <sys/stat.h>
+ #include <dirent.h>
+ #include <unistd.h>
+ int keysUnix(char *, char ***, int *);
+ int clearUnix(char *);
+ int containskeyUnix(char *, char *);
+#endif
+
+#include "MQTTClientPersistence.h"
+#include "MQTTPersistenceDefault.h"
+#include "StackTrace.h"
+#include "Heap.h"
+
+/** Create persistence directory for the client: context/clientID-serverURI.
+ * See ::Persistence_open
+ *
+ * Each level of the path is created in turn, so the base directory does not
+ * have to exist beforehand. A leading path separator is kept, so that an
+ * absolute base directory is not created relative to the current working
+ * directory.
+ * @param handle receives the heap allocated name of the client directory
+ * (NULL if it could not be allocated). It is set even when the directory
+ * could not be created, and is released by pstclose()
+ * @param clientID the client identifier, used in the directory name
+ * @param serverURI address:port of the server; every ':' is replaced by '-'
+ * @param context the name of the base directory (a char*)
+ * @return 0 on success, MQTTCLIENT_PERSISTENCE_ERROR otherwise
+ */
+
+int pstopen(void **handle, const char* clientID, const char* serverURI, void* context)
+{
+    int rc = 0;
+    char *dataDir = context;
+    char *clientDir = NULL;
+    char *pToken = NULL;
+    char *save_ptr = NULL;
+    char *pCrtDirName = NULL;
+    char *pTokDirName = NULL;
+    char *perserverURI = NULL, *ptraux;
+
+    FUNC_ENTRY;
+    /* Note that serverURI=address:port, but ":" not allowed in Windows directories */
+    perserverURI = malloc(strlen(serverURI) + 1);
+    if (perserverURI == NULL)
+    {
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+        goto exit;
+    }
+    strcpy(perserverURI, serverURI);
+    while ((ptraux = strstr(perserverURI, ":")) != NULL)
+        *ptraux = '-' ;
+
+    /* consider '/' + '-' + '\0' */
+    clientDir = malloc(strlen(dataDir) + strlen(clientID) + strlen(perserverURI) + 3);
+    if (clientDir == NULL)
+    {
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+        goto exit;
+    }
+    sprintf(clientDir, "%s/%s-%s", dataDir, clientID, perserverURI);
+
+    /* create clientDir directory */
+
+    /* pCrtDirName - holds the directory name we are currently trying to create. */
+    /* This gets built up level by level until the full path name is created. */
+    /* pTokDirName - holds the directory name that gets used by strtok. */
+    pCrtDirName = malloc(strlen(clientDir) + 1);
+    pTokDirName = malloc(strlen(clientDir) + 1);
+    if (pCrtDirName == NULL || pTokDirName == NULL)
+    {
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+        goto exit;
+    }
+    strcpy(pTokDirName, clientDir);
+
+    /* strtok skips leading separators, so the root of an absolute path has to be kept explicitly */
+    if (clientDir[0] == '/' || clientDir[0] == '\\')
+    {
+        pCrtDirName[0] = clientDir[0];
+        pCrtDirName[1] = '\0';
+    }
+    else
+        pCrtDirName[0] = '\0';
+
+    pToken = strtok_r(pTokDirName, "\\/", &save_ptr);
+    if (pToken != NULL)
+    {
+        strcat(pCrtDirName, pToken);
+        rc = pstmkdir(pCrtDirName);
+        pToken = strtok_r(NULL, "\\/", &save_ptr);
+    }
+    while ((pToken != NULL) && (rc == 0))
+    {
+        /* Append the next directory level and try to create it */
+        strcat(pCrtDirName, "/");
+        strcat(pCrtDirName, pToken);
+        rc = pstmkdir(pCrtDirName);
+        pToken = strtok_r(NULL, "\\/", &save_ptr);
+    }
+
+exit:
+    *handle = clientDir;
+
+    if (pTokDirName != NULL)
+        free(pTokDirName);
+    if (pCrtDirName != NULL)
+        free(pCrtDirName);
+    if (perserverURI != NULL)
+        free(perserverURI);
+
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+/** Create a single directory.
+ * A failure of the underlying mkdir call is only reported when errno is
+ * something other than EEXIST.
+ * @param pPathname the name of the directory to create
+ * @return 0 if the directory was created or mkdir failed with EEXIST,
+ * MQTTCLIENT_PERSISTENCE_ERROR for any other failure
+ */
+int pstmkdir( char *pPathname )
+{
+    int failed;
+    int rc = 0;
+
+    FUNC_ENTRY;
+#if defined(WIN32) || defined(WIN64)
+    failed = (_mkdir(pPathname) != 0);
+#elif !defined(_WRS_KERNEL)
+    /* read, write and execute access for the owner, read access for the group */
+    failed = (mkdir(pPathname, S_IRWXU | S_IRGRP) != 0);
+#else
+    failed = (mkdir(pPathname) != 0);
+#endif
+
+    if (failed && errno != EEXIST)
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+
+/** Write wire message to the client persistence directory.
+ * See ::Persistence_put
+ *
+ * The buffers are written one after another to the file
+ * clientDir/key + MESSAGE_FILENAME_EXTENSION. If the file cannot be written
+ * completely it is removed again.
+ * @param handle the client directory name, as returned by pstopen()
+ * @param key the key of the message, used as the stem of the file name
+ * @param bufcount the number of entries in buffers and buflens
+ * @param buffers the data to write
+ * @param buflens the length of each entry of buffers
+ * @return 0 on success, MQTTCLIENT_PERSISTENCE_ERROR otherwise
+ */
+int pstput(void* handle, char* key, int bufcount, char* buffers[], int buflens[])
+{
+    int rc = 0;
+    char *clientDir = handle;
+    char *file;
+    FILE *fp;
+    size_t bytesWritten = 0,
+           bytesTotal = 0;
+    int i;
+
+    FUNC_ENTRY;
+    if (clientDir == NULL)
+    {
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+        goto exit;
+    }
+
+    /* consider '/' + '\0' */
+    file = malloc(strlen(clientDir) + strlen(key) + strlen(MESSAGE_FILENAME_EXTENSION) + 2);
+    if (file == NULL)
+    {
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+        goto exit;
+    }
+    sprintf(file, "%s/%s%s", clientDir, key, MESSAGE_FILENAME_EXTENSION);
+
+    fp = fopen(file, "wb");
+    if (fp == NULL)
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+    else
+    {
+        for (i = 0; i < bufcount; i++)
+        {
+            bytesTotal += buflens[i];
+            bytesWritten += fwrite(buffers[i], sizeof(char), buflens[i], fp);
+        }
+        /* buffered data may only reach the file in fclose, so its failure means an incomplete file too */
+        if (fclose(fp) != 0 || bytesWritten != bytesTotal)
+        {
+            pstremove(handle, key);
+            rc = MQTTCLIENT_PERSISTENCE_ERROR;
+        }
+    }
+
+    free(file);
+
+exit:
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+/** Retrieve a wire message from the client persistence directory.
+ * See ::Persistence_get
+ *
+ * Reads the whole of the file clientDir/key + MESSAGE_FILENAME_EXTENSION
+ * into a newly allocated buffer.
+ * @param handle the client directory name, as returned by pstopen()
+ * @param key the key of the message, used as the stem of the file name
+ * @param buffer receives the allocated buffer, which the caller must free.
+ * It is left untouched if the file could not be opened or sized, or if no
+ * memory was available; it is set (with a partial content) if the read was
+ * short
+ * @param buflen receives the number of bytes read, under the same
+ * conditions as buffer
+ * @return 0 on success, MQTTCLIENT_PERSISTENCE_ERROR otherwise
+ */
+int pstget(void* handle, char* key, char** buffer, int* buflen)
+{
+    int rc = 0;
+    FILE *fp;
+    char *clientDir = handle;
+    char *file;
+    char *buf;
+    long fileLen = 0;
+    size_t bytesRead = 0;
+
+    FUNC_ENTRY;
+    if (clientDir == NULL)
+    {
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+        goto exit;
+    }
+
+    /* consider '/' + '\0' */
+    file = malloc(strlen(clientDir) + strlen(key) + strlen(MESSAGE_FILENAME_EXTENSION) + 2);
+    if (file == NULL)
+    {
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+        goto exit;
+    }
+    sprintf(file, "%s/%s%s", clientDir, key, MESSAGE_FILENAME_EXTENSION);
+
+    fp = fopen(file, "rb");
+    if (fp != NULL)
+    {
+        /* ftell reports failure as -1, which must not be used as an allocation size */
+        if (fseek(fp, 0, SEEK_END) != 0 || (fileLen = ftell(fp)) < 0 ||
+            fseek(fp, 0, SEEK_SET) != 0)
+            rc = MQTTCLIENT_PERSISTENCE_ERROR;
+        /* never ask for 0 bytes, so that NULL always means out of memory */
+        else if ((buf = malloc(fileLen > 0 ? (size_t)fileLen : 1)) == NULL)
+            rc = MQTTCLIENT_PERSISTENCE_ERROR;
+        else
+        {
+            bytesRead = fread(buf, sizeof(char), (size_t)fileLen, fp);
+            *buffer = buf;
+            *buflen = (int)bytesRead;
+            if (bytesRead != (size_t)fileLen)
+                rc = MQTTCLIENT_PERSISTENCE_ERROR;
+        }
+        fclose(fp);
+        fp = NULL;
+    } else
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+
+    free(file);
+    /* the caller must free buf */
+
+exit:
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+
+/** Delete a persisted message from the client persistence directory.
+ * See ::Persistence_remove
+ *
+ * Removes the file clientDir/key + MESSAGE_FILENAME_EXTENSION. A file that
+ * does not exist (ENOENT) is not treated as an error.
+ * @param handle the client directory name, as returned by pstopen()
+ * @param key the key of the message, used as the stem of the file name
+ * @return 0 on success, MQTTCLIENT_PERSISTENCE_ERROR otherwise
+ */
+int pstremove(void* handle, char* key)
+{
+    int rc = 0;
+    char *clientDir = handle;
+    char *file;
+
+    FUNC_ENTRY;
+    if (clientDir == NULL)
+    {
+        /* leave through the exit label so that FUNC_EXIT_RC matches FUNC_ENTRY */
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+        goto exit;
+    }
+
+    /* consider '/' + '\0' */
+    file = malloc(strlen(clientDir) + strlen(key) + strlen(MESSAGE_FILENAME_EXTENSION) + 2);
+    if (file == NULL)
+    {
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+        goto exit;
+    }
+    sprintf(file, "%s/%s%s", clientDir, key, MESSAGE_FILENAME_EXTENSION);
+
+#if defined(WIN32) || defined(WIN64)
+    if ( _unlink(file) != 0 )
+#else
+    if ( unlink(file) != 0 )
+#endif
+    {
+        if ( errno != ENOENT )
+            rc = MQTTCLIENT_PERSISTENCE_ERROR;
+    }
+
+    free(file);
+
+exit:
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+/** Delete client persistence directory (if empty).
+ * See ::Persistence_close
+ *
+ * A directory which is missing (ENOENT) or still holds files (ENOTEMPTY) is
+ * not treated as an error. Unless it is NULL the handle is freed in every
+ * case, so it must not be used after this call.
+ * @param handle the client directory name, as returned by pstopen()
+ * @return 0 on success, MQTTCLIENT_PERSISTENCE_ERROR if handle is NULL or
+ * the directory could not be removed for another reason
+ */
+int pstclose(void* handle)
+{
+    char *dir = handle;
+    int failed;
+    int rc = 0;
+
+    FUNC_ENTRY;
+    if (dir == NULL)
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+    else
+    {
+#if defined(WIN32) || defined(WIN64)
+        failed = (_rmdir(dir) != 0);
+#else
+        failed = (rmdir(dir) != 0);
+#endif
+        if (failed && errno != ENOENT && errno != ENOTEMPTY)
+            rc = MQTTCLIENT_PERSISTENCE_ERROR;
+
+        free(dir);
+    }
+
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+/** Returns whether a wire message is persisted in the client persistence directory.
+ * See ::Persistence_containskey
+ *
+ * The search itself is done by the platform specific helper.
+ * @param handle the client directory name, as returned by pstopen()
+ * @param key the key to look for
+ * @return the result of the platform helper, or MQTTCLIENT_PERSISTENCE_ERROR
+ * if handle is NULL
+ */
+int pstcontainskey(void *handle, char *key)
+{
+    char *dir = handle;
+    int rc;
+
+    FUNC_ENTRY;
+    if (dir == NULL)
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+    else
+    {
+#if defined(WIN32) || defined(WIN64)
+        rc = containskeyWin32(dir, key);
+#else
+        rc = containskeyUnix(dir, key);
+#endif
+    }
+
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+#if defined(WIN32) || defined(WIN64)
+/** Windows implementation of pstcontainskey().
+ * Compares key with the name of each file in dirname which has the archive
+ * attribute, after cutting the name at MESSAGE_FILENAME_EXTENSION.
+ * @param dirname the directory to search
+ * @param key the key to look for
+ * @return 0 if the key was found, MQTTCLIENT_PERSISTENCE_ERROR if it was not
+ * or the directory could not be searched
+ */
+int containskeyWin32(char *dirname, char *key)
+{
+    int notFound = MQTTCLIENT_PERSISTENCE_ERROR;
+    int fFinished = 0;
+    char *filekey, *ptraux;
+    char dir[MAX_PATH+1];
+    WIN32_FIND_DATAA FileData;
+    HANDLE hDir;
+
+    FUNC_ENTRY;
+    /* the search pattern, including its terminator, has to fit into dir */
+    if (strlen(dirname) + sizeof("/*") > sizeof(dir))
+        goto exit;
+    sprintf(dir, "%s/*", dirname);
+
+    hDir = FindFirstFileA(dir, &FileData);
+    if (hDir != INVALID_HANDLE_VALUE)
+    {
+        while (!fFinished)
+        {
+            if (FileData.dwFileAttributes & FILE_ATTRIBUTE_ARCHIVE)
+            {
+                filekey = malloc(strlen(FileData.cFileName) + 1);
+                if (filekey == NULL)
+                    break;
+                strcpy(filekey, FileData.cFileName);
+                ptraux = strstr(filekey, MESSAGE_FILENAME_EXTENSION);
+                if ( ptraux != NULL )
+                    *ptraux = '\0' ;
+                if (strcmp(filekey, key) == 0)
+                {
+                    notFound = 0;
+                    fFinished = 1;
+                }
+                free(filekey);
+            }
+            /* stop on every failure, not only on ERROR_NO_MORE_FILES: otherwise the same entry would be looked at for ever */
+            if (!FindNextFileA(hDir, &FileData))
+                fFinished = 1;
+        }
+        FindClose(hDir);
+    }
+
+exit:
+    FUNC_EXIT_RC(notFound);
+    return notFound;
+}
+#else
+/** Unix implementation of pstcontainskey().
+ * Compares key with the name of each regular file in dirname, after cutting
+ * the name at MESSAGE_FILENAME_EXTENSION.
+ * @param dirname the directory to search
+ * @param key the key to look for
+ * @return 0 if the key was found, MQTTCLIENT_PERSISTENCE_ERROR if it was not
+ * or the directory could not be searched
+ */
+int containskeyUnix(char *dirname, char *key)
+{
+    int notFound = MQTTCLIENT_PERSISTENCE_ERROR;
+    char *filekey, *ptraux;
+    DIR *dp;
+    struct dirent *dir_entry;
+    struct stat stat_info;
+
+    FUNC_ENTRY;
+    if ((dp = opendir(dirname)) != NULL)
+    {
+        while (notFound && (dir_entry = readdir(dp)) != NULL)
+        {
+            int regular;
+            char* filename = malloc(strlen(dirname) + strlen(dir_entry->d_name) + 2);
+
+            if (filename == NULL)
+                break;
+            sprintf(filename, "%s/%s", dirname, dir_entry->d_name);
+            /* stat_info only holds valid data when lstat succeeded */
+            regular = (lstat(filename, &stat_info) == 0 && S_ISREG(stat_info.st_mode));
+            free(filename);
+            if (regular)
+            {
+                filekey = malloc(strlen(dir_entry->d_name) + 1);
+                if (filekey == NULL)
+                    break;
+                strcpy(filekey, dir_entry->d_name);
+                ptraux = strstr(filekey, MESSAGE_FILENAME_EXTENSION);
+                if ( ptraux != NULL )
+                    *ptraux = '\0' ;
+                if (strcmp(filekey, key) == 0)
+                    notFound = 0;
+                free(filekey);
+            }
+        }
+        closedir(dp);
+    }
+
+    FUNC_EXIT_RC(notFound);
+    return notFound;
+}
+#endif
+
+
+/** Delete all the persisted messages in the client persistence directory.
+ * See ::Persistence_clear
+ *
+ * The files are removed by the platform specific helper.
+ * @param handle the client directory name, as returned by pstopen()
+ * @return the result of the platform helper, or MQTTCLIENT_PERSISTENCE_ERROR
+ * if handle is NULL
+ */
+int pstclear(void *handle)
+{
+    char *dir = handle;
+    int rc;
+
+    FUNC_ENTRY;
+    if (dir == NULL)
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+    else
+    {
+#if defined(WIN32) || defined(WIN64)
+        rc = clearWin32(dir);
+#else
+        rc = clearUnix(dir);
+#endif
+    }
+
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+#if defined(WIN32) || defined(WIN64)
+/** Windows implementation of pstclear().
+ * Removes every file in dirname which has the archive attribute, stopping at
+ * the first file that cannot be removed.
+ * @param dirname the directory to clear
+ * @return 0 on success, MQTTCLIENT_PERSISTENCE_ERROR otherwise
+ */
+int clearWin32(char *dirname)
+{
+    int rc = 0;
+    int fFinished = 0;
+    char *file;
+    char dir[MAX_PATH+1];
+    WIN32_FIND_DATAA FileData;
+    HANDLE hDir;
+
+    FUNC_ENTRY;
+    /* the search pattern, including its terminator, has to fit into dir */
+    if (strlen(dirname) + sizeof("/*") > sizeof(dir))
+    {
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+        goto exit;
+    }
+    sprintf(dir, "%s/*", dirname);
+
+    hDir = FindFirstFileA(dir, &FileData);
+    if (hDir != INVALID_HANDLE_VALUE)
+    {
+        while (!fFinished)
+        {
+            if (FileData.dwFileAttributes & FILE_ATTRIBUTE_ARCHIVE)
+            {
+                file = malloc(strlen(dirname) + strlen(FileData.cFileName) + 2);
+                if (file == NULL)
+                {
+                    rc = MQTTCLIENT_PERSISTENCE_ERROR;
+                    break;
+                }
+                sprintf(file, "%s/%s", dirname, FileData.cFileName);
+                rc = remove(file);
+                free(file);
+                if ( rc != 0 )
+                {
+                    rc = MQTTCLIENT_PERSISTENCE_ERROR;
+                    break;
+                }
+            }
+            /* stop on every failure, not only on ERROR_NO_MORE_FILES: otherwise the same entry would be handled for ever */
+            if (!FindNextFileA(hDir, &FileData))
+                fFinished = 1;
+        }
+        FindClose(hDir);
+    } else
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+
+exit:
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+#else
+/** Unix implementation of pstclear().
+ * Removes every regular file in dirname, stopping at the first file that
+ * cannot be removed.
+ * @param dirname the directory to clear
+ * @return 0 on success, MQTTCLIENT_PERSISTENCE_ERROR otherwise
+ */
+int clearUnix(char *dirname)
+{
+    int rc = 0;
+    DIR *dp;
+    struct dirent *dir_entry;
+    struct stat stat_info;
+
+    FUNC_ENTRY;
+    if ((dp = opendir(dirname)) != NULL)
+    {
+        while ((dir_entry = readdir(dp)) != NULL && rc == 0)
+        {
+            /* d_name is relative to dirname, not to the current working directory */
+            char* file = malloc(strlen(dirname) + strlen(dir_entry->d_name) + 2);
+
+            if (file == NULL)
+            {
+                rc = MQTTCLIENT_PERSISTENCE_ERROR;
+                break;
+            }
+            sprintf(file, "%s/%s", dirname, dir_entry->d_name);
+            if (lstat(file, &stat_info) == 0 && S_ISREG(stat_info.st_mode))
+            {
+                if ( remove(file) != 0 )
+                    rc = MQTTCLIENT_PERSISTENCE_ERROR;
+            }
+            free(file);
+        }
+        closedir(dp);
+    } else
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+#endif
+
+
+/** Returns the keys (file names w/o the extension) in the client persistence directory.
+ * See ::Persistence_keys
+ *
+ * The list is built by the platform specific helper.
+ * @param handle the client directory name, as returned by pstopen()
+ * @param keys receives the list of keys
+ * @param nkeys receives the number of keys in the list
+ * @return the result of the platform helper, or MQTTCLIENT_PERSISTENCE_ERROR
+ * if handle is NULL
+ */
+int pstkeys(void *handle, char ***keys, int *nkeys)
+{
+    char *dir = handle;
+    int rc;
+
+    FUNC_ENTRY;
+    if (dir == NULL)
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+    else
+    {
+#if defined(WIN32) || defined(WIN64)
+        rc = keysWin32(dir, keys, nkeys);
+#else
+        rc = keysUnix(dir, keys, nkeys);
+#endif
+    }
+
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+
+
+#if defined(WIN32) || defined(WIN64)
+/** Windows implementation of pstkeys().
+ * Counts the files in dirname which have the archive attribute, then reads
+ * the directory a second time to copy their names, cut at
+ * MESSAGE_FILENAME_EXTENSION.
+ * @param dirname the directory to list
+ * @param keys receives the allocated list of allocated keys (NULL if there
+ * are none); the caller must free each key and the list. Untouched on error
+ * @param nkeys receives the number of keys stored in the list. Untouched on
+ * error
+ * @return 0 on success, MQTTCLIENT_PERSISTENCE_ERROR otherwise
+ */
+int keysWin32(char *dirname, char ***keys, int *nkeys)
+{
+    int rc = 0;
+    char **fkeys = NULL;
+    int nfkeys = 0;
+    char dir[MAX_PATH+1];
+    WIN32_FIND_DATAA FileData;
+    HANDLE hDir;
+    int fFinished = 0;
+    char *ptraux;
+    int i = 0;
+
+    FUNC_ENTRY;
+    /* the search pattern, including its terminator, has to fit into dir */
+    if (strlen(dirname) + sizeof("/*") > sizeof(dir))
+    {
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+        goto exit;
+    }
+    sprintf(dir, "%s/*", dirname);
+
+    /* get number of keys */
+    hDir = FindFirstFileA(dir, &FileData);
+    if (hDir == INVALID_HANDLE_VALUE)
+    {
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+        goto exit;
+    }
+    while (!fFinished)
+    {
+        if (FileData.dwFileAttributes & FILE_ATTRIBUTE_ARCHIVE)
+            nfkeys++;
+        /* stop on every failure, not only on ERROR_NO_MORE_FILES: otherwise the same entry would be counted for ever */
+        if (!FindNextFileA(hDir, &FileData))
+            fFinished = 1;
+    }
+    FindClose(hDir);
+
+    if (nfkeys != 0)
+    {
+        fkeys = malloc(nfkeys * sizeof(char *));
+        if (fkeys == NULL)
+        {
+            rc = MQTTCLIENT_PERSISTENCE_ERROR;
+            goto exit;
+        }
+
+        /* copy the keys */
+        hDir = FindFirstFileA(dir, &FileData);
+        if (hDir == INVALID_HANDLE_VALUE)
+        {
+            free(fkeys);
+            rc = MQTTCLIENT_PERSISTENCE_ERROR;
+            goto exit;
+        }
+        fFinished = 0;
+        /* the directory may have changed since it was counted: never store more than nfkeys keys */
+        while (!fFinished && rc == 0 && i < nfkeys)
+        {
+            if (FileData.dwFileAttributes & FILE_ATTRIBUTE_ARCHIVE)
+            {
+                fkeys[i] = malloc(strlen(FileData.cFileName) + 1);
+                if (fkeys[i] == NULL)
+                    rc = MQTTCLIENT_PERSISTENCE_ERROR;
+                else
+                {
+                    strcpy(fkeys[i], FileData.cFileName);
+                    ptraux = strstr(fkeys[i], MESSAGE_FILENAME_EXTENSION);
+                    if ( ptraux != NULL )
+                        *ptraux = '\0' ;
+                    i++;
+                }
+            }
+            if (!FindNextFileA(hDir, &FileData))
+                fFinished = 1;
+        }
+        FindClose(hDir);
+
+        if (rc != 0)
+        {
+            while (i > 0)
+                free(fkeys[--i]);
+            free(fkeys);
+            goto exit;
+        }
+        if (i == 0)
+        {
+            free(fkeys);
+            fkeys = NULL;
+        }
+    }
+
+    /* report the number of keys really stored, which may be less than the number counted */
+    *nkeys = i;
+    *keys = fkeys;
+    /* the caller must free keys */
+
+exit:
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+#else
+/** Unix implementation of pstkeys().
+ * Counts the regular files in dirname, then reads the directory a second
+ * time to copy their names, cut at MESSAGE_FILENAME_EXTENSION.
+ * @param dirname the directory to list
+ * @param keys receives the allocated list of allocated keys (NULL if there
+ * are none); the caller must free each key and the list. Untouched on error
+ * @param nkeys receives the number of keys stored in the list. Untouched on
+ * error
+ * @return 0 on success, MQTTCLIENT_PERSISTENCE_ERROR otherwise
+ */
+int keysUnix(char *dirname, char ***keys, int *nkeys)
+{
+    int rc = 0;
+    char **fkeys = NULL;
+    int nfkeys = 0;
+    char *ptraux;
+    char *temp;
+    int i = 0;
+    DIR *dp;
+    struct dirent *dir_entry;
+    struct stat stat_info;
+
+    FUNC_ENTRY;
+    /* get number of keys */
+    if ((dp = opendir(dirname)) == NULL)
+    {
+        rc = MQTTCLIENT_PERSISTENCE_ERROR;
+        goto exit;
+    }
+    while ((dir_entry = readdir(dp)) != NULL)
+    {
+        temp = malloc(strlen(dirname) + strlen(dir_entry->d_name) + 2);
+        if (temp == NULL)
+        {
+            rc = MQTTCLIENT_PERSISTENCE_ERROR;
+            break;
+        }
+        sprintf(temp, "%s/%s", dirname, dir_entry->d_name);
+        if (lstat(temp, &stat_info) == 0 && S_ISREG(stat_info.st_mode))
+            nfkeys++;
+        free(temp);
+    }
+    closedir(dp);
+    if (rc != 0)
+        goto exit;
+
+    if (nfkeys != 0)
+    {
+        fkeys = malloc(nfkeys * sizeof(char *));
+        if (fkeys == NULL)
+        {
+            rc = MQTTCLIENT_PERSISTENCE_ERROR;
+            goto exit;
+        }
+
+        /* copy the keys */
+        if ((dp = opendir(dirname)) == NULL)
+        {
+            free(fkeys);
+            rc = MQTTCLIENT_PERSISTENCE_ERROR;
+            goto exit;
+        }
+        /* the directory may have changed since it was counted: never store more than nfkeys keys */
+        while (rc == 0 && i < nfkeys && (dir_entry = readdir(dp)) != NULL)
+        {
+            temp = malloc(strlen(dirname) + strlen(dir_entry->d_name) + 2);
+            if (temp == NULL)
+            {
+                rc = MQTTCLIENT_PERSISTENCE_ERROR;
+                break;
+            }
+            sprintf(temp, "%s/%s", dirname, dir_entry->d_name);
+            if (lstat(temp, &stat_info) == 0 && S_ISREG(stat_info.st_mode))
+            {
+                fkeys[i] = malloc(strlen(dir_entry->d_name) + 1);
+                if (fkeys[i] == NULL)
+                    rc = MQTTCLIENT_PERSISTENCE_ERROR;
+                else
+                {
+                    strcpy(fkeys[i], dir_entry->d_name);
+                    ptraux = strstr(fkeys[i], MESSAGE_FILENAME_EXTENSION);
+                    if ( ptraux != NULL )
+                        *ptraux = '\0' ;
+                    i++;
+                }
+            }
+            free(temp);
+        }
+        closedir(dp);
+
+        if (rc != 0)
+        {
+            while (i > 0)
+                free(fkeys[--i]);
+            free(fkeys);
+            goto exit;
+        }
+        if (i == 0)
+        {
+            free(fkeys);
+            fkeys = NULL;
+        }
+    }
+
+    /* report the number of keys really stored, which may be less than the number counted */
+    *nkeys = i;
+    *keys = fkeys;
+    /* the caller must free keys */
+
+exit:
+    FUNC_EXIT_RC(rc);
+    return rc;
+}
+#endif
+
+
+
+#if defined(UNIT_TESTS)
+/** Free a list of keys returned by pstkeys(), including every key in it. */
+static void freeKeys(char **keys, int nkeys)
+{
+    int i;
+
+    if (keys == NULL)
+        return;
+    for (i = 0; i < nkeys; i++)
+        free(keys[i]);
+    free(keys);
+}
+
+/** Unit test driver: exercises open, put, keys, containskey, get, remove,
+ * clear and close of the file system persistence in the current directory,
+ * printing the outcome of each step.
+ */
+int main (int argc, char *argv[])
+{
+#define MSTEM "m-"
+#define NMSGS 10
+#define NBUFS 4
+#define NDEL 2
+#define RC !rc ? "(Success)" : "(Failed) "
+
+    int rc;
+    char *handle = NULL;
+    char *perdir = ".";
+    const char *clientID = "TheUTClient";
+    const char *serverURI = "127.0.0.1:1883";
+
+    char *stem = MSTEM;
+    int msgId, i;
+    int nm[NDEL] = {5 , 8}; /* msgIds to get and remove */
+
+    char key[MESSAGE_FILENAME_LENGTH + 1];
+    char **keys;
+    int nkeys;
+    char *buffer, *buff;
+    int buflen;
+
+    int nbufs = NBUFS;
+    char *bufs[NBUFS] = {"m0", "mm1", "mmm2" , "mmmm3"}; /* message content */
+    int buflens[NBUFS];
+    for (i = 0; i < nbufs; i++)
+        buflens[i] = (int)strlen(bufs[i]);
+
+    /* open */
+    /* printf("Persistence directory : %s\n", perdir); */
+    rc = pstopen((void**)&handle, clientID, serverURI, perdir);
+    printf("%s Persistence directory for client %s : %s\n", RC, clientID, handle);
+
+    /* put */
+    for (msgId = 0; msgId < NMSGS; msgId++)
+    {
+        sprintf(key, "%s%d", stem, msgId);
+        rc = pstput(handle, key, nbufs, bufs, buflens);
+        printf("%s Adding message %s\n", RC, key);
+    }
+
+    /* keys ,ie, list keys added */
+    keys = NULL;
+    nkeys = 0;
+    rc = pstkeys(handle, &keys, &nkeys);
+    printf("%s Found %d messages persisted in %s\n", RC, nkeys, handle);
+    for (i = 0; i < nkeys; i++)
+        printf("%13s\n", keys[i]);
+    freeKeys(keys, nkeys);
+
+    /* containskey */
+    for (i = 0; i < NDEL; i++)
+    {
+        sprintf(key, "%s%d", stem, nm[i]);
+        rc = pstcontainskey(handle, key);
+        printf("%s Message %s is persisted ?\n", RC, key);
+    }
+
+    /* get && remove*/
+    for (i = 0; i < NDEL; i++)
+    {
+        sprintf(key, "%s%d", stem, nm[i]);
+        buffer = NULL;
+        buflen = 0;
+        rc = pstget(handle, key, &buffer, &buflen);
+        /* the content is only meaningful when pstget succeeded */
+        if (rc == 0 && (buff = malloc(buflen + 1)) != NULL)
+        {
+            memcpy(buff, buffer, buflen);
+            buff[buflen] = '\0';
+            printf("%s Retrieving message %s : %s\n", RC, key, buff);
+            free(buff);
+        }
+        else
+            printf("%s Retrieving message %s : %s\n", RC, key, "");
+        if (buffer != NULL)
+            free(buffer);
+        rc = pstremove(handle, key);
+        printf("%s Removing message %s\n", RC, key);
+    }
+
+    /* containskey */
+    for (i = 0; i < NDEL; i++)
+    {
+        sprintf(key, "%s%d", stem, nm[i]);
+        rc = pstcontainskey(handle, key);
+        printf("%s Message %s is persisted ?\n", RC, key);
+    }
+
+    /* keys ,ie, list keys added */
+    keys = NULL;
+    nkeys = 0;
+    rc = pstkeys(handle, &keys, &nkeys);
+    printf("%s Found %d messages persisted in %s\n", RC, nkeys, handle);
+    for (i = 0; i < nkeys; i++)
+        printf("%13s\n", keys[i]);
+    freeKeys(keys, nkeys);
+
+    /* close -> the directory is not empty, so it stays in place; pstclose still frees the handle */
+    rc = pstclose(handle);
+    handle = NULL;
+    printf("%s Closing client persistence directory for client %s\n", RC, clientID);
+
+    /* the handle was freed by pstclose, so open the persistence again */
+    rc = pstopen((void**)&handle, clientID, serverURI, perdir);
+    printf("%s Persistence directory for client %s : %s\n", RC, clientID, handle);
+
+    /* clear */
+    rc = pstclear(handle);
+    printf("%s Deleting all persisted messages in %s\n", RC, handle);
+
+    /* keys ,ie, list keys added */
+    keys = NULL;
+    nkeys = 0;
+    rc = pstkeys(handle, &keys, &nkeys);
+    printf("%s Found %d messages persisted in %s\n", RC, nkeys, handle);
+    for (i = 0; i < nkeys; i++)
+        printf("%13s\n", keys[i]);
+    freeKeys(keys, nkeys);
+
+    /* close */
+    rc = pstclose(handle);
+    handle = NULL;
+    printf("%s Closing client persistence directory for client %s\n", RC, clientID);
+
+    return 0;
+}
+#endif
+
+
+#endif /* NO_PERSISTENCE */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.h
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.h b/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.h
new file mode 100644
index 0000000..27fedd6
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.h
@@ -0,0 +1,33 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Ian Craggs - initial API and implementation and/or initial documentation
+ *******************************************************************************/
+
+/** 8.3 filesystem */
+#define MESSAGE_FILENAME_LENGTH 8
+/** Extension of the filename */
+#define MESSAGE_FILENAME_EXTENSION ".msg"
+
+/* prototypes of the functions for the default file system persistence */
+/* In all of them handle is the client directory name set up by pstopen(). */
+
+/* create the directory context/clientID-serverURI and return its name in *handle */
+int pstopen(void** handle, const char* clientID, const char* serverURI, void* context);
+/* remove the client directory if it is empty */
+int pstclose(void* handle);
+/* write the buffers, one after another, to the file for key */
+int pstput(void* handle, char* key, int bufcount, char* buffers[], int buflens[]);
+/* read the file for key into a newly allocated buffer, which the caller must free */
+int pstget(void* handle, char* key, char** buffer, int* buflen);
+/* delete the file for key */
+int pstremove(void* handle, char* key);
+/* list the keys (file names without the extension); the caller must free the list */
+int pstkeys(void* handle, char*** keys, int* nkeys);
+/* delete all the persisted messages in the client directory */
+int pstclear(void* handle);
+/* returns 0 if there is a file for key, MQTTCLIENT_PERSISTENCE_ERROR otherwise */
+int pstcontainskey(void* handle, char* key);
+
+/* create one directory; an existing one (EEXIST) is not an error */
+int pstmkdir(char *pPathname);
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTProtocol.h
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTProtocol.h b/thirdparty/paho.mqtt.c/src/MQTTProtocol.h
new file mode 100644
index 0000000..7478103
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTProtocol.h
@@ -0,0 +1,46 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2014 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Ian Craggs - initial API and implementation and/or initial documentation
+ * Ian Craggs - MQTT 3.1.1 updates
+ *******************************************************************************/
+
+#if !defined(MQTTPROTOCOL_H)
+#define MQTTPROTOCOL_H
+
+#include "LinkedList.h"
+#include "MQTTPacket.h"
+#include "Clients.h"
+
+#define MAX_MSG_ID 65535
+#define MAX_CLIENTID_LEN 65535
+
+/**
+ * Associates a socket with a publication.
+ * NOTE(review): presumably one element of MQTTProtocol.pending_writes, i.e. a
+ * write which has not yet completed - confirm against the code using it.
+ */
+typedef struct
+{
+ int socket;
+ Publications* p;
+} pending_write;
+
+
+/**
+ * State of the protocol module: the publications list, message counters and
+ * the list of pending writes.
+ * NOTE(review): the content type of the two lists is not visible in this
+ * header; the names suggest Publications and pending_write - confirm.
+ */
+typedef struct
+{
+ List publications;
+ unsigned int msgs_received; /* presumably a count of messages received - confirm */
+ unsigned int msgs_sent; /* presumably a count of messages sent - confirm */
+ List pending_writes; /* for qos 0 writes not complete */
+} MQTTProtocol;
+
+
+#include "MQTTProtocolOut.h"
+
+#endif