Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/06 01:00:16 UTC

[19/37] kafka-1462; Add new request and response formats for the new consumer and coordinator communication; patched by Jun Rao; reviewed by Guozhang Wang and Jay Kreps

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 2652c32..7d90fce 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -12,6 +12,7 @@
  */
 package org.apache.kafka.common.requests;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -20,50 +21,112 @@ import java.util.Map;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
-public class MetadataResponse {
+public class MetadataResponse extends AbstractRequestResponse {
+    private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.METADATA.id);
+    private static String BROKERS_KEY_NAME = "brokers";
+    private static String TOPIC_METADATA_KEY_NAME = "topic_metadata";
+
+    // broker level field names
+    private static String NODE_ID_KEY_NAME = "node_id";
+    private static String HOST_KEY_NAME = "host";
+    private static String PORT_KEY_NAME = "port";
+
+    // topic level field names
+    private static String TOPIC_ERROR_CODE_KEY_NAME = "topic_error_code";
+    private static String TOPIC_KEY_NAME = "topic";
+    private static String PARTITION_METADATA_KEY_NAME = "partition_metadata";
+
+    // partition level field names
+    private static String PARTITION_ERROR_CODE_KEY_NAME = "partition_error_code";
+    private static String PARTITION_KEY_NAME = "partition_id";
+    private static String LEADER_KEY_NAME = "leader";
+    private static String REPLICAS_KEY_NAME = "replicas";
+    private static String ISR_KEY_NAME = "isr";
 
     private final Cluster cluster;
     private final Map<String, Errors> errors;
 
-    public MetadataResponse(Cluster cluster, Map<String, Errors> errors) {
+    public MetadataResponse(Cluster cluster) {
+        super(new Struct(curSchema));
+
+        List<Struct> brokerArray = new ArrayList<Struct>();
+        for (Node node: cluster.nodes()) {
+            Struct broker = struct.instance(BROKERS_KEY_NAME);
+            broker.set(NODE_ID_KEY_NAME, node.id());
+            broker.set(HOST_KEY_NAME, node.host());
+            broker.set(PORT_KEY_NAME, node.port());
+            brokerArray.add(broker);
+        }
+        struct.set(BROKERS_KEY_NAME, brokerArray.toArray());
+
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (String topic: cluster.topics()) {
+            Struct topicData = struct.instance(TOPIC_METADATA_KEY_NAME);
+            topicData.set(TOPIC_ERROR_CODE_KEY_NAME, (short)0);  // no error
+            topicData.set(TOPIC_KEY_NAME, topic);
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (PartitionInfo fetchPartitionData : cluster.partitionsForTopic(topic)) {
+                Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME);
+                partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, (short)0);  // no error
+                partitionData.set(PARTITION_KEY_NAME, fetchPartitionData.partition());
+                partitionData.set(LEADER_KEY_NAME, fetchPartitionData.leader().id());
+                ArrayList<Integer> replicas = new ArrayList<Integer>();
+                for (Node node: fetchPartitionData.replicas())
+                    replicas.add(node.id());
+                partitionData.set(REPLICAS_KEY_NAME, replicas.toArray());
+                ArrayList<Integer> isr = new ArrayList<Integer>();
+                for (Node node: fetchPartitionData.inSyncReplicas())
+                    isr.add(node.id());
+                partitionData.set(ISR_KEY_NAME, isr.toArray());
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITION_METADATA_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(TOPIC_METADATA_KEY_NAME, topicArray.toArray());
+
         this.cluster = cluster;
-        this.errors = errors;
+        this.errors = new HashMap<String, Errors>();
     }
 
     public MetadataResponse(Struct struct) {
+        super(struct);
         Map<String, Errors> errors = new HashMap<String, Errors>();
         Map<Integer, Node> brokers = new HashMap<Integer, Node>();
-        Object[] brokerStructs = (Object[]) struct.get("brokers");
+        Object[] brokerStructs = (Object[]) struct.get(BROKERS_KEY_NAME);
         for (int i = 0; i < brokerStructs.length; i++) {
             Struct broker = (Struct) brokerStructs[i];
-            int nodeId = (Integer) broker.get("node_id");
-            String host = (String) broker.get("host");
-            int port = (Integer) broker.get("port");
+            int nodeId = broker.getInt(NODE_ID_KEY_NAME);
+            String host = broker.getString(HOST_KEY_NAME);
+            int port = broker.getInt(PORT_KEY_NAME);
             brokers.put(nodeId, new Node(nodeId, host, port));
         }
         List<PartitionInfo> partitions = new ArrayList<PartitionInfo>();
-        Object[] topicInfos = (Object[]) struct.get("topic_metadata");
+        Object[] topicInfos = (Object[]) struct.get(TOPIC_METADATA_KEY_NAME);
         for (int i = 0; i < topicInfos.length; i++) {
             Struct topicInfo = (Struct) topicInfos[i];
-            short topicError = topicInfo.getShort("topic_error_code");
-            String topic = topicInfo.getString("topic");
+            short topicError = topicInfo.getShort(TOPIC_ERROR_CODE_KEY_NAME);
+            String topic = topicInfo.getString(TOPIC_KEY_NAME);
             if (topicError == Errors.NONE.code()) {
-                Object[] partitionInfos = (Object[]) topicInfo.get("partition_metadata");
+                Object[] partitionInfos = (Object[]) topicInfo.get(PARTITION_METADATA_KEY_NAME);
                 for (int j = 0; j < partitionInfos.length; j++) {
                     Struct partitionInfo = (Struct) partitionInfos[j];
-                    short partError = partitionInfo.getShort("partition_error_code");
+                    short partError = partitionInfo.getShort(PARTITION_ERROR_CODE_KEY_NAME);
                     if (partError == Errors.NONE.code()) {
-                        int partition = partitionInfo.getInt("partition_id");
-                        int leader = partitionInfo.getInt("leader");
+                        int partition = partitionInfo.getInt(PARTITION_KEY_NAME);
+                        int leader = partitionInfo.getInt(LEADER_KEY_NAME);
                         Node leaderNode = leader == -1 ? null : brokers.get(leader);
-                        Object[] replicas = (Object[]) partitionInfo.get("replicas");
+                        Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME);
                         Node[] replicaNodes = new Node[replicas.length];
                         for (int k = 0; k < replicas.length; k++)
                             replicaNodes[k] = brokers.get(replicas[k]);
-                        Object[] isr = (Object[]) partitionInfo.get("isr");
+                        Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME);
                         Node[] isrNodes = new Node[isr.length];
                         for (int k = 0; k < isr.length; k++)
                             isrNodes[k] = brokers.get(isr[k]);
@@ -86,4 +149,7 @@ public class MetadataResponse {
         return this.cluster;
     }
 
+    public static MetadataResponse parse(ByteBuffer buffer) {
+        return new MetadataResponse(((Struct) curSchema.read(buffer)));
+    }
 }
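
For reference, a minimal sketch of the round trip the schema-backed wrapper supports, mirroring the serialization test below (the Cluster instance is assumed to be in hand; it is not constructed in this patch):

    MetadataResponse response = new MetadataResponse(cluster);
    ByteBuffer buffer = ByteBuffer.allocate(response.sizeOf());
    response.writeTo(buffer);
    buffer.rewind();
    MetadataResponse parsed = MetadataResponse.parse(buffer);  // reads with the current schema

sizeOf() and writeTo() come from AbstractRequestResponse, which wraps the underlying Struct.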

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
new file mode 100644
index 0000000..3ee5cba
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This wrapper supports both v0 and v1 of OffsetCommitRequest.
+ */
+public class OffsetCommitRequest extends AbstractRequestResponse {
+    public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_COMMIT.id);
+    private static String GROUP_ID_KEY_NAME = "group_id";
+    private static String GENERATION_ID_KEY_NAME = "group_generation_id";
+    private static String CONSUMER_ID_KEY_NAME = "consumer_id";
+    private static String TOPICS_KEY_NAME = "topics";
+
+    // topic level field names
+    private static String TOPIC_KEY_NAME = "topic";
+    private static String PARTITIONS_KEY_NAME = "partitions";
+
+    // partition level field names
+    private static String PARTITION_KEY_NAME = "partition";
+    private static String COMMIT_OFFSET_KEY_NAME = "offset";
+    private static String TIMESTAMP_KEY_NAME = "timestamp";
+    private static String METADATA_KEY_NAME = "metadata";
+
+    public static final int DEFAULT_GENERATION_ID = -1;
+    public static final String DEFAULT_CONSUMER_ID = "";
+
+    private final String groupId;
+    private final int generationId;
+    private final String consumerId;
+    private final Map<TopicPartition, PartitionData> offsetData;
+
+    public static final class PartitionData {
+        public final long offset;
+        public final long timestamp;
+        public final String metadata;
+
+        public PartitionData(long offset, long timestamp, String metadata) {
+            this.offset = offset;
+            this.timestamp = timestamp;
+            this.metadata = metadata;
+        }
+    }
+
+    /**
+     * Constructor for version 0.
+     * @param groupId consumer group id
+     * @param offsetData offset and metadata to commit, keyed by partition
+     */
+    @Deprecated
+    public OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 0)));
+        initCommonFields(groupId, offsetData);
+        this.groupId = groupId;
+        this.generationId = DEFAULT_GENERATION_ID;
+        this.consumerId = DEFAULT_CONSUMER_ID;
+        this.offsetData = offsetData;
+    }
+
+    /**
+     * Constructor for version 1.
+     * @param groupId consumer group id
+     * @param generationId generation of the consumer group
+     * @param consumerId consumer id assigned by the coordinator
+     * @param offsetData offset and metadata to commit, keyed by partition
+     */
+    public OffsetCommitRequest(String groupId, int generationId, String consumerId, Map<TopicPartition, PartitionData> offsetData) {
+        super(new Struct(curSchema));
+
+        initCommonFields(groupId, offsetData);
+        struct.set(GENERATION_ID_KEY_NAME, generationId);
+        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+        this.groupId = groupId;
+        this.generationId = generationId;
+        this.consumerId = consumerId;
+        this.offsetData = offsetData;
+    }
+
+    private void initCommonFields(String groupId, Map<TopicPartition, PartitionData> offsetData) {
+        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(offsetData);
+
+        struct.set(GROUP_ID_KEY_NAME, groupId);
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
+            Struct topicData = struct.instance(TOPICS_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
+                PartitionData fetchPartitionData = partitionEntry.getValue();
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset);
+                partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp);
+                partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata);
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(TOPICS_KEY_NAME, topicArray.toArray());
+    }
+
+    public OffsetCommitRequest(Struct struct) {
+        super(struct);
+        offsetData = new HashMap<TopicPartition, PartitionData>();
+        for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
+            Struct topicResponse = (Struct) topicResponseObj;
+            String topic = topicResponse.getString(TOPIC_KEY_NAME);
+            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionResponse = (Struct) partitionResponseObj;
+                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+                long offset = partitionResponse.getLong(COMMIT_OFFSET_KEY_NAME);
+                long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME);
+                String metadata = partitionResponse.getString(METADATA_KEY_NAME);
+                PartitionData partitionData = new PartitionData(offset, timestamp, metadata);
+                offsetData.put(new TopicPartition(topic, partition), partitionData);
+            }
+        }
+        groupId = struct.getString(GROUP_ID_KEY_NAME);
+        // This field only exists in v1.
+        if (struct.hasField(GENERATION_ID_KEY_NAME))
+            generationId = struct.getInt(GENERATION_ID_KEY_NAME);
+        else
+            generationId = DEFAULT_GENERATION_ID;
+
+        // This field only exists in v1.
+        if (struct.hasField(CONSUMER_ID_KEY_NAME))
+            consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+        else
+            consumerId = DEFAULT_CONSUMER_ID;
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+    public int generationId() {
+        return generationId;
+    }
+
+    public String consumerId() {
+        return consumerId;
+    }
+
+    public Map<TopicPartition, PartitionData> offsetData() {
+        return offsetData;
+    }
+
+    public static OffsetCommitRequest parse(ByteBuffer buffer, int versionId) {
+        Schema schema = ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, versionId);
+        return new OffsetCommitRequest(((Struct) schema.read(buffer)));
+    }
+
+    public static OffsetCommitRequest parse(ByteBuffer buffer) {
+        return new OffsetCommitRequest(((Struct) curSchema.read(buffer)));
+    }
+}
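
A usage sketch for the v1 constructor (group, consumer, and topic names are hypothetical, not part of the patch):

    Map<TopicPartition, OffsetCommitRequest.PartitionData> offsets =
        new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>();
    offsets.put(new TopicPartition("test-topic", 0),
        new OffsetCommitRequest.PartitionData(42L, System.currentTimeMillis(), ""));
    OffsetCommitRequest request =
        new OffsetCommitRequest("test-group", 1, "consumer-1", offsets);

The deprecated v0 constructor builds against requestSchema(ApiKeys.OFFSET_COMMIT.id, 0), whose schema simply lacks the generation id and consumer id fields; parse(buffer, versionId) selects the matching schema on the read side.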

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
new file mode 100644
index 0000000..711232a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class OffsetCommitResponse extends AbstractRequestResponse {
+    public static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_COMMIT.id);
+    private static String RESPONSES_KEY_NAME = "responses";
+
+    // topic level fields
+    private static String TOPIC_KEY_NAME = "topic";
+    private static String PARTITIONS_KEY_NAME = "partition_responses";
+
+    // partition level fields
+    private static String PARTITION_KEY_NAME = "partition";
+    private static String ERROR_CODE_KEY_NAME = "error_code";
+
+    private final Map<TopicPartition, Short> responseData;
+
+    public OffsetCommitResponse(Map<TopicPartition, Short> responseData) {
+        super(new Struct(curSchema));
+
+        Map<String, Map<Integer, Short>> topicsData = CollectionUtils.groupDataByTopic(responseData);
+
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (Map.Entry<String, Map<Integer, Short>> entries: topicsData.entrySet()) {
+            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, entries.getKey());
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (Map.Entry<Integer, Short> partitionEntry : entries.getValue().entrySet()) {
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionData.set(ERROR_CODE_KEY_NAME, partitionEntry.getValue());
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
+        this.responseData = responseData;
+    }
+
+    public OffsetCommitResponse(Struct struct) {
+        super(struct);
+        responseData = new HashMap<TopicPartition, Short>();
+        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
+            Struct topicResponse = (Struct) topicResponseObj;
+            String topic = topicResponse.getString(TOPIC_KEY_NAME);
+            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionResponse = (Struct) partitionResponseObj;
+                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+                short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
+                responseData.put(new TopicPartition(topic, partition), errorCode);
+            }
+        }
+    }
+
+    public Map<TopicPartition, Short> responseData() {
+        return responseData;
+    }
+
+    public static OffsetCommitResponse parse(ByteBuffer buffer) {
+        return new OffsetCommitResponse(((Struct) curSchema.read(buffer)));
+    }
+}
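
A minimal sketch of building the response on the serving side (topic name hypothetical; Errors is org.apache.kafka.common.protocol.Errors):

    Map<TopicPartition, Short> errors = new HashMap<TopicPartition, Short>();
    errors.put(new TopicPartition("test-topic", 0), Errors.NONE.code());
    OffsetCommitResponse response = new OffsetCommitResponse(errors);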

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
new file mode 100644
index 0000000..90d5135
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This wrapper supports both v0 and v1 of OffsetFetchRequest.
+ */
+public class OffsetFetchRequest extends AbstractRequestResponse {
+    public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_FETCH.id);
+    private static String GROUP_ID_KEY_NAME = "group_id";
+    private static String TOPICS_KEY_NAME = "topics";
+
+    // topic level field names
+    private static String TOPIC_KEY_NAME = "topic";
+    private static String PARTITIONS_KEY_NAME = "partitions";
+
+    // partition level field names
+    private static String PARTITION_KEY_NAME = "partition";
+
+    public static final int DEFAULT_GENERATION_ID = -1;
+    public static final String DEFAULT_CONSUMER_ID = "";
+
+    private final String groupId;
+    private final List<TopicPartition> partitions;
+
+    public OffsetFetchRequest(String groupId, List<TopicPartition> partitions) {
+        super(new Struct(curSchema));
+
+        Map<String, List<Integer>> topicsData = CollectionUtils.groupDataByTopic(partitions);
+
+        struct.set(GROUP_ID_KEY_NAME, groupId);
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (Map.Entry<String, List<Integer>> entries: topicsData.entrySet()) {
+            Struct topicData = struct.instance(TOPICS_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, entries.getKey());
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (Integer partitionId : entries.getValue()) {
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                partitionData.set(PARTITION_KEY_NAME, partitionId);
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(TOPICS_KEY_NAME, topicArray.toArray());
+        this.groupId = groupId;
+        this.partitions = partitions;
+    }
+
+    public OffsetFetchRequest(Struct struct) {
+        super(struct);
+        partitions = new ArrayList<TopicPartition>();
+        for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
+            Struct topicResponse = (Struct) topicResponseObj;
+            String topic = topicResponse.getString(TOPIC_KEY_NAME);
+            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionResponse = (Struct) partitionResponseObj;
+                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+                partitions.add(new TopicPartition(topic, partition));
+            }
+        }
+        groupId = struct.getString(GROUP_ID_KEY_NAME);
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+    public List<TopicPartition> partitions() {
+        return partitions;
+    }
+
+    public static OffsetFetchRequest parse(ByteBuffer buffer) {
+        return new OffsetFetchRequest(((Struct) curSchema.read(buffer)));
+    }
+}
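
A usage sketch (group and topic names hypothetical):

    List<TopicPartition> partitions = Arrays.asList(
        new TopicPartition("test-topic", 0),
        new TopicPartition("test-topic", 1));
    OffsetFetchRequest request = new OffsetFetchRequest("test-group", partitions);

CollectionUtils.groupDataByTopic(partitions) collapses the flat partition list into the nested topic/partition array layout the wire format expects.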

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
new file mode 100644
index 0000000..6b7c269
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class OffsetFetchResponse extends AbstractRequestResponse {
+    public static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_FETCH.id);
+    private static String RESPONSES_KEY_NAME = "responses";
+
+    // topic level fields
+    private static String TOPIC_KEY_NAME = "topic";
+    private static String PARTITIONS_KEY_NAME = "partition_responses";
+
+    // partition level fields
+    private static String PARTITION_KEY_NAME = "partition";
+    private static String COMMIT_OFFSET_KEY_NAME = "offset";
+    private static String METADATA_KEY_NAME = "metadata";
+    private static String ERROR_CODE_KEY_NAME = "error_code";
+
+    private final Map<TopicPartition, PartitionData> responseData;
+
+    public static final class PartitionData {
+        public final long offset;
+        public final String metadata;
+        public final short errorCode;
+
+        public PartitionData(long offset, String metadata, short errorCode) {
+            this.offset = offset;
+            this.metadata = metadata;
+            this.errorCode = errorCode;
+        }
+    }
+
+    public OffsetFetchResponse(Map<TopicPartition, PartitionData> responseData) {
+        super(new Struct(curSchema));
+
+        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
+
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (Map.Entry<String, Map<Integer, PartitionData>> entries: topicsData.entrySet()) {
+            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, entries.getKey());
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (Map.Entry<Integer, PartitionData> partitionEntry : entries.getValue().entrySet()) {
+                PartitionData fetchPartitionData = partitionEntry.getValue();
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset);
+                partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata);
+                partitionData.set(ERROR_CODE_KEY_NAME, fetchPartitionData.errorCode);
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
+        this.responseData = responseData;
+    }
+
+    public OffsetFetchResponse(Struct struct) {
+        super(struct);
+        responseData = new HashMap<TopicPartition, PartitionData>();
+        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
+            Struct topicResponse = (Struct) topicResponseObj;
+            String topic = topicResponse.getString(TOPIC_KEY_NAME);
+            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionResponse = (Struct) partitionResponseObj;
+                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+                long offset = partitionResponse.getLong(COMMIT_OFFSET_KEY_NAME);
+                String metadata = partitionResponse.getString(METADATA_KEY_NAME);
+                short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
+                PartitionData partitionData = new PartitionData(offset, metadata, errorCode);
+                responseData.put(new TopicPartition(topic, partition), partitionData);
+            }
+        }
+    }
+
+    public Map<TopicPartition, PartitionData> responseData() {
+        return responseData;
+    }
+
+    public static OffsetFetchResponse parse(ByteBuffer buffer) {
+        return new OffsetFetchResponse(((Struct) curSchema.read(buffer)));
+    }
+}
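
A sketch of the per-partition payload (offset, metadata, and topic name are hypothetical values):

    Map<TopicPartition, OffsetFetchResponse.PartitionData> data =
        new HashMap<TopicPartition, OffsetFetchResponse.PartitionData>();
    data.put(new TopicPartition("test-topic", 0),
        new OffsetFetchResponse.PartitionData(42L, "", Errors.NONE.code()));
    OffsetFetchResponse response = new OffsetFetchResponse(data);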

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 6036f6a..3dbba8a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -1,71 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.record.MemoryRecords;
+public class ProduceRequest extends AbstractRequestResponse {
+    public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id);
+    private static String ACKS_KEY_NAME = "acks";
+    private static String TIMEOUT_KEY_NAME = "timeout";
+    private static String TOPIC_DATA_KEY_NAME = "topic_data";
+
+    // topic level field names
+    private static String TOPIC_KEY_NAME = "topic";
+    private static String PARTITION_DATA_KEY_NAME = "data";
 
-public class ProduceRequest {
+    // partition level field names
+    private static String PARTITION_KEY_NAME = "partition";
+    private static String RECORD_SET_KEY_NAME = "record_set";
 
     private final short acks;
     private final int timeout;
-    private final Map<String, List<PartitionRecords>> records;
+    private final Map<TopicPartition, ByteBuffer> partitionRecords;
 
-    public ProduceRequest(short acks, int timeout) {
+    public ProduceRequest(short acks, int timeout, Map<TopicPartition, ByteBuffer> partitionRecords) {
+        super(new Struct(curSchema));
+        Map<String, Map<Integer, ByteBuffer>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords);
+        struct.set(ACKS_KEY_NAME, acks);
+        struct.set(TIMEOUT_KEY_NAME, timeout);
+        List<Struct> topicDatas = new ArrayList<Struct>(recordsByTopic.size());
+        for (Map.Entry<String, Map<Integer, ByteBuffer>> entry : recordsByTopic.entrySet()) {
+            Struct topicData = struct.instance(TOPIC_DATA_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, entry.getKey());
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (Map.Entry<Integer, ByteBuffer> partitionEntry : entry.getValue().entrySet()) {
+                ByteBuffer buffer = partitionEntry.getValue().duplicate();
+                Struct part = topicData.instance(PARTITION_DATA_KEY_NAME)
+                                       .set(PARTITION_KEY_NAME, partitionEntry.getKey())
+                                       .set(RECORD_SET_KEY_NAME, buffer);
+                partitionArray.add(part);
+            }
+            topicData.set(PARTITION_DATA_KEY_NAME, partitionArray.toArray());
+            topicDatas.add(topicData);
+        }
+        struct.set(TOPIC_DATA_KEY_NAME, topicDatas.toArray());
         this.acks = acks;
         this.timeout = timeout;
-        this.records = new HashMap<String, List<PartitionRecords>>();
+        this.partitionRecords = partitionRecords;
     }
 
-    public void add(TopicPartition tp, MemoryRecords recs) {
-        List<PartitionRecords> found = this.records.get(tp.topic());
-        if (found == null) {
-            found = new ArrayList<PartitionRecords>();
-            records.put(tp.topic(), found);
+    public ProduceRequest(Struct struct) {
+        super(struct);
+        partitionRecords = new HashMap<TopicPartition, ByteBuffer>();
+        for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) {
+            Struct topicData = (Struct) topicDataObj;
+            String topic = topicData.getString(TOPIC_KEY_NAME);
+            for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) {
+                Struct partitionResponse = (Struct) partitionResponseObj;
+                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+                ByteBuffer records = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
+                partitionRecords.put(new TopicPartition(topic, partition), records);
+            }
         }
-        found.add(new PartitionRecords(tp, recs));
+        acks = struct.getShort(ACKS_KEY_NAME);
+        timeout = struct.getInt(TIMEOUT_KEY_NAME);
     }
 
-    public Struct toStruct() {
-        Struct produce = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id));
-        produce.set("acks", acks);
-        produce.set("timeout", timeout);
-        List<Struct> topicDatas = new ArrayList<Struct>(records.size());
-        for (Map.Entry<String, List<PartitionRecords>> entry : records.entrySet()) {
-            Struct topicData = produce.instance("topic_data");
-            topicData.set("topic", entry.getKey());
-            List<PartitionRecords> parts = entry.getValue();
-            Object[] partitionData = new Object[parts.size()];
-            for (int i = 0; i < parts.size(); i++) {
-                ByteBuffer buffer = parts.get(i).records.buffer();
-                buffer.flip();
-                Struct part = topicData.instance("data")
-                                       .set("partition", parts.get(i).topicPartition.partition())
-                                       .set("record_set", buffer);
-                partitionData[i] = part;
-            }
-            topicData.set("data", partitionData);
-            topicDatas.add(topicData);
-        }
-        produce.set("topic_data", topicDatas.toArray());
-        return produce;
+    public short acks() {
+        return acks;
     }
 
-    private static final class PartitionRecords {
-        public final TopicPartition topicPartition;
-        public final MemoryRecords records;
+    public int timeout() {
+        return timeout;
+    }
 
-        public PartitionRecords(TopicPartition topicPartition, MemoryRecords records) {
-            this.topicPartition = topicPartition;
-            this.records = records;
-        }
+    public Map<TopicPartition, ByteBuffer> partitionRecords() {
+        return partitionRecords;
     }
 
+    public static ProduceRequest parse(ByteBuffer buffer) {
+        return new ProduceRequest(((Struct) curSchema.read(buffer)));
+    }
 }
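
Note the interface change: instead of incrementally calling add(tp, memoryRecords), callers now pass the complete map of serialized record sets up front, e.g. (topic name hypothetical; recordSetBuffer is an already-serialized record set assumed to be in hand):

    Map<TopicPartition, ByteBuffer> records = new HashMap<TopicPartition, ByteBuffer>();
    records.put(new TopicPartition("test-topic", 0), recordSetBuffer);
    ProduceRequest request = new ProduceRequest((short) 1, 1000, records);

The constructor sets a duplicate() of each buffer into the struct, so reading the request out later does not disturb the caller's buffer positions.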

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 6cf4fb7..5220464 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -12,67 +12,83 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.types.Struct;
+public class ProduceResponse extends AbstractRequestResponse {
+    private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id);
+    private static String RESPONSES_KEY_NAME = "responses";
+
+    // topic level field names
+    private static String TOPIC_KEY_NAME = "topic";
+    private static String PARTITION_RESPONSES_KEY_NAME = "partition_responses";
 
-public class ProduceResponse {
+    // partition level field names
+    private static String PARTITION_KEY_NAME = "partition";
+    private static String ERROR_CODE_KEY_NAME = "error_code";
+    private static String BASE_OFFSET_KEY_NAME = "base_offset";
 
     private final Map<TopicPartition, PartitionResponse> responses;
 
-    public ProduceResponse() {
-        this.responses = new HashMap<TopicPartition, PartitionResponse>();
+    public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
+        super(new Struct(curSchema));
+        Map<String, Map<Integer, PartitionResponse>> responseByTopic = CollectionUtils.groupDataByTopic(responses);
+        List<Struct> topicDatas = new ArrayList<Struct>(responseByTopic.size());
+        for (Map.Entry<String, Map<Integer, PartitionResponse>> entry : responseByTopic.entrySet()) {
+            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, entry.getKey());
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (Map.Entry<Integer, PartitionResponse> partitionEntry : entry.getValue().entrySet()) {
+                PartitionResponse part = partitionEntry.getValue();
+                Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME)
+                                       .set(PARTITION_KEY_NAME, partitionEntry.getKey())
+                                       .set(ERROR_CODE_KEY_NAME, part.errorCode)
+                                       .set(BASE_OFFSET_KEY_NAME, part.baseOffset);
+                partitionArray.add(partStruct);
+            }
+            topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray());
+            topicDatas.add(topicData);
+        }
+        struct.set(RESPONSES_KEY_NAME, topicDatas.toArray());
+        this.responses = responses;
     }
 
     public ProduceResponse(Struct struct) {
+        super(struct);
         responses = new HashMap<TopicPartition, PartitionResponse>();
-        for (Object topicResponse : (Object[]) struct.get("responses")) {
+        for (Object topicResponse : struct.getArray("responses")) {
             Struct topicRespStruct = (Struct) topicResponse;
-            String topic = (String) topicRespStruct.get("topic");
-            for (Object partResponse : (Object[]) topicRespStruct.get("partition_responses")) {
+            String topic = topicRespStruct.getString("topic");
+            for (Object partResponse : topicRespStruct.getArray("partition_responses")) {
                 Struct partRespStruct = (Struct) partResponse;
-                int partition = (Integer) partRespStruct.get("partition");
-                short errorCode = (Short) partRespStruct.get("error_code");
-                long offset = (Long) partRespStruct.get("base_offset");
+                int partition = partRespStruct.getInt("partition");
+                short errorCode = partRespStruct.getShort("error_code");
+                long offset = partRespStruct.getLong("base_offset");
                 TopicPartition tp = new TopicPartition(topic, partition);
-                responses.put(tp, new PartitionResponse(partition, errorCode, offset));
+                responses.put(tp, new PartitionResponse(errorCode, offset));
             }
         }
     }
 
-    public void addResponse(TopicPartition tp, int partition, short error, long baseOffset) {
-        this.responses.put(tp, new PartitionResponse(partition, error, baseOffset));
-    }
-
     public Map<TopicPartition, PartitionResponse> responses() {
         return this.responses;
     }
 
-    @Override
-    public String toString() {
-        StringBuilder b = new StringBuilder();
-        b.append('{');
-        boolean isFirst = true;
-        for (Map.Entry<TopicPartition, PartitionResponse> entry : responses.entrySet()) {
-            if (isFirst)
-                isFirst = false;
-            else
-                b.append(',');
-            b.append(entry.getKey() + " : " + entry.getValue());
-        }
-        b.append('}');
-        return b.toString();
-    }
-
-    public static class PartitionResponse {
-        public int partitionId;
+    public static final class PartitionResponse {
         public short errorCode;
         public long baseOffset;
 
-        public PartitionResponse(int partitionId, short errorCode, long baseOffset) {
-            this.partitionId = partitionId;
+        public PartitionResponse(short errorCode, long baseOffset) {
             this.errorCode = errorCode;
             this.baseOffset = baseOffset;
         }
@@ -81,9 +97,7 @@ public class ProduceResponse {
         public String toString() {
             StringBuilder b = new StringBuilder();
             b.append('{');
-            b.append("pid: ");
-            b.append(partitionId);
-            b.append(",error: ");
+            b.append("error: ");
             b.append(errorCode);
             b.append(",offset: ");
             b.append(baseOffset);
@@ -91,4 +105,8 @@ public class ProduceResponse {
             return b.toString();
         }
     }
+
+    public static ProduceResponse parse(ByteBuffer buffer) {
+        return new ProduceResponse(((Struct) curSchema.read(buffer)));
+    }
 }
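
With partitionId dropped from PartitionResponse (the partition is already carried by the map key), constructing a response looks like this (topic name and offset hypothetical):

    Map<TopicPartition, ProduceResponse.PartitionResponse> responses =
        new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
    responses.put(new TopicPartition("test-topic", 0),
        new ProduceResponse.PartitionResponse(Errors.NONE.code(), 100L));
    ProduceResponse response = new ProduceResponse(responses);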

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
index 66cc2fe..f459a2a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
@@ -24,18 +24,24 @@ import org.apache.kafka.common.protocol.types.Struct;
 /**
  * The header for a request in the Kafka protocol
  */
-public class RequestHeader {
+public class RequestHeader extends AbstractRequestResponse {
 
     private static Field API_KEY_FIELD = REQUEST_HEADER.get("api_key");
     private static Field API_VERSION_FIELD = REQUEST_HEADER.get("api_version");
     private static Field CLIENT_ID_FIELD = REQUEST_HEADER.get("client_id");
     private static Field CORRELATION_ID_FIELD = REQUEST_HEADER.get("correlation_id");
 
-    private final Struct header;
+    private final short apiKey;
+    private final short apiVersion;
+    private final String clientId;
+    private final int correlationId;
 
     public RequestHeader(Struct header) {
-        super();
-        this.header = header;
+        super(header);
+        apiKey = struct.getShort(API_KEY_FIELD);
+        apiVersion = struct.getShort(API_VERSION_FIELD);
+        clientId = struct.getString(CLIENT_ID_FIELD);
+        correlationId = struct.getInt(CORRELATION_ID_FIELD);
     }
 
     public RequestHeader(short apiKey, String client, int correlation) {
@@ -43,43 +49,34 @@ public class RequestHeader {
     }
 
     public RequestHeader(short apiKey, short version, String client, int correlation) {
-        this(new Struct(Protocol.REQUEST_HEADER));
-        this.header.set(API_KEY_FIELD, apiKey);
-        this.header.set(API_VERSION_FIELD, version);
-        this.header.set(CLIENT_ID_FIELD, client);
-        this.header.set(CORRELATION_ID_FIELD, correlation);
+        super(new Struct(Protocol.REQUEST_HEADER));
+        struct.set(API_KEY_FIELD, apiKey);
+        struct.set(API_VERSION_FIELD, version);
+        struct.set(CLIENT_ID_FIELD, client);
+        struct.set(CORRELATION_ID_FIELD, correlation);
+        this.apiKey = apiKey;
+        this.apiVersion = version;
+        this.clientId = client;
+        this.correlationId = correlation;
     }
 
     public short apiKey() {
-        return (Short) this.header.get(API_KEY_FIELD);
+        return apiKey;
     }
 
     public short apiVersion() {
-        return (Short) this.header.get(API_VERSION_FIELD);
+        return apiVersion;
     }
 
     public String clientId() {
-        return (String) this.header.get(CLIENT_ID_FIELD);
+        return clientId;
     }
 
     public int correlationId() {
-        return (Integer) this.header.get(CORRELATION_ID_FIELD);
+        return correlationId;
     }
 
     public static RequestHeader parse(ByteBuffer buffer) {
         return new RequestHeader((Struct) Protocol.REQUEST_HEADER.read(buffer));
     }
-
-    public void writeTo(ByteBuffer buffer) {
-        header.writeTo(buffer);
-    }
-
-    public int sizeOf() {
-        return header.sizeOf();
-    }
-
-    @Override
-    public String toString() {
-        return header.toString();
-    }
 }
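
The header fields are now decoded once in the constructor and cached, so the accessors are plain field reads; writeTo, sizeOf, and toString move up to AbstractRequestResponse. A quick sketch (client id and correlation id are hypothetical values):

    RequestHeader header = new RequestHeader(ApiKeys.PRODUCE.id, "my-client", 42);
    short apiKey = header.apiKey();          // cached field, no Struct lookup
    int correlationId = header.correlationId();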

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
index 257b828..dd63853 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
@@ -28,31 +28,25 @@ import org.apache.kafka.common.protocol.types.Struct;
 /**
  * A response header in the kafka protocol.
  */
-public class ResponseHeader {
+public class ResponseHeader extends AbstractRequestResponse {
 
     private static Field CORRELATION_KEY_FIELD = RESPONSE_HEADER.get("correlation_id");
 
-    private final Struct header;
+    private final int correlationId;
 
     public ResponseHeader(Struct header) {
-        this.header = header;
+        super(header);
+        correlationId = struct.getInt(CORRELATION_KEY_FIELD);
     }
 
     public ResponseHeader(int correlationId) {
-        this(new Struct(Protocol.RESPONSE_HEADER));
-        this.header.set(CORRELATION_KEY_FIELD, correlationId);
+        super(new Struct(Protocol.RESPONSE_HEADER));
+        struct.set(CORRELATION_KEY_FIELD, correlationId);
+        this.correlationId = correlationId;
     }
 
     public int correlationId() {
-        return (Integer) header.get(CORRELATION_KEY_FIELD);
-    }
-
-    public void writeTo(ByteBuffer buffer) {
-        header.writeTo(buffer);
-    }
-
-    public int sizeOf() {
-        return header.sizeOf();
+        return correlationId;
     }
 
     public static ResponseHeader parse(ByteBuffer buffer) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java
new file mode 100644
index 0000000..ba38637
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class CollectionUtils {
+    /**
+     * group data by topic
+     * @param data Data to be partitioned
+     * @param <T> Partition data type
+     * @return partitioned data
+     */
+    public static <T> Map<String, Map<Integer, T>> groupDataByTopic(Map<TopicPartition, T> data) {
+        Map<String, Map<Integer, T>> dataByTopic = new HashMap<String, Map<Integer, T>>();
+        for (Map.Entry<TopicPartition, T> entry: data.entrySet()) {
+            String topic = entry.getKey().topic();
+            int partition = entry.getKey().partition();
+            Map<Integer, T> topicData = dataByTopic.get(topic);
+            if (topicData == null) {
+                topicData = new HashMap<Integer, T>();
+                dataByTopic.put(topic, topicData);
+            }
+            topicData.put(partition, entry.getValue());
+        }
+        return dataByTopic;
+    }
+
+    /**
+     * group partitions by topic
+     * @param partitions partitions to group by topic
+     * @return partitions per topic
+     */
+    public static Map<String, List<Integer>> groupDataByTopic(List<TopicPartition> partitions) {
+        Map<String, List<Integer>> partitionsByTopic = new HashMap<String, List<Integer>>();
+        for (TopicPartition tp: partitions) {
+            String topic = tp.topic();
+            List<Integer> topicData = partitionsByTopic.get(topic);
+            if (topicData == null) {
+                topicData = new ArrayList<Integer>();
+                partitionsByTopic.put(topic, topicData);
+            }
+            topicData.add(tp.partition());
+        }
+        return partitionsByTopic;
+    }
+}
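
Both overloads invert a flat TopicPartition-keyed view into the nested topic -> partition layout used by the request/response builders above. For example (hypothetical data):

    Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
    offsets.put(new TopicPartition("a", 0), 1L);
    offsets.put(new TopicPartition("a", 1), 2L);
    offsets.put(new TopicPartition("b", 0), 3L);
    Map<String, Map<Integer, Long>> byTopic = CollectionUtils.groupDataByTopic(offsets);
    // byTopic: {a={0=1, 1=2}, b={0=3}}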

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 2f98192..1a55242 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -7,11 +7,13 @@ import static org.junit.Assert.assertTrue;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.kafka.clients.producer.internals.Metadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ProtoUtils;
@@ -68,7 +70,7 @@ public class NetworkClientTest {
 
     @Test
     public void testSimpleRequestResponse() {
-        ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000);
+        ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, ByteBuffer>emptyMap());
         RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE);
         RequestSend send = new RequestSend(node.id(), reqHeader, produceRequest.toStruct());
         ClientRequest request = new ClientRequest(time.milliseconds(), true, send, null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
new file mode 100644
index 0000000..df37fc6
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class RequestResponseTest {
+
+    @Test
+    public void testSerialization() throws Exception {
+        List<AbstractRequestResponse> requestList = Arrays.asList(
+                createRequestHeader(),
+                createResponseHeader(),
+                createConsumerMetadataRequest(),
+                createConsumerMetadataResponse(),
+                createFetchRequest(),
+                createFetchResponse(),
+                createHeartBeatRequest(),
+                createHeartBeatResponse(),
+                createJoinGroupRequest(),
+                createJoinGroupResponse(),
+                createListOffsetRequest(),
+                createListOffsetResponse(),
+                createMetadataRequest(),
+                createMetadataResponse(),
+                createOffsetCommitRequest(),
+                createOffsetCommitResponse(),
+                createOffsetFetchRequest(),
+                createOffsetFetchResponse(),
+                createProduceRequest(),
+                createProduceResponse());
+
+        for (AbstractRequestResponse req: requestList) {
+            ByteBuffer buffer = ByteBuffer.allocate(req.sizeOf());
+            req.writeTo(buffer);
+            buffer.rewind();
+            Method deserializer = req.getClass().getDeclaredMethod("parse", ByteBuffer.class);
+            AbstractRequestResponse deserialized = (AbstractRequestResponse) deserializer.invoke(null, buffer);
+            assertEquals("The original and deserialized of " + req.getClass().getSimpleName() + " should be the same.", req, deserialized);
+            assertEquals("The original and deserialized of " + req.getClass().getSimpleName() + " should have the same hashcode.",
+                    req.hashCode(), deserialized.hashCode());
+        }
+    }
+
+    private AbstractRequestResponse createRequestHeader() {
+        return new RequestHeader((short)10, (short)1, "", 10);
+    }
+
+    private AbstractRequestResponse createResponseHeader() {
+        return new ResponseHeader(10);
+    }
+
+    private AbstractRequestResponse createConsumerMetadataRequest() {
+        return new ConsumerMetadataRequest("test-group");
+    }
+
+    private AbstractRequestResponse createConsumerMetadataResponse() {
+        return new ConsumerMetadataResponse((short)1, new Node(10, "host1", 2014));
+    }
+
+    private AbstractRequestResponse createFetchRequest() {
+        Map<TopicPartition, FetchRequest.PartitionData> fetchData = new HashMap<TopicPartition, FetchRequest.PartitionData>();
+        fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 1000000));
+        fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 1000000));
+        return new FetchRequest(-1, 100, 100000, fetchData);
+    }
+
+    private AbstractRequestResponse createFetchResponse() {
+        Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
+        responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData((short)0, 1000000, ByteBuffer.allocate(10)));
+        return new FetchResponse(responseData);
+    }
+
+    private AbstractRequestResponse createHeartBeatRequest() {
+        return new HeartbeatRequest("group1", 1, "consumer1");
+    }
+
+    private AbstractRequestResponse createHeartBeatResponse() {
+        return new HeartbeatResponse((short)0);
+    }
+
+    private AbstractRequestResponse createJoinGroupRequest() {
+        return new JoinGroupRequest("group1", 30000, Arrays.asList("topic1"), "consumer1", "strategy1");
+    }
+
+    private AbstractRequestResponse createJoinGroupResponse() {
+        return new JoinGroupResponse((short)0, 1, "consumer1", Arrays.asList(new TopicPartition("test11", 1), new TopicPartition("test2", 1)));
+    }
+
+    private AbstractRequestResponse createListOffsetRequest() {
+        Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>();
+        offsetData.put(new TopicPartition("test", 0), new ListOffsetRequest.PartitionData(1000000L, 10));
+        return new ListOffsetRequest(-1, offsetData);
+    }
+
+    private AbstractRequestResponse createListOffsetResponse() {
+        Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
+        responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData((short)0, Arrays.asList(100L)));
+        return new ListOffsetResponse(responseData);
+    }
+
+    private AbstractRequestResponse createMetadataRequest() {
+        return new MetadataRequest(Arrays.asList("topic1"));
+    }
+
+    private AbstractRequestResponse createMetadataResponse() {
+        Node node = new Node(1, "host1", 1001);
+        Node[] replicas = new Node[1];
+        replicas[0] = node;
+        Node[] isr = new Node[1];
+        isr[0] = node;
+        Cluster cluster = new Cluster(Arrays.asList(node), Arrays.asList(new PartitionInfo("topic1", 1, node, replicas, isr)));
+        return new MetadataResponse(cluster);
+    }
+
+    private AbstractRequestResponse createOffsetCommitRequest() {
+        Map<TopicPartition, OffsetCommitRequest.PartitionData> commitData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>();
+        commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100, 1000000, ""));
+        return new OffsetCommitRequest("group1", 100, "consumer1", commitData);
+    }
+
+    private AbstractRequestResponse createOffsetCommitResponse() {
+        Map<TopicPartition, Short> responseData = new HashMap<TopicPartition, Short>();
+        responseData.put(new TopicPartition("test", 0), (short)0);
+        return new OffsetCommitResponse(responseData);
+    }
+
+    private AbstractRequestResponse createOffsetFetchRequest() {
+        return new OffsetFetchRequest("group1", Arrays.asList(new TopicPartition("test11", 1)));
+    }
+
+    private AbstractRequestResponse createOffsetFetchResponse() {
+        Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<TopicPartition, OffsetFetchResponse.PartitionData>();
+        responseData.put(new TopicPartition("test", 0), new OffsetFetchResponse.PartitionData(100L, "", (short)0));
+        return new OffsetFetchResponse(responseData);
+    }
+
+    private AbstractRequestResponse createProduceRequest() {
+        Map<TopicPartition, ByteBuffer> produceData = new HashMap<TopicPartition, ByteBuffer>();
+        produceData.put(new TopicPartition("test", 0), ByteBuffer.allocate(10));
+        return new ProduceRequest((short)0, 5000, produceData);
+    }
+
+    private AbstractRequestResponse createProduceResponse() {
+        Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
+        responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse((short) 0, 10000));
+        return new ProduceResponse(responseData);
+    }
+}
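
The reflective loop above resolves, for each type, the static parse(ByteBuffer)
factory. Written out directly for one type, the round trip it checks looks like
this (a sketch against the patch's API; the constructor arguments are the same
sample values the test uses):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.requests.HeartbeatRequest;

    public class RoundTripExample {
        public static void main(String[] args) {
            HeartbeatRequest original = new HeartbeatRequest("group1", 1, "consumer1");

            // serialize into a buffer sized from the schema
            ByteBuffer buffer = ByteBuffer.allocate(original.sizeOf());
            original.writeTo(buffer);
            buffer.rewind();

            // deserialize and compare; equals/hashCode are derived from the
            // underlying Struct, which is what the test's assertions rely on
            HeartbeatRequest parsed = HeartbeatRequest.parse(buffer);
            System.out.println(original.equals(parsed)); // true
        }
    }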

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
index dfad6e6..6d00ed0 100644
--- a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
@@ -41,9 +41,9 @@ object ConsumerMetadataRequest {
 
 case class ConsumerMetadataRequest(group: String,
                                    versionId: Short = ConsumerMetadataRequest.CurrentVersion,
-                                   override val correlationId: Int = 0,
+                                   correlationId: Int = 0,
                                    clientId: String = ConsumerMetadataRequest.DefaultClientId)
-  extends RequestOrResponse(Some(RequestKeys.ConsumerMetadataKey), correlationId) {
+  extends RequestOrResponse(Some(RequestKeys.ConsumerMetadataKey)) {
 
   def sizeInBytes =
     2 + /* versionId */

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
index c72ca14..84f6017 100644
--- a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
@@ -40,8 +40,8 @@ object ConsumerMetadataResponse {
   
 }
 
-case class ConsumerMetadataResponse (coordinatorOpt: Option[Broker], errorCode: Short, override val correlationId: Int = 0)
-  extends RequestOrResponse(correlationId = correlationId) {
+case class ConsumerMetadataResponse (coordinatorOpt: Option[Broker], errorCode: Short, correlationId: Int = 0)
+  extends RequestOrResponse() {
 
   def sizeInBytes =
     4 + /* correlationId */

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
index 7dacb20..5be393a 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
@@ -38,9 +38,9 @@ object ControlledShutdownRequest extends Logging {
 }
 
 case class ControlledShutdownRequest(val versionId: Short,
-                                     override val correlationId: Int,
+                                     val correlationId: Int,
                                      val brokerId: Int)
-  extends RequestOrResponse(Some(RequestKeys.ControlledShutdownKey), correlationId){
+  extends RequestOrResponse(Some(RequestKeys.ControlledShutdownKey)){
 
   def this(correlationId: Int, brokerId: Int) =
     this(ControlledShutdownRequest.CurrentVersion, correlationId, brokerId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
index 46ec3db..5e0a1cf 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
@@ -39,10 +39,10 @@ object ControlledShutdownResponse {
 }
 
 
-case class ControlledShutdownResponse(override val correlationId: Int,
+case class ControlledShutdownResponse(val correlationId: Int,
                                       val errorCode: Short = ErrorMapping.NoError,
                                       val partitionsRemaining: Set[TopicAndPartition])
-  extends RequestOrResponse(correlationId = correlationId) {
+  extends RequestOrResponse() {
   def sizeInBytes(): Int ={
     var size =
       4 /* correlation id */ +

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index a8b73ac..55a5982 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -60,13 +60,13 @@ object FetchRequest {
 }
 
 case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentVersion,
-                                        override val correlationId: Int = FetchRequest.DefaultCorrelationId,
+                                        correlationId: Int = FetchRequest.DefaultCorrelationId,
                                         clientId: String = ConsumerConfig.DefaultClientId,
                                         replicaId: Int = Request.OrdinaryConsumerId,
                                         maxWait: Int = FetchRequest.DefaultMaxWait,
                                         minBytes: Int = FetchRequest.DefaultMinBytes,
                                         requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
-        extends RequestOrResponse(Some(RequestKeys.FetchKey), correlationId) {
+        extends RequestOrResponse(Some(RequestKeys.FetchKey)) {
 
   /**
    * Partitions the request info into a map of maps (one for each topic).

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala b/core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala
new file mode 100644
index 0000000..fb022e8
--- /dev/null
+++ b/core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import org.apache.kafka.common.requests.AbstractRequestResponse
+
+private[kafka] abstract class GenericRequestOrResponseAndHeader(val header: AbstractRequestResponse,
+                                                                val body: AbstractRequestResponse,
+                                                                val name: String,
+                                                                override val requestId: Option[Short] = None)
+  extends RequestOrResponse(requestId) {
+
+  def writeTo(buffer: ByteBuffer) {
+    header.writeTo(buffer)
+    body.writeTo(buffer)
+  }
+
+  def sizeInBytes(): Int = {
+    header.sizeOf() + body.sizeOf()
+  }
+
+  override def toString(): String = {
+    describe(true)
+  }
+
+  override def describe(details: Boolean): String = {
+    val strBuffer = new StringBuilder
+    strBuffer.append("Name: " + name)
+    strBuffer.append("; header: " + header.toString)
+    strBuffer.append("; body: " + body.toString)
+    strBuffer.toString()
+  }
+}
\ No newline at end of file
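
The wrapper's wire format is simply the header followed by the body, as its
writeTo and sizeInBytes above show. The same framing expressed against the
Java-side classes (a sketch; the api key, correlation id, and client id are
invented sample values):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.requests.HeartbeatRequest;
    import org.apache.kafka.common.requests.RequestHeader;

    public class HeaderBodyFraming {
        public static void main(String[] args) {
            RequestHeader header = new RequestHeader((short) 12, (short) 0, "client1", 42);
            HeartbeatRequest body = new HeartbeatRequest("group1", 1, "consumer1");

            // total size is the sum of the two parts, mirroring sizeInBytes()
            ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
            header.writeTo(buffer);  // header first...
            body.writeTo(buffer);    // ...then the body, mirroring writeTo()
            buffer.rewind();

            // the receiving side peels the two parts off in the same order
            RequestHeader parsedHeader = RequestHeader.parse(buffer);
            HeartbeatRequest parsedBody = HeartbeatRequest.parse(buffer);
            System.out.println(parsedHeader.correlationId() == 42); // true
        }
    }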

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala b/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala
new file mode 100644
index 0000000..932418b
--- /dev/null
+++ b/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.common.ErrorMapping
+import kafka.network.RequestChannel.Response
+import org.apache.kafka.common.requests.{HeartbeatResponse, ResponseHeader, HeartbeatRequest, RequestHeader}
+
+object HeartbeatRequestAndHeader {
+  def readFrom(buffer: ByteBuffer): HeartbeatRequestAndHeader = {
+    val header = RequestHeader.parse(buffer)
+    val body = HeartbeatRequest.parse(buffer)
+    new HeartbeatRequestAndHeader(header, body)
+  }
+}
+
+case class HeartbeatRequestAndHeader(override val header: RequestHeader, override val body: HeartbeatRequest)
+  extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), Some(RequestKeys.HeartbeatKey)) {
+
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    val errorResponseHeader = new ResponseHeader(header.correlationId)
+    val errorResponseBody = new HeartbeatResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+    val errorHeartbeatResponseAndHeader = new HeartbeatResponseAndHeader(errorResponseHeader, errorResponseBody)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorHeartbeatResponseAndHeader)))
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala b/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala
new file mode 100644
index 0000000..556f38d
--- /dev/null
+++ b/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package kafka.api
+
+import org.apache.kafka.common.requests.{ResponseHeader, HeartbeatResponse}
+import java.nio.ByteBuffer
+
+object HeartbeatResponseAndHeader {
+  def readFrom(buffer: ByteBuffer): HeartbeatResponseAndHeader = {
+    val header = ResponseHeader.parse(buffer)
+    val body = HeartbeatResponse.parse(buffer)
+    new HeartbeatResponseAndHeader(header, body)
+  }
+}
+
+case class HeartbeatResponseAndHeader(override val header: ResponseHeader, override val body: HeartbeatResponse)
+  extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), None) {
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala b/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala
new file mode 100644
index 0000000..9aea28c
--- /dev/null
+++ b/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.common.ErrorMapping
+import org.apache.kafka.common.requests._
+import kafka.network.RequestChannel.Response
+
+object JoinGroupRequestAndHeader {
+  def readFrom(buffer: ByteBuffer): JoinGroupRequestAndHeader = {
+    val header = RequestHeader.parse(buffer)
+    val body = JoinGroupRequest.parse(buffer)
+    new JoinGroupRequestAndHeader(header, body)
+  }
+}
+
+case class JoinGroupRequestAndHeader(override val header: RequestHeader, override val body: JoinGroupRequest)
+  extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), Some(RequestKeys.JoinGroupKey)) {
+
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    val errorResponseHeader = new ResponseHeader(header.correlationId)
+    val errorResponseBody = new JoinGroupResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+    val errorJoinGroupResponseAndHeader = new JoinGroupResponseAndHeader(errorResponseHeader, errorResponseBody)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorJoinGroupResponseAndHeader)))
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala b/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala
new file mode 100644
index 0000000..7389ae6
--- /dev/null
+++ b/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package kafka.api
+
+import org.apache.kafka.common.requests.{JoinGroupResponse, ResponseHeader}
+import java.nio.ByteBuffer
+
+object JoinGroupResponseAndHeader {
+  def readFrom(buffer: ByteBuffer): JoinGroupResponseAndHeader = {
+    val header = ResponseHeader.parse(buffer)
+    val body = JoinGroupResponse.parse(buffer)
+    new JoinGroupResponseAndHeader(header, body)
+  }
+}
+
+case class JoinGroupResponseAndHeader(override val header: ResponseHeader, override val body: JoinGroupResponse)
+  extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), None) {
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 3e40817..4ff7e8f 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -129,13 +129,13 @@ object LeaderAndIsrRequest {
 }
 
 case class LeaderAndIsrRequest (versionId: Short,
-                                override val correlationId: Int,
+                                correlationId: Int,
                                 clientId: String,
                                 controllerId: Int,
                                 controllerEpoch: Int,
                                 partitionStateInfos: Map[(String, Int), PartitionStateInfo],
                                 leaders: Set[Broker])
-    extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey), correlationId) {
+    extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
 
   def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], leaders: Set[Broker], controllerId: Int,
            controllerEpoch: Int, correlationId: Int, clientId: String) = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
index f636444..22ce48a 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
@@ -41,10 +41,10 @@ object LeaderAndIsrResponse {
 }
 
 
-case class LeaderAndIsrResponse(override val correlationId: Int,
+case class LeaderAndIsrResponse(correlationId: Int,
                                 responseMap: Map[(String, Int), Short],
                                 errorCode: Short = ErrorMapping.NoError)
-    extends RequestOrResponse(correlationId = correlationId) {
+    extends RequestOrResponse() {
   def sizeInBytes(): Int ={
     var size =
       4 /* correlation id */ +