You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/06/06 14:14:44 UTC
[16/51] [partial] nifi-minifi-cpp git commit: MINIFICPP-512 - upgrade
to librdkafka 0.11.4
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_topic.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_topic.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_topic.h
deleted file mode 100644
index 49d1d29..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_topic.h
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012,2013 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#pragma once
-
-#include "rdlist.h"
-
-extern const char *rd_kafka_topic_state_names[];
-
-
-/* rd_kafka_itopic_t: internal representation of a topic */
-struct rd_kafka_itopic_s {
- TAILQ_ENTRY(rd_kafka_itopic_s) rkt_link;
-
- rd_refcnt_t rkt_refcnt;
-
- rwlock_t rkt_lock;
- rd_kafkap_str_t *rkt_topic;
-
- shptr_rd_kafka_toppar_t *rkt_ua; /* unassigned partition */
- shptr_rd_kafka_toppar_t **rkt_p;
- int32_t rkt_partition_cnt;
-
- rd_list_t rkt_desp; /* Desired partitions
- * that are not yet seen
- * in the cluster. */
-
- rd_ts_t rkt_ts_metadata; /* Timestamp of last metadata
- * update for this topic. */
-
- mtx_t rkt_app_lock; /* Protects rkt_app_* */
- rd_kafka_topic_t *rkt_app_rkt; /* A shared topic pointer
- * to be used for callbacks
- * to the application. */
-
- int rkt_app_refcnt; /* Number of active rkt's new()ed
- * by application. */
-
- enum {
- RD_KAFKA_TOPIC_S_UNKNOWN, /* No cluster information yet */
- RD_KAFKA_TOPIC_S_EXISTS, /* Topic exists in cluster */
- RD_KAFKA_TOPIC_S_NOTEXISTS, /* Topic is not known in cluster */
- } rkt_state;
-
- int rkt_flags;
-#define RD_KAFKA_TOPIC_F_LEADER_UNAVAIL 0x1 /* Leader lost/unavailable
- * for at least one partition. */
-
- rd_kafka_t *rkt_rk;
-
- shptr_rd_kafka_itopic_t *rkt_shptr_app; /* Application's topic_new() */
-
- rd_kafka_topic_conf_t rkt_conf;
-};
-
-#define rd_kafka_topic_rdlock(rkt) rwlock_rdlock(&(rkt)->rkt_lock)
-#define rd_kafka_topic_wrlock(rkt) rwlock_wrlock(&(rkt)->rkt_lock)
-#define rd_kafka_topic_rdunlock(rkt) rwlock_rdunlock(&(rkt)->rkt_lock)
-#define rd_kafka_topic_wrunlock(rkt) rwlock_wrunlock(&(rkt)->rkt_lock)
-
-
-/* Converts a shptr..itopic_t to an internal itopic_t */
-#define rd_kafka_topic_s2i(s_rkt) rd_shared_ptr_obj(s_rkt)
-
-/* Converts an application topic_t (a shptr topic) to an internal itopic_t */
-#define rd_kafka_topic_a2i(app_rkt) \
- rd_kafka_topic_s2i((shptr_rd_kafka_itopic_t *)app_rkt)
-
-/* Converts a shptr..itopic_t to an app topic_t (they are the same thing) */
-#define rd_kafka_topic_s2a(s_rkt) ((rd_kafka_topic_t *)(s_rkt))
-
-/* Converts an app topic_t to a shptr..itopic_t (they are the same thing) */
-#define rd_kafka_topic_a2s(app_rkt) ((shptr_rd_kafka_itopic_t *)(app_rkt))
-
-
-
-
-
-/**
- * Returns a shared pointer for the topic.
- */
-#define rd_kafka_topic_keep(rkt) \
- rd_shared_ptr_get(rkt, &(rkt)->rkt_refcnt, shptr_rd_kafka_itopic_t)
-
-/* Same, but casts to an app topic_t */
-#define rd_kafka_topic_keep_a(rkt) \
- ((rd_kafka_topic_t *)rd_shared_ptr_get(rkt, &(rkt)->rkt_refcnt, \
- shptr_rd_kafka_itopic_t))
-
-void rd_kafka_topic_destroy_final (rd_kafka_itopic_t *rkt);
-
-
-/**
- * Frees a shared pointer previously returned by ..topic_keep()
- */
-static RD_INLINE RD_UNUSED void
-rd_kafka_topic_destroy0 (shptr_rd_kafka_itopic_t *s_rkt) {
- rd_shared_ptr_put(s_rkt,
- &rd_kafka_topic_s2i(s_rkt)->rkt_refcnt,
- rd_kafka_topic_destroy_final(
- rd_kafka_topic_s2i(s_rkt)));
-}
-
-
-shptr_rd_kafka_itopic_t *rd_kafka_topic_new0 (rd_kafka_t *rk, const char *topic,
- rd_kafka_topic_conf_t *conf,
- int *existing, int do_lock);
-
-shptr_rd_kafka_itopic_t *rd_kafka_topic_find_fl (const char *func, int line,
- rd_kafka_t *rk,
- const char *topic,
- int do_lock);
-shptr_rd_kafka_itopic_t *rd_kafka_topic_find0_fl (const char *func, int line,
- rd_kafka_t *rk,
- const rd_kafkap_str_t *topic);
-#define rd_kafka_topic_find(rk,topic,do_lock) \
- rd_kafka_topic_find_fl(__FUNCTION__,__LINE__,rk,topic,do_lock)
-#define rd_kafka_topic_find0(rk,topic) \
- rd_kafka_topic_find0_fl(__FUNCTION__,__LINE__,rk,topic)
-int rd_kafka_topic_cmp_s_rkt (const void *_a, const void *_b);
-
-void rd_kafka_topic_partitions_remove (rd_kafka_itopic_t *rkt);
-
-void rd_kafka_topic_metadata_none (rd_kafka_itopic_t *rkt);
-
-int rd_kafka_topic_metadata_update2 (rd_kafka_broker_t *rkb,
- const struct rd_kafka_metadata_topic *mdt);
-
-int rd_kafka_topic_scan_all (rd_kafka_t *rk, rd_ts_t now);
-
-
-typedef struct rd_kafka_topic_info_s {
- const char *topic; /**< Allocated along with struct */
- int partition_cnt;
-} rd_kafka_topic_info_t;
-
-
-int rd_kafka_topic_info_cmp (const void *_a, const void *_b);
-rd_kafka_topic_info_t *rd_kafka_topic_info_new (const char *topic,
- int partition_cnt);
-void rd_kafka_topic_info_destroy (rd_kafka_topic_info_t *ti);
-
-int rd_kafka_topic_match (rd_kafka_t *rk, const char *pattern,
- const char *topic);
-
-int rd_kafka_toppar_leader_update (rd_kafka_toppar_t *rktp,
- int32_t leader_id, rd_kafka_broker_t *rkb);
-
-rd_kafka_resp_err_t
-rd_kafka_topics_leader_query_sync (rd_kafka_t *rk, int all_topics,
- const rd_list_t *topics, int timeout_ms);
-void rd_kafka_topic_leader_query0 (rd_kafka_t *rk, rd_kafka_itopic_t *rkt,
- int do_rk_lock);
-#define rd_kafka_topic_leader_query(rk,rkt) \
- rd_kafka_topic_leader_query0(rk,rkt,1/*lock*/)
-
-#define rd_kafka_topic_fast_leader_query(rk) \
- rd_kafka_metadata_fast_leader_query(rk)
-
-void rd_kafka_local_topics_to_list (rd_kafka_t *rk, rd_list_t *topics);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_transport.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_transport.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_transport.c
deleted file mode 100644
index 3a5f93c..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_transport.c
+++ /dev/null
@@ -1,1523 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2015, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-#ifdef _MSC_VER
-#pragma comment(lib, "ws2_32.lib")
-#endif
-
-#define __need_IOV_MAX
-
-#define _DARWIN_C_SOURCE /* MSG_DONTWAIT */
-
-#include "rdkafka_int.h"
-#include "rdaddr.h"
-#include "rdkafka_transport.h"
-#include "rdkafka_transport_int.h"
-#include "rdkafka_broker.h"
-
-#include <errno.h>
-
-#if WITH_VALGRIND
-/* OpenSSL relies on uninitialized memory, which Valgrind will whine about.
- * We use in-code Valgrind macros to suppress those warnings. */
-#include <valgrind/memcheck.h>
-#else
-#define VALGRIND_MAKE_MEM_DEFINED(A,B)
-#endif
-
-
-#ifdef _MSC_VER
-#define socket_errno WSAGetLastError()
-#else
-#include <sys/socket.h>
-#define socket_errno errno
-#define SOCKET_ERROR -1
-#endif
-
-/* AIX doesn't have MSG_DONTWAIT */
-#ifndef MSG_DONTWAIT
-# define MSG_DONTWAIT MSG_NONBLOCK
-#endif
-
-
-#if WITH_SSL
-static mtx_t *rd_kafka_ssl_locks;
-static int rd_kafka_ssl_locks_cnt;
-#endif
-
-
-
-/**
- * Low-level socket close
- */
-static void rd_kafka_transport_close0 (rd_kafka_t *rk, int s) {
- if (rk->rk_conf.closesocket_cb)
- rk->rk_conf.closesocket_cb(s, rk->rk_conf.opaque);
- else {
-#ifndef _MSC_VER
- close(s);
-#else
- closesocket(s);
-#endif
- }
-
-}
-
-/**
- * Close and destroy a transport handle
- */
-void rd_kafka_transport_close (rd_kafka_transport_t *rktrans) {
-#if WITH_SSL
- if (rktrans->rktrans_ssl) {
- SSL_shutdown(rktrans->rktrans_ssl);
- SSL_free(rktrans->rktrans_ssl);
- }
-#endif
-
- rd_kafka_sasl_close(rktrans);
-
- if (rktrans->rktrans_recv_buf)
- rd_kafka_buf_destroy(rktrans->rktrans_recv_buf);
-
- if (rktrans->rktrans_s != -1)
- rd_kafka_transport_close0(rktrans->rktrans_rkb->rkb_rk,
- rktrans->rktrans_s);
-
- rd_free(rktrans);
-}
-
-
-static const char *socket_strerror(int err) {
-#ifdef _MSC_VER
- static RD_TLS char buf[256];
- rd_strerror_w32(err, buf, sizeof(buf));
- return buf;
-#else
- return rd_strerror(err);
-#endif
-}
-
-
-
-
-#ifndef _MSC_VER
-/**
- * @brief sendmsg() abstraction, converting a list of segments to iovecs.
- * @remark should only be called if the number of segments is > 1.
- */
-ssize_t rd_kafka_transport_socket_sendmsg (rd_kafka_transport_t *rktrans,
- rd_slice_t *slice,
- char *errstr, size_t errstr_size) {
- struct iovec iov[IOV_MAX];
- struct msghdr msg = { .msg_iov = iov };
- size_t iovlen;
- ssize_t r;
-
- rd_slice_get_iov(slice, msg.msg_iov, &iovlen, IOV_MAX,
- /* FIXME: Measure the effects of this */
- rktrans->rktrans_sndbuf_size);
- msg.msg_iovlen = (typeof(msg.msg_iovlen))iovlen;
-
-#ifdef sun
- /* See recvmsg() comment. Setting it here to be safe. */
- socket_errno = EAGAIN;
-#endif
-
- r = sendmsg(rktrans->rktrans_s, &msg, MSG_DONTWAIT
-#ifdef MSG_NOSIGNAL
- | MSG_NOSIGNAL
-#endif
- );
-
- if (r == -1) {
- if (socket_errno == EAGAIN)
- return 0;
- rd_snprintf(errstr, errstr_size, "%s", rd_strerror(errno));
- }
-
- /* Update buffer read position */
- rd_slice_read(slice, NULL, (size_t)r);
-
- return r;
-}
-#endif
-
-
-/**
- * @brief Plain send() abstraction
- */
-static ssize_t
-rd_kafka_transport_socket_send0 (rd_kafka_transport_t *rktrans,
- rd_slice_t *slice,
- char *errstr, size_t errstr_size) {
- ssize_t sum = 0;
- const void *p;
- size_t rlen;
-
- while ((rlen = rd_slice_peeker(slice, &p))) {
- ssize_t r;
-
- r = send(rktrans->rktrans_s, p,
-#ifdef _MSC_VER
- (int)rlen, (int)0
-#else
- rlen, 0
-#endif
- );
-
-#ifdef _MSC_VER
- if (unlikely(r == SOCKET_ERROR)) {
- if (sum > 0 || WSAGetLastError() == WSAEWOULDBLOCK)
- return sum;
- else {
- rd_snprintf(errstr, errstr_size, "%s",
- socket_strerror(WSAGetLastError()));
- return -1;
- }
- }
-#else
- if (unlikely(r <= 0)) {
- if (r == 0 || errno == EAGAIN)
- return 0;
- rd_snprintf(errstr, errstr_size, "%s",
- socket_strerror(socket_errno));
- return -1;
- }
-#endif
-
- /* Update buffer read position */
- rd_slice_read(slice, NULL, (size_t)r);
-
- sum += r;
-
- /* FIXME: remove this and try again immediately and let
- * the next write() call fail instead? */
- if ((size_t)r < rlen)
- break;
- }
-
- return sum;
-}
-
-
-static ssize_t
-rd_kafka_transport_socket_send (rd_kafka_transport_t *rktrans,
- rd_slice_t *slice,
- char *errstr, size_t errstr_size) {
-#ifndef _MSC_VER
- /* FIXME: Use sendmsg() with iovecs if there's more than one segment
- * remaining, otherwise (or if platform does not have sendmsg)
- * use plain send(). */
- return rd_kafka_transport_socket_sendmsg(rktrans, slice,
- errstr, errstr_size);
-#endif
- return rd_kafka_transport_socket_send0(rktrans, slice,
- errstr, errstr_size);
-}
-
-
-
-#ifndef _MSC_VER
-/**
- * @brief recvmsg() abstraction, converting a list of segments to iovecs.
- * @remark should only be called if the number of segments is > 1.
- */
-static ssize_t
-rd_kafka_transport_socket_recvmsg (rd_kafka_transport_t *rktrans,
- rd_buf_t *rbuf,
- char *errstr, size_t errstr_size) {
- ssize_t r;
- struct iovec iov[IOV_MAX];
- struct msghdr msg = { .msg_iov = iov };
- size_t iovlen;
-
- rd_buf_get_write_iov(rbuf, msg.msg_iov, &iovlen, IOV_MAX,
- /* FIXME: Measure the effects of this */
- rktrans->rktrans_rcvbuf_size);
- msg.msg_iovlen = (typeof(msg.msg_iovlen))iovlen;
-
-#ifdef sun
- /* SunOS doesn't seem to set errno when recvmsg() fails
- * due to no data and MSG_DONTWAIT is set. */
- socket_errno = EAGAIN;
-#endif
- r = recvmsg(rktrans->rktrans_s, &msg, MSG_DONTWAIT);
- if (unlikely(r <= 0)) {
- if (r == -1 && socket_errno == EAGAIN)
- return 0;
- else if (r == 0) {
- /* Receive 0 after POLLIN event means
- * connection closed. */
- rd_snprintf(errstr, errstr_size, "Disconnected");
- return -1;
- } else if (r == -1) {
- rd_snprintf(errstr, errstr_size, "%s",
- rd_strerror(errno));
- return -1;
- }
- }
-
- /* Update buffer write position */
- rd_buf_write(rbuf, NULL, (size_t)r);
-
- return r;
-}
-#endif
-
-
-/**
- * @brief Plain recv()
- */
-static ssize_t
-rd_kafka_transport_socket_recv0 (rd_kafka_transport_t *rktrans,
- rd_buf_t *rbuf,
- char *errstr, size_t errstr_size) {
- ssize_t sum = 0;
- void *p;
- size_t len;
-
- while ((len = rd_buf_get_writable(rbuf, &p))) {
- ssize_t r;
-
- r = recv(rktrans->rktrans_s, p,
-#ifdef _MSC_VER
- (int)
-#endif
- len,
- 0);
-
-#ifdef _MSC_VER
- if (unlikely(r == SOCKET_ERROR)) {
- if (WSAGetLastError() == WSAEWOULDBLOCK)
- return sum;
- rd_snprintf(errstr, errstr_size, "%s",
- socket_strerror(WSAGetLastError()));
- return -1;
- }
-#else
- if (unlikely(r <= 0)) {
- if (r == -1 && socket_errno == EAGAIN)
- return 0;
- else if (r == 0) {
- /* Receive 0 after POLLIN event means
- * connection closed. */
- rd_snprintf(errstr, errstr_size,
- "Disconnected");
- return -1;
- } else if (r == -1) {
- rd_snprintf(errstr, errstr_size, "%s",
- rd_strerror(errno));
- return -1;
- }
- }
-#endif
-
- /* Update buffer write position */
- rd_buf_write(rbuf, NULL, (size_t)r);
-
- sum += r;
-
- /* FIXME: remove this and try again immediately and let
- * the next recv() call fail instead? */
- if ((size_t)r < len)
- break;
- }
- return sum;
-}
-
-
-static ssize_t
-rd_kafka_transport_socket_recv (rd_kafka_transport_t *rktrans,
- rd_buf_t *buf,
- char *errstr, size_t errstr_size) {
-#ifndef _MSC_VER
- /* FIXME: Use recvmsg() with iovecs if there's more than one segment
- * remaining, otherwise (or if platform does not have sendmsg)
- * use plain send(). */
- return rd_kafka_transport_socket_recvmsg(rktrans, buf,
- errstr, errstr_size);
-#endif
- return rd_kafka_transport_socket_recv0(rktrans, buf,
- errstr, errstr_size);
-}
-
-
-
-
-
-/**
- * CONNECT state is failed (errstr!=NULL) or done (TCP is up, SSL is working..).
- * From this state we either hand control back to the broker code,
- * or if authentication is configured we ente the AUTH state.
- */
-void rd_kafka_transport_connect_done (rd_kafka_transport_t *rktrans,
- char *errstr) {
- rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
-
- rd_kafka_broker_connect_done(rkb, errstr);
-}
-
-
-
-#if WITH_SSL
-
-
-/**
- * Serves the entire OpenSSL error queue and logs each error.
- * The last error is not logged but returned in 'errstr'.
- *
- * If 'rkb' is non-NULL broker-specific logging will be used,
- * else it will fall back on global 'rk' debugging.
- */
-static char *rd_kafka_ssl_error (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
- char *errstr, size_t errstr_size) {
- unsigned long l;
- const char *file, *data;
- int line, flags;
- int cnt = 0;
-
- while ((l = ERR_get_error_line_data(&file, &line, &data, &flags)) != 0) {
- char buf[256];
-
- if (cnt++ > 0) {
- /* Log last message */
- if (rkb)
- rd_rkb_log(rkb, LOG_ERR, "SSL", "%s", errstr);
- else
- rd_kafka_log(rk, LOG_ERR, "SSL", "%s", errstr);
- }
-
- ERR_error_string_n(l, buf, sizeof(buf));
-
- rd_snprintf(errstr, errstr_size, "%s:%d: %s: %s",
- file, line, buf, (flags & ERR_TXT_STRING) ? data : "");
-
- }
-
- if (cnt == 0)
- rd_snprintf(errstr, errstr_size, "No error");
-
- return errstr;
-}
-
-
-static void rd_kafka_transport_ssl_lock_cb (int mode, int i,
- const char *file, int line) {
- if (mode & CRYPTO_LOCK)
- mtx_lock(&rd_kafka_ssl_locks[i]);
- else
- mtx_unlock(&rd_kafka_ssl_locks[i]);
-}
-
-static unsigned long rd_kafka_transport_ssl_threadid_cb (void) {
-#ifdef _MSC_VER
- /* Windows makes a distinction between thread handle
- * and thread id, which means we can't use the
- * thrd_current() API that returns the handle. */
- return (unsigned long)GetCurrentThreadId();
-#else
- return (unsigned long)(intptr_t)thrd_current();
-#endif
-}
-
-
-/**
- * Global OpenSSL cleanup.
- */
-void rd_kafka_transport_ssl_term (void) {
- int i;
-
- CRYPTO_set_id_callback(NULL);
- CRYPTO_set_locking_callback(NULL);
- CRYPTO_cleanup_all_ex_data();
-
- for (i = 0 ; i < rd_kafka_ssl_locks_cnt ; i++)
- mtx_destroy(&rd_kafka_ssl_locks[i]);
-
- rd_free(rd_kafka_ssl_locks);
-
-}
-
-
-/**
- * Global OpenSSL init.
- */
-void rd_kafka_transport_ssl_init (void) {
- int i;
-
- rd_kafka_ssl_locks_cnt = CRYPTO_num_locks();
- rd_kafka_ssl_locks = rd_malloc(rd_kafka_ssl_locks_cnt *
- sizeof(*rd_kafka_ssl_locks));
- for (i = 0 ; i < rd_kafka_ssl_locks_cnt ; i++)
- mtx_init(&rd_kafka_ssl_locks[i], mtx_plain);
-
- CRYPTO_set_id_callback(rd_kafka_transport_ssl_threadid_cb);
- CRYPTO_set_locking_callback(rd_kafka_transport_ssl_lock_cb);
-
- SSL_load_error_strings();
- SSL_library_init();
- OpenSSL_add_all_algorithms();
-}
-
-
-/**
- * Set transport IO event polling based on SSL error.
- *
- * Returns -1 on permanent errors.
- *
- * Locality: broker thread
- */
-static RD_INLINE int
-rd_kafka_transport_ssl_io_update (rd_kafka_transport_t *rktrans, int ret,
- char *errstr, size_t errstr_size) {
- int serr = SSL_get_error(rktrans->rktrans_ssl, ret);
- int serr2;
-
- switch (serr)
- {
- case SSL_ERROR_WANT_READ:
- rd_kafka_transport_poll_set(rktrans, POLLIN);
- break;
-
- case SSL_ERROR_WANT_WRITE:
- case SSL_ERROR_WANT_CONNECT:
- rd_kafka_transport_poll_set(rktrans, POLLOUT);
- break;
-
- case SSL_ERROR_SYSCALL:
- if (!(serr2 = SSL_get_error(rktrans->rktrans_ssl, ret))) {
- if (ret == 0)
- errno = ECONNRESET;
- rd_snprintf(errstr, errstr_size,
- "SSL syscall error: %s", rd_strerror(errno));
- } else
- rd_snprintf(errstr, errstr_size,
- "SSL syscall error number: %d: %s", serr2,
- rd_strerror(errno));
- return -1;
-
- case SSL_ERROR_ZERO_RETURN:
- rd_snprintf(errstr, errstr_size, "Disconnected");
- return -1;
-
- default:
- rd_kafka_ssl_error(NULL, rktrans->rktrans_rkb,
- errstr, errstr_size);
- return -1;
- }
-
- return 0;
-}
-
-static ssize_t
-rd_kafka_transport_ssl_send (rd_kafka_transport_t *rktrans,
- rd_slice_t *slice,
- char *errstr, size_t errstr_size) {
- ssize_t sum = 0;
- const void *p;
- size_t rlen;
-
- while ((rlen = rd_slice_peeker(slice, &p))) {
- int r;
-
- r = SSL_write(rktrans->rktrans_ssl, p, (int)rlen);
-
- if (unlikely(r <= 0)) {
- if (rd_kafka_transport_ssl_io_update(rktrans, r,
- errstr,
- errstr_size) == -1)
- return -1;
- else
- return sum;
- }
-
- /* Update buffer read position */
- rd_slice_read(slice, NULL, (size_t)r);
-
- sum += r;
- /* FIXME: remove this and try again immediately and let
- * the next SSL_write() call fail instead? */
- if ((size_t)r < rlen)
- break;
-
- }
- return sum;
-}
-
-static ssize_t
-rd_kafka_transport_ssl_recv (rd_kafka_transport_t *rktrans,
- rd_buf_t *rbuf, char *errstr, size_t errstr_size) {
- ssize_t sum = 0;
- void *p;
- size_t len;
-
- while ((len = rd_buf_get_writable(rbuf, &p))) {
- int r;
-
- r = SSL_read(rktrans->rktrans_ssl, p, (int)len);
-
- if (unlikely(r <= 0)) {
- if (rd_kafka_transport_ssl_io_update(rktrans, r,
- errstr,
- errstr_size) == -1)
- return -1;
- else
- return sum;
- }
-
- VALGRIND_MAKE_MEM_DEFINED(p, r);
-
- /* Update buffer write position */
- rd_buf_write(rbuf, NULL, (size_t)r);
-
- sum += r;
-
- /* FIXME: remove this and try again immediately and let
- * the next SSL_read() call fail instead? */
- if ((size_t)r < len)
- break;
-
- }
- return sum;
-
-}
-
-
-/**
- * OpenSSL password query callback
- *
- * Locality: application thread
- */
-static int rd_kafka_transport_ssl_passwd_cb (char *buf, int size, int rwflag,
- void *userdata) {
- rd_kafka_t *rk = userdata;
- int pwlen;
-
- rd_kafka_dbg(rk, SECURITY, "SSLPASSWD",
- "Private key file \"%s\" requires password",
- rk->rk_conf.ssl.key_location);
-
- if (!rk->rk_conf.ssl.key_password) {
- rd_kafka_log(rk, LOG_WARNING, "SSLPASSWD",
- "Private key file \"%s\" requires password but "
- "no password configured (ssl.key.password)",
- rk->rk_conf.ssl.key_location);
- return -1;
- }
-
-
- pwlen = (int) strlen(rk->rk_conf.ssl.key_password);
- memcpy(buf, rk->rk_conf.ssl.key_password, RD_MIN(pwlen, size));
-
- return pwlen;
-}
-
-/**
- * Set up SSL for a newly connected connection
- *
- * Returns -1 on failure, else 0.
- */
-static int rd_kafka_transport_ssl_connect (rd_kafka_broker_t *rkb,
- rd_kafka_transport_t *rktrans,
- char *errstr, size_t errstr_size) {
- int r;
- char name[RD_KAFKA_NODENAME_SIZE];
- char *t;
-
- rktrans->rktrans_ssl = SSL_new(rkb->rkb_rk->rk_conf.ssl.ctx);
- if (!rktrans->rktrans_ssl)
- goto fail;
-
- if (!SSL_set_fd(rktrans->rktrans_ssl, rktrans->rktrans_s))
- goto fail;
-
-#if (OPENSSL_VERSION_NUMBER >= 0x0090806fL) && !defined(OPENSSL_NO_TLSEXT)
- /* If non-numerical hostname, send it for SNI */
- rd_snprintf(name, sizeof(name), "%s", rkb->rkb_nodename);
- if ((t = strrchr(name, ':')))
- *t = '\0';
- if (!(/*ipv6*/(strchr(name, ':') &&
- strspn(name, "0123456789abcdefABCDEF:.[]%") == strlen(name)) ||
- /*ipv4*/strspn(name, "0123456789.") == strlen(name)) &&
- !SSL_set_tlsext_host_name(rktrans->rktrans_ssl, name))
- goto fail;
-#endif
-
- r = SSL_connect(rktrans->rktrans_ssl);
- if (r == 1) {
- /* Connected, highly unlikely since this is a
- * non-blocking operation. */
- rd_kafka_transport_connect_done(rktrans, NULL);
- return 0;
- }
-
-
- if (rd_kafka_transport_ssl_io_update(rktrans, r,
- errstr, errstr_size) == -1)
- return -1;
-
- return 0;
-
- fail:
- rd_kafka_ssl_error(NULL, rkb, errstr, errstr_size);
- return -1;
-}
-
-
-static RD_UNUSED int
-rd_kafka_transport_ssl_io_event (rd_kafka_transport_t *rktrans, int events) {
- int r;
- char errstr[512];
-
- if (events & POLLOUT) {
- r = SSL_write(rktrans->rktrans_ssl, NULL, 0);
- if (rd_kafka_transport_ssl_io_update(rktrans, r,
- errstr,
- sizeof(errstr)) == -1)
- goto fail;
- }
-
- return 0;
-
- fail:
- /* Permanent error */
- rd_kafka_broker_fail(rktrans->rktrans_rkb, LOG_ERR,
- RD_KAFKA_RESP_ERR__TRANSPORT,
- "%s", errstr);
- return -1;
-}
-
-
-/**
- * Verify SSL handshake was valid.
- */
-static int rd_kafka_transport_ssl_verify (rd_kafka_transport_t *rktrans) {
- long int rl;
- X509 *cert;
-
- cert = SSL_get_peer_certificate(rktrans->rktrans_ssl);
- X509_free(cert);
- if (!cert) {
- rd_kafka_broker_fail(rktrans->rktrans_rkb, LOG_ERR,
- RD_KAFKA_RESP_ERR__SSL,
- "Broker did not provide a certificate");
- return -1;
- }
-
- if ((rl = SSL_get_verify_result(rktrans->rktrans_ssl)) != X509_V_OK) {
- rd_kafka_broker_fail(rktrans->rktrans_rkb, LOG_ERR,
- RD_KAFKA_RESP_ERR__SSL,
- "Failed to verify broker certificate: %s",
- X509_verify_cert_error_string(rl));
- return -1;
- }
-
- rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SSLVERIFY",
- "Broker SSL certificate verified");
- return 0;
-}
-
-/**
- * SSL handshake handling.
- * Call repeatedly (based on IO events) until handshake is done.
- *
- * Returns -1 on error, 0 if handshake is still in progress, or 1 on completion.
- */
-static int rd_kafka_transport_ssl_handshake (rd_kafka_transport_t *rktrans) {
- rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
- char errstr[512];
- int r;
-
- r = SSL_do_handshake(rktrans->rktrans_ssl);
- if (r == 1) {
- /* SSL handshake done. Verify. */
- if (rd_kafka_transport_ssl_verify(rktrans) == -1)
- return -1;
-
- rd_kafka_transport_connect_done(rktrans, NULL);
- return 1;
-
- } else if (rd_kafka_transport_ssl_io_update(rktrans, r,
- errstr,
- sizeof(errstr)) == -1) {
- rd_kafka_broker_fail(rkb, LOG_ERR, RD_KAFKA_RESP_ERR__SSL,
- "SSL handshake failed: %s%s", errstr,
- strstr(errstr, "unexpected message") ?
- ": client authentication might be "
- "required (see broker log)" : "");
- return -1;
- }
-
- return 0;
-}
-
-
-/**
- * Once per rd_kafka_t handle cleanup of OpenSSL
- *
- * Locality: any thread
- *
- * NOTE: rd_kafka_wrlock() MUST be held
- */
-void rd_kafka_transport_ssl_ctx_term (rd_kafka_t *rk) {
- SSL_CTX_free(rk->rk_conf.ssl.ctx);
- rk->rk_conf.ssl.ctx = NULL;
-}
-
-/**
- * Once per rd_kafka_t handle initialization of OpenSSL
- *
- * Locality: application thread
- *
- * NOTE: rd_kafka_wrlock() MUST be held
- */
-int rd_kafka_transport_ssl_ctx_init (rd_kafka_t *rk,
- char *errstr, size_t errstr_size) {
- int r;
- SSL_CTX *ctx;
-
- if (errstr_size > 0)
- errstr[0] = '\0';
-
- ctx = SSL_CTX_new(SSLv23_client_method());
- if (!ctx) {
- rd_snprintf(errstr, errstr_size,
- "SSLv23_client_method() failed: ");
- goto fail;
- }
-
-#ifdef SSL_OP_NO_SSLv3
- /* Disable SSLv3 (unsafe) */
- SSL_CTX_set_options(ctx, SSL_OP_NO_SSLv3);
-#endif
-
- /* Key file password callback */
- SSL_CTX_set_default_passwd_cb(ctx, rd_kafka_transport_ssl_passwd_cb);
- SSL_CTX_set_default_passwd_cb_userdata(ctx, rk);
-
- /* Ciphers */
- if (rk->rk_conf.ssl.cipher_suites) {
- rd_kafka_dbg(rk, SECURITY, "SSL",
- "Setting cipher list: %s",
- rk->rk_conf.ssl.cipher_suites);
- if (!SSL_CTX_set_cipher_list(ctx,
- rk->rk_conf.ssl.cipher_suites)) {
- /* Set a string that will prefix the
- * the OpenSSL error message (which is lousy)
- * to make it more meaningful. */
- rd_snprintf(errstr, errstr_size,
- "ssl.cipher.suites failed: ");
- goto fail;
- }
- }
-
-
- if (rk->rk_conf.ssl.ca_location) {
- /* CA certificate location, either file or directory. */
- int is_dir = rd_kafka_path_is_dir(rk->rk_conf.ssl.ca_location);
-
- rd_kafka_dbg(rk, SECURITY, "SSL",
- "Loading CA certificate(s) from %s %s",
- is_dir ? "directory":"file",
- rk->rk_conf.ssl.ca_location);
-
- r = SSL_CTX_load_verify_locations(ctx,
- !is_dir ?
- rk->rk_conf.ssl.
- ca_location : NULL,
- is_dir ?
- rk->rk_conf.ssl.
- ca_location : NULL);
-
- if (r != 1) {
- rd_snprintf(errstr, errstr_size,
- "ssl.ca.location failed: ");
- goto fail;
- }
- } else {
- /* Use default CA certificate paths: ignore failures. */
- r = SSL_CTX_set_default_verify_paths(ctx);
- if (r != 1)
- rd_kafka_dbg(rk, SECURITY, "SSL",
- "SSL_CTX_set_default_verify_paths() "
- "failed: ignoring");
- }
-
- if (rk->rk_conf.ssl.crl_location) {
- rd_kafka_dbg(rk, SECURITY, "SSL",
- "Loading CRL from file %s",
- rk->rk_conf.ssl.crl_location);
-
- r = SSL_CTX_load_verify_locations(ctx,
- rk->rk_conf.ssl.crl_location,
- NULL);
-
- if (r != 1) {
- rd_snprintf(errstr, errstr_size,
- "ssl.crl.location failed: ");
- goto fail;
- }
-
-
- rd_kafka_dbg(rk, SECURITY, "SSL",
- "Enabling CRL checks");
-
- X509_STORE_set_flags(SSL_CTX_get_cert_store(ctx),
- X509_V_FLAG_CRL_CHECK);
- }
-
- if (rk->rk_conf.ssl.cert_location) {
- rd_kafka_dbg(rk, SECURITY, "SSL",
- "Loading certificate from file %s",
- rk->rk_conf.ssl.cert_location);
-
- r = SSL_CTX_use_certificate_chain_file(ctx,
- rk->rk_conf.ssl.cert_location);
-
- if (r != 1) {
- rd_snprintf(errstr, errstr_size,
- "ssl.certificate.location failed: ");
- goto fail;
- }
- }
-
- if (rk->rk_conf.ssl.key_location) {
- rd_kafka_dbg(rk, SECURITY, "SSL",
- "Loading private key file from %s",
- rk->rk_conf.ssl.key_location);
-
- r = SSL_CTX_use_PrivateKey_file(ctx,
- rk->rk_conf.ssl.key_location,
- SSL_FILETYPE_PEM);
- if (r != 1) {
- rd_snprintf(errstr, errstr_size,
- "ssl.key.location failed: ");
- goto fail;
- }
- }
-
-
- SSL_CTX_set_mode(ctx, SSL_MODE_ENABLE_PARTIAL_WRITE);
-
- rk->rk_conf.ssl.ctx = ctx;
- return 0;
-
- fail:
- r = (int)strlen(errstr);
- rd_kafka_ssl_error(rk, NULL, errstr+r,
- (int)errstr_size > r ? (int)errstr_size - r : 0);
- SSL_CTX_free(ctx);
-
- return -1;
-}
-
-
-#endif /* WITH_SSL */
-
-
-ssize_t
-rd_kafka_transport_send (rd_kafka_transport_t *rktrans,
- rd_slice_t *slice, char *errstr, size_t errstr_size) {
-
-#if WITH_SSL
- if (rktrans->rktrans_ssl)
- return rd_kafka_transport_ssl_send(rktrans, slice,
- errstr, errstr_size);
- else
-#endif
- return rd_kafka_transport_socket_send(rktrans, slice,
- errstr, errstr_size);
-}
-
-
-ssize_t
-rd_kafka_transport_recv (rd_kafka_transport_t *rktrans, rd_buf_t *rbuf,
- char *errstr, size_t errstr_size) {
-#if WITH_SSL
- if (rktrans->rktrans_ssl)
- return rd_kafka_transport_ssl_recv(rktrans, rbuf,
- errstr, errstr_size);
- else
-#endif
- return rd_kafka_transport_socket_recv(rktrans, rbuf,
- errstr, errstr_size);
-}
-
-
-
-
-/**
- * Length framed receive handling.
- * Currently only supports a the following framing:
- * [int32_t:big_endian_length_of_payload][payload]
- *
- * To be used on POLLIN event, will return:
- * -1: on fatal error (errstr will be updated, *rkbufp remains unset)
- * 0: still waiting for data (*rkbufp remains unset)
- * 1: data complete, (buffer returned in *rkbufp)
- */
-int rd_kafka_transport_framed_recv (rd_kafka_transport_t *rktrans,
- rd_kafka_buf_t **rkbufp,
- char *errstr, size_t errstr_size) {
- rd_kafka_buf_t *rkbuf = rktrans->rktrans_recv_buf;
- ssize_t r;
- const int log_decode_errors = LOG_ERR;
-
- /* States:
- * !rktrans_recv_buf: initial state; set up buf to receive header.
- * rkbuf_totlen == 0: awaiting header
- * rkbuf_totlen > 0: awaiting payload
- */
-
- if (!rkbuf) {
- rkbuf = rd_kafka_buf_new(1, 4/*length field's length*/);
- /* Set up buffer reader for the length field */
- rd_buf_write_ensure(&rkbuf->rkbuf_buf, 4, 4);
- rktrans->rktrans_recv_buf = rkbuf;
- }
-
-
- r = rd_kafka_transport_recv(rktrans, &rkbuf->rkbuf_buf,
- errstr, errstr_size);
- if (r == 0)
- return 0;
- else if (r == -1)
- return -1;
-
- if (rkbuf->rkbuf_totlen == 0) {
- /* Frame length not known yet. */
- int32_t frame_len;
-
- if (rd_buf_write_pos(&rkbuf->rkbuf_buf) < sizeof(frame_len)) {
- /* Wait for entire frame header. */
- return 0;
- }
-
- /* Initialize reader */
- rd_slice_init(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf, 0, 4);
-
- /* Reader header: payload length */
- rd_kafka_buf_read_i32(rkbuf, &frame_len);
-
- if (frame_len < 0 ||
- frame_len > rktrans->rktrans_rkb->
- rkb_rk->rk_conf.recv_max_msg_size) {
- rd_snprintf(errstr, errstr_size,
- "Invalid frame size %"PRId32, frame_len);
- return -1;
- }
-
- rkbuf->rkbuf_totlen = 4 + frame_len;
- if (frame_len == 0) {
- /* Payload is empty, we're done. */
- rktrans->rktrans_recv_buf = NULL;
- *rkbufp = rkbuf;
- return 1;
- }
-
- /* Allocate memory to hold entire frame payload in contigious
- * memory. */
- rd_buf_write_ensure_contig(&rkbuf->rkbuf_buf, frame_len);
-
- /* Try reading directly, there is probably more data available*/
- return rd_kafka_transport_framed_recv(rktrans, rkbufp,
- errstr, errstr_size);
- }
-
- if (rd_buf_write_pos(&rkbuf->rkbuf_buf) == rkbuf->rkbuf_totlen) {
- /* Payload is complete. */
- rktrans->rktrans_recv_buf = NULL;
- *rkbufp = rkbuf;
- return 1;
- }
-
- /* Wait for more data */
- return 0;
-
- err_parse:
- if (rkbuf)
- rd_kafka_buf_destroy(rkbuf);
- rd_snprintf(errstr, errstr_size, "Frame header parsing failed: %s",
- rd_kafka_err2str(rkbuf->rkbuf_err));
- return -1;
-}
-
-
-/**
- * TCP connection established.
- * Set up socket options, SSL, etc.
- *
- * Locality: broker thread
- */
-static void rd_kafka_transport_connected (rd_kafka_transport_t *rktrans) {
- rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
- unsigned int slen;
-
- rd_rkb_dbg(rkb, BROKER, "CONNECT",
- "Connected to %s",
- rd_sockaddr2str(rkb->rkb_addr_last,
- RD_SOCKADDR2STR_F_PORT |
- RD_SOCKADDR2STR_F_FAMILY));
-
- /* Set socket send & receive buffer sizes if configuerd */
- if (rkb->rkb_rk->rk_conf.socket_sndbuf_size != 0) {
- if (setsockopt(rktrans->rktrans_s, SOL_SOCKET, SO_SNDBUF,
- (void *)&rkb->rkb_rk->rk_conf.socket_sndbuf_size,
- sizeof(rkb->rkb_rk->rk_conf.
- socket_sndbuf_size)) == SOCKET_ERROR)
- rd_rkb_log(rkb, LOG_WARNING, "SNDBUF",
- "Failed to set socket send "
- "buffer size to %i: %s",
- rkb->rkb_rk->rk_conf.socket_sndbuf_size,
- socket_strerror(socket_errno));
- }
-
- if (rkb->rkb_rk->rk_conf.socket_rcvbuf_size != 0) {
- if (setsockopt(rktrans->rktrans_s, SOL_SOCKET, SO_RCVBUF,
- (void *)&rkb->rkb_rk->rk_conf.socket_rcvbuf_size,
- sizeof(rkb->rkb_rk->rk_conf.
- socket_rcvbuf_size)) == SOCKET_ERROR)
- rd_rkb_log(rkb, LOG_WARNING, "RCVBUF",
- "Failed to set socket receive "
- "buffer size to %i: %s",
- rkb->rkb_rk->rk_conf.socket_rcvbuf_size,
- socket_strerror(socket_errno));
- }
-
- /* Get send and receive buffer sizes to allow limiting
- * the total number of bytes passed with iovecs to sendmsg()
- * and recvmsg(). */
- slen = sizeof(rktrans->rktrans_rcvbuf_size);
- if (getsockopt(rktrans->rktrans_s, SOL_SOCKET, SO_RCVBUF,
- (void *)&rktrans->rktrans_rcvbuf_size,
- &slen) == SOCKET_ERROR) {
- rd_rkb_log(rkb, LOG_WARNING, "RCVBUF",
- "Failed to get socket receive "
- "buffer size: %s: assuming 1MB",
- socket_strerror(socket_errno));
- rktrans->rktrans_rcvbuf_size = 1024*1024;
- } else if (rktrans->rktrans_rcvbuf_size < 1024 * 64)
- rktrans->rktrans_rcvbuf_size = 1024*64; /* Use at least 64KB */
-
- slen = sizeof(rktrans->rktrans_sndbuf_size);
- if (getsockopt(rktrans->rktrans_s, SOL_SOCKET, SO_SNDBUF,
- (void *)&rktrans->rktrans_sndbuf_size,
- &slen) == SOCKET_ERROR) {
- rd_rkb_log(rkb, LOG_WARNING, "RCVBUF",
- "Failed to get socket send "
- "buffer size: %s: assuming 1MB",
- socket_strerror(socket_errno));
- rktrans->rktrans_sndbuf_size = 1024*1024;
- } else if (rktrans->rktrans_sndbuf_size < 1024 * 64)
- rktrans->rktrans_sndbuf_size = 1024*64; /* Use at least 64KB */
-
-
-#ifdef TCP_NODELAY
- if (rkb->rkb_rk->rk_conf.socket_nagle_disable) {
- int one = 1;
- if (setsockopt(rktrans->rktrans_s, IPPROTO_TCP, TCP_NODELAY,
- (void *)&one, sizeof(one)) == SOCKET_ERROR)
- rd_rkb_log(rkb, LOG_WARNING, "NAGLE",
- "Failed to disable Nagle (TCP_NODELAY) "
- "on socket %d: %s",
- socket_strerror(socket_errno));
- }
-#endif
-
-
-#if WITH_SSL
- if (rkb->rkb_proto == RD_KAFKA_PROTO_SSL ||
- rkb->rkb_proto == RD_KAFKA_PROTO_SASL_SSL) {
- char errstr[512];
-
- /* Set up SSL connection.
- * This is also an asynchronous operation so dont
- * propagate to broker_connect_done() just yet. */
- if (rd_kafka_transport_ssl_connect(rkb, rktrans,
- errstr,
- sizeof(errstr)) == -1) {
- rd_kafka_transport_connect_done(rktrans, errstr);
- return;
- }
- return;
- }
-#endif
-
- /* Propagate connect success */
- rd_kafka_transport_connect_done(rktrans, NULL);
-}
-
-
-
-/**
- * @brief the kernel SO_ERROR in \p errp for the given transport.
- * @returns 0 if getsockopt() was succesful (and \p and errp can be trusted),
- * else -1 in which case \p errp 's value is undefined.
- */
-static int rd_kafka_transport_get_socket_error (rd_kafka_transport_t *rktrans,
- int *errp) {
- socklen_t intlen = sizeof(*errp);
-
- if (getsockopt(rktrans->rktrans_s, SOL_SOCKET,
- SO_ERROR, (void *)errp, &intlen) == -1) {
- rd_rkb_dbg(rktrans->rktrans_rkb, BROKER, "SO_ERROR",
- "Failed to get socket error: %s",
- socket_strerror(socket_errno));
- return -1;
- }
-
- return 0;
-}
-
-
-/**
- * IO event handler.
- *
- * Locality: broker thread
- */
-static void rd_kafka_transport_io_event (rd_kafka_transport_t *rktrans,
- int events) {
- char errstr[512];
- int r;
- rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
-
- switch (rkb->rkb_state)
- {
- case RD_KAFKA_BROKER_STATE_CONNECT:
-#if WITH_SSL
- if (rktrans->rktrans_ssl) {
- /* Currently setting up SSL connection:
- * perform handshake. */
- rd_kafka_transport_ssl_handshake(rktrans);
- return;
- }
-#endif
-
- /* Asynchronous connect finished, read status. */
- if (!(events & (POLLOUT|POLLERR|POLLHUP)))
- return;
-
- if (rd_kafka_transport_get_socket_error(rktrans, &r) == -1) {
- rd_kafka_broker_fail(
- rkb, LOG_ERR, RD_KAFKA_RESP_ERR__TRANSPORT,
- "Connect to %s failed: "
- "unable to get status from "
- "socket %d: %s",
- rd_sockaddr2str(rkb->rkb_addr_last,
- RD_SOCKADDR2STR_F_PORT |
- RD_SOCKADDR2STR_F_FAMILY),
- rktrans->rktrans_s,
- rd_strerror(socket_errno));
- } else if (r != 0) {
- /* Connect failed */
- errno = r;
- rd_snprintf(errstr, sizeof(errstr),
- "Connect to %s failed: %s",
- rd_sockaddr2str(rkb->rkb_addr_last,
- RD_SOCKADDR2STR_F_PORT |
- RD_SOCKADDR2STR_F_FAMILY),
- rd_strerror(r));
-
- rd_kafka_transport_connect_done(rktrans, errstr);
- } else {
- /* Connect succeeded */
- rd_kafka_transport_connected(rktrans);
- }
- break;
-
- case RD_KAFKA_BROKER_STATE_AUTH:
- /* SASL handshake */
- if (rd_kafka_sasl_io_event(rktrans, events,
- errstr, sizeof(errstr)) == -1) {
- errno = EINVAL;
- rd_kafka_broker_fail(rkb, LOG_ERR,
- RD_KAFKA_RESP_ERR__AUTHENTICATION,
- "SASL authentication failure: %s",
- errstr);
- return;
- }
- break;
-
- case RD_KAFKA_BROKER_STATE_APIVERSION_QUERY:
- case RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE:
- case RD_KAFKA_BROKER_STATE_UP:
- case RD_KAFKA_BROKER_STATE_UPDATE:
-
- if (events & POLLIN) {
- while (rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP &&
- rd_kafka_recv(rkb) > 0)
- ;
- }
-
- if (events & POLLHUP) {
- rd_kafka_broker_fail(rkb,
- rkb->rkb_rk->rk_conf.
- log_connection_close ?
- LOG_NOTICE : LOG_DEBUG,
- RD_KAFKA_RESP_ERR__TRANSPORT,
- "Connection closed");
- return;
- }
-
- if (events & POLLOUT) {
- while (rd_kafka_send(rkb) > 0)
- ;
- }
- break;
-
- case RD_KAFKA_BROKER_STATE_INIT:
- case RD_KAFKA_BROKER_STATE_DOWN:
- rd_kafka_assert(rkb->rkb_rk, !*"bad state");
- }
-}
-
-
-/**
- * Poll and serve IOs
- *
- * Locality: broker thread
- */
-void rd_kafka_transport_io_serve (rd_kafka_transport_t *rktrans,
- int timeout_ms) {
- rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
- int events;
-
- if (rd_kafka_bufq_cnt(&rkb->rkb_waitresps) < rkb->rkb_max_inflight &&
- rd_kafka_bufq_cnt(&rkb->rkb_outbufs) > 0)
- rd_kafka_transport_poll_set(rkb->rkb_transport, POLLOUT);
-
- if ((events = rd_kafka_transport_poll(rktrans, timeout_ms)) <= 0)
- return;
-
- rd_kafka_transport_poll_clear(rktrans, POLLOUT);
-
- rd_kafka_transport_io_event(rktrans, events);
-}
-
-
-/**
- * Initiate asynchronous connection attempt.
- *
- * Locality: broker thread
- */
-rd_kafka_transport_t *rd_kafka_transport_connect (rd_kafka_broker_t *rkb,
- const rd_sockaddr_inx_t *sinx,
- char *errstr,
- size_t errstr_size) {
- rd_kafka_transport_t *rktrans;
- int s = -1;
- int on = 1;
- int r;
-
- rkb->rkb_addr_last = sinx;
-
- s = rkb->rkb_rk->rk_conf.socket_cb(sinx->in.sin_family,
- SOCK_STREAM, IPPROTO_TCP,
- rkb->rkb_rk->rk_conf.opaque);
- if (s == -1) {
- rd_snprintf(errstr, errstr_size, "Failed to create socket: %s",
- socket_strerror(socket_errno));
- return NULL;
- }
-
-
-#ifdef SO_NOSIGPIPE
- /* Disable SIGPIPE signalling for this socket on OSX */
- if (setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on)) == -1)
- rd_rkb_dbg(rkb, BROKER, "SOCKET",
- "Failed to set SO_NOSIGPIPE: %s",
- socket_strerror(socket_errno));
-#endif
-
- /* Enable TCP keep-alives, if configured. */
- if (rkb->rkb_rk->rk_conf.socket_keepalive) {
-#ifdef SO_KEEPALIVE
- if (setsockopt(s, SOL_SOCKET, SO_KEEPALIVE,
- (void *)&on, sizeof(on)) == SOCKET_ERROR)
- rd_rkb_dbg(rkb, BROKER, "SOCKET",
- "Failed to set SO_KEEPALIVE: %s",
- socket_strerror(socket_errno));
-#else
- rd_rkb_dbg(rkb, BROKER, "SOCKET",
- "System does not support "
- "socket.keepalive.enable (SO_KEEPALIVE)");
-#endif
- }
-
- /* Set the socket to non-blocking */
- if ((r = rd_fd_set_nonblocking(s))) {
- rd_snprintf(errstr, errstr_size,
- "Failed to set socket non-blocking: %s",
- socket_strerror(r));
- goto err;
- }
-
- rd_rkb_dbg(rkb, BROKER, "CONNECT", "Connecting to %s (%s) "
- "with socket %i",
- rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_FAMILY |
- RD_SOCKADDR2STR_F_PORT),
- rd_kafka_secproto_names[rkb->rkb_proto], s);
-
- /* Connect to broker */
- if (rkb->rkb_rk->rk_conf.connect_cb) {
- r = rkb->rkb_rk->rk_conf.connect_cb(
- s, (struct sockaddr *)sinx, RD_SOCKADDR_INX_LEN(sinx),
- rkb->rkb_name, rkb->rkb_rk->rk_conf.opaque);
- } else {
- if (connect(s, (struct sockaddr *)sinx,
- RD_SOCKADDR_INX_LEN(sinx)) == SOCKET_ERROR &&
- (socket_errno != EINPROGRESS
-#ifdef _MSC_VER
- && socket_errno != WSAEWOULDBLOCK
-#endif
- ))
- r = socket_errno;
- else
- r = 0;
- }
-
- if (r != 0) {
- rd_rkb_dbg(rkb, BROKER, "CONNECT",
- "couldn't connect to %s: %s (%i)",
- rd_sockaddr2str(sinx,
- RD_SOCKADDR2STR_F_PORT |
- RD_SOCKADDR2STR_F_FAMILY),
- socket_strerror(r), r);
- rd_snprintf(errstr, errstr_size,
- "Failed to connect to broker at %s: %s",
- rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_NICE),
- socket_strerror(r));
- goto err;
- }
-
- /* Create transport handle */
- rktrans = rd_calloc(1, sizeof(*rktrans));
- rktrans->rktrans_rkb = rkb;
- rktrans->rktrans_s = s;
- rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt++].fd = s;
- if (rkb->rkb_wakeup_fd[0] != -1) {
- rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt].events = POLLIN;
- rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt++].fd = rkb->rkb_wakeup_fd[0];
- }
-
-
- /* Poll writability to trigger on connection success/failure. */
- rd_kafka_transport_poll_set(rktrans, POLLOUT);
-
- return rktrans;
-
- err:
- if (s != -1)
- rd_kafka_transport_close0(rkb->rkb_rk, s);
-
- return NULL;
-}
-
-
-
-void rd_kafka_transport_poll_set(rd_kafka_transport_t *rktrans, int event) {
- rktrans->rktrans_pfd[0].events |= event;
-}
-
-void rd_kafka_transport_poll_clear(rd_kafka_transport_t *rktrans, int event) {
- rktrans->rktrans_pfd[0].events &= ~event;
-}
-
-
-int rd_kafka_transport_poll(rd_kafka_transport_t *rktrans, int tmout) {
- int r;
-#ifndef _MSC_VER
- r = poll(rktrans->rktrans_pfd, rktrans->rktrans_pfd_cnt, tmout);
- if (r <= 0)
- return r;
-#else
- r = WSAPoll(rktrans->rktrans_pfd, rktrans->rktrans_pfd_cnt, tmout);
- if (r == 0) {
- /* Workaround for broken WSAPoll() while connecting:
- * failed connection attempts are not indicated at all by WSAPoll()
- * so we need to check the socket error when Poll returns 0.
- * Issue #525 */
- r = ECONNRESET;
- if (unlikely(rktrans->rktrans_rkb->rkb_state ==
- RD_KAFKA_BROKER_STATE_CONNECT &&
- (rd_kafka_transport_get_socket_error(rktrans,
- &r) == -1 ||
- r != 0))) {
- char errstr[512];
- errno = r;
- rd_snprintf(errstr, sizeof(errstr),
- "Connect to %s failed: %s",
- rd_sockaddr2str(rktrans->rktrans_rkb->
- rkb_addr_last,
- RD_SOCKADDR2STR_F_PORT |
- RD_SOCKADDR2STR_F_FAMILY),
- socket_strerror(r));
- rd_kafka_transport_connect_done(rktrans, errstr);
- return -1;
- } else
- return 0;
- } else if (r == SOCKET_ERROR)
- return -1;
-#endif
- rd_atomic64_add(&rktrans->rktrans_rkb->rkb_c.wakeups, 1);
-
- if (rktrans->rktrans_pfd[1].revents & POLLIN) {
- /* Read wake-up fd data and throw away, just used for wake-ups*/
- char buf[512];
- if (rd_read((int)rktrans->rktrans_pfd[1].fd,
- buf, sizeof(buf)) == -1) {
- /* Ignore warning */
- }
- }
-
- return rktrans->rktrans_pfd[0].revents;
-}
-
-
-
-
-
-#if 0
-/**
- * Global cleanup.
- * This is dangerous and SHOULD NOT be called since it will rip
- * the rug from under the application if it uses any of this functionality
- * in its own code. This means we might leak some memory on exit.
- */
-void rd_kafka_transport_term (void) {
-#ifdef _MSC_VER
- (void)WSACleanup(); /* FIXME: dangerous */
-#endif
-}
-#endif
-
-void rd_kafka_transport_init(void) {
-#ifdef _MSC_VER
- WSADATA d;
- (void)WSAStartup(MAKEWORD(2, 2), &d);
-#endif
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_transport.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_transport.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_transport.h
deleted file mode 100644
index fcd2580..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_transport.h
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2015, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#pragma once
-
-#ifndef _MSC_VER
-#include <poll.h>
-#endif
-
-#include "rdbuf.h"
-#include "rdaddr.h"
-
-typedef struct rd_kafka_transport_s rd_kafka_transport_t;
-
-void rd_kafka_transport_io_serve (rd_kafka_transport_t *rktrans,
- int timeout_ms);
-
-ssize_t rd_kafka_transport_send (rd_kafka_transport_t *rktrans,
- rd_slice_t *slice,
- char *errstr, size_t errstr_size);
-ssize_t rd_kafka_transport_recv (rd_kafka_transport_t *rktrans,
- rd_buf_t *rbuf,
- char *errstr, size_t errstr_size);
-int rd_kafka_transport_framed_recv (rd_kafka_transport_t *rktrans,
- rd_kafka_buf_t **rkbufp,
- char *errstr, size_t errstr_size);
-struct rd_kafka_broker_s;
-rd_kafka_transport_t *rd_kafka_transport_connect(struct rd_kafka_broker_s *rkb, const rd_sockaddr_inx_t *sinx,
- char *errstr, size_t errstr_size);
-void rd_kafka_transport_connect_done (rd_kafka_transport_t *rktrans,
- char *errstr);
-
-void rd_kafka_transport_close(rd_kafka_transport_t *rktrans);
-void rd_kafka_transport_poll_set(rd_kafka_transport_t *rktrans, int event);
-void rd_kafka_transport_poll_clear(rd_kafka_transport_t *rktrans, int event);
-int rd_kafka_transport_poll(rd_kafka_transport_t *rktrans, int tmout);
-
-#if WITH_SSL
-void rd_kafka_transport_ssl_ctx_term (rd_kafka_t *rk);
-int rd_kafka_transport_ssl_ctx_init (rd_kafka_t *rk,
- char *errstr, size_t errstr_size);
-
-void rd_kafka_transport_ssl_term (void);
-void rd_kafka_transport_ssl_init (void);
-#endif
-void rd_kafka_transport_term (void);
-void rd_kafka_transport_init(void);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_transport_int.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_transport_int.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_transport_int.h
deleted file mode 100644
index 8ae79b4..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_transport_int.h
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2015, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-#pragma once
-
-/* This header file is to be used by .c files needing access to the
- * rd_kafka_transport_t struct internals. */
-
-#include "rdkafka_sasl.h"
-
-#if WITH_SSL
-#include <openssl/ssl.h>
-#include <openssl/err.h>
-#endif
-
-struct rd_kafka_transport_s {
- int rktrans_s;
-
- rd_kafka_broker_t *rktrans_rkb;
-
-#if WITH_SSL
- SSL *rktrans_ssl;
-#endif
-
- struct {
- void *state; /* SASL implementation
- * state handle */
-
- int complete; /* Auth was completed early
- * from the client's perspective
- * (but we might still have to
- * wait for server reply). */
-
- /* SASL framing buffers */
- struct msghdr msg;
- struct iovec iov[2];
-
- char *recv_buf;
- int recv_of; /* Received byte count */
- int recv_len; /* Expected receive length for
- * current frame. */
- } rktrans_sasl;
-
- rd_kafka_buf_t *rktrans_recv_buf; /* Used with framed_recvmsg */
-
- /* Two pollable fds:
- * - TCP socket
- * - wake-up fd
- */
-#ifndef _MSC_VER
- struct pollfd rktrans_pfd[2];
-#else
- WSAPOLLFD rktrans_pfd[2];
-#endif
- int rktrans_pfd_cnt;
-
- size_t rktrans_rcvbuf_size; /**< Socket receive buffer size */
- size_t rktrans_sndbuf_size; /**< Socket send buffer size */
-};
-
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdlist.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdlist.c b/thirdparty/librdkafka-0.11.1/src/rdlist.c
deleted file mode 100644
index 11cf14d..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdlist.c
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2015, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "rd.h"
-#include "rdlist.h"
-
-
-void rd_list_dump (const char *what, const rd_list_t *rl) {
- int i;
- printf("%s: (rd_list_t*)%p cnt %d, size %d, elems %p:\n",
- what, rl, rl->rl_cnt, rl->rl_size, rl->rl_elems);
- for (i = 0 ; i < rl->rl_cnt ; i++)
- printf(" #%d: %p at &%p\n", i,
- rl->rl_elems[i], &rl->rl_elems[i]);
-}
-
-void rd_list_grow (rd_list_t *rl, size_t size) {
- rd_assert(!(rl->rl_flags & RD_LIST_F_FIXED_SIZE));
- rl->rl_size += (int)size;
- if (unlikely(rl->rl_size == 0))
- return; /* avoid zero allocations */
- rl->rl_elems = rd_realloc(rl->rl_elems,
- sizeof(*rl->rl_elems) * rl->rl_size);
-}
-
-rd_list_t *
-rd_list_init (rd_list_t *rl, int initial_size, void (*free_cb) (void *)) {
- memset(rl, 0, sizeof(*rl));
-
- if (initial_size > 0)
- rd_list_grow(rl, initial_size);
-
- rl->rl_free_cb = free_cb;
-
- return rl;
-}
-
-rd_list_t *rd_list_new (int initial_size, void (*free_cb) (void *)) {
- rd_list_t *rl = malloc(sizeof(*rl));
- rd_list_init(rl, initial_size, free_cb);
- rl->rl_flags |= RD_LIST_F_ALLOCATED;
- return rl;
-}
-
-void rd_list_prealloc_elems (rd_list_t *rl, size_t elemsize, size_t size) {
- size_t allocsize;
- char *p;
- size_t i;
-
- rd_assert(!rl->rl_elems);
-
- /* Allocation layout:
- * void *ptrs[cnt];
- * elems[elemsize][cnt];
- */
-
- allocsize = (sizeof(void *) * size) + (elemsize * size);
- rl->rl_elems = rd_malloc(allocsize);
-
- /* p points to first element's memory. */
- p = (char *)&rl->rl_elems[size];
-
- /* Pointer -> elem mapping */
- for (i = 0 ; i < size ; i++, p += elemsize)
- rl->rl_elems[i] = p;
-
- rl->rl_size = (int)size;
- rl->rl_cnt = 0;
- rl->rl_flags |= RD_LIST_F_FIXED_SIZE;
-}
-
-
-void rd_list_free_cb (rd_list_t *rl, void *ptr) {
- if (rl->rl_free_cb && ptr)
- rl->rl_free_cb(ptr);
-}
-
-
-void *rd_list_add (rd_list_t *rl, void *elem) {
- if (rl->rl_cnt == rl->rl_size)
- rd_list_grow(rl, rl->rl_size ? rl->rl_size * 2 : 16);
- rl->rl_flags &= ~RD_LIST_F_SORTED;
- if (elem)
- rl->rl_elems[rl->rl_cnt] = elem;
- return rl->rl_elems[rl->rl_cnt++];
-}
-
-static void rd_list_remove0 (rd_list_t *rl, int idx) {
- rd_assert(idx < rl->rl_cnt);
-
- if (idx + 1 < rl->rl_cnt)
- memmove(&rl->rl_elems[idx],
- &rl->rl_elems[idx+1],
- sizeof(*rl->rl_elems) * (rl->rl_cnt - (idx+1)));
- rl->rl_cnt--;
-}
-
-void *rd_list_remove (rd_list_t *rl, void *match_elem) {
- void *elem;
- int i;
-
- RD_LIST_FOREACH(elem, rl, i) {
- if (elem == match_elem) {
- rd_list_remove0(rl, i);
- return elem;
- }
- }
-
- return NULL;
-}
-
-
-void *rd_list_remove_cmp (rd_list_t *rl, void *match_elem,
- int (*cmp) (void *_a, void *_b)) {
- void *elem;
- int i;
-
- RD_LIST_FOREACH(elem, rl, i) {
- if (match_elem == cmp ||
- !cmp(elem, match_elem)) {
- rd_list_remove0(rl, i);
- return elem;
- }
- }
-
- return NULL;
-}
-
-
-/**
- * Trampoline to avoid the double pointers in callbacks.
- *
- * rl_elems is a **, but to avoid having the application do the cumbersome
- * ** -> * casting we wrap this here and provide a simple * pointer to the
- * the callbacks.
- *
- * This is true for all list comparator uses, i.e., both sort() and find().
- */
-static RD_TLS int (*rd_list_cmp_curr) (const void *, const void *);
-
-static RD_INLINE
-int rd_list_cmp_trampoline (const void *_a, const void *_b) {
- const void *a = *(const void **)_a, *b = *(const void **)_b;
-
- return rd_list_cmp_curr(a, b);
-}
-
-void rd_list_sort (rd_list_t *rl, int (*cmp) (const void *, const void *)) {
- rd_list_cmp_curr = cmp;
- qsort(rl->rl_elems, rl->rl_cnt, sizeof(*rl->rl_elems),
- rd_list_cmp_trampoline);
- rl->rl_flags |= RD_LIST_F_SORTED;
-}
-
-void rd_list_clear (rd_list_t *rl) {
- rl->rl_cnt = 0;
- rl->rl_flags &= ~RD_LIST_F_SORTED;
-}
-
-
-void rd_list_destroy (rd_list_t *rl) {
-
- if (rl->rl_elems) {
- int i;
- if (rl->rl_free_cb) {
- for (i = 0 ; i < rl->rl_cnt ; i++)
- if (rl->rl_elems[i])
- rl->rl_free_cb(rl->rl_elems[i]);
- }
-
- rd_free(rl->rl_elems);
- }
-
- if (rl->rl_flags & RD_LIST_F_ALLOCATED)
- rd_free(rl);
-}
-
-
-void *rd_list_elem (const rd_list_t *rl, int idx) {
- if (likely(idx < rl->rl_cnt))
- return (void *)rl->rl_elems[idx];
- return NULL;
-}
-
-void *rd_list_find (const rd_list_t *rl, const void *match,
- int (*cmp) (const void *, const void *)) {
- int i;
- const void *elem;
-
- if (rl->rl_flags & RD_LIST_F_SORTED) {
- void **r;
- rd_list_cmp_curr = cmp;
- r = bsearch(&match/*ptrptr to match elems*/,
- rl->rl_elems, rl->rl_cnt,
- sizeof(*rl->rl_elems), rd_list_cmp_trampoline);
- return r ? *r : NULL;
- }
-
- RD_LIST_FOREACH(elem, rl, i) {
- if (!cmp(match, elem))
- return (void *)elem;
- }
-
- return NULL;
-}
-
-
-int rd_list_cmp (const rd_list_t *a, rd_list_t *b,
- int (*cmp) (const void *, const void *)) {
- int i;
-
- i = a->rl_cnt - b->rl_cnt;
- if (i)
- return i;
-
- for (i = 0 ; i < a->rl_cnt ; i++) {
- int r = cmp(a->rl_elems[i], b->rl_elems[i]);
- if (r)
- return r;
- }
-
- return 0;
-}
-
-
-/**
- * @brief Simple element pointer comparator
- */
-int rd_list_cmp_ptr (const void *a, const void *b) {
- if (a < b)
- return -1;
- else if (a > b)
- return 1;
- return 0;
-}
-
-
-void rd_list_apply (rd_list_t *rl,
- int (*cb) (void *elem, void *opaque), void *opaque) {
- void *elem;
- int i;
-
- RD_LIST_FOREACH(elem, rl, i) {
- if (!cb(elem, opaque)) {
- rd_list_remove0(rl, i);
- i--;
- }
- }
-
- return;
-}
-
-
-/**
- * @brief Default element copier that simply assigns the original pointer.
- */
-static void *rd_list_nocopy_ptr (const void *elem, void *opaque) {
- return (void *)elem;
-}
-
-
-rd_list_t *rd_list_copy (const rd_list_t *src,
- void *(*copy_cb) (const void *elem, void *opaque),
- void *opaque) {
- rd_list_t *dst;
-
- dst = rd_list_new(src->rl_cnt, src->rl_free_cb);
-
- rd_list_copy_to(dst, src, copy_cb, opaque);
- return dst;
-}
-
-
-void rd_list_copy_to (rd_list_t *dst, const rd_list_t *src,
- void *(*copy_cb) (const void *elem, void *opaque),
- void *opaque) {
- void *elem;
- int i;
-
- if (!copy_cb)
- copy_cb = rd_list_nocopy_ptr;
-
- RD_LIST_FOREACH(elem, src, i) {
- void *celem = copy_cb(elem, opaque);
- if (celem)
- rd_list_add(dst, celem);
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdlist.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdlist.h b/thirdparty/librdkafka-0.11.1/src/rdlist.h
deleted file mode 100644
index 27233e3..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdlist.h
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2015, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#pragma once
-
-
-/**
- *
- * Simple light-weight append-only list to be used as a collection convenience.
- *
- */
-
-typedef struct rd_list_s {
- int rl_size;
- int rl_cnt;
- void **rl_elems;
- void (*rl_free_cb) (void *);
- int rl_flags;
-#define RD_LIST_F_ALLOCATED 0x1 /* The rd_list_t is allocated,
- * will be free on destroy() */
-#define RD_LIST_F_SORTED 0x2 /* Set by sort(), cleared by any mutations.
- * When this flag is set bsearch() is used
- * by find(), otherwise a linear search. */
-#define RD_LIST_F_FIXED_SIZE 0x4 /* Assert on grow */
-#define RD_LIST_F_UNIQUE 0x8 /* Don't allow duplicates:
- * ONLY ENFORCED BY CALLER. */
-} rd_list_t;
-
-
-/**
- * @brief Initialize a list, preallocate space for 'initial_size' elements
- * (optional).
- * List elements will optionally be freed by \p free_cb.
- *
- * @returns \p rl
- */
-rd_list_t *
-rd_list_init (rd_list_t *rl, int initial_size, void (*free_cb) (void *));
-
-
-/**
- * Allocate a new list pointer and initialize it according to rd_list_init().
- *
- * Use rd_list_destroy() to free.
- */
-rd_list_t *rd_list_new (int initial_size, void (*free_cb) (void *));
-
-
-/**
- * @brief Prepare list to for an additional \p size elements.
- * This is an optimization to avoid incremental grows.
- */
-void rd_list_grow (rd_list_t *rl, size_t size);
-
-/**
- * @brief Preallocate elements to avoid having to pass an allocated pointer to
- * rd_list_add(), instead pass NULL to rd_list_add() and use the returned
- * pointer as the element.
- *
- * @param elemsize element size
- * @param size number of elements
- *
- * @remark Preallocated element lists can't grow past \p size.
- */
-void rd_list_prealloc_elems (rd_list_t *rl, size_t elemsize, size_t size);
-
-
-/**
- * @brief Free a pointer using the list's free_cb
- *
- * @remark If no free_cb is set, or \p ptr is NULL, dont do anything
- *
- * Typical use is rd_list_free_cb(rd_list_remove_cmp(....));
- */
-void rd_list_free_cb (rd_list_t *rl, void *ptr);
-
-
-/**
- * @brief Append element to list
- *
- * @returns \p elem. If \p elem is NULL the default element for that index
- * will be returned (for use with set_elems).
- */
-void *rd_list_add (rd_list_t *rl, void *elem);
-
-
-/**
- * Remove element from list.
- * This is a slow O(n) + memmove operation.
- * Returns the removed element.
- */
-void *rd_list_remove (rd_list_t *rl, void *match_elem);
-
-/**
- * Remove element from list using comparator.
- * See rd_list_remove()
- */
-void *rd_list_remove_cmp (rd_list_t *rl, void *match_elem,
- int (*cmp) (void *_a, void *_b));
-
-/**
- * Sort list using comparator
- */
-void rd_list_sort (rd_list_t *rl, int (*cmp) (const void *, const void *));
-
-
-/**
- * Empties the list (but does not free any memory)
- */
-void rd_list_clear (rd_list_t *rl);
-
-
-/**
- * Empties the list, frees the element array, and optionally frees
- * each element using the registered \c rl->rl_free_cb.
- *
- * If the list was previously allocated with rd_list_new() it will be freed.
- */
-void rd_list_destroy (rd_list_t *rl);
-
-
-/**
- * Returns the element at index 'idx', or NULL if out of range.
- *
- * Typical iteration is:
- * int i = 0;
- * my_type_t *obj;
- * while ((obj = rd_list_elem(rl, i++)))
- * do_something(obj);
- */
-void *rd_list_elem (const rd_list_t *rl, int idx);
-
-#define RD_LIST_FOREACH(elem,listp,idx) \
- for (idx = 0 ; (elem = rd_list_elem(listp, idx)) ; idx++)
-
-#define RD_LIST_FOREACH_REVERSE(elem,listp,idx) \
- for (idx = (listp)->rl_cnt-1 ; \
- idx >= 0 && (elem = rd_list_elem(listp, idx)) ; idx--)
-
-/**
- * Returns the number of elements in list.
- */
-static RD_INLINE RD_UNUSED int rd_list_cnt (const rd_list_t *rl) {
- return rl->rl_cnt;
-}
-
-
-/**
- * Returns true if list is empty
- */
-#define rd_list_empty(rl) (rd_list_cnt(rl) == 0)
-
-
-
-/**
- * Find element using comparator
- * 'match' will be the first argument to 'cmp', and each element (up to a match)
- * will be the second argument to 'cmp'.
- */
-void *rd_list_find (const rd_list_t *rl, const void *match,
- int (*cmp) (const void *, const void *));
-
-
-
-/**
- * @brief Compare list \p a to \p b.
- *
- * @returns < 0 if a was "lesser" than b,
- * > 0 if a was "greater" than b,
- * 0 if a and b are equal.
- */
-int rd_list_cmp (const rd_list_t *a, rd_list_t *b,
- int (*cmp) (const void *, const void *));
-
-/**
- * @brief Simple element pointer comparator
- */
-int rd_list_cmp_ptr (const void *a, const void *b);
-
-
-/**
- * @brief Apply \p cb to each element in list, if \p cb returns 0
- * the element will be removed (but not freed).
- */
-void rd_list_apply (rd_list_t *rl,
- int (*cb) (void *elem, void *opaque), void *opaque);
-
-
-
-/**
- * @brief Copy list \p src, returning a new list,
- * using optional \p copy_cb (per elem)
- */
-rd_list_t *rd_list_copy (const rd_list_t *src,
- void *(*copy_cb) (const void *elem, void *opaque),
- void *opaque);
-
-
-/**
- * @brief Copy list \p src to \p dst using optional \p copy_cb (per elem)
- * @remark The destination list is not initialized or copied by this function.
- * @remark copy_cb() may return NULL in which case no element is added,
- * but the copy callback might have done so itself.
- */
-void rd_list_copy_to (rd_list_t *dst, const rd_list_t *src,
- void *(*copy_cb) (const void *elem, void *opaque),
- void *opaque);
-
-/**
- * @brief String copier for rd_list_copy()
- */
-static RD_UNUSED
-void *rd_list_string_copy (const void *elem, void *opaque) {
- return rd_strdup((const char *)elem);
-}
-
-
-/**
- * Debugging: Print list to stdout.
- */
-void rd_list_dump (const char *what, const rd_list_t *rl);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdlog.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdlog.c b/thirdparty/librdkafka-0.11.1/src/rdlog.c
deleted file mode 100644
index 3f0d29a..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdlog.c
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * librd - Rapid Development C library
- *
- * Copyright (c) 2012-2013, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "rdkafka_int.h"
-#include "rdlog.h"
-
-#include <stdarg.h>
-#include <string.h>
-#include <ctype.h>
-
-
-
-
-void rd_hexdump (FILE *fp, const char *name, const void *ptr, size_t len) {
- const char *p = (const char *)ptr;
- size_t of = 0;
-
-
- if (name)
- fprintf(fp, "%s hexdump (%"PRIusz" bytes):\n", name, len);
-
- for (of = 0 ; of < len ; of += 16) {
- char hexen[16*3+1];
- char charen[16+1];
- int hof = 0;
-
- int cof = 0;
- unsigned int i;
-
- for (i = (unsigned int)of ; i < (unsigned int)of + 16 && i < len ; i++) {
- hof += rd_snprintf(hexen+hof, sizeof(hexen)-hof,
- "%02x ",
- p[i] & 0xff);
- cof += rd_snprintf(charen+cof, sizeof(charen)-cof, "%c",
- isprint((int)p[i]) ? p[i] : '.');
- }
- fprintf(fp, "%08zx: %-48s %-16s\n",
- of, hexen, charen);
- }
-}
-
-
-void rd_iov_print (const char *what, int iov_idx, const struct iovec *iov,
- int hexdump) {
- printf("%s: iov #%i: %"PRIusz"\n", what, iov_idx,
- (size_t)iov->iov_len);
- if (hexdump)
- rd_hexdump(stdout, what, iov->iov_base, iov->iov_len);
-}
-
-
-void rd_msghdr_print (const char *what, const struct msghdr *msg,
- int hexdump) {
- int i;
- size_t len = 0;
-
- printf("%s: iovlen %"PRIusz"\n", what, (size_t)msg->msg_iovlen);
-
- for (i = 0 ; i < (int)msg->msg_iovlen ; i++) {
- rd_iov_print(what, i, &msg->msg_iov[i], hexdump);
- len += msg->msg_iov[i].iov_len;
- }
- printf("%s: ^ message was %"PRIusz" bytes in total\n", what, len);
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdlog.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdlog.h b/thirdparty/librdkafka-0.11.1/src/rdlog.h
deleted file mode 100644
index 95066e2..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdlog.h
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * librd - Rapid Development C library
- *
- * Copyright (c) 2012-2013, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#pragma once
-
-void rd_hexdump (FILE *fp, const char *name, const void *ptr, size_t len);
-
-void rd_iov_print (const char *what, int iov_idx, const struct iovec *iov,
- int hexdump);
-struct msghdr;
-void rd_msghdr_print (const char *what, const struct msghdr *msg,
- int hexdump);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdports.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdports.c b/thirdparty/librdkafka-0.11.1/src/rdports.c
deleted file mode 100644
index a34195b..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdports.c
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
-* librdkafka - Apache Kafka C library
-*
-* Copyright (c) 2016 Magnus Edenhill
-* All rights reserved.
-*
-* Redistribution and use in source and binary forms, with or without
-* modification, are permitted provided that the following conditions are met:
-*
-* 1. Redistributions of source code must retain the above copyright notice,
-* this list of conditions and the following disclaimer.
-* 2. Redistributions in binary form must reproduce the above copyright notice,
-* this list of conditions and the following disclaimer in the documentation
-* and/or other materials provided with the distribution.
-*
-* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-* POSSIBILITY OF SUCH DAMAGE.
-*/
-
-/**
- * System portability
- */
-
-#include "rd.h"
-
-
-#include <stdlib.h>
-
-/**
- * qsort_r substitute
- * This nicely explains why we wont bother with the native implementation
- * on Win32 (qsort_s), OSX/FreeBSD (qsort_r with diff args):
- * http://forum.theorex.tech/t/different-declarations-of-qsort-r-on-mac-and-linux/93/2
- */
-static RD_TLS int (*rd_qsort_r_cmp) (const void *, const void *, void *);
-static RD_TLS void *rd_qsort_r_arg;
-
-static RD_UNUSED
-int rd_qsort_r_trampoline (const void *a, const void *b) {
- return rd_qsort_r_cmp(a, b, rd_qsort_r_arg);
-}
-
-void rd_qsort_r (void *base, size_t nmemb, size_t size,
- int (*compar)(const void *, const void *, void *),
- void *arg) {
- rd_qsort_r_cmp = compar;
- rd_qsort_r_arg = arg;
- qsort(base, nmemb, size, rd_qsort_r_trampoline);
- rd_qsort_r_cmp = NULL;
- rd_qsort_r_arg = NULL;
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdports.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdports.h b/thirdparty/librdkafka-0.11.1/src/rdports.h
deleted file mode 100644
index 44cef55..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdports.h
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
-* librdkafka - Apache Kafka C library
-*
-* Copyright (c) 2016 Magnus Edenhill
-* All rights reserved.
-*
-* Redistribution and use in source and binary forms, with or without
-* modification, are permitted provided that the following conditions are met:
-*
-* 1. Redistributions of source code must retain the above copyright notice,
-* this list of conditions and the following disclaimer.
-* 2. Redistributions in binary form must reproduce the above copyright notice,
-* this list of conditions and the following disclaimer in the documentation
-* and/or other materials provided with the distribution.
-*
-* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-* POSSIBILITY OF SUCH DAMAGE.
-*/
-#pragma once
-
-
-void rd_qsort_r (void *base, size_t nmemb, size_t size,
- int (*compar)(const void *, const void *, void *),
- void *arg);