You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/19 04:13:28 UTC

[7/8] kafka git commit: MINOR: Move request/response schemas to the corresponding object representation

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index c7431d0..b5042c3 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -17,2058 +17,19 @@
 package org.apache.kafka.common.protocol;
 
 import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.BoundField;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.ResponseHeader;
 
-import java.util.EnumSet;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
-import static org.apache.kafka.common.protocol.types.Type.BYTES;
-import static org.apache.kafka.common.protocol.types.Type.INT16;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-import static org.apache.kafka.common.protocol.types.Type.INT64;
-import static org.apache.kafka.common.protocol.types.Type.INT8;
-import static org.apache.kafka.common.protocol.types.Type.NULLABLE_BYTES;
-import static org.apache.kafka.common.protocol.types.Type.RECORDS;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
-import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
 
 public class Protocol {
 
-    public static final Schema REQUEST_HEADER = new Schema(
-            new Field("api_key", INT16, "The id of the request type."),
-            new Field("api_version", INT16, "The version of the API."),
-            new Field("correlation_id", INT32, "A user-supplied integer value that will be passed back with the response"),
-            new Field("client_id", NULLABLE_STRING, "A user specified identifier for the client making the request.", ""));
-
-    // Version 0 of the controlled shutdown API used a non-standard request header (the clientId is missing).
-    // This can be removed once we drop support for that version.
-    public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V0_HEADER = new Schema(
-            new Field("api_key", INT16, "The id of the request type."),
-            new Field("api_version", INT16, "The version of the API."),
-            new Field("correlation_id", INT32, "A user-supplied integer value that will be passed back with the response"));
-
-
-    public static final Schema RESPONSE_HEADER = new Schema(new Field("correlation_id",
-                                                                      INT32,
-                                                                      "The user-supplied value passed in with the request"));
-
-    /* Metadata api */
-
-    public static final Schema METADATA_REQUEST_V0 = new Schema(new Field("topics",
-                                                                          new ArrayOf(STRING),
-                                                                          "An array of topics to fetch metadata for. If no topics are specified fetch metadata for all topics."));
-
-    public static final Schema METADATA_REQUEST_V1 = new Schema(new Field("topics",
-                                                                          ArrayOf.nullable(STRING),
-                                                                          "An array of topics to fetch metadata for. If the topics array is null fetch metadata for all topics."));
-
-    /* The v2 metadata request is the same as v1. An additional field for cluster id has been added to the v2 metadata response */
-    public static final Schema METADATA_REQUEST_V2 = METADATA_REQUEST_V1;
-
-    /* The v3 metadata request is the same as v1 and v2. An additional field for throttle time has been added to the v3 metadata response */
-    public static final Schema METADATA_REQUEST_V3 = METADATA_REQUEST_V2;
-
-    /* The v4 metadata request has an additional field for allowing auto topic creation. The response is the same as v3. */
-    public static final Schema METADATA_REQUEST_V4 = new Schema(new Field("topics",
-                                                                          ArrayOf.nullable(STRING),
-                                                                         "An array of topics to fetch metadata for. If the topics array is null fetch metadata for all topics."),
-                                                                new Field("allow_auto_topic_creation",
-                                                                          BOOLEAN,
-                                                                          "If this and the broker config 'auto.create.topics.enable' are true, " +
-                                                                          "topics that don't exist will be created by the broker. " +
-                                                                          "Otherwise, no topics will be created by the broker."));
-
-    /* The v5 metadata request is the same as v4. An additional field for offline_replicas has been added to the v5 metadata response */
-    public static final Schema METADATA_REQUEST_V5 = METADATA_REQUEST_V4;
-
-    public static final Schema METADATA_BROKER_V0 = new Schema(new Field("node_id", INT32, "The broker id."),
-                                                   new Field("host", STRING, "The hostname of the broker."),
-                                                   new Field("port", INT32,
-                                                             "The port on which the broker accepts requests."));
-
-    public static final Schema PARTITION_METADATA_V0 = new Schema(new Field("partition_error_code",
-                                                                            INT16,
-                                                                            "The error code for the partition, if any."),
-                                                                  new Field("partition_id",
-                                                                            INT32,
-                                                                            "The id of the partition."),
-                                                                  new Field("leader",
-                                                                            INT32,
-                                                                            "The id of the broker acting as leader for this partition."),
-                                                                  new Field("replicas",
-                                                                            new ArrayOf(INT32),
-                                                                            "The set of all nodes that host this partition."),
-                                                                  new Field("isr",
-                                                                            new ArrayOf(INT32),
-                                                                            "The set of nodes that are in sync with the leader for this partition."));
-
-    public static final Schema TOPIC_METADATA_V0 = new Schema(new Field("topic_error_code",
-                                                                        INT16,
-                                                                        "The error code for the given topic."),
-                                                              new Field("topic", STRING, "The name of the topic"),
-                                                              new Field("partition_metadata",
-                                                                        new ArrayOf(PARTITION_METADATA_V0),
-                                                                        "Metadata for each partition of the topic."));
-
-    public static final Schema METADATA_RESPONSE_V0 = new Schema(new Field("brokers",
-                                                                           new ArrayOf(METADATA_BROKER_V0),
-                                                                           "Host and port information for all brokers."),
-                                                                 new Field("topic_metadata",
-                                                                           new ArrayOf(TOPIC_METADATA_V0)));
-
-    public static final Schema METADATA_BROKER_V1 = new Schema(new Field("node_id", INT32, "The broker id."),
-                                                      new Field("host", STRING, "The hostname of the broker."),
-                                                      new Field("port", INT32,
-                                                        "The port on which the broker accepts requests."),
-                                                      new Field("rack", NULLABLE_STRING, "The rack of the broker."));
-
-    public static final Schema PARTITION_METADATA_V1 = PARTITION_METADATA_V0;
-
-    // PARTITION_METADATA_V2 added a per-partition offline_replicas field. This field specifies the list of replicas that are offline.
-    public static final Schema PARTITION_METADATA_V2 = new Schema(new Field("partition_error_code",
-                                                                            INT16,
-                                                                            "The error code for the partition, if any."),
-                                                                  new Field("partition_id",
-                                                                            INT32,
-                                                                            "The id of the partition."),
-                                                                  new Field("leader",
-                                                                            INT32,
-                                                                            "The id of the broker acting as leader for this partition."),
-                                                                  new Field("replicas",
-                                                                            new ArrayOf(INT32),
-                                                                            "The set of all nodes that host this partition."),
-                                                                  new Field("isr",
-                                                                            new ArrayOf(INT32),
-                                                                            "The set of nodes that are in sync with the leader for this partition."),
-                                                                  new Field("offline_replicas",
-                                                                            new ArrayOf(INT32),
-                                                                            "The set of offline replicas of this partition."));
-
-    public static final Schema TOPIC_METADATA_V1 = new Schema(new Field("topic_error_code", INT16, "The error code for the given topic."),
-                                                              new Field("topic", STRING, "The name of the topic"),
-                                                              new Field("is_internal", BOOLEAN,
-                                                                  "Indicates if the topic is considered a Kafka internal topic"),
-                                                              new Field("partition_metadata", new ArrayOf(PARTITION_METADATA_V1),
-                                                                  "Metadata for each partition of the topic."));
-
-    // TOPIC_METADATA_V2 added a per-partition offline_replicas field. This field specifies the list of replicas that are offline.
-    public static final Schema TOPIC_METADATA_V2 = new Schema(new Field("topic_error_code", INT16, "The error code for the given topic."),
-                                                              new Field("topic", STRING, "The name of the topic"),
-                                                              new Field("is_internal", BOOLEAN,
-                                                                  "Indicates if the topic is considered a Kafka internal topic"),
-                                                              new Field("partition_metadata", new ArrayOf(PARTITION_METADATA_V2),
-                                                                  "Metadata for each partition of the topic."));
-
-    public static final Schema METADATA_RESPONSE_V1 = new Schema(new Field("brokers", new ArrayOf(METADATA_BROKER_V1),
-                                                                    "Host and port information for all brokers."),
-                                                                 new Field("controller_id", INT32,
-                                                                     "The broker id of the controller broker."),
-                                                                 new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1)));
-
-    public static final Schema METADATA_RESPONSE_V2 = new Schema(new Field("brokers", new ArrayOf(METADATA_BROKER_V1),
-                                                                    "Host and port information for all brokers."),
-                                                                 new Field("cluster_id", NULLABLE_STRING,
-                                                                     "The cluster id that this broker belongs to."),
-                                                                 new Field("controller_id", INT32,
-                                                                     "The broker id of the controller broker."),
-                                                                 new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1)));
-
-    public static final Schema METADATA_RESPONSE_V3 = new Schema(
-         newThrottleTimeField(),
-         new Field("brokers", new ArrayOf(METADATA_BROKER_V1),
-            "Host and port information for all brokers."),
-         new Field("cluster_id", NULLABLE_STRING,
-             "The cluster id that this broker belongs to."),
-         new Field("controller_id", INT32,
-             "The broker id of the controller broker."),
-         new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1)));
-
-    public static final Schema METADATA_RESPONSE_V4 = METADATA_RESPONSE_V3;
-
-    // METADATA_RESPONSE_V5 added a per-partition offline_replicas field. This field specifies the list of replicas that are offline.
-    public static final Schema METADATA_RESPONSE_V5 = new Schema(
-        newThrottleTimeField(),
-        new Field("brokers", new ArrayOf(METADATA_BROKER_V1),
-            "Host and port information for all brokers."),
-        new Field("cluster_id", NULLABLE_STRING,
-            "The cluster id that this broker belongs to."),
-        new Field("controller_id", INT32,
-            "The broker id of the controller broker."),
-        new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V2)));
-
-    public static final Schema[] METADATA_REQUEST = {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3, METADATA_REQUEST_V4, METADATA_REQUEST_V5};
-    public static final Schema[] METADATA_RESPONSE = {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3, METADATA_RESPONSE_V4, METADATA_RESPONSE_V5};
-
-    /* Produce api */
-
-    public static final Schema TOPIC_PRODUCE_DATA_V0 = new Schema(new Field("topic", STRING),
-                                                                  new Field("data", new ArrayOf(new Schema(new Field("partition", INT32),
-                                                                                                     new Field("record_set", RECORDS)))));
-
-    public static final Schema PRODUCE_REQUEST_V0 = new Schema(new Field("acks",
-                                                                   INT16,
-                                                                   "The number of acknowledgments the producer requires the leader to have received before considering a request complete. Allowed values: 0 for no acknowledgments, 1 for only the leader and -1 for the full ISR."),
-                                                               new Field("timeout", INT32, "The time to await a response in ms."),
-                                                               new Field("topic_data", new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
-
-    public static final Schema PRODUCE_RESPONSE_V0 = new Schema(new Field("responses",
-                                                                    new ArrayOf(new Schema(new Field("topic", STRING),
-                                                                                           new Field("partition_responses",
-                                                                                                     new ArrayOf(new Schema(new Field("partition",
-                                                                                                                                      INT32),
-                                                                                                                            new Field("error_code",
-                                                                                                                                      INT16),
-                                                                                                                            new Field("base_offset",
-                                                                                                                                      INT64))))))));
-    /**
-     * The body of PRODUCE_REQUEST_V1 is the same as PRODUCE_REQUEST_V0.
-     * The version number is bumped up to indicate that the client supports quota throttle time field in the response.
-     */
-    public static final Schema PRODUCE_REQUEST_V1 = PRODUCE_REQUEST_V0;
-    /**
-     * The body of PRODUCE_REQUEST_V2 is the same as PRODUCE_REQUEST_V1.
-     * The version number is bumped up to indicate that message format V1 is used which has relative offset and
-     * timestamp.
-     */
-    public static final Schema PRODUCE_REQUEST_V2 = PRODUCE_REQUEST_V1;
-
-    // Produce request V3 adds the transactional id which is used for authorization when attempting to write
-    // transactional data. This version also adds support for message format V2.
-    public static final Schema PRODUCE_REQUEST_V3 = new Schema(
-            new Field("transactional_id",
-                    NULLABLE_STRING,
-                    "The transactional ID of the producer. This is used to authorize transaction produce requests. " +
-                    "This can be null for non-transactional producers."),
-            new Field("acks",
-                    INT16,
-                    "The number of acknowledgments the producer requires the leader to have received before " +
-                    "considering a request complete. Allowed values: 0 for no acknowledgments, 1 for only the leader " +
-                    "and -1 for the full ISR."),
-            new Field("timeout", INT32, "The time to await a response in ms."),
-            new Field("topic_data", new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
-
-    /**
-     * The body of PRODUCE_REQUEST_V4 is the same as PRODUCE_REQUEST_V3.
-     * The version number is bumped up to indicate that the client supports KafkaStorageException.
-     * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 3
-     */
-    public static final Schema PRODUCE_REQUEST_V4 = PRODUCE_REQUEST_V3;
-
-    public static final Schema PRODUCE_RESPONSE_V1 = new Schema(new Field("responses",
-                                                                          new ArrayOf(new Schema(new Field("topic", STRING),
-                                                                                                 new Field("partition_responses",
-                                                                                                           new ArrayOf(new Schema(new Field("partition",
-                                                                                                                                            INT32),
-                                                                                                                                  new Field("error_code",
-                                                                                                                                            INT16),
-                                                                                                                                  new Field("base_offset",
-                                                                                                                                            INT64))))))),
-                                                                newThrottleTimeField());
-    /**
-     * PRODUCE_RESPONSE_V2 added a timestamp field in the per partition response status.
-     * The timestamp is log append time if the topic is configured to use log append time. Or it is NoTimestamp when create
-     * time is used for the topic.
-     */
-    public static final Schema PRODUCE_RESPONSE_V2 = new Schema(new Field("responses",
-                                                                new ArrayOf(new Schema(new Field("topic", STRING),
-                                                                                       new Field("partition_responses",
-                                                                                       new ArrayOf(new Schema(new Field("partition",
-                                                                                                                        INT32),
-                                                                                                              new Field("error_code",
-                                                                                                                        INT16),
-                                                                                                              new Field("base_offset",
-                                                                                                                        INT64),
-                                                                                                              new Field("log_append_time",
-                                                                                                                        INT64,
-                                                                                                                        "The timestamp returned by broker after appending the messages. " +
-                                                                                                                            "If CreateTime is used for the topic, the timestamp will be -1. " +
-                                                                                                                            "If LogAppendTime is used for the topic, the timestamp will be " +
-                                                                                                                            "the broker local time when the messages are appended."))))))),
-                                                                newThrottleTimeField());
-
-    public static final Schema PRODUCE_RESPONSE_V3 = PRODUCE_RESPONSE_V2;
-
-    /**
-     * The body of PRODUCE_RESPONSE_V4 is the same as PRODUCE_RESPONSE_V3.
-     * The version number is bumped up to indicate that the client supports KafkaStorageException.
-     * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 3
-     */
-    public static final Schema PRODUCE_RESPONSE_V4 = PRODUCE_RESPONSE_V3;
-
-    public static final Schema[] PRODUCE_REQUEST = {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3, PRODUCE_REQUEST_V4};
-    public static final Schema[] PRODUCE_RESPONSE = {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3, PRODUCE_RESPONSE_V4};
-
-    /* Offset commit api */
-    public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
-                                                                                         INT32,
-                                                                                         "Topic partition id."),
-                                                                               new Field("offset",
-                                                                                         INT64,
-                                                                                         "Message offset to be committed."),
-                                                                               new Field("metadata",
-                                                                                         NULLABLE_STRING,
-                                                                                         "Any associated metadata the client wants to keep."));
-
-    public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V1 = new Schema(new Field("partition",
-                                                                                         INT32,
-                                                                                         "Topic partition id."),
-                                                                               new Field("offset",
-                                                                                         INT64,
-                                                                                         "Message offset to be committed."),
-                                                                               new Field("timestamp",
-                                                                                         INT64,
-                                                                                         "Timestamp of the commit"),
-                                                                               new Field("metadata",
-                                                                                         NULLABLE_STRING,
-                                                                                         "Any associated metadata the client wants to keep."));
-
-    public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V2 = new Schema(new Field("partition",
-                                                                                         INT32,
-                                                                                         "Topic partition id."),
-                                                                               new Field("offset",
-                                                                                         INT64,
-                                                                                         "Message offset to be committed."),
-                                                                               new Field("metadata",
-                                                                                         NULLABLE_STRING,
-                                                                                         "Any associated metadata the client wants to keep."));
-
-    public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
-                                                                                     STRING,
-                                                                                     "Topic to commit."),
-                                                                           new Field("partitions",
-                                                                                     new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0),
-                                                                                     "Partitions to commit offsets."));
-
-    public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V1 = new Schema(new Field("topic",
-                                                                                     STRING,
-                                                                                     "Topic to commit."),
-                                                                           new Field("partitions",
-                                                                                     new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V1),
-                                                                                     "Partitions to commit offsets."));
-
-    public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V2 = new Schema(new Field("topic",
-                                                                                     STRING,
-                                                                                     "Topic to commit."),
-                                                                           new Field("partitions",
-                                                                                     new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V2),
-                                                                                     "Partitions to commit offsets."));
-
-    public static final Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id",
-                                                                               STRING,
-                                                                               "The group id."),
-                                                                     new Field("topics",
-                                                                               new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
-                                                                               "Topics to commit offsets."));
-
-    public static final Schema OFFSET_COMMIT_REQUEST_V1 = new Schema(new Field("group_id",
-                                                                               STRING,
-                                                                               "The group id."),
-                                                                     new Field("group_generation_id",
-                                                                               INT32,
-                                                                               "The generation of the group."),
-                                                                     new Field("member_id",
-                                                                               STRING,
-                                                                               "The member id assigned by the group coordinator."),
-                                                                     new Field("topics",
-                                                                               new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1),
-                                                                               "Topics to commit offsets."));
-
-    public static final Schema OFFSET_COMMIT_REQUEST_V2 = new Schema(new Field("group_id",
-                                                                               STRING,
-                                                                               "The group id."),
-                                                                     new Field("group_generation_id",
-                                                                               INT32,
-                                                                               "The generation of the consumer group."),
-                                                                     new Field("member_id",
-                                                                               STRING,
-                                                                               "The consumer id assigned by the group coordinator."),
-                                                                     new Field("retention_time",
-                                                                               INT64,
-                                                                               "Time period in ms to retain the offset."),
-                                                                     new Field("topics",
-                                                                               new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V2),
-                                                                               "Topics to commit offsets."));
-
-    /* v3 request is same as v2. Throttle time has been added to response */
-    public static final Schema OFFSET_COMMIT_REQUEST_V3 = OFFSET_COMMIT_REQUEST_V2;
-
-    public static final Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
-                                                                                          INT32,
-                                                                                          "Topic partition id."),
-                                                                                new Field("error_code",
-                                                                                          INT16));
-
-    public static final Schema OFFSET_COMMIT_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
-                                                                            new Field("partition_responses",
-                                                                                      new ArrayOf(OFFSET_COMMIT_RESPONSE_PARTITION_V0)));
-
-    public static final Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses",
-                                                                                new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
-
-    public static final Schema[] OFFSET_COMMIT_REQUEST = {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2, OFFSET_COMMIT_REQUEST_V3};
-
-    /* The response types for V0, V1 and V2 of OFFSET_COMMIT_REQUEST are the same. */
-    public static final Schema OFFSET_COMMIT_RESPONSE_V1 = OFFSET_COMMIT_RESPONSE_V0;
-    public static final Schema OFFSET_COMMIT_RESPONSE_V2 = OFFSET_COMMIT_RESPONSE_V0;
-
-    public static final Schema OFFSET_COMMIT_RESPONSE_V3 = new Schema(
-            newThrottleTimeField(),
-            new Field("responses",
-                       new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
-
-    public static final Schema[] OFFSET_COMMIT_RESPONSE = {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2, OFFSET_COMMIT_RESPONSE_V3};
-
-    /* Offset fetch api */
-
-    /*
-     * Wire formats of version 0 and 1 are the same, but with different functionality.
-     * Wire format of version 2 is similar to version 1, with the exception of
-     * - accepting 'null' as list of topics
-     * - returning a top level error code
-     * Version 0 will read the offsets from ZK.
-     * Version 1 will read the offsets from Kafka.
-     * Version 2 will read the offsets from Kafka, and returns all associated topic partition offsets if
-     * a 'null' is passed instead of a list of specific topic partitions. It also returns a top level error code
-     * for group or coordinator level errors.
-     */
-    public static final Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
-                                                                                        INT32,
-                                                                                        "Topic partition id."));
-
-    public static final Schema OFFSET_FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
-                                                                                    STRING,
-                                                                                    "Topic to fetch offset."),
-                                                                          new Field("partitions",
-                                                                                    new ArrayOf(OFFSET_FETCH_REQUEST_PARTITION_V0),
-                                                                                    "Partitions to fetch offsets."));
-
-    public static final Schema OFFSET_FETCH_REQUEST_V0 = new Schema(new Field("group_id",
-                                                                              STRING,
-                                                                              "The consumer group id."),
-                                                                    new Field("topics",
-                                                                              new ArrayOf(OFFSET_FETCH_REQUEST_TOPIC_V0),
-                                                                              "Topics to fetch offsets."));
-
-    public static final Schema OFFSET_FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
-                                                                                         INT32,
-                                                                                         "Topic partition id."),
-                                                                               new Field("offset",
-                                                                                         INT64,
-                                                                                         "Last committed message offset."),
-                                                                               new Field("metadata",
-                                                                                         NULLABLE_STRING,
-                                                                                         "Any associated metadata the client wants to keep."),
-                                                                               new Field("error_code", INT16));
-
-    public static final Schema OFFSET_FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
-                                                                           new Field("partition_responses",
-                                                                                     new ArrayOf(OFFSET_FETCH_RESPONSE_PARTITION_V0)));
-
-    public static final Schema OFFSET_FETCH_RESPONSE_V0 = new Schema(new Field("responses",
-                                                                               new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)));
-
-    public static final Schema OFFSET_FETCH_REQUEST_V1 = OFFSET_FETCH_REQUEST_V0;
-    public static final Schema OFFSET_FETCH_RESPONSE_V1 = OFFSET_FETCH_RESPONSE_V0;
-
-    public static final Schema OFFSET_FETCH_REQUEST_V2 = new Schema(new Field("group_id",
-                                                                              STRING,
-                                                                              "The consumer group id."),
-                                                                              new Field("topics",
-                                                                                        ArrayOf.nullable(OFFSET_FETCH_REQUEST_TOPIC_V0),
-                                                                                        "Topics to fetch offsets. If the topic array is null fetch offsets for all topics."));
-
-    public static final Schema OFFSET_FETCH_RESPONSE_V2 = new Schema(new Field("responses",
-                                                                               new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)),
-                                                                     new Field("error_code",
-                                                                               INT16));
-
-    /* v3 request is the same as v2. Throttle time has been added to v3 response */
-    public static final Schema OFFSET_FETCH_REQUEST_V3 = OFFSET_FETCH_REQUEST_V2;
-    public static final Schema OFFSET_FETCH_RESPONSE_V3 = new Schema(
-            newThrottleTimeField(),
-            new Field("responses",
-                    new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)),
-            new Field("error_code",
-                    INT16));
-
-    public static final Schema[] OFFSET_FETCH_REQUEST = {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1, OFFSET_FETCH_REQUEST_V2, OFFSET_FETCH_REQUEST_V3};
-    public static final Schema[] OFFSET_FETCH_RESPONSE = {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1, OFFSET_FETCH_RESPONSE_V2, OFFSET_FETCH_RESPONSE_V3};
-
-    /* List offset api */
-    public static final Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
-                                                                                       INT32,
-                                                                                       "Topic partition id."),
-                                                                             new Field("timestamp", INT64, "Timestamp."),
-                                                                             new Field("max_num_offsets",
-                                                                                       INT32,
-                                                                                       "Maximum offsets to return."));
-    public static final Schema LIST_OFFSET_REQUEST_PARTITION_V1 = new Schema(new Field("partition",
-                                                                                       INT32,
-                                                                                       "Topic partition id."),
-                                                                             new Field("timestamp",
-                                                                                       INT64,
-                                                                                       "The target timestamp for the partition."));
-
-    public static final Schema LIST_OFFSET_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
-                                                                                   STRING,
-                                                                                   "Topic to list offset."),
-                                                                         new Field("partitions",
-                                                                                   new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V0),
-                                                                                   "Partitions to list offset."));
-    public static final Schema LIST_OFFSET_REQUEST_TOPIC_V1 = new Schema(new Field("topic",
-                                                                                   STRING,
-                                                                                   "Topic to list offset."),
-                                                                         new Field("partitions",
-                                                                                   new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V1),
-                                                                                   "Partitions to list offset."));
-
-    public static final Schema LIST_OFFSET_REQUEST_V0 = new Schema(new Field("replica_id",
-                                                                             INT32,
-                                                                             "Broker id of the follower. For normal consumers, use -1."),
-                                                                   new Field("topics",
-                                                                             new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V0),
-                                                                             "Topics to list offsets."));
-    public static final Schema LIST_OFFSET_REQUEST_V1 = new Schema(new Field("replica_id",
-                                                                             INT32,
-                                                                             "Broker id of the follower. For normal consumers, use -1."),
-                                                                   new Field("topics",
-                                                                             new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V1),
-                                                                             "Topics to list offsets."));
-
-    public static final Schema LIST_OFFSET_REQUEST_V2 = new Schema(
-            new Field("replica_id",
-                    INT32,
-                    "Broker id of the follower. For normal consumers, use -1."),
-            new Field("isolation_level",
-                    INT8,
-                    "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED " +
-                            "(isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), " +
-                            "non-transactional and COMMITTED transactional records are visible. To be more concrete, " +
-                            "READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), " +
-                            "and enables the inclusion of the list of aborted transactions in the result, which allows " +
-                            "consumers to discard ABORTED transactional records"),
-            new Field("topics",
-                    new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V1),
-                    "Topics to list offsets."));;
-
-    public static final Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
-                                                                                        INT32,
-                                                                                        "Topic partition id."),
-                                                                              new Field("error_code", INT16),
-                                                                              new Field("offsets",
-                                                                                        new ArrayOf(INT64),
-                                                                                        "A list of offsets."));
-
-    public static final Schema LIST_OFFSET_RESPONSE_PARTITION_V1 = new Schema(new Field("partition",
-                                                                                        INT32,
-                                                                                        "Topic partition id."),
-                                                                              new Field("error_code", INT16),
-                                                                              new Field("timestamp",
-                                                                                        INT64,
-                                                                                        "The timestamp associated with the returned offset"),
-                                                                              new Field("offset",
-                                                                                        INT64,
-                                                                                        "offset found"));
-
-    public static final Schema LIST_OFFSET_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
-                                                                          new Field("partition_responses",
-                                                                                    new ArrayOf(LIST_OFFSET_RESPONSE_PARTITION_V0)));
-
-    public static final Schema LIST_OFFSET_RESPONSE_TOPIC_V1 = new Schema(new Field("topic", STRING),
-                                                                          new Field("partition_responses",
-                                                                                    new ArrayOf(LIST_OFFSET_RESPONSE_PARTITION_V1)));
-
-    public static final Schema LIST_OFFSET_RESPONSE_V0 = new Schema(new Field("responses",
-                                                                              new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V0)));
-
-    public static final Schema LIST_OFFSET_RESPONSE_V1 = new Schema(new Field("responses",
-                                                                              new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V1)));
-    public static final Schema LIST_OFFSET_RESPONSE_V2 = new Schema(
-            newThrottleTimeField(),
-            new Field("responses",
-                    new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V1)));
-
-    public static final Schema[] LIST_OFFSET_REQUEST = {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1, LIST_OFFSET_REQUEST_V2};
-    public static final Schema[] LIST_OFFSET_RESPONSE = {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1, LIST_OFFSET_RESPONSE_V2};
-
-    /* Fetch api */
-    public static final Schema FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
-                                                                                 INT32,
-                                                                                 "Topic partition id."),
-                                                                       new Field("fetch_offset",
-                                                                                 INT64,
-                                                                                 "Message offset."),
-                                                                       new Field("max_bytes",
-                                                                                 INT32,
-                                                                                 "Maximum bytes to fetch."));
-
-    // FETCH_REQUEST_PARTITION_V1 added log_start_offset field - the earliest available offset of partition data that can be consumed.
-    public static final Schema FETCH_REQUEST_PARTITION_V5 = new Schema(new Field("partition",
-                                                                                 INT32,
-                                                                                 "Topic partition id."),
-                                                                       new Field("fetch_offset",
-                                                                                 INT64,
-                                                                                 "Message offset."),
-                                                                       new Field("log_start_offset",
-                                                                                 INT64,
-                                                                                 "Earliest available offset of the follower replica. " +
-                                                                                 "The field is only used when request is sent by follower. "),
-                                                                       new Field("max_bytes",
-                                                                                 INT32,
-                                                                                 "Maximum bytes to fetch."));
-
-    public static final Schema FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic", STRING, "Topic to fetch."),
-                                                                   new Field("partitions",
-                                                                             new ArrayOf(FETCH_REQUEST_PARTITION_V0),
-                                                                             "Partitions to fetch."));
-
-    public static final Schema FETCH_REQUEST_TOPIC_V5 = new Schema(new Field("topic", STRING, "Topic to fetch."),
-                                                                   new Field("partitions",
-                                                                             new ArrayOf(FETCH_REQUEST_PARTITION_V5),
-                                                                             "Partitions to fetch."));
-
-    public static final Schema FETCH_REQUEST_V0 = new Schema(new Field("replica_id",
-                                                                       INT32,
-                                                                       "Broker id of the follower. For normal consumers, use -1."),
-                                                             new Field("max_wait_time",
-                                                                       INT32,
-                                                                       "Maximum time in ms to wait for the response."),
-                                                             new Field("min_bytes",
-                                                                       INT32,
-                                                                       "Minimum bytes to accumulate in the response."),
-                                                             new Field("topics",
-                                                                       new ArrayOf(FETCH_REQUEST_TOPIC_V0),
-                                                                       "Topics to fetch."));
-
-    // The V1 Fetch Request body is the same as V0.
-    // Only the version number is incremented to indicate a newer client
-    public static final Schema FETCH_REQUEST_V1 = FETCH_REQUEST_V0;
-    // The V2 Fetch Request body is the same as V1.
-    // Only the version number is incremented to indicate the client support message format V1 which uses
-    // relative offset and has timestamp.
-    public static final Schema FETCH_REQUEST_V2 = FETCH_REQUEST_V1;
-    // Fetch Request V3 added top level max_bytes field - the total size of partition data to accumulate in response.
-    // The partition ordering is now relevant - partitions will be processed in order they appear in request.
-    public static final Schema FETCH_REQUEST_V3 = new Schema(new Field("replica_id",
-                                                                       INT32,
-                                                                       "Broker id of the follower. For normal consumers, use -1."),
-                                                             new Field("max_wait_time",
-                                                                       INT32,
-                                                                       "Maximum time in ms to wait for the response."),
-                                                             new Field("min_bytes",
-                                                                       INT32,
-                                                                       "Minimum bytes to accumulate in the response."),
-                                                             new Field("max_bytes",
-                                                                       INT32,
-                                                                       "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " +
-                                                                       "if the first message in the first non-empty partition of the fetch is larger than this " +
-                                                                       "value, the message will still be returned to ensure that progress can be made."),
-                                                             new Field("topics",
-                                                                       new ArrayOf(FETCH_REQUEST_TOPIC_V0),
-                                                                       "Topics to fetch in the order provided."));
-
-    // The V4 Fetch Request adds the fetch isolation level and exposes magic v2 (via the response).
-    public static final Schema FETCH_REQUEST_V4 = new Schema(
-            new Field("replica_id",
-                    INT32,
-                    "Broker id of the follower. For normal consumers, use -1."),
-            new Field("max_wait_time",
-                    INT32,
-                    "Maximum time in ms to wait for the response."),
-            new Field("min_bytes",
-                    INT32,
-                    "Minimum bytes to accumulate in the response."),
-            new Field("max_bytes",
-                    INT32,
-                    "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " +
-                    "if the first message in the first non-empty partition of the fetch is larger than this " +
-                    "value, the message will still be returned to ensure that progress can be made."),
-            new Field("isolation_level",
-                    INT8,
-                    "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED " +
-                    "(isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), " +
-                     "non-transactional and COMMITTED transactional records are visible. To be more concrete, " +
-                     "READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), " +
-                     "and enables the inclusion of the list of aborted transactions in the result, which allows " +
-                     "consumers to discard ABORTED transactional records"),
-            new Field("topics",
-                    new ArrayOf(FETCH_REQUEST_TOPIC_V0),
-                    "Topics to fetch in the order provided."));
-
-    // FETCH_REQUEST_V5 added a per-partition log_start_offset field - the earliest available offset of partition data that can be consumed.
-    public static final Schema FETCH_REQUEST_V5 = new Schema(
-            new Field("replica_id",
-                    INT32,
-                    "Broker id of the follower. For normal consumers, use -1."),
-            new Field("max_wait_time",
-                    INT32,
-                    "Maximum time in ms to wait for the response."),
-            new Field("min_bytes",
-                    INT32,
-                    "Minimum bytes to accumulate in the response."),
-            new Field("max_bytes",
-                    INT32,
-                    "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " +
-                    "if the first message in the first non-empty partition of the fetch is larger than this " +
-                    "value, the message will still be returned to ensure that progress can be made."),
-            new Field("isolation_level",
-                    INT8,
-                    "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED " +
-                    "(isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), " +
-                     "non-transactional and COMMITTED transactional records are visible. To be more concrete, " +
-                     "READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), " +
-                     "and enables the inclusion of the list of aborted transactions in the result, which allows " +
-                     "consumers to discard ABORTED transactional records"),
-            new Field("topics",
-                    new ArrayOf(FETCH_REQUEST_TOPIC_V5),
-                    "Topics to fetch in the order provided."));
-
-    /**
-     * The body of FETCH_REQUEST_V6 is the same as FETCH_REQUEST_V5.
-     * The version number is bumped up to indicate that the client supports KafkaStorageException.
-     * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 5
-     */
-    public static final Schema FETCH_REQUEST_V6 = FETCH_REQUEST_V5;
-
-    public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V0 = new Schema(new Field("partition",
-                                                                                         INT32,
-                                                                                         "Topic partition id."),
-                                                                               new Field("error_code", INT16),
-                                                                               new Field("high_watermark",
-                                                                                         INT64,
-                                                                                         "Last committed offset."));
-    public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition_header", FETCH_RESPONSE_PARTITION_HEADER_V0),
-                                                                        new Field("record_set", RECORDS));
-
-    public static final Schema FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
-                                                                    new Field("partition_responses",
-                                                                              new ArrayOf(FETCH_RESPONSE_PARTITION_V0)));
-
-    public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses",
-                                                                        new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
-
-    public static final Schema FETCH_RESPONSE_V1 = new Schema(newThrottleTimeField(),
-                                                              new Field("responses",
-                                                                      new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
-    // Even though fetch response v2 has the same protocol as v1, the record set in the response is different. In v1,
-    // record set only includes messages of v0 (magic byte 0). In v2, record set can include messages of v0 and v1
-    // (magic byte 0 and 1). For details, see ByteBufferMessageSet.
-    public static final Schema FETCH_RESPONSE_V2 = FETCH_RESPONSE_V1;
-    public static final Schema FETCH_RESPONSE_V3 = FETCH_RESPONSE_V2;
-
-    // The v4 Fetch Response adds features for transactional consumption (the aborted transaction list and the
-    // last stable offset). It also exposes messages with magic v2 (along with older formats).
-    private static final Schema FETCH_RESPONSE_ABORTED_TRANSACTION_V4 = new Schema(
-            new Field("producer_id", INT64, "The producer id associated with the aborted transactions"),
-            new Field("first_offset", INT64, "The first offset in the aborted transaction"));
-
-    public static final Schema FETCH_RESPONSE_ABORTED_TRANSACTION_V5 = FETCH_RESPONSE_ABORTED_TRANSACTION_V4;
-
-    public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V4 = new Schema(
-            new Field("partition",
-                    INT32,
-                    "Topic partition id."),
-            new Field("error_code", INT16),
-            new Field("high_watermark",
-                    INT64,
-                    "Last committed offset."),
-            new Field("last_stable_offset",
-                    INT64,
-                    "The last stable offset (or LSO) of the partition. This is the last offset such that the state " +
-                    "of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)"),
-            new Field("aborted_transactions",
-                    ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V4)));
-
-    // FETCH_RESPONSE_PARTITION_HEADER_V5 added log_start_offset field - the earliest available offset of partition data that can be consumed.
-    public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V5 = new Schema(
-            new Field("partition",
-                    INT32,
-                    "Topic partition id."),
-            new Field("error_code", INT16),
-            new Field("high_watermark",
-                    INT64,
-                    "Last committed offset."),
-            new Field("last_stable_offset",
-                    INT64,
-                    "The last stable offset (or LSO) of the partition. This is the last offset such that the state " +
-                    "of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)"),
-            new Field("log_start_offset",
-                    INT64,
-                    "Earliest available offset."),
-            new Field("aborted_transactions",
-                    ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V5)));
-
-    public static final Schema FETCH_RESPONSE_PARTITION_V4 = new Schema(
-            new Field("partition_header", FETCH_RESPONSE_PARTITION_HEADER_V4),
-            new Field("record_set", RECORDS));
-
-    public static final Schema FETCH_RESPONSE_PARTITION_V5 = new Schema(
-            new Field("partition_header", FETCH_RESPONSE_PARTITION_HEADER_V5),
-            new Field("record_set", RECORDS));
-
-    public static final Schema FETCH_RESPONSE_TOPIC_V4 = new Schema(
-            new Field("topic", STRING),
-            new Field("partition_responses", new ArrayOf(FETCH_RESPONSE_PARTITION_V4)));
-
-    public static final Schema FETCH_RESPONSE_TOPIC_V5 = new Schema(
-            new Field("topic", STRING),
-            new Field("partition_responses", new ArrayOf(FETCH_RESPONSE_PARTITION_V5)));
-
-    public static final Schema FETCH_RESPONSE_V4 = new Schema(
-            newThrottleTimeField(),
-            new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V4)));
-
-    public static final Schema FETCH_RESPONSE_V5 = new Schema(
-            newThrottleTimeField(),
-            new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V5)));
-
-    /**
-     * The body of FETCH_RESPONSE_V6 is the same as FETCH_RESPONSE_V5.
-     * The version number is bumped up to indicate that the client supports KafkaStorageException.
-     * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 5
-     */
-    public static final Schema FETCH_RESPONSE_V6 = FETCH_RESPONSE_V5;
-
-    public static final Schema[] FETCH_REQUEST = {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4, FETCH_REQUEST_V5, FETCH_REQUEST_V6};
-    public static final Schema[] FETCH_RESPONSE = {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5, FETCH_RESPONSE_V6};
-
-    /* List groups api */
-    public static final Schema LIST_GROUPS_REQUEST_V0 = new Schema();
-
-    /* v1 request is the same as v0. Throttle time has been added to response */
-    public static final Schema LIST_GROUPS_REQUEST_V1 = LIST_GROUPS_REQUEST_V0;
-
-    public static final Schema LIST_GROUPS_RESPONSE_GROUP_V0 = new Schema(new Field("group_id", STRING),
-                                                                          new Field("protocol_type", STRING));
-    public static final Schema LIST_GROUPS_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
-                                                                    new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
-    public static final Schema LIST_GROUPS_RESPONSE_V1 = new Schema(
-            newThrottleTimeField(),
-            new Field("error_code", INT16),
-            new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
-
-    public static final Schema[] LIST_GROUPS_REQUEST = {LIST_GROUPS_REQUEST_V0, LIST_GROUPS_REQUEST_V1};
-    public static final Schema[] LIST_GROUPS_RESPONSE = {LIST_GROUPS_RESPONSE_V0, LIST_GROUPS_RESPONSE_V1};
-
-    /* Describe group api */
-    public static final Schema DESCRIBE_GROUPS_REQUEST_V0 = new Schema(new Field("group_ids",
-                                                                                 new ArrayOf(STRING),
-                                                                                 "List of groupIds to request metadata for (an empty groupId array will return empty group metadata)."));
-
-    /* v1 request is the same as v0. Throttle time has been added to response */
-    public static final Schema DESCRIBE_GROUPS_REQUEST_V1 = DESCRIBE_GROUPS_REQUEST_V0;
-
-    public static final Schema DESCRIBE_GROUPS_RESPONSE_MEMBER_V0 = new Schema(new Field("member_id",
-                                                                                         STRING,
-                                                                                         "The memberId assigned by the coordinator"),
-                                                                               new Field("client_id",
-                                                                                         STRING,
-                                                                                         "The client id used in the member's latest join group request"),
-                                                                               new Field("client_host",
-                                                                                         STRING,
-                                                                                         "The client host used in the request session corresponding to the member's join group."),
-                                                                               new Field("member_metadata",
-                                                                                         BYTES,
-                                                                                         "The metadata corresponding to the current group protocol in use (will only be present if the group is stable)."),
-                                                                               new Field("member_assignment",
-                                                                                         BYTES,
-                                                                                         "The current assignment provided by the group leader (will only be present if the group is stable)."));
-
-    public static final Schema DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0 = new Schema(new Field("error_code", INT16),
-                                                                                       new Field("group_id",
-                                                                                                 STRING),
-                                                                                       new Field("state",
-                                                                                                 STRING,
-                                                                                                 "The current state of the group (one of: Dead, Stable, AwaitingSync, PreparingRebalance, or empty if there is no active group)"),
-                                                                                       new Field("protocol_type",
-                                                                                                 STRING,
-                                                                                                 "The current group protocol type (will be empty if there is no active group)"),
-                                                                                       new Field("protocol",
-                                                                                                 STRING,
-                                                                                                 "The current group protocol (only provided if the group is Stable)"),
-                                                                                       new Field("members",
-                                                                                                 new ArrayOf(DESCRIBE_GROUPS_RESPONSE_MEMBER_V0),
-                                                                                                 "Current group members (only provided if the group is not Dead)"));
-
-    public static final Schema DESCRIBE_GROUPS_RESPONSE_V0 = new Schema(new Field("groups", new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0)));
-    public static final Schema DESCRIBE_GROUPS_RESPONSE_V1 = new Schema(
-            newThrottleTimeField(),
-            new Field("groups", new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0)));
-
-    public static final Schema[] DESCRIBE_GROUPS_REQUEST = {DESCRIBE_GROUPS_REQUEST_V0, DESCRIBE_GROUPS_REQUEST_V1};
-    public static final Schema[] DESCRIBE_GROUPS_RESPONSE = {DESCRIBE_GROUPS_RESPONSE_V0, DESCRIBE_GROUPS_RESPONSE_V1};
-
-    /* Find coordinator api */
-    public static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(
-            new Field("group_id",
-                    STRING,
-                    "The unique group id."));
-
-    public static final Schema FIND_COORDINATOR_REQUEST_V1 = new Schema(
-            new Field("coordinator_key",
-                    STRING,
-                    "Id to use for finding the coordinator (for groups, this is the groupId, " +
-                            "for transactional producers, this is the transactional id)"),
-            new Field("coordinator_type",
-                    INT8,
-                    "The type of coordinator to find (0 = group, 1 = transaction)"));
-
-    public static final Schema FIND_COORDINATOR_BROKER_V0 = new Schema(
-            new Field("node_id", INT32, "The broker id."),
-            new Field("host", STRING, "The hostname of the broker."),
-            new Field("port", INT32,
-                    "The port on which the broker accepts requests."));
-
-    public static final Schema FIND_COORDINATOR_RESPONSE_V0 = new Schema(
-            new Field("error_code", INT16),
-            new Field("coordinator",
-                    FIND_COORDINATOR_BROKER_V0,
-                    "Host and port information for the coordinator for a consumer group."));
-
-    public static final Schema FIND_COORDINATOR_RESPONSE_V1 = new Schema(
-            newThrottleTimeField(),
-            new Field("error_code", INT16),
-            new Field("error_message", NULLABLE_STRING),
-            new Field("coordinator",
-                    FIND_COORDINATOR_BROKER_V0,
-                    "Host and port information for the coordinator for a consumer group."));
-
-
-    public static final Schema[] FIND_COORDINATOR_REQUEST = {FIND_COORDINATOR_REQUEST_V0, FIND_COORDINATOR_REQUEST_V1};
-    public static final Schema[] FIND_COORDINATOR_RESPONSE = {FIND_COORDINATOR_RESPONSE_V0, FIND_COORDINATOR_RESPONSE_V1};
-
-    /* Controlled shutdown api */
-    public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V0 = new Schema(new Field("broker_id",
-                                                                                     INT32,
-                                                                                     "The id of the broker for which controlled shutdown has been requested."));
-
-    public static final Schema CONTROLLED_SHUTDOWN_PARTITION_V0 = new Schema(new Field("topic", STRING),
-                                                                             new Field("partition",
-                                                                                       INT32,
-                                                                                       "Topic partition id."));
-
-    public static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
-                                                                            new Field("partitions_remaining",
-                                                                                      new ArrayOf(CONTROLLED_SHUTDOWN_PARTITION_V0),
-                                                                                      "The partitions that the broker still leads."));
-
-    public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = CONTROLLED_SHUTDOWN_REQUEST_V0;
-    public static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V1 = CONTROLLED_SHUTDOWN_RESPONSE_V0;
-
-    public static final Schema[] CONTROLLED_SHUTDOWN_REQUEST = {CONTROLLED_SHUTDOWN_REQUEST_V0, CONTROLLED_SHUTDOWN_REQUEST_V1};
-    public static final Schema[] CONTROLLED_SHUTDOWN_RESPONSE = {CONTROLLED_SHUTDOWN_RESPONSE_V0, CONTROLLED_SHUTDOWN_RESPONSE_V1};
-
-    /* Join group api */
-    public static final Schema JOIN_GROUP_REQUEST_PROTOCOL_V0 = new Schema(new Field("protocol_name", STRING),
-                                                                           new Field("protocol_metadata", BYTES));
-
-    public static final Schema JOIN_GROUP_REQUEST_V0 = new Schema(new Field("group_id",
-                                                                            STRING,
-                                                                            "The group id."),
-                                                                  new Field("session_timeout",
-                                                                            INT32,
-                                                                            "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms."),
-                                                                  new Field("member_id",
-                                                                            STRING,
-                                                                            "The assigned consumer id or an empty string for a new consumer."),
-                                                                  new Field("protocol_type",
-                                                                            STRING,
-                                                                            "Unique name for class of protocols implemented by group"),
-                                                                  new Field("group_protocols",
-                                                                            new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0),
-                                                                            "List of protocols that the member supports"));
-
-    public static final Schema JOIN_GROUP_REQUEST_V1 = new Schema(new Field("group_id",
-                                                                            STRING,
-                                                                            "The group id."),
-                                                                  new Field("session_timeout",
-                                                                            INT32,
-                                                                            "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms."),
-                                                                  new Field("rebalance_timeout",
-                                                                            INT32,
-                                                                            "The maximum time that the coordinator will wait for each member to rejoin when rebalancing the group"),
-                                                                  new Field("member_id",
-                                                                            STRING,
-                                                                            "The assigned consumer id or an empty string for a new consumer."),
-                                                                  new Field("protocol_type",
-                                                                            STRING,
-                                                                            "Unique name for class of protocols implemented by group"),
-                                                                  new Field("group_protocols",
-                                                                            new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0),
-                                                                            "List of protocols that the member supports"));
-
-    /* v2 request is the same as v1. Throttle time has been added to response */
-    public static final Schema JOIN_GROUP_REQUEST_V2 = JOIN_GROUP_REQUEST_V1;
-
-    public static final Schema JOIN_GROUP_RESPONSE_MEMBER_V0 = new Schema(new Field("member_id", STRING),
-                                                                          new Field("member_metadata", BYTES));
-
-    public static final Schema JOIN_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
-                                                                   new Field("generation_id",
-                                                                             INT32,
-                                                                             "The generation of the consumer group."),
-                                                                   new Field("group_protocol",
-                                                                             STRING,
-                                                                             "The group protocol selected by the coordinator"),
-                                                                   new Field("leader_id",
-                                                                             STRING,
-                                                                             "The leader of the group"),
-                                                                   new Field("member_id",
-                                                                             STRING,
-                                                                             "The consumer id assigned by the group coordinator."),
-                                                                   new Field("members",
-                                                                             new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
-
-    public static final Schema JOIN_GROUP_RESPONSE_V1 = JOIN_GROUP_RESPONSE_V0;
-
-    public static final Schema JOIN_GROUP_RESPONSE_V2 = new Schema(
-            newThrottleTimeField(),
-            new Field("error_code", INT16),
-            new Field("generation_id",
-                      INT32,
-                      "The generation of the consumer group."),
-            new Field("group_protocol",
-                      STRING,
-                      "The group protocol selected by the coordinator"),
-            new Field("leader_id",
-                      STRING,
-                      "The leader of the group"),
-            new Field("member_id",
-                      STRING,
-                      "The consumer id assigned by the group coordinator."),
-            new Field("members",
-                      new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
-
-
-    public static final Schema[] JOIN_GROUP_REQUEST = {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2};
-    public static final Schema[] JOIN_GROUP_RESPONSE = {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2};
-
-    /* SyncGroup api */
-    public static final Schema SYNC_GROUP_REQUEST_MEMBER_V0 = new Schema(new Field("member_id", STRING),
-                                                                         new Field("member_assignment", BYTES));
-    public static final Schema SYNC_GROUP_REQUEST_V0 = new Schema(new Field("group_id", STRING),
-                                                                  new Field("generation_id", INT32),
-                                                                  new Field("member_id", STRING),
-                                                                  new Field("group_assignment", new ArrayOf(SYNC_GROUP_REQUEST_MEMBER_V0)));
-
-    /* v1 request is the same as v0. Throttle time has been added to response */
-    public static final Schema SYNC_GROUP_REQUEST_V1 = SYNC_GROUP_REQUEST_V0;
-
-    public static final Schema SYNC_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
-                                                                   new Field("member_assignment", BYTES));
-    public static final Schema SYNC_GROUP_RESPONSE_V1 = new Schema(
-            newThrottleTimeField(),
-            new Field("error_code", INT16),
-            new Field("member_assignment", BYTES));
-    public static final Schema[] SYNC_GROUP_REQUEST = {SYNC_GROUP_REQUEST_V0, SYNC_GROUP_REQUEST_V1};
-    public static final Schema[] SYNC_GROUP_RESPONSE = {SYNC_GROUP_RESPONSE_V0, SYNC_GROUP_RESPONSE_V1};
-
-    /* Heartbeat api */
-    public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The group id."),
-                                                                 new Field("group_generation_id",
-                                                                           INT32,
-                                                                           "The generation of the group."),
-                                                                 new Field("member_id",
-                                                                           STRING,
-                                                                           "The member id assigned by the group coordinator."));
-
-    /* v1 request is the same as v0. Throttle time has been added to response */
-    public static final Schema HEARTBEAT_REQUEST_V1 = HEARTBEAT_REQUEST_V0;
-
-    public static final Schema HEARTBEAT_RESPONSE_V0 = new Schema(new Field("error_code", INT16));
-    public static final Schema HEARTBEAT_RESPONSE_V1 = new Schema(
-            newThrottleTimeField(),
-            new Field("error_code", INT16));
-
-    public static final Schema[] HEARTBEAT_REQUEST = {HEARTBEAT_REQUEST_V0, HEARTBEAT_REQUEST_V1};
-    public static final Schema[] HEARTBEAT_RESPONSE = {HEARTBEAT_RESPONSE_V0, HEARTBEAT_RESPONSE_V1};
-
-    /* Leave group api */
-    public static final Schema LEAVE_GROUP_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The group id."),
-                                                                   new Field("member_id",
-                                                                             STRING,
-                                                                             "The member id assigned by the group coordinator."));
-
-    /* v1 request is the same as v0. Throttle time has been added to response */
-    public static final Schema LEAVE_GROUP_REQUEST_V1 = LEAVE_GROUP_REQUEST_V0;
-
-    public static final Schema LEAVE_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16));
-    public static final Schema LEAVE_GROUP_RESPONSE_V1 = new Schema(
-            newThrottleTimeField(),
-            new Field("error_code", INT16));
-
-    public static final Schema[] LEAVE_GROUP_REQUEST = {LEAVE_GROUP_REQUEST_V0, LEAVE_GROUP_REQUEST_V1};
-    public static final Schema[] LEAVE_GROUP_RESPONSE = {LEAVE_GROUP_RESPONSE_V0, LEAVE_GROUP_RESPONSE_V1};
-
-    /* Leader and ISR api */
-    public static final Schema LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0 =
-            new Schema(new Field("topic", STRING, "Topic name."),
-                       new Field("partition", INT32, "Topic partition id."),
-                       new Field("controller_epoch", INT32, "The controller epoch."),
-                       new Field("leader", INT32, "The broker id for the leader."),
-                       new Field("leader_epoch", INT32, "The leader epoch."),
-                       new Field("isr", new ArrayOf(INT32), "The in sync replica ids."),
-                       new Field("zk_version", INT32, "T

<TRUNCATED>