You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2015/08/26 03:33:32 UTC

kafka git commit: KAFKA-2136; Add throttle time (on quota violation) in fetch/produce responses; reviewed by Joel Koshy, Dong Lin and Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 5d453ba6d -> 436b7ddc3


KAFKA-2136; Add throttle time (on quota violation) in fetch/produce
responses; reviewed by Joel Koshy, Dong Lin and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/436b7ddc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/436b7ddc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/436b7ddc

Branch: refs/heads/trunk
Commit: 436b7ddc386eb688ba0f12836710f5e4bcaa06c8
Parents: 5d453ba
Author: Aditya Auradkar <aa...@linkedin.com>
Authored: Tue Aug 25 17:52:39 2015 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Tue Aug 25 18:33:10 2015 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     | 13 +++
 .../clients/producer/internals/Sender.java      | 14 ++++
 .../apache/kafka/common/protocol/Protocol.java  | 38 +++++++--
 .../kafka/common/requests/FetchRequest.java     |  2 +-
 .../kafka/common/requests/FetchResponse.java    | 78 +++++++++++++-----
 .../kafka/common/requests/ProduceRequest.java   |  4 +-
 .../kafka/common/requests/ProduceResponse.java  | 68 +++++++++++----
 .../clients/consumer/internals/FetcherTest.java | 87 +++++++++++++-------
 .../clients/producer/internals/SenderTest.java  | 35 ++++++--
 .../common/requests/RequestResponseTest.java    | 37 ++++++++-
 .../src/main/scala/kafka/api/FetchRequest.scala |  9 +-
 .../main/scala/kafka/api/FetchResponse.scala    | 69 +++++++++++-----
 .../main/scala/kafka/api/ProducerRequest.scala  |  2 +-
 .../main/scala/kafka/api/ProducerResponse.scala | 16 +++-
 .../scala/kafka/consumer/SimpleConsumer.scala   |  2 +-
 .../kafka/server/AbstractFetcherThread.scala    | 16 +++-
 .../scala/kafka/server/ClientQuotaManager.scala | 23 +++---
 .../src/main/scala/kafka/server/KafkaApis.scala | 43 ++++++----
 .../kafka/server/ReplicaFetcherThread.scala     |  4 +-
 .../scala/kafka/server/ReplicaManager.scala     |  2 -
 .../scala/kafka/server/ThrottledResponse.scala  |  8 +-
 .../api/RequestResponseSerializationTest.scala  | 40 ++++++++-
 .../kafka/server/ClientQuotaManagerTest.scala   |  2 +-
 .../ThrottledResponseExpirationTest.scala       |  8 +-
 24 files changed, 463 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 9dc6697..1ae6d03 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -453,6 +453,7 @@ public class Fetcher<K, V> {
             }
             this.sensors.bytesFetched.record(totalBytes);
             this.sensors.recordsFetched.record(totalCount);
+            this.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
         }
         this.sensors.fetchLatency.record(resp.requestLatencyMs());
     }
@@ -493,6 +494,7 @@ public class Fetcher<K, V> {
         public final Sensor recordsFetched;
         public final Sensor fetchLatency;
         public final Sensor recordsFetchLag;
+        public final Sensor fetchThrottleTimeSensor;
 
 
         public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
@@ -542,6 +544,17 @@ public class Fetcher<K, V> {
                 this.metricGrpName,
                 "The maximum lag in terms of number of records for any partition in this window",
                 tags), new Max());
+
+            this.fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
+            this.fetchThrottleTimeSensor.add(new MetricName("fetch-throttle-time-avg",
+                                                         this.metricGrpName,
+                                                         "The average throttle time in ms",
+                                                         tags), new Avg());
+
+            this.fetchThrottleTimeSensor.add(new MetricName("fetch-throttle-time-max",
+                                                         this.metricGrpName,
+                                                         "The maximum throttle time in ms",
+                                                         tags), new Max());
         }
 
         public void recordTopicFetchMetrics(String topic, int bytes, int records) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 0baf16e..d2e64f7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -253,6 +253,8 @@ public class Sender implements Runnable {
                     completeBatch(batch, error, partResp.baseOffset, correlationId, now);
                 }
                 this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs());
+                this.sensors.recordThrottleTime(response.request().request().destination(),
+                                                produceResponse.getThrottleTime());
             } else {
                 // this is the acks = 0 case, just complete all requests
                 for (RecordBatch batch : batches.values())
@@ -352,6 +354,7 @@ public class Sender implements Runnable {
         public final Sensor batchSizeSensor;
         public final Sensor compressionRateSensor;
         public final Sensor maxRecordSizeSensor;
+        public final Sensor produceThrottleTimeSensor;
 
         public SenderMetrics(Metrics metrics) {
             this.metrics = metrics;
@@ -381,6 +384,12 @@ public class Sender implements Runnable {
             m = new MetricName("request-latency-max", metricGrpName, "The maximum request latency in ms", metricTags);
             this.requestTimeSensor.add(m, new Max());
 
+            this.produceThrottleTimeSensor = metrics.sensor("produce-throttle-time");
+            m = new MetricName("produce-throttle-time-avg", metricGrpName, "The average throttle time in ms", metricTags);
+            this.produceThrottleTimeSensor.add(m, new Avg());
+            m = new MetricName("produce-throttle-time-max", metricGrpName, "The maximum throttle time in ms", metricTags);
+            this.produceThrottleTimeSensor.add(m, new Max());
+
             this.recordsPerRequestSensor = metrics.sensor("records-per-request");
             m = new MetricName("record-send-rate", metricGrpName, "The average number of records sent per second.", metricTags);
             this.recordsPerRequestSensor.add(m, new Rate());
@@ -515,6 +524,11 @@ public class Sender implements Runnable {
                     nodeRequestTime.record(latency, now);
             }
         }
+
+        public void recordThrottleTime(String node, long throttleTimeMs) {
+            this.produceThrottleTimeSensor.record(throttleTimeMs, time.milliseconds());
+        }
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 3dc8b01..048d761 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -107,9 +107,25 @@ public class Protocol {
                                                                                                                                       INT16),
                                                                                                                             new Field("base_offset",
                                                                                                                                       INT64))))))));
-
-    public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0};
-    public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0};
+    public static final Schema PRODUCE_REQUEST_V1 = PRODUCE_REQUEST_V0;
+
+    public static final Schema PRODUCE_RESPONSE_V1 = new Schema(new Field("responses",
+                                                                          new ArrayOf(new Schema(new Field("topic", STRING),
+                                                                                                 new Field("partition_responses",
+                                                                                                           new ArrayOf(new Schema(new Field("partition",
+                                                                                                                                            INT32),
+                                                                                                                                  new Field("error_code",
+                                                                                                                                            INT16),
+                                                                                                                                  new Field("base_offset",
+                                                                                                                                            INT64))))))),
+                                                                new Field("throttle_time_ms",
+                                                                          INT32,
+                                                                          "Duration in milliseconds for which the request was throttled" +
+                                                                              " due to quota violation. (Zero if the request did not violate any quota.)",
+                                                                          0));
+
+    public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1};
+    public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1};
 
     /* Offset commit api */
     public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
@@ -342,6 +358,9 @@ public class Protocol {
                                                                        new ArrayOf(FETCH_REQUEST_TOPIC_V0),
                                                                        "Topics to fetch."));
 
+    // The V1 Fetch Request body is the same as V0.
+    // Only the version number is incremented to indicate a newer client
+    public static final Schema FETCH_REQUEST_V1 = FETCH_REQUEST_V0;
     public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
                                                                                   INT32,
                                                                                   "Topic partition id."),
@@ -357,9 +376,16 @@ public class Protocol {
 
     public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses",
                                                                         new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
-
-    public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0};
-    public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0};
+    public static final Schema FETCH_RESPONSE_V1 = new Schema(new Field("throttle_time_ms",
+                                                                        INT32,
+                                                                        "Duration in milliseconds for which the request was throttled" +
+                                                                            " due to quota violation. (Zero if the request did not violate any quota.)",
+                                                                        0),
+                                                              new Field("responses",
+                                                                      new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
+
+    public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1};
+    public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1};
 
     /* Consumer metadata api */
     public static final Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group_id",

http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index df073a0..feb4109 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -132,7 +132,7 @@ public class FetchRequest extends AbstractRequest {
 
         switch (versionId) {
             case 0:
-                return new FetchResponse(responseData);
+                return new FetchResponse(responseData, 0);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.FETCH.id)));

http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index eb8951f..7b78415 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -29,6 +29,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * This wrapper supports both v0 and v1 of FetchResponse.
+ */
 public class FetchResponse extends AbstractRequestResponse {
     
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.FETCH.id);
@@ -37,12 +40,16 @@ public class FetchResponse extends AbstractRequestResponse {
     // topic level field names
     private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITIONS_KEY_NAME = "partition_responses";
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
 
     // partition level field names
     private static final String PARTITION_KEY_NAME = "partition";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
-    /**
+    // Default throttle time
+    private static final int DEFAULT_THROTTLE_TIME = 0;
+
+  /**
      * Possible error code:
      *
      *  OFFSET_OUT_OF_RANGE (1)
@@ -59,6 +66,7 @@ public class FetchResponse extends AbstractRequestResponse {
     public static final ByteBuffer EMPTY_RECORD_SET = ByteBuffer.allocate(0);
 
     private final Map<TopicPartition, PartitionData> responseData;
+    private final int throttleTime;
 
     public static final class PartitionData {
         public final short errorCode;
@@ -72,8 +80,50 @@ public class FetchResponse extends AbstractRequestResponse {
         }
     }
 
+    /**
+     * Constructor for Version 0
+     * @param responseData fetched data grouped by topic-partition
+     */
     public FetchResponse(Map<TopicPartition, PartitionData> responseData) {
+        super(new Struct(ProtoUtils.responseSchema(ApiKeys.FETCH.id, 0)));
+        initCommonFields(responseData);
+        this.responseData = responseData;
+        this.throttleTime = DEFAULT_THROTTLE_TIME;
+    }
+
+  /**
+   * Constructor for Version 1
+   * @param responseData fetched data grouped by topic-partition
+   * @param throttleTime Time in milliseconds the response was throttled
+   */
+    public FetchResponse(Map<TopicPartition, PartitionData> responseData, int throttleTime) {
         super(new Struct(CURRENT_SCHEMA));
+        initCommonFields(responseData);
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
+        this.responseData = responseData;
+        this.throttleTime = throttleTime;
+    }
+
+    public FetchResponse(Struct struct) {
+        super(struct);
+        responseData = new HashMap<TopicPartition, PartitionData>();
+        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
+            Struct topicResponse = (Struct) topicResponseObj;
+            String topic = topicResponse.getString(TOPIC_KEY_NAME);
+            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionResponse = (Struct) partitionResponseObj;
+                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+                short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
+                long highWatermark = partitionResponse.getLong(HIGH_WATERMARK_KEY_NAME);
+                ByteBuffer recordSet = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
+                PartitionData partitionData = new PartitionData(errorCode, highWatermark, recordSet);
+                responseData.put(new TopicPartition(topic, partition), partitionData);
+            }
+        }
+        this.throttleTime = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
+    }
+
+    private void initCommonFields(Map<TopicPartition, PartitionData> responseData) {
         Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
 
         List<Struct> topicArray = new ArrayList<Struct>();
@@ -94,32 +144,22 @@ public class FetchResponse extends AbstractRequestResponse {
             topicArray.add(topicData);
         }
         struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
-        this.responseData = responseData;
     }
 
-    public FetchResponse(Struct struct) {
-        super(struct);
-        responseData = new HashMap<TopicPartition, PartitionData>();
-        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.getString(TOPIC_KEY_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
-                short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
-                long highWatermark = partitionResponse.getLong(HIGH_WATERMARK_KEY_NAME);
-                ByteBuffer recordSet = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
-                PartitionData partitionData = new PartitionData(errorCode, highWatermark, recordSet);
-                responseData.put(new TopicPartition(topic, partition), partitionData);
-            }
-        }
-    }
 
     public Map<TopicPartition, PartitionData> responseData() {
         return responseData;
     }
 
+    public int getThrottleTime() {
+        return this.throttleTime;
+    }
+
     public static FetchResponse parse(ByteBuffer buffer) {
         return new FetchResponse((Struct) CURRENT_SCHEMA.read(buffer));
     }
+
+    public static FetchResponse parse(ByteBuffer buffer, int version) {
+        return new FetchResponse((Struct) ProtoUtils.responseSchema(ApiKeys.FETCH.id, version).read(buffer));
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 715504b..5663f2c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -27,7 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class ProduceRequest  extends AbstractRequest {
+public class ProduceRequest extends AbstractRequest {
     
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id);
     private static final String ACKS_KEY_NAME = "acks";
@@ -103,7 +103,7 @@ public class ProduceRequest  extends AbstractRequest {
 
         switch (versionId) {
             case 0:
-                return new ProduceResponse(responseMap);
+                return new ProduceResponse(responseMap, 0);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.PRODUCE.id)));

http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index febfc70..2868550 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -25,6 +25,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * This wrapper supports both v0 and v1 of ProduceResponse.
+ */
 public class ProduceResponse extends AbstractRequestResponse {
     
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id);
@@ -33,12 +36,14 @@ public class ProduceResponse extends AbstractRequestResponse {
     // topic level field names
     private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITION_RESPONSES_KEY_NAME = "partition_responses";
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
 
     // partition level field names
     private static final String PARTITION_KEY_NAME = "partition";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     public static final long INVALID_OFFSET = -1L;
+    private static final int DEFAULT_THROTTLE_TIME = 0;
 
     /**
      * Possible error code:
@@ -49,28 +54,30 @@ public class ProduceResponse extends AbstractRequestResponse {
     private static final String BASE_OFFSET_KEY_NAME = "base_offset";
 
     private final Map<TopicPartition, PartitionResponse> responses;
+    private final int throttleTime;
 
+    /**
+     * Constructor for Version 0
+     * @param responses Produced data grouped by topic-partition
+     */
     public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
+        super(new Struct(ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, 0)));
+        initCommonFields(responses);
+        this.responses = responses;
+        this.throttleTime = DEFAULT_THROTTLE_TIME;
+    }
+
+    /**
+     * Constructor for Version 1
+     * @param responses Produced data grouped by topic-partition
+     * @param throttleTime Time in milliseconds the response was throttled
+     */
+    public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTime) {
         super(new Struct(CURRENT_SCHEMA));
-        Map<String, Map<Integer, PartitionResponse>> responseByTopic = CollectionUtils.groupDataByTopic(responses);
-        List<Struct> topicDatas = new ArrayList<Struct>(responseByTopic.size());
-        for (Map.Entry<String, Map<Integer, PartitionResponse>> entry : responseByTopic.entrySet()) {
-            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, entry.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
-            for (Map.Entry<Integer, PartitionResponse> partitionEntry : entry.getValue().entrySet()) {
-                PartitionResponse part = partitionEntry.getValue();
-                Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME)
-                                       .set(PARTITION_KEY_NAME, partitionEntry.getKey())
-                                       .set(ERROR_CODE_KEY_NAME, part.errorCode)
-                                       .set(BASE_OFFSET_KEY_NAME, part.baseOffset);
-                partitionArray.add(partStruct);
-            }
-            topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray());
-            topicDatas.add(topicData);
-        }
-        struct.set(RESPONSES_KEY_NAME, topicDatas.toArray());
+        initCommonFields(responses);
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
         this.responses = responses;
+        this.throttleTime = throttleTime;
     }
 
     public ProduceResponse(Struct struct) {
@@ -88,12 +95,37 @@ public class ProduceResponse extends AbstractRequestResponse {
                 responses.put(tp, new PartitionResponse(errorCode, offset));
             }
         }
+        this.throttleTime = struct.getInt(THROTTLE_TIME_KEY_NAME);
+    }
+
+    private void initCommonFields(Map<TopicPartition, PartitionResponse> responses) {
+        Map<String, Map<Integer, PartitionResponse>> responseByTopic = CollectionUtils.groupDataByTopic(responses);
+        List<Struct> topicDatas = new ArrayList<Struct>(responseByTopic.size());
+        for (Map.Entry<String, Map<Integer, PartitionResponse>> entry : responseByTopic.entrySet()) {
+            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, entry.getKey());
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (Map.Entry<Integer, PartitionResponse> partitionEntry : entry.getValue().entrySet()) {
+                PartitionResponse part = partitionEntry.getValue();
+                Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME).set(PARTITION_KEY_NAME,
+                                                                                         partitionEntry.getKey()).set(
+                    ERROR_CODE_KEY_NAME, part.errorCode).set(BASE_OFFSET_KEY_NAME, part.baseOffset);
+                partitionArray.add(partStruct);
+            }
+            topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray());
+            topicDatas.add(topicData);
+        }
+        struct.set(RESPONSES_KEY_NAME, topicDatas.toArray());
     }
 
     public Map<TopicPartition, PartitionResponse> responses() {
         return this.responses;
     }
 
+    public int getThrottleTime() {
+        return this.throttleTime;
+    }
+
     public static final class PartitionResponse {
         public short errorCode;
         public long baseOffset;

http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index a7c83ca..22712bb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -22,9 +22,11 @@ import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -54,9 +56,9 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class FetcherTest {
-
     private String topicName = "test";
     private String groupId = "test-group";
+    private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics";
     private TopicPartition tp = new TopicPartition(topicName, 0);
     private int minBytes = 1;
     private int maxWaitMs = 0;
@@ -70,24 +72,25 @@ public class FetcherTest {
     private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
     private Metrics metrics = new Metrics(time);
     private Map<String, String> metricTags = new LinkedHashMap<String, String>();
+    private static final double EPSILON = 0.0001;
     private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
 
     private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
 
     private Fetcher<byte[], byte[]> fetcher = new Fetcher<byte[], byte[]>(consumerClient,
-        minBytes,
-        maxWaitMs,
-        fetchSize,
-        true, // check crc
-        new ByteArrayDeserializer(),
-        new ByteArrayDeserializer(),
-        metadata,
-        subscriptions,
-        metrics,
-        "consumer" + groupId,
-        metricTags,
-        time,
-        retryBackoffMs);
+                                                                          minBytes,
+                                                                          maxWaitMs,
+                                                                          fetchSize,
+                                                                          true, // check crc
+                                                                          new ByteArrayDeserializer(),
+                                                                          new ByteArrayDeserializer(),
+                                                                          metadata,
+                                                                          subscriptions,
+                                                                          metrics,
+                                                                          "consumer" + groupId,
+                                                                          metricTags,
+                                                                          time,
+                                                                          retryBackoffMs);
 
     @Before
     public void setup() throws Exception {
@@ -109,7 +112,7 @@ public class FetcherTest {
 
         // normal fetch
         fetcher.initFetches(cluster);
-        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L));
+        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
         consumerClient.poll(0);
         records = fetcher.fetchedRecords().get(tp);
         assertEquals(3, records.size());
@@ -132,7 +135,7 @@ public class FetcherTest {
 
         // Now the rebalance happens and fetch positions are cleared
         subscriptions.changePartitionAssignment(Arrays.asList(tp));
-        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L));
+        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
         consumerClient.poll(0);
 
         // The active fetch should be ignored since its position is no longer valid
@@ -147,7 +150,7 @@ public class FetcherTest {
         fetcher.initFetches(cluster);
         subscriptions.pause(tp);
 
-        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L));
+        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
         consumerClient.poll(0);
         assertNull(fetcher.fetchedRecords().get(tp));
     }
@@ -168,7 +171,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         fetcher.initFetches(cluster);
-        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L));
+        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
         assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
@@ -180,7 +183,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         fetcher.initFetches(cluster);
-        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L));
+        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L, 0));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
         assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
@@ -192,7 +195,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         fetcher.initFetches(cluster);
-        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
+        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
         consumerClient.poll(0);
         assertTrue(subscriptions.isOffsetResetNeeded(tp));
         assertEquals(0, fetcher.fetchedRecords().size());
@@ -206,7 +209,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         fetcher.initFetches(cluster);
-        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L), true);
+        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0), true);
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
 
@@ -236,7 +239,7 @@ public class FetcherTest {
         // with no commit position, we should reset using the default strategy defined above (EARLIEST)
 
         client.prepareResponse(listOffsetRequestMatcher(Fetcher.EARLIEST_OFFSET_TIMESTAMP),
-                listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
+                               listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
         fetcher.updateFetchPositions(Collections.singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
@@ -250,7 +253,7 @@ public class FetcherTest {
         subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
 
         client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
-                listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
+                               listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
         fetcher.updateFetchPositions(Collections.singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
@@ -264,7 +267,7 @@ public class FetcherTest {
         subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
 
         client.prepareResponse(listOffsetRequestMatcher(Fetcher.EARLIEST_OFFSET_TIMESTAMP),
-                listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
+                               listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
         fetcher.updateFetchPositions(Collections.singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
@@ -279,11 +282,11 @@ public class FetcherTest {
 
         // First request gets a disconnect
         client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
-                listOffsetResponse(Errors.NONE, Arrays.asList(5L)), true);
+                               listOffsetResponse(Errors.NONE, Arrays.asList(5L)), true);
 
         // Next one succeeds
         client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
-                listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
+                               listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
         fetcher.updateFetchPositions(Collections.singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
@@ -302,6 +305,32 @@ public class FetcherTest {
         assertEquals(cluster.topics().size(), allTopics.size());
     }
 
+    /*
+     * Send multiple requests. Verify that the client side quota metrics have the right values
+     */
+    @Test
+    public void testQuotaMetrics() throws Exception {
+        List<ConsumerRecord<byte[], byte[]>> records;
+        subscriptions.subscribe(tp);
+        subscriptions.seek(tp, 0);
+
+        // normal fetch
+        for (int i = 1; i < 4; i++) {
+            fetcher.initFetches(cluster);
+
+            client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 100 * i));
+            consumerClient.poll(0);
+            records = fetcher.fetchedRecords().get(tp);
+            assertEquals(3, records.size());
+        }
+
+        Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+        KafkaMetric avgMetric = allMetrics.get(new MetricName("fetch-throttle-time-avg", metricGroup, "", metricTags));
+        KafkaMetric maxMetric = allMetrics.get(new MetricName("fetch-throttle-time-max", metricGroup, "", metricTags));
+        assertEquals(200, avgMetric.value(), EPSILON);
+        assertEquals(300, maxMetric.value(), EPSILON);
+    }
+
     private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) {
         // matches any list offset request with the provided timestamp
         return new MockClient.RequestMatcher() {
@@ -322,10 +351,8 @@ public class FetcherTest {
         return response.toStruct();
     }
 
-    private Struct fetchResponse(ByteBuffer buffer, short error, long hw) {
-        FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)));
+    private Struct fetchResponse(ByteBuffer buffer, short error, long hw, int throttleTime) {
+        FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)), throttleTime);
         return response.toStruct();
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 8b1805d..aa44991 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -26,7 +26,9 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -43,6 +45,9 @@ public class SenderTest {
     private static final short ACKS_ALL = -1;
     private static final int MAX_RETRIES = 0;
     private static final int REQUEST_TIMEOUT_MS = 10000;
+    private static final String CLIENT_ID = "clientId";
+    private static final String METRIC_GROUP = "producer-metrics";
+    private static final double EPS = 0.0001;
 
     private TopicPartition tp = new TopicPartition("test", 0);
     private MockTime time = new MockTime();
@@ -62,11 +67,12 @@ public class SenderTest {
                                        REQUEST_TIMEOUT_MS,
                                        metrics,
                                        time,
-                                       "clientId");
+                                       CLIENT_ID);
 
     @Before
     public void setup() {
         metadata.update(cluster, time.milliseconds());
+        metricTags.put("client-id", CLIENT_ID);
     }
 
     @Test
@@ -76,7 +82,7 @@ public class SenderTest {
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
-        client.respond(produceResponse(tp, offset, Errors.NONE.code()));
+        client.respond(produceResponse(tp, offset, Errors.NONE.code(), 0));
         sender.run(time.milliseconds());
         assertEquals("All requests completed.", offset, (long) client.inFlightRequestCount());
         sender.run(time.milliseconds());
@@ -84,6 +90,25 @@ public class SenderTest {
         assertEquals(offset, future.get().offset());
     }
 
+    /*
+     * Send multiple requests. Verify that the client side quota metrics have the right values
+     */
+    @Test
+    public void testQuotaMetrics() throws Exception {
+        final long offset = 0;
+        for (int i = 1; i <= 3; i++) {
+            Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
+            sender.run(time.milliseconds()); // send produce request
+            client.respond(produceResponse(tp, offset, Errors.NONE.code(), 100 * i));
+            sender.run(time.milliseconds());
+        }
+        Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+        KafkaMetric avgMetric = allMetrics.get(new MetricName("produce-throttle-time-avg", METRIC_GROUP, "", metricTags));
+        KafkaMetric maxMetric = allMetrics.get(new MetricName("produce-throttle-time-max", METRIC_GROUP, "", metricTags));
+        assertEquals(200, avgMetric.value(), EPS);
+        assertEquals(300, maxMetric.value(), EPS);
+    }
+
     @Test
     public void testRetries() throws Exception {
         // create a sender with retries = 1
@@ -110,7 +135,7 @@ public class SenderTest {
         sender.run(time.milliseconds()); // resend
         assertEquals(1, client.inFlightRequestCount());
         long offset = 0;
-        client.respond(produceResponse(tp, offset, Errors.NONE.code()));
+        client.respond(produceResponse(tp, offset, Errors.NONE.code(), 0));
         sender.run(time.milliseconds());
         assertTrue("Request should have retried and completed", future.isDone());
         assertEquals(offset, future.get().offset());
@@ -138,10 +163,10 @@ public class SenderTest {
         }
     }
 
-    private Struct produceResponse(TopicPartition tp, long offset, int error) {
+    private Struct produceResponse(TopicPartition tp, long offset, int error, int throttleTimeMs) {
         ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse((short) error, offset);
         Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp);
-        ProduceResponse response = new ProduceResponse(partResp);
+        ProduceResponse response = new ProduceResponse(partResp, throttleTimeMs);
         return response.toStruct();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 8b2aca8..9e92da6 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -18,7 +18,9 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
 import org.junit.Test;
 
 import java.lang.reflect.Method;
@@ -77,6 +79,37 @@ public class RequestResponseTest {
         }
     }
 
+    @Test
+    public void produceResponseVersionTest() {
+        Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
+        responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000));
+
+        ProduceResponse v0Response = new ProduceResponse(responseData);
+        ProduceResponse v1Response = new ProduceResponse(responseData, 10);
+        assertEquals("Throttle time must be zero", 0, v0Response.getThrottleTime());
+        assertEquals("Throttle time must be 10", 10, v1Response.getThrottleTime());
+        assertEquals("Should use schema version 0", ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, 0), v0Response.toStruct().schema());
+        assertEquals("Should use schema version 1", ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, 1), v1Response.toStruct().schema());
+        assertEquals("Response data does not match", responseData, v0Response.responses());
+        assertEquals("Response data does not match", responseData, v1Response.responses());
+    }
+
+    @Test
+    public void fetchResponseVersionTest() {
+        Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
+        responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10)));
+
+        FetchResponse v0Response = new FetchResponse(responseData);
+        FetchResponse v1Response = new FetchResponse(responseData, 10);
+        assertEquals("Throttle time must be zero", 0, v0Response.getThrottleTime());
+        assertEquals("Throttle time must be 10", 10, v1Response.getThrottleTime());
+        assertEquals("Should use schema version 0", ProtoUtils.responseSchema(ApiKeys.FETCH.id, 0), v0Response.toStruct().schema());
+        assertEquals("Should use schema version 1", ProtoUtils.responseSchema(ApiKeys.FETCH.id, 1), v1Response.toStruct().schema());
+        assertEquals("Response data does not match", responseData, v0Response.responseData());
+        assertEquals("Response data does not match", responseData, v1Response.responseData());
+    }
+
+
     private AbstractRequestResponse createRequestHeader() {
         return new RequestHeader((short) 10, (short) 1, "", 10);
     }
@@ -103,7 +136,7 @@ public class RequestResponseTest {
     private AbstractRequestResponse createFetchResponse() {
         Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
         responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10)));
-        return new FetchResponse(responseData);
+        return new FetchResponse(responseData, 0);
     }
 
     private AbstractRequest createHeartBeatRequest() {
@@ -182,6 +215,6 @@ public class RequestResponseTest {
     private AbstractRequestResponse createProduceResponse() {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
         responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000));
-        return new ProduceResponse(responseData);
+        return new ProduceResponse(responseData, 0);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 5b38f85..36e288f 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -31,7 +31,7 @@ import scala.collection.immutable.Map
 case class PartitionFetchInfo(offset: Long, fetchSize: Int)
 
 object FetchRequest {
-  val CurrentVersion = 0.shortValue
+  val CurrentVersion = 1.shortValue
   val DefaultMaxWait = 0
   val DefaultMinBytes = 0
   val DefaultCorrelationId = 0
@@ -170,7 +170,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
 @nonthreadsafe
 class FetchRequestBuilder() {
   private val correlationId = new AtomicInteger(0)
-  private val versionId = FetchRequest.CurrentVersion
+  private var versionId = FetchRequest.CurrentVersion
   private var clientId = ConsumerConfig.DefaultClientId
   private var replicaId = Request.OrdinaryConsumerId
   private var maxWait = FetchRequest.DefaultMaxWait
@@ -205,6 +205,11 @@ class FetchRequestBuilder() {
     this
   }
 
+  def requestVersion(versionId: Short): FetchRequestBuilder = {
+    this.versionId = versionId
+    this
+  }
+
   def build() = {
     val fetchRequest = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
     requestMap.clear()

http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index b9efec2..2c07033 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -73,7 +73,7 @@ class PartitionDataSend(val partitionId: Int,
     var written = 0L
     if(buffer.hasRemaining)
       written += channel.write(buffer)
-    if(!buffer.hasRemaining && messagesSentSize < messageSize) {
+    if (!buffer.hasRemaining && messagesSentSize < messageSize) {
       val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize)
       messagesSentSize += bytesSent
       written += bytesSent
@@ -152,12 +152,10 @@ class TopicDataSend(val dest: String, val topicData: TopicData) extends Send {
 
 object FetchResponse {
 
-  val headerSize =
-    4 + /* correlationId */
-    4 /* topic count */
-
-  def readFrom(buffer: ByteBuffer): FetchResponse = {
+  // The request version is used to determine which fields we can expect in the response
+  def readFrom(buffer: ByteBuffer, requestVersion: Int): FetchResponse = {
     val correlationId = buffer.getInt
+    val throttleTime = if (requestVersion > 0) buffer.getInt else 0
     val topicCount = buffer.getInt
     val pairs = (1 to topicCount).flatMap(_ => {
       val topicData = TopicData.readFrom(buffer)
@@ -166,28 +164,56 @@ object FetchResponse {
           (TopicAndPartition(topicData.topic, partitionId), partitionData)
       }
     })
-    FetchResponse(correlationId, Map(pairs:_*))
+    FetchResponse(correlationId, Map(pairs:_*), requestVersion, throttleTime)
+  }
+
+  // Returns the size of the response header
+  def headerSize(requestVersion: Int): Int = {
+    val throttleTimeSize = if (requestVersion > 0) 4 else 0
+    4 + /* correlationId */
+    4 + /* topic count */
+    throttleTimeSize
+  }
+
+  // Returns the size of entire fetch response in bytes (including the header size)
+  def responseSize(dataGroupedByTopic: Map[String, Map[TopicAndPartition, FetchResponsePartitionData]],
+                   requestVersion: Int): Int = {
+    headerSize(requestVersion) +
+    dataGroupedByTopic.foldLeft(0) { case (folded, (topic, partitionDataMap)) =>
+      val topicData = TopicData(topic, partitionDataMap.map {
+        case (topicAndPartition, partitionData) => (topicAndPartition.partition, partitionData)
+      })
+      folded + topicData.sizeInBytes
+    }
   }
 }
 
-case class FetchResponse(correlationId: Int, data: Map[TopicAndPartition, FetchResponsePartitionData])
+case class FetchResponse(correlationId: Int,
+                         data: Map[TopicAndPartition, FetchResponsePartitionData],
+                         requestVersion: Int = 0,
+                         throttleTimeMs: Int = 0)
   extends RequestOrResponse() {
 
   /**
    * Partitions the data into a map of maps (one for each topic).
    */
-  lazy val dataGroupedByTopic = data.groupBy(_._1.topic)
-
-  val sizeInBytes =
-    FetchResponse.headerSize +
-    dataGroupedByTopic.foldLeft(0) ((folded, curr) => {
-      val topicData = TopicData(curr._1, curr._2.map {
-        case (topicAndPartition, partitionData) => (topicAndPartition.partition, partitionData)
-      })
-      folded + topicData.sizeInBytes
-    })
+  lazy val dataGroupedByTopic = data.groupBy{ case (topicAndPartition, fetchData) => topicAndPartition.topic }
+  val headerSizeInBytes = FetchResponse.headerSize(requestVersion)
+  lazy val sizeInBytes = FetchResponse.responseSize(dataGroupedByTopic, requestVersion)
 
   /*
+   * Writes the header of the FetchResponse to the input buffer
+   */
+  def writeHeaderTo(buffer: ByteBuffer) = {
+    buffer.putInt(sizeInBytes)
+    buffer.putInt(correlationId)
+    // Include the throttleTime only if the client can read it
+    if (requestVersion > 0)
+      buffer.putInt(throttleTimeMs)
+
+    buffer.putInt(dataGroupedByTopic.size) // topic count
+  }
+  /*
    * FetchResponse uses [sendfile](http://man7.org/linux/man-pages/man2/sendfile.2.html)
    * api for data transfer through the FetchResponseSend, so `writeTo` aren't actually being used.
    * It is implemented as an empty function to conform to `RequestOrResponse.writeTo`
@@ -231,10 +257,9 @@ class FetchResponseSend(val dest: String, val fetchResponse: FetchResponse) exte
 
   override def destination = dest
 
-  private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize)
-  buffer.putInt(payloadSize)
-  buffer.putInt(fetchResponse.correlationId)
-  buffer.putInt(fetchResponse.dataGroupedByTopic.size) // topic count
+  // The throttleTimeSize will be 0 if the request was made from a client sending a V0 style request
+  private val buffer = ByteBuffer.allocate(4 /* for size */ + fetchResponse.headerSizeInBytes)
+  fetchResponse.writeHeaderTo(buffer)
   buffer.rewind()
 
   private val sends = new MultiSend(dest, JavaConversions.seqAsJavaList(fetchResponse.dataGroupedByTopic.toList.map {

http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index c866180..7fb143e 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -26,7 +26,7 @@ import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
 
 object ProducerRequest {
-  val CurrentVersion = 0.shortValue
+  val CurrentVersion = 1.shortValue
 
   def readFrom(buffer: ByteBuffer): ProducerRequest = {
     val versionId: Short = buffer.getShort

http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/main/scala/kafka/api/ProducerResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
index 5d1fac4..7719f30 100644
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ b/core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -23,6 +23,7 @@ import kafka.common.{TopicAndPartition, ErrorMapping}
 import kafka.api.ApiUtils._
 
 object ProducerResponse {
+  // readFrom assumes that the response is written using V1 format
   def readFrom(buffer: ByteBuffer): ProducerResponse = {
     val correlationId = buffer.getInt
     val topicCount = buffer.getInt
@@ -37,13 +38,17 @@ object ProducerResponse {
       })
     })
 
-    ProducerResponse(correlationId, Map(statusPairs:_*))
+    val throttleTime = buffer.getInt
+    ProducerResponse(correlationId, Map(statusPairs:_*), ProducerRequest.CurrentVersion, throttleTime)
   }
 }
 
 case class ProducerResponseStatus(var error: Short, offset: Long)
 
-case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, ProducerResponseStatus])
+case class ProducerResponse(correlationId: Int,
+                            status: Map[TopicAndPartition, ProducerResponseStatus],
+                            requestVersion: Int = 0,
+                            throttleTime: Int = 0)
     extends RequestOrResponse() {
 
   /**
@@ -54,6 +59,7 @@ case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, P
   def hasError = status.values.exists(_.error != ErrorMapping.NoError)
 
   val sizeInBytes = {
+    val throttleTimeSize = if (requestVersion > 0) 4 else 0
     val groupedStatus = statusGroupedByTopic
     4 + /* correlation id */
     4 + /* topic count */
@@ -66,7 +72,8 @@ case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, P
         2 + /* error code */
         8 /* offset */
       }
-    })
+    }) +
+    throttleTimeSize
   }
 
   def writeTo(buffer: ByteBuffer) {
@@ -85,6 +92,9 @@ case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, P
           buffer.putLong(nextOffset)
       }
     })
+    // Throttle time is only supported on V1 style requests
+    if (requestVersion > 0)
+      buffer.putInt(throttleTime)
   }
 
   override def describe(details: Boolean):String = { toString }

http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 7ebc040..4e1833a 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -131,7 +131,7 @@ class SimpleConsumer(val host: String,
         response = sendRequest(request)
       }
     }
-    val fetchResponse = FetchResponse.readFrom(response.payload())
+    val fetchResponse = FetchResponse.readFrom(response.payload(), request.versionId)
     val fetchedSize = fetchResponse.sizeInBytes
     fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestSizeHist.update(fetchedSize)
     fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestSizeHist.update(fetchedSize)

http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index f843061..dca975c 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -37,8 +37,17 @@ import com.yammer.metrics.core.Gauge
 /**
  *  Abstract class for fetching data from multiple partitions from the same broker.
  */
-abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: BrokerEndPoint, socketTimeout: Int, socketBufferSize: Int,
-                                     fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1, fetchBackOffMs: Int = 0,
+abstract class AbstractFetcherThread(name: String,
+                                     clientId: String,
+                                     sourceBroker: BrokerEndPoint,
+                                     socketTimeout: Int,
+                                     socketBufferSize: Int,
+                                     fetchSize: Int,
+                                     fetcherBrokerId: Int = -1,
+                                     maxWait: Int = 0,
+                                     minBytes: Int = 1,
+                                     fetchBackOffMs: Int = 0,
+                                     fetchRequestVersion: Short = FetchRequest.CurrentVersion,
                                      isInterruptible: Boolean = true)
   extends ShutdownableThread(name, isInterruptible) {
   private val partitionMap = new mutable.HashMap[TopicAndPartition, PartitionFetchState] // a (topic, partition) -> partitionFetchState map
@@ -52,7 +61,8 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
           clientId(clientId).
           replicaId(fetcherBrokerId).
           maxWait(maxWait).
-          minBytes(minBytes)
+          minBytes(minBytes).
+          requestVersion(fetchRequestVersion)
 
   /* callbacks to be defined in subclass */
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 9f8473f..016caaf 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -95,7 +95,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
       if (response != null) {
         // Decrement the size of the delay queue
         delayQueueSensor.record(-1)
-        trace("Response throttled for: " + response.delayTimeMs + " ms")
+        trace("Response throttled for: " + response.throttleTimeMs + " ms")
         response.execute()
       }
     }
@@ -110,25 +110,25 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
    * @return Number of milliseconds to delay the response in case of Quota violation.
    *         Zero otherwise
    */
-  def recordAndMaybeThrottle(clientId: String, value: Int, callback: => Unit): Int = {
+  def recordAndMaybeThrottle(clientId: String, value: Int, callback: Int => Unit): Int = {
     val clientSensors = getOrCreateQuotaSensors(clientId)
-    var delayTimeMs = 0L
+    var throttleTimeMs = 0
     try {
       clientSensors.quotaSensor.record(value)
       // trigger the callback immediately if quota is not violated
-      callback
+      callback(0)
     } catch {
       case qve: QuotaViolationException =>
         // Compute the delay
         val clientMetric = metrics.metrics().get(clientRateMetricName(clientId))
-        delayTimeMs = delayTime(clientMetric.value(), getQuotaMetricConfig(quota(clientId)))
-        delayQueue.add(new ThrottledResponse(time, delayTimeMs, callback))
+        throttleTimeMs = throttleTime(clientMetric.value(), getQuotaMetricConfig(quota(clientId)))
+        delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
         delayQueueSensor.record()
-        clientSensors.throttleTimeSensor.record(delayTimeMs)
+        clientSensors.throttleTimeSensor.record(throttleTimeMs)
         // If delayed, add the element to the delayQueue
-        logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), delayTimeMs))
+        logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
     }
-    delayTimeMs.toInt
+    throttleTimeMs
   }
 
   /*
@@ -139,12 +139,11 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
    * we need to add a delay of X to W such that O * W / (W + X) = T.
    * Solving for X, we get X = (O - T)/T * W.
    */
-  private def delayTime(metricValue: Double, config: MetricConfig): Long =
-  {
+  private def throttleTime(metricValue: Double, config: MetricConfig): Int = {
     val quota = config.quota()
     val difference = metricValue - quota.bound
     val time = difference / quota.bound * config.timeWindowMs() * config.samples()
-    time.round
+    time.round.toInt
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 67f0cad..e727a6f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -259,18 +259,21 @@ class KafkaApis(val requestChannel: RequestChannel,
     // the callback for sending a produce response
     def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
       var errorInResponse = false
-      responseStatus.foreach { case (topicAndPartition, status) =>
+      responseStatus.foreach
+      { case (topicAndPartition, status) =>
         // we only print warnings for known errors here; if it is unknown, it will cause
         // an error message in the replica manager
         if (status.error != ErrorMapping.NoError && status.error != ErrorMapping.UnknownCode) {
-          debug("Produce request with correlation id %d from client %s on partition %s failed due to %s"
-            .format(produceRequest.correlationId, produceRequest.clientId,
-            topicAndPartition, ErrorMapping.exceptionNameFor(status.error)))
+          debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
+            produceRequest.correlationId,
+            produceRequest.clientId,
+            topicAndPartition,
+            ErrorMapping.exceptionNameFor(status.error)))
           errorInResponse = true
         }
       }
 
-      def produceResponseCallback {
+      def produceResponseCallback(delayTimeMs: Int) {
         if (produceRequest.requiredAcks == 0) {
           // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
           // the request, since no response is expected by the producer, the server will close socket server so that
@@ -285,12 +288,19 @@ class KafkaApis(val requestChannel: RequestChannel,
             requestChannel.noOperation(request.processor, request)
           }
         } else {
-          val response = ProducerResponse(produceRequest.correlationId, responseStatus)
-          requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
+          val response = ProducerResponse(produceRequest.correlationId,
+                                          responseStatus,
+                                          produceRequest.versionId,
+                                          delayTimeMs)
+          requestChannel.sendResponse(new RequestChannel.Response(request,
+                                                                  new RequestOrResponseSend(request.connectionId,
+                                                                                            response)))
         }
       }
 
-      quotaManagers(RequestKeys.ProduceKey).recordAndMaybeThrottle(produceRequest.clientId, numBytesAppended, produceResponseCallback)
+      quotaManagers(RequestKeys.ProduceKey).recordAndMaybeThrottle(produceRequest.clientId,
+                                                                   numBytesAppended,
+                                                                   produceResponseCallback)
     }
 
     // only allow appending to internal topic partitions
@@ -332,21 +342,20 @@ class KafkaApis(val requestChannel: RequestChannel,
         BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.messages.sizeInBytes)
       }
 
-      val response = FetchResponse(fetchRequest.correlationId, responsePartitionData)
-      def fetchResponseCallback {
+      def fetchResponseCallback(delayTimeMs: Int) {
+        val response = FetchResponse(fetchRequest.correlationId, responsePartitionData, fetchRequest.versionId, delayTimeMs)
         requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response)))
       }
 
       // Do not throttle replication traffic
       if (fetchRequest.isFromFollower) {
-        fetchResponseCallback
+        fetchResponseCallback(0)
       } else {
-        quotaManagers.get(RequestKeys.FetchKey) match {
-          case Some(quotaManager) =>
-            quotaManager.recordAndMaybeThrottle(fetchRequest.clientId, response.sizeInBytes, fetchResponseCallback)
-          case None =>
-            warn("Cannot throttle Api key %s".format(RequestKeys.nameForKey(RequestKeys.FetchKey)))
-        }
+        quotaManagers(RequestKeys.FetchKey).recordAndMaybeThrottle(fetchRequest.clientId,
+                                                                   FetchResponse.responseSize(responsePartitionData
+                                                                                                      .groupBy(_._1.topic),
+                                                                                              fetchRequest.versionId),
+                                                                   fetchResponseCallback)
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index fae22d2..711d749 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -21,7 +21,7 @@ import kafka.admin.AdminUtils
 import kafka.cluster.BrokerEndPoint
 import kafka.log.LogConfig
 import kafka.message.ByteBufferMessageSet
-import kafka.api.{OffsetRequest, FetchResponsePartitionData}
+import kafka.api.{KAFKA_083, OffsetRequest, FetchResponsePartitionData}
 import kafka.common.{KafkaStorageException, TopicAndPartition}
 
 class ReplicaFetcherThread(name:String,
@@ -38,6 +38,8 @@ class ReplicaFetcherThread(name:String,
                                 maxWait = brokerConfig.replicaFetchWaitMaxMs,
                                 minBytes = brokerConfig.replicaFetchMinBytes,
                                 fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
+                                fetchRequestVersion =
+                                        if (brokerConfig.interBrokerProtocolVersion.onOrAfter(KAFKA_083)) 1 else 0,
                                 isInterruptible = false) {
 
   // process fetched data

http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d829e18..c195536 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -31,7 +31,6 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.metrics.Metrics
 
 import scala.collection._
 
@@ -304,7 +303,6 @@ class ReplicaManager(val config: KafkaConfig,
                      responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) {
 
     if (isValidRequiredAcks(requiredAcks)) {
-
       val sTime = SystemTime.milliseconds
       val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks)
       debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))

http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/main/scala/kafka/server/ThrottledResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ThrottledResponse.scala b/core/src/main/scala/kafka/server/ThrottledResponse.scala
index 1f80d54..214fa1f 100644
--- a/core/src/main/scala/kafka/server/ThrottledResponse.scala
+++ b/core/src/main/scala/kafka/server/ThrottledResponse.scala
@@ -25,13 +25,13 @@ import org.apache.kafka.common.utils.Time
 /**
  * Represents a request whose response has been delayed.
  * @param time @Time instance to use
- * @param delayTimeMs delay associated with this request
+ * @param throttleTimeMs delay associated with this request
  * @param callback Callback to trigger after delayTimeMs milliseconds
  */
-private[server] class ThrottledResponse(val time: Time, val delayTimeMs: Long, callback: => Unit) extends Delayed {
-  val endTime = time.milliseconds + delayTimeMs
+private[server] class ThrottledResponse(val time: Time, val throttleTimeMs: Int, callback: Int => Unit) extends Delayed {
+  val endTime = time.milliseconds + throttleTimeMs
 
-  def execute() = callback
+  def execute() = callback(throttleTimeMs)
 
   override def getDelay(unit: TimeUnit): Long = {
     unit.convert(endTime - time.milliseconds, TimeUnit.MILLISECONDS)

http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index b4c2a22..b7e7967 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -18,9 +18,12 @@
 package kafka.api
 
 
+import java.nio.channels.GatheringByteChannel
+
 import kafka.cluster.{BrokerEndPoint, EndPoint, Broker}
 import kafka.common.{OffsetAndMetadata, ErrorMapping, OffsetMetadataAndError}
 import kafka.common._
+import kafka.consumer.FetchRequestAndResponseStatsRegistry
 import kafka.message.{Message, ByteBufferMessageSet}
 import kafka.utils.SystemTime
 
@@ -150,7 +153,7 @@ object SerializationTestUtils {
     ProducerResponse(1, Map(
       TopicAndPartition(topic1, 0) -> ProducerResponseStatus(0.toShort, 10001),
       TopicAndPartition(topic2, 0) -> ProducerResponseStatus(0.toShort, 20001)
-    ))
+    ), ProducerRequest.CurrentVersion, 100)
 
   def createTestFetchRequest: FetchRequest = {
     new FetchRequest(requestInfo = requestInfos)
@@ -304,4 +307,39 @@ class RequestResponseSerializationTest extends JUnitSuite {
       assertEquals("The original and deserialized for " + original.getClass.getSimpleName + " should be the same.", original, deserialized)
     }
   }
+
+  @Test
+  def testProduceResponseVersion() {
+    val oldClientResponse = ProducerResponse(1, Map(
+      TopicAndPartition("t1", 0) -> ProducerResponseStatus(0.toShort, 10001),
+      TopicAndPartition("t2", 0) -> ProducerResponseStatus(0.toShort, 20001)
+    ))
+
+    val newClientResponse = ProducerResponse(1, Map(
+      TopicAndPartition("t1", 0) -> ProducerResponseStatus(0.toShort, 10001),
+      TopicAndPartition("t2", 0) -> ProducerResponseStatus(0.toShort, 20001)
+    ), 1, 100)
+
+    // new response should have 4 bytes more than the old response since delayTime is an INT32
+    assertEquals(oldClientResponse.sizeInBytes + 4, newClientResponse.sizeInBytes)
+
+    val buffer = ByteBuffer.allocate(newClientResponse.sizeInBytes)
+    newClientResponse.writeTo(buffer)
+    buffer.rewind()
+    assertEquals(ProducerResponse.readFrom(buffer).throttleTime, 100)
+  }
+
+  @Test
+  def testFetchResponseVersion() {
+    val oldClientResponse = FetchResponse(1, Map(
+      TopicAndPartition("t1", 0) -> new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("first message".getBytes)))
+    ), 0)
+
+    val newClientResponse = FetchResponse(1, Map(
+      TopicAndPartition("t1", 0) -> new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("first message".getBytes)))
+    ), 1, 100)
+
+    // new response should have 4 bytes more than the old response since delayTime is an INT32
+    assertEquals(oldClientResponse.sizeInBytes + 4, newClientResponse.sizeInBytes)
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index caf98e8..997928c 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -30,7 +30,7 @@ class ClientQuotaManagerTest {
                                                 quotaBytesPerSecondOverrides = "p1=2000,p2=4000")
 
   var numCallbacks: Int = 0
-  def callback {
+  def callback(delayTimeMs: Int) {
     numCallbacks += 1
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/436b7ddc/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
index c4b5803..778f3f8 100644
--- a/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
@@ -32,7 +32,7 @@ class ThrottledResponseExpirationTest {
                                                                     Collections.emptyList(),
                                                                     time)
 
-  def callback {
+  def callback(delayTimeMs: Int) {
     numCallbacks += 1
   }
 
@@ -75,9 +75,9 @@ class ThrottledResponseExpirationTest {
     val t1: ThrottledResponse = new ThrottledResponse(time, 10, callback)
     val t2: ThrottledResponse = new ThrottledResponse(time, 20, callback)
     val t3: ThrottledResponse = new ThrottledResponse(time, 20, callback)
-    Assert.assertEquals(10, t1.delayTimeMs)
-    Assert.assertEquals(20, t2.delayTimeMs)
-    Assert.assertEquals(20, t3.delayTimeMs)
+    Assert.assertEquals(10, t1.throttleTimeMs)
+    Assert.assertEquals(20, t2.throttleTimeMs)
+    Assert.assertEquals(20, t3.throttleTimeMs)
 
     for(itr <- 0 to 2) {
       Assert.assertEquals(10 - 10*itr, t1.getDelay(TimeUnit.MILLISECONDS))