You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/06/06 14:14:47 UTC

[19/51] [partial] nifi-minifi-cpp git commit: MINIFICPP-512 - upgrade to librdkafka 0.11.4

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_request.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_request.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_request.c
deleted file mode 100644
index 2d023b4..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_request.c
+++ /dev/null
@@ -1,1848 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2015, Magnus Edenhill
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include <stdarg.h>
-
-#include "rdkafka_int.h"
-#include "rdkafka_request.h"
-#include "rdkafka_broker.h"
-#include "rdkafka_offset.h"
-#include "rdkafka_topic.h"
-#include "rdkafka_partition.h"
-#include "rdkafka_metadata.h"
-#include "rdkafka_msgset.h"
-
-#include "rdrand.h"
-
-/**
- * Kafka protocol request and response handling.
- * All of this code runs in the broker thread and uses op queues for
- * propagating results back to the various sub-systems operating in
- * other threads.
- */
-
-
-/**
- * @brief Map an error code to the set of actions the caller should take.
- *
- * The fixed arguments may be followed by caller-specific overrides given
- * as (action(s), error) pairs and terminated by RD_KAFKA_ERR_ACTION_END
- * (scanning stops at the first zero action value).
- * When \p err equals the error of one or more pairs, the OR of those
- * pairs' actions is returned and the default mapping is not consulted.
- *
- * @param rkb      Broker, only used for debug logging (may be NULL).
- * @param err      Error code to classify.
- * @param rkbuf    Response buffer (not used by this function).
- * @param request  Request buffer, only used for debug logging (may be NULL).
- *
- * @returns a bitmask of RD_KAFKA_ERR_ACTION_.. flags; 0 when \p err is
- *          RD_KAFKA_RESP_ERR_NO_ERROR and no override matched.
- */
-int rd_kafka_err_action (rd_kafka_broker_t *rkb,
-                         rd_kafka_resp_err_t err,
-                         rd_kafka_buf_t *rkbuf,
-                         rd_kafka_buf_t *request, ...) {
-        va_list overrides;
-        int matched = 0;
-
-        /* Scan the caller supplied (action, error) override pairs. */
-        va_start(overrides, request);
-        for (;;) {
-                int override_act = va_arg(overrides, int);
-                int override_err;
-
-                if (!override_act)
-                        break; /* End of list */
-
-                override_err = va_arg(overrides, int);
-                if (err == override_err)
-                        matched |= override_act;
-        }
-        va_end(overrides);
-
-        if (err && rkb && request) {
-                rd_rkb_dbg(rkb, BROKER, "REQERR",
-                           "%sRequest failed: %s: explicit actions 0x%x",
-                           rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey),
-                           rd_kafka_err2str(err), matched);
-        }
-
-        /* An explicit override takes precedence over the defaults. */
-        if (matched)
-                return matched;
-
-        /* Default mapping */
-        switch (err)
-        {
-        case RD_KAFKA_RESP_ERR_NO_ERROR:
-                return 0;
-
-        case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE:
-        case RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION:
-        case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE:
-        case RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE:
-        case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE:
-        case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP:
-        case RD_KAFKA_RESP_ERR__WAIT_COORD:
-                /* Metadata information needs to be updated */
-                return RD_KAFKA_ERR_ACTION_REFRESH;
-
-        case RD_KAFKA_RESP_ERR__TIMED_OUT:
-        case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT:
-                /* Broker-side request handling timeout */
-        case RD_KAFKA_RESP_ERR__TRANSPORT:
-                /* Broker connection down */
-                return RD_KAFKA_ERR_ACTION_RETRY;
-
-        case RD_KAFKA_RESP_ERR__DESTROY:
-        case RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT:
-        case RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE:
-        default:
-                return RD_KAFKA_ERR_ACTION_PERMANENT;
-        }
-}
-
-
-/**
- * Build a GroupCoordinatorRequest for consumer group \p cgrp and enqueue
- * it on broker \p rkb. The response is delivered to \p resp_cb (with
- * \p opaque) by way of \p replyq.
- */
-void rd_kafka_GroupCoordinatorRequest (rd_kafka_broker_t *rkb,
-                                       const rd_kafkap_str_t *cgrp,
-                                       rd_kafka_replyq_t replyq,
-                                       rd_kafka_resp_cb_t *resp_cb,
-                                       void *opaque) {
-        /* The request payload consists of the group id only. */
-        rd_kafka_buf_t *req = rd_kafka_buf_new_request(
-                rkb, RD_KAFKAP_GroupCoordinator, 1,
-                RD_KAFKAP_STR_SIZE(cgrp));
-
-        rd_kafka_buf_write_kstr(req, cgrp);
-
-        rd_kafka_broker_buf_enq_replyq(rkb, req, replyq, resp_cb, opaque);
-}
-
-
-
-
-/**
- * @brief Parses and handles Offset replies.
- *
- * Returns the parsed offsets (and errors) in \p offsets
- *
- * If \p err is already set no parsing is done and the error handling
- * runs directly with that error.
- * Otherwise each topic+partition in the response is appended to
- * \p offsets with its per-partition error code and offset. The
- * per-partition layout is selected by the request's ApiVersion:
- * v1 carries Timestamp+Offset, v0 carries an array of offsets.
- *
- * Error handling maps the error to actions through rd_kafka_err_action():
- * REFRESH triggers a forced metadata refresh of known topics and
- * RETRY attempts to retry \p request.
- *
- * @returns RD_KAFKA_RESP_ERR__IN_PROGRESS if the request is being retried,
- *          else the last error code seen: after a successful parse this
- *          is the ErrorCode of the last partition read (0 if no partition
- *          was read or it had no error), otherwise \p err or the
- *          response buffer's parse error.
- */
-rd_kafka_resp_err_t rd_kafka_handle_Offset (rd_kafka_t *rk,
-                                            rd_kafka_broker_t *rkb,
-                                            rd_kafka_resp_err_t err,
-                                            rd_kafka_buf_t *rkbuf,
-                                            rd_kafka_buf_t *request,
-                                            rd_kafka_topic_partition_list_t
-                                            *offsets) {
-
-        const int log_decode_errors = LOG_ERR; /* NOTE(review): not referenced
-                                                * directly in this function;
-                                                * presumably used by the
-                                                * rd_kafka_buf_read_*() macros
-                                                * - confirm. */
-        int16_t ErrorCode = 0;
-        int32_t TopicArrayCnt;
-        int actions;
-        int16_t api_version;
-
-        if (err) {
-                ErrorCode = err;
-                goto err;
-        }
-
-        api_version = request->rkbuf_reqhdr.ApiVersion;
-
-        /* NOTE:
-         * Broker may return offsets in a different constellation than
-         * in the original request. */
-
-        rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
-        while (TopicArrayCnt-- > 0) {
-                rd_kafkap_str_t ktopic;
-                int32_t PartArrayCnt;
-                char *topic_name;
-
-                rd_kafka_buf_read_str(rkbuf, &ktopic);
-                rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt);
-
-                RD_KAFKAP_STR_DUPA(&topic_name, &ktopic);
-
-                while (PartArrayCnt-- > 0) {
-                        int32_t kpartition;
-                        int32_t OffsetArrayCnt;
-                        int64_t Offset = -1; /* Stays -1 if no offset is read */
-                        rd_kafka_topic_partition_t *rktpar;
-
-                        rd_kafka_buf_read_i32(rkbuf, &kpartition);
-                        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
-
-                        if (api_version == 1) {
-                                int64_t Timestamp; /* Read but not used */
-                                rd_kafka_buf_read_i64(rkbuf, &Timestamp);
-                                rd_kafka_buf_read_i64(rkbuf, &Offset);
-                        } else if (api_version == 0) {
-                                rd_kafka_buf_read_i32(rkbuf, &OffsetArrayCnt);
-                                /* We only request one offset so a single
-                                 * element is expected. If the array holds
-                                 * more, every element is read and the last
-                                 * one read is the one kept. */
-                                while (OffsetArrayCnt-- > 0)
-                                        rd_kafka_buf_read_i64(rkbuf, &Offset);
-                        } else {
-                                rd_kafka_assert(NULL, !*"NOTREACHED");
-                        }
-
-                        rktpar = rd_kafka_topic_partition_list_add(
-                                offsets, topic_name, kpartition);
-                        rktpar->err = ErrorCode;
-                        rktpar->offset = Offset;
-                }
-        }
-
-        goto done;
-
- err_parse: /* NOTE(review): no explicit goto targets this label; presumably
-             * the rd_kafka_buf_read_*() macros jump here on decode
-             * failure - confirm. */
-        ErrorCode = rkbuf->rkbuf_err;
- err:
-        actions = rd_kafka_err_action(
-                rkb, ErrorCode, rkbuf, request,
-                RD_KAFKA_ERR_ACTION_PERMANENT,
-                RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
-
-                RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY,
-                RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
-
-                RD_KAFKA_ERR_ACTION_END);
-
-        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
-                char tmp[256];
-                /* Re-query for leader */
-                rd_snprintf(tmp, sizeof(tmp),
-                            "OffsetRequest failed: %s",
-                            rd_kafka_err2str(ErrorCode));
-                rd_kafka_metadata_refresh_known_topics(rk, NULL, 1/*force*/,
-                                                       tmp);
-        }
-
-        if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
-                if (rd_kafka_buf_retry(rkb, request))
-                        return RD_KAFKA_RESP_ERR__IN_PROGRESS;
-                /* FALLTHRU: retry was not possible, return the error */
-        }
-
-done:
-        return ErrorCode;
-}
-
-
-
-
-
-
-/**
- * Send OffsetRequest for the topic+partitions in \p partitions.
- *
- * \p partitions is sorted by topic (in place) so that consecutive
- * partitions of the same topic are grouped into one topic entry.
- * Each partition's \c offset field is written as the Time/Offset field.
- * MaxNumberOfOffsets (always 1) is only written for \p api_version 0.
- */
-void rd_kafka_OffsetRequest (rd_kafka_broker_t *rkb,
-                             rd_kafka_topic_partition_list_t *partitions,
-                             int16_t api_version,
-                             rd_kafka_replyq_t replyq,
-                             rd_kafka_resp_cb_t *resp_cb,
-                             void *opaque) {
-        rd_kafka_buf_t *rkbuf;
-        const char *cur_topic = "";
-        size_t pos_TopicCnt = 0;
-        size_t pos_PartCnt = 0;
-        int32_t ntopics = 0;
-        int32_t nparts = 0;
-        int idx;
-
-        rd_kafka_topic_partition_list_sort_by_topic(partitions);
-
-        /* Initial size:
-         *   ReplicaId(4) + TopicArrayCnt(4) + Topic(100) + PartArrayCnt(4)
-         *   + partition_cnt * (Partition(4) + Time(8) + MaxNumOffs(4)) */
-        rkbuf = rd_kafka_buf_new_request(
-                rkb, RD_KAFKAP_Offset, 1,
-                4 + 4 + 100 + 4 + (partitions->cnt * (4 + 8 + 4)));
-
-        /* ReplicaId */
-        rd_kafka_buf_write_i32(rkbuf, -1);
-        /* TopicArrayCnt: placeholder, back-patched below */
-        pos_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0);
-
-        for (idx = 0 ; idx < partitions->cnt ; idx++) {
-                const rd_kafka_topic_partition_t *rktpar =
-                        &partitions->elems[idx];
-
-                if (strcmp(rktpar->topic, cur_topic) != 0) {
-                        /* Topic changed: back-patch the partition count
-                         * of the previous topic entry, if there is one. */
-                        if (pos_PartCnt > 0)
-                                rd_kafka_buf_update_i32(rkbuf, pos_PartCnt,
-                                                        nparts);
-
-                        /* Topic */
-                        rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
-                        cur_topic = rktpar->topic;
-                        ntopics++;
-
-                        /* PartitionArrayCnt: placeholder for the new topic */
-                        nparts = 0;
-                        pos_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0);
-                }
-
-                /* Partition */
-                rd_kafka_buf_write_i32(rkbuf, rktpar->partition);
-                nparts++;
-
-                /* Time/Offset */
-                rd_kafka_buf_write_i64(rkbuf, rktpar->offset);
-
-                /* MaxNumberOfOffsets (v0 only) */
-                if (api_version == 0) {
-                        rd_kafka_buf_write_i32(rkbuf, 1);
-                }
-        }
-
-        /* Back-patch the final counts if at least one topic was written. */
-        if (pos_PartCnt > 0) {
-                rd_kafka_buf_update_i32(rkbuf, pos_PartCnt, nparts);
-                rd_kafka_buf_update_i32(rkbuf, pos_TopicCnt, ntopics);
-        }
-
-        rd_kafka_buf_ApiVersion_set(rkbuf, api_version,
-                                    api_version == 1 ?
-                                    RD_KAFKA_FEATURE_OFFSET_TIME : 0);
-
-        rd_rkb_dbg(rkb, TOPIC, "OFFSET",
-                   "OffsetRequest (v%hd, opv %d) "
-                   "for %"PRId32" topic(s) and %"PRId32" partition(s)",
-                   api_version, rkbuf->rkbuf_replyq.version,
-                   ntopics, partitions->cnt);
-
-        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-}
-
-
-/**
- * Generic handler for OffsetFetch responses.
- * Offsets for included partitions will be propagated through the passed
- * 'offsets' list.
- *
- * When \p err is not set, the default offset RD_KAFKA_OFFSET_INVALID is
- * first set on \p offsets, then each partition in the response that is
- * also found in \p offsets gets its offset, error and metadata updated.
- * Response partitions not found in \p offsets are ignored.
- * A broker-reported offset of -1 is stored as RD_KAFKA_OFFSET_INVALID.
- *
- * \p update_toppar: update toppar's committed_offset (only done for
- * partitions returned without error and with a locally known toppar).
- *
- * If the error maps to a REFRESH action a coordinator query op is sent
- * to the cgrp and, if \p request is set, a retry is scheduled.
- *
- * @returns \p err, or the response buffer's parse error.
- */
-rd_kafka_resp_err_t
-rd_kafka_handle_OffsetFetch (rd_kafka_t *rk,
-			     rd_kafka_broker_t *rkb,
-			     rd_kafka_resp_err_t err,
-			     rd_kafka_buf_t *rkbuf,
-			     rd_kafka_buf_t *request,
-			     rd_kafka_topic_partition_list_t *offsets,
-			     int update_toppar) {
-        const int log_decode_errors = LOG_ERR; /* NOTE(review): not referenced
-                                                * directly in this function;
-                                                * presumably used by the
-                                                * rd_kafka_buf_read_*() macros
-                                                * - confirm. */
-        int32_t TopicArrayCnt;
-        int64_t offset = RD_KAFKA_OFFSET_INVALID;
-        rd_kafkap_str_t metadata;
-        int i;
-        int actions;
-        int seen_cnt = 0; /* Response partitions matched in 'offsets' */
-
-        if (err)
-                goto err;
-
-        /* Set default offset for all partitions. */
-        rd_kafka_topic_partition_list_set_offsets(rkb->rkb_rk, offsets, 0,
-                                                  RD_KAFKA_OFFSET_INVALID,
-						  0 /* !is commit */);
-
-        rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
-        for (i = 0 ; i < TopicArrayCnt ; i++) {
-                rd_kafkap_str_t topic;
-                int32_t PartArrayCnt;
-                char *topic_name;
-                int j;
-
-                rd_kafka_buf_read_str(rkbuf, &topic);
-                rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt);
-
-                RD_KAFKAP_STR_DUPA(&topic_name, &topic);
-
-                for (j = 0 ; j < PartArrayCnt ; j++) {
-                        int32_t partition;
-                        shptr_rd_kafka_toppar_t *s_rktp;
-                        rd_kafka_topic_partition_t *rktpar;
-                        int16_t err2; /* Per-partition error code */
-
-                        rd_kafka_buf_read_i32(rkbuf, &partition);
-                        rd_kafka_buf_read_i64(rkbuf, &offset);
-                        rd_kafka_buf_read_str(rkbuf, &metadata);
-                        rd_kafka_buf_read_i16(rkbuf, &err2);
-
-                        rktpar = rd_kafka_topic_partition_list_find(offsets,
-                                                                    topic_name,
-                                                                    partition);
-                        if (!rktpar) {
-				rd_rkb_dbg(rkb, TOPIC, "OFFSETFETCH",
-					   "OffsetFetchResponse: %s [%"PRId32"] "
-					   "not found in local list: ignoring",
-					   topic_name, partition);
-                                continue;
-			}
-
-                        seen_cnt++;
-
-			if (!(s_rktp = rktpar->_private)) {
-				s_rktp = rd_kafka_toppar_get2(rkb->rkb_rk,
-							      topic_name,
-							      partition, 0, 0);
-				/* May be NULL if topic is not locally known.
-				 * The result is stored in the list entry. */
-				rktpar->_private = s_rktp;
-			}
-
-			/* broker reports invalid offset as -1 */
-			if (offset == -1)
-				rktpar->offset = RD_KAFKA_OFFSET_INVALID;
-			else
-				rktpar->offset = offset;
-                        rktpar->err = err2;
-
-			rd_rkb_dbg(rkb, TOPIC, "OFFSETFETCH",
-				   "OffsetFetchResponse: %s [%"PRId32"] offset %"PRId64,
-				   topic_name, partition, offset);
-
-			if (update_toppar && !err2 && s_rktp) {
-				rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
-				/* Update toppar's committed offset */
-				rd_kafka_toppar_lock(rktp);
-				rktp->rktp_committed_offset = rktpar->offset;
-				rd_kafka_toppar_unlock(rktp);
-			}
-
-
-                        if (rktpar->metadata) /* Replace any previous metadata */
-                                rd_free(rktpar->metadata);
-
-                        if (RD_KAFKAP_STR_IS_NULL(&metadata)) {
-                                rktpar->metadata = NULL;
-                                rktpar->metadata_size = 0;
-                        } else {
-                                rktpar->metadata = RD_KAFKAP_STR_DUP(&metadata);
-                                rktpar->metadata_size =
-                                        RD_KAFKAP_STR_LEN(&metadata);
-                        }
-                }
-        }
-
-
-err: /* Also reached by fall-through after a successful parse,
-      * in which case err is 0. */
-        rd_rkb_dbg(rkb, TOPIC, "OFFFETCH",
-                   "OffsetFetch for %d/%d partition(s) returned %s",
-                   seen_cnt,
-                   offsets ? offsets->cnt : -1, rd_kafka_err2str(err));
-
-        actions = rd_kafka_err_action(rkb, err, rkbuf, request,
-				      RD_KAFKA_ERR_ACTION_END);
-
-        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
-                /* Re-query for coordinator */
-                rd_kafka_cgrp_op(rkb->rkb_rk->rk_cgrp, NULL,
-                                 RD_KAFKA_NO_REPLYQ,
-				 RD_KAFKA_OP_COORD_QUERY, err);
-                if (request) {
-                        /* Schedule a retry */
-                        rd_kafka_buf_keep(request);
-                        rd_kafka_broker_buf_retry(request->rkbuf_rkb, request);
-                }
-        }
-
-	return err;
-
- err_parse: /* NOTE(review): no explicit goto targets this label; presumably
-             * the rd_kafka_buf_read_*() macros jump here on decode
-             * failure - confirm. */
-        err = rkbuf->rkbuf_err;
-        goto err;
-}
-
-
-
-/**
- * Response callback wrapper around rd_kafka_handle_OffsetFetch() where
- * \p opaque is the originating RD_KAFKA_OP_OFFSET_FETCH op.
- *
- * A copy of the originating op's partition list is filled in with the
- * fetched offsets and sent in a RD_KAFKA_OP_OFFSET_FETCH|RD_KAFKA_OP_REPLY
- * op on the originating op's replyq.
- * The originating op is destroyed before returning.
- *
- * Locality: cgrp's broker thread
- */
-void rd_kafka_op_handle_OffsetFetch (rd_kafka_t *rk,
-                                     rd_kafka_broker_t *rkb,
-                                     rd_kafka_resp_err_t err,
-                                     rd_kafka_buf_t *rkbuf,
-                                     rd_kafka_buf_t *request,
-                                     void *opaque) {
-        rd_kafka_op_t *rko = opaque;
-        rd_kafka_op_t *reply;
-        rd_kafka_topic_partition_list_t *fetched;
-
-        RD_KAFKA_OP_TYPE_ASSERT(rko, RD_KAFKA_OP_OFFSET_FETCH);
-
-        /* On termination just release the op, no reply is sent. */
-        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
-                rd_kafka_op_destroy(rko);
-                return;
-        }
-
-        /* The reply gets its own copy of the requested partitions. */
-        fetched = rd_kafka_topic_partition_list_copy(
-                rko->rko_u.offset_fetch.partitions);
-
-        reply = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH|RD_KAFKA_OP_REPLY);
-        reply->rko_u.offset_fetch.partitions = fetched;
-        reply->rko_u.offset_fetch.do_free    = 1;
-        reply->rko_err                       = err;
-        if (rko->rko_rktp) {
-                reply->rko_rktp = rd_kafka_toppar_keep(
-                        rd_kafka_toppar_s2i(rko->rko_rktp));
-        }
-
-        /* No response buffer means no request was sent since all
-         * partitions already had usable offsets: the copied list
-         * is then passed on as-is. */
-        if (rkbuf) {
-                rd_kafka_handle_OffsetFetch(rkb->rkb_rk, rkb, err, rkbuf,
-                                            request, fetched, 0);
-        }
-
-        rd_kafka_replyq_enq(&rko->rko_replyq, reply, 0);
-
-        rd_kafka_op_destroy(rko);
-}
-
-
-
-
-
-
-/**
- * Send OffsetFetchRequest for the partitions in \p parts.
- *
- * \p parts is sorted by topic (in place). Partitions whose offset is
- * neither RD_KAFKA_OFFSET_INVALID nor RD_KAFKA_OFFSET_STORED are
- * considered to already have a usable offset and are left out of the
- * request. If that leaves no partitions, nothing is sent to the broker
- * and the response callback is invoked right away with no response buffer.
- */
-void rd_kafka_OffsetFetchRequest (rd_kafka_broker_t *rkb,
-                                  int16_t api_version,
-                                  rd_kafka_topic_partition_list_t *parts,
-                                  rd_kafka_replyq_t replyq,
-                                  rd_kafka_resp_cb_t *resp_cb,
-                                  void *opaque) {
-        rd_kafka_buf_t *rkbuf;
-        const char *prev_topic = NULL;
-        size_t pos_TopicCnt;
-        ssize_t pos_PartCnt = -1;
-        int ntopics = 0;
-        int nparts = 0;      /* Partitions in the current topic */
-        int nparts_total = 0; /* Partitions in the whole request */
-        int idx;
-
-        rkbuf = rd_kafka_buf_new_request(
-                rkb, RD_KAFKAP_OffsetFetch, 1,
-                RD_KAFKAP_STR_SIZE(rkb->rkb_rk->rk_group_id) +
-                4 +
-                (parts->cnt * 32));
-
-        /* ConsumerGroup */
-        rd_kafka_buf_write_kstr(rkbuf, rkb->rkb_rk->rk_group_id);
-
-        /* Group partitions of the same topic together */
-        rd_kafka_topic_partition_list_sort_by_topic(parts);
-
-        /* TopicArrayCnt: placeholder, back-patched below */
-        pos_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0);
-
-        for (idx = 0 ; idx < parts->cnt ; idx++) {
-                rd_kafka_topic_partition_t *rktpar = &parts->elems[idx];
-
-                /* Only INVALID and STORED offsets need to be fetched. */
-                if (!(rktpar->offset == RD_KAFKA_OFFSET_INVALID ||
-                      rktpar->offset == RD_KAFKA_OFFSET_STORED)) {
-                        rd_rkb_dbg(rkb, TOPIC, "OFFSET",
-                                   "OffsetFetchRequest: skipping %s [%"PRId32"] "
-                                   "with valid offset %s",
-                                   rktpar->topic, rktpar->partition,
-                                   rd_kafka_offset2str(rktpar->offset));
-                        continue;
-                }
-
-                if (!prev_topic || strcmp(prev_topic, rktpar->topic) != 0) {
-                        /* Topic changed: back-patch the partition count
-                         * of the previous topic entry. */
-                        if (nparts > 0)
-                                rd_kafka_buf_update_u32(rkbuf, pos_PartCnt,
-                                                        nparts);
-
-                        /* TopicName */
-                        rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
-                        /* PartitionCnt: placeholder for the new topic */
-                        pos_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0);
-                        nparts = 0;
-                        prev_topic = rktpar->topic;
-                        ntopics++;
-                }
-
-                /* Partition */
-                rd_kafka_buf_write_i32(rkbuf, rktpar->partition);
-                nparts++;
-                nparts_total++;
-        }
-
-        /* Back-patch PartitionCnt of the last topic entry */
-        if (nparts > 0)
-                rd_kafka_buf_update_u32(rkbuf, pos_PartCnt, nparts);
-
-        /* Back-patch TopicCnt */
-        rd_kafka_buf_update_u32(rkbuf, pos_TopicCnt, ntopics);
-
-        rd_kafka_buf_ApiVersion_set(rkbuf, api_version, 0);
-
-        rd_rkb_dbg(rkb, TOPIC, "OFFSET",
-                   "OffsetFetchRequest(v%d) for %d/%d partition(s)",
-                   api_version, nparts_total, parts->cnt);
-
-        if (nparts_total > 0) {
-                rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq,
-                                               resp_cb, opaque);
-                return;
-        }
-
-        /* No partition needs an OffsetFetch: trigger the response
-         * callback right away, without a response buffer. */
-        rkbuf->rkbuf_replyq = replyq;
-        rkbuf->rkbuf_cb     = resp_cb;
-        rkbuf->rkbuf_opaque = opaque;
-        rd_kafka_buf_callback(rkb->rkb_rk, rkb, 0, NULL, rkbuf);
-}
-
-
-/**
- * @brief Handle an OffsetCommit response.
- *
- * Each partition error code in the response is stored in the matching
- * entry of \p offsets; response partitions not found in \p offsets are
- * skipped. If the number of failed partitions equals the number of
- * entries in \p offsets, the last partition error seen is returned.
- * Per-partition errors do not go through the action mapping below.
- *
- * When \p err is set, or parsing fails, the error is mapped to actions:
- * REFRESH (only acted on if the client has a cgrp) marks the coordinator
- * dead when combined with SPECIAL, else queries for the coordinator;
- * RETRY attempts to retry \p request.
- *
- * @returns RD_KAFKA_RESP_ERR__IN_PROGRESS if the request is being retried,
- *          else the resulting error code (0 after a successful parse
- *          where not all partitions failed).
- *
- * @remark \p offsets may be NULL if \p err is set
- */
-rd_kafka_resp_err_t
-rd_kafka_handle_OffsetCommit (rd_kafka_t *rk,
-			      rd_kafka_broker_t *rkb,
-			      rd_kafka_resp_err_t err,
-			      rd_kafka_buf_t *rkbuf,
-			      rd_kafka_buf_t *request,
-			      rd_kafka_topic_partition_list_t *offsets) {
-        const int log_decode_errors = LOG_ERR; /* NOTE(review): not referenced
-                                                * directly in this function;
-                                                * presumably used by the
-                                                * rd_kafka_buf_read_*() macros
-                                                * - confirm. */
-        int32_t TopicArrayCnt;
-        int16_t ErrorCode = 0, last_ErrorCode = 0;
-	int errcnt = 0; /* Number of matched partitions with an error */
-        int i;
-	int actions;
-
-        if (err)
-		goto err;
-
-        rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
-        for (i = 0 ; i < TopicArrayCnt ; i++) {
-                rd_kafkap_str_t topic;
-                char *topic_str;
-                int32_t PartArrayCnt;
-                int j;
-
-                rd_kafka_buf_read_str(rkbuf, &topic);
-                rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt);
-
-                RD_KAFKAP_STR_DUPA(&topic_str, &topic);
-
-                for (j = 0 ; j < PartArrayCnt ; j++) {
-                        int32_t partition;
-                        rd_kafka_topic_partition_t *rktpar;
-
-                        rd_kafka_buf_read_i32(rkbuf, &partition);
-                        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
-
-                        rktpar = rd_kafka_topic_partition_list_find(
-                                offsets, topic_str, partition);
-
-                        if (!rktpar) {
-                                /* Received offset for topic/partition we didn't
-                                 * ask for, this shouldn't really happen. */
-                                continue;
-                        }
-
-                        rktpar->err = ErrorCode;
-			if (ErrorCode) {
-				last_ErrorCode = ErrorCode;
-				errcnt++;
-			}
-                }
-        }
-
-	/* If all partitions failed use error code
-	 * from last partition as the global error. */
-	if (offsets && errcnt == offsets->cnt)
-		err = last_ErrorCode;
-	goto done;
-
- err_parse: /* NOTE(review): no explicit goto targets this label; presumably
-             * the rd_kafka_buf_read_*() macros jump here on decode
-             * failure - confirm. */
-        err = rkbuf->rkbuf_err;
-
- err:
-        actions = rd_kafka_err_action(
-		rkb, err, rkbuf, request,
-
-		RD_KAFKA_ERR_ACTION_PERMANENT,
-		RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE,
-
-		RD_KAFKA_ERR_ACTION_RETRY,
-		RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS,
-
-		RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_SPECIAL,
-		RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE,
-
-		RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_SPECIAL,
-		RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP,
-
-		RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY,
-		RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
-
-		RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY,
-		RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,
-
-		RD_KAFKA_ERR_ACTION_RETRY,
-		RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
-
-		RD_KAFKA_ERR_ACTION_PERMANENT,
-		RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE,
-
-		RD_KAFKA_ERR_ACTION_PERMANENT,
-		RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
-
-		RD_KAFKA_ERR_ACTION_PERMANENT,
-		RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
-
-		RD_KAFKA_ERR_ACTION_END);
-
-	if (actions & RD_KAFKA_ERR_ACTION_REFRESH && rk->rk_cgrp) {
-		/* Mark coordinator dead or re-query for coordinator.
-		 * ..dead() will trigger a re-query. */
-		if (actions & RD_KAFKA_ERR_ACTION_SPECIAL)
-			rd_kafka_cgrp_coord_dead(rk->rk_cgrp, err,
-						 "OffsetCommitRequest failed");
-		else
-			rd_kafka_cgrp_coord_query(rk->rk_cgrp,
-						  "OffsetCommitRequest failed");
-	}
-	if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
-		if (rd_kafka_buf_retry(rkb, request))
-			return RD_KAFKA_RESP_ERR__IN_PROGRESS;
-		/* FALLTHRU: retry was not possible, return the error */
-	}
-
- done:
-	return err;
-}
-
-
-
-
-/**
- * @brief Send OffsetCommitRequest for a list of partitions.
- *
- * \p offsets is sorted by topic (in place) and partitions with a negative
- * offset are left out of the request. A NULL partition metadata is sent
- * as an empty string.
- *
- * If no partition remains, no request is sent: the request buffer and
- * \p replyq are destroyed and the response callback is not invoked.
- *
- * @returns 0 if none of the partitions in \p offsets had valid offsets,
- *          else 1.
- */
-int rd_kafka_OffsetCommitRequest (rd_kafka_broker_t *rkb,
-                                   rd_kafka_cgrp_t *rkcg,
-                                   int16_t api_version,
-                                   rd_kafka_topic_partition_list_t *offsets,
-                                   rd_kafka_replyq_t replyq,
-                                   rd_kafka_resp_cb_t *resp_cb,
-                                   void *opaque, const char *reason) {
-        rd_kafka_buf_t *rkbuf;
-        const char *prev_topic = NULL;
-        ssize_t pos_TopicCnt = -1;
-        ssize_t pos_PartCnt = -1;
-        int ntopics = 0;
-        int nparts = 0;       /* Partitions in the current topic */
-        int nparts_total = 0; /* Partitions in the whole request */
-        int idx;
-
-        rd_kafka_assert(NULL, offsets != NULL);
-
-        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_OffsetCommit,
-                                         1, 100 + (offsets->cnt * 128));
-
-        /* ConsumerGroup */
-        rd_kafka_buf_write_kstr(rkbuf, rkcg->rkcg_group_id);
-
-        /* v1,v2 */
-        if (api_version >= 1) {
-                /* ConsumerGroupGenerationId */
-                rd_kafka_buf_write_i32(rkbuf, rkcg->rkcg_generation_id);
-                /* ConsumerId */
-                rd_kafka_buf_write_kstr(rkbuf, rkcg->rkcg_member_id);
-                /* v2: RetentionTime */
-                if (api_version == 2)
-                        rd_kafka_buf_write_i64(rkbuf, -1);
-        }
-
-        /* Group partitions of the same topic together */
-        rd_kafka_topic_partition_list_sort_by_topic(offsets);
-
-        /* TopicArrayCnt: placeholder, back-patched when the number of
-         * topics is known. */
-        pos_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0);
-
-        for (idx = 0 ; idx < offsets->cnt ; idx++) {
-                rd_kafka_topic_partition_t *rktpar = &offsets->elems[idx];
-
-                /* Partitions with an invalid offset are not committed. */
-                if (rktpar->offset < 0)
-                        continue;
-
-                if (!prev_topic || strcmp(prev_topic, rktpar->topic) != 0) {
-                        /* Topic changed: back-patch the partition count
-                         * of the previous topic entry. */
-                        if (nparts > 0)
-                                rd_kafka_buf_update_u32(rkbuf, pos_PartCnt,
-                                                        nparts);
-
-                        /* TopicName */
-                        rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
-                        /* PartitionCnt: placeholder for the new topic */
-                        pos_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0);
-                        nparts = 0;
-                        prev_topic = rktpar->topic;
-                        ntopics++;
-                }
-
-                /* Partition */
-                rd_kafka_buf_write_i32(rkbuf, rktpar->partition);
-                nparts++;
-                nparts_total++;
-
-                /* Offset */
-                rd_kafka_buf_write_i64(rkbuf, rktpar->offset);
-
-                /* v1: TimeStamp */
-                if (api_version == 1)
-                        rd_kafka_buf_write_i64(rkbuf, -1); /* FIXME: retention
-                                                            *        time */
-
-                /* Metadata:
-                 * Java client 0.9.0 and broker <0.10.0 can't parse
-                 * Null metadata fields, so as a workaround an empty
-                 * string is sent instead of Null. */
-                if (rktpar->metadata)
-                        rd_kafka_buf_write_str(rkbuf,
-                                               rktpar->metadata,
-                                               rktpar->metadata_size);
-                else
-                        rd_kafka_buf_write_str(rkbuf, "", 0);
-        }
-
-        if (nparts_total == 0) {
-                /* No topic+partitions had valid offsets to commit. */
-                rd_kafka_replyq_destroy(&replyq);
-                rd_kafka_buf_destroy(rkbuf);
-                return 0;
-        }
-
-        /* Back-patch PartitionCnt of the last topic entry */
-        if (nparts > 0)
-                rd_kafka_buf_update_u32(rkbuf, pos_PartCnt, nparts);
-
-        /* Back-patch TopicCnt */
-        rd_kafka_buf_update_u32(rkbuf, pos_TopicCnt, ntopics);
-
-        rd_kafka_buf_ApiVersion_set(rkbuf, api_version, 0);
-
-        rd_rkb_dbg(rkb, TOPIC, "OFFSET",
-                   "Enqueue OffsetCommitRequest(v%d, %d/%d partition(s))): %s",
-                   api_version, nparts_total, offsets->cnt, reason);
-
-        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-
-        return 1;
-
-}
-
-
-
-/**
- * @brief Serialize the "consumer" protocol type MemberState of group
- *        member \p rkgm for a SyncGroupRequest and append it, as Kafka
- *        Bytes (length prefixed), to the enveloping buffer \p env_rkbuf.
- *
- * The MemberState is first built in a temporary buffer:
- * Version (0), the member's assignment grouped per topic, and the
- * member's userdata. Consecutive assignment entries with the same topic
- * name are written as one topic entry.
- */
-static void rd_kafka_group_MemberState_consumer_write (
-        rd_kafka_buf_t *env_rkbuf,
-        const rd_kafka_group_member_t *rkgm) {
-        rd_kafka_buf_t *state_buf;
-        rd_slice_t state_slice;
-        const char *prev_topic = NULL;
-        size_t pos_TopicCnt;
-        ssize_t pos_PartCnt = -1;
-        int ntopics = 0;
-        int nparts = 0;
-        int idx;
-
-        state_buf = rd_kafka_buf_new(1, 100);
-
-        /* Version */
-        rd_kafka_buf_write_i16(state_buf, 0);
-        /* TopicCnt: placeholder, back-patched below */
-        pos_TopicCnt = rd_kafka_buf_write_i32(state_buf, 0);
-
-        for (idx = 0 ; idx < rkgm->rkgm_assignment->cnt ; idx++) {
-                const rd_kafka_topic_partition_t *rktpar =
-                        &rkgm->rkgm_assignment->elems[idx];
-
-                if (!prev_topic || strcmp(prev_topic, rktpar->topic) != 0) {
-                        /* Topic changed: back-patch the partition count
-                         * of the previous topic entry, if there is one. */
-                        if (prev_topic)
-                                rd_kafka_buf_update_i32(state_buf,
-                                                        pos_PartCnt, nparts);
-
-                        /* Topic */
-                        rd_kafka_buf_write_str(state_buf, rktpar->topic, -1);
-                        /* PartitionCnt: placeholder for the new topic */
-                        pos_PartCnt = rd_kafka_buf_write_i32(state_buf, 0);
-                        nparts = 0;
-                        prev_topic = rktpar->topic;
-                        ntopics++;
-                }
-
-                /* Partition */
-                rd_kafka_buf_write_i32(state_buf, rktpar->partition);
-                nparts++;
-        }
-
-        /* Back-patch PartitionCnt of the last topic entry, then TopicCnt */
-        if (pos_PartCnt != -1)
-                rd_kafka_buf_update_i32(state_buf, pos_PartCnt, nparts);
-        rd_kafka_buf_update_i32(state_buf, pos_TopicCnt, ntopics);
-
-        /* UserData */
-        rd_kafka_buf_write_kbytes(state_buf, rkgm->rkgm_userdata);
-
-        /* Slice covering everything written to the temporary buffer */
-        rd_slice_init_full(&state_slice, &state_buf->rkbuf_buf);
-
-        /* Kafka Bytes in the enveloping buffer: length, then content */
-        rd_kafka_buf_write_i32(env_rkbuf,
-                               (int32_t)rd_slice_remains(&state_slice));
-        rd_buf_write_slice(&env_rkbuf->rkbuf_buf, &state_slice);
-
-        rd_kafka_buf_destroy(state_buf);
-}
-
-/**
- * @brief Construct and enqueue a SyncGroupRequest on \p rkb.
- *
- * Writes GroupId, GenerationId, MemberId and then, for each of the
- * \p assignment_cnt entries in \p assignments, the member id followed by
- * its "consumer" protocol MemberState.
- *
- * The request is flagged as blocking and its timeout is set to the
- * configured group session timeout plus a 3s grace period.
- *
- * @param rkb            Broker to enqueue the request on.
- * @param group_id       Group id.
- * @param generation_id  Group generation id.
- * @param member_id      This member's id.
- * @param assignments    Array of \p assignment_cnt group members.
- * @param assignment_cnt Number of elements in \p assignments.
- * @param replyq         Queue to enqueue the response on.
- * @param resp_cb        Response handler.
- * @param opaque         Opaque passed to \p resp_cb.
- */
-void rd_kafka_SyncGroupRequest (rd_kafka_broker_t *rkb,
-                                const rd_kafkap_str_t *group_id,
-                                int32_t generation_id,
-                                const rd_kafkap_str_t *member_id,
-                                const rd_kafka_group_member_t
-                                *assignments,
-                                int assignment_cnt,
-                                rd_kafka_replyq_t replyq,
-                                rd_kafka_resp_cb_t *resp_cb,
-                                void *opaque) {
-        rd_kafka_buf_t *rkbuf;
-        int i;
-
-        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SyncGroup,
-                                         1,
-                                         RD_KAFKAP_STR_SIZE(group_id) +
-                                         4 /* GenerationId */ +
-                                         RD_KAFKAP_STR_SIZE(member_id) +
-                                         4 /* array size group_assignment */ +
-                                         (assignment_cnt * 100/*guess*/));
-        rd_kafka_buf_write_kstr(rkbuf, group_id);
-        rd_kafka_buf_write_i32(rkbuf, generation_id);
-        rd_kafka_buf_write_kstr(rkbuf, member_id);
-        rd_kafka_buf_write_i32(rkbuf, assignment_cnt);
-
-        for (i = 0 ; i < assignment_cnt ; i++) {
-                const rd_kafka_group_member_t *rkgm = &assignments[i];
-
-                rd_kafka_buf_write_kstr(rkbuf, rkgm->rkgm_member_id);
-                rd_kafka_group_MemberState_consumer_write(rkbuf, rkgm);
-        }
-
-        /* This is a blocking request */
-        rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING;
-
-        /* Convert the millisecond session timeout to microseconds using
-         * 64-bit arithmetic: multiplying in plain int would overflow for
-         * timeouts above INT_MAX/1000 ms (about 35 minutes). */
-        rkbuf->rkbuf_ts_timeout = rd_clock() +
-                ((int64_t)rkb->rkb_rk->rk_conf.group_session_timeout_ms *
-                 1000) +
-                (3*1000*1000/* 3s grace period*/);
-
-        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-}
-
-/**
- * Handler for SyncGroup responses
- * opaque must be the cgrp handle.
- *
- * Flow:
- *  - The response is discarded if the cgrp is no longer in join-state
- *    WAIT_SYNC.
- *  - On a request-level error (\p err set) the response is not parsed and
- *    \p err is used as the ErrorCode.
- *  - Otherwise ErrorCode(i16) and MemberState(bytes) are read from
- *    \p rkbuf.
- *  - If rd_kafka_err_action() asks for a REFRESH a coordinator query op
- *    is enqueued on the cgrp.
- *  - Unless the error is RD_KAFKA_RESP_ERR__DESTROY the result is passed
- *    on to rd_kafka_cgrp_handle_SyncGroup().
- *
- * NOTE(review): the err_parse label is not referenced explicitly here;
- * presumably the rd_kafka_buf_read_*() macros jump to it (and use
- * log_decode_errors) on parse failure - confirm against their definition.
- */
-void rd_kafka_handle_SyncGroup (rd_kafka_t *rk,
-				rd_kafka_broker_t *rkb,
-                                rd_kafka_resp_err_t err,
-                                rd_kafka_buf_t *rkbuf,
-                                rd_kafka_buf_t *request,
-                                void *opaque) {
-        rd_kafka_cgrp_t *rkcg = opaque;
-        const int log_decode_errors = LOG_ERR;
-        int16_t ErrorCode = 0;
-        rd_kafkap_bytes_t MemberState = RD_ZERO_INIT;
-        int actions;
-
-	if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC) {
-		rd_kafka_dbg(rkb->rkb_rk, CGRP, "SYNCGROUP",
-			     "SyncGroup response: discarding outdated request "
-			     "(now in join-state %s)",
-			     rd_kafka_cgrp_join_state_names[rkcg->
-							    rkcg_join_state]);
-		return;
-	}
-
-        if (err) {
-                ErrorCode = err; /* Request-level error: skip parsing */
-                goto err;
-        }
-
-        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
-        rd_kafka_buf_read_bytes(rkbuf, &MemberState);
-
-err:
-        actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request,
-				      RD_KAFKA_ERR_ACTION_END);
-
-        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
-                /* Re-query for coordinator */
-                rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
-				 RD_KAFKA_OP_COORD_QUERY,
-                                 ErrorCode);
-                /* FALLTHRU */
-        }
-
-        rd_kafka_dbg(rkb->rkb_rk, CGRP, "SYNCGROUP",
-                     "SyncGroup response: %s (%d bytes of MemberState data)",
-                     rd_kafka_err2str(ErrorCode),
-                     RD_KAFKAP_BYTES_LEN(&MemberState));
-
-        if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY)
-                return; /* Termination */
-
-        rd_kafka_cgrp_handle_SyncGroup(rkcg, rkb, ErrorCode, &MemberState);
-
-        return;
-
- err_parse:
-        ErrorCode = rkbuf->rkbuf_err; /* Use the buffer's parse error */
-        goto err;
-}
-
-
-/**
- * @brief Construct and enqueue a JoinGroupRequest on \p rkb.
- *
- * Writes GroupId, SessionTimeoutMs (from configuration), MemberId,
- * ProtocolType and one GroupProtocol entry (protocol name + member
- * metadata) for each enabled partition assignor.
- *
- * The request is flagged as blocking and its timeout is set to the
- * configured group session timeout plus a 3s grace period.
- *
- * @param rkb            Broker to enqueue the request on.
- * @param group_id       Group id.
- * @param member_id      This member's id.
- * @param protocol_type  Group protocol type.
- * @param topics         Topic list passed to each assignor's
- *                       metadata callback.
- * @param replyq         Queue to enqueue the response on.
- * @param resp_cb        Response handler.
- * @param opaque         Opaque passed to \p resp_cb.
- */
-void rd_kafka_JoinGroupRequest (rd_kafka_broker_t *rkb,
-                                const rd_kafkap_str_t *group_id,
-                                const rd_kafkap_str_t *member_id,
-                                const rd_kafkap_str_t *protocol_type,
-                                const rd_list_t *topics,
-                                rd_kafka_replyq_t replyq,
-                                rd_kafka_resp_cb_t *resp_cb,
-                                void *opaque) {
-        rd_kafka_buf_t *rkbuf;
-        rd_kafka_t *rk = rkb->rkb_rk;
-        rd_kafka_assignor_t *rkas;
-        int i;
-
-        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_JoinGroup,
-                                         1,
-                                         RD_KAFKAP_STR_SIZE(group_id) +
-                                         4 /* sessionTimeoutMs */ +
-                                         RD_KAFKAP_STR_SIZE(member_id) +
-                                         RD_KAFKAP_STR_SIZE(protocol_type) +
-                                         4 /* array count GroupProtocols */ +
-                                         (rd_list_cnt(topics) * 100));
-        rd_kafka_buf_write_kstr(rkbuf, group_id);
-        rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.group_session_timeout_ms);
-        rd_kafka_buf_write_kstr(rkbuf, member_id);
-        rd_kafka_buf_write_kstr(rkbuf, protocol_type);
-        rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.enabled_assignor_cnt);
-
-        RD_LIST_FOREACH(rkas, &rk->rk_conf.partition_assignors, i) {
-                rd_kafkap_bytes_t *member_metadata;
-                if (!rkas->rkas_enabled)
-                        continue;
-                rd_kafka_buf_write_kstr(rkbuf, rkas->rkas_protocol_name);
-                /* The metadata is copied into the request buffer and
-                 * then released here. */
-                member_metadata = rkas->rkas_get_metadata_cb(rkas, topics);
-                rd_kafka_buf_write_kbytes(rkbuf, member_metadata);
-                rd_kafkap_bytes_destroy(member_metadata);
-        }
-
-        /* This is a blocking request */
-        rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING;
-
-        /* Convert the millisecond session timeout to microseconds using
-         * 64-bit arithmetic: multiplying in plain int would overflow for
-         * timeouts above INT_MAX/1000 ms (about 35 minutes). */
-        rkbuf->rkbuf_ts_timeout = rd_clock() +
-                ((int64_t)rk->rk_conf.group_session_timeout_ms * 1000) +
-                (3*1000*1000/* 3s grace period*/);
-
-        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-}
-
-
-
-
-
-
-/**
- * @brief Construct and enqueue a LeaveGroupRequest on \p rkb.
- *
- * The request body consists of the GroupId followed by the MemberId.
- *
- * @param rkb        Broker to enqueue the request on.
- * @param group_id   Group to leave.
- * @param member_id  Id of the leaving member.
- * @param replyq     Queue to enqueue the response on.
- * @param resp_cb    Response handler.
- * @param opaque     Opaque passed to \p resp_cb.
- */
-void rd_kafka_LeaveGroupRequest (rd_kafka_broker_t *rkb,
-                                 const rd_kafkap_str_t *group_id,
-                                 const rd_kafkap_str_t *member_id,
-                                 rd_kafka_replyq_t replyq,
-                                 rd_kafka_resp_cb_t *resp_cb,
-                                 void *opaque) {
-        rd_kafka_buf_t *req =
-                rd_kafka_buf_new_request(rkb, RD_KAFKAP_LeaveGroup,
-                                         1,
-                                         RD_KAFKAP_STR_SIZE(group_id) +
-                                         RD_KAFKAP_STR_SIZE(member_id));
-
-        /* Body: GroupId, MemberId */
-        rd_kafka_buf_write_kstr(req, group_id);
-        rd_kafka_buf_write_kstr(req, member_id);
-
-        rd_kafka_broker_buf_enq_replyq(rkb, req, replyq, resp_cb, opaque);
-}
-
-
-/**
- * @brief Handler for LeaveGroup responses.
- *
- * On a request-level error (\p err set) the response is not parsed and
- * \p err is used as the ErrorCode, otherwise ErrorCode(i16) is read
- * from \p rkbuf.
- *
- * If rd_kafka_err_action() asks for a REFRESH, a coordinator query op is
- * enqueued on the cgrp and the request is scheduled for retry.
- * Any other error is only logged at debug level.
- *
- * @param rk       Client instance (unused here).
- * @param rkb      Broker the response was received from.
- * @param err      Request-level error, or 0.
- * @param rkbuf    Response buffer.
- * @param request  Original request buffer.
- * @param opaque   Must be the cgrp handle.
- */
-void rd_kafka_handle_LeaveGroup (rd_kafka_t *rk,
-                                 rd_kafka_broker_t *rkb,
-                                 rd_kafka_resp_err_t err,
-                                 rd_kafka_buf_t *rkbuf,
-                                 rd_kafka_buf_t *request,
-                                 void *opaque) {
-        rd_kafka_cgrp_t *rkcg = opaque;
-        const int log_decode_errors = LOG_ERR;
-        int16_t ErrorCode = 0;
-        int actions;
-
-        if (err) {
-                ErrorCode = err;
-                goto err;
-        }
-
-        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
-
-
-err:
-        actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request,
-                                      RD_KAFKA_ERR_ACTION_END);
-
-        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
-                /* Re-query for coordinator */
-                rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
-                                 RD_KAFKA_OP_COORD_QUERY, ErrorCode);
-                /* Schedule a retry */
-                rd_kafka_buf_keep(request);
-                rd_kafka_broker_buf_retry(request->rkbuf_rkb, request);
-                return;
-        }
-
-        if (ErrorCode)
-                rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP",
-                             "LeaveGroup response: %s",
-                             rd_kafka_err2str(ErrorCode));
-
-        /* Done: must not fall through into the parse-error path below,
-         * which would dereference rkbuf (not valid on request-level
-         * errors) and jump back to err:, looping indefinitely. */
-        return;
-
- err_parse:
-        ErrorCode = rkbuf->rkbuf_err;
-        goto err;
-}
-
-
-
-
-
-
-/**
- * @brief Construct and enqueue a HeartbeatRequest on \p rkb.
- *
- * The request body consists of GroupId, GenerationId and MemberId.
- * The request timeout is set to the configured group session timeout.
- *
- * @param rkb            Broker to enqueue the request on.
- * @param group_id       Group id.
- * @param generation_id  Group generation id.
- * @param member_id      This member's id.
- * @param replyq         Queue to enqueue the response on.
- * @param resp_cb        Response handler.
- * @param opaque         Opaque passed to \p resp_cb.
- */
-void rd_kafka_HeartbeatRequest (rd_kafka_broker_t *rkb,
-                                const rd_kafkap_str_t *group_id,
-                                int32_t generation_id,
-                                const rd_kafkap_str_t *member_id,
-                                rd_kafka_replyq_t replyq,
-                                rd_kafka_resp_cb_t *resp_cb,
-                                void *opaque) {
-        rd_kafka_buf_t *rkbuf;
-
-        rd_rkb_dbg(rkb, CGRP, "HEARTBEAT",
-                   "Heartbeat for group \"%s\" generation id %"PRId32,
-                   group_id->str, generation_id);
-
-        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_Heartbeat,
-                                         1,
-                                         RD_KAFKAP_STR_SIZE(group_id) +
-                                         4 /* GenerationId */ +
-                                         RD_KAFKAP_STR_SIZE(member_id));
-
-        rd_kafka_buf_write_kstr(rkbuf, group_id);
-        rd_kafka_buf_write_i32(rkbuf, generation_id);
-        rd_kafka_buf_write_kstr(rkbuf, member_id);
-
-        /* Convert the millisecond session timeout to microseconds using
-         * 64-bit arithmetic: multiplying in plain int would overflow for
-         * timeouts above INT_MAX/1000 ms (about 35 minutes). */
-        rkbuf->rkbuf_ts_timeout = rd_clock() +
-                ((int64_t)rkb->rkb_rk->rk_conf.group_session_timeout_ms *
-                 1000);
-
-        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-}
-
-
-
-
-/**
- * @brief Construct and enqueue a ListGroupsRequest on \p rkb.
- *
- * The request has no body.
- *
- * @param rkb      Broker to enqueue the request on.
- * @param replyq   Queue to enqueue the response on.
- * @param resp_cb  Response handler.
- * @param opaque   Opaque passed to \p resp_cb.
- */
-void rd_kafka_ListGroupsRequest (rd_kafka_broker_t *rkb,
-                                 rd_kafka_replyq_t replyq,
-                                 rd_kafka_resp_cb_t *resp_cb,
-                                 void *opaque) {
-        /* Empty request: zero segments, zero bytes. */
-        rd_kafka_buf_t *req =
-                rd_kafka_buf_new_request(rkb, RD_KAFKAP_ListGroups, 0, 0);
-
-        rd_kafka_broker_buf_enq_replyq(rkb, req, replyq, resp_cb, opaque);
-}
-
-
-/**
- * @brief Construct and enqueue a DescribeGroupsRequest on \p rkb.
- *
- * Writes the group count followed by the group names. The names are
- * written starting from the last element of \p groups down to the first.
- *
- * @param rkb        Broker to enqueue the request on.
- * @param groups     Array of \p group_cnt nul-terminated group names.
- * @param group_cnt  Number of elements in \p groups.
- * @param replyq     Queue to enqueue the response on.
- * @param resp_cb    Response handler.
- * @param opaque     Opaque passed to \p resp_cb.
- */
-void rd_kafka_DescribeGroupsRequest (rd_kafka_broker_t *rkb,
-                                     const char **groups, int group_cnt,
-                                     rd_kafka_replyq_t replyq,
-                                     rd_kafka_resp_cb_t *resp_cb,
-                                     void *opaque) {
-        rd_kafka_buf_t *req;
-        int idx;
-
-        req = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DescribeGroups,
-                                       1, 32*group_cnt);
-
-        rd_kafka_buf_write_i32(req, group_cnt);
-
-        /* Last group first, first group last. */
-        for (idx = group_cnt - 1 ; idx >= 0 ; idx--)
-                rd_kafka_buf_write_str(req, groups[idx], -1);
-
-        rd_kafka_broker_buf_enq_replyq(rkb, req, replyq, resp_cb, opaque);
-}
-
-
-
-
-/**
- * @brief Generic handler for Metadata responses
- *
- * \p opaque is the (possibly NULL) rko that was passed to
- * rd_kafka_MetadataRequest().
- *
- * Behaviour:
- *  - When terminating (or \p err is RD_KAFKA_RESP_ERR__DESTROY) nothing
- *    is parsed and the rko, if any, is destroyed.
- *  - On any other request error a warning is logged and no metadata
- *    is produced.
- *  - Otherwise the response is parsed with rd_kafka_parse_Metadata();
- *    if that fails the request is retried if possible, else the error
- *    becomes RD_KAFKA_RESP_ERR__BAD_MSG.
- *  - If the rko has a replyq the rko is reused as the reply, carrying
- *    the error and the parsed metadata (possibly NULL); ownership of
- *    both passes to the replyq. Without a replyq the parsed metadata
- *    is freed and the rko (if any) is destroyed.
- *
- * @locality rdkafka main thread
- */
-static void rd_kafka_handle_Metadata (rd_kafka_t *rk,
-                                      rd_kafka_broker_t *rkb,
-                                      rd_kafka_resp_err_t err,
-                                      rd_kafka_buf_t *rkbuf,
-                                      rd_kafka_buf_t *request,
-                                      void *opaque) {
-        rd_kafka_op_t *rko = opaque; /* Possibly NULL */
-        struct rd_kafka_metadata *md = NULL;
-        const rd_list_t *topics = request->rkbuf_u.Metadata.topics;
-
-        rd_kafka_assert(NULL, err == RD_KAFKA_RESP_ERR__DESTROY ||
-                        thrd_is_current(rk->rk_thread));
-
-	/* Avoid metadata updates when we're terminating. */
-	if (rd_kafka_terminating(rkb->rkb_rk))
-                err = RD_KAFKA_RESP_ERR__DESTROY;
-
-	if (unlikely(err)) {
-                if (err == RD_KAFKA_RESP_ERR__DESTROY) {
-                        /* Terminating */
-                        goto done;
-                }
-
-                /* FIXME: handle errors */
-                rd_rkb_log(rkb, LOG_WARNING, "METADATA",
-                           "Metadata request failed: %s (%dms)",
-                           rd_kafka_err2str(err),
-			   (int)(request->rkbuf_ts_sent/1000));
-	} else {
-
-                if (!topics)
-                        rd_rkb_dbg(rkb, METADATA, "METADATA",
-                                   "===== Received metadata: %s =====",
-                                   request->rkbuf_u.Metadata.reason);
-                else
-                        rd_rkb_dbg(rkb, METADATA, "METADATA",
-                                   "===== Received metadata "
-                                   "(for %d requested topics): %s =====",
-                                   rd_list_cnt(topics),
-                                   request->rkbuf_u.Metadata.reason);
-
-                md = rd_kafka_parse_Metadata(rkb, request, rkbuf);
-		if (!md) {
-			if (rd_kafka_buf_retry(rkb, request))
-				return; /* Retry scheduled: rko left untouched */
-			err = RD_KAFKA_RESP_ERR__BAD_MSG;
-                }
-        }
-
-        if (rko && rko->rko_replyq.q) {
-                /* Reply to metadata requester, passing on the metadata.
-                 * Reuse requesting rko for the reply. */
-                rko->rko_err = err;
-                rko->rko_u.metadata.md = md;
-
-                rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0);
-                rko = NULL; /* Handed over: must not be destroyed below */
-        } else {
-                if (md)
-                        rd_free(md);
-        }
-
- done:
-        if (rko)
-                rd_kafka_op_destroy(rko);
-}
-
-
-
-/**
- * @brief Construct MetadataRequest and enqueue it on \p rkb.
- *
- * \p topics is a list of topic names (char *) to request.
- *
- * !topics          - only request brokers (if supported by broker, else
- *                    all topics)
- *  topics.cnt==0   - all topics in cluster are requested
- *  topics.cnt >0   - only specified topics are requested
- *
- * @param rkb       - broker to enqueue the request on
- * @param topics    - (optional) list of topic names, see above. The list
- *                    is copied, the caller retains ownership.
- * @param reason    - metadata request reason (NULL is treated as "")
- * @param rko       - (optional) rko with replyq for handling response.
- *                    It is passed as opaque to the response handler.
- *
- * Suppression of redundant full requests:
- *  - An all-topics request without an \p rko is skipped if another such
- *    request is already in transit. With an \p rko it is never skipped.
- *  - A brokers-only request (broker ApiVersion >= 1) is skipped if another
- *    such request is already in transit, unless \p rko is non-NULL and
- *    has its metadata.force flag set.
- * In-transit requests are tracked with per-type counters in the metadata
- * cache, protected by rkmc_full_lock. The counter and lock are recorded
- * on the request buffer (Metadata.decr, Metadata.decr_lock).
- *
- * The response is handled by rd_kafka_handle_Metadata() on rk_ops.
- *
- * @returns RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS if the request was skipped
- *          as described above (the request buffer is destroyed, \p rko is
- *          not touched), otherwise RD_KAFKA_RESP_ERR_NO_ERROR.
- */
-rd_kafka_resp_err_t
-rd_kafka_MetadataRequest (rd_kafka_broker_t *rkb,
-                          const rd_list_t *topics, const char *reason,
-                          rd_kafka_op_t *rko) {
-        rd_kafka_buf_t *rkbuf;
-        int16_t ApiVersion = 0;
-        int features;
-        int topic_cnt = topics ? rd_list_cnt(topics) : 0;
-        int *full_incr = NULL; /* In-transit counter to bump, if any */
-
-        ApiVersion = rd_kafka_broker_ApiVersion_supported(rkb,
-                                                          RD_KAFKAP_Metadata,
-                                                          0, 2,
-                                                          &features);
-
-        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_Metadata, 1,
-                                         4 + (50 * topic_cnt));
-
-        if (!reason)
-                reason = "";
-
-        rkbuf->rkbuf_u.Metadata.reason = rd_strdup(reason);
-
-        if (!topics && ApiVersion >= 1) {
-                /* a null(0) array (in the protocol) represents no topics */
-                rd_kafka_buf_write_i32(rkbuf, 0);
-                rd_rkb_dbg(rkb, METADATA, "METADATA",
-                           "Request metadata for brokers only: %s", reason);
-                full_incr = &rkb->rkb_rk->rk_metadata_cache.
-                        rkmc_full_brokers_sent;
-
-        } else {
-                if (topic_cnt == 0 && !rko)
-                        full_incr = &rkb->rkb_rk->rk_metadata_cache.
-                                rkmc_full_topics_sent;
-
-                if (topic_cnt == 0 && ApiVersion >= 1)
-                        rd_kafka_buf_write_i32(rkbuf, -1); /* Null: all topics*/
-                else
-                        rd_kafka_buf_write_i32(rkbuf, topic_cnt);
-
-                if (topic_cnt == 0) {
-                        rkbuf->rkbuf_u.Metadata.all_topics = 1;
-                        rd_rkb_dbg(rkb, METADATA, "METADATA",
-                                   "Request metadata for all topics: "
-                                   "%s", reason);
-                } else
-                        rd_rkb_dbg(rkb, METADATA, "METADATA",
-                                   "Request metadata for %d topic(s): "
-                                   "%s", topic_cnt, reason);
-        }
-
-        if (full_incr) {
-                /* Avoid multiple outstanding full requests
-                 * (since they are redundant and side-effect-less).
-                 * Forced requests (app using metadata() API) are passed
-                 * through regardless. */
-
-                mtx_lock(&rkb->rkb_rk->rk_metadata_cache.
-                         rkmc_full_lock);
-                if (*full_incr > 0 && (!rko || !rko->rko_u.metadata.force)) {
-                        mtx_unlock(&rkb->rkb_rk->rk_metadata_cache.
-                                   rkmc_full_lock);
-                        rd_rkb_dbg(rkb, METADATA, "METADATA",
-                                   "Skipping metadata request: %s: "
-                                   "full request already in-transit",
-                                   reason);
-                        rd_kafka_buf_destroy(rkbuf);
-                        return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
-                }
-
-                (*full_incr)++;
-                mtx_unlock(&rkb->rkb_rk->rk_metadata_cache.
-                           rkmc_full_lock);
-                rkbuf->rkbuf_u.Metadata.decr = full_incr;
-                rkbuf->rkbuf_u.Metadata.decr_lock = &rkb->rkb_rk->
-                        rk_metadata_cache.rkmc_full_lock;
-        }
-
-
-        if (topic_cnt > 0) {
-                char *topic;
-                int i;
-
-                /* Maintain a copy of the topics list so we can purge
-                 * hints from the metadata cache on error. */
-                rkbuf->rkbuf_u.Metadata.topics =
-                        rd_list_copy(topics, rd_list_string_copy, NULL);
-
-                RD_LIST_FOREACH(topic, topics, i)
-                        rd_kafka_buf_write_str(rkbuf, topic, -1);
-
-        }
-
-        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
-        /* Metadata requests are part of the important control plane
-         * and should go before other requests (Produce, Fetch, etc). */
-        rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_FLASH;
-
-        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf,
-                                       /* Handle response thru rk_ops,
-                                        * but forward parsed result to
-                                        * rko's replyq when done. */
-                                       RD_KAFKA_REPLYQ(rkb->rkb_rk->
-                                                       rk_ops, 0),
-                                       rd_kafka_handle_Metadata, rko);
-
-        return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-
-
-
-
-
-
-
-/**
- * @brief Parses and handles ApiVersion reply.
- *
- * @param rk       Client instance (unused here).
- * @param rkb      Broker the response was received from.
- * @param err      Request-level error, or 0.
- * @param rkbuf    Response buffer.
- * @param request  Original request buffer (used for retries).
- * @param apis will be allocated, populated and sorted
- *             with broker's supported APIs. Set to NULL on failure.
- * @param api_cnt will be set to the number of elements in \p *apis,
- *                or 0 on failure.
- *
- * @returns 0 on success, RD_KAFKA_RESP_ERR__IN_PROGRESS if the request
- *          failed but was scheduled for retry, else an error.
- */
-rd_kafka_resp_err_t
-rd_kafka_handle_ApiVersion (rd_kafka_t *rk,
-                            rd_kafka_broker_t *rkb,
-                            rd_kafka_resp_err_t err,
-                            rd_kafka_buf_t *rkbuf,
-                            rd_kafka_buf_t *request,
-                            struct rd_kafka_ApiVersion **apis,
-                            size_t *api_cnt) {
-        const int log_decode_errors = LOG_ERR;
-        int actions;
-        int32_t ApiArrayCnt;
-        int16_t ErrorCode;
-        int i = 0;
-
-        /* Make sure the outputs are well-defined on every return path. */
-        *apis = NULL;
-        *api_cnt = 0;
-
-        if (err)
-                goto err;
-
-        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
-        if ((err = ErrorCode))
-                goto err;
-
-        rd_kafka_buf_read_i32(rkbuf, &ApiArrayCnt);
-        /* The count comes from the broker: reject negative values too,
-         * they would otherwise yield a bogus allocation size and a huge
-         * element count after conversion to size_t. */
-        if (ApiArrayCnt < 0 || ApiArrayCnt > 1000)
-                rd_kafka_buf_parse_fail(rkbuf,
-                                        "ApiArrayCnt %"PRId32" out of range",
-                                        ApiArrayCnt);
-
-        rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
-                   "Broker API support:");
-
-        *apis = malloc(sizeof(**apis) * ApiArrayCnt);
-
-        for (i = 0 ; i < ApiArrayCnt ; i++) {
-                struct rd_kafka_ApiVersion *api = &(*apis)[i];
-
-                rd_kafka_buf_read_i16(rkbuf, &api->ApiKey);
-                rd_kafka_buf_read_i16(rkbuf, &api->MinVer);
-                rd_kafka_buf_read_i16(rkbuf, &api->MaxVer);
-
-                rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
-                           "  ApiKey %s (%hd) Versions %hd..%hd",
-                           rd_kafka_ApiKey2str(api->ApiKey),
-                           api->ApiKey, api->MinVer, api->MaxVer);
-        }
-
-        *api_cnt = ApiArrayCnt;
-        qsort(*apis, *api_cnt, sizeof(**apis), rd_kafka_ApiVersion_key_cmp);
-
-        goto done;
-
- err_parse:
-        err = rkbuf->rkbuf_err;
- err:
-        if (*apis) {
-                rd_free(*apis);
-                /* Don't hand a dangling pointer back to the caller. */
-                *apis = NULL;
-        }
-
-        actions = rd_kafka_err_action(
-                rkb, err, rkbuf, request,
-                RD_KAFKA_ERR_ACTION_END);
-
-        if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
-                if (rd_kafka_buf_retry(rkb, request))
-                        return RD_KAFKA_RESP_ERR__IN_PROGRESS;
-                /* FALLTHRU */
-        }
-
-done:
-        return err;
-}
-
-
-
-/**
- * @brief Construct and enqueue an ApiVersionRequest (KIP-35) on \p rkb.
- *
- * @param rkb        Broker to enqueue the request on.
- * @param replyq     Queue to enqueue the response on. If it has no queue
- *                   the request is enqueued directly on the broker.
- * @param resp_cb    Response handler.
- * @param opaque     Opaque passed to \p resp_cb.
- * @param flash_msg  If non-zero the request is flagged
- *                   RD_KAFKA_OP_F_FLASH.
- */
-void rd_kafka_ApiVersionRequest (rd_kafka_broker_t *rkb,
-                                 rd_kafka_replyq_t replyq,
-                                 rd_kafka_resp_cb_t *resp_cb,
-                                 void *opaque, int flash_msg) {
-        rd_kafka_buf_t *req =
-                rd_kafka_buf_new_request(rkb, RD_KAFKAP_ApiVersion, 1, 4);
-
-        if (flash_msg)
-                req->rkbuf_flags |= RD_KAFKA_OP_F_FLASH;
-
-        /* Empty array: request all APIs */
-        rd_kafka_buf_write_i32(req, 0);
-
-        /* A broker lacking support for this API tears down the connection
-         * on reception of the unknown request, retrying is pointless. */
-        req->rkbuf_retries = RD_KAFKA_BUF_NO_RETRIES;
-
-        /* 0.9.0.x brokers (a broker-side regression) keep the connection
-         * open on unsupported API requests, so use the dedicated
-         * (configured) request timeout to bound the wait. */
-        req->rkbuf_ts_timeout = rd_clock() +
-                (rkb->rkb_rk->rk_conf.api_version_request_timeout_ms * 1000);
-
-        if (!replyq.q) {
-                /* in broker thread */
-                rd_kafka_broker_buf_enq1(rkb, req, resp_cb, opaque);
-                return;
-        }
-
-        rd_kafka_broker_buf_enq_replyq(rkb, req, replyq, resp_cb, opaque);
-}
-
-
-/**
- * @brief Construct and enqueue a SaslHandshakeRequest (KIP-43) on \p rkb.
- *
- * @param rkb        Broker to enqueue the request on.
- * @param mechanism  Nul-terminated SASL mechanism name.
- * @param replyq     Queue to enqueue the response on. If it has no queue
- *                   the request is enqueued directly on the broker.
- * @param resp_cb    Response handler.
- * @param opaque     Opaque passed to \p resp_cb.
- * @param flash_msg  If non-zero the request is flagged
- *                   RD_KAFKA_OP_F_FLASH.
- */
-void rd_kafka_SaslHandshakeRequest (rd_kafka_broker_t *rkb,
-                                    const char *mechanism,
-                                    rd_kafka_replyq_t replyq,
-                                    rd_kafka_resp_cb_t *resp_cb,
-                                    void *opaque, int flash_msg) {
-        int mechlen = (int)strlen(mechanism);
-        rd_kafka_buf_t *req =
-                rd_kafka_buf_new_request(rkb, RD_KAFKAP_SaslHandshake,
-                                         1, RD_KAFKAP_STR_SIZE0(mechlen));
-
-        if (flash_msg)
-                req->rkbuf_flags |= RD_KAFKA_OP_F_FLASH;
-
-        rd_kafka_buf_write_str(req, mechanism, mechlen);
-
-        /* Non-supporting brokers will tear down the connection when they
-         * receive an unknown API request or where the SASL GSSAPI
-         * token type is not recognized, so dont retry request on failure. */
-        req->rkbuf_retries = RD_KAFKA_BUF_NO_RETRIES;
-
-        /* 0.9.0.x brokers (a broker-side regression) keep the connection
-         * open on unsupported API requests: when no ApiVersionRequest is
-         * used and the socket timeout exceeds 10s, cap this request's
-         * timeout at 10s. */
-        if (!rkb->rkb_rk->rk_conf.api_version_request &&
-            rkb->rkb_rk->rk_conf.socket_timeout_ms > 10*1000)
-                req->rkbuf_ts_timeout = rd_clock() + (10 * 1000 * 1000);
-
-        if (!replyq.q) {
-                /* in broker thread */
-                rd_kafka_broker_buf_enq1(rkb, req, resp_cb, opaque);
-                return;
-        }
-
-        rd_kafka_broker_buf_enq_replyq(rkb, req, replyq,
-                                       resp_cb, opaque);
-}
-
-
-
-
-/**
- * @brief Parses a Produce reply.
- *
- * The reply is expected to contain exactly one topic with exactly one
- * partition, anything else yields RD_KAFKA_RESP_ERR__BAD_MSG.
- *
- * @param rkb        Broker the reply was received from.
- * @param rktp       Partition produced to (not referenced by the parser).
- * @param rkbuf      Reply buffer to parse.
- * @param request    Original request; its ApiVersion decides which
- *                   optional fields are read (timestamp for >= 2,
- *                   throttle time for >= 1).
- * @param offsetp    Set to the Offset field once it has been read,
- *                   regardless of the partition's ErrorCode.
- * @param timestampp Set to the reply's timestamp field, or -1 if the
- *                   request version does not carry one.
- *
- * @returns 0 on success or an error code on failure.
- *          The error is either the partition's ErrorCode from the reply,
- *          the buffer's parse error, or RD_KAFKA_RESP_ERR__BAD_MSG.
- * @locality broker thread
- *
- * NOTE(review): the err_parse label is not referenced explicitly here;
- * presumably the rd_kafka_buf_read_*()/skip macros jump to it (and use
- * log_decode_errors) on parse failure - confirm against their definition.
- */
-static rd_kafka_resp_err_t
-rd_kafka_handle_Produce_parse (rd_kafka_broker_t *rkb,
-                               rd_kafka_toppar_t *rktp,
-                               rd_kafka_buf_t *rkbuf,
-                               rd_kafka_buf_t *request,
-                               int64_t *offsetp,
-                               int64_t *timestampp) {
-        int32_t TopicArrayCnt;
-        int32_t PartitionArrayCnt;
-        struct {
-                int32_t Partition;
-                int16_t ErrorCode;
-                int64_t Offset;
-        } hdr;
-        const int log_decode_errors = LOG_ERR;
-
-        rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
-        if (TopicArrayCnt != 1)
-                goto err;
-
-        /* Since we only produce to one single topic+partition in each
-         * request we assume that the reply only contains one topic+partition
-         * and that it is the same that we requested.
-         * If not the broker is buggy. */
-        rd_kafka_buf_skip_str(rkbuf);
-        rd_kafka_buf_read_i32(rkbuf, &PartitionArrayCnt);
-
-        if (PartitionArrayCnt != 1)
-                goto err;
-
-        rd_kafka_buf_read_i32(rkbuf, &hdr.Partition);
-        rd_kafka_buf_read_i16(rkbuf, &hdr.ErrorCode);
-        rd_kafka_buf_read_i64(rkbuf, &hdr.Offset);
-
-        *offsetp = hdr.Offset;
-
-        *timestampp = -1; /* Default: no timestamp in reply */
-        if (request->rkbuf_reqhdr.ApiVersion >= 2) {
-                rd_kafka_buf_read_i64(rkbuf, timestampp);
-        }
-
-        if (request->rkbuf_reqhdr.ApiVersion >= 1) {
-                int32_t Throttle_Time;
-                rd_kafka_buf_read_i32(rkbuf, &Throttle_Time);
-
-                rd_kafka_op_throttle_time(rkb, rkb->rkb_rk->rk_rep,
-                                          Throttle_Time);
-        }
-
-
-        return hdr.ErrorCode;
-
- err_parse:
-        return rkbuf->rkbuf_err;
- err:
-        return RD_KAFKA_RESP_ERR__BAD_MSG;
-}
-
-
-/**
- * @brief Handle ProduceResponse
- *
- * @locality broker thread
- */
-static void rd_kafka_handle_Produce (rd_kafka_t *rk,
-                                     rd_kafka_broker_t *rkb,
-                                     rd_kafka_resp_err_t err,
-                                     rd_kafka_buf_t *reply,
-                                     rd_kafka_buf_t *request,
-                                     void *opaque) {
-        shptr_rd_kafka_toppar_t *s_rktp = opaque; /* from ProduceRequest() */
-        rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
-        int64_t offset = RD_KAFKA_OFFSET_INVALID;
-        int64_t timestamp = -1;
-
-        /* Parse Produce reply (unless the request errored) */
-        if (!err && reply)
-                err = rd_kafka_handle_Produce_parse(rkb, rktp,
-                                                    reply, request,
-                                                    &offset, &timestamp);
-
-
-        if (likely(!err)) {
-                rd_rkb_dbg(rkb, MSG, "MSGSET",
-                           "%s [%"PRId32"]: MessageSet with %i message(s) "
-                           "delivered",
-                           rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
-                           rd_atomic32_get(&request->rkbuf_msgq.rkmq_msg_cnt));
-
-        } else {
-                /* Error */
-                int actions;
-
-                actions = rd_kafka_err_action(
-                        rkb, err, reply, request,
-
-                        RD_KAFKA_ERR_ACTION_REFRESH,
-                        RD_KAFKA_RESP_ERR__TRANSPORT,
-
-                        RD_KAFKA_ERR_ACTION_REFRESH,
-                        RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
-
-                        RD_KAFKA_ERR_ACTION_END);
-
-                rd_rkb_dbg(rkb, MSG, "MSGSET",
-                           "%s [%"PRId32"]: MessageSet with %i message(s) "
-                           "encountered error: %s (actions 0x%x)",
-                           rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
-                           rd_atomic32_get(&request->rkbuf_msgq.rkmq_msg_cnt),
-                           rd_kafka_err2str(err), actions);
-
-                /* NOTE: REFRESH implies a later retry, which does NOT affect
-                 *       the retry count since refresh-errors are considered
-                 *       to be stale metadata rather than temporary errors.
-                 *
-                 *       This is somewhat problematic since it may cause
-                 *       duplicate messages even with retries=0 if the
-                 *       ProduceRequest made it to the broker but only the
-                 *       response was lost due to network connectivity issues.
-                 *       That problem will be sorted when EoS is implemented.
-                 */
-                if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
-                        /* Request metadata information update */
-                        rd_kafka_toppar_leader_unavailable(rktp,
-                                                           "produce", err);
-
-                        /* Move messages (in the rkbuf) back to the partition's
-                         * queue head. They will be resent when a new leader
-                         * is delegated. */
-                        rd_kafka_toppar_insert_msgq(rktp, &request->rkbuf_msgq);
-
-                        /* No need for fallthru here since the request
-                         * no longer has any messages associated with it. */
-                        goto done;
-                }
-
-                if ((actions & RD_KAFKA_ERR_ACTION_RETRY) &&
-                    rd_kafka_buf_retry(rkb, request))
-                        return; /* Scheduled for retry */
-
-                /* Refresh implies a later retry through other means */
-                if (actions & RD_KAFKA_ERR_ACTION_REFRESH)
-                        goto done;
-
-                /* Translate request-level timeout error code
-                 * to message-level timeout error code. */
-                if (err == RD_KAFKA_RESP_ERR__TIMED_OUT)
-                        err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
-
-                /* Fatal errors: no message transmission retries */
-                /* FALLTHRU */
-        }
-
-        /* Propagate assigned offset and timestamp back to app. */
-        if (likely(offset != RD_KAFKA_OFFSET_INVALID)) {
-                rd_kafka_msg_t *rkm;
-                if (rktp->rktp_rkt->rkt_conf.produce_offset_report) {
-                        /* produce.offset.report: each message */
-                        TAILQ_FOREACH(rkm, &request->rkbuf_msgq.rkmq_msgs,
-                                      rkm_link) {
-                                rkm->rkm_offset = offset++;
-                                if (timestamp != -1) {
-                                        rkm->rkm_timestamp = timestamp;
-                                        rkm->rkm_tstype = RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME;
-                                }
-                        }
-                } else {
-                        /* Last message in each batch */
-                        rkm = TAILQ_LAST(&request->rkbuf_msgq.rkmq_msgs,
-                                         rd_kafka_msg_head_s);
-                        rkm->rkm_offset = offset +
-                                rd_atomic32_get(&request->rkbuf_msgq.
-                                                rkmq_msg_cnt) - 1;
-                        if (timestamp != -1) {
-                                rkm->rkm_timestamp = timestamp;
-                                rkm->rkm_tstype = RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME;
-                        }
-                }
-        }
-
-        /* Enqueue messages for delivery report */
-        rd_kafka_dr_msgq(rktp->rktp_rkt, &request->rkbuf_msgq, err);
-
- done:
-        rd_kafka_toppar_destroy(s_rktp); /* from ProduceRequest() */
-}
-
-
-/**
- * @brief Build a ProduceRequest from the messages in the toppar queue of
- *        \p rktp and hand it to rd_kafka_broker_buf_enq_replyq() for \p rkb,
- *        with rd_kafka_handle_Produce() as the response callback.
- *
- * The request inherits the timeout of its first message, and is flagged
- * with RD_KAFKA_OP_F_NO_RESPONSE when the topic's required_acks is zero.
- * A toppar reference is acquired and passed along as the callback opaque.
- *
- * @returns the number of messages included, or 0 on error / no messages.
- *
- * @locality broker thread
- */
-int rd_kafka_ProduceRequest (rd_kafka_broker_t *rkb, rd_kafka_toppar_t *rktp) {
-        rd_kafka_itopic_t *topic = rktp->rktp_rkt;
-        size_t msgset_bytes = 0;
-        rd_kafka_buf_t *req;
-        int msg_cnt;
-
-        /* Pack as many messages from the toppar transmit queue as
-         * possible into one ProduceRequest buffer. */
-        req = rd_kafka_msgset_create_ProduceRequest(rkb, rktp, &msgset_bytes);
-        if (unlikely(!req))
-                return 0;
-
-        msg_cnt = rd_atomic32_get(&req->rkbuf_msgq.rkmq_msg_cnt);
-        rd_dassert(msg_cnt > 0);
-
-        /* The request uses the timeout of its first message. */
-        req->rkbuf_ts_timeout =
-                TAILQ_FIRST(&req->rkbuf_msgq.rkmq_msgs)->rkm_ts_timeout;
-
-        /* required_acks of zero: mark the request as NO_RESPONSE. */
-        if (topic->rkt_conf.required_acks == 0)
-                req->rkbuf_flags |= RD_KAFKA_OP_F_NO_RESPONSE;
-
-        /* Account for the transmitted messages and bytes on the toppar. */
-        rd_atomic64_add(&rktp->rktp_c.tx_msgs, msg_cnt);
-        rd_atomic64_add(&rktp->rktp_c.tx_bytes, msgset_bytes);
-
-        rd_kafka_broker_buf_enq_replyq(rkb, req,
-                                       RD_KAFKA_NO_REPLYQ,
-                                       rd_kafka_handle_Produce,
-                                       /* toppar ref for handle_Produce() */
-                                       rd_kafka_toppar_keep(rktp));
-
-        return msg_cnt;
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_request.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_request.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_request.h
deleted file mode 100644
index a6a11e5..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_request.h
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2015, Magnus Edenhill
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#pragma once
-
-#include "rdkafka_cgrp.h"
-#include "rdkafka_feature.h"
-
-
-#define RD_KAFKA_ERR_ACTION_PERMANENT  0x1 /* Permanent error */
-#define RD_KAFKA_ERR_ACTION_IGNORE     0x2 /* Error can be ignored */
-#define RD_KAFKA_ERR_ACTION_REFRESH    0x4 /* Refresh state (e.g., metadata) */
-#define RD_KAFKA_ERR_ACTION_RETRY      0x8 /* Retry request after backoff */
-#define RD_KAFKA_ERR_ACTION_INFORM    0x10 /* Inform application about err */
-#define RD_KAFKA_ERR_ACTION_SPECIAL   0x20 /* Special-purpose, depends on context */
-#define RD_KAFKA_ERR_ACTION_END          0 /* var-arg sentinel */
-
-int rd_kafka_err_action (rd_kafka_broker_t *rkb,
-			 rd_kafka_resp_err_t err,
-			 rd_kafka_buf_t *rkbuf,
-			 rd_kafka_buf_t *request, ...);
-
-
-void rd_kafka_GroupCoordinatorRequest (rd_kafka_broker_t *rkb,
-                                       const rd_kafkap_str_t *cgrp,
-                                       rd_kafka_replyq_t replyq,
-                                       rd_kafka_resp_cb_t *resp_cb,
-                                       void *opaque);
-
-rd_kafka_resp_err_t rd_kafka_handle_Offset (rd_kafka_t *rk,
-					    rd_kafka_broker_t *rkb,
-					    rd_kafka_resp_err_t err,
-					    rd_kafka_buf_t *rkbuf,
-					    rd_kafka_buf_t *request,
-                                            rd_kafka_topic_partition_list_t
-                                            *offsets);
-
-void rd_kafka_OffsetRequest (rd_kafka_broker_t *rkb,
-                             rd_kafka_topic_partition_list_t *offsets,
-                             int16_t api_version,
-                             rd_kafka_replyq_t replyq,
-                             rd_kafka_resp_cb_t *resp_cb,
-                             void *opaque);
-
-rd_kafka_resp_err_t
-rd_kafka_handle_OffsetFetch (rd_kafka_t *rk,
-			     rd_kafka_broker_t *rkb,
-			     rd_kafka_resp_err_t err,
-			     rd_kafka_buf_t *rkbuf,
-			     rd_kafka_buf_t *request,
-			     rd_kafka_topic_partition_list_t *offsets,
-			     int update_toppar);
-
-void rd_kafka_op_handle_OffsetFetch (rd_kafka_t *rk,
-				     rd_kafka_broker_t *rkb,
-                                     rd_kafka_resp_err_t err,
-                                     rd_kafka_buf_t *rkbuf,
-                                     rd_kafka_buf_t *request,
-                                     void *opaque);
-
-void rd_kafka_OffsetFetchRequest (rd_kafka_broker_t *rkb,
-                                  int16_t api_version,
-                                  rd_kafka_topic_partition_list_t *parts,
-                                  rd_kafka_replyq_t replyq,
-                                  rd_kafka_resp_cb_t *resp_cb,
-                                  void *opaque);
-
-
-
-rd_kafka_resp_err_t
-rd_kafka_handle_OffsetCommit (rd_kafka_t *rk,
-			      rd_kafka_broker_t *rkb,
-			      rd_kafka_resp_err_t err,
-			      rd_kafka_buf_t *rkbuf,
-			      rd_kafka_buf_t *request,
-			      rd_kafka_topic_partition_list_t *offsets);
-int rd_kafka_OffsetCommitRequest (rd_kafka_broker_t *rkb,
-				  rd_kafka_cgrp_t *rkcg,
-				  int16_t api_version,
-				  rd_kafka_topic_partition_list_t *offsets,
-				  rd_kafka_replyq_t replyq,
-				  rd_kafka_resp_cb_t *resp_cb,
-				  void *opaque, const char *reason);
-
-
-
-void rd_kafka_JoinGroupRequest (rd_kafka_broker_t *rkb,
-                                const rd_kafkap_str_t *group_id,
-                                const rd_kafkap_str_t *member_id,
-                                const rd_kafkap_str_t *protocol_type,
-				const rd_list_t *topics,
-                                rd_kafka_replyq_t replyq,
-                                rd_kafka_resp_cb_t *resp_cb,
-                                void *opaque);
-
-
-void rd_kafka_LeaveGroupRequest (rd_kafka_broker_t *rkb,
-                                 const rd_kafkap_str_t *group_id,
-                                 const rd_kafkap_str_t *member_id,
-                                 rd_kafka_replyq_t replyq,
-                                 rd_kafka_resp_cb_t *resp_cb,
-                                 void *opaque);
-void rd_kafka_handle_LeaveGroup (rd_kafka_t *rk,
-				 rd_kafka_broker_t *rkb,
-                                 rd_kafka_resp_err_t err,
-                                 rd_kafka_buf_t *rkbuf,
-                                 rd_kafka_buf_t *request,
-                                 void *opaque);
-
-void rd_kafka_SyncGroupRequest (rd_kafka_broker_t *rkb,
-                                const rd_kafkap_str_t *group_id,
-                                int32_t generation_id,
-                                const rd_kafkap_str_t *member_id,
-                                const rd_kafka_group_member_t
-                                *assignments,
-                                int assignment_cnt,
-                                rd_kafka_replyq_t replyq,
-                                rd_kafka_resp_cb_t *resp_cb,
-                                void *opaque);
-void rd_kafka_handle_SyncGroup (rd_kafka_t *rk,
-				rd_kafka_broker_t *rkb,
-                                rd_kafka_resp_err_t err,
-                                rd_kafka_buf_t *rkbuf,
-                                rd_kafka_buf_t *request,
-                                void *opaque);
-
-void rd_kafka_ListGroupsRequest (rd_kafka_broker_t *rkb,
-                                 rd_kafka_replyq_t replyq,
-                                 rd_kafka_resp_cb_t *resp_cb,
-                                 void *opaque);
-
-void rd_kafka_DescribeGroupsRequest (rd_kafka_broker_t *rkb,
-                                     const char **groups, int group_cnt,
-                                     rd_kafka_replyq_t replyq,
-                                     rd_kafka_resp_cb_t *resp_cb,
-                                     void *opaque);
-
-
-void rd_kafka_HeartbeatRequest (rd_kafka_broker_t *rkb,
-                                const rd_kafkap_str_t *group_id,
-                                int32_t generation_id,
-                                const rd_kafkap_str_t *member_id,
-                                rd_kafka_replyq_t replyq,
-                                rd_kafka_resp_cb_t *resp_cb,
-                                void *opaque);
-
-rd_kafka_resp_err_t
-rd_kafka_MetadataRequest (rd_kafka_broker_t *rkb,
-                          const rd_list_t *topics, const char *reason,
-                          rd_kafka_op_t *rko);
-
-rd_kafka_resp_err_t
-rd_kafka_handle_ApiVersion (rd_kafka_t *rk,
-			    rd_kafka_broker_t *rkb,
-			    rd_kafka_resp_err_t err,
-			    rd_kafka_buf_t *rkbuf,
-			    rd_kafka_buf_t *request,
-			    struct rd_kafka_ApiVersion **apis,
-			    size_t *api_cnt);
-void rd_kafka_ApiVersionRequest (rd_kafka_broker_t *rkb,
-				 rd_kafka_replyq_t replyq,
-				 rd_kafka_resp_cb_t *resp_cb,
-				 void *opaque, int flash_msg);
-
-void rd_kafka_SaslHandshakeRequest (rd_kafka_broker_t *rkb,
-				    const char *mechanism,
-				    rd_kafka_replyq_t replyq,
-				    rd_kafka_resp_cb_t *resp_cb,
-				    void *opaque, int flash_msg);
-
-int rd_kafka_ProduceRequest (rd_kafka_broker_t *rkb, rd_kafka_toppar_t *rktp);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_roundrobin_assignor.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_roundrobin_assignor.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_roundrobin_assignor.c
deleted file mode 100644
index 0482f88..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_roundrobin_assignor.c
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * librdkafka - The Apache Kafka C/C++ library
- *
- * Copyright (c) 2015 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-#include "rdkafka_int.h"
-#include "rdkafka_assignor.h"
-
-
-/**
- * Source: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
- *
- * The roundrobin assignor lays out all the available partitions and all the
- * available consumers. It then proceeds to do a roundrobin assignment from
- * partition to consumer. If the subscriptions of all consumer instances are
- * identical, then the partitions will be uniformly distributed. (i.e., the
- * partition ownership counts will be within a delta of exactly one across all
- * consumers.)
- *
- * For example, suppose there are two consumers C0 and C1, two topics t0 and
- * t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1,
- * t0p2, t1p0, t1p1, and t1p2.
- *
- * The assignment will be:
- * C0: [t0p0, t0p2, t1p1]
- * C1: [t0p1, t1p0, t1p2]
- */
-
-/**
- * @brief Roundrobin assignor callback: hands out every partition of every
- *        eligible topic to the group members in cyclic order, skipping
- *        members that are not subscribed to the topic at hand.
- *
- * @param rk                 Client handle, used for debug logging and for
- *                           the subscription lookup.
- * @param member_id          Unused by this assignor.
- * @param protocol_name      Unused by this assignor.
- * @param metadata           Unused by this assignor.
- * @param members            Array of \p member_cnt group members. Sorted
- *                           in place with rd_kafka_group_member_cmp();
- *                           assigned partitions are added to each member's
- *                           rkgm_assignment list.
- * @param member_cnt         Number of elements in \p members.
- * @param eligible_topics    Array of \p eligible_topic_cnt topics. Sorted
- *                           in place with rd_kafka_assignor_topic_cmp().
- * @param eligible_topic_cnt Number of elements in \p eligible_topics.
- * @param errstr             Unused by this assignor.
- * @param errstr_size        Unused by this assignor.
- * @param opaque             Unused by this assignor.
- *
- * @returns 0 in all cases.
- */
-rd_kafka_resp_err_t
-rd_kafka_roundrobin_assignor_assign_cb (rd_kafka_t *rk,
-                                        const char *member_id,
-                                        const char *protocol_name,
-                                        const rd_kafka_metadata_t *metadata,
-                                        rd_kafka_group_member_t *members,
-                                        size_t member_cnt,
-                                        rd_kafka_assignor_topic_t
-                                        **eligible_topics,
-                                        size_t eligible_topic_cnt,
-                                        char *errstr, size_t errstr_size,
-                                        void *opaque) {
-        unsigned int ti;
-        size_t next = 0; /* Index in members[] of the next candidate */
-
-        /* Sort topics by name */
-        qsort(eligible_topics, eligible_topic_cnt, sizeof(*eligible_topics),
-              rd_kafka_assignor_topic_cmp);
-
-        /* Sort members by name */
-        qsort(members, member_cnt, sizeof(*members),
-              rd_kafka_group_member_cmp);
-
-        for (ti = 0 ; ti < eligible_topic_cnt ; ti++) {
-                rd_kafka_assignor_topic_t *eligible_topic = eligible_topics[ti];
-                int partition;
-
-                /* For each topic+partition, assign one member (in a cyclic
-                 * iteration) per partition until the partitions are exhausted*/
-                for (partition = 0 ;
-                     partition < eligible_topic->metadata->partition_cnt ;
-                     partition++) {
-                        rd_kafka_group_member_t *rkgm;
-                        size_t probes;
-
-                        /* Scan through members, wrapping around the end of
-                         * the array, until we find one with a subscription
-                         * to this topic. The cursor indexes members[], so
-                         * it must wrap on member_cnt (not on the topic's
-                         * own subscriber count), and the scan is limited to
-                         * one full lap so it can neither run off the array
-                         * nor spin forever. */
-                        for (probes = 0 ; probes < member_cnt ; probes++) {
-                                if (rd_kafka_group_member_find_subscription(
-                                            rk, &members[next],
-                                            eligible_topic->metadata->topic))
-                                        break;
-                                next = (next + 1) % member_cnt;
-                        }
-
-                        /* A full lap without a subscriber (or no members at
-                         * all): nothing in this topic can be assigned. */
-                        if (probes == member_cnt)
-                                break;
-
-                        rkgm = &members[next];
-
-                        rd_kafka_dbg(rk, CGRP, "ASSIGN",
-                                     "roundrobin: Member \"%s\": "
-                                     "assigned topic %s partition %d",
-                                     rkgm->rkgm_member_id->str,
-                                     eligible_topic->metadata->topic,
-                                     partition);
-
-                        rd_kafka_topic_partition_list_add(
-                                rkgm->rkgm_assignment,
-                                eligible_topic->metadata->topic, partition);
-
-                        /* Continue with the member after the one just used. */
-                        next = (next + 1) % member_cnt;
-                }
-        }
-
-
-        return 0;
-}
-
-
-