You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/06/06 14:15:00 UTC

[32/51] [partial] nifi-minifi-cpp git commit: MINIFICPP-512 - upgrade to librdkafka 0.11.4

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_broker.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_broker.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_broker.c
deleted file mode 100644
index 742e0fd..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_broker.c
+++ /dev/null
@@ -1,3797 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2015, Magnus Edenhill
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-
-
-#ifndef _MSC_VER
-#define _GNU_SOURCE
-/*
- * AIX defines this and the value needs to be set correctly. For Solaris,
- * src/rd.h defines _POSIX_SOURCE to be 200809L, which corresponds to XPG7,
- * which itself is not compatible with _XOPEN_SOURCE on that platform.
- */
-#if !defined(_AIX) && !defined(__sun)
-#define _XOPEN_SOURCE
-#endif
-#include <signal.h>
-#endif
-
-#include <stdio.h>
-#include <stdarg.h>
-#include <string.h>
-#include <ctype.h>
-
-#include "rd.h"
-#include "rdkafka_int.h"
-#include "rdkafka_msg.h"
-#include "rdkafka_msgset.h"
-#include "rdkafka_topic.h"
-#include "rdkafka_partition.h"
-#include "rdkafka_broker.h"
-#include "rdkafka_offset.h"
-#include "rdkafka_transport.h"
-#include "rdkafka_proto.h"
-#include "rdkafka_buf.h"
-#include "rdkafka_request.h"
-#include "rdkafka_sasl.h"
-#include "rdkafka_interceptor.h"
-#include "rdtime.h"
-#include "rdcrc32.h"
-#include "rdrand.h"
-#include "rdkafka_lz4.h"
-#if WITH_SSL
-#include <openssl/err.h>
-#endif
-#include "rdendian.h"
-
-
-const char *rd_kafka_broker_state_names[] = {
-	"INIT",
-	"DOWN",
-	"CONNECT",
-	"AUTH",
-	"UP",
-        "UPDATE",
-	"APIVERSION_QUERY",
-	"AUTH_HANDSHAKE"
-};
-
-const char *rd_kafka_secproto_names[] = {
-	[RD_KAFKA_PROTO_PLAINTEXT] = "plaintext",
-	[RD_KAFKA_PROTO_SSL] = "ssl",
-	[RD_KAFKA_PROTO_SASL_PLAINTEXT] = "sasl_plaintext",
-	[RD_KAFKA_PROTO_SASL_SSL] = "sasl_ssl",
-	NULL
-};
-
-
-
-
-
-
-
-#define rd_kafka_broker_terminating(rkb) \
-        (rd_refcnt_get(&(rkb)->rkb_refcnt) <= 1)
-
-
-/**
- * Construct broker nodename.
- */
-static void rd_kafka_mk_nodename (char *dest, size_t dsize,
-                                  const char *name, uint16_t port) {
-        rd_snprintf(dest, dsize, "%s:%hu", name, port);
-}
-
-/**
- * Construct descriptive broker name
- */
-static void rd_kafka_mk_brokername (char *dest, size_t dsize,
-				    rd_kafka_secproto_t proto,
-				    const char *nodename, int32_t nodeid,
-				    rd_kafka_confsource_t source) {
-
-	/* Prepend protocol name to brokername, unless it is a
-	 * standard plaintext broker in which case we omit the protocol part. */
-	if (proto != RD_KAFKA_PROTO_PLAINTEXT) {
-		int r = rd_snprintf(dest, dsize, "%s://",
-				    rd_kafka_secproto_names[proto]);
-		if (r >= (int)dsize) /* Skip proto name if it wont fit.. */
-			r = 0;
-
-		dest += r;
-		dsize -= r;
-	}
-
-	if (nodeid == RD_KAFKA_NODEID_UA)
-		rd_snprintf(dest, dsize, "%s/%s",
-			    nodename,
-			    source == RD_KAFKA_INTERNAL ?
-			    "internal":"bootstrap");
-	else
-		rd_snprintf(dest, dsize, "%s/%"PRId32, nodename, nodeid);
-}
-
-
-/**
- * @brief Enable protocol feature(s) for the current broker.
- *
- * Locality: broker thread
- */
-static void rd_kafka_broker_feature_enable (rd_kafka_broker_t *rkb,
-					    int features) {
-	if (features & rkb->rkb_features)
-		return;
-
-	rkb->rkb_features |= features;
-	rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_PROTOCOL | RD_KAFKA_DBG_FEATURE,
-		   "FEATURE",
-		   "Updated enabled protocol features +%s to %s",
-		   rd_kafka_features2str(features),
-		   rd_kafka_features2str(rkb->rkb_features));
-}
-
-
-/**
- * @brief Disable protocol feature(s) for the current broker.
- *
- * Locality: broker thread
- */
-static void rd_kafka_broker_feature_disable (rd_kafka_broker_t *rkb,
-						       int features) {
-	if (!(features & rkb->rkb_features))
-		return;
-
-	rkb->rkb_features &= ~features;
-	rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_PROTOCOL | RD_KAFKA_DBG_FEATURE,
-		   "FEATURE",
-		   "Updated enabled protocol features -%s to %s",
-		   rd_kafka_features2str(features),
-		   rd_kafka_features2str(rkb->rkb_features));
-}
-
-
-/**
- * @brief Set protocol feature(s) for the current broker.
- *
- * @remark This replaces the previous feature set.
- *
- * @locality broker thread
- * @locks rd_kafka_broker_lock()
- */
-static void rd_kafka_broker_features_set (rd_kafka_broker_t *rkb, int features) {
-	if (rkb->rkb_features == features)
-		return;
-
-	rkb->rkb_features = features;
-	rd_rkb_dbg(rkb, BROKER, "FEATURE",
-		   "Updated enabled protocol features to %s",
-		   rd_kafka_features2str(rkb->rkb_features));
-}
-
-
-/**
- * @brief Check and return supported ApiVersion for \p ApiKey.
- *
- * @returns the highest supported ApiVersion in the specified range (inclusive)
- *          or -1 if the ApiKey is not supported or no matching ApiVersion.
- *          The current feature set is also returned in \p featuresp
- * @locks none
- * @locality any
- */
-int16_t rd_kafka_broker_ApiVersion_supported (rd_kafka_broker_t *rkb,
-                                              int16_t ApiKey,
-                                              int16_t minver, int16_t maxver,
-                                              int *featuresp) {
-        struct rd_kafka_ApiVersion skel = { .ApiKey = ApiKey };
-        struct rd_kafka_ApiVersion ret = RD_ZERO_INIT, *retp;
-
-        rd_kafka_broker_lock(rkb);
-        retp = bsearch(&skel, rkb->rkb_ApiVersions, rkb->rkb_ApiVersions_cnt,
-                       sizeof(*rkb->rkb_ApiVersions),
-                       rd_kafka_ApiVersion_key_cmp);
-        if (retp)
-                ret = *retp;
-        if (featuresp)
-                *featuresp = rkb->rkb_features;
-        rd_kafka_broker_unlock(rkb);
-
-        if (!retp)
-                return -1;
-
-        if (ret.MaxVer < maxver) {
-                if (ret.MaxVer < minver)
-                        return -1;
-                else
-                        return ret.MaxVer;
-        } else if (ret.MinVer > maxver)
-                return -1;
-        else
-                return maxver;
-}
-
-
-/**
- * Locks: rd_kafka_broker_lock() MUST be held.
- * Locality: broker thread
- */
-void rd_kafka_broker_set_state (rd_kafka_broker_t *rkb, int state) {
-	if ((int)rkb->rkb_state == state)
-		return;
-
-	rd_kafka_dbg(rkb->rkb_rk, BROKER, "STATE",
-		     "%s: Broker changed state %s -> %s",
-		     rkb->rkb_name,
-		     rd_kafka_broker_state_names[rkb->rkb_state],
-		     rd_kafka_broker_state_names[state]);
-
-	if (rkb->rkb_source == RD_KAFKA_INTERNAL) {
-		/* no-op */
-	} else if (state == RD_KAFKA_BROKER_STATE_DOWN &&
-		   !rkb->rkb_down_reported &&
-		   rkb->rkb_state != RD_KAFKA_BROKER_STATE_APIVERSION_QUERY) {
-		/* Propagate ALL_BROKERS_DOWN event if all brokers are
-		 * now down, unless we're terminating.
-		 * Dont do this if we're querying for ApiVersion since it
-		 * is bound to fail once on older brokers. */
-		if (rd_atomic32_add(&rkb->rkb_rk->rk_broker_down_cnt, 1) ==
-		    rd_atomic32_get(&rkb->rkb_rk->rk_broker_cnt) &&
-		    !rd_atomic32_get(&rkb->rkb_rk->rk_terminate))
-			rd_kafka_op_err(rkb->rkb_rk,
-					RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
-					"%i/%i brokers are down",
-					rd_atomic32_get(&rkb->rkb_rk->
-                                                        rk_broker_down_cnt),
-					rd_atomic32_get(&rkb->rkb_rk->
-                                                        rk_broker_cnt));
-		rkb->rkb_down_reported = 1;
-
-	} else if (rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP &&
-		   rkb->rkb_down_reported) {
-		rd_atomic32_sub(&rkb->rkb_rk->rk_broker_down_cnt, 1);
-		rkb->rkb_down_reported = 0;
-	}
-
-	rkb->rkb_state = state;
-        rkb->rkb_ts_state = rd_clock();
-
-	rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);
-}
-
-
-/**
- * @brief Locks broker, acquires the states, unlocks, and returns
- *        the state.
- * @locks !broker_lock
- * @locality any
- */
-int rd_kafka_broker_get_state (rd_kafka_broker_t *rkb) {
-        int state;
-        rd_kafka_broker_lock(rkb);
-        state = rkb->rkb_state;
-        rd_kafka_broker_unlock(rkb);
-        return state;
-}
-
-
-/**
- * Failure propagation to application.
- * Will tear down connection to broker and trigger a reconnect.
- *
- * If 'fmt' is NULL nothing will be logged or propagated to the application.
- *
- * \p level is the log level, <=LOG_INFO will be logged while =LOG_DEBUG will
- * be debug-logged.
- * 
- * Locality: Broker thread
- */
-void rd_kafka_broker_fail (rd_kafka_broker_t *rkb,
-                           int level, rd_kafka_resp_err_t err,
-			   const char *fmt, ...) {
-	va_list ap;
-	int errno_save = errno;
-	rd_kafka_bufq_t tmpq_waitresp, tmpq;
-        int old_state;
-
-	rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
-
-	rd_kafka_dbg(rkb->rkb_rk, BROKER | RD_KAFKA_DBG_PROTOCOL, "BROKERFAIL",
-		     "%s: failed: err: %s: (errno: %s)",
-		     rkb->rkb_name, rd_kafka_err2str(err),
-		     rd_strerror(errno_save));
-
-	rkb->rkb_err.err = errno_save;
-
-	if (rkb->rkb_transport) {
-		rd_kafka_transport_close(rkb->rkb_transport);
-		rkb->rkb_transport = NULL;
-	}
-
-	rkb->rkb_req_timeouts = 0;
-
-	if (rkb->rkb_recv_buf) {
-		rd_kafka_buf_destroy(rkb->rkb_recv_buf);
-		rkb->rkb_recv_buf = NULL;
-	}
-
-	rd_kafka_broker_lock(rkb);
-
-	/* The caller may omit the format if it thinks this is a recurring
-	 * failure, in which case the following things are omitted:
-	 *  - log message
-	 *  - application OP_ERR
-	 *  - metadata request
-	 *
-	 * Dont log anything if this was the termination signal, or if the
-	 * socket disconnected while trying ApiVersionRequest.
-	 */
-	if (fmt &&
-	    !(errno_save == EINTR &&
-	      rd_atomic32_get(&rkb->rkb_rk->rk_terminate)) &&
-	    !(err == RD_KAFKA_RESP_ERR__TRANSPORT &&
-	      rkb->rkb_state == RD_KAFKA_BROKER_STATE_APIVERSION_QUERY)) {
-		int of;
-
-		/* Insert broker name in log message if it fits. */
-		of = rd_snprintf(rkb->rkb_err.msg, sizeof(rkb->rkb_err.msg),
-			      "%s: ", rkb->rkb_name);
-		if (of >= (int)sizeof(rkb->rkb_err.msg))
-			of = 0;
-		va_start(ap, fmt);
-		rd_vsnprintf(rkb->rkb_err.msg+of,
-			  sizeof(rkb->rkb_err.msg)-of, fmt, ap);
-		va_end(ap);
-
-                if (level >= LOG_DEBUG)
-                        rd_kafka_dbg(rkb->rkb_rk, BROKER, "FAIL",
-                                     "%s", rkb->rkb_err.msg);
-                else {
-                        /* Don't log if an error callback is registered */
-                        if (!rkb->rkb_rk->rk_conf.error_cb)
-                                rd_kafka_log(rkb->rkb_rk, level, "FAIL",
-                                             "%s", rkb->rkb_err.msg);
-                        /* Send ERR op back to application for processing. */
-                        rd_kafka_op_err(rkb->rkb_rk, err,
-                                        "%s", rkb->rkb_err.msg);
-                }
-	}
-
-	/* If we're currently asking for ApiVersion and the connection
-	 * went down it probably means the broker does not support that request
-	 * and tore down the connection. In this case we disable that feature flag. */
-	if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_APIVERSION_QUERY)
-		rd_kafka_broker_feature_disable(rkb, RD_KAFKA_FEATURE_APIVERSION);
-
-	/* Set broker state */
-        old_state = rkb->rkb_state;
-	rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_DOWN);
-
-	/* Unlock broker since a requeue will try to lock it. */
-	rd_kafka_broker_unlock(rkb);
-
-	/*
-	 * Purge all buffers
-	 * (put bufs on a temporary queue since bufs may be requeued,
-	 *  make sure outstanding requests are re-enqueued before
-	 *  bufs on outbufs queue.)
-	 */
-	rd_kafka_bufq_init(&tmpq_waitresp);
-	rd_kafka_bufq_init(&tmpq);
-	rd_kafka_bufq_concat(&tmpq_waitresp, &rkb->rkb_waitresps);
-	rd_kafka_bufq_concat(&tmpq, &rkb->rkb_outbufs);
-        rd_atomic32_init(&rkb->rkb_blocking_request_cnt, 0);
-
-	/* Purge the buffers (might get re-enqueued in case of retries) */
-	rd_kafka_bufq_purge(rkb, &tmpq_waitresp, err);
-
-	/* Put the outbufs back on queue */
-	rd_kafka_bufq_concat(&rkb->rkb_outbufs, &tmpq);
-
-	/* Update bufq for connection reset:
-	 *  - Purge connection-setup requests from outbufs since they will be
-	 *    reissued on the next connect.
-	 *  - Reset any partially sent buffer's offset.
-	 */
-	rd_kafka_bufq_connection_reset(rkb, &rkb->rkb_outbufs);
-
-	/* Extra debugging for tracking termination-hang issues:
-	 * show what is keeping this broker from decommissioning. */
-	if (rd_kafka_terminating(rkb->rkb_rk) &&
-	    !rd_kafka_broker_terminating(rkb)) {
-		rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_PROTOCOL, "BRKTERM",
-			   "terminating: broker still has %d refcnt(s), "
-			   "%"PRId32" buffer(s), %d partition(s)",
-			   rd_refcnt_get(&rkb->rkb_refcnt),
-			   rd_kafka_bufq_cnt(&rkb->rkb_outbufs),
-			   rkb->rkb_toppar_cnt);
-		rd_kafka_bufq_dump(rkb, "BRKOUTBUFS", &rkb->rkb_outbufs);
-#if ENABLE_SHAREDPTR_DEBUG
-		if (rd_refcnt_get(&rkb->rkb_refcnt) > 1) {
-			rd_rkb_dbg(rkb, BROKER, "BRKTERM",
-				   "Dumping shared pointers: "
-				   "this broker is %p", rkb);
-			rd_shared_ptrs_dump();
-		}
-#endif
-	}
-
-
-        /* Query for topic leaders to quickly pick up on failover. */
-        if (fmt && err != RD_KAFKA_RESP_ERR__DESTROY &&
-            old_state >= RD_KAFKA_BROKER_STATE_UP)
-                rd_kafka_metadata_refresh_known_topics(rkb->rkb_rk, NULL,
-                                                       1/*force*/,
-                                                       "broker down");
-}
-
-
-
-
-
-/**
- * Scan bufq for buffer timeouts, trigger buffer callback on timeout.
- *
- * If \p partial_cntp is non-NULL any partially sent buffers will increase
- * the provided counter by 1.
- *
- * @returns the number of timed out buffers.
- *
- * @locality broker thread
- */
-static int rd_kafka_broker_bufq_timeout_scan (rd_kafka_broker_t *rkb,
-					      int is_waitresp_q,
-					      rd_kafka_bufq_t *rkbq,
-					      int *partial_cntp,
-					      rd_kafka_resp_err_t err,
-					      rd_ts_t now) {
-	rd_kafka_buf_t *rkbuf, *tmp;
-	int cnt = 0;
-
-	TAILQ_FOREACH_SAFE(rkbuf, &rkbq->rkbq_bufs, rkbuf_link, tmp) {
-
-		if (likely(now && rkbuf->rkbuf_ts_timeout > now))
-			continue;
-
-                if (partial_cntp && rd_slice_offset(&rkbuf->rkbuf_reader) > 0)
-                        (*partial_cntp)++;
-
-		/* Convert rkbuf_ts_sent to elapsed time since request */
-		if (rkbuf->rkbuf_ts_sent)
-			rkbuf->rkbuf_ts_sent = now - rkbuf->rkbuf_ts_sent;
-		else
-			rkbuf->rkbuf_ts_sent = now - rkbuf->rkbuf_ts_enq;
-
-		rd_kafka_bufq_deq(rkbq, rkbuf);
-
-		if (is_waitresp_q && rkbuf->rkbuf_flags & RD_KAFKA_OP_F_BLOCKING
-		    && rd_atomic32_sub(&rkb->rkb_blocking_request_cnt, 1) == 0)
-			rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);
-
-                rd_kafka_buf_callback(rkb->rkb_rk, rkb, err, NULL, rkbuf);
-		cnt++;
-	}
-
-	return cnt;
-}
-
-
-/**
- * Scan the wait-response and outbuf queues for message timeouts.
- *
- * Locality: Broker thread
- */
-static void rd_kafka_broker_timeout_scan (rd_kafka_broker_t *rkb, rd_ts_t now) {
-	int req_cnt, retry_cnt, q_cnt;
-
-	rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
-
-	/* Outstanding requests waiting for response */
-	req_cnt = rd_kafka_broker_bufq_timeout_scan(
-		rkb, 1, &rkb->rkb_waitresps, NULL,
-		RD_KAFKA_RESP_ERR__TIMED_OUT, now);
-	/* Requests in retry queue */
-	retry_cnt = rd_kafka_broker_bufq_timeout_scan(
-		rkb, 0, &rkb->rkb_retrybufs, NULL,
-		RD_KAFKA_RESP_ERR__TIMED_OUT, now);
-	/* Requests in local queue not sent yet. */
-	q_cnt = rd_kafka_broker_bufq_timeout_scan(
-		rkb, 0, &rkb->rkb_outbufs, &req_cnt,
-		RD_KAFKA_RESP_ERR__TIMED_OUT, now);
-
-	if (req_cnt + retry_cnt + q_cnt > 0) {
-		rd_rkb_dbg(rkb, MSG|RD_KAFKA_DBG_BROKER,
-			   "REQTMOUT", "Timed out %i+%i+%i requests",
-			   req_cnt, retry_cnt, q_cnt);
-
-                /* Fail the broker if socket.max.fails is configured and
-                 * now exceeded. */
-                rkb->rkb_req_timeouts   += req_cnt + q_cnt;
-                rd_atomic64_add(&rkb->rkb_c.req_timeouts, req_cnt + q_cnt);
-
-		/* If this was an in-flight request that timed out, or
-		 * the other queues has reached the socket.max.fails threshold,
-		 * we need to take down the connection. */
-                if ((req_cnt > 0 ||
-		     (rkb->rkb_rk->rk_conf.socket_max_fails &&
-		      rkb->rkb_req_timeouts >=
-		      rkb->rkb_rk->rk_conf.socket_max_fails)) &&
-                    rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP) {
-                        char rttinfo[32];
-                        /* Print average RTT (if avail) to help diagnose. */
-                        rd_avg_calc(&rkb->rkb_avg_rtt, now);
-                        if (rkb->rkb_avg_rtt.ra_v.avg)
-                                rd_snprintf(rttinfo, sizeof(rttinfo),
-                                            " (average rtt %.3fms)",
-                                            (float)(rkb->rkb_avg_rtt.ra_v.avg/
-                                                    1000.0f));
-                        else
-                                rttinfo[0] = 0;
-                        errno = ETIMEDOUT;
-                        rd_kafka_broker_fail(rkb, LOG_ERR,
-                                             RD_KAFKA_RESP_ERR__MSG_TIMED_OUT,
-                                             "%i request(s) timed out: "
-                                             "disconnect%s",
-                                             rkb->rkb_req_timeouts, rttinfo);
-                }
-        }
-}
-
-
-
-static ssize_t
-rd_kafka_broker_send (rd_kafka_broker_t *rkb, rd_slice_t *slice) {
-	ssize_t r;
-	char errstr[128];
-
-	rd_kafka_assert(rkb->rkb_rk, rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP);
-	rd_kafka_assert(rkb->rkb_rk, rkb->rkb_transport);
-
-        r = rd_kafka_transport_send(rkb->rkb_transport, slice,
-                                    errstr, sizeof(errstr));
-
-	if (r == -1) {
-		rd_kafka_broker_fail(rkb, LOG_ERR, RD_KAFKA_RESP_ERR__TRANSPORT,
-                                     "Send failed: %s", errstr);
-		rd_atomic64_add(&rkb->rkb_c.tx_err, 1);
-		return -1;
-	}
-
-	rd_atomic64_add(&rkb->rkb_c.tx_bytes, r);
-	rd_atomic64_add(&rkb->rkb_c.tx, 1);
-	return r;
-}
-
-
-
-
-static int rd_kafka_broker_resolve (rd_kafka_broker_t *rkb) {
-	const char *errstr;
-
-	if (rkb->rkb_rsal &&
-	    rkb->rkb_t_rsal_last + rkb->rkb_rk->rk_conf.broker_addr_ttl <
-	    time(NULL)) { // FIXME: rd_clock()
-		/* Address list has expired. */
-		rd_sockaddr_list_destroy(rkb->rkb_rsal);
-		rkb->rkb_rsal = NULL;
-	}
-
-	if (!rkb->rkb_rsal) {
-		/* Resolve */
-
-		rkb->rkb_rsal = rd_getaddrinfo(rkb->rkb_nodename,
-					       RD_KAFKA_PORT_STR,
-					       AI_ADDRCONFIG,
-					       rkb->rkb_rk->rk_conf.
-                                               broker_addr_family,
-                                               SOCK_STREAM,
-					       IPPROTO_TCP, &errstr);
-
-		if (!rkb->rkb_rsal) {
-                        rd_kafka_broker_fail(rkb, LOG_ERR,
-                                             RD_KAFKA_RESP_ERR__RESOLVE,
-                                             /* Avoid duplicate log messages */
-                                             rkb->rkb_err.err == errno ?
-                                             NULL :
-                                             "Failed to resolve '%s': %s",
-                                             rkb->rkb_nodename, errstr);
-			return -1;
-		}
-	}
-
-	return 0;
-}
-
-
-static void rd_kafka_broker_buf_enq0 (rd_kafka_broker_t *rkb,
-				      rd_kafka_buf_t *rkbuf, int at_head) {
-	rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
-
-        rkbuf->rkbuf_ts_enq = rd_clock();
-
-        /* Set timeout if not already set */
-        if (!rkbuf->rkbuf_ts_timeout)
-        	rkbuf->rkbuf_ts_timeout = rkbuf->rkbuf_ts_enq +
-                        rkb->rkb_rk->rk_conf.socket_timeout_ms * 1000;
-
-	if (unlikely(at_head)) {
-		/* Insert message at head of queue */
-		rd_kafka_buf_t *prev, *after = NULL;
-
-		/* Put us behind any flash messages and partially sent buffers.
-		 * We need to check if buf corrid is set rather than
-		 * rkbuf_of since SSL_write may return 0 and expect the
-		 * exact same arguments the next call. */
-		TAILQ_FOREACH(prev, &rkb->rkb_outbufs.rkbq_bufs, rkbuf_link) {
-			if (!(prev->rkbuf_flags & RD_KAFKA_OP_F_FLASH) &&
-			    prev->rkbuf_corrid == 0)
-				break;
-			after = prev;
-		}
-
-		if (after)
-			TAILQ_INSERT_AFTER(&rkb->rkb_outbufs.rkbq_bufs,
-					   after, rkbuf, rkbuf_link);
-		else
-			TAILQ_INSERT_HEAD(&rkb->rkb_outbufs.rkbq_bufs,
-					  rkbuf, rkbuf_link);
-	} else {
-		/* Insert message at tail of queue */
-		TAILQ_INSERT_TAIL(&rkb->rkb_outbufs.rkbq_bufs,
-				  rkbuf, rkbuf_link);
-	}
-
-	(void)rd_atomic32_add(&rkb->rkb_outbufs.rkbq_cnt, 1);
-	(void)rd_atomic32_add(&rkb->rkb_outbufs.rkbq_msg_cnt,
-                            rd_atomic32_get(&rkbuf->rkbuf_msgq.rkmq_msg_cnt));
-}
-
-
-/**
- * Finalize a stuffed rkbuf for sending to broker.
- */
-static void rd_kafka_buf_finalize (rd_kafka_t *rk, rd_kafka_buf_t *rkbuf) {
-        size_t totsize;
-
-        /* Calculate total request buffer length. */
-        totsize = rd_buf_len(&rkbuf->rkbuf_buf) - 4;
-        rd_assert(totsize <= (size_t)rk->rk_conf.max_msg_size);
-
-        /* Set up a buffer reader for sending the buffer. */
-        rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf);
-
-        /**
-         * Update request header fields
-         */
-        /* Total reuqest length */
-        rd_kafka_buf_update_i32(rkbuf, 0, (int32_t)totsize);
-
-        /* ApiVersion */
-        rd_kafka_buf_update_i16(rkbuf, 4+2, rkbuf->rkbuf_reqhdr.ApiVersion);
-}
-
-
-void rd_kafka_broker_buf_enq1 (rd_kafka_broker_t *rkb,
-                               rd_kafka_buf_t *rkbuf,
-                               rd_kafka_resp_cb_t *resp_cb,
-                               void *opaque) {
-
-
-        rkbuf->rkbuf_cb     = resp_cb;
-	rkbuf->rkbuf_opaque = opaque;
-
-        rd_kafka_buf_finalize(rkb->rkb_rk, rkbuf);
-
-	rd_kafka_broker_buf_enq0(rkb, rkbuf,
-				 (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_FLASH)?
-				 1/*head*/: 0/*tail*/);
-}
-
-
-/**
- * Enqueue buffer on broker's xmit queue, but fail buffer immediately
- * if broker is not up.
- *
- * Locality: broker thread
- */
-static int rd_kafka_broker_buf_enq2 (rd_kafka_broker_t *rkb,
-				      rd_kafka_buf_t *rkbuf) {
-        if (unlikely(rkb->rkb_source == RD_KAFKA_INTERNAL)) {
-                /* Fail request immediately if this is the internal broker. */
-                rd_kafka_buf_callback(rkb->rkb_rk, rkb,
-				      RD_KAFKA_RESP_ERR__TRANSPORT,
-                                      NULL, rkbuf);
-                return -1;
-        }
-
-	rd_kafka_broker_buf_enq0(rkb, rkbuf,
-				 (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_FLASH)?
-				 1/*head*/: 0/*tail*/);
-
-	return 0;
-}
-
-
-
-/**
- * Enqueue buffer for tranmission.
- * Responses are enqueued on 'replyq' (RD_KAFKA_OP_RECV_BUF)
- *
- * Locality: any thread
- */
-void rd_kafka_broker_buf_enq_replyq (rd_kafka_broker_t *rkb,
-                                     rd_kafka_buf_t *rkbuf,
-                                     rd_kafka_replyq_t replyq,
-                                     rd_kafka_resp_cb_t *resp_cb,
-                                     void *opaque) {
-
-        assert(rkbuf->rkbuf_rkb == rkb);
-        if (resp_cb) {
-                rkbuf->rkbuf_replyq = replyq;
-                rkbuf->rkbuf_cb     = resp_cb;
-                rkbuf->rkbuf_opaque = opaque;
-        } else {
-		rd_dassert(!replyq.q);
-	}
-
-        rd_kafka_buf_finalize(rkb->rkb_rk, rkbuf);
-
-
-	if (thrd_is_current(rkb->rkb_thread)) {
-		rd_kafka_broker_buf_enq2(rkb, rkbuf);
-
-	} else {
-		rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_XMIT_BUF);
-		rko->rko_u.xbuf.rkbuf = rkbuf;
-		rd_kafka_q_enq(rkb->rkb_ops, rko);
-	}
-}
-
-
-
-
-/**
- * @returns the current broker state change version.
- *          Pass this value to fugure rd_kafka_brokers_wait_state_change() calls
- *          to avoid the race condition where a state-change happens between
- *          an initial call to some API that fails and the sub-sequent
- *          .._wait_state_change() call.
- */
-int rd_kafka_brokers_get_state_version (rd_kafka_t *rk) {
-	int version;
-	mtx_lock(&rk->rk_broker_state_change_lock);
-	version = rk->rk_broker_state_change_version;
-	mtx_unlock(&rk->rk_broker_state_change_lock);
-	return version;
-}
-
-/**
- * @brief Wait at most \p timeout_ms for any state change for any broker.
- *        \p stored_version is the value previously returned by
- *        rd_kafka_brokers_get_state_version() prior to another API call
- *        that failed due to invalid state.
- *
- * Triggers:
- *   - broker state changes
- *   - broker transitioning from blocking to non-blocking
- *   - partition leader changes
- *   - group state changes
- *
- * @remark There is no guarantee that a state change actually took place.
- *
- * @returns 1 if a state change was signaled (maybe), else 0 (timeout)
- *
- * @locality any thread
- */
-int rd_kafka_brokers_wait_state_change (rd_kafka_t *rk, int stored_version,
-					int timeout_ms) {
-	int r;
-	mtx_lock(&rk->rk_broker_state_change_lock);
-	if (stored_version != rk->rk_broker_state_change_version)
-		r = 1;
-	else
-		r = cnd_timedwait_ms(&rk->rk_broker_state_change_cnd,
-				     &rk->rk_broker_state_change_lock,
-				     timeout_ms) == thrd_success;
-	mtx_unlock(&rk->rk_broker_state_change_lock);
-	return r;
-}
-
-
-/**
- * @brief Broadcast broker state change to listeners, if any.
- *
- * @locality any thread
- */
-void rd_kafka_brokers_broadcast_state_change (rd_kafka_t *rk) {
-	rd_kafka_dbg(rk, GENERIC, "BROADCAST",
-		     "Broadcasting state change");
-	mtx_lock(&rk->rk_broker_state_change_lock);
-	rk->rk_broker_state_change_version++;
-	cnd_broadcast(&rk->rk_broker_state_change_cnd);
-	mtx_unlock(&rk->rk_broker_state_change_lock);
-}
-
-
-/**
- * Returns a random broker (with refcnt increased) in state 'state'.
- * Uses Reservoir sampling.
- *
- * 'filter' is an optional callback used to filter out undesired brokers.
- * The filter function should return 1 to filter out a broker, or 0 to keep it
- * in the list of eligible brokers to return.
- * rd_kafka_broker_lock() is held during the filter callback.
- *
- * Locks: rd_kafka_rdlock(rk) MUST be held.
- * Locality: any thread
- */
-rd_kafka_broker_t *rd_kafka_broker_any (rd_kafka_t *rk, int state,
-                                        int (*filter) (rd_kafka_broker_t *rkb,
-                                                       void *opaque),
-                                        void *opaque) {
-	rd_kafka_broker_t *rkb, *good = NULL;
-        int cnt = 0;
-
-	TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
-		rd_kafka_broker_lock(rkb);
-		if ((int)rkb->rkb_state == state &&
-                    (!filter || !filter(rkb, opaque))) {
-                        if (cnt < 1 || rd_jitter(0, cnt) < 1) {
-                                if (good)
-                                        rd_kafka_broker_destroy(good);
-                                rd_kafka_broker_keep(rkb);
-                                good = rkb;
-                        }
-                        cnt += 1;
-                }
-		rd_kafka_broker_unlock(rkb);
-	}
-
-        return good;
-}
-
-
-/**
- * @brief Spend at most \p timeout_ms to acquire a usable (Up && non-blocking)
- *        broker.
- *
- * @returns A probably usable broker with increased refcount, or NULL on timeout
- * @locks rd_kafka_*lock() if !do_lock
- * @locality any
- */
-rd_kafka_broker_t *rd_kafka_broker_any_usable (rd_kafka_t *rk,
-                                                int timeout_ms,
-                                                int do_lock) {
-	const rd_ts_t ts_end = rd_timeout_init(timeout_ms);
-
-	while (1) {
-		rd_kafka_broker_t *rkb;
-		int remains;
-		int version = rd_kafka_brokers_get_state_version(rk);
-
-                /* Try non-blocking (e.g., non-fetching) brokers first. */
-                if (do_lock)
-                        rd_kafka_rdlock(rk);
-                rkb = rd_kafka_broker_any(rk, RD_KAFKA_BROKER_STATE_UP,
-                                          rd_kafka_broker_filter_non_blocking,
-                                          NULL);
-                if (!rkb)
-                        rkb = rd_kafka_broker_any(rk, RD_KAFKA_BROKER_STATE_UP,
-                                                  NULL, NULL);
-                if (do_lock)
-                        rd_kafka_rdunlock(rk);
-
-                if (rkb)
-                        return rkb;
-
-		remains = rd_timeout_remains(ts_end);
-		if (rd_timeout_expired(remains))
-			return NULL;
-
-		rd_kafka_brokers_wait_state_change(rk, version, remains);
-	}
-
-	return NULL;
-}
-
-
-
-/**
- * Select a broker whose state equals `state`, favouring the broker whose
- * node id equals `broker_id` (pass -1 for no preference).
- *
- * If the preferred broker is found in `state` the scan stops and it is
- * returned. Otherwise one of the matching brokers is picked at random
- * (Reservoir sampling with a reservoir of one).
- *
- * The returned broker has had rd_kafka_broker_keep() called on it.
- * Returns NULL if no broker is in `state`.
- *
- * Locks: rd_kafka_rdlock(rk) MUST be held.
- * Locality: any thread
- */
-rd_kafka_broker_t *rd_kafka_broker_prefer (rd_kafka_t *rk, int32_t broker_id,
-					   int state) {
-	rd_kafka_broker_t *candidate;
-	rd_kafka_broker_t *chosen = NULL;
-	int seen = 0;
-
-	TAILQ_FOREACH(candidate, &rk->rk_brokers, rkb_link) {
-		int preferred;
-
-		rd_kafka_broker_lock(candidate);
-
-		/* Brokers in any other state are never eligible. */
-		if ((int)candidate->rkb_state != state) {
-			rd_kafka_broker_unlock(candidate);
-			continue;
-		}
-
-		preferred = broker_id != -1 &&
-			candidate->rkb_nodeid == broker_id;
-
-		/* The preferred broker always wins; any other eligible
-		 * broker replaces the current pick at random. */
-		if (preferred || seen < 1 || rd_jitter(0, seen) < 1) {
-			if (chosen)
-				rd_kafka_broker_destroy(chosen);
-			rd_kafka_broker_keep(candidate);
-			chosen = candidate;
-		}
-
-		rd_kafka_broker_unlock(candidate);
-
-		if (preferred)
-			break;
-
-		seen++;
-	}
-
-	return chosen;
-}
-
-
-
-
-
-
-/**
- * Look up the request buffer on rkb's wait-response queue whose
- * correlation id equals `corrid`.
- *
- * On a match the buffer's rkbuf_ts_sent is overwritten with the elapsed
- * time since it was sent, that value is fed to the broker's RTT average,
- * and the buffer is dequeued from rkb_waitresps before being returned.
- * Returns NULL when no buffer matches.
- *
- * Must be called from the broker's own thread (asserted).
- */
-static rd_kafka_buf_t *rd_kafka_waitresp_find (rd_kafka_broker_t *rkb,
-					       int32_t corrid) {
-	rd_kafka_buf_t *req;
-	const rd_ts_t ts_now = rd_clock();
-
-	rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
-
-	TAILQ_FOREACH(req, &rkb->rkb_waitresps.rkbq_bufs, rkbuf_link) {
-		if (req->rkbuf_corrid != corrid)
-			continue;
-
-		/* From here on ts_sent holds the RTT, not a timestamp. */
-		req->rkbuf_ts_sent = ts_now - req->rkbuf_ts_sent;
-		rd_avg_add(&rkb->rkb_avg_rtt, req->rkbuf_ts_sent);
-
-		/* NOTE(review): compares the result of the decrement with 1;
-		 * confirm against rd_atomic32_sub()'s return convention
-		 * whether 0 ("last blocking request done") was intended. */
-		if ((req->rkbuf_flags & RD_KAFKA_OP_F_BLOCKING) &&
-		    rd_atomic32_sub(&rkb->rkb_blocking_request_cnt, 1) == 1)
-			rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);
-
-		rd_kafka_bufq_deq(&rkb->rkb_waitresps, req);
-		return req;
-	}
-
-	return NULL;
-}
-
-
-
-
-/**
- * Pair a fully received response buffer with the request that is
- * waiting for it and hand both to the request's callback.
- *
- * Returns 0 when the response was dispatched, or -1 when no waiting
- * request has the response's CorrId; in that case `rkbuf` is destroyed
- * and the rx_corrid_err counter is bumped.
- *
- * Must be called from the broker's own thread (asserted).
- */
-static int rd_kafka_req_response (rd_kafka_broker_t *rkb,
-				  rd_kafka_buf_t *rkbuf) {
-	rd_kafka_buf_t *req;
-
-	rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
-
-	/* Find corresponding request message by correlation id */
-	req = rd_kafka_waitresp_find(rkb, rkbuf->rkbuf_reshdr.CorrId);
-
-	if (unlikely(req == NULL)) {
-		/* unknown response. probably due to request timeout */
-		rd_atomic64_add(&rkb->rkb_c.rx_corrid_err, 1);
-		rd_rkb_dbg(rkb, BROKER, "RESPONSE",
-			   "Response for unknown CorrId %"PRId32" (timed out?)",
-			   rkbuf->rkbuf_reshdr.CorrId);
-		rd_kafka_buf_destroy(rkbuf);
-		return -1;
-	}
-
-	/* req->rkbuf_ts_sent was converted to an RTT by waitresp_find(). */
-	rd_rkb_dbg(rkb, PROTOCOL, "RECV",
-		   "Received %sResponse (v%hd, %"PRIusz" bytes, CorrId %"PRId32
-		   ", rtt %.2fms)",
-		   rd_kafka_ApiKey2str(req->rkbuf_reqhdr.ApiKey),
-		   req->rkbuf_reqhdr.ApiVersion,
-		   rkbuf->rkbuf_totlen, rkbuf->rkbuf_reshdr.CorrId,
-		   (float)req->rkbuf_ts_sent / 1000.0f);
-
-	/* Position the reader just past the response header so the
-	 * callback only sees the response payload. */
-	rd_slice_init(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf,
-		      RD_KAFKAP_RESHDR_SIZE,
-		      rd_buf_len(&rkbuf->rkbuf_buf) - RD_KAFKAP_RESHDR_SIZE);
-
-	/* Attach the broker (taking a reference) unless already set. */
-	if (rkbuf->rkbuf_rkb) {
-		rd_assert(rkbuf->rkbuf_rkb == rkb);
-	} else {
-		rkbuf->rkbuf_rkb = rkb;
-		rd_kafka_broker_keep(rkbuf->rkbuf_rkb);
-	}
-
-	/* Call callback. */
-	rd_kafka_buf_callback(rkb->rkb_rk, rkb, 0, rkbuf, req);
-
-	return 0;
-}
-
-
-
-
-/**
- * Receive (part of) one response from the broker's transport.
- *
- * The receive is incremental: the in-progress buffer is kept in
- * rkb->rkb_recv_buf between calls. First the fixed-size response header
- * is read to learn the payload size, then the buffer is grown to hold the
- * payload, and once all bytes have arrived the buffer is handed to
- * rd_kafka_req_response().
- *
- * Returns 0 if the transport had no data or the header is still
- *         incomplete,
- *         1 if data was received (whether or not the response is complete),
- *        -1 on transport error or invalid response size, after
- *         rd_kafka_broker_fail() has been called.
- */
-int rd_kafka_recv (rd_kafka_broker_t *rkb) {
-	rd_kafka_buf_t *rkbuf;
-	ssize_t r;
-        /* errstr is not set by buf_read errors, so default it here. */
-        char errstr[512] = "Protocol parse failure";
-        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
-	/* Not referenced directly below; presumably consumed by the
-	 * rd_kafka_buf_read_*() macros - TODO confirm. */
-	const int log_decode_errors = LOG_ERR;
-
-
-        /* It is impossible to estimate the correct size of the response
-         * so we split the read up in two parts: first we read the protocol
-         * length and correlation id (i.e., the Response header), and then
-         * when we know the full length of the response we allocate a new
-         * buffer and call receive again.
-         * All this in an async fashion (e.g., partial reads).
-         */
-	if (!(rkbuf = rkb->rkb_recv_buf)) {
-		/* No receive in progress: create new buffer */
-
-                rkbuf = rd_kafka_buf_new(2, RD_KAFKAP_RESHDR_SIZE);
-
-		rkb->rkb_recv_buf = rkbuf;
-
-                /* Set up buffer reader for the response header. */
-                rd_buf_write_ensure(&rkbuf->rkbuf_buf,
-                                    RD_KAFKAP_RESHDR_SIZE,
-                                    RD_KAFKAP_RESHDR_SIZE);
-        }
-
-        rd_dassert(rd_buf_write_remains(&rkbuf->rkbuf_buf) > 0);
-
-        r = rd_kafka_transport_recv(rkb->rkb_transport, &rkbuf->rkbuf_buf,
-                                    errstr, sizeof(errstr));
-        if (unlikely(r <= 0)) {
-                if (r == 0)
-                        return 0; /* EAGAIN */
-                err = RD_KAFKA_RESP_ERR__TRANSPORT;
-                rd_atomic64_add(&rkb->rkb_c.rx_err, 1);
-                goto err;
-        }
-
-	/* rkbuf_totlen doubles as the "header already parsed" marker. */
-	if (rkbuf->rkbuf_totlen == 0) {
-		/* Packet length not known yet. */
-
-                if (unlikely(rd_buf_write_pos(&rkbuf->rkbuf_buf) <
-                             RD_KAFKAP_RESHDR_SIZE)) {
-			/* Need response header for packet length and corrid.
-			 * Wait for more data. */
-			return 0;
-		}
-
-                rd_assert(!rkbuf->rkbuf_rkb);
-                rkbuf->rkbuf_rkb = rkb; /* Protocol parsing code needs
-                                         * the rkb for logging, but we dont
-                                         * want to keep a reference to the
-                                         * broker this early since that extra
-                                         * refcount will mess with the broker's
-                                         * refcount-based termination code. */
-
-                /* Initialize reader */
-                rd_slice_init(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf, 0,
-                              RD_KAFKAP_RESHDR_SIZE);
-
-		/* Read protocol header */
-		rd_kafka_buf_read_i32(rkbuf, &rkbuf->rkbuf_reshdr.Size);
-		rd_kafka_buf_read_i32(rkbuf, &rkbuf->rkbuf_reshdr.CorrId);
-
-                rkbuf->rkbuf_rkb = NULL; /* Reset */
-
-		rkbuf->rkbuf_totlen = rkbuf->rkbuf_reshdr.Size;
-
-		/* Make sure message size is within tolerable limits.
-		 * Size includes the 4-byte CorrId, hence the lower bound. */
-		if (rkbuf->rkbuf_totlen < 4/*CorrId*/ ||
-		    rkbuf->rkbuf_totlen >
-		    (size_t)rkb->rkb_rk->rk_conf.recv_max_msg_size) {
-                        rd_snprintf(errstr, sizeof(errstr),
-                                    "Invalid response size %"PRId32" (0..%i): "
-                                    "increase receive.message.max.bytes",
-                                    rkbuf->rkbuf_reshdr.Size,
-                                    rkb->rkb_rk->rk_conf.recv_max_msg_size);
-                        err = RD_KAFKA_RESP_ERR__BAD_MSG;
-			rd_atomic64_add(&rkb->rkb_c.rx_err, 1);
-			goto err;
-		}
-
-		/* From here on totlen is the payload length only. */
-		rkbuf->rkbuf_totlen -= 4; /*CorrId*/
-
-		if (rkbuf->rkbuf_totlen > 0) {
-			/* Allocate another buffer that fits all data (short of
-			 * the common response header). We want all
-			 * data to be in contiguous memory. */
-
-                        rd_buf_write_ensure_contig(&rkbuf->rkbuf_buf,
-                                                   rkbuf->rkbuf_totlen);
-		}
-	}
-
-        /* Complete when everything written past the header equals the
-         * payload length. */
-        if (rd_buf_write_pos(&rkbuf->rkbuf_buf) - RD_KAFKAP_RESHDR_SIZE ==
-            rkbuf->rkbuf_totlen) {
-		/* Message is complete, pass it on to the original requester. */
-		rkb->rkb_recv_buf = NULL;
-                rd_atomic64_add(&rkb->rkb_c.rx, 1);
-                rd_atomic64_add(&rkb->rkb_c.rx_bytes,
-                                rd_buf_write_pos(&rkbuf->rkbuf_buf));
-		rd_kafka_req_response(rkb, rkbuf);
-	}
-
-	return 1;
-
-	/* No explicit goto targets this label in this function; presumably
-	 * it is the jump target of the rd_kafka_buf_read_*() macros on a
-	 * parse failure - TODO confirm. */
- err_parse:
-        err = rkbuf->rkbuf_err;
- err:
-	/* A plain "Disconnected" is logged at debug level unless
-	 * log_connection_close is enabled. */
-	rd_kafka_broker_fail(rkb,
-                             !rkb->rkb_rk->rk_conf.log_connection_close &&
-                             !strcmp(errstr, "Disconnected") ?
-                             LOG_DEBUG : LOG_ERR, err,
-                             "Receive failed: %s", errstr);
-	return -1;
-}
-
-
-/**
- * socket_cb implementation that requests close-on-exec at socket
- * creation time through SOCK_CLOEXEC, which is racefree.
- * When SOCK_CLOEXEC is not defined at build time the call is forwarded
- * to rd_kafka_socket_cb_generic().
- */
-int rd_kafka_socket_cb_linux (int domain, int type, int protocol,
-                              void *opaque) {
-#ifndef SOCK_CLOEXEC
-        return rd_kafka_socket_cb_generic(domain, type, protocol, opaque);
-#else
-        const int cloexec_type = type | SOCK_CLOEXEC;
-
-        return socket(domain, cloexec_type, protocol);
-#endif
-}
-
-/**
- * Fallback version of socket_cb NOT providing racefree CLOEXEC,
- * but setting CLOEXEC after socket creation (if FD_CLOEXEC is defined).
- *
- * `opaque` is unused.
- *
- * Returns the new socket descriptor, or -1 if socket() failed.
- */
-int rd_kafka_socket_cb_generic (int domain, int type, int protocol,
-                                void *opaque) {
-        int s;
-
-        s = (int)socket(domain, type, protocol);
-        if (s == -1)
-                return -1;
-#ifdef FD_CLOEXEC
-        /* F_SETFD takes exactly one int argument (the descriptor flags).
-         * Setting close-on-exec is best effort: a failure is deliberately
-         * ignored and the socket is still returned. */
-        (void)fcntl(s, F_SETFD, FD_CLOEXEC);
-#endif
-        return s;
-}
-
-
-/**
- * Start an asynchronous connection attempt to the next address in the
- * broker's resolved address list.
- *
- * The connect itself is asynchronous and its IO is served in the CONNECT
- * state, but the initial name resolve is blocking.
- *
- * On transport setup failure rd_kafka_broker_fail() is called; the failure
- * is logged at debug level when errno equals the broker's previous error
- * to avoid duplicate log messages.
- *
- * Returns -1 on error, else 0.
- */
-static int rd_kafka_broker_connect (rd_kafka_broker_t *rkb) {
-	char errstr[512];
-	const rd_sockaddr_inx_t *addr;
-
-	rd_rkb_dbg(rkb, BROKER, "CONNECT",
-		"broker in state %s connecting",
-		rd_kafka_broker_state_names[rkb->rkb_state]);
-
-	if (rd_kafka_broker_resolve(rkb) == -1)
-		return -1;
-
-	addr = rd_sockaddr_list_next(rkb->rkb_rsal);
-
-	/* There must not be a transport left over from an earlier attempt. */
-	rd_kafka_assert(rkb->rkb_rk, !rkb->rkb_transport);
-
-	rkb->rkb_transport = rd_kafka_transport_connect(rkb, addr,
-							errstr, sizeof(errstr));
-
-	if (rkb->rkb_transport) {
-		rd_kafka_broker_lock(rkb);
-		rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_CONNECT);
-		rd_kafka_broker_unlock(rkb);
-
-		return 0;
-	}
-
-	/* Avoid duplicate log messages */
-	if (rkb->rkb_err.err != errno)
-		rd_kafka_broker_fail(rkb, LOG_ERR,
-				     RD_KAFKA_RESP_ERR__TRANSPORT,
-				     "%s", errstr);
-	else
-		rd_kafka_broker_fail(rkb, LOG_DEBUG,
-				     RD_KAFKA_RESP_ERR__FAIL, NULL);
-
-	return -1;
-}
-
-
-/**
- * @brief Move the connection into the fully functional UP state.
- *
- * Raises the in-flight limit to the configured max_inflight, clears the
- * broker's stored error, sets the UP state and then asks for metadata:
- * first for the locally known topics and, if that reports that there are
- * none, for the broker list only.
- *
- * @locality Broker thread
- */
-void rd_kafka_broker_connect_up (rd_kafka_broker_t *rkb) {
-        rd_kafka_t *rk = rkb->rkb_rk;
-
-        rkb->rkb_err.err = 0;
-        rkb->rkb_max_inflight = rk->rk_conf.max_inflight;
-
-        rd_kafka_broker_lock(rkb);
-        rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_UP);
-        rd_kafka_broker_unlock(rkb);
-
-        /* Request metadata (async) for the known topics; anything other
-         * than "no known topics" needs no further action here. */
-        if (rd_kafka_metadata_refresh_known_topics(NULL, rkb,
-                                                   0/*dont force*/,
-                                                   "connected") !=
-            RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
-                return;
-
-        /* No known topics: get just the broker list. */
-        rd_kafka_metadata_refresh_brokers(NULL, rkb, "connected");
-}
-
-
-
-static void rd_kafka_broker_connect_auth (rd_kafka_broker_t *rkb);
-
-
-/**
- * @brief Parses and handles the SaslHandshake response and transitions
- *        the broker state.
- *
- * The response carries an error code followed by the list of SASL
- * mechanisms the broker supports. The mechanisms are joined into a CSV
- * string for logging. On success control returns to
- * rd_kafka_broker_connect_auth() to start the actual authentication;
- * on any error the broker is failed with an authentication error.
- *
- * The mechanism count comes straight off the wire and is validated
- * before it is used to size the stack buffer for the CSV string.
- */
-static void
-rd_kafka_broker_handle_SaslHandshake (rd_kafka_t *rk,
-				      rd_kafka_broker_t *rkb,
-				      rd_kafka_resp_err_t err,
-				      rd_kafka_buf_t *rkbuf,
-				      rd_kafka_buf_t *request,
-				      void *opaque) {
-        const int log_decode_errors = LOG_ERR;
-	int32_t MechCnt;
-	int16_t ErrorCode;
-	int i = 0;
-	char *mechs = "(n/a)";
-	size_t msz, mof = 0;
-
-	if (err == RD_KAFKA_RESP_ERR__DESTROY)
-		return;
-
-        if (err)
-                goto err;
-
-	rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
-        rd_kafka_buf_read_i32(rkbuf, &MechCnt);
-
-        /* Reject bogus counts: a negative count would turn into a huge
-         * alloca() size and a very large one would overflow the
-         * multiplication below. */
-        if (MechCnt < 0 || MechCnt > 100) {
-                err = RD_KAFKA_RESP_ERR__BAD_MSG;
-                goto err;
-        }
-
-	/* Build a CSV string of supported mechanisms.
-	 * The +1 keeps room for the terminating nul even when the
-	 * broker reports zero mechanisms. */
-	msz = RD_MIN(511, 1 + (MechCnt * 32));
-	mechs = rd_alloca(msz);
-	*mechs = '\0';
-
-	for (i = 0 ; i < MechCnt ; i++) {
-		rd_kafkap_str_t mech;
-		rd_kafka_buf_read_str(rkbuf, &mech);
-
-		mof += rd_snprintf(mechs+mof, msz-mof, "%s%.*s",
-				   i ? ",":"", RD_KAFKAP_STR_PR(&mech));
-
-		/* Output was truncated: stop before msz-mof wraps around. */
-		if (mof >= msz)
-			break;
-        }
-
-	rd_rkb_dbg(rkb,
-		   PROTOCOL | RD_KAFKA_DBG_SECURITY | RD_KAFKA_DBG_BROKER,
-		   "SASLMECHS", "Broker supported SASL mechanisms: %s",
-		   mechs);
-
-	if (ErrorCode) {
-		err = ErrorCode;
-		goto err;
-	}
-
-	/* Circle back to connect_auth() to start proper AUTH state. */
-	rd_kafka_broker_connect_auth(rkb);
-	return;
-
- err_parse:
-        err = rkbuf->rkbuf_err;
- err:
-	rd_kafka_broker_fail(rkb, LOG_ERR,
-			     RD_KAFKA_RESP_ERR__AUTHENTICATION,
-			     "SASL %s mechanism handshake failed: %s: "
-			     "broker's supported mechanisms: %s",
-                             rkb->rkb_rk->rk_conf.sasl.mechanisms,
-			     rd_kafka_err2str(err), mechs);
-}
-
-
-/**
- * @brief Decide the next connection state after the API versions are known:
- *        - UP (if the broker protocol is not a SASL protocol)
- *        - AUTH_HANDSHAKE (if SASL is used, the broker supports handshakes
- *                and the handshake has not been started yet)
- *        - AUTH (if SASL is used and the handshake is either not supported
- *                or has already taken place)
- */
-static void rd_kafka_broker_connect_auth (rd_kafka_broker_t *rkb) {
-	char sasl_errstr[512];
-	int handshake_supported;
-
-	if (rkb->rkb_proto != RD_KAFKA_PROTO_SASL_PLAINTEXT &&
-	    rkb->rkb_proto != RD_KAFKA_PROTO_SASL_SSL) {
-		/* No authentication required. */
-		rd_kafka_broker_connect_up(rkb);
-		return;
-	}
-
-	handshake_supported =
-		!!(rkb->rkb_features & RD_KAFKA_FEATURE_SASL_HANDSHAKE);
-
-	rd_rkb_dbg(rkb, SECURITY | RD_KAFKA_DBG_BROKER, "AUTH",
-		   "Auth in state %s (handshake %ssupported)",
-		   rd_kafka_broker_state_names[rkb->rkb_state],
-		   handshake_supported ? "" : "not ");
-
-	/* Broker >= 0.10.0: send request to select mechanism */
-	if (rkb->rkb_state != RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE &&
-	    handshake_supported) {
-		rd_kafka_broker_lock(rkb);
-		rd_kafka_broker_set_state(
-			rkb, RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE);
-		rd_kafka_broker_unlock(rkb);
-
-		rd_kafka_SaslHandshakeRequest(
-			rkb, rkb->rkb_rk->rk_conf.sasl.mechanisms,
-			RD_KAFKA_NO_REPLYQ,
-			rd_kafka_broker_handle_SaslHandshake,
-			NULL, 1 /* flash */);
-		return;
-	}
-
-	/* Either Handshake succeeded (protocol selected)
-	 * or Handshakes were not supported.
-	 * In both cases continue with authentication. */
-	rd_kafka_broker_lock(rkb);
-	rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_AUTH);
-	rd_kafka_broker_unlock(rkb);
-
-	if (rd_kafka_sasl_client_new(rkb->rkb_transport, sasl_errstr,
-				     sizeof(sasl_errstr)) == -1) {
-		errno = EINVAL;
-		rd_kafka_broker_fail(
-			rkb, LOG_ERR,
-			RD_KAFKA_RESP_ERR__AUTHENTICATION,
-			"Failed to initialize "
-			"SASL authentication: %s",
-			sasl_errstr);
-		return;
-	}
-
-	/* Enter non-Kafka-protocol-framed SASL communication
-	 * state handled in rdkafka_sasl.c
-	 * NOTE(review): the state was already set to AUTH above; this
-	 * second transition looks redundant - confirm before removing. */
-	rd_kafka_broker_lock(rkb);
-	rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_AUTH);
-	rd_kafka_broker_unlock(rkb);
-}
-
-
-/**
- * @brief Specify API versions to use for this connection.
- *
- * Any previously stored list is freed, the new list is stored on the
- * broker and the broker's feature set is recomputed from it, all while
- * holding the broker lock.
- *
- * @param apis is an allocated list of supported API versions.
- *        If NULL the default set will be used based on the
- *        \p broker.version.fallback property.
- * @param api_cnt number of elements in \p apis
- *
- * @remark \p rkb takes ownership of \p apis.
- *
- * @locality Broker thread
- * @locks none
- */
-static void rd_kafka_broker_set_api_versions (rd_kafka_broker_t *rkb,
-					      struct rd_kafka_ApiVersion *apis,
-					      size_t api_cnt) {
-        rd_kafka_t *rk = rkb->rkb_rk;
-
-        rd_kafka_broker_lock(rkb);
-
-        /* Drop the list from any earlier connection. */
-        if (rkb->rkb_ApiVersions)
-                rd_free(rkb->rkb_ApiVersions);
-
-        if (apis == NULL) {
-                rd_rkb_dbg(rkb, PROTOCOL | RD_KAFKA_DBG_BROKER, "APIVERSION",
-                           "Using (configuration fallback) %s protocol features",
-                           rk->rk_conf.broker_version_fallback);
-
-                rd_kafka_get_legacy_ApiVersions(
-                        rk->rk_conf.broker_version_fallback,
-                        &apis, &api_cnt,
-                        rk->rk_conf.broker_version_fallback);
-
-                /* Make a copy to store on broker. */
-                rd_kafka_ApiVersions_copy(apis, api_cnt, &apis, &api_cnt);
-        }
-
-        rkb->rkb_ApiVersions_cnt = api_cnt;
-        rkb->rkb_ApiVersions = apis;
-
-        /* Update feature set based on supported broker APIs. */
-        rd_kafka_broker_features_set(
-                rkb, rd_kafka_features_check(rkb, apis, api_cnt));
-
-        rd_kafka_broker_unlock(rkb);
-}
-
-
-/**
- * Handler for ApiVersion response.
- *
- * On success the parsed API version list is stored on the broker and the
- * connection proceeds to the authentication step. On failure the broker
- * is failed (logged at debug level).
- * Nothing is done when the request is being destroyed.
- */
-static void
-rd_kafka_broker_handle_ApiVersion (rd_kafka_t *rk,
-				   rd_kafka_broker_t *rkb,
-				   rd_kafka_resp_err_t err,
-				   rd_kafka_buf_t *rkbuf,
-				   rd_kafka_buf_t *request, void *opaque) {
-	size_t api_cnt;
-	struct rd_kafka_ApiVersion *apis;
-
-	if (err == RD_KAFKA_RESP_ERR__DESTROY)
-		return;
-
-	err = rd_kafka_handle_ApiVersion(rk, rkb, err, rkbuf, request,
-					 &apis, &api_cnt);
-
-	if (!err) {
-		/* The broker takes ownership of the list. */
-		rd_kafka_broker_set_api_versions(rkb, apis, api_cnt);
-		rd_kafka_broker_connect_auth(rkb);
-		return;
-	}
-
-	rd_kafka_broker_fail(rkb, LOG_DEBUG,
-			     RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED,
-			     "ApiVersionRequest failed: %s: "
-			     "probably due to old broker version",
-			     rd_kafka_err2str(err));
-}
-
-
-/**
- * Call when an asynchronous connection attempt completes, either
- * successfully (errstr is NULL) or with a failure (errstr describes it).
- *
- * On success the connection id is bumped, the in-flight limit is held at
- * one request, and the broker either queries the supported API versions
- * (ApiVersionRequest) or falls back to the configured
- * broker.version.fallback and proceeds to authentication.
- *
- * Locality: broker thread
- */
-void rd_kafka_broker_connect_done (rd_kafka_broker_t *rkb, const char *errstr) {
-
-	if (errstr) {
-		/* Connect failed: only log at error level when this is
-		 * not a repeat of the previously stored error. */
-		int level;
-
-		if (errno != 0 && rkb->rkb_err.err == errno)
-			level = LOG_DEBUG;
-		else
-			level = LOG_ERR;
-
-		rd_kafka_broker_fail(rkb, level,
-				     RD_KAFKA_RESP_ERR__TRANSPORT,
-				     "%s", errstr);
-		return;
-	}
-
-	/* Connect succeeded */
-	rkb->rkb_connid++;
-	rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_PROTOCOL,
-		   "CONNECTED", "Connected (#%d)", rkb->rkb_connid);
-	rkb->rkb_err.err = 0;
-	/* Hold back other requests until ApiVersion, SaslHandshake, etc
-	 * are done. */
-	rkb->rkb_max_inflight = 1;
-
-	rd_kafka_transport_poll_set(rkb->rkb_transport, POLLIN);
-
-	/* Use ApiVersion to query broker for supported API versions. */
-	if (rkb->rkb_rk->rk_conf.api_version_request &&
-	    rd_interval_immediate(&rkb->rkb_ApiVersion_fail_intvl, 0, 0) > 0)
-		rd_kafka_broker_feature_enable(rkb,
-					       RD_KAFKA_FEATURE_APIVERSION);
-
-	if (!(rkb->rkb_features & RD_KAFKA_FEATURE_APIVERSION)) {
-		/* Use configured broker.version.fallback to
-		 * figure out API versions */
-		rd_kafka_broker_set_api_versions(rkb, NULL, 0);
-
-		/* Authenticate if necessary */
-		rd_kafka_broker_connect_auth(rkb);
-		return;
-	}
-
-	/* Query broker for supported API versions.
-	 * This may fail with a disconnect on non-supporting brokers
-	 * so hold off any other requests until we get a response,
-	 * and if the connection is torn down we disable this feature. */
-	rd_kafka_broker_lock(rkb);
-	rd_kafka_broker_set_state(rkb,RD_KAFKA_BROKER_STATE_APIVERSION_QUERY);
-	rd_kafka_broker_unlock(rkb);
-
-	rd_kafka_ApiVersionRequest(
-		rkb, RD_KAFKA_NO_REPLYQ,
-		rd_kafka_broker_handle_ApiVersion, NULL,
-		1 /*Flash message: prepend to transmit queue*/);
-}
-
-
-
-/**
- * @brief Checks if the given API request+version is supported by the broker.
- *
- * ApiVersion requests are always allowed. A request that carries feature
- * flags is supported when the broker has all of those features. Any other
- * request is looked up by ApiKey in the broker's ApiVersion list and is
- * supported when its ApiVersion lies within [MinVer, MaxVer].
- *
- * @returns 1 if supported, else 0.
- * @locality broker thread
- * @locks none
- */
-static RD_INLINE int
-rd_kafka_broker_request_supported (rd_kafka_broker_t *rkb,
-                                   rd_kafka_buf_t *rkbuf) {
-        struct rd_kafka_ApiVersion key = {
-                .ApiKey = rkbuf->rkbuf_reqhdr.ApiKey
-        };
-        const struct rd_kafka_ApiVersion *match;
-
-        /* ApiVersion requests are used to detect the supported API
-         * versions, so should always be allowed through. */
-        if (unlikely(rkbuf->rkbuf_reqhdr.ApiKey == RD_KAFKAP_ApiVersion))
-                return 1;
-
-        /* First try feature flags, if any, which may cover a larger
-         * set of APIs. */
-        if (rkbuf->rkbuf_features) {
-                if ((rkb->rkb_features & rkbuf->rkbuf_features) ==
-                    rkbuf->rkbuf_features)
-                        return 1;
-                return 0;
-        }
-
-        /* Then try the ApiVersion map. */
-        match = bsearch(&key, rkb->rkb_ApiVersions, rkb->rkb_ApiVersions_cnt,
-                        sizeof(*rkb->rkb_ApiVersions),
-                        rd_kafka_ApiVersion_key_cmp);
-        if (match == NULL)
-                return 0;
-
-        if (rkbuf->rkbuf_reqhdr.ApiVersion < match->MinVer)
-                return 0;
-
-        return rkbuf->rkbuf_reqhdr.ApiVersion <= match->MaxVer;
-}
-
-
-/**
- * Send queued request buffers (rkb_outbufs) to the broker.
- *
- * Buffers are sent in queue order for as long as the broker state is at
- * least UP, the number of requests awaiting a response is below
- * rkb_max_inflight, and the out queue is non-empty.
- * Requests not supported by the broker are dequeued and failed through
- * their callback with RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE.
- *
- * Returns -1 if rd_kafka_broker_send() failed,
- *          0 if the buffer at the head of the queue was only partially
- *            sent (it stays at the head for the next call),
- *          else the number of buffers completely sent by this call.
- *
- * Locality: broker thread (asserted)
- */
-int rd_kafka_send (rd_kafka_broker_t *rkb) {
-	rd_kafka_buf_t *rkbuf;
-	unsigned int cnt = 0;
-
-	rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
-
-	while (rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP &&
-	       rd_kafka_bufq_cnt(&rkb->rkb_waitresps) < rkb->rkb_max_inflight &&
-	       (rkbuf = TAILQ_FIRST(&rkb->rkb_outbufs.rkbq_bufs))) {
-		ssize_t r;
-                /* Reader offset before this send attempt; non-zero when
-                 * resuming a partial send. */
-                size_t pre_of = rd_slice_offset(&rkbuf->rkbuf_reader);
-
-                /* Check for broker support */
-                if (unlikely(!rd_kafka_broker_request_supported(rkb, rkbuf))) {
-                        rd_kafka_bufq_deq(&rkb->rkb_outbufs, rkbuf);
-                        /* NOTE(review): the message says "Response" although
-                         * a request is being failed, and it logs
-                         * rkbuf_reshdr.CorrId rather than rkbuf_corrid -
-                         * confirm whether that is intended. */
-                        rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_PROTOCOL,
-                                   "UNSUPPORTED",
-                                   "Failing %sResponse "
-                                   "(v%hd, %"PRIusz" bytes, CorrId %"PRId32"): "
-                                   "request not supported by broker "
-                                   "(missing api.version.request or "
-                                   "incorrect broker.version.fallback config?)",
-                                   rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.
-                                                       ApiKey),
-                                   rkbuf->rkbuf_reqhdr.ApiVersion,
-                                   rkbuf->rkbuf_totlen,
-                                   rkbuf->rkbuf_reshdr.CorrId);
-                        rd_kafka_buf_callback(
-                                rkb->rkb_rk, rkb,
-                                RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
-                                NULL, rkbuf);
-                        continue;
-                }
-
-		/* Set CorrId header field, unless this is the latter part
-		 * of a partial send in which case the corrid has already
-		 * been set.
-		 * Due to how SSL_write() will accept a buffer but still
-		 * return 0 in some cases we can't rely on the buffer offset
-		 * but need to use corrid to check this. SSL_write() expects
-		 * us to send the same buffer again when 0 is returned.
-		 */
-		if (rkbuf->rkbuf_corrid == 0 ||
-		    rkbuf->rkbuf_connid != rkb->rkb_connid) {
-                        /* A fresh CorrId may only be stamped on a buffer
-                         * that has not been partially sent. */
-                        rd_assert(rd_slice_offset(&rkbuf->rkbuf_reader) == 0);
-			rkbuf->rkbuf_corrid = ++rkb->rkb_corrid;
-			/* 4+2+2 is the byte offset of the CorrId field;
-			 * presumably past Size(4), ApiKey(2) and
-			 * ApiVersion(2) - TODO confirm. */
-			rd_kafka_buf_update_i32(rkbuf, 4+2+2,
-						rkbuf->rkbuf_corrid);
-			rkbuf->rkbuf_connid = rkb->rkb_connid;
-		} else if (pre_of > RD_KAFKAP_REQHDR_SIZE) {
-			rd_kafka_assert(NULL,
-					rkbuf->rkbuf_connid == rkb->rkb_connid);
-                }
-
-		/* Disabled verbose per-send debug logging. */
-		if (0) {
-			rd_rkb_dbg(rkb, PROTOCOL, "SEND",
-				   "Send %s corrid %"PRId32" at "
-				   "offset %"PRIusz"/%"PRIusz,
-				   rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.
-						       ApiKey),
-				   rkbuf->rkbuf_corrid,
-                                   pre_of, rd_slice_size(&rkbuf->rkbuf_reader));
-		}
-
-                if ((r = rd_kafka_broker_send(rkb, &rkbuf->rkbuf_reader)) == -1)
-                        return -1;
-
-                /* Partial send? Continue next time. */
-                if (rd_slice_remains(&rkbuf->rkbuf_reader) > 0) {
-                        rd_rkb_dbg(rkb, PROTOCOL, "SEND",
-                                   "Sent partial %sRequest "
-                                   "(v%hd, "
-                                   "%"PRIdsz"+%"PRIdsz"/%"PRIusz" bytes, "
-                                   "CorrId %"PRId32")",
-                                   rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.
-                                                       ApiKey),
-                                   rkbuf->rkbuf_reqhdr.ApiVersion,
-                                   (ssize_t)pre_of, r,
-                                   rd_slice_size(&rkbuf->rkbuf_reader),
-                                   rkbuf->rkbuf_corrid);
-                        return 0;
-                }
-
-		rd_rkb_dbg(rkb, PROTOCOL, "SEND",
-			   "Sent %sRequest (v%hd, %"PRIusz" bytes @ %"PRIusz", "
-			   "CorrId %"PRId32")",
-			   rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.ApiKey),
-                           rkbuf->rkbuf_reqhdr.ApiVersion,
-                           rd_slice_size(&rkbuf->rkbuf_reader),
-                           pre_of, rkbuf->rkbuf_corrid);
-
-		/* Entire buffer sent, unlink from outbuf */
-		rd_kafka_bufq_deq(&rkb->rkb_outbufs, rkbuf);
-
-		/* Store time for RTT calculation */
-		rkbuf->rkbuf_ts_sent = rd_clock();
-
-                /* Broadcast a state change when the blocking request
-                 * counter reads 1 after the increment. */
-                if (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_BLOCKING &&
-		    rd_atomic32_add(&rkb->rkb_blocking_request_cnt, 1) == 1)
-			rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);
-
-		/* Put buffer on response wait list unless we are not
-		 * expecting a response (required_acks=0). */
-		if (!(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_NO_RESPONSE))
-			rd_kafka_bufq_enq(&rkb->rkb_waitresps, rkbuf);
-		else { /* Call buffer callback for delivery report. */
-                        rd_kafka_buf_callback(rkb->rkb_rk, rkb, 0, NULL, rkbuf);
-                }
-
-		cnt++;
-	}
-
-	return cnt;
-}
-
-
-/**
- * Schedule 'rkbuf' for retransmission on broker 'rkb'.
- *
- * When called from a thread other than the broker's own thread the buffer
- * is forwarded to the broker thread in an XMIT_RETRY op. On the broker
- * thread the buffer's retry time is set to now + retry.backoff.ms, its
- * send offset and CorrId are reset, and it is put on the retry queue.
- */
-void rd_kafka_broker_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {
-        rd_kafka_t *rk = rkb->rkb_rk;
-
-        /* Restore original replyq since replyq.q will have been NULLed
-         * by buf_callback()/replyq_enq(). */
-        if (rkbuf->rkbuf_orig_replyq.q && !rkbuf->rkbuf_replyq.q) {
-                rkbuf->rkbuf_replyq = rkbuf->rkbuf_orig_replyq;
-                rd_kafka_replyq_clear(&rkbuf->rkbuf_orig_replyq);
-        }
-
-        /* Foreign thread: hand the buffer over through the broker's
-         * op queue and let the broker thread do the rest. */
-        if (!thrd_is_current(rkb->rkb_thread)) {
-                rd_kafka_op_t *retry_op =
-                        rd_kafka_op_new(RD_KAFKA_OP_XMIT_RETRY);
-
-                retry_op->rko_u.xbuf.rkbuf = rkbuf;
-                rd_kafka_q_enq(rkb->rkb_ops, retry_op);
-                return;
-        }
-
-        rd_rkb_dbg(rkb, PROTOCOL, "RETRY",
-                   "Retrying %sRequest (v%hd, %"PRIusz" bytes, retry %d/%d)",
-                   rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.ApiKey),
-                   rkbuf->rkbuf_reqhdr.ApiVersion,
-                   rd_slice_size(&rkbuf->rkbuf_reader),
-                   rkbuf->rkbuf_retries, rk->rk_conf.max_retries);
-
-        rd_atomic64_add(&rkb->rkb_c.tx_retries, 1);
-
-        /* Earliest time (in microseconds) the buffer may be re-sent. */
-        rkbuf->rkbuf_ts_retry = rd_clock() +
-                (rk->rk_conf.retry_backoff_ms * 1000);
-
-        /* A CorrId of 0 makes rd_kafka_send() assign a fresh one. */
-        rkbuf->rkbuf_corrid = 0;
-
-        /* Reset send offset */
-        rd_slice_seek(&rkbuf->rkbuf_reader, 0);
-
-        rd_kafka_bufq_enq(&rkb->rkb_retrybufs, rkbuf);
-}
-
-
-/**
- * Move buffers whose retry backoff time has passed from the head of the
- * retry queue to the tail of the out queue.
- * The scan stops at the first buffer that is not yet due.
- */
-static void rd_kafka_broker_retry_bufs_move (rd_kafka_broker_t *rkb) {
-	const rd_ts_t now = rd_clock();
-	rd_kafka_buf_t *head;
-
-	for (head = TAILQ_FIRST(&rkb->rkb_retrybufs.rkbq_bufs) ;
-	     head && head->rkbuf_ts_retry <= now ;
-	     head = TAILQ_FIRST(&rkb->rkb_retrybufs.rkbq_bufs)) {
-		rd_kafka_bufq_deq(&rkb->rkb_retrybufs, head);
-		rd_kafka_broker_buf_enq0(rkb, head, 0/*tail*/);
-	}
-}
-
-
-/**
- * Propagate a delivery report for every message on 'rkmq'.
- *
- * The on_acknowledgement interceptors are always invoked. If delivery
- * report events are enabled (and, with dr_err_only set, 'err' is
- * non-zero) all messages are moved to a single DR op enqueued on the
- * client's reply queue; otherwise the messages are purged right away.
- * An empty queue is a no-op.
- */
-void rd_kafka_dr_msgq (rd_kafka_itopic_t *rkt,
-		       rd_kafka_msgq_t *rkmq, rd_kafka_resp_err_t err) {
-        rd_kafka_t *rk = rkt->rkt_rk;
-        rd_kafka_op_t *dr_op;
-        int wants_report;
-
-        if (unlikely(rd_kafka_msgq_len(rkmq) == 0))
-                return;
-
-        /* Call on_acknowledgement() interceptors */
-        rd_kafka_interceptors_on_acknowledgement_queue(rk, rkmq);
-
-        wants_report = (rk->rk_conf.enabled_events & RD_KAFKA_EVENT_DR) &&
-                (!rk->rk_conf.dr_err_only || err);
-
-        if (!wants_report) {
-                /* No delivery report callback:
-                 * destroy the messages right away. */
-                rd_kafka_msgq_purge(rk, rkmq);
-                return;
-        }
-
-        /* Pass all messages to application thread in one op. */
-        dr_op = rd_kafka_op_new(RD_KAFKA_OP_DR);
-        dr_op->rko_err = err;
-        dr_op->rko_u.dr.s_rkt = rd_kafka_topic_keep(rkt);
-        rd_kafka_msgq_init(&dr_op->rko_u.dr.msgq);
-
-        /* Move all messages to op's msgq */
-        rd_kafka_msgq_move(&dr_op->rko_u.dr.msgq, rkmq);
-
-        rd_kafka_q_enq(rk->rk_rep, dr_op);
-}
-
-
-
-
-
-
-
-
-
-
-
-/**
- * @brief Walk all known topics and delegate to this broker the partitions
- *        whose leader id equals the broker's node id.
- *
- * Does nothing while the broker's node id is still unknown (-1).
- *
- * @locks none
- * @locality any
- */
-static void rd_kafka_broker_map_partitions (rd_kafka_broker_t *rkb) {
-        rd_kafka_t *rk = rkb->rkb_rk;
-        rd_kafka_itopic_t *topic;
-        int mapped = 0;
-
-        if (rkb->rkb_nodeid == -1)
-                return;
-
-        rd_kafka_rdlock(rk);
-        TAILQ_FOREACH(topic, &rk->rk_topics, rkt_link) {
-                int p;
-
-                rd_kafka_topic_wrlock(topic);
-                for (p = 0 ; p < topic->rkt_partition_cnt ; p++) {
-                        rd_kafka_toppar_t *rktp =
-                                rd_kafka_toppar_s2i(topic->rkt_p[p]);
-
-                        rd_kafka_toppar_lock(rktp);
-
-                        /* Skip partitions led by another broker and those
-                         * that have both a leader and a next leader.
-                         * NOTE(review): the original comment said "only map
-                         * unassigned partitions", which would skip when
-                         * EITHER is set - confirm the intended condition. */
-                        if (rktp->rktp_leader_id != rkb->rkb_nodeid ||
-                            (rktp->rktp_leader && rktp->rktp_next_leader)) {
-                                rd_kafka_toppar_unlock(rktp);
-                                continue;
-                        }
-
-                        rd_kafka_toppar_leader_update(
-                                rktp, rktp->rktp_leader_id, rkb);
-                        mapped++;
-
-                        rd_kafka_toppar_unlock(rktp);
-                }
-                rd_kafka_topic_wrunlock(topic);
-        }
-        rd_kafka_rdunlock(rk);
-
-        rd_rkb_dbg(rkb, TOPIC|RD_KAFKA_DBG_BROKER, "LEADER",
-                   "Mapped %d partition(s) to broker", mapped);
-}
-
-
-/**
- * @brief Broker id comparator
- *
- * Orders brokers by ascending rkb_nodeid.
- *
- * @returns a negative value, zero or a positive value if \p _a sorts
- *          before, equal to or after \p _b.
- */
-static int rd_kafka_broker_cmp_by_id (const void *_a, const void *_b) {
-        const rd_kafka_broker_t *a = _a, *b = _b;
-
-        /* Three-way compare rather than subtraction: the difference of two
-         * signed ids can overflow, which is undefined behaviour. */
-        return (a->rkb_nodeid > b->rkb_nodeid) -
-                (a->rkb_nodeid < b->rkb_nodeid);
-}
-
-
-
-/**
- * @brief Serve a broker op (an op posted by another thread to be handled by
- *        this broker's thread).
- *
- * Dispatches on the op type:
- *  - NODE_UPDATE: apply a changed nodename and/or nodeid and refresh the
- *    broker name and log name. If the nodename changed the broker is
- *    failed with ERR__NODE_UPDATE, else if only the nodeid changed the
- *    partitions are re-mapped and a broker in state UP is moved to
- *    state UPDATE.
- *  - XMIT_BUF / XMIT_RETRY: hand the op's buffer over to the broker's
- *    enqueue / retry function.
- *  - PARTITION_JOIN: add the op's toppar to this broker's toppar list,
- *    unless the instance is terminating, the toppar is flagged for
- *    removal, or this broker is no longer the toppar's next leader
- *    (in which case the op is forwarded to the next leader's op queue).
- *  - PARTITION_LEAVE: remove the toppar from this broker, then either
- *    forward the op as a PARTITION_JOIN to the next leader or, if there
- *    is no next leader, fail the messages in the partition queue.
- *  - TERMINATE: cap the blocking time and ask the op loop to break out.
- *
- * The op is destroyed before returning unless it was forwarded to another
- * broker's op queue or, for XMIT_BUF with a reply queue, is kept for
- * forwarding the response.
- *
- * @returns 0 if calling op loop should break out, else 1 to continue.
- * @locality broker thread
- * @locks none
- */
-static int rd_kafka_broker_op_serve (rd_kafka_broker_t *rkb,
-				      rd_kafka_op_t *rko) {
-        shptr_rd_kafka_toppar_t *s_rktp;
-        rd_kafka_toppar_t *rktp;
-        int ret = 1;
-
-	rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
-
-	switch (rko->rko_type)
-	{
-        case RD_KAFKA_OP_NODE_UPDATE:
-        {
-                enum {
-                        _UPD_NAME = 0x1,
-                        _UPD_ID = 0x2
-                } updated = 0;
-                char brokername[RD_KAFKA_NODENAME_SIZE];
-
-                /* Need kafka_wrlock for updating rk_broker_by_id */
-                rd_kafka_wrlock(rkb->rkb_rk);
-                rd_kafka_broker_lock(rkb);
-
-                if (strcmp(rkb->rkb_nodename,
-                           rko->rko_u.node.nodename)) {
-                        rd_rkb_dbg(rkb, BROKER, "UPDATE",
-                                   "Nodename changed from %s to %s",
-                                   rkb->rkb_nodename,
-                                   rko->rko_u.node.nodename);
-                        strncpy(rkb->rkb_nodename,
-                                rko->rko_u.node.nodename,
-                                sizeof(rkb->rkb_nodename)-1);
-                        updated |= _UPD_NAME;
-                }
-
-                if (rko->rko_u.node.nodeid != -1 &&
-                    rko->rko_u.node.nodeid != rkb->rkb_nodeid) {
-                        int32_t old_nodeid = rkb->rkb_nodeid;
-                        rd_rkb_dbg(rkb, BROKER, "UPDATE",
-                                   "NodeId changed from %"PRId32" to %"PRId32,
-                                   rkb->rkb_nodeid,
-                                   rko->rko_u.node.nodeid);
-
-                        rkb->rkb_nodeid = rko->rko_u.node.nodeid;
-
-                        /* Update broker_by_id sorted list:
-                         * a broker that previously had no id (-1) is added
-                         * to the list first, then the list is re-sorted. */
-                        if (old_nodeid == -1)
-                                rd_list_add(&rkb->rkb_rk->rk_broker_by_id, rkb);
-                        rd_list_sort(&rkb->rkb_rk->rk_broker_by_id,
-                                     rd_kafka_broker_cmp_by_id);
-
-                        updated |= _UPD_ID;
-                }
-
-                rd_kafka_mk_brokername(brokername, sizeof(brokername),
-                                       rkb->rkb_proto,
-				       rkb->rkb_nodename, rkb->rkb_nodeid,
-				       RD_KAFKA_LEARNED);
-                if (strcmp(rkb->rkb_name, brokername)) {
-                        /* Update the name copy used for logging. */
-                        mtx_lock(&rkb->rkb_logname_lock);
-                        rd_free(rkb->rkb_logname);
-                        rkb->rkb_logname = rd_strdup(brokername);
-                        mtx_unlock(&rkb->rkb_logname_lock);
-
-                        rd_rkb_dbg(rkb, BROKER, "UPDATE",
-                                   "Name changed from %s to %s",
-                                   rkb->rkb_name, brokername);
-                        strncpy(rkb->rkb_name, brokername,
-                                sizeof(rkb->rkb_name)-1);
-                }
-                rd_kafka_broker_unlock(rkb);
-                rd_kafka_wrunlock(rkb->rkb_rk);
-
-                if (updated & _UPD_NAME)
-                        rd_kafka_broker_fail(rkb, LOG_NOTICE,
-                                             RD_KAFKA_RESP_ERR__NODE_UPDATE,
-                                             "Broker hostname updated");
-                else if (updated & _UPD_ID) {
-                        /* Map existing partitions to this broker. */
-                        rd_kafka_broker_map_partitions(rkb);
-
-			/* If broker is currently in state up we need
-			 * to trigger a state change so it exits its
-			 * state&type based .._serve() loop. */
-                        rd_kafka_broker_lock(rkb);
-			if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_UP)
-				rd_kafka_broker_set_state(
-					rkb, RD_KAFKA_BROKER_STATE_UPDATE);
-                        rd_kafka_broker_unlock(rkb);
-                }
-                break;
-        }
-
-        case RD_KAFKA_OP_XMIT_BUF:
-                rd_kafka_broker_buf_enq2(rkb, rko->rko_u.xbuf.rkbuf);
-                rko->rko_u.xbuf.rkbuf = NULL; /* buffer now owned by broker */
-                if (rko->rko_replyq.q) {
-                        /* Op will be reused for forwarding response. */
-                        rko = NULL;
-                }
-                break;
-
-        case RD_KAFKA_OP_XMIT_RETRY:
-                rd_kafka_broker_buf_retry(rkb, rko->rko_u.xbuf.rkbuf);
-                rko->rko_u.xbuf.rkbuf = NULL;
-                break;
-
-        case RD_KAFKA_OP_PARTITION_JOIN:
-                /*
-		 * Add partition to broker toppars
-		 */
-                rktp = rd_kafka_toppar_s2i(rko->rko_rktp);
-                rd_kafka_toppar_lock(rktp);
-
-                /* Abort join if instance is terminating */
-                if (rd_kafka_terminating(rkb->rkb_rk) ||
-		    (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE)) {
-                        rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_TOPIC, "TOPBRK",
-                                   "Topic %s [%"PRId32"]: not joining broker: "
-                                   "%s",
-                                   rktp->rktp_rkt->rkt_topic->str,
-                                   rktp->rktp_partition,
-				   rd_kafka_terminating(rkb->rkb_rk) ?
-				   "instance is terminating" :
-				   "partition removed");
-
-                        rd_kafka_broker_destroy(rktp->rktp_next_leader);
-                        rktp->rktp_next_leader = NULL;
-                        rd_kafka_toppar_unlock(rktp);
-                        break;
-                }
-
-                /* See if we are still the next leader */
-                if (rktp->rktp_next_leader != rkb) {
-                        rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_TOPIC, "TOPBRK",
-                                   "Topic %s [%"PRId32"]: not joining broker "
-                                   "(next leader %s)",
-                                   rktp->rktp_rkt->rkt_topic->str,
-                                   rktp->rktp_partition,
-                                   rktp->rktp_next_leader ?
-                                   rd_kafka_broker_name(rktp->rktp_next_leader):
-                                   "(none)");
-
-                        /* Need temporary refcount so we can safely unlock
-                         * after q_enq(). */
-                        s_rktp = rd_kafka_toppar_keep(rktp);
-
-                        /* No, forward this op to the new next leader.
-                         * NOTE(review): the debug line above allows for a
-                         * NULL next leader but it is dereferenced here
-                         * unconditionally - confirm it can not be NULL. */
-                        rd_kafka_q_enq(rktp->rktp_next_leader->rkb_ops, rko);
-                        rko = NULL;
-
-                        rd_kafka_toppar_unlock(rktp);
-                        rd_kafka_toppar_destroy(s_rktp);
-
-                        break;
-                }
-
-                rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_TOPIC, "TOPBRK",
-                           "Topic %s [%"PRId32"]: joining broker (rktp %p)",
-                           rktp->rktp_rkt->rkt_topic->str,
-                           rktp->rktp_partition, rktp);
-
-                rd_kafka_assert(NULL, rktp->rktp_s_for_rkb == NULL);
-		rktp->rktp_s_for_rkb = rd_kafka_toppar_keep(rktp);
-                rd_kafka_broker_lock(rkb);
-		TAILQ_INSERT_TAIL(&rkb->rkb_toppars, rktp, rktp_rkblink);
-		rkb->rkb_toppar_cnt++;
-                rd_kafka_broker_unlock(rkb);
-		rktp->rktp_leader = rkb;
-                rktp->rktp_msgq_wakeup_fd = rkb->rkb_toppar_wakeup_fd;
-                rd_kafka_broker_keep(rkb);
-
-                rd_kafka_broker_destroy(rktp->rktp_next_leader);
-                rktp->rktp_next_leader = NULL;
-
-                rd_kafka_toppar_unlock(rktp);
-
-		rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);
-                break;
-
-        case RD_KAFKA_OP_PARTITION_LEAVE:
-                /*
-		 * Remove partition from broker toppars
-		 */
-                rktp = rd_kafka_toppar_s2i(rko->rko_rktp);
-
-		rd_kafka_toppar_lock(rktp);
-
-		/* Multiple PARTITION_LEAVEs are possible during partition
-		 * migration, make sure we're supposed to handle this one. */
-		if (unlikely(rktp->rktp_leader != rkb)) {
-			rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_TOPIC, "TOPBRK",
-				   "Topic %s [%"PRId32"]: "
-				   "ignoring PARTITION_LEAVE: "
-				   "broker is not leader (%s)",
-				   rktp->rktp_rkt->rkt_topic->str,
-				   rktp->rktp_partition,
-				   rktp->rktp_leader ?
-				   rd_kafka_broker_name(rktp->rktp_leader) :
-				   "none");
-			rd_kafka_toppar_unlock(rktp);
-			break;
-		}
-		rd_kafka_toppar_unlock(rktp);
-
-		/* Remove from fetcher list (called with the toppar lock
-		 * released, the lock is re-acquired right after). */
-		rd_kafka_toppar_fetch_decide(rktp, rkb, 1/*force remove*/);
-
-		rd_kafka_toppar_lock(rktp);
-
-		rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_TOPIC, "TOPBRK",
-			   "Topic %s [%"PRId32"]: leaving broker "
-			   "(%d messages in xmitq, next leader %s, rktp %p)",
-			   rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
-			   rd_kafka_msgq_len(&rktp->rktp_xmit_msgq),
-			   rktp->rktp_next_leader ?
-			   rd_kafka_broker_name(rktp->rktp_next_leader) :
-			   "(none)", rktp);
-
-		/* Prepend xmitq(broker-local) messages to the msgq(global).
-		 * There is no msgq_prepend() so we append msgq to xmitq
-		 * and then move the queue altogether back over to msgq. */
-		rd_kafka_msgq_concat(&rktp->rktp_xmit_msgq,
-				     &rktp->rktp_msgq);
-		rd_kafka_msgq_move(&rktp->rktp_msgq, &rktp->rktp_xmit_msgq);
-
-                rd_kafka_broker_lock(rkb);
-		TAILQ_REMOVE(&rkb->rkb_toppars, rktp, rktp_rkblink);
-		rkb->rkb_toppar_cnt--;
-                rd_kafka_broker_unlock(rkb);
-                rd_kafka_broker_destroy(rktp->rktp_leader);
-                rktp->rktp_msgq_wakeup_fd = -1;
-		rktp->rktp_leader = NULL;
-
-                /* Need to hold on to a refcount past q_enq() and
-                 * unlock() below */
-                s_rktp = rktp->rktp_s_for_rkb;
-                rktp->rktp_s_for_rkb = NULL;
-
-                if (rktp->rktp_next_leader) {
-                        /* There is a next leader we need to migrate to:
-                         * reuse this op as a PARTITION_JOIN for it. */
-                        rko->rko_type = RD_KAFKA_OP_PARTITION_JOIN;
-                        rd_kafka_q_enq(rktp->rktp_next_leader->rkb_ops, rko);
-                        rko = NULL;
-                } else {
-			rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_TOPIC, "TOPBRK",
-				   "Topic %s [%"PRId32"]: no next leader, "
-				   "failing %d message(s) in partition queue",
-				   rktp->rktp_rkt->rkt_topic->str,
-				   rktp->rktp_partition,
-				   rd_kafka_msgq_len(&rktp->rktp_msgq));
-			rd_kafka_assert(NULL, rd_kafka_msgq_len(&rktp->rktp_xmit_msgq) == 0);
-			rd_kafka_dr_msgq(rktp->rktp_rkt, &rktp->rktp_msgq,
-					 rd_kafka_terminating(rkb->rkb_rk) ?
-					 RD_KAFKA_RESP_ERR__DESTROY :
-					 RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
-
-		}
-
-                rd_kafka_toppar_unlock(rktp);
-                rd_kafka_toppar_destroy(s_rktp);
-
-		rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);
-                break;
-
-        case RD_KAFKA_OP_TERMINATE:
-                /* Mostly a wake-up: cap the blocking time, log the broker's
-                 * current counters and tell the op loop to break out. */
-                if (rkb->rkb_blocking_max_ms > 1)
-                        rkb->rkb_blocking_max_ms = 1; /* Speed up termination*/
-                rd_rkb_dbg(rkb, BROKER, "TERM",
-                           "Received TERMINATE op in state %s: "
-                           "%d refcnts, %d toppar(s), %d fetch toppar(s), "
-                           "%d outbufs, %d waitresps, %d retrybufs",
-                           rd_kafka_broker_state_names[rkb->rkb_state],
-                           rd_refcnt_get(&rkb->rkb_refcnt),
-                           rkb->rkb_toppar_cnt, rkb->rkb_fetch_toppar_cnt,
-                           (int)rd_kafka_bufq_cnt(&rkb->rkb_outbufs),
-                           (int)rd_kafka_bufq_cnt(&rkb->rkb_waitresps),
-                           (int)rd_kafka_bufq_cnt(&rkb->rkb_retrybufs));
-                ret = 0;
-                break;
-
-        default:
-                rd_kafka_assert(rkb->rkb_rk, !*"unhandled op type");
-                break;
-        }
-
-        if (rko)
-                rd_kafka_op_destroy(rko);
-
-        return ret;
-}
-
-
-/**
- * @brief Serve broker ops and IOs.
- *
- * Runs three steps in order:
- *  1. Pop and serve ops from the broker's op queue until the queue is
- *     empty or an op handler returns 0.
- *  2. If a transport exists, serve its IO events with a timeout bounded
- *     by both \p abs_timeout and rkb_blocking_max_ms, or 0 if the broker
- *     state changed while serving ops.
- *  3. Scan for request timeouts, rate limited by rkb_timeout_scan_intvl.
- *
- * @param abs_timeout Maximum block time (absolute time).
- *
- * @locality broker thread
- * @locks none
- */
-static void rd_kafka_broker_serve (rd_kafka_broker_t *rkb,
-                                   rd_ts_t abs_timeout) {
-        rd_kafka_op_t *rko;
-        rd_ts_t now;
-        int initial_state = rkb->rkb_state;
-        int remains_ms = rd_timeout_remains(abs_timeout);
-
-        /* Serve broker ops.
-         * Only the first pop may block, and only when there is no
-         * transport to serve; once an op has been served the following
-         * pops are non-blocking. */
-        while ((rko = rd_kafka_q_pop(rkb->rkb_ops,
-                                     !rkb->rkb_transport ?
-                                     remains_ms : RD_POLL_NOWAIT,
-                                     0))
-               && rd_kafka_broker_op_serve(rkb, rko))
-                remains_ms = RD_POLL_NOWAIT;
-
-        /* If the broker state changed in op_serve() we minimize
-         * the IO timeout since our caller might want to exit out of
-         * its loop on state change. */
-        if (likely(rkb->rkb_transport != NULL)) {
-                int blocking_max_ms;
-
-                if ((int)rkb->rkb_state != initial_state)
-                        blocking_max_ms = 0;
-                else {
-                        int remains_ms = rd_timeout_remains(abs_timeout); /* shadows outer remains_ms */
-                        if (remains_ms == RD_POLL_INFINITE ||
-                            remains_ms > rkb->rkb_blocking_max_ms)
-                                remains_ms = rkb->rkb_blocking_max_ms;
-                        blocking_max_ms = remains_ms;
-                }
-
-                /* Serve IO events */
-                rd_kafka_transport_io_serve(rkb->rkb_transport,
-                                            blocking_max_ms);
-        }
-
-        /* Scan wait-response queue for timeouts.
-         * Presumably at most once per second (interval 1000000 looks like
-         * microseconds - confirm against rd_interval()). */
-        now = rd_clock();
-        if (rd_interval(&rkb->rkb_timeout_scan_intvl, 1000000, now) > 0)
-                rd_kafka_broker_timeout_scan(rkb, now);
-}
-
-
-/**
- * @brief Serve the toppars assigned to this broker.
- *
- * Calls rd_kafka_broker_consumer_toppar_serve() for each toppar on the
- * broker's toppar list. The list is walked with the _SAFE iterator, so
- * the current element may be unlinked while it is being served.
- *
- * @returns the minimum Fetch backoff time (abs timestamp) for the
- *          partitions to fetch, or RD_TS_MAX if the broker has no toppars.
- *
- * @locality broker thread
- */
-static rd_ts_t rd_kafka_broker_toppars_serve (rd_kafka_broker_t *rkb) {
-        rd_kafka_toppar_t *rktp, *rktp_tmp;
-        rd_ts_t min_backoff = RD_TS_MAX;
-
-        TAILQ_FOREACH_SAFE(rktp, &rkb->rkb_toppars, rktp_rkblink, rktp_tmp) {
-                rd_ts_t backoff;
-
-                /* Serve toppar to update desired rktp state */
-                backoff = rd_kafka_broker_consumer_toppar_serve(rkb, rktp);
-                if (backoff < min_backoff)
-                        min_backoff = backoff;
-        }
-
-        return min_backoff;
-}
-
-
-/**
- * Idle function for unassigned brokers
- *
- * Serves toppars and broker ops/IO until the broker is terminating, the
- * broker state differs from the state on entry, or the timeout expires.
- *
- * The effective timeout is derived from \p timeout_ms as follows:
- *  - if the instance is terminating: 1 ms,
- *  - else if \p timeout_ms is RD_POLL_INFINITE: rkb_blocking_max_ms,
- *  - else: \p timeout_ms.
- * The serve loop is thus always exited after (approximately) that long,
- * regardless of state.
- */
-static void rd_kafka_broker_ua_idle (rd_kafka_broker_t *rkb, int timeout_ms) {
-        int initial_state = rkb->rkb_state;
-        rd_ts_t abs_timeout;
-
-        if (rd_kafka_terminating(rkb->rkb_rk))
-                timeout_ms = 1;
-        else if (timeout_ms == RD_POLL_INFINITE)
-                timeout_ms = rkb->rkb_blocking_max_ms;
-
-        abs_timeout = rd_timeout_init(timeout_ms);
-
-        /* Since ua_idle is used during connection setup
-         * in state ..BROKER_STATE_CONNECT we only run this loop
-         * as long as the state remains the same as the initial, on a state
-         * change - most likely to UP, a correct serve() function
-         * should be used instead. */
-        while (!rd_kafka_broker_terminating(rkb) &&
-               (int)rkb->rkb_state == initial_state &&
-               !rd_timeout_expired(rd_timeout_remains(abs_timeout))) {
-
-                rd_kafka_broker_toppars_serve(rkb);
-                rd_kafka_broker_serve(rkb, abs_timeout);
-        }
-}
-
-
-/**
- * @brief Serve a toppar for producing.
- *
- * Steps:
- *  1. Move any messages on the toppar's msgq onto its xmit_msgq.
- *  2. If \p do_timeout_scan is set, age-scan the xmit_msgq and trigger
- *     delivery reports with ERR__MSG_TIMED_OUT for the timed out messages.
- *  3. If fewer than batch_num_messages messages are queued and the oldest
- *     message has not yet waited buffering_max_ms, return without
- *     producing and lower \p next_wakeup accordingly.
- *  4. Otherwise call rd_kafka_ProduceRequest() repeatedly until it
- *     returns 0 or less.
- *
- * @param rkb             broker to produce to.
- * @param rktp            toppar to serve.
- * @param do_timeout_scan non-zero to scan the xmit queue for timed out
- *                        messages.
- * @param now             current time as sampled by the caller.
- * @param next_wakeup will be updated to when the next wake-up/attempt is
- *                    desired, only lower (sooner) values will be set.
- *
- * Locks: toppar_lock(rktp) MUST be held.
- * Returns the number of messages produced.
- */
-static int rd_kafka_toppar_producer_serve (rd_kafka_broker_t *rkb,
-                                           rd_kafka_toppar_t *rktp,
-                                           int do_timeout_scan,
-                                           rd_ts_t now,
-                                           rd_ts_t *next_wakeup) {
-        int cnt = 0;
-        int r;
-
-        rd_rkb_dbg(rkb, QUEUE, "TOPPAR",
-                   "%.*s [%"PRId32"] %i+%i msgs",
-                   RD_KAFKAP_STR_PR(rktp->rktp_rkt->
-                                    rkt_topic),
-                   rktp->rktp_partition,
-                   rd_atomic32_get(&rktp->rktp_msgq.rkmq_msg_cnt),
-                   rd_atomic32_get(&rktp->rktp_xmit_msgq.
-                                   rkmq_msg_cnt));
-
-        if (rd_atomic32_get(&rktp->rktp_msgq.rkmq_msg_cnt) > 0)
-                rd_kafka_msgq_concat(&rktp->rktp_xmit_msgq, &rktp->rktp_msgq);
-
-        /* Timeout scan */
-        if (unlikely(do_timeout_scan)) {
-                rd_kafka_msgq_t timedout = RD_KAFKA_MSGQ_INITIALIZER(timedout);
-
-                if (rd_kafka_msgq_age_scan(&rktp->rktp_xmit_msgq,
-                                           &timedout, now)) {
-                        /* Trigger delivery report for timed out messages */
-                        rd_kafka_dr_msgq(rktp->rktp_rkt, &timedout,
-                                         RD_KAFKA_RESP_ERR__MSG_TIMED_OUT);
-                }
-        }
-
-        r = rd_atomic32_get(&rktp->rktp_xmit_msgq.rkmq_msg_cnt);
-        if (r == 0)
-                return 0;
-
-        /* Attempt to fill the batch size, but limit
-         * our waiting to queue.buffering.max.ms
-         * and batch.num.messages. */
-        if (r < rkb->rkb_rk->rk_conf.batch_num_messages) {
-                rd_kafka_msg_t *rkm_oldest;
-                rd_ts_t wait_max;
-
-                rkm_oldest = TAILQ_FIRST(&rktp->rktp_xmit_msgq.rkmq_msgs);
-                if (unlikely(!rkm_oldest))
-                        return 0;
-
-                /* Calculate maximum wait-time to
-                 * honour queue.buffering.max.ms contract.
-                 * (The *1000 presumably converts ms to the microsecond
-                 *  resolution of rd_ts_t - confirm.) */
-                wait_max = rd_kafka_msg_enq_time(rkm_oldest) +
-                        (rkb->rkb_rk->rk_conf.buffering_max_ms * 1000);
-                if (wait_max > now) {
-                        if (wait_max < *next_wakeup)
-                                *next_wakeup = wait_max;
-                        /* Wait for more messages or queue.buffering.max.ms
-                         * to expire. */
-                        return 0;
-                }
-        }
-
-        /* Send Produce requests for this toppar */
-        while (1) {
-                r = rd_kafka_ProduceRequest(rkb, rktp);
-                if (likely(r > 0))
-                        cnt += r;
-                else
-                        break;
-        }
-
-        return cnt;
-}
-
-
-/**
- * Producer serving
- *
- * Loops for as long as the broker is not terminating and remains in
- * state UP. The broker lock is held only while the loop condition is
- * evaluated; it is released for the body of each iteration and released
- * again before returning.
- *
- * Each iteration:
- *  - serves every toppar on the broker's list that has this broker as
- *    leader and is not paused, repeating the pass until a full pass
- *    produces no messages,
- *  - moves retry buffers if there are any,
- *  - sets rkb_blocking_max_ms from the time left until next_wakeup and
- *    serves broker ops and IO until then.
- *
- * NOTE(review): the timeout-scan check here uses rd_interval() >= 0 while
- * rd_kafka_broker_serve() uses > 0 - confirm this difference is intended.
- *
- * Must be called from the broker's own thread (asserted).
- */
-static void rd_kafka_broker_producer_serve (rd_kafka_broker_t *rkb) {
-        rd_interval_t timeout_scan;
-
-        rd_interval_init(&timeout_scan);
-
-        rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
-
-	rd_kafka_broker_lock(rkb);
-
-	while (!rd_kafka_broker_terminating(rkb) &&
-	       rkb->rkb_state == RD_KAFKA_BROKER_STATE_UP) {
-		rd_kafka_toppar_t *rktp;
-		int cnt;
-		rd_ts_t now;
-                rd_ts_t next_wakeup;
-                int do_timeout_scan = 0;
-
-		rd_kafka_broker_unlock(rkb);
-
-		now = rd_clock();
-                next_wakeup = now + (rkb->rkb_rk->rk_conf.
-                                     socket_blocking_max_ms * 1000);
-
-                if (rd_interval(&timeout_scan, 1000*1000, now) >= 0)
-                        do_timeout_scan = 1;
-
-		do {
-			cnt = 0;
-
-                        /* Serve each toppar */
-			TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) {
-                                /* Take the toppar lock, it is required by
-                                 * rd_kafka_toppar_producer_serve(). */
-                                rd_kafka_toppar_lock(rktp);
-                                if (unlikely(rktp->rktp_leader != rkb)) {
-                                        /* Currently migrating away from this
-                                         * broker. */
-                                        rd_kafka_toppar_unlock(rktp);
-                                        continue;
-                                }
-				if (unlikely(RD_KAFKA_TOPPAR_IS_PAUSED(rktp))) {
-					/* Partition is paused */
-					rd_kafka_toppar_unlock(rktp);
-					continue;
-				}
-                                /* Try producing toppar */
-                                cnt += rd_kafka_toppar_producer_serve(
-                                        rkb, rktp, do_timeout_scan, now,
-                                        &next_wakeup);
-
-                                rd_kafka_toppar_unlock(rktp);
-			}
-
-		} while (cnt);
-
-		/* Check and move retry buffers */
-		if (unlikely(rd_atomic32_get(&rkb->rkb_retrybufs.rkbq_cnt) > 0))
-			rd_kafka_broker_retry_bufs_move(rkb);
-
-                rkb->rkb_blocking_max_ms = (int)
-                        (next_wakeup > now ? (next_wakeup - now) / 1000 : 0);
-		rd_kafka_broker_serve(rkb, next_wakeup);
-
-		rd_kafka_broker_lock(rkb);
-	}
-
-	rd_kafka_broker_unlock(rkb);
-}
-
-
-
-
-
-
-
-/**
- * Backoff the next Fetch request (due to error).
- *
- * Sets the broker's rkb_ts_fetch_backoff to the current clock plus the
- * configured fetch_error_backoff_ms (scaled by 1000, presumably ms to
- * microseconds - confirm) and logs the backoff.
- *
- * \p err is used for the debug log message only.
- */
-static void rd_kafka_broker_fetch_backoff (rd_kafka_broker_t *rkb,
-                                           rd_kafka_resp_err_t err) {
-        int backoff_ms = rkb->rkb_rk->rk_conf.fetch_error_backoff_ms;
-        rkb->rkb_ts_fetch_backoff = rd_clock() + (backoff_ms * 1000);
-        rd_rkb_dbg(rkb, FETCH, "BACKOFF",
-                   "Fetch backoff for %dms: %s",
-                   backoff_ms, rd_kafka_err2str(err));
-}
-
-/**
- * @brief Backoff the next Fetch for specific partition
- *
- * Same as rd_kafka_broker_fetch_backoff() but the backoff timestamp is
- * stored on the toppar (rktp_ts_fetch_backoff) rather than on the broker.
- * \p rkb supplies the configuration and the log context, \p err is used
- * for the debug log message only.
- */
-static void rd_kafka_toppar_fetch_backoff (rd_kafka_broker_t *rkb,
-                                           rd_kafka_toppar_t *rktp,
-                                           rd_kafka_resp_err_t err) {
-        int backoff_ms = rkb->rkb_rk->rk_conf.fetch_error_backoff_ms;
-        rktp->rktp_ts_fetch_backoff = rd_clock() + (backoff_ms * 1000);
-        rd_rkb_dbg(rkb, FETCH, "BACKOFF",
-                   "%s [%"PRId32"]: Fetch backoff for %dms: %s",
-                   rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
-                   backoff_ms, rd_kafka_err2str(err));
-}
-
-
-/**
- * Parses and handles a Fetch reply.
- * Returns 0 on success or an error code on failure.
- */
-static rd_kafka_resp_err_t
-rd_kafka_fetch_reply_handle (rd_kafka_broker_t *rkb,
-			     rd_kafka_buf_t *rkbuf, rd_kafka_buf_t *request) {
-	int32_t TopicArrayCnt;
-	int i;
-        const int log_decode_errors = LOG_ERR;
-        shptr_rd_kafka_itopic_t *s_rkt = NULL;
-
-	if (rd_kafka_buf_ApiVersion(request) >= 1) {
-		int32_t Throttle_Time;
-		rd_kafka_buf_read_i32(rkbuf, &Throttle_Time);
-
-		rd_kafka_op_throttle_time(rkb, rkb->rkb_rk->rk_rep,
-					  Throttle_Time);
-	}
-
-	rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
-	/* Verify that TopicArrayCnt seems to be in line with remaining size */
-	rd_kafka_buf_check_len(rkbuf,
-			       TopicArrayCnt * (3/*topic min size*/ +
-						4/*PartitionArrayCnt*/ +
-						4+2+8+4/*inner header*/));
-
-	for (i = 0 ; i < TopicArrayCnt ; i++) {
-		rd_kafkap_str_t topic;
-		int32_t fetch_version;
-		int32_t PartitionArrayCnt;
-		int j;
-
-		rd_kafka_buf_read_str(rkbuf, &topic);
-		rd_kafka_buf_read_i32(rkbuf, &PartitionArrayCnt);
-
-                s_rkt = rd_kafka_topic_find0(rkb->rkb_rk, &topic);
-
-		for (j = 0 ; j < PartitionArrayCnt ; j++) {
-			struct rd_kafka_toppar_ver *tver, tver_skel;
-                        rd_kafka_toppar_t *rktp;
-                        shptr_rd_kafka_toppar_t *s_rktp = NULL;
-                        rd_slice_t save_slice;
-                        struct {
-                                int32_t Partition;
-                                int16_t ErrorCode;
-                                int64_t HighwaterMarkOffset;
-                                int64_t LastStableOffset;       /* v4 */
-                                int32_t MessageSetSize;
-                        } hdr;
-                        rd_kafka_resp_err_t err;
-
-			rd_kafka_buf_read_i32(rkbuf, &hdr.Partition);
-			rd_kafka_buf_read_i16(rkbuf, &hdr.ErrorCode);
-			rd_kafka_buf_read_i64(rkbuf, &hdr.HighwaterMarkOffset);
-
-                        if (rd_kafka_buf_ApiVersion(request) == 4) {
-                                int32_t AbortedTxCnt;
-                                rd_kafka_buf_read_i64(rkbuf,
-                                                      &hdr.LastStableOffset);
-                                rd_kafka_buf_read_i32(rkbuf, &AbortedTxCnt);
-                                /* Ignore aborted transactions for now */
-                                if (AbortedTxCnt > 0)
-                                        rd_kafka_buf_skip(rkbuf,
-                                                          AbortedTxCnt * (8+8));
-                        } else
-                                hdr.LastStableOffset = -1;
-
-			rd_kafka_buf_read_i32(rkbuf, &hdr.MessageSetSize);
-
-                        if (unlikely(hdr.MessageSetSize < 0))
-                                rd_kafka_buf_parse_fail(
-                                        rkbuf,
-                                        "%.*s [%"PRId32"]: "
-                                        "invalid MessageSetSize %"PRId32,
-                                        RD_KAFKAP_STR_PR(&topic),
-                                        hdr.Partition,
-                                        hdr.MessageSetSize);
-
-			/* Look up topic+partition */
-                        if (likely(s_rkt != NULL)) {
-                                rd_kafka_itopic_t *rkt;
-                                rkt = rd_kafka_topic_s2i(s_rkt);
-                                rd_kafka_topic_rdlock(rkt);
-                                s_rktp = rd_kafka_toppar_get(
-                                        rkt, hdr.Partition, 0/*no ua-on-miss*/);
-                                rd_kafka_topic_rdunlock(rkt);
-                        }
-
-			if (unlikely(!s_rkt || !s_rktp)) {
-				rd_rkb_dbg(rkb, TOPIC, "UNKTOPIC",
-					   "Received Fetch response "
-					   "(error %hu) for unknown topic "
-					   "%.*s [%"PRId32"]: ignoring",
-					   hdr.ErrorCode,
-					   RD_KAFKAP_STR_PR(&topic),
-					   hdr.Partition);
-				rd_kafka_buf_skip(rkbuf, hdr.MessageSetSize);
-				continue;
-			}
-
-                        rktp = rd_kafka_toppar_s2i(s_rktp);
-
-                        rd_kafka_toppar_lock(rktp);
-                        /* Make sure toppar hasn't moved to another broker
-                         * during the lifetime of the request. */
-                        if (unlikely(rktp->rktp_leader != rkb)) {
-                                rd_kafka_toppar_unlock(rktp);
-                                rd_rkb_dbg(rkb, MSG, "FETCH",
-

<TRUNCATED>