You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/06/05 21:20:24 UTC

[kafka] branch trunk updated: KAFKA-8386; Use COORDINATOR_NOT_AVAILABLE error when group is Dead (#6762)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fb1f749  KAFKA-8386; Use COORDINATOR_NOT_AVAILABLE error when group is Dead (#6762)
fb1f749 is described below

commit fb1f74958d1431af7c45a4d499b3b6ffef0bf70e
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Wed Jun 5 14:20:04 2019 -0700

    KAFKA-8386; Use COORDINATOR_NOT_AVAILABLE error when group is Dead (#6762)
    
    The Dead state in the coordinator is used for groups which are either pending deletion or migration to a new coordinator. Currently requests received while in this state result in an UNKNOWN_MEMBER_ID which causes consumers to reset the memberId. This is a problem for KIP-345 since it can cause an older member to fence a newer member. This patch changes the error code returned in this state to COORDINATOR_NOT_AVAILABLE, which causes the consumer to rediscover the coordinator, but not  [...]
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../kafka/coordinator/group/GroupCoordinator.scala | 28 +++++------
 .../coordinator/group/GroupCoordinatorTest.scala   | 56 +++++++++++++++++++---
 2 files changed, 61 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index daf38f3..5d40e9b 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -170,8 +170,8 @@ class GroupCoordinator(val brokerId: Int,
         // if the group is marked as dead, it means some other thread has just removed the group
         // from the coordinator metadata; it is likely that the group has migrated to some other
         // coordinator OR the group is in a transient unstable phase. Let the member retry
-        // joining without the specified member id.
-        responseCallback(joinError(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.UNKNOWN_MEMBER_ID))
+        // finding the correct coordinator and rejoining.
+        responseCallback(joinError(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.COORDINATOR_NOT_AVAILABLE))
       } else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) {
         responseCallback(joinError(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.INCONSISTENT_GROUP_PROTOCOL))
       } else {
@@ -235,8 +235,8 @@ class GroupCoordinator(val brokerId: Int,
         // if the group is marked as dead, it means some other thread has just removed the group
         // from the coordinator metadata; this is likely that the group has migrated to some other
         // coordinator OR the group is in a transient unstable phase. Let the member retry
-        // joining without the specified member id.
-        responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
+        // finding the correct coordinator and rejoining.
+        responseCallback(joinError(memberId, Errors.COORDINATOR_NOT_AVAILABLE))
       } else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) {
         responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
       } else if (group.isPendingMember(memberId)) {
@@ -351,8 +351,8 @@ class GroupCoordinator(val brokerId: Int,
         // if the group is marked as dead, it means some other thread has just removed the group
         // from the coordinator metadata; this is likely that the group has migrated to some other
         // coordinator OR the group is in a transient unstable phase. Let the member retry
-        // joining without the specified member id.
-        responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
+        // finding the correct coordinator and rejoining.
+        responseCallback(Array.empty, Errors.COORDINATOR_NOT_AVAILABLE)
       } else if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
         responseCallback(Array.empty, Errors.FENCED_INSTANCE_ID)
       } else if (!group.has(memberId)) {
@@ -417,16 +417,12 @@ class GroupCoordinator(val brokerId: Int,
 
     groupManager.getGroup(groupId) match {
       case None =>
-        // if the group is marked as dead, it means some other thread has just removed the group
-        // from the coordinator metadata; it is likely that the group has migrated to some other
-        // coordinator OR the group is in a transient unstable phase. Let the consumer to retry
-        // joining without specified consumer id,
         responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
       case Some(group) =>
         group.inLock {
           if (group.is(Dead)) {
-            responseCallback(Errors.UNKNOWN_MEMBER_ID)
+            responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
           } else if (group.isPendingMember(memberId)) {
             // if a pending member is leaving, it needs to be removed from the pending list, heartbeat cancelled
             // and if necessary, prompt a JoinGroup completion.
@@ -509,10 +505,10 @@ class GroupCoordinator(val brokerId: Int,
       case Some(group) => group.inLock {
         if (group.is(Dead)) {
           // if the group is marked as dead, it means some other thread has just removed the group
-          // from the coordinator metadata; it is likely that the group has migrated to some other
+          // from the coordinator metadata; it is likely that the group has migrated to some other
           // coordinator OR the group is in a transient unstable phase. Let the member retry
-          // joining without the specified member id.
-          responseCallback(Errors.UNKNOWN_MEMBER_ID)
+          // finding the correct coordinator and rejoining.
+          responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
         } else if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
           responseCallback(Errors.FENCED_INSTANCE_ID)
         } else if (!group.has(memberId)) {
@@ -609,8 +605,8 @@ class GroupCoordinator(val brokerId: Int,
         // if the group is marked as dead, it means some other thread has just removed the group
         // from the coordinator metadata; it is likely that the group has migrated to some other
         // coordinator OR the group is in a transient unstable phase. Let the member retry
-        // joining without the specified member id.
-        responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
+        // finding the correct coordinator and rejoining.
+        responseCallback(offsetMetadata.mapValues(_ => Errors.COORDINATOR_NOT_AVAILABLE))
       } else if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
         responseCallback(offsetMetadata.mapValues(_ => Errors.FENCED_INSTANCE_ID))
       } else if ((generationId < 0 && group.is(Empty)) || (producerId != NO_PRODUCER_ID)) {
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 770868c..5a1b20a 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -386,7 +386,18 @@ class GroupCoordinatorTest {
 
     groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime()))
     val joinGroupResult = dynamicJoinGroup(deadGroupId, memberId, protocolType, protocols)
-    assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupResult.error)
+    assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, joinGroupResult.error)
+  }
+
+  @Test
+  def testSyncDeadGroup() {
+    val memberId = "memberId"
+
+    val deadGroupId = "deadGroupId"
+
+    groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime()))
+    val syncGroupResult = syncGroupFollower(deadGroupId, 1, memberId)
+    assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, syncGroupResult._2)
   }
 
   @Test
@@ -507,7 +518,7 @@ class GroupCoordinatorTest {
 
     EasyMock.reset(replicaManager)
     val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, leaderInstanceId, protocolType, protocols, clockAdvance = 1)
-    assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupResult.error)
+    assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, joinGroupResult.error)
   }
 
   @Test
@@ -716,6 +727,19 @@ class GroupCoordinatorTest {
   }
 
   @Test
+  def testOffsetCommitDeadGroup() {
+    val memberId = "memberId"
+
+    val deadGroupId = "deadGroupId"
+    val tp = new TopicPartition("topic", 0)
+    val offset = offsetAndMetadata(0)
+
+    groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime()))
+    val offsetCommitResult = commitOffsets(deadGroupId, memberId, 1, Map(tp -> offset))
+    assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, offsetCommitResult(tp))
+  }
+
+  @Test
   def staticMemberCommitOffsetWithInvalidMemberId() {
     val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
 
@@ -999,12 +1023,22 @@ class GroupCoordinatorTest {
 
   @Test
   def testHeartbeatUnknownGroup() {
-
     val heartbeatResult = heartbeat(groupId, memberId, -1)
     assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult)
   }
 
   @Test
+  def testheartbeatDeadGroup() {
+    val memberId = "memberId"
+
+    val deadGroupId = "deadGroupId"
+
+    groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime()))
+    val heartbeatResult = heartbeat(deadGroupId, memberId, 1)
+    assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, heartbeatResult)
+  }
+
+  @Test
   def testHeartbeatUnknownConsumerExistingGroup() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
     val otherMemberId = "memberId"
@@ -1304,9 +1338,7 @@ class GroupCoordinatorTest {
 
   @Test
   def testSyncGroupFromUnknownGroup() {
-    val generation = 1
-
-    val syncGroupResult = syncGroupFollower(groupId, generation, memberId)
+    val syncGroupResult = syncGroupFollower(groupId, 1, memberId)
     assertEquals(Errors.UNKNOWN_MEMBER_ID, syncGroupResult._2)
   }
 
@@ -2179,12 +2211,22 @@ class GroupCoordinatorTest {
 
   @Test
   def testLeaveGroupUnknownGroup() {
-
     val leaveGroupResult = leaveGroup(groupId, memberId)
     assertEquals(Errors.UNKNOWN_MEMBER_ID, leaveGroupResult)
   }
 
   @Test
+  def testLeaveDeadGroup() {
+    val memberId = "memberId"
+
+    val deadGroupId = "deadGroupId"
+
+    groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime()))
+    val leaveGroupResult = leaveGroup(deadGroupId, memberId)
+    assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, leaveGroupResult)
+  }
+
+  @Test
   def testLeaveGroupUnknownConsumerExistingGroup() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
     val otherMemberId = "consumerId"