You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/12/24 00:11:13 UTC

[kafka] branch 2.3 updated: KAFKA-9232; Coordinator new member timeout does not work for JoinGroup v3 and below (#7753)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 95648d3  KAFKA-9232; Coordinator new member timeout does not work for JoinGroup v3 and below (#7753)
95648d3 is described below

commit 95648d399278155cda347b80b6cf9e29f09cdec8
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Mon Dec 23 18:55:48 2019 -0500

    KAFKA-9232; Coordinator new member timeout does not work for JoinGroup v3 and below (#7753)
    
    The JoinGroup logic for v3 and below does not properly complete the initial heartbeat for new members, so that heartbeat expires after the static 5-minute new-member timeout if the member does not rejoin. The core issue is in the `shouldKeepAlive` method, which returns false when it should return true because of an inconsistent timeout check.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../kafka/coordinator/group/MemberMetadata.scala   | 12 ++++++--
 .../coordinator/group/GroupCoordinatorTest.scala   | 32 +++++++++++++++++++++-
 2 files changed, 40 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
index 83ff709..fc90c95 100644
--- a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
@@ -87,10 +87,16 @@ private[group] class MemberMetadata(var memberId: String,
   }
 
   def shouldKeepAlive(deadlineMs: Long): Boolean = {
-    if (isAwaitingJoin)
-      !isNew || latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
-    else awaitingSyncCallback != null ||
+    if (isNew) {
+      // New members are expired after the static join timeout
+      latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
+    } else if (isAwaitingJoin || isAwaitingSync) {
+      // Don't remove members as long as they have a request in purgatory
+      true
+    } else {
+      // Otherwise check for session expiration
       latestHeartbeat + sessionTimeoutMs > deadlineMs
+    }
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 0a6d7fc..d62a123 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -308,6 +308,36 @@ class GroupCoordinatorTest {
   }
 
   @Test
+  def testNewMemberTimeoutCompletion(): Unit = {
+    val sessionTimeout = GroupCoordinator.NewMemberJoinTimeoutMs + 5000
+    val responseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, None, sessionTimeout, DefaultRebalanceTimeout, false)
+
+    timer.advanceClock(GroupInitialRebalanceDelay + 1)
+
+    val joinResult = Await.result(responseFuture, Duration(DefaultRebalanceTimeout + 100, TimeUnit.MILLISECONDS))
+    val group = groupCoordinator.groupManager.getGroup(groupId).get
+    val memberId = joinResult.memberId
+
+    assertEquals(Errors.NONE, joinResult.error)
+    assertEquals(0, group.allMemberMetadata.count(_.isNew))
+
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, joinResult.generationId, memberId, Map(memberId -> Array[Byte]()))
+    val syncGroupError = syncGroupResult._2
+
+    assertEquals(Errors.NONE, syncGroupError)
+    assertEquals(1, group.size)
+
+    timer.advanceClock(GroupCoordinator.NewMemberJoinTimeoutMs + 100)
+
+    // Make sure the NewMemberTimeout is not still in effect, and the member is not kicked
+    assertEquals(1, group.size)
+
+    timer.advanceClock(sessionTimeout + 100)
+    assertEquals(0, group.size)
+  }
+
+  @Test
   def testNewMemberJoinExpiration(): Unit = {
     // This tests new member expiration during a protracted rebalance. We first create a
     // group with one member which uses a large value for session timeout and rebalance timeout.
@@ -1823,7 +1853,7 @@ class GroupCoordinatorTest {
 
     val nextGenerationId = joinResult.generationId
 
-    // with no leader SyncGroup, the follower's request should failure with an error indicating
+    // with no leader SyncGroup, the follower's request should fail with an error indicating
     // that it should rejoin
     EasyMock.reset(replicaManager)
     val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId, None)