You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/18 05:51:24 UTC

[3/3] kafka git commit: KAFKA-3267; Describe and Alter Configs Admin APIs (KIP-133)

KAFKA-3267; Describe and Alter Configs Admin APIs (KIP-133)

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #3076 from ijuma/kafka-3267-describe-alter-configs-protocol


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/972b7545
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/972b7545
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/972b7545

Branch: refs/heads/trunk
Commit: 972b7545363ae85a55f94cf7ea83614be8840b75
Parents: e1abf17
Author: Ismael Juma <is...@juma.me.uk>
Authored: Thu May 18 06:51:02 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu May 18 06:51:02 2017 +0100

----------------------------------------------------------------------
 checkstyle/suppressions.xml                     |   4 +-
 .../kafka/clients/admin/AclOperation.java       |  12 +-
 .../apache/kafka/clients/admin/AdminClient.java |  60 +++++
 .../clients/admin/AlterConfigsOptions.java      |  45 ++++
 .../clients/admin/AlterConfigsResults.java      |  42 ++++
 .../org/apache/kafka/clients/admin/Config.java  |  63 ++++++
 .../apache/kafka/clients/admin/ConfigEntry.java |  59 +++++
 .../kafka/clients/admin/ConfigResource.java     |  65 ++++++
 .../clients/admin/DescribeConfigsOptions.java   |  37 +++
 .../clients/admin/DescribeConfigsResults.java   |  59 +++++
 .../kafka/clients/admin/KafkaAdminClient.java   | 184 ++++++++++++++-
 .../kafka/clients/admin/ResourceType.java       |   7 +-
 .../kafka/common/config/AbstractConfig.java     |   7 +
 .../errors/BrokerAuthorizationException.java    |  23 ++
 .../apache/kafka/common/protocol/ApiKeys.java   |   4 +-
 .../apache/kafka/common/protocol/Errors.java    |  20 +-
 .../apache/kafka/common/protocol/Protocol.java  | 182 ++++++++++-----
 .../kafka/common/protocol/types/Schema.java     |  16 +-
 .../kafka/common/requests/AbstractRequest.java  |   6 +
 .../kafka/common/requests/AbstractResponse.java |   4 +
 .../common/requests/AlterConfigsRequest.java    | 179 +++++++++++++++
 .../common/requests/AlterConfigsResponse.java   |  88 ++++++++
 .../apache/kafka/common/requests/ApiError.java  |  99 ++++++++
 .../common/requests/CreateTopicsRequest.java    |  12 +-
 .../common/requests/CreateTopicsResponse.java   |  68 +-----
 .../common/requests/DescribeConfigsRequest.java | 142 ++++++++++++
 .../requests/DescribeConfigsResponse.java       | 186 +++++++++++++++
 .../apache/kafka/common/requests/Resource.java  |  60 +++++
 .../kafka/common/requests/ResourceType.java     |  42 ++++
 .../kafka/server/policy/CreateTopicPolicy.java  |   6 +-
 .../kafka/clients/admin/AclOperationTest.java   |   4 +-
 .../clients/admin/KafkaAdminClientTest.java     |  20 +-
 .../kafka/clients/admin/ResourceTypeTest.java   |   3 +-
 .../common/requests/RequestResponseTest.java    |  60 ++++-
 .../src/main/scala/kafka/admin/AclCommand.scala |  14 +-
 .../src/main/scala/kafka/admin/AdminUtils.scala |  33 ++-
 .../main/scala/kafka/admin/ConfigCommand.scala  |  17 +-
 core/src/main/scala/kafka/log/LogConfig.scala   |   2 +-
 core/src/main/scala/kafka/log/LogManager.scala  |   2 +-
 .../scala/kafka/security/auth/Operation.scala   |  20 +-
 .../kafka/security/auth/ResourceType.scala      |   7 +-
 .../security/auth/SimpleAclAuthorizer.scala     |   2 +-
 .../main/scala/kafka/server/AdminManager.scala  | 109 ++++++++-
 .../kafka/server/DelayedCreateTopics.scala      |   8 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  82 ++++++-
 .../main/scala/kafka/server/KafkaServer.scala   |   6 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   | 132 ++++++-----
 .../api/KafkaAdminClientIntegrationTest.scala   | 224 ++++++++++++++++++-
 .../scala/unit/kafka/admin/AclCommandTest.scala |  11 +-
 .../AbstractCreateTopicsRequestTest.scala       |   8 +-
 .../unit/kafka/server/RequestQuotaTest.scala    |  18 +-
 .../processor/internals/StreamsKafkaClient.java |   3 +-
 52 files changed, 2271 insertions(+), 295 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 66548d9..dc00bee 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -19,7 +19,7 @@
               files=".*/requests/AbstractResponse.java"/>
 
     <suppress checks="MethodLength"
-              files="KerberosLogin.java"/>
+              files="KerberosLogin.java|RequestResponseTest.java"/>
 
     <suppress checks="ParameterNumber"
               files="NetworkClient.java"/>
@@ -46,7 +46,7 @@
               files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|SsLTransportLayer|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslTransportLayer).java"/>
 
     <suppress checks="JavaNCSS"
-              files="KerberosLogin.java"/>
+              files="AbstractRequest.java|KerberosLogin.java"/>
 
     <suppress checks="JavaNCSS"
               files="AbstractRequest.java"/>

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java b/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java
index 14fb61b..062e5e3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java
@@ -73,7 +73,17 @@ public enum AclOperation {
     /**
      * CLUSTER_ACTION operation.
      */
-    CLUSTER_ACTION((byte) 9);
+    CLUSTER_ACTION((byte) 9),
+
+    /**
+     * DESCRIBE_CONFIGS operation.
+     */
+    DESCRIBE_CONFIGS((byte) 10),
+
+    /**
+     * ALTER_CONFIGS operation.
+     */
+    ALTER_CONFIGS((byte) 11);
 
     private final static HashMap<Byte, AclOperation> CODE_TO_VALUE = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index 8bb495c..4cfc174 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -32,6 +32,7 @@ import java.util.Properties;
  */
 @InterfaceStability.Unstable
 public abstract class AdminClient implements AutoCloseable {
+
     /**
      * Create a new AdminClient with the given configuration.
      *
@@ -196,6 +197,7 @@ public abstract class AdminClient implements AutoCloseable {
     public abstract ApiVersionsResults apiVersions(Collection<Node> nodes, ApiVersionsOptions options);
 
     /**
+<<<<<<< HEAD
      * Similar to #{@link AdminClient#describeAcls(AclBindingFilter, DescribeAclsOptions),
      * but uses the default options.
      *
@@ -260,4 +262,62 @@ public abstract class AdminClient implements AutoCloseable {
      * @return                  The DeleteAclsResult.
      */
     public abstract DeleteAclsResults deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options);
+
+
+     /**
+     * Get the configuration for the specified resources with the default options.
+     *
+     * See {@link #describeConfigs(Collection, DescribeConfigsOptions)} for more details.
+     *
+     * @param resources         The resources (topic and broker resource types are currently supported)
+     * @return                  The DescribeConfigsResults
+     */
+    public DescribeConfigsResults describeConfigs(Collection<ConfigResource> resources) {
+        return describeConfigs(resources, new DescribeConfigsOptions());
+    }
+
+    /**
+     * Get the configuration for the specified resources.
+     *
+     * The returned configuration includes default values and the isDefault() method can be used to distinguish them
+     * from user supplied values.
+     *
+     * The value of config entries where isSensitive() is true is always {@code null} so that sensitive information
+     * is not disclosed.
+     *
+     * Config entries where isReadOnly() is true cannot be updated.
+     *
+     * @param resources         The resources (topic and broker resource types are currently supported)
+     * @param options           The options to use when describing configs
+     * @return                  The DescribeConfigsResults
+     */
+    public abstract DescribeConfigsResults describeConfigs(Collection<ConfigResource> resources,
+                                                           DescribeConfigsOptions options);
+
+    /**
+     * Update the configuration for the specified resources with the default options.
+     *
+     * See {@link #alterConfigs(Map, AlterConfigsOptions)} for more details.
+     *
+     * @param configs         The resources with their configs (topic is the only resource type with configs that can
+     *                        be updated currently)
+     * @return                The AlterConfigsResults
+     */
+    public AlterConfigsResults alterConfigs(Map<ConfigResource, Config> configs) {
+        return alterConfigs(configs, new AlterConfigsOptions());
+    }
+
+    /**
+     * Update the configuration for the specified resources with the default options.
+     *
+     * Updates are not transactional so they may succeed for some resources while fail for others. The configs for
+     * a particular resource are updated atomically.
+     *
+     * @param configs         The resources with their configs (topic is the only resource type with configs that can
+     *                        be updated currently)
+     * @param options         The options to use when describing configs
+     * @return                The AlterConfigsResults
+     */
+    public abstract AlterConfigsResults alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options);
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
new file mode 100644
index 0000000..5698fed
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+@InterfaceStability.Unstable
+public class AlterConfigsOptions {
+
+    private Integer timeoutMs = null;
+    private boolean validateOnly = false;
+
+    public Integer timeoutMs() {
+        return timeoutMs;
+    }
+
+    public boolean isValidateOnly() {
+        return validateOnly;
+    }
+
+    public AlterConfigsOptions timeoutMs(Integer timeout) {
+        this.timeoutMs = timeout;
+        return this;
+    }
+
+    public AlterConfigsOptions validateOnly(boolean validateOnly) {
+        this.validateOnly = validateOnly;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResults.java
new file mode 100644
index 0000000..3f44cfd
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResults.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+@InterfaceStability.Unstable
+public class AlterConfigsResults {
+
+    private final Map<ConfigResource, KafkaFuture<Void>> futures;
+
+    AlterConfigsResults(Map<ConfigResource, KafkaFuture<Void>> futures) {
+        this.futures = futures;
+    }
+
+    public Map<ConfigResource, KafkaFuture<Void>> results() {
+        return futures;
+    }
+
+    public KafkaFuture<Void> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/Config.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Config.java b/clients/src/main/java/org/apache/kafka/clients/admin/Config.java
new file mode 100644
index 0000000..189a0b3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Config.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public class Config {
+
+    private final Collection<ConfigEntry> entries;
+
+    public Config(Collection<ConfigEntry> entries) {
+        this.entries = entries;
+    }
+
+    public Collection<ConfigEntry> entries() {
+        return Collections.unmodifiableCollection(entries);
+    }
+
+    public ConfigEntry get(String name) {
+        for (ConfigEntry entry : entries)
+            if (entry.name().equals(name))
+                return entry;
+        return null;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        Config config = (Config) o;
+
+        return entries.equals(config.entries);
+    }
+
+    @Override
+    public int hashCode() {
+        return entries.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "Config(entries=" + entries + ")";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
new file mode 100644
index 0000000..cafc8fb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+public class ConfigEntry {
+
+    private final String name;
+    private final String value;
+    private final boolean isDefault;
+    private final boolean isSensitive;
+    private final boolean isReadOnly;
+
+    public ConfigEntry(String name, String value) {
+        this(name, value, false, false, false);
+    }
+
+    public ConfigEntry(String name, String value, boolean isDefault, boolean isSensitive, boolean isReadOnly) {
+        this.name = name;
+        this.value = value;
+        this.isDefault = isDefault;
+        this.isSensitive = isSensitive;
+        this.isReadOnly = isReadOnly;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public String value() {
+        return value;
+    }
+
+    public boolean isDefault() {
+        return isDefault;
+    }
+
+    public boolean isSensitive() {
+        return isSensitive;
+    }
+
+    public boolean isReadOnly() {
+        return isReadOnly;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/ConfigResource.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigResource.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigResource.java
new file mode 100644
index 0000000..61af4a8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigResource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+public final class ConfigResource {
+
+    public enum Type {
+        BROKER, TOPIC, UNKNOWN;
+    }
+
+    private final Type type;
+    private final String name;
+
+    public ConfigResource(Type type, String name) {
+        this.type = type;
+        this.name = name;
+    }
+
+    public Type type() {
+        return type;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        ConfigResource that = (ConfigResource) o;
+
+        return type == that.type && name.equals(that.name);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = type.hashCode();
+        result = 31 * result + name.hashCode();
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "ConfigResource{type=" + type + ", name='" + name + "'}";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
new file mode 100644
index 0000000..f167bab
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for describeConfigs.
+ */
+@InterfaceStability.Unstable
+public class DescribeConfigsOptions {
+    private Integer timeoutMs = null;
+
+    public DescribeConfigsOptions timeoutMs(Integer timeoutMs) {
+        this.timeoutMs = timeoutMs;
+        return this;
+    }
+
+    public Integer timeoutMs() {
+        return timeoutMs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResults.java
new file mode 100644
index 0000000..c29872a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResults.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+@InterfaceStability.Unstable
+public class DescribeConfigsResults {
+
+    private final Map<ConfigResource, KafkaFuture<Config>> futures;
+
+    DescribeConfigsResults(Map<ConfigResource, KafkaFuture<Config>> futures) {
+        this.futures = futures;
+    }
+
+    public Map<ConfigResource, KafkaFuture<Config>> results() {
+        return futures;
+    }
+
+    public KafkaFuture<Map<ConfigResource, Config>> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+                thenApply(new KafkaFuture.Function<Void, Map<ConfigResource, Config>>() {
+                    @Override
+                    public Map<ConfigResource, Config> apply(Void v) {
+                        Map<ConfigResource, Config> configs = new HashMap<>(futures.size());
+                        for (Map.Entry<ConfigResource, KafkaFuture<Config>> entry : futures.entrySet()) {
+                            try {
+                                configs.put(entry.getKey(), entry.getValue().get());
+                            } catch (InterruptedException | ExecutionException e) {
+                                // This should be unreachable, because allOf ensured that all the futures
+                                // completed successfully.
+                                throw new RuntimeException(e);
+                            }
+                        }
+                        return configs;
+                    }
+                });
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 9f1b1b2..76919ee 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -53,6 +53,8 @@ import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AlterConfigsRequest;
+import org.apache.kafka.common.requests.AlterConfigsResponse;
 import org.apache.kafka.common.requests.ApiVersionsRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.CreateAclsRequest;
@@ -69,8 +71,13 @@ import org.apache.kafka.common.requests.DeleteTopicsRequest;
 import org.apache.kafka.common.requests.DeleteTopicsResponse;
 import org.apache.kafka.common.requests.DescribeAclsRequest;
 import org.apache.kafka.common.requests.DescribeAclsResponse;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.Resource;
+import org.apache.kafka.common.requests.ResourceType;
 import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
@@ -355,13 +362,26 @@ public class KafkaAdminClient extends AdminClient {
         Node provide();
     }
 
+    private class ConstantNodeIdProvider implements NodeProvider {
+        private final int nodeId;
+
+        ConstantNodeIdProvider(int nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        @Override
+        public Node provide() {
+            return metadata.fetch().nodeById(nodeId);
+        }
+    }
+
     /**
      * Provides a constant node which is known at construction time.
      */
-    private static class ConstantAdminNodeProvider implements NodeProvider {
+    private static class ConstantNodeProvider implements NodeProvider {
         private final Node node;
 
-        ConstantAdminNodeProvider(Node node) {
+        ConstantNodeProvider(Node node) {
             this.node = node;
         }
 
@@ -853,7 +873,7 @@ public class KafkaAdminClient extends AdminClient {
             public void handleResponse(AbstractResponse abstractResponse) {
                 CreateTopicsResponse response = (CreateTopicsResponse) abstractResponse;
                 // Handle server responses for particular topics.
-                for (Map.Entry<String, CreateTopicsResponse.Error> entry : response.errors().entrySet()) {
+                for (Map.Entry<String, ApiError> entry : response.errors().entrySet()) {
                     KafkaFutureImpl<Void> future = topicFutures.get(entry.getKey());
                     if (future == null) {
                         log.warn("Server response mentioned unknown topic {}", entry.getKey());
@@ -1071,7 +1091,7 @@ public class KafkaAdminClient extends AdminClient {
                 continue;
             final KafkaFutureImpl<NodeApiVersions> nodeFuture = new KafkaFutureImpl<>();
             nodeFutures.put(node, nodeFuture);
-            runnable.call(new Call("apiVersions", deadlineMs, new ConstantAdminNodeProvider(node)) {
+            runnable.call(new Call("apiVersions", deadlineMs, new ConstantNodeProvider(node)) {
                     @Override
                     public AbstractRequest.Builder createRequest(int timeoutMs) {
                         return new ApiVersionsRequest.Builder();
@@ -1229,4 +1249,160 @@ public class KafkaAdminClient extends AdminClient {
         }, now);
         return new DeleteAclsResults(new HashMap<AclBindingFilter, KafkaFuture<FilterResults>>(futures));
     }
+
+    @Override
+    public DescribeConfigsResults describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options) {
+        final Map<ConfigResource, KafkaFutureImpl<Config>> singleRequestFutures = new HashMap<>();
+        final Collection<Resource> singleRequestResources = new ArrayList<>(configResources.size());
+
+        final Map<ConfigResource, KafkaFutureImpl<Config>> brokerFutures = new HashMap<>(configResources.size());
+        final Collection<Resource> brokerResources = new ArrayList<>();
+
+        for (ConfigResource resource : configResources) {
+            if (resource.type() != ConfigResource.Type.BROKER) {
+                singleRequestFutures.put(resource, new KafkaFutureImpl<Config>());
+                singleRequestResources.add(configResourceToResource(resource));
+            } else {
+                brokerFutures.put(resource, new KafkaFutureImpl<Config>());
+                brokerResources.add(configResourceToResource(resource));
+            }
+        }
+
+        final long now = time.milliseconds();
+        runnable.call(new Call("describeConfigs", calcDeadlineMs(now, options.timeoutMs()),
+                new LeastLoadedNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new DescribeConfigsRequest.Builder(singleRequestResources);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                DescribeConfigsResponse response = (DescribeConfigsResponse) abstractResponse;
+                for (Map.Entry<ConfigResource, KafkaFutureImpl<Config>> entry : singleRequestFutures.entrySet()) {
+                    ConfigResource configResource = entry.getKey();
+                    KafkaFutureImpl<Config> future = entry.getValue();
+                    DescribeConfigsResponse.Config config = response.config(configResourceToResource(configResource));
+                    if (!config.error().is(Errors.NONE)) {
+                        future.completeExceptionally(config.error().exception());
+                        continue;
+                    }
+                    List<ConfigEntry> configEntries = new ArrayList<>();
+                    for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) {
+                        configEntries.add(new ConfigEntry(configEntry.name(), configEntry.value(),
+                                configEntry.isDefault(), configEntry.isSensitive(), configEntry.isReadOnly()));
+                    }
+                    future.complete(new Config(configEntries));
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(singleRequestFutures.values(), throwable);
+            }
+        }, now);
+
+        for (Map.Entry<ConfigResource, KafkaFutureImpl<Config>> entry : brokerFutures.entrySet()) {
+            final KafkaFutureImpl<Config> brokerFuture = entry.getValue();
+            final Resource resource = configResourceToResource(entry.getKey());
+            int nodeId = Integer.parseInt(resource.name());
+            runnable.call(new Call("describeConfigs", calcDeadlineMs(now, options.timeoutMs()),
+                    new ConstantNodeIdProvider(nodeId)) {
+
+                @Override
+                AbstractRequest.Builder createRequest(int timeoutMs) {
+                    return new DescribeConfigsRequest.Builder(Collections.singleton(resource));
+                }
+
+                @Override
+                void handleResponse(AbstractResponse abstractResponse) {
+                    DescribeConfigsResponse response = (DescribeConfigsResponse) abstractResponse;
+                    DescribeConfigsResponse.Config config = response.configs().get(resource);
+
+                    if (!config.error().is(Errors.NONE))
+                        brokerFuture.completeExceptionally(config.error().exception());
+                    else {
+                        List<ConfigEntry> configEntries = new ArrayList<>();
+                        for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) {
+                            configEntries.add(new ConfigEntry(configEntry.name(), configEntry.value(),
+                                    configEntry.isDefault(), configEntry.isSensitive(), configEntry.isReadOnly()));
+                        }
+                        brokerFuture.complete(new Config(configEntries));
+                    }
+                }
+
+                @Override
+                void handleFailure(Throwable throwable) {
+                    completeAllExceptionally(singleRequestFutures.values(), throwable);
+                }
+            }, now);
+        }
+
+        Map<ConfigResource, KafkaFutureImpl<Config>> allFutures = new HashMap<>(configResources.size());
+        allFutures.putAll(singleRequestFutures);
+        allFutures.putAll(brokerFutures);
+        return new DescribeConfigsResults(new HashMap<ConfigResource, KafkaFuture<Config>>(allFutures));
+    }
+
+    private Resource configResourceToResource(ConfigResource configResource) {
+        ResourceType resourceType;
+        switch (configResource.type()) {
+            case TOPIC:
+                resourceType = ResourceType.TOPIC;
+                break;
+            case BROKER:
+                resourceType = ResourceType.BROKER;
+                break;
+            default:
+                throw new IllegalArgumentException("Unexpected resource type " + configResource.type());
+        }
+        return new Resource(resourceType, configResource.name());
+    }
+
+    @Override
+    public AlterConfigsResults alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options) {
+        final Map<ConfigResource, KafkaFutureImpl<Void>> futures = new HashMap<>(configs.size());
+        for (ConfigResource configResource : configs.keySet()) {
+            futures.put(configResource, new KafkaFutureImpl<Void>());
+        }
+        final Map<Resource, AlterConfigsRequest.Config> requestMap = new HashMap<>(configs.size());
+        for (Map.Entry<ConfigResource, Config> entry : configs.entrySet()) {
+            List<AlterConfigsRequest.ConfigEntry> configEntries = new ArrayList<>();
+            for (ConfigEntry configEntry: entry.getValue().entries())
+                configEntries.add(new AlterConfigsRequest.ConfigEntry(configEntry.name(), configEntry.value()));
+            ConfigResource resource = entry.getKey();
+            requestMap.put(configResourceToResource(resource), new AlterConfigsRequest.Config(configEntries));
+        }
+
+        final long now = time.milliseconds();
+        runnable.call(new Call("alterConfigs", calcDeadlineMs(now, options.timeoutMs()),
+                new LeastLoadedNodeProvider()) {
+
+            @Override
+            public AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new AlterConfigsRequest.Builder(requestMap, options.isValidateOnly());
+            }
+
+            @Override
+            public void handleResponse(AbstractResponse abstractResponse) {
+                AlterConfigsResponse response = (AlterConfigsResponse) abstractResponse;
+                for (Map.Entry<ConfigResource, KafkaFutureImpl<Void>> entry : futures.entrySet()) {
+                    KafkaFutureImpl<Void> future = entry.getValue();
+                    ApiException exception = response.errors().get(configResourceToResource(entry.getKey())).exception();
+                    if (exception != null) {
+                        future.completeExceptionally(exception);
+                    } else {
+                        future.complete(null);
+                    }
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(futures.values(), throwable);
+            }
+        }, now);
+        return new AlterConfigsResults(new HashMap<ConfigResource, KafkaFuture<Void>>(futures));
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java b/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java
index 66a91e3..ca4fa0a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java
@@ -48,7 +48,12 @@ public enum ResourceType {
     /**
      * The cluster as a whole.
      */
-    CLUSTER((byte) 4);
+    CLUSTER((byte) 4),
+
+    /**
+     * A broker.
+     */
+    BROKER((byte) 5);
 
     private final static HashMap<Byte, ResourceType> CODE_TO_VALUE = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index dc7fd7c..d2b6d34 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -126,6 +126,13 @@ public class AbstractConfig {
         return (String) get(key);
     }
 
+    public ConfigDef.Type typeOf(String key) {
+        ConfigDef.ConfigKey configKey = definition.configKeys().get(key);
+        if (configKey == null)
+            return null;
+        return configKey.type;
+    }
+
     public Password getPassword(String key) {
         return (Password) get(key);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/errors/BrokerAuthorizationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/BrokerAuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/BrokerAuthorizationException.java
new file mode 100644
index 0000000..9f7211e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/BrokerAuthorizationException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class BrokerAuthorizationException extends ApiException {
+    public BrokerAuthorizationException(final String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 709d927..36f6403 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -65,7 +65,9 @@ public enum ApiKeys {
     TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false),
     DESCRIBE_ACLS(29, "DescribeAcls", false),
     CREATE_ACLS(30, "CreateAcls", false),
-    DELETE_ACLS(31, "DeleteAcls", false);
+    DELETE_ACLS(31, "DeleteAcls", false),
+    DESCRIBE_CONFIGS(32, "DescribeConfigs", false),
+    ALTER_CONFIGS(33, "AlterConfigs", false);
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index c15edc1..db94b2c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.protocol;
 
 import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.BrokerAuthorizationException;
 import org.apache.kafka.common.errors.BrokerNotAvailableException;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.ConcurrentTransactionsException;
@@ -489,13 +490,18 @@ public enum Errors {
             return new ProducerIdAuthorizationException(message);
         }
     }),
-    SECURITY_DISABLED(55, "Security features are disabled.",
-        new ApiExceptionBuilder() {
-            @Override
-            public ApiException build(String message) {
-                return new SecurityDisabledException(message);
-            }
-        });
+    SECURITY_DISABLED(55, "Security features are disabled.", new ApiExceptionBuilder() {
+        @Override
+        public ApiException build(String message) {
+            return new SecurityDisabledException(message);
+        }
+    }),
+    BROKER_AUTHORIZATION_FAILED(56, "Broker authorization failed", new ApiExceptionBuilder() {
+        @Override
+        public ApiException build(String message) {
+            return new BrokerAuthorizationException(message);
+        }
+    });
              
     private interface ApiExceptionBuilder {
         ApiException build(String message);

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index e970eb1..d5ce469 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -142,9 +142,8 @@ public class Protocol {
              "The broker id of the controller broker."),
          new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1)));
 
-
-    public static final Schema[] METADATA_REQUEST = new Schema[] {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3};
-    public static final Schema[] METADATA_RESPONSE = new Schema[] {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3};
+    public static final Schema[] METADATA_REQUEST = {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3};
+    public static final Schema[] METADATA_RESPONSE = {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3};
 
     /* Produce api */
 
@@ -227,8 +226,8 @@ public class Protocol {
                                                                 newThrottleTimeField());
     public static final Schema PRODUCE_RESPONSE_V3 = PRODUCE_RESPONSE_V2;
 
-    public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3};
-    public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3};
+    public static final Schema[] PRODUCE_REQUEST = {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3};
+    public static final Schema[] PRODUCE_RESPONSE = {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3};
 
     /* Offset commit api */
     public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
@@ -337,7 +336,7 @@ public class Protocol {
     public static final Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses",
                                                                                 new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
 
-    public static final Schema[] OFFSET_COMMIT_REQUEST = new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2, OFFSET_COMMIT_REQUEST_V3};
+    public static final Schema[] OFFSET_COMMIT_REQUEST = {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2, OFFSET_COMMIT_REQUEST_V3};
 
     /* The response types for V0, V1 and V2 of OFFSET_COMMIT_REQUEST are the same. */
     public static final Schema OFFSET_COMMIT_RESPONSE_V1 = OFFSET_COMMIT_RESPONSE_V0;
@@ -348,7 +347,7 @@ public class Protocol {
             new Field("responses",
                        new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
 
-    public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2, OFFSET_COMMIT_RESPONSE_V3};
+    public static final Schema[] OFFSET_COMMIT_RESPONSE = {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2, OFFSET_COMMIT_RESPONSE_V3};
 
     /* Offset fetch api */
 
@@ -423,8 +422,8 @@ public class Protocol {
             new Field("error_code",
                     INT16));
 
-    public static final Schema[] OFFSET_FETCH_REQUEST = new Schema[] {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1, OFFSET_FETCH_REQUEST_V2, OFFSET_FETCH_REQUEST_V3};
-    public static final Schema[] OFFSET_FETCH_RESPONSE = new Schema[] {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1, OFFSET_FETCH_RESPONSE_V2, OFFSET_FETCH_RESPONSE_V3};
+    public static final Schema[] OFFSET_FETCH_REQUEST = {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1, OFFSET_FETCH_REQUEST_V2, OFFSET_FETCH_REQUEST_V3};
+    public static final Schema[] OFFSET_FETCH_RESPONSE = {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1, OFFSET_FETCH_RESPONSE_V2, OFFSET_FETCH_RESPONSE_V3};
 
     /* List offset api */
     public static final Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
@@ -520,8 +519,8 @@ public class Protocol {
             new Field("responses",
                     new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V1)));
 
-    public static final Schema[] LIST_OFFSET_REQUEST = new Schema[] {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1, LIST_OFFSET_REQUEST_V2};
-    public static final Schema[] LIST_OFFSET_RESPONSE = new Schema[] {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1, LIST_OFFSET_RESPONSE_V2};
+    public static final Schema[] LIST_OFFSET_REQUEST = {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1, LIST_OFFSET_REQUEST_V2};
+    public static final Schema[] LIST_OFFSET_RESPONSE = {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1, LIST_OFFSET_RESPONSE_V2};
 
     /* Fetch api */
     public static final Schema FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
@@ -748,8 +747,8 @@ public class Protocol {
             newThrottleTimeField(),
             new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V5)));
 
-    public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4, FETCH_REQUEST_V5};
-    public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5};
+    public static final Schema[] FETCH_REQUEST = {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4, FETCH_REQUEST_V5};
+    public static final Schema[] FETCH_RESPONSE = {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5};
 
     /* List groups api */
     public static final Schema LIST_GROUPS_REQUEST_V0 = new Schema();
@@ -766,8 +765,8 @@ public class Protocol {
             new Field("error_code", INT16),
             new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
 
-    public static final Schema[] LIST_GROUPS_REQUEST = new Schema[] {LIST_GROUPS_REQUEST_V0, LIST_GROUPS_REQUEST_V1};
-    public static final Schema[] LIST_GROUPS_RESPONSE = new Schema[] {LIST_GROUPS_RESPONSE_V0, LIST_GROUPS_RESPONSE_V1};
+    public static final Schema[] LIST_GROUPS_REQUEST = {LIST_GROUPS_REQUEST_V0, LIST_GROUPS_REQUEST_V1};
+    public static final Schema[] LIST_GROUPS_RESPONSE = {LIST_GROUPS_RESPONSE_V0, LIST_GROUPS_RESPONSE_V1};
 
     /* Describe group api */
     public static final Schema DESCRIBE_GROUPS_REQUEST_V0 = new Schema(new Field("group_ids",
@@ -814,8 +813,8 @@ public class Protocol {
             newThrottleTimeField(),
             new Field("groups", new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0)));
 
-    public static final Schema[] DESCRIBE_GROUPS_REQUEST = new Schema[] {DESCRIBE_GROUPS_REQUEST_V0, DESCRIBE_GROUPS_REQUEST_V1};
-    public static final Schema[] DESCRIBE_GROUPS_RESPONSE = new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0, DESCRIBE_GROUPS_RESPONSE_V1};
+    public static final Schema[] DESCRIBE_GROUPS_REQUEST = {DESCRIBE_GROUPS_REQUEST_V0, DESCRIBE_GROUPS_REQUEST_V1};
+    public static final Schema[] DESCRIBE_GROUPS_RESPONSE = {DESCRIBE_GROUPS_RESPONSE_V0, DESCRIBE_GROUPS_RESPONSE_V1};
 
     /* Find coordinator api */
     public static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(
@@ -853,8 +852,8 @@ public class Protocol {
                     "Host and port information for the coordinator for a consumer group."));
 
 
-    public static final Schema[] FIND_COORDINATOR_REQUEST = new Schema[] {FIND_COORDINATOR_REQUEST_V0, FIND_COORDINATOR_REQUEST_V1};
-    public static final Schema[] FIND_COORDINATOR_RESPONSE = new Schema[] {FIND_COORDINATOR_RESPONSE_V0, FIND_COORDINATOR_RESPONSE_V1};
+    public static final Schema[] FIND_COORDINATOR_REQUEST = {FIND_COORDINATOR_REQUEST_V0, FIND_COORDINATOR_REQUEST_V1};
+    public static final Schema[] FIND_COORDINATOR_RESPONSE = {FIND_COORDINATOR_RESPONSE_V0, FIND_COORDINATOR_RESPONSE_V1};
 
     /* Controlled shutdown api */
     public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = new Schema(new Field("broker_id",
@@ -872,8 +871,8 @@ public class Protocol {
                                                                                       "The partitions that the broker still leads."));
 
     /* V0 is not supported as it would require changes to the request header not to include `clientId` */
-    public static final Schema[] CONTROLLED_SHUTDOWN_REQUEST = new Schema[] {null, CONTROLLED_SHUTDOWN_REQUEST_V1};
-    public static final Schema[] CONTROLLED_SHUTDOWN_RESPONSE = new Schema[] {null, CONTROLLED_SHUTDOWN_RESPONSE_V1};
+    public static final Schema[] CONTROLLED_SHUTDOWN_REQUEST = {null, CONTROLLED_SHUTDOWN_REQUEST_V1};
+    public static final Schema[] CONTROLLED_SHUTDOWN_RESPONSE = {null, CONTROLLED_SHUTDOWN_RESPONSE_V1};
 
     /* Join group api */
     public static final Schema JOIN_GROUP_REQUEST_PROTOCOL_V0 = new Schema(new Field("protocol_name", STRING),
@@ -937,6 +936,7 @@ public class Protocol {
                                                                              new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
 
     public static final Schema JOIN_GROUP_RESPONSE_V1 = JOIN_GROUP_RESPONSE_V0;
+
     public static final Schema JOIN_GROUP_RESPONSE_V2 = new Schema(
             newThrottleTimeField(),
             new Field("error_code", INT16),
@@ -956,8 +956,8 @@ public class Protocol {
                       new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
 
 
-    public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2};
-    public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2};
+    public static final Schema[] JOIN_GROUP_REQUEST = {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2};
+    public static final Schema[] JOIN_GROUP_RESPONSE = {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2};
 
     /* SyncGroup api */
     public static final Schema SYNC_GROUP_REQUEST_MEMBER_V0 = new Schema(new Field("member_id", STRING),
@@ -976,8 +976,8 @@ public class Protocol {
             newThrottleTimeField(),
             new Field("error_code", INT16),
             new Field("member_assignment", BYTES));
-    public static final Schema[] SYNC_GROUP_REQUEST = new Schema[] {SYNC_GROUP_REQUEST_V0, SYNC_GROUP_REQUEST_V1};
-    public static final Schema[] SYNC_GROUP_RESPONSE = new Schema[] {SYNC_GROUP_RESPONSE_V0, SYNC_GROUP_RESPONSE_V1};
+    public static final Schema[] SYNC_GROUP_REQUEST = {SYNC_GROUP_REQUEST_V0, SYNC_GROUP_REQUEST_V1};
+    public static final Schema[] SYNC_GROUP_RESPONSE = {SYNC_GROUP_RESPONSE_V0, SYNC_GROUP_RESPONSE_V1};
 
     /* Heartbeat api */
     public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The group id."),
@@ -996,8 +996,8 @@ public class Protocol {
             newThrottleTimeField(),
             new Field("error_code", INT16));
 
-    public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0, HEARTBEAT_REQUEST_V1};
-    public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0, HEARTBEAT_RESPONSE_V1};
+    public static final Schema[] HEARTBEAT_REQUEST = {HEARTBEAT_REQUEST_V0, HEARTBEAT_REQUEST_V1};
+    public static final Schema[] HEARTBEAT_RESPONSE = {HEARTBEAT_RESPONSE_V0, HEARTBEAT_RESPONSE_V1};
 
     /* Leave group api */
     public static final Schema LEAVE_GROUP_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The group id."),
@@ -1013,8 +1013,8 @@ public class Protocol {
             newThrottleTimeField(),
             new Field("error_code", INT16));
 
-    public static final Schema[] LEAVE_GROUP_REQUEST = new Schema[] {LEAVE_GROUP_REQUEST_V0, LEAVE_GROUP_REQUEST_V1};
-    public static final Schema[] LEAVE_GROUP_RESPONSE = new Schema[] {LEAVE_GROUP_RESPONSE_V0, LEAVE_GROUP_RESPONSE_V1};
+    public static final Schema[] LEAVE_GROUP_REQUEST = {LEAVE_GROUP_REQUEST_V0, LEAVE_GROUP_REQUEST_V1};
+    public static final Schema[] LEAVE_GROUP_RESPONSE = {LEAVE_GROUP_RESPONSE_V0, LEAVE_GROUP_RESPONSE_V1};
 
     /* Leader and ISR api */
     public static final Schema LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0 =
@@ -1046,8 +1046,8 @@ public class Protocol {
                                                                        new Field("partitions",
                                                                                  new ArrayOf(LEADER_AND_ISR_RESPONSE_PARTITION_V0)));
 
-    public static final Schema[] LEADER_AND_ISR_REQUEST = new Schema[] {LEADER_AND_ISR_REQUEST_V0};
-    public static final Schema[] LEADER_AND_ISR_RESPONSE = new Schema[] {LEADER_AND_ISR_RESPONSE_V0};
+    public static final Schema[] LEADER_AND_ISR_REQUEST = {LEADER_AND_ISR_REQUEST_V0};
+    public static final Schema[] LEADER_AND_ISR_RESPONSE = {LEADER_AND_ISR_RESPONSE_V0};
 
     /* Replica api */
     public static final Schema STOP_REPLICA_REQUEST_PARTITION_V0 = new Schema(new Field("topic", STRING, "Topic name."),
@@ -1068,8 +1068,8 @@ public class Protocol {
                                                                      new Field("partitions",
                                                                                new ArrayOf(STOP_REPLICA_RESPONSE_PARTITION_V0)));
 
-    public static final Schema[] STOP_REPLICA_REQUEST = new Schema[] {STOP_REPLICA_REQUEST_V0};
-    public static final Schema[] STOP_REPLICA_RESPONSE = new Schema[] {STOP_REPLICA_RESPONSE_V0};
+    public static final Schema[] STOP_REPLICA_REQUEST = {STOP_REPLICA_REQUEST_V0};
+    public static final Schema[] STOP_REPLICA_RESPONSE = {STOP_REPLICA_RESPONSE_V0};
 
     /* Update metadata api */
 
@@ -1148,9 +1148,9 @@ public class Protocol {
 
     public static final Schema UPDATE_METADATA_RESPONSE_V3 = UPDATE_METADATA_RESPONSE_V2;
 
-    public static final Schema[] UPDATE_METADATA_REQUEST = new Schema[] {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1,
+    public static final Schema[] UPDATE_METADATA_REQUEST = {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1,
         UPDATE_METADATA_REQUEST_V2, UPDATE_METADATA_REQUEST_V3};
-    public static final Schema[] UPDATE_METADATA_RESPONSE = new Schema[] {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1,
+    public static final Schema[] UPDATE_METADATA_RESPONSE = {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1,
         UPDATE_METADATA_RESPONSE_V2, UPDATE_METADATA_RESPONSE_V3};
 
     /* SASL handshake api */
@@ -1161,8 +1161,8 @@ public class Protocol {
             new Field("error_code", INT16),
             new Field("enabled_mechanisms", new ArrayOf(Type.STRING), "Array of mechanisms enabled in the server."));
 
-    public static final Schema[] SASL_HANDSHAKE_REQUEST = new Schema[] {SASL_HANDSHAKE_REQUEST_V0};
-    public static final Schema[] SASL_HANDSHAKE_RESPONSE = new Schema[] {SASL_HANDSHAKE_RESPONSE_V0};
+    public static final Schema[] SASL_HANDSHAKE_REQUEST = {SASL_HANDSHAKE_REQUEST_V0};
+    public static final Schema[] SASL_HANDSHAKE_RESPONSE = {SASL_HANDSHAKE_RESPONSE_V0};
 
     /* ApiVersion api */
     public static final Schema API_VERSIONS_REQUEST_V0 = new Schema();
@@ -1185,8 +1185,8 @@ public class Protocol {
     public static final Schema[] API_VERSIONS_RESPONSE = new Schema[]{API_VERSIONS_RESPONSE_V0, API_VERSIONS_RESPONSE_V1};
 
     /* Admin requests common */
-    public static final Schema CONFIG_ENTRY = new Schema(new Field("config_key", STRING, "Configuration key name"),
-        new Field("config_value", STRING, "Configuration value"));
+    public static final Schema CONFIG_ENTRY = new Schema(new Field("config_name", STRING, "Configuration name"),
+        new Field("config_value", NULLABLE_STRING, "Configuration value"));
 
     public static final Schema PARTITION_REPLICA_ASSIGNMENT_ENTRY = new Schema(
         new Field("partition_id", INT32),
@@ -1212,7 +1212,7 @@ public class Protocol {
         new Field("replica_assignment",
             new ArrayOf(PARTITION_REPLICA_ASSIGNMENT_ENTRY),
             "Replica assignment among kafka brokers for this topic partitions. If this is set num_partitions and replication_factor must be unset."),
-        new Field("configs",
+        new Field("config_entries",
             new ArrayOf(CONFIG_ENTRY),
             "Topic level configuration for topic to be set."));
 
@@ -1254,8 +1254,8 @@ public class Protocol {
                     new ArrayOf(TOPIC_ERROR),
                     "An array of per topic errors."));
 
-    public static final Schema[] CREATE_TOPICS_REQUEST = new Schema[] {CREATE_TOPICS_REQUEST_V0, CREATE_TOPICS_REQUEST_V1, CREATE_TOPICS_REQUEST_V2};
-    public static final Schema[] CREATE_TOPICS_RESPONSE = new Schema[] {CREATE_TOPICS_RESPONSE_V0, CREATE_TOPICS_RESPONSE_V1, CREATE_TOPICS_RESPONSE_V2};
+    public static final Schema[] CREATE_TOPICS_REQUEST = {CREATE_TOPICS_REQUEST_V0, CREATE_TOPICS_REQUEST_V1, CREATE_TOPICS_REQUEST_V2};
+    public static final Schema[] CREATE_TOPICS_RESPONSE = {CREATE_TOPICS_RESPONSE_V0, CREATE_TOPICS_RESPONSE_V1, CREATE_TOPICS_RESPONSE_V2};
 
     /* DeleteTopic api */
     public static final Schema DELETE_TOPICS_REQUEST_V0 = new Schema(
@@ -1278,8 +1278,8 @@ public class Protocol {
                 new ArrayOf(TOPIC_ERROR_CODE),
                 "An array of per topic error codes."));
 
-    public static final Schema[] DELETE_TOPICS_REQUEST = new Schema[] {DELETE_TOPICS_REQUEST_V0, DELETE_TOPICS_REQUEST_V1};
-    public static final Schema[] DELETE_TOPICS_RESPONSE = new Schema[] {DELETE_TOPICS_RESPONSE_V0, DELETE_TOPICS_RESPONSE_V1};
+    public static final Schema[] DELETE_TOPICS_REQUEST = {DELETE_TOPICS_REQUEST_V0, DELETE_TOPICS_REQUEST_V1};
+    public static final Schema[] DELETE_TOPICS_RESPONSE = {DELETE_TOPICS_RESPONSE_V0, DELETE_TOPICS_RESPONSE_V1};
 
     public static final Schema DELETE_RECORDS_REQUEST_PARTITION_V0 = new Schema(new Field("partition", INT32, "Topic partition id."),
                                                                                 new Field("offset", INT64, "The offset before which the messages will be deleted."));
@@ -1301,8 +1301,8 @@ public class Protocol {
             newThrottleTimeField(),
             new Field("topics", new ArrayOf(DELETE_RECORDS_RESPONSE_TOPIC_V0)));
 
-    public static final Schema[] DELETE_RECORDS_REQUEST = new Schema[] {DELETE_RECORDS_REQUEST_V0};
-    public static final Schema[] DELETE_RECORDS_RESPONSE = new Schema[] {DELETE_RECORDS_RESPONSE_V0};
+    public static final Schema[] DELETE_RECORDS_REQUEST = {DELETE_RECORDS_REQUEST_V0};
+    public static final Schema[] DELETE_RECORDS_RESPONSE = {DELETE_RECORDS_RESPONSE_V0};
 
     /* Transactions API */
     public static final Schema INIT_PRODUCER_ID_REQUEST_V0 = new Schema(
@@ -1327,9 +1327,9 @@ public class Protocol {
                     "The epoch for the producer id. Will always be 0 if no transactional id was specified in the request.")
     );
 
-    public static final Schema[] INIT_PRODUCER_ID_REQUEST = new Schema[] {INIT_PRODUCER_ID_REQUEST_V0};
+    public static final Schema[] INIT_PRODUCER_ID_REQUEST = {INIT_PRODUCER_ID_REQUEST_V0};
 
-    public static final Schema[] INIT_PRODUCER_ID_RESPONSE = new Schema[] {INIT_PRODUCER_ID_RESPONSE_V0};
+    public static final Schema[] INIT_PRODUCER_ID_RESPONSE = {INIT_PRODUCER_ID_RESPONSE_V0};
 
     /* Offsets for Leader Epoch api */
     public static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_PARTITION_V0 = new Schema(
@@ -1378,8 +1378,8 @@ public class Protocol {
                     new ArrayOf(OFFSET_FOR_LEADER_EPOCH_RESPONSE_TOPIC_V0),
                     "An array of topics for which we have leader offsets for some requested Partition Leader Epoch"));
 
-    public static final Schema[] OFFSET_FOR_LEADER_EPOCH_REQUEST = new Schema[] {OFFSET_FOR_LEADER_EPOCH_REQUEST_V0};
-    public static final Schema[] OFFSET_FOR_LEADER_EPOCH_RESPONSE = new Schema[] {OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0};
+    public static final Schema[] OFFSET_FOR_LEADER_EPOCH_REQUEST = {OFFSET_FOR_LEADER_EPOCH_REQUEST_V0};
+    public static final Schema[] OFFSET_FOR_LEADER_EPOCH_RESPONSE = {OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0};
 
     public static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V0 = new Schema(
             new Field("transactional_id",
@@ -1408,8 +1408,8 @@ public class Protocol {
                                                                               INT16)))))))
     );
 
-    public static final Schema[] ADD_PARTITIONS_TO_TXN_REQUEST = new Schema[] {ADD_PARTITIONS_TO_TXN_REQUEST_V0};
-    public static final Schema[] ADD_PARTITIONS_TO_TXN_RESPONSE = new Schema[] {ADD_PARTITIONS_TO_TXN_RESPONSE_V0};
+    public static final Schema[] ADD_PARTITIONS_TO_TXN_REQUEST = {ADD_PARTITIONS_TO_TXN_REQUEST_V0};
+    public static final Schema[] ADD_PARTITIONS_TO_TXN_RESPONSE = {ADD_PARTITIONS_TO_TXN_RESPONSE_V0};
 
     public static final Schema ADD_OFFSETS_TO_TXN_REQUEST_V0 = new Schema(
             new Field("transactional_id",
@@ -1432,8 +1432,8 @@ public class Protocol {
                     "An integer error code.")
     );
 
-    public static final Schema[] ADD_OFFSETS_TO_TXN_REQUEST = new Schema[] {ADD_OFFSETS_TO_TXN_REQUEST_V0};
-    public static final Schema[] ADD_OFFSETS_TO_TXN_RESPONSE = new Schema[] {ADD_OFFSETS_TO_TXN_RESPONSE_V0};
+    public static final Schema[] ADD_OFFSETS_TO_TXN_REQUEST = {ADD_OFFSETS_TO_TXN_REQUEST_V0};
+    public static final Schema[] ADD_OFFSETS_TO_TXN_RESPONSE = {ADD_OFFSETS_TO_TXN_RESPONSE_V0};
 
     public static final Schema END_TXN_REQUEST_V0 = new Schema(
             new Field("transactional_id",
@@ -1457,8 +1457,8 @@ public class Protocol {
                     "An integer error code.")
     );
 
-    public static final Schema[] END_TXN_REQUEST = new Schema[] {END_TXN_REQUEST_V0};
-    public static final Schema[] END_TXN_RESPONSE = new Schema[] {END_TXN_RESPONSE_V0};
+    public static final Schema[] END_TXN_REQUEST = {END_TXN_REQUEST_V0};
+    public static final Schema[] END_TXN_RESPONSE = {END_TXN_RESPONSE_V0};
 
     public static final Schema WRITE_TXN_MARKERS_ENTRY_V0 = new Schema(
             new Field("producer_id",
@@ -1506,8 +1506,8 @@ public class Protocol {
             new Field("transaction_markers", new ArrayOf(WRITE_TXN_MARKERS_ENTRY_RESPONSE_V0), "Errors per partition from writing markers.")
     );
 
-    public static final Schema[] WRITE_TXN_REQUEST = new Schema[] {WRITE_TXN_MARKERS_REQUEST_V0};
-    public static final Schema[] WRITE_TXN_RESPONSE = new Schema[] {WRITE_TXN_MARKERS_RESPONSE_V0};
+    public static final Schema[] WRITE_TXN_REQUEST = {WRITE_TXN_MARKERS_REQUEST_V0};
+    public static final Schema[] WRITE_TXN_RESPONSE = {WRITE_TXN_MARKERS_RESPONSE_V0};
 
     public static final Schema TXN_OFFSET_COMMIT_PARTITION_OFFSET_METADATA_REQUEST_V0 = new Schema(
             new Field("partition", INT32),
@@ -1546,8 +1546,66 @@ public class Protocol {
                     "Errors per partition from writing markers.")
     );
 
-    public static final Schema[] TXN_OFFSET_COMMIT_REQUEST = new Schema[] {TXN_OFFSET_COMMIT_REQUEST_V0};
-    public static final Schema[] TXN_OFFSET_COMMIT_RESPONSE = new Schema[] {TXN_OFFSET_COMMIT_RESPONSE_V0};
+    public static final Schema[] TXN_OFFSET_COMMIT_REQUEST = {TXN_OFFSET_COMMIT_REQUEST_V0};
+    public static final Schema[] TXN_OFFSET_COMMIT_RESPONSE = {TXN_OFFSET_COMMIT_RESPONSE_V0};
+
+    /* DescribeConfigs API */
+
+    public static final Schema DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0 = new Schema(
+            new Field("resource_type", INT8),
+            new Field("resource_name", STRING),
+            new Field("config_names", ArrayOf.nullable(STRING))
+    );
+
+    public static final Schema DESCRIBE_CONFIGS_REQUEST_V0 = new Schema(
+            new Field("resources", new ArrayOf(DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0),
+                    "An array of config resources to be returned."));
+
+    public static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTITY_V0 = new Schema(
+            new Field("error_code", INT16),
+            new Field("error_message", NULLABLE_STRING),
+            new Field("resource_type", INT8),
+            new Field("resource_name", STRING),
+            new Field("config_entries", new ArrayOf(new Schema(
+                    new Field("config_name", STRING),
+                    new Field("config_value", NULLABLE_STRING),
+                    new Field("read_only", BOOLEAN),
+                    new Field("is_default", BOOLEAN),
+                    new Field("is_sensitive", BOOLEAN)
+            ))
+    ));
+
+    public static final Schema DESCRIBE_CONFIGS_RESPONSE_V0 = new Schema(
+            newThrottleTimeField(),
+            new Field("resources", new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTITY_V0)));
+
+    public static final Schema[] DESCRIBE_CONFIGS_REQUEST = {DESCRIBE_CONFIGS_REQUEST_V0};
+    public static final Schema[] DESCRIBE_CONFIGS_RESPONSE = {DESCRIBE_CONFIGS_RESPONSE_V0};
+
+    /* AlterConfigs API */
+
+    public static final Schema ALTER_CONFIGS_REQUEST_RESOURCE_V0 = new Schema(
+            new Field("resource_type", INT8),
+            new Field("resource_name", STRING),
+            new Field("config_entries", new ArrayOf(CONFIG_ENTRY)));
+
+    public static final Schema ALTER_CONFIGS_REQUEST_V0 = new Schema(
+            new Field("resources", new ArrayOf(ALTER_CONFIGS_REQUEST_RESOURCE_V0),
+                    "An array of resources to update with the provided configs."),
+            new Field("validate_only", BOOLEAN));
+
+    public static final Schema ALTER_CONFIGS_RESPONSE_ENTITY_V0 = new Schema(
+            new Field("error_code", INT16),
+            new Field("error_message", NULLABLE_STRING),
+            new Field("resource_type", INT8),
+            new Field("resource_name", STRING));
+
+    public static final Schema ALTER_CONFIGS_RESPONSE_V0 = new Schema(
+            newThrottleTimeField(),
+            new Field("resources", new ArrayOf(ALTER_CONFIGS_RESPONSE_ENTITY_V0)));
+
+    public static final Schema[] ALTER_CONFIGS_REQUEST = {ALTER_CONFIGS_REQUEST_V0};
+    public static final Schema[] ALTER_CONFIGS_RESPONSE = {ALTER_CONFIGS_RESPONSE_V0};
 
     public static final Schema DESCRIBE_ACLS_REQUEST_V0 = new Schema(
         new Field("resource_type", INT8, "The filter resource type."),
@@ -1675,6 +1733,8 @@ public class Protocol {
         REQUESTS[ApiKeys.DESCRIBE_ACLS.id] = DESCRIBE_ACLS_REQUEST;
         REQUESTS[ApiKeys.CREATE_ACLS.id] = CREATE_ACLS_REQUEST;
         REQUESTS[ApiKeys.DELETE_ACLS.id] = DELETE_ACLS_REQUEST;
+        REQUESTS[ApiKeys.DESCRIBE_CONFIGS.id] = DESCRIBE_CONFIGS_REQUEST;
+        REQUESTS[ApiKeys.ALTER_CONFIGS.id] = ALTER_CONFIGS_REQUEST;
 
         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
         RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -1708,6 +1768,8 @@ public class Protocol {
         RESPONSES[ApiKeys.DESCRIBE_ACLS.id] = DESCRIBE_ACLS_RESPONSE;
         RESPONSES[ApiKeys.CREATE_ACLS.id] = CREATE_ACLS_RESPONSE;
         RESPONSES[ApiKeys.DELETE_ACLS.id] = DELETE_ACLS_RESPONSE;
+        RESPONSES[ApiKeys.DESCRIBE_CONFIGS.id] = DESCRIBE_CONFIGS_RESPONSE;
+        RESPONSES[ApiKeys.ALTER_CONFIGS.id] = ALTER_CONFIGS_RESPONSE;
 
         /* set the minimum and maximum version of each api */
         for (ApiKeys api : ApiKeys.values()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
index f275ada..fbb520c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
@@ -56,8 +56,7 @@ public class Schema extends Type {
                 Object value = field.type().validate(r.get(field));
                 field.type.write(buffer, value);
             } catch (Exception e) {
-                throw new SchemaException("Error writing field '" + field.name +
-                                          "': " +
+                throw new SchemaException("Error writing field '" + field.name + "': " +
                                           (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
             }
         }
@@ -73,8 +72,7 @@ public class Schema extends Type {
             try {
                 objects[i] = fields[i].type.read(buffer);
             } catch (Exception e) {
-                throw new SchemaException("Error reading field '" + fields[i].name +
-                                          "': " +
+                throw new SchemaException("Error reading field '" + fields[i].name + "': " +
                                           (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
             }
         }
@@ -88,8 +86,14 @@ public class Schema extends Type {
     public int sizeOf(Object o) {
         int size = 0;
         Struct r = (Struct) o;
-        for (Field field : fields)
-            size += field.type.sizeOf(r.get(field));
+        for (Field field : fields) {
+            try {
+                size += field.type.sizeOf(r.get(field));
+            } catch (Exception e) {
+                throw new SchemaException("Error computing size for field '" + field.name + "': " +
+                        (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
+            }
+        }
         return size;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 16c0c21..2cd88e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -208,6 +208,12 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
             case DELETE_ACLS:
                 request = new DeleteAclsRequest(struct, version);
                 break;
+            case DESCRIBE_CONFIGS:
+                request = new DescribeConfigsRequest(struct, version);
+                break;
+            case ALTER_CONFIGS:
+                request = new AlterConfigsRequest(struct, version);
+                break;
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " +
                         "code should be updated to do so.", apiKey));

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index aee4f5e..1000ef5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -114,6 +114,10 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
                 return new CreateAclsResponse(struct);
             case DELETE_ACLS:
                 return new DeleteAclsResponse(struct);
+            case DESCRIBE_CONFIGS:
+                return new DescribeConfigsResponse(struct);
+            case ALTER_CONFIGS:
+                return new AlterConfigsResponse(struct);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `getResponse`, the " +
                         "code should be updated to do so.", apiKey));

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
new file mode 100644
index 0000000..a964f85
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AlterConfigsRequest extends AbstractRequest {
+
+    private static final String RESOURCES_KEY_NAME = "resources";
+    private static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
+    private static final String RESOURCE_NAME_KEY_NAME = "resource_name";
+    private static final String VALIDATE_ONLY_KEY_NAME = "validate_only";
+
+    private static final String CONFIG_ENTRIES_KEY_NAME = "config_entries";
+    private static final String CONFIG_NAME = "config_name";
+    private static final String CONFIG_VALUE = "config_value";
+
+    public static class Config {
+        private final Collection<ConfigEntry> entries;
+
+        public Config(Collection<ConfigEntry> entries) {
+            this.entries = entries;
+        }
+
+        public Collection<ConfigEntry> entries() {
+            return entries;
+        }
+    }
+
+    public static class ConfigEntry {
+        private final String name;
+        private final String value;
+
+        public ConfigEntry(String name, String value) {
+            this.name = name;
+            this.value = value;
+        }
+
+        public String name() {
+            return name;
+        }
+
+        public String value() {
+            return value;
+        }
+
+    }
+
+    public static class Builder extends AbstractRequest.Builder {
+
+        private final Map<Resource, Config> configs;
+        private final boolean validateOnly;
+
+        public Builder(Map<Resource, Config> configs, boolean validateOnly) {
+            super(ApiKeys.ALTER_CONFIGS);
+            this.configs = configs;
+            this.validateOnly = validateOnly;
+        }
+
+        @Override
+        public AlterConfigsRequest build(short version) {
+            return new AlterConfigsRequest(version, configs, validateOnly);
+        }
+    }
+
+    private final Map<Resource, Config> configs;
+    private final boolean validateOnly;
+
+    public AlterConfigsRequest(short version, Map<Resource, Config> configs, boolean validateOnly) {
+        super(version);
+        this.configs = configs;
+        this.validateOnly = validateOnly;
+    }
+
+    public AlterConfigsRequest(Struct struct, short version) {
+        super(version);
+        validateOnly = struct.getBoolean(VALIDATE_ONLY_KEY_NAME);
+        Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME);
+        configs = new HashMap<>(resourcesArray.length);
+        for (Object resourcesObj : resourcesArray) {
+            Struct resourcesStruct = (Struct) resourcesObj;
+
+            ResourceType resourceType = ResourceType.forId(resourcesStruct.getByte(RESOURCE_TYPE_KEY_NAME));
+            String resourceName = resourcesStruct.getString(RESOURCE_NAME_KEY_NAME);
+            Resource resource = new Resource(resourceType, resourceName);
+
+            Object[] configEntriesArray = resourcesStruct.getArray(CONFIG_ENTRIES_KEY_NAME);
+            List<ConfigEntry> configEntries = new ArrayList<>(configEntriesArray.length);
+            for (Object configEntriesObj: configEntriesArray) {
+                Struct configEntriesStruct = (Struct) configEntriesObj;
+                String configName = configEntriesStruct.getString(CONFIG_NAME);
+                String configValue = configEntriesStruct.getString(CONFIG_VALUE);
+                configEntries.add(new ConfigEntry(configName, configValue));
+            }
+            Config config = new Config(configEntries);
+            configs.put(resource, config);
+        }
+    }
+
+    public Map<Resource, Config> configs() {
+        return configs;
+    }
+
+    public boolean validateOnly() {
+        return validateOnly;
+    }
+
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ApiKeys.ALTER_CONFIGS.requestSchema(version()));
+        struct.set(VALIDATE_ONLY_KEY_NAME, validateOnly);
+        List<Struct> resourceStructs = new ArrayList<>(configs.size());
+        for (Map.Entry<Resource, Config> entry : configs.entrySet()) {
+            Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME);
+
+            Resource resource = entry.getKey();
+            resourceStruct.set(RESOURCE_TYPE_KEY_NAME, resource.type().id());
+            resourceStruct.set(RESOURCE_NAME_KEY_NAME, resource.name());
+
+            Config config = entry.getValue();
+            List<Struct> configEntryStructs = new ArrayList<>(config.entries.size());
+            for (ConfigEntry configEntry : config.entries) {
+                Struct configEntriesStruct = resourceStruct.instance(CONFIG_ENTRIES_KEY_NAME);
+                configEntriesStruct.set(CONFIG_NAME, configEntry.name);
+                configEntriesStruct.set(CONFIG_VALUE, configEntry.value);
+                configEntryStructs.add(configEntriesStruct);
+            }
+            resourceStruct.set(CONFIG_ENTRIES_KEY_NAME, configEntryStructs.toArray(new Struct[0]));
+
+            resourceStructs.add(resourceStruct);
+        }
+        struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray(new Struct[0]));
+        return struct;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        short version = version();
+        switch (version) {
+            case 0:
+                ApiError error = ApiError.fromThrowable(e);
+                Map<Resource, ApiError> errors = new HashMap<>(configs.size());
+                for (Resource resource : configs.keySet())
+                    errors.put(resource, error);
+                return new AlterConfigsResponse(throttleTimeMs, errors);
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        version, this.getClass().getSimpleName(), ApiKeys.ALTER_CONFIGS.latestVersion()));
+        }
+    }
+
+    public static AlterConfigsRequest parse(ByteBuffer buffer, short version) {
+        return new AlterConfigsRequest(ApiKeys.ALTER_CONFIGS.parseRequest(version, buffer), version);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
new file mode 100644
index 0000000..8f904d8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AlterConfigsResponse extends AbstractResponse {
+
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
+
+    private static final String RESOURCES_KEY_NAME = "resources";
+    private static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
+    private static final String RESOURCE_NAME_KEY_NAME = "resource_name";
+
+    private final int throttleTimeMs;
+    private final Map<Resource, ApiError> errors;
+
+    public AlterConfigsResponse(int throttleTimeMs, Map<Resource, ApiError> errors) {
+        this.throttleTimeMs = throttleTimeMs;
+        this.errors = errors;
+
+    }
+
+    public AlterConfigsResponse(Struct struct) {
+        throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
+        Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME);
+        errors = new HashMap<>(resourcesArray.length);
+        for (Object resourceObj : resourcesArray) {
+            Struct resourceStruct = (Struct) resourceObj;
+            ApiError error = new ApiError(resourceStruct);
+            ResourceType resourceType = ResourceType.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME));
+            String resourceName = resourceStruct.getString(RESOURCE_NAME_KEY_NAME);
+            errors.put(new Resource(resourceType, resourceName), error);
+        }
+    }
+
+    public Map<Resource, ApiError> errors() {
+        return errors;
+    }
+
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.ALTER_CONFIGS.responseSchema(version));
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        List<Struct> resourceStructs = new ArrayList<>(errors.size());
+        for (Map.Entry<Resource, ApiError> entry : errors.entrySet()) {
+            Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME);
+            Resource resource = entry.getKey();
+            entry.getValue().write(resourceStruct);
+            resourceStruct.set(RESOURCE_TYPE_KEY_NAME, resource.type().id());
+            resourceStruct.set(RESOURCE_NAME_KEY_NAME, resource.name());
+            resourceStructs.add(resourceStruct);
+        }
+        struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray(new Struct[0]));
+        return struct;
+    }
+
+    public static AlterConfigsResponse parse(ByteBuffer buffer, short version) {
+        return new AlterConfigsResponse(ApiKeys.ALTER_CONFIGS.parseResponse(version, buffer));
+    }
+
+}