Posted to commits@nifi.apache.org by bq...@apache.org on 2017/11/16 21:15:50 UTC
[20/42] nifi-minifi-cpp git commit: MINIFICPP-274: PutKafka Processor
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_cgrp.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_cgrp.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_cgrp.c
new file mode 100644
index 0000000..4f052ad
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_cgrp.c
@@ -0,0 +1,3204 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rdkafka_int.h"
+#include "rdkafka_broker.h"
+#include "rdkafka_request.h"
+#include "rdkafka_topic.h"
+#include "rdkafka_partition.h"
+#include "rdkafka_assignor.h"
+#include "rdkafka_offset.h"
+#include "rdkafka_metadata.h"
+#include "rdkafka_cgrp.h"
+#include "rdkafka_interceptor.h"
+
+
+static void rd_kafka_cgrp_check_unassign_done (rd_kafka_cgrp_t *rkcg,
+ const char *reason);
+static void rd_kafka_cgrp_offset_commit_tmr_cb (rd_kafka_timers_t *rkts,
+ void *arg);
+static void rd_kafka_cgrp_assign (rd_kafka_cgrp_t *rkcg,
+ rd_kafka_topic_partition_list_t *assignment);
+static rd_kafka_resp_err_t rd_kafka_cgrp_unassign (rd_kafka_cgrp_t *rkcg);
+static void
+rd_kafka_cgrp_partitions_fetch_start0 (rd_kafka_cgrp_t *rkcg,
+ rd_kafka_topic_partition_list_t
+ *assignment, int usable_offsets,
+ int line);
+#define rd_kafka_cgrp_partitions_fetch_start(rkcg,assignment,usable_offsets) \
+ rd_kafka_cgrp_partitions_fetch_start0(rkcg,assignment,usable_offsets,\
+ __LINE__)
+static rd_kafka_op_res_t
+rd_kafka_cgrp_op_serve (rd_kafka_t *rk, rd_kafka_q_t *rkq,
+ rd_kafka_op_t *rko, rd_kafka_q_cb_type_t cb_type,
+ void *opaque);
+
+static void rd_kafka_cgrp_group_leader_reset (rd_kafka_cgrp_t *rkcg,
+ const char *reason);
+
+/**
+ * @returns true if cgrp can start partition fetchers, which is true if
+ * there is a subscription and the group is fully joined, or there
+ * is no subscription (in which case the join state is irrelevant)
+ * such as for an assign() without subscribe(). */
+#define RD_KAFKA_CGRP_CAN_FETCH_START(rkcg) \
+ ((rkcg)->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED)
+
+/**
+ * @returns true if cgrp is waiting for a rebalance_cb to be handled by
+ * the application.
+ */
+#define RD_KAFKA_CGRP_WAIT_REBALANCE_CB(rkcg) \
+ ((rkcg)->rkcg_join_state == \
+ RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB || \
+ (rkcg)->rkcg_join_state == \
+ RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB)
+
+
+const char *rd_kafka_cgrp_state_names[] = {
+ "init",
+ "term",
+ "query-coord",
+ "wait-coord",
+ "wait-broker",
+ "wait-broker-transport",
+ "up"
+};
+
+const char *rd_kafka_cgrp_join_state_names[] = {
+ "init",
+ "wait-join",
+ "wait-metadata",
+ "wait-sync",
+ "wait-unassign",
+ "wait-assign-rebalance_cb",
+ "wait-revoke-rebalance_cb",
+ "assigned",
+ "started"
+};
+
+
+static void rd_kafka_cgrp_set_state (rd_kafka_cgrp_t *rkcg, int state) {
+ if ((int)rkcg->rkcg_state == state)
+ return;
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPSTATE",
+ "Group \"%.*s\" changed state %s -> %s "
+ "(v%d, join-state %s)",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+ rd_kafka_cgrp_state_names[rkcg->rkcg_state],
+ rd_kafka_cgrp_state_names[state],
+ rkcg->rkcg_version,
+ rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
+ rkcg->rkcg_state = state;
+ rkcg->rkcg_ts_statechange = rd_clock();
+
+ rd_kafka_brokers_broadcast_state_change(rkcg->rkcg_rk);
+}
+
+
+void rd_kafka_cgrp_set_join_state (rd_kafka_cgrp_t *rkcg, int join_state) {
+ if ((int)rkcg->rkcg_join_state == join_state)
+ return;
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPJOINSTATE",
+ "Group \"%.*s\" changed join state %s -> %s "
+ "(v%d, state %s)",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+ rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
+ rd_kafka_cgrp_join_state_names[join_state],
+ rkcg->rkcg_version,
+ rd_kafka_cgrp_state_names[rkcg->rkcg_state]);
+ rkcg->rkcg_join_state = join_state;
+}
+
+
+static RD_INLINE void
+rd_kafka_cgrp_version_new_barrier0 (rd_kafka_cgrp_t *rkcg,
+ const char *func, int line) {
+ rkcg->rkcg_version++;
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "BARRIER",
+ "Group \"%.*s\": %s:%d: new version barrier v%d",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), func, line,
+ rkcg->rkcg_version);
+}
+
+#define rd_kafka_cgrp_version_new_barrier(rkcg) \
+ rd_kafka_cgrp_version_new_barrier0(rkcg, __FUNCTION__, __LINE__)
+
+
+void rd_kafka_cgrp_destroy_final (rd_kafka_cgrp_t *rkcg) {
+ rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_assignment);
+ rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_subscription);
+ rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_group_leader.members);
+ rd_kafka_cgrp_set_member_id(rkcg, NULL);
+
+ rd_kafka_q_destroy(rkcg->rkcg_q);
+ rd_kafka_q_destroy(rkcg->rkcg_ops);
+ rd_kafka_q_destroy(rkcg->rkcg_wait_coord_q);
+ rd_kafka_assert(rkcg->rkcg_rk, TAILQ_EMPTY(&rkcg->rkcg_topics));
+ rd_kafka_assert(rkcg->rkcg_rk, rd_list_empty(&rkcg->rkcg_toppars));
+ rd_list_destroy(&rkcg->rkcg_toppars);
+ rd_list_destroy(rkcg->rkcg_subscribed_topics);
+ rd_free(rkcg);
+}
+
+
+
+
+rd_kafka_cgrp_t *rd_kafka_cgrp_new (rd_kafka_t *rk,
+ const rd_kafkap_str_t *group_id,
+ const rd_kafkap_str_t *client_id) {
+ rd_kafka_cgrp_t *rkcg;
+
+ rkcg = rd_calloc(1, sizeof(*rkcg));
+
+ rkcg->rkcg_rk = rk;
+ rkcg->rkcg_group_id = group_id;
+ rkcg->rkcg_client_id = client_id;
+ rkcg->rkcg_coord_id = -1;
+ rkcg->rkcg_generation_id = -1;
+ rkcg->rkcg_version = 1;
+
+ mtx_init(&rkcg->rkcg_lock, mtx_plain);
+ rkcg->rkcg_ops = rd_kafka_q_new(rk);
+ rkcg->rkcg_ops->rkq_serve = rd_kafka_cgrp_op_serve;
+ rkcg->rkcg_ops->rkq_opaque = rkcg;
+ rkcg->rkcg_wait_coord_q = rd_kafka_q_new(rk);
+ rkcg->rkcg_wait_coord_q->rkq_serve = rkcg->rkcg_ops->rkq_serve;
+ rkcg->rkcg_wait_coord_q->rkq_opaque = rkcg->rkcg_ops->rkq_opaque;
+ rkcg->rkcg_q = rd_kafka_q_new(rk);
+
+ TAILQ_INIT(&rkcg->rkcg_topics);
+ rd_list_init(&rkcg->rkcg_toppars, 32, NULL);
+ rd_kafka_cgrp_set_member_id(rkcg, "");
+ rkcg->rkcg_subscribed_topics =
+ rd_list_new(0, (void *)rd_kafka_topic_info_destroy);
+ rd_interval_init(&rkcg->rkcg_coord_query_intvl);
+ rd_interval_init(&rkcg->rkcg_heartbeat_intvl);
+ rd_interval_init(&rkcg->rkcg_join_intvl);
+ rd_interval_init(&rkcg->rkcg_timeout_scan_intvl);
+
+ if (RD_KAFKAP_STR_IS_NULL(group_id)) {
+ /* No group configured: Operate in legacy/SimpleConsumer mode */
+ rd_kafka_simple_consumer_add(rk);
+ /* no need to look up the group coordinator (no queries) */
+ rd_interval_disable(&rkcg->rkcg_coord_query_intvl);
+ }
+
+ if (rk->rk_conf.enable_auto_commit &&
+ rk->rk_conf.auto_commit_interval_ms > 0)
+ rd_kafka_timer_start(&rk->rk_timers,
+ &rkcg->rkcg_offset_commit_tmr,
+ rk->rk_conf.
+ auto_commit_interval_ms * 1000ll,
+ rd_kafka_cgrp_offset_commit_tmr_cb,
+ rkcg);
+
+ return rkcg;
+}
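+
+/**
+ * Illustrative sketch (not part of upstream librdkafka): a cgrp such as the
+ * one built by rd_kafka_cgrp_new() above comes into existence when the
+ * application configures a consumer with a group.id. A minimal, assumed
+ * application-side use of the public API would look roughly like:
+ * @code
+ *   char errstr[512];
+ *   rd_kafka_conf_t *conf = rd_kafka_conf_new();
+ *   rd_kafka_conf_set(conf, "group.id", "example-group",
+ *                     errstr, sizeof(errstr));
+ *   rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
+ *                                 errstr, sizeof(errstr));
+ *   rd_kafka_poll_set_consumer(rk);  // route cgrp events to consumer queue
+ * @endcode
+ * "example-group" is a placeholder and error handling is omitted for brevity.
+ */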
+
+
+
+/**
+ * Select a broker to handle this cgrp.
+ * It prefers the coordinator broker, but if that is not available
+ * any other broker in state Up is used, and if that also fails
+ * it falls back to the internal broker handle.
+ *
+ * NOTE: The returned rkb will have had its refcnt increased.
+ */
+static rd_kafka_broker_t *rd_kafka_cgrp_select_broker (rd_kafka_cgrp_t *rkcg) {
+ rd_kafka_broker_t *rkb = NULL;
+
+
+ /* No need for a managing broker when cgrp is terminated */
+ if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM)
+ return NULL;
+
+ rd_kafka_rdlock(rkcg->rkcg_rk);
+ /* Try to find the coordinator broker; if it isn't found,
+ * move the cgrp to any other Up broker, which will
+ * do further coord querying while waiting for the
+ * proper broker to materialise.
+ * If that also fails, go with the internal broker */
+ if (rkcg->rkcg_coord_id != -1)
+ rkb = rd_kafka_broker_find_by_nodeid(rkcg->rkcg_rk,
+ rkcg->rkcg_coord_id);
+ if (!rkb)
+ rkb = rd_kafka_broker_prefer(rkcg->rkcg_rk,
+ rkcg->rkcg_coord_id,
+ RD_KAFKA_BROKER_STATE_UP);
+ if (!rkb)
+ rkb = rd_kafka_broker_internal(rkcg->rkcg_rk);
+
+ rd_kafka_rdunlock(rkcg->rkcg_rk);
+
+ /* Don't change the managing broker unless warranted.
+ * This means do not change to another non-coordinator broker
+ * while we are waiting for the proper coordinator broker to
+ * become available. */
+ if (rkb && rkcg->rkcg_rkb && rkb != rkcg->rkcg_rkb) {
+ int old_is_coord, new_is_coord;
+
+ rd_kafka_broker_lock(rkb);
+ new_is_coord = RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg, rkb);
+ rd_kafka_broker_unlock(rkb);
+
+ rd_kafka_broker_lock(rkcg->rkcg_rkb);
+ old_is_coord = RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg,
+ rkcg->rkcg_rkb);
+ rd_kafka_broker_unlock(rkcg->rkcg_rkb);
+
+ if (!old_is_coord && !new_is_coord &&
+ rkcg->rkcg_rkb->rkb_source != RD_KAFKA_INTERNAL) {
+ rd_kafka_broker_destroy(rkb);
+ rkb = rkcg->rkcg_rkb;
+ rd_kafka_broker_keep(rkb);
+ }
+ }
+
+ return rkb;
+}
+
+
+
+
+/**
+ * Assign cgrp to broker.
+ *
+ * Locality: rdkafka main thread
+ */
+static void rd_kafka_cgrp_assign_broker (rd_kafka_cgrp_t *rkcg,
+ rd_kafka_broker_t *rkb) {
+
+ rd_kafka_assert(NULL, rkcg->rkcg_rkb == NULL);
+
+ rkcg->rkcg_rkb = rkb;
+ rd_kafka_broker_keep(rkb);
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "BRKASSIGN",
+ "Group \"%.*s\" management assigned to broker %s",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+ rd_kafka_broker_name(rkb));
+
+ /* Reset query interval to trigger an immediate
+ * coord query if required */
+ if (!rd_interval_disabled(&rkcg->rkcg_coord_query_intvl))
+ rd_interval_reset(&rkcg->rkcg_coord_query_intvl);
+
+ if (RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg, rkb))
+ rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT);
+
+}
+
+
+/**
+ * Unassign cgrp from current broker.
+ *
+ * Locality: main thread
+ */
+static void rd_kafka_cgrp_unassign_broker (rd_kafka_cgrp_t *rkcg) {
+ rd_kafka_broker_t *rkb = rkcg->rkcg_rkb;
+
+ rd_kafka_assert(NULL, rkcg->rkcg_rkb);
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "BRKUNASSIGN",
+ "Group \"%.*s\" management unassigned "
+ "from broker handle %s",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+ rd_kafka_broker_name(rkb));
+
+ rkcg->rkcg_rkb = NULL;
+ rd_kafka_broker_destroy(rkb); /* from assign() */
+}
+
+
+/**
+ * Assign cgrp to a broker to handle.
+ * It prefers the coordinator broker, but if that is not available
+ * any other broker in state Up is used, and if that also fails
+ * it falls back to the internal broker handle.
+ *
+ * Returns 1 if the cgrp was reassigned, else 0.
+ */
+int rd_kafka_cgrp_reassign_broker (rd_kafka_cgrp_t *rkcg) {
+ rd_kafka_broker_t *rkb;
+
+ rkb = rd_kafka_cgrp_select_broker(rkcg);
+
+ if (rkb == rkcg->rkcg_rkb) {
+ int is_coord = 0;
+
+ if (rkb) {
+ rd_kafka_broker_lock(rkb);
+ is_coord = RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg, rkb);
+ rd_kafka_broker_unlock(rkb);
+ }
+ if (is_coord)
+ rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT);
+ else
+ rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER);
+
+ if (rkb)
+ rd_kafka_broker_destroy(rkb);
+ return 0; /* No change */
+ }
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "BRKREASSIGN",
+ "Group \"%.*s\" management reassigned from "
+ "broker %s to %s",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+ rkcg->rkcg_rkb ?
+ rd_kafka_broker_name(rkcg->rkcg_rkb) : "(none)",
+ rkb ? rd_kafka_broker_name(rkb) : "(none)");
+
+
+ if (rkcg->rkcg_rkb)
+ rd_kafka_cgrp_unassign_broker(rkcg);
+
+ rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER);
+
+ if (rkb) {
+ rd_kafka_cgrp_assign_broker(rkcg, rkb);
+ rd_kafka_broker_destroy(rkb); /* from select_broker() */
+ }
+
+ return 1;
+}
+
+
+/**
+ * Update the cgrp's coordinator and move it to that broker.
+ */
+void rd_kafka_cgrp_coord_update (rd_kafka_cgrp_t *rkcg, int32_t coord_id) {
+
+ if (rkcg->rkcg_coord_id == coord_id) {
+ if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_WAIT_COORD)
+ rd_kafka_cgrp_set_state(rkcg,
+ RD_KAFKA_CGRP_STATE_WAIT_BROKER);
+ return;
+ }
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPCOORD",
+ "Group \"%.*s\" changing coordinator %"PRId32" -> %"PRId32,
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rkcg->rkcg_coord_id,
+ coord_id);
+ rkcg->rkcg_coord_id = coord_id;
+
+ rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER);
+
+ rd_kafka_cgrp_reassign_broker(rkcg);
+}
+
+
+
+
+
+
+/**
+ * Handle GroupCoordinator response
+ */
+static void rd_kafka_cgrp_handle_GroupCoordinator (rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ void *opaque) {
+ const int log_decode_errors = LOG_ERR;
+ int16_t ErrorCode = 0;
+ int32_t CoordId;
+ rd_kafkap_str_t CoordHost = RD_ZERO_INIT;
+ int32_t CoordPort;
+ rd_kafka_cgrp_t *rkcg = opaque;
+ struct rd_kafka_metadata_broker mdb = RD_ZERO_INIT;
+
+ if (likely(!(ErrorCode = err))) {
+ rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
+ rd_kafka_buf_read_i32(rkbuf, &CoordId);
+ rd_kafka_buf_read_str(rkbuf, &CoordHost);
+ rd_kafka_buf_read_i32(rkbuf, &CoordPort);
+ }
+
+ if (ErrorCode)
+ goto err2;
+
+
+ mdb.id = CoordId;
+ RD_KAFKAP_STR_DUPA(&mdb.host, &CoordHost);
+ mdb.port = CoordPort;
+
+ rd_rkb_dbg(rkb, CGRP, "CGRPCOORD",
+ "Group \"%.*s\" coordinator is %s:%i id %"PRId32,
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+ mdb.host, mdb.port, mdb.id);
+ rd_kafka_broker_update(rkb->rkb_rk, rkb->rkb_proto, &mdb);
+
+ rd_kafka_cgrp_coord_update(rkcg, CoordId);
+ rd_kafka_cgrp_serve(rkcg); /* Serve updated state, if possible */
+ return;
+
+err_parse: /* Parse error */
+ ErrorCode = rkbuf->rkbuf_err;
+ /* FALLTHRU */
+
+err2:
+ rd_rkb_dbg(rkb, CGRP, "CGRPCOORD",
+ "Group \"%.*s\" GroupCoordinator response error: %s",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+ rd_kafka_err2str(ErrorCode));
+
+ if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY)
+ return;
+
+ if (ErrorCode == RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE)
+ rd_kafka_cgrp_coord_update(rkcg, -1);
+ else {
+ if (rkcg->rkcg_last_err != ErrorCode) {
+ rd_kafka_q_op_err(rkcg->rkcg_q,
+ RD_KAFKA_OP_CONSUMER_ERR,
+ ErrorCode, 0, NULL, 0,
+ "GroupCoordinator response error: %s",
+ rd_kafka_err2str(ErrorCode));
+
+ /* Suppress repeated errors */
+ rkcg->rkcg_last_err = ErrorCode;
+ }
+
+ /* Continue querying */
+ rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
+ }
+
+ rd_kafka_cgrp_serve(rkcg); /* Serve updated state, if possible */
+}
+
+
+/**
+ * Query for coordinator.
+ * Ask any broker in state UP
+ *
+ * Locality: main thread
+ */
+void rd_kafka_cgrp_coord_query (rd_kafka_cgrp_t *rkcg,
+ const char *reason) {
+ rd_kafka_broker_t *rkb;
+
+ rd_kafka_rdlock(rkcg->rkcg_rk);
+ rkb = rd_kafka_broker_any(rkcg->rkcg_rk, RD_KAFKA_BROKER_STATE_UP,
+ rd_kafka_broker_filter_can_group_query, NULL);
+ rd_kafka_rdunlock(rkcg->rkcg_rk);
+
+ if (!rkb) {
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPQUERY",
+ "Group \"%.*s\": "
+ "no broker available for coordinator query: %s",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason);
+ return;
+ }
+
+ rd_rkb_dbg(rkb, CGRP, "CGRPQUERY",
+ "Group \"%.*s\": querying for coordinator: %s",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason);
+
+ rd_kafka_GroupCoordinatorRequest(rkb, rkcg->rkcg_group_id,
+ RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
+ rd_kafka_cgrp_handle_GroupCoordinator,
+ rkcg);
+
+ if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_QUERY_COORD)
+ rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_COORD);
+
+ rd_kafka_broker_destroy(rkb);
+}
+
+/**
+ * @brief Mark the current coordinator as dead.
+ *
+ * @locality main thread
+ */
+void rd_kafka_cgrp_coord_dead (rd_kafka_cgrp_t *rkcg, rd_kafka_resp_err_t err,
+ const char *reason) {
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COORD",
+ "Group \"%.*s\": marking the coordinator dead: %s: %s",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+ rd_kafka_err2str(err), reason);
+
+ rd_kafka_cgrp_coord_update(rkcg, -1);
+
+ /* Re-query for coordinator */
+ rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
+ rd_kafka_cgrp_coord_query(rkcg, reason);
+}
+
+
+
+static void rd_kafka_cgrp_leave (rd_kafka_cgrp_t *rkcg, int ignore_response) {
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "LEAVE",
+ "Group \"%.*s\": leave",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
+
+ if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP)
+ rd_kafka_LeaveGroupRequest(rkcg->rkcg_rkb, rkcg->rkcg_group_id,
+ rkcg->rkcg_member_id,
+ ignore_response ?
+ RD_KAFKA_NO_REPLYQ :
+ RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
+ ignore_response ? NULL :
+ rd_kafka_handle_LeaveGroup, rkcg);
+ else if (!ignore_response)
+ rd_kafka_handle_LeaveGroup(rkcg->rkcg_rk, rkcg->rkcg_rkb,
+ RD_KAFKA_RESP_ERR__WAIT_COORD,
+ NULL, NULL, rkcg);
+}
+
+
+/**
+ * Enqueue a rebalance op (if configured). 'partitions' is copied.
+ * This delegates the responsibility of assign() and unassign() to the
+ * application.
+ *
+ * Returns 1 if a rebalance op was enqueued, else 0.
+ * Returns 0 if there was no rebalance_cb or 'assignment' is NULL,
+ * in which case rd_kafka_cgrp_assign(rkcg,assignment) is called immediately.
+ */
+static int
+rd_kafka_rebalance_op (rd_kafka_cgrp_t *rkcg,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *assignment,
+ const char *reason) {
+ rd_kafka_op_t *rko;
+
+ rd_kafka_wrlock(rkcg->rkcg_rk);
+ rkcg->rkcg_c.ts_rebalance = rd_clock();
+ rkcg->rkcg_c.rebalance_cnt++;
+ rd_kafka_wrunlock(rkcg->rkcg_rk);
+
+ /* Pause current partition set consumers until new assign() is called */
+ if (rkcg->rkcg_assignment)
+ rd_kafka_toppars_pause_resume(rkcg->rkcg_rk, 1,
+ RD_KAFKA_TOPPAR_F_LIB_PAUSE,
+ rkcg->rkcg_assignment);
+
+ if (!(rkcg->rkcg_rk->rk_conf.enabled_events & RD_KAFKA_EVENT_REBALANCE)
+ || !assignment) {
+ no_delegation:
+ if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
+ rd_kafka_cgrp_assign(rkcg, assignment);
+ else
+ rd_kafka_cgrp_unassign(rkcg);
+ return 0;
+ }
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
+ "Group \"%s\": delegating %s of %d partition(s) "
+ "to application rebalance callback on queue %s: %s",
+ rkcg->rkcg_group_id->str,
+ err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS ?
+ "revoke":"assign", assignment->cnt,
+ rd_kafka_q_dest_name(rkcg->rkcg_q), reason);
+
+ rd_kafka_cgrp_set_join_state(
+ rkcg,
+ err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
+ RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB :
+ RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB);
+
+ rko = rd_kafka_op_new(RD_KAFKA_OP_REBALANCE);
+ rko->rko_err = err;
+ rko->rko_u.rebalance.partitions =
+ rd_kafka_topic_partition_list_copy(assignment);
+
+ if (rd_kafka_q_enq(rkcg->rkcg_q, rko) == 0) {
+ /* Queue disabled, handle assignment here. */
+ goto no_delegation;
+ }
+
+ return 1;
+}
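+
+/**
+ * Illustrative sketch (not part of upstream librdkafka): when
+ * rd_kafka_rebalance_op() above delegates the assignment to the application,
+ * the application is expected to respond from its rebalance callback by
+ * calling rd_kafka_assign(). A minimal, assumed callback could look like:
+ * @code
+ *   static void rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
+ *                             rd_kafka_topic_partition_list_t *partitions,
+ *                             void *opaque) {
+ *           if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
+ *                   rd_kafka_assign(rk, partitions);  // start fetching
+ *           else  // RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS
+ *                   rd_kafka_assign(rk, NULL);        // clear assignment
+ *   }
+ *   // registered on the conf object before rd_kafka_new():
+ *   //   rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
+ * @endcode
+ * Until the application calls rd_kafka_assign() the join state stays in one
+ * of the wait-*-rebalance_cb states set above.
+ */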
+
+
+/**
+ * @brief Run group assignment.
+ */
+static void
+rd_kafka_cgrp_assignor_run (rd_kafka_cgrp_t *rkcg,
+ const char *protocol_name,
+ rd_kafka_resp_err_t err,
+ rd_kafka_metadata_t *metadata,
+ rd_kafka_group_member_t *members,
+ int member_cnt) {
+ char errstr[512];
+
+ if (err) {
+ rd_snprintf(errstr, sizeof(errstr),
+ "Failed to get cluster metadata: %s",
+ rd_kafka_err2str(err));
+ goto err;
+ }
+
+ *errstr = '\0';
+
+ /* Run assignor */
+ err = rd_kafka_assignor_run(rkcg, protocol_name, metadata,
+ members, member_cnt,
+ errstr, sizeof(errstr));
+
+ if (err) {
+ if (!*errstr)
+ rd_snprintf(errstr, sizeof(errstr), "%s",
+ rd_kafka_err2str(err));
+ goto err;
+ }
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGNOR",
+ "Group \"%s\": \"%s\" assignor run for %d member(s)",
+ rkcg->rkcg_group_id->str, protocol_name, member_cnt);
+
+ rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC);
+
+ /* Respond to broker with assignment set or error */
+ rd_kafka_SyncGroupRequest(rkcg->rkcg_rkb,
+ rkcg->rkcg_group_id, rkcg->rkcg_generation_id,
+ rkcg->rkcg_member_id,
+ members, err ? 0 : member_cnt,
+ RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
+ rd_kafka_handle_SyncGroup, rkcg);
+ return;
+
+err:
+ rd_kafka_log(rkcg->rkcg_rk, LOG_ERR, "ASSIGNOR",
+ "Group \"%s\": failed to run assignor \"%s\" for "
+ "%d member(s): %s",
+ rkcg->rkcg_group_id->str, protocol_name,
+ member_cnt, errstr);
+
+ rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_INIT);
+
+}
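+
+/**
+ * Illustrative sketch (not part of upstream librdkafka): which assignor
+ * rd_kafka_assignor_run() above ends up running is selected by the
+ * partition.assignment.strategy configuration property, e.g.:
+ * @code
+ *   char errstr[512];
+ *   rd_kafka_conf_set(conf, "partition.assignment.strategy",
+ *                     "range,roundrobin", errstr, sizeof(errstr));
+ * @endcode
+ * The protocol_name passed in above is whichever of the configured
+ * strategies the group agreed on during JoinGroup.
+ */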
+
+
+
+/**
+ * @brief Op callback from handle_JoinGroup
+ */
+static rd_kafka_op_res_t
+rd_kafka_cgrp_assignor_handle_Metadata_op (rd_kafka_t *rk,
+ rd_kafka_q_t *rkq,
+ rd_kafka_op_t *rko) {
+ rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
+
+ if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
+ return RD_KAFKA_OP_RES_HANDLED; /* Terminating */
+
+ if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA)
+ return RD_KAFKA_OP_RES_HANDLED; /* From outdated state */
+
+ if (!rkcg->rkcg_group_leader.protocol) {
+ rd_kafka_dbg(rk, CGRP, "GRPLEADER",
+ "Group \"%.*s\": no longer leader: "
+ "not running assignor",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
+ return RD_KAFKA_OP_RES_HANDLED;
+ }
+
+ rd_kafka_cgrp_assignor_run(rkcg,
+ rkcg->rkcg_group_leader.protocol,
+ rko->rko_err, rko->rko_u.metadata.md,
+ rkcg->rkcg_group_leader.members,
+ rkcg->rkcg_group_leader.member_cnt);
+
+ return RD_KAFKA_OP_RES_HANDLED;
+}
+
+
+/**
+ * Parse a single JoinGroup.Members.MemberMetadata for the "consumer" ProtocolType
+ *
+ * Protocol definition:
+ * https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
+ *
+ * Returns 0 on success or -1 on error.
+ */
+static int
+rd_kafka_group_MemberMetadata_consumer_read (
+ rd_kafka_broker_t *rkb, rd_kafka_group_member_t *rkgm,
+ const rd_kafkap_str_t *GroupProtocol,
+ const rd_kafkap_bytes_t *MemberMetadata) {
+
+ rd_kafka_buf_t *rkbuf;
+ int16_t Version;
+ int32_t subscription_cnt;
+ rd_kafkap_bytes_t UserData;
+ const int log_decode_errors = LOG_ERR;
+ rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__BAD_MSG;
+
+ /* Create a shadow-buffer pointing to the metadata to ease parsing. */
+ rkbuf = rd_kafka_buf_new_shadow(MemberMetadata->data,
+ RD_KAFKAP_BYTES_LEN(MemberMetadata),
+ NULL);
+
+ rd_kafka_buf_read_i16(rkbuf, &Version);
+ rd_kafka_buf_read_i32(rkbuf, &subscription_cnt);
+
+ if (subscription_cnt > 10000 || subscription_cnt <= 0)
+ goto err;
+
+ rkgm->rkgm_subscription =
+ rd_kafka_topic_partition_list_new(subscription_cnt);
+
+ while (subscription_cnt-- > 0) {
+ rd_kafkap_str_t Topic;
+ char *topic_name;
+ rd_kafka_buf_read_str(rkbuf, &Topic);
+ RD_KAFKAP_STR_DUPA(&topic_name, &Topic);
+ rd_kafka_topic_partition_list_add(rkgm->rkgm_subscription,
+ topic_name,
+ RD_KAFKA_PARTITION_UA);
+ }
+
+ rd_kafka_buf_read_bytes(rkbuf, &UserData);
+ rkgm->rkgm_userdata = rd_kafkap_bytes_copy(&UserData);
+
+ rd_kafka_buf_destroy(rkbuf);
+
+ return 0;
+
+ err_parse:
+ err = rkbuf->rkbuf_err;
+
+ err:
+ rd_rkb_dbg(rkb, CGRP, "MEMBERMETA",
+ "Failed to parse MemberMetadata for \"%.*s\": %s",
+ RD_KAFKAP_STR_PR(rkgm->rkgm_member_id),
+ rd_kafka_err2str(err));
+ if (rkgm->rkgm_subscription) {
+ rd_kafka_topic_partition_list_destroy(rkgm->
+ rkgm_subscription);
+ rkgm->rkgm_subscription = NULL;
+ }
+
+ rd_kafka_buf_destroy(rkbuf);
+ return -1;
+}
+
+
+
+
+/**
+ * @brief cgrp handler for JoinGroup responses
+ * opaque must be the cgrp handle.
+ *
+ * @locality cgrp broker thread
+ */
+static void rd_kafka_cgrp_handle_JoinGroup (rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ void *opaque) {
+ rd_kafka_cgrp_t *rkcg = opaque;
+ const int log_decode_errors = LOG_ERR;
+ int16_t ErrorCode = 0;
+ int32_t GenerationId;
+ rd_kafkap_str_t Protocol, LeaderId, MyMemberId;
+ int32_t member_cnt;
+ int actions;
+ int i_am_leader = 0;
+
+ if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN) {
+ rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP",
+ "JoinGroup response: discarding outdated request "
+ "(now in join-state %s)",
+ rd_kafka_cgrp_join_state_names[rkcg->
+ rkcg_join_state]);
+ return;
+ }
+
+ if (err) {
+ ErrorCode = err;
+ goto err;
+ }
+
+ rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
+ rd_kafka_buf_read_i32(rkbuf, &GenerationId);
+ rd_kafka_buf_read_str(rkbuf, &Protocol);
+ rd_kafka_buf_read_str(rkbuf, &LeaderId);
+ rd_kafka_buf_read_str(rkbuf, &MyMemberId);
+ rd_kafka_buf_read_i32(rkbuf, &member_cnt);
+
+ if (!ErrorCode && RD_KAFKAP_STR_IS_NULL(&Protocol)) {
+ /* Protocol not set, we will not be able to find
+ * a matching assignor so error out early. */
+ ErrorCode = RD_KAFKA_RESP_ERR__BAD_MSG;
+ }
+
+ rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP",
+ "JoinGroup response: GenerationId %"PRId32", "
+ "Protocol %.*s, LeaderId %.*s%s, my MemberId %.*s, "
+ "%"PRId32" members in group: %s",
+ GenerationId,
+ RD_KAFKAP_STR_PR(&Protocol),
+ RD_KAFKAP_STR_PR(&LeaderId),
+ !rd_kafkap_str_cmp(&LeaderId, &MyMemberId) ? " (me)" : "",
+ RD_KAFKAP_STR_PR(&MyMemberId),
+ member_cnt,
+ ErrorCode ? rd_kafka_err2str(ErrorCode) : "(no error)");
+
+ if (!ErrorCode) {
+ char *my_member_id;
+ RD_KAFKAP_STR_DUPA(&my_member_id, &MyMemberId);
+ rkcg->rkcg_generation_id = GenerationId;
+ rd_kafka_cgrp_set_member_id(rkcg, my_member_id);
+ i_am_leader = !rd_kafkap_str_cmp(&LeaderId, &MyMemberId);
+ } else {
+ rd_interval_backoff(&rkcg->rkcg_join_intvl, 1000*1000);
+ goto err;
+ }
+
+ if (i_am_leader) {
+ rd_kafka_group_member_t *members;
+ int i;
+ int sub_cnt = 0;
+ rd_list_t topics;
+ rd_kafka_op_t *rko;
+ rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP",
+ "Elected leader for group \"%s\" "
+ "with %"PRId32" member(s)",
+ rkcg->rkcg_group_id->str, member_cnt);
+
+ if (member_cnt > 100000) {
+ err = RD_KAFKA_RESP_ERR__BAD_MSG;
+ goto err;
+ }
+
+ rd_list_init(&topics, member_cnt, rd_free);
+
+ members = rd_calloc(member_cnt, sizeof(*members));
+
+ for (i = 0 ; i < member_cnt ; i++) {
+ rd_kafkap_str_t MemberId;
+ rd_kafkap_bytes_t MemberMetadata;
+ rd_kafka_group_member_t *rkgm;
+
+ rd_kafka_buf_read_str(rkbuf, &MemberId);
+ rd_kafka_buf_read_bytes(rkbuf, &MemberMetadata);
+
+ rkgm = &members[sub_cnt];
+ rkgm->rkgm_member_id = rd_kafkap_str_copy(&MemberId);
+ rd_list_init(&rkgm->rkgm_eligible, 0, NULL);
+
+ if (rd_kafka_group_MemberMetadata_consumer_read(
+ rkb, rkgm, &Protocol, &MemberMetadata)) {
+ /* Failed to parse this member's metadata,
+ * ignore it. */
+ } else {
+ sub_cnt++;
+ rkgm->rkgm_assignment =
+ rd_kafka_topic_partition_list_new(
+ rkgm->rkgm_subscription->size);
+ rd_kafka_topic_partition_list_get_topic_names(
+ rkgm->rkgm_subscription, &topics,
+ 0/*dont include regex*/);
+ }
+
+ }
+
+ /* FIXME: What to do if parsing failed for some/all members?
+ * It is a sign of incompatibility. */
+
+
+ rd_kafka_cgrp_group_leader_reset(rkcg,
+ "JoinGroup response clean-up");
+
+ rkcg->rkcg_group_leader.protocol = RD_KAFKAP_STR_DUP(&Protocol);
+ rd_kafka_assert(NULL, rkcg->rkcg_group_leader.members == NULL);
+ rkcg->rkcg_group_leader.members = members;
+ rkcg->rkcg_group_leader.member_cnt = sub_cnt;
+
+ rd_kafka_cgrp_set_join_state(
+ rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA);
+
+ /* The assignor will need metadata so fetch it asynchronously
+ * and run the assignor when we get a reply.
+ * Create a callback op that the generic metadata code
+ * will trigger when metadata has been parsed. */
+ rko = rd_kafka_op_new_cb(
+ rkcg->rkcg_rk, RD_KAFKA_OP_METADATA,
+ rd_kafka_cgrp_assignor_handle_Metadata_op);
+ rd_kafka_op_set_replyq(rko, rkcg->rkcg_ops, NULL);
+
+ rd_kafka_MetadataRequest(rkb, &topics,
+ "partition assignor", rko);
+ rd_list_destroy(&topics);
+
+ } else {
+ rd_kafka_cgrp_set_join_state(
+ rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC);
+
+ rd_kafka_SyncGroupRequest(rkb, rkcg->rkcg_group_id,
+ rkcg->rkcg_generation_id,
+ rkcg->rkcg_member_id,
+ NULL, 0,
+ RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
+ rd_kafka_handle_SyncGroup, rkcg);
+
+ }
+
+err:
+ actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request,
+ RD_KAFKA_ERR_ACTION_IGNORE,
+ RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,
+
+ RD_KAFKA_ERR_ACTION_END);
+
+ if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
+ /* Re-query for coordinator */
+ rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
+ RD_KAFKA_OP_COORD_QUERY, ErrorCode);
+ }
+
+ if (ErrorCode) {
+ if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY)
+ return; /* Termination */
+
+ if (actions & RD_KAFKA_ERR_ACTION_PERMANENT)
+ rd_kafka_q_op_err(rkcg->rkcg_q,
+ RD_KAFKA_OP_CONSUMER_ERR,
+ ErrorCode, 0, NULL, 0,
+ "JoinGroup failed: %s",
+ rd_kafka_err2str(ErrorCode));
+
+ if (ErrorCode == RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID)
+ rd_kafka_cgrp_set_member_id(rkcg, "");
+ rd_kafka_cgrp_set_join_state(rkcg,
+ RD_KAFKA_CGRP_JOIN_STATE_INIT);
+ }
+
+ return;
+
+ err_parse:
+ ErrorCode = rkbuf->rkbuf_err;
+ goto err;
+}
+
+
+/**
+ * @brief Check subscription against requested Metadata.
+ */
+static rd_kafka_op_res_t
+rd_kafka_cgrp_handle_Metadata_op (rd_kafka_t *rk, rd_kafka_q_t *rkq,
+ rd_kafka_op_t *rko) {
+ rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
+
+ if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
+ return RD_KAFKA_OP_RES_HANDLED; /* Terminating */
+
+ rd_kafka_cgrp_metadata_update_check(rkcg, 0/*dont rejoin*/);
+
+ return RD_KAFKA_OP_RES_HANDLED;
+}
+
+
+/**
+ * @brief (Async) Refresh metadata (for cgrp's needs)
+ *
+ * @returns 1 if metadata refresh was requested, or 0 if metadata is
+ * up to date, or -1 if no broker is available for metadata requests.
+ *
+ * @locks none
+ * @locality rdkafka main thread
+ */
+static int rd_kafka_cgrp_metadata_refresh (rd_kafka_cgrp_t *rkcg,
+ int *metadata_agep,
+ const char *reason) {
+ rd_kafka_t *rk = rkcg->rkcg_rk;
+ rd_kafka_op_t *rko;
+ rd_list_t topics;
+ rd_kafka_resp_err_t err;
+
+ rd_list_init(&topics, 8, rd_free);
+
+ /* Insert all non-wildcard topics in cache. */
+ rd_kafka_metadata_cache_hint_rktparlist(rkcg->rkcg_rk,
+ rkcg->rkcg_subscription,
+ NULL, 0/*dont replace*/);
+
+ if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) {
+ /* For wildcard subscriptions make sure the
+ * cached full metadata isn't too old. */
+ int metadata_age = -1;
+
+ if (rk->rk_ts_full_metadata)
+ metadata_age = (int)(rd_clock() -
+ rk->rk_ts_full_metadata)/1000;
+
+ *metadata_agep = metadata_age;
+
+ if (metadata_age != -1 &&
+ metadata_age <=
+ /* The +1000 is because metadata.refresh.interval.ms
+ * can be set to 0. */
+ rk->rk_conf.metadata_refresh_interval_ms + 1000) {
+ rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
+ "CGRPMETADATA",
+ "%s: metadata for wildcard subscription "
+ "is up to date (%dms old)",
+ reason, *metadata_agep);
+ rd_list_destroy(&topics);
+ return 0; /* Up-to-date */
+ }
+
+ } else {
+ /* Check that all subscribed topics are in the cache. */
+ int r;
+
+ rd_kafka_topic_partition_list_get_topic_names(
+ rkcg->rkcg_subscription, &topics, 0/*no regexps*/);
+
+ rd_kafka_rdlock(rk);
+ r = rd_kafka_metadata_cache_topics_count_exists(rk, &topics,
+ metadata_agep);
+ rd_kafka_rdunlock(rk);
+
+ if (r == rd_list_cnt(&topics)) {
+ rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
+ "CGRPMETADATA",
+ "%s: metadata for subscription "
+ "is up to date (%dms old)", reason,
+ *metadata_agep);
+ rd_list_destroy(&topics);
+ return 0; /* Up-to-date and all topics exist. */
+ }
+
+ rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
+ "CGRPMETADATA",
+ "%s: metadata for subscription "
+ "only available for %d/%d topics (%dms old)",
+ reason, r, rd_list_cnt(&topics), *metadata_agep);
+
+ }
+
+ /* Async request, result will be triggered from
+ * rd_kafka_parse_metadata(). */
+ rko = rd_kafka_op_new_cb(rkcg->rkcg_rk, RD_KAFKA_OP_METADATA,
+ rd_kafka_cgrp_handle_Metadata_op);
+ rd_kafka_op_set_replyq(rko, rkcg->rkcg_ops, 0);
+
+ err = rd_kafka_metadata_request(rkcg->rkcg_rk, NULL, &topics,
+ reason, rko);
+ if (err) {
+ rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
+ "CGRPMETADATA",
+ "%s: need to refresh metadata (%dms old) "
+ "but no usable brokers available: %s",
+ reason, *metadata_agep, rd_kafka_err2str(err));
+ rd_kafka_op_destroy(rko);
+ }
+
+ rd_list_destroy(&topics);
+
+ return err ? -1 : 1;
+}
+
+
+
+static void rd_kafka_cgrp_join (rd_kafka_cgrp_t *rkcg) {
+ int metadata_age;
+
+ if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP ||
+ rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_INIT)
+ return;
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "JOIN",
+ "Group \"%.*s\": join with %d (%d) subscribed topic(s)",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+ rd_list_cnt(rkcg->rkcg_subscribed_topics),
+ rkcg->rkcg_subscription->cnt);
+
+
+ /* See if we need to query metadata to continue:
+ * - if subscription contains wildcards:
+ * * query all topics in cluster
+ *
+ * - if subscription does not contain wildcards but
+ * some topics are missing from the local metadata cache:
+ * * query subscribed topics (all cached ones)
+ *
+ * - otherwise:
+ * * rely on topic metadata cache
+ */
+ /* We need up-to-date full metadata to continue,
+ * refresh metadata if necessary. */
+ if (rd_kafka_cgrp_metadata_refresh(rkcg, &metadata_age,
+ "consumer join") == 1) {
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "JOIN",
+ "Group \"%.*s\": "
+ "postponing join until up-to-date "
+ "metadata is available",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
+ return; /* ^ async call */
+ }
+
+ if (rd_list_empty(rkcg->rkcg_subscribed_topics))
+ rd_kafka_cgrp_metadata_update_check(rkcg, 0/*dont join*/);
+
+ if (rd_list_empty(rkcg->rkcg_subscribed_topics)) {
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "JOIN",
+ "Group \"%.*s\": "
+ "no matching topics based on %dms old metadata: "
+ "next metadata refresh in %dms",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+ metadata_age,
+ rkcg->rkcg_rk->rk_conf.
+ metadata_refresh_interval_ms - metadata_age);
+ return;
+ }
+
+ rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN);
+ rd_kafka_JoinGroupRequest(rkcg->rkcg_rkb, rkcg->rkcg_group_id,
+ rkcg->rkcg_member_id,
+ rkcg->rkcg_rk->rk_conf.group_protocol_type,
+ rkcg->rkcg_subscribed_topics,
+ RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
+ rd_kafka_cgrp_handle_JoinGroup, rkcg);
+}
+
+/**
+ * Rejoin group on update to effective subscribed topics list
+ */
+static void rd_kafka_cgrp_rejoin (rd_kafka_cgrp_t *rkcg) {
+ /*
+ * Clean-up group leader duties, if any.
+ */
+ rd_kafka_cgrp_group_leader_reset(rkcg, "Group rejoin");
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "REJOIN",
+ "Group \"%.*s\" rejoining in join-state %s "
+ "with%s an assignment",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+ rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
+ rkcg->rkcg_assignment ? "" : "out");
+
+ /* Remove assignment (async), if any. If there is already an
+ * unassign in progress we don't need to bother. */
+ if (rkcg->rkcg_assignment) {
+ if (!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN)) {
+ rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_UNASSIGN;
+
+ rd_kafka_rebalance_op(
+ rkcg,
+ RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
+ rkcg->rkcg_assignment, "unsubscribe");
+ }
+ } else {
+ rd_kafka_cgrp_set_join_state(rkcg,
+ RD_KAFKA_CGRP_JOIN_STATE_INIT);
+ rd_kafka_cgrp_join(rkcg);
+ }
+}
+
+/**
+ * Update the effective list of subscribed topics and trigger a rejoin
+ * if it changed.
+ *
+ * Set \p tinfos to NULL for clearing the list.
+ *
+ * @param tinfos rd_list_t(rd_kafka_topic_info_t *): new effective topic list
+ *
+ * @returns 1 on change, else 0.
+ *
+ * @remark Takes ownership of \p tinfos
+ */
+static int
+rd_kafka_cgrp_update_subscribed_topics (rd_kafka_cgrp_t *rkcg,
+ rd_list_t *tinfos) {
+ rd_kafka_topic_info_t *tinfo;
+ int i;
+
+ if (!tinfos) {
+ if (!rd_list_empty(rkcg->rkcg_subscribed_topics))
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIPTION",
+ "Group \"%.*s\": "
+ "clearing subscribed topics list (%d)",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+ rd_list_cnt(rkcg->rkcg_subscribed_topics));
+ tinfos = rd_list_new(0, (void *)rd_kafka_topic_info_destroy);
+
+ } else {
+ if (rd_list_cnt(tinfos) == 0)
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIPTION",
+ "Group \"%.*s\": "
+ "no topics in metadata matched "
+ "subscription",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
+ }
+
+ /* Sort for comparison */
+ rd_list_sort(tinfos, rd_kafka_topic_info_cmp);
+
+ /* Compare to existing to see if anything changed. */
+ if (!rd_list_cmp(rkcg->rkcg_subscribed_topics, tinfos,
+ rd_kafka_topic_info_cmp)) {
+ /* No change */
+ rd_list_destroy(tinfos);
+ return 0;
+ }
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_METADATA, "SUBSCRIPTION",
+ "Group \"%.*s\": effective subscription list changed "
+ "from %d to %d topic(s):",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+ rd_list_cnt(rkcg->rkcg_subscribed_topics),
+ rd_list_cnt(tinfos));
+
+ RD_LIST_FOREACH(tinfo, tinfos, i)
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_METADATA,
+ "SUBSCRIPTION",
+ " Topic %s with %d partition(s)",
+ tinfo->topic, tinfo->partition_cnt);
+
+ rd_list_destroy(rkcg->rkcg_subscribed_topics);
+
+ rkcg->rkcg_subscribed_topics = tinfos;
+
+ return 1;
+}
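+
+/**
+ * Illustrative sketch (not part of upstream librdkafka): the "effective"
+ * subscribed topics list maintained above differs from the raw subscription
+ * mainly for wildcard (regex) subscriptions, where topic names starting
+ * with "^" are matched against cluster metadata. An assumed application-side
+ * example:
+ * @code
+ *   rd_kafka_topic_partition_list_t *topics =
+ *           rd_kafka_topic_partition_list_new(1);
+ *   rd_kafka_topic_partition_list_add(topics, "^example\\..*",
+ *                                     RD_KAFKA_PARTITION_UA);
+ *   rd_kafka_subscribe(rk, topics);
+ *   rd_kafka_topic_partition_list_destroy(topics);
+ * @endcode
+ * The regex "^example\\..*" is a placeholder; every matching topic found in
+ * metadata ends up in rkcg_subscribed_topics.
+ */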
+
+
+
+/**
+ * @brief Handle Heartbeat response.
+ */
+void rd_kafka_cgrp_handle_Heartbeat (rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ void *opaque) {
+ rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
+ const int log_decode_errors = LOG_ERR;
+ int16_t ErrorCode = 0;
+ int actions;
+
+ if (err) {
+ ErrorCode = err;
+ goto err;
+ }
+
+ rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
+
+err:
+ actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request,
+ RD_KAFKA_ERR_ACTION_END);
+
+ rd_dassert(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT);
+ rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
+
+ if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
+ /* Re-query for coordinator */
+ rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
+ RD_KAFKA_OP_COORD_QUERY, ErrorCode);
+ /* Schedule a retry */
+ if (ErrorCode != RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP) {
+ rd_kafka_buf_keep(request);
+ rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
+ rd_kafka_broker_buf_retry(request->rkbuf_rkb, request);
+ }
+ return;
+ }
+
+ if (ErrorCode != 0 && ErrorCode != RD_KAFKA_RESP_ERR__DESTROY)
+ rd_kafka_cgrp_handle_heartbeat_error(rkcg, ErrorCode);
+
+ return;
+
+ err_parse:
+ ErrorCode = rkbuf->rkbuf_err;
+ goto err;
+}
+
+
+
+/**
+ * @brief Send Heartbeat
+ */
+static void rd_kafka_cgrp_heartbeat (rd_kafka_cgrp_t *rkcg,
+ rd_kafka_broker_t *rkb) {
+ /* Skip heartbeat if we have one in transit */
+ if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT)
+ return;
+
+ rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
+ rd_kafka_HeartbeatRequest(rkb, rkcg->rkcg_group_id,
+ rkcg->rkcg_generation_id,
+ rkcg->rkcg_member_id,
+ RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
+ rd_kafka_cgrp_handle_Heartbeat, NULL);
+}
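+
+/**
+ * Illustrative sketch (not part of upstream librdkafka): how often
+ * rd_kafka_cgrp_heartbeat() is sent, and how long the broker waits before
+ * evicting the member, is governed by two configuration properties, e.g.:
+ * @code
+ *   char errstr[512];
+ *   rd_kafka_conf_set(conf, "session.timeout.ms", "30000",
+ *                     errstr, sizeof(errstr));
+ *   rd_kafka_conf_set(conf, "heartbeat.interval.ms", "3000",
+ *                     errstr, sizeof(errstr));
+ * @endcode
+ * The values are placeholders; heartbeat.interval.ms should be well below
+ * session.timeout.ms so several heartbeats fit within one session.
+ */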
+
+/**
+ * Cgrp is now terminated: decommission it and signal back to application.
+ */
+static void rd_kafka_cgrp_terminated (rd_kafka_cgrp_t *rkcg) {
+
+ rd_kafka_assert(NULL, rkcg->rkcg_wait_unassign_cnt == 0);
+ rd_kafka_assert(NULL, rkcg->rkcg_wait_commit_cnt == 0);
+ rd_kafka_assert(NULL, !(rkcg->rkcg_flags&RD_KAFKA_CGRP_F_WAIT_UNASSIGN));
+ rd_kafka_assert(NULL, rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM);
+
+ rd_kafka_timer_stop(&rkcg->rkcg_rk->rk_timers,
+ &rkcg->rkcg_offset_commit_tmr, 1/*lock*/);
+
+ rd_kafka_q_purge(rkcg->rkcg_wait_coord_q);
+
+ /* Disable and empty ops queue since there will be no
+ * (broker) thread serving it anymore after the unassign_broker
+ * below.
+ * This prevents a hang on destroy where responses are enqueued on rkcg_ops
+ * without anything serving the queue. */
+ rd_kafka_q_disable(rkcg->rkcg_ops);
+ rd_kafka_q_purge(rkcg->rkcg_ops);
+
+ if (rkcg->rkcg_rkb)
+ rd_kafka_cgrp_unassign_broker(rkcg);
+
+ if (rkcg->rkcg_reply_rko) {
+ /* Signal back to application. */
+ rd_kafka_replyq_enq(&rkcg->rkcg_reply_rko->rko_replyq,
+ rkcg->rkcg_reply_rko, 0);
+ rkcg->rkcg_reply_rko = NULL;
+ }
+}
+
+
+/**
+ * If a cgrp is terminating and all outstanding ops are now finished
+ * then progress to final termination and return 1.
+ * Else returns 0.
+ */
+static RD_INLINE int rd_kafka_cgrp_try_terminate (rd_kafka_cgrp_t *rkcg) {
+
+ if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM)
+ return 1;
+
+ if (likely(!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)))
+ return 0;
+
+ /* Check if wait-coord queue has timed out. */
+ if (rd_kafka_q_len(rkcg->rkcg_wait_coord_q) > 0 &&
+ rkcg->rkcg_ts_terminate +
+ (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000) <
+ rd_clock()) {
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM",
+ "Group \"%s\": timing out %d op(s) in "
+ "wait-for-coordinator queue",
+ rkcg->rkcg_group_id->str,
+ rd_kafka_q_len(rkcg->rkcg_wait_coord_q));
+ rd_kafka_q_disable(rkcg->rkcg_wait_coord_q);
+ if (rd_kafka_q_concat(rkcg->rkcg_ops,
+ rkcg->rkcg_wait_coord_q) == -1) {
+ /* ops queue shut down, purge coord queue */
+ rd_kafka_q_purge(rkcg->rkcg_wait_coord_q);
+ }
+ }
+
+ if (!RD_KAFKA_CGRP_WAIT_REBALANCE_CB(rkcg) &&
+ rd_list_empty(&rkcg->rkcg_toppars) &&
+ rkcg->rkcg_wait_unassign_cnt == 0 &&
+ rkcg->rkcg_wait_commit_cnt == 0 &&
+ !(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN)) {
+ /* Since we might be deep down in a 'rko' handler
+ * called from cgrp_op_serve() we can't call terminated()
+ * directly since it will decommission the rkcg_ops queue
+ * that might be locked by intermediate functions.
+ * Instead set the TERM state and let the cgrp terminate
+ * at its own discretion. */
+ rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_TERM);
+ return 1;
+ } else {
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM",
+ "Group \"%s\": "
+ "waiting for %s%d toppar(s), %d unassignment(s), "
+ "%d commit(s)%s (state %s, join-state %s) "
+ "before terminating",
+ rkcg->rkcg_group_id->str,
+ RD_KAFKA_CGRP_WAIT_REBALANCE_CB(rkcg) ?
+ "rebalance_cb, ": "",
+ rd_list_cnt(&rkcg->rkcg_toppars),
+ rkcg->rkcg_wait_unassign_cnt,
+ rkcg->rkcg_wait_commit_cnt,
+ (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN)?
+ ", wait-unassign flag," : "",
+ rd_kafka_cgrp_state_names[rkcg->rkcg_state],
+ rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
+ return 0;
+ }
+}
+
+
+/**
+ * Add partition to this cgrp management
+ */
+static void rd_kafka_cgrp_partition_add (rd_kafka_cgrp_t *rkcg,
+ rd_kafka_toppar_t *rktp) {
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP,"PARTADD",
+ "Group \"%s\": add %s [%"PRId32"]",
+ rkcg->rkcg_group_id->str,
+ rktp->rktp_rkt->rkt_topic->str,
+ rktp->rktp_partition);
+
+ rd_kafka_assert(rkcg->rkcg_rk, !rktp->rktp_s_for_cgrp);
+ rktp->rktp_s_for_cgrp = rd_kafka_toppar_keep(rktp);
+ rd_list_add(&rkcg->rkcg_toppars, rktp->rktp_s_for_cgrp);
+}
+
+/**
+ * Remove partition from this cgrp management
+ */
+static void rd_kafka_cgrp_partition_del (rd_kafka_cgrp_t *rkcg,
+ rd_kafka_toppar_t *rktp) {
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL",
+ "Group \"%s\": delete %s [%"PRId32"]",
+ rkcg->rkcg_group_id->str,
+ rktp->rktp_rkt->rkt_topic->str,
+ rktp->rktp_partition);
+ rd_kafka_assert(rkcg->rkcg_rk, rktp->rktp_s_for_cgrp);
+
+ rd_list_remove(&rkcg->rkcg_toppars, rktp->rktp_s_for_cgrp);
+ rd_kafka_toppar_destroy(rktp->rktp_s_for_cgrp);
+ rktp->rktp_s_for_cgrp = NULL;
+
+ rd_kafka_cgrp_try_terminate(rkcg);
+}
+
+
+
+/**
+ * Reply for OffsetFetch from call below.
+ */
+static void rd_kafka_cgrp_offsets_fetch_response (
+ rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *reply,
+ rd_kafka_buf_t *request,
+ void *opaque) {
+ rd_kafka_topic_partition_list_t *offsets = opaque;
+ rd_kafka_cgrp_t *rkcg;
+
+ if (err == RD_KAFKA_RESP_ERR__DESTROY) {
+ /* Termination, quick cleanup. */
+ rd_kafka_topic_partition_list_destroy(offsets);
+ return;
+ }
+
+ rkcg = rd_kafka_cgrp_get(rk);
+
+ if (rd_kafka_buf_version_outdated(request, rkcg->rkcg_version)) {
+ rd_kafka_topic_partition_list_destroy(offsets);
+ return;
+ }
+
+ rd_kafka_topic_partition_list_log(rk, "OFFSETFETCH", offsets);
+ /* If all partitions already had usable offsets then there
+ * was no request sent and thus no reply; the offsets list is
+ * good to go. */
+ if (reply)
+ err = rd_kafka_handle_OffsetFetch(rk, rkb, err,
+ reply, request, offsets,
+ 1/* Update toppars */);
+ if (err) {
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "OFFSET",
+ "Offset fetch error: %s",
+ rd_kafka_err2str(err));
+
+ if (err != RD_KAFKA_RESP_ERR__WAIT_COORD)
+ rd_kafka_q_op_err(rkcg->rkcg_q,
+ RD_KAFKA_OP_CONSUMER_ERR, err, 0,
+ NULL, 0,
+ "Failed to fetch offsets: %s",
+ rd_kafka_err2str(err));
+ } else {
+ if (RD_KAFKA_CGRP_CAN_FETCH_START(rkcg))
+ rd_kafka_cgrp_partitions_fetch_start(
+ rkcg, offsets, 1 /* usable offsets */);
+ else
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "OFFSET",
+ "Group \"%.*s\": "
+ "ignoring Offset fetch response for "
+ "%d partition(s): in state %s",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+ offsets ? offsets->cnt : -1,
+ rd_kafka_cgrp_join_state_names[
+ rkcg->rkcg_join_state]);
+ }
+
+ rd_kafka_topic_partition_list_destroy(offsets);
+}
+
+/**
+ * Fetch offsets for a list of partitions
+ */
+static void
+rd_kafka_cgrp_offsets_fetch (rd_kafka_cgrp_t *rkcg, rd_kafka_broker_t *rkb,
+ rd_kafka_topic_partition_list_t *offsets) {
+ rd_kafka_topic_partition_list_t *use_offsets;
+
+ /* Make a copy of the offsets */
+ use_offsets = rd_kafka_topic_partition_list_copy(offsets);
+
+ if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP || !rkb)
+ rd_kafka_cgrp_offsets_fetch_response(
+ rkcg->rkcg_rk, rkb, RD_KAFKA_RESP_ERR__WAIT_COORD,
+ NULL, NULL, use_offsets);
+ else {
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "OFFSET",
+ "Fetch %d offsets with v%d",
+ use_offsets->cnt, rkcg->rkcg_version);
+ rd_kafka_OffsetFetchRequest(
+ rkb, 1, offsets,
+ RD_KAFKA_REPLYQ(rkcg->rkcg_ops, rkcg->rkcg_version),
+ rd_kafka_cgrp_offsets_fetch_response,
+ use_offsets);
+ }
+
+}
+
+
+/**
+ * Start fetching all partitions in 'assignment' (async)
+ */
+static void
+rd_kafka_cgrp_partitions_fetch_start0 (rd_kafka_cgrp_t *rkcg,
+ rd_kafka_topic_partition_list_t
+ *assignment, int usable_offsets,
+ int line) {
+ int i;
+
+ /* If waiting for offsets to commit we need that to finish first
+ * before starting fetchers (which might fetch those stored offsets).*/
+ if (rkcg->rkcg_wait_commit_cnt > 0) {
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "FETCHSTART",
+ "Group \"%s\": not starting fetchers "
+ "for %d assigned partition(s) in join-state %s "
+ "(usable_offsets=%s, v%"PRId32", line %d): "
+ "waiting for %d commit(s)",
+ rkcg->rkcg_group_id->str, assignment->cnt,
+ rd_kafka_cgrp_join_state_names[rkcg->
+ rkcg_join_state],
+ usable_offsets ? "yes":"no",
+ rkcg->rkcg_version, line,
+ rkcg->rkcg_wait_commit_cnt);
+ return;
+ }
+
+ rd_kafka_cgrp_version_new_barrier(rkcg);
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "FETCHSTART",
+ "Group \"%s\": starting fetchers for %d assigned "
+ "partition(s) in join-state %s "
+ "(usable_offsets=%s, v%"PRId32", line %d)",
+ rkcg->rkcg_group_id->str, assignment->cnt,
+ rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
+ usable_offsets ? "yes":"no",
+ rkcg->rkcg_version, line);
+
+ rd_kafka_topic_partition_list_log(rkcg->rkcg_rk,
+ "FETCHSTART", assignment);
+
+ if (assignment->cnt == 0)
+ return;
+
+ /* Check if offsets are really unusable; this is to catch the
+ * case where the entire assignment has absolute offsets set which
+ * should make us skip offset lookups. */
+ if (!usable_offsets)
+ usable_offsets =
+ rd_kafka_topic_partition_list_count_abs_offsets(
+ assignment) == assignment->cnt;
+
+ if (!usable_offsets &&
+ rkcg->rkcg_rk->rk_conf.offset_store_method ==
+ RD_KAFKA_OFFSET_METHOD_BROKER) {
+
+ /* Fetch offsets for all assigned partitions */
+ rd_kafka_cgrp_offsets_fetch(rkcg, rkcg->rkcg_rkb, assignment);
+
+ } else {
+ rd_kafka_cgrp_set_join_state(rkcg,
+ RD_KAFKA_CGRP_JOIN_STATE_STARTED);
+
+ for (i = 0 ; i < assignment->cnt ; i++) {
+ rd_kafka_topic_partition_t *rktpar =
+ &assignment->elems[i];
+ shptr_rd_kafka_toppar_t *s_rktp = rktpar->_private;
+ rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
+
+ if (!rktp->rktp_assigned) {
+ rktp->rktp_assigned = 1;
+ rkcg->rkcg_assigned_cnt++;
+
+ /* Start fetcher for partition and
+ * forward partition's fetchq to
+ * consumer groups queue. */
+ rd_kafka_toppar_op_fetch_start(
+ rktp, rktpar->offset,
+ rkcg->rkcg_q, RD_KAFKA_NO_REPLYQ);
+ } else {
+ int64_t offset;
+ /* Fetcher already started,
+ * just do seek to update offset */
+ rd_kafka_toppar_lock(rktp);
+ if (rktpar->offset < rktp->rktp_app_offset)
+ offset = rktp->rktp_app_offset;
+ else
+ offset = rktpar->offset;
+ rd_kafka_toppar_unlock(rktp);
+ rd_kafka_toppar_op_seek(rktp, offset,
+ RD_KAFKA_NO_REPLYQ);
+ }
+ }
+ }
+
+ rd_kafka_assert(NULL, rkcg->rkcg_assigned_cnt <=
+ (rkcg->rkcg_assignment ? rkcg->rkcg_assignment->cnt : 0));
+}
+
+
+
+
+
+/**
+ * @brief Defer offset commit (rko) until coordinator is available.
+ *
+ * @returns 1 if the rko was deferred or 0 if the defer queue is disabled
+ * or rko already deferred.
+ */
+static int rd_kafka_cgrp_defer_offset_commit (rd_kafka_cgrp_t *rkcg,
+ rd_kafka_op_t *rko,
+ const char *reason) {
+
+ /* wait_coord_q is disabled session.timeout.ms after
+ * group close() has been initiated. */
+ if (rko->rko_u.offset_commit.ts_timeout != 0 ||
+ !rd_kafka_q_ready(rkcg->rkcg_wait_coord_q))
+ return 0;
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COMMIT",
+ "Group \"%s\": "
+ "unable to OffsetCommit in state %s: %s: "
+ "coordinator (%s) is unavailable: "
+ "retrying later",
+ rkcg->rkcg_group_id->str,
+ rd_kafka_cgrp_state_names[rkcg->rkcg_state],
+ reason,
+ rkcg->rkcg_rkb ?
+ rd_kafka_broker_name(rkcg->rkcg_rkb) :
+ "none");
+
+ rko->rko_flags |= RD_KAFKA_OP_F_REPROCESS;
+ rko->rko_u.offset_commit.ts_timeout = rd_clock() +
+ (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms
+ * 1000);
+ rd_kafka_q_enq(rkcg->rkcg_wait_coord_q, rko);
+
+ return 1;
+}
+
+
+/**
+ * @brief Handler of OffsetCommit response (after parsing).
+ * @remark \p offsets may be NULL if \p err is set
+ * @returns the number of partitions with errors encountered
+ */
+static int
+rd_kafka_cgrp_handle_OffsetCommit (rd_kafka_cgrp_t *rkcg,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t
+ *offsets) {
+ int i;
+ int errcnt = 0;
+
+ if (!err) {
+ /* Update toppars' committed offset */
+ for (i = 0 ; i < offsets->cnt ; i++) {
+ rd_kafka_topic_partition_t *rktpar =&offsets->elems[i];
+ shptr_rd_kafka_toppar_t *s_rktp;
+ rd_kafka_toppar_t *rktp;
+
+ if (unlikely(rktpar->err)) {
+ rd_kafka_dbg(rkcg->rkcg_rk, TOPIC,
+ "OFFSET",
+ "OffsetCommit failed for "
+ "%s [%"PRId32"] at offset "
+ "%"PRId64": %s",
+ rktpar->topic, rktpar->partition,
+ rktpar->offset,
+ rd_kafka_err2str(rktpar->err));
+ errcnt++;
+ continue;
+ } else if (unlikely(rktpar->offset < 0))
+ continue;
+
+ s_rktp = rd_kafka_topic_partition_list_get_toppar(
+ rkcg->rkcg_rk, rktpar);
+ if (!s_rktp)
+ continue;
+
+ rktp = rd_kafka_toppar_s2i(s_rktp);
+ rd_kafka_toppar_lock(rktp);
+ rktp->rktp_committed_offset = rktpar->offset;
+ rd_kafka_toppar_unlock(rktp);
+
+ rd_kafka_toppar_destroy(s_rktp);
+ }
+ }
+
+ if (rd_kafka_cgrp_try_terminate(rkcg))
+ return errcnt; /* terminated */
+
+ if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN)
+ rd_kafka_cgrp_check_unassign_done(rkcg,
+ "OffsetCommit done");
+
+ return errcnt;
+}
+
+
+
+
+/**
+ * Handle OffsetCommitResponse
+ * Takes the original 'rko' as opaque argument.
+ * @remark \p rkb, rkbuf, and request may be NULL in a number of
+ * error cases (e.g., _NO_OFFSET, _WAIT_COORD)
+ */
+static void rd_kafka_cgrp_op_handle_OffsetCommit (rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ void *opaque) {
+ rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
+ rd_kafka_op_t *rko_orig = opaque;
+ rd_kafka_topic_partition_list_t *offsets =
+ rko_orig->rko_u.offset_commit.partitions; /* maybe NULL */
+ int errcnt;
+ int offset_commit_cb_served = 0;
+
+ RD_KAFKA_OP_TYPE_ASSERT(rko_orig, RD_KAFKA_OP_OFFSET_COMMIT);
+
+ if (rd_kafka_buf_version_outdated(request, rkcg->rkcg_version))
+ err = RD_KAFKA_RESP_ERR__DESTROY;
+
+ err = rd_kafka_handle_OffsetCommit(rk, rkb, err, rkbuf,
+ request, offsets);
+
+ if (rkb)
+ rd_rkb_dbg(rkb, CGRP, "COMMIT",
+ "OffsetCommit for %d partition(s): %s: returned: %s",
+ offsets ? offsets->cnt : -1,
+ rko_orig->rko_u.offset_commit.reason,
+ rd_kafka_err2str(err));
+ else
+ rd_kafka_dbg(rk, CGRP, "COMMIT",
+ "OffsetCommit for %d partition(s): %s: returned: %s",
+ offsets ? offsets->cnt : -1,
+ rko_orig->rko_u.offset_commit.reason,
+ rd_kafka_err2str(err));
+
+ if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
+ return; /* Retrying */
+ else if (err == RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP ||
+ err == RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE) {
+
+ /* future-proofing, see timeout_scan(). */
+ rd_kafka_assert(NULL, err != RD_KAFKA_RESP_ERR__WAIT_COORD);
+
+ if (rd_kafka_cgrp_defer_offset_commit(rkcg, rko_orig,
+ rd_kafka_err2str(err)))
+ return;
+
+ /* FALLTHRU and error out */
+ }
+
+ rd_kafka_assert(NULL, rkcg->rkcg_wait_commit_cnt > 0);
+ rkcg->rkcg_wait_commit_cnt--;
+
+ if (err == RD_KAFKA_RESP_ERR__DESTROY ||
+ (err == RD_KAFKA_RESP_ERR__NO_OFFSET &&
+ rko_orig->rko_u.offset_commit.silent_empty)) {
+ rd_kafka_op_destroy(rko_orig);
+ rd_kafka_cgrp_check_unassign_done(
+ rkcg,
+ err == RD_KAFKA_RESP_ERR__DESTROY ?
+ "OffsetCommit done (__DESTROY)" :
+ "OffsetCommit done (__NO_OFFSET)");
+ return;
+ }
+
+ /* Call on_commit interceptors */
+ if (err != RD_KAFKA_RESP_ERR__NO_OFFSET &&
+ err != RD_KAFKA_RESP_ERR__DESTROY &&
+ offsets && offsets->cnt > 0)
+ rd_kafka_interceptors_on_commit(rk, offsets, err);
+
+
+ /* If no special callback is set but an offset_commit_cb has
+ * been set in conf then post an event for the latter. */
+ if (!rko_orig->rko_u.offset_commit.cb && rk->rk_conf.offset_commit_cb) {
+ rd_kafka_op_t *rko_reply = rd_kafka_op_new_reply(rko_orig, err);
+
+ rd_kafka_op_set_prio(rko_reply, RD_KAFKA_PRIO_HIGH);
+
+ if (offsets)
+ rko_reply->rko_u.offset_commit.partitions =
+ rd_kafka_topic_partition_list_copy(offsets);
+
+ rko_reply->rko_u.offset_commit.cb =
+ rk->rk_conf.offset_commit_cb;
+ rko_reply->rko_u.offset_commit.opaque = rk->rk_conf.opaque;
+
+ rd_kafka_q_enq(rk->rk_rep, rko_reply);
+ offset_commit_cb_served++;
+ }
+
+
+ /* Enqueue reply to requester's queue, if any. */
+ if (rko_orig->rko_replyq.q) {
+ rd_kafka_op_t *rko_reply = rd_kafka_op_new_reply(rko_orig, err);
+
+ rd_kafka_op_set_prio(rko_reply, RD_KAFKA_PRIO_HIGH);
+
+ /* Copy offset & partitions & callbacks to reply op */
+ rko_reply->rko_u.offset_commit = rko_orig->rko_u.offset_commit;
+ if (offsets)
+ rko_reply->rko_u.offset_commit.partitions =
+ rd_kafka_topic_partition_list_copy(offsets);
+ if (rko_reply->rko_u.offset_commit.reason)
+ rko_reply->rko_u.offset_commit.reason =
+ rd_strdup(rko_reply->rko_u.offset_commit.reason);
+
+ rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko_reply, 0);
+ offset_commit_cb_served++;
+ }
+
+ errcnt = rd_kafka_cgrp_handle_OffsetCommit(rkcg, err, offsets);
+
+ if (!offset_commit_cb_served &&
+ err != RD_KAFKA_RESP_ERR_NO_ERROR &&
+ err != RD_KAFKA_RESP_ERR__NO_OFFSET) {
+ /* If there is no callback or handler for this (auto)
+ * commit then raise an error to the application (#1043) */
+ char tmp[512];
+
+ rd_kafka_topic_partition_list_str(
+ offsets, tmp, sizeof(tmp),
+ /*no partition-errs if a global error*/
+ RD_KAFKA_FMT_F_OFFSET |
+ (err ? 0 : RD_KAFKA_FMT_F_ONLY_ERR));
+
+ rd_kafka_log(rkcg->rkcg_rk, LOG_WARNING, "COMMITFAIL",
+ "Offset commit (%s) failed "
+ "for %d/%d partition(s): "
+ "%s%s%s",
+ rko_orig->rko_u.offset_commit.reason,
+ err ? offsets->cnt : errcnt, offsets->cnt,
+ err ? rd_kafka_err2str(err) : "",
+ err ? ": " : "",
+ tmp);
+ }
+
+ rd_kafka_op_destroy(rko_orig);
+}
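+
+/**
+ * Illustrative sketch (not part of upstream librdkafka): the
+ * rk_conf.offset_commit_cb consulted above is the callback an application
+ * registers to observe (auto) commit results, roughly:
+ * @code
+ *   static void offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
+ *                                 rd_kafka_topic_partition_list_t *offsets,
+ *                                 void *opaque) {
+ *           if (err)
+ *                   fprintf(stderr, "Commit failed: %s\n",
+ *                           rd_kafka_err2str(err));
+ *   }
+ *   // registered on the conf object before rd_kafka_new():
+ *   //   rd_kafka_conf_set_offset_commit_cb(conf, offset_commit_cb);
+ * @endcode
+ * With no such callback and no per-commit handler, commit failures are
+ * instead logged via the COMMITFAIL warning above (#1043).
+ */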
+
+
+static size_t rd_kafka_topic_partition_has_absolute_offset (
+ const rd_kafka_topic_partition_t *rktpar, void *opaque) {
+ return rktpar->offset >= 0 ? 1 : 0;
+}
+
+
+/**
+ * Commit a list of offsets.
+ * Reuse the originating 'rko' for the async reply.
+ * 'rko->rko_u.offset_commit.partitions' should either be NULL (to commit current assignment) or
+ * a proper topic_partition_list_t with offsets to commit.
+ * The offset list will be altered.
+ *
+ * \p rko...silent_empty: if there are no offsets to commit, bail out
+ * silently without posting an op on the reply queue.
+ * \p set_offsets: set offsets in rko->rko_u.offset_commit.partitions
+ *
+ * \p op_version: cgrp's op version to use (or 0)
+ *
+ * Locality: cgrp thread
+ */
+static void rd_kafka_cgrp_offsets_commit (rd_kafka_cgrp_t *rkcg,
+ rd_kafka_op_t *rko,
+ int set_offsets,
+ const char *reason,
+ int op_version) {
+ rd_kafka_topic_partition_list_t *offsets;
+ rd_kafka_resp_err_t err;
+ int valid_offsets = 0;
+
+ /* If offsets is NULL we shall use the current assignment. */
+ if (!rko->rko_u.offset_commit.partitions && rkcg->rkcg_assignment)
+ rko->rko_u.offset_commit.partitions =
+ rd_kafka_topic_partition_list_copy(
+ rkcg->rkcg_assignment);
+
+ offsets = rko->rko_u.offset_commit.partitions;
+
+ if (offsets) {
+ /* Set the offsets to commit */
+ if (set_offsets)
+ rd_kafka_topic_partition_list_set_offsets(
+ rkcg->rkcg_rk, rko->rko_u.offset_commit.partitions, 1,
+ RD_KAFKA_OFFSET_INVALID/* def */,
+ 1 /* is commit */);
+
+ /* Check the number of valid offsets to commit. */
+ valid_offsets = (int)rd_kafka_topic_partition_list_sum(
+ offsets,
+ rd_kafka_topic_partition_has_absolute_offset, NULL);
+ }
+
+ if (!(rko->rko_flags & RD_KAFKA_OP_F_REPROCESS)) {
+ /* wait_commit_cnt has already been increased for
+ * reprocessed ops. */
+ rkcg->rkcg_wait_commit_cnt++;
+ }
+
+ if (!valid_offsets) {
+ /* No valid offsets */
+ err = RD_KAFKA_RESP_ERR__NO_OFFSET;
+ goto err;
+ }
+
+ if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP || !rkcg->rkcg_rkb ||
+ rkcg->rkcg_rkb->rkb_source == RD_KAFKA_INTERNAL) {
+
+ if (rd_kafka_cgrp_defer_offset_commit(rkcg, rko, reason))
+ return;
+
+ err = RD_KAFKA_RESP_ERR__WAIT_COORD;
+
+ } else {
+ int r;
+
+ /* Send OffsetCommit */
+ r = rd_kafka_OffsetCommitRequest(
+ rkcg->rkcg_rkb, rkcg, 1, offsets,
+ RD_KAFKA_REPLYQ(rkcg->rkcg_ops, op_version),
+ rd_kafka_cgrp_op_handle_OffsetCommit, rko,
+ reason);
+
+ /* Must have valid offsets to commit if we get here */
+ rd_kafka_assert(NULL, r != 0);
+
+ return;
+ }
+
+
+
+ err:
+ /* Propagate error to whoever wanted offset committed. */
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COMMIT",
+ "OffsetCommit internal error: %s", rd_kafka_err2str(err));
+ rd_kafka_cgrp_op_handle_OffsetCommit(rkcg->rkcg_rk, NULL, err,
+ NULL, NULL, rko);
+}
+
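+/*
+ * One application-level path into the commit flow above is the public
+ * rd_kafka_commit() call; a minimal sketch, assuming an initialized
+ * consumer handle `rk` (passing NULL commits the current assignment,
+ * matching the behaviour described above):
+ *
+ *   rd_kafka_resp_err_t cerr = rd_kafka_commit(rk, NULL, 1);  // async
+ *   if (cerr)
+ *           fprintf(stderr, "commit failed: %s\n", rd_kafka_err2str(cerr));
+ */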
+
+/**
+ * Commit offsets for all assigned partitions.
+ */
+static void
+rd_kafka_cgrp_assigned_offsets_commit (rd_kafka_cgrp_t *rkcg,
+ const rd_kafka_topic_partition_list_t
+ *offsets, const char *reason) {
+ rd_kafka_op_t *rko;
+
+ rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_COMMIT);
+ rko->rko_u.offset_commit.reason = rd_strdup(reason);
+ if (rkcg->rkcg_rk->rk_conf.enabled_events & RD_KAFKA_EVENT_OFFSET_COMMIT) {
+ rd_kafka_op_set_replyq(rko, rkcg->rkcg_rk->rk_rep, 0);
+ rko->rko_u.offset_commit.cb =
+ rkcg->rkcg_rk->rk_conf.offset_commit_cb; /*maybe NULL*/
+ rko->rko_u.offset_commit.opaque = rkcg->rkcg_rk->rk_conf.opaque;
+ }
+ /* NULL partitions means current assignment */
+ if (offsets)
+ rko->rko_u.offset_commit.partitions =
+ rd_kafka_topic_partition_list_copy(offsets);
+ rko->rko_u.offset_commit.silent_empty = 1;
+ rd_kafka_cgrp_offsets_commit(rkcg, rko, 1/* set offsets */, reason,
+ rkcg->rkcg_version);
+}
+
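+/*
+ * The RD_KAFKA_EVENT_OFFSET_COMMIT branch above only fires if the
+ * application has opted in to offset-commit events; a minimal sketch of
+ * that opt-in (assuming a `conf` object):
+ *
+ *   rd_kafka_conf_set_events(conf, RD_KAFKA_EVENT_OFFSET_COMMIT);
+ */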
+
+/**
+ * auto.commit.interval.ms commit timer callback.
+ *
+ * Trigger a group offset commit.
+ *
+ * Locality: rdkafka main thread
+ */
+static void rd_kafka_cgrp_offset_commit_tmr_cb (rd_kafka_timers_t *rkts,
+ void *arg) {
+ rd_kafka_cgrp_t *rkcg = arg;
+
+ rd_kafka_cgrp_assigned_offsets_commit(rkcg, NULL,
+ "cgrp auto commit timer");
+}
+
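+/*
+ * The interval for this timer is driven by the `auto.commit.interval.ms`
+ * configuration property; a minimal sketch of the relevant consumer
+ * configuration (assuming a `conf` object and an `errstr` buffer):
+ *
+ *   char errstr[512];
+ *   rd_kafka_conf_set(conf, "enable.auto.commit", "true",
+ *                     errstr, sizeof(errstr));
+ *   rd_kafka_conf_set(conf, "auto.commit.interval.ms", "5000",
+ *                     errstr, sizeof(errstr));
+ */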
+
+
+
+/**
+ * Call when all unassign operations are done to transition to the next state
+ */
+static void rd_kafka_cgrp_unassign_done (rd_kafka_cgrp_t *rkcg,
+ const char *reason) {
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNASSIGN",
+ "Group \"%s\": unassign done in state %s (join state %s): "
+ "%s: %s",
+ rkcg->rkcg_group_id->str,
+ rd_kafka_cgrp_state_names[rkcg->rkcg_state],
+ rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
+ rkcg->rkcg_assignment ?
+ "with new assignment" : "without new assignment",
+ reason);
+
+ if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN) {
+ rd_kafka_cgrp_leave(rkcg, 1/*ignore response*/);
+ rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN;
+ }
+
+ if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN) {
+ rd_kafka_cgrp_try_terminate(rkcg);
+ return;
+ }
+
+ if (rkcg->rkcg_assignment) {
+ rd_kafka_cgrp_set_join_state(rkcg,
+ RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED);
+ if (RD_KAFKA_CGRP_CAN_FETCH_START(rkcg))
+ rd_kafka_cgrp_partitions_fetch_start(
+ rkcg, rkcg->rkcg_assignment, 0);
+ } else {
+ rd_kafka_cgrp_set_join_state(rkcg,
+ RD_KAFKA_CGRP_JOIN_STATE_INIT);
+ }
+
+ rd_kafka_cgrp_try_terminate(rkcg);
+}
+
+
+/**
+ * Checks if the current unassignment is done and if so
+ * calls .._done().
+ * Else does nothing.
+ */
+static void rd_kafka_cgrp_check_unassign_done (rd_kafka_cgrp_t *rkcg,
+ const char *reason) {
+ if (rkcg->rkcg_wait_unassign_cnt > 0 ||
+ rkcg->rkcg_assigned_cnt > 0 ||
+ rkcg->rkcg_wait_commit_cnt > 0 ||
+ rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN) {
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNASSIGN",
+ "Unassign not done yet "
+ "(%d wait_unassign, %d assigned, %d wait commit"
+ "%s): %s",
+ rkcg->rkcg_wait_unassign_cnt,
+ rkcg->rkcg_assigned_cnt,
+ rkcg->rkcg_wait_commit_cnt,
+ (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN)?
+ ", F_WAIT_UNASSIGN" : "", reason);
+ return;
+ }
+
+ rd_kafka_cgrp_unassign_done(rkcg, reason);
+}
+
+
+
+/**
+ * Remove existing assignment.
+ */
+static rd_kafka_resp_err_t
+rd_kafka_cgrp_unassign (rd_kafka_cgrp_t *rkcg) {
+ int i;
+ rd_kafka_topic_partition_list_t *old_assignment;
+
+ rd_kafka_cgrp_set_join_state(rkcg,
+ RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN);
+
+ rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WAIT_UNASSIGN;
+ old_assignment = rkcg->rkcg_assignment;
+ if (!old_assignment) {
+ rd_kafka_cgrp_check_unassign_done(
+ rkcg, "unassign (no previous assignment)");
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+ }
+ rkcg->rkcg_assignment = NULL;
+
+ rd_kafka_cgrp_version_new_barrier(rkcg);
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNASSIGN",
+ "Group \"%s\": unassigning %d partition(s) (v%"PRId32")",
+ rkcg->rkcg_group_id->str, old_assignment->cnt,
+ rkcg->rkcg_version);
+
+ if (rkcg->rkcg_rk->rk_conf.offset_store_method ==
+ RD_KAFKA_OFFSET_METHOD_BROKER &&
+ rkcg->rkcg_rk->rk_conf.enable_auto_commit) {
+ /* Commit all offsets for all assigned partitions to broker */
+ rd_kafka_cgrp_assigned_offsets_commit(rkcg, old_assignment,
+ "unassign");
+ }
+
+ for (i = 0 ; i < old_assignment->cnt ; i++) {
+ rd_kafka_topic_partition_t *rktpar;
+ shptr_rd_kafka_toppar_t *s_rktp;
+ rd_kafka_toppar_t *rktp;
+
+ rktpar = &old_assignment->elems[i];
+ s_rktp = rktpar->_private;
+ rktp = rd_kafka_toppar_s2i(s_rktp);
+
+ if (rktp->rktp_assigned) {
+ rd_kafka_toppar_op_fetch_stop(
+ rktp, RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0));
+ rkcg->rkcg_wait_unassign_cnt++;
+ }
+
+ rd_kafka_toppar_lock(rktp);
+ rd_kafka_toppar_desired_del(rktp);
+ rd_kafka_toppar_unlock(rktp);
+ }
+
+ /* Resume partition consumption. */
+ rd_kafka_toppars_pause_resume(rkcg->rkcg_rk, 0/*resume*/,
+ RD_KAFKA_TOPPAR_F_LIB_PAUSE,
+ old_assignment);
+
+ rd_kafka_topic_partition_list_destroy(old_assignment);
+
+ rd_kafka_cgrp_check_unassign_done(rkcg, "unassign");
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+/**
+ * Set new atomic partition assignment
+ * May update \p assignment but will not hold on to it.
+ */
+static void
+rd_kafka_cgrp_assign (rd_kafka_cgrp_t *rkcg,
+ rd_kafka_topic_partition_list_t *assignment) {
+ int i;
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
+ "Group \"%s\": new assignment of %d partition(s) "
+ "in join state %s",
+ rkcg->rkcg_group_id->str,
+ assignment ? assignment->cnt : 0,
+ rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
+
+ /* Get toppar object for each partition.
+ * This is to make sure the rktp stays alive during unassign(). */
+ for (i = 0 ; assignment && i < assignment->cnt ; i++) {
+ rd_kafka_topic_partition_t *rktpar;
+ shptr_rd_kafka_toppar_t *s_rktp;
+
+ rktpar = &assignment->elems[i];
+
+ /* Use existing toppar if set */
+ if (rktpar->_private)
+ continue;
+
+ s_rktp = rd_kafka_toppar_get2(rkcg->rkcg_rk,
+ rktpar->topic,
+ rktpar->partition,
+ 0/*no-ua*/, 1/*create-on-miss*/);
+ if (s_rktp)
+ rktpar->_private = s_rktp;
+ }
+
+ rd_kafka_cgrp_version_new_barrier(rkcg);
+
+ rd_kafka_wrlock(rkcg->rkcg_rk);
+ rkcg->rkcg_c.assignment_size = assignment ? assignment->cnt : 0;
+ rd_kafka_wrunlock(rkcg->rkcg_rk);
+
+
+ /* Remove existing assignment (async operation) */
+ if (rkcg->rkcg_assignment)
+ rd_kafka_cgrp_unassign(rkcg);
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
+ "Group \"%s\": assigning %d partition(s) in join state %s",
+ rkcg->rkcg_group_id->str, assignment ? assignment->cnt : 0,
+ rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
+
+
+ if (assignment) {
+ rkcg->rkcg_assignment =
+ rd_kafka_topic_partition_list_copy(assignment);
+
+ /* Mark partition(s) as desired */
+ for (i = 0 ; i < rkcg->rkcg_assignment->cnt ; i++) {
+ rd_kafka_topic_partition_t *rktpar =
+ &rkcg->rkcg_assignment->elems[i];
+ shptr_rd_kafka_toppar_t *s_rktp = rktpar->_private;
+ rd_kafka_toppar_t *rktp =
+ rd_kafka_toppar_s2i(s_rktp);
+ rd_kafka_toppar_lock(rktp);
+ rd_kafka_toppar_desired_add0(rktp);
+ rd_kafka_toppar_unlock(rktp);
+ }
+ }
+
+ if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN)
+ return;
+
+ rd_dassert(rkcg->rkcg_wait_unassign_cnt == 0);
+
+ rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED);
+
+ if (RD_KAFKA_CGRP_CAN_FETCH_START(rkcg) && rkcg->rkcg_assignment) {
+ /* No existing assignment that needs to be decommissioned,
+ * start partition fetchers right away */
+ rd_kafka_cgrp_partitions_fetch_start(
+ rkcg, rkcg->rkcg_assignment, 0);
+ }
+}
+
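+/*
+ * From the application's point of view the assignment applied above is
+ * normally supplied through the public rd_kafka_assign() call; a minimal
+ * sketch, assuming a consumer handle `rk` and a hypothetical topic
+ * "mytopic":
+ *
+ *   rd_kafka_topic_partition_list_t *parts =
+ *           rd_kafka_topic_partition_list_new(1);
+ *   rd_kafka_topic_partition_list_add(parts, "mytopic", 0);
+ *   rd_kafka_assign(rk, parts);
+ *   rd_kafka_topic_partition_list_destroy(parts);
+ */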
+
+
+
+/**
+ * Handle a rebalance-triggered partition assignment.
+ *
+ * If a rebalance_cb has been registered we enqueue an op for the app
+ * and let the app perform the actual assign() call.
+ * Otherwise we assign() directly from here.
+ *
+ * This provides the most flexibility, allowing the app to perform any
+ * operation it sees fit (e.g., offset writes or reads) before actually
+ * updating the assignment via assign().
+ */
+static void
+rd_kafka_cgrp_handle_assignment (rd_kafka_cgrp_t *rkcg,
+ rd_kafka_topic_partition_list_t *assignment) {
+
+ rd_kafka_rebalance_op(rkcg, RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
+ assignment, "new assignment");
+}
+
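+/*
+ * A typical application rebalance_cb completing the handshake described
+ * above simply forwards the partition list back to the client; a minimal
+ * sketch (registered with rd_kafka_conf_set_rebalance_cb()):
+ *
+ *   static void rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
+ *                             rd_kafka_topic_partition_list_t *partitions,
+ *                             void *opaque) {
+ *           if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
+ *                   rd_kafka_assign(rk, partitions);
+ *           else    // __REVOKE_PARTITIONS or error: drop the assignment
+ *                   rd_kafka_assign(rk, NULL);
+ *   }
+ */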
+
+/**
+ * Handle HeartbeatResponse errors.
+ *
+ * If an IllegalGeneration error code is returned in the
+ * HeartbeatResponse, it indicates that the coordinator has
+ * initiated a rebalance. The consumer then stops fetching data,
+ * commits offsets and sends a JoinGroupRequest to its coordinator
+ * broker. */
+void rd_kafka_cgrp_handle_heartbeat_error (rd_kafka_cgrp_t *rkcg,
+ rd_kafka_resp_err_t err) {
+
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
+ "Group \"%s\" heartbeat error response in "
+ "state %s (join state %s, %d partition(s) assigned): %s",
+ rkcg->rkcg_group_id->str,
+ rd_kafka_cgrp_state_names[rkcg->rkcg_state],
+ rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
+ rkcg->rkcg_assignment ? rkcg->rkcg_assignment->cnt : 0,
+ rd_kafka_err2str(err));
+
+ if (rkcg->rkcg_join_state <= RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC) {
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
+ "Heartbeat response: discarding outdated "
+ "request (now in join-state %s)",
+ rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
+ return;
+ }
+
+ switch (err)
+ {
+ case RD_KAFKA_RESP_ERR__DESTROY:
+ /* quick cleanup */
+ break;
+ case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP:
+ case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE:
+ case RD_KAFKA_RESP_ERR__TRANSPORT:
+ /* Remain in joined state and keep querying for coordinator */
+ rd_interval_expedite(&rkcg->rkcg_coord_query_intvl, 0);
+ break;
+
+ case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID:
+ rd_kafka_cgrp_set_member_id(rkcg, "");
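+ /* FALLTHRU */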
+ case RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS:
+ case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION:
+ default:
+ /* Just revert to INIT state if join state is active. */
+ if (rkcg->rkcg_join_state <
+ RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB ||
+ rkcg->rkcg_join_state ==
+ RD_KAFKA_CGRP
<TRUNCATED>