You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/08/18 22:03:24 UTC

kafka git commit: KAFKA-2436; log.retention.hours should be honored by LogManager

Repository: kafka
Updated Branches:
  refs/heads/trunk 786867c2e -> 503bd3664


KAFKA-2436; log.retention.hours should be honored by LogManager

Author: Dong Lin <li...@gmail.com>

Reviewers: Joel Koshy, Gwen Shapira

Closes #142 from lindong28/KAFKA-2436


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/503bd366
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/503bd366
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/503bd366

Branch: refs/heads/trunk
Commit: 503bd36647695e8cc91893ffb80346dd03eb0bc5
Parents: 786867c
Author: Dong Lin <li...@gmail.com>
Authored: Tue Aug 18 13:03:11 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Aug 18 13:03:11 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogConfig.scala   |  5 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |  8 +--
 .../main/scala/kafka/server/KafkaServer.scala   | 65 ++++++++++----------
 .../scala/unit/kafka/log/LogConfigTest.scala    | 19 ++++++
 4 files changed, 58 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/503bd366/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index c969d16..7fc7e33 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -47,7 +47,10 @@ object Defaults {
 }
 
 case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props, false) {
-
+  /**
+   * Important note: Any configuration parameter that is passed along from KafkaConfig to LogConfig
+   * should also go in copyKafkaConfigToLog.
+   */
   val segmentSize = getInt(LogConfig.SegmentBytesProp)
   val segmentMs = getLong(LogConfig.SegmentMsProp)
   val segmentJitterMs = getLong(LogConfig.SegmentJitterMsProp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/503bd366/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 394f21b..c39402c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -335,7 +335,7 @@ object KafkaConfig {
   val LogRetentionTimeHoursDoc = "The number of hours to keep a log file before deleting it (in hours), tertiary to " + LogRetentionTimeMillisProp + " property"
 
   val LogRetentionBytesDoc = "The maximum size of the log before deleting it"
-  val LogCleanupIntervalMsDoc = "The frequency in minutes that the log cleaner checks whether any log is eligible for deletion"
+  val LogCleanupIntervalMsDoc = "The frequency in milliseconds that the log cleaner checks whether any log is eligible for deletion"
   val LogCleanupPolicyDoc = "The default cleanup policy for segments beyond the retention window, must be either \"delete\" or \"compact\""
   val LogCleanerThreadsDoc = "The number of background threads to use for log cleaning"
   val LogCleanerIoMaxBytesPerSecondDoc = "The log cleaner will be throttled so that the sum of its read and write i/o will be less than this value on average"
@@ -652,8 +652,9 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
   val logRollTimeMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeHoursProp))
   val logRollTimeJitterMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeJitterMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeJitterHoursProp))
   val logFlushIntervalMs: java.lang.Long = Option(getLong(KafkaConfig.LogFlushIntervalMsProp)).getOrElse(getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp))
+  val logRetentionTimeMillis = getLogRetentionTimeMillis
   val minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp)
-  val logPreAllocateEnable: Boolean = getBoolean(KafkaConfig.LogPreAllocateProp)
+  val logPreAllocateEnable: java.lang.Boolean = getBoolean(KafkaConfig.LogPreAllocateProp)
 
   /** ********* Replication configuration ***********/
   val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
@@ -672,7 +673,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
   val autoLeaderRebalanceEnable = getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp)
   val leaderImbalancePerBrokerPercentage = getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp)
   val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)
-  val uncleanLeaderElectionEnable = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
+  val uncleanLeaderElectionEnable: java.lang.Boolean = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
   val interBrokerSecurityProtocol = SecurityProtocol.valueOf(getString(KafkaConfig.InterBrokerSecurityProtocolProp))
   val interBrokerProtocolVersion = ApiVersion(getString(KafkaConfig.InterBrokerProtocolVersionProp))
 
@@ -713,7 +714,6 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
 
   val listeners = getListeners
   val advertisedListeners = getAdvertisedListeners
-  val logRetentionTimeMillis = getLogRetentionTimeMillis
 
   private def getLogRetentionTimeMillis: Long = {
     val millisInMinute = 60L * 1000L

http://git-wip-us.apache.org/repos/asf/kafka/blob/503bd366/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 6d65507..a0e3fdf 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -43,6 +43,36 @@ import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import kafka.coordinator.{GroupManagerConfig, ConsumerCoordinator}
 
+
+object KafkaServer {
+  // Copy the subset of properties that are relevant to Logs
+  // I'm listing out individual properties here since the names are slightly different in each Config class...
+  def copyKafkaConfigToLog(kafkaConfig: KafkaConfig): java.util.Map[String, Object] = {
+    val logProps = new util.HashMap[String, Object]()
+    logProps.put(LogConfig.SegmentBytesProp, kafkaConfig.logSegmentBytes)
+    logProps.put(LogConfig.SegmentMsProp, kafkaConfig.logRollTimeMillis)
+    logProps.put(LogConfig.SegmentJitterMsProp, kafkaConfig.logRollTimeJitterMillis)
+    logProps.put(LogConfig.SegmentIndexBytesProp, kafkaConfig.logIndexSizeMaxBytes)
+    logProps.put(LogConfig.FlushMessagesProp, kafkaConfig.logFlushIntervalMessages)
+    logProps.put(LogConfig.FlushMsProp, kafkaConfig.logFlushIntervalMs)
+    logProps.put(LogConfig.RetentionBytesProp, kafkaConfig.logRetentionBytes)
+    logProps.put(LogConfig.RetentionMsProp, kafkaConfig.logRetentionTimeMillis: java.lang.Long)
+    logProps.put(LogConfig.MaxMessageBytesProp, kafkaConfig.messageMaxBytes)
+    logProps.put(LogConfig.IndexIntervalBytesProp, kafkaConfig.logIndexIntervalBytes)
+    logProps.put(LogConfig.DeleteRetentionMsProp, kafkaConfig.logCleanerDeleteRetentionMs)
+    logProps.put(LogConfig.FileDeleteDelayMsProp, kafkaConfig.logDeleteDelayMs)
+    logProps.put(LogConfig.MinCleanableDirtyRatioProp, kafkaConfig.logCleanerMinCleanRatio)
+    logProps.put(LogConfig.CleanupPolicyProp, kafkaConfig.logCleanupPolicy)
+    logProps.put(LogConfig.MinInSyncReplicasProp, kafkaConfig.minInSyncReplicas)
+    logProps.put(LogConfig.CompressionTypeProp, kafkaConfig.compressionType)
+    logProps.put(LogConfig.UncleanLeaderElectionEnableProp, kafkaConfig.uncleanLeaderElectionEnable)
+    logProps.put(LogConfig.PreAllocateEnableProp, kafkaConfig.logPreAllocateEnable)
+    logProps
+  }
+}
+
+
+
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
  * to start up and shutdown a single Kafka node.
@@ -387,7 +417,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   def boundPort(): Int = socketServer.boundPort()
 
   private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
-    val defaultProps = copyKafkaConfigToLog(config.originals)
+    val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
     val defaultLogConfig = LogConfig(defaultProps)
 
     val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _))
@@ -413,39 +443,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                    time = time)
   }
 
-  // Copy the subset of properties that are relevant to Logs
-  // I'm listing out individual properties here since the names are slightly different in each Config class...
-  private def copyKafkaConfigToLog(serverProps: java.util.Map[String, Object]): java.util.Map[String, Object] = {
-
-    val logProps = new util.HashMap[String, Object]()
-    val entryset = serverProps.entrySet.iterator
-    while (entryset.hasNext) {
-      val entry = entryset.next
-      entry.getKey match {
-        case KafkaConfig.LogSegmentBytesProp => logProps.put(LogConfig.SegmentBytesProp, entry.getValue)
-        case KafkaConfig.LogRollTimeMillisProp => logProps.put(LogConfig.SegmentMsProp, entry.getValue)
-        case KafkaConfig.LogRollTimeJitterMillisProp => logProps.put(LogConfig.SegmentJitterMsProp, entry.getValue)
-        case KafkaConfig.LogIndexSizeMaxBytesProp => logProps.put(LogConfig.SegmentIndexBytesProp, entry.getValue)
-        case KafkaConfig.LogFlushIntervalMessagesProp => logProps.put(LogConfig.FlushMessagesProp, entry.getValue)
-        case KafkaConfig.LogFlushIntervalMsProp => logProps.put(LogConfig.FlushMsProp, entry.getValue)
-        case KafkaConfig.LogRetentionBytesProp => logProps.put(LogConfig.RetentionBytesProp, entry.getValue)
-        case KafkaConfig.LogRetentionTimeMillisProp => logProps.put(LogConfig.RetentionMsProp, entry.getValue)
-        case KafkaConfig.MessageMaxBytesProp => logProps.put(LogConfig.MaxMessageBytesProp, entry.getValue)
-        case KafkaConfig.LogIndexIntervalBytesProp => logProps.put(LogConfig.IndexIntervalBytesProp, entry.getValue)
-        case KafkaConfig.LogCleanerDeleteRetentionMsProp => logProps.put(LogConfig.DeleteRetentionMsProp, entry.getValue)
-        case KafkaConfig.LogDeleteDelayMsProp => logProps.put(LogConfig.FileDeleteDelayMsProp, entry.getValue)
-        case KafkaConfig.LogCleanerMinCleanRatioProp => logProps.put(LogConfig.MinCleanableDirtyRatioProp, entry.getValue)
-        case KafkaConfig.LogCleanupPolicyProp => logProps.put(LogConfig.CleanupPolicyProp, entry.getValue)
-        case KafkaConfig.MinInSyncReplicasProp => logProps.put(LogConfig.MinInSyncReplicasProp, entry.getValue)
-        case KafkaConfig.CompressionTypeProp => logProps.put(LogConfig.CompressionTypeProp, entry.getValue)
-        case KafkaConfig.UncleanLeaderElectionEnableProp => logProps.put(LogConfig.UncleanLeaderElectionEnableProp, entry.getValue)
-        case KafkaConfig.LogPreAllocateProp => logProps.put(LogConfig.PreAllocateEnableProp, entry.getValue)
-        case _ => // we just leave those out
-      }
-    }
-    logProps
-  }
-
   /**
     * Generates new brokerId or reads from meta.properties based on following conditions
     * <ol>

http://git-wip-us.apache.org/repos/asf/kafka/blob/503bd366/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 348b012..51cd62c 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -19,13 +19,32 @@ package kafka.log
 
 import java.util.Properties
 
+import kafka.server.KafkaConfig
+import kafka.server.KafkaServer
+import kafka.utils.TestUtils
 import org.apache.kafka.common.config.ConfigException
 import org.junit.{Assert, Test}
+import org.junit.Assert._
 import org.scalatest.Assertions._
 
 class LogConfigTest {
 
   @Test
+  def testKafkaConfigToProps() {
+    val millisInHour = 60L * 60L * 1000L
+    val kafkaProps = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
+    kafkaProps.put(KafkaConfig.LogRollTimeHoursProp, "2")
+    kafkaProps.put(KafkaConfig.LogRollTimeJitterHoursProp, "2")
+    kafkaProps.put(KafkaConfig.LogRetentionTimeHoursProp, "2")
+
+    val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
+    val logProps = KafkaServer.copyKafkaConfigToLog(kafkaConfig)
+    assertEquals(2 * millisInHour, logProps.get(LogConfig.SegmentMsProp))
+    assertEquals(2 * millisInHour, logProps.get(LogConfig.SegmentJitterMsProp))
+    assertEquals(2 * millisInHour, logProps.get(LogConfig.RetentionMsProp))
+  }
+
+  @Test
   def testFromPropsEmpty() {
     val p = new Properties()
     val config = LogConfig(p)