You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/06/18 23:07:40 UTC
[2/2] kafka git commit: kafka-2249;
KafkaConfig does not preserve original Properties; patched by Gwen Shapira;
reviewed by Jun Rao
kafka-2249; KafkaConfig does not preserve original Properties; patched by Gwen Shapira; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5c904074
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5c904074
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5c904074
Branch: refs/heads/trunk
Commit: 5c9040745466945a04ea0315de583ccdab0614ac
Parents: ba86f0a
Author: Gwen Shapira <cs...@gmail.com>
Authored: Thu Jun 18 14:07:33 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Jun 18 14:07:33 2015 -0700
----------------------------------------------------------------------
.../kafka/common/config/AbstractConfig.java | 12 +-
.../main/scala/kafka/cluster/Partition.scala | 2 +-
.../kafka/controller/KafkaController.scala | 4 +-
.../controller/PartitionLeaderSelector.scala | 2 +-
core/src/main/scala/kafka/log/LogConfig.scala | 156 ++---
core/src/main/scala/kafka/log/LogManager.scala | 2 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 4 +-
.../main/scala/kafka/server/KafkaConfig.scala | 573 +++++--------------
.../main/scala/kafka/server/KafkaServer.scala | 55 +-
.../kafka/server/ReplicaFetcherThread.scala | 4 +-
.../scala/kafka/server/TopicConfigManager.scala | 5 +-
core/src/main/scala/kafka/utils/CoreUtils.scala | 26 -
.../test/scala/other/kafka/StressTestLog.scala | 10 +-
.../other/kafka/TestLinearWriteSpeed.scala | 7 +-
.../unit/kafka/log/BrokerCompressionTest.scala | 7 +-
.../test/scala/unit/kafka/log/CleanerTest.scala | 55 +-
.../kafka/log/LogCleanerIntegrationTest.scala | 8 +-
.../scala/unit/kafka/log/LogConfigTest.scala | 19 +-
.../scala/unit/kafka/log/LogManagerTest.scala | 17 +-
.../src/test/scala/unit/kafka/log/LogTest.scala | 121 +++-
.../kafka/server/DynamicConfigChangeTest.scala | 17 +-
.../kafka/server/KafkaConfigConfigDefTest.scala | 20 +-
22 files changed, 444 insertions(+), 682 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index c4fa058..bae528d 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -57,15 +57,19 @@ public class AbstractConfig {
return values.get(key);
}
- public int getInt(String key) {
+ public Short getShort(String key) {
+ return (Short) get(key);
+ }
+
+ public Integer getInt(String key) {
return (Integer) get(key);
}
- public long getLong(String key) {
+ public Long getLong(String key) {
return (Long) get(key);
}
- public double getDouble(String key) {
+ public Double getDouble(String key) {
return (Double) get(key);
}
@@ -92,7 +96,7 @@ public class AbstractConfig {
return keys;
}
- public Map<String, ?> originals() {
+ public Map<String, Object> originals() {
Map<String, Object> copy = new HashMap<String, Object>();
copy.putAll(originals);
return copy;
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 730a232..6cb6477 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -86,7 +86,7 @@ class Partition(val topic: String,
case Some(replica) => replica
case None =>
if (isReplicaLocal(replicaId)) {
- val config = LogConfig.fromProps(logManager.defaultConfig.toProps, AdminUtils.fetchTopicConfig(zkClient, topic))
+ val config = LogConfig.fromProps(logManager.defaultConfig.originals, AdminUtils.fetchTopicConfig(zkClient, topic))
val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
val offsetMap = checkpoint.read
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 69bba24..3635057 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -325,7 +325,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
info("starting the partition rebalance scheduler")
autoRebalanceScheduler.startup()
autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
- 5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS)
+ 5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
}
deleteTopicManager.start()
}
@@ -1013,7 +1013,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
// if the replica to be removed from the ISR is the last surviving member of the ISR and unclean leader election
// is disallowed for the corresponding topic, then we must preserve the ISR membership so that the replica can
// eventually be restored as the leader.
- if (newIsr.isEmpty && !LogConfig.fromProps(config.toProps, AdminUtils.fetchTopicConfig(zkClient,
+ if (newIsr.isEmpty && !LogConfig.fromProps(config.originals, AdminUtils.fetchTopicConfig(zkClient,
topicAndPartition.topic)).uncleanLeaderElectionEnable) {
info("Retaining last ISR %d of partition %s since unclean leader election is disabled".format(replicaId, topicAndPartition))
newIsr = leaderAndIsr.isr
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 3b15ab4..bb6b5c8 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -61,7 +61,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi
case true =>
// Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
// for unclean leader election.
- if (!LogConfig.fromProps(config.toProps, AdminUtils.fetchTopicConfig(controllerContext.zkClient,
+ if (!LogConfig.fromProps(config.originals, AdminUtils.fetchTopicConfig(controllerContext.zkClient,
topicAndPartition.topic)).uncleanLeaderElectionEnable) {
throw new NoReplicaOnlineException(("No broker in ISR for partition " +
"%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index f64fd79..e9af221 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -18,92 +18,52 @@
package kafka.log
import java.util.Properties
+import kafka.server.KafkaConfig
import org.apache.kafka.common.utils.Utils
import scala.collection._
-import org.apache.kafka.common.config.ConfigDef
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import kafka.message.BrokerCompressionCodec
import kafka.message.Message
object Defaults {
- val SegmentSize = 1024 * 1024
- val SegmentMs = Long.MaxValue
- val SegmentJitterMs = 0L
- val FlushInterval = Long.MaxValue
- val FlushMs = Long.MaxValue
- val RetentionSize = Long.MaxValue
- val RetentionMs = Long.MaxValue
- val MaxMessageSize = Int.MaxValue
- val MaxIndexSize = 1024 * 1024
- val IndexInterval = 4096
- val FileDeleteDelayMs = 60 * 1000L
- val DeleteRetentionMs = 24 * 60 * 60 * 1000L
- val MinCleanableDirtyRatio = 0.5
- val Compact = false
- val UncleanLeaderElectionEnable = true
- val MinInSyncReplicas = 1
- val CompressionType = "producer"
+ val SegmentSize = kafka.server.Defaults.LogSegmentBytes
+ val SegmentMs = kafka.server.Defaults.LogRollHours * 60 * 60 * 1000L
+ val SegmentJitterMs = kafka.server.Defaults.LogRollJitterHours * 60 * 60 * 1000L
+ val FlushInterval = kafka.server.Defaults.LogFlushIntervalMessages
+ val FlushMs = kafka.server.Defaults.LogFlushSchedulerIntervalMs
+ val RetentionSize = kafka.server.Defaults.LogRetentionBytes
+ val RetentionMs = kafka.server.Defaults.LogRetentionHours * 60 * 60 * 1000L
+ val MaxMessageSize = kafka.server.Defaults.MessageMaxBytes
+ val MaxIndexSize = kafka.server.Defaults.LogIndexSizeMaxBytes
+ val IndexInterval = kafka.server.Defaults.LogIndexIntervalBytes
+ val FileDeleteDelayMs = kafka.server.Defaults.LogDeleteDelayMs
+ val DeleteRetentionMs = kafka.server.Defaults.LogCleanerDeleteRetentionMs
+ val MinCleanableDirtyRatio = kafka.server.Defaults.LogCleanerMinCleanRatio
+ val Compact = kafka.server.Defaults.LogCleanupPolicy
+ val UncleanLeaderElectionEnable = kafka.server.Defaults.UncleanLeaderElectionEnable
+ val MinInSyncReplicas = kafka.server.Defaults.MinInSyncReplicas
+ val CompressionType = kafka.server.Defaults.CompressionType
}
-/**
- * Configuration settings for a log
- * @param segmentSize The hard maximum for the size of a segment file in the log
- * @param segmentMs The soft maximum on the amount of time before a new log segment is rolled
- * @param segmentJitterMs The maximum random jitter subtracted from segmentMs to avoid thundering herds of segment rolling
- * @param flushInterval The number of messages that can be written to the log before a flush is forced
- * @param flushMs The amount of time the log can have dirty data before a flush is forced
- * @param retentionSize The approximate total number of bytes this log can use
- * @param retentionMs The approximate maximum age of the last segment that is retained
- * @param maxIndexSize The maximum size of an index file
- * @param indexInterval The approximate number of bytes between index entries
- * @param fileDeleteDelayMs The time to wait before deleting a file from the filesystem
- * @param deleteRetentionMs The time to retain delete markers in the log. Only applicable for logs that are being compacted.
- * @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned
- * @param compact Should old segments in this log be deleted or deduplicated?
- * @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled
- * @param minInSyncReplicas If number of insync replicas drops below this number, we stop accepting writes with -1 (or all) required acks
- * @param compressionType compressionType for a given topic
- *
- */
-case class LogConfig(segmentSize: Int = Defaults.SegmentSize,
- segmentMs: Long = Defaults.SegmentMs,
- segmentJitterMs: Long = Defaults.SegmentJitterMs,
- flushInterval: Long = Defaults.FlushInterval,
- flushMs: Long = Defaults.FlushMs,
- retentionSize: Long = Defaults.RetentionSize,
- retentionMs: Long = Defaults.RetentionMs,
- maxMessageSize: Int = Defaults.MaxMessageSize,
- maxIndexSize: Int = Defaults.MaxIndexSize,
- indexInterval: Int = Defaults.IndexInterval,
- fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs,
- deleteRetentionMs: Long = Defaults.DeleteRetentionMs,
- minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio,
- compact: Boolean = Defaults.Compact,
- uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable,
- minInSyncReplicas: Int = Defaults.MinInSyncReplicas,
- compressionType: String = Defaults.CompressionType) {
-
- def toProps: Properties = {
- val props = new Properties()
- import LogConfig._
- props.put(SegmentBytesProp, segmentSize.toString)
- props.put(SegmentMsProp, segmentMs.toString)
- props.put(SegmentJitterMsProp, segmentJitterMs.toString)
- props.put(SegmentIndexBytesProp, maxIndexSize.toString)
- props.put(FlushMessagesProp, flushInterval.toString)
- props.put(FlushMsProp, flushMs.toString)
- props.put(RetentionBytesProp, retentionSize.toString)
- props.put(RetentionMsProp, retentionMs.toString)
- props.put(MaxMessageBytesProp, maxMessageSize.toString)
- props.put(IndexIntervalBytesProp, indexInterval.toString)
- props.put(DeleteRetentionMsProp, deleteRetentionMs.toString)
- props.put(FileDeleteDelayMsProp, fileDeleteDelayMs.toString)
- props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString)
- props.put(CleanupPolicyProp, if(compact) "compact" else "delete")
- props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString)
- props.put(MinInSyncReplicasProp, minInSyncReplicas.toString)
- props.put(CompressionTypeProp, compressionType)
- props
- }
+case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props) {
+
+ val segmentSize = getInt(LogConfig.SegmentBytesProp)
+ val segmentMs = getLong(LogConfig.SegmentMsProp)
+ val segmentJitterMs = getLong(LogConfig.SegmentJitterMsProp)
+ val maxIndexSize = getInt(LogConfig.SegmentIndexBytesProp)
+ val flushInterval = getLong(LogConfig.FlushMessagesProp)
+ val flushMs = getLong(LogConfig.FlushMsProp)
+ val retentionSize = getLong(LogConfig.RetentionBytesProp)
+ val retentionMs = getLong(LogConfig.RetentionMsProp)
+ val maxMessageSize = getInt(LogConfig.MaxMessageBytesProp)
+ val indexInterval = getInt(LogConfig.IndexIntervalBytesProp)
+ val fileDeleteDelayMs = getLong(LogConfig.FileDeleteDelayMsProp)
+ val deleteRetentionMs = getLong(LogConfig.DeleteRetentionMsProp)
+ val minCleanableRatio = getDouble(LogConfig.MinCleanableDirtyRatioProp)
+ val compact = getString(LogConfig.CleanupPolicyProp).toLowerCase != LogConfig.Delete
+ val uncleanLeaderElectionEnable = getBoolean(LogConfig.UncleanLeaderElectionEnableProp)
+ val minInSyncReplicas = getInt(LogConfig.MinInSyncReplicasProp)
+ val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase
def randomSegmentJitter: Long =
if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs)
@@ -111,6 +71,10 @@ case class LogConfig(segmentSize: Int = Defaults.SegmentSize,
object LogConfig {
+ def main(args: Array[String]) {
+ System.out.println(configDef.toHtmlTable)
+ }
+
val Delete = "delete"
val Compact = "compact"
@@ -179,7 +143,7 @@ object LogConfig {
.define(FileDeleteDelayMsProp, LONG, Defaults.FileDeleteDelayMs, atLeast(0), MEDIUM, FileDeleteDelayMsDoc)
.define(MinCleanableDirtyRatioProp, DOUBLE, Defaults.MinCleanableDirtyRatio, between(0, 1), MEDIUM,
MinCleanableRatioDoc)
- .define(CleanupPolicyProp, STRING, if (Defaults.Compact) Compact else Delete, in(Compact, Delete), MEDIUM,
+ .define(CleanupPolicyProp, STRING, Defaults.Compact, in(Compact, Delete), MEDIUM,
CompactDoc)
.define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable,
MEDIUM, UncleanLeaderElectionEnableDoc)
@@ -187,6 +151,8 @@ object LogConfig {
.define(CompressionTypeProp, STRING, Defaults.CompressionType, in(BrokerCompressionCodec.brokerCompressionOptions:_*), MEDIUM, CompressionTypeDoc)
}
+ def apply(): LogConfig = LogConfig(new Properties())
+
def configNames() = {
import JavaConversions._
configDef.names().toList.sorted
@@ -194,37 +160,13 @@ object LogConfig {
/**
- * Parse the given properties instance into a LogConfig object
- */
- def fromProps(props: Properties): LogConfig = {
- import kafka.utils.CoreUtils.evaluateDefaults
- val parsed = configDef.parse(evaluateDefaults(props))
- new LogConfig(segmentSize = parsed.get(SegmentBytesProp).asInstanceOf[Int],
- segmentMs = parsed.get(SegmentMsProp).asInstanceOf[Long],
- segmentJitterMs = parsed.get(SegmentJitterMsProp).asInstanceOf[Long],
- maxIndexSize = parsed.get(SegmentIndexBytesProp).asInstanceOf[Int],
- flushInterval = parsed.get(FlushMessagesProp).asInstanceOf[Long],
- flushMs = parsed.get(FlushMsProp).asInstanceOf[Long],
- retentionSize = parsed.get(RetentionBytesProp).asInstanceOf[Long],
- retentionMs = parsed.get(RetentionMsProp).asInstanceOf[Long],
- maxMessageSize = parsed.get(MaxMessageBytesProp).asInstanceOf[Int],
- indexInterval = parsed.get(IndexIntervalBytesProp).asInstanceOf[Int],
- fileDeleteDelayMs = parsed.get(FileDeleteDelayMsProp).asInstanceOf[Long],
- deleteRetentionMs = parsed.get(DeleteRetentionMsProp).asInstanceOf[Long],
- minCleanableRatio = parsed.get(MinCleanableDirtyRatioProp).asInstanceOf[Double],
- compact = parsed.get(CleanupPolicyProp).asInstanceOf[String].toLowerCase != Delete,
- uncleanLeaderElectionEnable = parsed.get(UncleanLeaderElectionEnableProp).asInstanceOf[Boolean],
- minInSyncReplicas = parsed.get(MinInSyncReplicasProp).asInstanceOf[Int],
- compressionType = parsed.get(CompressionTypeProp).asInstanceOf[String].toLowerCase())
- }
-
- /**
* Create a log config instance using the given properties and defaults
*/
- def fromProps(defaults: Properties, overrides: Properties): LogConfig = {
- val props = new Properties(defaults)
+ def fromProps(defaults: java.util.Map[_ <: Object, _ <: Object], overrides: Properties): LogConfig = {
+ val props = new Properties()
+ props.putAll(defaults)
props.putAll(overrides)
- fromProps(props)
+ LogConfig(props)
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index e781eba..538fc83 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -356,7 +356,7 @@ class LogManager(val logDirs: Array[File],
.format(topicAndPartition.topic,
topicAndPartition.partition,
dataDir.getAbsolutePath,
- {import JavaConversions._; config.toProps.mkString(", ")}))
+ {import JavaConversions._; config.originals.mkString(", ")}))
log
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c7debe4..ad6f058 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -428,9 +428,9 @@ class KafkaApis(val requestChannel: RequestChannel,
val aliveBrokers = metadataCache.getAliveBrokers
val offsetsTopicReplicationFactor =
if (aliveBrokers.length > 0)
- Math.min(config.offsetsTopicReplicationFactor, aliveBrokers.length)
+ Math.min(config.offsetsTopicReplicationFactor.toInt, aliveBrokers.length)
else
- config.offsetsTopicReplicationFactor
+ config.offsetsTopicReplicationFactor.toInt
AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions,
offsetsTopicReplicationFactor,
offsetManager.offsetsTopicConfig)
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 2d75186..e0b2480 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -26,7 +26,7 @@ import kafka.consumer.ConsumerConfig
import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet}
import kafka.utils.CoreUtils
import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.common.config.ConfigDef
+import org.apache.kafka.common.config.{ConfigException, AbstractConfig, ConfigDef}
import org.apache.kafka.common.metrics.MetricsReporter
import org.apache.kafka.common.protocol.SecurityProtocol
import scala.collection.{mutable, immutable, JavaConversions, Map}
@@ -141,6 +141,10 @@ object Defaults {
object KafkaConfig {
+ def main(args: Array[String]) {
+ System.out.println(configDef.toHtmlTable)
+ }
+
/** ********* Zookeeper Configuration ***********/
val ZkConnectProp = "zookeeper.connect"
val ZkSessionTimeoutMsProp = "zookeeper.session.timeout.ms"
@@ -482,14 +486,14 @@ object KafkaConfig {
.define(ProducerPurgatoryPurgeIntervalRequestsProp, INT, Defaults.ProducerPurgatoryPurgeIntervalRequests, MEDIUM, ProducerPurgatoryPurgeIntervalRequestsDoc)
.define(AutoLeaderRebalanceEnableProp, BOOLEAN, Defaults.AutoLeaderRebalanceEnable, HIGH, AutoLeaderRebalanceEnableDoc)
.define(LeaderImbalancePerBrokerPercentageProp, INT, Defaults.LeaderImbalancePerBrokerPercentage, HIGH, LeaderImbalancePerBrokerPercentageDoc)
- .define(LeaderImbalanceCheckIntervalSecondsProp, INT, Defaults.LeaderImbalanceCheckIntervalSeconds, HIGH, LeaderImbalanceCheckIntervalSecondsDoc)
+ .define(LeaderImbalanceCheckIntervalSecondsProp, LONG, Defaults.LeaderImbalanceCheckIntervalSeconds, HIGH, LeaderImbalanceCheckIntervalSecondsDoc)
.define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable, HIGH, UncleanLeaderElectionEnableDoc)
.define(InterBrokerSecurityProtocolProp, STRING, Defaults.InterBrokerSecurityProtocol, MEDIUM, InterBrokerSecurityProtocolDoc)
.define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, MEDIUM, InterBrokerProtocolVersionDoc)
/** ********* Controlled shutdown configuration ***********/
.define(ControlledShutdownMaxRetriesProp, INT, Defaults.ControlledShutdownMaxRetries, MEDIUM, ControlledShutdownMaxRetriesDoc)
- .define(ControlledShutdownRetryBackoffMsProp, INT, Defaults.ControlledShutdownRetryBackoffMs, MEDIUM, ControlledShutdownRetryBackoffMsDoc)
+ .define(ControlledShutdownRetryBackoffMsProp, LONG, Defaults.ControlledShutdownRetryBackoffMs, MEDIUM, ControlledShutdownRetryBackoffMsDoc)
.define(ControlledShutdownEnableProp, BOOLEAN, Defaults.ControlledShutdownEnable, MEDIUM, ControlledShutdownEnableDoc)
/** ********* Consumer coordinator configuration ***********/
@@ -520,139 +524,6 @@ object KafkaConfig {
}
/**
- * Parse the given properties instance into a KafkaConfig object
- */
- def fromProps(props: Properties): KafkaConfig = {
- import kafka.utils.CoreUtils.evaluateDefaults
- val parsed = configDef.parse(evaluateDefaults(props))
- new KafkaConfig(
- /** ********* Zookeeper Configuration ***********/
- zkConnect = parsed.get(ZkConnectProp).asInstanceOf[String],
- zkSessionTimeoutMs = parsed.get(ZkSessionTimeoutMsProp).asInstanceOf[Int],
- _zkConnectionTimeoutMs = Option(parsed.get(ZkConnectionTimeoutMsProp)).map(_.asInstanceOf[Int]),
- zkSyncTimeMs = parsed.get(ZkSyncTimeMsProp).asInstanceOf[Int],
-
- /** ********* General Configuration ***********/
- maxReservedBrokerId = parsed.get(MaxReservedBrokerIdProp).asInstanceOf[Int],
- brokerId = parsed.get(BrokerIdProp).asInstanceOf[Int],
- messageMaxBytes = parsed.get(MessageMaxBytesProp).asInstanceOf[Int],
- numNetworkThreads = parsed.get(NumNetworkThreadsProp).asInstanceOf[Int],
- numIoThreads = parsed.get(NumIoThreadsProp).asInstanceOf[Int],
- backgroundThreads = parsed.get(BackgroundThreadsProp).asInstanceOf[Int],
- queuedMaxRequests = parsed.get(QueuedMaxRequestsProp).asInstanceOf[Int],
-
- /** ********* Socket Server Configuration ***********/
- port = parsed.get(PortProp).asInstanceOf[Int],
- hostName = parsed.get(HostNameProp).asInstanceOf[String],
- _listeners = Option(parsed.get(ListenersProp)).map(_.asInstanceOf[String]),
- _advertisedHostName = Option(parsed.get(AdvertisedHostNameProp)).map(_.asInstanceOf[String]),
- _advertisedPort = Option(parsed.get(AdvertisedPortProp)).map(_.asInstanceOf[Int]),
- _advertisedListeners = Option(parsed.get(AdvertisedListenersProp)).map(_.asInstanceOf[String]),
- socketSendBufferBytes = parsed.get(SocketSendBufferBytesProp).asInstanceOf[Int],
- socketReceiveBufferBytes = parsed.get(SocketReceiveBufferBytesProp).asInstanceOf[Int],
- socketRequestMaxBytes = parsed.get(SocketRequestMaxBytesProp).asInstanceOf[Int],
- maxConnectionsPerIp = parsed.get(MaxConnectionsPerIpProp).asInstanceOf[Int],
- _maxConnectionsPerIpOverrides = parsed.get(MaxConnectionsPerIpOverridesProp).asInstanceOf[String],
- connectionsMaxIdleMs = parsed.get(ConnectionsMaxIdleMsProp).asInstanceOf[Long],
-
- /** ********* Log Configuration ***********/
- numPartitions = parsed.get(NumPartitionsProp).asInstanceOf[Int],
- _logDir = parsed.get(LogDirProp).asInstanceOf[String],
- _logDirs = Option(parsed.get(LogDirsProp)).map(_.asInstanceOf[String]),
-
- logSegmentBytes = parsed.get(LogSegmentBytesProp).asInstanceOf[Int],
- logRollTimeHours = parsed.get(LogRollTimeHoursProp).asInstanceOf[Int],
- _logRollTimeMillis = Option(parsed.get(LogRollTimeMillisProp)).map(_.asInstanceOf[Long]),
-
- logRollTimeJitterHours = parsed.get(LogRollTimeJitterHoursProp).asInstanceOf[Int],
- _logRollTimeJitterMillis = Option(parsed.get(LogRollTimeJitterMillisProp)).map(_.asInstanceOf[Long]),
-
- logRetentionTimeHours = parsed.get(LogRetentionTimeHoursProp).asInstanceOf[Int],
- _logRetentionTimeMins = Option(parsed.get(LogRetentionTimeMinutesProp)).map(_.asInstanceOf[Int]),
- _logRetentionTimeMillis = Option(parsed.get(LogRetentionTimeMillisProp)).map(_.asInstanceOf[Long]),
-
- logRetentionBytes = parsed.get(LogRetentionBytesProp).asInstanceOf[Long],
- logCleanupIntervalMs = parsed.get(LogCleanupIntervalMsProp).asInstanceOf[Long],
- logCleanupPolicy = parsed.get(LogCleanupPolicyProp).asInstanceOf[String],
- logCleanerThreads = parsed.get(LogCleanerThreadsProp).asInstanceOf[Int],
- logCleanerIoMaxBytesPerSecond = parsed.get(LogCleanerIoMaxBytesPerSecondProp).asInstanceOf[Double],
- logCleanerDedupeBufferSize = parsed.get(LogCleanerDedupeBufferSizeProp).asInstanceOf[Long],
- logCleanerIoBufferSize = parsed.get(LogCleanerIoBufferSizeProp).asInstanceOf[Int],
- logCleanerDedupeBufferLoadFactor = parsed.get(LogCleanerDedupeBufferLoadFactorProp).asInstanceOf[Double],
- logCleanerBackoffMs = parsed.get(LogCleanerBackoffMsProp).asInstanceOf[Long],
- logCleanerMinCleanRatio = parsed.get(LogCleanerMinCleanRatioProp).asInstanceOf[Double],
- logCleanerEnable = parsed.get(LogCleanerEnableProp).asInstanceOf[Boolean],
- logCleanerDeleteRetentionMs = parsed.get(LogCleanerDeleteRetentionMsProp).asInstanceOf[Long],
- logIndexSizeMaxBytes = parsed.get(LogIndexSizeMaxBytesProp).asInstanceOf[Int],
- logIndexIntervalBytes = parsed.get(LogIndexIntervalBytesProp).asInstanceOf[Int],
- logFlushIntervalMessages = parsed.get(LogFlushIntervalMessagesProp).asInstanceOf[Long],
- logDeleteDelayMs = parsed.get(LogDeleteDelayMsProp).asInstanceOf[Long],
- logFlushSchedulerIntervalMs = parsed.get(LogFlushSchedulerIntervalMsProp).asInstanceOf[Long],
- _logFlushIntervalMs = Option(parsed.get(LogFlushIntervalMsProp)).map(_.asInstanceOf[Long]),
- logFlushOffsetCheckpointIntervalMs = parsed.get(LogFlushOffsetCheckpointIntervalMsProp).asInstanceOf[Int],
- numRecoveryThreadsPerDataDir = parsed.get(NumRecoveryThreadsPerDataDirProp).asInstanceOf[Int],
- autoCreateTopicsEnable = parsed.get(AutoCreateTopicsEnableProp).asInstanceOf[Boolean],
- minInSyncReplicas = parsed.get(MinInSyncReplicasProp).asInstanceOf[Int],
-
- /** ********* Replication configuration ***********/
- controllerSocketTimeoutMs = parsed.get(ControllerSocketTimeoutMsProp).asInstanceOf[Int],
- defaultReplicationFactor = parsed.get(DefaultReplicationFactorProp).asInstanceOf[Int],
- replicaLagTimeMaxMs = parsed.get(ReplicaLagTimeMaxMsProp).asInstanceOf[Long],
- replicaSocketTimeoutMs = parsed.get(ReplicaSocketTimeoutMsProp).asInstanceOf[Int],
- replicaSocketReceiveBufferBytes = parsed.get(ReplicaSocketReceiveBufferBytesProp).asInstanceOf[Int],
- replicaFetchMaxBytes = parsed.get(ReplicaFetchMaxBytesProp).asInstanceOf[Int],
- replicaFetchWaitMaxMs = parsed.get(ReplicaFetchWaitMaxMsProp).asInstanceOf[Int],
- replicaFetchMinBytes = parsed.get(ReplicaFetchMinBytesProp).asInstanceOf[Int],
- replicaFetchBackoffMs = parsed.get(ReplicaFetchBackoffMsProp).asInstanceOf[Int],
- numReplicaFetchers = parsed.get(NumReplicaFetchersProp).asInstanceOf[Int],
- replicaHighWatermarkCheckpointIntervalMs = parsed.get(ReplicaHighWatermarkCheckpointIntervalMsProp).asInstanceOf[Long],
- fetchPurgatoryPurgeIntervalRequests = parsed.get(FetchPurgatoryPurgeIntervalRequestsProp).asInstanceOf[Int],
- producerPurgatoryPurgeIntervalRequests = parsed.get(ProducerPurgatoryPurgeIntervalRequestsProp).asInstanceOf[Int],
- autoLeaderRebalanceEnable = parsed.get(AutoLeaderRebalanceEnableProp).asInstanceOf[Boolean],
- leaderImbalancePerBrokerPercentage = parsed.get(LeaderImbalancePerBrokerPercentageProp).asInstanceOf[Int],
- leaderImbalanceCheckIntervalSeconds = parsed.get(LeaderImbalanceCheckIntervalSecondsProp).asInstanceOf[Int],
- uncleanLeaderElectionEnable = parsed.get(UncleanLeaderElectionEnableProp).asInstanceOf[Boolean],
- interBrokerSecurityProtocol = SecurityProtocol.valueOf(parsed.get(InterBrokerSecurityProtocolProp).asInstanceOf[String]),
- interBrokerProtocolVersion = ApiVersion(parsed.get(InterBrokerProtocolVersionProp).asInstanceOf[String]),
-
- /** ********* Controlled shutdown configuration ***********/
- controlledShutdownMaxRetries = parsed.get(ControlledShutdownMaxRetriesProp).asInstanceOf[Int],
- controlledShutdownRetryBackoffMs = parsed.get(ControlledShutdownRetryBackoffMsProp).asInstanceOf[Int],
- controlledShutdownEnable = parsed.get(ControlledShutdownEnableProp).asInstanceOf[Boolean],
-
- /** ********* Consumer coordinator configuration ***********/
- consumerMinSessionTimeoutMs = parsed.get(ConsumerMinSessionTimeoutMsProp).asInstanceOf[Int],
- consumerMaxSessionTimeoutMs = parsed.get(ConsumerMaxSessionTimeoutMsProp).asInstanceOf[Int],
-
- /** ********* Offset management configuration ***********/
- offsetMetadataMaxSize = parsed.get(OffsetMetadataMaxSizeProp).asInstanceOf[Int],
- offsetsLoadBufferSize = parsed.get(OffsetsLoadBufferSizeProp).asInstanceOf[Int],
- offsetsTopicReplicationFactor = parsed.get(OffsetsTopicReplicationFactorProp).asInstanceOf[Short],
- offsetsTopicPartitions = parsed.get(OffsetsTopicPartitionsProp).asInstanceOf[Int],
- offsetsTopicSegmentBytes = parsed.get(OffsetsTopicSegmentBytesProp).asInstanceOf[Int],
- offsetsTopicCompressionCodec = Option(parsed.get(OffsetsTopicCompressionCodecProp)).map(_.asInstanceOf[Int]).map(value => CompressionCodec.getCompressionCodec(value)).orNull,
- offsetsRetentionMinutes = parsed.get(OffsetsRetentionMinutesProp).asInstanceOf[Int],
- offsetsRetentionCheckIntervalMs = parsed.get(OffsetsRetentionCheckIntervalMsProp).asInstanceOf[Long],
- offsetCommitTimeoutMs = parsed.get(OffsetCommitTimeoutMsProp).asInstanceOf[Int],
- offsetCommitRequiredAcks = parsed.get(OffsetCommitRequiredAcksProp).asInstanceOf[Short],
- deleteTopicEnable = parsed.get(DeleteTopicEnableProp).asInstanceOf[Boolean],
- compressionType = parsed.get(CompressionTypeProp).asInstanceOf[String],
- metricNumSamples = parsed.get(MetricNumSamplesProp).asInstanceOf[Int],
- metricSampleWindowMs = parsed.get(MetricSampleWindowMsProp).asInstanceOf[Long],
- _metricReporterClasses = parsed.get(MetricReporterClassesProp).asInstanceOf[java.util.List[String]]
- )
- }
-
- /**
- * Create a log config instance using the given properties and defaults
- */
- def fromProps(defaults: Properties, overrides: Properties): KafkaConfig = {
- val props = new Properties(defaults)
- props.putAll(overrides)
- fromProps(props)
- }
-
- /**
* Check that property names are valid
*/
def validateNames(props: Properties) {
@@ -662,171 +533,149 @@ object KafkaConfig {
require(names.contains(name), "Unknown configuration \"%s\".".format(name))
}
- /**
- * Check that the given properties contain only valid kafka config names and that all values can be parsed and are valid
- */
- def validate(props: Properties) {
- validateNames(props)
- configDef.parse(props)
+ def fromProps(props: Properties): KafkaConfig = {
+ KafkaConfig(props)
+ }
- // to bootstrap KafkaConfig.validateValues()
- KafkaConfig.fromProps(props)
+ def fromProps(defaults: Properties, overrides: Properties): KafkaConfig = {
+ val props = new Properties()
+ props.putAll(defaults)
+ props.putAll(overrides)
+ fromProps(props)
}
+
}
-class KafkaConfig (/** ********* Zookeeper Configuration ***********/
- val zkConnect: String,
- val zkSessionTimeoutMs: Int = Defaults.ZkSessionTimeoutMs,
- private val _zkConnectionTimeoutMs: Option[Int] = None,
- val zkSyncTimeMs: Int = Defaults.ZkSyncTimeMs,
-
- /** ********* General Configuration ***********/
- val maxReservedBrokerId: Int = Defaults.MaxReservedBrokerId,
- var brokerId: Int = Defaults.BrokerId,
- val messageMaxBytes: Int = Defaults.MessageMaxBytes,
- val numNetworkThreads: Int = Defaults.NumNetworkThreads,
- val numIoThreads: Int = Defaults.NumIoThreads,
- val backgroundThreads: Int = Defaults.BackgroundThreads,
- val queuedMaxRequests: Int = Defaults.QueuedMaxRequests,
-
- /** ********* Socket Server Configuration ***********/
- val port: Int = Defaults.Port,
- val hostName: String = Defaults.HostName,
- private val _listeners: Option[String] = None,
- private val _advertisedHostName: Option[String] = None,
- private val _advertisedPort: Option[Int] = None,
- private val _advertisedListeners: Option[String] = None,
- val socketSendBufferBytes: Int = Defaults.SocketSendBufferBytes,
- val socketReceiveBufferBytes: Int = Defaults.SocketReceiveBufferBytes,
- val socketRequestMaxBytes: Int = Defaults.SocketRequestMaxBytes,
- val maxConnectionsPerIp: Int = Defaults.MaxConnectionsPerIp,
- private val _maxConnectionsPerIpOverrides: String = Defaults.MaxConnectionsPerIpOverrides,
- val connectionsMaxIdleMs: Long = Defaults.ConnectionsMaxIdleMs,
-
- /** ********* Log Configuration ***********/
- val numPartitions: Int = Defaults.NumPartitions,
- private val _logDir: String = Defaults.LogDir,
- private val _logDirs: Option[String] = None,
-
- val logSegmentBytes: Int = Defaults.LogSegmentBytes,
-
- val logRollTimeHours: Int = Defaults.LogRollHours,
- private val _logRollTimeMillis: Option[Long] = None,
-
- val logRollTimeJitterHours: Int = Defaults.LogRollJitterHours,
- private val _logRollTimeJitterMillis: Option[Long] = None,
-
- val logRetentionTimeHours: Int = Defaults.LogRetentionHours,
- private val _logRetentionTimeMins: Option[Int] = None,
- private val _logRetentionTimeMillis: Option[Long] = None,
-
- val logRetentionBytes: Long = Defaults.LogRetentionBytes,
- val logCleanupIntervalMs: Long = Defaults.LogCleanupIntervalMs,
- val logCleanupPolicy: String = Defaults.LogCleanupPolicy,
- val logCleanerThreads: Int = Defaults.LogCleanerThreads,
- val logCleanerIoMaxBytesPerSecond: Double = Defaults.LogCleanerIoMaxBytesPerSecond,
- val logCleanerDedupeBufferSize: Long = Defaults.LogCleanerDedupeBufferSize,
- val logCleanerIoBufferSize: Int = Defaults.LogCleanerIoBufferSize,
- val logCleanerDedupeBufferLoadFactor: Double = Defaults.LogCleanerDedupeBufferLoadFactor,
- val logCleanerBackoffMs: Long = Defaults.LogCleanerBackoffMs,
- val logCleanerMinCleanRatio: Double = Defaults.LogCleanerMinCleanRatio,
- val logCleanerEnable: Boolean = Defaults.LogCleanerEnable,
- val logCleanerDeleteRetentionMs: Long = Defaults.LogCleanerDeleteRetentionMs,
- val logIndexSizeMaxBytes: Int = Defaults.LogIndexSizeMaxBytes,
- val logIndexIntervalBytes: Int = Defaults.LogIndexIntervalBytes,
- val logFlushIntervalMessages: Long = Defaults.LogFlushIntervalMessages,
- val logDeleteDelayMs: Long = Defaults.LogDeleteDelayMs,
- val logFlushSchedulerIntervalMs: Long = Defaults.LogFlushSchedulerIntervalMs,
- private val _logFlushIntervalMs: Option[Long] = None,
- val logFlushOffsetCheckpointIntervalMs: Int = Defaults.LogFlushOffsetCheckpointIntervalMs,
- val numRecoveryThreadsPerDataDir: Int = Defaults.NumRecoveryThreadsPerDataDir,
- val autoCreateTopicsEnable: Boolean = Defaults.AutoCreateTopicsEnable,
-
- val minInSyncReplicas: Int = Defaults.MinInSyncReplicas,
-
- /** ********* Replication configuration ***********/
- val controllerSocketTimeoutMs: Int = Defaults.ControllerSocketTimeoutMs,
- val defaultReplicationFactor: Int = Defaults.DefaultReplicationFactor,
- val replicaLagTimeMaxMs: Long = Defaults.ReplicaLagTimeMaxMs,
- val replicaSocketTimeoutMs: Int = Defaults.ReplicaSocketTimeoutMs,
- val replicaSocketReceiveBufferBytes: Int = Defaults.ReplicaSocketReceiveBufferBytes,
- val replicaFetchMaxBytes: Int = Defaults.ReplicaFetchMaxBytes,
- val replicaFetchWaitMaxMs: Int = Defaults.ReplicaFetchWaitMaxMs,
- val replicaFetchMinBytes: Int = Defaults.ReplicaFetchMinBytes,
- val replicaFetchBackoffMs: Int = Defaults.ReplicaFetchBackoffMs,
- val numReplicaFetchers: Int = Defaults.NumReplicaFetchers,
- val replicaHighWatermarkCheckpointIntervalMs: Long = Defaults.ReplicaHighWatermarkCheckpointIntervalMs,
- val fetchPurgatoryPurgeIntervalRequests: Int = Defaults.FetchPurgatoryPurgeIntervalRequests,
- val producerPurgatoryPurgeIntervalRequests: Int = Defaults.ProducerPurgatoryPurgeIntervalRequests,
- val autoLeaderRebalanceEnable: Boolean = Defaults.AutoLeaderRebalanceEnable,
- val leaderImbalancePerBrokerPercentage: Int = Defaults.LeaderImbalancePerBrokerPercentage,
- val leaderImbalanceCheckIntervalSeconds: Int = Defaults.LeaderImbalanceCheckIntervalSeconds,
- val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable,
- val interBrokerSecurityProtocol: SecurityProtocol = SecurityProtocol.valueOf(Defaults.InterBrokerSecurityProtocol),
- val interBrokerProtocolVersion: ApiVersion = ApiVersion(Defaults.InterBrokerProtocolVersion),
-
- /** ********* Controlled shutdown configuration ***********/
- val controlledShutdownMaxRetries: Int = Defaults.ControlledShutdownMaxRetries,
- val controlledShutdownRetryBackoffMs: Int = Defaults.ControlledShutdownRetryBackoffMs,
- val controlledShutdownEnable: Boolean = Defaults.ControlledShutdownEnable,
-
- /** ********* Consumer coordinator configuration ***********/
- val consumerMinSessionTimeoutMs: Int = Defaults.ConsumerMinSessionTimeoutMs,
- val consumerMaxSessionTimeoutMs: Int = Defaults.ConsumerMaxSessionTimeoutMs,
-
- /** ********* Offset management configuration ***********/
- val offsetMetadataMaxSize: Int = Defaults.OffsetMetadataMaxSize,
- val offsetsLoadBufferSize: Int = Defaults.OffsetsLoadBufferSize,
- val offsetsTopicReplicationFactor: Short = Defaults.OffsetsTopicReplicationFactor,
- val offsetsTopicPartitions: Int = Defaults.OffsetsTopicPartitions,
- val offsetsTopicSegmentBytes: Int = Defaults.OffsetsTopicSegmentBytes,
- val offsetsTopicCompressionCodec: CompressionCodec = CompressionCodec.getCompressionCodec(Defaults.OffsetsTopicCompressionCodec),
- val offsetsRetentionMinutes: Int = Defaults.OffsetsRetentionMinutes,
- val offsetsRetentionCheckIntervalMs: Long = Defaults.OffsetsRetentionCheckIntervalMs,
- val offsetCommitTimeoutMs: Int = Defaults.OffsetCommitTimeoutMs,
- val offsetCommitRequiredAcks: Short = Defaults.OffsetCommitRequiredAcks,
-
- val deleteTopicEnable: Boolean = Defaults.DeleteTopicEnable,
- val compressionType: String = Defaults.CompressionType,
-
- val metricSampleWindowMs: Long = Defaults.MetricSampleWindowMs,
- val metricNumSamples: Int = Defaults.MetricNumSamples,
- private val _metricReporterClasses: java.util.List[String] = util.Arrays.asList(Defaults.MetricReporterClasses)
- ) {
-
- val zkConnectionTimeoutMs: Int = _zkConnectionTimeoutMs.getOrElse(zkSessionTimeoutMs)
-
- val listeners = getListeners()
- val advertisedHostName: String = _advertisedHostName.getOrElse(hostName)
- val advertisedPort: Int = _advertisedPort.getOrElse(port)
- val advertisedListeners = getAdvertisedListeners()
- val logDirs = CoreUtils.parseCsvList(_logDirs.getOrElse(_logDir))
-
- val logRollTimeMillis = _logRollTimeMillis.getOrElse(60 * 60 * 1000L * logRollTimeHours)
- val logRollTimeJitterMillis = _logRollTimeJitterMillis.getOrElse(60 * 60 * 1000L * logRollTimeJitterHours)
- val logRetentionTimeMillis = getLogRetentionTimeMillis
+case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(KafkaConfig.configDef, props) {
+
+ /** ********* Zookeeper Configuration ***********/
+ val zkConnect: String = getString(KafkaConfig.ZkConnectProp)
+ val zkSessionTimeoutMs: Int = getInt(KafkaConfig.ZkSessionTimeoutMsProp)
+ val zkConnectionTimeoutMs: java.lang.Integer =
+ Option(getInt(KafkaConfig.ZkConnectionTimeoutMsProp)).getOrElse(getInt(KafkaConfig.ZkSessionTimeoutMsProp))
+ val zkSyncTimeMs: Int = getInt(KafkaConfig.ZkSyncTimeMsProp)
- val logFlushIntervalMs = _logFlushIntervalMs.getOrElse(logFlushSchedulerIntervalMs)
+ /** ********* General Configuration ***********/
+ val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
+ var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
+ val numNetworkThreads = getInt(KafkaConfig.NumNetworkThreadsProp)
+ val backgroundThreads = getInt(KafkaConfig.BackgroundThreadsProp)
+ val queuedMaxRequests = getInt(KafkaConfig.QueuedMaxRequestsProp)
+ val numIoThreads = getInt(KafkaConfig.NumIoThreadsProp)
+ val messageMaxBytes = getInt(KafkaConfig.MessageMaxBytesProp)
+ /** ********* Socket Server Configuration ***********/
+ val hostName = getString(KafkaConfig.HostNameProp)
+ val port = getInt(KafkaConfig.PortProp)
+ val advertisedHostName = Option(getString(KafkaConfig.AdvertisedHostNameProp)).getOrElse(hostName)
+ val advertisedPort: java.lang.Integer = Option(getInt(KafkaConfig.AdvertisedPortProp)).getOrElse(port)
+
+ val socketSendBufferBytes = getInt(KafkaConfig.SocketSendBufferBytesProp)
+ val socketReceiveBufferBytes = getInt(KafkaConfig.SocketReceiveBufferBytesProp)
+ val socketRequestMaxBytes = getInt(KafkaConfig.SocketRequestMaxBytesProp)
+ val maxConnectionsPerIp = getInt(KafkaConfig.MaxConnectionsPerIpProp)
val maxConnectionsPerIpOverrides: Map[String, Int] =
- getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, _maxConnectionsPerIpOverrides).map { case (k, v) => (k, v.toInt)}
+ getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, getString(KafkaConfig.MaxConnectionsPerIpOverridesProp)).map { case (k, v) => (k, v.toInt)}
+ val connectionsMaxIdleMs = getLong(KafkaConfig.ConnectionsMaxIdleMsProp)
+
- val metricReporterClasses: java.util.List[MetricsReporter] = getMetricClasses(_metricReporterClasses)
+ /** ********* Log Configuration ***********/
+ val autoCreateTopicsEnable = getBoolean(KafkaConfig.AutoCreateTopicsEnableProp)
+ val numPartitions = getInt(KafkaConfig.NumPartitionsProp)
+ val logDirs = CoreUtils.parseCsvList( Option(getString(KafkaConfig.LogDirsProp)).getOrElse(getString(KafkaConfig.LogDirProp)))
+ val logSegmentBytes = getInt(KafkaConfig.LogSegmentBytesProp)
+ val logFlushIntervalMessages = getLong(KafkaConfig.LogFlushIntervalMessagesProp)
+ val logCleanerThreads = getInt(KafkaConfig.LogCleanerThreadsProp)
+ val numRecoveryThreadsPerDataDir = getInt(KafkaConfig.NumRecoveryThreadsPerDataDirProp)
+ val logFlushSchedulerIntervalMs = getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp)
+ val logFlushOffsetCheckpointIntervalMs = getInt(KafkaConfig.LogFlushOffsetCheckpointIntervalMsProp).toLong
+ val logCleanupIntervalMs = getLong(KafkaConfig.LogCleanupIntervalMsProp)
+ val logCleanupPolicy = getString(KafkaConfig.LogCleanupPolicyProp)
+ val offsetsRetentionMinutes = getInt(KafkaConfig.OffsetsRetentionMinutesProp)
+ val offsetsRetentionCheckIntervalMs = getLong(KafkaConfig.OffsetsRetentionCheckIntervalMsProp)
+ val logRetentionBytes = getLong(KafkaConfig.LogRetentionBytesProp)
+ val logCleanerDedupeBufferSize = getLong(KafkaConfig.LogCleanerDedupeBufferSizeProp)
+ val logCleanerDedupeBufferLoadFactor = getDouble(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp)
+ val logCleanerIoBufferSize = getInt(KafkaConfig.LogCleanerIoBufferSizeProp)
+ val logCleanerIoMaxBytesPerSecond = getDouble(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp)
+ val logCleanerDeleteRetentionMs = getLong(KafkaConfig.LogCleanerDeleteRetentionMsProp)
+ val logCleanerBackoffMs = getLong(KafkaConfig.LogCleanerBackoffMsProp)
+ val logCleanerMinCleanRatio = getDouble(KafkaConfig.LogCleanerMinCleanRatioProp)
+ val logCleanerEnable = getBoolean(KafkaConfig.LogCleanerEnableProp)
+ val logIndexSizeMaxBytes = getInt(KafkaConfig.LogIndexSizeMaxBytesProp)
+ val logIndexIntervalBytes = getInt(KafkaConfig.LogIndexIntervalBytesProp)
+ val logDeleteDelayMs = getLong(KafkaConfig.LogDeleteDelayMsProp)
+ val logRollTimeMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeHoursProp))
+ val logRollTimeJitterMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeJitterMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeJitterHoursProp))
+ val logFlushIntervalMs: java.lang.Long = Option(getLong(KafkaConfig.LogFlushIntervalMsProp)).getOrElse(getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp))
+ val minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp)
+
+ /** ********* Replication configuration ***********/
+ val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
+ val defaultReplicationFactor: Int = getInt(KafkaConfig.DefaultReplicationFactorProp)
+ val replicaLagTimeMaxMs = getLong(KafkaConfig.ReplicaLagTimeMaxMsProp)
+ val replicaSocketTimeoutMs = getInt(KafkaConfig.ReplicaSocketTimeoutMsProp)
+ val replicaSocketReceiveBufferBytes = getInt(KafkaConfig.ReplicaSocketReceiveBufferBytesProp)
+ val replicaFetchMaxBytes = getInt(KafkaConfig.ReplicaFetchMaxBytesProp)
+ val replicaFetchWaitMaxMs = getInt(KafkaConfig.ReplicaFetchWaitMaxMsProp)
+ val replicaFetchMinBytes = getInt(KafkaConfig.ReplicaFetchMinBytesProp)
+ val replicaFetchBackoffMs = getInt(KafkaConfig.ReplicaFetchBackoffMsProp)
+ val numReplicaFetchers = getInt(KafkaConfig.NumReplicaFetchersProp)
+ val replicaHighWatermarkCheckpointIntervalMs = getLong(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp)
+ val fetchPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp)
+ val producerPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp)
+ val autoLeaderRebalanceEnable = getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp)
+ val leaderImbalancePerBrokerPercentage = getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp)
+ val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)
+ val uncleanLeaderElectionEnable = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
+ val interBrokerSecurityProtocol = SecurityProtocol.valueOf(getString(KafkaConfig.InterBrokerSecurityProtocolProp))
+ val interBrokerProtocolVersion = ApiVersion(getString(KafkaConfig.InterBrokerProtocolVersionProp))
+
+ /** ********* Controlled shutdown configuration ***********/
+ val controlledShutdownMaxRetries = getInt(KafkaConfig.ControlledShutdownMaxRetriesProp)
+ val controlledShutdownRetryBackoffMs = getLong(KafkaConfig.ControlledShutdownRetryBackoffMsProp)
+ val controlledShutdownEnable = getBoolean(KafkaConfig.ControlledShutdownEnableProp)
+
+ /** ********* Consumer coordinator configuration ***********/
+ val consumerMinSessionTimeoutMs = getInt(KafkaConfig.ConsumerMinSessionTimeoutMsProp)
+ val consumerMaxSessionTimeoutMs = getInt(KafkaConfig.ConsumerMaxSessionTimeoutMsProp)
+
+ /** ********* Offset management configuration ***********/
+ val offsetMetadataMaxSize = getInt(KafkaConfig.OffsetMetadataMaxSizeProp)
+ val offsetsLoadBufferSize = getInt(KafkaConfig.OffsetsLoadBufferSizeProp)
+ val offsetsTopicReplicationFactor = getShort(KafkaConfig.OffsetsTopicReplicationFactorProp)
+ val offsetsTopicPartitions = getInt(KafkaConfig.OffsetsTopicPartitionsProp)
+ val offsetCommitTimeoutMs = getInt(KafkaConfig.OffsetCommitTimeoutMsProp)
+ val offsetCommitRequiredAcks = getShort(KafkaConfig.OffsetCommitRequiredAcksProp)
+ val offsetsTopicSegmentBytes = getInt(KafkaConfig.OffsetsTopicSegmentBytesProp)
+ val offsetsTopicCompressionCodec = Option(getInt(KafkaConfig.OffsetsTopicCompressionCodecProp)).map(value => CompressionCodec.getCompressionCodec(value)).orNull
+
+ /** ********* Metric Configuration **************/
+ val metricNumSamples = getInt(KafkaConfig.MetricNumSamplesProp)
+ val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp)
+ val metricReporterClasses: java.util.List[MetricsReporter] = getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter])
+
+ val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp)
+ val compressionType = getString(KafkaConfig.CompressionTypeProp)
+
+
+ val listeners = getListeners
+ val advertisedListeners = getAdvertisedListeners
+ val logRetentionTimeMillis = getLogRetentionTimeMillis
private def getLogRetentionTimeMillis: Long = {
val millisInMinute = 60L * 1000L
val millisInHour = 60L * millisInMinute
- val millis = {
- _logRetentionTimeMillis.getOrElse(
- _logRetentionTimeMins match {
- case Some(mins) => millisInMinute * mins
- case None => millisInHour * logRetentionTimeHours
- }
- )
- }
- if (millis < 0) return -1
- millis
+ val millis: java.lang.Long =
+ Option(getLong(KafkaConfig.LogRetentionTimeMillisProp)).getOrElse(
+ Option(getInt(KafkaConfig.LogRetentionTimeMinutesProp)) match {
+ case Some(mins) => millisInMinute * mins
+ case None => getInt(KafkaConfig.LogRetentionTimeHoursProp) * millisInHour
+ })
+
+ if (millis < 0) return -1
+ millis
}
private def getMap(propName: String, propValue: String): Map[String, String] = {
@@ -855,9 +704,9 @@ class KafkaConfig (/** ********* Zookeeper Configuration ***********/
// If the user did not define listeners but did define host or port, let's use them in backward compatible way
// If none of those are defined, we default to PLAINTEXT://:9092
private def getListeners(): immutable.Map[SecurityProtocol, EndPoint] = {
- if (_listeners.isDefined) {
- validateUniquePortAndProtocol(_listeners.get)
- CoreUtils.listenerListToEndPoints(_listeners.get)
+ if (getString(KafkaConfig.ListenersProp) != null) {
+ validateUniquePortAndProtocol(getString(KafkaConfig.ListenersProp))
+ CoreUtils.listenerListToEndPoints(getString(KafkaConfig.ListenersProp))
} else {
CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port)
}
@@ -867,11 +716,12 @@ class KafkaConfig (/** ********* Zookeeper Configuration ***********/
// If he didn't but did define advertised host or port, we'll use those and fill in the missing value from regular host / port or defaults
// If none of these are defined, we'll use the listeners
private def getAdvertisedListeners(): immutable.Map[SecurityProtocol, EndPoint] = {
- if (_advertisedListeners.isDefined) {
- validateUniquePortAndProtocol(_advertisedListeners.get)
- CoreUtils.listenerListToEndPoints(_advertisedListeners.get)
- } else if (_advertisedHostName.isDefined || _advertisedPort.isDefined ) {
- CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort)
+ if (getString(KafkaConfig.AdvertisedListenersProp) != null) {
+ validateUniquePortAndProtocol(getString(KafkaConfig.AdvertisedListenersProp))
+ CoreUtils.listenerListToEndPoints(getString(KafkaConfig.AdvertisedListenersProp))
+ } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) {
+ CoreUtils.listenerListToEndPoints("PLAINTEXT://" +
+ getString(KafkaConfig.AdvertisedHostNameProp) + ":" + getInt(KafkaConfig.AdvertisedPortProp))
} else {
getListeners()
}
@@ -886,7 +736,7 @@ class KafkaConfig (/** ********* Zookeeper Configuration ***********/
val reporterName = iterator.next()
if (!reporterName.isEmpty) {
val reporter: MetricsReporter = CoreUtils.createObject[MetricsReporter](reporterName)
- reporter.configure(toProps.asInstanceOf[java.util.Map[String, _]])
+ reporter.configure(originals)
reporterList.add(reporter)
}
}
@@ -895,19 +745,13 @@ class KafkaConfig (/** ********* Zookeeper Configuration ***********/
}
-
-
validateValues()
private def validateValues() {
require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be equal or greater than -1 and not greater than reserved.broker.max.id")
require(logRollTimeMillis >= 1, "log.roll.ms must be equal or greater than 1")
require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be equal or greater than 0")
-
- require(_logRetentionTimeMins.forall(_ >= 1)|| _logRetentionTimeMins.forall(_ .equals(-1)), "log.retention.minutes must be unlimited (-1) or, equal or greater than 1")
- require(logRetentionTimeHours >= 1 || logRetentionTimeHours == -1, "log.retention.hours must be unlimited (-1) or, equal or greater than 1")
require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, equal or greater than 1")
-
require(logDirs.size > 0)
require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.")
require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" +
@@ -920,127 +764,4 @@ class KafkaConfig (/** ********* Zookeeper Configuration ***********/
require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType + " is not valid." +
" Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
}
-
- def toProps: Properties = {
- val props = new Properties()
- import kafka.server.KafkaConfig._
- /** ********* Zookeeper Configuration ***********/
- props.put(ZkConnectProp, zkConnect)
- props.put(ZkSessionTimeoutMsProp, zkSessionTimeoutMs.toString)
- _zkConnectionTimeoutMs.foreach(value => props.put(ZkConnectionTimeoutMsProp, value.toString))
- props.put(ZkSyncTimeMsProp, zkSyncTimeMs.toString)
-
- /** ********* General Configuration ***********/
- props.put(MaxReservedBrokerIdProp, maxReservedBrokerId.toString)
- props.put(BrokerIdProp, brokerId.toString)
- props.put(MessageMaxBytesProp, messageMaxBytes.toString)
- props.put(NumNetworkThreadsProp, numNetworkThreads.toString)
- props.put(NumIoThreadsProp, numIoThreads.toString)
- props.put(BackgroundThreadsProp, backgroundThreads.toString)
- props.put(QueuedMaxRequestsProp, queuedMaxRequests.toString)
-
- /** ********* Socket Server Configuration ***********/
- props.put(PortProp, port.toString)
- props.put(HostNameProp, hostName)
- _listeners.foreach(props.put(ListenersProp, _))
- _advertisedHostName.foreach(props.put(AdvertisedHostNameProp, _))
- _advertisedPort.foreach(value => props.put(AdvertisedPortProp, value.toString))
- _advertisedListeners.foreach(props.put(AdvertisedListenersProp, _))
- props.put(SocketSendBufferBytesProp, socketSendBufferBytes.toString)
- props.put(SocketReceiveBufferBytesProp, socketReceiveBufferBytes.toString)
- props.put(SocketRequestMaxBytesProp, socketRequestMaxBytes.toString)
- props.put(MaxConnectionsPerIpProp, maxConnectionsPerIp.toString)
- props.put(MaxConnectionsPerIpOverridesProp, _maxConnectionsPerIpOverrides)
- props.put(ConnectionsMaxIdleMsProp, connectionsMaxIdleMs.toString)
-
- /** ********* Log Configuration ***********/
- props.put(NumPartitionsProp, numPartitions.toString)
- props.put(LogDirProp, _logDir)
- _logDirs.foreach(value => props.put(LogDirsProp, value))
- props.put(LogSegmentBytesProp, logSegmentBytes.toString)
-
- props.put(LogRollTimeHoursProp, logRollTimeHours.toString)
- _logRollTimeMillis.foreach(v => props.put(LogRollTimeMillisProp, v.toString))
-
- props.put(LogRollTimeJitterHoursProp, logRollTimeJitterHours.toString)
- _logRollTimeJitterMillis.foreach(v => props.put(LogRollTimeJitterMillisProp, v.toString))
-
-
- props.put(LogRetentionTimeHoursProp, logRetentionTimeHours.toString)
- _logRetentionTimeMins.foreach(v => props.put(LogRetentionTimeMinutesProp, v.toString))
- _logRetentionTimeMillis.foreach(v => props.put(LogRetentionTimeMillisProp, v.toString))
-
- props.put(LogRetentionBytesProp, logRetentionBytes.toString)
- props.put(LogCleanupIntervalMsProp, logCleanupIntervalMs.toString)
- props.put(LogCleanupPolicyProp, logCleanupPolicy)
- props.put(LogCleanerThreadsProp, logCleanerThreads.toString)
- props.put(LogCleanerIoMaxBytesPerSecondProp, logCleanerIoMaxBytesPerSecond.toString)
- props.put(LogCleanerDedupeBufferSizeProp, logCleanerDedupeBufferSize.toString)
- props.put(LogCleanerIoBufferSizeProp, logCleanerIoBufferSize.toString)
- props.put(LogCleanerDedupeBufferLoadFactorProp, logCleanerDedupeBufferLoadFactor.toString)
- props.put(LogCleanerBackoffMsProp, logCleanerBackoffMs.toString)
- props.put(LogCleanerMinCleanRatioProp, logCleanerMinCleanRatio.toString)
- props.put(LogCleanerEnableProp, logCleanerEnable.toString)
- props.put(LogCleanerDeleteRetentionMsProp, logCleanerDeleteRetentionMs.toString)
- props.put(LogIndexSizeMaxBytesProp, logIndexSizeMaxBytes.toString)
- props.put(LogIndexIntervalBytesProp, logIndexIntervalBytes.toString)
- props.put(LogFlushIntervalMessagesProp, logFlushIntervalMessages.toString)
- props.put(LogDeleteDelayMsProp, logDeleteDelayMs.toString)
- props.put(LogFlushSchedulerIntervalMsProp, logFlushSchedulerIntervalMs.toString)
- _logFlushIntervalMs.foreach(v => props.put(LogFlushIntervalMsProp, v.toString))
- props.put(LogFlushOffsetCheckpointIntervalMsProp, logFlushOffsetCheckpointIntervalMs.toString)
- props.put(NumRecoveryThreadsPerDataDirProp, numRecoveryThreadsPerDataDir.toString)
- props.put(AutoCreateTopicsEnableProp, autoCreateTopicsEnable.toString)
- props.put(MinInSyncReplicasProp, minInSyncReplicas.toString)
-
- /** ********* Replication configuration ***********/
- props.put(ControllerSocketTimeoutMsProp, controllerSocketTimeoutMs.toString)
- props.put(DefaultReplicationFactorProp, defaultReplicationFactor.toString)
- props.put(ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
- props.put(ReplicaSocketTimeoutMsProp, replicaSocketTimeoutMs.toString)
- props.put(ReplicaSocketReceiveBufferBytesProp, replicaSocketReceiveBufferBytes.toString)
- props.put(ReplicaFetchMaxBytesProp, replicaFetchMaxBytes.toString)
- props.put(ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString)
- props.put(ReplicaFetchMinBytesProp, replicaFetchMinBytes.toString)
- props.put(ReplicaFetchBackoffMsProp, replicaFetchBackoffMs.toString)
- props.put(NumReplicaFetchersProp, numReplicaFetchers.toString)
- props.put(ReplicaHighWatermarkCheckpointIntervalMsProp, replicaHighWatermarkCheckpointIntervalMs.toString)
- props.put(FetchPurgatoryPurgeIntervalRequestsProp, fetchPurgatoryPurgeIntervalRequests.toString)
- props.put(ProducerPurgatoryPurgeIntervalRequestsProp, producerPurgatoryPurgeIntervalRequests.toString)
- props.put(AutoLeaderRebalanceEnableProp, autoLeaderRebalanceEnable.toString)
- props.put(LeaderImbalancePerBrokerPercentageProp, leaderImbalancePerBrokerPercentage.toString)
- props.put(LeaderImbalanceCheckIntervalSecondsProp, leaderImbalanceCheckIntervalSeconds.toString)
- props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString)
- props.put(InterBrokerSecurityProtocolProp, interBrokerSecurityProtocol.toString)
- props.put(InterBrokerProtocolVersionProp, interBrokerProtocolVersion.toString)
-
-
- /** ********* Controlled shutdown configuration ***********/
- props.put(ControlledShutdownMaxRetriesProp, controlledShutdownMaxRetries.toString)
- props.put(ControlledShutdownRetryBackoffMsProp, controlledShutdownRetryBackoffMs.toString)
- props.put(ControlledShutdownEnableProp, controlledShutdownEnable.toString)
-
- /** ********* Consumer coordinator configuration ***********/
- props.put(ConsumerMinSessionTimeoutMsProp, consumerMinSessionTimeoutMs.toString)
- props.put(ConsumerMaxSessionTimeoutMsProp, consumerMaxSessionTimeoutMs.toString)
-
- /** ********* Offset management configuration ***********/
- props.put(OffsetMetadataMaxSizeProp, offsetMetadataMaxSize.toString)
- props.put(OffsetsLoadBufferSizeProp, offsetsLoadBufferSize.toString)
- props.put(OffsetsTopicReplicationFactorProp, offsetsTopicReplicationFactor.toString)
- props.put(OffsetsTopicPartitionsProp, offsetsTopicPartitions.toString)
- props.put(OffsetsTopicSegmentBytesProp, offsetsTopicSegmentBytes.toString)
- props.put(OffsetsTopicCompressionCodecProp, offsetsTopicCompressionCodec.codec.toString)
- props.put(OffsetsRetentionMinutesProp, offsetsRetentionMinutes.toString)
- props.put(OffsetsRetentionCheckIntervalMsProp, offsetsRetentionCheckIntervalMs.toString)
- props.put(OffsetCommitTimeoutMsProp, offsetCommitTimeoutMs.toString)
- props.put(OffsetCommitRequiredAcksProp, offsetCommitRequiredAcks.toString)
- props.put(DeleteTopicEnableProp, deleteTopicEnable.toString)
- props.put(CompressionTypeProp, compressionType.toString)
- props.put(MetricNumSamplesProp, metricNumSamples.toString)
- props.put(MetricSampleWindowMsProp, metricSampleWindowMs.toString)
- props.put(MetricReporterClassesProp, JavaConversions.collectionAsScalaIterable(_metricReporterClasses).mkString(","))
-
- props
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index b320ce9..9de2a6f 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -17,6 +17,9 @@
package kafka.server
+import java.util
+import java.util.Properties
+
import kafka.admin._
import kafka.log.LogConfig
import kafka.log.CleanerConfig
@@ -388,23 +391,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
def boundPort(): Int = socketServer.boundPort()
private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
- val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes,
- segmentMs = config.logRollTimeMillis,
- segmentJitterMs = config.logRollTimeJitterMillis,
- flushInterval = config.logFlushIntervalMessages,
- flushMs = config.logFlushIntervalMs.toLong,
- retentionSize = config.logRetentionBytes,
- retentionMs = config.logRetentionTimeMillis,
- maxMessageSize = config.messageMaxBytes,
- maxIndexSize = config.logIndexSizeMaxBytes,
- indexInterval = config.logIndexIntervalBytes,
- deleteRetentionMs = config.logCleanerDeleteRetentionMs,
- fileDeleteDelayMs = config.logDeleteDelayMs,
- minCleanableRatio = config.logCleanerMinCleanRatio,
- compact = config.logCleanupPolicy.trim.toLowerCase == "compact",
- minInSyncReplicas = config.minInSyncReplicas,
- compressionType = config.compressionType)
- val defaultProps = defaultLogConfig.toProps
+ val defaultProps = copyKafkaConfigToLog(config.originals)
+ val defaultLogConfig = LogConfig(defaultProps)
+
val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _))
// read the log configurations from zookeeper
val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
@@ -428,6 +417,38 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
time = time)
}
+ // Copy the subset of properties that are relevant to Logs
+ // I'm listing out individual properties here since the names are slightly different in each Config class...
+ private def copyKafkaConfigToLog(serverProps: java.util.Map[String, Object]): java.util.Map[String, Object] = {
+
+ val logProps = new util.HashMap[String, Object]()
+ val entryset = serverProps.entrySet.iterator
+ while (entryset.hasNext) {
+ val entry = entryset.next
+ entry.getKey match {
+ case KafkaConfig.LogSegmentBytesProp => logProps.put(LogConfig.SegmentBytesProp, entry.getValue)
+ case KafkaConfig.LogRollTimeMillisProp => logProps.put(LogConfig.SegmentMsProp, entry.getValue)
+ case KafkaConfig.LogRollTimeJitterMillisProp => logProps.put(LogConfig.SegmentJitterMsProp, entry.getValue)
+ case KafkaConfig.LogIndexSizeMaxBytesProp => logProps.put(LogConfig.SegmentIndexBytesProp, entry.getValue)
+ case KafkaConfig.LogFlushIntervalMessagesProp => logProps.put(LogConfig.FlushMessagesProp, entry.getValue)
+ case KafkaConfig.LogFlushIntervalMsProp => logProps.put(LogConfig.FlushMsProp, entry.getValue)
+ case KafkaConfig.LogRetentionBytesProp => logProps.put(LogConfig.RetentionBytesProp, entry.getValue)
+ case KafkaConfig.LogRetentionTimeMillisProp => logProps.put(LogConfig.RetentionMsProp, entry.getValue)
+ case KafkaConfig.MessageMaxBytesProp => logProps.put(LogConfig.MaxMessageBytesProp, entry.getValue)
+ case KafkaConfig.LogIndexIntervalBytesProp => logProps.put(LogConfig.IndexIntervalBytesProp, entry.getValue)
+ case KafkaConfig.LogCleanerDeleteRetentionMsProp => logProps.put(LogConfig.DeleteRetentionMsProp, entry.getValue)
+ case KafkaConfig.LogDeleteDelayMsProp => logProps.put(LogConfig.FileDeleteDelayMsProp, entry.getValue)
+ case KafkaConfig.LogCleanerMinCleanRatioProp => logProps.put(LogConfig.MinCleanableDirtyRatioProp, entry.getValue)
+ case KafkaConfig.LogCleanupPolicyProp => logProps.put(LogConfig.CleanupPolicyProp, entry.getValue)
+ case KafkaConfig.MinInSyncReplicasProp => logProps.put(LogConfig.MinInSyncReplicasProp, entry.getValue)
+ case KafkaConfig.CompressionTypeProp => logProps.put(LogConfig.CompressionTypeProp, entry.getValue)
+ case KafkaConfig.UncleanLeaderElectionEnableProp => logProps.put(LogConfig.UncleanLeaderElectionEnableProp, entry.getValue)
+ case _ => // we just leave those out
+ }
+ }
+ logProps
+ }
+
private def createOffsetManager(): OffsetManager = {
val offsetManagerConfig = OffsetManagerConfig(
maxMetadataSize = config.offsetMetadataMaxSize,
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 181cbc1..c89d00b 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -90,7 +90,7 @@ class ReplicaFetcherThread(name:String,
// Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
// This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
// we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
- if (!LogConfig.fromProps(brokerConfig.toProps, AdminUtils.fetchTopicConfig(replicaMgr.zkClient,
+ if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchTopicConfig(replicaMgr.zkClient,
topicAndPartition.topic)).uncleanLeaderElectionEnable) {
// Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) +
@@ -120,6 +120,6 @@ class ReplicaFetcherThread(name:String,
// any logic for partitions whose leader has changed
def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
- delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs)
+ delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs.toLong)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/server/TopicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/TopicConfigManager.scala
index b675a7e..01b1b0a 100644
--- a/core/src/main/scala/kafka/server/TopicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/TopicConfigManager.scala
@@ -101,9 +101,10 @@ class TopicConfigManager(private val zkClient: ZkClient,
val topic = json.substring(1, json.length - 1) // hacky way to dequote
if (logsByTopic.contains(topic)) {
/* combine the default properties with the overrides in zk to create the new LogConfig */
- val props = new Properties(logManager.defaultConfig.toProps)
+ val props = new Properties()
+ props.putAll(logManager.defaultConfig.originals)
props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))
- val logConfig = LogConfig.fromProps(props)
+ val logConfig = LogConfig(props)
for (log <- logsByTopic(topic))
log.config = logConfig
info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props))
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index d0a8fa7..f5d704c 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -254,32 +254,6 @@ object CoreUtils extends Logging {
}
/**
- * Turn {@linkplain java.util.Properties} with default values into a {@linkplain java.util.Map}. Following example
- * illustrates difference from the cast
- * <pre>
- * val defaults = new Properties()
- * defaults.put("foo", "bar")
- * val props = new Properties(defaults)
- *
- * props.getProperty("foo") // "bar"
- * props.get("foo") // null
- * evaluateDefaults(props).get("foo") // "bar"
- * </pre>
- *
- * @param props properties to evaluate
- * @return new java.util.Map instance
- */
- def evaluateDefaults(props: Properties): java.util.Map[String, String] = {
- import java.util._
- import JavaConversions.asScalaSet
- val evaluated = new HashMap[String, String]()
- for (name <- props.stringPropertyNames()) {
- evaluated.put(name, props.getProperty(name))
- }
- evaluated
- }
-
- /**
* Read a big-endian integer from a byte array
*/
def readInt(bytes: Array[Byte], offset: Int): Int = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index c0e248d..225d77b 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -17,6 +17,7 @@
package kafka
+import java.util.Properties
import java.util.concurrent.atomic._
import kafka.common._
import kafka.message._
@@ -33,10 +34,13 @@ object StressTestLog {
def main(args: Array[String]) {
val dir = TestUtils.tempDir()
val time = new MockTime
+ val logProprties = new Properties()
+ logProprties.put(LogConfig.SegmentBytesProp, 64*1024*1024: java.lang.Integer)
+ logProprties.put(LogConfig.MaxMessageBytesProp, Int.MaxValue: java.lang.Integer)
+ logProprties.put(LogConfig.SegmentIndexBytesProp, 1024*1024: java.lang.Integer)
+
val log = new Log(dir = dir,
- config = LogConfig(segmentSize = 64*1024*1024,
- maxMessageSize = Int.MaxValue,
- maxIndexSize = 1024*1024),
+ config = LogConfig(logProprties),
recoveryPoint = 0L,
scheduler = time.scheduler,
time = time)
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index 3034c4f..236d857 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -20,7 +20,7 @@ package kafka
import java.io._
import java.nio._
import java.nio.channels._
-import java.util.Random
+import java.util.{Properties, Random}
import kafka.log._
import kafka.utils._
import kafka.message._
@@ -110,7 +110,10 @@ object TestLinearWriteSpeed {
writables(i) = new ChannelWritable(new File(dir, "kafka-test-" + i + ".dat"), buffer)
} else if(options.has(logOpt)) {
val segmentSize = rand.nextInt(512)*1024*1024 + 64*1024*1024 // vary size to avoid herd effect
- writables(i) = new LogWritable(new File(dir, "kafka-test-" + i), new LogConfig(segmentSize=segmentSize, flushInterval = flushInterval), scheduler, messageSet)
+ val logProperties = new Properties()
+ logProperties.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
+ logProperties.put(LogConfig.FlushMessagesProp, flushInterval: java.lang.Long)
+ writables(i) = new LogWritable(new File(dir, "kafka-test-" + i), new LogConfig(logProperties), scheduler, messageSet)
} else {
System.err.println("Must specify what to write to with one of --log, --channel, or --mmap")
System.exit(1)
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 375555f..6180b87 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -26,7 +26,7 @@ import org.junit.Assert._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters
-import java.util.{ Collection, ArrayList }
+import java.util.{Properties, Collection, ArrayList}
import kafka.server.KafkaConfig
import org.apache.kafka.common.record.CompressionType
import scala.collection.JavaConversions._
@@ -54,9 +54,10 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
@Test
def testBrokerSideCompression() {
val messageCompressionCode = CompressionCodec.getCompressionCodec(messageCompression)
-
+ val logProps = new Properties()
+ logProps.put(LogConfig.CompressionTypeProp,brokerCompression)
/*configure broker-side compression */
- val log = new Log(logDir, logConfig.copy(compressionType = brokerCompression), recoveryPoint = 0L, time.scheduler, time = time)
+ val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
/* append two messages */
log.append(new ByteBufferMessageSet(messageCompressionCode, new Message("hello".getBytes), new Message("there".getBytes)))
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 8b8249a..0e2a6a1 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -17,6 +17,8 @@
package kafka.log
+import java.util.Properties
+
import junit.framework.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Test}
@@ -35,7 +37,11 @@ import org.apache.kafka.common.utils.Utils
class CleanerTest extends JUnitSuite {
val dir = TestUtils.tempDir()
- val logConfig = LogConfig(segmentSize=1024, maxIndexSize=1024, compact=true)
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+ logProps.put(LogConfig.SegmentIndexBytesProp, 1024: java.lang.Integer)
+ logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+ val logConfig = LogConfig(logProps)
val time = new MockTime()
val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
@@ -50,8 +56,11 @@ class CleanerTest extends JUnitSuite {
@Test
def testCleanSegments() {
val cleaner = makeCleaner(Int.MaxValue)
- val log = makeLog(config = logConfig.copy(segmentSize = 1024))
-
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
// append messages to the log until we have four segments
while(log.numberOfSegments < 4)
log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
@@ -72,7 +81,10 @@ class CleanerTest extends JUnitSuite {
@Test
def testCleaningWithDeletes() {
val cleaner = makeCleaner(Int.MaxValue)
- val log = makeLog(config = logConfig.copy(segmentSize = 1024))
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// append messages with the keys 0 through N
while(log.numberOfSegments < 2)
@@ -98,7 +110,11 @@ class CleanerTest extends JUnitSuite {
val cleaner = makeCleaner(Int.MaxValue)
// create a log with compaction turned off so we can append unkeyed messages
- val log = makeLog(config = logConfig.copy(segmentSize = 1024, compact = false))
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+ logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Delete)
+
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// append unkeyed messages
while(log.numberOfSegments < 2)
@@ -114,7 +130,9 @@ class CleanerTest extends JUnitSuite {
val expectedSizeAfterCleaning = log.size - sizeWithUnkeyedMessages
// turn on compaction and compact the log
- val compactedLog = makeLog(config = logConfig.copy(segmentSize = 1024))
+ logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+ val compactedLog = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0))
assertEquals("Log should only contain keyed messages after cleaning.", 0, unkeyedMessageCountInLog(log))
@@ -139,7 +157,10 @@ class CleanerTest extends JUnitSuite {
@Test
def testCleanSegmentsWithAbort() {
val cleaner = makeCleaner(Int.MaxValue, abortCheckDone)
- val log = makeLog(config = logConfig.copy(segmentSize = 1024))
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// append messages to the log until we have four segments
while(log.numberOfSegments < 4)
@@ -159,7 +180,11 @@ class CleanerTest extends JUnitSuite {
@Test
def testSegmentGrouping() {
val cleaner = makeCleaner(Int.MaxValue)
- val log = makeLog(config = logConfig.copy(segmentSize = 300, indexInterval = 1))
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
+ logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
+
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// append some messages to the log
var i = 0
@@ -208,7 +233,12 @@ class CleanerTest extends JUnitSuite {
@Test
def testSegmentGroupingWithSparseOffsets() {
val cleaner = makeCleaner(Int.MaxValue)
- val log = makeLog(config = logConfig.copy(segmentSize = 1024, indexInterval = 1))
+
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
+ logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
+
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// fill up first segment
while (log.numberOfSegments == 1)
@@ -288,7 +318,12 @@ class CleanerTest extends JUnitSuite {
@Test
def testRecoveryAfterCrash() {
val cleaner = makeCleaner(Int.MaxValue)
- val config = logConfig.copy(segmentSize = 300, indexInterval = 1, fileDeleteDelayMs = 10)
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
+ logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
+ logProps.put(LogConfig.FileDeleteDelayMsProp, 10: java.lang.Integer)
+
+ val config = LogConfig.fromProps(logConfig.originals, logProps)
def recoverAndCheck(config: LogConfig, expectedKeys : Iterable[Int]) : Log = {
// Recover log file and check that after recovery, keys are as expected
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 471ddff..381e9aa 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -18,6 +18,7 @@
package kafka.log
import java.io.File
+import java.util.Properties
import kafka.common.TopicAndPartition
import kafka.message._
@@ -127,8 +128,13 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends JUnit3Suite {
for(i <- 0 until parts) {
val dir = new File(logDir, "log-" + i)
dir.mkdirs()
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
+ logProps.put(LogConfig.SegmentIndexBytesProp, 100*1024: java.lang.Integer)
+ logProps.put(LogConfig.FileDeleteDelayMsProp, deleteDelay: java.lang.Integer)
+ logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
val log = new Log(dir = dir,
- LogConfig(segmentSize = segmentSize, maxIndexSize = 100*1024, fileDeleteDelayMs = deleteDelay, compact = true),
+ LogConfig(logProps),
recoveryPoint = 0L,
scheduler = time.scheduler,
time = time)