You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/06/06 14:14:44 UTC

[16/51] [partial] nifi-minifi-cpp git commit: MINIFICPP-512 - upgrade to librdkafka 0.11.4

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_topic.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_topic.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_topic.h
deleted file mode 100644
index 49d1d29..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_topic.h
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012,2013 Magnus Edenhill
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#pragma once
-
-#include "rdlist.h"
-
-extern const char *rd_kafka_topic_state_names[];
-
-
-/* rd_kafka_itopic_t: internal representation of a topic */
-struct rd_kafka_itopic_s {
-	/* Tail-queue linkage; the list head lives outside this header. */
-	TAILQ_ENTRY(rd_kafka_itopic_s) rkt_link;
-
-	/* Reference count used by rd_kafka_topic_keep() and
-	 * rd_kafka_topic_destroy0(). */
-	rd_refcnt_t        rkt_refcnt;
-
-	/* Read/write lock taken through the rd_kafka_topic_{rd,wr}lock()
-	 * macros. NOTE(review): exactly which fields it protects is not
-	 * stated in this header - confirm against the implementation. */
-	rwlock_t           rkt_lock;
-	/* Presumably the topic name as a Kafka protocol string - confirm. */
-	rd_kafkap_str_t   *rkt_topic;
-
-	shptr_rd_kafka_toppar_t  *rkt_ua;  /* unassigned partition */
-	/* Presumably an array of rkt_partition_cnt partition pointers
-	 * - confirm against the implementation. */
-	shptr_rd_kafka_toppar_t **rkt_p;
-	int32_t            rkt_partition_cnt;
-
-        rd_list_t          rkt_desp;              /* Desired partitions
-                                                   * that are not yet seen
-                                                   * in the cluster. */
-
-	rd_ts_t            rkt_ts_metadata; /* Timestamp of last metadata
-					     * update for this topic. */
-
-        mtx_t              rkt_app_lock;    /* Protects rkt_app_* */
-        rd_kafka_topic_t *rkt_app_rkt;      /* A shared topic pointer
-                                             * to be used for callbacks
-                                             * to the application. */
-
-	int               rkt_app_refcnt;   /* Number of active rkt's new()ed
-					     * by application. */
-
-	/* Topic state; rd_kafka_topic_state_names[] (declared above) is
-	 * presumably indexed by this value - confirm. */
-	enum {
-		RD_KAFKA_TOPIC_S_UNKNOWN,   /* No cluster information yet */
-		RD_KAFKA_TOPIC_S_EXISTS,    /* Topic exists in cluster */
-		RD_KAFKA_TOPIC_S_NOTEXISTS, /* Topic is not known in cluster */
-	} rkt_state;
-
-        /* Bitmask of RD_KAFKA_TOPIC_F_.. flags. */
-        int               rkt_flags;
-#define RD_KAFKA_TOPIC_F_LEADER_UNAVAIL   0x1 /* Leader lost/unavailable
-                                               * for at least one partition. */
-
-	/* Presumably the client instance this topic belongs to - confirm. */
-	rd_kafka_t       *rkt_rk;
-
-        shptr_rd_kafka_itopic_t *rkt_shptr_app; /* Application's topic_new() */
-
-	/* Topic configuration, embedded by value. */
-	rd_kafka_topic_conf_t rkt_conf;
-};
-
-/* Convenience wrappers that acquire/release a topic's rkt_lock
- * in read (rd) or write (wr) mode. */
-#define rd_kafka_topic_rdlock(rkt)     rwlock_rdlock(&(rkt)->rkt_lock)
-#define rd_kafka_topic_wrlock(rkt)     rwlock_wrlock(&(rkt)->rkt_lock)
-#define rd_kafka_topic_rdunlock(rkt)   rwlock_rdunlock(&(rkt)->rkt_lock)
-#define rd_kafka_topic_wrunlock(rkt)   rwlock_wrunlock(&(rkt)->rkt_lock)
-
-
-/* Converts a shptr..itopic_t to an internal itopic_t */
-#define rd_kafka_topic_s2i(s_rkt) rd_shared_ptr_obj(s_rkt)
-
-/* Converts an application topic_t (a shptr topic) to an internal itopic_t.
- * The argument is parenthesized so the cast applies to the caller's whole
- * expression rather than to its first operand only. */
-#define rd_kafka_topic_a2i(app_rkt) \
-        rd_kafka_topic_s2i((shptr_rd_kafka_itopic_t *)(app_rkt))
-
-/* Converts a shptr..itopic_t to an app topic_t (they are the same thing) */
-#define rd_kafka_topic_s2a(s_rkt) ((rd_kafka_topic_t *)(s_rkt))
-
-/* Converts an app topic_t to a shptr..itopic_t (they are the same thing) */
-#define rd_kafka_topic_a2s(app_rkt) ((shptr_rd_kafka_itopic_t *)(app_rkt))
-
-
-
-
-
-/**
- * Returns a shared pointer for the topic.
- *
- * The pointer is released with rd_kafka_topic_destroy0().
- * Note that `rkt` appears twice in the expansion, so do not pass an
- * expression with side effects.
- */
-#define rd_kafka_topic_keep(rkt) \
-        rd_shared_ptr_get(rkt, &(rkt)->rkt_refcnt, shptr_rd_kafka_itopic_t)
-
-/* Same, but casts to an app topic_t (same double-evaluation caveat). */
-#define rd_kafka_topic_keep_a(rkt)                                      \
-        ((rd_kafka_topic_t *)rd_shared_ptr_get(rkt, &(rkt)->rkt_refcnt, \
-                                               shptr_rd_kafka_itopic_t))
-
-void rd_kafka_topic_destroy_final (rd_kafka_itopic_t *rkt);
-
-
-/**
- * Frees a shared pointer previously returned by ..topic_keep()
- *
- * The shared pointer, the topic's rkt_refcnt and a destructor expression
- * calling rd_kafka_topic_destroy_final() are handed to rd_shared_ptr_put().
- * NOTE(review): presumably the destructor expression only runs when the
- * last reference is dropped - confirm against rd_shared_ptr_put().
- */
-static RD_INLINE RD_UNUSED void
-rd_kafka_topic_destroy0 (shptr_rd_kafka_itopic_t *s_rkt) {
-        rd_shared_ptr_put(s_rkt,
-                          &rd_kafka_topic_s2i(s_rkt)->rkt_refcnt,
-                          rd_kafka_topic_destroy_final(
-                                  rd_kafka_topic_s2i(s_rkt)));
-}
-
-
-shptr_rd_kafka_itopic_t *rd_kafka_topic_new0 (rd_kafka_t *rk, const char *topic,
-                                              rd_kafka_topic_conf_t *conf,
-                                              int *existing, int do_lock);
-
-shptr_rd_kafka_itopic_t *rd_kafka_topic_find_fl (const char *func, int line,
-                                                 rd_kafka_t *rk,
-                                                 const char *topic,
-                                                 int do_lock);
-shptr_rd_kafka_itopic_t *rd_kafka_topic_find0_fl (const char *func, int line,
-                                                  rd_kafka_t *rk,
-                                                  const rd_kafkap_str_t *topic);
-#define rd_kafka_topic_find(rk,topic,do_lock)                           \
-        rd_kafka_topic_find_fl(__FUNCTION__,__LINE__,rk,topic,do_lock)
-#define rd_kafka_topic_find0(rk,topic)                                  \
-        rd_kafka_topic_find0_fl(__FUNCTION__,__LINE__,rk,topic)
-int rd_kafka_topic_cmp_s_rkt (const void *_a, const void *_b);
-
-void rd_kafka_topic_partitions_remove (rd_kafka_itopic_t *rkt);
-
-void rd_kafka_topic_metadata_none (rd_kafka_itopic_t *rkt);
-
-int rd_kafka_topic_metadata_update2 (rd_kafka_broker_t *rkb,
-                                     const struct rd_kafka_metadata_topic *mdt);
-
-int rd_kafka_topic_scan_all (rd_kafka_t *rk, rd_ts_t now);
-
-
-/* Topic name together with a partition count; created with
- * rd_kafka_topic_info_new() and released with
- * rd_kafka_topic_info_destroy(). */
-typedef struct rd_kafka_topic_info_s {
-	const char *topic;          /**< Allocated along with struct */
-	int   partition_cnt;        /**< Partition count as passed to
-	                             *   rd_kafka_topic_info_new() (presumably
-	                             *   stored unchanged - confirm). */
-} rd_kafka_topic_info_t;
-
-
-int rd_kafka_topic_info_cmp (const void *_a, const void *_b);
-rd_kafka_topic_info_t *rd_kafka_topic_info_new (const char *topic,
-						int partition_cnt);
-void rd_kafka_topic_info_destroy (rd_kafka_topic_info_t *ti);
-
-int rd_kafka_topic_match (rd_kafka_t *rk, const char *pattern,
-			  const char *topic);
-
-int rd_kafka_toppar_leader_update (rd_kafka_toppar_t *rktp,
-                                   int32_t leader_id, rd_kafka_broker_t *rkb);
-
-rd_kafka_resp_err_t
-rd_kafka_topics_leader_query_sync (rd_kafka_t *rk, int all_topics,
-                                   const rd_list_t *topics, int timeout_ms);
-void rd_kafka_topic_leader_query0 (rd_kafka_t *rk, rd_kafka_itopic_t *rkt,
-                                   int do_rk_lock);
-#define rd_kafka_topic_leader_query(rk,rkt) \
-        rd_kafka_topic_leader_query0(rk,rkt,1/*lock*/)
-
-#define rd_kafka_topic_fast_leader_query(rk) \
-        rd_kafka_metadata_fast_leader_query(rk)
-
-void rd_kafka_local_topics_to_list (rd_kafka_t *rk, rd_list_t *topics);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_transport.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_transport.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_transport.c
deleted file mode 100644
index 3a5f93c..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_transport.c
+++ /dev/null
@@ -1,1523 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2015, Magnus Edenhill
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-#ifdef _MSC_VER
-#pragma comment(lib, "ws2_32.lib")
-#endif
-
-#define __need_IOV_MAX
-
-#define _DARWIN_C_SOURCE  /* MSG_DONTWAIT */
-
-#include "rdkafka_int.h"
-#include "rdaddr.h"
-#include "rdkafka_transport.h"
-#include "rdkafka_transport_int.h"
-#include "rdkafka_broker.h"
-
-#include <errno.h>
-
-#if WITH_VALGRIND
-/* OpenSSL relies on uninitialized memory, which Valgrind will whine about.
- * We use in-code Valgrind macros to suppress those warnings. */
-#include <valgrind/memcheck.h>
-#else
-#define VALGRIND_MAKE_MEM_DEFINED(A,B)
-#endif
-
-
-#ifdef _MSC_VER
-#define socket_errno WSAGetLastError()
-#else
-#include <sys/socket.h>
-#define socket_errno errno
-#define SOCKET_ERROR -1
-#endif
-
-/* AIX doesn't have MSG_DONTWAIT */
-#ifndef MSG_DONTWAIT
-#  define MSG_DONTWAIT MSG_NONBLOCK
-#endif
-
-
-#if WITH_SSL
-static mtx_t *rd_kafka_ssl_locks;
-static int    rd_kafka_ssl_locks_cnt;
-#endif
-
-
-
-/**
- * Low-level socket close.
- *
- * If the application configured a closesocket_cb it is invoked with the
- * socket and the configured opaque; otherwise the platform's own
- * close()/closesocket() is called directly.
- */
-static void rd_kafka_transport_close0 (rd_kafka_t *rk, int s) {
-        if (rk->rk_conf.closesocket_cb) {
-                /* Application-provided close hook takes precedence. */
-                rk->rk_conf.closesocket_cb(s, rk->rk_conf.opaque);
-                return;
-        }
-
-#ifdef _MSC_VER
-        closesocket(s);
-#else
-        close(s);
-#endif
-}
-
-/**
- * Close and destroy a transport handle.
- *
- * Tears down, in order: the SSL session (if any), SASL state, the pending
- * receive buffer (if any) and the socket (if open), then frees \p rktrans
- * itself. \p rktrans must not be used after this call.
- */
-void rd_kafka_transport_close (rd_kafka_transport_t *rktrans) {
-#if WITH_SSL
-        if (rktrans->rktrans_ssl != NULL) {
-                SSL_shutdown(rktrans->rktrans_ssl);
-                SSL_free(rktrans->rktrans_ssl);
-        }
-#endif
-
-        rd_kafka_sasl_close(rktrans);
-
-        if (rktrans->rktrans_recv_buf != NULL) {
-                rd_kafka_buf_destroy(rktrans->rktrans_recv_buf);
-        }
-
-        /* -1 denotes "no socket". */
-        if (rktrans->rktrans_s != -1) {
-                rd_kafka_transport_close0(rktrans->rktrans_rkb->rkb_rk,
-                                          rktrans->rktrans_s);
-        }
-
-        rd_free(rktrans);
-}
-
-
-/**
- * Returns a human readable string for socket error code \p err.
- *
- * On MSVC builds the string is formatted into a static RD_TLS buffer by
- * rd_strerror_w32() and is overwritten by the next call; elsewhere the
- * result of rd_strerror() is returned as-is.
- */
-static const char *socket_strerror(int err) {
-#ifndef _MSC_VER
-        return rd_strerror(err);
-#else
-        static RD_TLS char msgbuf[256];
-
-        rd_strerror_w32(err, msgbuf, sizeof(msgbuf));
-        return msgbuf;
-#endif
-}
-
-
-
-
-#ifndef _MSC_VER
-/**
- * @brief sendmsg() abstraction, converting a list of segments to iovecs.
- * @remark should only be called if the number of segments is > 1.
- *
- * @returns the number of bytes sent (the slice's read position is advanced
- *          by the same amount), 0 if the socket would block (EAGAIN),
- *          or -1 on error in which case \p errstr is written.
- */
-ssize_t rd_kafka_transport_socket_sendmsg (rd_kafka_transport_t *rktrans,
-                                           rd_slice_t *slice,
-                                           char *errstr, size_t errstr_size) {
-        struct iovec iov[IOV_MAX];
-        struct msghdr msg = { .msg_iov = iov };
-        size_t iovlen;
-        ssize_t r;
-
-        rd_slice_get_iov(slice, msg.msg_iov, &iovlen, IOV_MAX,
-                         /* FIXME: Measure the effects of this */
-                         rktrans->rktrans_sndbuf_size);
-        msg.msg_iovlen = (typeof(msg.msg_iovlen))iovlen;
-
-#ifdef sun
-        /* See recvmsg() comment. Setting it here to be safe. */
-        socket_errno = EAGAIN;
-#endif
-
-        r = sendmsg(rktrans->rktrans_s, &msg, MSG_DONTWAIT
-#ifdef MSG_NOSIGNAL
-                    | MSG_NOSIGNAL
-#endif
-                );
-
-        if (r == -1) {
-                if (socket_errno == EAGAIN)
-                        return 0;
-                rd_snprintf(errstr, errstr_size, "%s",
-                            rd_strerror(socket_errno));
-                /* Return here: falling through would ask the slice to
-                 * skip (size_t)-1 bytes. */
-                return -1;
-        }
-
-        /* Update buffer read position */
-        rd_slice_read(slice, NULL, (size_t)r);
-
-        return r;
-}
-#endif
-
-
-/**
- * @brief Plain send() abstraction
- *
- * Sends the slice one segment at a time until the slice is exhausted,
- * a segment is only partially sent, or the socket would block.
- *
- * @returns the total number of bytes sent (the slice's read position is
- *          advanced by the same amount), which may be 0 if nothing could
- *          be sent without blocking, or -1 on error with \p errstr written.
- */
-static ssize_t
-rd_kafka_transport_socket_send0 (rd_kafka_transport_t *rktrans,
-                                 rd_slice_t *slice,
-                                 char *errstr, size_t errstr_size) {
-        ssize_t sum = 0;
-        const void *p;
-        size_t rlen;
-
-        while ((rlen = rd_slice_peeker(slice, &p))) {
-                ssize_t r;
-
-                r = send(rktrans->rktrans_s, p,
-#ifdef _MSC_VER
-                         (int)rlen, (int)0
-#else
-                         rlen, 0
-#endif
-                );
-
-#ifdef _MSC_VER
-                if (unlikely(r == SOCKET_ERROR)) {
-                        if (sum > 0 || WSAGetLastError() == WSAEWOULDBLOCK)
-                                return sum;
-                        else {
-                                rd_snprintf(errstr, errstr_size, "%s",
-                                            socket_strerror(WSAGetLastError()));
-                                return -1;
-                        }
-                }
-#else
-                if (unlikely(r <= 0)) {
-                        /* Report what earlier iterations already sent
-                         * (and consumed from the slice), not 0, so the
-                         * return value matches the slice position. */
-                        if (r == 0 || socket_errno == EAGAIN)
-                                return sum;
-                        rd_snprintf(errstr, errstr_size, "%s",
-                                    socket_strerror(socket_errno));
-                        return -1;
-                }
-#endif
-
-                /* Update buffer read position */
-                rd_slice_read(slice, NULL, (size_t)r);
-
-                sum += r;
-
-                /* FIXME: remove this and try again immediately and let
-                 *        the next write() call fail instead? */
-                if ((size_t)r < rlen)
-                        break;
-        }
-
-        return sum;
-}
-
-
-/**
- * @brief Dispatches a socket send of \p slice to the platform's
- *        implementation.
- *
- * On non-MSVC builds this always returns through
- * rd_kafka_transport_socket_sendmsg(); the trailing call to
- * rd_kafka_transport_socket_send0() is then unreachable but still compiled.
- * On MSVC builds only the send0() path exists.
- *
- * @returns whatever the selected implementation returns.
- */
-static ssize_t
-rd_kafka_transport_socket_send (rd_kafka_transport_t *rktrans,
-                                rd_slice_t *slice,
-                                char *errstr, size_t errstr_size) {
-#ifndef _MSC_VER
-        /* FIXME: Use sendmsg() with iovecs if there's more than one segment
-         * remaining, otherwise (or if platform does not have sendmsg)
-         * use plain send(). */
-        return rd_kafka_transport_socket_sendmsg(rktrans, slice,
-                                                 errstr, errstr_size);
-#endif
-        /* Only reached on MSVC builds. */
-        return rd_kafka_transport_socket_send0(rktrans, slice,
-                                               errstr, errstr_size);
-}
-
-
-
-#ifndef _MSC_VER
-/**
- * @brief recvmsg() abstraction: receives directly into the writable
- *        segments of \p rbuf using a single non-blocking recvmsg() call.
- * @remark should only be called if the number of segments is > 1.
- *
- * @returns the number of bytes received (the buffer's write position is
- *          advanced by the same amount), 0 if no data was available
- *          (EAGAIN), or -1 on disconnect or error with \p errstr written.
- */
-static ssize_t
-rd_kafka_transport_socket_recvmsg (rd_kafka_transport_t *rktrans,
-                                   rd_buf_t *rbuf,
-                                   char *errstr, size_t errstr_size) {
-        struct iovec segs[IOV_MAX];
-        struct msghdr mh = { .msg_iov = segs };
-        size_t segcnt;
-        ssize_t nread;
-
-        rd_buf_get_write_iov(rbuf, mh.msg_iov, &segcnt, IOV_MAX,
-                             /* FIXME: Measure the effects of this */
-                             rktrans->rktrans_rcvbuf_size);
-        mh.msg_iovlen = (typeof(mh.msg_iovlen))segcnt;
-
-#ifdef sun
-        /* SunOS doesn't seem to set errno when recvmsg() fails
-         * due to no data and MSG_DONTWAIT is set. */
-        socket_errno = EAGAIN;
-#endif
-        nread = recvmsg(rktrans->rktrans_s, &mh, MSG_DONTWAIT);
-
-        if (unlikely(nread == 0)) {
-                /* Receive 0 after POLLIN event means
-                 * connection closed. */
-                rd_snprintf(errstr, errstr_size, "Disconnected");
-                return -1;
-        }
-
-        if (unlikely(nread == -1)) {
-                if (socket_errno == EAGAIN)
-                        return 0;
-
-                rd_snprintf(errstr, errstr_size, "%s",
-                            rd_strerror(errno));
-                return -1;
-        }
-
-        /* Update buffer write position */
-        rd_buf_write(rbuf, NULL, (size_t)nread);
-
-        return nread;
-}
-#endif
-
-
-/**
- * @brief Plain recv()
- *
- * Receives into the writable segments of \p rbuf one at a time until no
- * writable space remains, a segment is only partially filled, or the
- * socket would block.
- *
- * @returns the total number of bytes received (the buffer's write position
- *          is advanced by the same amount), which may be 0 if no data was
- *          available, or -1 on disconnect or error with \p errstr written.
- */
-static ssize_t
-rd_kafka_transport_socket_recv0 (rd_kafka_transport_t *rktrans,
-                                 rd_buf_t *rbuf,
-                                 char *errstr, size_t errstr_size) {
-        ssize_t sum = 0;
-        void *p;
-        size_t len;
-
-        while ((len = rd_buf_get_writable(rbuf, &p))) {
-                ssize_t r;
-
-                r = recv(rktrans->rktrans_s, p,
-#ifdef _MSC_VER
-                         (int)
-#endif
-                         len,
-                         0);
-
-#ifdef _MSC_VER
-                if (unlikely(r == SOCKET_ERROR)) {
-                        if (WSAGetLastError() == WSAEWOULDBLOCK)
-                                return sum;
-                        rd_snprintf(errstr, errstr_size, "%s",
-                                    socket_strerror(WSAGetLastError()));
-                        return -1;
-                }
-#else
-                if (unlikely(r <= 0)) {
-                        /* Bytes from earlier iterations are already
-                         * committed to rbuf: report them rather than 0. */
-                        if (r == -1 && socket_errno == EAGAIN)
-                                return sum;
-                        else if (r == 0) {
-                                /* Receive 0 after POLLIN event means
-                                 * connection closed. */
-                                rd_snprintf(errstr, errstr_size,
-                                            "Disconnected");
-                                return -1;
-                        } else if (r == -1) {
-                                rd_snprintf(errstr, errstr_size, "%s",
-                                            rd_strerror(errno));
-                                return -1;
-                        }
-                }
-#endif
-
-                /* Update buffer write position */
-                rd_buf_write(rbuf, NULL, (size_t)r);
-
-                sum += r;
-
-                /* FIXME: remove this and try again immediately and let
-                 *        the next recv() call fail instead? */
-                if ((size_t)r < len)
-                        break;
-        }
-        return sum;
-}
-
-
-/**
- * @brief Dispatches a socket receive into \p buf to the platform's
- *        implementation.
- *
- * On non-MSVC builds this always returns through
- * rd_kafka_transport_socket_recvmsg(); the trailing call to
- * rd_kafka_transport_socket_recv0() is then unreachable but still compiled.
- * On MSVC builds only the recv0() path exists.
- *
- * @returns whatever the selected implementation returns.
- */
-static ssize_t
-rd_kafka_transport_socket_recv (rd_kafka_transport_t *rktrans,
-                                rd_buf_t *buf,
-                                char *errstr, size_t errstr_size) {
-#ifndef _MSC_VER
-        /* FIXME: Use recvmsg() with iovecs if there's more than one segment
-         * remaining, otherwise (or if platform does not have recvmsg)
-         * use plain recv(). */
-        return rd_kafka_transport_socket_recvmsg(rktrans, buf,
-                                                 errstr, errstr_size);
-#endif
-        /* Only reached on MSVC builds. */
-        return rd_kafka_transport_socket_recv0(rktrans, buf,
-                                               errstr, errstr_size);
-}
-
-
-
-
-
-/**
- * CONNECT state is failed (errstr!=NULL) or done (TCP is up, SSL is working..).
- * From this state we either hand control back to the broker code,
- * or if authentication is configured we enter the AUTH state.
- *
- * This function simply forwards \p errstr to the broker that owns
- * the transport.
- */
-void rd_kafka_transport_connect_done (rd_kafka_transport_t *rktrans,
-                                      char *errstr) {
-        rd_kafka_broker_connect_done(rktrans->rktrans_rkb, errstr);
-}
-
-
-
-#if WITH_SSL
-
-
-/**
- * Drains the calling thread's OpenSSL error queue.
- *
- * Each queued error is formatted into 'errstr' as
- * "file:line: description: extra-data". Every error except the last one
- * is also logged; the last one is only left in 'errstr' for the caller.
- * If the queue was empty 'errstr' is set to "No error".
- *
- * If 'rkb' is non-NULL broker-specific logging will be used,
- * else it will fall back on global 'rk' logging.
- *
- * Returns 'errstr'.
- */
-static char *rd_kafka_ssl_error (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
-                                 char *errstr, size_t errstr_size) {
-        const char *file, *data;
-        int line, flags;
-        int nerrors = 0;
-
-        for (;;) {
-                char desc[256];
-                unsigned long code;
-
-                code = ERR_get_error_line_data(&file, &line, &data, &flags);
-                if (code == 0)
-                        break;
-
-                if (nerrors > 0) {
-                        /* errstr still holds the previous error:
-                         * log it before it is overwritten. */
-                        if (rkb) {
-                                rd_rkb_log(rkb, LOG_ERR, "SSL", "%s", errstr);
-                        } else {
-                                rd_kafka_log(rk, LOG_ERR, "SSL", "%s", errstr);
-                        }
-                }
-                nerrors++;
-
-                ERR_error_string_n(code, desc, sizeof(desc));
-
-                rd_snprintf(errstr, errstr_size, "%s:%d: %s: %s",
-                            file, line, desc,
-                            (flags & ERR_TXT_STRING) ? data : "");
-        }
-
-        if (nerrors == 0)
-                rd_snprintf(errstr, errstr_size, "No error");
-
-        return errstr;
-}
-
-
-/**
- * OpenSSL locking callback: locks or unlocks lock number 'i' in
- * rd_kafka_ssl_locks depending on whether CRYPTO_LOCK is set in 'mode'.
- * 'file' and 'line' are unused.
- */
-static void rd_kafka_transport_ssl_lock_cb (int mode, int i,
-                                            const char *file, int line) {
-        mtx_t *lock = &rd_kafka_ssl_locks[i];
-
-        if (!(mode & CRYPTO_LOCK)) {
-                mtx_unlock(lock);
-                return;
-        }
-
-        mtx_lock(lock);
-}
-
-/**
- * OpenSSL thread id callback: returns an identifier for the calling thread.
- */
-static unsigned long rd_kafka_transport_ssl_threadid_cb (void) {
-        unsigned long tid;
-
-#ifndef _MSC_VER
-        tid = (unsigned long)(intptr_t)thrd_current();
-#else
-        /* Windows makes a distinction between thread handle
-         * and thread id, which means we can't use the
-         * thrd_current() API that returns the handle. */
-        tid = (unsigned long)GetCurrentThreadId();
-#endif
-
-        return tid;
-}
-
-
-/**
- * Global OpenSSL cleanup.
- *
- * Unregisters the id/locking callbacks, then destroys and frees the lock
- * array allocated by rd_kafka_transport_ssl_init(). The globals are reset
- * afterwards so that no dangling pointer or stale count is left behind.
- */
-void rd_kafka_transport_ssl_term (void) {
-        int i;
-
-        CRYPTO_set_id_callback(NULL);
-        CRYPTO_set_locking_callback(NULL);
-        CRYPTO_cleanup_all_ex_data();
-
-        for (i = 0 ; i < rd_kafka_ssl_locks_cnt ; i++)
-                mtx_destroy(&rd_kafka_ssl_locks[i]);
-
-        rd_free(rd_kafka_ssl_locks);
-        rd_kafka_ssl_locks     = NULL;
-        rd_kafka_ssl_locks_cnt = 0;
-}
-
-
-/**
- * Global OpenSSL init.
- *
- * Allocates and initializes one mutex per OpenSSL lock (CRYPTO_num_locks()),
- * registers the thread id and locking callbacks, and then initializes the
- * OpenSSL library (error strings, library init, algorithms).
- */
-void rd_kafka_transport_ssl_init (void) {
-        int idx = 0;
-
-        rd_kafka_ssl_locks_cnt = CRYPTO_num_locks();
-        rd_kafka_ssl_locks = rd_malloc(rd_kafka_ssl_locks_cnt *
-                                       sizeof(*rd_kafka_ssl_locks));
-
-        while (idx < rd_kafka_ssl_locks_cnt) {
-                mtx_init(&rd_kafka_ssl_locks[idx], mtx_plain);
-                idx++;
-        }
-
-        /* The locks must exist before OpenSSL is told to use them. */
-        CRYPTO_set_id_callback(rd_kafka_transport_ssl_threadid_cb);
-        CRYPTO_set_locking_callback(rd_kafka_transport_ssl_lock_cb);
-
-        SSL_load_error_strings();
-        SSL_library_init();
-        OpenSSL_add_all_algorithms();
-}
-
-
-/**
- * Set transport IO event polling based on SSL error.
- *
- * 'ret' is the return value of the SSL call that just completed.
- * For WANT_READ / WANT_WRITE / WANT_CONNECT the matching poll event is
- * registered on the transport and 0 is returned. All other outcomes are
- * treated as permanent: 'errstr' is written and -1 is returned.
- *
- * Returns -1 on permanent errors.
- *
- * Locality: broker thread
- */
-static RD_INLINE int
-rd_kafka_transport_ssl_io_update (rd_kafka_transport_t *rktrans, int ret,
-                                  char *errstr, size_t errstr_size) {
-        int serr = SSL_get_error(rktrans->rktrans_ssl, ret);
-        unsigned long qerr;
-
-        switch (serr)
-        {
-        case SSL_ERROR_WANT_READ:
-                rd_kafka_transport_poll_set(rktrans, POLLIN);
-                break;
-
-        case SSL_ERROR_WANT_WRITE:
-        case SSL_ERROR_WANT_CONNECT:
-                rd_kafka_transport_poll_set(rktrans, POLLOUT);
-                break;
-
-        case SSL_ERROR_SYSCALL:
-                /* Consult the OpenSSL error queue: calling SSL_get_error()
-                 * again would just return SSL_ERROR_SYSCALL (non-zero)
-                 * and make the errno-based branch below unreachable. */
-                qerr = ERR_get_error();
-                if (!qerr) {
-                        /* Empty queue: ret == 0 means an unexpected EOF,
-                         * otherwise errno describes the I/O error. */
-                        if (ret == 0)
-                                errno = ECONNRESET;
-                        rd_snprintf(errstr, errstr_size,
-                                    "SSL syscall error: %s", rd_strerror(errno));
-                } else
-                        rd_snprintf(errstr, errstr_size,
-                                    "SSL syscall error number: %lu: %s", qerr,
-                                    rd_strerror(errno));
-                return -1;
-
-        case SSL_ERROR_ZERO_RETURN:
-                rd_snprintf(errstr, errstr_size, "Disconnected");
-                return -1;
-
-        default:
-                rd_kafka_ssl_error(NULL, rktrans->rktrans_rkb,
-                                   errstr, errstr_size);
-                return -1;
-        }
-
-        return 0;
-}
-
-/**
- * Writes the contents of 'slice' to the SSL connection, one segment at a
- * time, advancing the slice's read position past what was written.
- *
- * Stops when the slice is exhausted, a segment was only partially written,
- * or SSL_write() could not make progress.
- *
- * Returns the number of bytes written so far, or -1 on permanent error
- * in which case 'errstr' is written.
- */
-static ssize_t
-rd_kafka_transport_ssl_send (rd_kafka_transport_t *rktrans,
-                             rd_slice_t *slice,
-                             char *errstr, size_t errstr_size) {
-        ssize_t total = 0;
-        const void *seg;
-        size_t seglen;
-
-        while ((seglen = rd_slice_peeker(slice, &seg)) != 0) {
-                int w = SSL_write(rktrans->rktrans_ssl, seg, (int)seglen);
-
-                if (unlikely(w <= 0)) {
-                        int st = rd_kafka_transport_ssl_io_update(
-                                rktrans, w, errstr, errstr_size);
-
-                        return st == -1 ? -1 : total;
-                }
-
-                /* Update buffer read position */
-                rd_slice_read(slice, NULL, (size_t)w);
-                total += w;
-
-                /* FIXME: remove this and try again immediately and let
-                 *        the next SSL_write() call fail instead? */
-                if ((size_t)w < seglen)
-                        break;
-        }
-
-        return total;
-}
-
-/**
- * Reads from the SSL connection into the writable segments of 'rbuf',
- * one segment at a time, advancing the buffer's write position past what
- * was read.
- *
- * Stops when no writable space remains, a segment was only partially
- * filled, or SSL_read() could not make progress.
- *
- * Returns the number of bytes read so far, or -1 on permanent error
- * in which case 'errstr' is written.
- */
-static ssize_t
-rd_kafka_transport_ssl_recv (rd_kafka_transport_t *rktrans,
-                             rd_buf_t *rbuf, char *errstr, size_t errstr_size) {
-        ssize_t total = 0;
-        void *dst;
-        size_t room;
-
-        while ((room = rd_buf_get_writable(rbuf, &dst)) != 0) {
-                int got = SSL_read(rktrans->rktrans_ssl, dst, (int)room);
-
-                if (unlikely(got <= 0)) {
-                        int st = rd_kafka_transport_ssl_io_update(
-                                rktrans, got, errstr, errstr_size);
-
-                        return st == -1 ? -1 : total;
-                }
-
-                VALGRIND_MAKE_MEM_DEFINED(dst, got);
-
-                /* Update buffer write position */
-                rd_buf_write(rbuf, NULL, (size_t)got);
-                total += got;
-
-                /* FIXME: remove this and try again immediately and let
-                 *        the next SSL_read() call fail instead? */
-                if ((size_t)got < room)
-                        break;
-        }
-
-        return total;
-}
-
-
-/**
- * OpenSSL password query callback
- *
- * Copies the configured private key password (ssl.key.password) into
- * 'buf', which holds at most 'size' bytes. 'userdata' is the rd_kafka_t
- * handle. 'rwflag' is unused.
- *
- * Returns the number of password bytes placed in 'buf' (never more than
- * 'size'), or -1 if no password is configured.
- *
- * Locality: application thread
- */
-static int rd_kafka_transport_ssl_passwd_cb (char *buf, int size, int rwflag,
-                                             void *userdata) {
-        rd_kafka_t *rk = userdata;
-        int pwlen;
-
-        rd_kafka_dbg(rk, SECURITY, "SSLPASSWD",
-                     "Private key file \"%s\" requires password",
-                     rk->rk_conf.ssl.key_location);
-
-        if (!rk->rk_conf.ssl.key_password) {
-                rd_kafka_log(rk, LOG_WARNING, "SSLPASSWD",
-                             "Private key file \"%s\" requires password but "
-                             "no password configured (ssl.key.password)",
-                             rk->rk_conf.ssl.key_location);
-                return -1;
-        }
-
-        pwlen = (int) strlen(rk->rk_conf.ssl.key_password);
-        /* Never report more bytes than were actually copied into buf,
-         * or the caller would read past the end of its buffer. */
-        if (pwlen > size)
-                pwlen = size;
-        memcpy(buf, rk->rk_conf.ssl.key_password, pwlen);
-
-        return pwlen;
-}
-
-/**
- * Set up SSL for a newly connected connection
- *
- * Creates the SSL object from the client's SSL context, attaches it to the
- * transport's socket, optionally sets the SNI hostname and then starts the
- * handshake with SSL_connect().
- *
- * Returns -1 on failure (errstr is written), else 0. A return of 0 does not
- * necessarily mean the handshake is complete: if SSL_connect() did not
- * finish, the IO events it is waiting for have been registered through
- * rd_kafka_transport_ssl_io_update().
- */
-static int rd_kafka_transport_ssl_connect (rd_kafka_broker_t *rkb,
-					   rd_kafka_transport_t *rktrans,
-					   char *errstr, size_t errstr_size) {
-	int r;
-	char name[RD_KAFKA_NODENAME_SIZE];
-	char *t;
-
-	rktrans->rktrans_ssl = SSL_new(rkb->rkb_rk->rk_conf.ssl.ctx);
-	if (!rktrans->rktrans_ssl)
-		goto fail;
-
-	if (!SSL_set_fd(rktrans->rktrans_ssl, rktrans->rktrans_s))
-		goto fail;
-
-#if (OPENSSL_VERSION_NUMBER >= 0x0090806fL) && !defined(OPENSSL_NO_TLSEXT)
-	/* If non-numerical hostname, send it for SNI */
-	rd_snprintf(name, sizeof(name), "%s", rkb->rkb_nodename);
-	/* Cut the name at its last ':' (presumably the ":port" suffix). */
-	if ((t = strrchr(name, ':')))
-		*t = '\0';
-	/* SNI is skipped when the remaining name consists solely of
-	 * characters found in IPv6 (and still contains a ':') or IPv4
-	 * address literals. */
-	if (!(/*ipv6*/(strchr(name, ':') &&
-		       strspn(name, "0123456789abcdefABCDEF:.[]%") == strlen(name)) ||
-	      /*ipv4*/strspn(name, "0123456789.") == strlen(name)) &&
-	    !SSL_set_tlsext_host_name(rktrans->rktrans_ssl, name))
-		goto fail;
-#endif
-
-	r = SSL_connect(rktrans->rktrans_ssl);
-	if (r == 1) {
-		/* Connected, highly unlikely since this is a
-		 * non-blocking operation. */
-		rd_kafka_transport_connect_done(rktrans, NULL);
-		return 0;
-	}
-
-		
-	/* Handshake not finished: register the wanted IO events,
-	 * or fail on a permanent error. */
-	if (rd_kafka_transport_ssl_io_update(rktrans, r,
-					     errstr, errstr_size) == -1)
-		return -1;
-	
-	return 0;
-
- fail:
-	/* Set-up failure: report the OpenSSL error queue through errstr. */
-	rd_kafka_ssl_error(NULL, rkb, errstr, errstr_size);
-	return -1;
-}
-
-
-/**
- * SSL IO event handler.
- *
- * Only POLLOUT is acted upon: a zero-length SSL_write() is issued and its
- * result is fed to rd_kafka_transport_ssl_io_update(). A permanent error
- * fails the broker.
- *
- * Returns 0 on success (or if there was nothing to do), -1 on permanent
- * error.
- */
-static RD_UNUSED int
-rd_kafka_transport_ssl_io_event (rd_kafka_transport_t *rktrans, int events) {
-        char errstr[512];
-        int r;
-
-        if (!(events & POLLOUT))
-                return 0;
-
-        r = SSL_write(rktrans->rktrans_ssl, NULL, 0);
-        if (rd_kafka_transport_ssl_io_update(rktrans, r,
-                                             errstr, sizeof(errstr)) != -1)
-                return 0;
-
-        /* Permanent error */
-        rd_kafka_broker_fail(rktrans->rktrans_rkb, LOG_ERR,
-                             RD_KAFKA_RESP_ERR__TRANSPORT,
-                             "%s", errstr);
-        return -1;
-}
-
-
-/**
- * Verify SSL handshake was valid.
- *
- * Checks that the peer presented a certificate and that OpenSSL's
- * verification result for it is X509_V_OK. On failure the broker is
- * failed with RD_KAFKA_RESP_ERR__SSL.
- *
- * Returns 0 if the certificate was verified, else -1.
- */
-static int rd_kafka_transport_ssl_verify (rd_kafka_transport_t *rktrans) {
-        long int rl;
-        X509 *cert;
-
-        cert = SSL_get_peer_certificate(rktrans->rktrans_ssl);
-        if (!cert) {
-                rd_kafka_broker_fail(rktrans->rktrans_rkb, LOG_ERR,
-                                     RD_KAFKA_RESP_ERR__SSL,
-                                     "Broker did not provide a certificate");
-                return -1;
-        }
-
-        /* Only the certificate's presence matters here: release the
-         * reference after the NULL check, so the pointer value is never
-         * inspected once it has been freed. */
-        X509_free(cert);
-
-        if ((rl = SSL_get_verify_result(rktrans->rktrans_ssl)) != X509_V_OK) {
-                rd_kafka_broker_fail(rktrans->rktrans_rkb, LOG_ERR,
-                                     RD_KAFKA_RESP_ERR__SSL,
-                                     "Failed to verify broker certificate: %s",
-                                     X509_verify_cert_error_string(rl));
-                return -1;
-        }
-
-        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SSLVERIFY",
-                   "Broker SSL certificate verified");
-        return 0;
-}
-
-/**
- * SSL handshake handling.
- * Call repeatedly (based on IO events) until handshake is done.
- *
- * When the handshake completes the peer certificate is verified and the
- * connection is reported as done. If the handshake needs more IO the
- * wanted poll events are registered and 0 is returned. A permanent
- * handshake error fails the broker.
- *
- * Returns -1 on error, 0 if handshake is still in progress, or 1 on completion.
- */
-static int rd_kafka_transport_ssl_handshake (rd_kafka_transport_t *rktrans) {
-        rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
-        char errstr[512];
-        int r = SSL_do_handshake(rktrans->rktrans_ssl);
-
-        if (r == 1) {
-                /* SSL handshake done. Verify. */
-                if (rd_kafka_transport_ssl_verify(rktrans) == -1)
-                        return -1;
-
-                rd_kafka_transport_connect_done(rktrans, NULL);
-                return 1;
-        }
-
-        if (rd_kafka_transport_ssl_io_update(rktrans, r,
-                                             errstr, sizeof(errstr)) != -1)
-                return 0; /* Still in progress */
-
-        rd_kafka_broker_fail(rkb, LOG_ERR, RD_KAFKA_RESP_ERR__SSL,
-                             "SSL handshake failed: %s%s", errstr,
-                             strstr(errstr, "unexpected message") ?
-                             ": client authentication might be "
-                             "required (see broker log)" : "");
-        return -1;
-}
-
-
-/**
- * Once per rd_kafka_t handle cleanup of OpenSSL:
- * frees the handle's SSL context and clears the pointer to it.
- *
- * Locality: any thread
- *
- * NOTE: rd_kafka_wrlock() MUST be held
- */
-void rd_kafka_transport_ssl_ctx_term (rd_kafka_t *rk) {
-        SSL_CTX *ctx = rk->rk_conf.ssl.ctx;
-
-        SSL_CTX_free(ctx);
-        rk->rk_conf.ssl.ctx = NULL;
-}
-
-/**
- * Once per rd_kafka_t handle initialization of OpenSSL
- *
- * Creates a client SSL_CTX, applies the ssl.* settings found in
- * rk->rk_conf.ssl (cipher suites, CA location, CRL, client certificate
- * and private key) and stores the context in rk->rk_conf.ssl.ctx.
- *
- * Returns 0 on success, or -1 on failure in which case \p errstr holds
- * a prefix naming the failing step; the remainder of the buffer is handed
- * to rd_kafka_ssl_error() (presumably to append the OpenSSL error text)
- * and the partially set up context is freed.
- *
- * Locality: application thread
- *
- * NOTE: rd_kafka_wrlock() MUST be held
- */
-int rd_kafka_transport_ssl_ctx_init (rd_kafka_t *rk,
-				     char *errstr, size_t errstr_size) {
-	int r;
-	SSL_CTX *ctx;
-
-        /* Start out with an empty error string so that the strlen()
-         * in the fail: path has a terminated string to measure. */
-        if (errstr_size > 0)
-                errstr[0] = '\0';
-
-	ctx = SSL_CTX_new(SSLv23_client_method());
-        if (!ctx) {
-                rd_snprintf(errstr, errstr_size,
-                            "SSLv23_client_method() failed: ");
-                goto fail;
-        }
-
-#ifdef SSL_OP_NO_SSLv3
-	/* Disable SSLv3 (unsafe) */
-	SSL_CTX_set_options(ctx, SSL_OP_NO_SSLv3);
-#endif
-
-	/* Key file password callback */
-	SSL_CTX_set_default_passwd_cb(ctx, rd_kafka_transport_ssl_passwd_cb);
-	SSL_CTX_set_default_passwd_cb_userdata(ctx, rk);
-
-	/* Ciphers */
-	if (rk->rk_conf.ssl.cipher_suites) {
-		rd_kafka_dbg(rk, SECURITY, "SSL",
-			     "Setting cipher list: %s",
-			     rk->rk_conf.ssl.cipher_suites);
-		if (!SSL_CTX_set_cipher_list(ctx,
-					     rk->rk_conf.ssl.cipher_suites)) {
-                        /* Set a string that will prefix
-                         * the OpenSSL error message (which is lousy)
-                         * to make it more meaningful. */
-                        rd_snprintf(errstr, errstr_size,
-                                    "ssl.cipher.suites failed: ");
-                        goto fail;
-		}
-	}
-
-
-	if (rk->rk_conf.ssl.ca_location) {
-		/* CA certificate location, either file or directory. */
-		int is_dir = rd_kafka_path_is_dir(rk->rk_conf.ssl.ca_location);
-
-		rd_kafka_dbg(rk, SECURITY, "SSL",
-			     "Loading CA certificate(s) from %s %s",
-			     is_dir ? "directory":"file",
-			     rk->rk_conf.ssl.ca_location);
-		
-		/* The location is passed as either the file argument or
-		 * the directory argument, the other one is NULL. */
-		r = SSL_CTX_load_verify_locations(ctx,
-						  !is_dir ?
-						  rk->rk_conf.ssl.
-						  ca_location : NULL,
-						  is_dir ?
-						  rk->rk_conf.ssl.
-						  ca_location : NULL);
-
-                if (r != 1) {
-                        rd_snprintf(errstr, errstr_size,
-                                    "ssl.ca.location failed: ");
-                        goto fail;
-                }
-        } else {
-                /* Use default CA certificate paths: ignore failures. */
-                r = SSL_CTX_set_default_verify_paths(ctx);
-                if (r != 1)
-                        rd_kafka_dbg(rk, SECURITY, "SSL",
-                                     "SSL_CTX_set_default_verify_paths() "
-                                     "failed: ignoring");
-        }
-
-	if (rk->rk_conf.ssl.crl_location) {
-		rd_kafka_dbg(rk, SECURITY, "SSL",
-			     "Loading CRL from file %s",
-			     rk->rk_conf.ssl.crl_location);
-
-		/* The CRL file is loaded through the same call that is
-		 * used for the CA file above. */
-		r = SSL_CTX_load_verify_locations(ctx,
-						  rk->rk_conf.ssl.crl_location,
-						  NULL);
-
-                if (r != 1) {
-                        rd_snprintf(errstr, errstr_size,
-                                    "ssl.crl.location failed: ");
-                        goto fail;
-                }
-
-
-		rd_kafka_dbg(rk, SECURITY, "SSL",
-			     "Enabling CRL checks");
-
-		X509_STORE_set_flags(SSL_CTX_get_cert_store(ctx),
-				     X509_V_FLAG_CRL_CHECK);
-	}
-
-	if (rk->rk_conf.ssl.cert_location) {
-		rd_kafka_dbg(rk, SECURITY, "SSL",
-			     "Loading certificate from file %s",
-			     rk->rk_conf.ssl.cert_location);
-
-		r = SSL_CTX_use_certificate_chain_file(ctx,
-						       rk->rk_conf.ssl.cert_location);
-
-                if (r != 1) {
-                        rd_snprintf(errstr, errstr_size,
-                                    "ssl.certificate.location failed: ");
-                        goto fail;
-                }
-	}
-
-	if (rk->rk_conf.ssl.key_location) {
-		rd_kafka_dbg(rk, SECURITY, "SSL",
-			     "Loading private key file from %s",
-			     rk->rk_conf.ssl.key_location);
-
-		/* The private key is read as a PEM file. */
-		r = SSL_CTX_use_PrivateKey_file(ctx,
-						rk->rk_conf.ssl.key_location,
-						SSL_FILETYPE_PEM);
-                if (r != 1) {
-                        rd_snprintf(errstr, errstr_size,
-                                    "ssl.key.location failed: ");
-                        goto fail;
-                }
-	}
-
-
-	SSL_CTX_set_mode(ctx, SSL_MODE_ENABLE_PARTIAL_WRITE);
-
-	/* Success: the handle now holds the context. */
-	rk->rk_conf.ssl.ctx = ctx;
-	return 0;
-
- fail:
-        /* errstr holds the step-specific prefix (possibly empty):
-         * pass the space following it to rd_kafka_ssl_error(). */
-        r = (int)strlen(errstr);
-        rd_kafka_ssl_error(rk, NULL, errstr+r,
-                           (int)errstr_size > r ? (int)errstr_size - r : 0);
-	SSL_CTX_free(ctx);
-
-	return -1;
-}
-
-
-#endif /* WITH_SSL */
-
-
-/**
- * Send the contents of \p slice on the transport, using the SSL send path
- * when the transport has an SSL session (WITH_SSL builds only) and the
- * plain socket send path otherwise.
- *
- * The return value and \p errstr are those of the selected send function.
- */
-ssize_t
-rd_kafka_transport_send (rd_kafka_transport_t *rktrans,
-                         rd_slice_t *slice, char *errstr, size_t errstr_size) {
-#if WITH_SSL
-        if (rktrans->rktrans_ssl) {
-                return rd_kafka_transport_ssl_send(rktrans, slice,
-                                                   errstr, errstr_size);
-        }
-#endif
-
-        return rd_kafka_transport_socket_send(rktrans, slice,
-                                              errstr, errstr_size);
-}
-
-
-/**
- * Receive data from the transport into \p rbuf, using the SSL receive path
- * when the transport has an SSL session (WITH_SSL builds only) and the
- * plain socket receive path otherwise.
- *
- * The return value and \p errstr are those of the selected receive function.
- */
-ssize_t
-rd_kafka_transport_recv (rd_kafka_transport_t *rktrans, rd_buf_t *rbuf,
-                         char *errstr, size_t errstr_size) {
-#if WITH_SSL
-        if (rktrans->rktrans_ssl) {
-                return rd_kafka_transport_ssl_recv(rktrans, rbuf,
-                                                   errstr, errstr_size);
-        }
-#endif
-
-        return rd_kafka_transport_socket_recv(rktrans, rbuf,
-                                              errstr, errstr_size);
-}
-
-
-
-
-/**
- * Length framed receive handling.
- * Currently only supports the following framing:
- *     [int32_t:big_endian_length_of_payload][payload]
- *
- * The in-progress receive buffer is kept in rktrans->rktrans_recv_buf
- * between calls and is handed over to the caller on completion.
- *
- * To be used on POLLIN event, will return:
- *   -1: on fatal error (errstr will be updated, *rkbufp remains unset)
- *    0: still waiting for data (*rkbufp remains unset)
- *    1: data complete, (buffer returned in *rkbufp)
- */
-int rd_kafka_transport_framed_recv (rd_kafka_transport_t *rktrans,
-                                    rd_kafka_buf_t **rkbufp,
-                                    char *errstr, size_t errstr_size) {
-	rd_kafka_buf_t *rkbuf = rktrans->rktrans_recv_buf;
-	ssize_t r;
-	/* Referenced by the rd_kafka_buf_read_*() parsing macros. */
-	const int log_decode_errors = LOG_ERR;
-
-	/* States:
-	 *   !rktrans_recv_buf: initial state; set up buf to receive header.
-	 *    rkbuf_totlen == 0:   awaiting header
-	 *    rkbuf_totlen > 0:    awaiting payload
-	 */
-
-	if (!rkbuf) {
-                rkbuf = rd_kafka_buf_new(1, 4/*length field's length*/);
-                /* Set up buffer reader for the length field */
-                rd_buf_write_ensure(&rkbuf->rkbuf_buf, 4, 4);
-		rktrans->rktrans_recv_buf = rkbuf;
-	}
-
-
-        r = rd_kafka_transport_recv(rktrans, &rkbuf->rkbuf_buf,
-                                    errstr, errstr_size);
-	if (r == 0)
-		return 0;
-	else if (r == -1)
-		return -1;
-
-	if (rkbuf->rkbuf_totlen == 0) {
-		/* Frame length not known yet. */
-		int32_t frame_len;
-
-		if (rd_buf_write_pos(&rkbuf->rkbuf_buf) < sizeof(frame_len)) {
-			/* Wait for entire frame header. */
-			return 0;
-		}
-
-                /* Initialize reader */
-                rd_slice_init(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf, 0, 4);
-
-		/* Read header: payload length.
-		 * Jumps to err_parse on failure. */
-		rd_kafka_buf_read_i32(rkbuf, &frame_len);
-
-		if (frame_len < 0 ||
-		    frame_len > rktrans->rktrans_rkb->
-		    rkb_rk->rk_conf.recv_max_msg_size) {
-			rd_snprintf(errstr, errstr_size,
-				    "Invalid frame size %"PRId32, frame_len);
-			return -1;
-		}
-
-		rkbuf->rkbuf_totlen = 4 + frame_len;
-		if (frame_len == 0) {
-			/* Payload is empty, we're done. */
-			rktrans->rktrans_recv_buf = NULL;
-			*rkbufp = rkbuf;
-			return 1;
-		}
-
-		/* Allocate memory to hold entire frame payload in contiguous
-		 * memory. */
-                rd_buf_write_ensure_contig(&rkbuf->rkbuf_buf, frame_len);
-
-                /* Try reading directly, there is probably more data available*/
-                return rd_kafka_transport_framed_recv(rktrans, rkbufp,
-                                                      errstr, errstr_size);
-	}
-
-	if (rd_buf_write_pos(&rkbuf->rkbuf_buf) == rkbuf->rkbuf_totlen) {
-		/* Payload is complete. */
-		rktrans->rktrans_recv_buf = NULL;
-		*rkbufp = rkbuf;
-		return 1;
-	}
-
-	/* Wait for more data */
-	return 0;
-
- err_parse:
-	/* Format the error while the buffer is still alive: the error code
-	 * lives in the buffer itself. */
-        rd_snprintf(errstr, errstr_size, "Frame header parsing failed: %s",
-                    rd_kafka_err2str(rkbuf->rkbuf_err));
-
-	/* The buffer is still referenced by the transport at this point:
-	 * clear that reference before destroying the buffer so that it
-	 * is neither reused nor destroyed a second time later on. */
-	if (rktrans->rktrans_recv_buf == rkbuf)
-		rktrans->rktrans_recv_buf = NULL;
-	rd_kafka_buf_destroy(rkbuf);
-
-	return -1;
-}
-
-
-/**
- * TCP connection established.
- * Set up socket options, SSL, etc.
- *
- * Applies the configured send/receive buffer sizes, caches the effective
- * buffer sizes on the transport, optionally disables Nagle and then
- * either starts the SSL connection setup (SSL and SASL_SSL protocols) or
- * reports the connection as done.
- *
- * Locality: broker thread
- */
-static void rd_kafka_transport_connected (rd_kafka_transport_t *rktrans) {
-	rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
-        socklen_t slen;
-        int bufsz; /* SO_RCVBUF and SO_SNDBUF are int-sized options */
-
-        rd_rkb_dbg(rkb, BROKER, "CONNECT",
-                   "Connected to %s",
-                   rd_sockaddr2str(rkb->rkb_addr_last,
-                                   RD_SOCKADDR2STR_F_PORT |
-                                   RD_SOCKADDR2STR_F_FAMILY));
-
-	/* Set socket send & receive buffer sizes if configured */
-	if (rkb->rkb_rk->rk_conf.socket_sndbuf_size != 0) {
-		if (setsockopt(rktrans->rktrans_s, SOL_SOCKET, SO_SNDBUF,
-			       (void *)&rkb->rkb_rk->rk_conf.socket_sndbuf_size,
-			       sizeof(rkb->rkb_rk->rk_conf.
-				      socket_sndbuf_size)) == SOCKET_ERROR)
-			rd_rkb_log(rkb, LOG_WARNING, "SNDBUF",
-				   "Failed to set socket send "
-				   "buffer size to %i: %s",
-				   rkb->rkb_rk->rk_conf.socket_sndbuf_size,
-				   socket_strerror(socket_errno));
-	}
-
-	if (rkb->rkb_rk->rk_conf.socket_rcvbuf_size != 0) {
-		if (setsockopt(rktrans->rktrans_s, SOL_SOCKET, SO_RCVBUF,
-			       (void *)&rkb->rkb_rk->rk_conf.socket_rcvbuf_size,
-			       sizeof(rkb->rkb_rk->rk_conf.
-				      socket_rcvbuf_size)) == SOCKET_ERROR)
-			rd_rkb_log(rkb, LOG_WARNING, "RCVBUF",
-				   "Failed to set socket receive "
-				   "buffer size to %i: %s",
-				   rkb->rkb_rk->rk_conf.socket_rcvbuf_size,
-				   socket_strerror(socket_errno));
-	}
-
-        /* Get send and receive buffer sizes to allow limiting
-         * the total number of bytes passed with iovecs to sendmsg()
-         * and recvmsg().
-         * The option value is read into an int and then widened to the
-         * size_t field: reading straight into the size_t would leave
-         * the value in the wrong half of the field on big-endian
-         * platforms where size_t is wider than int. */
-        slen = sizeof(bufsz);
-        if (getsockopt(rktrans->rktrans_s, SOL_SOCKET, SO_RCVBUF,
-                       (void *)&bufsz, &slen) == SOCKET_ERROR) {
-                rd_rkb_log(rkb, LOG_WARNING, "RCVBUF",
-                           "Failed to get socket receive "
-                           "buffer size: %s: assuming 1MB",
-                           socket_strerror(socket_errno));
-                rktrans->rktrans_rcvbuf_size = 1024*1024;
-        } else if (bufsz < 1024 * 64)
-                rktrans->rktrans_rcvbuf_size = 1024*64; /* Use at least 64KB */
-        else
-                rktrans->rktrans_rcvbuf_size = (size_t)bufsz;
-
-        slen = sizeof(bufsz);
-        if (getsockopt(rktrans->rktrans_s, SOL_SOCKET, SO_SNDBUF,
-                       (void *)&bufsz, &slen) == SOCKET_ERROR) {
-                rd_rkb_log(rkb, LOG_WARNING, "SNDBUF",
-                           "Failed to get socket send "
-                           "buffer size: %s: assuming 1MB",
-                           socket_strerror(socket_errno));
-                rktrans->rktrans_sndbuf_size = 1024*1024;
-        } else if (bufsz < 1024 * 64)
-                rktrans->rktrans_sndbuf_size = 1024*64; /* Use at least 64KB */
-        else
-                rktrans->rktrans_sndbuf_size = (size_t)bufsz;
-
-
-#ifdef TCP_NODELAY
-	if (rkb->rkb_rk->rk_conf.socket_nagle_disable) {
-		int one = 1;
-		if (setsockopt(rktrans->rktrans_s, IPPROTO_TCP, TCP_NODELAY,
-			       (void *)&one, sizeof(one)) == SOCKET_ERROR)
-			rd_rkb_log(rkb, LOG_WARNING, "NAGLE",
-				   "Failed to disable Nagle (TCP_NODELAY) "
-				   "on socket %d: %s",
-				   rktrans->rktrans_s,
-				   socket_strerror(socket_errno));
-	}
-#endif
-
-
-#if WITH_SSL
-	if (rkb->rkb_proto == RD_KAFKA_PROTO_SSL ||
-	    rkb->rkb_proto == RD_KAFKA_PROTO_SASL_SSL) {
-		char errstr[512];
-
-		/* Set up SSL connection.
-		 * This is also an asynchronous operation so dont
-		 * propagate to broker_connect_done() just yet. */
-		if (rd_kafka_transport_ssl_connect(rkb, rktrans,
-						   errstr,
-						   sizeof(errstr)) == -1) {
-			rd_kafka_transport_connect_done(rktrans, errstr);
-			return;
-		}
-		return;
-	}
-#endif
-
-	/* Propagate connect success */
-	rd_kafka_transport_connect_done(rktrans, NULL);
-}
-
-
-
-/**
- * @brief Read the kernel SO_ERROR value of the given transport's socket
- *        into \p errp.
- * @returns 0 if getsockopt() was successful (and \p errp can be trusted),
- * else -1 in which case \p errp 's value is undefined.
- */
-static int rd_kafka_transport_get_socket_error (rd_kafka_transport_t *rktrans,
-						int *errp) {
-	socklen_t optlen = sizeof(*errp);
-	int rc;
-
-	rc = getsockopt(rktrans->rktrans_s, SOL_SOCKET, SO_ERROR,
-			(void *)errp, &optlen);
-	if (rc != -1)
-		return 0;
-
-	/* The lookup itself failed: only emit a debug log. */
-	rd_rkb_dbg(rktrans->rktrans_rkb, BROKER, "SO_ERROR",
-		   "Failed to get socket error: %s",
-		   socket_strerror(socket_errno));
-	return -1;
-}
-
-
-/**
- * IO event handler.
- *
- * Dispatches the poll \p events for the transport's socket according to
- * the current broker state (rkb_state):
- *  - CONNECT: drives the SSL handshake (if an SSL session exists) or
- *    reads the outcome of the asynchronous connect().
- *  - AUTH: passes the events to the SASL handler.
- *  - APIVERSION_QUERY, AUTH_HANDSHAKE, UP, UPDATE: receives and sends
- *    protocol data and handles connection close.
- *  - INIT, DOWN: not valid here, trips an assert.
- *
- * Locality: broker thread
- */
-static void rd_kafka_transport_io_event (rd_kafka_transport_t *rktrans,
-					 int events) {
-	char errstr[512];
-	int r;
-	rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
-
-	switch (rkb->rkb_state)
-	{
-	case RD_KAFKA_BROKER_STATE_CONNECT:
-#if WITH_SSL
-		if (rktrans->rktrans_ssl) {
-			/* Currently setting up SSL connection:
-			 * perform handshake.
-			 * The handshake's return value is not inspected
-			 * here. */
-			rd_kafka_transport_ssl_handshake(rktrans);
-			return;
-		}
-#endif
-
-		/* Asynchronous connect finished, read status. */
-		if (!(events & (POLLOUT|POLLERR|POLLHUP)))
-			return;
-
-		if (rd_kafka_transport_get_socket_error(rktrans, &r) == -1) {
-			/* The connect status could not be read at all. */
-			rd_kafka_broker_fail(
-                                rkb, LOG_ERR, RD_KAFKA_RESP_ERR__TRANSPORT,
-                                "Connect to %s failed: "
-                                "unable to get status from "
-                                "socket %d: %s",
-                                rd_sockaddr2str(rkb->rkb_addr_last,
-                                                RD_SOCKADDR2STR_F_PORT |
-                                                RD_SOCKADDR2STR_F_FAMILY),
-                                rktrans->rktrans_s,
-                                rd_strerror(socket_errno));
-		} else if (r != 0) {
-			/* Connect failed */
-                        errno = r;
-			rd_snprintf(errstr, sizeof(errstr),
-				    "Connect to %s failed: %s",
-                                    rd_sockaddr2str(rkb->rkb_addr_last,
-                                                    RD_SOCKADDR2STR_F_PORT |
-                                                    RD_SOCKADDR2STR_F_FAMILY),
-                                    rd_strerror(r));
-
-			rd_kafka_transport_connect_done(rktrans, errstr);
-		} else {
-			/* Connect succeeded */
-			rd_kafka_transport_connected(rktrans);
-		}
-		break;
-
-	case RD_KAFKA_BROKER_STATE_AUTH:
-		/* SASL handshake */
-		if (rd_kafka_sasl_io_event(rktrans, events,
-					   errstr, sizeof(errstr)) == -1) {
-			errno = EINVAL;
-			rd_kafka_broker_fail(rkb, LOG_ERR,
-					     RD_KAFKA_RESP_ERR__AUTHENTICATION,
-					     "SASL authentication failure: %s",
-					     errstr);
-			return;
-		}
-		break;
-
-	case RD_KAFKA_BROKER_STATE_APIVERSION_QUERY:
-	case RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE:
-	case RD_KAFKA_BROKER_STATE_UP:
-	case RD_KAFKA_BROKER_STATE_UPDATE:
-
-		if (events & POLLIN) {
-			/* Keep receiving for as long as rd_kafka_recv()
-			 * returns a positive value and the broker state
-			 * has not dropped below UP. */
-			while (rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP &&
-			       rd_kafka_recv(rkb) > 0)
-				;
-		}
-
-		if (events & POLLHUP) {
-			/* Log level depends on the log_connection_close
-			 * configuration setting. */
-			rd_kafka_broker_fail(rkb,
-                                             rkb->rkb_rk->rk_conf.
-                                             log_connection_close ?
-                                             LOG_NOTICE : LOG_DEBUG,
-                                             RD_KAFKA_RESP_ERR__TRANSPORT,
-					     "Connection closed");
-			return;
-		}
-
-		if (events & POLLOUT) {
-			/* Keep sending for as long as rd_kafka_send()
-			 * returns a positive value. */
-			while (rd_kafka_send(rkb) > 0)
-				;
-		}
-		break;
-
-	case RD_KAFKA_BROKER_STATE_INIT:
-	case RD_KAFKA_BROKER_STATE_DOWN:
-		/* IO events are not expected in these states. */
-		rd_kafka_assert(rkb->rkb_rk, !*"bad state");
-	}
-}
-
-
-/**
- * Poll the transport for up to \p timeout_ms and serve any IO events.
- *
- * POLLOUT is requested only when there are buffers queued for sending
- * and the number of requests awaiting a response is below the broker's
- * in-flight limit; it is cleared again once poll reports events.
- *
- * Locality: broker thread 
- */
-void rd_kafka_transport_io_serve (rd_kafka_transport_t *rktrans,
-                                  int timeout_ms) {
-	rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
-	int revents;
-
-	if (rd_kafka_bufq_cnt(&rkb->rkb_waitresps) < rkb->rkb_max_inflight) {
-		if (rd_kafka_bufq_cnt(&rkb->rkb_outbufs) > 0)
-			rd_kafka_transport_poll_set(rkb->rkb_transport,
-						    POLLOUT);
-	}
-
-	revents = rd_kafka_transport_poll(rktrans, timeout_ms);
-	if (revents <= 0)
-		return; /* Timeout, error or nothing to serve */
-
-	rd_kafka_transport_poll_clear(rktrans, POLLOUT);
-
-	rd_kafka_transport_io_event(rktrans, revents);
-}
-
-
-/**
- * Initiate asynchronous connection attempt.
- *
- * Creates a socket through the configured socket_cb, sets socket options,
- * makes the socket non-blocking and starts connecting to \p sinx (through
- * the configured connect_cb if there is one, else connect()).
- *
- * Returns a newly allocated transport handle on success, or NULL on
- * failure in which case \p errstr is written and the socket (if one
- * was created) is closed.
- *
- * Locality: broker thread
- */
-rd_kafka_transport_t *rd_kafka_transport_connect (rd_kafka_broker_t *rkb,
-						  const rd_sockaddr_inx_t *sinx,
-						  char *errstr,
-						  size_t errstr_size) {
-	rd_kafka_transport_t *rktrans;
-	int s = -1;
-	int on = 1;
-        int r;
-
-        /* Remember the address: later log messages read it back
-         * from rkb_addr_last. */
-        rkb->rkb_addr_last = sinx;
-
-	s = rkb->rkb_rk->rk_conf.socket_cb(sinx->in.sin_family,
-					   SOCK_STREAM, IPPROTO_TCP,
-					   rkb->rkb_rk->rk_conf.opaque);
-	if (s == -1) {
-		rd_snprintf(errstr, errstr_size, "Failed to create socket: %s",
-			    socket_strerror(socket_errno));
-		return NULL;
-	}
-
-
-#ifdef SO_NOSIGPIPE
-	/* Disable SIGPIPE signalling for this socket on OSX */
-	if (setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on)) == -1) 
-		rd_rkb_dbg(rkb, BROKER, "SOCKET",
-			   "Failed to set SO_NOSIGPIPE: %s",
-			   socket_strerror(socket_errno));
-#endif
-
-	/* Enable TCP keep-alives, if configured. */
-	if (rkb->rkb_rk->rk_conf.socket_keepalive) {
-#ifdef SO_KEEPALIVE
-		if (setsockopt(s, SOL_SOCKET, SO_KEEPALIVE,
-			       (void *)&on, sizeof(on)) == SOCKET_ERROR)
-			rd_rkb_dbg(rkb, BROKER, "SOCKET",
-				   "Failed to set SO_KEEPALIVE: %s",
-				   socket_strerror(socket_errno));
-#else
-		rd_rkb_dbg(rkb, BROKER, "SOCKET",
-			   "System does not support "
-			   "socket.keepalive.enable (SO_KEEPALIVE)");
-#endif
-	}
-
-        /* Set the socket to non-blocking */
-        if ((r = rd_fd_set_nonblocking(s))) {
-                rd_snprintf(errstr, errstr_size,
-                            "Failed to set socket non-blocking: %s",
-                            socket_strerror(r));
-                goto err;
-        }
-
-	rd_rkb_dbg(rkb, BROKER, "CONNECT", "Connecting to %s (%s) "
-		   "with socket %i",
-		   rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_FAMILY |
-				   RD_SOCKADDR2STR_F_PORT),
-		   rd_kafka_secproto_names[rkb->rkb_proto], s);
-
-	/* Connect to broker */
-        if (rkb->rkb_rk->rk_conf.connect_cb) {
-                r = rkb->rkb_rk->rk_conf.connect_cb(
-                        s, (struct sockaddr *)sinx, RD_SOCKADDR_INX_LEN(sinx),
-                        rkb->rkb_name, rkb->rkb_rk->rk_conf.opaque);
-        } else {
-                /* The socket is non-blocking: a connect that is still
-                 * in progress (EINPROGRESS, or WSAEWOULDBLOCK on
-                 * MSVC builds) is not treated as a failure. */
-                if (connect(s, (struct sockaddr *)sinx,
-                            RD_SOCKADDR_INX_LEN(sinx)) == SOCKET_ERROR &&
-                    (socket_errno != EINPROGRESS
-#ifdef _MSC_VER
-                     && socket_errno != WSAEWOULDBLOCK
-#endif
-                            ))
-                        r = socket_errno;
-                else
-                        r = 0;
-        }
-
-        if (r != 0) {
-		rd_rkb_dbg(rkb, BROKER, "CONNECT",
-			   "couldn't connect to %s: %s (%i)",
-			   rd_sockaddr2str(sinx,
-					   RD_SOCKADDR2STR_F_PORT |
-					   RD_SOCKADDR2STR_F_FAMILY),
-			   socket_strerror(r), r);
-		rd_snprintf(errstr, errstr_size,
-			    "Failed to connect to broker at %s: %s",
-			    rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_NICE),
-			    socket_strerror(r));
-		goto err;
-	}
-
-	/* Create transport handle */
-	rktrans = rd_calloc(1, sizeof(*rktrans));
-	rktrans->rktrans_rkb = rkb;
-	rktrans->rktrans_s = s;
-	/* Poll slot 0 is the socket; the broker's wake-up fd, if it has
-	 * one, goes in the next slot and is polled for POLLIN. */
-	rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt++].fd = s;
-        if (rkb->rkb_wakeup_fd[0] != -1) {
-                rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt].events = POLLIN;
-                rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt++].fd = rkb->rkb_wakeup_fd[0];
-        }
-
-
-	/* Poll writability to trigger on connection success/failure. */
-	rd_kafka_transport_poll_set(rktrans, POLLOUT);
-
-	return rktrans;
-
- err:
-	/* No transport handle exists yet: only the socket needs closing. */
-	if (s != -1)
-                rd_kafka_transport_close0(rkb->rkb_rk, s);
-
-	return NULL;
-}
-
-
-
-/**
- * Add \p event to the set of events polled for on the transport's
- * socket (poll slot 0).
- */
-void rd_kafka_transport_poll_set(rd_kafka_transport_t *rktrans, int event) {
-	rktrans->rktrans_pfd[0].events =
-		rktrans->rktrans_pfd[0].events | event;
-}
-
-/**
- * Remove \p event from the set of events polled for on the transport's
- * socket (poll slot 0).
- */
-void rd_kafka_transport_poll_clear(rd_kafka_transport_t *rktrans, int event) {
-	rktrans->rktrans_pfd[0].events =
-		rktrans->rktrans_pfd[0].events & ~event;
-}
-
-
-/**
- * Poll the transport's fds (socket and, if present, wake-up fd) for up
- * to \p tmout milliseconds.
- *
- * Returns the socket's revents when poll reported activity, otherwise
- * poll's own result on timeout or error (non-MSVC builds), or 0 on
- * timeout and -1 on error (MSVC builds).
- * On MSVC builds a timeout while the broker is in the CONNECT state also
- * checks the socket error and reports a failed connect.
- */
-int rd_kafka_transport_poll(rd_kafka_transport_t *rktrans, int tmout) {
-        int r;
-#ifndef _MSC_VER
-	r = poll(rktrans->rktrans_pfd, rktrans->rktrans_pfd_cnt, tmout);
-	if (r <= 0)
-		return r;
-#else
-	r = WSAPoll(rktrans->rktrans_pfd, rktrans->rktrans_pfd_cnt, tmout);
-	if (r == 0) {
-		/* Workaround for broken WSAPoll() while connecting:
-		 * failed connection attempts are not indicated at all by WSAPoll()
-		 * so we need to check the socket error when Poll returns 0.
-		 * Issue #525 */
-		/* Error reported if the socket error can't be retrieved */
-		r = ECONNRESET;
-		if (unlikely(rktrans->rktrans_rkb->rkb_state ==
-			     RD_KAFKA_BROKER_STATE_CONNECT &&
-			     (rd_kafka_transport_get_socket_error(rktrans,
-								  &r) == -1 ||
-			      r != 0))) {
-			char errstr[512];
-			errno = r;
-			rd_snprintf(errstr, sizeof(errstr),
-				    "Connect to %s failed: %s",
-				    rd_sockaddr2str(rktrans->rktrans_rkb->
-						    rkb_addr_last,
-						    RD_SOCKADDR2STR_F_PORT |
-                                                    RD_SOCKADDR2STR_F_FAMILY),
-                                    socket_strerror(r));
-			rd_kafka_transport_connect_done(rktrans, errstr);
-			return -1;
-		} else
-			return 0;
-	} else if (r == SOCKET_ERROR)
-		return -1;
-#endif
-        /* Count each poll call that reported activity. */
-        rd_atomic64_add(&rktrans->rktrans_rkb->rkb_c.wakeups, 1);
-
-        /* Slot 1 is checked regardless of rktrans_pfd_cnt.
-         * NOTE(review): presumably safe without a wake-up fd because the
-         * handle is zero-allocated, leaving revents at 0 - confirm. */
-        if (rktrans->rktrans_pfd[1].revents & POLLIN) {
-                /* Read wake-up fd data and throw away, just used for wake-ups*/
-                char buf[512];
-                if (rd_read((int)rktrans->rktrans_pfd[1].fd,
-                            buf, sizeof(buf)) == -1) {
-                        /* Ignore warning */
-                }
-        }
-
-        return rktrans->rktrans_pfd[0].revents;
-}
-
-
-
-
-
-#if 0
-/**
- * Global cleanup.
- * This is dangerous and SHOULD NOT be called since it will rip
- * the rug from under the application if it uses any of this functionality
- * in its own code. This means we might leak some memory on exit.
- *
- * The function is compiled out by the surrounding #if 0: no definition
- * of rd_kafka_transport_term() is built from this file.
- */
-void rd_kafka_transport_term (void) {
-#ifdef _MSC_VER
-	(void)WSACleanup(); /* FIXME: dangerous */
-#endif
-}
-#endif
- 
-/**
- * Global transport initialization.
- * On MSVC builds this starts up Winsock (version 2.2), ignoring the
- * result; on other builds it does nothing.
- */
-void rd_kafka_transport_init(void) {
-#ifdef _MSC_VER
-	WSADATA wsa_data;
-
-	(void)WSAStartup(MAKEWORD(2, 2), &wsa_data);
-#endif
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_transport.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_transport.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_transport.h
deleted file mode 100644
index fcd2580..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_transport.h
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2015, Magnus Edenhill
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#pragma once
-
-#ifndef _MSC_VER
-#include <poll.h>
-#endif
-
-#include "rdbuf.h"
-#include "rdaddr.h"
-
-/* Opaque transport handle: the struct itself is defined in
- * rdkafka_transport_int.h. */
-typedef struct rd_kafka_transport_s rd_kafka_transport_t;
-
-/* Poll the transport for up to timeout_ms and serve its IO events. */
-void rd_kafka_transport_io_serve (rd_kafka_transport_t *rktrans,
-                                  int timeout_ms);
-
-/* Send/receive on the transport, using SSL when the transport has an
- * SSL session and the plain socket otherwise. */
-ssize_t rd_kafka_transport_send (rd_kafka_transport_t *rktrans,
-                                 rd_slice_t *slice,
-                                 char *errstr, size_t errstr_size);
-ssize_t rd_kafka_transport_recv (rd_kafka_transport_t *rktrans,
-                                 rd_buf_t *rbuf,
-                                 char *errstr, size_t errstr_size);
-/* Length framed receive: returns -1 on fatal error (errstr set),
- * 0 while still waiting for data, or 1 when a complete frame is
- * returned in *rkbufp. */
-int rd_kafka_transport_framed_recv (rd_kafka_transport_t *rktrans,
-                                    rd_kafka_buf_t **rkbufp,
-                                    char *errstr, size_t errstr_size);
-struct rd_kafka_broker_s;
-/* Initiate an asynchronous connection attempt to sinx.
- * Returns a new transport handle, or NULL on failure (errstr set). */
-rd_kafka_transport_t *rd_kafka_transport_connect(struct rd_kafka_broker_s *rkb, const rd_sockaddr_inx_t *sinx,
-                                                 char *errstr, size_t errstr_size);
-/* Report the outcome of a connection attempt: callers in
- * rdkafka_transport.c pass a NULL errstr on success and an error
- * string on failure. */
-void rd_kafka_transport_connect_done (rd_kafka_transport_t *rktrans,
-				      char *errstr);
-
-void rd_kafka_transport_close(rd_kafka_transport_t *rktrans);
-/* Add/remove an event to/from the events polled for on the socket. */
-void rd_kafka_transport_poll_set(rd_kafka_transport_t *rktrans, int event);
-void rd_kafka_transport_poll_clear(rd_kafka_transport_t *rktrans, int event);
-/* Poll the transport's fds for up to tmout milliseconds: returns the
- * socket's revents when there was activity. */
-int rd_kafka_transport_poll(rd_kafka_transport_t *rktrans, int tmout);
-
-#if WITH_SSL
-/* Per rd_kafka_t handle OpenSSL context teardown and setup. */
-void rd_kafka_transport_ssl_ctx_term (rd_kafka_t *rk);
-int rd_kafka_transport_ssl_ctx_init (rd_kafka_t *rk,
-				     char *errstr, size_t errstr_size);
-
-void rd_kafka_transport_ssl_term (void);
-void rd_kafka_transport_ssl_init (void);
-#endif
-/* NOTE(review): the definition of rd_kafka_transport_term() in
- * rdkafka_transport.c is compiled out with #if 0 - confirm there are
- * no callers. */
-void rd_kafka_transport_term (void);
-void rd_kafka_transport_init(void);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_transport_int.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_transport_int.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_transport_int.h
deleted file mode 100644
index 8ae79b4..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_transport_int.h
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2015, Magnus Edenhill
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-#pragma once
-
-/* This header file is to be used by .c files needing access to the
- * rd_kafka_transport_t struct internals. */
-
-#include "rdkafka_sasl.h"
-
-#if WITH_SSL
-#include <openssl/ssl.h>
-#include <openssl/err.h>
-#endif
-
-/* Transport handle internals: one instance is allocated per connection
- * attempt by rd_kafka_transport_connect(). */
-struct rd_kafka_transport_s {	
-	int rktrans_s;                     /* Socket */
-	
-	rd_kafka_broker_t *rktrans_rkb;    /* Broker this transport
-					    * belongs to */
-
-#if WITH_SSL
-	SSL *rktrans_ssl;                  /* SSL session, if any: when set
-					    * send/recv use the SSL path */
-#endif
-
-	struct {
-                void *state;               /* SASL implementation
-                                            * state handle */
-
-                int           complete;    /* Auth was completed early
-					    * from the client's perspective
-					    * (but we might still have to
-                                            *  wait for server reply). */
-
-                /* SASL framing buffers */
-		struct msghdr msg;
-		struct iovec  iov[2];
-
-		char          *recv_buf;
-		int            recv_of;    /* Received byte count */
-		int            recv_len;   /* Expected receive length for
-					    * current frame. */
-	} rktrans_sasl;
-
-	rd_kafka_buf_t *rktrans_recv_buf;  /* In-progress receive buffer
-					    * used by
-					    * rd_kafka_transport_framed_recv() */
-
-        /* Two pollable fds:
-         * - TCP socket
-         * - wake-up fd
-         */
-#ifndef _MSC_VER
-        struct pollfd rktrans_pfd[2];
-#else
-        WSAPOLLFD rktrans_pfd[2];
-#endif
-        int rktrans_pfd_cnt;           /* Number of rktrans_pfd slots in use */
-
-        size_t rktrans_rcvbuf_size;    /**< Socket receive buffer size */
-        size_t rktrans_sndbuf_size;    /**< Socket send buffer size */
-};
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdlist.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdlist.c b/thirdparty/librdkafka-0.11.1/src/rdlist.c
deleted file mode 100644
index 11cf14d..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdlist.c
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2015, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "rd.h"
-#include "rdlist.h"
-
-
-void rd_list_dump (const char *what, const rd_list_t *rl) {
-        int i;
-        printf("%s: (rd_list_t*)%p cnt %d, size %d, elems %p:\n",
-               what, rl, rl->rl_cnt, rl->rl_size, rl->rl_elems);
-        for (i = 0 ; i < rl->rl_cnt ; i++)
-                printf("  #%d: %p at &%p\n", i,
-                       rl->rl_elems[i], &rl->rl_elems[i]);
-}
-
-void rd_list_grow (rd_list_t *rl, size_t size) {
-        rd_assert(!(rl->rl_flags & RD_LIST_F_FIXED_SIZE));
-        rl->rl_size += (int)size;
-        if (unlikely(rl->rl_size == 0))
-                return; /* avoid zero allocations */
-        rl->rl_elems = rd_realloc(rl->rl_elems,
-                                  sizeof(*rl->rl_elems) * rl->rl_size);
-}
-
-rd_list_t *
-rd_list_init (rd_list_t *rl, int initial_size, void (*free_cb) (void *)) {
-        memset(rl, 0, sizeof(*rl));
-
-	if (initial_size > 0)
-		rd_list_grow(rl, initial_size);
-
-        rl->rl_free_cb = free_cb;
-
-        return rl;
-}
-
-rd_list_t *rd_list_new (int initial_size, void (*free_cb) (void *)) {
-	rd_list_t *rl = malloc(sizeof(*rl));
-	rd_list_init(rl, initial_size, free_cb);
-	rl->rl_flags |= RD_LIST_F_ALLOCATED;
-	return rl;
-}
-
-void rd_list_prealloc_elems (rd_list_t *rl, size_t elemsize, size_t size) {
-	size_t allocsize;
-	char *p;
-	size_t i;
-
-	rd_assert(!rl->rl_elems);
-
-	/* Allocation layout:
-	 *   void *ptrs[cnt];
-	 *   elems[elemsize][cnt];
-	 */
-
-	allocsize = (sizeof(void *) * size) + (elemsize * size);
-	rl->rl_elems = rd_malloc(allocsize);
-
-	/* p points to first element's memory. */
-	p = (char *)&rl->rl_elems[size];
-
-	/* Pointer -> elem mapping */
-	for (i = 0 ; i < size ; i++, p += elemsize)
-		rl->rl_elems[i] = p;
-
-	rl->rl_size = (int)size;
-	rl->rl_cnt = 0;
-	rl->rl_flags |= RD_LIST_F_FIXED_SIZE;
-}
-
-
-void rd_list_free_cb (rd_list_t *rl, void *ptr) {
-        if (rl->rl_free_cb && ptr)
-                rl->rl_free_cb(ptr);
-}
-
-
-void *rd_list_add (rd_list_t *rl, void *elem) {
-        if (rl->rl_cnt == rl->rl_size)
-                rd_list_grow(rl, rl->rl_size ? rl->rl_size * 2 : 16);
-	rl->rl_flags &= ~RD_LIST_F_SORTED;
-	if (elem)
-		rl->rl_elems[rl->rl_cnt] = elem;
-	return rl->rl_elems[rl->rl_cnt++];
-}
-
-static void rd_list_remove0 (rd_list_t *rl, int idx) {
-        rd_assert(idx < rl->rl_cnt);
-
-        if (idx + 1 < rl->rl_cnt)
-                memmove(&rl->rl_elems[idx],
-                        &rl->rl_elems[idx+1],
-                        sizeof(*rl->rl_elems) * (rl->rl_cnt - (idx+1)));
-        rl->rl_cnt--;
-}
-
-void *rd_list_remove (rd_list_t *rl, void *match_elem) {
-        void *elem;
-        int i;
-
-        RD_LIST_FOREACH(elem, rl, i) {
-                if (elem == match_elem) {
-                        rd_list_remove0(rl, i);
-                        return elem;
-                }
-        }
-
-        return NULL;
-}
-
-
-void *rd_list_remove_cmp (rd_list_t *rl, void *match_elem,
-                          int (*cmp) (void *_a, void *_b)) {
-        void *elem;
-        int i;
-
-        RD_LIST_FOREACH(elem, rl, i) {
-                if (match_elem == cmp ||
-                    !cmp(elem, match_elem)) {
-                        rd_list_remove0(rl, i);
-                        return elem;
-                }
-        }
-
-        return NULL;
-}
-
-
-/**
- * Trampoline to avoid the double pointers in callbacks.
- *
- * rl_elems is a **, but to avoid having the application do the cumbersome
- * ** -> * casting we wrap this here and provide a simple * pointer to the
- * the callbacks.
- *
- * This is true for all list comparator uses, i.e., both sort() and find().
- */
-static RD_TLS int (*rd_list_cmp_curr) (const void *, const void *);
-
-static RD_INLINE
-int rd_list_cmp_trampoline (const void *_a, const void *_b) {
-	const void *a = *(const void **)_a, *b = *(const void **)_b;
-
-	return rd_list_cmp_curr(a, b);
-}
-
-void rd_list_sort (rd_list_t *rl, int (*cmp) (const void *, const void *)) {
-	rd_list_cmp_curr = cmp;
-        qsort(rl->rl_elems, rl->rl_cnt, sizeof(*rl->rl_elems),
-	      rd_list_cmp_trampoline);
-	rl->rl_flags |= RD_LIST_F_SORTED;
-}
-
-void rd_list_clear (rd_list_t *rl) {
-        rl->rl_cnt = 0;
-	rl->rl_flags &= ~RD_LIST_F_SORTED;
-}
-
-
-void rd_list_destroy (rd_list_t *rl) {
-
-        if (rl->rl_elems) {
-                int i;
-                if (rl->rl_free_cb) {
-                        for (i = 0 ; i < rl->rl_cnt ; i++)
-                                if (rl->rl_elems[i])
-                                        rl->rl_free_cb(rl->rl_elems[i]);
-                }
-
-		rd_free(rl->rl_elems);
-        }
-
-	if (rl->rl_flags & RD_LIST_F_ALLOCATED)
-		rd_free(rl);
-}
-
-
-void *rd_list_elem (const rd_list_t *rl, int idx) {
-        if (likely(idx < rl->rl_cnt))
-                return (void *)rl->rl_elems[idx];
-        return NULL;
-}
-
-void *rd_list_find (const rd_list_t *rl, const void *match,
-                    int (*cmp) (const void *, const void *)) {
-        int i;
-        const void *elem;
-
-	if (rl->rl_flags & RD_LIST_F_SORTED) {
-		void **r;
-		rd_list_cmp_curr = cmp;
-		r = bsearch(&match/*ptrptr to match elems*/,
-			    rl->rl_elems, rl->rl_cnt,
-			    sizeof(*rl->rl_elems), rd_list_cmp_trampoline);
-		return r ? *r : NULL;
-	}
-
-        RD_LIST_FOREACH(elem, rl, i) {
-                if (!cmp(match, elem))
-                        return (void *)elem;
-        }
-
-        return NULL;
-}
-
-
-int rd_list_cmp (const rd_list_t *a, rd_list_t *b,
-		 int (*cmp) (const void *, const void *)) {
-	int i;
-
-	i = a->rl_cnt - b->rl_cnt;
-	if (i)
-		return i;
-
-	for (i = 0 ; i < a->rl_cnt ; i++) {
-		int r = cmp(a->rl_elems[i], b->rl_elems[i]);
-		if (r)
-			return r;
-	}
-
-	return 0;
-}
-
-
-/**
- * @brief Simple element pointer comparator
- */
-int rd_list_cmp_ptr (const void *a, const void *b) {
-        if (a < b)
-                return -1;
-        else if (a > b)
-                return 1;
-        return 0;
-}
-
-
-void rd_list_apply (rd_list_t *rl,
-                    int (*cb) (void *elem, void *opaque), void *opaque) {
-        void *elem;
-        int i;
-
-        RD_LIST_FOREACH(elem, rl, i) {
-                if (!cb(elem, opaque)) {
-                        rd_list_remove0(rl, i);
-                        i--;
-                }
-        }
-
-        return;
-}
-
-
-/**
- * @brief Default element copier that simply assigns the original pointer.
- */
-static void *rd_list_nocopy_ptr (const void *elem, void *opaque) {
-        return (void *)elem;
-}
-
-
-rd_list_t *rd_list_copy (const rd_list_t *src,
-                         void *(*copy_cb) (const void *elem, void *opaque),
-                         void *opaque) {
-        rd_list_t *dst;
-
-        dst = rd_list_new(src->rl_cnt, src->rl_free_cb);
-
-        rd_list_copy_to(dst, src, copy_cb, opaque);
-        return dst;
-}
-
-
-void rd_list_copy_to (rd_list_t *dst, const rd_list_t *src,
-                      void *(*copy_cb) (const void *elem, void *opaque),
-                      void *opaque) {
-        void *elem;
-        int i;
-
-        if (!copy_cb)
-                copy_cb = rd_list_nocopy_ptr;
-
-        RD_LIST_FOREACH(elem, src, i) {
-                void *celem = copy_cb(elem, opaque);
-                if (celem)
-                        rd_list_add(dst, celem);
-        }
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdlist.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdlist.h b/thirdparty/librdkafka-0.11.1/src/rdlist.h
deleted file mode 100644
index 27233e3..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdlist.h
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2015, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#pragma once
-
-
-/**
- *
- * Simple light-weight append-only list to be used as a collection convenience.
- *
- */
-
-typedef struct rd_list_s {
-        int    rl_size;
-        int    rl_cnt;
-        void **rl_elems;
-	void (*rl_free_cb) (void *);
-	int    rl_flags;
-#define RD_LIST_F_ALLOCATED  0x1  /* The rd_list_t is allocated,
-				   * will be free on destroy() */
-#define RD_LIST_F_SORTED     0x2  /* Set by sort(), cleared by any mutations.
-				   * When this flag is set bsearch() is used
-				   * by find(), otherwise a linear search. */
-#define RD_LIST_F_FIXED_SIZE 0x4  /* Assert on grow */
-#define RD_LIST_F_UNIQUE     0x8  /* Don't allow duplicates:
-                                   * ONLY ENFORCED BY CALLER. */
-} rd_list_t;
-
-
-/**
- * @brief Initialize a list, preallocate space for 'initial_size' elements
- *       (optional).
- *       List elements will optionally be freed by \p free_cb.
- *
- * @returns \p rl
- */
-rd_list_t *
-rd_list_init (rd_list_t *rl, int initial_size, void (*free_cb) (void *));
-
-
-/**
- * Allocate a new list pointer and initialize it according to rd_list_init().
- *
- * Use rd_list_destroy() to free.
- */
-rd_list_t *rd_list_new (int initial_size, void (*free_cb) (void *));
-
-
-/**
- * @brief Prepare list to for an additional \p size elements.
- *        This is an optimization to avoid incremental grows.
- */
-void rd_list_grow (rd_list_t *rl, size_t size);
-
-/**
- * @brief Preallocate elements to avoid having to pass an allocated pointer to
- *        rd_list_add(), instead pass NULL to rd_list_add() and use the returned
- *        pointer as the element.
- *
- * @param elemsize element size
- * @param size number of elements
- *
- * @remark Preallocated element lists can't grow past \p size.
- */
-void rd_list_prealloc_elems (rd_list_t *rl, size_t elemsize, size_t size);
-
-
-/**
- * @brief Free a pointer using the list's free_cb
- *
- * @remark If no free_cb is set, or \p ptr is NULL, dont do anything
- *
- * Typical use is rd_list_free_cb(rd_list_remove_cmp(....));
- */
-void rd_list_free_cb (rd_list_t *rl, void *ptr);
-
-
-/**
- * @brief Append element to list
- *
- * @returns \p elem. If \p elem is NULL the default element for that index
- *          will be returned (for use with set_elems).
- */
-void *rd_list_add (rd_list_t *rl, void *elem);
-
-
-/**
- * Remove element from list.
- * This is a slow O(n) + memmove operation.
- * Returns the removed element.
- */
-void *rd_list_remove (rd_list_t *rl, void *match_elem);
-
-/**
- * Remove element from list using comparator.
- * See rd_list_remove()
- */
-void *rd_list_remove_cmp (rd_list_t *rl, void *match_elem,
-                         int (*cmp) (void *_a, void *_b));
-
-/**
- * Sort list using comparator
- */
-void rd_list_sort (rd_list_t *rl, int (*cmp) (const void *, const void *));
-
-
-/**
- * Empties the list (but does not free any memory)
- */
-void rd_list_clear (rd_list_t *rl);
-
-
-/**
- * Empties the list, frees the element array, and optionally frees
- * each element using the registered \c rl->rl_free_cb.
- *
- * If the list was previously allocated with rd_list_new() it will be freed.
- */
-void rd_list_destroy (rd_list_t *rl);
-
-
-/**
- * Returns the element at index 'idx', or NULL if out of range.
- *
- * Typical iteration is:
- *    int i = 0;
- *    my_type_t *obj;
- *    while ((obj = rd_list_elem(rl, i++)))
- *        do_something(obj);
- */
-void *rd_list_elem (const rd_list_t *rl, int idx);
-
-#define RD_LIST_FOREACH(elem,listp,idx) \
-        for (idx = 0 ; (elem = rd_list_elem(listp, idx)) ; idx++)
-
-#define RD_LIST_FOREACH_REVERSE(elem,listp,idx)                         \
-        for (idx = (listp)->rl_cnt-1 ;                                  \
-             idx >= 0 && (elem = rd_list_elem(listp, idx)) ; idx--)
-
-/**
- * Returns the number of elements in list.
- */
-static RD_INLINE RD_UNUSED int rd_list_cnt (const rd_list_t *rl) {
-        return rl->rl_cnt;
-}
-
-
-/**
- * Returns true if list is empty
- */
-#define rd_list_empty(rl) (rd_list_cnt(rl) == 0)
-
-
-
-/**
- * Find element using comparator
- * 'match' will be the first argument to 'cmp', and each element (up to a match)
- * will be the second argument to 'cmp'.
- */
-void *rd_list_find (const rd_list_t *rl, const void *match,
-                    int (*cmp) (const void *, const void *));
-
-
-
-/**
- * @brief Compare list \p a to \p b.
- *
- * @returns < 0 if a was "lesser" than b,
- *          > 0 if a was "greater" than b,
- *            0 if a and b are equal.
- */
-int rd_list_cmp (const rd_list_t *a, rd_list_t *b,
-                 int (*cmp) (const void *, const void *));
-
-/**
- * @brief Simple element pointer comparator
- */
-int rd_list_cmp_ptr (const void *a, const void *b);
-
-
-/**
- * @brief Apply \p cb to each element in list, if \p cb returns 0
- *        the element will be removed (but not freed).
- */
-void rd_list_apply (rd_list_t *rl,
-                    int (*cb) (void *elem, void *opaque), void *opaque);
-
-
-
-/**
- * @brief Copy list \p src, returning a new list,
- *        using optional \p copy_cb (per elem)
- */
-rd_list_t *rd_list_copy (const rd_list_t *src,
-                         void *(*copy_cb) (const void *elem, void *opaque),
-                         void *opaque);
-
-
-/**
- * @brief Copy list \p src to \p dst using optional \p copy_cb (per elem)
- * @remark The destination list is not initialized or copied by this function.
- * @remark copy_cb() may return NULL in which case no element is added,
- *                   but the copy callback might have done so itself.
- */
-void rd_list_copy_to (rd_list_t *dst, const rd_list_t *src,
-                      void *(*copy_cb) (const void *elem, void *opaque),
-                      void *opaque);
-
-/**
- * @brief String copier for rd_list_copy()
- */
-static RD_UNUSED
-void *rd_list_string_copy (const void *elem, void *opaque) {
-        return rd_strdup((const char *)elem);
-}
-
-
-/**
- * Debugging: Print list to stdout.
- */
-void rd_list_dump (const char *what, const rd_list_t *rl);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdlog.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdlog.c b/thirdparty/librdkafka-0.11.1/src/rdlog.c
deleted file mode 100644
index 3f0d29a..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdlog.c
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * librd - Rapid Development C library
- *
- * Copyright (c) 2012-2013, Magnus Edenhill
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "rdkafka_int.h"
-#include "rdlog.h"
-
-#include <stdarg.h>
-#include <string.h>
-#include <ctype.h>
-
-
-
-
-void rd_hexdump (FILE *fp, const char *name, const void *ptr, size_t len) {
-	const char *p = (const char *)ptr;
-	size_t of = 0;
-
-
-	if (name)
-		fprintf(fp, "%s hexdump (%"PRIusz" bytes):\n", name, len);
-
-	for (of = 0 ; of < len ; of += 16) {
-		char hexen[16*3+1];
-		char charen[16+1];
-		int hof = 0;
-
-		int cof = 0;
-		unsigned int i;
-
-		for (i = (unsigned int)of ; i < (unsigned int)of + 16 && i < len ; i++) {
-			hof += rd_snprintf(hexen+hof, sizeof(hexen)-hof,
-					   "%02x ",
-					   p[i] & 0xff);
-			cof += rd_snprintf(charen+cof, sizeof(charen)-cof, "%c",
-					   isprint((int)p[i]) ? p[i] : '.');
-		}
-		fprintf(fp, "%08zx: %-48s %-16s\n",
-			of, hexen, charen);
-	}
-}
-
-
-void rd_iov_print (const char *what, int iov_idx, const struct iovec *iov,
-                   int hexdump) {
-        printf("%s:  iov #%i: %"PRIusz"\n", what, iov_idx,
-               (size_t)iov->iov_len);
-        if (hexdump)
-                rd_hexdump(stdout, what, iov->iov_base, iov->iov_len);
-}
-
-
-void rd_msghdr_print (const char *what, const struct msghdr *msg,
-                      int hexdump) {
-        int i;
-        size_t len = 0;
-
-        printf("%s: iovlen %"PRIusz"\n", what, (size_t)msg->msg_iovlen);
-
-        for (i = 0 ; i < (int)msg->msg_iovlen ; i++) {
-                rd_iov_print(what, i, &msg->msg_iov[i], hexdump);
-                len += msg->msg_iov[i].iov_len;
-        }
-        printf("%s: ^ message was %"PRIusz" bytes in total\n", what, len);
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdlog.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdlog.h b/thirdparty/librdkafka-0.11.1/src/rdlog.h
deleted file mode 100644
index 95066e2..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdlog.h
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * librd - Rapid Development C library
- *
- * Copyright (c) 2012-2013, Magnus Edenhill
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#pragma once
-
-void rd_hexdump (FILE *fp, const char *name, const void *ptr, size_t len);
-
-void rd_iov_print (const char *what, int iov_idx, const struct iovec *iov,
-                   int hexdump);
-struct msghdr;
-void rd_msghdr_print (const char *what, const struct msghdr *msg,
-                      int hexdump);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdports.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdports.c b/thirdparty/librdkafka-0.11.1/src/rdports.c
deleted file mode 100644
index a34195b..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdports.c
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
-* librdkafka - Apache Kafka C library
-*
-* Copyright (c) 2016 Magnus Edenhill
-* All rights reserved.
-*
-* Redistribution and use in source and binary forms, with or without
-* modification, are permitted provided that the following conditions are met:
-*
-* 1. Redistributions of source code must retain the above copyright notice,
-*    this list of conditions and the following disclaimer.
-* 2. Redistributions in binary form must reproduce the above copyright notice,
-*    this list of conditions and the following disclaimer in the documentation
-*    and/or other materials provided with the distribution.
-*
-* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-* POSSIBILITY OF SUCH DAMAGE.
-*/
-
-/**
- * System portability
- */
-
-#include "rd.h"
-
-
-#include <stdlib.h>
-
-/**
- * qsort_r substitute
- * This nicely explains why we wont bother with the native implementation
- * on Win32 (qsort_s), OSX/FreeBSD (qsort_r with diff args):
- * http://forum.theorex.tech/t/different-declarations-of-qsort-r-on-mac-and-linux/93/2
- */
-static RD_TLS int (*rd_qsort_r_cmp) (const void *, const void *, void *);
-static RD_TLS void *rd_qsort_r_arg;
-
-static RD_UNUSED
-int rd_qsort_r_trampoline (const void *a, const void *b) {
-        return rd_qsort_r_cmp(a, b, rd_qsort_r_arg);
-}
-
-void rd_qsort_r (void *base, size_t nmemb, size_t size,
-                 int (*compar)(const void *, const void *, void *),
-                 void *arg) {
-        rd_qsort_r_cmp = compar;
-        rd_qsort_r_arg = arg;
-        qsort(base, nmemb, size, rd_qsort_r_trampoline);
-        rd_qsort_r_cmp = NULL;
-        rd_qsort_r_arg = NULL;
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdports.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdports.h b/thirdparty/librdkafka-0.11.1/src/rdports.h
deleted file mode 100644
index 44cef55..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdports.h
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
-* librdkafka - Apache Kafka C library
-*
-* Copyright (c) 2016 Magnus Edenhill
-* All rights reserved.
-*
-* Redistribution and use in source and binary forms, with or without
-* modification, are permitted provided that the following conditions are met:
-*
-* 1. Redistributions of source code must retain the above copyright notice,
-*    this list of conditions and the following disclaimer.
-* 2. Redistributions in binary form must reproduce the above copyright notice,
-*    this list of conditions and the following disclaimer in the documentation
-*    and/or other materials provided with the distribution.
-*
-* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-* POSSIBILITY OF SUCH DAMAGE.
-*/
-#pragma once
-
-
-void rd_qsort_r (void *base, size_t nmemb, size_t size,
-              int (*compar)(const void *, const void *, void *),
-              void *arg);