You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/04/09 19:47:40 UTC

git commit: KAFKA-1373; Set first dirty (uncompacted) offset to first offset of the log if no checkpoint exists. Reviewed by Neha Narkhede and Timothy Chen.

Repository: kafka
Updated Branches:
  refs/heads/trunk 75d5f5bff -> 8d15de851


KAFKA-1373; Set first dirty (uncompacted) offset to first offset of the
log if no checkpoint exists. Reviewed by Neha Narkhede and Timothy Chen.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8d15de85
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8d15de85
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8d15de85

Branch: refs/heads/trunk
Commit: 8d15de85114da6012530f0dd837f131bd1e367cd
Parents: 75d5f5b
Author: Joel Koshy <jj...@gmail.com>
Authored: Tue Apr 8 14:21:46 2014 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Wed Apr 9 10:45:10 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleanerManager.scala       | 9 +++++----
 core/src/main/scala/kafka/server/KafkaConfig.scala          | 2 +-
 core/src/main/scala/kafka/server/OffsetCheckpoint.scala     | 2 +-
 core/src/main/scala/kafka/server/OffsetManager.scala        | 2 +-
 .../scala/unit/kafka/log/LogCleanerIntegrationTest.scala    | 2 +-
 5 files changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8d15de85/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 6a98134..514941c 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -75,10 +75,11 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   def grabFilthiestLog(): Option[LogToClean] = {
     inLock(lock) {
       val lastClean = allCleanerCheckpoints()
-      val dirtyLogs = logs.filter(l => l._2.config.compact)                                    // skip any logs marked for delete rather than dedupe
-                              .filterNot(l => inProgress.contains(l._1))                       // skip any logs already in-progress
-                              .map(l => LogToClean(l._1, l._2, lastClean.getOrElse(l._1, 0)))  // create a LogToClean instance for each
-                              .filter(l => l.totalBytes > 0)                                   // skip any empty logs
+      val dirtyLogs = logs.filter(l => l._2.config.compact)          // skip any logs marked for delete rather than dedupe
+                          .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress
+                          .map(l => LogToClean(l._1, l._2,           // create a LogToClean instance for each
+                                               lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset)))
+                          .filter(l => l.totalBytes > 0)             // skip any empty logs
       if(!dirtyLogs.isEmpty)
         this.dirtiestLogCleanableRatio = dirtyLogs.max.cleanableRatio
       val cleanableLogs = dirtyLogs.filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d15de85/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index b0506d4..d0bbeb6 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -116,7 +116,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the frequency in milliseconds that the log cleaner checks whether any log is eligible for deletion */
   val logCleanupIntervalMs = props.getLongInRange("log.retention.check.interval.ms", 5*60*1000, (1, Long.MaxValue))
   
-  /* the default cleanup policy for segments beyond the retention window, must be either "delete" or "dedupe" */
+  /* the default cleanup policy for segments beyond the retention window, must be either "delete" or "compact" */
   val logCleanupPolicy = props.getString("log.cleanup.policy", "delete")
   
   /* the number of background threads to use for log cleaning */

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d15de85/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
index 19f61a9..7af2f43 100644
--- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
@@ -90,7 +90,7 @@ class OffsetCheckpoint(val file: File) extends Logging {
               val topic = pieces(0)
               val partition = pieces(1).toInt
               val offset = pieces(2).toLong
-              offsets += (TopicAndPartition(pieces(0), partition) -> offset)
+              offsets += (TopicAndPartition(topic, partition) -> offset)
               line = reader.readLine()
             }
             if(offsets.size != expectedSize)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d15de85/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 89a88a7..5417628 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -159,7 +159,7 @@ class OffsetManager(val config: OffsetManagerConfig,
   def offsetsTopicConfig: Properties = {
     val props = new Properties
     props.put(LogConfig.SegmentBytesProp, config.offsetsTopicSegmentBytes.toString)
-    props.put(LogConfig.CleanupPolicyProp, "dedupe")
+    props.put(LogConfig.CleanupPolicyProp, "compact")
     props
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d15de85/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 9aeb69d..5bfa764 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -92,7 +92,7 @@ class LogCleanerIntegrationTest extends JUnitSuite {
   def makeCleaner(parts: Int, 
                   minDirtyMessages: Int = 0, 
                   numThreads: Int = 1,
-                  defaultPolicy: String = "dedupe",
+                  defaultPolicy: String = "compact",
                   policyOverrides: Map[String, String] = Map()): LogCleaner = {
     
     // create partitions and add them to the pool