You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bq...@apache.org on 2017/11/16 21:15:39 UTC
[09/42] nifi-minifi-cpp git commit: MINIFICPP-274: PutKafka Processor
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_request.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_request.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_request.c
new file mode 100644
index 0000000..2d023b4
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_request.c
@@ -0,0 +1,1848 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdarg.h>
+
+#include "rdkafka_int.h"
+#include "rdkafka_request.h"
+#include "rdkafka_broker.h"
+#include "rdkafka_offset.h"
+#include "rdkafka_topic.h"
+#include "rdkafka_partition.h"
+#include "rdkafka_metadata.h"
+#include "rdkafka_msgset.h"
+
+#include "rdrand.h"
+
+/**
+ * Kafka protocol request and response handling.
+ * All of this code runs in the broker thread and uses op queues for
+ * propagating results back to the various sub-systems operating in
+ * other threads.
+ */
+
+
+/**
+ * @brief Decide action(s) to take based on the returned error code.
+ *
+ * The optional var-args is a .._ACTION_END terminated list
+ * of action,error tuples which overrides the general behaviour.
+ * It is to be read as: for \p error, return \p action(s).
+ */
+int rd_kafka_err_action (rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request, ...) {
+ va_list ap;
+ int actions = 0;
+ int exp_act;
+
+ /* Match explicitly defined error mappings first. */
+ va_start(ap, request);
+ while ((exp_act = va_arg(ap, int))) {
+ int exp_err = va_arg(ap, int);
+
+ if (err == exp_err)
+ actions |= exp_act;
+ }
+ va_end(ap);
+
+ if (err && rkb && request)
+ rd_rkb_dbg(rkb, BROKER, "REQERR",
+ "%sRequest failed: %s: explicit actions 0x%x",
+ rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey),
+ rd_kafka_err2str(err), actions);
+
+ /* Explicit error match. */
+ if (actions)
+ return actions;
+
+ /* Default error matching */
+ switch (err)
+ {
+ case RD_KAFKA_RESP_ERR_NO_ERROR:
+ break;
+ case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE:
+ case RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION:
+ case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE:
+ case RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE:
+ case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE:
+ case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP:
+ case RD_KAFKA_RESP_ERR__WAIT_COORD:
+ /* Request metadata information update */
+ actions |= RD_KAFKA_ERR_ACTION_REFRESH;
+ break;
+ case RD_KAFKA_RESP_ERR__TIMED_OUT:
+ case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT:
+ /* Broker-side request handling timeout */
+ case RD_KAFKA_RESP_ERR__TRANSPORT:
+ /* Broker connection down */
+ actions |= RD_KAFKA_ERR_ACTION_RETRY;
+ break;
+ case RD_KAFKA_RESP_ERR__DESTROY:
+ case RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT:
+ case RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE:
+ default:
+ actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
+ break;
+ }
+
+ return actions;
+}
+
+
+/**
+ * Send GroupCoordinatorRequest
+ */
+void rd_kafka_GroupCoordinatorRequest (rd_kafka_broker_t *rkb,
+ const rd_kafkap_str_t *cgrp,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+
+ rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_GroupCoordinator, 1,
+ RD_KAFKAP_STR_SIZE(cgrp));
+ rd_kafka_buf_write_kstr(rkbuf, cgrp);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+}
+
+
+
+
+/**
+ * @brief Parses and handles Offset replies.
+ *
+ * Returns the parsed offsets (and errors) in \p offsets
+ *
+ * @returns 0 on success, else an error.
+ */
+rd_kafka_resp_err_t rd_kafka_handle_Offset (rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ rd_kafka_topic_partition_list_t
+ *offsets) {
+
+ const int log_decode_errors = LOG_ERR;
+ int16_t ErrorCode = 0;
+ int32_t TopicArrayCnt;
+ int actions;
+ int16_t api_version;
+
+ if (err) {
+ ErrorCode = err;
+ goto err;
+ }
+
+ api_version = request->rkbuf_reqhdr.ApiVersion;
+
+ /* NOTE:
+ * Broker may return offsets in a different constellation than
+ * in the original request .*/
+
+ rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
+ while (TopicArrayCnt-- > 0) {
+ rd_kafkap_str_t ktopic;
+ int32_t PartArrayCnt;
+ char *topic_name;
+
+ rd_kafka_buf_read_str(rkbuf, &ktopic);
+ rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt);
+
+ RD_KAFKAP_STR_DUPA(&topic_name, &ktopic);
+
+ while (PartArrayCnt-- > 0) {
+ int32_t kpartition;
+ int32_t OffsetArrayCnt;
+ int64_t Offset = -1;
+ rd_kafka_topic_partition_t *rktpar;
+
+ rd_kafka_buf_read_i32(rkbuf, &kpartition);
+ rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
+
+ if (api_version == 1) {
+ int64_t Timestamp;
+ rd_kafka_buf_read_i64(rkbuf, &Timestamp);
+ rd_kafka_buf_read_i64(rkbuf, &Offset);
+ } else if (api_version == 0) {
+ rd_kafka_buf_read_i32(rkbuf, &OffsetArrayCnt);
+ /* We only request one offset so just grab
+ * the first one. */
+ while (OffsetArrayCnt-- > 0)
+ rd_kafka_buf_read_i64(rkbuf, &Offset);
+ } else {
+ rd_kafka_assert(NULL, !*"NOTREACHED");
+ }
+
+ rktpar = rd_kafka_topic_partition_list_add(
+ offsets, topic_name, kpartition);
+ rktpar->err = ErrorCode;
+ rktpar->offset = Offset;
+ }
+ }
+
+ goto done;
+
+ err_parse:
+ ErrorCode = rkbuf->rkbuf_err;
+ err:
+ actions = rd_kafka_err_action(
+ rkb, ErrorCode, rkbuf, request,
+ RD_KAFKA_ERR_ACTION_PERMANENT,
+ RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
+
+ RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY,
+ RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
+
+ RD_KAFKA_ERR_ACTION_END);
+
+ if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
+ char tmp[256];
+ /* Re-query for leader */
+ rd_snprintf(tmp, sizeof(tmp),
+ "OffsetRequest failed: %s",
+ rd_kafka_err2str(ErrorCode));
+ rd_kafka_metadata_refresh_known_topics(rk, NULL, 1/*force*/,
+ tmp);
+ }
+
+ if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
+ if (rd_kafka_buf_retry(rkb, request))
+ return RD_KAFKA_RESP_ERR__IN_PROGRESS;
+ /* FALLTHRU */
+ }
+
+done:
+ return ErrorCode;
+}
+
+
+
+
+
+
+/**
+ * Send OffsetRequest for toppar 'rktp'.
+ */
+void rd_kafka_OffsetRequest (rd_kafka_broker_t *rkb,
+ rd_kafka_topic_partition_list_t *partitions,
+ int16_t api_version,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int i;
+ size_t of_TopicArrayCnt = 0, of_PartArrayCnt = 0;
+ const char *last_topic = "";
+ int32_t topic_cnt = 0, part_cnt = 0;
+
+ rd_kafka_topic_partition_list_sort_by_topic(partitions);
+
+ rkbuf = rd_kafka_buf_new_request(
+ rkb, RD_KAFKAP_Offset, 1,
+ /* ReplicaId+TopicArrayCnt+Topic */
+ 4+4+100+
+ /* PartArrayCnt */
+ 4 +
+ /* partition_cnt * Partition+Time+MaxNumOffs */
+ (partitions->cnt * (4+8+4)));
+
+ /* ReplicaId */
+ rd_kafka_buf_write_i32(rkbuf, -1);
+ /* TopicArrayCnt */
+ of_TopicArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0); /* updated later */
+
+ for (i = 0 ; i < partitions->cnt ; i++) {
+ const rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];
+
+ if (strcmp(rktpar->topic, last_topic)) {
+ /* Finish last topic, if any. */
+ if (of_PartArrayCnt > 0)
+ rd_kafka_buf_update_i32(rkbuf,
+ of_PartArrayCnt,
+ part_cnt);
+
+ /* Topic */
+ rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
+ topic_cnt++;
+ last_topic = rktpar->topic;
+ /* New topic so reset partition count */
+ part_cnt = 0;
+
+ /* PartitionArrayCnt: updated later */
+ of_PartArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0);
+ }
+
+ /* Partition */
+ rd_kafka_buf_write_i32(rkbuf, rktpar->partition);
+ part_cnt++;
+
+ /* Time/Offset */
+ rd_kafka_buf_write_i64(rkbuf, rktpar->offset);
+
+ if (api_version == 0) {
+ /* MaxNumberOfOffsets */
+ rd_kafka_buf_write_i32(rkbuf, 1);
+ }
+ }
+
+ if (of_PartArrayCnt > 0) {
+ rd_kafka_buf_update_i32(rkbuf, of_PartArrayCnt, part_cnt);
+ rd_kafka_buf_update_i32(rkbuf, of_TopicArrayCnt, topic_cnt);
+ }
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, api_version,
+ api_version == 1 ?
+ RD_KAFKA_FEATURE_OFFSET_TIME : 0);
+
+ rd_rkb_dbg(rkb, TOPIC, "OFFSET",
+ "OffsetRequest (v%hd, opv %d) "
+ "for %"PRId32" topic(s) and %"PRId32" partition(s)",
+ api_version, rkbuf->rkbuf_replyq.version,
+ topic_cnt, partitions->cnt);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+}
+
+
+/**
+ * Generic handler for OffsetFetch responses.
+ * Offsets for included partitions will be propagated through the passed
+ * 'offsets' list.
+ *
+ * \p update_toppar: update toppar's committed_offset
+ */
+rd_kafka_resp_err_t
+rd_kafka_handle_OffsetFetch (rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ rd_kafka_topic_partition_list_t *offsets,
+ int update_toppar) {
+ const int log_decode_errors = LOG_ERR;
+ int32_t TopicArrayCnt;
+ int64_t offset = RD_KAFKA_OFFSET_INVALID;
+ rd_kafkap_str_t metadata;
+ int i;
+ int actions;
+ int seen_cnt = 0;
+
+ if (err)
+ goto err;
+
+ /* Set default offset for all partitions. */
+ rd_kafka_topic_partition_list_set_offsets(rkb->rkb_rk, offsets, 0,
+ RD_KAFKA_OFFSET_INVALID,
+ 0 /* !is commit */);
+
+ rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
+ for (i = 0 ; i < TopicArrayCnt ; i++) {
+ rd_kafkap_str_t topic;
+ int32_t PartArrayCnt;
+ char *topic_name;
+ int j;
+
+ rd_kafka_buf_read_str(rkbuf, &topic);
+ rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt);
+
+ RD_KAFKAP_STR_DUPA(&topic_name, &topic);
+
+ for (j = 0 ; j < PartArrayCnt ; j++) {
+ int32_t partition;
+ shptr_rd_kafka_toppar_t *s_rktp;
+ rd_kafka_topic_partition_t *rktpar;
+ int16_t err2;
+
+ rd_kafka_buf_read_i32(rkbuf, &partition);
+ rd_kafka_buf_read_i64(rkbuf, &offset);
+ rd_kafka_buf_read_str(rkbuf, &metadata);
+ rd_kafka_buf_read_i16(rkbuf, &err2);
+
+ rktpar = rd_kafka_topic_partition_list_find(offsets,
+ topic_name,
+ partition);
+ if (!rktpar) {
+ rd_rkb_dbg(rkb, TOPIC, "OFFSETFETCH",
+ "OffsetFetchResponse: %s [%"PRId32"] "
+ "not found in local list: ignoring",
+ topic_name, partition);
+ continue;
+ }
+
+ seen_cnt++;
+
+ if (!(s_rktp = rktpar->_private)) {
+ s_rktp = rd_kafka_toppar_get2(rkb->rkb_rk,
+ topic_name,
+ partition, 0, 0);
+ /* May be NULL if topic is not locally known */
+ rktpar->_private = s_rktp;
+ }
+
+ /* broker reports invalid offset as -1 */
+ if (offset == -1)
+ rktpar->offset = RD_KAFKA_OFFSET_INVALID;
+ else
+ rktpar->offset = offset;
+ rktpar->err = err2;
+
+ rd_rkb_dbg(rkb, TOPIC, "OFFSETFETCH",
+ "OffsetFetchResponse: %s [%"PRId32"] offset %"PRId64,
+ topic_name, partition, offset);
+
+ if (update_toppar && !err2 && s_rktp) {
+ rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
+ /* Update toppar's committed offset */
+ rd_kafka_toppar_lock(rktp);
+ rktp->rktp_committed_offset = rktpar->offset;
+ rd_kafka_toppar_unlock(rktp);
+ }
+
+
+ if (rktpar->metadata)
+ rd_free(rktpar->metadata);
+
+ if (RD_KAFKAP_STR_IS_NULL(&metadata)) {
+ rktpar->metadata = NULL;
+ rktpar->metadata_size = 0;
+ } else {
+ rktpar->metadata = RD_KAFKAP_STR_DUP(&metadata);
+ rktpar->metadata_size =
+ RD_KAFKAP_STR_LEN(&metadata);
+ }
+ }
+ }
+
+
+err:
+ rd_rkb_dbg(rkb, TOPIC, "OFFFETCH",
+ "OffsetFetch for %d/%d partition(s) returned %s",
+ seen_cnt,
+ offsets ? offsets->cnt : -1, rd_kafka_err2str(err));
+
+ actions = rd_kafka_err_action(rkb, err, rkbuf, request,
+ RD_KAFKA_ERR_ACTION_END);
+
+ if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
+ /* Re-query for coordinator */
+ rd_kafka_cgrp_op(rkb->rkb_rk->rk_cgrp, NULL,
+ RD_KAFKA_NO_REPLYQ,
+ RD_KAFKA_OP_COORD_QUERY, err);
+ if (request) {
+ /* Schedule a retry */
+ rd_kafka_buf_keep(request);
+ rd_kafka_broker_buf_retry(request->rkbuf_rkb, request);
+ }
+ }
+
+ return err;
+
+ err_parse:
+ err = rkbuf->rkbuf_err;
+ goto err;
+}
+
+
+
+/**
+ * opaque=rko wrapper for handle_OffsetFetch.
+ * rko->rko_payload MUST be a `rd_kafka_topic_partition_list_t *` which will
+ * be filled in with fetch offsets.
+ *
+ * A reply will be sent on 'rko->rko_replyq' with type RD_KAFKA_OP_OFFSET_FETCH.
+ *
+ * Locality: cgrp's broker thread
+ */
+void rd_kafka_op_handle_OffsetFetch (rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ void *opaque) {
+ rd_kafka_op_t *rko = opaque;
+ rd_kafka_op_t *rko_reply;
+ rd_kafka_topic_partition_list_t *offsets;
+
+ RD_KAFKA_OP_TYPE_ASSERT(rko, RD_KAFKA_OP_OFFSET_FETCH);
+
+ if (err == RD_KAFKA_RESP_ERR__DESTROY) {
+ /* Termination, quick cleanup. */
+ rd_kafka_op_destroy(rko);
+ return;
+ }
+
+ offsets = rd_kafka_topic_partition_list_copy(
+ rko->rko_u.offset_fetch.partitions);
+
+ rko_reply = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH|RD_KAFKA_OP_REPLY);
+ rko_reply->rko_err = err;
+ rko_reply->rko_u.offset_fetch.partitions = offsets;
+ rko_reply->rko_u.offset_fetch.do_free = 1;
+ if (rko->rko_rktp)
+ rko_reply->rko_rktp = rd_kafka_toppar_keep(
+ rd_kafka_toppar_s2i(rko->rko_rktp));
+
+ /* If all partitions already had usable offsets then there
+ * was no request sent and thus no reply, the offsets list is
+ * good to go. */
+ if (rkbuf)
+ rd_kafka_handle_OffsetFetch(rkb->rkb_rk, rkb, err, rkbuf,
+ request, offsets, 0);
+
+ rd_kafka_replyq_enq(&rko->rko_replyq, rko_reply, 0);
+
+ rd_kafka_op_destroy(rko);
+}
+
+
+
+
+
+
+/**
+ * Send OffsetFetchRequest for toppar.
+ *
+ * Any partition with a usable offset will be ignored, if all partitions
+ * have usable offsets then no request is sent at all but an empty
+ * reply is enqueued on the replyq.
+ */
+void rd_kafka_OffsetFetchRequest (rd_kafka_broker_t *rkb,
+ int16_t api_version,
+ rd_kafka_topic_partition_list_t *parts,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ size_t of_TopicCnt;
+ int TopicCnt = 0;
+ ssize_t of_PartCnt = -1;
+ const char *last_topic = NULL;
+ int PartCnt = 0;
+ int tot_PartCnt = 0;
+ int i;
+
+ rkbuf = rd_kafka_buf_new_request(
+ rkb, RD_KAFKAP_OffsetFetch, 1,
+ RD_KAFKAP_STR_SIZE(rkb->rkb_rk->rk_group_id) +
+ 4 +
+ (parts->cnt * 32));
+
+
+ /* ConsumerGroup */
+ rd_kafka_buf_write_kstr(rkbuf, rkb->rkb_rk->rk_group_id);
+
+ /* Sort partitions by topic */
+ rd_kafka_topic_partition_list_sort_by_topic(parts);
+
+ /* TopicArrayCnt */
+ of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0); /* Updated later */
+
+ for (i = 0 ; i < parts->cnt ; i++) {
+ rd_kafka_topic_partition_t *rktpar = &parts->elems[i];
+
+ /* Ignore partitions with a usable offset. */
+ if (rktpar->offset != RD_KAFKA_OFFSET_INVALID &&
+ rktpar->offset != RD_KAFKA_OFFSET_STORED) {
+ rd_rkb_dbg(rkb, TOPIC, "OFFSET",
+ "OffsetFetchRequest: skipping %s [%"PRId32"] "
+ "with valid offset %s",
+ rktpar->topic, rktpar->partition,
+ rd_kafka_offset2str(rktpar->offset));
+ continue;
+ }
+
+ if (last_topic == NULL || strcmp(last_topic, rktpar->topic)) {
+ /* New topic */
+
+ /* Finalize previous PartitionCnt */
+ if (PartCnt > 0)
+ rd_kafka_buf_update_u32(rkbuf, of_PartCnt,
+ PartCnt);
+
+ /* TopicName */
+ rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
+ /* PartitionCnt, finalized later */
+ of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0);
+ PartCnt = 0;
+ last_topic = rktpar->topic;
+ TopicCnt++;
+ }
+
+ /* Partition */
+ rd_kafka_buf_write_i32(rkbuf, rktpar->partition);
+ PartCnt++;
+ tot_PartCnt++;
+ }
+
+ /* Finalize previous PartitionCnt */
+ if (PartCnt > 0)
+ rd_kafka_buf_update_u32(rkbuf, of_PartCnt, PartCnt);
+
+ /* Finalize TopicCnt */
+ rd_kafka_buf_update_u32(rkbuf, of_TopicCnt, TopicCnt);
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, api_version, 0);
+
+ rd_rkb_dbg(rkb, TOPIC, "OFFSET",
+ "OffsetFetchRequest(v%d) for %d/%d partition(s)",
+ api_version, tot_PartCnt, parts->cnt);
+
+ if (tot_PartCnt == 0) {
+ /* No partitions needs OffsetFetch, enqueue empty
+ * response right away. */
+ rkbuf->rkbuf_replyq = replyq;
+ rkbuf->rkbuf_cb = resp_cb;
+ rkbuf->rkbuf_opaque = opaque;
+ rd_kafka_buf_callback(rkb->rkb_rk, rkb, 0, NULL, rkbuf);
+ return;
+ }
+
+
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+}
+
+
+/**
+ * @remark \p offsets may be NULL if \p err is set
+ */
+rd_kafka_resp_err_t
+rd_kafka_handle_OffsetCommit (rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ rd_kafka_topic_partition_list_t *offsets) {
+ const int log_decode_errors = LOG_ERR;
+ int32_t TopicArrayCnt;
+ int16_t ErrorCode = 0, last_ErrorCode = 0;
+ int errcnt = 0;
+ int i;
+ int actions;
+
+ if (err)
+ goto err;
+
+ rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
+ for (i = 0 ; i < TopicArrayCnt ; i++) {
+ rd_kafkap_str_t topic;
+ char *topic_str;
+ int32_t PartArrayCnt;
+ int j;
+
+ rd_kafka_buf_read_str(rkbuf, &topic);
+ rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt);
+
+ RD_KAFKAP_STR_DUPA(&topic_str, &topic);
+
+ for (j = 0 ; j < PartArrayCnt ; j++) {
+ int32_t partition;
+ rd_kafka_topic_partition_t *rktpar;
+
+ rd_kafka_buf_read_i32(rkbuf, &partition);
+ rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
+
+ rktpar = rd_kafka_topic_partition_list_find(
+ offsets, topic_str, partition);
+
+ if (!rktpar) {
+ /* Received offset for topic/partition we didn't
+ * ask for, this shouldn't really happen. */
+ continue;
+ }
+
+ rktpar->err = ErrorCode;
+ if (ErrorCode) {
+ last_ErrorCode = ErrorCode;
+ errcnt++;
+ }
+ }
+ }
+
+ /* If all partitions failed use error code
+ * from last partition as the global error. */
+ if (offsets && errcnt == offsets->cnt)
+ err = last_ErrorCode;
+ goto done;
+
+ err_parse:
+ err = rkbuf->rkbuf_err;
+
+ err:
+ actions = rd_kafka_err_action(
+ rkb, err, rkbuf, request,
+
+ RD_KAFKA_ERR_ACTION_PERMANENT,
+ RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE,
+
+ RD_KAFKA_ERR_ACTION_RETRY,
+ RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS,
+
+ RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_SPECIAL,
+ RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE,
+
+ RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_SPECIAL,
+ RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP,
+
+ RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY,
+ RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+
+ RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY,
+ RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,
+
+ RD_KAFKA_ERR_ACTION_RETRY,
+ RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
+
+ RD_KAFKA_ERR_ACTION_PERMANENT,
+ RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE,
+
+ RD_KAFKA_ERR_ACTION_PERMANENT,
+ RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
+
+ RD_KAFKA_ERR_ACTION_PERMANENT,
+ RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
+
+ RD_KAFKA_ERR_ACTION_END);
+
+ if (actions & RD_KAFKA_ERR_ACTION_REFRESH && rk->rk_cgrp) {
+ /* Mark coordinator dead or re-query for coordinator.
+ * ..dead() will trigger a re-query. */
+ if (actions & RD_KAFKA_ERR_ACTION_SPECIAL)
+ rd_kafka_cgrp_coord_dead(rk->rk_cgrp, err,
+ "OffsetCommitRequest failed");
+ else
+ rd_kafka_cgrp_coord_query(rk->rk_cgrp,
+ "OffsetCommitRequest failed");
+ }
+ if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
+ if (rd_kafka_buf_retry(rkb, request))
+ return RD_KAFKA_RESP_ERR__IN_PROGRESS;
+ /* FALLTHRU */
+ }
+
+ done:
+ return err;
+}
+
+
+
+
+/**
+ * @brief Send OffsetCommitRequest for a list of partitions.
+ *
+ * @returns 0 if none of the partitions in \p offsets had valid offsets,
+ * else 1.
+ */
+int rd_kafka_OffsetCommitRequest (rd_kafka_broker_t *rkb,
+ rd_kafka_cgrp_t *rkcg,
+ int16_t api_version,
+ rd_kafka_topic_partition_list_t *offsets,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque, const char *reason) {
+ rd_kafka_buf_t *rkbuf;
+ ssize_t of_TopicCnt = -1;
+ int TopicCnt = 0;
+ const char *last_topic = NULL;
+ ssize_t of_PartCnt = -1;
+ int PartCnt = 0;
+ int tot_PartCnt = 0;
+ int i;
+
+ rd_kafka_assert(NULL, offsets != NULL);
+
+ rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_OffsetCommit,
+ 1, 100 + (offsets->cnt * 128));
+
+ /* ConsumerGroup */
+ rd_kafka_buf_write_kstr(rkbuf, rkcg->rkcg_group_id);
+
+ /* v1,v2 */
+ if (api_version >= 1) {
+ /* ConsumerGroupGenerationId */
+ rd_kafka_buf_write_i32(rkbuf, rkcg->rkcg_generation_id);
+ /* ConsumerId */
+ rd_kafka_buf_write_kstr(rkbuf, rkcg->rkcg_member_id);
+ /* v2: RetentionTime */
+ if (api_version == 2)
+ rd_kafka_buf_write_i64(rkbuf, -1);
+ }
+
+ /* Sort offsets by topic */
+ rd_kafka_topic_partition_list_sort_by_topic(offsets);
+
+ /* TopicArrayCnt: Will be updated when we know the number of topics. */
+ of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0);
+
+ for (i = 0 ; i < offsets->cnt ; i++) {
+ rd_kafka_topic_partition_t *rktpar = &offsets->elems[i];
+
+ /* Skip partitions with invalid offset. */
+ if (rktpar->offset < 0)
+ continue;
+
+ if (last_topic == NULL || strcmp(last_topic, rktpar->topic)) {
+ /* New topic */
+
+ /* Finalize previous PartitionCnt */
+ if (PartCnt > 0)
+ rd_kafka_buf_update_u32(rkbuf, of_PartCnt,
+ PartCnt);
+
+ /* TopicName */
+ rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
+ /* PartitionCnt, finalized later */
+ of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0);
+ PartCnt = 0;
+ last_topic = rktpar->topic;
+ TopicCnt++;
+ }
+
+ /* Partition */
+ rd_kafka_buf_write_i32(rkbuf, rktpar->partition);
+ PartCnt++;
+ tot_PartCnt++;
+
+ /* Offset */
+ rd_kafka_buf_write_i64(rkbuf, rktpar->offset);
+
+ /* v1: TimeStamp */
+ if (api_version == 1)
+ rd_kafka_buf_write_i64(rkbuf, -1);// FIXME: retention time
+
+ /* Metadata */
+ /* Java client 0.9.0 and broker <0.10.0 can't parse
+ * Null metadata fields, so as a workaround we send an
+ * empty string if it's Null. */
+ if (!rktpar->metadata)
+ rd_kafka_buf_write_str(rkbuf, "", 0);
+ else
+ rd_kafka_buf_write_str(rkbuf,
+ rktpar->metadata,
+ rktpar->metadata_size);
+ }
+
+ if (tot_PartCnt == 0) {
+ /* No topic+partitions had valid offsets to commit. */
+ rd_kafka_replyq_destroy(&replyq);
+ rd_kafka_buf_destroy(rkbuf);
+ return 0;
+ }
+
+ /* Finalize previous PartitionCnt */
+ if (PartCnt > 0)
+ rd_kafka_buf_update_u32(rkbuf, of_PartCnt, PartCnt);
+
+ /* Finalize TopicCnt */
+ rd_kafka_buf_update_u32(rkbuf, of_TopicCnt, TopicCnt);
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, api_version, 0);
+
+ rd_rkb_dbg(rkb, TOPIC, "OFFSET",
+ "Enqueue OffsetCommitRequest(v%d, %d/%d partition(s))): %s",
+ api_version, tot_PartCnt, offsets->cnt, reason);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+
+ return 1;
+
+}
+
+
+
+/**
+ * @brief Write "consumer" protocol type MemberState for SyncGroupRequest to
+ * enveloping buffer \p rkbuf.
+ */
+static void rd_kafka_group_MemberState_consumer_write (
+ rd_kafka_buf_t *env_rkbuf,
+ const rd_kafka_group_member_t *rkgm) {
+ rd_kafka_buf_t *rkbuf;
+ int i;
+ const char *last_topic = NULL;
+ size_t of_TopicCnt;
+ ssize_t of_PartCnt = -1;
+ int TopicCnt = 0;
+ int PartCnt = 0;
+ rd_slice_t slice;
+
+ rkbuf = rd_kafka_buf_new(1, 100);
+ rd_kafka_buf_write_i16(rkbuf, 0); /* Version */
+ of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0); /* Updated later */
+ for (i = 0 ; i < rkgm->rkgm_assignment->cnt ; i++) {
+ const rd_kafka_topic_partition_t *rktpar;
+
+ rktpar = &rkgm->rkgm_assignment->elems[i];
+
+ if (!last_topic || strcmp(last_topic,
+ rktpar->topic)) {
+ if (last_topic)
+ /* Finalize previous PartitionCnt */
+ rd_kafka_buf_update_i32(rkbuf, of_PartCnt,
+ PartCnt);
+ rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
+ /* Updated later */
+ of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0);
+ PartCnt = 0;
+ last_topic = rktpar->topic;
+ TopicCnt++;
+ }
+
+ rd_kafka_buf_write_i32(rkbuf, rktpar->partition);
+ PartCnt++;
+ }
+
+ if (of_PartCnt != -1)
+ rd_kafka_buf_update_i32(rkbuf, of_PartCnt, PartCnt);
+ rd_kafka_buf_update_i32(rkbuf, of_TopicCnt, TopicCnt);
+
+ rd_kafka_buf_write_kbytes(rkbuf, rkgm->rkgm_userdata);
+
+ /* Get pointer to binary buffer */
+ rd_slice_init_full(&slice, &rkbuf->rkbuf_buf);
+
+ /* Write binary buffer as Kafka Bytes to enveloping buffer. */
+ rd_kafka_buf_write_i32(env_rkbuf, (int32_t)rd_slice_remains(&slice));
+ rd_buf_write_slice(&env_rkbuf->rkbuf_buf, &slice);
+
+ rd_kafka_buf_destroy(rkbuf);
+}
+
+/**
+ * Send SyncGroupRequest
+ */
+void rd_kafka_SyncGroupRequest (rd_kafka_broker_t *rkb,
+ const rd_kafkap_str_t *group_id,
+ int32_t generation_id,
+ const rd_kafkap_str_t *member_id,
+ const rd_kafka_group_member_t
+ *assignments,
+ int assignment_cnt,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int i;
+
+ rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SyncGroup,
+ 1,
+ RD_KAFKAP_STR_SIZE(group_id) +
+ 4 /* GenerationId */ +
+ RD_KAFKAP_STR_SIZE(member_id) +
+ 4 /* array size group_assignment */ +
+ (assignment_cnt * 100/*guess*/));
+ rd_kafka_buf_write_kstr(rkbuf, group_id);
+ rd_kafka_buf_write_i32(rkbuf, generation_id);
+ rd_kafka_buf_write_kstr(rkbuf, member_id);
+ rd_kafka_buf_write_i32(rkbuf, assignment_cnt);
+
+ for (i = 0 ; i < assignment_cnt ; i++) {
+ const rd_kafka_group_member_t *rkgm = &assignments[i];
+
+ rd_kafka_buf_write_kstr(rkbuf, rkgm->rkgm_member_id);
+ rd_kafka_group_MemberState_consumer_write(rkbuf, rkgm);
+ }
+
+ /* This is a blocking request */
+ rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING;
+ rkbuf->rkbuf_ts_timeout = rd_clock() +
+ (rkb->rkb_rk->rk_conf.group_session_timeout_ms * 1000) +
+ (3*1000*1000/* 3s grace period*/);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+}
+
+/**
+ * Handler for SyncGroup responses
+ * opaque must be the cgrp handle.
+ */
+void rd_kafka_handle_SyncGroup (rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ void *opaque) {
+ rd_kafka_cgrp_t *rkcg = opaque;
+ const int log_decode_errors = LOG_ERR;
+ int16_t ErrorCode = 0;
+ rd_kafkap_bytes_t MemberState = RD_ZERO_INIT;
+ int actions;
+
+ if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC) {
+ rd_kafka_dbg(rkb->rkb_rk, CGRP, "SYNCGROUP",
+ "SyncGroup response: discarding outdated request "
+ "(now in join-state %s)",
+ rd_kafka_cgrp_join_state_names[rkcg->
+ rkcg_join_state]);
+ return;
+ }
+
+ if (err) {
+ ErrorCode = err;
+ goto err;
+ }
+
+ rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
+ rd_kafka_buf_read_bytes(rkbuf, &MemberState);
+
+err:
+ actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request,
+ RD_KAFKA_ERR_ACTION_END);
+
+ if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
+ /* Re-query for coordinator */
+ rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
+ RD_KAFKA_OP_COORD_QUERY,
+ ErrorCode);
+ /* FALLTHRU */
+ }
+
+ rd_kafka_dbg(rkb->rkb_rk, CGRP, "SYNCGROUP",
+ "SyncGroup response: %s (%d bytes of MemberState data)",
+ rd_kafka_err2str(ErrorCode),
+ RD_KAFKAP_BYTES_LEN(&MemberState));
+
+ if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY)
+ return; /* Termination */
+
+ rd_kafka_cgrp_handle_SyncGroup(rkcg, rkb, ErrorCode, &MemberState);
+
+ return;
+
+ err_parse:
+ ErrorCode = rkbuf->rkbuf_err;
+ goto err;
+}
+
+
+/**
+ * Send JoinGroupRequest
+ */
+void rd_kafka_JoinGroupRequest (rd_kafka_broker_t *rkb,
+ const rd_kafkap_str_t *group_id,
+ const rd_kafkap_str_t *member_id,
+ const rd_kafkap_str_t *protocol_type,
+ const rd_list_t *topics,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ rd_kafka_t *rk = rkb->rkb_rk;
+ rd_kafka_assignor_t *rkas;
+ int i;
+
+ rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_JoinGroup,
+ 1,
+ RD_KAFKAP_STR_SIZE(group_id) +
+ 4 /* sessionTimeoutMs */ +
+ RD_KAFKAP_STR_SIZE(member_id) +
+ RD_KAFKAP_STR_SIZE(protocol_type) +
+ 4 /* array count GroupProtocols */ +
+ (rd_list_cnt(topics) * 100));
+ rd_kafka_buf_write_kstr(rkbuf, group_id);
+ rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.group_session_timeout_ms);
+ rd_kafka_buf_write_kstr(rkbuf, member_id);
+ rd_kafka_buf_write_kstr(rkbuf, protocol_type);
+ rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.enabled_assignor_cnt);
+
+ RD_LIST_FOREACH(rkas, &rk->rk_conf.partition_assignors, i) {
+ rd_kafkap_bytes_t *member_metadata;
+ if (!rkas->rkas_enabled)
+ continue;
+ rd_kafka_buf_write_kstr(rkbuf, rkas->rkas_protocol_name);
+ member_metadata = rkas->rkas_get_metadata_cb(rkas, topics);
+ rd_kafka_buf_write_kbytes(rkbuf, member_metadata);
+ rd_kafkap_bytes_destroy(member_metadata);
+ }
+
+ /* This is a blocking request */
+ rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING;
+ rkbuf->rkbuf_ts_timeout = rd_clock() +
+ (rk->rk_conf.group_session_timeout_ms * 1000) +
+ (3*1000*1000/* 3s grace period*/);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+}
+
+
+
+
+
+
+/**
+ * Send LeaveGroupRequest
+ */
+void rd_kafka_LeaveGroupRequest (rd_kafka_broker_t *rkb,
+ const rd_kafkap_str_t *group_id,
+ const rd_kafkap_str_t *member_id,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+
+ rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_LeaveGroup,
+ 1,
+ RD_KAFKAP_STR_SIZE(group_id) +
+ RD_KAFKAP_STR_SIZE(member_id));
+ rd_kafka_buf_write_kstr(rkbuf, group_id);
+ rd_kafka_buf_write_kstr(rkbuf, member_id);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+}
+
+
+/**
+ * Handler for LeaveGroup responses
+ * opaque must be the cgrp handle.
+ */
+void rd_kafka_handle_LeaveGroup (rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ void *opaque) {
+ rd_kafka_cgrp_t *rkcg = opaque;
+ const int log_decode_errors = LOG_ERR;
+ int16_t ErrorCode = 0;
+ int actions;
+
+ if (err) {
+ ErrorCode = err;
+ goto err;
+ }
+
+ rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
+
+
+err:
+ actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request,
+ RD_KAFKA_ERR_ACTION_END);
+
+ if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
+ /* Re-query for coordinator */
+ rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
+ RD_KAFKA_OP_COORD_QUERY, ErrorCode);
+ /* Schedule a retry */
+ rd_kafka_buf_keep(request);
+ rd_kafka_broker_buf_retry(request->rkbuf_rkb, request);
+ return;
+ }
+
+ if (ErrorCode)
+ rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP",
+ "LeaveGroup response: %s",
+ rd_kafka_err2str(ErrorCode));
+
+ err_parse:
+ ErrorCode = rkbuf->rkbuf_err;
+ goto err;
+}
+
+
+
+
+
+
+/**
+ * Send HeartbeatRequest
+ */
+void rd_kafka_HeartbeatRequest (rd_kafka_broker_t *rkb,
+ const rd_kafkap_str_t *group_id,
+ int32_t generation_id,
+ const rd_kafkap_str_t *member_id,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+
+ rd_rkb_dbg(rkb, CGRP, "HEARTBEAT",
+ "Heartbeat for group \"%s\" generation id %"PRId32,
+ group_id->str, generation_id);
+
+ rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_Heartbeat,
+ 1,
+ RD_KAFKAP_STR_SIZE(group_id) +
+ 4 /* GenerationId */ +
+ RD_KAFKAP_STR_SIZE(member_id));
+
+ rd_kafka_buf_write_kstr(rkbuf, group_id);
+ rd_kafka_buf_write_i32(rkbuf, generation_id);
+ rd_kafka_buf_write_kstr(rkbuf, member_id);
+
+ rkbuf->rkbuf_ts_timeout = rd_clock() +
+ (rkb->rkb_rk->rk_conf.group_session_timeout_ms * 1000);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+}
+
+
+
+
+/**
+ * Send ListGroupsRequest
+ */
+void rd_kafka_ListGroupsRequest (rd_kafka_broker_t *rkb,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+
+ rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_ListGroups, 0, 0);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+}
+
+
+/**
+ * Send DescribeGroupsRequest
+ */
+void rd_kafka_DescribeGroupsRequest (rd_kafka_broker_t *rkb,
+ const char **groups, int group_cnt,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+
+ rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DescribeGroups,
+ 1, 32*group_cnt);
+
+ rd_kafka_buf_write_i32(rkbuf, group_cnt);
+ while (group_cnt-- > 0)
+ rd_kafka_buf_write_str(rkbuf, groups[group_cnt], -1);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+}
+
+
+
+
+/**
+ * @brief Generic handler for Metadata responses
+ *
+ * @locality rdkafka main thread
+ */
+static void rd_kafka_handle_Metadata (rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ void *opaque) {
+ rd_kafka_op_t *rko = opaque; /* Possibly NULL */
+ struct rd_kafka_metadata *md = NULL;
+ const rd_list_t *topics = request->rkbuf_u.Metadata.topics;
+
+ rd_kafka_assert(NULL, err == RD_KAFKA_RESP_ERR__DESTROY ||
+ thrd_is_current(rk->rk_thread));
+
+ /* Avoid metadata updates when we're terminating. */
+ if (rd_kafka_terminating(rkb->rkb_rk))
+ err = RD_KAFKA_RESP_ERR__DESTROY;
+
+ if (unlikely(err)) {
+ if (err == RD_KAFKA_RESP_ERR__DESTROY) {
+ /* Terminating */
+ goto done;
+ }
+
+ /* FIXME: handle errors */
+ rd_rkb_log(rkb, LOG_WARNING, "METADATA",
+ "Metadata request failed: %s (%dms)",
+ rd_kafka_err2str(err),
+ (int)(request->rkbuf_ts_sent/1000));
+ } else {
+
+ if (!topics)
+ rd_rkb_dbg(rkb, METADATA, "METADATA",
+ "===== Received metadata: %s =====",
+ request->rkbuf_u.Metadata.reason);
+ else
+ rd_rkb_dbg(rkb, METADATA, "METADATA",
+ "===== Received metadata "
+ "(for %d requested topics): %s =====",
+ rd_list_cnt(topics),
+ request->rkbuf_u.Metadata.reason);
+
+ md = rd_kafka_parse_Metadata(rkb, request, rkbuf);
+ if (!md) {
+ if (rd_kafka_buf_retry(rkb, request))
+ return;
+ err = RD_KAFKA_RESP_ERR__BAD_MSG;
+ }
+ }
+
+ if (rko && rko->rko_replyq.q) {
+ /* Reply to metadata requester, passing on the metadata.
+ * Reuse requesting rko for the reply. */
+ rko->rko_err = err;
+ rko->rko_u.metadata.md = md;
+
+ rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0);
+ rko = NULL;
+ } else {
+ if (md)
+ rd_free(md);
+ }
+
+ done:
+ if (rko)
+ rd_kafka_op_destroy(rko);
+}
+
+
+
+/**
+ * @brief Construct MetadataRequest (does not send)
+ *
+ * \p topics is a list of topic names (char *) to request.
+ *
+ * !topics - only request brokers (if supported by broker, else
+ * all topics)
+ * topics.cnt==0 - all topics in cluster are requested
+ * topics.cnt >0 - only specified topics are requested
+ *
+ * @param reason - metadata request reason
+ * @param rko - (optional) rko with replyq for handling response.
+ * Specifying an rko forces a metadata request even if
+ * there is already a matching one in-transit.
+ *
+ * If full metadata for all topics is requested (or all brokers, which
+ * results in all-topics on older brokers) and there is already a full request
+ * in transit then this function will return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS
+ * otherwise RD_KAFKA_RESP_ERR_NO_ERROR. If \p rko is non-NULL the request
+ * is sent regardless.
+ */
+rd_kafka_resp_err_t
+rd_kafka_MetadataRequest (rd_kafka_broker_t *rkb,
+ const rd_list_t *topics, const char *reason,
+ rd_kafka_op_t *rko) {
+ rd_kafka_buf_t *rkbuf;
+ int16_t ApiVersion = 0;
+ int features;
+ int topic_cnt = topics ? rd_list_cnt(topics) : 0;
+ int *full_incr = NULL;
+
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(rkb,
+ RD_KAFKAP_Metadata,
+ 0, 2,
+ &features);
+
+ rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_Metadata, 1,
+ 4 + (50 * topic_cnt));
+
+ if (!reason)
+ reason = "";
+
+ rkbuf->rkbuf_u.Metadata.reason = rd_strdup(reason);
+
+ if (!topics && ApiVersion >= 1) {
+ /* a null(0) array (in the protocol) represents no topics */
+ rd_kafka_buf_write_i32(rkbuf, 0);
+ rd_rkb_dbg(rkb, METADATA, "METADATA",
+ "Request metadata for brokers only: %s", reason);
+ full_incr = &rkb->rkb_rk->rk_metadata_cache.
+ rkmc_full_brokers_sent;
+
+ } else {
+ if (topic_cnt == 0 && !rko)
+ full_incr = &rkb->rkb_rk->rk_metadata_cache.
+ rkmc_full_topics_sent;
+
+ if (topic_cnt == 0 && ApiVersion >= 1)
+ rd_kafka_buf_write_i32(rkbuf, -1); /* Null: all topics*/
+ else
+ rd_kafka_buf_write_i32(rkbuf, topic_cnt);
+
+ if (topic_cnt == 0) {
+ rkbuf->rkbuf_u.Metadata.all_topics = 1;
+ rd_rkb_dbg(rkb, METADATA, "METADATA",
+ "Request metadata for all topics: "
+ "%s", reason);
+ } else
+ rd_rkb_dbg(rkb, METADATA, "METADATA",
+ "Request metadata for %d topic(s): "
+ "%s", topic_cnt, reason);
+ }
+
+ if (full_incr) {
+ /* Avoid multiple outstanding full requests
+ * (since they are redundant and side-effect-less).
+ * Forced requests (app using metadata() API) are passed
+ * through regardless. */
+
+ mtx_lock(&rkb->rkb_rk->rk_metadata_cache.
+ rkmc_full_lock);
+ if (*full_incr > 0 && (!rko || !rko->rko_u.metadata.force)) {
+ mtx_unlock(&rkb->rkb_rk->rk_metadata_cache.
+ rkmc_full_lock);
+ rd_rkb_dbg(rkb, METADATA, "METADATA",
+ "Skipping metadata request: %s: "
+ "full request already in-transit",
+ reason);
+ rd_kafka_buf_destroy(rkbuf);
+ return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
+ }
+
+ (*full_incr)++;
+ mtx_unlock(&rkb->rkb_rk->rk_metadata_cache.
+ rkmc_full_lock);
+ rkbuf->rkbuf_u.Metadata.decr = full_incr;
+ rkbuf->rkbuf_u.Metadata.decr_lock = &rkb->rkb_rk->
+ rk_metadata_cache.rkmc_full_lock;
+ }
+
+
+ if (topic_cnt > 0) {
+ char *topic;
+ int i;
+
+ /* Maintain a copy of the topics list so we can purge
+ * hints from the metadata cache on error. */
+ rkbuf->rkbuf_u.Metadata.topics =
+ rd_list_copy(topics, rd_list_string_copy, NULL);
+
+ RD_LIST_FOREACH(topic, topics, i)
+ rd_kafka_buf_write_str(rkbuf, topic, -1);
+
+ }
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+ /* Metadata requests are part of the important control plane
+ * and should go before other requests (Produce, Fetch, etc). */
+ rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_FLASH;
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf,
+ /* Handle response thru rk_ops,
+ * but forward parsed result to
+ * rko's replyq when done. */
+ RD_KAFKA_REPLYQ(rkb->rkb_rk->
+ rk_ops, 0),
+ rd_kafka_handle_Metadata, rko);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+
+
+
+
+
+
+
+/**
+ * @brief Parses and handles ApiVersion reply.
+ *
+ * @param apis will be allocated, populated and sorted
+ * with broker's supported APIs.
+ * @param api_cnt will be set to the number of elements in \p *apis
+
+ * @returns 0 on success, else an error.
+ */
+rd_kafka_resp_err_t
+rd_kafka_handle_ApiVersion (rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ struct rd_kafka_ApiVersion **apis,
+ size_t *api_cnt) {
+ const int log_decode_errors = LOG_ERR;
+ int actions;
+ int32_t ApiArrayCnt;
+ int16_t ErrorCode;
+ int i = 0;
+
+ *apis = NULL;
+
+ if (err)
+ goto err;
+
+ rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
+ if ((err = ErrorCode))
+ goto err;
+
+ rd_kafka_buf_read_i32(rkbuf, &ApiArrayCnt);
+ if (ApiArrayCnt > 1000)
+ rd_kafka_buf_parse_fail(rkbuf,
+ "ApiArrayCnt %"PRId32" out of range",
+ ApiArrayCnt);
+
+ rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
+ "Broker API support:");
+
+ *apis = malloc(sizeof(**apis) * ApiArrayCnt);
+
+ for (i = 0 ; i < ApiArrayCnt ; i++) {
+ struct rd_kafka_ApiVersion *api = &(*apis)[i];
+
+ rd_kafka_buf_read_i16(rkbuf, &api->ApiKey);
+ rd_kafka_buf_read_i16(rkbuf, &api->MinVer);
+ rd_kafka_buf_read_i16(rkbuf, &api->MaxVer);
+
+ rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
+ " ApiKey %s (%hd) Versions %hd..%hd",
+ rd_kafka_ApiKey2str(api->ApiKey),
+ api->ApiKey, api->MinVer, api->MaxVer);
+ }
+
+ *api_cnt = ApiArrayCnt;
+ qsort(*apis, *api_cnt, sizeof(**apis), rd_kafka_ApiVersion_key_cmp);
+
+ goto done;
+
+ err_parse:
+ err = rkbuf->rkbuf_err;
+ err:
+ if (*apis)
+ rd_free(*apis);
+
+ actions = rd_kafka_err_action(
+ rkb, err, rkbuf, request,
+ RD_KAFKA_ERR_ACTION_END);
+
+ if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
+ if (rd_kafka_buf_retry(rkb, request))
+ return RD_KAFKA_RESP_ERR__IN_PROGRESS;
+ /* FALLTHRU */
+ }
+
+done:
+ return err;
+}
+
+
+
+/**
+ * Send ApiVersionRequest (KIP-35)
+ */
+void rd_kafka_ApiVersionRequest (rd_kafka_broker_t *rkb,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque, int flash_msg) {
+ rd_kafka_buf_t *rkbuf;
+
+ rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_ApiVersion, 1, 4);
+ rkbuf->rkbuf_flags |= (flash_msg ? RD_KAFKA_OP_F_FLASH : 0);
+ rd_kafka_buf_write_i32(rkbuf, 0); /* Empty array: request all APIs */
+
+ /* Non-supporting brokers will tear down the connection when they
+ * receive an unknown API request, so dont retry request on failure. */
+ rkbuf->rkbuf_retries = RD_KAFKA_BUF_NO_RETRIES;
+
+ /* 0.9.0.x brokers will not close the connection on unsupported
+ * API requests, so we minimize the timeout for the request.
+ * This is a regression on the broker part. */
+ rkbuf->rkbuf_ts_timeout = rd_clock() + (rkb->rkb_rk->rk_conf.api_version_request_timeout_ms * 1000);
+
+ if (replyq.q)
+ rd_kafka_broker_buf_enq_replyq(rkb,
+ rkbuf, replyq, resp_cb, opaque);
+ else /* in broker thread */
+ rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque);
+}
+
+
+/**
+ * Send SaslHandshakeRequest (KIP-43)
+ */
+void rd_kafka_SaslHandshakeRequest (rd_kafka_broker_t *rkb,
+ const char *mechanism,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque, int flash_msg) {
+ rd_kafka_buf_t *rkbuf;
+ int mechlen = (int)strlen(mechanism);
+
+ rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SaslHandshake,
+ 1, RD_KAFKAP_STR_SIZE0(mechlen));
+ rkbuf->rkbuf_flags |= (flash_msg ? RD_KAFKA_OP_F_FLASH : 0);
+ rd_kafka_buf_write_str(rkbuf, mechanism, mechlen);
+
+ /* Non-supporting brokers will tear down the conneciton when they
+ * receive an unknown API request or where the SASL GSSAPI
+ * token type is not recognized, so dont retry request on failure. */
+ rkbuf->rkbuf_retries = RD_KAFKA_BUF_NO_RETRIES;
+
+ /* 0.9.0.x brokers will not close the connection on unsupported
+ * API requests, so we minimize the timeout of the request.
+ * This is a regression on the broker part. */
+ if (!rkb->rkb_rk->rk_conf.api_version_request &&
+ rkb->rkb_rk->rk_conf.socket_timeout_ms > 10*1000)
+ rkbuf->rkbuf_ts_timeout = rd_clock() + (10 * 1000 * 1000);
+
+ if (replyq.q)
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq,
+ resp_cb, opaque);
+ else /* in broker thread */
+ rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque);
+}
+
+
+
+
+/**
+ * @brief Parses a Produce reply.
+ * @returns 0 on success or an error code on failure.
+ * @locality broker thread
+ */
+static rd_kafka_resp_err_t
+rd_kafka_handle_Produce_parse (rd_kafka_broker_t *rkb,
+ rd_kafka_toppar_t *rktp,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ int64_t *offsetp,
+ int64_t *timestampp) {
+ int32_t TopicArrayCnt;
+ int32_t PartitionArrayCnt;
+ struct {
+ int32_t Partition;
+ int16_t ErrorCode;
+ int64_t Offset;
+ } hdr;
+ const int log_decode_errors = LOG_ERR;
+
+ rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
+ if (TopicArrayCnt != 1)
+ goto err;
+
+ /* Since we only produce to one single topic+partition in each
+ * request we assume that the reply only contains one topic+partition
+ * and that it is the same that we requested.
+ * If not the broker is buggy. */
+ rd_kafka_buf_skip_str(rkbuf);
+ rd_kafka_buf_read_i32(rkbuf, &PartitionArrayCnt);
+
+ if (PartitionArrayCnt != 1)
+ goto err;
+
+ rd_kafka_buf_read_i32(rkbuf, &hdr.Partition);
+ rd_kafka_buf_read_i16(rkbuf, &hdr.ErrorCode);
+ rd_kafka_buf_read_i64(rkbuf, &hdr.Offset);
+
+ *offsetp = hdr.Offset;
+
+ *timestampp = -1;
+ if (request->rkbuf_reqhdr.ApiVersion >= 2) {
+ rd_kafka_buf_read_i64(rkbuf, timestampp);
+ }
+
+ if (request->rkbuf_reqhdr.ApiVersion >= 1) {
+ int32_t Throttle_Time;
+ rd_kafka_buf_read_i32(rkbuf, &Throttle_Time);
+
+ rd_kafka_op_throttle_time(rkb, rkb->rkb_rk->rk_rep,
+ Throttle_Time);
+ }
+
+
+ return hdr.ErrorCode;
+
+ err_parse:
+ return rkbuf->rkbuf_err;
+ err:
+ return RD_KAFKA_RESP_ERR__BAD_MSG;
+}
+
+
+/**
+ * @brief Handle ProduceResponse
+ *
+ * @locality broker thread
+ */
+static void rd_kafka_handle_Produce (rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *reply,
+ rd_kafka_buf_t *request,
+ void *opaque) {
+ shptr_rd_kafka_toppar_t *s_rktp = opaque; /* from ProduceRequest() */
+ rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
+ int64_t offset = RD_KAFKA_OFFSET_INVALID;
+ int64_t timestamp = -1;
+
+ /* Parse Produce reply (unless the request errored) */
+ if (!err && reply)
+ err = rd_kafka_handle_Produce_parse(rkb, rktp,
+ reply, request,
+ &offset, ×tamp);
+
+
+ if (likely(!err)) {
+ rd_rkb_dbg(rkb, MSG, "MSGSET",
+ "%s [%"PRId32"]: MessageSet with %i message(s) "
+ "delivered",
+ rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
+ rd_atomic32_get(&request->rkbuf_msgq.rkmq_msg_cnt));
+
+ } else {
+ /* Error */
+ int actions;
+
+ actions = rd_kafka_err_action(
+ rkb, err, reply, request,
+
+ RD_KAFKA_ERR_ACTION_REFRESH,
+ RD_KAFKA_RESP_ERR__TRANSPORT,
+
+ RD_KAFKA_ERR_ACTION_REFRESH,
+ RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
+
+ RD_KAFKA_ERR_ACTION_END);
+
+ rd_rkb_dbg(rkb, MSG, "MSGSET",
+ "%s [%"PRId32"]: MessageSet with %i message(s) "
+ "encountered error: %s (actions 0x%x)",
+ rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
+ rd_atomic32_get(&request->rkbuf_msgq.rkmq_msg_cnt),
+ rd_kafka_err2str(err), actions);
+
+ /* NOTE: REFRESH implies a later retry, which does NOT affect
+ * the retry count since refresh-errors are considered
+ * to be stale metadata rather than temporary errors.
+ *
+ * This is somewhat problematic since it may cause
+ * duplicate messages even with retries=0 if the
+ * ProduceRequest made it to the broker but only the
+ * response was lost due to network connectivity issues.
+ * That problem will be sorted when EoS is implemented.
+ */
+ if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
+ /* Request metadata information update */
+ rd_kafka_toppar_leader_unavailable(rktp,
+ "produce", err);
+
+ /* Move messages (in the rkbuf) back to the partition's
+ * queue head. They will be resent when a new leader
+ * is delegated. */
+ rd_kafka_toppar_insert_msgq(rktp, &request->rkbuf_msgq);
+
+ /* No need for fallthru here since the request
+ * no longer has any messages associated with it. */
+ goto done;
+ }
+
+ if ((actions & RD_KAFKA_ERR_ACTION_RETRY) &&
+ rd_kafka_buf_retry(rkb, request))
+ return; /* Scheduled for retry */
+
+ /* Refresh implies a later retry through other means */
+ if (actions & RD_KAFKA_ERR_ACTION_REFRESH)
+ goto done;
+
+ /* Translate request-level timeout error code
+ * to message-level timeout error code. */
+ if (err == RD_KAFKA_RESP_ERR__TIMED_OUT)
+ err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
+
+ /* Fatal errors: no message transmission retries */
+ /* FALLTHRU */
+ }
+
+ /* Propagate assigned offset and timestamp back to app. */
+ if (likely(offset != RD_KAFKA_OFFSET_INVALID)) {
+ rd_kafka_msg_t *rkm;
+ if (rktp->rktp_rkt->rkt_conf.produce_offset_report) {
+ /* produce.offset.report: each message */
+ TAILQ_FOREACH(rkm, &request->rkbuf_msgq.rkmq_msgs,
+ rkm_link) {
+ rkm->rkm_offset = offset++;
+ if (timestamp != -1) {
+ rkm->rkm_timestamp = timestamp;
+ rkm->rkm_tstype = RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME;
+ }
+ }
+ } else {
+ /* Last message in each batch */
+ rkm = TAILQ_LAST(&request->rkbuf_msgq.rkmq_msgs,
+ rd_kafka_msg_head_s);
+ rkm->rkm_offset = offset +
+ rd_atomic32_get(&request->rkbuf_msgq.
+ rkmq_msg_cnt) - 1;
+ if (timestamp != -1) {
+ rkm->rkm_timestamp = timestamp;
+ rkm->rkm_tstype = RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME;
+ }
+ }
+ }
+
+ /* Enqueue messages for delivery report */
+ rd_kafka_dr_msgq(rktp->rktp_rkt, &request->rkbuf_msgq, err);
+
+ done:
+ rd_kafka_toppar_destroy(s_rktp); /* from ProduceRequest() */
+}
+
+
+/**
+ * @brief Send ProduceRequest for messages in toppar queue.
+ *
+ * @returns the number of messages included, or 0 on error / no messages.
+ *
+ * @locality broker thread
+ */
+int rd_kafka_ProduceRequest (rd_kafka_broker_t *rkb, rd_kafka_toppar_t *rktp) {
+ rd_kafka_buf_t *rkbuf;
+ rd_kafka_itopic_t *rkt = rktp->rktp_rkt;
+ size_t MessageSetSize = 0;
+ int cnt;
+
+ /**
+ * Create ProduceRequest with as many messages from the toppar
+ * transmit queue as possible.
+ */
+ rkbuf = rd_kafka_msgset_create_ProduceRequest(rkb, rktp,
+ &MessageSetSize);
+ if (unlikely(!rkbuf))
+ return 0;
+
+ cnt = rd_atomic32_get(&rkbuf->rkbuf_msgq.rkmq_msg_cnt);
+ rd_dassert(cnt > 0);
+
+ rd_atomic64_add(&rktp->rktp_c.tx_msgs, cnt);
+ rd_atomic64_add(&rktp->rktp_c.tx_bytes, MessageSetSize);
+
+ if (!rkt->rkt_conf.required_acks)
+ rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_NO_RESPONSE;
+
+ /* Use timeout from first message. */
+ rkbuf->rkbuf_ts_timeout =
+ TAILQ_FIRST(&rkbuf->rkbuf_msgq.rkmq_msgs)->rkm_ts_timeout;
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf,
+ RD_KAFKA_NO_REPLYQ,
+ rd_kafka_handle_Produce,
+ /* toppar ref for handle_Produce() */
+ rd_kafka_toppar_keep(rktp));
+
+ return cnt;
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_request.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_request.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_request.h
new file mode 100644
index 0000000..a6a11e5
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_request.h
@@ -0,0 +1,196 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#pragma once
+
+#include "rdkafka_cgrp.h"
+#include "rdkafka_feature.h"
+
+
+#define RD_KAFKA_ERR_ACTION_PERMANENT 0x1 /* Permanent error */
+#define RD_KAFKA_ERR_ACTION_IGNORE 0x2 /* Error can be ignored */
+#define RD_KAFKA_ERR_ACTION_REFRESH 0x4 /* Refresh state (e.g., metadata) */
+#define RD_KAFKA_ERR_ACTION_RETRY 0x8 /* Retry request after backoff */
+#define RD_KAFKA_ERR_ACTION_INFORM 0x10 /* Inform application about err */
+#define RD_KAFKA_ERR_ACTION_SPECIAL 0x20 /* Special-purpose, depends on context */
+#define RD_KAFKA_ERR_ACTION_END 0 /* var-arg sentinel */
+
+int rd_kafka_err_action (rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request, ...);
+
+
+void rd_kafka_GroupCoordinatorRequest (rd_kafka_broker_t *rkb,
+ const rd_kafkap_str_t *cgrp,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque);
+
+rd_kafka_resp_err_t rd_kafka_handle_Offset (rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ rd_kafka_topic_partition_list_t
+ *offsets);
+
+void rd_kafka_OffsetRequest (rd_kafka_broker_t *rkb,
+ rd_kafka_topic_partition_list_t *offsets,
+ int16_t api_version,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque);
+
+rd_kafka_resp_err_t
+rd_kafka_handle_OffsetFetch (rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ rd_kafka_topic_partition_list_t *offsets,
+ int update_toppar);
+
+void rd_kafka_op_handle_OffsetFetch (rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ void *opaque);
+
+void rd_kafka_OffsetFetchRequest (rd_kafka_broker_t *rkb,
+ int16_t api_version,
+ rd_kafka_topic_partition_list_t *parts,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque);
+
+
+
+rd_kafka_resp_err_t
+rd_kafka_handle_OffsetCommit (rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ rd_kafka_topic_partition_list_t *offsets);
+int rd_kafka_OffsetCommitRequest (rd_kafka_broker_t *rkb,
+ rd_kafka_cgrp_t *rkcg,
+ int16_t api_version,
+ rd_kafka_topic_partition_list_t *offsets,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque, const char *reason);
+
+
+
+void rd_kafka_JoinGroupRequest (rd_kafka_broker_t *rkb,
+ const rd_kafkap_str_t *group_id,
+ const rd_kafkap_str_t *member_id,
+ const rd_kafkap_str_t *protocol_type,
+ const rd_list_t *topics,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque);
+
+
+void rd_kafka_LeaveGroupRequest (rd_kafka_broker_t *rkb,
+ const rd_kafkap_str_t *group_id,
+ const rd_kafkap_str_t *member_id,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque);
+void rd_kafka_handle_LeaveGroup (rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ void *opaque);
+
+void rd_kafka_SyncGroupRequest (rd_kafka_broker_t *rkb,
+ const rd_kafkap_str_t *group_id,
+ int32_t generation_id,
+ const rd_kafkap_str_t *member_id,
+ const rd_kafka_group_member_t
+ *assignments,
+ int assignment_cnt,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque);
+void rd_kafka_handle_SyncGroup (rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ void *opaque);
+
+void rd_kafka_ListGroupsRequest (rd_kafka_broker_t *rkb,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque);
+
+void rd_kafka_DescribeGroupsRequest (rd_kafka_broker_t *rkb,
+ const char **groups, int group_cnt,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque);
+
+
+void rd_kafka_HeartbeatRequest (rd_kafka_broker_t *rkb,
+ const rd_kafkap_str_t *group_id,
+ int32_t generation_id,
+ const rd_kafkap_str_t *member_id,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque);
+
+rd_kafka_resp_err_t
+rd_kafka_MetadataRequest (rd_kafka_broker_t *rkb,
+ const rd_list_t *topics, const char *reason,
+ rd_kafka_op_t *rko);
+
+rd_kafka_resp_err_t
+rd_kafka_handle_ApiVersion (rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ struct rd_kafka_ApiVersion **apis,
+ size_t *api_cnt);
+void rd_kafka_ApiVersionRequest (rd_kafka_broker_t *rkb,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque, int flash_msg);
+
+void rd_kafka_SaslHandshakeRequest (rd_kafka_broker_t *rkb,
+ const char *mechanism,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque, int flash_msg);
+
+int rd_kafka_ProduceRequest (rd_kafka_broker_t *rkb, rd_kafka_toppar_t *rktp);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_roundrobin_assignor.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_roundrobin_assignor.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_roundrobin_assignor.c
new file mode 100644
index 0000000..0482f88
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_roundrobin_assignor.c
@@ -0,0 +1,114 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2015 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+#include "rdkafka_int.h"
+#include "rdkafka_assignor.h"
+
+
+/**
+ * Source: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
+ *
+ * The roundrobin assignor lays out all the available partitions and all the
+ * available consumers. It then proceeds to do a roundrobin assignment from
+ * partition to consumer. If the subscriptions of all consumer instances are
+ * identical, then the partitions will be uniformly distributed. (i.e., the
+ * partition ownership counts will be within a delta of exactly one across all
+ * consumers.)
+ *
+ * For example, suppose there are two consumers C0 and C1, two topics t0 and
+ * t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1,
+ * t0p2, t1p0, t1p1, and t1p2.
+ *
+ * The assignment will be:
+ * C0: [t0p0, t0p2, t1p1]
+ * C1: [t0p1, t1p0, t1p2]
+ */
+
+rd_kafka_resp_err_t
+rd_kafka_roundrobin_assignor_assign_cb (rd_kafka_t *rk,
+ const char *member_id,
+ const char *protocol_name,
+ const rd_kafka_metadata_t *metadata,
+ rd_kafka_group_member_t *members,
+ size_t member_cnt,
+ rd_kafka_assignor_topic_t
+ **eligible_topics,
+ size_t eligible_topic_cnt,
+ char *errstr, size_t errstr_size,
+ void *opaque) {
+ unsigned int ti;
+ int next = 0; /* Next member id */
+
+ /* Sort topics by name */
+ qsort(eligible_topics, eligible_topic_cnt, sizeof(*eligible_topics),
+ rd_kafka_assignor_topic_cmp);
+
+ /* Sort members by name */
+ qsort(members, member_cnt, sizeof(*members),
+ rd_kafka_group_member_cmp);
+
+ for (ti = 0 ; ti < eligible_topic_cnt ; ti++) {
+ rd_kafka_assignor_topic_t *eligible_topic = eligible_topics[ti];
+ int partition;
+
+ /* For each topic+partition, assign one member (in a cyclic
+ * iteration) per partition until the partitions are exhausted*/
+ for (partition = 0 ;
+ partition < eligible_topic->metadata->partition_cnt ;
+ partition++) {
+ rd_kafka_group_member_t *rkgm;
+
+ /* Scan through members until we find one with a
+ * subscription to this topic. */
+ while (!rd_kafka_group_member_find_subscription(
+ rk, &members[next],
+ eligible_topic->metadata->topic))
+ next++;
+
+ rkgm = &members[next];
+
+ rd_kafka_dbg(rk, CGRP, "ASSIGN",
+ "roundrobin: Member \"%s\": "
+ "assigned topic %s partition %d",
+ rkgm->rkgm_member_id->str,
+ eligible_topic->metadata->topic,
+ partition);
+
+ rd_kafka_topic_partition_list_add(
+ rkgm->rkgm_assignment,
+ eligible_topic->metadata->topic, partition);
+
+ next = (next+1) % rd_list_cnt(&eligible_topic->members);
+ }
+ }
+
+
+ return 0;
+}
+
+
+