Posted to commits@nifi.apache.org by ph...@apache.org on 2018/06/06 14:14:57 UTC

[29/51] [partial] nifi-minifi-cpp git commit: MINIFICPP-512 - upgrade to librdkafka 0.11.4

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_cgrp.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_cgrp.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_cgrp.h
deleted file mode 100644
index 0424b5d..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_cgrp.h
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2015, Magnus Edenhill
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-#pragma once
-
-#include "rdinterval.h"
-
-#include "rdkafka_assignor.h"
-
-/**
- * Client groups implementation
- *
- * Client group handling for a single cgrp is assigned to a single
- * rd_kafka_broker_t object at any given time.
- * The main thread will call cgrp_serve() to serve its cgrps.
- *
- * This means that the cgrp itself does not need to be locked since it
- * is only ever used from the main thread.
- *
- */
-
-
-extern const char *rd_kafka_cgrp_join_state_names[];
-
-/**
- * Client group
- */
-typedef struct rd_kafka_cgrp_s {
-        TAILQ_ENTRY(rd_kafka_cgrp_s) rkcg_rkb_link;  /* rkb_cgrps */
-        const rd_kafkap_str_t    *rkcg_group_id;
-        rd_kafkap_str_t          *rkcg_member_id;  /* Last assigned MemberId */
-        const rd_kafkap_str_t    *rkcg_client_id;
-
-        enum {
-                /* Init state */
-                RD_KAFKA_CGRP_STATE_INIT,
-
-                /* Cgrp has been stopped. This is a final state */
-                RD_KAFKA_CGRP_STATE_TERM,
-
-                /* Query for group coordinator */
-                RD_KAFKA_CGRP_STATE_QUERY_COORD,
-
-                /* Outstanding query, awaiting response */
-                RD_KAFKA_CGRP_STATE_WAIT_COORD,
-
-                /* Wait for ack from assigned cgrp manager broker thread */
-                RD_KAFKA_CGRP_STATE_WAIT_BROKER,
-
-                /* Wait for manager broker thread to connect to broker */
-                RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT,
-
-                /* Coordinator is up and manager is assigned. */
-                RD_KAFKA_CGRP_STATE_UP,
-        } rkcg_state;
-        rd_ts_t            rkcg_ts_statechange;     /* Timestamp of last
-                                                     * state change. */
-
-
-        enum {
-                RD_KAFKA_CGRP_JOIN_STATE_INIT,
-
-                /* all: JoinGroupRequest sent, awaiting response. */
-                RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN,
-
-                /* Leader: MetadataRequest sent, awaiting response. */
-                RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA,
-
-                /* Follower: SyncGroupRequest sent, awaiting response. */
-                RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC,
-
-                /* all: waiting for previous assignment to decommission */
-                RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN,
-
-                /* all: waiting for application's rebalance_cb to assign() */
-                RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB,
-
-		/* all: waiting for application's rebalance_cb to revoke */
-                RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB,
-
-                /* all: synchronized and assigned
-                 *      may be an empty assignment. */
-                RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED,
-
-		/* all: fetchers are started and operational */
-		RD_KAFKA_CGRP_JOIN_STATE_STARTED
-        } rkcg_join_state;
-
-        /* State when group leader */
-        struct {
-                char *protocol;
-                rd_kafka_group_member_t *members;
-                int member_cnt;
-        } rkcg_group_leader;
-
-        rd_kafka_q_t      *rkcg_q;                  /* Application poll queue */
-        rd_kafka_q_t      *rkcg_ops;                /* Manager ops queue */
-	rd_kafka_q_t      *rkcg_wait_coord_q;       /* Ops awaiting coord */
-	int32_t            rkcg_version;            /* Ops queue version barrier
-						     * Increased by:
-						     *  Rebalance delegation
-						     *  Assign/Unassign
-						     */
-        mtx_t              rkcg_lock;
-
-        int                rkcg_flags;
-#define RD_KAFKA_CGRP_F_TERMINATE    0x1            /* Terminate cgrp (async) */
-#define RD_KAFKA_CGRP_F_WAIT_UNASSIGN 0x4           /* Waiting for unassign
-						     * to complete */
-#define RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN 0x8       /* Send LeaveGroup when
-						     * unassign is done */
-#define RD_KAFKA_CGRP_F_SUBSCRIPTION 0x10           /* If set:
-                                                     *   subscription
-                                                     * else:
-                                                     *   static assignment */
-#define RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT  0x20  /* A Heartbeat request
-                                                     * is in transit, don't
-                                                     * send a new one. */
-#define RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION 0x40  /* Subscription contains
-                                                     * wildcards. */
-
-        rd_interval_t      rkcg_coord_query_intvl;  /* Coordinator query intvl*/
-        rd_interval_t      rkcg_heartbeat_intvl;    /* Heartbeat intvl */
-        rd_interval_t      rkcg_join_intvl;         /* JoinGroup interval */
-        rd_interval_t      rkcg_timeout_scan_intvl; /* Timeout scanner */
-
-        TAILQ_HEAD(, rd_kafka_topic_s)  rkcg_topics;/* Topics subscribed to */
-
-        rd_list_t          rkcg_toppars;            /* Toppars subscribed to */
-
-	int                rkcg_assigned_cnt;       /* Assigned partitions */
-
-        int32_t            rkcg_coord_id;           /* Current coordinator id */
-
-        int32_t            rkcg_generation_id;      /* Current generation id */
-
-        rd_kafka_assignor_t *rkcg_assignor;         /* Selected partition
-                                                     * assignor strategy. */
-
-        rd_kafka_broker_t *rkcg_rkb;                /* Current handling broker,
-                                                     * if the coordinator broker
-                                                     * is not available this
-                                                     * will be another broker
-                                                     * that will handle the
-                                                     * querying of coordinator
-                                                     * etc.
-                                                     * Broker in this sense
-                                                     * is a broker_t object,
-                                                     * not necessarily a
-                                                     * real broker. */
-
-        /* Current subscription */
-        rd_kafka_topic_partition_list_t *rkcg_subscription;
-	/* The actual topics subscribed (after metadata+wildcard matching) */
-	rd_list_t *rkcg_subscribed_topics; /**< (rd_kafka_topic_info_t *) */
-
-        /* Current assignment */
-        rd_kafka_topic_partition_list_t *rkcg_assignment;
-
-        int rkcg_wait_unassign_cnt;                 /* Waiting for this number
-                                                     * of partitions to be
-                                                     * unassigned and
-                                                     * decommissioned before
-                                                     * transitioning to the
-                                                     * next state. */
-
-	int rkcg_wait_commit_cnt;                   /* Waiting for this number
-						     * of commits to finish. */
-
-        rd_kafka_resp_err_t rkcg_last_err;          /* Last error propagated to
-                                                     * application.
-                                                     * This is for silencing
-                                                     * same errors. */
-
-        rd_kafka_timer_t   rkcg_offset_commit_tmr;  /* Offset commit timer */
-
-        rd_kafka_t        *rkcg_rk;
-
-        rd_kafka_op_t     *rkcg_reply_rko;          /* Send reply for op
-                                                     * (OP_TERMINATE)
-                                                     * to this rko's queue. */
-
-	rd_ts_t            rkcg_ts_terminate;       /* Timestamp of when
-						     * cgrp termination was
-						     * initiated. */
-
-        /* Protected by rd_kafka_*lock() */
-        struct {
-                rd_ts_t            ts_rebalance;       /* Timestamp of
-                                                        * last rebalance */
-                int                rebalance_cnt;      /* Number of
-                                                          rebalances */
-                int                assignment_size;    /* Partition count
-                                                        * of last rebalance
-                                                        * assignment */
-        } rkcg_c;
-
-} rd_kafka_cgrp_t;
-
-
-
-
-#define rd_kafka_cgrp_lock(rkcg)    mtx_lock(&(rkcg)->rkcg_lock)
-#define rd_kafka_cgrp_unlock(rkcg)  mtx_unlock(&(rkcg)->rkcg_lock)
-
-/* Check if broker is the coordinator */
-#define RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg,rkb)          \
-        ((rkcg)->rkcg_coord_id != -1 &&                  \
-         (rkcg)->rkcg_coord_id == (rkb)->rkb_nodeid)
-
-extern const char *rd_kafka_cgrp_state_names[];
-
-void rd_kafka_cgrp_destroy_final (rd_kafka_cgrp_t *rkcg);
-rd_kafka_cgrp_t *rd_kafka_cgrp_new (rd_kafka_t *rk,
-                                    const rd_kafkap_str_t *group_id,
-                                    const rd_kafkap_str_t *client_id);
-void rd_kafka_cgrp_serve (rd_kafka_cgrp_t *rkcg);
-
-void rd_kafka_cgrp_op (rd_kafka_cgrp_t *rkcg, rd_kafka_toppar_t *rktp,
-                       rd_kafka_replyq_t replyq, rd_kafka_op_type_t type,
-                       rd_kafka_resp_err_t err);
-void rd_kafka_cgrp_terminate0 (rd_kafka_cgrp_t *rkcg, rd_kafka_op_t *rko);
-void rd_kafka_cgrp_terminate (rd_kafka_cgrp_t *rkcg, rd_kafka_replyq_t replyq);
-
-
-rd_kafka_resp_err_t rd_kafka_cgrp_topic_pattern_del (rd_kafka_cgrp_t *rkcg,
-                                                     const char *pattern);
-rd_kafka_resp_err_t rd_kafka_cgrp_topic_pattern_add (rd_kafka_cgrp_t *rkcg,
-                                                     const char *pattern);
-
-int rd_kafka_cgrp_topic_check (rd_kafka_cgrp_t *rkcg, const char *topic);
-
-void rd_kafka_cgrp_set_member_id (rd_kafka_cgrp_t *rkcg, const char *member_id);
-
-void rd_kafka_cgrp_handle_heartbeat_error (rd_kafka_cgrp_t *rkcg,
-					   rd_kafka_resp_err_t err);
-
-void rd_kafka_cgrp_handle_SyncGroup (rd_kafka_cgrp_t *rkcg,
-				     rd_kafka_broker_t *rkb,
-                                     rd_kafka_resp_err_t err,
-                                     const rd_kafkap_bytes_t *member_state);
-void rd_kafka_cgrp_set_join_state (rd_kafka_cgrp_t *rkcg, int join_state);
-
-int rd_kafka_cgrp_reassign_broker (rd_kafka_cgrp_t *rkcg);
-
-void rd_kafka_cgrp_coord_query (rd_kafka_cgrp_t *rkcg,
-				const char *reason);
-void rd_kafka_cgrp_coord_dead (rd_kafka_cgrp_t *rkcg, rd_kafka_resp_err_t err,
-			       const char *reason);
-void rd_kafka_cgrp_metadata_update_check (rd_kafka_cgrp_t *rkcg, int do_join);
-#define rd_kafka_cgrp_get(rk) ((rk)->rk_cgrp)
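
For context on what the removed header drives: the cgrp state machine above is exercised entirely through librdkafka's public high-level consumer API. Below is a minimal sketch (not part of this commit; the broker address, group id and topic name are placeholders) of the call sequence that moves a cgrp from INIT through the QUERY_COORD/WAIT_COORD/UP states and the join states listed above:

    #include <librdkafka/rdkafka.h>
    #include <stdio.h>

    int main (void) {
            char errstr[512];
            rd_kafka_conf_t *conf = rd_kafka_conf_new();

            /* Placeholder values for this sketch; setting group.id is what
             * makes rd_kafka_new() instantiate an rd_kafka_cgrp_t. */
            if (rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                                  errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
                rd_kafka_conf_set(conf, "group.id", "example-group",
                                  errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                    fprintf(stderr, "%s\n", errstr);
                    return 1;
            }

            rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
                                          errstr, sizeof(errstr));
            if (!rk) {
                    fprintf(stderr, "%s\n", errstr);
                    return 1;
            }

            /* Forward the main event queue to the consumer queue so one poll
             * loop serves both, then subscribe; the subscription triggers
             * coordinator lookup, JoinGroup and SyncGroup. */
            rd_kafka_poll_set_consumer(rk);
            rd_kafka_topic_partition_list_t *topics =
                    rd_kafka_topic_partition_list_new(1);
            rd_kafka_topic_partition_list_add(topics, "example-topic",
                                              RD_KAFKA_PARTITION_UA);
            rd_kafka_subscribe(rk, topics);
            rd_kafka_topic_partition_list_destroy(topics);

            for (int i = 0 ; i < 100 ; i++) {
                    rd_kafka_message_t *msg = rd_kafka_consumer_poll(rk, 100);
                    if (!msg)
                            continue;
                    if (!msg->err)
                            printf("Received %zu bytes\n", msg->len);
                    rd_kafka_message_destroy(msg);
            }

            rd_kafka_consumer_close(rk);  /* LeaveGroup + unassign, cgrp -> TERM */
            rd_kafka_destroy(rk);
            return 0;
    }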

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_conf.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_conf.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_conf.c
deleted file mode 100644
index a0e4d9f..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_conf.c
+++ /dev/null
@@ -1,2151 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012,2013 Magnus Edenhill
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "rdkafka_int.h"
-#include "rd.h"
-
-#include <stdlib.h>
-#include <ctype.h>
-#include <stddef.h>
-
-#include "rdkafka_int.h"
-#include "rdkafka_feature.h"
-#include "rdkafka_interceptor.h"
-#if WITH_PLUGINS
-#include "rdkafka_plugin.h"
-#endif
-
-struct rd_kafka_property {
-	rd_kafka_conf_scope_t scope;
-	const char *name;
-	enum {
-		_RK_C_STR,
-		_RK_C_INT,
-		_RK_C_S2I,  /* String to Integer mapping.
-			     * Supports limited canonical str->int mappings
-			     * using s2i[] */
-		_RK_C_S2F,  /* CSV String to Integer flag mapping (OR:ed) */
-		_RK_C_BOOL,
-		_RK_C_PTR,  /* Only settable through special set functions */
-                _RK_C_PATLIST, /* Pattern list */
-                _RK_C_KSTR, /* Kafka string */
-                _RK_C_ALIAS, /* Alias: points to other property through .sdef */
-                _RK_C_INTERNAL, /* Internal, don't expose to application */
-                _RK_C_INVALID,  /* Invalid property, used to catch known
-                                 * but unsupported Java properties. */
-	} type;
-	int   offset;
-	const char *desc;
-	int   vmin;
-	int   vmax;
-	int   vdef;        /* Default value (int) */
-	const char *sdef;  /* Default value (string) */
-        void  *pdef;       /* Default value (pointer) */
-	struct {
-		int val;
-		const char *str;
-	} s2i[16];  /* _RK_C_S2I and _RK_C_S2F */
-
-	/* Value validator (STR) */
-	int (*validate) (const struct rd_kafka_property *prop,
-			 const char *val, int ival);
-
-        /* Configuration object constructors and destructor for use when
-         * the property value itself is not used, or needs extra care. */
-        void (*ctor) (int scope, void *pconf);
-        void (*dtor) (int scope, void *pconf);
-        void (*copy) (int scope, void *pdst, const void *psrc,
-                      void *dstptr, const void *srcptr,
-                      size_t filter_cnt, const char **filter);
-
-        rd_kafka_conf_res_t (*set) (int scope, void *pconf,
-                                    const char *name, const char *value,
-                                    void *dstptr,
-                                    rd_kafka_conf_set_mode_t set_mode,
-                                    char *errstr, size_t errstr_size);
-};
-
-
-#define _RK(field)  offsetof(rd_kafka_conf_t, field)
-#define _RKT(field) offsetof(rd_kafka_topic_conf_t, field)
-
-
-static rd_kafka_conf_res_t
-rd_kafka_anyconf_get0 (const void *conf, const struct rd_kafka_property *prop,
-                       char *dest, size_t *dest_size);
-
-
-/**
- * @brief Validate \p broker.version.fallback property.
- */
-static int
-rd_kafka_conf_validate_broker_version (const struct rd_kafka_property *prop,
-				       const char *val, int ival) {
-	struct rd_kafka_ApiVersion *apis;
-	size_t api_cnt;
-	return rd_kafka_get_legacy_ApiVersions(val, &apis, &api_cnt, NULL);
-}
-
-/**
- * @brief Validate that the string is a single item, without delimiters (comma, space).
- */
-static RD_UNUSED int
-rd_kafka_conf_validate_single (const struct rd_kafka_property *prop,
-				const char *val, int ival) {
-	return !strchr(val, ',') && !strchr(val, ' ');
-}
-
-
-/**
- * librdkafka configuration property definitions.
- */
-static const struct rd_kafka_property rd_kafka_properties[] = {
-	/* Global properties */
-	{ _RK_GLOBAL, "builtin.features", _RK_C_S2F, _RK(builtin_features),
-	"Indicates the builtin features for this build of librdkafka. "
-	"An application can either query this value or attempt to set it "
-	"with its list of required features to check for library support.",
-	0, 0x7fffffff, 0xffff,
-	.s2i = {
-#if WITH_ZLIB
-		{ 0x1, "gzip" },
-#endif
-#if WITH_SNAPPY
-		{ 0x2, "snappy" },
-#endif
-#if WITH_SSL
-		{ 0x4, "ssl" },
-#endif
-                { 0x8, "sasl" },
-		{ 0x10, "regex" },
-		{ 0x20, "lz4" },
-#if defined(_MSC_VER) || WITH_SASL_CYRUS
-                { 0x40, "sasl_gssapi" },
-#endif
-                { 0x80, "sasl_plain" },
-#if WITH_SASL_SCRAM
-                { 0x100, "sasl_scram" },
-#endif
-#if WITH_PLUGINS
-                { 0x200, "plugins" },
-#endif
-		{ 0, NULL }
-		}
-	},
-	{ _RK_GLOBAL, "client.id", _RK_C_STR, _RK(client_id_str),
-	  "Client identifier.",
-	  .sdef =  "rdkafka" },
-	{ _RK_GLOBAL, "metadata.broker.list", _RK_C_STR, _RK(brokerlist),
-	  "Initial list of brokers as a CSV list of broker host or host:port. "
-	  "The application may also use `rd_kafka_brokers_add()` to add "
-	  "brokers during runtime." },
-	{ _RK_GLOBAL, "bootstrap.servers", _RK_C_ALIAS, 0,
-	  "See metadata.broker.list",
-	  .sdef = "metadata.broker.list" },
-	{ _RK_GLOBAL, "message.max.bytes", _RK_C_INT, _RK(max_msg_size),
-	  "Maximum Kafka protocol request message size.",
-	  1000, 1000000000, 1000000 },
-	{ _RK_GLOBAL, "message.copy.max.bytes", _RK_C_INT,
-	  _RK(msg_copy_max_size),
-	  "Maximum size for message to be copied to buffer. "
-	  "Messages larger than this will be passed by reference (zero-copy) "
-	  "at the expense of larger iovecs.",
-	  0, 1000000000, 0xffff },
-	{ _RK_GLOBAL, "receive.message.max.bytes", _RK_C_INT,
-          _RK(recv_max_msg_size),
-	  "Maximum Kafka protocol response message size. "
-	  "This is a safety precaution to avoid memory exhaustion in case of "
-	  "protocol hiccups. "
-          "The value should be at least fetch.message.max.bytes * number of "
-          "partitions consumed from + messaging overhead (e.g. 200000 bytes).",
-	  1000, 1000000000, 100000000 },
-	{ _RK_GLOBAL, "max.in.flight.requests.per.connection", _RK_C_INT,
-	  _RK(max_inflight),
-	  "Maximum number of in-flight requests per broker connection. "
-	  "This is a generic property applied to all broker communication, "
-	  "however it is primarily relevant to produce requests. "
-	  "In particular, note that other mechanisms limit the number "
	  "of outstanding consumer fetch requests per broker to one.",
-	  1, 1000000, 1000000 },
-        { _RK_GLOBAL, "max.in.flight", _RK_C_ALIAS,
-          .sdef = "max.in.flight.requests.per.connection" },
-	{ _RK_GLOBAL, "metadata.request.timeout.ms", _RK_C_INT,
-	  _RK(metadata_request_timeout_ms),
-	  "Non-topic request timeout in milliseconds. "
-	  "This is for metadata requests, etc.",
-	  10, 900*1000, 60*1000},
-	{ _RK_GLOBAL, "topic.metadata.refresh.interval.ms", _RK_C_INT,
-	  _RK(metadata_refresh_interval_ms),
-	  "Topic metadata refresh interval in milliseconds. "
-	  "The metadata is automatically refreshed on error and connect. "
-	  "Use -1 to disable the periodic refresh.",
-	  -1, 3600*1000, 5*60*1000 },
-	{ _RK_GLOBAL, "metadata.max.age.ms", _RK_C_INT,
-          _RK(metadata_max_age_ms),
-          "Metadata cache max age. "
-          "Defaults to metadata.refresh.interval.ms * 3",
-          1, 24*3600*1000, -1 },
-        { _RK_GLOBAL, "topic.metadata.refresh.fast.interval.ms", _RK_C_INT,
-          _RK(metadata_refresh_fast_interval_ms),
-          "When a topic loses its leader a new metadata request will be "
-          "enqueued with this initial interval, exponentially increasing "
-          "until the topic metadata has been refreshed. "
-          "This is used to recover quickly from transitioning leader brokers.",
-          1, 60*1000, 250 },
-        { _RK_GLOBAL, "topic.metadata.refresh.fast.cnt", _RK_C_INT,
-          _RK(metadata_refresh_fast_cnt),
-          "*Deprecated: No longer used.*",
-          0, 1000, 10 },
-        { _RK_GLOBAL, "topic.metadata.refresh.sparse", _RK_C_BOOL,
-          _RK(metadata_refresh_sparse),
-          "Sparse metadata requests (consumes less network bandwidth)",
-          0, 1, 1 },
-        { _RK_GLOBAL, "topic.blacklist", _RK_C_PATLIST,
-          _RK(topic_blacklist),
-          "Topic blacklist, a comma-separated list of regular expressions "
-          "for matching topic names that should be ignored in "
-          "broker metadata information as if the topics did not exist." },
-	{ _RK_GLOBAL, "debug", _RK_C_S2F, _RK(debug),
-	  "A comma-separated list of debug contexts to enable. "
-	  "Debugging the Producer: broker,topic,msg. Consumer: cgrp,topic,fetch",
-	  .s2i = {
-                        { RD_KAFKA_DBG_GENERIC,  "generic" },
-			{ RD_KAFKA_DBG_BROKER,   "broker" },
-			{ RD_KAFKA_DBG_TOPIC,    "topic" },
-			{ RD_KAFKA_DBG_METADATA, "metadata" },
-			{ RD_KAFKA_DBG_QUEUE,    "queue" },
-			{ RD_KAFKA_DBG_MSG,      "msg" },
-			{ RD_KAFKA_DBG_PROTOCOL, "protocol" },
-                        { RD_KAFKA_DBG_CGRP,     "cgrp" },
-			{ RD_KAFKA_DBG_SECURITY, "security" },
-			{ RD_KAFKA_DBG_FETCH,    "fetch" },
-			{ RD_KAFKA_DBG_FEATURE,  "feature" },
-                        { RD_KAFKA_DBG_INTERCEPTOR, "interceptor" },
-                        { RD_KAFKA_DBG_PLUGIN,   "plugin" },
-			{ RD_KAFKA_DBG_ALL,      "all" },
-		} },
-	{ _RK_GLOBAL, "socket.timeout.ms", _RK_C_INT, _RK(socket_timeout_ms),
-	  "Timeout for network requests.",
-	  10, 300*1000, 60*1000 },
-	{ _RK_GLOBAL, "socket.blocking.max.ms", _RK_C_INT,
-	  _RK(socket_blocking_max_ms),
-	  "Maximum time a broker socket operation may block. "
-          "A lower value improves responsiveness at the expense of "
-          "slightly higher CPU usage. **Deprecated**",
-	  1, 60*1000, 1000 },
-	{ _RK_GLOBAL, "socket.send.buffer.bytes", _RK_C_INT,
-	  _RK(socket_sndbuf_size),
-	  "Broker socket send buffer size. System default is used if 0.",
-	  0, 100000000, 0 },
-	{ _RK_GLOBAL, "socket.receive.buffer.bytes", _RK_C_INT,
-	  _RK(socket_rcvbuf_size),
-	  "Broker socket receive buffer size. System default is used if 0.",
-	  0, 100000000, 0 },
-	{ _RK_GLOBAL, "socket.keepalive.enable", _RK_C_BOOL,
-	  _RK(socket_keepalive),
-          "Enable TCP keep-alives (SO_KEEPALIVE) on broker sockets",
-          0, 1, 0 },
-	{ _RK_GLOBAL, "socket.nagle.disable", _RK_C_BOOL,
-	  _RK(socket_nagle_disable),
-          "Disable the Nagle algorithm (TCP_NODELAY).",
-          0, 1, 0 },
-        { _RK_GLOBAL, "socket.max.fails", _RK_C_INT,
-          _RK(socket_max_fails),
-          "Disconnect from broker when this number of send failures "
-          "(e.g., timed out requests) is reached. Disable with 0. "
-          "NOTE: The connection is automatically re-established.",
-          0, 1000000, 3 },
-	{ _RK_GLOBAL, "broker.address.ttl", _RK_C_INT,
-	  _RK(broker_addr_ttl),
-	  "How long to cache the broker address resolving "
-          "results (milliseconds).",
-	  0, 86400*1000, 1*1000 },
-        { _RK_GLOBAL, "broker.address.family", _RK_C_S2I,
-          _RK(broker_addr_family),
-          "Allowed broker IP address families: any, v4, v6",
-          .vdef = AF_UNSPEC,
-          .s2i = {
-                        { AF_UNSPEC, "any" },
-                        { AF_INET, "v4" },
-                        { AF_INET6, "v6" },
-                } },
-        { _RK_GLOBAL, "reconnect.backoff.jitter.ms", _RK_C_INT,
-          _RK(reconnect_jitter_ms),
-          "Throttle broker reconnection attempts by this value +-50%.",
-          0, 60*60*1000, 500 },
-	{ _RK_GLOBAL, "statistics.interval.ms", _RK_C_INT,
-	  _RK(stats_interval_ms),
-	  "librdkafka statistics emit interval. The application also needs to "
-	  "register a stats callback using `rd_kafka_conf_set_stats_cb()`. "
-	  "The granularity is 1000ms. A value of 0 disables statistics.",
-	  0, 86400*1000, 0 },
-	{ _RK_GLOBAL, "enabled_events", _RK_C_INT,
-	  _RK(enabled_events),
-	  "See `rd_kafka_conf_set_events()`",
-	  0, 0x7fffffff, 0 },
-	{ _RK_GLOBAL, "error_cb", _RK_C_PTR,
-	  _RK(error_cb),
-	  "Error callback (set with rd_kafka_conf_set_error_cb())" },
-	{ _RK_GLOBAL, "throttle_cb", _RK_C_PTR,
-	  _RK(throttle_cb),
-	  "Throttle callback (set with rd_kafka_conf_set_throttle_cb())" },
-	{ _RK_GLOBAL, "stats_cb", _RK_C_PTR,
-	  _RK(stats_cb),
-	  "Statistics callback (set with rd_kafka_conf_set_stats_cb())" },
-	{ _RK_GLOBAL, "log_cb", _RK_C_PTR,
-	  _RK(log_cb),
-	  "Log callback (set with rd_kafka_conf_set_log_cb())",
-          .pdef = rd_kafka_log_print },
-        { _RK_GLOBAL, "log_level", _RK_C_INT,
-          _RK(log_level),
-          "Logging level (syslog(3) levels)",
-          0, 7, 6 },
-        { _RK_GLOBAL, "log.queue", _RK_C_BOOL, _RK(log_queue),
-          "Disable spontaneous log_cb from internal librdkafka "
-          "threads, instead enqueue log messages on queue set with "
-          "`rd_kafka_set_log_queue()` and serve log callbacks or "
-          "events through the standard poll APIs. "
-          "**NOTE**: Log messages will linger in a temporary queue "
-          "until the log queue has been set.",
-          0, 1, 0 },
-	{ _RK_GLOBAL, "log.thread.name", _RK_C_BOOL,
-	  _RK(log_thread_name),
-	  "Print internal thread name in log messages "
-	  "(useful for debugging librdkafka internals)",
-	  0, 1, 1 },
-	{ _RK_GLOBAL, "log.connection.close", _RK_C_BOOL,
-	  _RK(log_connection_close),
-	  "Log broker disconnects. "
-          "It might be useful to turn this off when interacting with "
-          "0.9 brokers with an aggressive `connection.max.idle.ms` value.",
-	  0, 1, 1 },
-        { _RK_GLOBAL, "socket_cb", _RK_C_PTR,
-          _RK(socket_cb),
-          "Socket creation callback to provide race-free CLOEXEC",
-          .pdef =
-#ifdef __linux__
-          rd_kafka_socket_cb_linux
-#else
-          rd_kafka_socket_cb_generic
-#endif
-        },
-        { _RK_GLOBAL, "connect_cb", _RK_C_PTR,
-          _RK(connect_cb),
-          "Socket connect callback",
-        },
-        { _RK_GLOBAL, "closesocket_cb", _RK_C_PTR,
-          _RK(closesocket_cb),
-          "Socket close callback",
-        },
-        { _RK_GLOBAL, "open_cb", _RK_C_PTR,
-          _RK(open_cb),
-          "File open callback to provide race-free CLOEXEC",
-          .pdef =
-#ifdef __linux__
-          rd_kafka_open_cb_linux
-#else
-          rd_kafka_open_cb_generic
-#endif
-        },
-	{ _RK_GLOBAL, "opaque", _RK_C_PTR,
-	  _RK(opaque),
-	  "Application opaque (set with rd_kafka_conf_set_opaque())" },
-        { _RK_GLOBAL, "default_topic_conf", _RK_C_PTR,
-          _RK(topic_conf),
-          "Default topic configuration for automatically subscribed topics" },
-	{ _RK_GLOBAL, "internal.termination.signal", _RK_C_INT,
-	  _RK(term_sig),
-	  "Signal that librdkafka will use to quickly terminate on "
-	  "rd_kafka_destroy(). If this signal is not set then there will be a "
-	  "delay before rd_kafka_wait_destroyed() returns true "
-	  "as internal threads are timing out their system calls. "
-	  "If this signal is set however the delay will be minimal. "
-	  "The application should mask this signal as an internal "
-	  "signal handler is installed.",
-	  0, 128, 0 },
-	{ _RK_GLOBAL, "api.version.request", _RK_C_BOOL,
-	  _RK(api_version_request),
-	  "Request broker's supported API versions to adjust functionality to "
-	  "available protocol features. If set to false, or the "
-          "ApiVersionRequest fails, the fallback version "
-	  "`broker.version.fallback` will be used. "
-	  "**NOTE**: Depends on broker version >=0.10.0. If the request is not "
-	  "supported by (an older) broker the `broker.version.fallback` fallback is used.",
-	  0, 1, 1 },
-	{ _RK_GLOBAL, "api.version.request.timeout.ms", _RK_C_INT,
-	  _RK(api_version_request_timeout_ms),
-	  "Timeout for broker API version requests.",
-	  1, 5*60*1000, 10*1000 },
-	{ _RK_GLOBAL, "api.version.fallback.ms", _RK_C_INT,
-	  _RK(api_version_fallback_ms),
-	  "Dictates how long the `broker.version.fallback` fallback is used "
-	  "in the case the ApiVersionRequest fails. "
-	  "**NOTE**: The ApiVersionRequest is only issued when a new connection "
-	  "to the broker is made (such as after an upgrade).",
-	  0, 86400*7*1000, 20*60*1000 /* longer than default Idle timeout (10m)*/ },
-
-	{ _RK_GLOBAL, "broker.version.fallback", _RK_C_STR,
-	  _RK(broker_version_fallback),
-	  "Older broker versions (<0.10.0) provide no way for a client to query "
-	  "for supported protocol features "
-	  "(ApiVersionRequest, see `api.version.request`) making it impossible "
-	  "for the client to know what features it may use. "
-	  "As a workaround a user may set this property to the expected broker "
-	  "version and the client will automatically adjust its feature set "
-	  "accordingly if the ApiVersionRequest fails (or is disabled). "
-	  "The fallback broker version will be used for `api.version.fallback.ms`. "
-          "Valid values are: 0.9.0, 0.8.2, 0.8.1, 0.8.0. Any other value, "
-          "such as 0.10.2.1, enables ApiVersionRequests.",
-	  .sdef = "0.9.0",
-	  .validate = rd_kafka_conf_validate_broker_version },
-
-	/* Security related global properties */
-	{ _RK_GLOBAL, "security.protocol", _RK_C_S2I,
-	  _RK(security_protocol),
-	  "Protocol used to communicate with brokers.",
-	  .vdef = RD_KAFKA_PROTO_PLAINTEXT,
-	  .s2i = {
-			{ RD_KAFKA_PROTO_PLAINTEXT, "plaintext" },
-#if WITH_SSL
-			{ RD_KAFKA_PROTO_SSL, "ssl" },
-#endif
-			{ RD_KAFKA_PROTO_SASL_PLAINTEXT, "sasl_plaintext" },
-#if WITH_SSL
-			{ RD_KAFKA_PROTO_SASL_SSL, "sasl_ssl" },
-#endif
-			{ 0, NULL }
-		} },
-
-#if WITH_SSL
-	{ _RK_GLOBAL, "ssl.cipher.suites", _RK_C_STR,
-	  _RK(ssl.cipher_suites),
-	  "A cipher suite is a named combination of authentication, "
-	  "encryption, MAC and key exchange algorithm used to negotiate the "
-	  "security settings for a network connection using TLS or SSL network "
-	  "protocol. See manual page for `ciphers(1)` and "
-	  "`SSL_CTX_set_cipher_list(3)`."
-	},
-	{ _RK_GLOBAL, "ssl.key.location", _RK_C_STR,
-	  _RK(ssl.key_location),
-	  "Path to client's private key (PEM) used for authentication."
-	},
-	{ _RK_GLOBAL, "ssl.key.password", _RK_C_STR,
-	  _RK(ssl.key_password),
-	  "Private key passphrase"
-	},
-	{ _RK_GLOBAL, "ssl.certificate.location", _RK_C_STR,
-	  _RK(ssl.cert_location),
-	  "Path to client's public key (PEM) used for authentication."
-	},
-	{ _RK_GLOBAL, "ssl.ca.location", _RK_C_STR,
-	  _RK(ssl.ca_location),
-	  "File or directory path to CA certificate(s) for verifying "
-	  "the broker's key."
-	},
-	{ _RK_GLOBAL, "ssl.crl.location", _RK_C_STR,
-	  _RK(ssl.crl_location),
-	  "Path to CRL for verifying broker's certificate validity."
-	},
-#endif /* WITH_SSL */
-
-        /* Point user in the right direction if they try to apply
-         * Java client SSL / JAAS properties. */
-        { _RK_GLOBAL, "ssl.keystore.location", _RK_C_INVALID,
-          _RK(dummy),
-          "Java KeyStores are not supported, use `ssl.key.location` and "
-          "a private key (PEM) file instead. "
-          "See https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka for more information."
-        },
-        { _RK_GLOBAL, "ssl.truststore.location", _RK_C_INVALID,
-          _RK(dummy),
-          "Java TrustStores are not supported, use `ssl.ca.location` "
-          "and a certificate file instead. "
-          "See https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka for more information."
-        },
-        { _RK_GLOBAL, "sasl.jaas.config", _RK_C_INVALID,
-          _RK(dummy),
-          "Java JAAS configuration is not supported, see "
-          "https://github.com/edenhill/librdkafka/wiki/Using-SASL-with-librdkafka "
-          "for more information."
-        },
-
-	{_RK_GLOBAL,"sasl.mechanisms", _RK_C_STR,
-	 _RK(sasl.mechanisms),
-	 "SASL mechanism to use for authentication. "
-	 "Supported: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512. "
-	 "**NOTE**: Despite the name, only one mechanism may be configured.",
-	 .sdef = "GSSAPI",
-	 .validate = rd_kafka_conf_validate_single },
-	{ _RK_GLOBAL, "sasl.kerberos.service.name", _RK_C_STR,
-	  _RK(sasl.service_name),
-	  "Kerberos principal name that Kafka runs as.",
-	  .sdef = "kafka" },
-	{ _RK_GLOBAL, "sasl.kerberos.principal", _RK_C_STR,
-	  _RK(sasl.principal),
-	  "This client's Kerberos principal name.",
-	  .sdef = "kafkaclient" },
-#ifndef _MSC_VER
-	{ _RK_GLOBAL, "sasl.kerberos.kinit.cmd", _RK_C_STR,
-	  _RK(sasl.kinit_cmd),
-	  "Full kerberos kinit command string, %{config.prop.name} is replaced "
-	  "by corresponding config object value, %{broker.name} returns the "
-	  "broker's hostname.",
-	  .sdef = "kinit -S \"%{sasl.kerberos.service.name}/%{broker.name}\" "
-	  "-k -t \"%{sasl.kerberos.keytab}\" %{sasl.kerberos.principal}" },
-	{ _RK_GLOBAL, "sasl.kerberos.keytab", _RK_C_STR,
-	  _RK(sasl.keytab),
-	  "Path to Kerberos keytab file. Uses system default if not set. "
-	  "**NOTE**: This is not automatically used but must be added to the "
-	  "template in sasl.kerberos.kinit.cmd as "
-	  "` ... -t %{sasl.kerberos.keytab}`." },
-	{ _RK_GLOBAL, "sasl.kerberos.min.time.before.relogin", _RK_C_INT,
-	  _RK(sasl.relogin_min_time),
-	  "Minimum time in milliseconds between key refresh attempts.",
-	  1, 86400*1000, 60*1000 },
-#endif
-	{ _RK_GLOBAL, "sasl.username", _RK_C_STR,
-	  _RK(sasl.username),
-	  "SASL username for use with the PLAIN and SASL-SCRAM-.. mechanisms" },
-	{ _RK_GLOBAL, "sasl.password", _RK_C_STR,
-	  _RK(sasl.password),
-	  "SASL password for use with the PLAIN and SASL-SCRAM-.. mechanisms" },
-
-#if WITH_PLUGINS
-        /* Plugins */
-        { _RK_GLOBAL, "plugin.library.paths", _RK_C_STR,
-          _RK(plugin_paths),
-          "List of plugin libraries to load (; separated). "
-          "The library search path is platform dependent (see dlopen(3) for Unix and LoadLibrary() for Windows). If no filename extension is specified the "
-          "platform-specific extension (such as .dll or .so) will be appended automatically.",
-          .set = rd_kafka_plugins_conf_set },
-#endif
-
-        /* Interceptors are added through specific API and not exposed
-         * as configuration properties.
-         * The interceptor property must be defined after plugin.library.paths
-         * so that the plugin libraries are properly loaded before
-         * interceptors are configured when duplicating configuration objects.*/
-        { _RK_GLOBAL, "interceptors", _RK_C_INTERNAL,
-          _RK(interceptors),
-          "Interceptors added through rd_kafka_conf_interceptor_add_..() "
-          "and any configuration handled by interceptors.",
-          .ctor = rd_kafka_conf_interceptor_ctor,
-          .dtor = rd_kafka_conf_interceptor_dtor,
-          .copy = rd_kafka_conf_interceptor_copy },
-
-        /* Global client group properties */
-        { _RK_GLOBAL|_RK_CGRP, "group.id", _RK_C_STR,
-          _RK(group_id_str),
-          "Client group id string. All clients sharing the same group.id "
-          "belong to the same group." },
-        { _RK_GLOBAL|_RK_CGRP, "partition.assignment.strategy", _RK_C_STR,
-          _RK(partition_assignment_strategy),
-          "Name of partition assignment strategy to use when elected "
-          "group leader assigns partitions to group members.",
-	  .sdef = "range,roundrobin" },
-        { _RK_GLOBAL|_RK_CGRP, "session.timeout.ms", _RK_C_INT,
-          _RK(group_session_timeout_ms),
-          "Client group session and failure detection timeout.",
-          1, 3600*1000, 30*1000 },
-        { _RK_GLOBAL|_RK_CGRP, "heartbeat.interval.ms", _RK_C_INT,
-          _RK(group_heartbeat_intvl_ms),
-          "Group session keepalive heartbeat interval.",
-          1, 3600*1000, 1*1000 },
-        { _RK_GLOBAL|_RK_CGRP, "group.protocol.type", _RK_C_KSTR,
-          _RK(group_protocol_type),
-          "Group protocol type",
-          .sdef = "consumer" },
-        { _RK_GLOBAL|_RK_CGRP, "coordinator.query.interval.ms", _RK_C_INT,
-          _RK(coord_query_intvl_ms),
-          "How often to query for the current client group coordinator. "
-          "If the currently assigned coordinator is down the configured "
-          "query interval will be divided by ten to more quickly recover "
-          "in case of coordinator reassignment.",
-          1, 3600*1000, 10*60*1000 },
-
-        /* Global consumer properties */
-        { _RK_GLOBAL|_RK_CONSUMER, "enable.auto.commit", _RK_C_BOOL,
-          _RK(enable_auto_commit),
-          "Automatically and periodically commit offsets in the background.",
-          0, 1, 1 },
-        { _RK_GLOBAL|_RK_CONSUMER, "auto.commit.interval.ms", _RK_C_INT,
-	  _RK(auto_commit_interval_ms),
-	  "The frequency in milliseconds that the consumer offsets "
-	  "are committed (written) to offset storage. (0 = disable). "
-          "This setting is used by the high-level consumer.",
-          0, 86400*1000, 5*1000 },
-        { _RK_GLOBAL|_RK_CONSUMER, "enable.auto.offset.store", _RK_C_BOOL,
-          _RK(enable_auto_offset_store),
-          "Automatically store offset of last message provided to "
-	  "application.",
-          0, 1, 1 },
-	{ _RK_GLOBAL|_RK_CONSUMER, "queued.min.messages", _RK_C_INT,
-	  _RK(queued_min_msgs),
-	  "Minimum number of messages per topic+partition "
-          "librdkafka tries to maintain in the local consumer queue.",
-	  1, 10000000, 100000 },
-	{ _RK_GLOBAL|_RK_CONSUMER, "queued.max.messages.kbytes", _RK_C_INT,
-	  _RK(queued_max_msg_kbytes),
-          "Maximum number of kilobytes per topic+partition in the "
-          "local consumer queue. "
-	  "This value may be overshot by fetch.message.max.bytes. "
-	  "This property has higher priority than queued.min.messages.",
-          1, 1000000000, 1000000 /* 1 Gig */ },
-	{ _RK_GLOBAL|_RK_CONSUMER, "fetch.wait.max.ms", _RK_C_INT,
-	  _RK(fetch_wait_max_ms),
-	  "Maximum time the broker may wait to fill the response "
-	  "with fetch.min.bytes.",
-	  0, 300*1000, 100 },
-        { _RK_GLOBAL|_RK_CONSUMER, "fetch.message.max.bytes", _RK_C_INT,
-          _RK(fetch_msg_max_bytes),
-          "Initial maximum number of bytes per topic+partition to request when "
-          "fetching messages from the broker. "
-	  "If the client encounters a message larger than this value "
-	  "it will gradually try to increase it until the "
-	  "entire message can be fetched.",
-          1, 1000000000, 1024*1024 },
-	{ _RK_GLOBAL|_RK_CONSUMER, "max.partition.fetch.bytes", _RK_C_ALIAS,
-	  .sdef = "fetch.message.max.bytes" },
-	{ _RK_GLOBAL|_RK_CONSUMER, "fetch.min.bytes", _RK_C_INT,
-	  _RK(fetch_min_bytes),
-	  "Minimum number of bytes the broker responds with. "
-	  "If fetch.wait.max.ms expires the accumulated data will "
-	  "be sent to the client regardless of this setting.",
-	  1, 100000000, 1 },
-	{ _RK_GLOBAL|_RK_CONSUMER, "fetch.error.backoff.ms", _RK_C_INT,
-	  _RK(fetch_error_backoff_ms),
-	  "How long to postpone the next fetch request for a "
-	  "topic+partition in case of a fetch error.",
-	  0, 300*1000, 500 },
-        { _RK_GLOBAL|_RK_CONSUMER, "offset.store.method", _RK_C_S2I,
-          _RK(offset_store_method),
-          "Offset commit store method: "
-          "'file' - local file store (offset.store.path, et al.), "
-          "'broker' - broker commit store "
-          "(requires Apache Kafka 0.8.2 or later on the broker).",
-          .vdef = RD_KAFKA_OFFSET_METHOD_BROKER,
-          .s2i = {
-                        { RD_KAFKA_OFFSET_METHOD_NONE, "none" },
-                        { RD_KAFKA_OFFSET_METHOD_FILE, "file" },
-                        { RD_KAFKA_OFFSET_METHOD_BROKER, "broker" }
-                }
-        },
-        { _RK_GLOBAL|_RK_CONSUMER, "consume_cb", _RK_C_PTR,
-	  _RK(consume_cb),
-	  "Message consume callback (set with rd_kafka_conf_set_consume_cb())"},
-	{ _RK_GLOBAL|_RK_CONSUMER, "rebalance_cb", _RK_C_PTR,
-	  _RK(rebalance_cb),
-	  "Called after consumer group has been rebalanced "
-          "(set with rd_kafka_conf_set_rebalance_cb())" },
-	{ _RK_GLOBAL|_RK_CONSUMER, "offset_commit_cb", _RK_C_PTR,
-	  _RK(offset_commit_cb),
-	  "Offset commit result propagation callback. "
-          "(set with rd_kafka_conf_set_offset_commit_cb())" },
-	{ _RK_GLOBAL|_RK_CONSUMER, "enable.partition.eof", _RK_C_BOOL,
-	  _RK(enable_partition_eof),
-	  "Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the "
-	  "consumer reaches the end of a partition.",
-	  0, 1, 1 },
-        { _RK_GLOBAL|_RK_CONSUMER, "check.crcs", _RK_C_BOOL,
-          _RK(check_crcs),
-          "Verify CRC32 of consumed messages, ensuring no on-the-wire or "
-          "on-disk corruption to the messages occurred. This check comes "
-          "at slightly increased CPU usage.",
-          0, 1, 0 },
-	/* Global producer properties */
-	{ _RK_GLOBAL|_RK_PRODUCER, "queue.buffering.max.messages", _RK_C_INT,
-	  _RK(queue_buffering_max_msgs),
-	  "Maximum number of messages allowed on the producer queue.",
-	  1, 10000000, 100000 },
-	{ _RK_GLOBAL|_RK_PRODUCER, "queue.buffering.max.kbytes", _RK_C_INT,
-	  _RK(queue_buffering_max_kbytes),
-	  "Maximum total message size sum allowed on the producer queue. "
-	  "This property has higher priority than queue.buffering.max.messages.",
-	  1, INT_MAX/1024, 4000000 },
-	{ _RK_GLOBAL|_RK_PRODUCER, "queue.buffering.max.ms", _RK_C_INT,
-	  _RK(buffering_max_ms),
-	  "Delay in milliseconds to wait for messages in the producer queue "
-          "to accumulate before constructing message batches (MessageSets) to "
-          "transmit to brokers. "
-	  "A higher value allows larger and more effective "
-          "(less overhead, improved compression) batches of messages to "
-          "accumulate at the expense of increased message delivery latency.",
-	  0, 900*1000, 0 },
-        { _RK_GLOBAL|_RK_PRODUCER, "linger.ms", _RK_C_ALIAS,
-          .sdef = "queue.buffering.max.ms" },
-	{ _RK_GLOBAL|_RK_PRODUCER, "message.send.max.retries", _RK_C_INT,
-	  _RK(max_retries),
-	  "How many times to retry sending a failing MessageSet. "
-	  "**Note:** retrying may cause reordering.",
-          0, 10000000, 2 },
-          { _RK_GLOBAL | _RK_PRODUCER, "retries", _RK_C_ALIAS,
-                .sdef = "message.send.max.retries" },
-	{ _RK_GLOBAL|_RK_PRODUCER, "retry.backoff.ms", _RK_C_INT,
-	  _RK(retry_backoff_ms),
-	  "The backoff time in milliseconds before retrying a message send.",
-	  1, 300*1000, 100 },
-	{ _RK_GLOBAL|_RK_PRODUCER, "compression.codec", _RK_C_S2I,
-	  _RK(compression_codec),
-	  "Compression codec to use for compressing message sets. "
-	  "This is the default value for all topics; it may be overridden by "
-	  "the topic configuration property `compression.codec`. ",
-	  .vdef = RD_KAFKA_COMPRESSION_NONE,
-	  .s2i = {
-			{ RD_KAFKA_COMPRESSION_NONE,   "none" },
-#if WITH_ZLIB
-			{ RD_KAFKA_COMPRESSION_GZIP,   "gzip" },
-#endif
-#if WITH_SNAPPY
-			{ RD_KAFKA_COMPRESSION_SNAPPY, "snappy" },
-#endif
-                        { RD_KAFKA_COMPRESSION_LZ4, "lz4" },
-			{ 0 }
-		} },
-	{ _RK_GLOBAL|_RK_PRODUCER, "batch.num.messages", _RK_C_INT,
-	  _RK(batch_num_messages),
-	  "Maximum number of messages batched in one MessageSet. "
-	  "The total MessageSet size is also limited by message.max.bytes.",
-	  1, 1000000, 10000 },
-	{ _RK_GLOBAL|_RK_PRODUCER, "delivery.report.only.error", _RK_C_BOOL,
-	  _RK(dr_err_only),
-	  "Only provide delivery reports for failed messages.",
-	  0, 1, 0 },
-	{ _RK_GLOBAL|_RK_PRODUCER, "dr_cb", _RK_C_PTR,
-	  _RK(dr_cb),
-	  "Delivery report callback (set with rd_kafka_conf_set_dr_cb())" },
-	{ _RK_GLOBAL|_RK_PRODUCER, "dr_msg_cb", _RK_C_PTR,
-	  _RK(dr_msg_cb),
-	  "Delivery report callback (set with rd_kafka_conf_set_dr_msg_cb())" },
-
-
-        /*
-         * Topic properties
-         */
-
-        /* Topic producer properties */
-	{ _RK_TOPIC|_RK_PRODUCER, "request.required.acks", _RK_C_INT,
-	  _RKT(required_acks),
-	  "This field indicates how many acknowledgements the leader broker "
-	  "must receive from ISR brokers before responding to the request: "
-	  "*0*=Broker does not send any response/ack to client, "
-	  "*1*=Only the leader broker will need to ack the message, "
-	  "*-1* or *all*=broker will block until message is committed by all "
-	  "in sync replicas (ISRs) or broker's `in.sync.replicas` "
-	  "setting before sending response. ",
-	  -1, 1000, 1,
-	  .s2i = {
-			{ -1, "all" },
-		}
-	},
-	{ _RK_TOPIC | _RK_PRODUCER, "acks", _RK_C_ALIAS,
-	  .sdef = "request.required.acks" },
-
-	{ _RK_TOPIC|_RK_PRODUCER, "request.timeout.ms", _RK_C_INT,
-	  _RKT(request_timeout_ms),
-	  "The ack timeout of the producer request in milliseconds. "
-	  "This value is only enforced by the broker and relies "
-	  "on `request.required.acks` being != 0.",
-	  1, 900*1000, 5*1000 },
-	{ _RK_TOPIC|_RK_PRODUCER, "message.timeout.ms", _RK_C_INT,
-	  _RKT(message_timeout_ms),
-	  "Local message timeout. "
-	  "This value is only enforced locally and limits the time a "
-	  "produced message waits for successful delivery. "
-          "A time of 0 is infinite.",
-	  0, 900*1000, 300*1000 },
-        { _RK_TOPIC|_RK_PRODUCER, "produce.offset.report", _RK_C_BOOL,
-          _RKT(produce_offset_report),
-          "Report offset of produced message back to application. "
-          "The application must use the `dr_msg_cb` to retrieve the offset "
-          "from `rd_kafka_message_t.offset`.",
-          0, 1, 0 },
-	{ _RK_TOPIC|_RK_PRODUCER, "partitioner_cb", _RK_C_PTR,
-	  _RKT(partitioner),
-	  "Partitioner callback "
-	  "(set with rd_kafka_topic_conf_set_partitioner_cb())" },
-	{ _RK_TOPIC, "opaque", _RK_C_PTR,
-	  _RKT(opaque),
-	  "Application opaque (set with rd_kafka_topic_conf_set_opaque())" },
-	{ _RK_TOPIC | _RK_PRODUCER, "compression.codec", _RK_C_S2I,
-	  _RKT(compression_codec),
-	  "Compression codec to use for compressing message sets. ",
-	  .vdef = RD_KAFKA_COMPRESSION_INHERIT,
-	  .s2i = {
-		  { RD_KAFKA_COMPRESSION_NONE, "none" },
-#if WITH_ZLIB
-		  { RD_KAFKA_COMPRESSION_GZIP, "gzip" },
-#endif
-#if WITH_SNAPPY
-		  { RD_KAFKA_COMPRESSION_SNAPPY, "snappy" },
-#endif
-		  { RD_KAFKA_COMPRESSION_LZ4, "lz4" },
-		  { RD_KAFKA_COMPRESSION_INHERIT, "inherit" },
-		  { 0 }
-		} },
-
-
-        /* Topic consumer properties */
-	{ _RK_TOPIC|_RK_CONSUMER, "auto.commit.enable", _RK_C_BOOL,
-	  _RKT(auto_commit),
-	  "If true, periodically commit offset of the last message handed "
-	  "to the application. This committed offset will be used when the "
-	  "process restarts to pick up where it left off. "
-	  "If false, the application will have to call "
-	  "`rd_kafka_offset_store()` to store an offset (optional). "
-          "**NOTE:** This property should only be used with the simple "
-          "legacy consumer, when using the high-level KafkaConsumer the global "
-          "`enable.auto.commit` property must be used instead. "
-	  "**NOTE:** There is currently no zookeeper integration, offsets "
-	  "will be written to broker or local file according to "
-          "offset.store.method.",
-	  0, 1, 1 },
-	{ _RK_TOPIC|_RK_CONSUMER, "enable.auto.commit", _RK_C_ALIAS,
-	  .sdef = "auto.commit.enable" },
-	{ _RK_TOPIC|_RK_CONSUMER, "auto.commit.interval.ms", _RK_C_INT,
-	  _RKT(auto_commit_interval_ms),
-	  "The frequency in milliseconds that the consumer offsets "
-	  "are committed (written) to offset storage. "
-          "This setting is used by the low-level legacy consumer.",
-	  10, 86400*1000, 60*1000 },
-	{ _RK_TOPIC|_RK_CONSUMER, "auto.offset.reset", _RK_C_S2I,
-	  _RKT(auto_offset_reset),
-	  "Action to take when there is no initial offset in offset store "
-	  "or the desired offset is out of range: "
-	  "'smallest','earliest' - automatically reset the offset to the smallest offset, "
-	  "'largest','latest' - automatically reset the offset to the largest offset, "
-	  "'error' - trigger an error which is retrieved by consuming messages "
-	  "and checking 'message->err'.",
-	  .vdef = RD_KAFKA_OFFSET_END,
-	  .s2i = {
-			{ RD_KAFKA_OFFSET_BEGINNING, "smallest" },
-			{ RD_KAFKA_OFFSET_BEGINNING, "earliest" },
-			{ RD_KAFKA_OFFSET_BEGINNING, "beginning" },
-			{ RD_KAFKA_OFFSET_END, "largest" },
-			{ RD_KAFKA_OFFSET_END, "latest" },
-			{ RD_KAFKA_OFFSET_END, "end" },
-			{ RD_KAFKA_OFFSET_INVALID, "error" },
-		}
-	},
-	{ _RK_TOPIC|_RK_CONSUMER, "offset.store.path", _RK_C_STR,
-	  _RKT(offset_store_path),
-	  "Path to local file for storing offsets. If the path is a directory "
-	  "a filename will be automatically generated in that directory based "
-	  "on the topic and partition.",
-	  .sdef = "." },
-
-	{ _RK_TOPIC|_RK_CONSUMER, "offset.store.sync.interval.ms", _RK_C_INT,
-	  _RKT(offset_store_sync_interval_ms),
-	  "fsync() interval for the offset file, in milliseconds. "
-	  "Use -1 to disable syncing, and 0 for immediate sync after "
-	  "each write.",
-	  -1, 86400*1000, -1 },
-
-        { _RK_TOPIC|_RK_CONSUMER, "offset.store.method", _RK_C_S2I,
-          _RKT(offset_store_method),
-          "Offset commit store method: "
-          "'file' - local file store (offset.store.path, et al.), "
-          "'broker' - broker commit store "
-          "(requires \"group.id\" to be configured and "
-          "Apache Kafka 0.8.2 or later on the broker.).",
-          .vdef = RD_KAFKA_OFFSET_METHOD_BROKER,
-          .s2i = {
-                        { RD_KAFKA_OFFSET_METHOD_FILE, "file" },
-                        { RD_KAFKA_OFFSET_METHOD_BROKER, "broker" }
-                }
-        },
-
-        { _RK_TOPIC|_RK_CONSUMER, "consume.callback.max.messages", _RK_C_INT,
-          _RKT(consume_callback_max_msgs),
-          "Maximum number of messages to dispatch in "
-          "one `rd_kafka_consume_callback*()` call (0 = unlimited)",
-          0, 1000000, 0 },
-
-	{ 0, /* End */ }
-};
-
-
-static rd_kafka_conf_res_t
-rd_kafka_anyconf_set_prop0 (int scope, void *conf,
-			    const struct rd_kafka_property *prop,
-			    const char *istr, int ival, rd_kafka_conf_set_mode_t set_mode,
-                            char *errstr, size_t errstr_size) {
-        rd_kafka_conf_res_t res;
-
-#define _RK_PTR(TYPE,BASE,OFFSET)  (TYPE)(void *)(((char *)(BASE))+(OFFSET))
-
-        /* Try interceptors first (only for GLOBAL config) */
-        if (scope & _RK_GLOBAL) {
-                if (prop->type == _RK_C_PTR || prop->type == _RK_C_INTERNAL)
-                        res = RD_KAFKA_CONF_UNKNOWN;
-                else
-                        res = rd_kafka_interceptors_on_conf_set(conf,
-                                                                prop->name,
-                                                                istr,
-                                                                errstr,
-                                                                errstr_size);
-                if (res != RD_KAFKA_CONF_UNKNOWN)
-                        return res;
-        }
-
-
-        if (prop->set) {
-                /* Custom setter */
-                rd_kafka_conf_res_t res;
-
-                res = prop->set(scope, conf, prop->name, istr,
-                                _RK_PTR(void *, conf, prop->offset),
-                                set_mode, errstr, errstr_size);
-
-                if (res != RD_KAFKA_CONF_OK)
-                        return res;
-
-                /* FALLTHRU so that property value is set. */
-        }
-
-	switch (prop->type)
-	{
-	case _RK_C_STR:
-	{
-		char **str = _RK_PTR(char **, conf, prop->offset);
-		if (*str)
-			rd_free(*str);
-		if (istr)
-			*str = rd_strdup(istr);
-		else
-			*str = prop->sdef ? rd_strdup(prop->sdef) : NULL;
-		return RD_KAFKA_CONF_OK;
-	}
-        case _RK_C_KSTR:
-        {
-                rd_kafkap_str_t **kstr = _RK_PTR(rd_kafkap_str_t **, conf,
-                                                 prop->offset);
-                if (*kstr)
-                        rd_kafkap_str_destroy(*kstr);
-                if (istr)
-                        *kstr = rd_kafkap_str_new(istr, -1);
-                else
-                        *kstr = prop->sdef ?
-				rd_kafkap_str_new(prop->sdef, -1) : NULL;
-                return RD_KAFKA_CONF_OK;
-        }
-	case _RK_C_PTR:
-		*_RK_PTR(const void **, conf, prop->offset) = istr;
-		return RD_KAFKA_CONF_OK;
-	case _RK_C_BOOL:
-	case _RK_C_INT:
-	case _RK_C_S2I:
-	case _RK_C_S2F:
-	{
-		int *val = _RK_PTR(int *, conf, prop->offset);
-
-		if (prop->type == _RK_C_S2F) {
-			switch (set_mode)
-			{
-			case _RK_CONF_PROP_SET_REPLACE:
-				*val = ival;
-				break;
-			case _RK_CONF_PROP_SET_ADD:
-				*val |= ival;
-				break;
-			case _RK_CONF_PROP_SET_DEL:
-				*val &= ~ival;
-				break;
-			}
-		} else {
-			/* Single assignment */
-			*val = ival;
-
-		}
-		return RD_KAFKA_CONF_OK;
-	}
-        case _RK_C_PATLIST:
-        {
-                /* Split comma-separated list into individual regex expressions
-                 * that are verified and then appended to the provided list. */
-                rd_kafka_pattern_list_t **plist;
-
-                plist = _RK_PTR(rd_kafka_pattern_list_t **, conf, prop->offset);
-
-		if (*plist)
-			rd_kafka_pattern_list_destroy(*plist);
-
-		if (istr) {
-			if (!(*plist =
-			      rd_kafka_pattern_list_new(istr,
-							errstr,
-							(int)errstr_size)))
-				return RD_KAFKA_CONF_INVALID;
-		} else
-			*plist = NULL;
-
-                return RD_KAFKA_CONF_OK;
-        }
-
-        case _RK_C_INTERNAL:
-                /* Probably handled by setter */
-                return RD_KAFKA_CONF_OK;
-
-	default:
-		rd_kafka_assert(NULL, !*"unknown conf type");
-	}
-
-	/* unreachable */
-	return RD_KAFKA_CONF_INVALID;
-}
-
-
-/**
- * @brief Find s2i (string-to-int mapping) entry and return its array index,
- *        or -1 on miss.
- */
-static int rd_kafka_conf_s2i_find (const struct rd_kafka_property *prop,
-				   const char *value) {
-	int j;
-
-	for (j = 0 ; j < (int)RD_ARRAYSIZE(prop->s2i); j++) {
-		if (prop->s2i[j].str &&
-		    !rd_strcasecmp(prop->s2i[j].str, value))
-			return j;
-	}
-
-	return -1;
-}
-
-
-static rd_kafka_conf_res_t
-rd_kafka_anyconf_set_prop (int scope, void *conf,
-			   const struct rd_kafka_property *prop,
-			   const char *value,
-			   char *errstr, size_t errstr_size) {
-	int ival;
-
-	switch (prop->type)
-	{
-	case _RK_C_STR:
-        case _RK_C_KSTR:
-		if (prop->s2i[0].str) {
-			int match;
-
-			if (!value ||
-			    (match = rd_kafka_conf_s2i_find(prop, value)) == -1) {
-				rd_snprintf(errstr, errstr_size,
-					    "Invalid value for "
-					    "configuration property \"%s\": "
-					    "%s",
-					    prop->name,
-					    value ? value : "(null)");
-				return RD_KAFKA_CONF_INVALID;
-			}
-
-			/* Replace value string with canonical form */
-			value = prop->s2i[match].str;
-		}
-		/* FALLTHRU */
-        case _RK_C_PATLIST:
-		if (prop->validate &&
-		    (!value || !prop->validate(prop, value, -1))) {
-			rd_snprintf(errstr, errstr_size,
-				    "Invalid value for "
-				    "configuration property \"%s\": %s",
-				    prop->name, value ? value : "(null)");
-			return RD_KAFKA_CONF_INVALID;
-		}
-
-		return rd_kafka_anyconf_set_prop0(scope, conf, prop, value, 0,
-						  _RK_CONF_PROP_SET_REPLACE,
-                                                  errstr, errstr_size);
-
-	case _RK_C_PTR:
-		rd_snprintf(errstr, errstr_size,
-			 "Property \"%s\" must be set through dedicated "
-			 ".._set_..() function", prop->name);
-		return RD_KAFKA_CONF_INVALID;
-
-	case _RK_C_BOOL:
-		if (!value) {
-			rd_snprintf(errstr, errstr_size,
-				 "Bool configuration property \"%s\" cannot "
-				 "be set to an empty value", prop->name);
-			return RD_KAFKA_CONF_INVALID;
-		}
-
-
-		if (!rd_strcasecmp(value, "true") ||
-		    !rd_strcasecmp(value, "t") ||
-		    !strcmp(value, "1"))
-			ival = 1;
-		else if (!rd_strcasecmp(value, "false") ||
-			 !rd_strcasecmp(value, "f") ||
-			 !strcmp(value, "0"))
-			ival = 0;
-		else {
-			rd_snprintf(errstr, errstr_size,
-				 "Expected bool value for \"%s\": "
-				 "true or false", prop->name);
-			return RD_KAFKA_CONF_INVALID;
-		}
-
-		rd_kafka_anyconf_set_prop0(scope, conf, prop, value, ival,
-					   _RK_CONF_PROP_SET_REPLACE,
-                                           errstr, errstr_size);
-		return RD_KAFKA_CONF_OK;
-
-	case _RK_C_INT:
-	{
-		const char *end;
-
-		if (!value) {
-			rd_snprintf(errstr, errstr_size,
-				 "Integer configuration "
-				 "property \"%s\" cannot be set "
-				 "to an empty value", prop->name);
-			return RD_KAFKA_CONF_INVALID;
-		}
-
-		ival = (int)strtol(value, (char **)&end, 0);
-		if (end == value) {
-			/* Non-numeric: check the s2i table for a
-			 * string mapping */
-			int match = rd_kafka_conf_s2i_find(prop, value);
-
-			if (match == -1) {
-				rd_snprintf(errstr, errstr_size,
-					    "Invalid value for "
-					    "configuration property \"%s\"",
-					    prop->name);
-				return RD_KAFKA_CONF_INVALID;
-			}
-
-			ival = prop->s2i[match].val;
-		}
-
-		if (ival < prop->vmin ||
-		    ival > prop->vmax) {
-			rd_snprintf(errstr, errstr_size,
-				 "Configuration property \"%s\" value "
-				 "%i is outside allowed range %i..%i",
-				 prop->name, ival,
-				 prop->vmin,
-				 prop->vmax);
-			return RD_KAFKA_CONF_INVALID;
-		}
-
-		rd_kafka_anyconf_set_prop0(scope, conf, prop, value, ival,
-					   _RK_CONF_PROP_SET_REPLACE,
-                                           errstr, errstr_size);
-		return RD_KAFKA_CONF_OK;
-	}
-
-	case _RK_C_S2I:
-	case _RK_C_S2F:
-	{
-		int j;
-		const char *next;
-
-		if (!value) {
-			rd_snprintf(errstr, errstr_size,
-				 "Configuration "
-				 "property \"%s\" cannot be set "
-				 "to an empty value", prop->name);
-			return RD_KAFKA_CONF_INVALID;
-		}
-
-		next = value;
-		while (next && *next) {
-			const char *s, *t;
-			rd_kafka_conf_set_mode_t set_mode = _RK_CONF_PROP_SET_ADD; /* S2F */
-
-			s = next;
-
-			if (prop->type == _RK_C_S2F &&
-			    (t = strchr(s, ','))) {
-				/* CSV flag field */
-				next = t+1;
-			} else {
-				/* Single string */
-				t = s+strlen(s);
-				next = NULL;
-			}
-
-
-			/* Left trim */
-			while (s < t && isspace((int)*s))
-				s++;
-
-			/* Right trim: t points one past the last
-			 * character, so examine the preceding one. */
-			while (t > s && isspace((int)*(t-1)))
-				t--;
-
-			/* S2F: +/- prefix */
-			if (prop->type == _RK_C_S2F) {
-				if (*s == '+') {
-					set_mode = _RK_CONF_PROP_SET_ADD;
-					s++;
-				} else if (*s == '-') {
-					set_mode = _RK_CONF_PROP_SET_DEL;
-					s++;
-				}
-			}
-
-			/* Empty string? */
-			if (s == t)
-				continue;
-
-			/* Match string to s2i table entry */
-			for (j = 0 ; j < (int)RD_ARRAYSIZE(prop->s2i); j++) {
-				int new_val;
-
-				if (!prop->s2i[j].str)
-					continue;
-
-				if (strlen(prop->s2i[j].str) == (size_t)(t-s) &&
-					 !rd_strncasecmp(prop->s2i[j].str, s,
-							 (int)(t-s)))
-					new_val = prop->s2i[j].val;
-				else
-					continue;
-
-				rd_kafka_anyconf_set_prop0(scope, conf, prop,
-                                                           value, new_val,
-                                                           set_mode,
-                                                           errstr, errstr_size);
-
-				if (prop->type == _RK_C_S2F) {
-					/* Flags: OR it in: do next */
-					break;
-				} else {
-					/* Single assignment */
-					return RD_KAFKA_CONF_OK;
-				}
-			}
-
-			/* S2F: Good match: continue with next */
-			if (j < (int)RD_ARRAYSIZE(prop->s2i))
-				continue;
-
-			/* No match */
-			rd_snprintf(errstr, errstr_size,
-				 "Invalid value for "
-				 "configuration property \"%s\"", prop->name);
-			return RD_KAFKA_CONF_INVALID;
-
-		}
-		return RD_KAFKA_CONF_OK;
-	}
-
-        case _RK_C_INTERNAL:
-                rd_snprintf(errstr, errstr_size,
-                            "Internal property \"%s\" not settable",
-                            prop->name);
-                return RD_KAFKA_CONF_INVALID;
-
-        case _RK_C_INVALID:
-                rd_snprintf(errstr, errstr_size, "%s", prop->desc);
-                return RD_KAFKA_CONF_INVALID;
-
-	default:
-                rd_kafka_assert(NULL, !*"unknown conf type");
-	}
-
-	/* not reachable */
-	return RD_KAFKA_CONF_INVALID;
-}
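-
-/* Usage sketch (ours, not part of the deleted source): how the parsing
- * above surfaces through the public API. String values are accepted for
- * bool, int, enum (S2I) and CSV-flag (S2F) properties; for S2F properties
- * such as "debug", individual flags may be prefixed with '+' or '-' to
- * add or remove them, per the tokenizer above. */
-#if 0 /* example only */
-static void example_set_values (rd_kafka_conf_t *conf) {
-        char errstr[512];
-
-        /* Bool: "true"/"false", "t"/"f" and "1"/"0" are accepted. */
-        rd_kafka_conf_set(conf, "api.version.request", "true",
-                          errstr, sizeof(errstr));
-
-        /* S2F (CSV flags): multiple flags separated by commas. */
-        if (rd_kafka_conf_set(conf, "debug", "broker,topic,msg",
-                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
-                fprintf(stderr, "%s\n", errstr);
-}
-#endif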
-
-
-
-static void rd_kafka_defaultconf_set (int scope, void *conf) {
-	const struct rd_kafka_property *prop;
-
-	for (prop = rd_kafka_properties ; prop->name ; prop++) {
-		if (!(prop->scope & scope))
-			continue;
-
-		if (prop->type == _RK_C_ALIAS || prop->type == _RK_C_INVALID)
-			continue;
-
-                if (prop->ctor)
-                        prop->ctor(scope, conf);
-
-		if (prop->sdef || prop->vdef || prop->pdef)
-			rd_kafka_anyconf_set_prop0(scope, conf, prop,
-						   prop->sdef ?
-                                                   prop->sdef : prop->pdef,
-                                                   prop->vdef,
-                                                   _RK_CONF_PROP_SET_REPLACE,
-                                                   NULL, 0);
-	}
-}
-
-rd_kafka_conf_t *rd_kafka_conf_new (void) {
-	rd_kafka_conf_t *conf = rd_calloc(1, sizeof(*conf));
-	rd_kafka_defaultconf_set(_RK_GLOBAL, conf);
-	return conf;
-}
-
-rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void) {
-	rd_kafka_topic_conf_t *tconf = rd_calloc(1, sizeof(*tconf));
-	rd_kafka_defaultconf_set(_RK_TOPIC, tconf);
-	return tconf;
-}
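-
-/* Usage sketch (ours, not part of the deleted source): typical lifecycle
- * of the two conf object types. Note that a successful rd_kafka_new()
- * takes ownership of the conf object, in which case the application must
- * not destroy it itself. */
-#if 0 /* example only */
-static void example_conf_lifecycle (void) {
-        rd_kafka_conf_t *conf = rd_kafka_conf_new();
-        rd_kafka_topic_conf_t *tconf = rd_kafka_topic_conf_new();
-
-        /* ... rd_kafka_conf_set() / rd_kafka_topic_conf_set() ... */
-
-        rd_kafka_topic_conf_destroy(tconf);
-        rd_kafka_conf_destroy(conf);
-}
-#endif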
-
-
-
-static int rd_kafka_anyconf_set (int scope, void *conf,
-				 const char *name, const char *value,
-				 char *errstr, size_t errstr_size) {
-	char estmp[1];
-	const struct rd_kafka_property *prop;
-        rd_kafka_conf_res_t res;
-
-	if (!errstr) {
-		errstr = estmp;
-		errstr_size = 0;
-	}
-
-	if (value && !*value)
-		value = NULL;
-
-        /* Try interceptors first (only for GLOBAL config for now) */
-        if (scope & _RK_GLOBAL) {
-                res = rd_kafka_interceptors_on_conf_set(
-                        (rd_kafka_conf_t *)conf, name, value,
-                        errstr, errstr_size);
-                /* Handled (successfully or not) by interceptor. */
-                if (res != RD_KAFKA_CONF_UNKNOWN)
-                        return res;
-        }
-
-        /* Then the property table for this scope */
-
-
-	for (prop = rd_kafka_properties ; prop->name ; prop++) {
-
-		if (!(prop->scope & scope))
-			continue;
-
-		if (strcmp(prop->name, name))
-			continue;
-
-		if (prop->type == _RK_C_ALIAS)
-			return rd_kafka_anyconf_set(scope, conf,
-						    prop->sdef, value,
-						    errstr, errstr_size);
-
-		return rd_kafka_anyconf_set_prop(scope, conf, prop, value,
-						 errstr, errstr_size);
-	}
-
-	rd_snprintf(errstr, errstr_size,
-		 "No such configuration property: \"%s\"", name);
-
-	return RD_KAFKA_CONF_UNKNOWN;
-}
-
-
-rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,
-                                       const char *name,
-                                       const char *value,
-                                       char *errstr, size_t errstr_size) {
-        rd_kafka_conf_res_t res;
-
-        res = rd_kafka_anyconf_set(_RK_GLOBAL, conf, name, value,
-                                   errstr, errstr_size);
-        if (res != RD_KAFKA_CONF_UNKNOWN)
-                return res;
-
-        /* Fallthru:
-         * If the global property was unknown, try setting it on the
-         * default topic config. */
-        if (!conf->topic_conf) {
-                /* Create topic config, might be over-written by application
-                 * later. */
-                conf->topic_conf = rd_kafka_topic_conf_new();
-        }
-
-        return rd_kafka_topic_conf_set(conf->topic_conf, name, value,
-                                       errstr, errstr_size);
-}
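-
-/* Usage sketch (ours, not part of the deleted source): the fallthrough
- * above means topic-level properties can be set directly on the global
- * conf object; they land in the default topic config, which is created
- * on demand. */
-#if 0 /* example only */
-static void example_global_fallthru (rd_kafka_conf_t *conf) {
-        char errstr[512];
-
-        /* "auto.offset.reset" is a topic property, but this succeeds:
-         * it is stored in conf->topic_conf. */
-        rd_kafka_conf_set(conf, "auto.offset.reset", "earliest",
-                          errstr, sizeof(errstr));
-}
-#endif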
-
-
-rd_kafka_conf_res_t rd_kafka_topic_conf_set (rd_kafka_topic_conf_t *conf,
-					     const char *name,
-					     const char *value,
-					     char *errstr, size_t errstr_size) {
-	if (!strncmp(name, "topic.", strlen("topic.")))
-		name += strlen("topic.");
-
-	return rd_kafka_anyconf_set(_RK_TOPIC, conf, name, value,
-				    errstr, errstr_size);
-}
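-
-/* Usage sketch (ours, not part of the deleted source): the "topic."
- * prefix is stripped above, so the two calls below are equivalent. */
-#if 0 /* example only */
-static void example_topic_prefix (rd_kafka_topic_conf_t *tconf) {
-        char errstr[512];
-
-        rd_kafka_topic_conf_set(tconf, "request.required.acks", "all",
-                                errstr, sizeof(errstr));
-        rd_kafka_topic_conf_set(tconf, "topic.request.required.acks", "all",
-                                errstr, sizeof(errstr));
-}
-#endif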
-
-
-static void rd_kafka_anyconf_clear (int scope, void *conf,
-				    const struct rd_kafka_property *prop) {
-	switch (prop->type)
-	{
-	case _RK_C_STR:
-	{
-		char **str = _RK_PTR(char **, conf, prop->offset);
-
-		if (*str) {
-                        if (prop->set) {
-                                prop->set(scope, conf, prop->name, NULL, *str,
-                                          _RK_CONF_PROP_SET_DEL, NULL, 0);
-                                /* FALLTHRU */
-                        }
-                        rd_free(*str);
-			*str = NULL;
-		}
-	}
-	break;
-
-        case _RK_C_KSTR:
-        {
-                rd_kafkap_str_t **kstr = _RK_PTR(rd_kafkap_str_t **, conf,
-                                                 prop->offset);
-                if (*kstr) {
-                        rd_kafkap_str_destroy(*kstr);
-                        *kstr = NULL;
-                }
-        }
-        break;
-
-        case _RK_C_PATLIST:
-        {
-                rd_kafka_pattern_list_t **plist;
-                plist = _RK_PTR(rd_kafka_pattern_list_t **, conf, prop->offset);
-		if (*plist) {
-			rd_kafka_pattern_list_destroy(*plist);
-			*plist = NULL;
-		}
-        }
-        break;
-
-        case _RK_C_PTR:
-                if (_RK_PTR(void *, conf, prop->offset) != NULL) {
-                        if (!strcmp(prop->name, "default_topic_conf")) {
-                                rd_kafka_topic_conf_t **tconf;
-
-                                tconf = _RK_PTR(rd_kafka_topic_conf_t **,
-                                                conf, prop->offset);
-                                if (*tconf) {
-                                        rd_kafka_topic_conf_destroy(*tconf);
-                                        *tconf = NULL;
-                                }
-                        }
-                }
-                break;
-
-	default:
-		break;
-	}
-
-        if (prop->dtor)
-                prop->dtor(scope, conf);
-
-}
-
-void rd_kafka_anyconf_destroy (int scope, void *conf) {
-	const struct rd_kafka_property *prop;
-
-        /* Call on_conf_destroy() interceptors */
-        if (scope == _RK_GLOBAL)
-                rd_kafka_interceptors_on_conf_destroy(conf);
-
-	for (prop = rd_kafka_properties; prop->name ; prop++) {
-		if (!(prop->scope & scope))
-			continue;
-
-		rd_kafka_anyconf_clear(scope, conf, prop);
-	}
-}
-
-
-void rd_kafka_conf_destroy (rd_kafka_conf_t *conf) {
-	rd_kafka_anyconf_destroy(_RK_GLOBAL, conf);
-        /* FIXME: partition_assignors */
-	rd_free(conf);
-}
-
-void rd_kafka_topic_conf_destroy (rd_kafka_topic_conf_t *topic_conf) {
-	rd_kafka_anyconf_destroy(_RK_TOPIC, topic_conf);
-	rd_free(topic_conf);
-}
-
-
-
-static void rd_kafka_anyconf_copy (int scope, void *dst, const void *src,
-                                   size_t filter_cnt, const char **filter) {
-	const struct rd_kafka_property *prop;
-
-	for (prop = rd_kafka_properties ; prop->name ; prop++) {
-		const char *val = NULL;
-		int ival = 0;
-                char *valstr;
-                size_t valsz;
-                size_t fi;
-                size_t nlen;
-
-		if (!(prop->scope & scope))
-			continue;
-
-		if (prop->type == _RK_C_ALIAS || prop->type == _RK_C_INVALID)
-			continue;
-
-                /* Apply filter, if any. */
-                nlen = strlen(prop->name);
-                for (fi = 0 ; fi < filter_cnt ; fi++) {
-                        size_t flen = strlen(filter[fi]);
-                        if (nlen >= flen &&
-                            !strncmp(filter[fi], prop->name, flen))
-                                break;
-                }
-                if (fi < filter_cnt)
-                        continue; /* Filter matched */
-
-		switch (prop->type)
-		{
-		case _RK_C_STR:
-		case _RK_C_PTR:
-			val = *_RK_PTR(const char **, src, prop->offset);
-
-                        if (!strcmp(prop->name, "default_topic_conf") && val)
-                                val = (void *)rd_kafka_topic_conf_dup(
-                                        (const rd_kafka_topic_conf_t *)
-                                        (void *)val);
-			break;
-                case _RK_C_KSTR:
-                {
-                        rd_kafkap_str_t **kstr = _RK_PTR(rd_kafkap_str_t **,
-                                                         src, prop->offset);
-                        if (*kstr)
-                                val = (*kstr)->str;
-                        break;
-                }
-
-		case _RK_C_BOOL:
-		case _RK_C_INT:
-		case _RK_C_S2I:
-		case _RK_C_S2F:
-			ival = *_RK_PTR(const int *, src, prop->offset);
-
-                        /* Get string representation of configuration value. */
-                        valsz = 0;
-                        rd_kafka_anyconf_get0(src, prop, NULL, &valsz);
-                        valstr = rd_alloca(valsz);
-                        rd_kafka_anyconf_get0(src, prop, valstr, &valsz);
-                        val = valstr;
-			break;
-                case _RK_C_PATLIST:
-                {
-                        const rd_kafka_pattern_list_t **plist;
-                        plist = _RK_PTR(const rd_kafka_pattern_list_t **,
-                                        src, prop->offset);
-			if (*plist)
-				val = (*plist)->rkpl_orig;
-                        break;
-                }
-                case _RK_C_INTERNAL:
-                        /* Handled by ->copy() below. */
-                        break;
-		default:
-			continue;
-		}
-
-                if (prop->copy)
-                        prop->copy(scope, dst, src,
-                                   _RK_PTR(void *, dst, prop->offset),
-                                   _RK_PTR(const void *, src, prop->offset),
-                                   filter_cnt, filter);
-
-                rd_kafka_anyconf_set_prop0(scope, dst, prop, val, ival,
-                                           _RK_CONF_PROP_SET_REPLACE, NULL, 0);
-	}
-}
-
-
-rd_kafka_conf_t *rd_kafka_conf_dup (const rd_kafka_conf_t *conf) {
-	rd_kafka_conf_t *new = rd_kafka_conf_new();
-
-        rd_kafka_interceptors_on_conf_dup(new, conf, 0, NULL);
-
-        rd_kafka_anyconf_copy(_RK_GLOBAL, new, conf, 0, NULL);
-
-	return new;
-}
-
-rd_kafka_conf_t *rd_kafka_conf_dup_filter (const rd_kafka_conf_t *conf,
-                                           size_t filter_cnt,
-                                           const char **filter) {
-	rd_kafka_conf_t *new = rd_kafka_conf_new();
-
-        rd_kafka_interceptors_on_conf_dup(new, conf, filter_cnt, filter);
-
-        rd_kafka_anyconf_copy(_RK_GLOBAL, new, conf, filter_cnt, filter);
-
-	return new;
-}
-
-
-rd_kafka_topic_conf_t *rd_kafka_topic_conf_dup (const rd_kafka_topic_conf_t
-						*conf) {
-	rd_kafka_topic_conf_t *new = rd_kafka_topic_conf_new();
-
-	rd_kafka_anyconf_copy(_RK_TOPIC, new, conf, 0, NULL);
-
-	return new;
-}
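-
-/* Usage sketch (ours, not part of the deleted source): duplication yields
- * an independent deep copy, including any default topic config. */
-#if 0 /* example only */
-static rd_kafka_conf_t *example_dup (const rd_kafka_conf_t *conf) {
-        rd_kafka_conf_t *copy = rd_kafka_conf_dup(conf);
-
-        /* `copy` can now be modified without affecting `conf`. */
-        return copy;
-}
-#endif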
-
-
-void rd_kafka_conf_set_events (rd_kafka_conf_t *conf, int events) {
-	conf->enabled_events = events;
-}
-
-
-void rd_kafka_conf_set_dr_cb (rd_kafka_conf_t *conf,
-			      void (*dr_cb) (rd_kafka_t *rk,
-					     void *payload, size_t len,
-					     rd_kafka_resp_err_t err,
-					     void *opaque, void *msg_opaque)) {
-	conf->dr_cb = dr_cb;
-}
-
-
-void rd_kafka_conf_set_dr_msg_cb (rd_kafka_conf_t *conf,
-                                  void (*dr_msg_cb) (rd_kafka_t *rk,
-                                                     const rd_kafka_message_t *
-                                                     rkmessage,
-                                                     void *opaque)) {
-        conf->dr_msg_cb = dr_msg_cb;
-}
-
-
-void rd_kafka_conf_set_consume_cb (rd_kafka_conf_t *conf,
-                                   void (*consume_cb) (rd_kafka_message_t *
-                                                       rkmessage,
-                                                       void *opaque)) {
-        conf->consume_cb = consume_cb;
-}
-
-void rd_kafka_conf_set_rebalance_cb (
-        rd_kafka_conf_t *conf,
-        void (*rebalance_cb) (rd_kafka_t *rk,
-                              rd_kafka_resp_err_t err,
-                              rd_kafka_topic_partition_list_t *partitions,
-                              void *opaque)) {
-        conf->rebalance_cb = rebalance_cb;
-}
-
-void rd_kafka_conf_set_offset_commit_cb (
-        rd_kafka_conf_t *conf,
-        void (*offset_commit_cb) (rd_kafka_t *rk,
-                                  rd_kafka_resp_err_t err,
-                                  rd_kafka_topic_partition_list_t *offsets,
-                                  void *opaque)) {
-        conf->offset_commit_cb = offset_commit_cb;
-}
-
-
-
-void rd_kafka_conf_set_error_cb (rd_kafka_conf_t *conf,
-				 void  (*error_cb) (rd_kafka_t *rk, int err,
-						    const char *reason,
-						    void *opaque)) {
-	conf->error_cb = error_cb;
-}
-
-
-void rd_kafka_conf_set_throttle_cb (rd_kafka_conf_t *conf,
-				    void (*throttle_cb) (
-					    rd_kafka_t *rk,
-					    const char *broker_name,
-					    int32_t broker_id,
-					    int throttle_time_ms,
-					    void *opaque)) {
-	conf->throttle_cb = throttle_cb;
-}
-
-
-void rd_kafka_conf_set_log_cb (rd_kafka_conf_t *conf,
-			  void (*log_cb) (const rd_kafka_t *rk, int level,
-                                          const char *fac, const char *buf)) {
-	conf->log_cb = log_cb;
-}
-
-
-void rd_kafka_conf_set_stats_cb (rd_kafka_conf_t *conf,
-				 int (*stats_cb) (rd_kafka_t *rk,
-						  char *json,
-						  size_t json_len,
-						  void *opaque)) {
-	conf->stats_cb = stats_cb;
-}
-
-void rd_kafka_conf_set_socket_cb (rd_kafka_conf_t *conf,
-                                  int (*socket_cb) (int domain, int type,
-                                                    int protocol,
-                                                    void *opaque)) {
-        conf->socket_cb = socket_cb;
-}
-
-void
-rd_kafka_conf_set_connect_cb (rd_kafka_conf_t *conf,
-                              int (*connect_cb) (int sockfd,
-                                                 const struct sockaddr *addr,
-                                                 int addrlen,
-                                                 const char *id,
-                                                 void *opaque)) {
-        conf->connect_cb = connect_cb;
-}
-
-void
-rd_kafka_conf_set_closesocket_cb (rd_kafka_conf_t *conf,
-                                  int (*closesocket_cb) (int sockfd,
-                                                         void *opaque)) {
-        conf->closesocket_cb = closesocket_cb;
-}
-
-
-
-#ifndef _MSC_VER
-void rd_kafka_conf_set_open_cb (rd_kafka_conf_t *conf,
-                                int (*open_cb) (const char *pathname,
-                                                int flags, mode_t mode,
-                                                void *opaque)) {
-        conf->open_cb = open_cb;
-}
-#endif
-
-void rd_kafka_conf_set_opaque (rd_kafka_conf_t *conf, void *opaque) {
-	conf->opaque = opaque;
-}
-
-
-void rd_kafka_conf_set_default_topic_conf (rd_kafka_conf_t *conf,
-                                           rd_kafka_topic_conf_t *tconf) {
-        if (conf->topic_conf)
-                rd_kafka_topic_conf_destroy(conf->topic_conf);
-
-        conf->topic_conf = tconf;
-}
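-
-/* Usage sketch (ours, not part of the deleted source): the conf object
- * takes ownership of the supplied topic conf (destroying any previous
- * default), so the application must not use or destroy it afterwards. */
-#if 0 /* example only */
-static void example_default_topic_conf (rd_kafka_conf_t *conf) {
-        rd_kafka_topic_conf_t *tconf = rd_kafka_topic_conf_new();
-        char errstr[512];
-
-        rd_kafka_topic_conf_set(tconf, "auto.offset.reset", "earliest",
-                                errstr, sizeof(errstr));
-
-        rd_kafka_conf_set_default_topic_conf(conf, tconf);
-        /* `tconf` is now owned by `conf`: do not destroy it here. */
-}
-#endif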
-
-
-void
-rd_kafka_topic_conf_set_partitioner_cb (rd_kafka_topic_conf_t *topic_conf,
-					int32_t (*partitioner) (
-						const rd_kafka_topic_t *rkt,
-						const void *keydata,
-						size_t keylen,
-						int32_t partition_cnt,
-						void *rkt_opaque,
-						void *msg_opaque)) {
-	topic_conf->partitioner = partitioner;
-}
-
-void rd_kafka_topic_conf_set_opaque (rd_kafka_topic_conf_t *topic_conf,
-				     void *opaque) {
-	topic_conf->opaque = opaque;
-}
-
-
-
-
-/**
- * @brief Convert the flag bits in \p ival to a CSV string using the
- *        S2F mappings of property \p prop.
- *        Also used for S2I properties, where it yields the single
- *        matching enum name.
- *
- * This function has two modes: size query and write.
- * To query the needed size, call with \p dest == NULL;
- * to write, call with \p dest pointing to a buffer of \p dest_size bytes.
- *
- * An \p ival of -1 means all flags.
- *
- * @returns the number of bytes written to \p dest (if not NULL), else the
- *          total number of bytes needed, both including the nul terminator.
- */
-size_t rd_kafka_conf_flags2str (char *dest, size_t dest_size, const char *delim,
-				const struct rd_kafka_property *prop,
-				int ival) {
-	size_t of = 0;
-	int j;
-
-	if (dest && dest_size > 0)
-		*dest = '\0';
-
-	/* Phase 1: scan for set flags, accumulate needed size.
-	 * Phase 2: write to dest */
-	for (j = 0 ; prop->s2i[j].str ; j++) {
-		if (prop->type == _RK_C_S2F && ival != -1 &&
-		    (ival & prop->s2i[j].val) != prop->s2i[j].val)
-			continue;
-		else if (prop->type == _RK_C_S2I &&
-			   ival != -1 && prop->s2i[j].val != ival)
-			continue;
-
-		if (!dest)
-			of += strlen(prop->s2i[j].str) +
-				(of > 0 ? strlen(delim) : 0);
-		else {
-			size_t r;
-			r = rd_snprintf(dest+of, dest_size-of,
-					"%s%s",
-					of > 0 ? delim:"",
-					prop->s2i[j].str);
-			if (r > dest_size-of) {
-				r = dest_size-of;
-				break;
-			}
-			of += r;
-		}
-	}
-
-	return of+1/*nul*/;
-}
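-
-/* Usage sketch (ours, not part of the deleted source): the two-phase
- * size-query/write contract documented above. `prop` is assumed to be
- * an S2F entry from rd_kafka_properties. */
-#if 0 /* example only */
-static char *example_flags2str_dup (const struct rd_kafka_property *prop,
-                                    int ival) {
-        /* Phase 1: dest == NULL queries the needed size (incl. nul). */
-        size_t size = rd_kafka_conf_flags2str(NULL, 0, ",", prop, ival);
-        char *str = rd_malloc(size);
-
-        /* Phase 2: write into the allocated buffer. */
-        rd_kafka_conf_flags2str(str, size, ",", prop, ival);
-        return str; /* caller frees with rd_free() */
-}
-#endif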
-
-
-/**
- * Return the "original" (re-created) configuration value string.
- */
-static rd_kafka_conf_res_t
-rd_kafka_anyconf_get0 (const void *conf, const struct rd_kafka_property *prop,
-                       char *dest, size_t *dest_size) {
-        char tmp[22];
-        const char *val = NULL;
-        size_t val_len = 0;
-        int j;
-
-        switch (prop->type)
-        {
-        case _RK_C_STR:
-                val = *_RK_PTR(const char **, conf, prop->offset);
-                break;
-
-        case _RK_C_KSTR:
-        {
-                const rd_kafkap_str_t **kstr = _RK_PTR(const rd_kafkap_str_t **,
-                                                       conf, prop->offset);
-                if (*kstr)
-                        val = (*kstr)->str;
-                break;
-        }
-
-        case _RK_C_PTR:
-                val = *_RK_PTR(const void **, conf, prop->offset);
-                if (val) {
-                        rd_snprintf(tmp, sizeof(tmp), "%p", (void *)val);
-                        val = tmp;
-                }
-                break;
-
-        case _RK_C_BOOL:
-                val = (*_RK_PTR(int *, conf, prop->offset) ? "true" : "false");
-                break;
-
-        case _RK_C_INT:
-                rd_snprintf(tmp, sizeof(tmp), "%i",
-                            *_RK_PTR(int *, conf, prop->offset));
-                val = tmp;
-                break;
-
-        case _RK_C_S2I:
-                for (j = 0 ; j < (int)RD_ARRAYSIZE(prop->s2i); j++) {
-                        if (prop->s2i[j].val ==
-                            *_RK_PTR(int *, conf, prop->offset)) {
-                                val = prop->s2i[j].str;
-                                break;
-                        }
-                }
-                break;
-
-        case _RK_C_S2F:
-        {
-                const int ival = *_RK_PTR(const int *, conf, prop->offset);
-
-		val_len = rd_kafka_conf_flags2str(dest, *dest_size, ",",
-						  prop, ival);
-		if (dest) {
-			val_len = 0;
-			val = dest;
-			dest = NULL;
-		}
-		break;
-	}
-
-        case _RK_C_PATLIST:
-        {
-                const rd_kafka_pattern_list_t **plist;
-                plist = _RK_PTR(const rd_kafka_pattern_list_t **,
-                                conf, prop->offset);
-		if (*plist)
-			val = (*plist)->rkpl_orig;
-                break;
-        }
-
-        default:
-                break;
-        }
-
-        if (val_len) {
-                *dest_size = val_len+1;
-                return RD_KAFKA_CONF_OK;
-        }
-
-        if (!val)
-                return RD_KAFKA_CONF_INVALID;
-
-        val_len = strlen(val);
-
-        if (dest) {
-                size_t use_len = RD_MIN(val_len, (*dest_size)-1);
-                memcpy(dest, val, use_len);
-                dest[use_len] = '\0';
-        }
-
-        /* Return needed size */
-        *dest_size = val_len+1;
-
-        return RD_KAFKA_CONF_OK;
-}
-
-
-static rd_kafka_conf_res_t rd_kafka_anyconf_get (int scope, const void *conf,
-                                                 const char *name,
-                                                 char *dest, size_t *dest_size){
-	const struct rd_kafka_property *prop;
-
-	for (prop = rd_kafka_properties; prop->name ; prop++) {
-
-		if (!(prop->scope & scope) || strcmp(prop->name, name))
-			continue;
-
-		if (prop->type == _RK_C_ALIAS)
-			return rd_kafka_anyconf_get(scope, conf,
-						    prop->sdef,
-						    dest, dest_size);
-
-                if (rd_kafka_anyconf_get0(conf, prop, dest, dest_size) ==
-                    RD_KAFKA_CONF_OK)
-                        return RD_KAFKA_CONF_OK;
-        }
-
-        return RD_KAFKA_CONF_UNKNOWN;
-}
-
-rd_kafka_conf_res_t rd_kafka_topic_conf_get (const rd_kafka_topic_conf_t *conf,
-                                             const char *name,
-                                             char *dest, size_t *dest_size) {
-        return rd_kafka_anyconf_get(_RK_TOPIC, conf, name, dest, dest_size);
-}
-
-rd_kafka_conf_res_t rd_kafka_conf_get (const rd_kafka_conf_t *conf,
-                                       const char *name,
-                                       char *dest, size_t *dest_size) {
-        rd_kafka_conf_res_t res;
-        res = rd_kafka_anyconf_get(_RK_GLOBAL, conf, name, dest, dest_size);
-        if (res != RD_KAFKA_CONF_UNKNOWN || !conf->topic_conf)
-                return res;
-
-        /* Fallthru:
-         * If the global property was unknown, try getting it from the
-         * default topic config, if any. */
-        return rd_kafka_topic_conf_get(conf->topic_conf, name, dest, dest_size);
-}
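-
-/* Usage sketch (ours, not part of the deleted source): the usual two-call
- * pattern for rd_kafka_conf_get(): pass dest == NULL to query the needed
- * size, then call again with an allocated buffer. */
-#if 0 /* example only */
-static void example_conf_get (const rd_kafka_conf_t *conf) {
-        size_t size;
-        char *val;
-
-        if (rd_kafka_conf_get(conf, "client.id", NULL, &size) !=
-            RD_KAFKA_CONF_OK)
-                return;
-
-        val = rd_malloc(size);
-        rd_kafka_conf_get(conf, "client.id", val, &size);
-        printf("client.id = %s\n", val);
-        rd_free(val);
-}
-#endif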
-
-
-static const char **rd_kafka_anyconf_dump (int scope, const void *conf,
-					   size_t *cntp) {
-	const struct rd_kafka_property *prop;
-	char **arr;
-	int cnt = 0;
-
-	arr = rd_calloc(RD_ARRAYSIZE(rd_kafka_properties)*2, sizeof(char *));
-
-	for (prop = rd_kafka_properties; prop->name ; prop++) {
-                char *val = NULL;
-                size_t val_size;
-
-		if (!(prop->scope & scope))
-			continue;
-
-		/* Skip aliases, show original property instead.
-                 * Skip invalids. */
-		if (prop->type == _RK_C_ALIAS || prop->type == _RK_C_INVALID)
-			continue;
-
-                /* Query value size */
-                if (rd_kafka_anyconf_get0(conf, prop, NULL, &val_size) !=
-                    RD_KAFKA_CONF_OK)
-                        continue;
-
-                /* Get value (rd_malloc to match the rd_free() in
-                 * rd_kafka_conf_dump_free()) */
-                val = rd_malloc(val_size);
-                rd_kafka_anyconf_get0(conf, prop, val, &val_size);
-
-                arr[cnt++] = rd_strdup(prop->name);
-                arr[cnt++] = val;
-	}
-
-	*cntp = cnt;
-
-	return (const char **)arr;
-}
-
-
-const char **rd_kafka_conf_dump (rd_kafka_conf_t *conf, size_t *cntp) {
-	return rd_kafka_anyconf_dump(_RK_GLOBAL, conf, cntp);
-}
-
-const char **rd_kafka_topic_conf_dump (rd_kafka_topic_conf_t *conf,
-				       size_t *cntp) {
-	return rd_kafka_anyconf_dump(_RK_TOPIC, conf, cntp);
-}
-
-void rd_kafka_conf_dump_free (const char **arr, size_t cnt) {
-	char **_arr = (char **)arr;
-	unsigned int i;
-
-	for (i = 0 ; i < cnt ; i++)
-		if (_arr[i])
-			rd_free(_arr[i]);
-
-	rd_free(_arr);
-}
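-
-/* Usage sketch (ours, not part of the deleted source): the dump is an
- * array of name/value string pairs, so it is walked in steps of two and
- * released with rd_kafka_conf_dump_free(). */
-#if 0 /* example only */
-static void example_dump (rd_kafka_conf_t *conf) {
-        size_t cnt, i;
-        const char **arr = rd_kafka_conf_dump(conf, &cnt);
-
-        for (i = 0 ; i < cnt ; i += 2)
-                printf("%s = %s\n", arr[i], arr[i+1]);
-
-        rd_kafka_conf_dump_free(arr, cnt);
-}
-#endif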
-
-void rd_kafka_conf_properties_show (FILE *fp) {
-	const struct rd_kafka_property *prop;
-	int last = 0;
-	int j;
-	char tmp[512];
-	const char *dash80 = "----------------------------------------"
-		"----------------------------------------";
-
-	for (prop = rd_kafka_properties; prop->name ; prop++) {
-		const char *typeinfo = "";
-
-                /* Skip invalid properties. */
-                if (prop->type == _RK_C_INVALID)
-                        continue;
-
-		if (!(prop->scope & last)) {
-			fprintf(fp,
-				"%s## %s configuration properties\n\n",
-				last ? "\n\n":"",
-				prop->scope == _RK_GLOBAL ? "Global": "Topic");
-
-			fprintf(fp,
-				"%-40s | %3s | %-15s | %13s | %-25s\n"
-				"%.*s-|-%.*s-|-%.*s-|-%.*s:|-%.*s\n",
-				"Property", "C/P", "Range",
-				"Default", "Description",
-				40, dash80, 3, dash80, 15, dash80,
-				13, dash80, 25, dash80);
-
-			last = prop->scope & (_RK_GLOBAL|_RK_TOPIC);
-
-		}
-
-		fprintf(fp, "%-40s | %3s | ", prop->name,
-                        (!(prop->scope & _RK_PRODUCER) ==
-                         !(prop->scope & _RK_CONSUMER) ? " * " :
-                         ((prop->scope & _RK_PRODUCER) ? " P " :
-                          (prop->scope & _RK_CONSUMER) ? " C " : "")));
-
-		switch (prop->type)
-		{
-		case _RK_C_STR:
-		case _RK_C_KSTR:
-			typeinfo = "string";
-			/* FALLTHRU */
-		case _RK_C_PATLIST:
-			if (prop->type == _RK_C_PATLIST)
-				typeinfo = "pattern list";
-			if (prop->s2i[0].str) {
-				rd_kafka_conf_flags2str(tmp, sizeof(tmp), ", ",
-							prop, -1);
-				fprintf(fp, "%-15s | %13s",
-					tmp, prop->sdef ? prop->sdef : "");
-			} else {
-				fprintf(fp, "%-15s | %13s",
-					"", prop->sdef ? prop->sdef : "");
-			}
-			break;
-		case _RK_C_BOOL:
-			typeinfo = "boolean";
-			fprintf(fp, "%-15s | %13s", "true, false",
-				prop->vdef ? "true" : "false");
-			break;
-		case _RK_C_INT:
-			typeinfo = "integer";
-			rd_snprintf(tmp, sizeof(tmp),
-				    "%d .. %d", prop->vmin, prop->vmax);
-			fprintf(fp, "%-15s | %13i", tmp, prop->vdef);
-			break;
-		case _RK_C_S2I:
-			typeinfo = "enum value";
-			rd_kafka_conf_flags2str(tmp, sizeof(tmp), ", ",
-						prop, -1);
-			fprintf(fp, "%-15s | ", tmp);
-
-			for (j = 0 ; j < (int)RD_ARRAYSIZE(prop->s2i); j++) {
-				if (prop->s2i[j].val == prop->vdef) {
-					fprintf(fp, "%13s", prop->s2i[j].str);
-					break;
-				}
-			}
-			if (j == RD_ARRAYSIZE(prop->s2i))
-				fprintf(fp, "%13s", " ");
-			break;
-
-		case _RK_C_S2F:
-			typeinfo = "CSV flags";
-			/* Don't duplicate the builtin.features value in
-			 * both the Range and Default columns */
-			if (!strcmp(prop->name, "builtin.features"))
-				*tmp = '\0';
-			else
-				rd_kafka_conf_flags2str(tmp, sizeof(tmp), ", ",
-							prop, -1);
-			fprintf(fp, "%-15s | ", tmp);
-			rd_kafka_conf_flags2str(tmp, sizeof(tmp), ", ",
-						prop, prop->vdef);
-			fprintf(fp, "%13s", tmp);
-
-			break;
-		case _RK_C_PTR:
-			typeinfo = "pointer";
-			/* FALLTHRU */
-		default:
-			fprintf(fp, "%-15s | %-13s", "", " ");
-			break;
-		}
-
-		if (prop->type == _RK_C_ALIAS)
-			fprintf(fp, " | Alias for `%s`\n", prop->sdef);
-		else
-			fprintf(fp, " | %s <br>*Type: %s*\n", prop->desc,
-				typeinfo);
-	}
-	fprintf(fp, "\n");
-        fprintf(fp, "### C/P legend: C = Consumer, P = Producer, * = both\n");
-}
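-
-
-/* Usage sketch (ours, not part of the deleted source): emitting the
- * markdown property table above, e.g. when regenerating CONFIGURATION.md. */
-#if 0 /* example only */
-int main (void) {
-        rd_kafka_conf_properties_show(stdout);
-        return 0;
-}
-#endif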