You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/06/16 23:46:53 UTC

kafka git commit: kafka-2195; Add versionId to AbstractRequest.getErrorResponse and AbstractRequest.getRequest; patched by Andrii Biletskyi; reviewed by Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 20a31a29f -> 54e54f080


kafka-2195; Add versionId to AbstractRequest.getErrorResponse and AbstractRequest.getRequest; patched by Andrii Biletskyi; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/54e54f08
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/54e54f08
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/54e54f08

Branch: refs/heads/trunk
Commit: 54e54f08077c9d71a5121e640b55836e6f7f2c9b
Parents: 20a31a2
Author: Andrii Biletskyi <an...@stealth.ly>
Authored: Tue Jun 16 14:46:48 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jun 16 14:46:48 2015 -0700

----------------------------------------------------------------------
 .../kafka/common/requests/AbstractRequest.java  | 24 +++----
 .../requests/ConsumerMetadataRequest.java       | 14 +++-
 .../kafka/common/requests/FetchRequest.java     | 18 +++--
 .../kafka/common/requests/HeartbeatRequest.java | 20 ++++--
 .../kafka/common/requests/JoinGroupRequest.java | 26 +++++--
 .../common/requests/JoinGroupResponse.java      |  4 --
 .../common/requests/ListOffsetRequest.java      | 14 +++-
 .../kafka/common/requests/MetadataRequest.java  | 21 +++++-
 .../kafka/common/requests/MetadataResponse.java | 73 +++++++++-----------
 .../common/requests/OffsetCommitRequest.java    | 14 +++-
 .../common/requests/OffsetFetchRequest.java     | 20 ++++--
 .../kafka/common/requests/ProduceRequest.java   | 17 +++--
 .../common/requests/RequestResponseTest.java    | 23 +++---
 .../scala/kafka/network/RequestChannel.scala    |  2 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  2 +-
 15 files changed, 192 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 5e5308e..5d3d528 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -28,33 +28,33 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
     }
 
     /**
-     * Get an error response for a request
+     * Get an error response for a request for a given api version
      */
-    public abstract AbstractRequestResponse getErrorResponse(Throwable e);
+    public abstract AbstractRequestResponse getErrorResponse(int versionId, Throwable e);
 
     /**
      * Factory method for getting a request object based on ApiKey ID and a buffer
      */
-    public static AbstractRequest getRequest(int requestId, ByteBuffer buffer) {
+    public static AbstractRequest getRequest(int requestId, int versionId, ByteBuffer buffer) {
         switch (ApiKeys.forId(requestId)) {
             case PRODUCE:
-                return ProduceRequest.parse(buffer);
+                return ProduceRequest.parse(buffer, versionId);
             case FETCH:
-                return FetchRequest.parse(buffer);
+                return FetchRequest.parse(buffer, versionId);
             case LIST_OFFSETS:
-                return ListOffsetRequest.parse(buffer);
+                return ListOffsetRequest.parse(buffer, versionId);
             case METADATA:
-                return MetadataRequest.parse(buffer);
+                return MetadataRequest.parse(buffer, versionId);
             case OFFSET_COMMIT:
-                return OffsetCommitRequest.parse(buffer);
+                return OffsetCommitRequest.parse(buffer, versionId);
             case OFFSET_FETCH:
-                return OffsetFetchRequest.parse(buffer);
+                return OffsetFetchRequest.parse(buffer, versionId);
             case CONSUMER_METADATA:
-                return ConsumerMetadataRequest.parse(buffer);
+                return ConsumerMetadataRequest.parse(buffer, versionId);
             case JOIN_GROUP:
-                return JoinGroupRequest.parse(buffer);
+                return JoinGroupRequest.parse(buffer, versionId);
             case HEARTBEAT:
-                return HeartbeatRequest.parse(buffer);
+                return HeartbeatRequest.parse(buffer, versionId);
             default:
                 return null;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
index 04b90bf..fef22d7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
@@ -41,14 +41,24 @@ public class ConsumerMetadataRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(Throwable e) {
-        return new ConsumerMetadataResponse(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode());
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        switch(versionId) {
+            case 0:
+                return new ConsumerMetadataResponse(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode());
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.CONSUMER_METADATA.id)));
+        }
     }
 
     public String groupId() {
         return groupId;
     }
 
+    public static ConsumerMetadataRequest parse(ByteBuffer buffer, int versionId) {
+        return new ConsumerMetadataRequest(ProtoUtils.parseRequest(ApiKeys.CONSUMER_METADATA.id, versionId, buffer));
+    }
+
     public static ConsumerMetadataRequest parse(ByteBuffer buffer) {
         return new ConsumerMetadataRequest((Struct) CURRENT_SCHEMA.read(buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 8686d83..4f52c32 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -120,17 +120,23 @@ public class FetchRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(Throwable e) {
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
         Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
 
         for (Map.Entry<TopicPartition, PartitionData> entry: fetchData.entrySet()) {
             FetchResponse.PartitionData partitionResponse = new FetchResponse.PartitionData(Errors.forException(e).code(),
-                                                                                            FetchResponse.INVALID_HIGHWATERMARK,
-                                                                                            FetchResponse.EMPTY_RECORD_SET);
+                    FetchResponse.INVALID_HIGHWATERMARK,
+                    FetchResponse.EMPTY_RECORD_SET);
             responseData.put(entry.getKey(), partitionResponse);
         }
 
-        return new FetchResponse(responseData);
+        switch(versionId) {
+            case 0:
+                return new FetchResponse(responseData);
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.FETCH.id)));
+        }
     }
 
     public int replicaId() {
@@ -149,6 +155,10 @@ public class FetchRequest extends AbstractRequest {
         return fetchData;
     }
 
+    public static FetchRequest parse(ByteBuffer buffer, int versionId) {
+        return new FetchRequest(ProtoUtils.parseRequest(ApiKeys.FETCH.id, versionId, buffer));
+    }
+
     public static FetchRequest parse(ByteBuffer buffer) {
         return new FetchRequest((Struct) CURRENT_SCHEMA.read(buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
index 51d081f..d4d4a35 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
@@ -48,6 +48,17 @@ public class HeartbeatRequest extends AbstractRequest {
         consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
     }
 
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        switch(versionId) {
+            case 0:
+                return new HeartbeatResponse(Errors.forException(e).code());
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.HEARTBEAT.id)));
+        }
+    }
+
     public String groupId() {
         return groupId;
     }
@@ -60,12 +71,11 @@ public class HeartbeatRequest extends AbstractRequest {
         return consumerId;
     }
 
-    public static HeartbeatRequest parse(ByteBuffer buffer) {
-        return new HeartbeatRequest((Struct) CURRENT_SCHEMA.read(buffer));
+    public static HeartbeatRequest parse(ByteBuffer buffer, int versionId) {
+        return new HeartbeatRequest(ProtoUtils.parseRequest(ApiKeys.HEARTBEAT.id, versionId, buffer));
     }
 
-    @Override
-    public AbstractRequestResponse getErrorResponse(Throwable e) {
-        return new HeartbeatResponse(Errors.forException(e).code());
+    public static HeartbeatRequest parse(ByteBuffer buffer) {
+        return new HeartbeatRequest((Struct) CURRENT_SCHEMA.read(buffer));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 6795682..1ffe076 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -12,6 +12,7 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
@@ -20,6 +21,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 public class JoinGroupRequest extends AbstractRequest {
@@ -65,6 +67,21 @@ public class JoinGroupRequest extends AbstractRequest {
         strategy = struct.getString(STRATEGY_KEY_NAME);
     }
 
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        switch (versionId) {
+            case 0:
+                return new JoinGroupResponse(
+                        Errors.forException(e).code(),
+                        JoinGroupResponse.UNKNOWN_GENERATION_ID,
+                        JoinGroupResponse.UNKNOWN_CONSUMER_ID,
+                        Collections.<TopicPartition>emptyList());
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.JOIN_GROUP.id)));
+        }
+    }
+
     public String groupId() {
         return groupId;
     }
@@ -85,12 +102,11 @@ public class JoinGroupRequest extends AbstractRequest {
         return strategy;
     }
 
-    public static JoinGroupRequest parse(ByteBuffer buffer) {
-        return new JoinGroupRequest((Struct) CURRENT_SCHEMA.read(buffer));
+    public static JoinGroupRequest parse(ByteBuffer buffer, int versionId) {
+        return new JoinGroupRequest(ProtoUtils.parseRequest(ApiKeys.JOIN_GROUP.id, versionId, buffer));
     }
 
-    @Override
-    public AbstractRequestResponse getErrorResponse(Throwable e) {
-        return new JoinGroupResponse(Errors.forException(e).code());
+    public static JoinGroupRequest parse(ByteBuffer buffer) {
+        return new JoinGroupRequest((Struct) CURRENT_SCHEMA.read(buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index 8d418cd..7bf544e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -75,10 +75,6 @@ public class JoinGroupResponse extends AbstractRequestResponse {
         this.assignedPartitions = assignedPartitions;
     }
 
-    public JoinGroupResponse(short errorCode) {
-        this(errorCode, UNKNOWN_GENERATION_ID, UNKNOWN_CONSUMER_ID, Collections.<TopicPartition>emptyList());
-    }
-
     public JoinGroupResponse(Struct struct) {
         super(struct);
         assignedPartitions = new ArrayList<TopicPartition>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 19267ee..a1bb2b2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -107,7 +107,7 @@ public class ListOffsetRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(Throwable e) {
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
         Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
 
         for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) {
@@ -115,7 +115,13 @@ public class ListOffsetRequest extends AbstractRequest {
             responseData.put(entry.getKey(), partitionResponse);
         }
 
-        return new ListOffsetResponse(responseData);
+        switch(versionId) {
+            case 0:
+                return new ListOffsetResponse(responseData);
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.LIST_OFFSETS.id)));
+        }
     }
 
     public int replicaId() {
@@ -126,6 +132,10 @@ public class ListOffsetRequest extends AbstractRequest {
         return offsetData;
     }
 
+    public static ListOffsetRequest parse(ByteBuffer buffer, int versionId) {
+        return new ListOffsetRequest(ProtoUtils.parseRequest(ApiKeys.LIST_OFFSETS.id, versionId, buffer));
+    }
+
     public static ListOffsetRequest parse(ByteBuffer buffer) {
         return new ListOffsetRequest((Struct) CURRENT_SCHEMA.read(buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index 7e0ce15..f70e8da 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -18,6 +18,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
@@ -47,18 +50,30 @@ public class MetadataRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(Throwable e) {
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
         Map<String, Errors> topicErrors = new HashMap<String, Errors>();
-        for (String topic: topics) {
+        for (String topic : topics) {
             topicErrors.put(topic, Errors.forException(e));
         }
-        return new MetadataResponse(topicErrors);
+
+        Cluster cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>());
+        switch (versionId) {
+            case 0:
+                return new MetadataResponse(cluster, topicErrors);
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.METADATA.id)));
+        }
     }
 
     public List<String> topics() {
         return topics;
     }
 
+    public static MetadataRequest parse(ByteBuffer buffer, int versionId) {
+        return new MetadataRequest(ProtoUtils.parseRequest(ApiKeys.METADATA.id, versionId, buffer));
+    }
+
     public static MetadataRequest parse(ByteBuffer buffer) {
         return new MetadataRequest((Struct) CURRENT_SCHEMA.read(buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 44e2ce6..c8f2d08 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -28,7 +28,7 @@ import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 public class MetadataResponse extends AbstractRequestResponse {
-    
+
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.METADATA.id);
     private static final String BROKERS_KEY_NAME = "brokers";
     private static final String TOPIC_METATDATA_KEY_NAME = "topic_metadata";
@@ -67,30 +67,15 @@ public class MetadataResponse extends AbstractRequestResponse {
     private final Cluster cluster;
     private final Map<String, Errors> errors;
 
-    /* Constructor for error responses where most of the data, except error per topic, is irrelevant */
-    public MetadataResponse(Map<String, Errors> topicErrors) {
-        super(new Struct(CURRENT_SCHEMA));
-
-        struct.set(BROKERS_KEY_NAME, new ArrayList<Struct>().toArray());
-        List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, Errors> topicError : topicErrors.entrySet()) {
-            Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME);
-            topicData.set(TOPIC_ERROR_CODE_KEY_NAME, topicError.getValue().code());
-            topicData.set(TOPIC_KEY_NAME, topicError.getKey());
-            topicData.set(PARTITION_METADATA_KEY_NAME, new ArrayList<Struct>().toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(TOPIC_METATDATA_KEY_NAME, topicArray.toArray());
-
-        this.errors = topicErrors;
-        this.cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>());
-    }
-
-    public MetadataResponse(Cluster cluster) {
+    /**
+     * Constructor for a MetadataResponse in which some of the topics may have errors.
+     * For any topic present in {@code errors}, the error takes precedence over the cluster information.
+     */
+    public MetadataResponse(Cluster cluster, Map<String, Errors> errors) {
         super(new Struct(CURRENT_SCHEMA));
 
         List<Struct> brokerArray = new ArrayList<Struct>();
-        for (Node node: cluster.nodes()) {
+        for (Node node : cluster.nodes()) {
             Struct broker = struct.instance(BROKERS_KEY_NAME);
             broker.set(NODE_ID_KEY_NAME, node.id());
             broker.set(HOST_KEY_NAME, node.host());
@@ -100,27 +85,33 @@ public class MetadataResponse extends AbstractRequestResponse {
         struct.set(BROKERS_KEY_NAME, brokerArray.toArray());
 
         List<Struct> topicArray = new ArrayList<Struct>();
-        for (String topic: cluster.topics()) {
+        for (String topic : cluster.topics()) {
             Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME);
-            topicData.set(TOPIC_ERROR_CODE_KEY_NAME, Errors.NONE.code());
+
             topicData.set(TOPIC_KEY_NAME, topic);
-            List<Struct> partitionArray = new ArrayList<Struct>();
-            for (PartitionInfo fetchPartitionData : cluster.partitionsForTopic(topic)) {
-                Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME);
-                partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, Errors.NONE.code());
-                partitionData.set(PARTITION_KEY_NAME, fetchPartitionData.partition());
-                partitionData.set(LEADER_KEY_NAME, fetchPartitionData.leader().id());
-                ArrayList<Integer> replicas = new ArrayList<Integer>();
-                for (Node node: fetchPartitionData.replicas())
-                    replicas.add(node.id());
-                partitionData.set(REPLICAS_KEY_NAME, replicas.toArray());
-                ArrayList<Integer> isr = new ArrayList<Integer>();
-                for (Node node: fetchPartitionData.inSyncReplicas())
-                    isr.add(node.id());
-                partitionData.set(ISR_KEY_NAME, isr.toArray());
-                partitionArray.add(partitionData);
+            if (errors.containsKey(topic)) {
+                topicData.set(TOPIC_ERROR_CODE_KEY_NAME, errors.get(topic).code());
+            } else {
+                topicData.set(TOPIC_ERROR_CODE_KEY_NAME, Errors.NONE.code());
+                List<Struct> partitionArray = new ArrayList<Struct>();
+                for (PartitionInfo fetchPartitionData : cluster.partitionsForTopic(topic)) {
+                    Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME);
+                    partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, Errors.NONE.code());
+                    partitionData.set(PARTITION_KEY_NAME, fetchPartitionData.partition());
+                    partitionData.set(LEADER_KEY_NAME, fetchPartitionData.leader().id());
+                    ArrayList<Integer> replicas = new ArrayList<Integer>();
+                    for (Node node : fetchPartitionData.replicas())
+                        replicas.add(node.id());
+                    partitionData.set(REPLICAS_KEY_NAME, replicas.toArray());
+                    ArrayList<Integer> isr = new ArrayList<Integer>();
+                    for (Node node : fetchPartitionData.inSyncReplicas())
+                        isr.add(node.id());
+                    partitionData.set(ISR_KEY_NAME, isr.toArray());
+                    partitionArray.add(partitionData);
+                }
+                topicData.set(PARTITION_METADATA_KEY_NAME, partitionArray.toArray());
             }
-            topicData.set(PARTITION_METADATA_KEY_NAME, partitionArray.toArray());
+
             topicArray.add(topicData);
         }
         struct.set(TOPIC_METATDATA_KEY_NAME, topicArray.toArray());
@@ -183,4 +174,4 @@ public class MetadataResponse extends AbstractRequestResponse {
     public static MetadataResponse parse(ByteBuffer buffer) {
         return new MetadataResponse((Struct) CURRENT_SCHEMA.read(buffer));
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 8bf6cbb..d6e6386 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -217,12 +217,22 @@ public class OffsetCommitRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(Throwable e) {
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
         Map<TopicPartition, Short> responseData = new HashMap<TopicPartition, Short>();
         for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) {
             responseData.put(entry.getKey(), Errors.forException(e).code());
         }
-        return new OffsetCommitResponse(responseData);
+
+        switch (versionId) {
+            // OffsetCommitResponseV0 == OffsetCommitResponseV1 == OffsetCommitResponseV2
+            case 0:
+            case 1:
+            case 2:
+                return new OffsetCommitResponse(responseData);
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.OFFSET_COMMIT.id)));
+        }
     }
 
     public String groupId() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index deec1fa..b5e8a0f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -88,16 +88,24 @@ public class OffsetFetchRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(Throwable e) {
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
         Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<TopicPartition, OffsetFetchResponse.PartitionData>();
 
         for (TopicPartition partition: partitions) {
             responseData.put(partition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
-                                                                              OffsetFetchResponse.NO_METADATA,
-                                                                              Errors.forException(e).code()));
+                    OffsetFetchResponse.NO_METADATA,
+                    Errors.forException(e).code()));
         }
 
-        return new OffsetFetchResponse(responseData);
+        switch (versionId) {
+            // OffsetFetchResponseV0 == OffsetFetchResponseV1
+            case 0:
+            case 1:
+                return new OffsetFetchResponse(responseData);
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.OFFSET_FETCH.id)));
+        }
     }
 
     public String groupId() {
@@ -108,6 +116,10 @@ public class OffsetFetchRequest extends AbstractRequest {
         return partitions;
     }
 
+    public static OffsetFetchRequest parse(ByteBuffer buffer, int versionId) {
+        return new OffsetFetchRequest(ProtoUtils.parseRequest(ApiKeys.OFFSET_FETCH.id, versionId, buffer));
+    }
+
     public static OffsetFetchRequest parse(ByteBuffer buffer) {
         return new OffsetFetchRequest((Struct) CURRENT_SCHEMA.read(buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index fabeae3..715504b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -90,19 +90,24 @@ public class ProduceRequest  extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(Throwable e) {
-
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
         /* In case the producer doesn't actually want any response */
         if (acks == 0)
             return null;
 
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
 
-        for (Map.Entry<TopicPartition, ByteBuffer> entry: partitionRecords.entrySet()) {
+        for (Map.Entry<TopicPartition, ByteBuffer> entry : partitionRecords.entrySet()) {
             responseMap.put(entry.getKey(), new ProduceResponse.PartitionResponse(Errors.forException(e).code(), ProduceResponse.INVALID_OFFSET));
         }
 
-        return new ProduceResponse(responseMap);
+        switch (versionId) {
+            case 0:
+                return new ProduceResponse(responseMap);
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.PRODUCE.id)));
+        }
     }
 
     public short acks() {
@@ -117,6 +122,10 @@ public class ProduceRequest  extends AbstractRequest {
         return partitionRecords;
     }
 
+    public static ProduceRequest parse(ByteBuffer buffer, int versionId) {
+        return new ProduceRequest(ProtoUtils.parseRequest(ApiKeys.PRODUCE.id, versionId, buffer));
+    }
+
     public static ProduceRequest parse(ByteBuffer buffer) {
         return new ProduceRequest((Struct) CURRENT_SCHEMA.read(buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index e3cc196..8b2aca8 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -38,31 +38,31 @@ public class RequestResponseTest {
                 createRequestHeader(),
                 createResponseHeader(),
                 createConsumerMetadataRequest(),
-                createConsumerMetadataRequest().getErrorResponse(new UnknownServerException()),
+                createConsumerMetadataRequest().getErrorResponse(0, new UnknownServerException()),
                 createConsumerMetadataResponse(),
                 createFetchRequest(),
-                createFetchRequest().getErrorResponse(new UnknownServerException()),
+                createFetchRequest().getErrorResponse(0, new UnknownServerException()),
                 createFetchResponse(),
                 createHeartBeatRequest(),
-                createHeartBeatRequest().getErrorResponse(new UnknownServerException()),
+                createHeartBeatRequest().getErrorResponse(0, new UnknownServerException()),
                 createHeartBeatResponse(),
                 createJoinGroupRequest(),
-                createJoinGroupRequest().getErrorResponse(new UnknownServerException()),
+                createJoinGroupRequest().getErrorResponse(0, new UnknownServerException()),
                 createJoinGroupResponse(),
                 createListOffsetRequest(),
-                createListOffsetRequest().getErrorResponse(new UnknownServerException()),
+                createListOffsetRequest().getErrorResponse(0, new UnknownServerException()),
                 createListOffsetResponse(),
                 createMetadataRequest(),
-                createMetadataRequest().getErrorResponse(new UnknownServerException()),
+                createMetadataRequest().getErrorResponse(0, new UnknownServerException()),
                 createMetadataResponse(),
                 createOffsetCommitRequest(),
-                createOffsetCommitRequest().getErrorResponse(new UnknownServerException()),
+                createOffsetCommitRequest().getErrorResponse(0, new UnknownServerException()),
                 createOffsetCommitResponse(),
                 createOffsetFetchRequest(),
-                createOffsetFetchRequest().getErrorResponse(new UnknownServerException()),
+                createOffsetFetchRequest().getErrorResponse(0, new UnknownServerException()),
                 createOffsetFetchResponse(),
                 createProduceRequest(),
-                createProduceRequest().getErrorResponse(new UnknownServerException()),
+                createProduceRequest().getErrorResponse(0, new UnknownServerException()),
                 createProduceResponse());
 
         for (AbstractRequestResponse req: requestResponseList) {
@@ -145,7 +145,10 @@ public class RequestResponseTest {
         Node[] isr = new Node[1];
         isr[0] = node;
         Cluster cluster = new Cluster(Arrays.asList(node), Arrays.asList(new PartitionInfo("topic1", 1, node, replicas, isr)));
-        return new MetadataResponse(cluster);
+
+        Map<String, Errors> errors = new HashMap<String, Errors>();
+        errors.put("topic2", Errors.LEADER_NOT_AVAILABLE);
+        return new MetadataResponse(cluster, errors);
     }
 
     private AbstractRequest createOffsetCommitRequest() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 357d8b9..2074128 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -66,7 +66,7 @@ object RequestChannel extends Logging {
         null
     val body: AbstractRequest =
       if (requestObj == null)
-        AbstractRequest.getRequest(header.apiKey, buffer)
+        AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
       else
         null
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d63bc18..c7debe4 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -74,7 +74,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         if ( request.requestObj != null)
           request.requestObj.handleError(e, requestChannel, request)
         else {
-          val response = request.body.getErrorResponse(e)
+          val response = request.body.getErrorResponse(request.header.apiVersion, e)
           val respHeader = new ResponseHeader(request.header.correlationId)
 
           /* If request doesn't have a default error response, we just close the connection.