You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/06/06 14:14:59 UTC

[31/51] [partial] nifi-minifi-cpp git commit: MINIFICPP-512 - upgrade to librdkafka 0.11.4

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_broker.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_broker.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_broker.h
deleted file mode 100644
index a30f2bd..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_broker.h
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012,2013 Magnus Edenhill
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#pragma once
-
-#include "rdkafka_feature.h"
-
-
-extern const char *rd_kafka_broker_state_names[];
-extern const char *rd_kafka_secproto_names[];
-
-struct rd_kafka_broker_s { /* rd_kafka_broker_t */
-	TAILQ_ENTRY(rd_kafka_broker_s) rkb_link;
-
-	int32_t             rkb_nodeid;
-#define RD_KAFKA_NODEID_UA -1
-
-	rd_sockaddr_list_t *rkb_rsal;
-	time_t              rkb_t_rsal_last;
-        const rd_sockaddr_inx_t  *rkb_addr_last; /* Last used connect address */
-
-	rd_kafka_transport_t *rkb_transport;
-
-	uint32_t            rkb_corrid;
-	int                 rkb_connid;    /* Connection id, increased by
-					    * one for each connection by
-					    * this broker. Used as a safe-guard
-					    * to help troubleshooting buffer
-					    * problems across disconnects. */
-
-	rd_kafka_q_t       *rkb_ops;
-
-        mtx_t               rkb_lock;
-
-        int                 rkb_blocking_max_ms; /* Maximum IO poll blocking
-                                                  * time. */
-
-        /* Toppars handled by this broker */
-	TAILQ_HEAD(, rd_kafka_toppar_s) rkb_toppars;
-	int                 rkb_toppar_cnt;
-
-        /* Underflowed toppars that are eligible for fetching. */
-        CIRCLEQ_HEAD(, rd_kafka_toppar_s) rkb_fetch_toppars;
-        int                 rkb_fetch_toppar_cnt;
-        rd_kafka_toppar_t  *rkb_fetch_toppar_next;  /* Next 'first' toppar
-                                                     * in fetch list.
-                                                     * This is used for
-                                                     * round-robin. */
-
-
-        rd_kafka_cgrp_t    *rkb_cgrp;
-
-	rd_ts_t             rkb_ts_fetch_backoff;
-	int                 rkb_fetching;
-
-	enum {
-		RD_KAFKA_BROKER_STATE_INIT,
-		RD_KAFKA_BROKER_STATE_DOWN,
-		RD_KAFKA_BROKER_STATE_CONNECT,
-		RD_KAFKA_BROKER_STATE_AUTH,
-
-		/* Any state >= STATE_UP means the Kafka protocol layer
-		 * is operational (to some degree). */
-		RD_KAFKA_BROKER_STATE_UP,
-                RD_KAFKA_BROKER_STATE_UPDATE,
-		RD_KAFKA_BROKER_STATE_APIVERSION_QUERY,
-		RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE
-	} rkb_state;
-
-        rd_ts_t             rkb_ts_state;        /* Timestamp of last
-                                                  * state change */
-        rd_interval_t       rkb_timeout_scan_intvl;  /* Waitresp timeout scan
-                                                      * interval. */
-
-        rd_atomic32_t       rkb_blocking_request_cnt; /* The number of
-                                                       * in-flight blocking
-                                                       * requests.
-                                                       * A blocking request is
-                                                       * one that is known to
-                                                       * possibly block on the
-                                                       * broker for longer than
-                                                       * the typical processing
-                                                       * time, e.g.:
-                                                       * JoinGroup, SyncGroup */
-
-	int                 rkb_features;    /* Protocol features supported
-					      * by this broker.
-					      * See RD_KAFKA_FEATURE_* in
-					      * rdkafka_proto.h */
-
-        struct rd_kafka_ApiVersion *rkb_ApiVersions; /* Broker's supported APIs
-                                                      * (MUST be sorted) */
-	size_t                      rkb_ApiVersions_cnt;
-	rd_interval_t               rkb_ApiVersion_fail_intvl; /* Controls how long
-								* the fallback proto
-								* will be used after
-								* ApiVersionRequest
-								* failure. */
-
-	rd_kafka_confsource_t  rkb_source;
-	struct {
-		rd_atomic64_t tx_bytes;
-		rd_atomic64_t tx;    /* Kafka-messages (not payload msgs) */
-		rd_atomic64_t tx_err;
-		rd_atomic64_t tx_retries;
-		rd_atomic64_t req_timeouts;  /* Accumulated value */
-
-		rd_atomic64_t rx_bytes;
-		rd_atomic64_t rx;    /* Kafka messages (not payload msgs) */
-		rd_atomic64_t rx_err;
-		rd_atomic64_t rx_corrid_err; /* CorrId misses */
-		rd_atomic64_t rx_partial;    /* Partial messages received
-                                              * and dropped. */
-                rd_atomic64_t zbuf_grow;     /* Compression/decompression buffer grows needed */
-                rd_atomic64_t buf_grow;      /* rkbuf grows needed */
-                rd_atomic64_t wakeups;       /* Poll wakeups */
-	} rkb_c;
-
-        int                 rkb_req_timeouts;  /* Current value */
-
-	rd_ts_t             rkb_ts_metadata_poll; /* Next metadata poll time */
-	int                 rkb_metadata_fast_poll_cnt; /* Perform fast
-							 * metadata polls. */
-	thrd_t              rkb_thread;
-
-	rd_refcnt_t         rkb_refcnt;
-
-        rd_kafka_t         *rkb_rk;
-
-	rd_kafka_buf_t     *rkb_recv_buf;
-
-	int                 rkb_max_inflight;   /* Maximum number of in-flight
-						 * requests to broker.
-						 * Compared to rkb_waitresps length.*/
-	rd_kafka_bufq_t     rkb_outbufs;
-	rd_kafka_bufq_t     rkb_waitresps;
-	rd_kafka_bufq_t     rkb_retrybufs;
-
-	rd_avg_t            rkb_avg_int_latency;/* Current internal latency period*/
-	rd_avg_t            rkb_avg_rtt;        /* Current RTT period */
-	rd_avg_t            rkb_avg_throttle;   /* Current throttle period */
-
-        /* These are all protected by rkb_lock */
-	char                rkb_name[RD_KAFKA_NODENAME_SIZE];  /* Displ name */
-	char                rkb_nodename[RD_KAFKA_NODENAME_SIZE]; /* host:port*/
-        uint16_t            rkb_port;                          /* TCP port */
-        char               *rkb_origname;                      /* Original
-                                                                * host name */
-
-
-        /* Logging name is a copy of rkb_name, protected by its own mutex */
-        char               *rkb_logname;
-        mtx_t               rkb_logname_lock;
-
-        int                 rkb_wakeup_fd[2];     /* Wake-up fds (r/w) to wake
-                                                   * up from IO-wait when
-                                                   * queues have content. */
-        int                 rkb_toppar_wakeup_fd; /* Toppar msgq wakeup fd,
-                                                   * this is rkb_wakeup_fd[1]
-                                                   * if enabled. */
-        rd_interval_t       rkb_connect_intvl;    /* Reconnect throttling */
-
-	rd_kafka_secproto_t rkb_proto;
-
-	int                 rkb_down_reported;    /* Down event reported */
-#if WITH_SASL_CYRUS
-	rd_kafka_timer_t    rkb_sasl_kinit_refresh_tmr;
-#endif
-
-
-	struct {
-		char msg[512];
-		int  err;  /* errno */
-	} rkb_err;
-};
-
-#define rd_kafka_broker_keep(rkb)   rd_refcnt_add(&(rkb)->rkb_refcnt)
-#define rd_kafka_broker_lock(rkb)   mtx_lock(&(rkb)->rkb_lock)
-#define rd_kafka_broker_unlock(rkb) mtx_unlock(&(rkb)->rkb_lock)
-
-
-/**
- * @brief Broker comparator
- */
-static RD_UNUSED RD_INLINE int rd_kafka_broker_cmp (const void *_a,
-                                                    const void *_b) {
-        const rd_kafka_broker_t *a = _a, *b = _b;
-        return (int)(a - b);
-}
-
-
-/**
- * @returns true if broker supports \p features, else false.
- */
-static RD_UNUSED
-int rd_kafka_broker_supports (rd_kafka_broker_t *rkb, int features) {
-	int r;
-	rd_kafka_broker_lock(rkb);
-	r = (rkb->rkb_features & features) == features;
-	rd_kafka_broker_unlock(rkb);
-	return r;
-}
-
-int16_t rd_kafka_broker_ApiVersion_supported (rd_kafka_broker_t *rkb,
-                                              int16_t ApiKey,
-                                              int16_t minver, int16_t maxver,
-                                              int *featuresp);
-
-int rd_kafka_broker_get_state (rd_kafka_broker_t *rkb);
-
-rd_kafka_broker_t *rd_kafka_broker_find_by_nodeid (rd_kafka_t *rk,
-						   int32_t nodeid);
-rd_kafka_broker_t *rd_kafka_broker_find_by_nodeid0 (rd_kafka_t *rk,
-                                                    int32_t nodeid,
-                                                    int state);
-#define rd_kafka_broker_find_by_nodeid(rk,nodeid) \
-        rd_kafka_broker_find_by_nodeid0(rk,nodeid,-1)
-
-/**
- * Filter out brokers that are currently in a blocking request.
- */
-static RD_INLINE RD_UNUSED int
-rd_kafka_broker_filter_non_blocking (rd_kafka_broker_t *rkb, void *opaque) {
-        return rd_atomic32_get(&rkb->rkb_blocking_request_cnt) > 0;
-}
-
-/**
- * Filter out brokers that cant do GroupCoordinator requests right now.
- */
-static RD_INLINE RD_UNUSED int
-rd_kafka_broker_filter_can_group_query (rd_kafka_broker_t *rkb, void *opaque) {
-        return rd_atomic32_get(&rkb->rkb_blocking_request_cnt) > 0 ||
-		!(rkb->rkb_features & RD_KAFKA_FEATURE_BROKER_GROUP_COORD);
-}
-
-rd_kafka_broker_t *rd_kafka_broker_any (rd_kafka_t *rk, int state,
-                                        int (*filter) (rd_kafka_broker_t *rkb,
-                                                       void *opaque),
-                                        void *opaque);
-
-rd_kafka_broker_t *rd_kafka_broker_any_usable (rd_kafka_t *rk, int timeout_ms,
-                                               int do_lock);
-
-rd_kafka_broker_t *rd_kafka_broker_prefer (rd_kafka_t *rk, int32_t broker_id, int state);
-
-int rd_kafka_brokers_add0 (rd_kafka_t *rk, const char *brokerlist);
-void rd_kafka_broker_set_state (rd_kafka_broker_t *rkb, int state);
-
-void rd_kafka_broker_fail (rd_kafka_broker_t *rkb,
-			   int level, rd_kafka_resp_err_t err,
-			   const char *fmt, ...);
-
-void rd_kafka_broker_destroy_final (rd_kafka_broker_t *rkb);
-
-#define rd_kafka_broker_destroy(rkb)                                    \
-        rd_refcnt_destroywrapper(&(rkb)->rkb_refcnt,                    \
-                                 rd_kafka_broker_destroy_final(rkb))
-
-
-void rd_kafka_broker_update (rd_kafka_t *rk, rd_kafka_secproto_t proto,
-                             const struct rd_kafka_metadata_broker *mdb);
-rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,
-					rd_kafka_confsource_t source,
-					rd_kafka_secproto_t proto,
-					const char *name, uint16_t port,
-					int32_t nodeid);
-
-void rd_kafka_broker_connect_up (rd_kafka_broker_t *rkb);
-void rd_kafka_broker_connect_done (rd_kafka_broker_t *rkb, const char *errstr);
-
-int rd_kafka_send (rd_kafka_broker_t *rkb);
-int rd_kafka_recv (rd_kafka_broker_t *rkb);
-
-void rd_kafka_dr_msgq (rd_kafka_itopic_t *rkt,
-		       rd_kafka_msgq_t *rkmq, rd_kafka_resp_err_t err);
-
-void rd_kafka_broker_buf_enq1 (rd_kafka_broker_t *rkb,
-                               rd_kafka_buf_t *rkbuf,
-                               rd_kafka_resp_cb_t *resp_cb,
-                               void *opaque);
-
-void rd_kafka_broker_buf_enq_replyq (rd_kafka_broker_t *rkb,
-                                     rd_kafka_buf_t *rkbuf,
-                                     rd_kafka_replyq_t replyq,
-                                     rd_kafka_resp_cb_t *resp_cb,
-                                     void *opaque);
-
-void rd_kafka_broker_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf);
-
-
-rd_kafka_broker_t *rd_kafka_broker_internal (rd_kafka_t *rk);
-
-void msghdr_print (rd_kafka_t *rk,
-		   const char *what, const struct msghdr *msg,
-		   int hexdump);
-
-const char *rd_kafka_broker_name (rd_kafka_broker_t *rkb);
-void rd_kafka_broker_wakeup (rd_kafka_broker_t *rkb);
-
-int rd_kafka_brokers_get_state_version (rd_kafka_t *rk);
-int rd_kafka_brokers_wait_state_change (rd_kafka_t *rk, int stored_version,
-					int timeout_ms);
-void rd_kafka_brokers_broadcast_state_change (rd_kafka_t *rk);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.c
deleted file mode 100644
index 9b50737..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.c
+++ /dev/null
@@ -1,428 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2015, Magnus Edenhill
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "rdkafka_int.h"
-#include "rdkafka_buf.h"
-#include "rdkafka_broker.h"
-
-void rd_kafka_buf_destroy_final (rd_kafka_buf_t *rkbuf) {
-
-        switch (rkbuf->rkbuf_reqhdr.ApiKey)
-        {
-        case RD_KAFKAP_Metadata:
-                if (rkbuf->rkbuf_u.Metadata.topics)
-                        rd_list_destroy(rkbuf->rkbuf_u.Metadata.topics);
-                if (rkbuf->rkbuf_u.Metadata.reason)
-                        rd_free(rkbuf->rkbuf_u.Metadata.reason);
-                if (rkbuf->rkbuf_u.Metadata.rko)
-                        rd_kafka_op_reply(rkbuf->rkbuf_u.Metadata.rko,
-                                          RD_KAFKA_RESP_ERR__DESTROY);
-                if (rkbuf->rkbuf_u.Metadata.decr) {
-                        /* Decrease metadata cache's full_.._sent state. */
-                        mtx_lock(rkbuf->rkbuf_u.Metadata.decr_lock);
-                        rd_kafka_assert(NULL,
-                                        (*rkbuf->rkbuf_u.Metadata.decr) > 0);
-                        (*rkbuf->rkbuf_u.Metadata.decr)--;
-                        mtx_unlock(rkbuf->rkbuf_u.Metadata.decr_lock);
-                }
-                break;
-        }
-
-        if (rkbuf->rkbuf_response)
-                rd_kafka_buf_destroy(rkbuf->rkbuf_response);
-
-        rd_kafka_replyq_destroy(&rkbuf->rkbuf_replyq);
-        rd_kafka_replyq_destroy(&rkbuf->rkbuf_orig_replyq);
-
-        rd_buf_destroy(&rkbuf->rkbuf_buf);
-
-        if (rkbuf->rkbuf_rktp_vers)
-                rd_list_destroy(rkbuf->rkbuf_rktp_vers);
-
-        if (rkbuf->rkbuf_rkb)
-                rd_kafka_broker_destroy(rkbuf->rkbuf_rkb);
-
-        rd_refcnt_destroy(&rkbuf->rkbuf_refcnt);
-
-	rd_free(rkbuf);
-}
-
-
-
-/**
- * @brief Pushes \p buf of size \p len as a new segment on the buffer.
- *
- * \p buf will NOT be freed by the buffer.
- */
-void rd_kafka_buf_push0 (rd_kafka_buf_t *rkbuf, const void *buf, size_t len,
-                         int allow_crc_calc, void (*free_cb) (void *)) {
-        rd_buf_push(&rkbuf->rkbuf_buf, buf, len, free_cb);
-
-        if (allow_crc_calc && (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC))
-                rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc, buf, len);
-}
-
-
-
-/**
- * @brief Create a new buffer with \p segcmt initial segments and \p size bytes
- *        of initial backing memory.
- *        The underlying buffer will grow as needed.
- *
- * If \p rk is non-NULL (typical case):
- * Additional space for the Kafka protocol headers is inserted automatically.
- */
-rd_kafka_buf_t *rd_kafka_buf_new0 (int segcnt, size_t size, int flags) {
-        rd_kafka_buf_t *rkbuf;
-
-        rkbuf = rd_calloc(1, sizeof(*rkbuf));
-
-        rkbuf->rkbuf_flags = flags;
-
-        rd_buf_init(&rkbuf->rkbuf_buf, segcnt, size);
-        rd_kafka_msgq_init(&rkbuf->rkbuf_msgq);
-        rd_refcnt_init(&rkbuf->rkbuf_refcnt, 1);
-
-        return rkbuf;
-}
-
-
-/**
- * @brief Create new request buffer with the request-header written (will
- *        need to be updated with Length, etc, later)
- */
-rd_kafka_buf_t *rd_kafka_buf_new_request (rd_kafka_broker_t *rkb, int16_t ApiKey,
-                                          int segcnt, size_t size) {
-        rd_kafka_buf_t *rkbuf;
-
-        /* Make room for common protocol request headers */
-        size += RD_KAFKAP_REQHDR_SIZE +
-                RD_KAFKAP_STR_SIZE(rkb->rkb_rk->rk_client_id);
-        segcnt += 1; /* headers */
-
-        rkbuf = rd_kafka_buf_new0(segcnt, size, 0);
-
-        rkbuf->rkbuf_rkb = rkb;
-        rd_kafka_broker_keep(rkb);
-
-        rkbuf->rkbuf_reqhdr.ApiKey = ApiKey;
-
-        /* Write request header, will be updated later. */
-        /* Length: updated later */
-        rd_kafka_buf_write_i32(rkbuf, 0);
-        /* ApiKey */
-        rd_kafka_buf_write_i16(rkbuf, rkbuf->rkbuf_reqhdr.ApiKey);
-        /* ApiVersion: updated later */
-        rd_kafka_buf_write_i16(rkbuf, 0);
-        /* CorrId: updated later */
-        rd_kafka_buf_write_i32(rkbuf, 0);
-
-        /* ClientId */
-        rd_kafka_buf_write_kstr(rkbuf, rkb->rkb_rk->rk_client_id);
-
-        return rkbuf;
-}
-
-
-
-
-/**
- * @brief Create new read-only rkbuf shadowing a memory region.
- *
- * @remark \p free_cb (possibly NULL) will be used to free \p ptr when
- *         buffer refcount reaches 0.
- * @remark the buffer may only be read from, not written to.
- */
-rd_kafka_buf_t *rd_kafka_buf_new_shadow (const void *ptr, size_t size,
-                                         void (*free_cb) (void *)) {
-	rd_kafka_buf_t *rkbuf;
-
-	rkbuf = rd_calloc(1, sizeof(*rkbuf));
-
-        rkbuf->rkbuf_reqhdr.ApiKey = RD_KAFKAP_None;
-
-        rd_buf_init(&rkbuf->rkbuf_buf, 1, 0);
-        rd_buf_push(&rkbuf->rkbuf_buf, ptr, size, free_cb);
-
-        rkbuf->rkbuf_totlen  = size;
-
-        /* Initialize reader slice */
-        rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf);
-
-	rd_kafka_msgq_init(&rkbuf->rkbuf_msgq);
-
-        rd_refcnt_init(&rkbuf->rkbuf_refcnt, 1);
-
-	return rkbuf;
-}
-
-
-
-void rd_kafka_bufq_enq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf) {
-	TAILQ_INSERT_TAIL(&rkbufq->rkbq_bufs, rkbuf, rkbuf_link);
-	(void)rd_atomic32_add(&rkbufq->rkbq_cnt, 1);
-	(void)rd_atomic32_add(&rkbufq->rkbq_msg_cnt,
-                            rd_atomic32_get(&rkbuf->rkbuf_msgq.rkmq_msg_cnt));
-}
-
-void rd_kafka_bufq_deq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf) {
-	TAILQ_REMOVE(&rkbufq->rkbq_bufs, rkbuf, rkbuf_link);
-	rd_kafka_assert(NULL, rd_atomic32_get(&rkbufq->rkbq_cnt) > 0);
-	(void)rd_atomic32_sub(&rkbufq->rkbq_cnt, 1);
-	(void)rd_atomic32_sub(&rkbufq->rkbq_msg_cnt,
-                          rd_atomic32_get(&rkbuf->rkbuf_msgq.rkmq_msg_cnt));
-}
-
-void rd_kafka_bufq_init(rd_kafka_bufq_t *rkbufq) {
-	TAILQ_INIT(&rkbufq->rkbq_bufs);
-	rd_atomic32_init(&rkbufq->rkbq_cnt, 0);
-	rd_atomic32_init(&rkbufq->rkbq_msg_cnt, 0);
-}
-
-/**
- * Concat all buffers from 'src' to tail of 'dst'
- */
-void rd_kafka_bufq_concat (rd_kafka_bufq_t *dst, rd_kafka_bufq_t *src) {
-	TAILQ_CONCAT(&dst->rkbq_bufs, &src->rkbq_bufs, rkbuf_link);
-	(void)rd_atomic32_add(&dst->rkbq_cnt, rd_atomic32_get(&src->rkbq_cnt));
-	(void)rd_atomic32_add(&dst->rkbq_msg_cnt, rd_atomic32_get(&src->rkbq_msg_cnt));
-	rd_kafka_bufq_init(src);
-}
-
-/**
- * Purge the wait-response queue.
- * NOTE: 'rkbufq' must be a temporary queue and not one of rkb_waitresps
- *       or rkb_outbufs since buffers may be re-enqueued on those queues.
- *       'rkbufq' needs to be bufq_init():ed before reuse after this call.
- */
-void rd_kafka_bufq_purge (rd_kafka_broker_t *rkb,
-                          rd_kafka_bufq_t *rkbufq,
-                          rd_kafka_resp_err_t err) {
-	rd_kafka_buf_t *rkbuf, *tmp;
-
-	rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
-
-	rd_rkb_dbg(rkb, QUEUE, "BUFQ", "Purging bufq with %i buffers",
-		   rd_atomic32_get(&rkbufq->rkbq_cnt));
-
-	TAILQ_FOREACH_SAFE(rkbuf, &rkbufq->rkbq_bufs, rkbuf_link, tmp) {
-                rd_kafka_buf_callback(rkb->rkb_rk, rkb, err, NULL, rkbuf);
-        }
-}
-
-
-/**
- * @brief Update bufq for connection reset:
- *
- * - Purge connection-setup API requests from the queue.
- * - Reset any partially sent buffer's offset. (issue #756)
- *
- * Request types purged:
- *   ApiVersion
- *   SaslHandshake
- */
-void rd_kafka_bufq_connection_reset (rd_kafka_broker_t *rkb,
-				     rd_kafka_bufq_t *rkbufq) {
-	rd_kafka_buf_t *rkbuf, *tmp;
-
-	rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
-
-	rd_rkb_dbg(rkb, QUEUE, "BUFQ",
-		   "Updating %d buffers on connection reset",
-		   rd_atomic32_get(&rkbufq->rkbq_cnt));
-
-	TAILQ_FOREACH_SAFE(rkbuf, &rkbufq->rkbq_bufs, rkbuf_link, tmp) {
-		switch (rkbuf->rkbuf_reqhdr.ApiKey)
-		{
-		case RD_KAFKAP_ApiVersion:
-		case RD_KAFKAP_SaslHandshake:
-			rd_kafka_bufq_deq(rkbufq, rkbuf);
-			rd_kafka_buf_callback(rkb->rkb_rk, rkb,
-					      RD_KAFKA_RESP_ERR__DESTROY,
-					      NULL, rkbuf);
-			break;
-                default:
-                        /* Reset buffer send position */
-                        rd_slice_seek(&rkbuf->rkbuf_reader, 0);
-                        break;
-		}
-        }
-}
-
-
-void rd_kafka_bufq_dump (rd_kafka_broker_t *rkb, const char *fac,
-			 rd_kafka_bufq_t *rkbq) {
-	rd_kafka_buf_t *rkbuf;
-	int cnt = rd_kafka_bufq_cnt(rkbq);
-	rd_ts_t now;
-
-	if (!cnt)
-		return;
-
-	now = rd_clock();
-
-	rd_rkb_dbg(rkb, BROKER, fac, "bufq with %d buffer(s):", cnt);
-
-	TAILQ_FOREACH(rkbuf, &rkbq->rkbq_bufs, rkbuf_link) {
-		rd_rkb_dbg(rkb, BROKER, fac,
-			   " Buffer %s (%"PRIusz" bytes, corrid %"PRId32", "
-			   "connid %d, retry %d in %lldms, timeout in %lldms",
-			   rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.ApiKey),
-			   rkbuf->rkbuf_totlen, rkbuf->rkbuf_corrid,
-			   rkbuf->rkbuf_connid, rkbuf->rkbuf_retries,
-			   rkbuf->rkbuf_ts_retry ?
-			   (now - rkbuf->rkbuf_ts_retry) / 1000LL : 0,
-			   rkbuf->rkbuf_ts_timeout ?
-			   (now - rkbuf->rkbuf_ts_timeout) / 1000LL : 0);
-	}
-}
-
-
-
-
-/**
- * Retry failed request, depending on the error.
- * @remark \p rkb may be NULL
- * Returns 1 if the request was scheduled for retry, else 0.
- */
-int rd_kafka_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {
-
-        if (unlikely(!rkb ||
-		     rkb->rkb_source == RD_KAFKA_INTERNAL ||
-		     rd_kafka_terminating(rkb->rkb_rk) ||
-		     rkbuf->rkbuf_retries + 1 >
-		     rkb->rkb_rk->rk_conf.max_retries))
-                return 0;
-
-	/* Try again */
-	rkbuf->rkbuf_ts_sent = 0;
-	rkbuf->rkbuf_retries++;
-	rd_kafka_buf_keep(rkbuf);
-	rd_kafka_broker_buf_retry(rkb, rkbuf);
-	return 1;
-}
-
-
-/**
- * @brief Handle RD_KAFKA_OP_RECV_BUF.
- */
-void rd_kafka_buf_handle_op (rd_kafka_op_t *rko, rd_kafka_resp_err_t err) {
-        rd_kafka_buf_t *request, *response;
-
-        request = rko->rko_u.xbuf.rkbuf;
-        rko->rko_u.xbuf.rkbuf = NULL;
-
-        /* NULL on op_destroy() */
-	if (request->rkbuf_replyq.q) {
-		int32_t version = request->rkbuf_replyq.version;
-                /* Current queue usage is done, but retain original replyq for
-                 * future retries, stealing
-                 * the current reference. */
-                request->rkbuf_orig_replyq = request->rkbuf_replyq;
-                rd_kafka_replyq_clear(&request->rkbuf_replyq);
-		/* Callback might need to version check so we retain the
-		 * version across the clear() call which clears it. */
-		request->rkbuf_replyq.version = version;
-	}
-
-	if (!request->rkbuf_cb) {
-		rd_kafka_buf_destroy(request);
-		return;
-	}
-
-        /* Let buf_callback() do destroy()s */
-        response = request->rkbuf_response; /* May be NULL */
-        request->rkbuf_response = NULL;
-
-        rd_kafka_buf_callback(request->rkbuf_rkb->rkb_rk,
-			      request->rkbuf_rkb, err,
-                              response, request);
-}
-
-
-
-/**
- * Call request.rkbuf_cb(), but:
- *  - if the rkbuf has a rkbuf_replyq the buffer is enqueued on that queue
- *    with op type RD_KAFKA_OP_RECV_BUF.
- *  - else call rkbuf_cb().
- *
- * \p response may be NULL.
- *
- * Will decrease refcount for both response and request, eventually.
- */
-void rd_kafka_buf_callback (rd_kafka_t *rk,
-			    rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err,
-                            rd_kafka_buf_t *response, rd_kafka_buf_t *request){
-
-        /* Decide if the request should be retried.
-         * This is always done in the originating broker thread. */
-        if (unlikely(err && err != RD_KAFKA_RESP_ERR__DESTROY &&
-		     rd_kafka_buf_retry(rkb, request))) {
-		/* refcount for retry was increased in buf_retry() so we can
-		 * let go of this caller's refcounts. */
-		rd_kafka_buf_destroy(request);
-		if (response)
-			rd_kafka_buf_destroy(response);
-                return;
-	}
-
-        if (err != RD_KAFKA_RESP_ERR__DESTROY && request->rkbuf_replyq.q) {
-                rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_RECV_BUF);
-
-		rd_kafka_assert(NULL, !request->rkbuf_response);
-		request->rkbuf_response = response;
-
-                /* Increment refcnt since rko_rkbuf will be decref:ed
-                 * if replyq_enq() fails and we dont want the rkbuf gone in that
-                 * case. */
-                rd_kafka_buf_keep(request);
-                rko->rko_u.xbuf.rkbuf = request;
-
-                rko->rko_err = err;
-
-                /* Copy original replyq for future retries, with its own
-                 * queue reference. */
-                rd_kafka_replyq_copy(&request->rkbuf_orig_replyq,
-                                     &request->rkbuf_replyq);
-
-	        rd_kafka_replyq_enq(&request->rkbuf_replyq, rko, 0);
-
-		rd_kafka_buf_destroy(request); /* from keep above */
-		return;
-        }
-
-        if (request->rkbuf_cb)
-                request->rkbuf_cb(rk, rkb, err, response, request,
-                                  request->rkbuf_opaque);
-
-        rd_kafka_buf_destroy(request);
-	if (response)
-		rd_kafka_buf_destroy(response);
-}
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.h
deleted file mode 100644
index 5aa2876..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.h
+++ /dev/null
@@ -1,819 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2015, Magnus Edenhill
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-#pragma once
-
-#include "rdkafka_int.h"
-#include "rdcrc32.h"
-#include "rdlist.h"
-#include "rdbuf.h"
-
-
-typedef struct rd_kafka_broker_s rd_kafka_broker_t;
-
-#define RD_KAFKA_HEADERS_IOV_CNT   2
-
-
-/**
- * Temporary buffer with memory aligned writes to accommodate
- * effective and platform safe struct writes.
- */
-typedef struct rd_tmpabuf_s {
-	size_t size;
-	size_t of;
-	char  *buf;
-	int    failed;
-	int    assert_on_fail;
-} rd_tmpabuf_t;
-
-/**
- * @brief Allocate new tmpabuf with \p size bytes pre-allocated.
- */
-static RD_UNUSED void
-rd_tmpabuf_new (rd_tmpabuf_t *tab, size_t size, int assert_on_fail) {
-	tab->buf = rd_malloc(size);
-	tab->size = size;
-	tab->of = 0;
-	tab->failed = 0;
-	tab->assert_on_fail = assert_on_fail;
-}
-
-/**
- * @brief Free memory allocated by tmpabuf
- */
-static RD_UNUSED void
-rd_tmpabuf_destroy (rd_tmpabuf_t *tab) {
-	rd_free(tab->buf);
-}
-
-/**
- * @returns 1 if a previous operation failed.
- */
-static RD_UNUSED RD_INLINE int
-rd_tmpabuf_failed (rd_tmpabuf_t *tab) {
-	return tab->failed;
-}
-
-/**
- * @brief Allocate \p size bytes for writing, returning an aligned pointer
- *        to the memory.
- * @returns the allocated pointer (within the tmpabuf) on success or
- *          NULL if the requested number of bytes + alignment is not available
- *          in the tmpabuf.
- */
-static RD_UNUSED void *
-rd_tmpabuf_alloc0 (const char *func, int line, rd_tmpabuf_t *tab, size_t size) {
-	void *ptr;
-
-	if (unlikely(tab->failed))
-		return NULL;
-
-	if (unlikely(tab->of + size > tab->size)) {
-		if (tab->assert_on_fail) {
-			fprintf(stderr,
-				"%s: %s:%d: requested size %zd + %zd > %zd\n",
-				__FUNCTION__, func, line, tab->of, size,
-				tab->size);
-			assert(!*"rd_tmpabuf_alloc: not enough size in buffer");
-		}
-		return NULL;
-	}
-
-        ptr = (void *)(tab->buf + tab->of);
-	tab->of += RD_ROUNDUP(size, 8);
-
-	return ptr;
-}
-
-#define rd_tmpabuf_alloc(tab,size) \
-	rd_tmpabuf_alloc0(__FUNCTION__,__LINE__,tab,size)
-
-/**
- * @brief Write \p buf of \p size bytes to tmpabuf memory in an aligned fashion.
- *
- * @returns the allocated and written-to pointer (within the tmpabuf) on success
- *          or NULL if the requested number of bytes + alignment is not available
- *          in the tmpabuf.
- */
-static RD_UNUSED void *
-rd_tmpabuf_write0 (const char *func, int line,
-		   rd_tmpabuf_t *tab, const void *buf, size_t size) {
-	void *ptr = rd_tmpabuf_alloc0(func, line, tab, size);
-
-	if (ptr)
-		memcpy(ptr, buf, size);
-
-	return ptr;
-}
-#define rd_tmpabuf_write(tab,buf,size) \
-	rd_tmpabuf_write0(__FUNCTION__, __LINE__, tab, buf, size)
-
-
-/**
- * @brief Wrapper for rd_tmpabuf_write() that takes a nul-terminated string.
- */
-static RD_UNUSED char *
-rd_tmpabuf_write_str0 (const char *func, int line,
-		       rd_tmpabuf_t *tab, const char *str) {
-	return rd_tmpabuf_write0(func, line, tab, str, strlen(str)+1);
-}
-#define rd_tmpabuf_write_str(tab,str) \
-	rd_tmpabuf_write_str0(__FUNCTION__, __LINE__, tab, str)
-
-
-
-/**
- * @name Read buffer interface
- *
- * Memory reading helper macros to be used when parsing network responses.
- *
- * Assumptions:
- *   - an 'err_parse:' goto-label must be available for error bailouts,
- *                     the error code will be set in rkbuf->rkbuf_err
- *   - local `int log_decode_errors` variable set to the logging level
- *     to log parse errors (or 0 to turn off logging).
- */
-
-#define rd_kafka_buf_parse_fail(rkbuf,...) do {				\
-                if (log_decode_errors > 0) {                            \
-			rd_kafka_assert(NULL, rkbuf->rkbuf_rkb);	\
-                        rd_rkb_log(rkbuf->rkbuf_rkb, log_decode_errors, \
-                                   "PROTOERR",                          \
-                                   "Protocol parse failure "            \
-                                   "at %"PRIusz"/%"PRIusz" (%s:%i) "    \
-                                   "(incorrect broker.version.fallback?)", \
-                                   rd_slice_offset(&rkbuf->rkbuf_reader), \
-                                   rd_slice_size(&rkbuf->rkbuf_reader), \
-                                   __FUNCTION__, __LINE__);             \
-                        rd_rkb_log(rkbuf->rkbuf_rkb, log_decode_errors, \
-				   "PROTOERR", __VA_ARGS__);		\
-                }                                                       \
-                (rkbuf)->rkbuf_err = RD_KAFKA_RESP_ERR__BAD_MSG;        \
-                goto err_parse;                                         \
-	} while (0)
-
-
-
-/**
- * Returns the number of remaining bytes available to read.
- */
-#define rd_kafka_buf_read_remain(rkbuf) \
-        rd_slice_remains(&(rkbuf)->rkbuf_reader)
-
-/**
- * Checks that at least 'len' bytes remain to be read in buffer, else fails.
- */
-#define rd_kafka_buf_check_len(rkbuf,len) do {                          \
-                size_t __len0 = (size_t)(len);                          \
-                if (unlikely(__len0 > rd_kafka_buf_read_remain(rkbuf))) { \
-                        rd_kafka_buf_parse_fail(                        \
-                                rkbuf,                                  \
-                                "expected %"PRIusz" bytes > %"PRIusz    \
-                                " remaining bytes",                     \
-                                __len0, rd_kafka_buf_read_remain(rkbuf)); \
-                        (rkbuf)->rkbuf_err = RD_KAFKA_RESP_ERR__BAD_MSG; \
-                        goto err_parse;                                 \
-                }                                                       \
-        } while (0)
-
-/**
- * Skip (as in read and ignore) the next 'len' bytes.
- */
-#define rd_kafka_buf_skip(rkbuf, len) do {                              \
-                size_t __len1 = (size_t)(len);                          \
-                if (__len1 &&                                           \
-                    !rd_slice_read(&(rkbuf)->rkbuf_reader, NULL, __len1)) \
-                        rd_kafka_buf_check_len(rkbuf, __len1);           \
-        } while (0)
-
-/**
- * Skip (as in read and ignore) up to fixed position \p pos.
- */
-#define rd_kafka_buf_skip_to(rkbuf, pos) do {                           \
-                size_t __len1 = (size_t)(pos) -                         \
-                        rd_slice_offset(&(rkbuf)->rkbuf_reader);        \
-                if (__len1 &&                                           \
-                    !rd_slice_read(&(rkbuf)->rkbuf_reader, NULL, __len1)) \
-                        rd_kafka_buf_check_len(rkbuf, __len1);           \
-        } while (0)
-
-
-
-/**
- * Read 'len' bytes and copy to 'dstptr'
- */
-#define rd_kafka_buf_read(rkbuf,dstptr,len) do {                        \
-                size_t __len2 = (size_t)(len);                          \
-                if (!rd_slice_read(&(rkbuf)->rkbuf_reader, dstptr, __len2))  \
-                        rd_kafka_buf_check_len(rkbuf, __len2);          \
-        } while (0)
-
-
-/**
- * @brief Read \p len bytes at slice offset \p offset and copy to \p dstptr
- *        without affecting the current reader position.
- */
-#define rd_kafka_buf_peek(rkbuf,offset,dstptr,len) do {                 \
-                size_t __len2 = (size_t)(len);                          \
-                if (!rd_slice_peek(&(rkbuf)->rkbuf_reader, offset,      \
-                                   dstptr, __len2))                     \
-                        rd_kafka_buf_check_len(rkbuf, (offset)+(__len2)); \
-        } while (0)
-
-
-/**
- * Read a 16,32,64-bit integer and store it in 'dstptr'
- */
-#define rd_kafka_buf_read_i64(rkbuf,dstptr) do {                        \
-                int64_t _v;                                             \
-                rd_kafka_buf_read(rkbuf, &_v, sizeof(_v));              \
-                *(dstptr) = be64toh(_v);                                \
-        } while (0)
-
-#define rd_kafka_buf_peek_i64(rkbuf,of,dstptr) do {                     \
-                int64_t _v;                                             \
-                rd_kafka_buf_peek(rkbuf, of, &_v, sizeof(_v));          \
-                *(dstptr) = be64toh(_v);                                \
-        } while (0)
-
-#define rd_kafka_buf_read_i32(rkbuf,dstptr) do {                        \
-                int32_t _v;                                             \
-                rd_kafka_buf_read(rkbuf, &_v, sizeof(_v));              \
-                *(dstptr) = be32toh(_v);                                \
-        } while (0)
-
-/* Same as .._read_i32 but does a direct assignment.
- * dst is assumed to be a scalar, not pointer. */
-#define rd_kafka_buf_read_i32a(rkbuf, dst) do {				\
-                int32_t _v;                                             \
-		rd_kafka_buf_read(rkbuf, &_v, 4);			\
-		dst = (int32_t) be32toh(_v);				\
-	} while (0)
-
-#define rd_kafka_buf_read_i16(rkbuf,dstptr) do {                        \
-                int16_t _v;                                             \
-                rd_kafka_buf_read(rkbuf, &_v, sizeof(_v));              \
-                *(dstptr) = be16toh(_v);                                \
-        } while (0)
-
-
-#define rd_kafka_buf_read_i16a(rkbuf, dst) do {				\
-                int16_t _v;                                             \
-		rd_kafka_buf_read(rkbuf, &_v, 2);			\
-                dst = (int16_t)be16toh(_v);				\
-	} while (0)
-
-#define rd_kafka_buf_read_i8(rkbuf, dst) rd_kafka_buf_read(rkbuf, dst, 1)
-
-#define rd_kafka_buf_peek_i8(rkbuf,of,dst) rd_kafka_buf_peek(rkbuf,of,dst,1)
-
-
-/**
- * @brief Read varint and store in int64_t \p dst
- */
-#define rd_kafka_buf_read_varint(rkbuf,dst) do {                        \
-                int64_t _v;                                             \
-                size_t _r = rd_varint_dec_slice(&(rkbuf)->rkbuf_reader, &_v); \
-                if (unlikely(RD_UVARINT_UNDERFLOW(_r)))                 \
-                        rd_kafka_buf_parse_fail(rkbuf,                  \
-                                                "varint parsing failed: " \
-                                                "buffer underflow");    \
-                *(dst) = _v;                                            \
-        } while (0)
-
-/* Read Kafka String representation (2+N).
- * The kstr data will be updated to point to the rkbuf. */
-#define rd_kafka_buf_read_str(rkbuf, kstr) do {                         \
-                int _klen;                                              \
-                rd_kafka_buf_read_i16a(rkbuf, (kstr)->len);             \
-                _klen = RD_KAFKAP_STR_LEN(kstr);                        \
-                if (RD_KAFKAP_STR_LEN0(_klen) == 0)                     \
-                        (kstr)->str = NULL;                             \
-                else if (!((kstr)->str =                                \
-                           rd_slice_ensure_contig(&rkbuf->rkbuf_reader, \
-                                                     _klen)))           \
-                        rd_kafka_buf_check_len(rkbuf, _klen);           \
-        } while (0)
-
-/* Read Kafka String representation (2+N) and write it to the \p tmpabuf
- * with a trailing nul byte. */
-#define rd_kafka_buf_read_str_tmpabuf(rkbuf, tmpabuf, dst) do {		\
-                rd_kafkap_str_t _kstr;					\
-		size_t _slen;						\
-		char *_dst;						\
-		rd_kafka_buf_read_str(rkbuf, &_kstr);			\
-		_slen = RD_KAFKAP_STR_LEN(&_kstr);			\
-		if (!(_dst =						\
-		      rd_tmpabuf_write(tmpabuf, _kstr.str, _slen+1)))	\
-			rd_kafka_buf_parse_fail(			\
-				rkbuf,					\
-				"Not enough room in tmpabuf: "		\
-				"%"PRIusz"+%"PRIusz			\
-				" > %"PRIusz,				\
-				(tmpabuf)->of, _slen+1, (tmpabuf)->size); \
-		_dst[_slen] = '\0';					\
-		dst = (void *)_dst;					\
-	} while (0)
-
-/**
- * Skip a string.
- */
-#define rd_kafka_buf_skip_str(rkbuf) do {			\
-		int16_t _slen;					\
-		rd_kafka_buf_read_i16(rkbuf, &_slen);		\
-		rd_kafka_buf_skip(rkbuf, RD_KAFKAP_STR_LEN0(_slen));	\
-	} while (0)
-
-/* Read Kafka Bytes representation (4+N).
- *  The 'kbytes' will be updated to point to rkbuf data */
-#define rd_kafka_buf_read_bytes(rkbuf, kbytes) do {                     \
-                int _klen;                                              \
-                rd_kafka_buf_read_i32a(rkbuf, _klen);                   \
-                (kbytes)->len = _klen;                                  \
-                if (RD_KAFKAP_BYTES_IS_NULL(kbytes)) {                  \
-                        (kbytes)->data = NULL;                          \
-                        (kbytes)->len = 0;                              \
-                } else if (RD_KAFKAP_BYTES_LEN(kbytes) == 0)            \
-                        (kbytes)->data = "";                            \
-                else if (!((kbytes)->data =                             \
-                           rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, \
-                                                  _klen)))              \
-                        rd_kafka_buf_check_len(rkbuf, _klen);           \
-        } while (0)
-
-
-/**
- * @brief Read \p size bytes from buffer, setting \p *ptr to the start
- *        of the memory region.
- */
-#define rd_kafka_buf_read_ptr(rkbuf,ptr,size) do {                      \
-                size_t _klen = size;                                    \
-                if (!(*(ptr) = (void *)                                 \
-                      rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, _klen))) \
-                        rd_kafka_buf_check_len(rkbuf, _klen);           \
-        } while (0)
-
-
-/**
- * @brief Read varint-lengted Kafka Bytes representation
- */
-#define rd_kafka_buf_read_bytes_varint(rkbuf,kbytes) do {               \
-                int64_t _len2;                                          \
-                size_t _r = rd_varint_dec_slice(&(rkbuf)->rkbuf_reader, \
-                                                &_len2);                \
-                if (unlikely(RD_UVARINT_UNDERFLOW(_r)))                 \
-                        rd_kafka_buf_parse_fail(rkbuf,                  \
-                                                "varint parsing failed: " \
-                                                "buffer underflow");    \
-                (kbytes)->len = (int32_t)_len2;                         \
-                if (RD_KAFKAP_BYTES_IS_NULL(kbytes)) {                  \
-                        (kbytes)->data = NULL;                          \
-                        (kbytes)->len = 0;                              \
-                } else if (RD_KAFKAP_BYTES_LEN(kbytes) == 0)            \
-                        (kbytes)->data = "";                            \
-                else if (!((kbytes)->data =                             \
-                           rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, \
-                                                  _len2)))              \
-                        rd_kafka_buf_check_len(rkbuf, _len2);           \
-        } while (0)
-
-
-/**
- * Response handling callback.
- *
- * NOTE: Callbacks must check for 'err == RD_KAFKA_RESP_ERR__DESTROY'
- *       which indicates that some entity is terminating (rd_kafka_t, broker,
- *       toppar, queue, etc) and the callback may not be called in the
- *       correct thread. In this case the callback must perform just
- *       the most minimal cleanup and dont trigger any other operations.
- *
- * NOTE: rkb, reply and request may be NULL, depending on error situation.
- */
-typedef void (rd_kafka_resp_cb_t) (rd_kafka_t *rk,
-				   rd_kafka_broker_t *rkb,
-                                   rd_kafka_resp_err_t err,
-                                   rd_kafka_buf_t *reply,
-                                   rd_kafka_buf_t *request,
-                                   void *opaque);
-
-struct rd_kafka_buf_s { /* rd_kafka_buf_t */
-	TAILQ_ENTRY(rd_kafka_buf_s) rkbuf_link;
-
-	int32_t rkbuf_corrid;
-
-	rd_ts_t rkbuf_ts_retry;    /* Absolute send retry time */
-
-	int     rkbuf_flags; /* RD_KAFKA_OP_F */
-
-        rd_buf_t rkbuf_buf;        /**< Send/Recv byte buffer */
-        rd_slice_t rkbuf_reader;   /**< Buffer slice reader for rkbuf_buf */
-
-	int     rkbuf_connid;      /* broker connection id (used when buffer
-				    * was partially sent). */
-        size_t  rkbuf_totlen;      /* recv: total expected length,
-                                    * send: not used */
-
-	rd_crc32_t rkbuf_crc;      /* Current CRC calculation */
-
-	struct rd_kafkap_reqhdr rkbuf_reqhdr;   /* Request header.
-                                                 * These fields are encoded
-                                                 * and written to output buffer
-                                                 * on buffer finalization. */
-	struct rd_kafkap_reshdr rkbuf_reshdr;   /* Response header.
-                                                 * Decoded fields are copied
-                                                 * here from the buffer
-                                                 * to provide an ease-of-use
-                                                 * interface to the header */
-
-	int32_t rkbuf_expected_size;  /* expected size of message */
-
-        rd_kafka_replyq_t   rkbuf_replyq;       /* Enqueue response on replyq */
-        rd_kafka_replyq_t   rkbuf_orig_replyq;  /* Original replyq to be used
-                                                 * for retries from inside
-                                                 * the rkbuf_cb() callback
-                                                 * since rkbuf_replyq will
-                                                 * have been reset. */
-        rd_kafka_resp_cb_t *rkbuf_cb;           /* Response callback */
-        struct rd_kafka_buf_s *rkbuf_response;  /* Response buffer */
-
-        struct rd_kafka_broker_s *rkbuf_rkb;
-
-	rd_refcnt_t rkbuf_refcnt;
-	void   *rkbuf_opaque;
-
-	int     rkbuf_retries;            /* Retries so far. */
-#define RD_KAFKA_BUF_NO_RETRIES  1000000  /* Do not retry */
-
-        int     rkbuf_features;   /* Required feature(s) that must be
-                                   * supported by broker. */
-
-	rd_ts_t rkbuf_ts_enq;
-	rd_ts_t rkbuf_ts_sent;    /* Initially: Absolute time of transmission,
-				   * after response: RTT. */
-	rd_ts_t rkbuf_ts_timeout;
-
-        int64_t rkbuf_offset;     /* Used by OffsetCommit */
-
-	rd_list_t *rkbuf_rktp_vers;    /* Toppar + Op Version map.
-					* Used by FetchRequest. */
-
-	rd_kafka_msgq_t rkbuf_msgq;
-
-        rd_kafka_resp_err_t rkbuf_err;      /* Buffer parsing error code */
-
-        union {
-                struct {
-                        rd_list_t *topics;  /* Requested topics (char *) */
-                        char *reason;       /* Textual reason */
-                        rd_kafka_op_t *rko; /* Originating rko with replyq
-                                             * (if any) */
-                        int all_topics;     /* Full/All topics requested */
-
-                        int *decr;          /* Decrement this integer by one
-                                             * when request is complete:
-                                             * typically points to metadata
-                                             * cache's full_.._sent.
-                                             * Will be performed with
-                                             * decr_lock held. */
-                        mtx_t *decr_lock;
-
-                } Metadata;
-        } rkbuf_u;
-};
-
-
-typedef struct rd_kafka_bufq_s {
-	TAILQ_HEAD(, rd_kafka_buf_s) rkbq_bufs;
-	rd_atomic32_t  rkbq_cnt;
-	rd_atomic32_t  rkbq_msg_cnt;
-} rd_kafka_bufq_t;
-
-#define rd_kafka_bufq_cnt(rkbq) rd_atomic32_get(&(rkbq)->rkbq_cnt)
-
-
-#define rd_kafka_buf_keep(rkbuf) rd_refcnt_add(&(rkbuf)->rkbuf_refcnt)
-#define rd_kafka_buf_destroy(rkbuf)                                     \
-        rd_refcnt_destroywrapper(&(rkbuf)->rkbuf_refcnt,                \
-                                 rd_kafka_buf_destroy_final(rkbuf))
-
-void rd_kafka_buf_destroy_final (rd_kafka_buf_t *rkbuf);
-void rd_kafka_buf_push0 (rd_kafka_buf_t *rkbuf, const void *buf, size_t len,
-                         int allow_crc_calc, void (*free_cb) (void *));
-#define rd_kafka_buf_push(rkbuf,buf,len,free_cb)                        \
-        rd_kafka_buf_push0(rkbuf,buf,len,1/*allow_crc*/,free_cb)
-rd_kafka_buf_t *rd_kafka_buf_new0 (int segcnt, size_t size, int flags);
-#define rd_kafka_buf_new(segcnt,size) \
-        rd_kafka_buf_new0(segcnt,size,0)
-rd_kafka_buf_t *rd_kafka_buf_new_request (rd_kafka_broker_t *rkb, int16_t ApiKey,
-                                          int segcnt, size_t size);
-rd_kafka_buf_t *rd_kafka_buf_new_shadow (const void *ptr, size_t size,
-                                         void (*free_cb) (void *));
-void rd_kafka_bufq_enq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf);
-void rd_kafka_bufq_deq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf);
-void rd_kafka_bufq_init(rd_kafka_bufq_t *rkbufq);
-void rd_kafka_bufq_concat (rd_kafka_bufq_t *dst, rd_kafka_bufq_t *src);
-void rd_kafka_bufq_purge (rd_kafka_broker_t *rkb,
-                          rd_kafka_bufq_t *rkbufq,
-                          rd_kafka_resp_err_t err);
-void rd_kafka_bufq_connection_reset (rd_kafka_broker_t *rkb,
-				     rd_kafka_bufq_t *rkbufq);
-void rd_kafka_bufq_dump (rd_kafka_broker_t *rkb, const char *fac,
-			 rd_kafka_bufq_t *rkbq);
-
-int rd_kafka_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf);
-
-void rd_kafka_buf_handle_op (rd_kafka_op_t *rko, rd_kafka_resp_err_t err);
-void rd_kafka_buf_callback (rd_kafka_t *rk,
-			    rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err,
-                            rd_kafka_buf_t *response, rd_kafka_buf_t *request);
-
-
-
-/**
- *
- * Write buffer interface
- *
- */
-
-/**
- * Set request API type version
- */
-static RD_UNUSED RD_INLINE void
-rd_kafka_buf_ApiVersion_set (rd_kafka_buf_t *rkbuf,
-                             int16_t version, int features) {
-        rkbuf->rkbuf_reqhdr.ApiVersion = version;
-        rkbuf->rkbuf_features = features;
-}
-
-
-/**
- * @returns the ApiVersion for a request
- */
-#define rd_kafka_buf_ApiVersion(rkbuf) ((rkbuf)->rkbuf_reqhdr.ApiVersion)
-
-
-
-/**
- * Write (copy) data to buffer at current write-buffer position.
- * There must be enough space allocated in the rkbuf.
- * Returns offset to written destination buffer.
- */
-static RD_INLINE size_t rd_kafka_buf_write (rd_kafka_buf_t *rkbuf,
-                                        const void *data, size_t len) {
-        size_t r;
-
-        r = rd_buf_write(&rkbuf->rkbuf_buf, data, len);
-
-        if (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC)
-                rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc, data, len);
-
-        return r;
-}
-
-
-
-/**
- * Write (copy) 'data' to buffer at 'ptr'.
- * There must be enough space to fit 'len'.
- * This will overwrite the buffer at given location and length.
- *
- * NOTE: rd_kafka_buf_update() MUST NOT be called when a CRC calculation
- *       is in progress (between rd_kafka_buf_crc_init() & .._crc_finalize())
- */
-static RD_INLINE void rd_kafka_buf_update (rd_kafka_buf_t *rkbuf, size_t of,
-                                          const void *data, size_t len) {
-        rd_kafka_assert(NULL, !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC));
-        rd_buf_write_update(&rkbuf->rkbuf_buf, of, data, len);
-}
-
-/**
- * Write int8_t to buffer.
- */
-static RD_INLINE size_t rd_kafka_buf_write_i8 (rd_kafka_buf_t *rkbuf,
-					      int8_t v) {
-        return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
-}
-
-/**
- * Update int8_t in buffer at offset 'of'.
- * 'of' should have been previously returned by `.._buf_write_i8()`.
- */
-static RD_INLINE void rd_kafka_buf_update_i8 (rd_kafka_buf_t *rkbuf,
-					     size_t of, int8_t v) {
-        rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
-}
-
-/**
- * Write int16_t to buffer.
- * The value will be endian-swapped before write.
- */
-static RD_INLINE size_t rd_kafka_buf_write_i16 (rd_kafka_buf_t *rkbuf,
-					       int16_t v) {
-        v = htobe16(v);
-        return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
-}
-
-/**
- * Update int16_t in buffer at offset 'of'.
- * 'of' should have been previously returned by `.._buf_write_i16()`.
- */
-static RD_INLINE void rd_kafka_buf_update_i16 (rd_kafka_buf_t *rkbuf,
-                                              size_t of, int16_t v) {
-        v = htobe16(v);
-        rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
-}
-
-/**
- * Write int32_t to buffer.
- * The value will be endian-swapped before write.
- */
-static RD_INLINE size_t rd_kafka_buf_write_i32 (rd_kafka_buf_t *rkbuf,
-                                               int32_t v) {
-        v = htobe32(v);
-        return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
-}
-
-/**
- * Update int32_t in buffer at offset 'of'.
- * 'of' should have been previously returned by `.._buf_write_i32()`.
- */
-static RD_INLINE void rd_kafka_buf_update_i32 (rd_kafka_buf_t *rkbuf,
-                                              size_t of, int32_t v) {
-        v = htobe32(v);
-        rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
-}
-
-/**
- * Update int32_t in buffer at offset 'of'.
- * 'of' should have been previously returned by `.._buf_write_i32()`.
- */
-static RD_INLINE void rd_kafka_buf_update_u32 (rd_kafka_buf_t *rkbuf,
-                                              size_t of, uint32_t v) {
-        v = htobe32(v);
-        rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
-}
-
-
-/**
- * Write int64_t to buffer.
- * The value will be endian-swapped before write.
- */
-static RD_INLINE size_t rd_kafka_buf_write_i64 (rd_kafka_buf_t *rkbuf, int64_t v) {
-        v = htobe64(v);
-        return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
-}
-
-/**
- * Update int64_t in buffer at address 'ptr'.
- * 'of' should have been previously returned by `.._buf_write_i64()`.
- */
-static RD_INLINE void rd_kafka_buf_update_i64 (rd_kafka_buf_t *rkbuf,
-                                              size_t of, int64_t v) {
-        v = htobe64(v);
-        rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
-}
-
-
-/**
- * Write (copy) Kafka string to buffer.
- */
-static RD_INLINE size_t rd_kafka_buf_write_kstr (rd_kafka_buf_t *rkbuf,
-                                                const rd_kafkap_str_t *kstr) {
-        return rd_kafka_buf_write(rkbuf, RD_KAFKAP_STR_SER(kstr),
-				  RD_KAFKAP_STR_SIZE(kstr));
-}
-
-/**
- * Write (copy) char * string to buffer.
- */
-static RD_INLINE size_t rd_kafka_buf_write_str (rd_kafka_buf_t *rkbuf,
-                                               const char *str, size_t len) {
-        size_t r;
-        if (!str)
-                len = RD_KAFKAP_STR_LEN_NULL;
-        else if (len == (size_t)-1)
-                len = strlen(str);
-        r = rd_kafka_buf_write_i16(rkbuf, (int16_t) len);
-        if (str)
-                rd_kafka_buf_write(rkbuf, str, len);
-        return r;
-}
-
-
-/**
- * Push (i.e., no copy) Kafka string to buffer iovec
- */
-static RD_INLINE void rd_kafka_buf_push_kstr (rd_kafka_buf_t *rkbuf,
-                                             const rd_kafkap_str_t *kstr) {
-	rd_kafka_buf_push(rkbuf, RD_KAFKAP_STR_SER(kstr),
-			  RD_KAFKAP_STR_SIZE(kstr), NULL);
-}
-
-
-
-/**
- * Write (copy) Kafka bytes to buffer.
- */
-static RD_INLINE size_t rd_kafka_buf_write_kbytes (rd_kafka_buf_t *rkbuf,
-					          const rd_kafkap_bytes_t *kbytes){
-        return rd_kafka_buf_write(rkbuf, RD_KAFKAP_BYTES_SER(kbytes),
-                                  RD_KAFKAP_BYTES_SIZE(kbytes));
-}
-
-/**
- * Push (i.e., no copy) Kafka bytes to buffer iovec
- */
-static RD_INLINE void rd_kafka_buf_push_kbytes (rd_kafka_buf_t *rkbuf,
-					       const rd_kafkap_bytes_t *kbytes){
-	rd_kafka_buf_push(rkbuf, RD_KAFKAP_BYTES_SER(kbytes),
-			  RD_KAFKAP_BYTES_SIZE(kbytes), NULL);
-}
-
-/**
- * Write (copy) binary bytes to buffer as Kafka bytes encapsulate data.
- */
-static RD_INLINE size_t rd_kafka_buf_write_bytes (rd_kafka_buf_t *rkbuf,
-                                                 const void *payload, size_t size) {
-        size_t r;
-        if (!payload)
-                size = RD_KAFKAP_BYTES_LEN_NULL;
-        r = rd_kafka_buf_write_i32(rkbuf, (int32_t) size);
-        if (payload)
-                rd_kafka_buf_write(rkbuf, payload, size);
-        return r;
-}
-
-
-
-
-/**
- * Write Kafka Message to buffer
- * The number of bytes written is returned in '*outlenp'.
- *
- * Returns the buffer offset of the first byte.
- */
-size_t rd_kafka_buf_write_Message (rd_kafka_broker_t *rkb,
-				   rd_kafka_buf_t *rkbuf,
-				   int64_t Offset, int8_t MagicByte,
-				   int8_t Attributes, int64_t Timestamp,
-				   const void *key, int32_t key_len,
-				   const void *payload, int32_t len,
-				   int *outlenp);
-
-/**
- * Start calculating CRC from now and track it in '*crcp'.
- */
-static RD_INLINE RD_UNUSED void rd_kafka_buf_crc_init (rd_kafka_buf_t *rkbuf) {
-	rd_kafka_assert(NULL, !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC));
-	rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_CRC;
-	rkbuf->rkbuf_crc = rd_crc32_init();
-}
-
-/**
- * Finalizes CRC calculation and returns the calculated checksum.
- */
-static RD_INLINE RD_UNUSED
-rd_crc32_t rd_kafka_buf_crc_finalize (rd_kafka_buf_t *rkbuf) {
-	rkbuf->rkbuf_flags &= ~RD_KAFKA_OP_F_CRC;
-	return rd_crc32_finalize(rkbuf->rkbuf_crc);
-}
-
-
-
-
-
-/**
- * @brief Check if buffer's replyq.version is outdated.
- * @param rkbuf: may be NULL, for convenience.
- *
- * @returns 1 if this is an outdated buffer, else 0.
- */
-static RD_UNUSED RD_INLINE int
-rd_kafka_buf_version_outdated (const rd_kafka_buf_t *rkbuf, int version) {
-        return rkbuf && rkbuf->rkbuf_replyq.version &&
-                rkbuf->rkbuf_replyq.version < version;
-}