You are viewing a plain text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text version.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/09/05 07:23:59 UTC

git commit: kafka-1616; Purgatory Size and Num.Delayed.Request metrics are incorrect; patched by Guozhang Wang; reviewed by Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk ae2414127 -> ffb81a581


kafka-1616; Purgatory Size and Num.Delayed.Request metrics are incorrect; patched by Guozhang Wang; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ffb81a58
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ffb81a58
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ffb81a58

Branch: refs/heads/trunk
Commit: ffb81a581b6cbd3bd7338ec88022aa745a05b1c9
Parents: ae24141
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Thu Sep 4 22:23:25 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Sep 4 22:23:25 2014 -0700

----------------------------------------------------------------------
 .../scala/kafka/server/RequestPurgatory.scala   | 64 +++++++++++---------
 .../kafka/server/RequestPurgatoryTest.scala     | 33 +++++++++-
 2 files changed, 67 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ffb81a58/core/src/main/scala/kafka/server/RequestPurgatory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index ce06d2c..cf3ed4c 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -71,9 +71,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
   /* a list of requests watching each key */
   private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))
 
-  /* the number of requests being watched, duplicates added on different watchers are also counted */
-  private val watched = new AtomicInteger(0)
-
   /* background thread expiring requests that have been waiting too long */
   private val expiredRequestReaper = new ExpiredRequestReaper
   private val expirationThread = Utils.newThread(name="request-expiration-task", runnable=expiredRequestReaper, daemon=false)
@@ -81,14 +78,14 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
   newGauge(
     "PurgatorySize",
     new Gauge[Int] {
-      def value = watched.get() + expiredRequestReaper.numRequests
+      def value = watched()
     }
   )
 
   newGauge(
     "NumDelayedRequests",
     new Gauge[Int] {
-      def value = expiredRequestReaper.unsatisfied.get()
+      def value = delayed()
     }
   )
 
@@ -130,6 +127,21 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
       w.collectSatisfiedRequests()
   }
 
+  /*
+   * Return the total number of elements across all watch lists in the purgatory. A request
+   * watched on several keys is counted once per key, and since a completed request may remain
+   * in the watch lists until purged, this number may exceed the number of real operations watched
+   */
+  def watched() = watchersForKey.values.map(_.watched).sum
+
+  /*
+   * Return the number of requests in the expiry reaper's queue, including satisfied ones not yet purged
+   */
+  def delayed() = expiredRequestReaper.delayed()
+
+  /*
+   * Return the watch list for the given watch key
+   */
   private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key)
   
   /**
@@ -156,6 +168,9 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
   private class Watchers {
     private val requests = new util.ArrayList[T]
 
+    // return the size of the watch list
+    def watched() = requests.size()
+
     // potentially add the element to watch if it is not satisfied yet
     def checkAndMaybeAdd(t: T): Boolean = {
       synchronized {
@@ -168,7 +183,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
           return false
         }
         requests.add(t)
-        watched.getAndIncrement()
         return true
       }
     }
@@ -182,7 +196,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
           val curr = iter.next
           if(curr.satisfied.get()) {
             iter.remove()
-            watched.getAndDecrement()
             purged += 1
           }
         }
@@ -206,11 +219,9 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
             val satisfied = curr synchronized checkSatisfied(curr)
             if(satisfied) {
               iter.remove()
-              watched.getAndDecrement()
               val updated = curr.satisfied.compareAndSet(false, true)
               if(updated == true) {
                 response += curr
-                expiredRequestReaper.satisfyRequest()
               }
             }
           }
@@ -225,16 +236,13 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
    */
   private class ExpiredRequestReaper extends Runnable with Logging {
     this.logIdent = "ExpiredRequestReaper-%d ".format(brokerId)
-
-    private val delayed = new DelayQueue[T]
     private val running = new AtomicBoolean(true)
     private val shutdownLatch = new CountDownLatch(1)
 
-    /* The count of elements in the delay queue that are unsatisfied */
-    private [kafka] val unsatisfied = new AtomicInteger(0)
-
-    def numRequests = delayed.size()
+    private val delayedQueue = new DelayQueue[T]
 
+    def delayed() = delayedQueue.size()
+    
     /** Main loop for the expiry thread */
     def run() {
       while(running.get) {
@@ -245,12 +253,17 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
               expire(curr)
             }
           }
-          if (watched.get + numRequests >= purgeInterval) { // see if we need to force a full purge
-            debug("Beginning purgatory purge")
-            val purged = purgeSatisfied()
-            debug("Purged %d requests from delay queue.".format(purged))
+          // see if we need to purge the watch lists
+          if (RequestPurgatory.this.watched() >= purgeInterval) {
+            debug("Begin purging watch lists")
             val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum
-            debug("Purged %d requests from watch lists.".format(numPurgedFromWatchers))
+            debug("Purged %d elements from watch lists.".format(numPurgedFromWatchers))
+          }
+          // see if we need to purge the delayed request queue
+          if (delayed() >= purgeInterval) {
+            debug("Begin purging delayed queue")
+            val purged = purgeSatisfied()
+            debug("Purged %d requests from delayed queue.".format(purged))
           }
         } catch {
           case e: Exception =>
@@ -262,8 +275,7 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
 
     /** Add a request to be expired */
     def enqueue(t: T) {
-      delayed.add(t)
-      unsatisfied.incrementAndGet()
+      delayedQueue.add(t)
     }
 
     /** Shutdown the expiry thread*/
@@ -274,20 +286,16 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
       debug("Shut down complete.")
     }
 
-    /** Record the fact that we satisfied a request in the stats for the expiry queue */
-    def satisfyRequest(): Unit = unsatisfied.getAndDecrement()
-
     /**
      * Get the next expired event
      */
     private def pollExpired(): T = {
       while(true) {
-        val curr = delayed.poll(200L, TimeUnit.MILLISECONDS)
+        val curr = delayedQueue.poll(200L, TimeUnit.MILLISECONDS)
         if (curr == null)
           return null.asInstanceOf[T]
         val updated = curr.satisfied.compareAndSet(false, true)
         if(updated) {
-          unsatisfied.getAndDecrement()
           return curr
         }
       }
@@ -301,7 +309,7 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
       var purged = 0
 
       // purge the delayed queue
-      val iter = delayed.iterator()
+      val iter = delayedQueue.iterator()
       while(iter.hasNext) {
         val curr = iter.next()
         if(curr.satisfied.get) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ffb81a58/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
index 168712d..a577f4a 100644
--- a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
@@ -34,7 +34,7 @@ class RequestPurgatoryTest extends JUnit3Suite {
   
   override def setUp() {
     super.setUp()
-    purgatory = new MockRequestPurgatory()
+    purgatory = new MockRequestPurgatory(5)
   }
   
   override def tearDown() {
@@ -73,8 +73,37 @@ class RequestPurgatoryTest extends JUnit3Suite {
     assertTrue("r2 hasn't expired", !purgatory.expired.contains(r2))
     assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration)
   }
+
+  @Test
+  def testRequestPurge() {
+    val r1 = new DelayedRequest(Array("test1"), null, 100000L)
+    val r12 = new DelayedRequest(Array("test1", "test2"), null, 100000L)
+    val r23 = new DelayedRequest(Array("test2", "test3"), null, 100000L)
+    purgatory.checkAndMaybeWatch(r1)
+    purgatory.checkAndMaybeWatch(r12)
+    purgatory.checkAndMaybeWatch(r23)
+
+    assertEquals("Purgatory should have 5 watched elements", 5, purgatory.watched())
+    assertEquals("Purgatory should have 3 total delayed requests", 3, purgatory.delayed())
+
+    // satisfy one of the requests, it should then be purged from the watch list with purge interval 5
+    r12.satisfied.set(true)
+    TestUtils.waitUntilTrue(() => purgatory.watched() == 3,
+      "Purgatory should have 3 watched elements instead of " +  + purgatory.watched(), 1000L)
+    TestUtils.waitUntilTrue(() => purgatory.delayed() == 3,
+      "Purgatory should still have 3 total delayed requests instead of " + purgatory.delayed(), 1000L)
+
+    // add two more requests, then the satisfied request should be purged from the delayed queue with purge interval 5
+    purgatory.checkAndMaybeWatch(r1)
+    purgatory.checkAndMaybeWatch(r1)
+
+    TestUtils.waitUntilTrue(() => purgatory.watched() == 5,
+      "Purgatory should have 5 watched elements instead of " + purgatory.watched(), 1000L)
+    TestUtils.waitUntilTrue(() => purgatory.delayed() == 4,
+      "Purgatory should have 4 total delayed requests instead of " + purgatory.delayed(), 1000L)
+  }
   
-  class MockRequestPurgatory extends RequestPurgatory[DelayedRequest] {
+  class MockRequestPurgatory(purge: Int) extends RequestPurgatory[DelayedRequest](purgeInterval = purge) {
     val satisfied = mutable.Set[DelayedRequest]()
     val expired = mutable.Set[DelayedRequest]()
     def awaitExpiration(delayed: DelayedRequest) = {