You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bq...@apache.org on 2017/11/16 21:15:39 UTC

[09/42] nifi-minifi-cpp git commit: MINIFICPP-274: PutKafka Processor

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_request.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_request.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_request.c
new file mode 100644
index 0000000..2d023b4
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_request.c
@@ -0,0 +1,1848 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met: 
+ * 
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer. 
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution. 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdarg.h>
+
+#include "rdkafka_int.h"
+#include "rdkafka_request.h"
+#include "rdkafka_broker.h"
+#include "rdkafka_offset.h"
+#include "rdkafka_topic.h"
+#include "rdkafka_partition.h"
+#include "rdkafka_metadata.h"
+#include "rdkafka_msgset.h"
+
+#include "rdrand.h"
+
+/**
+ * Kafka protocol request and response handling.
+ * All of this code runs in the broker thread and uses op queues for
+ * propagating results back to the various sub-systems operating in
+ * other threads.
+ */
+
+
+/**
+ * @brief Decide action(s) to take based on the returned error code.
+ *
+ * The optional var-args is a .._ACTION_END terminated list
+ * of action,error tuples which overrides the general behaviour.
+ * It is to be read as: for \p error, return \p action(s).
+ */
+int rd_kafka_err_action (rd_kafka_broker_t *rkb,
+			 rd_kafka_resp_err_t err,
+			 rd_kafka_buf_t *rkbuf,
+			 rd_kafka_buf_t *request, ...) {
+	va_list ap;
+        int actions = 0;
+	int exp_act;
+
+	/* Match explicitly defined error mappings first. */
+	va_start(ap, request);
+	while ((exp_act = va_arg(ap, int))) {
+		int exp_err = va_arg(ap, int);
+
+		if (err == exp_err)
+			actions |= exp_act;
+	}
+	va_end(ap);
+
+	if (err && rkb && request)
+                rd_rkb_dbg(rkb, BROKER, "REQERR",
+                           "%sRequest failed: %s: explicit actions 0x%x",
+                           rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey),
+                           rd_kafka_err2str(err), actions);
+
+	/* Explicit error match. */
+	if (actions)
+		return actions;
+
+	/* Default error matching */
+        switch (err)
+        {
+        case RD_KAFKA_RESP_ERR_NO_ERROR:
+                break;
+        case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE:
+        case RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION:
+        case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE:
+        case RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE:
+        case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE:
+        case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP:
+        case RD_KAFKA_RESP_ERR__WAIT_COORD:
+                /* Request metadata information update */
+                actions |= RD_KAFKA_ERR_ACTION_REFRESH;
+                break;
+        case RD_KAFKA_RESP_ERR__TIMED_OUT:
+        case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT:
+                /* Broker-side request handling timeout */
+	case RD_KAFKA_RESP_ERR__TRANSPORT:
+		/* Broker connection down */
+		actions |= RD_KAFKA_ERR_ACTION_RETRY;
+		break;
+        case RD_KAFKA_RESP_ERR__DESTROY:
+	case RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT:
+        case RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE:
+        default:
+                actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
+                break;
+        }
+
+        return actions;
+}
+
+
+/**
+ * Send GroupCoordinatorRequest
+ */
+void rd_kafka_GroupCoordinatorRequest (rd_kafka_broker_t *rkb,
+                                       const rd_kafkap_str_t *cgrp,
+                                       rd_kafka_replyq_t replyq,
+                                       rd_kafka_resp_cb_t *resp_cb,
+                                       void *opaque) {
+        rd_kafka_buf_t *rkbuf;
+
+        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_GroupCoordinator, 1,
+                                         RD_KAFKAP_STR_SIZE(cgrp));
+        rd_kafka_buf_write_kstr(rkbuf, cgrp);
+
+        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+}
+
+
+
+
+/**
+ * @brief Parses and handles Offset replies.
+ *
+ * Returns the parsed offsets (and errors) in \p offsets
+ *
+ * @returns 0 on success, else an error.
+ */
+rd_kafka_resp_err_t rd_kafka_handle_Offset (rd_kafka_t *rk,
+                                            rd_kafka_broker_t *rkb,
+                                            rd_kafka_resp_err_t err,
+                                            rd_kafka_buf_t *rkbuf,
+                                            rd_kafka_buf_t *request,
+                                            rd_kafka_topic_partition_list_t
+                                            *offsets) {
+
+        const int log_decode_errors = LOG_ERR;
+        int16_t ErrorCode = 0;
+        int32_t TopicArrayCnt;
+        int actions;
+        int16_t api_version;
+
+        if (err) {
+                ErrorCode = err;
+                goto err;
+        }
+
+        api_version = request->rkbuf_reqhdr.ApiVersion;
+
+        /* NOTE:
+         * Broker may return offsets in a different constellation than
+         * in the original request .*/
+
+        rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
+        while (TopicArrayCnt-- > 0) {
+                rd_kafkap_str_t ktopic;
+                int32_t PartArrayCnt;
+                char *topic_name;
+
+                rd_kafka_buf_read_str(rkbuf, &ktopic);
+                rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt);
+
+                RD_KAFKAP_STR_DUPA(&topic_name, &ktopic);
+
+                while (PartArrayCnt-- > 0) {
+                        int32_t kpartition;
+                        int32_t OffsetArrayCnt;
+                        int64_t Offset = -1;
+                        rd_kafka_topic_partition_t *rktpar;
+
+                        rd_kafka_buf_read_i32(rkbuf, &kpartition);
+                        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
+
+                        if (api_version == 1) {
+                                int64_t Timestamp;
+                                rd_kafka_buf_read_i64(rkbuf, &Timestamp);
+                                rd_kafka_buf_read_i64(rkbuf, &Offset);
+                        } else if (api_version == 0) {
+                                rd_kafka_buf_read_i32(rkbuf, &OffsetArrayCnt);
+                                /* We only request one offset so just grab
+                                 * the first one. */
+                                while (OffsetArrayCnt-- > 0)
+                                        rd_kafka_buf_read_i64(rkbuf, &Offset);
+                        } else {
+                                rd_kafka_assert(NULL, !*"NOTREACHED");
+                        }
+
+                        rktpar = rd_kafka_topic_partition_list_add(
+                                offsets, topic_name, kpartition);
+                        rktpar->err = ErrorCode;
+                        rktpar->offset = Offset;
+                }
+        }
+
+        goto done;
+
+ err_parse:
+        ErrorCode = rkbuf->rkbuf_err;
+ err:
+        actions = rd_kafka_err_action(
+                rkb, ErrorCode, rkbuf, request,
+                RD_KAFKA_ERR_ACTION_PERMANENT,
+                RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
+
+                RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY,
+                RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
+
+                RD_KAFKA_ERR_ACTION_END);
+
+        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
+                char tmp[256];
+                /* Re-query for leader */
+                rd_snprintf(tmp, sizeof(tmp),
+                            "OffsetRequest failed: %s",
+                            rd_kafka_err2str(ErrorCode));
+                rd_kafka_metadata_refresh_known_topics(rk, NULL, 1/*force*/,
+                                                       tmp);
+        }
+
+        if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
+                if (rd_kafka_buf_retry(rkb, request))
+                        return RD_KAFKA_RESP_ERR__IN_PROGRESS;
+                /* FALLTHRU */
+        }
+
+done:
+        return ErrorCode;
+}
+
+
+
+
+
+
+/**
+ * Send OffsetRequest for toppar 'rktp'.
+ */
+void rd_kafka_OffsetRequest (rd_kafka_broker_t *rkb,
+                             rd_kafka_topic_partition_list_t *partitions,
+                             int16_t api_version,
+                             rd_kafka_replyq_t replyq,
+                             rd_kafka_resp_cb_t *resp_cb,
+                             void *opaque) {
+        rd_kafka_buf_t *rkbuf;
+        int i;
+        size_t of_TopicArrayCnt = 0, of_PartArrayCnt = 0;
+        const char *last_topic = "";
+        int32_t topic_cnt = 0, part_cnt = 0;
+
+        rd_kafka_topic_partition_list_sort_by_topic(partitions);
+
+        rkbuf = rd_kafka_buf_new_request(
+                rkb, RD_KAFKAP_Offset, 1,
+                /* ReplicaId+TopicArrayCnt+Topic */
+                4+4+100+
+                /* PartArrayCnt */
+                4 +
+                /* partition_cnt * Partition+Time+MaxNumOffs */
+                (partitions->cnt * (4+8+4)));
+
+        /* ReplicaId */
+        rd_kafka_buf_write_i32(rkbuf, -1);
+        /* TopicArrayCnt */
+        of_TopicArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0); /* updated later */
+
+        for (i = 0 ; i < partitions->cnt ; i++) {
+                const rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];
+
+                if (strcmp(rktpar->topic, last_topic)) {
+                        /* Finish last topic, if any. */
+                        if (of_PartArrayCnt > 0)
+                                rd_kafka_buf_update_i32(rkbuf,
+                                                        of_PartArrayCnt,
+                                                        part_cnt);
+
+                        /* Topic */
+                        rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
+                        topic_cnt++;
+                        last_topic = rktpar->topic;
+                        /* New topic so reset partition count */
+                        part_cnt = 0;
+
+                        /* PartitionArrayCnt: updated later */
+                        of_PartArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0);
+                }
+
+                /* Partition */
+                rd_kafka_buf_write_i32(rkbuf, rktpar->partition);
+                part_cnt++;
+
+                /* Time/Offset */
+                rd_kafka_buf_write_i64(rkbuf, rktpar->offset);
+
+                if (api_version == 0) {
+                        /* MaxNumberOfOffsets */
+                        rd_kafka_buf_write_i32(rkbuf, 1);
+                }
+        }
+
+        if (of_PartArrayCnt > 0) {
+                rd_kafka_buf_update_i32(rkbuf, of_PartArrayCnt, part_cnt);
+                rd_kafka_buf_update_i32(rkbuf, of_TopicArrayCnt, topic_cnt);
+        }
+
+        rd_kafka_buf_ApiVersion_set(rkbuf, api_version,
+                                    api_version == 1 ?
+                                    RD_KAFKA_FEATURE_OFFSET_TIME : 0);
+
+        rd_rkb_dbg(rkb, TOPIC, "OFFSET",
+                   "OffsetRequest (v%hd, opv %d) "
+                   "for %"PRId32" topic(s) and %"PRId32" partition(s)",
+                   api_version, rkbuf->rkbuf_replyq.version,
+                   topic_cnt, partitions->cnt);
+
+        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+}
+
+
+/**
+ * Generic handler for OffsetFetch responses.
+ * Offsets for included partitions will be propagated through the passed
+ * 'offsets' list.
+ *
+ * \p update_toppar: update toppar's committed_offset
+ */
+rd_kafka_resp_err_t
+rd_kafka_handle_OffsetFetch (rd_kafka_t *rk,
+			     rd_kafka_broker_t *rkb,
+			     rd_kafka_resp_err_t err,
+			     rd_kafka_buf_t *rkbuf,
+			     rd_kafka_buf_t *request,
+			     rd_kafka_topic_partition_list_t *offsets,
+			     int update_toppar) {
+        const int log_decode_errors = LOG_ERR;
+        int32_t TopicArrayCnt;
+        int64_t offset = RD_KAFKA_OFFSET_INVALID;
+        rd_kafkap_str_t metadata;
+        int i;
+        int actions;
+        int seen_cnt = 0;
+
+        if (err)
+                goto err;
+
+        /* Set default offset for all partitions. */
+        rd_kafka_topic_partition_list_set_offsets(rkb->rkb_rk, offsets, 0,
+                                                  RD_KAFKA_OFFSET_INVALID,
+						  0 /* !is commit */);
+
+        rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
+        for (i = 0 ; i < TopicArrayCnt ; i++) {
+                rd_kafkap_str_t topic;
+                int32_t PartArrayCnt;
+                char *topic_name;
+                int j;
+
+                rd_kafka_buf_read_str(rkbuf, &topic);
+                rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt);
+
+                RD_KAFKAP_STR_DUPA(&topic_name, &topic);
+
+                for (j = 0 ; j < PartArrayCnt ; j++) {
+                        int32_t partition;
+                        shptr_rd_kafka_toppar_t *s_rktp;
+                        rd_kafka_topic_partition_t *rktpar;
+                        int16_t err2;
+
+                        rd_kafka_buf_read_i32(rkbuf, &partition);
+                        rd_kafka_buf_read_i64(rkbuf, &offset);
+                        rd_kafka_buf_read_str(rkbuf, &metadata);
+                        rd_kafka_buf_read_i16(rkbuf, &err2);
+
+                        rktpar = rd_kafka_topic_partition_list_find(offsets,
+                                                                    topic_name,
+                                                                    partition);
+                        if (!rktpar) {
+				rd_rkb_dbg(rkb, TOPIC, "OFFSETFETCH",
+					   "OffsetFetchResponse: %s [%"PRId32"] "
+					   "not found in local list: ignoring",
+					   topic_name, partition);
+                                continue;
+			}
+
+                        seen_cnt++;
+
+			if (!(s_rktp = rktpar->_private)) {
+				s_rktp = rd_kafka_toppar_get2(rkb->rkb_rk,
+							      topic_name,
+							      partition, 0, 0);
+				/* May be NULL if topic is not locally known */
+				rktpar->_private = s_rktp;
+			}
+
+			/* broker reports invalid offset as -1 */
+			if (offset == -1)
+				rktpar->offset = RD_KAFKA_OFFSET_INVALID;
+			else
+				rktpar->offset = offset;
+                        rktpar->err = err2;
+
+			rd_rkb_dbg(rkb, TOPIC, "OFFSETFETCH",
+				   "OffsetFetchResponse: %s [%"PRId32"] offset %"PRId64,
+				   topic_name, partition, offset);
+
+			if (update_toppar && !err2 && s_rktp) {
+				rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
+				/* Update toppar's committed offset */
+				rd_kafka_toppar_lock(rktp);
+				rktp->rktp_committed_offset = rktpar->offset;
+				rd_kafka_toppar_unlock(rktp);
+			}
+
+
+                        if (rktpar->metadata)
+                                rd_free(rktpar->metadata);
+
+                        if (RD_KAFKAP_STR_IS_NULL(&metadata)) {
+                                rktpar->metadata = NULL;
+                                rktpar->metadata_size = 0;
+                        } else {
+                                rktpar->metadata = RD_KAFKAP_STR_DUP(&metadata);
+                                rktpar->metadata_size =
+                                        RD_KAFKAP_STR_LEN(&metadata);
+                        }
+                }
+        }
+
+
+err:
+        rd_rkb_dbg(rkb, TOPIC, "OFFFETCH",
+                   "OffsetFetch for %d/%d partition(s) returned %s",
+                   seen_cnt,
+                   offsets ? offsets->cnt : -1, rd_kafka_err2str(err));
+
+        actions = rd_kafka_err_action(rkb, err, rkbuf, request,
+				      RD_KAFKA_ERR_ACTION_END);
+
+        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
+                /* Re-query for coordinator */
+                rd_kafka_cgrp_op(rkb->rkb_rk->rk_cgrp, NULL,
+                                 RD_KAFKA_NO_REPLYQ,
+				 RD_KAFKA_OP_COORD_QUERY, err);
+                if (request) {
+                        /* Schedule a retry */
+                        rd_kafka_buf_keep(request);
+                        rd_kafka_broker_buf_retry(request->rkbuf_rkb, request);
+                }
+        }
+
+	return err;
+
+ err_parse:
+        err = rkbuf->rkbuf_err;
+        goto err;
+}
+
+
+
+/**
+ * opaque=rko wrapper for handle_OffsetFetch.
+ * rko->rko_payload MUST be a `rd_kafka_topic_partition_list_t *` which will
+ * be filled in with fetch offsets.
+ *
+ * A reply will be sent on 'rko->rko_replyq' with type RD_KAFKA_OP_OFFSET_FETCH.
+ *
+ * Locality: cgrp's broker thread
+ */
+void rd_kafka_op_handle_OffsetFetch (rd_kafka_t *rk,
+				     rd_kafka_broker_t *rkb,
+                                     rd_kafka_resp_err_t err,
+                                     rd_kafka_buf_t *rkbuf,
+                                     rd_kafka_buf_t *request,
+                                     void *opaque) {
+        rd_kafka_op_t *rko = opaque;
+        rd_kafka_op_t *rko_reply;
+        rd_kafka_topic_partition_list_t *offsets;
+
+	RD_KAFKA_OP_TYPE_ASSERT(rko, RD_KAFKA_OP_OFFSET_FETCH);
+
+        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
+                /* Termination, quick cleanup. */
+                rd_kafka_op_destroy(rko);
+                return;
+        }
+
+        offsets = rd_kafka_topic_partition_list_copy(
+                rko->rko_u.offset_fetch.partitions);
+
+        rko_reply = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH|RD_KAFKA_OP_REPLY);
+        rko_reply->rko_err = err;
+        rko_reply->rko_u.offset_fetch.partitions = offsets;
+        rko_reply->rko_u.offset_fetch.do_free = 1;
+	if (rko->rko_rktp)
+		rko_reply->rko_rktp = rd_kafka_toppar_keep(
+			rd_kafka_toppar_s2i(rko->rko_rktp));
+
+	/* If all partitions already had usable offsets then there
+	 * was no request sent and thus no reply, the offsets list is
+	 * good to go. */
+	if (rkbuf)
+		rd_kafka_handle_OffsetFetch(rkb->rkb_rk, rkb, err, rkbuf,
+					    request, offsets, 0);
+
+	rd_kafka_replyq_enq(&rko->rko_replyq, rko_reply, 0);
+
+        rd_kafka_op_destroy(rko);
+}
+
+
+
+
+
+
+/**
+ * Send OffsetFetchRequest for toppar.
+ *
+ * Any partition with a usable offset will be ignored, if all partitions
+ * have usable offsets then no request is sent at all but an empty
+ * reply is enqueued on the replyq.
+ */
+void rd_kafka_OffsetFetchRequest (rd_kafka_broker_t *rkb,
+                                  int16_t api_version,
+                                  rd_kafka_topic_partition_list_t *parts,
+				  rd_kafka_replyq_t replyq,
+                                  rd_kafka_resp_cb_t *resp_cb,
+                                  void *opaque) {
+	rd_kafka_buf_t *rkbuf;
+        size_t of_TopicCnt;
+        int TopicCnt = 0;
+        ssize_t of_PartCnt = -1;
+        const char *last_topic = NULL;
+        int PartCnt = 0;
+	int tot_PartCnt = 0;
+        int i;
+
+        rkbuf = rd_kafka_buf_new_request(
+                rkb, RD_KAFKAP_OffsetFetch, 1,
+                RD_KAFKAP_STR_SIZE(rkb->rkb_rk->rk_group_id) +
+                4 +
+                (parts->cnt * 32));
+
+
+        /* ConsumerGroup */
+        rd_kafka_buf_write_kstr(rkbuf, rkb->rkb_rk->rk_group_id);
+
+        /* Sort partitions by topic */
+        rd_kafka_topic_partition_list_sort_by_topic(parts);
+
+	/* TopicArrayCnt */
+        of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0); /* Updated later */
+
+        for (i = 0 ; i < parts->cnt ; i++) {
+                rd_kafka_topic_partition_t *rktpar = &parts->elems[i];
+
+		/* Ignore partitions with a usable offset. */
+		if (rktpar->offset != RD_KAFKA_OFFSET_INVALID &&
+		    rktpar->offset != RD_KAFKA_OFFSET_STORED) {
+			rd_rkb_dbg(rkb, TOPIC, "OFFSET",
+				   "OffsetFetchRequest: skipping %s [%"PRId32"] "
+				   "with valid offset %s",
+				   rktpar->topic, rktpar->partition,
+				   rd_kafka_offset2str(rktpar->offset));
+			continue;
+		}
+
+                if (last_topic == NULL || strcmp(last_topic, rktpar->topic)) {
+                        /* New topic */
+
+                        /* Finalize previous PartitionCnt */
+                        if (PartCnt > 0)
+                                rd_kafka_buf_update_u32(rkbuf, of_PartCnt,
+                                                        PartCnt);
+
+                        /* TopicName */
+                        rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
+                        /* PartitionCnt, finalized later */
+                        of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0);
+                        PartCnt = 0;
+			last_topic = rktpar->topic;
+                        TopicCnt++;
+                }
+
+                /* Partition */
+                rd_kafka_buf_write_i32(rkbuf,  rktpar->partition);
+                PartCnt++;
+		tot_PartCnt++;
+        }
+
+        /* Finalize previous PartitionCnt */
+        if (PartCnt > 0)
+                rd_kafka_buf_update_u32(rkbuf, of_PartCnt,  PartCnt);
+
+        /* Finalize TopicCnt */
+        rd_kafka_buf_update_u32(rkbuf, of_TopicCnt, TopicCnt);
+
+        rd_kafka_buf_ApiVersion_set(rkbuf, api_version, 0);
+
+	rd_rkb_dbg(rkb, TOPIC, "OFFSET",
+		   "OffsetFetchRequest(v%d) for %d/%d partition(s)",
+                   api_version, tot_PartCnt, parts->cnt);
+
+	if (tot_PartCnt == 0) {
+		/* No partitions needs OffsetFetch, enqueue empty
+		 * response right away. */
+                rkbuf->rkbuf_replyq = replyq;
+                rkbuf->rkbuf_cb     = resp_cb;
+                rkbuf->rkbuf_opaque = opaque;
+		rd_kafka_buf_callback(rkb->rkb_rk, rkb, 0, NULL, rkbuf);
+		return;
+	}
+
+
+
+	rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+}
+
+
+/**
+ * @remark \p offsets may be NULL if \p err is set
+ */
+rd_kafka_resp_err_t
+rd_kafka_handle_OffsetCommit (rd_kafka_t *rk,
+			      rd_kafka_broker_t *rkb,
+			      rd_kafka_resp_err_t err,
+			      rd_kafka_buf_t *rkbuf,
+			      rd_kafka_buf_t *request,
+			      rd_kafka_topic_partition_list_t *offsets) {
+        const int log_decode_errors = LOG_ERR;
+        int32_t TopicArrayCnt;
+        int16_t ErrorCode = 0, last_ErrorCode = 0;
+	int errcnt = 0;
+        int i;
+	int actions;
+
+        if (err)
+		goto err;
+
+        rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
+        for (i = 0 ; i < TopicArrayCnt ; i++) {
+                rd_kafkap_str_t topic;
+                char *topic_str;
+                int32_t PartArrayCnt;
+                int j;
+
+                rd_kafka_buf_read_str(rkbuf, &topic);
+                rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt);
+
+                RD_KAFKAP_STR_DUPA(&topic_str, &topic);
+
+                for (j = 0 ; j < PartArrayCnt ; j++) {
+                        int32_t partition;
+                        rd_kafka_topic_partition_t *rktpar;
+
+                        rd_kafka_buf_read_i32(rkbuf, &partition);
+                        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
+
+                        rktpar = rd_kafka_topic_partition_list_find(
+                                offsets, topic_str, partition);
+
+                        if (!rktpar) {
+                                /* Received offset for topic/partition we didn't
+                                 * ask for, this shouldn't really happen. */
+                                continue;
+                        }
+
+                        rktpar->err = ErrorCode;
+			if (ErrorCode) {
+				last_ErrorCode = ErrorCode;
+				errcnt++;
+			}
+                }
+        }
+
+	/* If all partitions failed use error code
+	 * from last partition as the global error. */
+	if (offsets && errcnt == offsets->cnt)
+		err = last_ErrorCode;
+	goto done;
+
+ err_parse:
+        err = rkbuf->rkbuf_err;
+
+ err:
+        actions = rd_kafka_err_action(
+		rkb, err, rkbuf, request,
+
+		RD_KAFKA_ERR_ACTION_PERMANENT,
+		RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE,
+
+		RD_KAFKA_ERR_ACTION_RETRY,
+		RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS,
+
+		RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_SPECIAL,
+		RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE,
+
+		RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_SPECIAL,
+		RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP,
+
+		RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY,
+		RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+
+		RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY,
+		RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,
+
+		RD_KAFKA_ERR_ACTION_RETRY,
+		RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
+
+		RD_KAFKA_ERR_ACTION_PERMANENT,
+		RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE,
+
+		RD_KAFKA_ERR_ACTION_PERMANENT,
+		RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
+
+		RD_KAFKA_ERR_ACTION_PERMANENT,
+		RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
+
+		RD_KAFKA_ERR_ACTION_END);
+
+	if (actions & RD_KAFKA_ERR_ACTION_REFRESH && rk->rk_cgrp) {
+		/* Mark coordinator dead or re-query for coordinator.
+		 * ..dead() will trigger a re-query. */
+		if (actions & RD_KAFKA_ERR_ACTION_SPECIAL)
+			rd_kafka_cgrp_coord_dead(rk->rk_cgrp, err,
+						 "OffsetCommitRequest failed");
+		else
+			rd_kafka_cgrp_coord_query(rk->rk_cgrp,
+						  "OffsetCommitRequest failed");
+	}
+	if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
+		if (rd_kafka_buf_retry(rkb, request))
+			return RD_KAFKA_RESP_ERR__IN_PROGRESS;
+		/* FALLTHRU */
+	}
+
+ done:
+	return err;
+}
+
+
+
+
+/**
+ * @brief Send OffsetCommitRequest for a list of partitions.
+ *
+ * @returns 0 if none of the partitions in \p offsets had valid offsets,
+ *          else 1.
+ */
+int rd_kafka_OffsetCommitRequest (rd_kafka_broker_t *rkb,
+                                   rd_kafka_cgrp_t *rkcg,
+                                   int16_t api_version,
+                                   rd_kafka_topic_partition_list_t *offsets,
+                                   rd_kafka_replyq_t replyq,
+                                   rd_kafka_resp_cb_t *resp_cb,
+                                   void *opaque, const char *reason) {
+	rd_kafka_buf_t *rkbuf;
+        ssize_t of_TopicCnt = -1;
+        int TopicCnt = 0;
+        const char *last_topic = NULL;
+        ssize_t of_PartCnt = -1;
+        int PartCnt = 0;
+	int tot_PartCnt = 0;
+        int i;
+
+        rd_kafka_assert(NULL, offsets != NULL);
+
+        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_OffsetCommit,
+                                         1, 100 + (offsets->cnt * 128));
+
+        /* ConsumerGroup */
+        rd_kafka_buf_write_kstr(rkbuf, rkcg->rkcg_group_id);
+
+        /* v1,v2 */
+        if (api_version >= 1) {
+                /* ConsumerGroupGenerationId */
+                rd_kafka_buf_write_i32(rkbuf, rkcg->rkcg_generation_id);
+                /* ConsumerId */
+                rd_kafka_buf_write_kstr(rkbuf, rkcg->rkcg_member_id);
+                /* v2: RetentionTime */
+                if (api_version == 2)
+                        rd_kafka_buf_write_i64(rkbuf, -1);
+        }
+
+        /* Sort offsets by topic */
+        rd_kafka_topic_partition_list_sort_by_topic(offsets);
+
+        /* TopicArrayCnt: Will be updated when we know the number of topics. */
+        of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0);
+
+        for (i = 0 ; i < offsets->cnt ; i++) {
+                rd_kafka_topic_partition_t *rktpar = &offsets->elems[i];
+
+		/* Skip partitions with invalid offset. */
+		if (rktpar->offset < 0)
+			continue;
+
+                if (last_topic == NULL || strcmp(last_topic, rktpar->topic)) {
+                        /* New topic */
+
+                        /* Finalize previous PartitionCnt */
+                        if (PartCnt > 0)
+                                rd_kafka_buf_update_u32(rkbuf, of_PartCnt,
+                                                        PartCnt);
+
+                        /* TopicName */
+                        rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
+                        /* PartitionCnt, finalized later */
+                        of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0);
+                        PartCnt = 0;
+			last_topic = rktpar->topic;
+                        TopicCnt++;
+                }
+
+                /* Partition */
+                rd_kafka_buf_write_i32(rkbuf,  rktpar->partition);
+                PartCnt++;
+		tot_PartCnt++;
+
+                /* Offset */
+                rd_kafka_buf_write_i64(rkbuf, rktpar->offset);
+
+                /* v1: TimeStamp */
+                if (api_version == 1)
+                        rd_kafka_buf_write_i64(rkbuf, -1);// FIXME: retention time
+
+                /* Metadata */
+		/* Java client 0.9.0 and broker <0.10.0 can't parse
+		 * Null metadata fields, so as a workaround we send an
+		 * empty string if it's Null. */
+		if (!rktpar->metadata)
+			rd_kafka_buf_write_str(rkbuf, "", 0);
+		else
+			rd_kafka_buf_write_str(rkbuf,
+					       rktpar->metadata,
+					       rktpar->metadata_size);
+        }
+
+	if (tot_PartCnt == 0) {
+		/* No topic+partitions had valid offsets to commit. */
+		rd_kafka_replyq_destroy(&replyq);
+		rd_kafka_buf_destroy(rkbuf);
+		return 0;
+	}
+
+        /* Finalize previous PartitionCnt */
+        if (PartCnt > 0)
+                rd_kafka_buf_update_u32(rkbuf, of_PartCnt,  PartCnt);
+
+        /* Finalize TopicCnt */
+        rd_kafka_buf_update_u32(rkbuf, of_TopicCnt, TopicCnt);
+
+        rd_kafka_buf_ApiVersion_set(rkbuf, api_version, 0);
+
+        rd_rkb_dbg(rkb, TOPIC, "OFFSET",
+                   "Enqueue OffsetCommitRequest(v%d, %d/%d partition(s))): %s",
+                   api_version, tot_PartCnt, offsets->cnt, reason);
+
+	rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+
+	return 1;
+
+}
+
+
+
+/**
+ * @brief Write "consumer" protocol type MemberState for SyncGroupRequest to
+ *        enveloping buffer \p rkbuf.
+ */
+static void rd_kafka_group_MemberState_consumer_write (
+        rd_kafka_buf_t *env_rkbuf,
+        const rd_kafka_group_member_t *rkgm) {
+        rd_kafka_buf_t *rkbuf;
+        int i;
+        const char *last_topic = NULL;
+        size_t of_TopicCnt;
+        ssize_t of_PartCnt = -1;
+        int TopicCnt = 0;
+        int PartCnt = 0;
+        rd_slice_t slice;
+
+        rkbuf = rd_kafka_buf_new(1, 100);
+        rd_kafka_buf_write_i16(rkbuf, 0); /* Version */
+        of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0); /* Updated later */
+        for (i = 0 ; i < rkgm->rkgm_assignment->cnt ; i++) {
+                const rd_kafka_topic_partition_t *rktpar;
+
+                rktpar = &rkgm->rkgm_assignment->elems[i];
+
+                if (!last_topic || strcmp(last_topic,
+                                          rktpar->topic)) {
+                        if (last_topic)
+                                /* Finalize previous PartitionCnt */
+                                rd_kafka_buf_update_i32(rkbuf, of_PartCnt,
+                                                        PartCnt);
+                        rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
+                        /* Updated later */
+                        of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0);
+                        PartCnt = 0;
+                        last_topic = rktpar->topic;
+                        TopicCnt++;
+                }
+
+                rd_kafka_buf_write_i32(rkbuf, rktpar->partition);
+                PartCnt++;
+        }
+
+        if (of_PartCnt != -1)
+                rd_kafka_buf_update_i32(rkbuf, of_PartCnt, PartCnt);
+        rd_kafka_buf_update_i32(rkbuf, of_TopicCnt, TopicCnt);
+
+        rd_kafka_buf_write_kbytes(rkbuf, rkgm->rkgm_userdata);
+
+        /* Get pointer to binary buffer */
+        rd_slice_init_full(&slice, &rkbuf->rkbuf_buf);
+
+        /* Write binary buffer as Kafka Bytes to enveloping buffer. */
+        rd_kafka_buf_write_i32(env_rkbuf, (int32_t)rd_slice_remains(&slice));
+        rd_buf_write_slice(&env_rkbuf->rkbuf_buf, &slice);
+
+        rd_kafka_buf_destroy(rkbuf);
+}
+
+/**
+ * Send SyncGroupRequest
+ */
+void rd_kafka_SyncGroupRequest (rd_kafka_broker_t *rkb,
+                                const rd_kafkap_str_t *group_id,
+                                int32_t generation_id,
+                                const rd_kafkap_str_t *member_id,
+                                const rd_kafka_group_member_t
+                                *assignments,
+                                int assignment_cnt,
+                                rd_kafka_replyq_t replyq,
+                                rd_kafka_resp_cb_t *resp_cb,
+                                void *opaque) {
+        rd_kafka_buf_t *rkbuf;
+        int i;
+
+        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SyncGroup,
+                                         1,
+                                         RD_KAFKAP_STR_SIZE(group_id) +
+                                         4 /* GenerationId */ +
+                                         RD_KAFKAP_STR_SIZE(member_id) +
+                                         4 /* array size group_assignment */ +
+                                         (assignment_cnt * 100/*guess*/));
+        rd_kafka_buf_write_kstr(rkbuf, group_id);
+        rd_kafka_buf_write_i32(rkbuf, generation_id);
+        rd_kafka_buf_write_kstr(rkbuf, member_id);
+        rd_kafka_buf_write_i32(rkbuf, assignment_cnt);
+
+        for (i = 0 ; i < assignment_cnt ; i++) {
+                const rd_kafka_group_member_t *rkgm = &assignments[i];
+
+                rd_kafka_buf_write_kstr(rkbuf, rkgm->rkgm_member_id);
+                rd_kafka_group_MemberState_consumer_write(rkbuf, rkgm);
+        }
+
+        /* This is a blocking request */
+        rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING;
+        rkbuf->rkbuf_ts_timeout = rd_clock() +
+                (rkb->rkb_rk->rk_conf.group_session_timeout_ms * 1000) +
+                (3*1000*1000/* 3s grace period*/);
+
+        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+}
+
+/**
+ * Handler for SyncGroup responses
+ * opaque must be the cgrp handle.
+ */
+void rd_kafka_handle_SyncGroup (rd_kafka_t *rk,
+				rd_kafka_broker_t *rkb,
+                                rd_kafka_resp_err_t err,
+                                rd_kafka_buf_t *rkbuf,
+                                rd_kafka_buf_t *request,
+                                void *opaque) {
+        rd_kafka_cgrp_t *rkcg = opaque;
+        const int log_decode_errors = LOG_ERR;
+        int16_t ErrorCode = 0;
+        rd_kafkap_bytes_t MemberState = RD_ZERO_INIT;
+        int actions;
+
+	if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC) {
+		rd_kafka_dbg(rkb->rkb_rk, CGRP, "SYNCGROUP",
+			     "SyncGroup response: discarding outdated request "
+			     "(now in join-state %s)",
+			     rd_kafka_cgrp_join_state_names[rkcg->
+							    rkcg_join_state]);
+		return;
+	}
+
+        if (err) {
+                ErrorCode = err;
+                goto err;
+        }
+
+        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
+        rd_kafka_buf_read_bytes(rkbuf, &MemberState);
+
+err:
+        actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request,
+				      RD_KAFKA_ERR_ACTION_END);
+
+        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
+                /* Re-query for coordinator */
+                rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
+				 RD_KAFKA_OP_COORD_QUERY,
+                                 ErrorCode);
+                /* FALLTHRU */
+        }
+
+        rd_kafka_dbg(rkb->rkb_rk, CGRP, "SYNCGROUP",
+                     "SyncGroup response: %s (%d bytes of MemberState data)",
+                     rd_kafka_err2str(ErrorCode),
+                     RD_KAFKAP_BYTES_LEN(&MemberState));
+
+        if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY)
+                return; /* Termination */
+
+        rd_kafka_cgrp_handle_SyncGroup(rkcg, rkb, ErrorCode, &MemberState);
+
+        return;
+
+ err_parse:
+        ErrorCode = rkbuf->rkbuf_err;
+        goto err;
+}
+
+
+/**
+ * Send JoinGroupRequest
+ */
+void rd_kafka_JoinGroupRequest (rd_kafka_broker_t *rkb,
+                                const rd_kafkap_str_t *group_id,
+                                const rd_kafkap_str_t *member_id,
+                                const rd_kafkap_str_t *protocol_type,
+				const rd_list_t *topics,
+                                rd_kafka_replyq_t replyq,
+                                rd_kafka_resp_cb_t *resp_cb,
+                                void *opaque) {
+        rd_kafka_buf_t *rkbuf;
+        rd_kafka_t *rk = rkb->rkb_rk;
+        rd_kafka_assignor_t *rkas;
+        int i;
+
+        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_JoinGroup,
+                                         1,
+                                         RD_KAFKAP_STR_SIZE(group_id) +
+                                         4 /* sessionTimeoutMs */ +
+                                         RD_KAFKAP_STR_SIZE(member_id) +
+                                         RD_KAFKAP_STR_SIZE(protocol_type) +
+                                         4 /* array count GroupProtocols */ +
+                                         (rd_list_cnt(topics) * 100));
+        rd_kafka_buf_write_kstr(rkbuf, group_id);
+        rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.group_session_timeout_ms);
+        rd_kafka_buf_write_kstr(rkbuf, member_id);
+        rd_kafka_buf_write_kstr(rkbuf, protocol_type);
+        rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.enabled_assignor_cnt);
+
+        RD_LIST_FOREACH(rkas, &rk->rk_conf.partition_assignors, i) {
+                rd_kafkap_bytes_t *member_metadata;
+		if (!rkas->rkas_enabled)
+			continue;
+                rd_kafka_buf_write_kstr(rkbuf, rkas->rkas_protocol_name);
+                member_metadata = rkas->rkas_get_metadata_cb(rkas, topics);
+                rd_kafka_buf_write_kbytes(rkbuf, member_metadata);
+                rd_kafkap_bytes_destroy(member_metadata);
+        }
+
+        /* This is a blocking request */
+        rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING;
+        rkbuf->rkbuf_ts_timeout = rd_clock() +
+                (rk->rk_conf.group_session_timeout_ms * 1000) +
+                (3*1000*1000/* 3s grace period*/);
+
+        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+}
+
+
+
+
+
+
+/**
+ * Send LeaveGroupRequest
+ */
+void rd_kafka_LeaveGroupRequest (rd_kafka_broker_t *rkb,
+                                 const rd_kafkap_str_t *group_id,
+                                 const rd_kafkap_str_t *member_id,
+                                 rd_kafka_replyq_t replyq,
+                                 rd_kafka_resp_cb_t *resp_cb,
+                                 void *opaque) {
+        rd_kafka_buf_t *rkbuf;
+
+        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_LeaveGroup,
+                                         1,
+                                         RD_KAFKAP_STR_SIZE(group_id) +
+                                         RD_KAFKAP_STR_SIZE(member_id));
+        rd_kafka_buf_write_kstr(rkbuf, group_id);
+        rd_kafka_buf_write_kstr(rkbuf, member_id);
+
+        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+}
+
+
+/**
+ * Handler for LeaveGroup responses
+ * opaque must be the cgrp handle.
+ */
+void rd_kafka_handle_LeaveGroup (rd_kafka_t *rk,
+				 rd_kafka_broker_t *rkb,
+                                 rd_kafka_resp_err_t err,
+                                 rd_kafka_buf_t *rkbuf,
+                                 rd_kafka_buf_t *request,
+                                 void *opaque) {
+        rd_kafka_cgrp_t *rkcg = opaque;
+        const int log_decode_errors = LOG_ERR;
+        int16_t ErrorCode = 0;
+        int actions;
+
+        if (err) {
+                ErrorCode = err;
+                goto err;
+        }
+
+        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
+
+
+err:
+        actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request,
+				      RD_KAFKA_ERR_ACTION_END);
+
+        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
+                /* Re-query for coordinator */
+                rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
+				 RD_KAFKA_OP_COORD_QUERY, ErrorCode);
+                /* Schedule a retry */
+                rd_kafka_buf_keep(request);
+                rd_kafka_broker_buf_retry(request->rkbuf_rkb, request);
+                return;
+        }
+
+        if (ErrorCode)
+                rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP",
+                             "LeaveGroup response: %s",
+                             rd_kafka_err2str(ErrorCode));
+
+ err_parse:
+        ErrorCode = rkbuf->rkbuf_err;
+        goto err;
+}
+
+
+
+
+
+
+/**
+ * Send HeartbeatRequest
+ */
+void rd_kafka_HeartbeatRequest (rd_kafka_broker_t *rkb,
+                                const rd_kafkap_str_t *group_id,
+                                int32_t generation_id,
+                                const rd_kafkap_str_t *member_id,
+                                rd_kafka_replyq_t replyq,
+                                rd_kafka_resp_cb_t *resp_cb,
+                                void *opaque) {
+        rd_kafka_buf_t *rkbuf;
+
+        rd_rkb_dbg(rkb, CGRP, "HEARTBEAT",
+                   "Heartbeat for group \"%s\" generation id %"PRId32,
+                   group_id->str, generation_id);
+
+        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_Heartbeat,
+                                         1,
+                                         RD_KAFKAP_STR_SIZE(group_id) +
+                                         4 /* GenerationId */ +
+                                         RD_KAFKAP_STR_SIZE(member_id));
+
+        rd_kafka_buf_write_kstr(rkbuf, group_id);
+        rd_kafka_buf_write_i32(rkbuf, generation_id);
+        rd_kafka_buf_write_kstr(rkbuf, member_id);
+
+        rkbuf->rkbuf_ts_timeout = rd_clock() +
+                (rkb->rkb_rk->rk_conf.group_session_timeout_ms * 1000);
+
+        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+}
+
+
+
+
+/**
+ * Send ListGroupsRequest
+ */
+void rd_kafka_ListGroupsRequest (rd_kafka_broker_t *rkb,
+                                 rd_kafka_replyq_t replyq,
+                                 rd_kafka_resp_cb_t *resp_cb,
+                                 void *opaque) {
+        rd_kafka_buf_t *rkbuf;
+
+        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_ListGroups, 0, 0);
+
+        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+}
+
+
+/**
+ * Send DescribeGroupsRequest
+ */
+void rd_kafka_DescribeGroupsRequest (rd_kafka_broker_t *rkb,
+                                     const char **groups, int group_cnt,
+                                     rd_kafka_replyq_t replyq,
+                                     rd_kafka_resp_cb_t *resp_cb,
+                                     void *opaque) {
+        rd_kafka_buf_t *rkbuf;
+
+        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DescribeGroups,
+                                         1, 32*group_cnt);
+
+        rd_kafka_buf_write_i32(rkbuf, group_cnt);
+        while (group_cnt-- > 0)
+                rd_kafka_buf_write_str(rkbuf, groups[group_cnt], -1);
+
+        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+}
+
+
+
+
+/**
+ * @brief Generic handler for Metadata responses
+ *
+ * @locality rdkafka main thread
+ */
+static void rd_kafka_handle_Metadata (rd_kafka_t *rk,
+                                      rd_kafka_broker_t *rkb,
+                                      rd_kafka_resp_err_t err,
+                                      rd_kafka_buf_t *rkbuf,
+                                      rd_kafka_buf_t *request,
+                                      void *opaque) {
+        rd_kafka_op_t *rko = opaque; /* Possibly NULL */
+        struct rd_kafka_metadata *md = NULL;
+        const rd_list_t *topics = request->rkbuf_u.Metadata.topics;
+
+        rd_kafka_assert(NULL, err == RD_KAFKA_RESP_ERR__DESTROY ||
+                        thrd_is_current(rk->rk_thread));
+
+	/* Avoid metadata updates when we're terminating. */
+	if (rd_kafka_terminating(rkb->rkb_rk))
+                err = RD_KAFKA_RESP_ERR__DESTROY;
+
+	if (unlikely(err)) {
+                if (err == RD_KAFKA_RESP_ERR__DESTROY) {
+                        /* Terminating */
+                        goto done;
+                }
+
+                /* FIXME: handle errors */
+                rd_rkb_log(rkb, LOG_WARNING, "METADATA",
+                           "Metadata request failed: %s (%dms)",
+                           rd_kafka_err2str(err),
+			   (int)(request->rkbuf_ts_sent/1000));
+	} else {
+
+                if (!topics)
+                        rd_rkb_dbg(rkb, METADATA, "METADATA",
+                                   "===== Received metadata: %s =====",
+                                   request->rkbuf_u.Metadata.reason);
+                else
+                        rd_rkb_dbg(rkb, METADATA, "METADATA",
+                                   "===== Received metadata "
+                                   "(for %d requested topics): %s =====",
+                                   rd_list_cnt(topics),
+                                   request->rkbuf_u.Metadata.reason);
+
+                md = rd_kafka_parse_Metadata(rkb, request, rkbuf);
+		if (!md) {
+			if (rd_kafka_buf_retry(rkb, request))
+				return;
+			err = RD_KAFKA_RESP_ERR__BAD_MSG;
+                }
+        }
+
+        if (rko && rko->rko_replyq.q) {
+                /* Reply to metadata requester, passing on the metadata.
+                 * Reuse requesting rko for the reply. */
+                rko->rko_err = err;
+                rko->rko_u.metadata.md = md;
+
+                rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0);
+                rko = NULL;
+        } else {
+                if (md)
+                        rd_free(md);
+        }
+
+ done:
+        if (rko)
+                rd_kafka_op_destroy(rko);
+}
+
+
+
+/**
+ * @brief Construct MetadataRequest (does not send)
+ *
+ * \p topics is a list of topic names (char *) to request.
+ *
+ * !topics          - only request brokers (if supported by broker, else
+ *                    all topics)
+ *  topics.cnt==0   - all topics in cluster are requested
+ *  topics.cnt >0   - only specified topics are requested
+ *
+ * @param reason    - metadata request reason
+ * @param rko       - (optional) rko with replyq for handling response.
+ *                    Specifying an rko forces a metadata request even if
+ *                    there is already a matching one in-transit.
+ *
+ * If full metadata for all topics is requested (or all brokers, which
+ * results in all-topics on older brokers) and there is already a full request
+ * in transit then this function will return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS
+ * otherwise RD_KAFKA_RESP_ERR_NO_ERROR. If \p rko is non-NULL the request
+ * is sent regardless.
+ */
+rd_kafka_resp_err_t
+rd_kafka_MetadataRequest (rd_kafka_broker_t *rkb,
+                          const rd_list_t *topics, const char *reason,
+                          rd_kafka_op_t *rko) {
+        rd_kafka_buf_t *rkbuf;
+        int16_t ApiVersion = 0;
+        int features;
+        int topic_cnt = topics ? rd_list_cnt(topics) : 0;
+        int *full_incr = NULL;
+
+        ApiVersion = rd_kafka_broker_ApiVersion_supported(rkb,
+                                                          RD_KAFKAP_Metadata,
+                                                          0, 2,
+                                                          &features);
+
+        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_Metadata, 1,
+                                         4 + (50 * topic_cnt));
+
+        if (!reason)
+                reason = "";
+
+        rkbuf->rkbuf_u.Metadata.reason = rd_strdup(reason);
+
+        if (!topics && ApiVersion >= 1) {
+                /* a null(0) array (in the protocol) represents no topics */
+                rd_kafka_buf_write_i32(rkbuf, 0);
+                rd_rkb_dbg(rkb, METADATA, "METADATA",
+                           "Request metadata for brokers only: %s", reason);
+                full_incr = &rkb->rkb_rk->rk_metadata_cache.
+                        rkmc_full_brokers_sent;
+
+        } else {
+                if (topic_cnt == 0 && !rko)
+                        full_incr = &rkb->rkb_rk->rk_metadata_cache.
+                                rkmc_full_topics_sent;
+
+                if (topic_cnt == 0 && ApiVersion >= 1)
+                        rd_kafka_buf_write_i32(rkbuf, -1); /* Null: all topics*/
+                else
+                        rd_kafka_buf_write_i32(rkbuf, topic_cnt);
+
+                if (topic_cnt == 0) {
+                        rkbuf->rkbuf_u.Metadata.all_topics = 1;
+                        rd_rkb_dbg(rkb, METADATA, "METADATA",
+                                   "Request metadata for all topics: "
+                                   "%s", reason);
+                } else
+                        rd_rkb_dbg(rkb, METADATA, "METADATA",
+                                   "Request metadata for %d topic(s): "
+                                   "%s", topic_cnt, reason);
+        }
+
+        if (full_incr) {
+                /* Avoid multiple outstanding full requests
+                 * (since they are redundant and side-effect-less).
+                 * Forced requests (app using metadata() API) are passed
+                 * through regardless. */
+
+                mtx_lock(&rkb->rkb_rk->rk_metadata_cache.
+                         rkmc_full_lock);
+                if (*full_incr > 0 && (!rko || !rko->rko_u.metadata.force)) {
+                        mtx_unlock(&rkb->rkb_rk->rk_metadata_cache.
+                                   rkmc_full_lock);
+                        rd_rkb_dbg(rkb, METADATA, "METADATA",
+                                   "Skipping metadata request: %s: "
+                                   "full request already in-transit",
+                                   reason);
+                        rd_kafka_buf_destroy(rkbuf);
+                        return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
+                }
+
+                (*full_incr)++;
+                mtx_unlock(&rkb->rkb_rk->rk_metadata_cache.
+                           rkmc_full_lock);
+                rkbuf->rkbuf_u.Metadata.decr = full_incr;
+                rkbuf->rkbuf_u.Metadata.decr_lock = &rkb->rkb_rk->
+                        rk_metadata_cache.rkmc_full_lock;
+        }
+
+
+        if (topic_cnt > 0) {
+                char *topic;
+                int i;
+
+                /* Maintain a copy of the topics list so we can purge
+                 * hints from the metadata cache on error. */
+                rkbuf->rkbuf_u.Metadata.topics =
+                        rd_list_copy(topics, rd_list_string_copy, NULL);
+
+                RD_LIST_FOREACH(topic, topics, i)
+                        rd_kafka_buf_write_str(rkbuf, topic, -1);
+
+        }
+
+        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+        /* Metadata requests are part of the important control plane
+         * and should go before other requests (Produce, Fetch, etc). */
+        rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_FLASH;
+
+        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf,
+                                       /* Handle response thru rk_ops,
+                                        * but forward parsed result to
+                                        * rko's replyq when done. */
+                                       RD_KAFKA_REPLYQ(rkb->rkb_rk->
+                                                       rk_ops, 0),
+                                       rd_kafka_handle_Metadata, rko);
+
+        return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+
+
+
+
+
+
+
+/**
+ * @brief Parses and handles ApiVersion reply.
+ *
+ * @param apis will be allocated, populated and sorted
+ *             with broker's supported APIs.
+ * @param api_cnt will be set to the number of elements in \p *apis
+
+ * @returns 0 on success, else an error.
+ */
+rd_kafka_resp_err_t
+rd_kafka_handle_ApiVersion (rd_kafka_t *rk,
+			    rd_kafka_broker_t *rkb,
+			    rd_kafka_resp_err_t err,
+			    rd_kafka_buf_t *rkbuf,
+			    rd_kafka_buf_t *request,
+			    struct rd_kafka_ApiVersion **apis,
+			    size_t *api_cnt) {
+        const int log_decode_errors = LOG_ERR;
+        int actions;
+	int32_t ApiArrayCnt;
+	int16_t ErrorCode;
+	int i = 0;
+
+	*apis = NULL;
+
+        if (err)
+                goto err;
+
+	rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
+	if ((err = ErrorCode))
+		goto err;
+
+        rd_kafka_buf_read_i32(rkbuf, &ApiArrayCnt);
+	if (ApiArrayCnt > 1000)
+		rd_kafka_buf_parse_fail(rkbuf,
+					"ApiArrayCnt %"PRId32" out of range",
+					ApiArrayCnt);
+
+	rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
+		   "Broker API support:");
+
+	*apis = malloc(sizeof(**apis) * ApiArrayCnt);
+
+	for (i = 0 ; i < ApiArrayCnt ; i++) {
+		struct rd_kafka_ApiVersion *api = &(*apis)[i];
+
+		rd_kafka_buf_read_i16(rkbuf, &api->ApiKey);
+		rd_kafka_buf_read_i16(rkbuf, &api->MinVer);
+		rd_kafka_buf_read_i16(rkbuf, &api->MaxVer);
+
+		rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
+			   "  ApiKey %s (%hd) Versions %hd..%hd",
+			   rd_kafka_ApiKey2str(api->ApiKey),
+			   api->ApiKey, api->MinVer, api->MaxVer);
+        }
+
+	*api_cnt = ApiArrayCnt;
+        qsort(*apis, *api_cnt, sizeof(**apis), rd_kafka_ApiVersion_key_cmp);
+
+	goto done;
+
+ err_parse:
+        err = rkbuf->rkbuf_err;
+ err:
+	if (*apis)
+		rd_free(*apis);
+
+        actions = rd_kafka_err_action(
+		rkb, err, rkbuf, request,
+		RD_KAFKA_ERR_ACTION_END);
+
+	if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
+		if (rd_kafka_buf_retry(rkb, request))
+			return RD_KAFKA_RESP_ERR__IN_PROGRESS;
+		/* FALLTHRU */
+	}
+
+done:
+        return err;
+}
+
+
+
+/**
+ * Send ApiVersionRequest (KIP-35)
+ */
+void rd_kafka_ApiVersionRequest (rd_kafka_broker_t *rkb,
+				 rd_kafka_replyq_t replyq,
+				 rd_kafka_resp_cb_t *resp_cb,
+				 void *opaque, int flash_msg) {
+        rd_kafka_buf_t *rkbuf;
+
+        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_ApiVersion, 1, 4);
+	rkbuf->rkbuf_flags |= (flash_msg ? RD_KAFKA_OP_F_FLASH : 0);
+	rd_kafka_buf_write_i32(rkbuf, 0); /* Empty array: request all APIs */
+
+	/* Non-supporting brokers will tear down the connection when they
+	 * receive an unknown API request, so dont retry request on failure. */
+	rkbuf->rkbuf_retries = RD_KAFKA_BUF_NO_RETRIES;
+
+	/* 0.9.0.x brokers will not close the connection on unsupported
+	 * API requests, so we minimize the timeout for the request.
+	 * This is a regression on the broker part. */
+	rkbuf->rkbuf_ts_timeout = rd_clock() + (rkb->rkb_rk->rk_conf.api_version_request_timeout_ms * 1000);
+
+        if (replyq.q)
+                rd_kafka_broker_buf_enq_replyq(rkb,
+                                               rkbuf, replyq, resp_cb, opaque);
+	else /* in broker thread */
+		rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque);
+}
+
+
+/**
+ * Send SaslHandshakeRequest (KIP-43)
+ */
+void rd_kafka_SaslHandshakeRequest (rd_kafka_broker_t *rkb,
+				    const char *mechanism,
+				    rd_kafka_replyq_t replyq,
+				    rd_kafka_resp_cb_t *resp_cb,
+				    void *opaque, int flash_msg) {
+        rd_kafka_buf_t *rkbuf;
+	int mechlen = (int)strlen(mechanism);
+
+        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SaslHandshake,
+                                         1, RD_KAFKAP_STR_SIZE0(mechlen));
+	rkbuf->rkbuf_flags |= (flash_msg ? RD_KAFKA_OP_F_FLASH : 0);
+	rd_kafka_buf_write_str(rkbuf, mechanism, mechlen);
+
+	/* Non-supporting brokers will tear down the conneciton when they
+	 * receive an unknown API request or where the SASL GSSAPI
+	 * token type is not recognized, so dont retry request on failure. */
+	rkbuf->rkbuf_retries = RD_KAFKA_BUF_NO_RETRIES;
+
+	/* 0.9.0.x brokers will not close the connection on unsupported
+	 * API requests, so we minimize the timeout of the request.
+	 * This is a regression on the broker part. */
+	if (!rkb->rkb_rk->rk_conf.api_version_request &&
+            rkb->rkb_rk->rk_conf.socket_timeout_ms > 10*1000)
+		rkbuf->rkbuf_ts_timeout = rd_clock() + (10 * 1000 * 1000);
+
+	if (replyq.q)
+		rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq,
+                                               resp_cb, opaque);
+	else /* in broker thread */
+		rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque);
+}
+
+
+
+
+/**
+ * @brief Parses a Produce reply.
+ * @returns 0 on success or an error code on failure.
+ * @locality broker thread
+ */
+static rd_kafka_resp_err_t
+rd_kafka_handle_Produce_parse (rd_kafka_broker_t *rkb,
+                               rd_kafka_toppar_t *rktp,
+                               rd_kafka_buf_t *rkbuf,
+                               rd_kafka_buf_t *request,
+                               int64_t *offsetp,
+                               int64_t *timestampp) {
+        int32_t TopicArrayCnt;
+        int32_t PartitionArrayCnt;
+        struct {
+                int32_t Partition;
+                int16_t ErrorCode;
+                int64_t Offset;
+        } hdr;
+        const int log_decode_errors = LOG_ERR;
+
+        rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
+        if (TopicArrayCnt != 1)
+                goto err;
+
+        /* Since we only produce to one single topic+partition in each
+         * request we assume that the reply only contains one topic+partition
+         * and that it is the same that we requested.
+         * If not the broker is buggy. */
+        rd_kafka_buf_skip_str(rkbuf);
+        rd_kafka_buf_read_i32(rkbuf, &PartitionArrayCnt);
+
+        if (PartitionArrayCnt != 1)
+                goto err;
+
+        rd_kafka_buf_read_i32(rkbuf, &hdr.Partition);
+        rd_kafka_buf_read_i16(rkbuf, &hdr.ErrorCode);
+        rd_kafka_buf_read_i64(rkbuf, &hdr.Offset);
+
+        *offsetp = hdr.Offset;
+
+        *timestampp = -1;
+        if (request->rkbuf_reqhdr.ApiVersion >= 2) {
+                rd_kafka_buf_read_i64(rkbuf, timestampp);
+        }
+
+        if (request->rkbuf_reqhdr.ApiVersion >= 1) {
+                int32_t Throttle_Time;
+                rd_kafka_buf_read_i32(rkbuf, &Throttle_Time);
+
+                rd_kafka_op_throttle_time(rkb, rkb->rkb_rk->rk_rep,
+                                          Throttle_Time);
+        }
+
+
+        return hdr.ErrorCode;
+
+ err_parse:
+        return rkbuf->rkbuf_err;
+ err:
+        return RD_KAFKA_RESP_ERR__BAD_MSG;
+}
+
+
+/**
+ * @brief Handle ProduceResponse
+ *
+ * @locality broker thread
+ */
+static void rd_kafka_handle_Produce (rd_kafka_t *rk,
+                                     rd_kafka_broker_t *rkb,
+                                     rd_kafka_resp_err_t err,
+                                     rd_kafka_buf_t *reply,
+                                     rd_kafka_buf_t *request,
+                                     void *opaque) {
+        shptr_rd_kafka_toppar_t *s_rktp = opaque; /* from ProduceRequest() */
+        rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
+        int64_t offset = RD_KAFKA_OFFSET_INVALID;
+        int64_t timestamp = -1;
+
+        /* Parse Produce reply (unless the request errored) */
+        if (!err && reply)
+                err = rd_kafka_handle_Produce_parse(rkb, rktp,
+                                                    reply, request,
+                                                    &offset, &timestamp);
+
+
+        if (likely(!err)) {
+                rd_rkb_dbg(rkb, MSG, "MSGSET",
+                           "%s [%"PRId32"]: MessageSet with %i message(s) "
+                           "delivered",
+                           rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
+                           rd_atomic32_get(&request->rkbuf_msgq.rkmq_msg_cnt));
+
+        } else {
+                /* Error */
+                int actions;
+
+                actions = rd_kafka_err_action(
+                        rkb, err, reply, request,
+
+                        RD_KAFKA_ERR_ACTION_REFRESH,
+                        RD_KAFKA_RESP_ERR__TRANSPORT,
+
+                        RD_KAFKA_ERR_ACTION_REFRESH,
+                        RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
+
+                        RD_KAFKA_ERR_ACTION_END);
+
+                rd_rkb_dbg(rkb, MSG, "MSGSET",
+                           "%s [%"PRId32"]: MessageSet with %i message(s) "
+                           "encountered error: %s (actions 0x%x)",
+                           rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
+                           rd_atomic32_get(&request->rkbuf_msgq.rkmq_msg_cnt),
+                           rd_kafka_err2str(err), actions);
+
+                /* NOTE: REFRESH implies a later retry, which does NOT affect
+                 *       the retry count since refresh-errors are considered
+                 *       to be stale metadata rather than temporary errors.
+                 *
+                 *       This is somewhat problematic since it may cause
+                 *       duplicate messages even with retries=0 if the
+                 *       ProduceRequest made it to the broker but only the
+                 *       response was lost due to network connectivity issues.
+                 *       That problem will be sorted when EoS is implemented.
+                 */
+                if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
+                        /* Request metadata information update */
+                        rd_kafka_toppar_leader_unavailable(rktp,
+                                                           "produce", err);
+
+                        /* Move messages (in the rkbuf) back to the partition's
+                         * queue head. They will be resent when a new leader
+                         * is delegated. */
+                        rd_kafka_toppar_insert_msgq(rktp, &request->rkbuf_msgq);
+
+                        /* No need for fallthru here since the request
+                         * no longer has any messages associated with it. */
+                        goto done;
+                }
+
+                if ((actions & RD_KAFKA_ERR_ACTION_RETRY) &&
+                    rd_kafka_buf_retry(rkb, request))
+                        return; /* Scheduled for retry */
+
+                /* Refresh implies a later retry through other means */
+                if (actions & RD_KAFKA_ERR_ACTION_REFRESH)
+                        goto done;
+
+                /* Translate request-level timeout error code
+                 * to message-level timeout error code. */
+                if (err == RD_KAFKA_RESP_ERR__TIMED_OUT)
+                        err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
+
+                /* Fatal errors: no message transmission retries */
+                /* FALLTHRU */
+        }
+
+        /* Propagate assigned offset and timestamp back to app. */
+        if (likely(offset != RD_KAFKA_OFFSET_INVALID)) {
+                rd_kafka_msg_t *rkm;
+                if (rktp->rktp_rkt->rkt_conf.produce_offset_report) {
+                        /* produce.offset.report: each message */
+                        TAILQ_FOREACH(rkm, &request->rkbuf_msgq.rkmq_msgs,
+                                      rkm_link) {
+                                rkm->rkm_offset = offset++;
+                                if (timestamp != -1) {
+                                        rkm->rkm_timestamp = timestamp;
+                                        rkm->rkm_tstype = RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME;
+                                }
+                        }
+                } else {
+                        /* Last message in each batch */
+                        rkm = TAILQ_LAST(&request->rkbuf_msgq.rkmq_msgs,
+                                         rd_kafka_msg_head_s);
+                        rkm->rkm_offset = offset +
+                                rd_atomic32_get(&request->rkbuf_msgq.
+                                                rkmq_msg_cnt) - 1;
+                        if (timestamp != -1) {
+                                rkm->rkm_timestamp = timestamp;
+                                rkm->rkm_tstype = RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME;
+                        }
+                }
+        }
+
+        /* Enqueue messages for delivery report */
+        rd_kafka_dr_msgq(rktp->rktp_rkt, &request->rkbuf_msgq, err);
+
+ done:
+        rd_kafka_toppar_destroy(s_rktp); /* from ProduceRequest() */
+}
+
+
+/**
+ * @brief Send ProduceRequest for messages in toppar queue.
+ *
+ * @returns the number of messages included, or 0 on error / no messages.
+ *
+ * @locality broker thread
+ */
+int rd_kafka_ProduceRequest (rd_kafka_broker_t *rkb, rd_kafka_toppar_t *rktp) {
+        rd_kafka_buf_t *rkbuf;
+        rd_kafka_itopic_t *rkt = rktp->rktp_rkt;
+        size_t MessageSetSize = 0;
+        int cnt;
+
+        /**
+         * Create ProduceRequest with as many messages from the toppar
+         * transmit queue as possible.
+         */
+        rkbuf = rd_kafka_msgset_create_ProduceRequest(rkb, rktp,
+                                                      &MessageSetSize);
+        if (unlikely(!rkbuf))
+                return 0;
+
+        cnt = rd_atomic32_get(&rkbuf->rkbuf_msgq.rkmq_msg_cnt);
+        rd_dassert(cnt > 0);
+
+        rd_atomic64_add(&rktp->rktp_c.tx_msgs, cnt);
+        rd_atomic64_add(&rktp->rktp_c.tx_bytes, MessageSetSize);
+
+        if (!rkt->rkt_conf.required_acks)
+                rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_NO_RESPONSE;
+
+        /* Use timeout from first message. */
+        rkbuf->rkbuf_ts_timeout =
+                TAILQ_FIRST(&rkbuf->rkbuf_msgq.rkmq_msgs)->rkm_ts_timeout;
+
+        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf,
+                                       RD_KAFKA_NO_REPLYQ,
+                                       rd_kafka_handle_Produce,
+                                       /* toppar ref for handle_Produce() */
+                                       rd_kafka_toppar_keep(rktp));
+
+        return cnt;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_request.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_request.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_request.h
new file mode 100644
index 0000000..a6a11e5
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_request.h
@@ -0,0 +1,196 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met: 
+ * 
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer. 
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution. 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#pragma once
+
+#include "rdkafka_cgrp.h"
+#include "rdkafka_feature.h"
+
+
+#define RD_KAFKA_ERR_ACTION_PERMANENT  0x1 /* Permanent error */
+#define RD_KAFKA_ERR_ACTION_IGNORE     0x2 /* Error can be ignored */
+#define RD_KAFKA_ERR_ACTION_REFRESH    0x4 /* Refresh state (e.g., metadata) */
+#define RD_KAFKA_ERR_ACTION_RETRY      0x8 /* Retry request after backoff */
+#define RD_KAFKA_ERR_ACTION_INFORM    0x10 /* Inform application about err */
+#define RD_KAFKA_ERR_ACTION_SPECIAL   0x20 /* Special-purpose, depends on context */
+#define RD_KAFKA_ERR_ACTION_END          0 /* var-arg sentinel */
+
+int rd_kafka_err_action (rd_kafka_broker_t *rkb,
+			 rd_kafka_resp_err_t err,
+			 rd_kafka_buf_t *rkbuf,
+			 rd_kafka_buf_t *request, ...);
+
+
+void rd_kafka_GroupCoordinatorRequest (rd_kafka_broker_t *rkb,
+                                       const rd_kafkap_str_t *cgrp,
+                                       rd_kafka_replyq_t replyq,
+                                       rd_kafka_resp_cb_t *resp_cb,
+                                       void *opaque);
+
+rd_kafka_resp_err_t rd_kafka_handle_Offset (rd_kafka_t *rk,
+					    rd_kafka_broker_t *rkb,
+					    rd_kafka_resp_err_t err,
+					    rd_kafka_buf_t *rkbuf,
+					    rd_kafka_buf_t *request,
+                                            rd_kafka_topic_partition_list_t
+                                            *offsets);
+
+void rd_kafka_OffsetRequest (rd_kafka_broker_t *rkb,
+                             rd_kafka_topic_partition_list_t *offsets,
+                             int16_t api_version,
+                             rd_kafka_replyq_t replyq,
+                             rd_kafka_resp_cb_t *resp_cb,
+                             void *opaque);
+
+rd_kafka_resp_err_t
+rd_kafka_handle_OffsetFetch (rd_kafka_t *rk,
+			     rd_kafka_broker_t *rkb,
+			     rd_kafka_resp_err_t err,
+			     rd_kafka_buf_t *rkbuf,
+			     rd_kafka_buf_t *request,
+			     rd_kafka_topic_partition_list_t *offsets,
+			     int update_toppar);
+
+void rd_kafka_op_handle_OffsetFetch (rd_kafka_t *rk,
+				     rd_kafka_broker_t *rkb,
+                                     rd_kafka_resp_err_t err,
+                                     rd_kafka_buf_t *rkbuf,
+                                     rd_kafka_buf_t *request,
+                                     void *opaque);
+
+void rd_kafka_OffsetFetchRequest (rd_kafka_broker_t *rkb,
+                                  int16_t api_version,
+                                  rd_kafka_topic_partition_list_t *parts,
+                                  rd_kafka_replyq_t replyq,
+                                  rd_kafka_resp_cb_t *resp_cb,
+                                  void *opaque);
+
+
+
+rd_kafka_resp_err_t
+rd_kafka_handle_OffsetCommit (rd_kafka_t *rk,
+			      rd_kafka_broker_t *rkb,
+			      rd_kafka_resp_err_t err,
+			      rd_kafka_buf_t *rkbuf,
+			      rd_kafka_buf_t *request,
+			      rd_kafka_topic_partition_list_t *offsets);
+int rd_kafka_OffsetCommitRequest (rd_kafka_broker_t *rkb,
+				  rd_kafka_cgrp_t *rkcg,
+				  int16_t api_version,
+				  rd_kafka_topic_partition_list_t *offsets,
+				  rd_kafka_replyq_t replyq,
+				  rd_kafka_resp_cb_t *resp_cb,
+				  void *opaque, const char *reason);
+
+
+
+void rd_kafka_JoinGroupRequest (rd_kafka_broker_t *rkb,
+                                const rd_kafkap_str_t *group_id,
+                                const rd_kafkap_str_t *member_id,
+                                const rd_kafkap_str_t *protocol_type,
+				const rd_list_t *topics,
+                                rd_kafka_replyq_t replyq,
+                                rd_kafka_resp_cb_t *resp_cb,
+                                void *opaque);
+
+
+void rd_kafka_LeaveGroupRequest (rd_kafka_broker_t *rkb,
+                                 const rd_kafkap_str_t *group_id,
+                                 const rd_kafkap_str_t *member_id,
+                                 rd_kafka_replyq_t replyq,
+                                 rd_kafka_resp_cb_t *resp_cb,
+                                 void *opaque);
+void rd_kafka_handle_LeaveGroup (rd_kafka_t *rk,
+				 rd_kafka_broker_t *rkb,
+                                 rd_kafka_resp_err_t err,
+                                 rd_kafka_buf_t *rkbuf,
+                                 rd_kafka_buf_t *request,
+                                 void *opaque);
+
+void rd_kafka_SyncGroupRequest (rd_kafka_broker_t *rkb,
+                                const rd_kafkap_str_t *group_id,
+                                int32_t generation_id,
+                                const rd_kafkap_str_t *member_id,
+                                const rd_kafka_group_member_t
+                                *assignments,
+                                int assignment_cnt,
+                                rd_kafka_replyq_t replyq,
+                                rd_kafka_resp_cb_t *resp_cb,
+                                void *opaque);
+void rd_kafka_handle_SyncGroup (rd_kafka_t *rk,
+				rd_kafka_broker_t *rkb,
+                                rd_kafka_resp_err_t err,
+                                rd_kafka_buf_t *rkbuf,
+                                rd_kafka_buf_t *request,
+                                void *opaque);
+
+void rd_kafka_ListGroupsRequest (rd_kafka_broker_t *rkb,
+                                 rd_kafka_replyq_t replyq,
+                                 rd_kafka_resp_cb_t *resp_cb,
+                                 void *opaque);
+
+void rd_kafka_DescribeGroupsRequest (rd_kafka_broker_t *rkb,
+                                     const char **groups, int group_cnt,
+                                     rd_kafka_replyq_t replyq,
+                                     rd_kafka_resp_cb_t *resp_cb,
+                                     void *opaque);
+
+
+void rd_kafka_HeartbeatRequest (rd_kafka_broker_t *rkb,
+                                const rd_kafkap_str_t *group_id,
+                                int32_t generation_id,
+                                const rd_kafkap_str_t *member_id,
+                                rd_kafka_replyq_t replyq,
+                                rd_kafka_resp_cb_t *resp_cb,
+                                void *opaque);
+
+rd_kafka_resp_err_t
+rd_kafka_MetadataRequest (rd_kafka_broker_t *rkb,
+                          const rd_list_t *topics, const char *reason,
+                          rd_kafka_op_t *rko);
+
+rd_kafka_resp_err_t
+rd_kafka_handle_ApiVersion (rd_kafka_t *rk,
+			    rd_kafka_broker_t *rkb,
+			    rd_kafka_resp_err_t err,
+			    rd_kafka_buf_t *rkbuf,
+			    rd_kafka_buf_t *request,
+			    struct rd_kafka_ApiVersion **apis,
+			    size_t *api_cnt);
+void rd_kafka_ApiVersionRequest (rd_kafka_broker_t *rkb,
+				 rd_kafka_replyq_t replyq,
+				 rd_kafka_resp_cb_t *resp_cb,
+				 void *opaque, int flash_msg);
+
+void rd_kafka_SaslHandshakeRequest (rd_kafka_broker_t *rkb,
+				    const char *mechanism,
+				    rd_kafka_replyq_t replyq,
+				    rd_kafka_resp_cb_t *resp_cb,
+				    void *opaque, int flash_msg);
+
+int rd_kafka_ProduceRequest (rd_kafka_broker_t *rkb, rd_kafka_toppar_t *rktp);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_roundrobin_assignor.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_roundrobin_assignor.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_roundrobin_assignor.c
new file mode 100644
index 0000000..0482f88
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_roundrobin_assignor.c
@@ -0,0 +1,114 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2015 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+#include "rdkafka_int.h"
+#include "rdkafka_assignor.h"
+
+
+/**
+ * Source: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
+ *
+ * The roundrobin assignor lays out all the available partitions and all the
+ * available consumers. It then proceeds to do a roundrobin assignment from
+ * partition to consumer. If the subscriptions of all consumer instances are
+ * identical, then the partitions will be uniformly distributed. (i.e., the 
+ * partition ownership counts will be within a delta of exactly one across all
+ * consumers.)
+ *
+ * For example, suppose there are two consumers C0 and C1, two topics t0 and
+ * t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1,
+ * t0p2, t1p0, t1p1, and t1p2.
+ *
+ * The assignment will be:
+ * C0: [t0p0, t0p2, t1p1]
+ * C1: [t0p1, t1p0, t1p2]
+ */
+
+rd_kafka_resp_err_t
+rd_kafka_roundrobin_assignor_assign_cb (rd_kafka_t *rk,
+					const char *member_id,
+					const char *protocol_name,
+					const rd_kafka_metadata_t *metadata,
+					rd_kafka_group_member_t *members,
+					size_t member_cnt,
+					rd_kafka_assignor_topic_t
+					**eligible_topics,
+					size_t eligible_topic_cnt,
+					char *errstr, size_t errstr_size,
+					void *opaque) {
+        unsigned int ti;
+	int next = 0; /* Next member id */
+
+	/* Sort topics by name */
+	qsort(eligible_topics, eligible_topic_cnt, sizeof(*eligible_topics),
+	      rd_kafka_assignor_topic_cmp);
+
+	/* Sort members by name */
+	qsort(members, member_cnt, sizeof(*members),
+	      rd_kafka_group_member_cmp);
+
+        for (ti = 0 ; ti < eligible_topic_cnt ; ti++) {
+                rd_kafka_assignor_topic_t *eligible_topic = eligible_topics[ti];
+		int partition;
+
+		/* For each topic+partition, assign one member (in a cyclic
+		 * iteration) per partition until the partitions are exhausted*/
+		for (partition = 0 ;
+		     partition < eligible_topic->metadata->partition_cnt ;
+		     partition++) {
+			rd_kafka_group_member_t *rkgm;
+
+			/* Scan through members until we find one with a
+			 * subscription to this topic. */
+			while (!rd_kafka_group_member_find_subscription(
+				       rk, &members[next],
+				       eligible_topic->metadata->topic))
+				next++;
+
+			rkgm = &members[next];
+
+			rd_kafka_dbg(rk, CGRP, "ASSIGN",
+				     "roundrobin: Member \"%s\": "
+				     "assigned topic %s partition %d",
+				     rkgm->rkgm_member_id->str,
+				     eligible_topic->metadata->topic,
+				     partition);
+
+			rd_kafka_topic_partition_list_add(
+				rkgm->rkgm_assignment,
+				eligible_topic->metadata->topic, partition);
+
+			next = (next+1) % rd_list_cnt(&eligible_topic->members);
+		}
+	}
+
+
+        return 0;
+}
+
+
+