You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/09/05 07:23:59 UTC
git commit: kafka-1616;
Purgatory Size and Num.Delayed.Request metrics are incorrect;
patched by Guozhang Wang; reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk ae2414127 -> ffb81a581
kafka-1616; Purgatory Size and Num.Delayed.Request metrics are incorrect; patched by Guozhang Wang; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ffb81a58
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ffb81a58
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ffb81a58
Branch: refs/heads/trunk
Commit: ffb81a581b6cbd3bd7338ec88022aa745a05b1c9
Parents: ae24141
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Thu Sep 4 22:23:25 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Sep 4 22:23:25 2014 -0700
----------------------------------------------------------------------
.../scala/kafka/server/RequestPurgatory.scala | 64 +++++++++++---------
.../kafka/server/RequestPurgatoryTest.scala | 33 +++++++++-
2 files changed, 67 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ffb81a58/core/src/main/scala/kafka/server/RequestPurgatory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index ce06d2c..cf3ed4c 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -71,9 +71,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
/* a list of requests watching each key */
private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))
- /* the number of requests being watched, duplicates added on different watchers are also counted */
- private val watched = new AtomicInteger(0)
-
/* background thread expiring requests that have been waiting too long */
private val expiredRequestReaper = new ExpiredRequestReaper
private val expirationThread = Utils.newThread(name="request-expiration-task", runnable=expiredRequestReaper, daemon=false)
@@ -81,14 +78,14 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
newGauge(
"PurgatorySize",
new Gauge[Int] {
- def value = watched.get() + expiredRequestReaper.numRequests
+ def value = watched()
}
)
newGauge(
"NumDelayedRequests",
new Gauge[Int] {
- def value = expiredRequestReaper.unsatisfied.get()
+ def value = delayed()
}
)
@@ -130,6 +127,21 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
w.collectSatisfiedRequests()
}
+ /*
+ * Return the size of the watched lists in the purgatory, which is the size of watch lists.
+ * Since an operation may still be in the watch lists even when it has been completed,
+ * this number may be larger than the number of real operations watched
+ */
+ def watched() = watchersForKey.values.map(_.watched).sum
+
+ /*
+ * Return the number of requests in the expiry reaper's queue
+ */
+ def delayed() = expiredRequestReaper.delayed()
+
+ /*
+ * Return the watch list for the given watch key
+ */
private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key)
/**
@@ -156,6 +168,9 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
private class Watchers {
private val requests = new util.ArrayList[T]
+ // return the size of the watch list
+ def watched() = requests.size()
+
// potentially add the element to watch if it is not satisfied yet
def checkAndMaybeAdd(t: T): Boolean = {
synchronized {
@@ -168,7 +183,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
return false
}
requests.add(t)
- watched.getAndIncrement()
return true
}
}
@@ -182,7 +196,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
val curr = iter.next
if(curr.satisfied.get()) {
iter.remove()
- watched.getAndDecrement()
purged += 1
}
}
@@ -206,11 +219,9 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
val satisfied = curr synchronized checkSatisfied(curr)
if(satisfied) {
iter.remove()
- watched.getAndDecrement()
val updated = curr.satisfied.compareAndSet(false, true)
if(updated == true) {
response += curr
- expiredRequestReaper.satisfyRequest()
}
}
}
@@ -225,16 +236,13 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
*/
private class ExpiredRequestReaper extends Runnable with Logging {
this.logIdent = "ExpiredRequestReaper-%d ".format(brokerId)
-
- private val delayed = new DelayQueue[T]
private val running = new AtomicBoolean(true)
private val shutdownLatch = new CountDownLatch(1)
- /* The count of elements in the delay queue that are unsatisfied */
- private [kafka] val unsatisfied = new AtomicInteger(0)
-
- def numRequests = delayed.size()
+ private val delayedQueue = new DelayQueue[T]
+ def delayed() = delayedQueue.size()
+
/** Main loop for the expiry thread */
def run() {
while(running.get) {
@@ -245,12 +253,17 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
expire(curr)
}
}
- if (watched.get + numRequests >= purgeInterval) { // see if we need to force a full purge
- debug("Beginning purgatory purge")
- val purged = purgeSatisfied()
- debug("Purged %d requests from delay queue.".format(purged))
+ // see if we need to purge the watch lists
+ if (RequestPurgatory.this.watched() >= purgeInterval) {
+ debug("Begin purging watch lists")
val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum
- debug("Purged %d requests from watch lists.".format(numPurgedFromWatchers))
+ debug("Purged %d elements from watch lists.".format(numPurgedFromWatchers))
+ }
+ // see if we need to purge the delayed request queue
+ if (delayed() >= purgeInterval) {
+ debug("Begin purging delayed queue")
+ val purged = purgeSatisfied()
+ debug("Purged %d requests from delayed queue.".format(purged))
}
} catch {
case e: Exception =>
@@ -262,8 +275,7 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
/** Add a request to be expired */
def enqueue(t: T) {
- delayed.add(t)
- unsatisfied.incrementAndGet()
+ delayedQueue.add(t)
}
/** Shutdown the expiry thread*/
@@ -274,20 +286,16 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
debug("Shut down complete.")
}
- /** Record the fact that we satisfied a request in the stats for the expiry queue */
- def satisfyRequest(): Unit = unsatisfied.getAndDecrement()
-
/**
* Get the next expired event
*/
private def pollExpired(): T = {
while(true) {
- val curr = delayed.poll(200L, TimeUnit.MILLISECONDS)
+ val curr = delayedQueue.poll(200L, TimeUnit.MILLISECONDS)
if (curr == null)
return null.asInstanceOf[T]
val updated = curr.satisfied.compareAndSet(false, true)
if(updated) {
- unsatisfied.getAndDecrement()
return curr
}
}
@@ -301,7 +309,7 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
var purged = 0
// purge the delayed queue
- val iter = delayed.iterator()
+ val iter = delayedQueue.iterator()
while(iter.hasNext) {
val curr = iter.next()
if(curr.satisfied.get) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ffb81a58/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
index 168712d..a577f4a 100644
--- a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
@@ -34,7 +34,7 @@ class RequestPurgatoryTest extends JUnit3Suite {
override def setUp() {
super.setUp()
- purgatory = new MockRequestPurgatory()
+ purgatory = new MockRequestPurgatory(5)
}
override def tearDown() {
@@ -73,8 +73,37 @@ class RequestPurgatoryTest extends JUnit3Suite {
assertTrue("r2 hasn't expired", !purgatory.expired.contains(r2))
assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration)
}
+
+ @Test
+ def testRequestPurge() {
+ val r1 = new DelayedRequest(Array("test1"), null, 100000L)
+ val r12 = new DelayedRequest(Array("test1", "test2"), null, 100000L)
+ val r23 = new DelayedRequest(Array("test2", "test3"), null, 100000L)
+ purgatory.checkAndMaybeWatch(r1)
+ purgatory.checkAndMaybeWatch(r12)
+ purgatory.checkAndMaybeWatch(r23)
+
+ assertEquals("Purgatory should have 5 watched elements", 5, purgatory.watched())
+ assertEquals("Purgatory should have 3 total delayed requests", 3, purgatory.delayed())
+
+ // satisfy one of the requests, it should then be purged from the watch list with purge interval 5
+ r12.satisfied.set(true)
+ TestUtils.waitUntilTrue(() => purgatory.watched() == 3,
+ "Purgatory should have 3 watched elements instead of " + + purgatory.watched(), 1000L)
+ TestUtils.waitUntilTrue(() => purgatory.delayed() == 3,
+ "Purgatory should still have 3 total delayed requests instead of " + purgatory.delayed(), 1000L)
+
+ // add two more requests, then the satisfied request should be purged from the delayed queue with purge interval 5
+ purgatory.checkAndMaybeWatch(r1)
+ purgatory.checkAndMaybeWatch(r1)
+
+ TestUtils.waitUntilTrue(() => purgatory.watched() == 5,
+ "Purgatory should have 5 watched elements instead of " + purgatory.watched(), 1000L)
+ TestUtils.waitUntilTrue(() => purgatory.delayed() == 4,
+ "Purgatory should have 4 total delayed requests instead of " + purgatory.delayed(), 1000L)
+ }
- class MockRequestPurgatory extends RequestPurgatory[DelayedRequest] {
+ class MockRequestPurgatory(purge: Int) extends RequestPurgatory[DelayedRequest](purgeInterval = purge) {
val satisfied = mutable.Set[DelayedRequest]()
val expired = mutable.Set[DelayedRequest]()
def awaitExpiration(delayed: DelayedRequest) = {