You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/10/04 16:21:34 UTC

[kafka] branch 1.1 updated: KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 391d065  KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance
391d065 is described below

commit 391d065a43ab3518014c3fcc661f2b57953435db
Author: Lincong Li <lc...@linkedin.com>
AuthorDate: Thu Oct 4 09:14:44 2018 -0700

    KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance
    
    During the consumer group rebalance, when the joining group phase finishes, the heartbeat delayed operation of the consumer that fails to rejoin the group should be removed from the purgatory. Otherwise, even though the member ID of the consumer has been removed from the group, its heartbeat delayed operation is still registered in the purgatory and the heartbeat delayed operation is going to timeout and then another unnecessary rebalance is triggered because of it.
    
    Author: Lincong Li <lc...@linkedin.com>
    
    Reviewers: Dong Lin <li...@gmail.com>
    
    Closes #5556 from Lincong/remove_heartbeat_delayedOperation
    
    (cherry picked from commit 260b07a6da070e6312443fb7cc6b937bef2865ea)
    Signed-off-by: Dong Lin <li...@gmail.com>
---
 .../kafka/coordinator/group/GroupCoordinator.scala |  1 +
 .../coordinator/group/GroupCoordinatorTest.scala   | 22 ++++++++++++++++++++--
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 6cc0a41..5ffe6ba 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -774,6 +774,7 @@ class GroupCoordinator(val brokerId: Int,
     group.inLock {
       // remove any members who haven't joined the group yet
       group.notYetRejoinedMembers.foreach { failedMember =>
+        removeHeartbeatForLeavingMember(group, failedMember)
         group.remove(failedMember.memberId)
         // TODO: cut the socket connection to the client
       }
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 8529bf9..515dbe1 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -471,11 +471,29 @@ class GroupCoordinatorTest extends JUnitSuite {
     heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
     assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult)
 
-    // now timeout the rebalance, which should kick the unjoined member out of the group
-    // and let the rebalance finish with only the new member
+    // now timeout the rebalance
     timer.advanceClock(500)
     val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+    val otherMemberId = otherJoinResult.memberId
+    val otherGenerationId = otherJoinResult.generationId
+    EasyMock.reset(replicaManager)
+    val syncResult = syncGroupLeader(groupId, otherGenerationId, otherMemberId, Map(otherMemberId -> Array[Byte]()))
+    assertEquals(Errors.NONE, syncResult._2)
+
+    // the unjoined member should be kicked out from the group
     assertEquals(Errors.NONE, otherJoinResult.error)
+    EasyMock.reset(replicaManager)
+    heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult)
+
+    // the joined member should get heart beat response with no error. Let the new member keep heartbeating for a while
+    // to verify that no new rebalance is triggered unexpectedly
+    for ( _ <-  1 to 20) {
+      timer.advanceClock(500)
+      EasyMock.reset(replicaManager)
+      heartbeatResult = heartbeat(groupId, otherMemberId, otherGenerationId)
+      assertEquals(Errors.NONE, heartbeatResult)
+    }
   }
 
   @Test