You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/05/25 19:28:46 UTC
kafka git commit: KAFKA-5320: Include all request throttling in client throttle metrics
Repository: kafka
Updated Branches:
refs/heads/trunk 20e200878 -> 73ca0d215
KAFKA-5320: Include all request throttling in client throttle metrics
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Jun Rao <ju...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #3137 from rajinisivaram/KAFKA-5320
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/73ca0d21
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/73ca0d21
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/73ca0d21
Branch: refs/heads/trunk
Commit: 73ca0d215ead9574487744eb89f7ae677a9e13ea
Parents: 20e2008
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Thu May 25 20:28:18 2017 +0100
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Thu May 25 20:28:18 2017 +0100
----------------------------------------------------------------------
checkstyle/suppressions.xml | 2 +-
.../org/apache/kafka/clients/NetworkClient.java | 40 +++++++++++--
.../kafka/clients/consumer/KafkaConsumer.java | 4 +-
.../clients/consumer/internals/Fetcher.java | 26 +++++----
.../kafka/clients/producer/KafkaProducer.java | 4 +-
.../clients/producer/internals/Sender.java | 25 ++++----
.../kafka/common/requests/AbstractResponse.java | 1 +
.../requests/AddOffsetsToTxnResponse.java | 1 -
.../requests/AddPartitionsToTxnResponse.java | 1 -
.../common/requests/AlterConfigsResponse.java | 2 -
.../common/requests/ApiVersionsResponse.java | 1 -
.../common/requests/CreateAclsResponse.java | 5 +-
.../common/requests/CreateTopicsResponse.java | 1 -
.../common/requests/DeleteAclsResponse.java | 5 +-
.../common/requests/DeleteRecordsResponse.java | 1 -
.../common/requests/DeleteTopicsResponse.java | 1 -
.../common/requests/DescribeAclsResponse.java | 5 +-
.../requests/DescribeConfigsResponse.java | 2 -
.../common/requests/DescribeGroupsResponse.java | 1 -
.../kafka/common/requests/EndTxnResponse.java | 1 -
.../kafka/common/requests/FetchResponse.java | 1 -
.../requests/FindCoordinatorResponse.java | 1 -
.../common/requests/HeartbeatResponse.java | 1 -
.../common/requests/InitProducerIdResponse.java | 1 -
.../common/requests/JoinGroupResponse.java | 1 -
.../common/requests/LeaveGroupResponse.java | 1 -
.../common/requests/ListGroupsResponse.java | 1 -
.../common/requests/ListOffsetResponse.java | 1 -
.../kafka/common/requests/MetadataResponse.java | 1 -
.../common/requests/OffsetCommitResponse.java | 1 -
.../common/requests/OffsetFetchResponse.java | 1 -
.../kafka/common/requests/ProduceResponse.java | 1 -
.../common/requests/SyncGroupResponse.java | 1 -
.../requests/TxnOffsetCommitResponse.java | 1 -
.../clients/consumer/internals/FetcherTest.java | 60 ++++++++++++++------
.../clients/producer/internals/SenderTest.java | 45 ++++++++++++---
36 files changed, 156 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 3b865bc..a8d3033 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -8,7 +8,7 @@
<!-- Clients -->
<suppress checks="ClassFanOutComplexity"
- files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient).java"/>
+ files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient).java"/>
<suppress checks="ClassFanOutComplexity"
files=".*/protocol/Errors.java"/>
<suppress checks="ClassFanOutComplexity"
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 93fbb85..bfd0eb5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.Selectable;
@@ -104,6 +105,8 @@ public class NetworkClient implements KafkaClient {
private final List<ClientResponse> abortedSends = new LinkedList<>();
+ private final Sensor throttleTimeSensor;
+
public NetworkClient(Selectable selector,
Metadata metadata,
String clientId,
@@ -119,7 +122,27 @@ public class NetworkClient implements KafkaClient {
this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection,
reconnectBackoffMs, reconnectBackoffMax,
socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time,
- discoverBrokerVersions, apiVersions);
+ discoverBrokerVersions, apiVersions, null);
+ }
+
+ public NetworkClient(Selectable selector,
+ Metadata metadata,
+ String clientId,
+ int maxInFlightRequestsPerConnection,
+ long reconnectBackoffMs,
+ long reconnectBackoffMax,
+ int socketSendBuffer,
+ int socketReceiveBuffer,
+ int requestTimeoutMs,
+ Time time,
+ boolean discoverBrokerVersions,
+ ApiVersions apiVersions,
+ Sensor throttleTimeSensor) {
+
+ this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection,
+ reconnectBackoffMs, reconnectBackoffMax,
+ socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time,
+ discoverBrokerVersions, apiVersions, throttleTimeSensor);
}
public NetworkClient(Selectable selector,
@@ -137,7 +160,7 @@ public class NetworkClient implements KafkaClient {
this(metadataUpdater, null, selector, clientId, maxInFlightRequestsPerConnection,
reconnectBackoffMs, reconnectBackoffMax,
socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time,
- discoverBrokerVersions, apiVersions);
+ discoverBrokerVersions, apiVersions, null);
}
private NetworkClient(MetadataUpdater metadataUpdater,
@@ -152,7 +175,8 @@ public class NetworkClient implements KafkaClient {
int requestTimeoutMs,
Time time,
boolean discoverBrokerVersions,
- ApiVersions apiVersions) {
+ ApiVersions apiVersions,
+ Sensor throttleTimeSensor) {
/* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
* possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
* super constructor is invoked.
@@ -177,6 +201,7 @@ public class NetworkClient implements KafkaClient {
this.time = time;
this.discoverBrokerVersions = discoverBrokerVersions;
this.apiVersions = apiVersions;
+ this.throttleTimeSensor = throttleTimeSensor;
}
/**
@@ -480,11 +505,18 @@ public class NetworkClient implements KafkaClient {
}
public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) {
+ return parseResponseMaybeUpdateThrottleTimeMetrics(responseBuffer, requestHeader, null, 0);
+ }
+
+ private static AbstractResponse parseResponseMaybeUpdateThrottleTimeMetrics(ByteBuffer responseBuffer, RequestHeader requestHeader,
+ Sensor throttleTimeSensor, long now) {
ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer);
// Always expect the response version id to be the same as the request version id
ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey());
Struct responseBody = apiKey.parseResponse(requestHeader.apiVersion(), responseBuffer);
correlate(requestHeader, responseHeader);
+ if (throttleTimeSensor != null && responseBody.hasField(AbstractResponse.THROTTLE_TIME_KEY_NAME))
+ throttleTimeSensor.record(responseBody.getInt(AbstractResponse.THROTTLE_TIME_KEY_NAME), now);
return AbstractResponse.getResponse(apiKey, responseBody);
}
@@ -572,7 +604,7 @@ public class NetworkClient implements KafkaClient {
for (NetworkReceive receive : this.selector.completedReceives()) {
String source = receive.source();
InFlightRequest req = inFlightRequests.completeNext(source);
- AbstractResponse body = parseResponse(receive.payload(), req.header);
+ AbstractResponse body = parseResponseMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header, throttleTimeSensor, now);
log.trace("Completed receive from node {}, for key {}, received {}", req.destination, req.header.apiKey(), body);
if (req.isInternalRequest && body instanceof MetadataResponse)
metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 6489792..0359071 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -666,6 +666,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
IsolationLevel isolationLevel = IsolationLevel.valueOf(
config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
+ Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricGrpPrefix);
NetworkClient netClient = new NetworkClient(
new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
@@ -679,7 +680,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
time,
true,
- new ApiVersions());
+ new ApiVersions(),
+ throttleTimeSensor);
this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index a79ea5d..01bd0e5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -227,7 +227,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
}
sensors.fetchLatency.record(resp.requestLatencyMs());
- sensors.fetchThrottleTimeSensor.record(response.throttleTimeMs());
}
@Override
@@ -932,6 +931,18 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
sensors.updatePartitionLagSensors(assignment);
}
+ public static Sensor throttleTimeSensor(Metrics metrics, String metricGrpPrefix) {
+ String metricGrpName = metricGrpPrefix + FetchManagerMetrics.METRIC_GROUP_SUFFIX;
+ Sensor fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
+ fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-avg",
+ metricGrpName,
+ "The average throttle time in ms"), new Avg());
+ fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-max",
+ metricGrpName,
+ "The maximum throttle time in ms"), new Max());
+ return fetchThrottleTimeSensor;
+ }
+
private class PartitionRecords {
private final TopicPartition partition;
private final CompletedFetch completedFetch;
@@ -1214,19 +1225,19 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
}
private static class FetchManagerMetrics {
+ private static final String METRIC_GROUP_SUFFIX = "-fetch-manager-metrics";
private final Metrics metrics;
private final String metricGrpName;
private final Sensor bytesFetched;
private final Sensor recordsFetched;
private final Sensor fetchLatency;
private final Sensor recordsFetchLag;
- private final Sensor fetchThrottleTimeSensor;
private Set<TopicPartition> assignedPartitions;
private FetchManagerMetrics(Metrics metrics, String metricGrpPrefix) {
this.metrics = metrics;
- this.metricGrpName = metricGrpPrefix + "-fetch-manager-metrics";
+ this.metricGrpName = metricGrpPrefix + METRIC_GROUP_SUFFIX;
this.bytesFetched = metrics.sensor("bytes-fetched");
this.bytesFetched.add(metrics.metricName("fetch-size-avg",
@@ -1262,15 +1273,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
this.recordsFetchLag.add(metrics.metricName("records-lag-max",
this.metricGrpName,
"The maximum lag in terms of number of records for any partition in this window"), new Max());
-
- this.fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
- this.fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-avg",
- this.metricGrpName,
- "The average throttle time in ms"), new Avg());
-
- this.fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-max",
- this.metricGrpName,
- "The maximum throttle time in ms"), new Max());
}
private void recordTopicFetchMetrics(String topic, int bytes, int records) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 22baf3c..4fcbcc8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -290,6 +290,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
+ Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics);
NetworkClient client = new NetworkClient(
new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
this.metrics, time, "producer", channelBuilder),
@@ -303,7 +304,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
this.requestTimeoutMs,
time,
true,
- apiVersions);
+ apiVersions,
+ throttleTimeSensor);
this.sender = new Sender(client,
this.metadata,
this.accumulator,
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 116a1c5..3fa5903 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -463,7 +463,6 @@ public class Sender implements Runnable {
completeBatch(batch, partResp, correlationId, now);
}
this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
- this.sensors.recordThrottleTime(produceResponse.getThrottleTime());
} else {
// this is the acks = 0 case, just complete all requests
for (ProducerBatch batch : batches.values()) {
@@ -661,11 +660,22 @@ public class Sender implements Runnable {
this.client.wakeup();
}
+ public static Sensor throttleTimeSensor(Metrics metrics) {
+ String metricGrpName = SenderMetrics.METRIC_GROUP_NAME;
+ Sensor produceThrottleTimeSensor = metrics.sensor("produce-throttle-time");
+ produceThrottleTimeSensor.add(metrics.metricName("produce-throttle-time-avg",
+ metricGrpName, "The average throttle time in ms"), new Avg());
+ produceThrottleTimeSensor.add(metrics.metricName("produce-throttle-time-max",
+ metricGrpName, "The maximum throttle time in ms"), new Max());
+ return produceThrottleTimeSensor;
+ }
+
/**
* A collection of sensors for the sender
*/
private class SenderMetrics {
+ private static final String METRIC_GROUP_NAME = "producer-metrics";
private final Metrics metrics;
public final Sensor retrySensor;
public final Sensor errorSensor;
@@ -675,12 +685,11 @@ public class Sender implements Runnable {
public final Sensor batchSizeSensor;
public final Sensor compressionRateSensor;
public final Sensor maxRecordSizeSensor;
- public final Sensor produceThrottleTimeSensor;
public final Sensor batchSplitSensor;
public SenderMetrics(Metrics metrics) {
this.metrics = metrics;
- String metricGrpName = "producer-metrics";
+ String metricGrpName = METRIC_GROUP_NAME;
this.batchSizeSensor = metrics.sensor("batch-size");
MetricName m = metrics.metricName("batch-size-avg", metricGrpName, "The average number of bytes sent per partition per-request.");
@@ -704,12 +713,6 @@ public class Sender implements Runnable {
m = metrics.metricName("request-latency-max", metricGrpName, "The maximum request latency in ms");
this.requestTimeSensor.add(m, new Max());
- this.produceThrottleTimeSensor = metrics.sensor("produce-throttle-time");
- m = metrics.metricName("produce-throttle-time-avg", metricGrpName, "The average throttle time in ms");
- this.produceThrottleTimeSensor.add(m, new Avg());
- m = metrics.metricName("produce-throttle-time-max", metricGrpName, "The maximum throttle time in ms");
- this.produceThrottleTimeSensor.add(m, new Max());
-
this.recordsPerRequestSensor = metrics.sensor("records-per-request");
m = metrics.metricName("record-send-rate", metricGrpName, "The average number of records sent per second.");
this.recordsPerRequestSensor.add(m, new Rate());
@@ -847,10 +850,6 @@ public class Sender implements Runnable {
}
}
- public void recordThrottleTime(long throttleTimeMs) {
- this.produceThrottleTimeSensor.record(throttleTimeMs, time.milliseconds());
- }
-
void recordBatchSplit() {
this.batchSplitSensor.record();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 1000ef5..99b35e8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
public abstract class AbstractResponse extends AbstractRequestResponse {
+ public static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
public static final int DEFAULT_THROTTLE_TIME = 0;
public Send toSend(String destination, RequestHeader requestHeader) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
index 754f242..0536636 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
public class AddOffsetsToTxnResponse extends AbstractResponse {
- private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String ERROR_CODE_KEY_NAME = "error_code";
// Possible error codes:
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
index 39172ee..4112b93 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
@@ -29,7 +29,6 @@ import java.util.List;
import java.util.Map;
public class AddPartitionsToTxnResponse extends AbstractResponse {
- private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String ERROR_CODE_KEY_NAME = "error_code";
private static final String ERRORS_KEY_NAME = "errors";
private static final String TOPIC_NAME = "topic";
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
index 8f904d8..3a3eb9a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
@@ -28,8 +28,6 @@ import java.util.Map;
public class AlterConfigsResponse extends AbstractResponse {
- private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
-
private static final String RESOURCES_KEY_NAME = "resources";
private static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
private static final String RESOURCE_NAME_KEY_NAME = "resource_name";
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index d434c75..6f921a7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -32,7 +32,6 @@ import java.util.Map;
public class ApiVersionsResponse extends AbstractResponse {
public static final ApiVersionsResponse API_VERSIONS_RESPONSE = createApiVersionsResponse(DEFAULT_THROTTLE_TIME);
- private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
public static final String ERROR_CODE_KEY_NAME = "error_code";
public static final String API_VERSIONS_KEY_NAME = "api_versions";
public static final String API_KEY_NAME = "api_key";
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
index 885981a..c84b97c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
import java.util.List;
public class CreateAclsResponse extends AbstractResponse {
- private final static String THROTTLE_TIME_MS = "throttle_time_ms";
private final static String CREATION_RESPONSES = "creation_responses";
private final static String ERROR_CODE = "error_code";
private final static String ERROR_MESSAGE = "error_message";
@@ -57,7 +56,7 @@ public class CreateAclsResponse extends AbstractResponse {
}
public CreateAclsResponse(Struct struct) {
- this.throttleTimeMs = struct.getInt(THROTTLE_TIME_MS);
+ this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
this.aclCreationResponses = new ArrayList<>();
for (Object responseStructObj : struct.getArray(CREATION_RESPONSES)) {
Struct responseStruct = (Struct) responseStructObj;
@@ -75,7 +74,7 @@ public class CreateAclsResponse extends AbstractResponse {
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.CREATE_ACLS.responseSchema(version));
- struct.set(THROTTLE_TIME_MS, throttleTimeMs);
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
List<Struct> responseStructs = new ArrayList<>();
for (AclCreationResponse response : aclCreationResponses) {
Struct responseStruct = struct.instance(CREATION_RESPONSES);
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
index e46e7a1..c34265d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
@@ -27,7 +27,6 @@ import java.util.List;
import java.util.Map;
public class CreateTopicsResponse extends AbstractResponse {
- private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String TOPIC_ERRORS_KEY_NAME = "topic_errors";
private static final String TOPIC_KEY_NAME = "topic";
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
index 6fffc0f..341021b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
@@ -35,7 +35,6 @@ import java.util.List;
public class DeleteAclsResponse extends AbstractResponse {
public static final Logger log = LoggerFactory.getLogger(DeleteAclsResponse.class);
- private final static String THROTTLE_TIME_MS = "throttle_time_ms";
private final static String FILTER_RESPONSES = "filter_responses";
private final static String ERROR_CODE = "error_code";
private final static String ERROR_MESSAGE = "error_message";
@@ -97,7 +96,7 @@ public class DeleteAclsResponse extends AbstractResponse {
}
public DeleteAclsResponse(Struct struct) {
- this.throttleTimeMs = struct.getInt(THROTTLE_TIME_MS);
+ this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
this.responses = new ArrayList<>();
for (Object responseStructObj : struct.getArray(FILTER_RESPONSES)) {
Struct responseStruct = (Struct) responseStructObj;
@@ -130,7 +129,7 @@ public class DeleteAclsResponse extends AbstractResponse {
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.DELETE_ACLS.responseSchema(version));
- struct.set(THROTTLE_TIME_MS, throttleTimeMs);
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
List<Struct> responseStructs = new ArrayList<>();
for (AclFilterResponse response : responses) {
Struct responseStruct = struct.instance(FILTER_RESPONSES);
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
index 64165eb..f19f933 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
@@ -33,7 +33,6 @@ public class DeleteRecordsResponse extends AbstractResponse {
public static final long INVALID_LOW_WATERMARK = -1L;
// request level key names
- private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String TOPICS_KEY_NAME = "topics";
// topic level key names
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
index 1b80d1c..3f11167 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
@@ -27,7 +27,6 @@ import java.util.List;
import java.util.Map;
public class DeleteTopicsResponse extends AbstractResponse {
- private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String TOPIC_ERROR_CODES_KEY_NAME = "topic_error_codes";
private static final String TOPIC_KEY_NAME = "topic";
private static final String ERROR_CODE_KEY_NAME = "error_code";
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
index 0de4865..127493b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
@@ -33,7 +33,6 @@ import java.util.List;
import java.util.Map;
public class DescribeAclsResponse extends AbstractResponse {
- private final static String THROTTLE_TIME_MS = "throttle_time_ms";
private final static String ERROR_CODE = "error_code";
private final static String ERROR_MESSAGE = "error_message";
private final static String RESOURCES = "resources";
@@ -50,7 +49,7 @@ public class DescribeAclsResponse extends AbstractResponse {
}
public DescribeAclsResponse(Struct struct) {
- this.throttleTimeMs = struct.getInt(THROTTLE_TIME_MS);
+ this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
Errors error = Errors.forCode(struct.getShort(ERROR_CODE));
if (error != Errors.NONE) {
this.throwable = error.exception(struct.getString(ERROR_MESSAGE));
@@ -73,7 +72,7 @@ public class DescribeAclsResponse extends AbstractResponse {
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.DESCRIBE_ACLS.responseSchema(version));
- struct.set(THROTTLE_TIME_MS, throttleTimeMs);
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
if (throwable != null) {
Errors errors = Errors.forException(throwable);
struct.set(ERROR_CODE, errors.code());
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
index 05bf88d..8694e1f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
@@ -29,8 +29,6 @@ import java.util.Map;
public class DescribeConfigsResponse extends AbstractResponse {
- private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
-
private static final String RESOURCES_KEY_NAME = "resources";
private static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index bd7a087..0e1d6bd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -29,7 +29,6 @@ import java.util.Map;
public class DescribeGroupsResponse extends AbstractResponse {
- private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String GROUPS_KEY_NAME = "groups";
private static final String ERROR_CODE_KEY_NAME = "error_code";
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
index 17cf68d..9083808 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
public class EndTxnResponse extends AbstractResponse {
- private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String ERROR_CODE_KEY_NAME = "error_code";
// Possible error codes:
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 0cb87b5..96fee43 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -43,7 +43,6 @@ public class FetchResponse extends AbstractResponse {
// topic level field names
private static final String TOPIC_KEY_NAME = "topic";
private static final String PARTITIONS_KEY_NAME = "partition_responses";
- private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
// partition level field names
private static final String PARTITION_HEADER_KEY_NAME = "partition_header";
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
index e7df8e8..11eed1d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -25,7 +25,6 @@ import java.nio.ByteBuffer;
public class FindCoordinatorResponse extends AbstractResponse {
- private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String ERROR_CODE_KEY_NAME = "error_code";
private static final String ERROR_MESSAGE_KEY_NAME = "error_message";
private static final String COORDINATOR_KEY_NAME = "coordinator";
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index a90212b..cec39f0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
public class HeartbeatResponse extends AbstractResponse {
- private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String ERROR_CODE_KEY_NAME = "error_code";
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
index 96e1cdf..da5e6e5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
@@ -31,7 +31,6 @@ public class InitProducerIdResponse extends AbstractResponse {
// TransactionalIdAuthorizationFailed
// ClusterAuthorizationFailed
- private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String PRODUCER_ID_KEY_NAME = "producer_id";
private static final String EPOCH_KEY_NAME = "producer_epoch";
private static final String ERROR_CODE_KEY_NAME = "error_code";
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index a1c9e2b..eb86ce7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -28,7 +28,6 @@ import java.util.Map;
public class JoinGroupResponse extends AbstractResponse {
- private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String ERROR_CODE_KEY_NAME = "error_code";
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
index ccfc8a7..1c85850 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
public class LeaveGroupResponse extends AbstractResponse {
- private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String ERROR_CODE_KEY_NAME = "error_code";
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index 13f589f..8ae3792 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -27,7 +27,6 @@ import java.util.List;
public class ListGroupsResponse extends AbstractResponse {
- public static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
public static final String ERROR_CODE_KEY_NAME = "error_code";
public static final String GROUPS_KEY_NAME = "groups";
public static final String GROUP_ID_KEY_NAME = "group_id";
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index 61c2a55..7dfaedc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -33,7 +33,6 @@ public class ListOffsetResponse extends AbstractResponse {
public static final long UNKNOWN_TIMESTAMP = -1L;
public static final long UNKNOWN_OFFSET = -1L;
- private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String RESPONSES_KEY_NAME = "responses";
// topic level field names
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 017fdf4..74e058b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -35,7 +35,6 @@ import java.util.Set;
public class MetadataResponse extends AbstractResponse {
- private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String BROKERS_KEY_NAME = "brokers";
private static final String TOPIC_METADATA_KEY_NAME = "topic_metadata";
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index d8d647d..06e5608 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -30,7 +30,6 @@ import java.util.Map;
public class OffsetCommitResponse extends AbstractResponse {
- private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String RESPONSES_KEY_NAME = "responses";
// topic level fields
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index f795a5b..6315535 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -32,7 +32,6 @@ import org.apache.kafka.common.utils.CollectionUtils;
public class OffsetFetchResponse extends AbstractResponse {
- private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String RESPONSES_KEY_NAME = "responses";
private static final String ERROR_CODE_KEY_NAME = "error_code";
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 55332f6..d42f1c6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -39,7 +39,6 @@ public class ProduceResponse extends AbstractResponse {
// topic level field names
private static final String TOPIC_KEY_NAME = "topic";
private static final String PARTITION_RESPONSES_KEY_NAME = "partition_responses";
- private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
// partition level field names
private static final String PARTITION_KEY_NAME = "partition";
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index b99a99f..c96e21f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
public class SyncGroupResponse extends AbstractResponse {
- public static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
public static final String ERROR_CODE_KEY_NAME = "error_code";
public static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index a62568f..e7b349c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -27,7 +27,6 @@ import java.util.HashMap;
import java.util.Map;
public class TxnOffsetCommitResponse extends AbstractResponse {
- private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
private static final String PARTITIONS_KEY_NAME = "partitions";
private static final String TOPIC_KEY_NAME = "topic";
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 4e46d57..a81dc58 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -16,8 +16,11 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -40,6 +43,8 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
@@ -64,10 +69,14 @@ import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.ResponseHeader;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.DelayedReceive;
+import org.apache.kafka.test.MockSelector;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
@@ -1033,28 +1042,43 @@ public class FetcherTest {
*/
@Test
public void testQuotaMetrics() throws Exception {
- subscriptions.assignFromUser(singleton(tp1));
- subscriptions.seek(tp1, 0);
-
- // normal fetch
- for (int i = 1; i < 4; i++) {
- // We need to make sure the message offset grows. Otherwise they will be considered as already consumed
- // and filtered out by consumer.
- MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
- TimestampType.CREATE_TIME, 0L);
- for (int v = 0; v < 3; v++)
- builder.appendWithOffset(i * 3 + v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
- List<ConsumerRecord<byte[], byte[]>> records = fetchRecords(builder.build(), Errors.NONE, 100L, 100 * i).get(tp1);
- assertEquals(3, records.size());
+ MockSelector selector = new MockSelector(time);
+ Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, "consumer" + groupId);
+ Cluster cluster = TestUtils.singletonCluster("test", 1);
+ Node node = cluster.nodes().get(0);
+ NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
+ 1000, 1000, 64 * 1024, 64 * 1024, 1000,
+ time, true, new ApiVersions(), throttleTimeSensor);
+
+ short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion();
+ ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400).serialize(apiVersionsResponseVersion, new ResponseHeader(0));
+ selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
+ while (!client.ready(node, time.milliseconds()))
+ client.poll(1, time.milliseconds());
+ selector.clear();
+
+ for (int i = 1; i <= 3; i++) {
+ int throttleTimeMs = 100 * i;
+ FetchRequest.Builder builder = FetchRequest.Builder.forConsumer(100, 100, new LinkedHashMap<TopicPartition, PartitionData>());
+ ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, null);
+ client.send(request, time.milliseconds());
+ client.poll(1, time.milliseconds());
+ FetchResponse response = fetchResponse(nextRecords, Errors.NONE, i, throttleTimeMs);
+ buffer = response.serialize(ApiKeys.FETCH.latestVersion(), new ResponseHeader(request.correlationId()));
+ selector.completeReceive(new NetworkReceive(node.idString(), buffer));
+ client.poll(1, time.milliseconds());
+ selector.clear();
}
-
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
- KafkaMetric avgMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-avg", metricGroup));
- KafkaMetric maxMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-max", metricGroup));
- assertEquals(200, avgMetric.value(), EPSILON);
- assertEquals(300, maxMetric.value(), EPSILON);
+ KafkaMetric avgMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-avg", metricGroup, ""));
+ KafkaMetric maxMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-max", metricGroup, ""));
+ // Throttle times are ApiVersions=400, Fetch=(100, 200, 300)
+ assertEquals(250, avgMetric.value(), EPSILON);
+ assertEquals(400, maxMetric.value(), EPSILON);
+ client.close();
}
+
/*
* Send multiple requests. Verify that the client side quota metrics have the right values
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 719efe9..50c4cd4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -17,8 +17,10 @@
package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
@@ -30,6 +32,8 @@ import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionRatioEstimator;
@@ -43,12 +47,16 @@ import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.DelayedReceive;
+import org.apache.kafka.test.MockSelector;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -212,18 +220,41 @@ public class SenderTest {
*/
@Test
public void testQuotaMetrics() throws Exception {
- final long offset = 0;
+ MockSelector selector = new MockSelector(time);
+ Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics);
+ Cluster cluster = TestUtils.singletonCluster("test", 1);
+ Node node = cluster.nodes().get(0);
+ NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
+ 1000, 1000, 64 * 1024, 64 * 1024, 1000,
+ time, true, new ApiVersions(), throttleTimeSensor);
+
+ short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion();
+ ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400).serialize(apiVersionsResponseVersion, new ResponseHeader(0));
+ selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
+ while (!client.ready(node, time.milliseconds()))
+ client.poll(1, time.milliseconds());
+ selector.clear();
+
for (int i = 1; i <= 3; i++) {
- accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
- sender.run(time.milliseconds()); // send produce request
- client.respond(produceResponse(tp0, offset, Errors.NONE, 100 * i));
- sender.run(time.milliseconds());
+ int throttleTimeMs = 100 * i;
+ ProduceRequest.Builder builder = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 1000,
+ Collections.<TopicPartition, MemoryRecords>emptyMap());
+ ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, null);
+ client.send(request, time.milliseconds());
+ client.poll(1, time.milliseconds());
+ ProduceResponse response = produceResponse(tp0, i, Errors.NONE, throttleTimeMs);
+ buffer = response.serialize(ApiKeys.PRODUCE.latestVersion(), new ResponseHeader(request.correlationId()));
+ selector.completeReceive(new NetworkReceive(node.idString(), buffer));
+ client.poll(1, time.milliseconds());
+ selector.clear();
}
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
KafkaMetric avgMetric = allMetrics.get(metrics.metricName("produce-throttle-time-avg", METRIC_GROUP, ""));
KafkaMetric maxMetric = allMetrics.get(metrics.metricName("produce-throttle-time-max", METRIC_GROUP, ""));
- assertEquals(200, avgMetric.value(), EPS);
- assertEquals(300, maxMetric.value(), EPS);
+ // Throttle times are ApiVersions=400, Produce=(100, 200, 300)
+ assertEquals(250, avgMetric.value(), EPS);
+ assertEquals(400, maxMetric.value(), EPS);
+ client.close();
}
@Test