You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/06 01:00:07 UTC

[10/37] git commit: kafka-1325; Fix inconsistent per topic log configs; patched by Manikumar Reddy; reviewed by Jun Rao

kafka-1325; Fix inconsistent per topic log configs; patched by Manikumar Reddy; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/420628d6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/420628d6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/420628d6

Branch: refs/heads/transactional_messaging
Commit: 420628d695cc675711b94af5cfd14653147bf7f7
Parents: cd3ce27
Author: Manikumar Reddy <ma...@gmail.com>
Authored: Thu Jul 10 08:27:33 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Jul 10 08:27:33 2014 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/server/KafkaConfig.scala   | 23 +++++++--
 .../main/scala/kafka/server/KafkaServer.scala   |  2 +-
 .../unit/kafka/server/KafkaConfigTest.scala     | 53 ++++++++++++++++++++
 3 files changed, 73 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/420628d6/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index ef75b67..bb2e654 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -35,14 +35,29 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   private def getLogRetentionTimeMillis(): Long = {
     val millisInMinute = 60L * 1000L
     val millisInHour = 60L * millisInMinute
-    if(props.containsKey("log.retention.minutes")){
+    
+    if(props.containsKey("log.retention.ms")){
+       props.getIntInRange("log.retention.ms", (1, Int.MaxValue))
+    }
+    else if(props.containsKey("log.retention.minutes")){
        millisInMinute * props.getIntInRange("log.retention.minutes", (1, Int.MaxValue))
-    } else {
+    } 
+    else {
        millisInHour * props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
     }
-
   }
 
+  private def getLogRollTimeMillis(): Long = {
+    val millisInHour = 60L * 60L * 1000L
+    
+    if(props.containsKey("log.roll.ms")){
+       props.getIntInRange("log.roll.ms", (1, Int.MaxValue))
+    }
+    else {
+       millisInHour * props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue))
+    }
+  }
+  
   /*********** General Configuration ***********/
 
   /* the broker id for this server */
@@ -105,7 +120,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val logSegmentBytes = props.getIntInRange("log.segment.bytes", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
 
   /* the maximum time before a new log segment is rolled out */
-  val logRollHours = props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue))
+  val logRollTimeMillis = getLogRollTimeMillis
 
   /* the number of hours to keep a log file before deleting it */
   val logRetentionTimeMillis = getLogRetentionTimeMillis

http://git-wip-us.apache.org/repos/asf/kafka/blob/420628d6/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index c22e51e..5a56f57 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -275,7 +275,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   
   private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
     val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes, 
-                                     segmentMs = 60L * 60L * 1000L * config.logRollHours,
+                                     segmentMs = config.logRollTimeMillis,
                                      flushInterval = config.logFlushIntervalMessages,
                                      flushMs = config.logFlushIntervalMs.toLong,
                                      retentionSize = config.logRetentionBytes,

http://git-wip-us.apache.org/repos/asf/kafka/blob/420628d6/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 6f4809d..2377abe 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -45,6 +45,16 @@ class KafkaConfigTest extends JUnit3Suite {
   }
   
   @Test
+  def testLogRetentionTimeMsProvided() {
+    val props = TestUtils.createBrokerConfig(0, 8181)
+    props.put("log.retention.ms", "1800000")
+
+    val cfg = new KafkaConfig(props)
+    assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis)
+
+  }
+  
+  @Test
   def testLogRetentionTimeNoConfigProvided() {
     val props = TestUtils.createBrokerConfig(0, 8181)
 
@@ -63,6 +73,17 @@ class KafkaConfigTest extends JUnit3Suite {
     assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis)
 
   }
+  
+  @Test
+  def testLogRetentionTimeBothMinutesAndMsProvided() {
+    val props = TestUtils.createBrokerConfig(0, 8181)
+    props.put("log.retention.ms", "1800000")
+    props.put("log.retention.minutes", "10")
+
+    val cfg = new KafkaConfig(props)
+    assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis)
+
+  }
 
   @Test
   def testAdvertiseDefaults() {
@@ -129,4 +150,36 @@ class KafkaConfigTest extends JUnit3Suite {
       new KafkaConfig(props)
     }
   }
+  
+  @Test
+  def testLogRollTimeMsProvided() {
+    val props = TestUtils.createBrokerConfig(0, 8181)
+    props.put("log.roll.ms", "1800000")
+
+    val cfg = new KafkaConfig(props)
+    assertEquals(30 * 60L * 1000L, cfg.logRollTimeMillis)
+
+  }
+  
+  @Test
+  def testLogRollTimeBothMsAndHoursProvided() {
+    val props = TestUtils.createBrokerConfig(0, 8181)
+    props.put("log.roll.ms", "1800000")
+    props.put("log.roll.hours", "1")
+
+    val cfg = new KafkaConfig(props)
+    assertEquals( 30 * 60L * 1000L, cfg.logRollTimeMillis)
+
+  }
+    
+  @Test
+  def testLogRollTimeNoConfigProvided() {
+    val props = TestUtils.createBrokerConfig(0, 8181)
+
+    val cfg = new KafkaConfig(props)
+    assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRollTimeMillis																									)
+
+  }
+  
+
 }