Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/27 03:50:48 UTC

[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r428949769



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -550,19 +584,22 @@ class ReplicaManager(val config: KafkaConfig,
    * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
    * the callback function will be triggered either when timeout or the required acks are satisfied;
    * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
+   * @return the topic partitions we succeed to append data. It is useful to caller which tries to complete delayed requests.

Review comment:
       Perhaps reword like the following?
   
   Returns a map of successfully appended topic partitions and a flag indicating whether the HWM has been incremented. If the caller passes in completeDelayedRequests as false, the caller is expected to complete delayed requests for those returned partitions.
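   
   To illustrate that contract, a rough caller-side sketch (argument names follow the diff below; the deferred-completion helper at the end is hypothetical, not this PR's API):
   ```
   // Inside the group lock: defer completion to avoid lock-ordering deadlocks.
   val appended: Map[TopicPartition, Boolean] = group.inLock {
     replicaManager.appendRecords(
       timeout = config.offsetCommitTimeoutMs.toLong,
       requiredAcks = config.offsetCommitRequiredAcks,
       internalTopicsAllowed = true,
       origin = AppendOrigin.Coordinator,
       completeDelayedRequests = false, // deferred: we hold group.lock here
       entriesPerPartition = records,
       responseCallback = callback)
   }
   
   // After releasing the group lock, the caller completes the delayed requests.
   appended.foreach { case (tp, hwmIncremented) =>
     if (hwmIncremented)
       replicaManager.completeDelayedRequests(tp) // hypothetical helper
   }
   ```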

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -593,6 +630,7 @@ class ReplicaManager(val config: KafkaConfig,
         val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
         responseCallback(produceResponseStatus)
       }
+      localProduceResults.filter(_._2.exception.isEmpty).map(e => e._1 -> e._2.leaderHWIncremented)

Review comment:
       Could we do `localProduceResults.filter{ case (tp, logAppendResult) => ... }` to avoid unnamed references?
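   
   For example, a self-contained sketch of that style (LogAppendResult here is a simplified stand-in for the real kafka.server class):
   ```
   object FilterMapStyle extends App {
     // Simplified stand-in for kafka.server.LogAppendResult.
     final case class LogAppendResult(exception: Option[Throwable],
                                      leaderHWIncremented: Boolean)
   
     val localProduceResults = Map(
       "topic-0" -> LogAppendResult(None, leaderHWIncremented = true),
       "topic-1" -> LogAppendResult(Some(new RuntimeException("boom")),
                                    leaderHWIncremented = false))
   
     // `case` names both halves of each entry, avoiding _._1 / _._2.
     val appended: Map[String, Boolean] = localProduceResults
       .filter { case (_, result) => result.exception.isEmpty }
       .map { case (tp, result) => tp -> result.leaderHWIncremented }
   
     println(appended) // Map(topic-0 -> true)
   }
   ```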

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -100,40 +99,20 @@ abstract class DelayedOperation(override val delayMs: Long,
   def tryComplete(): Boolean
 
   /**
-   * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired
-   * without blocking.
+   * Thread-safe variant of tryComplete() that attempts completion after it succeed to hold the lock.
    *
-   * If threadA acquires the lock and performs the check for completion before completion criteria is met
-   * and threadB satisfies the completion criteria, but fails to acquire the lock because threadA has not
-   * yet released the lock, we need to ensure that completion is attempted again without blocking threadA
-   * or threadB. `tryCompletePending` is set by threadB when it fails to acquire the lock and at least one
-   * of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
-   * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
-   * the operation is actually completed.
+   * There is a long story about using "lock" or "tryLock". There was a lot of cases that hold a lock and then try to
+   * hold more locks to complete delayed requests. Unfortunately, that scenario causes deadlock and we had introduced
+   * the "tryLock" to avoid deadlock. However, the "tryLock" causes another issue that thread_A holds a lock but it does
+   * not complete the delayed requests and there are no threads can complete request as the lock is not free.

Review comment:
       "as the lock is not free" : Do you mean "when the lock is free"?

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -311,37 +312,40 @@ class GroupMetadataManager(brokerId: Int,
 
           responseCallback(responseError)
         }
-        appendForGroup(group, groupMetadataRecords, putCacheCallback)
-
+        appendForGroup(group, groupMetadataRecords, putCacheCallback, completeDelayedRequests)
       case None =>
         responseCallback(Errors.NOT_COORDINATOR)
-        None
+        Map.empty
     }
   }
 
   private def appendForGroup(group: GroupMetadata,
                              records: Map[TopicPartition, MemoryRecords],
-                             callback: Map[TopicPartition, PartitionResponse] => Unit): Unit = {
+                             callback: Map[TopicPartition, PartitionResponse] => Unit,
+                             completeDelayedRequests: Boolean): Map[TopicPartition, Boolean] = {

Review comment:
       Could we add a comment to explain the return value?
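   
   For example, something along these lines (wording is only a suggestion, mirroring the appendRecords contract discussed above):
   ```
   /**
    * Append the records for this group via ReplicaManager.appendRecords().
    *
    * @return a map from each successfully appended topic partition to a flag
    *         indicating whether the HWM has been incremented. When
    *         completeDelayedRequests is false, the caller is expected to
    *         complete the delayed requests for the returned partitions.
    */
   ```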

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -311,37 +312,40 @@ class GroupMetadataManager(brokerId: Int,
 
           responseCallback(responseError)
         }
-        appendForGroup(group, groupMetadataRecords, putCacheCallback)
-
+        appendForGroup(group, groupMetadataRecords, putCacheCallback, completeDelayedRequests)
       case None =>
         responseCallback(Errors.NOT_COORDINATOR)
-        None
+        Map.empty
     }
   }
 
   private def appendForGroup(group: GroupMetadata,
                              records: Map[TopicPartition, MemoryRecords],
-                             callback: Map[TopicPartition, PartitionResponse] => Unit): Unit = {
+                             callback: Map[TopicPartition, PartitionResponse] => Unit,
+                             completeDelayedRequests: Boolean): Map[TopicPartition, Boolean] = {
     // call replica manager to append the group message
     replicaManager.appendRecords(
       timeout = config.offsetCommitTimeoutMs.toLong,
       requiredAcks = config.offsetCommitRequiredAcks,
       internalTopicsAllowed = true,
       origin = AppendOrigin.Coordinator,
+      completeDelayedRequests = completeDelayedRequests,
       entriesPerPartition = records,
       delayedProduceLock = Some(group.lock),
       responseCallback = callback)
   }
 
   /**
    * Store offsets by appending it to the replicated log and then inserting to cache
+   * @return the topic partitions having new records

Review comment:
       A map containing the topic partitions having new records and a flag indicating whether the HWM has been incremented.
   

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1113,33 +1163,27 @@ class GroupCoordinator(val brokerId: Int,
     group.removeStaticMember(member.groupInstanceId)
 
     group.currentState match {
-      case Dead | Empty =>
-      case Stable | CompletingRebalance => maybePrepareRebalance(group, reason)
-      case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
+      case Dead | Empty => None
+      case Stable | CompletingRebalance =>
+        maybePrepareRebalance(group, reason)
+        None
+      case PreparingRebalance => Some(GroupKey(group.groupId))
     }
   }
 
-  private def removePendingMemberAndUpdateGroup(group: GroupMetadata, memberId: String): Unit = {
+  private def removePendingMemberAndUpdateGroup(group: GroupMetadata, memberId: String): Option[GroupKey] = {

Review comment:
       Could we add a comment for the return value?

##########
File path: core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
##########
@@ -33,11 +33,15 @@ import scala.math.{max, min}
  */
 private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                  group: GroupMetadata,
-                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {
+                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, None) {

Review comment:
       I was trying to check if it's safe to do this. The intention here is probably to avoid the deadlock between the group lock and the lock in DelayedOperation. None of the callers of joinPurgatory.checkAndComplete hold a group lock now. The only other caller that can first hold a group lock and then the lock in DelayedOperation is joinPurgatory.tryCompleteElseWatch(). However, that's not an issue since that's when the DelayedJoin operation is first added. So, this change seems ok.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -241,7 +241,8 @@ class GroupMetadataManager(brokerId: Int,
 
   def storeGroup(group: GroupMetadata,
                  groupAssignment: Map[String, Array[Byte]],
-                 responseCallback: Errors => Unit): Unit = {
+                 responseCallback: Errors => Unit,
+                 completeDelayedRequests: Boolean): Map[TopicPartition, Boolean] = {

Review comment:
       Could we add a comment to explain the return value?

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -100,40 +99,20 @@ abstract class DelayedOperation(override val delayMs: Long,
   def tryComplete(): Boolean
 
   /**
-   * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired
-   * without blocking.
+   * Thread-safe variant of tryComplete() that attempts completion after it succeed to hold the lock.
    *
-   * If threadA acquires the lock and performs the check for completion before completion criteria is met
-   * and threadB satisfies the completion criteria, but fails to acquire the lock because threadA has not
-   * yet released the lock, we need to ensure that completion is attempted again without blocking threadA
-   * or threadB. `tryCompletePending` is set by threadB when it fails to acquire the lock and at least one
-   * of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
-   * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
-   * the operation is actually completed.
+   * There is a long story about using "lock" or "tryLock". There was a lot of cases that hold a lock and then try to
+   * hold more locks to complete delayed requests. Unfortunately, that scenario causes deadlock and we had introduced
+   * the "tryLock" to avoid deadlock. However, the "tryLock" causes another issue that thread_A holds a lock but it does
+   * not complete the delayed requests and there are no threads can complete request as the lock is not free.
+   *
+   * Now, we go back to use "lock" and make sure the thread which tries to complete delayed requests does NOT hold lock.
+   * We introduces a flag, called completeDelayedRequests, to prevent the method from automatically completing delayed

Review comment:
       a flag => a flag in ReplicaManager.appendRecords().

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -970,7 +970,16 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = {
+  /**
+   * @param completeDelayedRequests It may requires a bunch of group locks when completing delayed requests so it may
+   *                                produce deadlock if caller already holds a group lock. Hence, caller should pass
+   *                                false to disable completion and then complete the delayed requests after releasing
+   *                                held group lock
+   */
+  def appendRecordsToLeader(records: MemoryRecords,
+                            origin: AppendOrigin,
+                            requiredAcks: Int,
+                            completeDelayedRequests: Boolean): LogAppendResult = {

Review comment:
       I agree that it's simpler to let the caller in ReplicaManager complete the delayed requests. This way, we don't need to pass completeDelayedRequests in here.

##########
File path: core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
##########
@@ -33,11 +33,15 @@ import scala.math.{max, min}
  */
 private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                  group: GroupMetadata,
-                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {
+                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, None) {
 
-  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
-  override def onExpiration() = coordinator.onExpireJoin()
-  override def onComplete() = coordinator.onCompleteJoin(group)
+  /**
+   * It controls the lock manually since GroupCoordinator#onCompleteJoin() invoked by onComplete() can't be within a
+   * group lock since GroupCoordinator#onCompleteJoin() tries to complete delayed requests.
+   */
+  override def tryComplete(): Boolean = if (group.inLock(group.hasAllMembersJoined)) forceComplete() else false

Review comment:
       This is a bit tricky to untangle. It seems the original code holds the group lock for both the `group.hasAllMembersJoined` check and the call to forceComplete(). So, we probably want to keep doing that. 
   
   I am thinking that we could do the following.
   1. Change `GroupCoordinator.onCompleteJoin()` so that it checks `group.hasAllMembersJoined` inside the group lock and returns whether it is true.
   2. In `DelayedJoin.tryComplete()`, we do 
   ```
   if (coordinator.onCompleteJoin(group))
     forceComplete()
   ```
   In `onComplete()`, we do nothing.
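   
   Spelled out, steps 1 and 2 might look like this (a sketch only: the boolean-returning onCompleteJoin is the proposed change, and the expiration path is left out here):
   ```
   private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                    group: GroupMetadata,
                                    rebalanceTimeout: Long)
     extends DelayedOperation(rebalanceTimeout, None) {
   
     // Proposed: onCompleteJoin() checks group.hasAllMembersJoined under the
     // group lock, performs the completion work, and reports whether the
     // join round actually completed.
     override def tryComplete(): Boolean =
       if (coordinator.onCompleteJoin(group)) forceComplete() else false
   
     override def onExpiration(): Unit = coordinator.onExpireJoin()
   
     // The join work already happened in tryComplete(), so nothing is left.
     override def onComplete(): Unit = {}
   }
   ```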

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1103,7 +1153,7 @@ class GroupCoordinator(val brokerId: Int,
     joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
   }
 
-  private def removeMemberAndUpdateGroup(group: GroupMetadata, member: MemberMetadata, reason: String): Unit = {
+  private def removeMemberAndUpdateGroup(group: GroupMetadata, member: MemberMetadata, reason: String): Option[GroupKey] = {

Review comment:
       Could we add a comment for the return value?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org