You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/06 03:17:46 UTC

kafka git commit: KAFKA-2702: ConfigDef toHtmlTable() sorts in a way that is a bit conf…

Repository: kafka
Updated Branches:
  refs/heads/trunk 76bcccd61 -> 33e879a38


KAFKA-2702: ConfigDef toHtmlTable() sorts in a way that is a bit conf…

…using

Author: Grant Henke <gr...@gmail.com>

Reviewers: Gwen Shapira

Closes #379 from granthenke/config-html


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/33e879a3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/33e879a3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/33e879a3

Branch: refs/heads/trunk
Commit: 33e879a389aeaff3576064b10a0f5dea5ad391ed
Parents: 76bcccd
Author: Grant Henke <gr...@gmail.com>
Authored: Thu Nov 5 18:17:30 2015 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Thu Nov 5 18:17:30 2015 -0800

----------------------------------------------------------------------
 .../apache/kafka/common/config/ConfigDef.java   | 87 ++++++--------------
 .../apache/kafka/common/config/SaslConfigs.java |  2 +-
 .../apache/kafka/common/config/SslConfigs.java  | 16 ++--
 .../kafka/common/config/ConfigDefTest.java      | 17 ++--
 .../kafka/copycat/runtime/WorkerConfig.java     |  6 +-
 .../runtime/distributed/DistributedConfig.java  | 18 ++--
 .../main/scala/kafka/server/KafkaConfig.scala   | 54 ++++++------
 7 files changed, 84 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/33e879a3/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index f01ed28..2a5ebee 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -69,34 +69,17 @@ public class ConfigDef {
      * @param validator     A validator to use in checking the correctness of the config
      * @param importance    The importance of this config: is this something you will likely need to change.
      * @param documentation The documentation string for the config
-     * @param required      Should the config fail if given property is not set and doesn't have default value specified
      * @return This ConfigDef so you can chain calls
      */
-    public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation,
-                            boolean required) {
+    public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) {
         if (configKeys.containsKey(name))
             throw new ConfigException("Configuration " + name + " is defined twice.");
         Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type);
-        configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, importance, documentation, required));
+        configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, importance, documentation));
         return this;
     }
 
     /**
-     * Define a new required configuration
-     *
-     * @param name          The name of the config parameter
-     * @param type          The type of the config
-     * @param defaultValue  The default value to use if this config isn't present
-     * @param validator     A validator to use in checking the correctness of the config
-     * @param importance    The importance of this config: is this something you will likely need to change.
-     * @param documentation The documentation string for the config
-     * @return This ConfigDef so you can chain calls
-     */
-    public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) {
-        return define(name, type, defaultValue, validator, importance, documentation, true);
-    }
-
-    /**
      * Define a new configuration with no special validation logic
      *
      * @param name          The name of the config parameter
@@ -107,25 +90,11 @@ public class ConfigDef {
      * @return This ConfigDef so you can chain calls
      */
     public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation) {
-        return define(name, type, defaultValue, null, importance, documentation, true);
-    }
-
-    /**
-     * Define a required parameter with no default value
-     *
-     * @param name          The name of the config parameter
-     * @param type          The type of the config
-     * @param validator     A validator to use in checking the correctness of the config
-     * @param importance    The importance of this config: is this something you will likely need to change.
-     * @param documentation The documentation string for the config
-     * @return This ConfigDef so you can chain calls
-     */
-    public ConfigDef define(String name, Type type, Validator validator, Importance importance, String documentation) {
-        return define(name, type, NO_DEFAULT_VALUE, validator, importance, documentation, true);
+        return define(name, type, defaultValue, null, importance, documentation);
     }
 
     /**
-     * Define a required parameter with no default value and no special validation logic
+     * Define a new configuration with no default value and no special validation logic
      *
      * @param name          The name of the config parameter
      * @param type          The type of the config
@@ -134,21 +103,7 @@ public class ConfigDef {
      * @return This ConfigDef so you can chain calls
      */
     public ConfigDef define(String name, Type type, Importance importance, String documentation) {
-        return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, true);
-    }
-
-    /**
-     * Define a required parameter with no default value and no special validation logic
-     *
-     * @param name          The name of the config parameter
-     * @param type          The type of the config
-     * @param importance    The importance of this config: is this something you will likely need to change.
-     * @param documentation The documentation string for the config
-     * @param required      Should the config fail if given property is not set and doesn't have default value specified
-     * @return This ConfigDef so you can chain calls
-     */
-    public ConfigDef define(String name, Type type, Importance importance, String documentation, boolean required) {
-        return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, required);
+        return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation);
     }
 
     /**
@@ -187,13 +142,10 @@ public class ConfigDef {
             // props map contains setting - assign ConfigKey value
             if (props.containsKey(key.name))
                 value = parseType(key.name, props.get(key.name), key.type);
-                // props map doesn't contain setting, the key is required and no default value specified - its an error
-            else if (key.defaultValue == NO_DEFAULT_VALUE && key.required)
+            // props map doesn't contain setting, the key is required because no default value specified - its an error
+            else if (key.defaultValue == NO_DEFAULT_VALUE)
                 throw new ConfigException("Missing required configuration \"" + key.name + "\" which has no default value.");
-                // props map doesn't contain setting, no default value specified and the key is not required - assign it to null
-            else if (!key.hasDefault() && !key.required)
-                value = null;
-                // otherwise assign setting its default value
+            // otherwise assign setting its default value
             else
                 value = key.defaultValue;
             if (key.validator != null)
@@ -213,9 +165,12 @@ public class ConfigDef {
      */
     private Object parseType(String name, Object value, Type type) {
         try {
+            if (value == null) return null;
+
             String trimmed = null;
             if (value instanceof String)
                 trimmed = ((String) value).trim();
+
             switch (type) {
                 case BOOLEAN:
                     if (value instanceof String) {
@@ -389,9 +344,8 @@ public class ConfigDef {
         public final Object defaultValue;
         public final Validator validator;
         public final Importance importance;
-        public final boolean required;
 
-        public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation, boolean required) {
+        public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) {
             super();
             this.name = name;
             this.type = type;
@@ -401,7 +355,6 @@ public class ConfigDef {
             if (this.validator != null)
                 this.validator.ensureValid(name, defaultValue);
             this.documentation = documentation;
-            this.required = required;
         }
 
         public boolean hasDefault() {
@@ -415,7 +368,7 @@ public class ConfigDef {
         List<ConfigDef.ConfigKey> configs = new ArrayList<ConfigDef.ConfigKey>(this.configKeys.values());
         Collections.sort(configs, new Comparator<ConfigDef.ConfigKey>() {
             public int compare(ConfigDef.ConfigKey k1, ConfigDef.ConfigKey k2) {
-                // first take anything with no default value
+                // first take anything with no default value (therefore required)
                 if (!k1.hasDefault() && k2.hasDefault())
                     return -1;
                 else if (!k2.hasDefault() && k1.hasDefault())
@@ -436,6 +389,7 @@ public class ConfigDef {
         b.append("<th>Name</th>\n");
         b.append("<th>Type</th>\n");
         b.append("<th>Default</th>\n");
+        b.append("<th>Valid Values</th>\n");
         b.append("<th>Importance</th>\n");
         b.append("<th>Description</th>\n");
         b.append("</tr>\n");
@@ -448,7 +402,18 @@ public class ConfigDef {
             b.append(def.type.toString().toLowerCase());
             b.append("</td>");
             b.append("<td>");
-            b.append(def.defaultValue == null ? "" : def.defaultValue);
+            if (def.hasDefault()) {
+                if (def.defaultValue == null)
+                    b.append("null");
+                else if (def.type == Type.STRING && def.defaultValue.toString().isEmpty())
+                    b.append("\"\"");
+                else
+                    b.append(def.defaultValue);
+            } else
+                b.append("");
+            b.append("</td>");
+            b.append("<td>");
+            b.append(def.validator != null ? def.validator.toString() : "");
             b.append("</td>");
             b.append("<td>");
             b.append(def.importance.toString().toLowerCase());

http://git-wip-us.apache.org/repos/asf/kafka/blob/33e879a3/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
index 0046868..78ae06a 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
@@ -50,7 +50,7 @@ public class SaslConfigs {
     public static final List<String> DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = Collections.singletonList("DEFAULT");
 
     public static void addClientSaslSupport(ConfigDef config) {
-        config.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
+        config.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC)
                 .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
                 .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
                 .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)

http://git-wip-us.apache.org/repos/asf/kafka/blob/33e879a3/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
index 8f93301..d257e35 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
@@ -99,18 +99,18 @@ public class SslConfigs {
     public static void addClientSslSupport(ConfigDef config) {
         config.define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigDef.Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, ConfigDef.Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
                 .define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
-                .define(SslConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false)
-                .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false)
+                .define(SslConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC)
+                .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, null, ConfigDef.Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC)
                 .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
                 .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
-                .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
-                .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
-                .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false)
+                .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, null,  ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC)
+                .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC)
+                .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC)
                 .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
-                .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
-                .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
+                .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC)
+                .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC)
                 .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
                 .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
-                .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false);
+                .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/33e879a3/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index 44c2ef0..cb22ce1 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -68,9 +68,12 @@ public class ConfigDefTest {
         new ConfigDef().define("a", Type.INT, "hello", Importance.HIGH, "docs");
     }
 
-    @Test(expected = ConfigException.class)
+    @Test
     public void testNullDefault() {
-        new ConfigDef().define("a", Type.INT, null, null, null, "docs");
+        ConfigDef def = new ConfigDef().define("a", Type.INT, null, null, null, "docs");
+        Map<String, Object> vals = def.parse(new Properties());
+
+        assertEquals(null, vals.get("a"));
     }
 
     @Test(expected = ConfigException.class)
@@ -85,9 +88,9 @@ public class ConfigDefTest {
 
     @Test
     public void testBadInputs() {
-        testBadInputs(Type.INT, "hello", null, "42.5", 42.5, Long.MAX_VALUE, Long.toString(Long.MAX_VALUE), new Object());
-        testBadInputs(Type.LONG, "hello", null, "42.5", Long.toString(Long.MAX_VALUE) + "00", new Object());
-        testBadInputs(Type.DOUBLE, "hello", null, new Object());
+        testBadInputs(Type.INT, "hello", "42.5", 42.5, Long.MAX_VALUE, Long.toString(Long.MAX_VALUE), new Object());
+        testBadInputs(Type.LONG, "hello", "42.5", Long.toString(Long.MAX_VALUE) + "00", new Object());
+        testBadInputs(Type.DOUBLE, "hello", new Object());
         testBadInputs(Type.STRING, new Object());
         testBadInputs(Type.LIST, 53, new Object());
         testBadInputs(Type.BOOLEAN, "hello", "truee", "fals");

http://git-wip-us.apache.org/repos/asf/kafka/blob/33e879a3/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java
index 74aadb9..0c6a6f6 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java
@@ -126,10 +126,10 @@ public class WorkerConfig extends AbstractConfig {
                         Importance.LOW, OFFSET_COMMIT_INTERVAL_MS_DOC)
                 .define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, Type.LONG, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT,
                         Importance.LOW, OFFSET_COMMIT_TIMEOUT_MS_DOC)
-                .define(REST_HOST_NAME_CONFIG, Type.STRING, Importance.LOW, REST_HOST_NAME_DOC, false)
+                .define(REST_HOST_NAME_CONFIG, Type.STRING, null, Importance.LOW, REST_HOST_NAME_DOC)
                 .define(REST_PORT_CONFIG, Type.INT, REST_PORT_DEFAULT, Importance.LOW, REST_PORT_DOC)
-                .define(REST_ADVERTISED_HOST_NAME_CONFIG, Type.STRING, Importance.LOW, REST_ADVERTISED_HOST_NAME_DOC, false)
-                .define(REST_ADVERTISED_PORT_CONFIG, Type.INT, Importance.LOW, REST_ADVERTISED_PORT_DOC, false);
+                .define(REST_ADVERTISED_HOST_NAME_CONFIG, Type.STRING,  null, Importance.LOW, REST_ADVERTISED_HOST_NAME_DOC)
+                .define(REST_ADVERTISED_PORT_CONFIG, Type.INT,  null, Importance.LOW, REST_ADVERTISED_PORT_DOC);
     }
 
     public WorkerConfig(ConfigDef definition, Properties props) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/33e879a3/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedConfig.java
index 7fe6691..90d63cf 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedConfig.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedConfig.java
@@ -137,20 +137,20 @@ public class DistributedConfig extends WorkerConfig {
                 .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, ConfigDef.Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, ConfigDef.Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
                 .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigDef.Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, ConfigDef.Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
                 .define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
-                .define(SslConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false)
-                .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false)
+                .define(SslConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC)
+                .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, null, ConfigDef.Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC)
                 .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
                 .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
-                .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
-                .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
-                .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false)
+                .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC)
+                .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC)
+                .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC)
                 .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
-                .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
-                .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
+                .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC)
+                .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC)
                 .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
                 .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
-                .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
-                .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
+                .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, null,  ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC)
+                .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC)
                 .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
                 .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
                 .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)

http://git-wip-us.apache.org/repos/asf/kafka/blob/33e879a3/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 2973dab..bcedfaf 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -335,7 +335,7 @@ object KafkaConfig {
   /** ********* Zookeeper Configuration ***********/
   val ZkConnectDoc = "Zookeeper host string"
   val ZkSessionTimeoutMsDoc = "Zookeeper session timeout"
-  val ZkConnectionTimeoutMsDoc = "The max time that the client waits to establish a connection to zookeeper"
+  val ZkConnectionTimeoutMsDoc = "The max time that the client waits to establish a connection to zookeeper. If not set, the value in " + ZkSessionTimeoutMsProp + " is used"
   val ZkSyncTimeMsDoc = "How far a ZK follower can be behind a ZK leader"
   val ZkEnableSecureAclsDoc = "Set client to use secure ACLs"
   /** ********* General Configuration ***********/
@@ -379,16 +379,16 @@ object KafkaConfig {
   /** ********* Log Configuration ***********/
   val NumPartitionsDoc = "The default number of log partitions per topic"
   val LogDirDoc = "The directory in which the log data is kept (supplemental for " + LogDirsProp + " property)"
-  val LogDirsDoc = "The directories in which the log data is kept"
+  val LogDirsDoc = "The directories in which the log data is kept. If not set, the value in " + LogDirProp + " is used"
   val LogSegmentBytesDoc = "The maximum size of a single log file"
-  val LogRollTimeMillisDoc = "The maximum time before a new log segment is rolled out (in milliseconds)"
+  val LogRollTimeMillisDoc = "The maximum time before a new log segment is rolled out (in milliseconds). If not set, the value in " + LogRollTimeHoursProp + " is used"
   val LogRollTimeHoursDoc = "The maximum time before a new log segment is rolled out (in hours), secondary to " + LogRollTimeMillisProp + " property"
 
-  val LogRollTimeJitterMillisDoc = "The maximum jitter to subtract from logRollTimeMillis (in milliseconds)"
+  val LogRollTimeJitterMillisDoc = "The maximum jitter to subtract from logRollTimeMillis (in milliseconds). If not set, the value in " + LogRollTimeJitterHoursProp + " is used"
   val LogRollTimeJitterHoursDoc = "The maximum jitter to subtract from logRollTimeMillis (in hours), secondary to " + LogRollTimeJitterMillisProp + " property"
 
-  val LogRetentionTimeMillisDoc = "The number of milliseconds to keep a log file before deleting it (in milliseconds)"
-  val LogRetentionTimeMinsDoc = "The number of minutes to keep a log file before deleting it (in minutes), secondary to " + LogRetentionTimeMillisProp + " property"
+  val LogRetentionTimeMillisDoc = "The number of milliseconds to keep a log file before deleting it (in milliseconds), If not set, the value in " + LogRetentionTimeMinutesProp + " is used"
+  val LogRetentionTimeMinsDoc = "The number of minutes to keep a log file before deleting it (in minutes), secondary to " + LogRetentionTimeMillisProp + " property. If not set, the value in " + LogRetentionTimeHoursProp + " is used"
   val LogRetentionTimeHoursDoc = "The number of hours to keep a log file before deleting it (in hours), tertiary to " + LogRetentionTimeMillisProp + " property"
 
   val LogRetentionBytesDoc = "The maximum size of the log before deleting it"
@@ -409,7 +409,7 @@ object KafkaConfig {
   val LogFlushIntervalMessagesDoc = "The number of messages accumulated on a log partition before messages are flushed to disk "
   val LogDeleteDelayMsDoc = "The amount of time to wait before deleting a file from the filesystem"
   val LogFlushSchedulerIntervalMsDoc = "The frequency in ms that the log flusher checks whether any log needs to be flushed to disk"
-  val LogFlushIntervalMsDoc = "The maximum time in ms that a message in any topic is kept in memory before flushed to disk"
+  val LogFlushIntervalMsDoc = "The maximum time in ms that a message in any topic is kept in memory before flushed to disk. If not set, the value in " + LogFlushSchedulerIntervalMsProp + " is used"
   val LogFlushOffsetCheckpointIntervalMsDoc = "The frequency with which we update the persistent record of the last flush which acts as the log recovery point"
   val LogPreAllocateEnableDoc = "Should pre allocate file when create new segment? If you are using Kafka on Windows, you probably need to set it to true."
   val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown"
@@ -517,7 +517,7 @@ object KafkaConfig {
       /** ********* Zookeeper Configuration ***********/
       .define(ZkConnectProp, STRING, HIGH, ZkConnectDoc)
       .define(ZkSessionTimeoutMsProp, INT, Defaults.ZkSessionTimeoutMs, HIGH, ZkSessionTimeoutMsDoc)
-      .define(ZkConnectionTimeoutMsProp, INT, HIGH, ZkConnectionTimeoutMsDoc, false)
+      .define(ZkConnectionTimeoutMsProp, INT, null, HIGH, ZkConnectionTimeoutMsDoc)
       .define(ZkSyncTimeMsProp, INT, Defaults.ZkSyncTimeMs, LOW, ZkSyncTimeMsDoc)
       .define(ZkEnableSecureAclsProp, BOOLEAN, Defaults.ZkEnableSecureAcls, HIGH, ZkEnableSecureAclsDoc)
 
@@ -537,10 +537,10 @@ object KafkaConfig {
       /** ********* Socket Server Configuration ***********/
       .define(PortProp, INT, Defaults.Port, HIGH, PortDoc)
       .define(HostNameProp, STRING, Defaults.HostName, HIGH, HostNameDoc)
-      .define(ListenersProp, STRING, HIGH, ListenersDoc, false)
-      .define(AdvertisedHostNameProp, STRING, HIGH, AdvertisedHostNameDoc, false)
-      .define(AdvertisedPortProp, INT, HIGH, AdvertisedPortDoc, false)
-      .define(AdvertisedListenersProp, STRING, HIGH, AdvertisedListenersDoc, false)
+      .define(ListenersProp, STRING, null, HIGH, ListenersDoc)
+      .define(AdvertisedHostNameProp, STRING, null, HIGH, AdvertisedHostNameDoc)
+      .define(AdvertisedPortProp, INT, null, HIGH, AdvertisedPortDoc)
+      .define(AdvertisedListenersProp, STRING, null, HIGH, AdvertisedListenersDoc)
       .define(SocketSendBufferBytesProp, INT, Defaults.SocketSendBufferBytes, HIGH, SocketSendBufferBytesDoc)
       .define(SocketReceiveBufferBytesProp, INT, Defaults.SocketReceiveBufferBytes, HIGH, SocketReceiveBufferBytesDoc)
       .define(SocketRequestMaxBytesProp, INT, Defaults.SocketRequestMaxBytes, atLeast(1), HIGH, SocketRequestMaxBytesDoc)
@@ -551,17 +551,17 @@ object KafkaConfig {
       /** ********* Log Configuration ***********/
       .define(NumPartitionsProp, INT, Defaults.NumPartitions, atLeast(1), MEDIUM, NumPartitionsDoc)
       .define(LogDirProp, STRING, Defaults.LogDir, HIGH, LogDirDoc)
-      .define(LogDirsProp, STRING, HIGH, LogDirsDoc, false)
+      .define(LogDirsProp, STRING, null, HIGH, LogDirsDoc)
       .define(LogSegmentBytesProp, INT, Defaults.LogSegmentBytes, atLeast(Message.MinHeaderSize), HIGH, LogSegmentBytesDoc)
 
-      .define(LogRollTimeMillisProp, LONG, HIGH, LogRollTimeMillisDoc, false)
+      .define(LogRollTimeMillisProp, LONG, null, HIGH, LogRollTimeMillisDoc)
       .define(LogRollTimeHoursProp, INT, Defaults.LogRollHours, atLeast(1), HIGH, LogRollTimeHoursDoc)
 
-      .define(LogRollTimeJitterMillisProp, LONG, HIGH, LogRollTimeJitterMillisDoc, false)
+      .define(LogRollTimeJitterMillisProp, LONG, null, HIGH, LogRollTimeJitterMillisDoc)
       .define(LogRollTimeJitterHoursProp, INT, Defaults.LogRollJitterHours, atLeast(0), HIGH, LogRollTimeJitterHoursDoc)
 
-      .define(LogRetentionTimeMillisProp, LONG, HIGH, LogRetentionTimeMillisDoc, false)
-      .define(LogRetentionTimeMinutesProp, INT, HIGH, LogRetentionTimeMinsDoc, false)
+      .define(LogRetentionTimeMillisProp, LONG, null, HIGH, LogRetentionTimeMillisDoc)
+      .define(LogRetentionTimeMinutesProp, INT, null, HIGH, LogRetentionTimeMinsDoc)
       .define(LogRetentionTimeHoursProp, INT, Defaults.LogRetentionHours, HIGH, LogRetentionTimeHoursDoc)
 
       .define(LogRetentionBytesProp, LONG, Defaults.LogRetentionBytes, HIGH, LogRetentionBytesDoc)
@@ -581,7 +581,7 @@ object KafkaConfig {
       .define(LogFlushIntervalMessagesProp, LONG, Defaults.LogFlushIntervalMessages, atLeast(1), HIGH, LogFlushIntervalMessagesDoc)
       .define(LogDeleteDelayMsProp, LONG, Defaults.LogDeleteDelayMs, atLeast(0), HIGH, LogDeleteDelayMsDoc)
       .define(LogFlushSchedulerIntervalMsProp, LONG, Defaults.LogFlushSchedulerIntervalMs, HIGH, LogFlushSchedulerIntervalMsDoc)
-      .define(LogFlushIntervalMsProp, LONG, HIGH, LogFlushIntervalMsDoc, false)
+      .define(LogFlushIntervalMsProp, LONG, null, HIGH, LogFlushIntervalMsDoc)
       .define(LogFlushOffsetCheckpointIntervalMsProp, INT, Defaults.LogFlushOffsetCheckpointIntervalMs, atLeast(0), HIGH, LogFlushOffsetCheckpointIntervalMsDoc)
       .define(LogPreAllocateProp, BOOLEAN, Defaults.LogPreAllocateEnable, MEDIUM, LogPreAllocateEnableDoc)
       .define(NumRecoveryThreadsPerDataDirProp, INT, Defaults.NumRecoveryThreadsPerDataDir, atLeast(1), HIGH, NumRecoveryThreadsPerDataDirDoc)
@@ -647,23 +647,23 @@ object KafkaConfig {
       /** ********* SSL Configuration ****************/
       .define(PrincipalBuilderClassProp, CLASS, Defaults.PrincipalBuilderClass, MEDIUM, PrincipalBuilderClassDoc)
       .define(SslProtocolProp, STRING, Defaults.SslProtocol, MEDIUM, SslProtocolDoc)
-      .define(SslProviderProp, STRING, MEDIUM, SslProviderDoc, false)
+      .define(SslProviderProp, STRING, null, MEDIUM, SslProviderDoc)
       .define(SslEnabledProtocolsProp, LIST, Defaults.SslEnabledProtocols, MEDIUM, SslEnabledProtocolsDoc)
       .define(SslKeystoreTypeProp, STRING, Defaults.SslKeystoreType, MEDIUM, SslKeystoreTypeDoc)
-      .define(SslKeystoreLocationProp, STRING, MEDIUM, SslKeystoreLocationDoc, false)
-      .define(SslKeystorePasswordProp, STRING, MEDIUM, SslKeystorePasswordDoc, false)
-      .define(SslKeyPasswordProp, STRING, MEDIUM, SslKeyPasswordDoc, false)
+      .define(SslKeystoreLocationProp, STRING, null, MEDIUM, SslKeystoreLocationDoc)
+      .define(SslKeystorePasswordProp, STRING, null, MEDIUM, SslKeystorePasswordDoc)
+      .define(SslKeyPasswordProp, STRING, null, MEDIUM, SslKeyPasswordDoc)
       .define(SslTruststoreTypeProp, STRING, Defaults.SslTruststoreType, MEDIUM, SslTruststoreTypeDoc)
-      .define(SslTruststoreLocationProp, STRING, MEDIUM, SslTruststoreLocationDoc, false)
-      .define(SslTruststorePasswordProp, STRING, MEDIUM, SslTruststorePasswordDoc, false)
+      .define(SslTruststoreLocationProp, STRING, null, MEDIUM, SslTruststoreLocationDoc)
+      .define(SslTruststorePasswordProp, STRING, null, MEDIUM, SslTruststorePasswordDoc)
       .define(SslKeyManagerAlgorithmProp, STRING, Defaults.SslKeyManagerAlgorithm, MEDIUM, SslKeyManagerAlgorithmDoc)
       .define(SslTrustManagerAlgorithmProp, STRING, Defaults.SslTrustManagerAlgorithm, MEDIUM, SslTrustManagerAlgorithmDoc)
-      .define(SslEndpointIdentificationAlgorithmProp, STRING, LOW, SslEndpointIdentificationAlgorithmDoc, false)
+      .define(SslEndpointIdentificationAlgorithmProp, STRING, null, LOW, SslEndpointIdentificationAlgorithmDoc)
       .define(SslClientAuthProp, STRING, Defaults.SslClientAuth, in(Defaults.SslClientAuthRequired, Defaults.SslClientAuthRequested, Defaults.SslClientAuthNone), MEDIUM, SslClientAuthDoc)
-      .define(SslCipherSuitesProp, LIST, MEDIUM, SslCipherSuitesDoc, false)
+      .define(SslCipherSuitesProp, LIST, null, MEDIUM, SslCipherSuitesDoc)
 
       /** ********* Sasl Configuration ****************/
-      .define(SaslKerberosServiceNameProp, STRING, MEDIUM, SaslKerberosServiceNameDoc, false)
+      .define(SaslKerberosServiceNameProp, STRING, null, MEDIUM, SaslKerberosServiceNameDoc)
       .define(SaslKerberosKinitCmdProp, STRING, Defaults.SaslKerberosKinitCmd, MEDIUM, SaslKerberosKinitCmdDoc)
       .define(SaslKerberosTicketRenewWindowFactorProp, DOUBLE, Defaults.SaslKerberosTicketRenewWindowFactor, MEDIUM, SaslKerberosTicketRenewWindowFactorDoc)
       .define(SaslKerberosTicketRenewJitterProp, DOUBLE, Defaults.SaslKerberosTicketRenewJitter, MEDIUM, SaslKerberosTicketRenewJitterDoc)