Posted to commits@nifi.apache.org by bq...@apache.org on 2017/11/16 21:15:41 UTC

[11/42] nifi-minifi-cpp git commit: MINIFICPP-274: PutKafka Processor

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_partition.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_partition.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_partition.h
new file mode 100644
index 0000000..8721f67
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_partition.h
@@ -0,0 +1,636 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2015 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+#pragma once
+
+#include "rdkafka_topic.h"
+#include "rdkafka_cgrp.h"
+#include "rdkafka_broker.h"
+
+extern const char *rd_kafka_fetch_states[];
+
+
+/**
+ * @brief Offset statistics
+ */
+struct offset_stats {
+        int64_t fetch_offset; /**< Next offset to fetch */
+        int64_t eof_offset;   /**< Last offset we reported EOF for */
+        int64_t hi_offset;    /**< Current broker hi offset */
+};
+
+/**
+ * @brief Reset offset_stats struct to default values
+ */
+static RD_UNUSED void rd_kafka_offset_stats_reset (struct offset_stats *offs) {
+        offs->fetch_offset = 0;
+        offs->eof_offset = RD_KAFKA_OFFSET_INVALID;
+        offs->hi_offset = RD_KAFKA_OFFSET_INVALID;
+}
+
+
+
+/**
+ * Topic + Partition combination
+ */
+struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */
+	TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rklink;  /* rd_kafka_t link */
+	TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rkblink; /* rd_kafka_broker_t link*/
+        CIRCLEQ_ENTRY(rd_kafka_toppar_s) rktp_fetchlink; /* rkb_fetch_toppars */
+	TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rktlink; /* rd_kafka_itopic_t link*/
+        TAILQ_ENTRY(rd_kafka_toppar_s) rktp_cgrplink;/* rd_kafka_cgrp_t link */
+        rd_kafka_itopic_t       *rktp_rkt;
+        shptr_rd_kafka_itopic_t *rktp_s_rkt;  /* shared pointer for rktp_rkt */
+	int32_t            rktp_partition;
+        //LOCK: toppar_lock() + topic_wrlock()
+        //LOCK: .. in partition_available()
+        int32_t            rktp_leader_id;   /**< Current leader broker id.
+                                              *   This is updated directly
+                                              *   from metadata. */
+	rd_kafka_broker_t *rktp_leader;      /**< Current leader broker
+                                              *   This is updated asynchronously
+                                              *   by issuing JOIN op to
+                                              *   broker thread, so be careful
+                                              *   in using this since it
+                                              *   may lag. */
+        rd_kafka_broker_t *rktp_next_leader; /**< Next leader broker after
+                                              *   async migration op. */
+	rd_refcnt_t        rktp_refcnt;
+	mtx_t              rktp_lock;
+
+        //LOCK: toppar_lock. Should move the lock inside the msgq instead
+        //LOCK: toppar_lock. toppar_insert_msg(), concat_msgq()
+        //LOCK: toppar_lock. toppar_enq_msg(), deq_msg(), insert_msgq()
+        int                rktp_msgq_wakeup_fd; /* Wake-up fd */
+	rd_kafka_msgq_t    rktp_msgq;      /* application->rdkafka queue.
+					    * protected by rktp_lock */
+	rd_kafka_msgq_t    rktp_xmit_msgq; /* internal broker xmit queue */
+
+        int                rktp_fetch;     /* On rkb_fetch_toppars list */
+
+	/* Consumer */
+	rd_kafka_q_t      *rktp_fetchq;          /* Queue of fetched messages
+						  * from broker.
+                                                  * Broker thread -> App */
+        rd_kafka_q_t      *rktp_ops;             /* * -> Main thread */
+
+
+	/**
+	 * rktp version barriers
+	 *
+	 * rktp_version is the application/controller side's
+	 * authoritative version; it depicts the most up-to-date state.
+	 * This is what q_filter() matches an rko_version to.
+	 *
+	 * rktp_op_version is the last/current received state handled
+	 * by the toppar in the broker thread. It is updated to rktp_version
+	 * when receiving a new op.
+	 *
+	 * rktp_fetch_version is the current fetcher decision version.
+	 * It is used in fetch_decide() to see if the fetch decision
+	 * needs to be updated by comparing to rktp_op_version.
+	 *
+	 * Example:
+	 *   App thread   : Send OP_START (v1 bump): rktp_version=1
+	 *   Broker thread: Recv OP_START (v1): rktp_op_version=1
+	 *   Broker thread: fetch_decide() detects that
+	 *                  rktp_op_version != rktp_fetch_version and
+	 *                  sets rktp_fetch_version=1.
+	 *   Broker thread: next Fetch request has its tver state set to
+	 *                  rktp_fetch_version (v1).
+	 *
+	 *   App thread   : Send OP_SEEK (v2 bump): rktp_version=2
+	 *   Broker thread: Recv OP_SEEK (v2): rktp_op_version=2
+	 *   Broker thread: Recv IO FetchResponse with tver=1,
+	 *                  when enqueued on rktp_fetchq they're discarded
+	 *                  due to old version (tver<rktp_version).
+	 *   Broker thread: fetch_decide() detects version change and
+	 *                  sets rktp_fetch_version=2.
+	 *   Broker thread: next Fetch request has tver=2
+	 *   Broker thread: Recv IO FetchResponse with tver=2 which
+	 *                  is same as rktp_version so message is forwarded
+	 *                  to app.
+	 */
+        rd_atomic32_t      rktp_version;         /* Latest op version.
+                                                  * Authoritative (app thread)*/
+	int32_t            rktp_op_version;      /* Op version of curr command
+						  * state from.
+						  * (broker thread) */
+        int32_t            rktp_fetch_version;   /* Op version of curr fetch.
+                                                    (broker thread) */
+
+	enum {
+		RD_KAFKA_TOPPAR_FETCH_NONE = 0,
+                RD_KAFKA_TOPPAR_FETCH_STOPPING,
+                RD_KAFKA_TOPPAR_FETCH_STOPPED,
+		RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY,
+		RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT,
+		RD_KAFKA_TOPPAR_FETCH_ACTIVE,
+	} rktp_fetch_state;           /* Broker thread's state */
+
+#define RD_KAFKA_TOPPAR_FETCH_IS_STARTED(fetch_state) \
+        ((fetch_state) >= RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY)
+
+	int32_t            rktp_fetch_msg_max_bytes; /* Max number of bytes to
+                                                      * fetch.
+                                                      * Locality: broker thread
+                                                      */
+
+        rd_ts_t            rktp_ts_fetch_backoff; /* Back off fetcher for
+                                                   * this partition until this
+                                                   * absolute timestamp
+                                                   * expires. */
+
+	int64_t            rktp_query_offset;    /* Offset to query broker for*/
+	int64_t            rktp_next_offset;     /* Next offset to start
+                                                  * fetching from.
+                                                  * Locality: toppar thread */
+	int64_t            rktp_last_next_offset; /* Last next_offset handled
+						   * by fetch_decide().
+						   * Locality: broker thread */
+	int64_t            rktp_app_offset;      /* Last offset delivered to
+						  * application + 1 */
+	int64_t            rktp_stored_offset;   /* Last stored offset, but
+						  * maybe not committed yet. */
+        int64_t            rktp_committing_offset; /* Offset currently being
+                                                    * committed */
+	int64_t            rktp_committed_offset; /* Last committed offset */
+	rd_ts_t            rktp_ts_committed_offset; /* Timestamp of last
+                                                      * commit */
+
+        struct offset_stats rktp_offsets; /* Current offsets.
+                                           * Locality: broker thread*/
+        struct offset_stats rktp_offsets_fin; /* Finalized offset for stats.
+                                               * Updated periodically
+                                               * by broker thread.
+                                               * Locks: toppar_lock */
+
+	int64_t rktp_hi_offset;              /* Current high offset.
+					      * Locks: toppar_lock */
+        int64_t rktp_lo_offset;              /* Current broker low offset.
+                                              * This is outside of the stats
+                                              * struct due to this field
+                                              * being populated by the
+                                              * toppar thread rather than
+                                              * the broker thread.
+                                              * Locality: toppar thread
+                                              * Locks: toppar_lock */
+
+        rd_ts_t            rktp_ts_offset_lag;
+
+	char              *rktp_offset_path;     /* Path to offset file */
+	FILE              *rktp_offset_fp;       /* Offset file pointer */
+        rd_kafka_cgrp_t   *rktp_cgrp;            /* Belongs to this cgrp */
+
+        int                rktp_assigned;   /* Partition in cgrp assignment */
+
+        rd_kafka_replyq_t  rktp_replyq; /* Current replyq+version
+					 * for propagating
+					 * major operations, e.g.,
+					 * FETCH_STOP. */
+        //LOCK: toppar_lock().  RD_KAFKA_TOPPAR_F_DESIRED
+        //LOCK: toppar_lock().  RD_KAFKA_TOPPAR_F_UNKNOWN
+	int                rktp_flags;
+#define RD_KAFKA_TOPPAR_F_DESIRED  0x1      /* This partition is desired
+					     * by a consumer. */
+#define RD_KAFKA_TOPPAR_F_UNKNOWN  0x2      /* Topic is (not yet) seen on
+					     * a broker. */
+#define RD_KAFKA_TOPPAR_F_OFFSET_STORE 0x4  /* Offset store is active */
+#define RD_KAFKA_TOPPAR_F_OFFSET_STORE_STOPPING 0x8 /* Offset store stopping */
+#define RD_KAFKA_TOPPAR_F_APP_PAUSE  0x10   /* App pause()d consumption */
+#define RD_KAFKA_TOPPAR_F_LIB_PAUSE  0x20   /* librdkafka paused consumption */
+#define RD_KAFKA_TOPPAR_F_REMOVE     0x40   /* partition removed from cluster */
+#define RD_KAFKA_TOPPAR_F_LEADER_ERR 0x80   /* Operation failed:
+                                             * leader might be missing.
+                                             * Typically set from
+                                             * ProduceResponse failure. */
+
+        shptr_rd_kafka_toppar_t *rktp_s_for_desp; /* Shared pointer for
+                                                   * rkt_desp list */
+        shptr_rd_kafka_toppar_t *rktp_s_for_cgrp; /* Shared pointer for
+                                                   * rkcg_toppars list */
+        shptr_rd_kafka_toppar_t *rktp_s_for_rkb;  /* Shared pointer for
+                                                   * rkb_toppars list */
+
+	/*
+	 * Timers
+	 */
+	rd_kafka_timer_t rktp_offset_query_tmr;  /* Offset query timer */
+	rd_kafka_timer_t rktp_offset_commit_tmr; /* Offset commit timer */
+	rd_kafka_timer_t rktp_offset_sync_tmr;   /* Offset file sync timer */
+        rd_kafka_timer_t rktp_consumer_lag_tmr;  /* Consumer lag monitoring
+						  * timer */
+
+        int rktp_wait_consumer_lag_resp;         /* Waiting for consumer lag
+                                                  * response. */
+
+	struct {
+		rd_atomic64_t tx_msgs;
+		rd_atomic64_t tx_bytes;
+                rd_atomic64_t msgs;
+                rd_atomic64_t rx_ver_drops;
+	} rktp_c;
+
+};
+
+
+/**
+ * Check if toppar is paused (consumer).
+ * Locks: toppar_lock() MUST be held.
+ */
+#define RD_KAFKA_TOPPAR_IS_PAUSED(rktp)				\
+	((rktp)->rktp_flags & (RD_KAFKA_TOPPAR_F_APP_PAUSE |	\
+			       RD_KAFKA_TOPPAR_F_LIB_PAUSE))
+
+
+
+
+/* Converts a shptr..toppar_t to a toppar_t */
+#define rd_kafka_toppar_s2i(s_rktp) rd_shared_ptr_obj(s_rktp)
+
+
+/**
+ * Returns a shared pointer for the topic.
+ */
+#define rd_kafka_toppar_keep(rktp)                                      \
+        rd_shared_ptr_get(rktp, &(rktp)->rktp_refcnt, shptr_rd_kafka_toppar_t)
+
+#define rd_kafka_toppar_keep_src(func,line,rktp)			\
+        rd_shared_ptr_get_src(func, line, rktp,				\
+			      &(rktp)->rktp_refcnt, shptr_rd_kafka_toppar_t)
+
+
+/**
+ * Frees a shared pointer previously returned by ..toppar_keep()
+ */
+#define rd_kafka_toppar_destroy(s_rktp)                                 \
+        rd_shared_ptr_put(s_rktp,                                       \
+                          &rd_kafka_toppar_s2i(s_rktp)->rktp_refcnt,    \
+                          rd_kafka_toppar_destroy_final(                \
+                                  rd_kafka_toppar_s2i(s_rktp)))
+
+
+
+
+#define rd_kafka_toppar_lock(rktp)     mtx_lock(&(rktp)->rktp_lock)
+#define rd_kafka_toppar_unlock(rktp)   mtx_unlock(&(rktp)->rktp_lock)
+
+static const char *rd_kafka_toppar_name (const rd_kafka_toppar_t *rktp)
+	RD_UNUSED;
+static const char *rd_kafka_toppar_name (const rd_kafka_toppar_t *rktp) {
+	static RD_TLS char ret[256];
+
+	rd_snprintf(ret, sizeof(ret), "%.*s [%"PRId32"]",
+		    RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
+		    rktp->rktp_partition);
+
+	return ret;
+}
+shptr_rd_kafka_toppar_t *rd_kafka_toppar_new0 (rd_kafka_itopic_t *rkt,
+					       int32_t partition,
+					       const char *func, int line);
+#define rd_kafka_toppar_new(rkt,partition) \
+	rd_kafka_toppar_new0(rkt, partition, __FUNCTION__, __LINE__)
+void rd_kafka_toppar_destroy_final (rd_kafka_toppar_t *rktp);
+void rd_kafka_toppar_purge_queues (rd_kafka_toppar_t *rktp);
+void rd_kafka_toppar_set_fetch_state (rd_kafka_toppar_t *rktp,
+                                      int fetch_state);
+void rd_kafka_toppar_insert_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm);
+void rd_kafka_toppar_enq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm);
+void rd_kafka_toppar_deq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm);
+void rd_kafka_toppar_insert_msgq (rd_kafka_toppar_t *rktp,
+				  rd_kafka_msgq_t *rkmq);
+void rd_kafka_toppar_concat_msgq (rd_kafka_toppar_t *rktp,
+				  rd_kafka_msgq_t *rkmq);
+void rd_kafka_toppar_enq_error (rd_kafka_toppar_t *rktp,
+                                rd_kafka_resp_err_t err);
+shptr_rd_kafka_toppar_t *rd_kafka_toppar_get0 (const char *func, int line,
+                                               const rd_kafka_itopic_t *rkt,
+                                               int32_t partition,
+                                               int ua_on_miss);
+#define rd_kafka_toppar_get(rkt,partition,ua_on_miss) \
+        rd_kafka_toppar_get0(__FUNCTION__,__LINE__,rkt,partition,ua_on_miss)
+shptr_rd_kafka_toppar_t *rd_kafka_toppar_get2 (rd_kafka_t *rk,
+                                               const char *topic,
+                                               int32_t partition,
+                                               int ua_on_miss,
+                                               int create_on_miss);
+shptr_rd_kafka_toppar_t *
+rd_kafka_toppar_get_avail (const rd_kafka_itopic_t *rkt,
+                           int32_t partition,
+                           int ua_on_miss,
+                           rd_kafka_resp_err_t *errp);
+
+shptr_rd_kafka_toppar_t *rd_kafka_toppar_desired_get (rd_kafka_itopic_t *rkt,
+                                                      int32_t partition);
+void rd_kafka_toppar_desired_add0 (rd_kafka_toppar_t *rktp);
+shptr_rd_kafka_toppar_t *rd_kafka_toppar_desired_add (rd_kafka_itopic_t *rkt,
+                                                      int32_t partition);
+void rd_kafka_toppar_desired_link (rd_kafka_toppar_t *rktp);
+void rd_kafka_toppar_desired_unlink (rd_kafka_toppar_t *rktp);
+void rd_kafka_toppar_desired_del (rd_kafka_toppar_t *rktp);
+
+int rd_kafka_toppar_ua_move (rd_kafka_itopic_t *rkt, rd_kafka_msgq_t *rkmq);
+
+void rd_kafka_toppar_next_offset_handle (rd_kafka_toppar_t *rktp,
+                                         int64_t Offset);
+
+void rd_kafka_toppar_offset_commit (rd_kafka_toppar_t *rktp, int64_t offset,
+				    const char *metadata);
+
+void rd_kafka_toppar_broker_delegate (rd_kafka_toppar_t *rktp,
+				      rd_kafka_broker_t *rkb,
+				      int for_removal);
+
+
+rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_start (rd_kafka_toppar_t *rktp,
+                                                    int64_t offset,
+                                                    rd_kafka_q_t *fwdq,
+                                                    rd_kafka_replyq_t replyq);
+
+rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_stop (rd_kafka_toppar_t *rktp,
+                                                   rd_kafka_replyq_t replyq);
+
+rd_kafka_resp_err_t rd_kafka_toppar_op_seek (rd_kafka_toppar_t *rktp,
+                                             int64_t offset,
+                                             rd_kafka_replyq_t replyq);
+
+rd_kafka_resp_err_t rd_kafka_toppar_op_pause (rd_kafka_toppar_t *rktp,
+					      int pause, int flag);
+
+void rd_kafka_toppar_fetch_stopped (rd_kafka_toppar_t *rktp,
+                                    rd_kafka_resp_err_t err);
+
+/**
+ * Updates the current toppar fetch round-robin next pointer.
+ */
+static RD_INLINE RD_UNUSED
+void rd_kafka_broker_fetch_toppar_next (rd_kafka_broker_t *rkb,
+                                        rd_kafka_toppar_t *sugg_next) {
+        if (CIRCLEQ_EMPTY(&rkb->rkb_fetch_toppars) ||
+            (void *)sugg_next == CIRCLEQ_ENDC(&rkb->rkb_fetch_toppars))
+                rkb->rkb_fetch_toppar_next = NULL;
+        else if (sugg_next)
+                rkb->rkb_fetch_toppar_next = sugg_next;
+        else
+                rkb->rkb_fetch_toppar_next =
+                        CIRCLEQ_FIRST(&rkb->rkb_fetch_toppars);
+}
+
+
+rd_ts_t rd_kafka_toppar_fetch_decide (rd_kafka_toppar_t *rktp,
+                                      rd_kafka_broker_t *rkb,
+                                      int force_remove);
+
+
+
+rd_ts_t rd_kafka_broker_consumer_toppar_serve (rd_kafka_broker_t *rkb,
+                                               rd_kafka_toppar_t *rktp);
+
+
+void rd_kafka_toppar_offset_fetch (rd_kafka_toppar_t *rktp,
+                                   rd_kafka_replyq_t replyq);
+
+void rd_kafka_toppar_offset_request (rd_kafka_toppar_t *rktp,
+				     int64_t query_offset, int backoff_ms);
+
+
+rd_kafka_assignor_t *
+rd_kafka_assignor_find (rd_kafka_t *rk, const char *protocol);
+
+
+rd_kafka_broker_t *rd_kafka_toppar_leader (rd_kafka_toppar_t *rktp,
+                                           int proper_broker);
+void rd_kafka_toppar_leader_unavailable (rd_kafka_toppar_t *rktp,
+                                         const char *reason,
+                                         rd_kafka_resp_err_t err);
+
+rd_kafka_resp_err_t
+rd_kafka_toppars_pause_resume (rd_kafka_t *rk, int pause, int flag,
+			       rd_kafka_topic_partition_list_t *partitions);
+
+
+rd_kafka_topic_partition_t *rd_kafka_topic_partition_new (const char *topic,
+							  int32_t partition);
+rd_kafka_topic_partition_t *
+rd_kafka_topic_partition_new_from_rktp (rd_kafka_toppar_t *rktp);
+
+rd_kafka_topic_partition_t *
+rd_kafka_topic_partition_list_add0 (rd_kafka_topic_partition_list_t *rktparlist,
+                                    const char *topic, int32_t partition,
+				    shptr_rd_kafka_toppar_t *_private);
+
+rd_kafka_topic_partition_t *
+rd_kafka_topic_partition_list_upsert (
+        rd_kafka_topic_partition_list_t *rktparlist,
+        const char *topic, int32_t partition);
+
+int rd_kafka_topic_partition_match (rd_kafka_t *rk,
+				    const rd_kafka_group_member_t *rkgm,
+				    const rd_kafka_topic_partition_t *rktpar,
+				    const char *topic, int *matched_by_regex);
+
+
+void rd_kafka_topic_partition_list_sort_by_topic (
+        rd_kafka_topic_partition_list_t *rktparlist);
+
+void
+rd_kafka_topic_partition_list_reset_offsets (rd_kafka_topic_partition_list_t *rktparlist,
+					     int64_t offset);
+
+int rd_kafka_topic_partition_list_set_offsets (
+	rd_kafka_t *rk,
+        rd_kafka_topic_partition_list_t *rktparlist,
+        int from_rktp, int64_t def_value, int is_commit);
+
+int rd_kafka_topic_partition_list_count_abs_offsets (
+	const rd_kafka_topic_partition_list_t *rktparlist);
+
+shptr_rd_kafka_toppar_t *
+rd_kafka_topic_partition_get_toppar (rd_kafka_t *rk,
+                                     rd_kafka_topic_partition_t *rktpar);
+
+shptr_rd_kafka_toppar_t *
+rd_kafka_topic_partition_list_get_toppar (
+        rd_kafka_t *rk, rd_kafka_topic_partition_t *rktpar);
+
+void
+rd_kafka_topic_partition_list_update_toppars (rd_kafka_t *rk,
+                                              rd_kafka_topic_partition_list_t
+                                              *rktparlist);
+
+int
+rd_kafka_topic_partition_list_get_leaders (
+        rd_kafka_t *rk,
+        rd_kafka_topic_partition_list_t *rktparlist,
+        rd_list_t *leaders, rd_list_t *query_topics);
+
+rd_kafka_resp_err_t
+rd_kafka_topic_partition_list_query_leaders (
+        rd_kafka_t *rk,
+        rd_kafka_topic_partition_list_t *rktparlist,
+        rd_list_t *leaders, int timeout_ms);
+
+int
+rd_kafka_topic_partition_list_get_topics (
+        rd_kafka_t *rk,
+        rd_kafka_topic_partition_list_t *rktparlist,
+        rd_list_t *rkts);
+
+int
+rd_kafka_topic_partition_list_get_topic_names (
+        const rd_kafka_topic_partition_list_t *rktparlist,
+        rd_list_t *topics, int include_regex);
+
+void
+rd_kafka_topic_partition_list_log (rd_kafka_t *rk, const char *fac,
+				   const rd_kafka_topic_partition_list_t *rktparlist);
+
+#define RD_KAFKA_FMT_F_OFFSET    0x1  /* Print offset */
+#define RD_KAFKA_FMT_F_ONLY_ERR  0x2  /* Only include errored entries */
+#define RD_KAFKA_FMT_F_NO_ERR    0x4  /* Don't print error string */
+const char *
+rd_kafka_topic_partition_list_str (const rd_kafka_topic_partition_list_t *rktparlist,
+                                   char *dest, size_t dest_size,
+                                   int fmt_flags);
+
+void
+rd_kafka_topic_partition_list_update (rd_kafka_topic_partition_list_t *dst,
+                                      const rd_kafka_topic_partition_list_t *src);
+
+int rd_kafka_topic_partition_leader_cmp (const void *_a, const void *_b);
+
+rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_match (
+        const rd_kafka_topic_partition_list_t *rktparlist,
+        int (*match) (const void *elem, const void *opaque),
+        void *opaque);
+
+size_t
+rd_kafka_topic_partition_list_sum (
+        const rd_kafka_topic_partition_list_t *rktparlist,
+        size_t (*cb) (const rd_kafka_topic_partition_t *rktpar, void *opaque),
+        void *opaque);
+
+void rd_kafka_topic_partition_list_set_err (
+        rd_kafka_topic_partition_list_t *rktparlist,
+        rd_kafka_resp_err_t err);
+
+int rd_kafka_topic_partition_list_regex_cnt (
+        const rd_kafka_topic_partition_list_t *rktparlist);
+
+/**
+ * @brief Toppar + Op version tuple used for mapping Fetched partitions
+ *        back to their fetch versions.
+ */
+struct rd_kafka_toppar_ver {
+	shptr_rd_kafka_toppar_t *s_rktp;
+	int32_t version;
+};
+
+
+/**
+ * @brief Toppar + Op version comparator.
+ */
+static RD_INLINE RD_UNUSED
+int rd_kafka_toppar_ver_cmp (const void *_a, const void *_b) {
+	const struct rd_kafka_toppar_ver *a = _a, *b = _b;
+	const rd_kafka_toppar_t *rktp_a = rd_kafka_toppar_s2i(a->s_rktp);
+	const rd_kafka_toppar_t *rktp_b = rd_kafka_toppar_s2i(b->s_rktp);
+	int r;
+
+	if (rktp_a->rktp_rkt != rktp_b->rktp_rkt &&
+	    (r = rd_kafkap_str_cmp(rktp_a->rktp_rkt->rkt_topic,
+				   rktp_b->rktp_rkt->rkt_topic)))
+		return r;
+
+	return rktp_a->rktp_partition - rktp_b->rktp_partition;
+}
+
+/**
+ * @brief Frees up resources for \p tver but not the \p tver itself.
+ */
+static RD_INLINE RD_UNUSED
+void rd_kafka_toppar_ver_destroy (struct rd_kafka_toppar_ver *tver) {
+	rd_kafka_toppar_destroy(tver->s_rktp);
+}
+
+
+/**
+ * @returns 1 if rko version is outdated, else 0.
+ */
+static RD_INLINE RD_UNUSED
+int rd_kafka_op_version_outdated (rd_kafka_op_t *rko, int version) {
+	if (!rko->rko_version)
+		return 0;
+
+	if (version)
+		return rko->rko_version < version;
+
+	if (rko->rko_rktp)
+		return rko->rko_version <
+			rd_atomic32_get(&rd_kafka_toppar_s2i(
+						rko->rko_rktp)->rktp_version);
+	return 0;
+}
+
+void
+rd_kafka_toppar_offset_commit_result (rd_kafka_toppar_t *rktp,
+				      rd_kafka_resp_err_t err,
+				      rd_kafka_topic_partition_list_t *offsets);
+
+void rd_kafka_toppar_broker_leave_for_remove (rd_kafka_toppar_t *rktp);
+
+
+/**
+ * @brief Represents a leader and the partitions it is leader for.
+ */
+struct rd_kafka_partition_leader {
+        rd_kafka_broker_t *rkb;
+        rd_kafka_topic_partition_list_t *partitions;
+};
+
+static RD_UNUSED void
+rd_kafka_partition_leader_destroy (struct rd_kafka_partition_leader *leader) {
+        rd_kafka_broker_destroy(leader->rkb);
+        rd_kafka_topic_partition_list_destroy(leader->partitions);
+        rd_free(leader);
+}
+
+static RD_UNUSED struct rd_kafka_partition_leader *
+rd_kafka_partition_leader_new (rd_kafka_broker_t *rkb) {
+        struct rd_kafka_partition_leader *leader = rd_malloc(sizeof(*leader));
+        leader->rkb = rkb;
+        rd_kafka_broker_keep(rkb);
+        leader->partitions = rd_kafka_topic_partition_list_new(0);
+        return leader;
+}
+
+static RD_UNUSED
+int rd_kafka_partition_leader_cmp (const void *_a, const void *_b) {
+        const struct rd_kafka_partition_leader *a = _a, *b = _b;
+        return rd_kafka_broker_cmp(a->rkb, b->rkb);
+}
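
The toppar API above pairs shared-pointer reference counting (rd_kafka_toppar_keep()/rd_kafka_toppar_get() and rd_kafka_toppar_destroy()) with a per-partition mutex (rd_kafka_toppar_lock()/rd_kafka_toppar_unlock()). A minimal sketch of that pattern follows; it is illustrative only and not part of this commit, it assumes the internal rdkafka headers are available, and it assumes the caller satisfies whatever topic-lock requirements rd_kafka_toppar_get() has. The function name and the -1 "unknown" return value are made up for the example.

/* Illustrative sketch only -- not part of this commit. */
static int example_partition_is_paused (rd_kafka_itopic_t *rkt,
                                        int32_t partition) {
        shptr_rd_kafka_toppar_t *s_rktp;
        rd_kafka_toppar_t *rktp;
        int paused;

        /* Take a shared-pointer reference to the partition.
         * ua_on_miss=0: do not fall back to the unassigned toppar. */
        s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*!ua_on_miss*/);
        if (!s_rktp)
                return -1;                  /* partition not (yet) known */

        rktp = rd_kafka_toppar_s2i(s_rktp); /* shptr -> toppar pointer */

        rd_kafka_toppar_lock(rktp);         /* IS_PAUSED needs rktp_lock */
        paused = RD_KAFKA_TOPPAR_IS_PAUSED(rktp) ? 1 : 0;
        rd_kafka_toppar_unlock(rktp);

        rd_kafka_toppar_destroy(s_rktp);    /* release the _get() reference */
        return paused;
}

The destroy here only drops the reference taken by _get(); per the macro above, rd_kafka_toppar_destroy_final() runs once the last reference is released.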

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_pattern.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_pattern.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_pattern.c
new file mode 100644
index 0000000..fc2d711
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_pattern.c
@@ -0,0 +1,224 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2015 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rdkafka_int.h"
+#include "rdkafka_pattern.h"
+
+void rd_kafka_pattern_destroy (rd_kafka_pattern_list_t *plist,
+                               rd_kafka_pattern_t *rkpat) {
+        TAILQ_REMOVE(&plist->rkpl_head, rkpat, rkpat_link);
+	rd_regex_destroy(rkpat->rkpat_re);
+        rd_free(rkpat->rkpat_orig);
+        rd_free(rkpat);
+}
+
+void rd_kafka_pattern_add (rd_kafka_pattern_list_t *plist,
+                           rd_kafka_pattern_t *rkpat) {
+        TAILQ_INSERT_TAIL(&plist->rkpl_head, rkpat, rkpat_link);
+}
+
+rd_kafka_pattern_t *rd_kafka_pattern_new (const char *pattern,
+                                          char *errstr, int errstr_size) {
+        rd_kafka_pattern_t *rkpat;
+
+	rkpat = rd_calloc(1, sizeof(*rkpat));
+
+	/* Verify and precompile pattern */
+	if (!(rkpat->rkpat_re = rd_regex_comp(pattern, errstr, errstr_size))) {
+		rd_free(rkpat);
+		return NULL;
+	}
+
+        rkpat->rkpat_orig = rd_strdup(pattern);
+
+        return rkpat;
+}
+
+
+
+int rd_kafka_pattern_match (rd_kafka_pattern_list_t *plist, const char *str) {
+        rd_kafka_pattern_t *rkpat;
+
+        TAILQ_FOREACH(rkpat, &plist->rkpl_head, rkpat_link) {
+		if (rd_regex_exec(rkpat->rkpat_re, str))
+                        return 1;
+        }
+
+        return 0;
+}
+
+
+/**
+ * Append pattern to list.
+ */
+int rd_kafka_pattern_list_append (rd_kafka_pattern_list_t *plist,
+                                  const char *pattern,
+                                  char *errstr, int errstr_size) {
+        rd_kafka_pattern_t *rkpat;
+        rkpat = rd_kafka_pattern_new(pattern, errstr, errstr_size);
+        if (!rkpat)
+                return -1;
+
+        rd_kafka_pattern_add(plist, rkpat);
+        return 0;
+}
+
+/**
+ * Remove matching patterns.
+ * Returns the number of removed patterns.
+ */
+int rd_kafka_pattern_list_remove (rd_kafka_pattern_list_t *plist,
+                                  const char *pattern) {
+        rd_kafka_pattern_t *rkpat, *rkpat_tmp;
+        int cnt = 0;
+
+        TAILQ_FOREACH_SAFE(rkpat, &plist->rkpl_head, rkpat_link, rkpat_tmp) {
+                if (!strcmp(rkpat->rkpat_orig, pattern)) {
+                        rd_kafka_pattern_destroy(plist, rkpat);
+                        cnt++;
+                }
+        }
+        return cnt;
+}
+
+/**
+ * Parse a patternlist and populate a list with it.
+ */
+static int rd_kafka_pattern_list_parse (rd_kafka_pattern_list_t *plist,
+                                        const char *patternlist,
+                                        char *errstr, size_t errstr_size) {
+        char *s;
+        rd_strdupa(&s, patternlist);
+
+        while (s && *s) {
+                char *t = s;
+                char re_errstr[256];
+
+                /* Find separator */
+                while ((t = strchr(t, ','))) {
+                        if (t > s && *(t-1) == ',') {
+                                /* separator was escaped,
+                                   remove escape and scan again. */
+                                memmove(t-1, t, strlen(t)+1);
+                                t++;
+                        } else {
+                                *t = '\0';
+                                t++;
+                                break;
+                        }
+                }
+
+                if (rd_kafka_pattern_list_append(plist, s, re_errstr,
+                                                 sizeof(re_errstr)) == -1) {
+                        rd_snprintf(errstr, errstr_size,
+                                    "Failed to parse pattern \"%s\": "
+                                    "%s", s, re_errstr);
+                        rd_kafka_pattern_list_clear(plist);
+                        return -1;
+                }
+
+                s = t;
+        }
+
+        return 0;
+}
+
+
+/**
+ * Clear a pattern list.
+ */
+void rd_kafka_pattern_list_clear (rd_kafka_pattern_list_t *plist) {
+        rd_kafka_pattern_t *rkpat;
+
+        while ((rkpat = TAILQ_FIRST(&plist->rkpl_head)))
+                rd_kafka_pattern_destroy(plist, rkpat);
+
+        if (plist->rkpl_orig) {
+                rd_free(plist->rkpl_orig);
+                plist->rkpl_orig = NULL;
+        }
+}
+
+
+/**
+ * Free a pattern list previously created with list_new()
+ */
+void rd_kafka_pattern_list_destroy (rd_kafka_pattern_list_t *plist) {
+        rd_kafka_pattern_list_clear(plist);
+        rd_free(plist);
+}
+
+/**
+ * Initialize a pattern list, optionally populating it with the
+ * comma-separated patterns in 'patternlist'.
+ */
+int rd_kafka_pattern_list_init (rd_kafka_pattern_list_t *plist,
+                                const char *patternlist,
+                                char *errstr, size_t errstr_size) {
+        TAILQ_INIT(&plist->rkpl_head);
+        if (patternlist) {
+                if (rd_kafka_pattern_list_parse(plist, patternlist,
+                                                errstr, errstr_size) == -1)
+                        return -1;
+                plist->rkpl_orig = rd_strdup(patternlist);
+        } else
+                plist->rkpl_orig = NULL;
+
+        return 0;
+}
+
+
+/**
+ * Allocate and initialize a new list.
+ */
+rd_kafka_pattern_list_t *rd_kafka_pattern_list_new (const char *patternlist,
+                                                    char *errstr,
+                                                    int errstr_size) {
+        rd_kafka_pattern_list_t *plist;
+
+        plist = rd_calloc(1, sizeof(*plist));
+
+        if (rd_kafka_pattern_list_init(plist, patternlist,
+                                       errstr, errstr_size) == -1) {
+                rd_free(plist);
+                return NULL;
+        }
+
+        return plist;
+}
+
+
+/**
+ * Make a copy of a pattern list.
+ */
+rd_kafka_pattern_list_t *
+rd_kafka_pattern_list_copy (rd_kafka_pattern_list_t *src) {
+	char errstr[16];
+	return rd_kafka_pattern_list_new(src->rkpl_orig,
+					 errstr, sizeof(errstr));
+}
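
rdkafka_pattern.c implements the comma-separated regex list used internally by librdkafka (for example for topic blacklisting). A small usage sketch of the API defined above, assuming the internal headers; the helper name, the patterns and the -1 error return are made up for illustration:

/* Illustrative sketch only -- not part of this commit. */
static int example_topic_matches (const char *topic) {
        char errstr[256];
        int matched;
        rd_kafka_pattern_list_t *plist;

        /* Comma-separated regexes; returns NULL (with errstr set)
         * if any pattern fails to compile. */
        plist = rd_kafka_pattern_list_new("^foo\\..*,^bar$",
                                          errstr, sizeof(errstr));
        if (!plist)
                return -1;

        matched = rd_kafka_pattern_match(plist, topic); /* 1 on match */

        rd_kafka_pattern_list_destroy(plist);
        return matched;
}

Individual patterns can later be removed by their original string with rd_kafka_pattern_list_remove(), and rd_kafka_pattern_list_copy() rebuilds a list by recompiling the stored rkpl_orig string.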

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_pattern.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_pattern.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_pattern.h
new file mode 100644
index 0000000..6e6f976
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_pattern.h
@@ -0,0 +1,65 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2015 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+#pragma once
+
+#include "rdregex.h"
+
+typedef struct rd_kafka_pattern_s {
+        TAILQ_ENTRY(rd_kafka_pattern_s)  rkpat_link;
+
+	rd_regex_t  *rkpat_re;   /* Compiled regex */
+        char        *rkpat_orig;  /* Original pattern */
+} rd_kafka_pattern_t;
+
+typedef struct rd_kafka_pattern_list_s {
+        TAILQ_HEAD(,rd_kafka_pattern_s) rkpl_head;
+        char   *rkpl_orig;
+} rd_kafka_pattern_list_t;
+
+void rd_kafka_pattern_destroy (rd_kafka_pattern_list_t *plist,
+                               rd_kafka_pattern_t *rkpat);
+void rd_kafka_pattern_add (rd_kafka_pattern_list_t *plist,
+                           rd_kafka_pattern_t *rkpat);
+rd_kafka_pattern_t *rd_kafka_pattern_new (const char *pattern,
+                                          char *errstr, int errstr_size);
+int rd_kafka_pattern_match (rd_kafka_pattern_list_t *plist, const char *str);
+int rd_kafka_pattern_list_append (rd_kafka_pattern_list_t *plist,
+                                  const char *pattern,
+                                  char *errstr, int errstr_size);
+int rd_kafka_pattern_list_remove (rd_kafka_pattern_list_t *plist,
+                                  const char *pattern);
+void rd_kafka_pattern_list_clear (rd_kafka_pattern_list_t *plist);
+void rd_kafka_pattern_list_destroy (rd_kafka_pattern_list_t *plist);
+int rd_kafka_pattern_list_init (rd_kafka_pattern_list_t *plist,
+                                const char *patternlist,
+                                char *errstr, size_t errstr_size);
+rd_kafka_pattern_list_t *rd_kafka_pattern_list_new (const char *patternlist,
+                                                    char *errstr,
+                                                    int errstr_size);
+rd_kafka_pattern_list_t *
+rd_kafka_pattern_list_copy (rd_kafka_pattern_list_t *src);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_plugin.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_plugin.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_plugin.c
new file mode 100644
index 0000000..b899899
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_plugin.c
@@ -0,0 +1,209 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2017 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rdkafka_int.h"
+#include "rdkafka_plugin.h"
+#include "rddl.h"
+
+
+typedef struct rd_kafka_plugin_s {
+        char *rkplug_path;         /* Library path */
+        rd_kafka_t *rkplug_rk;     /* Backpointer to the rk handle */
+        void *rkplug_handle;       /* dlopen (or similar) handle */
+        void *rkplug_opaque;       /* Plugin's opaque */
+
+} rd_kafka_plugin_t;
+
+
+/**
+ * @brief Plugin path comparator
+ */
+static int rd_kafka_plugin_cmp (const void *_a, const void *_b) {
+        const rd_kafka_plugin_t *a = _a, *b = _b;
+
+        return strcmp(a->rkplug_path, b->rkplug_path);
+}
+
+
+/**
+ * @brief Add a plugin (by library path) and call its conf_init() constructor
+ *
+ * @returns an error code on error.
+ * @remark duplicate plugins are silently ignored.
+ *
+ * @remark Libraries are refcounted and thus not unloaded until all
+ *         plugins referencing the library have been destroyed.
+ *         (dlopen() and LoadLibrary() do this for us)
+ */
+static rd_kafka_resp_err_t
+rd_kafka_plugin_new (rd_kafka_conf_t *conf, const char *path,
+                     char *errstr, size_t errstr_size) {
+        rd_kafka_plugin_t *rkplug;
+        const rd_kafka_plugin_t skel = { .rkplug_path = (char *)path };
+        rd_kafka_plugin_f_conf_init_t *conf_init;
+        rd_kafka_resp_err_t err;
+        void *handle;
+        void *plug_opaque = NULL;
+
+        /* Avoid duplicates */
+        if (rd_list_find(&conf->plugins, &skel, rd_kafka_plugin_cmp)) {
+                rd_snprintf(errstr, errstr_size,
+                            "Ignoring duplicate plugin %s", path);
+                return RD_KAFKA_RESP_ERR_NO_ERROR;
+        }
+
+        rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD",
+                      "Loading plugin \"%s\"", path);
+
+        /* Attempt to load library */
+        if (!(handle = rd_dl_open(path, errstr, errstr_size))) {
+                rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD",
+                              "Failed to load plugin \"%s\": %s",
+                              path, errstr);
+                return RD_KAFKA_RESP_ERR__FS;
+        }
+
+        /* Find conf_init() function */
+        if (!(conf_init = rd_dl_sym(handle, "conf_init",
+                                    errstr, errstr_size))) {
+                rd_dl_close(handle);
+                return RD_KAFKA_RESP_ERR__INVALID_ARG;
+        }
+
+        /* Call conf_init() */
+        rd_kafka_dbg0(conf, PLUGIN, "PLUGINIT",
+                      "Calling plugin \"%s\" conf_init()", path);
+
+        if ((err = conf_init(conf, &plug_opaque, errstr, errstr_size))) {
+                rd_dl_close(handle);
+                return err;
+        }
+
+        rkplug = rd_calloc(1, sizeof(*rkplug));
+        rkplug->rkplug_path        = rd_strdup(path);
+        rkplug->rkplug_handle      = handle;
+        rkplug->rkplug_opaque = plug_opaque;
+
+        rd_list_add(&conf->plugins, rkplug);
+
+        rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD",
+                      "Plugin \"%s\" loaded", path);
+
+        return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+/**
+ * @brief Free the plugin, any conf_destroy() interceptors will have been
+ *        called prior to this call.
+ * @remark plugin is not removed from any list (caller's responsibility)
+ * @remark this relies on the actual library loader to refcount libraries,
+ *         especially in the config copy case.
+ *         This is true for POSIX dlopen() and Win32 LoadLibrary().
+ * @locality application thread
+ */
+static void rd_kafka_plugin_destroy (rd_kafka_plugin_t *rkplug) {
+        rd_dl_close(rkplug->rkplug_handle);
+        rd_free(rkplug->rkplug_path);
+        rd_free(rkplug);
+}
+
+
+
+/**
+ * @brief Initialize all configured plugins.
+ *
+ * @remark Any previously loaded plugins will be unloaded.
+ *
+ * @returns the error code of the first failing plugin.
+ * @locality application thread calling rd_kafka_new().
+ */
+static rd_kafka_conf_res_t
+rd_kafka_plugins_conf_set0 (rd_kafka_conf_t *conf, const char *paths,
+                            char *errstr, size_t errstr_size) {
+        char *s;
+
+        rd_list_destroy(&conf->plugins);
+        rd_list_init(&conf->plugins, 0, (void *)&rd_kafka_plugin_destroy);
+
+        if (!paths || !*paths)
+                return RD_KAFKA_CONF_OK;
+
+        /* Split paths by ; */
+        rd_strdupa(&s, paths);
+
+        rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD",
+                      "Loading plugins from conf object %p: \"%s\"",
+                      conf, paths);
+
+        while (s && *s) {
+                char *path = s;
+                char *t;
+                rd_kafka_resp_err_t err;
+
+                if ((t = strchr(s, ';'))) {
+                        *t = '\0';
+                        s = t+1;
+                } else {
+                        s = NULL;
+                }
+
+                if ((err = rd_kafka_plugin_new(conf, path,
+                                               errstr, errstr_size))) {
+                        /* Failed to load plugin */
+                        size_t elen = errstr_size > 0 ? strlen(errstr) : 0;
+
+                        /* See if there is room for appending the
+                         * plugin path to the error message. */
+                        if (elen + strlen("(plugin )") + strlen(path) <
+                            errstr_size)
+                                rd_snprintf(errstr+elen, errstr_size-elen,
+                                            " (plugin %s)", path);
+
+                        rd_list_destroy(&conf->plugins);
+                        return RD_KAFKA_CONF_INVALID;
+                }
+        }
+
+        return RD_KAFKA_CONF_OK;
+}
+
+
+/**
+ * @brief Conf setter for "plugin.library.paths"
+ */
+rd_kafka_conf_res_t rd_kafka_plugins_conf_set (
+        int scope, void *pconf, const char *name, const char *value,
+        void *dstptr, rd_kafka_conf_set_mode_t set_mode,
+        char *errstr, size_t errstr_size) {
+
+        assert(scope == _RK_GLOBAL);
+        return rd_kafka_plugins_conf_set0((rd_kafka_conf_t *)pconf,
+                                          set_mode == _RK_CONF_PROP_SET_DEL ?
+                                          NULL : value, errstr, errstr_size);
+}
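
rd_kafka_plugin_new() above dlopen()s the library and resolves a single exported symbol named conf_init, of the public rd_kafka_plugin_f_conf_init_t type from rdkafka.h, calling it as conf_init(conf, &plug_opaque, errstr, errstr_size). A hedged sketch of what such a plugin library might export; the state struct, its field and the "register interceptors" remark are invented for illustration and are not part of this commit:

/* Hypothetical plugin library, built separately and loaded through
 * the "plugin.library.paths" property. Only conf_init() is resolved. */
#include <stdio.h>
#include <stdlib.h>
#include <librdkafka/rdkafka.h>

struct example_plugin_state {          /* invented plugin-private state */
        int enabled;
};

/* Must be exported with C linkage under the exact name "conf_init". */
rd_kafka_resp_err_t conf_init (rd_kafka_conf_t *conf,
                               void **plug_opaquep,
                               char *errstr, size_t errstr_size) {
        struct example_plugin_state *state = calloc(1, sizeof(*state));

        (void)conf;   /* unused in this minimal sketch; a real plugin
                       * would typically register interceptors on it */

        if (!state) {
                snprintf(errstr, errstr_size, "out of memory");
                return RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
        }

        state->enabled = 1;
        *plug_opaquep = state;          /* stored as rkplug_opaque */

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}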

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_plugin.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_plugin.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_plugin.h
new file mode 100644
index 0000000..b588a7d
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_plugin.h
@@ -0,0 +1,37 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2017 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RDKAFKA_PLUGIN_H
+#define _RDKAFKA_PLUGIN_H
+
+rd_kafka_conf_res_t rd_kafka_plugins_conf_set (
+        int scope, void *conf, const char *name, const char *value,
+        void *dstptr, rd_kafka_conf_set_mode_t set_mode,
+        char *errstr, size_t errstr_size);
+
+#endif /* _RDKAFKA_PLUGIN_H */
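
On the application side, the setter declared above is reached through the ordinary configuration API: setting the "plugin.library.paths" property loads each ';'-separated library via rd_kafka_plugin_new(). A minimal sketch using only the public API; the library name "monitoring-interceptor" is made up for illustration:

/* Illustrative sketch only -- not part of this commit. */
#include <stdio.h>
#include <librdkafka/rdkafka.h>

static rd_kafka_conf_t *example_conf_with_plugins (void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        if (rd_kafka_conf_set(conf, "plugin.library.paths",
                              "monitoring-interceptor",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "plugin load failed: %s\n", errstr);
                rd_kafka_conf_destroy(conf);
                return NULL;
        }

        return conf;   /* pass to rd_kafka_new() as usual */
}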

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_proto.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_proto.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_proto.h
new file mode 100644
index 0000000..d778c4d
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_proto.h
@@ -0,0 +1,498 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012,2013 Magnus Edenhill
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met: 
+ * 
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer. 
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution. 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#pragma once
+
+
+#include "rdendian.h"
+#include "rdvarint.h"
+
+
+
+/*
+ * Kafka protocol definitions.
+ */
+
+#define RD_KAFKA_PORT      9092
+#define RD_KAFKA_PORT_STR "9092"
+
+
+/**
+ * Request types
+ */
+struct rd_kafkap_reqhdr {
+        int32_t  Size;
+        int16_t  ApiKey;
+#define RD_KAFKAP_None         -1
+#define RD_KAFKAP_Produce       0
+#define RD_KAFKAP_Fetch         1
+#define RD_KAFKAP_Offset        2
+#define RD_KAFKAP_Metadata      3
+#define RD_KAFKAP_LeaderAndIsr  4
+#define RD_KAFKAP_StopReplica   5
+#define RD_KAFKAP_OffsetCommit  8
+#define RD_KAFKAP_OffsetFetch   9
+#define RD_KAFKAP_GroupCoordinator 10
+#define RD_KAFKAP_JoinGroup     11
+#define RD_KAFKAP_Heartbeat     12
+#define RD_KAFKAP_LeaveGroup    13
+#define RD_KAFKAP_SyncGroup     14
+#define RD_KAFKAP_DescribeGroups 15
+#define RD_KAFKAP_ListGroups    16
+#define RD_KAFKAP_SaslHandshake 17
+#define RD_KAFKAP_ApiVersion    18
+#define RD_KAFKAP_CreateTopics  19
+#define RD_KAFKAP_DeleteTopics  20
+#define RD_KAFKAP_DeleteRecords 21
+#define RD_KAFKAP_InitProducerId 22
+#define RD_KAFKAP_OffsetForLeaderEpoch 23
+#define RD_KAFKAP_AddPartitionsToTxn 24
+#define RD_KAFKAP_AddOffsetsToTxn 25
+#define RD_KAFKAP_EndTxn        26
+#define RD_KAFKAP_WriteTxnMarkers 27
+#define RD_KAFKAP_TxnOffsetCommit 28
+#define RD_KAFKAP_DescribeAcls  29
+#define RD_KAFKAP_CreateAcls    30
+#define RD_KAFKAP_DeleteAcls    31
+#define RD_KAFKAP_DescribeConfigs 32
+#define RD_KAFKAP_AlterConfigs  33
+#define RD_KAFKAP__NUM          34
+        int16_t  ApiVersion;
+        int32_t  CorrId;
+        /* ClientId follows */
+};
+
+#define RD_KAFKAP_REQHDR_SIZE (4+2+2+4)
+#define RD_KAFKAP_RESHDR_SIZE (4+4)
+
+/**
+ * Response header
+ */
+struct rd_kafkap_reshdr {
+	int32_t  Size;
+	int32_t  CorrId;
+};
+
+
+
+static RD_UNUSED
+const char *rd_kafka_ApiKey2str (int16_t ApiKey) {
+        static const char *names[] = {
+                [RD_KAFKAP_Produce] = "Produce",
+                [RD_KAFKAP_Fetch] = "Fetch",
+                [RD_KAFKAP_Offset] = "Offset",
+                [RD_KAFKAP_Metadata] = "Metadata",
+                [RD_KAFKAP_LeaderAndIsr] = "LeaderAndIsr",
+                [RD_KAFKAP_StopReplica] = "StopReplica",
+                [RD_KAFKAP_OffsetCommit] = "OffsetCommit",
+                [RD_KAFKAP_OffsetFetch] = "OffsetFetch",
+                [RD_KAFKAP_GroupCoordinator] = "GroupCoordinator",
+                [RD_KAFKAP_JoinGroup] = "JoinGroup",
+                [RD_KAFKAP_Heartbeat] = "Heartbeat",
+                [RD_KAFKAP_LeaveGroup] = "LeaveGroup",
+                [RD_KAFKAP_SyncGroup] = "SyncGroup",
+                [RD_KAFKAP_DescribeGroups] = "DescribeGroups",
+                [RD_KAFKAP_ListGroups] = "ListGroups",
+                [RD_KAFKAP_SaslHandshake] = "SaslHandshake",
+                [RD_KAFKAP_ApiVersion] = "ApiVersion",
+                [RD_KAFKAP_CreateTopics] = "CreateTopics",
+                [RD_KAFKAP_DeleteTopics] = "DeleteTopics",
+                [RD_KAFKAP_DeleteRecords] = "DeleteRecords",
+                [RD_KAFKAP_InitProducerId] = "InitProducerId",
+                [RD_KAFKAP_OffsetForLeaderEpoch] = "OffsetForLeaderEpoch",
+                [RD_KAFKAP_AddPartitionsToTxn] = "AddPartitionsToTxn",
+                [RD_KAFKAP_AddOffsetsToTxn] = "AddOffsetsToTxn",
+                [RD_KAFKAP_EndTxn] = "EndTxn",
+                [RD_KAFKAP_WriteTxnMarkers] = "WriteTxnMarkers",
+                [RD_KAFKAP_TxnOffsetCommit] = "TxnOffsetCommit",
+                [RD_KAFKAP_DescribeAcls] = "DescribeAcls",
+                [RD_KAFKAP_CreateAcls] = "CreateAcls",
+                [RD_KAFKAP_DeleteAcls] = "DeleteAcls",
+                [RD_KAFKAP_DescribeConfigs] = "DescribeConfigs",
+                [RD_KAFKAP_AlterConfigs] = "AlterConfigs"
+        };
+        static RD_TLS char ret[32];
+
+        if (ApiKey < 0 || ApiKey >= (int)RD_ARRAYSIZE(names)) {
+                rd_snprintf(ret, sizeof(ret), "Unknown-%hd?", ApiKey);
+                return ret;
+        }
+
+        return names[ApiKey];
+}
+
+
+
+
+
+
+
+
+/**
+ * @brief ApiKey version support tuple.
+ */
+struct rd_kafka_ApiVersion {
+	int16_t ApiKey;
+	int16_t MinVer;
+	int16_t MaxVer;
+};
+
+/**
+ * @brief ApiVersion.ApiKey comparator.
+ */
+static RD_UNUSED int rd_kafka_ApiVersion_key_cmp (const void *_a, const void *_b) {
+	const struct rd_kafka_ApiVersion *a = _a, *b = _b;
+
+	return a->ApiKey - b->ApiKey;
+}
+
+
+
+#define RD_KAFKAP_READ_UNCOMMITTED  0
+#define RD_KAFKAP_READ_COMMITTED    1
+
+
+/**
+ *
+ * Kafka protocol string representation prefixed with a convenience header
+ *
+ * Serialized format:
+ *  { uint16, data.. }
+ *
+ */
+typedef struct rd_kafkap_str_s {
+	/* convenience header (aligned access, host endian) */
+	int         len; /* Kafka string length (-1=NULL, 0=empty, >0=string) */
+	const char *str; /* points into data[] or other memory,
+			  * not NULL-terminated */
+} rd_kafkap_str_t;
+
+
+#define RD_KAFKAP_STR_LEN_NULL -1
+#define RD_KAFKAP_STR_IS_NULL(kstr) ((kstr)->len == RD_KAFKAP_STR_LEN_NULL)
+
+/* Returns the length of the string of a kafka protocol string representation */
+#define RD_KAFKAP_STR_LEN0(len) ((len) == RD_KAFKAP_STR_LEN_NULL ? 0 : (len))
+#define RD_KAFKAP_STR_LEN(kstr) RD_KAFKAP_STR_LEN0((kstr)->len)
+
+/* Returns the actual size of a kafka protocol string representation. */
+#define RD_KAFKAP_STR_SIZE0(len) (2 + RD_KAFKAP_STR_LEN0(len))
+#define RD_KAFKAP_STR_SIZE(kstr) RD_KAFKAP_STR_SIZE0((kstr)->len)
+
+
+/* Serialized Kafka string: only works for _new() kstrs */
+#define RD_KAFKAP_STR_SER(kstr)  ((kstr)+1)
+
+/* Macro suitable for "%.*s" printing. */
+#define RD_KAFKAP_STR_PR(kstr)						\
+	(int)((kstr)->len == RD_KAFKAP_STR_LEN_NULL ? 0 : (kstr)->len), \
+		(kstr)->str
+
+/* strndupa() a Kafka string */
+#define RD_KAFKAP_STR_DUPA(destptr,kstr) \
+	rd_strndupa((destptr), (kstr)->str, RD_KAFKAP_STR_LEN(kstr))
+
+/* strndup() a Kafka string */
+#define RD_KAFKAP_STR_DUP(kstr) rd_strndup((kstr)->str, RD_KAFKAP_STR_LEN(kstr))
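+
+/* Illustrative sketch only: formatting a Kafka string with the "%.*s"
+ * helper above. rd_snprintf() is the snprintf wrapper already used in this
+ * header; 'kstr' is assumed to point at a valid, non-NULL-valued string. */
+static RD_INLINE RD_UNUSED
+void rd_kafkap_str_example_fmt (const rd_kafkap_str_t *kstr,
+                                char *buf, size_t size) {
+        /* RD_KAFKAP_STR_PR() expands to the "<len>, <ptr>" argument pair
+         * expected by the "%.*s" conversion. */
+        rd_snprintf(buf, size, "topic %.*s", RD_KAFKAP_STR_PR(kstr));
+}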
+
+/**
+ * Frees a Kafka string previously allocated with `rd_kafkap_str_new()`
+ */
+static RD_UNUSED void rd_kafkap_str_destroy (rd_kafkap_str_t *kstr) {
+	rd_free(kstr);
+}
+
+
+
+/**
+ * Allocate a new Kafka string and make a copy of 'str'.
+ * If 'len' is -1 the length will be calculated.
+ * Supports Kafka NULL strings.
+ * NUL-terminates the string, but the trailing \0 is not part of
+ * the serialized string.
+ */
+static RD_INLINE RD_UNUSED
+rd_kafkap_str_t *rd_kafkap_str_new (const char *str, int len) {
+	rd_kafkap_str_t *kstr;
+	int16_t klen;
+
+	if (!str)
+		len = RD_KAFKAP_STR_LEN_NULL;
+	else if (len == -1)
+		len = (int)strlen(str);
+
+	kstr = rd_malloc(sizeof(*kstr) + 2 +
+			 (len == RD_KAFKAP_STR_LEN_NULL ? 0 : len + 1));
+	kstr->len = len;
+
+	/* Serialised format: 16-bit string length */
+	klen = htobe16(len);
+	memcpy(kstr+1, &klen, 2);
+
+	/* Serialised format: non null-terminated string */
+	if (len == RD_KAFKAP_STR_LEN_NULL)
+		kstr->str = NULL;
+	else {
+		kstr->str = ((const char *)(kstr+1))+2;
+		memcpy((void *)kstr->str, str, len);
+		((char *)kstr->str)[len] = '\0';
+	}
+
+	return kstr;
+}
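+
+/* Illustrative sketch only: full lifecycle of a Kafka protocol string.
+ * rd_kafkap_str_new() puts the convenience header, the big-endian int16
+ * length and the copied characters in a single allocation, so
+ * RD_KAFKAP_STR_SER() points straight at the on-wire bytes. */
+static RD_INLINE RD_UNUSED void rd_kafkap_str_example_lifecycle (void) {
+	rd_kafkap_str_t *kstr = rd_kafkap_str_new("my-topic", -1);
+	const void *wire = RD_KAFKAP_STR_SER(kstr); /* 2-byte len + 8 chars */
+	int wire_size = RD_KAFKAP_STR_SIZE(kstr);   /* == 2 + 8 == 10 */
+
+	(void)wire; (void)wire_size;
+	rd_kafkap_str_destroy(kstr);
+}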
+
+
+/**
+ * Makes a copy of `src`. The copy will be fully allocated and should
+ * be freed with rd_kafkap_str_destroy().
+ */
+static RD_INLINE RD_UNUSED
+rd_kafkap_str_t *rd_kafkap_str_copy (const rd_kafkap_str_t *src) {
+        return rd_kafkap_str_new(src->str, src->len);
+}
+
+static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp (const rd_kafkap_str_t *a,
+						 const rd_kafkap_str_t *b) {
+	int minlen = RD_MIN(a->len, b->len);
+	int r = memcmp(a->str, b->str, minlen);
+	if (r)
+		return r;
+	else
+		return a->len - b->len;
+}
+
+static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp_str (const rd_kafkap_str_t *a,
+						     const char *str) {
+	int len = (int)strlen(str);
+	int minlen = RD_MIN(a->len, len);
+	int r = memcmp(a->str, str, minlen);
+	if (r)
+		return r;
+	else
+		return a->len - len;
+}
+
+static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp_str2 (const char *str,
+						      const rd_kafkap_str_t *b){
+	int len = (int)strlen(str);
+	int minlen = RD_MIN(b->len, len);
+	int r = memcmp(str, b->str, minlen);
+	if (r)
+		return r;
+	else
+		return len - b->len;
+}
+
+
+
+/**
+ *
+ * Kafka protocol bytes array representation prefixed with a convenience header
+ *
+ * Serialized format:
+ *  { int32, data.. }
+ *
+ */
+typedef struct rd_kafkap_bytes_s {
+	/* convenience header (aligned access, host endian) */
+	int32_t     len;   /* Kafka bytes length (-1=NULL, 0=empty, >0=data) */
+	const void *data;  /* points just past the struct, or other memory,
+			    * not NULL-terminated */
+	const char _data[1]; /* Bytes following struct when new()ed */
+} rd_kafkap_bytes_t;
+
+
+#define RD_KAFKAP_BYTES_LEN_NULL -1
+#define RD_KAFKAP_BYTES_IS_NULL(kbytes) \
+	((kbytes)->len == RD_KAFKAP_BYTES_LEN_NULL)
+
+/* Returns the length of the data part of a Kafka protocol bytes representation */
+#define RD_KAFKAP_BYTES_LEN0(len) ((len) == RD_KAFKAP_BYTES_LEN_NULL ? 0:(len))
+#define RD_KAFKAP_BYTES_LEN(kbytes) RD_KAFKAP_BYTES_LEN0((kbytes)->len)
+
+/* Returns the actual size of a kafka protocol bytes representation. */
+#define RD_KAFKAP_BYTES_SIZE0(len) (4 + RD_KAFKAP_BYTES_LEN0(len))
+#define RD_KAFKAP_BYTES_SIZE(kbytes) RD_KAFKAP_BYTES_SIZE0((kbytes)->len)
+
+
+/* Serialized Kafka bytes: only works for _new() kbytes */
+#define RD_KAFKAP_BYTES_SER(kbytes)  ((kbytes)+1)
+
+
+/**
+ * Frees a Kafka bytes previously allocated with `rd_kafkap_bytes_new()`
+ */
+static RD_UNUSED void rd_kafkap_bytes_destroy (rd_kafkap_bytes_t *kbytes) {
+	rd_free(kbytes);
+}
+
+
+/**
+ * @brief Allocate a new Kafka bytes and make a copy of 'bytes'.
+ * If \p len > 0 but \p bytes is NULL no copying is performed but
+ * the bytes structure will be allocated to fit \p len bytes.
+ *
+ * Supports:
+ *  - Kafka NULL bytes (bytes==NULL,len==0),
+ *  - Empty bytes (bytes!=NULL,len==0)
+ *  - Copy data (bytes!=NULL,len>0)
+ *  - No-copy, just alloc (bytes==NULL,len>0)
+ */
+static RD_INLINE RD_UNUSED
+rd_kafkap_bytes_t *rd_kafkap_bytes_new (const char *bytes, int32_t len) {
+	rd_kafkap_bytes_t *kbytes;
+	int32_t klen;
+
+	if (!bytes && !len)
+		len = RD_KAFKAP_BYTES_LEN_NULL;
+
+	kbytes = rd_malloc(sizeof(*kbytes) + 4 +
+			 (len == RD_KAFKAP_BYTES_LEN_NULL ? 0 : len));
+	kbytes->len = len;
+
+	klen = htobe32(len);
+	memcpy(kbytes+1, &klen, 4);
+
+	if (len == RD_KAFKAP_BYTES_LEN_NULL)
+		kbytes->data = NULL;
+	else {
+		kbytes->data = ((const char *)(kbytes+1))+4;
+                if (bytes)
+                        memcpy((void *)kbytes->data, bytes, len);
+	}
+
+	return kbytes;
+}
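+
+/* Illustrative sketch only: the "no-copy, just alloc" mode above
+ * (bytes==NULL, len>0) reserves len bytes that the caller fills in
+ * afterwards; the cast is needed because 'data' is declared const.
+ * Assumes len > 0. */
+static RD_INLINE RD_UNUSED
+rd_kafkap_bytes_t *rd_kafkap_bytes_example_alloc (int32_t len) {
+	rd_kafkap_bytes_t *kbytes = rd_kafkap_bytes_new(NULL, len);
+
+	memset((void *)kbytes->data, 0, (size_t)len); /* caller content */
+	return kbytes; /* release with rd_kafkap_bytes_destroy() */
+}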
+
+
+/**
+ * Makes a copy of `src`. The copy will be fully allocated and should
+ * be freed with rd_kafkap_bytes_destroy()
+ */
+static RD_INLINE RD_UNUSED
+rd_kafkap_bytes_t *rd_kafkap_bytes_copy (const rd_kafkap_bytes_t *src) {
+        return rd_kafkap_bytes_new(src->data, src->len);
+}
+
+
+static RD_INLINE RD_UNUSED int rd_kafkap_bytes_cmp (const rd_kafkap_bytes_t *a,
+						   const rd_kafkap_bytes_t *b) {
+	int minlen = RD_MIN(a->len, b->len);
+	int r = memcmp(a->data, b->data, minlen);
+	if (r)
+		return r;
+	else
+		return a->len - b->len;
+}
+
+static RD_INLINE RD_UNUSED
+int rd_kafkap_bytes_cmp_data (const rd_kafkap_bytes_t *a,
+			      const char *data, int len) {
+	int minlen = RD_MIN(a->len, len);
+	int r = memcmp(a->data, data, minlen);
+	if (r)
+		return r;
+	else
+		return a->len - len;
+}
+
+
+
+
+typedef struct rd_kafka_buf_s rd_kafka_buf_t;
+
+
+#define RD_KAFKA_NODENAME_SIZE  128
+
+
+
+
+/**
+ * @brief Message overheads (worst-case)
+ */
+
+/**
+ * MsgVersion v0..v1
+ */
+/* Offset + MessageSize */
+#define RD_KAFKAP_MESSAGESET_V0_HDR_SIZE (8+4)
+/* CRC + Magic + Attr + KeyLen + ValueLen */
+#define RD_KAFKAP_MESSAGE_V0_HDR_SIZE    (4+1+1+4+4)
+/* CRC + Magic + Attr + Timestamp + KeyLen + ValueLen */
+#define RD_KAFKAP_MESSAGE_V1_HDR_SIZE    (4+1+1+8+4+4)
+/* Maximum per-message overhead */
+#define RD_KAFKAP_MESSAGE_V0_OVERHEAD                                   \
+        (RD_KAFKAP_MESSAGESET_V0_HDR_SIZE + RD_KAFKAP_MESSAGE_V0_HDR_SIZE)
+#define RD_KAFKAP_MESSAGE_V1_OVERHEAD                                   \
+        (RD_KAFKAP_MESSAGESET_V0_HDR_SIZE + RD_KAFKAP_MESSAGE_V1_HDR_SIZE)
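+
+/*
+ * For reference, the macros above give a worst-case per-message framing
+ * overhead (on top of the key and value payloads) of:
+ *   MsgVersion v0: (8+4) + (4+1+1+4+4)   = 26 bytes
+ *   MsgVersion v1: (8+4) + (4+1+1+8+4+4) = 34 bytes
+ */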
+
+/**
+ * MsgVersion v2
+ */
+#define RD_KAFKAP_MESSAGE_V2_OVERHEAD                                  \
+        (                                                              \
+        /* Length (varint) */                                          \
+        RD_UVARINT_ENC_SIZEOF(int32_t) +                               \
+        /* Attributes */                                               \
+        1 +                                                            \
+        /* TimestampDelta (varint) */                                  \
+        RD_UVARINT_ENC_SIZEOF(int64_t) +                               \
+        /* OffsetDelta (varint) */                                     \
+        RD_UVARINT_ENC_SIZEOF(int32_t) +                               \
+        /* KeyLen (varint) */                                          \
+        RD_UVARINT_ENC_SIZEOF(int32_t) +                               \
+        /* ValueLen (varint) */                                        \
+        RD_UVARINT_ENC_SIZEOF(int32_t) +                               \
+        /* HeaderCnt (varint): */                                      \
+        RD_UVARINT_ENC_SIZEOF(int32_t)                                 \
+        )
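+
+/*
+ * Assuming RD_UVARINT_ENC_SIZEOF() evaluates to the maximum varint
+ * encoding width for the given type (5 bytes for a 32-bit value, 10 for
+ * a 64-bit one), the worst-case per-record overhead above sums to
+ *   5 + 1 + 10 + 5 + 5 + 5 + 5 = 36 bytes.
+ */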
+
+
+
+/**
+ * @brief MessageSets are not explicitly versioned but depend on the
+ *        Produce/Fetch API version and the encompassed Message versions.
+ *        We use the Message version (MsgVersion, aka MagicByte) to describe
+ *        the MessageSet version, that is, MsgVersion <= 1 uses the old
+ *        (v0/v1) MessageSet format while MsgVersion 2 uses MessageSet
+ *        version v2.
+ */
+
+/* Old MessageSet header: none */
+#define RD_KAFKAP_MSGSET_V0_SIZE                0
+
+/* MessageSet v2 header */
+#define RD_KAFKAP_MSGSET_V2_SIZE                (8+4+4+1+4+2+4+8+8+8+2+4+4)
+
+/* Byte offsets for MessageSet fields */
+#define RD_KAFKAP_MSGSET_V2_OF_Length           (8)
+#define RD_KAFKAP_MSGSET_V2_OF_CRC              (8+4+4+1)
+#define RD_KAFKAP_MSGSET_V2_OF_Attributes       (8+4+4+1+4)
+#define RD_KAFKAP_MSGSET_V2_OF_LastOffsetDelta  (8+4+4+1+4+2)
+#define RD_KAFKAP_MSGSET_V2_OF_BaseTimestamp    (8+4+4+1+4+2+4)
+#define RD_KAFKAP_MSGSET_V2_OF_MaxTimestamp     (8+4+4+1+4+2+4+8)
+#define RD_KAFKAP_MSGSET_V2_OF_RecordCount      (8+4+4+1+4+2+4+8+8+8+2+4)
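+
+/*
+ * The size and offsets above follow the MessageSet (RecordBatch) v2
+ * header layout from the Kafka protocol:
+ *   BaseOffset(8) Length(4) PartitionLeaderEpoch(4) Magic(1) CRC(4)
+ *   Attributes(2) LastOffsetDelta(4) BaseTimestamp(8) MaxTimestamp(8)
+ *   ProducerId(8) ProducerEpoch(2) BaseSequence(4) RecordCount(4)
+ * which totals the 61 bytes of RD_KAFKAP_MSGSET_V2_SIZE.
+ */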