You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bq...@apache.org on 2017/11/16 21:15:51 UTC
[21/42] nifi-minifi-cpp git commit: MINIFICPP-274: PutKafka Processor
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_broker.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_broker.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_broker.h
new file mode 100644
index 0000000..a30f2bd
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_broker.h
@@ -0,0 +1,328 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012,2013 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#pragma once
+
+#include "rdkafka_feature.h"
+
+
+extern const char *rd_kafka_broker_state_names[];
+extern const char *rd_kafka_secproto_names[];
+
+/**
+ * Per-broker handle (rd_kafka_broker_t).
+ *
+ * Groups the broker's identity (node id, names, port), its transport and
+ * connection state, the request buffer queues, statistics counters and
+ * the wake-up descriptors used to interrupt IO-wait.
+ *
+ * The handle is reference counted through rkb_refcnt and carries its own
+ * mutex, rkb_lock; the fields explicitly marked below are protected by it.
+ */
+struct rd_kafka_broker_s { /* rd_kafka_broker_t */
+ TAILQ_ENTRY(rd_kafka_broker_s) rkb_link;
+
+ int32_t rkb_nodeid;
+ /* NOTE(review): presumably the "unassigned" node id - confirm. */
+#define RD_KAFKA_NODEID_UA -1
+
+ rd_sockaddr_list_t *rkb_rsal;
+ time_t rkb_t_rsal_last;
+ const rd_sockaddr_inx_t *rkb_addr_last; /* Last used connect address */
+
+ rd_kafka_transport_t *rkb_transport;
+
+ uint32_t rkb_corrid;
+ int rkb_connid; /* Connection id, increased by
+ * one for each connection by
+ * this broker. Used as a safe-guard
+ * to help troubleshooting buffer
+ * problems across disconnects. */
+
+ rd_kafka_q_t *rkb_ops;
+
+ /* Taken and released with rd_kafka_broker_lock() /
+ * rd_kafka_broker_unlock(). */
+ mtx_t rkb_lock;
+
+ int rkb_blocking_max_ms; /* Maximum IO poll blocking
+ * time. */
+
+ /* Toppars handled by this broker */
+ TAILQ_HEAD(, rd_kafka_toppar_s) rkb_toppars;
+ int rkb_toppar_cnt;
+
+ /* Underflowed toppars that are eligible for fetching. */
+ CIRCLEQ_HEAD(, rd_kafka_toppar_s) rkb_fetch_toppars;
+ int rkb_fetch_toppar_cnt;
+ rd_kafka_toppar_t *rkb_fetch_toppar_next; /* Next 'first' toppar
+ * in fetch list.
+ * This is used for
+ * round-robin. */
+
+
+ rd_kafka_cgrp_t *rkb_cgrp;
+
+ rd_ts_t rkb_ts_fetch_backoff;
+ int rkb_fetching;
+
+ /* Connection/protocol state. The enumerator order is significant:
+ * code may compare against RD_KAFKA_BROKER_STATE_UP (see below). */
+ enum {
+ RD_KAFKA_BROKER_STATE_INIT,
+ RD_KAFKA_BROKER_STATE_DOWN,
+ RD_KAFKA_BROKER_STATE_CONNECT,
+ RD_KAFKA_BROKER_STATE_AUTH,
+
+ /* Any state >= STATE_UP means the Kafka protocol layer
+ * is operational (to some degree). */
+ RD_KAFKA_BROKER_STATE_UP,
+ RD_KAFKA_BROKER_STATE_UPDATE,
+ RD_KAFKA_BROKER_STATE_APIVERSION_QUERY,
+ RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE
+ } rkb_state;
+
+ rd_ts_t rkb_ts_state; /* Timestamp of last
+ * state change */
+ rd_interval_t rkb_timeout_scan_intvl; /* Waitresp timeout scan
+ * interval. */
+
+ rd_atomic32_t rkb_blocking_request_cnt; /* The number of
+ * in-flight blocking
+ * requests.
+ * A blocking request is
+ * one that is known to
+ * possibly block on the
+ * broker for longer than
+ * the typical processing
+ * time, e.g.:
+ * JoinGroup, SyncGroup */
+
+ int rkb_features; /* Protocol features supported
+ * by this broker.
+ * See RD_KAFKA_FEATURE_* in
+ * rdkafka_proto.h */
+
+ struct rd_kafka_ApiVersion *rkb_ApiVersions; /* Broker's supported APIs
+ * (MUST be sorted) */
+ size_t rkb_ApiVersions_cnt;
+ rd_interval_t rkb_ApiVersion_fail_intvl; /* Controls how long
+ * the fallback proto
+ * will be used after
+ * ApiVersionRequest
+ * failure. */
+
+ rd_kafka_confsource_t rkb_source;
+ /* Accumulated statistics counters. */
+ struct {
+ rd_atomic64_t tx_bytes;
+ rd_atomic64_t tx; /* Kafka-messages (not payload msgs) */
+ rd_atomic64_t tx_err;
+ rd_atomic64_t tx_retries;
+ rd_atomic64_t req_timeouts; /* Accumulated value */
+
+ rd_atomic64_t rx_bytes;
+ rd_atomic64_t rx; /* Kafka messages (not payload msgs) */
+ rd_atomic64_t rx_err;
+ rd_atomic64_t rx_corrid_err; /* CorrId misses */
+ rd_atomic64_t rx_partial; /* Partial messages received
+ * and dropped. */
+ rd_atomic64_t zbuf_grow; /* Compression/decompression buffer grows needed */
+ rd_atomic64_t buf_grow; /* rkbuf grows needed */
+ rd_atomic64_t wakeups; /* Poll wakeups */
+ } rkb_c;
+
+ int rkb_req_timeouts; /* Current value */
+
+ rd_ts_t rkb_ts_metadata_poll; /* Next metadata poll time */
+ int rkb_metadata_fast_poll_cnt; /* Perform fast
+ * metadata polls. */
+ thrd_t rkb_thread;
+
+ /* Reference count: incremented by rd_kafka_broker_keep() and
+ * released through rd_kafka_broker_destroy(). */
+ rd_refcnt_t rkb_refcnt;
+
+ rd_kafka_t *rkb_rk;
+
+ rd_kafka_buf_t *rkb_recv_buf;
+
+ int rkb_max_inflight; /* Maximum number of in-flight
+ * requests to broker.
+ * Compared to rkb_waitresps length.*/
+ /* Request buffer queues.
+ * NOTE(review): presumably outbufs = awaiting transmission,
+ * waitresps = sent and awaiting a response, retrybufs = scheduled
+ * for retry - confirm against rdkafka_broker.c. */
+ rd_kafka_bufq_t rkb_outbufs;
+ rd_kafka_bufq_t rkb_waitresps;
+ rd_kafka_bufq_t rkb_retrybufs;
+
+ rd_avg_t rkb_avg_int_latency;/* Current internal latency period*/
+ rd_avg_t rkb_avg_rtt; /* Current RTT period */
+ rd_avg_t rkb_avg_throttle; /* Current throttle period */
+
+ /* These are all protected by rkb_lock */
+ char rkb_name[RD_KAFKA_NODENAME_SIZE]; /* Displ name */
+ char rkb_nodename[RD_KAFKA_NODENAME_SIZE]; /* host:port*/
+ uint16_t rkb_port; /* TCP port */
+ char *rkb_origname; /* Original
+ * host name */
+
+
+ /* Logging name is a copy of rkb_name, protected by its own mutex */
+ char *rkb_logname;
+ mtx_t rkb_logname_lock;
+
+ int rkb_wakeup_fd[2]; /* Wake-up fds (r/w) to wake
+ * up from IO-wait when
+ * queues have content. */
+ int rkb_toppar_wakeup_fd; /* Toppar msgq wakeup fd,
+ * this is rkb_wakeup_fd[1]
+ * if enabled. */
+ rd_interval_t rkb_connect_intvl; /* Reconnect throttling */
+
+ rd_kafka_secproto_t rkb_proto;
+
+ int rkb_down_reported; /* Down event reported */
+#if WITH_SASL_CYRUS
+ rd_kafka_timer_t rkb_sasl_kinit_refresh_tmr;
+#endif
+
+
+ /* Error message text and errno value.
+ * NOTE(review): presumably the most recent broker failure -
+ * confirm against rd_kafka_broker_fail(). */
+ struct {
+ char msg[512];
+ int err; /* errno */
+ } rkb_err;
+};
+
+/* Add one reference to \p rkb's rkb_refcnt. */
+#define rd_kafka_broker_keep(rkb) rd_refcnt_add(&(rkb)->rkb_refcnt)
+/* Acquire / release \p rkb's rkb_lock mutex. */
+#define rd_kafka_broker_lock(rkb) mtx_lock(&(rkb)->rkb_lock)
+#define rd_kafka_broker_unlock(rkb) mtx_unlock(&(rkb)->rkb_lock)
+
+
+/**
+ * @brief Broker comparator: orders broker handles by object address.
+ *
+ * @param _a first broker (rd_kafka_broker_t *)
+ * @param _b second broker (rd_kafka_broker_t *)
+ *
+ * @returns -1, 0 or 1 if \p _a is located below, at, or above \p _b.
+ */
+static RD_UNUSED RD_INLINE int rd_kafka_broker_cmp (const void *_a,
+                                                    const void *_b) {
+        /* Compare the addresses as integers instead of subtracting the
+         * pointers: subtracting pointers to unrelated objects is undefined,
+         * the difference is scaled by the struct size, and truncating the
+         * ptrdiff_t result to int could report distinct brokers as equal
+         * or flip the sign. */
+        const uintptr_t a = (uintptr_t)_a;
+        const uintptr_t b = (uintptr_t)_b;
+
+        return (a > b) - (a < b);
+}
+
+
+/**
+ * @brief Check whether the broker advertises every feature bit
+ *        in \p features.
+ *
+ * rkb_features is read while holding the broker lock.
+ *
+ * @returns 1 if all bits of \p features are set in rkb_features, else 0.
+ */
+static RD_UNUSED
+int rd_kafka_broker_supports (rd_kafka_broker_t *rkb, int features) {
+        int missing;
+
+        rd_kafka_broker_lock(rkb);
+        /* Requested bits that the broker does not have. */
+        missing = features & ~rkb->rkb_features;
+        rd_kafka_broker_unlock(rkb);
+
+        return missing == 0;
+}
+
+int16_t rd_kafka_broker_ApiVersion_supported (rd_kafka_broker_t *rkb,
+ int16_t ApiKey,
+ int16_t minver, int16_t maxver,
+ int *featuresp);
+
+int rd_kafka_broker_get_state (rd_kafka_broker_t *rkb);
+
+rd_kafka_broker_t *rd_kafka_broker_find_by_nodeid (rd_kafka_t *rk,
+ int32_t nodeid);
+rd_kafka_broker_t *rd_kafka_broker_find_by_nodeid0 (rd_kafka_t *rk,
+ int32_t nodeid,
+ int state);
+/* Function-like macro with the same name as the prototype above: any call
+ * written as rd_kafka_broker_find_by_nodeid(rk, nodeid) after this point
+ * expands to rd_kafka_broker_find_by_nodeid0(rk, nodeid, -1).
+ * NOTE(review): a state of -1 presumably means "any state" - confirm. */
+#define rd_kafka_broker_find_by_nodeid(rk,nodeid) \
+ rd_kafka_broker_find_by_nodeid0(rk,nodeid,-1)
+
+/**
+ * @brief Broker filter: rejects brokers that currently have at least one
+ *        blocking request in flight.
+ *
+ * @param opaque unused.
+ *
+ * @returns 1 if \p rkb should be filtered out, else 0.
+ */
+static RD_INLINE RD_UNUSED int
+rd_kafka_broker_filter_non_blocking (rd_kafka_broker_t *rkb, void *opaque) {
+        if (rd_atomic32_get(&rkb->rkb_blocking_request_cnt) > 0)
+                return 1;
+
+        return 0;
+}
+
+/**
+ * @brief Broker filter: rejects brokers that cannot serve GroupCoordinator
+ *        requests right now, i.e. brokers with a blocking request in
+ *        flight or lacking the RD_KAFKA_FEATURE_BROKER_GROUP_COORD feature.
+ *
+ * @param opaque unused.
+ *
+ * @returns 1 if \p rkb should be filtered out, else 0.
+ */
+static RD_INLINE RD_UNUSED int
+rd_kafka_broker_filter_can_group_query (rd_kafka_broker_t *rkb, void *opaque) {
+        if (rd_atomic32_get(&rkb->rkb_blocking_request_cnt) > 0)
+                return 1;
+
+        return (rkb->rkb_features & RD_KAFKA_FEATURE_BROKER_GROUP_COORD) == 0;
+}
+
+rd_kafka_broker_t *rd_kafka_broker_any (rd_kafka_t *rk, int state,
+ int (*filter) (rd_kafka_broker_t *rkb,
+ void *opaque),
+ void *opaque);
+
+rd_kafka_broker_t *rd_kafka_broker_any_usable (rd_kafka_t *rk, int timeout_ms,
+ int do_lock);
+
+rd_kafka_broker_t *rd_kafka_broker_prefer (rd_kafka_t *rk, int32_t broker_id, int state);
+
+int rd_kafka_brokers_add0 (rd_kafka_t *rk, const char *brokerlist);
+void rd_kafka_broker_set_state (rd_kafka_broker_t *rkb, int state);
+
+void rd_kafka_broker_fail (rd_kafka_broker_t *rkb,
+ int level, rd_kafka_resp_err_t err,
+ const char *fmt, ...);
+
+void rd_kafka_broker_destroy_final (rd_kafka_broker_t *rkb);
+
+/* Release one reference to \p rkb. The rd_kafka_broker_destroy_final(rkb)
+ * call is handed to rd_refcnt_destroywrapper() as the destructor
+ * expression.
+ * NOTE(review): presumably it only runs once the reference count drops
+ * to zero - confirm against rd_refcnt_destroywrapper(). Note that \p rkb
+ * is expanded twice. */
+#define rd_kafka_broker_destroy(rkb) \
+ rd_refcnt_destroywrapper(&(rkb)->rkb_refcnt, \
+ rd_kafka_broker_destroy_final(rkb))
+
+
+void rd_kafka_broker_update (rd_kafka_t *rk, rd_kafka_secproto_t proto,
+ const struct rd_kafka_metadata_broker *mdb);
+rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,
+ rd_kafka_confsource_t source,
+ rd_kafka_secproto_t proto,
+ const char *name, uint16_t port,
+ int32_t nodeid);
+
+void rd_kafka_broker_connect_up (rd_kafka_broker_t *rkb);
+void rd_kafka_broker_connect_done (rd_kafka_broker_t *rkb, const char *errstr);
+
+int rd_kafka_send (rd_kafka_broker_t *rkb);
+int rd_kafka_recv (rd_kafka_broker_t *rkb);
+
+void rd_kafka_dr_msgq (rd_kafka_itopic_t *rkt,
+ rd_kafka_msgq_t *rkmq, rd_kafka_resp_err_t err);
+
+void rd_kafka_broker_buf_enq1 (rd_kafka_broker_t *rkb,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque);
+
+void rd_kafka_broker_buf_enq_replyq (rd_kafka_broker_t *rkb,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque);
+
+void rd_kafka_broker_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf);
+
+
+rd_kafka_broker_t *rd_kafka_broker_internal (rd_kafka_t *rk);
+
+void msghdr_print (rd_kafka_t *rk,
+ const char *what, const struct msghdr *msg,
+ int hexdump);
+
+const char *rd_kafka_broker_name (rd_kafka_broker_t *rkb);
+void rd_kafka_broker_wakeup (rd_kafka_broker_t *rkb);
+
+int rd_kafka_brokers_get_state_version (rd_kafka_t *rk);
+int rd_kafka_brokers_wait_state_change (rd_kafka_t *rk, int stored_version,
+ int timeout_ms);
+void rd_kafka_brokers_broadcast_state_change (rd_kafka_t *rk);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.c
new file mode 100644
index 0000000..9b50737
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.c
@@ -0,0 +1,428 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rdkafka_int.h"
+#include "rdkafka_buf.h"
+#include "rdkafka_broker.h"
+
+/**
+ * @brief Release everything held by \p rkbuf and free the buffer itself.
+ *
+ * Metadata requests additionally give up their topic list and reason
+ * string, reply to a pending op with RD_KAFKA_RESP_ERR__DESTROY, and
+ * decrement the counter referenced by Metadata.decr while holding
+ * Metadata.decr_lock.
+ *
+ * NOTE(review): presumably reached through rd_kafka_buf_destroy() when the
+ * last reference is dropped - confirm against rdkafka_buf.h.
+ */
+void rd_kafka_buf_destroy_final (rd_kafka_buf_t *rkbuf) {
+
+        /* Request-type specific state: only Metadata carries any. */
+        if (rkbuf->rkbuf_reqhdr.ApiKey == RD_KAFKAP_Metadata) {
+                if (rkbuf->rkbuf_u.Metadata.topics) {
+                        rd_list_destroy(rkbuf->rkbuf_u.Metadata.topics);
+                }
+
+                if (rkbuf->rkbuf_u.Metadata.reason) {
+                        rd_free(rkbuf->rkbuf_u.Metadata.reason);
+                }
+
+                if (rkbuf->rkbuf_u.Metadata.rko) {
+                        rd_kafka_op_reply(rkbuf->rkbuf_u.Metadata.rko,
+                                          RD_KAFKA_RESP_ERR__DESTROY);
+                }
+
+                if (rkbuf->rkbuf_u.Metadata.decr) {
+                        /* Decrease metadata cache's full_.._sent state. */
+                        mtx_lock(rkbuf->rkbuf_u.Metadata.decr_lock);
+                        rd_kafka_assert(NULL,
+                                        (*rkbuf->rkbuf_u.Metadata.decr) > 0);
+                        (*rkbuf->rkbuf_u.Metadata.decr)--;
+                        mtx_unlock(rkbuf->rkbuf_u.Metadata.decr_lock);
+                }
+        }
+
+        /* A response still attached to the request goes away with it. */
+        if (rkbuf->rkbuf_response) {
+                rd_kafka_buf_destroy(rkbuf->rkbuf_response);
+        }
+
+        rd_kafka_replyq_destroy(&rkbuf->rkbuf_replyq);
+        rd_kafka_replyq_destroy(&rkbuf->rkbuf_orig_replyq);
+
+        rd_buf_destroy(&rkbuf->rkbuf_buf);
+
+        if (rkbuf->rkbuf_rktp_vers) {
+                rd_list_destroy(rkbuf->rkbuf_rktp_vers);
+        }
+
+        /* Drop the broker reference (taken e.g. by
+         * rd_kafka_buf_new_request()). */
+        if (rkbuf->rkbuf_rkb) {
+                rd_kafka_broker_destroy(rkbuf->rkbuf_rkb);
+        }
+
+        rd_refcnt_destroy(&rkbuf->rkbuf_refcnt);
+
+        rd_free(rkbuf);
+}
+
+
+
+/**
+ * @brief Append \p buf (\p len bytes) to \p rkbuf as a new segment.
+ *
+ * \p buf and \p free_cb are passed on to rd_buf_push() unchanged.
+ * NOTE(review): presumably \p free_cb (if non-NULL) is later used to release
+ * \p buf, and a NULL \p free_cb leaves \p buf owned by the caller - confirm
+ * against rd_buf_push().
+ *
+ * When \p allow_crc_calc is non-zero and the buffer has the
+ * RD_KAFKA_OP_F_CRC flag set, the running CRC in rkbuf_crc is updated
+ * with the pushed bytes.
+ */
+void rd_kafka_buf_push0 (rd_kafka_buf_t *rkbuf, const void *buf, size_t len,
+                         int allow_crc_calc, void (*free_cb) (void *)) {
+        rd_buf_push(&rkbuf->rkbuf_buf, buf, len, free_cb);
+
+        if (!allow_crc_calc || !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC))
+                return;
+
+        rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc, buf, len);
+}
+
+
+
+/**
+ * @brief Create a new buffer with \p segcnt initial segments and \p size
+ *        bytes of initial backing memory.
+ *        The underlying buffer will grow as needed.
+ *
+ * The buffer is zero-allocated, \p flags is stored in rkbuf_flags, the
+ * message queue is initialized and the reference count starts at 1.
+ * No protocol header space is added here; see rd_kafka_buf_new_request().
+ *
+ * @returns the new buffer.
+ */
+rd_kafka_buf_t *rd_kafka_buf_new0 (int segcnt, size_t size, int flags) {
+        rd_kafka_buf_t *new_rkbuf = rd_calloc(1, sizeof(*new_rkbuf));
+
+        new_rkbuf->rkbuf_flags = flags;
+
+        rd_buf_init(&new_rkbuf->rkbuf_buf, segcnt, size);
+        rd_kafka_msgq_init(&new_rkbuf->rkbuf_msgq);
+
+        /* The caller owns the initial reference. */
+        rd_refcnt_init(&new_rkbuf->rkbuf_refcnt, 1);
+
+        return new_rkbuf;
+}
+
+
+/**
+ * @brief Create a new request buffer for \p ApiKey with the common request
+ *        header already written.
+ *
+ * One extra segment and room for the request header plus the client id
+ * string are added on top of \p segcnt and \p size.
+ * Length, ApiVersion and CorrId are written as zero placeholders and
+ * must be updated later.
+ *
+ * The buffer holds its own reference on \p rkb.
+ *
+ * @returns the new request buffer.
+ */
+rd_kafka_buf_t *rd_kafka_buf_new_request (rd_kafka_broker_t *rkb, int16_t ApiKey,
+                                          int segcnt, size_t size) {
+        rd_kafka_buf_t *req;
+
+        /* Make room for the common protocol request headers:
+         * one more segment, and the header + ClientId bytes. */
+        req = rd_kafka_buf_new0(
+                segcnt + 1,
+                size + (RD_KAFKAP_REQHDR_SIZE +
+                        RD_KAFKAP_STR_SIZE(rkb->rkb_rk->rk_client_id)),
+                0);
+
+        rd_kafka_broker_keep(rkb);
+        req->rkbuf_rkb = rkb;
+
+        req->rkbuf_reqhdr.ApiKey = ApiKey;
+
+        /* Request header layout; placeholders are filled in later. */
+        rd_kafka_buf_write_i32(req, 0);  /* Length: updated later */
+        rd_kafka_buf_write_i16(req, req->rkbuf_reqhdr.ApiKey); /* ApiKey */
+        rd_kafka_buf_write_i16(req, 0);  /* ApiVersion: updated later */
+        rd_kafka_buf_write_i32(req, 0);  /* CorrId: updated later */
+
+        /* ClientId */
+        rd_kafka_buf_write_kstr(req, rkb->rkb_rk->rk_client_id);
+
+        return req;
+}
+
+
+
+
+/**
+ * @brief Create a new read-only rkbuf that shadows the memory region
+ *        \p ptr of \p size bytes.
+ *
+ * The region is pushed as the buffer's only segment and the reader slice
+ * is set up to cover the whole buffer. The ApiKey is RD_KAFKAP_None and
+ * the reference count starts at 1.
+ *
+ * @remark \p free_cb (possibly NULL) will be used to free \p ptr when
+ *         buffer refcount reaches 0.
+ * @remark the buffer may only be read from, not written to.
+ */
+rd_kafka_buf_t *rd_kafka_buf_new_shadow (const void *ptr, size_t size,
+                                         void (*free_cb) (void *)) {
+        rd_kafka_buf_t *shadow = rd_calloc(1, sizeof(*shadow));
+
+        shadow->rkbuf_reqhdr.ApiKey = RD_KAFKAP_None;
+
+        /* Single segment referencing the caller's memory. */
+        rd_buf_init(&shadow->rkbuf_buf, 1, 0);
+        rd_buf_push(&shadow->rkbuf_buf, ptr, size, free_cb);
+
+        shadow->rkbuf_totlen = size;
+
+        /* Reader slice spans the full buffer. */
+        rd_slice_init_full(&shadow->rkbuf_reader, &shadow->rkbuf_buf);
+
+        rd_kafka_msgq_init(&shadow->rkbuf_msgq);
+
+        rd_refcnt_init(&shadow->rkbuf_refcnt, 1);
+
+        return shadow;
+}
+
+
+
+/**
+ * @brief Append \p rkbuf to the tail of \p rkbufq and account for it:
+ *        the buffer count grows by one and the message count by the
+ *        number of messages on the buffer's message queue.
+ */
+void rd_kafka_bufq_enq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf) {
+        int32_t msg_cnt;
+
+        TAILQ_INSERT_TAIL(&rkbufq->rkbq_bufs, rkbuf, rkbuf_link);
+        (void)rd_atomic32_add(&rkbufq->rkbq_cnt, 1);
+
+        msg_cnt = rd_atomic32_get(&rkbuf->rkbuf_msgq.rkmq_msg_cnt);
+        (void)rd_atomic32_add(&rkbufq->rkbq_msg_cnt, msg_cnt);
+}
+
+/**
+ * @brief Unlink \p rkbuf from \p rkbufq and account for it:
+ *        the buffer count shrinks by one and the message count by the
+ *        number of messages on the buffer's message queue.
+ *
+ * Asserts that the queue's buffer count was positive.
+ */
+void rd_kafka_bufq_deq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf) {
+        int32_t msg_cnt;
+
+        TAILQ_REMOVE(&rkbufq->rkbq_bufs, rkbuf, rkbuf_link);
+
+        rd_kafka_assert(NULL, rd_atomic32_get(&rkbufq->rkbq_cnt) > 0);
+        (void)rd_atomic32_sub(&rkbufq->rkbq_cnt, 1);
+
+        msg_cnt = rd_atomic32_get(&rkbuf->rkbuf_msgq.rkmq_msg_cnt);
+        (void)rd_atomic32_sub(&rkbufq->rkbq_msg_cnt, msg_cnt);
+}
+
+/**
+ * @brief Initialize \p rkbufq as an empty queue with both the buffer
+ *        counter and the message counter set to zero.
+ */
+void rd_kafka_bufq_init(rd_kafka_bufq_t *rkbufq) {
+        rd_atomic32_init(&rkbufq->rkbq_msg_cnt, 0);
+        rd_atomic32_init(&rkbufq->rkbq_cnt, 0);
+        TAILQ_INIT(&rkbufq->rkbq_bufs);
+}
+
+/**
+ * @brief Move all buffers from \p src to the tail of \p dst.
+ *
+ * The buffer and message counters of \p src are added to \p dst,
+ * after which \p src is re-initialized as an empty queue.
+ */
+void rd_kafka_bufq_concat (rd_kafka_bufq_t *dst, rd_kafka_bufq_t *src) {
+        int32_t moved;
+
+        TAILQ_CONCAT(&dst->rkbq_bufs, &src->rkbq_bufs, rkbuf_link);
+
+        moved = rd_atomic32_get(&src->rkbq_cnt);
+        (void)rd_atomic32_add(&dst->rkbq_cnt, moved);
+
+        moved = rd_atomic32_get(&src->rkbq_msg_cnt);
+        (void)rd_atomic32_add(&dst->rkbq_msg_cnt, moved);
+
+        rd_kafka_bufq_init(src);
+}
+
+/**
+ * @brief Purge a buffer queue: every buffer on \p rkbufq is passed to
+ *        rd_kafka_buf_callback() with error \p err and no response.
+ *
+ * Must be called from the broker's own thread (asserted).
+ *
+ * The buffers are not unlinked from \p rkbufq here.
+ *
+ * NOTE: 'rkbufq' must be a temporary queue and not one of rkb_waitresps
+ *       or rkb_outbufs since buffers may be re-enqueued on those queues.
+ *       'rkbufq' needs to be bufq_init():ed before reuse after this call.
+ */
+void rd_kafka_bufq_purge (rd_kafka_broker_t *rkb,
+                          rd_kafka_bufq_t *rkbufq,
+                          rd_kafka_resp_err_t err) {
+        rd_kafka_buf_t *cur;
+        rd_kafka_buf_t *next;
+
+        rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
+
+        rd_rkb_dbg(rkb, QUEUE, "BUFQ", "Purging bufq with %i buffers",
+                   rd_atomic32_get(&rkbufq->rkbq_cnt));
+
+        /* The SAFE variant is required: the callback may release cur. */
+        TAILQ_FOREACH_SAFE(cur, &rkbufq->rkbq_bufs, rkbuf_link, next) {
+                rd_kafka_buf_callback(rkb->rkb_rk, rkb, err, NULL, cur);
+        }
+}
+
+
+/**
+ * @brief Update bufq for connection reset:
+ *
+ *  - Connection-setup requests (ApiVersion, SaslHandshake) are dequeued
+ *    and passed to rd_kafka_buf_callback() with
+ *    RD_KAFKA_RESP_ERR__DESTROY.
+ *  - Every other buffer has its reader position rewound to offset 0 so
+ *    that a partially sent buffer starts over. (issue #756)
+ *
+ * Must be called from the broker's own thread (asserted).
+ */
+void rd_kafka_bufq_connection_reset (rd_kafka_broker_t *rkb,
+                                     rd_kafka_bufq_t *rkbufq) {
+        rd_kafka_buf_t *cur;
+        rd_kafka_buf_t *next;
+
+        rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
+
+        rd_rkb_dbg(rkb, QUEUE, "BUFQ",
+                   "Updating %d buffers on connection reset",
+                   rd_atomic32_get(&rkbufq->rkbq_cnt));
+
+        TAILQ_FOREACH_SAFE(cur, &rkbufq->rkbq_bufs, rkbuf_link, next) {
+                const int is_conn_setup =
+                        cur->rkbuf_reqhdr.ApiKey == RD_KAFKAP_ApiVersion ||
+                        cur->rkbuf_reqhdr.ApiKey == RD_KAFKAP_SaslHandshake;
+
+                if (!is_conn_setup) {
+                        /* Reset buffer send position */
+                        rd_slice_seek(&cur->rkbuf_reader, 0);
+                        continue;
+                }
+
+                /* Connection-setup request: drop it from the queue. */
+                rd_kafka_bufq_deq(rkbufq, cur);
+                rd_kafka_buf_callback(rkb->rkb_rk, rkb,
+                                      RD_KAFKA_RESP_ERR__DESTROY,
+                                      NULL, cur);
+        }
+}
+
+
+/**
+ * @brief Log the contents of \p rkbq at BROKER debug level.
+ *
+ * Nothing is logged for an empty queue. For each buffer the request type,
+ * total length, correlation id, connection id and retry count are printed
+ * together with the time remaining until its retry and timeout
+ * timestamps (0 if the respective timestamp is not set).
+ *
+ * @param rkb  broker used as logging context.
+ * @param fac  log facility string.
+ * @param rkbq queue to dump.
+ */
+void rd_kafka_bufq_dump (rd_kafka_broker_t *rkb, const char *fac,
+                         rd_kafka_bufq_t *rkbq) {
+        rd_kafka_buf_t *rkbuf;
+        int cnt = rd_kafka_bufq_cnt(rkbq);
+        rd_ts_t now;
+
+        if (!cnt)
+                return;
+
+        now = rd_clock();
+
+        rd_rkb_dbg(rkb, BROKER, fac, "bufq with %d buffer(s):", cnt);
+
+        TAILQ_FOREACH(rkbuf, &rkbq->rkbq_bufs, rkbuf_link) {
+                /* "in N ms" is time remaining, i.e. deadline - now
+                 * (microseconds scaled to milliseconds); the previous
+                 * now - deadline printed the negated value. */
+                rd_rkb_dbg(rkb, BROKER, fac,
+                           " Buffer %s (%"PRIusz" bytes, corrid %"PRId32", "
+                           "connid %d, retry %d in %lldms, timeout in %lldms)",
+                           rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.ApiKey),
+                           rkbuf->rkbuf_totlen, rkbuf->rkbuf_corrid,
+                           rkbuf->rkbuf_connid, rkbuf->rkbuf_retries,
+                           rkbuf->rkbuf_ts_retry ?
+                           (rkbuf->rkbuf_ts_retry - now) / 1000LL : 0,
+                           rkbuf->rkbuf_ts_timeout ?
+                           (rkbuf->rkbuf_ts_timeout - now) / 1000LL : 0);
+        }
+}
+
+
+
+
+/**
+ * @brief Schedule a failed request for retry, if permitted.
+ *
+ * No retry is made when there is no broker, the broker is the internal
+ * one, the client instance is terminating, or the request has used up
+ * its max_retries allowance.
+ *
+ * On retry the sent timestamp is cleared, the retry counter is bumped and
+ * an extra buffer reference is taken before the buffer is handed to
+ * rd_kafka_broker_buf_retry().
+ *
+ * @remark \p rkb may be NULL
+ *
+ * @returns 1 if the request was scheduled for retry, else 0.
+ */
+int rd_kafka_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {
+
+        if (unlikely(!rkb))
+                return 0;
+
+        if (unlikely(rkb->rkb_source == RD_KAFKA_INTERNAL))
+                return 0;
+
+        if (unlikely(rd_kafka_terminating(rkb->rkb_rk)))
+                return 0;
+
+        if (unlikely(rkbuf->rkbuf_retries + 1 >
+                     rkb->rkb_rk->rk_conf.max_retries))
+                return 0;
+
+        /* Try again */
+        rkbuf->rkbuf_ts_sent = 0;
+        rkbuf->rkbuf_retries++;
+        rd_kafka_buf_keep(rkbuf);
+        rd_kafka_broker_buf_retry(rkb, rkbuf);
+
+        return 1;
+}
+
+
+/**
+ * @brief Handle RD_KAFKA_OP_RECV_BUF.
+ *
+ * Takes the request buffer out of \p rko (the op's buffer pointer is
+ * cleared), parks the request's current replyq in rkbuf_orig_replyq for
+ * future retries, and then either destroys the request (no callback set)
+ * or passes request and response to rd_kafka_buf_callback().
+ *
+ * @param rko op carrying the request buffer in rko_u.xbuf.rkbuf.
+ * @param err error code forwarded to rd_kafka_buf_callback().
+ */
+void rd_kafka_buf_handle_op (rd_kafka_op_t *rko, rd_kafka_resp_err_t err) {
+ rd_kafka_buf_t *request, *response;
+
+ /* Take over the op's buffer so the op no longer refers to it. */
+ request = rko->rko_u.xbuf.rkbuf;
+ rko->rko_u.xbuf.rkbuf = NULL;
+
+ /* NULL on op_destroy() */
+ if (request->rkbuf_replyq.q) {
+ int32_t version = request->rkbuf_replyq.version;
+ /* Current queue usage is done, but retain original replyq for
+ * future retries, stealing
+ * the current reference. */
+ request->rkbuf_orig_replyq = request->rkbuf_replyq;
+ rd_kafka_replyq_clear(&request->rkbuf_replyq);
+ /* Callback might need to version check so we retain the
+ * version across the clear() call which clears it. */
+ request->rkbuf_replyq.version = version;
+ }
+
+ /* Without a callback there is nobody to deliver to. */
+ if (!request->rkbuf_cb) {
+ rd_kafka_buf_destroy(request);
+ return;
+ }
+
+ /* Let buf_callback() do destroy()s */
+ response = request->rkbuf_response; /* May be NULL */
+ request->rkbuf_response = NULL;
+
+ /* NOTE(review): rkbuf_rkb is dereferenced unconditionally here;
+ * buffers from rd_kafka_buf_new_shadow() never set it - presumably
+ * such buffers are never routed through this op. Confirm. */
+ rd_kafka_buf_callback(request->rkbuf_rkb->rkb_rk,
+ request->rkbuf_rkb, err,
+ response, request);
+}
+
+
+
+/**
+ * Call request.rkbuf_cb(), but:
+ * - if the rkbuf has a rkbuf_replyq the buffer is enqueued on that queue
+ * with op type RD_KAFKA_OP_RECV_BUF.
+ * - else call rkbuf_cb().
+ *
+ * Before either of those, a request that failed with an error other than
+ * RD_KAFKA_RESP_ERR__DESTROY is offered to rd_kafka_buf_retry(); if the
+ * retry is accepted neither the replyq nor the callback is used.
+ *
+ * \p response may be NULL.
+ *
+ * Will decrease refcount for both response and request, eventually.
+ *
+ * @param rk client instance passed on to rkbuf_cb().
+ * @param rkb broker passed on to rd_kafka_buf_retry() and rkbuf_cb().
+ * @param err request outcome.
+ * @param response response buffer, or NULL.
+ * @param request request buffer.
+ */
+void rd_kafka_buf_callback (rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *response, rd_kafka_buf_t *request){
+
+ /* Decide if the request should be retried.
+ * This is always done in the originating broker thread. */
+ if (unlikely(err && err != RD_KAFKA_RESP_ERR__DESTROY &&
+ rd_kafka_buf_retry(rkb, request))) {
+ /* refcount for retry was increased in buf_retry() so we can
+ * let go of this caller's refcounts. */
+ rd_kafka_buf_destroy(request);
+ if (response)
+ rd_kafka_buf_destroy(response);
+ return;
+ }
+
+ /* Deliver through the reply queue when one is set, except on
+ * destroy. */
+ if (err != RD_KAFKA_RESP_ERR__DESTROY && request->rkbuf_replyq.q) {
+ rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_RECV_BUF);
+
+ /* The response travels attached to the request. */
+ rd_kafka_assert(NULL, !request->rkbuf_response);
+ request->rkbuf_response = response;
+
+ /* Increment refcnt since rko_rkbuf will be decref:ed
+ * if replyq_enq() fails and we dont want the rkbuf gone in that
+ * case. */
+ rd_kafka_buf_keep(request);
+ rko->rko_u.xbuf.rkbuf = request;
+
+ rko->rko_err = err;
+
+ /* Copy original replyq for future retries, with its own
+ * queue reference. */
+ rd_kafka_replyq_copy(&request->rkbuf_orig_replyq,
+ &request->rkbuf_replyq);
+
+ rd_kafka_replyq_enq(&request->rkbuf_replyq, rko, 0);
+
+ rd_kafka_buf_destroy(request); /* from keep above */
+ return;
+ }
+
+ /* Direct delivery: invoke the callback (if any) in this thread. */
+ if (request->rkbuf_cb)
+ request->rkbuf_cb(rk, rkb, err, response, request,
+ request->rkbuf_opaque);
+
+ /* Release this call's references. */
+ rd_kafka_buf_destroy(request);
+ if (response)
+ rd_kafka_buf_destroy(response);
+}
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.h
new file mode 100644
index 0000000..5aa2876
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.h
@@ -0,0 +1,819 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+#pragma once
+
+#include "rdkafka_int.h"
+#include "rdcrc32.h"
+#include "rdlist.h"
+#include "rdbuf.h"
+
+
+typedef struct rd_kafka_broker_s rd_kafka_broker_t;
+
+#define RD_KAFKA_HEADERS_IOV_CNT 2
+
+
+/**
+ * Temporary buffer with memory aligned writes to accommodate
+ * effective and platform safe struct writes.
+ *
+ * Memory is handed out sequentially from a single pre-allocated block;
+ * each allocation advances the offset by the size rounded up to a
+ * multiple of 8.
+ */
+typedef struct rd_tmpabuf_s {
+ size_t size; /* Total number of bytes in buf */
+ size_t of; /* Offset of the next allocation in buf */
+ char *buf; /* Backing memory, see rd_tmpabuf_new() */
+ int failed; /* Non-zero makes all further allocations
+ * return NULL, see rd_tmpabuf_failed() */
+ int assert_on_fail; /* Non-zero: print a diagnostic and assert
+ * when an allocation does not fit */
+} rd_tmpabuf_t;
+
+/**
+ * @brief Set up \p tab with \p size bytes of freshly allocated backing
+ *        memory, the write offset at 0 and the failure flag cleared.
+ *
+ * @param assert_on_fail stored in the tmpabuf; controls whether a failed
+ *                       allocation asserts (see rd_tmpabuf_alloc0()).
+ */
+static RD_UNUSED void
+rd_tmpabuf_new (rd_tmpabuf_t *tab, size_t size, int assert_on_fail) {
+        tab->assert_on_fail = assert_on_fail;
+        tab->failed         = 0;
+        tab->of             = 0;
+        tab->size           = size;
+        tab->buf            = rd_malloc(size);
+}
+
+/**
+ * @brief Release the backing memory of \p tab.
+ *
+ * The rd_tmpabuf_t object itself is not freed.
+ */
+static RD_UNUSED void
+rd_tmpabuf_destroy (rd_tmpabuf_t *tab) {
+        char *backing = tab->buf;
+
+        rd_free(backing);
+}
+
+/**
+ * @returns the tmpabuf's failure flag: non-zero if a previous operation
+ *          failed, else 0.
+ */
+static RD_UNUSED RD_INLINE int
+rd_tmpabuf_failed (rd_tmpabuf_t *tab) {
+        const int has_failed = tab->failed;
+
+        return has_failed;
+}
+
+/**
+ * @brief Allocate \p size bytes for writing, returning an aligned pointer
+ *        to the memory.
+ *
+ * The write offset advances by \p size rounded up to a multiple of 8.
+ *
+ * A request that does not fit marks the tmpabuf as failed, so that
+ * rd_tmpabuf_failed() reports it and all later allocations return NULL.
+ * If the tmpabuf was created with assert_on_fail, a diagnostic is printed
+ * to stderr and an assertion is raised as well.
+ *
+ * @param func name of the calling function (diagnostics only).
+ * @param line line number of the call site (diagnostics only).
+ * @param tab  tmpabuf to allocate from.
+ * @param size number of bytes requested.
+ *
+ * @returns the allocated pointer (within the tmpabuf) on success or
+ *          NULL if the tmpabuf has already failed or the requested number
+ *          of bytes is not available in the tmpabuf.
+ */
+static RD_UNUSED void *
+rd_tmpabuf_alloc0 (const char *func, int line, rd_tmpabuf_t *tab, size_t size) {
+        void *ptr;
+
+        if (unlikely(tab->failed))
+                return NULL;
+
+        /* Written as a subtraction so that a huge \p size cannot wrap the
+         * sum around. tab->of may already exceed tab->size because of the
+         * rounding below, hence the first test. */
+        if (unlikely(tab->of > tab->size || size > tab->size - tab->of)) {
+                /* Make the failure visible through rd_tmpabuf_failed(). */
+                tab->failed = 1;
+
+                if (tab->assert_on_fail) {
+                        fprintf(stderr,
+                                "%s: %s:%d: requested size %"PRIusz
+                                " + %"PRIusz" > %"PRIusz"\n",
+                                __FUNCTION__, func, line, tab->of, size,
+                                tab->size);
+                        assert(!*"rd_tmpabuf_alloc: not enough size in buffer");
+                }
+                return NULL;
+        }
+
+        ptr = (void *)(tab->buf + tab->of);
+        tab->of += RD_ROUNDUP(size, 8);
+
+        return ptr;
+}
+
+#define rd_tmpabuf_alloc(tab,size) \
+        rd_tmpabuf_alloc0(__FUNCTION__,__LINE__,tab,size)
+
+/**
+ * @brief Copy \p size bytes from \p buf into freshly allocated, aligned
+ *        tmpabuf memory.
+ *
+ * @param func name of the calling function (diagnostics only).
+ * @param line line number of the call site (diagnostics only).
+ *
+ * @returns the allocated and written-to pointer (within the tmpabuf) on
+ *          success or NULL if rd_tmpabuf_alloc0() could not provide the
+ *          memory, in which case nothing is copied.
+ */
+static RD_UNUSED void *
+rd_tmpabuf_write0 (const char *func, int line,
+                   rd_tmpabuf_t *tab, const void *buf, size_t size) {
+        void *dst = rd_tmpabuf_alloc0(func, line, tab, size);
+
+        if (!dst)
+                return NULL;
+
+        memcpy(dst, buf, size);
+
+        return dst;
+}
+#define rd_tmpabuf_write(tab,buf,size) \
+        rd_tmpabuf_write0(__FUNCTION__, __LINE__, tab, buf, size)
+
+
+/**
+ * @brief Copy the nul-terminated string \p str, including its terminator,
+ *        into the tmpabuf by means of rd_tmpabuf_write0().
+ *
+ * @returns the copied string within the tmpabuf, or NULL on failure.
+ */
+static RD_UNUSED char *
+rd_tmpabuf_write_str0 (const char *func, int line,
+                       rd_tmpabuf_t *tab, const char *str) {
+        const size_t len_with_nul = strlen(str) + 1;
+
+        return rd_tmpabuf_write0(func, line, tab, str, len_with_nul);
+}
+#define rd_tmpabuf_write_str(tab,str) \
+        rd_tmpabuf_write_str0(__FUNCTION__, __LINE__, tab, str)
+
+
+
+/**
+ * @name Read buffer interface
+ *
+ * Memory reading helper macros to be used when parsing network responses.
+ *
+ * Assumptions:
+ *   - an 'err_parse:' goto-label must be available for error bailouts,
+ *     the error code will be set in rkbuf->rkbuf_err
+ *   - local `int log_decode_errors` variable set to the logging level
+ *     to log parse errors (or 0 to turn off logging).
+ */
+
+/**
+ * @brief Fail the current parse: set rkbuf_err to
+ *        RD_KAFKA_RESP_ERR__BAD_MSG and jump to the 'err_parse' label.
+ *
+ * When log_decode_errors is positive, the reader position and the caller's
+ * printf-style message (the variadic arguments) are logged first; logging
+ * requires rkbuf_rkb to be set (asserted).
+ *
+ * @remark \p rkbuf is expanded several times and must not have
+ *         side effects.
+ */
+#define rd_kafka_buf_parse_fail(rkbuf,...) do {                         \
+                if (log_decode_errors > 0) {                            \
+                        rd_kafka_assert(NULL, (rkbuf)->rkbuf_rkb);      \
+                        rd_rkb_log((rkbuf)->rkbuf_rkb, log_decode_errors, \
+                                   "PROTOERR",                          \
+                                   "Protocol parse failure "            \
+                                   "at %"PRIusz"/%"PRIusz" (%s:%i) "    \
+                                   "(incorrect broker.version.fallback?)", \
+                                   rd_slice_offset(&(rkbuf)->rkbuf_reader), \
+                                   rd_slice_size(&(rkbuf)->rkbuf_reader), \
+                                   __FUNCTION__, __LINE__);             \
+                        rd_rkb_log((rkbuf)->rkbuf_rkb, log_decode_errors, \
+                                   "PROTOERR", __VA_ARGS__);            \
+                }                                                       \
+                (rkbuf)->rkbuf_err = RD_KAFKA_RESP_ERR__BAD_MSG;        \
+                goto err_parse;                                         \
+        } while (0)
+
+
+
+/**
+ * Returns the number of remaining bytes available to read.
+ */
+#define rd_kafka_buf_read_remain(rkbuf)                 \
+        rd_slice_remains(&(rkbuf)->rkbuf_reader)
+
+/**
+ * Checks that at least 'len' bytes remain to be read in buffer, else fails
+ * the parse through rd_kafka_buf_parse_fail(), which sets rkbuf_err and
+ * jumps to 'err_parse' on its own.
+ */
+#define rd_kafka_buf_check_len(rkbuf,len) do {                          \
+                size_t __len0 = (size_t)(len);                          \
+                if (unlikely(__len0 > rd_kafka_buf_read_remain(rkbuf))) \
+                        rd_kafka_buf_parse_fail(                        \
+                                rkbuf,                                  \
+                                "expected %"PRIusz" bytes > %"PRIusz    \
+                                " remaining bytes",                     \
+                                __len0, rd_kafka_buf_read_remain(rkbuf)); \
+        } while (0)
+
+/**
+ * Skip (as in read and ignore) the next 'len' bytes.
+ * A zero 'len' is a no-op. If the slice read fails the remaining length
+ * is verified with rd_kafka_buf_check_len().
+ */
+#define rd_kafka_buf_skip(rkbuf, len) do {                              \
+                size_t __len1 = (size_t)(len);                          \
+                if (__len1 != 0 &&                                      \
+                    !rd_slice_read(&(rkbuf)->rkbuf_reader, NULL, __len1)) \
+                        rd_kafka_buf_check_len(rkbuf, __len1);          \
+        } while (0)
+
+/**
+ * Skip (as in read and ignore) up to fixed position \p pos:
+ * the distance from the current reader offset to \p pos is skipped.
+ */
+#define rd_kafka_buf_skip_to(rkbuf, pos)                                \
+        rd_kafka_buf_skip(rkbuf,                                        \
+                          (size_t)(pos) -                               \
+                          rd_slice_offset(&(rkbuf)->rkbuf_reader))
+
+
+
+/**
+ * Read 'len' bytes and copy to 'dstptr'
+ *
+ * If the slice read fails, rd_kafka_buf_check_len() is run on 'len',
+ * which bails out to 'err_parse' when fewer than 'len' bytes remain.
+ */
+#define rd_kafka_buf_read(rkbuf,dstptr,len) do { \
+ size_t __len2 = (size_t)(len); \
+ if (!rd_slice_read(&(rkbuf)->rkbuf_reader, dstptr, __len2)) \
+ rd_kafka_buf_check_len(rkbuf, __len2); \
+ } while (0)
+
+
+/**
+ * @brief Read \p len bytes at slice offset \p offset and copy to \p dstptr
+ * without affecting the current reader position.
+ *
+ * If the peek fails, rd_kafka_buf_check_len() is run on
+ * \p offset + \p len.
+ */
+#define rd_kafka_buf_peek(rkbuf,offset,dstptr,len) do { \
+ size_t __len2 = (size_t)(len); \
+ if (!rd_slice_peek(&(rkbuf)->rkbuf_reader, offset, \
+ dstptr, __len2)) \
+ rd_kafka_buf_check_len(rkbuf, (offset)+(__len2)); \
+ } while (0)
+
+
+/**
+ * Read a 16,32,64-bit integer and store it in 'dstptr'
+ */
+#define rd_kafka_buf_read_i64(rkbuf,dstptr) do { \
+ int64_t _v; \
+ rd_kafka_buf_read(rkbuf, &_v, sizeof(_v)); \
+ *(dstptr) = be64toh(_v); \
+ } while (0)
+
+#define rd_kafka_buf_peek_i64(rkbuf,of,dstptr) do { /* peek a 64-bit integer at slice offset 'of' */ \
+ int64_t _v; /* raw big-endian value */ \
+ rd_kafka_buf_peek(rkbuf, of, &_v, sizeof(_v)); /* reader position is not advanced */ \
+ *(dstptr) = be64toh(_v); /* store in host byte order */ \
+ } while (0)
+
+#define rd_kafka_buf_read_i32(rkbuf,dstptr) do { /* read a 32-bit integer into *dstptr */ \
+ int32_t _v; /* raw big-endian value */ \
+ rd_kafka_buf_read(rkbuf, &_v, sizeof(_v)); \
+ *(dstptr) = be32toh(_v); /* store in host byte order */ \
+ } while (0)
+
+/* Same as .._read_i32 but does a direct assignment.
+ * dst is assumed to be a scalar (an assignable lvalue), not a pointer.
+ * The converted value is cast to int32_t before the assignment. */
+#define rd_kafka_buf_read_i32a(rkbuf, dst) do { \
+ int32_t _v; /* raw big-endian value */ \
+ rd_kafka_buf_read(rkbuf, &_v, 4); \
+ dst = (int32_t) be32toh(_v); \
+ } while (0)
+
+#define rd_kafka_buf_read_i16(rkbuf,dstptr) do { /* read a 16-bit integer into *dstptr */ \
+ int16_t _v; /* raw big-endian value */ \
+ rd_kafka_buf_read(rkbuf, &_v, sizeof(_v)); \
+ *(dstptr) = be16toh(_v); /* store in host byte order */ \
+ } while (0)
+
+
+#define rd_kafka_buf_read_i16a(rkbuf, dst) do { /* as .._read_i16 but assigns directly to the lvalue 'dst' */ \
+ int16_t _v; /* raw big-endian value */ \
+ rd_kafka_buf_read(rkbuf, &_v, 2); \
+ dst = (int16_t)be16toh(_v); /* host byte order, cast to int16_t */ \
+ } while (0)
+
+#define rd_kafka_buf_read_i8(rkbuf, dst) rd_kafka_buf_read(rkbuf, dst, 1) /* 'dst' is a pointer; one byte is copied, no byte swap */
+
+#define rd_kafka_buf_peek_i8(rkbuf,of,dst) rd_kafka_buf_peek(rkbuf,of,dst,1) /* as above, at slice offset 'of' via rd_kafka_buf_peek() */
+
+
+/**
+ * @brief Read varint and store in int64_t \p dst
+ *
+ * \p dst is a pointer; the decoded value is stored in \p *dst.
+ * A decode underflow (RD_UVARINT_UNDERFLOW()) is reported through
+ * rd_kafka_buf_parse_fail().
+ * NOTE(review): \p *dst is assigned after the failure branch, which
+ * relies on rd_kafka_buf_parse_fail() not falling through - confirm.
+ */
+#define rd_kafka_buf_read_varint(rkbuf,dst) do { \
+ int64_t _v; \
+ size_t _r = rd_varint_dec_slice(&(rkbuf)->rkbuf_reader, &_v); \
+ if (unlikely(RD_UVARINT_UNDERFLOW(_r))) \
+ rd_kafka_buf_parse_fail(rkbuf, \
+ "varint parsing failed: " \
+ "buffer underflow"); \
+ *(dst) = _v; \
+ } while (0)
+
+/**
+ * @brief Read Kafka String representation (2+N).
+ *
+ * The 16-bit length prefix is read into \p kstr->len. When the effective
+ * length (RD_KAFKAP_STR_LEN0()) is zero, \p kstr->str is set to NULL;
+ * otherwise \p kstr->str is updated to point to the rkbuf's data as
+ * returned by rd_slice_ensure_contig().
+ *
+ * If that call fails, rd_kafka_buf_check_len() is invoked, which jumps
+ * to the expanding function's err_parse label when too few bytes remain.
+ *
+ * @remark \p rkbuf is parenthesized at every use so that any pointer
+ *         expression may be passed as the macro argument.
+ */
+#define rd_kafka_buf_read_str(rkbuf, kstr) do {                         \
+                int _klen;                                              \
+                rd_kafka_buf_read_i16a(rkbuf, (kstr)->len);             \
+                _klen = RD_KAFKAP_STR_LEN(kstr);                        \
+                if (RD_KAFKAP_STR_LEN0(_klen) == 0)                     \
+                        (kstr)->str = NULL;                             \
+                else if (!((kstr)->str =                                \
+                           rd_slice_ensure_contig(                      \
+                                   &(rkbuf)->rkbuf_reader, _klen)))     \
+                        rd_kafka_buf_check_len(rkbuf, _klen);           \
+        } while (0)
+
+/* Read Kafka String representation (2+N) and write it to the \p tmpabuf
+ * with a trailing nul byte. \p dst is assigned the address of the copy.
+ *
+ * _slen+1 bytes are written from the string's data and the last written
+ * byte is then overwritten with nul.
+ * NOTE(review): this copies one byte more than the string's _slen bytes
+ * from the source, and the source is NULL for a zero-length/null string
+ * (see rd_kafka_buf_read_str()) - confirm rd_tmpabuf_write() and the
+ * callers tolerate both.
+ * NOTE(review): _dst is written after the failure branch, which relies
+ * on rd_kafka_buf_parse_fail() not falling through - confirm. */
+#define rd_kafka_buf_read_str_tmpabuf(rkbuf, tmpabuf, dst) do { \
+ rd_kafkap_str_t _kstr; \
+ size_t _slen; \
+ char *_dst; \
+ rd_kafka_buf_read_str(rkbuf, &_kstr); \
+ _slen = RD_KAFKAP_STR_LEN(&_kstr); \
+ if (!(_dst = \
+ rd_tmpabuf_write(tmpabuf, _kstr.str, _slen+1))) \
+ rd_kafka_buf_parse_fail( \
+ rkbuf, \
+ "Not enough room in tmpabuf: " \
+ "%"PRIusz"+%"PRIusz \
+ " > %"PRIusz, \
+ (tmpabuf)->of, _slen+1, (tmpabuf)->size); \
+ _dst[_slen] = '\0'; \
+ dst = (void *)_dst; \
+ } while (0)
+
+/**
+ * Skip a string.
+ *
+ * Reads the 16-bit length prefix and then skips
+ * RD_KAFKAP_STR_LEN0(length) bytes with rd_kafka_buf_skip().
+ */
+#define rd_kafka_buf_skip_str(rkbuf) do { \
+ int16_t _slen; \
+ rd_kafka_buf_read_i16(rkbuf, &_slen); \
+ rd_kafka_buf_skip(rkbuf, RD_KAFKAP_STR_LEN0(_slen)); \
+ } while (0)
+
+/* Read Kafka Bytes representation (4+N).
+ * The 'kbytes' will be updated to point to rkbuf data.
+ *
+ * A null field (RD_KAFKAP_BYTES_IS_NULL()) yields data = NULL and
+ * len = 0; a zero-length field yields data = "". Otherwise data is the
+ * pointer returned by rd_slice_ensure_contig(); if that fails,
+ * rd_kafka_buf_check_len() is invoked and may jump to err_parse. */
+#define rd_kafka_buf_read_bytes(rkbuf, kbytes) do { \
+ int _klen; \
+ rd_kafka_buf_read_i32a(rkbuf, _klen); /* 32-bit length prefix */ \
+ (kbytes)->len = _klen; \
+ if (RD_KAFKAP_BYTES_IS_NULL(kbytes)) { \
+ (kbytes)->data = NULL; \
+ (kbytes)->len = 0; \
+ } else if (RD_KAFKAP_BYTES_LEN(kbytes) == 0) \
+ (kbytes)->data = ""; \
+ else if (!((kbytes)->data = \
+ rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, \
+ _klen))) \
+ rd_kafka_buf_check_len(rkbuf, _klen); \
+ } while (0)
+
+
+/**
+ * @brief Read \p size bytes from buffer, setting \p *ptr to the start
+ *        of the memory region.
+ *
+ * \p *ptr receives the pointer returned by rd_slice_ensure_contig().
+ * If that call fails, rd_kafka_buf_check_len() is invoked, which jumps
+ * to the expanding function's err_parse label when fewer than \p size
+ * bytes remain.
+ *
+ * @remark \p size is parenthesized and converted to size_t once, matching
+ *         the other read macros, so any integer expression may be passed.
+ */
+#define rd_kafka_buf_read_ptr(rkbuf,ptr,size) do {                      \
+                size_t _klen = (size_t)(size);                          \
+                if (!(*(ptr) = (void *)                                 \
+                      rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader,    \
+                                             _klen)))                   \
+                        rd_kafka_buf_check_len(rkbuf, _klen);           \
+        } while (0)
+
+
+/**
+ * @brief Read varint-length-prefixed Kafka Bytes representation.
+ *
+ * The varint length is decoded into an int64_t and stored in
+ * \p kbytes->len after a cast to int32_t. Null and zero-length fields
+ * are handled as in rd_kafka_buf_read_bytes().
+ * NOTE(review): the int32_t cast narrows the decoded length; a value
+ * outside the int32_t range would be stored truncated - confirm such
+ * lengths are rejected elsewhere.
+ */
+#define rd_kafka_buf_read_bytes_varint(rkbuf,kbytes) do { \
+ int64_t _len2; \
+ size_t _r = rd_varint_dec_slice(&(rkbuf)->rkbuf_reader, \
+ &_len2); \
+ if (unlikely(RD_UVARINT_UNDERFLOW(_r))) \
+ rd_kafka_buf_parse_fail(rkbuf, \
+ "varint parsing failed: " \
+ "buffer underflow"); \
+ (kbytes)->len = (int32_t)_len2; \
+ if (RD_KAFKAP_BYTES_IS_NULL(kbytes)) { \
+ (kbytes)->data = NULL; \
+ (kbytes)->len = 0; \
+ } else if (RD_KAFKAP_BYTES_LEN(kbytes) == 0) \
+ (kbytes)->data = ""; \
+ else if (!((kbytes)->data = \
+ rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, \
+ _len2))) \
+ rd_kafka_buf_check_len(rkbuf, _len2); \
+ } while (0)
+
+
+/**
+ * Response handling callback.
+ *
+ * NOTE: Callbacks must check for 'err == RD_KAFKA_RESP_ERR__DESTROY'
+ * which indicates that some entity is terminating (rd_kafka_t, broker,
+ * toppar, queue, etc) and the callback may not be called in the
+ * correct thread. In this case the callback must perform only
+ * the most minimal cleanup and must not trigger any other operations.
+ *
+ * NOTE: rkb, reply and request may be NULL, depending on error situation.
+ */
+typedef void (rd_kafka_resp_cb_t) (rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *reply,
+ rd_kafka_buf_t *request,
+ void *opaque);
+
+struct rd_kafka_buf_s { /* rd_kafka_buf_t: request/response buffer and its bookkeeping */
+ TAILQ_ENTRY(rd_kafka_buf_s) rkbuf_link; /* Queue linkage (cf. rd_kafka_bufq_t) */
+
+ int32_t rkbuf_corrid; /* presumably the request correlation id - confirm */
+
+ rd_ts_t rkbuf_ts_retry; /* Absolute send retry time */
+
+ int rkbuf_flags; /* RD_KAFKA_OP_F_.. flags, e.g. RD_KAFKA_OP_F_CRC
+ * while a CRC calculation is in progress */
+
+ rd_buf_t rkbuf_buf; /**< Send/Recv byte buffer */
+ rd_slice_t rkbuf_reader; /**< Buffer slice reader for rkbuf_buf */
+
+ int rkbuf_connid; /* broker connection id (used when buffer
+ * was partially sent). */
+ size_t rkbuf_totlen; /* recv: total expected length,
+ * send: not used */
+
+ rd_crc32_t rkbuf_crc; /* Current CRC calculation */
+
+ struct rd_kafkap_reqhdr rkbuf_reqhdr; /* Request header.
+ * These fields are encoded
+ * and written to output buffer
+ * on buffer finalization. */
+ struct rd_kafkap_reshdr rkbuf_reshdr; /* Response header.
+ * Decoded fields are copied
+ * here from the buffer
+ * to provide an ease-of-use
+ * interface to the header */
+
+ int32_t rkbuf_expected_size; /* expected size of message */
+
+ rd_kafka_replyq_t rkbuf_replyq; /* Enqueue response on replyq */
+ rd_kafka_replyq_t rkbuf_orig_replyq; /* Original replyq to be used
+ * for retries from inside
+ * the rkbuf_cb() callback
+ * since rkbuf_replyq will
+ * have been reset. */
+ rd_kafka_resp_cb_t *rkbuf_cb; /* Response callback */
+ struct rd_kafka_buf_s *rkbuf_response; /* Response buffer */
+
+ struct rd_kafka_broker_s *rkbuf_rkb;
+
+ rd_refcnt_t rkbuf_refcnt; /* Reference count: see rd_kafka_buf_keep()
+ * and rd_kafka_buf_destroy() */
+ void *rkbuf_opaque;
+
+ int rkbuf_retries; /* Retries so far. */
+#define RD_KAFKA_BUF_NO_RETRIES 1000000 /* Do not retry */
+
+ int rkbuf_features; /* Required feature(s) that must be
+ * supported by broker. */
+
+ rd_ts_t rkbuf_ts_enq;
+ rd_ts_t rkbuf_ts_sent; /* Initially: Absolute time of transmission,
+ * after response: RTT. */
+ rd_ts_t rkbuf_ts_timeout;
+
+ int64_t rkbuf_offset; /* Used by OffsetCommit */
+
+ rd_list_t *rkbuf_rktp_vers; /* Toppar + Op Version map.
+ * Used by FetchRequest. */
+
+ rd_kafka_msgq_t rkbuf_msgq;
+
+ rd_kafka_resp_err_t rkbuf_err; /* Buffer parsing error code */
+
+ union {
+ struct {
+ rd_list_t *topics; /* Requested topics (char *) */
+ char *reason; /* Textual reason */
+ rd_kafka_op_t *rko; /* Originating rko with replyq
+ * (if any) */
+ int all_topics; /* Full/All topics requested */
+
+ int *decr; /* Decrement this integer by one
+ * when request is complete:
+ * typically points to metadata
+ * cache's full_.._sent.
+ * Will be performed with
+ * decr_lock held. */
+ mtx_t *decr_lock; /* Lock held while decrementing *decr */
+
+ } Metadata;
+ } rkbuf_u; /* Request-type specific state */
+};
+
+
+typedef struct rd_kafka_bufq_s { /* Queue of rd_kafka_buf_t buffers */
+ TAILQ_HEAD(, rd_kafka_buf_s) rkbq_bufs; /* Head of the buffer list */
+ rd_atomic32_t rkbq_cnt; /* Read by rd_kafka_bufq_cnt(); presumably the number of queued buffers - confirm */
+ rd_atomic32_t rkbq_msg_cnt; /* presumably the number of messages held by queued buffers - confirm */
+} rd_kafka_bufq_t;
+
+#define rd_kafka_bufq_cnt(rkbq) rd_atomic32_get(&(rkbq)->rkbq_cnt) /* Atomic read of rkbq_cnt */
+
+
+#define rd_kafka_buf_keep(rkbuf) rd_refcnt_add(&(rkbuf)->rkbuf_refcnt) /* Add a reference to rkbuf */
+#define rd_kafka_buf_destroy(rkbuf) /* Hands rkbuf_refcnt and the final destructor call to rd_refcnt_destroywrapper(); presumably runs it when the last reference is dropped - confirm */ \
+ rd_refcnt_destroywrapper(&(rkbuf)->rkbuf_refcnt, \
+ rd_kafka_buf_destroy_final(rkbuf))
+
+void rd_kafka_buf_destroy_final (rd_kafka_buf_t *rkbuf); /* Invoked through the rd_kafka_buf_destroy() macro */
+void rd_kafka_buf_push0 (rd_kafka_buf_t *rkbuf, const void *buf, size_t len,
+ int allow_crc_calc, void (*free_cb) (void *));
+#define rd_kafka_buf_push(rkbuf,buf,len,free_cb) /* rd_kafka_buf_push0() with allow_crc_calc set to 1 */ \
+ rd_kafka_buf_push0(rkbuf,buf,len,1/*allow_crc*/,free_cb)
+rd_kafka_buf_t *rd_kafka_buf_new0 (int segcnt, size_t size, int flags);
+#define rd_kafka_buf_new(segcnt,size) /* rd_kafka_buf_new0() with flags set to 0 */ \
+ rd_kafka_buf_new0(segcnt,size,0)
+rd_kafka_buf_t *rd_kafka_buf_new_request (rd_kafka_broker_t *rkb, int16_t ApiKey,
+ int segcnt, size_t size);
+rd_kafka_buf_t *rd_kafka_buf_new_shadow (const void *ptr, size_t size,
+ void (*free_cb) (void *));
+void rd_kafka_bufq_enq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf);
+void rd_kafka_bufq_deq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf);
+void rd_kafka_bufq_init(rd_kafka_bufq_t *rkbufq);
+void rd_kafka_bufq_concat (rd_kafka_bufq_t *dst, rd_kafka_bufq_t *src);
+void rd_kafka_bufq_purge (rd_kafka_broker_t *rkb,
+ rd_kafka_bufq_t *rkbufq,
+ rd_kafka_resp_err_t err);
+void rd_kafka_bufq_connection_reset (rd_kafka_broker_t *rkb,
+ rd_kafka_bufq_t *rkbufq);
+void rd_kafka_bufq_dump (rd_kafka_broker_t *rkb, const char *fac,
+ rd_kafka_bufq_t *rkbq);
+
+int rd_kafka_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf);
+
+void rd_kafka_buf_handle_op (rd_kafka_op_t *rko, rd_kafka_resp_err_t err);
+void rd_kafka_buf_callback (rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *response, rd_kafka_buf_t *request);
+
+
+
+/**
+ *
+ * Write buffer interface
+ *
+ */
+
+/**
+ * @brief Set the request's API version and its required feature(s).
+ *
+ * @param rkbuf    Request buffer to update.
+ * @param version  Stored in the request header's ApiVersion field.
+ * @param features Stored in rkbuf_features.
+ */
+static RD_UNUSED RD_INLINE void
+rd_kafka_buf_ApiVersion_set (rd_kafka_buf_t *rkbuf,
+                             int16_t version, int features) {
+        rkbuf->rkbuf_features          = features;
+        rkbuf->rkbuf_reqhdr.ApiVersion = version;
+}
+
+
+/**
+ * @returns the ApiVersion for a request, as stored in the request
+ *          header (rkbuf_reqhdr.ApiVersion).
+ */
+#define rd_kafka_buf_ApiVersion(rkbuf) ((rkbuf)->rkbuf_reqhdr.ApiVersion)
+
+
+
+/**
+ * @brief Copy \p len bytes from \p data to the buffer at the current
+ *        write position.
+ *
+ * While RD_KAFKA_OP_F_CRC is set in rkbuf_flags the written bytes are
+ * also added to the running checksum in rkbuf_crc.
+ *
+ * @remark There must be enough space allocated in the rkbuf.
+ *
+ * @returns the offset of the written data, as returned by rd_buf_write().
+ */
+static RD_INLINE size_t rd_kafka_buf_write (rd_kafka_buf_t *rkbuf,
+                                            const void *data, size_t len) {
+        const size_t of = rd_buf_write(&rkbuf->rkbuf_buf, data, len);
+
+        if (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC) {
+                rkbuf->rkbuf_crc =
+                        rd_crc32_update(rkbuf->rkbuf_crc, data, len);
+        }
+
+        return of;
+}
+
+
+
+/**
+ * @brief Overwrite \p len bytes of the buffer at offset \p of with
+ *        \p data.
+ *
+ * @remark There must be enough space to fit \p len at \p of.
+ *
+ * @warning MUST NOT be called while a CRC calculation is in progress
+ *          (between rd_kafka_buf_crc_init() and .._crc_finalize());
+ *          this is asserted through the RD_KAFKA_OP_F_CRC flag.
+ */
+static RD_INLINE void rd_kafka_buf_update (rd_kafka_buf_t *rkbuf, size_t of,
+                                           const void *data, size_t len) {
+        rd_kafka_assert(NULL, !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC));
+
+        rd_buf_write_update(&rkbuf->rkbuf_buf, of, data, len);
+}
+
+/**
+ * @brief Append a single int8_t to the buffer.
+ *
+ * @returns the offset returned by rd_kafka_buf_write().
+ */
+static RD_INLINE size_t rd_kafka_buf_write_i8 (rd_kafka_buf_t *rkbuf,
+                                               int8_t v) {
+        const int8_t byte = v;
+
+        return rd_kafka_buf_write(rkbuf, &byte, sizeof(byte));
+}
+
+/**
+ * @brief Overwrite the int8_t stored at buffer offset \p of.
+ *
+ * \p of should have been previously returned by `.._buf_write_i8()`.
+ */
+static RD_INLINE void rd_kafka_buf_update_i8 (rd_kafka_buf_t *rkbuf,
+                                              size_t of, int8_t v) {
+        const int8_t byte = v;
+
+        rd_kafka_buf_update(rkbuf, of, &byte, sizeof(byte));
+}
+
+/**
+ * @brief Append an int16_t to the buffer.
+ *
+ * The value is converted with htobe16() before it is written.
+ *
+ * @returns the offset returned by rd_kafka_buf_write().
+ */
+static RD_INLINE size_t rd_kafka_buf_write_i16 (rd_kafka_buf_t *rkbuf,
+                                                int16_t v) {
+        const int16_t be = htobe16(v);
+
+        return rd_kafka_buf_write(rkbuf, &be, sizeof(be));
+}
+
+/**
+ * @brief Overwrite the int16_t stored at buffer offset \p of.
+ *
+ * The value is converted with htobe16() before it is written.
+ * \p of should have been previously returned by `.._buf_write_i16()`.
+ */
+static RD_INLINE void rd_kafka_buf_update_i16 (rd_kafka_buf_t *rkbuf,
+                                               size_t of, int16_t v) {
+        const int16_t be = htobe16(v);
+
+        rd_kafka_buf_update(rkbuf, of, &be, sizeof(be));
+}
+
+/**
+ * @brief Append an int32_t to the buffer.
+ *
+ * The value is converted with htobe32() before it is written.
+ *
+ * @returns the offset returned by rd_kafka_buf_write().
+ */
+static RD_INLINE size_t rd_kafka_buf_write_i32 (rd_kafka_buf_t *rkbuf,
+                                                int32_t v) {
+        const int32_t be = htobe32(v);
+
+        return rd_kafka_buf_write(rkbuf, &be, sizeof(be));
+}
+
+/**
+ * @brief Overwrite the int32_t stored at buffer offset \p of.
+ *
+ * The value is converted with htobe32() before it is written.
+ * \p of should have been previously returned by `.._buf_write_i32()`.
+ */
+static RD_INLINE void rd_kafka_buf_update_i32 (rd_kafka_buf_t *rkbuf,
+                                               size_t of, int32_t v) {
+        const int32_t be = htobe32(v);
+
+        rd_kafka_buf_update(rkbuf, of, &be, sizeof(be));
+}
+
+/**
+ * @brief Overwrite the uint32_t stored at buffer offset \p of.
+ *
+ * The value is converted with htobe32() before it is written.
+ * \p of should be the offset of a previously written 32-bit field.
+ */
+static RD_INLINE void rd_kafka_buf_update_u32 (rd_kafka_buf_t *rkbuf,
+                                               size_t of, uint32_t v) {
+        const uint32_t be = htobe32(v);
+
+        rd_kafka_buf_update(rkbuf, of, &be, sizeof(be));
+}
+
+
+/**
+ * @brief Append an int64_t to the buffer.
+ *
+ * The value is converted with htobe64() before it is written.
+ *
+ * @returns the offset returned by rd_kafka_buf_write().
+ */
+static RD_INLINE size_t rd_kafka_buf_write_i64 (rd_kafka_buf_t *rkbuf,
+                                                int64_t v) {
+        const int64_t be = htobe64(v);
+
+        return rd_kafka_buf_write(rkbuf, &be, sizeof(be));
+}
+
+/**
+ * @brief Overwrite the int64_t stored at buffer offset \p of.
+ *
+ * The value is converted with htobe64() before it is written.
+ * \p of should have been previously returned by `.._buf_write_i64()`.
+ */
+static RD_INLINE void rd_kafka_buf_update_i64 (rd_kafka_buf_t *rkbuf,
+                                               size_t of, int64_t v) {
+        const int64_t be = htobe64(v);
+
+        rd_kafka_buf_update(rkbuf, of, &be, sizeof(be));
+}
+
+
+/**
+ * @brief Copy the serialized form of a Kafka string to the buffer.
+ *
+ * RD_KAFKAP_STR_SIZE(kstr) bytes starting at RD_KAFKAP_STR_SER(kstr)
+ * are written.
+ *
+ * @returns the offset returned by rd_kafka_buf_write().
+ */
+static RD_INLINE size_t rd_kafka_buf_write_kstr (rd_kafka_buf_t *rkbuf,
+                                                 const rd_kafkap_str_t *kstr) {
+        const void *ser   = RD_KAFKAP_STR_SER(kstr);
+        const size_t size = RD_KAFKAP_STR_SIZE(kstr);
+
+        return rd_kafka_buf_write(rkbuf, ser, size);
+}
+
+/**
+ * @brief Copy a char * string to the buffer as a 16-bit length prefix
+ *        followed by the string bytes.
+ *
+ * @param rkbuf Buffer to write to.
+ * @param str   String to write. If NULL, only a length prefix of
+ *              RD_KAFKAP_STR_LEN_NULL is written.
+ * @param len   Number of bytes of \p str to write, or (size_t)-1 to
+ *              use strlen(str). The length prefix is \p len cast to
+ *              int16_t.
+ *
+ * @returns the offset of the length prefix, as returned by
+ *          rd_kafka_buf_write_i16().
+ */
+static RD_INLINE size_t rd_kafka_buf_write_str (rd_kafka_buf_t *rkbuf,
+                                                const char *str, size_t len) {
+        size_t of;
+
+        if (str == NULL) {
+                /* Null string: the length prefix is all that is written. */
+                len = RD_KAFKAP_STR_LEN_NULL;
+                return rd_kafka_buf_write_i16(rkbuf, (int16_t) len);
+        }
+
+        if (len == (size_t)-1)
+                len = strlen(str);
+
+        of = rd_kafka_buf_write_i16(rkbuf, (int16_t) len);
+        rd_kafka_buf_write(rkbuf, str, len);
+
+        return of;
+}
+
+
+/**
+ * @brief Push (i.e., no copy) the serialized form of a Kafka string to
+ *        the buffer.
+ *
+ * The region is handed to rd_kafka_buf_push() with a NULL free_cb.
+ */
+static RD_INLINE void rd_kafka_buf_push_kstr (rd_kafka_buf_t *rkbuf,
+                                              const rd_kafkap_str_t *kstr) {
+        const void *ser   = RD_KAFKAP_STR_SER(kstr);
+        const size_t size = RD_KAFKAP_STR_SIZE(kstr);
+
+        rd_kafka_buf_push(rkbuf, ser, size, NULL);
+}
+
+
+
+/**
+ * @brief Copy the serialized form of Kafka bytes to the buffer.
+ *
+ * RD_KAFKAP_BYTES_SIZE(kbytes) bytes starting at
+ * RD_KAFKAP_BYTES_SER(kbytes) are written.
+ *
+ * @returns the offset returned by rd_kafka_buf_write().
+ */
+static RD_INLINE size_t rd_kafka_buf_write_kbytes (rd_kafka_buf_t *rkbuf,
+                                                   const rd_kafkap_bytes_t *kbytes){
+        const void *ser   = RD_KAFKAP_BYTES_SER(kbytes);
+        const size_t size = RD_KAFKAP_BYTES_SIZE(kbytes);
+
+        return rd_kafka_buf_write(rkbuf, ser, size);
+}
+
+/**
+ * @brief Push (i.e., no copy) the serialized form of Kafka bytes to the
+ *        buffer.
+ *
+ * The region is handed to rd_kafka_buf_push() with a NULL free_cb.
+ */
+static RD_INLINE void rd_kafka_buf_push_kbytes (rd_kafka_buf_t *rkbuf,
+                                                const rd_kafkap_bytes_t *kbytes){
+        const void *ser   = RD_KAFKAP_BYTES_SER(kbytes);
+        const size_t size = RD_KAFKAP_BYTES_SIZE(kbytes);
+
+        rd_kafka_buf_push(rkbuf, ser, size, NULL);
+}
+
+/**
+ * @brief Copy binary data to the buffer as a 32-bit length prefix
+ *        followed by the payload bytes.
+ *
+ * @param rkbuf   Buffer to write to.
+ * @param payload Data to write. If NULL, only a length prefix of
+ *                RD_KAFKAP_BYTES_LEN_NULL is written.
+ * @param size    Number of payload bytes. The length prefix is
+ *                \p size cast to int32_t.
+ *
+ * @returns the offset of the length prefix, as returned by
+ *          rd_kafka_buf_write_i32().
+ */
+static RD_INLINE size_t rd_kafka_buf_write_bytes (rd_kafka_buf_t *rkbuf,
+                                                  const void *payload, size_t size) {
+        size_t of;
+
+        if (payload == NULL) {
+                /* Null payload: the length prefix is all that is written. */
+                size = RD_KAFKAP_BYTES_LEN_NULL;
+                return rd_kafka_buf_write_i32(rkbuf, (int32_t) size);
+        }
+
+        of = rd_kafka_buf_write_i32(rkbuf, (int32_t) size);
+        rd_kafka_buf_write(rkbuf, payload, size);
+
+        return of;
+}
+
+
+
+
+/**
+ * Write Kafka Message to buffer.
+ * The number of bytes written is returned in '*outlenp'.
+ *
+ * Only the prototype is given here; the definition lives elsewhere.
+ *
+ * Returns the buffer offset of the first byte.
+ */
+size_t rd_kafka_buf_write_Message (rd_kafka_broker_t *rkb,
+ rd_kafka_buf_t *rkbuf,
+ int64_t Offset, int8_t MagicByte,
+ int8_t Attributes, int64_t Timestamp,
+ const void *key, int32_t key_len,
+ const void *payload, int32_t len,
+ int *outlenp);
+
+/**
+ * @brief Start a CRC calculation on the buffer.
+ *
+ * Resets rkbuf_crc with rd_crc32_init() and sets RD_KAFKA_OP_F_CRC in
+ * rkbuf_flags, so that subsequent rd_kafka_buf_write() calls update
+ * the checksum.
+ *
+ * @remark Asserts that no CRC calculation is already in progress.
+ */
+static RD_INLINE RD_UNUSED void rd_kafka_buf_crc_init (rd_kafka_buf_t *rkbuf) {
+        rd_kafka_assert(NULL, !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC));
+
+        rkbuf->rkbuf_crc    = rd_crc32_init();
+        rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_CRC;
+}
+
+/**
+ * @brief Finish the CRC calculation on the buffer.
+ *
+ * Clears RD_KAFKA_OP_F_CRC from rkbuf_flags.
+ *
+ * @returns the checksum produced by rd_crc32_finalize() from rkbuf_crc.
+ */
+static RD_INLINE RD_UNUSED
+rd_crc32_t rd_kafka_buf_crc_finalize (rd_kafka_buf_t *rkbuf) {
+        const rd_crc32_t crc = rd_crc32_finalize(rkbuf->rkbuf_crc);
+
+        rkbuf->rkbuf_flags &= ~RD_KAFKA_OP_F_CRC;
+
+        return crc;
+}
+
+
+
+
+
+/**
+ * @brief Check if buffer's replyq.version is outdated.
+ *
+ * @param rkbuf: may be NULL, for convenience.
+ * @param version: version to compare the buffer's replyq version with.
+ *
+ * @returns 1 if \p rkbuf is non-NULL, its replyq version is non-zero
+ *          and that version is lower than \p version, else 0.
+ */
+static RD_UNUSED RD_INLINE int
+rd_kafka_buf_version_outdated (const rd_kafka_buf_t *rkbuf, int version) {
+        /* No buffer, or an unversioned buffer, is never outdated. */
+        if (!rkbuf || !rkbuf->rkbuf_replyq.version)
+                return 0;
+
+        return rkbuf->rkbuf_replyq.version < version;
+}