You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2012/12/18 18:44:11 UTC
[3/30] git commit: KAFKA-664 RequestPurgatory should clean up
satisfied requests from watchers map. Also,
simplify the purge logic - purge based on an incoming request interval.
KAFKA-664 RequestPurgatory should clean up satisfied requests from watchers map. Also, simplify the purge logic - purge based on an incoming request interval.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/04b52743
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/04b52743
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/04b52743
Branch: refs/heads/trunk
Commit: 04b52743f8eaa9057b0b5687fb3681e1bd1e2d49
Parents: 4e5a6fc
Author: Joel Koshy <jj...@gmail.com>
Authored: Mon Dec 17 16:35:32 2012 -0800
Committer: Joel Koshy <jj...@gmail.com>
Committed: Mon Dec 17 16:35:32 2012 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/server/KafkaApis.scala | 12 ++-
core/src/main/scala/kafka/server/KafkaConfig.scala | 8 ++-
.../main/scala/kafka/server/RequestPurgatory.scala | 76 ++++++++-------
.../unit/kafka/integration/TopicMetadataTest.scala | 2 +-
.../scala/unit/kafka/server/SimpleFetchTest.scala | 4 +-
5 files changed, 58 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/04b52743/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ac90b20..5a85b04 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -40,8 +40,10 @@ class KafkaApis(val requestChannel: RequestChannel,
val zkClient: ZkClient,
brokerId: Int) extends Logging {
- private val producerRequestPurgatory = new ProducerRequestPurgatory
- private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel)
+ private val producerRequestPurgatory =
+ new ProducerRequestPurgatory(replicaManager.config.producerRequestPurgatoryPurgeInterval)
+ private val fetchRequestPurgatory =
+ new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchRequestPurgatoryPurgeInterval)
private val delayedRequestMetrics = new DelayedRequestMetrics
private val requestLogger = Logger.getLogger("kafka.request.logger")
@@ -496,7 +498,8 @@ class KafkaApis(val requestChannel: RequestChannel,
/**
* A holding pen for fetch requests waiting to be satisfied
*/
- class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, MessageSet](brokerId) {
+ class FetchRequestPurgatory(requestChannel: RequestChannel, purgeInterval: Int)
+ extends RequestPurgatory[DelayedFetch, MessageSet](brokerId, purgeInterval) {
this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId)
/**
@@ -633,7 +636,8 @@ class KafkaApis(val requestChannel: RequestChannel,
/**
* A holding pen for produce requests waiting to be satisfied.
*/
- private [kafka] class ProducerRequestPurgatory extends RequestPurgatory[DelayedProduce, RequestKey] {
+ private [kafka] class ProducerRequestPurgatory(purgeInterval: Int)
+ extends RequestPurgatory[DelayedProduce, RequestKey](brokerId, purgeInterval) {
this.logIdent = "[ProducerRequestPurgatory-%d] ".format(brokerId)
protected def checkSatisfied(followerFetchRequestKey: RequestKey,
http://git-wip-us.apache.org/repos/asf/kafka/blob/04b52743/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3488908..5754676 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -161,5 +161,11 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* the frequency with which the highwater mark is saved out to disk */
val highWaterMarkCheckpointIntervalMs = props.getLong("replica.highwatermark.checkpoint.ms", 5000L)
-
+
+ /* the purge interval (in number of requests) of the fetch request purgatory */
+ val fetchRequestPurgatoryPurgeInterval = props.getInt("fetch.purgatory.purge.interval", 10000)
+
+ /* the purge interval (in number of requests) of the producer request purgatory */
+ val producerRequestPurgatoryPurgeInterval = props.getInt("producer.purgatory.purge.interval", 10000)
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/04b52743/core/src/main/scala/kafka/server/RequestPurgatory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index 1a3dbd3..8fb9865 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -18,13 +18,13 @@
package kafka.server
import scala.collection._
-import java.util.LinkedList
import java.util.concurrent._
import java.util.concurrent.atomic._
import kafka.network._
import kafka.utils._
-import com.yammer.metrics.core.Gauge
import kafka.metrics.KafkaMetricsGroup
+import java.util
+import com.yammer.metrics.core.Gauge
/**
@@ -61,11 +61,21 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de
* this function handles delayed requests that have hit their time limit without being satisfied.
*
*/
-abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) extends Logging with KafkaMetricsGroup {
+abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purgeInterval: Int = 10000)
+ extends Logging with KafkaMetricsGroup {
/* a list of requests watching each key */
private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))
+ private val requestCounter = new AtomicInteger(0)
+
+ newGauge(
+ "PurgatorySize",
+ new Gauge[Int] {
+ def getValue = watchersForKey.values.map(_.numRequests).sum + expiredRequestReaper.numRequests
+ }
+ )
+
newGauge(
"NumDelayedRequests",
new Gauge[Int] {
@@ -78,10 +88,19 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper)
expirationThread.start()
+ def purgeSatisfied() {
+ expiredRequestReaper.forcePurge()
+ }
+
/**
* Add a new delayed request watching the contained keys
*/
def watch(delayedRequest: T) {
+ if (requestCounter.getAndIncrement() >= purgeInterval) {
+ requestCounter.set(0)
+ purgeSatisfied()
+ }
+
for(key <- delayedRequest.keys) {
var lst = watchersFor(key)
lst.add(delayedRequest)
@@ -125,37 +144,29 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
*/
private class Watchers {
- /* a few magic parameters to help do cleanup to avoid accumulating old watchers */
- private val CleanupThresholdSize = 100
- private val CleanupThresholdPrct = 0.5
- private val requests = new LinkedList[T]
+ private val requests = new util.ArrayList[T]
- /* you can only change this if you have added something or marked something satisfied */
- var liveCount = 0.0
+ def numRequests = requests.size
def add(t: T) {
synchronized {
requests.add(t)
- liveCount += 1
- maybePurge()
}
}
- private def maybePurge() {
- if(requests.size > CleanupThresholdSize && liveCount / requests.size < CleanupThresholdPrct) {
+ def purgeSatisfied(): Int = {
+ synchronized {
val iter = requests.iterator()
+ var purged = 0
while(iter.hasNext) {
val curr = iter.next
- if(curr.satisfied.get())
+ if(curr.satisfied.get()) {
iter.remove()
+ purged += 1
+ }
}
- }
- }
-
- def decLiveCount() {
- synchronized {
- liveCount -= 1
+ purged
}
}
@@ -177,7 +188,6 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
val updated = curr.satisfied.compareAndSet(false, true)
if(updated == true) {
response += curr
- liveCount -= 1
expiredRequestReaper.satisfyRequest()
}
}
@@ -193,17 +203,16 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
*/
private class ExpiredRequestReaper extends Runnable with Logging {
this.logIdent = "ExpiredRequestReaper-%d ".format(brokerId)
- /* a few magic parameters to help do cleanup to avoid accumulating old watchers */
- private val CleanupThresholdSize = 100
- private val CleanupThresholdPrct = 0.5
private val delayed = new DelayQueue[T]
private val running = new AtomicBoolean(true)
private val shutdownLatch = new CountDownLatch(1)
- private val needsPurge = new AtomicBoolean(false)
+
/* The count of elements in the delay queue that are unsatisfied */
private [kafka] val unsatisfied = new AtomicInteger(0)
+ def numRequests = delayed.size()
+
/** Main loop for the expiry thread */
def run() {
while(running.get) {
@@ -214,10 +223,10 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
}
} catch {
case ie: InterruptedException =>
- if(needsPurge.getAndSet(false)) {
- val purged = purgeSatisfied()
- debug("Forced purge of " + purged + " requests from delay queue.")
- }
+ val purged = purgeSatisfied()
+ debug("Purged %d requests from delay queue.".format(purged))
+ val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum
+ debug("Purged %d (watcher) requests.".format(numPurgedFromWatchers))
case e: Exception =>
error("Error in long poll expiry thread: ", e)
}
@@ -229,12 +238,9 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
def enqueue(t: T) {
delayed.add(t)
unsatisfied.incrementAndGet()
- if(unsatisfied.get > CleanupThresholdSize && unsatisfied.get / delayed.size.toDouble < CleanupThresholdPrct)
- forcePurge()
}
- private def forcePurge() {
- needsPurge.set(true)
+ def forcePurge() {
expirationThread.interrupt()
}
@@ -259,8 +265,6 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
val updated = curr.satisfied.compareAndSet(false, true)
if(updated) {
unsatisfied.getAndDecrement()
- for(key <- curr.keys)
- watchersFor(key).decLiveCount()
return curr
}
}
@@ -284,4 +288,4 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/04b52743/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 6d88a4f..54a5a06 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -97,7 +97,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
private def mockLogManagerAndTestTopic(topic: String): Seq[TopicMetadata] = {
// topic metadata request only requires 1 call from the replica manager
val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
- EasyMock.expect(replicaManager.config).andReturn(configs.head).times(2)
+ EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes()
EasyMock.replay(replicaManager)
// create a topic metadata request
http://git-wip-us.apache.org/repos/asf/kafka/blob/04b52743/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 3aae5ce..0377e08 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -81,7 +81,7 @@ class SimpleFetchTest extends JUnit3Suite {
partition.getReplica(configs(1).brokerId).get.logEndOffset = leo - 5L
EasyMock.reset(replicaManager)
- EasyMock.expect(replicaManager.config).andReturn(configs.head)
+ EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes()
EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
EasyMock.replay(replicaManager)
@@ -175,7 +175,7 @@ class SimpleFetchTest extends JUnit3Suite {
partition.getReplica(followerReplicaId).get.logEndOffset = followerLEO.asInstanceOf[Long]
EasyMock.reset(replicaManager)
- EasyMock.expect(replicaManager.config).andReturn(configs.head)
+ EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes()
EasyMock.expect(replicaManager.recordFollowerPosition(topic, partitionId, followerReplicaId, followerLEO))
EasyMock.expect(replicaManager.getReplica(topic, partitionId, followerReplicaId)).andReturn(partition.inSyncReplicas.find(_.brokerId == configs(1).brokerId))
EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()