You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bq...@apache.org on 2017/11/16 21:15:51 UTC

[21/42] nifi-minifi-cpp git commit: MINIFICPP-274: PutKafka Processor

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_broker.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_broker.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_broker.h
new file mode 100644
index 0000000..a30f2bd
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_broker.h
@@ -0,0 +1,328 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012,2013 Magnus Edenhill
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met: 
+ * 
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer. 
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution. 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#pragma once
+
+#include "rdkafka_feature.h"
+
+
+extern const char *rd_kafka_broker_state_names[];
+extern const char *rd_kafka_secproto_names[];
+
+/**
+ * @struct rd_kafka_broker_s
+ * @brief Per-broker state: transport/connection info, protocol feature and
+ *        ApiVersion data, request buffer queues, statistics, and the set of
+ *        toppars currently handled by this broker.
+ */
+struct rd_kafka_broker_s { /* rd_kafka_broker_t */
+	TAILQ_ENTRY(rd_kafka_broker_s) rkb_link;
+
+	int32_t             rkb_nodeid;  /* Broker id, or RD_KAFKA_NODEID_UA
+                                          * when not (yet) assigned. */
+#define RD_KAFKA_NODEID_UA -1
+
+	rd_sockaddr_list_t *rkb_rsal;
+	time_t              rkb_t_rsal_last;
+        const rd_sockaddr_inx_t  *rkb_addr_last; /* Last used connect address */
+
+	rd_kafka_transport_t *rkb_transport;
+
+	uint32_t            rkb_corrid;
+	int                 rkb_connid;    /* Connection id, increased by
+					    * one for each connection by
+					    * this broker. Used as a safe-guard
+					    * to help troubleshooting buffer
+					    * problems across disconnects. */
+
+	rd_kafka_q_t       *rkb_ops;
+
+        mtx_t               rkb_lock;  /* Broker lock, acquired through
+                                        * rd_kafka_broker_lock()/..unlock().
+                                        * Protects at least rkb_features and
+                                        * the name/nodename section marked
+                                        * below. */
+
+        int                 rkb_blocking_max_ms; /* Maximum IO poll blocking
+                                                  * time. */
+
+        /* Toppars handled by this broker */
+	TAILQ_HEAD(, rd_kafka_toppar_s) rkb_toppars;
+	int                 rkb_toppar_cnt;
+
+        /* Underflowed toppars that are eligible for fetching. */
+        CIRCLEQ_HEAD(, rd_kafka_toppar_s) rkb_fetch_toppars;
+        int                 rkb_fetch_toppar_cnt;
+        rd_kafka_toppar_t  *rkb_fetch_toppar_next;  /* Next 'first' toppar
+                                                     * in fetch list.
+                                                     * This is used for
+                                                     * round-robin. */
+
+
+        rd_kafka_cgrp_t    *rkb_cgrp;
+
+	rd_ts_t             rkb_ts_fetch_backoff;
+	int                 rkb_fetching;
+
+	enum {
+		RD_KAFKA_BROKER_STATE_INIT,
+		RD_KAFKA_BROKER_STATE_DOWN,
+		RD_KAFKA_BROKER_STATE_CONNECT,
+		RD_KAFKA_BROKER_STATE_AUTH,
+
+		/* Any state >= STATE_UP means the Kafka protocol layer
+		 * is operational (to some degree). */
+		RD_KAFKA_BROKER_STATE_UP,
+                RD_KAFKA_BROKER_STATE_UPDATE,
+		RD_KAFKA_BROKER_STATE_APIVERSION_QUERY,
+		RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE
+	} rkb_state;
+
+        rd_ts_t             rkb_ts_state;        /* Timestamp of last
+                                                  * state change */
+        rd_interval_t       rkb_timeout_scan_intvl;  /* Waitresp timeout scan
+                                                      * interval. */
+
+        rd_atomic32_t       rkb_blocking_request_cnt; /* The number of
+                                                       * in-flight blocking
+                                                       * requests.
+                                                       * A blocking request is
+                                                       * one that is known to
+                                                       * possibly block on the
+                                                       * broker for longer than
+                                                       * the typical processing
+                                                       * time, e.g.:
+                                                       * JoinGroup, SyncGroup */
+
+	int                 rkb_features;    /* Protocol features supported
+					      * by this broker.
+					      * See RD_KAFKA_FEATURE_* in
+					      * rdkafka_proto.h */
+
+        struct rd_kafka_ApiVersion *rkb_ApiVersions; /* Broker's supported APIs
+                                                      * (MUST be sorted) */
+	size_t                      rkb_ApiVersions_cnt;
+	rd_interval_t               rkb_ApiVersion_fail_intvl; /* Controls how long
+								* the fallback proto
+								* will be used after
+								* ApiVersionRequest
+								* failure. */
+
+	rd_kafka_confsource_t  rkb_source;
+	/* Statistics counters. */
+	struct {
+		rd_atomic64_t tx_bytes;
+		rd_atomic64_t tx;    /* Kafka-messages (not payload msgs) */
+		rd_atomic64_t tx_err;
+		rd_atomic64_t tx_retries;
+		rd_atomic64_t req_timeouts;  /* Accumulated value */
+
+		rd_atomic64_t rx_bytes;
+		rd_atomic64_t rx;    /* Kafka messages (not payload msgs) */
+		rd_atomic64_t rx_err;
+		rd_atomic64_t rx_corrid_err; /* CorrId misses */
+		rd_atomic64_t rx_partial;    /* Partial messages received
+                                              * and dropped. */
+                rd_atomic64_t zbuf_grow;     /* Compression/decompression buffer grows needed */
+                rd_atomic64_t buf_grow;      /* rkbuf grows needed */
+                rd_atomic64_t wakeups;       /* Poll wakeups */
+	} rkb_c;
+
+        int                 rkb_req_timeouts;  /* Current value */
+
+	rd_ts_t             rkb_ts_metadata_poll; /* Next metadata poll time */
+	int                 rkb_metadata_fast_poll_cnt; /* Perform fast
+							 * metadata polls. */
+	thrd_t              rkb_thread;  /* This broker's own thread (see
+	                                  * thrd_is_current() assertions in
+	                                  * rdkafka_buf.c). */
+
+	rd_refcnt_t         rkb_refcnt;  /* Refcount: see
+	                                  * rd_kafka_broker_keep()/destroy(). */
+
+        rd_kafka_t         *rkb_rk;      /* Parent rd_kafka_t handle. */
+
+	rd_kafka_buf_t     *rkb_recv_buf;
+
+	int                 rkb_max_inflight;   /* Maximum number of in-flight
+						 * requests to broker.
+						 * Compared to rkb_waitresps length.*/
+	rd_kafka_bufq_t     rkb_outbufs;
+	rd_kafka_bufq_t     rkb_waitresps;
+	rd_kafka_bufq_t     rkb_retrybufs;
+
+	rd_avg_t            rkb_avg_int_latency;/* Current internal latency period*/
+	rd_avg_t            rkb_avg_rtt;        /* Current RTT period */
+	rd_avg_t            rkb_avg_throttle;   /* Current throttle period */
+
+        /* These are all protected by rkb_lock */
+	char                rkb_name[RD_KAFKA_NODENAME_SIZE];  /* Displ name */
+	char                rkb_nodename[RD_KAFKA_NODENAME_SIZE]; /* host:port*/
+        uint16_t            rkb_port;                          /* TCP port */
+        char               *rkb_origname;                      /* Original
+                                                                * host name */
+
+
+        /* Logging name is a copy of rkb_name, protected by its own mutex */
+        char               *rkb_logname;
+        mtx_t               rkb_logname_lock;
+
+        int                 rkb_wakeup_fd[2];     /* Wake-up fds (r/w) to wake
+                                                   * up from IO-wait when
+                                                   * queues have content. */
+        int                 rkb_toppar_wakeup_fd; /* Toppar msgq wakeup fd,
+                                                   * this is rkb_wakeup_fd[1]
+                                                   * if enabled. */
+        rd_interval_t       rkb_connect_intvl;    /* Reconnect throttling */
+
+	rd_kafka_secproto_t rkb_proto;
+
+	int                 rkb_down_reported;    /* Down event reported */
+#if WITH_SASL_CYRUS
+	rd_kafka_timer_t    rkb_sasl_kinit_refresh_tmr;
+#endif
+
+
+	/* Last broker error: human-readable message and errno value. */
+	struct {
+		char msg[512];
+		int  err;  /* errno */
+	} rkb_err;
+};
+
+#define rd_kafka_broker_keep(rkb)   rd_refcnt_add(&(rkb)->rkb_refcnt)
+#define rd_kafka_broker_lock(rkb)   mtx_lock(&(rkb)->rkb_lock)
+#define rd_kafka_broker_unlock(rkb) mtx_unlock(&(rkb)->rkb_lock)
+
+
+/**
+ * @brief Broker comparator (by pointer identity).
+ *
+ * Fix: the previous `(int)(a - b)` subtracted pointers to unrelated
+ * objects (undefined behavior per C11 6.5.6) and truncated the
+ * ptrdiff_t result to int, which can yield the wrong sign on 64-bit
+ * platforms.  Compare the integer representations of the addresses
+ * and return a canonical -1/0/1 instead.
+ */
+static RD_UNUSED RD_INLINE int rd_kafka_broker_cmp (const void *_a,
+                                                    const void *_b) {
+        uintptr_t a = (uintptr_t)_a, b = (uintptr_t)_b;
+        return a < b ? -1 : (a > b ? 1 : 0);
+}
+
+
+/**
+ * @returns true if broker supports \p features, else false.
+ *
+ * Acquires and releases rkb_lock around the rkb_features read.
+ */
+static RD_UNUSED
+int rd_kafka_broker_supports (rd_kafka_broker_t *rkb, int features) {
+	int r;
+	rd_kafka_broker_lock(rkb);
+	/* All requested feature bits must be present, not just some. */
+	r = (rkb->rkb_features & features) == features;
+	rd_kafka_broker_unlock(rkb);
+	return r;
+}
+
+int16_t rd_kafka_broker_ApiVersion_supported (rd_kafka_broker_t *rkb,
+                                              int16_t ApiKey,
+                                              int16_t minver, int16_t maxver,
+                                              int *featuresp);
+
+int rd_kafka_broker_get_state (rd_kafka_broker_t *rkb);
+
+rd_kafka_broker_t *rd_kafka_broker_find_by_nodeid (rd_kafka_t *rk,
+						   int32_t nodeid);
+rd_kafka_broker_t *rd_kafka_broker_find_by_nodeid0 (rd_kafka_t *rk,
+                                                    int32_t nodeid,
+                                                    int state);
+#define rd_kafka_broker_find_by_nodeid(rk,nodeid) \
+        rd_kafka_broker_find_by_nodeid0(rk,nodeid,-1)
+
+/**
+ * Filter out brokers that are currently in a blocking request.
+ *
+ * @returns non-zero (filter out) if the broker has in-flight blocking
+ *          requests, else 0 (keep).
+ */
+static RD_INLINE RD_UNUSED int
+rd_kafka_broker_filter_non_blocking (rd_kafka_broker_t *rkb, void *opaque) {
+        return rd_atomic32_get(&rkb->rkb_blocking_request_cnt) > 0;
+}
+
+/**
+ * Filter out brokers that can't do GroupCoordinator requests right now:
+ * either busy with an in-flight blocking request or lacking the
+ * RD_KAFKA_FEATURE_BROKER_GROUP_COORD protocol feature.
+ */
+static RD_INLINE RD_UNUSED int
+rd_kafka_broker_filter_can_group_query (rd_kafka_broker_t *rkb, void *opaque) {
+        return rd_atomic32_get(&rkb->rkb_blocking_request_cnt) > 0 ||
+		!(rkb->rkb_features & RD_KAFKA_FEATURE_BROKER_GROUP_COORD);
+}
+
+rd_kafka_broker_t *rd_kafka_broker_any (rd_kafka_t *rk, int state,
+                                        int (*filter) (rd_kafka_broker_t *rkb,
+                                                       void *opaque),
+                                        void *opaque);
+
+rd_kafka_broker_t *rd_kafka_broker_any_usable (rd_kafka_t *rk, int timeout_ms,
+                                               int do_lock);
+
+rd_kafka_broker_t *rd_kafka_broker_prefer (rd_kafka_t *rk, int32_t broker_id, int state);
+
+int rd_kafka_brokers_add0 (rd_kafka_t *rk, const char *brokerlist);
+void rd_kafka_broker_set_state (rd_kafka_broker_t *rkb, int state);
+
+void rd_kafka_broker_fail (rd_kafka_broker_t *rkb,
+			   int level, rd_kafka_resp_err_t err,
+			   const char *fmt, ...);
+
+void rd_kafka_broker_destroy_final (rd_kafka_broker_t *rkb);
+
+#define rd_kafka_broker_destroy(rkb)                                    \
+        rd_refcnt_destroywrapper(&(rkb)->rkb_refcnt,                    \
+                                 rd_kafka_broker_destroy_final(rkb))
+
+
+void rd_kafka_broker_update (rd_kafka_t *rk, rd_kafka_secproto_t proto,
+                             const struct rd_kafka_metadata_broker *mdb);
+rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,
+					rd_kafka_confsource_t source,
+					rd_kafka_secproto_t proto,
+					const char *name, uint16_t port,
+					int32_t nodeid);
+
+void rd_kafka_broker_connect_up (rd_kafka_broker_t *rkb);
+void rd_kafka_broker_connect_done (rd_kafka_broker_t *rkb, const char *errstr);
+
+int rd_kafka_send (rd_kafka_broker_t *rkb);
+int rd_kafka_recv (rd_kafka_broker_t *rkb);
+
+void rd_kafka_dr_msgq (rd_kafka_itopic_t *rkt,
+		       rd_kafka_msgq_t *rkmq, rd_kafka_resp_err_t err);
+
+void rd_kafka_broker_buf_enq1 (rd_kafka_broker_t *rkb,
+                               rd_kafka_buf_t *rkbuf,
+                               rd_kafka_resp_cb_t *resp_cb,
+                               void *opaque);
+
+void rd_kafka_broker_buf_enq_replyq (rd_kafka_broker_t *rkb,
+                                     rd_kafka_buf_t *rkbuf,
+                                     rd_kafka_replyq_t replyq,
+                                     rd_kafka_resp_cb_t *resp_cb,
+                                     void *opaque);
+
+void rd_kafka_broker_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf);
+
+
+rd_kafka_broker_t *rd_kafka_broker_internal (rd_kafka_t *rk);
+
+void msghdr_print (rd_kafka_t *rk,
+		   const char *what, const struct msghdr *msg,
+		   int hexdump);
+
+const char *rd_kafka_broker_name (rd_kafka_broker_t *rkb);
+void rd_kafka_broker_wakeup (rd_kafka_broker_t *rkb);
+
+int rd_kafka_brokers_get_state_version (rd_kafka_t *rk);
+int rd_kafka_brokers_wait_state_change (rd_kafka_t *rk, int stored_version,
+					int timeout_ms);
+void rd_kafka_brokers_broadcast_state_change (rd_kafka_t *rk);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.c
new file mode 100644
index 0000000..9b50737
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.c
@@ -0,0 +1,428 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met: 
+ * 
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer. 
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution. 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rdkafka_int.h"
+#include "rdkafka_buf.h"
+#include "rdkafka_broker.h"
+
+/**
+ * @brief Final buffer destructor: frees per-request auxiliary state,
+ *        any attached response, the reply queues, the backing memory
+ *        and the buffer itself.
+ *
+ * NOTE(review): presumably reached when the refcount drops to zero
+ * (rd_refcnt_destroy below) via rd_kafka_buf_destroy() — confirm
+ * against rdkafka_buf.h.
+ */
+void rd_kafka_buf_destroy_final (rd_kafka_buf_t *rkbuf) {
+
+        /* ApiKey-specific cleanup of the request union. */
+        switch (rkbuf->rkbuf_reqhdr.ApiKey)
+        {
+        case RD_KAFKAP_Metadata:
+                if (rkbuf->rkbuf_u.Metadata.topics)
+                        rd_list_destroy(rkbuf->rkbuf_u.Metadata.topics);
+                if (rkbuf->rkbuf_u.Metadata.reason)
+                        rd_free(rkbuf->rkbuf_u.Metadata.reason);
+                if (rkbuf->rkbuf_u.Metadata.rko)
+                        rd_kafka_op_reply(rkbuf->rkbuf_u.Metadata.rko,
+                                          RD_KAFKA_RESP_ERR__DESTROY);
+                if (rkbuf->rkbuf_u.Metadata.decr) {
+                        /* Decrease metadata cache's full_.._sent state. */
+                        mtx_lock(rkbuf->rkbuf_u.Metadata.decr_lock);
+                        rd_kafka_assert(NULL,
+                                        (*rkbuf->rkbuf_u.Metadata.decr) > 0);
+                        (*rkbuf->rkbuf_u.Metadata.decr)--;
+                        mtx_unlock(rkbuf->rkbuf_u.Metadata.decr_lock);
+                }
+                break;
+        }
+
+        if (rkbuf->rkbuf_response)
+                rd_kafka_buf_destroy(rkbuf->rkbuf_response);
+
+        rd_kafka_replyq_destroy(&rkbuf->rkbuf_replyq);
+        rd_kafka_replyq_destroy(&rkbuf->rkbuf_orig_replyq);
+
+        /* Free the backing memory segments. */
+        rd_buf_destroy(&rkbuf->rkbuf_buf);
+
+        if (rkbuf->rkbuf_rktp_vers)
+                rd_list_destroy(rkbuf->rkbuf_rktp_vers);
+
+        /* Drop the broker refcount taken in rd_kafka_buf_new_request(). */
+        if (rkbuf->rkbuf_rkb)
+                rd_kafka_broker_destroy(rkbuf->rkbuf_rkb);
+
+        rd_refcnt_destroy(&rkbuf->rkbuf_refcnt);
+
+	rd_free(rkbuf);
+}
+
+
+
+/**
+ * @brief Pushes \p buf of size \p len as a new segment on the buffer.
+ *
+ * \p buf will NOT be freed by the buffer.
+ * NOTE(review): a non-NULL \p free_cb is forwarded to rd_buf_push() —
+ * confirm its ownership semantics against rdbuf.h.
+ *
+ * If \p allow_crc_calc is set and the buffer has RD_KAFKA_OP_F_CRC
+ * enabled, the running CRC32 is updated with the pushed bytes.
+ */
+void rd_kafka_buf_push0 (rd_kafka_buf_t *rkbuf, const void *buf, size_t len,
+                         int allow_crc_calc, void (*free_cb) (void *)) {
+        rd_buf_push(&rkbuf->rkbuf_buf, buf, len, free_cb);
+
+        if (allow_crc_calc && (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC))
+                rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc, buf, len);
+}
+
+
+
+/**
+ * @brief Create a new buffer with \p segcnt initial segments and \p size
+ *        bytes of initial backing memory.
+ *        The underlying buffer will grow as needed.
+ *
+ * @remark No protocol-header space is reserved here; that is done by
+ *         rd_kafka_buf_new_request().
+ *
+ * @returns a new buffer with refcount 1.
+ */
+rd_kafka_buf_t *rd_kafka_buf_new0 (int segcnt, size_t size, int flags) {
+        rd_kafka_buf_t *rkbuf;
+
+        rkbuf = rd_calloc(1, sizeof(*rkbuf));
+
+        rkbuf->rkbuf_flags = flags;
+
+        rd_buf_init(&rkbuf->rkbuf_buf, segcnt, size);
+        rd_kafka_msgq_init(&rkbuf->rkbuf_msgq);
+        rd_refcnt_init(&rkbuf->rkbuf_refcnt, 1);
+
+        return rkbuf;
+}
+
+
+/**
+ * @brief Create new request buffer with the request-header written (will
+ *        need to be updated with Length, etc, later)
+ *
+ * Reserves room for the common protocol request header plus the ClientId
+ * string, writes placeholder Length/ApiVersion/CorrId fields (filled in
+ * before transmission) and the real ApiKey and ClientId.
+ *
+ * @remark Takes a refcount on \p rkb, released in
+ *         rd_kafka_buf_destroy_final().
+ */
+rd_kafka_buf_t *rd_kafka_buf_new_request (rd_kafka_broker_t *rkb, int16_t ApiKey,
+                                          int segcnt, size_t size) {
+        rd_kafka_buf_t *rkbuf;
+
+        /* Make room for common protocol request headers */
+        size += RD_KAFKAP_REQHDR_SIZE +
+                RD_KAFKAP_STR_SIZE(rkb->rkb_rk->rk_client_id);
+        segcnt += 1; /* headers */
+
+        rkbuf = rd_kafka_buf_new0(segcnt, size, 0);
+
+        rkbuf->rkbuf_rkb = rkb;
+        rd_kafka_broker_keep(rkb);
+
+        rkbuf->rkbuf_reqhdr.ApiKey = ApiKey;
+
+        /* Write request header, will be updated later. */
+        /* Length: updated later */
+        rd_kafka_buf_write_i32(rkbuf, 0);
+        /* ApiKey */
+        rd_kafka_buf_write_i16(rkbuf, rkbuf->rkbuf_reqhdr.ApiKey);
+        /* ApiVersion: updated later */
+        rd_kafka_buf_write_i16(rkbuf, 0);
+        /* CorrId: updated later */
+        rd_kafka_buf_write_i32(rkbuf, 0);
+
+        /* ClientId */
+        rd_kafka_buf_write_kstr(rkbuf, rkb->rkb_rk->rk_client_id);
+
+        return rkbuf;
+}
+
+
+
+
+/**
+ * @brief Create new read-only rkbuf shadowing a memory region.
+ *
+ * @remark \p free_cb (possibly NULL) will be used to free \p ptr when
+ *         buffer refcount reaches 0.
+ * @remark the buffer may only be read from, not written to.
+ *
+ * @returns a new buffer (refcount 1) whose reader slice spans the whole
+ *          shadowed region.
+ */
+rd_kafka_buf_t *rd_kafka_buf_new_shadow (const void *ptr, size_t size,
+                                         void (*free_cb) (void *)) {
+	rd_kafka_buf_t *rkbuf;
+
+	rkbuf = rd_calloc(1, sizeof(*rkbuf));
+
+        /* Shadow buffers carry no request header. */
+        rkbuf->rkbuf_reqhdr.ApiKey = RD_KAFKAP_None;
+
+        rd_buf_init(&rkbuf->rkbuf_buf, 1, 0);
+        rd_buf_push(&rkbuf->rkbuf_buf, ptr, size, free_cb);
+
+        rkbuf->rkbuf_totlen  = size;
+
+        /* Initialize reader slice */
+        rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf);
+
+	rd_kafka_msgq_init(&rkbuf->rkbuf_msgq);
+
+        rd_refcnt_init(&rkbuf->rkbuf_refcnt, 1);
+
+	return rkbuf;
+}
+
+
+
+/**
+ * @brief Append \p rkbuf to the tail of \p rkbufq, updating the queue's
+ *        buffer and message counters.
+ */
+void rd_kafka_bufq_enq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf) {
+	TAILQ_INSERT_TAIL(&rkbufq->rkbq_bufs, rkbuf, rkbuf_link);
+	(void)rd_atomic32_add(&rkbufq->rkbq_cnt, 1);
+	(void)rd_atomic32_add(&rkbufq->rkbq_msg_cnt,
+                            rd_atomic32_get(&rkbuf->rkbuf_msgq.rkmq_msg_cnt));
+}
+
+/**
+ * @brief Remove \p rkbuf from \p rkbufq, updating the queue's buffer and
+ *        message counters.  Asserts the queue was non-empty.
+ */
+void rd_kafka_bufq_deq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf) {
+	TAILQ_REMOVE(&rkbufq->rkbq_bufs, rkbuf, rkbuf_link);
+	rd_kafka_assert(NULL, rd_atomic32_get(&rkbufq->rkbq_cnt) > 0);
+	(void)rd_atomic32_sub(&rkbufq->rkbq_cnt, 1);
+	(void)rd_atomic32_sub(&rkbufq->rkbq_msg_cnt,
+                          rd_atomic32_get(&rkbuf->rkbuf_msgq.rkmq_msg_cnt));
+}
+
+/**
+ * @brief Initialize (or reset) \p rkbufq to an empty queue with zeroed
+ *        counters.
+ */
+void rd_kafka_bufq_init(rd_kafka_bufq_t *rkbufq) {
+	TAILQ_INIT(&rkbufq->rkbq_bufs);
+	rd_atomic32_init(&rkbufq->rkbq_cnt, 0);
+	rd_atomic32_init(&rkbufq->rkbq_msg_cnt, 0);
+}
+
+/**
+ * Concat all buffers from 'src' to tail of 'dst'.
+ * 'src' is re-initialized to an empty queue afterwards.
+ */
+void rd_kafka_bufq_concat (rd_kafka_bufq_t *dst, rd_kafka_bufq_t *src) {
+	TAILQ_CONCAT(&dst->rkbq_bufs, &src->rkbq_bufs, rkbuf_link);
+	(void)rd_atomic32_add(&dst->rkbq_cnt, rd_atomic32_get(&src->rkbq_cnt));
+	(void)rd_atomic32_add(&dst->rkbq_msg_cnt, rd_atomic32_get(&src->rkbq_msg_cnt));
+	rd_kafka_bufq_init(src);
+}
+
+/**
+ * Purge the wait-response queue.
+ * NOTE: 'rkbufq' must be a temporary queue and not one of rkb_waitresps
+ *       or rkb_outbufs since buffers may be re-enqueued on those queues.
+ *       'rkbufq' needs to be bufq_init():ed before reuse after this call.
+ *
+ * Must be called from the broker's own thread (asserted below).
+ */
+void rd_kafka_bufq_purge (rd_kafka_broker_t *rkb,
+                          rd_kafka_bufq_t *rkbufq,
+                          rd_kafka_resp_err_t err) {
+	rd_kafka_buf_t *rkbuf, *tmp;
+
+	rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
+
+	rd_rkb_dbg(rkb, QUEUE, "BUFQ", "Purging bufq with %i buffers",
+		   rd_atomic32_get(&rkbufq->rkbq_cnt));
+
+	/* buf_callback() decides retry/destroy for each buffer. */
+	TAILQ_FOREACH_SAFE(rkbuf, &rkbufq->rkbq_bufs, rkbuf_link, tmp) {
+                rd_kafka_buf_callback(rkb->rkb_rk, rkb, err, NULL, rkbuf);
+        }
+}
+
+
+/**
+ * @brief Update bufq for connection reset:
+ *
+ * - Purge connection-setup API requests from the queue.
+ * - Reset any partially sent buffer's offset. (issue #756)
+ *
+ * Request types purged:
+ *   ApiVersion
+ *   SaslHandshake
+ *
+ * Must be called from the broker's own thread (asserted below).
+ */
+void rd_kafka_bufq_connection_reset (rd_kafka_broker_t *rkb,
+				     rd_kafka_bufq_t *rkbufq) {
+	rd_kafka_buf_t *rkbuf, *tmp;
+
+	rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
+
+	rd_rkb_dbg(rkb, QUEUE, "BUFQ",
+		   "Updating %d buffers on connection reset",
+		   rd_atomic32_get(&rkbufq->rkbq_cnt));
+
+	TAILQ_FOREACH_SAFE(rkbuf, &rkbufq->rkbq_bufs, rkbuf_link, tmp) {
+		switch (rkbuf->rkbuf_reqhdr.ApiKey)
+		{
+		case RD_KAFKAP_ApiVersion:
+		case RD_KAFKAP_SaslHandshake:
+			/* Connection-setup requests are dropped outright. */
+			rd_kafka_bufq_deq(rkbufq, rkbuf);
+			rd_kafka_buf_callback(rkb->rkb_rk, rkb,
+					      RD_KAFKA_RESP_ERR__DESTROY,
+					      NULL, rkbuf);
+			break;
+                default:
+                        /* Reset buffer send position */
+                        rd_slice_seek(&rkbuf->rkbuf_reader, 0);
+                        break;
+		}
+        }
+}
+
+
+/**
+ * @brief Emit a debug-log dump of all buffers in \p rkbq.
+ *
+ * NOTE(review): the "retry in"/"timeout in" values are computed as
+ * (now - ts), which is negative while the deadline is still in the
+ * future — verify the intended sign ((ts - now) would be "remaining").
+ */
+void rd_kafka_bufq_dump (rd_kafka_broker_t *rkb, const char *fac,
+			 rd_kafka_bufq_t *rkbq) {
+	rd_kafka_buf_t *rkbuf;
+	int cnt = rd_kafka_bufq_cnt(rkbq);
+	rd_ts_t now;
+
+	if (!cnt)
+		return;
+
+	now = rd_clock();
+
+	rd_rkb_dbg(rkb, BROKER, fac, "bufq with %d buffer(s):", cnt);
+
+	TAILQ_FOREACH(rkbuf, &rkbq->rkbq_bufs, rkbuf_link) {
+		rd_rkb_dbg(rkb, BROKER, fac,
+			   " Buffer %s (%"PRIusz" bytes, corrid %"PRId32", "
+			   "connid %d, retry %d in %lldms, timeout in %lldms",
+			   rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.ApiKey),
+			   rkbuf->rkbuf_totlen, rkbuf->rkbuf_corrid,
+			   rkbuf->rkbuf_connid, rkbuf->rkbuf_retries,
+			   rkbuf->rkbuf_ts_retry ?
+			   (now - rkbuf->rkbuf_ts_retry) / 1000LL : 0,
+			   rkbuf->rkbuf_ts_timeout ?
+			   (now - rkbuf->rkbuf_ts_timeout) / 1000LL : 0);
+	}
+}
+
+
+
+
+/**
+ * Retry failed request, depending on the error.
+ * @remark \p rkb may be NULL
+ * Returns 1 if the request was scheduled for retry, else 0.
+ *
+ * No retry is attempted for the internal broker, while the client is
+ * terminating, or once the retry would exceed max_retries.
+ */
+int rd_kafka_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {
+
+        if (unlikely(!rkb ||
+		     rkb->rkb_source == RD_KAFKA_INTERNAL ||
+		     rd_kafka_terminating(rkb->rkb_rk) ||
+		     rkbuf->rkbuf_retries + 1 >
+		     rkb->rkb_rk->rk_conf.max_retries))
+                return 0;
+
+	/* Try again: reset send timestamp, bump the retry count and take
+	 * an extra refcount for the retry queue. */
+	rkbuf->rkbuf_ts_sent = 0;
+	rkbuf->rkbuf_retries++;
+	rd_kafka_buf_keep(rkbuf);
+	rd_kafka_broker_buf_retry(rkb, rkbuf);
+	return 1;
+}
+
+
+/**
+ * @brief Handle RD_KAFKA_OP_RECV_BUF.
+ *
+ * Takes ownership of the op's request buffer, moves the current replyq
+ * to rkbuf_orig_replyq (for future retries) and invokes the buffer's
+ * callback via rd_kafka_buf_callback().
+ */
+void rd_kafka_buf_handle_op (rd_kafka_op_t *rko, rd_kafka_resp_err_t err) {
+        rd_kafka_buf_t *request, *response;
+
+        /* Steal the buffer from the op. */
+        request = rko->rko_u.xbuf.rkbuf;
+        rko->rko_u.xbuf.rkbuf = NULL;
+
+        /* NULL on op_destroy() */
+	if (request->rkbuf_replyq.q) {
+		int32_t version = request->rkbuf_replyq.version;
+                /* Current queue usage is done, but retain original replyq for
+                 * future retries, stealing
+                 * the current reference. */
+                request->rkbuf_orig_replyq = request->rkbuf_replyq;
+                rd_kafka_replyq_clear(&request->rkbuf_replyq);
+		/* Callback might need to version check so we retain the
+		 * version across the clear() call which clears it. */
+		request->rkbuf_replyq.version = version;
+	}
+
+	/* No callback: nothing left to do but release the buffer. */
+	if (!request->rkbuf_cb) {
+		rd_kafka_buf_destroy(request);
+		return;
+	}
+
+        /* Let buf_callback() do destroy()s */
+        response = request->rkbuf_response; /* May be NULL */
+        request->rkbuf_response = NULL;
+
+        rd_kafka_buf_callback(request->rkbuf_rkb->rkb_rk,
+			      request->rkbuf_rkb, err,
+                              response, request);
+}
+
+
+
+/**
+ * Call request.rkbuf_cb(), but:
+ *  - if the rkbuf has a rkbuf_replyq the buffer is enqueued on that queue
+ *    with op type RD_KAFKA_OP_RECV_BUF.
+ *  - else call rkbuf_cb().
+ *
+ * \p response may be NULL.
+ *
+ * Will decrease refcount for both response and request, eventually.
+ *
+ * @remark err == RD_KAFKA_RESP_ERR__DESTROY bypasses both the retry
+ *         attempt and the replyq enqueue path below.
+ */
+void rd_kafka_buf_callback (rd_kafka_t *rk,
+			    rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err,
+                            rd_kafka_buf_t *response, rd_kafka_buf_t *request){
+
+        /* Decide if the request should be retried.
+         * This is always done in the originating broker thread. */
+        if (unlikely(err && err != RD_KAFKA_RESP_ERR__DESTROY &&
+		     rd_kafka_buf_retry(rkb, request))) {
+		/* refcount for retry was increased in buf_retry() so we can
+		 * let go of this caller's refcounts. */
+		rd_kafka_buf_destroy(request);
+		if (response)
+			rd_kafka_buf_destroy(response);
+                return;
+	}
+
+        if (err != RD_KAFKA_RESP_ERR__DESTROY && request->rkbuf_replyq.q) {
+                rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_RECV_BUF);
+
+		/* Attach the response; handled by rd_kafka_buf_handle_op(). */
+		rd_kafka_assert(NULL, !request->rkbuf_response);
+		request->rkbuf_response = response;
+
+                /* Increment refcnt since rko_rkbuf will be decref:ed
+                 * if replyq_enq() fails and we dont want the rkbuf gone in that
+                 * case. */
+                rd_kafka_buf_keep(request);
+                rko->rko_u.xbuf.rkbuf = request;
+
+                rko->rko_err = err;
+
+                /* Copy original replyq for future retries, with its own
+                 * queue reference. */
+                rd_kafka_replyq_copy(&request->rkbuf_orig_replyq,
+                                     &request->rkbuf_replyq);
+
+	        rd_kafka_replyq_enq(&request->rkbuf_replyq, rko, 0);
+
+		rd_kafka_buf_destroy(request); /* from keep above */
+		return;
+        }
+
+        /* Direct callback invocation. */
+        if (request->rkbuf_cb)
+                request->rkbuf_cb(rk, rkb, err, response, request,
+                                  request->rkbuf_opaque);
+
+        rd_kafka_buf_destroy(request);
+	if (response)
+		rd_kafka_buf_destroy(response);
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.h
new file mode 100644
index 0000000..5aa2876
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.h
@@ -0,0 +1,819 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met: 
+ * 
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer. 
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution. 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+#pragma once
+
+#include "rdkafka_int.h"
+#include "rdcrc32.h"
+#include "rdlist.h"
+#include "rdbuf.h"
+
+
+typedef struct rd_kafka_broker_s rd_kafka_broker_t;
+
+#define RD_KAFKA_HEADERS_IOV_CNT   2
+
+
+/**
+ * Temporary buffer with memory aligned writes to accommodate
+ * effective and platform safe struct writes.
+ */
+typedef struct rd_tmpabuf_s {
+	size_t size;
+	size_t of;
+	char  *buf;
+	int    failed;
+	int    assert_on_fail;
+} rd_tmpabuf_t;
+
+/**
+ * @brief Allocate new tmpabuf with \p size bytes pre-allocated.
+ */
+static RD_UNUSED void
+rd_tmpabuf_new (rd_tmpabuf_t *tab, size_t size, int assert_on_fail) {
+	tab->buf = rd_malloc(size);
+	tab->size = size;
+	tab->of = 0;
+	tab->failed = 0;
+	tab->assert_on_fail = assert_on_fail;
+}
+
+/**
+ * @brief Free memory allocated by tmpabuf
+ */
+static RD_UNUSED void
+rd_tmpabuf_destroy (rd_tmpabuf_t *tab) {
+	rd_free(tab->buf);
+}
+
+/**
+ * @returns 1 if a previous operation failed.
+ */
+static RD_UNUSED RD_INLINE int
+rd_tmpabuf_failed (rd_tmpabuf_t *tab) {
+	return tab->failed;
+}
+
+/**
+ * @brief Allocate \p size bytes for writing, returning an aligned pointer
+ *        to the memory.
+ * @returns the allocated pointer (within the tmpabuf) on success or
+ *          NULL if the requested number of bytes + alignment is not available
+ *          in the tmpabuf.
+ */
+static RD_UNUSED void *
+rd_tmpabuf_alloc0 (const char *func, int line, rd_tmpabuf_t *tab, size_t size) {
+	void *ptr;
+
+	if (unlikely(tab->failed))
+		return NULL;
+
+	if (unlikely(tab->of + size > tab->size)) {
+		if (tab->assert_on_fail) {
+			fprintf(stderr,
+				"%s: %s:%d: requested size %zd + %zd > %zd\n",
+				__FUNCTION__, func, line, tab->of, size,
+				tab->size);
+			assert(!*"rd_tmpabuf_alloc: not enough size in buffer");
+		}
+		return NULL;
+	}
+
+        ptr = (void *)(tab->buf + tab->of);
+	tab->of += RD_ROUNDUP(size, 8);
+
+	return ptr;
+}
+
+#define rd_tmpabuf_alloc(tab,size) \
+	rd_tmpabuf_alloc0(__FUNCTION__,__LINE__,tab,size)
+
+/**
+ * @brief Write \p buf of \p size bytes to tmpabuf memory in an aligned fashion.
+ *
+ * @returns the allocated and written-to pointer (within the tmpabuf) on success
+ *          or NULL if the requested number of bytes + alignment is not available
+ *          in the tmpabuf.
+ */
+static RD_UNUSED void *
+rd_tmpabuf_write0 (const char *func, int line,
+		   rd_tmpabuf_t *tab, const void *buf, size_t size) {
+	void *ptr = rd_tmpabuf_alloc0(func, line, tab, size);
+
+	if (ptr)
+		memcpy(ptr, buf, size);
+
+	return ptr;
+}
+#define rd_tmpabuf_write(tab,buf,size) \
+	rd_tmpabuf_write0(__FUNCTION__, __LINE__, tab, buf, size)
+
+
+/**
+ * @brief Wrapper for rd_tmpabuf_write() that takes a nul-terminated string.
+ */
+static RD_UNUSED char *
+rd_tmpabuf_write_str0 (const char *func, int line,
+		       rd_tmpabuf_t *tab, const char *str) {
+	return rd_tmpabuf_write0(func, line, tab, str, strlen(str)+1);
+}
+#define rd_tmpabuf_write_str(tab,str) \
+	rd_tmpabuf_write_str0(__FUNCTION__, __LINE__, tab, str)
+
+
+
+/**
+ * @name Read buffer interface
+ *
+ * Memory reading helper macros to be used when parsing network responses.
+ *
+ * Assumptions:
+ *   - an 'err_parse:' goto-label must be available for error bailouts,
+ *                     the error code will be set in rkbuf->rkbuf_err
+ *   - local `int log_decode_errors` variable set to the logging level
+ *     to log parse errors (or 0 to turn off logging).
+ */
+
+#define rd_kafka_buf_parse_fail(rkbuf,...) do {				\
+                if (log_decode_errors > 0) {                            \
+			rd_kafka_assert(NULL, rkbuf->rkbuf_rkb);	\
+                        rd_rkb_log(rkbuf->rkbuf_rkb, log_decode_errors, \
+                                   "PROTOERR",                          \
+                                   "Protocol parse failure "            \
+                                   "at %"PRIusz"/%"PRIusz" (%s:%i) "    \
+                                   "(incorrect broker.version.fallback?)", \
+                                   rd_slice_offset(&rkbuf->rkbuf_reader), \
+                                   rd_slice_size(&rkbuf->rkbuf_reader), \
+                                   __FUNCTION__, __LINE__);             \
+                        rd_rkb_log(rkbuf->rkbuf_rkb, log_decode_errors, \
+				   "PROTOERR", __VA_ARGS__);		\
+                }                                                       \
+                (rkbuf)->rkbuf_err = RD_KAFKA_RESP_ERR__BAD_MSG;        \
+                goto err_parse;                                         \
+	} while (0)
+
+
+
+/**
+ * Returns the number of remaining bytes available to read.
+ */
+#define rd_kafka_buf_read_remain(rkbuf) \
+        rd_slice_remains(&(rkbuf)->rkbuf_reader)
+
+/**
+ * Checks that at least 'len' bytes remain to be read in buffer, else fails.
+ */
+#define rd_kafka_buf_check_len(rkbuf,len) do {                          \
+                size_t __len0 = (size_t)(len);                          \
+                if (unlikely(__len0 > rd_kafka_buf_read_remain(rkbuf))) { \
+                        rd_kafka_buf_parse_fail(                        \
+                                rkbuf,                                  \
+                                "expected %"PRIusz" bytes > %"PRIusz    \
+                                " remaining bytes",                     \
+                                __len0, rd_kafka_buf_read_remain(rkbuf)); \
+                        (rkbuf)->rkbuf_err = RD_KAFKA_RESP_ERR__BAD_MSG; \
+                        goto err_parse;                                 \
+                }                                                       \
+        } while (0)
+
+/**
+ * Skip (as in read and ignore) the next 'len' bytes.
+ */
+#define rd_kafka_buf_skip(rkbuf, len) do {                              \
+                size_t __len1 = (size_t)(len);                          \
+                if (__len1 &&                                           \
+                    !rd_slice_read(&(rkbuf)->rkbuf_reader, NULL, __len1)) \
+                        rd_kafka_buf_check_len(rkbuf, __len1);           \
+        } while (0)
+
+/**
+ * Skip (as in read and ignore) up to fixed position \p pos.
+ */
+#define rd_kafka_buf_skip_to(rkbuf, pos) do {                           \
+                size_t __len1 = (size_t)(pos) -                         \
+                        rd_slice_offset(&(rkbuf)->rkbuf_reader);        \
+                if (__len1 &&                                           \
+                    !rd_slice_read(&(rkbuf)->rkbuf_reader, NULL, __len1)) \
+                        rd_kafka_buf_check_len(rkbuf, __len1);           \
+        } while (0)
+
+
+
+/**
+ * Read 'len' bytes and copy to 'dstptr'
+ */
+#define rd_kafka_buf_read(rkbuf,dstptr,len) do {                        \
+                size_t __len2 = (size_t)(len);                          \
+                if (!rd_slice_read(&(rkbuf)->rkbuf_reader, dstptr, __len2))  \
+                        rd_kafka_buf_check_len(rkbuf, __len2);          \
+        } while (0)
+
+
+/**
+ * @brief Read \p len bytes at slice offset \p offset and copy to \p dstptr
+ *        without affecting the current reader position.
+ */
+#define rd_kafka_buf_peek(rkbuf,offset,dstptr,len) do {                 \
+                size_t __len2 = (size_t)(len);                          \
+                if (!rd_slice_peek(&(rkbuf)->rkbuf_reader, offset,      \
+                                   dstptr, __len2))                     \
+                        rd_kafka_buf_check_len(rkbuf, (offset)+(__len2)); \
+        } while (0)
+
+
+/**
+ * Read a 16,32,64-bit integer and store it in 'dstptr'
+ */
+#define rd_kafka_buf_read_i64(rkbuf,dstptr) do {                        \
+                int64_t _v;                                             \
+                rd_kafka_buf_read(rkbuf, &_v, sizeof(_v));              \
+                *(dstptr) = be64toh(_v);                                \
+        } while (0)
+
+#define rd_kafka_buf_peek_i64(rkbuf,of,dstptr) do {                     \
+                int64_t _v;                                             \
+                rd_kafka_buf_peek(rkbuf, of, &_v, sizeof(_v));          \
+                *(dstptr) = be64toh(_v);                                \
+        } while (0)
+
+#define rd_kafka_buf_read_i32(rkbuf,dstptr) do {                        \
+                int32_t _v;                                             \
+                rd_kafka_buf_read(rkbuf, &_v, sizeof(_v));              \
+                *(dstptr) = be32toh(_v);                                \
+        } while (0)
+
+/* Same as .._read_i32 but does a direct assignment.
+ * dst is assumed to be a scalar, not pointer. */
+#define rd_kafka_buf_read_i32a(rkbuf, dst) do {				\
+                int32_t _v;                                             \
+		rd_kafka_buf_read(rkbuf, &_v, 4);			\
+		dst = (int32_t) be32toh(_v);				\
+	} while (0)
+
+#define rd_kafka_buf_read_i16(rkbuf,dstptr) do {                        \
+                int16_t _v;                                             \
+                rd_kafka_buf_read(rkbuf, &_v, sizeof(_v));              \
+                *(dstptr) = be16toh(_v);                                \
+        } while (0)
+
+
+#define rd_kafka_buf_read_i16a(rkbuf, dst) do {				\
+                int16_t _v;                                             \
+		rd_kafka_buf_read(rkbuf, &_v, 2);			\
+                dst = (int16_t)be16toh(_v);				\
+	} while (0)
+
+#define rd_kafka_buf_read_i8(rkbuf, dst) rd_kafka_buf_read(rkbuf, dst, 1)
+
+#define rd_kafka_buf_peek_i8(rkbuf,of,dst) rd_kafka_buf_peek(rkbuf,of,dst,1)
+
+
+/**
+ * @brief Read varint and store in int64_t \p dst
+ */
+#define rd_kafka_buf_read_varint(rkbuf,dst) do {                        \
+                int64_t _v;                                             \
+                size_t _r = rd_varint_dec_slice(&(rkbuf)->rkbuf_reader, &_v); \
+                if (unlikely(RD_UVARINT_UNDERFLOW(_r)))                 \
+                        rd_kafka_buf_parse_fail(rkbuf,                  \
+                                                "varint parsing failed: " \
+                                                "buffer underflow");    \
+                *(dst) = _v;                                            \
+        } while (0)
+
+/* Read Kafka String representation (2+N).
+ * The kstr data will be updated to point to the rkbuf. */
+#define rd_kafka_buf_read_str(rkbuf, kstr) do {                         \
+                int _klen;                                              \
+                rd_kafka_buf_read_i16a(rkbuf, (kstr)->len);             \
+                _klen = RD_KAFKAP_STR_LEN(kstr);                        \
+                if (RD_KAFKAP_STR_LEN0(_klen) == 0)                     \
+                        (kstr)->str = NULL;                             \
+                else if (!((kstr)->str =                                \
+                           rd_slice_ensure_contig(&rkbuf->rkbuf_reader, \
+                                                     _klen)))           \
+                        rd_kafka_buf_check_len(rkbuf, _klen);           \
+        } while (0)
+
+/* Read Kafka String representation (2+N) and write it to the \p tmpabuf
+ * with a trailing nul byte. */
+#define rd_kafka_buf_read_str_tmpabuf(rkbuf, tmpabuf, dst) do {		\
+                rd_kafkap_str_t _kstr;					\
+		size_t _slen;						\
+		char *_dst;						\
+		rd_kafka_buf_read_str(rkbuf, &_kstr);			\
+		_slen = RD_KAFKAP_STR_LEN(&_kstr);			\
+		if (!(_dst =						\
+		      rd_tmpabuf_write(tmpabuf, _kstr.str, _slen+1)))	\
+			rd_kafka_buf_parse_fail(			\
+				rkbuf,					\
+				"Not enough room in tmpabuf: "		\
+				"%"PRIusz"+%"PRIusz			\
+				" > %"PRIusz,				\
+				(tmpabuf)->of, _slen+1, (tmpabuf)->size); \
+		_dst[_slen] = '\0';					\
+		dst = (void *)_dst;					\
+	} while (0)
+
+/**
+ * Skip a string.
+ */
+#define rd_kafka_buf_skip_str(rkbuf) do {			\
+		int16_t _slen;					\
+		rd_kafka_buf_read_i16(rkbuf, &_slen);		\
+		rd_kafka_buf_skip(rkbuf, RD_KAFKAP_STR_LEN0(_slen));	\
+	} while (0)
+
+/* Read Kafka Bytes representation (4+N).
+ *  The 'kbytes' will be updated to point to rkbuf data */
+#define rd_kafka_buf_read_bytes(rkbuf, kbytes) do {                     \
+                int _klen;                                              \
+                rd_kafka_buf_read_i32a(rkbuf, _klen);                   \
+                (kbytes)->len = _klen;                                  \
+                if (RD_KAFKAP_BYTES_IS_NULL(kbytes)) {                  \
+                        (kbytes)->data = NULL;                          \
+                        (kbytes)->len = 0;                              \
+                } else if (RD_KAFKAP_BYTES_LEN(kbytes) == 0)            \
+                        (kbytes)->data = "";                            \
+                else if (!((kbytes)->data =                             \
+                           rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, \
+                                                  _klen)))              \
+                        rd_kafka_buf_check_len(rkbuf, _klen);           \
+        } while (0)
+
+
+/**
+ * @brief Read \p size bytes from buffer, setting \p *ptr to the start
+ *        of the memory region.
+ */
+#define rd_kafka_buf_read_ptr(rkbuf,ptr,size) do {                      \
+                size_t _klen = size;                                    \
+                if (!(*(ptr) = (void *)                                 \
+                      rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, _klen))) \
+                        rd_kafka_buf_check_len(rkbuf, _klen);           \
+        } while (0)
+
+
+/**
+ * @brief Read varint-length-prefixed Kafka Bytes representation
+ */
+#define rd_kafka_buf_read_bytes_varint(rkbuf,kbytes) do {               \
+                int64_t _len2;                                          \
+                size_t _r = rd_varint_dec_slice(&(rkbuf)->rkbuf_reader, \
+                                                &_len2);                \
+                if (unlikely(RD_UVARINT_UNDERFLOW(_r)))                 \
+                        rd_kafka_buf_parse_fail(rkbuf,                  \
+                                                "varint parsing failed: " \
+                                                "buffer underflow");    \
+                (kbytes)->len = (int32_t)_len2;                         \
+                if (RD_KAFKAP_BYTES_IS_NULL(kbytes)) {                  \
+                        (kbytes)->data = NULL;                          \
+                        (kbytes)->len = 0;                              \
+                } else if (RD_KAFKAP_BYTES_LEN(kbytes) == 0)            \
+                        (kbytes)->data = "";                            \
+                else if (!((kbytes)->data =                             \
+                           rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, \
+                                                  _len2)))              \
+                        rd_kafka_buf_check_len(rkbuf, _len2);           \
+        } while (0)
+
+
+/**
+ * Response handling callback.
+ *
+ * NOTE: Callbacks must check for 'err == RD_KAFKA_RESP_ERR__DESTROY'
+ *       which indicates that some entity is terminating (rd_kafka_t, broker,
+ *       toppar, queue, etc) and the callback may not be called in the
+ *       correct thread. In this case the callback must perform just
+ *       the most minimal cleanup and don't trigger any other operations.
+ *
+ * NOTE: rkb, reply and request may be NULL, depending on error situation.
+ */
+typedef void (rd_kafka_resp_cb_t) (rd_kafka_t *rk,
+				   rd_kafka_broker_t *rkb,
+                                   rd_kafka_resp_err_t err,
+                                   rd_kafka_buf_t *reply,
+                                   rd_kafka_buf_t *request,
+                                   void *opaque);
+
+struct rd_kafka_buf_s { /* rd_kafka_buf_t */
+	TAILQ_ENTRY(rd_kafka_buf_s) rkbuf_link;
+
+	int32_t rkbuf_corrid;
+
+	rd_ts_t rkbuf_ts_retry;    /* Absolute send retry time */
+
+	int     rkbuf_flags; /* RD_KAFKA_OP_F */
+
+        rd_buf_t rkbuf_buf;        /**< Send/Recv byte buffer */
+        rd_slice_t rkbuf_reader;   /**< Buffer slice reader for rkbuf_buf */
+
+	int     rkbuf_connid;      /* broker connection id (used when buffer
+				    * was partially sent). */
+        size_t  rkbuf_totlen;      /* recv: total expected length,
+                                    * send: not used */
+
+	rd_crc32_t rkbuf_crc;      /* Current CRC calculation */
+
+	struct rd_kafkap_reqhdr rkbuf_reqhdr;   /* Request header.
+                                                 * These fields are encoded
+                                                 * and written to output buffer
+                                                 * on buffer finalization. */
+	struct rd_kafkap_reshdr rkbuf_reshdr;   /* Response header.
+                                                 * Decoded fields are copied
+                                                 * here from the buffer
+                                                 * to provide an ease-of-use
+                                                 * interface to the header */
+
+	int32_t rkbuf_expected_size;  /* expected size of message */
+
+        rd_kafka_replyq_t   rkbuf_replyq;       /* Enqueue response on replyq */
+        rd_kafka_replyq_t   rkbuf_orig_replyq;  /* Original replyq to be used
+                                                 * for retries from inside
+                                                 * the rkbuf_cb() callback
+                                                 * since rkbuf_replyq will
+                                                 * have been reset. */
+        rd_kafka_resp_cb_t *rkbuf_cb;           /* Response callback */
+        struct rd_kafka_buf_s *rkbuf_response;  /* Response buffer */
+
+        struct rd_kafka_broker_s *rkbuf_rkb;
+
+	rd_refcnt_t rkbuf_refcnt;
+	void   *rkbuf_opaque;
+
+	int     rkbuf_retries;            /* Retries so far. */
+#define RD_KAFKA_BUF_NO_RETRIES  1000000  /* Do not retry */
+
+        int     rkbuf_features;   /* Required feature(s) that must be
+                                   * supported by broker. */
+
+	rd_ts_t rkbuf_ts_enq;
+	rd_ts_t rkbuf_ts_sent;    /* Initially: Absolute time of transmission,
+				   * after response: RTT. */
+	rd_ts_t rkbuf_ts_timeout;
+
+        int64_t rkbuf_offset;     /* Used by OffsetCommit */
+
+	rd_list_t *rkbuf_rktp_vers;    /* Toppar + Op Version map.
+					* Used by FetchRequest. */
+
+	rd_kafka_msgq_t rkbuf_msgq;
+
+        rd_kafka_resp_err_t rkbuf_err;      /* Buffer parsing error code */
+
+        union {
+                struct {
+                        rd_list_t *topics;  /* Requested topics (char *) */
+                        char *reason;       /* Textual reason */
+                        rd_kafka_op_t *rko; /* Originating rko with replyq
+                                             * (if any) */
+                        int all_topics;     /* Full/All topics requested */
+
+                        int *decr;          /* Decrement this integer by one
+                                             * when request is complete:
+                                             * typically points to metadata
+                                             * cache's full_.._sent.
+                                             * Will be performed with
+                                             * decr_lock held. */
+                        mtx_t *decr_lock;
+
+                } Metadata;
+        } rkbuf_u;
+};
+
+
+typedef struct rd_kafka_bufq_s {
+	TAILQ_HEAD(, rd_kafka_buf_s) rkbq_bufs;
+	rd_atomic32_t  rkbq_cnt;
+	rd_atomic32_t  rkbq_msg_cnt;
+} rd_kafka_bufq_t;
+
+#define rd_kafka_bufq_cnt(rkbq) rd_atomic32_get(&(rkbq)->rkbq_cnt)
+
+
+#define rd_kafka_buf_keep(rkbuf) rd_refcnt_add(&(rkbuf)->rkbuf_refcnt)
+#define rd_kafka_buf_destroy(rkbuf)                                     \
+        rd_refcnt_destroywrapper(&(rkbuf)->rkbuf_refcnt,                \
+                                 rd_kafka_buf_destroy_final(rkbuf))
+
+void rd_kafka_buf_destroy_final (rd_kafka_buf_t *rkbuf);
+void rd_kafka_buf_push0 (rd_kafka_buf_t *rkbuf, const void *buf, size_t len,
+                         int allow_crc_calc, void (*free_cb) (void *));
+#define rd_kafka_buf_push(rkbuf,buf,len,free_cb)                        \
+        rd_kafka_buf_push0(rkbuf,buf,len,1/*allow_crc*/,free_cb)
+rd_kafka_buf_t *rd_kafka_buf_new0 (int segcnt, size_t size, int flags);
+#define rd_kafka_buf_new(segcnt,size) \
+        rd_kafka_buf_new0(segcnt,size,0)
+rd_kafka_buf_t *rd_kafka_buf_new_request (rd_kafka_broker_t *rkb, int16_t ApiKey,
+                                          int segcnt, size_t size);
+rd_kafka_buf_t *rd_kafka_buf_new_shadow (const void *ptr, size_t size,
+                                         void (*free_cb) (void *));
+void rd_kafka_bufq_enq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf);
+void rd_kafka_bufq_deq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf);
+void rd_kafka_bufq_init(rd_kafka_bufq_t *rkbufq);
+void rd_kafka_bufq_concat (rd_kafka_bufq_t *dst, rd_kafka_bufq_t *src);
+void rd_kafka_bufq_purge (rd_kafka_broker_t *rkb,
+                          rd_kafka_bufq_t *rkbufq,
+                          rd_kafka_resp_err_t err);
+void rd_kafka_bufq_connection_reset (rd_kafka_broker_t *rkb,
+				     rd_kafka_bufq_t *rkbufq);
+void rd_kafka_bufq_dump (rd_kafka_broker_t *rkb, const char *fac,
+			 rd_kafka_bufq_t *rkbq);
+
+int rd_kafka_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf);
+
+void rd_kafka_buf_handle_op (rd_kafka_op_t *rko, rd_kafka_resp_err_t err);
+void rd_kafka_buf_callback (rd_kafka_t *rk,
+			    rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err,
+                            rd_kafka_buf_t *response, rd_kafka_buf_t *request);
+
+
+
+/**
+ *
+ * Write buffer interface
+ *
+ */
+
+/**
+ * Set request API type version
+ */
+static RD_UNUSED RD_INLINE void
+rd_kafka_buf_ApiVersion_set (rd_kafka_buf_t *rkbuf,
+                             int16_t version, int features) {
+        rkbuf->rkbuf_reqhdr.ApiVersion = version;
+        rkbuf->rkbuf_features = features;
+}
+
+
+/**
+ * @returns the ApiVersion for a request
+ */
+#define rd_kafka_buf_ApiVersion(rkbuf) ((rkbuf)->rkbuf_reqhdr.ApiVersion)
+
+
+
+/**
+ * Write (copy) data to buffer at current write-buffer position.
+ * There must be enough space allocated in the rkbuf.
+ * Returns offset to written destination buffer.
+ */
+static RD_INLINE size_t rd_kafka_buf_write (rd_kafka_buf_t *rkbuf,
+                                        const void *data, size_t len) {
+        size_t r;
+
+        r = rd_buf_write(&rkbuf->rkbuf_buf, data, len);
+
+        if (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC)
+                rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc, data, len);
+
+        return r;
+}
+
+
+
+/**
+ * Write (copy) 'data' to buffer at 'ptr'.
+ * There must be enough space to fit 'len'.
+ * This will overwrite the buffer at given location and length.
+ *
+ * NOTE: rd_kafka_buf_update() MUST NOT be called when a CRC calculation
+ *       is in progress (between rd_kafka_buf_crc_init() & .._crc_finalize())
+ */
+static RD_INLINE void rd_kafka_buf_update (rd_kafka_buf_t *rkbuf, size_t of,
+                                          const void *data, size_t len) {
+        rd_kafka_assert(NULL, !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC));
+        rd_buf_write_update(&rkbuf->rkbuf_buf, of, data, len);
+}
+
+/**
+ * Write int8_t to buffer.
+ */
+static RD_INLINE size_t rd_kafka_buf_write_i8 (rd_kafka_buf_t *rkbuf,
+					      int8_t v) {
+        return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
+}
+
+/**
+ * Update int8_t in buffer at offset 'of'.
+ * 'of' should have been previously returned by `.._buf_write_i8()`.
+ */
+static RD_INLINE void rd_kafka_buf_update_i8 (rd_kafka_buf_t *rkbuf,
+					     size_t of, int8_t v) {
+        rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
+}
+
+/**
+ * Write int16_t to buffer.
+ * The value will be endian-swapped before write.
+ */
+static RD_INLINE size_t rd_kafka_buf_write_i16 (rd_kafka_buf_t *rkbuf,
+					       int16_t v) {
+        v = htobe16(v);
+        return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
+}
+
+/**
+ * Update int16_t in buffer at offset 'of'.
+ * 'of' should have been previously returned by `.._buf_write_i16()`.
+ */
+static RD_INLINE void rd_kafka_buf_update_i16 (rd_kafka_buf_t *rkbuf,
+                                              size_t of, int16_t v) {
+        v = htobe16(v);
+        rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
+}
+
+/**
+ * Write int32_t to buffer.
+ * The value will be endian-swapped before write.
+ */
+static RD_INLINE size_t rd_kafka_buf_write_i32 (rd_kafka_buf_t *rkbuf,
+                                               int32_t v) {
+        v = htobe32(v);
+        return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
+}
+
+/**
+ * Update int32_t in buffer at offset 'of'.
+ * 'of' should have been previously returned by `.._buf_write_i32()`.
+ */
+static RD_INLINE void rd_kafka_buf_update_i32 (rd_kafka_buf_t *rkbuf,
+                                              size_t of, int32_t v) {
+        v = htobe32(v);
+        rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
+}
+
+/**
+ * Update uint32_t in buffer at offset 'of'.
+ * 'of' should have been previously returned by `.._buf_write_i32()`.
+ */
+static RD_INLINE void rd_kafka_buf_update_u32 (rd_kafka_buf_t *rkbuf,
+                                              size_t of, uint32_t v) {
+        v = htobe32(v);
+        rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
+}
+
+
+/**
+ * Write int64_t to buffer.
+ * The value will be endian-swapped before write.
+ */
+static RD_INLINE size_t rd_kafka_buf_write_i64 (rd_kafka_buf_t *rkbuf, int64_t v) {
+        v = htobe64(v);
+        return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
+}
+
+/**
+ * Update int64_t in buffer at offset 'of'.
+ * 'of' should have been previously returned by `.._buf_write_i64()`.
+ */
+static RD_INLINE void rd_kafka_buf_update_i64 (rd_kafka_buf_t *rkbuf,
+                                              size_t of, int64_t v) {
+        v = htobe64(v);
+        rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
+}
+
+
+/**
+ * Write (copy) Kafka string to buffer.
+ */
+static RD_INLINE size_t rd_kafka_buf_write_kstr (rd_kafka_buf_t *rkbuf,
+                                                const rd_kafkap_str_t *kstr) {
+        return rd_kafka_buf_write(rkbuf, RD_KAFKAP_STR_SER(kstr),
+				  RD_KAFKAP_STR_SIZE(kstr));
+}
+
+/**
+ * Write (copy) char * string to buffer.
+ */
+static RD_INLINE size_t rd_kafka_buf_write_str (rd_kafka_buf_t *rkbuf,
+                                               const char *str, size_t len) {
+        size_t r;
+        if (!str)
+                len = RD_KAFKAP_STR_LEN_NULL;
+        else if (len == (size_t)-1)
+                len = strlen(str);
+        r = rd_kafka_buf_write_i16(rkbuf, (int16_t) len);
+        if (str)
+                rd_kafka_buf_write(rkbuf, str, len);
+        return r;
+}
+
+
+/**
+ * Push (i.e., no copy) Kafka string to buffer iovec
+ */
+static RD_INLINE void rd_kafka_buf_push_kstr (rd_kafka_buf_t *rkbuf,
+                                             const rd_kafkap_str_t *kstr) {
+	rd_kafka_buf_push(rkbuf, RD_KAFKAP_STR_SER(kstr),
+			  RD_KAFKAP_STR_SIZE(kstr), NULL);
+}
+
+
+
+/**
+ * Write (copy) Kafka bytes to buffer.
+ */
+static RD_INLINE size_t rd_kafka_buf_write_kbytes (rd_kafka_buf_t *rkbuf,
+					          const rd_kafkap_bytes_t *kbytes){
+        return rd_kafka_buf_write(rkbuf, RD_KAFKAP_BYTES_SER(kbytes),
+                                  RD_KAFKAP_BYTES_SIZE(kbytes));
+}
+
+/**
+ * Push (i.e., no copy) Kafka bytes to buffer iovec
+ */
+static RD_INLINE void rd_kafka_buf_push_kbytes (rd_kafka_buf_t *rkbuf,
+					       const rd_kafkap_bytes_t *kbytes){
+	rd_kafka_buf_push(rkbuf, RD_KAFKAP_BYTES_SER(kbytes),
+			  RD_KAFKAP_BYTES_SIZE(kbytes), NULL);
+}
+
+/**
+ * Write (copy) binary bytes to buffer as Kafka bytes encapsulate data.
+ */
+static RD_INLINE size_t rd_kafka_buf_write_bytes (rd_kafka_buf_t *rkbuf,
+                                                 const void *payload, size_t size) {
+        size_t r;
+        if (!payload)
+                size = RD_KAFKAP_BYTES_LEN_NULL;
+        r = rd_kafka_buf_write_i32(rkbuf, (int32_t) size);
+        if (payload)
+                rd_kafka_buf_write(rkbuf, payload, size);
+        return r;
+}
+
+
+
+
+/**
+ * Write Kafka Message to buffer
+ * The number of bytes written is returned in '*outlenp'.
+ *
+ * Returns the buffer offset of the first byte.
+ */
+size_t rd_kafka_buf_write_Message (rd_kafka_broker_t *rkb,
+				   rd_kafka_buf_t *rkbuf,
+				   int64_t Offset, int8_t MagicByte,
+				   int8_t Attributes, int64_t Timestamp,
+				   const void *key, int32_t key_len,
+				   const void *payload, int32_t len,
+				   int *outlenp);
+
+/**
+ * Start calculating CRC from now and track it in '*crcp'.
+ */
+static RD_INLINE RD_UNUSED void rd_kafka_buf_crc_init (rd_kafka_buf_t *rkbuf) {
+	rd_kafka_assert(NULL, !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC));
+	rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_CRC;
+	rkbuf->rkbuf_crc = rd_crc32_init();
+}
+
+/**
+ * Finalizes CRC calculation and returns the calculated checksum.
+ */
+static RD_INLINE RD_UNUSED
+rd_crc32_t rd_kafka_buf_crc_finalize (rd_kafka_buf_t *rkbuf) {
+	rkbuf->rkbuf_flags &= ~RD_KAFKA_OP_F_CRC;
+	return rd_crc32_finalize(rkbuf->rkbuf_crc);
+}
+
+
+
+
+
+/**
+ * @brief Check if buffer's replyq.version is outdated.
+ * @param rkbuf: may be NULL, for convenience.
+ *
+ * @returns 1 if this is an outdated buffer, else 0.
+ */
+static RD_UNUSED RD_INLINE int
+rd_kafka_buf_version_outdated (const rd_kafka_buf_t *rkbuf, int version) {
+        return rkbuf && rkbuf->rkbuf_replyq.version &&
+                rkbuf->rkbuf_replyq.version < version;
+}