You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/04/09 19:47:40 UTC
git commit: KAFKA-1373; Set first dirty (uncompacted) offset to first offset of the
log if no checkpoint exists. Reviewed by Neha Narkhede and Timothy Chen.
Repository: kafka
Updated Branches:
refs/heads/trunk 75d5f5bff -> 8d15de851
KAFKA-1373; Set first dirty (uncompacted) offset to first offset of the
log if no checkpoint exists. Reviewed by Neha Narkhede and Timothy Chen.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8d15de85
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8d15de85
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8d15de85
Branch: refs/heads/trunk
Commit: 8d15de85114da6012530f0dd837f131bd1e367cd
Parents: 75d5f5b
Author: Joel Koshy <jj...@gmail.com>
Authored: Tue Apr 8 14:21:46 2014 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Wed Apr 9 10:45:10 2014 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/log/LogCleanerManager.scala | 9 +++++----
core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +-
core/src/main/scala/kafka/server/OffsetCheckpoint.scala | 2 +-
core/src/main/scala/kafka/server/OffsetManager.scala | 2 +-
.../scala/unit/kafka/log/LogCleanerIntegrationTest.scala | 2 +-
5 files changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d15de85/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 6a98134..514941c 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -75,10 +75,11 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
def grabFilthiestLog(): Option[LogToClean] = {
inLock(lock) {
val lastClean = allCleanerCheckpoints()
- val dirtyLogs = logs.filter(l => l._2.config.compact) // skip any logs marked for delete rather than dedupe
- .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress
- .map(l => LogToClean(l._1, l._2, lastClean.getOrElse(l._1, 0))) // create a LogToClean instance for each
- .filter(l => l.totalBytes > 0) // skip any empty logs
+ val dirtyLogs = logs.filter(l => l._2.config.compact) // skip any logs marked for delete rather than dedupe
+ .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress
+ .map(l => LogToClean(l._1, l._2, // create a LogToClean instance for each
+ lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset)))
+ .filter(l => l.totalBytes > 0) // skip any empty logs
if(!dirtyLogs.isEmpty)
this.dirtiestLogCleanableRatio = dirtyLogs.max.cleanableRatio
val cleanableLogs = dirtyLogs.filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d15de85/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index b0506d4..d0bbeb6 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -116,7 +116,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
val logCleanupIntervalMs = props.getLongInRange("log.retention.check.interval.ms", 5*60*1000, (1, Long.MaxValue))
- /* the default cleanup policy for segments beyond the retention window, must be either "delete" or "dedupe" */
+ /* the default cleanup policy for segments beyond the retention window, must be either "delete" or "compact" */
val logCleanupPolicy = props.getString("log.cleanup.policy", "delete")
/* the number of background threads to use for log cleaning */
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d15de85/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
index 19f61a9..7af2f43 100644
--- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
@@ -90,7 +90,7 @@ class OffsetCheckpoint(val file: File) extends Logging {
val topic = pieces(0)
val partition = pieces(1).toInt
val offset = pieces(2).toLong
- offsets += (TopicAndPartition(pieces(0), partition) -> offset)
+ offsets += (TopicAndPartition(topic, partition) -> offset)
line = reader.readLine()
}
if(offsets.size != expectedSize)
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d15de85/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 89a88a7..5417628 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -159,7 +159,7 @@ class OffsetManager(val config: OffsetManagerConfig,
def offsetsTopicConfig: Properties = {
val props = new Properties
props.put(LogConfig.SegmentBytesProp, config.offsetsTopicSegmentBytes.toString)
- props.put(LogConfig.CleanupPolicyProp, "dedupe")
+ props.put(LogConfig.CleanupPolicyProp, "compact")
props
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d15de85/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 9aeb69d..5bfa764 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -92,7 +92,7 @@ class LogCleanerIntegrationTest extends JUnitSuite {
def makeCleaner(parts: Int,
minDirtyMessages: Int = 0,
numThreads: Int = 1,
- defaultPolicy: String = "dedupe",
+ defaultPolicy: String = "compact",
policyOverrides: Map[String, String] = Map()): LogCleaner = {
// create partitions and add them to the pool