You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/14 22:28:20 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #10863: KAFKA-12890; Consumer group stuck in `CompletingRebalance`

hachikuji commented on a change in pull request #10863:
URL: https://github.com/apache/kafka/pull/10863#discussion_r651224002



##########
File path: core/src/main/scala/kafka/coordinator/group/DelayedSync.scala
##########
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator.group
+
+import kafka.server.DelayedOperation
+
+/**
+ * Delayed rebalance operation that is added to the purgatory when is completing the rebalance.
+ *
+ * Whenever a SyncGroup is receives, checks that we received all the SyncGroup request from

Review comment:
       nit: Whenever a SyncGroup is receive**d**?

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1450,12 +1457,89 @@ class GroupCoordinator(val brokerId: Int,
             group.maybeInvokeJoinCallback(member, joinResult)
             completeAndScheduleNextHeartbeatExpiration(group, member)
             member.isNew = false
+
+            group.addPendingSyncMember(member.memberId)
           }
+
+          schedulePendingSync(group)
         }
       }
     }
   }
 
+  private def maybeRemovePendingSyncMember(

Review comment:
       nit: `removePendingSyncMember` throws an exception if the member is not in the group, so does the "maybe" in the name make sense?

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1450,12 +1457,89 @@ class GroupCoordinator(val brokerId: Int,
             group.maybeInvokeJoinCallback(member, joinResult)
             completeAndScheduleNextHeartbeatExpiration(group, member)
             member.isNew = false
+
+            group.addPendingSyncMember(member.memberId)
           }
+
+          schedulePendingSync(group)
         }
       }
     }
   }
 
+  private def maybeRemovePendingSyncMember(
+    group: GroupMetadata,
+    memberId: String
+  ): Unit = {
+    group.removePendingSyncMember(memberId)
+    maybeCompleteSyncExpiration(group)
+  }
+
+  private def removeSyncExpiration(
+    group: GroupMetadata
+  ): Unit = {
+    group.clearPendingSyncMembers()
+    maybeCompleteSyncExpiration(group)
+  }
+
+  private def maybeCompleteSyncExpiration(
+    group: GroupMetadata
+  ): Unit = {
+    val groupKey = GroupKey(group.groupId)
+    syncPurgatory.checkAndComplete(groupKey)
+  }
+
+  private def schedulePendingSync(
+    group: GroupMetadata
+  ): Unit = {
+    val delayedSync = new DelayedSync(this, group, group.rebalanceTimeoutMs)
+    val groupKey = GroupKey(group.groupId)
+    syncPurgatory.tryCompleteElseWatch(delayedSync, Seq(groupKey))
+  }
+
+  def tryCompletePendingSync(
+    group: GroupMetadata,
+    forceComplete: () => Boolean
+  ): Boolean = {
+    group.inLock {
+      group.currentState match {
+        case Dead | Empty | PreparingRebalance =>
+          forceComplete()
+        case CompletingRebalance | Stable =>
+          if (group.hasReceivedSyncFromAllMembers())
+            forceComplete()
+          else false
+      }
+    }
+  }
+
+  def onExpirePendingSync(
+    group: GroupMetadata
+  ): Unit = {
+    group.inLock {
+      group.currentState match {
+        case Dead | Empty | PreparingRebalance =>
+          debug(s"Received unexpected notification of sync expiration after group ${group.groupId} " +
+            s"already transitioned to the ${group.currentState} state.")
+
+        case CompletingRebalance | Stable =>
+          if (!group.hasAllMembersJoined) {
+            val pendingSyncMembers = group.allPendingSyncMembers()
+
+            info(s"Group ${group.groupId} removed members who haven't " +

Review comment:
       nit: this message might be redundant given the one in `prepareRebalance`.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1450,12 +1457,89 @@ class GroupCoordinator(val brokerId: Int,
             group.maybeInvokeJoinCallback(member, joinResult)
             completeAndScheduleNextHeartbeatExpiration(group, member)
             member.isNew = false
+
+            group.addPendingSyncMember(member.memberId)
           }
+
+          schedulePendingSync(group)
         }
       }
     }
   }
 
+  private def maybeRemovePendingSyncMember(
+    group: GroupMetadata,
+    memberId: String
+  ): Unit = {
+    group.removePendingSyncMember(memberId)
+    maybeCompleteSyncExpiration(group)
+  }
+
+  private def removeSyncExpiration(
+    group: GroupMetadata
+  ): Unit = {
+    group.clearPendingSyncMembers()
+    maybeCompleteSyncExpiration(group)
+  }
+
+  private def maybeCompleteSyncExpiration(
+    group: GroupMetadata
+  ): Unit = {
+    val groupKey = GroupKey(group.groupId)
+    syncPurgatory.checkAndComplete(groupKey)
+  }
+
+  private def schedulePendingSync(
+    group: GroupMetadata
+  ): Unit = {
+    val delayedSync = new DelayedSync(this, group, group.rebalanceTimeoutMs)

Review comment:
       I was considering whether we should start the timer fresh after the group is joined. An alternative is to let the rebalance timeout cover the total time from when the rebalance is first triggered. On the other hand, if we are trying to allow for a full `max.poll.interval.ms` between Join and Sync, then maybe we need to reset it.

##########
File path: core/src/main/scala/kafka/coordinator/group/DelayedSync.scala
##########
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator.group
+
+import kafka.server.DelayedOperation
+
+/**
+ * Delayed rebalance operation that is added to the purgatory when is completing the rebalance.
+ *
+ * Whenever a SyncGroup is receives, checks that we received all the SyncGroup request from
+ * each member of the group; if yes, complete this operation.
+ *
+ * When the operation has expired, any known members that have not sent a SyncGroup requests
+ * are removed from the group. If any members is removed, the group is rebalanced.
+ */
+private[group] class DelayedSync(
+  coordinator: GroupCoordinator,
+  group: GroupMetadata,

Review comment:
       Async operations always make me a little nervous. Would it be reasonable to add the generation to this as a field? That would give us a simple way to ensure that the delayed operation could not get applied to the wrong rebalance.

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
##########
@@ -2254,6 +2255,209 @@ class GroupCoordinatorTest {
     assertEquals(0, group().numPending)
   }
 
+  private def verifyHeartbeat(
+    joinGroupResult: JoinGroupResult,
+    expectedError: Errors
+  ): Unit = {
+    EasyMock.reset(replicaManager)
+    val heartbeatResult = heartbeat(
+      groupId,
+      joinGroupResult.memberId,
+      joinGroupResult.generationId
+    )
+    assertEquals(expectedError, heartbeatResult)
+  }
+
+  private def joinWithNMembers(nbMembers: Int): Seq[JoinGroupResult] = {
+    val requiredKnownMemberId = true
+
+    // First JoinRequests
+    var futures = 1.to(nbMembers).map { _ =>
+      EasyMock.reset(replicaManager)
+      sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
+        None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
+    }
+
+    // Get back the assigned member ids
+    val memberIds = futures.map(await(_, 1).memberId)
+
+    // Second JoinRequests
+    futures = memberIds.map { memberId =>
+      EasyMock.reset(replicaManager)
+      sendJoinGroup(groupId, memberId, protocolType, protocols,
+        None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
+    }
+
+    timer.advanceClock(GroupInitialRebalanceDelay + 1)
+    timer.advanceClock(DefaultRebalanceTimeout + 1)
+
+    futures.map(await(_, 1))
+  }
+
+  @Test
+  def testRebalanceTimesOutWhenSyncRequestIsNotReceiced(): Unit = {

Review comment:
       nit: typo `Receiced` (one more below as well)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org