You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/19 04:13:27 UTC
[6/8] kafka git commit: MINOR: Move request/response schemas to the
corresponding object representation
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/protocol/types/BoundField.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/BoundField.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/BoundField.java
new file mode 100644
index 0000000..b031b4f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/BoundField.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol.types;
+
+/**
+ * A field definition ({@link Field}) bound to a particular schema, together with its slot index in that schema.
+ */
+public class BoundField {
+ public final Field def; // the schema-independent definition: name, type, doc string and optional default
+ final int index; // slot of this field; Schema passes the position in its field array, Struct uses it to index its values
+ final Schema schema; // the schema this field belongs to; Struct compares it by identity to reject foreign fields
+
+ public BoundField(Field def, Schema schema, int index) {
+ this.def = def;
+ this.schema = schema;
+ this.index = index;
+ }
+
+ @Override
+ public String toString() {
+ return def.name + ":" + def.type; // rendered as "<name>:<type>"
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
index 29a89d4..8da848b 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
@@ -16,63 +16,67 @@
*/
package org.apache.kafka.common.protocol.types;
-/**
- * A field in a schema
- */
public class Field {
-
- public static final Object NO_DEFAULT = new Object();
-
- final int index;
public final String name;
+ public final String docString;
public final Type type;
+ public final boolean hasDefaultValue;
public final Object defaultValue;
- public final String doc;
- final Schema schema;
- /**
- * Create the field.
- *
- * @throws SchemaException If the default value is not primitive and the validation fails
- */
- public Field(int index, String name, Type type, String doc, Object defaultValue, Schema schema) {
- this.index = index;
+ public Field(String name, Type type, String docString, boolean hasDefaultValue, Object defaultValue) {
this.name = name;
+ this.docString = docString;
this.type = type;
- this.doc = doc;
+ this.hasDefaultValue = hasDefaultValue;
this.defaultValue = defaultValue;
- this.schema = schema;
- if (defaultValue != NO_DEFAULT)
+
+ if (hasDefaultValue)
type.validate(defaultValue);
}
- public Field(int index, String name, Type type, String doc, Object defaultValue) {
- this(index, name, type, doc, defaultValue, null);
+ public Field(String name, Type type, String docString) {
+ this(name, type, docString, false, null);
}
- public Field(String name, Type type, String doc, Object defaultValue) {
- this(-1, name, type, doc, defaultValue);
+ public Field(String name, Type type, String docString, Object defaultValue) {
+ this(name, type, docString, true, defaultValue);
}
- public Field(String name, Type type, String doc) {
- this(name, type, doc, NO_DEFAULT);
+ public Field(String name, Type type) {
+ this(name, type, null, false, null);
}
- public Field(String name, Type type) {
- this(name, type, "");
+ public static class Int8 extends Field {
+ public Int8(String name, String docString) {
+ super(name, Type.INT8, docString, false, null);
+ }
}
- public Type type() {
- return type;
+ public static class Int32 extends Field {
+ public Int32(String name, String docString) {
+ super(name, Type.INT32, docString, false, null);
+ }
+
+ public Int32(String name, String docString, int defaultValue) {
+ super(name, Type.INT32, docString, true, defaultValue);
+ }
}
- public Schema schema() {
- return schema;
+ public static class Int16 extends Field {
+ public Int16(String name, String docString) {
+ super(name, Type.INT16, docString, false, null);
+ }
}
+ public static class Str extends Field {
+ public Str(String name, String docString) {
+ super(name, Type.STRING, docString, false, null);
+ }
+ }
- @Override
- public String toString() {
- return name + ":" + type;
+ public static class NullableStr extends Field {
+ public NullableStr(String name, String docString) {
+ super(name, Type.NULLABLE_STRING, docString, false, null);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
index a9c08aa..187e14b 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
@@ -25,8 +25,8 @@ import java.util.Map;
*/
public class Schema extends Type {
- private final Field[] fields;
- private final Map<String, Field> fieldsByName;
+ private final BoundField[] fields;
+ private final Map<String, BoundField> fieldsByName;
/**
* Construct the schema with a given list of its field values
@@ -34,14 +34,14 @@ public class Schema extends Type {
* @throws SchemaException If the given list have duplicate fields
*/
public Schema(Field... fs) {
- this.fields = new Field[fs.length];
+ this.fields = new BoundField[fs.length];
this.fieldsByName = new HashMap<>();
for (int i = 0; i < this.fields.length; i++) {
- Field field = fs[i];
- if (fieldsByName.containsKey(field.name))
- throw new SchemaException("Schema contains a duplicate field: " + field.name);
- this.fields[i] = new Field(i, field.name, field.type, field.doc, field.defaultValue, this);
- this.fieldsByName.put(fs[i].name, this.fields[i]);
+ Field def = fs[i];
+ if (fieldsByName.containsKey(def.name))
+ throw new SchemaException("Schema contains a duplicate field: " + def.name);
+ this.fields[i] = new BoundField(def, this, i);
+ this.fieldsByName.put(def.name, this.fields[i]);
}
}
@@ -51,12 +51,12 @@ public class Schema extends Type {
@Override
public void write(ByteBuffer buffer, Object o) {
Struct r = (Struct) o;
- for (Field field : fields) {
+ for (BoundField field : fields) {
try {
- Object value = field.type().validate(r.get(field));
- field.type.write(buffer, value);
+ Object value = field.def.type.validate(r.get(field));
+ field.def.type.write(buffer, value);
} catch (Exception e) {
- throw new SchemaException("Error writing field '" + field.name + "': " +
+ throw new SchemaException("Error writing field '" + field.def.name + "': " +
(e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
}
}
@@ -70,9 +70,9 @@ public class Schema extends Type {
Object[] objects = new Object[fields.length];
for (int i = 0; i < fields.length; i++) {
try {
- objects[i] = fields[i].type.read(buffer);
+ objects[i] = fields[i].def.type.read(buffer);
} catch (Exception e) {
- throw new SchemaException("Error reading field '" + fields[i].name + "': " +
+ throw new SchemaException("Error reading field '" + fields[i].def.name + "': " +
(e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
}
}
@@ -86,11 +86,11 @@ public class Schema extends Type {
public int sizeOf(Object o) {
int size = 0;
Struct r = (Struct) o;
- for (Field field : fields) {
+ for (BoundField field : fields) {
try {
- size += field.type.sizeOf(r.get(field));
+ size += field.def.type.sizeOf(r.get(field));
} catch (Exception e) {
- throw new SchemaException("Error computing size for field '" + field.name + "': " +
+ throw new SchemaException("Error computing size for field '" + field.def.name + "': " +
(e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
}
}
@@ -110,7 +110,7 @@ public class Schema extends Type {
* @param slot The slot at which this field sits
* @return The field
*/
- public Field get(int slot) {
+ public BoundField get(int slot) {
return this.fields[slot];
}
@@ -120,14 +120,14 @@ public class Schema extends Type {
* @param name The name of the field
* @return The field
*/
- public Field get(String name) {
+ public BoundField get(String name) {
return this.fieldsByName.get(name);
}
/**
* Get all the fields in this schema
*/
- public Field[] fields() {
+ public BoundField[] fields() {
return this.fields;
}
@@ -151,11 +151,11 @@ public class Schema extends Type {
public Struct validate(Object item) {
try {
Struct struct = (Struct) item;
- for (Field field : fields) {
+ for (BoundField field : fields) {
try {
- field.type.validate(struct.get(field));
+ field.def.type.validate(struct.get(field));
} catch (SchemaException e) {
- throw new SchemaException("Invalid value for field '" + field.name + "': " + e.getMessage());
+ throw new SchemaException("Invalid value for field '" + field.def.name + "': " + e.getMessage());
}
}
return struct;
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index c42390b..b3e9975 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -51,16 +51,16 @@ public class Struct {
* @param field The field for which to get the default value
* @throws SchemaException if the field has no value and has no default.
*/
- private Object getFieldOrDefault(Field field) {
+ private Object getFieldOrDefault(BoundField field) {
Object value = this.values[field.index];
if (value != null)
return value;
- else if (field.defaultValue != Field.NO_DEFAULT)
- return field.defaultValue;
- else if (field.type.isNullable())
+ else if (field.def.hasDefaultValue)
+ return field.def.defaultValue;
+ else if (field.def.type.isNullable())
return null;
else
- throw new SchemaException("Missing value for field '" + field.name + "' which has no default value.");
+ throw new SchemaException("Missing value for field '" + field.def.name + "' which has no default value.");
}
/**
@@ -70,11 +70,43 @@ public class Struct {
* @return The value for that field.
* @throws SchemaException if the field has no value and has no default.
*/
- public Object get(Field field) {
+ public Object get(BoundField field) {
validateField(field);
return getFieldOrDefault(field);
}
+ public Byte get(Field.Int8 field) {
+ return getByte(field.name);
+ }
+
+ public Integer get(Field.Int32 field) {
+ return getInt(field.name);
+ }
+
+ public Short get(Field.Int16 field) {
+ return getShort(field.name);
+ }
+
+ public String get(Field.Str field) {
+ return getString(field.name);
+ }
+
+ public String get(Field.NullableStr field) {
+ return getString(field.name);
+ }
+
+ public Integer getOrElse(Field.Int32 field, int alternative) {
+ if (hasField(field.name))
+ return getInt(field.name);
+ return alternative;
+ }
+
+ public String getOrElse(Field.NullableStr field, String alternative) {
+ if (hasField(field.name))
+ return getString(field.name);
+ return alternative;
+ }
+
/**
* Get the record value for the field with the given name by doing a hash table lookup (slower!)
*
@@ -83,7 +115,7 @@ public class Struct {
* @throws SchemaException If no such field exists
*/
public Object get(String name) {
- Field field = schema.get(name);
+ BoundField field = schema.get(name);
if (field == null)
throw new SchemaException("No such field: " + name);
return getFieldOrDefault(field);
@@ -98,7 +130,11 @@ public class Struct {
return schema.get(name) != null;
}
- public Struct getStruct(Field field) {
+ public boolean hasField(Field def) {
+ return schema.get(def.name) != null;
+ }
+
+ public Struct getStruct(BoundField field) {
return (Struct) get(field);
}
@@ -106,7 +142,7 @@ public class Struct {
return (Struct) get(name);
}
- public Byte getByte(Field field) {
+ public Byte getByte(BoundField field) {
return (Byte) get(field);
}
@@ -118,7 +154,7 @@ public class Struct {
return (Records) get(name);
}
- public Short getShort(Field field) {
+ public Short getShort(BoundField field) {
return (Short) get(field);
}
@@ -126,7 +162,7 @@ public class Struct {
return (Short) get(name);
}
- public Integer getInt(Field field) {
+ public Integer getInt(BoundField field) {
return (Integer) get(field);
}
@@ -138,7 +174,7 @@ public class Struct {
return (Long) get(name);
}
- public Long getLong(Field field) {
+ public Long getLong(BoundField field) {
return (Long) get(field);
}
@@ -146,7 +182,7 @@ public class Struct {
return (Long) get(name);
}
- public Object[] getArray(Field field) {
+ public Object[] getArray(BoundField field) {
return (Object[]) get(field);
}
@@ -154,7 +190,7 @@ public class Struct {
return (Object[]) get(name);
}
- public String getString(Field field) {
+ public String getString(BoundField field) {
return (String) get(field);
}
@@ -162,7 +198,7 @@ public class Struct {
return (String) get(name);
}
- public Boolean getBoolean(Field field) {
+ public Boolean getBoolean(BoundField field) {
return (Boolean) get(field);
}
@@ -170,7 +206,7 @@ public class Struct {
return (Boolean) get(name);
}
- public ByteBuffer getBytes(Field field) {
+ public ByteBuffer getBytes(BoundField field) {
Object result = get(field);
if (result instanceof byte[])
return ByteBuffer.wrap((byte[]) result);
@@ -191,7 +227,7 @@ public class Struct {
* @param value The value
* @throws SchemaException If the validation of the field failed
*/
- public Struct set(Field field, Object value) {
+ public Struct set(BoundField field, Object value) {
validateField(field);
this.values[field.index] = value;
return this;
@@ -205,13 +241,40 @@ public class Struct {
* @throws SchemaException If the field is not known
*/
public Struct set(String name, Object value) {
- Field field = this.schema.get(name);
+ BoundField field = this.schema.get(name);
if (field == null)
throw new SchemaException("Unknown field: " + name);
this.values[field.index] = value;
return this;
}
+ public Struct set(Field.Str def, String value) {
+ return set(def.name, value);
+ }
+
+ public Struct set(Field.NullableStr def, String value) {
+ return set(def.name, value);
+ }
+
+ public Struct set(Field.Int8 def, byte value) {
+ return set(def.name, value);
+ }
+
+ public Struct set(Field.Int32 def, int value) {
+ return set(def.name, value);
+ }
+
+ public Struct set(Field.Int16 def, short value) {
+ return set(def.name, value);
+ }
+
+ public Struct setIfExists(Field def, Object value) {
+ BoundField field = this.schema.get(def.name);
+ if (field != null)
+ this.values[field.index] = value;
+ return this;
+ }
+
/**
* Create a struct for the schema of a container type (struct or array). Note that for array type, this method
* assumes that the type is an array of schema and creates a struct of that schema. Arrays of other types can't be
@@ -221,15 +284,15 @@ public class Struct {
* @return The struct
* @throws SchemaException If the given field is not a container type
*/
- public Struct instance(Field field) {
+ public Struct instance(BoundField field) {
validateField(field);
- if (field.type() instanceof Schema) {
- return new Struct((Schema) field.type());
- } else if (field.type() instanceof ArrayOf) {
- ArrayOf array = (ArrayOf) field.type();
+ if (field.def.type instanceof Schema) {
+ return new Struct((Schema) field.def.type);
+ } else if (field.def.type instanceof ArrayOf) {
+ ArrayOf array = (ArrayOf) field.def.type;
return new Struct((Schema) array.type());
} else {
- throw new SchemaException("Field '" + field.name + "' is not a container type, it is of type " + field.type());
+ throw new SchemaException("Field '" + field.def.name + "' is not a container type, it is of type " + field.def.type);
}
}
@@ -270,9 +333,9 @@ public class Struct {
*
* @throws SchemaException If validation fails
*/
- private void validateField(Field field) {
+ private void validateField(BoundField field) {
if (this.schema != field.schema)
- throw new SchemaException("Attempt to access field '" + field.name + "' from a different schema instance.");
+ throw new SchemaException("Attempt to access field '" + field.def.name + "' from a different schema instance.");
if (field.index > values.length)
throw new SchemaException("Invalid field index: " + field.index);
}
@@ -291,10 +354,10 @@ public class Struct {
StringBuilder b = new StringBuilder();
b.append('{');
for (int i = 0; i < this.values.length; i++) {
- Field f = this.schema.get(i);
- b.append(f.name);
+ BoundField f = this.schema.get(i);
+ b.append(f.def.name);
b.append('=');
- if (f.type() instanceof ArrayOf && this.values[i] != null) {
+ if (f.def.type instanceof ArrayOf && this.values[i] != null) {
Object[] arrayValue = (Object[]) this.values[i];
b.append('[');
for (int j = 0; j < arrayValue.length; j++) {
@@ -317,8 +380,8 @@ public class Struct {
final int prime = 31;
int result = 1;
for (int i = 0; i < this.values.length; i++) {
- Field f = this.schema.get(i);
- if (f.type() instanceof ArrayOf) {
+ BoundField f = this.schema.get(i);
+ if (f.def.type instanceof ArrayOf) {
if (this.get(f) != null) {
Object[] arrayObject = (Object[]) this.get(f);
for (Object arrayItem: arrayObject)
@@ -346,9 +409,9 @@ public class Struct {
if (schema != other.schema)
return false;
for (int i = 0; i < this.values.length; i++) {
- Field f = this.schema.get(i);
+ BoundField f = this.schema.get(i);
boolean result;
- if (f.type() instanceof ArrayOf) {
+ if (f.def.type instanceof ArrayOf) {
result = Arrays.equals((Object[]) this.get(f), (Object[]) other.get(f));
} else {
Object thisField = this.get(f);
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 34fda50..1f1418f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -133,9 +133,9 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
return new SyncGroupRequest(struct, apiVersion);
case STOP_REPLICA:
return new StopReplicaRequest(struct, apiVersion);
- case CONTROLLED_SHUTDOWN_KEY:
+ case CONTROLLED_SHUTDOWN:
return new ControlledShutdownRequest(struct, apiVersion);
- case UPDATE_METADATA_KEY:
+ case UPDATE_METADATA:
return new UpdateMetadataRequest(struct, apiVersion);
case LEADER_AND_ISR:
return new LeaderAndIsrRequest(struct, apiVersion);
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 95d1ef9..b6cb8fb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
public abstract class AbstractResponse extends AbstractRequestResponse {
- public static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
public static final int DEFAULT_THROTTLE_TIME = 0;
protected Send toSend(String destination, ResponseHeader header, short apiVersion) {
@@ -66,9 +65,9 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
return new SyncGroupResponse(struct);
case STOP_REPLICA:
return new StopReplicaResponse(struct);
- case CONTROLLED_SHUTDOWN_KEY:
+ case CONTROLLED_SHUTDOWN:
return new ControlledShutdownResponse(struct);
- case UPDATE_METADATA_KEY:
+ case UPDATE_METADATA:
return new UpdateMetadataResponse(struct);
case LEADER_AND_ISR:
return new LeaderAndIsrResponse(struct);
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
index 36b290f..e3e4d79 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
@@ -18,16 +18,32 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
+import static org.apache.kafka.common.protocol.types.Type.INT16;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
public class AddOffsetsToTxnRequest extends AbstractRequest {
private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
private static final String PRODUCER_ID_KEY_NAME = "producer_id";
private static final String EPOCH_KEY_NAME = "producer_epoch";
private static final String CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id";
+ private static final Schema ADD_OFFSETS_TO_TXN_REQUEST_V0 = new Schema(
+ new Field(TRANSACTIONAL_ID_KEY_NAME, STRING, "The transactional id corresponding to the transaction."),
+ new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."),
+ new Field(EPOCH_KEY_NAME, INT16, "Current epoch associated with the producer id."),
+ new Field(CONSUMER_GROUP_ID_KEY_NAME, STRING, "Consumer group id whose offsets should be included in the transaction."));
+
+ public static Schema[] schemaVersions() {
+ return new Schema[]{ADD_OFFSETS_TO_TXN_REQUEST_V0};
+ }
+
public static class Builder extends AbstractRequest.Builder<AddOffsetsToTxnRequest> {
private final String transactionalId;
private final long producerId;
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
index 981a234..10dc279 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
@@ -18,12 +18,22 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+
public class AddOffsetsToTxnResponse extends AbstractResponse {
- private static final String ERROR_CODE_KEY_NAME = "error_code";
+ private static final Schema ADD_OFFSETS_TO_TXN_RESPONSE_V0 = new Schema(
+ THROTTLE_TIME_MS,
+ ERROR_CODE);
+
+ public static Schema[] schemaVersions() {
+ return new Schema[]{ADD_OFFSETS_TO_TXN_RESPONSE_V0};
+ }
// Possible error codes:
// NotCoordinator
@@ -44,8 +54,8 @@ public class AddOffsetsToTxnResponse extends AbstractResponse {
}
public AddOffsetsToTxnResponse(Struct struct) {
- this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
- this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+ this.throttleTimeMs = struct.get(THROTTLE_TIME_MS);
+ this.error = Errors.forCode(struct.get(ERROR_CODE));
}
public int throttleTimeMs() {
@@ -59,8 +69,8 @@ public class AddOffsetsToTxnResponse extends AbstractResponse {
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.responseSchema(version));
- struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
- struct.set(ERROR_CODE_KEY_NAME, error.code());
+ struct.set(THROTTLE_TIME_MS, throttleTimeMs);
+ struct.set(ERROR_CODE, error.code());
return struct;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
index 6fe034c..c195e24 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
@@ -19,6 +19,9 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.CollectionUtils;
@@ -28,14 +31,32 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.INT16;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
public class AddPartitionsToTxnRequest extends AbstractRequest {
private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
private static final String PRODUCER_ID_KEY_NAME = "producer_id";
private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch";
- private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
- private static final String TOPIC_KEY_NAME = "topic";
+ private static final String TOPICS_KEY_NAME = "topics";
private static final String PARTITIONS_KEY_NAME = "partitions";
+ private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V0 = new Schema(
+ new Field(TRANSACTIONAL_ID_KEY_NAME, STRING, "The transactional id corresponding to the transaction."),
+ new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."),
+ new Field(PRODUCER_EPOCH_KEY_NAME, INT16, "Current epoch associated with the producer id."),
+ new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
+ TOPIC_NAME,
+ new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32)))),
+ "The partitions to add to the transaction."));
+
+ public static Schema[] schemaVersions() {
+ return new Schema[]{ADD_PARTITIONS_TO_TXN_REQUEST_V0};
+ }
+
public static class Builder extends AbstractRequest.Builder<AddPartitionsToTxnRequest> {
private final String transactionalId;
private final long producerId;
@@ -93,10 +114,10 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME);
List<TopicPartition> partitions = new ArrayList<>();
- Object[] topicPartitionsArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
+ Object[] topicPartitionsArray = struct.getArray(TOPICS_KEY_NAME);
for (Object topicPartitionObj : topicPartitionsArray) {
Struct topicPartitionStruct = (Struct) topicPartitionObj;
- String topic = topicPartitionStruct.getString(TOPIC_KEY_NAME);
+ String topic = topicPartitionStruct.get(TOPIC_NAME);
for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
partitions.add(new TopicPartition(topic, (Integer) partitionObj));
}
@@ -131,13 +152,13 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
Object[] partitionsArray = new Object[mappedPartitions.size()];
int i = 0;
for (Map.Entry<String, List<Integer>> topicAndPartitions : mappedPartitions.entrySet()) {
- Struct topicPartitionsStruct = struct.instance(TOPIC_PARTITIONS_KEY_NAME);
- topicPartitionsStruct.set(TOPIC_KEY_NAME, topicAndPartitions.getKey());
+ Struct topicPartitionsStruct = struct.instance(TOPICS_KEY_NAME);
+ topicPartitionsStruct.set(TOPIC_NAME, topicAndPartitions.getKey());
topicPartitionsStruct.set(PARTITIONS_KEY_NAME, topicAndPartitions.getValue().toArray());
partitionsArray[i++] = topicPartitionsStruct;
}
- struct.set(TOPIC_PARTITIONS_KEY_NAME, partitionsArray);
+ struct.set(TOPICS_KEY_NAME, partitionsArray);
return struct;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
index f05310a..e9f6088 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
@@ -19,6 +19,9 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.CollectionUtils;
@@ -28,13 +31,27 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+
public class AddPartitionsToTxnResponse extends AbstractResponse {
- private static final String ERROR_CODE_KEY_NAME = "error_code";
private static final String ERRORS_KEY_NAME = "errors";
- private static final String TOPIC_NAME = "topic";
- private static final String PARTITION = "partition";
private static final String PARTITION_ERRORS = "partition_errors";
+ private static final Schema ADD_PARTITIONS_TO_TXN_RESPONSE_V0 = new Schema(
+ THROTTLE_TIME_MS,
+ new Field(ERRORS_KEY_NAME, new ArrayOf(new Schema(
+ TOPIC_NAME,
+ new Field(PARTITION_ERRORS, new ArrayOf(new Schema(
+ PARTITION_ID,
+ ERROR_CODE)))))));
+
+ public static Schema[] schemaVersions() {
+ return new Schema[]{ADD_PARTITIONS_TO_TXN_RESPONSE_V0};
+ }
+
private final int throttleTimeMs;
// Possible error codes:
@@ -56,15 +73,15 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
}
public AddPartitionsToTxnResponse(Struct struct) {
- this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
+ this.throttleTimeMs = struct.get(THROTTLE_TIME_MS);
errors = new HashMap<>();
for (Object topic : struct.getArray(ERRORS_KEY_NAME)) {
Struct topicStruct = (Struct) topic;
- final String topicName = topicStruct.getString(TOPIC_NAME);
+ final String topicName = topicStruct.get(TOPIC_NAME);
for (Object partition : topicStruct.getArray(PARTITION_ERRORS)) {
Struct partitionStruct = (Struct) partition;
- TopicPartition topicPartition = new TopicPartition(topicName, partitionStruct.getInt(PARTITION));
- errors.put(topicPartition, Errors.forCode(partitionStruct.getShort(ERROR_CODE_KEY_NAME)));
+ TopicPartition topicPartition = new TopicPartition(topicName, partitionStruct.get(PARTITION_ID));
+ errors.put(topicPartition, Errors.forCode(partitionStruct.get(ERROR_CODE)));
}
}
}
@@ -80,7 +97,7 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.ADD_PARTITIONS_TO_TXN.responseSchema(version));
- struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+ struct.set(THROTTLE_TIME_MS, throttleTimeMs);
Map<String, Map<Integer, Errors>> errorsByTopic = CollectionUtils.groupDataByTopic(errors);
List<Struct> topics = new ArrayList<>(errorsByTopic.size());
@@ -90,8 +107,8 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
List<Struct> partitionArray = new ArrayList<>();
for (Map.Entry<Integer, Errors> partitionErrors : entry.getValue().entrySet()) {
final Struct partitionData = topicErrorCodes.instance(PARTITION_ERRORS)
- .set(PARTITION, partitionErrors.getKey())
- .set(ERROR_CODE_KEY_NAME, partitionErrors.getValue().code());
+ .set(PARTITION_ID, partitionErrors.getKey())
+ .set(ERROR_CODE, partitionErrors.getValue().code());
partitionArray.add(partitionData);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
index a964f85..14b39ae 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
@@ -18,6 +18,9 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
@@ -27,6 +30,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
+import static org.apache.kafka.common.protocol.types.Type.INT8;
+import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
public class AlterConfigsRequest extends AbstractRequest {
private static final String RESOURCES_KEY_NAME = "resources";
@@ -38,6 +46,24 @@ public class AlterConfigsRequest extends AbstractRequest {
private static final String CONFIG_NAME = "config_name";
private static final String CONFIG_VALUE = "config_value";
+ private static final Schema CONFIG_ENTRY = new Schema(
+ new Field(CONFIG_NAME, STRING, "Configuration name"),
+ new Field(CONFIG_VALUE, NULLABLE_STRING, "Configuration value"));
+
+ private static final Schema ALTER_CONFIGS_REQUEST_RESOURCE_V0 = new Schema(
+ new Field(RESOURCE_TYPE_KEY_NAME, INT8),
+ new Field(RESOURCE_NAME_KEY_NAME, STRING),
+ new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(CONFIG_ENTRY)));
+
+ private static final Schema ALTER_CONFIGS_REQUEST_V0 = new Schema(
+ new Field(RESOURCES_KEY_NAME, new ArrayOf(ALTER_CONFIGS_REQUEST_RESOURCE_V0),
+ "An array of resources to update with the provided configs."),
+ new Field(VALIDATE_ONLY_KEY_NAME, BOOLEAN));
+
+ public static Schema[] schemaVersions() {
+ return new Schema[] {ALTER_CONFIGS_REQUEST_V0};
+ }
+
public static class Config {
private final Collection<ConfigEntry> entries;
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
index 3a3eb9a..df9416e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
@@ -18,6 +18,9 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
@@ -26,12 +29,32 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.types.Type.INT8;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
public class AlterConfigsResponse extends AbstractResponse {
private static final String RESOURCES_KEY_NAME = "resources";
private static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
private static final String RESOURCE_NAME_KEY_NAME = "resource_name";
+ private static final Schema ALTER_CONFIGS_RESPONSE_ENTITY_V0 = new Schema(
+ ERROR_CODE,
+ ERROR_MESSAGE,
+ new Field(RESOURCE_TYPE_KEY_NAME, INT8),
+ new Field(RESOURCE_NAME_KEY_NAME, STRING));
+
+ private static final Schema ALTER_CONFIGS_RESPONSE_V0 = new Schema(
+ THROTTLE_TIME_MS,
+ new Field(RESOURCES_KEY_NAME, new ArrayOf(ALTER_CONFIGS_RESPONSE_ENTITY_V0)));
+
+ public static Schema[] schemaVersions() {
+ return new Schema[]{ALTER_CONFIGS_RESPONSE_V0};
+ }
+
private final int throttleTimeMs;
private final Map<Resource, ApiError> errors;
@@ -42,7 +65,7 @@ public class AlterConfigsResponse extends AbstractResponse {
}
public AlterConfigsResponse(Struct struct) {
- throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
+ throttleTimeMs = struct.get(THROTTLE_TIME_MS);
Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME);
errors = new HashMap<>(resourcesArray.length);
for (Object resourceObj : resourcesArray) {
@@ -65,7 +88,7 @@ public class AlterConfigsResponse extends AbstractResponse {
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.ALTER_CONFIGS.responseSchema(version));
- struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+ struct.set(THROTTLE_TIME_MS, throttleTimeMs);
List<Struct> resourceStructs = new ArrayList<>(errors.size());
for (Map.Entry<Resource, ApiError> entry : errors.entrySet()) {
Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME);
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java
index 2c2401b..7e58fd6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java
@@ -20,14 +20,21 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.CollectionUtils;
+
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
public class AlterReplicaDirRequest extends AbstractRequest {
@@ -39,9 +46,19 @@ public class AlterReplicaDirRequest extends AbstractRequest {
private static final String TOPICS_KEY_NAME = "topics";
// topic level key names
- private static final String TOPIC_KEY_NAME = "topic";
private static final String PARTITIONS_KEY_NAME = "partitions";
+ private static final Schema ALTER_REPLICA_DIR_REQUEST_V0 = new Schema(
+ new Field("log_dirs", new ArrayOf(new Schema(
+ new Field("log_dir", STRING, "The absolute log directory path."),
+ new Field("topics", new ArrayOf(new Schema(
+ TOPIC_NAME,
+ new Field("partitions", new ArrayOf(INT32), "List of partition ids of the topic."))))))));
+
+ public static Schema[] schemaVersions() {
+ return new Schema[]{ALTER_REPLICA_DIR_REQUEST_V0};
+ }
+
private final Map<TopicPartition, String> partitionDirs;
public static class Builder extends AbstractRequest.Builder<AlterReplicaDirRequest> {
@@ -76,7 +93,7 @@ public class AlterReplicaDirRequest extends AbstractRequest {
String logDir = logDirStruct.getString(LOG_DIR_KEY_NAME);
for (Object topicStructObj : logDirStruct.getArray(TOPICS_KEY_NAME)) {
Struct topicStruct = (Struct) topicStructObj;
- String topic = topicStruct.getString(TOPIC_KEY_NAME);
+ String topic = topicStruct.get(TOPIC_NAME);
for (Object partitionObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
int partition = (Integer) partitionObj;
partitionDirs.put(new TopicPartition(topic, partition), logDir);
@@ -108,7 +125,7 @@ public class AlterReplicaDirRequest extends AbstractRequest {
List<Struct> topicStructArray = new ArrayList<>();
for (Map.Entry<String, List<Integer>> topicEntry: CollectionUtils.groupDataByTopic(logDirEntry.getValue()).entrySet()) {
Struct topicStruct = logDirStruct.instance(TOPICS_KEY_NAME);
- topicStruct.set(TOPIC_KEY_NAME, topicEntry.getKey());
+ topicStruct.set(TOPIC_NAME, topicEntry.getKey());
topicStruct.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
topicStructArray.add(topicStruct);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
index f97f9a0..ed00b75 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
@@ -20,14 +20,23 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.CollectionUtils;
+
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+
public class AlterReplicaDirResponse extends AbstractResponse {
@@ -35,12 +44,19 @@ public class AlterReplicaDirResponse extends AbstractResponse {
private static final String TOPICS_KEY_NAME = "topics";
// topic level key names
- private static final String TOPIC_KEY_NAME = "topic";
private static final String PARTITIONS_KEY_NAME = "partitions";
- // partition level key names
- private static final String PARTITION_KEY_NAME = "partition";
- private static final String ERROR_CODE_KEY_NAME = "error_code";
+ private static final Schema ALTER_REPLICA_DIR_RESPONSE_V0 = new Schema(
+ THROTTLE_TIME_MS,
+ new Field("topics", new ArrayOf(new Schema(
+ TOPIC_NAME,
+ new Field("partitions", new ArrayOf(new Schema(
+ PARTITION_ID,
+ ERROR_CODE)))))));
+
+ public static Schema[] schemaVersions() {
+ return new Schema[]{ALTER_REPLICA_DIR_RESPONSE_V0};
+ }
/**
* Possible error code:
@@ -54,15 +70,15 @@ public class AlterReplicaDirResponse extends AbstractResponse {
private final int throttleTimeMs;
public AlterReplicaDirResponse(Struct struct) {
- throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
+ throttleTimeMs = struct.get(THROTTLE_TIME_MS);
responses = new HashMap<>();
for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) {
Struct topicStruct = (Struct) topicStructObj;
- String topic = topicStruct.getString(TOPIC_KEY_NAME);
+ String topic = topicStruct.get(TOPIC_NAME);
for (Object partitionStructObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
Struct partitionStruct = (Struct) partitionStructObj;
- int partition = partitionStruct.getInt(PARTITION_KEY_NAME);
- Errors error = Errors.forCode(partitionStruct.getShort(ERROR_CODE_KEY_NAME));
+ int partition = partitionStruct.get(PARTITION_ID);
+ Errors error = Errors.forCode(partitionStruct.get(ERROR_CODE));
responses.put(new TopicPartition(topic, partition), error);
}
}
@@ -79,18 +95,18 @@ public class AlterReplicaDirResponse extends AbstractResponse {
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.ALTER_REPLICA_DIR.responseSchema(version));
- struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+ struct.set(THROTTLE_TIME_MS, throttleTimeMs);
Map<String, Map<Integer, Errors>> responsesByTopic = CollectionUtils.groupDataByTopic(responses);
List<Struct> topicStructArray = new ArrayList<>();
for (Map.Entry<String, Map<Integer, Errors>> responsesByTopicEntry : responsesByTopic.entrySet()) {
Struct topicStruct = struct.instance(TOPICS_KEY_NAME);
- topicStruct.set(TOPIC_KEY_NAME, responsesByTopicEntry.getKey());
+ topicStruct.set(TOPIC_NAME, responsesByTopicEntry.getKey());
List<Struct> partitionStructArray = new ArrayList<>();
for (Map.Entry<Integer, Errors> responsesByPartitionEntry : responsesByTopicEntry.getValue().entrySet()) {
Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME);
Errors response = responsesByPartitionEntry.getValue();
- partitionStruct.set(PARTITION_KEY_NAME, responsesByPartitionEntry.getKey());
- partitionStruct.set(ERROR_CODE_KEY_NAME, response.code());
+ partitionStruct.set(PARTITION_ID, responsesByPartitionEntry.getKey());
+ partitionStruct.set(ERROR_CODE, response.code());
partitionStructArray.add(partitionStruct);
}
topicStruct.set(PARTITIONS_KEY_NAME, partitionStructArray.toArray());
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
index d712123..dad21b3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
@@ -21,6 +21,9 @@ import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
+
/**
* Encapsulates an error code (via the Errors enum) and an optional message. Generally, the optional message is only
* defined if it adds information over the default message associated with the error code.
@@ -31,9 +34,6 @@ public class ApiError {
public static final ApiError NONE = new ApiError(Errors.NONE, null);
- private static final String CODE_KEY_NAME = "error_code";
- private static final String MESSAGE_KEY_NAME = "error_message";
-
private final Errors error;
private final String message;
@@ -45,12 +45,9 @@ public class ApiError {
}
public ApiError(Struct struct) {
- error = Errors.forCode(struct.getShort(CODE_KEY_NAME));
+ error = Errors.forCode(struct.get(ERROR_CODE));
// In some cases, the error message field was introduced in newer version
- if (struct.hasField(MESSAGE_KEY_NAME))
- message = struct.getString(MESSAGE_KEY_NAME);
- else
- message = null;
+ message = struct.getOrElse(ERROR_MESSAGE, null);
}
public ApiError(Errors error, String message) {
@@ -59,10 +56,9 @@ public class ApiError {
}
public void write(Struct struct) {
- struct.set(CODE_KEY_NAME, error.code());
- // In some cases, the error message field was introduced in a newer protocol API version
- if (struct.hasField(MESSAGE_KEY_NAME) && message != null && error != Errors.NONE)
- struct.set(MESSAGE_KEY_NAME, message);
+ struct.set(ERROR_CODE, error.code());
+ if (error != Errors.NONE)
+ struct.setIfExists(ERROR_MESSAGE, message);
}
public boolean is(Errors error) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
index 025ef6c..22daf6c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
@@ -18,12 +18,22 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.Collections;
public class ApiVersionsRequest extends AbstractRequest {
+ private static final Schema API_VERSIONS_REQUEST_V0 = new Schema();
+
+ /* The v1 request is the same as v0; throttle time was added to the response only. */
+ private static final Schema API_VERSIONS_REQUEST_V1 = API_VERSIONS_REQUEST_V0;
+
+ public static Schema[] schemaVersions() {
+ return new Schema[]{API_VERSIONS_REQUEST_V0, API_VERSIONS_REQUEST_V1};
+ }
+
public static class Builder extends AbstractRequest.Builder<ApiVersionsRequest> {
public Builder() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 5a48c93..6a0418f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -18,6 +18,9 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.RecordBatch;
@@ -28,15 +31,35 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class ApiVersionsResponse extends AbstractResponse {
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.types.Type.INT16;
- public static final ApiVersionsResponse API_VERSIONS_RESPONSE = createApiVersionsResponse(DEFAULT_THROTTLE_TIME, RecordBatch.CURRENT_MAGIC_VALUE);
- private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
- public static final String ERROR_CODE_KEY_NAME = "error_code";
- public static final String API_VERSIONS_KEY_NAME = "api_versions";
- public static final String API_KEY_NAME = "api_key";
- public static final String MIN_VERSION_KEY_NAME = "min_version";
- public static final String MAX_VERSION_KEY_NAME = "max_version";
+public class ApiVersionsResponse extends AbstractResponse {
+ private static final String API_VERSIONS_KEY_NAME = "api_versions";
+ private static final String API_KEY_NAME = "api_key";
+ private static final String MIN_VERSION_KEY_NAME = "min_version";
+ private static final String MAX_VERSION_KEY_NAME = "max_version";
+
+ private static final Schema API_VERSIONS_V0 = new Schema(
+ new Field(API_KEY_NAME, INT16, "API key."),
+ new Field(MIN_VERSION_KEY_NAME, INT16, "Minimum supported version."),
+ new Field(MAX_VERSION_KEY_NAME, INT16, "Maximum supported version."));
+
+ private static final Schema API_VERSIONS_RESPONSE_V0 = new Schema(
+ ERROR_CODE,
+ new Field(API_VERSIONS_KEY_NAME, new ArrayOf(API_VERSIONS_V0), "API versions supported by the broker."));
+ private static final Schema API_VERSIONS_RESPONSE_V1 = new Schema(
+ ERROR_CODE,
+ new Field(API_VERSIONS_KEY_NAME, new ArrayOf(API_VERSIONS_V0), "API versions supported by the broker."),
+ THROTTLE_TIME_MS);
+
+ // initialized lazily to avoid circular initialization dependence with ApiKeys
+ private static volatile ApiVersionsResponse defaultApiVersionsResponse;
+
+ public static Schema[] schemaVersions() {
+ return new Schema[]{API_VERSIONS_RESPONSE_V0, API_VERSIONS_RESPONSE_V1};
+ }
/**
* Possible error codes:
@@ -83,8 +106,8 @@ public class ApiVersionsResponse extends AbstractResponse {
}
public ApiVersionsResponse(Struct struct) {
- this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
- this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+ this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+ this.error = Errors.forCode(struct.get(ERROR_CODE));
List<ApiVersion> tempApiVersions = new ArrayList<>();
for (Object apiVersionsObj : struct.getArray(API_VERSIONS_KEY_NAME)) {
Struct apiVersionStruct = (Struct) apiVersionsObj;
@@ -99,9 +122,8 @@ public class ApiVersionsResponse extends AbstractResponse {
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.API_VERSIONS.responseSchema(version));
- if (struct.hasField(THROTTLE_TIME_KEY_NAME))
- struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
- struct.set(ERROR_CODE_KEY_NAME, error.code());
+ struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
+ struct.set(ERROR_CODE, error.code());
List<Struct> apiVersionList = new ArrayList<>();
for (ApiVersion apiVersion : apiKeyToApiVersion.values()) {
Struct apiVersionStruct = struct.instance(API_VERSIONS_KEY_NAME);
@@ -116,7 +138,7 @@ public class ApiVersionsResponse extends AbstractResponse {
public static ApiVersionsResponse apiVersionsResponse(int throttleTimeMs, byte maxMagic) {
if (maxMagic == RecordBatch.CURRENT_MAGIC_VALUE && throttleTimeMs == DEFAULT_THROTTLE_TIME) {
- return API_VERSIONS_RESPONSE;
+ return defaultApiVersionsResponse();
}
return createApiVersionsResponse(throttleTimeMs, maxMagic);
}
@@ -141,22 +163,28 @@ public class ApiVersionsResponse extends AbstractResponse {
return new ApiVersionsResponse(ApiKeys.API_VERSIONS.parseResponse(version, buffer));
}
+ private Map<Short, ApiVersion> buildApiKeyToApiVersion(List<ApiVersion> apiVersions) {
+ Map<Short, ApiVersion> tempApiIdToApiVersion = new HashMap<>();
+ for (ApiVersion apiVersion: apiVersions) {
+ tempApiIdToApiVersion.put(apiVersion.apiKey, apiVersion);
+ }
+ return tempApiIdToApiVersion;
+ }
+
public static ApiVersionsResponse createApiVersionsResponse(int throttleTimeMs, final byte minMagic) {
- List<ApiVersion> versionList = new ArrayList<>();
+ List<ApiVersionsResponse.ApiVersion> versionList = new ArrayList<>();
for (ApiKeys apiKey : ApiKeys.values()) {
if (apiKey.minRequiredInterBrokerMagic <= minMagic) {
- versionList.add(new ApiVersion(apiKey));
+ versionList.add(new ApiVersionsResponse.ApiVersion(apiKey));
}
}
return new ApiVersionsResponse(throttleTimeMs, Errors.NONE, versionList);
}
- private Map<Short, ApiVersion> buildApiKeyToApiVersion(List<ApiVersion> apiVersions) {
- Map<Short, ApiVersion> tempApiIdToApiVersion = new HashMap<>();
- for (ApiVersion apiVersion: apiVersions) {
- tempApiIdToApiVersion.put(apiVersion.apiKey, apiVersion);
- }
- return tempApiIdToApiVersion;
+ public static ApiVersionsResponse defaultApiVersionsResponse() {
+ if (defaultApiVersionsResponse == null)
+ defaultApiVersionsResponse = createApiVersionsResponse(DEFAULT_THROTTLE_TIME, RecordBatch.CURRENT_MAGIC_VALUE);
+ return defaultApiVersionsResponse;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
index 1b49c6a..c77bd13 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
@@ -19,14 +19,26 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.Collections;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+
public class ControlledShutdownRequest extends AbstractRequest {
private static final String BROKER_ID_KEY_NAME = "broker_id";
+ private static final Schema CONTROLLED_SHUTDOWN_REQUEST_V0 = new Schema(
+ new Field(BROKER_ID_KEY_NAME, INT32, "The id of the broker for which controlled shutdown has been requested."));
+ private static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = CONTROLLED_SHUTDOWN_REQUEST_V0;
+
+ public static Schema[] schemaVersions() {
+ return new Schema[] {CONTROLLED_SHUTDOWN_REQUEST_V0, CONTROLLED_SHUTDOWN_REQUEST_V1};
+ }
+
public static class Builder extends AbstractRequest.Builder<ControlledShutdownRequest> {
private final int brokerId;
@@ -35,7 +47,7 @@ public class ControlledShutdownRequest extends AbstractRequest {
}
public Builder(int brokerId, Short desiredVersion) {
- super(ApiKeys.CONTROLLED_SHUTDOWN_KEY, desiredVersion);
+ super(ApiKeys.CONTROLLED_SHUTDOWN, desiredVersion);
this.brokerId = brokerId;
}
@@ -74,7 +86,7 @@ public class ControlledShutdownRequest extends AbstractRequest {
return new ControlledShutdownResponse(Errors.forException(e), Collections.<TopicPartition>emptySet());
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
- versionId, this.getClass().getSimpleName(), ApiKeys.CONTROLLED_SHUTDOWN_KEY.latestVersion()));
+ versionId, this.getClass().getSimpleName(), ApiKeys.CONTROLLED_SHUTDOWN.latestVersion()));
}
}
@@ -84,12 +96,12 @@ public class ControlledShutdownRequest extends AbstractRequest {
public static ControlledShutdownRequest parse(ByteBuffer buffer, short version) {
return new ControlledShutdownRequest(
- ApiKeys.CONTROLLED_SHUTDOWN_KEY.parseRequest(version, buffer), version);
+ ApiKeys.CONTROLLED_SHUTDOWN.parseRequest(version, buffer), version);
}
@Override
protected Struct toStruct() {
- Struct struct = new Struct(ApiKeys.CONTROLLED_SHUTDOWN_KEY.requestSchema(version()));
+ Struct struct = new Struct(ApiKeys.CONTROLLED_SHUTDOWN.requestSchema(version()));
struct.set(BROKER_ID_KEY_NAME, brokerId);
return struct;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
index 00973f0..e0b3860 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
@@ -19,6 +19,9 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
@@ -27,13 +30,28 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+
public class ControlledShutdownResponse extends AbstractResponse {
- private static final String ERROR_CODE_KEY_NAME = "error_code";
private static final String PARTITIONS_REMAINING_KEY_NAME = "partitions_remaining";
- private static final String TOPIC_KEY_NAME = "topic";
- private static final String PARTITION_KEY_NAME = "partition";
+ private static final Schema CONTROLLED_SHUTDOWN_PARTITION_V0 = new Schema(
+ TOPIC_NAME,
+ PARTITION_ID);
+
+ private static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V0 = new Schema(
+ ERROR_CODE,
+ new Field("partitions_remaining", new ArrayOf(CONTROLLED_SHUTDOWN_PARTITION_V0), "The partitions " +
+ "that the broker still leads."));
+
+ private static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V1 = CONTROLLED_SHUTDOWN_RESPONSE_V0;
+
+ public static Schema[] schemaVersions() {
+ return new Schema[]{CONTROLLED_SHUTDOWN_RESPONSE_V0, CONTROLLED_SHUTDOWN_RESPONSE_V1};
+ }
/**
* Possible error codes:
@@ -52,12 +70,12 @@ public class ControlledShutdownResponse extends AbstractResponse {
}
public ControlledShutdownResponse(Struct struct) {
- error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+ error = Errors.forCode(struct.get(ERROR_CODE));
Set<TopicPartition> partitions = new HashSet<>();
for (Object topicPartitionObj : struct.getArray(PARTITIONS_REMAINING_KEY_NAME)) {
Struct topicPartition = (Struct) topicPartitionObj;
- String topic = topicPartition.getString(TOPIC_KEY_NAME);
- int partition = topicPartition.getInt(PARTITION_KEY_NAME);
+ String topic = topicPartition.get(TOPIC_NAME);
+ int partition = topicPartition.get(PARTITION_ID);
partitions.add(new TopicPartition(topic, partition));
}
partitionsRemaining = partitions;
@@ -72,20 +90,19 @@ public class ControlledShutdownResponse extends AbstractResponse {
}
public static ControlledShutdownResponse parse(ByteBuffer buffer, short version) {
- return new ControlledShutdownResponse(ApiKeys.CONTROLLED_SHUTDOWN_KEY.parseResponse(version, buffer));
+ return new ControlledShutdownResponse(ApiKeys.CONTROLLED_SHUTDOWN.parseResponse(version, buffer));
}
@Override
protected Struct toStruct(short version) {
- Struct struct = new Struct(ApiKeys.CONTROLLED_SHUTDOWN_KEY.responseSchema(version));
-
- struct.set(ERROR_CODE_KEY_NAME, error.code());
+ Struct struct = new Struct(ApiKeys.CONTROLLED_SHUTDOWN.responseSchema(version));
+ struct.set(ERROR_CODE, error.code());
List<Struct> partitionsRemainingList = new ArrayList<>(partitionsRemaining.size());
for (TopicPartition topicPartition : partitionsRemaining) {
Struct topicPartitionStruct = struct.instance(PARTITIONS_REMAINING_KEY_NAME);
- topicPartitionStruct.set(TOPIC_KEY_NAME, topicPartition.topic());
- topicPartitionStruct.set(PARTITION_KEY_NAME, topicPartition.partition());
+ topicPartitionStruct.set(TOPIC_NAME, topicPartition.topic());
+ topicPartitionStruct.set(PARTITION_ID, topicPartition.partition());
partitionsRemainingList.add(topicPartitionStruct);
}
struct.set(PARTITIONS_REMAINING_KEY_NAME, partitionsRemainingList.toArray());
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
index 3598d4f..d281b3b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
@@ -20,6 +20,9 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.resource.Resource;
import org.apache.kafka.common.utils.Utils;
@@ -28,8 +31,28 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.kafka.common.protocol.CommonFields.HOST;
+import static org.apache.kafka.common.protocol.CommonFields.OPERATION;
+import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
+import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
+
public class CreateAclsRequest extends AbstractRequest {
- private final static String CREATIONS = "creations";
+ private final static String CREATIONS_KEY_NAME = "creations";
+
+ /**
+  * One element of the creations array: resource type, resource name, principal,
+  * host, operation and permission type, in that order.
+  */
+ private static final Schema CREATE_ACLS_CREATION_V0 = new Schema(
+         RESOURCE_TYPE,
+         RESOURCE_NAME,
+         PRINCIPAL,
+         HOST,
+         OPERATION,
+         PERMISSION_TYPE);
+
+ /** Version 0 request: a single array field, named by CREATIONS_KEY_NAME, of creation entries. */
+ private static final Schema CREATE_ACLS_REQUEST_V0 = new Schema(
+         new Field(CREATIONS_KEY_NAME, new ArrayOf(CREATE_ACLS_CREATION_V0)));
+
+ /**
+  * Returns the request schemas in ascending version order; only v0 is defined here.
+  * A new array is created on every call.
+  */
+ public static Schema[] schemaVersions() {
+     Schema[] versions = {CREATE_ACLS_REQUEST_V0};
+     return versions;
+ }
public static class AclCreation {
private final AclBinding acl;
@@ -88,7 +111,7 @@ public class CreateAclsRequest extends AbstractRequest {
public CreateAclsRequest(Struct struct, short version) {
super(version);
this.aclCreations = new ArrayList<>();
- for (Object creationStructObj : struct.getArray(CREATIONS)) {
+ for (Object creationStructObj : struct.getArray(CREATIONS_KEY_NAME)) {
Struct creationStruct = (Struct) creationStructObj;
aclCreations.add(AclCreation.fromStruct(creationStruct));
}
@@ -99,11 +122,11 @@ public class CreateAclsRequest extends AbstractRequest {
Struct struct = new Struct(ApiKeys.CREATE_ACLS.requestSchema(version()));
List<Struct> requests = new ArrayList<>();
for (AclCreation creation : aclCreations) {
- Struct creationStruct = struct.instance(CREATIONS);
+ Struct creationStruct = struct.instance(CREATIONS_KEY_NAME);
creation.setStructFields(creationStruct);
requests.add(creationStruct);
}
- struct.set(CREATIONS, requests.toArray());
+ struct.set(CREATIONS_KEY_NAME, requests.toArray());
return struct;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
index 1fc75da..836215e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
@@ -17,14 +17,31 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+
public class CreateAclsResponse extends AbstractResponse {
- private final static String CREATION_RESPONSES = "creation_responses";
+ private final static String CREATION_RESPONSES_KEY_NAME = "creation_responses";
+
+ /** One element of the creation responses array: an error code followed by an error message. */
+ private static final Schema CREATE_ACLS_CREATION_RESPONSE_V0 = new Schema(
+         ERROR_CODE,
+         ERROR_MESSAGE);
+
+ /**
+  * Version 0 response: the throttle time followed by the array field, named by
+  * CREATION_RESPONSES_KEY_NAME, of per-creation results.
+  */
+ private static final Schema CREATE_ACLS_RESPONSE_V0 = new Schema(
+         THROTTLE_TIME_MS,
+         new Field(CREATION_RESPONSES_KEY_NAME, new ArrayOf(CREATE_ACLS_CREATION_RESPONSE_V0)));
+
+ /**
+  * Returns the response schemas in ascending version order; only v0 is defined here.
+  * A new array is created on every call.
+  */
+ public static Schema[] schemaVersions() {
+     Schema[] versions = {CREATE_ACLS_RESPONSE_V0};
+     return versions;
+ }
public static class AclCreationResponse {
private final ApiError error;
@@ -53,9 +70,9 @@ public class CreateAclsResponse extends AbstractResponse {
}
public CreateAclsResponse(Struct struct) {
- this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
+ this.throttleTimeMs = struct.get(THROTTLE_TIME_MS);
this.aclCreationResponses = new ArrayList<>();
- for (Object responseStructObj : struct.getArray(CREATION_RESPONSES)) {
+ for (Object responseStructObj : struct.getArray(CREATION_RESPONSES_KEY_NAME)) {
Struct responseStruct = (Struct) responseStructObj;
ApiError error = new ApiError(responseStruct);
this.aclCreationResponses.add(new AclCreationResponse(error));
@@ -65,14 +82,14 @@ public class CreateAclsResponse extends AbstractResponse {
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.CREATE_ACLS.responseSchema(version));
- struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+ struct.set(THROTTLE_TIME_MS, throttleTimeMs);
List<Struct> responseStructs = new ArrayList<>();
for (AclCreationResponse response : aclCreationResponses) {
- Struct responseStruct = struct.instance(CREATION_RESPONSES);
+ Struct responseStruct = struct.instance(CREATION_RESPONSES_KEY_NAME);
response.error.write(responseStruct);
responseStructs.add(responseStruct);
}
- struct.set(CREATION_RESPONSES, responseStructs.toArray());
+ struct.set(CREATION_RESPONSES_KEY_NAME, responseStructs.toArray());
return struct;
}