You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/09/19 18:07:37 UTC
kafka git commit: MINOR: Protocol schema refactor follow-up
Repository: kafka
Updated Branches:
refs/heads/trunk 5e0bb3df9 -> 92c06cbad
MINOR: Protocol schema refactor follow-up
- Use constants in a few places that were missed
- Remove ProtoUtils by moving its methods to Schema
- Merge SchemaVisitor and SchemaVisitorAdapter
- Change SchemaVisitor package (it is now the nested class Schema.Visitor in org.apache.kafka.common.protocol.types).
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jason Gustafson <ja...@confluent.io>
Closes #3895 from ijuma/protocol-schema-refactor-follow-ups
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/92c06cba
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/92c06cba
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/92c06cba
Branch: refs/heads/trunk
Commit: 92c06cbad591b88e69f416180052a72af455fa30
Parents: 5e0bb3d
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue Sep 19 11:07:32 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Sep 19 11:07:32 2017 -0700
----------------------------------------------------------------------
.../apache/kafka/common/protocol/ApiKeys.java | 16 +++----
.../kafka/common/protocol/ProtoUtils.java | 47 --------------------
.../kafka/common/protocol/SchemaVisitor.java | 27 -----------
.../common/protocol/SchemaVisitorAdapter.java | 38 ----------------
.../kafka/common/protocol/types/Schema.java | 31 +++++++++++++
.../requests/AlterReplicaDirResponse.java | 6 +--
.../common/requests/ApiVersionsResponse.java | 2 +-
.../requests/ControlledShutdownResponse.java | 2 +-
.../common/requests/DeleteRecordsResponse.java | 4 +-
.../common/requests/DescribeLogDirsRequest.java | 4 +-
.../requests/DescribeLogDirsResponse.java | 16 +++----
.../kafka/common/requests/FetchRequest.java | 2 +-
.../kafka/common/requests/FetchResponse.java | 4 +-
.../common/requests/LeaderAndIsrRequest.java | 3 +-
.../common/requests/ListGroupsResponse.java | 8 ++--
.../common/requests/OffsetCommitResponse.java | 1 -
.../requests/OffsetsForLeaderEpochRequest.java | 2 +-
.../kafka/common/requests/RequestHeader.java | 1 -
.../common/requests/UpdateMetadataRequest.java | 3 +-
.../common/requests/WriteTxnMarkersRequest.java | 11 ++---
.../requests/WriteTxnMarkersResponse.java | 13 +++---
21 files changed, 81 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 0e087eb..62dce79 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -97,7 +97,7 @@ import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
import org.apache.kafka.common.requests.WriteTxnMarkersResponse;
import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.kafka.common.protocol.types.Type.BYTES;
import static org.apache.kafka.common.protocol.types.Type.NULLABLE_BYTES;
@@ -325,18 +325,16 @@ public enum ApiKeys {
}
private static boolean retainsBufferReference(Schema schema) {
- final AtomicReference<Boolean> foundBufferReference = new AtomicReference<>(Boolean.FALSE);
- SchemaVisitor detector = new SchemaVisitorAdapter() {
+ final AtomicBoolean hasBuffer = new AtomicBoolean(false);
+ Schema.Visitor detector = new Schema.Visitor() {
@Override
public void visit(Type field) {
- if (field == BYTES || field == NULLABLE_BYTES || field == RECORDS) {
- foundBufferReference.set(Boolean.TRUE);
- }
+ if (field == BYTES || field == NULLABLE_BYTES || field == RECORDS)
+ hasBuffer.set(true);
}
};
- foundBufferReference.set(Boolean.FALSE);
- ProtoUtils.walk(schema, detector);
- return foundBufferReference.get();
+ schema.walk(detector);
+ return hasBuffer.get();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
deleted file mode 100644
index f9be12c..0000000
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.protocol;
-
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.BoundField;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Type;
-
-public class ProtoUtils {
- public static void walk(Schema schema, SchemaVisitor visitor) {
- if (schema == null || visitor == null) {
- throw new IllegalArgumentException("Both schema and visitor must be provided");
- }
- handleNode(schema, visitor);
- }
-
- private static void handleNode(Type node, SchemaVisitor visitor) {
- if (node instanceof Schema) {
- Schema schema = (Schema) node;
- visitor.visit(schema);
- for (BoundField f : schema.fields()) {
- handleNode(f.def.type, visitor);
- }
- } else if (node instanceof ArrayOf) {
- ArrayOf array = (ArrayOf) node;
- visitor.visit(array);
- handleNode(array.type(), visitor);
- } else {
- visitor.visit(node);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitor.java b/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitor.java
deleted file mode 100644
index e61cc77..0000000
--- a/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitor.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.protocol;
-
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Type;
-
-public interface SchemaVisitor {
- void visit(Schema schema);
- void visit(ArrayOf array);
- void visit(Type field);
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitorAdapter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitorAdapter.java b/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitorAdapter.java
deleted file mode 100644
index 62834d0..0000000
--- a/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitorAdapter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.protocol;
-
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Type;
-
-public abstract class SchemaVisitorAdapter implements SchemaVisitor {
- @Override
- public void visit(Schema schema) {
- //nop
- }
-
- @Override
- public void visit(ArrayOf array) {
- //nop
- }
-
- @Override
- public void visit(Type field) {
- //nop
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
index 187e14b..faa1540 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.protocol.types;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
/**
* The schema for a compound record definition
@@ -164,4 +165,34 @@ public class Schema extends Type {
}
}
+ public void walk(Visitor visitor) {
+ Objects.requireNonNull(visitor, "visitor must be non-null");
+ handleNode(this, visitor);
+ }
+
+ private static void handleNode(Type node, Visitor visitor) {
+ if (node instanceof Schema) {
+ Schema schema = (Schema) node;
+ visitor.visit(schema);
+ for (BoundField f : schema.fields())
+ handleNode(f.def.type, visitor);
+ } else if (node instanceof ArrayOf) {
+ ArrayOf array = (ArrayOf) node;
+ visitor.visit(array);
+ handleNode(array.type(), visitor);
+ } else {
+ visitor.visit(node);
+ }
+ }
+
+ /**
+ * Override one or more of the visit methods with the desired logic.
+ */
+ public static abstract class Visitor {
+ public void visit(Schema schema) {}
+ public void visit(ArrayOf array) {}
+ public void visit(Type field) {}
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
index ed00b75..1767d45 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
@@ -48,9 +48,9 @@ public class AlterReplicaDirResponse extends AbstractResponse {
private static final Schema ALTER_REPLICA_DIR_RESPONSE_V0 = new Schema(
THROTTLE_TIME_MS,
- new Field("topics", new ArrayOf(new Schema(
+ new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
TOPIC_NAME,
- new Field("partitions", new ArrayOf(new Schema(
+ new Field(PARTITIONS_KEY_NAME, new ArrayOf(new Schema(
PARTITION_ID,
ERROR_CODE)))))));
@@ -127,4 +127,4 @@ public class AlterReplicaDirResponse extends AbstractResponse {
public static AlterReplicaDirResponse parse(ByteBuffer buffer, short version) {
return new AlterReplicaDirResponse(ApiKeys.ALTER_REPLICA_DIR.responseSchema(version).read(buffer));
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 6a0418f..2bdc8aa 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -165,7 +165,7 @@ public class ApiVersionsResponse extends AbstractResponse {
private Map<Short, ApiVersion> buildApiKeyToApiVersion(List<ApiVersion> apiVersions) {
Map<Short, ApiVersion> tempApiIdToApiVersion = new HashMap<>();
- for (ApiVersion apiVersion: apiVersions) {
+ for (ApiVersion apiVersion : apiVersions) {
tempApiIdToApiVersion.put(apiVersion.apiKey, apiVersion);
}
return tempApiIdToApiVersion;
http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
index e0b3860..dfd68e7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
@@ -44,7 +44,7 @@ public class ControlledShutdownResponse extends AbstractResponse {
private static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V0 = new Schema(
ERROR_CODE,
- new Field("partitions_remaining", new ArrayOf(CONTROLLED_SHUTDOWN_PARTITION_V0), "The partitions " +
+ new Field(PARTITIONS_REMAINING_KEY_NAME, new ArrayOf(CONTROLLED_SHUTDOWN_PARTITION_V0), "The partitions " +
"that the broker still leads."));
private static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V1 = CONTROLLED_SHUTDOWN_RESPONSE_V0;
http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
index aeea1cd..5bfdec8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
@@ -62,7 +62,7 @@ public class DeleteRecordsResponse extends AbstractResponse {
private static final Schema DELETE_RECORDS_RESPONSE_V0 = new Schema(
THROTTLE_TIME_MS,
- new Field("topics", new ArrayOf(DELETE_RECORDS_RESPONSE_TOPIC_V0)));
+ new Field(TOPICS_KEY_NAME, new ArrayOf(DELETE_RECORDS_RESPONSE_TOPIC_V0)));
public static Schema[] schemaVersions() {
return new Schema[]{DELETE_RECORDS_RESPONSE_V0};
@@ -164,4 +164,4 @@ public class DeleteRecordsResponse extends AbstractResponse {
public static DeleteRecordsResponse parse(ByteBuffer buffer, short version) {
return new DeleteRecordsResponse(ApiKeys.DELETE_RECORDS.responseSchema(version).read(buffer));
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java
index 0169da5..5f35c43 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java
@@ -45,9 +45,9 @@ public class DescribeLogDirsRequest extends AbstractRequest {
private static final String PARTITIONS_KEY_NAME = "partitions";
private static final Schema DESCRIBE_LOG_DIRS_REQUEST_V0 = new Schema(
- new Field("topics", ArrayOf.nullable(new Schema(
+ new Field(TOPICS_KEY_NAME, ArrayOf.nullable(new Schema(
TOPIC_NAME,
- new Field("partitions", new ArrayOf(INT32), "List of partition ids of the topic.")))));
+ new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32), "List of partition ids of the topic.")))));
public static Schema[] schemaVersions() {
return new Schema[]{DESCRIBE_LOG_DIRS_REQUEST_V0};
http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
index e35056e..dc226d8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
@@ -62,18 +62,18 @@ public class DescribeLogDirsResponse extends AbstractResponse {
private static final Schema DESCRIBE_LOG_DIRS_RESPONSE_V0 = new Schema(
THROTTLE_TIME_MS,
- new Field("log_dirs", new ArrayOf(new Schema(
+ new Field(LOG_DIRS_KEY_NAME, new ArrayOf(new Schema(
ERROR_CODE,
- new Field("log_dir", STRING, "The absolute log directory path."),
- new Field("topics", new ArrayOf(new Schema(
+ new Field(LOG_DIR_KEY_NAME, STRING, "The absolute log directory path."),
+ new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
TOPIC_NAME,
- new Field("partitions", new ArrayOf(new Schema(
+ new Field(PARTITIONS_KEY_NAME, new ArrayOf(new Schema(
PARTITION_ID,
- new Field("size", INT64, "The size of the log segments of the partition in bytes."),
- new Field("offset_lag", INT64, "The lag of the log's LEO w.r.t. partition's HW " +
+ new Field(SIZE_KEY_NAME, INT64, "The size of the log segments of the partition in bytes."),
+ new Field(OFFSET_LAG_KEY_NAME, INT64, "The lag of the log's LEO w.r.t. partition's HW " +
"(if it is the current log for the partition) or current replica's LEO " +
"(if it is the future log for the partition)"),
- new Field("is_future", BOOLEAN, "True if this log is created by " +
+ new Field(IS_FUTURE_KEY_NAME, BOOLEAN, "True if this log is created by " +
"AlterReplicaDirRequest and will replace the current log of the replica " +
"in the future.")))))))))));
@@ -211,4 +211,4 @@ public class DescribeLogDirsResponse extends AbstractResponse {
return builder.toString();
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 4a60c94..3fea26c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -60,7 +60,7 @@ public class FetchRequest extends AbstractRequest {
new Field(FETCH_OFFSET_KEY_NAME, INT64, "Message offset."),
new Field(MAX_BYTES_KEY_NAME, INT32, "Maximum bytes to fetch."));
- // FETCH_REQUEST_PARTITION_V1 added log_start_offset field - the earliest available offset of partition data that can be consumed.
+ // FETCH_REQUEST_PARTITION_V5 added log_start_offset field - the earliest available offset of partition data that can be consumed.
private static final Schema FETCH_REQUEST_PARTITION_V5 = new Schema(
PARTITION_ID,
new Field(FETCH_OFFSET_KEY_NAME, INT64, "Message offset."),
http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 417e845..f8d3090 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -84,8 +84,10 @@ public class FetchResponse extends AbstractResponse {
new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
// Even though fetch response v2 has the same protocol as v1, the record set in the response is different. In v1,
// record set only includes messages of v0 (magic byte 0). In v2, record set can include messages of v0 and v1
- // (magic byte 0 and 1). For details, see ByteBufferMessageSet.
+ // (magic byte 0 and 1). For details, see Records, RecordBatch and Record.
private static final Schema FETCH_RESPONSE_V2 = FETCH_RESPONSE_V1;
+
+ // The partition ordering is now relevant - partitions will be processed in order they appear in request.
private static final Schema FETCH_RESPONSE_V3 = FETCH_RESPONSE_V2;
// The v4 Fetch Response adds features for transactional consumption (the aborted transaction list and the
http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index 73f037f..27aaf0a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -93,7 +93,8 @@ public class LeaderAndIsrRequest extends AbstractRequest {
new Field(PARTITION_STATES_KEY_NAME, new ArrayOf(LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0)),
new Field(LIVE_LEADERS_KEY_NAME, new ArrayOf(LEADER_AND_ISR_REQUEST_LIVE_LEADER_V0)));
- // LEADER_AND_ISR_REQUEST_V1 added a per-partition is_new Field. This field specifies whether the replica should have existed on the broker or not.
+ // LEADER_AND_ISR_REQUEST_V1 added a per-partition is_new Field. This field specifies whether the replica should
+ // have existed on the broker or not.
private static final Schema LEADER_AND_ISR_REQUEST_V1 = new Schema(
new Field(CONTROLLER_ID_KEY_NAME, INT32, "The controller id."),
new Field(CONTROLLER_EPOCH_KEY_NAME, INT32, "The controller epoch."),
http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index cdf4c59..8f48f39 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -38,15 +38,15 @@ public class ListGroupsResponse extends AbstractResponse {
private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type";
private static final Schema LIST_GROUPS_RESPONSE_GROUP_V0 = new Schema(
- new Field("group_id", STRING),
- new Field("protocol_type", STRING));
+ new Field(GROUP_ID_KEY_NAME, STRING),
+ new Field(PROTOCOL_TYPE_KEY_NAME, STRING));
private static final Schema LIST_GROUPS_RESPONSE_V0 = new Schema(
ERROR_CODE,
- new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
+ new Field(GROUPS_KEY_NAME, new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
private static final Schema LIST_GROUPS_RESPONSE_V1 = new Schema(
THROTTLE_TIME_MS,
ERROR_CODE,
- new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
+ new Field(GROUPS_KEY_NAME, new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
public static Schema[] schemaVersions() {
return new Schema[] {LIST_GROUPS_RESPONSE_V0, LIST_GROUPS_RESPONSE_V1};
http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 0181eef..13484ed 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -85,7 +85,6 @@ public class OffsetCommitResponse extends AbstractResponse {
OFFSET_COMMIT_RESPONSE_V3};
}
-
private final Map<TopicPartition, Errors> responseData;
private final int throttleTimeMs;
http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index b5fce78..d0585be 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -43,7 +43,7 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
/* Offsets for Leader Epoch api */
private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_PARTITION_V0 = new Schema(
PARTITION_ID,
- new Field("leader_epoch", INT32, "The epoch"));
+ new Field(LEADER_EPOCH, INT32, "The epoch"));
private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_TOPIC_V0 = new Schema(
TOPIC_NAME,
new Field(PARTITIONS_KEY_NAME, new ArrayOf(OFFSET_FOR_LEADER_EPOCH_REQUEST_PARTITION_V0)));
http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
index 1284e7e..956d813 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
@@ -157,7 +157,6 @@ public class RequestHeader extends AbstractRequestResponse {
return result;
}
-
private static Schema schema(short apiKey, short version) {
if (apiKey == ApiKeys.CONTROLLED_SHUTDOWN.id && version == 0)
// This will be removed once we remove support for v0 of ControlledShutdownRequest, which
http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index 67ae8e1..6c36bda 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -126,7 +126,8 @@ public class UpdateMetadataRequest extends AbstractRequest {
private static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V3 = UPDATE_METADATA_REQUEST_PARTITION_STATE_V2;
- // UPDATE_METADATA_REQUEST_PARTITION_STATE_V4 added a per-partition offline_replicas field. This field specifies the list of replicas that are offline.
+ // UPDATE_METADATA_REQUEST_PARTITION_STATE_V4 added a per-partition offline_replicas field. This field specifies
+ // the list of replicas that are offline.
private static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V4 = new Schema(
TOPIC_NAME,
PARTITION_ID,
http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
index 96dfb2f..3f7a0c9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
@@ -40,7 +40,7 @@ import static org.apache.kafka.common.protocol.types.Type.INT64;
public class WriteTxnMarkersRequest extends AbstractRequest {
private static final String COORDINATOR_EPOCH_KEY_NAME = "coordinator_epoch";
- private static final String TXN_MARKER_ENTRY_KEY_NAME = "transaction_markers";
+ private static final String TXN_MARKERS_KEY_NAME = "transaction_markers";
private static final String PRODUCER_ID_KEY_NAME = "producer_id";
private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch";
@@ -60,7 +60,8 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
"hosted by this transaction coordinator"));
private static final Schema WRITE_TXN_MARKERS_REQUEST_V0 = new Schema(
- new Field(TXN_MARKER_ENTRY_KEY_NAME, new ArrayOf(WRITE_TXN_MARKERS_ENTRY_V0), "The transaction markers to be written."));
+ new Field(TXN_MARKERS_KEY_NAME, new ArrayOf(WRITE_TXN_MARKERS_ENTRY_V0), "The transaction markers to " +
+ "be written."));
public static Schema[] schemaVersions() {
return new Schema[]{WRITE_TXN_MARKERS_REQUEST_V0};
@@ -160,7 +161,7 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
public WriteTxnMarkersRequest(Struct struct, short version) {
super(version);
List<TxnMarkerEntry> markers = new ArrayList<>();
- Object[] markersArray = struct.getArray(TXN_MARKER_ENTRY_KEY_NAME);
+ Object[] markersArray = struct.getArray(TXN_MARKERS_KEY_NAME);
for (Object markerObj : markersArray) {
Struct markerStruct = (Struct) markerObj;
@@ -197,7 +198,7 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
Object[] markersArray = new Object[markers.size()];
int i = 0;
for (TxnMarkerEntry entry : markers) {
- Struct markerStruct = struct.instance(TXN_MARKER_ENTRY_KEY_NAME);
+ Struct markerStruct = struct.instance(TXN_MARKERS_KEY_NAME);
markerStruct.set(PRODUCER_ID_KEY_NAME, entry.producerId);
markerStruct.set(PRODUCER_EPOCH_KEY_NAME, entry.producerEpoch);
markerStruct.set(COORDINATOR_EPOCH_KEY_NAME, entry.coordinatorEpoch);
@@ -215,7 +216,7 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
markerStruct.set(TOPICS_KEY_NAME, partitionsArray);
markersArray[i++] = markerStruct;
}
- struct.set(TXN_MARKER_ENTRY_KEY_NAME, markersArray);
+ struct.set(TXN_MARKERS_KEY_NAME, markersArray);
return struct;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
index 3372670..797fb59 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
@@ -35,7 +35,7 @@ import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
import static org.apache.kafka.common.protocol.types.Type.INT64;
public class WriteTxnMarkersResponse extends AbstractResponse {
- private static final String TXN_MARKER_ENTRY_KEY_NAME = "transaction_markers";
+ private static final String TXN_MARKERS_KEY_NAME = "transaction_markers";
private static final String PRODUCER_ID_KEY_NAME = "producer_id";
private static final String TOPICS_KEY_NAME = "topics";
@@ -45,7 +45,7 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
PARTITION_ID,
ERROR_CODE);
- private static final Schema WRITE_TXN_MARKERS_ENTRY_RESPONSE_V0 = new Schema(
+ private static final Schema WRITE_TXN_MARKERS_ENTRY_V0 = new Schema(
new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."),
new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
TOPIC_NAME,
@@ -53,7 +53,8 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
"Errors per partition from writing markers."));
private static final Schema WRITE_TXN_MARKERS_RESPONSE_V0 = new Schema(
- new Field("transaction_markers", new ArrayOf(WRITE_TXN_MARKERS_ENTRY_RESPONSE_V0), "Errors per partition from writing markers."));
+ new Field(TXN_MARKERS_KEY_NAME, new ArrayOf(WRITE_TXN_MARKERS_ENTRY_V0), "Errors per partition from " +
+ "writing markers."));
public static Schema[] schemaVersions() {
return new Schema[]{WRITE_TXN_MARKERS_RESPONSE_V0};
@@ -82,7 +83,7 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
public WriteTxnMarkersResponse(Struct struct) {
Map<Long, Map<TopicPartition, Errors>> errors = new HashMap<>();
- Object[] responseArray = struct.getArray(TXN_MARKER_ENTRY_KEY_NAME);
+ Object[] responseArray = struct.getArray(TXN_MARKERS_KEY_NAME);
for (Object responseObj : responseArray) {
Struct responseStruct = (Struct) responseObj;
@@ -113,7 +114,7 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
Object[] responsesArray = new Object[errors.size()];
int k = 0;
for (Map.Entry<Long, Map<TopicPartition, Errors>> responseEntry : errors.entrySet()) {
- Struct responseStruct = struct.instance(TXN_MARKER_ENTRY_KEY_NAME);
+ Struct responseStruct = struct.instance(TXN_MARKERS_KEY_NAME);
responseStruct.set(PRODUCER_ID_KEY_NAME, responseEntry.getKey());
Map<TopicPartition, Errors> partitionAndErrors = responseEntry.getValue();
@@ -141,7 +142,7 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
responsesArray[k++] = responseStruct;
}
- struct.set(TXN_MARKER_ENTRY_KEY_NAME, responsesArray);
+ struct.set(TXN_MARKERS_KEY_NAME, responsesArray);
return struct;
}