You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/05 19:12:06 UTC

kafka git commit: KAFKA-3994: Fix deadlock in Watchers by calling tryComplete without any locks

Repository: kafka
Updated Branches:
  refs/heads/trunk 2d19ad4bb -> a55e29631


KAFKA-3994: Fix deadlock in Watchers by calling tryComplete without any locks

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma, Jun Rao, Jiangjie Qin, Guozhang Wang

Closes #2195 from hachikuji/KAFKA-3994-linked-queue


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a55e2963
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a55e2963
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a55e2963

Branch: refs/heads/trunk
Commit: a55e2963116eb11252b957304b4f399eff3df9c2
Parents: 2d19ad4
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Dec 5 11:12:03 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Dec 5 11:12:03 2016 -0800

----------------------------------------------------------------------
 .../kafka/coordinator/DelayedHeartbeat.scala    |  5 ++
 .../scala/kafka/coordinator/DelayedJoin.scala   |  8 +-
 .../scala/kafka/server/DelayedOperation.scala   | 90 +++++++++++---------
 .../other/kafka/TestPurgatoryPerformance.scala  |  1 -
 .../kafka/server/DelayedOperationTest.scala     |  4 +-
 5 files changed, 61 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a55e2963/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
index 8e250c3..b05186c 100644
--- a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
+++ b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
@@ -29,6 +29,11 @@ private[coordinator] class DelayedHeartbeat(coordinator: GroupCoordinator,
                                             heartbeatDeadline: Long,
                                             sessionTimeout: Long)
   extends DelayedOperation(sessionTimeout) {
+
+  // overridden since tryComplete already synchronizes on the group. This makes it safe to
+  // call purgatory operations while holding the group lock.
+  override def safeTryComplete(): Boolean = tryComplete()
+
   override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete)
   override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline)
   override def onComplete() = coordinator.onCompleteHeartbeat()

http://git-wip-us.apache.org/repos/asf/kafka/blob/a55e2963/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
index a62884a..8744f16 100644
--- a/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
+++ b/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
@@ -31,8 +31,12 @@ import kafka.server.DelayedOperation
  */
 private[coordinator] class DelayedJoin(coordinator: GroupCoordinator,
                                        group: GroupMetadata,
-                                       sessionTimeout: Long)
-  extends DelayedOperation(sessionTimeout) {
+                                       rebalanceTimeout: Long)
+  extends DelayedOperation(rebalanceTimeout) {
+
+  // overridden since tryComplete already synchronizes on the group. This makes it safe to
+  // call purgatory operations while holding the group lock.
+  override def safeTryComplete(): Boolean = tryComplete()
 
   override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete)
   override def onExpiration() = coordinator.onExpireJoin()

http://git-wip-us.apache.org/repos/asf/kafka/blob/a55e2963/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index dbee092..074b66e 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -17,20 +17,17 @@
 
 package kafka.server
 
-import kafka.utils._
-import kafka.utils.timer._
-import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
-import kafka.metrics.KafkaMetricsGroup
-
-import java.util.LinkedList
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
-import scala.collection._
-
 import com.yammer.metrics.core.Gauge
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
+import kafka.utils._
+import kafka.utils.timer._
 
+import scala.collection._
 
 /**
  * An operation whose processing needs to be delayed for at most the given delayMs. For example
@@ -75,7 +72,7 @@ abstract class DelayedOperation(override val delayMs: Long) extends TimerTask wi
   /**
    * Check if the delayed operation is already completed
    */
-  def isCompleted(): Boolean = completed.get()
+  def isCompleted: Boolean = completed.get()
 
   /**
    * Call-back to execute when a delayed operation gets expired and hence forced to complete.
@@ -88,7 +85,7 @@ abstract class DelayedOperation(override val delayMs: Long) extends TimerTask wi
    */
   def onComplete(): Unit
 
-  /*
+  /**
    * Try to complete the delayed operation by first checking if the operation
    * can be completed by now. If yes execute the completion logic by calling
    * forceComplete() and return true iff forceComplete returns true; otherwise return false
@@ -97,6 +94,16 @@ abstract class DelayedOperation(override val delayMs: Long) extends TimerTask wi
    */
   def tryComplete(): Boolean
 
+  /**
+   * Thread-safe variant of tryComplete(). This can be overridden if the operation provides its
+   * own synchronization.
+   */
+  def safeTryComplete(): Boolean = {
+    synchronized {
+      tryComplete()
+    }
+  }
+
   /*
    * run() method defines a task that is executed on timeout
    */
@@ -185,14 +192,14 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
     // operation is unnecessarily added for watch. However, this is a less severe issue since the
     // expire reaper will clean it up periodically.
 
-    var isCompletedByMe = operation synchronized operation.tryComplete()
+    var isCompletedByMe = operation.safeTryComplete()
     if (isCompletedByMe)
       return true
 
     var watchCreated = false
     for(key <- watchKeys) {
       // If the operation is already completed, stop adding it to the rest of the watcher list.
-      if (operation.isCompleted())
+      if (operation.isCompleted)
         return false
       watchForOperation(key, operation)
 
@@ -202,14 +209,14 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
       }
     }
 
-    isCompletedByMe = operation synchronized operation.tryComplete()
+    isCompletedByMe = operation.safeTryComplete()
     if (isCompletedByMe)
       return true
 
     // if it cannot be completed by now and hence is watched, add to the expire queue also
-    if (! operation.isCompleted()) {
+    if (!operation.isCompleted) {
       timeoutTimer.add(operation)
-      if (operation.isCompleted()) {
+      if (operation.isCompleted) {
         // cancel the timer task
         operation.cancel()
       }
@@ -237,7 +244,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
    * on multiple lists, and some of its watched entries may still be in the watch lists
    * even when it has been completed, this number may be larger than the number of real operations watched
    */
-  def watched() = allWatchers.map(_.watched).sum
+  def watched() = allWatchers.map(_.countWatched).sum
 
   /**
    * Return the number of delayed operations in the expiry queue
@@ -270,7 +277,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
       if (watchersForKey.get(key) != watchers)
         return
 
-      if (watchers != null && watchers.watched == 0) {
+      if (watchers != null && watchers.isEmpty) {
         watchersForKey.remove(key)
       }
     }
@@ -289,35 +296,35 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
    * A linked list of watched delayed operations based on some key
    */
   private class Watchers(val key: Any) {
+    private[this] val operations = new ConcurrentLinkedQueue[T]()
 
-    private[this] val operations = new LinkedList[T]()
+    // count the current number of watched operations. This is O(n), so use isEmpty() if possible
+    def countWatched: Int = operations.size
 
-    def watched: Int = operations synchronized operations.size
+    def isEmpty: Boolean = operations.isEmpty
 
     // add the element to watch
     def watch(t: T) {
-      operations synchronized operations.add(t)
+      operations.add(t)
     }
 
     // traverse the list and try to complete some watched elements
     def tryCompleteWatched(): Int = {
-
       var completed = 0
-      operations synchronized {
-        val iter = operations.iterator()
-        while (iter.hasNext) {
-          val curr = iter.next()
-          if (curr.isCompleted) {
-            // another thread has completed this operation, just remove it
-            iter.remove()
-          } else if (curr synchronized curr.tryComplete()) {
-            completed += 1
-            iter.remove()
-          }
+
+      val iter = operations.iterator()
+      while (iter.hasNext) {
+        val curr = iter.next()
+        if (curr.isCompleted) {
+          // another thread has completed this operation, just remove it
+          iter.remove()
+        } else if (curr.safeTryComplete()) {
+          iter.remove()
+          completed += 1
         }
       }
 
-      if (operations.size == 0)
+      if (operations.isEmpty)
         removeKeyIfEmpty(key, this)
 
       completed
@@ -326,18 +333,17 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
     // traverse the list and purge elements that are already completed by others
     def purgeCompleted(): Int = {
       var purged = 0
-      operations synchronized {
-        val iter = operations.iterator()
-        while (iter.hasNext) {
-          val curr = iter.next()
-          if (curr.isCompleted) {
-            iter.remove()
-            purged += 1
-          }
+
+      val iter = operations.iterator()
+      while (iter.hasNext) {
+        val curr = iter.next()
+        if (curr.isCompleted) {
+          iter.remove()
+          purged += 1
         }
       }
 
-      if (operations.size == 0)
+      if (operations.isEmpty)
         removeKeyIfEmpty(key, this)
 
       purged

http://git-wip-us.apache.org/repos/asf/kafka/blob/a55e2963/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
index 7636d96..1e41c31 100644
--- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
@@ -238,7 +238,6 @@ object TestPurgatoryPerformance {
   }
 
   private class FakeOperation(delayMs: Long, size: Int, val latencyMs: Long, latch: CountDownLatch) extends DelayedOperation(delayMs) {
-    private[this] val data = new Array[Byte](size)
     val completesAt = System.currentTimeMillis + latencyMs
 
     def onExpiration(): Unit = {}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a55e2963/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
index 49ef9f6..f5f36f5 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -62,8 +62,8 @@ class DelayedOperationTest {
     assertFalse("r2 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r2, Array("test2")))
     r1.awaitExpiration()
     val elapsed = Time.SYSTEM.hiResClockMs - start
-    assertTrue("r1 completed due to expiration", r1.isCompleted())
-    assertFalse("r2 hasn't completed", r2.isCompleted())
+    assertTrue("r1 completed due to expiration", r1.isCompleted)
+    assertFalse("r2 hasn't completed", r2.isCompleted)
     assertTrue(s"Time for expiration $elapsed should at least $expiration", elapsed >= expiration)
   }