You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/04/08 03:48:01 UTC

[kafka] branch trunk updated: KAFKA-9801: Still trigger rebalance when static member joins in CompletingRebalance phase (#8405)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 94ef25a  KAFKA-9801: Still trigger rebalance when static member joins in CompletingRebalance phase (#8405)
94ef25a is described below

commit 94ef25ab9128030de8692b6e690787ec8012830a
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Apr 7 15:29:15 2020 -0700

    KAFKA-9801: Still trigger rebalance when static member joins in CompletingRebalance phase (#8405)
    
    Fix the direct cause of the observed issue on the client side: when heartbeat getting errors and resetting generation, we only need to set it to UNJOINED when it was not already in REBALANCING; otherwise, the join-group handler would throw the retriable UnjoinedGroupException to force the consumer to re-send join group unnecessarily.
    
    Fix the root cause of the issue on the broker side: we should still trigger rebalance when static member joins in CompletingRebalance phase; otherwise the member.ids would be changed when the assignment is received from the leader, hence causing the new member.id's assignment to be empty.
    
    Reviewers: Boyang Chen <bo...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
 .../consumer/internals/AbstractCoordinator.java    |  39 +++++---
 .../consumer/internals/ConsumerCoordinator.java    |   7 ++
 .../clients/consumer/internals/Heartbeat.java      |  35 +++++---
 .../internals/AbstractCoordinatorTest.java         |  46 ++++++++++
 .../kafka/coordinator/group/GroupCoordinator.scala | 100 ++++++++++++---------
 .../kafka/coordinator/group/GroupMetadata.scala    |   8 +-
 .../kafka/api/PlaintextConsumerTest.scala          |   7 +-
 .../coordinator/group/GroupCoordinatorTest.scala   |  26 +++---
 8 files changed, 183 insertions(+), 85 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index af67ab5..66da319 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -451,8 +451,9 @@ public abstract class AbstractCoordinator implements Closeable {
                     return false;
                 }
             } else {
-                resetJoinGroupFuture();
                 final RuntimeException exception = future.exception();
+                log.info("Join group failed with {}", exception.toString());
+                resetJoinGroupFuture();
                 if (exception instanceof UnknownMemberIdException ||
                     exception instanceof RebalanceInProgressException ||
                     exception instanceof IllegalGenerationException ||
@@ -889,17 +890,26 @@ public abstract class AbstractCoordinator implements Closeable {
     }
 
     private synchronized void resetGeneration() {
+        this.rejoinNeeded = true;
         this.generation = Generation.NO_GENERATION;
-        resetStateAndRejoin();
     }
 
     synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) {
-        log.debug("Resetting generation after encountering {} from {} response", error, api);
+        log.debug("Resetting generation after encountering {} from {} response and requesting re-join", error, api);
+
+        // only reset the state to un-joined when it is not already in rebalancing
+        if (state != MemberState.REBALANCING)
+            state = MemberState.UNJOINED;
+
         resetGeneration();
     }
 
     synchronized void resetGenerationOnLeaveGroup() {
         log.debug("Resetting generation due to consumer pro-actively leaving the group");
+
+        // always set the state to un-joined
+        state = MemberState.UNJOINED;
+
         resetGeneration();
     }
 
@@ -1014,7 +1024,8 @@ public abstract class AbstractCoordinator implements Closeable {
 
     // visible for testing
     synchronized RequestFuture<Void> sendHeartbeatRequest() {
-        log.debug("Sending Heartbeat request to coordinator {}", coordinator);
+        log.debug("Sending Heartbeat request with generation {} and member id {} to coordinator {}",
+            generation.generationId, generation.memberId, coordinator);
         HeartbeatRequest.Builder requestBuilder =
                 new HeartbeatRequest.Builder(new HeartbeatRequestData()
                         .setGroupId(rebalanceConfig.groupId)
@@ -1022,10 +1033,16 @@ public abstract class AbstractCoordinator implements Closeable {
                         .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                         .setGenerationId(this.generation.generationId));
         return client.send(coordinator, requestBuilder)
-                .compose(new HeartbeatResponseHandler());
+                .compose(new HeartbeatResponseHandler(generation));
     }
 
     private class HeartbeatResponseHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
+        private final Generation sentGeneration;
+
+        private HeartbeatResponseHandler(final Generation generation) {
+            this.sentGeneration = generation;
+        }
+
         @Override
         public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
             sensors.heartbeatSensor.record(response.requestLatencyMs());
@@ -1035,7 +1052,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 future.complete(null);
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                     || error == Errors.NOT_COORDINATOR) {
-                log.info("Attempt to heartbeat failed since coordinator {} is either not started or not valid.",
+                log.info("Attempt to heartbeat failed since coordinator {} is either not started or not valid",
                         coordinator());
                 markCoordinatorUnknown();
                 future.raise(error);
@@ -1044,14 +1061,14 @@ public abstract class AbstractCoordinator implements Closeable {
                 requestRejoin();
                 future.raise(error);
             } else if (error == Errors.ILLEGAL_GENERATION) {
-                log.info("Attempt to heartbeat failed since generation {} is not current", generation.generationId);
+                log.info("Attempt to heartbeat failed since generation {} is not current", sentGeneration.generationId);
                 resetGenerationOnResponseError(ApiKeys.HEARTBEAT, error);
                 future.raise(error);
             } else if (error == Errors.FENCED_INSTANCE_ID) {
                 log.error("Received fatal exception: group.instance.id gets fenced");
                 future.raise(error);
             } else if (error == Errors.UNKNOWN_MEMBER_ID) {
-                log.info("Attempt to heartbeat failed since member id {} is not valid.", generation.memberId);
+                log.info("Attempt to heartbeat failed since member id {} is not valid.", sentGeneration.memberId);
                 resetGenerationOnResponseError(ApiKeys.HEARTBEAT, error);
                 future.raise(error);
             } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
@@ -1293,8 +1310,8 @@ public abstract class AbstractCoordinator implements Closeable {
                             AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
                         } else {
                             heartbeat.sentHeartbeat(now);
-
-                            sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {
+                            final RequestFuture<Void> heartbeatFuture = sendHeartbeatRequest();
+                            heartbeatFuture.addListener(new RequestFutureListener<Void>() {
                                 @Override
                                 public void onSuccess(Void value) {
                                     synchronized (AbstractCoordinator.this) {
@@ -1432,6 +1449,4 @@ public abstract class AbstractCoordinator implements Closeable {
     final boolean hasValidMemberId() {
         return generation != Generation.NO_GENERATION && generation.hasMemberId();
     }
-
-
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index ab49837..b0f476a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -349,6 +349,13 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
 
+        // should at least encode the short version
+        if (assignmentBuffer.remaining() < 2)
+            throw new IllegalStateException("There is insufficient bytes available to read assignment from the sync-group response (" +
+                "actual byte size " + assignmentBuffer.remaining() + ") , this is not expected; " +
+                "it is possible that the leader's assign function is buggy and did not return any assignment for this member, " +
+                "or because static member is configured and the protocol is buggy hence did not get the assignment for this member");
+
         Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
 
         Set<TopicPartition> assignedPartitions = new HashSet<>(assignment.partitions());
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
index 4d19ef4..2e9a5ad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
@@ -32,6 +32,7 @@ public final class Heartbeat {
     private final Timer pollTimer;
 
     private volatile long lastHeartbeatSend = 0L;
+    private volatile boolean heartbeatInFlight = false;
 
     public Heartbeat(GroupRebalanceConfig config,
                      Time time) {
@@ -56,60 +57,66 @@ public final class Heartbeat {
         pollTimer.reset(maxPollIntervalMs);
     }
 
-    public void sentHeartbeat(long now) {
-        this.lastHeartbeatSend = now;
+    boolean hasInflight() {
+        return heartbeatInFlight;
+    }
+
+    void sentHeartbeat(long now) {
+        lastHeartbeatSend = now;
+        heartbeatInFlight = true;
         update(now);
         heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);
     }
 
-    public void failHeartbeat() {
+    void failHeartbeat() {
         update(time.milliseconds());
+        heartbeatInFlight = false;
         heartbeatTimer.reset(rebalanceConfig.retryBackoffMs);
     }
 
-    public void receiveHeartbeat() {
+    void receiveHeartbeat() {
         update(time.milliseconds());
+        heartbeatInFlight = false;
         sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);
     }
 
-    public boolean shouldHeartbeat(long now) {
+    boolean shouldHeartbeat(long now) {
         update(now);
         return heartbeatTimer.isExpired();
     }
     
-    public long lastHeartbeatSend() {
+    long lastHeartbeatSend() {
         return this.lastHeartbeatSend;
     }
 
-    public long timeToNextHeartbeat(long now) {
+    long timeToNextHeartbeat(long now) {
         update(now);
         return heartbeatTimer.remainingMs();
     }
 
-    public boolean sessionTimeoutExpired(long now) {
+    boolean sessionTimeoutExpired(long now) {
         update(now);
         return sessionTimer.isExpired();
     }
 
-    public void resetTimeouts() {
+    void resetTimeouts() {
         update(time.milliseconds());
         sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);
         pollTimer.reset(maxPollIntervalMs);
         heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);
     }
 
-    public void resetSessionTimeout() {
+    void resetSessionTimeout() {
         update(time.milliseconds());
         sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);
     }
 
-    public boolean pollTimeoutExpired(long now) {
+    boolean pollTimeoutExpired(long now) {
         update(now);
         return pollTimer.isExpired();
     }
 
-    public long lastPollTime() {
+    long lastPollTime() {
         return pollTimer.currentTimeMs();
     }
-
-}
\ No newline at end of file
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index e2315cd..eb88e3a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -466,6 +466,52 @@ public class AbstractCoordinatorTest {
     }
 
     @Test
+    public void testHeartbeatUnknownMemberResponseDuringRebalancing() throws InterruptedException {
+        setupCoordinator();
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+
+        final int generation = 1;
+
+        mockClient.prepareResponse(joinGroupFollowerResponse(generation, memberId, JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.NONE));
+        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
+
+        coordinator.ensureActiveGroup();
+
+        final AbstractCoordinator.Generation currGen = coordinator.generation();
+
+        // let the heartbeat request to send out a request
+        mockTime.sleep(HEARTBEAT_INTERVAL_MS);
+
+        TestUtils.waitForCondition(() -> coordinator.heartbeat().hasInflight(), 2000,
+            "The heartbeat request was not sent in time after 2000ms elapsed");
+
+        assertTrue(coordinator.heartbeat().hasInflight());
+
+        // set the client to re-join group
+        mockClient.respond(heartbeatResponse(Errors.UNKNOWN_MEMBER_ID));
+
+        coordinator.requestRejoin();
+
+        TestUtils.waitForCondition(() -> {
+            coordinator.ensureActiveGroup(new MockTime(1L).timer(100L));
+            return !coordinator.heartbeat().hasInflight();
+        },
+            2000,
+            "The heartbeat response was not been received in time after 2000ms elapsed");
+
+        assertFalse(coordinator.heartbeat().hasInflight());
+
+        // the generation should be reset but the rebalance should still proceed
+        assertEquals(AbstractCoordinator.Generation.NO_GENERATION, coordinator.generation());
+
+        mockClient.respond(joinGroupFollowerResponse(generation, memberId, JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.NONE));
+        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
+
+        coordinator.ensureActiveGroup();
+        assertEquals(currGen, coordinator.generation());
+    }
+
+    @Test
     public void testHeartbeatRequestWithFencedInstanceIdException() throws InterruptedException {
         setupCoordinator();
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index e4525de..772d51d 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -202,41 +202,7 @@ class GroupCoordinator(val brokerId: Int,
         val newMemberId = group.generateMemberId(clientId, groupInstanceId)
 
         if (group.hasStaticMember(groupInstanceId)) {
-          val oldMemberId = group.getStaticMemberId(groupInstanceId)
-          info(s"Static member $groupInstanceId of group ${group.groupId} with unknown member id rejoins, assigning new member id $newMemberId, while " +
-            s"old member id $oldMemberId will be removed.")
-
-          val currentLeader = group.leaderOrNull
-          val member = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId)
-          // Heartbeat of old member id will expire without effect since the group no longer contains that member id.
-          // New heartbeat shall be scheduled with new member id.
-          completeAndScheduleNextHeartbeatExpiration(group, member)
-
-          val knownStaticMember = group.get(newMemberId)
-          group.updateMember(knownStaticMember, protocols, responseCallback)
-
-          group.currentState match {
-            case Stable | CompletingRebalance =>
-              info(s"Static member joins during ${group.currentState} stage will not trigger rebalance.")
-              group.maybeInvokeJoinCallback(member, JoinGroupResult(
-                members = List.empty,
-                memberId = newMemberId,
-                generationId = group.generationId,
-                protocolType = group.protocolType,
-                protocolName = group.protocolName,
-                // We want to avoid current leader performing trivial assignment while the group
-                // is in stable/awaiting sync stage, because the new assignment in leader's next sync call
-                // won't be broadcast by a stable/awaiting sync group. This could be guaranteed by
-                // always returning the old leader id so that the current leader won't assume itself
-                // as a leader based on the returned message, since the new member.id won't match
-                // returned leader id, therefore no assignment will be performed.
-                leaderId = currentLeader,
-                error = Errors.NONE))
-            case Empty | Dead =>
-              throw new IllegalStateException(s"Group ${group.groupId} was not supposed to be " +
-                s"in the state ${group.currentState} when the unknown static member $groupInstanceId rejoins.")
-            case PreparingRebalance =>
-          }
+          updateStaticMemberAndRebalance(group, newMemberId, groupInstanceId, protocols, responseCallback)
         } else if (requireKnownMemberId) {
             // If member id required (dynamic membership), register the member in the pending member list
             // and send back a response to call for another join group request with allocated member id.
@@ -246,7 +212,7 @@ class GroupCoordinator(val brokerId: Int,
           addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
           responseCallback(JoinGroupResult(newMemberId, Errors.MEMBER_ID_REQUIRED))
         } else {
-          debug(s"${if (groupInstanceId.isDefined) "Static" else "Dynamic"} Member with unknown member id joins group ${group.groupId} in " +
+          info(s"${if (groupInstanceId.isDefined) "Static" else "Dynamic"} Member with unknown member id joins group ${group.groupId} in " +
             s"${group.currentState} state. Created a new member id $newMemberId for this member and add to the group.")
           addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, groupInstanceId,
             clientId, clientHost, protocolType, protocols, group, responseCallback)
@@ -287,7 +253,7 @@ class GroupCoordinator(val brokerId: Int,
         }
       } else {
         val groupInstanceIdNotFound = groupInstanceId.isDefined && !group.hasStaticMember(groupInstanceId)
-        if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
+        if (group.isStaticMemberFenced(memberId, groupInstanceId, "join-group")) {
           // given member id doesn't match with the groupInstanceId. Inform duplicate instance to shut down immediately.
           responseCallback(JoinGroupResult(memberId, Errors.FENCED_INSTANCE_ID))
         } else if (!group.has(memberId) || groupInstanceIdNotFound) {
@@ -397,7 +363,7 @@ class GroupCoordinator(val brokerId: Int,
         // coordinator OR the group is in a transient unstable phase. Let the member retry
         // finding the correct coordinator and rejoin.
         responseCallback(SyncGroupResult(Errors.COORDINATOR_NOT_AVAILABLE))
-      } else if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
+      } else if (group.isStaticMemberFenced(memberId, groupInstanceId, "sync-group")) {
         responseCallback(SyncGroupResult(Errors.FENCED_INSTANCE_ID))
       } else if (!group.has(memberId)) {
         responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
@@ -483,7 +449,7 @@ class GroupCoordinator(val brokerId: Int,
                   val memberId = leavingMember.memberId
                   val groupInstanceId = Option(leavingMember.groupInstanceId)
                   if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID
-                    && group.isStaticMemberFenced(memberId, groupInstanceId)) {
+                    && group.isStaticMemberFenced(memberId, groupInstanceId, "leave-group")) {
                     memberLeaveError(leavingMember, Errors.FENCED_INSTANCE_ID)
                   } else if (group.isPendingMember(memberId)) {
                     if (groupInstanceId.isDefined) {
@@ -640,7 +606,7 @@ class GroupCoordinator(val brokerId: Int,
           // coordinator OR the group is in a transient unstable phase. Let the member retry
           // finding the correct coordinator and rejoin.
           responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
-        } else if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
+        } else if (group.isStaticMemberFenced(memberId, groupInstanceId, "heartbeat")) {
           responseCallback(Errors.FENCED_INSTANCE_ID)
         } else if (!group.has(memberId)) {
           responseCallback(Errors.UNKNOWN_MEMBER_ID)
@@ -739,7 +705,7 @@ class GroupCoordinator(val brokerId: Int,
         // coordinator OR the group is in a transient unstable phase. Let the member retry
         // finding the correct coordinator and rejoin.
         responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.COORDINATOR_NOT_AVAILABLE })
-      } else if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
+      } else if (group.isStaticMemberFenced(memberId, groupInstanceId, "txn-commit-offsets")) {
         responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.FENCED_INSTANCE_ID })
       } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
         // Enforce member id when it is set.
@@ -766,7 +732,7 @@ class GroupCoordinator(val brokerId: Int,
         // coordinator OR the group is in a transient unstable phase. Let the member retry
         // finding the correct coordinator and rejoin.
         responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.COORDINATOR_NOT_AVAILABLE })
-      } else if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
+      } else if (group.isStaticMemberFenced(memberId, groupInstanceId, "commit-offsets")) {
         responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.FENCED_INSTANCE_ID })
       } else if (generationId < 0 && group.is(Empty)) {
         // The group is only using Kafka to store offsets.
@@ -1025,7 +991,55 @@ class GroupCoordinator(val brokerId: Int,
     } else {
       group.removePendingMember(memberId)
     }
-    maybePrepareRebalance(group, s"Adding new member $memberId with group instanceid $groupInstanceId")
+    maybePrepareRebalance(group, s"Adding new member $memberId with group instance id $groupInstanceId")
+  }
+
+  private def updateStaticMemberAndRebalance(group: GroupMetadata,
+                                             newMemberId: String,
+                                             groupInstanceId: Option[String],
+                                             protocols: List[(String, Array[Byte])],
+                                             responseCallback: JoinCallback): Unit = {
+    val oldMemberId = group.getStaticMemberId(groupInstanceId)
+    info(s"Static member $groupInstanceId of group ${group.groupId} with unknown member id rejoins, assigning new member id $newMemberId, while " +
+      s"old member id $oldMemberId will be removed.")
+
+    val currentLeader = group.leaderOrNull
+    val member = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId)
+    // Heartbeat of old member id will expire without effect since the group no longer contains that member id.
+    // New heartbeat shall be scheduled with new member id.
+    completeAndScheduleNextHeartbeatExpiration(group, member)
+
+    val knownStaticMember = group.get(newMemberId)
+    group.updateMember(knownStaticMember, protocols, responseCallback)
+
+    group.currentState match {
+      case Stable =>
+        info(s"Static member joins during Stable stage will not trigger rebalance.")
+        group.maybeInvokeJoinCallback(member, JoinGroupResult(
+          members = List.empty,
+          memberId = newMemberId,
+          generationId = group.generationId,
+          protocolType = group.protocolType,
+          protocolName = group.protocolName,
+          // We want to avoid current leader performing trivial assignment while the group
+          // is in stable stage, because the new assignment in leader's next sync call
+          // won't be broadcast by a stable group. This could be guaranteed by
+          // always returning the old leader id so that the current leader won't assume itself
+          // as a leader based on the returned message, since the new member.id won't match
+          // returned leader id, therefore no assignment will be performed.
+          leaderId = currentLeader,
+          error = Errors.NONE))
+      case CompletingRebalance =>
+        // if the group is in after-sync stage, upon getting a new join-group of a known static member
+        // we should still trigger a new rebalance, since the old member may already be sent to the leader
+        // for assignment, and hence when the assignment gets back there would be a mismatch of the old member id
+        // with the new replaced member id. As a result the new member id would not get any assignment.
+        prepareRebalance(group, s"Updating metadata for static member ${member.memberId} with instance id $groupInstanceId")
+      case Empty | Dead =>
+        throw new IllegalStateException(s"Group ${group.groupId} was not supposed to be " +
+          s"in the state ${group.currentState} when the unknown static member $groupInstanceId rejoins.")
+      case PreparingRebalance =>
+    }
   }
 
   private def updateMemberAndRebalance(group: GroupMetadata,
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 44b33d3..049dbef 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -385,11 +385,13 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     *   2. group stored member.id doesn't match with given member.id
     */
   def isStaticMemberFenced(memberId: String,
-                           groupInstanceId: Option[String]): Boolean = {
+                           groupInstanceId: Option[String],
+                           operation: String): Boolean = {
     if (hasStaticMember(groupInstanceId)
       && getStaticMemberId(groupInstanceId) != memberId) {
-        error(s"given member.id $memberId is identified as a known static member ${groupInstanceId.get}," +
-          s"but not matching the expected member.id ${getStaticMemberId(groupInstanceId)}")
+        error(s"given member.id $memberId is identified as a known static member ${groupInstanceId.get}, " +
+          s"but not matching the expected member.id ${getStaticMemberId(groupInstanceId)} during $operation, will " +
+          s"respond with instance fenced error")
         true
     } else
       false
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 0b3390e..5c7a52a 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -173,7 +173,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
   @Test
   def testMaxPollIntervalMs(): Unit = {
-    this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 3000.toString)
+    this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000.toString)
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 2000.toString)
 
@@ -187,9 +187,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(1, listener.callsToAssigned)
     assertEquals(0, listener.callsToRevoked)
 
-    Thread.sleep(3500)
+    // after we extend longer than max.poll a rebalance should be triggered
+    // NOTE we need to have a relatively much larger value than max.poll to let heartbeat expired for sure
+    Thread.sleep(3000)
 
-    // we should fall out of the group and need to rebalance
     awaitRebalance(consumer, listener)
     assertEquals(2, listener.callsToAssigned)
     assertEquals(1, listener.callsToRevoked)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 55f0d1e..ed5ea0d 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -616,10 +616,10 @@ class GroupCoordinatorTest {
       groupId,
       CompletingRebalance,
       Some(protocolType))
-    assertEquals(leaderJoinGroupResult.leaderId, leaderJoinGroupResult.memberId)
+    assertEquals(rebalanceResult.leaderId, leaderJoinGroupResult.memberId)
     assertEquals(rebalanceResult.leaderId, leaderJoinGroupResult.leaderId)
 
-    // Old member shall be getting a successful join group response.
+    // Old follower shall be getting a successful join group response.
     val oldFollowerJoinGroupResult = Await.result(oldFollowerJoinGroupFuture, Duration(1, TimeUnit.MILLISECONDS))
     checkJoinGroupResult(oldFollowerJoinGroupResult,
       Errors.NONE,
@@ -629,31 +629,37 @@ class GroupCoordinatorTest {
       CompletingRebalance,
       Some(protocolType),
       expectedLeaderId = leaderJoinGroupResult.memberId)
+    assertEquals(rebalanceResult.followerId, oldFollowerJoinGroupResult.memberId)
+    assertEquals(rebalanceResult.leaderId, oldFollowerJoinGroupResult.leaderId)
+    assertTrue(getGroup(groupId).is(CompletingRebalance))
 
+    // Duplicate follower joins group with unknown member id will trigger member.id replacement,
+    // and will also trigger a rebalance under CompletingRebalance state; the old follower sync callback
+    // will return fenced exception while broker replaces the member identity with the duplicate follower joins.
     EasyMock.reset(replicaManager)
     val oldFollowerSyncGroupFuture = sendSyncGroupFollower(groupId, oldFollowerJoinGroupResult.generationId,
       oldFollowerJoinGroupResult.memberId, Some(protocolType), Some(protocolName), followerInstanceId)
-
-    // Duplicate follower joins group with unknown member id will trigger member.id replacement.
     EasyMock.reset(replicaManager)
     val duplicateFollowerJoinFuture =
       sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, groupInstanceId = followerInstanceId)
     timer.advanceClock(1)
-
-    // Old follower sync callback will return fenced exception while broker replaces the member identity.
     val oldFollowerSyncGroupResult = Await.result(oldFollowerSyncGroupFuture, Duration(1, TimeUnit.MILLISECONDS))
     assertEquals(Errors.FENCED_INSTANCE_ID, oldFollowerSyncGroupResult.error)
+    assertTrue(getGroup(groupId).is(PreparingRebalance))
+
+    timer.advanceClock(GroupInitialRebalanceDelay + 1)
+    timer.advanceClock(DefaultRebalanceTimeout + 1)
 
-    // Duplicate follower will get the same response as old follower.
     val duplicateFollowerJoinGroupResult = Await.result(duplicateFollowerJoinFuture, Duration(1, TimeUnit.MILLISECONDS))
     checkJoinGroupResult(duplicateFollowerJoinGroupResult,
       Errors.NONE,
-      rebalanceResult.generation + 1,
-      Set.empty,
+      rebalanceResult.generation + 2,
+      Set(followerInstanceId),   // this follower will become the new leader, and hence it would have the member list
       groupId,
       CompletingRebalance,
       Some(protocolType),
-      expectedLeaderId = leaderJoinGroupResult.memberId)
+      expectedLeaderId = duplicateFollowerJoinGroupResult.memberId)
+    assertTrue(getGroup(groupId).is(CompletingRebalance))
   }
 
   @Test