You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/06/10 20:29:11 UTC
kafka git commit: KAFKA-2253;
fix deadlock between removeWatchersLock and watcher operations list
lock; reviewed by Onur Karaman and Jiangjie Qin
Repository: kafka
Updated Branches:
refs/heads/trunk ca6d01bc6 -> 9f80665ec
KAFKA-2253; fix deadlock between removeWatchersLock and watcher operations list lock; reviewed by Onur Karaman and Jiangjie Qin
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9f80665e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9f80665e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9f80665e
Branch: refs/heads/trunk
Commit: 9f80665ec6deff8525b61096034af8dc0cc9a03c
Parents: ca6d01b
Author: Guozhang Wang <wa...@gmail.com>
Authored: Wed Jun 10 11:28:53 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jun 10 11:28:53 2015 -0700
----------------------------------------------------------------------
.../scala/kafka/server/DelayedOperation.scala | 45 +++++++++++++-------
1 file changed, 29 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9f80665e/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 123078d..0b53532 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -189,8 +189,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br
// If the operation is already completed, stop adding it to the rest of the watcher list.
if (operation.isCompleted())
return false
- val watchers = watchersFor(key)
- watchers.watch(operation)
+ watchForOperation(key, operation)
if (!watchCreated) {
watchCreated = true
@@ -241,22 +240,34 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br
def delayed() = timeoutTimer.size
/*
- * Return the watch list of the given key
+ * Return all the current watcher lists,
+ * note that the returned watchers may be removed from the list by other threads
*/
- private def watchersFor(key: Any) = inReadLock(removeWatchersLock) { watchersForKey.getAndMaybePut(key) }
+ private def allWatchers = inReadLock(removeWatchersLock) { watchersForKey.values }
/*
- * Return all the current watcher lists
+ * Return the watch list of the given key, note that we need to
+ * grab the removeWatchersLock to avoid the operation being added to a removed watcher list
*/
- private def allWatchers = inReadLock(removeWatchersLock) { watchersForKey.values }
+ private def watchForOperation(key: Any, operation: T) {
+ inReadLock(removeWatchersLock) {
+ val watcher = watchersForKey.getAndMaybePut(key)
+ watcher.watch(operation)
+ }
+ }
/*
* Remove the key from watcher lists if its list is empty
*/
- private def removeKeyIfEmpty(key: Any) = inWriteLock(removeWatchersLock) {
- val watchers = watchersForKey.get(key)
- if (watchers != null && watchers.watched == 0) {
- watchersForKey.remove(key)
+ private def removeKeyIfEmpty(key: Any, watchers: Watchers) {
+ inWriteLock(removeWatchersLock) {
+ // if the current key is no longer correlated to the watchers to remove, skip
+ if (watchersForKey.get(key) != watchers)
+ return
+
+ if (watchers != null && watchers.watched == 0) {
+ watchersForKey.remove(key)
+ }
}
}
@@ -298,10 +309,11 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br
iter.remove()
}
}
-
- if (operations.size == 0)
- removeKeyIfEmpty(key)
}
+
+ if (operations.size == 0)
+ removeKeyIfEmpty(key, this)
+
completed
}
@@ -317,10 +329,11 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br
purged += 1
}
}
-
- if (operations.size == 0)
- removeKeyIfEmpty(key)
}
+
+ if (operations.size == 0)
+ removeKeyIfEmpty(key, this)
+
purged
}
}