You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/19 04:13:27 UTC

[6/8] kafka git commit: MINOR: Move request/response schemas to the corresponding object representation

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/protocol/types/BoundField.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/BoundField.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/BoundField.java
new file mode 100644
index 0000000..b031b4f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/BoundField.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol.types;
+
+/**
+ * A field definition bound to a particular schema: pairs a {@link Field} with the {@link Schema} that contains it and the field's slot index in that schema.
+ */
+public class BoundField {
+    public final Field def; // the field definition itself (name, type, doc string, optional default value)
+    final int index; // slot of this field within the schema; Struct uses it to index its value array
+    final Schema schema; // the schema this field belongs to; Struct compares it by identity when validating access
+
+    public BoundField(Field def, Schema schema, int index) { // the Schema constructor creates one per field, passing itself and the field's position
+        this.def = def;
+        this.schema = schema;
+        this.index = index;
+    }
+
+    @Override
+    public String toString() { // renders as "<name>:<type>" of the underlying definition
+        return def.name + ":" + def.type;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
index 29a89d4..8da848b 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
@@ -16,63 +16,67 @@
  */
 package org.apache.kafka.common.protocol.types;
 
-/**
- * A field in a schema
- */
 public class Field {
-
-    public static final Object NO_DEFAULT = new Object();
-
-    final int index;
     public final String name;
+    public final String docString;
     public final Type type;
+    public final boolean hasDefaultValue;
     public final Object defaultValue;
-    public final String doc;
-    final Schema schema;
 
-    /**
-     * Create the field.
-     *
-     * @throws SchemaException If the default value is not primitive and the validation fails
-     */
-    public Field(int index, String name, Type type, String doc, Object defaultValue, Schema schema) {
-        this.index = index;
+    public Field(String name, Type type, String docString, boolean hasDefaultValue, Object defaultValue) {
         this.name = name;
+        this.docString = docString;
         this.type = type;
-        this.doc = doc;
+        this.hasDefaultValue = hasDefaultValue;
         this.defaultValue = defaultValue;
-        this.schema = schema;
-        if (defaultValue != NO_DEFAULT)
+
+        if (hasDefaultValue)
             type.validate(defaultValue);
     }
 
-    public Field(int index, String name, Type type, String doc, Object defaultValue) {
-        this(index, name, type, doc, defaultValue, null);
+    public Field(String name, Type type, String docString) {
+        this(name, type, docString, false, null);
     }
 
-    public Field(String name, Type type, String doc, Object defaultValue) {
-        this(-1, name, type, doc, defaultValue);
+    public Field(String name, Type type, String docString, Object defaultValue) {
+        this(name, type, docString, true, defaultValue);
     }
 
-    public Field(String name, Type type, String doc) {
-        this(name, type, doc, NO_DEFAULT);
+    public Field(String name, Type type) {
+        this(name, type, null, false, null);
     }
 
-    public Field(String name, Type type) {
-        this(name, type, "");
+    public static class Int8 extends Field {
+        public Int8(String name, String docString) {
+            super(name, Type.INT8, docString, false, null);
+        }
     }
 
-    public Type type() {
-        return type;
+    public static class Int32 extends Field {
+        public Int32(String name, String docString) {
+            super(name, Type.INT32, docString, false, null);
+        }
+
+        public Int32(String name, String docString, int defaultValue) {
+            super(name, Type.INT32, docString, true, defaultValue);
+        }
     }
 
-    public Schema schema() {
-        return schema;
+    public static class Int16 extends Field {
+        public Int16(String name, String docString) {
+            super(name, Type.INT16, docString, false, null);
+        }
     }
 
+    public static class Str extends Field {
+        public Str(String name, String docString) {
+            super(name, Type.STRING, docString, false, null);
+        }
+    }
 
-    @Override
-    public String toString() {
-        return name + ":" + type;
+    public static class NullableStr extends Field {
+        public NullableStr(String name, String docString) {
+            super(name, Type.NULLABLE_STRING, docString, false, null);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
index a9c08aa..187e14b 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
@@ -25,8 +25,8 @@ import java.util.Map;
  */
 public class Schema extends Type {
 
-    private final Field[] fields;
-    private final Map<String, Field> fieldsByName;
+    private final BoundField[] fields;
+    private final Map<String, BoundField> fieldsByName;
 
     /**
      * Construct the schema with a given list of its field values
@@ -34,14 +34,14 @@ public class Schema extends Type {
      * @throws SchemaException If the given list have duplicate fields
      */
     public Schema(Field... fs) {
-        this.fields = new Field[fs.length];
+        this.fields = new BoundField[fs.length];
         this.fieldsByName = new HashMap<>();
         for (int i = 0; i < this.fields.length; i++) {
-            Field field = fs[i];
-            if (fieldsByName.containsKey(field.name))
-                throw new SchemaException("Schema contains a duplicate field: " + field.name);
-            this.fields[i] = new Field(i, field.name, field.type, field.doc, field.defaultValue, this);
-            this.fieldsByName.put(fs[i].name, this.fields[i]);
+            Field def = fs[i];
+            if (fieldsByName.containsKey(def.name))
+                throw new SchemaException("Schema contains a duplicate field: " + def.name);
+            this.fields[i] = new BoundField(def, this, i);
+            this.fieldsByName.put(def.name, this.fields[i]);
         }
     }
 
@@ -51,12 +51,12 @@ public class Schema extends Type {
     @Override
     public void write(ByteBuffer buffer, Object o) {
         Struct r = (Struct) o;
-        for (Field field : fields) {
+        for (BoundField field : fields) {
             try {
-                Object value = field.type().validate(r.get(field));
-                field.type.write(buffer, value);
+                Object value = field.def.type.validate(r.get(field));
+                field.def.type.write(buffer, value);
             } catch (Exception e) {
-                throw new SchemaException("Error writing field '" + field.name + "': " +
+                throw new SchemaException("Error writing field '" + field.def.name + "': " +
                                           (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
             }
         }
@@ -70,9 +70,9 @@ public class Schema extends Type {
         Object[] objects = new Object[fields.length];
         for (int i = 0; i < fields.length; i++) {
             try {
-                objects[i] = fields[i].type.read(buffer);
+                objects[i] = fields[i].def.type.read(buffer);
             } catch (Exception e) {
-                throw new SchemaException("Error reading field '" + fields[i].name + "': " +
+                throw new SchemaException("Error reading field '" + fields[i].def.name + "': " +
                                           (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
             }
         }
@@ -86,11 +86,11 @@ public class Schema extends Type {
     public int sizeOf(Object o) {
         int size = 0;
         Struct r = (Struct) o;
-        for (Field field : fields) {
+        for (BoundField field : fields) {
             try {
-                size += field.type.sizeOf(r.get(field));
+                size += field.def.type.sizeOf(r.get(field));
             } catch (Exception e) {
-                throw new SchemaException("Error computing size for field '" + field.name + "': " +
+                throw new SchemaException("Error computing size for field '" + field.def.name + "': " +
                         (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
             }
         }
@@ -110,7 +110,7 @@ public class Schema extends Type {
      * @param slot The slot at which this field sits
      * @return The field
      */
-    public Field get(int slot) {
+    public BoundField get(int slot) {
         return this.fields[slot];
     }
 
@@ -120,14 +120,14 @@ public class Schema extends Type {
      * @param name The name of the field
      * @return The field
      */
-    public Field get(String name) {
+    public BoundField get(String name) {
         return this.fieldsByName.get(name);
     }
 
     /**
      * Get all the fields in this schema
      */
-    public Field[] fields() {
+    public BoundField[] fields() {
         return this.fields;
     }
 
@@ -151,11 +151,11 @@ public class Schema extends Type {
     public Struct validate(Object item) {
         try {
             Struct struct = (Struct) item;
-            for (Field field : fields) {
+            for (BoundField field : fields) {
                 try {
-                    field.type.validate(struct.get(field));
+                    field.def.type.validate(struct.get(field));
                 } catch (SchemaException e) {
-                    throw new SchemaException("Invalid value for field '" + field.name + "': " + e.getMessage());
+                    throw new SchemaException("Invalid value for field '" + field.def.name + "': " + e.getMessage());
                 }
             }
             return struct;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index c42390b..b3e9975 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -51,16 +51,16 @@ public class Struct {
      * @param field The field for which to get the default value
      * @throws SchemaException if the field has no value and has no default.
      */
-    private Object getFieldOrDefault(Field field) {
+    private Object getFieldOrDefault(BoundField field) {
         Object value = this.values[field.index];
         if (value != null)
             return value;
-        else if (field.defaultValue != Field.NO_DEFAULT)
-            return field.defaultValue;
-        else if (field.type.isNullable())
+        else if (field.def.hasDefaultValue)
+            return field.def.defaultValue;
+        else if (field.def.type.isNullable())
             return null;
         else
-            throw new SchemaException("Missing value for field '" + field.name + "' which has no default value.");
+            throw new SchemaException("Missing value for field '" + field.def.name + "' which has no default value.");
     }
 
     /**
@@ -70,11 +70,43 @@ public class Struct {
      * @return The value for that field.
      * @throws SchemaException if the field has no value and has no default.
      */
-    public Object get(Field field) {
+    public Object get(BoundField field) {
         validateField(field);
         return getFieldOrDefault(field);
     }
 
+    public Byte get(Field.Int8 field) {
+        return getByte(field.name);
+    }
+
+    public Integer get(Field.Int32 field) {
+        return getInt(field.name);
+    }
+
+    public Short get(Field.Int16 field) {
+        return getShort(field.name);
+    }
+
+    public String get(Field.Str field) {
+        return getString(field.name);
+    }
+
+    public String get(Field.NullableStr field) {
+        return getString(field.name);
+    }
+
+    public Integer getOrElse(Field.Int32 field, int alternative) { // typed INT32 read that tolerates schemas lacking this field
+        if (hasField(field.name))
+            return getInt(field.name);
+        return alternative; // hasField reported no field with this name
+    }
+
+    public String getOrElse(Field.NullableStr field, String alternative) { // typed nullable-string read that tolerates schemas lacking this field
+        if (hasField(field.name))
+            return getString(field.name);
+        return alternative; // hasField reported no field with this name
+    }
+
     /**
      * Get the record value for the field with the given name by doing a hash table lookup (slower!)
      *
@@ -83,7 +115,7 @@ public class Struct {
      * @throws SchemaException If no such field exists
      */
     public Object get(String name) {
-        Field field = schema.get(name);
+        BoundField field = schema.get(name);
         if (field == null)
             throw new SchemaException("No such field: " + name);
         return getFieldOrDefault(field);
@@ -98,7 +130,11 @@ public class Struct {
         return schema.get(name) != null;
     }
 
-    public Struct getStruct(Field field) {
+    public boolean hasField(Field def) { // true when this struct's schema has a field with def's name; the lookup is by name only
+        return schema.get(def.name) != null;
+    }
+
+    public Struct getStruct(BoundField field) {
         return (Struct) get(field);
     }
 
@@ -106,7 +142,7 @@ public class Struct {
         return (Struct) get(name);
     }
 
-    public Byte getByte(Field field) {
+    public Byte getByte(BoundField field) {
         return (Byte) get(field);
     }
 
@@ -118,7 +154,7 @@ public class Struct {
         return (Records) get(name);
     }
 
-    public Short getShort(Field field) {
+    public Short getShort(BoundField field) {
         return (Short) get(field);
     }
 
@@ -126,7 +162,7 @@ public class Struct {
         return (Short) get(name);
     }
 
-    public Integer getInt(Field field) {
+    public Integer getInt(BoundField field) {
         return (Integer) get(field);
     }
 
@@ -138,7 +174,7 @@ public class Struct {
         return (Long) get(name);
     }
 
-    public Long getLong(Field field) {
+    public Long getLong(BoundField field) {
         return (Long) get(field);
     }
 
@@ -146,7 +182,7 @@ public class Struct {
         return (Long) get(name);
     }
 
-    public Object[] getArray(Field field) {
+    public Object[] getArray(BoundField field) {
         return (Object[]) get(field);
     }
 
@@ -154,7 +190,7 @@ public class Struct {
         return (Object[]) get(name);
     }
 
-    public String getString(Field field) {
+    public String getString(BoundField field) {
         return (String) get(field);
     }
 
@@ -162,7 +198,7 @@ public class Struct {
         return (String) get(name);
     }
 
-    public Boolean getBoolean(Field field) {
+    public Boolean getBoolean(BoundField field) {
         return (Boolean) get(field);
     }
 
@@ -170,7 +206,7 @@ public class Struct {
         return (Boolean) get(name);
     }
 
-    public ByteBuffer getBytes(Field field) {
+    public ByteBuffer getBytes(BoundField field) {
         Object result = get(field);
         if (result instanceof byte[])
             return ByteBuffer.wrap((byte[]) result);
@@ -191,7 +227,7 @@ public class Struct {
      * @param value The value
      * @throws SchemaException If the validation of the field failed
      */
-    public Struct set(Field field, Object value) {
+    public Struct set(BoundField field, Object value) {
         validateField(field);
         this.values[field.index] = value;
         return this;
@@ -205,13 +241,40 @@ public class Struct {
      * @throws SchemaException If the field is not known
      */
     public Struct set(String name, Object value) {
-        Field field = this.schema.get(name);
+        BoundField field = this.schema.get(name);
         if (field == null)
             throw new SchemaException("Unknown field: " + name);
         this.values[field.index] = value;
         return this;
     }
 
+    public Struct set(Field.Str def, String value) {
+        return set(def.name, value);
+    }
+
+    public Struct set(Field.NullableStr def, String value) {
+        return set(def.name, value);
+    }
+
+    public Struct set(Field.Int8 def, byte value) {
+        return set(def.name, value);
+    }
+
+    public Struct set(Field.Int32 def, int value) {
+        return set(def.name, value);
+    }
+
+    public Struct set(Field.Int16 def, short value) {
+        return set(def.name, value);
+    }
+
+    public Struct setIfExists(Field def, Object value) { // like set(String, Object), but a no-op rather than a SchemaException when the schema lacks the field
+        BoundField field = this.schema.get(def.name);
+        if (field != null)
+            this.values[field.index] = value; // stored as-is; no type validation is performed here
+        return this; // returned to allow call chaining
+    }
+
     /**
      * Create a struct for the schema of a container type (struct or array). Note that for array type, this method
      * assumes that the type is an array of schema and creates a struct of that schema. Arrays of other types can't be
@@ -221,15 +284,15 @@ public class Struct {
      * @return The struct
      * @throws SchemaException If the given field is not a container type
      */
-    public Struct instance(Field field) {
+    public Struct instance(BoundField field) {
         validateField(field);
-        if (field.type() instanceof Schema) {
-            return new Struct((Schema) field.type());
-        } else if (field.type() instanceof ArrayOf) {
-            ArrayOf array = (ArrayOf) field.type();
+        if (field.def.type instanceof Schema) {
+            return new Struct((Schema) field.def.type);
+        } else if (field.def.type instanceof ArrayOf) {
+            ArrayOf array = (ArrayOf) field.def.type;
             return new Struct((Schema) array.type());
         } else {
-            throw new SchemaException("Field '" + field.name + "' is not a container type, it is of type " + field.type());
+            throw new SchemaException("Field '" + field.def.name + "' is not a container type, it is of type " + field.def.type);
         }
     }
 
@@ -270,9 +333,9 @@ public class Struct {
      *
      * @throws SchemaException If validation fails
      */
-    private void validateField(Field field) {
+    private void validateField(BoundField field) { // rejects fields bound to another schema instance or carrying an out-of-range slot
         if (this.schema != field.schema)
-            throw new SchemaException("Attempt to access field '" + field.name + "' from a different schema instance.");
+            throw new SchemaException("Attempt to access field '" + field.def.name + "' from a different schema instance.");
-        if (field.index > values.length)
+        if (field.index >= values.length) // valid slots are 0..values.length-1; '>' let index == length through to an ArrayIndexOutOfBoundsException
             throw new SchemaException("Invalid field index: " + field.index);
     }
@@ -291,10 +354,10 @@ public class Struct {
         StringBuilder b = new StringBuilder();
         b.append('{');
         for (int i = 0; i < this.values.length; i++) {
-            Field f = this.schema.get(i);
-            b.append(f.name);
+            BoundField f = this.schema.get(i);
+            b.append(f.def.name);
             b.append('=');
-            if (f.type() instanceof ArrayOf && this.values[i] != null) {
+            if (f.def.type instanceof ArrayOf && this.values[i] != null) {
                 Object[] arrayValue = (Object[]) this.values[i];
                 b.append('[');
                 for (int j = 0; j < arrayValue.length; j++) {
@@ -317,8 +380,8 @@ public class Struct {
         final int prime = 31;
         int result = 1;
         for (int i = 0; i < this.values.length; i++) {
-            Field f = this.schema.get(i);
-            if (f.type() instanceof ArrayOf) {
+            BoundField f = this.schema.get(i);
+            if (f.def.type instanceof ArrayOf) {
                 if (this.get(f) != null) {
                     Object[] arrayObject = (Object[]) this.get(f);
                     for (Object arrayItem: arrayObject)
@@ -346,9 +409,9 @@ public class Struct {
         if (schema != other.schema)
             return false;
         for (int i = 0; i < this.values.length; i++) {
-            Field f = this.schema.get(i);
+            BoundField f = this.schema.get(i);
             boolean result;
-            if (f.type() instanceof ArrayOf) {
+            if (f.def.type instanceof ArrayOf) {
                 result = Arrays.equals((Object[]) this.get(f), (Object[]) other.get(f));
             } else {
                 Object thisField = this.get(f);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 34fda50..1f1418f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -133,9 +133,9 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
                 return new SyncGroupRequest(struct, apiVersion);
             case STOP_REPLICA:
                 return new StopReplicaRequest(struct, apiVersion);
-            case CONTROLLED_SHUTDOWN_KEY:
+            case CONTROLLED_SHUTDOWN:
                 return new ControlledShutdownRequest(struct, apiVersion);
-            case UPDATE_METADATA_KEY:
+            case UPDATE_METADATA:
                 return new UpdateMetadataRequest(struct, apiVersion);
             case LEADER_AND_ISR:
                 return new LeaderAndIsrRequest(struct, apiVersion);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 95d1ef9..b6cb8fb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 
 public abstract class AbstractResponse extends AbstractRequestResponse {
-    public static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     public static final int DEFAULT_THROTTLE_TIME = 0;
 
     protected Send toSend(String destination, ResponseHeader header, short apiVersion) {
@@ -66,9 +65,9 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
                 return new SyncGroupResponse(struct);
             case STOP_REPLICA:
                 return new StopReplicaResponse(struct);
-            case CONTROLLED_SHUTDOWN_KEY:
+            case CONTROLLED_SHUTDOWN:
                 return new ControlledShutdownResponse(struct);
-            case UPDATE_METADATA_KEY:
+            case UPDATE_METADATA:
                 return new UpdateMetadataResponse(struct);
             case LEADER_AND_ISR:
                 return new LeaderAndIsrResponse(struct);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
index 36b290f..e3e4d79 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
@@ -18,16 +18,32 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
+import static org.apache.kafka.common.protocol.types.Type.INT16;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 public class AddOffsetsToTxnRequest extends AbstractRequest {
     private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
     private static final String PRODUCER_ID_KEY_NAME = "producer_id";
     private static final String EPOCH_KEY_NAME = "producer_epoch";
     private static final String CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id";
 
+    private static final Schema ADD_OFFSETS_TO_TXN_REQUEST_V0 = new Schema(
+            new Field(TRANSACTIONAL_ID_KEY_NAME, STRING, "The transactional id corresponding to the transaction."),
+            new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."),
+            new Field(EPOCH_KEY_NAME, INT16, "Current epoch associated with the producer id."),
+            new Field(CONSUMER_GROUP_ID_KEY_NAME, STRING, "Consumer group id whose offsets should be included in the transaction."));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{ADD_OFFSETS_TO_TXN_REQUEST_V0};
+    }
+
     public static class Builder extends AbstractRequest.Builder<AddOffsetsToTxnRequest> {
         private final String transactionalId;
         private final long producerId;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
index 981a234..10dc279 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
@@ -18,12 +18,22 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+
 public class AddOffsetsToTxnResponse extends AbstractResponse {
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final Schema ADD_OFFSETS_TO_TXN_RESPONSE_V0 = new Schema(
+            THROTTLE_TIME_MS,
+            ERROR_CODE);
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{ADD_OFFSETS_TO_TXN_RESPONSE_V0};
+    }
 
     // Possible error codes:
     //   NotCoordinator
@@ -44,8 +54,8 @@ public class AddOffsetsToTxnResponse extends AbstractResponse {
     }
 
     public AddOffsetsToTxnResponse(Struct struct) {
-        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
-        this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+        this.throttleTimeMs = struct.get(THROTTLE_TIME_MS);
+        this.error = Errors.forCode(struct.get(ERROR_CODE));
     }
 
     public int throttleTimeMs() {
@@ -59,8 +69,8 @@ public class AddOffsetsToTxnResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.responseSchema(version));
-        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
+        struct.set(ERROR_CODE, error.code());
         return struct;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
index 6fe034c..c195e24 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
@@ -19,6 +19,9 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
 
@@ -28,14 +31,32 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.INT16;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 public class AddPartitionsToTxnRequest extends AbstractRequest {
     private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
     private static final String PRODUCER_ID_KEY_NAME = "producer_id";
     private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch";
-    private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
-    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String TOPICS_KEY_NAME = "topics";
     private static final String PARTITIONS_KEY_NAME = "partitions";
 
+    private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V0 = new Schema(
+            new Field(TRANSACTIONAL_ID_KEY_NAME, STRING, "The transactional id corresponding to the transaction."),
+            new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."),
+            new Field(PRODUCER_EPOCH_KEY_NAME, INT16, "Current epoch associated with the producer id."),
+            new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
+                    TOPIC_NAME,
+                    new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32)))),
+                    "The partitions to add to the transaction."));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{ADD_PARTITIONS_TO_TXN_REQUEST_V0};
+    }
+
     public static class Builder extends AbstractRequest.Builder<AddPartitionsToTxnRequest> {
         private final String transactionalId;
         private final long producerId;
@@ -93,10 +114,10 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
         this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME);
 
         List<TopicPartition> partitions = new ArrayList<>();
-        Object[] topicPartitionsArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
+        Object[] topicPartitionsArray = struct.getArray(TOPICS_KEY_NAME);
         for (Object topicPartitionObj : topicPartitionsArray) {
             Struct topicPartitionStruct = (Struct) topicPartitionObj;
-            String topic = topicPartitionStruct.getString(TOPIC_KEY_NAME);
+            String topic = topicPartitionStruct.get(TOPIC_NAME);
             for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
                 partitions.add(new TopicPartition(topic, (Integer) partitionObj));
             }
@@ -131,13 +152,13 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
         Object[] partitionsArray = new Object[mappedPartitions.size()];
         int i = 0;
         for (Map.Entry<String, List<Integer>> topicAndPartitions : mappedPartitions.entrySet()) {
-            Struct topicPartitionsStruct = struct.instance(TOPIC_PARTITIONS_KEY_NAME);
-            topicPartitionsStruct.set(TOPIC_KEY_NAME, topicAndPartitions.getKey());
+            Struct topicPartitionsStruct = struct.instance(TOPICS_KEY_NAME);
+            topicPartitionsStruct.set(TOPIC_NAME, topicAndPartitions.getKey());
             topicPartitionsStruct.set(PARTITIONS_KEY_NAME, topicAndPartitions.getValue().toArray());
             partitionsArray[i++] = topicPartitionsStruct;
         }
 
-        struct.set(TOPIC_PARTITIONS_KEY_NAME, partitionsArray);
+        struct.set(TOPICS_KEY_NAME, partitionsArray);
         return struct;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
index f05310a..e9f6088 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
@@ -19,6 +19,9 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
 
@@ -28,13 +31,27 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+
 public class AddPartitionsToTxnResponse extends AbstractResponse {
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
     private static final String ERRORS_KEY_NAME = "errors";
-    private static final String TOPIC_NAME = "topic";
-    private static final String PARTITION = "partition";
     private static final String PARTITION_ERRORS = "partition_errors";
 
+    private static final Schema ADD_PARTITIONS_TO_TXN_RESPONSE_V0 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field(ERRORS_KEY_NAME, new ArrayOf(new Schema(
+                    TOPIC_NAME,
+                    new Field(PARTITION_ERRORS, new ArrayOf(new Schema(
+                            PARTITION_ID,
+                            ERROR_CODE)))))));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{ADD_PARTITIONS_TO_TXN_RESPONSE_V0};
+    }
+
     private final int throttleTimeMs;
 
     // Possible error codes:
@@ -56,15 +73,15 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
     }
 
     public AddPartitionsToTxnResponse(Struct struct) {
-        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
+        this.throttleTimeMs = struct.get(THROTTLE_TIME_MS);
         errors = new HashMap<>();
         for (Object topic : struct.getArray(ERRORS_KEY_NAME)) {
             Struct topicStruct = (Struct) topic;
-            final String topicName = topicStruct.getString(TOPIC_NAME);
+            final String topicName = topicStruct.get(TOPIC_NAME);
             for (Object partition : topicStruct.getArray(PARTITION_ERRORS)) {
                 Struct partitionStruct = (Struct) partition;
-                TopicPartition topicPartition = new TopicPartition(topicName, partitionStruct.getInt(PARTITION));
-                errors.put(topicPartition, Errors.forCode(partitionStruct.getShort(ERROR_CODE_KEY_NAME)));
+                TopicPartition topicPartition = new TopicPartition(topicName, partitionStruct.get(PARTITION_ID));
+                errors.put(topicPartition, Errors.forCode(partitionStruct.get(ERROR_CODE)));
             }
         }
     }
@@ -80,7 +97,7 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.ADD_PARTITIONS_TO_TXN.responseSchema(version));
-        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
 
         Map<String, Map<Integer, Errors>> errorsByTopic = CollectionUtils.groupDataByTopic(errors);
         List<Struct> topics = new ArrayList<>(errorsByTopic.size());
@@ -90,8 +107,8 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
             List<Struct> partitionArray = new ArrayList<>();
             for (Map.Entry<Integer, Errors> partitionErrors : entry.getValue().entrySet()) {
                 final Struct partitionData = topicErrorCodes.instance(PARTITION_ERRORS)
-                        .set(PARTITION, partitionErrors.getKey())
-                        .set(ERROR_CODE_KEY_NAME, partitionErrors.getValue().code());
+                        .set(PARTITION_ID, partitionErrors.getKey())
+                        .set(ERROR_CODE, partitionErrors.getValue().code());
                 partitionArray.add(partitionData);
 
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
index a964f85..14b39ae 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
@@ -18,6 +18,9 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -27,6 +30,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
+import static org.apache.kafka.common.protocol.types.Type.INT8;
+import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 public class AlterConfigsRequest extends AbstractRequest {
 
     private static final String RESOURCES_KEY_NAME = "resources";
@@ -38,6 +46,24 @@ public class AlterConfigsRequest extends AbstractRequest {
     private static final String CONFIG_NAME = "config_name";
     private static final String CONFIG_VALUE = "config_value";
 
+    private static final Schema CONFIG_ENTRY = new Schema(
+            new Field(CONFIG_NAME, STRING, "Configuration name"),
+            new Field(CONFIG_VALUE, NULLABLE_STRING, "Configuration value"));
+
+    private static final Schema ALTER_CONFIGS_REQUEST_RESOURCE_V0 = new Schema(
+            new Field(RESOURCE_TYPE_KEY_NAME, INT8),
+            new Field(RESOURCE_NAME_KEY_NAME, STRING),
+            new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(CONFIG_ENTRY)));
+
+    private static final Schema ALTER_CONFIGS_REQUEST_V0 = new Schema(
+            new Field(RESOURCES_KEY_NAME, new ArrayOf(ALTER_CONFIGS_REQUEST_RESOURCE_V0),
+                    "An array of resources to update with the provided configs."),
+            new Field(VALIDATE_ONLY_KEY_NAME, BOOLEAN));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {ALTER_CONFIGS_REQUEST_V0};
+    }
+
     public static class Config {
         private final Collection<ConfigEntry> entries;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
index 3a3eb9a..df9416e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
@@ -18,6 +18,9 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -26,12 +29,32 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.types.Type.INT8;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 public class AlterConfigsResponse extends AbstractResponse {
 
     private static final String RESOURCES_KEY_NAME = "resources";
     private static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
     private static final String RESOURCE_NAME_KEY_NAME = "resource_name";
 
+    private static final Schema ALTER_CONFIGS_RESPONSE_ENTITY_V0 = new Schema(
+            ERROR_CODE,
+            ERROR_MESSAGE,
+            new Field(RESOURCE_TYPE_KEY_NAME, INT8),
+            new Field(RESOURCE_NAME_KEY_NAME, STRING));
+
+    private static final Schema ALTER_CONFIGS_RESPONSE_V0 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field(RESOURCES_KEY_NAME, new ArrayOf(ALTER_CONFIGS_RESPONSE_ENTITY_V0)));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{ALTER_CONFIGS_RESPONSE_V0};
+    }
+
     private final int throttleTimeMs;
     private final Map<Resource, ApiError> errors;
 
@@ -42,7 +65,7 @@ public class AlterConfigsResponse extends AbstractResponse {
     }
 
     public AlterConfigsResponse(Struct struct) {
-        throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
+        throttleTimeMs = struct.get(THROTTLE_TIME_MS);
         Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME);
         errors = new HashMap<>(resourcesArray.length);
         for (Object resourceObj : resourcesArray) {
@@ -65,7 +88,7 @@ public class AlterConfigsResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.ALTER_CONFIGS.responseSchema(version));
-        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
         List<Struct> resourceStructs = new ArrayList<>(errors.size());
         for (Map.Entry<Resource, ApiError> entry : errors.entrySet()) {
             Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java
index 2c2401b..7e58fd6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java
@@ -20,14 +20,21 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
 
 public class AlterReplicaDirRequest extends AbstractRequest {
 
@@ -39,9 +46,19 @@ public class AlterReplicaDirRequest extends AbstractRequest {
     private static final String TOPICS_KEY_NAME = "topics";
 
     // topic level key names
-    private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITIONS_KEY_NAME = "partitions";
 
+    private static final Schema ALTER_REPLICA_DIR_REQUEST_V0 = new Schema(
+            new Field("log_dirs", new ArrayOf(new Schema(
+                    new Field("log_dir", STRING, "The absolute log directory path."),
+                    new Field("topics", new ArrayOf(new Schema(
+                            TOPIC_NAME,
+                            new Field("partitions", new ArrayOf(INT32), "List of partition ids of the topic."))))))));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{ALTER_REPLICA_DIR_REQUEST_V0};
+    }
+
     private final Map<TopicPartition, String> partitionDirs;
 
     public static class Builder extends AbstractRequest.Builder<AlterReplicaDirRequest> {
@@ -76,7 +93,7 @@ public class AlterReplicaDirRequest extends AbstractRequest {
             String logDir = logDirStruct.getString(LOG_DIR_KEY_NAME);
             for (Object topicStructObj : logDirStruct.getArray(TOPICS_KEY_NAME)) {
                 Struct topicStruct = (Struct) topicStructObj;
-                String topic = topicStruct.getString(TOPIC_KEY_NAME);
+                String topic = topicStruct.get(TOPIC_NAME);
                 for (Object partitionObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
                     int partition = (Integer) partitionObj;
                     partitionDirs.put(new TopicPartition(topic, partition), logDir);
@@ -108,7 +125,7 @@ public class AlterReplicaDirRequest extends AbstractRequest {
             List<Struct> topicStructArray = new ArrayList<>();
             for (Map.Entry<String, List<Integer>> topicEntry: CollectionUtils.groupDataByTopic(logDirEntry.getValue()).entrySet()) {
                 Struct topicStruct = logDirStruct.instance(TOPICS_KEY_NAME);
-                topicStruct.set(TOPIC_KEY_NAME, topicEntry.getKey());
+                topicStruct.set(TOPIC_NAME, topicEntry.getKey());
                 topicStruct.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
                 topicStructArray.add(topicStruct);
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
index f97f9a0..ed00b75 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
@@ -20,14 +20,23 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+
 
 public class AlterReplicaDirResponse extends AbstractResponse {
 
@@ -35,12 +44,19 @@ public class AlterReplicaDirResponse extends AbstractResponse {
     private static final String TOPICS_KEY_NAME = "topics";
 
     // topic level key names
-    private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITIONS_KEY_NAME = "partitions";
 
-    // partition level key names
-    private static final String PARTITION_KEY_NAME = "partition";
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final Schema ALTER_REPLICA_DIR_RESPONSE_V0 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field("topics", new ArrayOf(new Schema(
+                    TOPIC_NAME,
+                    new Field("partitions", new ArrayOf(new Schema(
+                            PARTITION_ID,
+                            ERROR_CODE)))))));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{ALTER_REPLICA_DIR_RESPONSE_V0};
+    }
 
     /**
      * Possible error code:
@@ -54,15 +70,15 @@ public class AlterReplicaDirResponse extends AbstractResponse {
     private final int throttleTimeMs;
 
     public AlterReplicaDirResponse(Struct struct) {
-        throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
+        throttleTimeMs = struct.get(THROTTLE_TIME_MS);
         responses = new HashMap<>();
         for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) {
             Struct topicStruct = (Struct) topicStructObj;
-            String topic = topicStruct.getString(TOPIC_KEY_NAME);
+            String topic = topicStruct.get(TOPIC_NAME);
             for (Object partitionStructObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
                 Struct partitionStruct = (Struct) partitionStructObj;
-                int partition = partitionStruct.getInt(PARTITION_KEY_NAME);
-                Errors error = Errors.forCode(partitionStruct.getShort(ERROR_CODE_KEY_NAME));
+                int partition = partitionStruct.get(PARTITION_ID);
+                Errors error = Errors.forCode(partitionStruct.get(ERROR_CODE));
                 responses.put(new TopicPartition(topic, partition), error);
             }
         }
@@ -79,18 +95,18 @@ public class AlterReplicaDirResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.ALTER_REPLICA_DIR.responseSchema(version));
-        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
         Map<String, Map<Integer, Errors>> responsesByTopic = CollectionUtils.groupDataByTopic(responses);
         List<Struct> topicStructArray = new ArrayList<>();
         for (Map.Entry<String, Map<Integer, Errors>> responsesByTopicEntry : responsesByTopic.entrySet()) {
             Struct topicStruct = struct.instance(TOPICS_KEY_NAME);
-            topicStruct.set(TOPIC_KEY_NAME, responsesByTopicEntry.getKey());
+            topicStruct.set(TOPIC_NAME, responsesByTopicEntry.getKey());
             List<Struct> partitionStructArray = new ArrayList<>();
             for (Map.Entry<Integer, Errors> responsesByPartitionEntry : responsesByTopicEntry.getValue().entrySet()) {
                 Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME);
                 Errors response = responsesByPartitionEntry.getValue();
-                partitionStruct.set(PARTITION_KEY_NAME, responsesByPartitionEntry.getKey());
-                partitionStruct.set(ERROR_CODE_KEY_NAME, response.code());
+                partitionStruct.set(PARTITION_ID, responsesByPartitionEntry.getKey());
+                partitionStruct.set(ERROR_CODE, response.code());
                 partitionStructArray.add(partitionStruct);
             }
             topicStruct.set(PARTITIONS_KEY_NAME, partitionStructArray.toArray());

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
index d712123..dad21b3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
@@ -21,6 +21,9 @@ import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
+
 /**
  * Encapsulates an error code (via the Errors enum) and an optional message. Generally, the optional message is only
  * defined if it adds information over the default message associated with the error code.
@@ -31,9 +34,6 @@ public class ApiError {
 
     public static final ApiError NONE = new ApiError(Errors.NONE, null);
 
-    private static final String CODE_KEY_NAME = "error_code";
-    private static final String MESSAGE_KEY_NAME = "error_message";
-
     private final Errors error;
     private final String message;
 
@@ -45,12 +45,9 @@ public class ApiError {
     }
 
     public ApiError(Struct struct) {
-        error = Errors.forCode(struct.getShort(CODE_KEY_NAME));
+        error = Errors.forCode(struct.get(ERROR_CODE));
         // In some cases, the error message field was introduced in newer version
-        if (struct.hasField(MESSAGE_KEY_NAME))
-            message = struct.getString(MESSAGE_KEY_NAME);
-        else
-            message = null;
+        message = struct.getOrElse(ERROR_MESSAGE, null);
     }
 
     public ApiError(Errors error, String message) {
@@ -59,10 +56,9 @@ public class ApiError {
     }
 
     public void write(Struct struct) {
-        struct.set(CODE_KEY_NAME, error.code());
-        // In some cases, the error message field was introduced in a newer protocol API version
-        if (struct.hasField(MESSAGE_KEY_NAME) && message != null && error != Errors.NONE)
-            struct.set(MESSAGE_KEY_NAME, message);
+        struct.set(ERROR_CODE, error.code());
+        if (error != Errors.NONE)
+            struct.setIfExists(ERROR_MESSAGE, message);
     }
 
     public boolean is(Errors error) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
index 025ef6c..22daf6c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
@@ -18,12 +18,22 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
 
 public class ApiVersionsRequest extends AbstractRequest {
+    private static final Schema API_VERSIONS_REQUEST_V0 = new Schema();
+
+    /* v1 request is the same as v0. Throttle time has been added to the response */
+    private static final Schema API_VERSIONS_REQUEST_V1 = API_VERSIONS_REQUEST_V0;
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{API_VERSIONS_REQUEST_V0, API_VERSIONS_REQUEST_V1};
+    }
+
     public static class Builder extends AbstractRequest.Builder<ApiVersionsRequest> {
 
         public Builder() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 5a48c93..6a0418f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -18,6 +18,9 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.RecordBatch;
 
@@ -28,15 +31,35 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class ApiVersionsResponse extends AbstractResponse {
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.types.Type.INT16;
 
-    public static final ApiVersionsResponse API_VERSIONS_RESPONSE = createApiVersionsResponse(DEFAULT_THROTTLE_TIME, RecordBatch.CURRENT_MAGIC_VALUE);
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
-    public static final String ERROR_CODE_KEY_NAME = "error_code";
-    public static final String API_VERSIONS_KEY_NAME = "api_versions";
-    public static final String API_KEY_NAME = "api_key";
-    public static final String MIN_VERSION_KEY_NAME = "min_version";
-    public static final String MAX_VERSION_KEY_NAME = "max_version";
+public class ApiVersionsResponse extends AbstractResponse {
+    private static final String API_VERSIONS_KEY_NAME = "api_versions";
+    private static final String API_KEY_NAME = "api_key";
+    private static final String MIN_VERSION_KEY_NAME = "min_version";
+    private static final String MAX_VERSION_KEY_NAME = "max_version";
+
+    private static final Schema API_VERSIONS_V0 = new Schema(
+            new Field(API_KEY_NAME, INT16, "API key."),
+            new Field(MIN_VERSION_KEY_NAME, INT16, "Minimum supported version."),
+            new Field(MAX_VERSION_KEY_NAME, INT16, "Maximum supported version."));
+
+    private static final Schema API_VERSIONS_RESPONSE_V0 = new Schema(
+            ERROR_CODE,
+            new Field(API_VERSIONS_KEY_NAME, new ArrayOf(API_VERSIONS_V0), "API versions supported by the broker."));
+    private static final Schema API_VERSIONS_RESPONSE_V1 = new Schema(
+            ERROR_CODE,
+            new Field(API_VERSIONS_KEY_NAME, new ArrayOf(API_VERSIONS_V0), "API versions supported by the broker."),
+            THROTTLE_TIME_MS);
+
+    // Initialized lazily to avoid a circular initialization dependency with ApiKeys
+    private static volatile ApiVersionsResponse defaultApiVersionsResponse;
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{API_VERSIONS_RESPONSE_V0, API_VERSIONS_RESPONSE_V1};
+    }
 
     /**
      * Possible error codes:
@@ -83,8 +106,8 @@ public class ApiVersionsResponse extends AbstractResponse {
     }
 
     public ApiVersionsResponse(Struct struct) {
-        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
-        this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+        this.error = Errors.forCode(struct.get(ERROR_CODE));
         List<ApiVersion> tempApiVersions = new ArrayList<>();
         for (Object apiVersionsObj : struct.getArray(API_VERSIONS_KEY_NAME)) {
             Struct apiVersionStruct = (Struct) apiVersionsObj;
@@ -99,9 +122,8 @@ public class ApiVersionsResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.API_VERSIONS.responseSchema(version));
-        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
-            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
+        struct.set(ERROR_CODE, error.code());
         List<Struct> apiVersionList = new ArrayList<>();
         for (ApiVersion apiVersion : apiKeyToApiVersion.values()) {
             Struct apiVersionStruct = struct.instance(API_VERSIONS_KEY_NAME);
@@ -116,7 +138,7 @@ public class ApiVersionsResponse extends AbstractResponse {
 
     public static ApiVersionsResponse apiVersionsResponse(int throttleTimeMs, byte maxMagic) {
         if (maxMagic == RecordBatch.CURRENT_MAGIC_VALUE && throttleTimeMs == DEFAULT_THROTTLE_TIME) {
-            return API_VERSIONS_RESPONSE;
+            return defaultApiVersionsResponse();
         }
         return createApiVersionsResponse(throttleTimeMs, maxMagic);
     }
@@ -141,22 +163,28 @@ public class ApiVersionsResponse extends AbstractResponse {
         return new ApiVersionsResponse(ApiKeys.API_VERSIONS.parseResponse(version, buffer));
     }
 
+    private Map<Short, ApiVersion> buildApiKeyToApiVersion(List<ApiVersion> apiVersions) { // indexes the given versions by their apiKey
+        Map<Short, ApiVersion> tempApiIdToApiVersion = new HashMap<>();
+        for (ApiVersion apiVersion: apiVersions) {
+            tempApiIdToApiVersion.put(apiVersion.apiKey, apiVersion); // a repeated apiKey keeps only the last entry seen
+        }
+        return tempApiIdToApiVersion;
+    }
+
     public static ApiVersionsResponse createApiVersionsResponse(int throttleTimeMs, final byte minMagic) {
-        List<ApiVersion> versionList = new ArrayList<>();
+        List<ApiVersionsResponse.ApiVersion> versionList = new ArrayList<>(); // NOTE(review): apiVersionsResponse(...) passes its maxMagic as minMagic here; confirm the parameter name
         for (ApiKeys apiKey : ApiKeys.values()) {
             if (apiKey.minRequiredInterBrokerMagic <= minMagic) {
-                versionList.add(new ApiVersion(apiKey));
+                versionList.add(new ApiVersionsResponse.ApiVersion(apiKey)); // only keys whose minRequiredInterBrokerMagic is at most minMagic are listed
             }
         }
         return new ApiVersionsResponse(throttleTimeMs, Errors.NONE, versionList);
     }
 
-    private Map<Short, ApiVersion> buildApiKeyToApiVersion(List<ApiVersion> apiVersions) {
-        Map<Short, ApiVersion> tempApiIdToApiVersion = new HashMap<>();
-        for (ApiVersion apiVersion: apiVersions) {
-            tempApiIdToApiVersion.put(apiVersion.apiKey, apiVersion);
-        }
-        return tempApiIdToApiVersion;
+    public static ApiVersionsResponse defaultApiVersionsResponse() { // returns the cached default response, building it on the first call
+        if (defaultApiVersionsResponse == null) // NOTE(review): unsynchronized check-then-assign; the field declaration is not visible here, so confirm this is safe with concurrent callers
+            defaultApiVersionsResponse = createApiVersionsResponse(DEFAULT_THROTTLE_TIME, RecordBatch.CURRENT_MAGIC_VALUE);
+        return defaultApiVersionsResponse;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
index 1b49c6a..c77bd13 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
@@ -19,14 +19,26 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
 
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+
 public class ControlledShutdownRequest extends AbstractRequest {
     private static final String BROKER_ID_KEY_NAME = "broker_id";
 
+    private static final Schema CONTROLLED_SHUTDOWN_REQUEST_V0 = new Schema( // v0 body: a single INT32 field named by BROKER_ID_KEY_NAME
+            new Field(BROKER_ID_KEY_NAME, INT32, "The id of the broker for which controlled shutdown has been requested."));
+    private static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = CONTROLLED_SHUTDOWN_REQUEST_V0; // v1 reuses the v0 schema object unchanged
+
+    public static Schema[] schemaVersions() { // returns a new array on every call, ordered V0 then V1 (both entries are the same Schema object)
+        return new Schema[] {CONTROLLED_SHUTDOWN_REQUEST_V0, CONTROLLED_SHUTDOWN_REQUEST_V1};
+    }
+
     public static class Builder extends AbstractRequest.Builder<ControlledShutdownRequest> {
         private final int brokerId;
 
@@ -35,7 +47,7 @@ public class ControlledShutdownRequest extends AbstractRequest {
         }
 
         public Builder(int brokerId, Short desiredVersion) {
-            super(ApiKeys.CONTROLLED_SHUTDOWN_KEY, desiredVersion);
+            super(ApiKeys.CONTROLLED_SHUTDOWN, desiredVersion); // hands the API key and the requested version to AbstractRequest.Builder
             this.brokerId = brokerId;
         }
 
@@ -74,7 +86,7 @@ public class ControlledShutdownRequest extends AbstractRequest {
                 return new ControlledShutdownResponse(Errors.forException(e), Collections.<TopicPartition>emptySet());
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ApiKeys.CONTROLLED_SHUTDOWN_KEY.latestVersion()));
+                        versionId, this.getClass().getSimpleName(), ApiKeys.CONTROLLED_SHUTDOWN.latestVersion()));
         }
     }
 
@@ -84,12 +96,12 @@ public class ControlledShutdownRequest extends AbstractRequest {
 
     public static ControlledShutdownRequest parse(ByteBuffer buffer, short version) {
         return new ControlledShutdownRequest(
-                ApiKeys.CONTROLLED_SHUTDOWN_KEY.parseRequest(version, buffer), version);
+                ApiKeys.CONTROLLED_SHUTDOWN.parseRequest(version, buffer), version); // the same version is used both to parse the buffer and to tag the new request
     }
 
     @Override
     protected Struct toStruct() {
-        Struct struct = new Struct(ApiKeys.CONTROLLED_SHUTDOWN_KEY.requestSchema(version()));
+        Struct struct = new Struct(ApiKeys.CONTROLLED_SHUTDOWN.requestSchema(version())); // schema is looked up for this request's own version()
         struct.set(BROKER_ID_KEY_NAME, brokerId);
         return struct;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
index 00973f0..e0b3860 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
@@ -19,6 +19,9 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -27,13 +30,28 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+
 public class ControlledShutdownResponse extends AbstractResponse {
 
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
     private static final String PARTITIONS_REMAINING_KEY_NAME = "partitions_remaining";
 
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITION_KEY_NAME = "partition";
+    private static final Schema CONTROLLED_SHUTDOWN_PARTITION_V0 = new Schema( // one array element: TOPIC_NAME followed by PARTITION_ID
+            TOPIC_NAME,
+            PARTITION_ID);
+
+    private static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V0 = new Schema( // ERROR_CODE followed by an array of partition entries
+            ERROR_CODE,
+            new Field("partitions_remaining", new ArrayOf(CONTROLLED_SHUTDOWN_PARTITION_V0), "The partitions " + // NOTE(review): this literal repeats PARTITIONS_REMAINING_KEY_NAME declared above; consider using the constant
+                    "that the broker still leads."));
+
+    private static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V1 = CONTROLLED_SHUTDOWN_RESPONSE_V0; // v1 reuses the v0 schema object unchanged
+
+    public static Schema[] schemaVersions() { // returns a new array on every call, ordered V0 then V1 (both entries are the same Schema object)
+        return new Schema[]{CONTROLLED_SHUTDOWN_RESPONSE_V0, CONTROLLED_SHUTDOWN_RESPONSE_V1};
+    }
 
     /**
      * Possible error codes:
@@ -52,12 +70,12 @@ public class ControlledShutdownResponse extends AbstractResponse {
     }
 
     public ControlledShutdownResponse(Struct struct) {
-        error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+        error = Errors.forCode(struct.get(ERROR_CODE));
         Set<TopicPartition> partitions = new HashSet<>();
         for (Object topicPartitionObj : struct.getArray(PARTITIONS_REMAINING_KEY_NAME)) {
             Struct topicPartition = (Struct) topicPartitionObj;
-            String topic = topicPartition.getString(TOPIC_KEY_NAME);
-            int partition = topicPartition.getInt(PARTITION_KEY_NAME);
+            String topic = topicPartition.get(TOPIC_NAME);
+            int partition = topicPartition.get(PARTITION_ID);
             partitions.add(new TopicPartition(topic, partition));
         }
         partitionsRemaining = partitions;
@@ -72,20 +90,19 @@ public class ControlledShutdownResponse extends AbstractResponse {
     }
 
     public static ControlledShutdownResponse parse(ByteBuffer buffer, short version) {
-        return new ControlledShutdownResponse(ApiKeys.CONTROLLED_SHUTDOWN_KEY.parseResponse(version, buffer));
+        return new ControlledShutdownResponse(ApiKeys.CONTROLLED_SHUTDOWN.parseResponse(version, buffer)); // version is used only for parsing; it is not passed to the constructor
     }
 
     @Override
     protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.CONTROLLED_SHUTDOWN_KEY.responseSchema(version));
-
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        Struct struct = new Struct(ApiKeys.CONTROLLED_SHUTDOWN.responseSchema(version));
+        struct.set(ERROR_CODE, error.code());
 
         List<Struct> partitionsRemainingList = new ArrayList<>(partitionsRemaining.size());
         for (TopicPartition topicPartition : partitionsRemaining) {
             Struct topicPartitionStruct = struct.instance(PARTITIONS_REMAINING_KEY_NAME);
-            topicPartitionStruct.set(TOPIC_KEY_NAME, topicPartition.topic());
-            topicPartitionStruct.set(PARTITION_KEY_NAME, topicPartition.partition());
+            topicPartitionStruct.set(TOPIC_NAME, topicPartition.topic());
+            topicPartitionStruct.set(PARTITION_ID, topicPartition.partition());
             partitionsRemainingList.add(topicPartitionStruct);
         }
         struct.set(PARTITIONS_REMAINING_KEY_NAME, partitionsRemainingList.toArray());

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
index 3598d4f..d281b3b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
@@ -20,6 +20,9 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.resource.Resource;
 import org.apache.kafka.common.utils.Utils;
@@ -28,8 +31,28 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.kafka.common.protocol.CommonFields.HOST;
+import static org.apache.kafka.common.protocol.CommonFields.OPERATION;
+import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
+import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
+
 public class CreateAclsRequest extends AbstractRequest {
-    private final static String CREATIONS = "creations";
+    private final static String CREATIONS_KEY_NAME = "creations"; // name of the single top-level array field in the request schema
+
+    private static final Schema CREATE_ACLS_REQUEST_V0 = new Schema(
+            new Field(CREATIONS_KEY_NAME, new ArrayOf(new Schema( // each array element is a struct with the six fields below, in this order
+                    RESOURCE_TYPE,
+                    RESOURCE_NAME,
+                    PRINCIPAL,
+                    HOST,
+                    OPERATION,
+                    PERMISSION_TYPE))));
+
+    public static Schema[] schemaVersions() { // returns a new single-element array on every call; only V0 is defined
+        return new Schema[]{CREATE_ACLS_REQUEST_V0};
+    }
 
     public static class AclCreation {
         private final AclBinding acl;
@@ -88,7 +111,7 @@ public class CreateAclsRequest extends AbstractRequest {
     public CreateAclsRequest(Struct struct, short version) {
         super(version);
         this.aclCreations = new ArrayList<>();
-        for (Object creationStructObj : struct.getArray(CREATIONS)) {
+        for (Object creationStructObj : struct.getArray(CREATIONS_KEY_NAME)) {
             Struct creationStruct = (Struct) creationStructObj;
             aclCreations.add(AclCreation.fromStruct(creationStruct));
         }
@@ -99,11 +122,11 @@ public class CreateAclsRequest extends AbstractRequest {
         Struct struct = new Struct(ApiKeys.CREATE_ACLS.requestSchema(version()));
         List<Struct> requests = new ArrayList<>();
         for (AclCreation creation : aclCreations) {
-            Struct creationStruct = struct.instance(CREATIONS);
+            Struct creationStruct = struct.instance(CREATIONS_KEY_NAME);
             creation.setStructFields(creationStruct);
             requests.add(creationStruct);
         }
-        struct.set(CREATIONS, requests.toArray());
+        struct.set(CREATIONS_KEY_NAME, requests.toArray());
         return struct;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
index 1fc75da..836215e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
@@ -17,14 +17,31 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+
 public class CreateAclsResponse extends AbstractResponse {
-    private final static String CREATION_RESPONSES = "creation_responses";
+    private final static String CREATION_RESPONSES_KEY_NAME = "creation_responses"; // name of the array field in the response schema
+
+    private static final Schema CREATE_ACLS_RESPONSE_V0 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field(CREATION_RESPONSES_KEY_NAME, new ArrayOf(new Schema( // each array element is a struct of ERROR_CODE then ERROR_MESSAGE
+                    ERROR_CODE,
+                    ERROR_MESSAGE))));
+
+    public static Schema[] schemaVersions() { // returns a new single-element array on every call; only V0 is defined
+        return new Schema[]{CREATE_ACLS_RESPONSE_V0};
+    }
 
     public static class AclCreationResponse {
         private final ApiError error;
@@ -53,9 +70,9 @@ public class CreateAclsResponse extends AbstractResponse {
     }
 
     public CreateAclsResponse(Struct struct) {
-        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
+        this.throttleTimeMs = struct.get(THROTTLE_TIME_MS);
         this.aclCreationResponses = new ArrayList<>();
-        for (Object responseStructObj : struct.getArray(CREATION_RESPONSES)) {
+        for (Object responseStructObj : struct.getArray(CREATION_RESPONSES_KEY_NAME)) {
             Struct responseStruct = (Struct) responseStructObj;
             ApiError error = new ApiError(responseStruct);
             this.aclCreationResponses.add(new AclCreationResponse(error));
@@ -65,14 +82,14 @@ public class CreateAclsResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.CREATE_ACLS.responseSchema(version));
-        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        struct.set(THROTTLE_TIME_MS, throttleTimeMs); // set unconditionally: CREATE_ACLS_RESPONSE_V0, the only version defined in this class, declares the field
         List<Struct> responseStructs = new ArrayList<>();
         for (AclCreationResponse response : aclCreationResponses) {
-            Struct responseStruct = struct.instance(CREATION_RESPONSES);
+            Struct responseStruct = struct.instance(CREATION_RESPONSES_KEY_NAME); // struct for one array element, obtained from the parent struct
             response.error.write(responseStruct);
             responseStructs.add(responseStruct);
         }
-        struct.set(CREATION_RESPONSES, responseStructs.toArray());
+        struct.set(CREATION_RESPONSES_KEY_NAME, responseStructs.toArray()); // stored as Object[], one entry per element of aclCreationResponses
         return struct;
     }