You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/06/06 14:15:03 UTC

[35/51] [partial] nifi-minifi-cpp git commit: MINIFICPP-512 - upgrade to librdkafka 0.11.4

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka.c b/thirdparty/librdkafka-0.11.1/src/rdkafka.c
deleted file mode 100644
index 6867a6c..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka.c
+++ /dev/null
@@ -1,3392 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2013, Magnus Edenhill
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-
-#define _GNU_SOURCE
-#include <errno.h>
-#include <string.h>
-#include <stdarg.h>
-#include <signal.h>
-#include <stdlib.h>
-#include <sys/stat.h>
-
-#include "rdkafka_int.h"
-#include "rdkafka_msg.h"
-#include "rdkafka_broker.h"
-#include "rdkafka_topic.h"
-#include "rdkafka_partition.h"
-#include "rdkafka_offset.h"
-#include "rdkafka_transport.h"
-#include "rdkafka_cgrp.h"
-#include "rdkafka_assignor.h"
-#include "rdkafka_request.h"
-#include "rdkafka_event.h"
-#include "rdkafka_sasl.h"
-#include "rdkafka_interceptor.h"
-
-#include "rdtime.h"
-#include "crc32c.h"
-#include "rdunittest.h"
-
-#ifdef _MSC_VER
-#include <sys/types.h>
-#include <sys/timeb.h>
-#endif
-
-
-
-static once_flag rd_kafka_global_init_once = ONCE_FLAG_INIT;
-
-/**
- * @brief Global counter+lock for all active librdkafka instances
- */
-mtx_t rd_kafka_global_lock;
-int rd_kafka_global_cnt;
-
-
-/**
- * Last API error code, per thread.
- * Shared among all rd_kafka_t instances.
- */
-rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code;
-
-
-/**
- * Current number of threads created by rdkafka.
- * This is used in regression tests.
- */
-rd_atomic32_t rd_kafka_thread_cnt_curr;
-int rd_kafka_thread_cnt (void) {
-#if ENABLE_SHAREDPTR_DEBUG
-        rd_shared_ptrs_dump();
-#endif
-
-	return rd_atomic32_get(&rd_kafka_thread_cnt_curr);
-}
-
-/**
- * Current thread's name (TLS)
- */
-char RD_TLS rd_kafka_thread_name[64] = "app";
-
-
-
-static void rd_kafka_global_init (void) {
-#if ENABLE_SHAREDPTR_DEBUG
-        LIST_INIT(&rd_shared_ptr_debug_list);
-        mtx_init(&rd_shared_ptr_debug_mtx, mtx_plain);
-        atexit(rd_shared_ptrs_dump);
-#endif
-	mtx_init(&rd_kafka_global_lock, mtx_plain);
-#if ENABLE_DEVEL
-	rd_atomic32_init(&rd_kafka_op_cnt, 0);
-#endif
-        crc32c_global_init();
-}
-
-/**
- * @returns the current number of active librdkafka instances
- */
-static int rd_kafka_global_cnt_get (void) {
-	int r;
-	mtx_lock(&rd_kafka_global_lock);
-	r = rd_kafka_global_cnt;
-	mtx_unlock(&rd_kafka_global_lock);
-	return r;
-}
-
-
-/**
- * @brief Increase counter for active librdkafka instances.
- * If this is the first instance the global constructors will be called, if any.
- */
-static void rd_kafka_global_cnt_incr (void) {
-	mtx_lock(&rd_kafka_global_lock);
-	rd_kafka_global_cnt++;
-	if (rd_kafka_global_cnt == 1) {
-		rd_kafka_transport_init();
-#if WITH_SSL
-		rd_kafka_transport_ssl_init();
-#endif
-                rd_kafka_sasl_global_init();
-	}
-	mtx_unlock(&rd_kafka_global_lock);
-}
-
-/**
- * @brief Decrease counter for active librdkafka instances.
- * If this counter reaches 0 the global destructors will be called, if any.
- */
-static void rd_kafka_global_cnt_decr (void) {
-	mtx_lock(&rd_kafka_global_lock);
-	rd_kafka_assert(NULL, rd_kafka_global_cnt > 0);
-	rd_kafka_global_cnt--;
-	if (rd_kafka_global_cnt == 0) {
-                rd_kafka_sasl_global_term();
-#if WITH_SSL
-		rd_kafka_transport_ssl_term();
-#endif
-	}
-	mtx_unlock(&rd_kafka_global_lock);
-}
-
-
-/**
- * Wait for all rd_kafka_t objects to be destroyed.
- * Returns 0 if all kafka objects are now destroyed, or -1 if the
- * timeout was reached.
- */
-int rd_kafka_wait_destroyed (int timeout_ms) {
-	rd_ts_t timeout = rd_clock() + (timeout_ms * 1000);
-
-	while (rd_kafka_thread_cnt() > 0 ||
-	       rd_kafka_global_cnt_get() > 0) {
-		if (rd_clock() >= timeout) {
-			rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
-						ETIMEDOUT);
-#if ENABLE_SHAREDPTR_DEBUG
-                        rd_shared_ptrs_dump();
-#endif
-			return -1;
-		}
-		rd_usleep(25000, NULL); /* 25ms */
-	}
-
-	return 0;
-}
-
-static void rd_kafka_log_buf (const rd_kafka_conf_t *conf,
-                              const rd_kafka_t *rk, int level, const char *fac,
-                              const char *buf) {
-        if (level > conf->log_level)
-                return;
-        else if (rk && conf->log_queue) {
-                rd_kafka_op_t *rko;
-
-                if (!rk->rk_logq)
-                        return; /* Terminating */
-
-                rko = rd_kafka_op_new(RD_KAFKA_OP_LOG);
-                rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_MEDIUM);
-                rko->rko_u.log.level = level;
-                strncpy(rko->rko_u.log.fac, fac,
-                        sizeof(rko->rko_u.log.fac) - 1);
-                rko->rko_u.log.str = rd_strdup(buf);
-                rd_kafka_q_enq(rk->rk_logq, rko);
-
-        } else if (conf->log_cb) {
-                conf->log_cb(rk, level, fac, buf);
-        }
-}
-
-/**
- * @brief Logger
- *
- * @remark conf must be set, but rk may be NULL
- */
-void rd_kafka_log0 (const rd_kafka_conf_t *conf,
-                    const rd_kafka_t *rk,
-                    const char *extra, int level,
-                    const char *fac, const char *fmt, ...) {
-	char buf[2048];
-	va_list ap;
-	unsigned int elen = 0;
-        unsigned int of = 0;
-
-	if (level > conf->log_level)
-		return;
-
-	if (conf->log_thread_name) {
-		elen = rd_snprintf(buf, sizeof(buf), "[thrd:%s]: ",
-				   rd_kafka_thread_name);
-		if (unlikely(elen >= sizeof(buf)))
-			elen = sizeof(buf);
-		of = elen;
-	}
-
-	if (extra) {
-		elen = rd_snprintf(buf+of, sizeof(buf)-of, "%s: ", extra);
-		if (unlikely(elen >= sizeof(buf)-of))
-			elen = sizeof(buf)-of;
-                of += elen;
-	}
-
-	va_start(ap, fmt);
-	rd_vsnprintf(buf+of, sizeof(buf)-of, fmt, ap);
-	va_end(ap);
-
-        rd_kafka_log_buf(conf, rk, level, fac, buf);
-}
-
-
-
-void rd_kafka_log_print(const rd_kafka_t *rk, int level,
-	const char *fac, const char *buf) {
-	int secs, msecs;
-	struct timeval tv;
-	rd_gettimeofday(&tv, NULL);
-	secs = (int)tv.tv_sec;
-	msecs = (int)(tv.tv_usec / 1000);
-	fprintf(stderr, "%%%i|%u.%03u|%s|%s| %s\n",
-		level, secs, msecs,
-		fac, rk ? rk->rk_name : "", buf);
-}
-
-#ifndef _MSC_VER
-void rd_kafka_log_syslog (const rd_kafka_t *rk, int level,
-			  const char *fac, const char *buf) {
-	static int initialized = 0;
-
-	if (!initialized)
-		openlog("rdkafka", LOG_PID|LOG_CONS, LOG_USER);
-
-	syslog(level, "%s: %s: %s", fac, rk ? rk->rk_name : "", buf);
-}
-#endif
-
-void rd_kafka_set_logger (rd_kafka_t *rk,
-			  void (*func) (const rd_kafka_t *rk, int level,
-					const char *fac, const char *buf)) {
-	rk->rk_conf.log_cb = func;
-}
-
-void rd_kafka_set_log_level (rd_kafka_t *rk, int level) {
-	rk->rk_conf.log_level = level;
-}
-
-
-
-
-
-
-static const char *rd_kafka_type2str (rd_kafka_type_t type) {
-	static const char *types[] = {
-		[RD_KAFKA_PRODUCER] = "producer",
-		[RD_KAFKA_CONSUMER] = "consumer",
-	};
-	return types[type];
-}
-
-#define _ERR_DESC(ENUM,DESC) \
-	[ENUM - RD_KAFKA_RESP_ERR__BEGIN] = { ENUM, # ENUM + 18/*pfx*/, DESC }
-
-static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
-	_ERR_DESC(RD_KAFKA_RESP_ERR__BEGIN, NULL),
-	_ERR_DESC(RD_KAFKA_RESP_ERR__BAD_MSG,
-		  "Local: Bad message format"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR__BAD_COMPRESSION,
-		  "Local: Invalid compressed data"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR__DESTROY,
-		  "Local: Broker handle destroyed"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR__FAIL,
-		  "Local: Communication failure with broker"), //FIXME: too specific
-	_ERR_DESC(RD_KAFKA_RESP_ERR__TRANSPORT,
-		  "Local: Broker transport failure"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
-		  "Local: Critical system resource failure"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR__RESOLVE,
-		  "Local: Host resolution failure"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR__MSG_TIMED_OUT,
-		  "Local: Message timed out"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR__PARTITION_EOF,
-		  "Broker: No more messages"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
-		  "Local: Unknown partition"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR__FS,
-		  "Local: File or filesystem error"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC,
-		  "Local: Unknown topic"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
-		  "Local: All broker connections are down"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_ARG,
-		  "Local: Invalid argument or configuration"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT,
-		  "Local: Timed out"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR__QUEUE_FULL,
-		  "Local: Queue full"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR__ISR_INSUFF,
-		  "Local: ISR count insufficient"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR__NODE_UPDATE,
-		  "Local: Broker node update"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR__SSL,
-		  "Local: SSL error"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_COORD,
-		  "Local: Waiting for coordinator"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_GROUP,
-		  "Local: Unknown group"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR__IN_PROGRESS,
-		  "Local: Operation in progress"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS,
-		  "Local: Previous operation in progress"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION,
-		  "Local: Existing subscription"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
-		  "Local: Assign partitions"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
-		  "Local: Revoke partitions"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR__CONFLICT,
-		  "Local: Conflicting use"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR__STATE,
-		  "Local: Erroneous state"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL,
-		  "Local: Unknown protocol"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED,
-		  "Local: Not implemented"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR__AUTHENTICATION,
-		  "Local: Authentication failure"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR__NO_OFFSET,
-		  "Local: No offset stored"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR__OUTDATED,
-		  "Local: Outdated"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE,
-		  "Local: Timed out in queue"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
-                  "Local: Required feature not supported by broker"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_CACHE,
-                  "Local: Awaiting cache update"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR__INTR,
-                  "Local: Operation interrupted"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_SERIALIZATION,
-                  "Local: Key serialization error"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION,
-                  "Local: Value serialization error"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION,
-                  "Local: Key deserialization error"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION,
-                  "Local: Value deserialization error"),
-
-	_ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN,
-		  "Unknown broker error"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_NO_ERROR,
-		  "Success"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE,
-		  "Broker: Offset out of range"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG,
-		  "Broker: Invalid message"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
-		  "Broker: Unknown topic or partition"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE,
-		  "Broker: Invalid message size"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE,
-		  "Broker: Leader not available"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
-		  "Broker: Not leader for partition"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
-		  "Broker: Request timed out"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE,
-		  "Broker: Broker not available"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE,
-		  "Broker: Replica not available"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE,
-		  "Broker: Message size too large"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH,
-		  "Broker: StaleControllerEpochCode"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE,
-		  "Broker: Offset metadata string too large"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION,
-		  "Broker: Broker disconnected before response received"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS,
-		  "Broker: Group coordinator load in progress"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE,
-		  "Broker: Group coordinator not available"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP,
-		  "Broker: Not coordinator for group"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION,
-		  "Broker: Invalid topic"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE,
-		  "Broker: Message batch larger than configured server "
-		  "segment size"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS,
-		  "Broker: Not enough in-sync replicas"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND,
-		  "Broker: Message(s) written to insufficient number of "
-		  "in-sync replicas"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS,
-		  "Broker: Invalid required acks value"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
-		  "Broker: Specified group generation id is not valid"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL,
-		  "Broker: Inconsistent group protocol"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_GROUP_ID,
-		  "Broker: Invalid group.id"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,
-		  "Broker: Unknown member"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT,
-		  "Broker: Invalid session timeout"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
-		  "Broker: Group rebalance in progress"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE,
-		  "Broker: Commit offset data size is not valid"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
-		  "Broker: Topic authorization failed"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
-		  "Broker: Group authorization failed"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED,
-		  "Broker: Cluster authorization failed"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP,
-		  "Broker: Invalid timestamp"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM,
-		  "Broker: Unsupported SASL mechanism"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE,
-		  "Broker: Request not valid in current SASL state"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION,
-		  "Broker: API version not supported"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS,
-		  "Broker: Topic already exists"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PARTITIONS,
-		  "Broker: Invalid number of partitions"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR,
-		  "Broker: Invalid replication factor"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT,
-		  "Broker: Invalid replica assignment"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_CONFIG,
-		  "Broker: Configuration is invalid"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_NOT_CONTROLLER,
-		  "Broker: Not controller for cluster"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUEST,
-		  "Broker: Invalid request"),
-	_ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT,
-		  "Broker: Message format on broker does not support request"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_POLICY_VIOLATION,
-                  "Broker: Isolation policy volation"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER,
-                  "Broker: Broker received an out of order sequence number"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER,
-                  "Broker: Broker received a duplicate sequence number"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH,
-                  "Broker: Producer attempted an operation with an old epoch"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TXN_STATE,
-                  "Broker: Producer attempted a transactional operation in "
-                  "an invalid state"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING,
-                  "Broker: Producer attempted to use a producer id which is "
-                  "not currently assigned to its transactional id"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT,
-                  "Broker: Transaction timeout is larger than the maximum "
-                  "value allowed by the broker's max.transaction.timeout.ms"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
-                  "Broker: Producer attempted to update a transaction while "
-                  "another concurrent operation on the same transaction was "
-                  "ongoing"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED,
-                  "Broker: Indicates that the transaction coordinator sending "
-                  "a WriteTxnMarker is no longer the current coordinator for "
-                  "a given producer"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
-                  "Broker: Transactional Id authorization failed"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_SECURITY_DISABLED,
-                  "Broker: Security features are disabled"),
-        _ERR_DESC(RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED,
-                  "Broker: Operation not attempted"),
-
-	_ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)
-};
-
-
-void rd_kafka_get_err_descs (const struct rd_kafka_err_desc **errdescs,
-			     size_t *cntp) {
-	*errdescs = rd_kafka_err_descs;
-	*cntp = RD_ARRAYSIZE(rd_kafka_err_descs);
-}
-
-
-const char *rd_kafka_err2str (rd_kafka_resp_err_t err) {
-	static RD_TLS char ret[32];
-	int idx = err - RD_KAFKA_RESP_ERR__BEGIN;
-
-	if (unlikely(err <= RD_KAFKA_RESP_ERR__BEGIN ||
-		     err >= RD_KAFKA_RESP_ERR_END_ALL ||
-		     !rd_kafka_err_descs[idx].desc)) {
-		rd_snprintf(ret, sizeof(ret), "Err-%i?", err);
-		return ret;
-	}
-
-	return rd_kafka_err_descs[idx].desc;
-}
-
-
-const char *rd_kafka_err2name (rd_kafka_resp_err_t err) {
-	static RD_TLS char ret[32];
-	int idx = err - RD_KAFKA_RESP_ERR__BEGIN;
-
-	if (unlikely(err <= RD_KAFKA_RESP_ERR__BEGIN ||
-		     err >= RD_KAFKA_RESP_ERR_END_ALL ||
-		     !rd_kafka_err_descs[idx].desc)) {
-		rd_snprintf(ret, sizeof(ret), "ERR_%i?", err);
-		return ret;
-	}
-
-	return rd_kafka_err_descs[idx].name;
-}
-
-
-rd_kafka_resp_err_t rd_kafka_last_error (void) {
-	return rd_kafka_last_error_code;
-}
-
-
-rd_kafka_resp_err_t rd_kafka_errno2err (int errnox) {
-	switch (errnox)
-	{
-	case EINVAL:
-		return RD_KAFKA_RESP_ERR__INVALID_ARG;
-
-        case EBUSY:
-                return RD_KAFKA_RESP_ERR__CONFLICT;
-
-	case ENOENT:
-		return RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
-
-	case ESRCH:
-		return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
-
-	case ETIMEDOUT:
-		return RD_KAFKA_RESP_ERR__TIMED_OUT;
-
-	case EMSGSIZE:
-		return RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;
-
-	case ENOBUFS:
-		return RD_KAFKA_RESP_ERR__QUEUE_FULL;
-
-	default:
-		return RD_KAFKA_RESP_ERR__FAIL;
-	}
-}
-
-
-
-/**
- * @brief Final destructor for rd_kafka_t, must only be called with refcnt 0.
- *
- * @locality application thread
- */
-void rd_kafka_destroy_final (rd_kafka_t *rk) {
-
-        rd_kafka_assert(rk, rd_atomic32_get(&rk->rk_terminate) != 0);
-
-        /* Synchronize state */
-        rd_kafka_wrlock(rk);
-        rd_kafka_wrunlock(rk);
-
-        rd_kafka_assignors_term(rk);
-
-        rd_kafka_metadata_cache_destroy(rk);
-
-        rd_kafka_timers_destroy(&rk->rk_timers);
-
-        rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying op queues");
-
-        /* Destroy cgrp */
-        if (rk->rk_cgrp) {
-                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
-                             "Destroying cgrp");
-                /* Reset queue forwarding (rep -> cgrp) */
-                rd_kafka_q_fwd_set(rk->rk_rep, NULL);
-                rd_kafka_cgrp_destroy_final(rk->rk_cgrp);
-        }
-
-	/* Purge op-queues */
-	rd_kafka_q_destroy(rk->rk_rep);
-	rd_kafka_q_destroy(rk->rk_ops);
-
-#if WITH_SSL
-	if (rk->rk_conf.ssl.ctx) {
-                rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying SSL CTX");
-                rd_kafka_transport_ssl_ctx_term(rk);
-        }
-#endif
-
-        /* It is not safe to log after this point. */
-        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
-                     "Termination done: freeing resources");
-
-        if (rk->rk_logq) {
-                rd_kafka_q_destroy(rk->rk_logq);
-                rk->rk_logq = NULL;
-        }
-
-        if (rk->rk_type == RD_KAFKA_PRODUCER) {
-		cnd_destroy(&rk->rk_curr_msgs.cnd);
-		mtx_destroy(&rk->rk_curr_msgs.lock);
-	}
-
-	cnd_destroy(&rk->rk_broker_state_change_cnd);
-	mtx_destroy(&rk->rk_broker_state_change_lock);
-
-	if (rk->rk_full_metadata)
-		rd_kafka_metadata_destroy(rk->rk_full_metadata);
-        rd_kafkap_str_destroy(rk->rk_client_id);
-        rd_kafkap_str_destroy(rk->rk_group_id);
-        rd_kafkap_str_destroy(rk->rk_eos.TransactionalId);
-	rd_kafka_anyconf_destroy(_RK_GLOBAL, &rk->rk_conf);
-        rd_list_destroy(&rk->rk_broker_by_id);
-
-	rd_kafkap_bytes_destroy((rd_kafkap_bytes_t *)rk->rk_null_bytes);
-	rwlock_destroy(&rk->rk_lock);
-
-	rd_free(rk);
-	rd_kafka_global_cnt_decr();
-}
-
-
-static void rd_kafka_destroy_app (rd_kafka_t *rk, int blocking) {
-        thrd_t thrd;
-#ifndef _MSC_VER
-	int term_sig = rk->rk_conf.term_sig;
-#endif
-        rd_kafka_dbg(rk, ALL, "DESTROY", "Terminating instance");
-
-        /* The legacy/simple consumer lacks an API to close down the consumer*/
-        if (rk->rk_cgrp) {
-                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
-                             "Closing consumer group");
-                rd_kafka_consumer_close(rk);
-        }
-
-        rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Interrupting timers");
-        rd_kafka_wrlock(rk);
-        thrd = rk->rk_thread;
-	rd_atomic32_add(&rk->rk_terminate, 1);
-        rd_kafka_timers_interrupt(&rk->rk_timers);
-        rd_kafka_wrunlock(rk);
-
-        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
-                     "Sending TERMINATE to main background thread");
-        /* Send op to trigger queue/io wake-up.
-         * The op itself is (likely) ignored by the receiver. */
-        rd_kafka_q_enq(rk->rk_ops, rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));
-
-	rd_kafka_brokers_broadcast_state_change(rk);
-
-#ifndef _MSC_VER
-        /* Interrupt main kafka thread to speed up termination. */
-	if (term_sig) {
-                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
-                             "Sending thread kill signal %d", term_sig);
-                pthread_kill(thrd, term_sig);
-        }
-#endif
-
-        if (!blocking)
-                return; /* FIXME: thread resource leak */
-
-        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
-                     "Joining main background thread");
-
-        if (thrd_join(thrd, NULL) != thrd_success)
-                rd_kafka_assert(NULL, !*"failed to join main thread");
-
-        rd_kafka_destroy_final(rk);
-}
-
-
-/* NOTE: Must only be called by application.
- *       librdkafka itself must use rd_kafka_destroy0(). */
-void rd_kafka_destroy (rd_kafka_t *rk) {
-        rd_kafka_destroy_app(rk, 1);
-}
-
-
-/**
- * Main destructor for rd_kafka_t
- *
- * Locality: rdkafka main thread or application thread during rd_kafka_new()
- */
-static void rd_kafka_destroy_internal (rd_kafka_t *rk) {
-	rd_kafka_itopic_t *rkt, *rkt_tmp;
-	rd_kafka_broker_t *rkb, *rkb_tmp;
-        rd_list_t wait_thrds;
-        thrd_t *thrd;
-        int i;
-
-        rd_kafka_dbg(rk, ALL, "DESTROY", "Destroy internal");
-
-        /* Call on_destroy() interceptors */
-        rd_kafka_interceptors_on_destroy(rk);
-
-	/* Brokers pick up on rk_terminate automatically. */
-
-        /* List of (broker) threads to join to synchronize termination */
-        rd_list_init(&wait_thrds, rd_atomic32_get(&rk->rk_broker_cnt), NULL);
-
-	rd_kafka_wrlock(rk);
-
-        rd_kafka_dbg(rk, ALL, "DESTROY", "Removing all topics");
-	/* Decommission all topics */
-	TAILQ_FOREACH_SAFE(rkt, &rk->rk_topics, rkt_link, rkt_tmp) {
-		rd_kafka_wrunlock(rk);
-		rd_kafka_topic_partitions_remove(rkt);
-		rd_kafka_wrlock(rk);
-	}
-
-        /* Decommission brokers.
-         * Broker thread holds a refcount and detects when broker refcounts
-         * reaches 1 and then decommissions itself. */
-        TAILQ_FOREACH_SAFE(rkb, &rk->rk_brokers, rkb_link, rkb_tmp) {
-                /* Add broker's thread to wait_thrds list for later joining */
-                thrd = malloc(sizeof(*thrd));
-                *thrd = rkb->rkb_thread;
-                rd_list_add(&wait_thrds, thrd);
-                rd_kafka_wrunlock(rk);
-
-                /* Send op to trigger queue/io wake-up.
-                 * The op itself is (likely) ignored by the broker thread. */
-                rd_kafka_q_enq(rkb->rkb_ops,
-                               rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));
-
-#ifndef _MSC_VER
-                /* Interrupt IO threads to speed up termination. */
-                if (rk->rk_conf.term_sig)
-			pthread_kill(rkb->rkb_thread, rk->rk_conf.term_sig);
-#endif
-
-                rd_kafka_broker_destroy(rkb);
-
-                rd_kafka_wrlock(rk);
-        }
-
-        if (rk->rk_clusterid) {
-                rd_free(rk->rk_clusterid);
-                rk->rk_clusterid = NULL;
-        }
-
-        rd_kafka_wrunlock(rk);
-
-        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
-                     "Purging reply queue");
-
-	/* Purge op-queue */
-        rd_kafka_q_disable(rk->rk_rep);
-	rd_kafka_q_purge(rk->rk_rep);
-
-	/* Loose our special reference to the internal broker. */
-        mtx_lock(&rk->rk_internal_rkb_lock);
-	if ((rkb = rk->rk_internal_rkb)) {
-                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
-                             "Decommissioning internal broker");
-
-                /* Send op to trigger queue wake-up. */
-                rd_kafka_q_enq(rkb->rkb_ops,
-                               rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));
-
-                rk->rk_internal_rkb = NULL;
-                thrd = malloc(sizeof(*thrd));
-                *thrd = rkb->rkb_thread;
-                rd_list_add(&wait_thrds, thrd);
-        }
-        mtx_unlock(&rk->rk_internal_rkb_lock);
-	if (rkb)
-		rd_kafka_broker_destroy(rkb);
-
-
-        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
-                     "Join %d broker thread(s)", rd_list_cnt(&wait_thrds));
-
-        /* Join broker threads */
-        RD_LIST_FOREACH(thrd, &wait_thrds, i) {
-                if (thrd_join(*thrd, NULL) != thrd_success)
-                        ;
-                free(thrd);
-        }
-
-        rd_list_destroy(&wait_thrds);
-}
-
-
-/* Stats buffer printf */
-#define _st_printf(...) do {					\
-		ssize_t r;					\
-		ssize_t rem = size-of;				\
-		r = rd_snprintf(buf+of, rem, __VA_ARGS__);	\
-		if (r >= rem) {					\
-			size *= 2;				\
-			rem = size-of;				\
-			buf = rd_realloc(buf, size);		\
-			r = rd_snprintf(buf+of, rem, __VA_ARGS__);	\
-		}						\
-		of += r;					\
-	} while (0)
-
-/**
- * Emit stats for toppar
- */
-static RD_INLINE void rd_kafka_stats_emit_toppar (char **bufp, size_t *sizep,
-					       size_t *ofp,
-					       rd_kafka_toppar_t *rktp,
-					       int first) {
-	char *buf = *bufp;
-	size_t size = *sizep;
-	size_t of = *ofp;
-        int64_t consumer_lag = -1;
-        struct offset_stats offs;
-        int32_t leader_nodeid = -1;
-
-        rd_kafka_toppar_lock(rktp);
-
-        if (rktp->rktp_leader) {
-                rd_kafka_broker_lock(rktp->rktp_leader);
-                leader_nodeid = rktp->rktp_leader->rkb_nodeid;
-                rd_kafka_broker_unlock(rktp->rktp_leader);
-        }
-
-        /* Grab a copy of the latest finalized offset stats */
-        offs = rktp->rktp_offsets_fin;
-
-        if (rktp->rktp_hi_offset != RD_KAFKA_OFFSET_INVALID &&
-            rktp->rktp_app_offset >= 0) {
-                if (unlikely(rktp->rktp_app_offset > rktp->rktp_hi_offset))
-                        consumer_lag = 0;
-                else
-                        consumer_lag = rktp->rktp_hi_offset -
-                                rktp->rktp_app_offset;
-        }
-
-	_st_printf("%s\"%"PRId32"\": { "
-		   "\"partition\":%"PRId32", "
-		   "\"leader\":%"PRId32", "
-		   "\"desired\":%s, "
-		   "\"unknown\":%s, "
-		   "\"msgq_cnt\":%i, "
-		   "\"msgq_bytes\":%"PRIu64", "
-		   "\"xmit_msgq_cnt\":%i, "
-		   "\"xmit_msgq_bytes\":%"PRIu64", "
-		   "\"fetchq_cnt\":%i, "
-		   "\"fetchq_size\":%"PRIu64", "
-		   "\"fetch_state\":\"%s\", "
-		   "\"query_offset\":%"PRId64", "
-		   "\"next_offset\":%"PRId64", "
-		   "\"app_offset\":%"PRId64", "
-		   "\"stored_offset\":%"PRId64", "
-		   "\"commited_offset\":%"PRId64", " /*FIXME: issue #80 */
-		   "\"committed_offset\":%"PRId64", "
-		   "\"eof_offset\":%"PRId64", "
-		   "\"lo_offset\":%"PRId64", "
-		   "\"hi_offset\":%"PRId64", "
-                   "\"consumer_lag\":%"PRId64", "
-		   "\"txmsgs\":%"PRIu64", "
-		   "\"txbytes\":%"PRIu64", "
-                   "\"msgs\": %"PRIu64", "
-                   "\"rx_ver_drops\": %"PRIu64" "
-		   "} ",
-		   first ? "" : ", ",
-		   rktp->rktp_partition,
-		   rktp->rktp_partition,
-                   leader_nodeid,
-		   (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_DESIRED)?"true":"false",
-		   (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_UNKNOWN)?"true":"false",
-		   rd_atomic32_get(&rktp->rktp_msgq.rkmq_msg_cnt),
-		   rd_atomic64_get(&rktp->rktp_msgq.rkmq_msg_bytes),
-		   rd_atomic32_get(&rktp->rktp_xmit_msgq.rkmq_msg_cnt),
-		   rd_atomic64_get(&rktp->rktp_xmit_msgq.rkmq_msg_bytes),
-		   rd_kafka_q_len(rktp->rktp_fetchq),
-		   rd_kafka_q_size(rktp->rktp_fetchq),
-		   rd_kafka_fetch_states[rktp->rktp_fetch_state],
-		   rktp->rktp_query_offset,
-                   offs.fetch_offset,
-		   rktp->rktp_app_offset,
-		   rktp->rktp_stored_offset,
-		   rktp->rktp_committed_offset, /* FIXME: issue #80 */
-		   rktp->rktp_committed_offset,
-                   offs.eof_offset,
-		   rktp->rktp_lo_offset,
-		   rktp->rktp_hi_offset,
-                   consumer_lag,
-                   rd_atomic64_get(&rktp->rktp_c.tx_msgs),
-		   rd_atomic64_get(&rktp->rktp_c.tx_bytes),
-		   rd_atomic64_get(&rktp->rktp_c.msgs),
-                   rd_atomic64_get(&rktp->rktp_c.rx_ver_drops));
-
-        rd_kafka_toppar_unlock(rktp);
-
-	*bufp = buf;
-	*sizep = size;
-	*ofp = of;
-}
-
-/**
- * Emit all statistics
- */
-static void rd_kafka_stats_emit_all (rd_kafka_t *rk) {
-	char  *buf;
-	size_t size = 1024*10;
-	size_t of = 0;
-	rd_kafka_broker_t *rkb;
-	rd_kafka_itopic_t *rkt;
-	shptr_rd_kafka_toppar_t *s_rktp;
-	rd_ts_t now;
-	rd_kafka_op_t *rko;
-	unsigned int tot_cnt;
-	size_t tot_size;
-
-	buf = rd_malloc(size);
-
-
-	rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size);
-	rd_kafka_rdlock(rk);
-
-	now = rd_clock();
-	_st_printf("{ "
-                   "\"name\": \"%s\", "
-                   "\"type\": \"%s\", "
-		   "\"ts\":%"PRId64", "
-		   "\"time\":%lli, "
-		   "\"replyq\":%i, "
-                   "\"msg_cnt\":%u, "
-		   "\"msg_size\":%"PRIusz", "
-                   "\"msg_max\":%u, "
-		   "\"msg_size_max\":%"PRIusz", "
-                   "\"simple_cnt\":%i, "
-                   "\"metadata_cache_cnt\":%i, "
-		   "\"brokers\":{ "/*open brokers*/,
-                   rk->rk_name,
-                   rd_kafka_type2str(rk->rk_type),
-		   now,
-		   (signed long long)time(NULL),
-		   rd_kafka_q_len(rk->rk_rep),
-		   tot_cnt, tot_size,
-		   rk->rk_curr_msgs.max_cnt, rk->rk_curr_msgs.max_size,
-                   rd_atomic32_get(&rk->rk_simple_cnt),
-                   rk->rk_metadata_cache.rkmc_cnt);
-
-
-	TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
-		rd_avg_t rtt, throttle, int_latency;
-		rd_kafka_toppar_t *rktp;
-
-		rd_kafka_broker_lock(rkb);
-		rd_avg_rollover(&int_latency, &rkb->rkb_avg_int_latency);
-		rd_avg_rollover(&rtt, &rkb->rkb_avg_rtt);
-		rd_avg_rollover(&throttle, &rkb->rkb_avg_throttle);
-		_st_printf("%s\"%s\": { "/*open broker*/
-			   "\"name\":\"%s\", "
-			   "\"nodeid\":%"PRId32", "
-			   "\"state\":\"%s\", "
-                           "\"stateage\":%"PRId64", "
-			   "\"outbuf_cnt\":%i, "
-			   "\"outbuf_msg_cnt\":%i, "
-			   "\"waitresp_cnt\":%i, "
-			   "\"waitresp_msg_cnt\":%i, "
-			   "\"tx\":%"PRIu64", "
-			   "\"txbytes\":%"PRIu64", "
-			   "\"txerrs\":%"PRIu64", "
-			   "\"txretries\":%"PRIu64", "
-			   "\"req_timeouts\":%"PRIu64", "
-			   "\"rx\":%"PRIu64", "
-			   "\"rxbytes\":%"PRIu64", "
-			   "\"rxerrs\":%"PRIu64", "
-                           "\"rxcorriderrs\":%"PRIu64", "
-                           "\"rxpartial\":%"PRIu64", "
-                           "\"zbuf_grow\":%"PRIu64", "
-                           "\"buf_grow\":%"PRIu64", "
-                           "\"wakeups\":%"PRIu64", "
-			   "\"int_latency\": {"
-			   " \"min\":%"PRId64","
-			   " \"max\":%"PRId64","
-			   " \"avg\":%"PRId64","
-			   " \"sum\":%"PRId64","
-			   " \"cnt\":%i "
-			   "}, "
-			   "\"rtt\": {"
-			   " \"min\":%"PRId64","
-			   " \"max\":%"PRId64","
-			   " \"avg\":%"PRId64","
-			   " \"sum\":%"PRId64","
-			   " \"cnt\":%i "
-			   "}, "
-			   "\"throttle\": {"
-			   " \"min\":%"PRId64","
-			   " \"max\":%"PRId64","
-			   " \"avg\":%"PRId64","
-			   " \"sum\":%"PRId64","
-			   " \"cnt\":%i "
-			   "}, "
-			   "\"toppars\":{ "/*open toppars*/,
-			   rkb == TAILQ_FIRST(&rk->rk_brokers) ? "" : ", ",
-			   rkb->rkb_name,
-			   rkb->rkb_name,
-			   rkb->rkb_nodeid,
-			   rd_kafka_broker_state_names[rkb->rkb_state],
-                           rkb->rkb_ts_state ? now - rkb->rkb_ts_state : 0,
-			   rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt),
-			   rd_atomic32_get(&rkb->rkb_outbufs.rkbq_msg_cnt),
-			   rd_atomic32_get(&rkb->rkb_waitresps.rkbq_cnt),
-			   rd_atomic32_get(&rkb->rkb_waitresps.rkbq_msg_cnt),
-			   rd_atomic64_get(&rkb->rkb_c.tx),
-			   rd_atomic64_get(&rkb->rkb_c.tx_bytes),
-			   rd_atomic64_get(&rkb->rkb_c.tx_err),
-			   rd_atomic64_get(&rkb->rkb_c.tx_retries),
-			   rd_atomic64_get(&rkb->rkb_c.req_timeouts),
-			   rd_atomic64_get(&rkb->rkb_c.rx),
-			   rd_atomic64_get(&rkb->rkb_c.rx_bytes),
-			   rd_atomic64_get(&rkb->rkb_c.rx_err),
-			   rd_atomic64_get(&rkb->rkb_c.rx_corrid_err),
-			   rd_atomic64_get(&rkb->rkb_c.rx_partial),
-                           rd_atomic64_get(&rkb->rkb_c.zbuf_grow),
-                           rd_atomic64_get(&rkb->rkb_c.buf_grow),
-                           rd_atomic64_get(&rkb->rkb_c.wakeups),
-			   int_latency.ra_v.minv,
-			   int_latency.ra_v.maxv,
-			   int_latency.ra_v.avg,
-			   int_latency.ra_v.sum,
-			   int_latency.ra_v.cnt,
-			   rtt.ra_v.minv,
-			   rtt.ra_v.maxv,
-			   rtt.ra_v.avg,
-			   rtt.ra_v.sum,
-			   rtt.ra_v.cnt,
-			   throttle.ra_v.minv,
-			   throttle.ra_v.maxv,
-			   throttle.ra_v.avg,
-			   throttle.ra_v.sum,
-			   throttle.ra_v.cnt);
-
-		TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) {
-			_st_printf("%s\"%.*s-%"PRId32"\": { "
-				   "\"topic\":\"%.*s\", "
-				   "\"partition\":%"PRId32"} ",
-				   rktp==TAILQ_FIRST(&rkb->rkb_toppars)?"":", ",
-				   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
-                                   rktp->rktp_partition,
-				   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
-				   rktp->rktp_partition);
-		}
-
-		rd_kafka_broker_unlock(rkb);
-
-		_st_printf("} "/*close toppars*/
-			   "} "/*close broker*/);
-	}
-
-
-	_st_printf("}, " /* close "brokers" array */
-		   "\"topics\":{ ");
-
-	TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
-		int i, j;
-
-		rd_kafka_topic_rdlock(rkt);
-		_st_printf("%s\"%.*s\": { "
-			   "\"topic\":\"%.*s\", "
-			   "\"metadata_age\":%"PRId64", "
-			   "\"partitions\":{ " /*open partitions*/,
-			   rkt==TAILQ_FIRST(&rk->rk_topics)?"":", ",
-			   RD_KAFKAP_STR_PR(rkt->rkt_topic),
-			   RD_KAFKAP_STR_PR(rkt->rkt_topic),
-			   rkt->rkt_ts_metadata ?
-			   (rd_clock() - rkt->rkt_ts_metadata)/1000 : 0);
-
-		for (i = 0 ; i < rkt->rkt_partition_cnt ; i++)
-			rd_kafka_stats_emit_toppar(&buf, &size, &of,
-						   rd_kafka_toppar_s2i(rkt->rkt_p[i]),
-						   i == 0);
-
-                RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, j)
-			rd_kafka_stats_emit_toppar(&buf, &size, &of,
-						   rd_kafka_toppar_s2i(s_rktp),
-						   i+j == 0);
-
-                i += j;
-
-		if (rkt->rkt_ua)
-			rd_kafka_stats_emit_toppar(&buf, &size, &of,
-						   rd_kafka_toppar_s2i(rkt->rkt_ua),
-                                                   i++ == 0);
-		rd_kafka_topic_rdunlock(rkt);
-
-		_st_printf("} "/*close partitions*/
-			   "} "/*close topic*/);
-
-	}
-	_st_printf("} "/*close topics*/);
-
-        if (rk->rk_cgrp) {
-                rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
-                _st_printf(", \"cgrp\": { "
-                           "\"rebalance_age\": %"PRId64", "
-                           "\"rebalance_cnt\": %d, "
-                           "\"assignment_size\": %d }",
-                           rkcg->rkcg_c.ts_rebalance ?
-                           (rd_clock() - rkcg->rkcg_c.ts_rebalance)/1000 : 0,
-                           rkcg->rkcg_c.rebalance_cnt,
-                           rkcg->rkcg_c.assignment_size);
-        }
-	rd_kafka_rdunlock(rk);
-
-        _st_printf("}"/*close object*/);
-
-
-	/* Enqueue op for application */
-	rko = rd_kafka_op_new(RD_KAFKA_OP_STATS);
-        rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_HIGH);
-	rko->rko_u.stats.json = buf;
-	rko->rko_u.stats.json_len = of;
-	rd_kafka_q_enq(rk->rk_rep, rko);
-}
-
-
-
-static void rd_kafka_topic_scan_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
-        rd_kafka_t *rk = rkts->rkts_rk;
-	rd_kafka_topic_scan_all(rk, rd_clock());
-}
-
-static void rd_kafka_stats_emit_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
-        rd_kafka_t *rk = rkts->rkts_rk;
-	rd_kafka_stats_emit_all(rk);
-}
-
-
-/**
- * @brief Periodic metadata refresh callback
- *
- * @locality rdkafka main thread
- */
-static void rd_kafka_metadata_refresh_cb (rd_kafka_timers_t *rkts, void *arg) {
-        rd_kafka_t *rk = rkts->rkts_rk;
-        int sparse = 1;
-
-        /* Dont do sparse requests if there is a consumer group with an
-         * active subscription since subscriptions need to be able to match
-         * on all topics. */
-        if (rk->rk_type == RD_KAFKA_CONSUMER && rk->rk_cgrp &&
-            rk->rk_cgrp->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION)
-                sparse = 0;
-
-        if (sparse)
-                rd_kafka_metadata_refresh_known_topics(rk, NULL, 1/*force*/,
-                                                       "periodic refresh");
-        else
-                rd_kafka_metadata_refresh_all(rk, NULL, "periodic refresh");
-}
-
-
-/**
- * Main loop for Kafka handler thread.
- */
-static int rd_kafka_thread_main (void *arg) {
-        rd_kafka_t *rk = arg;
-	rd_kafka_timer_t tmr_topic_scan = RD_ZERO_INIT;
-	rd_kafka_timer_t tmr_stats_emit = RD_ZERO_INIT;
-	rd_kafka_timer_t tmr_metadata_refresh = RD_ZERO_INIT;
-
-        rd_snprintf(rd_kafka_thread_name, sizeof(rd_kafka_thread_name), "main");
-
-	(void)rd_atomic32_add(&rd_kafka_thread_cnt_curr, 1);
-
-	/* Acquire lock (which was held by thread creator during creation)
-	 * to synchronise state. */
-	rd_kafka_wrlock(rk);
-	rd_kafka_wrunlock(rk);
-
-	rd_kafka_timer_start(&rk->rk_timers, &tmr_topic_scan, 1000000,
-			     rd_kafka_topic_scan_tmr_cb, NULL);
-	rd_kafka_timer_start(&rk->rk_timers, &tmr_stats_emit,
-			     rk->rk_conf.stats_interval_ms * 1000ll,
-			     rd_kafka_stats_emit_tmr_cb, NULL);
-        if (rk->rk_conf.metadata_refresh_interval_ms > 0)
-                rd_kafka_timer_start(&rk->rk_timers, &tmr_metadata_refresh,
-                                     rk->rk_conf.metadata_refresh_interval_ms *
-                                     1000ll,
-                                     rd_kafka_metadata_refresh_cb, NULL);
-
-        if (rk->rk_cgrp) {
-                rd_kafka_cgrp_reassign_broker(rk->rk_cgrp);
-                rd_kafka_q_fwd_set(rk->rk_cgrp->rkcg_ops, rk->rk_ops);
-        }
-
-	while (likely(!rd_kafka_terminating(rk) ||
-		      rd_kafka_q_len(rk->rk_ops))) {
-                rd_ts_t sleeptime = rd_kafka_timers_next(
-                        &rk->rk_timers, 1000*1000/*1s*/, 1/*lock*/);
-                rd_kafka_q_serve(rk->rk_ops, (int)(sleeptime / 1000), 0,
-                                 RD_KAFKA_Q_CB_CALLBACK, NULL, NULL);
-		if (rk->rk_cgrp) /* FIXME: move to timer-triggered */
-			rd_kafka_cgrp_serve(rk->rk_cgrp);
-		rd_kafka_timers_run(&rk->rk_timers, RD_POLL_NOWAIT);
-	}
-
-	rd_kafka_q_disable(rk->rk_ops);
-	rd_kafka_q_purge(rk->rk_ops);
-
-        rd_kafka_timer_stop(&rk->rk_timers, &tmr_topic_scan, 1);
-        rd_kafka_timer_stop(&rk->rk_timers, &tmr_stats_emit, 1);
-        rd_kafka_timer_stop(&rk->rk_timers, &tmr_metadata_refresh, 1);
-
-        /* Synchronise state */
-        rd_kafka_wrlock(rk);
-        rd_kafka_wrunlock(rk);
-
-        rd_kafka_destroy_internal(rk);
-
-        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
-                     "Main background thread exiting");
-
-	rd_atomic32_sub(&rd_kafka_thread_cnt_curr, 1);
-
-	return 0;
-}
-
-
-static void rd_kafka_term_sig_handler (int sig) {
-	/* nop */
-}
-
-
-rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf,
-			  char *errstr, size_t errstr_size) {
-	rd_kafka_t *rk;
-	static rd_atomic32_t rkid;
-        rd_kafka_conf_t *conf;
-        rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;
-        int ret_errno = 0;
-#ifndef _MSC_VER
-        sigset_t newset, oldset;
-#endif
-
-	call_once(&rd_kafka_global_init_once, rd_kafka_global_init);
-
-        /* rd_kafka_new() takes ownership of the provided \p app_conf
-         * object if rd_kafka_new() succeeds.
-         * Since \p app_conf is optional we allocate a default configuration
-         * object here if \p app_conf is NULL.
-         * The configuration object itself is struct-copied later
-         * leaving the default *conf pointer to be ready for freeing.
-         * In case new() fails and app_conf was specified we will clear out
-         * rk_conf to avoid double-freeing from destroy_internal() and the
-         * user's eventual call to rd_kafka_conf_destroy().
-         * This is all a bit tricky but that's the nature of
-         * legacy interfaces. */
-        if (!app_conf)
-                conf = rd_kafka_conf_new();
-        else
-                conf = app_conf;
-
-        /* Verify mandatory configuration */
-        if (!conf->socket_cb) {
-                rd_snprintf(errstr, errstr_size,
-                            "Mandatory config property 'socket_cb' not set");
-                if (!app_conf)
-                        rd_kafka_conf_destroy(conf);
-                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
-                return NULL;
-        }
-
-        if (!conf->open_cb) {
-                rd_snprintf(errstr, errstr_size,
-                            "Mandatory config property 'open_cb' not set");
-                if (!app_conf)
-                        rd_kafka_conf_destroy(conf);
-                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
-                return NULL;
-        }
-
-        if (conf->metadata_max_age_ms == -1) {
-                if (conf->metadata_refresh_interval_ms > 0)
-                        conf->metadata_max_age_ms =
-                                conf->metadata_refresh_interval_ms * 3;
-                else /* use default value of refresh * 3 */
-                        conf->metadata_max_age_ms = 5*60*1000 * 3;
-        }
-
-	rd_kafka_global_cnt_incr();
-
-	/*
-	 * Set up the handle.
-	 */
-	rk = rd_calloc(1, sizeof(*rk));
-
-	rk->rk_type = type;
-
-        /* Struct-copy the config object. */
-	rk->rk_conf = *conf;
-        if (!app_conf)
-                rd_free(conf); /* Free the base config struct only,
-                                * not its fields since they were copied to
-                                * rk_conf just above. Those fields are
-                                * freed from rd_kafka_destroy_internal()
-                                * as the rk itself is destroyed. */
-
-        /* Call on_new() interceptors */
-        rd_kafka_interceptors_on_new(rk, &rk->rk_conf);
-
-	rwlock_init(&rk->rk_lock);
-        mtx_init(&rk->rk_internal_rkb_lock, mtx_plain);
-
-	cnd_init(&rk->rk_broker_state_change_cnd);
-	mtx_init(&rk->rk_broker_state_change_lock, mtx_plain);
-
-	rk->rk_rep = rd_kafka_q_new(rk);
-	rk->rk_ops = rd_kafka_q_new(rk);
-        rk->rk_ops->rkq_serve = rd_kafka_poll_cb;
-        rk->rk_ops->rkq_opaque = rk;
-
-        if (rk->rk_conf.log_queue) {
-                rk->rk_logq = rd_kafka_q_new(rk);
-                rk->rk_logq->rkq_serve = rd_kafka_poll_cb;
-                rk->rk_logq->rkq_opaque = rk;
-        }
-
-	TAILQ_INIT(&rk->rk_brokers);
-	TAILQ_INIT(&rk->rk_topics);
-        rd_kafka_timers_init(&rk->rk_timers, rk);
-        rd_kafka_metadata_cache_init(rk);
-
-	if (rk->rk_conf.dr_cb || rk->rk_conf.dr_msg_cb)
-		rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_DR;
-	if (rk->rk_conf.rebalance_cb)
-		rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_REBALANCE;
-	if (rk->rk_conf.offset_commit_cb)
-		rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_OFFSET_COMMIT;
-
-	/* Convenience Kafka protocol null bytes */
-	rk->rk_null_bytes = rd_kafkap_bytes_new(NULL, 0);
-
-	if (rk->rk_conf.debug)
-                rk->rk_conf.log_level = LOG_DEBUG;
-
-	rd_snprintf(rk->rk_name, sizeof(rk->rk_name), "%s#%s-%i",
-                    rk->rk_conf.client_id_str, rd_kafka_type2str(rk->rk_type),
-                    rd_atomic32_add(&rkid, 1));
-
-	/* Construct clientid kafka string */
-	rk->rk_client_id = rd_kafkap_str_new(rk->rk_conf.client_id_str,-1);
-
-        /* Convert group.id to kafka string (may be NULL) */
-        rk->rk_group_id = rd_kafkap_str_new(rk->rk_conf.group_id_str,-1);
-
-        /* Config fixups */
-        rk->rk_conf.queued_max_msg_bytes =
-                (int64_t)rk->rk_conf.queued_max_msg_kbytes * 1000ll;
-
-	/* Enable api.version.request=true if fallback.broker.version
-	 * indicates a supporting broker. */
-	if (rd_kafka_ApiVersion_is_queryable(rk->rk_conf.broker_version_fallback))
-		rk->rk_conf.api_version_request = 1;
-
-	if (rk->rk_type == RD_KAFKA_PRODUCER) {
-		mtx_init(&rk->rk_curr_msgs.lock, mtx_plain);
-		cnd_init(&rk->rk_curr_msgs.cnd);
-		rk->rk_curr_msgs.max_cnt =
-			rk->rk_conf.queue_buffering_max_msgs;
-                if ((unsigned long long)rk->rk_conf.queue_buffering_max_kbytes * 1024 >
-                    (unsigned long long)SIZE_MAX)
-                        rk->rk_curr_msgs.max_size = SIZE_MAX;
-                else
-                        rk->rk_curr_msgs.max_size =
-                        (size_t)rk->rk_conf.queue_buffering_max_kbytes * 1024;
-	}
-
-        if (rd_kafka_assignors_init(rk, errstr, errstr_size) == -1) {
-                ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
-                ret_errno = EINVAL;
-                goto fail;
-        }
-
-        if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL ||
-            rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_PLAINTEXT) {
-                if (rd_kafka_sasl_select_provider(rk,
-                                                  errstr, errstr_size) == -1) {
-                        ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
-                        ret_errno = EINVAL;
-                        goto fail;
-                }
-        }
-
-#if WITH_SSL
-	if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SSL ||
-	    rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL) {
-		/* Create SSL context */
-		if (rd_kafka_transport_ssl_ctx_init(rk, errstr,
-						    errstr_size) == -1) {
-                        ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
-                        ret_errno = EINVAL;
-                        goto fail;
-                }
-        }
-#endif
-
-	/* Client group, eligible both in consumer and producer mode. */
-        if (type == RD_KAFKA_CONSUMER &&
-	    RD_KAFKAP_STR_LEN(rk->rk_group_id) > 0)
-                rk->rk_cgrp = rd_kafka_cgrp_new(rk,
-                                                rk->rk_group_id,
-                                                rk->rk_client_id);
-
-
-
-#ifndef _MSC_VER
-        /* Block all signals in newly created thread.
-         * To avoid race condition we block all signals in the calling
-         * thread, which the new thread will inherit its sigmask from,
-         * and then restore the original sigmask of the calling thread when
-         * we're done creating the thread. */
-        sigemptyset(&oldset);
-        sigfillset(&newset);
-	if (rk->rk_conf.term_sig) {
-		struct sigaction sa_term = {
-			.sa_handler = rd_kafka_term_sig_handler
-		};
-		sigaction(rk->rk_conf.term_sig, &sa_term, NULL);
-	}
-        pthread_sigmask(SIG_SETMASK, &newset, &oldset);
-#endif
-
-	/* Lock handle here to synchronise state, i.e., hold off
-	 * the thread until we've finalized the handle. */
-	rd_kafka_wrlock(rk);
-
-	/* Create handler thread */
-	if ((thrd_create(&rk->rk_thread,
-			 rd_kafka_thread_main, rk)) != thrd_success) {
-                ret_err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
-                ret_errno = errno;
-		if (errstr)
-			rd_snprintf(errstr, errstr_size,
-				    "Failed to create thread: %s (%i)",
-				    rd_strerror(errno), errno);
-		rd_kafka_wrunlock(rk);
-#ifndef _MSC_VER
-                /* Restore sigmask of caller */
-                pthread_sigmask(SIG_SETMASK, &oldset, NULL);
-#endif
-                goto fail;
-        }
-
-        rd_kafka_wrunlock(rk);
-
-        rk->rk_eos.PID = -1;
-        rk->rk_eos.TransactionalId = rd_kafkap_str_new(NULL, 0);
-
-        mtx_lock(&rk->rk_internal_rkb_lock);
-	rk->rk_internal_rkb = rd_kafka_broker_add(rk, RD_KAFKA_INTERNAL,
-						  RD_KAFKA_PROTO_PLAINTEXT,
-						  "", 0, RD_KAFKA_NODEID_UA);
-        mtx_unlock(&rk->rk_internal_rkb_lock);
-
-	/* Add initial list of brokers from configuration */
-	if (rk->rk_conf.brokerlist) {
-		if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist) == 0)
-			rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
-					"No brokers configured");
-	}
-
-#ifndef _MSC_VER
-	/* Restore sigmask of caller */
-	pthread_sigmask(SIG_SETMASK, &oldset, NULL);
-#endif
-
-        /* Free user supplied conf's base pointer on success,
-         * but not the actual allocated fields since the struct
-         * will have been copied in its entirety above. */
-        if (app_conf)
-                rd_free(app_conf);
-	rd_kafka_set_last_error(0, 0);
-
-        rk->rk_initialized = 1;
-
-	return rk;
-
-fail:
-        /*
-         * Error out and clean up
-         */
-
-        /* If on_new() interceptors have been called we also need
-         * to allow interceptor clean-up by calling on_destroy() */
-        rd_kafka_interceptors_on_destroy(rk);
-
-        /* If rk_conf is a struct-copy of the application configuration
-         * we need to avoid rk_conf fields from being freed from
-         * rd_kafka_destroy_internal() since they belong to app_conf.
-         * However, there are some internal fields, such as interceptors,
-         * that belong to rk_conf and thus needs to be cleaned up.
-         * Legacy APIs, sigh.. */
-        if (app_conf) {
-                rd_kafka_assignors_term(rk);
-                rd_kafka_interceptors_destroy(&rk->rk_conf);
-                memset(&rk->rk_conf, 0, sizeof(rk->rk_conf));
-        }
-
-        rd_atomic32_add(&rk->rk_terminate, 1);
-        rd_kafka_destroy_internal(rk);
-        rd_kafka_destroy_final(rk);
-
-        rd_kafka_set_last_error(ret_err, ret_errno);
-
-        return NULL;
-}
-
-
-
-
-
-/**
- * Produce a single message.
- * Locality: any application thread
- */
-int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition,
-		      int msgflags,
-		      void *payload, size_t len,
-		      const void *key, size_t keylen,
-		      void *msg_opaque) {
-	return rd_kafka_msg_new(rd_kafka_topic_a2i(rkt), partition,
-				msgflags, payload, len,
-				key, keylen, msg_opaque);
-}
-
-
-/**
- * Counts usage of the legacy/simple consumer (rd_kafka_consume_start() with
- * friends) since it does not have an API for stopping the cgrp we will need to
- * sort that out automatically in the background when all consumption
- * has stopped.
- *
- * Returns 0 if a  High level consumer is already instantiated
- * which means a Simple consumer cannot co-operate with it, else 1.
- *
- * A rd_kafka_t handle can never migrate from simple to high-level, or
- * vice versa, so we dont need a ..consumer_del().
- */
-int rd_kafka_simple_consumer_add (rd_kafka_t *rk) {
-        if (rd_atomic32_get(&rk->rk_simple_cnt) < 0)
-                return 0;
-
-        return (int)rd_atomic32_add(&rk->rk_simple_cnt, 1);
-}
-
-
-
-
-/**
- * rktp fetch is split up in these parts:
- *   * application side:
- *   * broker side (handled by current leader broker thread for rktp):
- *          - the fetch state, initial offset, etc.
- *          - fetching messages, updating fetched offset, etc.
- *          - offset commits
- *
- * Communication between the two are:
- *    app side -> rdkafka main side: rktp_ops
- *    broker thread -> app side: rktp_fetchq
- *
- * There is no shared state between these threads, instead
- * state is communicated through the two op queues, and state synchronization
- * is performed by version barriers.
- *
- */
-
-static RD_UNUSED
-int rd_kafka_consume_start0 (rd_kafka_itopic_t *rkt, int32_t partition,
-				    int64_t offset, rd_kafka_q_t *rkq) {
-	shptr_rd_kafka_toppar_t *s_rktp;
-
-	if (partition < 0) {
-		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
-					ESRCH);
-		return -1;
-	}
-
-        if (!rd_kafka_simple_consumer_add(rkt->rkt_rk)) {
-		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
-                return -1;
-        }
-
-	rd_kafka_topic_wrlock(rkt);
-	s_rktp = rd_kafka_toppar_desired_add(rkt, partition);
-	rd_kafka_topic_wrunlock(rkt);
-
-        /* Verify offset */
-	if (offset == RD_KAFKA_OFFSET_BEGINNING ||
-	    offset == RD_KAFKA_OFFSET_END ||
-            offset <= RD_KAFKA_OFFSET_TAIL_BASE) {
-                /* logical offsets */
-
-	} else if (offset == RD_KAFKA_OFFSET_STORED) {
-		/* offset manager */
-
-                if (rkt->rkt_conf.offset_store_method ==
-                    RD_KAFKA_OFFSET_METHOD_BROKER &&
-                    RD_KAFKAP_STR_IS_NULL(rkt->rkt_rk->rk_group_id)) {
-                        /* Broker based offsets require a group id. */
-                        rd_kafka_toppar_destroy(s_rktp);
-			rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
-						EINVAL);
-                        return -1;
-                }
-
-	} else if (offset < 0) {
-		rd_kafka_toppar_destroy(s_rktp);
-		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
-					EINVAL);
-		return -1;
-
-        }
-
-        rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_s2i(s_rktp), offset,
-				       rkq, RD_KAFKA_NO_REPLYQ);
-
-        rd_kafka_toppar_destroy(s_rktp);
-
-	rd_kafka_set_last_error(0, 0);
-	return 0;
-}
-
-
-
-
-int rd_kafka_consume_start (rd_kafka_topic_t *app_rkt, int32_t partition,
-			    int64_t offset) {
-        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
-        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "START",
-                     "Start consuming partition %"PRId32,partition);
- 	return rd_kafka_consume_start0(rkt, partition, offset, NULL);
-}
-
-int rd_kafka_consume_start_queue (rd_kafka_topic_t *app_rkt, int32_t partition,
-				  int64_t offset, rd_kafka_queue_t *rkqu) {
-        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
-
- 	return rd_kafka_consume_start0(rkt, partition, offset, rkqu->rkqu_q);
-}
-
-
-
-
-static RD_UNUSED int rd_kafka_consume_stop0 (rd_kafka_toppar_t *rktp) {
-        rd_kafka_q_t *tmpq = NULL;
-        rd_kafka_resp_err_t err;
-
-        rd_kafka_topic_wrlock(rktp->rktp_rkt);
-        rd_kafka_toppar_lock(rktp);
-	rd_kafka_toppar_desired_del(rktp);
-        rd_kafka_toppar_unlock(rktp);
-	rd_kafka_topic_wrunlock(rktp->rktp_rkt);
-
-        tmpq = rd_kafka_q_new(rktp->rktp_rkt->rkt_rk);
-
-        rd_kafka_toppar_op_fetch_stop(rktp, RD_KAFKA_REPLYQ(tmpq, 0));
-
-        /* Synchronisation: Wait for stop reply from broker thread */
-        err = rd_kafka_q_wait_result(tmpq, RD_POLL_INFINITE);
-        rd_kafka_q_destroy(tmpq);
-
-	rd_kafka_set_last_error(err, err ? EINVAL : 0);
-
-	return err ? -1 : 0;
-}
-
-
-int rd_kafka_consume_stop (rd_kafka_topic_t *app_rkt, int32_t partition) {
-        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
-	shptr_rd_kafka_toppar_t *s_rktp;
-        int r;
-
-	if (partition == RD_KAFKA_PARTITION_UA) {
-		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
-		return -1;
-	}
-
-	rd_kafka_topic_wrlock(rkt);
-	if (!(s_rktp = rd_kafka_toppar_get(rkt, partition, 0)) &&
-	    !(s_rktp = rd_kafka_toppar_desired_get(rkt, partition))) {
-		rd_kafka_topic_wrunlock(rkt);
-		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
-					ESRCH);
-		return -1;
-	}
-        rd_kafka_topic_wrunlock(rkt);
-
-        r = rd_kafka_consume_stop0(rd_kafka_toppar_s2i(s_rktp));
-	/* set_last_error() called by stop0() */
-
-        rd_kafka_toppar_destroy(s_rktp);
-
-        return r;
-}
-
-
-
-rd_kafka_resp_err_t rd_kafka_seek (rd_kafka_topic_t *app_rkt,
-                                   int32_t partition,
-                                   int64_t offset,
-                                   int timeout_ms) {
-        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
-        shptr_rd_kafka_toppar_t *s_rktp;
-	rd_kafka_toppar_t *rktp;
-        rd_kafka_q_t *tmpq = NULL;
-        rd_kafka_resp_err_t err;
-
-        /* FIXME: simple consumer check */
-
-	if (partition == RD_KAFKA_PARTITION_UA)
-                return RD_KAFKA_RESP_ERR__INVALID_ARG;
-
-	rd_kafka_topic_rdlock(rkt);
-	if (!(s_rktp = rd_kafka_toppar_get(rkt, partition, 0)) &&
-	    !(s_rktp = rd_kafka_toppar_desired_get(rkt, partition))) {
-		rd_kafka_topic_rdunlock(rkt);
-                return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
-	}
-	rd_kafka_topic_rdunlock(rkt);
-
-        if (timeout_ms)
-                tmpq = rd_kafka_q_new(rkt->rkt_rk);
-
-        rktp = rd_kafka_toppar_s2i(s_rktp);
-        if ((err = rd_kafka_toppar_op_seek(rktp, offset,
-					   RD_KAFKA_REPLYQ(tmpq, 0)))) {
-                if (tmpq)
-                        rd_kafka_q_destroy(tmpq);
-                rd_kafka_toppar_destroy(s_rktp);
-                return err;
-        }
-
-	rd_kafka_toppar_destroy(s_rktp);
-
-        if (tmpq) {
-                err = rd_kafka_q_wait_result(tmpq, timeout_ms);
-                rd_kafka_q_destroy(tmpq);
-                return err;
-        }
-
-        return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-
-static ssize_t rd_kafka_consume_batch0 (rd_kafka_q_t *rkq,
-					int timeout_ms,
-					rd_kafka_message_t **rkmessages,
-					size_t rkmessages_size) {
-	/* Populate application's rkmessages array. */
-	return rd_kafka_q_serve_rkmessages(rkq, timeout_ms,
-					   rkmessages, rkmessages_size);
-}
-
-
-ssize_t rd_kafka_consume_batch (rd_kafka_topic_t *app_rkt, int32_t partition,
-				int timeout_ms,
-				rd_kafka_message_t **rkmessages,
-				size_t rkmessages_size) {
-        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
-	shptr_rd_kafka_toppar_t *s_rktp;
-        rd_kafka_toppar_t *rktp;
-	ssize_t cnt;
-
-	/* Get toppar */
-	rd_kafka_topic_rdlock(rkt);
-	s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
-	if (unlikely(!s_rktp))
-		s_rktp = rd_kafka_toppar_desired_get(rkt, partition);
-	rd_kafka_topic_rdunlock(rkt);
-
-	if (unlikely(!s_rktp)) {
-		/* No such toppar known */
-		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
-					ESRCH);
-		return -1;
-	}
-
-        rktp = rd_kafka_toppar_s2i(s_rktp);
-
-	/* Populate application's rkmessages array. */
-	cnt = rd_kafka_q_serve_rkmessages(rktp->rktp_fetchq, timeout_ms,
-					  rkmessages, rkmessages_size);
-
-	rd_kafka_toppar_destroy(s_rktp); /* refcnt from .._get() */
-
-	rd_kafka_set_last_error(0, 0);
-
-	return cnt;
-}
-
-ssize_t rd_kafka_consume_batch_queue (rd_kafka_queue_t *rkqu,
-				      int timeout_ms,
-				      rd_kafka_message_t **rkmessages,
-				      size_t rkmessages_size) {
-	/* Populate application's rkmessages array. */
-	return rd_kafka_consume_batch0(rkqu->rkqu_q, timeout_ms,
-				       rkmessages, rkmessages_size);
-}
-
-
-struct consume_ctx {
-	void (*consume_cb) (rd_kafka_message_t *rkmessage, void *opaque);
-	void *opaque;
-};
-
-
-/**
- * Trampoline for application's consume_cb()
- */
-static rd_kafka_op_res_t
-rd_kafka_consume_cb (rd_kafka_t *rk,
-                     rd_kafka_q_t *rkq,
-                     rd_kafka_op_t *rko,
-                     rd_kafka_q_cb_type_t cb_type, void *opaque) {
-	struct consume_ctx *ctx = opaque;
-	rd_kafka_message_t *rkmessage;
-
-        if (unlikely(rd_kafka_op_version_outdated(rko, 0))) {
-                rd_kafka_op_destroy(rko);
-                return RD_KAFKA_OP_RES_HANDLED;
-        }
-
-	rkmessage = rd_kafka_message_get(rko);
-
-	rd_kafka_op_offset_store(rk, rko, rkmessage);
-
-	ctx->consume_cb(rkmessage, ctx->opaque);
-
-        rd_kafka_op_destroy(rko);
-
-        return RD_KAFKA_OP_RES_HANDLED;
-}
-
-
-
-static rd_kafka_op_res_t
-rd_kafka_consume_callback0 (rd_kafka_q_t *rkq, int timeout_ms, int max_cnt,
-                            void (*consume_cb) (rd_kafka_message_t
-                                                *rkmessage,
-                                                void *opaque),
-                            void *opaque) {
-        struct consume_ctx ctx = { .consume_cb = consume_cb, .opaque = opaque };
-        return rd_kafka_q_serve(rkq, timeout_ms, max_cnt,
-                                RD_KAFKA_Q_CB_RETURN,
-                                rd_kafka_consume_cb, &ctx);
-
-}
-
-
-/**
- * Serve the fetch queue of \p app_rkt / \p partition, delivering messages
- * to \p consume_cb.
- *
- * The toppar is looked up under the topic read lock, first with
- * rd_kafka_toppar_get() and then with rd_kafka_toppar_desired_get().
- *
- * @returns -1 with last error RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION / ESRCH
- *          if neither lookup finds the partition; otherwise the value
- *          returned by rd_kafka_consume_callback0(), with the last error
- *          cleared.
- */
-int rd_kafka_consume_callback (rd_kafka_topic_t *app_rkt, int32_t partition,
-                               int timeout_ms,
-                               void (*consume_cb) (rd_kafka_message_t
-                                                   *rkmessage,
-                                                   void *opaque),
-                               void *opaque) {
-        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
-        shptr_rd_kafka_toppar_t *s_rktp;
-        rd_kafka_toppar_t *rktp;
-        int served;
-
-        /* Locate the toppar while holding the topic read lock. */
-        rd_kafka_topic_rdlock(rkt);
-        s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
-        if (unlikely(s_rktp == NULL))
-                s_rktp = rd_kafka_toppar_desired_get(rkt, partition);
-        rd_kafka_topic_rdunlock(rkt);
-
-        if (unlikely(s_rktp == NULL)) {
-                /* Partition is not known to this topic. */
-                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
-                                        ESRCH);
-                return -1;
-        }
-
-        rktp = rd_kafka_toppar_s2i(s_rktp);
-        served = rd_kafka_consume_callback0(
-                rktp->rktp_fetchq, timeout_ms,
-                rkt->rkt_conf.consume_callback_max_msgs,
-                consume_cb, opaque);
-
-        /* Drop the reference obtained by the lookup above. */
-        rd_kafka_toppar_destroy(s_rktp);
-
-        rd_kafka_set_last_error(0, 0);
-
-        return served;
-}
-
-
-
-/**
- * Queue-handle variant of the callback consumer: serves rkqu's rkqu_q via
- * rd_kafka_consume_callback0() and returns that call's result.
- *
- * max_cnt is passed as 0 (presumably "no limit" - confirm against
- * rd_kafka_q_serve()).
- */
-int rd_kafka_consume_callback_queue (rd_kafka_queue_t *rkqu,
-                                     int timeout_ms,
-                                     void (*consume_cb) (rd_kafka_message_t
-                                                         *rkmessage,
-                                                         void *opaque),
-                                     void *opaque) {
-        rd_kafka_q_t *srcq = rkqu->rkqu_q;
-
-        return rd_kafka_consume_callback0(srcq, timeout_ms, 0,
-                                          consume_cb, opaque);
-}
-
-
-/**
- * Serve queue 'rkq' until one message can be returned to the caller.
- *
- * Each popped op is first run through rd_kafka_poll_cb(), so registered
- * callbacks for matching events are invoked along the way; ops handled
- * that way produce no return value and polling continues.
- *
- * Returns the message for the first op that rd_kafka_poll_cb() passes
- * (last error cleared), or NULL with the last error set to
- * RD_KAFKA_RESP_ERR__TIMED_OUT / ETIMEDOUT when no op arrives in time, or
- * RD_KAFKA_RESP_ERR__INTR / EINTR when a callback yielded.
- */
-static rd_kafka_message_t *rd_kafka_consume0 (rd_kafka_t *rk,
-                                              rd_kafka_q_t *rkq,
-                                              int timeout_ms) {
-        rd_ts_t deadline = rd_timeout_init(timeout_ms);
-        rd_kafka_message_t *rkmessage;
-        rd_kafka_op_t *rko;
-
-        rd_kafka_yield_thread = 0;
-
-        for (;;) {
-                rd_kafka_op_res_t res;
-
-                rko = rd_kafka_q_pop(rkq, rd_timeout_remains(deadline), 0);
-                if (!rko) {
-                        /* Deadline hit before any op could be returned. */
-                        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
-                                                ETIMEDOUT);
-                        return NULL;
-                }
-
-                res = rd_kafka_poll_cb(rk, rkq, rko,
-                                       RD_KAFKA_Q_CB_RETURN, NULL);
-
-                /* Not consumed by a callback: hand it to the caller. */
-                if (res == RD_KAFKA_OP_RES_PASS)
-                        break;
-
-                if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
-                             rd_kafka_yield_thread)) {
-                        /* A callback called rd_kafka_yield(): stop
-                         * dispatching the queue and return. */
-                        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INTR,
-                                                EINTR);
-                        return NULL;
-                }
-
-                /* Op was handled by a callback, keep polling. */
-        }
-
-        rd_kafka_assert(rk,
-                        rko->rko_type == RD_KAFKA_OP_FETCH ||
-                        rko->rko_type == RD_KAFKA_OP_CONSUMER_ERR);
-
-        /* Convert the op to a message and store its offset. */
-        rkmessage = rd_kafka_message_get(rko);
-        rd_kafka_op_offset_store(rk, rko, rkmessage);
-
-        rd_kafka_set_last_error(0, 0);
-
-        return rkmessage;
-}
-
-/**
- * Consume a single message from \p app_rkt / \p partition.
- *
- * The toppar is looked up under the topic read lock (regular lookup first,
- * then the desired-partition lookup). If it is unknown, NULL is returned
- * with the last error set to RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION / ESRCH.
- * Otherwise the result of rd_kafka_consume0() on the toppar's fetch queue
- * is returned.
- */
-rd_kafka_message_t *rd_kafka_consume (rd_kafka_topic_t *app_rkt,
-                                      int32_t partition,
-                                      int timeout_ms) {
-        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
-        shptr_rd_kafka_toppar_t *s_rktp;
-        rd_kafka_toppar_t *rktp;
-        rd_kafka_message_t *msg;
-
-        rd_kafka_topic_rdlock(rkt);
-        s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
-        if (unlikely(s_rktp == NULL))
-                s_rktp = rd_kafka_toppar_desired_get(rkt, partition);
-        rd_kafka_topic_rdunlock(rkt);
-
-        if (unlikely(s_rktp == NULL)) {
-                /* Partition is not known to this topic. */
-                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
-                                        ESRCH);
-                return NULL;
-        }
-
-        rktp = rd_kafka_toppar_s2i(s_rktp);
-        msg = rd_kafka_consume0(rkt->rkt_rk, rktp->rktp_fetchq, timeout_ms);
-
-        /* Drop the reference obtained by the lookup above. */
-        rd_kafka_toppar_destroy(s_rktp);
-
-        return msg;
-}
-
-
-/**
- * Consume a single message from the application queue handle \p rkqu by
- * delegating to rd_kafka_consume0() with the handle's client instance and
- * underlying queue.
- */
-rd_kafka_message_t *rd_kafka_consume_queue (rd_kafka_queue_t *rkqu,
-                                            int timeout_ms) {
-        rd_kafka_t *rk = rkqu->rkqu_rk;
-        rd_kafka_q_t *srcq = rkqu->rkqu_q;
-
-        return rd_kafka_consume0(rk, srcq, timeout_ms);
-}
-
-
-
-
-/**
- * Forward the instance's rk_rep queue to the consumer group queue.
- *
- * @returns RD_KAFKA_RESP_ERR__UNKNOWN_GROUP if rd_kafka_cgrp_get() yields
- *          no consumer group, else RD_KAFKA_RESP_ERR_NO_ERROR.
- */
-rd_kafka_resp_err_t rd_kafka_poll_set_consumer (rd_kafka_t *rk) {
-        rd_kafka_cgrp_t *rkcg = rd_kafka_cgrp_get(rk);
-
-        if (!rkcg)
-                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
-
-        rd_kafka_q_fwd_set(rk->rk_rep, rkcg->rkcg_q);
-
-        return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-
-
-/**
- * Poll the consumer group queue for one message.
- *
- * If the instance has no consumer group, a newly created message whose
- * err is RD_KAFKA_RESP_ERR__UNKNOWN_GROUP is returned instead of NULL.
- * Otherwise the result of rd_kafka_consume0() on the group queue is
- * returned.
- */
-rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk,
-                                            int timeout_ms) {
-        rd_kafka_cgrp_t *rkcg = rd_kafka_cgrp_get(rk);
-        rd_kafka_message_t *errmsg;
-
-        if (unlikely(!rkcg)) {
-                errmsg = rd_kafka_message_new();
-                errmsg->err = RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
-                return errmsg;
-        }
-
-        return rd_kafka_consume0(rk, rkcg->rkcg_q, timeout_ms);
-}
-
-
-/**
- * Terminate the consumer group and serve its ops until the terminate
- * reply arrives.
- *
- * The group queue is forwarded to a temporary queue for the duration of
- * the call so that every posted op (e.g., rebalance callbacks) is served
- * here; the forwarding is removed again before returning.
- *
- * @returns RD_KAFKA_RESP_ERR__UNKNOWN_GROUP if there is no consumer group,
- *          the error carried by the TERMINATE op once it is received, or
- *          RD_KAFKA_RESP_ERR__TIMED_OUT if the pop returns no op first.
- */
-rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk) {
-        rd_kafka_cgrp_t *rkcg = rd_kafka_cgrp_get(rk);
-        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__TIMED_OUT;
-        rd_kafka_q_t *tmpq;
-        rd_kafka_op_t *rko;
-
-        if (!rkcg)
-                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
-
-        /* Route the cgrp queue to our temporary queue so that all posted
-         * ops are served by this function. */
-        tmpq = rd_kafka_q_new(rk);
-        rd_kafka_q_fwd_set(rkcg->rkcg_q, tmpq);
-
-        rd_kafka_cgrp_terminate(rkcg, RD_KAFKA_REPLYQ(tmpq, 0)); /* async */
-
-        for (;;) {
-                rko = rd_kafka_q_pop(tmpq, RD_POLL_INFINITE, 0);
-                if (!rko)
-                        break;
-
-                if ((rko->rko_type & ~RD_KAFKA_OP_FLAGMASK) ==
-                    RD_KAFKA_OP_TERMINATE) {
-                        /* Termination reply: pick up its result and stop. */
-                        err = rko->rko_err;
-                        rd_kafka_op_destroy(rko);
-                        break;
-                }
-
-                /* Ops the poll callback passes on are dropped here.
-                 * YIELD is ignored, we need to finish. */
-                if (rd_kafka_poll_cb(rk, tmpq, rko,
-                                     RD_KAFKA_Q_CB_RETURN, NULL) ==
-                    RD_KAFKA_OP_RES_PASS)
-                        rd_kafka_op_destroy(rko);
-        }
-
-        rd_kafka_q_destroy(tmpq);
-
-        rd_kafka_q_fwd_set(rkcg->rkcg_q, NULL);
-
-        return err;
-}
-
-
-
-/**
- * Fetch committed offsets for \p partitions from the consumer group.
- *
- * All offsets are first reset to RD_KAFKA_OFFSET_INVALID. An OFFSET_FETCH
- * op carrying a copy of the list is enqueued on the cgrp ops queue and the
- * reply is awaited on a temporary queue; on success the reply's partitions
- * are merged back into \p partitions. The request is re-issued for as long
- * as the outcome is RD_KAFKA_RESP_ERR__WAIT_COORD or
- * RD_KAFKA_RESP_ERR__TRANSPORT.
- *
- * @returns RD_KAFKA_RESP_ERR__INVALID_ARG if \p partitions is NULL,
- *          RD_KAFKA_RESP_ERR__UNKNOWN_GROUP without a consumer group,
- *          RD_KAFKA_RESP_ERR__DESTROY if the op could not be enqueued,
- *          RD_KAFKA_RESP_ERR__TIMED_OUT if no reply (or broker state
- *          change) arrived in time, else the reply's error code.
- */
-rd_kafka_resp_err_t
-rd_kafka_committed (rd_kafka_t *rk,
-                    rd_kafka_topic_partition_list_t *partitions,
-                    int timeout_ms) {
-        rd_ts_t deadline = rd_timeout_init(timeout_ms);
-        rd_kafka_resp_err_t err;
-        rd_kafka_cgrp_t *rkcg;
-        rd_kafka_q_t *replyq;
-
-        if (!partitions)
-                return RD_KAFKA_RESP_ERR__INVALID_ARG;
-
-        rkcg = rd_kafka_cgrp_get(rk);
-        if (!rkcg)
-                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
-
-        /* Start out with every offset marked invalid. */
-        rd_kafka_topic_partition_list_reset_offsets(partitions,
-                                                    RD_KAFKA_OFFSET_INVALID);
-
-        replyq = rd_kafka_q_new(rk);
-
-        for (;;) {
-                int state_version = rd_kafka_brokers_get_state_version(rk);
-                rd_kafka_op_t *req = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH);
-                rd_kafka_op_t *reply;
-
-                rd_kafka_op_set_replyq(req, replyq, NULL);
-
-                /* Issue #827
-                 * The op gets its own copy of the list: if we time out
-                 * and the app frees its list, the cgrp may still process
-                 * the op later without a use-after-free. */
-                req->rko_u.offset_fetch.partitions =
-                        rd_kafka_topic_partition_list_copy(partitions);
-                req->rko_u.offset_fetch.do_free = 1;
-
-                if (!rd_kafka_q_enq(rkcg->rkcg_ops, req)) {
-                        err = RD_KAFKA_RESP_ERR__DESTROY;
-                        break;
-                }
-
-                reply = rd_kafka_q_pop(replyq,
-                                       rd_timeout_remains(deadline), 0);
-                if (!reply) {
-                        err = RD_KAFKA_RESP_ERR__TIMED_OUT;
-                        break;
-                }
-
-                err = reply->rko_err;
-                if (!err) {
-                        rd_kafka_topic_partition_list_update(
-                                partitions,
-                                reply->rko_u.offset_fetch.partitions);
-                } else if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD ||
-                            err == RD_KAFKA_RESP_ERR__TRANSPORT) &&
-                           !rd_kafka_brokers_wait_state_change(
-                                   rk, state_version,
-                                   rd_timeout_remains(deadline))) {
-                        /* Retriable error, but no broker state change
-                         * was seen before the deadline. */
-                        err = RD_KAFKA_RESP_ERR__TIMED_OUT;
-                }
-
-                rd_kafka_op_destroy(reply);
-
-                /* Only transport / coordinator-wait errors are retried. */
-                if (err != RD_KAFKA_RESP_ERR__TRANSPORT &&
-                    err != RD_KAFKA_RESP_ERR__WAIT_COORD)
-                        break;
-        }
-
-        rd_kafka_q_destroy(replyq);
-
-        return err;
-}
-
-
-
-/**
- * Fill in the application's current position for each entry of
- * \p partitions.
- *
- * Every entry's offset is first reset to RD_KAFKA_OFFSET_INVALID. For each
- * partition that rd_kafka_toppar_get2() finds, .offset is set to the
- * toppar's rktp_app_offset (read under the toppar lock) and .err is
- * cleared; otherwise .err is set to RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION.
- *
- * @returns RD_KAFKA_RESP_ERR__INVALID_ARG if \p partitions is NULL (same
- *          convention as rd_kafka_committed()), else
- *          RD_KAFKA_RESP_ERR_NO_ERROR; per-partition failures are only
- *          reported through each element's .err field.
- */
-rd_kafka_resp_err_t
-rd_kafka_position (rd_kafka_t *rk,
-                   rd_kafka_topic_partition_list_t *partitions) {
-        int i;
-
-        /* Reject a NULL list up front instead of dereferencing it below. */
-        if (!partitions)
-                return RD_KAFKA_RESP_ERR__INVALID_ARG;
-
-        /* Set default offsets. */
-        rd_kafka_topic_partition_list_reset_offsets(partitions,
-                                                    RD_KAFKA_OFFSET_INVALID);
-
-        for (i = 0 ; i < partitions->cnt ; i++) {
-                rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];
-                shptr_rd_kafka_toppar_t *s_rktp;
-                rd_kafka_toppar_t *rktp;
-
-                if (!(s_rktp = rd_kafka_toppar_get2(rk, rktpar->topic,
-                                                    rktpar->partition, 0, 1))) {
-                        rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
-                        rktpar->offset = RD_KAFKA_OFFSET_INVALID;
-                        continue;
-                }
-
-                rktp = rd_kafka_toppar_s2i(s_rktp);
-                rd_kafka_toppar_lock(rktp);
-                rktpar->offset = rktp->rktp_app_offset;
-                rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
-                rd_kafka_toppar_unlock(rktp);
-                rd_kafka_toppar_destroy(s_rktp); /* refcnt from get2() */
-        }
-
-        return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-
-struct _query_wmark_offsets_state {
-	rd_kafka_resp_err_t err; /* final result, written by the response callback on error or once both offsets are in */
-	const char *topic; /* topic name looked up in each response */
-	int32_t partition; /* partition looked up in each response */
-	int64_t offsets[2]; /* one slot per response, filled at index offidx */
-	int     offidx;  /* next offset to set from response */
-	rd_ts_t ts_end; /* passed to rd_timeout_remains() to bound the wait for a broker state change */
-	int     state_version;  /* Broker state version */
-};
-
-/**
- * Response handler for the two OffsetRequests issued by
- * rd_kafka_query_watermark_offsets(); \p opaque is the caller's
- * struct _query_wmark_offsets_state.
- *
- * The response is parsed into a temporary one-element partition list.
- * While a retry is in progress nothing is recorded. Otherwise the
- * partition's offset is stored in the next free slot of state->offsets,
- * and state->err is set on error or once both slots have been filled.
- */
-static void rd_kafka_query_wmark_offsets_resp_cb (rd_kafka_t *rk,
-                                                  rd_kafka_broker_t *rkb,
-                                                  rd_kafka_resp_err_t err,
-                                                  rd_kafka_buf_t *rkbuf,
-                                                  rd_kafka_buf_t *request,
-                                                  void *opaque) {
-        struct _query_wmark_offsets_state *state = opaque;
-        rd_kafka_topic_partition_list_t *parsed =
-                rd_kafka_topic_partition_list_new(1);
-        rd_kafka_topic_partition_t *rktpar;
-
-        err = rd_kafka_handle_Offset(rk, rkb, err, rkbuf, request, parsed);
-        if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
-                goto done; /* Retrying */
-
-        /* Retry if no broker connection is available yet. */
-        if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD ||
-             err == RD_KAFKA_RESP_ERR__TRANSPORT) &&
-            rkb &&
-            rd_kafka_brokers_wait_state_change(
-                    rkb->rkb_rk, state->state_version,
-                    rd_timeout_remains(state->ts_end))) {
-                state->state_version = rd_kafka_brokers_get_state_version(rk);
-                request->rkbuf_retries = 0;
-                if (rd_kafka_buf_retry(rkb, request))
-                        goto done; /* Retry in progress */
-                /* Retry refused: fall through and record the outcome. */
-        }
-
-        rktpar = rd_kafka_topic_partition_list_find(parsed,
-                                                    state->topic,
-                                                    state->partition);
-        if (!rktpar) {
-                /* Partition not seen in response. */
-                err = RD_KAFKA_RESP_ERR__BAD_MSG;
-        } else if (rktpar->err) {
-                err = rktpar->err;
-        } else {
-                state->offsets[state->offidx] = rktpar->offset;
-        }
-
-        state->offidx++;
-
-        if (err || state->offidx == 2) /* Error or Done */
-                state->err = err;
-
-done:
-        /* The temporary list is released on every path. */
-        rd_kafka_topic_partition_list_destroy(parsed);
-}
-
-
-/**
- * Query the partition leader for the low and high watermark offsets of
- * \p topic / \p partition.
- *
- * The leader is resolved first; then one OffsetRequest is sent for
- * RD_KAFKA_OFFSET_BEGINNING and one for RD_KAFKA_OFFSET_END, and a
- * temporary queue is served until the response callback records a result
- * (or the serve call reports a yield).
- *
- * @returns the leader-query error, the error recorded by the response
- *          callback, RD_KAFKA_RESP_ERR__FAIL if fewer than two responses
- *          were recorded, else RD_KAFKA_RESP_ERR_NO_ERROR with \p low and
- *          \p high set. \p low and \p high are only written on success.
- */
-rd_kafka_resp_err_t
-rd_kafka_query_watermark_offsets (rd_kafka_t *rk, const char *topic,
-                                  int32_t partition,
-                                  int64_t *low, int64_t *high, int timeout_ms) {
-        /* Due to KAFKA-1588 we need to send a request for each wanted
-         * offset, in this case one for the low watermark and one for the
-         * high. */
-        const int64_t wanted[2] = { RD_KAFKA_OFFSET_BEGINNING,
-                                    RD_KAFKA_OFFSET_END };
-        rd_ts_t deadline = rd_timeout_init(timeout_ms);
-        struct _query_wmark_offsets_state state;
-        rd_kafka_topic_partition_list_t *partitions;
-        rd_kafka_topic_partition_t *rktpar;
-        struct rd_kafka_partition_leader *leader;
-        rd_kafka_resp_err_t err;
-        rd_list_t leaders;
-        rd_kafka_q_t *rkq;
-        int64_t lo, hi;
-        int i;
-
-        partitions = rd_kafka_topic_partition_list_new(1);
-        rktpar = rd_kafka_topic_partition_list_add(partitions,
-                                                   topic, partition);
-
-        rd_list_init(&leaders, partitions->cnt,
-                     (void *)rd_kafka_partition_leader_destroy);
-
-        err = rd_kafka_topic_partition_list_query_leaders(rk, partitions,
-                                                          &leaders, timeout_ms);
-        if (err) {
-                rd_list_destroy(&leaders);
-                rd_kafka_topic_partition_list_destroy(partitions);
-                return err;
-        }
-
-        leader = rd_list_elem(&leaders, 0);
-
-        rkq = rd_kafka_q_new(rk);
-
-        state.topic = topic;
-        state.partition = partition;
-        state.offsets[0] = wanted[0];
-        state.offsets[1] = wanted[1];
-        state.offidx = 0;
-        state.err = RD_KAFKA_RESP_ERR__IN_PROGRESS;
-        state.ts_end = deadline;
-        state.state_version = rd_kafka_brokers_get_state_version(rk);
-
-        /* Same partition list for both requests, only the queried
-         * logical offset differs. */
-        for (i = 0 ; i < 2 ; i++) {
-                rktpar->offset = wanted[i];
-                rd_kafka_OffsetRequest(leader->rkb, partitions, 0,
-                                       RD_KAFKA_REPLYQ(rkq, 0),
-                                       rd_kafka_query_wmark_offsets_resp_cb,
-                                       &state);
-        }
-
-        rd_kafka_topic_partition_list_destroy(partitions);
-        rd_list_destroy(&leaders);
-
-        /* Wait for reply (or timeout) */
-        while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
-                if (rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK,
-                                     rd_kafka_poll_cb, NULL) ==
-                    RD_KAFKA_OP_RES_YIELD)
-                        break;
-        }
-
-        rd_kafka_q_destroy(rkq);
-
-        if (state.err)
-                return state.err;
-        if (state.offidx != 2)
-                return RD_KAFKA_RESP_ERR__FAIL;
-
-        /* We are not certain about the returned order. */
-        if (state.offsets[0] < state.offsets[1]) {
-                lo = state.offsets[0];
-                hi = state.offsets[1];
-        } else {
-                lo = state.offsets[1];
-                hi = state.offsets[0];
-        }
-
-        /* If partition is empty only one offset (the last) will be
-         * returned. */
-        if (lo < 0 && hi >= 0)
-                lo = hi;
-
-        *low = lo;
-        *high = hi;
-
-        return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-/**
- * Read the locally stored low/high offsets of \p topic / \p partition.
- *
- * The toppar's rktp_lo_offset and rktp_hi_offset are copied to \p low and
- * \p high while holding the toppar lock.
- *
- * @returns RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION if rd_kafka_toppar_get2()
- *          does not find the partition (outputs untouched), else
- *          RD_KAFKA_RESP_ERR_NO_ERROR.
- */
-rd_kafka_resp_err_t
-rd_kafka_get_watermark_offsets (rd_kafka_t *rk, const char *topic,
-                                int32_t partition,
-                                int64_t *low, int64_t *high) {
-        shptr_rd_kafka_toppar_t *s_rktp =
-                rd_kafka_toppar_get2(rk, topic, partition, 0, 1);
-        rd_kafka_toppar_t *rktp;
-
-        if (s_rktp == NULL)
-                return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
-
-        rktp = rd_kafka_toppar_s2i(s_rktp);
-
-        /* Read both offsets under the same lock hold. */
-        rd_kafka_toppar_lock(rktp);
-        *low = rktp->rktp_lo_offset;
-        *high = rktp->rktp_hi_offset;
-        rd_kafka_toppar_unlock(rktp);
-
-        /* Drop the reference obtained by the lookup above. */
-        rd_kafka_toppar_destroy(s_rktp);
-
-        return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-/**
- * @brief get_offsets_for_times() state
- */
-struct _get_offsets_for_times {
-        rd_kafka_topic_partition_list_t *results; /* list each response is parsed into by rd_kafka_handle_Offset() */
-        rd_kafka_resp_err_t err; /* first error reported by any response, 0 if none */
-        int wait_reply; /* requests sent that have not yet produced a final response */
-        int state_version; /* broker state version compared by rd_kafka_brokers_wait_state_change() */
-        rd_ts_t ts_end; /* passed to rd_timeout_remains() to bound the wait for a broker state change */
-};
-
-/**
- * @brief Handle OffsetRequest responses for rd_kafka_offsets_for_times().
- *
- * \p opaque is the caller's struct _get_offsets_for_times. The response is
- * parsed into state->results. While a retry is in progress nothing is
- * recorded; otherwise the first error seen is kept in state->err and the
- * outstanding-reply counter is decremented.
- */
-static void rd_kafka_get_offsets_for_times_resp_cb (rd_kafka_t *rk,
-                                                  rd_kafka_broker_t *rkb,
-                                                  rd_kafka_resp_err_t err,
-                                                  rd_kafka_buf_t *rkbuf,
-                                                  rd_kafka_buf_t *request,
-                                                  void *opaque) {
-        struct _get_offsets_for_times *state = opaque;
-        int retriable;
-
-        err = rd_kafka_handle_Offset(rk, rkb, err, rkbuf, request,
-                                     state->results);
-        if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
-                return; /* Retrying */
-
-        retriable = (err == RD_KAFKA_RESP_ERR__WAIT_COORD ||
-                     err == RD_KAFKA_RESP_ERR__TRANSPORT);
-
-        /* Retry if no broker connection is available yet. */
-        if (retriable && rkb &&
-            rd_kafka_brokers_wait_state_change(
-                    rkb->rkb_rk, state->state_version,
-                    rd_timeout_remains(state->ts_end))) {
-                state->state_version = rd_kafka_brokers_get_state_version(rk);
-                request->rkbuf_retries = 0;
-                if (rd_kafka_buf_retry(rkb, request))
-                        return; /* Retry in progress */
-                /* Retry refused: fall through and record the outcome. */
-        }
-
-        /* Only the first error is kept. */
-        if (err && !state->err)
-                state->err = err;
-
-        state->wait_reply--;
-}
-
-
-/**
- * Look up offsets for the partitions in \p offsets.
- *
- * The partition leaders are queried first. One OffsetRequest is then sent
- * per leader for that leader's partitions, and a temporary queue is served
- * until every request has produced a final response or the deadline
- * derived from \p timeout_ms passes. Responses are collected in a
- * temporary list which, on success only, is merged back into \p offsets.
- *
- * @returns RD_KAFKA_RESP_ERR__INVALID_ARG if \p offsets is NULL or empty,
- *          the leader-query error if that step fails,
- *          the first error reported by a response,
- *          RD_KAFKA_RESP_ERR__TIMED_OUT if replies are still outstanding
- *          when the wait loop ends,
- *          else RD_KAFKA_RESP_ERR_NO_ERROR.
- */
-rd_kafka_resp_err_t
-rd_kafka_offsets_for_times (rd_kafka_t *rk,
-                            rd_kafka_topic_partition_list_t *offsets,
-                            int timeout_ms) {
-        rd_kafka_q_t *rkq;
-        struct _get_offsets_for_times state = RD_ZERO_INIT;
-        rd_ts_t ts_end = rd_timeout_init(timeout_ms);
-        rd_list_t leaders;
-        int i;
-        rd_kafka_resp_err_t err;
-        struct rd_kafka_partition_leader *leader;
-
-        /* Reject a NULL list instead of dereferencing it. */
-        if (!offsets || offsets->cnt == 0)
-                return RD_KAFKA_RESP_ERR__INVALID_ARG;
-
-        rd_list_init(&leaders, offsets->cnt,
-                     (void *)rd_kafka_partition_leader_destroy);
-
-        err = rd_kafka_topic_partition_list_query_leaders(rk, offsets, &leaders,
-                                                          timeout_ms);
-        if (err) {
-                rd_list_destroy(&leaders);
-                return err;
-        }
-
-
-        rkq = rd_kafka_q_new(rk);
-
-        state.wait_reply = 0;
-        state.results = rd_kafka_topic_partition_list_new(offsets->cnt);
-        /* The response callback reads these two fields when deciding
-         * whether to wait for a broker state change and retry; set them
-         * the same way rd_kafka_query_watermark_offsets() does instead of
-         * leaving them zeroed. */
-        state.ts_end = ts_end;
-        state.state_version = rd_kafka_brokers_get_state_version(rk);
-
-        /* For each leader send a request for its partitions */
-        RD_LIST_FOREACH(leader, &leaders, i) {
-                state.wait_reply++;
-                rd_kafka_OffsetRequest(leader->rkb, leader->partitions, 1,
-                                       RD_KAFKA_REPLYQ(rkq, 0),
-                                       rd_kafka_get_offsets_for_times_resp_cb,
-                                       &state);
-        }
-
-        rd_list_destroy(&leaders);
-
-        /* Wait for reply (or timeout).
-         * NOTE(review): the "> 0" test presumably also ends the loop at
-         * once for an infinite timeout if rd_timeout_remains() reports
-         * that as a negative value - confirm against rd_timeout_remains(). */
-        while (state.wait_reply > 0 && rd_timeout_remains(ts_end) > 0)
-                rd_kafka_q_serve(rkq, rd_timeout_remains(ts_end),
-                                 0, RD_KAFKA_Q_CB_CALLBACK,
-                                 rd_kafka_poll_cb, NULL);
-
-        rd_kafka_q_destroy(rkq);
-
-        /* Replies still outstanding here means we gave up waiting: report
-         * that rather than returning success with no results. */
-        if (state.wait_reply > 0 && !state.err)
-                state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;
-
-        /* Then update the queried partitions. */
-        if (!state.err)
-                rd_kafka_topic_partition_list_update(offsets, state.results);
-
-        rd_kafka_topic_partition_list_destroy(state.results);
-
-        return state.err;
-}
-
-
-/**
- * rd_kafka_poll() (and similar) op callback handler.
- * Will either call registered callback depending on cb_type and op type
- * or return op to application, if applicable (e.g., fetch message).
- *
- * Returns 1 if op was handled, else 0.
- *
- * Locality: application thread
- */
-rd_kafka_op_res_t
-rd_kafka_poll_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
-                  rd_kafka_q_cb_type_t cb_type, void *opaque) {
-	rd_kafka_msg_t *rkm;
-
-	/* Return-as-event requested, see if op can be converted to event,
-	 * otherwise fall through and trigger callbacks. */
-	if (cb_type == RD_KAFKA_Q_CB_EVENT && rd_kafka_event_setup(rk, rko))
-		return 0; /* Return as event */
-
-        switch ((int)rko->rko_type)
-        {
-        case RD_KAFKA_OP_FETCH:
-                if (!rk->rk_conf.consume_cb ||
-                    cb_type == RD_KAFKA_Q_CB_RETURN ||
-                    cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
-                        return RD_KAFKA_OP_RES_PASS; /* Dont handle here */
-                else {
-                        struct consume_ctx ctx = {
-                                .consume_cb = rk->rk_conf.consume_cb,
-                                .opaque = rk->rk_conf.opaque };
-
-                        return rd_kafka_consume_cb(rk, rkq, rko, cb_type, &ctx);
-                }
-                break;
-
-        case RD_KAFKA_OP_REBALANCE:
-                /* If EVENT_REBALANCE is enabled but rebalance_cb isnt
-                 * we need to perform a dummy assign for the application.
-                 * This might happen during termination with consumer_close() */
-                if (rk->rk_conf.rebalance_cb)
-                        rk->rk_conf.rebalance_cb(
-                                rk, rko->rko_err,
-                                rko->rko_u.rebalance.partitions,
-                                rk->rk_conf.opaque);
-                else {
-                        rd_kafka_dbg(rk, CGRP, "UNASSIGN",
-                                     "Forcing unassign of %d partition(s)",
-                                     rko->rko_u.rebalance.partitions ?
-                                     rko->rko_u.rebalance.partitions->cnt : 0);
-                        rd_kafka_assign(rk, NULL);
-                }
-                break;
-
-        case RD_KAFKA_OP_OFFSET_COMMIT | RD_KAFKA_OP_REPLY:
-		if (!rko->rko_u.offset_commit.cb)
-			return RD_KAFKA_OP_RES_PASS; /* Dont handle here */
-		rko->rko_u.offset_commit.cb(
-                        rk, rko->rko_err,
-			rko->rko_u.offset_commit.partitions,
-			rko->rko_u.offset_commit.opaque);
-                break;
-
-        case RD_KAFKA_OP_CONSUMER_ERR:
-                /* rd_kafka_consumer_poll() (_Q_CB_CONSUMER):
-                 *   Consumer errors are returned to the application
-                 *   as rkmessages, not error callbacks.
-                 *
-                 * rd_kafka_poll() (_Q_CB_GLOBAL):
-                 *   convert to ERR op (fallthru)
-                 */
-                if (cb_type == RD_KAFKA_Q_CB_RETURN ||
-                    cb_type == RD_KAFKA_Q_CB_FORCE_RETURN) {
-                        /* return as message_t to application */
-                        return RD_KAFKA_OP_RES_PASS;
-                }
-		/* FALLTHRU */
-
-	case RD_KAFKA_OP_ERR:
-		if (rk->rk_conf.error_cb)
-			rk->rk_conf.error_cb(rk, rko->rko_err,
-					     rko->rko_u.err.errstr,
-                                             rk->rk_conf.opaque);
-		else
-			rd_kafka_log(rk, LOG_ERR, "ERROR",
-				     "%s: %s: %s",
-				     rk->rk_name,
-				     rd_kafka_err2str(rko->rko_err),
-				     rko->rko_u.err.errstr);
-		break;
-
-	case RD_KAFKA_OP_DR:
-		/* Delivery report:
-		 * call application DR callback for each message. */
-		while ((rkm = TAILQ_FIRST(&rko->rko_u.dr.msgq.rkmq_msgs))) {
-                        rd_kafka_message_t *rkmessage;
-
-			TAILQ_REMOVE(&rko->rko_u.dr.msgq.rkmq_msgs,
-				     rkm, rkm_link);
-
-                        rkmessage = rd_kafka_message_get_from_rkm(rko, rkm);
-
-                        if (rk->rk_conf.dr_msg_cb) {
-                                rk->rk_conf.dr_msg_cb(rk, rkmessage,
-                                                      rk->rk_conf.opaque);
-
-                        } else {
-
-                                rk->rk_conf.dr_cb(rk,
-                                                  rkmessage->payload,
-                                                  rkmessage->len,
-                                                  rkmessage->err,
-                                                  rk->rk_conf.opaque,
-                                                  rkmessage->_private);
-                        }
-
-                        rd_kafka_msg_destroy(rk, rkm);
-
-                        if (unlikely(rd_kafka_yield_thread)) {
-                                /* Callback called yield(),
-                                 * re-enqueue the op (if there are any
-                                 * remaining messages). */
-                                if (!TAILQ_EMPTY(&rko->rko_u.dr.msgq.
-                                                 rkmq_msgs))
-                                        rd_kafka_q_reenq(rkq, rko);
-                                else
-                                        rd_kafka_op_destroy(rko);
-                                return RD_KAFKA_OP_RES_YIELD;
-                        }
-		}
-
-		rd_kafka_msgq_init(&rko->rko_u.dr.msgq);
-
-		break;
-
-	case RD_KAFKA_OP_THROTTLE:
-		if (rk->rk_conf.throttle_cb)
-			rk->rk_conf.throt

<TRUNCATED>