You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/06/06 14:14:59 UTC
[31/51] [partial] nifi-minifi-cpp git commit: MINIFICPP-512 - upgrade
to librdkafka 0.11.4
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_broker.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_broker.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_broker.h
deleted file mode 100644
index a30f2bd..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_broker.h
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012,2013 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#pragma once
-
-#include "rdkafka_feature.h"
-
-
-extern const char *rd_kafka_broker_state_names[];
-extern const char *rd_kafka_secproto_names[];
-
-struct rd_kafka_broker_s { /* rd_kafka_broker_t */
- TAILQ_ENTRY(rd_kafka_broker_s) rkb_link;
-
- int32_t rkb_nodeid;
-#define RD_KAFKA_NODEID_UA -1
-
- rd_sockaddr_list_t *rkb_rsal;
- time_t rkb_t_rsal_last;
- const rd_sockaddr_inx_t *rkb_addr_last; /* Last used connect address */
-
- rd_kafka_transport_t *rkb_transport;
-
- uint32_t rkb_corrid;
- int rkb_connid; /* Connection id, increased by
- * one for each connection by
- * this broker. Used as a safe-guard
- * to help troubleshooting buffer
- * problems across disconnects. */
-
- rd_kafka_q_t *rkb_ops;
-
- mtx_t rkb_lock;
-
- int rkb_blocking_max_ms; /* Maximum IO poll blocking
- * time. */
-
- /* Toppars handled by this broker */
- TAILQ_HEAD(, rd_kafka_toppar_s) rkb_toppars;
- int rkb_toppar_cnt;
-
- /* Underflowed toppars that are eligible for fetching. */
- CIRCLEQ_HEAD(, rd_kafka_toppar_s) rkb_fetch_toppars;
- int rkb_fetch_toppar_cnt;
- rd_kafka_toppar_t *rkb_fetch_toppar_next; /* Next 'first' toppar
- * in fetch list.
- * This is used for
- * round-robin. */
-
-
- rd_kafka_cgrp_t *rkb_cgrp;
-
- rd_ts_t rkb_ts_fetch_backoff;
- int rkb_fetching;
-
- enum {
- RD_KAFKA_BROKER_STATE_INIT,
- RD_KAFKA_BROKER_STATE_DOWN,
- RD_KAFKA_BROKER_STATE_CONNECT,
- RD_KAFKA_BROKER_STATE_AUTH,
-
- /* Any state >= STATE_UP means the Kafka protocol layer
- * is operational (to some degree). */
- RD_KAFKA_BROKER_STATE_UP,
- RD_KAFKA_BROKER_STATE_UPDATE,
- RD_KAFKA_BROKER_STATE_APIVERSION_QUERY,
- RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE
- } rkb_state;
-
- rd_ts_t rkb_ts_state; /* Timestamp of last
- * state change */
- rd_interval_t rkb_timeout_scan_intvl; /* Waitresp timeout scan
- * interval. */
-
- rd_atomic32_t rkb_blocking_request_cnt; /* The number of
- * in-flight blocking
- * requests.
- * A blocking request is
- * one that is known to
- * possibly block on the
- * broker for longer than
- * the typical processing
- * time, e.g.:
- * JoinGroup, SyncGroup */
-
- int rkb_features; /* Protocol features supported
- * by this broker.
- * See RD_KAFKA_FEATURE_* in
- * rdkafka_proto.h */
-
- struct rd_kafka_ApiVersion *rkb_ApiVersions; /* Broker's supported APIs
- * (MUST be sorted) */
- size_t rkb_ApiVersions_cnt;
- rd_interval_t rkb_ApiVersion_fail_intvl; /* Controls how long
- * the fallback proto
- * will be used after
- * ApiVersionRequest
- * failure. */
-
- rd_kafka_confsource_t rkb_source;
- struct {
- rd_atomic64_t tx_bytes;
- rd_atomic64_t tx; /* Kafka-messages (not payload msgs) */
- rd_atomic64_t tx_err;
- rd_atomic64_t tx_retries;
- rd_atomic64_t req_timeouts; /* Accumulated value */
-
- rd_atomic64_t rx_bytes;
- rd_atomic64_t rx; /* Kafka messages (not payload msgs) */
- rd_atomic64_t rx_err;
- rd_atomic64_t rx_corrid_err; /* CorrId misses */
- rd_atomic64_t rx_partial; /* Partial messages received
- * and dropped. */
- rd_atomic64_t zbuf_grow; /* Compression/decompression buffer grows needed */
- rd_atomic64_t buf_grow; /* rkbuf grows needed */
- rd_atomic64_t wakeups; /* Poll wakeups */
- } rkb_c;
-
- int rkb_req_timeouts; /* Current value */
-
- rd_ts_t rkb_ts_metadata_poll; /* Next metadata poll time */
- int rkb_metadata_fast_poll_cnt; /* Perform fast
- * metadata polls. */
- thrd_t rkb_thread;
-
- rd_refcnt_t rkb_refcnt;
-
- rd_kafka_t *rkb_rk;
-
- rd_kafka_buf_t *rkb_recv_buf;
-
- int rkb_max_inflight; /* Maximum number of in-flight
- * requests to broker.
- * Compared to rkb_waitresps length.*/
- rd_kafka_bufq_t rkb_outbufs;
- rd_kafka_bufq_t rkb_waitresps;
- rd_kafka_bufq_t rkb_retrybufs;
-
- rd_avg_t rkb_avg_int_latency;/* Current internal latency period*/
- rd_avg_t rkb_avg_rtt; /* Current RTT period */
- rd_avg_t rkb_avg_throttle; /* Current throttle period */
-
- /* These are all protected by rkb_lock */
- char rkb_name[RD_KAFKA_NODENAME_SIZE]; /* Displ name */
- char rkb_nodename[RD_KAFKA_NODENAME_SIZE]; /* host:port*/
- uint16_t rkb_port; /* TCP port */
- char *rkb_origname; /* Original
- * host name */
-
-
- /* Logging name is a copy of rkb_name, protected by its own mutex */
- char *rkb_logname;
- mtx_t rkb_logname_lock;
-
- int rkb_wakeup_fd[2]; /* Wake-up fds (r/w) to wake
- * up from IO-wait when
- * queues have content. */
- int rkb_toppar_wakeup_fd; /* Toppar msgq wakeup fd,
- * this is rkb_wakeup_fd[1]
- * if enabled. */
- rd_interval_t rkb_connect_intvl; /* Reconnect throttling */
-
- rd_kafka_secproto_t rkb_proto;
-
- int rkb_down_reported; /* Down event reported */
-#if WITH_SASL_CYRUS
- rd_kafka_timer_t rkb_sasl_kinit_refresh_tmr;
-#endif
-
-
- struct {
- char msg[512];
- int err; /* errno */
- } rkb_err;
-};
-
-#define rd_kafka_broker_keep(rkb) rd_refcnt_add(&(rkb)->rkb_refcnt)
-#define rd_kafka_broker_lock(rkb) mtx_lock(&(rkb)->rkb_lock)
-#define rd_kafka_broker_unlock(rkb) mtx_unlock(&(rkb)->rkb_lock)
-
-
-/**
- * @brief Broker comparator
- */
-static RD_UNUSED RD_INLINE int rd_kafka_broker_cmp (const void *_a,
- const void *_b) {
- const rd_kafka_broker_t *a = _a, *b = _b;
- return (int)(a - b);
-}
-
-
-/**
- * @returns true if broker supports \p features, else false.
- */
-static RD_UNUSED
-int rd_kafka_broker_supports (rd_kafka_broker_t *rkb, int features) {
- int r;
- rd_kafka_broker_lock(rkb);
- r = (rkb->rkb_features & features) == features;
- rd_kafka_broker_unlock(rkb);
- return r;
-}
-
-int16_t rd_kafka_broker_ApiVersion_supported (rd_kafka_broker_t *rkb,
- int16_t ApiKey,
- int16_t minver, int16_t maxver,
- int *featuresp);
-
-int rd_kafka_broker_get_state (rd_kafka_broker_t *rkb);
-
-rd_kafka_broker_t *rd_kafka_broker_find_by_nodeid (rd_kafka_t *rk,
- int32_t nodeid);
-rd_kafka_broker_t *rd_kafka_broker_find_by_nodeid0 (rd_kafka_t *rk,
- int32_t nodeid,
- int state);
-#define rd_kafka_broker_find_by_nodeid(rk,nodeid) \
- rd_kafka_broker_find_by_nodeid0(rk,nodeid,-1)
-
-/**
- * Filter out brokers that are currently in a blocking request.
- */
-static RD_INLINE RD_UNUSED int
-rd_kafka_broker_filter_non_blocking (rd_kafka_broker_t *rkb, void *opaque) {
- return rd_atomic32_get(&rkb->rkb_blocking_request_cnt) > 0;
-}
-
-/**
- * Filter out brokers that cant do GroupCoordinator requests right now.
- */
-static RD_INLINE RD_UNUSED int
-rd_kafka_broker_filter_can_group_query (rd_kafka_broker_t *rkb, void *opaque) {
- return rd_atomic32_get(&rkb->rkb_blocking_request_cnt) > 0 ||
- !(rkb->rkb_features & RD_KAFKA_FEATURE_BROKER_GROUP_COORD);
-}
-
-rd_kafka_broker_t *rd_kafka_broker_any (rd_kafka_t *rk, int state,
- int (*filter) (rd_kafka_broker_t *rkb,
- void *opaque),
- void *opaque);
-
-rd_kafka_broker_t *rd_kafka_broker_any_usable (rd_kafka_t *rk, int timeout_ms,
- int do_lock);
-
-rd_kafka_broker_t *rd_kafka_broker_prefer (rd_kafka_t *rk, int32_t broker_id, int state);
-
-int rd_kafka_brokers_add0 (rd_kafka_t *rk, const char *brokerlist);
-void rd_kafka_broker_set_state (rd_kafka_broker_t *rkb, int state);
-
-void rd_kafka_broker_fail (rd_kafka_broker_t *rkb,
- int level, rd_kafka_resp_err_t err,
- const char *fmt, ...);
-
-void rd_kafka_broker_destroy_final (rd_kafka_broker_t *rkb);
-
-#define rd_kafka_broker_destroy(rkb) \
- rd_refcnt_destroywrapper(&(rkb)->rkb_refcnt, \
- rd_kafka_broker_destroy_final(rkb))
-
-
-void rd_kafka_broker_update (rd_kafka_t *rk, rd_kafka_secproto_t proto,
- const struct rd_kafka_metadata_broker *mdb);
-rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,
- rd_kafka_confsource_t source,
- rd_kafka_secproto_t proto,
- const char *name, uint16_t port,
- int32_t nodeid);
-
-void rd_kafka_broker_connect_up (rd_kafka_broker_t *rkb);
-void rd_kafka_broker_connect_done (rd_kafka_broker_t *rkb, const char *errstr);
-
-int rd_kafka_send (rd_kafka_broker_t *rkb);
-int rd_kafka_recv (rd_kafka_broker_t *rkb);
-
-void rd_kafka_dr_msgq (rd_kafka_itopic_t *rkt,
- rd_kafka_msgq_t *rkmq, rd_kafka_resp_err_t err);
-
-void rd_kafka_broker_buf_enq1 (rd_kafka_broker_t *rkb,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque);
-
-void rd_kafka_broker_buf_enq_replyq (rd_kafka_broker_t *rkb,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque);
-
-void rd_kafka_broker_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf);
-
-
-rd_kafka_broker_t *rd_kafka_broker_internal (rd_kafka_t *rk);
-
-void msghdr_print (rd_kafka_t *rk,
- const char *what, const struct msghdr *msg,
- int hexdump);
-
-const char *rd_kafka_broker_name (rd_kafka_broker_t *rkb);
-void rd_kafka_broker_wakeup (rd_kafka_broker_t *rkb);
-
-int rd_kafka_brokers_get_state_version (rd_kafka_t *rk);
-int rd_kafka_brokers_wait_state_change (rd_kafka_t *rk, int stored_version,
- int timeout_ms);
-void rd_kafka_brokers_broadcast_state_change (rd_kafka_t *rk);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.c
deleted file mode 100644
index 9b50737..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.c
+++ /dev/null
@@ -1,428 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2015, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "rdkafka_int.h"
-#include "rdkafka_buf.h"
-#include "rdkafka_broker.h"
-
-void rd_kafka_buf_destroy_final (rd_kafka_buf_t *rkbuf) {
-
- switch (rkbuf->rkbuf_reqhdr.ApiKey)
- {
- case RD_KAFKAP_Metadata:
- if (rkbuf->rkbuf_u.Metadata.topics)
- rd_list_destroy(rkbuf->rkbuf_u.Metadata.topics);
- if (rkbuf->rkbuf_u.Metadata.reason)
- rd_free(rkbuf->rkbuf_u.Metadata.reason);
- if (rkbuf->rkbuf_u.Metadata.rko)
- rd_kafka_op_reply(rkbuf->rkbuf_u.Metadata.rko,
- RD_KAFKA_RESP_ERR__DESTROY);
- if (rkbuf->rkbuf_u.Metadata.decr) {
- /* Decrease metadata cache's full_.._sent state. */
- mtx_lock(rkbuf->rkbuf_u.Metadata.decr_lock);
- rd_kafka_assert(NULL,
- (*rkbuf->rkbuf_u.Metadata.decr) > 0);
- (*rkbuf->rkbuf_u.Metadata.decr)--;
- mtx_unlock(rkbuf->rkbuf_u.Metadata.decr_lock);
- }
- break;
- }
-
- if (rkbuf->rkbuf_response)
- rd_kafka_buf_destroy(rkbuf->rkbuf_response);
-
- rd_kafka_replyq_destroy(&rkbuf->rkbuf_replyq);
- rd_kafka_replyq_destroy(&rkbuf->rkbuf_orig_replyq);
-
- rd_buf_destroy(&rkbuf->rkbuf_buf);
-
- if (rkbuf->rkbuf_rktp_vers)
- rd_list_destroy(rkbuf->rkbuf_rktp_vers);
-
- if (rkbuf->rkbuf_rkb)
- rd_kafka_broker_destroy(rkbuf->rkbuf_rkb);
-
- rd_refcnt_destroy(&rkbuf->rkbuf_refcnt);
-
- rd_free(rkbuf);
-}
-
-
-
-/**
- * @brief Pushes \p buf of size \p len as a new segment on the buffer.
- *
- * \p buf will NOT be freed by the buffer.
- */
-void rd_kafka_buf_push0 (rd_kafka_buf_t *rkbuf, const void *buf, size_t len,
- int allow_crc_calc, void (*free_cb) (void *)) {
- rd_buf_push(&rkbuf->rkbuf_buf, buf, len, free_cb);
-
- if (allow_crc_calc && (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC))
- rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc, buf, len);
-}
-
-
-
-/**
- * @brief Create a new buffer with \p segcmt initial segments and \p size bytes
- * of initial backing memory.
- * The underlying buffer will grow as needed.
- *
- * If \p rk is non-NULL (typical case):
- * Additional space for the Kafka protocol headers is inserted automatically.
- */
-rd_kafka_buf_t *rd_kafka_buf_new0 (int segcnt, size_t size, int flags) {
- rd_kafka_buf_t *rkbuf;
-
- rkbuf = rd_calloc(1, sizeof(*rkbuf));
-
- rkbuf->rkbuf_flags = flags;
-
- rd_buf_init(&rkbuf->rkbuf_buf, segcnt, size);
- rd_kafka_msgq_init(&rkbuf->rkbuf_msgq);
- rd_refcnt_init(&rkbuf->rkbuf_refcnt, 1);
-
- return rkbuf;
-}
-
-
-/**
- * @brief Create new request buffer with the request-header written (will
- * need to be updated with Length, etc, later)
- */
-rd_kafka_buf_t *rd_kafka_buf_new_request (rd_kafka_broker_t *rkb, int16_t ApiKey,
- int segcnt, size_t size) {
- rd_kafka_buf_t *rkbuf;
-
- /* Make room for common protocol request headers */
- size += RD_KAFKAP_REQHDR_SIZE +
- RD_KAFKAP_STR_SIZE(rkb->rkb_rk->rk_client_id);
- segcnt += 1; /* headers */
-
- rkbuf = rd_kafka_buf_new0(segcnt, size, 0);
-
- rkbuf->rkbuf_rkb = rkb;
- rd_kafka_broker_keep(rkb);
-
- rkbuf->rkbuf_reqhdr.ApiKey = ApiKey;
-
- /* Write request header, will be updated later. */
- /* Length: updated later */
- rd_kafka_buf_write_i32(rkbuf, 0);
- /* ApiKey */
- rd_kafka_buf_write_i16(rkbuf, rkbuf->rkbuf_reqhdr.ApiKey);
- /* ApiVersion: updated later */
- rd_kafka_buf_write_i16(rkbuf, 0);
- /* CorrId: updated later */
- rd_kafka_buf_write_i32(rkbuf, 0);
-
- /* ClientId */
- rd_kafka_buf_write_kstr(rkbuf, rkb->rkb_rk->rk_client_id);
-
- return rkbuf;
-}
-
-
-
-
-/**
- * @brief Create new read-only rkbuf shadowing a memory region.
- *
- * @remark \p free_cb (possibly NULL) will be used to free \p ptr when
- * buffer refcount reaches 0.
- * @remark the buffer may only be read from, not written to.
- */
-rd_kafka_buf_t *rd_kafka_buf_new_shadow (const void *ptr, size_t size,
- void (*free_cb) (void *)) {
- rd_kafka_buf_t *rkbuf;
-
- rkbuf = rd_calloc(1, sizeof(*rkbuf));
-
- rkbuf->rkbuf_reqhdr.ApiKey = RD_KAFKAP_None;
-
- rd_buf_init(&rkbuf->rkbuf_buf, 1, 0);
- rd_buf_push(&rkbuf->rkbuf_buf, ptr, size, free_cb);
-
- rkbuf->rkbuf_totlen = size;
-
- /* Initialize reader slice */
- rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf);
-
- rd_kafka_msgq_init(&rkbuf->rkbuf_msgq);
-
- rd_refcnt_init(&rkbuf->rkbuf_refcnt, 1);
-
- return rkbuf;
-}
-
-
-
-void rd_kafka_bufq_enq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf) {
- TAILQ_INSERT_TAIL(&rkbufq->rkbq_bufs, rkbuf, rkbuf_link);
- (void)rd_atomic32_add(&rkbufq->rkbq_cnt, 1);
- (void)rd_atomic32_add(&rkbufq->rkbq_msg_cnt,
- rd_atomic32_get(&rkbuf->rkbuf_msgq.rkmq_msg_cnt));
-}
-
-void rd_kafka_bufq_deq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf) {
- TAILQ_REMOVE(&rkbufq->rkbq_bufs, rkbuf, rkbuf_link);
- rd_kafka_assert(NULL, rd_atomic32_get(&rkbufq->rkbq_cnt) > 0);
- (void)rd_atomic32_sub(&rkbufq->rkbq_cnt, 1);
- (void)rd_atomic32_sub(&rkbufq->rkbq_msg_cnt,
- rd_atomic32_get(&rkbuf->rkbuf_msgq.rkmq_msg_cnt));
-}
-
-void rd_kafka_bufq_init(rd_kafka_bufq_t *rkbufq) {
- TAILQ_INIT(&rkbufq->rkbq_bufs);
- rd_atomic32_init(&rkbufq->rkbq_cnt, 0);
- rd_atomic32_init(&rkbufq->rkbq_msg_cnt, 0);
-}
-
-/**
- * Concat all buffers from 'src' to tail of 'dst'
- */
-void rd_kafka_bufq_concat (rd_kafka_bufq_t *dst, rd_kafka_bufq_t *src) {
- TAILQ_CONCAT(&dst->rkbq_bufs, &src->rkbq_bufs, rkbuf_link);
- (void)rd_atomic32_add(&dst->rkbq_cnt, rd_atomic32_get(&src->rkbq_cnt));
- (void)rd_atomic32_add(&dst->rkbq_msg_cnt, rd_atomic32_get(&src->rkbq_msg_cnt));
- rd_kafka_bufq_init(src);
-}
-
-/**
- * Purge the wait-response queue.
- * NOTE: 'rkbufq' must be a temporary queue and not one of rkb_waitresps
- * or rkb_outbufs since buffers may be re-enqueued on those queues.
- * 'rkbufq' needs to be bufq_init():ed before reuse after this call.
- */
-void rd_kafka_bufq_purge (rd_kafka_broker_t *rkb,
- rd_kafka_bufq_t *rkbufq,
- rd_kafka_resp_err_t err) {
- rd_kafka_buf_t *rkbuf, *tmp;
-
- rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
-
- rd_rkb_dbg(rkb, QUEUE, "BUFQ", "Purging bufq with %i buffers",
- rd_atomic32_get(&rkbufq->rkbq_cnt));
-
- TAILQ_FOREACH_SAFE(rkbuf, &rkbufq->rkbq_bufs, rkbuf_link, tmp) {
- rd_kafka_buf_callback(rkb->rkb_rk, rkb, err, NULL, rkbuf);
- }
-}
-
-
-/**
- * @brief Update bufq for connection reset:
- *
- * - Purge connection-setup API requests from the queue.
- * - Reset any partially sent buffer's offset. (issue #756)
- *
- * Request types purged:
- * ApiVersion
- * SaslHandshake
- */
-void rd_kafka_bufq_connection_reset (rd_kafka_broker_t *rkb,
- rd_kafka_bufq_t *rkbufq) {
- rd_kafka_buf_t *rkbuf, *tmp;
-
- rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
-
- rd_rkb_dbg(rkb, QUEUE, "BUFQ",
- "Updating %d buffers on connection reset",
- rd_atomic32_get(&rkbufq->rkbq_cnt));
-
- TAILQ_FOREACH_SAFE(rkbuf, &rkbufq->rkbq_bufs, rkbuf_link, tmp) {
- switch (rkbuf->rkbuf_reqhdr.ApiKey)
- {
- case RD_KAFKAP_ApiVersion:
- case RD_KAFKAP_SaslHandshake:
- rd_kafka_bufq_deq(rkbufq, rkbuf);
- rd_kafka_buf_callback(rkb->rkb_rk, rkb,
- RD_KAFKA_RESP_ERR__DESTROY,
- NULL, rkbuf);
- break;
- default:
- /* Reset buffer send position */
- rd_slice_seek(&rkbuf->rkbuf_reader, 0);
- break;
- }
- }
-}
-
-
-void rd_kafka_bufq_dump (rd_kafka_broker_t *rkb, const char *fac,
- rd_kafka_bufq_t *rkbq) {
- rd_kafka_buf_t *rkbuf;
- int cnt = rd_kafka_bufq_cnt(rkbq);
- rd_ts_t now;
-
- if (!cnt)
- return;
-
- now = rd_clock();
-
- rd_rkb_dbg(rkb, BROKER, fac, "bufq with %d buffer(s):", cnt);
-
- TAILQ_FOREACH(rkbuf, &rkbq->rkbq_bufs, rkbuf_link) {
- rd_rkb_dbg(rkb, BROKER, fac,
- " Buffer %s (%"PRIusz" bytes, corrid %"PRId32", "
- "connid %d, retry %d in %lldms, timeout in %lldms",
- rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.ApiKey),
- rkbuf->rkbuf_totlen, rkbuf->rkbuf_corrid,
- rkbuf->rkbuf_connid, rkbuf->rkbuf_retries,
- rkbuf->rkbuf_ts_retry ?
- (now - rkbuf->rkbuf_ts_retry) / 1000LL : 0,
- rkbuf->rkbuf_ts_timeout ?
- (now - rkbuf->rkbuf_ts_timeout) / 1000LL : 0);
- }
-}
-
-
-
-
-/**
- * Retry failed request, depending on the error.
- * @remark \p rkb may be NULL
- * Returns 1 if the request was scheduled for retry, else 0.
- */
-int rd_kafka_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {
-
- if (unlikely(!rkb ||
- rkb->rkb_source == RD_KAFKA_INTERNAL ||
- rd_kafka_terminating(rkb->rkb_rk) ||
- rkbuf->rkbuf_retries + 1 >
- rkb->rkb_rk->rk_conf.max_retries))
- return 0;
-
- /* Try again */
- rkbuf->rkbuf_ts_sent = 0;
- rkbuf->rkbuf_retries++;
- rd_kafka_buf_keep(rkbuf);
- rd_kafka_broker_buf_retry(rkb, rkbuf);
- return 1;
-}
-
-
-/**
- * @brief Handle RD_KAFKA_OP_RECV_BUF.
- */
-void rd_kafka_buf_handle_op (rd_kafka_op_t *rko, rd_kafka_resp_err_t err) {
- rd_kafka_buf_t *request, *response;
-
- request = rko->rko_u.xbuf.rkbuf;
- rko->rko_u.xbuf.rkbuf = NULL;
-
- /* NULL on op_destroy() */
- if (request->rkbuf_replyq.q) {
- int32_t version = request->rkbuf_replyq.version;
- /* Current queue usage is done, but retain original replyq for
- * future retries, stealing
- * the current reference. */
- request->rkbuf_orig_replyq = request->rkbuf_replyq;
- rd_kafka_replyq_clear(&request->rkbuf_replyq);
- /* Callback might need to version check so we retain the
- * version across the clear() call which clears it. */
- request->rkbuf_replyq.version = version;
- }
-
- if (!request->rkbuf_cb) {
- rd_kafka_buf_destroy(request);
- return;
- }
-
- /* Let buf_callback() do destroy()s */
- response = request->rkbuf_response; /* May be NULL */
- request->rkbuf_response = NULL;
-
- rd_kafka_buf_callback(request->rkbuf_rkb->rkb_rk,
- request->rkbuf_rkb, err,
- response, request);
-}
-
-
-
-/**
- * Call request.rkbuf_cb(), but:
- * - if the rkbuf has a rkbuf_replyq the buffer is enqueued on that queue
- * with op type RD_KAFKA_OP_RECV_BUF.
- * - else call rkbuf_cb().
- *
- * \p response may be NULL.
- *
- * Will decrease refcount for both response and request, eventually.
- */
-void rd_kafka_buf_callback (rd_kafka_t *rk,
- rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err,
- rd_kafka_buf_t *response, rd_kafka_buf_t *request){
-
- /* Decide if the request should be retried.
- * This is always done in the originating broker thread. */
- if (unlikely(err && err != RD_KAFKA_RESP_ERR__DESTROY &&
- rd_kafka_buf_retry(rkb, request))) {
- /* refcount for retry was increased in buf_retry() so we can
- * let go of this caller's refcounts. */
- rd_kafka_buf_destroy(request);
- if (response)
- rd_kafka_buf_destroy(response);
- return;
- }
-
- if (err != RD_KAFKA_RESP_ERR__DESTROY && request->rkbuf_replyq.q) {
- rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_RECV_BUF);
-
- rd_kafka_assert(NULL, !request->rkbuf_response);
- request->rkbuf_response = response;
-
- /* Increment refcnt since rko_rkbuf will be decref:ed
- * if replyq_enq() fails and we dont want the rkbuf gone in that
- * case. */
- rd_kafka_buf_keep(request);
- rko->rko_u.xbuf.rkbuf = request;
-
- rko->rko_err = err;
-
- /* Copy original replyq for future retries, with its own
- * queue reference. */
- rd_kafka_replyq_copy(&request->rkbuf_orig_replyq,
- &request->rkbuf_replyq);
-
- rd_kafka_replyq_enq(&request->rkbuf_replyq, rko, 0);
-
- rd_kafka_buf_destroy(request); /* from keep above */
- return;
- }
-
- if (request->rkbuf_cb)
- request->rkbuf_cb(rk, rkb, err, response, request,
- request->rkbuf_opaque);
-
- rd_kafka_buf_destroy(request);
- if (response)
- rd_kafka_buf_destroy(response);
-}
-
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.h
deleted file mode 100644
index 5aa2876..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_buf.h
+++ /dev/null
@@ -1,819 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2015, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-#pragma once
-
-#include "rdkafka_int.h"
-#include "rdcrc32.h"
-#include "rdlist.h"
-#include "rdbuf.h"
-
-
-typedef struct rd_kafka_broker_s rd_kafka_broker_t;
-
-#define RD_KAFKA_HEADERS_IOV_CNT 2
-
-
-/**
- * Temporary buffer with memory aligned writes to accommodate
- * effective and platform safe struct writes.
- */
-typedef struct rd_tmpabuf_s {
- size_t size;
- size_t of;
- char *buf;
- int failed;
- int assert_on_fail;
-} rd_tmpabuf_t;
-
-/**
- * @brief Allocate new tmpabuf with \p size bytes pre-allocated.
- */
-static RD_UNUSED void
-rd_tmpabuf_new (rd_tmpabuf_t *tab, size_t size, int assert_on_fail) {
- tab->buf = rd_malloc(size);
- tab->size = size;
- tab->of = 0;
- tab->failed = 0;
- tab->assert_on_fail = assert_on_fail;
-}
-
-/**
- * @brief Free memory allocated by tmpabuf
- */
-static RD_UNUSED void
-rd_tmpabuf_destroy (rd_tmpabuf_t *tab) {
- rd_free(tab->buf);
-}
-
-/**
- * @returns 1 if a previous operation failed.
- */
-static RD_UNUSED RD_INLINE int
-rd_tmpabuf_failed (rd_tmpabuf_t *tab) {
- return tab->failed;
-}
-
-/**
- * @brief Allocate \p size bytes for writing, returning an aligned pointer
- * to the memory.
- * @returns the allocated pointer (within the tmpabuf) on success or
- * NULL if the requested number of bytes + alignment is not available
- * in the tmpabuf.
- */
-static RD_UNUSED void *
-rd_tmpabuf_alloc0 (const char *func, int line, rd_tmpabuf_t *tab, size_t size) {
- void *ptr;
-
- if (unlikely(tab->failed))
- return NULL;
-
- if (unlikely(tab->of + size > tab->size)) {
- if (tab->assert_on_fail) {
- fprintf(stderr,
- "%s: %s:%d: requested size %zd + %zd > %zd\n",
- __FUNCTION__, func, line, tab->of, size,
- tab->size);
- assert(!*"rd_tmpabuf_alloc: not enough size in buffer");
- }
- return NULL;
- }
-
- ptr = (void *)(tab->buf + tab->of);
- tab->of += RD_ROUNDUP(size, 8);
-
- return ptr;
-}
-
-#define rd_tmpabuf_alloc(tab,size) \
- rd_tmpabuf_alloc0(__FUNCTION__,__LINE__,tab,size)
-
-/**
- * @brief Write \p buf of \p size bytes to tmpabuf memory in an aligned fashion.
- *
- * @returns the allocated and written-to pointer (within the tmpabuf) on success
- * or NULL if the requested number of bytes + alignment is not available
- * in the tmpabuf.
- */
-static RD_UNUSED void *
-rd_tmpabuf_write0 (const char *func, int line,
- rd_tmpabuf_t *tab, const void *buf, size_t size) {
- void *ptr = rd_tmpabuf_alloc0(func, line, tab, size);
-
- if (ptr)
- memcpy(ptr, buf, size);
-
- return ptr;
-}
-#define rd_tmpabuf_write(tab,buf,size) \
- rd_tmpabuf_write0(__FUNCTION__, __LINE__, tab, buf, size)
-
-
-/**
- * @brief Wrapper for rd_tmpabuf_write() that takes a nul-terminated string.
- */
-static RD_UNUSED char *
-rd_tmpabuf_write_str0 (const char *func, int line,
- rd_tmpabuf_t *tab, const char *str) {
- return rd_tmpabuf_write0(func, line, tab, str, strlen(str)+1);
-}
-#define rd_tmpabuf_write_str(tab,str) \
- rd_tmpabuf_write_str0(__FUNCTION__, __LINE__, tab, str)
-
-
-
-/**
- * @name Read buffer interface
- *
- * Memory reading helper macros to be used when parsing network responses.
- *
- * Assumptions:
- * - an 'err_parse:' goto-label must be available for error bailouts,
- * the error code will be set in rkbuf->rkbuf_err
- * - local `int log_decode_errors` variable set to the logging level
- * to log parse errors (or 0 to turn off logging).
- */
-
-#define rd_kafka_buf_parse_fail(rkbuf,...) do { \
- if (log_decode_errors > 0) { \
- rd_kafka_assert(NULL, rkbuf->rkbuf_rkb); \
- rd_rkb_log(rkbuf->rkbuf_rkb, log_decode_errors, \
- "PROTOERR", \
- "Protocol parse failure " \
- "at %"PRIusz"/%"PRIusz" (%s:%i) " \
- "(incorrect broker.version.fallback?)", \
- rd_slice_offset(&rkbuf->rkbuf_reader), \
- rd_slice_size(&rkbuf->rkbuf_reader), \
- __FUNCTION__, __LINE__); \
- rd_rkb_log(rkbuf->rkbuf_rkb, log_decode_errors, \
- "PROTOERR", __VA_ARGS__); \
- } \
- (rkbuf)->rkbuf_err = RD_KAFKA_RESP_ERR__BAD_MSG; \
- goto err_parse; \
- } while (0)
-
-
-
-/**
- * Returns the number of remaining bytes available to read.
- */
-#define rd_kafka_buf_read_remain(rkbuf) \
- rd_slice_remains(&(rkbuf)->rkbuf_reader)
-
-/**
- * Checks that at least 'len' bytes remain to be read in buffer, else fails.
- */
-#define rd_kafka_buf_check_len(rkbuf,len) do { \
- size_t __len0 = (size_t)(len); \
- if (unlikely(__len0 > rd_kafka_buf_read_remain(rkbuf))) { \
- rd_kafka_buf_parse_fail( \
- rkbuf, \
- "expected %"PRIusz" bytes > %"PRIusz \
- " remaining bytes", \
- __len0, rd_kafka_buf_read_remain(rkbuf)); \
- (rkbuf)->rkbuf_err = RD_KAFKA_RESP_ERR__BAD_MSG; \
- goto err_parse; \
- } \
- } while (0)
-
-/**
- * Skip (as in read and ignore) the next 'len' bytes.
- */
-#define rd_kafka_buf_skip(rkbuf, len) do { \
- size_t __len1 = (size_t)(len); \
- if (__len1 && \
- !rd_slice_read(&(rkbuf)->rkbuf_reader, NULL, __len1)) \
- rd_kafka_buf_check_len(rkbuf, __len1); \
- } while (0)
-
-/**
- * Skip (as in read and ignore) up to fixed position \p pos.
- */
-#define rd_kafka_buf_skip_to(rkbuf, pos) do { \
- size_t __len1 = (size_t)(pos) - \
- rd_slice_offset(&(rkbuf)->rkbuf_reader); \
- if (__len1 && \
- !rd_slice_read(&(rkbuf)->rkbuf_reader, NULL, __len1)) \
- rd_kafka_buf_check_len(rkbuf, __len1); \
- } while (0)
-
-
-
-/**
- * Read 'len' bytes and copy to 'dstptr'
- */
-#define rd_kafka_buf_read(rkbuf,dstptr,len) do { \
- size_t __len2 = (size_t)(len); \
- if (!rd_slice_read(&(rkbuf)->rkbuf_reader, dstptr, __len2)) \
- rd_kafka_buf_check_len(rkbuf, __len2); \
- } while (0)
-
-
-/**
- * @brief Read \p len bytes at slice offset \p offset and copy to \p dstptr
- * without affecting the current reader position.
- */
-#define rd_kafka_buf_peek(rkbuf,offset,dstptr,len) do { \
- size_t __len2 = (size_t)(len); \
- if (!rd_slice_peek(&(rkbuf)->rkbuf_reader, offset, \
- dstptr, __len2)) \
- rd_kafka_buf_check_len(rkbuf, (offset)+(__len2)); \
- } while (0)
-
-
-/**
- * Read a 16,32,64-bit integer and store it in 'dstptr'
- */
-#define rd_kafka_buf_read_i64(rkbuf,dstptr) do { \
- int64_t _v; \
- rd_kafka_buf_read(rkbuf, &_v, sizeof(_v)); \
- *(dstptr) = be64toh(_v); \
- } while (0)
-
-#define rd_kafka_buf_peek_i64(rkbuf,of,dstptr) do { \
- int64_t _v; \
- rd_kafka_buf_peek(rkbuf, of, &_v, sizeof(_v)); \
- *(dstptr) = be64toh(_v); \
- } while (0)
-
-#define rd_kafka_buf_read_i32(rkbuf,dstptr) do { \
- int32_t _v; \
- rd_kafka_buf_read(rkbuf, &_v, sizeof(_v)); \
- *(dstptr) = be32toh(_v); \
- } while (0)
-
-/* Same as .._read_i32 but does a direct assignment.
- * dst is assumed to be a scalar, not pointer. */
-#define rd_kafka_buf_read_i32a(rkbuf, dst) do { \
- int32_t _v; \
- rd_kafka_buf_read(rkbuf, &_v, 4); \
- dst = (int32_t) be32toh(_v); \
- } while (0)
-
-#define rd_kafka_buf_read_i16(rkbuf,dstptr) do { \
- int16_t _v; \
- rd_kafka_buf_read(rkbuf, &_v, sizeof(_v)); \
- *(dstptr) = be16toh(_v); \
- } while (0)
-
-
-#define rd_kafka_buf_read_i16a(rkbuf, dst) do { \
- int16_t _v; \
- rd_kafka_buf_read(rkbuf, &_v, 2); \
- dst = (int16_t)be16toh(_v); \
- } while (0)
-
-#define rd_kafka_buf_read_i8(rkbuf, dst) rd_kafka_buf_read(rkbuf, dst, 1)
-
-#define rd_kafka_buf_peek_i8(rkbuf,of,dst) rd_kafka_buf_peek(rkbuf,of,dst,1)
-
-
-/**
- * @brief Read varint and store it in the int64_t pointed to by \p dst
- */
-#define rd_kafka_buf_read_varint(rkbuf,dst) do { \
- int64_t _v; \
- size_t _r = rd_varint_dec_slice(&(rkbuf)->rkbuf_reader, &_v); \
- if (unlikely(RD_UVARINT_UNDERFLOW(_r))) \
- rd_kafka_buf_parse_fail(rkbuf, \
- "varint parsing failed: " \
- "buffer underflow"); \
- *(dst) = _v; \
- } while (0)
-
-/* Read Kafka String representation (2+N).
- * The kstr data will be updated to point to the rkbuf. */
-#define rd_kafka_buf_read_str(rkbuf, kstr) do { \
- int _klen; \
- rd_kafka_buf_read_i16a(rkbuf, (kstr)->len); \
- _klen = RD_KAFKAP_STR_LEN(kstr); \
- if (RD_KAFKAP_STR_LEN0(_klen) == 0) \
- (kstr)->str = NULL; \
- else if (!((kstr)->str = \
- rd_slice_ensure_contig(&rkbuf->rkbuf_reader, \
- _klen))) \
- rd_kafka_buf_check_len(rkbuf, _klen); \
- } while (0)
-
-/* Read Kafka String representation (2+N) and write it to the \p tmpabuf
- * with a trailing nul byte. */
-#define rd_kafka_buf_read_str_tmpabuf(rkbuf, tmpabuf, dst) do { \
- rd_kafkap_str_t _kstr; \
- size_t _slen; \
- char *_dst; \
- rd_kafka_buf_read_str(rkbuf, &_kstr); \
- _slen = RD_KAFKAP_STR_LEN(&_kstr); \
- if (!(_dst = \
- rd_tmpabuf_write(tmpabuf, _kstr.str, _slen+1))) \
- rd_kafka_buf_parse_fail( \
- rkbuf, \
- "Not enough room in tmpabuf: " \
- "%"PRIusz"+%"PRIusz \
- " > %"PRIusz, \
- (tmpabuf)->of, _slen+1, (tmpabuf)->size); \
- _dst[_slen] = '\0'; \
- dst = (void *)_dst; \
- } while (0)
-
-/**
- * Skip a string.
- */
-#define rd_kafka_buf_skip_str(rkbuf) do { \
- int16_t _slen; \
- rd_kafka_buf_read_i16(rkbuf, &_slen); \
- rd_kafka_buf_skip(rkbuf, RD_KAFKAP_STR_LEN0(_slen)); \
- } while (0)
-
-/* Read Kafka Bytes representation (4+N).
- * The 'kbytes' will be updated to point to rkbuf data */
-#define rd_kafka_buf_read_bytes(rkbuf, kbytes) do { \
- int _klen; \
- rd_kafka_buf_read_i32a(rkbuf, _klen); \
- (kbytes)->len = _klen; \
- if (RD_KAFKAP_BYTES_IS_NULL(kbytes)) { \
- (kbytes)->data = NULL; \
- (kbytes)->len = 0; \
- } else if (RD_KAFKAP_BYTES_LEN(kbytes) == 0) \
- (kbytes)->data = ""; \
- else if (!((kbytes)->data = \
- rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, \
- _klen))) \
- rd_kafka_buf_check_len(rkbuf, _klen); \
- } while (0)
-
-
-/**
- * @brief Read \p size bytes from buffer, setting \p *ptr to the start
- * of the memory region.
- */
-#define rd_kafka_buf_read_ptr(rkbuf,ptr,size) do { \
- size_t _klen = size; \
- if (!(*(ptr) = (void *) \
- rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, _klen))) \
- rd_kafka_buf_check_len(rkbuf, _klen); \
- } while (0)
-
-
-/**
- * @brief Read varint-length-prefixed Kafka Bytes representation
- */
-#define rd_kafka_buf_read_bytes_varint(rkbuf,kbytes) do { \
- int64_t _len2; \
- size_t _r = rd_varint_dec_slice(&(rkbuf)->rkbuf_reader, \
- &_len2); \
- if (unlikely(RD_UVARINT_UNDERFLOW(_r))) \
- rd_kafka_buf_parse_fail(rkbuf, \
- "varint parsing failed: " \
- "buffer underflow"); \
- (kbytes)->len = (int32_t)_len2; \
- if (RD_KAFKAP_BYTES_IS_NULL(kbytes)) { \
- (kbytes)->data = NULL; \
- (kbytes)->len = 0; \
- } else if (RD_KAFKAP_BYTES_LEN(kbytes) == 0) \
- (kbytes)->data = ""; \
- else if (!((kbytes)->data = \
- rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, \
- _len2))) \
- rd_kafka_buf_check_len(rkbuf, _len2); \
- } while (0)
-
-
-/**
- * Response handling callback.
- *
- * NOTE: Callbacks must check for 'err == RD_KAFKA_RESP_ERR__DESTROY'
- * which indicates that some entity is terminating (rd_kafka_t, broker,
- * toppar, queue, etc) and the callback may not be called in the
- * correct thread. In this case the callback must perform just
- * the most minimal cleanup and not trigger any other operations.
- *
- * NOTE: rkb, reply and request may be NULL, depending on error situation.
- */
-typedef void (rd_kafka_resp_cb_t) (rd_kafka_t *rk,
- rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- rd_kafka_buf_t *reply,
- rd_kafka_buf_t *request,
- void *opaque);
-
-struct rd_kafka_buf_s { /* rd_kafka_buf_t */
- TAILQ_ENTRY(rd_kafka_buf_s) rkbuf_link;
-
- int32_t rkbuf_corrid;
-
- rd_ts_t rkbuf_ts_retry; /* Absolute send retry time */
-
- int rkbuf_flags; /* RD_KAFKA_OP_F */
-
- rd_buf_t rkbuf_buf; /**< Send/Recv byte buffer */
- rd_slice_t rkbuf_reader; /**< Buffer slice reader for rkbuf_buf */
-
- int rkbuf_connid; /* broker connection id (used when buffer
- * was partially sent). */
- size_t rkbuf_totlen; /* recv: total expected length,
- * send: not used */
-
- rd_crc32_t rkbuf_crc; /* Current CRC calculation */
-
- struct rd_kafkap_reqhdr rkbuf_reqhdr; /* Request header.
- * These fields are encoded
- * and written to output buffer
- * on buffer finalization. */
- struct rd_kafkap_reshdr rkbuf_reshdr; /* Response header.
- * Decoded fields are copied
- * here from the buffer
- * to provide an ease-of-use
- * interface to the header */
-
- int32_t rkbuf_expected_size; /* expected size of message */
-
- rd_kafka_replyq_t rkbuf_replyq; /* Enqueue response on replyq */
- rd_kafka_replyq_t rkbuf_orig_replyq; /* Original replyq to be used
- * for retries from inside
- * the rkbuf_cb() callback
- * since rkbuf_replyq will
- * have been reset. */
- rd_kafka_resp_cb_t *rkbuf_cb; /* Response callback */
- struct rd_kafka_buf_s *rkbuf_response; /* Response buffer */
-
- struct rd_kafka_broker_s *rkbuf_rkb;
-
- rd_refcnt_t rkbuf_refcnt;
- void *rkbuf_opaque;
-
- int rkbuf_retries; /* Retries so far. */
-#define RD_KAFKA_BUF_NO_RETRIES 1000000 /* Do not retry */
-
- int rkbuf_features; /* Required feature(s) that must be
- * supported by broker. */
-
- rd_ts_t rkbuf_ts_enq;
- rd_ts_t rkbuf_ts_sent; /* Initially: Absolute time of transmission,
- * after response: RTT. */
- rd_ts_t rkbuf_ts_timeout;
-
- int64_t rkbuf_offset; /* Used by OffsetCommit */
-
- rd_list_t *rkbuf_rktp_vers; /* Toppar + Op Version map.
- * Used by FetchRequest. */
-
- rd_kafka_msgq_t rkbuf_msgq;
-
- rd_kafka_resp_err_t rkbuf_err; /* Buffer parsing error code */
-
- union {
- struct {
- rd_list_t *topics; /* Requested topics (char *) */
- char *reason; /* Textual reason */
- rd_kafka_op_t *rko; /* Originating rko with replyq
- * (if any) */
- int all_topics; /* Full/All topics requested */
-
- int *decr; /* Decrement this integer by one
- * when request is complete:
- * typically points to metadata
- * cache's full_.._sent.
- * Will be performed with
- * decr_lock held. */
- mtx_t *decr_lock;
-
- } Metadata;
- } rkbuf_u;
-};
-
-
-typedef struct rd_kafka_bufq_s {
- TAILQ_HEAD(, rd_kafka_buf_s) rkbq_bufs;
- rd_atomic32_t rkbq_cnt;
- rd_atomic32_t rkbq_msg_cnt;
-} rd_kafka_bufq_t;
-
-#define rd_kafka_bufq_cnt(rkbq) rd_atomic32_get(&(rkbq)->rkbq_cnt)
-
-
-#define rd_kafka_buf_keep(rkbuf) rd_refcnt_add(&(rkbuf)->rkbuf_refcnt)
-#define rd_kafka_buf_destroy(rkbuf) \
- rd_refcnt_destroywrapper(&(rkbuf)->rkbuf_refcnt, \
- rd_kafka_buf_destroy_final(rkbuf))
-
-void rd_kafka_buf_destroy_final (rd_kafka_buf_t *rkbuf);
-void rd_kafka_buf_push0 (rd_kafka_buf_t *rkbuf, const void *buf, size_t len,
- int allow_crc_calc, void (*free_cb) (void *));
-#define rd_kafka_buf_push(rkbuf,buf,len,free_cb) \
- rd_kafka_buf_push0(rkbuf,buf,len,1/*allow_crc*/,free_cb)
-rd_kafka_buf_t *rd_kafka_buf_new0 (int segcnt, size_t size, int flags);
-#define rd_kafka_buf_new(segcnt,size) \
- rd_kafka_buf_new0(segcnt,size,0)
-rd_kafka_buf_t *rd_kafka_buf_new_request (rd_kafka_broker_t *rkb, int16_t ApiKey,
- int segcnt, size_t size);
-rd_kafka_buf_t *rd_kafka_buf_new_shadow (const void *ptr, size_t size,
- void (*free_cb) (void *));
-void rd_kafka_bufq_enq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf);
-void rd_kafka_bufq_deq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf);
-void rd_kafka_bufq_init(rd_kafka_bufq_t *rkbufq);
-void rd_kafka_bufq_concat (rd_kafka_bufq_t *dst, rd_kafka_bufq_t *src);
-void rd_kafka_bufq_purge (rd_kafka_broker_t *rkb,
- rd_kafka_bufq_t *rkbufq,
- rd_kafka_resp_err_t err);
-void rd_kafka_bufq_connection_reset (rd_kafka_broker_t *rkb,
- rd_kafka_bufq_t *rkbufq);
-void rd_kafka_bufq_dump (rd_kafka_broker_t *rkb, const char *fac,
- rd_kafka_bufq_t *rkbq);
-
-int rd_kafka_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf);
-
-void rd_kafka_buf_handle_op (rd_kafka_op_t *rko, rd_kafka_resp_err_t err);
-void rd_kafka_buf_callback (rd_kafka_t *rk,
- rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err,
- rd_kafka_buf_t *response, rd_kafka_buf_t *request);
-
-
-
-/**
- *
- * Write buffer interface
- *
- */
-
-/**
- * Set request API type version
- */
-static RD_UNUSED RD_INLINE void
-rd_kafka_buf_ApiVersion_set (rd_kafka_buf_t *rkbuf,
- int16_t version, int features) {
- rkbuf->rkbuf_reqhdr.ApiVersion = version;
- rkbuf->rkbuf_features = features;
-}
-
-
-/**
- * @returns the ApiVersion for a request
- */
-#define rd_kafka_buf_ApiVersion(rkbuf) ((rkbuf)->rkbuf_reqhdr.ApiVersion)
-
-
-
-/**
- * Write (copy) data to buffer at current write-buffer position.
- * There must be enough space allocated in the rkbuf.
- * Returns offset to written destination buffer.
- */
-static RD_INLINE size_t rd_kafka_buf_write (rd_kafka_buf_t *rkbuf,
- const void *data, size_t len) {
- size_t r;
-
- r = rd_buf_write(&rkbuf->rkbuf_buf, data, len);
-
- if (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC)
- rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc, data, len);
-
- return r;
-}
-
-
-
-/**
- * Write (copy) 'data' to buffer at offset 'of'.
- * There must be enough space to fit 'len'.
- * This will overwrite the buffer at given location and length.
- *
- * NOTE: rd_kafka_buf_update() MUST NOT be called when a CRC calculation
- * is in progress (between rd_kafka_buf_crc_init() & .._crc_finalize())
- */
-static RD_INLINE void rd_kafka_buf_update (rd_kafka_buf_t *rkbuf, size_t of,
- const void *data, size_t len) {
- rd_kafka_assert(NULL, !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC));
- rd_buf_write_update(&rkbuf->rkbuf_buf, of, data, len);
-}
-
-/**
- * Write int8_t to buffer.
- */
-static RD_INLINE size_t rd_kafka_buf_write_i8 (rd_kafka_buf_t *rkbuf,
- int8_t v) {
- return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
-}
-
-/**
- * Update int8_t in buffer at offset 'of'.
- * 'of' should have been previously returned by `.._buf_write_i8()`.
- */
-static RD_INLINE void rd_kafka_buf_update_i8 (rd_kafka_buf_t *rkbuf,
- size_t of, int8_t v) {
- rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
-}
-
-/**
- * Write int16_t to buffer.
- * The value will be endian-swapped before write.
- */
-static RD_INLINE size_t rd_kafka_buf_write_i16 (rd_kafka_buf_t *rkbuf,
- int16_t v) {
- v = htobe16(v);
- return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
-}
-
-/**
- * Update int16_t in buffer at offset 'of'.
- * 'of' should have been previously returned by `.._buf_write_i16()`.
- */
-static RD_INLINE void rd_kafka_buf_update_i16 (rd_kafka_buf_t *rkbuf,
- size_t of, int16_t v) {
- v = htobe16(v);
- rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
-}
-
-/**
- * Write int32_t to buffer.
- * The value will be endian-swapped before write.
- */
-static RD_INLINE size_t rd_kafka_buf_write_i32 (rd_kafka_buf_t *rkbuf,
- int32_t v) {
- v = htobe32(v);
- return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
-}
-
-/**
- * Update int32_t in buffer at offset 'of'.
- * 'of' should have been previously returned by `.._buf_write_i32()`.
- */
-static RD_INLINE void rd_kafka_buf_update_i32 (rd_kafka_buf_t *rkbuf,
- size_t of, int32_t v) {
- v = htobe32(v);
- rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
-}
-
-/**
- * Update uint32_t in buffer at offset 'of'.
- * 'of' should have been previously returned by `.._buf_write_i32()`.
- */
-static RD_INLINE void rd_kafka_buf_update_u32 (rd_kafka_buf_t *rkbuf,
- size_t of, uint32_t v) {
- v = htobe32(v);
- rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
-}
-
-
-/**
- * Write int64_t to buffer.
- * The value will be endian-swapped before write.
- */
-static RD_INLINE size_t rd_kafka_buf_write_i64 (rd_kafka_buf_t *rkbuf, int64_t v) {
- v = htobe64(v);
- return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
-}
-
-/**
- * Update int64_t in buffer at offset 'of'.
- * 'of' should have been previously returned by `.._buf_write_i64()`.
- */
-static RD_INLINE void rd_kafka_buf_update_i64 (rd_kafka_buf_t *rkbuf,
- size_t of, int64_t v) {
- v = htobe64(v);
- rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
-}
-
-
-/**
- * Write (copy) Kafka string to buffer.
- */
-static RD_INLINE size_t rd_kafka_buf_write_kstr (rd_kafka_buf_t *rkbuf,
- const rd_kafkap_str_t *kstr) {
- return rd_kafka_buf_write(rkbuf, RD_KAFKAP_STR_SER(kstr),
- RD_KAFKAP_STR_SIZE(kstr));
-}
-
-/**
- * Write (copy) char * string to buffer.
- */
-static RD_INLINE size_t rd_kafka_buf_write_str (rd_kafka_buf_t *rkbuf,
- const char *str, size_t len) {
- size_t r;
- if (!str)
- len = RD_KAFKAP_STR_LEN_NULL;
- else if (len == (size_t)-1)
- len = strlen(str);
- r = rd_kafka_buf_write_i16(rkbuf, (int16_t) len);
- if (str)
- rd_kafka_buf_write(rkbuf, str, len);
- return r;
-}
-
-
-/**
- * Push (i.e., no copy) Kafka string to buffer iovec
- */
-static RD_INLINE void rd_kafka_buf_push_kstr (rd_kafka_buf_t *rkbuf,
- const rd_kafkap_str_t *kstr) {
- rd_kafka_buf_push(rkbuf, RD_KAFKAP_STR_SER(kstr),
- RD_KAFKAP_STR_SIZE(kstr), NULL);
-}
-
-
-
-/**
- * Write (copy) Kafka bytes to buffer.
- */
-static RD_INLINE size_t rd_kafka_buf_write_kbytes (rd_kafka_buf_t *rkbuf,
- const rd_kafkap_bytes_t *kbytes){
- return rd_kafka_buf_write(rkbuf, RD_KAFKAP_BYTES_SER(kbytes),
- RD_KAFKAP_BYTES_SIZE(kbytes));
-}
-
-/**
- * Push (i.e., no copy) Kafka bytes to buffer iovec
- */
-static RD_INLINE void rd_kafka_buf_push_kbytes (rd_kafka_buf_t *rkbuf,
- const rd_kafkap_bytes_t *kbytes){
- rd_kafka_buf_push(rkbuf, RD_KAFKAP_BYTES_SER(kbytes),
- RD_KAFKAP_BYTES_SIZE(kbytes), NULL);
-}
-
-/**
- * Write (copy) binary bytes to buffer as Kafka Bytes-encapsulated data.
- */
-static RD_INLINE size_t rd_kafka_buf_write_bytes (rd_kafka_buf_t *rkbuf,
- const void *payload, size_t size) {
- size_t r;
- if (!payload)
- size = RD_KAFKAP_BYTES_LEN_NULL;
- r = rd_kafka_buf_write_i32(rkbuf, (int32_t) size);
- if (payload)
- rd_kafka_buf_write(rkbuf, payload, size);
- return r;
-}
-
-
-
-
-/**
- * Write Kafka Message to buffer
- * The number of bytes written is returned in '*outlenp'.
- *
- * Returns the buffer offset of the first byte.
- */
-size_t rd_kafka_buf_write_Message (rd_kafka_broker_t *rkb,
- rd_kafka_buf_t *rkbuf,
- int64_t Offset, int8_t MagicByte,
- int8_t Attributes, int64_t Timestamp,
- const void *key, int32_t key_len,
- const void *payload, int32_t len,
- int *outlenp);
-
-/**
- * Start calculating CRC from now and track it in rkbuf->rkbuf_crc.
- */
-static RD_INLINE RD_UNUSED void rd_kafka_buf_crc_init (rd_kafka_buf_t *rkbuf) {
- rd_kafka_assert(NULL, !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC));
- rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_CRC;
- rkbuf->rkbuf_crc = rd_crc32_init();
-}
-
-/**
- * Finalizes CRC calculation and returns the calculated checksum.
- */
-static RD_INLINE RD_UNUSED
-rd_crc32_t rd_kafka_buf_crc_finalize (rd_kafka_buf_t *rkbuf) {
- rkbuf->rkbuf_flags &= ~RD_KAFKA_OP_F_CRC;
- return rd_crc32_finalize(rkbuf->rkbuf_crc);
-}
-
-
-
-
-
-/**
- * @brief Check if buffer's replyq.version is outdated.
- * @param rkbuf: may be NULL, for convenience.
- *
- * @returns 1 if this is an outdated buffer, else 0.
- */
-static RD_UNUSED RD_INLINE int
-rd_kafka_buf_version_outdated (const rd_kafka_buf_t *rkbuf, int version) {
- return rkbuf && rkbuf->rkbuf_replyq.version &&
- rkbuf->rkbuf_replyq.version < version;
-}