You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/06/01 23:15:23 UTC
kafka git commit: kafka-2226; NullPointerException in TestPurgatoryPerformance; patched by Yasuhiro Matsuda; reviewed by Onur Karaman, Guozhang Wang and Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 3d8494bca -> 9e894aa01
kafka-2226; NullPointerException in TestPurgatoryPerformance; patched by Yasuhiro Matsuda; reviewed by Onur Karaman, Guozhang Wang and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9e894aa0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9e894aa0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9e894aa0
Branch: refs/heads/trunk
Commit: 9e894aa0173b14d64a900bcf780d6b7809368384
Parents: 3d8494b
Author: Yasuhiro Matsuda <ya...@gmail.com>
Authored: Mon Jun 1 14:15:16 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Jun 1 14:15:16 2015 -0700
----------------------------------------------------------------------
.../main/scala/kafka/utils/timer/Timer.scala | 5 +-
.../scala/kafka/utils/timer/TimerTask.scala | 8 ++-
.../scala/kafka/utils/timer/TimerTaskList.scala | 65 ++++++++++++++------
.../scala/kafka/utils/timer/TimingWheel.scala | 5 +-
4 files changed, 60 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e894aa0/core/src/main/scala/kafka/utils/timer/Timer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala
index b8cde82..bdd0e75 100644
--- a/core/src/main/scala/kafka/utils/timer/Timer.scala
+++ b/core/src/main/scala/kafka/utils/timer/Timer.scala
@@ -51,8 +51,9 @@ class Timer(taskExecutor: ExecutorService, tickMs: Long = 1, wheelSize: Int = 20
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
if (!timingWheel.add(timerTaskEntry)) {
- // already expired
- taskExecutor.submit(timerTaskEntry.timerTask)
+ // Already expired or cancelled
+ if (!timerTaskEntry.cancelled)
+ taskExecutor.submit(timerTaskEntry.timerTask)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e894aa0/core/src/main/scala/kafka/utils/timer/TimerTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/TimerTask.scala b/core/src/main/scala/kafka/utils/timer/TimerTask.scala
index 3407138..d6b3a2e 100644
--- a/core/src/main/scala/kafka/utils/timer/TimerTask.scala
+++ b/core/src/main/scala/kafka/utils/timer/TimerTask.scala
@@ -33,11 +33,15 @@ trait TimerTask extends Runnable {
synchronized {
// if this timerTask is already held by an existing timer task entry,
// we will remove such an entry first.
- if (timerTaskEntry != null && timerTaskEntry != entry) {
+ if (timerTaskEntry != null && timerTaskEntry != entry)
timerTaskEntry.remove()
- }
+
timerTaskEntry = entry
}
}
+ private[timer] def getTimerTaskEntry(): TimerTaskEntry = {
+ timerTaskEntry
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e894aa0/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
index e7a9657..c4aeb5d 100644
--- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
+++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
@@ -52,7 +52,9 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
var entry = root.next
while (entry ne root) {
val nextEntry = entry.next
- f(entry.timerTask)
+
+ if (!entry.cancelled) f(entry.timerTask)
+
entry = nextEntry
}
}
@@ -60,28 +62,43 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
// Add a timer task entry to this list
def add(timerTaskEntry: TimerTaskEntry): Unit = {
- synchronized {
- // put the timer task entry to the end of the list. (root.prev points to the tail entry)
- val tail = root.prev
- timerTaskEntry.next = root
- timerTaskEntry.prev = tail
- timerTaskEntry.list = this
- tail.next = timerTaskEntry
- root.prev = timerTaskEntry
- taskCounter.incrementAndGet()
+ var done = false
+ while (!done) {
+ // Remove the timer task entry if it is already in any other list
+ // We do this outside of the sync block below to avoid deadlocking.
+ // We may retry until timerTaskEntry.list becomes null.
+ timerTaskEntry.remove()
+
+ synchronized {
+ timerTaskEntry.synchronized {
+ if (timerTaskEntry.list == null) {
+ // put the timer task entry to the end of the list. (root.prev points to the tail entry)
+ val tail = root.prev
+ timerTaskEntry.next = root
+ timerTaskEntry.prev = tail
+ timerTaskEntry.list = this
+ tail.next = timerTaskEntry
+ root.prev = timerTaskEntry
+ taskCounter.incrementAndGet()
+ done = true
+ }
+ }
+ }
}
}
// Remove the specified timer task entry from this list
def remove(timerTaskEntry: TimerTaskEntry): Unit = {
synchronized {
- if (timerTaskEntry.list != null) {
- timerTaskEntry.next.prev = timerTaskEntry.prev
- timerTaskEntry.prev.next = timerTaskEntry.next
- timerTaskEntry.next = null
- timerTaskEntry.prev = null
- timerTaskEntry.list = null
- taskCounter.decrementAndGet()
+ timerTaskEntry.synchronized {
+ if (timerTaskEntry.list eq this) {
+ timerTaskEntry.next.prev = timerTaskEntry.prev
+ timerTaskEntry.prev.next = timerTaskEntry.next
+ timerTaskEntry.next = null
+ timerTaskEntry.prev = null
+ timerTaskEntry.list = null
+ taskCounter.decrementAndGet()
+ }
}
}
}
@@ -116,6 +133,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
private[timer] class TimerTaskEntry(val timerTask: TimerTask) {
+ @volatile
var list: TimerTaskList = null
var next: TimerTaskEntry = null
var prev: TimerTaskEntry = null
@@ -124,8 +142,19 @@ private[timer] class TimerTaskEntry(val timerTask: TimerTask) {
// setTimerTaskEntry will remove it.
if (timerTask != null) timerTask.setTimerTaskEntry(this)
+ def cancelled: Boolean = {
+ timerTask.getTimerTaskEntry != this
+ }
+
def remove(): Unit = {
- if (list != null) list.remove(this)
+ var currentList = list
+ // If remove is called when another thread is moving the entry from a task entry list to another,
+ // this may fail to remove the entry due to the change of value of list. Thus, we retry until the list becomes null.
+ // In a rare case, this thread sees null and exits the loop, but the other thread inserts the entry into another list later.
+ while (currentList != null) {
+ currentList.remove(this)
+ currentList = list
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e894aa0/core/src/main/scala/kafka/utils/timer/TimingWheel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala
index e92aba3..f5b6efe 100644
--- a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala
+++ b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala
@@ -125,7 +125,10 @@ private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, ta
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
val expiration = timerTaskEntry.timerTask.expirationMs
- if (expiration < currentTime + tickMs) {
+ if (timerTaskEntry.cancelled) {
+ // Cancelled
+ false
+ } else if (expiration < currentTime + tickMs) {
// Already expired
false
} else if (expiration < currentTime + interval) {