You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/04/16 00:51:49 UTC
kafka git commit: KAFKA-3526: Return string instead of object in ConfigKeyInfo and ConfigValueInfo
Repository: kafka
Updated Branches:
refs/heads/trunk 065ddf901 -> 5236bf60d
KAFKA-3526: Return string instead of object in ConfigKeyInfo and ConfigValueInfo
Author: Liquan Pei <li...@gmail.com>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1200 from Ishiihara/config-string
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5236bf60
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5236bf60
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5236bf60
Branch: refs/heads/trunk
Commit: 5236bf60debbb0c08010315a92dd3fbfa482e871
Parents: 065ddf9
Author: Liquan Pei <li...@gmail.com>
Authored: Fri Apr 15 15:51:31 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Fri Apr 15 15:51:31 2016 -0700
----------------------------------------------------------------------
.../apache/kafka/common/config/ConfigDef.java | 35 +++++++++--
.../kafka/common/config/ConfigDefTest.java | 3 +-
.../kafka/connect/runtime/AbstractHerder.java | 40 +++++++++----
.../runtime/rest/entities/ConfigKeyInfo.java | 6 +-
.../runtime/rest/entities/ConfigValueInfo.java | 12 ++--
.../resources/ConnectorPluginsResourceTest.java | 63 ++++++++++++++++----
6 files changed, 120 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5236bf60/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 881cb0b..1df55d9 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -538,19 +538,18 @@ public class ConfigDef {
}
}
+ @SuppressWarnings("unchecked")
private void validate(String name, Map<String, Object> parsed, Map<String, ConfigValue> configs) {
if (!configKeys.containsKey(name)) {
return;
}
ConfigKey key = configKeys.get(name);
ConfigValue config = configs.get(name);
- Object value = parsed.get(name);
List<Object> recommendedValues;
if (key.recommender != null) {
try {
recommendedValues = key.recommender.validValues(name, parsed);
List<Object> originalRecommendedValues = config.recommendedValues();
-
if (!originalRecommendedValues.isEmpty()) {
Set<Object> originalRecommendedValueSet = new HashSet<>(originalRecommendedValues);
Iterator<Object> it = recommendedValues.iterator();
@@ -562,9 +561,6 @@ public class ConfigDef {
}
}
config.recommendedValues(recommendedValues);
- if (value != null && !recommendedValues.isEmpty() && !recommendedValues.contains(value)) {
- config.addErrorMessage("Invalid value for configuration " + key.name);
- }
config.visible(key.recommender.visible(name, parsed));
} catch (ConfigException e) {
config.addErrorMessage(e.getMessage());
@@ -676,6 +672,35 @@ public class ConfigDef {
}
}
+ public static String convertToString(Object parsedValue, Type type) {
+ if (parsedValue == null) {
+ return null;
+ }
+
+ if (type == null) {
+ return parsedValue.toString();
+ }
+
+ switch (type) {
+ case BOOLEAN:
+ case SHORT:
+ case INT:
+ case LONG:
+ case DOUBLE:
+ case STRING:
+ case PASSWORD:
+ return parsedValue.toString();
+ case LIST:
+ List<?> valueList = (List<?>) parsedValue;
+ return Utils.join(valueList, ",");
+ case CLASS:
+ Class<?> clazz = (Class<?>) parsedValue;
+ return clazz.getCanonicalName();
+ default:
+ throw new IllegalStateException("Unknown type.");
+ }
+ }
+
/**
* The config types
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/5236bf60/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index 022fb6b..e20e422 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -236,12 +236,11 @@ public class ConfigDefTest {
Map<String, ConfigValue> expected = new HashMap<>();
String errorMessageB = "Missing required configuration \"b\" which has no default value.";
String errorMessageC = "Missing required configuration \"c\" which has no default value.";
- String errorMessageD = "Invalid value for configuration d";
ConfigValue configA = new ConfigValue("a", 1, Arrays.<Object>asList(1, 2, 3), Collections.<String>emptyList());
ConfigValue configB = new ConfigValue("b", null, Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageB, errorMessageB));
ConfigValue configC = new ConfigValue("c", null, Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageC));
- ConfigValue configD = new ConfigValue("d", 10, Arrays.<Object>asList(1, 2, 3), Arrays.asList(errorMessageD));
+ ConfigValue configD = new ConfigValue("d", 10, Arrays.<Object>asList(1, 2, 3), Collections.<String>emptyList());
expected.put("a", configA);
expected.put("b", configB);
http://git-wip-us.apache.org/repos/asf/kafka/blob/5236bf60/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index a97c4db..1d87d60 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.ConfigKey;
+import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.errors.NotFoundException;
@@ -230,16 +231,18 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
configValueMap.put(configName, configValue);
if (!configKeys.containsKey(configName)) {
configValue.addErrorMessage("Configuration is not defined: " + configName);
- configInfoList.add(new ConfigInfo(null, convertConfigValue(configValue)));
+ configInfoList.add(new ConfigInfo(null, convertConfigValue(configValue, null)));
}
}
- for (String configName: configKeys.keySet()) {
- ConfigKeyInfo configKeyInfo = convertConfigKey(configKeys.get(configName));
+ for (Map.Entry<String, ConfigKey> entry : configKeys.entrySet()) {
+ String configName = entry.getKey();
+ ConfigKeyInfo configKeyInfo = convertConfigKey(entry.getValue());
+ Type type = entry.getValue().type;
ConfigValueInfo configValueInfo = null;
if (configValueMap.containsKey(configName)) {
ConfigValue configValue = configValueMap.get(configName);
- configValueInfo = convertConfigValue(configValue);
+ configValueInfo = convertConfigValue(configValue, type);
errorCount += configValue.errorMessages().size();
}
configInfoList.add(new ConfigInfo(configKeyInfo, configValueInfo));
@@ -249,11 +252,16 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
private static ConfigKeyInfo convertConfigKey(ConfigKey configKey) {
String name = configKey.name;
- String type = configKey.type.name();
- Object defaultValue = configKey.defaultValue;
+ Type type = configKey.type;
+ String typeName = configKey.type.name();
+
boolean required = false;
- if (defaultValue == ConfigDef.NO_DEFAULT_VALUE) {
+ String defaultValue;
+ if (configKey.defaultValue == ConfigDef.NO_DEFAULT_VALUE) {
+ defaultValue = (String) configKey.defaultValue;
required = true;
+ } else {
+ defaultValue = ConfigDef.convertToString(configKey.defaultValue, type);
}
String importance = configKey.importance.name();
String documentation = configKey.documentation;
@@ -262,11 +270,23 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
String width = configKey.width.name();
String displayName = configKey.displayName;
List<String> dependents = configKey.dependents;
- return new ConfigKeyInfo(name, type, required, defaultValue, importance, documentation, group, orderInGroup, width, displayName, dependents);
+ return new ConfigKeyInfo(name, typeName, required, defaultValue, importance, documentation, group, orderInGroup, width, displayName, dependents);
}
- private static ConfigValueInfo convertConfigValue(ConfigValue configValue) {
- return new ConfigValueInfo(configValue.name(), configValue.value(), configValue.recommendedValues(), configValue.errorMessages(), configValue.visible());
+ private static ConfigValueInfo convertConfigValue(ConfigValue configValue, Type type) {
+ String value = ConfigDef.convertToString(configValue.value(), type);
+ List<String> recommendedValues = new LinkedList<>();
+
+ if (type == Type.LIST) {
+ for (Object object: configValue.recommendedValues()) {
+ recommendedValues.add(ConfigDef.convertToString(object, Type.STRING));
+ }
+ } else {
+ for (Object object : configValue.recommendedValues()) {
+ recommendedValues.add(ConfigDef.convertToString(object, type));
+ }
+ }
+ return new ConfigValueInfo(configValue.name(), value, recommendedValues, configValue.errorMessages(), configValue.visible());
}
private Connector getConnector(String connType) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/5236bf60/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java
index f813709..ead24c5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java
@@ -28,7 +28,7 @@ public class ConfigKeyInfo {
private final String name;
private final String type;
private final boolean required;
- private final Object defaultValue;
+ private final String defaultValue;
private final String importance;
private final String documentation;
private final String group;
@@ -41,7 +41,7 @@ public class ConfigKeyInfo {
public ConfigKeyInfo(@JsonProperty("name") String name,
@JsonProperty("type") String type,
@JsonProperty("required") boolean required,
- @JsonProperty("default_value") Object defaultValue,
+ @JsonProperty("default_value") String defaultValue,
@JsonProperty("importance") String importance,
@JsonProperty("documentation") String documentation,
@JsonProperty("group") String group,
@@ -78,7 +78,7 @@ public class ConfigKeyInfo {
}
@JsonProperty("default_value")
- public Object defaultValue() {
+ public String defaultValue() {
return defaultValue;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5236bf60/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java
index 51e7ee5..a6ae006 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java
@@ -25,16 +25,16 @@ import java.util.Objects;
public class ConfigValueInfo {
private String name;
- private Object value;
- private List<Object> recommendedValues;
+ private String value;
+ private List<String> recommendedValues;
private List<String> errors;
private boolean visible;
@JsonCreator
public ConfigValueInfo(
@JsonProperty("name") String name,
- @JsonProperty("value") Object value,
- @JsonProperty("recommended_values") List<Object> recommendedValues,
+ @JsonProperty("value") String value,
+ @JsonProperty("recommended_values") List<String> recommendedValues,
@JsonProperty("errors") List<String> errors,
@JsonProperty("visible") boolean visible) {
this.name = name;
@@ -50,12 +50,12 @@ public class ConfigValueInfo {
}
@JsonProperty
- public Object value() {
+ public String value() {
return value;
}
@JsonProperty("recommended_values")
- public List<Object> recommendedValues() {
+ public List<String> recommendedValues() {
return recommendedValues;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5236bf60/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 1049e7e..732db3d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -21,8 +21,10 @@ import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Recommender;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.runtime.AbstractHerder;
@@ -48,6 +50,7 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -68,7 +71,8 @@ public class ConnectorPluginsResourceTest {
private static Map<String, String> props = new HashMap<>();
static {
props.put("test.string.config", "testString");
- props.put("test.int.config", "10");
+ props.put("test.int.config", "1");
+ props.put("test.list.config", "a,b");
}
private static final ConfigInfos CONFIG_INFOS;
@@ -76,22 +80,27 @@ public class ConnectorPluginsResourceTest {
static {
List<ConfigInfo> configs = new LinkedList<>();
- ConfigKeyInfo configKeyInfo = new ConfigKeyInfo("test.string.config", "STRING", true, "", "HIGH", "Test configuration for string type.", null, -1, "NONE", "test.string.config", new LinkedList<String>());
- ConfigValueInfo configValueInfo = new ConfigValueInfo("test.string.config", "testString", Collections.<Object>emptyList(), Collections.<String>emptyList(), true);
+ ConfigKeyInfo configKeyInfo = new ConfigKeyInfo("test.string.config", "STRING", true, "", "HIGH", "Test configuration for string type.", null, -1, "NONE", "test.string.config", Collections.<String>emptyList());
+ ConfigValueInfo configValueInfo = new ConfigValueInfo("test.string.config", "testString", Collections.<String>emptyList(), Collections.<String>emptyList(), true);
ConfigInfo configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
configs.add(configInfo);
- configKeyInfo = new ConfigKeyInfo("test.int.config", "INT", true, "", "MEDIUM", "Test configuration for integer type.", null, -1, "NONE", "test.int.config", new LinkedList<String>());
- configValueInfo = new ConfigValueInfo("test.int.config", 10, Collections.<Object>emptyList(), Collections.<String>emptyList(), true);
+ configKeyInfo = new ConfigKeyInfo("test.int.config", "INT", true, "", "MEDIUM", "Test configuration for integer type.", "Test", 1, "MEDIUM", "test.int.config", Collections.<String>emptyList());
+ configValueInfo = new ConfigValueInfo("test.int.config", "1", Arrays.asList("1", "2", "3"), Collections.<String>emptyList(), true);
configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
configs.add(configInfo);
- configKeyInfo = new ConfigKeyInfo("test.string.config.default", "STRING", false, "", "LOW", "Test configuration with default value.", null, -1, "NONE", "test.string.config.default", new LinkedList<String>());
- configValueInfo = new ConfigValueInfo("test.string.config.default", "", Collections.<Object>emptyList(), Collections.<String>emptyList(), true);
+ configKeyInfo = new ConfigKeyInfo("test.string.config.default", "STRING", false, "", "LOW", "Test configuration with default value.", null, -1, "NONE", "test.string.config.default", Collections.<String>emptyList());
+ configValueInfo = new ConfigValueInfo("test.string.config.default", "", Collections.<String>emptyList(), Collections.<String>emptyList(), true);
configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
configs.add(configInfo);
- CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), 0, Collections.<String>emptyList(), configs);
+ configKeyInfo = new ConfigKeyInfo("test.list.config", "LIST", true, "", "HIGH", "Test configuration for list type.", "Test", 2, "LONG", "test.list.config", Collections.<String>emptyList());
+ configValueInfo = new ConfigValueInfo("test.list.config", "a,b", Arrays.asList("a", "b", "c"), Collections.<String>emptyList(), true);
+ configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
+ configs.add(configInfo);
+
+ CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), 0, Collections.singletonList("Test"), configs);
}
@Mock
@@ -143,14 +152,17 @@ public class ConnectorPluginsResourceTest {
/* Name here needs to be unique as we are testing the aliasing mechanism */
public static class ConnectorPluginsResourceTestConnector extends Connector {
- public static final String TEST_STRING_CONFIG = "test.string.config";
- public static final String TEST_INT_CONFIG = "test.int.config";
- public static final String TEST_STRING_CONFIG_DEFAULT = "test.string.config.default";
+ private static final String TEST_STRING_CONFIG = "test.string.config";
+ private static final String TEST_INT_CONFIG = "test.int.config";
+ private static final String TEST_STRING_CONFIG_DEFAULT = "test.string.config.default";
+ private static final String TEST_LIST_CONFIG = "test.list.config";
+ private static final String GROUP = "Test";
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(TEST_STRING_CONFIG, Type.STRING, Importance.HIGH, "Test configuration for string type.")
- .define(TEST_INT_CONFIG, Type.INT, Importance.MEDIUM, "Test configuration for integer type.")
- .define(TEST_STRING_CONFIG_DEFAULT, Type.STRING, "", Importance.LOW, "Test configuration with default value.");
+ .define(TEST_INT_CONFIG, Type.INT, Importance.MEDIUM, "Test configuration for integer type.", GROUP, 1, Width.MEDIUM, TEST_INT_CONFIG, new IntegerRecommender())
+ .define(TEST_STRING_CONFIG_DEFAULT, Type.STRING, "", Importance.LOW, "Test configuration with default value.")
+ .define(TEST_LIST_CONFIG, Type.LIST, Importance.HIGH, "Test configuration for list type.", GROUP, 2, Width.LONG, TEST_LIST_CONFIG, new ListRecommender());
@Override
public String version() {
@@ -182,4 +194,29 @@ public class ConnectorPluginsResourceTest {
return CONFIG_DEF;
}
}
+
+ private static class IntegerRecommender implements Recommender {
+
+ @Override
+ public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
+ return Arrays.<Object>asList(1, 2, 3);
+ }
+
+ @Override
+ public boolean visible(String name, Map<String, Object> parsedConfig) {
+ return true;
+ }
+ }
+
+ private static class ListRecommender implements Recommender {
+ @Override
+ public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
+ return Arrays.<Object>asList("a", "b", "c");
+ }
+
+ @Override
+ public boolean visible(String name, Map<String, Object> parsedConfig) {
+ return true;
+ }
+ }
}