You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2015/08/15 04:07:01 UTC
kafka git commit: KAFKA-2084;
Add per-client-id byte-rate metrics and quota manager;
reviewed by Joel Koshy, Dong Lin, Jun Rao and Edward Ribeiro
Repository: kafka
Updated Branches:
refs/heads/trunk f6acfb089 -> bbb7d97ad
KAFKA-2084; Add per-client-id byte-rate metrics and quota manager; reviewed by Joel Koshy, Dong Lin, Jun Rao and Edward Ribeiro
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bbb7d97a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bbb7d97a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bbb7d97a
Branch: refs/heads/trunk
Commit: bbb7d97adefe5826f2e02a8e55423ea215c9f749
Parents: f6acfb0
Author: Aditya Auradkar <aa...@linkedin.com>
Authored: Fri Aug 14 18:51:48 2015 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Fri Aug 14 18:51:48 2015 -0700
----------------------------------------------------------------------
build.gradle | 1 +
.../org/apache/kafka/common/metrics/Quota.java | 18 ++
.../common/metrics/QuotaViolationException.java | 1 -
.../org/apache/kafka/common/metrics/Sensor.java | 11 +-
.../apache/kafka/common/metrics/stats/Rate.java | 23 +-
.../kafka/common/metrics/MetricsTest.java | 32 ++-
.../scala/kafka/server/ClientQuotaManager.scala | 250 +++++++++++++++++++
.../src/main/scala/kafka/server/KafkaApis.scala | 88 +++++--
.../main/scala/kafka/server/KafkaConfig.scala | 56 ++++-
.../main/scala/kafka/server/KafkaServer.scala | 24 +-
.../scala/kafka/server/ReplicaManager.scala | 4 +-
.../scala/kafka/server/ThrottledResponse.scala | 46 ++++
.../scala/kafka/utils/ShutdownableThread.scala | 3 +
.../integration/kafka/api/QuotasTest.scala | 194 ++++++++++++++
.../kafka/server/ClientQuotaManagerTest.scala | 159 ++++++++++++
.../unit/kafka/server/KafkaConfigTest.scala | 6 +
.../ThrottledResponseExpirationTest.scala | 90 +++++++
17 files changed, 945 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 864427b..c7f66be 100644
--- a/build.gradle
+++ b/build.gradle
@@ -253,6 +253,7 @@ project(':core') {
testCompile "$easymock"
testCompile 'org.objenesis:objenesis:1.2'
testCompile "org.scalatest:scalatest_$baseScalaVersion:2.2.5"
+ testCompile project(path: ':clients', configuration: 'archives')
testRuntime "$slf4jlog4j"
http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java b/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java
index d82bb0c..a3535dc 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java
@@ -49,4 +49,22 @@ public final class Quota {
return (upper && value <= bound) || (!upper && value >= bound);
}
+ /**
+ * Hash derived from the bound (truncated to an int) and the direction flag.
+ */
+ @Override
+ public int hashCode() {
+ // Seed of 1 folded with the truncated bound, then with the upper/lower flag
+ int hash = 31 + (int) this.bound;
+ hash = 31 * hash + (this.upper ? 1 : 0);
+ return hash;
+ }
+
+ /**
+ * Two quotas are equal when they have the same bound and the same direction (upper vs. lower bound).
+ *
+ * @param obj the object to compare against; may be null
+ * @return true if obj is a Quota with the same bound and the same upper flag
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (!(obj instanceof Quota))
+ return false;
+ Quota that = (Quota) obj;
+ // Compare against the other instance's flag; comparing this.upper with itself is always true
+ return (that.bound == this.bound) && (that.upper == this.upper);
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java b/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java
index a451e53..fbe03f5 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java
@@ -28,5 +28,4 @@ public class QuotaViolationException extends KafkaException {
public QuotaViolationException(String m) {
super(m);
}
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index ca823fd..4d55771 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -112,8 +112,14 @@ public final class Sensor {
if (config != null) {
Quota quota = config.quota();
if (quota != null) {
- if (!quota.acceptable(metric.value(timeMs)))
- throw new QuotaViolationException(metric.metricName() + " is in violation of its quota of " + quota.bound());
+ double value = metric.value(timeMs);
+ if (!quota.acceptable(value)) {
+ // Arguments follow the placeholders in order: metric name, observed value ("Actual"), quota bound ("Threshold")
+ throw new QuotaViolationException(String.format(
+ "(%s) violated quota. Actual: (%f), Threshold: (%f)",
+ metric.metricName(),
+ value,
+ quota.bound()));
+ }
}
}
}
@@ -170,5 +176,4 @@ public final class Sensor {
synchronized List<KafkaMetric> metrics() {
return Collections.unmodifiableList(this.metrics);
}
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
index 98429da..fe43940 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
@@ -18,6 +18,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
+
/**
* The rate of the given quantity. By default this is the total observed over a set of samples from a sampled statistic
* divided by the elapsed time over the sample windows. Alternative {@link SampledStat} implementations can be provided,
@@ -58,26 +59,28 @@ public class Rate implements MeasurableStat {
@Override
public double measure(MetricConfig config, long now) {
double value = stat.measure(config, now);
- double elapsed = convert(now - stat.oldest(now).lastWindowMs);
- return value / elapsed;
+ // the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete
+ // (N is config.samples(); each complete window is config.timeWindowMs() long)
+ long elapsedCurrentWindowMs = now - stat.current(now).lastWindowMs;
+ long elapsedPriorWindowsMs = config.timeWindowMs() * (config.samples() - 1);
+ // convert() rescales the millisecond total into this Rate's time unit before the division
+ return value / convert(elapsedCurrentWindowMs + elapsedPriorWindowsMs);
}
- private double convert(long time) {
+ private double convert(long timeMs) {
switch (unit) {
case NANOSECONDS:
- return time * 1000.0 * 1000.0;
+ return timeMs * 1000.0 * 1000.0;
case MICROSECONDS:
- return time * 1000.0;
+ return timeMs * 1000.0;
case MILLISECONDS:
- return time;
+ return timeMs;
case SECONDS:
- return time / 1000.0;
+ return timeMs / 1000.0;
case MINUTES:
- return time / (60.0 * 1000.0);
+ return timeMs / (60.0 * 1000.0);
case HOURS:
- return time / (60.0 * 60.0 * 1000.0);
+ return timeMs / (60.0 * 60.0 * 1000.0);
case DAYS:
- return time / (24.0 * 60.0 * 60.0 * 1000.0);
+ return timeMs / (24.0 * 60.0 * 60.0 * 1000.0);
default:
throw new IllegalStateException("Unknown unit: " + unit);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 544e120..0a7dcd8 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -37,9 +37,9 @@ import org.junit.Test;
public class MetricsTest {
private static final double EPS = 0.000001;
-
- MockTime time = new MockTime();
- Metrics metrics = new Metrics(new MetricConfig(), Arrays.asList((MetricsReporter) new JmxReporter()), time);
+ private MockTime time = new MockTime();
+ private MetricConfig config = new MetricConfig();
+ private Metrics metrics = new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), time);
@Test
public void testMetricName() {
@@ -77,19 +77,33 @@ public class MetricsTest {
s2.add(new MetricName("s2.total", "grp1"), new Total());
s2.record(5.0);
- for (int i = 0; i < 10; i++)
+ int sum = 0;
+ int count = 10;
+ for (int i = 0; i < count; i++) {
s.record(i);
+ sum += i;
+ }
+ // prior to any time passing
+ double elapsedSecs = (config.timeWindowMs() * (config.samples() - 1)) / 1000.0;
+ assertEquals(String.format("Occurrences(0...%d) = %f", count, count / elapsedSecs), count / elapsedSecs,
+ metrics.metrics().get(new MetricName("test.occurences", "grp1")).value(), EPS);
// pretend 2 seconds passed...
- time.sleep(2000);
+ long sleepTimeMs = 2;
+ time.sleep(sleepTimeMs * 1000);
+ elapsedSecs += sleepTimeMs;
assertEquals("s2 reflects the constant value", 5.0, metrics.metrics().get(new MetricName("s2.total", "grp1")).value(), EPS);
assertEquals("Avg(0...9) = 4.5", 4.5, metrics.metrics().get(new MetricName("test.avg", "grp1")).value(), EPS);
- assertEquals("Max(0...9) = 9", 9.0, metrics.metrics().get(new MetricName("test.max", "grp1")).value(), EPS);
+ assertEquals("Max(0...9) = 9", count - 1, metrics.metrics().get(new MetricName("test.max", "grp1")).value(), EPS);
assertEquals("Min(0...9) = 0", 0.0, metrics.metrics().get(new MetricName("test.min", "grp1")).value(), EPS);
- assertEquals("Rate(0...9) = 22.5", 22.5, metrics.metrics().get(new MetricName("test.rate", "grp1")).value(), EPS);
- assertEquals("Occurences(0...9) = 5", 5.0, metrics.metrics().get(new MetricName("test.occurences", "grp1")).value(), EPS);
- assertEquals("Count(0...9) = 10", 10.0, metrics.metrics().get(new MetricName("test.count", "grp1")).value(), EPS);
+ assertEquals("Rate(0...9) = 1.40625",
+ sum / elapsedSecs, metrics.metrics().get(new MetricName("test.rate", "grp1")).value(), EPS);
+ assertEquals(String.format("Occurrences(0...%d) = %f", count, count / elapsedSecs),
+ count / elapsedSecs,
+ metrics.metrics().get(new MetricName("test.occurences", "grp1")).value(), EPS);
+ assertEquals("Count(0...9) = 10",
+ (double) count, metrics.metrics().get(new MetricName("test.count", "grp1")).value(), EPS);
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
new file mode 100644
index 0000000..9f8473f
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util.concurrent.{DelayQueue, TimeUnit}
+
+import kafka.utils.{ShutdownableThread, Logging}
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics._
+import org.apache.kafka.common.metrics.stats.{Total, Rate, Avg}
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import org.apache.kafka.common.utils.Time
+
+/**
+ * The pair of sensors kept for a single client id
+ * @param quotaSensor @Sensor that receives the client's recorded values and enforces its quota
+ * @param throttleTimeSensor @Sensor that receives the delay, in milliseconds, applied to the client's throttled responses
+ */
+private case class ClientSensors(quotaSensor: Sensor, throttleTimeSensor: Sensor)
+
+/**
+ * Configuration settings for quota management
+ * @param quotaBytesPerSecondDefault The bytes per second quota applied to any client that has no override
+ * @param quotaBytesPerSecondOverrides The comma separated overrides per client, written as "c1=X,c2=Y".
+ * Each value is parsed as a number when the quota manager is constructed
+ * @param numQuotaSamples The number of samples to retain in memory
+ * @param quotaWindowSizeSeconds The time span of each sample, in seconds
+ */
+case class ClientQuotaManagerConfig(quotaBytesPerSecondDefault: Long =
+ ClientQuotaManagerConfig.QuotaBytesPerSecondDefault,
+ quotaBytesPerSecondOverrides: String =
+ ClientQuotaManagerConfig.QuotaBytesPerSecondOverrides,
+ numQuotaSamples: Int =
+ ClientQuotaManagerConfig.DefaultNumQuotaSamples,
+ quotaWindowSizeSeconds: Int =
+ ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds)
+
+// Default values used by the ClientQuotaManagerConfig case class
+object ClientQuotaManagerConfig {
+ // Largest representable Long, used as the default bytes-per-second bound
+ val QuotaBytesPerSecondDefault = Long.MaxValue
+ // No per-client overrides by default
+ val QuotaBytesPerSecondOverrides = ""
+ // Always have 10 whole windows + 1 current window
+ val DefaultNumQuotaSamples = 11
+ val DefaultQuotaWindowSizeSeconds = 1
+ // NOTE(review): not referenced by the ClientQuotaManager code in this file; confirm whether the
+ // computed throttle delay is meant to be capped by this value
+ val MaxThrottleTimeSeconds = 30
+}
+
+/**
+ * Helper class that records per-client metrics. It is also responsible for maintaining Quota usage statistics
+ * for all clients.
+ * @param config @ClientQuotaManagerConfig quota configs
+ * @param metrics @Metrics Metrics instance
+ * @param apiKey API Key for the request
+ * @param time @Time object to use
+ */
+class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
+ private val metrics: Metrics,
+ private val apiKey: String,
+ private val time: Time) extends Logging {
+ private val overriddenQuota = initQuotaMap(config.quotaBytesPerSecondOverrides)
+ private val defaultQuota = Quota.lessThan(config.quotaBytesPerSecondDefault)
+ private val lock = new ReentrantReadWriteLock()
+ private val delayQueue = new DelayQueue[ThrottledResponse]()
+ val throttledRequestReaper = new ThrottledRequestReaper(delayQueue)
+ throttledRequestReaper.start()
+
+ private val delayQueueSensor = metrics.sensor(apiKey + "-delayQueue")
+ delayQueueSensor.add(new MetricName("queue-size",
+ apiKey,
+ "Tracks the size of the delay queue"), new Total())
+
+ /**
+ * Background thread that takes expired entries off the delay queue and runs the callback of each one
+ * @param delayQueue DelayQueue to dequeue from
+ */
+ class ThrottledRequestReaper(delayQueue: DelayQueue[ThrottledResponse]) extends ShutdownableThread(
+ "ThrottledRequestReaper-%s".format(apiKey), false) {
+
+ override def doWork(): Unit = {
+ // Block for at most one second; poll hands back null when nothing has expired in that time
+ Option(delayQueue.poll(1, TimeUnit.SECONDS)).foreach { expired =>
+ // The entry has left the queue, so take it out of the queue-size total
+ delayQueueSensor.record(-1)
+ trace("Response throttled for: " + expired.delayTimeMs + " ms")
+ expired.execute()
+ }
+ }
+ }
+
+ /**
+ * Records a value against the client's quota sensor and either runs the callback right away or,
+ * when the quota is violated, queues it to run after a computed delay.
+ * @param clientId clientId the value is attributed to
+ * @param value amount to record against the client's quota (bytes produced/consumed)
+ * @param callback Invoked immediately when the quota is not violated, otherwise handed to the
+ * delay queue and invoked once the delay has elapsed
+ * @return Number of milliseconds the response is delayed because of a quota violation.
+ * Zero otherwise
+ */
+ def recordAndMaybeThrottle(clientId: String, value: Int, callback: => Unit): Int = {
+ val sensors = getOrCreateQuotaSensors(clientId)
+ val throttleMs: Long =
+ try {
+ sensors.quotaSensor.record(value)
+ // Within quota: respond straight away
+ callback
+ 0L
+ } catch {
+ case _: QuotaViolationException =>
+ // Work out how long the response has to be held back
+ val rateMetric = metrics.metrics().get(clientRateMetricName(clientId))
+ val delayMs = delayTime(rateMetric.value(), getQuotaMetricConfig(quota(clientId)))
+ // Park the response on the delay queue and account for it
+ delayQueue.add(new ThrottledResponse(time, delayMs, callback))
+ delayQueueSensor.record()
+ sensors.throttleTimeSensor.record(delayMs)
+ logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(sensors.quotaSensor.name(), delayMs))
+ delayMs
+ }
+ throttleMs.toInt
+ }
+
+ /*
+ * Computes how long a response must be delayed for the metric to fall back within quota,
+ * assuming nothing else is recorded in the meantime.
+ *
+ * With O the observed rate, T the target rate and W the measurement window, the delay X must
+ * satisfy O * W / (W + X) = T, which gives X = (O - T) / T * W.
+ * W is taken as timeWindowMs * samples from the metric config.
+ */
+ private def delayTime(metricValue: Double, config: MetricConfig): Long = {
+ val bound = config.quota().bound
+ val overshootRatio = (metricValue - bound) / bound
+ (overshootRatio * config.timeWindowMs() * config.samples()).round
+ }
+
+ /**
+ * Returns the quota for the specified clientId: its configured override if one exists,
+ * otherwise the default quota
+ * @param clientId client id to look up
+ * @return the Quota that applies to this client
+ */
+ private[server] def quota(clientId: String): Quota = overriddenQuota.getOrElse(clientId, defaultQuota)
+
+ /*
+ * This function either returns the sensors for a given client id or creates them if they don't exist
+ * First sensor of the result is the quota enforcement sensor. Second one is the throttle time sensor
+ * Both sensors in the returned ClientSensors are non-null.
+ */
+ private def getOrCreateQuotaSensors(clientId: String): ClientSensors = {
+
+ // Names of the sensors to access
+ val quotaSensorName = apiKey + "-" + clientId
+ val throttleTimeSensorName = apiKey + "ThrottleTime-" + clientId
+ var quotaSensor: Sensor = null
+ var throttleTimeSensor: Sensor = null
+
+ /* Acquire the read lock to fetch the sensors. It is safe to call getSensor from multiple threads.
+ * The read lock allows a thread to create a sensor in isolation. The thread creating the sensor
+ * will acquire the write lock and prevent the sensors from being read while they are being created.
+ * It should be sufficient to simply check if the sensor is null without acquiring a read lock but the
+ * sensor being present doesn't mean that it is fully initialized i.e. all the Metrics may not have been added.
+ * This read lock waits until the writer thread has released its lock i.e. fully initialized the sensor
+ * at which point it is safe to read
+ */
+ lock.readLock().lock()
+ try {
+ quotaSensor = metrics.getSensor(quotaSensorName)
+ throttleTimeSensor = metrics.getSensor(throttleTimeSensorName)
+ }
+ finally {
+ lock.readLock().unlock()
+ }
+
+ /* If the sensor is null, try to create it else return the created sensor
+ * Also if quota sensor is null, the throttle time sensor must be null
+ */
+ if (quotaSensor == null) {
+ /* Acquire a write lock because the sensor may not have been created and we only want one thread to create it.
+ * Note that multiple threads may acquire the write lock if they all see a null sensor initially
+ * In this case, the writer checks the sensor after acquiring the lock again.
+ * This is safe from Double Checked Locking because the references are read
+ * after acquiring read locks and hence they cannot see a partially published reference
+ */
+ lock.writeLock().lock()
+ try {
+ // Re-read BOTH sensors: if another thread won the race and created them while we waited for the
+ // write lock, the throttle time reference read earlier is still null and must be refreshed too,
+ // otherwise the caller would receive a ClientSensors holding a null throttle time sensor
+ quotaSensor = metrics.getSensor(quotaSensorName)
+ throttleTimeSensor = metrics.getSensor(throttleTimeSensorName)
+ if (quotaSensor == null) {
+ // create the throttle time sensor also
+ throttleTimeSensor = metrics.sensor(throttleTimeSensorName)
+ throttleTimeSensor.add(new MetricName("throttle-time",
+ apiKey,
+ "Tracking average throttle-time per client",
+ "client-id",
+ clientId), new Avg())
+ quotaSensor = metrics.sensor(quotaSensorName, getQuotaMetricConfig(quota(clientId)))
+ quotaSensor.add(clientRateMetricName(clientId), new Rate())
+ }
+ } finally {
+ lock.writeLock().unlock()
+ }
+ }
+ // return the read or created sensors
+ ClientSensors(quotaSensor, throttleTimeSensor)
+ }
+
+ /* Builds the MetricConfig for a quota sensor: the configured window size and sample count plus the given quota */
+ private def getQuotaMetricConfig(quota: Quota): MetricConfig = {
+ val windowed = new MetricConfig().timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
+ val sampled = windowed.samples(config.numQuotaSamples)
+ sampled.quota(quota)
+ }
+
+ /* Construct a Map of (clientId -> Quota)
+ * The input config is specified as a comma-separated K=V pairs, e.g. "c1=1000,c2=2000".
+ * Whitespace around an entry, a client id or a value is ignored.
+ * Throws IllegalArgumentException when an entry is not of the form K=V.
+ */
+ private def initQuotaMap(input: String): Map[String, Quota] = {
+ // If empty input, return an empty map
+ if (input.trim.length == 0)
+ Map[String, Quota]()
+ else
+ input.split(",").map(entry => {
+ val trimmedEntry = entry.trim
+ val pair: Array[String] = trimmedEntry.split("=")
+ if (pair.length != 2)
+ throw new IllegalArgumentException("Incorrectly formatted override entry (%s). Format is k1=v1,k2=v2".format(entry))
+ // Trim both sides of the '=' so that "c1 = 10" registers the override under "c1" and not "c1 "
+ pair(0).trim -> new Quota(pair(1).trim.toDouble, true)
+ }).toMap
+ }
+
+ /* Name of the per-client byte-rate metric: grouped under the api key and tagged with the client id */
+ private def clientRateMetricName(clientId: String): MetricName =
+ new MetricName("byte-rate", apiKey, "Tracking byte-rate per client", "client-id", clientId)
+
+ /* Shuts down the reaper thread that services the delay queue */
+ def shutdown() = throttledRequestReaper.shutdown()
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7ea509c..67f0cad 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,6 +17,7 @@
package kafka.server
+import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.TopicPartition
import kafka.api._
@@ -42,9 +43,12 @@ class KafkaApis(val requestChannel: RequestChannel,
val zkClient: ZkClient,
val brokerId: Int,
val config: KafkaConfig,
- val metadataCache: MetadataCache) extends Logging {
+ val metadataCache: MetadataCache,
+ val metrics: Metrics) extends Logging {
this.logIdent = "[KafkaApi-%d] ".format(brokerId)
+ // Store all the quota managers for each type of request
+ private val quotaManagers = instantiateQuotaManagers(config)
/**
* Top-level method that handles all requests and multiplexes to the right api
@@ -250,6 +254,7 @@ class KafkaApis(val requestChannel: RequestChannel,
*/
def handleProducerRequest(request: RequestChannel.Request) {
val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
+ val numBytesAppended = produceRequest.sizeInBytes
// the callback for sending a produce response
def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
@@ -265,21 +270,27 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- if (produceRequest.requiredAcks == 0) {
- // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
- // the request, since no response is expected by the producer, the server will close socket server so that
- // the producer client will know that some error has happened and will refresh its metadata
- if (errorInResponse) {
- info("Close connection due to error handling produce request with correlation id %d from client id %s with ack=0"
- .format(produceRequest.correlationId, produceRequest.clientId))
- requestChannel.closeConnection(request.processor, request)
+ def produceResponseCallback {
+ if (produceRequest.requiredAcks == 0) {
+ // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
+ // the request, since no response is expected by the producer, the server will close socket server so that
+ // the producer client will know that some error has happened and will refresh its metadata
+ if (errorInResponse) {
+ info(
+ "Close connection due to error handling produce request with correlation id %d from client id %s with ack=0".format(
+ produceRequest.correlationId,
+ produceRequest.clientId))
+ requestChannel.closeConnection(request.processor, request)
+ } else {
+ requestChannel.noOperation(request.processor, request)
+ }
} else {
- requestChannel.noOperation(request.processor, request)
+ val response = ProducerResponse(produceRequest.correlationId, responseStatus)
+ requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
}
- } else {
- val response = ProducerResponse(produceRequest.correlationId, responseStatus)
- requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
}
+
+ quotaManagers(RequestKeys.ProduceKey).recordAndMaybeThrottle(produceRequest.clientId, numBytesAppended, produceResponseCallback)
}
// only allow appending to internal topic partitions
@@ -316,14 +327,27 @@ class KafkaApis(val requestChannel: RequestChannel,
.format(fetchRequest.correlationId, fetchRequest.clientId,
topicAndPartition, ErrorMapping.exceptionNameFor(data.error)))
}
-
// record the bytes out metrics only when the response is being sent
BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesOutRate.mark(data.messages.sizeInBytes)
BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.messages.sizeInBytes)
}
val response = FetchResponse(fetchRequest.correlationId, responsePartitionData)
- requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response)))
+ // Sends the already-built fetch response back on the request channel
+ def fetchResponseCallback {
+ requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response)))
+ }
+
+ // Do not throttle replication traffic
+ if (fetchRequest.isFromFollower) {
+ fetchResponseCallback
+ } else {
+ quotaManagers.get(RequestKeys.FetchKey) match {
+ case Some(quotaManager) =>
+ quotaManager.recordAndMaybeThrottle(fetchRequest.clientId, response.sizeInBytes, fetchResponseCallback)
+ case None =>
+ // No quota manager is registered for fetches: the client must still get its response,
+ // so send it unthrottled instead of dropping it
+ warn("Cannot throttle Api key %s".format(RequestKeys.nameForKey(RequestKeys.FetchKey)))
+ fetchResponseCallback
+ }
+ }
}
// call the replica manager to fetch messages from the local replica
@@ -604,9 +628,37 @@ class KafkaApis(val requestChannel: RequestChannel,
sendResponseCallback)
}
+ /*
+ * Builds one quota manager per throttled request type (produce and fetch).
+ * The returned Map is keyed by the request Api key.
+ */
+ private def instantiateQuotaManagers(cfg: KafkaConfig): Map[Short, ClientQuotaManager] = {
+ // Creates the (api key -> manager) entry for one request type; the window settings are shared by both types
+ def managerFor(apiKey: Short, defaultQuota: Long, overrides: String): (Short, ClientQuotaManager) = {
+ val managerCfg = ClientQuotaManagerConfig(
+ quotaBytesPerSecondDefault = defaultQuota,
+ quotaBytesPerSecondOverrides = overrides,
+ numQuotaSamples = cfg.numQuotaSamples,
+ quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
+ )
+ apiKey -> new ClientQuotaManager(managerCfg, metrics, RequestKeys.nameForKey(apiKey), new org.apache.kafka.common.utils.SystemTime)
+ }
+
+ Map[Short, ClientQuotaManager](
+ managerFor(RequestKeys.ProduceKey, cfg.producerQuotaBytesPerSecondDefault, cfg.producerQuotaBytesPerSecondOverrides),
+ managerFor(RequestKeys.FetchKey, cfg.consumerQuotaBytesPerSecondDefault, cfg.consumerQuotaBytesPerSecondOverrides)
+ )
+ }
+
def close() {
- // TODO currently closing the API is an no-op since the API no longer maintain any modules
- // maybe removing the closing call in the end when KafkaAPI becomes a pure stateless layer
- debug("Shut down complete.")
+ // Shut down every quota manager held by this instance before reporting completion
+ quotaManagers.foreach { case(apiKey, quotaManager) =>
+ quotaManager.shutdown()
+ }
+ info("Shutdown complete.")
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index a06f0bd..394f21b 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -26,7 +26,10 @@ import kafka.consumer.ConsumerConfig
import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet}
import kafka.utils.CoreUtils
import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
+import org.apache.kafka.common.config.ConfigDef.Importance._
+import org.apache.kafka.common.config.ConfigDef.Range._
+import org.apache.kafka.common.config.ConfigDef.Type._
+import org.apache.kafka.common.config.{ConfigException, AbstractConfig, ConfigDef}
import org.apache.kafka.common.metrics.MetricsReporter
import org.apache.kafka.common.protocol.SecurityProtocol
@@ -132,12 +135,21 @@ object Defaults {
val OffsetCommitTimeoutMs = OffsetManagerConfig.DefaultOffsetCommitTimeoutMs
val OffsetCommitRequiredAcks = OffsetManagerConfig.DefaultOffsetCommitRequiredAcks
+ /** ********* Quota Configuration ***********/
+ val ProducerQuotaBytesPerSecondDefault = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
+ val ConsumerQuotaBytesPerSecondDefault = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
+ val ProducerQuotaBytesPerSecondOverrides = ClientQuotaManagerConfig.QuotaBytesPerSecondOverrides
+ val ConsumerQuotaBytesPerSecondOverrides = ClientQuotaManagerConfig.QuotaBytesPerSecondOverrides
+ val NumQuotaSamples: Int = ClientQuotaManagerConfig.DefaultNumQuotaSamples
+ val QuotaWindowSizeSeconds: Int = ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds
+
val DeleteTopicEnable = false
val CompressionType = "producer"
+ /** ********* Kafka Metrics Configuration ***********/
val MetricNumSamples = 2
- val MetricSampleWindowMs = 1000
+ val MetricSampleWindowMs = 30000
val MetricReporterClasses = ""
}
@@ -250,15 +262,22 @@ object KafkaConfig {
val OffsetsRetentionCheckIntervalMsProp = "offsets.retention.check.interval.ms"
val OffsetCommitTimeoutMsProp = "offsets.commit.timeout.ms"
val OffsetCommitRequiredAcksProp = "offsets.commit.required.acks"
+ /** ********* Quota Configuration ***********/
+ val ProducerQuotaBytesPerSecondDefaultProp = "quota.producer.default"
+ val ConsumerQuotaBytesPerSecondDefaultProp = "quota.consumer.default"
+ val ProducerQuotaBytesPerSecondOverridesProp = "quota.producer.bytes.per.second.overrides"
+ val ConsumerQuotaBytesPerSecondOverridesProp = "quota.consumer.bytes.per.second.overrides"
+ val NumQuotaSamplesProp = "quota.window.num"
+ val QuotaWindowSizeSecondsProp = "quota.window.size.seconds"
val DeleteTopicEnableProp = "delete.topic.enable"
val CompressionTypeProp = "compression.type"
+ /** ********* Kafka Metrics Configuration ***********/
val MetricSampleWindowMsProp = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG
val MetricNumSamplesProp: String = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG
val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG
-
/* Documentation */
/** ********* Zookeeper Configuration ***********/
val ZkConnectDoc = "Zookeeper host string"
@@ -388,11 +407,22 @@ object KafkaConfig {
val OffsetCommitTimeoutMsDoc = "Offset commit will be delayed until all replicas for the offsets topic receive the commit " +
"or this timeout is reached. This is similar to the producer request timeout."
val OffsetCommitRequiredAcksDoc = "The required acks before the commit can be accepted. In general, the default (-1) should not be overridden"
+ /** ********* Quota Configuration ***********/
+ val ProducerQuotaBytesPerSecondDefaultDoc = "Any producer distinguished by clientId will get throttled if it produces more bytes than this value per-second"
+ val ConsumerQuotaBytesPerSecondDefaultDoc = "Any consumer distinguished by clientId/consumer group will get throttled if it fetches more bytes than this value per-second"
+ val ProducerQuotaBytesPerSecondOverridesDoc = "Comma separated list of clientId:quotaBytesPerSecond to override the default producer quota. " +
+ "Example: clientIdX=10485760,clientIdY=10485760"
+ val ConsumerQuotaBytesPerSecondOverridesDoc = "Comma separated list of clientId:quotaBytesPerSecond to override the default consumer quota. " +
+ "Example: clientIdX=10485760,clientIdY=10485760"
+ val NumQuotaSamplesDoc = "The number of samples to retain in memory"
+ val QuotaWindowSizeSecondsDoc = "The time span of each sample"
+
val DeleteTopicEnableDoc = "Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off"
val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the standard compression codecs " +
"('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to no compression; and " +
"'producer' which means retain the original compression codec set by the producer."
+ /** ********* Kafka Metrics Configuration ***********/
val MetricSampleWindowMsDoc = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC
val MetricNumSamplesDoc = CommonClientConfigs.METRICS_NUM_SAMPLES_DOC
val MetricReporterClassesDoc = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC
@@ -518,9 +548,19 @@ object KafkaConfig {
.define(OffsetCommitRequiredAcksProp, SHORT, Defaults.OffsetCommitRequiredAcks, HIGH, OffsetCommitRequiredAcksDoc)
.define(DeleteTopicEnableProp, BOOLEAN, Defaults.DeleteTopicEnable, HIGH, DeleteTopicEnableDoc)
.define(CompressionTypeProp, STRING, Defaults.CompressionType, HIGH, CompressionTypeDoc)
+
+ /** ********* Kafka Metrics Configuration ***********/
.define(MetricNumSamplesProp, INT, Defaults.MetricNumSamples, atLeast(1), LOW, MetricNumSamplesDoc)
.define(MetricSampleWindowMsProp, LONG, Defaults.MetricSampleWindowMs, atLeast(1), LOW, MetricSampleWindowMsDoc)
.define(MetricReporterClassesProp, LIST, Defaults.MetricReporterClasses, LOW, MetricReporterClassesDoc)
+
+ /** ********* Quota configuration ***********/
+ .define(ProducerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ProducerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ProducerQuotaBytesPerSecondDefaultDoc)
+ .define(ConsumerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ConsumerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ConsumerQuotaBytesPerSecondDefaultDoc)
+ .define(ProducerQuotaBytesPerSecondOverridesProp, STRING, Defaults.ProducerQuotaBytesPerSecondOverrides, HIGH, ProducerQuotaBytesPerSecondOverridesDoc)
+ .define(ConsumerQuotaBytesPerSecondOverridesProp, STRING, Defaults.ConsumerQuotaBytesPerSecondOverrides, HIGH, ConsumerQuotaBytesPerSecondOverridesDoc)
+ .define(NumQuotaSamplesProp, INT, Defaults.NumQuotaSamples, atLeast(1), LOW, NumQuotaSamplesDoc)
+ .define(QuotaWindowSizeSecondsProp, INT, Defaults.QuotaWindowSizeSeconds, atLeast(1), LOW, QuotaWindowSizeSecondsDoc)
}
def configNames() = {
@@ -548,7 +588,6 @@ object KafkaConfig {
props.putAll(overrides)
fromProps(props)
}
-
}
case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(KafkaConfig.configDef, props) {
@@ -661,10 +700,17 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp)
val metricReporterClasses: java.util.List[MetricsReporter] = getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter])
+ /** ********* Quota Configuration **************/
+ val producerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp)
+ val consumerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp)
+ val producerQuotaBytesPerSecondOverrides = getString(KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp)
+ val consumerQuotaBytesPerSecondOverrides = getString(KafkaConfig.ConsumerQuotaBytesPerSecondOverridesProp)
+ val numQuotaSamples = getInt(KafkaConfig.NumQuotaSamplesProp)
+ val quotaWindowSizeSeconds = getInt(KafkaConfig.QuotaWindowSizeSecondsProp)
+
val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp)
val compressionType = getString(KafkaConfig.CompressionTypeProp)
-
val listeners = getListeners
val advertisedListeners = getAdvertisedListeners
val logRetentionTimeMillis = getLogRetentionTimeMillis
http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 84d4730..6d65507 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -18,7 +18,6 @@
package kafka.server
import java.util
-import java.util.Properties
import kafka.admin._
import kafka.log.LogConfig
@@ -62,11 +61,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
private val reporters: java.util.List[MetricsReporter] = config.metricReporterClasses
reporters.add(new JmxReporter(jmxPrefix))
-
-
- // This exists so SocketServer (which uses Client libraries) can use the client Time objects without having to convert all of Kafka to use them
- // Once we get rid of kafka.utils.time, we can get rid of this too
- private val socketServerTime: org.apache.kafka.common.utils.Time = new org.apache.kafka.common.utils.SystemTime()
+ // This exists because the Metrics package from clients has its own Time implementation.
+ // SocketServer/Quotas (which use client libraries) have to use the client Time objects without having to convert all of Kafka to use them
+ // Eventually, we want to merge the Time objects in core and clients
+ private val kafkaMetricsTime: org.apache.kafka.common.utils.Time = new org.apache.kafka.common.utils.SystemTime()
+ var metrics: Metrics = null
val brokerState: BrokerState = new BrokerState
@@ -80,7 +79,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
var dynamicConfigHandlers: Map[String, ConfigHandler] = null
var dynamicConfigManager: DynamicConfigManager = null
- val metrics: Metrics = new Metrics()
var consumerCoordinator: ConsumerCoordinator = null
@@ -92,7 +90,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
val metadataCache: MetadataCache = new MetadataCache(config.brokerId)
-
var zkClient: ZkClient = null
val correlationId: AtomicInteger = new AtomicInteger(0)
val brokerMetaPropsFile = "meta.properties"
@@ -121,6 +118,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
val canStartup = isStartingUp.compareAndSet(false, true)
if (canStartup) {
+ metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime)
+
brokerState.newState(Starting)
/* start scheduler */
@@ -137,9 +136,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
config.brokerId = getBrokerId
this.logIdent = "[Kafka Server " + config.brokerId + "], "
- val metrics = new Metrics(metricConfig, reporters, socketServerTime)
-
-
socketServer = new SocketServer(config.brokerId,
config.listeners,
config.numNetworkThreads,
@@ -150,7 +146,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
config.maxConnectionsPerIp,
config.connectionsMaxIdleMs,
config.maxConnectionsPerIpOverrides,
- socketServerTime,
+ kafkaMetricsTime,
metrics)
socketServer.startup()
@@ -168,7 +164,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator,
- kafkaController, zkClient, config.brokerId, config, metadataCache)
+ kafkaController, zkClient, config.brokerId, config, metadataCache, metrics)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
brokerState.newState(RunningAsBroker)
@@ -362,6 +358,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
CoreUtils.swallow(kafkaController.shutdown())
if(zkClient != null)
CoreUtils.swallow(zkClient.close())
+ if (metrics != null)
+ CoreUtils.swallow(metrics.close())
brokerState.newState(NotRunning)
http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 2e0bbcd..d829e18 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -31,6 +31,7 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.metrics.Metrics
import scala.collection._
@@ -98,7 +99,7 @@ class ReplicaManager(val config: KafkaConfig,
val zkClient: ZkClient,
scheduler: Scheduler,
val logManager: LogManager,
- val isShuttingDown: AtomicBoolean ) extends Logging with KafkaMetricsGroup {
+ val isShuttingDown: AtomicBoolean) extends Logging with KafkaMetricsGroup {
/* epoch of the controller that last changed the leader */
@volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
private val localBrokerId = config.brokerId
@@ -440,7 +441,6 @@ class ReplicaManager(val config: KafkaConfig,
fetchMinBytes: Int,
fetchInfo: immutable.Map[TopicAndPartition, PartitionFetchInfo],
responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) {
-
val isFromFollower = replicaId >= 0
val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/main/scala/kafka/server/ThrottledResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ThrottledResponse.scala b/core/src/main/scala/kafka/server/ThrottledResponse.scala
new file mode 100644
index 0000000..1f80d54
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ThrottledResponse.scala
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.{TimeUnit, Delayed}
+
+import org.apache.kafka.common.utils.Time
+
+
+/**
+ * A response held back for delayTimeMs milliseconds; implements Delayed so instances can be ordered and expired by their end time.
+ * @param time Time instance used to read the current time in milliseconds
+ * @param delayTimeMs how long, in milliseconds counted from construction, the response is delayed
+ * @param callback by-name block that is evaluated every time execute() is called
+ */
+private[server] class ThrottledResponse(val time: Time, val delayTimeMs: Long, callback: => Unit) extends Delayed {
+ val endTime = time.milliseconds + delayTimeMs // absolute expiry timestamp in ms, computed once at construction
+
+ def execute() = callback // evaluates the by-name callback
+
+ override def getDelay(unit: TimeUnit): Long = { // remaining delay expressed in `unit`; zero or negative once endTime has passed
+ unit.convert(endTime - time.milliseconds, TimeUnit.MILLISECONDS)
+ }
+
+ override def compareTo(d: Delayed): Int = { // orders by endTime, earliest expiry first
+ val other = d.asInstanceOf[ThrottledResponse] // NOTE(review): throws ClassCastException if compared with any other Delayed implementation
+ if (this.endTime < other.endTime) -1
+ else if (this.endTime > other.endTime) 1
+ else 0
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/main/scala/kafka/utils/ShutdownableThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ShutdownableThread.scala b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
index fc226c8..dc46797 100644
--- a/core/src/main/scala/kafka/utils/ShutdownableThread.scala
+++ b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
@@ -51,6 +51,9 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
info("Shutdown completed")
}
+ /**
+ * This method is repeatedly invoked until the thread shuts down or this method throws an exception
+ */
def doWork(): Unit
override def run(): Unit = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/test/scala/integration/kafka/api/QuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
new file mode 100644
index 0000000..a11bf90
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
@@ -0,0 +1,194 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package kafka.api
+
+import java.util.Properties
+
+import junit.framework.Assert
+import kafka.consumer.SimpleConsumer
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.{KafkaServer, KafkaConfig}
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer._
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics.KafkaMetric
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+import scala.collection.mutable
+
+class QuotasTest extends KafkaServerTestHarness { // integration test: default byte-rate quotas throttle clients, per-client overrides do not
+ private val producerBufferSize = 300000
+ private val producerId1 = "QuotasTestProducer-1" // subject to the default producer quota
+ private val producerId2 = "QuotasTestProducer-2" // given a Long.MaxValue override below
+ private val consumerId1 = "QuotasTestConsumer-1" // subject to the default consumer quota
+ private val consumerId2 = "QuotasTestConsumer-2" // given a Long.MaxValue override below
+
+ val numServers = 2
+ val overridingProps = new Properties()
+
+ // Low enough quota that a producer sending a small payload in a tight loop should get throttled
+ overridingProps.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "8000")
+ overridingProps.put(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, "2500")
+
+ // Effectively un-throttled: the second producer/consumer client ids are overridden with Long.MaxValue bytes per second
+ overridingProps.put(KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp, producerId2 + "=" + Long.MaxValue)
+ overridingProps.put(KafkaConfig.ConsumerQuotaBytesPerSecondOverridesProp, consumerId2 + "=" + Long.MaxValue)
+
+ override def generateConfigs() = { // one broker config per server, each with the quota properties above applied
+ FixedPortTestUtils.createBrokerConfigs(numServers,
+ zkConnect,
+ enableControlledShutdown = false)
+ .map(KafkaConfig.fromProps(_, overridingProps))
+ }
+
+ var producers = mutable.Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() // index 0 uses producerId1, index 1 uses producerId2
+ var consumers = mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]() // index 0 uses consumerId1, index 1 uses consumerId2
+ var replicaConsumers = mutable.Buffer[SimpleConsumer]() // same client ids as `consumers`, connected to the leader
+
+ var leaderNode: KafkaServer = null
+ var followerNode: KafkaServer = null
+ private val topic1 = "topic-1"
+
+ @Before
+ override def setUp() { // builds two producers, a single-partition topic replicated on both brokers, and two consumer / replica-consumer pairs
+ super.setUp()
+ val producerProps = new Properties()
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+ producerProps.put(ProducerConfig.ACKS_CONFIG, "0")
+ producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")
+ producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferSize.toString)
+ producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, producerId1)
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
+ producers += new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
+
+ producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, producerId2) // same Properties object reused; only the client id changes
+ producers += new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
+
+ val numPartitions = 1
+ val leaders = TestUtils.createTopic(zkClient, topic1, numPartitions, numServers, servers)
+ leaderNode = if (leaders(0).get == servers.head.config.brokerId) servers.head else servers(1) // NOTE(review): .get runs before the isDefined assertion below
+ followerNode = if (leaders(0).get != servers.head.config.brokerId) servers.head else servers(1)
+ assertTrue("Leader of all partitions of the topic should exist", leaders.values.forall(leader => leader.isDefined))
+
+ // Create consumers
+ val consumerProps = new Properties
+ consumerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) // NOTE(review): producer constant on consumer props; presumably the same key that is set again below -- confirm
+ consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "QuotasTest")
+ consumerProps.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
+ consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl)
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
+ consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "range")
+
+ consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId1)
+ consumers += new KafkaConsumer(consumerProps)
+ // Create replica consumers with the same clientId as the high level consumer. These requests should never be throttled
+ replicaConsumers += new SimpleConsumer("localhost", leaderNode.boundPort(), 1000000, 64*1024, consumerId1)
+
+ consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId2)
+ consumers += new KafkaConsumer(consumerProps)
+ replicaConsumers += new SimpleConsumer("localhost", leaderNode.boundPort(), 1000000, 64*1024, consumerId2)
+
+ }
+
+ @After
+ override def tearDown() { // closes every client created in setUp before the harness tears down
+ producers.foreach( _.close )
+ consumers.foreach( _.close )
+ replicaConsumers.foreach( _.close )
+ super.tearDown()
+ }
+
+ @Test
+ def testThrottledProducerConsumer() { // clients on the default quotas must report a positive throttle-time on the leader
+ val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala
+
+ val numRecords = 1000
+ produce(producers.head, numRecords)
+
+ val producerMetricName = new MetricName("throttle-time",
+ RequestKeys.nameForKey(RequestKeys.ProduceKey),
+ "Tracking throttle-time per client",
+ "client-id", producerId1)
+ Assert.assertTrue("Should have been throttled", allMetrics(producerMetricName).value() > 0)
+
+ // Consumer should read in a bursty manner and get throttled immediately
+ consume(consumers.head, numRecords)
+ // The replica consumer should not be throttled either. Create a fetch request which would exceed the quota immediately
+ val request = new FetchRequestBuilder().addFetch(topic1, 0, 0, 1024*1024).replicaId(followerNode.config.brokerId).build() // fetch carries the follower's broker id as replicaId
+ replicaConsumers.head.fetch(request)
+ val consumerMetricName = new MetricName("throttle-time",
+ RequestKeys.nameForKey(RequestKeys.FetchKey),
+ "Tracking throttle-time per client",
+ "client-id", consumerId1)
+ Assert.assertTrue("Should have been throttled", allMetrics(consumerMetricName).value() > 0)
+ }
+
+ @Test
+ def testProducerConsumerOverrideUnthrottled() { // clients with a Long.MaxValue override must still show NaN for throttle-time
+ val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala
+ val numRecords = 1000
+ produce(producers(1), numRecords)
+ val producerMetricName = new MetricName("throttle-time",
+ RequestKeys.nameForKey(RequestKeys.ProduceKey),
+ "Tracking throttle-time per client",
+ "client-id", producerId2)
+ Assert.assertEquals("Should not have been throttled", Double.NaN, allMetrics(producerMetricName).value()) // NaN presumably means no throttle time was ever recorded -- confirm
+
+ // The "client" consumer does not get throttled.
+ consume(consumers(1), numRecords)
+ // The replica consumer should not be throttled either. Create a fetch request which would exceed the quota immediately
+ val request = new FetchRequestBuilder().addFetch(topic1, 0, 0, 1024*1024).replicaId(followerNode.config.brokerId).build() // fetch carries the follower's broker id as replicaId
+ replicaConsumers(1).fetch(request)
+ val consumerMetricName = new MetricName("throttle-time",
+ RequestKeys.nameForKey(RequestKeys.FetchKey),
+ "Tracking throttle-time per client",
+ "client-id", consumerId2)
+ Assert.assertEquals("Should not have been throttled", Double.NaN, allMetrics(consumerMetricName).value())
+ }
+
+ def produce(p: KafkaProducer[Array[Byte], Array[Byte]], count: Int): Int = { // sends records one at a time to topic1 and returns the total payload bytes
+ var numBytesProduced = 0
+ for (i <- 0 to count) { // `to` is inclusive, so count + 1 records are sent
+ val payload = i.toString.getBytes // encoded with the platform default charset
+ numBytesProduced += payload.length
+ p.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, null, null, payload),
+ new ErrorLoggingCallback(topic1, null, null, true)).get() // blocks on the returned future before sending the next record
+ Thread.sleep(1)
+ }
+ numBytesProduced
+ }
+
+ def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int) { // polls topic1 until at least numRecords records are seen; has no timeout
+ consumer.subscribe(topic1)
+ var numConsumed = 0
+ while (numConsumed < numRecords) {
+ for (cr <- consumer.poll(100)) {
+ numConsumed += 1
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
new file mode 100644
index 0000000..97dcca8
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util.Collections
+
+import kafka.api.RequestKeys
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics.{Metrics, Quota, MetricConfig}
+import org.apache.kafka.common.utils.MockTime
+import org.scalatest.junit.JUnit3Suite
+import org.junit.{Before, Test, Assert}
+
+class ClientQuotaManagerTest extends JUnit3Suite { // NOTE(review): JUnit 4 annotations on a JUnit3Suite -- confirm @Before is actually honored by the runner
+ private val time = new MockTime // shared mock clock, advanced manually with time.sleep
+
+ private val config = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500,
+ quotaBytesPerSecondOverrides = "p1=2000,p2=4000")
+
+ var numCallbacks: Int = 0 // number of times `callback` has run
+ def callback {
+ numCallbacks += 1
+ }
+
+ @Before
+ def beforeMethod() {
+ numCallbacks = 0
+ }
+
+ @Test
+ def testQuotaParsing() { // default quota applies to unknown client ids, overrides apply to p1 and p2
+ val clientMetrics = new ClientQuotaManager(config, newMetrics, "producer", time)
+ try {
+ Assert.assertEquals("Default producer quota should be 500",
+ new Quota(500, true), clientMetrics.quota("random-client-id"))
+ Assert.assertEquals("Should return the overridden value (2000)",
+ new Quota(2000, true), clientMetrics.quota("p1"))
+ Assert.assertEquals("Should return the overridden value (4000)",
+ new Quota(4000, true), clientMetrics.quota("p2"))
+ } finally {
+ clientMetrics.shutdown()
+ }
+ }
+
+ @Test
+ def testQuotaViolation() { // stays under quota, then spikes, then checks the delayed callback and the eventual recovery
+ val metrics = newMetrics
+ val clientMetrics = new ClientQuotaManager(config, metrics, "producer", time)
+ val queueSizeMetric = metrics.metrics().get(new MetricName("queue-size", "producer", ""))
+ try {
+ /* We have 10 second windows. Make sure that there is no quota violation
+ * if we produce under the quota
+ */
+ for (i <- 0 until 10) {
+ clientMetrics.recordAndMaybeThrottle("unknown", 400, callback)
+ time.sleep(1000)
+ }
+ Assert.assertEquals(10, numCallbacks)
+ Assert.assertEquals(0, queueSizeMetric.value().toInt)
+
+ // Create a spike.
+ // (400*10 + 2000) / 10 = 6000 / 10 = 600 bytes per second.
+ // (600 - quota)/quota*window-size = (600-500)/500*11 seconds = 2.2 seconds = 2200 ms
+ val sleepTime = clientMetrics.recordAndMaybeThrottle("unknown", 2000, callback)
+ Assert.assertEquals("Should be throttled", 2200, sleepTime)
+ Assert.assertEquals(1, queueSizeMetric.value().toInt)
+ // After a request is delayed, the callback cannot be triggered immediately
+ clientMetrics.throttledRequestReaper.doWork()
+ Assert.assertEquals(10, numCallbacks)
+ time.sleep(sleepTime)
+
+ // Callback can only be triggered after the delay time passes
+ clientMetrics.throttledRequestReaper.doWork()
+ Assert.assertEquals(0, queueSizeMetric.value().toInt)
+ Assert.assertEquals(11, numCallbacks)
+
+ // Could continue to see delays until the bursty sample disappears
+ for (i <- 0 until 10) {
+ clientMetrics.recordAndMaybeThrottle("unknown", 400, callback)
+ time.sleep(1000)
+ }
+
+ Assert.assertEquals("Should be unthrottled since bursty sample has rolled over",
+ 0, clientMetrics.recordAndMaybeThrottle("unknown", 0, callback))
+ } finally {
+ clientMetrics.shutdown()
+ }
+ }
+
+ @Test
+ def testOverrideParse() { // covers the default config, empty override entries, and two malformed override strings
+ var testConfig = ClientQuotaManagerConfig()
+ var clientMetrics = new ClientQuotaManager(testConfig, newMetrics, "consumer", time)
+
+ try {
+ // Case 1 - Default config
+ Assert.assertEquals(new Quota(ClientQuotaManagerConfig.QuotaBytesPerSecondDefault, true),
+ clientMetrics.quota("p1"))
+ } finally {
+ clientMetrics.shutdown()
+ }
+
+
+ // Case 2 - Empty override entries (the trailing commas) must not break parsing
+ testConfig = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500,
+ quotaBytesPerSecondOverrides = "p1=2000,p2=4000,,")
+
+ clientMetrics = new ClientQuotaManager(testConfig, newMetrics, "consumer", time)
+ try {
+ Assert.assertEquals(new Quota(2000, true), clientMetrics.quota("p1"))
+ Assert.assertEquals(new Quota(4000, true), clientMetrics.quota("p2"))
+ } finally {
+ clientMetrics.shutdown()
+ }
+
+ // Case 3 - NumberFormatException for override
+ testConfig = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500,
+ quotaBytesPerSecondOverrides = "p1=2000,p2=4000,p3=p4")
+ try {
+ clientMetrics = new ClientQuotaManager(testConfig, newMetrics, "consumer", time)
+ Assert.fail("Should fail to parse invalid config " + testConfig.quotaBytesPerSecondOverrides)
+ }
+ catch {
+ // Expected: the non-numeric quota value "p4" is rejected.
+ case nfe: NumberFormatException =>
+ }
+
+ // Case 4 - IllegalArgumentException for override
+ testConfig = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500,
+ quotaBytesPerSecondOverrides = "p1=2000=3000")
+ try {
+ clientMetrics = new ClientQuotaManager(testConfig, newMetrics, "producer", time)
+ Assert.fail("Should fail to parse invalid config " + testConfig.quotaBytesPerSecondOverrides)
+ }
+ catch {
+ // Expected: an entry with more than one '=' is rejected.
+ case nfe: IllegalArgumentException =>
+ }
+
+ }
+
+ def newMetrics: Metrics = { // fresh Metrics registry with no reporters, driven by the mock clock
+ new Metrics(new MetricConfig(), Collections.emptyList(), time)
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index e26a730..9688b8c 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -474,6 +474,12 @@ class KafkaConfigTest {
case KafkaConfig.OffsetsRetentionCheckIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.OffsetCommitTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.OffsetCommitRequiredAcksProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2")
+ case KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+ case KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+ case KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp => // ignore string
+ case KafkaConfig.ConsumerQuotaBytesPerSecondOverridesProp => // ignore string
+ case KafkaConfig.NumQuotaSamplesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+ case KafkaConfig.QuotaWindowSizeSecondsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.DeleteTopicEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
new file mode 100644
index 0000000..14a7f45
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+
+import java.util.Collections
+import java.util.concurrent.{TimeUnit, DelayQueue}
+
+import org.apache.kafka.common.metrics.MetricConfig
+import org.apache.kafka.common.utils.MockTime
+import org.junit.{AfterClass, Before, Assert, Test}
+import org.scalatest.junit.JUnit3Suite
+
+// Unit tests for ThrottledResponse delay accounting and for draining a DelayQueue via ThrottledRequestReaper.
+class ThrottledResponseExpirationTest extends JUnit3Suite {
+  private val time = new MockTime
+  private var numCallbacks: Int = 0
+  private val metrics = new org.apache.kafka.common.metrics.Metrics(new MetricConfig(),
+                                                                    Collections.emptyList(),
+                                                                    time)
+  // Handed to every ThrottledResponse created below; counts how many times it has been invoked.
+  def callback {
+    numCallbacks += 1
+  }
+  // JUnit3Suite runs as a JUnit 3 TestCase: fixtures must override setUp(); an @Before method is never invoked.
+  override def setUp() {
+    super.setUp()
+    numCallbacks = 0
+  }
+  // Advances the mock clock in 10 ms steps and checks that each doWork() call fires exactly one expired callback.
+  @Test
+  def testExpire() {
+    val clientMetrics = new ClientQuotaManager(ClientQuotaManagerConfig(), metrics, "producer", time)
+
+    val delayQueue = new DelayQueue[ThrottledResponse]()
+    val reaper = new clientMetrics.ThrottledRequestReaper(delayQueue)
+    try {
+      // Add 4 elements to the queue out of order. Add 2 elements with the same expire timestamp
+      delayQueue.add(new ThrottledResponse(time, 10, callback))
+      delayQueue.add(new ThrottledResponse(time, 30, callback))
+      delayQueue.add(new ThrottledResponse(time, 30, callback))
+      delayQueue.add(new ThrottledResponse(time, 20, callback))
+
+      for(itr <- 1 to 3) {
+        time.sleep(10)
+        reaper.doWork()
+        Assert.assertEquals(itr, numCallbacks)
+      }
+      reaper.doWork() // no sleep needed: the second 30 ms entry is already due
+      Assert.assertEquals(4, numCallbacks)
+      Assert.assertEquals(0, delayQueue.size())
+      reaper.doWork() // queue is empty, so the count must stay at 4
+      Assert.assertEquals(4, numCallbacks)
+    } finally {
+      clientMetrics.shutdown()
+    }
+  }
+  // Checks delayTimeMs and that getDelay drops 10 ms per 10 ms of mock time, going negative once overdue.
+  @Test
+  def testThrottledRequest() {
+    val t1: ThrottledResponse = new ThrottledResponse(time, 10, callback)
+    val t2: ThrottledResponse = new ThrottledResponse(time, 20, callback)
+    val t3: ThrottledResponse = new ThrottledResponse(time, 20, callback)
+    Assert.assertEquals(10, t1.delayTimeMs)
+    Assert.assertEquals(20, t2.delayTimeMs)
+    Assert.assertEquals(20, t3.delayTimeMs)
+
+    for(itr <- 0 to 2) {
+      Assert.assertEquals(10 - 10*itr, t1.getDelay(TimeUnit.MILLISECONDS))
+      Assert.assertEquals(20 - 10*itr, t2.getDelay(TimeUnit.MILLISECONDS))
+      Assert.assertEquals(20 - 10*itr, t3.getDelay(TimeUnit.MILLISECONDS))
+      time.sleep(10)
+    }
+  }
+}