You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/08/05 04:16:53 UTC
kafka git commit: KAFKA-2288;
Follow-up to KAFKA-2249 - reduce logging and testing; Reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 7a666f7aa -> 9cefb2a0f
KAFKA-2288; Follow-up to KAFKA-2249 - reduce logging and testing; Reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9cefb2a0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9cefb2a0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9cefb2a0
Branch: refs/heads/trunk
Commit: 9cefb2a0fb7852d35cfe0f051bc6eadb8e9c4c80
Parents: 7a666f7
Author: Gwen Shapira <cs...@gmail.com>
Authored: Tue Aug 4 19:04:58 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Aug 4 19:04:58 2015 -0700
----------------------------------------------------------------------
.../kafka/common/config/AbstractConfig.java | 23 +-
core/src/main/scala/kafka/log/LogConfig.scala | 2 +-
.../scala/unit/kafka/log/LogConfigTest.scala | 22 -
.../kafka/server/KafkaConfigConfigDefTest.scala | 403 -------------------
.../unit/kafka/server/KafkaConfigTest.scala | 154 ++++++-
5 files changed, 175 insertions(+), 429 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cefb2a0/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index ec3ae15..6c31748 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -39,7 +39,7 @@ public class AbstractConfig {
private final Map<String, Object> values;
@SuppressWarnings("unchecked")
- public AbstractConfig(ConfigDef definition, Map<?, ?> originals) {
+ public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Boolean doLog) {
/* check that all the keys are really strings */
for (Object key : originals.keySet())
if (!(key instanceof String))
@@ -47,7 +47,12 @@ public class AbstractConfig {
this.originals = (Map<String, ?>) originals;
this.values = definition.parse(this.originals);
this.used = Collections.synchronizedSet(new HashSet<String>());
- logAll();
+ if (doLog)
+ logAll();
+ }
+
+ public AbstractConfig(ConfigDef definition, Map<?, ?> originals) {
+ this(definition, originals, true);
}
protected Object get(String key) {
@@ -167,4 +172,18 @@ public class AbstractConfig {
return objects;
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ AbstractConfig that = (AbstractConfig) o;
+
+ return originals.equals(that.originals);
+ }
+
+ @Override
+ public int hashCode() {
+ return originals.hashCode();
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cefb2a0/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index fc41132..c969d16 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -46,7 +46,7 @@ object Defaults {
val PreAllocateEnable = kafka.server.Defaults.LogPreAllocateEnable
}
-case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props) {
+case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props, false) {
val segmentSize = getInt(LogConfig.SegmentBytesProp)
val segmentMs = getLong(LogConfig.SegmentMsProp)
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cefb2a0/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 19dcb47..72e98b3 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -33,28 +33,6 @@ class LogConfigTest extends JUnit3Suite {
}
@Test
- def testFromPropsToProps() {
- import scala.util.Random._
- val expected = new Properties()
- LogConfig.configNames().foreach((name) => {
- name match {
- case LogConfig.UncleanLeaderElectionEnableProp => expected.setProperty(name, randFrom("true", "false"))
- case LogConfig.CompressionTypeProp => expected.setProperty(name, randFrom("producer", "uncompressed", "gzip"))
- case LogConfig.CleanupPolicyProp => expected.setProperty(name, randFrom(LogConfig.Compact, LogConfig.Delete))
- case LogConfig.MinCleanableDirtyRatioProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1))
- case LogConfig.MinInSyncReplicasProp => expected.setProperty(name, (nextInt(Int.MaxValue - 1) + 1).toString)
- case LogConfig.RetentionBytesProp => expected.setProperty(name, nextInt().toString)
- case LogConfig.RetentionMsProp => expected.setProperty(name, nextLong().toString)
- case LogConfig.PreAllocateEnableProp => expected.setProperty(name, randFrom("true", "false"))
- case positiveIntProperty => expected.setProperty(name, nextInt(Int.MaxValue).toString)
- }
- })
-
- val actual = LogConfig(expected).originals
- Assert.assertEquals(expected, actual)
- }
-
- @Test
def testFromPropsInvalid() {
LogConfig.configNames().foreach((name) => {
name match {
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cefb2a0/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
deleted file mode 100644
index 04a02e0..0000000
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
+++ /dev/null
@@ -1,403 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import java.util.Properties
-
-import kafka.api.ApiVersion
-import kafka.message._
-import org.apache.kafka.common.protocol.SecurityProtocol
-import org.junit.{Assert, Test}
-import org.scalatest.junit.JUnit3Suite
-
-import scala.collection.Map
-import scala.util.Random._
-
-class KafkaConfigConfigDefTest extends JUnit3Suite {
-
- @Test
- def testFromPropsEmpty() {
- // only required
- val p = new Properties()
- p.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
- val actualConfig = KafkaConfig.fromProps(p)
-
- val expectedConfig = new KafkaConfig(p)
-
- Assert.assertEquals(expectedConfig.zkConnect, actualConfig.zkConnect)
- Assert.assertEquals(expectedConfig.zkSessionTimeoutMs, actualConfig.zkSessionTimeoutMs)
- Assert.assertEquals(expectedConfig.zkConnectionTimeoutMs, actualConfig.zkConnectionTimeoutMs)
- Assert.assertEquals(expectedConfig.zkSyncTimeMs, actualConfig.zkSyncTimeMs)
- Assert.assertEquals(expectedConfig.maxReservedBrokerId, actualConfig.maxReservedBrokerId)
- Assert.assertEquals(expectedConfig.brokerId, actualConfig.brokerId)
- Assert.assertEquals(expectedConfig.messageMaxBytes, actualConfig.messageMaxBytes)
- Assert.assertEquals(expectedConfig.numNetworkThreads, actualConfig.numNetworkThreads)
- Assert.assertEquals(expectedConfig.numIoThreads, actualConfig.numIoThreads)
- Assert.assertEquals(expectedConfig.backgroundThreads, actualConfig.backgroundThreads)
- Assert.assertEquals(expectedConfig.queuedMaxRequests, actualConfig.queuedMaxRequests)
-
- Assert.assertEquals(expectedConfig.port, actualConfig.port)
- Assert.assertEquals(expectedConfig.hostName, actualConfig.hostName)
- Assert.assertEquals(expectedConfig.advertisedHostName, actualConfig.advertisedHostName)
- Assert.assertEquals(expectedConfig.advertisedPort, actualConfig.advertisedPort)
- Assert.assertEquals(expectedConfig.socketSendBufferBytes, actualConfig.socketSendBufferBytes)
- Assert.assertEquals(expectedConfig.socketReceiveBufferBytes, actualConfig.socketReceiveBufferBytes)
- Assert.assertEquals(expectedConfig.socketRequestMaxBytes, actualConfig.socketRequestMaxBytes)
- Assert.assertEquals(expectedConfig.maxConnectionsPerIp, actualConfig.maxConnectionsPerIp)
- Assert.assertEquals(expectedConfig.maxConnectionsPerIpOverrides, actualConfig.maxConnectionsPerIpOverrides)
- Assert.assertEquals(expectedConfig.connectionsMaxIdleMs, actualConfig.connectionsMaxIdleMs)
-
- Assert.assertEquals(expectedConfig.numPartitions, actualConfig.numPartitions)
- Assert.assertEquals(expectedConfig.logDirs, actualConfig.logDirs)
-
- Assert.assertEquals(expectedConfig.logSegmentBytes, actualConfig.logSegmentBytes)
-
- Assert.assertEquals(expectedConfig.logRollTimeMillis, actualConfig.logRollTimeMillis)
- Assert.assertEquals(expectedConfig.logRollTimeJitterMillis, actualConfig.logRollTimeJitterMillis)
- Assert.assertEquals(expectedConfig.logRetentionTimeMillis, actualConfig.logRetentionTimeMillis)
-
- Assert.assertEquals(expectedConfig.logRetentionBytes, actualConfig.logRetentionBytes)
- Assert.assertEquals(expectedConfig.logCleanupIntervalMs, actualConfig.logCleanupIntervalMs)
- Assert.assertEquals(expectedConfig.logCleanupPolicy, actualConfig.logCleanupPolicy)
- Assert.assertEquals(expectedConfig.logCleanerThreads, actualConfig.logCleanerThreads)
- Assert.assertEquals(expectedConfig.logCleanerIoMaxBytesPerSecond, actualConfig.logCleanerIoMaxBytesPerSecond, 0.0)
- Assert.assertEquals(expectedConfig.logCleanerDedupeBufferSize, actualConfig.logCleanerDedupeBufferSize)
- Assert.assertEquals(expectedConfig.logCleanerIoBufferSize, actualConfig.logCleanerIoBufferSize)
- Assert.assertEquals(expectedConfig.logCleanerDedupeBufferLoadFactor, actualConfig.logCleanerDedupeBufferLoadFactor, 0.0)
- Assert.assertEquals(expectedConfig.logCleanerBackoffMs, actualConfig.logCleanerBackoffMs)
- Assert.assertEquals(expectedConfig.logCleanerMinCleanRatio, actualConfig.logCleanerMinCleanRatio, 0.0)
- Assert.assertEquals(expectedConfig.logCleanerEnable, actualConfig.logCleanerEnable)
- Assert.assertEquals(expectedConfig.logCleanerDeleteRetentionMs, actualConfig.logCleanerDeleteRetentionMs)
- Assert.assertEquals(expectedConfig.logIndexSizeMaxBytes, actualConfig.logIndexSizeMaxBytes)
- Assert.assertEquals(expectedConfig.logIndexIntervalBytes, actualConfig.logIndexIntervalBytes)
- Assert.assertEquals(expectedConfig.logFlushIntervalMessages, actualConfig.logFlushIntervalMessages)
- Assert.assertEquals(expectedConfig.logDeleteDelayMs, actualConfig.logDeleteDelayMs)
- Assert.assertEquals(expectedConfig.logFlushSchedulerIntervalMs, actualConfig.logFlushSchedulerIntervalMs)
- Assert.assertEquals(expectedConfig.logFlushIntervalMs, actualConfig.logFlushIntervalMs)
- Assert.assertEquals(expectedConfig.logFlushOffsetCheckpointIntervalMs, actualConfig.logFlushOffsetCheckpointIntervalMs)
- Assert.assertEquals(expectedConfig.numRecoveryThreadsPerDataDir, actualConfig.numRecoveryThreadsPerDataDir)
- Assert.assertEquals(expectedConfig.autoCreateTopicsEnable, actualConfig.autoCreateTopicsEnable)
-
- Assert.assertEquals(expectedConfig.minInSyncReplicas, actualConfig.minInSyncReplicas)
-
- Assert.assertEquals(expectedConfig.controllerSocketTimeoutMs, actualConfig.controllerSocketTimeoutMs)
- Assert.assertEquals(expectedConfig.defaultReplicationFactor, actualConfig.defaultReplicationFactor)
- Assert.assertEquals(expectedConfig.replicaLagTimeMaxMs, actualConfig.replicaLagTimeMaxMs)
- Assert.assertEquals(expectedConfig.replicaSocketTimeoutMs, actualConfig.replicaSocketTimeoutMs)
- Assert.assertEquals(expectedConfig.replicaSocketReceiveBufferBytes, actualConfig.replicaSocketReceiveBufferBytes)
- Assert.assertEquals(expectedConfig.replicaFetchMaxBytes, actualConfig.replicaFetchMaxBytes)
- Assert.assertEquals(expectedConfig.replicaFetchWaitMaxMs, actualConfig.replicaFetchWaitMaxMs)
- Assert.assertEquals(expectedConfig.replicaFetchMinBytes, actualConfig.replicaFetchMinBytes)
- Assert.assertEquals(expectedConfig.replicaFetchBackoffMs, actualConfig.replicaFetchBackoffMs)
- Assert.assertEquals(expectedConfig.numReplicaFetchers, actualConfig.numReplicaFetchers)
- Assert.assertEquals(expectedConfig.replicaHighWatermarkCheckpointIntervalMs, actualConfig.replicaHighWatermarkCheckpointIntervalMs)
- Assert.assertEquals(expectedConfig.fetchPurgatoryPurgeIntervalRequests, actualConfig.fetchPurgatoryPurgeIntervalRequests)
- Assert.assertEquals(expectedConfig.producerPurgatoryPurgeIntervalRequests, actualConfig.producerPurgatoryPurgeIntervalRequests)
- Assert.assertEquals(expectedConfig.autoLeaderRebalanceEnable, actualConfig.autoLeaderRebalanceEnable)
- Assert.assertEquals(expectedConfig.leaderImbalancePerBrokerPercentage, actualConfig.leaderImbalancePerBrokerPercentage)
- Assert.assertEquals(expectedConfig.leaderImbalanceCheckIntervalSeconds, actualConfig.leaderImbalanceCheckIntervalSeconds)
- Assert.assertEquals(expectedConfig.uncleanLeaderElectionEnable, actualConfig.uncleanLeaderElectionEnable)
-
- Assert.assertEquals(expectedConfig.controlledShutdownMaxRetries, actualConfig.controlledShutdownMaxRetries)
- Assert.assertEquals(expectedConfig.controlledShutdownRetryBackoffMs, actualConfig.controlledShutdownRetryBackoffMs)
- Assert.assertEquals(expectedConfig.controlledShutdownEnable, actualConfig.controlledShutdownEnable)
-
- Assert.assertEquals(expectedConfig.consumerMinSessionTimeoutMs, actualConfig.consumerMinSessionTimeoutMs)
- Assert.assertEquals(expectedConfig.consumerMaxSessionTimeoutMs, actualConfig.consumerMaxSessionTimeoutMs)
-
- Assert.assertEquals(expectedConfig.offsetMetadataMaxSize, actualConfig.offsetMetadataMaxSize)
- Assert.assertEquals(expectedConfig.offsetsLoadBufferSize, actualConfig.offsetsLoadBufferSize)
- Assert.assertEquals(expectedConfig.offsetsTopicReplicationFactor, actualConfig.offsetsTopicReplicationFactor)
- Assert.assertEquals(expectedConfig.offsetsTopicPartitions, actualConfig.offsetsTopicPartitions)
- Assert.assertEquals(expectedConfig.offsetsTopicSegmentBytes, actualConfig.offsetsTopicSegmentBytes)
- Assert.assertEquals(expectedConfig.offsetsTopicCompressionCodec, actualConfig.offsetsTopicCompressionCodec)
- Assert.assertEquals(expectedConfig.offsetsRetentionMinutes, actualConfig.offsetsRetentionMinutes)
- Assert.assertEquals(expectedConfig.offsetsRetentionCheckIntervalMs, actualConfig.offsetsRetentionCheckIntervalMs)
- Assert.assertEquals(expectedConfig.offsetCommitTimeoutMs, actualConfig.offsetCommitTimeoutMs)
- Assert.assertEquals(expectedConfig.offsetCommitRequiredAcks, actualConfig.offsetCommitRequiredAcks)
-
- Assert.assertEquals(expectedConfig.deleteTopicEnable, actualConfig.deleteTopicEnable)
- Assert.assertEquals(expectedConfig.compressionType, actualConfig.compressionType)
- }
-
- private def atLeastXIntProp(x: Int): String = (nextInt(Int.MaxValue - x) + x).toString
-
- private def atLeastOneIntProp: String = atLeastXIntProp(1)
-
- private def inRangeIntProp(fromInc: Int, toInc: Int): String = (nextInt(toInc + 1 - fromInc) + fromInc).toString
-
- @Test
- def testFromPropsToProps() {
- import scala.util.Random._
- val expected = new Properties()
- KafkaConfig.configNames().foreach(name => {
- name match {
- case KafkaConfig.ZkConnectProp => expected.setProperty(name, "127.0.0.1:2181")
- case KafkaConfig.ZkSessionTimeoutMsProp => expected.setProperty(name, atLeastOneIntProp)
- case KafkaConfig.ZkConnectionTimeoutMsProp => expected.setProperty(name, atLeastOneIntProp)
- case KafkaConfig.ZkSyncTimeMsProp => expected.setProperty(name, atLeastOneIntProp)
-
- case KafkaConfig.NumNetworkThreadsProp => expected.setProperty(name, atLeastOneIntProp)
- case KafkaConfig.NumIoThreadsProp => expected.setProperty(name, atLeastOneIntProp)
- case KafkaConfig.BackgroundThreadsProp => expected.setProperty(name, atLeastOneIntProp)
- case KafkaConfig.QueuedMaxRequestsProp => expected.setProperty(name, atLeastOneIntProp)
-
- case KafkaConfig.PortProp => expected.setProperty(name, "1234")
- case KafkaConfig.HostNameProp => expected.setProperty(name, "hostname")
- case KafkaConfig.ListenersProp => expected.setProperty(name, "PLAINTEXT://:9092")
- case KafkaConfig.AdvertisedHostNameProp => expected.setProperty(name, "advertised.hostname")
- case KafkaConfig.AdvertisedPortProp => expected.setProperty(name, "4321")
- case KafkaConfig.AdvertisedListenersProp => expected.setProperty(name, "PLAINTEXT://:2909")
- case KafkaConfig.SocketRequestMaxBytesProp => expected.setProperty(name, atLeastOneIntProp)
- case KafkaConfig.MaxConnectionsPerIpProp => expected.setProperty(name, atLeastOneIntProp)
- case KafkaConfig.MaxConnectionsPerIpOverridesProp => expected.setProperty(name, "127.0.0.1:2, 127.0.0.2:3")
-
- case KafkaConfig.NumPartitionsProp => expected.setProperty(name, "2")
- case KafkaConfig.LogDirsProp => expected.setProperty(name, "/tmp/logs,/tmp/logs2")
- case KafkaConfig.LogDirProp => expected.setProperty(name, "/tmp/log")
- case KafkaConfig.LogSegmentBytesProp => expected.setProperty(name, atLeastXIntProp(Message.MinHeaderSize))
-
- case KafkaConfig.LogRollTimeMillisProp => expected.setProperty(name, atLeastOneIntProp)
- case KafkaConfig.LogRollTimeHoursProp => expected.setProperty(name, atLeastOneIntProp)
-
- case KafkaConfig.LogRetentionTimeMillisProp => expected.setProperty(name, atLeastOneIntProp)
- case KafkaConfig.LogRetentionTimeMinutesProp => expected.setProperty(name, atLeastOneIntProp)
- case KafkaConfig.LogRetentionTimeHoursProp => expected.setProperty(name, atLeastOneIntProp)
-
- case KafkaConfig.LogCleanupIntervalMsProp => expected.setProperty(name, atLeastOneIntProp)
- case KafkaConfig.LogCleanupPolicyProp => expected.setProperty(name, randFrom(Defaults.Compact, Defaults.Delete))
- case KafkaConfig.LogCleanerIoMaxBytesPerSecondProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1))
- case KafkaConfig.LogCleanerDedupeBufferLoadFactorProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1))
- case KafkaConfig.LogCleanerMinCleanRatioProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1))
- case KafkaConfig.LogCleanerEnableProp => expected.setProperty(name, randFrom("true", "false"))
- case KafkaConfig.LogIndexSizeMaxBytesProp => expected.setProperty(name, atLeastXIntProp(4))
- case KafkaConfig.LogFlushIntervalMessagesProp => expected.setProperty(name, atLeastOneIntProp)
-
- case KafkaConfig.NumRecoveryThreadsPerDataDirProp => expected.setProperty(name, atLeastOneIntProp)
- case KafkaConfig.AutoCreateTopicsEnableProp => expected.setProperty(name, randFrom("true", "false"))
- case KafkaConfig.MinInSyncReplicasProp => expected.setProperty(name, atLeastOneIntProp)
- case KafkaConfig.AutoLeaderRebalanceEnableProp => expected.setProperty(name, randFrom("true", "false"))
- case KafkaConfig.UncleanLeaderElectionEnableProp => expected.setProperty(name, randFrom("true", "false"))
- case KafkaConfig.LogPreAllocateProp => expected.setProperty(name, randFrom("true", "false"))
- case KafkaConfig.InterBrokerSecurityProtocolProp => expected.setProperty(name, SecurityProtocol.PLAINTEXT.toString)
- case KafkaConfig.InterBrokerProtocolVersionProp => expected.setProperty(name, ApiVersion.latestVersion.toString)
-
- case KafkaConfig.ControlledShutdownEnableProp => expected.setProperty(name, randFrom("true", "false"))
- case KafkaConfig.OffsetsLoadBufferSizeProp => expected.setProperty(name, atLeastOneIntProp)
- case KafkaConfig.OffsetsTopicPartitionsProp => expected.setProperty(name, atLeastOneIntProp)
- case KafkaConfig.OffsetsTopicSegmentBytesProp => expected.setProperty(name, atLeastOneIntProp)
- case KafkaConfig.OffsetsTopicCompressionCodecProp => expected.setProperty(name, randFrom(GZIPCompressionCodec.codec.toString,
- SnappyCompressionCodec.codec.toString, LZ4CompressionCodec.codec.toString, NoCompressionCodec.codec.toString))
- case KafkaConfig.OffsetsRetentionMinutesProp => expected.setProperty(name, atLeastOneIntProp)
- case KafkaConfig.OffsetsRetentionCheckIntervalMsProp => expected.setProperty(name, atLeastOneIntProp)
- case KafkaConfig.OffsetCommitTimeoutMsProp => expected.setProperty(name, atLeastOneIntProp)
- case KafkaConfig.DeleteTopicEnableProp => expected.setProperty(name, randFrom("true", "false"))
-
- // explicit, non trivial validations or with transient dependencies
-
- // require(brokerId >= -1 && brokerId <= maxReservedBrokerId)
- case KafkaConfig.MaxReservedBrokerIdProp => expected.setProperty(name, "100")
- case KafkaConfig.BrokerIdProp => expected.setProperty(name, inRangeIntProp(0, 100))
- // require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024)
- case KafkaConfig.LogCleanerThreadsProp => expected.setProperty(name, "2")
- case KafkaConfig.LogCleanerDedupeBufferSizeProp => expected.setProperty(name, (1024 * 1024 * 3 + 1).toString)
- // require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs)
- case KafkaConfig.ReplicaFetchWaitMaxMsProp => expected.setProperty(name, "321")
- case KafkaConfig.ReplicaSocketTimeoutMsProp => expected.setProperty(name, atLeastXIntProp(321))
- // require(replicaFetchMaxBytes >= messageMaxBytes)
- case KafkaConfig.MessageMaxBytesProp => expected.setProperty(name, "1234")
- case KafkaConfig.ReplicaFetchMaxBytesProp => expected.setProperty(name, atLeastXIntProp(1234))
- // require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs)
- case KafkaConfig.ReplicaLagTimeMaxMsProp => expected.setProperty(name, atLeastXIntProp(321))
- //require(offsetCommitRequiredAcks >= -1 && offsetCommitRequiredAcks <= offsetsTopicReplicationFactor)
- case KafkaConfig.OffsetCommitRequiredAcksProp => expected.setProperty(name, "-1")
- case KafkaConfig.OffsetsTopicReplicationFactorProp => expected.setProperty(name, inRangeIntProp(1, Short.MaxValue))
- //BrokerCompressionCodec.isValid(compressionType)
- case KafkaConfig.CompressionTypeProp => expected.setProperty(name, randFrom(BrokerCompressionCodec.brokerCompressionOptions))
-
- case KafkaConfig.MetricNumSamplesProp => expected.setProperty(name, "2")
- case KafkaConfig.MetricSampleWindowMsProp => expected.setProperty(name, "1000")
- case KafkaConfig.MetricReporterClassesProp => expected.setProperty(name, "")
-
- case nonNegativeIntProperty => expected.setProperty(name, nextInt(Int.MaxValue).toString)
- }
- })
-
- val actual = KafkaConfig.fromProps(expected).originals
- Assert.assertEquals(expected, actual)
- }
-
- @Test
- def testFromPropsInvalid() {
- def getBaseProperties(): Properties = {
- val validRequiredProperties = new Properties()
- validRequiredProperties.put(KafkaConfig.ZkConnectProp, "127.0.0.1")
- validRequiredProperties
- }
- // to ensure a basis is valid - bootstraps all needed validation
- KafkaConfig.fromProps(getBaseProperties())
-
- KafkaConfig.configNames().foreach(name => {
- name match {
- case KafkaConfig.ZkConnectProp => // ignore string
- case KafkaConfig.ZkSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.ZkConnectionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.ZkSyncTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
-
- case KafkaConfig.BrokerIdProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.NumNetworkThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
- case KafkaConfig.NumIoThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
- case KafkaConfig.BackgroundThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
- case KafkaConfig.QueuedMaxRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
-
- case KafkaConfig.PortProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.HostNameProp => // ignore string
- case KafkaConfig.AdvertisedHostNameProp => //ignore string
- case KafkaConfig.AdvertisedPortProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.SocketSendBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.SocketReceiveBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.MaxConnectionsPerIpOverridesProp =>
- assertPropertyInvalid(getBaseProperties(), name, "127.0.0.1:not_a_number")
- case KafkaConfig.ConnectionsMaxIdleMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
-
- case KafkaConfig.NumPartitionsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
- case KafkaConfig.LogDirsProp => // ignore string
- case KafkaConfig.LogDirProp => // ignore string
- case KafkaConfig.LogSegmentBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", Message.MinHeaderSize - 1)
-
- case KafkaConfig.LogRollTimeMillisProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
- case KafkaConfig.LogRollTimeHoursProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
-
- case KafkaConfig.LogRetentionTimeMillisProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
- case KafkaConfig.LogRetentionTimeMinutesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
- case KafkaConfig.LogRetentionTimeHoursProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
-
- case KafkaConfig.LogRetentionBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.LogCleanupIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
- case KafkaConfig.LogCleanupPolicyProp => assertPropertyInvalid(getBaseProperties(), name, "unknown_policy", "0")
- case KafkaConfig.LogCleanerIoMaxBytesPerSecondProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.LogCleanerDedupeBufferSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "1024")
- case KafkaConfig.LogCleanerDedupeBufferLoadFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.LogCleanerEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean")
- case KafkaConfig.LogCleanerDeleteRetentionMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.LogCleanerMinCleanRatioProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.LogIndexSizeMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "3")
- case KafkaConfig.LogFlushIntervalMessagesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
- case KafkaConfig.LogFlushSchedulerIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.LogFlushIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.NumRecoveryThreadsPerDataDirProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
- case KafkaConfig.AutoCreateTopicsEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
- case KafkaConfig.MinInSyncReplicasProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
- case KafkaConfig.ControllerSocketTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.DefaultReplicationFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.ReplicaLagTimeMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.ReplicaSocketTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2")
- case KafkaConfig.ReplicaSocketReceiveBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.ReplicaFetchMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.ReplicaFetchWaitMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.ReplicaFetchMinBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.NumReplicaFetchersProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.AutoLeaderRebalanceEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
- case KafkaConfig.LeaderImbalancePerBrokerPercentageProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.UncleanLeaderElectionEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
- case KafkaConfig.ControlledShutdownMaxRetriesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.ControlledShutdownRetryBackoffMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.ControlledShutdownEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
- case KafkaConfig.ConsumerMinSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.ConsumerMaxSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.OffsetMetadataMaxSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
- case KafkaConfig.OffsetsLoadBufferSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
- case KafkaConfig.OffsetsTopicReplicationFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
- case KafkaConfig.OffsetsTopicPartitionsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
- case KafkaConfig.OffsetsTopicSegmentBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
- case KafkaConfig.OffsetsTopicCompressionCodecProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
- case KafkaConfig.OffsetsRetentionMinutesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
- case KafkaConfig.OffsetsRetentionCheckIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
- case KafkaConfig.OffsetCommitTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
- case KafkaConfig.OffsetCommitRequiredAcksProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2")
-
- case KafkaConfig.DeleteTopicEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
-
- case KafkaConfig.MetricNumSamplesProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
- case KafkaConfig.MetricSampleWindowMsProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
- case KafkaConfig.MetricReporterClassesProp => // ignore string
-
- case nonNegativeIntProperty => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
- }
- })
- }
-
- @Test
- def testSpecificProperties(): Unit = {
- val defaults = new Properties()
- defaults.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
- // For ZkConnectionTimeoutMs
- defaults.put(KafkaConfig.ZkSessionTimeoutMsProp, "1234")
- defaults.put(KafkaConfig.MaxReservedBrokerIdProp, "1")
- defaults.put(KafkaConfig.BrokerIdProp, "1")
- defaults.put(KafkaConfig.HostNameProp, "127.0.0.1")
- defaults.put(KafkaConfig.PortProp, "1122")
- defaults.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:2, 127.0.0.2:3")
- defaults.put(KafkaConfig.LogDirProp, "/tmp1,/tmp2")
- defaults.put(KafkaConfig.LogRollTimeHoursProp, "12")
- defaults.put(KafkaConfig.LogRollTimeJitterHoursProp, "11")
- defaults.put(KafkaConfig.LogRetentionTimeHoursProp, "10")
- //For LogFlushIntervalMsProp
- defaults.put(KafkaConfig.LogFlushSchedulerIntervalMsProp, "123")
- defaults.put(KafkaConfig.OffsetsTopicCompressionCodecProp, SnappyCompressionCodec.codec.toString)
-
- val config = KafkaConfig.fromProps(defaults)
- Assert.assertEquals("127.0.0.1:2181", config.zkConnect)
- Assert.assertEquals(1234, config.zkConnectionTimeoutMs)
- Assert.assertEquals(1, config.maxReservedBrokerId)
- Assert.assertEquals(1, config.brokerId)
- Assert.assertEquals("127.0.0.1", config.hostName)
- Assert.assertEquals(1122, config.advertisedPort)
- Assert.assertEquals("127.0.0.1", config.advertisedHostName)
- Assert.assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), config.maxConnectionsPerIpOverrides)
- Assert.assertEquals(List("/tmp1", "/tmp2"), config.logDirs)
- Assert.assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis)
- Assert.assertEquals(11 * 60L * 1000L * 60, config.logRollTimeJitterMillis)
- Assert.assertEquals(10 * 60L * 1000L * 60, config.logRetentionTimeMillis)
- Assert.assertEquals(123L, config.logFlushIntervalMs)
- Assert.assertEquals(SnappyCompressionCodec, config.offsetsTopicCompressionCodec)
- }
-
- private def assertPropertyInvalid(validRequiredProps: => Properties, name: String, values: Any*) {
- values.foreach((value) => {
- val props = validRequiredProps
- props.setProperty(name, value.toString)
- intercept[Exception] {
- KafkaConfig.fromProps(props)
- }
- })
- }
-
- private def randFrom[T](choices: T*): T = {
- import scala.util.Random
- choices(Random.nextInt(choices.size))
- }
-
- private def randFrom[T](choices: List[T]): T = {
- import scala.util.Random
- choices(Random.nextInt(choices.size))
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cefb2a0/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index d354452..f32d206 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -21,10 +21,11 @@ import java.util.Properties
import junit.framework.Assert._
import kafka.api.{ApiVersion, KAFKA_082}
+import kafka.message._
import kafka.utils.{TestUtils, CoreUtils}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.protocol.SecurityProtocol
-import org.junit.Test
+import org.junit.{Assert, Test}
import org.scalatest.junit.JUnit3Suite
class KafkaConfigTest extends JUnit3Suite {
@@ -380,4 +381,155 @@ class KafkaConfigTest extends JUnit3Suite {
KafkaConfig.fromProps(props)
}
}
+
+ @Test
+ def testFromPropsInvalid() {
+   // Builds a brand-new minimal configuration on each call: only the ZooKeeper
+   // connect string is set.
+   def minimalProps(): Properties = {
+     val base = new Properties()
+     base.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+     base
+   }
+
+   // Each value in badValues, set on top of a fresh minimal configuration, must be rejected.
+   def rejects(prop: String, badValues: Any*): Unit =
+     assertPropertyInvalid(minimalProps(), prop, badValues: _*)
+
+   // The minimal configuration has to be accepted on its own; otherwise every check
+   // below would pass for the wrong reason.
+   KafkaConfig.fromProps(minimalProps())
+
+   for (name <- KafkaConfig.configNames()) name match {
+     // Free-form string settings: no invalid value is tried.
+     case KafkaConfig.ZkConnectProp |
+          KafkaConfig.HostNameProp |
+          KafkaConfig.AdvertisedHostNameProp |
+          KafkaConfig.LogDirsProp |
+          KafkaConfig.LogDirProp |
+          KafkaConfig.MetricReporterClassesProp => ()
+
+     // Numeric settings checked only against a non-numeric value.
+     case KafkaConfig.ZkSessionTimeoutMsProp |
+          KafkaConfig.ZkConnectionTimeoutMsProp |
+          KafkaConfig.ZkSyncTimeMsProp |
+          KafkaConfig.BrokerIdProp |
+          KafkaConfig.PortProp |
+          KafkaConfig.AdvertisedPortProp |
+          KafkaConfig.SocketSendBufferBytesProp |
+          KafkaConfig.SocketReceiveBufferBytesProp |
+          KafkaConfig.ConnectionsMaxIdleMsProp |
+          KafkaConfig.LogRetentionBytesProp |
+          KafkaConfig.LogCleanerIoMaxBytesPerSecondProp |
+          KafkaConfig.LogCleanerDedupeBufferLoadFactorProp |
+          KafkaConfig.LogCleanerDeleteRetentionMsProp |
+          KafkaConfig.LogCleanerMinCleanRatioProp |
+          KafkaConfig.LogFlushSchedulerIntervalMsProp |
+          KafkaConfig.LogFlushIntervalMsProp |
+          KafkaConfig.ControllerSocketTimeoutMsProp |
+          KafkaConfig.DefaultReplicationFactorProp |
+          KafkaConfig.ReplicaLagTimeMaxMsProp |
+          KafkaConfig.ReplicaSocketReceiveBufferBytesProp |
+          KafkaConfig.ReplicaFetchMaxBytesProp |
+          KafkaConfig.ReplicaFetchWaitMaxMsProp |
+          KafkaConfig.ReplicaFetchMinBytesProp |
+          KafkaConfig.NumReplicaFetchersProp |
+          KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp |
+          KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp |
+          KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp |
+          KafkaConfig.LeaderImbalancePerBrokerPercentageProp |
+          KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp |
+          KafkaConfig.ControlledShutdownMaxRetriesProp |
+          KafkaConfig.ControlledShutdownRetryBackoffMsProp |
+          KafkaConfig.ConsumerMinSessionTimeoutMsProp |
+          KafkaConfig.ConsumerMaxSessionTimeoutMsProp |
+          KafkaConfig.OffsetMetadataMaxSizeProp =>
+       rejects(name, "not_a_number")
+
+     // Numeric settings that must additionally reject "0".
+     case KafkaConfig.NumNetworkThreadsProp |
+          KafkaConfig.NumIoThreadsProp |
+          KafkaConfig.BackgroundThreadsProp |
+          KafkaConfig.QueuedMaxRequestsProp |
+          KafkaConfig.NumPartitionsProp |
+          KafkaConfig.LogRollTimeMillisProp |
+          KafkaConfig.LogRollTimeHoursProp |
+          KafkaConfig.LogRetentionTimeMillisProp |
+          KafkaConfig.LogRetentionTimeMinutesProp |
+          KafkaConfig.LogRetentionTimeHoursProp |
+          KafkaConfig.LogCleanupIntervalMsProp |
+          KafkaConfig.LogFlushIntervalMessagesProp |
+          KafkaConfig.NumRecoveryThreadsPerDataDirProp |
+          KafkaConfig.MinInSyncReplicasProp |
+          KafkaConfig.OffsetsLoadBufferSizeProp |
+          KafkaConfig.OffsetsTopicReplicationFactorProp |
+          KafkaConfig.OffsetsTopicPartitionsProp |
+          KafkaConfig.OffsetsTopicSegmentBytesProp |
+          KafkaConfig.OffsetsRetentionMinutesProp |
+          KafkaConfig.OffsetsRetentionCheckIntervalMsProp |
+          KafkaConfig.OffsetCommitTimeoutMsProp =>
+       rejects(name, "not_a_number", "0")
+
+     // Boolean settings.
+     case KafkaConfig.LogCleanerEnableProp =>
+       rejects(name, "not_a_boolean")
+     case KafkaConfig.AutoCreateTopicsEnableProp |
+          KafkaConfig.AutoLeaderRebalanceEnableProp |
+          KafkaConfig.UncleanLeaderElectionEnableProp |
+          KafkaConfig.ControlledShutdownEnableProp |
+          KafkaConfig.DeleteTopicEnableProp =>
+       rejects(name, "not_a_boolean", "0")
+
+     // Settings with their own specific invalid values.
+     case KafkaConfig.MaxConnectionsPerIpOverridesProp =>
+       rejects(name, "127.0.0.1:not_a_number")
+     case KafkaConfig.LogSegmentBytesProp =>
+       rejects(name, "not_a_number", Message.MinHeaderSize - 1)
+     case KafkaConfig.LogCleanupPolicyProp =>
+       rejects(name, "unknown_policy", "0")
+     case KafkaConfig.LogCleanerDedupeBufferSizeProp =>
+       rejects(name, "not_a_number", "1024")
+     case KafkaConfig.LogIndexSizeMaxBytesProp =>
+       rejects(name, "not_a_number", "3")
+     case KafkaConfig.ReplicaSocketTimeoutMsProp |
+          KafkaConfig.OffsetCommitRequiredAcksProp =>
+       rejects(name, "not_a_number", "-2")
+     case KafkaConfig.OffsetsTopicCompressionCodecProp =>
+       rejects(name, "not_a_number", "-1")
+     case KafkaConfig.MetricNumSamplesProp |
+          KafkaConfig.MetricSampleWindowMsProp =>
+       rejects(name, "not_a_number", "-1", "0")
+
+     // Any config name not listed above is checked as a non-negative integer.
+     case _ =>
+       rejects(name, "not_a_number", "-1")
+   }
+ }
+
+ @Test
+ def testSpecificProperties(): Unit = {
+   // Explicit settings, applied in this order. Several of them are not read back
+   // directly; the assertions below check values derived from them.
+   val settings = Seq(
+     KafkaConfig.ZkConnectProp -> "127.0.0.1:2181",
+     // checked through zkConnectionTimeoutMs, which is not set explicitly
+     KafkaConfig.ZkSessionTimeoutMsProp -> "1234",
+     KafkaConfig.MaxReservedBrokerIdProp -> "1",
+     KafkaConfig.BrokerIdProp -> "1",
+     // checked through hostName and advertisedHostName
+     KafkaConfig.HostNameProp -> "127.0.0.1",
+     // checked through advertisedPort
+     KafkaConfig.PortProp -> "1122",
+     KafkaConfig.MaxConnectionsPerIpOverridesProp -> "127.0.0.1:2, 127.0.0.2:3",
+     // checked through logDirs
+     KafkaConfig.LogDirProp -> "/tmp1,/tmp2",
+     // the three hour-based settings are checked through their millisecond accessors
+     KafkaConfig.LogRollTimeHoursProp -> "12",
+     KafkaConfig.LogRollTimeJitterHoursProp -> "11",
+     KafkaConfig.LogRetentionTimeHoursProp -> "10",
+     // checked through logFlushIntervalMs, which is not set explicitly
+     KafkaConfig.LogFlushSchedulerIntervalMsProp -> "123",
+     KafkaConfig.OffsetsTopicCompressionCodecProp -> SnappyCompressionCodec.codec.toString
+   )
+
+   val props = new Properties()
+   settings.foreach { case (key, value) => props.put(key, value) }
+   val config = KafkaConfig.fromProps(props)
+
+   val hourMs = 60L * 60L * 1000L
+
+   // ZooKeeper, broker identity and network settings
+   Assert.assertEquals("127.0.0.1:2181", config.zkConnect)
+   Assert.assertEquals(1234, config.zkConnectionTimeoutMs)
+   Assert.assertEquals(1, config.maxReservedBrokerId)
+   Assert.assertEquals(1, config.brokerId)
+   Assert.assertEquals("127.0.0.1", config.hostName)
+   Assert.assertEquals(1122, config.advertisedPort)
+   Assert.assertEquals("127.0.0.1", config.advertisedHostName)
+   Assert.assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), config.maxConnectionsPerIpOverrides)
+
+   // Log settings
+   Assert.assertEquals(List("/tmp1", "/tmp2"), config.logDirs)
+   Assert.assertEquals(12 * hourMs, config.logRollTimeMillis)
+   Assert.assertEquals(11 * hourMs, config.logRollTimeJitterMillis)
+   Assert.assertEquals(10 * hourMs, config.logRetentionTimeMillis)
+   Assert.assertEquals(123L, config.logFlushIntervalMs)
+
+   // Offsets topic
+   Assert.assertEquals(SnappyCompressionCodec, config.offsetsTopicCompressionCodec)
+ }
+
+ /**
+  * Asserts that KafkaConfig.fromProps throws an exception for every given value of
+  * a single property.
+  *
+  * @param validRequiredProps by-name expression yielding the base properties; it is
+  *                           re-evaluated once per value, so each attempt starts from
+  *                           whatever the caller's expression produces at that moment
+  * @param name               key of the property to override
+  * @param values             values expected to be rejected; each is converted with
+  *                           toString before being set
+  */
+ private def assertPropertyInvalid(validRequiredProps: => Properties, name: String, values: Any*): Unit = {
+   values.foreach { value =>
+     val props = validRequiredProps
+     props.setProperty(name, value.toString)
+     // intercept alone only reports that no exception was thrown; the clue names the
+     // property and value that were wrongly accepted.
+     withClue("Property '%s' with value '%s': ".format(name, value)) {
+       intercept[Exception] {
+         KafkaConfig.fromProps(props)
+       }
+     }
+   }
+ }
+
}