You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/12/06 22:15:00 UTC

[jira] [Commented] (KAFKA-7321) ensure timely processing of deletion requests in Kafka topic (Time-based log compaction)

    [ https://issues.apache.org/jira/browse/KAFKA-7321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16712082#comment-16712082 ] 

ASF GitHub Bot commented on KAFKA-7321:
---------------------------------------

xiowu0 closed pull request #5611: KAFKA-7321: Add a Maximum Log Compaction Lag (KIP-354)
URL: https://github.com/apache/kafka/pull/5611
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index 4410c971a16..927e1032e3c 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -113,6 +113,10 @@
         "higher ratio will mean fewer, more efficient cleanings but will mean more wasted " +
         "space in the log.";
 
+    public static final String MAX_COMPACTION_LAG_MS_CONFIG = "max.compaction.lag.ms";
+    public static final String MAX_COMPACTION_LAG_MS_DOC = "The maximum time a message will remain " +
+        "uncompacted in the log. Only applicable for logs that are being compacted.";
+
     public static final String CLEANUP_POLICY_CONFIG = "cleanup.policy";
     public static final String CLEANUP_POLICY_COMPACT = "compact";
     public static final String CLEANUP_POLICY_DELETE = "delete";
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index bc328d77efc..710419bb2c1 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -157,7 +157,7 @@ case class RollParams(maxSegmentMs: Long,
 
 object RollParams {
   def apply(config: LogConfig, appendInfo: LogAppendInfo, messagesSize: Int, now: Long): RollParams = {
-   new RollParams(config.segmentMs,
+   new RollParams(config.maxSegmentMs,
      config.segmentSize,
      appendInfo.maxTimestamp,
      appendInfo.lastOffset,
@@ -1905,6 +1905,13 @@ class Log(@volatile var dir: File,
     }
   }
 
+  @threadsafe
+  private[log] def getFirstBatchTimestampForSegment(segment: LogSegment): Long = {
+    lock synchronized {
+      segment.getFirstBatchTimestamp()
+    }
+  }
+
   /**
    * remove deleted log metrics
    */
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 8449e39d581..43b954ac7e8 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -132,6 +132,12 @@ class LogCleaner(initialConfig: CleanerConfig,
            new Gauge[Int] {
              def value: Int = cleaners.map(_.lastStats).map(_.elapsedSecs).max.toInt
            })
+  // a metric to track delay between the time when a log is required to be compacted
+  // as determined by max compaction lag and the time of last cleaner run.
+  newGauge("max-compaction-delay-secs",
+          new Gauge[Int] {
+          def value: Int = Math.max(0, (cleaners.map(_.lastPreCleanStats).map(_.maxCompactionDelayMs).max / 1000).toInt)
+          })
 
   /**
    * Start the background cleaning
@@ -285,6 +291,7 @@ class LogCleaner(initialConfig: CleanerConfig,
                               checkDone = checkDone)
 
     @volatile var lastStats: CleanerStats = new CleanerStats()
+    @volatile var lastPreCleanStats: PreCleanStats = new PreCleanStats()
 
     private def checkDone(topicPartition: TopicPartition) {
       if (!isRunning)
@@ -310,10 +317,12 @@ class LogCleaner(initialConfig: CleanerConfig,
       var currentLog: Option[Log] = None
 
       try {
-        val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match {
+        val preCleanStats = new PreCleanStats()
+        val cleaned = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats) match {
           case None =>
             false
           case Some(cleanable) =>
+            this.lastPreCleanStats = preCleanStats
             // there's a log, clean it
             currentLog = Some(cleanable.log)
             cleanLog(cleanable)
@@ -930,6 +939,17 @@ private[log] class Cleaner(val id: Int,
   }
 }
 
+/**
+  * A simple struct for collecting pre-clean stats
+  */
+private class PreCleanStats() {
+  var maxCompactionDelayMs = 0L
+
+  def updateMaxCompactionDelay(delayMs: Long): Unit = {
+    maxCompactionDelayMs = Math.max(maxCompactionDelayMs, delayMs)
+  }
+}
+
 /**
  * A simple struct for collecting stats about log cleaning
  */
@@ -983,9 +1003,11 @@ private class CleanerStats(time: Time = Time.SYSTEM) {
 }
 
 /**
- * Helper class for a log, its topic/partition, the first cleanable position, and the first uncleanable dirty position
- */
-private case class LogToClean(topicPartition: TopicPartition, log: Log, firstDirtyOffset: Long, uncleanableOffset: Long) extends Ordered[LogToClean] {
+  * Helper class for a log, its topic/partition, the first cleanable position, the first uncleanable dirty position,
+  * and whether it needs compaction immediately.
+  */
+private case class LogToClean(topicPartition: TopicPartition, log: Log, firstDirtyOffset: Long,
+                              uncleanableOffset: Long, needCompactionNow: Boolean = false) extends Ordered[LogToClean] {
   val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size.toLong).sum
   val (firstUncleanableOffset, cleanableBytes) = LogCleaner.calculateCleanableBytes(log, firstDirtyOffset, uncleanableOffset)
   val totalBytes = cleanBytes + cleanableBytes
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 2fc7b749852..7bb5cdfe540 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -168,7 +168,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     * each time from the full set of logs to allow logs to be dynamically added to the pool of logs
     * the log manager maintains.
     */
-  def grabFilthiestCompactedLog(time: Time): Option[LogToClean] = {
+  def grabFilthiestCompactedLog(time: Time, preCleanStats: PreCleanStats = new PreCleanStats()): Option[LogToClean] = {
     inLock(lock) {
       val now = time.milliseconds
       this.timeOfLastRun = now
@@ -181,14 +181,21 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
           inProgress.contains(topicPartition) || isUncleanablePartition(log, topicPartition)
       }.map {
         case (topicPartition, log) => // create a LogToClean instance for each
-          val (firstDirtyOffset, firstUncleanableDirtyOffset) = LogCleanerManager.cleanableOffsets(log, topicPartition,
-            lastClean, now)
-          LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset)
+          val (firstDirtyOffset, firstUncleanableDirtyOffset) =
+            LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, now)
+
+          val firstDirtySegmentCreateTime = LogCleanerManager.getFirstDirtySegmentEstimatedCreateTime(log, firstDirtyOffset)
+          val compactionDelayMs = LogCleanerManager.getCompactionDelay(log, firstDirtySegmentCreateTime, now)
+          preCleanStats.updateMaxCompactionDelay(compactionDelayMs)
+
+          LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset, compactionDelayMs > 0)
       }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
 
       this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0
-      // and must meet the minimum threshold for dirty byte ratio
-      val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio)
+      // and must meet the minimum threshold for dirty byte ratio or have some bytes required to be compacted
+      val cleanableLogs = dirtyLogs.filter { ltc =>
+        (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
+      }
       if(cleanableLogs.isEmpty) {
         None
       } else {
@@ -479,6 +486,41 @@ private[log] object LogCleanerManager extends Logging {
     log.config.compact && log.config.delete
   }
 
+  /**
+    * return first dirty segment estimated createTime
+    */
+  def getFirstDirtySegmentEstimatedCreateTime(log: Log, firstDirtyOffset: Long) : Long = {
+    val largestCleanTimestamp = log.logSegments(-1, firstDirtyOffset).lastOption.map(_.largestTimestamp).getOrElse(0L)
+    val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset)
+
+    val firstDirtySegmentCreateTime: Long = dirtyNonActiveSegments.headOption match {
+      case None => -1
+      case Some(segment) =>
+        if (largestCleanTimestamp > 0)
+          largestCleanTimestamp
+        else {
+          val firstBatchTimestamp = log.getFirstBatchTimestampForSegment(segment)
+          // if timestamp of the first batch is unavailable,
+          // use largestTimestamp instead
+          if (firstBatchTimestamp > 0) firstBatchTimestamp
+          else segment.largestTimestamp
+        }
+    }
+    firstDirtySegmentCreateTime
+  }
+
+  /**
+    * get delay between the time when log is required to be compacted as determined
+    * by maxCompactionLagMs and the current time.
+    */
+  def getCompactionDelay(log: Log, firstDirtySegmentCreateTime: Long, now: Long) : Long = {
+    val maxCompactionLagMs = math.max(log.config.maxCompactionLagMs, 0L)
+    val cleanUtilTime = now - maxCompactionLagMs
+    if (firstDirtySegmentCreateTime < cleanUtilTime && firstDirtySegmentCreateTime > 0)
+      cleanUtilTime - firstDirtySegmentCreateTime
+    else
+      0L
+  }
 
   /**
     * Returns the range of dirty offsets that can be cleaned.
@@ -508,7 +550,7 @@ private[log] object LogCleanerManager extends Logging {
       }
     }
 
-    val compactionLagMs = math.max(log.config.compactionLagMs, 0L)
+    val minCompactionLagMs = math.max(log.config.compactionLagMs, 0L)
 
     // find first segment that cannot be cleaned
     // neither the active segment, nor segments with any messages closer to the head of the log than the minimum compaction lag time
@@ -522,12 +564,12 @@ private[log] object LogCleanerManager extends Logging {
       Option(log.activeSegment.baseOffset),
 
       // the first segment whose largest message timestamp is within a minimum time lag from now
-      if (compactionLagMs > 0) {
+      if (minCompactionLagMs > 0) {
         // dirty log segments
         val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset)
         dirtyNonActiveSegments.find { s =>
-          val isUncleanable = s.largestTimestamp > now - compactionLagMs
-          debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - compactionLagMs}; is uncleanable=$isUncleanable")
+          val isUncleanable = s.largestTimestamp > now - minCompactionLagMs
+          debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - minCompactionLagMs}; is uncleanable=$isUncleanable")
           isUncleanable
         }.map(_.baseOffset)
       } else None
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index d872e09ed79..5f96dc81c91 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -46,6 +46,7 @@ object Defaults {
   val FileDeleteDelayMs = kafka.server.Defaults.LogDeleteDelayMs
   val DeleteRetentionMs = kafka.server.Defaults.LogCleanerDeleteRetentionMs
   val MinCompactionLagMs = kafka.server.Defaults.LogCleanerMinCompactionLagMs
+  val MaxCompactionLagMs = kafka.server.Defaults.LogCleanerMaxCompactionLagMs
   val MinCleanableDirtyRatio = kafka.server.Defaults.LogCleanerMinCleanRatio
 
   @deprecated(message = "This is a misleading variable name as it actually refers to the 'delete' cleanup policy. Use " +
@@ -85,6 +86,7 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
   val fileDeleteDelayMs = getLong(LogConfig.FileDeleteDelayMsProp)
   val deleteRetentionMs = getLong(LogConfig.DeleteRetentionMsProp)
   val compactionLagMs = getLong(LogConfig.MinCompactionLagMsProp)
+  val maxCompactionLagMs = getLong(LogConfig.MaxCompactionLagMsProp)
   val minCleanableRatio = getDouble(LogConfig.MinCleanableDirtyRatioProp)
   val compact = getList(LogConfig.CleanupPolicyProp).asScala.map(_.toLowerCase(Locale.ROOT)).contains(LogConfig.Compact)
   val delete = getList(LogConfig.CleanupPolicyProp).asScala.map(_.toLowerCase(Locale.ROOT)).contains(LogConfig.Delete)
@@ -101,6 +103,11 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
 
   def randomSegmentJitter: Long =
     if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs)
+
+  def maxSegmentMs :Long = {
+    if (compact && maxCompactionLagMs > 0) math.min(maxCompactionLagMs, segmentMs)
+    else segmentMs
+  }
 }
 
 object LogConfig {
@@ -121,6 +128,7 @@ object LogConfig {
   val IndexIntervalBytesProp = TopicConfig.INDEX_INTERVAL_BYTES_CONFIG
   val DeleteRetentionMsProp = TopicConfig.DELETE_RETENTION_MS_CONFIG
   val MinCompactionLagMsProp = TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG
+  val MaxCompactionLagMsProp = TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG
   val FileDeleteDelayMsProp = TopicConfig.FILE_DELETE_DELAY_MS_CONFIG
   val MinCleanableDirtyRatioProp = TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG
   val CleanupPolicyProp = TopicConfig.CLEANUP_POLICY_CONFIG
@@ -152,6 +160,7 @@ object LogConfig {
   val FileDeleteDelayMsDoc = TopicConfig.FILE_DELETE_DELAY_MS_DOC
   val DeleteRetentionMsDoc = TopicConfig.DELETE_RETENTION_MS_DOC
   val MinCompactionLagMsDoc = TopicConfig.MIN_COMPACTION_LAG_MS_DOC
+  val MaxCompactionLagMsDoc = TopicConfig.MAX_COMPACTION_LAG_MS_DOC
   val MinCleanableRatioDoc = TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_DOC
   val CompactDoc = TopicConfig.CLEANUP_POLICY_DOC
   val UncleanLeaderElectionEnableDoc = TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_DOC
@@ -242,6 +251,8 @@ object LogConfig {
         DeleteRetentionMsDoc, KafkaConfig.LogCleanerDeleteRetentionMsProp)
       .define(MinCompactionLagMsProp, LONG, Defaults.MinCompactionLagMs, atLeast(0), MEDIUM, MinCompactionLagMsDoc,
         KafkaConfig.LogCleanerMinCompactionLagMsProp)
+      .define(MaxCompactionLagMsProp, LONG, Defaults.MaxCompactionLagMs, atLeast(1), MEDIUM, MaxCompactionLagMsDoc,
+        KafkaConfig.LogCleanerMaxCompactionLagMsProp)
       .define(FileDeleteDelayMsProp, LONG, Defaults.FileDeleteDelayMs, atLeast(0), MEDIUM, FileDeleteDelayMsDoc,
         KafkaConfig.LogDeleteDelayMsProp)
       .define(MinCleanableDirtyRatioProp, DOUBLE, Defaults.MinCleanableDirtyRatio, between(0, 1), MEDIUM,
@@ -297,12 +308,21 @@ object LogConfig {
         throw new InvalidConfigurationException(s"Unknown topic config name: $name")
   }
 
+  def crossValidateCheck(props: java.util.Map[_, _]): Unit = {
+    val minCompactionLag =  props.get(MinCompactionLagMsProp).asInstanceOf[Long]
+    val maxCompactionLag =  props.get(MaxCompactionLagMsProp).asInstanceOf[Long]
+    if (minCompactionLag > maxCompactionLag) {
+      throw new InvalidConfigurationException(s"conflict topic config setting $MinCompactionLagMsProp " +
+        s"($minCompactionLag) > $MaxCompactionLagMsProp ($maxCompactionLag)")
+    }
+  }
   /**
    * Check that the given properties contain only valid log config names and that all values can be parsed and are valid
    */
   def validate(props: Properties) {
     validateNames(props)
-    configDef.parse(props)
+    val valueMaps = configDef.parse(props)
+    crossValidateCheck(valueMaps)
   }
 
   /**
@@ -322,6 +342,7 @@ object LogConfig {
     IndexIntervalBytesProp -> KafkaConfig.LogIndexIntervalBytesProp,
     DeleteRetentionMsProp -> KafkaConfig.LogCleanerDeleteRetentionMsProp,
     MinCompactionLagMsProp -> KafkaConfig.LogCleanerMinCompactionLagMsProp,
+    MaxCompactionLagMsProp -> KafkaConfig.LogCleanerMaxCompactionLagMsProp,
     FileDeleteDelayMsProp -> KafkaConfig.LogDeleteDelayMsProp,
     MinCleanableDirtyRatioProp -> KafkaConfig.LogCleanerMinCleanRatioProp,
     CleanupPolicyProp -> KafkaConfig.LogCleanupPolicyProp,
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index d910a29100c..ebc7521c6c9 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -91,7 +91,7 @@ class LogSegment private[log] (val log: FileRecords,
   private var bytesSinceLastIndexEntry = 0
 
   /* The timestamp we used for time based log rolling */
-  private var rollingBasedTimestamp: Option[Long] = None
+  @volatile private var rollingBasedTimestamp: Option[Long] = None
 
   /* The maximum timestamp we see so far */
   @volatile private var maxTimestampSoFar: Long = timeIndex.lastEntry.timestamp
@@ -503,6 +503,18 @@ class LogSegment private[log] (val log: FileRecords,
     log.trim()
   }
 
+  /**
+    * If not previously loaded,
+    * load the timestamp of the first message into memory.
+    */
+  private def mayLoadFirstBatchTimestamp(): Unit = {
+    if (rollingBasedTimestamp.isEmpty) {
+      val iter = log.batches.iterator()
+      if (iter.hasNext)
+        rollingBasedTimestamp = Some(iter.next().maxTimestamp)
+    }
+  }
+
   /**
    * The time this segment has waited to be rolled.
    * If the first message batch has a timestamp we use its timestamp to determine when to roll a segment. A segment
@@ -514,17 +526,24 @@ class LogSegment private[log] (val log: FileRecords,
    */
   def timeWaitedForRoll(now: Long, messageTimestamp: Long) : Long = {
     // Load the timestamp of the first message into memory
-    if (rollingBasedTimestamp.isEmpty) {
-      val iter = log.batches.iterator()
-      if (iter.hasNext)
-        rollingBasedTimestamp = Some(iter.next().maxTimestamp)
-    }
+    mayLoadFirstBatchTimestamp()
     rollingBasedTimestamp match {
       case Some(t) if t >= 0 => messageTimestamp - t
       case _ => now - created
     }
   }
 
+  /**
+    * @return the first batch timestamp if the timestamp is available. Otherwise return 0
+    */
+  def getFirstBatchTimestamp() : Long = {
+    mayLoadFirstBatchTimestamp()
+    rollingBasedTimestamp match {
+      case Some(t) if t >= 0 => t
+      case _ => 0L
+    }
+  }
+
   /**
    * Search the message offset based on timestamp and offset.
    *
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 13f555a2059..f63d3521b36 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -102,6 +102,7 @@ object Defaults {
   val LogCleanerEnable = true
   val LogCleanerDeleteRetentionMs = 24 * 60 * 60 * 1000L
   val LogCleanerMinCompactionLagMs = 0L
+  val LogCleanerMaxCompactionLagMs = Long.MaxValue
   val LogIndexSizeMaxBytes = 10 * 1024 * 1024
   val LogIndexIntervalBytes = 4096
   val LogFlushIntervalMessages = Long.MaxValue
@@ -324,6 +325,7 @@ object KafkaConfig {
   val LogCleanerEnableProp = "log.cleaner.enable"
   val LogCleanerDeleteRetentionMsProp = "log.cleaner.delete.retention.ms"
   val LogCleanerMinCompactionLagMsProp = "log.cleaner.min.compaction.lag.ms"
+  val LogCleanerMaxCompactionLagMsProp = "log.cleaner.max.compaction.lag.ms"
   val LogIndexSizeMaxBytesProp = "log.index.size.max.bytes"
   val LogIndexIntervalBytesProp = "log.index.interval.bytes"
   val LogFlushIntervalMessagesProp = "log.flush.interval.messages"
@@ -587,6 +589,7 @@ object KafkaConfig {
   val LogCleanerEnableDoc = "Enable the log cleaner process to run on the server. Should be enabled if using any topics with a cleanup.policy=compact including the internal offsets topic. If disabled those topics will not be compacted and continually grow in size."
   val LogCleanerDeleteRetentionMsDoc = "How long are delete records retained?"
   val LogCleanerMinCompactionLagMsDoc = "The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted."
+  val LogCleanerMaxCompactionLagMsDoc = "The maximum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted."
   val LogIndexSizeMaxBytesDoc = "The maximum size in bytes of the offset index"
   val LogIndexIntervalBytesDoc = "The interval with which we add an entry to the offset index"
   val LogFlushIntervalMessagesDoc = "The number of messages accumulated on a log partition before messages are flushed to disk "
@@ -886,6 +889,7 @@ object KafkaConfig {
       .define(LogCleanerEnableProp, BOOLEAN, Defaults.LogCleanerEnable, MEDIUM, LogCleanerEnableDoc)
       .define(LogCleanerDeleteRetentionMsProp, LONG, Defaults.LogCleanerDeleteRetentionMs, MEDIUM, LogCleanerDeleteRetentionMsDoc)
       .define(LogCleanerMinCompactionLagMsProp, LONG, Defaults.LogCleanerMinCompactionLagMs, MEDIUM, LogCleanerMinCompactionLagMsDoc)
+      .define(LogCleanerMaxCompactionLagMsProp, LONG, Defaults.LogCleanerMaxCompactionLagMs, MEDIUM, LogCleanerMaxCompactionLagMsDoc)
       .define(LogIndexSizeMaxBytesProp, INT, Defaults.LogIndexSizeMaxBytes, atLeast(4), MEDIUM, LogIndexSizeMaxBytesDoc)
       .define(LogIndexIntervalBytesProp, INT, Defaults.LogIndexIntervalBytes, atLeast(0), MEDIUM, LogIndexIntervalBytesDoc)
       .define(LogFlushIntervalMessagesProp, LONG, Defaults.LogFlushIntervalMessages, atLeast(1), HIGH, LogFlushIntervalMessagesDoc)
@@ -1169,6 +1173,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   val logCleanerIoMaxBytesPerSecond = getDouble(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp)
   def logCleanerDeleteRetentionMs = getLong(KafkaConfig.LogCleanerDeleteRetentionMsProp)
   def logCleanerMinCompactionLagMs = getLong(KafkaConfig.LogCleanerMinCompactionLagMsProp)
+  def logCleanerMaxCompactionLagMs = getLong(KafkaConfig.LogCleanerMaxCompactionLagMsProp)
   val logCleanerBackoffMs = getLong(KafkaConfig.LogCleanerBackoffMsProp)
   def logCleanerMinCleanRatio = getDouble(KafkaConfig.LogCleanerMinCleanRatioProp)
   val logCleanerEnable = getBoolean(KafkaConfig.LogCleanerEnableProp)
@@ -1445,5 +1450,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
         s"${KafkaConfig.FailedAuthenticationDelayMsProp}=$failedAuthenticationDelayMs should always be less than" +
           s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to prevent failed" +
           " authentication responses from timing out")
+
+    require(logCleanerMinCompactionLagMs >= 0 && logCleanerMaxCompactionLagMs > 0 &&
+      logCleanerMinCompactionLagMs <= logCleanerMaxCompactionLagMs,
+      s"${KafkaConfig.LogCleanerMinCompactionLagMsProp} must be less than or equal to ${KafkaConfig.LogCleanerMaxCompactionLagMsProp} and non-negative" )
   }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 5f9b3612c4f..608907ccf60 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -69,6 +69,7 @@ object KafkaServer {
     logProps.put(LogConfig.IndexIntervalBytesProp, kafkaConfig.logIndexIntervalBytes)
     logProps.put(LogConfig.DeleteRetentionMsProp, kafkaConfig.logCleanerDeleteRetentionMs)
     logProps.put(LogConfig.MinCompactionLagMsProp, kafkaConfig.logCleanerMinCompactionLagMs)
+    logProps.put(LogConfig.MaxCompactionLagMsProp, kafkaConfig.logCleanerMaxCompactionLagMs)
     logProps.put(LogConfig.FileDeleteDelayMsProp, kafkaConfig.logDeleteDelayMs)
     logProps.put(LogConfig.MinCleanableDirtyRatioProp, kafkaConfig.logCleanerMinCleanRatio)
     logProps.put(LogConfig.CleanupPolicyProp, kafkaConfig.logCleanupPolicy)
diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index cc35f1d9ff6..9f99fbdb801 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -40,9 +40,10 @@ abstract class AbstractLogCleanerIntegrationTest {
   private val logs = ListBuffer.empty[Log]
   private val defaultMaxMessageSize = 128
   private val defaultMinCleanableDirtyRatio = 0.0F
-  private val defaultCompactionLag = 0L
+  private val defaultMinCompactionLagMS = 0L
   private val defaultDeleteDelay = 1000
   private val defaultSegmentSize = 2048
+  private val defaultMaxCompactionLagMs = Long.MaxValue
 
   def time: MockTime
 
@@ -58,9 +59,10 @@ abstract class AbstractLogCleanerIntegrationTest {
   def logConfigProperties(propertyOverrides: Properties = new Properties(),
                           maxMessageSize: Int,
                           minCleanableDirtyRatio: Float = defaultMinCleanableDirtyRatio,
-                          compactionLag: Long = defaultCompactionLag,
+                          minCompactionLagMs: Long = defaultMinCompactionLagMS,
                           deleteDelay: Int = defaultDeleteDelay,
-                          segmentSize: Int = defaultSegmentSize): Properties = {
+                          segmentSize: Int = defaultSegmentSize,
+                          maxCompactionLagMs: Long = defaultMaxCompactionLagMs): Properties = {
     val props = new Properties()
     props.put(LogConfig.MaxMessageBytesProp, maxMessageSize: java.lang.Integer)
     props.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
@@ -69,7 +71,8 @@ abstract class AbstractLogCleanerIntegrationTest {
     props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
     props.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: java.lang.Float)
     props.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
-    props.put(LogConfig.MinCompactionLagMsProp, compactionLag: java.lang.Long)
+    props.put(LogConfig.MinCompactionLagMsProp, minCompactionLagMs: java.lang.Long)
+    props.put(LogConfig.MaxCompactionLagMsProp, maxCompactionLagMs: java.lang.Long)
     props ++= propertyOverrides
     props
   }
@@ -79,9 +82,10 @@ abstract class AbstractLogCleanerIntegrationTest {
                   numThreads: Int = 1,
                   backOffMs: Long = 15000L,
                   maxMessageSize: Int = defaultMaxMessageSize,
-                  compactionLag: Long = defaultCompactionLag,
+                  minCompactionLagMs: Long = defaultMinCompactionLagMS,
                   deleteDelay: Int = defaultDeleteDelay,
                   segmentSize: Int = defaultSegmentSize,
+                  maxCompactionLagMs: Long = defaultMaxCompactionLagMs,
                   cleanerIoBufferSize: Option[Int] = None,
                   propertyOverrides: Properties = new Properties()): LogCleaner = {
 
@@ -93,9 +97,10 @@ abstract class AbstractLogCleanerIntegrationTest {
       val logConfig = LogConfig(logConfigProperties(propertyOverrides,
         maxMessageSize = maxMessageSize,
         minCleanableDirtyRatio = minCleanableDirtyRatio,
-        compactionLag = compactionLag,
+        minCompactionLagMs = minCompactionLagMs,
         deleteDelay = deleteDelay,
-        segmentSize = segmentSize))
+        segmentSize = segmentSize,
+        maxCompactionLagMs = maxCompactionLagMs))
       val log = Log(dir,
         logConfig,
         logStartOffset = 0L,
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index bfee811167c..2d342facf52 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -25,9 +25,10 @@ import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS
 import org.apache.kafka.common.record.{CompressionType, RecordBatch}
-import org.junit.Assert.{assertFalse, assertTrue, fail}
+import org.junit.Assert._
 import org.junit.Test
 
+import scala.collection.{Iterable, JavaConverters, Seq}
 import scala.collection.JavaConverters.mapAsScalaMapConverter
 
 /**
@@ -93,4 +94,95 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest {
     assertTrue(uncleanablePartitions.contains(topicPartitions(1)))
     assertFalse(uncleanablePartitions.contains(topicPartitions(2)))
   }
+
+  @Test
+  def testMaxLogCompactionLag(): Unit = {
+    val msPerHour = 60 * 60 * 1000
+
+    val minCompactionLagMs = 1 * msPerHour
+    val maxCompactionLagMs = 6 * msPerHour
+
+    val cleanerBackOffMs = 200L
+    val segmentSize = 512
+    val topicPartitions = Array(new TopicPartition("log", 0), new TopicPartition("log", 1), new TopicPartition("log", 2))
+    val minCleanableDirtyRatio = 1.0F
+
+    cleaner = makeCleaner(partitions = topicPartitions,
+      backOffMs = cleanerBackOffMs,
+      minCompactionLagMs = minCompactionLagMs,
+      segmentSize = segmentSize,
+      maxCompactionLagMs= maxCompactionLagMs,
+      minCleanableDirtyRatio = minCleanableDirtyRatio)
+    val log = cleaner.logs.get(topicPartitions(0))
+
+    val T0 = time.milliseconds
+    writeKeyDups(numKeys = 100, numDups = 3, log, CompressionType.NONE, timestamp = T0, startValue = 0, step = 1)
+
+    val startSizeBlock0 = log.size
+
+    val activeSegAtT0 = log.activeSegment
+
+    cleaner.startup()
+
+    // advance to a time still less than maxCompactionLagMs from start
+    time.sleep(maxCompactionLagMs/2)
+    Thread.sleep(5 * cleanerBackOffMs) // give cleaning thread a chance to _not_ clean
+    assertEquals("There should be no cleaning until the max compaction lag has passed", startSizeBlock0, log.size)
+
+    // advance to time a bit more than one maxCompactionLagMs from start
+    time.sleep(maxCompactionLagMs/2 + 1)
+    val T1 = time.milliseconds
+
+    // write the second block of data: all zero keys
+    val appends1 = writeKeyDups(numKeys = 100, numDups = 1, log, CompressionType.NONE, timestamp = T1, startValue = 0, step = 0)
+
+    // roll the active segment
+    log.roll()
+    val activeSegAtT1 = log.activeSegment
+    val firstBlockCleanableSegmentOffset = activeSegAtT0.baseOffset
+
+    // the first block should get cleaned
+    cleaner.awaitCleaned(new TopicPartition("log", 0), firstBlockCleanableSegmentOffset)
+
+    val read1 = readFromLog(log)
+    val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(new TopicPartition("log", 0))
+    assertTrue(s"log cleaner should have processed at least to offset $firstBlockCleanableSegmentOffset, " +
+      s"but lastCleaned=$lastCleaned", lastCleaned >= firstBlockCleanableSegmentOffset)
+
+    //minCleanableDirtyRatio  will prevent second block of data from compacting
+    assertNotEquals(s"log should still contain non-zero keys", appends1, read1)
+
+    time.sleep(maxCompactionLagMs + 1)
+    // the second block should get cleaned. only zero keys left
+    cleaner.awaitCleaned(new TopicPartition("log", 0), activeSegAtT1.baseOffset)
+
+    val read2 = readFromLog(log)
+
+    assertEquals(s"log should only contains zero keys now", appends1, read2)
+
+    val lastCleaned2 = cleaner.cleanerManager.allCleanerCheckpoints(new TopicPartition("log", 0))
+    val secondBlockCleanableSegmentOffset = activeSegAtT1.baseOffset
+    assertTrue(s"log cleaner should have processed at least to offset $secondBlockCleanableSegmentOffset, " +
+      s"but lastCleaned=$lastCleaned2", lastCleaned2 >= secondBlockCleanableSegmentOffset)
+  }
+
+  private def readFromLog(log: Log): Iterable[(Int, Int)] = {
+    import JavaConverters._
+    for (segment <- log.logSegments; record <- segment.log.records.asScala) yield {
+      val key = TestUtils.readString(record.key).toInt
+      val value = TestUtils.readString(record.value).toInt
+      key -> value
+    }
+  }
+
+  private def writeKeyDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType, timestamp: Long, startValue: Int, step: Int): Seq[(Int, Int)] = {
+    var valCounter = startValue
+    for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
+      val curValue = valCounter
+      log.appendAsLeader(TestUtils.singletonRecords(value = curValue.toString.getBytes, codec = codec,
+        key = key.toString.getBytes, timestamp = timestamp), leaderEpoch = 0)
+      valCounter += step
+      (key, curValue)
+    }
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index 6e8c9b9d336..0232e5772d6 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -35,8 +35,8 @@ import scala.collection._
 class LogCleanerLagIntegrationTest(compressionCodecName: String) extends AbstractLogCleanerIntegrationTest with Logging {
   val msPerHour = 60 * 60 * 1000
 
-  val compactionLag = 1 * msPerHour
-  assertTrue("compactionLag must be divisible by 2 for this test", compactionLag % 2 == 0)
+  val minCompactionLag = 1 * msPerHour
+  assertTrue("compactionLag must be divisible by 2 for this test", minCompactionLag % 2 == 0)
 
   val time = new MockTime(1400000000000L, 1000L)  // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs`
   val cleanerBackOffMs = 200L
@@ -50,7 +50,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Abstrac
   def cleanerTest(): Unit = {
     cleaner = makeCleaner(partitions = topicPartitions,
       backOffMs = cleanerBackOffMs,
-      compactionLag = compactionLag,
+      minCompactionLagMs = minCompactionLag,
       segmentSize = segmentSize)
     val log = cleaner.logs.get(topicPartitions(0))
 
@@ -69,13 +69,13 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Abstrac
 
     // T0 < t < T1
     // advance to a time still less than one compaction lag from start
-    time.sleep(compactionLag/2)
+    time.sleep(minCompactionLag/2)
     Thread.sleep(5 * cleanerBackOffMs) // give cleaning thread a chance to _not_ clean
     assertEquals("There should be no cleaning until the compaction lag has passed", startSizeBlock0, log.size)
 
     // t = T1 > T0 + compactionLag
     // advance to time a bit more than one compaction lag from start
-    time.sleep(compactionLag/2 + 1)
+    time.sleep(minCompactionLag/2 + 1)
     val T1 = time.milliseconds
 
     // write another block of data
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 10dba1a4793..1c7fbd97c40 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -610,6 +610,7 @@ class KafkaConfigTest {
         case KafkaConfig.LogCleanerEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean")
         case KafkaConfig.LogCleanerDeleteRetentionMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.LogCleanerMinCompactionLagMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.LogCleanerMaxCompactionLagMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.LogCleanerMinCleanRatioProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.LogIndexSizeMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "3")
         case KafkaConfig.LogFlushIntervalMessagesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> ensure timely processing of deletion requests in Kafka topic (Time-based log compaction)
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7321
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7321
>             Project: Kafka
>          Issue Type: Improvement
>          Components: log
>            Reporter: xiongqi wu
>            Assignee: xiongqi wu
>            Priority: Major
>
> _Compaction enables Kafka to remove old messages that are flagged for deletion while other messages can be retained for a relatively longer time.  Today, a log segment may remain un-compacted for a long time since the eligibility for log compaction is determined based on compaction ratio (“min.cleanable.dirty.ratio”) and min compaction lag ("min.compaction.lag.ms") setting.  Ability to delete a log message through compaction in a timely manner has become an important requirement in some use cases (e.g., GDPR).  For example,  one use case is to delete PII (Personal Identifiable information) data within 7 days while keeping non-PII indefinitely in compacted format.  The goal of this change is to provide a time-based compaction policy that ensures the cleanable section is compacted after the specified time interval regardless of dirty ratio and “min compaction lag”.  However, dirty ratio and “min compaction lag” are still honored if the time based compaction rule is not violated. In other words, if Kafka receives a deletion request on a key (e..g, a key with null value), the corresponding log segment will be picked up for compaction after the configured time interval to remove the key._
>  
> _This is to track effort in KIP 354:_
> _https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Time-based+log+compaction+policy_



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)