You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/07/10 17:27:40 UTC
git commit: kafka-1325; Fix inconsistent per topic log configs;
patched by Manikumar Reddy; reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk cd3ce27d4 -> 420628d69
kafka-1325; Fix inconsistent per topic log configs; patched by Manikumar Reddy; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/420628d6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/420628d6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/420628d6
Branch: refs/heads/trunk
Commit: 420628d695cc675711b94af5cfd14653147bf7f7
Parents: cd3ce27
Author: Manikumar Reddy <ma...@gmail.com>
Authored: Thu Jul 10 08:27:33 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Jul 10 08:27:33 2014 -0700
----------------------------------------------------------------------
.../main/scala/kafka/server/KafkaConfig.scala | 23 +++++++--
.../main/scala/kafka/server/KafkaServer.scala | 2 +-
.../unit/kafka/server/KafkaConfigTest.scala | 53 ++++++++++++++++++++
3 files changed, 73 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/420628d6/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index ef75b67..bb2e654 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -35,14 +35,29 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
private def getLogRetentionTimeMillis(): Long = {
val millisInMinute = 60L * 1000L
val millisInHour = 60L * millisInMinute
- if(props.containsKey("log.retention.minutes")){
+
+ if(props.containsKey("log.retention.ms")){
+ props.getIntInRange("log.retention.ms", (1, Int.MaxValue))
+ }
+ else if(props.containsKey("log.retention.minutes")){
millisInMinute * props.getIntInRange("log.retention.minutes", (1, Int.MaxValue))
- } else {
+ }
+ else {
millisInHour * props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
}
-
}
+ private def getLogRollTimeMillis(): Long = {
+ val millisInHour = 60L * 60L * 1000L
+
+ if(props.containsKey("log.roll.ms")){
+ props.getIntInRange("log.roll.ms", (1, Int.MaxValue))
+ }
+ else {
+ millisInHour * props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue))
+ }
+ }
+
/*********** General Configuration ***********/
/* the broker id for this server */
@@ -105,7 +120,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
val logSegmentBytes = props.getIntInRange("log.segment.bytes", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
/* the maximum time before a new log segment is rolled out */
- val logRollHours = props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue))
+ val logRollTimeMillis = getLogRollTimeMillis
/* the number of hours to keep a log file before deleting it */
val logRetentionTimeMillis = getLogRetentionTimeMillis
http://git-wip-us.apache.org/repos/asf/kafka/blob/420628d6/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index c22e51e..5a56f57 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -275,7 +275,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes,
- segmentMs = 60L * 60L * 1000L * config.logRollHours,
+ segmentMs = config.logRollTimeMillis,
flushInterval = config.logFlushIntervalMessages,
flushMs = config.logFlushIntervalMs.toLong,
retentionSize = config.logRetentionBytes,
http://git-wip-us.apache.org/repos/asf/kafka/blob/420628d6/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 6f4809d..2377abe 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -45,6 +45,16 @@ class KafkaConfigTest extends JUnit3Suite {
}
@Test
+ def testLogRetentionTimeMsProvided() {
+ val props = TestUtils.createBrokerConfig(0, 8181)
+ props.put("log.retention.ms", "1800000")
+
+ val cfg = new KafkaConfig(props)
+ assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis)
+
+ }
+
+ @Test
def testLogRetentionTimeNoConfigProvided() {
val props = TestUtils.createBrokerConfig(0, 8181)
@@ -63,6 +73,17 @@ class KafkaConfigTest extends JUnit3Suite {
assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis)
}
+
+ @Test
+ def testLogRetentionTimeBothMinutesAndMsProvided() {
+ val props = TestUtils.createBrokerConfig(0, 8181)
+ props.put("log.retention.ms", "1800000")
+ props.put("log.retention.minutes", "10")
+
+ val cfg = new KafkaConfig(props)
+ assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis)
+
+ }
@Test
def testAdvertiseDefaults() {
@@ -129,4 +150,36 @@ class KafkaConfigTest extends JUnit3Suite {
new KafkaConfig(props)
}
}
+
+ @Test
+ def testLogRollTimeMsProvided() {
+ val props = TestUtils.createBrokerConfig(0, 8181)
+ props.put("log.roll.ms", "1800000")
+
+ val cfg = new KafkaConfig(props)
+ assertEquals(30 * 60L * 1000L, cfg.logRollTimeMillis)
+
+ }
+
+ @Test
+ def testLogRollTimeBothMsAndHoursProvided() {
+ val props = TestUtils.createBrokerConfig(0, 8181)
+ props.put("log.roll.ms", "1800000")
+ props.put("log.roll.hours", "1")
+
+ val cfg = new KafkaConfig(props)
+ assertEquals( 30 * 60L * 1000L, cfg.logRollTimeMillis)
+
+ }
+
+ @Test
+ def testLogRollTimeNoConfigProvided() {
+ val props = TestUtils.createBrokerConfig(0, 8181)
+
+ val cfg = new KafkaConfig(props)
+ assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRollTimeMillis )
+
+ }
+
+
}