You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/14 07:43:49 UTC

[GitHub] [kafka] showuon commented on a change in pull request #10863: KAFKA-12890; Consumer group stuck in `CompletingRebalance`

showuon commented on a change in pull request #10863:
URL: https://github.com/apache/kafka/pull/10863#discussion_r649931726



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1450,12 +1463,67 @@ class GroupCoordinator(val brokerId: Int,
             group.maybeInvokeJoinCallback(member, joinResult)
             completeAndScheduleNextHeartbeatExpiration(group, member)
             member.isNew = false
+
+            group.addPendingSyncMember(member.memberId)
           }
+
+          val delayedSync = new DelayedSync(this, group, group.rebalanceTimeoutMs)
+          val groupKey = GroupKey(group.groupId)
+          syncPurgatory.tryCompleteElseWatch(delayedSync, Seq(groupKey))
         }
       }
     }
   }
 
+  private def maybeCompleteSyncExpiration(
+    group: GroupMetadata
+  ): Unit = {
+    val groupKey = GroupKey(group.groupId)
+    syncPurgatory.checkAndComplete(groupKey)
+  }
+
+  def tryCompletePendingSync(
+    group: GroupMetadata,
+    forceComplete: () => Boolean
+  ): Boolean = {
+    group.inLock {
+      group.currentState match {
+        case Dead | Empty | PreparingRebalance =>
+          forceComplete()
+        case CompletingRebalance | Stable =>
+          if (group.hasReceivedSyncFromAllMembers())
+            forceComplete()
+          else false
+      }
+    }
+  }
+
+  def onExpirePendingSync(
+    group: GroupMetadata
+  ): Unit = {
+    group.inLock {
+      group.currentState match {
+        case Dead | Empty | PreparingRebalance =>
+          debug(s"Received notification of sync expiration for group ${group.groupId} in ${group.currentState} state. Ignoring.")

Review comment:
       nit: We could log this debug string (without the `Ignoring` part) at the top of the `onExpirePendingSync` method, so that the log appears in every case.

##########
File path: core/src/main/scala/kafka/coordinator/group/DelayedSync.scala
##########
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator.group
+
+import kafka.server.DelayedOperation
+
+/**
+ * Delayed rebalance operation that is added to the purgatory when is completing the rebalance.
+ *
+ * Whenever a SyncGroup is receives, checks that a SyncGroup has been received for each
+ * member of the group; if yes, complete this operation.
+ *
+ * When the operation has expired, any known members that have not sent a SyncGroup requests
+ * are removed from the group. If any members is removed, the group is rebalanced.
+ */
+private[group] class DelayedSync(
+  coordinator: GroupCoordinator,
+  group: GroupMetadata,
+  rebalanceTimeoutMs: Long
+) extends DelayedOperation(
+  rebalanceTimeoutMs,
+  Some(group.lock)
+) {
+  override def tryComplete(): Boolean = {
+    coordinator.tryCompletePendingSync(group, forceComplete _)
+  }
+
+  override def onExpiration(): Unit = {
+    coordinator.onExpirePendingSync(group)
+  }
+
+  override def onComplete(): Unit = {
+    // Nothing

Review comment:
       nit: remove this comment.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1209,7 +1216,11 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   private def removeHeartbeatForLeavingMember(group: GroupMetadata, member: MemberMetadata): Unit = {
-    val memberKey = MemberKey(group.groupId, member.memberId)

Review comment:
       I don't think we should keep this overloaded method here. We can just remove it and change all the callers to pass memberId (I searched; it is only called in 2 places).

##########
File path: core/src/main/scala/kafka/coordinator/group/DelayedSync.scala
##########
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator.group
+
+import kafka.server.DelayedOperation
+
+/**
+ * Delayed rebalance operation that is added to the purgatory when is completing the rebalance.
+ *
+ * Whenever a SyncGroup is receives, checks that a SyncGroup has been received for each
+ * member of the group; if yes, complete this operation.

Review comment:
       Are you trying to say:
   Whenever a SyncGroup is received, checks that **we have received the** SyncGroup **request from** each member of the group?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org