Posted to commits@kafka.apache.org by gu...@apache.org on 2019/06/11 16:49:10 UTC

[kafka] branch trunk updated: KAFKA-8487: Only request re-join on REBALANCE_IN_PROGRESS in CommitOffsetResponse (#6894)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bebcbe3  KAFKA-8487: Only request re-join on REBALANCE_IN_PROGRESS in CommitOffsetResponse (#6894)
bebcbe3 is described below

commit bebcbe3a049f78c4184404f2dfb8b4150233856e
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Jun 11 09:48:43 2019 -0700

    KAFKA-8487: Only request re-join on REBALANCE_IN_PROGRESS in CommitOffsetResponse (#6894)
    
    Plus some minor cleanups on AbstractCoordinator.
    
    Reviewers: Boyang Chen <bo...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
 .../consumer/internals/AbstractCoordinator.java    | 10 +--
 .../consumer/internals/ConsumerCoordinator.java    | 16 ++++-
 .../internals/ConsumerCoordinatorTest.java         | 34 ++++++++-
 .../kafka/coordinator/group/GroupCoordinator.scala | 84 +++++++++++++---------
 .../coordinator/group/GroupCoordinatorTest.scala   | 46 ++++++++++--
 5 files changed, 141 insertions(+), 49 deletions(-)
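
For context on the client-side effect of this change: a synchronous offset commit that races with a rebalance now surfaces to the application as a CommitFailedException, and the consumer simply re-joins on its next poll() because its generation was never reset. A minimal sketch of that caller-side pattern using only the public consumer API (the broker address, group id and topic below are hypothetical):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.CommitFailedException;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class CommitDuringRebalanceExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // hypothetical broker
            props.put("group.id", "example-group");             // hypothetical group
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("enable.auto.commit", "false");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singleton("example-topic")); // hypothetical topic
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    // ... process records ...
                    try {
                        consumer.commitSync();
                    } catch (CommitFailedException e) {
                        // The commit raced with a rebalance; drop it and keep polling so the
                        // consumer re-joins and resumes with its new assignment.
                    }
                }
            }
        }
    }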

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 73563fd..30277b3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -555,7 +555,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 // reset the member id and retry immediately
                 resetGeneration();
                 log.debug("Attempt to join group failed due to unknown member id.");
-                future.raise(Errors.UNKNOWN_MEMBER_ID);
+                future.raise(error);
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                     || error == Errors.NOT_COORDINATOR) {
                 // re-discover the coordinator and retry with backoff
@@ -592,7 +592,7 @@ public abstract class AbstractCoordinator implements Closeable {
                     AbstractCoordinator.this.rejoinNeeded = true;
                     AbstractCoordinator.this.state = MemberState.UNJOINED;
                 }
-                future.raise(Errors.MEMBER_ID_REQUIRED);
+                future.raise(error);
             } else {
                 // unexpected error, throw the exception
                 log.error("Attempt to join group failed due to unexpected error: {}", error.message());
@@ -940,18 +940,18 @@ public abstract class AbstractCoordinator implements Closeable {
             } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                 log.info("Attempt to heartbeat failed since group is rebalancing");
                 requestRejoin();
-                future.raise(Errors.REBALANCE_IN_PROGRESS);
+                future.raise(error);
             } else if (error == Errors.ILLEGAL_GENERATION) {
                 log.info("Attempt to heartbeat failed since generation {} is not current", generation.generationId);
                 resetGeneration();
-                future.raise(Errors.ILLEGAL_GENERATION);
+                future.raise(error);
             } else if (error == Errors.FENCED_INSTANCE_ID) {
                 log.error("Received fatal exception: group.instance.id gets fenced");
                 future.raise(error);
             } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                 log.info("Attempt to heartbeat failed for since member id {} is not valid.", generation.memberId);
                 resetGeneration();
-                future.raise(Errors.UNKNOWN_MEMBER_ID);
+                future.raise(error);
             } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                 future.raise(new GroupAuthorizationException(groupId));
             } else {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 64bf17d..bacb960 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -881,10 +881,20 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                             log.error("Received fatal exception: group.instance.id gets fenced");
                             future.raise(error);
                             return;
+                        } else if (error == Errors.REBALANCE_IN_PROGRESS) {
+                            /* Consumer never tries to commit offset in between join-group and sync-group,
+                             * and hence on broker-side it is not expected to see a commit offset request
+                             * during CompletingRebalance phase; if it ever happens then broker would return
+                             * this error. In this case we should just treat as a fatal CommitFailed exception.
+                             * However, we do not need to reset generations and just request re-join, such that
+                             * if the caller decides to proceed and poll, it would still try to proceed and re-join normally.
+                             */
+                            requestRejoin();
+                            future.raise(new CommitFailedException());
+                            return;
                         } else if (error == Errors.UNKNOWN_MEMBER_ID
-                                || error == Errors.ILLEGAL_GENERATION
-                                || error == Errors.REBALANCE_IN_PROGRESS) {
-                            // need to re-join group
+                                || error == Errors.ILLEGAL_GENERATION) {
+                            // need to reset generation and re-join group
                             resetGeneration();
                             future.raise(new CommitFailedException());
                             return;
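
The behavioral split in this hunk: REBALANCE_IN_PROGRESS now only requests a re-join (member id and generation stay valid), whereas UNKNOWN_MEMBER_ID and ILLEGAL_GENERATION still reset the generation; all three surface to the caller as CommitFailedException. A rough sketch of that classification (the enum and helper below are hypothetical, not the actual ConsumerCoordinator internals):

    import org.apache.kafka.common.protocol.Errors;

    final class CommitErrorSketch {
        enum RejoinAction { NONE, REJOIN_ONLY, RESET_GENERATION_AND_REJOIN }

        static RejoinAction classify(Errors error) {
            switch (error) {
                case REBALANCE_IN_PROGRESS:
                    // Generation is still valid: only ask for a re-join, then fail the commit.
                    return RejoinAction.REJOIN_ONLY;
                case UNKNOWN_MEMBER_ID:
                case ILLEGAL_GENERATION:
                    // Member id or generation is stale: reset the generation before re-joining.
                    return RejoinAction.RESET_GENERATION_AND_REJOIN;
                default:
                    return RejoinAction.NONE;
            }
        }
    }

In the real code the first case maps to requestRejoin() and the second to resetGeneration(), both followed by future.raise(new CommitFailedException()).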
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 6f62bbf..9595639 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1678,15 +1678,43 @@ public class ConsumerCoordinatorTest {
                 new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE));
     }
 
-    @Test(expected = CommitFailedException.class)
+    @Test
     public void testCommitOffsetRebalanceInProgress() {
         // we cannot retry if a rebalance occurs before the commit completed
+        final String consumerId = "leader";
+
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+
+        // ensure metadata is up-to-date for leader
+        client.updateMetadata(metadataResponse);
+
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
+        // normal join group
+        Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
+        partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
+
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
+        client.prepareResponse(body -> {
+            SyncGroupRequest sync = (SyncGroupRequest) body;
+            return sync.data.memberId().equals(consumerId) &&
+                sync.data.generationId() == 1 &&
+                sync.groupAssignments().containsKey(consumerId);
+        }, syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.poll(time.timer(Long.MAX_VALUE));
+
+        AbstractCoordinator.Generation expectedGeneration = new AbstractCoordinator.Generation(1, consumerId, partitionAssignor.name());
+        assertFalse(coordinator.rejoinNeededOrPending());
+        assertEquals(expectedGeneration, coordinator.generation());
+
         prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.REBALANCE_IN_PROGRESS);
-        coordinator.commitOffsetsSync(singletonMap(t1p,
-                new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE));
+
+        assertThrows(CommitFailedException.class, () -> coordinator.commitOffsetsSync(singletonMap(t1p,
+            new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE)));
+
+        assertTrue(coordinator.rejoinNeededOrPending());
+        assertEquals(expectedGeneration, coordinator.generation());
     }
 
     @Test(expected = KafkaException.class)
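
The converted test switches from @Test(expected = ...) to assertThrows so that it can keep asserting on coordinator state (rejoinNeededOrPending, generation) after the commit fails. A minimal sketch of that pattern, assuming JUnit 4.13's Assert.assertThrows or an equivalent helper (the class under test below is hypothetical):

    import static org.junit.Assert.assertThrows;
    import static org.junit.Assert.assertTrue;
    import org.junit.Test;

    public class AssertThrowsPatternTest {

        @Test
        public void failureLeavesObservableState() {
            Stateful subject = new Stateful();   // hypothetical stand-in for the coordinator
            assertThrows(IllegalStateException.class, subject::failingCall);
            // Unlike @Test(expected = ...), execution continues past the throwing call,
            // so post-failure state can still be verified.
            assertTrue(subject.flagSetByFailure());
        }

        static class Stateful {
            private boolean flag = false;

            void failingCall() {
                flag = true;
                throw new IllegalStateException("simulated failure");
            }

            boolean flagSetByFailure() {
                return flag;
            }
        }
    }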
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 5d40e9b..e601d34 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -180,38 +180,48 @@ class GroupCoordinator(val brokerId: Int,
         if (group.hasStaticMember(groupInstanceId)) {
           val oldMemberId = group.getStaticMemberId(groupInstanceId)
 
-          if (group.is(Stable)) {
-            info(s"Static member $groupInstanceId with unknown member id rejoins, assigning new member id $newMemberId, while " +
-              s"old member $oldMemberId will be removed. No rebalance will be triggered.")
-
-            val oldMember = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId)
-
-            // Heartbeat of old member id will expire without affection since the group no longer contains that member id.
-            // New heartbeat shall be scheduled with new member id.
-            completeAndScheduleNextHeartbeatExpiration(group, oldMember)
-
-            responseCallback(JoinGroupResult(
-              members = if (group.isLeader(newMemberId)) {
-                group.currentMemberMetadata
-              } else {
-                List.empty
-              },
-              memberId = newMemberId,
-              generationId = group.generationId,
-              subProtocol = group.protocolOrNull,
-              leaderId = group.leaderOrNull,
-              error = Errors.NONE))
-          } else {
-            val knownStaticMember = group.get(oldMemberId)
-            updateMemberAndRebalance(group, knownStaticMember, protocols, responseCallback)
+          group.currentState match {
+            case Stable =>
+              info(s"Static member $groupInstanceId with unknown member id rejoins group ${group.groupId} " +
+                s"in ${group.currentState} state. Assigning new member id $newMemberId, while old member $oldMemberId " +
+                "will be removed. No rebalance will be triggered.")
+
+              val oldMember = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId)
+
+              // Heartbeat of old member id will expire without affection since the group no longer contains that member id.
+              // New heartbeat shall be scheduled with new member id.
+              completeAndScheduleNextHeartbeatExpiration(group, oldMember)
+
+              responseCallback(JoinGroupResult(
+                members = if (group.isLeader(newMemberId)) {
+                  group.currentMemberMetadata
+                } else {
+                  List.empty
+                },
+                memberId = newMemberId,
+                generationId = group.generationId,
+                subProtocol = group.protocolOrNull,
+                leaderId = group.leaderOrNull,
+                error = Errors.NONE))
+
+            case _ =>
+              info(s"Static member $groupInstanceId with unkonwn member id rejoins group ${group.groupId} " +
+                s"in ${group.currentState} state. Update its membership with the pre-registered old member id $oldMemberId.")
+
+              val knownStaticMember = group.get(oldMemberId)
+              updateMemberAndRebalance(group, knownStaticMember, protocols, responseCallback)
           }
         } else if (requireKnownMemberId) {
             // If member id required (dynamic membership), register the member in the pending member list
             // and send back a response to call for another join group request with allocated member id.
-            group.addPendingMember(newMemberId)
-            addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
-            responseCallback(joinError(newMemberId, Errors.MEMBER_ID_REQUIRED))
+          debug(s"Dynamic member with unknown member id rejoins group ${group.groupId} in " +
+              s"${group.currentState} state. Created a new member id $newMemberId and request the member to rejoin with this id.")
+          group.addPendingMember(newMemberId)
+          addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
+          responseCallback(joinError(newMemberId, Errors.MEMBER_ID_REQUIRED))
         } else {
+          debug(s"Dynamic member with unknown member id rejoins group ${group.groupId} in " +
+            s"${group.currentState} state. Created a new member id $newMemberId for this member and add to the group.")
           addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, groupInstanceId,
             clientId, clientHost, protocolType, protocols, group, responseCallback)
 
@@ -613,16 +623,26 @@ class GroupCoordinator(val brokerId: Int,
         // The group is only using Kafka to store offsets.
         // Also, for transactional offset commits we don't need to validate group membership and the generation.
         groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId, producerEpoch)
-      } else if (group.is(CompletingRebalance)) {
-        responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS))
       } else if (!group.has(memberId)) {
         responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
       } else if (generationId != group.generationId) {
         responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION))
       } else {
-        val member = group.get(memberId)
-        completeAndScheduleNextHeartbeatExpiration(group, member)
-        groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback)
+        group.currentState match {
+          case Stable | PreparingRebalance =>
+            // During PreparingRebalance phase, we still allow a commit request since we rely
+            // on heartbeat response to eventually notify the rebalance in progress signal to the consumer
+            val member = group.get(memberId)
+            completeAndScheduleNextHeartbeatExpiration(group, member)
+            groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback)
+
+          case CompletingRebalance =>
+            // We should not receive a commit request if the group has not completed rebalance;
+            // but since the consumer's member.id and generation is valid, it means it has received
+            // the latest group generation information from the JoinResponse.
+            // So let's return a REBALANCE_IN_PROGRESS to let consumer handle it gracefully.
+            responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS))
+        }
       }
     }
   }
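
Broker-side, the offset-commit path now validates in this order: unknown member ids are rejected first, then stale generations, and only then does the group state decide between storing the commit (Stable or PreparingRebalance) and answering REBALANCE_IN_PROGRESS (CompletingRebalance). A Java-flavored sketch of that decision; the GroupState enum and helper are hypothetical, and the real logic above is Scala:

    import org.apache.kafka.common.protocol.Errors;

    final class OffsetCommitDecisionSketch {
        enum GroupState { STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE }

        // Returns the error a member's OffsetCommit request would receive,
        // or NONE when the commit is accepted and stored.
        static Errors decide(boolean memberKnown, boolean generationMatches, GroupState state) {
            if (!memberKnown)
                return Errors.UNKNOWN_MEMBER_ID;
            if (!generationMatches)
                return Errors.ILLEGAL_GENERATION;
            if (state == GroupState.COMPLETING_REBALANCE) {
                // The member id and generation are valid (the member has seen the join
                // response), but sync-group has not completed, so let the consumer
                // handle REBALANCE_IN_PROGRESS gracefully instead of failing hard.
                return Errors.REBALANCE_IN_PROGRESS;
            }
            // Stable and PreparingRebalance: accept and store the commit; the heartbeat
            // path is what eventually signals an in-progress rebalance to the member.
            return Errors.NONE;
        }
    }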
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 5a1b20a..cd1ebc5 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -1176,25 +1176,25 @@ class GroupCoordinatorTest {
 
     val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols,
       rebalanceTimeout = sessionTimeout, sessionTimeout = sessionTimeout)
-    val assignedConsumerId = joinGroupResult.memberId
+    val assignedMemberId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.NONE, joinGroupError)
 
     EasyMock.reset(replicaManager)
-    val (_, syncGroupError) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
+    val (_, syncGroupError) = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
     assertEquals(Errors.NONE, syncGroupError)
 
     timer.advanceClock(sessionTimeout / 2)
 
     EasyMock.reset(replicaManager)
-    val commitOffsetResult = commitOffsets(groupId, assignedConsumerId, generationId, Map(tp -> offset))
+    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, generationId, Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     timer.advanceClock(sessionTimeout / 2 + 100)
 
     EasyMock.reset(replicaManager)
-    val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
+    val heartbeatResult = heartbeat(groupId, assignedMemberId, 1)
     assertEquals(Errors.NONE, heartbeatResult)
   }
 
@@ -2159,6 +2159,40 @@ class GroupCoordinatorTest {
   }
 
   @Test
+  def testCommitOffsetInCompletingRebalanceFromUnknownMemberId() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val tp = new TopicPartition("topic", 0)
+    val offset = offsetAndMetadata(0)
+
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
+    val assignedMemberId = joinGroupResult.memberId
+    val generationId = joinGroupResult.generationId
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
+
+    EasyMock.reset(replicaManager)
+    val commitOffsetResult = commitOffsets(groupId, memberId, generationId, Map(tp -> offset))
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, commitOffsetResult(tp))
+  }
+
+  @Test
+  def testCommitOffsetInCompletingRebalanceFromIllegalGeneration() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val tp = new TopicPartition("topic", 0)
+    val offset = offsetAndMetadata(0)
+
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
+    val assignedMemberId = joinGroupResult.memberId
+    val generationId = joinGroupResult.generationId
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
+
+    EasyMock.reset(replicaManager)
+    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, generationId + 1, Map(tp -> offset))
+    assertEquals(Errors.ILLEGAL_GENERATION, commitOffsetResult(tp))
+  }
+
+  @Test
   def testHeartbeatDuringRebalanceCausesRebalanceInProgress() {
     // First start up a group (with a slightly larger timeout to give us time to heartbeat when the rebalance starts)
     val joinGroupResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
@@ -2666,7 +2700,7 @@ class GroupCoordinatorTest {
   }
 
   private def commitOffsets(groupId: String,
-                            consumerId: String,
+                            memberId: String,
                             generationId: Int,
                             offsets: Map[TopicPartition, OffsetAndMetadata],
                             groupInstanceId: Option[String] = None): CommitOffsetCallbackParams = {
@@ -2692,7 +2726,7 @@ class GroupCoordinatorTest {
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 
-    groupCoordinator.handleCommitOffsets(groupId, consumerId, groupInstanceId, generationId, offsets, responseCallback)
+    groupCoordinator.handleCommitOffsets(groupId, memberId, groupInstanceId, generationId, offsets, responseCallback)
     Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
   }