Posted to commits@kafka.apache.org by jk...@apache.org on 2012/12/18 18:44:11 UTC

[3/30] git commit: KAFKA-664 RequestPurgatory should clean up satisfied requests from the watchers map. Also, simplify the purge logic: purge based on a configurable incoming-request interval.

KAFKA-664 RequestPurgatory should clean up satisfied requests from the watchers map. Also, simplify the purge logic: purge based on a configurable incoming-request interval.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/04b52743
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/04b52743
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/04b52743

Branch: refs/heads/trunk
Commit: 04b52743f8eaa9057b0b5687fb3681e1bd1e2d49
Parents: 4e5a6fc
Author: Joel Koshy <jj...@gmail.com>
Authored: Mon Dec 17 16:35:32 2012 -0800
Committer: Joel Koshy <jj...@gmail.com>
Committed: Mon Dec 17 16:35:32 2012 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/KafkaApis.scala   |   12 ++-
 core/src/main/scala/kafka/server/KafkaConfig.scala |    8 ++-
 .../main/scala/kafka/server/RequestPurgatory.scala |   76 ++++++++-------
 .../unit/kafka/integration/TopicMetadataTest.scala |    2 +-
 .../scala/unit/kafka/server/SimpleFetchTest.scala  |    4 +-
 5 files changed, 58 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/04b52743/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ac90b20..5a85b04 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -40,8 +40,10 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val zkClient: ZkClient,
                 brokerId: Int) extends Logging {
 
-  private val producerRequestPurgatory = new ProducerRequestPurgatory
-  private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel)
+  private val producerRequestPurgatory =
+    new ProducerRequestPurgatory(replicaManager.config.producerRequestPurgatoryPurgeInterval)
+  private val fetchRequestPurgatory =
+    new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchRequestPurgatoryPurgeInterval)
   private val delayedRequestMetrics = new DelayedRequestMetrics
 
   private val requestLogger = Logger.getLogger("kafka.request.logger")
@@ -496,7 +498,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * A holding pen for fetch requests waiting to be satisfied
    */
-  class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, MessageSet](brokerId) {
+  class FetchRequestPurgatory(requestChannel: RequestChannel, purgeInterval: Int)
+          extends RequestPurgatory[DelayedFetch, MessageSet](brokerId, purgeInterval) {
     this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId)
 
     /**
@@ -633,7 +636,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * A holding pen for produce requests waiting to be satisfied.
    */
-  private [kafka] class ProducerRequestPurgatory extends RequestPurgatory[DelayedProduce, RequestKey] {
+  private [kafka] class ProducerRequestPurgatory(purgeInterval: Int)
+          extends RequestPurgatory[DelayedProduce, RequestKey](brokerId, purgeInterval) {
     this.logIdent = "[ProducerRequestPurgatory-%d] ".format(brokerId)
 
     protected def checkSatisfied(followerFetchRequestKey: RequestKey,
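
Both hunks above follow the same pattern: the subclass forwards brokerId and the configured purge interval to the RequestPurgatory superclass constructor. A minimal standalone sketch of that delegation, with toy class names standing in for the real Kafka types:

  // Toy illustration only, not the actual Kafka classes. The defaults
  // mirror the RequestPurgatory signature later in this commit and apply
  // when a caller omits the arguments.
  abstract class BasePurgatory(brokerId: Int = 0, purgeInterval: Int = 10000) {
    protected def describe =
      "broker %d, purge every %d requests".format(brokerId, purgeInterval)
  }

  class FetchLikePurgatory(brokerId: Int, purgeInterval: Int)
          extends BasePurgatory(brokerId, purgeInterval)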

http://git-wip-us.apache.org/repos/asf/kafka/blob/04b52743/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3488908..5754676 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -161,5 +161,11 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   
   /* the frequency with which the highwater mark is saved out to disk */
   val highWaterMarkCheckpointIntervalMs = props.getLong("replica.highwatermark.checkpoint.ms", 5000L)
-  
+
+  /* the purge interval (in number of requests) of the fetch request purgatory */
+  val fetchRequestPurgatoryPurgeInterval = props.getInt("fetch.purgatory.purge.interval", 10000)
+
+  /* the purge interval (in number of requests) of the producer request purgatory */
+  val producerRequestPurgatoryPurgeInterval = props.getInt("producer.purgatory.purge.interval", 10000)
+
  }
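
A hedged usage sketch for the two new settings: the property names and the 10000-request defaults are taken from the hunk above, while the commented-out construction step is an assumption about how a broker config is built from Properties on this branch.

  import java.util.Properties

  // Hypothetical operator override of the new purge intervals; the keys are
  // the ones defined above, the values here are arbitrary examples.
  val props = new Properties()
  props.put("fetch.purgatory.purge.interval", "50000")
  props.put("producer.purgatory.purge.interval", "50000")
  // val config = new KafkaConfig(props)  // assumed construction path

Raising the interval trades memory for fewer sweeps, since satisfied requests linger longer between purges; lowering it does the opposite.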

http://git-wip-us.apache.org/repos/asf/kafka/blob/04b52743/core/src/main/scala/kafka/server/RequestPurgatory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index 1a3dbd3..8fb9865 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -18,13 +18,13 @@
 package kafka.server
 
 import scala.collection._
-import java.util.LinkedList
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import kafka.network._
 import kafka.utils._
-import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
+import java.util
+import com.yammer.metrics.core.Gauge
 
 
 /**
@@ -61,11 +61,21 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de
  * this function handles delayed requests that have hit their time limit without being satisfied.
  *
  */
-abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) extends Logging with KafkaMetricsGroup {
+abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purgeInterval: Int = 10000)
+        extends Logging with KafkaMetricsGroup {
 
   /* a list of requests watching each key */
   private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))
 
+  private val requestCounter = new AtomicInteger(0)
+
+  newGauge(
+    "PurgatorySize",
+    new Gauge[Int] {
+      def getValue = watchersForKey.values.map(_.numRequests).sum + expiredRequestReaper.numRequests
+    }
+  )
+
   newGauge(
     "NumDelayedRequests",
     new Gauge[Int] {
@@ -78,10 +88,19 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
   private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper)
   expirationThread.start()
 
+  def purgeSatisfied() {
+    expiredRequestReaper.forcePurge()
+  }
+
   /**
    * Add a new delayed request watching the contained keys
    */
   def watch(delayedRequest: T) {
+    if (requestCounter.getAndIncrement() >= purgeInterval) {
+      requestCounter.set(0)
+      purgeSatisfied()
+    }
+
     for(key <- delayedRequest.keys) {
       var lst = watchersFor(key)
       lst.add(delayedRequest)
@@ -125,37 +144,29 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
    */
   private class Watchers {
 
-    /* a few magic parameters to help do cleanup to avoid accumulating old watchers */
-    private val CleanupThresholdSize = 100
-    private val CleanupThresholdPrct = 0.5
 
-    private val requests = new LinkedList[T]
+    private val requests = new util.ArrayList[T]
 
-    /* you can only change this if you have added something or marked something satisfied */
-    var liveCount = 0.0
+    def numRequests = requests.size
 
     def add(t: T) {
       synchronized {
         requests.add(t)
-        liveCount += 1
-        maybePurge()
       }
     }
 
-    private def maybePurge() {
-      if(requests.size > CleanupThresholdSize && liveCount / requests.size < CleanupThresholdPrct) {
+    def purgeSatisfied(): Int = {
+      synchronized {
         val iter = requests.iterator()
+        var purged = 0
         while(iter.hasNext) {
           val curr = iter.next
-          if(curr.satisfied.get())
+          if(curr.satisfied.get()) {
             iter.remove()
+            purged += 1
+          }
         }
-      }
-    }
-
-    def decLiveCount() {
-      synchronized {
-        liveCount -= 1
+        purged
       }
     }
 
@@ -177,7 +188,6 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
               val updated = curr.satisfied.compareAndSet(false, true)
               if(updated == true) {
                 response += curr
-                liveCount -= 1
                 expiredRequestReaper.satisfyRequest()
               }
             }
@@ -193,17 +203,16 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
    */
   private class ExpiredRequestReaper extends Runnable with Logging {
     this.logIdent = "ExpiredRequestReaper-%d ".format(brokerId)
-    /* a few magic parameters to help do cleanup to avoid accumulating old watchers */
-    private val CleanupThresholdSize = 100
-    private val CleanupThresholdPrct = 0.5
 
     private val delayed = new DelayQueue[T]
     private val running = new AtomicBoolean(true)
     private val shutdownLatch = new CountDownLatch(1)
-    private val needsPurge = new AtomicBoolean(false)
+
     /* The count of elements in the delay queue that are unsatisfied */
     private [kafka] val unsatisfied = new AtomicInteger(0)
 
+    def numRequests = delayed.size()
+
     /** Main loop for the expiry thread */
     def run() {
       while(running.get) {
@@ -214,10 +223,10 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
           }
         } catch {
           case ie: InterruptedException =>
-            if(needsPurge.getAndSet(false)) {
-              val purged = purgeSatisfied()
-              debug("Forced purge of " + purged + " requests from delay queue.")
-            }
+            val purged = purgeSatisfied()
+            debug("Purged %d requests from delay queue.".format(purged))
+            val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum
+            debug("Purged %d (watcher) requests.".format(numPurgedFromWatchers))
           case e: Exception =>
             error("Error in long poll expiry thread: ", e)
         }
@@ -229,12 +238,9 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
     def enqueue(t: T) {
       delayed.add(t)
       unsatisfied.incrementAndGet()
-      if(unsatisfied.get > CleanupThresholdSize && unsatisfied.get / delayed.size.toDouble < CleanupThresholdPrct)
-        forcePurge()
     }
 
-    private def forcePurge() {
-      needsPurge.set(true)
+    def forcePurge() {
       expirationThread.interrupt()
     }
 
@@ -259,8 +265,6 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
         val updated = curr.satisfied.compareAndSet(false, true)
         if(updated) {
           unsatisfied.getAndDecrement()
-          for(key <- curr.keys)
-            watchersFor(key).decLiveCount()
           return curr
         }
       }
@@ -284,4 +288,4 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
     }
   }
 
-}
\ No newline at end of file
+}
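
Taken together, the changes above replace the old CleanupThresholdSize/CleanupThresholdPrct heuristics with a single counter: every purgeInterval-th call to watch() triggers a purge of satisfied requests from both the delay queue and the watcher lists. In the real patch the sweep runs on the expiration thread via forcePurge() and an interrupt; the self-contained sketch below inlines it for brevity, with toy types in place of the DelayedRequest machinery:

  import java.util
  import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

  // Toy request: satisfied flips to true once the request completes.
  class ToyRequest { val satisfied = new AtomicBoolean(false) }

  class ToyWatchers {
    private val requests = new util.ArrayList[ToyRequest]

    def add(r: ToyRequest) {
      synchronized { requests.add(r) }
    }

    // Mirrors Watchers.purgeSatisfied(): sweep the list, removing and
    // counting entries whose satisfied flag is set.
    def purgeSatisfied(): Int = synchronized {
      val iter = requests.iterator()
      var purged = 0
      while (iter.hasNext) {
        if (iter.next().satisfied.get()) {
          iter.remove()
          purged += 1
        }
      }
      purged
    }
  }

  class ToyPurgatory(purgeInterval: Int) {
    private val requestCounter = new AtomicInteger(0)
    private val watchers = new ToyWatchers

    // Mirrors watch(): every purgeInterval-th incoming request resets the
    // counter and purges, instead of purging on size/live-ratio thresholds.
    def watch(r: ToyRequest) {
      if (requestCounter.getAndIncrement() >= purgeInterval) {
        requestCounter.set(0)
        println("Purged %d satisfied requests.".format(watchers.purgeSatisfied()))
      }
      watchers.add(r)
    }
  }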

http://git-wip-us.apache.org/repos/asf/kafka/blob/04b52743/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 6d88a4f..54a5a06 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -97,7 +97,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   private def mockLogManagerAndTestTopic(topic: String): Seq[TopicMetadata] = {
     // topic metadata request only requires 1 call from the replica manager
     val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
-    EasyMock.expect(replicaManager.config).andReturn(configs.head).times(2)
+    EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes()
     EasyMock.replay(replicaManager)
 
     // create a topic metadata request
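
The switch from times(2) to anyTimes() here (and twice more in SimpleFetchTest below) is needed because the purgatories now read replicaManager.config during KafkaApis construction, which changes the exact call count; anyTimes() keeps the tests from pinning it down. The minimal mock setup, as used in these tests:

  val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
  // anyTimes() permits any number of config reads, so the test no longer
  // depends on how often the purgatories consult the broker config.
  EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes()
  EasyMock.replay(replicaManager)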

http://git-wip-us.apache.org/repos/asf/kafka/blob/04b52743/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 3aae5ce..0377e08 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -81,7 +81,7 @@ class SimpleFetchTest extends JUnit3Suite {
     partition.getReplica(configs(1).brokerId).get.logEndOffset = leo - 5L
 
     EasyMock.reset(replicaManager)
-    EasyMock.expect(replicaManager.config).andReturn(configs.head)
+    EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes()
     EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
     EasyMock.replay(replicaManager)
 
@@ -175,7 +175,7 @@ class SimpleFetchTest extends JUnit3Suite {
     partition.getReplica(followerReplicaId).get.logEndOffset = followerLEO.asInstanceOf[Long]
 
     EasyMock.reset(replicaManager)
-    EasyMock.expect(replicaManager.config).andReturn(configs.head)
+    EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes()
     EasyMock.expect(replicaManager.recordFollowerPosition(topic, partitionId, followerReplicaId, followerLEO))
     EasyMock.expect(replicaManager.getReplica(topic, partitionId, followerReplicaId)).andReturn(partition.inSyncReplicas.find(_.brokerId == configs(1).brokerId))
     EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()