You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/08 21:35:33 UTC

[GitHub] [kafka] junrao commented on pull request #6915: KAFKA-8334 Executor to retry delayed operations failed to obtain lock

junrao commented on pull request #6915:
URL: https://github.com/apache/kafka/pull/6915#issuecomment-626031375


   @chia7712 :  Thanks for your interest. My understand of the issue is the following. We added the retry logic based on `tryCompletePending` in `DelayedOperation.maybeTryComplete()` like the following.
   
   ```
     private[server] def maybeTryComplete(): Boolean = {
       var retry = false
       var done = false
       do {
         if (lock.tryLock()) {
           try {
             tryCompletePending.set(false)
             done = tryComplete()
           } finally {
             lock.unlock()
           }
           // While we were holding the lock, another thread may have invoked `maybeTryComplete` and set
           // `tryCompletePending`. In this case we should retry.
           retry = tryCompletePending.get()
         } else {
           // Another thread is holding the lock. If `tryCompletePending` is already set and this thread failed to
           // acquire the lock, then the thread that is holding the lock is guaranteed to see the flag and retry.
           // Otherwise, we should set the flag and retry on this thread since the thread holding the lock may have
           // released the lock and returned by the time the flag is set.
           retry = !tryCompletePending.getAndSet(true)
         }
       } while (!isCompleted && retry)
   ```
   
   This is causing the issue in KAFKA-8334 since the logic expects when a caller of `DelayedOperation.maybeTryComplete()` can't get the lock, the other caller holding the lock will try to complete the operation. This is not the case when the caller is handling the replication fetch request on the offset_commit topic and the lock holding caller is handling the HeartBeat request. 
   
   If we remove the logic based on `tryCompletePending` and revert to the logic before by just blocking on the lock, we can avoid the above issue.
   
   The reason that we added the `tryCompletePending` logic is to avoid deadlock in the following situation. We hold a group lock while calling ReplicaManager.appendRecords(). After it appends to the local log, it may call ReplicaManager.tryCompleteDelayedProduce(), which may need to hold a different group lock (since the key of the operation is a topic partition on which many groups can reside), which can cause a deadlock.
   
   The above seems to be the only case where deadlock can be generated. For two other purgatories used in GroupCoordinator--Heartbeat and Join, their patterns are different from how produce purgatory is used. In these two cases, the pattern is that the caller holds a group level lock when calling checkAndComplete. Inside the DelayedOperation, tryComplete()/onComplete() also hold onto the same group lock. The key associated with the Join/Heartbeat delayed operation is group or group + member. This means only pending operations on the same group may have their watcher callback triggered.
   
   By splitting ReplicaManager.appendRecords() into 2 methods: appendToLocal() and checkPurgatory(), we hold a group lock to call in the former for serialization, but don't hold a group lock to the latter. This way, there is no room for deadlock. This means we can get rid of the `tryCompletePending` logic in `DelayedOperation.maybeTryComplete()`, which will address the issue in KAFKA-8334. It also simplifies the logic in DelayedOperation instead of making it more complicated.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org