You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jo...@apache.org on 2015/03/05 15:53:42 UTC
[3/3] kafka git commit: KAFKA-1845 KafkaConfig should use ConfigDef
patch by Andrii Biletskyi reviewed by Gwen Shapira
KAFKA-1845 KafkaConfig should use ConfigDef patch by Andrii Biletskyi reviewed by Gwen Shapira
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8f0003f9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8f0003f9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8f0003f9
Branch: refs/heads/trunk
Commit: 8f0003f9b694b4da5fbd2f86db872d77a43eb63f
Parents: 3a9f4b8
Author: Joe Stein <jo...@stealth.ly>
Authored: Thu Mar 5 09:53:27 2015 -0500
Committer: Joe Stein <jo...@stealth.ly>
Committed: Thu Mar 5 09:53:27 2015 -0500
----------------------------------------------------------------------
.gitignore | 1 +
.../apache/kafka/common/config/ConfigDef.java | 160 ++-
core/src/main/scala/kafka/Kafka.scala | 6 +-
.../kafka/controller/KafkaController.scala | 2 +-
.../controller/PartitionLeaderSelector.scala | 2 +-
.../main/scala/kafka/server/KafkaConfig.scala | 1169 +++++++++++++-----
.../main/scala/kafka/server/KafkaServer.scala | 2 +-
.../kafka/server/ReplicaFetcherThread.scala | 2 +-
.../kafka/api/IntegrationTestHarness.scala | 2 +-
.../kafka/api/ProducerCompressionTest.scala | 2 +-
.../kafka/api/ProducerFailureHandlingTest.scala | 21 +-
.../kafka/api/ProducerSendTest.scala | 12 +-
.../unit/kafka/admin/AddPartitionsTest.scala | 8 +-
.../test/scala/unit/kafka/admin/AdminTest.scala | 16 +-
.../kafka/admin/DeleteConsumerGroupTest.scala | 2 +-
.../unit/kafka/admin/DeleteTopicTest.scala | 4 +-
.../kafka/consumer/ConsumerIteratorTest.scala | 10 +-
.../ZookeeperConsumerConnectorTest.scala | 12 +-
.../kafka/integration/AutoOffsetResetTest.scala | 2 +-
.../unit/kafka/integration/FetcherTest.scala | 2 +-
.../kafka/integration/PrimitiveApiTest.scala | 2 +-
.../kafka/integration/RollingBounceTest.scala | 8 +-
.../kafka/integration/TopicMetadataTest.scala | 2 +-
.../integration/UncleanLeaderElectionTest.scala | 2 +-
.../ZookeeperConsumerConnectorTest.scala | 15 +-
.../src/test/scala/unit/kafka/log/LogTest.scala | 2 +-
.../kafka/log4j/KafkaLog4jAppenderTest.scala | 2 +-
.../scala/unit/kafka/metrics/MetricsTest.scala | 15 +-
.../unit/kafka/producer/AsyncProducerTest.scala | 2 +-
.../unit/kafka/producer/ProducerTest.scala | 4 +-
.../unit/kafka/producer/SyncProducerTest.scala | 2 +-
.../unit/kafka/server/AdvertiseBrokerTest.scala | 2 +-
.../kafka/server/DynamicConfigChangeTest.scala | 2 +-
.../server/HighwatermarkPersistenceTest.scala | 2 +-
.../unit/kafka/server/ISRExpirationTest.scala | 16 +-
.../kafka/server/KafkaConfigConfigDefTest.scala | 403 ++++++
.../unit/kafka/server/KafkaConfigTest.scala | 39 +-
.../unit/kafka/server/LeaderElectionTest.scala | 6 +-
.../scala/unit/kafka/server/LogOffsetTest.scala | 2 +-
.../unit/kafka/server/LogRecoveryTest.scala | 20 +-
.../unit/kafka/server/OffsetCommitTest.scala | 2 +-
.../unit/kafka/server/ReplicaFetchTest.scala | 2 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 6 +-
.../server/ServerGenerateBrokerIdTest.scala | 10 +-
.../unit/kafka/server/ServerShutdownTest.scala | 6 +-
.../unit/kafka/server/ServerStartupTest.scala | 6 +-
.../unit/kafka/server/SimpleFetchTest.scala | 17 +-
.../unit/kafka/utils/ReplicationUtilsTest.scala | 2 +-
48 files changed, 1521 insertions(+), 515 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 4c6c29e..1f3ba7d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,6 +7,7 @@ lib_managed/
src_managed/
project/boot/
project/plugins/project/
+patch-process/*
.idea
.svn
.classpath
http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 8523333..4170bcc 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -3,9 +3,9 @@
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
@@ -26,22 +26,22 @@ import org.apache.kafka.common.utils.Utils;
/**
* This class is used for specifying the set of expected configurations, their type, their defaults, their
* documentation, and any special validation logic used for checking the correctness of the values the user provides.
- * <p>
+ * <p/>
* Usage of this class looks something like this:
- *
+ * <p/>
* <pre>
* ConfigDef defs = new ConfigDef();
* defs.define("config_name", Type.STRING, "default string value", "This configuration is used for blah blah blah.");
* defs.define("another_config_name", Type.INT, 42, Range.atLeast(0), "More documentation on this config");
- *
+ *
* Properties props = new Properties();
* props.setProperty("config_name", "some value");
* Map<String, Object> configs = defs.parse(props);
- *
+ *
* String someConfig = (String) configs.get("config_name"); // will return "some value"
* int anotherConfig = (Integer) configs.get("another_config_name"); // will return default value of 42
* </pre>
- *
+ * <p/>
* This class can be used stand-alone or in combination with {@link AbstractConfig} which provides some additional
* functionality for accessing configs.
*/
@@ -53,7 +53,7 @@ public class ConfigDef {
/**
* Returns unmodifiable set of properties names defined in this {@linkplain ConfigDef}
- *
+ *
* @return new unmodifiable {@link Set} instance containing the keys
*/
public Set<String> names() {
@@ -62,90 +62,121 @@ public class ConfigDef {
/**
* Define a new configuration
- *
- * @param name The name of the config parameter
- * @param type The type of the config
- * @param defaultValue The default value to use if this config isn't present
- * @param validator A validator to use in checking the correctness of the config
- * @param importance The importance of this config: is this something you will likely need to change.
+ *
+ * @param name The name of the config parameter
+ * @param type The type of the config
+ * @param defaultValue The default value to use if this config isn't present
+ * @param validator A validator to use in checking the correctness of the config
+ * @param importance The importance of this config: is this something you will likely need to change.
* @param documentation The documentation string for the config
+ * @param required Whether parsing should fail when the property is not set and no default value is specified
* @return This ConfigDef so you can chain calls
*/
- public ConfigDef define(String name,
- Type type,
- Object defaultValue,
- Validator validator,
- Importance importance,
- String documentation) {
+ public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation,
+ boolean required) {
if (configKeys.containsKey(name))
throw new ConfigException("Configuration " + name + " is defined twice.");
- Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE
- : parseType(name, defaultValue, type);
- configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, importance, documentation));
+ Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type);
+ configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, importance, documentation, required));
return this;
}
/**
+ * Define a new required configuration
+ *
+ * @param name The name of the config parameter
+ * @param type The type of the config
+ * @param defaultValue The default value to use if this config isn't present
+ * @param validator A validator to use in checking the correctness of the config
+ * @param importance The importance of this config: is this something you will likely need to change.
+ * @param documentation The documentation string for the config
+ * @return This ConfigDef so you can chain calls
+ */
+ public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) {
+ return define(name, type, defaultValue, validator, importance, documentation, true);
+ }
+
+ /**
* Define a new configuration with no special validation logic
- *
- * @param name The name of the config parameter
- * @param type The type of the config
- * @param defaultValue The default value to use if this config isn't present
- * @param importance The importance of this config: is this something you will likely need to change.
+ *
+ * @param name The name of the config parameter
+ * @param type The type of the config
+ * @param defaultValue The default value to use if this config isn't present
+ * @param importance The importance of this config: is this something you will likely need to change.
* @param documentation The documentation string for the config
* @return This ConfigDef so you can chain calls
*/
public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation) {
- return define(name, type, defaultValue, null, importance, documentation);
+ return define(name, type, defaultValue, null, importance, documentation, true);
}
/**
* Define a required parameter with no default value
- *
- * @param name The name of the config parameter
- * @param type The type of the config
- * @param validator A validator to use in checking the correctness of the config
- * @param importance The importance of this config: is this something you will likely need to change.
+ *
+ * @param name The name of the config parameter
+ * @param type The type of the config
+ * @param validator A validator to use in checking the correctness of the config
+ * @param importance The importance of this config: is this something you will likely need to change.
* @param documentation The documentation string for the config
* @return This ConfigDef so you can chain calls
*/
public ConfigDef define(String name, Type type, Validator validator, Importance importance, String documentation) {
- return define(name, type, NO_DEFAULT_VALUE, validator, importance, documentation);
+ return define(name, type, NO_DEFAULT_VALUE, validator, importance, documentation, true);
}
/**
* Define a required parameter with no default value and no special validation logic
- *
- * @param name The name of the config parameter
- * @param type The type of the config
- * @param importance The importance of this config: is this something you will likely need to change.
+ *
+ * @param name The name of the config parameter
+ * @param type The type of the config
+ * @param importance The importance of this config: is this something you will likely need to change.
* @param documentation The documentation string for the config
* @return This ConfigDef so you can chain calls
*/
public ConfigDef define(String name, Type type, Importance importance, String documentation) {
- return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation);
+ return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, true);
}
/**
+ * Define a parameter with no default value and no special validation logic, which may be required or optional
+ *
+ * @param name The name of the config parameter
+ * @param type The type of the config
+ * @param importance The importance of this config: is this something you will likely need to change.
+ * @param documentation The documentation string for the config
+ * @param required Whether parsing should fail when the property is not set and no default value is specified
+ * @return This ConfigDef so you can chain calls
+ */
+ public ConfigDef define(String name, Type type, Importance importance, String documentation, boolean required) {
+ return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, required);
+ }
+
+
+ /**
* Parse and validate configs against this configuration definition. The input is a map of configs. It is expected
* that the keys of the map are strings, but the values can either be strings or they may already be of the
* appropriate type (int, string, etc). This will work equally well with either java.util.Properties instances or a
* programmatically constructed map.
- *
+ *
* @param props The configs to parse and validate
* @return Parsed and validated configs. The key will be the config name and the value will be the value parsed into
- * the appropriate type (int, string, etc)
+ * the appropriate type (int, string, etc)
*/
public Map<String, Object> parse(Map<?, ?> props) {
/* parse all known keys */
Map<String, Object> values = new HashMap<String, Object>();
for (ConfigKey key : configKeys.values()) {
Object value;
+ // props map contains setting - assign ConfigKey value
if (props.containsKey(key.name))
value = parseType(key.name, props.get(key.name), key.type);
- else if (key.defaultValue == NO_DEFAULT_VALUE)
- throw new ConfigException("Missing required configuration \"" + key.name
- + "\" which has no default value.");
+ // props map doesn't contain setting, the key is required and no default value specified - it's an error
+ else if (key.defaultValue == NO_DEFAULT_VALUE && key.required)
+ throw new ConfigException("Missing required configuration \"" + key.name + "\" which has no default value.");
+ // props map doesn't contain setting, no default value specified and the key is not required - assign it to null
+ else if (!key.hasDefault() && !key.required)
+ value = null;
+ // otherwise assign the setting its default value
else
value = key.defaultValue;
if (key.validator != null)
@@ -157,10 +188,10 @@ public class ConfigDef {
/**
* Parse a value according to its expected type.
- *
- * @param name The config name
+ *
+ * @param name The config name
* @param value The config value
- * @param type The expected type
+ * @param type The expected type
* @return The parsed object
*/
private Object parseType(String name, Object value, Type type) {
@@ -185,8 +216,7 @@ public class ConfigDef {
if (value instanceof String)
return trimmed;
else
- throw new ConfigException(name, value, "Expected value to be a string, but it was a "
- + value.getClass().getName());
+ throw new ConfigException(name, value, "Expected value to be a string, but it was a " + value.getClass().getName());
case INT:
if (value instanceof Integer) {
return (Integer) value;
@@ -195,6 +225,14 @@ public class ConfigDef {
} else {
throw new ConfigException(name, value, "Expected value to be an number.");
}
+ case SHORT:
+ if (value instanceof Short) {
+ return (Short) value;
+ } else if (value instanceof String) {
+ return Short.parseShort(trimmed);
+ } else {
+ throw new ConfigException(name, value, "Expected value to be an number.");
+ }
case LONG:
if (value instanceof Integer)
return ((Integer) value).longValue();
@@ -242,7 +280,7 @@ public class ConfigDef {
* The config types
*/
public enum Type {
- BOOLEAN, STRING, INT, LONG, DOUBLE, LIST, CLASS;
+ BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS;
}
public enum Importance {
@@ -270,7 +308,7 @@ public class ConfigDef {
/**
* A numeric range that checks only the lower bound
- *
+ *
* @param min The minimum acceptable value
*/
public static Range atLeast(Number min) {
@@ -303,7 +341,7 @@ public class ConfigDef {
}
public static class ValidString implements Validator {
- private final List<String> validStrings;
+ List<String> validStrings;
private ValidString(List<String> validStrings) {
this.validStrings = validStrings;
@@ -316,14 +354,15 @@ public class ConfigDef {
@Override
public void ensureValid(String name, Object o) {
String s = (String) o;
- if (!validStrings.contains(s))
+ if (!validStrings.contains(s)) {
throw new ConfigException(name, o, "String must be one of: " + Utils.join(validStrings, ", "));
+ }
+
}
public String toString() {
return "[" + Utils.join(validStrings, ", ") + "]";
}
-
}
private static class ConfigKey {
@@ -333,13 +372,9 @@ public class ConfigDef {
public final Object defaultValue;
public final Validator validator;
public final Importance importance;
+ public final boolean required;
- public ConfigKey(String name,
- Type type,
- Object defaultValue,
- Validator validator,
- Importance importance,
- String documentation) {
+ public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation, boolean required) {
super();
this.name = name;
this.type = type;
@@ -349,6 +384,7 @@ public class ConfigDef {
if (this.validator != null)
this.validator.ensureValid(name, defaultValue);
this.documentation = documentation;
+ this.required = required;
}
public boolean hasDefault() {
@@ -408,4 +444,4 @@ public class ConfigDef {
b.append("</table>");
return b.toString();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/main/scala/kafka/Kafka.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index 77a49e1..37de7df 100644
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConversions._
import joptsimple.OptionParser
import metrics.KafkaMetricsReporter
import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
-import kafka.utils.{CommandLineUtils, Utils, Logging}
+import kafka.utils.{VerifiableProperties, CommandLineUtils, Utils, Logging}
object Kafka extends Logging {
@@ -47,13 +47,13 @@ object Kafka extends Logging {
props.putAll(CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt)))
}
- new KafkaConfig(props)
+ KafkaConfig.fromProps(props)
}
def main(args: Array[String]): Unit = {
try {
val serverConfig = getKafkaConfigFromArgs(args)
- KafkaMetricsReporter.startReporters(serverConfig.props)
+ KafkaMetricsReporter.startReporters(new VerifiableProperties(serverConfig.toProps))
val kafkaServerStartable = new KafkaServerStartable(serverConfig)
// attach shutdown handler to catch control-c
http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index e9b4dc6..09fc46d 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1012,7 +1012,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
// if the replica to be removed from the ISR is the last surviving member of the ISR and unclean leader election
// is disallowed for the corresponding topic, then we must preserve the ISR membership so that the replica can
// eventually be restored as the leader.
- if (newIsr.isEmpty && !LogConfig.fromProps(config.props.props, AdminUtils.fetchTopicConfig(zkClient,
+ if (newIsr.isEmpty && !LogConfig.fromProps(config.toProps, AdminUtils.fetchTopicConfig(zkClient,
topicAndPartition.topic)).uncleanLeaderElectionEnable) {
info("Retaining last ISR %d of partition %s since unclean leader election is disabled".format(replicaId, topicAndPartition))
newIsr = leaderAndIsr.isr
http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 4a31c72..3b15ab4 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -61,7 +61,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi
case true =>
// Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
// for unclean leader election.
- if (!LogConfig.fromProps(config.props.props, AdminUtils.fetchTopicConfig(controllerContext.zkClient,
+ if (!LogConfig.fromProps(config.toProps, AdminUtils.fetchTopicConfig(controllerContext.zkClient,
topicAndPartition.topic)).uncleanLeaderElectionEnable) {
throw new NoReplicaOnlineException(("No broker in ISR for partition " +
"%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +