You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/08/18 22:03:24 UTC
kafka git commit: KAFKA-2436;
log.retention.hours should be honored by LogManager
Repository: kafka
Updated Branches:
refs/heads/trunk 786867c2e -> 503bd3664
KAFKA-2436; log.retention.hours should be honored by LogManager
Author: Dong Lin <li...@gmail.com>
Reviewers: Joel Koshy, Gwen Shapira
Closes #142 from lindong28/KAFKA-2436
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/503bd366
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/503bd366
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/503bd366
Branch: refs/heads/trunk
Commit: 503bd36647695e8cc91893ffb80346dd03eb0bc5
Parents: 786867c
Author: Dong Lin <li...@gmail.com>
Authored: Tue Aug 18 13:03:11 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Aug 18 13:03:11 2015 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/log/LogConfig.scala | 5 +-
.../main/scala/kafka/server/KafkaConfig.scala | 8 +--
.../main/scala/kafka/server/KafkaServer.scala | 65 ++++++++++----------
.../scala/unit/kafka/log/LogConfigTest.scala | 19 ++++++
4 files changed, 58 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/503bd366/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index c969d16..7fc7e33 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -47,7 +47,10 @@ object Defaults {
}
case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props, false) {
-
+ /**
+ * Important note: Any configuration parameter that is passed along from KafkaConfig to LogConfig
+ * should also go in copyKafkaConfigToLog.
+ */
val segmentSize = getInt(LogConfig.SegmentBytesProp)
val segmentMs = getLong(LogConfig.SegmentMsProp)
val segmentJitterMs = getLong(LogConfig.SegmentJitterMsProp)
http://git-wip-us.apache.org/repos/asf/kafka/blob/503bd366/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 394f21b..c39402c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -335,7 +335,7 @@ object KafkaConfig {
val LogRetentionTimeHoursDoc = "The number of hours to keep a log file before deleting it (in hours), tertiary to " + LogRetentionTimeMillisProp + " property"
val LogRetentionBytesDoc = "The maximum size of the log before deleting it"
- val LogCleanupIntervalMsDoc = "The frequency in minutes that the log cleaner checks whether any log is eligible for deletion"
+ val LogCleanupIntervalMsDoc = "The frequency in milliseconds that the log cleaner checks whether any log is eligible for deletion"
val LogCleanupPolicyDoc = "The default cleanup policy for segments beyond the retention window, must be either \"delete\" or \"compact\""
val LogCleanerThreadsDoc = "The number of background threads to use for log cleaning"
val LogCleanerIoMaxBytesPerSecondDoc = "The log cleaner will be throttled so that the sum of its read and write i/o will be less than this value on average"
@@ -652,8 +652,9 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
val logRollTimeMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeHoursProp))
val logRollTimeJitterMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeJitterMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeJitterHoursProp))
val logFlushIntervalMs: java.lang.Long = Option(getLong(KafkaConfig.LogFlushIntervalMsProp)).getOrElse(getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp))
+ val logRetentionTimeMillis = getLogRetentionTimeMillis
val minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp)
- val logPreAllocateEnable: Boolean = getBoolean(KafkaConfig.LogPreAllocateProp)
+ val logPreAllocateEnable: java.lang.Boolean = getBoolean(KafkaConfig.LogPreAllocateProp)
/** ********* Replication configuration ***********/
val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
@@ -672,7 +673,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
val autoLeaderRebalanceEnable = getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp)
val leaderImbalancePerBrokerPercentage = getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp)
val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)
- val uncleanLeaderElectionEnable = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
+ val uncleanLeaderElectionEnable: java.lang.Boolean = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
val interBrokerSecurityProtocol = SecurityProtocol.valueOf(getString(KafkaConfig.InterBrokerSecurityProtocolProp))
val interBrokerProtocolVersion = ApiVersion(getString(KafkaConfig.InterBrokerProtocolVersionProp))
@@ -713,7 +714,6 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
val listeners = getListeners
val advertisedListeners = getAdvertisedListeners
- val logRetentionTimeMillis = getLogRetentionTimeMillis
private def getLogRetentionTimeMillis: Long = {
val millisInMinute = 60L * 1000L
http://git-wip-us.apache.org/repos/asf/kafka/blob/503bd366/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 6d65507..a0e3fdf 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -43,6 +43,36 @@ import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
import kafka.coordinator.{GroupManagerConfig, ConsumerCoordinator}
+
+object KafkaServer {
+ // Copy the subset of properties that are relevant to Logs
+ // I'm listing out individual properties here since the names are slightly different in each Config class...
+ def copyKafkaConfigToLog(kafkaConfig: KafkaConfig): java.util.Map[String, Object] = {
+ val logProps = new util.HashMap[String, Object]()
+ logProps.put(LogConfig.SegmentBytesProp, kafkaConfig.logSegmentBytes)
+ logProps.put(LogConfig.SegmentMsProp, kafkaConfig.logRollTimeMillis)
+ logProps.put(LogConfig.SegmentJitterMsProp, kafkaConfig.logRollTimeJitterMillis)
+ logProps.put(LogConfig.SegmentIndexBytesProp, kafkaConfig.logIndexSizeMaxBytes)
+ logProps.put(LogConfig.FlushMessagesProp, kafkaConfig.logFlushIntervalMessages)
+ logProps.put(LogConfig.FlushMsProp, kafkaConfig.logFlushIntervalMs)
+ logProps.put(LogConfig.RetentionBytesProp, kafkaConfig.logRetentionBytes)
+ logProps.put(LogConfig.RetentionMsProp, kafkaConfig.logRetentionTimeMillis: java.lang.Long)
+ logProps.put(LogConfig.MaxMessageBytesProp, kafkaConfig.messageMaxBytes)
+ logProps.put(LogConfig.IndexIntervalBytesProp, kafkaConfig.logIndexIntervalBytes)
+ logProps.put(LogConfig.DeleteRetentionMsProp, kafkaConfig.logCleanerDeleteRetentionMs)
+ logProps.put(LogConfig.FileDeleteDelayMsProp, kafkaConfig.logDeleteDelayMs)
+ logProps.put(LogConfig.MinCleanableDirtyRatioProp, kafkaConfig.logCleanerMinCleanRatio)
+ logProps.put(LogConfig.CleanupPolicyProp, kafkaConfig.logCleanupPolicy)
+ logProps.put(LogConfig.MinInSyncReplicasProp, kafkaConfig.minInSyncReplicas)
+ logProps.put(LogConfig.CompressionTypeProp, kafkaConfig.compressionType)
+ logProps.put(LogConfig.UncleanLeaderElectionEnableProp, kafkaConfig.uncleanLeaderElectionEnable)
+ logProps.put(LogConfig.PreAllocateEnableProp, kafkaConfig.logPreAllocateEnable)
+ logProps
+ }
+}
+
+
+
/**
* Represents the lifecycle of a single Kafka broker. Handles all functionality required
* to start up and shutdown a single Kafka node.
@@ -387,7 +417,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
def boundPort(): Int = socketServer.boundPort()
private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
- val defaultProps = copyKafkaConfigToLog(config.originals)
+ val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
val defaultLogConfig = LogConfig(defaultProps)
val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _))
@@ -413,39 +443,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
time = time)
}
- // Copy the subset of properties that are relevant to Logs
- // I'm listing out individual properties here since the names are slightly different in each Config class...
- private def copyKafkaConfigToLog(serverProps: java.util.Map[String, Object]): java.util.Map[String, Object] = {
-
- val logProps = new util.HashMap[String, Object]()
- val entryset = serverProps.entrySet.iterator
- while (entryset.hasNext) {
- val entry = entryset.next
- entry.getKey match {
- case KafkaConfig.LogSegmentBytesProp => logProps.put(LogConfig.SegmentBytesProp, entry.getValue)
- case KafkaConfig.LogRollTimeMillisProp => logProps.put(LogConfig.SegmentMsProp, entry.getValue)
- case KafkaConfig.LogRollTimeJitterMillisProp => logProps.put(LogConfig.SegmentJitterMsProp, entry.getValue)
- case KafkaConfig.LogIndexSizeMaxBytesProp => logProps.put(LogConfig.SegmentIndexBytesProp, entry.getValue)
- case KafkaConfig.LogFlushIntervalMessagesProp => logProps.put(LogConfig.FlushMessagesProp, entry.getValue)
- case KafkaConfig.LogFlushIntervalMsProp => logProps.put(LogConfig.FlushMsProp, entry.getValue)
- case KafkaConfig.LogRetentionBytesProp => logProps.put(LogConfig.RetentionBytesProp, entry.getValue)
- case KafkaConfig.LogRetentionTimeMillisProp => logProps.put(LogConfig.RetentionMsProp, entry.getValue)
- case KafkaConfig.MessageMaxBytesProp => logProps.put(LogConfig.MaxMessageBytesProp, entry.getValue)
- case KafkaConfig.LogIndexIntervalBytesProp => logProps.put(LogConfig.IndexIntervalBytesProp, entry.getValue)
- case KafkaConfig.LogCleanerDeleteRetentionMsProp => logProps.put(LogConfig.DeleteRetentionMsProp, entry.getValue)
- case KafkaConfig.LogDeleteDelayMsProp => logProps.put(LogConfig.FileDeleteDelayMsProp, entry.getValue)
- case KafkaConfig.LogCleanerMinCleanRatioProp => logProps.put(LogConfig.MinCleanableDirtyRatioProp, entry.getValue)
- case KafkaConfig.LogCleanupPolicyProp => logProps.put(LogConfig.CleanupPolicyProp, entry.getValue)
- case KafkaConfig.MinInSyncReplicasProp => logProps.put(LogConfig.MinInSyncReplicasProp, entry.getValue)
- case KafkaConfig.CompressionTypeProp => logProps.put(LogConfig.CompressionTypeProp, entry.getValue)
- case KafkaConfig.UncleanLeaderElectionEnableProp => logProps.put(LogConfig.UncleanLeaderElectionEnableProp, entry.getValue)
- case KafkaConfig.LogPreAllocateProp => logProps.put(LogConfig.PreAllocateEnableProp, entry.getValue)
- case _ => // we just leave those out
- }
- }
- logProps
- }
-
/**
* Generates new brokerId or reads from meta.properties based on following conditions
* <ol>
http://git-wip-us.apache.org/repos/asf/kafka/blob/503bd366/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 348b012..51cd62c 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -19,13 +19,32 @@ package kafka.log
import java.util.Properties
+import kafka.server.KafkaConfig
+import kafka.server.KafkaServer
+import kafka.utils.TestUtils
import org.apache.kafka.common.config.ConfigException
import org.junit.{Assert, Test}
+import org.junit.Assert._
import org.scalatest.Assertions._
class LogConfigTest {
@Test
+ def testKafkaConfigToProps() {
+ val millisInHour = 60L * 60L * 1000L
+ val kafkaProps = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
+ kafkaProps.put(KafkaConfig.LogRollTimeHoursProp, "2")
+ kafkaProps.put(KafkaConfig.LogRollTimeJitterHoursProp, "2")
+ kafkaProps.put(KafkaConfig.LogRetentionTimeHoursProp, "2")
+
+ val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
+ val logProps = KafkaServer.copyKafkaConfigToLog(kafkaConfig)
+ assertEquals(2 * millisInHour, logProps.get(LogConfig.SegmentMsProp))
+ assertEquals(2 * millisInHour, logProps.get(LogConfig.SegmentJitterMsProp))
+ assertEquals(2 * millisInHour, logProps.get(LogConfig.RetentionMsProp))
+ }
+
+ @Test
def testFromPropsEmpty() {
val p = new Properties()
val config = LogConfig(p)