You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2020/05/29 22:29:55 UTC
[kafka] branch 2.6 updated: KAFKA-9494;
Include additional metadata information in DescribeConfig response
(KIP-569) (#8723)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 2965c9c KAFKA-9494; Include additional metadata information in DescribeConfig response (KIP-569) (#8723)
2965c9c is described below
commit 2965c9c7b44fef6ae953e69f07394cf3b5f6d120
Author: Shailesh Panwar <52...@users.noreply.github.com>
AuthorDate: Fri May 29 15:18:50 2020 -0700
KAFKA-9494; Include additional metadata information in DescribeConfig response (KIP-569) (#8723)
Adds documentation and type of ConfigEntry in version 3 of DescribeConfigsResponse
Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
.../apache/kafka/clients/admin/ConfigEntry.java | 39 ++++++++-
.../clients/admin/DescribeConfigsOptions.java | 15 ++++
.../kafka/clients/admin/KafkaAdminClient.java | 55 +++++++++++--
.../apache/kafka/common/config/AbstractConfig.java | 7 ++
.../common/requests/DescribeConfigsRequest.java | 39 ++++++++-
.../common/requests/DescribeConfigsResponse.java | 94 +++++++++++++++++++++-
.../common/message/DescribeConfigsRequest.json | 10 ++-
.../common/message/DescribeConfigsResponse.json | 8 +-
.../org/apache/kafka/clients/admin/ConfigTest.java | 3 +-
.../kafka/common/config/AbstractConfigTest.java | 19 +++++
.../kafka/common/requests/RequestResponseTest.java | 26 +++++-
.../src/main/scala/kafka/server/AdminManager.scala | 43 +++++++---
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 2 +-
14 files changed, 328 insertions(+), 34 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
index 42cc627..b6d947f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
@@ -37,6 +37,8 @@ public class ConfigEntry {
private final boolean isSensitive;
private final boolean isReadOnly;
private final List<ConfigSynonym> synonyms;
+ private final ConfigType type;
+ private final String documentation;
/**
* Create a configuration entry with the provided values.
@@ -65,7 +67,9 @@ public class ConfigEntry {
isDefault ? ConfigSource.DEFAULT_CONFIG : ConfigSource.UNKNOWN,
isSensitive,
isReadOnly,
- Collections.<ConfigSynonym>emptyList());
+ Collections.<ConfigSynonym>emptyList(),
+ ConfigType.UNKNOWN,
+ null);
}
/**
@@ -79,7 +83,7 @@ public class ConfigEntry {
* @param synonyms Synonym configs in order of precedence
*/
ConfigEntry(String name, String value, ConfigSource source, boolean isSensitive, boolean isReadOnly,
- List<ConfigSynonym> synonyms) {
+ List<ConfigSynonym> synonyms, ConfigType type, String documentation) {
Objects.requireNonNull(name, "name should not be null");
this.name = name;
this.value = value;
@@ -87,6 +91,8 @@ public class ConfigEntry {
this.isSensitive = isSensitive;
this.isReadOnly = isReadOnly;
this.synonyms = synonyms;
+ this.type = type;
+ this.documentation = documentation;
}
/**
@@ -141,6 +147,20 @@ public class ConfigEntry {
return synonyms;
}
+ /**
+ * Return the config data type.
+ */
+ public ConfigType type() {
+ return type;
+ }
+
+ /**
+ * Return the config documentation.
+ */
+ public String documentation() {
+ return documentation;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o)
@@ -183,6 +203,21 @@ public class ConfigEntry {
")";
}
+ /**
+ * Data type of configuration entry.
+ */
+ public enum ConfigType {
+ UNKNOWN,
+ BOOLEAN,
+ STRING,
+ INT,
+ SHORT,
+ LONG,
+ DOUBLE,
+ LIST,
+ CLASS,
+ PASSWORD
+ }
/**
* Source of configuration entries.
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
index 450cb82..bfb9c18 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
@@ -30,6 +30,7 @@ import java.util.Collection;
public class DescribeConfigsOptions extends AbstractOptions<DescribeConfigsOptions> {
private boolean includeSynonyms = false;
+ private boolean includeDocumentation = false;
/**
* Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
@@ -50,6 +51,13 @@ public class DescribeConfigsOptions extends AbstractOptions<DescribeConfigsOptio
}
/**
+ * Return true if config documentation should be returned in the response.
+ */
+ public boolean includeDocumentation() {
+ return includeDocumentation;
+ }
+
+ /**
* Set to true if synonym configs should be returned in the response.
*/
public DescribeConfigsOptions includeSynonyms(boolean includeSynonyms) {
@@ -57,4 +65,11 @@ public class DescribeConfigsOptions extends AbstractOptions<DescribeConfigsOptio
return this;
}
+ /**
+ * Set to true if config documentation should be returned in the response.
+ */
+ public DescribeConfigsOptions includeDocumentation(boolean includeDocumentation) {
+ this.includeDocumentation = includeDocumentation;
+ return this;
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 52e5c37..a968eed 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1472,7 +1472,9 @@ public class KafkaAdminClient extends AdminClient {
configSource(DescribeConfigsResponse.ConfigSource.forId(config.configSource())),
config.isSensitive(),
config.readOnly(),
- Collections.emptyList()))
+ Collections.emptyList(),
+ null,
+ null))
.collect(Collectors.toSet()));
topicMetadataAndConfig = new TopicMetadataAndConfig(result.numPartitions(),
result.replicationFactor(),
@@ -1936,7 +1938,8 @@ public class KafkaAdminClient extends AdminClient {
@Override
DescribeConfigsRequest.Builder createRequest(int timeoutMs) {
return new DescribeConfigsRequest.Builder(unifiedRequestResources)
- .includeSynonyms(options.includeSynonyms());
+ .includeSynonyms(options.includeSynonyms())
+ .includeDocumentation(options.includeDocumentation());
}
@Override
@@ -1960,7 +1963,8 @@ public class KafkaAdminClient extends AdminClient {
configEntries.add(new ConfigEntry(configEntry.name(),
configEntry.value(), configSource(configEntry.source()),
configEntry.isSensitive(), configEntry.isReadOnly(),
- configSynonyms(configEntry)));
+ configSynonyms(configEntry), configType(configEntry.type()),
+ configEntry.documentation()));
}
future.complete(new Config(configEntries));
}
@@ -1983,7 +1987,8 @@ public class KafkaAdminClient extends AdminClient {
@Override
DescribeConfigsRequest.Builder createRequest(int timeoutMs) {
return new DescribeConfigsRequest.Builder(Collections.singleton(resource))
- .includeSynonyms(options.includeSynonyms());
+ .includeSynonyms(options.includeSynonyms())
+ .includeDocumentation(options.includeDocumentation());
}
@Override
@@ -2003,7 +2008,7 @@ public class KafkaAdminClient extends AdminClient {
for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) {
configEntries.add(new ConfigEntry(configEntry.name(), configEntry.value(),
configSource(configEntry.source()), configEntry.isSensitive(), configEntry.isReadOnly(),
- configSynonyms(configEntry)));
+ configSynonyms(configEntry), configType(configEntry.type()), configEntry.documentation()));
}
brokerFuture.complete(new Config(configEntries));
}
@@ -2056,6 +2061,46 @@ public class KafkaAdminClient extends AdminClient {
return configSource;
}
+ private ConfigEntry.ConfigType configType(DescribeConfigsResponse.ConfigType type) {
+ if (type == null) {
+ return ConfigEntry.ConfigType.UNKNOWN;
+ }
+
+ ConfigEntry.ConfigType configType;
+ switch (type) {
+ case BOOLEAN:
+ configType = ConfigEntry.ConfigType.BOOLEAN;
+ break;
+ case CLASS:
+ configType = ConfigEntry.ConfigType.CLASS;
+ break;
+ case DOUBLE:
+ configType = ConfigEntry.ConfigType.DOUBLE;
+ break;
+ case INT:
+ configType = ConfigEntry.ConfigType.INT;
+ break;
+ case LIST:
+ configType = ConfigEntry.ConfigType.LIST;
+ break;
+ case LONG:
+ configType = ConfigEntry.ConfigType.LONG;
+ break;
+ case PASSWORD:
+ configType = ConfigEntry.ConfigType.PASSWORD;
+ break;
+ case SHORT:
+ configType = ConfigEntry.ConfigType.SHORT;
+ break;
+ case STRING:
+ configType = ConfigEntry.ConfigType.STRING;
+ break;
+ default:
+ configType = ConfigEntry.ConfigType.UNKNOWN;
+ }
+ return configType;
+ }
+
@Override
@Deprecated
public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options) {
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index e28b8c9..968c549 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -201,6 +201,13 @@ public class AbstractConfig {
return configKey.type;
}
+ public String documentationOf(String key) {
+ ConfigDef.ConfigKey configKey = definition.configKeys().get(key);
+ if (configKey == null)
+ return null;
+ return configKey.documentation;
+ }
+
public Password getPassword(String key) {
return (Password) get(key);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
index 4bcc380..8ea7630 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
@@ -43,6 +43,7 @@ public class DescribeConfigsRequest extends AbstractRequest {
private static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
private static final String RESOURCE_NAME_KEY_NAME = "resource_name";
private static final String CONFIG_NAMES_KEY_NAME = "config_names";
+ private static final String INCLUDE_DOCUMENTATION = "include_documentation";
private static final Schema DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0 = new Schema(
new Field(RESOURCE_TYPE_KEY_NAME, INT8),
@@ -61,13 +62,24 @@ public class DescribeConfigsRequest extends AbstractRequest {
*/
private static final Schema DESCRIBE_CONFIGS_REQUEST_V2 = DESCRIBE_CONFIGS_REQUEST_V1;
+ private static final Schema DESCRIBE_CONFIGS_REQUEST_V3 = new Schema(
+ new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0), "An array of config resources to be returned."),
+ new Field(INCLUDE_SYNONYMS, BOOLEAN),
+ new Field(INCLUDE_DOCUMENTATION, BOOLEAN));
+
public static Schema[] schemaVersions() {
- return new Schema[]{DESCRIBE_CONFIGS_REQUEST_V0, DESCRIBE_CONFIGS_REQUEST_V1, DESCRIBE_CONFIGS_REQUEST_V2};
+ return new Schema[] {
+ DESCRIBE_CONFIGS_REQUEST_V0,
+ DESCRIBE_CONFIGS_REQUEST_V1,
+ DESCRIBE_CONFIGS_REQUEST_V2,
+ DESCRIBE_CONFIGS_REQUEST_V3
+ };
}
public static class Builder extends AbstractRequest.Builder<DescribeConfigsRequest> {
private final Map<ConfigResource, Collection<String>> resourceToConfigNames;
private boolean includeSynonyms;
+ private boolean includeDocumentation;
public Builder(Map<ConfigResource, Collection<String>> resourceToConfigNames) {
super(ApiKeys.DESCRIBE_CONFIGS);
@@ -79,6 +91,11 @@ public class DescribeConfigsRequest extends AbstractRequest {
return this;
}
+ public Builder includeDocumentation(boolean includeDocumentation) {
+ this.includeDocumentation = includeDocumentation;
+ return this;
+ }
+
public Builder(Collection<ConfigResource> resources) {
this(toResourceToConfigNames(resources));
}
@@ -92,17 +109,27 @@ public class DescribeConfigsRequest extends AbstractRequest {
@Override
public DescribeConfigsRequest build(short version) {
- return new DescribeConfigsRequest(version, resourceToConfigNames, includeSynonyms);
+ return new DescribeConfigsRequest(
+ version, resourceToConfigNames, includeSynonyms, includeDocumentation);
}
}
private final Map<ConfigResource, Collection<String>> resourceToConfigNames;
private final boolean includeSynonyms;
+ private final boolean includeDocumentation;
- public DescribeConfigsRequest(short version, Map<ConfigResource, Collection<String>> resourceToConfigNames, boolean includeSynonyms) {
+ public DescribeConfigsRequest(
+ short version, Map<ConfigResource, Collection<String>> resourceToConfigNames,
+ boolean includeSynonyms) {
+ this(version, resourceToConfigNames, includeSynonyms, false);
+ }
+ public DescribeConfigsRequest(
+ short version, Map<ConfigResource, Collection<String>> resourceToConfigNames,
+ boolean includeSynonyms, boolean includeDocumentation) {
super(ApiKeys.DESCRIBE_CONFIGS, version);
this.resourceToConfigNames = Objects.requireNonNull(resourceToConfigNames, "resourceToConfigNames");
this.includeSynonyms = includeSynonyms;
+ this.includeDocumentation = includeDocumentation;
}
public DescribeConfigsRequest(Struct struct, short version) {
@@ -125,6 +152,7 @@ public class DescribeConfigsRequest extends AbstractRequest {
resourceToConfigNames.put(new ConfigResource(resourceType, resourceName), configNames);
}
this.includeSynonyms = struct.hasField(INCLUDE_SYNONYMS) ? struct.getBoolean(INCLUDE_SYNONYMS) : false;
+ this.includeDocumentation = struct.hasField(INCLUDE_DOCUMENTATION) ? struct.getBoolean(INCLUDE_DOCUMENTATION) : false;
}
public Collection<ConfigResource> resources() {
@@ -142,6 +170,10 @@ public class DescribeConfigsRequest extends AbstractRequest {
return includeSynonyms;
}
+ public boolean includeDocumentation() {
+ return includeDocumentation;
+ }
+
@Override
protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.DESCRIBE_CONFIGS.requestSchema(version()));
@@ -159,6 +191,7 @@ public class DescribeConfigsRequest extends AbstractRequest {
}
struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray(new Struct[0]));
struct.setIfExists(INCLUDE_SYNONYMS, includeSynonyms);
+ struct.setIfExists(INCLUDE_DOCUMENTATION, includeDocumentation);
return struct;
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
index c24bbe6..bd46bc3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
@@ -56,6 +56,8 @@ public class DescribeConfigsResponse extends AbstractResponse {
private static final String IS_SENSITIVE_KEY_NAME = "is_sensitive";
private static final String IS_DEFAULT_KEY_NAME = "is_default";
private static final String READ_ONLY_KEY_NAME = "read_only";
+ private static final String CONFIG_TYPE_KEY_NAME = "config_type";
+ private static final String CONFIG_DOCUMENTATION_KEY_NAME = "config_documentation";
private static final String CONFIG_SYNONYMS_KEY_NAME = "config_synonyms";
private static final String CONFIG_SOURCE_KEY_NAME = "config_source";
@@ -80,6 +82,16 @@ public class DescribeConfigsResponse extends AbstractResponse {
new Field(IS_SENSITIVE_KEY_NAME, BOOLEAN),
new Field(CONFIG_SYNONYMS_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_SYNONYM_V1)));
+ private static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTRY_V3 = new Schema(
+ new Field(CONFIG_NAME_KEY_NAME, STRING),
+ new Field(CONFIG_VALUE_KEY_NAME, NULLABLE_STRING),
+ new Field(READ_ONLY_KEY_NAME, BOOLEAN),
+ new Field(CONFIG_SOURCE_KEY_NAME, INT8),
+ new Field(IS_SENSITIVE_KEY_NAME, BOOLEAN),
+ new Field(CONFIG_SYNONYMS_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_SYNONYM_V1)),
+ new Field(CONFIG_TYPE_KEY_NAME, INT8),
+ new Field(CONFIG_DOCUMENTATION_KEY_NAME, NULLABLE_STRING));
+
private static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTITY_V0 = new Schema(
ERROR_CODE,
ERROR_MESSAGE,
@@ -94,6 +106,13 @@ public class DescribeConfigsResponse extends AbstractResponse {
new Field(RESOURCE_NAME_KEY_NAME, STRING),
new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTRY_V1)));
+ private static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTITY_V3 = new Schema(
+ ERROR_CODE,
+ ERROR_MESSAGE,
+ new Field(RESOURCE_TYPE_KEY_NAME, INT8),
+ new Field(RESOURCE_NAME_KEY_NAME, STRING),
+ new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTRY_V3)));
+
private static final Schema DESCRIBE_CONFIGS_RESPONSE_V0 = new Schema(
THROTTLE_TIME_MS,
new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTITY_V0)));
@@ -102,13 +121,22 @@ public class DescribeConfigsResponse extends AbstractResponse {
THROTTLE_TIME_MS,
new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTITY_V1)));
+ private static final Schema DESCRIBE_CONFIGS_RESPONSE_V3 = new Schema(
+ THROTTLE_TIME_MS,
+ new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTITY_V3)));
+
/**
* The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
*/
private static final Schema DESCRIBE_CONFIGS_RESPONSE_V2 = DESCRIBE_CONFIGS_RESPONSE_V1;
public static Schema[] schemaVersions() {
- return new Schema[]{DESCRIBE_CONFIGS_RESPONSE_V0, DESCRIBE_CONFIGS_RESPONSE_V1, DESCRIBE_CONFIGS_RESPONSE_V2};
+ return new Schema[]{
+ DESCRIBE_CONFIGS_RESPONSE_V0,
+ DESCRIBE_CONFIGS_RESPONSE_V1,
+ DESCRIBE_CONFIGS_RESPONSE_V2,
+ DESCRIBE_CONFIGS_RESPONSE_V3
+ };
}
public static class Config {
@@ -136,9 +164,16 @@ public class DescribeConfigsResponse extends AbstractResponse {
private final ConfigSource source;
private final boolean readOnly;
private final Collection<ConfigSynonym> synonyms;
+ private final ConfigType type;
+ private final String documentation;
public ConfigEntry(String name, String value, ConfigSource source, boolean isSensitive, boolean readOnly,
- Collection<ConfigSynonym> synonyms) {
+ Collection<ConfigSynonym> synonyms) {
+ this(name, value, source, isSensitive, readOnly, synonyms, ConfigType.UNKNOWN, null);
+ }
+
+ public ConfigEntry(String name, String value, ConfigSource source, boolean isSensitive, boolean readOnly,
+ Collection<ConfigSynonym> synonyms, ConfigType type, String documentation) {
this.name = Objects.requireNonNull(name, "name");
this.value = value;
@@ -146,6 +181,8 @@ public class DescribeConfigsResponse extends AbstractResponse {
this.isSensitive = isSensitive;
this.readOnly = readOnly;
this.synonyms = Objects.requireNonNull(synonyms, "synonyms");
+ this.type = type;
+ this.documentation = documentation;
}
public String name() {
@@ -171,6 +208,14 @@ public class DescribeConfigsResponse extends AbstractResponse {
public Collection<ConfigSynonym> synonyms() {
return synonyms;
}
+
+ public ConfigType type() {
+ return type;
+ }
+
+ public String documentation() {
+ return documentation;
+ }
}
public enum ConfigSource {
@@ -198,6 +243,34 @@ public class DescribeConfigsResponse extends AbstractResponse {
}
}
+ public enum ConfigType {
+ UNKNOWN((byte) 0),
+ BOOLEAN((byte) 1),
+ STRING((byte) 2),
+ INT((byte) 3),
+ SHORT((byte) 4),
+ LONG((byte) 5),
+ DOUBLE((byte) 6),
+ LIST((byte) 7),
+ CLASS((byte) 8),
+ PASSWORD((byte) 9);
+
+ final byte id;
+ private static final ConfigType[] VALUES = values();
+
+ ConfigType(byte id) {
+ this.id = id;
+ }
+
+ public static ConfigType forId(byte id) {
+ if (id < 0)
+ throw new IllegalArgumentException("id should be positive, id: " + id);
+ if (id >= VALUES.length)
+ return UNKNOWN;
+ return VALUES[id];
+ }
+ }
+
public static class ConfigSynonym {
private final String name;
private final String value;
@@ -248,6 +321,14 @@ public class DescribeConfigsResponse extends AbstractResponse {
String configName = configEntriesStruct.getString(CONFIG_NAME_KEY_NAME);
String configValue = configEntriesStruct.getString(CONFIG_VALUE_KEY_NAME);
boolean isSensitive = configEntriesStruct.getBoolean(IS_SENSITIVE_KEY_NAME);
+ ConfigType type = ConfigType.UNKNOWN;
+ if (configEntriesStruct.hasField(CONFIG_TYPE_KEY_NAME)) {
+ type = ConfigType.forId(configEntriesStruct.getByte(CONFIG_TYPE_KEY_NAME));
+ }
+ String documentation = null;
+ if (configEntriesStruct.hasField(CONFIG_DOCUMENTATION_KEY_NAME)) {
+ documentation = configEntriesStruct.getString(CONFIG_DOCUMENTATION_KEY_NAME);
+ }
ConfigSource configSource;
if (configEntriesStruct.hasField(CONFIG_SOURCE_KEY_NAME))
configSource = ConfigSource.forId(configEntriesStruct.getByte(CONFIG_SOURCE_KEY_NAME));
@@ -281,9 +362,10 @@ public class DescribeConfigsResponse extends AbstractResponse {
ConfigSource source = ConfigSource.forId(synonymStruct.getByte(CONFIG_SOURCE_KEY_NAME));
synonyms.add(new ConfigSynonym(synonymConfigName, synonymConfigValue, source));
}
- } else
+ } else {
synonyms = Collections.emptyList();
- configEntries.add(new ConfigEntry(configName, configValue, configSource, isSensitive, readOnly, synonyms));
+ }
+ configEntries.add(new ConfigEntry(configName, configValue, configSource, isSensitive, readOnly, synonyms, type, documentation));
}
Config config = new Config(error, configEntries);
configs.put(resource, config);
@@ -336,6 +418,10 @@ public class DescribeConfigsResponse extends AbstractResponse {
configEntriesStruct.setIfExists(CONFIG_SOURCE_KEY_NAME, configEntry.source.id);
configEntriesStruct.setIfExists(IS_DEFAULT_KEY_NAME, configEntry.source == ConfigSource.DEFAULT_CONFIG);
configEntriesStruct.set(READ_ONLY_KEY_NAME, configEntry.readOnly);
+ if (configEntriesStruct.hasField(CONFIG_TYPE_KEY_NAME) && configEntry.type != null) {
+ configEntriesStruct.set(CONFIG_TYPE_KEY_NAME, configEntry.type.id);
+ }
+ configEntriesStruct.setIfExists(CONFIG_DOCUMENTATION_KEY_NAME, configEntry.documentation);
configEntryStructs.add(configEntriesStruct);
if (configEntriesStruct.hasField(CONFIG_SYNONYMS_KEY_NAME)) {
List<Struct> configSynonymStructs = new ArrayList<>(configEntry.synonyms.size());
diff --git a/clients/src/main/resources/common/message/DescribeConfigsRequest.json b/clients/src/main/resources/common/message/DescribeConfigsRequest.json
index 5d4d13b..1438a36 100644
--- a/clients/src/main/resources/common/message/DescribeConfigsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeConfigsRequest.json
@@ -17,9 +17,9 @@
"apiKey": 32,
"type": "request",
"name": "DescribeConfigsRequest",
- // Version 1 adds IncludeSynoyms.
+ // Version 1 adds IncludeSynonyms.
// Version 2 is the same as version 1.
- "validVersions": "0-2",
+ "validVersions": "0-3",
"flexibleVersions": "none",
"fields": [
{ "name": "Resources", "type": "[]DescribeConfigsResource", "versions": "0+",
@@ -31,7 +31,9 @@
{ "name": "ConfigurationKeys", "type": "[]string", "versions": "0+", "nullableVersions": "0+",
"about": "The configuration keys to list, or null to list all configuration keys." }
]},
- { "name": "IncludeSynoyms", "type": "bool", "versions": "1+", "default": "false", "ignorable": false,
- "about": "True if we should include all synonyms." }
+ { "name": "IncludeSynonyms", "type": "bool", "versions": "1+", "default": "false", "ignorable": false,
+ "about": "True if we should include all synonyms." },
+ { "name": "IncludeDocumentation", "type": "bool", "versions": "3+", "default": "false", "ignorable": false,
+ "about": "True if we should include configuration documentation." }
]
}
diff --git a/clients/src/main/resources/common/message/DescribeConfigsResponse.json b/clients/src/main/resources/common/message/DescribeConfigsResponse.json
index 82e44f7..a70c959 100644
--- a/clients/src/main/resources/common/message/DescribeConfigsResponse.json
+++ b/clients/src/main/resources/common/message/DescribeConfigsResponse.json
@@ -19,7 +19,7 @@
"name": "DescribeConfigsResponse",
// Version 1 adds ConfigSource and the synonyms.
// Starting in version 2, on quota violation, brokers send out responses before throttling.
- "validVersions": "0-2",
+ "validVersions": "0-3",
"flexibleVersions": "none",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
@@ -59,7 +59,11 @@
"about": "The synonym value." },
{ "name": "Source", "type": "int8", "versions": "1+",
"about": "The synonym source." }
- ]}
+ ]},
+ { "name": "ConfigType", "type": "int8", "versions": "3+", "default": "0", "ignorable": true,
+ "about": "The configuration data type. Type can be one of the following values - BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS, PASSWORD" },
+ { "name": "Documentation", "type": "string", "versions": "3+", "nullableVersions": "0+", "ignorable": true,
+ "about": "The configuration documentation." }
]}
]}
]
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java
index 1146a35..ed4c0bd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.admin;
+import org.apache.kafka.clients.admin.ConfigEntry.ConfigType;
import org.junit.Before;
import org.junit.Test;
@@ -92,6 +93,6 @@ public class ConfigTest {
public static ConfigEntry newConfigEntry(String name, String value, ConfigEntry.ConfigSource source, boolean isSensitive,
boolean isReadOnly, List<ConfigEntry.ConfigSynonym> synonyms) {
- return new ConfigEntry(name, value, source, isSensitive, isReadOnly, synonyms);
+ return new ConfigEntry(name, value, source, isSensitive, isReadOnly, synonyms, ConfigType.UNKNOWN, null);
}
}
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index d395a87..73f83d7 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -482,6 +482,25 @@ public class AbstractConfigTest {
assertEquals(config.originals().get("sasl.truststore.location"), "/usr/vault");
}
+ @Test
+ public void testDocumentationOf() {
+ Properties props = new Properties();
+ TestIndirectConfigResolution config = new TestIndirectConfigResolution(props);
+
+ assertEquals(
+ config.documentationOf(TestIndirectConfigResolution.INDIRECT_CONFIGS),
+ TestIndirectConfigResolution.INDIRECT_CONFIGS_DOC
+ );
+ }
+
+ @Test
+ public void testDocumentationOfExpectNull() {
+ Properties props = new Properties();
+ TestIndirectConfigResolution config = new TestIndirectConfigResolution(props);
+
+ assertNull(config.documentationOf("xyz"));
+ }
+
private static class TestIndirectConfigResolution extends AbstractConfig {
private static final ConfigDef CONFIG;
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index c712c18..c508dbc 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -144,6 +144,7 @@ import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.requests.CreateTopicsRequest.Builder;
+import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigType;
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
@@ -434,6 +435,9 @@ public class RequestResponseTest {
checkResponse(createDescribeConfigsResponse(), 0, false);
checkRequest(createDescribeConfigsRequest(1), true);
checkRequest(createDescribeConfigsRequestWithConfigEntries(1), false);
+ checkRequest(createDescribeConfigsRequestWithDocumentation(1), false);
+ checkRequest(createDescribeConfigsRequestWithDocumentation(2), false);
+ checkRequest(createDescribeConfigsRequestWithDocumentation(3), false);
checkErrorResponse(createDescribeConfigsRequest(1), new UnknownServerException(), true);
checkResponse(createDescribeConfigsResponse(), 1, false);
checkDescribeConfigsResponseVersions();
@@ -498,7 +502,12 @@ public class RequestResponseTest {
assertEquals(expectedEntry.value(), entry.value());
assertEquals(expectedEntry.isReadOnly(), entry.isReadOnly());
assertEquals(expectedEntry.isSensitive(), entry.isSensitive());
- if (version == 1 || (expectedEntry.source() != DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_CONFIG &&
+ if (version < 3) {
+ assertEquals(ConfigType.UNKNOWN, entry.type());
+ } else {
+ assertEquals(expectedEntry.type(), entry.type());
+ }
+ if (version == 1 || version == 3 || (expectedEntry.source() != DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_CONFIG &&
expectedEntry.source() != DescribeConfigsResponse.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG))
assertEquals(expectedEntry.source(), entry.source());
else
@@ -516,6 +525,10 @@ public class RequestResponseTest {
DescribeConfigsResponse deserialized1 = (DescribeConfigsResponse) deserialize(response,
response.toStruct((short) 1), (short) 1);
verifyDescribeConfigsResponse(response, deserialized1, 1);
+
+ DescribeConfigsResponse deserialized3 = (DescribeConfigsResponse) deserialize(response,
+ response.toStruct((short) 3), (short) 3);
+ verifyDescribeConfigsResponse(response, deserialized3, 3);
}
private void checkErrorResponse(AbstractRequest req, Throwable e, boolean checkEqualityAndHashCode) {
@@ -1873,6 +1886,12 @@ public class RequestResponseTest {
return new DescribeConfigsRequest.Builder(resources).build((short) version);
}
+ private DescribeConfigsRequest createDescribeConfigsRequestWithDocumentation(int version) {
+ Map<ConfigResource, Collection<String>> resources = new HashMap<>();
+ resources.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), asList("foo", "bar"));
+ return new DescribeConfigsRequest.Builder(resources).includeDocumentation(true).build((short) version);
+ }
+
private DescribeConfigsResponse createDescribeConfigsResponse() {
Map<ConfigResource, DescribeConfigsResponse.Config> configs = new HashMap<>();
List<DescribeConfigsResponse.ConfigSynonym> synonyms = emptyList();
@@ -1880,7 +1899,10 @@ public class RequestResponseTest {
new DescribeConfigsResponse.ConfigEntry("config_name", "config_value",
DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_CONFIG, true, false, synonyms),
new DescribeConfigsResponse.ConfigEntry("another_name", "another value",
- DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG, false, true, synonyms)
+ DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG, false, true, synonyms),
+ new DescribeConfigsResponse.ConfigEntry("yet_another_name", "yet another value",
+ DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG, false, true, synonyms,
+ ConfigType.BOOLEAN, "some description")
);
configs.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), new DescribeConfigsResponse.Config(
ApiError.NONE, configEntries));
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 3330ee9..183a5d3 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -164,7 +164,7 @@ class AdminManager(val config: KafkaConfig,
// For responses with DescribeConfigs permission, populate metadata and configs
includeConfigsAndMetatadata.get(topic.name).foreach { result =>
val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), configs)
- val createEntry = createTopicConfigEntry(logConfig, configs, includeSynonyms = false)(_, _)
+ val createEntry = createTopicConfigEntry(logConfig, configs, includeSynonyms = false, includeDocumentation = false)(_, _)
val topicConfigs = logConfig.values.asScala.map { case (k, v) =>
val entry = createEntry(k, v)
val source = ConfigSource.values.indices.map(_.toByte)
@@ -347,7 +347,7 @@ class AdminManager(val config: KafkaConfig,
}
}
- def describeConfigs(resourceToConfigNames: Map[ConfigResource, Option[Set[String]]], includeSynonyms: Boolean): Map[ConfigResource, DescribeConfigsResponse.Config] = {
+ def describeConfigs(resourceToConfigNames: Map[ConfigResource, Option[Set[String]]], includeSynonyms: Boolean, includeDocumentation: Boolean): Map[ConfigResource, DescribeConfigsResponse.Config] = {
resourceToConfigNames.map { case (resource, configNames) =>
def allConfigs(config: AbstractConfig) = {
@@ -374,7 +374,7 @@ class AdminManager(val config: KafkaConfig,
// Consider optimizing this by caching the configs or retrieving them from the `Log` when possible
val topicProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), topicProps)
- createResponseConfig(allConfigs(logConfig), createTopicConfigEntry(logConfig, topicProps, includeSynonyms))
+ createResponseConfig(allConfigs(logConfig), createTopicConfigEntry(logConfig, topicProps, includeSynonyms, includeDocumentation))
} else {
new DescribeConfigsResponse.Config(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, null), Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
}
@@ -382,10 +382,10 @@ class AdminManager(val config: KafkaConfig,
case ConfigResource.Type.BROKER =>
if (resource.name == null || resource.name.isEmpty)
createResponseConfig(config.dynamicConfig.currentDynamicDefaultConfigs,
- createBrokerConfigEntry(perBrokerConfig = false, includeSynonyms))
+ createBrokerConfigEntry(perBrokerConfig = false, includeSynonyms, includeDocumentation))
else if (resourceNameToBrokerId(resource.name) == config.brokerId)
createResponseConfig(allConfigs(config),
- createBrokerConfigEntry(perBrokerConfig = true, includeSynonyms))
+ createBrokerConfigEntry(perBrokerConfig = true, includeSynonyms, includeDocumentation))
else
throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} or empty string, but received ${resource.name}")
@@ -658,6 +658,27 @@ class AdminManager(val config: KafkaConfig,
DynamicBrokerConfig.brokerConfigSynonyms(name, matchListenerOverride = true)
}
+ private def brokerDocumentation(name: String): String = {
+ config.documentationOf(name)
+ }
+
+ private def configResponseType(configType: Option[ConfigDef.Type]): DescribeConfigsResponse.ConfigType = {
+ if (configType.isEmpty)
+ DescribeConfigsResponse.ConfigType.UNKNOWN
+ else configType.get match {
+ case ConfigDef.Type.BOOLEAN => DescribeConfigsResponse.ConfigType.BOOLEAN
+ case ConfigDef.Type.STRING => DescribeConfigsResponse.ConfigType.STRING
+ case ConfigDef.Type.INT => DescribeConfigsResponse.ConfigType.INT
+ case ConfigDef.Type.SHORT => DescribeConfigsResponse.ConfigType.SHORT
+ case ConfigDef.Type.LONG => DescribeConfigsResponse.ConfigType.LONG
+ case ConfigDef.Type.DOUBLE => DescribeConfigsResponse.ConfigType.DOUBLE
+ case ConfigDef.Type.LIST => DescribeConfigsResponse.ConfigType.LIST
+ case ConfigDef.Type.CLASS => DescribeConfigsResponse.ConfigType.CLASS
+ case ConfigDef.Type.PASSWORD => DescribeConfigsResponse.ConfigType.PASSWORD
+ case _ => DescribeConfigsResponse.ConfigType.UNKNOWN
+ }
+ }
+
private def configSynonyms(name: String, synonyms: List[String], isSensitive: Boolean): List[DescribeConfigsResponse.ConfigSynonym] = {
val dynamicConfig = config.dynamicConfig
val allSynonyms = mutable.Buffer[DescribeConfigsResponse.ConfigSynonym]()
@@ -676,7 +697,7 @@ class AdminManager(val config: KafkaConfig,
allSynonyms.dropWhile(s => s.name != name).toList // e.g. drop listener overrides when describing base config
}
- private def createTopicConfigEntry(logConfig: LogConfig, topicProps: Properties, includeSynonyms: Boolean)
+ private def createTopicConfigEntry(logConfig: LogConfig, topicProps: Properties, includeSynonyms: Boolean, includeDocumentation: Boolean)
(name: String, value: Any): DescribeConfigsResponse.ConfigEntry = {
val configEntryType = LogConfig.configType(name)
val isSensitive = KafkaConfig.maybeSensitive(configEntryType)
@@ -692,10 +713,12 @@ class AdminManager(val config: KafkaConfig,
}
val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG else allSynonyms.head.source
val synonyms = if (!includeSynonyms) List.empty else allSynonyms
- new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, false, synonyms.asJava)
+ val dataType = configResponseType(configEntryType)
+ val configDocumentation = if (includeDocumentation) brokerDocumentation(name) else null
+ new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, false, synonyms.asJava, dataType, configDocumentation)
}
- private def createBrokerConfigEntry(perBrokerConfig: Boolean, includeSynonyms: Boolean)
+ private def createBrokerConfigEntry(perBrokerConfig: Boolean, includeSynonyms: Boolean, includeDocumentation: Boolean)
(name: String, value: Any): DescribeConfigsResponse.ConfigEntry = {
val allNames = brokerSynonyms(name)
val configEntryType = KafkaConfig.configType(name)
@@ -711,7 +734,9 @@ class AdminManager(val config: KafkaConfig,
val synonyms = if (!includeSynonyms) List.empty else allSynonyms
val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG else allSynonyms.head.source
val readOnly = !DynamicBrokerConfig.AllDynamicConfigs.contains(name)
- new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, readOnly, synonyms.asJava)
+ val dataType = configResponseType(configEntryType)
+ val configDocumentation = if (includeDocumentation) brokerDocumentation(name) else null
+ new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, readOnly, synonyms.asJava, dataType, configDocumentation)
}
private def sanitizeEntityName(entityName: String): String =
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 28e20cd..86649c1 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2579,7 +2579,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val authorizedConfigs = adminManager.describeConfigs(authorizedResources.map { resource =>
resource -> Option(describeConfigsRequest.configNames(resource)).map(_.asScala.toSet)
- }.toMap, describeConfigsRequest.includeSynonyms)
+ }.toMap, describeConfigsRequest.includeSynonyms, describeConfigsRequest.includeDocumentation)
val unauthorizedConfigs = unauthorizedResources.map { resource =>
val error = configsAuthorizationApiError(resource)
resource -> new DescribeConfigsResponse.Config(error, util.Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 0b9c4a4..9da7319 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -257,7 +257,7 @@ class KafkaApisTest {
val configResource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName)
val config = new DescribeConfigsResponse.Config(ApiError.NONE, Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
- EasyMock.expect(adminManager.describeConfigs(anyObject(), EasyMock.eq(true)))
+ EasyMock.expect(adminManager.describeConfigs(anyObject(), EasyMock.eq(true), EasyMock.eq(false)))
.andReturn(Map(configResource -> config))
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,