You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/09/11 01:03:03 UTC

[kafka] branch 2.6 updated: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout (#8834)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 16ec179  KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout (#8834)
16ec179 is described below

commit 16ec1793d53700623c9cb43e711f585aafd44dd4
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu Sep 10 14:34:38 2020 -0700

    KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout (#8834)
    
    1. Split the consumer coordinator's REBALANCING state into PREPARING_REBALANCE and COMPLETING_REBALANCE. The first is when the join group request is sent, and the second is after the join group response is received. During the first state we should still not send hb since it shares the same socket with the join group request and the group coordinator has disabled timeout, however when we transit to the second state we should start sending hb in case leader's assign takes long time. Th [...]
    
    2. When deciding coordinator#timeToNextPoll, do not count in timeToNextHeartbeat if the state is in UNJOINED or PREPARING_REBALANCE since we would disable hb and hence its timer would not be updated.
    
    3. On the broker side, allow hb received during PREPARING_REBALANCE, return NONE error code instead of REBALANCE_IN_PROGRESS. However on client side, we still need to ignore REBALANCE_IN_PROGRESS if state is COMPLETING_REBALANCE in case it is talking to an old versioned broker.
    
    4. Piggy-backing a log4j improvement on the broker coordinator for triggering rebalance reason, as I found it a bit blurred during the investigation. Also subsumed #9038 with log4j improvements.
    
    The tricky part for allowing hb during COMPLETING_REBALANCE is in two parts: 1) before the sync-group response is received, a hb response may have reset the generation; also after the sync-group response but before the callback is triggered, a hb response can still reset the generation, we need to handle both cases by checking the generation / state. 2) with the hb thread enabled, the sync-group request may be sent by the hb thread even if the caller thread did not call poll yet.
    
    Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Boyang Chen <bo...@confluent.io>, John Roesler <jo...@confluent.io>
---
 .../kafka/clients/consumer/KafkaConsumer.java      |  15 +-
 .../consumer/internals/AbstractCoordinator.java    | 205 +++++++++++----------
 .../consumer/internals/ConsumerCoordinator.java    |  19 +-
 .../clients/consumer/internals/Heartbeat.java      |  13 ++
 .../java/org/apache/kafka/clients/MockClient.java  |   8 +
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  13 +-
 .../internals/AbstractCoordinatorTest.java         |  94 +++++-----
 .../internals/ConsumerCoordinatorTest.java         |  16 +-
 .../kafka/coordinator/group/GroupCoordinator.scala |  23 ++-
 .../coordinator/group/GroupCoordinatorTest.scala   |   2 +-
 10 files changed, 228 insertions(+), 180 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 33a2fbb..3a7339d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1257,13 +1257,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         }
     }
 
-    /**
-     * Visible for testing
-     */
-    boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
-        return updateAssignmentMetadataIfNeeded(timer, true);
-    }
-
     boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitForJoinGroup) {
         if (coordinator != null && !coordinator.poll(timer, waitForJoinGroup)) {
             return false;
@@ -1297,6 +1290,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             pollTimeout = retryBackoffMs;
         }
 
+        log.trace("Polling for fetches with timeout {}", pollTimeout);
+
         Timer pollTimer = time.timer(pollTimeout);
         client.poll(pollTimer, () -> {
             // since a fetch might be completed by the background thread, we need this poll condition
@@ -2478,8 +2473,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
     }
 
-    // Visible for testing
+    // Functions below are for testing only
     String getClientId() {
         return clientId;
     }
+
+    boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
+        return updateAssignmentMetadataIfNeeded(timer, true);
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 619146e..a565c21 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -113,18 +113,23 @@ public abstract class AbstractCoordinator implements Closeable {
     public static final int JOIN_GROUP_TIMEOUT_LAPSE = 5000;
 
     protected enum MemberState {
-        UNJOINED,    // the client is not part of a group
-        REBALANCING, // the client has begun rebalancing
-        STABLE,      // the client has joined and is sending heartbeats
+        UNJOINED,             // the client is not part of a group
+        PREPARING_REBALANCE,  // the client has sent the join group request, but have not received response
+        COMPLETING_REBALANCE, // the client has received join group response, but have not received assignment
+        STABLE;               // the client has joined and is sending heartbeats
+
+        public boolean hasNotJoinedGroup() {
+            return equals(UNJOINED) || equals(PREPARING_REBALANCE);
+        }
     }
 
     private final Logger log;
-    private final GroupCoordinatorMetrics sensors;
     private final Heartbeat heartbeat;
+    private final GroupCoordinatorMetrics sensors;
     private final GroupRebalanceConfig rebalanceConfig;
-    protected final ConsumerNetworkClient client;
+
     protected final Time time;
-    protected MemberState state = MemberState.UNJOINED;
+    protected final ConsumerNetworkClient client;
 
     private Node coordinator = null;
     private boolean rejoinNeeded = true;
@@ -137,6 +142,8 @@ public abstract class AbstractCoordinator implements Closeable {
     private long lastRebalanceStartMs = -1L;
     private long lastRebalanceEndMs = -1L;
 
+    protected MemberState state = MemberState.UNJOINED;
+
 
     /**
      * Initialize the coordination manager.
@@ -326,8 +333,9 @@ public abstract class AbstractCoordinator implements Closeable {
     }
 
     protected synchronized long timeToNextHeartbeat(long now) {
-        // if we have not joined the group, we don't need to send heartbeats
-        if (state == MemberState.UNJOINED)
+        // if we have not joined the group or we are preparing rebalance,
+        // we don't need to send heartbeats
+        if (state.hasNotJoinedGroup())
             return Long.MAX_VALUE;
         return heartbeat.timeToNextHeartbeat(now);
     }
@@ -366,13 +374,8 @@ public abstract class AbstractCoordinator implements Closeable {
         }
     }
 
-    private synchronized void disableHeartbeatThread() {
-        if (heartbeatThread != null)
-            heartbeatThread.disable();
-    }
-
     private void closeHeartbeatThread() {
-        HeartbeatThread thread = null;
+        HeartbeatThread thread;
         synchronized (this) {
             if (heartbeatThread == null)
                 return;
@@ -391,6 +394,12 @@ public abstract class AbstractCoordinator implements Closeable {
     /**
      * Joins the group without starting the heartbeat thread.
      *
+     * If this function returns true, the state must always be in STABLE and heartbeat enabled.
+     * If this function returns false, the state can be in one of the following:
+     *  * UNJOINED: got error response but times out before being able to re-join, heartbeat disabled
+     *  * PREPARING_REBALANCE: not yet received join-group response before timeout, heartbeat disabled
+     *  * COMPLETING_REBALANCE: not yet received sync-group response before timeout, hearbeat enabled
+     *
      * Visible for testing.
      *
      * @param timer Timer bounding how long this method can block
@@ -424,16 +433,18 @@ public abstract class AbstractCoordinator implements Closeable {
 
             if (future.succeeded()) {
                 Generation generationSnapshot;
+                MemberState stateSnapshot;
 
                 // Generation data maybe concurrently cleared by Heartbeat thread.
                 // Can't use synchronized for {@code onJoinComplete}, because it can be long enough
-                // and  shouldn't block heartbeat thread.
+                // and shouldn't block heartbeat thread.
                 // See {@link PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment}
                 synchronized (AbstractCoordinator.this) {
                     generationSnapshot = this.generation;
+                    stateSnapshot = this.state;
                 }
 
-                if (generationSnapshot != Generation.NO_GENERATION) {
+                if (generationSnapshot != Generation.NO_GENERATION && stateSnapshot == MemberState.STABLE) {
                     // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
                     ByteBuffer memberAssignment = future.value().duplicate();
 
@@ -446,14 +457,15 @@ public abstract class AbstractCoordinator implements Closeable {
                     resetJoinGroupFuture();
                     needsJoinPrepare = true;
                 } else {
-                    log.info("Generation data was cleared by heartbeat thread. Initiating rejoin.");
+                    log.info("Generation data was cleared by heartbeat thread to {} and state is now {} before " +
+                         "the rebalance callback is triggered, marking this rebalance as failed and retry",
+                         generationSnapshot, stateSnapshot);
                     resetStateAndRejoin();
                     resetJoinGroupFuture();
-                    return false;
                 }
             } else {
                 final RuntimeException exception = future.exception();
-                log.info("Join group failed with {}", exception.toString());
+                log.info("Rebalance failed.", exception);
                 resetJoinGroupFuture();
                 if (exception instanceof UnknownMemberIdException ||
                     exception instanceof RebalanceInProgressException ||
@@ -463,6 +475,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 else if (!future.isRetriable())
                     throw exception;
 
+                resetStateAndRejoin();
                 timer.sleep(rebalanceConfig.retryBackoffMs);
             }
         }
@@ -473,22 +486,12 @@ public abstract class AbstractCoordinator implements Closeable {
         this.joinFuture = null;
     }
 
-    private synchronized void resetStateAndRejoin() {
-        rejoinNeeded = true;
-        state = MemberState.UNJOINED;
-    }
-
     private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
         // we store the join future in case we are woken up by the user after beginning the
         // rebalance in the call to poll below. This ensures that we do not mistakenly attempt
         // to rejoin before the pending rebalance has completed.
         if (joinFuture == null) {
-            // fence off the heartbeat thread explicitly so that it cannot interfere with the join group.
-            // Note that this must come after the call to onJoinPrepare since we must be able to continue
-            // sending heartbeats if that callback takes some time.
-            disableHeartbeatThread();
-
-            state = MemberState.REBALANCING;
+            state = MemberState.PREPARING_REBALANCE;
             // a rebalance can be triggered consecutively if the previous one failed,
             // in this case we would not update the start time.
             if (lastRebalanceStartMs == -1L)
@@ -497,40 +500,18 @@ public abstract class AbstractCoordinator implements Closeable {
             joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
                 @Override
                 public void onSuccess(ByteBuffer value) {
-                    // handle join completion in the callback so that the callback will be invoked
-                    // even if the consumer is woken up before finishing the rebalance
-                    synchronized (AbstractCoordinator.this) {
-                        if (generation != Generation.NO_GENERATION) {
-                            log.info("Successfully joined group with generation {}", generation.generationId);
-                            state = MemberState.STABLE;
-                            rejoinNeeded = false;
-                            // record rebalance latency
-                            lastRebalanceEndMs = time.milliseconds();
-                            sensors.successfulRebalanceSensor.record(lastRebalanceEndMs - lastRebalanceStartMs);
-                            lastRebalanceStartMs = -1L;
-
-                            if (heartbeatThread != null)
-                                heartbeatThread.enable();
-                        } else {
-                            log.info("Generation data was cleared by heartbeat thread. Rejoin failed.");
-                            recordRebalanceFailure();
-                        }
-                    }
+                    // do nothing since all the handler logic are in SyncGroupResponseHandler already
                 }
 
                 @Override
                 public void onFailure(RuntimeException e) {
                     // we handle failures below after the request finishes. if the join completes
-                    // after having been woken up, the exception is ignored and we will rejoin
+                    // after having been woken up, the exception is ignored and we will rejoin;
+                    // this can be triggered when either join or sync request failed
                     synchronized (AbstractCoordinator.this) {
-                        recordRebalanceFailure();
+                        sensors.failedRebalanceSensor.record();
                     }
                 }
-
-                private void recordRebalanceFailure() {
-                    state = MemberState.UNJOINED;
-                    sensors.failedRebalanceSensor.record();
-                }
             });
         }
         return joinFuture;
@@ -590,14 +571,24 @@ public abstract class AbstractCoordinator implements Closeable {
                     sensors.joinSensor.record(response.requestLatencyMs());
 
                     synchronized (AbstractCoordinator.this) {
-                        if (state != MemberState.REBALANCING) {
+                        if (state != MemberState.PREPARING_REBALANCE) {
                             // if the consumer was woken up before a rebalance completes, we may have already left
                             // the group. In this case, we do not want to continue with the sync group.
                             future.raise(new UnjoinedGroupException());
                         } else {
+                            state = MemberState.COMPLETING_REBALANCE;
+
+                            // we only need to enable heartbeat thread whenever we transit to
+                            // COMPLETING_REBALANCE state since we always transit from this state to STABLE
+                            if (heartbeatThread != null)
+                                heartbeatThread.enable();
+
                             AbstractCoordinator.this.generation = new Generation(
                                 joinResponse.data().generationId(),
                                 joinResponse.data().memberId(), joinResponse.data().protocolName());
+
+                            log.info("Successfully joined group with generation {}", AbstractCoordinator.this.generation);
+
                             if (joinResponse.isLeader()) {
                                 onJoinLeader(joinResponse).chain(future);
                             } else {
@@ -652,10 +643,10 @@ public abstract class AbstractCoordinator implements Closeable {
             } else if (error == Errors.MEMBER_ID_REQUIRED) {
                 // Broker requires a concrete member id to be allowed to join the group. Update member id
                 // and send another join group request in next cycle.
+                String memberId = joinResponse.data().memberId();
+                log.debug("Attempt to join group returned {} error. Will set the member id as {} and then rejoin", error, memberId);
                 synchronized (AbstractCoordinator.this) {
-                    AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID,
-                            joinResponse.data().memberId(), null);
-                    AbstractCoordinator.this.resetStateAndRejoin();
+                    AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, memberId, null);
                 }
                 future.raise(error);
             } else {
@@ -736,12 +727,41 @@ public abstract class AbstractCoordinator implements Closeable {
                     log.error("SyncGroup failed due to inconsistent Protocol Type, received {} but expected {}",
                         syncResponse.data.protocolType(), protocolType());
                     future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
-                } else if (isProtocolNameInconsistent(ApiKeys.SYNC_GROUP, syncResponse.data.protocolName())) {
-                    future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
                 } else {
                     log.debug("Received successful SyncGroup response: {}", syncResponse);
                     sensors.syncSensor.record(response.requestLatencyMs());
-                    future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
+
+                    synchronized (AbstractCoordinator.this) {
+                        if (generation != Generation.NO_GENERATION && state == MemberState.COMPLETING_REBALANCE) {
+                            // check protocol name only if the generation is not reset
+                            final String protocolName = syncResponse.data.protocolName();
+                            final boolean protocolNameInconsistent = protocolName != null &&
+                                !protocolName.equals(generation.protocolName);
+
+                            if (protocolNameInconsistent) {
+                                log.error("SyncGroup failed due to inconsistent Protocol Name, received {} but expected {}",
+                                    protocolName, generation.protocolName);
+
+                                future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
+                            } else {
+                                log.info("Successfully synced group in generation {}", generation);
+                                state = MemberState.STABLE;
+                                rejoinNeeded = false;
+                                // record rebalance latency
+                                lastRebalanceEndMs = time.milliseconds();
+                                sensors.successfulRebalanceSensor.record(lastRebalanceEndMs - lastRebalanceStartMs);
+                                lastRebalanceStartMs = -1L;
+
+                                future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
+                            }
+                        } else {
+                            log.info("Generation data was cleared by heartbeat thread as {} and state is now {} before " +
+                                "received SyncGroup response, marking this rebalance as failed and retry",
+                                generation, state);
+                            // use ILLEGAL_GENERATION error code to let it retry immediately
+                            future.raise(Errors.ILLEGAL_GENERATION);
+                        }
+                    }
                 }
             } else {
                 requestRejoin();
@@ -902,35 +922,33 @@ public abstract class AbstractCoordinator implements Closeable {
     }
 
     protected synchronized boolean rebalanceInProgress() {
-        return this.state == MemberState.REBALANCING;
+        return this.state == MemberState.PREPARING_REBALANCE || this.state == MemberState.COMPLETING_REBALANCE;
     }
 
     protected synchronized String memberId() {
         return generation.memberId;
     }
 
-    private synchronized void resetGeneration() {
-        rejoinNeeded = true;
+    private synchronized void resetState() {
+        state = MemberState.UNJOINED;
         generation = Generation.NO_GENERATION;
     }
 
+    private synchronized void resetStateAndRejoin() {
+        resetState();
+        rejoinNeeded = true;
+    }
+
     synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) {
         log.debug("Resetting generation after encountering {} from {} response and requesting re-join", error, api);
 
-        // only reset the state to un-joined when it is not already in rebalancing
-        if (state != MemberState.REBALANCING)
-            state = MemberState.UNJOINED;
-
-        resetGeneration();
+        resetState();
     }
 
     synchronized void resetGenerationOnLeaveGroup() {
         log.debug("Resetting generation due to consumer pro-actively leaving the group");
 
-        // always set the state to un-joined
-        state = MemberState.UNJOINED;
-
-        resetGeneration();
+        resetStateAndRejoin();
     }
 
     public synchronized void requestRejoin() {
@@ -941,19 +959,6 @@ public abstract class AbstractCoordinator implements Closeable {
         return protocolType != null && !protocolType.equals(protocolType());
     }
 
-    private boolean isProtocolNameInconsistent(ApiKeys key, String protocolName) {
-        final Generation currentGeneration = generation();
-        final boolean protocolNameInconsistent = protocolName != null &&
-            currentGeneration != Generation.NO_GENERATION &&
-            !protocolName.equals(currentGeneration.protocolName);
-
-        if (protocolNameInconsistent) {
-            log.error("{} failed due to inconsistent Protocol Name, received {} but expected {}",
-                key, protocolName, currentGeneration.protocolName);
-        }
-        return protocolNameInconsistent;
-    }
-
     /**
      * Close the coordinator, waiting if needed to send LeaveGroup.
      */
@@ -1069,6 +1074,7 @@ public abstract class AbstractCoordinator implements Closeable {
         public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
             sensors.heartbeatSensor.record(response.requestLatencyMs());
             Errors error = heartbeatResponse.error();
+
             if (error == Errors.NONE) {
                 log.debug("Received successful Heartbeat response");
                 future.complete(null);
@@ -1079,9 +1085,16 @@ public abstract class AbstractCoordinator implements Closeable {
                 markCoordinatorUnknown();
                 future.raise(error);
             } else if (error == Errors.REBALANCE_IN_PROGRESS) {
-                log.info("Attempt to heartbeat failed since group is rebalancing");
-                requestRejoin();
-                future.raise(error);
+                // since we may be sending the request during rebalance, we should check
+                // this case and ignore the REBALANCE_IN_PROGRESS error
+                if (state == MemberState.STABLE) {
+                    log.info("Attempt to heartbeat failed since group is rebalancing");
+                    requestRejoin();
+                    future.raise(error);
+                } else {
+                    log.debug("Ignoring heartbeat response with error {} during {} state", error, state);
+                    future.complete(null);
+                }
             } else if (error == Errors.ILLEGAL_GENERATION ||
                        error == Errors.UNKNOWN_MEMBER_ID ||
                        error == Errors.FENCED_INSTANCE_ID) {
@@ -1260,7 +1273,7 @@ public abstract class AbstractCoordinator implements Closeable {
     private class HeartbeatThread extends KafkaThread implements AutoCloseable {
         private boolean enabled = false;
         private boolean closed = false;
-        private AtomicReference<RuntimeException> failed = new AtomicReference<>(null);
+        private final AtomicReference<RuntimeException> failed = new AtomicReference<>(null);
 
         private HeartbeatThread() {
             super(HEARTBEAT_THREAD_PREFIX + (rebalanceConfig.groupId.isEmpty() ? "" : " | " + rebalanceConfig.groupId), true);
@@ -1311,9 +1324,10 @@ public abstract class AbstractCoordinator implements Closeable {
                             continue;
                         }
 
-                        if (state != MemberState.STABLE) {
-                            // the group is not stable (perhaps because we left the group or because the coordinator
-                            // kicked us out), so disable heartbeats and wait for the main thread to rejoin.
+                        // we do not need to heartbeat we are not part of a group yet;
+                        // also if we already have fatal error, the client will be
+                        // crashed soon, hence we do not need to continue heartbeating either
+                        if (state.hasNotJoinedGroup() || hasFailed()) {
                             disable();
                             continue;
                         }
@@ -1366,7 +1380,6 @@ public abstract class AbstractCoordinator implements Closeable {
                                         } else if (e instanceof FencedInstanceIdException) {
                                             log.error("Caught fenced group.instance.id {} error in heartbeat thread", rebalanceConfig.groupInstanceId);
                                             heartbeatThread.failed.set(e);
-                                            heartbeatThread.disable();
                                         } else {
                                             heartbeat.failHeartbeat();
                                             // wake up the thread if it's sleeping to reschedule the heartbeat
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 9a932f9..89c9d5e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -452,11 +452,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         }
     }
 
-    // for testing
-    boolean poll(Timer timer) {
-        return poll(timer, true);
-    }
-
     /**
      * Poll for coordinator events. This ensures that the coordinator is known and that the consumer
      * has joined the group (if it is using group management). This also handles periodic offset commits
@@ -511,6 +506,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
                 // if not wait for join group, we would just use a timer of 0
                 if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) {
+                    // since we may use a different timer in the callee, we'd still need
+                    // to update the original timer's current time after the call
+                    timer.update(time.milliseconds());
+
                     return false;
                 }
             }
@@ -532,7 +531,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     }
 
     /**
-     * Return the time to the next needed invocation of {@link #poll(Timer)}.
+     * Return the time to the next needed invocation of {@link ConsumerNetworkClient#poll(Timer)}.
      * @param now current time in milliseconds
      * @return the maximum time in milliseconds the caller should wait before the next invocation of poll()
      */
@@ -1213,7 +1212,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                             if (generationUnchanged()) {
                                 future.raise(error);
                             } else {
-                                if (ConsumerCoordinator.this.state == MemberState.REBALANCING) {
+                                if (ConsumerCoordinator.this.state == MemberState.PREPARING_REBALANCE) {
                                     future.raise(new RebalanceInProgressException("Offset commit cannot be completed since the " +
                                         "consumer member's old generation is fenced by its group instance id, it is possible that " +
                                         "this consumer has already participated another rebalance and got a new generation"));
@@ -1242,7 +1241,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
                             // only need to reset generation and re-join group if generation has not changed or we are not in rebalancing;
                             // otherwise only raise rebalance-in-progress error
-                            if (!generationUnchanged() && ConsumerCoordinator.this.state == MemberState.REBALANCING) {
+                            if (!generationUnchanged() && ConsumerCoordinator.this.state == MemberState.PREPARING_REBALANCE) {
                                 future.raise(new RebalanceInProgressException("Offset commit cannot be completed since the " +
                                     "consumer member's generation is already stale, meaning it has already participated another rebalance and " +
                                     "got a new generation. You can try completing the rebalance by calling poll() and then retry commit again"));
@@ -1459,4 +1458,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     RebalanceProtocol getProtocol() {
         return protocol;
     }
+
+    boolean poll(Timer timer) {
+        return poll(timer, true);
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
index 2e9a5ad..dfb9f85 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
@@ -17,9 +17,12 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
 
+import org.slf4j.Logger;
+
 /**
  * A helper class for managing the heartbeat to the coordinator
  */
@@ -30,6 +33,7 @@ public final class Heartbeat {
     private final Timer heartbeatTimer;
     private final Timer sessionTimer;
     private final Timer pollTimer;
+    private final Logger log;
 
     private volatile long lastHeartbeatSend = 0L;
     private volatile boolean heartbeatInFlight = false;
@@ -44,6 +48,9 @@ public final class Heartbeat {
         this.sessionTimer = time.timer(config.sessionTimeoutMs);
         this.maxPollIntervalMs = config.rebalanceTimeoutMs;
         this.pollTimer = time.timer(maxPollIntervalMs);
+
+        final LogContext logContext = new LogContext("[Heartbeat groupID=" + config.groupId + "] ");
+        this.log = logContext.logger(getClass());
     }
 
     private void update(long now) {
@@ -66,12 +73,18 @@ public final class Heartbeat {
         heartbeatInFlight = true;
         update(now);
         heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);
+
+        if (log.isTraceEnabled()) {
+            log.trace("Sending heartbeat request with {}ms remaining on timer", heartbeatTimer.remainingMs());
+        }
     }
 
     void failHeartbeat() {
         update(time.milliseconds());
         heartbeatInFlight = false;
         heartbeatTimer.reset(rebalanceConfig.retryBackoffMs);
+
+        log.trace("Heartbeat failed, reset the timer to {}ms remaining", heartbeatTimer.remainingMs());
     }
 
     void receiveHeartbeat() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 6cfc4fd..38c0d84 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -312,6 +312,14 @@ public class MockClient implements KafkaClient {
         return this.requests;
     }
 
+    public Queue<ClientResponse> responses() {
+        return this.responses;
+    }
+
+    public Queue<FutureResponse> futureResponses() {
+        return this.futureResponses;
+    }
+
     public void respond(AbstractResponse response) {
         respond(response, false);
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 9fc54bf..8cfbfd6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -508,13 +508,17 @@ public class KafkaConsumerTest {
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
-        prepareRebalance(client, node, assignor, singletonList(tp0), null);
+        // Since we would enable the heartbeat thread after received join-response which could
+        // send the sync-group on behalf of the consumer if it is enqueued, we may still complete
+        // the rebalance and send out the fetch; in order to avoid it we do not prepare sync response here.
+        client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
+        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+        client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, memberId, leaderId, Errors.NONE), coordinator);
 
         consumer.poll(Duration.ZERO);
 
-        // The underlying client should NOT get a fetch request
         final Queue<ClientRequest> requests = client.requests();
-        Assert.assertEquals(0, requests.size());
+        Assert.assertEquals(0, requests.stream().filter(request -> request.apiKey().equals(ApiKeys.FETCH)).count());
     }
 
     @SuppressWarnings("deprecation")
@@ -1253,9 +1257,6 @@ public class KafkaConsumerTest {
         time.sleep(heartbeatIntervalMs);
         TestUtils.waitForCondition(heartbeatReceived::get, "Heartbeat response did not occur within timeout.");
 
-        consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
-        assertTrue(heartbeatReceived.get());
-
         RuntimeException unsubscribeException = assertThrows(RuntimeException.class, consumer::unsubscribe);
         assertEquals(partitionLost + singleTopicPartition, unsubscribeException.getCause().getMessage());
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 331a6f3..d0a6575 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -450,6 +450,26 @@ public class AbstractCoordinatorTest {
                        && syncGroupRequest.data.protocolName().equals(PROTOCOL_NAME);
         }, syncGroupResponse(Errors.NONE, PROTOCOL_TYPE, wrongProtocolName));
 
+        // let the retry to complete successfully to break out of the while loop
+        mockClient.prepareResponse(body -> {
+            if (!(body instanceof JoinGroupRequest)) {
+                return false;
+            }
+            JoinGroupRequest joinGroupRequest = (JoinGroupRequest) body;
+            return joinGroupRequest.data().protocolType().equals(PROTOCOL_TYPE);
+        }, joinGroupFollowerResponse(1, memberId,
+                "memberid", Errors.NONE, PROTOCOL_TYPE));
+
+        mockClient.prepareResponse(body -> {
+            if (!(body instanceof SyncGroupRequest)) {
+                return false;
+            }
+
+            SyncGroupRequest syncGroupRequest = (SyncGroupRequest) body;
+            return syncGroupRequest.data.protocolType().equals(PROTOCOL_TYPE)
+                    && syncGroupRequest.data.protocolName().equals(PROTOCOL_NAME);
+        }, syncGroupResponse(Errors.NONE, PROTOCOL_TYPE, PROTOCOL_NAME));
+
         // No exception shall be thrown as the generation is reset.
         coordinator.joinGroupIfNeeded(mockTime.timer(100L));
     }
@@ -531,7 +551,7 @@ public class AbstractCoordinatorTest {
 
         final AbstractCoordinator.Generation currGen = coordinator.generation();
 
-        coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
+        coordinator.setNewState(AbstractCoordinator.MemberState.PREPARING_REBALANCE);
         RequestFuture<ByteBuffer> future = coordinator.sendJoinGroupRequest();
 
         TestUtils.waitForCondition(() -> {
@@ -571,7 +591,7 @@ public class AbstractCoordinatorTest {
 
         final AbstractCoordinator.Generation currGen = coordinator.generation();
 
-        coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
+        coordinator.setNewState(AbstractCoordinator.MemberState.PREPARING_REBALANCE);
         RequestFuture<ByteBuffer> future = coordinator.sendJoinGroupRequest();
 
         TestUtils.waitForCondition(() -> {
@@ -605,6 +625,25 @@ public class AbstractCoordinatorTest {
     }
 
     @Test
+    public void testHeartbeatSentWhenCompletingRebalance() throws Exception {
+        setupCoordinator();
+        joinGroup();
+
+        final AbstractCoordinator.Generation currGen = coordinator.generation();
+
+        coordinator.setNewState(AbstractCoordinator.MemberState.COMPLETING_REBALANCE);
+
+        // the heartbeat should be sent out during a rebalance
+        mockTime.sleep(HEARTBEAT_INTERVAL_MS);
+        TestUtils.waitForCondition(() -> !mockClient.requests().isEmpty(), 2000,
+                "The heartbeat request was not sent");
+        assertTrue(coordinator.heartbeat().hasInflight());
+
+        mockClient.respond(heartbeatResponse(Errors.REBALANCE_IN_PROGRESS));
+        assertEquals(currGen, coordinator.generation());
+    }
+
+    @Test
     public void testHeartbeatIllegalGenerationResponseWithOldGeneration() throws InterruptedException {
         setupCoordinator();
         joinGroup();
@@ -673,7 +712,7 @@ public class AbstractCoordinatorTest {
     }
 
     @Test
-    public void testHeartbeatUnknownMemberResponseDuringRebalancing() throws InterruptedException {
+    public void testHeartbeatRebalanceInProgressResponseDuringRebalancing() throws InterruptedException {
         setupCoordinator();
         joinGroup();
 
@@ -687,8 +726,7 @@ public class AbstractCoordinatorTest {
 
         assertTrue(coordinator.heartbeat().hasInflight());
 
-        // set the client to re-join group
-        mockClient.respond(heartbeatResponse(Errors.UNKNOWN_MEMBER_ID));
+        mockClient.respond(heartbeatResponse(Errors.REBALANCE_IN_PROGRESS));
 
         coordinator.requestRejoin();
 
@@ -699,8 +737,8 @@ public class AbstractCoordinatorTest {
             2000,
             "The heartbeat response was not received");
 
-        // the generation should be reset but the rebalance should still proceed
-        assertEquals(AbstractCoordinator.Generation.NO_GENERATION, coordinator.generation());
+        // the generation would not be reset while the rebalance is in progress
+        assertEquals(currGen, coordinator.generation());
 
         mockClient.respond(joinGroupFollowerResponse(currGen.generationId, memberId, JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.NONE));
         mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
@@ -1099,44 +1137,6 @@ public class AbstractCoordinatorTest {
     }
 
     @Test
-    public void testWakeupAfterSyncGroupSent() throws Exception {
-        setupCoordinator();
-
-        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));
-        mockClient.prepareResponse(new MockClient.RequestMatcher() {
-            private int invocations = 0;
-            @Override
-            public boolean matches(AbstractRequest body) {
-                invocations++;
-                boolean isSyncGroupRequest = body instanceof SyncGroupRequest;
-                if (isSyncGroupRequest && invocations == 1)
-                    // simulate wakeup after the request sent
-                    throw new WakeupException();
-                return isSyncGroupRequest;
-            }
-        }, syncGroupResponse(Errors.NONE));
-        AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
-
-        try {
-            coordinator.ensureActiveGroup();
-            fail("Should have woken up from ensureActiveGroup()");
-        } catch (WakeupException ignored) {
-        }
-
-        assertEquals(1, coordinator.onJoinPrepareInvokes);
-        assertEquals(0, coordinator.onJoinCompleteInvokes);
-        assertFalse(heartbeatReceived.get());
-
-        coordinator.ensureActiveGroup();
-
-        assertEquals(1, coordinator.onJoinPrepareInvokes);
-        assertEquals(1, coordinator.onJoinCompleteInvokes);
-
-        awaitFirstHeartbeat(heartbeatReceived);
-    }
-
-    @Test
     public void testWakeupAfterSyncGroupSentExternalCompletion() throws Exception {
         setupCoordinator();
 
@@ -1149,8 +1149,8 @@ public class AbstractCoordinatorTest {
                 invocations++;
                 boolean isSyncGroupRequest = body instanceof SyncGroupRequest;
                 if (isSyncGroupRequest && invocations == 1)
-                    // simulate wakeup after the request sent
-                    throw new WakeupException();
+                    // wakeup after the request returns
+                    consumerClient.wakeup();
                 return isSyncGroupRequest;
             }
         }, syncGroupResponse(Errors.NONE));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 7b2495c..ae2ab8a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -2119,7 +2119,7 @@ public class ConsumerCoordinatorTest {
             "memberId-new",
             null);
         coordinator.setNewGeneration(newGen);
-        coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
+        coordinator.setNewState(AbstractCoordinator.MemberState.PREPARING_REBALANCE);
 
         assertTrue(consumerClient.poll(future, time.timer(30000)));
         assertTrue(future.exception().getClass().isInstance(Errors.REBALANCE_IN_PROGRESS.exception()));
@@ -2174,7 +2174,7 @@ public class ConsumerCoordinatorTest {
             "memberId-new",
             null);
         coordinator.setNewGeneration(newGen);
-        coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
+        coordinator.setNewState(AbstractCoordinator.MemberState.PREPARING_REBALANCE);
 
         assertTrue(consumerClient.poll(future, time.timer(30000)));
         assertTrue(future.exception().getClass().isInstance(Errors.REBALANCE_IN_PROGRESS.exception()));
@@ -2218,7 +2218,7 @@ public class ConsumerCoordinatorTest {
             "memberId",
             null);
         coordinator.setNewGeneration(currGen);
-        coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
+        coordinator.setNewState(AbstractCoordinator.MemberState.PREPARING_REBALANCE);
 
         prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.FENCED_INSTANCE_ID);
         RequestFuture<Void> future = coordinator.sendOffsetCommitRequest(singletonMap(t1p,
@@ -2818,14 +2818,18 @@ public class ConsumerCoordinatorTest {
             res = coordinator.joinGroupIfNeeded(time.timer(1));
 
             assertFalse(res);
+
+            // should have retried sending a join group request already
             assertFalse(client.hasPendingResponses());
-            assertFalse(client.hasInFlightRequests());
+            assertEquals(1, client.inFlightRequestCount());
+
+            System.out.println(client.requests());
 
             // Retry join should then succeed
-            client.prepareResponse(joinGroupFollowerResponse(generationId, memberId, "leader", Errors.NONE));
+            client.respond(joinGroupFollowerResponse(generationId, memberId, "leader", Errors.NONE));
             client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
 
-            res = coordinator.joinGroupIfNeeded(time.timer(2));
+            res = coordinator.joinGroupIfNeeded(time.timer(3000));
 
             assertTrue(res);
             assertFalse(client.hasPendingResponses());
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 00dd09b..aa957cd 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -287,7 +287,7 @@ class GroupCoordinator(val brokerId: Int,
 
           group.currentState match {
             case PreparingRebalance =>
-              updateMemberAndRebalance(group, member, protocols, responseCallback)
+              updateMemberAndRebalance(group, member, protocols, s"Member ${member.memberId} joining group during ${group.currentState}", responseCallback)
 
             case CompletingRebalance =>
               if (member.matches(protocols)) {
@@ -308,16 +308,18 @@ class GroupCoordinator(val brokerId: Int,
                   error = Errors.NONE))
               } else {
                 // member has changed metadata, so force a rebalance
-                updateMemberAndRebalance(group, member, protocols, responseCallback)
+                updateMemberAndRebalance(group, member, protocols, s"Updating metadata for member ${member.memberId} during ${group.currentState}", responseCallback)
               }
 
             case Stable =>
               val member = group.get(memberId)
-              if (group.isLeader(memberId) || !member.matches(protocols)) {
-                // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
-                // The latter allows the leader to trigger rebalances for changes affecting assignment
+              if (group.isLeader(memberId)) {
+                // force a rebalance if the leader sends JoinGroup;
+                // This allows the leader to trigger rebalances for changes affecting assignment
                 // which do not affect the member metadata (such as topic metadata changes for the consumer)
-                updateMemberAndRebalance(group, member, protocols, responseCallback)
+                updateMemberAndRebalance(group, member, protocols, s"leader ${member.memberId} re-joining group during ${group.currentState}", responseCallback)
+              } else if (!member.matches(protocols)) {
+                updateMemberAndRebalance(group, member, protocols, s"Updating metadata for member ${member.memberId} during ${group.currentState}", responseCallback)
               } else {
                 // for followers with no actual change to their metadata, just return group information
                 // for the current generation which will allow them to issue SyncGroup
@@ -639,7 +641,11 @@ class GroupCoordinator(val brokerId: Int,
               responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
             case CompletingRebalance =>
-                responseCallback(Errors.REBALANCE_IN_PROGRESS)
+              // consumers may start sending heartbeat after join-group response, in which case
+              // we should treat them as normal hb request and reset the timer
+              val member = group.get(memberId)
+              completeAndScheduleNextHeartbeatExpiration(group, member)
+              responseCallback(Errors.NONE)
 
             case PreparingRebalance =>
                 val member = group.get(memberId)
@@ -1071,9 +1077,10 @@ class GroupCoordinator(val brokerId: Int,
   private def updateMemberAndRebalance(group: GroupMetadata,
                                        member: MemberMetadata,
                                        protocols: List[(String, Array[Byte])],
+                                       reason: String,
                                        callback: JoinCallback): Unit = {
     group.updateMember(member, protocols, callback)
-    maybePrepareRebalance(group, s"Updating metadata for member ${member.memberId}")
+    maybePrepareRebalance(group, reason)
   }
 
   private def maybePrepareRebalance(group: GroupMetadata, reason: String): Unit = {
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 9791cd6..875a9a1 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -1719,7 +1719,7 @@ class GroupCoordinatorTest {
 
     EasyMock.reset(replicaManager)
     val heartbeatResult = heartbeat(groupId, assignedMemberId, 1)
-    assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult)
+    assertEquals(Errors.NONE, heartbeatResult)
   }
 
   @Test