You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/06/05 21:41:19 UTC
[kafka] branch 2.3 updated: KAFKA-8386; Use COORDINATOR_NOT_AVAILABLE error when group is Dead (#6762)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new be0ddc8 KAFKA-8386; Use COORDINATOR_NOT_AVAILABLE error when group is Dead (#6762)
be0ddc8 is described below
commit be0ddc8d8da91155fcf98033cd5b9d2a13d3baee
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Wed Jun 5 14:20:04 2019 -0700
KAFKA-8386; Use COORDINATOR_NOT_AVAILABLE error when group is Dead (#6762)
The Dead state in the coordinator is used for groups which are either pending deletion or migration to a new coordinator. Currently requests received while in this state result in an UNKNOWN_MEMBER_ID which causes consumers to reset the memberId. This is a problem for KIP-345 since it can cause an older member to fence a newer member. This patch changes the error code returned in this state to COORDINATOR_NOT_AVAILABLE, which causes the consumer to rediscover the coordinator, but not reset the memberId.
Reviewers: Guozhang Wang <wa...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
.../kafka/coordinator/group/GroupCoordinator.scala | 28 +++++------
.../coordinator/group/GroupCoordinatorTest.scala | 56 +++++++++++++++++++---
2 files changed, 61 insertions(+), 23 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 55c90e5..52f1a98 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -170,8 +170,8 @@ class GroupCoordinator(val brokerId: Int,
// if the group is marked as dead, it means some other thread has just removed the group
// from the coordinator metadata; it is likely that the group has migrated to some other
// coordinator OR the group is in a transient unstable phase. Let the member retry
- // joining without the specified member id.
- responseCallback(joinError(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.UNKNOWN_MEMBER_ID))
+ // finding the correct coordinator and rejoin.
+ responseCallback(joinError(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.COORDINATOR_NOT_AVAILABLE))
} else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) {
responseCallback(joinError(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.INCONSISTENT_GROUP_PROTOCOL))
} else {
@@ -235,8 +235,8 @@ class GroupCoordinator(val brokerId: Int,
// if the group is marked as dead, it means some other thread has just removed the group
// from the coordinator metadata; this is likely that the group has migrated to some other
// coordinator OR the group is in a transient unstable phase. Let the member retry
- // joining without the specified member id.
- responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
+ // finding the correct coordinator and rejoin.
+ responseCallback(joinError(memberId, Errors.COORDINATOR_NOT_AVAILABLE))
} else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) {
responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
} else if (group.isPendingMember(memberId)) {
@@ -351,8 +351,8 @@ class GroupCoordinator(val brokerId: Int,
// if the group is marked as dead, it means some other thread has just removed the group
// from the coordinator metadata; this is likely that the group has migrated to some other
// coordinator OR the group is in a transient unstable phase. Let the member retry
- // joining without the specified member id.
- responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
+ // finding the correct coordinator and rejoin.
+ responseCallback(Array.empty, Errors.COORDINATOR_NOT_AVAILABLE)
} else if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
responseCallback(Array.empty, Errors.FENCED_INSTANCE_ID)
} else if (!group.has(memberId)) {
@@ -414,16 +414,12 @@ class GroupCoordinator(val brokerId: Int,
groupManager.getGroup(groupId) match {
case None =>
- // if the group is marked as dead, it means some other thread has just removed the group
- // from the coordinator metadata; it is likely that the group has migrated to some other
- // coordinator OR the group is in a transient unstable phase. Let the consumer to retry
- // joining without specified consumer id,
responseCallback(Errors.UNKNOWN_MEMBER_ID)
case Some(group) =>
group.inLock {
if (group.is(Dead)) {
- responseCallback(Errors.UNKNOWN_MEMBER_ID)
+ responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
} else if (group.isPendingMember(memberId)) {
// if a pending member is leaving, it needs to be removed from the pending list, heartbeat cancelled
// and if necessary, prompt a JoinGroup completion.
@@ -506,10 +502,10 @@ class GroupCoordinator(val brokerId: Int,
case Some(group) => group.inLock {
if (group.is(Dead)) {
// if the group is marked as dead, it means some other thread has just removed the group
- // from the coordinator metadata; it is likely that the group has migrated to some other
+ // from the coordinator metadata; this is likely that the group has migrated to some other
// coordinator OR the group is in a transient unstable phase. Let the member retry
- // joining without the specified member id.
- responseCallback(Errors.UNKNOWN_MEMBER_ID)
+ // finding the correct coordinator and rejoin.
+ responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
} else if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
responseCallback(Errors.FENCED_INSTANCE_ID)
} else if (!group.has(memberId)) {
@@ -603,8 +599,8 @@ class GroupCoordinator(val brokerId: Int,
// if the group is marked as dead, it means some other thread has just removed the group
// from the coordinator metadata; it is likely that the group has migrated to some other
// coordinator OR the group is in a transient unstable phase. Let the member retry
- // joining without the specified member id.
- responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
+ // finding the correct coordinator and rejoin.
+ responseCallback(offsetMetadata.mapValues(_ => Errors.COORDINATOR_NOT_AVAILABLE))
} else if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
responseCallback(offsetMetadata.mapValues(_ => Errors.FENCED_INSTANCE_ID))
} else if ((generationId < 0 && group.is(Empty)) || (producerId != NO_PRODUCER_ID)) {
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 280fc8e..5bbaf5d 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -386,7 +386,18 @@ class GroupCoordinatorTest {
groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime()))
val joinGroupResult = dynamicJoinGroup(deadGroupId, memberId, protocolType, protocols)
- assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupResult.error)
+ assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, joinGroupResult.error)
+ }
+
+ @Test
+ def testSyncDeadGroup() {
+ val memberId = "memberId"
+
+ val deadGroupId = "deadGroupId"
+
+ groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime()))
+ val syncGroupResult = syncGroupFollower(deadGroupId, 1, memberId)
+ assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, syncGroupResult._2)
}
@Test
@@ -507,7 +518,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, leaderInstanceId, protocolType, protocols, clockAdvance = 1)
- assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupResult.error)
+ assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, joinGroupResult.error)
}
@Test
@@ -716,6 +727,19 @@ class GroupCoordinatorTest {
}
@Test
+ def testOffsetCommitDeadGroup() {
+ val memberId = "memberId"
+
+ val deadGroupId = "deadGroupId"
+ val tp = new TopicPartition("topic", 0)
+ val offset = offsetAndMetadata(0)
+
+ groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime()))
+ val offsetCommitResult = commitOffsets(deadGroupId, memberId, 1, Map(tp -> offset))
+ assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, offsetCommitResult(tp))
+ }
+
+ @Test
def staticMemberCommitOffsetWithInvalidMemberId() {
val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
@@ -999,12 +1023,22 @@ class GroupCoordinatorTest {
@Test
def testHeartbeatUnknownGroup() {
-
val heartbeatResult = heartbeat(groupId, memberId, -1)
assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult)
}
@Test
+ def testheartbeatDeadGroup() {
+ val memberId = "memberId"
+
+ val deadGroupId = "deadGroupId"
+
+ groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime()))
+ val heartbeatResult = heartbeat(deadGroupId, memberId, 1)
+ assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, heartbeatResult)
+ }
+
+ @Test
def testHeartbeatUnknownConsumerExistingGroup() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
val otherMemberId = "memberId"
@@ -1303,9 +1337,7 @@ class GroupCoordinatorTest {
@Test
def testSyncGroupFromUnknownGroup() {
- val generation = 1
-
- val syncGroupResult = syncGroupFollower(groupId, generation, memberId)
+ val syncGroupResult = syncGroupFollower(groupId, 1, memberId)
assertEquals(Errors.UNKNOWN_MEMBER_ID, syncGroupResult._2)
}
@@ -2178,12 +2210,22 @@ class GroupCoordinatorTest {
@Test
def testLeaveGroupUnknownGroup() {
-
val leaveGroupResult = leaveGroup(groupId, memberId)
assertEquals(Errors.UNKNOWN_MEMBER_ID, leaveGroupResult)
}
@Test
+ def testLeaveDeadGroup() {
+ val memberId = "memberId"
+
+ val deadGroupId = "deadGroupId"
+
+ groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime()))
+ val leaveGroupResult = leaveGroup(deadGroupId, memberId)
+ assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, leaveGroupResult)
+ }
+
+ @Test
def testLeaveGroupUnknownConsumerExistingGroup() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
val otherMemberId = "consumerId"