You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2020/05/29 22:29:55 UTC

[kafka] branch 2.6 updated: KAFKA-9494; Include additional metadata information in DescribeConfig response (KIP-569) (#8723)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 2965c9c  KAFKA-9494; Include additional metadata information in DescribeConfig response (KIP-569) (#8723)
2965c9c is described below

commit 2965c9c7b44fef6ae953e69f07394cf3b5f6d120
Author: Shailesh Panwar <52...@users.noreply.github.com>
AuthorDate: Fri May 29 15:18:50 2020 -0700

    KAFKA-9494; Include additional metadata information in DescribeConfig response (KIP-569) (#8723)
    
    Adds documentation and type of ConfigEntry in version 3 of DescribeConfigsResponse
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
 .../apache/kafka/clients/admin/ConfigEntry.java    | 39 ++++++++-
 .../clients/admin/DescribeConfigsOptions.java      | 15 ++++
 .../kafka/clients/admin/KafkaAdminClient.java      | 55 +++++++++++--
 .../apache/kafka/common/config/AbstractConfig.java |  7 ++
 .../common/requests/DescribeConfigsRequest.java    | 39 ++++++++-
 .../common/requests/DescribeConfigsResponse.java   | 94 +++++++++++++++++++++-
 .../common/message/DescribeConfigsRequest.json     | 10 ++-
 .../common/message/DescribeConfigsResponse.json    |  8 +-
 .../org/apache/kafka/clients/admin/ConfigTest.java |  3 +-
 .../kafka/common/config/AbstractConfigTest.java    | 19 +++++
 .../kafka/common/requests/RequestResponseTest.java | 26 +++++-
 .../src/main/scala/kafka/server/AdminManager.scala | 43 +++++++---
 core/src/main/scala/kafka/server/KafkaApis.scala   |  2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  2 +-
 14 files changed, 328 insertions(+), 34 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
index 42cc627..b6d947f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
@@ -37,6 +37,8 @@ public class ConfigEntry {
     private final boolean isSensitive;
     private final boolean isReadOnly;
     private final List<ConfigSynonym> synonyms;
+    private final ConfigType type;
+    private final String documentation;
 
     /**
      * Create a configuration entry with the provided values.
@@ -65,7 +67,9 @@ public class ConfigEntry {
              isDefault ? ConfigSource.DEFAULT_CONFIG : ConfigSource.UNKNOWN,
              isSensitive,
              isReadOnly,
-             Collections.<ConfigSynonym>emptyList());
+             Collections.<ConfigSynonym>emptyList(),
+             ConfigType.UNKNOWN,
+             null);
     }
 
     /**
@@ -79,7 +83,7 @@ public class ConfigEntry {
      * @param synonyms Synonym configs in order of precedence
      */
     ConfigEntry(String name, String value, ConfigSource source, boolean isSensitive, boolean isReadOnly,
-                List<ConfigSynonym> synonyms) {
+                List<ConfigSynonym> synonyms, ConfigType type, String documentation) {
         Objects.requireNonNull(name, "name should not be null");
         this.name = name;
         this.value = value;
@@ -87,6 +91,8 @@ public class ConfigEntry {
         this.isSensitive = isSensitive;
         this.isReadOnly = isReadOnly;
         this.synonyms = synonyms;
+        this.type = type;
+        this.documentation = documentation;
     }
 
     /**
@@ -141,6 +147,20 @@ public class ConfigEntry {
         return  synonyms;
     }
 
+    /**
+     * Return the config data type.
+     */
+    public ConfigType type() {
+        return type;
+    }
+
+    /**
+     * Return the config documentation.
+     */
+    public String documentation() {
+        return documentation;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o)
@@ -183,6 +203,21 @@ public class ConfigEntry {
                 ")";
     }
 
+    /**
+     * Data type of configuration entry.
+     */
+    public enum ConfigType {
+        UNKNOWN,
+        BOOLEAN,
+        STRING,
+        INT,
+        SHORT,
+        LONG,
+        DOUBLE,
+        LIST,
+        CLASS,
+        PASSWORD
+    }
 
     /**
      * Source of configuration entries.
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
index 450cb82..bfb9c18 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
@@ -30,6 +30,7 @@ import java.util.Collection;
 public class DescribeConfigsOptions extends AbstractOptions<DescribeConfigsOptions> {
 
     private boolean includeSynonyms = false;
+    private boolean includeDocumentation = false;
 
     /**
      * Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
@@ -50,6 +51,13 @@ public class DescribeConfigsOptions extends AbstractOptions<DescribeConfigsOptio
     }
 
     /**
+     * Return true if config documentation should be returned in the response.
+     */
+    public boolean includeDocumentation() {
+        return includeDocumentation;
+    }
+
+    /**
      * Set to true if synonym configs should be returned in the response.
      */
     public DescribeConfigsOptions includeSynonyms(boolean includeSynonyms) {
@@ -57,4 +65,11 @@ public class DescribeConfigsOptions extends AbstractOptions<DescribeConfigsOptio
         return this;
     }
 
+    /**
+     * Set to true if config documentation should be returned in the response.
+     */
+    public DescribeConfigsOptions includeDocumentation(boolean includeDocumentation) {
+        this.includeDocumentation = includeDocumentation;
+        return this;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 52e5c37..a968eed 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1472,7 +1472,9 @@ public class KafkaAdminClient extends AdminClient {
                                                 configSource(DescribeConfigsResponse.ConfigSource.forId(config.configSource())),
                                                 config.isSensitive(),
                                                 config.readOnly(),
-                                                Collections.emptyList()))
+                                                Collections.emptyList(),
+                                                null,
+                                                null))
                                         .collect(Collectors.toSet()));
                                 topicMetadataAndConfig = new TopicMetadataAndConfig(result.numPartitions(),
                                         result.replicationFactor(),
@@ -1936,7 +1938,8 @@ public class KafkaAdminClient extends AdminClient {
                 @Override
                 DescribeConfigsRequest.Builder createRequest(int timeoutMs) {
                     return new DescribeConfigsRequest.Builder(unifiedRequestResources)
-                            .includeSynonyms(options.includeSynonyms());
+                            .includeSynonyms(options.includeSynonyms())
+                            .includeDocumentation(options.includeDocumentation());
                 }
 
                 @Override
@@ -1960,7 +1963,8 @@ public class KafkaAdminClient extends AdminClient {
                             configEntries.add(new ConfigEntry(configEntry.name(),
                                     configEntry.value(), configSource(configEntry.source()),
                                     configEntry.isSensitive(), configEntry.isReadOnly(),
-                                    configSynonyms(configEntry)));
+                                    configSynonyms(configEntry), configType(configEntry.type()),
+                                    configEntry.documentation()));
                         }
                         future.complete(new Config(configEntries));
                     }
@@ -1983,7 +1987,8 @@ public class KafkaAdminClient extends AdminClient {
                 @Override
                 DescribeConfigsRequest.Builder createRequest(int timeoutMs) {
                     return new DescribeConfigsRequest.Builder(Collections.singleton(resource))
-                            .includeSynonyms(options.includeSynonyms());
+                            .includeSynonyms(options.includeSynonyms())
+                            .includeDocumentation(options.includeDocumentation());
                 }
 
                 @Override
@@ -2003,7 +2008,7 @@ public class KafkaAdminClient extends AdminClient {
                         for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) {
                             configEntries.add(new ConfigEntry(configEntry.name(), configEntry.value(),
                                 configSource(configEntry.source()), configEntry.isSensitive(), configEntry.isReadOnly(),
-                                configSynonyms(configEntry)));
+                                configSynonyms(configEntry), configType(configEntry.type()), configEntry.documentation()));
                         }
                         brokerFuture.complete(new Config(configEntries));
                     }
@@ -2056,6 +2061,46 @@ public class KafkaAdminClient extends AdminClient {
         return configSource;
     }
 
+    private ConfigEntry.ConfigType configType(DescribeConfigsResponse.ConfigType type) {
+        if (type == null) {
+            return ConfigEntry.ConfigType.UNKNOWN;
+        }
+
+        ConfigEntry.ConfigType configType;
+        switch (type) {
+            case BOOLEAN:
+                configType = ConfigEntry.ConfigType.BOOLEAN;
+                break;
+            case CLASS:
+                configType = ConfigEntry.ConfigType.CLASS;
+                break;
+            case DOUBLE:
+                configType = ConfigEntry.ConfigType.DOUBLE;
+                break;
+            case INT:
+                configType = ConfigEntry.ConfigType.INT;
+                break;
+            case LIST:
+                configType = ConfigEntry.ConfigType.LIST;
+                break;
+            case LONG:
+                configType = ConfigEntry.ConfigType.LONG;
+                break;
+            case PASSWORD:
+                configType = ConfigEntry.ConfigType.PASSWORD;
+                break;
+            case SHORT:
+                configType = ConfigEntry.ConfigType.SHORT;
+                break;
+            case STRING:
+                configType = ConfigEntry.ConfigType.STRING;
+                break;
+            default:
+                configType = ConfigEntry.ConfigType.UNKNOWN;
+        }
+        return configType;
+    }
+
     @Override
     @Deprecated
     public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options) {
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index e28b8c9..968c549 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -201,6 +201,13 @@ public class AbstractConfig {
         return configKey.type;
     }
 
+    public String documentationOf(String key) {
+        ConfigDef.ConfigKey configKey = definition.configKeys().get(key);
+        if (configKey == null)
+            return null;
+        return configKey.documentation;
+    }
+
     public Password getPassword(String key) {
         return (Password) get(key);
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
index 4bcc380..8ea7630 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
@@ -43,6 +43,7 @@ public class DescribeConfigsRequest extends AbstractRequest {
     private static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
     private static final String RESOURCE_NAME_KEY_NAME = "resource_name";
     private static final String CONFIG_NAMES_KEY_NAME = "config_names";
+    private static final String INCLUDE_DOCUMENTATION = "include_documentation";
 
     private static final Schema DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0 = new Schema(
             new Field(RESOURCE_TYPE_KEY_NAME, INT8),
@@ -61,13 +62,24 @@ public class DescribeConfigsRequest extends AbstractRequest {
      */
     private static final Schema DESCRIBE_CONFIGS_REQUEST_V2 = DESCRIBE_CONFIGS_REQUEST_V1;
 
+    private static final Schema DESCRIBE_CONFIGS_REQUEST_V3 = new Schema(
+            new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0), "An array of config resources to be returned."),
+            new Field(INCLUDE_SYNONYMS, BOOLEAN),
+            new Field(INCLUDE_DOCUMENTATION, BOOLEAN));
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{DESCRIBE_CONFIGS_REQUEST_V0, DESCRIBE_CONFIGS_REQUEST_V1, DESCRIBE_CONFIGS_REQUEST_V2};
+        return new Schema[] {
+            DESCRIBE_CONFIGS_REQUEST_V0,
+            DESCRIBE_CONFIGS_REQUEST_V1,
+            DESCRIBE_CONFIGS_REQUEST_V2,
+            DESCRIBE_CONFIGS_REQUEST_V3
+        };
     }
 
     public static class Builder extends AbstractRequest.Builder<DescribeConfigsRequest> {
         private final Map<ConfigResource, Collection<String>> resourceToConfigNames;
         private boolean includeSynonyms;
+        private boolean includeDocumentation;
 
         public Builder(Map<ConfigResource, Collection<String>> resourceToConfigNames) {
             super(ApiKeys.DESCRIBE_CONFIGS);
@@ -79,6 +91,11 @@ public class DescribeConfigsRequest extends AbstractRequest {
             return this;
         }
 
+        public Builder includeDocumentation(boolean includeDocumentation) {
+            this.includeDocumentation = includeDocumentation;
+            return this;
+        }
+
         public Builder(Collection<ConfigResource> resources) {
             this(toResourceToConfigNames(resources));
         }
@@ -92,17 +109,27 @@ public class DescribeConfigsRequest extends AbstractRequest {
 
         @Override
         public DescribeConfigsRequest build(short version) {
-            return new DescribeConfigsRequest(version, resourceToConfigNames, includeSynonyms);
+            return new DescribeConfigsRequest(
+                version, resourceToConfigNames, includeSynonyms, includeDocumentation);
         }
     }
 
     private final Map<ConfigResource, Collection<String>> resourceToConfigNames;
     private final boolean includeSynonyms;
+    private final boolean includeDocumentation;
 
-    public DescribeConfigsRequest(short version, Map<ConfigResource, Collection<String>> resourceToConfigNames, boolean includeSynonyms) {
+    public DescribeConfigsRequest(
+        short version, Map<ConfigResource, Collection<String>> resourceToConfigNames,
+        boolean includeSynonyms) {
+        this(version, resourceToConfigNames, includeSynonyms, false);
+    }
+    public DescribeConfigsRequest(
+        short version, Map<ConfigResource, Collection<String>> resourceToConfigNames,
+        boolean includeSynonyms, boolean includeDocumentation) {
         super(ApiKeys.DESCRIBE_CONFIGS, version);
         this.resourceToConfigNames = Objects.requireNonNull(resourceToConfigNames, "resourceToConfigNames");
         this.includeSynonyms = includeSynonyms;
+        this.includeDocumentation = includeDocumentation;
     }
 
     public DescribeConfigsRequest(Struct struct, short version) {
@@ -125,6 +152,7 @@ public class DescribeConfigsRequest extends AbstractRequest {
             resourceToConfigNames.put(new ConfigResource(resourceType, resourceName), configNames);
         }
         this.includeSynonyms = struct.hasField(INCLUDE_SYNONYMS) ? struct.getBoolean(INCLUDE_SYNONYMS) : false;
+        this.includeDocumentation = struct.hasField(INCLUDE_DOCUMENTATION) ? struct.getBoolean(INCLUDE_DOCUMENTATION) : false;
     }
 
     public Collection<ConfigResource> resources() {
@@ -142,6 +170,10 @@ public class DescribeConfigsRequest extends AbstractRequest {
         return includeSynonyms;
     }
 
+    public boolean includeDocumentation() {
+        return includeDocumentation;
+    }
+
     @Override
     protected Struct toStruct() {
         Struct struct = new Struct(ApiKeys.DESCRIBE_CONFIGS.requestSchema(version()));
@@ -159,6 +191,7 @@ public class DescribeConfigsRequest extends AbstractRequest {
         }
         struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray(new Struct[0]));
         struct.setIfExists(INCLUDE_SYNONYMS, includeSynonyms);
+        struct.setIfExists(INCLUDE_DOCUMENTATION, includeDocumentation);
         return struct;
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
index c24bbe6..bd46bc3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
@@ -56,6 +56,8 @@ public class DescribeConfigsResponse extends AbstractResponse {
     private static final String IS_SENSITIVE_KEY_NAME = "is_sensitive";
     private static final String IS_DEFAULT_KEY_NAME = "is_default";
     private static final String READ_ONLY_KEY_NAME = "read_only";
+    private static final String CONFIG_TYPE_KEY_NAME = "config_type";
+    private static final String CONFIG_DOCUMENTATION_KEY_NAME = "config_documentation";
 
     private static final String CONFIG_SYNONYMS_KEY_NAME = "config_synonyms";
     private static final String CONFIG_SOURCE_KEY_NAME = "config_source";
@@ -80,6 +82,16 @@ public class DescribeConfigsResponse extends AbstractResponse {
             new Field(IS_SENSITIVE_KEY_NAME, BOOLEAN),
             new Field(CONFIG_SYNONYMS_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_SYNONYM_V1)));
 
+    private static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTRY_V3 = new Schema(
+            new Field(CONFIG_NAME_KEY_NAME, STRING),
+            new Field(CONFIG_VALUE_KEY_NAME, NULLABLE_STRING),
+            new Field(READ_ONLY_KEY_NAME, BOOLEAN),
+            new Field(CONFIG_SOURCE_KEY_NAME, INT8),
+            new Field(IS_SENSITIVE_KEY_NAME, BOOLEAN),
+            new Field(CONFIG_SYNONYMS_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_SYNONYM_V1)),
+            new Field(CONFIG_TYPE_KEY_NAME, INT8),
+            new Field(CONFIG_DOCUMENTATION_KEY_NAME, NULLABLE_STRING));
+
     private static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTITY_V0 = new Schema(
             ERROR_CODE,
             ERROR_MESSAGE,
@@ -94,6 +106,13 @@ public class DescribeConfigsResponse extends AbstractResponse {
             new Field(RESOURCE_NAME_KEY_NAME, STRING),
             new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTRY_V1)));
 
+    private static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTITY_V3 = new Schema(
+            ERROR_CODE,
+            ERROR_MESSAGE,
+            new Field(RESOURCE_TYPE_KEY_NAME, INT8),
+            new Field(RESOURCE_NAME_KEY_NAME, STRING),
+            new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTRY_V3)));
+
     private static final Schema DESCRIBE_CONFIGS_RESPONSE_V0 = new Schema(
             THROTTLE_TIME_MS,
             new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTITY_V0)));
@@ -102,13 +121,22 @@ public class DescribeConfigsResponse extends AbstractResponse {
             THROTTLE_TIME_MS,
             new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTITY_V1)));
 
+    private static final Schema DESCRIBE_CONFIGS_RESPONSE_V3 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTITY_V3)));
+
     /**
      * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
      */
     private static final Schema DESCRIBE_CONFIGS_RESPONSE_V2 = DESCRIBE_CONFIGS_RESPONSE_V1;
 
     public static Schema[] schemaVersions() {
-        return new Schema[]{DESCRIBE_CONFIGS_RESPONSE_V0, DESCRIBE_CONFIGS_RESPONSE_V1, DESCRIBE_CONFIGS_RESPONSE_V2};
+        return new Schema[]{
+            DESCRIBE_CONFIGS_RESPONSE_V0,
+            DESCRIBE_CONFIGS_RESPONSE_V1,
+            DESCRIBE_CONFIGS_RESPONSE_V2,
+            DESCRIBE_CONFIGS_RESPONSE_V3
+        };
     }
 
     public static class Config {
@@ -136,9 +164,16 @@ public class DescribeConfigsResponse extends AbstractResponse {
         private final ConfigSource source;
         private final boolean readOnly;
         private final Collection<ConfigSynonym> synonyms;
+        private final ConfigType type;
+        private final String documentation;
 
         public ConfigEntry(String name, String value, ConfigSource source, boolean isSensitive, boolean readOnly,
-                           Collection<ConfigSynonym> synonyms) {
+            Collection<ConfigSynonym> synonyms) {
+            this(name, value, source, isSensitive, readOnly, synonyms, ConfigType.UNKNOWN, null);
+        }
+
+        public ConfigEntry(String name, String value, ConfigSource source, boolean isSensitive, boolean readOnly,
+                           Collection<ConfigSynonym> synonyms, ConfigType type, String documentation) {
 
             this.name = Objects.requireNonNull(name, "name");
             this.value = value;
@@ -146,6 +181,8 @@ public class DescribeConfigsResponse extends AbstractResponse {
             this.isSensitive = isSensitive;
             this.readOnly = readOnly;
             this.synonyms = Objects.requireNonNull(synonyms, "synonyms");
+            this.type = type;
+            this.documentation = documentation;
         }
 
         public String name() {
@@ -171,6 +208,14 @@ public class DescribeConfigsResponse extends AbstractResponse {
         public Collection<ConfigSynonym> synonyms() {
             return synonyms;
         }
+
+        public ConfigType type() {
+            return type;
+        }
+
+        public String documentation() {
+            return documentation;
+        }
     }
 
     public enum ConfigSource {
@@ -198,6 +243,34 @@ public class DescribeConfigsResponse extends AbstractResponse {
         }
     }
 
+    public enum ConfigType {
+        UNKNOWN((byte) 0),
+        BOOLEAN((byte) 1),
+        STRING((byte) 2),
+        INT((byte) 3),
+        SHORT((byte) 4),
+        LONG((byte) 5),
+        DOUBLE((byte) 6),
+        LIST((byte) 7),
+        CLASS((byte) 8),
+        PASSWORD((byte) 9);
+
+        final byte id;
+        private static final ConfigType[] VALUES = values();
+
+        ConfigType(byte id) {
+            this.id = id;
+        }
+
+        public static ConfigType forId(byte id) {
+            if (id < 0)
+                throw new IllegalArgumentException("id should be positive, id: " + id);
+            if (id >= VALUES.length)
+                return UNKNOWN;
+            return VALUES[id];
+        }
+    }
+
     public static class ConfigSynonym {
         private final String name;
         private final String value;
@@ -248,6 +321,14 @@ public class DescribeConfigsResponse extends AbstractResponse {
                 String configName = configEntriesStruct.getString(CONFIG_NAME_KEY_NAME);
                 String configValue = configEntriesStruct.getString(CONFIG_VALUE_KEY_NAME);
                 boolean isSensitive = configEntriesStruct.getBoolean(IS_SENSITIVE_KEY_NAME);
+                ConfigType type = ConfigType.UNKNOWN;
+                if (configEntriesStruct.hasField(CONFIG_TYPE_KEY_NAME)) {
+                    type = ConfigType.forId(configEntriesStruct.getByte(CONFIG_TYPE_KEY_NAME));
+                }
+                String documentation = null;
+                if (configEntriesStruct.hasField(CONFIG_DOCUMENTATION_KEY_NAME)) {
+                    documentation = configEntriesStruct.getString(CONFIG_DOCUMENTATION_KEY_NAME);
+                }
                 ConfigSource configSource;
                 if (configEntriesStruct.hasField(CONFIG_SOURCE_KEY_NAME))
                     configSource = ConfigSource.forId(configEntriesStruct.getByte(CONFIG_SOURCE_KEY_NAME));
@@ -281,9 +362,10 @@ public class DescribeConfigsResponse extends AbstractResponse {
                         ConfigSource source = ConfigSource.forId(synonymStruct.getByte(CONFIG_SOURCE_KEY_NAME));
                         synonyms.add(new ConfigSynonym(synonymConfigName, synonymConfigValue, source));
                     }
-                } else
+                } else {
                     synonyms = Collections.emptyList();
-                configEntries.add(new ConfigEntry(configName, configValue, configSource, isSensitive, readOnly, synonyms));
+                }
+                configEntries.add(new ConfigEntry(configName, configValue, configSource, isSensitive, readOnly, synonyms, type, documentation));
             }
             Config config = new Config(error, configEntries);
             configs.put(resource, config);
@@ -336,6 +418,10 @@ public class DescribeConfigsResponse extends AbstractResponse {
                 configEntriesStruct.setIfExists(CONFIG_SOURCE_KEY_NAME, configEntry.source.id);
                 configEntriesStruct.setIfExists(IS_DEFAULT_KEY_NAME, configEntry.source == ConfigSource.DEFAULT_CONFIG);
                 configEntriesStruct.set(READ_ONLY_KEY_NAME, configEntry.readOnly);
+                if (configEntriesStruct.hasField(CONFIG_TYPE_KEY_NAME) && configEntry.type != null) {
+                    configEntriesStruct.set(CONFIG_TYPE_KEY_NAME, configEntry.type.id);
+                }
+                configEntriesStruct.setIfExists(CONFIG_DOCUMENTATION_KEY_NAME, configEntry.documentation);
                 configEntryStructs.add(configEntriesStruct);
                 if (configEntriesStruct.hasField(CONFIG_SYNONYMS_KEY_NAME)) {
                     List<Struct> configSynonymStructs = new ArrayList<>(configEntry.synonyms.size());
diff --git a/clients/src/main/resources/common/message/DescribeConfigsRequest.json b/clients/src/main/resources/common/message/DescribeConfigsRequest.json
index 5d4d13b..1438a36 100644
--- a/clients/src/main/resources/common/message/DescribeConfigsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeConfigsRequest.json
@@ -17,9 +17,9 @@
   "apiKey": 32,
   "type": "request",
   "name": "DescribeConfigsRequest",
-  // Version 1 adds IncludeSynoyms.
+  // Version 1 adds IncludeSynonyms.
   // Version 2 is the same as version 1.
-  "validVersions": "0-2",
+  "validVersions": "0-3",
   "flexibleVersions": "none",
   "fields": [
     { "name": "Resources", "type": "[]DescribeConfigsResource", "versions": "0+",
@@ -31,7 +31,9 @@
       { "name": "ConfigurationKeys", "type": "[]string", "versions": "0+", "nullableVersions": "0+",
         "about": "The configuration keys to list, or null to list all configuration keys." }
     ]},
-    { "name": "IncludeSynoyms", "type": "bool", "versions": "1+", "default": "false", "ignorable": false,
-      "about": "True if we should include all synonyms." }
+    { "name": "IncludeSynonyms", "type": "bool", "versions": "1+", "default": "false", "ignorable": false,
+      "about": "True if we should include all synonyms." },
+    { "name": "IncludeDocumentation", "type": "bool", "versions": "3+", "default": "false", "ignorable": false,
+      "about": "True if we should include configuration documentation." }
   ]
 }
diff --git a/clients/src/main/resources/common/message/DescribeConfigsResponse.json b/clients/src/main/resources/common/message/DescribeConfigsResponse.json
index 82e44f7..a70c959 100644
--- a/clients/src/main/resources/common/message/DescribeConfigsResponse.json
+++ b/clients/src/main/resources/common/message/DescribeConfigsResponse.json
@@ -19,7 +19,7 @@
   "name": "DescribeConfigsResponse",
   // Version 1 adds ConfigSource and the synonyms.
   // Starting in version 2, on quota violation, brokers send out responses before throttling.
-  "validVersions": "0-2",
+  "validVersions": "0-3",
   "flexibleVersions": "none",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
@@ -59,7 +59,11 @@
             "about": "The synonym value." },
           { "name": "Source", "type": "int8", "versions": "1+",
             "about": "The synonym source." }
-        ]}
+        ]},
+        { "name": "ConfigType", "type": "int8", "versions": "3+", "default": "0", "ignorable": true,
+          "about": "The configuration data type. Type can be one of the following values - BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS, PASSWORD" },
+        { "name": "Documentation", "type": "string", "versions": "3+", "nullableVersions": "0+", "ignorable": true,
+          "about": "The configuration documentation." }
       ]}
     ]}
   ]
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java
index 1146a35..ed4c0bd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.clients.admin;
 
+import org.apache.kafka.clients.admin.ConfigEntry.ConfigType;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -92,6 +93,6 @@ public class ConfigTest {
 
     public static ConfigEntry newConfigEntry(String name, String value, ConfigEntry.ConfigSource source, boolean isSensitive,
                                              boolean isReadOnly, List<ConfigEntry.ConfigSynonym> synonyms) {
-        return new ConfigEntry(name, value, source, isSensitive, isReadOnly, synonyms);
+        return new ConfigEntry(name, value, source, isSensitive, isReadOnly, synonyms, ConfigType.UNKNOWN, null);
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index d395a87..73f83d7 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -482,6 +482,25 @@ public class AbstractConfigTest {
         assertEquals(config.originals().get("sasl.truststore.location"), "/usr/vault");
     }
 
+    @Test
+    public void testDocumentationOf() {
+        Properties props = new Properties();
+        TestIndirectConfigResolution config = new TestIndirectConfigResolution(props);
+
+        assertEquals(
+            config.documentationOf(TestIndirectConfigResolution.INDIRECT_CONFIGS),
+            TestIndirectConfigResolution.INDIRECT_CONFIGS_DOC
+        );
+    }
+
+    @Test
+    public void testDocumentationOfExpectNull() {
+        Properties props = new Properties();
+        TestIndirectConfigResolution config = new TestIndirectConfigResolution(props);
+
+        assertNull(config.documentationOf("xyz"));
+    }
+
     private static class TestIndirectConfigResolution extends AbstractConfig {
 
         private static final ConfigDef CONFIG;
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index c712c18..c508dbc 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -144,6 +144,7 @@ import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.requests.CreateTopicsRequest.Builder;
+import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigType;
 import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
 import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
@@ -434,6 +435,9 @@ public class RequestResponseTest {
         checkResponse(createDescribeConfigsResponse(), 0, false);
         checkRequest(createDescribeConfigsRequest(1), true);
         checkRequest(createDescribeConfigsRequestWithConfigEntries(1), false);
+        checkRequest(createDescribeConfigsRequestWithDocumentation(1), false);
+        checkRequest(createDescribeConfigsRequestWithDocumentation(2), false);
+        checkRequest(createDescribeConfigsRequestWithDocumentation(3), false);
         checkErrorResponse(createDescribeConfigsRequest(1), new UnknownServerException(), true);
         checkResponse(createDescribeConfigsResponse(), 1, false);
         checkDescribeConfigsResponseVersions();
@@ -498,7 +502,12 @@ public class RequestResponseTest {
                 assertEquals(expectedEntry.value(), entry.value());
                 assertEquals(expectedEntry.isReadOnly(), entry.isReadOnly());
                 assertEquals(expectedEntry.isSensitive(), entry.isSensitive());
-                if (version == 1 || (expectedEntry.source() != DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_CONFIG &&
+                if (version < 3) {
+                    assertEquals(ConfigType.UNKNOWN, entry.type());
+                } else {
+                    assertEquals(expectedEntry.type(), entry.type());
+                }
+                if (version == 1 || version == 3 || (expectedEntry.source() != DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_CONFIG &&
                         expectedEntry.source() != DescribeConfigsResponse.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG))
                     assertEquals(expectedEntry.source(), entry.source());
                 else
@@ -516,6 +525,10 @@ public class RequestResponseTest {
         DescribeConfigsResponse deserialized1 = (DescribeConfigsResponse) deserialize(response,
                 response.toStruct((short) 1), (short) 1);
         verifyDescribeConfigsResponse(response, deserialized1, 1);
+
+        DescribeConfigsResponse deserialized3 = (DescribeConfigsResponse) deserialize(response,
+            response.toStruct((short) 3), (short) 3);
+        verifyDescribeConfigsResponse(response, deserialized3, 3);
     }
 
     private void checkErrorResponse(AbstractRequest req, Throwable e, boolean checkEqualityAndHashCode) {
@@ -1873,6 +1886,12 @@ public class RequestResponseTest {
         return new DescribeConfigsRequest.Builder(resources).build((short) version);
     }
 
+    private DescribeConfigsRequest createDescribeConfigsRequestWithDocumentation(int version) {
+        Map<ConfigResource, Collection<String>> namesByResource = new HashMap<>();
+        namesByResource.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), asList("foo", "bar"));
+        return new DescribeConfigsRequest.Builder(namesByResource).includeDocumentation(true).build((short) version); // builder flag set to true
+    }
+
     private DescribeConfigsResponse createDescribeConfigsResponse() {
         Map<ConfigResource, DescribeConfigsResponse.Config> configs = new HashMap<>();
         List<DescribeConfigsResponse.ConfigSynonym> synonyms = emptyList();
@@ -1880,7 +1899,10 @@ public class RequestResponseTest {
                 new DescribeConfigsResponse.ConfigEntry("config_name", "config_value",
                         DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_CONFIG, true, false, synonyms),
                 new DescribeConfigsResponse.ConfigEntry("another_name", "another value",
-                        DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG, false, true, synonyms)
+                        DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG, false, true, synonyms),
+                new DescribeConfigsResponse.ConfigEntry("yet_another_name", "yet another value",
+                        DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG, false, true, synonyms,
+                            ConfigType.BOOLEAN, "some description")
         );
         configs.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), new DescribeConfigsResponse.Config(
                 ApiError.NONE, configEntries));
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 3330ee9..183a5d3 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -164,7 +164,7 @@ class AdminManager(val config: KafkaConfig,
         // For responses with DescribeConfigs permission, populate metadata and configs
         includeConfigsAndMetatadata.get(topic.name).foreach { result =>
           val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), configs)
-          val createEntry = createTopicConfigEntry(logConfig, configs, includeSynonyms = false)(_, _)
+          val createEntry = createTopicConfigEntry(logConfig, configs, includeSynonyms = false, includeDocumentation = false)(_, _)
           val topicConfigs = logConfig.values.asScala.map { case (k, v) =>
             val entry = createEntry(k, v)
             val source = ConfigSource.values.indices.map(_.toByte)
@@ -347,7 +347,7 @@ class AdminManager(val config: KafkaConfig,
     }
   }
 
-  def describeConfigs(resourceToConfigNames: Map[ConfigResource, Option[Set[String]]], includeSynonyms: Boolean): Map[ConfigResource, DescribeConfigsResponse.Config] = {
+  def describeConfigs(resourceToConfigNames: Map[ConfigResource, Option[Set[String]]], includeSynonyms: Boolean, includeDocumentation: Boolean): Map[ConfigResource, DescribeConfigsResponse.Config] = {
     resourceToConfigNames.map { case (resource, configNames) =>
 
       def allConfigs(config: AbstractConfig) = {
@@ -374,7 +374,7 @@ class AdminManager(val config: KafkaConfig,
               // Consider optimizing this by caching the configs or retrieving them from the `Log` when possible
               val topicProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
               val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), topicProps)
-              createResponseConfig(allConfigs(logConfig), createTopicConfigEntry(logConfig, topicProps, includeSynonyms))
+              createResponseConfig(allConfigs(logConfig), createTopicConfigEntry(logConfig, topicProps, includeSynonyms, includeDocumentation))
             } else {
               new DescribeConfigsResponse.Config(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, null), Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
             }
@@ -382,10 +382,10 @@ class AdminManager(val config: KafkaConfig,
           case ConfigResource.Type.BROKER =>
             if (resource.name == null || resource.name.isEmpty)
               createResponseConfig(config.dynamicConfig.currentDynamicDefaultConfigs,
-                createBrokerConfigEntry(perBrokerConfig = false, includeSynonyms))
+                createBrokerConfigEntry(perBrokerConfig = false, includeSynonyms, includeDocumentation))
             else if (resourceNameToBrokerId(resource.name) == config.brokerId)
               createResponseConfig(allConfigs(config),
-                createBrokerConfigEntry(perBrokerConfig = true, includeSynonyms))
+                createBrokerConfigEntry(perBrokerConfig = true, includeSynonyms, includeDocumentation))
             else
               throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} or empty string, but received ${resource.name}")
 
@@ -658,6 +658,27 @@ class AdminManager(val config: KafkaConfig,
     DynamicBrokerConfig.brokerConfigSynonyms(name, matchListenerOverride = true)
   }
 
+  // Documentation for `name` via this manager's `config`. NOTE(review): createTopicConfigEntry also passes topic-level names here - confirm they resolve.
+  private def brokerDocumentation(name: String): String =
+    config.documentationOf(name)
+
+  // Translates the optional ConfigDef.Type of a config into the ConfigType carried by
+  // DescribeConfigsResponse entries, matching on the Option directly instead of isEmpty/get.
+  private def configResponseType(configType: Option[ConfigDef.Type]): DescribeConfigsResponse.ConfigType =
+    configType match {
+      case Some(ConfigDef.Type.BOOLEAN) => DescribeConfigsResponse.ConfigType.BOOLEAN
+      case Some(ConfigDef.Type.STRING) => DescribeConfigsResponse.ConfigType.STRING
+      case Some(ConfigDef.Type.INT) => DescribeConfigsResponse.ConfigType.INT
+      case Some(ConfigDef.Type.SHORT) => DescribeConfigsResponse.ConfigType.SHORT
+      case Some(ConfigDef.Type.LONG) => DescribeConfigsResponse.ConfigType.LONG
+      case Some(ConfigDef.Type.DOUBLE) => DescribeConfigsResponse.ConfigType.DOUBLE
+      case Some(ConfigDef.Type.LIST) => DescribeConfigsResponse.ConfigType.LIST
+      case Some(ConfigDef.Type.CLASS) => DescribeConfigsResponse.ConfigType.CLASS
+      case Some(ConfigDef.Type.PASSWORD) => DescribeConfigsResponse.ConfigType.PASSWORD
+      // None, or a ConfigDef.Type that has no ConfigType counterpart listed above.
+      case _ => DescribeConfigsResponse.ConfigType.UNKNOWN
+    }
+
   private def configSynonyms(name: String, synonyms: List[String], isSensitive: Boolean): List[DescribeConfigsResponse.ConfigSynonym] = {
     val dynamicConfig = config.dynamicConfig
     val allSynonyms = mutable.Buffer[DescribeConfigsResponse.ConfigSynonym]()
@@ -676,7 +697,7 @@ class AdminManager(val config: KafkaConfig,
     allSynonyms.dropWhile(s => s.name != name).toList // e.g. drop listener overrides when describing base config
   }
 
-  private def createTopicConfigEntry(logConfig: LogConfig, topicProps: Properties, includeSynonyms: Boolean)
+  private def createTopicConfigEntry(logConfig: LogConfig, topicProps: Properties, includeSynonyms: Boolean, includeDocumentation: Boolean)
                                     (name: String, value: Any): DescribeConfigsResponse.ConfigEntry = {
     val configEntryType = LogConfig.configType(name)
     val isSensitive = KafkaConfig.maybeSensitive(configEntryType)
@@ -692,10 +713,12 @@ class AdminManager(val config: KafkaConfig,
     }
     val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG else allSynonyms.head.source
     val synonyms = if (!includeSynonyms) List.empty else allSynonyms
-    new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, false, synonyms.asJava)
+    val dataType = configResponseType(configEntryType)
+    val configDocumentation = if (includeDocumentation) brokerDocumentation(name) else null
+    new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, false, synonyms.asJava, dataType, configDocumentation)
   }
 
-  private def createBrokerConfigEntry(perBrokerConfig: Boolean, includeSynonyms: Boolean)
+  private def createBrokerConfigEntry(perBrokerConfig: Boolean, includeSynonyms: Boolean, includeDocumentation: Boolean)
                                      (name: String, value: Any): DescribeConfigsResponse.ConfigEntry = {
     val allNames = brokerSynonyms(name)
     val configEntryType = KafkaConfig.configType(name)
@@ -711,7 +734,9 @@ class AdminManager(val config: KafkaConfig,
     val synonyms = if (!includeSynonyms) List.empty else allSynonyms
     val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG else allSynonyms.head.source
     val readOnly = !DynamicBrokerConfig.AllDynamicConfigs.contains(name)
-    new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, readOnly, synonyms.asJava)
+    val dataType = configResponseType(configEntryType)
+    val configDocumentation = if (includeDocumentation) brokerDocumentation(name) else null
+    new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, readOnly, synonyms.asJava, dataType, configDocumentation)
   }
 
   private def sanitizeEntityName(entityName: String): String =
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 28e20cd..86649c1 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2579,7 +2579,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
     val authorizedConfigs = adminManager.describeConfigs(authorizedResources.map { resource =>
       resource -> Option(describeConfigsRequest.configNames(resource)).map(_.asScala.toSet)
-    }.toMap, describeConfigsRequest.includeSynonyms)
+    }.toMap, describeConfigsRequest.includeSynonyms, describeConfigsRequest.includeDocumentation)
     val unauthorizedConfigs = unauthorizedResources.map { resource =>
       val error = configsAuthorizationApiError(resource)
       resource -> new DescribeConfigsResponse.Config(error, util.Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 0b9c4a4..9da7319 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -257,7 +257,7 @@ class KafkaApisTest {
 
     val configResource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName)
     val config = new DescribeConfigsResponse.Config(ApiError.NONE, Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
-    EasyMock.expect(adminManager.describeConfigs(anyObject(), EasyMock.eq(true)))
+    EasyMock.expect(adminManager.describeConfigs(anyObject(), EasyMock.eq(true), EasyMock.eq(false)))
         .andReturn(Map(configResource -> config))
 
     EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,