You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/19 04:13:28 UTC
[7/8] kafka git commit: MINOR: Move request/response schemas to the
corresponding object representation
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index c7431d0..b5042c3 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -17,2058 +17,19 @@
package org.apache.kafka.common.protocol;
import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.BoundField;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.ResponseHeader;
-import java.util.EnumSet;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
-import static org.apache.kafka.common.protocol.types.Type.BYTES;
-import static org.apache.kafka.common.protocol.types.Type.INT16;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-import static org.apache.kafka.common.protocol.types.Type.INT64;
-import static org.apache.kafka.common.protocol.types.Type.INT8;
-import static org.apache.kafka.common.protocol.types.Type.NULLABLE_BYTES;
-import static org.apache.kafka.common.protocol.types.Type.RECORDS;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
-import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
public class Protocol {
- public static final Schema REQUEST_HEADER = new Schema(
- new Field("api_key", INT16, "The id of the request type."),
- new Field("api_version", INT16, "The version of the API."),
- new Field("correlation_id", INT32, "A user-supplied integer value that will be passed back with the response"),
- new Field("client_id", NULLABLE_STRING, "A user specified identifier for the client making the request.", ""));
-
- // Version 0 of the controlled shutdown API used a non-standard request header (the clientId is missing).
- // This can be removed once we drop support for that version.
- public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V0_HEADER = new Schema(
- new Field("api_key", INT16, "The id of the request type."),
- new Field("api_version", INT16, "The version of the API."),
- new Field("correlation_id", INT32, "A user-supplied integer value that will be passed back with the response"));
-
-
- public static final Schema RESPONSE_HEADER = new Schema(new Field("correlation_id",
- INT32,
- "The user-supplied value passed in with the request"));
-
- /* Metadata api */
-
- public static final Schema METADATA_REQUEST_V0 = new Schema(new Field("topics",
- new ArrayOf(STRING),
- "An array of topics to fetch metadata for. If no topics are specified fetch metadata for all topics."));
-
- public static final Schema METADATA_REQUEST_V1 = new Schema(new Field("topics",
- ArrayOf.nullable(STRING),
- "An array of topics to fetch metadata for. If the topics array is null fetch metadata for all topics."));
-
- /* The v2 metadata request is the same as v1. An additional field for cluster id has been added to the v2 metadata response */
- public static final Schema METADATA_REQUEST_V2 = METADATA_REQUEST_V1;
-
- /* The v3 metadata request is the same as v1 and v2. An additional field for throttle time has been added to the v3 metadata response */
- public static final Schema METADATA_REQUEST_V3 = METADATA_REQUEST_V2;
-
- /* The v4 metadata request has an additional field for allowing auto topic creation. The response is the same as v3. */
- public static final Schema METADATA_REQUEST_V4 = new Schema(new Field("topics",
- ArrayOf.nullable(STRING),
- "An array of topics to fetch metadata for. If the topics array is null fetch metadata for all topics."),
- new Field("allow_auto_topic_creation",
- BOOLEAN,
- "If this and the broker config 'auto.create.topics.enable' are true, " +
- "topics that don't exist will be created by the broker. " +
- "Otherwise, no topics will be created by the broker."));
-
- /* The v5 metadata request is the same as v4. An additional field for offline_replicas has been added to the v5 metadata response */
- public static final Schema METADATA_REQUEST_V5 = METADATA_REQUEST_V4;
-
- public static final Schema METADATA_BROKER_V0 = new Schema(new Field("node_id", INT32, "The broker id."),
- new Field("host", STRING, "The hostname of the broker."),
- new Field("port", INT32,
- "The port on which the broker accepts requests."));
-
- public static final Schema PARTITION_METADATA_V0 = new Schema(new Field("partition_error_code",
- INT16,
- "The error code for the partition, if any."),
- new Field("partition_id",
- INT32,
- "The id of the partition."),
- new Field("leader",
- INT32,
- "The id of the broker acting as leader for this partition."),
- new Field("replicas",
- new ArrayOf(INT32),
- "The set of all nodes that host this partition."),
- new Field("isr",
- new ArrayOf(INT32),
- "The set of nodes that are in sync with the leader for this partition."));
-
- public static final Schema TOPIC_METADATA_V0 = new Schema(new Field("topic_error_code",
- INT16,
- "The error code for the given topic."),
- new Field("topic", STRING, "The name of the topic"),
- new Field("partition_metadata",
- new ArrayOf(PARTITION_METADATA_V0),
- "Metadata for each partition of the topic."));
-
- public static final Schema METADATA_RESPONSE_V0 = new Schema(new Field("brokers",
- new ArrayOf(METADATA_BROKER_V0),
- "Host and port information for all brokers."),
- new Field("topic_metadata",
- new ArrayOf(TOPIC_METADATA_V0)));
-
- public static final Schema METADATA_BROKER_V1 = new Schema(new Field("node_id", INT32, "The broker id."),
- new Field("host", STRING, "The hostname of the broker."),
- new Field("port", INT32,
- "The port on which the broker accepts requests."),
- new Field("rack", NULLABLE_STRING, "The rack of the broker."));
-
- public static final Schema PARTITION_METADATA_V1 = PARTITION_METADATA_V0;
-
- // PARTITION_METADATA_V2 added a per-partition offline_replicas field. This field specifies the list of replicas that are offline.
- public static final Schema PARTITION_METADATA_V2 = new Schema(new Field("partition_error_code",
- INT16,
- "The error code for the partition, if any."),
- new Field("partition_id",
- INT32,
- "The id of the partition."),
- new Field("leader",
- INT32,
- "The id of the broker acting as leader for this partition."),
- new Field("replicas",
- new ArrayOf(INT32),
- "The set of all nodes that host this partition."),
- new Field("isr",
- new ArrayOf(INT32),
- "The set of nodes that are in sync with the leader for this partition."),
- new Field("offline_replicas",
- new ArrayOf(INT32),
- "The set of offline replicas of this partition."));
-
- public static final Schema TOPIC_METADATA_V1 = new Schema(new Field("topic_error_code", INT16, "The error code for the given topic."),
- new Field("topic", STRING, "The name of the topic"),
- new Field("is_internal", BOOLEAN,
- "Indicates if the topic is considered a Kafka internal topic"),
- new Field("partition_metadata", new ArrayOf(PARTITION_METADATA_V1),
- "Metadata for each partition of the topic."));
-
- // TOPIC_METADATA_V2 added a per-partition offline_replicas field. This field specifies the list of replicas that are offline.
- public static final Schema TOPIC_METADATA_V2 = new Schema(new Field("topic_error_code", INT16, "The error code for the given topic."),
- new Field("topic", STRING, "The name of the topic"),
- new Field("is_internal", BOOLEAN,
- "Indicates if the topic is considered a Kafka internal topic"),
- new Field("partition_metadata", new ArrayOf(PARTITION_METADATA_V2),
- "Metadata for each partition of the topic."));
-
- public static final Schema METADATA_RESPONSE_V1 = new Schema(new Field("brokers", new ArrayOf(METADATA_BROKER_V1),
- "Host and port information for all brokers."),
- new Field("controller_id", INT32,
- "The broker id of the controller broker."),
- new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1)));
-
- public static final Schema METADATA_RESPONSE_V2 = new Schema(new Field("brokers", new ArrayOf(METADATA_BROKER_V1),
- "Host and port information for all brokers."),
- new Field("cluster_id", NULLABLE_STRING,
- "The cluster id that this broker belongs to."),
- new Field("controller_id", INT32,
- "The broker id of the controller broker."),
- new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1)));
-
- public static final Schema METADATA_RESPONSE_V3 = new Schema(
- newThrottleTimeField(),
- new Field("brokers", new ArrayOf(METADATA_BROKER_V1),
- "Host and port information for all brokers."),
- new Field("cluster_id", NULLABLE_STRING,
- "The cluster id that this broker belongs to."),
- new Field("controller_id", INT32,
- "The broker id of the controller broker."),
- new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1)));
-
- public static final Schema METADATA_RESPONSE_V4 = METADATA_RESPONSE_V3;
-
- // METADATA_RESPONSE_V5 added a per-partition offline_replicas field. This field specifies the list of replicas that are offline.
- public static final Schema METADATA_RESPONSE_V5 = new Schema(
- newThrottleTimeField(),
- new Field("brokers", new ArrayOf(METADATA_BROKER_V1),
- "Host and port information for all brokers."),
- new Field("cluster_id", NULLABLE_STRING,
- "The cluster id that this broker belongs to."),
- new Field("controller_id", INT32,
- "The broker id of the controller broker."),
- new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V2)));
-
- public static final Schema[] METADATA_REQUEST = {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3, METADATA_REQUEST_V4, METADATA_REQUEST_V5};
- public static final Schema[] METADATA_RESPONSE = {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3, METADATA_RESPONSE_V4, METADATA_RESPONSE_V5};
-
- /* Produce api */
-
- public static final Schema TOPIC_PRODUCE_DATA_V0 = new Schema(new Field("topic", STRING),
- new Field("data", new ArrayOf(new Schema(new Field("partition", INT32),
- new Field("record_set", RECORDS)))));
-
- public static final Schema PRODUCE_REQUEST_V0 = new Schema(new Field("acks",
- INT16,
- "The number of acknowledgments the producer requires the leader to have received before considering a request complete. Allowed values: 0 for no acknowledgments, 1 for only the leader and -1 for the full ISR."),
- new Field("timeout", INT32, "The time to await a response in ms."),
- new Field("topic_data", new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
-
- public static final Schema PRODUCE_RESPONSE_V0 = new Schema(new Field("responses",
- new ArrayOf(new Schema(new Field("topic", STRING),
- new Field("partition_responses",
- new ArrayOf(new Schema(new Field("partition",
- INT32),
- new Field("error_code",
- INT16),
- new Field("base_offset",
- INT64))))))));
- /**
- * The body of PRODUCE_REQUEST_V1 is the same as PRODUCE_REQUEST_V0.
- * The version number is bumped up to indicate that the client supports quota throttle time field in the response.
- */
- public static final Schema PRODUCE_REQUEST_V1 = PRODUCE_REQUEST_V0;
- /**
- * The body of PRODUCE_REQUEST_V2 is the same as PRODUCE_REQUEST_V1.
- * The version number is bumped up to indicate that message format V1 is used which has relative offset and
- * timestamp.
- */
- public static final Schema PRODUCE_REQUEST_V2 = PRODUCE_REQUEST_V1;
-
- // Produce request V3 adds the transactional id which is used for authorization when attempting to write
- // transactional data. This version also adds support for message format V2.
- public static final Schema PRODUCE_REQUEST_V3 = new Schema(
- new Field("transactional_id",
- NULLABLE_STRING,
- "The transactional ID of the producer. This is used to authorize transaction produce requests. " +
- "This can be null for non-transactional producers."),
- new Field("acks",
- INT16,
- "The number of acknowledgments the producer requires the leader to have received before " +
- "considering a request complete. Allowed values: 0 for no acknowledgments, 1 for only the leader " +
- "and -1 for the full ISR."),
- new Field("timeout", INT32, "The time to await a response in ms."),
- new Field("topic_data", new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
-
- /**
- * The body of PRODUCE_REQUEST_V4 is the same as PRODUCE_REQUEST_V3.
- * The version number is bumped up to indicate that the client supports KafkaStorageException.
- * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 3
- */
- public static final Schema PRODUCE_REQUEST_V4 = PRODUCE_REQUEST_V3;
-
- public static final Schema PRODUCE_RESPONSE_V1 = new Schema(new Field("responses",
- new ArrayOf(new Schema(new Field("topic", STRING),
- new Field("partition_responses",
- new ArrayOf(new Schema(new Field("partition",
- INT32),
- new Field("error_code",
- INT16),
- new Field("base_offset",
- INT64))))))),
- newThrottleTimeField());
- /**
- * PRODUCE_RESPONSE_V2 added a timestamp field in the per partition response status.
- * The timestamp is the log append time if the topic is configured to use log append time, or NoTimestamp when
- * create time is used for the topic.
- */
- public static final Schema PRODUCE_RESPONSE_V2 = new Schema(new Field("responses",
- new ArrayOf(new Schema(new Field("topic", STRING),
- new Field("partition_responses",
- new ArrayOf(new Schema(new Field("partition",
- INT32),
- new Field("error_code",
- INT16),
- new Field("base_offset",
- INT64),
- new Field("log_append_time",
- INT64,
- "The timestamp returned by broker after appending the messages. " +
- "If CreateTime is used for the topic, the timestamp will be -1. " +
- "If LogAppendTime is used for the topic, the timestamp will be " +
- "the broker local time when the messages are appended."))))))),
- newThrottleTimeField());
-
- public static final Schema PRODUCE_RESPONSE_V3 = PRODUCE_RESPONSE_V2;
-
- /**
- * The body of PRODUCE_RESPONSE_V4 is the same as PRODUCE_RESPONSE_V3.
- * The version number is bumped up to indicate that the client supports KafkaStorageException.
- * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 3
- */
- public static final Schema PRODUCE_RESPONSE_V4 = PRODUCE_RESPONSE_V3;
-
- public static final Schema[] PRODUCE_REQUEST = {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3, PRODUCE_REQUEST_V4};
- public static final Schema[] PRODUCE_RESPONSE = {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3, PRODUCE_RESPONSE_V4};
-
- /* Offset commit api */
- public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."),
- new Field("offset",
- INT64,
- "Message offset to be committed."),
- new Field("metadata",
- NULLABLE_STRING,
- "Any associated metadata the client wants to keep."));
-
- public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V1 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."),
- new Field("offset",
- INT64,
- "Message offset to be committed."),
- new Field("timestamp",
- INT64,
- "Timestamp of the commit"),
- new Field("metadata",
- NULLABLE_STRING,
- "Any associated metadata the client wants to keep."));
-
- public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V2 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."),
- new Field("offset",
- INT64,
- "Message offset to be committed."),
- new Field("metadata",
- NULLABLE_STRING,
- "Any associated metadata the client wants to keep."));
-
- public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
- STRING,
- "Topic to commit."),
- new Field("partitions",
- new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0),
- "Partitions to commit offsets."));
-
- public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V1 = new Schema(new Field("topic",
- STRING,
- "Topic to commit."),
- new Field("partitions",
- new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V1),
- "Partitions to commit offsets."));
-
- public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V2 = new Schema(new Field("topic",
- STRING,
- "Topic to commit."),
- new Field("partitions",
- new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V2),
- "Partitions to commit offsets."));
-
- public static final Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id",
- STRING,
- "The group id."),
- new Field("topics",
- new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
- "Topics to commit offsets."));
-
- public static final Schema OFFSET_COMMIT_REQUEST_V1 = new Schema(new Field("group_id",
- STRING,
- "The group id."),
- new Field("group_generation_id",
- INT32,
- "The generation of the group."),
- new Field("member_id",
- STRING,
- "The member id assigned by the group coordinator."),
- new Field("topics",
- new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1),
- "Topics to commit offsets."));
-
- public static final Schema OFFSET_COMMIT_REQUEST_V2 = new Schema(new Field("group_id",
- STRING,
- "The group id."),
- new Field("group_generation_id",
- INT32,
- "The generation of the consumer group."),
- new Field("member_id",
- STRING,
- "The consumer id assigned by the group coordinator."),
- new Field("retention_time",
- INT64,
- "Time period in ms to retain the offset."),
- new Field("topics",
- new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V2),
- "Topics to commit offsets."));
-
- /* v3 request is same as v2. Throttle time has been added to response */
- public static final Schema OFFSET_COMMIT_REQUEST_V3 = OFFSET_COMMIT_REQUEST_V2;
-
- public static final Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."),
- new Field("error_code",
- INT16));
-
- public static final Schema OFFSET_COMMIT_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
- new Field("partition_responses",
- new ArrayOf(OFFSET_COMMIT_RESPONSE_PARTITION_V0)));
-
- public static final Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses",
- new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
-
- public static final Schema[] OFFSET_COMMIT_REQUEST = {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2, OFFSET_COMMIT_REQUEST_V3};
-
- /* The response types for V0, V1 and V2 of OFFSET_COMMIT_REQUEST are the same. */
- public static final Schema OFFSET_COMMIT_RESPONSE_V1 = OFFSET_COMMIT_RESPONSE_V0;
- public static final Schema OFFSET_COMMIT_RESPONSE_V2 = OFFSET_COMMIT_RESPONSE_V0;
-
- public static final Schema OFFSET_COMMIT_RESPONSE_V3 = new Schema(
- newThrottleTimeField(),
- new Field("responses",
- new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
-
- public static final Schema[] OFFSET_COMMIT_RESPONSE = {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2, OFFSET_COMMIT_RESPONSE_V3};
-
- /* Offset fetch api */
-
- /*
- * Wire formats of version 0 and 1 are the same, but with different functionality.
- * Wire format of version 2 is similar to version 1, with the exception of
- * - accepting 'null' as list of topics
- * - returning a top level error code
- * Version 0 will read the offsets from ZK.
- * Version 1 will read the offsets from Kafka.
- * Version 2 will read the offsets from Kafka, and returns all associated topic partition offsets if
- * a 'null' is passed instead of a list of specific topic partitions. It also returns a top level error code
- * for group or coordinator level errors.
- */
- public static final Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."));
-
- public static final Schema OFFSET_FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
- STRING,
- "Topic to fetch offset."),
- new Field("partitions",
- new ArrayOf(OFFSET_FETCH_REQUEST_PARTITION_V0),
- "Partitions to fetch offsets."));
-
- public static final Schema OFFSET_FETCH_REQUEST_V0 = new Schema(new Field("group_id",
- STRING,
- "The consumer group id."),
- new Field("topics",
- new ArrayOf(OFFSET_FETCH_REQUEST_TOPIC_V0),
- "Topics to fetch offsets."));
-
- public static final Schema OFFSET_FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."),
- new Field("offset",
- INT64,
- "Last committed message offset."),
- new Field("metadata",
- NULLABLE_STRING,
- "Any associated metadata the client wants to keep."),
- new Field("error_code", INT16));
-
- public static final Schema OFFSET_FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
- new Field("partition_responses",
- new ArrayOf(OFFSET_FETCH_RESPONSE_PARTITION_V0)));
-
- public static final Schema OFFSET_FETCH_RESPONSE_V0 = new Schema(new Field("responses",
- new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)));
-
- public static final Schema OFFSET_FETCH_REQUEST_V1 = OFFSET_FETCH_REQUEST_V0;
- public static final Schema OFFSET_FETCH_RESPONSE_V1 = OFFSET_FETCH_RESPONSE_V0;
-
- public static final Schema OFFSET_FETCH_REQUEST_V2 = new Schema(new Field("group_id",
- STRING,
- "The consumer group id."),
- new Field("topics",
- ArrayOf.nullable(OFFSET_FETCH_REQUEST_TOPIC_V0),
- "Topics to fetch offsets. If the topic array is null fetch offsets for all topics."));
-
- public static final Schema OFFSET_FETCH_RESPONSE_V2 = new Schema(new Field("responses",
- new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)),
- new Field("error_code",
- INT16));
-
- /* v3 request is the same as v2. Throttle time has been added to v3 response */
- public static final Schema OFFSET_FETCH_REQUEST_V3 = OFFSET_FETCH_REQUEST_V2;
- public static final Schema OFFSET_FETCH_RESPONSE_V3 = new Schema(
- newThrottleTimeField(),
- new Field("responses",
- new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)),
- new Field("error_code",
- INT16));
-
- public static final Schema[] OFFSET_FETCH_REQUEST = {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1, OFFSET_FETCH_REQUEST_V2, OFFSET_FETCH_REQUEST_V3};
- public static final Schema[] OFFSET_FETCH_RESPONSE = {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1, OFFSET_FETCH_RESPONSE_V2, OFFSET_FETCH_RESPONSE_V3};
-
- /* List offset api */
- public static final Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."),
- new Field("timestamp", INT64, "Timestamp."),
- new Field("max_num_offsets",
- INT32,
- "Maximum offsets to return."));
- public static final Schema LIST_OFFSET_REQUEST_PARTITION_V1 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."),
- new Field("timestamp",
- INT64,
- "The target timestamp for the partition."));
-
- public static final Schema LIST_OFFSET_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
- STRING,
- "Topic to list offset."),
- new Field("partitions",
- new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V0),
- "Partitions to list offset."));
- public static final Schema LIST_OFFSET_REQUEST_TOPIC_V1 = new Schema(new Field("topic",
- STRING,
- "Topic to list offset."),
- new Field("partitions",
- new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V1),
- "Partitions to list offset."));
-
- public static final Schema LIST_OFFSET_REQUEST_V0 = new Schema(new Field("replica_id",
- INT32,
- "Broker id of the follower. For normal consumers, use -1."),
- new Field("topics",
- new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V0),
- "Topics to list offsets."));
- public static final Schema LIST_OFFSET_REQUEST_V1 = new Schema(new Field("replica_id",
- INT32,
- "Broker id of the follower. For normal consumers, use -1."),
- new Field("topics",
- new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V1),
- "Topics to list offsets."));
-
- public static final Schema LIST_OFFSET_REQUEST_V2 = new Schema(
- new Field("replica_id",
- INT32,
- "Broker id of the follower. For normal consumers, use -1."),
- new Field("isolation_level",
- INT8,
- "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED " +
- "(isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), " +
- "non-transactional and COMMITTED transactional records are visible. To be more concrete, " +
- "READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), " +
- "and enables the inclusion of the list of aborted transactions in the result, which allows " +
- "consumers to discard ABORTED transactional records"),
- new Field("topics",
- new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V1),
- "Topics to list offsets."));;
-
- public static final Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."),
- new Field("error_code", INT16),
- new Field("offsets",
- new ArrayOf(INT64),
- "A list of offsets."));
-
- public static final Schema LIST_OFFSET_RESPONSE_PARTITION_V1 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."),
- new Field("error_code", INT16),
- new Field("timestamp",
- INT64,
- "The timestamp associated with the returned offset"),
- new Field("offset",
- INT64,
- "offset found"));
-
- public static final Schema LIST_OFFSET_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
- new Field("partition_responses",
- new ArrayOf(LIST_OFFSET_RESPONSE_PARTITION_V0)));
-
- public static final Schema LIST_OFFSET_RESPONSE_TOPIC_V1 = new Schema(new Field("topic", STRING),
- new Field("partition_responses",
- new ArrayOf(LIST_OFFSET_RESPONSE_PARTITION_V1)));
-
- public static final Schema LIST_OFFSET_RESPONSE_V0 = new Schema(new Field("responses",
- new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V0)));
-
- public static final Schema LIST_OFFSET_RESPONSE_V1 = new Schema(new Field("responses",
- new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V1)));
- public static final Schema LIST_OFFSET_RESPONSE_V2 = new Schema(
- newThrottleTimeField(),
- new Field("responses",
- new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V1)));
-
- public static final Schema[] LIST_OFFSET_REQUEST = {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1, LIST_OFFSET_REQUEST_V2};
- public static final Schema[] LIST_OFFSET_RESPONSE = {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1, LIST_OFFSET_RESPONSE_V2};
-
- /* Fetch api */
- public static final Schema FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."),
- new Field("fetch_offset",
- INT64,
- "Message offset."),
- new Field("max_bytes",
- INT32,
- "Maximum bytes to fetch."));
-
- // FETCH_REQUEST_PARTITION_V5 added log_start_offset field - the earliest available offset of partition data that can be consumed.
- public static final Schema FETCH_REQUEST_PARTITION_V5 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."),
- new Field("fetch_offset",
- INT64,
- "Message offset."),
- new Field("log_start_offset",
- INT64,
- "Earliest available offset of the follower replica. " +
- "The field is only used when request is sent by follower. "),
- new Field("max_bytes",
- INT32,
- "Maximum bytes to fetch."));
-
- public static final Schema FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic", STRING, "Topic to fetch."),
- new Field("partitions",
- new ArrayOf(FETCH_REQUEST_PARTITION_V0),
- "Partitions to fetch."));
-
- public static final Schema FETCH_REQUEST_TOPIC_V5 = new Schema(new Field("topic", STRING, "Topic to fetch."),
- new Field("partitions",
- new ArrayOf(FETCH_REQUEST_PARTITION_V5),
- "Partitions to fetch."));
-
- public static final Schema FETCH_REQUEST_V0 = new Schema(new Field("replica_id",
- INT32,
- "Broker id of the follower. For normal consumers, use -1."),
- new Field("max_wait_time",
- INT32,
- "Maximum time in ms to wait for the response."),
- new Field("min_bytes",
- INT32,
- "Minimum bytes to accumulate in the response."),
- new Field("topics",
- new ArrayOf(FETCH_REQUEST_TOPIC_V0),
- "Topics to fetch."));
-
- // The V1 Fetch Request body is the same as V0.
- // Only the version number is incremented to indicate a newer client
- public static final Schema FETCH_REQUEST_V1 = FETCH_REQUEST_V0;
- // The V2 Fetch Request body is the same as V1.
- // Only the version number is incremented to indicate that the client supports message format V1, which uses
- // relative offset and has timestamp.
- public static final Schema FETCH_REQUEST_V2 = FETCH_REQUEST_V1;
- // Fetch Request V3 added top level max_bytes field - the total size of partition data to accumulate in response.
- // The partition ordering is now relevant - partitions will be processed in the order they appear in the request.
- public static final Schema FETCH_REQUEST_V3 = new Schema(new Field("replica_id",
- INT32,
- "Broker id of the follower. For normal consumers, use -1."),
- new Field("max_wait_time",
- INT32,
- "Maximum time in ms to wait for the response."),
- new Field("min_bytes",
- INT32,
- "Minimum bytes to accumulate in the response."),
- new Field("max_bytes",
- INT32,
- "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " +
- "if the first message in the first non-empty partition of the fetch is larger than this " +
- "value, the message will still be returned to ensure that progress can be made."),
- new Field("topics",
- new ArrayOf(FETCH_REQUEST_TOPIC_V0),
- "Topics to fetch in the order provided."));
-
- // The V4 Fetch Request adds the fetch isolation level and exposes magic v2 (via the response).
- public static final Schema FETCH_REQUEST_V4 = new Schema(
- new Field("replica_id",
- INT32,
- "Broker id of the follower. For normal consumers, use -1."),
- new Field("max_wait_time",
- INT32,
- "Maximum time in ms to wait for the response."),
- new Field("min_bytes",
- INT32,
- "Minimum bytes to accumulate in the response."),
- new Field("max_bytes",
- INT32,
- "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " +
- "if the first message in the first non-empty partition of the fetch is larger than this " +
- "value, the message will still be returned to ensure that progress can be made."),
- new Field("isolation_level",
- INT8,
- "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED " +
- "(isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), " +
- "non-transactional and COMMITTED transactional records are visible. To be more concrete, " +
- "READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), " +
- "and enables the inclusion of the list of aborted transactions in the result, which allows " +
- "consumers to discard ABORTED transactional records"),
- new Field("topics",
- new ArrayOf(FETCH_REQUEST_TOPIC_V0),
- "Topics to fetch in the order provided."));
-
- // FETCH_REQUEST_V5 added a per-partition log_start_offset field - the earliest available offset of partition data that can be consumed.
- public static final Schema FETCH_REQUEST_V5 = new Schema(
- new Field("replica_id",
- INT32,
- "Broker id of the follower. For normal consumers, use -1."),
- new Field("max_wait_time",
- INT32,
- "Maximum time in ms to wait for the response."),
- new Field("min_bytes",
- INT32,
- "Minimum bytes to accumulate in the response."),
- new Field("max_bytes",
- INT32,
- "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " +
- "if the first message in the first non-empty partition of the fetch is larger than this " +
- "value, the message will still be returned to ensure that progress can be made."),
- new Field("isolation_level",
- INT8,
- "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED " +
- "(isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), " +
- "non-transactional and COMMITTED transactional records are visible. To be more concrete, " +
- "READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), " +
- "and enables the inclusion of the list of aborted transactions in the result, which allows " +
- "consumers to discard ABORTED transactional records"),
- new Field("topics",
- new ArrayOf(FETCH_REQUEST_TOPIC_V5),
- "Topics to fetch in the order provided."));
-
- /**
- * The body of FETCH_REQUEST_V6 is the same as FETCH_REQUEST_V5.
- * The version number is bumped up to indicate that the client supports KafkaStorageException.
- * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 5
- */
- public static final Schema FETCH_REQUEST_V6 = FETCH_REQUEST_V5;
-
- public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V0 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."),
- new Field("error_code", INT16),
- new Field("high_watermark",
- INT64,
- "Last committed offset."));
- public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition_header", FETCH_RESPONSE_PARTITION_HEADER_V0),
- new Field("record_set", RECORDS));
-
- public static final Schema FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
- new Field("partition_responses",
- new ArrayOf(FETCH_RESPONSE_PARTITION_V0)));
-
- public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses",
- new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
-
- public static final Schema FETCH_RESPONSE_V1 = new Schema(newThrottleTimeField(),
- new Field("responses",
- new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
- // Even though fetch response v2 has the same protocol as v1, the record set in the response is different. In v1,
- // record set only includes messages of v0 (magic byte 0). In v2, record set can include messages of v0 and v1
- // (magic byte 0 and 1). For details, see ByteBufferMessageSet.
- public static final Schema FETCH_RESPONSE_V2 = FETCH_RESPONSE_V1;
- public static final Schema FETCH_RESPONSE_V3 = FETCH_RESPONSE_V2;
-
- // The v4 Fetch Response adds features for transactional consumption (the aborted transaction list and the
- // last stable offset). It also exposes messages with magic v2 (along with older formats).
- private static final Schema FETCH_RESPONSE_ABORTED_TRANSACTION_V4 = new Schema(
- new Field("producer_id", INT64, "The producer id associated with the aborted transactions"),
- new Field("first_offset", INT64, "The first offset in the aborted transaction"));
-
- public static final Schema FETCH_RESPONSE_ABORTED_TRANSACTION_V5 = FETCH_RESPONSE_ABORTED_TRANSACTION_V4;
-
- public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V4 = new Schema(
- new Field("partition",
- INT32,
- "Topic partition id."),
- new Field("error_code", INT16),
- new Field("high_watermark",
- INT64,
- "Last committed offset."),
- new Field("last_stable_offset",
- INT64,
- "The last stable offset (or LSO) of the partition. This is the last offset such that the state " +
- "of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)"),
- new Field("aborted_transactions",
- ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V4)));
-
- // FETCH_RESPONSE_PARTITION_HEADER_V5 added log_start_offset field - the earliest available offset of partition data that can be consumed.
- public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V5 = new Schema(
- new Field("partition",
- INT32,
- "Topic partition id."),
- new Field("error_code", INT16),
- new Field("high_watermark",
- INT64,
- "Last committed offset."),
- new Field("last_stable_offset",
- INT64,
- "The last stable offset (or LSO) of the partition. This is the last offset such that the state " +
- "of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)"),
- new Field("log_start_offset",
- INT64,
- "Earliest available offset."),
- new Field("aborted_transactions",
- ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V5)));
-
- public static final Schema FETCH_RESPONSE_PARTITION_V4 = new Schema(
- new Field("partition_header", FETCH_RESPONSE_PARTITION_HEADER_V4),
- new Field("record_set", RECORDS));
-
- public static final Schema FETCH_RESPONSE_PARTITION_V5 = new Schema(
- new Field("partition_header", FETCH_RESPONSE_PARTITION_HEADER_V5),
- new Field("record_set", RECORDS));
-
- public static final Schema FETCH_RESPONSE_TOPIC_V4 = new Schema(
- new Field("topic", STRING),
- new Field("partition_responses", new ArrayOf(FETCH_RESPONSE_PARTITION_V4)));
-
- public static final Schema FETCH_RESPONSE_TOPIC_V5 = new Schema(
- new Field("topic", STRING),
- new Field("partition_responses", new ArrayOf(FETCH_RESPONSE_PARTITION_V5)));
-
- public static final Schema FETCH_RESPONSE_V4 = new Schema(
- newThrottleTimeField(),
- new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V4)));
-
- public static final Schema FETCH_RESPONSE_V5 = new Schema(
- newThrottleTimeField(),
- new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V5)));
-
- /**
- * The body of FETCH_RESPONSE_V6 is the same as FETCH_RESPONSE_V5.
- * The version number is bumped up to indicate that the client supports KafkaStorageException.
- * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 5
- */
- public static final Schema FETCH_RESPONSE_V6 = FETCH_RESPONSE_V5;
-
- public static final Schema[] FETCH_REQUEST = {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4, FETCH_REQUEST_V5, FETCH_REQUEST_V6};
- public static final Schema[] FETCH_RESPONSE = {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5, FETCH_RESPONSE_V6};
-
- /* List groups api */
- public static final Schema LIST_GROUPS_REQUEST_V0 = new Schema();
-
- /* v1 request is the same as v0. Throttle time has been added to response */
- public static final Schema LIST_GROUPS_REQUEST_V1 = LIST_GROUPS_REQUEST_V0;
-
- public static final Schema LIST_GROUPS_RESPONSE_GROUP_V0 = new Schema(new Field("group_id", STRING),
- new Field("protocol_type", STRING));
- public static final Schema LIST_GROUPS_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
- new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
- public static final Schema LIST_GROUPS_RESPONSE_V1 = new Schema(
- newThrottleTimeField(),
- new Field("error_code", INT16),
- new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
-
- public static final Schema[] LIST_GROUPS_REQUEST = {LIST_GROUPS_REQUEST_V0, LIST_GROUPS_REQUEST_V1};
- public static final Schema[] LIST_GROUPS_RESPONSE = {LIST_GROUPS_RESPONSE_V0, LIST_GROUPS_RESPONSE_V1};
-
- /* Describe group api */
- public static final Schema DESCRIBE_GROUPS_REQUEST_V0 = new Schema(new Field("group_ids",
- new ArrayOf(STRING),
- "List of groupIds to request metadata for (an empty groupId array will return empty group metadata)."));
-
- /* v1 request is the same as v0. Throttle time has been added to response */
- public static final Schema DESCRIBE_GROUPS_REQUEST_V1 = DESCRIBE_GROUPS_REQUEST_V0;
-
- public static final Schema DESCRIBE_GROUPS_RESPONSE_MEMBER_V0 = new Schema(new Field("member_id",
- STRING,
- "The memberId assigned by the coordinator"),
- new Field("client_id",
- STRING,
- "The client id used in the member's latest join group request"),
- new Field("client_host",
- STRING,
- "The client host used in the request session corresponding to the member's join group."),
- new Field("member_metadata",
- BYTES,
- "The metadata corresponding to the current group protocol in use (will only be present if the group is stable)."),
- new Field("member_assignment",
- BYTES,
- "The current assignment provided by the group leader (will only be present if the group is stable)."));
-
- public static final Schema DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0 = new Schema(new Field("error_code", INT16),
- new Field("group_id",
- STRING),
- new Field("state",
- STRING,
- "The current state of the group (one of: Dead, Stable, AwaitingSync, PreparingRebalance, or empty if there is no active group)"),
- new Field("protocol_type",
- STRING,
- "The current group protocol type (will be empty if there is no active group)"),
- new Field("protocol",
- STRING,
- "The current group protocol (only provided if the group is Stable)"),
- new Field("members",
- new ArrayOf(DESCRIBE_GROUPS_RESPONSE_MEMBER_V0),
- "Current group members (only provided if the group is not Dead)"));
-
- public static final Schema DESCRIBE_GROUPS_RESPONSE_V0 = new Schema(new Field("groups", new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0)));
- public static final Schema DESCRIBE_GROUPS_RESPONSE_V1 = new Schema(
- newThrottleTimeField(),
- new Field("groups", new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0)));
-
- public static final Schema[] DESCRIBE_GROUPS_REQUEST = {DESCRIBE_GROUPS_REQUEST_V0, DESCRIBE_GROUPS_REQUEST_V1};
- public static final Schema[] DESCRIBE_GROUPS_RESPONSE = {DESCRIBE_GROUPS_RESPONSE_V0, DESCRIBE_GROUPS_RESPONSE_V1};
-
- /* Find coordinator api */
- public static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(
- new Field("group_id",
- STRING,
- "The unique group id."));
-
- public static final Schema FIND_COORDINATOR_REQUEST_V1 = new Schema(
- new Field("coordinator_key",
- STRING,
- "Id to use for finding the coordinator (for groups, this is the groupId, " +
- "for transactional producers, this is the transactional id)"),
- new Field("coordinator_type",
- INT8,
- "The type of coordinator to find (0 = group, 1 = transaction)"));
-
- public static final Schema FIND_COORDINATOR_BROKER_V0 = new Schema(
- new Field("node_id", INT32, "The broker id."),
- new Field("host", STRING, "The hostname of the broker."),
- new Field("port", INT32,
- "The port on which the broker accepts requests."));
-
- public static final Schema FIND_COORDINATOR_RESPONSE_V0 = new Schema(
- new Field("error_code", INT16),
- new Field("coordinator",
- FIND_COORDINATOR_BROKER_V0,
- "Host and port information for the coordinator for a consumer group."));
-
- public static final Schema FIND_COORDINATOR_RESPONSE_V1 = new Schema(
- newThrottleTimeField(),
- new Field("error_code", INT16),
- new Field("error_message", NULLABLE_STRING),
- new Field("coordinator",
- FIND_COORDINATOR_BROKER_V0,
- "Host and port information for the coordinator for a consumer group."));
-
-
- public static final Schema[] FIND_COORDINATOR_REQUEST = {FIND_COORDINATOR_REQUEST_V0, FIND_COORDINATOR_REQUEST_V1};
- public static final Schema[] FIND_COORDINATOR_RESPONSE = {FIND_COORDINATOR_RESPONSE_V0, FIND_COORDINATOR_RESPONSE_V1};
-
- /* Controlled shutdown api */
- public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V0 = new Schema(new Field("broker_id",
- INT32,
- "The id of the broker for which controlled shutdown has been requested."));
-
- public static final Schema CONTROLLED_SHUTDOWN_PARTITION_V0 = new Schema(new Field("topic", STRING),
- new Field("partition",
- INT32,
- "Topic partition id."));
-
- public static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
- new Field("partitions_remaining",
- new ArrayOf(CONTROLLED_SHUTDOWN_PARTITION_V0),
- "The partitions that the broker still leads."));
-
- public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = CONTROLLED_SHUTDOWN_REQUEST_V0;
- public static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V1 = CONTROLLED_SHUTDOWN_RESPONSE_V0;
-
- public static final Schema[] CONTROLLED_SHUTDOWN_REQUEST = {CONTROLLED_SHUTDOWN_REQUEST_V0, CONTROLLED_SHUTDOWN_REQUEST_V1};
- public static final Schema[] CONTROLLED_SHUTDOWN_RESPONSE = {CONTROLLED_SHUTDOWN_RESPONSE_V0, CONTROLLED_SHUTDOWN_RESPONSE_V1};
-
- /* Join group api */
- public static final Schema JOIN_GROUP_REQUEST_PROTOCOL_V0 = new Schema(new Field("protocol_name", STRING),
- new Field("protocol_metadata", BYTES));
-
- public static final Schema JOIN_GROUP_REQUEST_V0 = new Schema(new Field("group_id",
- STRING,
- "The group id."),
- new Field("session_timeout",
- INT32,
- "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms."),
- new Field("member_id",
- STRING,
- "The assigned consumer id or an empty string for a new consumer."),
- new Field("protocol_type",
- STRING,
- "Unique name for class of protocols implemented by group"),
- new Field("group_protocols",
- new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0),
- "List of protocols that the member supports"));
-
- public static final Schema JOIN_GROUP_REQUEST_V1 = new Schema(new Field("group_id",
- STRING,
- "The group id."),
- new Field("session_timeout",
- INT32,
- "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms."),
- new Field("rebalance_timeout",
- INT32,
- "The maximum time that the coordinator will wait for each member to rejoin when rebalancing the group"),
- new Field("member_id",
- STRING,
- "The assigned consumer id or an empty string for a new consumer."),
- new Field("protocol_type",
- STRING,
- "Unique name for class of protocols implemented by group"),
- new Field("group_protocols",
- new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0),
- "List of protocols that the member supports"));
-
- /* v2 request is the same as v1. Throttle time has been added to response */
- public static final Schema JOIN_GROUP_REQUEST_V2 = JOIN_GROUP_REQUEST_V1;
-
- public static final Schema JOIN_GROUP_RESPONSE_MEMBER_V0 = new Schema(new Field("member_id", STRING),
- new Field("member_metadata", BYTES));
-
- public static final Schema JOIN_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
- new Field("generation_id",
- INT32,
- "The generation of the consumer group."),
- new Field("group_protocol",
- STRING,
- "The group protocol selected by the coordinator"),
- new Field("leader_id",
- STRING,
- "The leader of the group"),
- new Field("member_id",
- STRING,
- "The consumer id assigned by the group coordinator."),
- new Field("members",
- new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
-
- public static final Schema JOIN_GROUP_RESPONSE_V1 = JOIN_GROUP_RESPONSE_V0;
-
- public static final Schema JOIN_GROUP_RESPONSE_V2 = new Schema(
- newThrottleTimeField(),
- new Field("error_code", INT16),
- new Field("generation_id",
- INT32,
- "The generation of the consumer group."),
- new Field("group_protocol",
- STRING,
- "The group protocol selected by the coordinator"),
- new Field("leader_id",
- STRING,
- "The leader of the group"),
- new Field("member_id",
- STRING,
- "The consumer id assigned by the group coordinator."),
- new Field("members",
- new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
-
-
- public static final Schema[] JOIN_GROUP_REQUEST = {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2};
- public static final Schema[] JOIN_GROUP_RESPONSE = {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2};
-
- /* SyncGroup api */
- public static final Schema SYNC_GROUP_REQUEST_MEMBER_V0 = new Schema(new Field("member_id", STRING),
- new Field("member_assignment", BYTES));
- public static final Schema SYNC_GROUP_REQUEST_V0 = new Schema(new Field("group_id", STRING),
- new Field("generation_id", INT32),
- new Field("member_id", STRING),
- new Field("group_assignment", new ArrayOf(SYNC_GROUP_REQUEST_MEMBER_V0)));
-
- /* v1 request is the same as v0. Throttle time has been added to response */
- public static final Schema SYNC_GROUP_REQUEST_V1 = SYNC_GROUP_REQUEST_V0;
-
- public static final Schema SYNC_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
- new Field("member_assignment", BYTES));
- public static final Schema SYNC_GROUP_RESPONSE_V1 = new Schema(
- newThrottleTimeField(),
- new Field("error_code", INT16),
- new Field("member_assignment", BYTES));
- public static final Schema[] SYNC_GROUP_REQUEST = {SYNC_GROUP_REQUEST_V0, SYNC_GROUP_REQUEST_V1};
- public static final Schema[] SYNC_GROUP_RESPONSE = {SYNC_GROUP_RESPONSE_V0, SYNC_GROUP_RESPONSE_V1};
-
- /* Heartbeat api */
- public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The group id."),
- new Field("group_generation_id",
- INT32,
- "The generation of the group."),
- new Field("member_id",
- STRING,
- "The member id assigned by the group coordinator."));
-
- /* v1 request is the same as v0. Throttle time has been added to response */
- public static final Schema HEARTBEAT_REQUEST_V1 = HEARTBEAT_REQUEST_V0;
-
- public static final Schema HEARTBEAT_RESPONSE_V0 = new Schema(new Field("error_code", INT16));
- public static final Schema HEARTBEAT_RESPONSE_V1 = new Schema(
- newThrottleTimeField(),
- new Field("error_code", INT16));
-
- public static final Schema[] HEARTBEAT_REQUEST = {HEARTBEAT_REQUEST_V0, HEARTBEAT_REQUEST_V1};
- public static final Schema[] HEARTBEAT_RESPONSE = {HEARTBEAT_RESPONSE_V0, HEARTBEAT_RESPONSE_V1};
-
- /* Leave group api */
- public static final Schema LEAVE_GROUP_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The group id."),
- new Field("member_id",
- STRING,
- "The member id assigned by the group coordinator."));
-
- /* v1 request is the same as v0. Throttle time has been added to response */
- public static final Schema LEAVE_GROUP_REQUEST_V1 = LEAVE_GROUP_REQUEST_V0;
-
- public static final Schema LEAVE_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16));
- public static final Schema LEAVE_GROUP_RESPONSE_V1 = new Schema(
- newThrottleTimeField(),
- new Field("error_code", INT16));
-
- public static final Schema[] LEAVE_GROUP_REQUEST = {LEAVE_GROUP_REQUEST_V0, LEAVE_GROUP_REQUEST_V1};
- public static final Schema[] LEAVE_GROUP_RESPONSE = {LEAVE_GROUP_RESPONSE_V0, LEAVE_GROUP_RESPONSE_V1};
-
- /* Leader and ISR api */
- public static final Schema LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0 =
- new Schema(new Field("topic", STRING, "Topic name."),
- new Field("partition", INT32, "Topic partition id."),
- new Field("controller_epoch", INT32, "The controller epoch."),
- new Field("leader", INT32, "The broker id for the leader."),
- new Field("leader_epoch", INT32, "The leader epoch."),
- new Field("isr", new ArrayOf(INT32), "The in sync replica ids."),
- new Field("zk_version", INT32, "T
<TRUNCATED>