You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/19 04:13:25 UTC

[4/8] kafka git commit: MINOR: Move request/response schemas to the corresponding object representation

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 39c027b..4a60c94 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -19,6 +19,9 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.MemoryRecords;
 
@@ -28,6 +31,12 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+import static org.apache.kafka.common.protocol.types.Type.INT8;
+
 public class FetchRequest extends AbstractRequest {
     public static final int CONSUMER_REPLICA_ID = -1;
     private static final String REPLICA_ID_KEY_NAME = "replica_id";
@@ -40,14 +49,101 @@ public class FetchRequest extends AbstractRequest {
     private static final String MAX_BYTES_KEY_NAME = "max_bytes";
 
     // topic level field names
-    private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITIONS_KEY_NAME = "partitions";
 
     // partition level field names
-    private static final String PARTITION_KEY_NAME = "partition";
     private static final String FETCH_OFFSET_KEY_NAME = "fetch_offset";
     private static final String LOG_START_OFFSET_KEY_NAME = "log_start_offset";
 
+    private static final Schema FETCH_REQUEST_PARTITION_V0 = new Schema(
+            PARTITION_ID,
+            new Field(FETCH_OFFSET_KEY_NAME, INT64, "Message offset."),
+            new Field(MAX_BYTES_KEY_NAME, INT32, "Maximum bytes to fetch."));
+
+    // FETCH_REQUEST_PARTITION_V1 added log_start_offset field - the earliest available offset of partition data that can be consumed.
+    private static final Schema FETCH_REQUEST_PARTITION_V5 = new Schema(
+            PARTITION_ID,
+            new Field(FETCH_OFFSET_KEY_NAME, INT64, "Message offset."),
+            new Field(LOG_START_OFFSET_KEY_NAME, INT64, "Earliest available offset of the follower replica. " +
+                            "The field is only used when request is sent by follower. "),
+            new Field(MAX_BYTES_KEY_NAME, INT32, "Maximum bytes to fetch."));
+
+    private static final Schema FETCH_REQUEST_TOPIC_V0 = new Schema(
+            TOPIC_NAME,
+            new Field(PARTITIONS_KEY_NAME, new ArrayOf(FETCH_REQUEST_PARTITION_V0), "Partitions to fetch."));
+
+    private static final Schema FETCH_REQUEST_TOPIC_V5 = new Schema(
+            TOPIC_NAME,
+            new Field(PARTITIONS_KEY_NAME, new ArrayOf(FETCH_REQUEST_PARTITION_V5), "Partitions to fetch."));
+
+    private static final Schema FETCH_REQUEST_V0 = new Schema(
+            new Field(REPLICA_ID_KEY_NAME, INT32, "Broker id of the follower. For normal consumers, use -1."),
+            new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response."),
+            new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response."),
+            new Field(TOPICS_KEY_NAME, new ArrayOf(FETCH_REQUEST_TOPIC_V0), "Topics to fetch."));
+
+    // The V1 Fetch Request body is the same as V0.
+    // Only the version number is incremented to indicate a newer client
+    private static final Schema FETCH_REQUEST_V1 = FETCH_REQUEST_V0;
+    // The V2 Fetch Request body is the same as V1.
+    // Only the version number is incremented to indicate the client support message format V1 which uses
+    // relative offset and has timestamp.
+    private static final Schema FETCH_REQUEST_V2 = FETCH_REQUEST_V1;
+    // Fetch Request V3 added top level max_bytes field - the total size of partition data to accumulate in response.
+    // The partition ordering is now relevant - partitions will be processed in order they appear in request.
+    private static final Schema FETCH_REQUEST_V3 = new Schema(
+            new Field(REPLICA_ID_KEY_NAME, INT32, "Broker id of the follower. For normal consumers, use -1."),
+            new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response."),
+            new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response."),
+            new Field(MAX_BYTES_KEY_NAME, INT32, "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " +
+                    "if the first message in the first non-empty partition of the fetch is larger than this " +
+                    "value, the message will still be returned to ensure that progress can be made."),
+            new Field(TOPICS_KEY_NAME, new ArrayOf(FETCH_REQUEST_TOPIC_V0), "Topics to fetch in the order provided."));
+
+    // The V4 Fetch Request adds the fetch isolation level and exposes magic v2 (via the response).
+    private static final Schema FETCH_REQUEST_V4 = new Schema(
+            new Field(REPLICA_ID_KEY_NAME, INT32, "Broker id of the follower. For normal consumers, use -1."),
+            new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response."),
+            new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response."),
+            new Field(MAX_BYTES_KEY_NAME, INT32, "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " +
+                    "if the first message in the first non-empty partition of the fetch is larger than this " +
+                    "value, the message will still be returned to ensure that progress can be made."),
+            new Field(ISOLATION_LEVEL_KEY_NAME, INT8, "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED " +
+                    "(isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), " +
+                    "non-transactional and COMMITTED transactional records are visible. To be more concrete, " +
+                    "READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), " +
+                    "and enables the inclusion of the list of aborted transactions in the result, which allows " +
+                    "consumers to discard ABORTED transactional records"),
+            new Field(TOPICS_KEY_NAME, new ArrayOf(FETCH_REQUEST_TOPIC_V0), "Topics to fetch in the order provided."));
+
+    // FETCH_REQUEST_V5 added a per-partition log_start_offset field - the earliest available offset of partition data that can be consumed.
+    private static final Schema FETCH_REQUEST_V5 = new Schema(
+            new Field(REPLICA_ID_KEY_NAME, INT32, "Broker id of the follower. For normal consumers, use -1."),
+            new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response."),
+            new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response."),
+            new Field(MAX_BYTES_KEY_NAME, INT32, "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " +
+                    "if the first message in the first non-empty partition of the fetch is larger than this " +
+                    "value, the message will still be returned to ensure that progress can be made."),
+            new Field(ISOLATION_LEVEL_KEY_NAME, INT8, "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED " +
+                    "(isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), " +
+                    "non-transactional and COMMITTED transactional records are visible. To be more concrete, " +
+                    "READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), " +
+                    "and enables the inclusion of the list of aborted transactions in the result, which allows " +
+                    "consumers to discard ABORTED transactional records"),
+            new Field(TOPICS_KEY_NAME, new ArrayOf(FETCH_REQUEST_TOPIC_V5), "Topics to fetch in the order provided."));
+
+    /**
+     * The body of FETCH_REQUEST_V6 is the same as FETCH_REQUEST_V5.
+     * The version number is bumped up to indicate that the client supports KafkaStorageException.
+     * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 5
+     */
+    private static final Schema FETCH_REQUEST_V6 = FETCH_REQUEST_V5;
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4,
+            FETCH_REQUEST_V5, FETCH_REQUEST_V6};
+    };
+
     // default values for older versions where a request level limit did not exist
     public static final int DEFAULT_RESPONSE_MAX_BYTES = Integer.MAX_VALUE;
     public static final long INVALID_LOG_START_OFFSET = -1L;
@@ -191,10 +287,10 @@ public class FetchRequest extends AbstractRequest {
         fetchData = new LinkedHashMap<>();
         for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.getString(TOPIC_KEY_NAME);
+            String topic = topicResponse.get(TOPIC_NAME);
             for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
                 Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+                int partition = partitionResponse.get(PARTITION_ID);
                 long offset = partitionResponse.getLong(FETCH_OFFSET_KEY_NAME);
                 int maxBytes = partitionResponse.getInt(MAX_BYTES_KEY_NAME);
                 long logStartOffset = partitionResponse.hasField(LOG_START_OFFSET_KEY_NAME) ?
@@ -266,12 +362,12 @@ public class FetchRequest extends AbstractRequest {
         List<Struct> topicArray = new ArrayList<>();
         for (TopicAndPartitionData<PartitionData> topicEntry : topicsData) {
             Struct topicData = struct.instance(TOPICS_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, topicEntry.topic);
+            topicData.set(TOPIC_NAME, topicEntry.topic);
             List<Struct> partitionArray = new ArrayList<>();
             for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.partitions.entrySet()) {
                 PartitionData fetchPartitionData = partitionEntry.getValue();
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionData.set(PARTITION_ID, partitionEntry.getKey());
                 partitionData.set(FETCH_OFFSET_KEY_NAME, fetchPartitionData.fetchOffset);
                 if (partitionData.hasField(LOG_START_OFFSET_KEY_NAME))
                     partitionData.set(LOG_START_OFFSET_KEY_NAME, fetchPartitionData.logStartOffset);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 281ad44..417e845 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -22,9 +22,10 @@ import org.apache.kafka.common.network.MultiSend;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.protocol.types.Type;
 import org.apache.kafka.common.record.Records;
 
 import java.nio.ByteBuffer;
@@ -33,6 +34,14 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+import static org.apache.kafka.common.protocol.types.Type.RECORDS;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 /**
  * This wrapper supports all versions of the Fetch API
  */
@@ -41,13 +50,10 @@ public class FetchResponse extends AbstractResponse {
     private static final String RESPONSES_KEY_NAME = "responses";
 
     // topic level field names
-    private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITIONS_KEY_NAME = "partition_responses";
 
     // partition level field names
     private static final String PARTITION_HEADER_KEY_NAME = "partition_header";
-    private static final String PARTITION_KEY_NAME = "partition";
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
     private static final String HIGH_WATERMARK_KEY_NAME = "high_watermark";
     private static final String LAST_STABLE_OFFSET_KEY_NAME = "last_stable_offset";
     private static final String LOG_START_OFFSET_KEY_NAME = "log_start_offset";
@@ -58,7 +64,93 @@ public class FetchResponse extends AbstractResponse {
     private static final String PRODUCER_ID_KEY_NAME = "producer_id";
     private static final String FIRST_OFFSET_KEY_NAME = "first_offset";
 
-    private static final int DEFAULT_THROTTLE_TIME = 0;
+    private static final Schema FETCH_RESPONSE_PARTITION_HEADER_V0 = new Schema(
+            PARTITION_ID,
+            ERROR_CODE,
+            new Field(HIGH_WATERMARK_KEY_NAME, INT64, "Last committed offset."));
+    private static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(
+            new Field(PARTITION_HEADER_KEY_NAME, FETCH_RESPONSE_PARTITION_HEADER_V0),
+            new Field(RECORD_SET_KEY_NAME, RECORDS));
+
+    private static final Schema FETCH_RESPONSE_TOPIC_V0 = new Schema(
+            TOPIC_NAME,
+            new Field(PARTITIONS_KEY_NAME, new ArrayOf(FETCH_RESPONSE_PARTITION_V0)));
+
+    private static final Schema FETCH_RESPONSE_V0 = new Schema(
+            new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
+
+    private static final Schema FETCH_RESPONSE_V1 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
+    // Even though fetch response v2 has the same protocol as v1, the record set in the response is different. In v1,
+    // record set only includes messages of v0 (magic byte 0). In v2, record set can include messages of v0 and v1
+    // (magic byte 0 and 1). For details, see ByteBufferMessageSet.
+    private static final Schema FETCH_RESPONSE_V2 = FETCH_RESPONSE_V1;
+    private static final Schema FETCH_RESPONSE_V3 = FETCH_RESPONSE_V2;
+
+    // The v4 Fetch Response adds features for transactional consumption (the aborted transaction list and the
+    // last stable offset). It also exposes messages with magic v2 (along with older formats).
+    private static final Schema FETCH_RESPONSE_ABORTED_TRANSACTION_V4 = new Schema(
+            new Field(PRODUCER_ID_KEY_NAME, INT64, "The producer id associated with the aborted transactions"),
+            new Field(FIRST_OFFSET_KEY_NAME, INT64, "The first offset in the aborted transaction"));
+
+    private static final Schema FETCH_RESPONSE_ABORTED_TRANSACTION_V5 = FETCH_RESPONSE_ABORTED_TRANSACTION_V4;
+
+    private static final Schema FETCH_RESPONSE_PARTITION_HEADER_V4 = new Schema(
+            PARTITION_ID,
+            ERROR_CODE,
+            new Field(HIGH_WATERMARK_KEY_NAME, INT64, "Last committed offset."),
+            new Field(LAST_STABLE_OFFSET_KEY_NAME, INT64, "The last stable offset (or LSO) of the partition. This is the last offset such that the state " +
+                    "of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)"),
+            new Field(ABORTED_TRANSACTIONS_KEY_NAME, ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V4)));
+
+    // FETCH_RESPONSE_PARTITION_HEADER_V5 added log_start_offset field - the earliest available offset of partition data that can be consumed.
+    private static final Schema FETCH_RESPONSE_PARTITION_HEADER_V5 = new Schema(
+            PARTITION_ID,
+            ERROR_CODE,
+            new Field(HIGH_WATERMARK_KEY_NAME, INT64, "Last committed offset."),
+            new Field(LAST_STABLE_OFFSET_KEY_NAME, INT64, "The last stable offset (or LSO) of the partition. This is the last offset such that the state " +
+                    "of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)"),
+            new Field(LOG_START_OFFSET_KEY_NAME, INT64, "Earliest available offset."),
+            new Field(ABORTED_TRANSACTIONS_KEY_NAME, ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V5)));
+
+    private static final Schema FETCH_RESPONSE_PARTITION_V4 = new Schema(
+            new Field(PARTITION_HEADER_KEY_NAME, FETCH_RESPONSE_PARTITION_HEADER_V4),
+            new Field(RECORD_SET_KEY_NAME, RECORDS));
+
+    private static final Schema FETCH_RESPONSE_PARTITION_V5 = new Schema(
+            new Field(PARTITION_HEADER_KEY_NAME, FETCH_RESPONSE_PARTITION_HEADER_V5),
+            new Field(RECORD_SET_KEY_NAME, RECORDS));
+
+    private static final Schema FETCH_RESPONSE_TOPIC_V4 = new Schema(
+            TOPIC_NAME,
+            new Field(PARTITIONS_KEY_NAME, new ArrayOf(FETCH_RESPONSE_PARTITION_V4)));
+
+    private static final Schema FETCH_RESPONSE_TOPIC_V5 = new Schema(
+            TOPIC_NAME,
+            new Field(PARTITIONS_KEY_NAME, new ArrayOf(FETCH_RESPONSE_PARTITION_V5)));
+
+    private static final Schema FETCH_RESPONSE_V4 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V4)));
+
+    private static final Schema FETCH_RESPONSE_V5 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V5)));
+
+    /**
+     * The body of FETCH_RESPONSE_V6 is the same as FETCH_RESPONSE_V5.
+     * The version number is bumped up to indicate that the client supports KafkaStorageException.
+     * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 5
+     */
+    private static final Schema FETCH_RESPONSE_V6 = FETCH_RESPONSE_V5;
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2,
+            FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5, FETCH_RESPONSE_V6};
+    }
+
+
     public static final long INVALID_HIGHWATERMARK = -1L;
     public static final long INVALID_LAST_STABLE_OFFSET = -1L;
     public static final long INVALID_LOG_START_OFFSET = -1L;
@@ -190,12 +282,12 @@ public class FetchResponse extends AbstractResponse {
         LinkedHashMap<TopicPartition, PartitionData> responseData = new LinkedHashMap<>();
         for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.getString(TOPIC_KEY_NAME);
+            String topic = topicResponse.get(TOPIC_NAME);
             for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
                 Struct partitionResponse = (Struct) partitionResponseObj;
                 Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME);
-                int partition = partitionResponseHeader.getInt(PARTITION_KEY_NAME);
-                Errors error = Errors.forCode(partitionResponseHeader.getShort(ERROR_CODE_KEY_NAME));
+                int partition = partitionResponseHeader.get(PARTITION_ID);
+                Errors error = Errors.forCode(partitionResponseHeader.get(ERROR_CODE));
                 long highWatermark = partitionResponseHeader.getLong(HIGH_WATERMARK_KEY_NAME);
                 long lastStableOffset = INVALID_LAST_STABLE_OFFSET;
                 if (partitionResponseHeader.hasField(LAST_STABLE_OFFSET_KEY_NAME))
@@ -226,7 +318,7 @@ public class FetchResponse extends AbstractResponse {
             }
         }
         this.responseData = responseData;
-        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
+        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
     }
 
     @Override
@@ -266,7 +358,7 @@ public class FetchResponse extends AbstractResponse {
     private static void addResponseData(Struct struct, int throttleTimeMs, String dest, List<Send> sends) {
         Object[] allTopicData = struct.getArray(RESPONSES_KEY_NAME);
 
-        if (struct.hasField(THROTTLE_TIME_KEY_NAME)) {
+        if (struct.hasField(THROTTLE_TIME_MS)) {
             ByteBuffer buffer = ByteBuffer.allocate(8);
             buffer.putInt(throttleTimeMs);
             buffer.putInt(allTopicData.length);
@@ -284,12 +376,12 @@ public class FetchResponse extends AbstractResponse {
     }
 
     private static void addTopicData(String dest, List<Send> sends, Struct topicData) {
-        String topic = topicData.getString(TOPIC_KEY_NAME);
+        String topic = topicData.get(TOPIC_NAME);
         Object[] allPartitionData = topicData.getArray(PARTITIONS_KEY_NAME);
 
         // include the topic header and the count for the number of partitions
-        ByteBuffer buffer = ByteBuffer.allocate(Type.STRING.sizeOf(topic) + 4);
-        Type.STRING.write(buffer, topic);
+        ByteBuffer buffer = ByteBuffer.allocate(STRING.sizeOf(topic) + 4);
+        STRING.write(buffer, topic);
         buffer.putInt(allPartitionData.length);
         buffer.rewind();
         sends.add(new ByteBufferSend(dest, buffer));
@@ -313,13 +405,13 @@ public class FetchResponse extends AbstractResponse {
         sends.add(new RecordsSend(dest, records));
     }
 
-    private static Struct toStruct(short version, LinkedHashMap<TopicPartition, PartitionData> responseData, int throttleTime) {
+    private static Struct toStruct(short version, LinkedHashMap<TopicPartition, PartitionData> responseData, int throttleTimeMs) {
         Struct struct = new Struct(ApiKeys.FETCH.responseSchema(version));
         List<FetchRequest.TopicAndPartitionData<PartitionData>> topicsData = FetchRequest.TopicAndPartitionData.batchByTopic(responseData);
         List<Struct> topicArray = new ArrayList<>();
         for (FetchRequest.TopicAndPartitionData<PartitionData> topicEntry: topicsData) {
             Struct topicData = struct.instance(RESPONSES_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, topicEntry.topic);
+            topicData.set(TOPIC_NAME, topicEntry.topic);
             List<Struct> partitionArray = new ArrayList<>();
             for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.partitions.entrySet()) {
                 PartitionData fetchPartitionData = partitionEntry.getValue();
@@ -332,8 +424,8 @@ public class FetchResponse extends AbstractResponse {
                     errorCode = Errors.NOT_LEADER_FOR_PARTITION.code();
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
                 Struct partitionDataHeader = partitionData.instance(PARTITION_HEADER_KEY_NAME);
-                partitionDataHeader.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionDataHeader.set(ERROR_CODE_KEY_NAME, errorCode);
+                partitionDataHeader.set(PARTITION_ID, partitionEntry.getKey());
+                partitionDataHeader.set(ERROR_CODE, errorCode);
                 partitionDataHeader.set(HIGH_WATERMARK_KEY_NAME, fetchPartitionData.highWatermark);
 
                 if (partitionDataHeader.hasField(LAST_STABLE_OFFSET_KEY_NAME)) {
@@ -363,9 +455,7 @@ public class FetchResponse extends AbstractResponse {
             topicArray.add(topicData);
         }
         struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
-
-        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
-            struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
+        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
 
         return struct;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
index fbc7fa2..c94bcde 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
@@ -20,15 +20,32 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
+import static org.apache.kafka.common.protocol.types.Type.INT8;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 public class FindCoordinatorRequest extends AbstractRequest {
     private static final String GROUP_ID_KEY_NAME = "group_id";
     private static final String COORDINATOR_KEY_KEY_NAME = "coordinator_key";
     private static final String COORDINATOR_TYPE_KEY_NAME = "coordinator_type";
 
+    private static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(
+            new Field("group_id", STRING, "The unique group id."));
+
+    private static final Schema FIND_COORDINATOR_REQUEST_V1 = new Schema(
+            new Field("coordinator_key", STRING, "Id to use for finding the coordinator (for groups, this is the groupId, " +
+                            "for transactional producers, this is the transactional id)"),
+            new Field("coordinator_type", INT8, "The type of coordinator to find (0 = group, 1 = transaction)"));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {FIND_COORDINATOR_REQUEST_V0, FIND_COORDINATOR_REQUEST_V1};
+    }
+
     public static class Builder extends AbstractRequest.Builder<FindCoordinatorRequest> {
         private final String coordinatorKey;
         private final CoordinatorType coordinatorType;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
index ae6986a..f5d9805 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -19,16 +19,46 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
-public class FindCoordinatorResponse extends AbstractResponse {
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
 
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-    private static final String ERROR_MESSAGE_KEY_NAME = "error_message";
+public class FindCoordinatorResponse extends AbstractResponse {
     private static final String COORDINATOR_KEY_NAME = "coordinator";
 
+    // coordinator level field names
+    private static final String NODE_ID_KEY_NAME = "node_id";
+    private static final String HOST_KEY_NAME = "host";
+    private static final String PORT_KEY_NAME = "port";
+
+    private static final Schema FIND_COORDINATOR_BROKER_V0 = new Schema(
+            new Field(NODE_ID_KEY_NAME, INT32, "The broker id."),
+            new Field(HOST_KEY_NAME, STRING, "The hostname of the broker."),
+            new Field(PORT_KEY_NAME, INT32, "The port on which the broker accepts requests."));
+
+    private static final Schema FIND_COORDINATOR_RESPONSE_V0 = new Schema(
+            ERROR_CODE,
+            new Field(COORDINATOR_KEY_NAME, FIND_COORDINATOR_BROKER_V0, "Host and port information for the coordinator " +
+                    "for a consumer group."));
+
+    private static final Schema FIND_COORDINATOR_RESPONSE_V1 = new Schema(
+            THROTTLE_TIME_MS,
+            ERROR_CODE,
+            ERROR_MESSAGE,
+            new Field(COORDINATOR_KEY_NAME, FIND_COORDINATOR_BROKER_V0, "Host and port information for the coordinator"));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {FIND_COORDINATOR_RESPONSE_V0, FIND_COORDINATOR_RESPONSE_V1};
+    }
+
     /**
      * Possible error codes:
      *
@@ -37,10 +67,6 @@ public class FindCoordinatorResponse extends AbstractResponse {
      * GROUP_AUTHORIZATION_FAILED (30)
      */
 
-    // coordinator level field names
-    private static final String NODE_ID_KEY_NAME = "node_id";
-    private static final String HOST_KEY_NAME = "host";
-    private static final String PORT_KEY_NAME = "port";
 
     private final int throttleTimeMs;
     private final String errorMessage;
@@ -59,12 +85,9 @@ public class FindCoordinatorResponse extends AbstractResponse {
     }
 
     public FindCoordinatorResponse(Struct struct) {
-        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
-        error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
-        if (struct.hasField(ERROR_MESSAGE_KEY_NAME))
-            errorMessage = struct.getString(ERROR_MESSAGE_KEY_NAME);
-        else
-            errorMessage = null;
+        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+        error = Errors.forCode(struct.get(ERROR_CODE));
+        errorMessage = struct.getOrElse(ERROR_MESSAGE, null);
 
         Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
         int nodeId = broker.getInt(NODE_ID_KEY_NAME);
@@ -88,11 +111,9 @@ public class FindCoordinatorResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.FIND_COORDINATOR.responseSchema(version));
-        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
-            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
-        if (struct.hasField(ERROR_MESSAGE_KEY_NAME))
-            struct.set(ERROR_MESSAGE_KEY_NAME, errorMessage);
+        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
+        struct.set(ERROR_CODE, error.code());
+        struct.setIfExists(ERROR_MESSAGE, errorMessage);
 
         Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
         coordinator.set(NODE_ID_KEY_NAME, node.id());

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
index 7e08a55..00a806f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
@@ -18,15 +18,32 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 public class HeartbeatRequest extends AbstractRequest {
     private static final String GROUP_ID_KEY_NAME = "group_id";
     private static final String GROUP_GENERATION_ID_KEY_NAME = "group_generation_id";
     private static final String MEMBER_ID_KEY_NAME = "member_id";
 
+    private static final Schema HEARTBEAT_REQUEST_V0 = new Schema(
+            new Field(GROUP_ID_KEY_NAME, STRING, "The group id."),
+            new Field(GROUP_GENERATION_ID_KEY_NAME, INT32, "The generation of the group."),
+            new Field(MEMBER_ID_KEY_NAME, STRING, "The member id assigned by the group coordinator."));
+
+    /* v1 request is the same as v0. Throttle time has been added to response */
+    private static final Schema HEARTBEAT_REQUEST_V1 = HEARTBEAT_REQUEST_V0;
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {HEARTBEAT_REQUEST_V0, HEARTBEAT_REQUEST_V1};
+    }
+
     public static class Builder extends AbstractRequest.Builder<HeartbeatRequest> {
         private final String groupId;
         private final int groupGenerationId;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index cec39f0..e41d937 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -18,13 +18,25 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+
 public class HeartbeatResponse extends AbstractResponse {
 
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final Schema HEARTBEAT_RESPONSE_V0 = new Schema(
+            ERROR_CODE);
+    private static final Schema HEARTBEAT_RESPONSE_V1 = new Schema(
+            THROTTLE_TIME_MS,
+            ERROR_CODE);
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {HEARTBEAT_RESPONSE_V0, HEARTBEAT_RESPONSE_V1};
+    }
 
     /**
      * Possible error codes:
@@ -49,8 +61,8 @@ public class HeartbeatResponse extends AbstractResponse {
     }
 
     public HeartbeatResponse(Struct struct) {
-        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
-        error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+        error = Errors.forCode(struct.get(ERROR_CODE));
     }
 
     public int throttleTimeMs() {
@@ -64,9 +76,8 @@ public class HeartbeatResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.HEARTBEAT.responseSchema(version));
-        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
-            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
+        struct.set(ERROR_CODE, error.code());
         return struct;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
index 45f88a2..fa14a97 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
@@ -18,16 +18,29 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
+
 public class InitProducerIdRequest extends AbstractRequest {
     public static final int NO_TRANSACTION_TIMEOUT_MS = Integer.MAX_VALUE;
 
     private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
     private static final String TRANSACTION_TIMEOUT_KEY_NAME = "transaction_timeout_ms";
 
+    private static final Schema INIT_PRODUCER_ID_REQUEST_V0 = new Schema(
+            new Field(TRANSACTIONAL_ID_KEY_NAME, NULLABLE_STRING, "The transactional id whose producer id we want to retrieve or generate."),
+            new Field(TRANSACTION_TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for before aborting idle transactions sent by this producer."));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{INIT_PRODUCER_ID_REQUEST_V0};
+    }
+
     private final String transactionalId;
     private final int transactionTimeoutMs;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
index 88fb09c..8799ad7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
@@ -18,11 +18,18 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.RecordBatch;
 
 import java.nio.ByteBuffer;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.types.Type.INT16;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+
 public class InitProducerIdResponse extends AbstractResponse {
     // Possible error codes:
     //   NotCoordinator
@@ -33,7 +40,19 @@ public class InitProducerIdResponse extends AbstractResponse {
 
     private static final String PRODUCER_ID_KEY_NAME = "producer_id";
     private static final String EPOCH_KEY_NAME = "producer_epoch";
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    private static final Schema INIT_PRODUCER_ID_RESPONSE_V0 = new Schema(
+            THROTTLE_TIME_MS,
+            ERROR_CODE,
+            new Field(PRODUCER_ID_KEY_NAME, INT64, "The producer id for the input transactional id. If the input " +
+                    "id was empty, then this is used only for ensuring idempotence of messages."),
+            new Field(EPOCH_KEY_NAME, INT16, "The epoch for the producer id. Will always be 0 if no transactional " +
+                    "id was specified in the request."));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{INIT_PRODUCER_ID_RESPONSE_V0};
+    }
+
     private final int throttleTimeMs;
     private final Errors error;
     private final long producerId;
@@ -47,8 +66,8 @@ public class InitProducerIdResponse extends AbstractResponse {
     }
 
     public InitProducerIdResponse(Struct struct) {
-        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
-        this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+        this.throttleTimeMs = struct.get(THROTTLE_TIME_MS);
+        this.error = Errors.forCode(struct.get(ERROR_CODE));
         this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
         this.epoch = struct.getShort(EPOCH_KEY_NAME);
     }
@@ -76,10 +95,10 @@ public class InitProducerIdResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.responseSchema(version));
-        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
         struct.set(PRODUCER_ID_KEY_NAME, producerId);
         struct.set(EPOCH_KEY_NAME, epoch);
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        struct.set(ERROR_CODE, error.code());
         return struct;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index ff07d13..b2ff133 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -18,6 +18,9 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.Utils;
 
@@ -26,6 +29,10 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.kafka.common.protocol.types.Type.BYTES;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 public class JoinGroupRequest extends AbstractRequest {
     private static final String GROUP_ID_KEY_NAME = "group_id";
     private static final String SESSION_TIMEOUT_KEY_NAME = "session_timeout";
@@ -36,6 +43,38 @@ public class JoinGroupRequest extends AbstractRequest {
     private static final String PROTOCOL_NAME_KEY_NAME = "protocol_name";
     private static final String PROTOCOL_METADATA_KEY_NAME = "protocol_metadata";
 
+    /* Join group api */
+    private static final Schema JOIN_GROUP_REQUEST_PROTOCOL_V0 = new Schema(
+            new Field(PROTOCOL_NAME_KEY_NAME, STRING),
+            new Field(PROTOCOL_METADATA_KEY_NAME, BYTES));
+
+    private static final Schema JOIN_GROUP_REQUEST_V0 = new Schema(
+            new Field(GROUP_ID_KEY_NAME, STRING, "The group id."),
+            new Field(SESSION_TIMEOUT_KEY_NAME, INT32, "The coordinator considers the consumer dead if it receives " +
+                    "no heartbeat after this timeout in ms."),
+            new Field(MEMBER_ID_KEY_NAME, STRING, "The assigned consumer id or an empty string for a new consumer."),
+            new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "Unique name for class of protocols implemented by group"),
+            new Field(GROUP_PROTOCOLS_KEY_NAME, new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0), "List of protocols " +
+                    "that the member supports"));
+
+    private static final Schema JOIN_GROUP_REQUEST_V1 = new Schema(
+            new Field(GROUP_ID_KEY_NAME, STRING, "The group id."),
+            new Field(SESSION_TIMEOUT_KEY_NAME, INT32, "The coordinator considers the consumer dead if it receives no " +
+                    "heartbeat after this timeout in ms."),
+            new Field(REBALANCE_TIMEOUT_KEY_NAME, INT32, "The maximum time that the coordinator will wait for each " +
+                    "member to rejoin when rebalancing the group"),
+            new Field(MEMBER_ID_KEY_NAME, STRING, "The assigned consumer id or an empty string for a new consumer."),
+            new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "Unique name for class of protocols implemented by group"),
+            new Field(GROUP_PROTOCOLS_KEY_NAME, new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0), "List of protocols " +
+                    "that the member supports"));
+
+    /* v2 request is the same as v1. Throttle time has been added to response */
+    private static final Schema JOIN_GROUP_REQUEST_V2 = JOIN_GROUP_REQUEST_V1;
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2};
+    }
+
     public static final String UNKNOWN_MEMBER_ID = "";
 
     private final String groupId;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index eb86ce7..a4431b9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -18,6 +18,9 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -26,9 +29,53 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.types.Type.BYTES;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 public class JoinGroupResponse extends AbstractResponse {
 
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String GENERATION_ID_KEY_NAME = "generation_id";
+    private static final String GROUP_PROTOCOL_KEY_NAME = "group_protocol";
+    private static final String LEADER_ID_KEY_NAME = "leader_id";
+    private static final String MEMBER_ID_KEY_NAME = "member_id";
+    private static final String MEMBERS_KEY_NAME = "members";
+
+    private static final String MEMBER_METADATA_KEY_NAME = "member_metadata";
+
+    private static final Schema JOIN_GROUP_RESPONSE_MEMBER_V0 = new Schema(
+            new Field(MEMBER_ID_KEY_NAME, STRING),
+            new Field(MEMBER_METADATA_KEY_NAME, BYTES));
+
+    private static final Schema JOIN_GROUP_RESPONSE_V0 = new Schema(
+            ERROR_CODE,
+            new Field(GENERATION_ID_KEY_NAME, INT32, "The generation of the consumer group."),
+            new Field(GROUP_PROTOCOL_KEY_NAME, STRING, "The group protocol selected by the coordinator"),
+            new Field(LEADER_ID_KEY_NAME, STRING, "The leader of the group"),
+            new Field(MEMBER_ID_KEY_NAME, STRING, "The consumer id assigned by the group coordinator."),
+            new Field(MEMBERS_KEY_NAME, new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
+
+    private static final Schema JOIN_GROUP_RESPONSE_V1 = JOIN_GROUP_RESPONSE_V0;
+
+    private static final Schema JOIN_GROUP_RESPONSE_V2 = new Schema(
+            THROTTLE_TIME_MS,
+            ERROR_CODE,
+            new Field(GENERATION_ID_KEY_NAME, INT32, "The generation of the consumer group."),
+            new Field(GROUP_PROTOCOL_KEY_NAME, STRING, "The group protocol selected by the coordinator"),
+            new Field(LEADER_ID_KEY_NAME, STRING, "The leader of the group"),
+            new Field(MEMBER_ID_KEY_NAME, STRING, "The consumer id assigned by the group coordinator."),
+            new Field(MEMBERS_KEY_NAME, new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
+
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2};
+    }
+
+    public static final String UNKNOWN_PROTOCOL = "";
+    public static final int UNKNOWN_GENERATION_ID = -1;
+    public static final String UNKNOWN_MEMBER_ID = "";
 
     /**
      * Possible error codes:
@@ -42,18 +89,6 @@ public class JoinGroupResponse extends AbstractResponse {
      * GROUP_AUTHORIZATION_FAILED (30)
      */
 
-    private static final String GENERATION_ID_KEY_NAME = "generation_id";
-    private static final String GROUP_PROTOCOL_KEY_NAME = "group_protocol";
-    private static final String LEADER_ID_KEY_NAME = "leader_id";
-    private static final String MEMBER_ID_KEY_NAME = "member_id";
-    private static final String MEMBERS_KEY_NAME = "members";
-
-    private static final String MEMBER_METADATA_KEY_NAME = "member_metadata";
-
-    public static final String UNKNOWN_PROTOCOL = "";
-    public static final int UNKNOWN_GENERATION_ID = -1;
-    public static final String UNKNOWN_MEMBER_ID = "";
-
     private final int throttleTimeMs;
     private final Errors error;
     private final int generationId;
@@ -88,7 +123,7 @@ public class JoinGroupResponse extends AbstractResponse {
     }
 
     public JoinGroupResponse(Struct struct) {
-        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
+        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
         members = new HashMap<>();
 
         for (Object memberDataObj : struct.getArray(MEMBERS_KEY_NAME)) {
@@ -97,7 +132,7 @@ public class JoinGroupResponse extends AbstractResponse {
             ByteBuffer memberMetadata = memberData.getBytes(MEMBER_METADATA_KEY_NAME);
             members.put(memberId, memberMetadata);
         }
-        error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+        error = Errors.forCode(struct.get(ERROR_CODE));
         generationId = struct.getInt(GENERATION_ID_KEY_NAME);
         groupProtocol = struct.getString(GROUP_PROTOCOL_KEY_NAME);
         memberId = struct.getString(MEMBER_ID_KEY_NAME);
@@ -143,10 +178,9 @@ public class JoinGroupResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.JOIN_GROUP.responseSchema(version));
-        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
-            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
 
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        struct.set(ERROR_CODE, error.code());
         struct.set(GENERATION_ID_KEY_NAME, generationId);
         struct.set(GROUP_PROTOCOL_KEY_NAME, groupProtocol);
         struct.set(MEMBER_ID_KEY_NAME, memberId);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index bd379b8..73f037f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -20,6 +20,9 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.Utils;
 
@@ -31,6 +34,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 public class LeaderAndIsrRequest extends AbstractRequest {
     private static final String CONTROLLER_ID_KEY_NAME = "controller_id";
     private static final String CONTROLLER_EPOCH_KEY_NAME = "controller_epoch";
@@ -38,8 +47,6 @@ public class LeaderAndIsrRequest extends AbstractRequest {
     private static final String LIVE_LEADERS_KEY_NAME = "live_leaders";
 
     // partition_states key names
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITION_KEY_NAME = "partition";
     private static final String LEADER_KEY_NAME = "leader";
     private static final String LEADER_EPOCH_KEY_NAME = "leader_epoch";
     private static final String ISR_KEY_NAME = "isr";
@@ -52,6 +59,51 @@ public class LeaderAndIsrRequest extends AbstractRequest {
     private static final String HOST_KEY_NAME = "host";
     private static final String PORT_KEY_NAME = "port";
 
+    private static final Schema LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0 = new Schema(
+            TOPIC_NAME,
+            PARTITION_ID,
+            new Field(CONTROLLER_EPOCH_KEY_NAME, INT32, "The controller epoch."),
+            new Field(LEADER_KEY_NAME, INT32, "The broker id for the leader."),
+            new Field(LEADER_EPOCH_KEY_NAME, INT32, "The leader epoch."),
+            new Field(ISR_KEY_NAME, new ArrayOf(INT32), "The in sync replica ids."),
+            new Field(ZK_VERSION_KEY_NAME, INT32, "The ZK version."),
+            new Field(REPLICAS_KEY_NAME, new ArrayOf(INT32), "The replica ids."));
+
+    // LEADER_AND_ISR_REQUEST_PARTITION_STATE_V1 added a per-partition is_new Field.
+    // This field specifies whether the replica should have existed on the broker or not.
+    private static final Schema LEADER_AND_ISR_REQUEST_PARTITION_STATE_V1 = new Schema(
+            TOPIC_NAME,
+            PARTITION_ID,
+            new Field(CONTROLLER_EPOCH_KEY_NAME, INT32, "The controller epoch."),
+            new Field(LEADER_KEY_NAME, INT32, "The broker id for the leader."),
+            new Field(LEADER_EPOCH_KEY_NAME, INT32, "The leader epoch."),
+            new Field(ISR_KEY_NAME, new ArrayOf(INT32), "The in sync replica ids."),
+            new Field(ZK_VERSION_KEY_NAME, INT32, "The ZK version."),
+            new Field(REPLICAS_KEY_NAME, new ArrayOf(INT32), "The replica ids."),
+            new Field(IS_NEW_KEY_NAME, BOOLEAN, "Whether the replica should have existed on the broker or not"));
+
+    private static final Schema LEADER_AND_ISR_REQUEST_LIVE_LEADER_V0 = new Schema(
+            new Field(END_POINT_ID_KEY_NAME, INT32, "The broker id."),
+            new Field(HOST_KEY_NAME, STRING, "The hostname of the broker."),
+            new Field(PORT_KEY_NAME, INT32, "The port on which the broker accepts requests."));
+
+    private static final Schema LEADER_AND_ISR_REQUEST_V0 = new Schema(
+            new Field(CONTROLLER_ID_KEY_NAME, INT32, "The controller id."),
+            new Field(CONTROLLER_EPOCH_KEY_NAME, INT32, "The controller epoch."),
+            new Field(PARTITION_STATES_KEY_NAME, new ArrayOf(LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0)),
+            new Field(LIVE_LEADERS_KEY_NAME, new ArrayOf(LEADER_AND_ISR_REQUEST_LIVE_LEADER_V0)));
+
+    // LEADER_AND_ISR_REQUEST_V1 added a per-partition is_new Field. This field specifies whether the replica should have existed on the broker or not.
+    private static final Schema LEADER_AND_ISR_REQUEST_V1 = new Schema(
+            new Field(CONTROLLER_ID_KEY_NAME, INT32, "The controller id."),
+            new Field(CONTROLLER_EPOCH_KEY_NAME, INT32, "The controller epoch."),
+            new Field(PARTITION_STATES_KEY_NAME, new ArrayOf(LEADER_AND_ISR_REQUEST_PARTITION_STATE_V1)),
+            new Field(LIVE_LEADERS_KEY_NAME, new ArrayOf(LEADER_AND_ISR_REQUEST_LIVE_LEADER_V0)));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{LEADER_AND_ISR_REQUEST_V0, LEADER_AND_ISR_REQUEST_V1};
+    }
+
     public static class Builder extends AbstractRequest.Builder<LeaderAndIsrRequest> {
         private final int controllerId;
         private final int controllerEpoch;
@@ -105,8 +157,8 @@ public class LeaderAndIsrRequest extends AbstractRequest {
         Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
         for (Object partitionStateDataObj : struct.getArray(PARTITION_STATES_KEY_NAME)) {
             Struct partitionStateData = (Struct) partitionStateDataObj;
-            String topic = partitionStateData.getString(TOPIC_KEY_NAME);
-            int partition = partitionStateData.getInt(PARTITION_KEY_NAME);
+            String topic = partitionStateData.get(TOPIC_NAME);
+            int partition = partitionStateData.get(PARTITION_ID);
             int controllerEpoch = partitionStateData.getInt(CONTROLLER_EPOCH_KEY_NAME);
             int leader = partitionStateData.getInt(LEADER_KEY_NAME);
             int leaderEpoch = partitionStateData.getInt(LEADER_EPOCH_KEY_NAME);
@@ -154,8 +206,8 @@ public class LeaderAndIsrRequest extends AbstractRequest {
         for (Map.Entry<TopicPartition, PartitionState> entry : partitionStates.entrySet()) {
             Struct partitionStateData = struct.instance(PARTITION_STATES_KEY_NAME);
             TopicPartition topicPartition = entry.getKey();
-            partitionStateData.set(TOPIC_KEY_NAME, topicPartition.topic());
-            partitionStateData.set(PARTITION_KEY_NAME, topicPartition.partition());
+            partitionStateData.set(TOPIC_NAME, topicPartition.topic());
+            partitionStateData.set(PARTITION_ID, topicPartition.partition());
             PartitionState partitionState = entry.getValue();
             partitionStateData.set(CONTROLLER_EPOCH_KEY_NAME, partitionState.basePartitionState.controllerEpoch);
             partitionStateData.set(LEADER_KEY_NAME, partitionState.basePartitionState.leader);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
index bc4400e..39b8c37 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
@@ -19,6 +19,9 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -27,14 +30,27 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class LeaderAndIsrResponse extends AbstractResponse {
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
 
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
+public class LeaderAndIsrResponse extends AbstractResponse {
     private static final String PARTITIONS_KEY_NAME = "partitions";
 
-    private static final String PARTITIONS_TOPIC_KEY_NAME = "topic";
-    private static final String PARTITIONS_PARTITION_KEY_NAME = "partition";
-    private static final String PARTITIONS_ERROR_CODE_KEY_NAME = "error_code";
+    private static final Schema LEADER_AND_ISR_RESPONSE_PARTITION_V0 = new Schema(
+            TOPIC_NAME,
+            PARTITION_ID,
+            ERROR_CODE);
+    private static final Schema LEADER_AND_ISR_RESPONSE_V0 = new Schema(
+            ERROR_CODE,
+            new Field(PARTITIONS_KEY_NAME, new ArrayOf(LEADER_AND_ISR_RESPONSE_PARTITION_V0)));
+
+    // LeaderAndIsrResponse V1 may receive KAFKA_STORAGE_ERROR in the response
+    private static final Schema LEADER_AND_ISR_RESPONSE_V1 = LEADER_AND_ISR_RESPONSE_V0;
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{LEADER_AND_ISR_RESPONSE_V0, LEADER_AND_ISR_RESPONSE_V1};
+    }
 
     /**
      * Possible error code:
@@ -54,13 +70,13 @@ public class LeaderAndIsrResponse extends AbstractResponse {
         responses = new HashMap<>();
         for (Object responseDataObj : struct.getArray(PARTITIONS_KEY_NAME)) {
             Struct responseData = (Struct) responseDataObj;
-            String topic = responseData.getString(PARTITIONS_TOPIC_KEY_NAME);
-            int partition = responseData.getInt(PARTITIONS_PARTITION_KEY_NAME);
-            Errors error = Errors.forCode(responseData.getShort(PARTITIONS_ERROR_CODE_KEY_NAME));
+            String topic = responseData.get(TOPIC_NAME);
+            int partition = responseData.get(PARTITION_ID);
+            Errors error = Errors.forCode(responseData.get(ERROR_CODE));
             responses.put(new TopicPartition(topic, partition), error);
         }
 
-        error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+        error = Errors.forCode(struct.get(ERROR_CODE));
     }
 
     public Map<TopicPartition, Errors> responses() {
@@ -83,14 +99,14 @@ public class LeaderAndIsrResponse extends AbstractResponse {
         for (Map.Entry<TopicPartition, Errors> response : responses.entrySet()) {
             Struct partitionData = struct.instance(PARTITIONS_KEY_NAME);
             TopicPartition partition = response.getKey();
-            partitionData.set(PARTITIONS_TOPIC_KEY_NAME, partition.topic());
-            partitionData.set(PARTITIONS_PARTITION_KEY_NAME, partition.partition());
-            partitionData.set(PARTITIONS_ERROR_CODE_KEY_NAME, response.getValue().code());
+            partitionData.set(TOPIC_NAME, partition.topic());
+            partitionData.set(PARTITION_ID, partition.partition());
+            partitionData.set(ERROR_CODE, response.getValue().code());
             responseDatas.add(partitionData);
         }
 
         struct.set(PARTITIONS_KEY_NAME, responseDatas.toArray());
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        struct.set(ERROR_CODE, error.code());
 
         return struct;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
index 76e076e..661eb7f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
@@ -16,15 +16,31 @@
  */
 package org.apache.kafka.common.requests;
 
-import java.nio.ByteBuffer;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 public class LeaveGroupRequest extends AbstractRequest {
     private static final String GROUP_ID_KEY_NAME = "group_id";
     private static final String MEMBER_ID_KEY_NAME = "member_id";
 
+    private static final Schema LEAVE_GROUP_REQUEST_V0 = new Schema(
+            new Field(GROUP_ID_KEY_NAME, STRING, "The group id."),
+            new Field(MEMBER_ID_KEY_NAME, STRING, "The member id assigned by the group coordinator."));
+
+    /* v1 request is the same as v0. Throttle time has been added to response */
+    private static final Schema LEAVE_GROUP_REQUEST_V1 = LEAVE_GROUP_REQUEST_V0;
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {LEAVE_GROUP_REQUEST_V0, LEAVE_GROUP_REQUEST_V1};
+    }
+
     public static class Builder extends AbstractRequest.Builder<LeaveGroupRequest> {
         private final String groupId;
         private final String memberId;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
index 1c85850..bef21e9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -18,13 +18,25 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+
 public class LeaveGroupResponse extends AbstractResponse {
 
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final Schema LEAVE_GROUP_RESPONSE_V0 = new Schema(
+            ERROR_CODE);
+    private static final Schema LEAVE_GROUP_RESPONSE_V1 = new Schema(
+            THROTTLE_TIME_MS,
+            ERROR_CODE);
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {LEAVE_GROUP_RESPONSE_V0, LEAVE_GROUP_RESPONSE_V1};
+    }
 
     /**
      * Possible error code:
@@ -48,8 +60,8 @@ public class LeaveGroupResponse extends AbstractResponse {
     }
 
     public LeaveGroupResponse(Struct struct) {
-        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
-        error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+        this.error = Errors.forCode(struct.get(ERROR_CODE));
     }
 
     public int throttleTimeMs() {
@@ -63,9 +75,8 @@ public class LeaveGroupResponse extends AbstractResponse {
     @Override
     public Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.LEAVE_GROUP.responseSchema(version));
-        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
-            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
+        struct.set(ERROR_CODE, error.code());
         return struct;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
index 3d4f2b8..f279b4c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
@@ -18,12 +18,24 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
 
 public class ListGroupsRequest extends AbstractRequest {
+
+    /* List groups api */
+    private static final Schema LIST_GROUPS_REQUEST_V0 = new Schema();
+
+    /* v1 request is the same as v0. Throttle time has been added to response */
+    private static final Schema LIST_GROUPS_REQUEST_V1 = LIST_GROUPS_REQUEST_V0;
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {LIST_GROUPS_REQUEST_V0, LIST_GROUPS_REQUEST_V1};
+    }
+
     public static class Builder extends AbstractRequest.Builder<ListGroupsRequest> {
         public Builder() {
             super(ApiKeys.LIST_GROUPS);
@@ -49,7 +61,7 @@ public class ListGroupsRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+    public ListGroupsResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         short versionId = version();
         switch (versionId) {
             case 0:

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index 8ae3792..cdf4c59 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -18,19 +18,39 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 public class ListGroupsResponse extends AbstractResponse {
 
-    public static final String ERROR_CODE_KEY_NAME = "error_code";
-    public static final String GROUPS_KEY_NAME = "groups";
-    public static final String GROUP_ID_KEY_NAME = "group_id";
-    public static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type";
+    private static final String GROUPS_KEY_NAME = "groups";
+    private static final String GROUP_ID_KEY_NAME = "group_id";
+    private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type";
+
+    private static final Schema LIST_GROUPS_RESPONSE_GROUP_V0 = new Schema(
+            new Field("group_id", STRING),
+            new Field("protocol_type", STRING));
+    private static final Schema LIST_GROUPS_RESPONSE_V0 = new Schema(
+            ERROR_CODE,
+            new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
+    private static final Schema LIST_GROUPS_RESPONSE_V1 = new Schema(
+            THROTTLE_TIME_MS,
+            ERROR_CODE,
+            new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {LIST_GROUPS_RESPONSE_V0, LIST_GROUPS_RESPONSE_V1};
+    }
 
     /**
      * Possible error codes:
@@ -54,8 +74,8 @@ public class ListGroupsResponse extends AbstractResponse {
     }
 
     public ListGroupsResponse(Struct struct) {
-        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
-        this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+        this.error = Errors.forCode(struct.get(ERROR_CODE));
         this.groups = new ArrayList<>();
         for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) {
             Struct groupStruct = (Struct) groupObj;
@@ -99,9 +119,8 @@ public class ListGroupsResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.LIST_GROUPS.responseSchema(version));
-        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
-            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
+        struct.set(ERROR_CODE, error.code());
         List<Struct> groupList = new ArrayList<>();
         for (Group group : groups) {
             Struct groupStruct = struct.instance(GROUPS_KEY_NAME);
@@ -113,14 +132,6 @@ public class ListGroupsResponse extends AbstractResponse {
         return struct;
     }
 
-    public static ListGroupsResponse fromError(Errors error) {
-        return fromError(DEFAULT_THROTTLE_TIME, error);
-    }
-
-    public static ListGroupsResponse fromError(int throttleTimeMs, Errors error) {
-        return new ListGroupsResponse(throttleTimeMs, error, Collections.<Group>emptyList());
-    }
-
     public static ListGroupsResponse parse(ByteBuffer buffer, short version) {
         return new ListGroupsResponse(ApiKeys.LIST_GROUPS.parseResponse(version, buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 03f6ee5..ace582d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -20,6 +20,9 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
 
@@ -32,6 +35,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+import static org.apache.kafka.common.protocol.types.Type.INT8;
+
 public class ListOffsetRequest extends AbstractRequest {
     public static final long EARLIEST_TIMESTAMP = -2L;
     public static final long LATEST_TIMESTAMP = -1L;
@@ -44,14 +53,49 @@ public class ListOffsetRequest extends AbstractRequest {
     private static final String TOPICS_KEY_NAME = "topics";
 
     // topic level field names
-    private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITIONS_KEY_NAME = "partitions";
 
     // partition level field names
-    private static final String PARTITION_KEY_NAME = "partition";
     private static final String TIMESTAMP_KEY_NAME = "timestamp";
     private static final String MAX_NUM_OFFSETS_KEY_NAME = "max_num_offsets";
 
+    private static final Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(
+            PARTITION_ID,
+            new Field(TIMESTAMP_KEY_NAME, INT64, "Timestamp."),
+            new Field(MAX_NUM_OFFSETS_KEY_NAME, INT32, "Maximum offsets to return."));
+    private static final Schema LIST_OFFSET_REQUEST_PARTITION_V1 = new Schema(
+            PARTITION_ID,
+            new Field(TIMESTAMP_KEY_NAME, INT64, "The target timestamp for the partition."));
+
+    private static final Schema LIST_OFFSET_REQUEST_TOPIC_V0 = new Schema(
+            TOPIC_NAME,
+            new Field(PARTITIONS_KEY_NAME, new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V0), "Partitions to list offset."));
+    private static final Schema LIST_OFFSET_REQUEST_TOPIC_V1 = new Schema(
+            TOPIC_NAME,
+            new Field(PARTITIONS_KEY_NAME, new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V1), "Partitions to list offset."));
+
+    private static final Schema LIST_OFFSET_REQUEST_V0 = new Schema(
+            new Field(REPLICA_ID_KEY_NAME, INT32, "Broker id of the follower. For normal consumers, use -1."),
+            new Field(TOPICS_KEY_NAME, new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V0), "Topics to list offsets."));
+    private static final Schema LIST_OFFSET_REQUEST_V1 = new Schema(
+            new Field(REPLICA_ID_KEY_NAME, INT32, "Broker id of the follower. For normal consumers, use -1."),
+            new Field(TOPICS_KEY_NAME, new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V1), "Topics to list offsets."));
+
+    private static final Schema LIST_OFFSET_REQUEST_V2 = new Schema(
+            new Field(REPLICA_ID_KEY_NAME, INT32, "Broker id of the follower. For normal consumers, use -1."),
+            new Field(ISOLATION_LEVEL_KEY_NAME, INT8, "This setting controls the visibility of transactional records. " +
+                    "Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED " +
+                    "(isolation_level = 1), non-transactional and COMMITTED transactional records are visible. " +
+                    "To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current " +
+                    "LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the " +
+                    "result, which allows consumers to discard ABORTED transactional records"),
+            new Field(TOPICS_KEY_NAME, new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V1), "Topics to list offsets."));;
+
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1, LIST_OFFSET_REQUEST_V2};
+    }
+
     private final int replicaId;
     private final IsolationLevel isolationLevel;
     private final Map<TopicPartition, PartitionData> offsetData;
@@ -193,10 +237,10 @@ public class ListOffsetRequest extends AbstractRequest {
         partitionTimestamps = new HashMap<>();
         for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.getString(TOPIC_KEY_NAME);
+            String topic = topicResponse.get(TOPIC_NAME);
             for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
                 Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+                int partition = partitionResponse.get(PARTITION_ID);
                 long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME);
                 TopicPartition tp = new TopicPartition(topic, partition);
                 if (partitionResponse.hasField(MAX_NUM_OFFSETS_KEY_NAME)) {
@@ -283,20 +327,20 @@ public class ListOffsetRequest extends AbstractRequest {
         List<Struct> topicArray = new ArrayList<>();
         for (Map.Entry<String, Map<Integer, Object>> topicEntry: topicsData.entrySet()) {
             Struct topicData = struct.instance(TOPICS_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+            topicData.set(TOPIC_NAME, topicEntry.getKey());
             List<Struct> partitionArray = new ArrayList<>();
             for (Map.Entry<Integer, Object> partitionEntry : topicEntry.getValue().entrySet()) {
                 if (version == 0) {
                     PartitionData offsetPartitionData = (PartitionData) partitionEntry.getValue();
                     Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                    partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                    partitionData.set(PARTITION_ID, partitionEntry.getKey());
                     partitionData.set(TIMESTAMP_KEY_NAME, offsetPartitionData.timestamp);
                     partitionData.set(MAX_NUM_OFFSETS_KEY_NAME, offsetPartitionData.maxNumOffsets);
                     partitionArray.add(partitionData);
                 } else {
                     Long timestamp = (Long) partitionEntry.getValue();
                     Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                    partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                    partitionData.set(PARTITION_ID, partitionEntry.getKey());
                     partitionData.set(TIMESTAMP_KEY_NAME, timestamp);
                     partitionArray.add(partitionData);
                 }