You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bq...@apache.org on 2017/11/16 21:15:50 UTC

[20/42] nifi-minifi-cpp git commit: MINIFICPP-274: PutKafka Processor

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_cgrp.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_cgrp.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_cgrp.c
new file mode 100644
index 0000000..4f052ad
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_cgrp.c
@@ -0,0 +1,3204 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rdkafka_int.h"
+#include "rdkafka_broker.h"
+#include "rdkafka_request.h"
+#include "rdkafka_topic.h"
+#include "rdkafka_partition.h"
+#include "rdkafka_assignor.h"
+#include "rdkafka_offset.h"
+#include "rdkafka_metadata.h"
+#include "rdkafka_cgrp.h"
+#include "rdkafka_interceptor.h"
+
+
+static void rd_kafka_cgrp_check_unassign_done (rd_kafka_cgrp_t *rkcg,
+                                               const char *reason);
+static void rd_kafka_cgrp_offset_commit_tmr_cb (rd_kafka_timers_t *rkts,
+                                                void *arg);
+static void rd_kafka_cgrp_assign (rd_kafka_cgrp_t *rkcg,
+				  rd_kafka_topic_partition_list_t *assignment);
+static rd_kafka_resp_err_t rd_kafka_cgrp_unassign (rd_kafka_cgrp_t *rkcg);
+static void
+rd_kafka_cgrp_partitions_fetch_start0 (rd_kafka_cgrp_t *rkcg,
+				       rd_kafka_topic_partition_list_t
+				       *assignment, int usable_offsets,
+				       int line);
+#define rd_kafka_cgrp_partitions_fetch_start(rkcg,assignment,usable_offsets) \
+	rd_kafka_cgrp_partitions_fetch_start0(rkcg,assignment,usable_offsets,\
+					      __LINE__)
+static rd_kafka_op_res_t
+rd_kafka_cgrp_op_serve (rd_kafka_t *rk, rd_kafka_q_t *rkq,
+                        rd_kafka_op_t *rko, rd_kafka_q_cb_type_t cb_type,
+                        void *opaque);
+
+static void rd_kafka_cgrp_group_leader_reset (rd_kafka_cgrp_t *rkcg,
+                                              const char *reason);
+
+/**
+ * @returns true if cgrp can start partition fetchers, which is true if
+ *          there is a subscription and the group is fully joined, or there
+ *          is no subscription (in which case the join state is irrelevant)
+ *          such as for an assign() without subscribe(). */
+#define RD_KAFKA_CGRP_CAN_FETCH_START(rkcg) \
+	((rkcg)->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED)
+
+/**
+ * @returns true if cgrp is waiting for a rebalance_cb to be handled by
+ *          the application.
+ */
+#define RD_KAFKA_CGRP_WAIT_REBALANCE_CB(rkcg)			\
+	((rkcg)->rkcg_join_state ==				\
+	 RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB ||	\
+	 (rkcg)->rkcg_join_state ==				\
+	 RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB)
+
+
+const char *rd_kafka_cgrp_state_names[] = {
+        "init",
+        "term",
+        "query-coord",
+        "wait-coord",
+        "wait-broker",
+        "wait-broker-transport",
+        "up"
+};
+
+const char *rd_kafka_cgrp_join_state_names[] = {
+        "init",
+        "wait-join",
+        "wait-metadata",
+        "wait-sync",
+        "wait-unassign",
+        "wait-assign-rebalance_cb",
+	"wait-revoke-rebalance_cb",
+        "assigned",
+	"started"
+};
+
+
+static void rd_kafka_cgrp_set_state (rd_kafka_cgrp_t *rkcg, int state) {
+        if ((int)rkcg->rkcg_state == state)
+                return;
+
+        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPSTATE",
+                     "Group \"%.*s\" changed state %s -> %s "
+                     "(v%d, join-state %s)",
+                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+                     rd_kafka_cgrp_state_names[rkcg->rkcg_state],
+                     rd_kafka_cgrp_state_names[state],
+		     rkcg->rkcg_version,
+                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
+        rkcg->rkcg_state = state;
+        rkcg->rkcg_ts_statechange = rd_clock();
+
+	rd_kafka_brokers_broadcast_state_change(rkcg->rkcg_rk);
+}
+
+
+void rd_kafka_cgrp_set_join_state (rd_kafka_cgrp_t *rkcg, int join_state) {
+        if ((int)rkcg->rkcg_join_state == join_state)
+                return;
+
+        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPJOINSTATE",
+                     "Group \"%.*s\" changed join state %s -> %s "
+                     "(v%d, state %s)",
+                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
+                     rd_kafka_cgrp_join_state_names[join_state],
+		     rkcg->rkcg_version,
+                     rd_kafka_cgrp_state_names[rkcg->rkcg_state]);
+        rkcg->rkcg_join_state = join_state;
+}
+
+
+static RD_INLINE void
+rd_kafka_cgrp_version_new_barrier0 (rd_kafka_cgrp_t *rkcg,
+				    const char *func, int line) {
+	rkcg->rkcg_version++;
+	rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "BARRIER",
+		     "Group \"%.*s\": %s:%d: new version barrier v%d",
+		     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), func, line,
+		     rkcg->rkcg_version);
+}
+
+#define rd_kafka_cgrp_version_new_barrier(rkcg) \
+	rd_kafka_cgrp_version_new_barrier0(rkcg, __FUNCTION__, __LINE__)
+
+
+void rd_kafka_cgrp_destroy_final (rd_kafka_cgrp_t *rkcg) {
+        rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_assignment);
+        rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_subscription);
+        rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_group_leader.members);
+        rd_kafka_cgrp_set_member_id(rkcg, NULL);
+
+        rd_kafka_q_destroy(rkcg->rkcg_q);
+        rd_kafka_q_destroy(rkcg->rkcg_ops);
+	rd_kafka_q_destroy(rkcg->rkcg_wait_coord_q);
+        rd_kafka_assert(rkcg->rkcg_rk, TAILQ_EMPTY(&rkcg->rkcg_topics));
+        rd_kafka_assert(rkcg->rkcg_rk, rd_list_empty(&rkcg->rkcg_toppars));
+        rd_list_destroy(&rkcg->rkcg_toppars);
+        rd_list_destroy(rkcg->rkcg_subscribed_topics);
+        rd_free(rkcg);
+}
+
+
+
+
+rd_kafka_cgrp_t *rd_kafka_cgrp_new (rd_kafka_t *rk,
+                                    const rd_kafkap_str_t *group_id,
+                                    const rd_kafkap_str_t *client_id) {
+        rd_kafka_cgrp_t *rkcg;
+
+        rkcg = rd_calloc(1, sizeof(*rkcg));
+
+        rkcg->rkcg_rk = rk;
+        rkcg->rkcg_group_id = group_id;
+        rkcg->rkcg_client_id = client_id;
+        rkcg->rkcg_coord_id = -1;
+        rkcg->rkcg_generation_id = -1;
+	rkcg->rkcg_version = 1;
+
+        mtx_init(&rkcg->rkcg_lock, mtx_plain);
+        rkcg->rkcg_ops = rd_kafka_q_new(rk);
+        rkcg->rkcg_ops->rkq_serve = rd_kafka_cgrp_op_serve;
+        rkcg->rkcg_ops->rkq_opaque = rkcg;
+        rkcg->rkcg_wait_coord_q = rd_kafka_q_new(rk);
+        rkcg->rkcg_wait_coord_q->rkq_serve = rkcg->rkcg_ops->rkq_serve;
+        rkcg->rkcg_wait_coord_q->rkq_opaque = rkcg->rkcg_ops->rkq_opaque;
+        rkcg->rkcg_q = rd_kafka_q_new(rk);
+
+        TAILQ_INIT(&rkcg->rkcg_topics);
+        rd_list_init(&rkcg->rkcg_toppars, 32, NULL);
+        rd_kafka_cgrp_set_member_id(rkcg, "");
+        rkcg->rkcg_subscribed_topics =
+                rd_list_new(0, (void *)rd_kafka_topic_info_destroy);
+        rd_interval_init(&rkcg->rkcg_coord_query_intvl);
+        rd_interval_init(&rkcg->rkcg_heartbeat_intvl);
+        rd_interval_init(&rkcg->rkcg_join_intvl);
+        rd_interval_init(&rkcg->rkcg_timeout_scan_intvl);
+
+        if (RD_KAFKAP_STR_IS_NULL(group_id)) {
+                /* No group configured: Operate in legacy/SimpleConsumer mode */
+                rd_kafka_simple_consumer_add(rk);
+                /* no need look up group coordinator (no queries) */
+                rd_interval_disable(&rkcg->rkcg_coord_query_intvl);
+        }
+
+        if (rk->rk_conf.enable_auto_commit &&
+            rk->rk_conf.auto_commit_interval_ms > 0)
+                rd_kafka_timer_start(&rk->rk_timers,
+                                     &rkcg->rkcg_offset_commit_tmr,
+                                     rk->rk_conf.
+				     auto_commit_interval_ms * 1000ll,
+                                     rd_kafka_cgrp_offset_commit_tmr_cb,
+                                     rkcg);
+
+        return rkcg;
+}
+
+
+
+/**
+ * Select a broker to handle this cgrp.
+ * It will prefer the coordinator broker but if that is not available
+ * any other broker that is Up will be used, and if that also fails
+ * uses the internal broker handle.
+ *
+ * NOTE: The returned rkb will have had its refcnt increased.
+ */
+static rd_kafka_broker_t *rd_kafka_cgrp_select_broker (rd_kafka_cgrp_t *rkcg) {
+        rd_kafka_broker_t *rkb = NULL;
+
+
+        /* No need for a managing broker when cgrp is terminated */
+        if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM)
+                return NULL;
+
+        rd_kafka_rdlock(rkcg->rkcg_rk);
+        /* Try to find the coordinator broker, if it isn't found
+         * move the cgrp to any other Up broker which will
+         * do further coord querying while waiting for the
+         * proper broker to materialise.
+         * If that also fails, go with the internal broker */
+        if (rkcg->rkcg_coord_id != -1)
+                rkb = rd_kafka_broker_find_by_nodeid(rkcg->rkcg_rk,
+                                                     rkcg->rkcg_coord_id);
+        if (!rkb)
+                rkb = rd_kafka_broker_prefer(rkcg->rkcg_rk,
+                                             rkcg->rkcg_coord_id,
+                                             RD_KAFKA_BROKER_STATE_UP);
+        if (!rkb)
+                rkb = rd_kafka_broker_internal(rkcg->rkcg_rk);
+
+        rd_kafka_rdunlock(rkcg->rkcg_rk);
+
+        /* Dont change managing broker unless warranted.
+         * This means do not change to another non-coordinator broker
+         * while we are waiting for the proper coordinator broker to
+         * become available. */
+        if (rkb && rkcg->rkcg_rkb && rkb != rkcg->rkcg_rkb) {
+		int old_is_coord, new_is_coord;
+
+		rd_kafka_broker_lock(rkb);
+		new_is_coord = RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg, rkb);
+		rd_kafka_broker_unlock(rkb);
+
+		rd_kafka_broker_lock(rkcg->rkcg_rkb);
+		old_is_coord = RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg,
+							     rkcg->rkcg_rkb);
+		rd_kafka_broker_unlock(rkcg->rkcg_rkb);
+
+		if (!old_is_coord && !new_is_coord &&
+		    rkcg->rkcg_rkb->rkb_source != RD_KAFKA_INTERNAL) {
+			rd_kafka_broker_destroy(rkb);
+			rkb = rkcg->rkcg_rkb;
+			rd_kafka_broker_keep(rkb);
+		}
+        }
+
+        return rkb;
+}
+
+
+
+
+/**
+ * Assign cgrp to broker.
+ *
+ * Locality: rdkafka main thread
+ */
+static void rd_kafka_cgrp_assign_broker (rd_kafka_cgrp_t *rkcg,
+					 rd_kafka_broker_t *rkb) {
+
+	rd_kafka_assert(NULL, rkcg->rkcg_rkb == NULL);
+
+	rkcg->rkcg_rkb = rkb;
+	rd_kafka_broker_keep(rkb);
+
+        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "BRKASSIGN",
+                     "Group \"%.*s\" management assigned to broker %s",
+                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+                     rd_kafka_broker_name(rkb));
+
+        /* Reset query interval to trigger an immediate
+         * coord query if required */
+        if (!rd_interval_disabled(&rkcg->rkcg_coord_query_intvl))
+                rd_interval_reset(&rkcg->rkcg_coord_query_intvl);
+
+        if (RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg, rkb))
+                rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT);
+
+}
+
+
+/**
+ * Unassign cgrp from current broker.
+ *
+ * Locality: main thread
+ */
+static void rd_kafka_cgrp_unassign_broker (rd_kafka_cgrp_t *rkcg) {
+        rd_kafka_broker_t *rkb = rkcg->rkcg_rkb;
+
+	rd_kafka_assert(NULL, rkcg->rkcg_rkb);
+        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "BRKUNASSIGN",
+                     "Group \"%.*s\" management unassigned "
+                     "from broker handle %s",
+                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+                     rd_kafka_broker_name(rkb));
+
+        rkcg->rkcg_rkb = NULL;
+        rd_kafka_broker_destroy(rkb); /* from assign() */
+}
+
+
+/**
+ * Assign cgrp to a broker to handle.
+ * It will prefer the coordinator broker but if that is not available
+ * any other broker that is Up will be used, and if that also fails
+ * uses the internal broker handle.
+ *
+ * Returns 1 if the cgrp was reassigned, else 0.
+ */
+int rd_kafka_cgrp_reassign_broker (rd_kafka_cgrp_t *rkcg) {
+        rd_kafka_broker_t *rkb;
+
+        rkb = rd_kafka_cgrp_select_broker(rkcg);
+
+        if (rkb == rkcg->rkcg_rkb) {
+		int is_coord = 0;
+
+		if (rkb) {
+			rd_kafka_broker_lock(rkb);
+			is_coord = RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg, rkb);
+			rd_kafka_broker_unlock(rkb);
+		}
+		if (is_coord)
+                        rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT);
+                else
+                        rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER);
+
+                if (rkb)
+                        rd_kafka_broker_destroy(rkb);
+                return 0; /* No change */
+        }
+
+        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "BRKREASSIGN",
+                     "Group \"%.*s\" management reassigned from "
+                     "broker %s to %s",
+                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+                     rkcg->rkcg_rkb ?
+                     rd_kafka_broker_name(rkcg->rkcg_rkb) : "(none)",
+                     rkb ? rd_kafka_broker_name(rkb) : "(none)");
+
+
+        if (rkcg->rkcg_rkb)
+                rd_kafka_cgrp_unassign_broker(rkcg);
+
+        rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER);
+
+        if (rkb) {
+		rd_kafka_cgrp_assign_broker(rkcg, rkb);
+		rd_kafka_broker_destroy(rkb); /* from select_broker() */
+	}
+
+        return 1;
+}
+
+
+/**
+ * Update the cgrp's coordinator and move it to that broker.
+ */
+void rd_kafka_cgrp_coord_update (rd_kafka_cgrp_t *rkcg, int32_t coord_id) {
+
+        if (rkcg->rkcg_coord_id == coord_id) {
+		if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_WAIT_COORD)
+			rd_kafka_cgrp_set_state(rkcg,
+						RD_KAFKA_CGRP_STATE_WAIT_BROKER);
+                return;
+	}
+
+        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPCOORD",
+                     "Group \"%.*s\" changing coordinator %"PRId32" -> %"PRId32,
+                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rkcg->rkcg_coord_id,
+                     coord_id);
+        rkcg->rkcg_coord_id = coord_id;
+
+        rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER);
+
+        rd_kafka_cgrp_reassign_broker(rkcg);
+}
+
+
+
+
+
+
+/**
+ * Handle GroupCoordinator response
+ */
+static void rd_kafka_cgrp_handle_GroupCoordinator (rd_kafka_t *rk,
+						   rd_kafka_broker_t *rkb,
+                                                   rd_kafka_resp_err_t err,
+                                                   rd_kafka_buf_t *rkbuf,
+                                                   rd_kafka_buf_t *request,
+                                                   void *opaque) {
+        const int log_decode_errors = LOG_ERR;
+        int16_t ErrorCode = 0;
+        int32_t CoordId;
+        rd_kafkap_str_t CoordHost = RD_ZERO_INIT;
+        int32_t CoordPort;
+        rd_kafka_cgrp_t *rkcg = opaque;
+        struct rd_kafka_metadata_broker mdb = RD_ZERO_INIT;
+
+        if (likely(!(ErrorCode = err))) {
+                rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
+                rd_kafka_buf_read_i32(rkbuf, &CoordId);
+                rd_kafka_buf_read_str(rkbuf, &CoordHost);
+                rd_kafka_buf_read_i32(rkbuf, &CoordPort);
+        }
+
+        if (ErrorCode)
+                goto err2;
+
+
+        mdb.id = CoordId;
+	RD_KAFKAP_STR_DUPA(&mdb.host, &CoordHost);
+	mdb.port = CoordPort;
+
+        rd_rkb_dbg(rkb, CGRP, "CGRPCOORD",
+                   "Group \"%.*s\" coordinator is %s:%i id %"PRId32,
+                   RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+                   mdb.host, mdb.port, mdb.id);
+        rd_kafka_broker_update(rkb->rkb_rk, rkb->rkb_proto, &mdb);
+
+        rd_kafka_cgrp_coord_update(rkcg, CoordId);
+        rd_kafka_cgrp_serve(rkcg); /* Serve updated state, if possible */
+        return;
+
+err_parse: /* Parse error */
+        ErrorCode = rkbuf->rkbuf_err;
+        /* FALLTHRU */
+
+err2:
+        rd_rkb_dbg(rkb, CGRP, "CGRPCOORD",
+                   "Group \"%.*s\" GroupCoordinator response error: %s",
+                   RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+                   rd_kafka_err2str(ErrorCode));
+
+        if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY)
+                return;
+
+        if (ErrorCode == RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE)
+                rd_kafka_cgrp_coord_update(rkcg, -1);
+	else {
+                if (rkcg->rkcg_last_err != ErrorCode) {
+                        rd_kafka_q_op_err(rkcg->rkcg_q,
+                                          RD_KAFKA_OP_CONSUMER_ERR,
+                                          ErrorCode, 0, NULL, 0,
+                                          "GroupCoordinator response error: %s",
+                                          rd_kafka_err2str(ErrorCode));
+
+                        /* Suppress repeated errors */
+                        rkcg->rkcg_last_err = ErrorCode;
+                }
+
+		/* Continue querying */
+		rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
+        }
+
+        rd_kafka_cgrp_serve(rkcg); /* Serve updated state, if possible */
+}
+
+
+/**
+ * Query for coordinator.
+ * Ask any broker in state UP
+ *
+ * Locality: main thread
+ */
+void rd_kafka_cgrp_coord_query (rd_kafka_cgrp_t *rkcg,
+				const char *reason) {
+	rd_kafka_broker_t *rkb;
+
+	rd_kafka_rdlock(rkcg->rkcg_rk);
+	rkb = rd_kafka_broker_any(rkcg->rkcg_rk, RD_KAFKA_BROKER_STATE_UP,
+				  rd_kafka_broker_filter_can_group_query, NULL);
+	rd_kafka_rdunlock(rkcg->rkcg_rk);
+
+	if (!rkb) {
+		rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPQUERY",
+			     "Group \"%.*s\": "
+			     "no broker available for coordinator query: %s",
+			     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason);
+		return;
+	}
+
+        rd_rkb_dbg(rkb, CGRP, "CGRPQUERY",
+                   "Group \"%.*s\": querying for coordinator: %s",
+                   RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason);
+
+        rd_kafka_GroupCoordinatorRequest(rkb, rkcg->rkcg_group_id,
+                                         RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
+                                         rd_kafka_cgrp_handle_GroupCoordinator,
+                                         rkcg);
+
+        if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_QUERY_COORD)
+                rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_COORD);
+
+	rd_kafka_broker_destroy(rkb);
+}
+
+/**
+ * @brief Mark the current coordinator as dead.
+ *
+ * @locality main thread
+ */
+void rd_kafka_cgrp_coord_dead (rd_kafka_cgrp_t *rkcg, rd_kafka_resp_err_t err,
+			       const char *reason) {
+	rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COORD",
+		     "Group \"%.*s\": marking the coordinator dead: %s: %s",
+		     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+		     rd_kafka_err2str(err), reason);
+
+	rd_kafka_cgrp_coord_update(rkcg, -1);
+
+	/* Re-query for coordinator */
+	rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
+	rd_kafka_cgrp_coord_query(rkcg, reason);
+}
+
+
+
+static void rd_kafka_cgrp_leave (rd_kafka_cgrp_t *rkcg, int ignore_response) {
+        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "LEAVE",
+                   "Group \"%.*s\": leave",
+                   RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
+
+        if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP)
+                rd_kafka_LeaveGroupRequest(rkcg->rkcg_rkb, rkcg->rkcg_group_id,
+                                           rkcg->rkcg_member_id,
+					   ignore_response ?
+					   RD_KAFKA_NO_REPLYQ :
+                                           RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
+                                           ignore_response ? NULL :
+                                           rd_kafka_handle_LeaveGroup, rkcg);
+        else if (!ignore_response)
+                rd_kafka_handle_LeaveGroup(rkcg->rkcg_rk, rkcg->rkcg_rkb,
+                                           RD_KAFKA_RESP_ERR__WAIT_COORD,
+                                           NULL, NULL, rkcg);
+}
+
+
+/**
+ * Enqueue a rebalance op (if configured). 'partitions' is copied.
+ * This delegates the responsibility of assign() and unassign() to the
+ * application.
+ *
+ * Returns 1 if a rebalance op was enqueued, else 0.
+ * Returns 0 if there was no rebalance_cb or 'assignment' is NULL,
+ * in which case rd_kafka_cgrp_assign(rkcg,assignment) is called immediately.
+ */
+static int
+rd_kafka_rebalance_op (rd_kafka_cgrp_t *rkcg,
+		       rd_kafka_resp_err_t err,
+		       rd_kafka_topic_partition_list_t *assignment,
+		       const char *reason) {
+	rd_kafka_op_t *rko;
+
+        rd_kafka_wrlock(rkcg->rkcg_rk);
+        rkcg->rkcg_c.ts_rebalance = rd_clock();
+        rkcg->rkcg_c.rebalance_cnt++;
+        rd_kafka_wrunlock(rkcg->rkcg_rk);
+
+	/* Pause current partition set consumers until new assign() is called */
+	if (rkcg->rkcg_assignment)
+		rd_kafka_toppars_pause_resume(rkcg->rkcg_rk, 1,
+					      RD_KAFKA_TOPPAR_F_LIB_PAUSE,
+					      rkcg->rkcg_assignment);
+
+	if (!(rkcg->rkcg_rk->rk_conf.enabled_events & RD_KAFKA_EVENT_REBALANCE)
+	    || !assignment) {
+	no_delegation:
+		if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
+			rd_kafka_cgrp_assign(rkcg, assignment);
+		else
+			rd_kafka_cgrp_unassign(rkcg);
+		return 0;
+	}
+
+	rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
+		     "Group \"%s\": delegating %s of %d partition(s) "
+		     "to application rebalance callback on queue %s: %s",
+		     rkcg->rkcg_group_id->str,
+		     err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS ?
+		     "revoke":"assign", assignment->cnt,
+		     rd_kafka_q_dest_name(rkcg->rkcg_q), reason);
+
+	rd_kafka_cgrp_set_join_state(
+		rkcg,
+		err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
+		RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB :
+		RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB);
+
+	rko = rd_kafka_op_new(RD_KAFKA_OP_REBALANCE);
+	rko->rko_err = err;
+	rko->rko_u.rebalance.partitions =
+		rd_kafka_topic_partition_list_copy(assignment);
+
+	if (rd_kafka_q_enq(rkcg->rkcg_q, rko) == 0) {
+		/* Queue disabled, handle assignment here. */
+		goto no_delegation;
+	}
+
+	return 1;
+}
+
+
+/**
+ * @brief Run group assignment.
+ */
+static void
+rd_kafka_cgrp_assignor_run (rd_kafka_cgrp_t *rkcg,
+                            const char *protocol_name,
+                            rd_kafka_resp_err_t err,
+                            rd_kafka_metadata_t *metadata,
+                            rd_kafka_group_member_t *members,
+                            int member_cnt) {
+        char errstr[512];
+
+        if (err) {
+                rd_snprintf(errstr, sizeof(errstr),
+                            "Failed to get cluster metadata: %s",
+                            rd_kafka_err2str(err));
+                goto err;
+        }
+
+        *errstr = '\0';
+
+        /* Run assignor */
+        err = rd_kafka_assignor_run(rkcg, protocol_name, metadata,
+                                    members, member_cnt,
+                                    errstr, sizeof(errstr));
+
+        if (err) {
+                if (!*errstr)
+                        rd_snprintf(errstr, sizeof(errstr), "%s",
+                                    rd_kafka_err2str(err));
+                goto err;
+        }
+
+        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGNOR",
+                     "Group \"%s\": \"%s\" assignor run for %d member(s)",
+                     rkcg->rkcg_group_id->str, protocol_name, member_cnt);
+
+        rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC);
+
+        /* Respond to broker with assignment set or error */
+        rd_kafka_SyncGroupRequest(rkcg->rkcg_rkb,
+                                  rkcg->rkcg_group_id, rkcg->rkcg_generation_id,
+                                  rkcg->rkcg_member_id,
+                                  members, err ? 0 : member_cnt,
+                                  RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
+                                  rd_kafka_handle_SyncGroup, rkcg);
+        return;
+
+err:
+        rd_kafka_log(rkcg->rkcg_rk, LOG_ERR, "ASSIGNOR",
+                     "Group \"%s\": failed to run assignor \"%s\" for "
+                     "%d member(s): %s",
+                     rkcg->rkcg_group_id->str, protocol_name,
+                     member_cnt, errstr);
+
+        rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_INIT);
+
+}
+
+
+
+/**
+ * @brief Op callback from handle_JoinGroup
+ */
+static rd_kafka_op_res_t
+rd_kafka_cgrp_assignor_handle_Metadata_op (rd_kafka_t *rk,
+                                           rd_kafka_q_t *rkq,
+                                           rd_kafka_op_t *rko) {
+        rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
+
+        if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
+                return RD_KAFKA_OP_RES_HANDLED; /* Terminating */
+
+        if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA)
+                return RD_KAFKA_OP_RES_HANDLED; /* From outdated state */
+
+        if (!rkcg->rkcg_group_leader.protocol) {
+                rd_kafka_dbg(rk, CGRP, "GRPLEADER",
+                             "Group \"%.*s\": no longer leader: "
+                             "not running assignor",
+                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
+                return RD_KAFKA_OP_RES_HANDLED;
+        }
+
+        rd_kafka_cgrp_assignor_run(rkcg,
+                                   rkcg->rkcg_group_leader.protocol,
+                                   rko->rko_err, rko->rko_u.metadata.md,
+                                   rkcg->rkcg_group_leader.members,
+                                   rkcg->rkcg_group_leader.member_cnt);
+
+        return RD_KAFKA_OP_RES_HANDLED;
+}
+
+
+/**
+ * Parse single JoinGroup.Members.MemberMetadata for "consumer" ProtocolType
+ *
+ * Protocol definition:
+ * https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
+ *
+ * Returns 0 on success or -1 on error.
+ */
+static int
+rd_kafka_group_MemberMetadata_consumer_read (
+        rd_kafka_broker_t *rkb, rd_kafka_group_member_t *rkgm,
+        const rd_kafkap_str_t *GroupProtocol,
+        const rd_kafkap_bytes_t *MemberMetadata) {
+
+        rd_kafka_buf_t *rkbuf;
+        int16_t Version;
+        int32_t subscription_cnt;
+        rd_kafkap_bytes_t UserData;
+        const int log_decode_errors = LOG_ERR;
+        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__BAD_MSG;
+
+        /* Create a shadow-buffer pointing to the metadata to ease parsing. */
+        rkbuf = rd_kafka_buf_new_shadow(MemberMetadata->data,
+                                        RD_KAFKAP_BYTES_LEN(MemberMetadata),
+                                        NULL);
+
+        rd_kafka_buf_read_i16(rkbuf, &Version);
+        rd_kafka_buf_read_i32(rkbuf, &subscription_cnt);
+
+        if (subscription_cnt > 10000 || subscription_cnt <= 0)
+                goto err;
+
+        rkgm->rkgm_subscription =
+                rd_kafka_topic_partition_list_new(subscription_cnt);
+
+        while (subscription_cnt-- > 0) {
+                rd_kafkap_str_t Topic;
+                char *topic_name;
+                rd_kafka_buf_read_str(rkbuf, &Topic);
+                RD_KAFKAP_STR_DUPA(&topic_name, &Topic);
+                rd_kafka_topic_partition_list_add(rkgm->rkgm_subscription,
+                                                  topic_name,
+                                                  RD_KAFKA_PARTITION_UA);
+        }
+
+        rd_kafka_buf_read_bytes(rkbuf, &UserData);
+        rkgm->rkgm_userdata = rd_kafkap_bytes_copy(&UserData);
+
+        rd_kafka_buf_destroy(rkbuf);
+
+        return 0;
+
+ err_parse:
+        err = rkbuf->rkbuf_err;
+
+ err:
+        rd_rkb_dbg(rkb, CGRP, "MEMBERMETA",
+                   "Failed to parse MemberMetadata for \"%.*s\": %s",
+                   RD_KAFKAP_STR_PR(rkgm->rkgm_member_id),
+                   rd_kafka_err2str(err));
+        if (rkgm->rkgm_subscription) {
+                rd_kafka_topic_partition_list_destroy(rkgm->
+                                                      rkgm_subscription);
+                rkgm->rkgm_subscription = NULL;
+        }
+
+        rd_kafka_buf_destroy(rkbuf);
+        return -1;
+}
+
+
+
+
+/**
+ * @brief cgrp handler for JoinGroup responses
+ * opaque must be the cgrp handle.
+ *
+ * @locality cgrp broker thread
+ */
+static void rd_kafka_cgrp_handle_JoinGroup (rd_kafka_t *rk,
+                                            rd_kafka_broker_t *rkb,
+                                            rd_kafka_resp_err_t err,
+                                            rd_kafka_buf_t *rkbuf,
+                                            rd_kafka_buf_t *request,
+                                            void *opaque) {
+        rd_kafka_cgrp_t *rkcg = opaque;
+        const int log_decode_errors = LOG_ERR;
+        int16_t ErrorCode = 0;
+        int32_t GenerationId;
+        rd_kafkap_str_t Protocol, LeaderId, MyMemberId;
+        int32_t member_cnt;
+        int actions;
+        int i_am_leader = 0;
+
+        if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN) {
+                rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP",
+                             "JoinGroup response: discarding outdated request "
+                             "(now in join-state %s)",
+                             rd_kafka_cgrp_join_state_names[rkcg->
+                                                            rkcg_join_state]);
+                return;
+        }
+
+        if (err) {
+                ErrorCode = err;
+                goto err;
+        }
+
+        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
+        rd_kafka_buf_read_i32(rkbuf, &GenerationId);
+        rd_kafka_buf_read_str(rkbuf, &Protocol);
+        rd_kafka_buf_read_str(rkbuf, &LeaderId);
+        rd_kafka_buf_read_str(rkbuf, &MyMemberId);
+        rd_kafka_buf_read_i32(rkbuf, &member_cnt);
+
+        if (!ErrorCode && RD_KAFKAP_STR_IS_NULL(&Protocol)) {
+                /* Protocol not set, we will not be able to find
+                 * a matching assignor so error out early. */
+                ErrorCode = RD_KAFKA_RESP_ERR__BAD_MSG;
+        }
+
+        rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP",
+                     "JoinGroup response: GenerationId %"PRId32", "
+                     "Protocol %.*s, LeaderId %.*s%s, my MemberId %.*s, "
+                     "%"PRId32" members in group: %s",
+                     GenerationId,
+                     RD_KAFKAP_STR_PR(&Protocol),
+                     RD_KAFKAP_STR_PR(&LeaderId),
+                     !rd_kafkap_str_cmp(&LeaderId, &MyMemberId) ? " (me)" : "",
+                     RD_KAFKAP_STR_PR(&MyMemberId),
+                     member_cnt,
+                     ErrorCode ? rd_kafka_err2str(ErrorCode) : "(no error)");
+
+        if (!ErrorCode) {
+                char *my_member_id;
+                RD_KAFKAP_STR_DUPA(&my_member_id, &MyMemberId);
+                rkcg->rkcg_generation_id = GenerationId;
+                rd_kafka_cgrp_set_member_id(rkcg, my_member_id);
+                i_am_leader = !rd_kafkap_str_cmp(&LeaderId, &MyMemberId);
+        } else {
+                rd_interval_backoff(&rkcg->rkcg_join_intvl, 1000*1000);
+                goto err;
+        }
+
+        if (i_am_leader) {
+                rd_kafka_group_member_t *members;
+                int i;
+                int sub_cnt = 0;
+                rd_list_t topics;
+                rd_kafka_op_t *rko;
+                rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP",
+                             "Elected leader for group \"%s\" "
+                             "with %"PRId32" member(s)",
+                             rkcg->rkcg_group_id->str, member_cnt);
+
+                if (member_cnt > 100000) {
+                        err = RD_KAFKA_RESP_ERR__BAD_MSG;
+                        goto err;
+                }
+
+                rd_list_init(&topics, member_cnt, rd_free);
+
+                members = rd_calloc(member_cnt, sizeof(*members));
+
+                for (i = 0 ; i < member_cnt ; i++) {
+                        rd_kafkap_str_t MemberId;
+                        rd_kafkap_bytes_t MemberMetadata;
+                        rd_kafka_group_member_t *rkgm;
+
+                        rd_kafka_buf_read_str(rkbuf, &MemberId);
+                        rd_kafka_buf_read_bytes(rkbuf, &MemberMetadata);
+
+                        rkgm = &members[sub_cnt];
+                        rkgm->rkgm_member_id = rd_kafkap_str_copy(&MemberId);
+                        rd_list_init(&rkgm->rkgm_eligible, 0, NULL);
+
+                        if (rd_kafka_group_MemberMetadata_consumer_read(
+                                    rkb, rkgm, &Protocol, &MemberMetadata)) {
+                                /* Failed to parse this member's metadata,
+                                 * ignore it. */
+                        } else {
+                                sub_cnt++;
+                                rkgm->rkgm_assignment =
+                                        rd_kafka_topic_partition_list_new(
+                                                rkgm->rkgm_subscription->size);
+                                rd_kafka_topic_partition_list_get_topic_names(
+                                        rkgm->rkgm_subscription, &topics,
+                                        0/*dont include regex*/);
+                        }
+
+                }
+
+                /* FIXME: What to do if parsing failed for some/all members?
+                 *        It is a sign of incompatibility. */
+
+
+                rd_kafka_cgrp_group_leader_reset(rkcg,
+                                                 "JoinGroup response clean-up");
+
+                rkcg->rkcg_group_leader.protocol = RD_KAFKAP_STR_DUP(&Protocol);
+                rd_kafka_assert(NULL, rkcg->rkcg_group_leader.members == NULL);
+                rkcg->rkcg_group_leader.members    = members;
+                rkcg->rkcg_group_leader.member_cnt = sub_cnt;
+
+                rd_kafka_cgrp_set_join_state(
+                        rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA);
+
+                /* The assignor will need metadata so fetch it asynchronously
+                 * and run the assignor when we get a reply.
+                 * Create a callback op that the generic metadata code
+                 * will trigger when metadata has been parsed. */
+                rko = rd_kafka_op_new_cb(
+                        rkcg->rkcg_rk, RD_KAFKA_OP_METADATA,
+                        rd_kafka_cgrp_assignor_handle_Metadata_op);
+                rd_kafka_op_set_replyq(rko, rkcg->rkcg_ops, NULL);
+
+                rd_kafka_MetadataRequest(rkb, &topics,
+                                         "partition assignor", rko);
+                rd_list_destroy(&topics);
+
+        } else {
+                rd_kafka_cgrp_set_join_state(
+                        rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC);
+
+                rd_kafka_SyncGroupRequest(rkb, rkcg->rkcg_group_id,
+                                          rkcg->rkcg_generation_id,
+                                          rkcg->rkcg_member_id,
+                                          NULL, 0,
+                                          RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
+                                          rd_kafka_handle_SyncGroup, rkcg);
+
+        }
+
+err:
+        actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request,
+                                      RD_KAFKA_ERR_ACTION_IGNORE,
+                                      RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,
+
+                                      RD_KAFKA_ERR_ACTION_END);
+
+        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
+                /* Re-query for coordinator */
+                rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
+                                 RD_KAFKA_OP_COORD_QUERY, ErrorCode);
+        }
+
+        if (ErrorCode) {
+                if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY)
+                        return; /* Termination */
+
+                if (actions & RD_KAFKA_ERR_ACTION_PERMANENT)
+                        rd_kafka_q_op_err(rkcg->rkcg_q,
+                                          RD_KAFKA_OP_CONSUMER_ERR,
+                                          ErrorCode, 0, NULL, 0,
+                                          "JoinGroup failed: %s",
+                                          rd_kafka_err2str(ErrorCode));
+
+                if (ErrorCode == RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID)
+                        rd_kafka_cgrp_set_member_id(rkcg, "");
+                rd_kafka_cgrp_set_join_state(rkcg,
+                                             RD_KAFKA_CGRP_JOIN_STATE_INIT);
+        }
+
+        return;
+
+ err_parse:
+        ErrorCode = rkbuf->rkbuf_err;
+        goto err;
+}
+
+
+/**
+ * @brief Check subscription against requested Metadata.
+ */
+static rd_kafka_op_res_t
+rd_kafka_cgrp_handle_Metadata_op (rd_kafka_t *rk, rd_kafka_q_t *rkq,
+                                  rd_kafka_op_t *rko) {
+        rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
+
+        if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
+                return RD_KAFKA_OP_RES_HANDLED; /* Terminating */
+
+        rd_kafka_cgrp_metadata_update_check(rkcg, 0/*dont rejoin*/);
+
+        return RD_KAFKA_OP_RES_HANDLED;
+}
+
+
+/**
+ * @brief (Async) Refresh metadata (for cgrp's needs)
+ *
+ * @returns 1 if metadata refresh was requested, or 0 if metadata is
+ *          up to date, or -1 if no broker is available for metadata requests.
+ *
+ * @locks none
+ * @locality rdkafka main thread
+ */
+static int rd_kafka_cgrp_metadata_refresh (rd_kafka_cgrp_t *rkcg,
+                                            int *metadata_agep,
+                                            const char *reason) {
+        rd_kafka_t *rk = rkcg->rkcg_rk;
+        rd_kafka_op_t *rko;
+        rd_list_t topics;
+        rd_kafka_resp_err_t err;
+
+        rd_list_init(&topics, 8, rd_free);
+
+        /* Insert all non-wildcard topics in cache. */
+        rd_kafka_metadata_cache_hint_rktparlist(rkcg->rkcg_rk,
+                                                rkcg->rkcg_subscription,
+                                                NULL, 0/*dont replace*/);
+
+        if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) {
+                /* For wildcard subscriptions make sure the
+                 * cached full metadata isn't too old. */
+                int metadata_age = -1;
+
+                if (rk->rk_ts_full_metadata)
+                        metadata_age = (int)(rd_clock() -
+                                             rk->rk_ts_full_metadata)/1000;
+
+                *metadata_agep = metadata_age;
+
+                if (metadata_age != -1 &&
+                    metadata_age <=
+                    /* The +1000 is since metadata.refresh.interval.ms
+                     * can be set to 0. */
+                    rk->rk_conf.metadata_refresh_interval_ms + 1000) {
+                        rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
+                                     "CGRPMETADATA",
+                                     "%s: metadata for wildcard subscription "
+                                     "is up to date (%dms old)",
+                                     reason, *metadata_agep);
+                        rd_list_destroy(&topics);
+                        return 0; /* Up-to-date */
+                }
+
+        } else {
+                /* Check that all subscribed topics are in the cache. */
+                int r;
+
+                rd_kafka_topic_partition_list_get_topic_names(
+                        rkcg->rkcg_subscription, &topics, 0/*no regexps*/);
+
+                rd_kafka_rdlock(rk);
+                r = rd_kafka_metadata_cache_topics_count_exists(rk, &topics,
+                                                                metadata_agep);
+                rd_kafka_rdunlock(rk);
+
+                if (r == rd_list_cnt(&topics)) {
+                        rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
+                                     "CGRPMETADATA",
+                                     "%s: metadata for subscription "
+                                     "is up to date (%dms old)", reason,
+                                     *metadata_agep);
+                        rd_list_destroy(&topics);
+                        return 0; /* Up-to-date and all topics exist. */
+                }
+
+                rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
+                             "CGRPMETADATA",
+                             "%s: metadata for subscription "
+                             "only available for %d/%d topics (%dms old)",
+                             reason, r, rd_list_cnt(&topics), *metadata_agep);
+
+        }
+
+        /* Async request, result will be triggered from
+         * rd_kafka_parse_metadata(). */
+        rko = rd_kafka_op_new_cb(rkcg->rkcg_rk, RD_KAFKA_OP_METADATA,
+                                 rd_kafka_cgrp_handle_Metadata_op);
+        rd_kafka_op_set_replyq(rko, rkcg->rkcg_ops, 0);
+
+        err = rd_kafka_metadata_request(rkcg->rkcg_rk, NULL, &topics,
+                                        reason, rko);
+        if (err) {
+                rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
+                             "CGRPMETADATA",
+                             "%s: need to refresh metadata (%dms old) "
+                             "but no usable brokers available: %s",
+                             reason, *metadata_agep, rd_kafka_err2str(err));
+                rd_kafka_op_destroy(rko);
+        }
+
+        rd_list_destroy(&topics);
+
+        return err ? -1 : 1;
+}
+
+
+
+static void rd_kafka_cgrp_join (rd_kafka_cgrp_t *rkcg) {
+        int metadata_age;
+
+        if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP ||
+            rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_INIT)
+                return;
+
+        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "JOIN",
+                     "Group \"%.*s\": join with %d (%d) subscribed topic(s)",
+                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+                     rd_list_cnt(rkcg->rkcg_subscribed_topics),
+                     rkcg->rkcg_subscription->cnt);
+
+
+        /* See if we need to query metadata to continue:
+         * - if subscription contains wildcards:
+         *   * query all topics in cluster
+         *
+         * - if subscription does not contain wildcards but
+         *   some topics are missing from the local metadata cache:
+         *   * query subscribed topics (all cached ones)
+         *
+         * - otherwise:
+         *   * rely on topic metadata cache
+         */
+        /* We need up-to-date full metadata to continue,
+         * refresh metadata if necessary. */
+        if (rd_kafka_cgrp_metadata_refresh(rkcg, &metadata_age,
+                                           "consumer join") == 1) {
+                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "JOIN",
+                             "Group \"%.*s\": "
+                             "postponing join until up-to-date "
+                             "metadata is available",
+                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
+                return; /* ^ async call */
+        }
+
+        if (rd_list_empty(rkcg->rkcg_subscribed_topics))
+                rd_kafka_cgrp_metadata_update_check(rkcg, 0/*dont join*/);
+
+        if (rd_list_empty(rkcg->rkcg_subscribed_topics)) {
+                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "JOIN",
+                             "Group \"%.*s\": "
+                             "no matching topics based on %dms old metadata: "
+                             "next metadata refresh in %dms",
+                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+                             metadata_age,
+                             rkcg->rkcg_rk->rk_conf.
+                             metadata_refresh_interval_ms - metadata_age);
+                return;
+        }
+
+        rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN);
+        rd_kafka_JoinGroupRequest(rkcg->rkcg_rkb, rkcg->rkcg_group_id,
+                                  rkcg->rkcg_member_id,
+                                  rkcg->rkcg_rk->rk_conf.group_protocol_type,
+                                  rkcg->rkcg_subscribed_topics,
+                                  RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
+                                  rd_kafka_cgrp_handle_JoinGroup, rkcg);
+}
+
+/**
+ * Rejoin group on update to effective subscribed topics list
+ */
+static void rd_kafka_cgrp_rejoin (rd_kafka_cgrp_t *rkcg) {
+        /*
+         * Clean-up group leader duties, if any.
+         */
+        rd_kafka_cgrp_group_leader_reset(rkcg, "Group rejoin");
+
+        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "REJOIN",
+                     "Group \"%.*s\" rejoining in join-state %s "
+                     "with%s an assignment",
+                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
+                     rkcg->rkcg_assignment ? "" : "out");
+
+        /* Remove assignment (async), if any. If there is already an
+         * unassign in progress we dont need to bother. */
+        if (rkcg->rkcg_assignment) {
+		if (!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN)) {
+			rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_UNASSIGN;
+
+			rd_kafka_rebalance_op(
+				rkcg,
+				RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
+				rkcg->rkcg_assignment, "unsubscribe");
+		}
+	} else {
+                rd_kafka_cgrp_set_join_state(rkcg,
+                                             RD_KAFKA_CGRP_JOIN_STATE_INIT);
+		rd_kafka_cgrp_join(rkcg);
+	}
+}
+
+/**
+ * Update the effective list of subscribed topics and trigger a rejoin
+ * if it changed.
+ *
+ * Set \p tinfos to NULL for clearing the list.
+ *
+ * @param tinfos rd_list_t(rd_kafka_topic_info_t *): new effective topic list
+ *
+ * @returns 1 on change, else 0.
+ *
+ * @remark Takes ownership of \p tinfos
+ */
+static int
+rd_kafka_cgrp_update_subscribed_topics (rd_kafka_cgrp_t *rkcg,
+                                        rd_list_t *tinfos) {
+        rd_kafka_topic_info_t *tinfo;
+        int i;
+
+        if (!tinfos) {
+                if (!rd_list_empty(rkcg->rkcg_subscribed_topics))
+                        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIPTION",
+                                     "Group \"%.*s\": "
+                                     "clearing subscribed topics list (%d)",
+                                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+                                     rd_list_cnt(rkcg->rkcg_subscribed_topics));
+                tinfos = rd_list_new(0, (void *)rd_kafka_topic_info_destroy);
+
+        } else {
+                if (rd_list_cnt(tinfos) == 0)
+                        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIPTION",
+                                     "Group \"%.*s\": "
+                                     "no topics in metadata matched "
+                                     "subscription",
+                                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
+        }
+
+        /* Sort for comparison */
+        rd_list_sort(tinfos, rd_kafka_topic_info_cmp);
+
+        /* Compare to existing to see if anything changed. */
+        if (!rd_list_cmp(rkcg->rkcg_subscribed_topics, tinfos,
+                         rd_kafka_topic_info_cmp)) {
+                /* No change */
+                rd_list_destroy(tinfos);
+                return 0;
+        }
+
+        rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_METADATA, "SUBSCRIPTION",
+                     "Group \"%.*s\": effective subscription list changed "
+                     "from %d to %d topic(s):",
+                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+                     rd_list_cnt(rkcg->rkcg_subscribed_topics),
+                     rd_list_cnt(tinfos));
+
+        RD_LIST_FOREACH(tinfo, tinfos, i)
+                rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_METADATA,
+                             "SUBSCRIPTION",
+                             " Topic %s with %d partition(s)",
+                             tinfo->topic, tinfo->partition_cnt);
+
+        rd_list_destroy(rkcg->rkcg_subscribed_topics);
+
+        rkcg->rkcg_subscribed_topics = tinfos;
+
+        return 1;
+}
+
+
+
+/**
+ * @brief Handle heart Heartbeat response.
+ */
+void rd_kafka_cgrp_handle_Heartbeat (rd_kafka_t *rk,
+                                     rd_kafka_broker_t *rkb,
+                                     rd_kafka_resp_err_t err,
+                                     rd_kafka_buf_t *rkbuf,
+                                     rd_kafka_buf_t *request,
+                                     void *opaque) {
+        rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
+        const int log_decode_errors = LOG_ERR;
+        int16_t ErrorCode = 0;
+        int actions;
+
+        if (err) {
+                ErrorCode = err;
+                goto err;
+        }
+
+        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
+
+err:
+        actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request,
+                                      RD_KAFKA_ERR_ACTION_END);
+
+        rd_dassert(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT);
+        rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
+
+        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
+                /* Re-query for coordinator */
+                rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
+                                 RD_KAFKA_OP_COORD_QUERY, ErrorCode);
+                /* Schedule a retry */
+                if (ErrorCode != RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP) {
+                        rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
+                        rd_kafka_buf_keep(request);
+                        rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
+                        rd_kafka_broker_buf_retry(request->rkbuf_rkb, request);
+                }
+                return;
+        }
+
+        if (ErrorCode != 0 && ErrorCode != RD_KAFKA_RESP_ERR__DESTROY)
+                rd_kafka_cgrp_handle_heartbeat_error(rkcg, ErrorCode);
+
+        return;
+
+ err_parse:
+        ErrorCode = rkbuf->rkbuf_err;
+        goto err;
+}
+
+
+
+/**
+ * @brief Send Heartbeat
+ */
+static void rd_kafka_cgrp_heartbeat (rd_kafka_cgrp_t *rkcg,
+                                     rd_kafka_broker_t *rkb) {
+        /* Skip heartbeat if we have one in transit */
+        if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT)
+                return;
+
+        rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
+        rd_kafka_HeartbeatRequest(rkb, rkcg->rkcg_group_id,
+                                  rkcg->rkcg_generation_id,
+                                  rkcg->rkcg_member_id,
+                                  RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
+                                  rd_kafka_cgrp_handle_Heartbeat, NULL);
+}
+
+/**
+ * Cgrp is now terminated: decommission it and signal back to application.
+ */
+static void rd_kafka_cgrp_terminated (rd_kafka_cgrp_t *rkcg) {
+
+	rd_kafka_assert(NULL, rkcg->rkcg_wait_unassign_cnt == 0);
+	rd_kafka_assert(NULL, rkcg->rkcg_wait_commit_cnt == 0);
+	rd_kafka_assert(NULL, !(rkcg->rkcg_flags&RD_KAFKA_CGRP_F_WAIT_UNASSIGN));
+        rd_kafka_assert(NULL, rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM);
+
+        rd_kafka_timer_stop(&rkcg->rkcg_rk->rk_timers,
+                            &rkcg->rkcg_offset_commit_tmr, 1/*lock*/);
+
+	rd_kafka_q_purge(rkcg->rkcg_wait_coord_q);
+
+	/* Disable and empty ops queue since there will be no
+	 * (broker) thread serving it anymore after the unassign_broker
+	 * below.
+	 * This prevents hang on destroy where responses are enqueued on rkcg_ops
+	 * without anything serving the queue. */
+	rd_kafka_q_disable(rkcg->rkcg_ops);
+	rd_kafka_q_purge(rkcg->rkcg_ops);
+
+	if (rkcg->rkcg_rkb)
+		rd_kafka_cgrp_unassign_broker(rkcg);
+
+        if (rkcg->rkcg_reply_rko) {
+                /* Signal back to application. */
+                rd_kafka_replyq_enq(&rkcg->rkcg_reply_rko->rko_replyq,
+				    rkcg->rkcg_reply_rko, 0);
+                rkcg->rkcg_reply_rko = NULL;
+        }
+}
+
+
+/**
+ * If a cgrp is terminating and all outstanding ops are now finished
+ * then progress to final termination and return 1.
+ * Else returns 0.
+ */
+static RD_INLINE int rd_kafka_cgrp_try_terminate (rd_kafka_cgrp_t *rkcg) {
+
+        if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM)
+                return 1;
+
+	if (likely(!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)))
+		return 0;
+
+	/* Check if wait-coord queue has timed out. */
+	if (rd_kafka_q_len(rkcg->rkcg_wait_coord_q) > 0 &&
+	    rkcg->rkcg_ts_terminate +
+	    (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000) <
+	    rd_clock()) {
+		rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM",
+			     "Group \"%s\": timing out %d op(s) in "
+			     "wait-for-coordinator queue",
+			     rkcg->rkcg_group_id->str,
+			     rd_kafka_q_len(rkcg->rkcg_wait_coord_q));
+		rd_kafka_q_disable(rkcg->rkcg_wait_coord_q);
+		if (rd_kafka_q_concat(rkcg->rkcg_ops,
+				      rkcg->rkcg_wait_coord_q) == -1) {
+			/* ops queue shut down, purge coord queue */
+			rd_kafka_q_purge(rkcg->rkcg_wait_coord_q);
+		}
+	}
+
+	if (!RD_KAFKA_CGRP_WAIT_REBALANCE_CB(rkcg) &&
+	    rd_list_empty(&rkcg->rkcg_toppars) &&
+	    rkcg->rkcg_wait_unassign_cnt == 0 &&
+	    rkcg->rkcg_wait_commit_cnt == 0 &&
+            !(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN)) {
+                /* Since we might be deep down in a 'rko' handler
+                 * called from cgrp_op_serve() we cant call terminated()
+                 * directly since it will decommission the rkcg_ops queue
+                 * that might be locked by intermediate functions.
+                 * Instead set the TERM state and let the cgrp terminate
+                 * at its own discretion. */
+                rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_TERM);
+                return 1;
+        } else {
+		rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM",
+			     "Group \"%s\": "
+			     "waiting for %s%d toppar(s), %d unassignment(s), "
+			     "%d commit(s)%s (state %s, join-state %s) "
+			     "before terminating",
+			     rkcg->rkcg_group_id->str,
+                             RD_KAFKA_CGRP_WAIT_REBALANCE_CB(rkcg) ?
+                             "rebalance_cb, ": "",
+			     rd_list_cnt(&rkcg->rkcg_toppars),
+			     rkcg->rkcg_wait_unassign_cnt,
+			     rkcg->rkcg_wait_commit_cnt,
+			     (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN)?
+			     ", wait-unassign flag," : "",
+			     rd_kafka_cgrp_state_names[rkcg->rkcg_state],
+			     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
+                return 0;
+        }
+}
+
+
+/**
+ * Add partition to this cgrp management
+ */
+static void rd_kafka_cgrp_partition_add (rd_kafka_cgrp_t *rkcg,
+                                         rd_kafka_toppar_t *rktp) {
+        rd_kafka_dbg(rkcg->rkcg_rk, CGRP,"PARTADD",
+                     "Group \"%s\": add %s [%"PRId32"]",
+                     rkcg->rkcg_group_id->str,
+                     rktp->rktp_rkt->rkt_topic->str,
+                     rktp->rktp_partition);
+
+        rd_kafka_assert(rkcg->rkcg_rk, !rktp->rktp_s_for_cgrp);
+        rktp->rktp_s_for_cgrp = rd_kafka_toppar_keep(rktp);
+        rd_list_add(&rkcg->rkcg_toppars, rktp->rktp_s_for_cgrp);
+}
+
+/**
+ * Remove partition from this cgrp management
+ */
+static void rd_kafka_cgrp_partition_del (rd_kafka_cgrp_t *rkcg,
+                                         rd_kafka_toppar_t *rktp) {
+        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL",
+                     "Group \"%s\": delete %s [%"PRId32"]",
+                     rkcg->rkcg_group_id->str,
+                     rktp->rktp_rkt->rkt_topic->str,
+                     rktp->rktp_partition);
+        rd_kafka_assert(rkcg->rkcg_rk, rktp->rktp_s_for_cgrp);
+
+        rd_list_remove(&rkcg->rkcg_toppars, rktp->rktp_s_for_cgrp);
+        rd_kafka_toppar_destroy(rktp->rktp_s_for_cgrp);
+        rktp->rktp_s_for_cgrp = NULL;
+
+        rd_kafka_cgrp_try_terminate(rkcg);
+}
+
+
+
+/**
+ * Reply for OffsetFetch from call below.
+ */
+static void rd_kafka_cgrp_offsets_fetch_response (
+	rd_kafka_t *rk,
+	rd_kafka_broker_t *rkb,
+	rd_kafka_resp_err_t err,
+	rd_kafka_buf_t *reply,
+	rd_kafka_buf_t *request,
+	void *opaque) {
+	rd_kafka_topic_partition_list_t *offsets = opaque;
+	rd_kafka_cgrp_t *rkcg;
+
+	if (err == RD_KAFKA_RESP_ERR__DESTROY) {
+                /* Termination, quick cleanup. */
+		rd_kafka_topic_partition_list_destroy(offsets);
+                return;
+        }
+
+        rkcg = rd_kafka_cgrp_get(rk);
+
+        if (rd_kafka_buf_version_outdated(request, rkcg->rkcg_version)) {
+                rd_kafka_topic_partition_list_destroy(offsets);
+                return;
+        }
+
+	rd_kafka_topic_partition_list_log(rk, "OFFSETFETCH", offsets);
+	/* If all partitions already had usable offsets then there
+	 * was no request sent and thus no reply, the offsets list is
+	 * good to go. */
+	if (reply)
+		err = rd_kafka_handle_OffsetFetch(rk, rkb, err,
+						  reply, request, offsets,
+						  1/* Update toppars */);
+	if (err) {
+		rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "OFFSET",
+			     "Offset fetch error: %s",
+			     rd_kafka_err2str(err));
+
+		if (err != RD_KAFKA_RESP_ERR__WAIT_COORD)
+			rd_kafka_q_op_err(rkcg->rkcg_q,
+					  RD_KAFKA_OP_CONSUMER_ERR, err, 0,
+					  NULL, 0,
+					  "Failed to fetch offsets: %s",
+					  rd_kafka_err2str(err));
+	} else {
+		if (RD_KAFKA_CGRP_CAN_FETCH_START(rkcg))
+			rd_kafka_cgrp_partitions_fetch_start(
+				rkcg, offsets, 1 /* usable offsets */);
+		else
+			rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "OFFSET",
+				     "Group \"%.*s\": "
+				     "ignoring Offset fetch response for "
+				     "%d partition(s): in state %s",
+				     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+				     offsets ? offsets->cnt : -1,
+				     rd_kafka_cgrp_join_state_names[
+					     rkcg->rkcg_join_state]);
+	}
+
+	rd_kafka_topic_partition_list_destroy(offsets);
+}
+
+/**
+ * Fetch offsets for a list of partitions
+ */
+static void
+rd_kafka_cgrp_offsets_fetch (rd_kafka_cgrp_t *rkcg, rd_kafka_broker_t *rkb,
+                             rd_kafka_topic_partition_list_t *offsets) {
+	rd_kafka_topic_partition_list_t *use_offsets;
+
+	/* Make a copy of the offsets */
+	use_offsets = rd_kafka_topic_partition_list_copy(offsets);
+
+        if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP || !rkb)
+		rd_kafka_cgrp_offsets_fetch_response(
+			rkcg->rkcg_rk, rkb, RD_KAFKA_RESP_ERR__WAIT_COORD,
+			NULL, NULL, use_offsets);
+        else {
+		rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "OFFSET",
+			     "Fetch %d offsets with v%d",
+			     use_offsets->cnt, rkcg->rkcg_version);
+                rd_kafka_OffsetFetchRequest(
+                        rkb, 1, offsets,
+                        RD_KAFKA_REPLYQ(rkcg->rkcg_ops, rkcg->rkcg_version),
+			rd_kafka_cgrp_offsets_fetch_response,
+			use_offsets);
+	}
+
+}
+
+
+/**
+ * Start fetching all partitions in 'assignment' (async)
+ */
+static void
+rd_kafka_cgrp_partitions_fetch_start0 (rd_kafka_cgrp_t *rkcg,
+				       rd_kafka_topic_partition_list_t
+				       *assignment, int usable_offsets,
+				       int line) {
+        int i;
+
+	/* If waiting for offsets to commit we need that to finish first
+	 * before starting fetchers (which might fetch those stored offsets).*/
+	if (rkcg->rkcg_wait_commit_cnt > 0) {
+		rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "FETCHSTART",
+			     "Group \"%s\": not starting fetchers "
+			     "for %d assigned partition(s) in join-state %s "
+			     "(usable_offsets=%s, v%"PRId32", line %d): "
+			     "waiting for %d commit(s)",
+			     rkcg->rkcg_group_id->str, assignment->cnt,
+			     rd_kafka_cgrp_join_state_names[rkcg->
+							    rkcg_join_state],
+			     usable_offsets ? "yes":"no",
+			     rkcg->rkcg_version, line,
+			     rkcg->rkcg_wait_commit_cnt);
+		return;
+	}
+
+	rd_kafka_cgrp_version_new_barrier(rkcg);
+
+        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "FETCHSTART",
+                     "Group \"%s\": starting fetchers for %d assigned "
+                     "partition(s) in join-state %s "
+		     "(usable_offsets=%s, v%"PRId32", line %d)",
+                     rkcg->rkcg_group_id->str, assignment->cnt,
+		     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
+		     usable_offsets ? "yes":"no",
+		     rkcg->rkcg_version, line);
+
+	rd_kafka_topic_partition_list_log(rkcg->rkcg_rk,
+					  "FETCHSTART", assignment);
+
+        if (assignment->cnt == 0)
+                return;
+
+	/* Check if offsets are really unusable, this is to catch the
+	 * case where the entire assignment has absolute offsets set which
+	 * should make us skip offset lookups. */
+	if (!usable_offsets)
+		usable_offsets =
+			rd_kafka_topic_partition_list_count_abs_offsets(
+				assignment) == assignment->cnt;
+
+        if (!usable_offsets &&
+            rkcg->rkcg_rk->rk_conf.offset_store_method ==
+            RD_KAFKA_OFFSET_METHOD_BROKER) {
+
+                /* Fetch offsets for all assigned partitions */
+                rd_kafka_cgrp_offsets_fetch(rkcg, rkcg->rkcg_rkb, assignment);
+
+        } else {
+		rd_kafka_cgrp_set_join_state(rkcg,
+					     RD_KAFKA_CGRP_JOIN_STATE_STARTED);
+
+                for (i = 0 ; i < assignment->cnt ; i++) {
+                        rd_kafka_topic_partition_t *rktpar =
+                                &assignment->elems[i];
+                        shptr_rd_kafka_toppar_t *s_rktp = rktpar->_private;
+                        rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
+
+			if (!rktp->rktp_assigned) {
+				rktp->rktp_assigned = 1;
+				rkcg->rkcg_assigned_cnt++;
+
+				/* Start fetcher for partition and
+				 * forward partition's fetchq to
+				 * consumer groups queue. */
+				rd_kafka_toppar_op_fetch_start(
+					rktp, rktpar->offset,
+					rkcg->rkcg_q, RD_KAFKA_NO_REPLYQ);
+			} else {
+				int64_t offset;
+				/* Fetcher already started,
+				 * just do seek to update offset */
+				rd_kafka_toppar_lock(rktp);
+				if (rktpar->offset < rktp->rktp_app_offset)
+					offset = rktp->rktp_app_offset;
+				else
+					offset = rktpar->offset;
+				rd_kafka_toppar_unlock(rktp);
+				rd_kafka_toppar_op_seek(rktp, offset,
+							RD_KAFKA_NO_REPLYQ);
+			}
+                }
+        }
+
+	rd_kafka_assert(NULL, rkcg->rkcg_assigned_cnt <=
+			(rkcg->rkcg_assignment ? rkcg->rkcg_assignment->cnt : 0));
+}
+
+
+
+
+
+/**
+ * @brief Defer offset commit (rko) until coordinator is available.
+ *
+ * @returns 1 if the rko was deferred or 0 if the defer queue is disabled
+ *          or rko already deferred.
+ */
+static int rd_kafka_cgrp_defer_offset_commit (rd_kafka_cgrp_t *rkcg,
+                                              rd_kafka_op_t *rko,
+                                              const char *reason) {
+
+        /* wait_coord_q is disabled session.timeout.ms after
+         * group close() has been initated. */
+        if (rko->rko_u.offset_commit.ts_timeout != 0 ||
+            !rd_kafka_q_ready(rkcg->rkcg_wait_coord_q))
+                return 0;
+
+        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COMMIT",
+                     "Group \"%s\": "
+                     "unable to OffsetCommit in state %s: %s: "
+                     "coordinator (%s) is unavailable: "
+                     "retrying later",
+                     rkcg->rkcg_group_id->str,
+                     rd_kafka_cgrp_state_names[rkcg->rkcg_state],
+                     reason,
+                     rkcg->rkcg_rkb ?
+                     rd_kafka_broker_name(rkcg->rkcg_rkb) :
+                     "none");
+
+        rko->rko_flags |= RD_KAFKA_OP_F_REPROCESS;
+        rko->rko_u.offset_commit.ts_timeout = rd_clock() +
+                (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms
+                 * 1000);
+        rd_kafka_q_enq(rkcg->rkcg_wait_coord_q, rko);
+
+        return 1;
+}
+
+
+/**
+ * @brief Handler of OffsetCommit response (after parsing).
+ * @remark \p offsets may be NULL if \p err is set
+ * @returns the number of partitions with errors encountered
+ */
+static int
+rd_kafka_cgrp_handle_OffsetCommit (rd_kafka_cgrp_t *rkcg,
+                                   rd_kafka_resp_err_t err,
+                                   rd_kafka_topic_partition_list_t
+                                   *offsets) {
+	int i;
+        int errcnt = 0;
+
+	if (!err) {
+		/* Update toppars' committed offset */
+		for (i = 0 ; i < offsets->cnt ; i++) {
+			rd_kafka_topic_partition_t *rktpar =&offsets->elems[i];
+			shptr_rd_kafka_toppar_t *s_rktp;
+			rd_kafka_toppar_t *rktp;
+
+			if (unlikely(rktpar->err)) {
+				rd_kafka_dbg(rkcg->rkcg_rk, TOPIC,
+					     "OFFSET",
+					     "OffsetCommit failed for "
+					     "%s [%"PRId32"] at offset "
+					     "%"PRId64": %s",
+					     rktpar->topic, rktpar->partition,
+					     rktpar->offset,
+					     rd_kafka_err2str(rktpar->err));
+                                errcnt++;
+				continue;
+			} else if (unlikely(rktpar->offset < 0))
+				continue;
+
+			s_rktp = rd_kafka_topic_partition_list_get_toppar(
+				rkcg->rkcg_rk, rktpar);
+			if (!s_rktp)
+				continue;
+
+			rktp = rd_kafka_toppar_s2i(s_rktp);
+			rd_kafka_toppar_lock(rktp);
+			rktp->rktp_committed_offset = rktpar->offset;
+			rd_kafka_toppar_unlock(rktp);
+
+			rd_kafka_toppar_destroy(s_rktp);
+		}
+	}
+
+        if (rd_kafka_cgrp_try_terminate(rkcg))
+                return errcnt; /* terminated */
+
+        if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN)
+		rd_kafka_cgrp_check_unassign_done(rkcg,
+                                                  "OffsetCommit done");
+
+        return errcnt;
+}
+
+
+
+
+/**
+ * Handle OffsetCommitResponse
+ * Takes the original 'rko' as opaque argument.
+ * @remark \p rkb, rkbuf, and request may be NULL in a number of
+ *         error cases (e.g., _NO_OFFSET, _WAIT_COORD)
+ */
+static void rd_kafka_cgrp_op_handle_OffsetCommit (rd_kafka_t *rk,
+						  rd_kafka_broker_t *rkb,
+						  rd_kafka_resp_err_t err,
+						  rd_kafka_buf_t *rkbuf,
+						  rd_kafka_buf_t *request,
+						  void *opaque) {
+	rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
+        rd_kafka_op_t *rko_orig = opaque;
+	rd_kafka_topic_partition_list_t *offsets =
+		rko_orig->rko_u.offset_commit.partitions; /* maybe NULL */
+        int errcnt;
+        int offset_commit_cb_served = 0;
+
+	RD_KAFKA_OP_TYPE_ASSERT(rko_orig, RD_KAFKA_OP_OFFSET_COMMIT);
+
+        if (rd_kafka_buf_version_outdated(request, rkcg->rkcg_version))
+                err = RD_KAFKA_RESP_ERR__DESTROY;
+
+	err = rd_kafka_handle_OffsetCommit(rk, rkb, err, rkbuf,
+					   request, offsets);
+
+        if (rkb)
+                rd_rkb_dbg(rkb, CGRP, "COMMIT",
+                           "OffsetCommit for %d partition(s): %s: returned: %s",
+                           offsets ? offsets->cnt : -1,
+                           rko_orig->rko_u.offset_commit.reason,
+                           rd_kafka_err2str(err));
+        else
+                rd_kafka_dbg(rk, CGRP, "COMMIT",
+                             "OffsetCommit for %d partition(s): %s: returned: %s",
+                             offsets ? offsets->cnt : -1,
+                             rko_orig->rko_u.offset_commit.reason,
+                             rd_kafka_err2str(err));
+
+        if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
+                return; /* Retrying */
+        else if (err == RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP ||
+                 err == RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE) {
+
+                /* future-proofing, see timeout_scan(). */
+                rd_kafka_assert(NULL, err != RD_KAFKA_RESP_ERR__WAIT_COORD);
+
+                if (rd_kafka_cgrp_defer_offset_commit(rkcg, rko_orig,
+                                                      rd_kafka_err2str(err)))
+                        return;
+
+                /* FALLTHRU and error out */
+        }
+
+	rd_kafka_assert(NULL, rkcg->rkcg_wait_commit_cnt > 0);
+	rkcg->rkcg_wait_commit_cnt--;
+
+	if (err == RD_KAFKA_RESP_ERR__DESTROY ||
+            (err == RD_KAFKA_RESP_ERR__NO_OFFSET &&
+             rko_orig->rko_u.offset_commit.silent_empty)) {
+		rd_kafka_op_destroy(rko_orig);
+                rd_kafka_cgrp_check_unassign_done(
+                        rkcg,
+                        err == RD_KAFKA_RESP_ERR__DESTROY ?
+                        "OffsetCommit done (__DESTROY)" :
+                        "OffsetCommit done (__NO_OFFSET)");
+		return;
+	}
+
+        /* Call on_commit interceptors */
+        if (err != RD_KAFKA_RESP_ERR__NO_OFFSET &&
+            err != RD_KAFKA_RESP_ERR__DESTROY &&
+            offsets && offsets->cnt > 0)
+                rd_kafka_interceptors_on_commit(rk, offsets, err);
+
+
+	/* If no special callback is set but a offset_commit_cb has
+	 * been set in conf then post an event for the latter. */
+	if (!rko_orig->rko_u.offset_commit.cb && rk->rk_conf.offset_commit_cb) {
+                rd_kafka_op_t *rko_reply = rd_kafka_op_new_reply(rko_orig, err);
+
+                rd_kafka_op_set_prio(rko_reply, RD_KAFKA_PRIO_HIGH);
+
+		if (offsets)
+			rko_reply->rko_u.offset_commit.partitions =
+				rd_kafka_topic_partition_list_copy(offsets);
+
+		rko_reply->rko_u.offset_commit.cb =
+			rk->rk_conf.offset_commit_cb;
+		rko_reply->rko_u.offset_commit.opaque = rk->rk_conf.opaque;
+
+                rd_kafka_q_enq(rk->rk_rep, rko_reply);
+                offset_commit_cb_served++;
+	}
+
+
+	/* Enqueue reply to requester's queue, if any. */
+	if (rko_orig->rko_replyq.q) {
+                rd_kafka_op_t *rko_reply = rd_kafka_op_new_reply(rko_orig, err);
+
+                rd_kafka_op_set_prio(rko_reply, RD_KAFKA_PRIO_HIGH);
+
+		/* Copy offset & partitions & callbacks to reply op */
+		rko_reply->rko_u.offset_commit = rko_orig->rko_u.offset_commit;
+		if (offsets)
+			rko_reply->rko_u.offset_commit.partitions =
+				rd_kafka_topic_partition_list_copy(offsets);
+                if (rko_reply->rko_u.offset_commit.reason)
+                        rko_reply->rko_u.offset_commit.reason =
+                        rd_strdup(rko_reply->rko_u.offset_commit.reason);
+
+                rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko_reply, 0);
+                offset_commit_cb_served++;
+        }
+
+        errcnt = rd_kafka_cgrp_handle_OffsetCommit(rkcg, err, offsets);
+
+        if (!offset_commit_cb_served &&
+            err != RD_KAFKA_RESP_ERR_NO_ERROR &&
+            err != RD_KAFKA_RESP_ERR__NO_OFFSET) {
+                /* If there is no callback or handler for this (auto)
+                 * commit then raise an error to the application (#1043) */
+                char tmp[512];
+
+                rd_kafka_topic_partition_list_str(
+                        offsets, tmp, sizeof(tmp),
+                        /*no partition-errs if a global error*/
+                        RD_KAFKA_FMT_F_OFFSET |
+                        (err ? 0 : RD_KAFKA_FMT_F_ONLY_ERR));
+
+                rd_kafka_log(rkcg->rkcg_rk, LOG_WARNING, "COMMITFAIL",
+                             "Offset commit (%s) failed "
+                             "for %d/%d partition(s): "
+                             "%s%s%s",
+                             rko_orig->rko_u.offset_commit.reason,
+                             err ? offsets->cnt : errcnt, offsets->cnt,
+                             err ? rd_kafka_err2str(err) : "",
+                             err ? ": " : "",
+                             tmp);
+        }
+
+        rd_kafka_op_destroy(rko_orig);
+}
+
+
+static size_t rd_kafka_topic_partition_has_absolute_offset (
+        const rd_kafka_topic_partition_t *rktpar, void *opaque) {
+        return rktpar->offset >= 0 ? 1 : 0;
+}
+
+
+/**
+ * Commit a list of offsets.
+ * Reuse the orignating 'rko' for the async reply.
+ * 'rko->rko_payload' should either by NULL (to commit current assignment) or
+ * a proper topic_partition_list_t with offsets to commit.
+ * The offset list will be altered.
+ *
+ * \p rko...silent_empty: if there are no offsets to commit bail out
+ *                        silently without posting an op on the reply queue.
+ * \p set_offsets: set offsets in rko->rko_u.offset_commit.partitions
+ *
+ * \p op_version: cgrp's op version to use (or 0)
+ *
+ * Locality: cgrp thread
+ */
+static void rd_kafka_cgrp_offsets_commit (rd_kafka_cgrp_t *rkcg,
+                                          rd_kafka_op_t *rko,
+                                          int set_offsets,
+                                          const char *reason,
+                                          int op_version) {
+	rd_kafka_topic_partition_list_t *offsets;
+	rd_kafka_resp_err_t err;
+        int valid_offsets = 0;
+
+	/* If offsets is NULL we shall use the current assignment. */
+	if (!rko->rko_u.offset_commit.partitions && rkcg->rkcg_assignment)
+		rko->rko_u.offset_commit.partitions =
+			rd_kafka_topic_partition_list_copy(
+				rkcg->rkcg_assignment);
+
+	offsets = rko->rko_u.offset_commit.partitions;
+
+        if (offsets) {
+                /* Set offsets to commits */
+                if (set_offsets)
+                        rd_kafka_topic_partition_list_set_offsets(
+			rkcg->rkcg_rk, rko->rko_u.offset_commit.partitions, 1,
+			RD_KAFKA_OFFSET_INVALID/* def */,
+			1 /* is commit */);
+
+                /*  Check the number of valid offsets to commit. */
+                valid_offsets = (int)rd_kafka_topic_partition_list_sum(
+                        offsets,
+                        rd_kafka_topic_partition_has_absolute_offset, NULL);
+        }
+
+        if (!(rko->rko_flags & RD_KAFKA_OP_F_REPROCESS)) {
+                /* wait_commit_cnt has already been increased for
+                 * reprocessed ops. */
+                rkcg->rkcg_wait_commit_cnt++;
+        }
+
+	if (!valid_offsets) {
+                /* No valid offsets */
+                err = RD_KAFKA_RESP_ERR__NO_OFFSET;
+                goto err;
+	}
+
+        if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP || !rkcg->rkcg_rkb ||
+	    rkcg->rkcg_rkb->rkb_source == RD_KAFKA_INTERNAL) {
+
+		if (rd_kafka_cgrp_defer_offset_commit(rkcg, rko, reason))
+			return;
+
+		err = RD_KAFKA_RESP_ERR__WAIT_COORD;
+
+	} else {
+                int r;
+
+                /* Send OffsetCommit */
+                r = rd_kafka_OffsetCommitRequest(
+                            rkcg->rkcg_rkb, rkcg, 1, offsets,
+                            RD_KAFKA_REPLYQ(rkcg->rkcg_ops, op_version),
+                            rd_kafka_cgrp_op_handle_OffsetCommit, rko,
+                        reason);
+
+                /* Must have valid offsets to commit if we get here */
+                rd_kafka_assert(NULL, r != 0);
+
+                return;
+        }
+
+
+
+ err:
+	/* Propagate error to whoever wanted offset committed. */
+	rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COMMIT",
+		     "OffsetCommit internal error: %s", rd_kafka_err2str(err));
+	rd_kafka_cgrp_op_handle_OffsetCommit(rkcg->rkcg_rk, NULL, err,
+					     NULL, NULL, rko);
+}
+
+
+/**
+ * Commit offsets for all assigned partitions.
+ */
+static void
+rd_kafka_cgrp_assigned_offsets_commit (rd_kafka_cgrp_t *rkcg,
+                                       const rd_kafka_topic_partition_list_t
+                                       *offsets, const char *reason) {
+        rd_kafka_op_t *rko;
+
+	rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_COMMIT);
+        rko->rko_u.offset_commit.reason = rd_strdup(reason);
+	if (rkcg->rkcg_rk->rk_conf.enabled_events & RD_KAFKA_EVENT_OFFSET_COMMIT) {
+		rd_kafka_op_set_replyq(rko, rkcg->rkcg_rk->rk_rep, 0);
+		rko->rko_u.offset_commit.cb =
+			rkcg->rkcg_rk->rk_conf.offset_commit_cb; /*maybe NULL*/
+		rko->rko_u.offset_commit.opaque = rkcg->rkcg_rk->rk_conf.opaque;
+	}
+        /* NULL partitions means current assignment */
+        if (offsets)
+                rko->rko_u.offset_commit.partitions =
+                        rd_kafka_topic_partition_list_copy(offsets);
+	rko->rko_u.offset_commit.silent_empty = 1;
+        rd_kafka_cgrp_offsets_commit(rkcg, rko, 1/* set offsets */, reason,
+                                     rkcg->rkcg_version);
+}
+
+
+/**
+ * auto.commit.interval.ms commit timer callback.
+ *
+ * Trigger a group offset commit.
+ *
+ * Locality: rdkafka main thread
+ */
+static void rd_kafka_cgrp_offset_commit_tmr_cb (rd_kafka_timers_t *rkts,
+                                                void *arg) {
+        rd_kafka_cgrp_t *rkcg = arg;
+
+	rd_kafka_cgrp_assigned_offsets_commit(rkcg, NULL,
+                                              "cgrp auto commit timer");
+}
+
+
+
+
+/**
+ * Call when all unassign operations are done to transition to the next state
+ */
+static void rd_kafka_cgrp_unassign_done (rd_kafka_cgrp_t *rkcg,
+                                         const char *reason) {
+	rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNASSIGN",
+		     "Group \"%s\": unassign done in state %s (join state %s): "
+		     "%s: %s",
+		     rkcg->rkcg_group_id->str,
+		     rd_kafka_cgrp_state_names[rkcg->rkcg_state],
+		     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
+		     rkcg->rkcg_assignment ?
+		     "with new assignment" : "without new assignment",
+                     reason);
+
+	if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN) {
+		rd_kafka_cgrp_leave(rkcg, 1/*ignore response*/);
+		rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN;
+	}
+
+        if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN) {
+                rd_kafka_cgrp_try_terminate(rkcg);
+                return;
+        }
+
+        if (rkcg->rkcg_assignment) {
+		rd_kafka_cgrp_set_join_state(rkcg,
+					     RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED);
+                if (RD_KAFKA_CGRP_CAN_FETCH_START(rkcg))
+                        rd_kafka_cgrp_partitions_fetch_start(
+                                rkcg, rkcg->rkcg_assignment, 0);
+	} else {
+		rd_kafka_cgrp_set_join_state(rkcg,
+					     RD_KAFKA_CGRP_JOIN_STATE_INIT);
+	}
+
+	rd_kafka_cgrp_try_terminate(rkcg);
+}
+
+
+/**
+ * Checks if the current unassignment is done and if so
+ * calls .._done().
+ * Else does nothing.
+ */
+static void rd_kafka_cgrp_check_unassign_done (rd_kafka_cgrp_t *rkcg,
+                                               const char *reason) {
+	if (rkcg->rkcg_wait_unassign_cnt > 0 ||
+	    rkcg->rkcg_assigned_cnt > 0 ||
+	    rkcg->rkcg_wait_commit_cnt > 0 ||
+	    rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN) {
+                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNASSIGN",
+                             "Unassign not done yet "
+                             "(%d wait_unassign, %d assigned, %d wait commit"
+                             "%s): %s",
+                             rkcg->rkcg_wait_unassign_cnt,
+                             rkcg->rkcg_assigned_cnt,
+                             rkcg->rkcg_wait_commit_cnt,
+                             (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN)?
+                             ", F_WAIT_UNASSIGN" : "", reason);
+		return;
+        }
+
+	rd_kafka_cgrp_unassign_done(rkcg, reason);
+}
+
+
+
+/**
+ * Remove existing assignment.
+ */
+static rd_kafka_resp_err_t
+rd_kafka_cgrp_unassign (rd_kafka_cgrp_t *rkcg) {
+        int i;
+        rd_kafka_topic_partition_list_t *old_assignment;
+
+        rd_kafka_cgrp_set_join_state(rkcg,
+                                     RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN);
+
+	rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WAIT_UNASSIGN;
+        old_assignment = rkcg->rkcg_assignment;
+        if (!old_assignment) {
+		rd_kafka_cgrp_check_unassign_done(
+                        rkcg, "unassign (no previous assignment)");
+                return RD_KAFKA_RESP_ERR_NO_ERROR;
+	}
+        rkcg->rkcg_assignment = NULL;
+
+	rd_kafka_cgrp_version_new_barrier(rkcg);
+
+	rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNASSIGN",
+                     "Group \"%s\": unassigning %d partition(s) (v%"PRId32")",
+                     rkcg->rkcg_group_id->str, old_assignment->cnt,
+		     rkcg->rkcg_version);
+
+        if (rkcg->rkcg_rk->rk_conf.offset_store_method ==
+            RD_KAFKA_OFFSET_METHOD_BROKER &&
+	    rkcg->rkcg_rk->rk_conf.enable_auto_commit) {
+                /* Commit all offsets for all assigned partitions to broker */
+                rd_kafka_cgrp_assigned_offsets_commit(rkcg, old_assignment,
+                                                      "unassign");
+        }
+
+        for (i = 0 ; i < old_assignment->cnt ; i++) {
+                rd_kafka_topic_partition_t *rktpar;
+                shptr_rd_kafka_toppar_t *s_rktp;
+                rd_kafka_toppar_t *rktp;
+
+                rktpar = &old_assignment->elems[i];
+                s_rktp = rktpar->_private;
+                rktp = rd_kafka_toppar_s2i(s_rktp);
+
+                if (rktp->rktp_assigned) {
+                        rd_kafka_toppar_op_fetch_stop(
+				rktp, RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0));
+                        rkcg->rkcg_wait_unassign_cnt++;
+                }
+
+                rd_kafka_toppar_lock(rktp);
+                rd_kafka_toppar_desired_del(rktp);
+                rd_kafka_toppar_unlock(rktp);
+        }
+
+	/* Resume partition consumption. */
+	rd_kafka_toppars_pause_resume(rkcg->rkcg_rk, 0/*resume*/,
+				      RD_KAFKA_TOPPAR_F_LIB_PAUSE,
+                                      old_assignment);
+
+        rd_kafka_topic_partition_list_destroy(old_assignment);
+
+        rd_kafka_cgrp_check_unassign_done(rkcg, "unassign");
+
+        return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+/**
+ * Set new atomic partition assignment
+ * May update \p assignment but will not hold on to it.
+ */
+static void
+rd_kafka_cgrp_assign (rd_kafka_cgrp_t *rkcg,
+                      rd_kafka_topic_partition_list_t *assignment) {
+        int i;
+
+        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
+                     "Group \"%s\": new assignment of %d partition(s) "
+                     "in join state %s",
+                     rkcg->rkcg_group_id->str,
+                     assignment ? assignment->cnt : 0,
+                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
+
+        /* Get toppar object for each partition.
+         * This is to make sure the rktp stays alive during unassign(). */
+        for (i = 0 ; assignment && i < assignment->cnt ; i++) {
+                rd_kafka_topic_partition_t *rktpar;
+                shptr_rd_kafka_toppar_t *s_rktp;
+
+                rktpar = &assignment->elems[i];
+
+                /* Use existing toppar if set */
+                if (rktpar->_private)
+                        continue;
+
+                s_rktp = rd_kafka_toppar_get2(rkcg->rkcg_rk,
+                                              rktpar->topic,
+                                              rktpar->partition,
+                                              0/*no-ua*/, 1/*create-on-miss*/);
+                if (s_rktp)
+                        rktpar->_private = s_rktp;
+        }
+
+        rd_kafka_cgrp_version_new_barrier(rkcg);
+
+        rd_kafka_wrlock(rkcg->rkcg_rk);
+        rkcg->rkcg_c.assignment_size = assignment ? assignment->cnt : 0;
+        rd_kafka_wrunlock(rkcg->rkcg_rk);
+
+
+        /* Remove existing assignment (async operation) */
+	if (rkcg->rkcg_assignment)
+		rd_kafka_cgrp_unassign(rkcg);
+
+        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
+                     "Group \"%s\": assigning %d partition(s) in join state %s",
+                     rkcg->rkcg_group_id->str, assignment ? assignment->cnt : 0,
+                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
+
+
+	if (assignment) {
+		rkcg->rkcg_assignment =
+			rd_kafka_topic_partition_list_copy(assignment);
+
+                /* Mark partition(s) as desired */
+                for (i = 0 ; i < rkcg->rkcg_assignment->cnt ; i++) {
+                        rd_kafka_topic_partition_t *rktpar =
+                                &rkcg->rkcg_assignment->elems[i];
+                        shptr_rd_kafka_toppar_t *s_rktp = rktpar->_private;
+                        rd_kafka_toppar_t *rktp =
+                                rd_kafka_toppar_s2i(s_rktp);
+                        rd_kafka_toppar_lock(rktp);
+                        rd_kafka_toppar_desired_add0(rktp);
+                        rd_kafka_toppar_unlock(rktp);
+                }
+        }
+
+        if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN)
+                return;
+
+        rd_dassert(rkcg->rkcg_wait_unassign_cnt == 0);
+
+        rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED);
+
+        if (RD_KAFKA_CGRP_CAN_FETCH_START(rkcg) && rkcg->rkcg_assignment) {
+                /* No existing assignment that needs to be decommissioned,
+                 * start partition fetchers right away */
+                rd_kafka_cgrp_partitions_fetch_start(
+                        rkcg, rkcg->rkcg_assignment, 0);
+        }
+}
+
+
+
+
+/**
+ * Handle a rebalance-triggered partition assignment.
+ *
+ * If a rebalance_cb has been registered we enqueue an op for the app
+ * and let the app perform the actual assign() call.
+ * Otherwise we assign() directly from here.
+ *
+ * This provides the most flexibility, allowing the app to perform any
+ * operation it seem fit (e.g., offset writes or reads) before actually
+ * updating the assign():ment.
+ */
+static void
+rd_kafka_cgrp_handle_assignment (rd_kafka_cgrp_t *rkcg,
+				 rd_kafka_topic_partition_list_t *assignment) {
+
+	rd_kafka_rebalance_op(rkcg, RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
+			      assignment, "new assignment");
+}
+
+
+/**
+ * Handle HeartbeatResponse errors.
+ *
+ * If an IllegalGeneration error code is returned in the
+ * HeartbeatResponse, it indicates that the co-ordinator has
+ * initiated a rebalance. The consumer then stops fetching data,
+ * commits offsets and sends a JoinGroupRequest to it's co-ordinator
+ * broker */
+void rd_kafka_cgrp_handle_heartbeat_error (rd_kafka_cgrp_t *rkcg,
+					   rd_kafka_resp_err_t err) {
+
+
+	rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
+		     "Group \"%s\" heartbeat error response in "
+		     "state %s (join state %s, %d partition(s) assigned): %s",
+		     rkcg->rkcg_group_id->str,
+		     rd_kafka_cgrp_state_names[rkcg->rkcg_state],
+		     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
+		     rkcg->rkcg_assignment ? rkcg->rkcg_assignment->cnt : 0,
+		     rd_kafka_err2str(err));
+
+	if (rkcg->rkcg_join_state <= RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC) {
+		rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
+			     "Heartbeat response: discarding outdated "
+			     "request (now in join-state %s)",
+			     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
+		return;
+	}
+
+	switch (err)
+	{
+	case RD_KAFKA_RESP_ERR__DESTROY:
+		/* quick cleanup */
+		break;
+	case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP:
+	case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE:
+	case RD_KAFKA_RESP_ERR__TRANSPORT:
+		/* Remain in joined state and keep querying for coordinator */
+		rd_interval_expedite(&rkcg->rkcg_coord_query_intvl, 0);
+		break;
+
+	case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID:
+		rd_kafka_cgrp_set_member_id(rkcg, "");
+	case RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS:
+	case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION:
+	default:
+                /* Just revert to INIT state if join state is active. */
+                if (rkcg->rkcg_join_state <
+                    RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB ||
+                    rkcg->rkcg_join_state ==
+                    RD_KAFKA_CGRP

<TRUNCATED>