You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/05/01 16:13:38 UTC
[1/4] kafka git commit: KAFKA-4954; Request handler utilization quotas
Repository: kafka
Updated Branches:
refs/heads/trunk 6185bc027 -> 0104b657a
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
index c8d0a77..d0e514c 100644
--- a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
@@ -41,13 +41,13 @@ class UserQuotaTest extends BaseQuotaTest with SaslTestHarness {
this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
super.setUp()
- val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota)
+ val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
AdminUtils.changeUserOrUserClientIdConfig(zkUtils, ConfigEntityName.Default, defaultProps)
- waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota)
+ waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
}
- override def overrideQuotas(producerQuota: Long, consumerQuota: Long) {
- val props = quotaProperties(producerQuota, consumerQuota)
+ override def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
+ val props = quotaProperties(producerQuota, consumerQuota, requestQuota)
updateQuotaOverride(props)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index 248b91e..c52020c 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -51,12 +51,12 @@ class ApiVersionsRequestTest extends BaseRequestTest {
@Test
def testApiVersionsRequestWithUnsupportedVersion() {
val apiVersionsRequest = new ApiVersionsRequest(0)
- val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, Some(Short.MaxValue))
+ val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, Some(Short.MaxValue), 0)
assertEquals(Errors.UNSUPPORTED_VERSION, apiVersionsResponse.error)
}
- private def sendApiVersionsRequest(request: ApiVersionsRequest, apiVersion: Option[Short] = None): ApiVersionsResponse = {
+ private def sendApiVersionsRequest(request: ApiVersionsRequest, apiVersion: Option[Short] = None, responseVersion: Short = 1): ApiVersionsResponse = {
val response = connectAndSend(request, ApiKeys.API_VERSIONS, apiVersion = apiVersion)
- ApiVersionsResponse.parse(response, 0)
+ ApiVersionsResponse.parse(response, responseVersion)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index a26bc2e..a7a12eb 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -146,7 +146,7 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
skipResponseHeader(response)
}
- private def skipResponseHeader(response: Array[Byte]): ByteBuffer = {
+ protected def skipResponseHeader(response: Array[Byte]): ByteBuffer = {
val responseBuffer = ByteBuffer.wrap(response)
// Parse the header to ensure its valid and move the buffer forward
ResponseHeader.parse(responseBuffer)
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index b581341..183ba0b 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -269,6 +269,68 @@ class ClientQuotaManagerTest {
}
@Test
+ def testRequestPercentageQuotaViolation() {
+ // Exercises request-time quota enforcement on a ClientRequestQuotaManager: usage under the
+ // quota is not throttled, a spike is throttled for the expected time, and throttling stops
+ // once the spike has rolled out of the measured window.
+ val metrics = newMetrics
+ val quotaManager = new ClientRequestQuotaManager(config, metrics, time)
+ quotaManager.updateQuota(Some("ANONYMOUS"), Some("test-client"), Some(Quota.upperBound(1)))
+ // Asserted below to be 1 while a throttled request is pending and 0 otherwise.
+ val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "Request", ""))
+ // Milliseconds -> nanoseconds, then scaled by NanosToPercentagePerSecond to get the recorded value.
+ def millisToPercent(millis: Double) = millis * 1000 * 1000 * ClientQuotaManagerConfig.NanosToPercentagePerSecond
+ try {
+ /* We have 10 second windows. Make sure that there is no quota violation
+ * if we are under the quota
+ */
+ for (_ <- 0 until 10) {
+ quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client", millisToPercent(4), callback)
+ time.sleep(1000)
+ }
+ assertEquals(10, numCallbacks)
+ assertEquals(0, queueSizeMetric.value().toInt)
+
+ // Create a spike.
+ // quota = 1% (10ms per second)
+ // 4*10 + 67.1 = 107.1/10.5 = 10.2ms per second.
+ // (10.2 - quota)/quota*window-size = (10.2-10)/10*10.5 seconds = 210ms
+ // 10.5 seconds interval because the last window is half complete
+ time.sleep(500)
+ val throttleTime = quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client", millisToPercent(67.1), callback)
+
+ assertEquals("Should be throttled", 210, throttleTime)
+ assertEquals(1, queueSizeMetric.value().toInt)
+ // After a request is delayed, the callback cannot be triggered immediately
+ quotaManager.throttledRequestReaper.doWork()
+ assertEquals(10, numCallbacks)
+ time.sleep(throttleTime)
+
+ // Callback can only be triggered after the delay time passes
+ quotaManager.throttledRequestReaper.doWork()
+ assertEquals(0, queueSizeMetric.value().toInt)
+ assertEquals(11, numCallbacks)
+
+ // Could continue to see delays until the bursty sample disappears
+ for (_ <- 0 until 11) {
+ quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client", millisToPercent(4), callback)
+ time.sleep(1000)
+ }
+
+ assertEquals("Should be unthrottled since bursty sample has rolled over",
+ 0, quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client", 0, callback))
+
+ // Create a very large spike which requires > one quota window to bring within quota
+ assertEquals(1000, quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client", millisToPercent(500), callback))
+ for (_ <- 0 until 10) {
+ time.sleep(1000)
+ assertEquals(1000, quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client", 0, callback))
+ }
+ time.sleep(1000)
+ assertEquals("Should be unthrottled since bursty sample has rolled over",
+ 0, quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client", 0, callback))
+
+ } finally {
+ // Always shut the quota manager down, even when an assertion above fails.
+ quotaManager.shutdown()
+ }
+ }
+
+ @Test
def testExpireThrottleTimeSensor() {
val metrics = newMetrics
val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time)
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
new file mode 100644
index 0000000..bca7bb0
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -0,0 +1,420 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package kafka.server
+
+import java.net.Socket
+import java.nio.ByteBuffer
+import java.util.{LinkedHashMap, Properties}
+import java.util.concurrent.{Executors, Future, TimeUnit}
+import kafka.admin.AdminUtils
+import kafka.network.RequestChannel.Session
+import kafka.security.auth._
+import kafka.utils.TestUtils
+import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
+import org.apache.kafka.common.network.{Authenticator, ListenerName, TransportLayer}
+import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
+import org.apache.kafka.common.protocol.types.Struct
+import org.apache.kafka.common.record._
+import org.apache.kafka.common.requests
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.security.auth.{DefaultPrincipalBuilder, KafkaPrincipal}
+import org.junit.Assert._
+import org.junit.{Before, After, Test}
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+
+class RequestQuotaTest extends BaseRequestTest {
+
+ override def numBrokers: Int = 1
+
+ private val topic = "topic-1"
+ private val numPartitions = 1
+ private val tp = new TopicPartition(topic, 0)
+ private val unthrottledClientId = "unthrottled-client"
+ private val brokerId: Integer = 0
+ private var leaderNode: KafkaServer = null
+
+ // Run tests concurrently since a throttle could be up to 1 second because quota percentage allocated is very low
+ case class Task(val apiKey: ApiKeys, val future: Future[_])
+ private val executor = Executors.newCachedThreadPool
+ private val tasks = new ListBuffer[Task]
+
+ /**
+  * Applies the broker settings this test relies on: controlled shutdown disabled, an offsets
+  * topic with replication factor 1 and a single partition, and the test authorizer and
+  * principal builder defined in the companion object.
+  */
+ override def propertyOverrides(properties: Properties): Unit = {
+   val overrides = Seq(
+     KafkaConfig.ControlledShutdownEnableProp -> "false",
+     KafkaConfig.OffsetsTopicReplicationFactorProp -> "1",
+     KafkaConfig.OffsetsTopicPartitionsProp -> "1",
+     KafkaConfig.AuthorizerClassNameProp -> classOf[RequestQuotaTest.TestAuthorizer].getName,
+     KafkaConfig.PrincipalBuilderClassProp -> classOf[RequestQuotaTest.TestPrincipalBuilder].getName)
+   overrides.foreach { case (name, value) => properties.put(name, value) }
+ }
+
+ /**
+  * Starts the broker, creates the test topic and installs two client-id request quotas:
+  * a very small default and a large override for `unthrottledClientId`. Returns only once
+  * the broker's request quota manager reports both values (retrying for up to 10 seconds).
+  */
+ @Before
+ override def setUp() {
+   // Every test starts with the authorized principal; the unauthorized test swaps it later.
+   RequestQuotaTest.principal = KafkaPrincipal.ANONYMOUS
+   super.setUp()
+
+   TestUtils.createTopic(zkUtils, topic, numPartitions, 1, servers)
+   leaderNode = servers.head
+
+   // Change default client-id request quota to a small value and a single unthrottledClient with a large quota.
+   // Separate Properties instances are used so neither config change can observe the other's value.
+   val defaultQuotaProps = new Properties()
+   defaultQuotaProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, "0.01")
+   AdminUtils.changeClientIdConfig(zkUtils, ConfigEntityName.Default, defaultQuotaProps)
+
+   val unthrottledQuotaProps = new Properties()
+   unthrottledQuotaProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, "2000")
+   AdminUtils.changeClientIdConfig(zkUtils, unthrottledClientId, unthrottledQuotaProps)
+
+   // The config change is picked up asynchronously by the broker, so poll until it is visible.
+   TestUtils.retry(10000) {
+     val quotaManager = leaderNode.apis.quotas.request
+     assertEquals("Default request quota not set", Quota.upperBound(0.01), quotaManager.quota("some-user", "some-client"))
+     assertEquals("Request quota override not set", Quota.upperBound(2000), quotaManager.quota("some-user", unthrottledClientId))
+   }
+ }
+
+ /** Stops the worker pool first, then always runs the harness tear-down, even if stopping fails. */
+ @After
+ override def tearDown() {
+   try executor.shutdownNow()
+   finally super.tearDown()
+ }
+
+ /**
+  * Runs `checkRequestThrottleTime` concurrently for every API key in
+  * `RequestQuotaTest.ClientActions` and waits for all of the checks to finish.
+  */
+ @Test
+ def testResponseThrottleTime() {
+   // No request builder is created here: each Client builds its own from the API key.
+   for (apiKey <- RequestQuotaTest.ClientActions)
+     submitTest(apiKey, () => checkRequestThrottleTime(apiKey))
+   waitAndCheckResults()
+ }
+
+ /**
+  * Runs `checkUnthrottledClient` concurrently for every API key in
+  * `RequestQuotaTest.ClientActions` and waits for all of the checks to finish.
+  */
+ @Test
+ def testUnthrottledClient() {
+   // No request builder is created here: each Client builds its own from the API key.
+   for (apiKey <- RequestQuotaTest.ClientActions)
+     submitTest(apiKey, () => checkUnthrottledClient(apiKey))
+   waitAndCheckResults()
+ }
+
+ /** Runs `checkExemptRequestMetric` concurrently for every cluster-action API key. */
+ @Test
+ def testExemptRequestTime() {
+   RequestQuotaTest.ClusterActions.foreach { key =>
+     submitTest(key, () => checkExemptRequestMetric(key))
+   }
+   waitAndCheckResults()
+ }
+
+ /**
+  * Switches all connections to the unauthorized principal, then runs
+  * `checkUnauthorizedRequestThrottle` concurrently for every API key.
+  */
+ @Test
+ def testUnauthorizedThrottle() {
+   RequestQuotaTest.principal = RequestQuotaTest.UnauthorizedPrincipal
+
+   ApiKeys.values.foreach { key =>
+     submitTest(key, () => checkUnauthorizedRequestThrottle(key))
+   }
+   waitAndCheckResults()
+ }
+
+ /**
+  * Reads the request-quota "throttle-time" metric for `clientId` (empty user tag) from the
+  * leader broker. Yields -1.0 when the metric is not registered.
+  */
+ private def throttleTimeMetricValue(clientId: String): Double = {
+   val brokerMetrics = leaderNode.metrics
+   val name = brokerMetrics.metricName("throttle-time", QuotaType.Request.toString, "",
+     "user", "", "client-id", clientId)
+   val quotaSensors = leaderNode.quotaManagers.request.getOrCreateQuotaSensors("ANONYMOUS", clientId)
+   // Look the metric up only after the sensors have been obtained, as the value is read under the sensor lock.
+   metricValue(leaderNode.metrics.metrics.get(name), quotaSensors.throttleTimeSensor)
+ }
+
+ /**
+  * Reads the request-quota "request-time" metric for `clientId` (empty user tag) from the
+  * leader broker. Yields -1.0 when the metric is not registered.
+  */
+ private def requestTimeMetricValue(clientId: String): Double = {
+   val brokerMetrics = leaderNode.metrics
+   val name = brokerMetrics.metricName("request-time", QuotaType.Request.toString, "",
+     "user", "", "client-id", clientId)
+   val quotaSensors = leaderNode.quotaManagers.request.getOrCreateQuotaSensors("ANONYMOUS", clientId)
+   // Look the metric up only after the sensors have been obtained, as the value is read under the sensor lock.
+   metricValue(leaderNode.metrics.metrics.get(name), quotaSensors.quotaSensor)
+ }
+
+ /** Reads the "exempt-request-time" metric from the leader broker; -1.0 when it is not registered. */
+ private def exemptRequestMetricValue: Double = {
+   val name = leaderNode.metrics.metricName("exempt-request-time", QuotaType.Request.toString, "")
+   val exemptMetric = leaderNode.metrics.metrics.get(name)
+   val exemptSensor = leaderNode.quotaManagers.request.exemptSensor
+   metricValue(exemptMetric, exemptSensor)
+ }
+
+ /**
+  * Returns the current value of `metric`, or -1.0 when `metric` is null.
+  * The read happens while holding the monitor of `sensor`.
+  */
+ private def metricValue(metric: KafkaMetric, sensor: Sensor): Double =
+   sensor.synchronized {
+     Option(metric).fold(-1.0)(_.value)
+   }
+
+ /**
+  * Creates a request builder for `apiKey`, populated with fixed test values
+  * (the test topic partition, broker id 0, "test-group", ...).
+  *
+  * @param apiKey the API for which a request is needed
+  * @return a builder for a request of that API
+  * @throws IllegalArgumentException if `apiKey` has no case in this method
+  */
+ private def requestBuilder(apiKey: ApiKeys): AbstractRequest.Builder[_ <: AbstractRequest] = {
+   apiKey match {
+     case ApiKeys.PRODUCE =>
+       new requests.ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 1, 5000,
+         collection.mutable.Map(tp -> MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))).asJava)
+
+     case ApiKeys.FETCH =>
+       val partitionMap = new LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData]
+       partitionMap.put(tp, new requests.FetchRequest.PartitionData(0, 0, 100))
+       requests.FetchRequest.Builder.forConsumer(0, 0, partitionMap)
+
+     case ApiKeys.METADATA =>
+       new requests.MetadataRequest.Builder(List(topic).asJava)
+
+     case ApiKeys.LIST_OFFSETS =>
+       requests.ListOffsetRequest.Builder.forConsumer(false).setTargetTimes(Map(tp -> (0L: java.lang.Long)).asJava)
+
+     case ApiKeys.LEADER_AND_ISR =>
+       new LeaderAndIsrRequest.Builder(brokerId, Int.MaxValue,
+         Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
+         Set(new Node(brokerId, "localhost", 0)).asJava)
+
+     case ApiKeys.STOP_REPLICA =>
+       new StopReplicaRequest.Builder(brokerId, Int.MaxValue, true, Set(tp).asJava)
+
+     case ApiKeys.UPDATE_METADATA_KEY =>
+       val partitionState = Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava
+       val securityProtocol = SecurityProtocol.PLAINTEXT
+       val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId,
+         Seq(new requests.UpdateMetadataRequest.EndPoint("localhost", 0, securityProtocol,
+           ListenerName.forSecurityProtocol(securityProtocol))).asJava, null)).asJava
+       new requests.UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA_KEY.latestVersion, brokerId, Int.MaxValue, partitionState, brokers)
+
+     case ApiKeys.CONTROLLED_SHUTDOWN_KEY =>
+       new requests.ControlledShutdownRequest.Builder(brokerId)
+
+     case ApiKeys.OFFSET_COMMIT =>
+       new requests.OffsetCommitRequest.Builder("test-group",
+         Map(tp -> new requests.OffsetCommitRequest.PartitionData(0, "metadata")).asJava).
+         setMemberId("").setGenerationId(1).setRetentionTime(1000)
+
+     case ApiKeys.OFFSET_FETCH =>
+       new requests.OffsetFetchRequest.Builder("test-group", List(tp).asJava)
+
+     case ApiKeys.FIND_COORDINATOR =>
+       new requests.FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, "test-group")
+
+     case ApiKeys.JOIN_GROUP =>
+       new JoinGroupRequest.Builder("test-join-group", 10000, "", "consumer",
+         List( new JoinGroupRequest.ProtocolMetadata("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava)
+         .setRebalanceTimeout(60000)
+
+     case ApiKeys.HEARTBEAT =>
+       new HeartbeatRequest.Builder("test-group", 1, "")
+
+     case ApiKeys.LEAVE_GROUP =>
+       new LeaveGroupRequest.Builder("test-leave-group", "")
+
+     case ApiKeys.SYNC_GROUP =>
+       new SyncGroupRequest.Builder("test-sync-group", 1, "", Map[String, ByteBuffer]().asJava)
+
+     case ApiKeys.DESCRIBE_GROUPS =>
+       new DescribeGroupsRequest.Builder(List("test-group").asJava)
+
+     case ApiKeys.LIST_GROUPS =>
+       new ListGroupsRequest.Builder()
+
+     case ApiKeys.SASL_HANDSHAKE =>
+       new SaslHandshakeRequest.Builder("PLAIN")
+
+     case ApiKeys.API_VERSIONS =>
+       new ApiVersionsRequest.Builder
+
+     case ApiKeys.CREATE_TOPICS =>
+       new CreateTopicsRequest.Builder(Map("topic-2" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 0)
+
+     case ApiKeys.DELETE_TOPICS =>
+       new DeleteTopicsRequest.Builder(Set("topic-2").asJava, 5000)
+
+     case ApiKeys.DELETE_RECORDS =>
+       new DeleteRecordsRequest.Builder(5000, Map(tp -> (0L: java.lang.Long)).asJava)
+
+     case ApiKeys.INIT_PRODUCER_ID =>
+       new InitPidRequest.Builder("abc")
+
+     case ApiKeys.OFFSET_FOR_LEADER_EPOCH =>
+       new OffsetsForLeaderEpochRequest.Builder().add(tp, 0)
+
+     case ApiKeys.ADD_PARTITIONS_TO_TXN =>
+       new AddPartitionsToTxnRequest.Builder("txn1", 1, 0, List(tp).asJava)
+
+     case ApiKeys.ADD_OFFSETS_TO_TXN =>
+       new AddOffsetsToTxnRequest.Builder("txn1", 1, 0, "test-txn-group")
+
+     case ApiKeys.END_TXN =>
+       new EndTxnRequest.Builder("txn1", 1, 0, TransactionResult.forId(false))
+
+     case ApiKeys.WRITE_TXN_MARKERS =>
+       new WriteTxnMarkersRequest.Builder(0, List.empty.asJava)
+
+     case ApiKeys.TXN_OFFSET_COMMIT =>
+       new TxnOffsetCommitRequest.Builder("test-txn-group", 2, 0, 3600, Map.empty.asJava)
+
+     // Fail loudly for any API key that has no case above.
+     case _ =>
+       throw new IllegalArgumentException("Unsupported API key " + apiKey)
+   }
+ }
+
+ /**
+  * Builds a request from `requestBuilder`, sends it over `socket` with a header carrying
+  * `clientId` and `correlationId`, and parses the response body (after the response header)
+  * using the same version as the request.
+  */
+ private def requestResponse(socket: Socket, clientId: String, correlationId: Int, requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): Struct = {
+   val key = requestBuilder.apiKey
+   val builtRequest = requestBuilder.build()
+   val requestHeader = new RequestHeader(key.id, builtRequest.version, clientId, correlationId)
+   val wireBytes = builtRequest.serialize(requestHeader).array
+   val responseBody = skipResponseHeader(requestAndReceive(socket, wireBytes))
+   key.parseResponse(builtRequest.version, responseBody)
+ }
+
+ /**
+  * A test client that repeatedly sends one kind of request under a fixed client id.
+  * `correlationId` doubles as the count of requests sent so far.
+  */
+ case class Client(val clientId: String, val apiKey: ApiKeys) {
+   var correlationId: Int = 0
+   val builder = requestBuilder(apiKey)
+
+   /**
+    * Sends requests over a single connection until `until` accepts a response or roughly
+    * 10 seconds have passed. Returns whether `until` was satisfied.
+    */
+   def runUntil(until: (Struct) => Boolean): Boolean = {
+     val deadlineMs = System.currentTimeMillis + 10000
+     var satisfied = false
+     val connection = connect()
+     try {
+       while (!satisfied && System.currentTimeMillis < deadlineMs) {
+         correlationId += 1
+         val reply = requestResponse(connection, clientId, correlationId, builder)
+         satisfied = until(reply)
+       }
+     } finally {
+       connection.close()
+     }
+     satisfied
+   }
+
+   /** Includes the current request-time and throttle-time metric values for failure messages. */
+   override def toString: String = {
+     val observedRequestTime = requestTimeMetricValue(clientId)
+     val observedThrottleTime = throttleTimeMetricValue(clientId)
+     s"Client $clientId apiKey $apiKey requests $correlationId requestTime $observedRequestTime throttleTime $observedThrottleTime"
+   }
+ }
+
+ /** Schedules `test` on the executor and records the resulting future, tagged with `apiKey`. */
+ private def submitTest(apiKey: ApiKeys, test: () => Unit): Unit = {
+   val job = new Runnable {
+     override def run(): Unit = test()
+   }
+   tasks += Task(apiKey, executor.submit(job))
+ }
+
+ /**
+  * Waits up to 15 seconds for each submitted task in turn. The first failure is logged
+  * together with its API key and then rethrown.
+  */
+ private def waitAndCheckResults(): Unit = {
+   tasks.foreach { pending =>
+     try {
+       pending.future.get(15, TimeUnit.SECONDS)
+     } catch {
+       case failure: Throwable =>
+         error(s"Test failed for api-key ${pending.apiKey} with exception $failure")
+         throw failure
+     }
+   }
+ }
+
+ /**
+  * Extracts the throttle time from `response` by wrapping it in the response class that
+  * corresponds to `apiKey`. Throws IllegalArgumentException for API keys not listed here.
+  */
+ private def responseThrottleTime(apiKey: ApiKeys, response: Struct): Int =
+   apiKey match {
+     case ApiKeys.PRODUCE =>
+       new ProduceResponse(response).getThrottleTime
+     case ApiKeys.FETCH =>
+       new requests.FetchResponse(response).throttleTimeMs
+     case ApiKeys.LIST_OFFSETS =>
+       new ListOffsetResponse(response).throttleTimeMs
+     case ApiKeys.METADATA =>
+       new MetadataResponse(response).throttleTimeMs
+     case ApiKeys.OFFSET_COMMIT =>
+       new requests.OffsetCommitResponse(response).throttleTimeMs
+     case ApiKeys.OFFSET_FETCH =>
+       new requests.OffsetFetchResponse(response).throttleTimeMs
+     case ApiKeys.FIND_COORDINATOR =>
+       new requests.FindCoordinatorResponse(response).throttleTimeMs
+     case ApiKeys.JOIN_GROUP =>
+       new JoinGroupResponse(response).throttleTimeMs
+     case ApiKeys.HEARTBEAT =>
+       new HeartbeatResponse(response).throttleTimeMs
+     case ApiKeys.LEAVE_GROUP =>
+       new LeaveGroupResponse(response).throttleTimeMs
+     case ApiKeys.SYNC_GROUP =>
+       new SyncGroupResponse(response).throttleTimeMs
+     case ApiKeys.DESCRIBE_GROUPS =>
+       new DescribeGroupsResponse(response).throttleTimeMs
+     case ApiKeys.LIST_GROUPS =>
+       new ListGroupsResponse(response).throttleTimeMs
+     case ApiKeys.API_VERSIONS =>
+       new ApiVersionsResponse(response).throttleTimeMs
+     case ApiKeys.CREATE_TOPICS =>
+       new CreateTopicsResponse(response).throttleTimeMs
+     case ApiKeys.DELETE_TOPICS =>
+       new DeleteTopicsResponse(response).throttleTimeMs
+     case ApiKeys.DELETE_RECORDS =>
+       new DeleteRecordsResponse(response).throttleTimeMs
+     case ApiKeys.INIT_PRODUCER_ID =>
+       new InitPidResponse(response).throttleTimeMs
+     case ApiKeys.ADD_PARTITIONS_TO_TXN =>
+       new AddPartitionsToTxnResponse(response).throttleTimeMs
+     case ApiKeys.ADD_OFFSETS_TO_TXN =>
+       new AddOffsetsToTxnResponse(response).throttleTimeMs
+     case ApiKeys.END_TXN =>
+       new EndTxnResponse(response).throttleTimeMs
+     case ApiKeys.TXN_OFFSET_COMMIT =>
+       new TxnOffsetCommitResponse(response).throttleTimeMs
+     case other =>
+       throw new IllegalArgumentException(s"No throttle time for $other")
+   }
+
+ /**
+  * Sends `apiKey` requests under a client id equal to the API name (which falls under the
+  * small default quota) until a response reports a positive throttle time, then checks
+  * that the throttle-time metric for that client id is positive too.
+  */
+ private def checkRequestThrottleTime(apiKey: ApiKeys): Unit = {
+   val quotaLimitedId = apiKey.toString
+   val client = Client(quotaLimitedId, apiKey)
+   val sawThrottle = client.runUntil(reply => responseThrottleTime(apiKey, reply) > 0)
+
+   assertTrue(s"Response not throttled: $client", sawThrottle)
+   assertTrue(s"Throttle time metrics not updated: $client" , throttleTimeMetricValue(quotaLimitedId) > 0)
+ }
+
+ /**
+  * Sends `apiKey` requests as the large-quota client and checks that the very first
+  * response was not throttled and that the client's throttle-time metric is not positive.
+  */
+ private def checkUnthrottledClient(apiKey: ApiKeys): Unit = {
+   val client = Client(unthrottledClientId, apiKey)
+   client.runUntil(reply => responseThrottleTime(apiKey, reply) <= 0.0)
+   // Exactly one request must have been enough to get an unthrottled response.
+   assertEquals(1, client.correlationId)
+   assertTrue(s"Client should not have been throttled: $client", throttleTimeMetricValue(unthrottledClientId) <= 0.0)
+ }
+
+ /**
+  * Sends `apiKey` requests until the exempt-request-time metric has grown by more than 0.02
+  * over its starting value, and checks that the client was not throttled meanwhile.
+  */
+ private def checkExemptRequestMetric(apiKey: ApiKeys): Unit = {
+   val threshold = exemptRequestMetricValue + 0.02
+   val exemptClientId = apiKey.toString
+   val client = Client(exemptClientId, apiKey)
+   // The response content is irrelevant here; only the broker-side metric is inspected.
+   val metricAdvanced = client.runUntil(_ => exemptRequestMetricValue > threshold)
+
+   assertTrue(s"Exempt-request-time metric not updated: $client", metricAdvanced)
+   assertTrue(s"Client should not have been throttled: $client", throttleTimeMetricValue(exemptClientId) <= 0.0)
+ }
+
+ /**
+  * Sends `apiKey` requests under an "unauthorized-" prefixed client id until the
+  * throttle-time metric for that client id becomes positive.
+  */
+ private def checkUnauthorizedRequestThrottle(apiKey: ApiKeys): Unit = {
+   val unauthorizedId = "unauthorized-" + apiKey.toString
+   val client = Client(unauthorizedId, apiKey)
+   // The response content is irrelevant here; only the broker-side metric is inspected.
+   val sawThrottle = client.runUntil(_ => throttleTimeMetricValue(unauthorizedId) > 0.0)
+   assertTrue(s"Unauthorized client should have been throttled: $client", sawThrottle)
+ }
+}
+
+/**
+ * Shared fixtures for RequestQuotaTest: the API key groups under test, plus the authorizer
+ * and principal builder that the test broker is configured to load by class name.
+ */
+object RequestQuotaTest {
+  // API keys flagged as cluster actions.
+  val ClusterActions = ApiKeys.values.toSet.filter(apiKey => apiKey.clusterAction)
+  // Every remaining API key except SASL_HANDSHAKE.
+  val ClientActions = ApiKeys.values.toSet -- ClusterActions - ApiKeys.SASL_HANDSHAKE
+
+  val UnauthorizedPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "Unauthorized")
+  // Principal used for all client connections. This is modified by tests which
+  // check unauthorized code path.
+  // Volatile because it is written by the test thread while the principal builder below,
+  // which the broker is configured to use, reads it; a plain var gives no visibility guarantee.
+  @volatile var principal: KafkaPrincipal = KafkaPrincipal.ANONYMOUS
+
+  /** Authorizes every operation unless the session belongs to `UnauthorizedPrincipal`. */
+  class TestAuthorizer extends SimpleAclAuthorizer {
+    override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
+      session.principal != UnauthorizedPrincipal
+    }
+  }
+
+  /** Ignores the connection details and always returns the current value of `principal`. */
+  class TestPrincipalBuilder extends DefaultPrincipalBuilder {
+    override def buildPrincipal(transportLayer: TransportLayer, authenticator: Authenticator) = {
+      principal
+    }
+  }
+}
[3/4] kafka git commit: KAFKA-4954; Request handler utilization quotas
Posted by ju...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
index 3ff6aca..99e4e8c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
public class EndTxnResponse extends AbstractResponse {
+ private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String ERROR_CODE_KEY_NAME = "error_code";
// Possible error codes:
@@ -34,15 +35,22 @@ public class EndTxnResponse extends AbstractResponse {
// InvalidProducerEpoch
private final Errors error;
+ private final int throttleTimeMs;
- public EndTxnResponse(Errors error) {
+ public EndTxnResponse(int throttleTimeMs, Errors error) {
+ this.throttleTimeMs = throttleTimeMs;
this.error = error;
}
public EndTxnResponse(Struct struct) {
+ this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
}
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
public Errors error() {
return error;
}
@@ -50,6 +58,7 @@ public class EndTxnResponse extends AbstractResponse {
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.END_TXN.responseSchema(version));
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
struct.set(ERROR_CODE_KEY_NAME, error.code());
return struct;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 78715be..4c6998b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -211,7 +211,7 @@ public class FetchRequest extends AbstractRequest {
}
@Override
- public AbstractResponse getErrorResponse(Throwable e) {
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
for (Map.Entry<TopicPartition, PartitionData> entry: fetchData.entrySet()) {
@@ -220,7 +220,7 @@ public class FetchRequest extends AbstractRequest {
null, MemoryRecords.EMPTY);
responseData.put(entry.getKey(), partitionResponse);
}
- return new FetchResponse(responseData, 0);
+ return new FetchResponse(responseData, throttleTimeMs);
}
public int replicaId() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
index 46f3426..b2eaf63 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
@@ -84,12 +84,13 @@ public class FindCoordinatorRequest extends AbstractRequest {
}
@Override
- public AbstractResponse getErrorResponse(Throwable e) {
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
short versionId = version();
switch (versionId) {
case 0:
- case 1:
return new FindCoordinatorResponse(Errors.forException(e), Node.noNode());
+ case 1:
+ return new FindCoordinatorResponse(throttleTimeMs, Errors.forException(e), Node.noNode());
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
index f96f123..b558b62 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
public class FindCoordinatorResponse extends AbstractResponse {
+ private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String ERROR_CODE_KEY_NAME = "error_code";
private static final String ERROR_MESSAGE_KEY_NAME = "error_message";
private static final String COORDINATOR_KEY_NAME = "coordinator";
@@ -42,17 +43,24 @@ public class FindCoordinatorResponse extends AbstractResponse {
private static final String HOST_KEY_NAME = "host";
private static final String PORT_KEY_NAME = "port";
+ private final int throttleTimeMs;
private final String errorMessage;
private final Errors error;
private final Node node;
public FindCoordinatorResponse(Errors error, Node node) {
+ this(DEFAULT_THROTTLE_TIME, error, node);
+ }
+
+ public FindCoordinatorResponse(int throttleTimeMs, Errors error, Node node) {
+ this.throttleTimeMs = throttleTimeMs;
this.error = error;
this.node = node;
this.errorMessage = null;
}
public FindCoordinatorResponse(Struct struct) {
+ this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
if (struct.hasField(ERROR_MESSAGE_KEY_NAME))
errorMessage = struct.getString(ERROR_MESSAGE_KEY_NAME);
@@ -66,6 +74,10 @@ public class FindCoordinatorResponse extends AbstractResponse {
node = new Node(nodeId, host, port);
}
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
public Errors error() {
return error;
}
@@ -77,6 +89,8 @@ public class FindCoordinatorResponse extends AbstractResponse {
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.FIND_COORDINATOR.responseSchema(version));
+ if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
struct.set(ERROR_CODE_KEY_NAME, error.code());
if (struct.hasField(ERROR_MESSAGE_KEY_NAME))
struct.set(ERROR_MESSAGE_KEY_NAME, errorMessage);
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
index 44591a0..7e08a55 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
@@ -75,11 +75,13 @@ public class HeartbeatRequest extends AbstractRequest {
}
@Override
- public AbstractResponse getErrorResponse(Throwable e) {
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
short versionId = version();
switch (versionId) {
case 0:
return new HeartbeatResponse(Errors.forException(e));
+ case 1:
+ return new HeartbeatResponse(throttleTimeMs, Errors.forException(e));
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.HEARTBEAT.latestVersion()));
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index 9bc400c..a90212b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
public class HeartbeatResponse extends AbstractResponse {
+ private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String ERROR_CODE_KEY_NAME = "error_code";
/**
@@ -37,15 +38,26 @@ public class HeartbeatResponse extends AbstractResponse {
* GROUP_AUTHORIZATION_FAILED (30)
*/
private final Errors error;
+ private final int throttleTimeMs;
public HeartbeatResponse(Errors error) {
+ this(DEFAULT_THROTTLE_TIME, error);
+ }
+
+ public HeartbeatResponse(int throttleTimeMs, Errors error) {
+ this.throttleTimeMs = throttleTimeMs;
this.error = error;
}
public HeartbeatResponse(Struct struct) {
+ this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
}
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
public Errors error() {
return error;
}
@@ -53,6 +65,8 @@ public class HeartbeatResponse extends AbstractResponse {
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.HEARTBEAT.responseSchema(version));
+ if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
struct.set(ERROR_CODE_KEY_NAME, error.code());
return struct;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
index eff05e2..57d32e2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
@@ -77,8 +77,8 @@ public class InitPidRequest extends AbstractRequest {
}
@Override
- public AbstractResponse getErrorResponse(Throwable e) {
- return new InitPidResponse(Errors.forException(e));
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ return new InitPidResponse(throttleTimeMs, Errors.forException(e));
}
public static InitPidRequest parse(ByteBuffer buffer, short version) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
index 4b65aea..3c858af 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
@@ -29,27 +29,35 @@ public class InitPidResponse extends AbstractResponse {
* OK
*
*/
+ private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String PRODUCER_ID_KEY_NAME = "producer_id";
private static final String EPOCH_KEY_NAME = "producer_epoch";
private static final String ERROR_CODE_KEY_NAME = "error_code";
+ private final int throttleTimeMs;
private final Errors error;
private final long producerId;
private final short epoch;
- public InitPidResponse(Errors error, long producerId, short epoch) {
+ public InitPidResponse(int throttleTimeMs, Errors error, long producerId, short epoch) {
+ this.throttleTimeMs = throttleTimeMs;
this.error = error;
this.producerId = producerId;
this.epoch = epoch;
}
public InitPidResponse(Struct struct) {
+ this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
this.epoch = struct.getShort(EPOCH_KEY_NAME);
}
- public InitPidResponse(Errors errors) {
- this(errors, RecordBatch.NO_PRODUCER_ID, (short) 0);
+ public InitPidResponse(int throttleTimeMs, Errors errors) {
+ this(throttleTimeMs, errors, RecordBatch.NO_PRODUCER_ID, (short) 0);
+ }
+
+ public int throttleTimeMs() {
+ return throttleTimeMs;
}
public long producerId() {
@@ -67,6 +75,7 @@ public class InitPidResponse extends AbstractResponse {
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.responseSchema(version));
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
struct.set(PRODUCER_ID_KEY_NAME, producerId);
struct.set(EPOCH_KEY_NAME, epoch);
struct.set(ERROR_CODE_KEY_NAME, error.code());
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 994d9a2..1080fe7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -149,7 +149,7 @@ public class JoinGroupRequest extends AbstractRequest {
}
@Override
- public AbstractResponse getErrorResponse(Throwable e) {
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
short versionId = version();
switch (versionId) {
case 0:
@@ -161,6 +161,15 @@ public class JoinGroupRequest extends AbstractRequest {
JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
Collections.<String, ByteBuffer>emptyMap());
+ case 2:
+ return new JoinGroupResponse(
+ throttleTimeMs,
+ Errors.forException(e),
+ JoinGroupResponse.UNKNOWN_GENERATION_ID,
+ JoinGroupResponse.UNKNOWN_PROTOCOL,
+ JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
+ JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
+ Collections.<String, ByteBuffer>emptyMap());
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index 1f702c7..a1c9e2b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -28,6 +28,7 @@ import java.util.Map;
public class JoinGroupResponse extends AbstractResponse {
+ private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String ERROR_CODE_KEY_NAME = "error_code";
/**
@@ -54,6 +55,7 @@ public class JoinGroupResponse extends AbstractResponse {
public static final int UNKNOWN_GENERATION_ID = -1;
public static final String UNKNOWN_MEMBER_ID = "";
+ private final int throttleTimeMs;
private final Errors error;
private final int generationId;
private final String groupProtocol;
@@ -67,6 +69,17 @@ public class JoinGroupResponse extends AbstractResponse {
String memberId,
String leaderId,
Map<String, ByteBuffer> groupMembers) {
+ this(DEFAULT_THROTTLE_TIME, error, generationId, groupProtocol, memberId, leaderId, groupMembers);
+ }
+
+ public JoinGroupResponse(int throttleTimeMs,
+ Errors error,
+ int generationId,
+ String groupProtocol,
+ String memberId,
+ String leaderId,
+ Map<String, ByteBuffer> groupMembers) {
+ this.throttleTimeMs = throttleTimeMs;
this.error = error;
this.generationId = generationId;
this.groupProtocol = groupProtocol;
@@ -76,6 +89,7 @@ public class JoinGroupResponse extends AbstractResponse {
}
public JoinGroupResponse(Struct struct) {
+ this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
members = new HashMap<>();
for (Object memberDataObj : struct.getArray(MEMBERS_KEY_NAME)) {
@@ -91,6 +105,10 @@ public class JoinGroupResponse extends AbstractResponse {
leaderId = struct.getString(LEADER_ID_KEY_NAME);
}
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
public Errors error() {
return error;
}
@@ -126,6 +144,8 @@ public class JoinGroupResponse extends AbstractResponse {
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.JOIN_GROUP.responseSchema(version));
+ if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
struct.set(ERROR_CODE_KEY_NAME, error.code());
struct.set(GENERATION_ID_KEY_NAME, generationId);
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index 8843755..36426c2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -179,7 +179,7 @@ public class LeaderAndIsrRequest extends AbstractRequest {
}
@Override
- public AbstractResponse getErrorResponse(Throwable e) {
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Map<TopicPartition, Errors> responses = new HashMap<>(partitionStates.size());
for (TopicPartition partition : partitionStates.keySet()) {
responses.put(partition, Errors.forException(e));
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
index 4b5820b..76e076e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
@@ -67,11 +67,13 @@ public class LeaveGroupRequest extends AbstractRequest {
}
@Override
- public AbstractResponse getErrorResponse(Throwable e) {
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
short versionId = version();
switch (versionId) {
case 0:
return new LeaveGroupResponse(Errors.forException(e));
+ case 1:
+ return new LeaveGroupResponse(throttleTimeMs, Errors.forException(e));
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.LEAVE_GROUP.latestVersion()));
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
index 49b704b..ccfc8a7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
public class LeaveGroupResponse extends AbstractResponse {
+ private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String ERROR_CODE_KEY_NAME = "error_code";
/**
@@ -36,15 +37,26 @@ public class LeaveGroupResponse extends AbstractResponse {
* GROUP_AUTHORIZATION_FAILED (30)
*/
private final Errors error;
+ private final int throttleTimeMs;
public LeaveGroupResponse(Errors error) {
+ this(DEFAULT_THROTTLE_TIME, error);
+ }
+
+ public LeaveGroupResponse(int throttleTimeMs, Errors error) {
+ this.throttleTimeMs = throttleTimeMs;
this.error = error;
}
public LeaveGroupResponse(Struct struct) {
+ this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
}
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
public Errors error() {
return error;
}
@@ -52,6 +64,8 @@ public class LeaveGroupResponse extends AbstractResponse {
@Override
public Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.LEAVE_GROUP.responseSchema(version));
+ if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
struct.set(ERROR_CODE_KEY_NAME, error.code());
return struct;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
index cceff92..3d4f2b8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
@@ -49,11 +49,13 @@ public class ListGroupsRequest extends AbstractRequest {
}
@Override
- public AbstractResponse getErrorResponse(Throwable e) {
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
short versionId = version();
switch (versionId) {
case 0:
return new ListGroupsResponse(Errors.forException(e), Collections.<ListGroupsResponse.Group>emptyList());
+ case 1:
+ return new ListGroupsResponse(throttleTimeMs, Errors.forException(e), Collections.<ListGroupsResponse.Group>emptyList());
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.LIST_GROUPS.latestVersion()));
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index d0409ef..13f589f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -27,6 +27,7 @@ import java.util.List;
public class ListGroupsResponse extends AbstractResponse {
+ public static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
public static final String ERROR_CODE_KEY_NAME = "error_code";
public static final String GROUPS_KEY_NAME = "groups";
public static final String GROUP_ID_KEY_NAME = "group_id";
@@ -40,14 +41,21 @@ public class ListGroupsResponse extends AbstractResponse {
*/
private final Errors error;
+ private final int throttleTimeMs;
private final List<Group> groups;
public ListGroupsResponse(Errors error, List<Group> groups) {
+ this(DEFAULT_THROTTLE_TIME, error, groups);
+ }
+
+ public ListGroupsResponse(int throttleTimeMs, Errors error, List<Group> groups) {
+ this.throttleTimeMs = throttleTimeMs;
this.error = error;
this.groups = groups;
}
public ListGroupsResponse(Struct struct) {
+ this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
this.groups = new ArrayList<>();
for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) {
@@ -58,6 +66,10 @@ public class ListGroupsResponse extends AbstractResponse {
}
}
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
public List<Group> groups() {
return groups;
}
@@ -88,6 +100,8 @@ public class ListGroupsResponse extends AbstractResponse {
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.LIST_GROUPS.responseSchema(version));
+ if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
struct.set(ERROR_CODE_KEY_NAME, error.code());
List<Struct> groupList = new ArrayList<>();
for (Group group : groups) {
@@ -101,7 +115,11 @@ public class ListGroupsResponse extends AbstractResponse {
}
public static ListGroupsResponse fromError(Errors error) {
- return new ListGroupsResponse(error, Collections.<Group>emptyList());
+ return fromError(DEFAULT_THROTTLE_TIME, error);
+ }
+
+ public static ListGroupsResponse fromError(int throttleTimeMs, Errors error) {
+ return new ListGroupsResponse(throttleTimeMs, error, Collections.<Group>emptyList());
}
public static ListGroupsResponse parse(ByteBuffer buffer, short version) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 3327071..7dbffd1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -169,7 +169,7 @@ public class ListOffsetRequest extends AbstractRequest {
super(version);
this.replicaId = replicaId;
this.offsetData = version == 0 ? (Map<TopicPartition, PartitionData>) targetTimes : null;
- this.partitionTimestamps = version == 1 ? (Map<TopicPartition, Long>) targetTimes : null;
+ this.partitionTimestamps = version >= 1 ? (Map<TopicPartition, Long>) targetTimes : null;
this.duplicatePartitions = Collections.emptySet();
}
@@ -202,7 +202,7 @@ public class ListOffsetRequest extends AbstractRequest {
@Override
@SuppressWarnings("deprecation")
- public AbstractResponse getErrorResponse(Throwable e) {
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
short versionId = version();
@@ -224,6 +224,8 @@ public class ListOffsetRequest extends AbstractRequest {
case 0:
case 1:
return new ListOffsetResponse(responseData);
+ case 2:
+ return new ListOffsetResponse(throttleTimeMs, responseData);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.LIST_OFFSETS.latestVersion()));
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index 3f049b4..61c2a55 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -33,6 +33,7 @@ public class ListOffsetResponse extends AbstractResponse {
public static final long UNKNOWN_TIMESTAMP = -1L;
public static final long UNKNOWN_OFFSET = -1L;
+ private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String RESPONSES_KEY_NAME = "responses";
// topic level field names
@@ -105,16 +106,23 @@ public class ListOffsetResponse extends AbstractResponse {
}
}
+ private final int throttleTimeMs;
private final Map<TopicPartition, PartitionData> responseData;
/**
- * Constructor for all versions.
+ * Constructor for all versions without throttle time
*/
public ListOffsetResponse(Map<TopicPartition, PartitionData> responseData) {
+ this(DEFAULT_THROTTLE_TIME, responseData);
+ }
+
+ public ListOffsetResponse(int throttleTimeMs, Map<TopicPartition, PartitionData> responseData) {
+ this.throttleTimeMs = throttleTimeMs;
this.responseData = responseData;
}
public ListOffsetResponse(Struct struct) {
+ this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
responseData = new HashMap<>();
for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
Struct topicResponse = (Struct) topicResponseObj;
@@ -140,6 +148,10 @@ public class ListOffsetResponse extends AbstractResponse {
}
}
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
public Map<TopicPartition, PartitionData> responseData() {
return responseData;
}
@@ -151,6 +163,8 @@ public class ListOffsetResponse extends AbstractResponse {
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.LIST_OFFSETS.responseSchema(version));
+ if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
List<Struct> topicArray = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index 97072d5..3c20139 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -105,7 +105,7 @@ public class MetadataRequest extends AbstractRequest {
}
@Override
- public AbstractResponse getErrorResponse(Throwable e) {
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
List<MetadataResponse.TopicMetadata> topicMetadatas = new ArrayList<>();
Errors error = Errors.forException(e);
List<MetadataResponse.PartitionMetadata> partitions = Collections.emptyList();
@@ -121,6 +121,8 @@ public class MetadataRequest extends AbstractRequest {
case 1:
case 2:
return new MetadataResponse(Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas);
+ case 3:
+ return new MetadataResponse(throttleTimeMs, Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.METADATA.latestVersion()));
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 51aaa23..bd79653 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -35,6 +35,7 @@ import java.util.Set;
public class MetadataResponse extends AbstractResponse {
+ private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String BROKERS_KEY_NAME = "brokers";
private static final String TOPIC_METADATA_KEY_NAME = "topic_metadata";
@@ -80,6 +81,7 @@ public class MetadataResponse extends AbstractResponse {
private static final String REPLICAS_KEY_NAME = "replicas";
private static final String ISR_KEY_NAME = "isr";
+ private final int throttleTimeMs;
private final Collection<Node> brokers;
private final Node controller;
private final List<TopicMetadata> topicMetadata;
@@ -89,6 +91,11 @@ public class MetadataResponse extends AbstractResponse {
* Constructor for all versions.
*/
public MetadataResponse(List<Node> brokers, String clusterId, int controllerId, List<TopicMetadata> topicMetadata) {
+ this(DEFAULT_THROTTLE_TIME, brokers, clusterId, controllerId, topicMetadata);
+ }
+
+ public MetadataResponse(int throttleTimeMs, List<Node> brokers, String clusterId, int controllerId, List<TopicMetadata> topicMetadata) {
+ this.throttleTimeMs = throttleTimeMs;
this.brokers = brokers;
this.controller = getControllerNode(controllerId, brokers);
this.topicMetadata = topicMetadata;
@@ -96,6 +103,7 @@ public class MetadataResponse extends AbstractResponse {
}
public MetadataResponse(Struct struct) {
+ this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
Map<Integer, Node> brokers = new HashMap<>();
Object[] brokerStructs = (Object[]) struct.get(BROKERS_KEY_NAME);
for (Object brokerStruct : brokerStructs) {
@@ -179,6 +187,10 @@ public class MetadataResponse extends AbstractResponse {
return null;
}
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
/**
* Get a map of the topics which had metadata errors
* @return the map
@@ -365,6 +377,8 @@ public class MetadataResponse extends AbstractResponse {
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.METADATA.responseSchema(version));
+ if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
List<Struct> brokerArray = new ArrayList<>();
for (Node node : brokers) {
Struct broker = struct.instance(BROKERS_KEY_NAME);
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 45975d0..4402c4d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -133,6 +133,7 @@ public class OffsetCommitRequest extends AbstractRequest {
DEFAULT_RETENTION_TIME, offsetData, version);
case 1:
case 2:
+ case 3:
long retentionTime = version == 1 ? DEFAULT_RETENTION_TIME : this.retentionTime;
return new OffsetCommitRequest(groupId, generationId, memberId, retentionTime, offsetData, version);
default:
@@ -245,7 +246,7 @@ public class OffsetCommitRequest extends AbstractRequest {
}
@Override
- public AbstractResponse getErrorResponse(Throwable e) {
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Map<TopicPartition, Errors> responseData = new HashMap<>();
for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) {
responseData.put(entry.getKey(), Errors.forException(e));
@@ -257,6 +258,8 @@ public class OffsetCommitRequest extends AbstractRequest {
case 1:
case 2:
return new OffsetCommitResponse(responseData);
+ case 3:
+ return new OffsetCommitResponse(throttleTimeMs, responseData);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.OFFSET_COMMIT.latestVersion()));
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index b1dae37..d8d647d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -30,6 +30,7 @@ import java.util.Map;
public class OffsetCommitResponse extends AbstractResponse {
+ private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String RESPONSES_KEY_NAME = "responses";
// topic level fields
@@ -57,12 +58,19 @@ public class OffsetCommitResponse extends AbstractResponse {
*/
private final Map<TopicPartition, Errors> responseData;
+ private final int throttleTimeMs;
public OffsetCommitResponse(Map<TopicPartition, Errors> responseData) {
+ this(DEFAULT_THROTTLE_TIME, responseData);
+ }
+
+ public OffsetCommitResponse(int throttleTimeMs, Map<TopicPartition, Errors> responseData) {
+ this.throttleTimeMs = throttleTimeMs;
this.responseData = responseData;
}
public OffsetCommitResponse(Struct struct) {
+ this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
responseData = new HashMap<>();
for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
Struct topicResponse = (Struct) topicResponseObj;
@@ -79,6 +87,8 @@ public class OffsetCommitResponse extends AbstractResponse {
@Override
public Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.OFFSET_COMMIT.responseSchema(version));
+ if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
Map<String, Map<Integer, Errors>> topicsData = CollectionUtils.groupDataByTopic(responseData);
List<Struct> topicArray = new ArrayList<>();
@@ -100,6 +110,10 @@ public class OffsetCommitResponse extends AbstractResponse {
return struct;
}
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
public Map<TopicPartition, Errors> responseData() {
return responseData;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 71ec3f6..1d810e9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -116,6 +116,10 @@ public class OffsetFetchRequest extends AbstractRequest {
}
public OffsetFetchResponse getErrorResponse(Errors error) {
+ return getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, error);
+ }
+
+ public OffsetFetchResponse getErrorResponse(int throttleTimeMs, Errors error) {
short versionId = version();
Map<TopicPartition, OffsetFetchResponse.PartitionData> responsePartitions = new HashMap<>();
@@ -133,6 +137,8 @@ public class OffsetFetchRequest extends AbstractRequest {
case 1:
case 2:
return new OffsetFetchResponse(error, responsePartitions);
+ case 3:
+ return new OffsetFetchResponse(throttleTimeMs, error, responsePartitions);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.OFFSET_FETCH.latestVersion()));
@@ -140,8 +146,8 @@ public class OffsetFetchRequest extends AbstractRequest {
}
@Override
- public OffsetFetchResponse getErrorResponse(Throwable e) {
- return getErrorResponse(Errors.forException(e));
+ public OffsetFetchResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ return getErrorResponse(throttleTimeMs, Errors.forException(e));
}
public String groupId() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 69507f0..f795a5b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.CollectionUtils;
public class OffsetFetchResponse extends AbstractResponse {
+ private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String RESPONSES_KEY_NAME = "responses";
private static final String ERROR_CODE_KEY_NAME = "error_code";
@@ -67,6 +68,7 @@ public class OffsetFetchResponse extends AbstractResponse {
private final Map<TopicPartition, PartitionData> responseData;
private final Errors error;
+ private final int throttleTimeMs;
public static final class PartitionData {
public final long offset;
@@ -85,16 +87,28 @@ public class OffsetFetchResponse extends AbstractResponse {
}
/**
- * Constructor for all versions.
+ * Constructor for all versions without throttle time.
* @param error Potential coordinator or group level error code (for api version 2 and later)
* @param responseData Fetched offset information grouped by topic-partition
*/
public OffsetFetchResponse(Errors error, Map<TopicPartition, PartitionData> responseData) {
+ this(DEFAULT_THROTTLE_TIME, error, responseData);
+ }
+
+ /**
+ * Constructor with throttle time
+ * @param throttleTimeMs The time in milliseconds that this response was throttled
+ * @param error Potential coordinator or group level error code (for api version 2 and later)
+ * @param responseData Fetched offset information grouped by topic-partition
+ */
+ public OffsetFetchResponse(int throttleTimeMs, Errors error, Map<TopicPartition, PartitionData> responseData) {
+ this.throttleTimeMs = throttleTimeMs;
this.responseData = responseData;
this.error = error;
}
public OffsetFetchResponse(Struct struct) {
+ this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
Errors topLevelError = Errors.NONE;
this.responseData = new HashMap<>();
for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
@@ -128,6 +142,10 @@ public class OffsetFetchResponse extends AbstractResponse {
}
}
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
public boolean hasError() {
return this.error != Errors.NONE;
}
@@ -147,6 +165,8 @@ public class OffsetFetchResponse extends AbstractResponse {
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.OFFSET_FETCH.responseSchema(version));
+ if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
List<Struct> topicArray = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index 3c285f1..f898a75 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -127,7 +127,7 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
}
@Override
- public AbstractResponse getErrorResponse(Throwable e) {
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Errors error = Errors.forException(e);
Map<TopicPartition, EpochEndOffset> errorResponse = new HashMap();
for (TopicPartition tp : epochsByPartition.keySet()) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 482811e..b63f6c2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -224,7 +224,7 @@ public class ProduceRequest extends AbstractRequest {
}
@Override
- public AbstractResponse getErrorResponse(Throwable e) {
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
/* In case the producer doesn't actually want any response */
if (acks == 0)
return null;
@@ -241,7 +241,7 @@ public class ProduceRequest extends AbstractRequest {
case 1:
case 2:
case 3:
- return new ProduceResponse(responseMap);
+ return new ProduceResponse(responseMap, throttleTimeMs);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.PRODUCE.latestVersion()));
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 152391e..06c1f6e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -46,7 +46,6 @@ public class ProduceResponse extends AbstractResponse {
private static final String ERROR_CODE_KEY_NAME = "error_code";
public static final long INVALID_OFFSET = -1L;
- public static final int DEFAULT_THROTTLE_TIME = 0;
/**
* Possible error code:
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
index 56c29c5..e49b727 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
@@ -40,6 +40,29 @@ public class SaslHandshakeRequest extends AbstractRequest {
private final String mechanism;
+ public static class Builder extends AbstractRequest.Builder<SaslHandshakeRequest> {
+ private final String mechanism;
+
+ public Builder(String mechanism) {
+ super(ApiKeys.SASL_HANDSHAKE);
+ this.mechanism = mechanism;
+ }
+
+ @Override
+ public SaslHandshakeRequest build(short version) {
+ return new SaslHandshakeRequest(mechanism);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder bld = new StringBuilder();
+ bld.append("(type=SaslHandshakeRequest").
+ append(", mechanism=").append(mechanism).
+ append(")");
+ return bld.toString();
+ }
+ }
+
public SaslHandshakeRequest(String mechanism) {
super(ApiKeys.SASL_HANDSHAKE.latestVersion());
this.mechanism = mechanism;
@@ -55,7 +78,7 @@ public class SaslHandshakeRequest extends AbstractRequest {
}
@Override
- public AbstractResponse getErrorResponse(Throwable e) {
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
short versionId = version();
switch (versionId) {
case 0:
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
index 325ee06..48aa16e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
@@ -103,7 +103,7 @@ public class StopReplicaRequest extends AbstractRequest {
}
@Override
- public AbstractResponse getErrorResponse(Throwable e) {
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Map<TopicPartition, Errors> responses = new HashMap<>(partitions.size());
for (TopicPartition partition : partitions) {
responses.put(partition, Errors.forException(e));
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
index dbc19ac..82df84a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
@@ -98,13 +98,18 @@ public class SyncGroupRequest extends AbstractRequest {
}
@Override
- public AbstractResponse getErrorResponse(Throwable e) {
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
short versionId = version();
switch (versionId) {
case 0:
return new SyncGroupResponse(
Errors.forException(e),
ByteBuffer.wrap(new byte[]{}));
+ case 1:
+ return new SyncGroupResponse(
+ throttleTimeMs,
+ Errors.forException(e),
+ ByteBuffer.wrap(new byte[]{}));
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.SYNC_GROUP.latestVersion()));
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index 4a06491..b99a99f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
public class SyncGroupResponse extends AbstractResponse {
+ public static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
public static final String ERROR_CODE_KEY_NAME = "error_code";
public static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
@@ -39,18 +40,29 @@ public class SyncGroupResponse extends AbstractResponse {
*/
private final Errors error;
+ private final int throttleTimeMs;
private final ByteBuffer memberState;
public SyncGroupResponse(Errors error, ByteBuffer memberState) {
+ this(DEFAULT_THROTTLE_TIME, error, memberState);
+ }
+
+ public SyncGroupResponse(int throttleTimeMs, Errors error, ByteBuffer memberState) {
+ this.throttleTimeMs = throttleTimeMs;
this.error = error;
this.memberState = memberState;
}
public SyncGroupResponse(Struct struct) {
+ this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
this.memberState = struct.getBytes(MEMBER_ASSIGNMENT_KEY_NAME);
}
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
public Errors error() {
return error;
}
@@ -62,6 +74,8 @@ public class SyncGroupResponse extends AbstractResponse {
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.SYNC_GROUP.responseSchema(version));
+ if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
struct.set(ERROR_CODE_KEY_NAME, error.code());
struct.set(MEMBER_ASSIGNMENT_KEY_NAME, memberState);
return struct;
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index 584f733..cca8875 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -155,12 +155,12 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
}
@Override
- public TxnOffsetCommitResponse getErrorResponse(Throwable e) {
+ public TxnOffsetCommitResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Errors error = Errors.forException(e);
Map<TopicPartition, Errors> errors = new HashMap<>(offsets.size());
for (TopicPartition partition : offsets.keySet())
errors.put(partition, error);
- return new TxnOffsetCommitResponse(errors);
+ return new TxnOffsetCommitResponse(throttleTimeMs, errors);
}
public static TxnOffsetCommitRequest parse(ByteBuffer buffer, short version) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index 5574aea..37b9a50 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.Map;
public class TxnOffsetCommitResponse extends AbstractResponse {
+ private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
private static final String PARTITIONS_KEY_NAME = "partitions";
private static final String TOPIC_KEY_NAME = "topic";
@@ -43,12 +44,15 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
// InvalidCommitOffsetSize
private final Map<TopicPartition, Errors> errors;
+ private final int throttleTimeMs;
- public TxnOffsetCommitResponse(Map<TopicPartition, Errors> errors) {
+ public TxnOffsetCommitResponse(int throttleTimeMs, Map<TopicPartition, Errors> errors) {
+ this.throttleTimeMs = throttleTimeMs;
this.errors = errors;
}
public TxnOffsetCommitResponse(Struct struct) {
+ this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
Map<TopicPartition, Errors> errors = new HashMap<>();
Object[] topicPartitionsArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
for (Object topicPartitionObj : topicPartitionsArray) {
@@ -67,6 +71,7 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.TXN_OFFSET_COMMIT.responseSchema(version));
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
Map<String, Map<Integer, Errors>> mappedPartitions = CollectionUtils.groupDataByTopic(errors);
Object[] partitionsArray = new Object[mappedPartitions.size()];
int i = 0;
@@ -91,6 +96,10 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
return struct;
}
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
public Map<TopicPartition, Errors> errors() {
return errors;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index fc7a33f..be2eaab 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -284,7 +284,7 @@ public class UpdateMetadataRequest extends AbstractRequest {
}
@Override
- public AbstractResponse getErrorResponse(Throwable e) {
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
short versionId = version();
if (versionId <= 3)
return new UpdateMetadataResponse(Errors.forException(e));
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
index 998d504..7cded24 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
@@ -192,7 +192,7 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
}
@Override
- public WriteTxnMarkersResponse getErrorResponse(Throwable e) {
+ public WriteTxnMarkersResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Errors error = Errors.forException(e);
Map<Long, Map<TopicPartition, Errors>> errors = new HashMap<>(markers.size());
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 1ee9d0f..55b4fc6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -158,7 +158,8 @@ public class NetworkClientTest {
}
private void maybeSetExpectedApiVersionsResponse() {
- ByteBuffer buffer = ApiVersionsResponse.API_VERSIONS_RESPONSE.serialize((short) 0, new ResponseHeader(0));
+ short apiVersionsResponseVersion = ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersion(ApiKeys.API_VERSIONS.id).maxVersion;
+ ByteBuffer buffer = ApiVersionsResponse.API_VERSIONS_RESPONSE.serialize(apiVersionsResponseVersion, new ResponseHeader(0));
selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 934c895..2017ef9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -384,7 +384,7 @@ public class SenderTest {
public boolean matches(AbstractRequest body) {
return body instanceof InitPidRequest;
}
- }, new InitPidResponse(Errors.NONE, producerId, (short) 0));
+ }, new InitPidResponse(0, Errors.NONE, producerId, (short) 0));
sender.run(time.milliseconds());
assertTrue(transactionManager.hasPid());
assertEquals(producerId, transactionManager.pidAndEpoch().producerId);
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index a1efa58..1ab86aa 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -199,7 +199,7 @@ public class TransactionManagerTest {
assertEquals(epoch, addOffsetsToTxnRequest.producerEpoch());
return true;
}
- }, new AddOffsetsToTxnResponse(Errors.NONE));
+ }, new AddOffsetsToTxnResponse(0, Errors.NONE));
sender.run(time.milliseconds()); // Send AddOffsetsRequest
assertTrue(transactionManager.hasPendingOffsetCommits()); // We should now have created and queued the offset commit request.
@@ -219,7 +219,7 @@ public class TransactionManagerTest {
assertEquals(epoch, txnOffsetCommitRequest.producerEpoch());
return true;
}
- }, new TxnOffsetCommitResponse(txnOffsetCommitResponse));
+ }, new TxnOffsetCommitResponse(0, txnOffsetCommitResponse));
assertEquals(null, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
sender.run(time.milliseconds()); // try to send TxnOffsetCommitRequest, but find we don't have a group coordinator.
@@ -528,7 +528,7 @@ public class TransactionManagerTest {
assertEquals(initPidRequest.transactionTimeoutMs(), transactionTimeoutMs);
return true;
}
- }, new InitPidResponse(error, pid, epoch), shouldDisconnect);
+ }, new InitPidResponse(0, error, pid, epoch), shouldDisconnect);
}
private void prepareProduceResponse(Errors error, final long pid, final short epoch) {
@@ -563,7 +563,7 @@ public class TransactionManagerTest {
assertEquals(transactionalId, addPartitionsToTxnRequest.transactionalId());
return true;
}
- }, new AddPartitionsToTxnResponse(error));
+ }, new AddPartitionsToTxnResponse(0, error));
}
private void prepareEndTxnResponse(Errors error, final TransactionResult result, final long pid, final short epoch) {
@@ -577,7 +577,7 @@ public class TransactionManagerTest {
assertEquals(result, endTxnRequest.command());
return true;
}
- }, new EndTxnResponse(error));
+ }, new EndTxnResponse(0, error));
}
private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 1de6789..345ace1 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.common.network;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.ByteArrayOutputStream;
@@ -27,6 +29,7 @@ import java.nio.channels.Channels;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.Map;
import javax.net.ssl.SSLContext;
@@ -40,6 +43,7 @@ import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -460,6 +464,41 @@ public class SslTransportLayerTest {
NetworkTestUtils.checkClientConnection(selector, node, 64000, 10);
}
+ /**
+ * Tests that time spent on the network thread is accumulated on each channel
+ */
+ @Test
+ public void testNetworkThreadTimeRecorded() throws Exception {
+ selector.close();
+ this.selector = new Selector(NetworkReceive.UNLIMITED, 5000, new Metrics(), Time.SYSTEM,
+ "MetricGroup", new HashMap<String, String>(), false, true, channelBuilder);
+
+ String node = "0";
+ server = createEchoServer(SecurityProtocol.SSL);
+ InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
+ selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+ String message = TestUtils.randomString(10 * 1024);
+ NetworkTestUtils.waitForChannelReady(selector, node);
+ KafkaChannel channel = selector.channel(node);
+ assertTrue("SSL handshake time not recorded", channel.getAndResetNetworkThreadTimeNanos() > 0);
+ assertEquals("Time not reset", 0, channel.getAndResetNetworkThreadTimeNanos());
+
+ selector.mute(node);
+ selector.send(new NetworkSend(node, ByteBuffer.wrap(message.getBytes())));
+ while (selector.completedSends().isEmpty()) {
+ selector.poll(100L);
+ }
+ assertTrue("Send time not recorded", channel.getAndResetNetworkThreadTimeNanos() > 0);
+ assertEquals("Time not reset", 0, channel.getAndResetNetworkThreadTimeNanos());
+
+ selector.unmute(node);
+ while (selector.completedReceives().isEmpty()) {
+ selector.poll(100L);
+ }
+ assertTrue("Receive time not recorded", channel.getAndResetNetworkThreadTimeNanos() > 0);
+ }
+
@Test
public void testCloseSsl() throws Exception {
testClose(SecurityProtocol.SSL, new SslChannelBuilder(Mode.CLIENT));
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
index a4aeb25..8410e6a 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
@@ -16,6 +16,14 @@
*/
package org.apache.kafka.common.protocol;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
import org.junit.Test;
public class ApiKeysTest {
@@ -35,4 +43,27 @@ public class ApiKeysTest {
ApiKeys.PRODUCE.requestSchema((short) Protocol.REQUESTS[ApiKeys.PRODUCE.id].length);
}
+ /**
+ * All valid client responses which may be throttled should have a field named
+ * 'throttle_time_ms' to return the throttle time to the client. Exclusions are
+ * <ul>
+ * <li>Cluster actions used only for inter-broker are throttled only if unauthorized
+ * <li> SASL_HANDSHAKE is not throttled when used for authentication when a connection
+ * is established. At any other time, this request returns an error response that
+ * may be throttled.
+ * </ul>
+ */
+ @Test
+ public void testResponseThrottleTime() {
+ List<ApiKeys> authenticationKeys = Arrays.asList(ApiKeys.SASL_HANDSHAKE);
+
+ for (ApiKeys apiKey: ApiKeys.values()) {
+ Schema responseSchema = apiKey.responseSchema(apiKey.latestVersion());
+ Field throttleTimeField = responseSchema.get("throttle_time_ms");
+ if (apiKey.clusterAction || authenticationKeys.contains(apiKey))
+ assertNull("Unexpected throttle time field: " + apiKey, throttleTimeField);
+ else
+ assertNotNull("Throttle time field missing: " + apiKey, throttleTimeField);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 9e283c0..422f9e6 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -841,7 +841,7 @@ public class RequestResponseTest {
}
private InitPidResponse createInitPidResponse() {
- return new InitPidResponse(Errors.NONE, 3332, (short) 3);
+ return new InitPidResponse(0, Errors.NONE, 3332, (short) 3);
}
@@ -871,7 +871,7 @@ public class RequestResponseTest {
}
private AddPartitionsToTxnResponse createAddPartitionsToTxnResponse() {
- return new AddPartitionsToTxnResponse(Errors.NONE);
+ return new AddPartitionsToTxnResponse(0, Errors.NONE);
}
private AddOffsetsToTxnRequest createAddOffsetsToTxnRequest() {
@@ -879,7 +879,7 @@ public class RequestResponseTest {
}
private AddOffsetsToTxnResponse createAddOffsetsToTxnResponse() {
- return new AddOffsetsToTxnResponse(Errors.NONE);
+ return new AddOffsetsToTxnResponse(0, Errors.NONE);
}
private EndTxnRequest createEndTxnRequest() {
@@ -887,7 +887,7 @@ public class RequestResponseTest {
}
private EndTxnResponse createEndTxnResponse() {
- return new EndTxnResponse(Errors.NONE);
+ return new EndTxnResponse(0, Errors.NONE);
}
private WriteTxnMarkersRequest createWriteTxnMarkersRequest() {
@@ -914,7 +914,7 @@ public class RequestResponseTest {
private TxnOffsetCommitResponse createTxnOffsetCommitResponse() {
final Map<TopicPartition, Errors> errorPerPartitions = new HashMap<>();
errorPerPartitions.put(new TopicPartition("topic", 73), Errors.NONE);
- return new TxnOffsetCommitResponse(errorPerPartitions);
+ return new TxnOffsetCommitResponse(0, errorPerPartitions);
}
private static class ByteBufferChannel implements GatheringByteChannel {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 7bfe76a..2d41869 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -41,7 +41,7 @@ import scala.reflect.{ClassTag, classTag}
object RequestChannel extends Logging {
val AllDone = Request(processor = 1, connectionId = "2", Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost),
- buffer = shutdownReceive, startTimeMs = 0, listenerName = new ListenerName(""),
+ buffer = shutdownReceive, startTimeNanos = 0, listenerName = new ListenerName(""),
securityProtocol = SecurityProtocol.PLAINTEXT)
private val requestLogger = Logger.getLogger("kafka.request.logger")
@@ -57,14 +57,15 @@ object RequestChannel extends Logging {
}
case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer,
- startTimeMs: Long, listenerName: ListenerName, securityProtocol: SecurityProtocol) {
+ startTimeNanos: Long, listenerName: ListenerName, securityProtocol: SecurityProtocol) {
// These need to be volatile because the readers are in the network thread and the writers are in the request
// handler threads or the purgatory threads
- @volatile var requestDequeueTimeMs = -1L
- @volatile var apiLocalCompleteTimeMs = -1L
- @volatile var responseCompleteTimeMs = -1L
- @volatile var responseDequeueTimeMs = -1L
- @volatile var apiRemoteCompleteTimeMs = -1L
+ @volatile var requestDequeueTimeNanos = -1L
+ @volatile var apiLocalCompleteTimeNanos = -1L
+ @volatile var responseCompleteTimeNanos = -1L
+ @volatile var responseDequeueTimeNanos = -1L
+ @volatile var apiRemoteCompleteTimeNanos = -1L
+ @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
val requestId = buffer.getShort()
@@ -122,26 +123,33 @@ object RequestChannel extends Logging {
trace("Processor %d received request : %s".format(processor, requestDesc(true)))
- def updateRequestMetrics() {
- val endTimeMs = Time.SYSTEM.milliseconds
- // In some corner cases, apiLocalCompleteTimeMs may not be set when the request completes if the remote
+ def requestThreadTimeNanos = {
+ if (apiLocalCompleteTimeNanos == -1L) apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds
+ math.max(apiLocalCompleteTimeNanos - requestDequeueTimeNanos, 0L)
+ }
+
+ def updateRequestMetrics(networkThreadTimeNanos: Long) {
+ val endTimeNanos = Time.SYSTEM.nanoseconds
+ // In some corner cases, apiLocalCompleteTimeNanos may not be set when the request completes if the remote
// processing time is really small. This value is set in KafkaApis from a request handling thread.
// This may be read in a network thread before the actual update happens in KafkaApis which will cause us to
- // see a negative value here. In that case, use responseCompleteTimeMs as apiLocalCompleteTimeMs.
- if (apiLocalCompleteTimeMs < 0)
- apiLocalCompleteTimeMs = responseCompleteTimeMs
- // If the apiRemoteCompleteTimeMs is not set (i.e., for requests that do not go through a purgatory), then it is
- // the same as responseCompleteTimeMs.
- if (apiRemoteCompleteTimeMs < 0)
- apiRemoteCompleteTimeMs = responseCompleteTimeMs
-
- val requestQueueTime = math.max(requestDequeueTimeMs - startTimeMs, 0)
- val apiLocalTime = math.max(apiLocalCompleteTimeMs - requestDequeueTimeMs, 0)
- val apiRemoteTime = math.max(apiRemoteCompleteTimeMs - apiLocalCompleteTimeMs, 0)
- val apiThrottleTime = math.max(responseCompleteTimeMs - apiRemoteCompleteTimeMs, 0)
- val responseQueueTime = math.max(responseDequeueTimeMs - responseCompleteTimeMs, 0)
- val responseSendTime = math.max(endTimeMs - responseDequeueTimeMs, 0)
- val totalTime = endTimeMs - startTimeMs
+ // see a negative value here. In that case, use responseCompleteTimeNanos as apiLocalCompleteTimeNanos.
+ if (apiLocalCompleteTimeNanos < 0)
+ apiLocalCompleteTimeNanos = responseCompleteTimeNanos
+ // If the apiRemoteCompleteTimeNanos is not set (i.e., for requests that do not go through a purgatory), then it is
+ // the same as responseCompleteTimeNanos.
+ if (apiRemoteCompleteTimeNanos < 0)
+ apiRemoteCompleteTimeNanos = responseCompleteTimeNanos
+
+ def nanosToMs(nanos: Long) = math.max(TimeUnit.NANOSECONDS.toMillis(nanos), 0)
+
+ val requestQueueTime = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
+ val apiLocalTime = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos)
+ val apiRemoteTime = nanosToMs(apiRemoteCompleteTimeNanos - apiLocalCompleteTimeNanos)
+ val apiThrottleTime = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos)
+ val responseQueueTime = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
+ val responseSendTime = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
+ val totalTime = nanosToMs(endTimeNanos - startTimeNanos)
val fetchMetricNames =
if (requestId == ApiKeys.FETCH.id) {
val isFromFollower = body[FetchRequest].isFromFollower
@@ -164,16 +172,32 @@ object RequestChannel extends Logging {
m.totalTimeHist.update(totalTime)
}
+ // Records network handler thread usage. This is included towards the request quota for the
+ // user/client. Throttling is only performed when request handler thread usage
+ // is recorded, just before responses are queued for delivery.
+ // The time recorded here is the time spent on the network thread for receiving this request
+ // and sending the response. Note that for the first request on a connection, the time includes
+ // the total time spent on authentication, which may be significant for SASL/SSL.
+ recordNetworkThreadTimeCallback.foreach(record => record(networkThreadTimeNanos))
+
if (requestLogger.isDebugEnabled) {
val detailsEnabled = requestLogger.isTraceEnabled
- requestLogger.trace("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s,listener:%s"
- .format(requestDesc(detailsEnabled), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal, listenerName.value))
+ def nanosToMs(nanos: Long) = TimeUnit.NANOSECONDS.toMicros(math.max(nanos, 0)).toDouble / TimeUnit.MILLISECONDS.toMicros(1)
+ val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
+ val requestQueueTimeMs = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
+ val apiLocalTimeMs = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos)
+ val apiRemoteTimeMs = nanosToMs(apiRemoteCompleteTimeNanos - apiLocalCompleteTimeNanos)
+ val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
+ val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
+ requestLogger.trace("Completed request:%s from connection %s;totalTime:%f,requestQueueTime:%f,localTime:%f,remoteTime:%f,responseQueueTime:%f,sendTime:%f,securityProtocol:%s,principal:%s,listener:%s"
+ .format(requestDesc(detailsEnabled), connectionId, totalTimeMs, requestQueueTimeMs, apiLocalTimeMs, apiRemoteTimeMs, responseQueueTimeMs, responseSendTimeMs, securityProtocol, session.principal, listenerName.value))
}
}
}
case class Response(processor: Int, request: Request, responseSend: Send, responseAction: ResponseAction) {
- request.responseCompleteTimeMs = Time.SYSTEM.milliseconds
+ request.responseCompleteTimeNanos = Time.SYSTEM.nanoseconds
+ if (request.apiLocalCompleteTimeNanos == -1L) request.apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds
def this(request: Request, responseSend: Send) =
this(request.processor, request, responseSend, if (responseSend == null) NoOpAction else SendAction)
@@ -253,7 +277,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
def receiveResponse(processor: Int): RequestChannel.Response = {
val response = responseQueues(processor).poll()
if (response != null)
- response.request.responseDequeueTimeMs = Time.SYSTEM.milliseconds
+ response.request.responseDequeueTimeNanos = Time.SYSTEM.nanoseconds
response
}
[2/4] kafka git commit: KAFKA-4954; Request handler utilization quotas
Posted by ju...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index b9bf3e4..b2a3456 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -419,6 +419,7 @@ private[kafka] class Processor(val id: Int,
"socket-server",
metricTags,
false,
+ true,
ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialProvider.credentialCache))
override def run() {
@@ -457,7 +458,7 @@ private[kafka] class Processor(val id: Int,
case RequestChannel.NoOpAction =>
// There is no response to send to the client, we need to read more pipelined requests
// that are sitting in the server's socket buffer
- curr.request.updateRequestMetrics
+ updateRequestMetrics(curr.request)
trace("Socket server received empty response to send, registering for read: " + curr)
val channelId = curr.request.connectionId
if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null)
@@ -465,7 +466,7 @@ private[kafka] class Processor(val id: Int,
case RequestChannel.SendAction =>
sendResponse(curr)
case RequestChannel.CloseConnectionAction =>
- curr.request.updateRequestMetrics
+ updateRequestMetrics(curr.request)
trace("Closing socket connection actively according to the response code.")
close(selector, curr.request.connectionId)
}
@@ -482,7 +483,7 @@ private[kafka] class Processor(val id: Int,
// `channel` can be null if the selector closed the connection because it was idle for too long
if (channel == null) {
warn(s"Attempting to send response via channel for which there is no open connection, connection id $id")
- response.request.updateRequestMetrics()
+ response.request.updateRequestMetrics(0L)
}
else {
selector.send(response.responseSend)
@@ -505,14 +506,13 @@ private[kafka] class Processor(val id: Int,
selector.completedReceives.asScala.foreach { receive =>
try {
val openChannel = selector.channel(receive.source)
- val session = {
- // Only methods that are safe to call on a disconnected channel should be invoked on 'channel'.
- val channel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
- RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress)
- }
+ // Only methods that are safe to call on a disconnected channel should be invoked on 'openOrClosingChannel'.
+ val openOrClosingChannel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
+ val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)
+
val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
- buffer = receive.payload, startTimeMs = time.milliseconds, listenerName = listenerName,
- securityProtocol = securityProtocol)
+ buffer = receive.payload, startTimeNanos = time.nanoseconds,
+ listenerName = listenerName, securityProtocol = securityProtocol)
requestChannel.sendRequest(req)
selector.mute(receive.source)
} catch {
@@ -529,17 +529,24 @@ private[kafka] class Processor(val id: Int,
val resp = inflightResponses.remove(send.destination).getOrElse {
throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
}
- resp.request.updateRequestMetrics()
+ updateRequestMetrics(resp.request)
selector.unmute(send.destination)
}
}
+ private def updateRequestMetrics(request: RequestChannel.Request) {
+ val channel = selector.channel(request.connectionId)
+ val openOrClosingChannel = if (channel != null) channel else selector.closingChannel(request.connectionId)
+ val networkThreadTimeNanos = if (openOrClosingChannel != null) openOrClosingChannel.getAndResetNetworkThreadTimeNanos() else 0L
+ request.updateRequestMetrics(networkThreadTimeNanos)
+ }
+
private def processDisconnected() {
selector.disconnected.asScala.foreach { connectionId =>
val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
}.remoteHost
- inflightResponses.remove(connectionId).foreach(_.request.updateRequestMetrics())
+ inflightResponses.remove(connectionId).foreach(response => updateRequestMetrics(response.request))
// the channel has been closed by the selector but the quotas still need to be updated
connectionQuotas.dec(InetAddress.getByName(remoteHost))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 84772db..04f5239 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -31,10 +31,11 @@ import scala.collection.JavaConverters._
/**
* Represents the sensors aggregated per client
+ * @param quotaEntity Quota entity representing <client-id>, <user> or <user, client-id>
* @param quotaSensor @Sensor that tracks the quota
* @param throttleTimeSensor @Sensor that tracks the throttle time
*/
-private case class ClientSensors(quotaSensor: Sensor, throttleTimeSensor: Sensor)
+case class ClientSensors(quotaEntity: QuotaEntity, quotaSensor: Sensor, throttleTimeSensor: Sensor)
/**
* Configuration settings for quota management
@@ -58,6 +59,8 @@ object ClientQuotaManagerConfig {
val DefaultQuotaWindowSizeSeconds = 1
// Purge sensors after 1 hour of inactivity
val InactiveSensorExpirationTimeSeconds = 3600
+ val QuotaRequestPercentDefault = Int.MaxValue.toDouble
+ val NanosToPercentagePerSecond = 100.0 / TimeUnit.SECONDS.toNanos(1)
val UnlimitedQuota = Quota.upperBound(Long.MaxValue)
val DefaultClientIdQuotaId = QuotaId(None, Some(ConfigEntityName.Default))
@@ -126,12 +129,12 @@ case class QuotaEntity(quotaId: QuotaId, sanitizedUser: String, clientId: String
*
* @param config @ClientQuotaManagerConfig quota configs
* @param metrics @Metrics Metrics instance
- * @param apiKey API Key for the request
+ * @param quotaType Quota type of this quota manager
* @param time @Time object to use
*/
-final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
+class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
private val metrics: Metrics,
- private val apiKey: QuotaType,
+ private val quotaType: QuotaType,
private val time: Time) extends Logging {
private val overriddenQuota = new ConcurrentHashMap[QuotaId, Quota]()
private val staticConfigClientIdQuota = Quota.upperBound(config.quotaBytesPerSecondDefault)
@@ -140,19 +143,22 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
private val delayQueue = new DelayQueue[ThrottledResponse]()
private val sensorAccessor = new SensorAccess
val throttledRequestReaper = new ThrottledRequestReaper(delayQueue)
- throttledRequestReaper.start()
- private val delayQueueSensor = metrics.sensor(apiKey + "-delayQueue")
+ private val delayQueueSensor = metrics.sensor(quotaType + "-delayQueue")
delayQueueSensor.add(metrics.metricName("queue-size",
- apiKey.toString,
+ quotaType.toString,
"Tracks the size of the delay queue"), new Total())
+ start() // Use start method to keep findbugs happy
+ private def start() {
+ throttledRequestReaper.start()
+ }
/**
* Reaper thread that triggers callbacks on all throttled requests
* @param delayQueue DelayQueue to dequeue from
*/
class ThrottledRequestReaper(delayQueue: DelayQueue[ThrottledResponse]) extends ShutdownableThread(
- "ThrottledRequestReaper-%s".format(apiKey), false) {
+ "ThrottledRequestReaper-%s".format(quotaType), false) {
override def doWork(): Unit = {
val response: ThrottledResponse = delayQueue.poll(1, TimeUnit.SECONDS)
@@ -166,17 +172,23 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}
/**
- * Records that a clientId changed some metric being throttled (produced/consumed bytes, QPS etc.)
- * @param clientId clientId that produced the data
- * @param value amount of data written in bytes
+ * Records that a user/clientId changed some metric being throttled (produced/consumed bytes, request processing time etc.)
+ * If quota has been violated, callback is invoked after a delay, otherwise the callback is invoked immediately.
+ * Throttle time calculation may be overridden by sub-classes.
+ * @param sanitizedUser user principal of client
+ * @param clientId clientId that produced/fetched the data
+ * @param value amount of data in bytes or request processing time as a percentage
* @param callback Callback function. This will be triggered immediately if quota is not violated.
* If there is a quota violation, this callback will be triggered after a delay
* @return Number of milliseconds to delay the response in case of Quota violation.
* Zero otherwise
*/
- def recordAndMaybeThrottle(sanitizedUser: String, clientId: String, value: Int, callback: Int => Unit): Int = {
- val clientQuotaEntity = quotaEntity(sanitizedUser, clientId)
- val clientSensors = getOrCreateQuotaSensors(clientQuotaEntity)
+ def recordAndMaybeThrottle(sanitizedUser: String, clientId: String, value: Double, callback: Int => Unit): Int = {
+ val clientSensors = getOrCreateQuotaSensors(sanitizedUser, clientId)
+ recordAndThrottleOnQuotaViolation(clientSensors, value, callback)
+ }
+
+ def recordAndThrottleOnQuotaViolation(clientSensors: ClientSensors, value: Double, callback: Int => Unit): Int = {
var throttleTimeMs = 0
try {
clientSensors.quotaSensor.record(value)
@@ -185,8 +197,9 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
} catch {
case _: QuotaViolationException =>
// Compute the delay
+ val clientQuotaEntity = clientSensors.quotaEntity
val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId))
- throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota))
+ throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota)).round.toInt
clientSensors.throttleTimeSensor.record(throttleTimeMs)
// If delayed, add the element to the delayQueue
delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
@@ -197,6 +210,15 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}
/**
+ * Records that a user/clientId changed some metric being throttled without checking for
+ * quota violation. The aggregate value will subsequently be used for throttling when the
+ * next request is processed.
+ */
+ def recordNoThrottle(clientSensors: ClientSensors, value: Double) {
+ clientSensors.quotaSensor.record(value, time.milliseconds(), false)
+ }
+
+ /**
* Determines the quota-id for the client with the specified user principal
* and client-id and returns the quota entity that encapsulates the quota-id
* and the associated quota override or default quota.
@@ -325,13 +347,13 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
* we need to add a delay of X to W such that O * W / (W + X) = T.
* Solving for X, we get X = (O - T)/T * W.
*/
- private def throttleTime(clientMetric: KafkaMetric, config: MetricConfig): Int = {
+ protected def throttleTime(clientMetric: KafkaMetric, config: MetricConfig): Long = {
val rateMetric: Rate = measurableAsRate(clientMetric.metricName(), clientMetric.measurable())
val quota = config.quota()
val difference = clientMetric.value() - quota.bound
// Use the precise window used by the rate calculation
val throttleTimeMs = difference / quota.bound * rateMetric.windowSize(config, time.milliseconds())
- throttleTimeMs.round.toInt
+ throttleTimeMs.round
}
// Casting to Rate because we only use Rate in Quota computation
@@ -346,39 +368,54 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
* This function either returns the sensors for a given client id or creates them if they don't exist
* First sensor of the tuple is the quota enforcement sensor. Second one is the throttle time sensor
*/
- private def getOrCreateQuotaSensors(quotaEntity: QuotaEntity): ClientSensors = {
+ def getOrCreateQuotaSensors(sanitizedUser: String, clientId: String): ClientSensors = {
+ val clientQuotaEntity = quotaEntity(sanitizedUser, clientId)
// Names of the sensors to access
ClientSensors(
+ clientQuotaEntity,
sensorAccessor.getOrCreate(
- getQuotaSensorName(quotaEntity.quotaId),
+ getQuotaSensorName(clientQuotaEntity.quotaId),
ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
lock, metrics,
- () => clientRateMetricName(quotaEntity.sanitizedUser, quotaEntity.clientId),
- () => getQuotaMetricConfig(quotaEntity.quota),
- () => new Rate()
+ () => clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId),
+ () => getQuotaMetricConfig(clientQuotaEntity.quota),
+ () => measurableStat
),
- sensorAccessor.getOrCreate(getThrottleTimeSensorName(quotaEntity.quotaId),
+ sensorAccessor.getOrCreate(getThrottleTimeSensorName(clientQuotaEntity.quotaId),
ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
lock,
metrics,
- () => throttleMetricName(quotaEntity),
+ () => throttleMetricName(clientQuotaEntity),
() => null,
() => new Avg()
)
)
}
- private def getThrottleTimeSensorName(quotaId: QuotaId): String = apiKey + "ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
+ private def measurableStat: MeasurableStat = new Rate()
+
+ private def getThrottleTimeSensorName(quotaId: QuotaId): String = quotaType + "ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
- private def getQuotaSensorName(quotaId: QuotaId): String = apiKey + "-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
+ private def getQuotaSensorName(quotaId: QuotaId): String = quotaType + "-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
- private def getQuotaMetricConfig(quota: Quota): MetricConfig = {
+ protected def getQuotaMetricConfig(quota: Quota): MetricConfig = {
new MetricConfig()
.timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
.samples(config.numQuotaSamples)
.quota(quota)
}
+ protected def createSensor(sensorName: String, metricName: MetricName): Sensor = {
+ sensorAccessor.getOrCreate(
+ sensorName,
+ ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
+ lock, metrics,
+ () => metricName,
+ () => null,
+ () => measurableStat
+ )
+ }
+
/**
* Overrides quotas for <user>, <client-id> or <user, client-id> or the dynamic defaults
* for any of these levels.
@@ -409,7 +446,7 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}
quota match {
case Some(newQuota) =>
- logger.info(s"Changing ${apiKey} quota for ${userInfo}${clientIdInfo} to ${newQuota.bound}")
+ logger.info(s"Changing ${quotaType} quota for ${userInfo}${clientIdInfo} to ${newQuota.bound}")
overriddenQuota.put(quotaId, newQuota)
(sanitizedUser, clientId) match {
case (Some(_), Some(_)) => quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
@@ -418,7 +455,7 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
case (None, None) =>
}
case None =>
- logger.info(s"Removing ${apiKey} quota for ${userInfo}${clientIdInfo}")
+ logger.info(s"Removing ${quotaType} quota for ${userInfo}${clientIdInfo}")
overriddenQuota.remove(quotaId)
}
@@ -460,8 +497,8 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}
}
- private def clientRateMetricName(sanitizedUser: String, clientId: String): MetricName = {
- metrics.metricName("byte-rate", apiKey.toString,
+ protected def clientRateMetricName(sanitizedUser: String, clientId: String): MetricName = {
+ metrics.metricName("byte-rate", quotaType.toString,
"Tracking byte-rate per user/client-id",
"user", sanitizedUser,
"client-id", clientId)
@@ -469,7 +506,7 @@ final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
private def throttleMetricName(quotaEntity: QuotaEntity): MetricName = {
metrics.metricName("throttle-time",
- apiKey.toString,
+ quotaType.toString,
"Tracking average throttle-time per user/client-id",
"user", quotaEntity.sanitizedUser,
"client-id", quotaEntity.clientId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
new file mode 100644
index 0000000..7e80be6
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics._
+import org.apache.kafka.common.utils.Time
+
+
+class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
+ private val metrics: Metrics,
+ private val time: Time) extends ClientQuotaManager(config, metrics, QuotaType.Request, time) {
+ val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds)
+ val exemptSensor = createSensor(exemptSensorName, exemptMetricName)
+
+ def recordExempt(value: Double) {
+ exemptSensor.record(value)
+ }
+
+ override protected def throttleTime(clientMetric: KafkaMetric, config: MetricConfig): Long = {
+ math.min(super.throttleTime(clientMetric, config), maxThrottleTimeMs)
+ }
+
+ override protected def clientRateMetricName(sanitizedUser: String, clientId: String): MetricName = {
+ metrics.metricName("request-time", QuotaType.Request.toString,
+ "Tracking request-time per user/client-id",
+ "user", sanitizedUser,
+ "client-id", clientId)
+ }
+
+ private def exemptMetricName: MetricName = {
+ metrics.metricName("exempt-request-time", QuotaType.Request.toString,
+ "Tracking exempt-request-time utilization percentage")
+ }
+
+ private def exemptSensorName: String = "exempt-" + QuotaType.Request
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 8d6de8c..2483199 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -130,6 +130,12 @@ class QuotaConfigHandler(private val quotaManagers: QuotaManagers) {
else
None
quotaManagers.fetch.updateQuota(sanitizedUser, clientId, consumerQuota)
+ val requestQuota =
+ if (config.containsKey(DynamicConfig.Client.RequestPercentageOverrideProp))
+ Some(new Quota(config.getProperty(DynamicConfig.Client.RequestPercentageOverrideProp).toDouble, true))
+ else
+ None
+ quotaManagers.request.updateQuota(sanitizedUser, clientId, requestQuota)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/DynamicConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala
index e68f921..175bf62 100644
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfig.scala
@@ -63,19 +63,23 @@ object DynamicConfig {
//Properties
val ProducerByteRateOverrideProp = "producer_byte_rate"
val ConsumerByteRateOverrideProp = "consumer_byte_rate"
+ val RequestPercentageOverrideProp = "request_percentage"
//Defaults
val DefaultProducerOverride = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
val DefaultConsumerOverride = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
+ val DefaultRequestOverride = ClientQuotaManagerConfig.QuotaRequestPercentDefault
//Documentation
val ProducerOverrideDoc = "A rate representing the upper bound (bytes/sec) for producer traffic."
val ConsumerOverrideDoc = "A rate representing the upper bound (bytes/sec) for consumer traffic."
+ val RequestOverrideDoc = "A percentage representing the upper bound of time spent for processing requests."
//Definitions
private val clientConfigs = new ConfigDef()
.define(ProducerByteRateOverrideProp, LONG, DefaultProducerOverride, MEDIUM, ProducerOverrideDoc)
.define(ConsumerByteRateOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ConsumerOverrideDoc)
+ .define(RequestPercentageOverrideProp, DOUBLE, DefaultRequestOverride, MEDIUM, RequestOverrideDoc)
def names = clientConfigs.names
@@ -88,6 +92,7 @@ object DynamicConfig {
private val userConfigs = CredentialProvider.userCredentialConfigs
.define(Client.ProducerByteRateOverrideProp, LONG, Client.DefaultProducerOverride, MEDIUM, Client.ProducerOverrideDoc)
.define(Client.ConsumerByteRateOverrideProp, LONG, Client.DefaultConsumerOverride, MEDIUM, Client.ConsumerOverrideDoc)
+ .define(Client.RequestPercentageOverrideProp, DOUBLE, Client.DefaultRequestOverride, MEDIUM, Client.RequestOverrideDoc)
def names = userConfigs.names
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 59f062d..1e1f0d5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -118,24 +118,10 @@ class KafkaApis(val requestChannel: RequestChannel,
}
} catch {
case e: FatalExitError => throw e
- case e: Throwable =>
- if (request.requestObj != null) {
- request.requestObj.handleError(e, requestChannel, request)
- error("Error when handling request %s".format(request.requestObj), e)
- } else {
- val response = request.body[AbstractRequest].getErrorResponse(e)
-
- /* If request doesn't have a default error response, we just close the connection.
- For example, when produce request has acks set to 0 */
- if (response == null)
- requestChannel.closeConnection(request.processor, request)
- else
- requestChannel.sendResponse(new Response(request, response))
-
- error("Error when handling request %s".format(request.body[AbstractRequest]), e)
- }
- } finally
- request.apiLocalCompleteTimeMs = time.milliseconds
+ case e: Throwable => handleError(request, e)
+ } finally {
+ request.apiLocalCompleteTimeNanos = time.nanoseconds
+ }
}
def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
@@ -165,16 +151,15 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- val leaderAndIsrResponse =
- if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
- val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange)
- new LeaderAndIsrResponse(result.error, result.responseMap.asJava)
- } else {
- val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
- new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)
- }
-
- requestChannel.sendResponse(new Response(request, leaderAndIsrResponse))
+ if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+ val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange)
+ val leaderAndIsrResponse = new LeaderAndIsrResponse(result.error, result.responseMap.asJava)
+ sendResponseExemptThrottle(request, new Response(request, leaderAndIsrResponse))
+ } else {
+ val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)
+ sendResponseMaybeThrottle(request, createResponse)
+ }
} catch {
case e: FatalExitError => throw e
case e: KafkaStorageException =>
@@ -189,27 +174,27 @@ class KafkaApis(val requestChannel: RequestChannel,
// stop serving data to clients for the topic being deleted
val stopReplicaRequest = request.body[StopReplicaRequest]
- val response =
- if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
- val (result, error) = replicaManager.stopReplicas(stopReplicaRequest)
- // Clearing out the cache for groups that belong to an offsets topic partition for which this broker was the leader,
- // since this broker is no longer a replica for that offsets topic partition.
- // This is required to handle the following scenario :
- // Consider old replicas : {[1,2,3], Leader = 1} is reassigned to new replicas : {[2,3,4], Leader = 2}, broker 1 does not receive a LeaderAndIsr
- // request to become a follower due to which cache for groups that belong to an offsets topic partition for which broker 1 was the leader,
- // is not cleared.
- result.foreach { case (topicPartition, error) =>
- if (error == Errors.NONE && stopReplicaRequest.deletePartitions() && topicPartition.topic == GroupMetadataTopicName) {
- groupCoordinator.handleGroupEmigration(topicPartition.partition)
- }
+ if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+ val (result, error) = replicaManager.stopReplicas(stopReplicaRequest)
+ // Clearing out the cache for groups that belong to an offsets topic partition for which this broker was the leader,
+ // since this broker is no longer a replica for that offsets topic partition.
+ // This is required to handle the following scenario :
+ // Consider old replicas : {[1,2,3], Leader = 1} is reassigned to new replicas : {[2,3,4], Leader = 2}, broker 1 does not receive a LeaderAndIsr
+ // request to become a follower due to which cache for groups that belong to an offsets topic partition for which broker 1 was the leader,
+ // is not cleared.
+ result.foreach { case (topicPartition, error) =>
+ if (error == Errors.NONE && stopReplicaRequest.deletePartitions() && topicPartition.topic == GroupMetadataTopicName) {
+ groupCoordinator.handleGroupEmigration(topicPartition.partition)
}
- new StopReplicaResponse(error, result.asJava)
- } else {
- val result = stopReplicaRequest.partitions.asScala.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
- new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)
}
+ val response = new StopReplicaResponse(error, result.asJava)
+ sendResponseExemptThrottle(request, new Response(request, response))
+ } else {
+ val result = stopReplicaRequest.partitions.asScala.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)
+ sendResponseMaybeThrottle(request, createResponse)
+ }
- requestChannel.sendResponse(new RequestChannel.Response(request, response))
replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()
}
@@ -217,23 +202,21 @@ class KafkaApis(val requestChannel: RequestChannel,
val correlationId = request.header.correlationId
val updateMetadataRequest = request.body[UpdateMetadataRequest]
- val updateMetadataResponse =
- if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
- val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
- if (deletedPartitions.nonEmpty)
- groupCoordinator.handleDeletedPartitions(deletedPartitions)
+ if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+ val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
+ if (deletedPartitions.nonEmpty)
+ groupCoordinator.handleDeletedPartitions(deletedPartitions)
- if (adminManager.hasDelayedTopicOperations) {
- updateMetadataRequest.partitionStates.keySet.asScala.map(_.topic).foreach { topic =>
- adminManager.tryCompleteDelayedTopicOperations(topic)
- }
+ if (adminManager.hasDelayedTopicOperations) {
+ updateMetadataRequest.partitionStates.keySet.asScala.map(_.topic).foreach { topic =>
+ adminManager.tryCompleteDelayedTopicOperations(topic)
}
- new UpdateMetadataResponse(Errors.NONE)
- } else {
- new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED)
}
-
- requestChannel.sendResponse(new Response(request, updateMetadataResponse))
+ sendResponseExemptThrottle(request, new Response(request, new UpdateMetadataResponse(Errors.NONE)))
+ } else {
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED)
+ sendResponseMaybeThrottle(request, createResponse)
+ }
}
def handleControlledShutdownRequest(request: RequestChannel.Request) {
@@ -249,9 +232,9 @@ class KafkaApis(val requestChannel: RequestChannel,
case Success(partitionsRemaining) =>
val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId,
Errors.NONE, partitionsRemaining)
- requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, controlledShutdownResponse)))
+ sendResponseExemptThrottle(request, new Response(request, new RequestOrResponseSend(request.connectionId, controlledShutdownResponse)))
case Failure(throwable) =>
- controlledShutdownRequest.handleError(throwable, requestChannel, request)
+ sendResponseExemptThrottle(request, () => controlledShutdownRequest.handleError(throwable, requestChannel, request))
}
}
controller.shutdownBroker(controlledShutdownRequest.brokerId, controlledShutdownCallback)
@@ -270,8 +253,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val results = offsetCommitRequest.offsetData.keySet.asScala.map { topicPartition =>
(topicPartition, error)
}.toMap
- val response = new OffsetCommitResponse(results.asJava)
- requestChannel.sendResponse(new RequestChannel.Response(request, response))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new OffsetCommitResponse(throttleTimeMs, results.asJava)
+ sendResponseMaybeThrottle(request, createResponse)
} else {
val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = offsetCommitRequest.offsetData.asScala.toMap.partition {
case (topicPartition, _) =>
@@ -300,8 +283,8 @@ class KafkaApis(val requestChannel: RequestChannel,
s"on partition $topicPartition failed due to ${error.exceptionName}")
}
}
- val response = new OffsetCommitResponse(combinedCommitStatus.asJava)
- requestChannel.sendResponse(new RequestChannel.Response(request, response))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new OffsetCommitResponse(throttleTimeMs, combinedCommitStatus.asJava)
+ sendResponseMaybeThrottle(request, createResponse)
}
if (authorizedTopics.isEmpty)
@@ -407,7 +390,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- def produceResponseCallback(delayTimeMs: Int) {
+ def produceResponseCallback(bandwidthThrottleTimeMs: Int) {
if (produceRequest.acks == 0) {
// no operation needed if producer request.required.acks = 0; however, if there is any error in handling
// the request, since no response is expected by the producer, the server will close socket server so that
@@ -426,13 +409,15 @@ class KafkaApis(val requestChannel: RequestChannel,
requestChannel.noOperation(request.processor, request)
}
} else {
- val respBody = new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs)
- requestChannel.sendResponse(new RequestChannel.Response(request, respBody))
+ def createResponseCallback(requestThrottleTimeMs: Int): AbstractResponse = {
+ new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleTimeMs)
+ }
+ sendResponseMaybeThrottle(request, createResponseCallback)
}
}
// When this callback is triggered, the remote API call has completed
- request.apiRemoteCompleteTimeMs = time.milliseconds
+ request.apiRemoteCompleteTimeNanos = time.nanoseconds
quotas.produce.recordAndMaybeThrottle(
request.session.sanitizedUser,
@@ -534,20 +519,29 @@ class KafkaApis(val requestChannel: RequestChannel,
val response = new FetchResponse(fetchedPartitionData, 0)
val responseStruct = response.toStruct(versionId)
- def fetchResponseCallback(throttleTimeMs: Int) {
- trace(s"Sending fetch response to client $clientId of ${responseStruct.sizeOf} bytes.")
- val responseSend = response.toSend(responseStruct, throttleTimeMs, request.connectionId, request.header)
- requestChannel.sendResponse(new RequestChannel.Response(request, responseSend))
+ def fetchResponseCallback(bandwidthThrottleTimeMs: Int) {
+ def createResponse(requestThrottleTimeMs: Int): RequestChannel.Response = {
+ trace(s"Sending fetch response to client $clientId of ${responseStruct.sizeOf} bytes.")
+ val responseSend = response.toSend(responseStruct, bandwidthThrottleTimeMs + requestThrottleTimeMs, request.connectionId, request.header)
+ new RequestChannel.Response(request, responseSend)
+ }
+ def sendResponseCallback(requestThrottleTimeMs: Int) {
+ requestChannel.sendResponse(createResponse(requestThrottleTimeMs))
+ }
+ if (fetchRequest.isFromFollower)
+ sendResponseExemptThrottle(request, createResponse(0))
+ else
+ sendResponseMaybeThrottle(request, request.header.clientId, sendResponseCallback)
}
// When this callback is triggered, the remote API call has completed
- request.apiRemoteCompleteTimeMs = time.milliseconds
+ request.apiRemoteCompleteTimeNanos = time.nanoseconds
if (fetchRequest.isFromFollower) {
// We've already evaluated against the quota and are good to go. Just need to record it now.
val responseSize = sizeOfThrottledPartitions(versionId, fetchRequest, mergedPartitionData, quotas.leader)
quotas.leader.record(responseSize)
- fetchResponseCallback(throttleTimeMs = 0)
+ fetchResponseCallback(bandwidthThrottleTimeMs = 0)
} else {
quotas.fetch.recordAndMaybeThrottle(request.session.sanitizedUser, clientId, responseStruct.sizeOf,
fetchResponseCallback)
@@ -597,8 +591,8 @@ class KafkaApis(val requestChannel: RequestChannel,
else
handleListOffsetRequestV1(request)
- val response = new ListOffsetResponse(mergedResponseMap.asJava)
- requestChannel.sendResponse(new RequestChannel.Response(request, response))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new ListOffsetResponse(throttleTimeMs, mergedResponseMap.asJava)
+ sendResponseMaybeThrottle(request, createResponse)
}
private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = {
@@ -925,13 +919,15 @@ class KafkaApis(val requestChannel: RequestChannel,
trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(completeTopicMetadata.mkString(","),
brokers.mkString(","), request.header.correlationId, request.header.clientId))
- val responseBody = new MetadataResponse(
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new MetadataResponse(
+ throttleTimeMs,
brokers.map(_.getNode(request.listenerName)).asJava,
clusterId,
metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
completeTopicMetadata.asJava
)
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+
+ sendResponseMaybeThrottle(request, createResponse)
}
/**
@@ -944,68 +940,70 @@ class KafkaApis(val requestChannel: RequestChannel,
def authorizeTopicDescribe(partition: TopicPartition) =
authorize(request.session, Describe, new Resource(Topic, partition.topic))
- val offsetFetchResponse =
- // reject the request if not authorized to the group
- if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId)))
- offsetFetchRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED)
- else {
- if (header.apiVersion == 0) {
- val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
- .partition(authorizeTopicDescribe)
+ def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ val offsetFetchResponse =
+ // reject the request if not authorized to the group
+ if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId)))
+ offsetFetchRequest.getErrorResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED)
+ else {
+ if (header.apiVersion == 0) {
+ val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
+ .partition(authorizeTopicDescribe)
- // version 0 reads offsets from ZK
- val authorizedPartitionData = authorizedPartitions.map { topicPartition =>
- val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic)
- try {
- if (!metadataCache.contains(topicPartition.topic))
- (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
- else {
- val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
- payloadOpt match {
- case Some(payload) =>
- (topicPartition, new OffsetFetchResponse.PartitionData(
- payload.toLong, OffsetFetchResponse.NO_METADATA, Errors.NONE))
- case None =>
- (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
+ // version 0 reads offsets from ZK
+ val authorizedPartitionData = authorizedPartitions.map { topicPartition =>
+ val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic)
+ try {
+ if (!metadataCache.contains(topicPartition.topic))
+ (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
+ else {
+ val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
+ payloadOpt match {
+ case Some(payload) =>
+ (topicPartition, new OffsetFetchResponse.PartitionData(
+ payload.toLong, OffsetFetchResponse.NO_METADATA, Errors.NONE))
+ case None =>
+ (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
+ }
}
+ } catch {
+ case e: Throwable =>
+ (topicPartition, new OffsetFetchResponse.PartitionData(
+ OffsetFetchResponse.INVALID_OFFSET, OffsetFetchResponse.NO_METADATA, Errors.forException(e)))
}
- } catch {
- case e: Throwable =>
- (topicPartition, new OffsetFetchResponse.PartitionData(
- OffsetFetchResponse.INVALID_OFFSET, OffsetFetchResponse.NO_METADATA, Errors.forException(e)))
- }
- }.toMap
+ }.toMap
- val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
- new OffsetFetchResponse(Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
- } else {
- // versions 1 and above read offsets from Kafka
- if (offsetFetchRequest.isAllPartitions) {
- val (error, allPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId)
- if (error != Errors.NONE)
- offsetFetchRequest.getErrorResponse(error)
- else {
- // clients are not allowed to see offsets for topics that are not authorized for Describe
- val authorizedPartitionData = allPartitionData.filter { case (topicPartition, _) => authorizeTopicDescribe(topicPartition) }
- new OffsetFetchResponse(Errors.NONE, authorizedPartitionData.asJava)
- }
+ val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
+ new OffsetFetchResponse(throttleTimeMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
} else {
- val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
- .partition(authorizeTopicDescribe)
- val (error, authorizedPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId,
- Some(authorizedPartitions))
- if (error != Errors.NONE)
- offsetFetchRequest.getErrorResponse(error)
- else {
- val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
- new OffsetFetchResponse(Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
+ // versions 1 and above read offsets from Kafka
+ if (offsetFetchRequest.isAllPartitions) {
+ val (error, allPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId)
+ if (error != Errors.NONE)
+ offsetFetchRequest.getErrorResponse(throttleTimeMs, error)
+ else {
+ // clients are not allowed to see offsets for topics that are not authorized for Describe
+ val authorizedPartitionData = allPartitionData.filter { case (topicPartition, _) => authorizeTopicDescribe(topicPartition) }
+ new OffsetFetchResponse(throttleTimeMs, Errors.NONE, authorizedPartitionData.asJava)
+ }
+ } else {
+ val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
+ .partition(authorizeTopicDescribe)
+ val (error, authorizedPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId,
+ Some(authorizedPartitions))
+ if (error != Errors.NONE)
+ offsetFetchRequest.getErrorResponse(throttleTimeMs, error)
+ else {
+ val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
+ new OffsetFetchResponse(throttleTimeMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
+ }
}
}
}
+ trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
+ offsetFetchResponse
}
-
- trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
- requestChannel.sendResponse(new Response(request, offsetFetchResponse))
+ sendResponseMaybeThrottle(request, createResponse)
}
def handleFindCoordinatorRequest(request: RequestChannel.Request) {
@@ -1014,8 +1012,8 @@ class KafkaApis(val requestChannel: RequestChannel,
if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.GROUP &&
!authorize(request.session, Describe, new Resource(Group, findCoordinatorRequest.coordinatorKey))) {
- val responseBody = new FindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode)
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new FindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode)
+ sendResponseMaybeThrottle(request, createResponse)
} else {
// TODO: Authorize by transactional id if coordinator type is TRANSACTION
@@ -1035,24 +1033,26 @@ class KafkaApis(val requestChannel: RequestChannel,
throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
}
- val responseBody = if (topicMetadata.error != Errors.NONE) {
- new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
- } else {
- val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
- .find(_.partition == partition)
- .map(_.leader())
-
- coordinatorEndpoint match {
- case Some(endpoint) if !endpoint.isEmpty =>
- new FindCoordinatorResponse(Errors.NONE, endpoint)
- case _ =>
- new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+ def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ val responseBody = if (topicMetadata.error != Errors.NONE) {
+ new FindCoordinatorResponse(throttleTimeMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+ } else {
+ val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
+ .find(_.partition == partition)
+ .map(_.leader())
+
+ coordinatorEndpoint match {
+ case Some(endpoint) if !endpoint.isEmpty =>
+ new FindCoordinatorResponse(throttleTimeMs, Errors.NONE, endpoint)
+ case _ =>
+ new FindCoordinatorResponse(throttleTimeMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+ }
}
+ trace("Sending FindCoordinator response %s for correlation id %d to client %s."
+ .format(responseBody, request.header.correlationId, request.header.clientId))
+ responseBody
}
-
- trace("Sending FindCoordinator response %s for correlation id %d to client %s."
- .format(responseBody, request.header.correlationId, request.header.clientId))
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+ sendResponseMaybeThrottle(request, createResponse)
}
}
@@ -1074,19 +1074,20 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}.toMap
- val responseBody = new DescribeGroupsResponse(groups.asJava)
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new DescribeGroupsResponse(throttleTimeMs, groups.asJava)
+ sendResponseMaybeThrottle(request, createResponse)
}
def handleListGroupsRequest(request: RequestChannel.Request) {
- val responseBody = if (!authorize(request.session, Describe, Resource.ClusterResource)) {
- ListGroupsResponse.fromError(Errors.CLUSTER_AUTHORIZATION_FAILED)
+ if (!authorize(request.session, Describe, Resource.ClusterResource)) {
+ def createResponse(throttleTimeMs: Int): AbstractResponse = ListGroupsResponse.fromError(throttleTimeMs, Errors.CLUSTER_AUTHORIZATION_FAILED)
+ sendResponseMaybeThrottle(request, createResponse)
} else {
val (error, groups) = groupCoordinator.handleListGroups()
val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }
- new ListGroupsResponse(error, allGroups.asJava)
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new ListGroupsResponse(throttleTimeMs, error, allGroups.asJava)
+ sendResponseMaybeThrottle(request, createResponse)
}
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
}
def handleJoinGroupRequest(request: RequestChannel.Request) {
@@ -1095,23 +1096,27 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a join-group response
def sendResponseCallback(joinResult: JoinGroupResult) {
val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
- val responseBody = new JoinGroupResponse(joinResult.error, joinResult.generationId,
- joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members.asJava)
+ def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ val responseBody = new JoinGroupResponse(throttleTimeMs, joinResult.error, joinResult.generationId,
+ joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members.asJava)
- trace("Sending join group response %s for correlation id %d to client %s."
- .format(responseBody, request.header.correlationId, request.header.clientId))
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+ trace("Sending join group response %s for correlation id %d to client %s."
+ .format(responseBody, request.header.correlationId, request.header.clientId))
+ responseBody
+ }
+ sendResponseMaybeThrottle(request, createResponse)
}
if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId()))) {
- val responseBody = new JoinGroupResponse(
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new JoinGroupResponse(
+ throttleTimeMs,
Errors.GROUP_AUTHORIZATION_FAILED,
JoinGroupResponse.UNKNOWN_GENERATION_ID,
JoinGroupResponse.UNKNOWN_PROTOCOL,
JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
Collections.emptyMap())
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+ sendResponseMaybeThrottle(request, createResponse)
} else {
// let the coordinator to handle join-group
val protocols = joinGroupRequest.groupProtocols().asScala.map(protocol =>
@@ -1133,8 +1138,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val syncGroupRequest = request.body[SyncGroupRequest]
def sendResponseCallback(memberState: Array[Byte], error: Errors) {
- val responseBody = new SyncGroupResponse(error, ByteBuffer.wrap(memberState))
- requestChannel.sendResponse(new Response(request, responseBody))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new SyncGroupResponse(throttleTimeMs, error, ByteBuffer.wrap(memberState))
+ sendResponseMaybeThrottle(request, createResponse)
}
if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) {
@@ -1155,15 +1160,18 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a heartbeat response
def sendResponseCallback(error: Errors) {
- val response = new HeartbeatResponse(error)
- trace("Sending heartbeat response %s for correlation id %d to client %s."
- .format(response, request.header.correlationId, request.header.clientId))
- requestChannel.sendResponse(new RequestChannel.Response(request, response))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ val response = new HeartbeatResponse(throttleTimeMs, error)
+ trace("Sending heartbeat response %s for correlation id %d to client %s."
+ .format(response, request.header.correlationId, request.header.clientId))
+ response
+ }
+ sendResponseMaybeThrottle(request, createResponse)
}
if (!authorize(request.session, Read, new Resource(Group, heartbeatRequest.groupId))) {
- val heartbeatResponse = new HeartbeatResponse(Errors.GROUP_AUTHORIZATION_FAILED)
- requestChannel.sendResponse(new Response(request, heartbeatResponse))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new HeartbeatResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED)
+ sendResponseMaybeThrottle(request, createResponse)
}
else {
// let the coordinator to handle heartbeat
@@ -1180,15 +1188,18 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a leave-group response
def sendResponseCallback(error: Errors) {
- val response = new LeaveGroupResponse(error)
- trace("Sending leave group response %s for correlation id %d to client %s."
+ def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ val response = new LeaveGroupResponse(throttleTimeMs, error)
+ trace("Sending leave group response %s for correlation id %d to client %s."
.format(response, request.header.correlationId, request.header.clientId))
- requestChannel.sendResponse(new RequestChannel.Response(request, response))
+ response
+ }
+ sendResponseMaybeThrottle(request, createResponse)
}
if (!authorize(request.session, Read, new Resource(Group, leaveGroupRequest.groupId))) {
- val leaveGroupResponse = new LeaveGroupResponse(Errors.GROUP_AUTHORIZATION_FAILED)
- requestChannel.sendResponse(new Response(request, leaveGroupResponse))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new LeaveGroupResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED)
+ sendResponseMaybeThrottle(request, createResponse)
} else {
// let the coordinator to handle leave-group
groupCoordinator.handleLeaveGroup(
@@ -1199,8 +1210,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleSaslHandshakeRequest(request: RequestChannel.Request) {
- val response = new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, config.saslEnabledMechanisms)
- requestChannel.sendResponse(new RequestChannel.Response(request, response))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, config.saslEnabledMechanisms)
+ sendResponseMaybeThrottle(request, createResponse)
}
def handleApiVersionsRequest(request: RequestChannel.Request) {
@@ -1210,20 +1221,26 @@ class KafkaApis(val requestChannel: RequestChannel,
// If this is considered to leak information about the broker version a workaround is to use SSL
// with client authentication which is performed at an earlier stage of the connection where the
// ApiVersionRequest is not available.
- val responseSend =
- if (Protocol.apiVersionSupported(ApiKeys.API_VERSIONS.id, request.header.apiVersion))
- ApiVersionsResponse.API_VERSIONS_RESPONSE.toSend(request.connectionId, request.header)
- else ApiVersionsResponse.unsupportedVersionSend(request.connectionId, request.header)
- requestChannel.sendResponse(new RequestChannel.Response(request, responseSend))
+ def sendResponseCallback(throttleTimeMs: Int) {
+ val responseSend =
+ if (Protocol.apiVersionSupported(ApiKeys.API_VERSIONS.id, request.header.apiVersion))
+ ApiVersionsResponse.apiVersionsResponse(request.header.apiVersion, throttleTimeMs).toSend(request.connectionId, request.header)
+ else ApiVersionsResponse.unsupportedVersionSend(request.connectionId, request.header)
+ requestChannel.sendResponse(new RequestChannel.Response(request, responseSend))
+ }
+ sendResponseMaybeThrottle(request, request.header.clientId, sendResponseCallback)
}
def handleCreateTopicsRequest(request: RequestChannel.Request) {
val createTopicsRequest = request.body[CreateTopicsRequest]
def sendResponseCallback(results: Map[String, CreateTopicsResponse.Error]): Unit = {
- val responseBody = new CreateTopicsResponse(results.asJava)
- trace(s"Sending create topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ val responseBody = new CreateTopicsResponse(throttleTimeMs, results.asJava)
+ trace(s"Sending create topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
+ responseBody
+ }
+ sendResponseMaybeThrottle(request, createResponse)
}
if (!controller.isActive) {
@@ -1279,11 +1296,14 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def sendResponseCallback(results: Map[String, Errors]): Unit = {
- val completeResults = nonExistingOrUnauthorizedForDescribeTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++
- unauthorizedForDeleteTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ results
- val responseBody = new DeleteTopicsResponse(completeResults.asJava)
- trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ val completeResults = nonExistingOrUnauthorizedForDescribeTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++
+ unauthorizedForDeleteTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ results
+ val responseBody = new DeleteTopicsResponse(throttleTimeMs, completeResults.asJava)
+ trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
+ responseBody
+ }
+ sendResponseMaybeThrottle(request, createResponse)
}
if (!controller.isActive) {
@@ -1335,11 +1355,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- val respBody = new DeleteRecordsResponse(mergedResponseStatus.asJava)
- requestChannel.sendResponse(new RequestChannel.Response(request, respBody))
-
- // When this callback is triggered, the remote API call has completed
- request.apiRemoteCompleteTimeMs = time.milliseconds
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new DeleteRecordsResponse(throttleTimeMs, mergedResponseStatus.asJava)
+ sendResponseMaybeThrottle(request, createResponse)
}
if (authorizedForDeleteTopics.isEmpty)
@@ -1359,9 +1376,12 @@ class KafkaApis(val requestChannel: RequestChannel,
// Send response callback
def sendResponseCallback(result: InitPidResult): Unit = {
- val responseBody: InitPidResponse = new InitPidResponse(result.error, result.pid, result.epoch)
- trace(s"InitPidRequest: Completed $transactionalId's InitPidRequest with result $result from client ${request.header.clientId}.")
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ val responseBody: InitPidResponse = new InitPidResponse(throttleTimeMs, result.error, result.pid, result.epoch)
+ trace(s"InitPidRequest: Completed $transactionalId's InitPidRequest with result $result from client ${request.header.clientId}.")
+ responseBody
+ }
+ sendResponseMaybeThrottle(request, createResponse)
}
txnCoordinator.handleInitPid(transactionalId, initPidRequest.transactionTimeoutMs, sendResponseCallback)
}
@@ -1370,9 +1390,12 @@ class KafkaApis(val requestChannel: RequestChannel,
val endTxnRequest = request.body[EndTxnRequest]
def sendResponseCallback(error: Errors) {
- val responseBody = new EndTxnResponse(error)
- trace(s"Completed ${endTxnRequest.transactionalId()}'s EndTxnRequest with command: ${endTxnRequest.command()}, errors: $error from client ${request.header.clientId}.")
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ val responseBody = new EndTxnResponse(throttleTimeMs, error)
+ trace(s"Completed ${endTxnRequest.transactionalId()}'s EndTxnRequest with command: ${endTxnRequest.command()}, errors: $error from client ${request.header.clientId}.")
+ responseBody
+ }
+ sendResponseMaybeThrottle(request, createResponse)
}
txnCoordinator.handleEndTransaction(endTxnRequest.transactionalId(),
@@ -1383,11 +1406,12 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleWriteTxnMarkersRequest(request: RequestChannel.Request): Unit = {
+ authorizeClusterAction(request)
val emptyResponse = new java.util.HashMap[java.lang.Long, java.util.Map[TopicPartition, Errors]]()
- requestChannel.sendResponse(new RequestChannel.Response(request, new WriteTxnMarkersResponse(emptyResponse)))
+ val responseBody = new WriteTxnMarkersResponse(emptyResponse)
+ sendResponseExemptThrottle(request, new RequestChannel.Response(request, responseBody))
}
-
def handleAddPartitionToTxnRequest(request: RequestChannel.Request): Unit = {
val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
val transactionalId = addPartitionsToTxnRequest.transactionalId
@@ -1395,9 +1419,12 @@ class KafkaApis(val requestChannel: RequestChannel,
// Send response callback
def sendResponseCallback(error: Errors): Unit = {
- val responseBody: AddPartitionsToTxnResponse = new AddPartitionsToTxnResponse(error)
- trace(s"Completed $transactionalId's AddPartitionsToTxnRequest with partitions $partitionsToAdd: errors: $error from client ${request.header.clientId}")
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ val responseBody: AddPartitionsToTxnResponse = new AddPartitionsToTxnResponse(throttleTimeMs, error)
+ trace(s"Completed $transactionalId's AddPartitionsToTxnRequest with partitions $partitionsToAdd: errors: $error from client ${request.header.clientId}")
+ responseBody
+ }
+ sendResponseMaybeThrottle(request, createResponse)
}
txnCoordinator.handleAddPartitionsToTransaction(transactionalId,
@@ -1415,9 +1442,12 @@ class KafkaApis(val requestChannel: RequestChannel,
// Send response callback
def sendResponseCallback(error: Errors): Unit = {
- val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(error)
- trace(s"Completed $transactionalId's AddOffsetsToTxnRequest for group $groupId as on partition $offsetTopicPartition: errors: $error from client ${request.header.clientId}")
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(throttleTimeMs, error)
+ trace(s"Completed $transactionalId's AddOffsetsToTxnRequest for group $groupId as on partition $offsetTopicPartition: errors: $error from client ${request.header.clientId}")
+ responseBody
+ }
+ sendResponseMaybeThrottle(request, createResponse)
}
txnCoordinator.handleAddPartitionsToTransaction(transactionalId,
@@ -1429,7 +1459,8 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleTxnOffsetCommitRequest(request: RequestChannel.Request): Unit = {
val emptyResponse = new java.util.HashMap[TopicPartition, Errors]()
- requestChannel.sendResponse(new RequestChannel.Response(request, new TxnOffsetCommitResponse(emptyResponse)))
+ def createResponse(throttleTimeMs: Int): AbstractResponse = new TxnOffsetCommitResponse(throttleTimeMs, emptyResponse)
+ sendResponseMaybeThrottle(request, createResponse)
}
def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit = {
@@ -1440,11 +1471,93 @@ class KafkaApis(val requestChannel: RequestChannel,
val responseBody = new OffsetsForLeaderEpochResponse(
replicaManager.getResponseFor(requestInfo)
)
- requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+ sendResponseExemptThrottle(request, new RequestChannel.Response(request, responseBody))
+ }
+
+ private def handleError(request: RequestChannel.Request, e: Throwable) {
+ val mayThrottle = e.isInstanceOf[ClusterAuthorizationException] || !ApiKeys.forId(request.requestId).clusterAction
+ if (request.requestObj != null) {
+ def sendResponseCallback(throttleTimeMs: Int) {
+ request.requestObj.handleError(e, requestChannel, request)
+ error("Error when handling request %s".format(request.requestObj), e)
+ }
+
+ if (mayThrottle) {
+ val clientId : String =
+ if (request.requestObj.isInstanceOf[ControlledShutdownRequest])
+ request.requestObj.asInstanceOf[ControlledShutdownRequest].clientId.getOrElse("")
+ else
+ throw new IllegalStateException("Old style requests should only be used for ControlledShutdownRequest")
+ sendResponseMaybeThrottle(request, clientId, sendResponseCallback)
+ } else
+ sendResponseExemptThrottle(request, () => sendResponseCallback(0))
+ } else {
+ def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ val response = request.body[AbstractRequest].getErrorResponse(throttleTimeMs, e)
+
+ /* If request doesn't have a default error response, we just close the connection.
+ For example, when produce request has acks set to 0 */
+ if (response == null)
+ requestChannel.closeConnection(request.processor, request)
+ response
+ }
+ error("Error when handling request %s".format(request.body[AbstractRequest]), e)
+ if (mayThrottle)
+ sendResponseMaybeThrottle(request, createResponse)
+ else
+ sendResponseExemptThrottle(request, new RequestChannel.Response(request, createResponse(0)))
+ }
}
def authorizeClusterAction(request: RequestChannel.Request): Unit = {
if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
throw new ClusterAuthorizationException(s"Request $request is not authorized.")
}
+
+ private def sendResponseMaybeThrottle(request: RequestChannel.Request, createResponse: Int => AbstractResponse) {
+ def sendResponseCallback(throttleTimeMs: Int) {
+ val response = createResponse(throttleTimeMs)
+ if (response != null)
+ sendResponse(request, response)
+ }
+ sendResponseMaybeThrottle(request, request.header.clientId, sendResponseCallback)
+ }
+
+ private def sendResponseMaybeThrottle(request: RequestChannel.Request, clientId: String, sendResponseCallback: Int => Unit) {
+
+ if (request.apiRemoteCompleteTimeNanos == -1) {
+ // When this callback is triggered, the remote API call has completed
+ request.apiRemoteCompleteTimeNanos = time.nanoseconds
+ }
+ val quotaSensors = quotas.request.getOrCreateQuotaSensors(request.session.sanitizedUser, clientId)
+ def recordNetworkThreadTimeNanos(timeNanos: Long) {
+ quotas.request.recordNoThrottle(quotaSensors, nanosToPercentage(timeNanos))
+ }
+ request.recordNetworkThreadTimeCallback = Some(recordNetworkThreadTimeNanos)
+
+ quotas.request.recordAndThrottleOnQuotaViolation(
+ quotaSensors,
+ nanosToPercentage(request.requestThreadTimeNanos),
+ sendResponseCallback)
+ }
+
+ private def sendResponseExemptThrottle(request: RequestChannel.Request, response: Response) {
+ sendResponseExemptThrottle(request, () => requestChannel.sendResponse(response))
+ }
+
+ private def sendResponseExemptThrottle(request: RequestChannel.Request, sendResponseCallback: () => Unit) {
+ def recordNetworkThreadTimeNanos(timeNanos: Long) {
+ quotas.request.recordExempt(nanosToPercentage(timeNanos))
+ }
+ request.recordNetworkThreadTimeCallback = Some(recordNetworkThreadTimeNanos)
+
+ quotas.request.recordExempt(nanosToPercentage(request.requestThreadTimeNanos))
+ sendResponseCallback()
+ }
+
+ private def sendResponse(request: RequestChannel.Request, response: AbstractResponse) {
+ requestChannel.sendResponse(new Response(request, response))
+ }
+
+ private def nanosToPercentage(nanos: Long): Double = nanos * ClientQuotaManagerConfig.NanosToPercentagePerSecond
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index c9c31ad..a1600cb 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -50,7 +50,10 @@ class KafkaRequestHandler(id: Int,
// time should be discounted by # threads.
val startSelectTime = time.nanoseconds
req = requestChannel.receiveRequest(300)
- val idleTime = time.nanoseconds - startSelectTime
+ val endTime = time.nanoseconds
+ if (req != null)
+ req.requestDequeueTimeNanos = endTime
+ val idleTime = endTime - startSelectTime
aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
}
@@ -59,7 +62,6 @@ class KafkaRequestHandler(id: Int,
latch.countDown()
return
}
- req.requestDequeueTimeMs = time.milliseconds
trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
apis.handle(req)
} catch {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/main/scala/kafka/server/QuotaFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/QuotaFactory.scala b/core/src/main/scala/kafka/server/QuotaFactory.scala
index 671ad63..dee39a3 100644
--- a/core/src/main/scala/kafka/server/QuotaFactory.scala
+++ b/core/src/main/scala/kafka/server/QuotaFactory.scala
@@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.Time
object QuotaType {
case object Fetch extends QuotaType
case object Produce extends QuotaType
+ case object Request extends QuotaType
case object LeaderReplication extends QuotaType
case object FollowerReplication extends QuotaType
}
@@ -36,10 +37,11 @@ object QuotaFactory {
override def isQuotaExceeded(): Boolean = false
}
- case class QuotaManagers(fetch: ClientQuotaManager, produce: ClientQuotaManager, leader: ReplicationQuotaManager, follower: ReplicationQuotaManager) {
+ case class QuotaManagers(fetch: ClientQuotaManager, produce: ClientQuotaManager, request: ClientRequestQuotaManager, leader: ReplicationQuotaManager, follower: ReplicationQuotaManager) {
def shutdown() {
fetch.shutdown
produce.shutdown
+ request.shutdown
}
}
@@ -47,6 +49,7 @@ object QuotaFactory {
QuotaManagers(
new ClientQuotaManager(clientFetchConfig(cfg), metrics, Fetch, time),
new ClientQuotaManager(clientProduceConfig(cfg), metrics, Produce, time),
+ new ClientRequestQuotaManager(clientRequestConfig(cfg), metrics, time),
new ReplicationQuotaManager(replicationConfig(cfg), metrics, LeaderReplication, time),
new ReplicationQuotaManager(replicationConfig(cfg), metrics, FollowerReplication, time)
)
@@ -66,6 +69,13 @@ object QuotaFactory {
quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
)
+ def clientRequestConfig(cfg: KafkaConfig): ClientQuotaManagerConfig = {
+ ClientQuotaManagerConfig(
+ numQuotaSamples = cfg.numQuotaSamples,
+ quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
+ )
+ }
+
def replicationConfig(cfg: KafkaConfig): ReplicationQuotaManagerConfig =
ReplicationQuotaManagerConfig(
numQuotaSamples = cfg.numReplicationQuotaSamples,
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
index d9822cd..ff9fef0 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -206,7 +206,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
val hostStr = s"${node.host}:${node.port}"
assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
val brokerVersionInfo = tryBrokerVersionInfo.get
- assertEquals(0, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
+ assertEquals(1, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index aa1717a..f21c1df 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -26,13 +26,15 @@ import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.protocol.ApiKeys
import org.junit.Assert._
import org.junit.{After, Before, Test}
+import kafka.server.QuotaType
+import org.apache.kafka.common.metrics.KafkaMetric
abstract class BaseQuotaTest extends IntegrationTestHarness {
def userPrincipal : String
def producerQuotaId : QuotaId
def consumerQuotaId : QuotaId
- def overrideQuotas(producerQuota: Long, consumerQuota: Long)
+ def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double)
def removeQuotaOverrides()
override val serverCount = 2
@@ -55,10 +57,13 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId)
+ this.consumerConfig.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "0")
+ this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0")
// Low enough quota that a producer sending a small payload in a tight loop should get throttled
val defaultProducerQuota = 8000
val defaultConsumerQuota = 2500
+ val defaultRequestQuota = Int.MaxValue
var leaderNode: KafkaServer = null
var followerNode: KafkaServer = null
@@ -99,8 +104,8 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
props.put(DynamicConfig.Client.ProducerByteRateOverrideProp, Long.MaxValue.toString)
props.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, Long.MaxValue.toString)
- overrideQuotas(Long.MaxValue, Long.MaxValue)
- waitForQuotaUpdate(Long.MaxValue, Long.MaxValue)
+ overrideQuotas(Long.MaxValue, Long.MaxValue, Int.MaxValue)
+ waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, Int.MaxValue)
val numRecords = 1000
assertEquals(numRecords, produceUntilThrottled(producers.head, numRecords))
@@ -114,8 +119,8 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
@Test
def testQuotaOverrideDelete() {
// Override producer and consumer quotas to unlimited
- overrideQuotas(Long.MaxValue, Long.MaxValue)
- waitForQuotaUpdate(Long.MaxValue, Long.MaxValue)
+ overrideQuotas(Long.MaxValue, Long.MaxValue, Int.MaxValue)
+ waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, Int.MaxValue)
val numRecords = 1000
assertEquals(numRecords, produceUntilThrottled(producers.head, numRecords))
@@ -136,6 +141,28 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
assertTrue("Should have been throttled", consumerThrottleMetric.value > 0)
}
+ @Test
+ def testThrottledRequest() {
+
+ overrideQuotas(Long.MaxValue, Long.MaxValue, 0.1)
+ waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, 0.1)
+
+ val consumer = consumers.head
+ consumer.subscribe(Collections.singleton(topic1))
+ val endTimeMs = System.currentTimeMillis + 10000
+ var throttled = false
+ while (!throttled && System.currentTimeMillis < endTimeMs) {
+ consumer.poll(100)
+ val throttleMetric = consumerRequestThrottleMetric
+ throttled = throttleMetric != null && throttleMetric.value > 0
+ }
+
+ assertTrue("Should have been throttled", throttled)
+
+ assertNotNull("Exempt requests not recorded", exemptRequestMetric)
+ assertTrue("Exempt requests not recorded", exemptRequestMetric.value > 0)
+ }
+
def produceUntilThrottled(p: KafkaProducer[Array[Byte], Array[Byte]], maxRecords: Int): Int = {
var numProduced = 0
var throttled = false
@@ -169,31 +196,47 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
numConsumed
}
- def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long) {
+ def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
TestUtils.retry(10000) {
val quotaManagers = leaderNode.apis.quotas
val overrideProducerQuota = quotaManagers.produce.quota(userPrincipal, producerClientId)
val overrideConsumerQuota = quotaManagers.fetch.quota(userPrincipal, consumerClientId)
+ val overrideProducerRequestQuota = quotaManagers.request.quota(userPrincipal, producerClientId)
+ val overrideConsumerRequestQuota = quotaManagers.request.quota(userPrincipal, consumerClientId)
assertEquals(s"ClientId $producerClientId of user $userPrincipal must have producer quota", Quota.upperBound(producerQuota), overrideProducerQuota)
assertEquals(s"ClientId $consumerClientId of user $userPrincipal must have consumer quota", Quota.upperBound(consumerQuota), overrideConsumerQuota)
+ assertEquals(s"ClientId $producerClientId of user $userPrincipal must have request quota", Quota.upperBound(requestQuota), overrideProducerRequestQuota)
+ assertEquals(s"ClientId $consumerClientId of user $userPrincipal must have request quota", Quota.upperBound(requestQuota), overrideConsumerRequestQuota)
}
}
- private def throttleMetricName(apiKey: ApiKeys, quotaId: QuotaId): MetricName = {
+ private def throttleMetricName(quotaType: QuotaType, quotaId: QuotaId): MetricName = {
leaderNode.metrics.metricName("throttle-time",
- apiKey.name,
+ quotaType.toString,
"Tracking throttle-time per user/client-id",
"user", quotaId.sanitizedUser.getOrElse(""),
"client-id", quotaId.clientId.getOrElse(""))
}
- private def producerThrottleMetric = leaderNode.metrics.metrics.get(throttleMetricName(ApiKeys.PRODUCE, producerQuotaId))
- private def consumerThrottleMetric = leaderNode.metrics.metrics.get(throttleMetricName(ApiKeys.FETCH, consumerQuotaId))
- def quotaProperties(producerQuota: Long, consumerQuota: Long): Properties = {
+ def throttleMetric(quotaType: QuotaType, quotaId: QuotaId): KafkaMetric = {
+ leaderNode.metrics.metrics.get(throttleMetricName(quotaType, quotaId))
+ }
+
+ private def producerThrottleMetric = throttleMetric(QuotaType.Produce, producerQuotaId)
+ private def consumerThrottleMetric = throttleMetric(QuotaType.Fetch, consumerQuotaId)
+ private def consumerRequestThrottleMetric = throttleMetric(QuotaType.Request, consumerQuotaId)
+
+ private def exemptRequestMetric: KafkaMetric = {
+ val metricName = leaderNode.metrics.metricName("exempt-request-time", QuotaType.Request.toString, "")
+ leaderNode.metrics.metrics.get(metricName)
+ }
+
+ def quotaProperties(producerQuota: Long, consumerQuota: Long, requestQuota: Double): Properties = {
val props = new Properties()
props.put(DynamicConfig.Client.ProducerByteRateOverrideProp, producerQuota.toString)
props.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, consumerQuota.toString)
+ props.put(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
props
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
index d71713f..f8594e1 100644
--- a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
@@ -33,14 +33,15 @@ class ClientIdQuotaTest extends BaseQuotaTest {
this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, defaultConsumerQuota.toString)
super.setUp()
}
-
- override def overrideQuotas(producerQuota: Long, consumerQuota: Long) {
+ override def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
val producerProps = new Properties()
producerProps.put(DynamicConfig.Client.ProducerByteRateOverrideProp, producerQuota.toString)
+ producerProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
updateQuotaOverride(producerClientId, producerProps)
val consumerProps = new Properties()
consumerProps.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, consumerQuota.toString)
+ consumerProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
updateQuotaOverride(consumerClientId, consumerProps)
}
override def removeQuotaOverrides() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
index 82b109d..333c851 100644
--- a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
@@ -39,18 +39,20 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
super.setUp()
- val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota)
+ val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
AdminUtils.changeUserOrUserClientIdConfig(zkUtils, ConfigEntityName.Default + "/clients/" + ConfigEntityName.Default, defaultProps)
- waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota)
+ waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
}
- override def overrideQuotas(producerQuota: Long, consumerQuota: Long) {
+ override def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
val producerProps = new Properties()
producerProps.setProperty(DynamicConfig.Client.ProducerByteRateOverrideProp, producerQuota.toString)
+ producerProps.setProperty(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
updateQuotaOverride(userPrincipal, producerClientId, producerProps)
val consumerProps = new Properties()
consumerProps.setProperty(DynamicConfig.Client.ConsumerByteRateOverrideProp, consumerQuota.toString)
+ consumerProps.setProperty(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
updateQuotaOverride(userPrincipal, consumerClientId, consumerProps)
}
[4/4] kafka git commit: KAFKA-4954; Request handler utilization quotas
Posted by ju...@apache.org.
KAFKA-4954; Request handler utilization quotas
See KIP-124 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-124+-+Request+rate+quotas) for details
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
Closes #2744 from rajinisivaram/KAFKA-4954
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0104b657
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0104b657
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0104b657
Branch: refs/heads/trunk
Commit: 0104b657a154fb15e716d872a0e6084f9da650bf
Parents: 6185bc0
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Mon May 1 09:13:31 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon May 1 09:13:31 2017 -0700
----------------------------------------------------------------------
.../org/apache/kafka/clients/NetworkClient.java | 30 +-
.../org/apache/kafka/common/metrics/Sensor.java | 9 +-
.../kafka/common/network/KafkaChannel.java | 22 +-
.../apache/kafka/common/network/Selector.java | 23 +
.../apache/kafka/common/protocol/ApiKeys.java | 86 +--
.../apache/kafka/common/protocol/Protocol.java | 212 ++++++--
.../kafka/common/requests/AbstractRequest.java | 9 +-
.../kafka/common/requests/AbstractResponse.java | 1 +
.../common/requests/AddOffsetsToTxnRequest.java | 4 +-
.../requests/AddOffsetsToTxnResponse.java | 11 +-
.../requests/AddPartitionsToTxnRequest.java | 4 +-
.../requests/AddPartitionsToTxnResponse.java | 11 +-
.../common/requests/ApiVersionsRequest.java | 8 +-
.../common/requests/ApiVersionsResponse.java | 31 +-
.../requests/ControlledShutdownRequest.java | 2 +-
.../common/requests/CreateTopicsRequest.java | 4 +-
.../common/requests/CreateTopicsResponse.java | 14 +
.../common/requests/DeleteRecordsRequest.java | 4 +-
.../common/requests/DeleteRecordsResponse.java | 12 +-
.../common/requests/DeleteTopicsRequest.java | 4 +-
.../common/requests/DeleteTopicsResponse.java | 14 +
.../common/requests/DescribeGroupsRequest.java | 4 +-
.../common/requests/DescribeGroupsResponse.java | 20 +-
.../kafka/common/requests/EndTxnRequest.java | 4 +-
.../kafka/common/requests/EndTxnResponse.java | 11 +-
.../kafka/common/requests/FetchRequest.java | 4 +-
.../common/requests/FindCoordinatorRequest.java | 5 +-
.../requests/FindCoordinatorResponse.java | 14 +
.../kafka/common/requests/HeartbeatRequest.java | 4 +-
.../common/requests/HeartbeatResponse.java | 14 +
.../kafka/common/requests/InitPidRequest.java | 4 +-
.../kafka/common/requests/InitPidResponse.java | 15 +-
.../kafka/common/requests/JoinGroupRequest.java | 11 +-
.../common/requests/JoinGroupResponse.java | 20 +
.../common/requests/LeaderAndIsrRequest.java | 2 +-
.../common/requests/LeaveGroupRequest.java | 4 +-
.../common/requests/LeaveGroupResponse.java | 14 +
.../common/requests/ListGroupsRequest.java | 4 +-
.../common/requests/ListGroupsResponse.java | 20 +-
.../common/requests/ListOffsetRequest.java | 6 +-
.../common/requests/ListOffsetResponse.java | 16 +-
.../kafka/common/requests/MetadataRequest.java | 4 +-
.../kafka/common/requests/MetadataResponse.java | 14 +
.../common/requests/OffsetCommitRequest.java | 5 +-
.../common/requests/OffsetCommitResponse.java | 14 +
.../common/requests/OffsetFetchRequest.java | 10 +-
.../common/requests/OffsetFetchResponse.java | 22 +-
.../requests/OffsetsForLeaderEpochRequest.java | 2 +-
.../kafka/common/requests/ProduceRequest.java | 4 +-
.../kafka/common/requests/ProduceResponse.java | 1 -
.../common/requests/SaslHandshakeRequest.java | 25 +-
.../common/requests/StopReplicaRequest.java | 2 +-
.../kafka/common/requests/SyncGroupRequest.java | 7 +-
.../common/requests/SyncGroupResponse.java | 14 +
.../common/requests/TxnOffsetCommitRequest.java | 4 +-
.../requests/TxnOffsetCommitResponse.java | 11 +-
.../common/requests/UpdateMetadataRequest.java | 2 +-
.../common/requests/WriteTxnMarkersRequest.java | 2 +-
.../apache/kafka/clients/NetworkClientTest.java | 3 +-
.../clients/producer/internals/SenderTest.java | 2 +-
.../internals/TransactionManagerTest.java | 10 +-
.../common/network/SslTransportLayerTest.java | 39 ++
.../kafka/common/protocol/ApiKeysTest.java | 31 ++
.../common/requests/RequestResponseTest.java | 10 +-
.../scala/kafka/network/RequestChannel.scala | 82 ++-
.../main/scala/kafka/network/SocketServer.scala | 31 +-
.../scala/kafka/server/ClientQuotaManager.scala | 101 ++--
.../server/ClientRequestQuotaManager.scala | 54 ++
.../main/scala/kafka/server/ConfigHandler.scala | 6 +
.../main/scala/kafka/server/DynamicConfig.scala | 5 +
.../src/main/scala/kafka/server/KafkaApis.scala | 539 +++++++++++--------
.../kafka/server/KafkaRequestHandler.scala | 6 +-
.../main/scala/kafka/server/QuotaFactory.scala | 12 +-
.../integration/kafka/api/AdminClientTest.scala | 2 +-
.../integration/kafka/api/BaseQuotaTest.scala | 65 ++-
.../kafka/api/ClientIdQuotaTest.scala | 5 +-
.../kafka/api/UserClientIdQuotaTest.scala | 8 +-
.../integration/kafka/api/UserQuotaTest.scala | 8 +-
.../kafka/server/ApiVersionsRequestTest.scala | 6 +-
.../unit/kafka/server/BaseRequestTest.scala | 2 +-
.../kafka/server/ClientQuotaManagerTest.scala | 62 +++
.../unit/kafka/server/RequestQuotaTest.scala | 420 +++++++++++++++
82 files changed, 1879 insertions(+), 484 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 7bd0311..df9e2fa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -42,13 +42,12 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.Set;
/**
* A network client for asynchronous request/response network i/o. This is an internal class used to implement the
@@ -100,7 +99,7 @@ public class NetworkClient implements KafkaClient {
private final ApiVersions apiVersions;
- private final Set<String> nodesNeedingApiVersionsFetch = new HashSet<>();
+ private final Map<String, ApiVersionsRequest.Builder> nodesNeedingApiVersionsFetch = new HashMap<>();
private final List<ClientResponse> abortedSends = new LinkedList<>();
@@ -471,7 +470,7 @@ public class NetworkClient implements KafkaClient {
ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer);
// Always expect the response version id to be the same as the request version id
ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey());
- Struct responseBody = apiKey.responseSchema(requestHeader.apiVersion()).read(responseBuffer);
+ Struct responseBody = apiKey.parseResponse(requestHeader.apiVersion(), responseBuffer);
correlate(requestHeader, responseHeader);
return AbstractResponse.getResponse(apiKey, responseBody);
}
@@ -564,10 +563,14 @@ public class NetworkClient implements KafkaClient {
InFlightRequest req, long now, ApiVersionsResponse apiVersionsResponse) {
final String node = req.destination;
if (apiVersionsResponse.error() != Errors.NONE) {
- log.warn("Node {} got error {} when making an ApiVersionsRequest. Disconnecting.",
- node, apiVersionsResponse.error());
- this.selector.close(node);
- processDisconnection(responses, node, now);
+ if (req.request.version() == 0 || apiVersionsResponse.error() != Errors.UNSUPPORTED_VERSION) {
+ log.warn("Node {} got error {} when making an ApiVersionsRequest. Disconnecting.",
+ node, apiVersionsResponse.error());
+ this.selector.close(node);
+ processDisconnection(responses, node, now);
+ } else {
+ nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder((short) 0));
+ }
return;
}
NodeApiVersions nodeVersionInfo = new NodeApiVersions(apiVersionsResponse.apiVersions());
@@ -605,7 +608,7 @@ public class NetworkClient implements KafkaClient {
// connection.
if (discoverBrokerVersions) {
this.connectionStates.checkingApiVersions(node);
- nodesNeedingApiVersionsFetch.add(node);
+ nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder());
log.debug("Completed connection to node {}. Fetching API versions.", node);
} else {
this.connectionStates.ready(node);
@@ -615,13 +618,14 @@ public class NetworkClient implements KafkaClient {
}
private void handleInitiateApiVersionRequests(long now) {
- Iterator<String> iter = nodesNeedingApiVersionsFetch.iterator();
+ Iterator<Map.Entry<String, ApiVersionsRequest.Builder>> iter = nodesNeedingApiVersionsFetch.entrySet().iterator();
while (iter.hasNext()) {
- String node = iter.next();
+ Map.Entry<String, ApiVersionsRequest.Builder> entry = iter.next();
+ String node = entry.getKey();
if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) {
log.debug("Initiating API versions fetch from node {}.", node);
- ApiVersionsRequest.Builder apiVersionRequest = new ApiVersionsRequest.Builder();
- ClientRequest clientRequest = newClientRequest(node, apiVersionRequest, now, true);
+ ApiVersionsRequest.Builder apiVersionRequestBuilder = entry.getValue();
+ ClientRequest clientRequest = newClientRequest(node, apiVersionRequestBuilder, now, true);
doSend(clientRequest, true, now);
iter.remove();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 5ca9fce..ae331e7 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -168,16 +168,21 @@ public final class Sensor {
* bound
*/
public void record(double value, long timeMs) {
+ record(value, timeMs, true);
+ }
+
+ public void record(double value, long timeMs, boolean checkQuotas) {
if (shouldRecord()) {
this.lastRecordTime = timeMs;
synchronized (this) {
// increment all the stats
for (Stat stat : this.stats)
stat.record(config, value, timeMs);
- checkQuotas(timeMs);
+ if (checkQuotas)
+ checkQuotas(timeMs);
}
for (Sensor parent : parents)
- parent.record(value, timeMs);
+ parent.record(value, timeMs, checkQuotas);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index f1bf86c..ea03ff0 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -31,6 +31,9 @@ public class KafkaChannel {
private final String id;
private final TransportLayer transportLayer;
private final Authenticator authenticator;
+ // Tracks accumulated network thread time. This is updated on the network thread.
+ // The value is read and reset after each response is sent.
+ private long networkThreadTimeNanos;
private final int maxReceiveSize;
private NetworkReceive receive;
private Send send;
@@ -43,6 +46,7 @@ public class KafkaChannel {
this.id = id;
this.transportLayer = transportLayer;
this.authenticator = authenticator;
+ this.networkThreadTimeNanos = 0L;
this.maxReceiveSize = maxReceiveSize;
this.disconnected = false;
this.muted = false;
@@ -164,6 +168,23 @@ public class KafkaChannel {
return result;
}
+ /**
+ * Accumulates network thread time for this channel.
+ */
+ public void addNetworkThreadTimeNanos(long nanos) {
+ networkThreadTimeNanos += nanos;
+ }
+
+ /**
+ * Returns accumulated network thread time for this channel and resets
+ * the value to zero.
+ */
+ public long getAndResetNetworkThreadTimeNanos() {
+ long current = networkThreadTimeNanos;
+ networkThreadTimeNanos = 0;
+ return current;
+ }
+
private long receive(NetworkReceive receive) throws IOException {
return receive.readFrom(transportLayer);
}
@@ -175,5 +196,4 @@ public class KafkaChannel {
return send.completed();
}
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index fd3ab47..8dd3ad6 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -101,6 +101,7 @@ public class Selector implements Selectable {
private final ChannelBuilder channelBuilder;
private final int maxReceiveSize;
private final boolean metricsPerConnection;
+ private final boolean recordTimePerConnection;
private final IdleExpiryManager idleExpiryManager;
/**
@@ -122,6 +123,7 @@ public class Selector implements Selectable {
String metricGrpPrefix,
Map<String, String> metricTags,
boolean metricsPerConnection,
+ boolean recordTimePerConnection,
ChannelBuilder channelBuilder) {
try {
this.nioSelector = java.nio.channels.Selector.open();
@@ -144,9 +146,21 @@ public class Selector implements Selectable {
this.sensors = new SelectorMetrics(metrics);
this.channelBuilder = channelBuilder;
this.metricsPerConnection = metricsPerConnection;
+ this.recordTimePerConnection = recordTimePerConnection;
this.idleExpiryManager = connectionMaxIdleMs < 0 ? null : new IdleExpiryManager(time, connectionMaxIdleMs);
}
+ public Selector(int maxReceiveSize,
+ long connectionMaxIdleMs,
+ Metrics metrics,
+ Time time,
+ String metricGrpPrefix,
+ Map<String, String> metricTags,
+ boolean metricsPerConnection,
+ ChannelBuilder channelBuilder) {
+ this(maxReceiveSize, connectionMaxIdleMs, metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, false, channelBuilder);
+ }
+
public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder) {
this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, new HashMap<String, String>(), true, channelBuilder);
}
@@ -326,6 +340,7 @@ public class Selector implements Selectable {
SelectionKey key = iterator.next();
iterator.remove();
KafkaChannel channel = channel(key);
+ long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;
// register all per-connection metrics at once
sensors.maybeRegisterConnectionMetrics(channel.id());
@@ -380,10 +395,18 @@ public class Selector implements Selectable {
else
log.warn("Unexpected error from {}; closing connection", desc, e);
close(channel, true);
+ } finally {
+ maybeRecordTimePerConnection(channel, channelStartTimeNanos);
}
}
}
+ // Record time spent in pollSelectionKeys for channel (moved into a method to keep checkstyle happy)
+ private void maybeRecordTimePerConnection(KafkaChannel channel, long startTimeNanos) {
+ if (recordTimePerConnection)
+ channel.addNetworkThreadTimeNanos(time.nanoseconds() - startTimeNanos);
+ }
+
@Override
public List<Send> completedSends() {
return this.completedSends;
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 9e7ce1d..b98a33e 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.protocol;
import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
@@ -25,35 +26,43 @@ import java.nio.ByteBuffer;
* Identifiers for all the Kafka APIs
*/
public enum ApiKeys {
- PRODUCE(0, "Produce"),
- FETCH(1, "Fetch"),
- LIST_OFFSETS(2, "Offsets"),
- METADATA(3, "Metadata"),
- LEADER_AND_ISR(4, "LeaderAndIsr"),
- STOP_REPLICA(5, "StopReplica"),
- UPDATE_METADATA_KEY(6, "UpdateMetadata"),
- CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),
- OFFSET_COMMIT(8, "OffsetCommit"),
- OFFSET_FETCH(9, "OffsetFetch"),
- FIND_COORDINATOR(10, "FindCoordinator"),
- JOIN_GROUP(11, "JoinGroup"),
- HEARTBEAT(12, "Heartbeat"),
- LEAVE_GROUP(13, "LeaveGroup"),
- SYNC_GROUP(14, "SyncGroup"),
- DESCRIBE_GROUPS(15, "DescribeGroups"),
- LIST_GROUPS(16, "ListGroups"),
- SASL_HANDSHAKE(17, "SaslHandshake"),
- API_VERSIONS(18, "ApiVersions"),
- CREATE_TOPICS(19, "CreateTopics"),
- DELETE_TOPICS(20, "DeleteTopics"),
- DELETE_RECORDS(21, "DeleteRecords"),
- INIT_PRODUCER_ID(22, "InitProducerId"),
- OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch"),
- ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn"),
- ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn"),
- END_TXN(26, "EndTxn"),
- WRITE_TXN_MARKERS(27, "WriteTxnMarkers"),
- TXN_OFFSET_COMMIT(28, "TxnOffsetCommit");
+ PRODUCE(0, "Produce", false),
+ FETCH(1, "Fetch", false),
+ LIST_OFFSETS(2, "Offsets", false),
+ METADATA(3, "Metadata", false),
+ LEADER_AND_ISR(4, "LeaderAndIsr", true),
+ STOP_REPLICA(5, "StopReplica", true),
+ UPDATE_METADATA_KEY(6, "UpdateMetadata", true),
+ CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown", true),
+ OFFSET_COMMIT(8, "OffsetCommit", false),
+ OFFSET_FETCH(9, "OffsetFetch", false),
+ FIND_COORDINATOR(10, "FindCoordinator", false),
+ JOIN_GROUP(11, "JoinGroup", false),
+ HEARTBEAT(12, "Heartbeat", false),
+ LEAVE_GROUP(13, "LeaveGroup", false),
+ SYNC_GROUP(14, "SyncGroup", false),
+ DESCRIBE_GROUPS(15, "DescribeGroups", false),
+ LIST_GROUPS(16, "ListGroups", false),
+ SASL_HANDSHAKE(17, "SaslHandshake", false),
+ API_VERSIONS(18, "ApiVersions", false) {
+ @Override
+ public Struct parseResponse(short version, ByteBuffer buffer) {
+ // Fall back to version 0 for ApiVersions response. If a client sends an ApiVersionsRequest
+ // using a version higher than that supported by the broker, a version 0 response is sent
+ // to the client indicating UNSUPPORTED_VERSION.
+ return parseResponse(version, buffer, (short) 0);
+ }
+ },
+ CREATE_TOPICS(19, "CreateTopics", false),
+ DELETE_TOPICS(20, "DeleteTopics", false),
+ DELETE_RECORDS(21, "DeleteRecords", false),
+ INIT_PRODUCER_ID(22, "InitProducerId", false),
+ OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch", true),
+ ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn", false),
+ ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false),
+ END_TXN(26, "EndTxn", false),
+ WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true),
+ TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false);
private static final ApiKeys[] ID_TO_TYPE;
private static final int MIN_API_KEY = 0;
@@ -76,11 +85,15 @@ public enum ApiKeys {
/** an english description of the api--this is for debugging and can change */
public final String name;
- ApiKeys(int id, String name) {
+ /** indicates if this is a ClusterAction request used only by brokers */
+ public final boolean clusterAction;
+
+ ApiKeys(int id, String name, boolean clusterAction) {
if (id < 0)
throw new IllegalArgumentException("id must not be negative, id: " + id);
this.id = (short) id;
this.name = name;
+ this.clusterAction = clusterAction;
}
public static ApiKeys forId(int id) {
@@ -122,6 +135,19 @@ public enum ApiKeys {
return responseSchema(version).read(buffer);
}
+ protected Struct parseResponse(short version, ByteBuffer buffer, short fallbackVersion) {
+ int bufferPosition = buffer.position();
+ try {
+ return responseSchema(version).read(buffer);
+ } catch (SchemaException e) {
+ if (version != fallbackVersion) {
+ buffer.position(bufferPosition);
+ return responseSchema(fallbackVersion).read(buffer);
+ } else
+ throw e;
+ }
+ }
+
private Schema schemaFor(Schema[][] schemas, short version) {
if (id > schemas.length)
throw new IllegalArgumentException("No schema available for API key " + this);
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 54f533e..3da2b3f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -65,6 +65,9 @@ public class Protocol {
/* The v2 metadata request is the same as v1. An additional field for cluster id has been added to the v2 metadata response */
public static final Schema METADATA_REQUEST_V2 = METADATA_REQUEST_V1;
+ /* The v3 metadata request is the same as v1 and v2. An additional field for throttle time has been added to the v3 metadata response */
+ public static final Schema METADATA_REQUEST_V3 = METADATA_REQUEST_V2;
+
public static final Schema METADATA_BROKER_V0 = new Schema(new Field("node_id", INT32, "The broker id."),
new Field("host", STRING, "The hostname of the broker."),
new Field("port", INT32,
@@ -129,9 +132,19 @@ public class Protocol {
"The broker id of the controller broker."),
new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1)));
+ public static final Schema METADATA_RESPONSE_V3 = new Schema(
+ newThrottleTimeField(),
+ new Field("brokers", new ArrayOf(METADATA_BROKER_V1),
+ "Host and port information for all brokers."),
+ new Field("cluster_id", NULLABLE_STRING,
+ "The cluster id that this broker belongs to."),
+ new Field("controller_id", INT32,
+ "The broker id of the controller broker."),
+ new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1)));
+
- public static final Schema[] METADATA_REQUEST = new Schema[] {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2};
- public static final Schema[] METADATA_RESPONSE = new Schema[] {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2};
+ public static final Schema[] METADATA_REQUEST = new Schema[] {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3};
+ public static final Schema[] METADATA_RESPONSE = new Schema[] {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3};
/* Produce api */
@@ -190,11 +203,7 @@ public class Protocol {
INT16),
new Field("base_offset",
INT64))))))),
- new Field("throttle_time_ms",
- INT32,
- "Duration in milliseconds for which the request was throttled" +
- " due to quota violation. (Zero if the request did not violate any quota.)",
- 0));
+ newThrottleTimeField());
/**
* PRODUCE_RESPONSE_V2 added a timestamp field in the per partition response status.
* The timestamp is log append time if the topic is configured to use log append time. Or it is NoTimestamp when create
@@ -215,11 +224,7 @@ public class Protocol {
"If CreateTime is used for the topic, the timestamp will be -1. " +
"If LogAppendTime is used for the topic, the timestamp will be " +
"the broker local time when the messages are appended."))))))),
- new Field("throttle_time_ms",
- INT32,
- "Duration in milliseconds for which the request was throttled" +
- " due to quota violation. (Zero if the request did not violate any quota.)",
- 0));
+ newThrottleTimeField());
public static final Schema PRODUCE_RESPONSE_V3 = PRODUCE_RESPONSE_V2;
public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3};
@@ -316,6 +321,9 @@ public class Protocol {
new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V2),
"Topics to commit offsets."));
+ /* v3 request is same as v2. Throttle time has been added to response */
+ public static final Schema OFFSET_COMMIT_REQUEST_V3 = OFFSET_COMMIT_REQUEST_V2;
+
public static final Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
INT32,
"Topic partition id."),
@@ -329,13 +337,18 @@ public class Protocol {
public static final Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses",
new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
- public static final Schema[] OFFSET_COMMIT_REQUEST = new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2};
+ public static final Schema[] OFFSET_COMMIT_REQUEST = new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2, OFFSET_COMMIT_REQUEST_V3};
/* The response types for V0, V1 and V2 of OFFSET_COMMIT_REQUEST are the same. */
public static final Schema OFFSET_COMMIT_RESPONSE_V1 = OFFSET_COMMIT_RESPONSE_V0;
public static final Schema OFFSET_COMMIT_RESPONSE_V2 = OFFSET_COMMIT_RESPONSE_V0;
- public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2};
+ public static final Schema OFFSET_COMMIT_RESPONSE_V3 = new Schema(
+ newThrottleTimeField(),
+ new Field("responses",
+ new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
+
+ public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2, OFFSET_COMMIT_RESPONSE_V3};
/* Offset fetch api */
@@ -401,8 +414,17 @@ public class Protocol {
new Field("error_code",
INT16));
- public static final Schema[] OFFSET_FETCH_REQUEST = new Schema[] {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1, OFFSET_FETCH_REQUEST_V2};
- public static final Schema[] OFFSET_FETCH_RESPONSE = new Schema[] {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1, OFFSET_FETCH_RESPONSE_V2};
+ /* v3 request is the same as v2. Throttle time has been added to v3 response */
+ public static final Schema OFFSET_FETCH_REQUEST_V3 = OFFSET_FETCH_REQUEST_V2;
+ public static final Schema OFFSET_FETCH_RESPONSE_V3 = new Schema(
+ newThrottleTimeField(),
+ new Field("responses",
+ new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)),
+ new Field("error_code",
+ INT16));
+
+ public static final Schema[] OFFSET_FETCH_REQUEST = new Schema[] {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1, OFFSET_FETCH_REQUEST_V2, OFFSET_FETCH_REQUEST_V3};
+ public static final Schema[] OFFSET_FETCH_RESPONSE = new Schema[] {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1, OFFSET_FETCH_RESPONSE_V2, OFFSET_FETCH_RESPONSE_V3};
/* List offset api */
public static final Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
@@ -445,6 +467,9 @@ public class Protocol {
new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V1),
"Topics to list offsets."));
+ /* v2 request is the same as v1. Throttle time has been added to response */
+ public static final Schema LIST_OFFSET_REQUEST_V2 = LIST_OFFSET_REQUEST_V1;
+
public static final Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
INT32,
"Topic partition id."),
@@ -477,9 +502,13 @@ public class Protocol {
public static final Schema LIST_OFFSET_RESPONSE_V1 = new Schema(new Field("responses",
new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V1)));
+ public static final Schema LIST_OFFSET_RESPONSE_V2 = new Schema(
+ newThrottleTimeField(),
+ new Field("responses",
+ new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V1)));
- public static final Schema[] LIST_OFFSET_REQUEST = new Schema[] {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1};
- public static final Schema[] LIST_OFFSET_RESPONSE = new Schema[] {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1};
+ public static final Schema[] LIST_OFFSET_REQUEST = new Schema[] {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1, LIST_OFFSET_REQUEST_V2};
+ public static final Schema[] LIST_OFFSET_RESPONSE = new Schema[] {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1, LIST_OFFSET_RESPONSE_V2};
/* Fetch api */
public static final Schema FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
@@ -630,11 +659,7 @@ public class Protocol {
public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses",
new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
- public static final Schema FETCH_RESPONSE_V1 = new Schema(new Field("throttle_time_ms",
- INT32,
- "Duration in milliseconds for which the request was throttled" +
- " due to quota violation. (Zero if the request did not violate any quota.)",
- 0),
+ public static final Schema FETCH_RESPONSE_V1 = new Schema(newThrottleTimeField(),
new Field("responses",
new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
// Even though fetch response v2 has the same protocol as v1, the record set in the response is different. In v1,
@@ -703,19 +728,11 @@ public class Protocol {
new Field("partition_responses", new ArrayOf(FETCH_RESPONSE_PARTITION_V5)));
public static final Schema FETCH_RESPONSE_V4 = new Schema(
- new Field("throttle_time_ms",
- INT32,
- "Duration in milliseconds for which the request was throttled " +
- "due to quota violation (zero if the request did not violate any quota).",
- 0),
+ newThrottleTimeField(),
new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V4)));
public static final Schema FETCH_RESPONSE_V5 = new Schema(
- new Field("throttle_time_ms",
- INT32,
- "Duration in milliseconds for which the request was throttled " +
- "due to quota violation (zero if the request did not violate any quota).",
- 0),
+ newThrottleTimeField(),
new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V5)));
public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4, FETCH_REQUEST_V5};
@@ -724,19 +741,29 @@ public class Protocol {
/* List groups api */
public static final Schema LIST_GROUPS_REQUEST_V0 = new Schema();
+ /* v1 request is the same as v0. Throttle time has been added to response */
+ public static final Schema LIST_GROUPS_REQUEST_V1 = LIST_GROUPS_REQUEST_V0;
+
public static final Schema LIST_GROUPS_RESPONSE_GROUP_V0 = new Schema(new Field("group_id", STRING),
new Field("protocol_type", STRING));
public static final Schema LIST_GROUPS_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
+ public static final Schema LIST_GROUPS_RESPONSE_V1 = new Schema(
+ newThrottleTimeField(),
+ new Field("error_code", INT16),
+ new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
- public static final Schema[] LIST_GROUPS_REQUEST = new Schema[] {LIST_GROUPS_REQUEST_V0};
- public static final Schema[] LIST_GROUPS_RESPONSE = new Schema[] {LIST_GROUPS_RESPONSE_V0};
+ public static final Schema[] LIST_GROUPS_REQUEST = new Schema[] {LIST_GROUPS_REQUEST_V0, LIST_GROUPS_REQUEST_V1};
+ public static final Schema[] LIST_GROUPS_RESPONSE = new Schema[] {LIST_GROUPS_RESPONSE_V0, LIST_GROUPS_RESPONSE_V1};
/* Describe group api */
public static final Schema DESCRIBE_GROUPS_REQUEST_V0 = new Schema(new Field("group_ids",
new ArrayOf(STRING),
"List of groupIds to request metadata for (an empty groupId array will return empty group metadata)."));
+ /* v1 request is the same as v0. Throttle time has been added to response */
+ public static final Schema DESCRIBE_GROUPS_REQUEST_V1 = DESCRIBE_GROUPS_REQUEST_V0;
+
public static final Schema DESCRIBE_GROUPS_RESPONSE_MEMBER_V0 = new Schema(new Field("member_id",
STRING,
"The memberId assigned by the coordinator"),
@@ -770,9 +797,12 @@ public class Protocol {
"Current group members (only provided if the group is not Dead)"));
public static final Schema DESCRIBE_GROUPS_RESPONSE_V0 = new Schema(new Field("groups", new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0)));
+ public static final Schema DESCRIBE_GROUPS_RESPONSE_V1 = new Schema(
+ newThrottleTimeField(),
+ new Field("groups", new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0)));
- public static final Schema[] DESCRIBE_GROUPS_REQUEST = new Schema[] {DESCRIBE_GROUPS_REQUEST_V0};
- public static final Schema[] DESCRIBE_GROUPS_RESPONSE = new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0};
+ public static final Schema[] DESCRIBE_GROUPS_REQUEST = new Schema[] {DESCRIBE_GROUPS_REQUEST_V0, DESCRIBE_GROUPS_REQUEST_V1};
+ public static final Schema[] DESCRIBE_GROUPS_RESPONSE = new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0, DESCRIBE_GROUPS_RESPONSE_V1};
/* Find coordinator api */
public static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(
@@ -802,6 +832,7 @@ public class Protocol {
"Host and port information for the coordinator for a consumer group."));
public static final Schema FIND_COORDINATOR_RESPONSE_V1 = new Schema(
+ newThrottleTimeField(),
new Field("error_code", INT16),
new Field("error_message", NULLABLE_STRING),
new Field("coordinator",
@@ -870,6 +901,9 @@ public class Protocol {
new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0),
"List of protocols that the member supports"));
+ /* v2 request is the same as v1. Throttle time has been added to response */
+ public static final Schema JOIN_GROUP_REQUEST_V2 = JOIN_GROUP_REQUEST_V1;
+
public static final Schema JOIN_GROUP_RESPONSE_MEMBER_V0 = new Schema(new Field("member_id", STRING),
new Field("member_metadata", BYTES));
@@ -890,9 +924,27 @@ public class Protocol {
new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
public static final Schema JOIN_GROUP_RESPONSE_V1 = JOIN_GROUP_RESPONSE_V0;
-
- public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1};
- public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1};
+ public static final Schema JOIN_GROUP_RESPONSE_V2 = new Schema(
+ newThrottleTimeField(),
+ new Field("error_code", INT16),
+ new Field("generation_id",
+ INT32,
+ "The generation of the consumer group."),
+ new Field("group_protocol",
+ STRING,
+ "The group protocol selected by the coordinator"),
+ new Field("leader_id",
+ STRING,
+ "The leader of the group"),
+ new Field("member_id",
+ STRING,
+ "The consumer id assigned by the group coordinator."),
+ new Field("members",
+ new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
+
+
+ public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2};
+ public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2};
/* SyncGroup api */
public static final Schema SYNC_GROUP_REQUEST_MEMBER_V0 = new Schema(new Field("member_id", STRING),
@@ -901,10 +953,18 @@ public class Protocol {
new Field("generation_id", INT32),
new Field("member_id", STRING),
new Field("group_assignment", new ArrayOf(SYNC_GROUP_REQUEST_MEMBER_V0)));
+
+ /* v1 request is the same as v0. Throttle time has been added to response */
+ public static final Schema SYNC_GROUP_REQUEST_V1 = SYNC_GROUP_REQUEST_V0;
+
public static final Schema SYNC_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
new Field("member_assignment", BYTES));
- public static final Schema[] SYNC_GROUP_REQUEST = new Schema[] {SYNC_GROUP_REQUEST_V0};
- public static final Schema[] SYNC_GROUP_RESPONSE = new Schema[] {SYNC_GROUP_RESPONSE_V0};
+ public static final Schema SYNC_GROUP_RESPONSE_V1 = new Schema(
+ newThrottleTimeField(),
+ new Field("error_code", INT16),
+ new Field("member_assignment", BYTES));
+ public static final Schema[] SYNC_GROUP_REQUEST = new Schema[] {SYNC_GROUP_REQUEST_V0, SYNC_GROUP_REQUEST_V1};
+ public static final Schema[] SYNC_GROUP_RESPONSE = new Schema[] {SYNC_GROUP_RESPONSE_V0, SYNC_GROUP_RESPONSE_V1};
/* Heartbeat api */
public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The group id."),
@@ -915,10 +975,16 @@ public class Protocol {
STRING,
"The member id assigned by the group coordinator."));
+ /* v1 request is the same as v0. Throttle time has been added to response */
+ public static final Schema HEARTBEAT_REQUEST_V1 = HEARTBEAT_REQUEST_V0;
+
public static final Schema HEARTBEAT_RESPONSE_V0 = new Schema(new Field("error_code", INT16));
+ public static final Schema HEARTBEAT_RESPONSE_V1 = new Schema(
+ newThrottleTimeField(),
+ new Field("error_code", INT16));
- public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0};
- public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
+ public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0, HEARTBEAT_REQUEST_V1};
+ public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0, HEARTBEAT_RESPONSE_V1};
/* Leave group api */
public static final Schema LEAVE_GROUP_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The group id."),
@@ -926,10 +992,16 @@ public class Protocol {
STRING,
"The member id assigned by the group coordinator."));
+ /* v1 request is the same as v0. Throttle time has been added to response */
+ public static final Schema LEAVE_GROUP_REQUEST_V1 = LEAVE_GROUP_REQUEST_V0;
+
public static final Schema LEAVE_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16));
+ public static final Schema LEAVE_GROUP_RESPONSE_V1 = new Schema(
+ newThrottleTimeField(),
+ new Field("error_code", INT16));
- public static final Schema[] LEAVE_GROUP_REQUEST = new Schema[] {LEAVE_GROUP_REQUEST_V0};
- public static final Schema[] LEAVE_GROUP_RESPONSE = new Schema[] {LEAVE_GROUP_RESPONSE_V0};
+ public static final Schema[] LEAVE_GROUP_REQUEST = new Schema[] {LEAVE_GROUP_REQUEST_V0, LEAVE_GROUP_REQUEST_V1};
+ public static final Schema[] LEAVE_GROUP_RESPONSE = new Schema[] {LEAVE_GROUP_RESPONSE_V0, LEAVE_GROUP_RESPONSE_V1};
/* Leader and ISR api */
public static final Schema LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0 =
@@ -1082,15 +1154,22 @@ public class Protocol {
/* ApiVersion api */
public static final Schema API_VERSIONS_REQUEST_V0 = new Schema();
+ /* v1 request is the same as v0. Throttle time has been added to response */
+ public static final Schema API_VERSIONS_REQUEST_V1 = API_VERSIONS_REQUEST_V0;
+
public static final Schema API_VERSIONS_V0 = new Schema(new Field("api_key", INT16, "API key."),
new Field("min_version", INT16, "Minimum supported version."),
new Field("max_version", INT16, "Maximum supported version."));
public static final Schema API_VERSIONS_RESPONSE_V0 = new Schema(new Field("error_code", INT16, "Error code."),
new Field("api_versions", new ArrayOf(API_VERSIONS_V0), "API versions supported by the broker."));
+ public static final Schema API_VERSIONS_RESPONSE_V1 = new Schema(
+ new Field("error_code", INT16, "Error code."),
+ new Field("api_versions", new ArrayOf(API_VERSIONS_V0), "API versions supported by the broker."),
+ newThrottleTimeField());
- public static final Schema[] API_VERSIONS_REQUEST = new Schema[]{API_VERSIONS_REQUEST_V0};
- public static final Schema[] API_VERSIONS_RESPONSE = new Schema[]{API_VERSIONS_RESPONSE_V0};
+ public static final Schema[] API_VERSIONS_REQUEST = new Schema[]{API_VERSIONS_REQUEST_V0, API_VERSIONS_REQUEST_V1};
+ public static final Schema[] API_VERSIONS_RESPONSE = new Schema[]{API_VERSIONS_RESPONSE_V0, API_VERSIONS_RESPONSE_V1};
/* Admin requests common */
public static final Schema CONFIG_ENTRY = new Schema(new Field("config_key", STRING, "Configuration key name"),
@@ -1154,9 +1233,16 @@ public class Protocol {
new Field("topic_errors",
new ArrayOf(TOPIC_ERROR),
"An array of per topic errors."));
+ /* v2 request is the same as v1. Throttle time has been added to the response */
+ public static final Schema CREATE_TOPICS_REQUEST_V2 = CREATE_TOPICS_REQUEST_V1;
+ public static final Schema CREATE_TOPICS_RESPONSE_V2 = new Schema(
+ newThrottleTimeField(),
+ new Field("topic_errors",
+ new ArrayOf(TOPIC_ERROR),
+ "An array of per topic errors."));
- public static final Schema[] CREATE_TOPICS_REQUEST = new Schema[] {CREATE_TOPICS_REQUEST_V0, CREATE_TOPICS_REQUEST_V1};
- public static final Schema[] CREATE_TOPICS_RESPONSE = new Schema[] {CREATE_TOPICS_RESPONSE_V0, CREATE_TOPICS_RESPONSE_V1};
+ public static final Schema[] CREATE_TOPICS_REQUEST = new Schema[] {CREATE_TOPICS_REQUEST_V0, CREATE_TOPICS_REQUEST_V1, CREATE_TOPICS_REQUEST_V2};
+ public static final Schema[] CREATE_TOPICS_RESPONSE = new Schema[] {CREATE_TOPICS_RESPONSE_V0, CREATE_TOPICS_RESPONSE_V1, CREATE_TOPICS_RESPONSE_V2};
/* DeleteTopic api */
public static final Schema DELETE_TOPICS_REQUEST_V0 = new Schema(
@@ -1171,9 +1257,16 @@ public class Protocol {
new Field("topic_error_codes",
new ArrayOf(TOPIC_ERROR_CODE),
"An array of per topic error codes."));
+ /* v1 request is the same as v0. Throttle time has been added to the response */
+ public static final Schema DELETE_TOPICS_REQUEST_V1 = DELETE_TOPICS_REQUEST_V0;
+ public static final Schema DELETE_TOPICS_RESPONSE_V1 = new Schema(
+ newThrottleTimeField(),
+ new Field("topic_error_codes",
+ new ArrayOf(TOPIC_ERROR_CODE),
+ "An array of per topic error codes."));
- public static final Schema[] DELETE_TOPICS_REQUEST = new Schema[] {DELETE_TOPICS_REQUEST_V0};
- public static final Schema[] DELETE_TOPICS_RESPONSE = new Schema[] {DELETE_TOPICS_RESPONSE_V0};
+ public static final Schema[] DELETE_TOPICS_REQUEST = new Schema[] {DELETE_TOPICS_REQUEST_V0, DELETE_TOPICS_REQUEST_V1};
+ public static final Schema[] DELETE_TOPICS_RESPONSE = new Schema[] {DELETE_TOPICS_RESPONSE_V0, DELETE_TOPICS_RESPONSE_V1};
public static final Schema DELETE_RECORDS_REQUEST_PARTITION_V0 = new Schema(new Field("partition", INT32, "Topic partition id."),
new Field("offset", INT64, "The offset before which the messages will be deleted."));
@@ -1191,7 +1284,9 @@ public class Protocol {
public static final Schema DELETE_RECORDS_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING, "Topic name."),
new Field("partitions", new ArrayOf(DELETE_RECORDS_RESPONSE_PARTITION_V0)));
- public static final Schema DELETE_RECORDS_RESPONSE_V0 = new Schema(new Field("topics", new ArrayOf(DELETE_RECORDS_RESPONSE_TOPIC_V0)));
+ public static final Schema DELETE_RECORDS_RESPONSE_V0 = new Schema(
+ newThrottleTimeField(),
+ new Field("topics", new ArrayOf(DELETE_RECORDS_RESPONSE_TOPIC_V0)));
public static final Schema[] DELETE_RECORDS_REQUEST = new Schema[] {DELETE_RECORDS_REQUEST_V0};
public static final Schema[] DELETE_RECORDS_RESPONSE = new Schema[] {DELETE_RECORDS_RESPONSE_V0};
@@ -1207,6 +1302,7 @@ public class Protocol {
);
public static final Schema INIT_PRODUCER_ID_RESPONSE_V0 = new Schema(
+ newThrottleTimeField(),
new Field("error_code",
INT16,
"An integer error code."),
@@ -1289,6 +1385,7 @@ public class Protocol {
"The partitions to add to the transaction.")
);
public static final Schema ADD_PARTITIONS_TO_TXN_RESPONSE_V0 = new Schema(
+ newThrottleTimeField(),
new Field("error_code",
INT16,
"An integer error code.")
@@ -1312,6 +1409,7 @@ public class Protocol {
"Consumer group id whose offsets should be included in the transaction.")
);
public static final Schema ADD_OFFSETS_TO_TXN_RESPONSE_V0 = new Schema(
+ newThrottleTimeField(),
new Field("error_code",
INT16,
"An integer error code.")
@@ -1336,6 +1434,7 @@ public class Protocol {
);
public static final Schema END_TXN_RESPONSE_V0 = new Schema(
+ newThrottleTimeField(),
new Field("error_code",
INT16,
"An integer error code.")
@@ -1425,6 +1524,7 @@ public class Protocol {
);
public static final Schema TXN_OFFSET_COMMIT_RESPONSE_V0 = new Schema(
+ newThrottleTimeField(),
new Field("topics",
new ArrayOf(new Schema(
new Field("topic", STRING),
@@ -1535,6 +1635,12 @@ public class Protocol {
return apiKey < CURR_VERSION.length && apiVersion >= MIN_VERSIONS[apiKey] && apiVersion <= CURR_VERSION[apiKey];
}
+ private static Field newThrottleTimeField() {
+ return new Field("throttle_time_ms", INT32,
+ "Duration in milliseconds for which the request was throttled due to quota violation. (Zero if the request did not violate any quota.)",
+ 0);
+ }
+
private static String indentString(int size) {
StringBuilder b = new StringBuilder(size);
for (int i = 0; i < size; i++)
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 07bde63..04f2602 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -95,7 +95,14 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
/**
* Get an error response for a request
*/
- public abstract AbstractResponse getErrorResponse(Throwable e);
+ public AbstractResponse getErrorResponse(Throwable e) {
+ return getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e);
+ }
+
+ /**
+ * Get an error response for a request, with the specified throttle time set in the response if applicable
+ */
+ public abstract AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e);
/**
* Factory method for getting a request object based on ApiKey ID and a buffer
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 2286783..b76cb21 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
public abstract class AbstractResponse extends AbstractRequestResponse {
+ public static final int DEFAULT_THROTTLE_TIME = 0;
public Send toSend(String destination, RequestHeader requestHeader) {
return toSend(destination, requestHeader.apiVersion(), requestHeader.toResponseHeader());
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
index 4245e82..733e806 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
@@ -96,8 +96,8 @@ public class AddOffsetsToTxnRequest extends AbstractRequest {
}
@Override
- public AddOffsetsToTxnResponse getErrorResponse(Throwable e) {
- return new AddOffsetsToTxnResponse(Errors.forException(e));
+ public AddOffsetsToTxnResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ return new AddOffsetsToTxnResponse(throttleTimeMs, Errors.forException(e));
}
public static AddOffsetsToTxnRequest parse(ByteBuffer buffer, short version) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
index 2426514..8c41ae4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
public class AddOffsetsToTxnResponse extends AbstractResponse {
+ private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String ERROR_CODE_KEY_NAME = "error_code";
// Possible error codes:
@@ -35,15 +36,22 @@ public class AddOffsetsToTxnResponse extends AbstractResponse {
// InvalidProducerEpoch
private final Errors error;
+ private final int throttleTimeMs;
- public AddOffsetsToTxnResponse(Errors error) {
+ public AddOffsetsToTxnResponse(int throttleTimeMs, Errors error) {
+ this.throttleTimeMs = throttleTimeMs;
this.error = error;
}
public AddOffsetsToTxnResponse(Struct struct) {
+ this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
}
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
public Errors error() {
return error;
}
@@ -51,6 +59,7 @@ public class AddOffsetsToTxnResponse extends AbstractResponse {
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.responseSchema(version));
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
struct.set(ERROR_CODE_KEY_NAME, error.code());
return struct;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
index 9a983d0..5bbea61 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
@@ -125,8 +125,8 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
}
@Override
- public AddPartitionsToTxnResponse getErrorResponse(Throwable e) {
- return new AddPartitionsToTxnResponse(Errors.forException(e));
+ public AddPartitionsToTxnResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ return new AddPartitionsToTxnResponse(throttleTimeMs, Errors.forException(e));
}
public static AddPartitionsToTxnRequest parse(ByteBuffer buffer, short version) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
index 0337044..893fcda 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
public class AddPartitionsToTxnResponse extends AbstractResponse {
+ private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String ERROR_CODE_KEY_NAME = "error_code";
// Possible error codes:
@@ -35,15 +36,22 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
// InvalidProducerEpoch
private final Errors error;
+ private final int throttleTimeMs;
- public AddPartitionsToTxnResponse(Errors error) {
+ public AddPartitionsToTxnResponse(int throttleTimeMs, Errors error) {
+ this.throttleTimeMs = throttleTimeMs;
this.error = error;
}
public AddPartitionsToTxnResponse(Struct struct) {
+ this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
}
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
public Errors error() {
return error;
}
@@ -51,6 +59,7 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.ADD_PARTITIONS_TO_TXN.responseSchema(version));
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
struct.set(ERROR_CODE_KEY_NAME, error.code());
return struct;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
index 07dd5f5..6f63040 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
@@ -30,6 +30,10 @@ public class ApiVersionsRequest extends AbstractRequest {
super(ApiKeys.API_VERSIONS);
}
+ public Builder(short version) {
+ super(ApiKeys.API_VERSIONS, version);
+ }
+
@Override
public ApiVersionsRequest build(short version) {
return new ApiVersionsRequest(version);
@@ -55,11 +59,13 @@ public class ApiVersionsRequest extends AbstractRequest {
}
@Override
- public AbstractResponse getErrorResponse(Throwable e) {
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
short versionId = version();
switch (versionId) {
case 0:
return new ApiVersionsResponse(Errors.forException(e), Collections.<ApiVersionsResponse.ApiVersion>emptyList());
+ case 1:
+ return new ApiVersionsResponse(throttleTimeMs, Errors.forException(e), Collections.<ApiVersionsResponse.ApiVersion>emptyList());
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.API_VERSIONS.latestVersion()));
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 382da89..d434c75 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -31,7 +31,8 @@ import java.util.Map;
public class ApiVersionsResponse extends AbstractResponse {
- public static final ApiVersionsResponse API_VERSIONS_RESPONSE = createApiVersionsResponse();
+ public static final ApiVersionsResponse API_VERSIONS_RESPONSE = createApiVersionsResponse(DEFAULT_THROTTLE_TIME);
+ private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
public static final String ERROR_CODE_KEY_NAME = "error_code";
public static final String API_VERSIONS_KEY_NAME = "api_versions";
public static final String API_KEY_NAME = "api_key";
@@ -44,6 +45,7 @@ public class ApiVersionsResponse extends AbstractResponse {
* UNSUPPORTED_VERSION (33)
*/
private final Errors error;
+ private final int throttleTimeMs;
private final Map<Short, ApiVersion> apiKeyToApiVersion;
public static final class ApiVersion {
@@ -72,11 +74,17 @@ public class ApiVersionsResponse extends AbstractResponse {
}
public ApiVersionsResponse(Errors error, List<ApiVersion> apiVersions) {
+ this(DEFAULT_THROTTLE_TIME, error, apiVersions);
+ }
+
+ public ApiVersionsResponse(int throttleTimeMs, Errors error, List<ApiVersion> apiVersions) {
+ this.throttleTimeMs = throttleTimeMs;
this.error = error;
this.apiKeyToApiVersion = buildApiKeyToApiVersion(apiVersions);
}
public ApiVersionsResponse(Struct struct) {
+ this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
List<ApiVersion> tempApiVersions = new ArrayList<>();
for (Object apiVersionsObj : struct.getArray(API_VERSIONS_KEY_NAME)) {
@@ -92,6 +100,8 @@ public class ApiVersionsResponse extends AbstractResponse {
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.API_VERSIONS.responseSchema(version));
+ if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
struct.set(ERROR_CODE_KEY_NAME, error.code());
List<Struct> apiVersionList = new ArrayList<>();
for (ApiVersion apiVersion : apiKeyToApiVersion.values()) {
@@ -105,15 +115,26 @@ public class ApiVersionsResponse extends AbstractResponse {
return struct;
}
+ public static ApiVersionsResponse apiVersionsResponse(short version, int throttleTimeMs) {
+ if (throttleTimeMs == 0 || version == 0)
+ return API_VERSIONS_RESPONSE;
+ else
+ return createApiVersionsResponse(throttleTimeMs);
+ }
+
/**
* Returns an Errors.UNSUPPORTED_VERSION response serialized as version 0, since we don't support the requested version.
*/
public static Send unsupportedVersionSend(String destination, RequestHeader requestHeader) {
- ApiVersionsResponse response = new ApiVersionsResponse(Errors.UNSUPPORTED_VERSION,
+ ApiVersionsResponse response = new ApiVersionsResponse(DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION,
Collections.<ApiVersion>emptyList());
return response.toSend(destination, (short) 0, requestHeader.toResponseHeader());
}
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
public Collection<ApiVersion> apiVersions() {
return apiKeyToApiVersion.values();
}
@@ -127,15 +148,15 @@ public class ApiVersionsResponse extends AbstractResponse {
}
public static ApiVersionsResponse parse(ByteBuffer buffer, short version) {
- return new ApiVersionsResponse(ApiKeys.API_VERSIONS.responseSchema(version).read(buffer));
+ return new ApiVersionsResponse(ApiKeys.API_VERSIONS.parseResponse(version, buffer));
}
- private static ApiVersionsResponse createApiVersionsResponse() {
+ public static ApiVersionsResponse createApiVersionsResponse(int throttleTimeMs) {
List<ApiVersion> versionList = new ArrayList<>();
for (ApiKeys apiKey : ApiKeys.values()) {
versionList.add(new ApiVersion(apiKey));
}
- return new ApiVersionsResponse(Errors.NONE, versionList);
+ return new ApiVersionsResponse(throttleTimeMs, Errors.NONE, versionList);
}
private Map<Short, ApiVersion> buildApiKeyToApiVersion(List<ApiVersion> apiVersions) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
index 4b5ec13..ee41665 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
@@ -62,7 +62,7 @@ public class ControlledShutdownRequest extends AbstractRequest {
}
@Override
- public AbstractResponse getErrorResponse(Throwable e) {
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
short versionId = version();
switch (versionId) {
case 0:
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
index 072dde8..a0626cc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
@@ -209,7 +209,7 @@ public class CreateTopicsRequest extends AbstractRequest {
}
@Override
- public AbstractResponse getErrorResponse(Throwable e) {
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Map<String, CreateTopicsResponse.Error> topicErrors = new HashMap<>();
for (String topic : topics.keySet()) {
Errors error = Errors.forException(e);
@@ -223,6 +223,8 @@ public class CreateTopicsRequest extends AbstractRequest {
case 0:
case 1:
return new CreateTopicsResponse(topicErrors);
+ case 2:
+ return new CreateTopicsResponse(throttleTimeMs, topicErrors);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.CREATE_TOPICS.latestVersion()));
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
index 33a4b4a..54f9764 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
public class CreateTopicsResponse extends AbstractResponse {
+ private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String TOPIC_ERRORS_KEY_NAME = "topic_errors";
private static final String TOPIC_KEY_NAME = "topic";
private static final String ERROR_CODE_KEY_NAME = "error_code";
@@ -82,8 +83,14 @@ public class CreateTopicsResponse extends AbstractResponse {
*/
private final Map<String, Error> errors;
+ private final int throttleTimeMs;
public CreateTopicsResponse(Map<String, Error> errors) {
+ this(DEFAULT_THROTTLE_TIME, errors);
+ }
+
+ public CreateTopicsResponse(int throttleTimeMs, Map<String, Error> errors) {
+ this.throttleTimeMs = throttleTimeMs;
this.errors = errors;
}
@@ -100,12 +107,15 @@ public class CreateTopicsResponse extends AbstractResponse {
errors.put(topic, new Error(error, errorMessage));
}
+ this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
this.errors = errors;
}
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.CREATE_TOPICS.responseSchema(version));
+ if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
List<Struct> topicErrorsStructs = new ArrayList<>(errors.size());
for (Map.Entry<String, Error> topicError : errors.entrySet()) {
@@ -121,6 +131,10 @@ public class CreateTopicsResponse extends AbstractResponse {
return struct;
}
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
public Map<String, Error> errors() {
return errors;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
index 96f064c..fcd9836 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
@@ -119,7 +119,7 @@ public class DeleteRecordsRequest extends AbstractRequest {
}
@Override
- public AbstractResponse getErrorResponse(Throwable e) {
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Map<TopicPartition, DeleteRecordsResponse.PartitionResponse> responseMap = new HashMap<>();
for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
@@ -129,7 +129,7 @@ public class DeleteRecordsRequest extends AbstractRequest {
short versionId = version();
switch (versionId) {
case 0:
- return new DeleteRecordsResponse(responseMap);
+ return new DeleteRecordsResponse(throttleTimeMs, responseMap);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.DELETE_RECORDS.latestVersion()));
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
index 45b518b..64165eb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
@@ -33,6 +33,7 @@ public class DeleteRecordsResponse extends AbstractResponse {
public static final long INVALID_LOW_WATERMARK = -1L;
// request level key names
+ private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String TOPICS_KEY_NAME = "topics";
// topic level key names
@@ -44,6 +45,7 @@ public class DeleteRecordsResponse extends AbstractResponse {
private static final String LOW_WATERMARK_KEY_NAME = "low_watermark";
private static final String ERROR_CODE_KEY_NAME = "error_code";
+ private final int throttleTimeMs;
private final Map<TopicPartition, PartitionResponse> responses;
/**
@@ -80,6 +82,7 @@ public class DeleteRecordsResponse extends AbstractResponse {
}
public DeleteRecordsResponse(Struct struct) {
+ this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
responses = new HashMap<>();
for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) {
Struct topicStruct = (Struct) topicStructObj;
@@ -97,13 +100,16 @@ public class DeleteRecordsResponse extends AbstractResponse {
/**
* Constructor for version 0.
*/
- public DeleteRecordsResponse(Map<TopicPartition, PartitionResponse> responses) {
+ public DeleteRecordsResponse(int throttleTimeMs, Map<TopicPartition, PartitionResponse> responses) {
+ this.throttleTimeMs = throttleTimeMs;
this.responses = responses;
}
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.DELETE_RECORDS.responseSchema(version));
+ if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
Map<String, Map<Integer, PartitionResponse>> responsesByTopic = CollectionUtils.groupDataByTopic(responses);
List<Struct> topicStructArray = new ArrayList<>();
for (Map.Entry<String, Map<Integer, PartitionResponse>> responsesByTopicEntry : responsesByTopic.entrySet()) {
@@ -125,6 +131,10 @@ public class DeleteRecordsResponse extends AbstractResponse {
return struct;
}
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
public Map<TopicPartition, PartitionResponse> responses() {
return this.responses;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
index ccbe211..2ea8c21 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
@@ -86,7 +86,7 @@ public class DeleteTopicsRequest extends AbstractRequest {
}
@Override
- public AbstractResponse getErrorResponse(Throwable e) {
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Map<String, Errors> topicErrors = new HashMap<>();
for (String topic : topics)
topicErrors.put(topic, Errors.forException(e));
@@ -94,6 +94,8 @@ public class DeleteTopicsRequest extends AbstractRequest {
switch (version()) {
case 0:
return new DeleteTopicsResponse(topicErrors);
+ case 1:
+ return new DeleteTopicsResponse(throttleTimeMs, topicErrors);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
version(), this.getClass().getSimpleName(), ApiKeys.DELETE_TOPICS.latestVersion()));
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
index 9d0d0f3..1b80d1c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
public class DeleteTopicsResponse extends AbstractResponse {
+ private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String TOPIC_ERROR_CODES_KEY_NAME = "topic_error_codes";
private static final String TOPIC_KEY_NAME = "topic";
private static final String ERROR_CODE_KEY_NAME = "error_code";
@@ -40,12 +41,19 @@ public class DeleteTopicsResponse extends AbstractResponse {
* NOT_CONTROLLER(41)
*/
private final Map<String, Errors> errors;
+ private final int throttleTimeMs;
public DeleteTopicsResponse(Map<String, Errors> errors) {
+ this(DEFAULT_THROTTLE_TIME, errors);
+ }
+
+ public DeleteTopicsResponse(int throttleTimeMs, Map<String, Errors> errors) {
+ this.throttleTimeMs = throttleTimeMs;
this.errors = errors;
}
public DeleteTopicsResponse(Struct struct) {
+ this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
Object[] topicErrorCodesStructs = struct.getArray(TOPIC_ERROR_CODES_KEY_NAME);
Map<String, Errors> errors = new HashMap<>();
for (Object topicErrorCodeStructObj : topicErrorCodesStructs) {
@@ -61,6 +69,8 @@ public class DeleteTopicsResponse extends AbstractResponse {
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.DELETE_TOPICS.responseSchema(version));
+ if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
List<Struct> topicErrorCodeStructs = new ArrayList<>(errors.size());
for (Map.Entry<String, Errors> topicError : errors.entrySet()) {
Struct topicErrorCodeStruct = struct.instance(TOPIC_ERROR_CODES_KEY_NAME);
@@ -72,6 +82,10 @@ public class DeleteTopicsResponse extends AbstractResponse {
return struct;
}
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
public Map<String, Errors> errors() {
return errors;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
index 287eda9..b43e254 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
@@ -73,11 +73,13 @@ public class DescribeGroupsRequest extends AbstractRequest {
}
@Override
- public AbstractResponse getErrorResponse(Throwable e) {
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
short version = version();
switch (version) {
case 0:
return DescribeGroupsResponse.fromError(Errors.forException(e), groupIds);
+ case 1:
+ return DescribeGroupsResponse.fromError(throttleTimeMs, Errors.forException(e), groupIds);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index 797ed58..bd7a087 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -29,6 +29,7 @@ import java.util.Map;
public class DescribeGroupsResponse extends AbstractResponse {
+ private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String GROUPS_KEY_NAME = "groups";
private static final String ERROR_CODE_KEY_NAME = "error_code";
@@ -58,12 +59,19 @@ public class DescribeGroupsResponse extends AbstractResponse {
*/
private final Map<String, GroupMetadata> groups;
+ private final int throttleTimeMs;
public DescribeGroupsResponse(Map<String, GroupMetadata> groups) {
+ this(DEFAULT_THROTTLE_TIME, groups);
+ }
+
+ public DescribeGroupsResponse(int throttleTimeMs, Map<String, GroupMetadata> groups) {
+ this.throttleTimeMs = throttleTimeMs;
this.groups = groups;
}
public DescribeGroupsResponse(Struct struct) {
+ this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
this.groups = new HashMap<>();
for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) {
Struct groupStruct = (Struct) groupObj;
@@ -89,6 +97,10 @@ public class DescribeGroupsResponse extends AbstractResponse {
}
}
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
public Map<String, GroupMetadata> groups() {
return groups;
}
@@ -184,16 +196,22 @@ public class DescribeGroupsResponse extends AbstractResponse {
}
public static DescribeGroupsResponse fromError(Errors error, List<String> groupIds) {
+ return fromError(DEFAULT_THROTTLE_TIME, error, groupIds);
+ }
+
+ public static DescribeGroupsResponse fromError(int throttleTimeMs, Errors error, List<String> groupIds) {
GroupMetadata errorMetadata = GroupMetadata.forError(error);
Map<String, GroupMetadata> groups = new HashMap<>();
for (String groupId : groupIds)
groups.put(groupId, errorMetadata);
- return new DescribeGroupsResponse(groups);
+ return new DescribeGroupsResponse(throttleTimeMs, groups);
}
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.DESCRIBE_GROUPS.responseSchema(version));
+ if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
List<Struct> groupStructs = new ArrayList<>();
for (Map.Entry<String, GroupMetadata> groupEntry : groups.entrySet()) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
index e6eb54e..9c215be 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
@@ -96,8 +96,8 @@ public class EndTxnRequest extends AbstractRequest {
}
@Override
- public EndTxnResponse getErrorResponse(Throwable e) {
- return new EndTxnResponse(Errors.forException(e));
+ public EndTxnResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ return new EndTxnResponse(throttleTimeMs, Errors.forException(e));
}
public static EndTxnRequest parse(ByteBuffer buffer, short version) {