You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/11/25 23:36:52 UTC
kafka git commit: kafka-1667; topic-level configuration not validated;
patched by Dmytro Kostiuchenko; reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 9f8b8dad2 -> 834b64198
kafka-1667; topic-level configuration not validated; patched by Dmytro Kostiuchenko; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/834b6419
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/834b6419
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/834b6419
Branch: refs/heads/trunk
Commit: 834b6419806750ff555e8ffd104caa420004e84e
Parents: 9f8b8da
Author: Dmytro Kostiuchenko <ed...@archlinux.us>
Authored: Tue Nov 25 14:36:31 2014 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Nov 25 14:36:31 2014 -0800
----------------------------------------------------------------------
.../apache/kafka/common/config/ConfigDef.java | 9 ++
.../main/scala/kafka/admin/TopicCommand.scala | 2 +-
core/src/main/scala/kafka/log/LogConfig.scala | 144 +++++++++++--------
core/src/main/scala/kafka/utils/Utils.scala | 26 ++++
.../test/scala/kafka/log/LogConfigTest.scala | 93 ++++++++++++
.../integration/UncleanLeaderElectionTest.scala | 4 +-
6 files changed, 218 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/834b6419/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index c4cea2c..347e252 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -19,6 +19,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* This class is used for specifying the set of expected configurations, their type, their defaults, their
@@ -49,6 +50,14 @@ public class ConfigDef {
private final Map<String, ConfigKey> configKeys = new HashMap<String, ConfigKey>();
/**
+ * Returns an unmodifiable view of the property names defined in this {@linkplain ConfigDef}.
+ * The returned set wraps the key set of the internal key map; it is not a copy.
+ * @return an unmodifiable {@link Set} view containing the defined configuration names
+ */
+ public Set<String> names() {
+ return Collections.unmodifiableSet(configKeys.keySet());
+ }
+
+ /**
* Define a new configuration
* @param name The name of the config parameter
* @param type The type of the config
http://git-wip-us.apache.org/repos/asf/kafka/blob/834b6419/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 0b2735e..285c033 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -256,7 +256,7 @@ object TopicCommand {
.ofType(classOf[String])
val nl = System.getProperty("line.separator")
val configOpt = parser.accepts("config", "A topic configuration override for the topic being created or altered." +
- "The following is a list of valid configurations: " + nl + LogConfig.ConfigNames.map("\t" + _).mkString(nl) + nl +
+ "The following is a list of valid configurations: " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl +
"See the Kafka documentation for full details on the topic configs.")
.withRequiredArg
.describedAs("name=value")
http://git-wip-us.apache.org/repos/asf/kafka/blob/834b6419/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index e48922a..f2fbc55 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -21,7 +21,7 @@ import java.util.Properties
import org.apache.kafka.common.utils.Utils
import scala.collection._
-import kafka.common._
+import org.apache.kafka.common.config.ConfigDef
object Defaults {
val SegmentSize = 1024 * 1024
@@ -44,21 +44,20 @@ object Defaults {
/**
* Configuration settings for a log
- * @param segmentSize The soft maximum for the size of a segment file in the log
+ * @param segmentSize The hard maximum for the size of a segment file in the log
* @param segmentMs The soft maximum on the amount of time before a new log segment is rolled
* @param segmentJitterMs The maximum random jitter subtracted from segmentMs to avoid thundering herds of segment rolling
* @param flushInterval The number of messages that can be written to the log before a flush is forced
* @param flushMs The amount of time the log can have dirty data before a flush is forced
* @param retentionSize The approximate total number of bytes this log can use
- * @param retentionMs The age approximate maximum age of the last segment that is retained
+ * @param retentionMs The approximate maximum age of the last segment that is retained
* @param maxIndexSize The maximum size of an index file
* @param indexInterval The approximate number of bytes between index entries
* @param fileDeleteDelayMs The time to wait before deleting a file from the filesystem
* @param deleteRetentionMs The time to retain delete markers in the log. Only applicable for logs that are being compacted.
* @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned
* @param compact Should old segments in this log be deleted or deduplicated?
- * @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled; actually a controller-level property
- * but included here for topic-specific configuration validation purposes
+ * @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled
* @param minInSyncReplicas If number of insync replicas drops below this number, we stop accepting writes with -1 (or all) required acks
*
*/
@@ -106,6 +105,10 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
}
object LogConfig {
+
+ val Delete = "delete"
+ val Compact = "compact"
+
val SegmentBytesProp = "segment.bytes"
val SegmentMsProp = "segment.ms"
val SegmentJitterMsProp = "segment.jitter.ms"
@@ -123,46 +126,84 @@ object LogConfig {
val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
val MinInSyncReplicasProp = "min.insync.replicas"
- val ConfigNames = Set(SegmentBytesProp,
- SegmentMsProp,
- SegmentJitterMsProp,
- SegmentIndexBytesProp,
- FlushMessagesProp,
- FlushMsProp,
- RetentionBytesProp,
- RententionMsProp,
- MaxMessageBytesProp,
- IndexIntervalBytesProp,
- FileDeleteDelayMsProp,
- DeleteRetentionMsProp,
- MinCleanableDirtyRatioProp,
- CleanupPolicyProp,
- UncleanLeaderElectionEnableProp,
- MinInSyncReplicasProp)
+ val SegmentSizeDoc = "The hard maximum for the size of a segment file in the log" // the *Doc vals are the documentation strings handed to configDef.define
+ val SegmentMsDoc = "The soft maximum on the amount of time before a new log segment is rolled"
+ val SegmentJitterMsDoc = "The maximum random jitter subtracted from segmentMs to avoid thundering herds of segment" +
+ " rolling"
+ val FlushIntervalDoc = "The number of messages that can be written to the log before a flush is forced"
+ val FlushMsDoc = "The amount of time the log can have dirty data before a flush is forced"
+ val RetentionSizeDoc = "The approximate total number of bytes this log can use"
+ val RetentionMsDoc = "The approximate maximum age of the last segment that is retained"
+ val MaxIndexSizeDoc = "The maximum size of an index file"
+ val MaxMessageSizeDoc = "The maximum size of a message"
+ val IndexIntervalDoc = "The approximate number of bytes between index entries"
+ val FileDeleteDelayMsDoc = "The time to wait before deleting a file from the filesystem"
+ val DeleteRetentionMsDoc = "The time to retain delete markers in the log. Only applicable for logs that are being" +
+ " compacted."
+ val MinCleanableRatioDoc = "The ratio of bytes that are available for cleaning to the bytes already cleaned"
+ val CompactDoc = "Should old segments in this log be deleted or deduplicated?"
+ val UncleanLeaderElectionEnableDoc = "Indicates whether unclean leader election is enabled"
+ val MinInSyncReplicasDoc = "If number of insync replicas drops below this number, we stop accepting writes with" +
+ " -1 (or all) required acks"
+
+ private val configDef = { // declares type, default, validator, importance and doc string for each topic-level log config
+ import ConfigDef.Range._ // atLeast, between
+ import ConfigDef.ValidString._ // in
+ import ConfigDef.Type._ // INT, LONG, DOUBLE, STRING
+ import ConfigDef.Importance._ // MEDIUM
+ import java.util.Arrays.asList
+
+ new ConfigDef()
+ .define(SegmentBytesProp, INT, Defaults.SegmentSize, atLeast(0), MEDIUM, SegmentSizeDoc)
+ .define(SegmentMsProp, LONG, Defaults.SegmentMs, atLeast(0), MEDIUM, SegmentMsDoc)
+ .define(SegmentJitterMsProp, LONG, Defaults.SegmentJitterMs, atLeast(0), MEDIUM, SegmentJitterMsDoc)
+ .define(SegmentIndexBytesProp, INT, Defaults.MaxIndexSize, atLeast(0), MEDIUM, MaxIndexSizeDoc)
+ .define(FlushMessagesProp, LONG, Defaults.FlushInterval, atLeast(0), MEDIUM, FlushIntervalDoc)
+ .define(FlushMsProp, LONG, Defaults.FlushMs, atLeast(0), MEDIUM, FlushMsDoc)
+ // no range validator here: retention.bytes can be negative. See kafka.log.LogManager.cleanupSegmentsToMaintainSize
+ .define(RetentionBytesProp, LONG, Defaults.RetentionSize, MEDIUM, RetentionSizeDoc)
+ .define(RententionMsProp, LONG, Defaults.RetentionMs, atLeast(0), MEDIUM, RetentionMsDoc)
+ .define(MaxMessageBytesProp, INT, Defaults.MaxMessageSize, atLeast(0), MEDIUM, MaxMessageSizeDoc)
+ .define(IndexIntervalBytesProp, INT, Defaults.IndexInterval, atLeast(0), MEDIUM, IndexIntervalDoc)
+ .define(DeleteRetentionMsProp, LONG, Defaults.DeleteRetentionMs, atLeast(0), MEDIUM, DeleteRetentionMsDoc)
+ .define(FileDeleteDelayMsProp, LONG, Defaults.FileDeleteDelayMs, atLeast(0), MEDIUM, FileDeleteDelayMsDoc)
+ .define(MinCleanableDirtyRatioProp, DOUBLE, Defaults.MinCleanableDirtyRatio, between(0, 1), MEDIUM,
+ MinCleanableRatioDoc)
+ .define(CleanupPolicyProp, STRING, if (Defaults.Compact) Compact else Delete, in(asList(Compact, Delete)), MEDIUM,
+ CompactDoc)
+ // we validate true/false explicitly to fail in case of typo
+ .define(UncleanLeaderElectionEnableProp, STRING, Defaults.UncleanLeaderElectionEnable.toString,
+ in(asList(true.toString, false.toString)), MEDIUM, UncleanLeaderElectionEnableDoc)
+ .define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), MEDIUM, MinInSyncReplicasDoc)
+ }
+
+  /**
+   * Names of all topic-level log configurations defined in `configDef`.
+   *
+   * @return the configuration names as a list sorted in natural String order
+   */
+  def configNames(): List[String] = {
+    // explicit asScala conversion instead of the implicit JavaConversions wrappers
+    import JavaConverters._
+    configDef.names().asScala.toList.sorted
+  }
/**
* Parse the given properties instance into a LogConfig object
+ * Values reachable only through the defaults chained into the Properties are flattened with
+ * evaluateDefaults first, then every value is parsed by configDef.
+ * @param props topic-level overrides; keys that are absent get the defaults declared in configDef
+ * @throws org.apache.kafka.common.config.ConfigException if a value cannot be parsed or fails validation
*/
def fromProps(props: Properties): LogConfig = {
- new LogConfig(segmentSize = props.getProperty(SegmentBytesProp, Defaults.SegmentSize.toString).toInt,
- segmentMs = props.getProperty(SegmentMsProp, Defaults.SegmentMs.toString).toLong,
- segmentJitterMs = props.getProperty(SegmentJitterMsProp, Defaults.SegmentJitterMs.toString).toLong,
- maxIndexSize = props.getProperty(SegmentIndexBytesProp, Defaults.MaxIndexSize.toString).toInt,
- flushInterval = props.getProperty(FlushMessagesProp, Defaults.FlushInterval.toString).toLong,
- flushMs = props.getProperty(FlushMsProp, Defaults.FlushMs.toString).toLong,
- retentionSize = props.getProperty(RetentionBytesProp, Defaults.RetentionSize.toString).toLong,
- retentionMs = props.getProperty(RententionMsProp, Defaults.RetentionMs.toString).toLong,
- maxMessageSize = props.getProperty(MaxMessageBytesProp, Defaults.MaxMessageSize.toString).toInt,
- indexInterval = props.getProperty(IndexIntervalBytesProp, Defaults.IndexInterval.toString).toInt,
- fileDeleteDelayMs = props.getProperty(FileDeleteDelayMsProp, Defaults.FileDeleteDelayMs.toString).toInt,
- deleteRetentionMs = props.getProperty(DeleteRetentionMsProp, Defaults.DeleteRetentionMs.toString).toLong,
- minCleanableRatio = props.getProperty(MinCleanableDirtyRatioProp,
- Defaults.MinCleanableDirtyRatio.toString).toDouble,
- compact = props.getProperty(CleanupPolicyProp, if(Defaults.Compact) "compact" else "delete")
- .trim.toLowerCase != "delete",
- uncleanLeaderElectionEnable = props.getProperty(UncleanLeaderElectionEnableProp,
- Defaults.UncleanLeaderElectionEnable.toString).toBoolean,
- minInSyncReplicas = props.getProperty(MinInSyncReplicasProp,Defaults.MinInSyncReplicas.toString).toInt)
+ import kafka.utils.Utils.evaluateDefaults
+ val parsed = configDef.parse(evaluateDefaults(props))
+ new LogConfig(segmentSize = parsed.get(SegmentBytesProp).asInstanceOf[Int],
+ segmentMs = parsed.get(SegmentMsProp).asInstanceOf[Long],
+ segmentJitterMs = parsed.get(SegmentJitterMsProp).asInstanceOf[Long],
+ maxIndexSize = parsed.get(SegmentIndexBytesProp).asInstanceOf[Int],
+ flushInterval = parsed.get(FlushMessagesProp).asInstanceOf[Long],
+ flushMs = parsed.get(FlushMsProp).asInstanceOf[Long],
+ retentionSize = parsed.get(RetentionBytesProp).asInstanceOf[Long],
+ retentionMs = parsed.get(RententionMsProp).asInstanceOf[Long],
+ maxMessageSize = parsed.get(MaxMessageBytesProp).asInstanceOf[Int],
+ indexInterval = parsed.get(IndexIntervalBytesProp).asInstanceOf[Int],
+ fileDeleteDelayMs = parsed.get(FileDeleteDelayMsProp).asInstanceOf[Long],
+ deleteRetentionMs = parsed.get(DeleteRetentionMsProp).asInstanceOf[Long],
+ minCleanableRatio = parsed.get(MinCleanableDirtyRatioProp).asInstanceOf[Double],
+ compact = parsed.get(CleanupPolicyProp).asInstanceOf[String].toLowerCase != Delete,
+ uncleanLeaderElectionEnable = parsed.get(UncleanLeaderElectionEnableProp).asInstanceOf[String].toBoolean,
+ minInSyncReplicas = parsed.get(MinInSyncReplicasProp).asInstanceOf[Int])
}
/**
@@ -179,30 +220,17 @@ object LogConfig {
*/
def validateNames(props: Properties) {
import JavaConversions._
+ val names = configDef.names()
for(name <- props.keys)
- require(LogConfig.ConfigNames.contains(name), "Unknown configuration \"%s\".".format(name))
+ require(names.contains(name), "Unknown configuration \"%s\".".format(name))
}
/**
- * Check that the given properties contain only valid log config names, and that all values can be parsed.
+ * Check that the given properties contain only valid log config names and that all values can be parsed and are valid.
+ * Names are checked first by validateNames; the values are then parsed against configDef, which also applies
+ * the per-key validators declared there (for example min.insync.replicas must be at least 1).
*/
def validate(props: Properties) {
validateNames(props)
- validateMinInSyncReplicas(props)
- LogConfig.fromProps(LogConfig().toProps, props) // check that we can parse the values
- }
-
- /**
- * Check that MinInSyncReplicas is reasonable
- * Unfortunately, we can't validate its smaller than number of replicas
- * since we don't have this information here
- */
- private def validateMinInSyncReplicas(props: Properties) {
- val minIsr = props.getProperty(MinInSyncReplicasProp)
- if (minIsr != null && minIsr.toInt < 1) {
- throw new InvalidConfigException("Wrong value " + minIsr + " of min.insync.replicas in topic configuration; " +
- " Valid values are at least 1")
- }
+ configDef.parse(props)
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/834b6419/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 23aefb4..58685cc 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -505,6 +505,32 @@ object Utils extends Logging {
props.store(writer, "")
writer.toString
}
+
+  /**
+   * Flatten a {@linkplain java.util.Properties} object, including the values that are reachable only through
+   * its chained defaults, into a plain {@linkplain java.util.Map}. A cast or a direct `get` would miss the
+   * defaults, as this example shows:
+   * <pre>
+   * val defaults = new Properties()
+   * defaults.put("foo", "bar")
+   * val props = new Properties(defaults)
+   *
+   * props.getProperty("foo") // "bar"
+   * props.get("foo") // null
+   * evaluateDefaults(props).get("foo") // "bar"
+   * </pre>
+   *
+   * @param props properties to flatten
+   * @return a newly created java.util.Map holding every string property name with its effective value
+   */
+  def evaluateDefaults(props: Properties): java.util.Map[String, String] = {
+    val flattened = new java.util.HashMap[String, String]()
+    // stringPropertyNames() already includes the keys contributed by the defaults chain
+    val keys = props.stringPropertyNames().iterator()
+    while (keys.hasNext) {
+      val key = keys.next()
+      flattened.put(key, props.getProperty(key))
+    }
+    flattened
+  }
/**
* Read some properties with the given default values
http://git-wip-us.apache.org/repos/asf/kafka/blob/834b6419/core/src/test/scala/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/log/LogConfigTest.scala b/core/src/test/scala/kafka/log/LogConfigTest.scala
new file mode 100644
index 0000000..99b0df7
--- /dev/null
+++ b/core/src/test/scala/kafka/log/LogConfigTest.scala
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import org.apache.kafka.common.config.ConfigException
+import org.scalatest.junit.JUnit3Suite
+import org.junit.{Assert, Test}
+import java.util.Properties
+
+/**
+ * Unit tests for parsing and validation of topic-level log configuration in LogConfig.
+ */
+class LogConfigTest extends JUnit3Suite {
+
+  /** Values supplied only through chained Properties defaults must be picked up by fromProps. */
+  @Test
+  def testFromPropsDefaults() {
+    val defaults = new Properties()
+    defaults.put(LogConfig.SegmentBytesProp, "4242")
+    val props = new Properties(defaults)
+
+    val config = LogConfig.fromProps(props)
+
+    Assert.assertEquals(4242, config.segmentSize)
+    Assert.assertEquals("LogConfig defaults should be retained", Defaults.MaxMessageSize, config.maxMessageSize)
+  }
+
+  /** Empty properties must produce the same config as the LogConfig default constructor. */
+  @Test
+  def testFromPropsEmpty() {
+    val p = new Properties()
+    val config = LogConfig.fromProps(p)
+    Assert.assertEquals(LogConfig(), config)
+  }
+
+  /** A random but valid value for every known config must survive a fromProps/toProps round trip. */
+  @Test
+  def testFromPropsToProps() {
+    import scala.util.Random._
+    val expected = new Properties()
+    LogConfig.configNames().foreach { name =>
+      val value = name match {
+        case LogConfig.UncleanLeaderElectionEnableProp => randFrom("true", "false")
+        case LogConfig.CleanupPolicyProp => randFrom(LogConfig.Compact, LogConfig.Delete)
+        // a fixed locale keeps '.' as the decimal separator whatever the JVM default locale is
+        case LogConfig.MinCleanableDirtyRatioProp => "%.1f".formatLocal(java.util.Locale.US, nextDouble * .9 + .1)
+        case LogConfig.MinInSyncReplicasProp => (nextInt(Int.MaxValue - 1) + 1).toString
+        case LogConfig.RetentionBytesProp => nextInt().toString
+        case _ => nextInt(Int.MaxValue).toString
+      }
+      expected.setProperty(name, value)
+    }
+
+    val actual = LogConfig.fromProps(expected).toProps
+    Assert.assertEquals(expected, actual)
+  }
+
+  /** Every known config must reject values that are unparsable or outside its allowed range. */
+  @Test
+  def testFromPropsInvalid() {
+    LogConfig.configNames().foreach { name =>
+      name match {
+        // checked like every other key; a `return` here would be a non-local return that ends the whole test
+        case LogConfig.UncleanLeaderElectionEnableProp => assertPropertyInvalid(name, "not_a_boolean")
+        case LogConfig.RetentionBytesProp => assertPropertyInvalid(name, "not_a_number")
+        case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar")
+        case LogConfig.MinCleanableDirtyRatioProp => assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2")
+        case LogConfig.MinInSyncReplicasProp => assertPropertyInvalid(name, "not_a_number", "0", "-1")
+        case _ => assertPropertyInvalid(name, "not_a_number", "-1")
+      }
+    }
+  }
+
+  /** Asserts that each of the given values, set alone for `name`, makes fromProps throw ConfigException. */
+  private def assertPropertyInvalid(name: String, values: AnyRef*) {
+    values.foreach { value =>
+      val props = new Properties
+      props.setProperty(name, value.toString)
+      intercept[ConfigException] {
+        LogConfig.fromProps(props)
+      }
+    }
+  }
+
+  /** Picks one of the given choices uniformly at random. */
+  private def randFrom[T](choices: T*): T = {
+    import scala.util.Random
+    choices(Random.nextInt(choices.size))
+  }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/834b6419/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index f44568c..ba3bcdc 100644
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -17,6 +17,8 @@
package kafka.integration
+import org.apache.kafka.common.config.ConfigException
+
import scala.collection.mutable.MutableList
import scala.util.Random
import org.apache.log4j.{Level, Logger}
@@ -155,7 +157,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
val topicProps = new Properties()
topicProps.put("unclean.leader.election.enable", "invalid")
- intercept[IllegalArgumentException] {
+ intercept[ConfigException] {
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1)), topicProps)
}
}