Posted to commits@nifi.apache.org by ph...@apache.org on 2018/06/06 14:14:56 UTC

[28/51] [partial] nifi-minifi-cpp git commit: MINIFICPP-512 - upgrade to librdkafka 0.11.4

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_conf.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_conf.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_conf.h
deleted file mode 100644
index 0da8015..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_conf.h
+++ /dev/null
@@ -1,338 +0,0 @@
-#pragma once
-
-#include "rdlist.h"
-
-
-/**
- * Forward declarations
- */
-struct rd_kafka_transport_s;
-
-
-/**
- * MessageSet compression codecs
- */
-typedef enum {
-	RD_KAFKA_COMPRESSION_NONE,
-	RD_KAFKA_COMPRESSION_GZIP = RD_KAFKA_MSG_ATTR_GZIP,
-	RD_KAFKA_COMPRESSION_SNAPPY = RD_KAFKA_MSG_ATTR_SNAPPY,
-        RD_KAFKA_COMPRESSION_LZ4 = RD_KAFKA_MSG_ATTR_LZ4,
-	RD_KAFKA_COMPRESSION_INHERIT /* Inherit setting from global conf */
-} rd_kafka_compression_t;
-
-
-typedef enum {
-	RD_KAFKA_PROTO_PLAINTEXT,
-	RD_KAFKA_PROTO_SSL,
-	RD_KAFKA_PROTO_SASL_PLAINTEXT,
-	RD_KAFKA_PROTO_SASL_SSL,
-	RD_KAFKA_PROTO_NUM,
-} rd_kafka_secproto_t;
-
-
-typedef enum {
-	RD_KAFKA_CONFIGURED,
-	RD_KAFKA_LEARNED,
-	RD_KAFKA_INTERNAL,
-} rd_kafka_confsource_t;
-
-typedef	enum {
-	_RK_GLOBAL = 0x1,
-	_RK_PRODUCER = 0x2,
-	_RK_CONSUMER = 0x4,
-	_RK_TOPIC = 0x8,
-        _RK_CGRP = 0x10
-} rd_kafka_conf_scope_t;
-
-typedef enum {
-	_RK_CONF_PROP_SET_REPLACE,  /* Replace current value (default) */
-	_RK_CONF_PROP_SET_ADD,      /* Add value (S2F) */
-	_RK_CONF_PROP_SET_DEL      /* Remove value (S2F) */
-} rd_kafka_conf_set_mode_t;
-
-
-
-typedef enum {
-        RD_KAFKA_OFFSET_METHOD_NONE,
-        RD_KAFKA_OFFSET_METHOD_FILE,
-        RD_KAFKA_OFFSET_METHOD_BROKER
-} rd_kafka_offset_method_t;
-
-
-
-
-/**
- * Optional configuration struct passed to rd_kafka_new*().
- *
- * The struct is populated through string properties
- * by calling rd_kafka_conf_set().
- *
- */
-struct rd_kafka_conf_s {
-	/*
-	 * Generic configuration
-	 */
-	int     enabled_events;
-	int     max_msg_size;
-	int     msg_copy_max_size;
-        int     recv_max_msg_size;
-	int     max_inflight;
-	int     metadata_request_timeout_ms;
-	int     metadata_refresh_interval_ms;
-	int     metadata_refresh_fast_cnt;
-	int     metadata_refresh_fast_interval_ms;
-        int     metadata_refresh_sparse;
-        int     metadata_max_age_ms;
-	int     debug;
-	int     broker_addr_ttl;
-        int     broker_addr_family;
-	int     socket_timeout_ms;
-	int     socket_blocking_max_ms;
-	int     socket_sndbuf_size;
-	int     socket_rcvbuf_size;
-        int     socket_keepalive;
-	int     socket_nagle_disable;
-        int     socket_max_fails;
-	char   *client_id_str;
-	char   *brokerlist;
-	int     stats_interval_ms;
-	int     term_sig;
-        int     reconnect_jitter_ms;
-	int     api_version_request;
-	int     api_version_request_timeout_ms;
-	int     api_version_fallback_ms;
-	char   *broker_version_fallback;
-	rd_kafka_secproto_t security_protocol;
-
-#if WITH_SSL
-	struct {
-		SSL_CTX *ctx;
-		char *cipher_suites;
-		char *key_location;
-		char *key_password;
-		char *cert_location;
-		char *ca_location;
-		char *crl_location;
-	} ssl;
-#endif
-
-        struct {
-                const struct rd_kafka_sasl_provider *provider;
-                char *principal;
-                char *mechanisms;
-                char *service_name;
-                char *kinit_cmd;
-                char *keytab;
-                int   relogin_min_time;
-                char *username;
-                char *password;
-#if WITH_SASL_SCRAM
-                /* SCRAM EVP-wrapped hash function
-                 * (return value from EVP_shaX()) */
-                const void/*EVP_MD*/ *scram_evp;
-                /* SCRAM direct hash function (e.g., SHA256()) */
-                unsigned char *(*scram_H) (const unsigned char *d, size_t n,
-                                           unsigned char *md);
-                /* Hash size */
-                size_t         scram_H_size;
-#endif
-        } sasl;
-
-#if WITH_PLUGINS
-        char *plugin_paths;
-        rd_list_t plugins;
-#endif
-
-        /* Interceptors */
-        struct {
-                /* rd_kafka_interceptor_method_t lists */
-                rd_list_t on_conf_set;        /* on_conf_set interceptors
-                                               * (not copied on conf_dup()) */
-                rd_list_t on_conf_dup;        /* .. (not copied) */
-                rd_list_t on_conf_destroy;    /* .. (not copied) */
-                rd_list_t on_new;             /* .. (copied) */
-                rd_list_t on_destroy;         /* .. (copied) */
-                rd_list_t on_send;            /* .. (copied) */
-                rd_list_t on_acknowledgement; /* .. (copied) */
-                rd_list_t on_consume;         /* .. (copied) */
-                rd_list_t on_commit;          /* .. (copied) */
-
-                /* rd_strtup_t list */
-                rd_list_t config;             /* Configuration name=val's
-                                               * handled by interceptors. */
-        } interceptors;
-
-        /* Client group configuration */
-        int    coord_query_intvl_ms;
-
-	int    builtin_features;
-	/*
-	 * Consumer configuration
-	 */
-        int    check_crcs;
-	int    queued_min_msgs;
-        int    queued_max_msg_kbytes;
-        int64_t queued_max_msg_bytes;
-	int    fetch_wait_max_ms;
-        int    fetch_msg_max_bytes;
-	int    fetch_min_bytes;
-	int    fetch_error_backoff_ms;
-        char  *group_id_str;
-
-        rd_kafka_pattern_list_t *topic_blacklist;
-        struct rd_kafka_topic_conf_s *topic_conf; /* Default topic config
-                                                   * for automatically
-                                                   * subscribed topics. */
-        int enable_auto_commit;
-	int enable_auto_offset_store;
-        int auto_commit_interval_ms;
-        int group_session_timeout_ms;
-        int group_heartbeat_intvl_ms;
-        rd_kafkap_str_t *group_protocol_type;
-        char *partition_assignment_strategy;
-        rd_list_t partition_assignors;
-	int enabled_assignor_cnt;
-        struct rd_kafka_assignor_s *assignor;
-
-        void (*rebalance_cb) (rd_kafka_t *rk,
-                              rd_kafka_resp_err_t err,
-			      rd_kafka_topic_partition_list_t *partitions,
-                              void *opaque);
-
-        void (*offset_commit_cb) (rd_kafka_t *rk,
-                                  rd_kafka_resp_err_t err,
-                                  rd_kafka_topic_partition_list_t *offsets,
-                                  void *opaque);
-
-        rd_kafka_offset_method_t offset_store_method;
-	int enable_partition_eof;
-
-	/*
-	 * Producer configuration
-	 */
-	int    queue_buffering_max_msgs;
-	int    queue_buffering_max_kbytes;
-	int    buffering_max_ms;
-	int    max_retries;
-	int    retry_backoff_ms;
-	int    batch_num_messages;
-	rd_kafka_compression_t compression_codec;
-	int    dr_err_only;
-
-	/* Message delivery report callback.
-	 * Called once for each produced message, either on
-	 * successful and acknowledged delivery to the broker in which
-	 * case 'err' is 0, or if the message could not be delivered
-	 * in which case 'err' is non-zero (use rd_kafka_err2str()
-	 * to obtain a human-readable error reason).
-	 *
-	 * If the message was produced with neither RD_KAFKA_MSG_F_FREE
-	 * or RD_KAFKA_MSG_F_COPY set then 'payload' is the original
-	 * pointer provided to rd_kafka_produce().
-	 * rdkafka will not perform any further actions on 'payload'
-	 * at this point and the application may rd_free the payload data
-	 * at this point.
-	 *
-	 * 'opaque' is 'conf.opaque', while 'msg_opaque' is
-	 * the opaque pointer provided in the rd_kafka_produce() call.
-	 */
-	void (*dr_cb) (rd_kafka_t *rk,
-		       void *payload, size_t len,
-		       rd_kafka_resp_err_t err,
-		       void *opaque, void *msg_opaque);
-
-        void (*dr_msg_cb) (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
-                           void *opaque);
-
-        /* Consume callback */
-        void (*consume_cb) (rd_kafka_message_t *rkmessage, void *opaque);
-
-        /* Log callback */
-        void (*log_cb) (const rd_kafka_t *rk, int level,
-                        const char *fac, const char *buf);
-        int    log_level;
-        int    log_queue;
-        int    log_thread_name;
-        int    log_connection_close;
-
-        /* Error callback */
-	void (*error_cb) (rd_kafka_t *rk, int err,
-			  const char *reason, void *opaque);
-
-	/* Throttle callback */
-	void (*throttle_cb) (rd_kafka_t *rk, const char *broker_name,
-			     int32_t broker_id, int throttle_time_ms,
-			     void *opaque);
-
-	/* Stats callback */
-	int (*stats_cb) (rd_kafka_t *rk,
-			 char *json,
-			 size_t json_len,
-			 void *opaque);
-
-        /* Socket creation callback */
-        int (*socket_cb) (int domain, int type, int protocol, void *opaque);
-
-        /* Connect callback */
-        int (*connect_cb) (int sockfd,
-                           const struct sockaddr *addr,
-                           int addrlen,
-                           const char *id,
-                           void *opaque);
-
-        /* Close socket callback */
-        int (*closesocket_cb) (int sockfd, void *opaque);
-
-		/* File open callback */
-        int (*open_cb) (const char *pathname, int flags, mode_t mode,
-                        void *opaque);
-
-	/* Opaque passed to callbacks. */
-	void  *opaque;
-
-        /* For use with value-less properties. */
-        int     dummy;
-};
-
-int rd_kafka_socket_cb_linux (int domain, int type, int protocol, void *opaque);
-int rd_kafka_socket_cb_generic (int domain, int type, int protocol,
-                                void *opaque);
-#ifndef _MSC_VER
-int rd_kafka_open_cb_linux (const char *pathname, int flags, mode_t mode,
-                            void *opaque);
-#endif
-int rd_kafka_open_cb_generic (const char *pathname, int flags, mode_t mode,
-                              void *opaque);
-
-
-
-struct rd_kafka_topic_conf_s {
-	int     required_acks;
-	int32_t request_timeout_ms;
-	int     message_timeout_ms;
-
-	int32_t (*partitioner) (const rd_kafka_topic_t *rkt,
-				const void *keydata, size_t keylen,
-				int32_t partition_cnt,
-				void *rkt_opaque,
-				void *msg_opaque);
-
-	rd_kafka_compression_t compression_codec;
-        int     produce_offset_report;
-
-        int     consume_callback_max_msgs;
-	int     auto_commit;
-	int     auto_commit_interval_ms;
-	int     auto_offset_reset;
-	char   *offset_store_path;
-	int     offset_store_sync_interval_ms;
-
-        rd_kafka_offset_method_t offset_store_method;
-
-	/* Application provided opaque pointer (this is rkt_opaque) */
-	void   *opaque;
-};
-
-
-
-void rd_kafka_anyconf_destroy (int scope, void *conf);
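
A minimal sketch of how the fields above are populated in practice, using the
public API from rdkafka.h (rd_kafka_conf_set() string properties plus the
dr_msg_cb slot). The property names and calls are the documented librdkafka C
API; the snippet itself is illustrative and not part of this commit:

    #include <stdio.h>
    #include <librdkafka/rdkafka.h>

    /* Delivery report callback; stored in the dr_msg_cb member above. */
    static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
                           void *opaque) {
        (void)rk; (void)opaque;
        if (rkmessage->err)
            fprintf(stderr, "delivery failed: %s\n",
                    rd_kafka_err2str(rkmessage->err));
    }

    int main (void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        /* Each rd_kafka_conf_set() call fills one of the struct members
         * above (brokerlist, queue_buffering_max_msgs, ...). */
        if (rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
            rd_kafka_conf_set(conf, "queue.buffering.max.messages", "500000",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
            fprintf(stderr, "%s\n", errstr);
            return 1;
        }
        rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

        rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                                      errstr, sizeof(errstr));
        if (!rk) {
            fprintf(stderr, "%s\n", errstr);
            return 1;
        }
        rd_kafka_destroy(rk);
        return 0;
    }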

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_event.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_event.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_event.c
deleted file mode 100644
index 5fe783d..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_event.c
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2016 Magnus Edenhill
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "rdkafka_int.h"
-#include "rdkafka_event.h"
-#include "rd.h"
-
-rd_kafka_event_type_t rd_kafka_event_type (const rd_kafka_event_t *rkev) {
-	return rkev ? rkev->rko_evtype : RD_KAFKA_EVENT_NONE;
-}
-
-const char *rd_kafka_event_name (const rd_kafka_event_t *rkev) {
-	switch (rkev ? rkev->rko_evtype : RD_KAFKA_EVENT_NONE)
-	{
-	case RD_KAFKA_EVENT_NONE:
-		return "(NONE)";
-	case RD_KAFKA_EVENT_DR:
-		return "DeliveryReport";
-	case RD_KAFKA_EVENT_FETCH:
-		return "Fetch";
-	case RD_KAFKA_EVENT_LOG:
-		return "Log";
-	case RD_KAFKA_EVENT_ERROR:
-		return "Error";
-	case RD_KAFKA_EVENT_REBALANCE:
-		return "Rebalance";
-	case RD_KAFKA_EVENT_OFFSET_COMMIT:
-		return "OffsetCommit";
-	case RD_KAFKA_EVENT_STATS:
-		return "Stats";
-	default:
-		return "?unknown?";
-	}
-}
-
-
-
-
-void rd_kafka_event_destroy (rd_kafka_event_t *rkev) {
-	if (unlikely(!rkev))
-		return;
-	rd_kafka_op_destroy(rkev);
-}
-
-
-/**
- * @returns the next message from the event's message queue.
- * @remark messages will be freed automatically when event is destroyed,
- *         application MUST NOT call rd_kafka_message_destroy()
- */
-const rd_kafka_message_t *
-rd_kafka_event_message_next (rd_kafka_event_t *rkev) {
-	rd_kafka_op_t *rko = rkev;
-	rd_kafka_msg_t *rkm;
-	rd_kafka_msgq_t *rkmq, *rkmq2;
-	rd_kafka_message_t *rkmessage;
-
-	switch (rkev->rko_type)
-	{
-	case RD_KAFKA_OP_DR:
-		rkmq = &rko->rko_u.dr.msgq;
-		rkmq2 = &rko->rko_u.dr.msgq2;
-		break;
-
-	case RD_KAFKA_OP_FETCH:
-		/* Just one message */
-		if (rko->rko_u.fetch.evidx++ > 0)
-			return NULL;
-
-		rkmessage = rd_kafka_message_get(rko);
-		if (unlikely(!rkmessage))
-			return NULL;
-
-		/* Store offset */
-		rd_kafka_op_offset_store(NULL, rko, rkmessage);
-
-		return rkmessage;
-
-
-	default:
-		return NULL;
-	}
-
-	if (unlikely(!(rkm = TAILQ_FIRST(&rkmq->rkmq_msgs))))
-		return NULL;
-
-	rd_kafka_msgq_deq(rkmq, rkm, 1);
-
-	/* Put rkm on secondary message queue which will be purged later. */
-	rd_kafka_msgq_enq(rkmq2, rkm);
-
-	return rd_kafka_message_get_from_rkm(rko, rkm);
-}
-
-
-size_t rd_kafka_event_message_array (rd_kafka_event_t *rkev,
-				     const rd_kafka_message_t **rkmessages, size_t size) {
-	size_t cnt = 0;
-	const rd_kafka_message_t *rkmessage;
-
-	while ((rkmessage = rd_kafka_event_message_next(rkev)))
-		rkmessages[cnt++] = rkmessage;
-
-	return cnt;
-}
-
-
-size_t rd_kafka_event_message_count (rd_kafka_event_t *rkev) {
-	switch (rkev->rko_evtype)
-	{
-	case RD_KAFKA_EVENT_DR:
-		return rd_atomic32_get(&rkev->rko_u.dr.msgq.rkmq_msg_cnt);
-	case RD_KAFKA_EVENT_FETCH:
-		return 1;
-	default:
-		return 0;
-	}
-}
-
-
-rd_kafka_resp_err_t rd_kafka_event_error (rd_kafka_event_t *rkev) {
-	return rkev->rko_err;
-}
-
-const char *rd_kafka_event_error_string (rd_kafka_event_t *rkev) {
-	switch (rkev->rko_type)
-	{
-	case RD_KAFKA_OP_ERR:
-	case RD_KAFKA_OP_CONSUMER_ERR:
-		if (rkev->rko_u.err.errstr)
-			return rkev->rko_u.err.errstr;
-		/* FALLTHRU */
-	default:
-		return rd_kafka_err2str(rkev->rko_err);
-	}
-}
-
-
-void *rd_kafka_event_opaque (rd_kafka_event_t *rkev) {
-	switch (rkev->rko_type & ~RD_KAFKA_OP_FLAGMASK)
-	{
-	case RD_KAFKA_OP_OFFSET_COMMIT:
-		return rkev->rko_u.offset_commit.opaque;
-	default:
-		return NULL;
-	}
-}
-
-
-int rd_kafka_event_log (rd_kafka_event_t *rkev, const char **fac,
-			const char **str, int *level) {
-	if (unlikely(rkev->rko_evtype != RD_KAFKA_EVENT_LOG))
-		return -1;
-
-	if (likely(fac != NULL))
-                *fac = rkev->rko_u.log.fac;
-	if (likely(str != NULL))
-		*str = rkev->rko_u.log.str;
-	if (likely(level != NULL))
-		*level = rkev->rko_u.log.level;
-
-	return 0;
-}
-
-const char *rd_kafka_event_stats (rd_kafka_event_t *rkev) {
-	return rkev->rko_u.stats.json;
-}
-
-rd_kafka_topic_partition_list_t *
-rd_kafka_event_topic_partition_list (rd_kafka_event_t *rkev) {
-	switch (rkev->rko_evtype)
-	{
-	case RD_KAFKA_EVENT_REBALANCE:
-		return rkev->rko_u.rebalance.partitions;
-	case RD_KAFKA_EVENT_OFFSET_COMMIT:
-		return rkev->rko_u.offset_commit.partitions;
-	default:
-		return NULL;
-	}
-}
-
-
-rd_kafka_topic_partition_t *
-rd_kafka_event_topic_partition (rd_kafka_event_t *rkev) {
-	rd_kafka_topic_partition_t *rktpar;
-
-	if (unlikely(!rkev->rko_rktp))
-		return NULL;
-
-	rktpar = rd_kafka_topic_partition_new_from_rktp(
-		rd_kafka_toppar_s2i(rkev->rko_rktp));
-
-	switch (rkev->rko_type)
-	{
-	case RD_KAFKA_OP_ERR:
-	case RD_KAFKA_OP_CONSUMER_ERR:
-		rktpar->offset = rkev->rko_u.err.offset;
-		break;
-	default:
-		break;
-	}
-
-	rktpar->err = rkev->rko_err;
-
-	return rktpar;
-
-}
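
The accessors above back librdkafka's public event API. A minimal sketch of an
application draining the main queue (rd_kafka_queue_get_main(),
rd_kafka_queue_poll() and the rd_kafka_event_*() accessors are the documented
public calls; the loop itself is illustrative and not part of this commit):

    #include <stdio.h>
    #include <librdkafka/rdkafka.h>

    /* Assumes the relevant events were enabled via rd_kafka_conf_set_events(). */
    static void drain_events (rd_kafka_t *rk) {
        rd_kafka_queue_t *queue = rd_kafka_queue_get_main(rk);
        rd_kafka_event_t *ev;

        while ((ev = rd_kafka_queue_poll(queue, 100 /*ms*/))) {
            switch (rd_kafka_event_type(ev)) {
            case RD_KAFKA_EVENT_DR: {
                const rd_kafka_message_t *rkm;
                /* Messages are owned by the event: per the remark above,
                 * do NOT call rd_kafka_message_destroy() on them. */
                while ((rkm = rd_kafka_event_message_next(ev)))
                    printf("delivered to partition %d\n", (int)rkm->partition);
                break;
            }
            case RD_KAFKA_EVENT_ERROR:
                fprintf(stderr, "error: %s\n", rd_kafka_event_error_string(ev));
                break;
            default:
                printf("ignoring %s event\n", rd_kafka_event_name(ev));
                break;
            }
            rd_kafka_event_destroy(ev);
        }
        rd_kafka_queue_destroy(queue);
    }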

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_event.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_event.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_event.h
deleted file mode 100644
index 0e8f8a1..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_event.h
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2016 Magnus Edenhill
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-
-/**
- * @brief Converts op type to event type.
- * @returns the event type, or 0 if the op cannot be mapped to an event.
- */
-static RD_UNUSED RD_INLINE
-rd_kafka_event_type_t rd_kafka_op2event (rd_kafka_op_type_t optype) {
-	static const rd_kafka_event_type_t map[RD_KAFKA_OP__END] = {
-		[RD_KAFKA_OP_DR] = RD_KAFKA_EVENT_DR,
-		[RD_KAFKA_OP_FETCH] = RD_KAFKA_EVENT_FETCH,
-		[RD_KAFKA_OP_ERR] = RD_KAFKA_EVENT_ERROR,
-		[RD_KAFKA_OP_CONSUMER_ERR] = RD_KAFKA_EVENT_ERROR,
-		[RD_KAFKA_OP_REBALANCE] = RD_KAFKA_EVENT_REBALANCE,
-		[RD_KAFKA_OP_OFFSET_COMMIT] = RD_KAFKA_EVENT_OFFSET_COMMIT,
-                [RD_KAFKA_OP_LOG] = RD_KAFKA_EVENT_LOG,
-		[RD_KAFKA_OP_STATS] = RD_KAFKA_EVENT_STATS
-	};
-
-	return map[(int)optype & ~RD_KAFKA_OP_FLAGMASK];
-}
-
-
-/**
- * @brief Attempt to set up an event based on rko.
- * @returns 1 if op is event:able and set up, else 0.
- */
-static RD_UNUSED RD_INLINE
-int rd_kafka_event_setup (rd_kafka_t *rk, rd_kafka_op_t *rko) {
-	rko->rko_evtype = rd_kafka_op2event(rko->rko_type);
-	switch (rko->rko_evtype)
-	{
-	case RD_KAFKA_EVENT_NONE:
-		return 0;
-
-	case RD_KAFKA_EVENT_DR:
-		rko->rko_rk = rk;
-		rd_dassert(!rko->rko_u.dr.do_purge2);
-		rd_kafka_msgq_init(&rko->rko_u.dr.msgq2);
-		rko->rko_u.dr.do_purge2 = 1;
-		return 1;
-
-	case RD_KAFKA_EVENT_REBALANCE:
-	case RD_KAFKA_EVENT_ERROR:
-        case RD_KAFKA_EVENT_LOG:
-        case RD_KAFKA_EVENT_OFFSET_COMMIT:
-        case RD_KAFKA_EVENT_STATS:
-		return 1;
-
-	default:
-		return 0;
-		
-	}
-}
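
rd_kafka_op2event() only produces events the application has opted into. A
short sketch of enabling event delivery from application code with the public
rd_kafka_conf_set_events() call (the constants map onto the enabled_events
field of rd_kafka_conf_s; the snippet is illustrative only):

    #include <librdkafka/rdkafka.h>

    /* Ops whose mapped event type is not enabled here keep being delivered
     * through the regular callbacks (e.g. dr_cb) instead. */
    static void enable_event_delivery (rd_kafka_conf_t *conf) {
        rd_kafka_conf_set_events(conf,
                                 RD_KAFKA_EVENT_DR |
                                 RD_KAFKA_EVENT_ERROR |
                                 RD_KAFKA_EVENT_LOG |
                                 RD_KAFKA_EVENT_STATS);
    }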

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_feature.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_feature.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_feature.c
deleted file mode 100644
index 8a9ab24..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_feature.c
+++ /dev/null
@@ -1,444 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2016, Magnus Edenhill
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-
-#include "rdkafka_int.h"
-#include "rdkafka_feature.h"
-
-#include <stdlib.h>
-
-static const char *rd_kafka_feature_names[] = {
-	"MsgVer1",
-	"ApiVersion",
-	"BrokerBalancedConsumer",
-	"ThrottleTime",
-	"Sasl",
-	"SaslHandshake",
-	"BrokerGroupCoordinator",
-	"LZ4",
-        "OffsetTime",
-        "MsgVer2",
-	NULL
-};
-
-
-static const struct rd_kafka_feature_map {
-	/* RD_KAFKA_FEATURE_... */
-	int feature;
-
-	/* Depends on the following ApiVersions overlapping with
-	 * what the broker supports: */
-	struct rd_kafka_ApiVersion depends[RD_KAFKAP__NUM];
-
-} rd_kafka_feature_map[] = {
-	/**
-	 * @brief List of features and the ApiVersions they depend on.
-	 *
-	 * The dependency list consists of the ApiKey followed by this
-	 * client's supported minimum and maximum API versions.
-	 * As long as this list and its versions overlaps with the
-	 * broker supported API versions the feature will be enabled.
-	 */
-	{
-
-		/* @brief >=0.10.0: Message.MagicByte version 1:
-		 * Relative offsets (KIP-31) and message timestamps (KIP-32). */
-		.feature = RD_KAFKA_FEATURE_MSGVER1,
-		.depends = {
-			{ RD_KAFKAP_Produce, 2, 2 },
-			{ RD_KAFKAP_Fetch, 2, 2 },
-			{ -1 },
-		},
-	},
-        {
-                /* @brief >=0.11.0: Message.MagicByte version 2 */
-                .feature = RD_KAFKA_FEATURE_MSGVER2,
-                .depends = {
-                        { RD_KAFKAP_Produce, 3, 3 },
-                        { RD_KAFKAP_Fetch, 4, 4 },
-                        { -1 },
-                },
-        },
-	{
-		
-		/* @brief >=0.10.0: ApiVersionQuery support.
-		 * @remark This is a bit of a chicken-and-egg problem but needs to be
-		 *         set by feature_check() to avoid the feature being cleared
-		 *         even when broker supports it. */
-		.feature = RD_KAFKA_FEATURE_APIVERSION,
-		.depends = {
-			{ RD_KAFKAP_ApiVersion, 0, 0 },
-			{ -1 },
-		},
-	},
-	{
-		/* @brief >=0.8.2.0: Broker-based Group coordinator */
-		.feature = RD_KAFKA_FEATURE_BROKER_GROUP_COORD,
-		.depends = {
-			{ RD_KAFKAP_GroupCoordinator, 0, 0 },
-			{ -1 },
-		},
-	},
-	{
-		/* @brief >=0.9.0: Broker-based balanced consumer groups. */
-		.feature = RD_KAFKA_FEATURE_BROKER_BALANCED_CONSUMER,
-		.depends = {
-			{ RD_KAFKAP_GroupCoordinator, 0, 0 },
-			{ RD_KAFKAP_OffsetCommit, 1, 2 },
-			{ RD_KAFKAP_OffsetFetch, 1, 1 },
-			{ RD_KAFKAP_JoinGroup, 0, 0 },
-			{ RD_KAFKAP_SyncGroup, 0, 0 },
-			{ RD_KAFKAP_Heartbeat, 0, 0 },
-			{ RD_KAFKAP_LeaveGroup, 0, 0 },
-			{ -1 },
-		},
-	},
-	{
-		/* @brief >=0.9.0: ThrottleTime */
-		.feature = RD_KAFKA_FEATURE_THROTTLETIME,
-		.depends = {
-			{ RD_KAFKAP_Produce, 1, 2 },
-			{ RD_KAFKAP_Fetch, 1, 2 },
-			{ -1 },
-		},
-
-        },
-        {
-                /* @brief >=0.9.0: SASL (GSSAPI) authentication.
-                 * Since SASL is not using the Kafka protocol
-                 * we must use something else to map us to the
-                 * proper broker version support:
-                 * JoinGroup was released along with SASL in 0.9.0. */
-                .feature = RD_KAFKA_FEATURE_SASL_GSSAPI,
-                .depends = {
-                        { RD_KAFKAP_JoinGroup, 0, 0 },
-                        { -1 },
-                },
-        },
-        {
-                /* @brief >=0.10.0: SASL mechanism handshake (KIP-43)
-                 *                  to automatically support other mechanisms
-                 *                  than GSSAPI, such as PLAIN. */
-                .feature = RD_KAFKA_FEATURE_SASL_HANDSHAKE,
-                .depends = {
-                        { RD_KAFKAP_SaslHandshake, 0, 0 },
-                        { -1 },
-                },
-        },
-        {
-                /* @brief >=0.8.2: LZ4 compression.
-                 * Since LZ4 initially did not rely on a specific API
-                 * type or version (it does in >=0.10.0)
-                 * we must use something else to map us to the
-                 * proper broker version support:
-                 * GroupCoordinator was released in 0.8.2 */
-                .feature = RD_KAFKA_FEATURE_LZ4,
-                .depends = {
-                        { RD_KAFKAP_GroupCoordinator, 0, 0 },
-                        { -1 },
-                },
-        },
-        {
-                /* @brief >=0.10.1.0: Offset v1 (KIP-79)
-                 * Time-based offset requests */
-                .feature = RD_KAFKA_FEATURE_OFFSET_TIME,
-                .depends = {
-                        { RD_KAFKAP_Offset, 1, 1 },
-                        { -1 },
-                }
-        },
-        { .feature = 0 }, /* sentinel */
-};
-
-
-
-/**
- * @brief In the absence of KIP-35 support in earlier broker versions we provide hardcoded
- *        lists that correspond to older broker versions.
- */
-
-/* >= 0.10.0.0: dummy for all future versions that support ApiVersionRequest */
-static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_Queryable[] = {
-	{ RD_KAFKAP_ApiVersion, 0, 0 }
-};
-
-
-/* =~ 0.9.0 */
-static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_9_0[] = {
-	{ RD_KAFKAP_Produce, 0, 1 },
-	{ RD_KAFKAP_Fetch, 0, 1 },
-	{ RD_KAFKAP_Offset, 0, 0 },
-	{ RD_KAFKAP_Metadata, 0, 0 },
-	{ RD_KAFKAP_OffsetCommit, 0, 2 },
-	{ RD_KAFKAP_OffsetFetch, 0, 1 },
-	{ RD_KAFKAP_GroupCoordinator, 0, 0 },
-	{ RD_KAFKAP_JoinGroup, 0, 0 },
-	{ RD_KAFKAP_Heartbeat, 0, 0 },
-	{ RD_KAFKAP_LeaveGroup, 0, 0 },
-	{ RD_KAFKAP_SyncGroup, 0, 0 },
-	{ RD_KAFKAP_DescribeGroups, 0, 0 },
-	{ RD_KAFKAP_ListGroups, 0, 0 }
-};
-
-/* =~ 0.8.2 */
-static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_8_2[] = {
-	{ RD_KAFKAP_Produce, 0, 0 },
-	{ RD_KAFKAP_Fetch, 0, 0 },
-	{ RD_KAFKAP_Offset, 0, 0 },
-	{ RD_KAFKAP_Metadata, 0, 0 },
-	{ RD_KAFKAP_OffsetCommit, 0, 1 },
-	{ RD_KAFKAP_OffsetFetch, 0, 1 },
-	{ RD_KAFKAP_GroupCoordinator, 0, 0 }
-};
-
-/* =~ 0.8.1 */
-static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_8_1[] = {
-	{ RD_KAFKAP_Produce, 0, 0 },
-	{ RD_KAFKAP_Fetch, 0, 0 },
-	{ RD_KAFKAP_Offset, 0, 0 },
-	{ RD_KAFKAP_Metadata, 0, 0 },
-	{ RD_KAFKAP_OffsetCommit, 0, 1 },
-	{ RD_KAFKAP_OffsetFetch, 0, 0 }
-};
-
-/* =~ 0.8.0 */
-static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_8_0[] = {
-	{ RD_KAFKAP_Produce, 0, 0 },
-	{ RD_KAFKAP_Fetch, 0, 0 },
-	{ RD_KAFKAP_Offset, 0, 0 },
-	{ RD_KAFKAP_Metadata, 0, 0 }
-};
-
-
-/**
- * @brief Returns the ApiVersion list for legacy broker versions that do not
- *        support the ApiVersionQuery request. E.g., brokers <0.10.0.
- *
- * @param broker_version Broker version to match (longest prefix matching).
- * @param fallback If no match is found fall back to this version's APIs (but return 0).
- *
- * @returns 1 if \p broker_version was recognized: \p *apisp will point to
- *          the ApiVersion list and *api_cntp will be set to its element count.
- *          0 if \p broker_version was not recognized: \p *apisp remains unchanged.
- *
- */
-int rd_kafka_get_legacy_ApiVersions (const char *broker_version,
-				     struct rd_kafka_ApiVersion **apisp,
-				     size_t *api_cntp, const char *fallback) {
-	static const struct {
-		const char *pfx;
-		struct rd_kafka_ApiVersion *apis;
-		size_t api_cnt;
-	} vermap[] = {
-#define _VERMAP(PFX,APIS) { PFX, APIS, RD_ARRAYSIZE(APIS) }
-		_VERMAP("0.9.0", rd_kafka_ApiVersion_0_9_0),
-		_VERMAP("0.8.2", rd_kafka_ApiVersion_0_8_2),
-		_VERMAP("0.8.1", rd_kafka_ApiVersion_0_8_1),
-		_VERMAP("0.8.0", rd_kafka_ApiVersion_0_8_0),
-		{ "0.7.", NULL }, /* Unsupported */
-		{ "0.6.", NULL }, /* Unsupported */
-		_VERMAP("", rd_kafka_ApiVersion_Queryable),
-		{ NULL }
-	};
-	int i;
-	int fallback_i = -1;
-        int ret = 0;
-
-        *apisp = NULL;
-        *api_cntp = 0;
-
-	for (i = 0 ; vermap[i].pfx ; i++) {
-		if (!strncmp(vermap[i].pfx, broker_version, strlen(vermap[i].pfx))) {
-			if (!vermap[i].apis)
-				return 0;
-			*apisp = vermap[i].apis;
-			*api_cntp = vermap[i].api_cnt;
-                        ret = 1;
-                        break;
-		} else if (fallback && !strcmp(vermap[i].pfx, fallback))
-			fallback_i = i;
-	}
-
-	if (!*apisp && fallback) {
-		rd_kafka_assert(NULL, fallback_i != -1);
-		*apisp    = vermap[fallback_i].apis;
-		*api_cntp = vermap[fallback_i].api_cnt;
-	}
-
-        return ret;
-}
-
-
-/**
- * @returns 1 if the provided broker version (probably)
- *          supports api.version.request.
- */
-int rd_kafka_ApiVersion_is_queryable (const char *broker_version) {
-	struct rd_kafka_ApiVersion *apis;
-	size_t api_cnt;
-
-
-	if (!rd_kafka_get_legacy_ApiVersions(broker_version,
-					     &apis, &api_cnt, 0))
-		return 0;
-
-	return apis == rd_kafka_ApiVersion_Queryable;
-}
-
-
-
-
-	
-/**
- * @brief Check if match's versions overlaps with \p apis.
- *
- * @returns 1 if true, else 0.
- * @remark \p apis must be sorted using rd_kafka_ApiVersion_key_cmp()
- */
-static RD_INLINE int
-rd_kafka_ApiVersion_check (const struct rd_kafka_ApiVersion *apis, size_t api_cnt,
-			   const struct rd_kafka_ApiVersion *match) {
-	const struct rd_kafka_ApiVersion *api;
-
-	api = bsearch(match, apis, api_cnt, sizeof(*apis),
-		      rd_kafka_ApiVersion_key_cmp);
-	if (unlikely(!api))
-		return 0;
-
-	return match->MinVer <= api->MaxVer && api->MinVer <= match->MaxVer;
-}
-
-
-/**
- * @brief Compare broker's supported API versions to our feature request map
- *        and enable/disable features accordingly.
- *
- * @param broker_apis Broker's supported APIs. If NULL the
- *        \p broker.version.fallback configuration property will specify a
- *        default legacy version to use.
- * @param broker_api_cnt Number of elements in \p broker_apis
- *
- * @returns the supported features (bitmask) to enable.
- */
-int rd_kafka_features_check (rd_kafka_broker_t *rkb,
-			     struct rd_kafka_ApiVersion *broker_apis,
-			     size_t broker_api_cnt) {
-	int features = 0;
-	int i;
-
-	/* Scan through features. */
-	for (i = 0 ; rd_kafka_feature_map[i].feature != 0 ; i++) {
-		const struct rd_kafka_ApiVersion *match;
-		int fails = 0;
-
-		/* For each feature check that all its API dependencies
-		 * can be fulfilled. */
-
-		for (match = &rd_kafka_feature_map[i].depends[0] ;
-		     match->ApiKey != -1 ; match++) {
-			int r;
-			
-			r = rd_kafka_ApiVersion_check(broker_apis, broker_api_cnt,
-						      match);
-
-			rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
-				   " Feature %s: %s (%hd..%hd) "
-				   "%ssupported by broker",
-				   rd_kafka_features2str(rd_kafka_feature_map[i].
-							feature),
-				   rd_kafka_ApiKey2str(match->ApiKey),
-				   match->MinVer, match->MaxVer,
-				   r ? "" : "NOT ");
-
-			fails += !r;
-		}
-
-		rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
-			   "%s feature %s",
-			   fails ? "Disabling" : "Enabling",
-			   rd_kafka_features2str(rd_kafka_feature_map[i].feature));
-
-
-		if (!fails)
-			features |= rd_kafka_feature_map[i].feature;
-	}
-
-	return features;
-}
-
-
-
-/**
- * @brief Make an allocated and sorted copy of \p src.
- */
-void
-rd_kafka_ApiVersions_copy (const struct rd_kafka_ApiVersion *src,
-                           size_t src_cnt,
-                           struct rd_kafka_ApiVersion **dstp,
-                           size_t *dst_cntp) {
-        *dstp = rd_memdup(src, sizeof(*src) * src_cnt);
-        *dst_cntp = src_cnt;
-        qsort(*dstp, *dst_cntp, sizeof(**dstp), rd_kafka_ApiVersion_key_cmp);
-}
-
-
-
-
-
-
-/**
- * @returns a human-readable feature flag string.
- */
-const char *rd_kafka_features2str (int features) {
-	static RD_TLS char ret[4][128];
-	size_t of = 0;
-	static RD_TLS int reti = 0;
-	int i;
-
-	reti = (reti + 1) % 4;
-
-	*ret[reti] = '\0';
-	for (i = 0 ; rd_kafka_feature_names[i] ; i++) {
-		int r;
-		if (!(features & (1 << i)))
-			continue;
-
-		r = rd_snprintf(ret[reti]+of, sizeof(ret[reti])-of, "%s%s",
-				of == 0 ? "" : ",",
-				rd_kafka_feature_names[i]);
-		if ((size_t)r > sizeof(ret[reti])-of) {
-			/* Out of space */
-			memcpy(&ret[reti][sizeof(ret[reti])-3], "..", 3);
-			break;
-		}
-
-		of += r;
-	}
-
-	return ret[reti];
-}
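
The negotiation above is steered from two public configuration properties,
api.version.request and broker.version.fallback. A sketch of pinning the
legacy (pre-KIP-35) path from application code; the property names are the
standard librdkafka configuration, the values are examples only:

    #include <stdio.h>
    #include <librdkafka/rdkafka.h>

    /* With api.version.request=false (or against brokers without
     * ApiVersionRequest support) the hardcoded ApiVersion list selected by
     * broker.version.fallback above is used for feature detection. */
    static int configure_legacy_broker (rd_kafka_conf_t *conf) {
        char errstr[512];

        if (rd_kafka_conf_set(conf, "api.version.request", "false",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
            rd_kafka_conf_set(conf, "broker.version.fallback", "0.9.0",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
            fprintf(stderr, "config failed: %s\n", errstr);
            return -1;
        }
        return 0;
    }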

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_feature.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_feature.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_feature.h
deleted file mode 100644
index 1e40664..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_feature.h
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2016, Magnus Edenhill
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-#pragma once
-
-
-/**
- * @brief Kafka protocol features
- */
-
-/* Message version 1 (MagicByte=1):
- *  + relative offsets (KIP-31)
- *  + timestamps (KIP-32) */
-#define RD_KAFKA_FEATURE_MSGVER1    0x1
-
-/* ApiVersionQuery support (KIP-35) */
-#define RD_KAFKA_FEATURE_APIVERSION 0x2
-
- /* >= 0.9: Broker-based Balanced Consumer */
-#define RD_KAFKA_FEATURE_BROKER_BALANCED_CONSUMER 0x4
-
-/* >= 0.9: Produce/Fetch ThrottleTime reporting */
-#define RD_KAFKA_FEATURE_THROTTLETIME 0x8
-
-/* >= 0.9: SASL GSSAPI support */
-#define RD_KAFKA_FEATURE_SASL_GSSAPI    0x10
-
-/* >= 0.10: SaslMechanismRequest (KIP-43) */
-#define RD_KAFKA_FEATURE_SASL_HANDSHAKE 0x20
-
-/* >= 0.8.2.0: Broker-based Group coordinator */
-#define RD_KAFKA_FEATURE_BROKER_GROUP_COORD 0x40
-
-/* >= 0.8.2.0: LZ4 compression (with bad and proper HC checksums) */
-#define RD_KAFKA_FEATURE_LZ4 0x80
-
-/* >= 0.10.1.0: Time-based Offset fetch (KIP-79) */
-#define RD_KAFKA_FEATURE_OFFSET_TIME 0x100
-
-/* >= 0.11.0.0: Message version 2 (MagicByte=2):
- *  + EOS message format KIP-98 */
-#define RD_KAFKA_FEATURE_MSGVER2     0x200
-
-
-int rd_kafka_get_legacy_ApiVersions (const char *broker_version,
-				     struct rd_kafka_ApiVersion **apisp,
-				     size_t *api_cntp, const char *fallback);
-int rd_kafka_ApiVersion_is_queryable (const char *broker_version);
-void rd_kafka_ApiVersions_copy (const struct rd_kafka_ApiVersion *src, size_t src_cnt,
-				struct rd_kafka_ApiVersion **dstp, size_t *dst_cntp);
-int rd_kafka_features_check (rd_kafka_broker_t *rkb,
-			     struct rd_kafka_ApiVersion *broker_apis,
-			     size_t broker_api_cnt);
-
-const char *rd_kafka_features2str (int features);
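
The flags above form a plain bitmask that rd_kafka_features2str() decodes
name by name. A standalone sketch of that decoding; the names and bit order
are copied from this header, the program itself is illustrative and not part
of librdkafka:

    #include <stdio.h>

    static const char *feature_names[] = {
        "MsgVer1", "ApiVersion", "BrokerBalancedConsumer", "ThrottleTime",
        "Sasl", "SaslHandshake", "BrokerGroupCoordinator", "LZ4",
        "OffsetTime", "MsgVer2", NULL
    };

    /* Bit i of the mask selects feature_names[i], matching the 0x1..0x200
     * defines above. */
    static void print_features (int features) {
        int i, first = 1;
        for (i = 0; feature_names[i]; i++) {
            if (!(features & (1 << i)))
                continue;
            printf("%s%s", first ? "" : ",", feature_names[i]);
            first = 0;
        }
        printf("\n");
    }

    int main (void) {
        print_features(0x1 | 0x80 | 0x100); /* MsgVer1,LZ4,OffsetTime */
        return 0;
    }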

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_int.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_int.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_int.h
deleted file mode 100644
index 792c1a5..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_int.h
+++ /dev/null
@@ -1,435 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2013, Magnus Edenhill
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#pragma once
-
-
-#ifndef _MSC_VER
-#define _GNU_SOURCE  /* for strndup() */
-#include <syslog.h>
-#else
-typedef int mode_t;
-#endif
-#include <fcntl.h>
-
-
-#include "rdsysqueue.h"
-
-#include "rdkafka.h"
-#include "rd.h"
-#include "rdlog.h"
-#include "rdtime.h"
-#include "rdaddr.h"
-#include "rdinterval.h"
-#include "rdavg.h"
-#include "rdlist.h"
-
-#if WITH_SSL
-#include <openssl/ssl.h>
-#endif
-
-
-
-
-typedef struct rd_kafka_itopic_s rd_kafka_itopic_t;
-typedef struct rd_ikafka_s rd_ikafka_t;
-
-
-#define rd_kafka_assert(rk, cond) do {                                  \
-                if (unlikely(!(cond)))                                  \
-                        rd_kafka_crash(__FILE__,__LINE__, __FUNCTION__, \
-                                       (rk), "assert: " # cond);        \
-        } while (0)
-
-
-void
-RD_NORETURN
-rd_kafka_crash (const char *file, int line, const char *function,
-                rd_kafka_t *rk, const char *reason);
-
-
-/* Forward declarations */
-struct rd_kafka_s;
-struct rd_kafka_itopic_s;
-struct rd_kafka_msg_s;
-struct rd_kafka_broker_s;
-
-typedef RD_SHARED_PTR_TYPE(, struct rd_kafka_toppar_s) shptr_rd_kafka_toppar_t;
-typedef RD_SHARED_PTR_TYPE(, struct rd_kafka_itopic_s) shptr_rd_kafka_itopic_t;
-
-
-
-#include "rdkafka_op.h"
-#include "rdkafka_queue.h"
-#include "rdkafka_msg.h"
-#include "rdkafka_proto.h"
-#include "rdkafka_buf.h"
-#include "rdkafka_pattern.h"
-#include "rdkafka_conf.h"
-#include "rdkafka_transport.h"
-#include "rdkafka_timer.h"
-#include "rdkafka_assignor.h"
-#include "rdkafka_metadata.h"
-
-
-/**
- * Protocol level sanity
- */
-#define RD_KAFKAP_BROKERS_MAX     1000
-#define RD_KAFKAP_TOPICS_MAX      1000000
-#define RD_KAFKAP_PARTITIONS_MAX  10000
-
-
-#define RD_KAFKA_OFFSET_IS_LOGICAL(OFF)  ((OFF) < 0)
-
-
-
-
-
-
-
-/**
- * Kafka handle, internal representation of the application's rd_kafka_t.
- */
-
-typedef RD_SHARED_PTR_TYPE(shptr_rd_ikafka_s, rd_ikafka_t) shptr_rd_ikafka_t;
-
-struct rd_kafka_s {
-	rd_kafka_q_t *rk_rep;   /* kafka -> application reply queue */
-	rd_kafka_q_t *rk_ops;   /* any -> rdkafka main thread ops */
-
-	TAILQ_HEAD(, rd_kafka_broker_s) rk_brokers;
-        rd_list_t                  rk_broker_by_id; /* Fast id lookups. */
-	rd_atomic32_t              rk_broker_cnt;
-	rd_atomic32_t              rk_broker_down_cnt;
-        mtx_t                      rk_internal_rkb_lock;
-	rd_kafka_broker_t         *rk_internal_rkb;
-
-	/* Broadcasting of broker state changes to wake up
-	 * functions waiting for a state change. */
-	cnd_t                      rk_broker_state_change_cnd;
-	mtx_t                      rk_broker_state_change_lock;
-	int                        rk_broker_state_change_version;
-
-
-	TAILQ_HEAD(, rd_kafka_itopic_s)  rk_topics;
-	int              rk_topic_cnt;
-
-        struct rd_kafka_cgrp_s *rk_cgrp;
-
-        rd_kafka_conf_t  rk_conf;
-        rd_kafka_q_t    *rk_logq;          /* Log queue if `log.queue` set */
-        char             rk_name[128];
-	rd_kafkap_str_t *rk_client_id;
-        rd_kafkap_str_t *rk_group_id;    /* Consumer group id */
-
-	int              rk_flags;
-	rd_atomic32_t    rk_terminate;
-	rwlock_t         rk_lock;
-	rd_kafka_type_t  rk_type;
-	struct timeval   rk_tv_state_change;
-
-	rd_atomic32_t    rk_last_throttle;  /* Last throttle_time_ms value
-					     * from broker. */
-
-        /* Locks: rd_kafka_*lock() */
-        rd_ts_t          rk_ts_metadata;    /* Timestamp of most recent
-                                             * metadata. */
-
-	struct rd_kafka_metadata *rk_full_metadata; /* Last full metadata. */
-	rd_ts_t          rk_ts_full_metadata;       /* Timestamp of .. */
-        struct rd_kafka_metadata_cache rk_metadata_cache; /* Metadata cache */
-
-        char            *rk_clusterid;      /* ClusterId from metadata */
-
-        /* Simple consumer count:
-         *  >0: Running in legacy / Simple Consumer mode,
-         *   0: No consumers running
-         *  <0: Running in High level consumer mode */
-        rd_atomic32_t    rk_simple_cnt;
-
-        /**
-         * Exactly Once Semantics
-         */
-        struct {
-                rd_kafkap_str_t *TransactionalId;
-                int64_t          PID;
-                int16_t          ProducerEpoch;
-        } rk_eos;
-
-	const rd_kafkap_bytes_t *rk_null_bytes;
-
-	struct {
-		mtx_t lock;       /* Protects access to this struct */
-		cnd_t cnd;        /* For waking up blocking injectors */
-		unsigned int cnt; /* Current message count */
-		size_t size;      /* Current message size sum */
-	        unsigned int max_cnt; /* Max limit */
-		size_t max_size; /* Max limit */
-	} rk_curr_msgs;
-
-        rd_kafka_timers_t rk_timers;
-	thrd_t rk_thread;
-
-        int rk_initialized;
-};
-
-#define rd_kafka_wrlock(rk)    rwlock_wrlock(&(rk)->rk_lock)
-#define rd_kafka_rdlock(rk)    rwlock_rdlock(&(rk)->rk_lock)
-#define rd_kafka_rdunlock(rk)    rwlock_rdunlock(&(rk)->rk_lock)
-#define rd_kafka_wrunlock(rk)    rwlock_wrunlock(&(rk)->rk_lock)
-
-/**
- * @brief Add \p cnt messages and of total size \p size bytes to the
- *        internal bookkeeping of current message counts.
- *        If the total message count or size after add would exceed the
- *        configured limits \c queue.buffering.max.messages and
- *        \c queue.buffering.max.kbytes then depending on the value of
- *        \p block the function either blocks until enough space is available
- *        if \p block is 1, else immediately returns
- *        RD_KAFKA_RESP_ERR__QUEUE_FULL.
- */
-static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
-rd_kafka_curr_msgs_add (rd_kafka_t *rk, unsigned int cnt, size_t size,
-			int block) {
-
-	if (rk->rk_type != RD_KAFKA_PRODUCER)
-		return RD_KAFKA_RESP_ERR_NO_ERROR;
-
-	mtx_lock(&rk->rk_curr_msgs.lock);
-	while (unlikely(rk->rk_curr_msgs.cnt + cnt >
-			rk->rk_curr_msgs.max_cnt ||
-			(unsigned long long)(rk->rk_curr_msgs.size + size) >
-			(unsigned long long)rk->rk_curr_msgs.max_size)) {
-		if (!block) {
-			mtx_unlock(&rk->rk_curr_msgs.lock);
-			return RD_KAFKA_RESP_ERR__QUEUE_FULL;
-		}
-
-		cnd_wait(&rk->rk_curr_msgs.cnd, &rk->rk_curr_msgs.lock);
-	}
-
-	rk->rk_curr_msgs.cnt  += cnt;
-	rk->rk_curr_msgs.size += size;
-	mtx_unlock(&rk->rk_curr_msgs.lock);
-
-	return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-/**
- * @brief Subtract \p cnt messages of total size \p size from the
- *        current bookkeeping and broadcast a wakeup on the condvar
- *        for any waiting & blocking threads.
- */
-static RD_INLINE RD_UNUSED void
-rd_kafka_curr_msgs_sub (rd_kafka_t *rk, unsigned int cnt, size_t size) {
-        int broadcast = 0;
-
-	if (rk->rk_type != RD_KAFKA_PRODUCER)
-		return;
-
-	mtx_lock(&rk->rk_curr_msgs.lock);
-	rd_kafka_assert(NULL,
-			rk->rk_curr_msgs.cnt >= cnt &&
-			rk->rk_curr_msgs.size >= size);
-
-        /* If the subtraction would pass one of the thresholds
-         * broadcast a wake-up to any waiting listeners. */
-        if ((rk->rk_curr_msgs.cnt >= rk->rk_curr_msgs.max_cnt &&
-             rk->rk_curr_msgs.cnt - cnt < rk->rk_curr_msgs.max_cnt) ||
-            (rk->rk_curr_msgs.size >= rk->rk_curr_msgs.max_size &&
-             rk->rk_curr_msgs.size - size < rk->rk_curr_msgs.max_size))
-                broadcast = 1;
-
-	rk->rk_curr_msgs.cnt  -= cnt;
-	rk->rk_curr_msgs.size -= size;
-
-        if (unlikely(broadcast))
-                cnd_broadcast(&rk->rk_curr_msgs.cnd);
-
-	mtx_unlock(&rk->rk_curr_msgs.lock);
-}
-
-static RD_INLINE RD_UNUSED void
-rd_kafka_curr_msgs_get (rd_kafka_t *rk, unsigned int *cntp, size_t *sizep) {
-	if (rk->rk_type != RD_KAFKA_PRODUCER) {
-		*cntp = 0;
-		*sizep = 0;
-		return;
-	}
-
-	mtx_lock(&rk->rk_curr_msgs.lock);
-	*cntp = rk->rk_curr_msgs.cnt;
-	*sizep = rk->rk_curr_msgs.size;
-	mtx_unlock(&rk->rk_curr_msgs.lock);
-}
-
-static RD_INLINE RD_UNUSED int
-rd_kafka_curr_msgs_cnt (rd_kafka_t *rk) {
-	int cnt;
-	if (rk->rk_type != RD_KAFKA_PRODUCER)
-		return 0;
-
-	mtx_lock(&rk->rk_curr_msgs.lock);
-	cnt = rk->rk_curr_msgs.cnt;
-	mtx_unlock(&rk->rk_curr_msgs.lock);
-
-	return cnt;
-}
-
-
-void rd_kafka_destroy_final (rd_kafka_t *rk);
-
-
-/**
- * Returns true if 'rk' handle is terminating.
- */
-#define rd_kafka_terminating(rk) (rd_atomic32_get(&(rk)->rk_terminate))
-
-#define rd_kafka_is_simple_consumer(rk) \
-        (rd_atomic32_get(&(rk)->rk_simple_cnt) > 0)
-int rd_kafka_simple_consumer_add (rd_kafka_t *rk);
-
-
-#include "rdkafka_topic.h"
-#include "rdkafka_partition.h"
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-/**
- * Debug contexts
- */
-#define RD_KAFKA_DBG_GENERIC        0x1
-#define RD_KAFKA_DBG_BROKER         0x2
-#define RD_KAFKA_DBG_TOPIC          0x4
-#define RD_KAFKA_DBG_METADATA       0x8
-#define RD_KAFKA_DBG_FEATURE        0x10
-#define RD_KAFKA_DBG_QUEUE          0x20
-#define RD_KAFKA_DBG_MSG            0x40
-#define RD_KAFKA_DBG_PROTOCOL       0x80
-#define RD_KAFKA_DBG_CGRP           0x100
-#define RD_KAFKA_DBG_SECURITY       0x200
-#define RD_KAFKA_DBG_FETCH          0x400
-#define RD_KAFKA_DBG_INTERCEPTOR    0x800
-#define RD_KAFKA_DBG_PLUGIN         0x1000
-#define RD_KAFKA_DBG_ALL            0xffff
-
-
-void rd_kafka_log0(const rd_kafka_conf_t *conf,
-                   const rd_kafka_t *rk, const char *extra, int level,
-                   const char *fac, const char *fmt, ...) RD_FORMAT(printf,
-                                                                    6, 7);
-
-#define rd_kafka_log(rk,level,fac,...) \
-        rd_kafka_log0(&rk->rk_conf, rk, NULL, level, fac, __VA_ARGS__)
-#define rd_kafka_dbg(rk,ctx,fac,...) do {                               \
-                if (unlikely((rk)->rk_conf.debug & (RD_KAFKA_DBG_ ## ctx))) \
-                        rd_kafka_log0(&rk->rk_conf,rk,NULL,             \
-                                      LOG_DEBUG,fac,__VA_ARGS__);       \
-        } while (0)
-
-/* dbg() not requiring an rk, just the conf object, for early logging */
-#define rd_kafka_dbg0(conf,ctx,fac,...) do {                            \
-                if (unlikely((conf)->debug & (RD_KAFKA_DBG_ ## ctx)))   \
-                        rd_kafka_log0(conf,NULL,NULL,                   \
-                                      LOG_DEBUG,fac,__VA_ARGS__);       \
-        } while (0)
-
-/* NOTE: The local copy of _logname is needed due to rkb_logname_lock lock-ordering
- *       when logging another broker's name in the message. */
-#define rd_rkb_log(rkb,level,fac,...) do {				\
-		char _logname[RD_KAFKA_NODENAME_SIZE];			\
-                mtx_lock(&(rkb)->rkb_logname_lock);                     \
-		strncpy(_logname, rkb->rkb_logname, sizeof(_logname)-1); \
-		_logname[RD_KAFKA_NODENAME_SIZE-1] = '\0';		\
-                mtx_unlock(&(rkb)->rkb_logname_lock);                   \
-		rd_kafka_log0(&(rkb)->rkb_rk->rk_conf, \
-                              (rkb)->rkb_rk, _logname,                  \
-                              level, fac, __VA_ARGS__);                 \
-        } while (0)
-
-#define rd_rkb_dbg(rkb,ctx,fac,...) do {				\
-		if (unlikely((rkb)->rkb_rk->rk_conf.debug &		\
-			     (RD_KAFKA_DBG_ ## ctx))) {			\
-			rd_rkb_log(rkb, LOG_DEBUG, fac, __VA_ARGS__);	\
-                }                                                       \
-	} while (0)
-
-
-
-extern rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code;
-
-static RD_UNUSED RD_INLINE
-rd_kafka_resp_err_t rd_kafka_set_last_error (rd_kafka_resp_err_t err,
-					     int errnox) {
-        if (errnox) {
-#ifdef _MSC_VER
-                /* This is the correct way to set errno on Windows,
-                 * but it is still pointless due to different errnos
-                 * in different runtimes:
-                 * https://social.msdn.microsoft.com/Forums/vstudio/en-US/b4500c0d-1b69-40c7-9ef5-08da1025b5bf/setting-errno-from-within-a-dll?forum=vclanguage/
-                 * errno is thus highly deprecated, and buggy, on Windows
-                 * when using librdkafka as a dynamically loaded DLL. */
-                _set_errno(errnox);
-#else
-                errno = errnox;
-#endif
-        }
-	rd_kafka_last_error_code = err;
-	return err;
-}
-
-
-extern rd_atomic32_t rd_kafka_thread_cnt_curr;
-
-extern char RD_TLS rd_kafka_thread_name[64];
-
-
-
-
-
-int rd_kafka_path_is_dir (const char *path);
-
-rd_kafka_op_res_t
-rd_kafka_poll_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
-                  rd_kafka_q_cb_type_t cb_type, void *opaque);
-
-rd_kafka_resp_err_t rd_kafka_subscribe_rkt (rd_kafka_itopic_t *rkt);
-
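
rd_kafka_curr_msgs_add() above is what applications observe as
RD_KAFKA_RESP_ERR__QUEUE_FULL once queue.buffering.max.messages or
queue.buffering.max.kbytes is exceeded. A minimal producer-side sketch of
handling that backpressure with the public API (illustrative only):

    #include <librdkafka/rdkafka.h>

    /* Retries rd_kafka_produce() while the internal queue is full; serving
     * rd_kafka_poll() fires delivery reports, which frees queue space. */
    static int produce_with_backpressure (rd_kafka_t *rk, rd_kafka_topic_t *rkt,
                                          const char *payload, size_t len) {
        while (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                                (void *)payload, len, NULL, 0, NULL) == -1) {
            if (rd_kafka_last_error() != RD_KAFKA_RESP_ERR__QUEUE_FULL)
                return -1;          /* some other, non-retriable error */
            rd_kafka_poll(rk, 100); /* wait for deliveries to free room */
        }
        return 0;
    }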

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_interceptor.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_interceptor.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_interceptor.c
deleted file mode 100644
index 91f7761..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_interceptor.c
+++ /dev/null
@@ -1,624 +0,0 @@
-/*
- * librdkafka - The Apache Kafka C/C++ library
- *
- * Copyright (c) 2017 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "rdkafka_int.h"
-#include "rdkafka_interceptor.h"
-#include "rdstring.h"
-
-/**
- * @brief Interceptor method/function reference
- */
-typedef struct rd_kafka_interceptor_method_s {
-        union {
-                rd_kafka_interceptor_f_on_conf_set_t *on_conf_set;
-                rd_kafka_interceptor_f_on_conf_dup_t *on_conf_dup;
-                rd_kafka_interceptor_f_on_conf_destroy_t *on_conf_destroy;
-                rd_kafka_interceptor_f_on_new_t     *on_new;
-                rd_kafka_interceptor_f_on_destroy_t *on_destroy;
-                rd_kafka_interceptor_f_on_send_t    *on_send;
-                rd_kafka_interceptor_f_on_acknowledgement_t *on_acknowledgement;
-                rd_kafka_interceptor_f_on_consume_t *on_consume;
-                rd_kafka_interceptor_f_on_commit_t  *on_commit;
-                void *generic; /* For easy assignment */
-
-        } u;
-        char *ic_name;
-        void *ic_opaque;
-} rd_kafka_interceptor_method_t;
-
-/**
- * @brief Destroy interceptor method reference
- */
-static void
-rd_kafka_interceptor_method_destroy (void *ptr) {
-        rd_kafka_interceptor_method_t *method = ptr;
-        rd_free(method->ic_name);
-        rd_free(method);
-}
-
-
-
-
-
-/**
- * @brief Handle an interceptor on_... method call failure.
- */
-static RD_INLINE void
-rd_kafka_interceptor_failed (rd_kafka_t *rk,
-                             const rd_kafka_interceptor_method_t *method,
-                             const char *method_name, rd_kafka_resp_err_t err,
-                             const rd_kafka_message_t *rkmessage,
-                             const char *errstr) {
-
-        /* FIXME: Suppress log messages, eventually */
-        if (rkmessage)
-                rd_kafka_log(rk, LOG_WARNING, "ICFAIL",
-                             "Interceptor %s failed %s for "
-                             "message on %s [%"PRId32"] @ %"PRId64
-                             ": %s%s%s",
-                             method->ic_name, method_name,
-                             rd_kafka_topic_a2i(rkmessage->rkt)->rkt_topic->str,
-                             rkmessage->partition,
-                             rkmessage->offset,
-                             rd_kafka_err2str(err),
-                             errstr ? ": " : "",
-                             errstr ? errstr : "");
-        else
-                rd_kafka_log(rk, LOG_WARNING, "ICFAIL",
-                             "Interceptor %s failed %s: %s%s%s",
-                             method->ic_name, method_name,
-                             rd_kafka_err2str(err),
-                             errstr ? ": " : "",
-                             errstr ? errstr : "");
-
-}
-
-
-
-/**
- * @brief Create interceptor method reference.
- *        Duplicates are rejected
- */
-static rd_kafka_interceptor_method_t *
-rd_kafka_interceptor_method_new (const char *ic_name,
-                                 void *func, void *ic_opaque) {
-        rd_kafka_interceptor_method_t *method;
-
-        method             = rd_calloc(1, sizeof(*method));
-        method->ic_name    = rd_strdup(ic_name);
-        method->ic_opaque  = ic_opaque;
-        method->u.generic  = func;
-
-        return method;
-}
-
-
-/**
- * @brief Method comparator to be used for finding, not sorting.
- */
-static int rd_kafka_interceptor_method_cmp (const void *_a, const void *_b) {
-        const rd_kafka_interceptor_method_t *a = _a, *b = _b;
-
-        if (a->u.generic != b->u.generic)
-                return -1;
-
-        return strcmp(a->ic_name, b->ic_name);
-}
-
-/**
- * @brief Add interceptor method reference
- */
-static rd_kafka_resp_err_t
-rd_kafka_interceptor_method_add (rd_list_t *list, const char *ic_name,
-                                 void *func, void *ic_opaque) {
-        rd_kafka_interceptor_method_t *method;
-        const rd_kafka_interceptor_method_t skel = {
-                .ic_name = (char *)ic_name,
-                .u = { .generic = func }
-        };
-
-        /* Reject same method from same interceptor.
-         * This is needed to avoid duplicate interceptors when configuration
-         * objects are duplicated.
-         * An exception is made for lists without _F_UNIQUE, which is currently
-         * only on_conf_destroy(), to allow interceptor cleanup. */
-        if ((list->rl_flags & RD_LIST_F_UNIQUE) &&
-            rd_list_find(list, &skel, rd_kafka_interceptor_method_cmp))
-                return RD_KAFKA_RESP_ERR__CONFLICT;
-
-        method = rd_kafka_interceptor_method_new(ic_name, func, ic_opaque);
-        rd_list_add(list, method);
-
-        return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-/**
- * @brief Destroy all interceptors
- * @locality application thread calling rd_kafka_conf_destroy() or 
- *           rd_kafka_destroy()
- */
-void rd_kafka_interceptors_destroy (rd_kafka_conf_t *conf) {
-        rd_list_destroy(&conf->interceptors.on_conf_set);
-        rd_list_destroy(&conf->interceptors.on_conf_dup);
-        rd_list_destroy(&conf->interceptors.on_conf_destroy);
-        rd_list_destroy(&conf->interceptors.on_new);
-        rd_list_destroy(&conf->interceptors.on_destroy);
-        rd_list_destroy(&conf->interceptors.on_send);
-        rd_list_destroy(&conf->interceptors.on_acknowledgement);
-        rd_list_destroy(&conf->interceptors.on_consume);
-        rd_list_destroy(&conf->interceptors.on_commit);
-
-        /* Interceptor config */
-        rd_list_destroy(&conf->interceptors.config);
-}
-
-
-/**
- * @brief Initialize interceptor sub-system for config object.
- * @locality application thread
- */
-static void
-rd_kafka_interceptors_init (rd_kafka_conf_t *conf) {
-        rd_list_init(&conf->interceptors.on_conf_set, 0,
-                     rd_kafka_interceptor_method_destroy)
-                ->rl_flags |= RD_LIST_F_UNIQUE;
-        rd_list_init(&conf->interceptors.on_conf_dup, 0,
-                     rd_kafka_interceptor_method_destroy)
-                ->rl_flags |= RD_LIST_F_UNIQUE;
-        /* conf_destroy() allows duplicate entries. */
-        rd_list_init(&conf->interceptors.on_conf_destroy, 0,
-                     rd_kafka_interceptor_method_destroy);
-        rd_list_init(&conf->interceptors.on_new, 0,
-                     rd_kafka_interceptor_method_destroy)
-                ->rl_flags |= RD_LIST_F_UNIQUE;
-        rd_list_init(&conf->interceptors.on_destroy, 0,
-                     rd_kafka_interceptor_method_destroy)
-                ->rl_flags |= RD_LIST_F_UNIQUE;
-        rd_list_init(&conf->interceptors.on_send, 0,
-                     rd_kafka_interceptor_method_destroy)
-                ->rl_flags |= RD_LIST_F_UNIQUE;
-        rd_list_init(&conf->interceptors.on_acknowledgement, 0,
-                     rd_kafka_interceptor_method_destroy)
-                ->rl_flags |= RD_LIST_F_UNIQUE;
-        rd_list_init(&conf->interceptors.on_consume, 0,
-                     rd_kafka_interceptor_method_destroy)
-                ->rl_flags |= RD_LIST_F_UNIQUE;
-        rd_list_init(&conf->interceptors.on_commit, 0,
-                     rd_kafka_interceptor_method_destroy)
-                ->rl_flags |= RD_LIST_F_UNIQUE;
-
-        /* Interceptor config */
-        rd_list_init(&conf->interceptors.config, 0,
-                     (void (*)(void *))rd_strtup_destroy);
-}
-
-
-
-
-/**
- * @name Configuration backend
- */
-
-
-/**
- * @brief Constructor called when configuration object is created.
- */
-void rd_kafka_conf_interceptor_ctor (int scope, void *pconf) {
-        rd_kafka_conf_t *conf = pconf;
-        assert(scope == _RK_GLOBAL);
-        rd_kafka_interceptors_init(conf);
-}
-
-/**
- * @brief Destructor called when configuration object is destroyed.
- */
-void rd_kafka_conf_interceptor_dtor (int scope, void *pconf) {
-        rd_kafka_conf_t *conf = pconf;
-        assert(scope == _RK_GLOBAL);
-        rd_kafka_interceptors_destroy(conf);
-}
-
-/**
- * @brief Copy-constructor called when configuration object \p psrc is
- *        duplicated to \p pdst.
- * @remark Interceptors are NOT copied, but interceptor config is.
- *
- */
-void rd_kafka_conf_interceptor_copy (int scope, void *pdst, const void *psrc,
-                                     void *dstptr, const void *srcptr,
-                                     size_t filter_cnt, const char **filter) {
-        rd_kafka_conf_t *dconf = pdst;
-        const rd_kafka_conf_t *sconf = psrc;
-        int i;
-        const rd_strtup_t *confval;
-
-        assert(scope == _RK_GLOBAL);
-
-        /* Apply interceptor configuration values.
-         * on_conf_dup() has already been called for dconf so
-         * on_conf_set() interceptors are already in place and we can
-         * apply the configuration through the standard conf_set() API. */
-        RD_LIST_FOREACH(confval, &sconf->interceptors.config, i) {
-                size_t fi;
-                size_t nlen = strlen(confval->name);
-
-                /* Apply filter */
-                for (fi = 0 ; fi < filter_cnt ; fi++) {
-                        size_t flen = strlen(filter[fi]);
-                        if (nlen >= flen && !strncmp(filter[fi], confval->name,
-                                                     flen))
-                                break;
-                }
-
-                if (fi < filter_cnt)
-                        continue; /* Filter matched: ignore property. */
-
-                /* Ignore errors for now */
-                rd_kafka_conf_set(dconf, confval->name, confval->value,
-                                  NULL, 0);
-        }
-}
-
-
-
-
-/**
- * @brief Call interceptor on_conf_set methods.
- * @locality application thread calling rd_kafka_conf_set() and
- *           rd_kafka_conf_dup()
- */
-rd_kafka_conf_res_t
-rd_kafka_interceptors_on_conf_set (rd_kafka_conf_t *conf,
-                                   const char *name, const char *val,
-                                   char *errstr, size_t errstr_size) {
-        rd_kafka_interceptor_method_t *method;
-        int i;
-
-        RD_LIST_FOREACH(method, &conf->interceptors.on_conf_set, i) {
-                rd_kafka_conf_res_t res;
-
-                res = method->u.on_conf_set(conf,
-                                            name, val, errstr, errstr_size,
-                                            method->ic_opaque);
-                if (res == RD_KAFKA_CONF_UNKNOWN)
-                        continue;
-
-                /* Add successfully handled properties to list of
-                 * interceptor config properties so conf_t objects
-                 * can be copied. */
-                if (res == RD_KAFKA_CONF_OK)
-                        rd_list_add(&conf->interceptors.config,
-                                    rd_strtup_new(name, val));
-                return res;
-        }
-
-        return RD_KAFKA_CONF_UNKNOWN;
-}
-
-/**
- * @brief Call interceptor on_conf_dup methods.
- * @locality application thread calling rd_kafka_conf_dup()
- */
-void
-rd_kafka_interceptors_on_conf_dup (rd_kafka_conf_t *new_conf,
-                                   const rd_kafka_conf_t *old_conf,
-                                   size_t filter_cnt, const char **filter) {
-        rd_kafka_interceptor_method_t *method;
-        int i;
-
-        RD_LIST_FOREACH(method, &old_conf->interceptors.on_conf_dup, i) {
-                /* FIXME: Ignore error for now */
-                method->u.on_conf_dup(new_conf, old_conf,
-                                      filter_cnt, filter, method->ic_opaque);
-        }
-}
-
-
-/**
- * @brief Call interceptor on_conf_destroy methods.
- * @locality application thread calling rd_kafka_conf_destroy(), rd_kafka_new(),
- *           rd_kafka_destroy()
- */
-void
-rd_kafka_interceptors_on_conf_destroy (rd_kafka_conf_t *conf) {
-        rd_kafka_interceptor_method_t *method;
-        int i;
-
-        RD_LIST_FOREACH(method, &conf->interceptors.on_conf_destroy, i) {
-                /* FIXME: Ignore error for now */
-                method->u.on_conf_destroy(method->ic_opaque);
-        }
-}
-
-
-/**
- * @brief Call interceptor on_new methods.
- * @locality application thread calling rd_kafka_new()
- */
-void
-rd_kafka_interceptors_on_new (rd_kafka_t *rk, const rd_kafka_conf_t *conf) {
-        rd_kafka_interceptor_method_t *method;
-        int i;
-        char errstr[512];
-
-        RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_new, i) {
-                rd_kafka_resp_err_t err;
-
-                err = method->u.on_new(rk, conf, method->ic_opaque,
-                                       errstr, sizeof(errstr));
-                if (unlikely(err))
-                        rd_kafka_interceptor_failed(rk, method, "on_new", err,
-                                                    NULL, errstr);
-        }
-}
-
-
-
-/**
- * @brief Call interceptor on_destroy methods.
- * @locality application thread calling rd_kafka_new() or rd_kafka_destroy()
- */
-void
-rd_kafka_interceptors_on_destroy (rd_kafka_t *rk) {
-        rd_kafka_interceptor_method_t *method;
-        int i;
-
-        RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_destroy, i) {
-                rd_kafka_resp_err_t err;
-
-                err = method->u.on_destroy(rk, method->ic_opaque);
-                if (unlikely(err))
-                        rd_kafka_interceptor_failed(rk, method, "on_destroy",
-                                                    err, NULL, NULL);
-        }
-}
-
-
-
-/**
- * @brief Call interceptor on_send methods.
- * @locality application thread calling produce()
- */
-void
-rd_kafka_interceptors_on_send (rd_kafka_t *rk, rd_kafka_message_t *rkmessage) {
-        rd_kafka_interceptor_method_t *method;
-        int i;
-
-        RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_send, i) {
-                rd_kafka_resp_err_t err;
-
-                err = method->u.on_send(rk, rkmessage, method->ic_opaque);
-                if (unlikely(err))
-                        rd_kafka_interceptor_failed(rk, method, "on_send", err,
-                                                    rkmessage, NULL);
-        }
-}
-
-
-
-/**
- * @brief Call interceptor on_acknowledgement methods.
- * @locality application thread calling poll(), or the broker thread if
- *           a dr callback has been set.
- */
-void
-rd_kafka_interceptors_on_acknowledgement (rd_kafka_t *rk,
-                                          rd_kafka_message_t *rkmessage) {
-        rd_kafka_interceptor_method_t *method;
-        int i;
-
-        RD_LIST_FOREACH(method,
-                        &rk->rk_conf.interceptors.on_acknowledgement, i) {
-                rd_kafka_resp_err_t err;
-
-                err = method->u.on_acknowledgement(rk, rkmessage,
-                                                   method->ic_opaque);
-                if (unlikely(err))
-                        rd_kafka_interceptor_failed(rk, method,
-                                                    "on_acknowledgement", err,
-                                                    rkmessage, NULL);
-        }
-}
-
-
-/**
- * @brief Call on_acknowledgement methods for all messages in queue.
- * @locality broker thread
- */
-void
-rd_kafka_interceptors_on_acknowledgement_queue (rd_kafka_t *rk,
-                                                rd_kafka_msgq_t *rkmq) {
-        rd_kafka_msg_t *rkm;
-
-        RD_KAFKA_MSGQ_FOREACH(rkm, rkmq) {
-                rd_kafka_interceptors_on_acknowledgement(rk,
-                                                         &rkm->rkm_rkmessage);
-        }
-}
-
-
-/**
- * @brief Call interceptor on_consume methods.
- * @locality application thread calling poll(), consume() or similar prior to
- *           passing the message to the application.
- */
-void
-rd_kafka_interceptors_on_consume (rd_kafka_t *rk,
-                                  rd_kafka_message_t *rkmessage) {
-        rd_kafka_interceptor_method_t *method;
-        int i;
-
-        RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_consume, i) {
-                rd_kafka_resp_err_t err;
-
-                err = method->u.on_consume(rk, rkmessage,
-                                                   method->ic_opaque);
-                if (unlikely(err))
-                        rd_kafka_interceptor_failed(rk, method,
-                                                    "on_consume", err,
-                                                    rkmessage, NULL);
-        }
-}
-
-
-/**
- * @brief Call interceptor on_commit methods.
- * @locality application thread calling poll(), consume() or similar,
- *           or the rdkafka main thread if no commit_cb or handler is registered.
- */
-void
-rd_kafka_interceptors_on_commit (rd_kafka_t *rk,
-                                 const rd_kafka_topic_partition_list_t *offsets,
-                                 rd_kafka_resp_err_t err) {
-        rd_kafka_interceptor_method_t *method;
-        int i;
-
-        RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_commit, i) {
-                rd_kafka_resp_err_t ic_err;
-
-                ic_err = method->u.on_commit(rk, offsets, err,
-                                             method->ic_opaque);
-                if (unlikely(ic_err))
-                        rd_kafka_interceptor_failed(rk, method,
-                                                    "on_commit", ic_err, NULL,
-                                                    NULL);
-        }
-}
-
-
-
-
-/**
- * @name Public API (backend)
- * @{
- */
-
-
-rd_kafka_resp_err_t
-rd_kafka_conf_interceptor_add_on_conf_set (
-        rd_kafka_conf_t *conf, const char *ic_name,
-        rd_kafka_interceptor_f_on_conf_set_t *on_conf_set,
-        void *ic_opaque) {
-        return rd_kafka_interceptor_method_add(&conf->interceptors.on_conf_set,
-                                               ic_name, (void *)on_conf_set,
-                                               ic_opaque);
-}
-
-rd_kafka_resp_err_t
-rd_kafka_conf_interceptor_add_on_conf_dup (
-        rd_kafka_conf_t *conf, const char *ic_name,
-        rd_kafka_interceptor_f_on_conf_dup_t *on_conf_dup,
-        void *ic_opaque) {
-        return rd_kafka_interceptor_method_add(&conf->interceptors.on_conf_dup,
-                                               ic_name, (void *)on_conf_dup,
-                                               ic_opaque);
-}
-
-rd_kafka_resp_err_t
-rd_kafka_conf_interceptor_add_on_conf_destroy (
-        rd_kafka_conf_t *conf, const char *ic_name,
-        rd_kafka_interceptor_f_on_conf_destroy_t *on_conf_destroy,
-        void *ic_opaque) {
-        return rd_kafka_interceptor_method_add(&conf->interceptors.on_conf_destroy,
-                                               ic_name, (void *)on_conf_destroy,
-                                               ic_opaque);
-}
-
-
-
-rd_kafka_resp_err_t
-rd_kafka_conf_interceptor_add_on_new (
-        rd_kafka_conf_t *conf, const char *ic_name,
-        rd_kafka_interceptor_f_on_new_t *on_new,
-        void *ic_opaque) {
-        return rd_kafka_interceptor_method_add(&conf->interceptors.on_new,
-                                               ic_name, (void *)on_new,
-                                               ic_opaque);
-}
-
-
-rd_kafka_resp_err_t
-rd_kafka_interceptor_add_on_destroy (
-        rd_kafka_t *rk, const char *ic_name,
-        rd_kafka_interceptor_f_on_destroy_t *on_destroy,
-        void *ic_opaque) {
-        assert(!rk->rk_initialized);
-        return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.on_destroy,
-                                               ic_name, (void *)on_destroy,
-                                               ic_opaque);
-}
-
-rd_kafka_resp_err_t
-rd_kafka_interceptor_add_on_send (
-        rd_kafka_t *rk, const char *ic_name,
-        rd_kafka_interceptor_f_on_send_t *on_send,
-        void *ic_opaque) {
-        assert(!rk->rk_initialized);
-        return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.on_send,
-                                               ic_name, (void *)on_send,
-                                               ic_opaque);
-}
-
-rd_kafka_resp_err_t
-rd_kafka_interceptor_add_on_acknowledgement (
-        rd_kafka_t *rk, const char *ic_name,
-        rd_kafka_interceptor_f_on_acknowledgement_t *on_acknowledgement,
-        void *ic_opaque) {
-        assert(!rk->rk_initialized);
-        return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.
-                                               on_acknowledgement,
-                                               ic_name,
-                                               (void *)on_acknowledgement,
-                                               ic_opaque);
-}
-
-
-rd_kafka_resp_err_t
-rd_kafka_interceptor_add_on_consume (
-        rd_kafka_t *rk, const char *ic_name,
-        rd_kafka_interceptor_f_on_consume_t *on_consume,
-        void *ic_opaque) {
-        assert(!rk->rk_initialized);
-        return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.
-                                               on_consume,
-                                               ic_name, (void *)on_consume,
-                                               ic_opaque);
-}
-
-
-rd_kafka_resp_err_t
-rd_kafka_interceptor_add_on_commit (
-        rd_kafka_t *rk, const char *ic_name,
-        rd_kafka_interceptor_f_on_commit_t *on_commit,
-        void *ic_opaque) {
-        assert(!rk->rk_initialized);
-        return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.
-                                               on_commit,
-                                               ic_name, (void *)on_commit,
-                                               ic_opaque);
-}
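
For orientation while reviewing this removal: rdkafka_interceptor.c only dispatches callbacks that an interceptor registers through librdkafka's public API in rdkafka.h. A minimal sketch of how such a chain is wired up follows; it is not part of this commit, it assumes the rd_kafka_conf_interceptor_add_on_new() and rd_kafka_interceptor_add_on_send() entry points whose implementations appear above, and the names my_on_send, my_on_new, my_ic_register and "my_ic" are purely illustrative.

    #include <librdkafka/rdkafka.h>
    #include <stdio.h>

    /* Illustrative on_send callback: runs in the application thread calling
     * produce(), before the message is enqueued. */
    static rd_kafka_resp_err_t my_on_send (rd_kafka_t *rk,
                                           rd_kafka_message_t *rkmessage,
                                           void *ic_opaque) {
            fprintf(stderr, "on_send: %zu bytes to partition %d\n",
                    rkmessage->len, (int)rkmessage->partition);
            return RD_KAFKA_RESP_ERR_NO_ERROR;
    }

    /* on_new runs from rd_kafka_new(): this is where per-instance
     * interceptors such as on_send are attached, which is why the
     * interceptor_add_on_*() functions above assert that the rk instance
     * is not yet initialized. */
    static rd_kafka_resp_err_t my_on_new (rd_kafka_t *rk,
                                          const rd_kafka_conf_t *conf,
                                          void *ic_opaque,
                                          char *errstr, size_t errstr_size) {
            return rd_kafka_interceptor_add_on_send(rk, "my_ic", my_on_send,
                                                    ic_opaque);
    }

    /* Attach the interceptor to a conf object before calling rd_kafka_new(). */
    static void my_ic_register (rd_kafka_conf_t *conf) {
            rd_kafka_conf_interceptor_add_on_new(conf, "my_ic", my_on_new, NULL);
    }

Note that, as the copy-constructor above states, duplicating a conf object copies interceptor-owned configuration properties but not the interceptor method lists themselves; interceptors that want to follow the duplicate are expected to re-register via their on_conf_dup() callback.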

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_interceptor.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_interceptor.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_interceptor.h
deleted file mode 100644
index 6be4e86..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_interceptor.h
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * librdkafka - The Apache Kafka C/C++ library
- *
- * Copyright (c) 2017 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#ifndef _RDKAFKA_INTERCEPTOR_H
-#define _RDKAFKA_INTERCEPTOR_H
-
-rd_kafka_conf_res_t
-rd_kafka_interceptors_on_conf_set (rd_kafka_conf_t *conf,
-                                   const char *name, const char *val,
-                                   char *errstr, size_t errstr_size);
-void
-rd_kafka_interceptors_on_conf_dup (rd_kafka_conf_t *new_conf,
-                                   const rd_kafka_conf_t *old_conf,
-                                   size_t filter_cnt, const char **filter);
-void
-rd_kafka_interceptors_on_conf_destroy (rd_kafka_conf_t *conf);
-void
-rd_kafka_interceptors_on_new (rd_kafka_t *rk, const rd_kafka_conf_t *conf);
-void
-rd_kafka_interceptors_on_destroy (rd_kafka_t *rk);
-void
-rd_kafka_interceptors_on_send (rd_kafka_t *rk, rd_kafka_message_t *rkmessage);
-void
-rd_kafka_interceptors_on_acknowledgement (rd_kafka_t *rk,
-                                          rd_kafka_message_t *rkmessage);
-void
-rd_kafka_interceptors_on_acknowledgement_queue (rd_kafka_t *rk,
-                                                rd_kafka_msgq_t *rkmq);
-
-void rd_kafka_interceptors_on_consume (rd_kafka_t *rk,
-                                       rd_kafka_message_t *rkmessage);
-void
-rd_kafka_interceptors_on_commit (rd_kafka_t *rk,
-                                 const rd_kafka_topic_partition_list_t *offsets,
-                                 rd_kafka_resp_err_t err);
-
-
-void rd_kafka_conf_interceptor_ctor (int scope, void *pconf);
-void rd_kafka_conf_interceptor_dtor (int scope, void *pconf);
-void rd_kafka_conf_interceptor_copy (int scope, void *pdst, const void *psrc,
-                                     void *dstptr, const void *srcptr,
-                                     size_t filter_cnt, const char **filter);
-
-void rd_kafka_interceptors_destroy (rd_kafka_conf_t *conf);
-
-#endif /* _RDKAFKA_INTERCEPTOR_H */
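
The header above only declares the dispatcher entry points that rdkafka_conf.c and rdkafka.c call into. For comparison, an on_conf_set interceptor, as dispatched by rd_kafka_interceptors_on_conf_set() earlier in this diff, would look roughly like the sketch below; it is not part of this commit, the property name "my_ic.verbose" is invented, and the callback signature is taken from the dispatcher's call site.

    #include <librdkafka/rdkafka.h>
    #include <string.h>

    /* Illustrative on_conf_set callback: claims the made-up property
     * "my_ic.verbose" and lets every other property fall through to the
     * normal configuration handling by returning RD_KAFKA_CONF_UNKNOWN. */
    static rd_kafka_conf_res_t my_on_conf_set (rd_kafka_conf_t *conf,
                                               const char *name, const char *val,
                                               char *errstr, size_t errstr_size,
                                               void *ic_opaque) {
            int *verbose = ic_opaque;

            if (strcmp(name, "my_ic.verbose"))
                    return RD_KAFKA_CONF_UNKNOWN;  /* not ours: pass through */

            *verbose = val && !strcmp(val, "true");
            return RD_KAFKA_CONF_OK;               /* handled by this interceptor */
    }

It would be attached with rd_kafka_conf_interceptor_add_on_conf_set(conf, "my_ic", my_on_conf_set, &some_int_flag), with the opaque pointer again being illustrative. Returning RD_KAFKA_CONF_OK matters beyond error reporting: as rd_kafka_interceptors_on_conf_set() above shows, successfully handled properties are appended to interceptors.config so they can be re-applied when the conf object is duplicated.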