You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bq...@apache.org on 2017/11/16 21:15:55 UTC

[25/42] nifi-minifi-cpp git commit: MINIFICPP-274: PutKafka Processor

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka.c b/thirdparty/librdkafka-0.11.1/src/rdkafka.c
new file mode 100644
index 0000000..6867a6c
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka.c
@@ -0,0 +1,3392 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2013, Magnus Edenhill
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met: 
+ * 
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer. 
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution. 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+#define _GNU_SOURCE
+#include <errno.h>
+#include <string.h>
+#include <stdarg.h>
+#include <signal.h>
+#include <stdlib.h>
+#include <sys/stat.h>
+
+#include "rdkafka_int.h"
+#include "rdkafka_msg.h"
+#include "rdkafka_broker.h"
+#include "rdkafka_topic.h"
+#include "rdkafka_partition.h"
+#include "rdkafka_offset.h"
+#include "rdkafka_transport.h"
+#include "rdkafka_cgrp.h"
+#include "rdkafka_assignor.h"
+#include "rdkafka_request.h"
+#include "rdkafka_event.h"
+#include "rdkafka_sasl.h"
+#include "rdkafka_interceptor.h"
+
+#include "rdtime.h"
+#include "crc32c.h"
+#include "rdunittest.h"
+
+#ifdef _MSC_VER
+#include <sys/types.h>
+#include <sys/timeb.h>
+#endif
+
+
+
+static once_flag rd_kafka_global_init_once = ONCE_FLAG_INIT;
+
+/**
+ * @brief Global counter+lock for all active librdkafka instances
+ */
+mtx_t rd_kafka_global_lock;
+int rd_kafka_global_cnt;
+
+
+/**
+ * Last API error code, per thread.
+ * Shared among all rd_kafka_t instances.
+ */
+rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code;
+
+
+/**
+ * Current number of threads created by rdkafka.
+ * This is used in regression tests.
+ */
+rd_atomic32_t rd_kafka_thread_cnt_curr;
+int rd_kafka_thread_cnt (void) {
+#if ENABLE_SHAREDPTR_DEBUG
+        rd_shared_ptrs_dump();
+#endif
+
+	return rd_atomic32_get(&rd_kafka_thread_cnt_curr);
+}
+
+/**
+ * Current thread's name (TLS)
+ */
+char RD_TLS rd_kafka_thread_name[64] = "app";
+
+
+
+static void rd_kafka_global_init (void) {
+#if ENABLE_SHAREDPTR_DEBUG
+        LIST_INIT(&rd_shared_ptr_debug_list);
+        mtx_init(&rd_shared_ptr_debug_mtx, mtx_plain);
+        atexit(rd_shared_ptrs_dump);
+#endif
+	mtx_init(&rd_kafka_global_lock, mtx_plain);
+#if ENABLE_DEVEL
+	rd_atomic32_init(&rd_kafka_op_cnt, 0);
+#endif
+        crc32c_global_init();
+}
+
+/**
+ * @returns the current number of active librdkafka instances
+ */
+static int rd_kafka_global_cnt_get (void) {
+	int r;
+	mtx_lock(&rd_kafka_global_lock);
+	r = rd_kafka_global_cnt;
+	mtx_unlock(&rd_kafka_global_lock);
+	return r;
+}
+
+
+/**
+ * @brief Increase counter for active librdkafka instances.
+ * If this is the first instance the global constructors will be called, if any.
+ */
+static void rd_kafka_global_cnt_incr (void) {
+	mtx_lock(&rd_kafka_global_lock);
+	rd_kafka_global_cnt++;
+	if (rd_kafka_global_cnt == 1) {
+		rd_kafka_transport_init();
+#if WITH_SSL
+		rd_kafka_transport_ssl_init();
+#endif
+                rd_kafka_sasl_global_init();
+	}
+	mtx_unlock(&rd_kafka_global_lock);
+}
+
+/**
+ * @brief Decrease counter for active librdkafka instances.
+ * If this counter reaches 0 the global destructors will be called, if any.
+ */
+static void rd_kafka_global_cnt_decr (void) {
+	mtx_lock(&rd_kafka_global_lock);
+	rd_kafka_assert(NULL, rd_kafka_global_cnt > 0);
+	rd_kafka_global_cnt--;
+	if (rd_kafka_global_cnt == 0) {
+                rd_kafka_sasl_global_term();
+#if WITH_SSL
+		rd_kafka_transport_ssl_term();
+#endif
+	}
+	mtx_unlock(&rd_kafka_global_lock);
+}
+
+
+/**
+ * Wait for all rd_kafka_t objects to be destroyed.
+ * Returns 0 if all kafka objects are now destroyed, or -1 if the
+ * timeout was reached.
+ */
+int rd_kafka_wait_destroyed (int timeout_ms) {
+	rd_ts_t timeout = rd_clock() + (timeout_ms * 1000);
+
+	while (rd_kafka_thread_cnt() > 0 ||
+	       rd_kafka_global_cnt_get() > 0) {
+		if (rd_clock() >= timeout) {
+			rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
+						ETIMEDOUT);
+#if ENABLE_SHAREDPTR_DEBUG
+                        rd_shared_ptrs_dump();
+#endif
+			return -1;
+		}
+		rd_usleep(25000, NULL); /* 25ms */
+	}
+
+	return 0;
+}
+
+static void rd_kafka_log_buf (const rd_kafka_conf_t *conf,
+                              const rd_kafka_t *rk, int level, const char *fac,
+                              const char *buf) {
+        if (level > conf->log_level)
+                return;
+        else if (rk && conf->log_queue) {
+                rd_kafka_op_t *rko;
+
+                if (!rk->rk_logq)
+                        return; /* Terminating */
+
+                rko = rd_kafka_op_new(RD_KAFKA_OP_LOG);
+                rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_MEDIUM);
+                rko->rko_u.log.level = level;
+                strncpy(rko->rko_u.log.fac, fac,
+                        sizeof(rko->rko_u.log.fac) - 1);
+                rko->rko_u.log.str = rd_strdup(buf);
+                rd_kafka_q_enq(rk->rk_logq, rko);
+
+        } else if (conf->log_cb) {
+                conf->log_cb(rk, level, fac, buf);
+        }
+}
+
+/**
+ * @brief Logger
+ *
+ * @remark conf must be set, but rk may be NULL
+ */
+void rd_kafka_log0 (const rd_kafka_conf_t *conf,
+                    const rd_kafka_t *rk,
+                    const char *extra, int level,
+                    const char *fac, const char *fmt, ...) {
+	char buf[2048];
+	va_list ap;
+	unsigned int elen = 0;
+        unsigned int of = 0;
+
+	if (level > conf->log_level)
+		return;
+
+	if (conf->log_thread_name) {
+		elen = rd_snprintf(buf, sizeof(buf), "[thrd:%s]: ",
+				   rd_kafka_thread_name);
+		if (unlikely(elen >= sizeof(buf)))
+			elen = sizeof(buf);
+		of = elen;
+	}
+
+	if (extra) {
+		elen = rd_snprintf(buf+of, sizeof(buf)-of, "%s: ", extra);
+		if (unlikely(elen >= sizeof(buf)-of))
+			elen = sizeof(buf)-of;
+                of += elen;
+	}
+
+	va_start(ap, fmt);
+	rd_vsnprintf(buf+of, sizeof(buf)-of, fmt, ap);
+	va_end(ap);
+
+        rd_kafka_log_buf(conf, rk, level, fac, buf);
+}
+
+
+
+void rd_kafka_log_print(const rd_kafka_t *rk, int level,
+	const char *fac, const char *buf) {
+	int secs, msecs;
+	struct timeval tv;
+	rd_gettimeofday(&tv, NULL);
+	secs = (int)tv.tv_sec;
+	msecs = (int)(tv.tv_usec / 1000);
+	fprintf(stderr, "%%%i|%u.%03u|%s|%s| %s\n",
+		level, secs, msecs,
+		fac, rk ? rk->rk_name : "", buf);
+}
+
+#ifndef _MSC_VER
+void rd_kafka_log_syslog (const rd_kafka_t *rk, int level,
+			  const char *fac, const char *buf) {
+	static int initialized = 0;
+
+	if (!initialized)
+		openlog("rdkafka", LOG_PID|LOG_CONS, LOG_USER);
+
+	syslog(level, "%s: %s: %s", fac, rk ? rk->rk_name : "", buf);
+}
+#endif
+
+void rd_kafka_set_logger (rd_kafka_t *rk,
+			  void (*func) (const rd_kafka_t *rk, int level,
+					const char *fac, const char *buf)) {
+	rk->rk_conf.log_cb = func;
+}
+
+void rd_kafka_set_log_level (rd_kafka_t *rk, int level) {
+	rk->rk_conf.log_level = level;
+}
+
+
+
+
+
+
+static const char *rd_kafka_type2str (rd_kafka_type_t type) {
+	static const char *types[] = {
+		[RD_KAFKA_PRODUCER] = "producer",
+		[RD_KAFKA_CONSUMER] = "consumer",
+	};
+	return types[type];
+}
+
+#define _ERR_DESC(ENUM,DESC) \
+	[ENUM - RD_KAFKA_RESP_ERR__BEGIN] = { ENUM, # ENUM + 18/*pfx*/, DESC }
+
+static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
+	_ERR_DESC(RD_KAFKA_RESP_ERR__BEGIN, NULL),
+	_ERR_DESC(RD_KAFKA_RESP_ERR__BAD_MSG,
+		  "Local: Bad message format"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR__BAD_COMPRESSION,
+		  "Local: Invalid compressed data"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR__DESTROY,
+		  "Local: Broker handle destroyed"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR__FAIL,
+		  "Local: Communication failure with broker"), //FIXME: too specific
+	_ERR_DESC(RD_KAFKA_RESP_ERR__TRANSPORT,
+		  "Local: Broker transport failure"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
+		  "Local: Critical system resource failure"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR__RESOLVE,
+		  "Local: Host resolution failure"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR__MSG_TIMED_OUT,
+		  "Local: Message timed out"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR__PARTITION_EOF,
+		  "Broker: No more messages"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
+		  "Local: Unknown partition"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR__FS,
+		  "Local: File or filesystem error"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC,
+		  "Local: Unknown topic"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
+		  "Local: All broker connections are down"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_ARG,
+		  "Local: Invalid argument or configuration"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT,
+		  "Local: Timed out"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR__QUEUE_FULL,
+		  "Local: Queue full"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR__ISR_INSUFF,
+		  "Local: ISR count insufficient"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR__NODE_UPDATE,
+		  "Local: Broker node update"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR__SSL,
+		  "Local: SSL error"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_COORD,
+		  "Local: Waiting for coordinator"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_GROUP,
+		  "Local: Unknown group"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR__IN_PROGRESS,
+		  "Local: Operation in progress"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS,
+		  "Local: Previous operation in progress"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION,
+		  "Local: Existing subscription"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
+		  "Local: Assign partitions"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
+		  "Local: Revoke partitions"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR__CONFLICT,
+		  "Local: Conflicting use"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR__STATE,
+		  "Local: Erroneous state"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL,
+		  "Local: Unknown protocol"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED,
+		  "Local: Not implemented"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR__AUTHENTICATION,
+		  "Local: Authentication failure"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR__NO_OFFSET,
+		  "Local: No offset stored"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR__OUTDATED,
+		  "Local: Outdated"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE,
+		  "Local: Timed out in queue"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
+                  "Local: Required feature not supported by broker"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_CACHE,
+                  "Local: Awaiting cache update"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR__INTR,
+                  "Local: Operation interrupted"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_SERIALIZATION,
+                  "Local: Key serialization error"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION,
+                  "Local: Value serialization error"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION,
+                  "Local: Key deserialization error"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION,
+                  "Local: Value deserialization error"),
+
+	_ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN,
+		  "Unknown broker error"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_NO_ERROR,
+		  "Success"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE,
+		  "Broker: Offset out of range"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG,
+		  "Broker: Invalid message"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
+		  "Broker: Unknown topic or partition"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE,
+		  "Broker: Invalid message size"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE,
+		  "Broker: Leader not available"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
+		  "Broker: Not leader for partition"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
+		  "Broker: Request timed out"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE,
+		  "Broker: Broker not available"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE,
+		  "Broker: Replica not available"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE,
+		  "Broker: Message size too large"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH,
+		  "Broker: StaleControllerEpochCode"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE,
+		  "Broker: Offset metadata string too large"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION,
+		  "Broker: Broker disconnected before response received"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS,
+		  "Broker: Group coordinator load in progress"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE,
+		  "Broker: Group coordinator not available"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP,
+		  "Broker: Not coordinator for group"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION,
+		  "Broker: Invalid topic"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE,
+		  "Broker: Message batch larger than configured server "
+		  "segment size"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS,
+		  "Broker: Not enough in-sync replicas"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND,
+		  "Broker: Message(s) written to insufficient number of "
+		  "in-sync replicas"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS,
+		  "Broker: Invalid required acks value"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+		  "Broker: Specified group generation id is not valid"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL,
+		  "Broker: Inconsistent group protocol"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_GROUP_ID,
+		  "Broker: Invalid group.id"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,
+		  "Broker: Unknown member"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT,
+		  "Broker: Invalid session timeout"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
+		  "Broker: Group rebalance in progress"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE,
+		  "Broker: Commit offset data size is not valid"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
+		  "Broker: Topic authorization failed"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
+		  "Broker: Group authorization failed"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED,
+		  "Broker: Cluster authorization failed"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP,
+		  "Broker: Invalid timestamp"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM,
+		  "Broker: Unsupported SASL mechanism"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE,
+		  "Broker: Request not valid in current SASL state"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION,
+		  "Broker: API version not supported"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS,
+		  "Broker: Topic already exists"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PARTITIONS,
+		  "Broker: Invalid number of partitions"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR,
+		  "Broker: Invalid replication factor"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT,
+		  "Broker: Invalid replica assignment"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_CONFIG,
+		  "Broker: Configuration is invalid"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_NOT_CONTROLLER,
+		  "Broker: Not controller for cluster"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUEST,
+		  "Broker: Invalid request"),
+	_ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT,
+		  "Broker: Message format on broker does not support request"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_POLICY_VIOLATION,
+                  "Broker: Isolation policy volation"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER,
+                  "Broker: Broker received an out of order sequence number"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER,
+                  "Broker: Broker received a duplicate sequence number"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH,
+                  "Broker: Producer attempted an operation with an old epoch"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TXN_STATE,
+                  "Broker: Producer attempted a transactional operation in "
+                  "an invalid state"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING,
+                  "Broker: Producer attempted to use a producer id which is "
+                  "not currently assigned to its transactional id"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT,
+                  "Broker: Transaction timeout is larger than the maximum "
+                  "value allowed by the broker's max.transaction.timeout.ms"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
+                  "Broker: Producer attempted to update a transaction while "
+                  "another concurrent operation on the same transaction was "
+                  "ongoing"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED,
+                  "Broker: Indicates that the transaction coordinator sending "
+                  "a WriteTxnMarker is no longer the current coordinator for "
+                  "a given producer"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
+                  "Broker: Transactional Id authorization failed"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_SECURITY_DISABLED,
+                  "Broker: Security features are disabled"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED,
+                  "Broker: Operation not attempted"),
+
+	_ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)
+};
+
+
+void rd_kafka_get_err_descs (const struct rd_kafka_err_desc **errdescs,
+			     size_t *cntp) {
+	*errdescs = rd_kafka_err_descs;
+	*cntp = RD_ARRAYSIZE(rd_kafka_err_descs);
+}
+
+
+const char *rd_kafka_err2str (rd_kafka_resp_err_t err) {
+	static RD_TLS char ret[32];
+	int idx = err - RD_KAFKA_RESP_ERR__BEGIN;
+
+	if (unlikely(err <= RD_KAFKA_RESP_ERR__BEGIN ||
+		     err >= RD_KAFKA_RESP_ERR_END_ALL ||
+		     !rd_kafka_err_descs[idx].desc)) {
+		rd_snprintf(ret, sizeof(ret), "Err-%i?", err);
+		return ret;
+	}
+
+	return rd_kafka_err_descs[idx].desc;
+}
+
+
+const char *rd_kafka_err2name (rd_kafka_resp_err_t err) {
+	static RD_TLS char ret[32];
+	int idx = err - RD_KAFKA_RESP_ERR__BEGIN;
+
+	if (unlikely(err <= RD_KAFKA_RESP_ERR__BEGIN ||
+		     err >= RD_KAFKA_RESP_ERR_END_ALL ||
+		     !rd_kafka_err_descs[idx].desc)) {
+		rd_snprintf(ret, sizeof(ret), "ERR_%i?", err);
+		return ret;
+	}
+
+	return rd_kafka_err_descs[idx].name;
+}
+
+
+rd_kafka_resp_err_t rd_kafka_last_error (void) {
+	return rd_kafka_last_error_code;
+}
+
+
+rd_kafka_resp_err_t rd_kafka_errno2err (int errnox) {
+	switch (errnox)
+	{
+	case EINVAL:
+		return RD_KAFKA_RESP_ERR__INVALID_ARG;
+
+        case EBUSY:
+                return RD_KAFKA_RESP_ERR__CONFLICT;
+
+	case ENOENT:
+		return RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
+
+	case ESRCH:
+		return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
+
+	case ETIMEDOUT:
+		return RD_KAFKA_RESP_ERR__TIMED_OUT;
+
+	case EMSGSIZE:
+		return RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;
+
+	case ENOBUFS:
+		return RD_KAFKA_RESP_ERR__QUEUE_FULL;
+
+	default:
+		return RD_KAFKA_RESP_ERR__FAIL;
+	}
+}
+
+
+
+/**
+ * @brief Final destructor for rd_kafka_t, must only be called with refcnt 0.
+ *
+ * @locality application thread
+ */
+void rd_kafka_destroy_final (rd_kafka_t *rk) {
+
+        rd_kafka_assert(rk, rd_atomic32_get(&rk->rk_terminate) != 0);
+
+        /* Synchronize state */
+        rd_kafka_wrlock(rk);
+        rd_kafka_wrunlock(rk);
+
+        rd_kafka_assignors_term(rk);
+
+        rd_kafka_metadata_cache_destroy(rk);
+
+        rd_kafka_timers_destroy(&rk->rk_timers);
+
+        rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying op queues");
+
+        /* Destroy cgrp */
+        if (rk->rk_cgrp) {
+                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+                             "Destroying cgrp");
+                /* Reset queue forwarding (rep -> cgrp) */
+                rd_kafka_q_fwd_set(rk->rk_rep, NULL);
+                rd_kafka_cgrp_destroy_final(rk->rk_cgrp);
+        }
+
+	/* Purge op-queues */
+	rd_kafka_q_destroy(rk->rk_rep);
+	rd_kafka_q_destroy(rk->rk_ops);
+
+#if WITH_SSL
+	if (rk->rk_conf.ssl.ctx) {
+                rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying SSL CTX");
+                rd_kafka_transport_ssl_ctx_term(rk);
+        }
+#endif
+
+        /* It is not safe to log after this point. */
+        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+                     "Termination done: freeing resources");
+
+        if (rk->rk_logq) {
+                rd_kafka_q_destroy(rk->rk_logq);
+                rk->rk_logq = NULL;
+        }
+
+        if (rk->rk_type == RD_KAFKA_PRODUCER) {
+		cnd_destroy(&rk->rk_curr_msgs.cnd);
+		mtx_destroy(&rk->rk_curr_msgs.lock);
+	}
+
+	cnd_destroy(&rk->rk_broker_state_change_cnd);
+	mtx_destroy(&rk->rk_broker_state_change_lock);
+
+	if (rk->rk_full_metadata)
+		rd_kafka_metadata_destroy(rk->rk_full_metadata);
+        rd_kafkap_str_destroy(rk->rk_client_id);
+        rd_kafkap_str_destroy(rk->rk_group_id);
+        rd_kafkap_str_destroy(rk->rk_eos.TransactionalId);
+	rd_kafka_anyconf_destroy(_RK_GLOBAL, &rk->rk_conf);
+        rd_list_destroy(&rk->rk_broker_by_id);
+
+	rd_kafkap_bytes_destroy((rd_kafkap_bytes_t *)rk->rk_null_bytes);
+	rwlock_destroy(&rk->rk_lock);
+
+	rd_free(rk);
+	rd_kafka_global_cnt_decr();
+}
+
+
+static void rd_kafka_destroy_app (rd_kafka_t *rk, int blocking) {
+        thrd_t thrd;
+#ifndef _MSC_VER
+	int term_sig = rk->rk_conf.term_sig;
+#endif
+        rd_kafka_dbg(rk, ALL, "DESTROY", "Terminating instance");
+
+        /* The legacy/simple consumer lacks an API to close down the consumer*/
+        if (rk->rk_cgrp) {
+                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+                             "Closing consumer group");
+                rd_kafka_consumer_close(rk);
+        }
+
+        rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Interrupting timers");
+        rd_kafka_wrlock(rk);
+        thrd = rk->rk_thread;
+	rd_atomic32_add(&rk->rk_terminate, 1);
+        rd_kafka_timers_interrupt(&rk->rk_timers);
+        rd_kafka_wrunlock(rk);
+
+        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+                     "Sending TERMINATE to main background thread");
+        /* Send op to trigger queue/io wake-up.
+         * The op itself is (likely) ignored by the receiver. */
+        rd_kafka_q_enq(rk->rk_ops, rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));
+
+	rd_kafka_brokers_broadcast_state_change(rk);
+
+#ifndef _MSC_VER
+        /* Interrupt main kafka thread to speed up termination. */
+	if (term_sig) {
+                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+                             "Sending thread kill signal %d", term_sig);
+                pthread_kill(thrd, term_sig);
+        }
+#endif
+
+        if (!blocking)
+                return; /* FIXME: thread resource leak */
+
+        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+                     "Joining main background thread");
+
+        if (thrd_join(thrd, NULL) != thrd_success)
+                rd_kafka_assert(NULL, !*"failed to join main thread");
+
+        rd_kafka_destroy_final(rk);
+}
+
+
+/* NOTE: Must only be called by application.
+ *       librdkafka itself must use rd_kafka_destroy0(). */
+void rd_kafka_destroy (rd_kafka_t *rk) {
+        rd_kafka_destroy_app(rk, 1);
+}
+
+
+/**
+ * Main destructor for rd_kafka_t
+ *
+ * Locality: rdkafka main thread or application thread during rd_kafka_new()
+ */
+static void rd_kafka_destroy_internal (rd_kafka_t *rk) {
+	rd_kafka_itopic_t *rkt, *rkt_tmp;
+	rd_kafka_broker_t *rkb, *rkb_tmp;
+        rd_list_t wait_thrds;
+        thrd_t *thrd;
+        int i;
+
+        rd_kafka_dbg(rk, ALL, "DESTROY", "Destroy internal");
+
+        /* Call on_destroy() interceptors */
+        rd_kafka_interceptors_on_destroy(rk);
+
+	/* Brokers pick up on rk_terminate automatically. */
+
+        /* List of (broker) threads to join to synchronize termination */
+        rd_list_init(&wait_thrds, rd_atomic32_get(&rk->rk_broker_cnt), NULL);
+
+	rd_kafka_wrlock(rk);
+
+        rd_kafka_dbg(rk, ALL, "DESTROY", "Removing all topics");
+	/* Decommission all topics */
+	TAILQ_FOREACH_SAFE(rkt, &rk->rk_topics, rkt_link, rkt_tmp) {
+		rd_kafka_wrunlock(rk);
+		rd_kafka_topic_partitions_remove(rkt);
+		rd_kafka_wrlock(rk);
+	}
+
+        /* Decommission brokers.
+         * Broker thread holds a refcount and detects when broker refcounts
+         * reaches 1 and then decommissions itself. */
+        TAILQ_FOREACH_SAFE(rkb, &rk->rk_brokers, rkb_link, rkb_tmp) {
+                /* Add broker's thread to wait_thrds list for later joining */
+                thrd = malloc(sizeof(*thrd));
+                *thrd = rkb->rkb_thread;
+                rd_list_add(&wait_thrds, thrd);
+                rd_kafka_wrunlock(rk);
+
+                /* Send op to trigger queue/io wake-up.
+                 * The op itself is (likely) ignored by the broker thread. */
+                rd_kafka_q_enq(rkb->rkb_ops,
+                               rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));
+
+#ifndef _MSC_VER
+                /* Interrupt IO threads to speed up termination. */
+                if (rk->rk_conf.term_sig)
+			pthread_kill(rkb->rkb_thread, rk->rk_conf.term_sig);
+#endif
+
+                rd_kafka_broker_destroy(rkb);
+
+                rd_kafka_wrlock(rk);
+        }
+
+        if (rk->rk_clusterid) {
+                rd_free(rk->rk_clusterid);
+                rk->rk_clusterid = NULL;
+        }
+
+        rd_kafka_wrunlock(rk);
+
+        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+                     "Purging reply queue");
+
+	/* Purge op-queue */
+        rd_kafka_q_disable(rk->rk_rep);
+	rd_kafka_q_purge(rk->rk_rep);
+
+	/* Loose our special reference to the internal broker. */
+        mtx_lock(&rk->rk_internal_rkb_lock);
+	if ((rkb = rk->rk_internal_rkb)) {
+                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+                             "Decommissioning internal broker");
+
+                /* Send op to trigger queue wake-up. */
+                rd_kafka_q_enq(rkb->rkb_ops,
+                               rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));
+
+                rk->rk_internal_rkb = NULL;
+                thrd = malloc(sizeof(*thrd));
+                *thrd = rkb->rkb_thread;
+                rd_list_add(&wait_thrds, thrd);
+        }
+        mtx_unlock(&rk->rk_internal_rkb_lock);
+	if (rkb)
+		rd_kafka_broker_destroy(rkb);
+
+
+        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+                     "Join %d broker thread(s)", rd_list_cnt(&wait_thrds));
+
+        /* Join broker threads */
+        RD_LIST_FOREACH(thrd, &wait_thrds, i) {
+                if (thrd_join(*thrd, NULL) != thrd_success)
+                        ;
+                free(thrd);
+        }
+
+        rd_list_destroy(&wait_thrds);
+}
+
+
+/* Stats buffer printf */
+#define _st_printf(...) do {					\
+		ssize_t r;					\
+		ssize_t rem = size-of;				\
+		r = rd_snprintf(buf+of, rem, __VA_ARGS__);	\
+		if (r >= rem) {					\
+			size *= 2;				\
+			rem = size-of;				\
+			buf = rd_realloc(buf, size);		\
+			r = rd_snprintf(buf+of, rem, __VA_ARGS__);	\
+		}						\
+		of += r;					\
+	} while (0)
+
+/**
+ * Emit stats for toppar
+ */
+static RD_INLINE void rd_kafka_stats_emit_toppar (char **bufp, size_t *sizep,
+					       size_t *ofp,
+					       rd_kafka_toppar_t *rktp,
+					       int first) {
+	char *buf = *bufp;
+	size_t size = *sizep;
+	size_t of = *ofp;
+        int64_t consumer_lag = -1;
+        struct offset_stats offs;
+        int32_t leader_nodeid = -1;
+
+        rd_kafka_toppar_lock(rktp);
+
+        if (rktp->rktp_leader) {
+                rd_kafka_broker_lock(rktp->rktp_leader);
+                leader_nodeid = rktp->rktp_leader->rkb_nodeid;
+                rd_kafka_broker_unlock(rktp->rktp_leader);
+        }
+
+        /* Grab a copy of the latest finalized offset stats */
+        offs = rktp->rktp_offsets_fin;
+
+        if (rktp->rktp_hi_offset != RD_KAFKA_OFFSET_INVALID &&
+            rktp->rktp_app_offset >= 0) {
+                if (unlikely(rktp->rktp_app_offset > rktp->rktp_hi_offset))
+                        consumer_lag = 0;
+                else
+                        consumer_lag = rktp->rktp_hi_offset -
+                                rktp->rktp_app_offset;
+        }
+
+	_st_printf("%s\"%"PRId32"\": { "
+		   "\"partition\":%"PRId32", "
+		   "\"leader\":%"PRId32", "
+		   "\"desired\":%s, "
+		   "\"unknown\":%s, "
+		   "\"msgq_cnt\":%i, "
+		   "\"msgq_bytes\":%"PRIu64", "
+		   "\"xmit_msgq_cnt\":%i, "
+		   "\"xmit_msgq_bytes\":%"PRIu64", "
+		   "\"fetchq_cnt\":%i, "
+		   "\"fetchq_size\":%"PRIu64", "
+		   "\"fetch_state\":\"%s\", "
+		   "\"query_offset\":%"PRId64", "
+		   "\"next_offset\":%"PRId64", "
+		   "\"app_offset\":%"PRId64", "
+		   "\"stored_offset\":%"PRId64", "
+		   "\"commited_offset\":%"PRId64", " /*FIXME: issue #80 */
+		   "\"committed_offset\":%"PRId64", "
+		   "\"eof_offset\":%"PRId64", "
+		   "\"lo_offset\":%"PRId64", "
+		   "\"hi_offset\":%"PRId64", "
+                   "\"consumer_lag\":%"PRId64", "
+		   "\"txmsgs\":%"PRIu64", "
+		   "\"txbytes\":%"PRIu64", "
+                   "\"msgs\": %"PRIu64", "
+                   "\"rx_ver_drops\": %"PRIu64" "
+		   "} ",
+		   first ? "" : ", ",
+		   rktp->rktp_partition,
+		   rktp->rktp_partition,
+                   leader_nodeid,
+		   (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_DESIRED)?"true":"false",
+		   (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_UNKNOWN)?"true":"false",
+		   rd_atomic32_get(&rktp->rktp_msgq.rkmq_msg_cnt),
+		   rd_atomic64_get(&rktp->rktp_msgq.rkmq_msg_bytes),
+		   rd_atomic32_get(&rktp->rktp_xmit_msgq.rkmq_msg_cnt),
+		   rd_atomic64_get(&rktp->rktp_xmit_msgq.rkmq_msg_bytes),
+		   rd_kafka_q_len(rktp->rktp_fetchq),
+		   rd_kafka_q_size(rktp->rktp_fetchq),
+		   rd_kafka_fetch_states[rktp->rktp_fetch_state],
+		   rktp->rktp_query_offset,
+                   offs.fetch_offset,
+		   rktp->rktp_app_offset,
+		   rktp->rktp_stored_offset,
+		   rktp->rktp_committed_offset, /* FIXME: issue #80 */
+		   rktp->rktp_committed_offset,
+                   offs.eof_offset,
+		   rktp->rktp_lo_offset,
+		   rktp->rktp_hi_offset,
+                   consumer_lag,
+                   rd_atomic64_get(&rktp->rktp_c.tx_msgs),
+		   rd_atomic64_get(&rktp->rktp_c.tx_bytes),
+		   rd_atomic64_get(&rktp->rktp_c.msgs),
+                   rd_atomic64_get(&rktp->rktp_c.rx_ver_drops));
+
+        rd_kafka_toppar_unlock(rktp);
+
+	*bufp = buf;
+	*sizep = size;
+	*ofp = of;
+}
+
+/**
+ * Emit all statistics
+ */
+static void rd_kafka_stats_emit_all (rd_kafka_t *rk) {
+	char  *buf;
+	size_t size = 1024*10;
+	size_t of = 0;
+	rd_kafka_broker_t *rkb;
+	rd_kafka_itopic_t *rkt;
+	shptr_rd_kafka_toppar_t *s_rktp;
+	rd_ts_t now;
+	rd_kafka_op_t *rko;
+	unsigned int tot_cnt;
+	size_t tot_size;
+
+	buf = rd_malloc(size);
+
+
+	rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size);
+	rd_kafka_rdlock(rk);
+
+	now = rd_clock();
+	_st_printf("{ "
+                   "\"name\": \"%s\", "
+                   "\"type\": \"%s\", "
+		   "\"ts\":%"PRId64", "
+		   "\"time\":%lli, "
+		   "\"replyq\":%i, "
+                   "\"msg_cnt\":%u, "
+		   "\"msg_size\":%"PRIusz", "
+                   "\"msg_max\":%u, "
+		   "\"msg_size_max\":%"PRIusz", "
+                   "\"simple_cnt\":%i, "
+                   "\"metadata_cache_cnt\":%i, "
+		   "\"brokers\":{ "/*open brokers*/,
+                   rk->rk_name,
+                   rd_kafka_type2str(rk->rk_type),
+		   now,
+		   (signed long long)time(NULL),
+		   rd_kafka_q_len(rk->rk_rep),
+		   tot_cnt, tot_size,
+		   rk->rk_curr_msgs.max_cnt, rk->rk_curr_msgs.max_size,
+                   rd_atomic32_get(&rk->rk_simple_cnt),
+                   rk->rk_metadata_cache.rkmc_cnt);
+
+
+	TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
+		rd_avg_t rtt, throttle, int_latency;
+		rd_kafka_toppar_t *rktp;
+
+		rd_kafka_broker_lock(rkb);
+		rd_avg_rollover(&int_latency, &rkb->rkb_avg_int_latency);
+		rd_avg_rollover(&rtt, &rkb->rkb_avg_rtt);
+		rd_avg_rollover(&throttle, &rkb->rkb_avg_throttle);
+		_st_printf("%s\"%s\": { "/*open broker*/
+			   "\"name\":\"%s\", "
+			   "\"nodeid\":%"PRId32", "
+			   "\"state\":\"%s\", "
+                           "\"stateage\":%"PRId64", "
+			   "\"outbuf_cnt\":%i, "
+			   "\"outbuf_msg_cnt\":%i, "
+			   "\"waitresp_cnt\":%i, "
+			   "\"waitresp_msg_cnt\":%i, "
+			   "\"tx\":%"PRIu64", "
+			   "\"txbytes\":%"PRIu64", "
+			   "\"txerrs\":%"PRIu64", "
+			   "\"txretries\":%"PRIu64", "
+			   "\"req_timeouts\":%"PRIu64", "
+			   "\"rx\":%"PRIu64", "
+			   "\"rxbytes\":%"PRIu64", "
+			   "\"rxerrs\":%"PRIu64", "
+                           "\"rxcorriderrs\":%"PRIu64", "
+                           "\"rxpartial\":%"PRIu64", "
+                           "\"zbuf_grow\":%"PRIu64", "
+                           "\"buf_grow\":%"PRIu64", "
+                           "\"wakeups\":%"PRIu64", "
+			   "\"int_latency\": {"
+			   " \"min\":%"PRId64","
+			   " \"max\":%"PRId64","
+			   " \"avg\":%"PRId64","
+			   " \"sum\":%"PRId64","
+			   " \"cnt\":%i "
+			   "}, "
+			   "\"rtt\": {"
+			   " \"min\":%"PRId64","
+			   " \"max\":%"PRId64","
+			   " \"avg\":%"PRId64","
+			   " \"sum\":%"PRId64","
+			   " \"cnt\":%i "
+			   "}, "
+			   "\"throttle\": {"
+			   " \"min\":%"PRId64","
+			   " \"max\":%"PRId64","
+			   " \"avg\":%"PRId64","
+			   " \"sum\":%"PRId64","
+			   " \"cnt\":%i "
+			   "}, "
+			   "\"toppars\":{ "/*open toppars*/,
+			   rkb == TAILQ_FIRST(&rk->rk_brokers) ? "" : ", ",
+			   rkb->rkb_name,
+			   rkb->rkb_name,
+			   rkb->rkb_nodeid,
+			   rd_kafka_broker_state_names[rkb->rkb_state],
+                           rkb->rkb_ts_state ? now - rkb->rkb_ts_state : 0,
+			   rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt),
+			   rd_atomic32_get(&rkb->rkb_outbufs.rkbq_msg_cnt),
+			   rd_atomic32_get(&rkb->rkb_waitresps.rkbq_cnt),
+			   rd_atomic32_get(&rkb->rkb_waitresps.rkbq_msg_cnt),
+			   rd_atomic64_get(&rkb->rkb_c.tx),
+			   rd_atomic64_get(&rkb->rkb_c.tx_bytes),
+			   rd_atomic64_get(&rkb->rkb_c.tx_err),
+			   rd_atomic64_get(&rkb->rkb_c.tx_retries),
+			   rd_atomic64_get(&rkb->rkb_c.req_timeouts),
+			   rd_atomic64_get(&rkb->rkb_c.rx),
+			   rd_atomic64_get(&rkb->rkb_c.rx_bytes),
+			   rd_atomic64_get(&rkb->rkb_c.rx_err),
+			   rd_atomic64_get(&rkb->rkb_c.rx_corrid_err),
+			   rd_atomic64_get(&rkb->rkb_c.rx_partial),
+                           rd_atomic64_get(&rkb->rkb_c.zbuf_grow),
+                           rd_atomic64_get(&rkb->rkb_c.buf_grow),
+                           rd_atomic64_get(&rkb->rkb_c.wakeups),
+			   int_latency.ra_v.minv,
+			   int_latency.ra_v.maxv,
+			   int_latency.ra_v.avg,
+			   int_latency.ra_v.sum,
+			   int_latency.ra_v.cnt,
+			   rtt.ra_v.minv,
+			   rtt.ra_v.maxv,
+			   rtt.ra_v.avg,
+			   rtt.ra_v.sum,
+			   rtt.ra_v.cnt,
+			   throttle.ra_v.minv,
+			   throttle.ra_v.maxv,
+			   throttle.ra_v.avg,
+			   throttle.ra_v.sum,
+			   throttle.ra_v.cnt);
+
+		TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) {
+			_st_printf("%s\"%.*s-%"PRId32"\": { "
+				   "\"topic\":\"%.*s\", "
+				   "\"partition\":%"PRId32"} ",
+				   rktp==TAILQ_FIRST(&rkb->rkb_toppars)?"":", ",
+				   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
+                                   rktp->rktp_partition,
+				   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
+				   rktp->rktp_partition);
+		}
+
+		rd_kafka_broker_unlock(rkb);
+
+		_st_printf("} "/*close toppars*/
+			   "} "/*close broker*/);
+	}
+
+
+	_st_printf("}, " /* close "brokers" array */
+		   "\"topics\":{ ");
+
+	TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
+		int i, j;
+
+		rd_kafka_topic_rdlock(rkt);
+		_st_printf("%s\"%.*s\": { "
+			   "\"topic\":\"%.*s\", "
+			   "\"metadata_age\":%"PRId64", "
+			   "\"partitions\":{ " /*open partitions*/,
+			   rkt==TAILQ_FIRST(&rk->rk_topics)?"":", ",
+			   RD_KAFKAP_STR_PR(rkt->rkt_topic),
+			   RD_KAFKAP_STR_PR(rkt->rkt_topic),
+			   rkt->rkt_ts_metadata ?
+			   (rd_clock() - rkt->rkt_ts_metadata)/1000 : 0);
+
+		for (i = 0 ; i < rkt->rkt_partition_cnt ; i++)
+			rd_kafka_stats_emit_toppar(&buf, &size, &of,
+						   rd_kafka_toppar_s2i(rkt->rkt_p[i]),
+						   i == 0);
+
+                RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, j)
+			rd_kafka_stats_emit_toppar(&buf, &size, &of,
+						   rd_kafka_toppar_s2i(s_rktp),
+						   i+j == 0);
+
+                i += j;
+
+		if (rkt->rkt_ua)
+			rd_kafka_stats_emit_toppar(&buf, &size, &of,
+						   rd_kafka_toppar_s2i(rkt->rkt_ua),
+                                                   i++ == 0);
+		rd_kafka_topic_rdunlock(rkt);
+
+		_st_printf("} "/*close partitions*/
+			   "} "/*close topic*/);
+
+	}
+	_st_printf("} "/*close topics*/);
+
+        if (rk->rk_cgrp) {
+                rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
+                _st_printf(", \"cgrp\": { "
+                           "\"rebalance_age\": %"PRId64", "
+                           "\"rebalance_cnt\": %d, "
+                           "\"assignment_size\": %d }",
+                           rkcg->rkcg_c.ts_rebalance ?
+                           (rd_clock() - rkcg->rkcg_c.ts_rebalance)/1000 : 0,
+                           rkcg->rkcg_c.rebalance_cnt,
+                           rkcg->rkcg_c.assignment_size);
+        }
+	rd_kafka_rdunlock(rk);
+
+        _st_printf("}"/*close object*/);
+
+
+	/* Enqueue op for application */
+	rko = rd_kafka_op_new(RD_KAFKA_OP_STATS);
+        rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_HIGH);
+	rko->rko_u.stats.json = buf;
+	rko->rko_u.stats.json_len = of;
+	rd_kafka_q_enq(rk->rk_rep, rko);
+}
+
+
+
+static void rd_kafka_topic_scan_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
+        rd_kafka_t *rk = rkts->rkts_rk;
+	rd_kafka_topic_scan_all(rk, rd_clock());
+}
+
+static void rd_kafka_stats_emit_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
+        rd_kafka_t *rk = rkts->rkts_rk;
+	rd_kafka_stats_emit_all(rk);
+}
+
+
+/**
+ * @brief Periodic metadata refresh callback
+ *
+ * @locality rdkafka main thread
+ */
+static void rd_kafka_metadata_refresh_cb (rd_kafka_timers_t *rkts, void *arg) {
+        rd_kafka_t *rk = rkts->rkts_rk;
+        int sparse = 1;
+
+        /* Dont do sparse requests if there is a consumer group with an
+         * active subscription since subscriptions need to be able to match
+         * on all topics. */
+        if (rk->rk_type == RD_KAFKA_CONSUMER && rk->rk_cgrp &&
+            rk->rk_cgrp->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION)
+                sparse = 0;
+
+        if (sparse)
+                rd_kafka_metadata_refresh_known_topics(rk, NULL, 1/*force*/,
+                                                       "periodic refresh");
+        else
+                rd_kafka_metadata_refresh_all(rk, NULL, "periodic refresh");
+}
+
+
+/**
+ * Main loop for Kafka handler thread.
+ */
+static int rd_kafka_thread_main (void *arg) {
+        rd_kafka_t *rk = arg;
+	rd_kafka_timer_t tmr_topic_scan = RD_ZERO_INIT;
+	rd_kafka_timer_t tmr_stats_emit = RD_ZERO_INIT;
+	rd_kafka_timer_t tmr_metadata_refresh = RD_ZERO_INIT;
+
+        rd_snprintf(rd_kafka_thread_name, sizeof(rd_kafka_thread_name), "main");
+
+	(void)rd_atomic32_add(&rd_kafka_thread_cnt_curr, 1);
+
+	/* Acquire lock (which was held by thread creator during creation)
+	 * to synchronise state. */
+	rd_kafka_wrlock(rk);
+	rd_kafka_wrunlock(rk);
+
+	rd_kafka_timer_start(&rk->rk_timers, &tmr_topic_scan, 1000000,
+			     rd_kafka_topic_scan_tmr_cb, NULL);
+	rd_kafka_timer_start(&rk->rk_timers, &tmr_stats_emit,
+			     rk->rk_conf.stats_interval_ms * 1000ll,
+			     rd_kafka_stats_emit_tmr_cb, NULL);
+        if (rk->rk_conf.metadata_refresh_interval_ms > 0)
+                rd_kafka_timer_start(&rk->rk_timers, &tmr_metadata_refresh,
+                                     rk->rk_conf.metadata_refresh_interval_ms *
+                                     1000ll,
+                                     rd_kafka_metadata_refresh_cb, NULL);
+
+        if (rk->rk_cgrp) {
+                rd_kafka_cgrp_reassign_broker(rk->rk_cgrp);
+                rd_kafka_q_fwd_set(rk->rk_cgrp->rkcg_ops, rk->rk_ops);
+        }
+
+	while (likely(!rd_kafka_terminating(rk) ||
+		      rd_kafka_q_len(rk->rk_ops))) {
+                rd_ts_t sleeptime = rd_kafka_timers_next(
+                        &rk->rk_timers, 1000*1000/*1s*/, 1/*lock*/);
+                rd_kafka_q_serve(rk->rk_ops, (int)(sleeptime / 1000), 0,
+                                 RD_KAFKA_Q_CB_CALLBACK, NULL, NULL);
+		if (rk->rk_cgrp) /* FIXME: move to timer-triggered */
+			rd_kafka_cgrp_serve(rk->rk_cgrp);
+		rd_kafka_timers_run(&rk->rk_timers, RD_POLL_NOWAIT);
+	}
+
+	rd_kafka_q_disable(rk->rk_ops);
+	rd_kafka_q_purge(rk->rk_ops);
+
+        rd_kafka_timer_stop(&rk->rk_timers, &tmr_topic_scan, 1);
+        rd_kafka_timer_stop(&rk->rk_timers, &tmr_stats_emit, 1);
+        rd_kafka_timer_stop(&rk->rk_timers, &tmr_metadata_refresh, 1);
+
+        /* Synchronise state */
+        rd_kafka_wrlock(rk);
+        rd_kafka_wrunlock(rk);
+
+        rd_kafka_destroy_internal(rk);
+
+        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+                     "Main background thread exiting");
+
+	rd_atomic32_sub(&rd_kafka_thread_cnt_curr, 1);
+
+	return 0;
+}
+
+
+static void rd_kafka_term_sig_handler (int sig) {
+	/* nop */
+}
+
+
+rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf,
+			  char *errstr, size_t errstr_size) {
+	rd_kafka_t *rk;
+	static rd_atomic32_t rkid;
+        rd_kafka_conf_t *conf;
+        rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;
+        int ret_errno = 0;
+#ifndef _MSC_VER
+        sigset_t newset, oldset;
+#endif
+
+	call_once(&rd_kafka_global_init_once, rd_kafka_global_init);
+
+        /* rd_kafka_new() takes ownership of the provided \p app_conf
+         * object if rd_kafka_new() succeeds.
+         * Since \p app_conf is optional we allocate a default configuration
+         * object here if \p app_conf is NULL.
+         * The configuration object itself is struct-copied later
+         * leaving the default *conf pointer to be ready for freeing.
+         * In case new() fails and app_conf was specified we will clear out
+         * rk_conf to avoid double-freeing from destroy_internal() and the
+         * user's eventual call to rd_kafka_conf_destroy().
+         * This is all a bit tricky but that's the nature of
+         * legacy interfaces. */
+        if (!app_conf)
+                conf = rd_kafka_conf_new();
+        else
+                conf = app_conf;
+
+        /* Verify mandatory configuration */
+        if (!conf->socket_cb) {
+                rd_snprintf(errstr, errstr_size,
+                            "Mandatory config property 'socket_cb' not set");
+                if (!app_conf)
+                        rd_kafka_conf_destroy(conf);
+                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
+                return NULL;
+        }
+
+        if (!conf->open_cb) {
+                rd_snprintf(errstr, errstr_size,
+                            "Mandatory config property 'open_cb' not set");
+                if (!app_conf)
+                        rd_kafka_conf_destroy(conf);
+                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
+                return NULL;
+        }
+
+        if (conf->metadata_max_age_ms == -1) {
+                if (conf->metadata_refresh_interval_ms > 0)
+                        conf->metadata_max_age_ms =
+                                conf->metadata_refresh_interval_ms * 3;
+                else /* use default value of refresh * 3 */
+                        conf->metadata_max_age_ms = 5*60*1000 * 3;
+        }
+
+	rd_kafka_global_cnt_incr();
+
+	/*
+	 * Set up the handle.
+	 */
+	rk = rd_calloc(1, sizeof(*rk));
+
+	rk->rk_type = type;
+
+        /* Struct-copy the config object. */
+	rk->rk_conf = *conf;
+        if (!app_conf)
+                rd_free(conf); /* Free the base config struct only,
+                                * not its fields since they were copied to
+                                * rk_conf just above. Those fields are
+                                * freed from rd_kafka_destroy_internal()
+                                * as the rk itself is destroyed. */
+
+        /* Call on_new() interceptors */
+        rd_kafka_interceptors_on_new(rk, &rk->rk_conf);
+
+	rwlock_init(&rk->rk_lock);
+        mtx_init(&rk->rk_internal_rkb_lock, mtx_plain);
+
+	cnd_init(&rk->rk_broker_state_change_cnd);
+	mtx_init(&rk->rk_broker_state_change_lock, mtx_plain);
+
+	rk->rk_rep = rd_kafka_q_new(rk);
+	rk->rk_ops = rd_kafka_q_new(rk);
+        rk->rk_ops->rkq_serve = rd_kafka_poll_cb;
+        rk->rk_ops->rkq_opaque = rk;
+
+        if (rk->rk_conf.log_queue) {
+                rk->rk_logq = rd_kafka_q_new(rk);
+                rk->rk_logq->rkq_serve = rd_kafka_poll_cb;
+                rk->rk_logq->rkq_opaque = rk;
+        }
+
+	TAILQ_INIT(&rk->rk_brokers);
+	TAILQ_INIT(&rk->rk_topics);
+        rd_kafka_timers_init(&rk->rk_timers, rk);
+        rd_kafka_metadata_cache_init(rk);
+
+	if (rk->rk_conf.dr_cb || rk->rk_conf.dr_msg_cb)
+		rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_DR;
+	if (rk->rk_conf.rebalance_cb)
+		rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_REBALANCE;
+	if (rk->rk_conf.offset_commit_cb)
+		rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_OFFSET_COMMIT;
+
+	/* Convenience Kafka protocol null bytes */
+	rk->rk_null_bytes = rd_kafkap_bytes_new(NULL, 0);
+
+	if (rk->rk_conf.debug)
+                rk->rk_conf.log_level = LOG_DEBUG;
+
+	rd_snprintf(rk->rk_name, sizeof(rk->rk_name), "%s#%s-%i",
+                    rk->rk_conf.client_id_str, rd_kafka_type2str(rk->rk_type),
+                    rd_atomic32_add(&rkid, 1));
+
+	/* Construct clientid kafka string */
+	rk->rk_client_id = rd_kafkap_str_new(rk->rk_conf.client_id_str,-1);
+
+        /* Convert group.id to kafka string (may be NULL) */
+        rk->rk_group_id = rd_kafkap_str_new(rk->rk_conf.group_id_str,-1);
+
+        /* Config fixups */
+        rk->rk_conf.queued_max_msg_bytes =
+                (int64_t)rk->rk_conf.queued_max_msg_kbytes * 1000ll;
+
+	/* Enable api.version.request=true if fallback.broker.version
+	 * indicates a supporting broker. */
+	if (rd_kafka_ApiVersion_is_queryable(rk->rk_conf.broker_version_fallback))
+		rk->rk_conf.api_version_request = 1;
+
+	if (rk->rk_type == RD_KAFKA_PRODUCER) {
+		mtx_init(&rk->rk_curr_msgs.lock, mtx_plain);
+		cnd_init(&rk->rk_curr_msgs.cnd);
+		rk->rk_curr_msgs.max_cnt =
+			rk->rk_conf.queue_buffering_max_msgs;
+                if ((unsigned long long)rk->rk_conf.queue_buffering_max_kbytes * 1024 >
+                    (unsigned long long)SIZE_MAX)
+                        rk->rk_curr_msgs.max_size = SIZE_MAX;
+                else
+                        rk->rk_curr_msgs.max_size =
+                        (size_t)rk->rk_conf.queue_buffering_max_kbytes * 1024;
+	}
+
+        if (rd_kafka_assignors_init(rk, errstr, errstr_size) == -1) {
+                ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
+                ret_errno = EINVAL;
+                goto fail;
+        }
+
+        if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL ||
+            rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_PLAINTEXT) {
+                if (rd_kafka_sasl_select_provider(rk,
+                                                  errstr, errstr_size) == -1) {
+                        ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
+                        ret_errno = EINVAL;
+                        goto fail;
+                }
+        }
+
+#if WITH_SSL
+	if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SSL ||
+	    rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL) {
+		/* Create SSL context */
+		if (rd_kafka_transport_ssl_ctx_init(rk, errstr,
+						    errstr_size) == -1) {
+                        ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
+                        ret_errno = EINVAL;
+                        goto fail;
+                }
+        }
+#endif
+
+	/* Client group, eligible both in consumer and producer mode. */
+        if (type == RD_KAFKA_CONSUMER &&
+	    RD_KAFKAP_STR_LEN(rk->rk_group_id) > 0)
+                rk->rk_cgrp = rd_kafka_cgrp_new(rk,
+                                                rk->rk_group_id,
+                                                rk->rk_client_id);
+
+
+
+#ifndef _MSC_VER
+        /* Block all signals in newly created thread.
+         * To avoid race condition we block all signals in the calling
+         * thread, which the new thread will inherit its sigmask from,
+         * and then restore the original sigmask of the calling thread when
+         * we're done creating the thread. */
+        sigemptyset(&oldset);
+        sigfillset(&newset);
+	if (rk->rk_conf.term_sig) {
+		struct sigaction sa_term = {
+			.sa_handler = rd_kafka_term_sig_handler
+		};
+		sigaction(rk->rk_conf.term_sig, &sa_term, NULL);
+	}
+        pthread_sigmask(SIG_SETMASK, &newset, &oldset);
+#endif
+
+	/* Lock handle here to synchronise state, i.e., hold off
+	 * the thread until we've finalized the handle. */
+	rd_kafka_wrlock(rk);
+
+	/* Create handler thread */
+	if ((thrd_create(&rk->rk_thread,
+			 rd_kafka_thread_main, rk)) != thrd_success) {
+                ret_err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
+                ret_errno = errno;
+		if (errstr)
+			rd_snprintf(errstr, errstr_size,
+				    "Failed to create thread: %s (%i)",
+				    rd_strerror(errno), errno);
+		rd_kafka_wrunlock(rk);
+#ifndef _MSC_VER
+                /* Restore sigmask of caller */
+                pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+#endif
+                goto fail;
+        }
+
+        rd_kafka_wrunlock(rk);
+
+        rk->rk_eos.PID = -1;
+        rk->rk_eos.TransactionalId = rd_kafkap_str_new(NULL, 0);
+
+        mtx_lock(&rk->rk_internal_rkb_lock);
+	rk->rk_internal_rkb = rd_kafka_broker_add(rk, RD_KAFKA_INTERNAL,
+						  RD_KAFKA_PROTO_PLAINTEXT,
+						  "", 0, RD_KAFKA_NODEID_UA);
+        mtx_unlock(&rk->rk_internal_rkb_lock);
+
+	/* Add initial list of brokers from configuration */
+	if (rk->rk_conf.brokerlist) {
+		if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist) == 0)
+			rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
+					"No brokers configured");
+	}
+
+#ifndef _MSC_VER
+	/* Restore sigmask of caller */
+	pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+#endif
+
+        /* Free user supplied conf's base pointer on success,
+         * but not the actual allocated fields since the struct
+         * will have been copied in its entirety above. */
+        if (app_conf)
+                rd_free(app_conf);
+	rd_kafka_set_last_error(0, 0);
+
+        rk->rk_initialized = 1;
+
+	return rk;
+
+fail:
+        /*
+         * Error out and clean up
+         */
+
+        /* If on_new() interceptors have been called we also need
+         * to allow interceptor clean-up by calling on_destroy() */
+        rd_kafka_interceptors_on_destroy(rk);
+
+        /* If rk_conf is a struct-copy of the application configuration
+         * we need to avoid rk_conf fields from being freed from
+         * rd_kafka_destroy_internal() since they belong to app_conf.
+         * However, there are some internal fields, such as interceptors,
+         * that belong to rk_conf and thus needs to be cleaned up.
+         * Legacy APIs, sigh.. */
+        if (app_conf) {
+                rd_kafka_assignors_term(rk);
+                rd_kafka_interceptors_destroy(&rk->rk_conf);
+                memset(&rk->rk_conf, 0, sizeof(rk->rk_conf));
+        }
+
+        rd_atomic32_add(&rk->rk_terminate, 1);
+        rd_kafka_destroy_internal(rk);
+        rd_kafka_destroy_final(rk);
+
+        rd_kafka_set_last_error(ret_err, ret_errno);
+
+        return NULL;
+}
+
+
+
+
+
+/**
+ * Produce a single message.
+ * Locality: any application thread
+ */
+int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition,
+		      int msgflags,
+		      void *payload, size_t len,
+		      const void *key, size_t keylen,
+		      void *msg_opaque) {
+	return rd_kafka_msg_new(rd_kafka_topic_a2i(rkt), partition,
+				msgflags, payload, len,
+				key, keylen, msg_opaque);
+}
+
+
+/**
+ * Counts usage of the legacy/simple consumer (rd_kafka_consume_start() with
+ * friends) since it does not have an API for stopping the cgrp we will need to
+ * sort that out automatically in the background when all consumption
+ * has stopped.
+ *
+ * Returns 0 if a  High level consumer is already instantiated
+ * which means a Simple consumer cannot co-operate with it, else 1.
+ *
+ * A rd_kafka_t handle can never migrate from simple to high-level, or
+ * vice versa, so we dont need a ..consumer_del().
+ */
+int rd_kafka_simple_consumer_add (rd_kafka_t *rk) {
+        if (rd_atomic32_get(&rk->rk_simple_cnt) < 0)
+                return 0;
+
+        return (int)rd_atomic32_add(&rk->rk_simple_cnt, 1);
+}
+
+
+
+
+/**
+ * rktp fetch is split up in these parts:
+ *   * application side:
+ *   * broker side (handled by current leader broker thread for rktp):
+ *          - the fetch state, initial offset, etc.
+ *          - fetching messages, updating fetched offset, etc.
+ *          - offset commits
+ *
+ * Communication between the two are:
+ *    app side -> rdkafka main side: rktp_ops
+ *    broker thread -> app side: rktp_fetchq
+ *
+ * There is no shared state between these threads, instead
+ * state is communicated through the two op queues, and state synchronization
+ * is performed by version barriers.
+ *
+ */
+
+static RD_UNUSED
+int rd_kafka_consume_start0 (rd_kafka_itopic_t *rkt, int32_t partition,
+				    int64_t offset, rd_kafka_q_t *rkq) {
+	shptr_rd_kafka_toppar_t *s_rktp;
+
+	if (partition < 0) {
+		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
+					ESRCH);
+		return -1;
+	}
+
+        if (!rd_kafka_simple_consumer_add(rkt->rkt_rk)) {
+		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
+                return -1;
+        }
+
+	rd_kafka_topic_wrlock(rkt);
+	s_rktp = rd_kafka_toppar_desired_add(rkt, partition);
+	rd_kafka_topic_wrunlock(rkt);
+
+        /* Verify offset */
+	if (offset == RD_KAFKA_OFFSET_BEGINNING ||
+	    offset == RD_KAFKA_OFFSET_END ||
+            offset <= RD_KAFKA_OFFSET_TAIL_BASE) {
+                /* logical offsets */
+
+	} else if (offset == RD_KAFKA_OFFSET_STORED) {
+		/* offset manager */
+
+                if (rkt->rkt_conf.offset_store_method ==
+                    RD_KAFKA_OFFSET_METHOD_BROKER &&
+                    RD_KAFKAP_STR_IS_NULL(rkt->rkt_rk->rk_group_id)) {
+                        /* Broker based offsets require a group id. */
+                        rd_kafka_toppar_destroy(s_rktp);
+			rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
+						EINVAL);
+                        return -1;
+                }
+
+	} else if (offset < 0) {
+		rd_kafka_toppar_destroy(s_rktp);
+		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
+					EINVAL);
+		return -1;
+
+        }
+
+        rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_s2i(s_rktp), offset,
+				       rkq, RD_KAFKA_NO_REPLYQ);
+
+        rd_kafka_toppar_destroy(s_rktp);
+
+	rd_kafka_set_last_error(0, 0);
+	return 0;
+}
+
+
+
+
+int rd_kafka_consume_start (rd_kafka_topic_t *app_rkt, int32_t partition,
+			    int64_t offset) {
+        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
+        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "START",
+                     "Start consuming partition %"PRId32,partition);
+ 	return rd_kafka_consume_start0(rkt, partition, offset, NULL);
+}
+
+int rd_kafka_consume_start_queue (rd_kafka_topic_t *app_rkt, int32_t partition,
+				  int64_t offset, rd_kafka_queue_t *rkqu) {
+        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
+
+ 	return rd_kafka_consume_start0(rkt, partition, offset, rkqu->rkqu_q);
+}
+
+
+
+
+static RD_UNUSED int rd_kafka_consume_stop0 (rd_kafka_toppar_t *rktp) {
+        rd_kafka_q_t *tmpq = NULL;
+        rd_kafka_resp_err_t err;
+
+        rd_kafka_topic_wrlock(rktp->rktp_rkt);
+        rd_kafka_toppar_lock(rktp);
+	rd_kafka_toppar_desired_del(rktp);
+        rd_kafka_toppar_unlock(rktp);
+	rd_kafka_topic_wrunlock(rktp->rktp_rkt);
+
+        tmpq = rd_kafka_q_new(rktp->rktp_rkt->rkt_rk);
+
+        rd_kafka_toppar_op_fetch_stop(rktp, RD_KAFKA_REPLYQ(tmpq, 0));
+
+        /* Synchronisation: Wait for stop reply from broker thread */
+        err = rd_kafka_q_wait_result(tmpq, RD_POLL_INFINITE);
+        rd_kafka_q_destroy(tmpq);
+
+	rd_kafka_set_last_error(err, err ? EINVAL : 0);
+
+	return err ? -1 : 0;
+}
+
+
+int rd_kafka_consume_stop (rd_kafka_topic_t *app_rkt, int32_t partition) {
+        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
+	shptr_rd_kafka_toppar_t *s_rktp;
+        int r;
+
+	if (partition == RD_KAFKA_PARTITION_UA) {
+		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
+		return -1;
+	}
+
+	rd_kafka_topic_wrlock(rkt);
+	if (!(s_rktp = rd_kafka_toppar_get(rkt, partition, 0)) &&
+	    !(s_rktp = rd_kafka_toppar_desired_get(rkt, partition))) {
+		rd_kafka_topic_wrunlock(rkt);
+		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
+					ESRCH);
+		return -1;
+	}
+        rd_kafka_topic_wrunlock(rkt);
+
+        r = rd_kafka_consume_stop0(rd_kafka_toppar_s2i(s_rktp));
+	/* set_last_error() called by stop0() */
+
+        rd_kafka_toppar_destroy(s_rktp);
+
+        return r;
+}
+
+
+
+rd_kafka_resp_err_t rd_kafka_seek (rd_kafka_topic_t *app_rkt,
+                                   int32_t partition,
+                                   int64_t offset,
+                                   int timeout_ms) {
+        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
+        shptr_rd_kafka_toppar_t *s_rktp;
+	rd_kafka_toppar_t *rktp;
+        rd_kafka_q_t *tmpq = NULL;
+        rd_kafka_resp_err_t err;
+
+        /* FIXME: simple consumer check */
+
+	if (partition == RD_KAFKA_PARTITION_UA)
+                return RD_KAFKA_RESP_ERR__INVALID_ARG;
+
+	rd_kafka_topic_rdlock(rkt);
+	if (!(s_rktp = rd_kafka_toppar_get(rkt, partition, 0)) &&
+	    !(s_rktp = rd_kafka_toppar_desired_get(rkt, partition))) {
+		rd_kafka_topic_rdunlock(rkt);
+                return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
+	}
+	rd_kafka_topic_rdunlock(rkt);
+
+        if (timeout_ms)
+                tmpq = rd_kafka_q_new(rkt->rkt_rk);
+
+        rktp = rd_kafka_toppar_s2i(s_rktp);
+        if ((err = rd_kafka_toppar_op_seek(rktp, offset,
+					   RD_KAFKA_REPLYQ(tmpq, 0)))) {
+                if (tmpq)
+                        rd_kafka_q_destroy(tmpq);
+                rd_kafka_toppar_destroy(s_rktp);
+                return err;
+        }
+
+	rd_kafka_toppar_destroy(s_rktp);
+
+        if (tmpq) {
+                err = rd_kafka_q_wait_result(tmpq, timeout_ms);
+                rd_kafka_q_destroy(tmpq);
+                return err;
+        }
+
+        return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+
+static ssize_t rd_kafka_consume_batch0 (rd_kafka_q_t *rkq,
+					int timeout_ms,
+					rd_kafka_message_t **rkmessages,
+					size_t rkmessages_size) {
+	/* Populate application's rkmessages array. */
+	return rd_kafka_q_serve_rkmessages(rkq, timeout_ms,
+					   rkmessages, rkmessages_size);
+}
+
+
+ssize_t rd_kafka_consume_batch (rd_kafka_topic_t *app_rkt, int32_t partition,
+				int timeout_ms,
+				rd_kafka_message_t **rkmessages,
+				size_t rkmessages_size) {
+        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
+	shptr_rd_kafka_toppar_t *s_rktp;
+        rd_kafka_toppar_t *rktp;
+	ssize_t cnt;
+
+	/* Get toppar */
+	rd_kafka_topic_rdlock(rkt);
+	s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
+	if (unlikely(!s_rktp))
+		s_rktp = rd_kafka_toppar_desired_get(rkt, partition);
+	rd_kafka_topic_rdunlock(rkt);
+
+	if (unlikely(!s_rktp)) {
+		/* No such toppar known */
+		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
+					ESRCH);
+		return -1;
+	}
+
+        rktp = rd_kafka_toppar_s2i(s_rktp);
+
+	/* Populate application's rkmessages array. */
+	cnt = rd_kafka_q_serve_rkmessages(rktp->rktp_fetchq, timeout_ms,
+					  rkmessages, rkmessages_size);
+
+	rd_kafka_toppar_destroy(s_rktp); /* refcnt from .._get() */
+
+	rd_kafka_set_last_error(0, 0);
+
+	return cnt;
+}
+
+ssize_t rd_kafka_consume_batch_queue (rd_kafka_queue_t *rkqu,
+				      int timeout_ms,
+				      rd_kafka_message_t **rkmessages,
+				      size_t rkmessages_size) {
+	/* Populate application's rkmessages array. */
+	return rd_kafka_consume_batch0(rkqu->rkqu_q, timeout_ms,
+				       rkmessages, rkmessages_size);
+}
+
+
+struct consume_ctx {
+	void (*consume_cb) (rd_kafka_message_t *rkmessage, void *opaque);
+	void *opaque;
+};
+
+
+/**
+ * Trampoline for application's consume_cb()
+ */
+static rd_kafka_op_res_t
+rd_kafka_consume_cb (rd_kafka_t *rk,
+                     rd_kafka_q_t *rkq,
+                     rd_kafka_op_t *rko,
+                     rd_kafka_q_cb_type_t cb_type, void *opaque) {
+	struct consume_ctx *ctx = opaque;
+	rd_kafka_message_t *rkmessage;
+
+        if (unlikely(rd_kafka_op_version_outdated(rko, 0))) {
+                rd_kafka_op_destroy(rko);
+                return RD_KAFKA_OP_RES_HANDLED;
+        }
+
+	rkmessage = rd_kafka_message_get(rko);
+
+	rd_kafka_op_offset_store(rk, rko, rkmessage);
+
+	ctx->consume_cb(rkmessage, ctx->opaque);
+
+        rd_kafka_op_destroy(rko);
+
+        return RD_KAFKA_OP_RES_HANDLED;
+}
+
+
+
+static rd_kafka_op_res_t
+rd_kafka_consume_callback0 (rd_kafka_q_t *rkq, int timeout_ms, int max_cnt,
+                            void (*consume_cb) (rd_kafka_message_t
+                                                *rkmessage,
+                                                void *opaque),
+                            void *opaque) {
+        struct consume_ctx ctx = { .consume_cb = consume_cb, .opaque = opaque };
+        return rd_kafka_q_serve(rkq, timeout_ms, max_cnt,
+                                RD_KAFKA_Q_CB_RETURN,
+                                rd_kafka_consume_cb, &ctx);
+
+}
+
+
+int rd_kafka_consume_callback (rd_kafka_topic_t *app_rkt, int32_t partition,
+			       int timeout_ms,
+			       void (*consume_cb) (rd_kafka_message_t
+						   *rkmessage,
+						   void *opaque),
+			       void *opaque) {
+        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
+        shptr_rd_kafka_toppar_t *s_rktp;
+	rd_kafka_toppar_t *rktp;
+	int r;
+
+	/* Get toppar */
+	rd_kafka_topic_rdlock(rkt);
+	s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
+	if (unlikely(!s_rktp))
+		s_rktp = rd_kafka_toppar_desired_get(rkt, partition);
+	rd_kafka_topic_rdunlock(rkt);
+
+	if (unlikely(!s_rktp)) {
+		/* No such toppar known */
+		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
+					ESRCH);
+		return -1;
+	}
+
+        rktp = rd_kafka_toppar_s2i(s_rktp);
+	r = rd_kafka_consume_callback0(rktp->rktp_fetchq, timeout_ms,
+                                       rkt->rkt_conf.consume_callback_max_msgs,
+				       consume_cb, opaque);
+
+	rd_kafka_toppar_destroy(s_rktp);
+
+	rd_kafka_set_last_error(0, 0);
+
+	return r;
+}
+
+
+
+int rd_kafka_consume_callback_queue (rd_kafka_queue_t *rkqu,
+				     int timeout_ms,
+				     void (*consume_cb) (rd_kafka_message_t
+							 *rkmessage,
+							 void *opaque),
+				     void *opaque) {
+	return rd_kafka_consume_callback0(rkqu->rkqu_q, timeout_ms, 0,
+					  consume_cb, opaque);
+}
+
+
+/**
+ * Serve queue 'rkq' and return one message.
+ * By serving the queue it will also call any registered callbacks
+ * registered for matching events, this includes consumer_cb()
+ * in which case no message will be returned.
+ */
+static rd_kafka_message_t *rd_kafka_consume0 (rd_kafka_t *rk,
+                                              rd_kafka_q_t *rkq,
+					      int timeout_ms) {
+	rd_kafka_op_t *rko;
+	rd_kafka_message_t *rkmessage = NULL;
+	rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
+
+	rd_kafka_yield_thread = 0;
+        while ((rko = rd_kafka_q_pop(rkq,
+                                     rd_timeout_remains(abs_timeout), 0))) {
+                rd_kafka_op_res_t res;
+
+                res = rd_kafka_poll_cb(rk, rkq, rko,
+                                       RD_KAFKA_Q_CB_RETURN, NULL);
+
+                if (res == RD_KAFKA_OP_RES_PASS)
+                        break;
+
+                if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
+                            rd_kafka_yield_thread)) {
+                        /* Callback called rd_kafka_yield(), we must
+                         * stop dispatching the queue and return. */
+                        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INTR,
+                                                EINTR);
+                        return NULL;
+                }
+
+                /* Message was handled by callback. */
+                continue;
+        }
+
+	if (!rko) {
+		/* Timeout reached with no op returned. */
+		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
+					ETIMEDOUT);
+		return NULL;
+	}
+
+        rd_kafka_assert(rk,
+                        rko->rko_type == RD_KAFKA_OP_FETCH ||
+                        rko->rko_type == RD_KAFKA_OP_CONSUMER_ERR);
+
+	/* Get rkmessage from rko */
+	rkmessage = rd_kafka_message_get(rko);
+
+	/* Store offset */
+	rd_kafka_op_offset_store(rk, rko, rkmessage);
+
+	rd_kafka_set_last_error(0, 0);
+
+	return rkmessage;
+}
+
+rd_kafka_message_t *rd_kafka_consume (rd_kafka_topic_t *app_rkt,
+                                      int32_t partition,
+				      int timeout_ms) {
+        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
+        shptr_rd_kafka_toppar_t *s_rktp;
+	rd_kafka_toppar_t *rktp;
+	rd_kafka_message_t *rkmessage;
+
+	rd_kafka_topic_rdlock(rkt);
+	s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
+	if (unlikely(!s_rktp))
+		s_rktp = rd_kafka_toppar_desired_get(rkt, partition);
+	rd_kafka_topic_rdunlock(rkt);
+
+	if (unlikely(!s_rktp)) {
+		/* No such toppar known */
+		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
+					ESRCH);
+		return NULL;
+	}
+
+        rktp = rd_kafka_toppar_s2i(s_rktp);
+	rkmessage = rd_kafka_consume0(rkt->rkt_rk,
+                                      rktp->rktp_fetchq, timeout_ms);
+
+	rd_kafka_toppar_destroy(s_rktp); /* refcnt from .._get() */
+
+	return rkmessage;
+}
+
+
+rd_kafka_message_t *rd_kafka_consume_queue (rd_kafka_queue_t *rkqu,
+					    int timeout_ms) {
+	return rd_kafka_consume0(rkqu->rkqu_rk, rkqu->rkqu_q, timeout_ms);
+}
+
+
+
+
+rd_kafka_resp_err_t rd_kafka_poll_set_consumer (rd_kafka_t *rk) {
+        rd_kafka_cgrp_t *rkcg;
+
+        if (!(rkcg = rd_kafka_cgrp_get(rk)))
+                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
+
+        rd_kafka_q_fwd_set(rk->rk_rep, rkcg->rkcg_q);
+        return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+
+
+rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk,
+                                            int timeout_ms) {
+        rd_kafka_cgrp_t *rkcg;
+
+        if (unlikely(!(rkcg = rd_kafka_cgrp_get(rk)))) {
+                rd_kafka_message_t *rkmessage = rd_kafka_message_new();
+                rkmessage->err = RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
+                return rkmessage;
+        }
+
+        return rd_kafka_consume0(rk, rkcg->rkcg_q, timeout_ms);
+}
+
+
+rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk) {
+        rd_kafka_cgrp_t *rkcg;
+        rd_kafka_op_t *rko;
+        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__TIMED_OUT;
+	rd_kafka_q_t *rkq;
+
+        if (!(rkcg = rd_kafka_cgrp_get(rk)))
+                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
+
+	/* Redirect cgrp queue to our temporary queue to make sure
+	 * all posted ops (e.g., rebalance callbacks) are served by
+	 * this function. */
+	rkq = rd_kafka_q_new(rk);
+	rd_kafka_q_fwd_set(rkcg->rkcg_q, rkq);
+
+        rd_kafka_cgrp_terminate(rkcg, RD_KAFKA_REPLYQ(rkq, 0)); /* async */
+
+        while ((rko = rd_kafka_q_pop(rkq, RD_POLL_INFINITE, 0))) {
+                rd_kafka_op_res_t res;
+                if ((rko->rko_type & ~RD_KAFKA_OP_FLAGMASK) ==
+		    RD_KAFKA_OP_TERMINATE) {
+                        err = rko->rko_err;
+                        rd_kafka_op_destroy(rko);
+                        break;
+                }
+                res = rd_kafka_poll_cb(rk, rkq, rko,
+                                       RD_KAFKA_Q_CB_RETURN, NULL);
+                if (res == RD_KAFKA_OP_RES_PASS)
+                        rd_kafka_op_destroy(rko);
+                /* Ignore YIELD, we need to finish */
+        }
+
+        rd_kafka_q_destroy(rkq);
+
+	rd_kafka_q_fwd_set(rkcg->rkcg_q, NULL);
+
+        return err;
+}
+
+
+
+rd_kafka_resp_err_t
+rd_kafka_committed (rd_kafka_t *rk,
+		    rd_kafka_topic_partition_list_t *partitions,
+		    int timeout_ms) {
+        rd_kafka_q_t *rkq;
+        rd_kafka_resp_err_t err;
+        rd_kafka_cgrp_t *rkcg;
+	rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
+
+        if (!partitions)
+                return RD_KAFKA_RESP_ERR__INVALID_ARG;
+
+        if (!(rkcg = rd_kafka_cgrp_get(rk)))
+                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
+
+	/* Set default offsets. */
+	rd_kafka_topic_partition_list_reset_offsets(partitions,
+                                                    RD_KAFKA_OFFSET_INVALID);
+
+	rkq = rd_kafka_q_new(rk);
+
+        do {
+                rd_kafka_op_t *rko;
+		int state_version = rd_kafka_brokers_get_state_version(rk);
+
+                rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH);
+		rd_kafka_op_set_replyq(rko, rkq, NULL);
+
+                /* Issue #827
+                 * Copy partition list to avoid use-after-free if we time out
+                 * here, the app frees the list, and then cgrp starts
+                 * processing the op. */
+		rko->rko_u.offset_fetch.partitions =
+                        rd_kafka_topic_partition_list_copy(partitions);
+		rko->rko_u.offset_fetch.do_free = 1;
+
+                if (!rd_kafka_q_enq(rkcg->rkcg_ops, rko)) {
+                        err = RD_KAFKA_RESP_ERR__DESTROY;
+                        break;
+                }
+
+                rko = rd_kafka_q_pop(rkq, rd_timeout_remains(abs_timeout), 0);
+                if (rko) {
+                        if (!(err = rko->rko_err))
+                                rd_kafka_topic_partition_list_update(
+                                        partitions,
+                                        rko->rko_u.offset_fetch.partitions);
+                        else if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD ||
+				    err == RD_KAFKA_RESP_ERR__TRANSPORT) &&
+				   !rd_kafka_brokers_wait_state_change(
+					   rk, state_version,
+					   rd_timeout_remains(abs_timeout)))
+				err = RD_KAFKA_RESP_ERR__TIMED_OUT;
+
+                        rd_kafka_op_destroy(rko);
+                } else
+                        err = RD_KAFKA_RESP_ERR__TIMED_OUT;
+        } while (err == RD_KAFKA_RESP_ERR__TRANSPORT ||
+		 err == RD_KAFKA_RESP_ERR__WAIT_COORD);
+
+        rd_kafka_q_destroy(rkq);
+
+        return err;
+}
+
+
+
+rd_kafka_resp_err_t
+rd_kafka_position (rd_kafka_t *rk,
+		   rd_kafka_topic_partition_list_t *partitions) {
+ 	int i;
+
+	/* Set default offsets. */
+	rd_kafka_topic_partition_list_reset_offsets(partitions,
+						    RD_KAFKA_OFFSET_INVALID);
+
+	for (i = 0 ; i < partitions->cnt ; i++) {
+		rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];
+		shptr_rd_kafka_toppar_t *s_rktp;
+		rd_kafka_toppar_t *rktp;
+
+		if (!(s_rktp = rd_kafka_toppar_get2(rk, rktpar->topic,
+						    rktpar->partition, 0, 1))) {
+			rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
+			rktpar->offset = RD_KAFKA_OFFSET_INVALID;
+			continue;
+		}
+
+		rktp = rd_kafka_toppar_s2i(s_rktp);
+		rd_kafka_toppar_lock(rktp);
+		rktpar->offset = rktp->rktp_app_offset;
+		rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
+		rd_kafka_toppar_unlock(rktp);
+		rd_kafka_toppar_destroy(s_rktp);
+	}
+
+        return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+
+struct _query_wmark_offsets_state {
+	rd_kafka_resp_err_t err;
+	const char *topic;
+	int32_t partition;
+	int64_t offsets[2];
+	int     offidx;  /* next offset to set from response */
+	rd_ts_t ts_end;
+	int     state_version;  /* Broker state version */
+};
+
+static void rd_kafka_query_wmark_offsets_resp_cb (rd_kafka_t *rk,
+						  rd_kafka_broker_t *rkb,
+						  rd_kafka_resp_err_t err,
+						  rd_kafka_buf_t *rkbuf,
+						  rd_kafka_buf_t *request,
+						  void *opaque) {
+	struct _query_wmark_offsets_state *state = opaque;
+        rd_kafka_topic_partition_list_t *offsets;
+        rd_kafka_topic_partition_t *rktpar;
+
+        offsets = rd_kafka_topic_partition_list_new(1);
+        err = rd_kafka_handle_Offset(rk, rkb, err, rkbuf, request, offsets);
+        if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
+                rd_kafka_topic_partition_list_destroy(offsets);
+                return; /* Retrying */
+        }
+
+	/* Retry if no broker connection is available yet. */
+	if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD ||
+	     err == RD_KAFKA_RESP_ERR__TRANSPORT) &&
+	    rkb &&
+	    rd_kafka_brokers_wait_state_change(
+		    rkb->rkb_rk, state->state_version,
+		    rd_timeout_remains(state->ts_end))) {
+		/* Retry */
+		state->state_version = rd_kafka_brokers_get_state_version(rk);
+		request->rkbuf_retries = 0;
+		if (rd_kafka_buf_retry(rkb, request)) {
+                        rd_kafka_topic_partition_list_destroy(offsets);
+                        return; /* Retry in progress */
+                }
+		/* FALLTHRU */
+	}
+
+        /* Partition not seen in response. */
+        if (!(rktpar = rd_kafka_topic_partition_list_find(offsets,
+                                                          state->topic,
+                                                          state->partition)))
+                err = RD_KAFKA_RESP_ERR__BAD_MSG;
+        else if (rktpar->err)
+                err = rktpar->err;
+        else
+                state->offsets[state->offidx] = rktpar->offset;
+
+        state->offidx++;
+
+        if (err || state->offidx == 2) /* Error or Done */
+                state->err = err;
+
+        rd_kafka_topic_partition_list_destroy(offsets);
+}
+
+
+rd_kafka_resp_err_t
+rd_kafka_query_watermark_offsets (rd_kafka_t *rk, const char *topic,
+                                  int32_t partition,
+                                  int64_t *low, int64_t *high, int timeout_ms) {
+        rd_kafka_q_t *rkq;
+        struct _query_wmark_offsets_state state;
+        rd_ts_t ts_end = rd_timeout_init(timeout_ms);
+        rd_kafka_topic_partition_list_t *partitions;
+        rd_kafka_topic_partition_t *rktpar;
+        struct rd_kafka_partition_leader *leader;
+        rd_list_t leaders;
+        rd_kafka_resp_err_t err;
+
+        partitions = rd_kafka_topic_partition_list_new(1);
+        rktpar = rd_kafka_topic_partition_list_add(partitions,
+                                                   topic, partition);
+
+        rd_list_init(&leaders, partitions->cnt,
+                     (void *)rd_kafka_partition_leader_destroy);
+
+        err = rd_kafka_topic_partition_list_query_leaders(rk, partitions,
+                                                          &leaders, timeout_ms);
+        if (err) {
+                         rd_list_destroy(&leaders);
+                         rd_kafka_topic_partition_list_destroy(partitions);
+                         return err;
+        }
+
+        leader = rd_list_elem(&leaders, 0);
+
+        rkq = rd_kafka_q_new(rk);
+
+        /* Due to KAFKA-1588 we need to send a request for each wanted offset,
+         * in this case one for the low watermark and one for the high. */
+        state.topic = topic;
+        state.partition = partition;
+        state.offsets[0] = RD_KAFKA_OFFSET_BEGINNING;
+        state.offsets[1] = RD_KAFKA_OFFSET_END;
+        state.offidx = 0;
+        state.err = RD_KAFKA_RESP_ERR__IN_PROGRESS;
+        state.ts_end = ts_end;
+        state.state_version = rd_kafka_brokers_get_state_version(rk);
+
+
+        rktpar->offset =  RD_KAFKA_OFFSET_BEGINNING;
+        rd_kafka_OffsetRequest(leader->rkb, partitions, 0,
+                               RD_KAFKA_REPLYQ(rkq, 0),
+                               rd_kafka_query_wmark_offsets_resp_cb,
+                               &state);
+
+        rktpar->offset =  RD_KAFKA_OFFSET_END;
+        rd_kafka_OffsetRequest(leader->rkb, partitions, 0,
+                               RD_KAFKA_REPLYQ(rkq, 0),
+                               rd_kafka_query_wmark_offsets_resp_cb,
+                               &state);
+
+        rd_kafka_topic_partition_list_destroy(partitions);
+        rd_list_destroy(&leaders);
+
+        /* Wait for reply (or timeout) */
+        while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS &&
+               rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK,
+                                rd_kafka_poll_cb, NULL) !=
+               RD_KAFKA_OP_RES_YIELD)
+                ;
+
+        rd_kafka_q_destroy(rkq);
+
+        if (state.err)
+                return state.err;
+        else if (state.offidx != 2)
+                return RD_KAFKA_RESP_ERR__FAIL;
+
+        /* We are not certain about the returned order. */
+        if (state.offsets[0] < state.offsets[1]) {
+                *low = state.offsets[0];
+                *high  = state.offsets[1];
+        } else {
+                *low = state.offsets[1];
+                *high = state.offsets[0];
+        }
+
+        /* If partition is empty only one offset (the last) will be returned. */
+        if (*low < 0 && *high >= 0)
+                *low = *high;
+
+        return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+rd_kafka_resp_err_t
+rd_kafka_get_watermark_offsets (rd_kafka_t *rk, const char *topic,
+				int32_t partition,
+				int64_t *low, int64_t *high) {
+	shptr_rd_kafka_toppar_t *s_rktp;
+	rd_kafka_toppar_t *rktp;
+
+	s_rktp = rd_kafka_toppar_get2(rk, topic, partition, 0, 1);
+	if (!s_rktp)
+		return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
+	rktp = rd_kafka_toppar_s2i(s_rktp);
+
+	rd_kafka_toppar_lock(rktp);
+	*low = rktp->rktp_lo_offset;
+	*high = rktp->rktp_hi_offset;
+	rd_kafka_toppar_unlock(rktp);
+
+	rd_kafka_toppar_destroy(s_rktp);
+
+	return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+/**
+ * @brief get_offsets_for_times() state
+ */
+struct _get_offsets_for_times {
+        rd_kafka_topic_partition_list_t *results;
+        rd_kafka_resp_err_t err;
+        int wait_reply;
+        int state_version;
+        rd_ts_t ts_end;
+};
+
+/**
+ * @brief Handle OffsetRequest responses
+ */
+static void rd_kafka_get_offsets_for_times_resp_cb (rd_kafka_t *rk,
+                                                  rd_kafka_broker_t *rkb,
+                                                  rd_kafka_resp_err_t err,
+                                                  rd_kafka_buf_t *rkbuf,
+                                                  rd_kafka_buf_t *request,
+                                                  void *opaque) {
+        struct _get_offsets_for_times *state = opaque;
+
+        err = rd_kafka_handle_Offset(rk, rkb, err, rkbuf, request,
+                                     state->results);
+        if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
+                return; /* Retrying */
+
+        /* Retry if no broker connection is available yet. */
+        if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD ||
+             err == RD_KAFKA_RESP_ERR__TRANSPORT) &&
+            rkb &&
+            rd_kafka_brokers_wait_state_change(
+                    rkb->rkb_rk, state->state_version,
+                    rd_timeout_remains(state->ts_end))) {
+                /* Retry */
+                state->state_version = rd_kafka_brokers_get_state_version(rk);
+                request->rkbuf_retries = 0;
+                if (rd_kafka_buf_retry(rkb, request))
+                        return; /* Retry in progress */
+                /* FALLTHRU */
+        }
+
+        if (err && !state->err)
+                state->err = err;
+
+        state->wait_reply--;
+}
+
+
+rd_kafka_resp_err_t
+rd_kafka_offsets_for_times (rd_kafka_t *rk,
+                            rd_kafka_topic_partition_list_t *offsets,
+                            int timeout_ms) {
+        rd_kafka_q_t *rkq;
+        struct _get_offsets_for_times state = RD_ZERO_INIT;
+        rd_ts_t ts_end = rd_timeout_init(timeout_ms);
+        rd_list_t leaders;
+        int i;
+        rd_kafka_resp_err_t err;
+        struct rd_kafka_partition_leader *leader;
+
+        if (offsets->cnt == 0)
+                return RD_KAFKA_RESP_ERR__INVALID_ARG;
+
+        rd_list_init(&leaders, offsets->cnt,
+                     (void *)rd_kafka_partition_leader_destroy);
+
+        err = rd_kafka_topic_partition_list_query_leaders(rk, offsets, &leaders,
+                                                          timeout_ms);
+        if (err) {
+                rd_list_destroy(&leaders);
+                return err;
+        }
+
+
+        rkq = rd_kafka_q_new(rk);
+
+        state.wait_reply = 0;
+        state.results = rd_kafka_topic_partition_list_new(offsets->cnt);
+
+        /* For each leader send a request for its partitions */
+        RD_LIST_FOREACH(leader, &leaders, i) {
+                state.wait_reply++;
+                rd_kafka_OffsetRequest(leader->rkb, leader->partitions, 1,
+                                       RD_KAFKA_REPLYQ(rkq, 0),
+                                       rd_kafka_get_offsets_for_times_resp_cb,
+                                       &state);
+        }
+
+        rd_list_destroy(&leaders);
+
+        /* Wait for reply (or timeout) */
+        while (state.wait_reply > 0 && rd_timeout_remains(ts_end) > 0)
+                rd_kafka_q_serve(rkq, rd_timeout_remains(ts_end),
+                                0, RD_KAFKA_Q_CB_CALLBACK,
+                                 rd_kafka_poll_cb, NULL);
+
+        rd_kafka_q_destroy(rkq);
+
+        /* Then update the queried partitions. */
+        if (!state.err)
+                rd_kafka_topic_partition_list_update(offsets, state.results);
+
+        rd_kafka_topic_partition_list_destroy(state.results);
+
+        return state.err;
+}
+
+
+/**
+ * rd_kafka_poll() (and similar) op callback handler.
+ * Will either call registered callback depending on cb_type and op type
+ * or return op to application, if applicable (e.g., fetch message).
+ *
+ * Returns 1 if op was handled, else 0.
+ *
+ * Locality: application thread
+ */
+rd_kafka_op_res_t
+rd_kafka_poll_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
+                  rd_kafka_q_cb_type_t cb_type, void *opaque) {
+	rd_kafka_msg_t *rkm;
+
+	/* Return-as-event requested, see if op can be converted to event,
+	 * otherwise fall through and trigger callbacks. */
+	if (cb_type == RD_KAFKA_Q_CB_EVENT && rd_kafka_event_setup(rk, rko))
+		return 0; /* Return as event */
+
+        switch ((int)rko->rko_type)
+        {
+        case RD_KAFKA_OP_FETCH:
+                if (!rk->rk_conf.consume_cb ||
+                    cb_type == RD_KAFKA_Q_CB_RETURN ||
+                    cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
+                        return RD_KAFKA_OP_RES_PASS; /* Dont handle here */
+                else {
+                        struct consume_ctx ctx = {
+                                .consume_cb = rk->rk_conf.consume_cb,
+                                .opaque = rk->rk_conf.opaque };
+
+                        return rd_kafka_consume_cb(rk, rkq, rko, cb_type, &ctx);
+                }
+                break;
+
+        case RD_KAFKA_OP_REBALANCE:
+                /* If EVENT_REBALANCE is enabled but rebalance_cb isnt
+                 * we need to perform a dummy assign for the application.
+                 * This might happen during termination with consumer_close() */
+                if (rk->rk_conf.rebalance_cb)
+                        rk->rk_conf.rebalance_cb(
+                                rk, rko->rko_err,
+                                rko->rko_u.rebalance.partitions,
+                                rk->rk_conf.opaque);
+                else {
+                        rd_kafka_dbg(rk, CGRP, "UNASSIGN",
+                                     "Forcing unassign of %d partition(s)",
+                                     rko->rko_u.rebalance.partitions ?
+                                     rko->rko_u.rebalance.partitions->cnt : 0);
+                        rd_kafka_assign(rk, NULL);
+                }
+                break;
+
+        case RD_KAFKA_OP_OFFSET_COMMIT | RD_KAFKA_OP_REPLY:
+		if (!rko->rko_u.offset_commit.cb)
+			return RD_KAFKA_OP_RES_PASS; /* Dont handle here */
+		rko->rko_u.offset_commit.cb(
+                        rk, rko->rko_err,
+			rko->rko_u.offset_commit.partitions,
+			rko->rko_u.offset_commit.opaque);
+                break;
+
+        case RD_KAFKA_OP_CONSUMER_ERR:
+                /* rd_kafka_consumer_poll() (_Q_CB_CONSUMER):
+                 *   Consumer errors are returned to the application
+                 *   as rkmessages, not error callbacks.
+                 *
+                 * rd_kafka_poll() (_Q_CB_GLOBAL):
+                 *   convert to ERR op (fallthru)
+                 */
+                if (cb_type == RD_KAFKA_Q_CB_RETURN ||
+                    cb_type == RD_KAFKA_Q_CB_FORCE_RETURN) {
+                        /* return as message_t to application */
+                        return RD_KAFKA_OP_RES_PASS;
+                }
+		/* FALLTHRU */
+
+	case RD_KAFKA_OP_ERR:
+		if (rk->rk_conf.error_cb)
+			rk->rk_conf.error_cb(rk, rko->rko_err,
+					     rko->rko_u.err.errstr,
+                                             rk->rk_conf.opaque);
+		else
+			rd_kafka_log(rk, LOG_ERR, "ERROR",
+				     "%s: %s: %s",
+				     rk->rk_name,
+				     rd_kafka_err2str(rko->rko_err),
+				     rko->rko_u.err.errstr);
+		break;
+
+	case RD_KAFKA_OP_DR:
+		/* Delivery report:
+		 * call application DR callback for each message. */
+		while ((rkm = TAILQ_FIRST(&rko->rko_u.dr.msgq.rkmq_msgs))) {
+                        rd_kafka_message_t *rkmessage;
+
+			TAILQ_REMOVE(&rko->rko_u.dr.msgq.rkmq_msgs,
+				     rkm, rkm_link);
+
+                        rkmessage = rd_kafka_message_get_from_rkm(rko, rkm);
+
+                        if (rk->rk_conf.dr_msg_cb) {
+                                rk->rk_conf.dr_msg_cb(rk, rkmessage,
+                                                      rk->rk_conf.opaque);
+
+                        } else {
+
+                                rk->rk_conf.dr_cb(rk,
+                                                  rkmessage->payload,
+                                                  rkmessage->len,
+                                                  rkmessage->err,
+                                                  rk->rk_conf.opaque,
+                                                  rkmessage->_private);
+                        }
+
+                        rd_kafka_msg_destroy(rk, rkm);
+
+                        if (unlikely(rd_kafka_yield_thread)) {
+                                /* Callback called yield(),
+                                 * re-enqueue the op (if there are any
+                                 * remaining messages). */
+                                if (!TAILQ_EMPTY(&rko->rko_u.dr.msgq.
+                                                 rkmq_msgs))
+                                        rd_kafka_q_reenq(rkq, rko);
+                                else
+                                        rd_kafka_op_destroy(rko);
+                                return RD_KAFKA_OP_RES_YIELD;
+                        }
+		}
+
+		rd_kafka_msgq_init(&rko->rko_u.dr.msgq);
+
+		break;
+
+	case RD_KAFKA_OP_THROTTLE:
+		if (rk->rk_conf.throttle_cb)
+			rk->rk_conf.throttle_

<TRUNCATED>