You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/06/10 20:29:11 UTC

kafka git commit: KAFKA-2253; fix deadlock between removeWatchersLock and watcher operations list lock; reviewed by Onur Karaman and Jiangjie Qin

Repository: kafka
Updated Branches:
  refs/heads/trunk ca6d01bc6 -> 9f80665ec


KAFKA-2253; fix deadlock between removeWatchersLock and watcher operations list lock; reviewed by Onur Karaman and Jiangjie Qin


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9f80665e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9f80665e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9f80665e

Branch: refs/heads/trunk
Commit: 9f80665ec6deff8525b61096034af8dc0cc9a03c
Parents: ca6d01b
Author: Guozhang Wang <wa...@gmail.com>
Authored: Wed Jun 10 11:28:53 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jun 10 11:28:53 2015 -0700

----------------------------------------------------------------------
 .../scala/kafka/server/DelayedOperation.scala   | 45 +++++++++++++-------
 1 file changed, 29 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9f80665e/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 123078d..0b53532 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -189,8 +189,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br
       // If the operation is already completed, stop adding it to the rest of the watcher list.
       if (operation.isCompleted())
         return false
-      val watchers = watchersFor(key)
-      watchers.watch(operation)
+      watchForOperation(key, operation)
 
       if (!watchCreated) {
         watchCreated = true
@@ -241,22 +240,34 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br
   def delayed() = timeoutTimer.size
 
   /*
-   * Return the watch list of the given key
+   * Return all the current watcher lists,
+   * note that the returned watchers may be removed from the list by other threads
    */
-  private def watchersFor(key: Any) = inReadLock(removeWatchersLock) { watchersForKey.getAndMaybePut(key) }
+  private def allWatchers = inReadLock(removeWatchersLock) { watchersForKey.values }
 
   /*
-   * Return all the current watcher lists
+   * Add the operation to the watch list of the given key; note that we need to
+   * grab the removeWatchersLock to avoid the operation being added to a removed watcher list
    */
-  private def allWatchers = inReadLock(removeWatchersLock) { watchersForKey.values }
+  private def watchForOperation(key: Any, operation: T) {
+    inReadLock(removeWatchersLock) {
+      val watcher = watchersForKey.getAndMaybePut(key)
+      watcher.watch(operation)
+    }
+  }
 
   /*
    * Remove the key from watcher lists if its list is empty
    */
-  private def removeKeyIfEmpty(key: Any) = inWriteLock(removeWatchersLock) {
-    val watchers = watchersForKey.get(key)
-    if (watchers != null && watchers.watched == 0) {
-      watchersForKey.remove(key)
+  private def removeKeyIfEmpty(key: Any, watchers: Watchers) {
+    inWriteLock(removeWatchersLock) {
+      // if the current key is no longer correlated to the watchers to remove, skip
+      if (watchersForKey.get(key) != watchers)
+        return
+
+      if (watchers != null && watchers.watched == 0) {
+        watchersForKey.remove(key)
+      }
     }
   }
 
@@ -298,10 +309,11 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br
             iter.remove()
           }
         }
-
-        if (operations.size == 0)
-          removeKeyIfEmpty(key)
       }
+
+      if (operations.size == 0)
+        removeKeyIfEmpty(key, this)
+
       completed
     }
 
@@ -317,10 +329,11 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br
             purged += 1
           }
         }
-
-        if (operations.size == 0)
-          removeKeyIfEmpty(key)
       }
+
+      if (operations.size == 0)
+        removeKeyIfEmpty(key, this)
+
       purged
     }
   }