You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/04/16 00:51:49 UTC

kafka git commit: KAFKA-3526: Return string instead of object in ConfigKeyInfo and ConfigValueInfo

Repository: kafka
Updated Branches:
  refs/heads/trunk 065ddf901 -> 5236bf60d


KAFKA-3526: Return string instead of object in ConfigKeyInfo and ConfigValueInfo

Author: Liquan Pei <li...@gmail.com>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1200 from Ishiihara/config-string


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5236bf60
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5236bf60
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5236bf60

Branch: refs/heads/trunk
Commit: 5236bf60debbb0c08010315a92dd3fbfa482e871
Parents: 065ddf9
Author: Liquan Pei <li...@gmail.com>
Authored: Fri Apr 15 15:51:31 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Fri Apr 15 15:51:31 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/common/config/ConfigDef.java   | 35 +++++++++--
 .../kafka/common/config/ConfigDefTest.java      |  3 +-
 .../kafka/connect/runtime/AbstractHerder.java   | 40 +++++++++----
 .../runtime/rest/entities/ConfigKeyInfo.java    |  6 +-
 .../runtime/rest/entities/ConfigValueInfo.java  | 12 ++--
 .../resources/ConnectorPluginsResourceTest.java | 63 ++++++++++++++++----
 6 files changed, 120 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5236bf60/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 881cb0b..1df55d9 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -538,19 +538,18 @@ public class ConfigDef {
         }
     }
 
+    @SuppressWarnings("unchecked")
     private void validate(String name, Map<String, Object> parsed, Map<String, ConfigValue> configs) {
         if (!configKeys.containsKey(name)) {
             return;
         }
         ConfigKey key = configKeys.get(name);
         ConfigValue config = configs.get(name);
-        Object value = parsed.get(name);
         List<Object> recommendedValues;
         if (key.recommender != null) {
             try {
                 recommendedValues = key.recommender.validValues(name, parsed);
                 List<Object> originalRecommendedValues = config.recommendedValues();
-
                 if (!originalRecommendedValues.isEmpty()) {
                     Set<Object> originalRecommendedValueSet = new HashSet<>(originalRecommendedValues);
                     Iterator<Object> it = recommendedValues.iterator();
@@ -562,9 +561,6 @@ public class ConfigDef {
                     }
                 }
                 config.recommendedValues(recommendedValues);
-                if (value != null && !recommendedValues.isEmpty() && !recommendedValues.contains(value)) {
-                    config.addErrorMessage("Invalid value for configuration " + key.name);
-                }
                 config.visible(key.recommender.visible(name, parsed));
             } catch (ConfigException e) {
                 config.addErrorMessage(e.getMessage());
@@ -676,6 +672,35 @@ public class ConfigDef {
         }
     }
 
+    public static String convertToString(Object parsedValue, Type type) {
+        if (parsedValue == null) {
+            return null;
+        }
+
+        if (type == null) {
+            return parsedValue.toString();
+        }
+
+        switch (type) {
+            case BOOLEAN:
+            case SHORT:
+            case INT:
+            case LONG:
+            case DOUBLE:
+            case STRING:
+            case PASSWORD:
+                return parsedValue.toString();
+            case LIST:
+                List<?> valueList = (List<?>) parsedValue;
+                return Utils.join(valueList, ",");
+            case CLASS:
+                Class<?> clazz = (Class<?>) parsedValue;
+                return clazz.getCanonicalName();
+            default:
+                throw new IllegalStateException("Unknown type.");
+        }
+    }
+
     /**
      * The config types
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/5236bf60/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index 022fb6b..e20e422 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -236,12 +236,11 @@ public class ConfigDefTest {
         Map<String, ConfigValue> expected = new HashMap<>();
         String errorMessageB = "Missing required configuration \"b\" which has no default value.";
         String errorMessageC = "Missing required configuration \"c\" which has no default value.";
-        String errorMessageD = "Invalid value for configuration d";
 
         ConfigValue configA = new ConfigValue("a", 1, Arrays.<Object>asList(1, 2, 3), Collections.<String>emptyList());
         ConfigValue configB = new ConfigValue("b", null, Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageB, errorMessageB));
         ConfigValue configC = new ConfigValue("c", null, Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageC));
-        ConfigValue configD = new ConfigValue("d", 10, Arrays.<Object>asList(1, 2, 3), Arrays.asList(errorMessageD));
+        ConfigValue configD = new ConfigValue("d", 10, Arrays.<Object>asList(1, 2, 3), Collections.<String>emptyList());
 
         expected.put("a", configA);
         expected.put("b", configB);

http://git-wip-us.apache.org/repos/asf/kafka/blob/5236bf60/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index a97c4db..1d87d60 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime;
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.ConfigKey;
+import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.errors.NotFoundException;
@@ -230,16 +231,18 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
             configValueMap.put(configName, configValue);
             if (!configKeys.containsKey(configName)) {
                 configValue.addErrorMessage("Configuration is not defined: " + configName);
-                configInfoList.add(new ConfigInfo(null, convertConfigValue(configValue)));
+                configInfoList.add(new ConfigInfo(null, convertConfigValue(configValue, null)));
             }
         }
 
-        for (String configName: configKeys.keySet()) {
-            ConfigKeyInfo configKeyInfo = convertConfigKey(configKeys.get(configName));
+        for (Map.Entry<String, ConfigKey> entry : configKeys.entrySet()) {
+            String configName = entry.getKey();
+            ConfigKeyInfo configKeyInfo = convertConfigKey(entry.getValue());
+            Type type = entry.getValue().type;
             ConfigValueInfo configValueInfo = null;
             if (configValueMap.containsKey(configName)) {
                 ConfigValue configValue = configValueMap.get(configName);
-                configValueInfo = convertConfigValue(configValue);
+                configValueInfo = convertConfigValue(configValue, type);
                 errorCount += configValue.errorMessages().size();
             }
             configInfoList.add(new ConfigInfo(configKeyInfo, configValueInfo));
@@ -249,11 +252,16 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
 
     private static ConfigKeyInfo convertConfigKey(ConfigKey configKey) {
         String name = configKey.name;
-        String type = configKey.type.name();
-        Object defaultValue = configKey.defaultValue;
+        Type type = configKey.type;
+        String typeName = configKey.type.name();
+
         boolean required = false;
-        if (defaultValue == ConfigDef.NO_DEFAULT_VALUE) {
+        String defaultValue;
+        if (configKey.defaultValue == ConfigDef.NO_DEFAULT_VALUE) {
+            defaultValue = (String) configKey.defaultValue;
             required = true;
+        } else {
+            defaultValue = ConfigDef.convertToString(configKey.defaultValue, type);
         }
         String importance = configKey.importance.name();
         String documentation = configKey.documentation;
@@ -262,11 +270,23 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
         String width = configKey.width.name();
         String displayName = configKey.displayName;
         List<String> dependents = configKey.dependents;
-        return new ConfigKeyInfo(name, type, required, defaultValue, importance, documentation, group, orderInGroup, width, displayName, dependents);
+        return new ConfigKeyInfo(name, typeName, required, defaultValue, importance, documentation, group, orderInGroup, width, displayName, dependents);
     }
 
-    private static ConfigValueInfo convertConfigValue(ConfigValue configValue) {
-        return new ConfigValueInfo(configValue.name(), configValue.value(), configValue.recommendedValues(), configValue.errorMessages(), configValue.visible());
+    private static ConfigValueInfo convertConfigValue(ConfigValue configValue, Type type) {
+        String value = ConfigDef.convertToString(configValue.value(), type);
+        List<String> recommendedValues = new LinkedList<>();
+
+        if (type == Type.LIST) {
+            for (Object object: configValue.recommendedValues()) {
+                recommendedValues.add(ConfigDef.convertToString(object, Type.STRING));
+            }
+        } else {
+            for (Object object : configValue.recommendedValues()) {
+                recommendedValues.add(ConfigDef.convertToString(object, type));
+            }
+        }
+        return new ConfigValueInfo(configValue.name(), value, recommendedValues, configValue.errorMessages(), configValue.visible());
     }
 
     private Connector getConnector(String connType) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5236bf60/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java
index f813709..ead24c5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java
@@ -28,7 +28,7 @@ public class ConfigKeyInfo {
     private final String name;
     private final String type;
     private final boolean required;
-    private final Object defaultValue;
+    private final String defaultValue;
     private final String importance;
     private final String documentation;
     private final String group;
@@ -41,7 +41,7 @@ public class ConfigKeyInfo {
     public ConfigKeyInfo(@JsonProperty("name") String name,
                          @JsonProperty("type") String type,
                          @JsonProperty("required") boolean required,
-                         @JsonProperty("default_value") Object defaultValue,
+                         @JsonProperty("default_value") String defaultValue,
                          @JsonProperty("importance") String importance,
                          @JsonProperty("documentation") String documentation,
                          @JsonProperty("group") String group,
@@ -78,7 +78,7 @@ public class ConfigKeyInfo {
     }
 
     @JsonProperty("default_value")
-    public Object defaultValue() {
+    public String defaultValue() {
         return defaultValue;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5236bf60/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java
index 51e7ee5..a6ae006 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java
@@ -25,16 +25,16 @@ import java.util.Objects;
 
 public class ConfigValueInfo {
     private String name;
-    private Object value;
-    private List<Object> recommendedValues;
+    private String value;
+    private List<String> recommendedValues;
     private List<String> errors;
     private boolean visible;
 
     @JsonCreator
     public ConfigValueInfo(
         @JsonProperty("name") String name,
-        @JsonProperty("value") Object value,
-        @JsonProperty("recommended_values") List<Object> recommendedValues,
+        @JsonProperty("value") String value,
+        @JsonProperty("recommended_values") List<String> recommendedValues,
         @JsonProperty("errors") List<String> errors,
         @JsonProperty("visible") boolean visible) {
         this.name = name;
@@ -50,12 +50,12 @@ public class ConfigValueInfo {
     }
 
     @JsonProperty
-    public Object value() {
+    public String value() {
         return value;
     }
 
     @JsonProperty("recommended_values")
-    public List<Object> recommendedValues() {
+    public List<String> recommendedValues() {
         return recommendedValues;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5236bf60/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 1049e7e..732db3d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -21,8 +21,10 @@ import com.fasterxml.jackson.core.type.TypeReference;
 
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Recommender;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Width;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.runtime.AbstractHerder;
@@ -48,6 +50,7 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -68,7 +71,8 @@ public class ConnectorPluginsResourceTest {
     private static Map<String, String> props = new HashMap<>();
     static {
         props.put("test.string.config", "testString");
-        props.put("test.int.config", "10");
+        props.put("test.int.config", "1");
+        props.put("test.list.config", "a,b");
     }
 
     private static final ConfigInfos CONFIG_INFOS;
@@ -76,22 +80,27 @@ public class ConnectorPluginsResourceTest {
     static {
         List<ConfigInfo> configs = new LinkedList<>();
 
-        ConfigKeyInfo configKeyInfo = new ConfigKeyInfo("test.string.config", "STRING", true, "", "HIGH", "Test configuration for string type.", null, -1, "NONE", "test.string.config", new LinkedList<String>());
-        ConfigValueInfo configValueInfo = new ConfigValueInfo("test.string.config", "testString", Collections.<Object>emptyList(), Collections.<String>emptyList(), true);
+        ConfigKeyInfo configKeyInfo = new ConfigKeyInfo("test.string.config", "STRING", true, "", "HIGH", "Test configuration for string type.", null, -1, "NONE", "test.string.config", Collections.<String>emptyList());
+        ConfigValueInfo configValueInfo = new ConfigValueInfo("test.string.config", "testString", Collections.<String>emptyList(), Collections.<String>emptyList(), true);
         ConfigInfo configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
         configs.add(configInfo);
 
-        configKeyInfo = new ConfigKeyInfo("test.int.config", "INT", true, "", "MEDIUM", "Test configuration for integer type.", null, -1, "NONE", "test.int.config", new LinkedList<String>());
-        configValueInfo = new ConfigValueInfo("test.int.config", 10, Collections.<Object>emptyList(), Collections.<String>emptyList(), true);
+        configKeyInfo = new ConfigKeyInfo("test.int.config", "INT", true, "", "MEDIUM", "Test configuration for integer type.", "Test", 1, "MEDIUM", "test.int.config", Collections.<String>emptyList());
+        configValueInfo = new ConfigValueInfo("test.int.config", "1", Arrays.asList("1", "2", "3"), Collections.<String>emptyList(), true);
         configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
         configs.add(configInfo);
 
-        configKeyInfo = new ConfigKeyInfo("test.string.config.default", "STRING", false, "", "LOW", "Test configuration with default value.", null, -1, "NONE", "test.string.config.default", new LinkedList<String>());
-        configValueInfo = new ConfigValueInfo("test.string.config.default", "", Collections.<Object>emptyList(), Collections.<String>emptyList(), true);
+        configKeyInfo = new ConfigKeyInfo("test.string.config.default", "STRING", false, "", "LOW", "Test configuration with default value.", null, -1, "NONE", "test.string.config.default", Collections.<String>emptyList());
+        configValueInfo = new ConfigValueInfo("test.string.config.default", "", Collections.<String>emptyList(), Collections.<String>emptyList(), true);
         configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
         configs.add(configInfo);
 
-        CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), 0, Collections.<String>emptyList(), configs);
+        configKeyInfo = new ConfigKeyInfo("test.list.config", "LIST", true, "", "HIGH", "Test configuration for list type.", "Test", 2, "LONG", "test.list.config", Collections.<String>emptyList());
+        configValueInfo = new ConfigValueInfo("test.list.config", "a,b", Arrays.asList("a", "b", "c"), Collections.<String>emptyList(), true);
+        configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
+        configs.add(configInfo);
+
+        CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), 0, Collections.singletonList("Test"), configs);
     }
 
     @Mock
@@ -143,14 +152,17 @@ public class ConnectorPluginsResourceTest {
     /* Name here needs to be unique as we are testing the aliasing mechanism */
     public static class ConnectorPluginsResourceTestConnector extends Connector {
 
-        public static final String TEST_STRING_CONFIG = "test.string.config";
-        public static final String TEST_INT_CONFIG = "test.int.config";
-        public static final String TEST_STRING_CONFIG_DEFAULT = "test.string.config.default";
+        private static final String TEST_STRING_CONFIG = "test.string.config";
+        private static final String TEST_INT_CONFIG = "test.int.config";
+        private static final String TEST_STRING_CONFIG_DEFAULT = "test.string.config.default";
+        private static final String TEST_LIST_CONFIG = "test.list.config";
+        private static final String GROUP = "Test";
 
         private static final ConfigDef CONFIG_DEF = new ConfigDef()
             .define(TEST_STRING_CONFIG, Type.STRING, Importance.HIGH, "Test configuration for string type.")
-            .define(TEST_INT_CONFIG, Type.INT, Importance.MEDIUM, "Test configuration for integer type.")
-            .define(TEST_STRING_CONFIG_DEFAULT, Type.STRING, "", Importance.LOW, "Test configuration with default value.");
+            .define(TEST_INT_CONFIG, Type.INT, Importance.MEDIUM, "Test configuration for integer type.", GROUP, 1, Width.MEDIUM, TEST_INT_CONFIG, new IntegerRecommender())
+            .define(TEST_STRING_CONFIG_DEFAULT, Type.STRING, "", Importance.LOW, "Test configuration with default value.")
+            .define(TEST_LIST_CONFIG, Type.LIST, Importance.HIGH, "Test configuration for list type.", GROUP, 2, Width.LONG, TEST_LIST_CONFIG, new ListRecommender());
 
         @Override
         public String version() {
@@ -182,4 +194,29 @@ public class ConnectorPluginsResourceTest {
             return CONFIG_DEF;
         }
     }
+
+    private static class IntegerRecommender implements Recommender {
+
+        @Override
+        public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
+            return Arrays.<Object>asList(1, 2, 3);
+        }
+
+        @Override
+        public boolean visible(String name, Map<String, Object> parsedConfig) {
+            return true;
+        }
+    }
+
+    private static class ListRecommender implements Recommender {
+        @Override
+        public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
+            return Arrays.<Object>asList("a", "b", "c");
+        }
+
+        @Override
+        public boolean visible(String name, Map<String, Object> parsedConfig) {
+            return true;
+        }
+    }
 }