You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/12/24 00:25:31 UTC
[kafka] branch 2.2 updated: KAFKA-9232;
Coordinator new member timeout does not work for JoinGroup v3 and
below (#7753)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push:
new bfbd397 KAFKA-9232; Coordinator new member timeout does not work for JoinGroup v3 and below (#7753)
bfbd397 is described below
commit bfbd397f5860dcbf619002f6e8654d4cd11bbf1a
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Mon Dec 23 18:55:48 2019 -0500
KAFKA-9232; Coordinator new member timeout does not work for JoinGroup v3 and below (#7753)
The JoinGroup logic for v3 and below does not properly complete the initial heartbeat for new members, so a new member is expired after the static 5-minute timeout if it does not rejoin. The core issue is in the `shouldKeepAlive` method, which returns false when it should return true because of an inconsistent timeout check.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../kafka/coordinator/group/MemberMetadata.scala | 13 +++++++--
.../coordinator/group/GroupCoordinatorTest.scala | 32 +++++++++++++++++++++-
2 files changed, 41 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
index 1932f42..5645b27 100644
--- a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
@@ -71,6 +71,7 @@ private[group] class MemberMetadata(val memberId: String,
var isNew: Boolean = false
def isAwaitingJoin = awaitingJoinCallback != null
+ def isAwaitingSync = awaitingSyncCallback != null
/**
* Get metadata corresponding to the provided protocol.
@@ -84,10 +85,16 @@ private[group] class MemberMetadata(val memberId: String,
}
def shouldKeepAlive(deadlineMs: Long): Boolean = {
- if (isAwaitingJoin)
- !isNew || latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
- else awaitingSyncCallback != null ||
+ if (isNew) {
+ // New members are expired after the static join timeout
+ latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
+ } else if (isAwaitingJoin || isAwaitingSync) {
+ // Don't remove members as long as they have a request in purgatory
+ true
+ } else {
+ // Otherwise check for session expiration
latestHeartbeat + sessionTimeoutMs > deadlineMs
+ }
}
/**
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index a529750..126d01d 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -288,6 +288,36 @@ class GroupCoordinatorTest extends JUnitSuite {
}
@Test
+ def testNewMemberTimeoutCompletion(): Unit = {
+ val sessionTimeout = GroupCoordinator.NewMemberJoinTimeoutMs + 5000
+ val responseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, sessionTimeout, DefaultRebalanceTimeout, false)
+
+ timer.advanceClock(GroupInitialRebalanceDelay + 1)
+
+ val joinResult = Await.result(responseFuture, Duration(DefaultRebalanceTimeout + 100, TimeUnit.MILLISECONDS))
+ val group = groupCoordinator.groupManager.getGroup(groupId).get
+ val memberId = joinResult.memberId
+
+ assertEquals(Errors.NONE, joinResult.error)
+ assertEquals(0, group.allMemberMetadata.count(_.isNew))
+
+ EasyMock.reset(replicaManager)
+ val syncGroupResult = syncGroupLeader(groupId, joinResult.generationId, memberId, Map(memberId -> Array[Byte]()))
+ val syncGroupError = syncGroupResult._2
+
+ assertEquals(Errors.NONE, syncGroupError)
+ assertEquals(1, group.size)
+
+ timer.advanceClock(GroupCoordinator.NewMemberJoinTimeoutMs + 100)
+
+ // Make sure the NewMemberTimeout is not still in effect, and the member is not kicked
+ assertEquals(1, group.size)
+
+ timer.advanceClock(sessionTimeout + 100)
+ assertEquals(0, group.size)
+ }
+
+ @Test
def testNewMemberJoinExpiration(): Unit = {
// This tests new member expiration during a protracted rebalance. We first create a
// group with one member which uses a large value for session timeout and rebalance timeout.
@@ -1008,7 +1038,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val nextGenerationId = joinResult.generationId
- // with no leader SyncGroup, the follower's request should failure with an error indicating
+ // with no leader SyncGroup, the follower's request should fail with an error indicating
// that it should rejoin
EasyMock.reset(replicaManager)
val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId)