You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/14 23:41:53 UTC

[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r439543737



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -967,7 +967,16 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = {
+  /**
+   * @param completeDelayedRequests It may requires a bunch of group locks when completing delayed requests so it may

Review comment:
       It's cleaner to not pass in completeDelayedRequests here and let the caller (`ReplicaManager.appendRecords()`) check and complete purgatory instead.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1118,33 +1170,38 @@ class GroupCoordinator(val brokerId: Int,
     group.removeStaticMember(member.groupInstanceId)
 
     group.currentState match {
-      case Dead | Empty =>
-      case Stable | CompletingRebalance => maybePrepareRebalance(group, reason)
-      case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
+      case Dead | Empty => None
+      case Stable | CompletingRebalance =>
+        maybePrepareRebalance(group, reason)
+        None
+      case PreparingRebalance => Some(GroupKey(group.groupId))
     }
   }
 
-  private def removePendingMemberAndUpdateGroup(group: GroupMetadata, memberId: String): Unit = {
+  /**
+   * remove the pending member and then return the group key whihc is in PreparingRebalance,
+   * @param group group
+   * @param memberId member id
+   * @return group key if it is in PreparingRebalance. Otherwise, None
+   */
+  private def removePendingMemberAndUpdateGroup(group: GroupMetadata, memberId: String): Option[GroupKey] = {
     group.removePendingMember(memberId)
 
-    if (group.is(PreparingRebalance)) {
-      joinPurgatory.checkAndComplete(GroupKey(group.groupId))
-    }
-  }
-
-  def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
-    group.inLock {
-      if (group.hasAllMembersJoined)
-        forceComplete()
-      else false
-    }
+    if (group.is(PreparingRebalance)) Some(GroupKey(group.groupId))
+    else None
   }
 
   def onExpireJoin(): Unit = {
     // TODO: add metrics for restabilize timeouts
   }
 
-  def onCompleteJoin(group: GroupMetadata): Unit = {
+  /**
+   * @return Returning a map of successfully appended topic partitions and a flag indicting whether the HWM has been
+   *         incremented. If the caller passes in completeDelayedRequests as false, the caller is expected to complete

Review comment:
       The caller no longer passed in completeDelayedRequests.

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -100,40 +99,24 @@ abstract class DelayedOperation(override val delayMs: Long,
   def tryComplete(): Boolean
 
   /**
-   * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired
-   * without blocking.
+   * Thread-safe variant of tryComplete() that attempts completion after it succeed to hold the lock.
    *
-   * If threadA acquires the lock and performs the check for completion before completion criteria is met
-   * and threadB satisfies the completion criteria, but fails to acquire the lock because threadA has not
-   * yet released the lock, we need to ensure that completion is attempted again without blocking threadA
-   * or threadB. `tryCompletePending` is set by threadB when it fails to acquire the lock and at least one
-   * of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
-   * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
-   * the operation is actually completed.
+   * There is a long story about using "lock" or "tryLock".
+   *
+   * 1) using lock - There was a lot of cases that a thread holds a group lock and then it tries to hold more group

Review comment:
       There was => There were

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
##########
@@ -3921,22 +3934,26 @@ class GroupCoordinatorTest {
     val (responseFuture, responseCallback) = setupCommitOffsetsCallback
 
     val capturedArgument: Capture[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
-
-    EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
-      EasyMock.anyShort(),
+    EasyMock.expect(replicaManager.completeDelayedRequests(EasyMock.anyObject()))

Review comment:
       Hmm, this should only be called with LeaderHWChange.LeaderHWIncremented, but the mock later returns LeaderHWChange.None? Ditto below.

##########
File path: core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
##########
@@ -33,11 +34,40 @@ import scala.math.{max, min}
  */
 private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                  group: GroupMetadata,
-                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {
+                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, None) {
 
-  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
-  override def onExpiration() = coordinator.onExpireJoin()
-  override def onComplete() = coordinator.onCompleteJoin(group)
+  /**
+   * The delayed requests should be completed without holding group lock so we keep those partitions and then
+   * complete them after releasing lock.
+   */
+  private[group] var partitionsToComplete: scala.collection.Map[TopicPartition, LeaderHWChange] = Map.empty
+
+  /**
+   * It controls the lock manually since GroupCoordinator#onCompleteJoin() invoked by onComplete() can't be within a
+   * group lock since GroupCoordinator#onCompleteJoin() tries to complete delayed requests.
+   *
+   */
+  override def tryComplete(): Boolean = try group.inLock {
+    /**
+     * holds the group lock for both the "group.hasAllMembersJoined" check and the call to forceComplete()
+     */
+    if (group.hasAllMembersJoined) forceComplete()
+    else false
+  } finally completeDelayedRequests()
+  override def onExpiration(): Unit = coordinator.onExpireJoin()
+  override def onComplete(): Unit = try partitionsToComplete = coordinator.onCompleteJoin(group)
+  finally completeDelayedRequests()
+
+  /**
+   * try to complete delayed requests only if the caller does not hold the group lock.
+   * This method is called by following cases:
+   * 1) tryComplete -> hold lock -> onComplete -> release lock -> completeDelayedRequests
+   * 2) onComplete -> completeDelayedRequests

Review comment:
       expire ->  onComplete -> completeDelayedRequests

##########
File path: core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
##########
@@ -33,11 +34,40 @@ import scala.math.{max, min}
  */
 private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                  group: GroupMetadata,
-                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {
+                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, None) {

Review comment:
       DelyaedOperation.lockOpt defaults to None. So, we don't have to specify it explicitly.

##########
File path: core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
##########
@@ -33,11 +34,40 @@ import scala.math.{max, min}
  */
 private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                  group: GroupMetadata,
-                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {
+                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, None) {
 
-  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
-  override def onExpiration() = coordinator.onExpireJoin()
-  override def onComplete() = coordinator.onCompleteJoin(group)
+  /**
+   * The delayed requests should be completed without holding group lock so we keep those partitions and then
+   * complete them after releasing lock.
+   */
+  private[group] var partitionsToComplete: scala.collection.Map[TopicPartition, LeaderHWChange] = Map.empty
+
+  /**
+   * It controls the lock manually since GroupCoordinator#onCompleteJoin() invoked by onComplete() can't be within a
+   * group lock since GroupCoordinator#onCompleteJoin() tries to complete delayed requests.

Review comment:
       "GroupCoordinator#onCompleteJoin() tries to complete delayed requests" => since the completion of the delayed request for partitions returned from GroupCoordinator#onCompleteJoin() need to be done outside of the group lock.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -603,6 +650,9 @@ class ReplicaManager(val config: KafkaConfig,
         val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
         responseCallback(produceResponseStatus)
       }
+      localProduceResults
+        .filter { case (_, logAppendResult) => logAppendResult.exception.isEmpty}
+        .map(e => e._1 -> e._2.leaderHWChange)

Review comment:
       Could we use `map {case (tp, appendResult) => ...}` here to avoid using unamed references?

##########
File path: core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
##########
@@ -33,11 +34,40 @@ import scala.math.{max, min}
  */
 private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                  group: GroupMetadata,
-                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {
+                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, None) {
 
-  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
-  override def onExpiration() = coordinator.onExpireJoin()
-  override def onComplete() = coordinator.onCompleteJoin(group)
+  /**
+   * The delayed requests should be completed without holding group lock so we keep those partitions and then
+   * complete them after releasing lock.
+   */
+  private[group] var partitionsToComplete: scala.collection.Map[TopicPartition, LeaderHWChange] = Map.empty
+
+  /**
+   * It controls the lock manually since GroupCoordinator#onCompleteJoin() invoked by onComplete() can't be within a
+   * group lock since GroupCoordinator#onCompleteJoin() tries to complete delayed requests.
+   *
+   */
+  override def tryComplete(): Boolean = try group.inLock {
+    /**
+     * holds the group lock for both the "group.hasAllMembersJoined" check and the call to forceComplete()
+     */
+    if (group.hasAllMembersJoined) forceComplete()
+    else false
+  } finally completeDelayedRequests()
+  override def onExpiration(): Unit = coordinator.onExpireJoin()
+  override def onComplete(): Unit = try partitionsToComplete = coordinator.onCompleteJoin(group)
+  finally completeDelayedRequests()
+
+  /**
+   * try to complete delayed requests only if the caller does not hold the group lock.
+   * This method is called by following cases:
+   * 1) tryComplete -> hold lock -> onComplete -> release lock -> completeDelayedRequests
+   * 2) onComplete -> completeDelayedRequests
+   */
+  private[group] def completeDelayedRequests(): Unit = if (!group.lock.isHeldByCurrentThread) {

Review comment:
       Another way that doesn't require checking lock.isHeldByCurrentThread is the following. But your approach seems simpler.
   
   Override forceComplete() to
   ```
   override def forceComplete() {
       if (completed.compareAndSet(false, true)) {
         // cancel the timeout timer
         cancel()
         partitionsToComplete  = coordinator.onCompleteJoin(group)
         onComplete()
         true
       } else {
         false
       }
   }
   ```
   In onComplete(), do nothing.
   
   In tryComplete(), do
   ```
   override def tryComplete() {
     group.inLock {
       if (group.hasAllMembersJoined) 
         isForceComplete = forceComplete()
     }
     completeDelayedRequests(partitionsToComplete)
     isForceComplete
   }
   ```
   
   In onExpiration(),
   ```
   override def onExpiration() {
     completeDelayedRequests(partitionsToComplete)
   }
   ```
   

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
##########
@@ -307,8 +307,14 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     override def runWithCallback(member: GroupMember, responseCallback: CompleteTxnCallback): Unit = {
       val producerId = 1000L
       val offsetsPartitions = (0 to numPartitions).map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, _))
-      groupCoordinator.groupManager.handleTxnCompletion(producerId,
-        offsetsPartitions.map(_.partition).toSet, isCommit = random.nextBoolean)
+      val isCommit = random.nextBoolean
+      try groupCoordinator.groupManager.handleTxnCompletion(producerId,
+        offsetsPartitions.map(_.partition).toSet, isCommit = isCommit)
+      catch {
+        case e: IllegalStateException if isCommit
+          && e.getMessage.contains("though the offset commit record itself hasn't been appended to the log")=>

Review comment:
       Hmm, why do we need this logic now?

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -311,37 +317,47 @@ class GroupMetadataManager(brokerId: Int,
 
           responseCallback(responseError)
         }
-        appendForGroup(group, groupMetadataRecords, putCacheCallback)
-
+        appendForGroup(group, groupMetadataRecords, putCacheCallback, completeDelayedRequests)
       case None =>
         responseCallback(Errors.NOT_COORDINATOR)
-        None
+        Map.empty
     }
   }
 
+  /**
+   * @return Returning a map of successfully appended topic partitions and a flag indicting whether the HWM has been
+   *         incremented. If the caller passes in completeDelayedRequests as false, the caller is expected to complete
+   *         delayed requests for those returned partitions.
+   */
   private def appendForGroup(group: GroupMetadata,
                              records: Map[TopicPartition, MemoryRecords],
-                             callback: Map[TopicPartition, PartitionResponse] => Unit): Unit = {
+                             callback: Map[TopicPartition, PartitionResponse] => Unit,
+                             completeDelayedRequests: Boolean): Map[TopicPartition, LeaderHWChange] = {
     // call replica manager to append the group message
     replicaManager.appendRecords(
       timeout = config.offsetCommitTimeoutMs.toLong,
       requiredAcks = config.offsetCommitRequiredAcks,
       internalTopicsAllowed = true,
       origin = AppendOrigin.Coordinator,
+      completeDelayedRequests = completeDelayedRequests,
       entriesPerPartition = records,
       delayedProduceLock = Some(group.lock),
       responseCallback = callback)
   }
 
   /**
    * Store offsets by appending it to the replicated log and then inserting to cache
+   * @return Returning a map of successfully appended topic partitions and a flag indicting whether the HWM has been
+   *         incremented. If the caller passes in completeDelayedRequests as false, the caller is expected to complete
+   *         delayed requests for those returned partitions.
    */
   def storeOffsets(group: GroupMetadata,
                    consumerId: String,
                    offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                    responseCallback: immutable.Map[TopicPartition, Errors] => Unit,
+                   completeDelayedRequests: Boolean,

Review comment:
       All callers pass in completeDelayedRequests as false. Could we remove this param?

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1118,33 +1170,38 @@ class GroupCoordinator(val brokerId: Int,
     group.removeStaticMember(member.groupInstanceId)
 
     group.currentState match {
-      case Dead | Empty =>
-      case Stable | CompletingRebalance => maybePrepareRebalance(group, reason)
-      case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
+      case Dead | Empty => None
+      case Stable | CompletingRebalance =>
+        maybePrepareRebalance(group, reason)
+        None
+      case PreparingRebalance => Some(GroupKey(group.groupId))
     }
   }
 
-  private def removePendingMemberAndUpdateGroup(group: GroupMetadata, memberId: String): Unit = {
+  /**
+   * remove the pending member and then return the group key whihc is in PreparingRebalance,

Review comment:
       typo whihc

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -967,7 +967,16 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = {
+  /**
+   * @param completeDelayedRequests It may requires a bunch of group locks when completing delayed requests so it may

Review comment:
       may requires => may require

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -100,40 +99,24 @@ abstract class DelayedOperation(override val delayMs: Long,
   def tryComplete(): Boolean
 
   /**
-   * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired
-   * without blocking.
+   * Thread-safe variant of tryComplete() that attempts completion after it succeed to hold the lock.
    *
-   * If threadA acquires the lock and performs the check for completion before completion criteria is met
-   * and threadB satisfies the completion criteria, but fails to acquire the lock because threadA has not
-   * yet released the lock, we need to ensure that completion is attempted again without blocking threadA
-   * or threadB. `tryCompletePending` is set by threadB when it fails to acquire the lock and at least one
-   * of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
-   * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
-   * the operation is actually completed.
+   * There is a long story about using "lock" or "tryLock".
+   *
+   * 1) using lock - There was a lot of cases that a thread holds a group lock and then it tries to hold more group
+   * locks to complete delayed requests. Unfortunately, the scenario causes deadlock and so we had introduced the
+   * "tryLock" to avoid deadlock.
+   *
+   * 2) using tryLock -  However, the "tryLock" causes another issue that the delayed requests may be into
+   * oblivion if the thread, which should complete the delayed requests, fails to get the lock.
+   *
+   * Now, we go back to use "lock" and make sure the thread which tries to complete delayed requests does NOT hold lock.
+   * We introduces a flag in ReplicaManager.appendRecords()., called completeDelayedRequests, to prevent the method

Review comment:
        ReplicaManager.appendRecords()., => ReplicaManager.appendRecords(),

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
##########
@@ -536,6 +537,11 @@ class GroupCoordinatorTest {
     // Make sure the NewMemberTimeout is not still in effect, and the member is not kicked
     assertEquals(1, group.size)
 
+    // prepare the mock replica manager again since the delayed join is going to complete
+    EasyMock.reset(replicaManager)
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()

Review comment:
       Hmm, why do we need to mock this since replicaManager.getMagic() is only called through replicaManager.handleWriteTxnMarkersRequest()?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org