You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/06/06 14:14:47 UTC
[19/51] [partial] nifi-minifi-cpp git commit: MINIFICPP-512 - upgrade
to librdkafka 0.11.4
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_request.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_request.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_request.c
deleted file mode 100644
index 2d023b4..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_request.c
+++ /dev/null
@@ -1,1848 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2015, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include <stdarg.h>
-
-#include "rdkafka_int.h"
-#include "rdkafka_request.h"
-#include "rdkafka_broker.h"
-#include "rdkafka_offset.h"
-#include "rdkafka_topic.h"
-#include "rdkafka_partition.h"
-#include "rdkafka_metadata.h"
-#include "rdkafka_msgset.h"
-
-#include "rdrand.h"
-
-/**
- * Kafka protocol request and response handling.
- * All of this code runs in the broker thread and uses op queues for
- * propagating results back to the various sub-systems operating in
- * other threads.
- */
-
-
-/**
- * @brief Map a response error code to the action(s) the caller should take.
- *
- * The variadic tail is a list of (action, error) int pairs that ends with
- * RD_KAFKA_ERR_ACTION_END. Each pair whose error equals \p err contributes
- * its action bits to the result; if at least one pair matched, that result
- * is returned as-is and the built-in defaults are not consulted.
- *
- * @returns a bitmask of RD_KAFKA_ERR_ACTION_.. flags, or 0 when \p err is
- *          RD_KAFKA_RESP_ERR_NO_ERROR and no explicit pair matched.
- */
-int rd_kafka_err_action (rd_kafka_broker_t *rkb,
-                         rd_kafka_resp_err_t err,
-                         rd_kafka_buf_t *rkbuf,
-                         rd_kafka_buf_t *request, ...) {
-        va_list overrides;
-        int matched = 0;
-
-        /* Accumulate the actions of all caller-supplied pairs for \p err. */
-        va_start(overrides, request);
-        for (;;) {
-                int pair_action = va_arg(overrides, int);
-                int pair_err;
-
-                if (!pair_action)
-                        break; /* end-of-list marker */
-
-                pair_err = va_arg(overrides, int);
-                if (err == pair_err)
-                        matched |= pair_action;
-        }
-        va_end(overrides);
-
-        if (err && rkb && request) {
-                rd_rkb_dbg(rkb, BROKER, "REQERR",
-                           "%sRequest failed: %s: explicit actions 0x%x",
-                           rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey),
-                           rd_kafka_err2str(err), matched);
-        }
-
-        /* A caller-supplied mapping takes precedence over the defaults. */
-        if (matched)
-                return matched;
-
-        /* Built-in defaults. */
-        switch (err)
-        {
-        case RD_KAFKA_RESP_ERR_NO_ERROR:
-                return 0;
-
-        case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE:
-        case RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION:
-        case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE:
-        case RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE:
-        case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE:
-        case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP:
-        case RD_KAFKA_RESP_ERR__WAIT_COORD:
-                /* Metadata information needs to be updated. */
-                return RD_KAFKA_ERR_ACTION_REFRESH;
-
-        case RD_KAFKA_RESP_ERR__TIMED_OUT:
-        case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT: /* broker-side timeout */
-        case RD_KAFKA_RESP_ERR__TRANSPORT:        /* connection down */
-                return RD_KAFKA_ERR_ACTION_RETRY;
-
-        case RD_KAFKA_RESP_ERR__DESTROY:
-        case RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT:
-        case RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE:
-        default:
-                return RD_KAFKA_ERR_ACTION_PERMANENT;
-        }
-}
-
-
-/**
- * @brief Build a GroupCoordinatorRequest for consumer group \p cgrp and
- *        enqueue it on \p rkb.
- *
- * The request body consists solely of the group id string. \p replyq,
- * \p resp_cb and \p opaque are handed unchanged to
- * rd_kafka_broker_buf_enq_replyq().
- */
-void rd_kafka_GroupCoordinatorRequest (rd_kafka_broker_t *rkb,
-                                       const rd_kafkap_str_t *cgrp,
-                                       rd_kafka_replyq_t replyq,
-                                       rd_kafka_resp_cb_t *resp_cb,
-                                       void *opaque) {
-        rd_kafka_buf_t *req = rd_kafka_buf_new_request(
-                rkb, RD_KAFKAP_GroupCoordinator, 1,
-                RD_KAFKAP_STR_SIZE(cgrp));
-
-        /* GroupId */
-        rd_kafka_buf_write_kstr(req, cgrp);
-
-        rd_kafka_broker_buf_enq_replyq(rkb, req, replyq, resp_cb, opaque);
-}
-
-
-
-
-/**
- * @brief Parses and handles Offset replies.
- *
- * Returns the parsed offsets (and errors) in \p offsets: one entry is
- * appended per topic+partition found in the response, carrying that
- * partition's error code and offset.
- *
- * If \p err is already set no parsing is attempted and the function goes
- * straight to error handling. Error handling maps the error to actions with
- * rd_kafka_err_action(): a REFRESH action forces a metadata refresh of the
- * known topics, and a RETRY action attempts rd_kafka_buf_retry() on
- * \p request.
- *
- * The `err_parse` label is never referenced explicitly in this function;
- * presumably the rd_kafka_buf_read_..() macros jump to it (and consult
- * `log_decode_errors`) when decoding fails - TODO confirm in the buffer
- * header.
- *
- * @returns RD_KAFKA_RESP_ERR__IN_PROGRESS if rd_kafka_buf_retry() returned
- *          non-zero, otherwise the value of ErrorCode. On the success path
- *          that is the error code of the last partition parsed (0 if the
- *          response held no partitions), so a non-zero return value does
- *          not necessarily mean that every partition failed.
- */
-rd_kafka_resp_err_t rd_kafka_handle_Offset (rd_kafka_t *rk,
- rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_buf_t *request,
- rd_kafka_topic_partition_list_t
- *offsets) {
-
- const int log_decode_errors = LOG_ERR;
- int16_t ErrorCode = 0;
- int32_t TopicArrayCnt;
- int actions;
- int16_t api_version;
-
- if (err) {
- ErrorCode = err;
- goto err;
- }
-
- api_version = request->rkbuf_reqhdr.ApiVersion;
-
- /* NOTE:
- * Broker may return offsets in a different constellation than
- * in the original request, which is why results are appended to
- * \p offsets rather than matched against the request. */
-
- rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
- while (TopicArrayCnt-- > 0) {
- rd_kafkap_str_t ktopic;
- int32_t PartArrayCnt;
- char *topic_name;
-
- rd_kafka_buf_read_str(rkbuf, &ktopic);
- rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt);
-
- RD_KAFKAP_STR_DUPA(&topic_name, &ktopic);
-
- while (PartArrayCnt-- > 0) {
- int32_t kpartition;
- int32_t OffsetArrayCnt;
- int64_t Offset = -1;
- rd_kafka_topic_partition_t *rktpar;
-
- rd_kafka_buf_read_i32(rkbuf, &kpartition);
- rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
-
- if (api_version == 1) {
- int64_t Timestamp;
- rd_kafka_buf_read_i64(rkbuf, &Timestamp);
- rd_kafka_buf_read_i64(rkbuf, &Offset);
- } else if (api_version == 0) {
- rd_kafka_buf_read_i32(rkbuf, &OffsetArrayCnt);
- /* We only request one offset so just grab
- * the first one.
- * NOTE: the loop below overwrites Offset on every
- * read, so if the broker returned more than one
- * offset the last one read is the one kept; with an
- * empty array Offset stays -1. */
- while (OffsetArrayCnt-- > 0)
- rd_kafka_buf_read_i64(rkbuf, &Offset);
- } else {
- rd_kafka_assert(NULL, !*"NOTREACHED");
- }
-
- rktpar = rd_kafka_topic_partition_list_add(
- offsets, topic_name, kpartition);
- rktpar->err = ErrorCode;
- rktpar->offset = Offset;
- }
- }
-
- goto done;
-
- err_parse:
- ErrorCode = rkbuf->rkbuf_err;
- err:
- actions = rd_kafka_err_action(
- rkb, ErrorCode, rkbuf, request,
- RD_KAFKA_ERR_ACTION_PERMANENT,
- RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
-
- RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY,
- RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
-
- RD_KAFKA_ERR_ACTION_END);
-
- if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
- char tmp[256];
- /* Re-query for leader; tmp carries the reason string handed
- * to the metadata refresh. */
- rd_snprintf(tmp, sizeof(tmp),
- "OffsetRequest failed: %s",
- rd_kafka_err2str(ErrorCode));
- rd_kafka_metadata_refresh_known_topics(rk, NULL, 1/*force*/,
- tmp);
- }
-
- if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
- if (rd_kafka_buf_retry(rkb, request))
- return RD_KAFKA_RESP_ERR__IN_PROGRESS;
- /* FALLTHRU: rd_kafka_buf_retry() returned 0 (presumably no
- * retry was scheduled), so report ErrorCode to the caller. */
- }
-
-done:
- return ErrorCode;
-}
-
-
-
-
-
-
-/**
- * @brief Build an OffsetRequest covering every entry in \p partitions and
- *        enqueue it on \p rkb.
- *
- * \p partitions is sorted by topic (in place) so that each topic name is
- * written once, followed by its partitions. Each entry's `offset` field is
- * written as the Time/Offset value of the query. For \p api_version 0 a
- * MaxNumberOfOffsets of 1 is written after each partition.
- *
- * The topic and per-topic partition counts are written as 0 first and
- * patched in once the real counts are known.
- */
-void rd_kafka_OffsetRequest (rd_kafka_broker_t *rkb,
-                             rd_kafka_topic_partition_list_t *partitions,
-                             int16_t api_version,
-                             rd_kafka_replyq_t replyq,
-                             rd_kafka_resp_cb_t *resp_cb,
-                             void *opaque) {
-        rd_kafka_buf_t *req;
-        size_t topic_cnt_pos = 0;   /* buffer offset of TopicArrayCnt */
-        size_t part_cnt_pos = 0;    /* buffer offset of current PartArrayCnt */
-        const char *cur_topic = "";
-        int32_t n_topics = 0;
-        int32_t n_parts = 0;        /* partitions written for cur_topic */
-        int idx;
-
-        rd_kafka_topic_partition_list_sort_by_topic(partitions);
-
-        req = rd_kafka_buf_new_request(
-                rkb, RD_KAFKAP_Offset, 1,
-                /* ReplicaId+TopicArrayCnt+Topic */
-                4+4+100+
-                /* PartArrayCnt */
-                4 +
-                /* partition_cnt * Partition+Time+MaxNumOffs */
-                (partitions->cnt * (4+8+4)));
-
-        /* ReplicaId */
-        rd_kafka_buf_write_i32(req, -1);
-        /* TopicArrayCnt placeholder */
-        topic_cnt_pos = rd_kafka_buf_write_i32(req, 0);
-
-        for (idx = 0 ; idx < partitions->cnt ; idx++) {
-                const rd_kafka_topic_partition_t *part =
-                        &partitions->elems[idx];
-
-                if (strcmp(part->topic, cur_topic)) {
-                        /* Topic changed: close the previous topic's
-                         * partition count, if there was one. */
-                        if (part_cnt_pos > 0)
-                                rd_kafka_buf_update_i32(req, part_cnt_pos,
-                                                        n_parts);
-
-                        /* Topic */
-                        rd_kafka_buf_write_str(req, part->topic, -1);
-                        n_topics++;
-                        cur_topic = part->topic;
-                        n_parts = 0;
-
-                        /* PartArrayCnt placeholder */
-                        part_cnt_pos = rd_kafka_buf_write_i32(req, 0);
-                }
-
-                /* Partition */
-                rd_kafka_buf_write_i32(req, part->partition);
-                n_parts++;
-
-                /* Time/Offset */
-                rd_kafka_buf_write_i64(req, part->offset);
-
-                /* MaxNumberOfOffsets (v0 only) */
-                if (api_version == 0)
-                        rd_kafka_buf_write_i32(req, 1);
-        }
-
-        /* Patch the last topic's partition count and the topic count. */
-        if (part_cnt_pos > 0) {
-                rd_kafka_buf_update_i32(req, part_cnt_pos, n_parts);
-                rd_kafka_buf_update_i32(req, topic_cnt_pos, n_topics);
-        }
-
-        rd_kafka_buf_ApiVersion_set(req, api_version,
-                                    api_version == 1 ?
-                                    RD_KAFKA_FEATURE_OFFSET_TIME : 0);
-
-        rd_rkb_dbg(rkb, TOPIC, "OFFSET",
-                   "OffsetRequest (v%hd, opv %d) "
-                   "for %"PRId32" topic(s) and %"PRId32" partition(s)",
-                   api_version, req->rkbuf_replyq.version,
-                   n_topics, partitions->cnt);
-
-        rd_kafka_broker_buf_enq_replyq(rkb, req, replyq, resp_cb, opaque);
-}
-
-
-/**
- * Generic handler for OffsetFetch responses.
- * Offsets for included partitions will be propagated through the passed
- * 'offsets' list.
- *
- * \p update_toppar: update toppar's committed_offset
- *
- * Flow:
- *  - If \p err is set on entry, parsing is skipped and the function goes
- *    directly to the `err` label.
- *  - Otherwise all entries in \p offsets are first reset to
- *    RD_KAFKA_OFFSET_INVALID, then each partition in the response is looked
- *    up in \p offsets (partitions not in the list are logged and ignored)
- *    and its offset, error code and metadata are stored on the list entry.
- *    A broker offset of -1 is stored as RD_KAFKA_OFFSET_INVALID.
- *  - The `err` label is reached on success as well as on failure: it logs
- *    the outcome and evaluates rd_kafka_err_action(). On a REFRESH action
- *    a coordinator query op is sent to the cgrp and, if \p request is
- *    non-NULL, the request is retried.
- *
- * The `err_parse` label is not referenced explicitly here; presumably the
- * rd_kafka_buf_read_..() macros jump to it on decode failure - TODO confirm.
- *
- * The \p rk parameter is not used by this function; rkb->rkb_rk is used
- * instead.
- *
- * NOTE(review): when a retry is scheduled in the REFRESH branch the function
- * still returns \p err rather than an "in progress" indication - confirm
- * that callers do not release state that the retried request relies on.
- *
- * NOTE(review): the debug log at `err` guards against a NULL \p offsets, but
- * the parse path passes \p offsets on without a check - presumably callers
- * always supply a list when \p err is not set; verify against callers.
- *
- * @returns \p err as passed in, or rkbuf->rkbuf_err if parsing failed.
- *          Per-partition errors are only reported through \p offsets.
- */
-rd_kafka_resp_err_t
-rd_kafka_handle_OffsetFetch (rd_kafka_t *rk,
- rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_buf_t *request,
- rd_kafka_topic_partition_list_t *offsets,
- int update_toppar) {
- const int log_decode_errors = LOG_ERR;
- int32_t TopicArrayCnt;
- int64_t offset = RD_KAFKA_OFFSET_INVALID;
- rd_kafkap_str_t metadata;
- int i;
- int actions;
- int seen_cnt = 0;
-
- if (err)
- goto err;
-
- /* Set default offset for all partitions, so that partitions missing
- * from the response end up with RD_KAFKA_OFFSET_INVALID. */
- rd_kafka_topic_partition_list_set_offsets(rkb->rkb_rk, offsets, 0,
- RD_KAFKA_OFFSET_INVALID,
- 0 /* !is commit */);
-
- rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
- for (i = 0 ; i < TopicArrayCnt ; i++) {
- rd_kafkap_str_t topic;
- int32_t PartArrayCnt;
- char *topic_name;
- int j;
-
- rd_kafka_buf_read_str(rkbuf, &topic);
- rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt);
-
- RD_KAFKAP_STR_DUPA(&topic_name, &topic);
-
- for (j = 0 ; j < PartArrayCnt ; j++) {
- int32_t partition;
- shptr_rd_kafka_toppar_t *s_rktp;
- rd_kafka_topic_partition_t *rktpar;
- int16_t err2;
-
- rd_kafka_buf_read_i32(rkbuf, &partition);
- rd_kafka_buf_read_i64(rkbuf, &offset);
- rd_kafka_buf_read_str(rkbuf, &metadata);
- rd_kafka_buf_read_i16(rkbuf, &err2);
-
- rktpar = rd_kafka_topic_partition_list_find(offsets,
- topic_name,
- partition);
- if (!rktpar) {
- rd_rkb_dbg(rkb, TOPIC, "OFFSETFETCH",
- "OffsetFetchResponse: %s [%"PRId32"] "
- "not found in local list: ignoring",
- topic_name, partition);
- continue;
- }
-
- seen_cnt++;
-
- if (!(s_rktp = rktpar->_private)) {
- s_rktp = rd_kafka_toppar_get2(rkb->rkb_rk,
- topic_name,
- partition, 0, 0);
- /* May be NULL if topic is not locally known.
- * The result is cached on the list entry either way. */
- rktpar->_private = s_rktp;
- }
-
- /* broker reports invalid offset as -1 */
- if (offset == -1)
- rktpar->offset = RD_KAFKA_OFFSET_INVALID;
- else
- rktpar->offset = offset;
- rktpar->err = err2;
-
- rd_rkb_dbg(rkb, TOPIC, "OFFSETFETCH",
- "OffsetFetchResponse: %s [%"PRId32"] offset %"PRId64,
- topic_name, partition, offset);
-
- if (update_toppar && !err2 && s_rktp) {
- rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
- /* Update toppar's committed offset (under the toppar
- * lock); skipped when the partition reported an error
- * or the toppar could not be resolved. */
- rd_kafka_toppar_lock(rktp);
- rktp->rktp_committed_offset = rktpar->offset;
- rd_kafka_toppar_unlock(rktp);
- }
-
-
- if (rktpar->metadata)
- rd_free(rktpar->metadata);
-
- if (RD_KAFKAP_STR_IS_NULL(&metadata)) {
- rktpar->metadata = NULL;
- rktpar->metadata_size = 0;
- } else {
- rktpar->metadata = RD_KAFKAP_STR_DUP(&metadata);
- rktpar->metadata_size =
- RD_KAFKAP_STR_LEN(&metadata);
- }
- }
- }
-
-
-err:
- rd_rkb_dbg(rkb, TOPIC, "OFFFETCH",
- "OffsetFetch for %d/%d partition(s) returned %s",
- seen_cnt,
- offsets ? offsets->cnt : -1, rd_kafka_err2str(err));
-
- actions = rd_kafka_err_action(rkb, err, rkbuf, request,
- RD_KAFKA_ERR_ACTION_END);
-
- if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
- /* Re-query for coordinator */
- rd_kafka_cgrp_op(rkb->rkb_rk->rk_cgrp, NULL,
- RD_KAFKA_NO_REPLYQ,
- RD_KAFKA_OP_COORD_QUERY, err);
- if (request) {
- /* Schedule a retry. A reference is taken on the request
- * with rd_kafka_buf_keep() before it is handed to the
- * retry function. */
- rd_kafka_buf_keep(request);
- rd_kafka_broker_buf_retry(request->rkbuf_rkb, request);
- }
- }
-
- return err;
-
- err_parse:
- err = rkbuf->rkbuf_err;
- goto err;
-}
-
-
-
-/**
- * @brief Response callback wrapping rd_kafka_handle_OffsetFetch() where
- *        \p opaque is the originating RD_KAFKA_OP_OFFSET_FETCH op (rko).
- *
- * A copy of the op's partition list is filled in with the fetched offsets
- * and sent back on the op's replyq in a new
- * RD_KAFKA_OP_OFFSET_FETCH|RD_KAFKA_OP_REPLY op. The originating op is
- * destroyed before returning, on every path.
- *
- * On RD_KAFKA_RESP_ERR__DESTROY (termination) no reply is sent.
- *
- * Locality: cgrp's broker thread
- */
-void rd_kafka_op_handle_OffsetFetch (rd_kafka_t *rk,
-                                     rd_kafka_broker_t *rkb,
-                                     rd_kafka_resp_err_t err,
-                                     rd_kafka_buf_t *rkbuf,
-                                     rd_kafka_buf_t *request,
-                                     void *opaque) {
-        rd_kafka_op_t *rko = opaque;
-        rd_kafka_op_t *reply;
-        rd_kafka_topic_partition_list_t *fetched;
-
-        RD_KAFKA_OP_TYPE_ASSERT(rko, RD_KAFKA_OP_OFFSET_FETCH);
-
-        /* Terminating: just clean up, nobody is waiting for a reply. */
-        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
-                rd_kafka_op_destroy(rko);
-                return;
-        }
-
-        /* Work on a copy of the requested partitions; the reply op takes
-         * the copy along (do_free is set). */
-        fetched = rd_kafka_topic_partition_list_copy(
-                rko->rko_u.offset_fetch.partitions);
-
-        reply = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH|RD_KAFKA_OP_REPLY);
-        reply->rko_err = err;
-        reply->rko_u.offset_fetch.partitions = fetched;
-        reply->rko_u.offset_fetch.do_free = 1;
-        if (rko->rko_rktp) {
-                reply->rko_rktp = rd_kafka_toppar_keep(
-                        rd_kafka_toppar_s2i(rko->rko_rktp));
-        }
-
-        /* Without a response buffer no request was sent (all partitions
-         * already had usable offsets) and the list is returned as copied. */
-        if (rkbuf) {
-                rd_kafka_handle_OffsetFetch(rkb->rkb_rk, rkb, err, rkbuf,
-                                            request, fetched, 0);
-        }
-
-        rd_kafka_replyq_enq(&rko->rko_replyq, reply, 0);
-
-        rd_kafka_op_destroy(rko);
-}
-
-
-
-
-
-
-/**
- * @brief Build an OffsetFetchRequest for the partitions in \p parts that do
- *        not yet have a usable offset and enqueue it on \p rkb.
- *
- * A partition is included only if its offset is RD_KAFKA_OFFSET_INVALID or
- * RD_KAFKA_OFFSET_STORED; all others are skipped. \p parts is sorted by
- * topic in place.
- *
- * If no partition needs fetching nothing is sent to the broker: the
- * response callback is invoked right away through rd_kafka_buf_callback()
- * with no error and a NULL response buffer.
- */
-void rd_kafka_OffsetFetchRequest (rd_kafka_broker_t *rkb,
-                                  int16_t api_version,
-                                  rd_kafka_topic_partition_list_t *parts,
-                                  rd_kafka_replyq_t replyq,
-                                  rd_kafka_resp_cb_t *resp_cb,
-                                  void *opaque) {
-        rd_kafka_buf_t *req;
-        size_t topic_cnt_pos;        /* buffer offset of TopicArrayCnt */
-        ssize_t part_cnt_pos = -1;   /* buffer offset of current PartitionCnt */
-        const char *cur_topic = NULL;
-        int n_topics = 0;
-        int n_parts = 0;             /* partitions written for cur_topic */
-        int n_parts_total = 0;
-        int idx;
-
-        req = rd_kafka_buf_new_request(
-                rkb, RD_KAFKAP_OffsetFetch, 1,
-                RD_KAFKAP_STR_SIZE(rkb->rkb_rk->rk_group_id) +
-                4 +
-                (parts->cnt * 32));
-
-        /* ConsumerGroup */
-        rd_kafka_buf_write_kstr(req, rkb->rkb_rk->rk_group_id);
-
-        /* Group partitions of the same topic together */
-        rd_kafka_topic_partition_list_sort_by_topic(parts);
-
-        /* TopicArrayCnt placeholder */
-        topic_cnt_pos = rd_kafka_buf_write_i32(req, 0);
-
-        for (idx = 0 ; idx < parts->cnt ; idx++) {
-                rd_kafka_topic_partition_t *part = &parts->elems[idx];
-
-                /* Partitions that already have a usable offset are left
-                 * out of the request. */
-                if (part->offset != RD_KAFKA_OFFSET_INVALID &&
-                    part->offset != RD_KAFKA_OFFSET_STORED) {
-                        rd_rkb_dbg(rkb, TOPIC, "OFFSET",
-                                   "OffsetFetchRequest: skipping %s [%"PRId32"] "
-                                   "with valid offset %s",
-                                   part->topic, part->partition,
-                                   rd_kafka_offset2str(part->offset));
-                        continue;
-                }
-
-                if (cur_topic == NULL || strcmp(cur_topic, part->topic)) {
-                        /* Topic changed: close the previous PartitionCnt */
-                        if (n_parts > 0)
-                                rd_kafka_buf_update_u32(req, part_cnt_pos,
-                                                        n_parts);
-
-                        /* TopicName */
-                        rd_kafka_buf_write_str(req, part->topic, -1);
-                        /* PartitionCnt placeholder */
-                        part_cnt_pos = rd_kafka_buf_write_i32(req, 0);
-                        n_parts = 0;
-                        cur_topic = part->topic;
-                        n_topics++;
-                }
-
-                /* Partition */
-                rd_kafka_buf_write_i32(req, part->partition);
-                n_parts++;
-                n_parts_total++;
-        }
-
-        /* Close the last topic's PartitionCnt */
-        if (n_parts > 0)
-                rd_kafka_buf_update_u32(req, part_cnt_pos, n_parts);
-
-        /* Patch in TopicCnt */
-        rd_kafka_buf_update_u32(req, topic_cnt_pos, n_topics);
-
-        rd_kafka_buf_ApiVersion_set(req, api_version, 0);
-
-        rd_rkb_dbg(rkb, TOPIC, "OFFSET",
-                   "OffsetFetchRequest(v%d) for %d/%d partition(s)",
-                   api_version, n_parts_total, parts->cnt);
-
-        if (n_parts_total > 0) {
-                rd_kafka_broker_buf_enq_replyq(rkb, req, replyq, resp_cb,
-                                               opaque);
-                return;
-        }
-
-        /* Nothing to fetch: deliver an empty response to the callback
-         * right away instead of sending the request. */
-        req->rkbuf_replyq = replyq;
-        req->rkbuf_cb = resp_cb;
-        req->rkbuf_opaque = opaque;
-        rd_kafka_buf_callback(rkb->rkb_rk, rkb, 0, NULL, req);
-}
-
-
-/**
- * @brief Parses an OffsetCommit response and stores the per-partition error
- *        codes on the matching entries of \p offsets.
- *
- * Flow:
- *  - If \p err is set on entry parsing is skipped and the error is mapped
- *    to actions at the `err` label.
- *  - Otherwise each partition in the response is looked up in \p offsets;
- *    unknown partitions are ignored, known ones get their `err` field set.
- *  - After a successful parse the function jumps to `done`, i.e. the action
- *    handling at `err` (coordinator re-query, retry) is NOT evaluated for
- *    per-partition errors, not even when every partition failed.
- *  - At `err`: a REFRESH action (only acted upon if rk->rk_cgrp is set)
- *    marks the coordinator dead when combined with SPECIAL, else re-queries
- *    the coordinator. A RETRY action attempts rd_kafka_buf_retry().
- *
- * The `err_parse` label is not referenced explicitly here; presumably the
- * rd_kafka_buf_read_..() macros jump to it on decode failure - TODO confirm.
- *
- * @remark \p offsets may be NULL if \p err is set
- *
- * @returns RD_KAFKA_RESP_ERR__IN_PROGRESS if rd_kafka_buf_retry() returned
- *          non-zero; the last per-partition error code if the number of
- *          failed partitions equals offsets->cnt; otherwise \p err (or
- *          rkbuf->rkbuf_err after a parse failure).
- */
-rd_kafka_resp_err_t
-rd_kafka_handle_OffsetCommit (rd_kafka_t *rk,
- rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_buf_t *request,
- rd_kafka_topic_partition_list_t *offsets) {
- const int log_decode_errors = LOG_ERR;
- int32_t TopicArrayCnt;
- int16_t ErrorCode = 0, last_ErrorCode = 0;
- int errcnt = 0;
- int i;
- int actions;
-
- if (err)
- goto err;
-
- rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
- for (i = 0 ; i < TopicArrayCnt ; i++) {
- rd_kafkap_str_t topic;
- char *topic_str;
- int32_t PartArrayCnt;
- int j;
-
- rd_kafka_buf_read_str(rkbuf, &topic);
- rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt);
-
- RD_KAFKAP_STR_DUPA(&topic_str, &topic);
-
- for (j = 0 ; j < PartArrayCnt ; j++) {
- int32_t partition;
- rd_kafka_topic_partition_t *rktpar;
-
- rd_kafka_buf_read_i32(rkbuf, &partition);
- rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
-
- rktpar = rd_kafka_topic_partition_list_find(
- offsets, topic_str, partition);
-
- if (!rktpar) {
- /* Received offset for topic/partition we didn't
- * ask for, this shouldn't really happen.
- * Such entries are not counted in errcnt. */
- continue;
- }
-
- rktpar->err = ErrorCode;
- if (ErrorCode) {
- last_ErrorCode = ErrorCode;
- errcnt++;
- }
- }
- }
-
- /* If all partitions failed use error code
- * from last partition as the global error.
- * Note that with an empty \p offsets list (cnt == 0) the comparison
- * also holds and err is set to last_ErrorCode, which is then still 0. */
- if (offsets && errcnt == offsets->cnt)
- err = last_ErrorCode;
- goto done;
-
- err_parse:
- err = rkbuf->rkbuf_err;
-
- err:
- actions = rd_kafka_err_action(
- rkb, err, rkbuf, request,
-
- RD_KAFKA_ERR_ACTION_PERMANENT,
- RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE,
-
- RD_KAFKA_ERR_ACTION_RETRY,
- RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS,
-
- RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_SPECIAL,
- RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE,
-
- RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_SPECIAL,
- RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP,
-
- RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY,
- RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
-
- RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY,
- RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,
-
- RD_KAFKA_ERR_ACTION_RETRY,
- RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
-
- RD_KAFKA_ERR_ACTION_PERMANENT,
- RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE,
-
- RD_KAFKA_ERR_ACTION_PERMANENT,
- RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
-
- RD_KAFKA_ERR_ACTION_PERMANENT,
- RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
-
- RD_KAFKA_ERR_ACTION_END);
-
- if (actions & RD_KAFKA_ERR_ACTION_REFRESH && rk->rk_cgrp) {
- /* Mark coordinator dead or re-query for coordinator.
- * ..dead() will trigger a re-query. */
- if (actions & RD_KAFKA_ERR_ACTION_SPECIAL)
- rd_kafka_cgrp_coord_dead(rk->rk_cgrp, err,
- "OffsetCommitRequest failed");
- else
- rd_kafka_cgrp_coord_query(rk->rk_cgrp,
- "OffsetCommitRequest failed");
- }
- if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
- if (rd_kafka_buf_retry(rkb, request))
- return RD_KAFKA_RESP_ERR__IN_PROGRESS;
- /* FALLTHRU: rd_kafka_buf_retry() returned 0 (presumably no
- * retry was scheduled), so report err to the caller. */
- }
-
- done:
- return err;
-}
-
-
-
-
-/**
- * @brief Build an OffsetCommitRequest for the partitions in \p offsets that
- *        have a non-negative offset and enqueue it on \p rkb.
- *
- * \p offsets must not be NULL and is sorted by topic in place.
- * For \p api_version >= 1 the group generation id and member id of \p rkcg
- * are included; v2 additionally carries a RetentionTime of -1 and v1 a
- * per-partition timestamp of -1.
- *
- * If no partition qualifies, nothing is sent: \p replyq and the request
- * buffer are destroyed.
- *
- * @returns 0 if none of the partitions in \p offsets had valid offsets,
- *          else 1.
- */
-int rd_kafka_OffsetCommitRequest (rd_kafka_broker_t *rkb,
-                                  rd_kafka_cgrp_t *rkcg,
-                                  int16_t api_version,
-                                  rd_kafka_topic_partition_list_t *offsets,
-                                  rd_kafka_replyq_t replyq,
-                                  rd_kafka_resp_cb_t *resp_cb,
-                                  void *opaque, const char *reason) {
-        rd_kafka_buf_t *req;
-        ssize_t topic_cnt_pos = -1;  /* buffer offset of TopicArrayCnt */
-        ssize_t part_cnt_pos = -1;   /* buffer offset of current PartitionCnt */
-        const char *cur_topic = NULL;
-        int n_topics = 0;
-        int n_parts = 0;             /* partitions written for cur_topic */
-        int n_parts_total = 0;
-        int idx;
-
-        rd_kafka_assert(NULL, offsets != NULL);
-
-        req = rd_kafka_buf_new_request(rkb, RD_KAFKAP_OffsetCommit,
-                                       1, 100 + (offsets->cnt * 128));
-
-        /* ConsumerGroup */
-        rd_kafka_buf_write_kstr(req, rkcg->rkcg_group_id);
-
-        if (api_version >= 1) {
-                /* v1,v2: ConsumerGroupGenerationId */
-                rd_kafka_buf_write_i32(req, rkcg->rkcg_generation_id);
-                /* v1,v2: ConsumerId */
-                rd_kafka_buf_write_kstr(req, rkcg->rkcg_member_id);
-                /* v2: RetentionTime */
-                if (api_version == 2)
-                        rd_kafka_buf_write_i64(req, -1);
-        }
-
-        /* Group partitions of the same topic together */
-        rd_kafka_topic_partition_list_sort_by_topic(offsets);
-
-        /* TopicArrayCnt placeholder, patched once the count is known. */
-        topic_cnt_pos = rd_kafka_buf_write_i32(req, 0);
-
-        for (idx = 0 ; idx < offsets->cnt ; idx++) {
-                rd_kafka_topic_partition_t *part = &offsets->elems[idx];
-
-                /* Negative offsets are not committable. */
-                if (part->offset < 0)
-                        continue;
-
-                if (cur_topic == NULL || strcmp(cur_topic, part->topic)) {
-                        /* Topic changed: close the previous PartitionCnt */
-                        if (n_parts > 0)
-                                rd_kafka_buf_update_u32(req, part_cnt_pos,
-                                                        n_parts);
-
-                        /* TopicName */
-                        rd_kafka_buf_write_str(req, part->topic, -1);
-                        /* PartitionCnt placeholder */
-                        part_cnt_pos = rd_kafka_buf_write_i32(req, 0);
-                        n_parts = 0;
-                        cur_topic = part->topic;
-                        n_topics++;
-                }
-
-                /* Partition */
-                rd_kafka_buf_write_i32(req, part->partition);
-                n_parts++;
-                n_parts_total++;
-
-                /* Offset */
-                rd_kafka_buf_write_i64(req, part->offset);
-
-                /* v1: TimeStamp */
-                if (api_version == 1)
-                        rd_kafka_buf_write_i64(req, -1); /* FIXME: retention time */
-
-                /* Metadata.
-                 * Java client 0.9.0 and broker <0.10.0 can't parse
-                 * Null metadata fields, so an empty string is sent
-                 * in place of Null. */
-                if (part->metadata)
-                        rd_kafka_buf_write_str(req,
-                                               part->metadata,
-                                               part->metadata_size);
-                else
-                        rd_kafka_buf_write_str(req, "", 0);
-        }
-
-        if (n_parts_total == 0) {
-                /* Nothing to commit: drop the reply queue and the buffer. */
-                rd_kafka_replyq_destroy(&replyq);
-                rd_kafka_buf_destroy(req);
-                return 0;
-        }
-
-        /* Close the last topic's PartitionCnt */
-        if (n_parts > 0)
-                rd_kafka_buf_update_u32(req, part_cnt_pos, n_parts);
-
-        /* Patch in TopicCnt */
-        rd_kafka_buf_update_u32(req, topic_cnt_pos, n_topics);
-
-        rd_kafka_buf_ApiVersion_set(req, api_version, 0);
-
-        rd_rkb_dbg(rkb, TOPIC, "OFFSET",
-                   "Enqueue OffsetCommitRequest(v%d, %d/%d partition(s))): %s",
-                   api_version, n_parts_total, offsets->cnt, reason);
-
-        rd_kafka_broker_buf_enq_replyq(rkb, req, replyq, resp_cb, opaque);
-
-        return 1;
-}
-
-
-
-/**
- * @brief Serialize the "consumer" protocol type MemberState of \p rkgm for a
- *        SyncGroupRequest and append it, as Kafka Bytes, to the enveloping
- *        buffer \p env_rkbuf.
- *
- * The state is first built in a temporary buffer (version 0, the member's
- * assignment grouped by topic, then the member's userdata) so that its total
- * length is known before it is written to \p env_rkbuf with an int32 length
- * prefix. The grouping relies on entries of the same topic being adjacent
- * in the assignment list.
- */
-static void rd_kafka_group_MemberState_consumer_write (
-        rd_kafka_buf_t *env_rkbuf,
-        const rd_kafka_group_member_t *rkgm) {
-        rd_kafka_buf_t *state;
-        rd_slice_t payload;
-        const char *cur_topic = NULL;
-        size_t topic_cnt_pos;        /* buffer offset of the topic count */
-        ssize_t part_cnt_pos = -1;   /* buffer offset of current PartCnt */
-        int n_topics = 0;
-        int n_parts = 0;
-        int idx;
-
-        state = rd_kafka_buf_new(1, 100);
-
-        /* Version */
-        rd_kafka_buf_write_i16(state, 0);
-        /* Topic count placeholder */
-        topic_cnt_pos = rd_kafka_buf_write_i32(state, 0);
-
-        for (idx = 0 ; idx < rkgm->rkgm_assignment->cnt ; idx++) {
-                const rd_kafka_topic_partition_t *part =
-                        &rkgm->rkgm_assignment->elems[idx];
-
-                if (!cur_topic || strcmp(cur_topic, part->topic)) {
-                        /* Topic changed: close the previous PartitionCnt */
-                        if (cur_topic)
-                                rd_kafka_buf_update_i32(state, part_cnt_pos,
-                                                        n_parts);
-
-                        rd_kafka_buf_write_str(state, part->topic, -1);
-                        /* PartitionCnt placeholder */
-                        part_cnt_pos = rd_kafka_buf_write_i32(state, 0);
-                        n_parts = 0;
-                        cur_topic = part->topic;
-                        n_topics++;
-                }
-
-                rd_kafka_buf_write_i32(state, part->partition);
-                n_parts++;
-        }
-
-        /* Close the last topic's PartitionCnt, then patch the topic count */
-        if (part_cnt_pos != -1)
-                rd_kafka_buf_update_i32(state, part_cnt_pos, n_parts);
-        rd_kafka_buf_update_i32(state, topic_cnt_pos, n_topics);
-
-        rd_kafka_buf_write_kbytes(state, rkgm->rkgm_userdata);
-
-        /* View the whole temporary buffer as one slice */
-        rd_slice_init_full(&payload, &state->rkbuf_buf);
-
-        /* Kafka Bytes: int32 length followed by the payload */
-        rd_kafka_buf_write_i32(env_rkbuf, (int32_t)rd_slice_remains(&payload));
-        rd_buf_write_slice(&env_rkbuf->rkbuf_buf, &payload);
-
-        rd_kafka_buf_destroy(state);
-}
-
-/**
- * @brief Build a SyncGroupRequest and enqueue it on \p rkb.
- *
- * The request carries the group id, generation id and member id followed
- * by \p assignment_cnt group assignments, each written as the member's id
- * and its serialized "consumer" MemberState.
- *
- * The request is flagged as blocking and its timeout is set to the
- * configured group session timeout plus a 3 second grace period.
- */
-void rd_kafka_SyncGroupRequest (rd_kafka_broker_t *rkb,
-                                const rd_kafkap_str_t *group_id,
-                                int32_t generation_id,
-                                const rd_kafkap_str_t *member_id,
-                                const rd_kafka_group_member_t
-                                *assignments,
-                                int assignment_cnt,
-                                rd_kafka_replyq_t replyq,
-                                rd_kafka_resp_cb_t *resp_cb,
-                                void *opaque) {
-        rd_kafka_buf_t *req;
-        int idx;
-
-        req = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SyncGroup,
-                                       1,
-                                       RD_KAFKAP_STR_SIZE(group_id) +
-                                       4 /* GenerationId */ +
-                                       RD_KAFKAP_STR_SIZE(member_id) +
-                                       4 /* array size group_assignment */ +
-                                       (assignment_cnt * 100/*guess*/));
-
-        /* GroupId, GenerationId, MemberId */
-        rd_kafka_buf_write_kstr(req, group_id);
-        rd_kafka_buf_write_i32(req, generation_id);
-        rd_kafka_buf_write_kstr(req, member_id);
-
-        /* GroupAssignment array */
-        rd_kafka_buf_write_i32(req, assignment_cnt);
-        for (idx = 0 ; idx < assignment_cnt ; idx++) {
-                rd_kafka_buf_write_kstr(req, assignments[idx].rkgm_member_id);
-                rd_kafka_group_MemberState_consumer_write(req,
-                                                          &assignments[idx]);
-        }
-
-        /* This is a blocking request */
-        req->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING;
-        req->rkbuf_ts_timeout = rd_clock() +
-                (rkb->rkb_rk->rk_conf.group_session_timeout_ms * 1000) +
-                (3*1000*1000/* 3s grace period*/);
-
-        rd_kafka_broker_buf_enq_replyq(rkb, req, replyq, resp_cb, opaque);
-}
-
-/**
- * Handler for SyncGroup responses
- * opaque must be the cgrp handle.
- *
- * Flow:
- *  - If the cgrp is no longer in join-state WAIT_SYNC the response is
- *    considered outdated and discarded; this check happens before \p err
- *    is looked at.
- *  - If \p err is set it is used as ErrorCode and parsing is skipped,
- *    otherwise ErrorCode and the MemberState bytes are read from \p rkbuf.
- *  - The `err` label is reached on success as well as on failure. A REFRESH
- *    action sends a coordinator query op to the cgrp; unlike some of the
- *    other handlers in this file no retry of \p request is attempted here.
- *  - Unless ErrorCode is RD_KAFKA_RESP_ERR__DESTROY (termination) the
- *    result is passed on to rd_kafka_cgrp_handle_SyncGroup(). MemberState
- *    is zero-initialized, so that is what is passed on when parsing was
- *    skipped.
- *
- * The `err_parse` label is not referenced explicitly here; presumably the
- * rd_kafka_buf_read_..() macros jump to it on decode failure - TODO confirm.
- *
- * The \p rk parameter is not used by this function; rkb->rkb_rk is used
- * instead.
- */
-void rd_kafka_handle_SyncGroup (rd_kafka_t *rk,
- rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_buf_t *request,
- void *opaque) {
- rd_kafka_cgrp_t *rkcg = opaque;
- const int log_decode_errors = LOG_ERR;
- int16_t ErrorCode = 0;
- rd_kafkap_bytes_t MemberState = RD_ZERO_INIT;
- int actions;
-
- if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC) {
- rd_kafka_dbg(rkb->rkb_rk, CGRP, "SYNCGROUP",
- "SyncGroup response: discarding outdated request "
- "(now in join-state %s)",
- rd_kafka_cgrp_join_state_names[rkcg->
- rkcg_join_state]);
- return;
- }
-
- if (err) {
- ErrorCode = err;
- goto err;
- }
-
- rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
- rd_kafka_buf_read_bytes(rkbuf, &MemberState);
-
-err:
- actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request,
- RD_KAFKA_ERR_ACTION_END);
-
- if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
- /* Re-query for coordinator */
- rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
- RD_KAFKA_OP_COORD_QUERY,
- ErrorCode);
- /* FALLTHRU: the error is still passed on to the cgrp below. */
- }
-
- rd_kafka_dbg(rkb->rkb_rk, CGRP, "SYNCGROUP",
- "SyncGroup response: %s (%d bytes of MemberState data)",
- rd_kafka_err2str(ErrorCode),
- RD_KAFKAP_BYTES_LEN(&MemberState));
-
- if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY)
- return; /* Termination */
-
- rd_kafka_cgrp_handle_SyncGroup(rkcg, rkb, ErrorCode, &MemberState);
-
- return;
-
- err_parse:
- ErrorCode = rkbuf->rkbuf_err;
- goto err;
-}
-
-
-/**
- * @brief Build a JoinGroupRequest and enqueue it on \p rkb.
- *
- * The request carries the group id, the configured session timeout, the
- * member id and protocol type, followed by one GroupProtocol entry
- * (protocol name + member metadata) for each enabled partition assignor.
- * The metadata is obtained from the assignor's rkas_get_metadata_cb for
- * \p topics and destroyed once written.
- *
- * The request is flagged as blocking and its timeout is set to the
- * configured group session timeout plus a 3 second grace period.
- */
-void rd_kafka_JoinGroupRequest (rd_kafka_broker_t *rkb,
-                                const rd_kafkap_str_t *group_id,
-                                const rd_kafkap_str_t *member_id,
-                                const rd_kafkap_str_t *protocol_type,
-                                const rd_list_t *topics,
-                                rd_kafka_replyq_t replyq,
-                                rd_kafka_resp_cb_t *resp_cb,
-                                void *opaque) {
-        rd_kafka_t *rk = rkb->rkb_rk;
-        rd_kafka_buf_t *req;
-        rd_kafka_assignor_t *assignor;
-        int idx;
-
-        req = rd_kafka_buf_new_request(rkb, RD_KAFKAP_JoinGroup,
-                                       1,
-                                       RD_KAFKAP_STR_SIZE(group_id) +
-                                       4 /* sessionTimeoutMs */ +
-                                       RD_KAFKAP_STR_SIZE(member_id) +
-                                       RD_KAFKAP_STR_SIZE(protocol_type) +
-                                       4 /* array count GroupProtocols */ +
-                                       (rd_list_cnt(topics) * 100));
-
-        /* GroupId, SessionTimeout, MemberId, ProtocolType */
-        rd_kafka_buf_write_kstr(req, group_id);
-        rd_kafka_buf_write_i32(req, rk->rk_conf.group_session_timeout_ms);
-        rd_kafka_buf_write_kstr(req, member_id);
-        rd_kafka_buf_write_kstr(req, protocol_type);
-
-        /* GroupProtocols array: only enabled assignors are written, which
-         * is what the enabled_assignor_cnt written here refers to. */
-        rd_kafka_buf_write_i32(req, rk->rk_conf.enabled_assignor_cnt);
-
-        RD_LIST_FOREACH(assignor, &rk->rk_conf.partition_assignors, idx) {
-                rd_kafkap_bytes_t *metadata;
-
-                if (assignor->rkas_enabled) {
-                        rd_kafka_buf_write_kstr(req,
-                                                assignor->rkas_protocol_name);
-                        metadata = assignor->rkas_get_metadata_cb(assignor,
-                                                                  topics);
-                        rd_kafka_buf_write_kbytes(req, metadata);
-                        rd_kafkap_bytes_destroy(metadata);
-                }
-        }
-
-        /* This is a blocking request */
-        req->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING;
-        req->rkbuf_ts_timeout = rd_clock() +
-                (rk->rk_conf.group_session_timeout_ms * 1000) +
-                (3*1000*1000/* 3s grace period*/);
-
-        rd_kafka_broker_buf_enq_replyq(rkb, req, replyq, resp_cb, opaque);
-}
-
-
-
-
-
-
-/**
- * @brief Build a LeaveGroupRequest (group id followed by member id) and
- *        enqueue it on \p rkb.
- *
- * \p replyq, \p resp_cb and \p opaque are handed unchanged to
- * rd_kafka_broker_buf_enq_replyq().
- */
-void rd_kafka_LeaveGroupRequest (rd_kafka_broker_t *rkb,
-                                 const rd_kafkap_str_t *group_id,
-                                 const rd_kafkap_str_t *member_id,
-                                 rd_kafka_replyq_t replyq,
-                                 rd_kafka_resp_cb_t *resp_cb,
-                                 void *opaque) {
-        rd_kafka_buf_t *req = rd_kafka_buf_new_request(
-                rkb, RD_KAFKAP_LeaveGroup,
-                1,
-                RD_KAFKAP_STR_SIZE(group_id) +
-                RD_KAFKAP_STR_SIZE(member_id));
-
-        /* GroupId */
-        rd_kafka_buf_write_kstr(req, group_id);
-        /* MemberId */
-        rd_kafka_buf_write_kstr(req, member_id);
-
-        rd_kafka_broker_buf_enq_replyq(rkb, req, replyq, resp_cb, opaque);
-}
-
-
-/**
- * @brief Handler for LeaveGroup responses.
- *
- * \p opaque must be the cgrp handle.
- *
- * If \p err is set it is used as the error code and \p rkbuf is not read;
- * otherwise the response's ErrorCode is read from \p rkbuf.
- * The error is then mapped to actions with rd_kafka_err_action():
- *  - REFRESH: a coordinator query op is sent to the cgrp and the request is
- *    retried, after which the handler returns.
- *  - otherwise a non-zero error is only logged.
- *
- * The `err_parse` label is presumably the jump target of the
- * rd_kafka_buf_read_..() macros on decode failure (TODO confirm); it must
- * only be reachable through such a jump, which is why the normal path
- * returns explicitly before it.
- *
- * The \p rk parameter is not used by this function; rkb->rkb_rk is used
- * instead.
- */
-void rd_kafka_handle_LeaveGroup (rd_kafka_t *rk,
-                                 rd_kafka_broker_t *rkb,
-                                 rd_kafka_resp_err_t err,
-                                 rd_kafka_buf_t *rkbuf,
-                                 rd_kafka_buf_t *request,
-                                 void *opaque) {
-        rd_kafka_cgrp_t *rkcg = opaque;
-        const int log_decode_errors = LOG_ERR;
-        int16_t ErrorCode = 0;
-        int actions;
-
-        if (err) {
-                ErrorCode = err;
-                goto err;
-        }
-
-        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
-
-
-err:
-        actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request,
-                                      RD_KAFKA_ERR_ACTION_END);
-
-        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
-                /* Re-query for coordinator */
-                rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
-                                 RD_KAFKA_OP_COORD_QUERY, ErrorCode);
-                /* Schedule a retry */
-                rd_kafka_buf_keep(request);
-                rd_kafka_broker_buf_retry(request->rkbuf_rkb, request);
-                return;
-        }
-
-        if (ErrorCode)
-                rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP",
-                             "LeaveGroup response: %s",
-                             rd_kafka_err2str(ErrorCode));
-
-        /* Done. Without this return, control would fall through into
-         * err_parse, read rkbuf->rkbuf_err (rkbuf is not checked for NULL)
-         * and jump back to err, repeating the error handling over and over. */
-        return;
-
- err_parse:
-        ErrorCode = rkbuf->rkbuf_err;
-        goto err;
-}
-
-
-
-
-
-
-/**
- * @brief Build a HeartbeatRequest (group id, generation id, member id) and
- *        enqueue it on \p rkb.
- *
- * The request timeout is set to the configured group session timeout.
- */
-void rd_kafka_HeartbeatRequest (rd_kafka_broker_t *rkb,
-                                const rd_kafkap_str_t *group_id,
-                                int32_t generation_id,
-                                const rd_kafkap_str_t *member_id,
-                                rd_kafka_replyq_t replyq,
-                                rd_kafka_resp_cb_t *resp_cb,
-                                void *opaque) {
-        rd_kafka_buf_t *req;
-
-        rd_rkb_dbg(rkb, CGRP, "HEARTBEAT",
-                   "Heartbeat for group \"%s\" generation id %"PRId32,
-                   group_id->str, generation_id);
-
-        req = rd_kafka_buf_new_request(rkb, RD_KAFKAP_Heartbeat,
-                                       1,
-                                       RD_KAFKAP_STR_SIZE(group_id) +
-                                       4 /* GenerationId */ +
-                                       RD_KAFKAP_STR_SIZE(member_id));
-
-        /* GroupId */
-        rd_kafka_buf_write_kstr(req, group_id);
-        /* GenerationId */
-        rd_kafka_buf_write_i32(req, generation_id);
-        /* MemberId */
-        rd_kafka_buf_write_kstr(req, member_id);
-
-        /* Time out after one session timeout */
-        req->rkbuf_ts_timeout = rd_clock() +
-                (rkb->rkb_rk->rk_conf.group_session_timeout_ms * 1000);
-
-        rd_kafka_broker_buf_enq_replyq(rkb, req, replyq, resp_cb, opaque);
-}
-
-
-
-
-/**
- * Send ListGroupsRequest
- *
- * The request is created with zero segments and a zero size hint and
- * nothing is written to it before it is enqueued on \p rkb.
- */
-void rd_kafka_ListGroupsRequest (rd_kafka_broker_t *rkb,
-                                 rd_kafka_replyq_t replyq,
-                                 rd_kafka_resp_cb_t *resp_cb,
-                                 void *opaque) {
-        rd_kafka_buf_t *req =
-                rd_kafka_buf_new_request(rkb, RD_KAFKAP_ListGroups, 0, 0);
-
-        rd_kafka_broker_buf_enq_replyq(rkb, req, replyq, resp_cb, opaque);
-}
-
-
-/**
- * Send DescribeGroupsRequest
- *
- * Writes the group count followed by each group name. The names are
- * written from the last element of \p groups down to the first.
- *
- * @param groups     array of \p group_cnt NUL-terminated group names.
- * @param group_cnt  number of entries in \p groups.
- */
-void rd_kafka_DescribeGroupsRequest (rd_kafka_broker_t *rkb,
-                                     const char **groups, int group_cnt,
-                                     rd_kafka_replyq_t replyq,
-                                     rd_kafka_resp_cb_t *resp_cb,
-                                     void *opaque) {
-        rd_kafka_buf_t *req;
-        int idx;
-
-        /* Size hint: 32 bytes per group name. */
-        req = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DescribeGroups,
-                                       1, 32*group_cnt);
-
-        rd_kafka_buf_write_i32(req, group_cnt);
-
-        /* Walk the array backwards, last group first. */
-        for (idx = group_cnt - 1 ; idx >= 0 ; idx--)
-                rd_kafka_buf_write_str(req, groups[idx], -1);
-
-        rd_kafka_broker_buf_enq_replyq(rkb, req, replyq, resp_cb, opaque);
-}
-
-
-
-
-/**
- * @brief Generic handler for Metadata responses
- *
- * Parses the response and, if \p opaque is an rko with a reply queue,
- * forwards the parsed metadata (or the error) to that queue by reusing
- * the rko. Otherwise the parsed metadata is freed and the rko, if any,
- * is destroyed.
- *
- * @locality rdkafka main thread
- */
-static void rd_kafka_handle_Metadata (rd_kafka_t *rk,
-                                      rd_kafka_broker_t *rkb,
-                                      rd_kafka_resp_err_t err,
-                                      rd_kafka_buf_t *rkbuf,
-                                      rd_kafka_buf_t *request,
-                                      void *opaque) {
-        const rd_list_t *topics = request->rkbuf_u.Metadata.topics;
-        struct rd_kafka_metadata *md = NULL;
-        rd_kafka_op_t *rko = opaque; /* May be NULL */
-
-        rd_kafka_assert(NULL, err == RD_KAFKA_RESP_ERR__DESTROY ||
-                        thrd_is_current(rk->rk_thread));
-
-        /* No metadata updates while the client is terminating. */
-        if (rd_kafka_terminating(rkb->rkb_rk))
-                err = RD_KAFKA_RESP_ERR__DESTROY;
-
-        if (err == RD_KAFKA_RESP_ERR__DESTROY)
-                goto done; /* Terminating */
-
-        if (err) {
-                /* FIXME: handle errors */
-                rd_rkb_log(rkb, LOG_WARNING, "METADATA",
-                           "Metadata request failed: %s (%dms)",
-                           rd_kafka_err2str(err),
-                           (int)(request->rkbuf_ts_sent/1000));
-        } else {
-                if (topics)
-                        rd_rkb_dbg(rkb, METADATA, "METADATA",
-                                   "===== Received metadata "
-                                   "(for %d requested topics): %s =====",
-                                   rd_list_cnt(topics),
-                                   request->rkbuf_u.Metadata.reason);
-                else
-                        rd_rkb_dbg(rkb, METADATA, "METADATA",
-                                   "===== Received metadata: %s =====",
-                                   request->rkbuf_u.Metadata.reason);
-
-                md = rd_kafka_parse_Metadata(rkb, request, rkbuf);
-                if (!md) {
-                        /* Parsing failed: retry if possible, else
-                         * report a bad message. */
-                        if (rd_kafka_buf_retry(rkb, request))
-                                return;
-                        err = RD_KAFKA_RESP_ERR__BAD_MSG;
-                }
-        }
-
-        if (rko && rko->rko_replyq.q) {
-                /* Reuse the requesting rko as the reply and pass the
-                 * metadata on; the rko must not be destroyed here. */
-                rko->rko_err = err;
-                rko->rko_u.metadata.md = md;
-
-                rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0);
-                return;
-        }
-
-        /* Nobody to hand the parsed metadata to. */
-        if (md)
-                rd_free(md);
-
- done:
-        if (rko)
-                rd_kafka_op_destroy(rko);
-}
-
-
-
-/**
- * @brief Construct a MetadataRequest and enqueue it on \p rkb.
- *
- * \p topics is a list of topic names (char *) to request.
- *
- *   !topics          - only request brokers (ApiVersion >= 1, else
- *                      all topics)
- *   topics.cnt==0    - all topics in cluster are requested
- *   topics.cnt >0    - only specified topics are requested
- *
- * @param reason - metadata request reason (NULL is treated as "")
- * @param rko    - (optional) rko forwarded to the response handler.
- *
- * Full requests are throttled through in-transit counters:
- *  - a brokers-only request is skipped if one is already in transit,
- *    unless \p rko is given with its metadata.force flag set;
- *  - an all-topics request without an \p rko is skipped if one is
- *    already in transit; with an \p rko it is not counted at all.
- *
- * @returns RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS if the request was
- *          skipped, otherwise RD_KAFKA_RESP_ERR_NO_ERROR.
- */
-rd_kafka_resp_err_t
-rd_kafka_MetadataRequest (rd_kafka_broker_t *rkb,
-                          const rd_list_t *topics, const char *reason,
-                          rd_kafka_op_t *rko) {
-        rd_kafka_buf_t *rkbuf;
-        int16_t ApiVersion;
-        int features;
-        int *full_incr = NULL;
-        int topic_cnt = 0;
-
-        if (topics)
-                topic_cnt = rd_list_cnt(topics);
-
-        ApiVersion = rd_kafka_broker_ApiVersion_supported(rkb,
-                                                          RD_KAFKAP_Metadata,
-                                                          0, 2,
-                                                          &features);
-
-        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_Metadata, 1,
-                                         4 + (50 * topic_cnt));
-
-        if (reason == NULL)
-                reason = "";
-
-        rkbuf->rkbuf_u.Metadata.reason = rd_strdup(reason);
-
-        if (topics || ApiVersion < 1) {
-                if (topic_cnt == 0) {
-                        /* All topics. Only requests without an rko
-                         * take part in the in-transit accounting. */
-                        if (!rko)
-                                full_incr = &rkb->rkb_rk->rk_metadata_cache.
-                                        rkmc_full_topics_sent;
-
-                        /* ApiVersion >= 1: -1 (null array) means all
-                         * topics. Older versions: a count of zero. */
-                        rd_kafka_buf_write_i32(rkbuf,
-                                               ApiVersion >= 1 ? -1 : 0);
-
-                        rkbuf->rkbuf_u.Metadata.all_topics = 1;
-                        rd_rkb_dbg(rkb, METADATA, "METADATA",
-                                   "Request metadata for all topics: "
-                                   "%s", reason);
-                } else {
-                        rd_kafka_buf_write_i32(rkbuf, topic_cnt);
-                        rd_rkb_dbg(rkb, METADATA, "METADATA",
-                                   "Request metadata for %d topic(s): "
-                                   "%s", topic_cnt, reason);
-                }
-        } else {
-                /* Brokers only: an empty (0) array in the protocol
-                 * represents no topics. */
-                rd_kafka_buf_write_i32(rkbuf, 0);
-                rd_rkb_dbg(rkb, METADATA, "METADATA",
-                           "Request metadata for brokers only: %s", reason);
-                full_incr = &rkb->rkb_rk->rk_metadata_cache.
-                        rkmc_full_brokers_sent;
-        }
-
-        if (full_incr) {
-                /* Multiple outstanding full requests are redundant,
-                 * so only forced requests may pile up. */
-                int forced = rko && rko->rko_u.metadata.force;
-
-                mtx_lock(&rkb->rkb_rk->rk_metadata_cache.
-                         rkmc_full_lock);
-                if (*full_incr > 0 && !forced) {
-                        mtx_unlock(&rkb->rkb_rk->rk_metadata_cache.
-                                   rkmc_full_lock);
-                        rd_rkb_dbg(rkb, METADATA, "METADATA",
-                                   "Skipping metadata request: %s: "
-                                   "full request already in-transit",
-                                   reason);
-                        rd_kafka_buf_destroy(rkbuf);
-                        return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
-                }
-
-                (*full_incr)++;
-                mtx_unlock(&rkb->rkb_rk->rk_metadata_cache.
-                           rkmc_full_lock);
-
-                /* Record the counter and its lock on the request
-                 * buffer. */
-                rkbuf->rkbuf_u.Metadata.decr = full_incr;
-                rkbuf->rkbuf_u.Metadata.decr_lock = &rkb->rkb_rk->
-                        rk_metadata_cache.rkmc_full_lock;
-        }
-
-        if (topic_cnt > 0) {
-                char *name;
-                int idx;
-
-                /* Keep a private copy of the topic list so hints can
-                 * be purged from the metadata cache on error. */
-                rkbuf->rkbuf_u.Metadata.topics =
-                        rd_list_copy(topics, rd_list_string_copy, NULL);
-
-                RD_LIST_FOREACH(name, topics, idx)
-                        rd_kafka_buf_write_str(rkbuf, name, -1);
-        }
-
-        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
-        /* Metadata requests belong to the control plane and should go
-         * ahead of other requests (Produce, Fetch, etc). */
-        rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_FLASH;
-
-        /* The response is handled via rk_ops; the parsed result is
-         * forwarded to the rko's replyq by the handler. */
-        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf,
-                                       RD_KAFKA_REPLYQ(rkb->rkb_rk->
-                                                       rk_ops, 0),
-                                       rd_kafka_handle_Metadata, rko);
-
-        return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-
-
-
-
-
-
-
-/**
- * @brief Parses and handles ApiVersion reply.
- *
- * @param apis will be allocated, populated and sorted
- *             with broker's supported APIs. On any error \p *apis is
- *             NULL on return (nothing for the caller to free).
- * @param api_cnt will be set to the number of elements in \p *apis
- *                (only written on success).
- *
- * @returns 0 on success, RD_KAFKA_RESP_ERR__IN_PROGRESS if the request
- *          was re-enqueued for retry, else an error.
- */
-rd_kafka_resp_err_t
-rd_kafka_handle_ApiVersion (rd_kafka_t *rk,
-                            rd_kafka_broker_t *rkb,
-                            rd_kafka_resp_err_t err,
-                            rd_kafka_buf_t *rkbuf,
-                            rd_kafka_buf_t *request,
-                            struct rd_kafka_ApiVersion **apis,
-                            size_t *api_cnt) {
-        /* Not referenced directly: presumably consumed by the
-         * rd_kafka_buf_read_*() macros, as is the err_parse label. */
-        const int log_decode_errors = LOG_ERR;
-        int actions;
-        int32_t ApiArrayCnt;
-        int16_t ErrorCode;
-        int i = 0;
-
-        *apis = NULL;
-
-        if (err)
-                goto err;
-
-        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
-        if ((err = ErrorCode))
-                goto err;
-
-        rd_kafka_buf_read_i32(rkbuf, &ApiArrayCnt);
-        /* The count comes off the wire: a negative value must be
-         * rejected too, otherwise it reaches malloc() and qsort() as a
-         * huge unsigned size. */
-        if (ApiArrayCnt < 0 || ApiArrayCnt > 1000)
-                rd_kafka_buf_parse_fail(rkbuf,
-                                        "ApiArrayCnt %"PRId32" out of range",
-                                        ApiArrayCnt);
-
-        rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
-                   "Broker API support:");
-
-        /* NOTE(review): the allocation result is not checked. */
-        *apis = malloc(sizeof(**apis) * ApiArrayCnt);
-
-        for (i = 0 ; i < ApiArrayCnt ; i++) {
-                struct rd_kafka_ApiVersion *api = &(*apis)[i];
-
-                rd_kafka_buf_read_i16(rkbuf, &api->ApiKey);
-                rd_kafka_buf_read_i16(rkbuf, &api->MinVer);
-                rd_kafka_buf_read_i16(rkbuf, &api->MaxVer);
-
-                rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
-                           " ApiKey %s (%hd) Versions %hd..%hd",
-                           rd_kafka_ApiKey2str(api->ApiKey),
-                           api->ApiKey, api->MinVer, api->MaxVer);
-        }
-
-        *api_cnt = ApiArrayCnt;
-        qsort(*apis, *api_cnt, sizeof(**apis), rd_kafka_ApiVersion_key_cmp);
-
-        goto done;
-
- err_parse:
-        err = rkbuf->rkbuf_err;
- err:
-        if (*apis) {
-                rd_free(*apis);
-                /* Do not hand a dangling pointer back to the caller. */
-                *apis = NULL;
-        }
-
-        actions = rd_kafka_err_action(
-                rkb, err, rkbuf, request,
-                RD_KAFKA_ERR_ACTION_END);
-
-        if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
-                if (rd_kafka_buf_retry(rkb, request))
-                        return RD_KAFKA_RESP_ERR__IN_PROGRESS;
-                /* FALLTHRU */
-        }
-
-done:
-        return err;
-}
-
-
-
-/**
- * Send ApiVersionRequest (KIP-35)
- *
- * @param flash_msg  if non-zero the request is flagged
- *                   RD_KAFKA_OP_F_FLASH.
- *
- * If \p replyq has no queue the request is enqueued directly with
- * rd_kafka_broker_buf_enq1() (caller is in the broker thread).
- */
-void rd_kafka_ApiVersionRequest (rd_kafka_broker_t *rkb,
-                                 rd_kafka_replyq_t replyq,
-                                 rd_kafka_resp_cb_t *resp_cb,
-                                 void *opaque, int flash_msg) {
-        rd_kafka_buf_t *req;
-
-        req = rd_kafka_buf_new_request(rkb, RD_KAFKAP_ApiVersion, 1, 4);
-        if (flash_msg)
-                req->rkbuf_flags |= RD_KAFKA_OP_F_FLASH;
-
-        /* An empty array requests all APIs. */
-        rd_kafka_buf_write_i32(req, 0);
-
-        /* Brokers without ApiVersion support tear down the connection
-         * on an unknown API request, so retrying is pointless. */
-        req->rkbuf_retries = RD_KAFKA_BUF_NO_RETRIES;
-
-        /* 0.9.0.x brokers do not close the connection on unsupported
-         * API requests (a broker-side regression), so use the
-         * dedicated, configurable timeout for this request. */
-        req->rkbuf_ts_timeout = rd_clock() + (rkb->rkb_rk->rk_conf.api_version_request_timeout_ms * 1000);
-
-        if (!replyq.q) {
-                /* in broker thread */
-                rd_kafka_broker_buf_enq1(rkb, req, resp_cb, opaque);
-                return;
-        }
-
-        rd_kafka_broker_buf_enq_replyq(rkb, req, replyq, resp_cb, opaque);
-}
-
-
-/**
- * Send SaslHandshakeRequest (KIP-43)
- *
- * @param mechanism  NUL-terminated SASL mechanism name, written as the
- *                   request body.
- * @param flash_msg  if non-zero the request is flagged
- *                   RD_KAFKA_OP_F_FLASH.
- *
- * If \p replyq has no queue the request is enqueued directly with
- * rd_kafka_broker_buf_enq1() (caller is in the broker thread).
- */
-void rd_kafka_SaslHandshakeRequest (rd_kafka_broker_t *rkb,
-                                    const char *mechanism,
-                                    rd_kafka_replyq_t replyq,
-                                    rd_kafka_resp_cb_t *resp_cb,
-                                    void *opaque, int flash_msg) {
-        int mechlen = (int)strlen(mechanism);
-        rd_kafka_buf_t *req;
-
-        req = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SaslHandshake,
-                                       1, RD_KAFKAP_STR_SIZE0(mechlen));
-        if (flash_msg)
-                req->rkbuf_flags |= RD_KAFKA_OP_F_FLASH;
-        rd_kafka_buf_write_str(req, mechanism, mechlen);
-
-        /* Non-supporting brokers tear down the connection on an unknown
-         * API request, or when the SASL GSSAPI token type is not
-         * recognized, so the request is never retried. */
-        req->rkbuf_retries = RD_KAFKA_BUF_NO_RETRIES;
-
-        /* 0.9.0.x brokers do not close the connection on unsupported
-         * API requests (a broker-side regression): when ApiVersion
-         * requests are disabled, cap the request timeout at 10s. */
-        if (rkb->rkb_rk->rk_conf.socket_timeout_ms > 10*1000 &&
-            !rkb->rkb_rk->rk_conf.api_version_request)
-                req->rkbuf_ts_timeout = rd_clock() + (10 * 1000 * 1000);
-
-        if (!replyq.q) {
-                /* in broker thread */
-                rd_kafka_broker_buf_enq1(rkb, req, resp_cb, opaque);
-                return;
-        }
-
-        rd_kafka_broker_buf_enq_replyq(rkb, req, replyq,
-                                       resp_cb, opaque);
-}
-
-
-
-
-/**
- * @brief Parses a Produce reply.
- *
- * Exactly one topic with exactly one partition is expected in the
- * reply, since each request only produces to a single topic+partition.
- *
- * @param offsetp     set to the offset field read from the reply.
- * @param timestampp  set to -1, then to the reply's timestamp field if
- *                    the request's ApiVersion is >= 2.
- *
- * @returns the partition's error code (0 on success), the buffer's
- *          parse error if decoding failed, or
- *          RD_KAFKA_RESP_ERR__BAD_MSG on an unexpected array count.
- * @locality broker thread
- */
-static rd_kafka_resp_err_t
-rd_kafka_handle_Produce_parse (rd_kafka_broker_t *rkb,
-                               rd_kafka_toppar_t *rktp,
-                               rd_kafka_buf_t *rkbuf,
-                               rd_kafka_buf_t *request,
-                               int64_t *offsetp,
-                               int64_t *timestampp) {
-        const int log_decode_errors = LOG_ERR;
-        int32_t TopicArrayCnt;
-        int32_t PartitionArrayCnt;
-        int32_t Partition;
-        int16_t ErrorCode;
-        int64_t Offset;
-
-        rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
-        if (TopicArrayCnt != 1)
-                return RD_KAFKA_RESP_ERR__BAD_MSG;
-
-        /* The single topic+partition in the reply is assumed to be the
-         * one that was requested; anything else is a broker bug. */
-        rd_kafka_buf_skip_str(rkbuf);
-        rd_kafka_buf_read_i32(rkbuf, &PartitionArrayCnt);
-        if (PartitionArrayCnt != 1)
-                return RD_KAFKA_RESP_ERR__BAD_MSG;
-
-        rd_kafka_buf_read_i32(rkbuf, &Partition);
-        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
-        rd_kafka_buf_read_i64(rkbuf, &Offset);
-
-        *offsetp = Offset;
-        *timestampp = -1;
-
-        /* The timestamp field is only read for ApiVersion >= 2. */
-        if (request->rkbuf_reqhdr.ApiVersion >= 2)
-                rd_kafka_buf_read_i64(rkbuf, timestampp);
-
-        /* The throttle time field is only read for ApiVersion >= 1. */
-        if (request->rkbuf_reqhdr.ApiVersion >= 1) {
-                int32_t Throttle_Time;
-
-                rd_kafka_buf_read_i32(rkbuf, &Throttle_Time);
-                rd_kafka_op_throttle_time(rkb, rkb->rkb_rk->rk_rep,
-                                          Throttle_Time);
-        }
-
-        return ErrorCode;
-
- err_parse:
-        return rkbuf->rkbuf_err;
-}
-
-
-/**
- * @brief Handle ProduceResponse
- *
- * Parses the reply (if any), then either:
- *  - on a REFRESH action: flags the partition's leader as unavailable
- *    and moves the request's messages back to the partition queue,
- *  - on a RETRY action: re-enqueues the request, or
- *  - otherwise: stamps offset/timestamp on the messages and enqueues
- *    them for delivery report with the final error code.
- *
- * @param opaque  shared toppar reference taken in ProduceRequest();
- *                it is released here unless the request is scheduled
- *                for retry.
- *
- * @locality broker thread
- */
-static void rd_kafka_handle_Produce (rd_kafka_t *rk,
-                                     rd_kafka_broker_t *rkb,
-                                     rd_kafka_resp_err_t err,
-                                     rd_kafka_buf_t *reply,
-                                     rd_kafka_buf_t *request,
-                                     void *opaque) {
-        shptr_rd_kafka_toppar_t *s_rktp = opaque; /* from ProduceRequest() */
-        rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
-        int64_t offset = RD_KAFKA_OFFSET_INVALID;
-        int64_t timestamp = -1;
-
-        /* Parse Produce reply (unless the request errored) */
-        if (!err && reply)
-                err = rd_kafka_handle_Produce_parse(rkb, rktp,
-                                                    reply, request,
-                                                    &offset, &timestamp);
-
-
-        if (likely(!err)) {
-                rd_rkb_dbg(rkb, MSG, "MSGSET",
-                           "%s [%"PRId32"]: MessageSet with %i message(s) "
-                           "delivered",
-                           rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
-                           rd_atomic32_get(&request->rkbuf_msgq.rkmq_msg_cnt));
-
-        } else {
-                /* Error */
-                int actions;
-
-                actions = rd_kafka_err_action(
-                        rkb, err, reply, request,
-
-                        RD_KAFKA_ERR_ACTION_REFRESH,
-                        RD_KAFKA_RESP_ERR__TRANSPORT,
-
-                        RD_KAFKA_ERR_ACTION_REFRESH,
-                        RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
-
-                        RD_KAFKA_ERR_ACTION_END);
-
-                rd_rkb_dbg(rkb, MSG, "MSGSET",
-                           "%s [%"PRId32"]: MessageSet with %i message(s) "
-                           "encountered error: %s (actions 0x%x)",
-                           rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
-                           rd_atomic32_get(&request->rkbuf_msgq.rkmq_msg_cnt),
-                           rd_kafka_err2str(err), actions);
-
-                /* NOTE: REFRESH implies a later retry, which does NOT affect
-                 *       the retry count since refresh-errors are considered
-                 *       to be stale metadata rather than temporary errors.
-                 *
-                 *       This is somewhat problematic since it may cause
-                 *       duplicate messages even with retries=0 if the
-                 *       ProduceRequest made it to the broker but only the
-                 *       response was lost due to network connectivity issues.
-                 *       That problem will be sorted when EoS is implemented.
-                 */
-                if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
-                        /* Request metadata information update */
-                        rd_kafka_toppar_leader_unavailable(rktp,
-                                                           "produce", err);
-
-                        /* Move messages (in the rkbuf) back to the partition's
-                         * queue head. They will be resent when a new leader
-                         * is delegated. */
-                        rd_kafka_toppar_insert_msgq(rktp, &request->rkbuf_msgq);
-
-                        /* No need for fallthru here since the request
-                         * no longer has any messages associated with it. */
-                        goto done;
-                }
-
-                /* REFRESH was fully handled above, so only RETRY and
-                 * fatal errors remain from here on. */
-                if ((actions & RD_KAFKA_ERR_ACTION_RETRY) &&
-                    rd_kafka_buf_retry(rkb, request))
-                        return; /* Scheduled for retry */
-
-                /* Translate request-level timeout error code
-                 * to message-level timeout error code. */
-                if (err == RD_KAFKA_RESP_ERR__TIMED_OUT)
-                        err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
-
-                /* Fatal errors: no message transmission retries */
-                /* FALLTHRU */
-        }
-
-        /* Propagate assigned offset and timestamp back to app. */
-        if (likely(offset != RD_KAFKA_OFFSET_INVALID)) {
-                rd_kafka_msg_t *rkm;
-                if (rktp->rktp_rkt->rkt_conf.produce_offset_report) {
-                        /* produce.offset.report: each message */
-                        TAILQ_FOREACH(rkm, &request->rkbuf_msgq.rkmq_msgs,
-                                      rkm_link) {
-                                rkm->rkm_offset = offset++;
-                                if (timestamp != -1) {
-                                        rkm->rkm_timestamp = timestamp;
-                                        rkm->rkm_tstype = RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME;
-                                }
-                        }
-                } else {
-                        /* Last message in each batch */
-                        rkm = TAILQ_LAST(&request->rkbuf_msgq.rkmq_msgs,
-                                         rd_kafka_msg_head_s);
-                        rkm->rkm_offset = offset +
-                                rd_atomic32_get(&request->rkbuf_msgq.
-                                                rkmq_msg_cnt) - 1;
-                        if (timestamp != -1) {
-                                rkm->rkm_timestamp = timestamp;
-                                rkm->rkm_tstype = RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME;
-                        }
-                }
-        }
-
-        /* Enqueue messages for delivery report */
-        rd_kafka_dr_msgq(rktp->rktp_rkt, &request->rkbuf_msgq, err);
-
- done:
-        rd_kafka_toppar_destroy(s_rktp); /* from ProduceRequest() */
-}
-
-
-/**
- * @brief Send ProduceRequest for messages in toppar queue.
- *
- * The request is built by rd_kafka_msgset_create_ProduceRequest(); the
- * partition's tx_msgs and tx_bytes counters are updated with the
- * message count and MessageSet size it reports.
- *
- * @returns the number of messages included, or 0 on error / no messages.
- *
- * @locality broker thread
- */
-int rd_kafka_ProduceRequest (rd_kafka_broker_t *rkb, rd_kafka_toppar_t *rktp) {
-        rd_kafka_itopic_t *rkt = rktp->rktp_rkt;
-        size_t msgset_size = 0;
-        rd_kafka_buf_t *req;
-        int cnt;
-
-        /* Build a ProduceRequest from the toppar's transmit queue. */
-        req = rd_kafka_msgset_create_ProduceRequest(rkb, rktp,
-                                                    &msgset_size);
-        if (unlikely(!req))
-                return 0;
-
-        cnt = rd_atomic32_get(&req->rkbuf_msgq.rkmq_msg_cnt);
-        rd_dassert(cnt > 0);
-
-        rd_atomic64_add(&rktp->rktp_c.tx_msgs, cnt);
-        rd_atomic64_add(&rktp->rktp_c.tx_bytes, msgset_size);
-
-        /* required_acks == 0: flag the request as expecting no
-         * response. */
-        if (rkt->rkt_conf.required_acks == 0)
-                req->rkbuf_flags |= RD_KAFKA_OP_F_NO_RESPONSE;
-
-        /* The request inherits the timeout of its first message. */
-        req->rkbuf_ts_timeout =
-                TAILQ_FIRST(&req->rkbuf_msgq.rkmq_msgs)->rkm_ts_timeout;
-
-        rd_kafka_broker_buf_enq_replyq(rkb, req,
-                                       RD_KAFKA_NO_REPLYQ,
-                                       rd_kafka_handle_Produce,
-                                       /* toppar ref for handle_Produce() */
-                                       rd_kafka_toppar_keep(rktp));
-
-        return cnt;
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_request.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_request.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_request.h
deleted file mode 100644
index a6a11e5..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_request.h
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2015, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#pragma once
-
-#include "rdkafka_cgrp.h"
-#include "rdkafka_feature.h"
-
-
-/**
- * Error action flags, as returned by rd_kafka_err_action().
- * Each action is a distinct bit so that several can be combined and
- * tested with a bitwise AND.
- */
-#define RD_KAFKA_ERR_ACTION_PERMANENT 0x1 /* Permanent error */
-#define RD_KAFKA_ERR_ACTION_IGNORE 0x2 /* Error can be ignored */
-#define RD_KAFKA_ERR_ACTION_REFRESH 0x4 /* Refresh state (e.g., metadata) */
-#define RD_KAFKA_ERR_ACTION_RETRY 0x8 /* Retry request after backoff */
-#define RD_KAFKA_ERR_ACTION_INFORM 0x10 /* Inform application about err */
-#define RD_KAFKA_ERR_ACTION_SPECIAL 0x20 /* Special-purpose, depends on context */
-#define RD_KAFKA_ERR_ACTION_END 0 /* var-arg sentinel */
-
-/**
- * Returns a bitmask of RD_KAFKA_ERR_ACTION_.. flags for \p err.
- * The variable arguments must be terminated by RD_KAFKA_ERR_ACTION_END.
- * NOTE(review): callers pass (action, error code) pairs before the
- * sentinel, presumably per-call overrides - confirm against the
- * implementation.
- */
-int rd_kafka_err_action (rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_buf_t *request, ...);
-
-
-void rd_kafka_GroupCoordinatorRequest (rd_kafka_broker_t *rkb,
- const rd_kafkap_str_t *cgrp,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque);
-
-rd_kafka_resp_err_t rd_kafka_handle_Offset (rd_kafka_t *rk,
- rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_buf_t *request,
- rd_kafka_topic_partition_list_t
- *offsets);
-
-void rd_kafka_OffsetRequest (rd_kafka_broker_t *rkb,
- rd_kafka_topic_partition_list_t *offsets,
- int16_t api_version,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque);
-
-rd_kafka_resp_err_t
-rd_kafka_handle_OffsetFetch (rd_kafka_t *rk,
- rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_buf_t *request,
- rd_kafka_topic_partition_list_t *offsets,
- int update_toppar);
-
-void rd_kafka_op_handle_OffsetFetch (rd_kafka_t *rk,
- rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_buf_t *request,
- void *opaque);
-
-void rd_kafka_OffsetFetchRequest (rd_kafka_broker_t *rkb,
- int16_t api_version,
- rd_kafka_topic_partition_list_t *parts,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque);
-
-
-
-rd_kafka_resp_err_t
-rd_kafka_handle_OffsetCommit (rd_kafka_t *rk,
- rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_buf_t *request,
- rd_kafka_topic_partition_list_t *offsets);
-int rd_kafka_OffsetCommitRequest (rd_kafka_broker_t *rkb,
- rd_kafka_cgrp_t *rkcg,
- int16_t api_version,
- rd_kafka_topic_partition_list_t *offsets,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque, const char *reason);
-
-
-
-void rd_kafka_JoinGroupRequest (rd_kafka_broker_t *rkb,
- const rd_kafkap_str_t *group_id,
- const rd_kafkap_str_t *member_id,
- const rd_kafkap_str_t *protocol_type,
- const rd_list_t *topics,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque);
-
-
-void rd_kafka_LeaveGroupRequest (rd_kafka_broker_t *rkb,
- const rd_kafkap_str_t *group_id,
- const rd_kafkap_str_t *member_id,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque);
-void rd_kafka_handle_LeaveGroup (rd_kafka_t *rk,
- rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_buf_t *request,
- void *opaque);
-
-void rd_kafka_SyncGroupRequest (rd_kafka_broker_t *rkb,
- const rd_kafkap_str_t *group_id,
- int32_t generation_id,
- const rd_kafkap_str_t *member_id,
- const rd_kafka_group_member_t
- *assignments,
- int assignment_cnt,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque);
-void rd_kafka_handle_SyncGroup (rd_kafka_t *rk,
- rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_buf_t *request,
- void *opaque);
-
-void rd_kafka_ListGroupsRequest (rd_kafka_broker_t *rkb,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque);
-
-void rd_kafka_DescribeGroupsRequest (rd_kafka_broker_t *rkb,
- const char **groups, int group_cnt,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque);
-
-
-void rd_kafka_HeartbeatRequest (rd_kafka_broker_t *rkb,
- const rd_kafkap_str_t *group_id,
- int32_t generation_id,
- const rd_kafkap_str_t *member_id,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque);
-
-rd_kafka_resp_err_t
-rd_kafka_MetadataRequest (rd_kafka_broker_t *rkb,
- const rd_list_t *topics, const char *reason,
- rd_kafka_op_t *rko);
-
-rd_kafka_resp_err_t
-rd_kafka_handle_ApiVersion (rd_kafka_t *rk,
- rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_buf_t *request,
- struct rd_kafka_ApiVersion **apis,
- size_t *api_cnt);
-void rd_kafka_ApiVersionRequest (rd_kafka_broker_t *rkb,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque, int flash_msg);
-
-void rd_kafka_SaslHandshakeRequest (rd_kafka_broker_t *rkb,
- const char *mechanism,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque, int flash_msg);
-
-int rd_kafka_ProduceRequest (rd_kafka_broker_t *rkb, rd_kafka_toppar_t *rktp);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_roundrobin_assignor.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_roundrobin_assignor.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_roundrobin_assignor.c
deleted file mode 100644
index 0482f88..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_roundrobin_assignor.c
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * librdkafka - The Apache Kafka C/C++ library
- *
- * Copyright (c) 2015 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-#include "rdkafka_int.h"
-#include "rdkafka_assignor.h"
-
-
-/**
- * Source: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
- *
- * The roundrobin assignor lays out all the available partitions and all the
- * available consumers. It then proceeds to do a roundrobin assignment from
- * partition to consumer. If the subscriptions of all consumer instances are
- * identical, then the partitions will be uniformly distributed. (i.e., the
- * partition ownership counts will be within a delta of exactly one across all
- * consumers.)
- *
- * For example, suppose there are two consumers C0 and C1, two topics t0 and
- * t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1,
- * t0p2, t1p0, t1p1, and t1p2.
- *
- * The assignment will be:
- * C0: [t0p0, t0p2, t1p1]
- * C1: [t0p1, t1p0, t1p2]
- */
-
-/**
- * @brief Round-robin assignor: assigns each partition of each eligible
- *        topic to the next member (in sorted order, cyclically) that
- *        subscribes to the topic.
- *
- * \p eligible_topics and \p members are sorted in place by name.
- * Assignments are appended to each member's rkgm_assignment list.
- *
- * @param members             group members, \p member_cnt elements.
- * @param eligible_topics     topics to assign, \p eligible_topic_cnt
- *                            elements.
- *
- * @returns RD_KAFKA_RESP_ERR_NO_ERROR (always).
- */
-rd_kafka_resp_err_t
-rd_kafka_roundrobin_assignor_assign_cb (rd_kafka_t *rk,
-                                        const char *member_id,
-                                        const char *protocol_name,
-                                        const rd_kafka_metadata_t *metadata,
-                                        rd_kafka_group_member_t *members,
-                                        size_t member_cnt,
-                                        rd_kafka_assignor_topic_t
-                                        **eligible_topics,
-                                        size_t eligible_topic_cnt,
-                                        char *errstr, size_t errstr_size,
-                                        void *opaque) {
-        size_t ti;
-        size_t next = 0; /* Index into members[] of the next candidate */
-
-        /* Sort topics by name */
-        qsort(eligible_topics, eligible_topic_cnt, sizeof(*eligible_topics),
-              rd_kafka_assignor_topic_cmp);
-
-        /* Sort members by name */
-        qsort(members, member_cnt, sizeof(*members),
-              rd_kafka_group_member_cmp);
-
-        /* Nobody to assign partitions to. */
-        if (member_cnt == 0)
-                return RD_KAFKA_RESP_ERR_NO_ERROR;
-
-        for (ti = 0 ; ti < eligible_topic_cnt ; ti++) {
-                rd_kafka_assignor_topic_t *eligible_topic = eligible_topics[ti];
-                int partition;
-
-                /* For each topic+partition, assign one member (in a cyclic
-                 * iteration) per partition until the partitions are exhausted*/
-                for (partition = 0 ;
-                     partition < eligible_topic->metadata->partition_cnt ;
-                     partition++) {
-                        rd_kafka_group_member_t *rkgm;
-                        size_t tried;
-
-                        /* Starting at 'next', scan the members cyclically
-                         * for one subscribed to this topic. 'next' indexes
-                         * members[], so it must wrap at member_cnt, and
-                         * the scan visits each member at most once so it
-                         * can neither run off the array nor spin forever. */
-                        for (tried = 0 ; tried < member_cnt ; tried++) {
-                                if (rd_kafka_group_member_find_subscription(
-                                            rk, &members[next],
-                                            eligible_topic->metadata->topic))
-                                        break;
-                                next = (next + 1) % member_cnt;
-                        }
-
-                        if (tried == member_cnt)
-                                break; /* No subscriber: skip this topic */
-
-                        rkgm = &members[next];
-
-                        rd_kafka_dbg(rk, CGRP, "ASSIGN",
-                                     "roundrobin: Member \"%s\": "
-                                     "assigned topic %s partition %d",
-                                     rkgm->rkgm_member_id->str,
-                                     eligible_topic->metadata->topic,
-                                     partition);
-
-                        rd_kafka_topic_partition_list_add(
-                                rkgm->rkgm_assignment,
-                                eligible_topic->metadata->topic, partition);
-
-                        /* The following member gets the next partition. */
-                        next = (next + 1) % member_cnt;
-                }
-        }
-
-
-        return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-