You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text version.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/06 01:00:17 UTC

[20/37] git commit: kafka-1462; Add new request and response formats for the new consumer and coordinator communication; patched by Jun Rao; reviewed by Guozhang Wang and Jay Kreps

kafka-1462; Add new request and response formats for the new consumer and coordinator communication; patched by Jun Rao; reviewed by Guozhang Wang and Jay Kreps


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fc0e03f7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fc0e03f7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fc0e03f7

Branch: refs/heads/transactional_messaging
Commit: fc0e03f79131746da81c05c12e056862c08d79d4
Parents: 4ebcdfd
Author: Jun Rao <ju...@gmail.com>
Authored: Thu Jul 17 18:20:01 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Jul 17 18:20:01 2014 -0700

----------------------------------------------------------------------
 .../clients/producer/internals/Sender.java      |  12 +-
 .../java/org/apache/kafka/common/Cluster.java   |  15 +-
 .../apache/kafka/common/protocol/ApiKeys.java   |   7 +-
 .../apache/kafka/common/protocol/Protocol.java  | 280 ++++++++++++++++++-
 .../kafka/common/protocol/types/Struct.java     |  67 +++++
 .../requests/AbstractRequestResponse.java       |  66 +++++
 .../requests/ConsumerMetadataRequest.java       |  47 ++++
 .../requests/ConsumerMetadataResponse.java      |  69 +++++
 .../kafka/common/requests/FetchRequest.java     | 132 +++++++++
 .../kafka/common/requests/FetchResponse.java    | 110 ++++++++
 .../kafka/common/requests/HeartbeatRequest.java |  64 +++++
 .../common/requests/HeartbeatResponse.java      |  45 +++
 .../kafka/common/requests/JoinGroupRequest.java |  87 ++++++
 .../common/requests/JoinGroupResponse.java      | 102 +++++++
 .../common/requests/ListOffsetRequest.java      | 114 ++++++++
 .../common/requests/ListOffsetResponse.java     | 108 +++++++
 .../kafka/common/requests/MetadataRequest.java  |  29 +-
 .../kafka/common/requests/MetadataResponse.java |  98 +++++--
 .../common/requests/OffsetCommitRequest.java    | 180 ++++++++++++
 .../common/requests/OffsetCommitResponse.java   |  87 ++++++
 .../common/requests/OffsetFetchRequest.java     |  98 +++++++
 .../common/requests/OffsetFetchResponse.java    | 107 +++++++
 .../kafka/common/requests/ProduceRequest.java   | 124 +++++---
 .../kafka/common/requests/ProduceResponse.java  |  96 ++++---
 .../kafka/common/requests/RequestHeader.java    |  49 ++--
 .../kafka/common/requests/ResponseHeader.java   |  22 +-
 .../kafka/common/utils/CollectionUtils.java     |  62 ++++
 .../apache/kafka/clients/NetworkClientTest.java |   4 +-
 .../common/requests/RequestResponseTest.java    | 173 ++++++++++++
 .../kafka/api/ConsumerMetadataRequest.scala     |   4 +-
 .../kafka/api/ConsumerMetadataResponse.scala    |   4 +-
 .../kafka/api/ControlledShutdownRequest.scala   |   4 +-
 .../kafka/api/ControlledShutdownResponse.scala  |   4 +-
 .../src/main/scala/kafka/api/FetchRequest.scala |   4 +-
 .../api/GenericRequestOrResponseAndHeader.scala |  45 +++
 .../kafka/api/HeartbeatRequestAndHeader.scala   |  39 +++
 .../kafka/api/HeartbeatResponseAndHeader.scala  |  28 ++
 .../kafka/api/JoinGroupRequestAndHeader.scala   |  40 +++
 .../kafka/api/JoinGroupResponseAndHeader.scala  |  28 ++
 .../scala/kafka/api/LeaderAndIsrRequest.scala   |   4 +-
 .../scala/kafka/api/LeaderAndIsrResponse.scala  |   4 +-
 .../scala/kafka/api/OffsetCommitRequest.scala   |  38 ++-
 .../scala/kafka/api/OffsetCommitResponse.scala  |   4 +-
 .../scala/kafka/api/OffsetFetchRequest.scala    |   4 +-
 .../scala/kafka/api/OffsetFetchResponse.scala   |   4 +-
 .../main/scala/kafka/api/OffsetRequest.scala    |   4 +-
 .../main/scala/kafka/api/OffsetResponse.scala   |   4 +-
 .../main/scala/kafka/api/ProducerRequest.scala  |   4 +-
 .../main/scala/kafka/api/ProducerResponse.scala |   4 +-
 core/src/main/scala/kafka/api/RequestKeys.scala |   7 +-
 .../scala/kafka/api/RequestOrResponse.scala     |   2 +-
 .../scala/kafka/api/StopReplicaRequest.scala    |   4 +-
 .../scala/kafka/api/StopReplicaResponse.scala   |   4 +-
 .../scala/kafka/api/TopicMetadataRequest.scala  |   4 +-
 .../scala/kafka/api/TopicMetadataResponse.scala |   4 +-
 .../scala/kafka/api/UpdateMetadataRequest.scala |   4 +-
 .../kafka/api/UpdateMetadataResponse.scala      |   4 +-
 .../controller/ControllerChannelManager.scala   |   8 +-
 .../kafka/javaapi/OffsetCommitRequest.scala     |   2 -
 .../kafka/javaapi/TopicMetadataRequest.scala    |   4 +-
 .../api/RequestResponseSerializationTest.scala  |  74 ++++-
 61 files changed, 2610 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index a016269..8ebe7ed 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -12,6 +12,7 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -276,13 +277,16 @@ public class Sender implements Runnable {
      * Create a produce request from the given record batches
      */
     private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
-        ProduceRequest request = new ProduceRequest(acks, timeout);
+        Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size());
         Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
         for (RecordBatch batch : batches) {
-            batch.records.buffer().flip();
-            request.add(batch.topicPartition, batch.records);
-            recordsByPartition.put(batch.topicPartition, batch);
+            TopicPartition tp = batch.topicPartition;
+            ByteBuffer recordsBuffer = batch.records.buffer();
+            recordsBuffer.flip();
+            produceRecordsByPartition.put(tp, recordsBuffer);
+            recordsByPartition.put(tp, batch);
         }
+        ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
         RequestSend send = new RequestSend(destination, this.client.nextRequestHeader(ApiKeys.PRODUCE), request.toStruct());
         return new ClientRequest(now, acks != 0, send, recordsByPartition);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index c62707a..d3299b9 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -15,12 +15,7 @@ package org.apache.kafka.common;
 import org.apache.kafka.common.utils.Utils;
 
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
  * A representation of a subset of the nodes, topics, and partitions in the Kafka cluster.
@@ -143,6 +138,14 @@ public final class Cluster {
         return this.partitionsByNode.get(nodeId);
     }
 
+    /**
+     * Get all topics.
+     * @return a set of all topics
+     */
+    public Set<String> topics() {
+        return this.partitionsByTopic.keySet();
+    }
+
     @Override
     public String toString() {
         return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 6fe7573..109fc96 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -30,8 +30,11 @@ public enum ApiKeys {
     METADATA(3, "metadata"),
     LEADER_AND_ISR(4, "leader_and_isr"),
     STOP_REPLICA(5, "stop_replica"),
-    OFFSET_COMMIT(6, "offset_commit"),
-    OFFSET_FETCH(7, "offset_fetch");
+    OFFSET_COMMIT(8, "offset_commit"),
+    OFFSET_FETCH(9, "offset_fetch"),
+    CONSUMER_METADATA(10, "consumer_metadata"),
+    JOIN_GROUP(11, "join_group"),
+    HEARTBEAT(12, "heartbeat");
 
     private static ApiKeys[] codeToType;
     public static int MAX_API_KEY = -1;

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 044b030..7517b87 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -104,6 +104,264 @@ public class Protocol {
     public static Schema[] PRODUCE_REQUEST = new Schema[] { PRODUCE_REQUEST_V0 };
     public static Schema[] PRODUCE_RESPONSE = new Schema[] { PRODUCE_RESPONSE_V0 };
 
+    /* Offset commit api */
+    public static Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+                                                                                   INT32,
+                                                                                   "Topic partition id."),
+                                                                         new Field("offset",
+                                                                                   INT64,
+                                                                                   "Message offset to be committed."),
+                                                                         new Field("timestamp",
+                                                                                   INT64,
+                                                                                   "Timestamp of the commit"),
+                                                                         new Field("metadata",
+                                                                                   STRING,
+                                                                                   "Any associated metadata the client wants to keep."));
+
+    public static Schema OFFSET_COMMIT_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+                                                                                STRING,
+                                                                                "Topic to commit."),
+                                                                       new Field("partitions",
+                                                                                 new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0),
+                                                                                 "Partitions to commit offsets."));
+
+    public static Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id",
+                                                                         STRING,
+                                                                         "The consumer group id."),
+                                                               new Field("topics",
+                                                                         new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
+                                                                         "Topics to commit offsets."));
+
+    public static Schema OFFSET_COMMIT_REQUEST_V1 = new Schema(new Field("group_id",
+                                                                         STRING,
+                                                                         "The consumer group id."),
+                                                               new Field("group_generation_id",
+                                                                         INT32,
+                                                                         "The generation of the consumer group."),
+                                                               new Field("consumer_id",
+                                                                         STRING,
+                                                                         "The consumer id assigned by the group coordinator."),
+                                                               new Field("topics",
+                                                                         new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
+                                                                         "Topics to commit offsets."));
+
+    public static Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+                                                                                    INT32,
+                                                                                    "Topic partition id."),
+                                                                          new Field("error_code",
+                                                                                    INT16));
+
+    public static Schema OFFSET_COMMIT_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
+                                                                      new Field("partition_responses",
+                                                                                new ArrayOf(OFFSET_COMMIT_RESPONSE_PARTITION_V0)));
+
+    public static Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses",
+                                                                          new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
+
+    public static Schema[] OFFSET_COMMIT_REQUEST = new Schema[] { OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1 };
+    /* The response types for both V0 and V1 of OFFSET_COMMIT_REQUEST are the same. */
+    public static Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] { OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0};
+
+    /* Offset fetch api */
+    public static Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+                                                                                  INT32,
+                                                                                  "Topic partition id."));
+
+    public static Schema OFFSET_FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+                                                                               STRING,
+                                                                               "Topic to fetch offset."),
+                                                                     new Field("partitions",
+                                                                                new ArrayOf(OFFSET_FETCH_REQUEST_PARTITION_V0),
+                                                                                "Partitions to fetch offsets."));
+
+    public static Schema OFFSET_FETCH_REQUEST_V0 = new Schema(new Field("group_id",
+                                                                        STRING,
+                                                                        "The consumer group id."),
+                                                              new Field("topics",
+                                                                        new ArrayOf(OFFSET_FETCH_REQUEST_TOPIC_V0),
+                                                                        "Topics to fetch offsets."));
+
+    public static Schema OFFSET_FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+                                                                                   INT32,
+                                                                                   "Topic partition id."),
+                                                                         new Field("offset",
+                                                                                   INT64,
+                                                                                   "Last committed message offset."),
+                                                                         new Field("metadata",
+                                                                                   STRING,
+                                                                                   "Any associated metadata the client wants to keep."),
+                                                                         new Field("error_code",
+                                                                                   INT16));
+
+    public static Schema OFFSET_FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
+                                                                     new Field("partition_responses",
+                                                                               new ArrayOf(OFFSET_FETCH_RESPONSE_PARTITION_V0)));
+
+    public static Schema OFFSET_FETCH_RESPONSE_V0 = new Schema(new Field("responses",
+                                                                         new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)));
+
+    public static Schema[] OFFSET_FETCH_REQUEST = new Schema[] { OFFSET_FETCH_REQUEST_V0 };
+    public static Schema[] OFFSET_FETCH_RESPONSE = new Schema[] { OFFSET_FETCH_RESPONSE_V0 };
+
+    /* List offset api */
+    public static Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+                                                                                 INT32,
+                                                                                 "Topic partition id."),
+                                                                       new Field("timestamp",
+                                                                                 INT64,
+                                                                                 "Timestamp."),
+                                                                       new Field("max_num_offsets",
+                                                                                 INT32,
+                                                                                 "Maximum offsets to return."));
+
+    public static Schema LIST_OFFSET_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+                                                                             STRING,
+                                                                             "Topic to list offset."),
+                                                                   new Field("partitions",
+                                                                             new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V0),
+                                                                             "Partitions to list offset."));
+
+    public static Schema LIST_OFFSET_REQUEST_V0 = new Schema(new Field("replica_id",
+                                                                       INT32,
+                                                                       "Broker id of the follower. For normal consumers, use -1."),
+                                                             new Field("topics",
+                                                                        new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V0),
+                                                                        "Topics to list offsets."));
+
+    public static Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+                                                                                  INT32,
+                                                                                  "Topic partition id."),
+                                                                        new Field("error_code",
+                                                                                  INT16),
+                                                                        new Field("offsets",
+                                                                                  new ArrayOf(INT64),
+                                                                                  "A list of offsets."));
+
+    public static Schema LIST_OFFSET_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
+                                                                    new Field("partition_responses",
+                                                                              new ArrayOf(LIST_OFFSET_RESPONSE_PARTITION_V0)));
+
+    public static Schema LIST_OFFSET_RESPONSE_V0 = new Schema(new Field("responses",
+                                                                  new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V0)));
+
+    public static Schema[] LIST_OFFSET_REQUEST = new Schema[] { LIST_OFFSET_REQUEST_V0 };
+    public static Schema[] LIST_OFFSET_RESPONSE = new Schema[] { LIST_OFFSET_RESPONSE_V0 };
+
+    /* Fetch api */
+    public static Schema FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+                                                                           INT32,
+                                                                           "Topic partition id."),
+                                                                 new Field("fetch_offset",
+                                                                           INT64,
+                                                                           "Message offset."),
+                                                                 new Field("max_bytes",
+                                                                           INT32,
+                                                                           "Maximum bytes to fetch."));
+
+    public static Schema FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+                                                                       STRING,
+                                                                       "Topic to fetch."),
+                                                             new Field("partitions",
+                                                                       new ArrayOf(FETCH_REQUEST_PARTITION_V0),
+                                                                       "Partitions to fetch."));
+
+    public static Schema FETCH_REQUEST_V0 = new Schema(new Field("replica_id",
+                                                                 INT32,
+                                                                 "Broker id of the follower. For normal consumers, use -1."),
+                                                       new Field("max_wait_time",
+                                                                 INT32,
+                                                                 "Maximum time in ms to wait for the response."),
+                                                       new Field("min_bytes",
+                                                                 INT32,
+                                                                 "Minimum bytes to accumulate in the response."),
+                                                       new Field("topics",
+                                                                 new ArrayOf(FETCH_REQUEST_TOPIC_V0),
+                                                                 "Topics to fetch."));
+
+    public static Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+                                                                            INT32,
+                                                                            "Topic partition id."),
+                                                                  new Field("error_code",
+                                                                            INT16),
+                                                                  new Field("high_watermark",
+                                                                            INT64,
+                                                                            "Last committed offset."),
+                                                                  new Field("record_set", BYTES));
+
+    public static Schema FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
+                                                              new Field("partition_responses",
+                                                                        new ArrayOf(FETCH_RESPONSE_PARTITION_V0)));
+
+    public static Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses",
+                                                                  new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
+
+    public static Schema[] FETCH_REQUEST = new Schema[] { FETCH_REQUEST_V0 };
+    public static Schema[] FETCH_RESPONSE = new Schema[] { FETCH_RESPONSE_V0 };
+
+    /* Consumer metadata api */
+    public static Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group_id",
+                                                                             STRING,
+                                                                             "The consumer group id."));
+
+    public static Schema CONSUMER_METADATA_RESPONSE_V0 = new Schema(new Field("error_code",
+                                                                              INT16),
+                                                                    new Field("coordinator",
+                                                                              BROKER,
+                                                                              "Host and port information for the coordinator for a consumer group."));
+
+    public static Schema[] CONSUMER_METADATA_REQUEST = new Schema[] { CONSUMER_METADATA_REQUEST_V0 };
+    public static Schema[] CONSUMER_METADATA_RESPONSE = new Schema[] { CONSUMER_METADATA_RESPONSE_V0 };
+
+    /* Join group api */
+    public static Schema JOIN_GROUP_REQUEST_V0 = new Schema(new Field("group_id",
+                                                                      STRING,
+                                                                      "The consumer group id."),
+                                                            new Field("session_timeout",
+                                                                      INT32,
+                                                                      "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms."),
+                                                            new Field("topics",
+                                                                      new ArrayOf(STRING),
+                                                                      "An array of topics to subscribe to."),
+                                                            new Field("consumer_id",
+                                                                      STRING,
+                                                                      "The assigned consumer id or an empty string for a new consumer."),
+                                                            new Field("partition_assignment_strategy",
+                                                                      STRING,
+                                                                      "The strategy for the coordinator to assign partitions."));
+
+    public static Schema JOIN_GROUP_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
+                                                                   new Field("partitions", new ArrayOf(INT32)));
+    public static Schema JOIN_GROUP_RESPONSE_V0 = new Schema(new Field("error_code",
+                                                                       INT16),
+                                                             new Field("group_generation_id",
+                                                                       INT32,
+                                                                       "The generation of the consumer group."),
+                                                             new Field("consumer_id",
+                                                                       STRING,
+                                                                       "The consumer id assigned by the group coordinator."),
+                                                             new Field("assigned_partitions",
+                                                                       new ArrayOf(JOIN_GROUP_RESPONSE_TOPIC_V0)));
+
+    public static Schema[] JOIN_GROUP_REQUEST = new Schema[] { JOIN_GROUP_REQUEST_V0 };
+    public static Schema[] JOIN_GROUP_RESPONSE = new Schema[] { JOIN_GROUP_RESPONSE_V0 };
+
+    /* Heartbeat api */
+    public static Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id",
+                                                                      STRING,
+                                                                      "The consumer group id."),
+                                                            new Field("group_generation_id",
+                                                                      INT32,
+                                                                      "The generation of the consumer group."),
+                                                            new Field("consumer_id",
+                                                                      STRING,
+                                                                      "The consumer id assigned by the group coordinator."));
+
+    public static Schema HEARTBEAT_RESPONSE_V0 = new Schema(new Field("error_code",
+                                                                       INT16));
+
+    public static Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0};
+    public static Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
+
     /* an array of all requests and responses with all schema versions */
     public static Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
     public static Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -113,22 +371,28 @@ public class Protocol {
 
     static {
         REQUESTS[ApiKeys.PRODUCE.id] = PRODUCE_REQUEST;
-        REQUESTS[ApiKeys.FETCH.id] = new Schema[] {};
-        REQUESTS[ApiKeys.LIST_OFFSETS.id] = new Schema[] {};
+        REQUESTS[ApiKeys.FETCH.id] = FETCH_REQUEST;
+        REQUESTS[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_REQUEST;
         REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST;
         REQUESTS[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
         REQUESTS[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
-        REQUESTS[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {};
-        REQUESTS[ApiKeys.OFFSET_FETCH.id] = new Schema[] {};
+        REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST;
+        REQUESTS[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_REQUEST;
+        REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST;
+        REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST;
+        REQUESTS[ApiKeys.HEARTBEAT.id] = HEARTBEAT_REQUEST;
 
         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
-        RESPONSES[ApiKeys.FETCH.id] = new Schema[] {};
-        RESPONSES[ApiKeys.LIST_OFFSETS.id] = new Schema[] {};
+        RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
+        RESPONSES[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_RESPONSE;
         RESPONSES[ApiKeys.METADATA.id] = METADATA_RESPONSE;
         RESPONSES[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
         RESPONSES[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
-        RESPONSES[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {};
-        RESPONSES[ApiKeys.OFFSET_FETCH.id] = new Schema[] {};
+        RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE;
+        RESPONSES[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_RESPONSE;
+        RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE;
+        RESPONSES[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_RESPONSE;
+        RESPONSES[ApiKeys.HEARTBEAT.id] = HEARTBEAT_RESPONSE;
 
         /* set the maximum version of each api */
         for (ApiKeys api : ApiKeys.values())

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 8cecba5..444e69e 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -83,6 +83,15 @@ public class Struct {
         return getFieldOrDefault(field);
     }
 
+    /**
+     * Check if the struct contains a field.
+     * @param name The name of the field to look up in this struct's schema
+     * @return true if the schema returns a non-null field for the given name, false otherwise
+     */
+    public boolean hasField(String name) {
+        return schema.get(name) != null;
+    }
+
     public Struct getStruct(Field field) {
         return (Struct) get(field);
     }
@@ -107,6 +116,22 @@ public class Struct {
         return (Integer) get(name);
     }
 
+    public Long getLong(Field field) { // result of get(field), cast to Long
+        return (Long) get(field);
+    }
+
+    public Long getLong(String name) { // result of get(name), cast to Long
+        return (Long) get(name);
+    }
+
+    public ByteBuffer getBytes(Field field) { // result of get(field), cast to ByteBuffer
+        return (ByteBuffer) get(field);
+    }
+
+    public ByteBuffer getBytes(String name) { // result of get(name), cast to ByteBuffer
+        return (ByteBuffer) get(name);
+    }
+
     public Object[] getArray(Field field) {
         return (Object[]) get(field);
     }
@@ -253,4 +278,46 @@ public class Struct {
         return b.toString();
     }
 
+    @Override
+    public int hashCode() {
+        // 31-based accumulation over the field values; an array field contributes each of its elements in order
+        int hash = 1;
+        for (int idx = 0; idx < this.values.length; idx++) {
+            Field field = this.schema.get(idx);
+            Object value = this.get(field);
+            if (!(field.type() instanceof ArrayOf)) {
+                hash = 31 * hash + value.hashCode();
+                continue;
+            }
+            for (Object element : (Object[]) value)
+                hash = 31 * hash + element.hashCode();
+        }
+        return hash;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        Struct other = (Struct) obj;
+        if (schema != other.schema)
+            return false;
+        for (int i = 0; i < this.values.length; i++) {
+            Field f = this.schema.get(i);
+            Boolean result;
+            if (f.type() instanceof ArrayOf) {
+                result = Arrays.equals((Object []) this.get(f), (Object []) other.get(f));
+            } else {
+                result = this.get(f).equals(other.get(f));
+            }
+            if (!result)
+                return false;
+        }
+        return true;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
new file mode 100644
index 0000000..37aff6c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+/** Common base for request and response objects that are backed by a {@link Struct}. */
+public abstract class AbstractRequestResponse {
+    /** Holds the field values; every method of this class delegates to it. */
+    protected final Struct struct;
+
+    public AbstractRequestResponse(Struct struct) {
+        this.struct = struct;
+    }
+
+    /** Expose the backing struct itself (no copy is made). */
+    public Struct toStruct() {
+        return this.struct;
+    }
+
+    /**
+     * Serialized size of this object, as reported by the backing struct
+     */
+    public int sizeOf() {
+        return this.struct.sizeOf();
+    }
+
+    /**
+     * Serialize this object into the given buffer by writing the backing struct to it
+     */
+    public void writeTo(ByteBuffer buffer) {
+        this.struct.writeTo(buffer);
+    }
+
+    @Override
+    public String toString() {
+        return this.struct.toString();
+    }
+
+    @Override
+    public int hashCode() {
+        return this.struct.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        // only objects of exactly the same class are compared; equality is then that of the structs
+        if (obj == null || getClass() != obj.getClass())
+            return false;
+        return this.struct.equals(((AbstractRequestResponse) obj).struct);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
new file mode 100644
index 0000000..99b52c2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+/** Consumer metadata request; the only value it carries is a consumer group id. */
+public class ConsumerMetadataRequest extends AbstractRequestResponse {
+    public static final Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.CONSUMER_METADATA.id);
+    private static final String GROUP_ID_KEY_NAME = "group_id";
+
+    private final String groupId;
+
+    public ConsumerMetadataRequest(String groupId) {
+        super(new Struct(curSchema));
+        struct.set(GROUP_ID_KEY_NAME, groupId); // mirror the value into the struct that gets serialized
+        this.groupId = groupId;
+    }
+
+    public ConsumerMetadataRequest(Struct struct) {
+        super(struct);
+        groupId = struct.getString(GROUP_ID_KEY_NAME); // read back from an already populated struct
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+    public static ConsumerMetadataRequest parse(ByteBuffer buffer) {
+        return new ConsumerMetadataRequest(((Struct) curSchema.read(buffer))); // decode with curSchema, then wrap
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
new file mode 100644
index 0000000..8b8f591
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class ConsumerMetadataResponse extends AbstractRequestResponse {
+    private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.CONSUMER_METADATA.id);
+    private static String ERROR_CODE_KEY_NAME = "error_code";
+    private static String COORDINATOR_KEY_NAME = "coordinator";
+
+    // coordinator level field names
+    private static String NODE_ID_KEY_NAME = "node_id";
+    private static String HOST_KEY_NAME = "host";
+    private static String PORT_KEY_NAME = "port";
+
+    private final short errorCode;
+    private final Node node;
+
+    public ConsumerMetadataResponse(short errorCode, Node node) {
+        super(new Struct(curSchema));
+        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
+        coordinator.set(NODE_ID_KEY_NAME, node.id());
+        coordinator.set(HOST_KEY_NAME, node.host());
+        coordinator.set(PORT_KEY_NAME, node.port());
+        struct.set(COORDINATOR_KEY_NAME, coordinator);
+        this.errorCode = errorCode;
+        this.node = node;
+    }
+
+    public ConsumerMetadataResponse(Struct struct) {
+        super(struct);
+        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
+        int nodeId = broker.getInt(NODE_ID_KEY_NAME);
+        String host = broker.getString(HOST_KEY_NAME);
+        int port = broker.getInt(PORT_KEY_NAME);
+        node = new Node(nodeId, host, port);
+    }
+
+    public short errorCode() {
+        return errorCode;
+    }
+
+    public Node node() {
+        return node;
+    }
+
+    public static ConsumerMetadataResponse parse(ByteBuffer buffer) {
+        return new ConsumerMetadataResponse(((Struct) curSchema.read(buffer)));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
new file mode 100644
index 0000000..2fc471f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Fetch request: replica id, max wait, min bytes, and a fetch offset / max bytes pair per topic partition. */
+public class FetchRequest extends AbstractRequestResponse {
+    public static final Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.FETCH.id);
+    private static final String REPLICA_ID_KEY_NAME = "replica_id";
+    private static final String MAX_WAIT_KEY_NAME = "max_wait_time";
+    private static final String MIN_BYTES_KEY_NAME = "min_bytes";
+    private static final String TOPICS_KEY_NAME = "topics";
+
+    // topic level field names
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    // partition level field names
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String FETCH_OFFSET_KEY_NAME = "fetch_offset";
+    private static final String MAX_BYTES_KEY_NAME = "max_bytes";
+
+    private final int replicaId;
+    private final int maxWait;
+    private final int minBytes;
+    private final Map<TopicPartition, PartitionData> fetchData;
+
+    public static final class PartitionData {
+        public final long offset;
+        public final int maxBytes;
+
+        public PartitionData(long offset, int maxBytes) {
+            this.offset = offset;
+            this.maxBytes = maxBytes;
+        }
+    }
+
+    /** Build a request from its values; fetchData is grouped by topic to fill the nested topic/partition structs. */
+    public FetchRequest(int replicaId, int maxWait, int minBytes, Map<TopicPartition, PartitionData> fetchData) {
+        super(new Struct(curSchema));
+        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(fetchData);
+        struct.set(REPLICA_ID_KEY_NAME, replicaId);
+        struct.set(MAX_WAIT_KEY_NAME, maxWait);
+        struct.set(MIN_BYTES_KEY_NAME, minBytes);
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
+            Struct topicData = struct.instance(TOPICS_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
+                PartitionData fetchPartitionData = partitionEntry.getValue();
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionData.set(FETCH_OFFSET_KEY_NAME, fetchPartitionData.offset);
+                partitionData.set(MAX_BYTES_KEY_NAME, fetchPartitionData.maxBytes);
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(TOPICS_KEY_NAME, topicArray.toArray());
+        this.replicaId = replicaId;
+        this.maxWait = maxWait;
+        this.minBytes = minBytes;
+        this.fetchData = fetchData;
+    }
+
+    /** Wrap an already populated struct, flattening its topic/partition arrays into a map keyed by TopicPartition. */
+    public FetchRequest(Struct struct) {
+        super(struct);
+        replicaId = struct.getInt(REPLICA_ID_KEY_NAME);
+        maxWait = struct.getInt(MAX_WAIT_KEY_NAME);
+        minBytes = struct.getInt(MIN_BYTES_KEY_NAME);
+        fetchData = new HashMap<TopicPartition, PartitionData>();
+        for (Object topicObj : struct.getArray(TOPICS_KEY_NAME)) {
+            Struct topicStruct = (Struct) topicObj;
+            String topic = topicStruct.getString(TOPIC_KEY_NAME);
+            for (Object partitionObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionStruct = (Struct) partitionObj;
+                int partition = partitionStruct.getInt(PARTITION_KEY_NAME);
+                fetchData.put(new TopicPartition(topic, partition),
+                    new PartitionData(partitionStruct.getLong(FETCH_OFFSET_KEY_NAME), partitionStruct.getInt(MAX_BYTES_KEY_NAME)));
+            }
+        }
+    }
+
+    public int replicaId() {
+        return replicaId;
+    }
+
+    public int maxWait() {
+        return maxWait;
+    }
+
+    public int minBytes() {
+        return minBytes;
+    }
+
+    public Map<TopicPartition, PartitionData> fetchData() {
+        return fetchData;
+    }
+
+    public static FetchRequest parse(ByteBuffer buffer) {
+        return new FetchRequest(((Struct) curSchema.read(buffer)));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
new file mode 100644
index 0000000..f719010
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FetchResponse extends AbstractRequestResponse {
+    public static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.FETCH.id);
+    private static String RESPONSES_KEY_NAME = "responses";
+
+    // topic level field names
+    private static String TOPIC_KEY_NAME = "topic";
+    private static String PARTITIONS_KEY_NAME = "partition_responses";
+
+    // partition level field names
+    private static String PARTITION_KEY_NAME = "partition";
+    private static String ERROR_CODE_KEY_NAME = "error_code";
+    private static String HIGH_WATERMARK_KEY_NAME = "high_watermark";
+    private static String RECORD_SET_KEY_NAME = "record_set";
+
+    private final Map<TopicPartition, PartitionData> responseData;
+
+    public static final class PartitionData {
+        public final short errorCode;
+        public final long highWatermark;
+        public final ByteBuffer recordSet;
+
+        public PartitionData(short errorCode, long highWatermark, ByteBuffer recordSet) {
+            this.errorCode = errorCode;
+            this.highWatermark = highWatermark;
+            this.recordSet = recordSet;
+        }
+    }
+
+    public FetchResponse(Map<TopicPartition, PartitionData> responseData) {
+        super(new Struct(curSchema));
+        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
+
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
+            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
+                PartitionData fetchPartitionData = partitionEntry.getValue();
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionData.set(ERROR_CODE_KEY_NAME, fetchPartitionData.errorCode);
+                partitionData.set(HIGH_WATERMARK_KEY_NAME, fetchPartitionData.highWatermark);
+                partitionData.set(RECORD_SET_KEY_NAME, fetchPartitionData.recordSet);
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
+        this.responseData = responseData;
+    }
+
+    public FetchResponse(Struct struct) {
+        super(struct);
+        responseData = new HashMap<TopicPartition, PartitionData>();
+        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
+            Struct topicResponse = (Struct) topicResponseObj;
+            String topic = topicResponse.getString(TOPIC_KEY_NAME);
+            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionResponse = (Struct) partitionResponseObj;
+                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+                short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
+                long highWatermark = partitionResponse.getLong(HIGH_WATERMARK_KEY_NAME);
+                ByteBuffer recordSet = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
+                PartitionData partitionData = new PartitionData(errorCode, highWatermark, recordSet);
+                responseData.put(new TopicPartition(topic, partition), partitionData);
+            }
+        }
+    }
+
+    public Map<TopicPartition, PartitionData> responseData() {
+        return responseData;
+    }
+
+    public static FetchResponse parse(ByteBuffer buffer) {
+        return new FetchResponse(((Struct) curSchema.read(buffer)));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
new file mode 100644
index 0000000..9512db2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+/** Heartbeat request carrying a consumer group id, the group generation id and a consumer id. */
+public class HeartbeatRequest extends AbstractRequestResponse {
+    public static final Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.HEARTBEAT.id);
+    private static final String GROUP_ID_KEY_NAME = "group_id";
+    private static final String GROUP_GENERATION_ID_KEY_NAME = "group_generation_id";
+    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+    private final String groupId;
+    private final int groupGenerationId;
+    private final String consumerId;
+
+    public HeartbeatRequest(String groupId, int groupGenerationId, String consumerId) {
+        super(new Struct(curSchema)); // start from a new struct of curSchema and copy the values into it
+        struct.set(GROUP_ID_KEY_NAME, groupId);
+        struct.set(GROUP_GENERATION_ID_KEY_NAME, groupGenerationId);
+        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+        this.groupId = groupId;
+        this.groupGenerationId = groupGenerationId;
+        this.consumerId = consumerId;
+    }
+
+    public HeartbeatRequest(Struct struct) {
+        super(struct); // wrap an already populated struct and read the values back out of it
+        groupId = struct.getString(GROUP_ID_KEY_NAME);
+        groupGenerationId = struct.getInt(GROUP_GENERATION_ID_KEY_NAME);
+        consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+    public int groupGenerationId() {
+        return groupGenerationId;
+    }
+
+    public String consumerId() {
+        return consumerId;
+    }
+
+    public static HeartbeatRequest parse(ByteBuffer buffer) {
+        return new HeartbeatRequest(((Struct) curSchema.read(buffer)));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
new file mode 100644
index 0000000..8997ffc
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class HeartbeatResponse extends AbstractRequestResponse {
+    private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.HEARTBEAT.id);
+    private static String ERROR_CODE_KEY_NAME = "error_code";
+
+    private final short errorCode;
+    public HeartbeatResponse(short errorCode) {
+        super(new Struct(curSchema));
+        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        this.errorCode = errorCode;
+    }
+
+    public HeartbeatResponse(Struct struct) {
+        super(struct);
+        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+    }
+
+    public short errorCode() {
+        return errorCode;
+    }
+
+    public static HeartbeatResponse parse(ByteBuffer buffer) {
+        return new HeartbeatResponse(((Struct) curSchema.read(buffer)));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
new file mode 100644
index 0000000..d6e91f3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Request body of the JOIN_GROUP API. Carries the group id, session timeout, topic list, consumer id and partition
+ * assignment strategy, and mirrors each of them into the matching field of the request struct.
+ */
+public class JoinGroupRequest extends AbstractRequestResponse {
+    public static final Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.JOIN_GROUP.id);
+    private static final String GROUP_ID_KEY_NAME = "group_id";
+    private static final String SESSION_TIMEOUT_KEY_NAME = "session_timeout";
+    private static final String TOPICS_KEY_NAME = "topics";
+    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+    private static final String STRATEGY_KEY_NAME = "partition_assignment_strategy";
+
+    private final String groupId;
+    private final int sessionTimeout;
+    private final List<String> topics;
+    private final String consumerId;
+    private final String strategy;
+
+    /** Creates a request from its field values and records each value in a new struct of the current schema. */
+    public JoinGroupRequest(String groupId, int sessionTimeout, List<String> topics, String consumerId, String strategy) {
+        super(new Struct(curSchema));
+        struct.set(GROUP_ID_KEY_NAME, groupId);
+        struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout);
+        struct.set(TOPICS_KEY_NAME, topics.toArray());
+        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+        struct.set(STRATEGY_KEY_NAME, strategy);
+        this.groupId = groupId;
+        this.sessionTimeout = sessionTimeout;
+        this.topics = topics;
+        this.consumerId = consumerId;
+        this.strategy = strategy;
+    }
+
+    /** Rebuilds the request fields by reading them back from an already populated struct. */
+    public JoinGroupRequest(Struct struct) {
+        super(struct);
+        groupId = struct.getString(GROUP_ID_KEY_NAME);
+        sessionTimeout = struct.getInt(SESSION_TIMEOUT_KEY_NAME);
+        Object[] topicsArray = struct.getArray(TOPICS_KEY_NAME);
+        topics = new ArrayList<String>();
+        for (Object topic: topicsArray)
+            topics.add((String) topic);
+        consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+        strategy = struct.getString(STRATEGY_KEY_NAME);
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+    public int sessionTimeout() {
+        return sessionTimeout;
+    }
+
+    public List<String> topics() {
+        return topics;
+    }
+
+    public String consumerId() {
+        return consumerId;
+    }
+
+    public String strategy() {
+        return strategy;
+    }
+
+    /** Reads a struct of the current request schema from {@code buffer} and wraps it in a request. */
+    public static JoinGroupRequest parse(ByteBuffer buffer) {
+        return new JoinGroupRequest(((Struct) curSchema.read(buffer)));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
new file mode 100644
index 0000000..efe8979
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+/**
+ * Response body of the JOIN_GROUP API: an error code, the group generation id, the consumer id and the assigned
+ * partitions, which are grouped by topic inside the struct and exposed here as a flat list of TopicPartition.
+ */
+public class JoinGroupResponse extends AbstractRequestResponse {
+    public static final Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.JOIN_GROUP.id);
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
+    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+    private static final String ASSIGNED_PARTITIONS_KEY_NAME = "assigned_partitions";
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    public static final int UNKNOWN_GENERATION_ID = -1;
+    public static final String UNKNOWN_CONSUMER_ID = "";
+
+    private final short errorCode;
+    private final int generationId;
+    private final String consumerId;
+    private final List<TopicPartition> assignedPartitions;
+
+    /** Creates a response from its field values; the partitions are grouped by topic before being stored. */
+    public JoinGroupResponse(short errorCode, int generationId, String consumerId, List<TopicPartition> assignedPartitions) {
+        super(new Struct(curSchema));
+
+        Map<String, List<Integer>> partitionsByTopic = CollectionUtils.groupDataByTopic(assignedPartitions);
+
+        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        struct.set(GENERATION_ID_KEY_NAME, generationId);
+        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (Map.Entry<String, List<Integer>> entries: partitionsByTopic.entrySet()) {
+            Struct topicData = struct.instance(ASSIGNED_PARTITIONS_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, entries.getKey());
+            topicData.set(PARTITIONS_KEY_NAME, entries.getValue().toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(ASSIGNED_PARTITIONS_KEY_NAME, topicArray.toArray());
+
+        this.errorCode = errorCode;
+        this.generationId = generationId;
+        this.consumerId = consumerId;
+        this.assignedPartitions = assignedPartitions;
+    }
+
+    /** Creates an error-only response using the UNKNOWN_* placeholders and an empty partition list. */
+    public JoinGroupResponse(short errorCode) {
+        this(errorCode, UNKNOWN_GENERATION_ID, UNKNOWN_CONSUMER_ID, Collections.<TopicPartition>emptyList());
+    }
+
+    /** Rebuilds the response fields from an already populated struct, one TopicPartition per partition entry. */
+    public JoinGroupResponse(Struct struct) {
+        super(struct);
+        assignedPartitions = new ArrayList<TopicPartition>();
+        for (Object topicDataObj : struct.getArray(ASSIGNED_PARTITIONS_KEY_NAME)) {
+            Struct topicData = (Struct) topicDataObj;
+            String topic = topicData.getString(TOPIC_KEY_NAME);
+            for (Object partitionObj : topicData.getArray(PARTITIONS_KEY_NAME))
+                assignedPartitions.add(new TopicPartition(topic, (Integer) partitionObj));
+        }
+        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        generationId = struct.getInt(GENERATION_ID_KEY_NAME);
+        consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+    }
+
+    public short errorCode() {
+        return errorCode;
+    }
+
+    public int generationId() {
+        return generationId;
+    }
+
+    public String consumerId() {
+        return consumerId;
+    }
+
+    public List<TopicPartition> assignedPartitions() {
+        return assignedPartitions;
+    }
+
+    /** Reads a struct of the current response schema from {@code buffer} and wraps it in a response. */
+    public static JoinGroupResponse parse(ByteBuffer buffer) {
+        return new JoinGroupResponse(((Struct) curSchema.read(buffer)));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
new file mode 100644
index 0000000..99364c1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Request body of the LIST_OFFSETS API: a replica id plus, per topic partition, a timestamp and a maximum number
+ * of offsets, stored in the struct as a topic array whose entries each hold a partition array.
+ */
+public class ListOffsetRequest extends AbstractRequestResponse {
+    public static final Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.LIST_OFFSETS.id);
+    private static final String REPLICA_ID_KEY_NAME = "replica_id";
+    private static final String TOPICS_KEY_NAME = "topics";
+
+    // topic level field names
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    // partition level field names
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String TIMESTAMP_KEY_NAME = "timestamp";
+    private static final String MAX_NUM_OFFSETS_KEY_NAME = "max_num_offsets";
+
+    private final int replicaId;
+    private final Map<TopicPartition, PartitionData> offsetData;
+
+    /** Per-partition request values: the timestamp and the maximum number of offsets. */
+    public static final class PartitionData {
+        public final long timestamp;
+        public final int maxNumOffsets;
+
+        public PartitionData(long timestamp, int maxNumOffsets) {
+            this.timestamp = timestamp;
+            this.maxNumOffsets = maxNumOffsets;
+        }
+    }
+
+    /** Creates a request from its field values, nesting the per-partition data under its topic in the struct. */
+    public ListOffsetRequest(int replicaId, Map<TopicPartition, PartitionData> offsetData) {
+        super(new Struct(curSchema));
+        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(offsetData);
+
+        struct.set(REPLICA_ID_KEY_NAME, replicaId);
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
+            Struct topicData = struct.instance(TOPICS_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
+                PartitionData offsetPartitionData = partitionEntry.getValue();
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionData.set(TIMESTAMP_KEY_NAME, offsetPartitionData.timestamp);
+                partitionData.set(MAX_NUM_OFFSETS_KEY_NAME, offsetPartitionData.maxNumOffsets);
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(TOPICS_KEY_NAME, topicArray.toArray());
+        this.replicaId = replicaId;
+        this.offsetData = offsetData;
+    }
+
+    /** Rebuilds the request from an already populated struct, keyed by TopicPartition. */
+    public ListOffsetRequest(Struct struct) {
+        super(struct);
+        replicaId = struct.getInt(REPLICA_ID_KEY_NAME);
+        offsetData = new HashMap<TopicPartition, PartitionData>();
+        for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
+            Struct topicResponse = (Struct) topicResponseObj;
+            String topic = topicResponse.getString(TOPIC_KEY_NAME);
+            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionResponse = (Struct) partitionResponseObj;
+                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+                long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME);
+                int maxNumOffsets = partitionResponse.getInt(MAX_NUM_OFFSETS_KEY_NAME);
+                PartitionData partitionData = new PartitionData(timestamp, maxNumOffsets);
+                offsetData.put(new TopicPartition(topic, partition), partitionData);
+            }
+        }
+    }
+
+    public int replicaId() {
+        return replicaId;
+    }
+
+    public Map<TopicPartition, PartitionData> offsetData() {
+        return offsetData;
+    }
+
+    /** Reads a struct of the current request schema from {@code buffer} and wraps it in a request. */
+    public static ListOffsetRequest parse(ByteBuffer buffer) {
+        return new ListOffsetRequest(((Struct) curSchema.read(buffer)));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
new file mode 100644
index 0000000..ac23971
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Response body of the LIST_OFFSETS API: per topic partition, an error code and a list of offsets, stored in the
+ * struct as a topic array whose entries each hold a partition array.
+ */
+public class ListOffsetResponse extends AbstractRequestResponse {
+    public static final Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.LIST_OFFSETS.id);
+    private static final String RESPONSES_KEY_NAME = "responses";
+
+    // topic level field names
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partition_responses";
+
+    // partition level field names
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String OFFSETS_KEY_NAME = "offsets";
+
+    private final Map<TopicPartition, PartitionData> responseData;
+
+    /** Per-partition response values: the error code and the list of offsets. */
+    public static final class PartitionData {
+        public final short errorCode;
+        public final List<Long> offsets;
+
+        public PartitionData(short errorCode, List<Long> offsets) {
+            this.errorCode = errorCode;
+            this.offsets = offsets;
+        }
+    }
+
+    /** Creates a response from per-partition data, nesting each partition under its topic in the struct. */
+    public ListOffsetResponse(Map<TopicPartition, PartitionData> responseData) {
+        super(new Struct(curSchema));
+        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
+
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
+            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
+                PartitionData offsetPartitionData = partitionEntry.getValue();
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionData.set(ERROR_CODE_KEY_NAME, offsetPartitionData.errorCode);
+                partitionData.set(OFFSETS_KEY_NAME, offsetPartitionData.offsets.toArray());
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
+        this.responseData = responseData;
+    }
+
+    /** Rebuilds the response from an already populated struct, keyed by TopicPartition. */
+    public ListOffsetResponse(Struct struct) {
+        super(struct);
+        responseData = new HashMap<TopicPartition, PartitionData>();
+        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
+            Struct topicResponse = (Struct) topicResponseObj;
+            String topic = topicResponse.getString(TOPIC_KEY_NAME);
+            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionResponse = (Struct) partitionResponseObj;
+                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+                short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
+                Object[] offsets = partitionResponse.getArray(OFFSETS_KEY_NAME);
+                List<Long> offsetsList = new ArrayList<Long>();
+                for (Object offset: offsets)
+                    offsetsList.add((Long) offset);
+                PartitionData partitionData = new PartitionData(errorCode, offsetsList);
+                responseData.put(new TopicPartition(topic, partition), partitionData);
+            }
+        }
+    }
+
+    public Map<TopicPartition, PartitionData> responseData() {
+        return responseData;
+    }
+
+    /** Reads a struct of the current response schema from {@code buffer} and wraps it in a response. */
+    public static ListOffsetResponse parse(ByteBuffer buffer) {
+        return new ListOffsetResponse(((Struct) curSchema.read(buffer)));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index f35bd87..b22ca1d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -12,26 +12,48 @@
  */
 package org.apache.kafka.common.requests;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
-public class MetadataRequest {
+/**
+ * Request body of the METADATA API: a list of topic names stored under the "topics" field of the current
+ * request schema.
+ */
+public class MetadataRequest extends AbstractRequestResponse {
+    public static final Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id);
+    private static final String TOPICS_KEY_NAME = "topics";
 
     private final List<String> topics;
 
+    /** Creates a request for the given topics and records them in a new struct of the current schema. */
     public MetadataRequest(List<String> topics) {
+        super(new Struct(curSchema));
+        struct.set(TOPICS_KEY_NAME, topics.toArray());
         this.topics = topics;
     }
 
-    public Struct toStruct() {
-        String[] ts = new String[topics.size()];
-        topics.toArray(ts);
-        Struct body = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id));
-        body.set("topics", topics.toArray());
-        return body;
+    /** Rebuilds the topic list by reading it back from an already populated struct. */
+    public MetadataRequest(Struct struct) {
+        super(struct);
+        Object[] topicArray = struct.getArray(TOPICS_KEY_NAME);
+        topics = new ArrayList<String>();
+        for (Object topicObj: topicArray) {
+            topics.add((String) topicObj);
+        }
     }
 
+    public List<String> topics() {
+        return topics;
+    }
+
+    /** Reads a struct of the current request schema from {@code buffer} and wraps it in a request. */
+    public static MetadataRequest parse(ByteBuffer buffer) {
+        return new MetadataRequest(((Struct) curSchema.read(buffer)));
+    }
 }