You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/05/01 16:13:39 UTC
[2/4] kafka git commit: KAFKA-4954; Request handler utilization quotas
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index b9bf3e4..b2a3456 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -419,6 +419,7 @@ private[kafka] class Processor(val id: Int,
"socket-server",
metricTags,
false,
+ true,
ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialProvider.credentialCache))
override def run() {
@@ -457,7 +458,7 @@ private[kafka] class Processor(val id: Int,
case RequestChannel.NoOpAction =>
// There is no response to send to the client, we need to read more pipelined requests
// that are sitting in the server's socket buffer
- curr.request.updateRequestMetrics
+ updateRequestMetrics(curr.request)
trace("Socket server received empty response to send, registering for read: " + curr)
val channelId = curr.request.connectionId
if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null)
@@ -465,7 +466,7 @@ private[kafka] class Processor(val id: Int,
case RequestChannel.SendAction =>
sendResponse(curr)
case RequestChannel.CloseConnectionAction =>
- curr.request.updateRequestMetrics
+ updateRequestMetrics(curr.request)
trace("Closing socket connection actively according to the response code.")
close(selector, curr.request.connectionId)
}
@@ -482,7 +483,7 @@ private[kafka] class Processor(val id: Int,
// `channel` can be null if the selector closed the connection because it was idle for too long
if (channel == null) {
// Report the request's connection id; Processor.id identifies the processor, not the connection.
warn(s"Attempting to send response via channel for which there is no open connection, connection id ${response.request.connectionId}")
- response.request.updateRequestMetrics()
+ response.request.updateRequestMetrics(0L)
}
else {
selector.send(response.responseSend)
@@ -505,14 +506,13 @@ private[kafka] class Processor(val id: Int,
selector.completedReceives.asScala.foreach { receive =>
try {
val openChannel = selector.channel(receive.source)
- val session = {
- // Only methods that are safe to call on a disconnected channel should be invoked on 'channel'.
- val channel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
- RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress)
- }
+ // Only methods that are safe to call on a disconnected channel should be invoked on 'openOrClosingChannel'.
+ val openOrClosingChannel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
+ val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)
+
val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
- buffer = receive.payload, startTimeMs = time.milliseconds, listenerName = listenerName,
- securityProtocol = securityProtocol)
+ buffer = receive.payload, startTimeNanos = time.nanoseconds,
+ listenerName = listenerName, securityProtocol = securityProtocol)
requestChannel.sendRequest(req)
selector.mute(receive.source)
} catch {
@@ -529,17 +529,24 @@ private[kafka] class Processor(val id: Int,
val resp = inflightResponses.remove(send.destination).getOrElse {
throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
}
- resp.request.updateRequestMetrics()
+ updateRequestMetrics(resp.request)
selector.unmute(send.destination)
}
}
+ /**
+  * Updates the metrics of `request`, handing over the network thread time read (and reset)
+  * from the request's channel. The open channel is preferred; the closing channel is used when
+  * no open channel is registered for the connection, and 0 is reported when neither is found.
+  */
+ private def updateRequestMetrics(request: RequestChannel.Request) {
+   val connectionId = request.connectionId
+   val networkThreadTimeNanos = Option(selector.channel(connectionId))
+     .orElse(Option(selector.closingChannel(connectionId)))
+     .map(_.getAndResetNetworkThreadTimeNanos())
+     .getOrElse(0L)
+   request.updateRequestMetrics(networkThreadTimeNanos)
+ }
+
private def processDisconnected() {
selector.disconnected.asScala.foreach { connectionId =>
val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
}.remoteHost
- inflightResponses.remove(connectionId).foreach(_.request.updateRequestMetrics())
+ inflightResponses.remove(connectionId).foreach(response => updateRequestMetrics(response.request))
// the channel has been closed by the selector but the quotas still need to be updated
connectionQuotas.dec(InetAddress.getByName(remoteHost))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 84772db..04f5239 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -31,10 +31,11 @@ import scala.collection.JavaConverters._
/**
* Represents the sensors aggregated per client
+ * @param quotaEntity Quota entity representing <client-id>, <user> or <user, client-id>
* @param quotaSensor @Sensor that tracks the quota
* @param throttleTimeSensor @Sensor that tracks the throttle time
*/
-private case class ClientSensors(quotaSensor: Sensor, throttleTimeSensor: Sensor)
+case class ClientSensors(quotaEntity: QuotaEntity, quotaSensor: Sensor, throttleTimeSensor: Sensor)
/**
* Configuration settings for quota management
@@ -58,6 +59,8 @@ object ClientQuotaManagerConfig {
val DefaultQuotaWindowSizeSeconds = 1
// Purge sensors after 1 hour of inactivity
val InactiveSensorExpirationTimeSeconds = 3600
+ val QuotaRequestPercentDefault = Int.MaxValue.toDouble
+ val NanosToPercentagePerSecond = 100.0 / TimeUnit.SECONDS.toNanos(1)
val UnlimitedQuota = Quota.upperBound(Long.MaxValue)
val DefaultClientIdQuotaId = QuotaId(None, Some(ConfigEntityName.Default))
@@ -126,12 +129,12 @@ case class QuotaEntity(quotaId: QuotaId, sanitizedUser: String, clientId: String
*
* @param config @ClientQuotaManagerConfig quota configs
* @param metrics @Metrics Metrics instance
- * @param apiKey API Key for the request
+ * @param quotaType Quota type of this quota manager
* @param time @Time object to use
*/
-final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
+class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
private val metrics: Metrics,
- private val apiKey: QuotaType,
+ private val quotaType: QuotaType,
private val time: Time) extends Logging {
private val overriddenQuota = new ConcurrentHashMap[QuotaId, Quota]()
private val staticConfigClientIdQuota = Quota.upperBound(config.quotaBytesPerSecondDefault)
@@ -140,19 +143,22 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
private val delayQueue = new DelayQueue[ThrottledResponse]()
private val sensorAccessor = new SensorAccess
val throttledRequestReaper = new ThrottledRequestReaper(delayQueue)
- throttledRequestReaper.start()
- private val delayQueueSensor = metrics.sensor(apiKey + "-delayQueue")
+ private val delayQueueSensor = metrics.sensor(quotaType + "-delayQueue")
delayQueueSensor.add(metrics.metricName("queue-size",
- apiKey.toString,
+ quotaType.toString,
"Tracks the size of the delay queue"), new Total())
+ start() // Use start method to keep findbugs happy
+ private def start() {
+ throttledRequestReaper.start()
+ }
/**
* Reaper thread that triggers callbacks on all throttled requests
* @param delayQueue DelayQueue to dequeue from
*/
class ThrottledRequestReaper(delayQueue: DelayQueue[ThrottledResponse]) extends ShutdownableThread(
- "ThrottledRequestReaper-%s".format(apiKey), false) {
+ "ThrottledRequestReaper-%s".format(quotaType), false) {
override def doWork(): Unit = {
val response: ThrottledResponse = delayQueue.poll(1, TimeUnit.SECONDS)
@@ -166,17 +172,23 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}
/**
- * Records that a clientId changed some metric being throttled (produced/consumed bytes, QPS etc.)
- * @param clientId clientId that produced the data
- * @param value amount of data written in bytes
+ * Records that a user/clientId changed some metric being throttled (produced/consumed bytes, request processing time etc.)
+ * If quota has been violated, callback is invoked after a delay, otherwise the callback is invoked immediately.
+ * Throttle time calculation may be overridden by sub-classes.
+ * @param sanitizedUser user principal of client
+ * @param clientId clientId that produced/fetched the data
+ * @param value amount of data in bytes or request processing time as a percentage
* @param callback Callback function. This will be triggered immediately if quota is not violated.
* If there is a quota violation, this callback will be triggered after a delay
* @return Number of milliseconds to delay the response in case of Quota violation.
* Zero otherwise
*/
- def recordAndMaybeThrottle(sanitizedUser: String, clientId: String, value: Int, callback: Int => Unit): Int = {
- val clientQuotaEntity = quotaEntity(sanitizedUser, clientId)
- val clientSensors = getOrCreateQuotaSensors(clientQuotaEntity)
+ def recordAndMaybeThrottle(sanitizedUser: String, clientId: String, value: Double, callback: Int => Unit): Int =
+   // Resolve the sensors for this user/client-id, then delegate the recording and throttling decision.
+   recordAndThrottleOnQuotaViolation(getOrCreateQuotaSensors(sanitizedUser, clientId), value, callback)
+
+ def recordAndThrottleOnQuotaViolation(clientSensors: ClientSensors, value: Double, callback: Int => Unit): Int = {
var throttleTimeMs = 0
try {
clientSensors.quotaSensor.record(value)
@@ -185,8 +197,9 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
} catch {
case _: QuotaViolationException =>
// Compute the delay
+ val clientQuotaEntity = clientSensors.quotaEntity
val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId))
- throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota))
+ throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota)).round.toInt
clientSensors.throttleTimeSensor.record(throttleTimeMs)
// If delayed, add the element to the delayQueue
delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
@@ -197,6 +210,15 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}
/**
+ * Records that a user/clientId changed some metric being throttled without checking for
+ * quota violation. The aggregate value will subsequently be used for throttling when the
+ * next request is processed.
+ */
+ def recordNoThrottle(clientSensors: ClientSensors, value: Double) {
+   val sensor = clientSensors.quotaSensor
+   // The trailing `false` is what distinguishes this from the throttling path's record(value).
+   sensor.record(value, time.milliseconds(), false)
+ }
+
+ /**
* Determines the quota-id for the client with the specified user principal
* and client-id and returns the quota entity that encapsulates the quota-id
* and the associated quota override or default quota.
@@ -325,13 +347,13 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
* we need to add a delay of X to W such that O * W / (W + X) = T.
* Solving for X, we get X = (O - T)/T * W.
*/
- private def throttleTime(clientMetric: KafkaMetric, config: MetricConfig): Int = {
+ protected def throttleTime(clientMetric: KafkaMetric, config: MetricConfig): Long = {
val rateMetric: Rate = measurableAsRate(clientMetric.metricName(), clientMetric.measurable())
val quota = config.quota()
val difference = clientMetric.value() - quota.bound
// Use the precise window used by the rate calculation
val throttleTimeMs = difference / quota.bound * rateMetric.windowSize(config, time.milliseconds())
- throttleTimeMs.round.toInt
+ throttleTimeMs.round
}
// Casting to Rate because we only use Rate in Quota computation
@@ -346,39 +368,54 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
* This function either returns the sensors for a given client id or creates them if they don't exist
* First sensor of the tuple is the quota enforcement sensor. Second one is the throttle time sensor
*/
- private def getOrCreateQuotaSensors(quotaEntity: QuotaEntity): ClientSensors = {
+ def getOrCreateQuotaSensors(sanitizedUser: String, clientId: String): ClientSensors = {
+ val clientQuotaEntity = quotaEntity(sanitizedUser, clientId)
// Names of the sensors to access
ClientSensors(
+ clientQuotaEntity,
sensorAccessor.getOrCreate(
- getQuotaSensorName(quotaEntity.quotaId),
+ getQuotaSensorName(clientQuotaEntity.quotaId),
ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
lock, metrics,
- () => clientRateMetricName(quotaEntity.sanitizedUser, quotaEntity.clientId),
- () => getQuotaMetricConfig(quotaEntity.quota),
- () => new Rate()
+ () => clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId),
+ () => getQuotaMetricConfig(clientQuotaEntity.quota),
+ () => measurableStat
),
- sensorAccessor.getOrCreate(getThrottleTimeSensorName(quotaEntity.quotaId),
+ sensorAccessor.getOrCreate(getThrottleTimeSensorName(clientQuotaEntity.quotaId),
ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
lock,
metrics,
- () => throttleMetricName(quotaEntity),
+ () => throttleMetricName(clientQuotaEntity),
() => null,
() => new Avg()
)
)
}
- private def getThrottleTimeSensorName(quotaId: QuotaId): String = apiKey + "ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
+ private def measurableStat: MeasurableStat = new Rate()
+
+ private def getThrottleTimeSensorName(quotaId: QuotaId): String = quotaType + "ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
- private def getQuotaSensorName(quotaId: QuotaId): String = apiKey + "-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
+ private def getQuotaSensorName(quotaId: QuotaId): String = quotaType + "-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
- private def getQuotaMetricConfig(quota: Quota): MetricConfig = {
+ protected def getQuotaMetricConfig(quota: Quota): MetricConfig = {
new MetricConfig()
.timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
.samples(config.numQuotaSamples)
.quota(quota)
}
+ /**
+  * Obtains the sensor named `sensorName` through `sensorAccessor.getOrCreate`, using the
+  * inactive-sensor expiration time, the given metric name, no metric config (null) and this
+  * manager's measurable stat.
+  *
+  * @param sensorName name under which the sensor is looked up
+  * @param metricName name of the metric supplied for the sensor
+  * @return the sensor returned by the accessor
+  */
+ protected def createSensor(sensorName: String, metricName: MetricName): Sensor = {
+   val expirationSeconds = ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds
+   sensorAccessor.getOrCreate(sensorName, expirationSeconds, lock, metrics,
+     () => metricName, () => null, () => measurableStat)
+ }
+
/**
* Overrides quotas for <user>, <client-id> or <user, client-id> or the dynamic defaults
* for any of these levels.
@@ -409,7 +446,7 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}
quota match {
case Some(newQuota) =>
- logger.info(s"Changing ${apiKey} quota for ${userInfo}${clientIdInfo} to ${newQuota.bound}")
+ logger.info(s"Changing ${quotaType} quota for ${userInfo}${clientIdInfo} to ${newQuota.bound}")
overriddenQuota.put(quotaId, newQuota)
(sanitizedUser, clientId) match {
case (Some(_), Some(_)) => quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
@@ -418,7 +455,7 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
case (None, None) =>
}
case None =>
- logger.info(s"Removing ${apiKey} quota for ${userInfo}${clientIdInfo}")
+ logger.info(s"Removing ${quotaType} quota for ${userInfo}${clientIdInfo}")
overriddenQuota.remove(quotaId)
}
@@ -460,8 +497,8 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}
}
- private def clientRateMetricName(sanitizedUser: String, clientId: String): MetricName = {
- metrics.metricName("byte-rate", apiKey.toString,
+ protected def clientRateMetricName(sanitizedUser: String, clientId: String): MetricName = {
+ metrics.metricName("byte-rate", quotaType.toString,
"Tracking byte-rate per user/client-id",
"user", sanitizedUser,
"client-id", clientId)
@@ -469,7 +506,7 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
private def throttleMetricName(quotaEntity: QuotaEntity): MetricName = {
metrics.metricName("throttle-time",
- apiKey.toString,
+ quotaType.toString,
"Tracking average throttle-time per user/client-id",
"user", quotaEntity.sanitizedUser,
"client-id", quotaEntity.clientId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
new file mode 100644
index 0000000..7e80be6
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics._
+import org.apache.kafka.common.utils.Time
+
+
+/**
+ * Quota manager for request-time quotas (`QuotaType.Request`).
+ *
+ * Reuses the recording and throttling machinery of [[ClientQuotaManager]], but reports client
+ * usage under a "request-time" metric, caps the throttle time at one quota window, and keeps a
+ * separate sensor for values recorded as exempt.
+ *
+ * @param config  quota configuration; its `quotaWindowSizeSeconds` bounds the throttle time
+ * @param metrics metrics registry used to build the metric names
+ * @param time    time source passed through to [[ClientQuotaManager]]
+ */
+class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
+                                private val metrics: Metrics,
+                                private val time: Time) extends ClientQuotaManager(config, metrics, QuotaType.Request, time) {
+
+  /** Upper bound on the throttle time, in milliseconds: the length of one quota window. */
+  val maxThrottleTimeMs: Long = TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds)
+
+  /** Sensor that receives the values passed to `recordExempt`. */
+  val exemptSensor: Sensor = createSensor(exemptSensorName, exemptMetricName)
+
+  /**
+   * Records `value` on the exempt sensor.
+   *
+   * @param value amount to record
+   */
+  def recordExempt(value: Double): Unit = {
+    exemptSensor.record(value)
+  }
+
+  /**
+   * Throttle time computed by the parent class, capped at `maxThrottleTimeMs`.
+   *
+   * @return the smaller of the parent's throttle time and `maxThrottleTimeMs`
+   */
+  override protected def throttleTime(clientMetric: KafkaMetric, config: MetricConfig): Long = {
+    math.min(super.throttleTime(clientMetric, config), maxThrottleTimeMs)
+  }
+
+  /** Metric name ("request-time") under which per user/client-id request time is tracked. */
+  override protected def clientRateMetricName(sanitizedUser: String, clientId: String): MetricName = {
+    metrics.metricName("request-time", QuotaType.Request.toString,
+      "Tracking request-time per user/client-id",
+      "user", sanitizedUser,
+      "client-id", clientId)
+  }
+
+  /** Metric name ("exempt-request-time") used for the exempt sensor; it carries no tags. */
+  private def exemptMetricName: MetricName = {
+    metrics.metricName("exempt-request-time", QuotaType.Request.toString,
+      "Tracking exempt-request-time utilization percentage")
+  }
+
+  /** Name under which the exempt sensor is registered. */
+  private def exemptSensorName: String = "exempt-" + QuotaType.Request
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 8d6de8c..2483199 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -130,6 +130,12 @@ class QuotaConfigHandler(private val quotaManagers: QuotaManagers) {
else
None
quotaManagers.fetch.updateQuota(sanitizedUser, clientId, consumerQuota)
+ val requestQuota =
+ if (config.containsKey(DynamicConfig.Client.RequestPercentageOverrideProp))
+ Some(new Quota(config.getProperty(DynamicConfig.Client.RequestPercentageOverrideProp).toDouble, true))
+ else
+ None
+ quotaManagers.request.updateQuota(sanitizedUser, clientId, requestQuota)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/DynamicConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala
index e68f921..175bf62 100644
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfig.scala
@@ -63,19 +63,23 @@ object DynamicConfig {
//Properties
val ProducerByteRateOverrideProp = "producer_byte_rate"
val ConsumerByteRateOverrideProp = "consumer_byte_rate"
+ val RequestPercentageOverrideProp = "request_percentage"
//Defaults
val DefaultProducerOverride = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
val DefaultConsumerOverride = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
+ val DefaultRequestOverride = ClientQuotaManagerConfig.QuotaRequestPercentDefault
//Documentation
val ProducerOverrideDoc = "A rate representing the upper bound (bytes/sec) for producer traffic."
val ConsumerOverrideDoc = "A rate representing the upper bound (bytes/sec) for consumer traffic."
+ val RequestOverrideDoc = "A percentage representing the upper bound of time spent for processing requests."
//Definitions
private val clientConfigs = new ConfigDef()
.define(ProducerByteRateOverrideProp, LONG, DefaultProducerOverride, MEDIUM, ProducerOverrideDoc)
.define(ConsumerByteRateOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ConsumerOverrideDoc)
+ .define(RequestPercentageOverrideProp, DOUBLE, DefaultRequestOverride, MEDIUM, RequestOverrideDoc)
def names = clientConfigs.names
@@ -88,6 +92,7 @@ object DynamicConfig {
private val userConfigs = CredentialProvider.userCredentialConfigs
.define(Client.ProducerByteRateOverrideProp, LONG, Client.DefaultProducerOverride, MEDIUM, Client.ProducerOverrideDoc)
.define(Client.ConsumerByteRateOverrideProp, LONG, Client.DefaultConsumerOverride, MEDIUM, Client.ConsumerOverrideDoc)
+ .define(Client.RequestPercentageOverrideProp, DOUBLE, Client.DefaultRequestOverride, MEDIUM, Client.RequestOverrideDoc)
def names = userConfigs.names
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 59f062d..1e1f0d5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -118,24 +118,10 @@ class KafkaApis(val requestChannel: RequestChannel,
}
} catch {
case e: FatalExitError => throw e
- case e: Throwable =>
- if (request.requestObj != null) {
- request.requestObj.handleError(e, requestChannel, request)
- error("Error when handling request %s".format(request.requestObj), e)
- } else {
- val response = request.body[AbstractRequest].getErrorResponse(e)
-
- /* If request doesn't have a default error response, we just close the connection.
- For example, when produce request has acks set to 0 */
- if (response == null)
- requestChannel.closeConnection(request.processor, request)
- else
- requestChannel.sendResponse(new Response(request, response))
-
- error("Error when handling request %s".format(request.body[AbstractRequest]), e)
- }
- } finally
- request.apiLocalCompleteTimeMs = time.milliseconds
+ case e: Throwable => handleError(request, e)
+ } finally {
+ request.apiLocalCompleteTimeNanos = time.nanoseconds
+ }
}
def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
@@ -165,16 +151,15 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- val leaderAndIsrResponse =
- if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
- val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange)
- new LeaderAndIsrResponse(result.error, result.responseMap.asJava)
- } else {
- val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
- new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)
- }
-
- requestChannel.sendResponse(new Response(request, leaderAndIsrResponse))
+ if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+ val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange)
+ val leaderAndIsrResponse = new LeaderAndIsrResponse(result.error, result.responseMap.asJava)
+ sendResponseExemptThrottle(request, new Response(request, leaderAndIsrResponse))
+ } else {
+ val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)
+ sendResponseMaybeThrottle(request, createResponse)
+ }
} catch {
case e: FatalExitError => throw e
case e: KafkaStorageException =>
@@ -189,27 +174,27 @@ class KafkaApis(val requestChannel: RequestChannel,
// stop serving data to clients for the topic being deleted
val stopReplicaRequest = request.body[StopReplicaRequest]
- val response =
- if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
- val (result, error) = replicaManager.stopReplicas(stopReplicaRequest)
- // Clearing out the cache for groups that belong to an offsets topic partition for which this broker was the leader,
- // since this broker is no longer a replica for that offsets topic partition.
- // This is required to handle the following scenario :
- // Consider old replicas : {[1,2,3], Leader = 1} is reassigned to new replicas : {[2,3,4], Leader = 2}, broker 1 does not receive a LeaderAndIsr
- // request to become a follower due to which cache for groups that belong to an offsets topic partition for which broker 1 was the leader,
- // is not cleared.
- result.foreach { case (topicPartition, error) =>
- if (error == Errors.NONE && stopReplicaRequest.deletePartitions() && topicPartition.topic == GroupMetadataTopicName) {
- groupCoordinator.handleGroupEmigration(topicPartition.partition)
- }
+ if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+ val (result, error) = replicaManager.stopReplicas(stopReplicaRequest)
+ // Clearing out the cache for groups that belong to an offsets topic partition for which this broker was the leader,
+ // since this broker is no longer a replica for that offsets topic partition.
+ // This is required to handle the following scenario :
+ // Consider old replicas : {[1,2,3], Leader = 1} is reassigned to new replicas : {[2,3,4], Leader = 2}, broker 1 does not receive a LeaderAndIsr
+ // request to become a follower due to which cache for groups that belong to an offsets topic partition for which broker 1 was the leader,
+ // is not cleared.
+ result.foreach { case (topicPartition, error) =>
+ if (error == Errors.NONE && stopReplicaRequest.deletePartitions() && topicPartition.topic == GroupMetadataTopicName) {
+ groupCoordinator.handleGroupEmigration(topicPartition.partition)
}
- new StopReplicaResponse(error, result.asJava)
- } else {
- val result = stopReplicaRequest.partitions.asScala.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
- new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)
}
+ val response = new StopReplicaResponse(error, result.asJava)
+ sendResponseExemptThrottle(request, new Response(request, response))
+ } else {
+ val result = stopReplicaRequest.partitions.asScala.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)
+ sendResponseMaybeThrottle(request, createResponse)
+ }
- requestChannel.sendResponse(new RequestChannel.Response(request, response))
replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()
}
@@ -217,23 +202,21 @@ class KafkaApis(val requestChannel: RequestChannel,
val correlationId = request.header.correlationId
val updateMetadataRequest = request.body[UpdateMetadataRequest]
- val updateMetadataResponse =
- if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
- val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
- if (deletedPartitions.nonEmpty)
- groupCoordinator.handleDeletedPartitions(deletedPartitions)
+ if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+ val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
+ if (deletedPartitions.nonEmpty)
+ groupCoordinator.handleDeletedPartitions(deletedPartitions)
- if (adminManager.hasDelayedTopicOperations) {
- updateMetadataRequest.partitionStates.keySet.asScala.map(_.topic).foreach { topic =>
- adminManager.tryCompleteDelayedTopicOperations(topic)
- }
+ if (adminManager.hasDelayedTopicOperations) {
+ updateMetadataRequest.partitionStates.keySet.asScala.map(_.topic).foreach { topic =>
+ adminManager.tryCompleteDelayedTopicOperations(topic)
}
- new UpdateMetadataResponse(Errors.NONE)
- } else {
- new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED)
}
-
- requestChannel.sendResponse(new Response(request, updateMetadataResponse))
+ sendResponseExemptThrottle(request, new Response(request, new UpdateMetadataResponse(Errors.NONE)))
+ } else {
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED)
+ sendResponseMaybeThrottle(request, createResponse)
+ }
}
def handleControlledShutdownRequest(request: RequestChannel.Request) {
@@ -249,9 +232,9 @@ class KafkaApis(val requestChannel: RequestChannel,
case Success(partitionsRemaining) =>
val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId,
Errors.NONE, partitionsRemaining)
- requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, controlledShutdownResponse)))
+ sendResponseExemptThrottle(request, new Response(request, new RequestOrResponseSend(request.connectionId, controlledShutdownResponse)))
case Failure(throwable) =>
- controlledShutdownRequest.handleError(throwable, requestChannel, request)
+ sendResponseExemptThrottle(request, () => controlledShutdownRequest.handleError(throwable, requestChannel, request))
}
}
controller.shutdownBroker(controlledShutdownRequest.brokerId, controlledShutdownCallback)
@@ -270,8 +253,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val results = offsetCommitRequest.offsetData.keySet.asScala.map { topicPartition =>
(topicPartition, error)
}.toMap
- val response = new OffsetCommitResponse(results.asJava)
- requestChannel.sendResponse(new RequestChannel.Response(request, response))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new OffsetCommitResponse(throttleTimeMs, results.asJava)
+ sendResponseMaybeThrottle(request, createResponse)
} else {
val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = offsetCommitRequest.offsetData.asScala.toMap.partition {
case (topicPartition, _) =>
@@ -300,8 +283,8 @@ class KafkaApis(val requestChannel: RequestChannel,
s"on partition $topicPartition failed due to ${error.exceptionName}")
}
}
- val response = new OffsetCommitResponse(combinedCommitStatus.asJava)
- requestChannel.sendResponse(new RequestChannel.Response(request, response))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new OffsetCommitResponse(throttleTimeMs, combinedCommitStatus.asJava)
+ sendResponseMaybeThrottle(request, createResponse)
}
if (authorizedTopics.isEmpty)
@@ -407,7 +390,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- def produceResponseCallback(delayTimeMs: Int) {
+ def produceResponseCallback(bandwidthThrottleTimeMs: Int) {
if (produceRequest.acks == 0) {
// no operation needed if producer request.required.acks = 0; however, if there is any error in handling
// the request, since no response is expected by the producer, the server will close socket server so that
@@ -426,13 +409,15 @@ class KafkaApis(val requestChannel: RequestChannel,
requestChannel.noOperation(request.processor, request)
}
} else {
- val respBody = new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs)
- requestChannel.sendResponse(new RequestChannel.Response(request, respBody))
+ def createResponseCallback(requestThrottleTimeMs: Int): AbstractResponse = {
+ new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleTimeMs)
+ }
+ sendResponseMaybeThrottle(request, createResponseCallback)
}
}
// When this callback is triggered, the remote API call has completed
- request.apiRemoteCompleteTimeMs = time.milliseconds
+ request.apiRemoteCompleteTimeNanos = time.nanoseconds
quotas.produce.recordAndMaybeThrottle(
request.session.sanitizedUser,
@@ -534,20 +519,29 @@ class KafkaApis(val requestChannel: RequestChannel,
val response = new FetchResponse(fetchedPartitionData, 0)
val responseStruct = response.toStruct(versionId)
- def fetchResponseCallback(throttleTimeMs: Int) {
- trace(s"Sending fetch response to client $clientId of ${responseStruct.sizeOf} bytes.")
- val responseSend = response.toSend(responseStruct, throttleTimeMs, request.connectionId, request.header)
- requestChannel.sendResponse(new RequestChannel.Response(request, responseSend))
+ def fetchResponseCallback(bandwidthThrottleTimeMs: Int) {
+ def createResponse(requestThrottleTimeMs: Int): RequestChannel.Response = {
+ trace(s"Sending fetch response to client $clientId of ${responseStruct.sizeOf} bytes.")
+ val responseSend = response.toSend(responseStruct, bandwidthThrottleTimeMs + requestThrottleTimeMs, request.connectionId, request.header)
+ new RequestChannel.Response(request, responseSend)
+ }
+ def sendResponseCallback(requestThrottleTimeMs: Int) {
+ requestChannel.sendResponse(createResponse(requestThrottleTimeMs))
+ }
+ if (fetchRequest.isFromFollower)
+ sendResponseExemptThrottle(request, createResponse(0))
+ else
+ sendResponseMaybeThrottle(request, request.header.clientId, sendResponseCallback)
}
// When this callback is triggered, the remote API call has completed
- request.apiRemoteCompleteTimeMs = time.milliseconds
+ request.apiRemoteCompleteTimeNanos = time.nanoseconds
if (fetchRequest.isFromFollower) {
// We've already evaluated against the quota and are good to go. Just need to record it now.
val responseSize = sizeOfThrottledPartitions(versionId, fetchRequest, mergedPartitionData, quotas.leader)
quotas.leader.record(responseSize)
- fetchResponseCallback(throttleTimeMs = 0)
+ fetchResponseCallback(bandwidthThrottleTimeMs = 0)
} else {
quotas.fetch.recordAndMaybeThrottle(request.session.sanitizedUser, clientId, responseStruct.sizeOf,
fetchResponseCallback)
@@ -597,8 +591,8 @@ class KafkaApis(val requestChannel: RequestChannel,
else
handleListOffsetRequestV1(request)
- val response = new ListOffsetResponse(mergedResponseMap.asJava)
- requestChannel.sendResponse(new RequestChannel.Response(request, response))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new ListOffsetResponse(throttleTimeMs, mergedResponseMap.asJava)
+ sendResponseMaybeThrottle(request, createResponse)
}
private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = {
@@ -925,13 +919,15 @@ class KafkaApis(val requestChannel: RequestChannel,
trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(completeTopicMetadata.mkString(","),
brokers.mkString(","), request.header.correlationId, request.header.clientId))
- val responseBody = new MetadataResponse(
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new MetadataResponse(
+ throttleTimeMs,
brokers.map(_.getNode(request.listenerName)).asJava,
clusterId,
metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
completeTopicMetadata.asJava
)
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+
+ sendResponseMaybeThrottle(request, createResponse)
}
/**
@@ -944,68 +940,70 @@ class KafkaApis(val requestChannel: RequestChannel,
def authorizeTopicDescribe(partition: TopicPartition) =
authorize(request.session, Describe, new Resource(Topic, partition.topic))
- val offsetFetchResponse =
- // reject the request if not authorized to the group
- if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId)))
- offsetFetchRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED)
- else {
- if (header.apiVersion == 0) {
- val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
- .partition(authorizeTopicDescribe)
+ def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ val offsetFetchResponse =
+ // reject the request if not authorized to the group
+ if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId)))
+ offsetFetchRequest.getErrorResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED)
+ else {
+ if (header.apiVersion == 0) {
+ val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
+ .partition(authorizeTopicDescribe)
- // version 0 reads offsets from ZK
- val authorizedPartitionData = authorizedPartitions.map { topicPartition =>
- val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic)
- try {
- if (!metadataCache.contains(topicPartition.topic))
- (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
- else {
- val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
- payloadOpt match {
- case Some(payload) =>
- (topicPartition, new OffsetFetchResponse.PartitionData(
- payload.toLong, OffsetFetchResponse.NO_METADATA, Errors.NONE))
- case None =>
- (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
+ // version 0 reads offsets from ZK
+ val authorizedPartitionData = authorizedPartitions.map { topicPartition =>
+ val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic)
+ try {
+ if (!metadataCache.contains(topicPartition.topic))
+ (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
+ else {
+ val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
+ payloadOpt match {
+ case Some(payload) =>
+ (topicPartition, new OffsetFetchResponse.PartitionData(
+ payload.toLong, OffsetFetchResponse.NO_METADATA, Errors.NONE))
+ case None =>
+ (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
+ }
}
+ } catch {
+ case e: Throwable =>
+ (topicPartition, new OffsetFetchResponse.PartitionData(
+ OffsetFetchResponse.INVALID_OFFSET, OffsetFetchResponse.NO_METADATA, Errors.forException(e)))
}
- } catch {
- case e: Throwable =>
- (topicPartition, new OffsetFetchResponse.PartitionData(
- OffsetFetchResponse.INVALID_OFFSET, OffsetFetchResponse.NO_METADATA, Errors.forException(e)))
- }
- }.toMap
+ }.toMap
- val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
- new OffsetFetchResponse(Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
- } else {
- // versions 1 and above read offsets from Kafka
- if (offsetFetchRequest.isAllPartitions) {
- val (error, allPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId)
- if (error != Errors.NONE)
- offsetFetchRequest.getErrorResponse(error)
- else {
- // clients are not allowed to see offsets for topics that are not authorized for Describe
- val authorizedPartitionData = allPartitionData.filter { case (topicPartition, _) => authorizeTopicDescribe(topicPartition) }
- new OffsetFetchResponse(Errors.NONE, authorizedPartitionData.asJava)
- }
+ val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
+ new OffsetFetchResponse(throttleTimeMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
} else {
- val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
- .partition(authorizeTopicDescribe)
- val (error, authorizedPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId,
- Some(authorizedPartitions))
- if (error != Errors.NONE)
- offsetFetchRequest.getErrorResponse(error)
- else {
- val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
- new OffsetFetchResponse(Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
+ // versions 1 and above read offsets from Kafka
+ if (offsetFetchRequest.isAllPartitions) {
+ val (error, allPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId)
+ if (error != Errors.NONE)
+ offsetFetchRequest.getErrorResponse(throttleTimeMs, error)
+ else {
+ // clients are not allowed to see offsets for topics that are not authorized for Describe
+ val authorizedPartitionData = allPartitionData.filter { case (topicPartition, _) => authorizeTopicDescribe(topicPartition) }
+ new OffsetFetchResponse(throttleTimeMs, Errors.NONE, authorizedPartitionData.asJava)
+ }
+ } else {
+ val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
+ .partition(authorizeTopicDescribe)
+ val (error, authorizedPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId,
+ Some(authorizedPartitions))
+ if (error != Errors.NONE)
+ offsetFetchRequest.getErrorResponse(throttleTimeMs, error)
+ else {
+ val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
+ new OffsetFetchResponse(throttleTimeMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
+ }
}
}
}
+ trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
+ offsetFetchResponse
}
-
- trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
- requestChannel.sendResponse(new Response(request, offsetFetchResponse))
+ sendResponseMaybeThrottle(request, createResponse)
}
def handleFindCoordinatorRequest(request: RequestChannel.Request) {
@@ -1014,8 +1012,8 @@ class KafkaApis(val requestChannel: RequestChannel,
if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.GROUP &&
!authorize(request.session, Describe, new Resource(Group, findCoordinatorRequest.coordinatorKey))) {
- val responseBody = new FindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode)
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+ // Pass the request-quota throttle time through to the client, as the other FindCoordinatorResponse paths in this handler do.
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new FindCoordinatorResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode)
+ sendResponseMaybeThrottle(request, createResponse)
} else {
// TODO: Authorize by transactional id if coordinator type is TRANSACTION
@@ -1035,24 +1033,26 @@ class KafkaApis(val requestChannel: RequestChannel,
throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
}
- val responseBody = if (topicMetadata.error != Errors.NONE) {
- new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
- } else {
- val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
- .find(_.partition == partition)
- .map(_.leader())
-
- coordinatorEndpoint match {
- case Some(endpoint) if !endpoint.isEmpty =>
- new FindCoordinatorResponse(Errors.NONE, endpoint)
- case _ =>
- new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+ def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ val responseBody = if (topicMetadata.error != Errors.NONE) {
+ new FindCoordinatorResponse(throttleTimeMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+ } else {
+ val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
+ .find(_.partition == partition)
+ .map(_.leader())
+
+ coordinatorEndpoint match {
+ case Some(endpoint) if !endpoint.isEmpty =>
+ new FindCoordinatorResponse(throttleTimeMs, Errors.NONE, endpoint)
+ case _ =>
+ new FindCoordinatorResponse(throttleTimeMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+ }
}
+ trace("Sending FindCoordinator response %s for correlation id %d to client %s."
+ .format(responseBody, request.header.correlationId, request.header.clientId))
+ responseBody
}
-
- trace("Sending FindCoordinator response %s for correlation id %d to client %s."
- .format(responseBody, request.header.correlationId, request.header.clientId))
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+ sendResponseMaybeThrottle(request, createResponse)
}
}
@@ -1074,19 +1074,20 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}.toMap
- val responseBody = new DescribeGroupsResponse(groups.asJava)
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new DescribeGroupsResponse(throttleTimeMs, groups.asJava)
+ sendResponseMaybeThrottle(request, createResponse)
}
def handleListGroupsRequest(request: RequestChannel.Request) {
- val responseBody = if (!authorize(request.session, Describe, Resource.ClusterResource)) {
- ListGroupsResponse.fromError(Errors.CLUSTER_AUTHORIZATION_FAILED)
+ if (!authorize(request.session, Describe, Resource.ClusterResource)) {
+ def createResponse(throttleTimeMs: Int): AbstractResponse = ListGroupsResponse.fromError(throttleTimeMs, Errors.CLUSTER_AUTHORIZATION_FAILED)
+ sendResponseMaybeThrottle(request, createResponse)
} else {
val (error, groups) = groupCoordinator.handleListGroups()
val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }
- new ListGroupsResponse(error, allGroups.asJava)
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new ListGroupsResponse(throttleTimeMs, error, allGroups.asJava)
+ sendResponseMaybeThrottle(request, createResponse)
}
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
}
def handleJoinGroupRequest(request: RequestChannel.Request) {
@@ -1095,23 +1096,27 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a join-group response
def sendResponseCallback(joinResult: JoinGroupResult) {
val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
- val responseBody = new JoinGroupResponse(joinResult.error, joinResult.generationId,
- joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members.asJava)
+ def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ val responseBody = new JoinGroupResponse(throttleTimeMs, joinResult.error, joinResult.generationId,
+ joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members.asJava)
- trace("Sending join group response %s for correlation id %d to client %s."
- .format(responseBody, request.header.correlationId, request.header.clientId))
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+ trace("Sending join group response %s for correlation id %d to client %s."
+ .format(responseBody, request.header.correlationId, request.header.clientId))
+ responseBody
+ }
+ sendResponseMaybeThrottle(request, createResponse)
}
if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId()))) {
- val responseBody = new JoinGroupResponse(
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new JoinGroupResponse(
+ throttleTimeMs,
Errors.GROUP_AUTHORIZATION_FAILED,
JoinGroupResponse.UNKNOWN_GENERATION_ID,
JoinGroupResponse.UNKNOWN_PROTOCOL,
JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
Collections.emptyMap())
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+ sendResponseMaybeThrottle(request, createResponse)
} else {
// let the coordinator to handle join-group
val protocols = joinGroupRequest.groupProtocols().asScala.map(protocol =>
@@ -1133,8 +1138,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val syncGroupRequest = request.body[SyncGroupRequest]
def sendResponseCallback(memberState: Array[Byte], error: Errors) {
- val responseBody = new SyncGroupResponse(error, ByteBuffer.wrap(memberState))
- requestChannel.sendResponse(new Response(request, responseBody))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new SyncGroupResponse(throttleTimeMs, error, ByteBuffer.wrap(memberState))
+ sendResponseMaybeThrottle(request, createResponse)
}
if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) {
@@ -1155,15 +1160,18 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a heartbeat response
def sendResponseCallback(error: Errors) {
- val response = new HeartbeatResponse(error)
- trace("Sending heartbeat response %s for correlation id %d to client %s."
- .format(response, request.header.correlationId, request.header.clientId))
- requestChannel.sendResponse(new RequestChannel.Response(request, response))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ val response = new HeartbeatResponse(throttleTimeMs, error)
+ trace("Sending heartbeat response %s for correlation id %d to client %s."
+ .format(response, request.header.correlationId, request.header.clientId))
+ response
+ }
+ sendResponseMaybeThrottle(request, createResponse)
}
if (!authorize(request.session, Read, new Resource(Group, heartbeatRequest.groupId))) {
- val heartbeatResponse = new HeartbeatResponse(Errors.GROUP_AUTHORIZATION_FAILED)
- requestChannel.sendResponse(new Response(request, heartbeatResponse))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new HeartbeatResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED)
+ sendResponseMaybeThrottle(request, createResponse)
}
else {
// let the coordinator to handle heartbeat
@@ -1180,15 +1188,18 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a leave-group response
def sendResponseCallback(error: Errors) {
- val response = new LeaveGroupResponse(error)
- trace("Sending leave group response %s for correlation id %d to client %s."
+ def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ val response = new LeaveGroupResponse(throttleTimeMs, error)
+ trace("Sending leave group response %s for correlation id %d to client %s."
.format(response, request.header.correlationId, request.header.clientId))
- requestChannel.sendResponse(new RequestChannel.Response(request, response))
+ response
+ }
+ sendResponseMaybeThrottle(request, createResponse)
}
if (!authorize(request.session, Read, new Resource(Group, leaveGroupRequest.groupId))) {
- val leaveGroupResponse = new LeaveGroupResponse(Errors.GROUP_AUTHORIZATION_FAILED)
- requestChannel.sendResponse(new Response(request, leaveGroupResponse))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new LeaveGroupResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED)
+ sendResponseMaybeThrottle(request, createResponse)
} else {
// let the coordinator to handle leave-group
groupCoordinator.handleLeaveGroup(
@@ -1199,8 +1210,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleSaslHandshakeRequest(request: RequestChannel.Request) {
- val response = new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, config.saslEnabledMechanisms)
- requestChannel.sendResponse(new RequestChannel.Response(request, response))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, config.saslEnabledMechanisms)
+ sendResponseMaybeThrottle(request, createResponse)
}
def handleApiVersionsRequest(request: RequestChannel.Request) {
@@ -1210,20 +1221,26 @@ class KafkaApis(val requestChannel: RequestChannel,
// If this is considered to leak information about the broker version a workaround is to use SSL
// with client authentication which is performed at an earlier stage of the connection where the
// ApiVersionRequest is not available.
- val responseSend =
- if (Protocol.apiVersionSupported(ApiKeys.API_VERSIONS.id, request.header.apiVersion))
- ApiVersionsResponse.API_VERSIONS_RESPONSE.toSend(request.connectionId, request.header)
- else ApiVersionsResponse.unsupportedVersionSend(request.connectionId, request.header)
- requestChannel.sendResponse(new RequestChannel.Response(request, responseSend))
+ def sendResponseCallback(throttleTimeMs: Int) {
+ val responseSend =
+ if (Protocol.apiVersionSupported(ApiKeys.API_VERSIONS.id, request.header.apiVersion))
+ ApiVersionsResponse.apiVersionsResponse(request.header.apiVersion, throttleTimeMs).toSend(request.connectionId, request.header)
+ else ApiVersionsResponse.unsupportedVersionSend(request.connectionId, request.header)
+ requestChannel.sendResponse(new RequestChannel.Response(request, responseSend))
+ }
+ sendResponseMaybeThrottle(request, request.header.clientId, sendResponseCallback)
}
def handleCreateTopicsRequest(request: RequestChannel.Request) {
val createTopicsRequest = request.body[CreateTopicsRequest]
def sendResponseCallback(results: Map[String, CreateTopicsResponse.Error]): Unit = {
- val responseBody = new CreateTopicsResponse(results.asJava)
- trace(s"Sending create topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ val responseBody = new CreateTopicsResponse(throttleTimeMs, results.asJava)
+ trace(s"Sending create topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
+ responseBody
+ }
+ sendResponseMaybeThrottle(request, createResponse)
}
if (!controller.isActive) {
@@ -1279,11 +1296,14 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def sendResponseCallback(results: Map[String, Errors]): Unit = {
- val completeResults = nonExistingOrUnauthorizedForDescribeTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++
- unauthorizedForDeleteTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ results
- val responseBody = new DeleteTopicsResponse(completeResults.asJava)
- trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ val completeResults = nonExistingOrUnauthorizedForDescribeTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++
+ unauthorizedForDeleteTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ results
+ val responseBody = new DeleteTopicsResponse(throttleTimeMs, completeResults.asJava)
+ trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
+ responseBody
+ }
+ sendResponseMaybeThrottle(request, createResponse)
}
if (!controller.isActive) {
@@ -1335,11 +1355,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- val respBody = new DeleteRecordsResponse(mergedResponseStatus.asJava)
- requestChannel.sendResponse(new RequestChannel.Response(request, respBody))
-
- // When this callback is triggered, the remote API call has completed
- request.apiRemoteCompleteTimeMs = time.milliseconds
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new DeleteRecordsResponse(throttleTimeMs, mergedResponseStatus.asJava)
+ sendResponseMaybeThrottle(request, createResponse)
}
if (authorizedForDeleteTopics.isEmpty)
@@ -1359,9 +1376,12 @@ class KafkaApis(val requestChannel: RequestChannel,
// Send response callback
def sendResponseCallback(result: InitPidResult): Unit = {
- val responseBody: InitPidResponse = new InitPidResponse(result.error, result.pid, result.epoch)
- trace(s"InitPidRequest: Completed $transactionalId's InitPidRequest with result $result from client ${request.header.clientId}.")
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ val responseBody: InitPidResponse = new InitPidResponse(throttleTimeMs, result.error, result.pid, result.epoch)
+ trace(s"InitPidRequest: Completed $transactionalId's InitPidRequest with result $result from client ${request.header.clientId}.")
+ responseBody
+ }
+ sendResponseMaybeThrottle(request, createResponse)
}
txnCoordinator.handleInitPid(transactionalId, initPidRequest.transactionTimeoutMs, sendResponseCallback)
}
@@ -1370,9 +1390,12 @@ class KafkaApis(val requestChannel: RequestChannel,
val endTxnRequest = request.body[EndTxnRequest]
def sendResponseCallback(error: Errors) {
- val responseBody = new EndTxnResponse(error)
- trace(s"Completed ${endTxnRequest.transactionalId()}'s EndTxnRequest with command: ${endTxnRequest.command()}, errors: $error from client ${request.header.clientId}.")
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ val responseBody = new EndTxnResponse(throttleTimeMs, error)
+ trace(s"Completed ${endTxnRequest.transactionalId()}'s EndTxnRequest with command: ${endTxnRequest.command()}, errors: $error from client ${request.header.clientId}.")
+ responseBody
+ }
+ sendResponseMaybeThrottle(request, createResponse)
}
txnCoordinator.handleEndTransaction(endTxnRequest.transactionalId(),
@@ -1383,11 +1406,12 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleWriteTxnMarkersRequest(request: RequestChannel.Request): Unit = {
+ authorizeClusterAction(request)
val emptyResponse = new java.util.HashMap[java.lang.Long, java.util.Map[TopicPartition, Errors]]()
- requestChannel.sendResponse(new RequestChannel.Response(request, new WriteTxnMarkersResponse(emptyResponse)))
+ val responseBody = new WriteTxnMarkersResponse(emptyResponse)
+ sendResponseExemptThrottle(request, new RequestChannel.Response(request, responseBody))
}
-
def handleAddPartitionToTxnRequest(request: RequestChannel.Request): Unit = {
val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
val transactionalId = addPartitionsToTxnRequest.transactionalId
@@ -1395,9 +1419,12 @@ class KafkaApis(val requestChannel: RequestChannel,
// Send response callback
def sendResponseCallback(error: Errors): Unit = {
- val responseBody: AddPartitionsToTxnResponse = new AddPartitionsToTxnResponse(error)
- trace(s"Completed $transactionalId's AddPartitionsToTxnRequest with partitions $partitionsToAdd: errors: $error from client ${request.header.clientId}")
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ val responseBody: AddPartitionsToTxnResponse = new AddPartitionsToTxnResponse(throttleTimeMs, error)
+ trace(s"Completed $transactionalId's AddPartitionsToTxnRequest with partitions $partitionsToAdd: errors: $error from client ${request.header.clientId}")
+ responseBody
+ }
+ sendResponseMaybeThrottle(request, createResponse)
}
txnCoordinator.handleAddPartitionsToTransaction(transactionalId,
@@ -1415,9 +1442,12 @@ class KafkaApis(val requestChannel: RequestChannel,
// Send response callback
def sendResponseCallback(error: Errors): Unit = {
- val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(error)
- trace(s"Completed $transactionalId's AddOffsetsToTxnRequest for group $groupId as on partition $offsetTopicPartition: errors: $error from client ${request.header.clientId}")
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(throttleTimeMs, error)
+ trace(s"Completed $transactionalId's AddOffsetsToTxnRequest for group $groupId as on partition $offsetTopicPartition: errors: $error from client ${request.header.clientId}")
+ responseBody
+ }
+ sendResponseMaybeThrottle(request, createResponse)
}
txnCoordinator.handleAddPartitionsToTransaction(transactionalId,
@@ -1429,7 +1459,8 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleTxnOffsetCommitRequest(request: RequestChannel.Request): Unit = {
val emptyResponse = new java.util.HashMap[TopicPartition, Errors]()
- requestChannel.sendResponse(new RequestChannel.Response(request, new TxnOffsetCommitResponse(emptyResponse)))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new TxnOffsetCommitResponse(throttleTimeMs, emptyResponse)
+ sendResponseMaybeThrottle(request, createResponse)
}
def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit = {
@@ -1440,11 +1471,93 @@ class KafkaApis(val requestChannel: RequestChannel,
val responseBody = new OffsetsForLeaderEpochResponse(
replicaManager.getResponseFor(requestInfo)
)
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+ sendResponseExemptThrottle(request, new RequestChannel.Response(request, responseBody))
+ }
+
+ /**
+  * Sends the error response for a request whose handler threw `e`.
+  *
+  * The response goes through request-quota accounting unless the request is a cluster action
+  * that failed with something other than a ClusterAuthorizationException; in that case it is
+  * recorded as exempt and the send callback is invoked directly.
+  *
+  * @param request the request that was being handled when the exception was raised
+  * @param e       the exception raised by the handler
+  */
+ private def handleError(request: RequestChannel.Request, e: Throwable) {
+   val mayThrottle = e.isInstanceOf[ClusterAuthorizationException] || !ApiKeys.forId(request.requestId).clusterAction
+   if (request.requestObj != null) {
+     // Log before dispatching: the callback is handed to the quota manager (which presumably
+     // defers it when throttling), and the client-id lookup below can throw before the callback
+     // ever runs. Either way the original error must not be lost.
+     error("Error when handling request %s".format(request.requestObj), e)
+     def sendResponseCallback(throttleTimeMs: Int) {
+       request.requestObj.handleError(e, requestChannel, request)
+     }
+
+     if (mayThrottle) {
+       // Old-style requests carry no header, so the client id must come from the request object.
+       val clientId: String = request.requestObj match {
+         case shutdownRequest: ControlledShutdownRequest => shutdownRequest.clientId.getOrElse("")
+         case _ => throw new IllegalStateException("Old style requests should only be used for ControlledShutdownRequest")
+       }
+       sendResponseMaybeThrottle(request, clientId, sendResponseCallback)
+     } else
+       sendResponseExemptThrottle(request, () => sendResponseCallback(0))
+   } else {
+     def createResponse(throttleTimeMs: Int): AbstractResponse = {
+       val response = request.body[AbstractRequest].getErrorResponse(throttleTimeMs, e)
+
+       /* If request doesn't have a default error response, we just close the connection.
+          For example, when produce request has acks set to 0 */
+       if (response == null)
+         requestChannel.closeConnection(request.processor, request)
+       response
+     }
+     error("Error when handling request %s".format(request.body[AbstractRequest]), e)
+     if (mayThrottle)
+       sendResponseMaybeThrottle(request, createResponse)
+     else
+       sendResponseExemptThrottle(request, () => {
+         // A null response means createResponse already asked for the connection to be closed,
+         // so there is nothing to send; same null check as the throttled path.
+         val response = createResponse(0)
+         if (response != null)
+           requestChannel.sendResponse(new RequestChannel.Response(request, response))
+       })
+   }
+ }
def authorizeClusterAction(request: RequestChannel.Request): Unit = {
if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
throw new ClusterAuthorizationException(s"Request $request is not authorized.")
}
+
+ private def sendResponseMaybeThrottle(request: RequestChannel.Request, createResponse: Int => AbstractResponse) {
+ def sendResponseCallback(throttleTimeMs: Int) {
+ val response = createResponse(throttleTimeMs)
+ if (response != null)
+ sendResponse(request, response)
+ }
+ sendResponseMaybeThrottle(request, request.header.clientId, sendResponseCallback)
+ }
+
+ private def sendResponseMaybeThrottle(request: RequestChannel.Request, clientId: String, sendResponseCallback: Int => Unit) {
+
+ if (request.apiRemoteCompleteTimeNanos == -1) {
+ // When this callback is triggered, the remote API call has completed
+ request.apiRemoteCompleteTimeNanos = time.nanoseconds
+ }
+ val quotaSensors = quotas.request.getOrCreateQuotaSensors(request.session.sanitizedUser, clientId)
+ def recordNetworkThreadTimeNanos(timeNanos: Long) {
+ quotas.request.recordNoThrottle(quotaSensors, nanosToPercentage(timeNanos))
+ }
+ request.recordNetworkThreadTimeCallback = Some(recordNetworkThreadTimeNanos)
+
+ quotas.request.recordAndThrottleOnQuotaViolation(
+ quotaSensors,
+ nanosToPercentage(request.requestThreadTimeNanos),
+ sendResponseCallback)
+ }
+
+ private def sendResponseExemptThrottle(request: RequestChannel.Request, response: Response) {
+ sendResponseExemptThrottle(request, () => requestChannel.sendResponse(response))
+ }
+
+ private def sendResponseExemptThrottle(request: RequestChannel.Request, sendResponseCallback: () => Unit) {
+ def recordNetworkThreadTimeNanos(timeNanos: Long) {
+ quotas.request.recordExempt(nanosToPercentage(timeNanos))
+ }
+ request.recordNetworkThreadTimeCallback = Some(recordNetworkThreadTimeNanos)
+
+ quotas.request.recordExempt(nanosToPercentage(request.requestThreadTimeNanos))
+ sendResponseCallback()
+ }
+
+ private def sendResponse(request: RequestChannel.Request, response: AbstractResponse) {
+ requestChannel.sendResponse(new Response(request, response))
+ }
+
+ private def nanosToPercentage(nanos: Long): Double = nanos * ClientQuotaManagerConfig.NanosToPercentagePerSecond
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index c9c31ad..a1600cb 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -50,7 +50,10 @@ class KafkaRequestHandler(id: Int,
// time should be discounted by # threads.
val startSelectTime = time.nanoseconds
req = requestChannel.receiveRequest(300)
- val idleTime = time.nanoseconds - startSelectTime
+ val endTime = time.nanoseconds
+ if (req != null)
+ req.requestDequeueTimeNanos = endTime
+ val idleTime = endTime - startSelectTime
aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
}
@@ -59,7 +62,6 @@ class KafkaRequestHandler(id: Int,
latch.countDown()
return
}
- req.requestDequeueTimeMs = time.milliseconds
trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
apis.handle(req)
} catch {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/QuotaFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/QuotaFactory.scala b/core/src/main/scala/kafka/server/QuotaFactory.scala
index 671ad63..dee39a3 100644
--- a/core/src/main/scala/kafka/server/QuotaFactory.scala
+++ b/core/src/main/scala/kafka/server/QuotaFactory.scala
@@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.Time
object QuotaType {
case object Fetch extends QuotaType
case object Produce extends QuotaType
+ case object Request extends QuotaType
case object LeaderReplication extends QuotaType
case object FollowerReplication extends QuotaType
}
@@ -36,10 +37,11 @@ object QuotaFactory {
override def isQuotaExceeded(): Boolean = false
}
- case class QuotaManagers(fetch: ClientQuotaManager, produce: ClientQuotaManager, leader: ReplicationQuotaManager, follower: ReplicationQuotaManager) {
+ case class QuotaManagers(fetch: ClientQuotaManager, produce: ClientQuotaManager, request: ClientRequestQuotaManager, leader: ReplicationQuotaManager, follower: ReplicationQuotaManager) {
def shutdown() {
fetch.shutdown
produce.shutdown
+ request.shutdown
}
}
@@ -47,6 +49,7 @@ object QuotaFactory {
QuotaManagers(
new ClientQuotaManager(clientFetchConfig(cfg), metrics, Fetch, time),
new ClientQuotaManager(clientProduceConfig(cfg), metrics, Produce, time),
+ new ClientRequestQuotaManager(clientRequestConfig(cfg), metrics, time),
new ReplicationQuotaManager(replicationConfig(cfg), metrics, LeaderReplication, time),
new ReplicationQuotaManager(replicationConfig(cfg), metrics, FollowerReplication, time)
)
@@ -66,6 +69,13 @@ object QuotaFactory {
quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
)
+ def clientRequestConfig(cfg: KafkaConfig): ClientQuotaManagerConfig = {
+ ClientQuotaManagerConfig(
+ numQuotaSamples = cfg.numQuotaSamples,
+ quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
+ )
+ }
+
def replicationConfig(cfg: KafkaConfig): ReplicationQuotaManagerConfig =
ReplicationQuotaManagerConfig(
numQuotaSamples = cfg.numReplicationQuotaSamples,
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
index d9822cd..ff9fef0 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -206,7 +206,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
val hostStr = s"${node.host}:${node.port}"
assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
val brokerVersionInfo = tryBrokerVersionInfo.get
- assertEquals(0, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
+ assertEquals(1, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index aa1717a..f21c1df 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -26,13 +26,15 @@ import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.protocol.ApiKeys
import org.junit.Assert._
import org.junit.{After, Before, Test}
+import kafka.server.QuotaType
+import org.apache.kafka.common.metrics.KafkaMetric
abstract class BaseQuotaTest extends IntegrationTestHarness {
def userPrincipal : String
def producerQuotaId : QuotaId
def consumerQuotaId : QuotaId
- def overrideQuotas(producerQuota: Long, consumerQuota: Long)
+ def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double)
def removeQuotaOverrides()
override val serverCount = 2
@@ -55,10 +57,13 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId)
+ this.consumerConfig.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "0")
+ this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0")
// Low enough quota that a producer sending a small payload in a tight loop should get throttled
val defaultProducerQuota = 8000
val defaultConsumerQuota = 2500
+ val defaultRequestQuota = Int.MaxValue
var leaderNode: KafkaServer = null
var followerNode: KafkaServer = null
@@ -99,8 +104,8 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
props.put(DynamicConfig.Client.ProducerByteRateOverrideProp, Long.MaxValue.toString)
props.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, Long.MaxValue.toString)
- overrideQuotas(Long.MaxValue, Long.MaxValue)
- waitForQuotaUpdate(Long.MaxValue, Long.MaxValue)
+ overrideQuotas(Long.MaxValue, Long.MaxValue, Int.MaxValue)
+ waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, Int.MaxValue)
val numRecords = 1000
assertEquals(numRecords, produceUntilThrottled(producers.head, numRecords))
@@ -114,8 +119,8 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
@Test
def testQuotaOverrideDelete() {
// Override producer and consumer quotas to unlimited
- overrideQuotas(Long.MaxValue, Long.MaxValue)
- waitForQuotaUpdate(Long.MaxValue, Long.MaxValue)
+ overrideQuotas(Long.MaxValue, Long.MaxValue, Int.MaxValue)
+ waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, Int.MaxValue)
val numRecords = 1000
assertEquals(numRecords, produceUntilThrottled(producers.head, numRecords))
@@ -136,6 +141,28 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
assertTrue("Should have been throttled", consumerThrottleMetric.value > 0)
}
+ @Test
+ def testThrottledRequest() {
+
+ overrideQuotas(Long.MaxValue, Long.MaxValue, 0.1)
+ waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, 0.1)
+
+ val consumer = consumers.head
+ consumer.subscribe(Collections.singleton(topic1))
+ val endTimeMs = System.currentTimeMillis + 10000
+ var throttled = false
+ while (!throttled && System.currentTimeMillis < endTimeMs) {
+ consumer.poll(100)
+ val throttleMetric = consumerRequestThrottleMetric
+ throttled = throttleMetric != null && throttleMetric.value > 0
+ }
+
+ assertTrue("Should have been throttled", throttled)
+
+ assertNotNull("Exempt requests not recorded", exemptRequestMetric)
+ assertTrue("Exempt requests not recorded", exemptRequestMetric.value > 0)
+ }
+
def produceUntilThrottled(p: KafkaProducer[Array[Byte], Array[Byte]], maxRecords: Int): Int = {
var numProduced = 0
var throttled = false
@@ -169,31 +196,47 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
numConsumed
}
- def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long) {
+ def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
TestUtils.retry(10000) {
val quotaManagers = leaderNode.apis.quotas
val overrideProducerQuota = quotaManagers.produce.quota(userPrincipal, producerClientId)
val overrideConsumerQuota = quotaManagers.fetch.quota(userPrincipal, consumerClientId)
+ val overrideProducerRequestQuota = quotaManagers.request.quota(userPrincipal, producerClientId)
+ val overrideConsumerRequestQuota = quotaManagers.request.quota(userPrincipal, consumerClientId)
assertEquals(s"ClientId $producerClientId of user $userPrincipal must have producer quota", Quota.upperBound(producerQuota), overrideProducerQuota)
assertEquals(s"ClientId $consumerClientId of user $userPrincipal must have consumer quota", Quota.upperBound(consumerQuota), overrideConsumerQuota)
+ assertEquals(s"ClientId $producerClientId of user $userPrincipal must have request quota", Quota.upperBound(requestQuota), overrideProducerRequestQuota)
+ assertEquals(s"ClientId $consumerClientId of user $userPrincipal must have request quota", Quota.upperBound(requestQuota), overrideConsumerRequestQuota)
}
}
- private def throttleMetricName(apiKey: ApiKeys, quotaId: QuotaId): MetricName = {
+ private def throttleMetricName(quotaType: QuotaType, quotaId: QuotaId): MetricName = {
leaderNode.metrics.metricName("throttle-time",
- apiKey.name,
+ quotaType.toString,
"Tracking throttle-time per user/client-id",
"user", quotaId.sanitizedUser.getOrElse(""),
"client-id", quotaId.clientId.getOrElse(""))
}
- private def producerThrottleMetric = leaderNode.metrics.metrics.get(throttleMetricName(ApiKeys.PRODUCE, producerQuotaId))
- private def consumerThrottleMetric = leaderNode.metrics.metrics.get(throttleMetricName(ApiKeys.FETCH, consumerQuotaId))
- def quotaProperties(producerQuota: Long, consumerQuota: Long): Properties = {
+ def throttleMetric(quotaType: QuotaType, quotaId: QuotaId): KafkaMetric = {
+ leaderNode.metrics.metrics.get(throttleMetricName(quotaType, quotaId))
+ }
+
+ private def producerThrottleMetric = throttleMetric(QuotaType.Produce, producerQuotaId)
+ private def consumerThrottleMetric = throttleMetric(QuotaType.Fetch, consumerQuotaId)
+ private def consumerRequestThrottleMetric = throttleMetric(QuotaType.Request, consumerQuotaId)
+
+ private def exemptRequestMetric: KafkaMetric = {
+ val metricName = leaderNode.metrics.metricName("exempt-request-time", QuotaType.Request.toString, "")
+ leaderNode.metrics.metrics.get(metricName)
+ }
+
+ def quotaProperties(producerQuota: Long, consumerQuota: Long, requestQuota: Double): Properties = {
val props = new Properties()
props.put(DynamicConfig.Client.ProducerByteRateOverrideProp, producerQuota.toString)
props.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, consumerQuota.toString)
+ props.put(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
props
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
index d71713f..f8594e1 100644
--- a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
@@ -33,14 +33,15 @@ class ClientIdQuotaTest extends BaseQuotaTest {
this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, defaultConsumerQuota.toString)
super.setUp()
}
-
- override def overrideQuotas(producerQuota: Long, consumerQuota: Long) {
+ override def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
val producerProps = new Properties()
producerProps.put(DynamicConfig.Client.ProducerByteRateOverrideProp, producerQuota.toString)
+ producerProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
updateQuotaOverride(producerClientId, producerProps)
val consumerProps = new Properties()
consumerProps.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, consumerQuota.toString)
+ consumerProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
updateQuotaOverride(consumerClientId, consumerProps)
}
override def removeQuotaOverrides() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
index 82b109d..333c851 100644
--- a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
@@ -39,18 +39,20 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
super.setUp()
- val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota)
+ val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
AdminUtils.changeUserOrUserClientIdConfig(zkUtils, ConfigEntityName.Default + "/clients/" + ConfigEntityName.Default, defaultProps)
- waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota)
+ waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
}
- override def overrideQuotas(producerQuota: Long, consumerQuota: Long) {
+ override def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
val producerProps = new Properties()
producerProps.setProperty(DynamicConfig.Client.ProducerByteRateOverrideProp, producerQuota.toString)
+ producerProps.setProperty(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
updateQuotaOverride(userPrincipal, producerClientId, producerProps)
val consumerProps = new Properties()
consumerProps.setProperty(DynamicConfig.Client.ConsumerByteRateOverrideProp, consumerQuota.toString)
+ consumerProps.setProperty(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
updateQuotaOverride(userPrincipal, consumerClientId, consumerProps)
}