You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jo...@apache.org on 2015/03/05 15:53:42 UTC

[3/3] kafka git commit: KAFKA-1845 KafkaConfig should use ConfigDef patch by Andrii Biletskyi reviewed by Gwen Shapira

KAFKA-1845 KafkaConfig should use ConfigDef patch by Andrii Biletskyi reviewed by Gwen Shapira


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8f0003f9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8f0003f9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8f0003f9

Branch: refs/heads/trunk
Commit: 8f0003f9b694b4da5fbd2f86db872d77a43eb63f
Parents: 3a9f4b8
Author: Joe Stein <jo...@stealth.ly>
Authored: Thu Mar 5 09:53:27 2015 -0500
Committer: Joe Stein <jo...@stealth.ly>
Committed: Thu Mar 5 09:53:27 2015 -0500

----------------------------------------------------------------------
 .gitignore                                      |    1 +
 .../apache/kafka/common/config/ConfigDef.java   |  160 ++-
 core/src/main/scala/kafka/Kafka.scala           |    6 +-
 .../kafka/controller/KafkaController.scala      |    2 +-
 .../controller/PartitionLeaderSelector.scala    |    2 +-
 .../main/scala/kafka/server/KafkaConfig.scala   | 1169 +++++++++++++-----
 .../main/scala/kafka/server/KafkaServer.scala   |    2 +-
 .../kafka/server/ReplicaFetcherThread.scala     |    2 +-
 .../kafka/api/IntegrationTestHarness.scala      |    2 +-
 .../kafka/api/ProducerCompressionTest.scala     |    2 +-
 .../kafka/api/ProducerFailureHandlingTest.scala |   21 +-
 .../kafka/api/ProducerSendTest.scala            |   12 +-
 .../unit/kafka/admin/AddPartitionsTest.scala    |    8 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala |   16 +-
 .../kafka/admin/DeleteConsumerGroupTest.scala   |    2 +-
 .../unit/kafka/admin/DeleteTopicTest.scala      |    4 +-
 .../kafka/consumer/ConsumerIteratorTest.scala   |   10 +-
 .../ZookeeperConsumerConnectorTest.scala        |   12 +-
 .../kafka/integration/AutoOffsetResetTest.scala |    2 +-
 .../unit/kafka/integration/FetcherTest.scala    |    2 +-
 .../kafka/integration/PrimitiveApiTest.scala    |    2 +-
 .../kafka/integration/RollingBounceTest.scala   |    8 +-
 .../kafka/integration/TopicMetadataTest.scala   |    2 +-
 .../integration/UncleanLeaderElectionTest.scala |    2 +-
 .../ZookeeperConsumerConnectorTest.scala        |   15 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |    2 +-
 .../kafka/log4j/KafkaLog4jAppenderTest.scala    |    2 +-
 .../scala/unit/kafka/metrics/MetricsTest.scala  |   15 +-
 .../unit/kafka/producer/AsyncProducerTest.scala |    2 +-
 .../unit/kafka/producer/ProducerTest.scala      |    4 +-
 .../unit/kafka/producer/SyncProducerTest.scala  |    2 +-
 .../unit/kafka/server/AdvertiseBrokerTest.scala |    2 +-
 .../kafka/server/DynamicConfigChangeTest.scala  |    2 +-
 .../server/HighwatermarkPersistenceTest.scala   |    2 +-
 .../unit/kafka/server/ISRExpirationTest.scala   |   16 +-
 .../kafka/server/KafkaConfigConfigDefTest.scala |  403 ++++++
 .../unit/kafka/server/KafkaConfigTest.scala     |   39 +-
 .../unit/kafka/server/LeaderElectionTest.scala  |    6 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala |    2 +-
 .../unit/kafka/server/LogRecoveryTest.scala     |   20 +-
 .../unit/kafka/server/OffsetCommitTest.scala    |    2 +-
 .../unit/kafka/server/ReplicaFetchTest.scala    |    2 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  |    6 +-
 .../server/ServerGenerateBrokerIdTest.scala     |   10 +-
 .../unit/kafka/server/ServerShutdownTest.scala  |    6 +-
 .../unit/kafka/server/ServerStartupTest.scala   |    6 +-
 .../unit/kafka/server/SimpleFetchTest.scala     |   17 +-
 .../unit/kafka/utils/ReplicationUtilsTest.scala |    2 +-
 48 files changed, 1521 insertions(+), 515 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 4c6c29e..1f3ba7d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,6 +7,7 @@ lib_managed/
 src_managed/
 project/boot/
 project/plugins/project/
+patch-process/*
 .idea
 .svn
 .classpath

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 8523333..4170bcc 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -26,22 +26,22 @@ import org.apache.kafka.common.utils.Utils;
 /**
  * This class is used for specifying the set of expected configurations, their type, their defaults, their
  * documentation, and any special validation logic used for checking the correctness of the values the user provides.
- * <p>
+ * <p/>
  * Usage of this class looks something like this:
- * 
+ * <p/>
  * <pre>
  * ConfigDef defs = new ConfigDef();
  * defs.define(&quot;config_name&quot;, Type.STRING, &quot;default string value&quot;, &quot;This configuration is used for blah blah blah.&quot;);
  * defs.define(&quot;another_config_name&quot;, Type.INT, 42, Range.atLeast(0), &quot;More documentation on this config&quot;);
- * 
+ *
  * Properties props = new Properties();
  * props.setProperty(&quot;config_name&quot;, &quot;some value&quot;);
  * Map&lt;String, Object&gt; configs = defs.parse(props);
- * 
+ *
  * String someConfig = (String) configs.get(&quot;config_name&quot;); // will return &quot;some value&quot;
  * int anotherConfig = (Integer) configs.get(&quot;another_config_name&quot;); // will return default value of 42
  * </pre>
- * 
+ * <p/>
  * This class can be used stand-alone or in combination with {@link AbstractConfig} which provides some additional
  * functionality for accessing configs.
  */
@@ -53,7 +53,7 @@ public class ConfigDef {
 
     /**
      * Returns unmodifiable set of properties names defined in this {@linkplain ConfigDef}
-     * 
+     *
      * @return new unmodifiable {@link Set} instance containing the keys
      */
     public Set<String> names() {
@@ -62,90 +62,121 @@ public class ConfigDef {
 
     /**
      * Define a new configuration
-     * 
-     * @param name The name of the config parameter
-     * @param type The type of the config
-     * @param defaultValue The default value to use if this config isn't present
-     * @param validator A validator to use in checking the correctness of the config
-     * @param importance The importance of this config: is this something you will likely need to change.
+     *
+     * @param name          The name of the config parameter
+     * @param type          The type of the config
+     * @param defaultValue  The default value to use if this config isn't present
+     * @param validator     A validator to use in checking the correctness of the config
+     * @param importance    The importance of this config: is this something you will likely need to change.
      * @param documentation The documentation string for the config
+     * @param required      Whether parsing should fail when the property is not set and no default value is specified
      * @return This ConfigDef so you can chain calls
      */
-    public ConfigDef define(String name,
-                            Type type,
-                            Object defaultValue,
-                            Validator validator,
-                            Importance importance,
-                            String documentation) {
+    public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation,
+                            boolean required) {
         if (configKeys.containsKey(name))
             throw new ConfigException("Configuration " + name + " is defined twice.");
-        Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE
-                : parseType(name, defaultValue, type);
-        configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, importance, documentation));
+        Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type);
+        configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, importance, documentation, required));
         return this;
     }
 
     /**
+     * Define a new required configuration
+     *
+     * @param name          The name of the config parameter
+     * @param type          The type of the config
+     * @param defaultValue  The default value to use if this config isn't present
+     * @param validator     A validator to use in checking the correctness of the config
+     * @param importance    The importance of this config: is this something you will likely need to change.
+     * @param documentation The documentation string for the config
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) {
+        return define(name, type, defaultValue, validator, importance, documentation, true);
+    }
+
+    /**
      * Define a new configuration with no special validation logic
-     * 
-     * @param name The name of the config parameter
-     * @param type The type of the config
-     * @param defaultValue The default value to use if this config isn't present
-     * @param importance The importance of this config: is this something you will likely need to change.
+     *
+     * @param name          The name of the config parameter
+     * @param type          The type of the config
+     * @param defaultValue  The default value to use if this config isn't present
+     * @param importance    The importance of this config: is this something you will likely need to change.
      * @param documentation The documentation string for the config
      * @return This ConfigDef so you can chain calls
      */
     public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation) {
-        return define(name, type, defaultValue, null, importance, documentation);
+        return define(name, type, defaultValue, null, importance, documentation, true);
     }
 
     /**
      * Define a required parameter with no default value
-     * 
-     * @param name The name of the config parameter
-     * @param type The type of the config
-     * @param validator A validator to use in checking the correctness of the config
-     * @param importance The importance of this config: is this something you will likely need to change.
+     *
+     * @param name          The name of the config parameter
+     * @param type          The type of the config
+     * @param validator     A validator to use in checking the correctness of the config
+     * @param importance    The importance of this config: is this something you will likely need to change.
      * @param documentation The documentation string for the config
      * @return This ConfigDef so you can chain calls
      */
     public ConfigDef define(String name, Type type, Validator validator, Importance importance, String documentation) {
-        return define(name, type, NO_DEFAULT_VALUE, validator, importance, documentation);
+        return define(name, type, NO_DEFAULT_VALUE, validator, importance, documentation, true);
     }
 
     /**
      * Define a required parameter with no default value and no special validation logic
-     * 
-     * @param name The name of the config parameter
-     * @param type The type of the config
-     * @param importance The importance of this config: is this something you will likely need to change.
+     *
+     * @param name          The name of the config parameter
+     * @param type          The type of the config
+     * @param importance    The importance of this config: is this something you will likely need to change.
      * @param documentation The documentation string for the config
      * @return This ConfigDef so you can chain calls
      */
     public ConfigDef define(String name, Type type, Importance importance, String documentation) {
-        return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation);
+        return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, true);
     }
 
     /**
+     * Define a parameter with no default value and no special validation logic, explicitly stating whether it is required
+     *
+     * @param name          The name of the config parameter
+     * @param type          The type of the config
+     * @param importance    The importance of this config: is this something you will likely need to change.
+     * @param documentation The documentation string for the config
+     * @param required      Whether parsing should fail when the property is not set and no default value is specified
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, Importance importance, String documentation, boolean required) {
+        return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, required);
+    }
+
+
+    /**
      * Parse and validate configs against this configuration definition. The input is a map of configs. It is expected
      * that the keys of the map are strings, but the values can either be strings or they may already be of the
      * appropriate type (int, string, etc). This will work equally well with either java.util.Properties instances or a
      * programmatically constructed map.
-     * 
+     *
      * @param props The configs to parse and validate
      * @return Parsed and validated configs. The key will be the config name and the value will be the value parsed into
-     *         the appropriate type (int, string, etc)
+     * the appropriate type (int, string, etc)
      */
     public Map<String, Object> parse(Map<?, ?> props) {
         /* parse all known keys */
         Map<String, Object> values = new HashMap<String, Object>();
         for (ConfigKey key : configKeys.values()) {
             Object value;
+            // props map contains setting - assign ConfigKey value
             if (props.containsKey(key.name))
                 value = parseType(key.name, props.get(key.name), key.type);
-            else if (key.defaultValue == NO_DEFAULT_VALUE)
-                throw new ConfigException("Missing required configuration \"" + key.name
-                        + "\" which has no default value.");
+                // props map doesn't contain setting, the key is required and no default value specified - it's an error
+            else if (key.defaultValue == NO_DEFAULT_VALUE && key.required)
+                throw new ConfigException("Missing required configuration \"" + key.name + "\" which has no default value.");
+                // props map doesn't contain setting, no default value specified and the key is not required - assign it to null
+            else if (!key.hasDefault() && !key.required)
+                value = null;
+                // otherwise assign the setting its default value
             else
                 value = key.defaultValue;
             if (key.validator != null)
@@ -157,10 +188,10 @@ public class ConfigDef {
 
     /**
      * Parse a value according to its expected type.
-     * 
-     * @param name The config name
+     *
+     * @param name  The config name
      * @param value The config value
-     * @param type The expected type
+     * @param type  The expected type
      * @return The parsed object
      */
     private Object parseType(String name, Object value, Type type) {
@@ -185,8 +216,7 @@ public class ConfigDef {
                     if (value instanceof String)
                         return trimmed;
                     else
-                        throw new ConfigException(name, value, "Expected value to be a string, but it was a "
-                                + value.getClass().getName());
+                        throw new ConfigException(name, value, "Expected value to be a string, but it was a " + value.getClass().getName());
                 case INT:
                     if (value instanceof Integer) {
                         return (Integer) value;
@@ -195,6 +225,14 @@ public class ConfigDef {
                     } else {
                         throw new ConfigException(name, value, "Expected value to be an number.");
                     }
+                case SHORT:
+                    if (value instanceof Short) {
+                        return (Short) value;
+                    } else if (value instanceof String) {
+                        return Short.parseShort(trimmed);
+                    } else {
+                        throw new ConfigException(name, value, "Expected value to be an number.");
+                    }
                 case LONG:
                     if (value instanceof Integer)
                         return ((Integer) value).longValue();
@@ -242,7 +280,7 @@ public class ConfigDef {
      * The config types
      */
     public enum Type {
-        BOOLEAN, STRING, INT, LONG, DOUBLE, LIST, CLASS;
+        BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS;
     }
 
     public enum Importance {
@@ -270,7 +308,7 @@ public class ConfigDef {
 
         /**
          * A numeric range that checks only the lower bound
-         * 
+         *
          * @param min The minimum acceptable value
          */
         public static Range atLeast(Number min) {
@@ -303,7 +341,7 @@ public class ConfigDef {
     }
 
     public static class ValidString implements Validator {
-        private final List<String> validStrings;
+        List<String> validStrings;
 
         private ValidString(List<String> validStrings) {
             this.validStrings = validStrings;
@@ -316,14 +354,15 @@ public class ConfigDef {
         @Override
         public void ensureValid(String name, Object o) {
             String s = (String) o;
-            if (!validStrings.contains(s))
+            if (!validStrings.contains(s)) {
                 throw new ConfigException(name, o, "String must be one of: " + Utils.join(validStrings, ", "));
+            }
+
         }
 
         public String toString() {
             return "[" + Utils.join(validStrings, ", ") + "]";
         }
-
     }
 
     private static class ConfigKey {
@@ -333,13 +372,9 @@ public class ConfigDef {
         public final Object defaultValue;
         public final Validator validator;
         public final Importance importance;
+        public final boolean required;
 
-        public ConfigKey(String name,
-                         Type type,
-                         Object defaultValue,
-                         Validator validator,
-                         Importance importance,
-                         String documentation) {
+        public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation, boolean required) {
             super();
             this.name = name;
             this.type = type;
@@ -349,6 +384,7 @@ public class ConfigDef {
             if (this.validator != null)
                 this.validator.ensureValid(name, defaultValue);
             this.documentation = documentation;
+            this.required = required;
         }
 
         public boolean hasDefault() {
@@ -408,4 +444,4 @@ public class ConfigDef {
         b.append("</table>");
         return b.toString();
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/main/scala/kafka/Kafka.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index 77a49e1..37de7df 100644
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConversions._
 import joptsimple.OptionParser
 import metrics.KafkaMetricsReporter
 import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
-import kafka.utils.{CommandLineUtils, Utils, Logging}
+import kafka.utils.{VerifiableProperties, CommandLineUtils, Utils, Logging}
 
 object Kafka extends Logging {
 
@@ -47,13 +47,13 @@ object Kafka extends Logging {
       props.putAll(CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt)))
     }
 
-    new KafkaConfig(props)
+    KafkaConfig.fromProps(props)
   }
 
   def main(args: Array[String]): Unit = {
     try {
       val serverConfig = getKafkaConfigFromArgs(args)
-      KafkaMetricsReporter.startReporters(serverConfig.props)
+      KafkaMetricsReporter.startReporters(new VerifiableProperties(serverConfig.toProps))
       val kafkaServerStartable = new KafkaServerStartable(serverConfig)
 
       // attach shutdown handler to catch control-c

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index e9b4dc6..09fc46d 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1012,7 +1012,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
             // if the replica to be removed from the ISR is the last surviving member of the ISR and unclean leader election
             // is disallowed for the corresponding topic, then we must preserve the ISR membership so that the replica can
             // eventually be restored as the leader.
-            if (newIsr.isEmpty && !LogConfig.fromProps(config.props.props, AdminUtils.fetchTopicConfig(zkClient,
+            if (newIsr.isEmpty && !LogConfig.fromProps(config.toProps, AdminUtils.fetchTopicConfig(zkClient,
               topicAndPartition.topic)).uncleanLeaderElectionEnable) {
               info("Retaining last ISR %d of partition %s since unclean leader election is disabled".format(replicaId, topicAndPartition))
               newIsr = leaderAndIsr.isr

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 4a31c72..3b15ab4 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -61,7 +61,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi
           case true =>
             // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
             // for unclean leader election.
-            if (!LogConfig.fromProps(config.props.props, AdminUtils.fetchTopicConfig(controllerContext.zkClient,
+            if (!LogConfig.fromProps(config.toProps, AdminUtils.fetchTopicConfig(controllerContext.zkClient,
               topicAndPartition.topic)).uncleanLeaderElectionEnable) {
               throw new NoReplicaOnlineException(("No broker in ISR for partition " +
                 "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +