Posted to commits@kafka.apache.org by jj...@apache.org on 2015/08/26 03:33:32 UTC
kafka git commit: KAFKA-2136; Add throttle time (on quota violation) in fetch/produce responses; reviewed by Joel Koshy, Dong Lin and Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 5d453ba6d -> 436b7ddc3
KAFKA-2136; Add throttle time (on quota violation) in fetch/produce responses; reviewed by Joel Koshy, Dong Lin and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/436b7ddc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/436b7ddc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/436b7ddc
Branch: refs/heads/trunk
Commit: 436b7ddc386eb688ba0f12836710f5e4bcaa06c8
Parents: 5d453ba
Author: Aditya Auradkar <aa...@linkedin.com>
Authored: Tue Aug 25 17:52:39 2015 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Tue Aug 25 18:33:10 2015 -0700
----------------------------------------------------------------------
.../clients/consumer/internals/Fetcher.java | 13 +++
.../clients/producer/internals/Sender.java | 14 ++++
.../apache/kafka/common/protocol/Protocol.java | 38 +++++++--
.../kafka/common/requests/FetchRequest.java | 2 +-
.../kafka/common/requests/FetchResponse.java | 78 +++++++++++++-----
.../kafka/common/requests/ProduceRequest.java | 4 +-
.../kafka/common/requests/ProduceResponse.java | 68 +++++++++++----
.../clients/consumer/internals/FetcherTest.java | 87 +++++++++++++-------
.../clients/producer/internals/SenderTest.java | 35 ++++++--
.../common/requests/RequestResponseTest.java | 37 ++++++++-
.../src/main/scala/kafka/api/FetchRequest.scala | 9 +-
.../main/scala/kafka/api/FetchResponse.scala | 69 +++++++++++-----
.../main/scala/kafka/api/ProducerRequest.scala | 2 +-
.../main/scala/kafka/api/ProducerResponse.scala | 16 +++-
.../scala/kafka/consumer/SimpleConsumer.scala | 2 +-
.../kafka/server/AbstractFetcherThread.scala | 16 +++-
.../scala/kafka/server/ClientQuotaManager.scala | 23 +++---
.../src/main/scala/kafka/server/KafkaApis.scala | 43 ++++++----
.../kafka/server/ReplicaFetcherThread.scala | 4 +-
.../scala/kafka/server/ReplicaManager.scala | 2 -
.../scala/kafka/server/ThrottledResponse.scala | 8 +-
.../api/RequestResponseSerializationTest.scala | 40 ++++++++-
.../kafka/server/ClientQuotaManagerTest.scala | 2 +-
.../ThrottledResponseExpirationTest.scala | 8 +-
24 files changed, 463 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 9dc6697..1ae6d03 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -453,6 +453,7 @@ public class Fetcher<K, V> {
}
this.sensors.bytesFetched.record(totalBytes);
this.sensors.recordsFetched.record(totalCount);
+ this.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
}
this.sensors.fetchLatency.record(resp.requestLatencyMs());
}
@@ -493,6 +494,7 @@ public class Fetcher<K, V> {
public final Sensor recordsFetched;
public final Sensor fetchLatency;
public final Sensor recordsFetchLag;
+ public final Sensor fetchThrottleTimeSensor;
public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
@@ -542,6 +544,17 @@ public class Fetcher<K, V> {
this.metricGrpName,
"The maximum lag in terms of number of records for any partition in this window",
tags), new Max());
+
+ this.fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
+ this.fetchThrottleTimeSensor.add(new MetricName("fetch-throttle-time-avg",
+ this.metricGrpName,
+ "The average throttle time in ms",
+ tags), new Avg());
+
+ this.fetchThrottleTimeSensor.add(new MetricName("fetch-throttle-time-max",
+ this.metricGrpName,
+ "The maximum throttle time in ms",
+ tags), new Max());
}
public void recordTopicFetchMetrics(String topic, int bytes, int records) {
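The two new sensors surface as consumer client metrics named fetch-throttle-time-avg and fetch-throttle-time-max. A minimal sketch of reading them through the public consumer API follows; the class name, bootstrap address, and group id are assumptions for illustration, and since the metric group name is derived from the prefix handed to Fetcher ("<metricGrpPrefix>-fetch-manager-metrics"), the sketch matches on the metric name only.

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class FetchThrottleMetricsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "example-group");              // assumed group id
        KafkaConsumer<byte[], byte[]> consumer =
            new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());

        // Match on the metric name only; the group name depends on the consumer's metric prefix.
        for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
            String name = entry.getKey().name();
            if (name.equals("fetch-throttle-time-avg") || name.equals("fetch-throttle-time-max"))
                System.out.println(name + " = " + entry.getValue().value());
        }
        consumer.close();
    }
}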
http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 0baf16e..d2e64f7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -253,6 +253,8 @@ public class Sender implements Runnable {
completeBatch(batch, error, partResp.baseOffset, correlationId, now);
}
this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs());
+ this.sensors.recordThrottleTime(response.request().request().destination(),
+ produceResponse.getThrottleTime());
} else {
// this is the acks = 0 case, just complete all requests
for (RecordBatch batch : batches.values())
@@ -352,6 +354,7 @@ public class Sender implements Runnable {
public final Sensor batchSizeSensor;
public final Sensor compressionRateSensor;
public final Sensor maxRecordSizeSensor;
+ public final Sensor produceThrottleTimeSensor;
public SenderMetrics(Metrics metrics) {
this.metrics = metrics;
@@ -381,6 +384,12 @@ public class Sender implements Runnable {
m = new MetricName("request-latency-max", metricGrpName, "The maximum request latency in ms", metricTags);
this.requestTimeSensor.add(m, new Max());
+ this.produceThrottleTimeSensor = metrics.sensor("produce-throttle-time");
+ m = new MetricName("produce-throttle-time-avg", metricGrpName, "The average throttle time in ms", metricTags);
+ this.produceThrottleTimeSensor.add(m, new Avg());
+ m = new MetricName("produce-throttle-time-max", metricGrpName, "The maximum throttle time in ms", metricTags);
+ this.produceThrottleTimeSensor.add(m, new Max());
+
this.recordsPerRequestSensor = metrics.sensor("records-per-request");
m = new MetricName("record-send-rate", metricGrpName, "The average number of records sent per second.", metricTags);
this.recordsPerRequestSensor.add(m, new Rate());
@@ -515,6 +524,11 @@ public class Sender implements Runnable {
nodeRequestTime.record(latency, now);
}
}
+
+ public void recordThrottleTime(String node, long throttleTimeMs) {
+ this.produceThrottleTimeSensor.record(throttleTimeMs, time.milliseconds());
+ }
+
}
}
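The producer side mirrors this with produce-throttle-time-avg and produce-throttle-time-max in the producer-metrics group (the group name SenderTest uses further down in this patch). A small sketch, assuming an already-configured KafkaProducer instance; the class and helper names are hypothetical.

import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class ProduceThrottleMetrics {
    // Hypothetical helper: print the throttle-time metrics of an existing producer.
    public static void printThrottleMetrics(KafkaProducer<byte[], byte[]> producer) {
        for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
            MetricName m = entry.getKey();
            if (m.group().equals("producer-metrics")
                    && (m.name().equals("produce-throttle-time-avg")
                        || m.name().equals("produce-throttle-time-max")))
                System.out.println(m.name() + " = " + entry.getValue().value());
        }
    }
}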
http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 3dc8b01..048d761 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -107,9 +107,25 @@ public class Protocol {
INT16),
new Field("base_offset",
INT64))))))));
-
- public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0};
- public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0};
+ public static final Schema PRODUCE_REQUEST_V1 = PRODUCE_REQUEST_V0;
+
+ public static final Schema PRODUCE_RESPONSE_V1 = new Schema(new Field("responses",
+ new ArrayOf(new Schema(new Field("topic", STRING),
+ new Field("partition_responses",
+ new ArrayOf(new Schema(new Field("partition",
+ INT32),
+ new Field("error_code",
+ INT16),
+ new Field("base_offset",
+ INT64))))))),
+ new Field("throttle_time_ms",
+ INT32,
+ "Duration in milliseconds for which the request was throttled" +
+ " due to quota violation. (Zero if the request did not violate any quota.)",
+ 0));
+
+ public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1};
+ public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1};
/* Offset commit api */
public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
@@ -342,6 +358,9 @@ public class Protocol {
new ArrayOf(FETCH_REQUEST_TOPIC_V0),
"Topics to fetch."));
+ // The V1 Fetch Request body is the same as V0.
+ // Only the version number is incremented to indicate a newer client
+ public static final Schema FETCH_REQUEST_V1 = FETCH_REQUEST_V0;
public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
INT32,
"Topic partition id."),
@@ -357,9 +376,16 @@ public class Protocol {
public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses",
new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
-
- public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0};
- public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0};
+ public static final Schema FETCH_RESPONSE_V1 = new Schema(new Field("throttle_time_ms",
+ INT32,
+ "Duration in milliseconds for which the request was throttled" +
+ " due to quota violation. (Zero if the request did not violate any quota.)",
+ 0),
+ new Field("responses",
+ new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
+
+ public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1};
+ public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1};
/* Consumer metadata api */
public static final Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group_id",
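Note where the new field sits on the wire: FETCH_RESPONSE_V1 places throttle_time_ms before the responses array, while PRODUCE_RESPONSE_V1 appends it after. Decoding therefore has to pick the schema that matches the request version, which is what FetchResponse.parse(buffer, version) does later in this patch. A minimal sketch of that pattern; the class and method names are assumptions.

import java.nio.ByteBuffer;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;

public class ThrottleTimeDecode {
    // Read a fetch response body with the schema matching the request version.
    static int fetchThrottleTimeMs(ByteBuffer body, int version) {
        Schema schema = ProtoUtils.responseSchema(ApiKeys.FETCH.id, version);
        Struct struct = (Struct) schema.read(body);
        // throttle_time_ms only exists from v1 onwards; older structs fall back to 0.
        return struct.hasField("throttle_time_ms") ? struct.getInt("throttle_time_ms") : 0;
    }
}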
http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index df073a0..feb4109 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -132,7 +132,7 @@ public class FetchRequest extends AbstractRequest {
switch (versionId) {
case 0:
- return new FetchResponse(responseData);
+ return new FetchResponse(responseData, 0);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.FETCH.id)));
http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index eb8951f..7b78415 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -29,6 +29,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+/**
+ * This wrapper supports both v0 and v1 of FetchResponse.
+ */
public class FetchResponse extends AbstractRequestResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.FETCH.id);
@@ -37,12 +40,16 @@ public class FetchResponse extends AbstractRequestResponse {
// topic level field names
private static final String TOPIC_KEY_NAME = "topic";
private static final String PARTITIONS_KEY_NAME = "partition_responses";
+ private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
// partition level field names
private static final String PARTITION_KEY_NAME = "partition";
private static final String ERROR_CODE_KEY_NAME = "error_code";
- /**
+ // Default throttle time
+ private static final int DEFAULT_THROTTLE_TIME = 0;
+
+ /**
* Possible error code:
*
* OFFSET_OUT_OF_RANGE (1)
@@ -59,6 +66,7 @@ public class FetchResponse extends AbstractRequestResponse {
public static final ByteBuffer EMPTY_RECORD_SET = ByteBuffer.allocate(0);
private final Map<TopicPartition, PartitionData> responseData;
+ private final int throttleTime;
public static final class PartitionData {
public final short errorCode;
@@ -72,8 +80,50 @@ public class FetchResponse extends AbstractRequestResponse {
}
}
+ /**
+ * Constructor for Version 0
+ * @param responseData fetched data grouped by topic-partition
+ */
public FetchResponse(Map<TopicPartition, PartitionData> responseData) {
+ super(new Struct(ProtoUtils.responseSchema(ApiKeys.FETCH.id, 0)));
+ initCommonFields(responseData);
+ this.responseData = responseData;
+ this.throttleTime = DEFAULT_THROTTLE_TIME;
+ }
+
+ /**
+ * Constructor for Version 1
+ * @param responseData fetched data grouped by topic-partition
+ * @param throttleTime Time in milliseconds the response was throttled
+ */
+ public FetchResponse(Map<TopicPartition, PartitionData> responseData, int throttleTime) {
super(new Struct(CURRENT_SCHEMA));
+ initCommonFields(responseData);
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
+ this.responseData = responseData;
+ this.throttleTime = throttleTime;
+ }
+
+ public FetchResponse(Struct struct) {
+ super(struct);
+ responseData = new HashMap<TopicPartition, PartitionData>();
+ for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
+ Struct topicResponse = (Struct) topicResponseObj;
+ String topic = topicResponse.getString(TOPIC_KEY_NAME);
+ for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+ Struct partitionResponse = (Struct) partitionResponseObj;
+ int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+ short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
+ long highWatermark = partitionResponse.getLong(HIGH_WATERMARK_KEY_NAME);
+ ByteBuffer recordSet = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
+ PartitionData partitionData = new PartitionData(errorCode, highWatermark, recordSet);
+ responseData.put(new TopicPartition(topic, partition), partitionData);
+ }
+ }
+ this.throttleTime = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
+ }
+
+ private void initCommonFields(Map<TopicPartition, PartitionData> responseData) {
Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
List<Struct> topicArray = new ArrayList<Struct>();
@@ -94,32 +144,22 @@ public class FetchResponse extends AbstractRequestResponse {
topicArray.add(topicData);
}
struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
- this.responseData = responseData;
}
- public FetchResponse(Struct struct) {
- super(struct);
- responseData = new HashMap<TopicPartition, PartitionData>();
- for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
- Struct topicResponse = (Struct) topicResponseObj;
- String topic = topicResponse.getString(TOPIC_KEY_NAME);
- for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
- Struct partitionResponse = (Struct) partitionResponseObj;
- int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
- short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
- long highWatermark = partitionResponse.getLong(HIGH_WATERMARK_KEY_NAME);
- ByteBuffer recordSet = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
- PartitionData partitionData = new PartitionData(errorCode, highWatermark, recordSet);
- responseData.put(new TopicPartition(topic, partition), partitionData);
- }
- }
- }
public Map<TopicPartition, PartitionData> responseData() {
return responseData;
}
+ public int getThrottleTime() {
+ return this.throttleTime;
+ }
+
public static FetchResponse parse(ByteBuffer buffer) {
return new FetchResponse((Struct) CURRENT_SCHEMA.read(buffer));
}
+
+ public static FetchResponse parse(ByteBuffer buffer, int version) {
+ return new FetchResponse((Struct) ProtoUtils.responseSchema(ApiKeys.FETCH.id, version).read(buffer));
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 715504b..5663f2c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -27,7 +27,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class ProduceRequest extends AbstractRequest {
+public class ProduceRequest extends AbstractRequest {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id);
private static final String ACKS_KEY_NAME = "acks";
@@ -103,7 +103,7 @@ public class ProduceRequest extends AbstractRequest {
switch (versionId) {
case 0:
- return new ProduceResponse(responseMap);
+ return new ProduceResponse(responseMap, 0);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.PRODUCE.id)));
http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index febfc70..2868550 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -25,6 +25,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+/**
+ * This wrapper supports both v0 and v1 of ProduceResponse.
+ */
public class ProduceResponse extends AbstractRequestResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id);
@@ -33,12 +36,14 @@ public class ProduceResponse extends AbstractRequestResponse {
// topic level field names
private static final String TOPIC_KEY_NAME = "topic";
private static final String PARTITION_RESPONSES_KEY_NAME = "partition_responses";
+ private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
// partition level field names
private static final String PARTITION_KEY_NAME = "partition";
private static final String ERROR_CODE_KEY_NAME = "error_code";
public static final long INVALID_OFFSET = -1L;
+ private static final int DEFAULT_THROTTLE_TIME = 0;
/**
* Possible error code:
@@ -49,28 +54,30 @@ public class ProduceResponse extends AbstractRequestResponse {
private static final String BASE_OFFSET_KEY_NAME = "base_offset";
private final Map<TopicPartition, PartitionResponse> responses;
+ private final int throttleTime;
+ /**
+ * Constructor for Version 0
+ * @param responses Produced data grouped by topic-partition
+ */
public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
+ super(new Struct(ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, 0)));
+ initCommonFields(responses);
+ this.responses = responses;
+ this.throttleTime = DEFAULT_THROTTLE_TIME;
+ }
+
+ /**
+ * Constructor for Version 1
+ * @param responses Produced data grouped by topic-partition
+ * @param throttleTime Time in milliseconds the response was throttled
+ */
+ public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTime) {
super(new Struct(CURRENT_SCHEMA));
- Map<String, Map<Integer, PartitionResponse>> responseByTopic = CollectionUtils.groupDataByTopic(responses);
- List<Struct> topicDatas = new ArrayList<Struct>(responseByTopic.size());
- for (Map.Entry<String, Map<Integer, PartitionResponse>> entry : responseByTopic.entrySet()) {
- Struct topicData = struct.instance(RESPONSES_KEY_NAME);
- topicData.set(TOPIC_KEY_NAME, entry.getKey());
- List<Struct> partitionArray = new ArrayList<Struct>();
- for (Map.Entry<Integer, PartitionResponse> partitionEntry : entry.getValue().entrySet()) {
- PartitionResponse part = partitionEntry.getValue();
- Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME)
- .set(PARTITION_KEY_NAME, partitionEntry.getKey())
- .set(ERROR_CODE_KEY_NAME, part.errorCode)
- .set(BASE_OFFSET_KEY_NAME, part.baseOffset);
- partitionArray.add(partStruct);
- }
- topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray());
- topicDatas.add(topicData);
- }
- struct.set(RESPONSES_KEY_NAME, topicDatas.toArray());
+ initCommonFields(responses);
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
this.responses = responses;
+ this.throttleTime = throttleTime;
}
public ProduceResponse(Struct struct) {
@@ -88,12 +95,37 @@ public class ProduceResponse extends AbstractRequestResponse {
responses.put(tp, new PartitionResponse(errorCode, offset));
}
}
+ this.throttleTime = struct.getInt(THROTTLE_TIME_KEY_NAME);
+ }
+
+ private void initCommonFields(Map<TopicPartition, PartitionResponse> responses) {
+ Map<String, Map<Integer, PartitionResponse>> responseByTopic = CollectionUtils.groupDataByTopic(responses);
+ List<Struct> topicDatas = new ArrayList<Struct>(responseByTopic.size());
+ for (Map.Entry<String, Map<Integer, PartitionResponse>> entry : responseByTopic.entrySet()) {
+ Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+ topicData.set(TOPIC_KEY_NAME, entry.getKey());
+ List<Struct> partitionArray = new ArrayList<Struct>();
+ for (Map.Entry<Integer, PartitionResponse> partitionEntry : entry.getValue().entrySet()) {
+ PartitionResponse part = partitionEntry.getValue();
+ Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME).set(PARTITION_KEY_NAME,
+ partitionEntry.getKey()).set(
+ ERROR_CODE_KEY_NAME, part.errorCode).set(BASE_OFFSET_KEY_NAME, part.baseOffset);
+ partitionArray.add(partStruct);
+ }
+ topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray());
+ topicDatas.add(topicData);
+ }
+ struct.set(RESPONSES_KEY_NAME, topicDatas.toArray());
}
public Map<TopicPartition, PartitionResponse> responses() {
return this.responses;
}
+ public int getThrottleTime() {
+ return this.throttleTime;
+ }
+
public static final class PartitionResponse {
public short errorCode;
public long baseOffset;
http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index a7c83ca..22712bb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -22,9 +22,11 @@ import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
@@ -54,9 +56,9 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class FetcherTest {
-
private String topicName = "test";
private String groupId = "test-group";
+ private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics";
private TopicPartition tp = new TopicPartition(topicName, 0);
private int minBytes = 1;
private int maxWaitMs = 0;
@@ -70,24 +72,25 @@ public class FetcherTest {
private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
private Metrics metrics = new Metrics(time);
private Map<String, String> metricTags = new LinkedHashMap<String, String>();
+ private static final double EPSILON = 0.0001;
private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
private Fetcher<byte[], byte[]> fetcher = new Fetcher<byte[], byte[]>(consumerClient,
- minBytes,
- maxWaitMs,
- fetchSize,
- true, // check crc
- new ByteArrayDeserializer(),
- new ByteArrayDeserializer(),
- metadata,
- subscriptions,
- metrics,
- "consumer" + groupId,
- metricTags,
- time,
- retryBackoffMs);
+ minBytes,
+ maxWaitMs,
+ fetchSize,
+ true, // check crc
+ new ByteArrayDeserializer(),
+ new ByteArrayDeserializer(),
+ metadata,
+ subscriptions,
+ metrics,
+ "consumer" + groupId,
+ metricTags,
+ time,
+ retryBackoffMs);
@Before
public void setup() throws Exception {
@@ -109,7 +112,7 @@ public class FetcherTest {
// normal fetch
fetcher.initFetches(cluster);
- client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L));
+ client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
consumerClient.poll(0);
records = fetcher.fetchedRecords().get(tp);
assertEquals(3, records.size());
@@ -132,7 +135,7 @@ public class FetcherTest {
// Now the rebalance happens and fetch positions are cleared
subscriptions.changePartitionAssignment(Arrays.asList(tp));
- client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L));
+ client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
consumerClient.poll(0);
// The active fetch should be ignored since its position is no longer valid
@@ -147,7 +150,7 @@ public class FetcherTest {
fetcher.initFetches(cluster);
subscriptions.pause(tp);
- client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L));
+ client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
consumerClient.poll(0);
assertNull(fetcher.fetchedRecords().get(tp));
}
@@ -168,7 +171,7 @@ public class FetcherTest {
subscriptions.seek(tp, 0);
fetcher.initFetches(cluster);
- client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L));
+ client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0));
consumerClient.poll(0);
assertEquals(0, fetcher.fetchedRecords().size());
assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
@@ -180,7 +183,7 @@ public class FetcherTest {
subscriptions.seek(tp, 0);
fetcher.initFetches(cluster);
- client.prepareResponse(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L));
+ client.prepareResponse(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L, 0));
consumerClient.poll(0);
assertEquals(0, fetcher.fetchedRecords().size());
assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
@@ -192,7 +195,7 @@ public class FetcherTest {
subscriptions.seek(tp, 0);
fetcher.initFetches(cluster);
- client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
+ client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
consumerClient.poll(0);
assertTrue(subscriptions.isOffsetResetNeeded(tp));
assertEquals(0, fetcher.fetchedRecords().size());
@@ -206,7 +209,7 @@ public class FetcherTest {
subscriptions.seek(tp, 0);
fetcher.initFetches(cluster);
- client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L), true);
+ client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0), true);
consumerClient.poll(0);
assertEquals(0, fetcher.fetchedRecords().size());
@@ -236,7 +239,7 @@ public class FetcherTest {
// with no commit position, we should reset using the default strategy defined above (EARLIEST)
client.prepareResponse(listOffsetRequestMatcher(Fetcher.EARLIEST_OFFSET_TIMESTAMP),
- listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
+ listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
fetcher.updateFetchPositions(Collections.singleton(tp));
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
@@ -250,7 +253,7 @@ public class FetcherTest {
subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
- listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
+ listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
fetcher.updateFetchPositions(Collections.singleton(tp));
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
@@ -264,7 +267,7 @@ public class FetcherTest {
subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
client.prepareResponse(listOffsetRequestMatcher(Fetcher.EARLIEST_OFFSET_TIMESTAMP),
- listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
+ listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
fetcher.updateFetchPositions(Collections.singleton(tp));
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
@@ -279,11 +282,11 @@ public class FetcherTest {
// First request gets a disconnect
client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
- listOffsetResponse(Errors.NONE, Arrays.asList(5L)), true);
+ listOffsetResponse(Errors.NONE, Arrays.asList(5L)), true);
// Next one succeeds
client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
- listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
+ listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
fetcher.updateFetchPositions(Collections.singleton(tp));
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
@@ -302,6 +305,32 @@ public class FetcherTest {
assertEquals(cluster.topics().size(), allTopics.size());
}
+ /*
+ * Send multiple requests. Verify that the client side quota metrics have the right values
+ */
+ @Test
+ public void testQuotaMetrics() throws Exception {
+ List<ConsumerRecord<byte[], byte[]>> records;
+ subscriptions.subscribe(tp);
+ subscriptions.seek(tp, 0);
+
+ // normal fetch
+ for (int i = 1; i < 4; i++) {
+ fetcher.initFetches(cluster);
+
+ client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 100 * i));
+ consumerClient.poll(0);
+ records = fetcher.fetchedRecords().get(tp);
+ assertEquals(3, records.size());
+ }
+
+ Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+ KafkaMetric avgMetric = allMetrics.get(new MetricName("fetch-throttle-time-avg", metricGroup, "", metricTags));
+ KafkaMetric maxMetric = allMetrics.get(new MetricName("fetch-throttle-time-max", metricGroup, "", metricTags));
+ assertEquals(200, avgMetric.value(), EPSILON);
+ assertEquals(300, maxMetric.value(), EPSILON);
+ }
+
private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) {
// matches any list offset request with the provided timestamp
return new MockClient.RequestMatcher() {
@@ -322,10 +351,8 @@ public class FetcherTest {
return response.toStruct();
}
- private Struct fetchResponse(ByteBuffer buffer, short error, long hw) {
- FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)));
+ private Struct fetchResponse(ByteBuffer buffer, short error, long hw, int throttleTime) {
+ FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)), throttleTime);
return response.toStruct();
}
-
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 8b1805d..aa44991 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -26,7 +26,9 @@ import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
@@ -43,6 +45,9 @@ public class SenderTest {
private static final short ACKS_ALL = -1;
private static final int MAX_RETRIES = 0;
private static final int REQUEST_TIMEOUT_MS = 10000;
+ private static final String CLIENT_ID = "clientId";
+ private static final String METRIC_GROUP = "producer-metrics";
+ private static final double EPS = 0.0001;
private TopicPartition tp = new TopicPartition("test", 0);
private MockTime time = new MockTime();
@@ -62,11 +67,12 @@ public class SenderTest {
REQUEST_TIMEOUT_MS,
metrics,
time,
- "clientId");
+ CLIENT_ID);
@Before
public void setup() {
metadata.update(cluster, time.milliseconds());
+ metricTags.put("client-id", CLIENT_ID);
}
@Test
@@ -76,7 +82,7 @@ public class SenderTest {
sender.run(time.milliseconds()); // connect
sender.run(time.milliseconds()); // send produce request
assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
- client.respond(produceResponse(tp, offset, Errors.NONE.code()));
+ client.respond(produceResponse(tp, offset, Errors.NONE.code(), 0));
sender.run(time.milliseconds());
assertEquals("All requests completed.", offset, (long) client.inFlightRequestCount());
sender.run(time.milliseconds());
@@ -84,6 +90,25 @@ public class SenderTest {
assertEquals(offset, future.get().offset());
}
+ /*
+ * Send multiple requests. Verify that the client side quota metrics have the right values
+ */
+ @Test
+ public void testQuotaMetrics() throws Exception {
+ final long offset = 0;
+ for (int i = 1; i <= 3; i++) {
+ Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
+ sender.run(time.milliseconds()); // send produce request
+ client.respond(produceResponse(tp, offset, Errors.NONE.code(), 100 * i));
+ sender.run(time.milliseconds());
+ }
+ Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+ KafkaMetric avgMetric = allMetrics.get(new MetricName("produce-throttle-time-avg", METRIC_GROUP, "", metricTags));
+ KafkaMetric maxMetric = allMetrics.get(new MetricName("produce-throttle-time-max", METRIC_GROUP, "", metricTags));
+ assertEquals(200, avgMetric.value(), EPS);
+ assertEquals(300, maxMetric.value(), EPS);
+ }
+
@Test
public void testRetries() throws Exception {
// create a sender with retries = 1
@@ -110,7 +135,7 @@ public class SenderTest {
sender.run(time.milliseconds()); // resend
assertEquals(1, client.inFlightRequestCount());
long offset = 0;
- client.respond(produceResponse(tp, offset, Errors.NONE.code()));
+ client.respond(produceResponse(tp, offset, Errors.NONE.code(), 0));
sender.run(time.milliseconds());
assertTrue("Request should have retried and completed", future.isDone());
assertEquals(offset, future.get().offset());
@@ -138,10 +163,10 @@ public class SenderTest {
}
}
- private Struct produceResponse(TopicPartition tp, long offset, int error) {
+ private Struct produceResponse(TopicPartition tp, long offset, int error, int throttleTimeMs) {
ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse((short) error, offset);
Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp);
- ProduceResponse response = new ProduceResponse(partResp);
+ ProduceResponse response = new ProduceResponse(partResp, throttleTimeMs);
return response.toStruct();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 8b2aca8..9e92da6 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -18,7 +18,9 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
import org.junit.Test;
import java.lang.reflect.Method;
@@ -77,6 +79,37 @@ public class RequestResponseTest {
}
}
+ @Test
+ public void produceResponseVersionTest() {
+ Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
+ responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000));
+
+ ProduceResponse v0Response = new ProduceResponse(responseData);
+ ProduceResponse v1Response = new ProduceResponse(responseData, 10);
+ assertEquals("Throttle time must be zero", 0, v0Response.getThrottleTime());
+ assertEquals("Throttle time must be 10", 10, v1Response.getThrottleTime());
+ assertEquals("Should use schema version 0", ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, 0), v0Response.toStruct().schema());
+ assertEquals("Should use schema version 1", ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, 1), v1Response.toStruct().schema());
+ assertEquals("Response data does not match", responseData, v0Response.responses());
+ assertEquals("Response data does not match", responseData, v1Response.responses());
+ }
+
+ @Test
+ public void fetchResponseVersionTest() {
+ Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
+ responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10)));
+
+ FetchResponse v0Response = new FetchResponse(responseData);
+ FetchResponse v1Response = new FetchResponse(responseData, 10);
+ assertEquals("Throttle time must be zero", 0, v0Response.getThrottleTime());
+ assertEquals("Throttle time must be 10", 10, v1Response.getThrottleTime());
+ assertEquals("Should use schema version 0", ProtoUtils.responseSchema(ApiKeys.FETCH.id, 0), v0Response.toStruct().schema());
+ assertEquals("Should use schema version 1", ProtoUtils.responseSchema(ApiKeys.FETCH.id, 1), v1Response.toStruct().schema());
+ assertEquals("Response data does not match", responseData, v0Response.responseData());
+ assertEquals("Response data does not match", responseData, v1Response.responseData());
+ }
+
+
private AbstractRequestResponse createRequestHeader() {
return new RequestHeader((short) 10, (short) 1, "", 10);
}
@@ -103,7 +136,7 @@ public class RequestResponseTest {
private AbstractRequestResponse createFetchResponse() {
Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10)));
- return new FetchResponse(responseData);
+ return new FetchResponse(responseData, 0);
}
private AbstractRequest createHeartBeatRequest() {
@@ -182,6 +215,6 @@ public class RequestResponseTest {
private AbstractRequestResponse createProduceResponse() {
Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000));
- return new ProduceResponse(responseData);
+ return new ProduceResponse(responseData, 0);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 5b38f85..36e288f 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -31,7 +31,7 @@ import scala.collection.immutable.Map
case class PartitionFetchInfo(offset: Long, fetchSize: Int)
object FetchRequest {
- val CurrentVersion = 0.shortValue
+ val CurrentVersion = 1.shortValue
val DefaultMaxWait = 0
val DefaultMinBytes = 0
val DefaultCorrelationId = 0
@@ -170,7 +170,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
@nonthreadsafe
class FetchRequestBuilder() {
private val correlationId = new AtomicInteger(0)
- private val versionId = FetchRequest.CurrentVersion
+ private var versionId = FetchRequest.CurrentVersion
private var clientId = ConsumerConfig.DefaultClientId
private var replicaId = Request.OrdinaryConsumerId
private var maxWait = FetchRequest.DefaultMaxWait
@@ -205,6 +205,11 @@ class FetchRequestBuilder() {
this
}
+ def requestVersion(versionId: Short): FetchRequestBuilder = {
+ this.versionId = versionId
+ this
+ }
+
def build() = {
val fetchRequest = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
requestMap.clear()
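FetchRequestBuilder now defaults to the new version but lets callers pin an older one via requestVersion(...). A minimal usage sketch; the class name, client id, topic, partition, offset, and fetch size are illustrative assumptions.

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;

public class FetchRequestVersionExample {
    public static void main(String[] args) {
        // Pin the old v0 wire format explicitly; without requestVersion(...) the
        // builder now uses FetchRequest.CurrentVersion, i.e. 1.
        FetchRequest request = new FetchRequestBuilder()
                .clientId("example-client")
                .addFetch("test", 0, 0L, 100000)   // topic, partition, offset, fetchSize
                .requestVersion((short) 0)
                .build();
        System.out.println(request);
    }
}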
http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index b9efec2..2c07033 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -73,7 +73,7 @@ class PartitionDataSend(val partitionId: Int,
var written = 0L
if(buffer.hasRemaining)
written += channel.write(buffer)
- if(!buffer.hasRemaining && messagesSentSize < messageSize) {
+ if (!buffer.hasRemaining && messagesSentSize < messageSize) {
val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize)
messagesSentSize += bytesSent
written += bytesSent
@@ -152,12 +152,10 @@ class TopicDataSend(val dest: String, val topicData: TopicData) extends Send {
object FetchResponse {
- val headerSize =
- 4 + /* correlationId */
- 4 /* topic count */
-
- def readFrom(buffer: ByteBuffer): FetchResponse = {
+ // The request version is used to determine which fields we can expect in the response
+ def readFrom(buffer: ByteBuffer, requestVersion: Int): FetchResponse = {
val correlationId = buffer.getInt
+ val throttleTime = if (requestVersion > 0) buffer.getInt else 0
val topicCount = buffer.getInt
val pairs = (1 to topicCount).flatMap(_ => {
val topicData = TopicData.readFrom(buffer)
@@ -166,28 +164,56 @@ object FetchResponse {
(TopicAndPartition(topicData.topic, partitionId), partitionData)
}
})
- FetchResponse(correlationId, Map(pairs:_*))
+ FetchResponse(correlationId, Map(pairs:_*), requestVersion, throttleTime)
+ }
+
+ // Returns the size of the response header
+ def headerSize(requestVersion: Int): Int = {
+ val throttleTimeSize = if (requestVersion > 0) 4 else 0
+ 4 + /* correlationId */
+ 4 + /* topic count */
+ throttleTimeSize
+ }
+
+ // Returns the size of entire fetch response in bytes (including the header size)
+ def responseSize(dataGroupedByTopic: Map[String, Map[TopicAndPartition, FetchResponsePartitionData]],
+ requestVersion: Int): Int = {
+ headerSize(requestVersion) +
+ dataGroupedByTopic.foldLeft(0) { case (folded, (topic, partitionDataMap)) =>
+ val topicData = TopicData(topic, partitionDataMap.map {
+ case (topicAndPartition, partitionData) => (topicAndPartition.partition, partitionData)
+ })
+ folded + topicData.sizeInBytes
+ }
}
}
-case class FetchResponse(correlationId: Int, data: Map[TopicAndPartition, FetchResponsePartitionData])
+case class FetchResponse(correlationId: Int,
+ data: Map[TopicAndPartition, FetchResponsePartitionData],
+ requestVersion: Int = 0,
+ throttleTimeMs: Int = 0)
extends RequestOrResponse() {
/**
* Partitions the data into a map of maps (one for each topic).
*/
- lazy val dataGroupedByTopic = data.groupBy(_._1.topic)
-
- val sizeInBytes =
- FetchResponse.headerSize +
- dataGroupedByTopic.foldLeft(0) ((folded, curr) => {
- val topicData = TopicData(curr._1, curr._2.map {
- case (topicAndPartition, partitionData) => (topicAndPartition.partition, partitionData)
- })
- folded + topicData.sizeInBytes
- })
+ lazy val dataGroupedByTopic = data.groupBy{ case (topicAndPartition, fetchData) => topicAndPartition.topic }
+ val headerSizeInBytes = FetchResponse.headerSize(requestVersion)
+ lazy val sizeInBytes = FetchResponse.responseSize(dataGroupedByTopic, requestVersion)
/*
+ * Writes the header of the FetchResponse to the input buffer
+ */
+ def writeHeaderTo(buffer: ByteBuffer) = {
+ buffer.putInt(sizeInBytes)
+ buffer.putInt(correlationId)
+ // Include the throttleTime only if the client can read it
+ if (requestVersion > 0)
+ buffer.putInt(throttleTimeMs)
+
+ buffer.putInt(dataGroupedByTopic.size) // topic count
+ }
+ /*
* FetchResponse uses [sendfile](http://man7.org/linux/man-pages/man2/sendfile.2.html)
* api for data transfer through the FetchResponseSend, so `writeTo` aren't actually being used.
* It is implemented as an empty function to conform to `RequestOrResponse.writeTo`
@@ -231,10 +257,9 @@ class FetchResponseSend(val dest: String, val fetchResponse: FetchResponse) exte
override def destination = dest
- private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize)
- buffer.putInt(payloadSize)
- buffer.putInt(fetchResponse.correlationId)
- buffer.putInt(fetchResponse.dataGroupedByTopic.size) // topic count
+ // The throttleTimeSize will be 0 if the request was made from a client sending a V0 style request
+ private val buffer = ByteBuffer.allocate(4 /* for size */ + fetchResponse.headerSizeInBytes)
+ fetchResponse.writeHeaderTo(buffer)
buffer.rewind()
private val sends = new MultiSend(dest, JavaConversions.seqAsJavaList(fetchResponse.dataGroupedByTopic.toList.map {
http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index c866180..7fb143e 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -26,7 +26,7 @@ import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
object ProducerRequest {
- val CurrentVersion = 0.shortValue
+ val CurrentVersion = 1.shortValue
def readFrom(buffer: ByteBuffer): ProducerRequest = {
val versionId: Short = buffer.getShort
http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/main/scala/kafka/api/ProducerResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
index 5d1fac4..7719f30 100644
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ b/core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -23,6 +23,7 @@ import kafka.common.{TopicAndPartition, ErrorMapping}
import kafka.api.ApiUtils._
object ProducerResponse {
+ // readFrom assumes that the response is written using V1 format
def readFrom(buffer: ByteBuffer): ProducerResponse = {
val correlationId = buffer.getInt
val topicCount = buffer.getInt
@@ -37,13 +38,17 @@ object ProducerResponse {
})
})
- ProducerResponse(correlationId, Map(statusPairs:_*))
+ val throttleTime = buffer.getInt
+ ProducerResponse(correlationId, Map(statusPairs:_*), ProducerRequest.CurrentVersion, throttleTime)
}
}
case class ProducerResponseStatus(var error: Short, offset: Long)
-case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, ProducerResponseStatus])
+case class ProducerResponse(correlationId: Int,
+ status: Map[TopicAndPartition, ProducerResponseStatus],
+ requestVersion: Int = 0,
+ throttleTime: Int = 0)
extends RequestOrResponse() {
/**
@@ -54,6 +59,7 @@ case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, P
def hasError = status.values.exists(_.error != ErrorMapping.NoError)
val sizeInBytes = {
+ val throttleTimeSize = if (requestVersion > 0) 4 else 0
val groupedStatus = statusGroupedByTopic
4 + /* correlation id */
4 + /* topic count */
@@ -66,7 +72,8 @@ case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, P
2 + /* error code */
8 /* offset */
}
- })
+ }) +
+ throttleTimeSize
}
def writeTo(buffer: ByteBuffer) {
@@ -85,6 +92,9 @@ case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, P
buffer.putLong(nextOffset)
}
})
+ // Throttle time is only supported on V1 style requests
+ if (requestVersion > 0)
+ buffer.putInt(throttleTime)
}
override def describe(details: Boolean):String = { toString }
http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 7ebc040..4e1833a 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -131,7 +131,7 @@ class SimpleConsumer(val host: String,
response = sendRequest(request)
}
}
- val fetchResponse = FetchResponse.readFrom(response.payload())
+ val fetchResponse = FetchResponse.readFrom(response.payload(), request.versionId)
val fetchedSize = fetchResponse.sizeInBytes
fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestSizeHist.update(fetchedSize)
fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestSizeHist.update(fetchedSize)
http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index f843061..dca975c 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -37,8 +37,17 @@ import com.yammer.metrics.core.Gauge
/**
* Abstract class for fetching data from multiple partitions from the same broker.
*/
-abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: BrokerEndPoint, socketTimeout: Int, socketBufferSize: Int,
- fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1, fetchBackOffMs: Int = 0,
+abstract class AbstractFetcherThread(name: String,
+ clientId: String,
+ sourceBroker: BrokerEndPoint,
+ socketTimeout: Int,
+ socketBufferSize: Int,
+ fetchSize: Int,
+ fetcherBrokerId: Int = -1,
+ maxWait: Int = 0,
+ minBytes: Int = 1,
+ fetchBackOffMs: Int = 0,
+ fetchRequestVersion: Short = FetchRequest.CurrentVersion,
isInterruptible: Boolean = true)
extends ShutdownableThread(name, isInterruptible) {
private val partitionMap = new mutable.HashMap[TopicAndPartition, PartitionFetchState] // a (topic, partition) -> partitionFetchState map
@@ -52,7 +61,8 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
clientId(clientId).
replicaId(fetcherBrokerId).
maxWait(maxWait).
- minBytes(minBytes)
+ minBytes(minBytes).
+ requestVersion(fetchRequestVersion)
/* callbacks to be defined in subclass */
http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 9f8473f..016caaf 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -95,7 +95,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
if (response != null) {
// Decrement the size of the delay queue
delayQueueSensor.record(-1)
- trace("Response throttled for: " + response.delayTimeMs + " ms")
+ trace("Response throttled for: " + response.throttleTimeMs + " ms")
response.execute()
}
}
@@ -110,25 +110,25 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
* @return Number of milliseconds to delay the response in case of Quota violation.
* Zero otherwise
*/
- def recordAndMaybeThrottle(clientId: String, value: Int, callback: => Unit): Int = {
+ def recordAndMaybeThrottle(clientId: String, value: Int, callback: Int => Unit): Int = {
val clientSensors = getOrCreateQuotaSensors(clientId)
- var delayTimeMs = 0L
+ var throttleTimeMs = 0
try {
clientSensors.quotaSensor.record(value)
// trigger the callback immediately if quota is not violated
- callback
+ callback(0)
} catch {
case qve: QuotaViolationException =>
// Compute the delay
val clientMetric = metrics.metrics().get(clientRateMetricName(clientId))
- delayTimeMs = delayTime(clientMetric.value(), getQuotaMetricConfig(quota(clientId)))
- delayQueue.add(new ThrottledResponse(time, delayTimeMs, callback))
+ throttleTimeMs = throttleTime(clientMetric.value(), getQuotaMetricConfig(quota(clientId)))
+ delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
delayQueueSensor.record()
- clientSensors.throttleTimeSensor.record(delayTimeMs)
+ clientSensors.throttleTimeSensor.record(throttleTimeMs)
// If delayed, add the element to the delayQueue
- logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), delayTimeMs))
+ logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
}
- delayTimeMs.toInt
+ throttleTimeMs
}
/*
@@ -139,12 +139,11 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
* we need to add a delay of X to W such that O * W / (W + X) = T.
* Solving for X, we get X = (O - T)/T * W.
*/
- private def delayTime(metricValue: Double, config: MetricConfig): Long =
- {
+ private def throttleTime(metricValue: Double, config: MetricConfig): Int = {
val quota = config.quota()
val difference = metricValue - quota.bound
val time = difference / quota.bound * config.timeWindowMs() * config.samples()
- time.round
+ time.round.toInt
}
/**
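The renamed throttleTime keeps the formula described in the comment above: with observed rate O, quota bound T, and total metric window W (samples * timeWindowMs), the delay is X = (O - T) / T * W, chosen so that O * W / (W + X) equals T. A worked example in Java; the rates and window size are purely assumed values.

public class ThrottleTimeExample {
    public static void main(String[] args) {
        double observed = 6_000_000.0;     // O: observed bytes/sec (assumed)
        double quota    = 5_000_000.0;     // T: quota bound in bytes/sec (assumed)
        long windowMs   = 10 * 1_000L;     // W: samples * timeWindowMs (assumed config)

        int throttleMs = (int) Math.round((observed - quota) / quota * windowMs);
        System.out.println(throttleMs);    // 2000 ms of delay

        // Averaged over W + X, the same traffic drops back to the quota bound.
        System.out.println(observed * windowMs / (windowMs + throttleMs));  // 5000000.0
    }
}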
http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 67f0cad..e727a6f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -259,18 +259,21 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a produce response
def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
var errorInResponse = false
- responseStatus.foreach { case (topicAndPartition, status) =>
+ responseStatus.foreach
+ { case (topicAndPartition, status) =>
// we only print warnings for known errors here; if it is unknown, it will cause
// an error message in the replica manager
if (status.error != ErrorMapping.NoError && status.error != ErrorMapping.UnknownCode) {
- debug("Produce request with correlation id %d from client %s on partition %s failed due to %s"
- .format(produceRequest.correlationId, produceRequest.clientId,
- topicAndPartition, ErrorMapping.exceptionNameFor(status.error)))
+ debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
+ produceRequest.correlationId,
+ produceRequest.clientId,
+ topicAndPartition,
+ ErrorMapping.exceptionNameFor(status.error)))
errorInResponse = true
}
}
- def produceResponseCallback {
+ def produceResponseCallback(delayTimeMs: Int) {
if (produceRequest.requiredAcks == 0) {
// no operation needed if producer request.required.acks = 0; however, if there is any error in handling
// the request, since no response is expected by the producer, the server will close socket server so that
@@ -285,12 +288,19 @@ class KafkaApis(val requestChannel: RequestChannel,
requestChannel.noOperation(request.processor, request)
}
} else {
- val response = ProducerResponse(produceRequest.correlationId, responseStatus)
- requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
+ val response = ProducerResponse(produceRequest.correlationId,
+ responseStatus,
+ produceRequest.versionId,
+ delayTimeMs)
+ requestChannel.sendResponse(new RequestChannel.Response(request,
+ new RequestOrResponseSend(request.connectionId,
+ response)))
}
}
- quotaManagers(RequestKeys.ProduceKey).recordAndMaybeThrottle(produceRequest.clientId, numBytesAppended, produceResponseCallback)
+ quotaManagers(RequestKeys.ProduceKey).recordAndMaybeThrottle(produceRequest.clientId,
+ numBytesAppended,
+ produceResponseCallback)
}
// only allow appending to internal topic partitions
@@ -332,21 +342,20 @@ class KafkaApis(val requestChannel: RequestChannel,
BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.messages.sizeInBytes)
}
- val response = FetchResponse(fetchRequest.correlationId, responsePartitionData)
- def fetchResponseCallback {
+ def fetchResponseCallback(delayTimeMs: Int) {
+ val response = FetchResponse(fetchRequest.correlationId, responsePartitionData, fetchRequest.versionId, delayTimeMs)
requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response)))
}
// Do not throttle replication traffic
if (fetchRequest.isFromFollower) {
- fetchResponseCallback
+ fetchResponseCallback(0)
} else {
- quotaManagers.get(RequestKeys.FetchKey) match {
- case Some(quotaManager) =>
- quotaManager.recordAndMaybeThrottle(fetchRequest.clientId, response.sizeInBytes, fetchResponseCallback)
- case None =>
- warn("Cannot throttle Api key %s".format(RequestKeys.nameForKey(RequestKeys.FetchKey)))
- }
+ quotaManagers(RequestKeys.FetchKey).recordAndMaybeThrottle(fetchRequest.clientId,
+ FetchResponse.responseSize(responsePartitionData
+ .groupBy(_._1.topic),
+ fetchRequest.versionId),
+ fetchResponseCallback)
}
}
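
The KafkaApis changes above hinge on recordAndMaybeThrottle now taking an Int => Unit callback: the quota manager hands the computed throttle time to the callback, which folds it into the versioned ProducerResponse or FetchResponse. A hedged, self-contained sketch of that contract (QuotaManagerSketch and the printed response are illustrative, not broker code):

    class QuotaManagerSketch {
      def recordAndMaybeThrottle(clientId: String, value: Int, callback: Int => Unit): Int = {
        val throttleTimeMs = 0       // stays 0 when the quota is not violated
        callback(throttleTimeMs)     // the response builder receives the throttle time
        throttleTimeMs
      }
    }

    object CallbackWiringSketch extends App {
      val quotas = new QuotaManagerSketch
      // Mirrors produceResponseCallback / fetchResponseCallback: the callback
      // builds the response with whatever throttle time it was handed.
      quotas.recordAndMaybeThrottle("client-1", 1024,
        throttleTimeMs => println(s"respond with throttleTimeMs=$throttleTimeMs"))
    }
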
http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index fae22d2..711d749 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -21,7 +21,7 @@ import kafka.admin.AdminUtils
import kafka.cluster.BrokerEndPoint
import kafka.log.LogConfig
import kafka.message.ByteBufferMessageSet
-import kafka.api.{OffsetRequest, FetchResponsePartitionData}
+import kafka.api.{KAFKA_083, OffsetRequest, FetchResponsePartitionData}
import kafka.common.{KafkaStorageException, TopicAndPartition}
class ReplicaFetcherThread(name:String,
@@ -38,6 +38,8 @@ class ReplicaFetcherThread(name:String,
maxWait = brokerConfig.replicaFetchWaitMaxMs,
minBytes = brokerConfig.replicaFetchMinBytes,
fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
+ fetchRequestVersion =
+ if (brokerConfig.interBrokerProtocolVersion.onOrAfter(KAFKA_083)) 1 else 0,
isInterruptible = false) {
// process fetched data
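
The fetchRequestVersion gate above keeps replica fetchers on the v0 fetch request until inter.broker.protocol.version reaches 0.8.3, so brokers that predate the throttle-time field are never sent the v1 format. The same pattern in a tiny sketch (plain numbers stand in for kafka.api.ApiVersion; this is not the broker's config code):

    // Illustrative only: pick the wire version from the configured protocol level.
    def fetchRequestVersion(interBrokerProtocol: Double): Int =
      if (interBrokerProtocol >= 0.83) 1 else 0

    // fetchRequestVersion(0.82) == 0, fetchRequestVersion(0.83) == 1
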
http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d829e18..c195536 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -31,7 +31,6 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.metrics.Metrics
import scala.collection._
@@ -304,7 +303,6 @@ class ReplicaManager(val config: KafkaConfig,
responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) {
if (isValidRequiredAcks(requiredAcks)) {
-
val sTime = SystemTime.milliseconds
val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks)
debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/main/scala/kafka/server/ThrottledResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ThrottledResponse.scala b/core/src/main/scala/kafka/server/ThrottledResponse.scala
index 1f80d54..214fa1f 100644
--- a/core/src/main/scala/kafka/server/ThrottledResponse.scala
+++ b/core/src/main/scala/kafka/server/ThrottledResponse.scala
@@ -25,13 +25,13 @@ import org.apache.kafka.common.utils.Time
/**
* Represents a request whose response has been delayed.
* @param time @Time instance to use
- * @param delayTimeMs delay associated with this request
+ * @param throttleTimeMs delay associated with this request
* @param callback Callback to trigger after delayTimeMs milliseconds
*/
-private[server] class ThrottledResponse(val time: Time, val delayTimeMs: Long, callback: => Unit) extends Delayed {
- val endTime = time.milliseconds + delayTimeMs
+private[server] class ThrottledResponse(val time: Time, val throttleTimeMs: Int, callback: Int => Unit) extends Delayed {
+ val endTime = time.milliseconds + throttleTimeMs
- def execute() = callback
+ def execute() = callback(throttleTimeMs)
override def getDelay(unit: TimeUnit): Long = {
unit.convert(endTime - time.milliseconds, TimeUnit.MILLISECONDS)
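
ThrottledResponse implements java.util.concurrent.Delayed so it can sit in the quota manager's DelayQueue until its throttle time elapses, at which point execute() fires the stored callback with throttleTimeMs. A minimal sketch of that mechanism (DelayedCallback and the driver object are illustrative, not the broker's reaper thread):

    import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

    class DelayedCallback(delayMs: Long, callback: Long => Unit) extends Delayed {
      private val endTime = System.currentTimeMillis + delayMs
      override def getDelay(unit: TimeUnit): Long =
        unit.convert(endTime - System.currentTimeMillis, TimeUnit.MILLISECONDS)
      override def compareTo(other: Delayed): Int =
        java.lang.Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS))
      def execute(): Unit = callback(delayMs)
    }

    object DelayQueueSketch extends App {
      val queue = new DelayQueue[DelayedCallback]()
      queue.add(new DelayedCallback(100, ms => println(s"throttled $ms ms, sending response")))
      queue.take().execute() // take() blocks until the delay expires, then the callback runs
    }
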
http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index b4c2a22..b7e7967 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -18,9 +18,12 @@
package kafka.api
+import java.nio.channels.GatheringByteChannel
+
import kafka.cluster.{BrokerEndPoint, EndPoint, Broker}
import kafka.common.{OffsetAndMetadata, ErrorMapping, OffsetMetadataAndError}
import kafka.common._
+import kafka.consumer.FetchRequestAndResponseStatsRegistry
import kafka.message.{Message, ByteBufferMessageSet}
import kafka.utils.SystemTime
@@ -150,7 +153,7 @@ object SerializationTestUtils {
ProducerResponse(1, Map(
TopicAndPartition(topic1, 0) -> ProducerResponseStatus(0.toShort, 10001),
TopicAndPartition(topic2, 0) -> ProducerResponseStatus(0.toShort, 20001)
- ))
+ ), ProducerRequest.CurrentVersion, 100)
def createTestFetchRequest: FetchRequest = {
new FetchRequest(requestInfo = requestInfos)
@@ -304,4 +307,39 @@ class RequestResponseSerializationTest extends JUnitSuite {
assertEquals("The original and deserialized for " + original.getClass.getSimpleName + " should be the same.", original, deserialized)
}
}
+
+ @Test
+ def testProduceResponseVersion() {
+ val oldClientResponse = ProducerResponse(1, Map(
+ TopicAndPartition("t1", 0) -> ProducerResponseStatus(0.toShort, 10001),
+ TopicAndPartition("t2", 0) -> ProducerResponseStatus(0.toShort, 20001)
+ ))
+
+ val newClientResponse = ProducerResponse(1, Map(
+ TopicAndPartition("t1", 0) -> ProducerResponseStatus(0.toShort, 10001),
+ TopicAndPartition("t2", 0) -> ProducerResponseStatus(0.toShort, 20001)
+ ), 1, 100)
+
+ // new response should have 4 bytes more than the old response since delayTime is an INT32
+ assertEquals(oldClientResponse.sizeInBytes + 4, newClientResponse.sizeInBytes)
+
+ val buffer = ByteBuffer.allocate(newClientResponse.sizeInBytes)
+ newClientResponse.writeTo(buffer)
+ buffer.rewind()
+ assertEquals(ProducerResponse.readFrom(buffer).throttleTime, 100)
+ }
+
+ @Test
+ def testFetchResponseVersion() {
+ val oldClientResponse = FetchResponse(1, Map(
+ TopicAndPartition("t1", 0) -> new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("first message".getBytes)))
+ ), 0)
+
+ val newClientResponse = FetchResponse(1, Map(
+ TopicAndPartition("t1", 0) -> new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("first message".getBytes)))
+ ), 1, 100)
+
+ // new response should have 4 bytes more than the old response since delayTime is an INT32
+ assertEquals(oldClientResponse.sizeInBytes + 4, newClientResponse.sizeInBytes)
+ }
}
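
The two sizeInBytes assertions above follow directly from the wire format: a v1 response differs from v0 only by one INT32 throttle-time field, i.e. exactly 4 bytes. A back-of-the-envelope sketch (the payload size below is a made-up number, not a real response size):

    object ResponseSizeSketch extends App {
      val v0PayloadBytes = 128                 // illustrative v0 body size
      val v1PayloadBytes = v0PayloadBytes + 4  // plus one INT32 throttle-time field
      assert(v1PayloadBytes - v0PayloadBytes == 4)
      println(s"v0=$v0PayloadBytes bytes, v1=$v1PayloadBytes bytes")
    }
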
http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index caf98e8..997928c 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -30,7 +30,7 @@ class ClientQuotaManagerTest {
quotaBytesPerSecondOverrides = "p1=2000,p2=4000")
var numCallbacks: Int = 0
- def callback {
+ def callback(delayTimeMs: Int) {
numCallbacks += 1
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
index c4b5803..778f3f8 100644
--- a/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
@@ -32,7 +32,7 @@ class ThrottledResponseExpirationTest {
Collections.emptyList(),
time)
- def callback {
+ def callback(delayTimeMs: Int) {
numCallbacks += 1
}
@@ -75,9 +75,9 @@ class ThrottledResponseExpirationTest {
val t1: ThrottledResponse = new ThrottledResponse(time, 10, callback)
val t2: ThrottledResponse = new ThrottledResponse(time, 20, callback)
val t3: ThrottledResponse = new ThrottledResponse(time, 20, callback)
- Assert.assertEquals(10, t1.delayTimeMs)
- Assert.assertEquals(20, t2.delayTimeMs)
- Assert.assertEquals(20, t3.delayTimeMs)
+ Assert.assertEquals(10, t1.throttleTimeMs)
+ Assert.assertEquals(20, t2.throttleTimeMs)
+ Assert.assertEquals(20, t3.throttleTimeMs)
for(itr <- 0 to 2) {
Assert.assertEquals(10 - 10*itr, t1.getDelay(TimeUnit.MILLISECONDS))