You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/06/01 23:15:23 UTC

kafka git commit: kafka-2226; NullPointerException in TestPurgatoryPerformance; patched by Yasuhiro Matsuda; reviewed by Onur Karaman, Guozhang Wang and Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 3d8494bca -> 9e894aa01


kafka-2226; NullPointerException in TestPurgatoryPerformance; patched by Yasuhiro Matsuda; reviewed by Onur Karaman, Guozhang Wang and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9e894aa0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9e894aa0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9e894aa0

Branch: refs/heads/trunk
Commit: 9e894aa0173b14d64a900bcf780d6b7809368384
Parents: 3d8494b
Author: Yasuhiro Matsuda <ya...@gmail.com>
Authored: Mon Jun 1 14:15:16 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Jun 1 14:15:16 2015 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/utils/timer/Timer.scala    |  5 +-
 .../scala/kafka/utils/timer/TimerTask.scala     |  8 ++-
 .../scala/kafka/utils/timer/TimerTaskList.scala | 65 ++++++++++++++------
 .../scala/kafka/utils/timer/TimingWheel.scala   |  5 +-
 4 files changed, 60 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9e894aa0/core/src/main/scala/kafka/utils/timer/Timer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala
index b8cde82..bdd0e75 100644
--- a/core/src/main/scala/kafka/utils/timer/Timer.scala
+++ b/core/src/main/scala/kafka/utils/timer/Timer.scala
@@ -51,8 +51,9 @@ class Timer(taskExecutor: ExecutorService, tickMs: Long = 1, wheelSize: Int = 20
 
   private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
     if (!timingWheel.add(timerTaskEntry)) {
-      // already expired
-      taskExecutor.submit(timerTaskEntry.timerTask)
+      // Already expired or cancelled
+      if (!timerTaskEntry.cancelled)
+        taskExecutor.submit(timerTaskEntry.timerTask)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e894aa0/core/src/main/scala/kafka/utils/timer/TimerTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/TimerTask.scala b/core/src/main/scala/kafka/utils/timer/TimerTask.scala
index 3407138..d6b3a2e 100644
--- a/core/src/main/scala/kafka/utils/timer/TimerTask.scala
+++ b/core/src/main/scala/kafka/utils/timer/TimerTask.scala
@@ -33,11 +33,15 @@ trait TimerTask extends Runnable {
     synchronized {
       // if this timerTask is already held by an existing timer task entry,
       // we will remove such an entry first.
-      if (timerTaskEntry != null && timerTaskEntry != entry) {
+      if (timerTaskEntry != null && timerTaskEntry != entry)
         timerTaskEntry.remove()
-      }
+
       timerTaskEntry = entry
     }
   }
 
+  private[timer] def getTimerTaskEntry(): TimerTaskEntry = {
+    timerTaskEntry
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e894aa0/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
index e7a9657..c4aeb5d 100644
--- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
+++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
@@ -52,7 +52,9 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
       var entry = root.next
       while (entry ne root) {
         val nextEntry = entry.next
-        f(entry.timerTask)
+
+        if (!entry.cancelled) f(entry.timerTask)
+
         entry = nextEntry
       }
     }
@@ -60,28 +62,43 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
 
   // Add a timer task entry to this list
   def add(timerTaskEntry: TimerTaskEntry): Unit = {
-    synchronized {
-      // put the timer task entry to the end of the list. (root.prev points to the tail entry)
-      val tail = root.prev
-      timerTaskEntry.next = root
-      timerTaskEntry.prev = tail
-      timerTaskEntry.list = this
-      tail.next = timerTaskEntry
-      root.prev = timerTaskEntry
-      taskCounter.incrementAndGet()
+    var done = false
+    while (!done) {
+      // Remove the timer task entry if it is already in any other list
+      // We do this outside of the sync block below to avoid deadlocking.
+      // We may retry until timerTaskEntry.list becomes null.
+      timerTaskEntry.remove()
+
+      synchronized {
+        timerTaskEntry.synchronized {
+          if (timerTaskEntry.list == null) {
+            // put the timer task entry to the end of the list. (root.prev points to the tail entry)
+            val tail = root.prev
+            timerTaskEntry.next = root
+            timerTaskEntry.prev = tail
+            timerTaskEntry.list = this
+            tail.next = timerTaskEntry
+            root.prev = timerTaskEntry
+            taskCounter.incrementAndGet()
+            done = true
+          }
+        }
+      }
     }
   }
 
   // Remove the specified timer task entry from this list
   def remove(timerTaskEntry: TimerTaskEntry): Unit = {
     synchronized {
-      if (timerTaskEntry.list != null) {
-        timerTaskEntry.next.prev = timerTaskEntry.prev
-        timerTaskEntry.prev.next = timerTaskEntry.next
-        timerTaskEntry.next = null
-        timerTaskEntry.prev = null
-        timerTaskEntry.list = null
-        taskCounter.decrementAndGet()
+      timerTaskEntry.synchronized {
+        if (timerTaskEntry.list eq this) {
+          timerTaskEntry.next.prev = timerTaskEntry.prev
+          timerTaskEntry.prev.next = timerTaskEntry.next
+          timerTaskEntry.next = null
+          timerTaskEntry.prev = null
+          timerTaskEntry.list = null
+          taskCounter.decrementAndGet()
+        }
       }
     }
   }
@@ -116,6 +133,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
 
 private[timer] class TimerTaskEntry(val timerTask: TimerTask) {
 
+  @volatile
   var list: TimerTaskList = null
   var next: TimerTaskEntry = null
   var prev: TimerTaskEntry = null
@@ -124,8 +142,19 @@ private[timer] class TimerTaskEntry(val timerTask: TimerTask) {
   // setTimerTaskEntry will remove it.
   if (timerTask != null) timerTask.setTimerTaskEntry(this)
 
+  def cancelled: Boolean = {
+    timerTask.getTimerTaskEntry != this
+  }
+
   def remove(): Unit = {
-    if (list != null) list.remove(this)
+    var currentList = list
+    // If remove is called when another thread is moving the entry from a task entry list to another,
+    // this may fail to remove the entry due to the change of value of list. Thus, we retry until the list becomes null.
+    // In a rare case, this thread sees null and exits the loop, but the other thread insert the entry to another list later.
+    while (currentList != null) {
+      currentList.remove(this)
+      currentList = list
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e894aa0/core/src/main/scala/kafka/utils/timer/TimingWheel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala
index e92aba3..f5b6efe 100644
--- a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala
+++ b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala
@@ -125,7 +125,10 @@ private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, ta
   def add(timerTaskEntry: TimerTaskEntry): Boolean = {
     val expiration = timerTaskEntry.timerTask.expirationMs
 
-    if (expiration < currentTime + tickMs) {
+    if (timerTaskEntry.cancelled) {
+      // Cancelled
+      false
+    } else if (expiration < currentTime + tickMs) {
       // Already expired
       false
     } else if (expiration < currentTime + interval) {