Posted to commits@kafka.apache.org by jj...@apache.org on 2019/05/13 16:16:27 UTC
[kafka] branch trunk updated: KAFKA-7321: Add a Maximum Log Compaction Lag (KIP-354) (#6009)
This is an automated email from the ASF dual-hosted git repository.
jjkoshy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1fdc853 KAFKA-7321: Add a Maximum Log Compaction Lag (KIP-354) (#6009)
1fdc853 is described below
commit 1fdc8533016e948b1d534145978252209d7612ed
Author: Xiongqi Wu <xi...@linkedin.com>
AuthorDate: Mon May 13 09:16:12 2019 -0700
KAFKA-7321: Add a Maximum Log Compaction Lag (KIP-354) (#6009)
KAFKA-7321: Add a Maximum Log Compaction Lag (KIP-354)
Records become eligible for compaction once the configured max.compaction.lag.ms has elapsed since they were written.
Author: Xiongqi Wu <xi...@linkedin.com>
Reviewer: Joel Koshy <jj...@gmail.com>
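For readers trying the feature out, here is a minimal, hypothetical sketch (not part of this patch) of creating a compacted topic with the new setting through the AdminClient API; the topic name and bootstrap address are placeholders.

    import java.util.{Collections, Properties}
    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
    import org.apache.kafka.common.config.TopicConfig

    object MaxCompactionLagExample {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "localhost:9092") // placeholder broker address
        val admin = AdminClient.create(props)
        try {
          // compacted topic whose records must become eligible for compaction
          // within 7 days (604800000 ms) of being written
          val topic = new NewTopic("compacted-example", 1, 1.toShort).configs(Map(
            TopicConfig.CLEANUP_POLICY_CONFIG -> TopicConfig.CLEANUP_POLICY_COMPACT,
            TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG -> (7L * 24 * 60 * 60 * 1000).toString
          ).asJava)
          admin.createTopics(Collections.singletonList(topic)).all().get()
        } finally admin.close()
      }
    }

The same key can also be set per topic with the kafka-configs tool, or broker-wide via log.cleaner.max.compaction.lag.ms (see the KafkaConfig.scala changes below).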
---
.../apache/kafka/common/config/TopicConfig.java | 4 +
core/src/main/scala/kafka/log/Log.scala | 16 +++-
core/src/main/scala/kafka/log/LogCleaner.scala | 41 +++++++++-
.../main/scala/kafka/log/LogCleanerManager.scala | 51 +++++++++---
core/src/main/scala/kafka/log/LogConfig.scala | 24 +++++-
core/src/main/scala/kafka/log/LogManager.scala | 2 +
core/src/main/scala/kafka/log/LogSegment.scala | 34 ++++++--
core/src/main/scala/kafka/server/KafkaConfig.scala | 5 ++
core/src/main/scala/kafka/server/KafkaServer.scala | 1 +
.../log/AbstractLogCleanerIntegrationTest.scala | 19 +++--
.../unit/kafka/log/LogCleanerIntegrationTest.scala | 94 +++++++++++++++++++++-
.../kafka/log/LogCleanerLagIntegrationTest.scala | 10 +--
.../test/scala/unit/kafka/log/LogConfigTest.scala | 20 +++--
.../scala/unit/kafka/server/KafkaConfigTest.scala | 1 +
14 files changed, 281 insertions(+), 41 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index 57662d5..b1be6c8 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -113,6 +113,10 @@ public class TopicConfig {
"higher ratio will mean fewer, more efficient cleanings but will mean more wasted " +
"space in the log.";
+ public static final String MAX_COMPACTION_LAG_MS_CONFIG = "max.compaction.lag.ms";
+ public static final String MAX_COMPACTION_LAG_MS_DOC = "The maximum time a message will remain " +
+ "ineligible for compaction in the log. Only applicable for logs that are being compacted.";
+
public static final String CLEANUP_POLICY_CONFIG = "cleanup.policy";
public static final String CLEANUP_POLICY_COMPACT = "compact";
public static final String CLEANUP_POLICY_DELETE = "delete";
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 149a4f0..ef786be 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -159,7 +159,7 @@ case class RollParams(maxSegmentMs: Long,
object RollParams {
def apply(config: LogConfig, appendInfo: LogAppendInfo, messagesSize: Int, now: Long): RollParams = {
- new RollParams(config.segmentMs,
+ new RollParams(config.maxSegmentMs,
config.segmentSize,
appendInfo.maxTimestamp,
appendInfo.lastOffset,
@@ -2028,6 +2028,20 @@ class Log(@volatile var dir: File,
}
/**
+ * This function does not acquire Log.lock. The caller has to make sure log segments don't get deleted during
+ * this call, and must also protect against calling this function on the same segment in parallel.
+ *
+ * Currently, it is used by LogCleaner threads, on the non-active segments of compacted logs only, while holding
+ * LogCleanerManager's lock to ensure that no other log-cleaner thread or the retention thread works on the same segment.
+ */
+ private[log] def getFirstBatchTimestampForSegments(segments: Iterable[LogSegment]): Iterable[Long] = {
+ segments.map {
+ segment =>
+ segment.getFirstBatchTimestamp()
+ }
+ }
+
+ /**
* remove deleted log metrics
*/
private[log] def removeLogMetrics(): Unit = {
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 9c8010c..b972388 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -132,6 +132,12 @@ class LogCleaner(initialConfig: CleanerConfig,
new Gauge[Int] {
def value: Int = cleaners.map(_.lastStats).map(_.elapsedSecs).max.toInt
})
+ // a metric to track the delay between the time when a log is required to be compacted,
+ // as determined by the max compaction lag, and the time of the last cleaner run.
+ newGauge("max-compaction-delay-secs",
+ new Gauge[Int] {
+ def value: Int = Math.max(0, (cleaners.map(_.lastPreCleanStats).map(_.maxCompactionDelayMs).max / 1000).toInt)
+ })
/**
* Start the background cleaning
@@ -285,6 +291,7 @@ class LogCleaner(initialConfig: CleanerConfig,
checkDone = checkDone)
@volatile var lastStats: CleanerStats = new CleanerStats()
+ @volatile var lastPreCleanStats: PreCleanStats = new PreCleanStats()
private def checkDone(topicPartition: TopicPartition) {
if (!isRunning)
@@ -310,10 +317,12 @@ class LogCleaner(initialConfig: CleanerConfig,
var currentLog: Option[Log] = None
try {
- val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match {
+ val preCleanStats = new PreCleanStats()
+ val cleaned = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats) match {
case None =>
false
case Some(cleanable) =>
+ this.lastPreCleanStats = preCleanStats
// there's a log, clean it
currentLog = Some(cleanable.log)
cleanLog(cleanable)
@@ -386,6 +395,9 @@ class LogCleaner(initialConfig: CleanerConfig,
"\t%.1f%% size reduction (%.1f%% fewer messages)%n".format(100.0 * (1.0 - stats.bytesWritten.toDouble/stats.bytesRead),
100.0 * (1.0 - stats.messagesWritten.toDouble/stats.messagesRead))
info(message)
+ if (lastPreCleanStats.delayedPartitions > 0) {
+ info("\tCleanable partitions: %d, Delayed partitions: %d, max delay: %d".format(lastPreCleanStats.cleanablePartitions, lastPreCleanStats.delayedPartitions, lastPreCleanStats.maxCompactionDelayMs))
+ }
if (stats.invalidMessagesRead > 0) {
warn("\tFound %d invalid messages during compaction.".format(stats.invalidMessagesRead))
}
@@ -947,6 +959,25 @@ private[log] class Cleaner(val id: Int,
}
/**
+ * A simple struct for collecting pre-clean stats
+ */
+private class PreCleanStats() {
+ var maxCompactionDelayMs = 0L
+ var delayedPartitions = 0
+ var cleanablePartitions = 0
+
+ def updateMaxCompactionDelay(delayMs: Long): Unit = {
+ maxCompactionDelayMs = Math.max(maxCompactionDelayMs, delayMs)
+ if (delayMs > 0) {
+ delayedPartitions += 1
+ }
+ }
+ def recordCleanablePartitions(numOfCleanables: Int): Unit = {
+ cleanablePartitions = numOfCleanables
+ }
+}
+
+/**
* A simple struct for collecting stats about log cleaning
*/
private class CleanerStats(time: Time = Time.SYSTEM) {
@@ -999,9 +1030,11 @@ private class CleanerStats(time: Time = Time.SYSTEM) {
}
/**
- * Helper class for a log, its topic/partition, the first cleanable position, and the first uncleanable dirty position
- */
-private case class LogToClean(topicPartition: TopicPartition, log: Log, firstDirtyOffset: Long, uncleanableOffset: Long) extends Ordered[LogToClean] {
+ * Helper class for a log, its topic/partition, the first cleanable position, the first uncleanable dirty position,
+ * and whether it needs compaction immediately.
+ */
+private case class LogToClean(topicPartition: TopicPartition, log: Log, firstDirtyOffset: Long,
+ uncleanableOffset: Long, needCompactionNow: Boolean = false) extends Ordered[LogToClean] {
val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size.toLong).sum
val (firstUncleanableOffset, cleanableBytes) = LogCleaner.calculateCleanableBytes(log, firstDirtyOffset, uncleanableOffset)
val totalBytes = cleanBytes + cleanableBytes
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index e4559b8..f8dce22 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -165,7 +165,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
* each time from the full set of logs to allow logs to be dynamically added to the pool of logs
* the log manager maintains.
*/
- def grabFilthiestCompactedLog(time: Time): Option[LogToClean] = {
+ def grabFilthiestCompactedLog(time: Time, preCleanStats: PreCleanStats = new PreCleanStats()): Option[LogToClean] = {
inLock(lock) {
val now = time.milliseconds
this.timeOfLastRun = now
@@ -178,17 +178,24 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
inProgress.contains(topicPartition) || isUncleanablePartition(log, topicPartition)
}.map {
case (topicPartition, log) => // create a LogToClean instance for each
- val (firstDirtyOffset, firstUncleanableDirtyOffset) = LogCleanerManager.cleanableOffsets(log, topicPartition,
- lastClean, now)
- LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset)
+ val (firstDirtyOffset, firstUncleanableDirtyOffset) =
+ LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, now)
+
+ val compactionDelayMs = LogCleanerManager.getMaxCompactionDelay(log, firstDirtyOffset, now)
+ preCleanStats.updateMaxCompactionDelay(compactionDelayMs)
+
+ LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset, compactionDelayMs > 0)
}.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0
- // and must meet the minimum threshold for dirty byte ratio
- val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio)
+ // and must meet the minimum threshold for dirty byte ratio or have some bytes required to be compacted
+ val cleanableLogs = dirtyLogs.filter { ltc =>
+ (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
+ }
if(cleanableLogs.isEmpty) {
None
} else {
+ preCleanStats.recordCleanablePartitions(cleanableLogs.size)
val filthiest = cleanableLogs.max
inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
Some(filthiest)
@@ -476,6 +483,30 @@ private[log] object LogCleanerManager extends Logging {
log.config.compact && log.config.delete
}
+ /**
+ * Get the maximum delay between the time when a log is required to be compacted,
+ * as determined by maxCompactionLagMs, and the current time.
+ */
+ def getMaxCompactionDelay(log: Log, firstDirtyOffset: Long, now: Long) : Long = {
+
+ val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset)
+
+ val firstBatchTimestamps = log.getFirstBatchTimestampForSegments(dirtyNonActiveSegments).filter(_ > 0)
+
+ val earliestDirtySegmentTimestamp = {
+ if (firstBatchTimestamps.nonEmpty)
+ firstBatchTimestamps.min
+ else Long.MaxValue
+ }
+
+ val maxCompactionLagMs = math.max(log.config.maxCompactionLagMs, 0L)
+ val cleanUntilTime = now - maxCompactionLagMs
+
+ if (earliestDirtySegmentTimestamp < cleanUntilTime)
+ cleanUntilTime - earliestDirtySegmentTimestamp
+ else
+ 0L
+ }
/**
* Returns the range of dirty offsets that can be cleaned.
@@ -505,7 +536,7 @@ private[log] object LogCleanerManager extends Logging {
}
}
- val compactionLagMs = math.max(log.config.compactionLagMs, 0L)
+ val minCompactionLagMs = math.max(log.config.compactionLagMs, 0L)
// find first segment that cannot be cleaned
// neither the active segment, nor segments with any messages closer to the head of the log than the minimum compaction lag time
@@ -519,12 +550,12 @@ private[log] object LogCleanerManager extends Logging {
Option(log.activeSegment.baseOffset),
// the first segment whose largest message timestamp is within a minimum time lag from now
- if (compactionLagMs > 0) {
+ if (minCompactionLagMs > 0) {
// dirty log segments
val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset)
dirtyNonActiveSegments.find { s =>
- val isUncleanable = s.largestTimestamp > now - compactionLagMs
- debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - compactionLagMs}; is uncleanable=$isUncleanable")
+ val isUncleanable = s.largestTimestamp > now - minCompactionLagMs
+ debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - minCompactionLagMs}; is uncleanable=$isUncleanable")
isUncleanable
}.map(_.baseOffset)
} else None
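To make the new scheduling rule concrete, here is a small standalone sketch (not the Kafka code itself) of the computation getMaxCompactionDelay performs: a log is overdue when the first batch of its earliest dirty non-active segment is older than now - maxCompactionLagMs, and any positive delay marks the partition with needCompactionNow, which bypasses the min.cleanable.dirty.ratio check as long as there are cleanable bytes. The timestamps in main are illustrative values only.

    object CompactionDelaySketch {
      // How far past its compaction deadline is the oldest dirty (non-active) segment?
      def maxCompactionDelay(firstBatchTimestamps: Iterable[Long],
                             maxCompactionLagMs: Long,
                             now: Long): Long = {
        val earliestDirtySegmentTimestamp =
          if (firstBatchTimestamps.nonEmpty) firstBatchTimestamps.min
          else Long.MaxValue // no usable timestamps => nothing is overdue
        val cleanUntilTime = now - math.max(maxCompactionLagMs, 0L)
        if (earliestDirtySegmentTimestamp < cleanUntilTime)
          cleanUntilTime - earliestDirtySegmentTimestamp
        else 0L
      }

      def main(args: Array[String]): Unit = {
        val now = 10000000L // hypothetical clock, in milliseconds
        // two dirty segments; the older first batch was written 2000000 ms ago and
        // the lag bound is 1000000 ms, so the log is 1000000 ms overdue
        println(maxCompactionDelay(Seq(8000000L, 9500000L), maxCompactionLagMs = 1000000L, now = now))
      }
    }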
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index c1e5c62..c3684e8 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -46,6 +46,7 @@ object Defaults {
val FileDeleteDelayMs = kafka.server.Defaults.LogDeleteDelayMs
val DeleteRetentionMs = kafka.server.Defaults.LogCleanerDeleteRetentionMs
val MinCompactionLagMs = kafka.server.Defaults.LogCleanerMinCompactionLagMs
+ val MaxCompactionLagMs = kafka.server.Defaults.LogCleanerMaxCompactionLagMs
val MinCleanableDirtyRatio = kafka.server.Defaults.LogCleanerMinCleanRatio
@deprecated(message = "This is a misleading variable name as it actually refers to the 'delete' cleanup policy. Use " +
@@ -85,6 +86,7 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
val fileDeleteDelayMs = getLong(LogConfig.FileDeleteDelayMsProp)
val deleteRetentionMs = getLong(LogConfig.DeleteRetentionMsProp)
val compactionLagMs = getLong(LogConfig.MinCompactionLagMsProp)
+ val maxCompactionLagMs = getLong(LogConfig.MaxCompactionLagMsProp)
val minCleanableRatio = getDouble(LogConfig.MinCleanableDirtyRatioProp)
val compact = getList(LogConfig.CleanupPolicyProp).asScala.map(_.toLowerCase(Locale.ROOT)).contains(LogConfig.Compact)
val delete = getList(LogConfig.CleanupPolicyProp).asScala.map(_.toLowerCase(Locale.ROOT)).contains(LogConfig.Delete)
@@ -101,6 +103,11 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
def randomSegmentJitter: Long =
if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs)
+
+ def maxSegmentMs: Long = {
+ if (compact && maxCompactionLagMs > 0) math.min(maxCompactionLagMs, segmentMs)
+ else segmentMs
+ }
}
object LogConfig {
@@ -121,6 +128,7 @@ object LogConfig {
val IndexIntervalBytesProp = TopicConfig.INDEX_INTERVAL_BYTES_CONFIG
val DeleteRetentionMsProp = TopicConfig.DELETE_RETENTION_MS_CONFIG
val MinCompactionLagMsProp = TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG
+ val MaxCompactionLagMsProp = TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG
val FileDeleteDelayMsProp = TopicConfig.FILE_DELETE_DELAY_MS_CONFIG
val MinCleanableDirtyRatioProp = TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG
val CleanupPolicyProp = TopicConfig.CLEANUP_POLICY_CONFIG
@@ -152,6 +160,7 @@ object LogConfig {
val FileDeleteDelayMsDoc = TopicConfig.FILE_DELETE_DELAY_MS_DOC
val DeleteRetentionMsDoc = TopicConfig.DELETE_RETENTION_MS_DOC
val MinCompactionLagMsDoc = TopicConfig.MIN_COMPACTION_LAG_MS_DOC
+ val MaxCompactionLagMsDoc = TopicConfig.MAX_COMPACTION_LAG_MS_DOC
val MinCleanableRatioDoc = TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_DOC
val CompactDoc = TopicConfig.CLEANUP_POLICY_DOC
val UncleanLeaderElectionEnableDoc = TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_DOC
@@ -242,6 +251,8 @@ object LogConfig {
DeleteRetentionMsDoc, KafkaConfig.LogCleanerDeleteRetentionMsProp)
.define(MinCompactionLagMsProp, LONG, Defaults.MinCompactionLagMs, atLeast(0), MEDIUM, MinCompactionLagMsDoc,
KafkaConfig.LogCleanerMinCompactionLagMsProp)
+ .define(MaxCompactionLagMsProp, LONG, Defaults.MaxCompactionLagMs, atLeast(1), MEDIUM, MaxCompactionLagMsDoc,
+ KafkaConfig.LogCleanerMaxCompactionLagMsProp)
.define(FileDeleteDelayMsProp, LONG, Defaults.FileDeleteDelayMs, atLeast(0), MEDIUM, FileDeleteDelayMsDoc,
KafkaConfig.LogDeleteDelayMsProp)
.define(MinCleanableDirtyRatioProp, DOUBLE, Defaults.MinCleanableDirtyRatio, between(0, 1), MEDIUM,
@@ -299,12 +310,22 @@ object LogConfig {
private[kafka] def configKeys: Map[String, ConfigKey] = configDef.configKeys.asScala
+ def validateValues(props: java.util.Map[_, _]): Unit = {
+ val minCompactionLag = props.get(MinCompactionLagMsProp).asInstanceOf[Long]
+ val maxCompactionLag = props.get(MaxCompactionLagMsProp).asInstanceOf[Long]
+ if (minCompactionLag > maxCompactionLag) {
+ throw new InvalidConfigurationException(s"conflict topic config setting $MinCompactionLagMsProp " +
+ s"($minCompactionLag) > $MaxCompactionLagMsProp ($maxCompactionLag)")
+ }
+ }
+
/**
* Check that the given properties contain only valid log config names and that all values can be parsed and are valid
*/
def validate(props: Properties) {
validateNames(props)
- configDef.parse(props)
+ val valueMaps = configDef.parse(props)
+ validateValues(valueMaps)
}
/**
@@ -324,6 +345,7 @@ object LogConfig {
IndexIntervalBytesProp -> KafkaConfig.LogIndexIntervalBytesProp,
DeleteRetentionMsProp -> KafkaConfig.LogCleanerDeleteRetentionMsProp,
MinCompactionLagMsProp -> KafkaConfig.LogCleanerMinCompactionLagMsProp,
+ MaxCompactionLagMsProp -> KafkaConfig.LogCleanerMaxCompactionLagMsProp,
FileDeleteDelayMsProp -> KafkaConfig.LogDeleteDelayMsProp,
MinCleanableDirtyRatioProp -> KafkaConfig.LogCleanerMinCleanRatioProp,
CleanupPolicyProp -> KafkaConfig.LogCleanupPolicyProp,
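A note on why LogConfig.maxSegmentMs (added above) matters: records in the active segment can never be compacted, so time-based rolling must be capped at maxCompactionLagMs or the lag bound could not be honored. A standalone restatement of the rule, pasteable into a Scala REPL, with example values that are illustrative only:

    // mirrors the maxSegmentMs logic above (not the Kafka code itself)
    def maxSegmentMs(compact: Boolean, maxCompactionLagMs: Long, segmentMs: Long): Long =
      if (compact && maxCompactionLagMs > 0) math.min(maxCompactionLagMs, segmentMs)
      else segmentMs

    // segment.ms = 7 days but max.compaction.lag.ms = 1 day: the active
    // segment now rolls daily so its records can be compacted in time
    assert(maxSegmentMs(compact = true, maxCompactionLagMs = 86400000L, segmentMs = 604800000L) == 86400000L)

validateValues additionally rejects configurations where min.compaction.lag.ms exceeds max.compaction.lag.ms, since no record could satisfy both bounds.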
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index cae47f7..5a16193 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -994,6 +994,8 @@ object LogManager {
brokerTopicStats: BrokerTopicStats,
logDirFailureChannel: LogDirFailureChannel): LogManager = {
val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
+
+ LogConfig.validateValues(defaultProps)
val defaultLogConfig = LogConfig(defaultProps)
// read the log configurations from zookeeper
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 624f3ea..ecd85f9 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -95,8 +95,9 @@ class LogSegment private[log] (val log: FileRecords,
/* the number of bytes since we last added an entry in the offset index */
private var bytesSinceLastIndexEntry = 0
- /* The timestamp we used for time based log rolling */
- private var rollingBasedTimestamp: Option[Long] = None
+ // The timestamp we used for time based log rolling and for ensuring max compaction delay
+ // volatile for LogCleaner to see the update
+ @volatile private var rollingBasedTimestamp: Option[Long] = None
/* The maximum timestamp we see so far */
@volatile private var _maxTimestampSoFar: Option[Long] = None
@@ -523,6 +524,18 @@ class LogSegment private[log] (val log: FileRecords,
}
/**
+ * If not previously loaded,
+ * load the timestamp of the first message into memory.
+ */
+ private def loadFirstBatchTimestamp(): Unit = {
+ if (rollingBasedTimestamp.isEmpty) {
+ val iter = log.batches.iterator()
+ if (iter.hasNext)
+ rollingBasedTimestamp = Some(iter.next().maxTimestamp)
+ }
+ }
+
+ /**
* The time this segment has waited to be rolled.
* If the first message batch has a timestamp we use its timestamp to determine when to roll a segment. A segment
* is rolled if the difference between the new batch's timestamp and the first batch's timestamp exceeds the
@@ -533,11 +546,7 @@ class LogSegment private[log] (val log: FileRecords,
*/
def timeWaitedForRoll(now: Long, messageTimestamp: Long) : Long = {
// Load the timestamp of the first message into memory
- if (rollingBasedTimestamp.isEmpty) {
- val iter = log.batches.iterator()
- if (iter.hasNext)
- rollingBasedTimestamp = Some(iter.next().maxTimestamp)
- }
+ loadFirstBatchTimestamp()
rollingBasedTimestamp match {
case Some(t) if t >= 0 => messageTimestamp - t
case _ => now - created
@@ -545,6 +554,17 @@ class LogSegment private[log] (val log: FileRecords,
}
/**
+ * @return the first batch timestamp if the timestamp is available. Otherwise return Long.MaxValue
+ */
+ def getFirstBatchTimestamp() : Long = {
+ loadFirstBatchTimestamp()
+ rollingBasedTimestamp match {
+ case Some(t) if t >= 0 => t
+ case _ => Long.MaxValue
+ }
+ }
+
+ /**
* Search the message offset based on timestamp and offset.
*
* This method returns an option of TimestampOffset. The returned value is determined using the following ordered list of rules:
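The cleaner learns a segment's age through getFirstBatchTimestamp (added above), which lazily caches the first batch's max timestamp in the now-volatile rollingBasedTimestamp field so that repeated cleanability checks avoid re-reading the segment. A simplified sketch of that caching pattern, assuming (as the Log.scala comment requires) that no two threads load the same segment concurrently:

    // simplified sketch of the lazy first-batch-timestamp cache (not the Kafka code itself)
    class SegmentTimestampCache(readFirstBatchMaxTimestamp: () => Option[Long]) {
      // volatile so a LogCleaner thread observes a value cached earlier by another thread
      @volatile private var rollingBasedTimestamp: Option[Long] = None

      private def loadFirstBatchTimestamp(): Unit =
        if (rollingBasedTimestamp.isEmpty)
          rollingBasedTimestamp = readFirstBatchMaxTimestamp() // touches the log at most once

      // Long.MaxValue means "no usable timestamp": such a segment never looks overdue
      def getFirstBatchTimestamp(): Long = {
        loadFirstBatchTimestamp()
        rollingBasedTimestamp match {
          case Some(t) if t >= 0 => t
          case _ => Long.MaxValue
        }
      }
    }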
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 04f4f3b..526e174 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -103,6 +103,7 @@ object Defaults {
val LogCleanerEnable = true
val LogCleanerDeleteRetentionMs = 24 * 60 * 60 * 1000L
val LogCleanerMinCompactionLagMs = 0L
+ val LogCleanerMaxCompactionLagMs = Long.MaxValue
val LogIndexSizeMaxBytes = 10 * 1024 * 1024
val LogIndexIntervalBytes = 4096
val LogFlushIntervalMessages = Long.MaxValue
@@ -328,6 +329,7 @@ object KafkaConfig {
val LogCleanerEnableProp = "log.cleaner.enable"
val LogCleanerDeleteRetentionMsProp = "log.cleaner.delete.retention.ms"
val LogCleanerMinCompactionLagMsProp = "log.cleaner.min.compaction.lag.ms"
+ val LogCleanerMaxCompactionLagMsProp = "log.cleaner.max.compaction.lag.ms"
val LogIndexSizeMaxBytesProp = "log.index.size.max.bytes"
val LogIndexIntervalBytesProp = "log.index.interval.bytes"
val LogFlushIntervalMessagesProp = "log.flush.interval.messages"
@@ -615,6 +617,7 @@ object KafkaConfig {
val LogCleanerEnableDoc = "Enable the log cleaner process to run on the server. Should be enabled if using any topics with a cleanup.policy=compact including the internal offsets topic. If disabled those topics will not be compacted and continually grow in size."
val LogCleanerDeleteRetentionMsDoc = "How long are delete records retained?"
val LogCleanerMinCompactionLagMsDoc = "The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted."
+ val LogCleanerMaxCompactionLagMsDoc = "The maximum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted."
val LogIndexSizeMaxBytesDoc = "The maximum size in bytes of the offset index"
val LogIndexIntervalBytesDoc = "The interval with which we add an entry to the offset index"
val LogFlushIntervalMessagesDoc = "The number of messages accumulated on a log partition before messages are flushed to disk "
@@ -917,6 +920,7 @@ object KafkaConfig {
.define(LogCleanerEnableProp, BOOLEAN, Defaults.LogCleanerEnable, MEDIUM, LogCleanerEnableDoc)
.define(LogCleanerDeleteRetentionMsProp, LONG, Defaults.LogCleanerDeleteRetentionMs, MEDIUM, LogCleanerDeleteRetentionMsDoc)
.define(LogCleanerMinCompactionLagMsProp, LONG, Defaults.LogCleanerMinCompactionLagMs, MEDIUM, LogCleanerMinCompactionLagMsDoc)
+ .define(LogCleanerMaxCompactionLagMsProp, LONG, Defaults.LogCleanerMaxCompactionLagMs, MEDIUM, LogCleanerMaxCompactionLagMsDoc)
.define(LogIndexSizeMaxBytesProp, INT, Defaults.LogIndexSizeMaxBytes, atLeast(4), MEDIUM, LogIndexSizeMaxBytesDoc)
.define(LogIndexIntervalBytesProp, INT, Defaults.LogIndexIntervalBytes, atLeast(0), MEDIUM, LogIndexIntervalBytesDoc)
.define(LogFlushIntervalMessagesProp, LONG, Defaults.LogFlushIntervalMessages, atLeast(1), HIGH, LogFlushIntervalMessagesDoc)
@@ -1202,6 +1206,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
val logCleanerIoMaxBytesPerSecond = getDouble(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp)
def logCleanerDeleteRetentionMs = getLong(KafkaConfig.LogCleanerDeleteRetentionMsProp)
def logCleanerMinCompactionLagMs = getLong(KafkaConfig.LogCleanerMinCompactionLagMsProp)
+ def logCleanerMaxCompactionLagMs = getLong(KafkaConfig.LogCleanerMaxCompactionLagMsProp)
val logCleanerBackoffMs = getLong(KafkaConfig.LogCleanerBackoffMsProp)
def logCleanerMinCleanRatio = getDouble(KafkaConfig.LogCleanerMinCleanRatioProp)
val logCleanerEnable = getBoolean(KafkaConfig.LogCleanerEnableProp)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 4a25811..07ffe9d 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -70,6 +70,7 @@ object KafkaServer {
logProps.put(LogConfig.IndexIntervalBytesProp, kafkaConfig.logIndexIntervalBytes)
logProps.put(LogConfig.DeleteRetentionMsProp, kafkaConfig.logCleanerDeleteRetentionMs)
logProps.put(LogConfig.MinCompactionLagMsProp, kafkaConfig.logCleanerMinCompactionLagMs)
+ logProps.put(LogConfig.MaxCompactionLagMsProp, kafkaConfig.logCleanerMaxCompactionLagMs)
logProps.put(LogConfig.FileDeleteDelayMsProp, kafkaConfig.logDeleteDelayMs)
logProps.put(LogConfig.MinCleanableDirtyRatioProp, kafkaConfig.logCleanerMinCleanRatio)
logProps.put(LogConfig.CleanupPolicyProp, kafkaConfig.logCleanupPolicy)
diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index e778336..fe98ebf 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -43,9 +43,10 @@ abstract class AbstractLogCleanerIntegrationTest {
private val logs = ListBuffer.empty[Log]
private val defaultMaxMessageSize = 128
private val defaultMinCleanableDirtyRatio = 0.0F
- private val defaultCompactionLag = 0L
+ private val defaultMinCompactionLagMS = 0L
private val defaultDeleteDelay = 1000
private val defaultSegmentSize = 2048
+ private val defaultMaxCompactionLagMs = Long.MaxValue
def time: MockTime
@@ -61,9 +62,10 @@ abstract class AbstractLogCleanerIntegrationTest {
def logConfigProperties(propertyOverrides: Properties = new Properties(),
maxMessageSize: Int,
minCleanableDirtyRatio: Float = defaultMinCleanableDirtyRatio,
- compactionLag: Long = defaultCompactionLag,
+ minCompactionLagMs: Long = defaultMinCompactionLagMS,
deleteDelay: Int = defaultDeleteDelay,
- segmentSize: Int = defaultSegmentSize): Properties = {
+ segmentSize: Int = defaultSegmentSize,
+ maxCompactionLagMs: Long = defaultMaxCompactionLagMs): Properties = {
val props = new Properties()
props.put(LogConfig.MaxMessageBytesProp, maxMessageSize: java.lang.Integer)
props.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
@@ -72,7 +74,8 @@ abstract class AbstractLogCleanerIntegrationTest {
props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
props.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: java.lang.Float)
props.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
- props.put(LogConfig.MinCompactionLagMsProp, compactionLag: java.lang.Long)
+ props.put(LogConfig.MinCompactionLagMsProp, minCompactionLagMs: java.lang.Long)
+ props.put(LogConfig.MaxCompactionLagMsProp, maxCompactionLagMs: java.lang.Long)
props ++= propertyOverrides
props
}
@@ -82,9 +85,10 @@ abstract class AbstractLogCleanerIntegrationTest {
numThreads: Int = 1,
backOffMs: Long = 15000L,
maxMessageSize: Int = defaultMaxMessageSize,
- compactionLag: Long = defaultCompactionLag,
+ minCompactionLagMs: Long = defaultMinCompactionLagMS,
deleteDelay: Int = defaultDeleteDelay,
segmentSize: Int = defaultSegmentSize,
+ maxCompactionLagMs: Long = defaultMaxCompactionLagMs,
cleanerIoBufferSize: Option[Int] = None,
propertyOverrides: Properties = new Properties()): LogCleaner = {
@@ -96,9 +100,10 @@ abstract class AbstractLogCleanerIntegrationTest {
val logConfig = LogConfig(logConfigProperties(propertyOverrides,
maxMessageSize = maxMessageSize,
minCleanableDirtyRatio = minCleanableDirtyRatio,
- compactionLag = compactionLag,
+ minCompactionLagMs = minCompactionLagMs,
deleteDelay = deleteDelay,
- segmentSize = segmentSize))
+ segmentSize = segmentSize,
+ maxCompactionLagMs = maxCompactionLagMs))
val log = Log(dir,
logConfig,
logStartOffset = 0L,
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index bfee811..2d342fa 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -25,9 +25,10 @@ import kafka.utils.{MockTime, TestUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS
import org.apache.kafka.common.record.{CompressionType, RecordBatch}
-import org.junit.Assert.{assertFalse, assertTrue, fail}
+import org.junit.Assert._
import org.junit.Test
+import scala.collection.{Iterable, JavaConverters, Seq}
import scala.collection.JavaConverters.mapAsScalaMapConverter
/**
@@ -93,4 +94,95 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest {
assertTrue(uncleanablePartitions.contains(topicPartitions(1)))
assertFalse(uncleanablePartitions.contains(topicPartitions(2)))
}
+
+ @Test
+ def testMaxLogCompactionLag(): Unit = {
+ val msPerHour = 60 * 60 * 1000
+
+ val minCompactionLagMs = 1 * msPerHour
+ val maxCompactionLagMs = 6 * msPerHour
+
+ val cleanerBackOffMs = 200L
+ val segmentSize = 512
+ val topicPartitions = Array(new TopicPartition("log", 0), new TopicPartition("log", 1), new TopicPartition("log", 2))
+ val minCleanableDirtyRatio = 1.0F
+
+ cleaner = makeCleaner(partitions = topicPartitions,
+ backOffMs = cleanerBackOffMs,
+ minCompactionLagMs = minCompactionLagMs,
+ segmentSize = segmentSize,
+ maxCompactionLagMs = maxCompactionLagMs,
+ minCleanableDirtyRatio = minCleanableDirtyRatio)
+ val log = cleaner.logs.get(topicPartitions(0))
+
+ val T0 = time.milliseconds
+ writeKeyDups(numKeys = 100, numDups = 3, log, CompressionType.NONE, timestamp = T0, startValue = 0, step = 1)
+
+ val startSizeBlock0 = log.size
+
+ val activeSegAtT0 = log.activeSegment
+
+ cleaner.startup()
+
+ // advance to a time still less than maxCompactionLagMs from start
+ time.sleep(maxCompactionLagMs/2)
+ Thread.sleep(5 * cleanerBackOffMs) // give cleaning thread a chance to _not_ clean
+ assertEquals("There should be no cleaning until the max compaction lag has passed", startSizeBlock0, log.size)
+
+ // advance to time a bit more than one maxCompactionLagMs from start
+ time.sleep(maxCompactionLagMs/2 + 1)
+ val T1 = time.milliseconds
+
+ // write the second block of data: all zero keys
+ val appends1 = writeKeyDups(numKeys = 100, numDups = 1, log, CompressionType.NONE, timestamp = T1, startValue = 0, step = 0)
+
+ // roll the active segment
+ log.roll()
+ val activeSegAtT1 = log.activeSegment
+ val firstBlockCleanableSegmentOffset = activeSegAtT0.baseOffset
+
+ // the first block should get cleaned
+ cleaner.awaitCleaned(new TopicPartition("log", 0), firstBlockCleanableSegmentOffset)
+
+ val read1 = readFromLog(log)
+ val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(new TopicPartition("log", 0))
+ assertTrue(s"log cleaner should have processed at least to offset $firstBlockCleanableSegmentOffset, " +
+ s"but lastCleaned=$lastCleaned", lastCleaned >= firstBlockCleanableSegmentOffset)
+
+ // minCleanableDirtyRatio will prevent the second block of data from compacting
+ assertNotEquals(s"log should still contain non-zero keys", appends1, read1)
+
+ time.sleep(maxCompactionLagMs + 1)
+ // the second block should get cleaned. only zero keys left
+ cleaner.awaitCleaned(new TopicPartition("log", 0), activeSegAtT1.baseOffset)
+
+ val read2 = readFromLog(log)
+
+ assertEquals(s"log should only contains zero keys now", appends1, read2)
+
+ val lastCleaned2 = cleaner.cleanerManager.allCleanerCheckpoints(new TopicPartition("log", 0))
+ val secondBlockCleanableSegmentOffset = activeSegAtT1.baseOffset
+ assertTrue(s"log cleaner should have processed at least to offset $secondBlockCleanableSegmentOffset, " +
+ s"but lastCleaned=$lastCleaned2", lastCleaned2 >= secondBlockCleanableSegmentOffset)
+ }
+
+ private def readFromLog(log: Log): Iterable[(Int, Int)] = {
+ import JavaConverters._
+ for (segment <- log.logSegments; record <- segment.log.records.asScala) yield {
+ val key = TestUtils.readString(record.key).toInt
+ val value = TestUtils.readString(record.value).toInt
+ key -> value
+ }
+ }
+
+ private def writeKeyDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType, timestamp: Long, startValue: Int, step: Int): Seq[(Int, Int)] = {
+ var valCounter = startValue
+ for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
+ val curValue = valCounter
+ log.appendAsLeader(TestUtils.singletonRecords(value = curValue.toString.getBytes, codec = codec,
+ key = key.toString.getBytes, timestamp = timestamp), leaderEpoch = 0)
+ valCounter += step
+ (key, curValue)
+ }
+ }
}
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index 6e8c9b9..0232e57 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -35,8 +35,8 @@ import scala.collection._
class LogCleanerLagIntegrationTest(compressionCodecName: String) extends AbstractLogCleanerIntegrationTest with Logging {
val msPerHour = 60 * 60 * 1000
- val compactionLag = 1 * msPerHour
- assertTrue("compactionLag must be divisible by 2 for this test", compactionLag % 2 == 0)
+ val minCompactionLag = 1 * msPerHour
+ assertTrue("compactionLag must be divisible by 2 for this test", minCompactionLag % 2 == 0)
val time = new MockTime(1400000000000L, 1000L) // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs`
val cleanerBackOffMs = 200L
@@ -50,7 +50,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Abstrac
def cleanerTest(): Unit = {
cleaner = makeCleaner(partitions = topicPartitions,
backOffMs = cleanerBackOffMs,
- compactionLag = compactionLag,
+ minCompactionLagMs = minCompactionLag,
segmentSize = segmentSize)
val log = cleaner.logs.get(topicPartitions(0))
@@ -69,13 +69,13 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Abstrac
// T0 < t < T1
// advance to a time still less than one compaction lag from start
- time.sleep(compactionLag/2)
+ time.sleep(minCompactionLag/2)
Thread.sleep(5 * cleanerBackOffMs) // give cleaning thread a chance to _not_ clean
assertEquals("There should be no cleaning until the compaction lag has passed", startSizeBlock0, log.size)
// t = T1 > T0 + compactionLag
// advance to time a bit more than one compaction lag from start
- time.sleep(compactionLag/2 + 1)
+ time.sleep(minCompactionLag/2 + 1)
val T1 = time.milliseconds
// write another block of data
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 66702d6..2cd7904 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -28,11 +28,11 @@ import org.scalatest.Assertions._
class LogConfigTest {
- /**
- * This test verifies that KafkaConfig object initialization does not depend on
- * LogConfig initialization. Bad things happen due to static initialization
- * order dependencies. For example, LogConfig.configDef ends up adding null
- * values in serverDefaultConfigNames. This test ensures that the mapping of
+ /**
+ * This test verifies that KafkaConfig object initialization does not depend on
+ * LogConfig initialization. Bad things happen due to static initialization
+ * order dependencies. For example, LogConfig.configDef ends up adding null
+ * values in serverDefaultConfigNames. This test ensures that the mapping of
* keys from LogConfig to KafkaConfig are not missing values.
*/
@Test
@@ -82,6 +82,16 @@ class LogConfigTest {
}
@Test
+ def testInvalidCompactionLagConfig(): Unit = {
+ val props = new Properties
+ props.setProperty(LogConfig.MaxCompactionLagMsProp, "100")
+ props.setProperty(LogConfig.MinCompactionLagMsProp, "200")
+ intercept[Exception] {
+ LogConfig.validate(props)
+ }
+ }
+
+ @Test
def shouldValidateThrottledReplicasConfig() {
assertTrue(isValid("*"))
assertTrue(isValid("* "))
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 1d7e687..9d28c1b 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -635,6 +635,7 @@ class KafkaConfigTest {
case KafkaConfig.LogCleanerEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean")
case KafkaConfig.LogCleanerDeleteRetentionMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogCleanerMinCompactionLagMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+ case KafkaConfig.LogCleanerMaxCompactionLagMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogCleanerMinCleanRatioProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogIndexSizeMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "3")
case KafkaConfig.LogFlushIntervalMessagesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")