You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bq...@apache.org on 2017/11/16 21:15:41 UTC
[11/42] nifi-minifi-cpp git commit: MINIFICPP-274: PutKafka Processor
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_partition.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_partition.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_partition.h
new file mode 100644
index 0000000..8721f67
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_partition.h
@@ -0,0 +1,636 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2015 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+#pragma once
+
+#include "rdkafka_topic.h"
+#include "rdkafka_cgrp.h"
+#include "rdkafka_broker.h"
+
+extern const char *rd_kafka_fetch_states[];
+
+
+/**
+ * @brief Offset statistics
+ *
+ * Per-partition offsets; rd_kafka_toppar_s embeds two of these
+ * (rktp_offsets and rktp_offsets_fin).
+ * rd_kafka_offset_stats_reset() puts the struct in its default state:
+ * fetch_offset = 0, eof_offset = hi_offset = RD_KAFKA_OFFSET_INVALID.
+ */
+struct offset_stats {
+ int64_t fetch_offset; /**< Next offset to fetch */
+ int64_t eof_offset; /**< Last offset we reported EOF for */
+ int64_t hi_offset; /**< Current broker hi offset */
+};
+
+/**
+ * @brief Put \p offs back in its default state: the next offset to fetch
+ *        is 0 and neither an EOF offset nor a broker hi offset is known.
+ */
+static RD_UNUSED void rd_kafka_offset_stats_reset (struct offset_stats *offs) {
+        offs->hi_offset    = RD_KAFKA_OFFSET_INVALID;
+        offs->eof_offset   = RD_KAFKA_OFFSET_INVALID;
+        offs->fetch_offset = 0;
+}
+
+
+
+/**
+ * Topic + Partition combination
+ */
+struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */
+ TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rklink; /* rd_kafka_t link */
+ TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rkblink; /* rd_kafka_broker_t link*/
+ CIRCLEQ_ENTRY(rd_kafka_toppar_s) rktp_fetchlink; /* rkb_fetch_toppars */
+ TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rktlink; /* rd_kafka_itopic_t link*/
+ TAILQ_ENTRY(rd_kafka_toppar_s) rktp_cgrplink;/* rd_kafka_cgrp_t link */
+ rd_kafka_itopic_t *rktp_rkt;
+ shptr_rd_kafka_itopic_t *rktp_s_rkt; /* shared pointer for rktp_rkt */
+ int32_t rktp_partition;
+ //LOCK: toppar_lock() + topic_wrlock()
+ //LOCK: .. in partition_available()
+ int32_t rktp_leader_id; /**< Current leader broker id.
+ * This is updated directly
+ * from metadata. */
+ rd_kafka_broker_t *rktp_leader; /**< Current leader broker
+ * This is updated asynchronously
+ * by issuing JOIN op to
+ * broker thread, so be careful
+ * in using this since it
+ * may lag. */
+ rd_kafka_broker_t *rktp_next_leader; /**< Next leader broker after
+ * async migration op. */
+ rd_refcnt_t rktp_refcnt;
+ mtx_t rktp_lock;
+
+ //LOCK: toppar_lock. Should move the lock inside the msgq instead
+ //LOCK: toppar_lock. toppar_insert_msg(), concat_msgq()
+ //LOCK: toppar_lock. toppar_enq_msg(), deq_msg(), insert_msgq()
+ int rktp_msgq_wakeup_fd; /* Wake-up fd */
+ rd_kafka_msgq_t rktp_msgq; /* application->rdkafka queue.
+ * protected by rktp_lock */
+ rd_kafka_msgq_t rktp_xmit_msgq; /* internal broker xmit queue */
+
+ int rktp_fetch; /* On rkb_fetch_toppars list */
+
+ /* Consumer */
+ rd_kafka_q_t *rktp_fetchq; /* Queue of fetched messages
+ * from broker.
+ * Broker thread -> App */
+ rd_kafka_q_t *rktp_ops; /* * -> Main thread */
+
+
+ /**
+ * rktp version barriers
+ *
+ * rktp_version is the application/controller side's
+ * authoritative version, it depicts the most up to date state.
+ * This is what q_filter() matches an rko_version to.
+ *
+ * rktp_op_version is the last/current received state handled
+ * by the toppar in the broker thread. It is updated to rktp_version
+ * when receiving a new op.
+ *
+ * rktp_fetch_version is the current fetcher decision version.
+ * It is used in fetch_decide() to see if the fetch decision
+ * needs to be updated by comparing to rktp_op_version.
+ *
+ * Example:
+ * App thread : Send OP_START (v1 bump): rktp_version=1
+ * Broker thread: Recv OP_START (v1): rktp_op_version=1
+ * Broker thread: fetch_decide() detects that
+ * rktp_op_version != rktp_fetch_version and
+ * sets rktp_fetch_version=1.
+ * Broker thread: next Fetch request has its tver state set to
+ * rktp_fetch_version (v1).
+ *
+ * App thread : Send OP_SEEK (v2 bump): rktp_version=2
+ * Broker thread: Recv OP_SEEK (v2): rktp_op_version=2
+ * Broker thread: Recv IO FetchResponse with tver=1,
+ * when enqueued on rktp_fetchq they're discarded
+ * due to old version (tver<rktp_version).
+ * Broker thread: fetch_decide() detects version change and
+ * sets rktp_fetch_version=2.
+ * Broker thread: next Fetch request has tver=2
+ * Broker thread: Recv IO FetchResponse with tver=2 which
+ * is same as rktp_version so message is forwarded
+ * to app.
+ */
+ rd_atomic32_t rktp_version; /* Latest op version.
+ * Authoritative (app thread)*/
+ int32_t rktp_op_version; /* Op version of curr command
+ * state from.
+ * (broker thread) */
+ int32_t rktp_fetch_version; /* Op version of curr fetch.
+ (broker thread) */
+
+ enum {
+ RD_KAFKA_TOPPAR_FETCH_NONE = 0,
+ RD_KAFKA_TOPPAR_FETCH_STOPPING,
+ RD_KAFKA_TOPPAR_FETCH_STOPPED,
+ RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY,
+ RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT,
+ RD_KAFKA_TOPPAR_FETCH_ACTIVE,
+ } rktp_fetch_state; /* Broker thread's state */
+
+#define RD_KAFKA_TOPPAR_FETCH_IS_STARTED(fetch_state) \
+ ((fetch_state) >= RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY)
+
+ int32_t rktp_fetch_msg_max_bytes; /* Max number of bytes to
+ * fetch.
+ * Locality: broker thread
+ */
+
+ rd_ts_t rktp_ts_fetch_backoff; /* Back off fetcher for
+ * this partition until this
+ * absolute timestamp
+ * expires. */
+
+ int64_t rktp_query_offset; /* Offset to query broker for*/
+ int64_t rktp_next_offset; /* Next offset to start
+ * fetching from.
+ * Locality: toppar thread */
+ int64_t rktp_last_next_offset; /* Last next_offset handled
+ * by fetch_decide().
+ * Locality: broker thread */
+ int64_t rktp_app_offset; /* Last offset delivered to
+ * application + 1 */
+ int64_t rktp_stored_offset; /* Last stored offset, but
+ * maybe not committed yet. */
+ int64_t rktp_committing_offset; /* Offset currently being
+ * committed */
+ int64_t rktp_committed_offset; /* Last committed offset */
+ rd_ts_t rktp_ts_committed_offset; /* Timestamp of last
+ * commit */
+
+ struct offset_stats rktp_offsets; /* Current offsets.
+ * Locality: broker thread*/
+ struct offset_stats rktp_offsets_fin; /* Finalized offset for stats.
+ * Updated periodically
+ * by broker thread.
+ * Locks: toppar_lock */
+
+ int64_t rktp_hi_offset; /* Current high offset.
+ * Locks: toppar_lock */
+ int64_t rktp_lo_offset; /* Current broker low offset.
+ * This is outside of the stats
+ * struct due to this field
+ * being populated by the
+ * toppar thread rather than
+ * the broker thread.
+ * Locality: toppar thread
+ * Locks: toppar_lock */
+
+ rd_ts_t rktp_ts_offset_lag;
+
+ char *rktp_offset_path; /* Path to offset file */
+ FILE *rktp_offset_fp; /* Offset file pointer */
+ rd_kafka_cgrp_t *rktp_cgrp; /* Belongs to this cgrp */
+
+ int rktp_assigned; /* Partition in cgrp assignment */
+
+ rd_kafka_replyq_t rktp_replyq; /* Current replyq+version
+ * for propagating
+ * major operations, e.g.,
+ * FETCH_STOP. */
+ //LOCK: toppar_lock(). RD_KAFKA_TOPPAR_F_DESIRED
+ //LOCK: toppar_lock(). RD_KAFKA_TOPPAR_F_UNKNOWN
+ int rktp_flags;
+#define RD_KAFKA_TOPPAR_F_DESIRED 0x1 /* This partition is desired
+ * by a consumer. */
+#define RD_KAFKA_TOPPAR_F_UNKNOWN 0x2 /* Topic is (not yet) seen on
+ * a broker. */
+#define RD_KAFKA_TOPPAR_F_OFFSET_STORE 0x4 /* Offset store is active */
+#define RD_KAFKA_TOPPAR_F_OFFSET_STORE_STOPPING 0x8 /* Offset store stopping */
+#define RD_KAFKA_TOPPAR_F_APP_PAUSE 0x10 /* App pause()d consumption */
+#define RD_KAFKA_TOPPAR_F_LIB_PAUSE 0x20 /* librdkafka paused consumption */
+#define RD_KAFKA_TOPPAR_F_REMOVE 0x40 /* partition removed from cluster */
+#define RD_KAFKA_TOPPAR_F_LEADER_ERR 0x80 /* Operation failed:
+ * leader might be missing.
+ * Typically set from
+ * ProduceResponse failure. */
+
+ shptr_rd_kafka_toppar_t *rktp_s_for_desp; /* Shared pointer for
+ * rkt_desp list */
+ shptr_rd_kafka_toppar_t *rktp_s_for_cgrp; /* Shared pointer for
+ * rkcg_toppars list */
+ shptr_rd_kafka_toppar_t *rktp_s_for_rkb; /* Shared pointer for
+ * rkb_toppars list */
+
+ /*
+ * Timers
+ */
+ rd_kafka_timer_t rktp_offset_query_tmr; /* Offset query timer */
+ rd_kafka_timer_t rktp_offset_commit_tmr; /* Offset commit timer */
+ rd_kafka_timer_t rktp_offset_sync_tmr; /* Offset file sync timer */
+ rd_kafka_timer_t rktp_consumer_lag_tmr; /* Consumer lag monitoring
+ * timer */
+
+ int rktp_wait_consumer_lag_resp; /* Waiting for consumer lag
+ * response. */
+
+ struct {
+ rd_atomic64_t tx_msgs;
+ rd_atomic64_t tx_bytes;
+ rd_atomic64_t msgs;
+ rd_atomic64_t rx_ver_drops;
+ } rktp_c;
+
+};
+
+
+/**
+ * Check if toppar is paused (consumer).
+ * Locks: toppar_lock() MUST be held.
+ */
+#define RD_KAFKA_TOPPAR_IS_PAUSED(rktp) \
+ ((rktp)->rktp_flags & (RD_KAFKA_TOPPAR_F_APP_PAUSE | \
+ RD_KAFKA_TOPPAR_F_LIB_PAUSE))
+
+
+
+
+/* Converts a shptr..toppar_t to a toppar_t */
+#define rd_kafka_toppar_s2i(s_rktp) rd_shared_ptr_obj(s_rktp)
+
+
+/**
+ * Returns a shared pointer for the toppar.
+ */
+#define rd_kafka_toppar_keep(rktp) \
+ rd_shared_ptr_get(rktp, &(rktp)->rktp_refcnt, shptr_rd_kafka_toppar_t)
+
+#define rd_kafka_toppar_keep_src(func,line,rktp) \
+ rd_shared_ptr_get_src(func, line, rktp, \
+ &(rktp)->rktp_refcnt, shptr_rd_kafka_toppar_t)
+
+
+/**
+ * Frees a shared pointer previously returned by ..toppar_keep()
+ */
+#define rd_kafka_toppar_destroy(s_rktp) \
+ rd_shared_ptr_put(s_rktp, \
+ &rd_kafka_toppar_s2i(s_rktp)->rktp_refcnt, \
+ rd_kafka_toppar_destroy_final( \
+ rd_kafka_toppar_s2i(s_rktp)))
+
+
+
+
+#define rd_kafka_toppar_lock(rktp) mtx_lock(&(rktp)->rktp_lock)
+#define rd_kafka_toppar_unlock(rktp) mtx_unlock(&(rktp)->rktp_lock)
+
+/**
+ * Format "<topic> [<partition>]" for \p rktp and return the string.
+ *
+ * The result lives in a function-local static RD_TLS buffer of 256 bytes,
+ * so it is overwritten by the next call that shares that buffer; copy it
+ * if it has to survive longer.
+ */
+static RD_UNUSED
+const char *rd_kafka_toppar_name (const rd_kafka_toppar_t *rktp) {
+        static RD_TLS char namebuf[256];
+        const rd_kafka_itopic_t *rkt = rktp->rktp_rkt;
+
+        rd_snprintf(namebuf, sizeof(namebuf), "%.*s [%"PRId32"]",
+                    RD_KAFKAP_STR_PR(rkt->rkt_topic),
+                    rktp->rktp_partition);
+
+        return namebuf;
+}
+shptr_rd_kafka_toppar_t *rd_kafka_toppar_new0 (rd_kafka_itopic_t *rkt,
+ int32_t partition,
+ const char *func, int line);
+#define rd_kafka_toppar_new(rkt,partition) \
+ rd_kafka_toppar_new0(rkt, partition, __FUNCTION__, __LINE__)
+void rd_kafka_toppar_destroy_final (rd_kafka_toppar_t *rktp);
+void rd_kafka_toppar_purge_queues (rd_kafka_toppar_t *rktp);
+void rd_kafka_toppar_set_fetch_state (rd_kafka_toppar_t *rktp,
+ int fetch_state);
+void rd_kafka_toppar_insert_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm);
+void rd_kafka_toppar_enq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm);
+void rd_kafka_toppar_deq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm);
+void rd_kafka_toppar_insert_msgq (rd_kafka_toppar_t *rktp,
+ rd_kafka_msgq_t *rkmq);
+void rd_kafka_toppar_concat_msgq (rd_kafka_toppar_t *rktp,
+ rd_kafka_msgq_t *rkmq);
+void rd_kafka_toppar_enq_error (rd_kafka_toppar_t *rktp,
+ rd_kafka_resp_err_t err);
+shptr_rd_kafka_toppar_t *rd_kafka_toppar_get0 (const char *func, int line,
+ const rd_kafka_itopic_t *rkt,
+ int32_t partition,
+ int ua_on_miss);
+#define rd_kafka_toppar_get(rkt,partition,ua_on_miss) \
+ rd_kafka_toppar_get0(__FUNCTION__,__LINE__,rkt,partition,ua_on_miss)
+shptr_rd_kafka_toppar_t *rd_kafka_toppar_get2 (rd_kafka_t *rk,
+ const char *topic,
+ int32_t partition,
+ int ua_on_miss,
+ int create_on_miss);
+shptr_rd_kafka_toppar_t *
+rd_kafka_toppar_get_avail (const rd_kafka_itopic_t *rkt,
+ int32_t partition,
+ int ua_on_miss,
+ rd_kafka_resp_err_t *errp);
+
+shptr_rd_kafka_toppar_t *rd_kafka_toppar_desired_get (rd_kafka_itopic_t *rkt,
+ int32_t partition);
+void rd_kafka_toppar_desired_add0 (rd_kafka_toppar_t *rktp);
+shptr_rd_kafka_toppar_t *rd_kafka_toppar_desired_add (rd_kafka_itopic_t *rkt,
+ int32_t partition);
+void rd_kafka_toppar_desired_link (rd_kafka_toppar_t *rktp);
+void rd_kafka_toppar_desired_unlink (rd_kafka_toppar_t *rktp);
+void rd_kafka_toppar_desired_del (rd_kafka_toppar_t *rktp);
+
+int rd_kafka_toppar_ua_move (rd_kafka_itopic_t *rkt, rd_kafka_msgq_t *rkmq);
+
+void rd_kafka_toppar_next_offset_handle (rd_kafka_toppar_t *rktp,
+ int64_t Offset);
+
+void rd_kafka_toppar_offset_commit (rd_kafka_toppar_t *rktp, int64_t offset,
+ const char *metadata);
+
+void rd_kafka_toppar_broker_delegate (rd_kafka_toppar_t *rktp,
+ rd_kafka_broker_t *rkb,
+ int for_removal);
+
+
+rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_start (rd_kafka_toppar_t *rktp,
+ int64_t offset,
+ rd_kafka_q_t *fwdq,
+ rd_kafka_replyq_t replyq);
+
+rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_stop (rd_kafka_toppar_t *rktp,
+ rd_kafka_replyq_t replyq);
+
+rd_kafka_resp_err_t rd_kafka_toppar_op_seek (rd_kafka_toppar_t *rktp,
+ int64_t offset,
+ rd_kafka_replyq_t replyq);
+
+rd_kafka_resp_err_t rd_kafka_toppar_op_pause (rd_kafka_toppar_t *rktp,
+ int pause, int flag);
+
+void rd_kafka_toppar_fetch_stopped (rd_kafka_toppar_t *rktp,
+ rd_kafka_resp_err_t err);
+
+/**
+ * Set the broker's round-robin "next toppar to fetch" pointer.
+ *
+ * The pointer is cleared when the fetch list is empty or when \p sugg_next
+ * is the list's end marker. Otherwise \p sugg_next is used when non-NULL,
+ * falling back to the first toppar on the fetch list.
+ */
+static RD_INLINE RD_UNUSED
+void rd_kafka_broker_fetch_toppar_next (rd_kafka_broker_t *rkb,
+                                        rd_kafka_toppar_t *sugg_next) {
+        if (CIRCLEQ_EMPTY(&rkb->rkb_fetch_toppars) ||
+            (void *)sugg_next == CIRCLEQ_ENDC(&rkb->rkb_fetch_toppars)) {
+                /* Nothing to fetch from, or we ran off the end. */
+                rkb->rkb_fetch_toppar_next = NULL;
+                return;
+        }
+
+        rkb->rkb_fetch_toppar_next = sugg_next ?
+                sugg_next : CIRCLEQ_FIRST(&rkb->rkb_fetch_toppars);
+}
+
+
+rd_ts_t rd_kafka_toppar_fetch_decide (rd_kafka_toppar_t *rktp,
+ rd_kafka_broker_t *rkb,
+ int force_remove);
+
+
+
+rd_ts_t rd_kafka_broker_consumer_toppar_serve (rd_kafka_broker_t *rkb,
+ rd_kafka_toppar_t *rktp);
+
+
+void rd_kafka_toppar_offset_fetch (rd_kafka_toppar_t *rktp,
+ rd_kafka_replyq_t replyq);
+
+void rd_kafka_toppar_offset_request (rd_kafka_toppar_t *rktp,
+ int64_t query_offset, int backoff_ms);
+
+
+rd_kafka_assignor_t *
+rd_kafka_assignor_find (rd_kafka_t *rk, const char *protocol);
+
+
+rd_kafka_broker_t *rd_kafka_toppar_leader (rd_kafka_toppar_t *rktp,
+ int proper_broker);
+void rd_kafka_toppar_leader_unavailable (rd_kafka_toppar_t *rktp,
+ const char *reason,
+ rd_kafka_resp_err_t err);
+
+rd_kafka_resp_err_t
+rd_kafka_toppars_pause_resume (rd_kafka_t *rk, int pause, int flag,
+ rd_kafka_topic_partition_list_t *partitions);
+
+
+rd_kafka_topic_partition_t *rd_kafka_topic_partition_new (const char *topic,
+ int32_t partition);
+rd_kafka_topic_partition_t *
+rd_kafka_topic_partition_new_from_rktp (rd_kafka_toppar_t *rktp);
+
+rd_kafka_topic_partition_t *
+rd_kafka_topic_partition_list_add0 (rd_kafka_topic_partition_list_t *rktparlist,
+ const char *topic, int32_t partition,
+ shptr_rd_kafka_toppar_t *_private);
+
+rd_kafka_topic_partition_t *
+rd_kafka_topic_partition_list_upsert (
+ rd_kafka_topic_partition_list_t *rktparlist,
+ const char *topic, int32_t partition);
+
+int rd_kafka_topic_partition_match (rd_kafka_t *rk,
+ const rd_kafka_group_member_t *rkgm,
+ const rd_kafka_topic_partition_t *rktpar,
+ const char *topic, int *matched_by_regex);
+
+
+void rd_kafka_topic_partition_list_sort_by_topic (
+ rd_kafka_topic_partition_list_t *rktparlist);
+
+void
+rd_kafka_topic_partition_list_reset_offsets (rd_kafka_topic_partition_list_t *rktparlist,
+ int64_t offset);
+
+int rd_kafka_topic_partition_list_set_offsets (
+ rd_kafka_t *rk,
+ rd_kafka_topic_partition_list_t *rktparlist,
+ int from_rktp, int64_t def_value, int is_commit);
+
+int rd_kafka_topic_partition_list_count_abs_offsets (
+ const rd_kafka_topic_partition_list_t *rktparlist);
+
+shptr_rd_kafka_toppar_t *
+rd_kafka_topic_partition_get_toppar (rd_kafka_t *rk,
+ rd_kafka_topic_partition_t *rktpar);
+
+shptr_rd_kafka_toppar_t *
+rd_kafka_topic_partition_list_get_toppar (
+ rd_kafka_t *rk, rd_kafka_topic_partition_t *rktpar);
+
+void
+rd_kafka_topic_partition_list_update_toppars (rd_kafka_t *rk,
+ rd_kafka_topic_partition_list_t
+ *rktparlist);
+
+int
+rd_kafka_topic_partition_list_get_leaders (
+ rd_kafka_t *rk,
+ rd_kafka_topic_partition_list_t *rktparlist,
+ rd_list_t *leaders, rd_list_t *query_topics);
+
+rd_kafka_resp_err_t
+rd_kafka_topic_partition_list_query_leaders (
+ rd_kafka_t *rk,
+ rd_kafka_topic_partition_list_t *rktparlist,
+ rd_list_t *leaders, int timeout_ms);
+
+int
+rd_kafka_topic_partition_list_get_topics (
+ rd_kafka_t *rk,
+ rd_kafka_topic_partition_list_t *rktparlist,
+ rd_list_t *rkts);
+
+int
+rd_kafka_topic_partition_list_get_topic_names (
+ const rd_kafka_topic_partition_list_t *rktparlist,
+ rd_list_t *topics, int include_regex);
+
+void
+rd_kafka_topic_partition_list_log (rd_kafka_t *rk, const char *fac,
+ const rd_kafka_topic_partition_list_t *rktparlist);
+
+#define RD_KAFKA_FMT_F_OFFSET 0x1 /* Print offset */
+#define RD_KAFKA_FMT_F_ONLY_ERR 0x2 /* Only include errored entries */
+#define RD_KAFKA_FMT_F_NO_ERR 0x4 /* Dont print error string */
+const char *
+rd_kafka_topic_partition_list_str (const rd_kafka_topic_partition_list_t *rktparlist,
+ char *dest, size_t dest_size,
+ int fmt_flags);
+
+void
+rd_kafka_topic_partition_list_update (rd_kafka_topic_partition_list_t *dst,
+ const rd_kafka_topic_partition_list_t *src);
+
+int rd_kafka_topic_partition_leader_cmp (const void *_a, const void *_b);
+
+rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_match (
+ const rd_kafka_topic_partition_list_t *rktparlist,
+ int (*match) (const void *elem, const void *opaque),
+ void *opaque);
+
+size_t
+rd_kafka_topic_partition_list_sum (
+ const rd_kafka_topic_partition_list_t *rktparlist,
+ size_t (*cb) (const rd_kafka_topic_partition_t *rktpar, void *opaque),
+ void *opaque);
+
+void rd_kafka_topic_partition_list_set_err (
+ rd_kafka_topic_partition_list_t *rktparlist,
+ rd_kafka_resp_err_t err);
+
+int rd_kafka_topic_partition_list_regex_cnt (
+ const rd_kafka_topic_partition_list_t *rktparlist);
+
+/**
+ * @brief Toppar + Op version tuple used for mapping Fetched partitions
+ * back to their fetch versions.
+ *
+ * Elements are ordered by rd_kafka_toppar_ver_cmp() and their toppar
+ * reference is dropped by rd_kafka_toppar_ver_destroy().
+ */
+struct rd_kafka_toppar_ver {
+ shptr_rd_kafka_toppar_t *s_rktp; /* Toppar shared pointer */
+ int32_t version; /* Op version */
+};
+
+
+/**
+ * @brief Toppar + Op version comparator.
+ *
+ * Orders two struct rd_kafka_toppar_ver elements by their topic
+ * (rkt_topic) and then by partition id. The \c version field does not
+ * take part in the ordering.
+ *
+ * @param _a First element (struct rd_kafka_toppar_ver *).
+ * @param _b Second element (struct rd_kafka_toppar_ver *).
+ *
+ * @returns a value <0, 0 or >0 if \p _a orders before, equal to or
+ *          after \p _b.
+ */
+static RD_INLINE RD_UNUSED
+int rd_kafka_toppar_ver_cmp (const void *_a, const void *_b) {
+        const struct rd_kafka_toppar_ver *a = _a, *b = _b;
+        const rd_kafka_toppar_t *rktp_a = rd_kafka_toppar_s2i(a->s_rktp);
+        const rd_kafka_toppar_t *rktp_b = rd_kafka_toppar_s2i(b->s_rktp);
+        int r;
+
+        /* Same topic object: the topic comparison can be skipped. */
+        if (rktp_a->rktp_rkt != rktp_b->rktp_rkt &&
+            (r = rd_kafkap_str_cmp(rktp_a->rktp_rkt->rkt_topic,
+                                   rktp_b->rktp_rkt->rkt_topic)))
+                return r;
+
+        /* Three-way compare rather than a subtraction: the difference of
+         * two int32_t values may overflow int, which is undefined. */
+        return (rktp_a->rktp_partition > rktp_b->rktp_partition) -
+                (rktp_a->rktp_partition < rktp_b->rktp_partition);
+}
+
+/**
+ * @brief Frees up resources for \p tver but not the \p tver itself.
+ *
+ * The only resource released is the toppar shared pointer in
+ * \c tver->s_rktp, which is handed to rd_kafka_toppar_destroy().
+ */
+static RD_INLINE RD_UNUSED
+void rd_kafka_toppar_ver_destroy (struct rd_kafka_toppar_ver *tver) {
+ rd_kafka_toppar_destroy(tver->s_rktp);
+}
+
+
+/**
+ * Decide whether \p rko carries a stale op version.
+ *
+ * An op without a version (rko_version == 0) is never outdated.
+ * Otherwise the op version is checked against \p version when that is
+ * non-zero, or else against the rktp_version of the op's toppar if the
+ * op has one.
+ *
+ * @returns 1 if rko version is outdated, else 0.
+ */
+static RD_INLINE RD_UNUSED
+int rd_kafka_op_version_outdated (rd_kafka_op_t *rko, int version) {
+        rd_kafka_toppar_t *rktp;
+
+        if (!rko->rko_version)
+                return 0;
+
+        if (version)
+                return rko->rko_version < version;
+
+        /* No explicit version given: fall back to the toppar's, if any. */
+        if (!rko->rko_rktp)
+                return 0;
+
+        rktp = rd_kafka_toppar_s2i(rko->rko_rktp);
+        return rko->rko_version < rd_atomic32_get(&rktp->rktp_version);
+}
+
+void
+rd_kafka_toppar_offset_commit_result (rd_kafka_toppar_t *rktp,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *offsets);
+
+void rd_kafka_toppar_broker_leave_for_remove (rd_kafka_toppar_t *rktp);
+
+
+/**
+ * @brief Represents a leader and the partitions it is leader for.
+ *
+ * Created by rd_kafka_partition_leader_new(), which takes a reference on
+ * the broker and allocates an empty partition list; both are released
+ * again by rd_kafka_partition_leader_destroy().
+ */
+struct rd_kafka_partition_leader {
+ rd_kafka_broker_t *rkb; /* Leader broker (reference held) */
+ rd_kafka_topic_partition_list_t *partitions; /* Partitions led by rkb */
+};
+
+/**
+ * Release everything owned by \p leader (its broker reference and its
+ * partition list) and then free \p leader itself.
+ */
+static RD_UNUSED void
+rd_kafka_partition_leader_destroy (struct rd_kafka_partition_leader *leader) {
+        rd_kafka_broker_t *rkb = leader->rkb;
+        rd_kafka_topic_partition_list_t *parts = leader->partitions;
+
+        rd_kafka_broker_destroy(rkb);
+        rd_kafka_topic_partition_list_destroy(parts);
+        rd_free(leader);
+}
+
+/**
+ * Allocate a leader entry for \p rkb with an empty partition list.
+ *
+ * A reference is taken on \p rkb; free the result with
+ * rd_kafka_partition_leader_destroy().
+ */
+static RD_UNUSED struct rd_kafka_partition_leader *
+rd_kafka_partition_leader_new (rd_kafka_broker_t *rkb) {
+        struct rd_kafka_partition_leader *leader;
+
+        leader = rd_malloc(sizeof(*leader));
+
+        rd_kafka_broker_keep(rkb);
+        leader->rkb        = rkb;
+        leader->partitions = rd_kafka_topic_partition_list_new(0);
+
+        return leader;
+}
+
+/**
+ * Comparator for struct rd_kafka_partition_leader elements: the result is
+ * that of rd_kafka_broker_cmp() on the two leader brokers.
+ */
+static RD_UNUSED
+int rd_kafka_partition_leader_cmp (const void *_a, const void *_b) {
+        const struct rd_kafka_partition_leader *lhs = _a;
+        const struct rd_kafka_partition_leader *rhs = _b;
+
+        return rd_kafka_broker_cmp(lhs->rkb, rhs->rkb);
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_pattern.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_pattern.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_pattern.c
new file mode 100644
index 0000000..fc2d711
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_pattern.c
@@ -0,0 +1,224 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2015 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rdkafka_int.h"
+#include "rdkafka_pattern.h"
+
+/**
+ * Unlink \p rkpat from \p plist and free it, together with its compiled
+ * regex and its copy of the original pattern string.
+ */
+void rd_kafka_pattern_destroy (rd_kafka_pattern_list_t *plist,
+                               rd_kafka_pattern_t *rkpat) {
+        /* Take it off the list before tearing it down. */
+        TAILQ_REMOVE(&plist->rkpl_head, rkpat, rkpat_link);
+
+        rd_free(rkpat->rkpat_orig);
+        rd_regex_destroy(rkpat->rkpat_re);
+
+        rd_free(rkpat);
+}
+
+void rd_kafka_pattern_add (rd_kafka_pattern_list_t *plist,
+ rd_kafka_pattern_t *rkpat) {
+ TAILQ_INSERT_TAIL(&plist->rkpl_head, rkpat, rkpat_link); /* Append rkpat at the tail of plist's pattern queue */
+}
+
+/**
+ * Allocate a pattern object for the regex \p pattern.
+ *
+ * The regex is compiled up front with rd_regex_comp(), which is given
+ * \p errstr / \p errstr_size for its error report. If compilation fails
+ * nothing is allocated and NULL is returned. On success the object keeps
+ * both the compiled regex and a copy of \p pattern.
+ */
+rd_kafka_pattern_t *rd_kafka_pattern_new (const char *pattern,
+                                          char *errstr, int errstr_size) {
+        rd_kafka_pattern_t *rkpat = rd_calloc(1, sizeof(*rkpat));
+
+        /* Verify and precompile pattern */
+        rkpat->rkpat_re = rd_regex_comp(pattern, errstr, errstr_size);
+        if (!rkpat->rkpat_re) {
+                rd_free(rkpat);
+                return NULL;
+        }
+
+        rkpat->rkpat_orig = rd_strdup(pattern);
+        return rkpat;
+}
+
+
+
+/**
+ * Check \p str against the patterns in \p plist, in list order.
+ *
+ * Returns 1 as soon as rd_regex_exec() reports a non-zero result for one
+ * of the patterns, else 0 (also for an empty list).
+ */
+int rd_kafka_pattern_match (rd_kafka_pattern_list_t *plist, const char *str) {
+        rd_kafka_pattern_t *cur;
+        int matched = 0;
+
+        TAILQ_FOREACH(cur, &plist->rkpl_head, rkpat_link) {
+                if (!rd_regex_exec(cur->rkpat_re, str))
+                        continue;
+
+                matched = 1;
+                break;
+        }
+
+        return matched;
+}
+
+
+/**
+ * Compile \p pattern and append it to the tail of \p plist.
+ *
+ * Returns 0 on success, or -1 if the pattern could not be created, in
+ * which case the list is left untouched and \p errstr is whatever
+ * rd_kafka_pattern_new() put there.
+ */
+int rd_kafka_pattern_list_append (rd_kafka_pattern_list_t *plist,
+                                  const char *pattern,
+                                  char *errstr, int errstr_size) {
+        rd_kafka_pattern_t *created =
+                rd_kafka_pattern_new(pattern, errstr, errstr_size);
+
+        if (created) {
+                rd_kafka_pattern_add(plist, created);
+                return 0;
+        }
+
+        return -1;
+}
+
+/**
+ * Destroy every pattern in \p plist whose original pattern string is
+ * exactly equal (strcmp) to \p pattern.
+ *
+ * Returns the number of removed patterns.
+ */
+int rd_kafka_pattern_list_remove (rd_kafka_pattern_list_t *plist,
+                                  const char *pattern) {
+        rd_kafka_pattern_t *cur, *next;
+        int removed = 0;
+
+        /* _SAFE iteration: the current element is freed inside the loop. */
+        TAILQ_FOREACH_SAFE(cur, &plist->rkpl_head, rkpat_link, next) {
+                if (strcmp(cur->rkpat_orig, pattern) != 0)
+                        continue;
+
+                rd_kafka_pattern_destroy(plist, cur);
+                removed++;
+        }
+
+        return removed;
+}
+
+/**
+ * Parse a patternlist and populate a list with it.
+ *
+ * 'patternlist' is split on ',' and each piece is appended to 'plist' with
+ * rd_kafka_pattern_list_append(). The splitting is done in place on a copy
+ * made with rd_strdupa(), so 'patternlist' itself is not modified.
+ *
+ * Returns 0 on success. If a piece cannot be appended, a message naming
+ * the piece and the regex error is written to 'errstr', the whole list is
+ * cleared with rd_kafka_pattern_list_clear() and -1 is returned.
+ *
+ * NOTE(review): the escape branch only fires when the ',' found by
+ * strchr() is preceded by another ',' that lies after 's'. Since strchr()
+ * returns the FIRST ',' at or after its start position, the first comma of
+ * a ",," pair is normally handled by the else-branch as a separator, so
+ * "a,,b" looks like it yields "a", "" and "b" rather than "a,b" -
+ * confirm the intended escape syntax before relying on it.
+ */
+static int rd_kafka_pattern_list_parse (rd_kafka_pattern_list_t *plist,
+ const char *patternlist,
+ char *errstr, size_t errstr_size) {
+ char *s;
+ rd_strdupa(&s, patternlist); /* writable working copy */
+
+ while (s && *s) {
+ char *t = s;
+ char re_errstr[256];
+
+ /* Find separator */
+ while ((t = strchr(t, ','))) {
+ if (t > s && *(t-1) == ',') {
+ /* separator was escaped,
+ remove escape and scan again. */
+ memmove(t-1, t, strlen(t)+1);
+ t++;
+ } else {
+ *t = '\0'; /* terminate the current piece at the separator */
+ t++;
+ break;
+ }
+ }
+
+ if (rd_kafka_pattern_list_append(plist, s, re_errstr,
+ sizeof(re_errstr)) == -1) {
+ rd_snprintf(errstr, errstr_size,
+ "Failed to parse pattern \"%s\": "
+ "%s", s, re_errstr);
+ rd_kafka_pattern_list_clear(plist);
+ return -1;
+ }
+
+ s = t; /* t is NULL when no further ',' was found, which ends the loop */
+ }
+
+ return 0;
+}
+
+
+/**
+ * Empty a pattern list: destroy every pattern on it and drop the saved
+ * original list string (rkpl_orig), leaving it NULL.
+ * The list object itself is not freed.
+ */
+void rd_kafka_pattern_list_clear (rd_kafka_pattern_list_t *plist) {
+        rd_kafka_pattern_t *first;
+
+        /* Each destroy unlinks the head, so keep taking the new head. */
+        for (first = TAILQ_FIRST(&plist->rkpl_head) ; first ;
+             first = TAILQ_FIRST(&plist->rkpl_head))
+                rd_kafka_pattern_destroy(plist, first);
+
+        if (!plist->rkpl_orig)
+                return;
+
+        rd_free(plist->rkpl_orig);
+        plist->rkpl_orig = NULL;
+}
+
+
+/**
+ * Free a pattern list previously created with list_new()
+ *
+ * The list is first emptied with rd_kafka_pattern_list_clear() and then
+ * 'plist' itself is freed, so it must not be used afterwards.
+ */
+void rd_kafka_pattern_list_destroy (rd_kafka_pattern_list_t *plist) {
+ rd_kafka_pattern_list_clear(plist);
+ rd_free(plist);
+}
+
+/**
+ * Initialize a pattern list, optionally populating it with the
+ * comma-separated patterns in 'patternlist'.
+ *
+ * 'plist' does not need to be zeroed by the caller: both the queue head
+ * and rkpl_orig are set here before anything reads them.
+ *
+ * When 'patternlist' is non-NULL it is parsed into the list and a copy of
+ * the string is kept in rkpl_orig; otherwise the list starts out empty
+ * with rkpl_orig set to NULL.
+ *
+ * Returns 0 on success or -1 if parsing failed, in which case the list is
+ * left empty and 'errstr' holds the parser's error message.
+ */
+int rd_kafka_pattern_list_init (rd_kafka_pattern_list_t *plist,
+                                const char *patternlist,
+                                char *errstr, size_t errstr_size) {
+        TAILQ_INIT(&plist->rkpl_head);
+
+        /* Must be set before parsing: on failure the parser calls
+         * rd_kafka_pattern_list_clear(), which frees a non-NULL
+         * rkpl_orig. Leaving it uninitialized would free garbage. */
+        plist->rkpl_orig = NULL;
+
+        if (!patternlist)
+                return 0;
+
+        if (rd_kafka_pattern_list_parse(plist, patternlist,
+                                        errstr, errstr_size) == -1)
+                return -1;
+
+        plist->rkpl_orig = rd_strdup(patternlist);
+
+        return 0;
+}
+
+
+/**
+ * Allocate a zeroed pattern list and initialize it from 'patternlist'
+ * via rd_kafka_pattern_list_init().
+ *
+ * Returns the new list, or NULL if initialization failed (the allocation
+ * is freed again in that case).
+ */
+rd_kafka_pattern_list_t *rd_kafka_pattern_list_new (const char *patternlist,
+                                                    char *errstr,
+                                                    int errstr_size) {
+        rd_kafka_pattern_list_t *plist = rd_calloc(1, sizeof(*plist));
+
+        if (rd_kafka_pattern_list_init(plist, patternlist,
+                                       errstr, errstr_size) != -1)
+                return plist;
+
+        rd_free(plist);
+        return NULL;
+}
+
+
+/**
+ * Make a copy of a pattern list.
+ *
+ * The copy is built by handing src->rkpl_orig to
+ * rd_kafka_pattern_list_new(), i.e. the saved list string is parsed
+ * again. NULL is returned if that fails; the error text goes into a
+ * small local buffer and is discarded.
+ */
+rd_kafka_pattern_list_t *
+rd_kafka_pattern_list_copy (rd_kafka_pattern_list_t *src) {
+        char discarded_err[16];
+        const char *orig = src->rkpl_orig;
+
+        return rd_kafka_pattern_list_new(orig, discarded_err,
+                                         sizeof(discarded_err));
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_pattern.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_pattern.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_pattern.h
new file mode 100644
index 0000000..6e6f976
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_pattern.h
@@ -0,0 +1,65 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2015 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+#pragma once
+
+#include "rdregex.h"
+
+typedef struct rd_kafka_pattern_s {
+ TAILQ_ENTRY(rd_kafka_pattern_s) rkpat_link; /* Link on rd_kafka_pattern_list_t.rkpl_head */
+
+ rd_regex_t *rkpat_re; /* Compiled regex */
+ char *rkpat_orig; /* Original pattern */
+} rd_kafka_pattern_t;
+
+typedef struct rd_kafka_pattern_list_s {
+ TAILQ_HEAD(,rd_kafka_pattern_s) rkpl_head; /* Patterns, linked through rkpat_link */
+ char *rkpl_orig; /* Copy of the pattern list string given to list_init(), or NULL */
+} rd_kafka_pattern_list_t;
+
+void rd_kafka_pattern_destroy (rd_kafka_pattern_list_t *plist,
+ rd_kafka_pattern_t *rkpat);
+void rd_kafka_pattern_add (rd_kafka_pattern_list_t *plist,
+ rd_kafka_pattern_t *rkpat);
+rd_kafka_pattern_t *rd_kafka_pattern_new (const char *pattern,
+ char *errstr, int errstr_size);
+int rd_kafka_pattern_match (rd_kafka_pattern_list_t *plist, const char *str);
+int rd_kafka_pattern_list_append (rd_kafka_pattern_list_t *plist,
+ const char *pattern,
+ char *errstr, int errstr_size);
+int rd_kafka_pattern_list_remove (rd_kafka_pattern_list_t *plist,
+ const char *pattern);
+void rd_kafka_pattern_list_clear (rd_kafka_pattern_list_t *plist);
+void rd_kafka_pattern_list_destroy (rd_kafka_pattern_list_t *plist);
+int rd_kafka_pattern_list_init (rd_kafka_pattern_list_t *plist,
+ const char *patternlist,
+ char *errstr, size_t errstr_size);
+rd_kafka_pattern_list_t *rd_kafka_pattern_list_new (const char *patternlist,
+ char *errstr,
+ int errstr_size);
+rd_kafka_pattern_list_t *
+rd_kafka_pattern_list_copy (rd_kafka_pattern_list_t *src);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_plugin.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_plugin.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_plugin.c
new file mode 100644
index 0000000..b899899
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_plugin.c
@@ -0,0 +1,209 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2017 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rdkafka_int.h"
+#include "rdkafka_plugin.h"
+#include "rddl.h"
+
+
+typedef struct rd_kafka_plugin_s {
+ char *rkplug_path; /* Library path (rd_strdup()ed copy, freed by rd_kafka_plugin_destroy()) */
+ rd_kafka_t *rkplug_rk; /* Backpointer to the rk handle; not assigned by rd_kafka_plugin_new() */
+ void *rkplug_handle; /* dlopen (or similar) handle */
+ void *rkplug_opaque; /* Plugin's opaque, as set by its conf_init() */
+
+} rd_kafka_plugin_t;
+
+
+/**
+ * @brief Plugin comparator: orders two plugins by their library path,
+ *        with strcmp() semantics.
+ */
+static int rd_kafka_plugin_cmp (const void *_a, const void *_b) {
+        const rd_kafka_plugin_t *lhs = _a;
+        const rd_kafka_plugin_t *rhs = _b;
+
+        return strcmp(lhs->rkplug_path, rhs->rkplug_path);
+}
+
+
+/**
+ * @brief Load the plugin library at \p path, call its conf_init()
+ *        constructor and, on success, append the plugin to conf->plugins.
+ *
+ * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or if the plugin is
+ *          already in the list, RD_KAFKA_RESP_ERR__FS if the library could
+ *          not be opened, RD_KAFKA_RESP_ERR__INVALID_ARG if it has no
+ *          conf_init symbol, or the error returned by conf_init().
+ *
+ * @remark duplicate plugins are ignored: nothing is loaded and \p errstr
+ *         is set to a notice even though no error is returned.
+ *
+ * @remark Libraries are refcounted and thus not unloaded until all
+ *         plugins referencing the library have been destroyed.
+ *         (dlopen() and LoadLibrary() do this for us)
+ */
+static rd_kafka_resp_err_t
+rd_kafka_plugin_new (rd_kafka_conf_t *conf, const char *path,
+                     char *errstr, size_t errstr_size) {
+        /* Lookup key: only the path is compared by rd_kafka_plugin_cmp() */
+        const rd_kafka_plugin_t skel = { .rkplug_path = (char *)path };
+        rd_kafka_plugin_f_conf_init_t *conf_init;
+        rd_kafka_plugin_t *rkplug;
+        rd_kafka_resp_err_t err;
+        void *plug_opaque = NULL;
+        void *handle;
+
+        /* Avoid duplicates */
+        if (rd_list_find(&conf->plugins, &skel, rd_kafka_plugin_cmp)) {
+                rd_snprintf(errstr, errstr_size,
+                            "Ignoring duplicate plugin %s", path);
+                return RD_KAFKA_RESP_ERR_NO_ERROR;
+        }
+
+        rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD",
+                      "Loading plugin \"%s\"", path);
+
+        /* Attempt to load library */
+        handle = rd_dl_open(path, errstr, errstr_size);
+        if (!handle) {
+                rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD",
+                              "Failed to load plugin \"%s\": %s",
+                              path, errstr);
+                return RD_KAFKA_RESP_ERR__FS;
+        }
+
+        /* Find conf_init() function */
+        conf_init = rd_dl_sym(handle, "conf_init", errstr, errstr_size);
+        if (!conf_init) {
+                err = RD_KAFKA_RESP_ERR__INVALID_ARG;
+                goto unload;
+        }
+
+        /* Call conf_init() */
+        rd_kafka_dbg0(conf, PLUGIN, "PLUGINIT",
+                      "Calling plugin \"%s\" conf_init()", path);
+
+        err = conf_init(conf, &plug_opaque, errstr, errstr_size);
+        if (err)
+                goto unload;
+
+        /* The plugin object owns the library handle from here on */
+        rkplug = rd_calloc(1, sizeof(*rkplug));
+        rkplug->rkplug_path   = rd_strdup(path);
+        rkplug->rkplug_handle = handle;
+        rkplug->rkplug_opaque = plug_opaque;
+
+        rd_list_add(&conf->plugins, rkplug);
+
+        rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD",
+                      "Plugin \"%s\" loaded", path);
+
+        return RD_KAFKA_RESP_ERR_NO_ERROR;
+
+unload:
+        /* Library was opened but the plugin could not be set up */
+        rd_dl_close(handle);
+        return err;
+}
+
+
+/**
+ * @brief Close the plugin's library handle and free the plugin object
+ *        along with its path string. Any conf_destroy() interceptors will
+ *        have been called prior to this call.
+ * @remark plugin is not removed from any list (caller's responsibility)
+ * @remark this relies on the actual library loader to refcount libraries,
+ *         especially in the config copy case.
+ *         This is true for POSIX dlopen() and Win32 LoadLibrary().
+ * @locality application thread
+ */
+static void rd_kafka_plugin_destroy (rd_kafka_plugin_t *rkplug) {
+        void *handle = rkplug->rkplug_handle;
+        char *path   = rkplug->rkplug_path;
+
+        rd_dl_close(handle);
+        rd_free(path);
+        rd_free(rkplug);
+}
+
+
+
+/**
+ * @brief Replace the conf object's plugin list with the plugins listed in
+ *        \p paths, a semicolon-separated list of library paths.
+ *
+ * @remark Any previously loaded plugins will be unloaded.
+ * @remark A NULL or empty \p paths only clears the plugin list.
+ *
+ * @returns RD_KAFKA_CONF_OK on success, or RD_KAFKA_CONF_INVALID as soon as
+ *          one plugin fails to load, in which case \p errstr holds the
+ *          failure reason (with the plugin path appended if it fits) and
+ *          the plugin list is left empty.
+ * @locality application thread calling rd_kafka_new().
+ */
+static rd_kafka_conf_res_t
+rd_kafka_plugins_conf_set0 (rd_kafka_conf_t *conf, const char *paths,
+                            char *errstr, size_t errstr_size) {
+        char *s;
+
+        rd_list_destroy(&conf->plugins);
+        rd_list_init(&conf->plugins, 0, (void *)&rd_kafka_plugin_destroy);
+
+        if (!paths || !*paths)
+                return RD_KAFKA_CONF_OK;
+
+        /* Split paths by ; (the copy is modified in place below) */
+        rd_strdupa(&s, paths);
+
+        rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD",
+                      "Loading plugins from conf object %p: \"%s\"",
+                      conf, paths);
+
+        while (s && *s) {
+                char *path = s;
+                char *t;
+                rd_kafka_resp_err_t err;
+
+                /* Terminate the current path at the next ';' and advance
+                 * to the remainder, if any. */
+                if ((t = strchr(s, ';'))) {
+                        *t = '\0';
+                        s = t+1;
+                } else {
+                        s = NULL;
+                }
+
+                if ((err = rd_kafka_plugin_new(conf, path,
+                                               errstr, errstr_size))) {
+                        /* Failed to load plugin */
+                        size_t elen = errstr_size > 0 ? strlen(errstr) : 0;
+
+                        /* Append the plugin path to the error message only
+                         * if the whole " (plugin <path>)" suffix fits,
+                         * including its leading space, with room left for
+                         * the terminating nul. */
+                        if (elen + strlen(" (plugin )") + strlen(path) <
+                            errstr_size)
+                                rd_snprintf(errstr+elen, errstr_size-elen,
+                                            " (plugin %s)", path);
+
+                        /* Drop whatever was loaded so far and leave the
+                         * list in the same initialized-but-empty state as
+                         * the no-paths case above, so that a later
+                         * destroy or re-set starts from a known state. */
+                        rd_list_destroy(&conf->plugins);
+                        rd_list_init(&conf->plugins, 0,
+                                     (void *)&rd_kafka_plugin_destroy);
+                        return RD_KAFKA_CONF_INVALID;
+                }
+        }
+
+        return RD_KAFKA_CONF_OK;
+}
+
+
+/**
+ * @brief Conf setter for "plugin.library.paths"
+ *
+ * Only valid for the global scope. A delete request (_RK_CONF_PROP_SET_DEL)
+ * is forwarded as a NULL path list; otherwise \p value is the path list.
+ */
+rd_kafka_conf_res_t rd_kafka_plugins_conf_set (
+        int scope, void *pconf, const char *name, const char *value,
+        void *dstptr, rd_kafka_conf_set_mode_t set_mode,
+        char *errstr, size_t errstr_size) {
+        rd_kafka_conf_t *conf = (rd_kafka_conf_t *)pconf;
+        const char *paths = value;
+
+        assert(scope == _RK_GLOBAL);
+
+        if (set_mode == _RK_CONF_PROP_SET_DEL)
+                paths = NULL;
+
+        return rd_kafka_plugins_conf_set0(conf, paths, errstr, errstr_size);
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_plugin.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_plugin.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_plugin.h
new file mode 100644
index 0000000..b588a7d
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_plugin.h
@@ -0,0 +1,37 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2017 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RDKAFKA_PLUGIN_H
+#define _RDKAFKA_PLUGIN_H
+
+rd_kafka_conf_res_t rd_kafka_plugins_conf_set ( /* Conf setter for "plugin.library.paths" */
+ int scope, void *conf, const char *name, const char *value,
+ void *dstptr, rd_kafka_conf_set_mode_t set_mode,
+ char *errstr, size_t errstr_size);
+
+#endif /* _RDKAFKA_PLUGIN_H */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_proto.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_proto.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_proto.h
new file mode 100644
index 0000000..d778c4d
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_proto.h
@@ -0,0 +1,498 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012,2013 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#pragma once
+
+
+#include "rdendian.h"
+#include "rdvarint.h"
+
+
+
+/*
+ * Kafka protocol definitions.
+ */
+
+#define RD_KAFKA_PORT 9092
+#define RD_KAFKA_PORT_STR "9092"
+
+
+/**
+ * Request header.
+ *
+ * The known ApiKey (request type) values are defined inline next to the
+ * ApiKey field. Values 6 and 7 have no define here, and RD_KAFKAP__NUM
+ * is one past the highest defined ApiKey.
+ */
+struct rd_kafkap_reqhdr {
+ int32_t Size;
+ int16_t ApiKey;
+#define RD_KAFKAP_None -1
+#define RD_KAFKAP_Produce 0
+#define RD_KAFKAP_Fetch 1
+#define RD_KAFKAP_Offset 2
+#define RD_KAFKAP_Metadata 3
+#define RD_KAFKAP_LeaderAndIsr 4
+#define RD_KAFKAP_StopReplica 5
+#define RD_KAFKAP_OffsetCommit 8
+#define RD_KAFKAP_OffsetFetch 9
+#define RD_KAFKAP_GroupCoordinator 10
+#define RD_KAFKAP_JoinGroup 11
+#define RD_KAFKAP_Heartbeat 12
+#define RD_KAFKAP_LeaveGroup 13
+#define RD_KAFKAP_SyncGroup 14
+#define RD_KAFKAP_DescribeGroups 15
+#define RD_KAFKAP_ListGroups 16
+#define RD_KAFKAP_SaslHandshake 17
+#define RD_KAFKAP_ApiVersion 18
+#define RD_KAFKAP_CreateTopics 19
+#define RD_KAFKAP_DeleteTopics 20
+#define RD_KAFKAP_DeleteRecords 21
+#define RD_KAFKAP_InitProducerId 22
+#define RD_KAFKAP_OffsetForLeaderEpoch 23
+#define RD_KAFKAP_AddPartitionsToTxn 24
+#define RD_KAFKAP_AddOffsetsToTxn 25
+#define RD_KAFKAP_EndTxn 26
+#define RD_KAFKAP_WriteTxnMarkers 27
+#define RD_KAFKAP_TxnOffsetCommit 28
+#define RD_KAFKAP_DescribeAcls 29
+#define RD_KAFKAP_CreateAcls 30
+#define RD_KAFKAP_DeleteAcls 31
+#define RD_KAFKAP_DescribeConfigs 32
+#define RD_KAFKAP_AlterConfigs 33
+#define RD_KAFKAP__NUM 34
+ int16_t ApiVersion;
+ int32_t CorrId;
+ /* ClientId follows */
+};
+
+#define RD_KAFKAP_REQHDR_SIZE (4+2+2+4)
+#define RD_KAFKAP_RESHDR_SIZE (4+4)
+
+/**
+ * Response header.
+ * Its two int32_t fields add up to RD_KAFKAP_RESHDR_SIZE (4+4).
+ */
+struct rd_kafkap_reshdr {
+ int32_t Size;
+ int32_t CorrId;
+};
+
+
+
+/**
+ * @brief Map a Kafka protocol ApiKey to its name.
+ *
+ * @returns a static string for ApiKeys that have a name in the table,
+ *          otherwise "Unknown-<ApiKey>?" formatted into the function's
+ *          static RD_TLS buffer (overwritten by the next such call).
+ *          Never returns NULL.
+ */
+static RD_UNUSED
+const char *rd_kafka_ApiKey2str (int16_t ApiKey) {
+        static const char *names[] = {
+                [RD_KAFKAP_Produce] = "Produce",
+                [RD_KAFKAP_Fetch] = "Fetch",
+                [RD_KAFKAP_Offset] = "Offset",
+                [RD_KAFKAP_Metadata] = "Metadata",
+                [RD_KAFKAP_LeaderAndIsr] = "LeaderAndIsr",
+                [RD_KAFKAP_StopReplica] = "StopReplica",
+                [RD_KAFKAP_OffsetCommit] = "OffsetCommit",
+                [RD_KAFKAP_OffsetFetch] = "OffsetFetch",
+                [RD_KAFKAP_GroupCoordinator] = "GroupCoordinator",
+                [RD_KAFKAP_JoinGroup] = "JoinGroup",
+                [RD_KAFKAP_Heartbeat] = "Heartbeat",
+                [RD_KAFKAP_LeaveGroup] = "LeaveGroup",
+                [RD_KAFKAP_SyncGroup] = "SyncGroup",
+                [RD_KAFKAP_DescribeGroups] = "DescribeGroups",
+                [RD_KAFKAP_ListGroups] = "ListGroups",
+                [RD_KAFKAP_SaslHandshake] = "SaslHandshake",
+                [RD_KAFKAP_ApiVersion] = "ApiVersion",
+                [RD_KAFKAP_CreateTopics] = "CreateTopics",
+                [RD_KAFKAP_DeleteTopics] = "DeleteTopics",
+                [RD_KAFKAP_DeleteRecords] = "DeleteRecords",
+                [RD_KAFKAP_InitProducerId] = "InitProducerId",
+                [RD_KAFKAP_OffsetForLeaderEpoch] = "OffsetForLeaderEpoch",
+                [RD_KAFKAP_AddPartitionsToTxn] = "AddPartitionsToTxn",
+                [RD_KAFKAP_AddOffsetsToTxn] = "AddOffsetsToTxn",
+                [RD_KAFKAP_EndTxn] = "EndTxn",
+                [RD_KAFKAP_WriteTxnMarkers] = "WriteTxnMarkers",
+                [RD_KAFKAP_TxnOffsetCommit] = "TxnOffsetCommit",
+                [RD_KAFKAP_DescribeAcls] = "DescribeAcls",
+                [RD_KAFKAP_CreateAcls] = "CreateAcls",
+                [RD_KAFKAP_DeleteAcls] = "DeleteAcls",
+                [RD_KAFKAP_DescribeConfigs] = "DescribeConfigs",
+                [RD_KAFKAP_AlterConfigs] = "AlterConfigs"
+        };
+        static RD_TLS char ret[32];
+
+        /* The designated initializers leave holes (ApiKeys 6 and 7 have no
+         * entry), which are NULL: treat those as unknown too rather than
+         * handing a NULL string back to the caller. */
+        if (ApiKey < 0 || ApiKey >= (int)RD_ARRAYSIZE(names) ||
+            !names[ApiKey]) {
+                rd_snprintf(ret, sizeof(ret), "Unknown-%hd?", ApiKey);
+                return ret;
+        }
+
+        return names[ApiKey];
+}
+
+
+
+
+
+
+
+
+/**
+ * @brief ApiKey version support tuple.
+ */
+struct rd_kafka_ApiVersion {
+ int16_t ApiKey; /* Key compared by rd_kafka_ApiVersion_key_cmp() */
+ int16_t MinVer; /* NOTE(review): presumably the lowest supported version -- confirm */
+ int16_t MaxVer; /* NOTE(review): presumably the highest supported version -- confirm */
+};
+
+/**
+ * @brief ApiVersion comparator that orders by the ApiKey field only.
+ *
+ * @returns the difference between the two ApiKeys (<0, 0 or >0).
+ */
+static RD_UNUSED int rd_kafka_ApiVersion_key_cmp (const void *_a, const void *_b) {
+        const struct rd_kafka_ApiVersion *lhs = _a;
+        const struct rd_kafka_ApiVersion *rhs = _b;
+
+        return lhs->ApiKey - rhs->ApiKey;
+}
+
+
+
+#define RD_KAFKAP_READ_UNCOMMITTED 0
+#define RD_KAFKAP_READ_COMMITTED 1
+
+
+/**
+ *
+ * Kafka protocol string representation prefixed with a convenience header
+ *
+ * Serialized format:
+ * { int16 length (big endian, -1=NULL), data.. }
+ *
+ */
+typedef struct rd_kafkap_str_s {
+ /* convenience header (aligned access, host endian) */
+ int len; /* Kafka string length (-1=NULL, 0=empty, >0=string) */
+ const char *str; /* points into data[] or other memory,
+ * not NULL-terminated */
+} rd_kafkap_str_t;
+
+
+#define RD_KAFKAP_STR_LEN_NULL -1
+#define RD_KAFKAP_STR_IS_NULL(kstr) ((kstr)->len == RD_KAFKAP_STR_LEN_NULL)
+
+/* Returns the length of the string of a kafka protocol string representation */
+#define RD_KAFKAP_STR_LEN0(len) ((len) == RD_KAFKAP_STR_LEN_NULL ? 0 : (len))
+#define RD_KAFKAP_STR_LEN(kstr) RD_KAFKAP_STR_LEN0((kstr)->len)
+
+/* Returns the actual size of a kafka protocol string representation. */
+#define RD_KAFKAP_STR_SIZE0(len) (2 + RD_KAFKAP_STR_LEN0(len))
+#define RD_KAFKAP_STR_SIZE(kstr) RD_KAFKAP_STR_SIZE0((kstr)->len)
+
+
+/* Serialized Kafka string: only works for _new() kstrs */
+#define RD_KAFKAP_STR_SER(kstr) ((kstr)+1)
+
+/* Macro suitable for "%.*s" printing. */
+#define RD_KAFKAP_STR_PR(kstr) \
+ (int)((kstr)->len == RD_KAFKAP_STR_LEN_NULL ? 0 : (kstr)->len), \
+ (kstr)->str
+
+/* strndupa() a Kafka string */
+#define RD_KAFKAP_STR_DUPA(destptr,kstr) \
+ rd_strndupa((destptr), (kstr)->str, RD_KAFKAP_STR_LEN(kstr))
+
+/* strndup() a Kafka string */
+#define RD_KAFKAP_STR_DUP(kstr) rd_strndup((kstr)->str, RD_KAFKAP_STR_LEN(kstr))
+
+/**
+ * Frees a Kafka string previously allocated with `rd_kafkap_str_new()`.
+ * rd_kafkap_str_new() places the header, the serialized length and the
+ * string data in one allocation, so a single rd_free() releases all of it.
+ */
+static RD_UNUSED void rd_kafkap_str_destroy (rd_kafkap_str_t *kstr) {
+ rd_free(kstr);
+}
+
+
+
+/**
+ * Allocate a new Kafka string and make a copy of 'str'.
+ * If 'len' is -1 the length will be calculated with strlen().
+ * Supports Kafka NULL strings: a NULL 'str' yields a NULL string
+ * regardless of 'len'.
+ *
+ * Layout of the single allocation: the rd_kafkap_str_t header, then the
+ * 2-byte big-endian length, then the string data.
+ * The data is nul-terminated, but the trailing \0 is not part of
+ * the serialized string.
+ */
+static RD_INLINE RD_UNUSED
+rd_kafkap_str_t *rd_kafkap_str_new (const char *str, int len) {
+        rd_kafkap_str_t *kstr;
+        int16_t belen;
+        char *payload;
+        int is_null;
+
+        if (!str)
+                len = RD_KAFKAP_STR_LEN_NULL;
+        else if (len == -1)
+                len = (int)strlen(str);
+        is_null = len == RD_KAFKAP_STR_LEN_NULL;
+
+        kstr = rd_malloc(sizeof(*kstr) + 2 + (is_null ? 0 : len + 1));
+        kstr->len = len;
+
+        /* Serialised format: 16-bit string length */
+        belen = htobe16(len);
+        memcpy(kstr+1, &belen, 2);
+
+        if (is_null) {
+                kstr->str = NULL;
+                return kstr;
+        }
+
+        /* Serialised format: non null-terminated string.
+         * The extra terminator is for the convenience of C callers. */
+        payload = (char *)(kstr+1) + 2;
+        memcpy(payload, str, len);
+        payload[len] = '\0';
+        kstr->str = payload;
+
+        return kstr;
+}
+
+
+/**
+ * Makes a copy of `src`. The copy will be fully allocated and should
+ * be freed with rd_kafkap_str_destroy()
+ */
+static RD_INLINE RD_UNUSED
+rd_kafkap_str_t *rd_kafkap_str_copy (const rd_kafkap_str_t *src) {
+ return rd_kafkap_str_new(src->str, src->len);
+}
+
+/**
+ * @brief Compare two Kafka strings: bytewise over their common prefix,
+ *        then by length.
+ *
+ * A Kafka NULL string (len -1) has no data to compare and therefore
+ * orders before any non-NULL string, including the empty one.
+ *
+ * @returns <0, 0 or >0 in the manner of memcmp().
+ */
+static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp (const rd_kafkap_str_t *a,
+                                                  const rd_kafkap_str_t *b) {
+        int minlen = RD_MIN(a->len, b->len);
+        /* Only touch the data when there is a common prefix: a negative
+         * minlen (NULL string) would turn into a huge size_t count. */
+        int r = minlen > 0 ? memcmp(a->str, b->str, minlen) : 0;
+
+        if (r)
+                return r;
+
+        return a->len - b->len;
+}
+
+/**
+ * @brief Compare Kafka string \p a with the nul-terminated C string
+ *        \p str: bytewise over their common prefix, then by length.
+ *
+ * A Kafka NULL string (len -1) has no data to compare and therefore
+ * orders before any C string, including the empty one.
+ * \p str must not be NULL (it is passed to strlen()).
+ *
+ * @returns <0, 0 or >0 in the manner of memcmp().
+ */
+static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp_str (const rd_kafkap_str_t *a,
+                                                      const char *str) {
+        int len = (int)strlen(str);
+        int minlen = RD_MIN(a->len, len);
+        /* Only touch the data when there is a common prefix: a negative
+         * minlen (NULL string) would turn into a huge size_t count. */
+        int r = minlen > 0 ? memcmp(a->str, str, minlen) : 0;
+
+        if (r)
+                return r;
+
+        return a->len - len;
+}
+
+/**
+ * @brief Compare the nul-terminated C string \p str with Kafka string
+ *        \p b (same as rd_kafkap_str_cmp_str() with the operands swapped).
+ *
+ * A Kafka NULL string (len -1) has no data to compare and therefore
+ * orders before any C string, including the empty one.
+ * \p str must not be NULL (it is passed to strlen()).
+ *
+ * @returns <0, 0 or >0 in the manner of memcmp().
+ */
+static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp_str2 (const char *str,
+                                                       const rd_kafkap_str_t *b){
+        int len = (int)strlen(str);
+        int minlen = RD_MIN(b->len, len);
+        /* Only touch the data when there is a common prefix: a negative
+         * minlen (NULL string) would turn into a huge size_t count. */
+        int r = minlen > 0 ? memcmp(str, b->str, minlen) : 0;
+
+        if (r)
+                return r;
+
+        return len - b->len;
+}
+
+
+
+/**
+ *
+ * Kafka protocol bytes array representation prefixed with a convenience header
+ *
+ * Serialized format:
+ * { int32 length (big endian, -1=NULL), data.. }
+ *
+ */
+typedef struct rd_kafkap_bytes_s {
+ /* convenience header (aligned access, host endian) */
+ int32_t len; /* Kafka bytes length (-1=NULL, 0=empty, >0=data) */
+ const void *data; /* points just past the struct, or other memory,
+ * not NULL-terminated */
+ const char _data[1]; /* Bytes following struct when new()ed.
+ * NOTE(review): rd_kafkap_bytes_new() addresses its
+ * storage via (kbytes+1), not via this member */
+} rd_kafkap_bytes_t;
+
+
+#define RD_KAFKAP_BYTES_LEN_NULL -1
+#define RD_KAFKAP_BYTES_IS_NULL(kbytes) \
+ ((kbytes)->len == RD_KAFKAP_BYTES_LEN_NULL)
+
+/* Returns the length of the bytes of a kafka protocol bytes representation */
+#define RD_KAFKAP_BYTES_LEN0(len) ((len) == RD_KAFKAP_BYTES_LEN_NULL ? 0:(len))
+#define RD_KAFKAP_BYTES_LEN(kbytes) RD_KAFKAP_BYTES_LEN0((kbytes)->len)
+
+/* Returns the actual size of a kafka protocol bytes representation. */
+#define RD_KAFKAP_BYTES_SIZE0(len) (4 + RD_KAFKAP_BYTES_LEN0(len))
+#define RD_KAFKAP_BYTES_SIZE(kbytes) RD_KAFKAP_BYTES_SIZE0((kbytes)->len)
+
+
+/* Serialized Kafka bytes: only works for _new() kbytes */
+#define RD_KAFKAP_BYTES_SER(kbytes) ((kbytes)+1)
+
+
+/**
+ * Frees a Kafka bytes previously allocated with `rd_kafkap_bytes_new()`.
+ * rd_kafkap_bytes_new() places the header, the serialized length and the
+ * data in one allocation, so a single rd_free() releases all of it.
+ */
+static RD_UNUSED void rd_kafkap_bytes_destroy (rd_kafkap_bytes_t *kbytes) {
+ rd_free(kbytes);
+}
+
+
+/**
+ * @brief Allocate a new Kafka bytes and make a copy of 'bytes'.
+ *        If \p len > 0 but \p bytes is NULL no copying is performed but
+ *        the bytes structure will be allocated to fit \p len bytes.
+ *
+ * Supports:
+ *  - Kafka NULL bytes (bytes==NULL,len==0),
+ *  - Empty bytes (bytes!=NULL,len==0)
+ *  - Copy data (bytes!=NULL,len>0)
+ *  - No-copy, just alloc (bytes==NULL,len>0)
+ *
+ * Layout of the single allocation: the rd_kafkap_bytes_t header, then the
+ * 4-byte big-endian length, then the data.
+ */
+static RD_INLINE RD_UNUSED
+rd_kafkap_bytes_t *rd_kafkap_bytes_new (const char *bytes, int32_t len) {
+        rd_kafkap_bytes_t *kbytes;
+        int32_t belen;
+        char *payload;
+        int is_null;
+
+        if (!bytes && !len)
+                len = RD_KAFKAP_BYTES_LEN_NULL;
+        is_null = len == RD_KAFKAP_BYTES_LEN_NULL;
+
+        kbytes = rd_malloc(sizeof(*kbytes) + 4 + (is_null ? 0 : len));
+        kbytes->len = len;
+
+        /* Serialised format: 32-bit length */
+        belen = htobe32(len);
+        memcpy(kbytes+1, &belen, 4);
+
+        if (is_null) {
+                kbytes->data = NULL;
+                return kbytes;
+        }
+
+        payload = (char *)(kbytes+1) + 4;
+        kbytes->data = payload;
+
+        /* Without a source the data area is only reserved, not filled */
+        if (bytes)
+                memcpy(payload, bytes, len);
+
+        return kbytes;
+}
+
+
+/**
+ * Makes a copy of `src`. The copy will be fully allocated and should
+ * be freed with rd_kafkap_bytes_destroy()
+ *
+ * The copy is made by rd_kafkap_bytes_new(src->data, src->len), so its
+ * special cases apply: a NULL data pointer with len 0 gives Kafka NULL
+ * bytes, and a NULL data pointer with len > 0 gives an allocated but
+ * unfilled buffer.
+ */
+static RD_INLINE RD_UNUSED
+rd_kafkap_bytes_t *rd_kafkap_bytes_copy (const rd_kafkap_bytes_t *src) {
+ return rd_kafkap_bytes_new(src->data, src->len);
+}
+
+
+/**
+ * @brief Compare two Kafka bytes: bytewise over their common prefix,
+ *        then by length.
+ *
+ * Kafka NULL bytes (len -1) have no data to compare and therefore order
+ * before any non-NULL bytes, including the empty ones.
+ *
+ * @returns <0, 0 or >0 in the manner of memcmp().
+ */
+static RD_INLINE RD_UNUSED int rd_kafkap_bytes_cmp (const rd_kafkap_bytes_t *a,
+                                                    const rd_kafkap_bytes_t *b) {
+        int minlen = RD_MIN(a->len, b->len);
+        /* Only touch the data when there is a common prefix: a negative
+         * minlen (NULL bytes) would turn into a huge size_t count. */
+        int r = minlen > 0 ? memcmp(a->data, b->data, minlen) : 0;
+
+        if (r)
+                return r;
+
+        return a->len - b->len;
+}
+
+/**
+ * @brief Compare Kafka bytes \p a with the \p len bytes at \p data:
+ *        bytewise over their common prefix, then by length.
+ *
+ * When either length is zero or negative (e.g. Kafka NULL bytes, len -1)
+ * no data is read and only the lengths are compared.
+ *
+ * @returns <0, 0 or >0 in the manner of memcmp().
+ */
+static RD_INLINE RD_UNUSED
+int rd_kafkap_bytes_cmp_data (const rd_kafkap_bytes_t *a,
+                              const char *data, int len) {
+        int minlen = RD_MIN(a->len, len);
+        /* Only touch the data when there is a common prefix: a negative
+         * minlen would turn into a huge size_t count. */
+        int r = minlen > 0 ? memcmp(a->data, data, minlen) : 0;
+
+        if (r)
+                return r;
+
+        return a->len - len;
+}
+
+
+
+
+typedef struct rd_kafka_buf_s rd_kafka_buf_t;
+
+
+#define RD_KAFKA_NODENAME_SIZE 128
+
+
+
+
+/**
+ * @brief Message overheads (worst-case)
+ */
+
+/**
+ * MsgVersion v0..v1
+ */
+/* Offset + MessageSize */
+#define RD_KAFKAP_MESSAGESET_V0_HDR_SIZE (8+4)
+/* CRC + Magic + Attr + KeyLen + ValueLen */
+#define RD_KAFKAP_MESSAGE_V0_HDR_SIZE (4+1+1+4+4)
+/* CRC + Magic + Attr + Timestamp + KeyLen + ValueLen */
+#define RD_KAFKAP_MESSAGE_V1_HDR_SIZE (4+1+1+8+4+4)
+/* Maximum per-message overhead */
+#define RD_KAFKAP_MESSAGE_V0_OVERHEAD \
+ (RD_KAFKAP_MESSAGESET_V0_HDR_SIZE + RD_KAFKAP_MESSAGE_V0_HDR_SIZE)
+#define RD_KAFKAP_MESSAGE_V1_OVERHEAD \
+ (RD_KAFKAP_MESSAGESET_V0_HDR_SIZE + RD_KAFKAP_MESSAGE_V1_HDR_SIZE)
+
+/**
+ * MsgVersion v2
+ */
+#define RD_KAFKAP_MESSAGE_V2_OVERHEAD \
+ ( \
+ /* Length (varint) */ \
+ RD_UVARINT_ENC_SIZEOF(int32_t) + \
+ /* Attributes */ \
+ 1 + \
+ /* TimestampDelta (varint) */ \
+ RD_UVARINT_ENC_SIZEOF(int64_t) + \
+ /* OffsetDelta (varint) */ \
+ RD_UVARINT_ENC_SIZEOF(int32_t) + \
+ /* KeyLen (varint) */ \
+ RD_UVARINT_ENC_SIZEOF(int32_t) + \
+ /* ValueLen (varint) */ \
+ RD_UVARINT_ENC_SIZEOF(int32_t) + \
+ /* HeaderCnt (varint): */ \
+ RD_UVARINT_ENC_SIZEOF(int32_t) \
+ )
+
+
+
+/**
+ * @brief MessageSets are not explicitly versioned but depend on the
+ * Produce/Fetch API version and the encompassed Message versions.
+ * We use the Message version (MsgVersion, aka MagicByte) to describe
+ * the MessageSet version, that is, MsgVersion <= 1 uses the old
+ * MessageSet version (v0?) while MsgVersion 2 uses MessageSet version v2
+ */
+
+/* Old MessageSet header: none */
+#define RD_KAFKAP_MSGSET_V0_SIZE 0
+
+/* MessageSet v2 header */
+#define RD_KAFKAP_MSGSET_V2_SIZE (8+4+4+1+4+2+4+8+8+8+2+4+4)
+
+/* Byte offsets for MessageSet fields */
+#define RD_KAFKAP_MSGSET_V2_OF_Length (8)
+#define RD_KAFKAP_MSGSET_V2_OF_CRC (8+4+4+1)
+#define RD_KAFKAP_MSGSET_V2_OF_Attributes (8+4+4+1+4)
+#define RD_KAFKAP_MSGSET_V2_OF_LastOffsetDelta (8+4+4+1+4+2)
+#define RD_KAFKAP_MSGSET_V2_OF_BaseTimestamp (8+4+4+1+4+2+4)
+#define RD_KAFKAP_MSGSET_V2_OF_MaxTimestamp (8+4+4+1+4+2+4+8)
+#define RD_KAFKAP_MSGSET_V2_OF_RecordCount (8+4+4+1+4+2+4+8+8+8+2+4)