Posted to commits@kafka.apache.org by jj...@apache.org on 2019/05/13 16:16:27 UTC

[kafka] branch trunk updated: KAFKA-7321: Add a Maximum Log Compaction Lag (KIP-354) (#6009)

This is an automated email from the ASF dual-hosted git repository.

jjkoshy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1fdc853  KAFKA-7321: Add a Maximum Log Compaction Lag (KIP-354) (#6009)
1fdc853 is described below

commit 1fdc8533016e948b1d534145978252209d7612ed
Author: Xiongqi Wu <xi...@linkedin.com>
AuthorDate: Mon May 13 09:16:12 2019 -0700

    KAFKA-7321: Add a Maximum Log Compaction Lag (KIP-354) (#6009)
    
    KAFKA-7321: Add a Maximum Log Compaction Lag (KIP-354)
    Records become eligible for compaction after the specified time interval.
    
    Author: Xiongqi Wu <xi...@linkedin.com>
    Reviewer: Joel Koshy <jj...@gmail.com>
---
 .../apache/kafka/common/config/TopicConfig.java    |  4 +
 core/src/main/scala/kafka/log/Log.scala            | 16 +++-
 core/src/main/scala/kafka/log/LogCleaner.scala     | 41 +++++++++-
 .../main/scala/kafka/log/LogCleanerManager.scala   | 51 +++++++++---
 core/src/main/scala/kafka/log/LogConfig.scala      | 24 +++++-
 core/src/main/scala/kafka/log/LogManager.scala     |  2 +
 core/src/main/scala/kafka/log/LogSegment.scala     | 34 ++++++--
 core/src/main/scala/kafka/server/KafkaConfig.scala |  5 ++
 core/src/main/scala/kafka/server/KafkaServer.scala |  1 +
 .../log/AbstractLogCleanerIntegrationTest.scala    | 19 +++--
 .../unit/kafka/log/LogCleanerIntegrationTest.scala | 94 +++++++++++++++++++++-
 .../kafka/log/LogCleanerLagIntegrationTest.scala   | 10 +--
 .../test/scala/unit/kafka/log/LogConfigTest.scala  | 20 +++--
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  1 +
 14 files changed, 281 insertions(+), 41 deletions(-)
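
For anyone trying the feature out, here is a minimal sketch of setting the new
topic-level property through the Java AdminClient. The broker address, topic
name, and the 24-hour value are placeholders, not part of this commit:

    import java.util.{Collections, Properties}

    import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, Config, ConfigEntry}
    import org.apache.kafka.common.config.{ConfigResource, TopicConfig}

    object SetMaxCompactionLag extends App {
      val props = new Properties()
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker
      val admin = AdminClient.create(props)
      try {
        val topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-compacted-topic") // placeholder topic
        // Require every record to become eligible for compaction within 24 hours.
        val lag = new ConfigEntry(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, (24L * 60 * 60 * 1000).toString)
        // alterConfigs replaces the resource's whole config set; on 2.3+ brokers
        // incrementalAlterConfigs is the safer call for a single override.
        admin.alterConfigs(Collections.singletonMap(topic, new Config(Collections.singletonList(lag)))).all().get()
      } finally {
        admin.close()
      }
    }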

diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index 57662d5..b1be6c8 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -113,6 +113,10 @@ public class TopicConfig {
         "higher ratio will mean fewer, more efficient cleanings but will mean more wasted " +
         "space in the log.";
 
+    public static final String MAX_COMPACTION_LAG_MS_CONFIG = "max.compaction.lag.ms";
+    public static final String MAX_COMPACTION_LAG_MS_DOC = "The maximum time a message will remain " +
+        "ineligible for compaction in the log. Only applicable for logs that are being compacted.";
+
     public static final String CLEANUP_POLICY_CONFIG = "cleanup.policy";
     public static final String CLEANUP_POLICY_COMPACT = "compact";
     public static final String CLEANUP_POLICY_DELETE = "delete";
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 149a4f0..ef786be 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -159,7 +159,7 @@ case class RollParams(maxSegmentMs: Long,
 
 object RollParams {
   def apply(config: LogConfig, appendInfo: LogAppendInfo, messagesSize: Int, now: Long): RollParams = {
-   new RollParams(config.segmentMs,
+   new RollParams(config.maxSegmentMs,
      config.segmentSize,
      appendInfo.maxTimestamp,
      appendInfo.lastOffset,
@@ -2028,6 +2028,20 @@ class Log(@volatile var dir: File,
   }
 
   /**
+    * This function does not acquire Log.lock. The caller must ensure that log segments are not deleted during
+    * this call, and must also guard against invoking this function on the same segment in parallel.
+    *
+    * Currently it is used only by LogCleaner threads, when compacting non-active segments, while holding
+    * LogCleanerManager's lock so that no other cleaner thread or the retention thread works on the same segment.
+    */
+  private[log] def getFirstBatchTimestampForSegments(segments: Iterable[LogSegment]): Iterable[Long] = {
+    segments.map {
+      segment =>
+        segment.getFirstBatchTimestamp()
+    }
+  }
+
+  /**
    * remove deleted log metrics
    */
   private[log] def removeLogMetrics(): Unit = {
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 9c8010c..b972388 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -132,6 +132,12 @@ class LogCleaner(initialConfig: CleanerConfig,
            new Gauge[Int] {
              def value: Int = cleaners.map(_.lastStats).map(_.elapsedSecs).max.toInt
            })
+  // a metric to track the delay between the time when a log is required to be compacted
+  // (as determined by the max compaction lag) and the time of the last cleaner run.
+  newGauge("max-compaction-delay-secs",
+          new Gauge[Int] {
+          def value: Int = Math.max(0, (cleaners.map(_.lastPreCleanStats).map(_.maxCompactionDelayMs).max / 1000).toInt)
+          })
 
   /**
    * Start the background cleaning
@@ -285,6 +291,7 @@ class LogCleaner(initialConfig: CleanerConfig,
                               checkDone = checkDone)
 
     @volatile var lastStats: CleanerStats = new CleanerStats()
+    @volatile var lastPreCleanStats: PreCleanStats = new PreCleanStats()
 
     private def checkDone(topicPartition: TopicPartition) {
       if (!isRunning)
@@ -310,10 +317,12 @@ class LogCleaner(initialConfig: CleanerConfig,
       var currentLog: Option[Log] = None
 
       try {
-        val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match {
+        val preCleanStats = new PreCleanStats()
+        val cleaned = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats) match {
           case None =>
             false
           case Some(cleanable) =>
+            this.lastPreCleanStats = preCleanStats
             // there's a log, clean it
             currentLog = Some(cleanable.log)
             cleanLog(cleanable)
@@ -386,6 +395,9 @@ class LogCleaner(initialConfig: CleanerConfig,
         "\t%.1f%% size reduction (%.1f%% fewer messages)%n".format(100.0 * (1.0 - stats.bytesWritten.toDouble/stats.bytesRead),
                                                                    100.0 * (1.0 - stats.messagesWritten.toDouble/stats.messagesRead))
       info(message)
+      if (lastPreCleanStats.delayedPartitions > 0) {
+        info("\tCleanable partitions: %d, Delayed partitions: %d, max delay: %d".format(lastPreCleanStats.cleanablePartitions, lastPreCleanStats.delayedPartitions, lastPreCleanStats.maxCompactionDelayMs))
+      }
       if (stats.invalidMessagesRead > 0) {
         warn("\tFound %d invalid messages during compaction.".format(stats.invalidMessagesRead))
       }
@@ -947,6 +959,25 @@ private[log] class Cleaner(val id: Int,
 }
 
 /**
+  * A simple struct for collecting pre-clean stats
+  */
+private class PreCleanStats() {
+  var maxCompactionDelayMs = 0L
+  var delayedPartitions = 0
+  var cleanablePartitions = 0
+
+  def updateMaxCompactionDelay(delayMs: Long): Unit = {
+    maxCompactionDelayMs = Math.max(maxCompactionDelayMs, delayMs)
+    if (delayMs > 0) {
+      delayedPartitions += 1
+    }
+  }
+  def recordCleanablePartitions(numOfCleanables: Int): Unit = {
+    cleanablePartitions = numOfCleanables
+  }
+}
+
+/**
  * A simple struct for collecting stats about log cleaning
  */
 private class CleanerStats(time: Time = Time.SYSTEM) {
@@ -999,9 +1030,11 @@ private class CleanerStats(time: Time = Time.SYSTEM) {
 }
 
 /**
- * Helper class for a log, its topic/partition, the first cleanable position, and the first uncleanable dirty position
- */
-private case class LogToClean(topicPartition: TopicPartition, log: Log, firstDirtyOffset: Long, uncleanableOffset: Long) extends Ordered[LogToClean] {
+  * Helper class for a log, its topic/partition, the first cleanable position, the first uncleanable dirty position,
+  * and whether it needs compaction immediately.
+  */
+private case class LogToClean(topicPartition: TopicPartition, log: Log, firstDirtyOffset: Long,
+                              uncleanableOffset: Long, needCompactionNow: Boolean = false) extends Ordered[LogToClean] {
   val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size.toLong).sum
   val (firstUncleanableOffset, cleanableBytes) = LogCleaner.calculateCleanableBytes(log, firstDirtyOffset, uncleanableOffset)
   val totalBytes = cleanBytes + cleanableBytes
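
The new max-compaction-delay-secs gauge is registered alongside the existing
LogCleaner gauges, so it should surface over JMX. A minimal in-process read,
assuming the usual kafka.log:type=LogCleaner object-name pattern (the exact
MBean name is an assumption, not verified against this commit):

    import java.lang.management.ManagementFactory
    import javax.management.ObjectName

    val mbs = ManagementFactory.getPlatformMBeanServer
    val gauge = new ObjectName("kafka.log:type=LogCleaner,name=max-compaction-delay-secs")
    // Yammer gauges expose their current value through the "Value" attribute.
    val delaySecs = mbs.getAttribute(gauge, "Value").asInstanceOf[Int]
    if (delaySecs > 0)
      println(s"compaction is running ${delaySecs}s behind the configured max lag")
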
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index e4559b8..f8dce22 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -165,7 +165,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     * each time from the full set of logs to allow logs to be dynamically added to the pool of logs
     * the log manager maintains.
     */
-  def grabFilthiestCompactedLog(time: Time): Option[LogToClean] = {
+  def grabFilthiestCompactedLog(time: Time, preCleanStats: PreCleanStats = new PreCleanStats()): Option[LogToClean] = {
     inLock(lock) {
       val now = time.milliseconds
       this.timeOfLastRun = now
@@ -178,17 +178,24 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
           inProgress.contains(topicPartition) || isUncleanablePartition(log, topicPartition)
       }.map {
         case (topicPartition, log) => // create a LogToClean instance for each
-          val (firstDirtyOffset, firstUncleanableDirtyOffset) = LogCleanerManager.cleanableOffsets(log, topicPartition,
-            lastClean, now)
-          LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset)
+          val (firstDirtyOffset, firstUncleanableDirtyOffset) =
+            LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, now)
+
+          val compactionDelayMs = LogCleanerManager.getMaxCompactionDelay(log, firstDirtyOffset, now)
+          preCleanStats.updateMaxCompactionDelay(compactionDelayMs)
+
+          LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset, compactionDelayMs > 0)
       }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
 
       this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0
-      // and must meet the minimum threshold for dirty byte ratio
-      val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio)
+      // and must meet the minimum threshold for dirty byte ratio or have some bytes required to be compacted
+      val cleanableLogs = dirtyLogs.filter { ltc =>
+        (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
+      }
       if(cleanableLogs.isEmpty) {
         None
       } else {
+        preCleanStats.recordCleanablePartitions(cleanableLogs.size)
         val filthiest = cleanableLogs.max
         inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
         Some(filthiest)
@@ -476,6 +483,30 @@ private[log] object LogCleanerManager extends Logging {
     log.config.compact && log.config.delete
   }
 
+  /**
+    * Returns the maximum delay between the time when a log is required to be compacted
+    * (as determined by maxCompactionLagMs) and the current time.
+    */
+  def getMaxCompactionDelay(log: Log, firstDirtyOffset: Long, now: Long) : Long = {
+
+    val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset)
+
+    val firstBatchTimestamps = log.getFirstBatchTimestampForSegments(dirtyNonActiveSegments).filter(_ > 0)
+
+    val earliestDirtySegmentTimestamp = {
+      if (firstBatchTimestamps.nonEmpty)
+        firstBatchTimestamps.min
+      else Long.MaxValue
+    }
+
+    val maxCompactionLagMs = math.max(log.config.maxCompactionLagMs, 0L)
+    val cleanUntilTime = now - maxCompactionLagMs
+
+    if (earliestDirtySegmentTimestamp < cleanUntilTime)
+      cleanUntilTime - earliestDirtySegmentTimestamp
+    else
+      0L
+  }
 
   /**
     * Returns the range of dirty offsets that can be cleaned.
@@ -505,7 +536,7 @@ private[log] object LogCleanerManager extends Logging {
       }
     }
 
-    val compactionLagMs = math.max(log.config.compactionLagMs, 0L)
+    val minCompactionLagMs = math.max(log.config.compactionLagMs, 0L)
 
     // find first segment that cannot be cleaned
     // neither the active segment, nor segments with any messages closer to the head of the log than the minimum compaction lag time
@@ -519,12 +550,12 @@ private[log] object LogCleanerManager extends Logging {
       Option(log.activeSegment.baseOffset),
 
       // the first segment whose largest message timestamp is within a minimum time lag from now
-      if (compactionLagMs > 0) {
+      if (minCompactionLagMs > 0) {
         // dirty log segments
         val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset)
         dirtyNonActiveSegments.find { s =>
-          val isUncleanable = s.largestTimestamp > now - compactionLagMs
-          debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - compactionLagMs}; is uncleanable=$isUncleanable")
+          val isUncleanable = s.largestTimestamp > now - minCompactionLagMs
+          debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - minCompactionLagMs}; is uncleanable=$isUncleanable")
           isUncleanable
         }.map(_.baseOffset)
       } else None
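
To make the arithmetic in getMaxCompactionDelay concrete, here is a small
worked example; every value below is made up:

    val now = 1000000L                          // current clock (ms)
    val maxCompactionLagMs = 600L               // hypothetical max.compaction.lag.ms
    val earliestDirtySegmentTimestamp = 999000L // first batch timestamp of the oldest dirty segment

    val cleanUntilTime = now - maxCompactionLagMs // 999400: anything older is overdue
    val delayMs =
      if (earliestDirtySegmentTimestamp < cleanUntilTime)
        cleanUntilTime - earliestDirtySegmentTimestamp // 400 ms past the deadline
      else 0L
    // A positive delay marks the log with needCompactionNow = true, which lets it
    // bypass the min.cleanable.dirty.ratio filter in grabFilthiestCompactedLog.
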
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index c1e5c62..c3684e8 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -46,6 +46,7 @@ object Defaults {
   val FileDeleteDelayMs = kafka.server.Defaults.LogDeleteDelayMs
   val DeleteRetentionMs = kafka.server.Defaults.LogCleanerDeleteRetentionMs
   val MinCompactionLagMs = kafka.server.Defaults.LogCleanerMinCompactionLagMs
+  val MaxCompactionLagMs = kafka.server.Defaults.LogCleanerMaxCompactionLagMs
   val MinCleanableDirtyRatio = kafka.server.Defaults.LogCleanerMinCleanRatio
 
   @deprecated(message = "This is a misleading variable name as it actually refers to the 'delete' cleanup policy. Use " +
@@ -85,6 +86,7 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
   val fileDeleteDelayMs = getLong(LogConfig.FileDeleteDelayMsProp)
   val deleteRetentionMs = getLong(LogConfig.DeleteRetentionMsProp)
   val compactionLagMs = getLong(LogConfig.MinCompactionLagMsProp)
+  val maxCompactionLagMs = getLong(LogConfig.MaxCompactionLagMsProp)
   val minCleanableRatio = getDouble(LogConfig.MinCleanableDirtyRatioProp)
   val compact = getList(LogConfig.CleanupPolicyProp).asScala.map(_.toLowerCase(Locale.ROOT)).contains(LogConfig.Compact)
   val delete = getList(LogConfig.CleanupPolicyProp).asScala.map(_.toLowerCase(Locale.ROOT)).contains(LogConfig.Delete)
@@ -101,6 +103,11 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
 
   def randomSegmentJitter: Long =
     if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs)
+
+  def maxSegmentMs: Long = {
+    if (compact && maxCompactionLagMs > 0) math.min(maxCompactionLagMs, segmentMs)
+    else segmentMs
+  }
 }
 
 object LogConfig {
@@ -121,6 +128,7 @@ object LogConfig {
   val IndexIntervalBytesProp = TopicConfig.INDEX_INTERVAL_BYTES_CONFIG
   val DeleteRetentionMsProp = TopicConfig.DELETE_RETENTION_MS_CONFIG
   val MinCompactionLagMsProp = TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG
+  val MaxCompactionLagMsProp = TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG
   val FileDeleteDelayMsProp = TopicConfig.FILE_DELETE_DELAY_MS_CONFIG
   val MinCleanableDirtyRatioProp = TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG
   val CleanupPolicyProp = TopicConfig.CLEANUP_POLICY_CONFIG
@@ -152,6 +160,7 @@ object LogConfig {
   val FileDeleteDelayMsDoc = TopicConfig.FILE_DELETE_DELAY_MS_DOC
   val DeleteRetentionMsDoc = TopicConfig.DELETE_RETENTION_MS_DOC
   val MinCompactionLagMsDoc = TopicConfig.MIN_COMPACTION_LAG_MS_DOC
+  val MaxCompactionLagMsDoc = TopicConfig.MAX_COMPACTION_LAG_MS_DOC
   val MinCleanableRatioDoc = TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_DOC
   val CompactDoc = TopicConfig.CLEANUP_POLICY_DOC
   val UncleanLeaderElectionEnableDoc = TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_DOC
@@ -242,6 +251,8 @@ object LogConfig {
         DeleteRetentionMsDoc, KafkaConfig.LogCleanerDeleteRetentionMsProp)
       .define(MinCompactionLagMsProp, LONG, Defaults.MinCompactionLagMs, atLeast(0), MEDIUM, MinCompactionLagMsDoc,
         KafkaConfig.LogCleanerMinCompactionLagMsProp)
+      .define(MaxCompactionLagMsProp, LONG, Defaults.MaxCompactionLagMs, atLeast(1), MEDIUM, MaxCompactionLagMsDoc,
+        KafkaConfig.LogCleanerMaxCompactionLagMsProp)
       .define(FileDeleteDelayMsProp, LONG, Defaults.FileDeleteDelayMs, atLeast(0), MEDIUM, FileDeleteDelayMsDoc,
         KafkaConfig.LogDeleteDelayMsProp)
       .define(MinCleanableDirtyRatioProp, DOUBLE, Defaults.MinCleanableDirtyRatio, between(0, 1), MEDIUM,
@@ -299,12 +310,22 @@ object LogConfig {
 
   private[kafka] def configKeys: Map[String, ConfigKey] = configDef.configKeys.asScala
 
+  def validateValues(props: java.util.Map[_, _]): Unit = {
+    val minCompactionLag = props.get(MinCompactionLagMsProp).asInstanceOf[Long]
+    val maxCompactionLag = props.get(MaxCompactionLagMsProp).asInstanceOf[Long]
+    if (minCompactionLag > maxCompactionLag) {
+      throw new InvalidConfigurationException(s"conflicting topic config settings: $MinCompactionLagMsProp " +
+        s"($minCompactionLag) > $MaxCompactionLagMsProp ($maxCompactionLag)")
+    }
+    }
+  }
+
   /**
    * Check that the given properties contain only valid log config names and that all values can be parsed and are valid
    */
   def validate(props: Properties) {
     validateNames(props)
-    configDef.parse(props)
+    val valueMaps = configDef.parse(props)
+    validateValues(valueMaps)
   }
 
   /**
@@ -324,6 +345,7 @@ object LogConfig {
     IndexIntervalBytesProp -> KafkaConfig.LogIndexIntervalBytesProp,
     DeleteRetentionMsProp -> KafkaConfig.LogCleanerDeleteRetentionMsProp,
     MinCompactionLagMsProp -> KafkaConfig.LogCleanerMinCompactionLagMsProp,
+    MaxCompactionLagMsProp -> KafkaConfig.LogCleanerMaxCompactionLagMsProp,
     FileDeleteDelayMsProp -> KafkaConfig.LogDeleteDelayMsProp,
     MinCleanableDirtyRatioProp -> KafkaConfig.LogCleanerMinCleanRatioProp,
     CleanupPolicyProp -> KafkaConfig.LogCleanupPolicyProp,
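
The maxSegmentMs override above is what makes the lag enforceable: records in
the active segment can never be compacted, so the active segment must roll at
least once per lag interval. A sketch of the interaction, with hypothetical
values:

    // Effective roll interval for a compacted topic.
    val segmentMs = 7L * 24 * 60 * 60 * 1000        // segment.ms = 7 days
    val maxCompactionLagMs = 24L * 60 * 60 * 1000   // max.compaction.lag.ms = 1 day
    val compact = true

    val maxSegmentMs =
      if (compact && maxCompactionLagMs > 0) math.min(maxCompactionLagMs, segmentMs)
      else segmentMs
    // maxSegmentMs == 86400000 (1 day): the segment rolls early enough for its
    // records to reach the cleaner before the compaction deadline.
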
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index cae47f7..5a16193 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -994,6 +994,8 @@ object LogManager {
             brokerTopicStats: BrokerTopicStats,
             logDirFailureChannel: LogDirFailureChannel): LogManager = {
     val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
+
+    LogConfig.validateValues(defaultProps)
     val defaultLogConfig = LogConfig(defaultProps)
 
     // read the log configurations from zookeeper
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 624f3ea..ecd85f9 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -95,8 +95,9 @@ class LogSegment private[log] (val log: FileRecords,
   /* the number of bytes since we last added an entry in the offset index */
   private var bytesSinceLastIndexEntry = 0
 
-  /* The timestamp we used for time based log rolling */
-  private var rollingBasedTimestamp: Option[Long] = None
+  // The timestamp we used for time based log rolling and for ensuring max compaction delay
+  // volatile for LogCleaner to see the update
+  @volatile private var rollingBasedTimestamp: Option[Long] = None
 
   /* The maximum timestamp we see so far */
   @volatile private var _maxTimestampSoFar: Option[Long] = None
@@ -523,6 +524,18 @@ class LogSegment private[log] (val log: FileRecords,
   }
 
   /**
+    * If not previously loaded,
+    * load the timestamp of the first message into memory.
+    */
+  private def loadFirstBatchTimestamp(): Unit = {
+    if (rollingBasedTimestamp.isEmpty) {
+      val iter = log.batches.iterator()
+      if (iter.hasNext)
+        rollingBasedTimestamp = Some(iter.next().maxTimestamp)
+    }
+  }
+
+  /**
    * The time this segment has waited to be rolled.
    * If the first message batch has a timestamp we use its timestamp to determine when to roll a segment. A segment
    * is rolled if the difference between the new batch's timestamp and the first batch's timestamp exceeds the
@@ -533,11 +546,7 @@ class LogSegment private[log] (val log: FileRecords,
    */
   def timeWaitedForRoll(now: Long, messageTimestamp: Long) : Long = {
     // Load the timestamp of the first message into memory
-    if (rollingBasedTimestamp.isEmpty) {
-      val iter = log.batches.iterator()
-      if (iter.hasNext)
-        rollingBasedTimestamp = Some(iter.next().maxTimestamp)
-    }
+    loadFirstBatchTimestamp()
     rollingBasedTimestamp match {
       case Some(t) if t >= 0 => messageTimestamp - t
       case _ => now - created
@@ -545,6 +554,17 @@ class LogSegment private[log] (val log: FileRecords,
   }
 
   /**
+    * @return the first batch timestamp if it is available; otherwise Long.MaxValue
+    */
+  def getFirstBatchTimestamp(): Long = {
+    loadFirstBatchTimestamp()
+    rollingBasedTimestamp match {
+      case Some(t) if t >= 0 => t
+      case _ => Long.MaxValue
+    }
+  }
+
+  /**
    * Search the message offset based on timestamp and offset.
    *
    * This method returns an option of TimestampOffset. The returned value is determined using the following ordered list of rules:
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 04f4f3b..526e174 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -103,6 +103,7 @@ object Defaults {
   val LogCleanerEnable = true
   val LogCleanerDeleteRetentionMs = 24 * 60 * 60 * 1000L
   val LogCleanerMinCompactionLagMs = 0L
+  val LogCleanerMaxCompactionLagMs = Long.MaxValue
   val LogIndexSizeMaxBytes = 10 * 1024 * 1024
   val LogIndexIntervalBytes = 4096
   val LogFlushIntervalMessages = Long.MaxValue
@@ -328,6 +329,7 @@ object KafkaConfig {
   val LogCleanerEnableProp = "log.cleaner.enable"
   val LogCleanerDeleteRetentionMsProp = "log.cleaner.delete.retention.ms"
   val LogCleanerMinCompactionLagMsProp = "log.cleaner.min.compaction.lag.ms"
+  val LogCleanerMaxCompactionLagMsProp = "log.cleaner.max.compaction.lag.ms"
   val LogIndexSizeMaxBytesProp = "log.index.size.max.bytes"
   val LogIndexIntervalBytesProp = "log.index.interval.bytes"
   val LogFlushIntervalMessagesProp = "log.flush.interval.messages"
@@ -615,6 +617,7 @@ object KafkaConfig {
   val LogCleanerEnableDoc = "Enable the log cleaner process to run on the server. Should be enabled if using any topics with a cleanup.policy=compact including the internal offsets topic. If disabled those topics will not be compacted and continually grow in size."
   val LogCleanerDeleteRetentionMsDoc = "How long are delete records retained?"
   val LogCleanerMinCompactionLagMsDoc = "The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted."
+  val LogCleanerMaxCompactionLagMsDoc = "The maximum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted."
   val LogIndexSizeMaxBytesDoc = "The maximum size in bytes of the offset index"
   val LogIndexIntervalBytesDoc = "The interval with which we add an entry to the offset index"
   val LogFlushIntervalMessagesDoc = "The number of messages accumulated on a log partition before messages are flushed to disk "
@@ -917,6 +920,7 @@ object KafkaConfig {
       .define(LogCleanerEnableProp, BOOLEAN, Defaults.LogCleanerEnable, MEDIUM, LogCleanerEnableDoc)
       .define(LogCleanerDeleteRetentionMsProp, LONG, Defaults.LogCleanerDeleteRetentionMs, MEDIUM, LogCleanerDeleteRetentionMsDoc)
       .define(LogCleanerMinCompactionLagMsProp, LONG, Defaults.LogCleanerMinCompactionLagMs, MEDIUM, LogCleanerMinCompactionLagMsDoc)
+      .define(LogCleanerMaxCompactionLagMsProp, LONG, Defaults.LogCleanerMaxCompactionLagMs, MEDIUM, LogCleanerMaxCompactionLagMsDoc)
       .define(LogIndexSizeMaxBytesProp, INT, Defaults.LogIndexSizeMaxBytes, atLeast(4), MEDIUM, LogIndexSizeMaxBytesDoc)
       .define(LogIndexIntervalBytesProp, INT, Defaults.LogIndexIntervalBytes, atLeast(0), MEDIUM, LogIndexIntervalBytesDoc)
       .define(LogFlushIntervalMessagesProp, LONG, Defaults.LogFlushIntervalMessages, atLeast(1), HIGH, LogFlushIntervalMessagesDoc)
@@ -1202,6 +1206,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   val logCleanerIoMaxBytesPerSecond = getDouble(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp)
   def logCleanerDeleteRetentionMs = getLong(KafkaConfig.LogCleanerDeleteRetentionMsProp)
   def logCleanerMinCompactionLagMs = getLong(KafkaConfig.LogCleanerMinCompactionLagMsProp)
+  def logCleanerMaxCompactionLagMs = getLong(KafkaConfig.LogCleanerMaxCompactionLagMsProp)
   val logCleanerBackoffMs = getLong(KafkaConfig.LogCleanerBackoffMsProp)
   def logCleanerMinCleanRatio = getDouble(KafkaConfig.LogCleanerMinCleanRatioProp)
   val logCleanerEnable = getBoolean(KafkaConfig.LogCleanerEnableProp)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 4a25811..07ffe9d 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -70,6 +70,7 @@ object KafkaServer {
     logProps.put(LogConfig.IndexIntervalBytesProp, kafkaConfig.logIndexIntervalBytes)
     logProps.put(LogConfig.DeleteRetentionMsProp, kafkaConfig.logCleanerDeleteRetentionMs)
     logProps.put(LogConfig.MinCompactionLagMsProp, kafkaConfig.logCleanerMinCompactionLagMs)
+    logProps.put(LogConfig.MaxCompactionLagMsProp, kafkaConfig.logCleanerMaxCompactionLagMs)
     logProps.put(LogConfig.FileDeleteDelayMsProp, kafkaConfig.logDeleteDelayMs)
     logProps.put(LogConfig.MinCleanableDirtyRatioProp, kafkaConfig.logCleanerMinCleanRatio)
     logProps.put(LogConfig.CleanupPolicyProp, kafkaConfig.logCleanupPolicy)
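
At the broker level the feature is driven by log.cleaner.max.compaction.lag.ms,
which copyKafkaConfigToLog above maps onto the topic-level default. A sketch of
overriding the broker-wide default; the 7-day value is an arbitrary example
(the shipped default is Long.MaxValue, i.e. effectively disabled):

    import java.util.Properties

    val brokerProps = new Properties()
    // Inherited by every compacted topic that does not set its own
    // max.compaction.lag.ms override.
    brokerProps.put("log.cleaner.max.compaction.lag.ms", (7L * 24 * 60 * 60 * 1000).toString)
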
diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index e778336..fe98ebf 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -43,9 +43,10 @@ abstract class AbstractLogCleanerIntegrationTest {
   private val logs = ListBuffer.empty[Log]
   private val defaultMaxMessageSize = 128
   private val defaultMinCleanableDirtyRatio = 0.0F
-  private val defaultCompactionLag = 0L
+  private val defaultMinCompactionLagMs = 0L
   private val defaultDeleteDelay = 1000
   private val defaultSegmentSize = 2048
+  private val defaultMaxCompactionLagMs = Long.MaxValue
 
   def time: MockTime
 
@@ -61,9 +62,10 @@ abstract class AbstractLogCleanerIntegrationTest {
   def logConfigProperties(propertyOverrides: Properties = new Properties(),
                           maxMessageSize: Int,
                           minCleanableDirtyRatio: Float = defaultMinCleanableDirtyRatio,
-                          compactionLag: Long = defaultCompactionLag,
+                          minCompactionLagMs: Long = defaultMinCompactionLagMs,
                           deleteDelay: Int = defaultDeleteDelay,
-                          segmentSize: Int = defaultSegmentSize): Properties = {
+                          segmentSize: Int = defaultSegmentSize,
+                          maxCompactionLagMs: Long = defaultMaxCompactionLagMs): Properties = {
     val props = new Properties()
     props.put(LogConfig.MaxMessageBytesProp, maxMessageSize: java.lang.Integer)
     props.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
@@ -72,7 +74,8 @@ abstract class AbstractLogCleanerIntegrationTest {
     props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
     props.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: java.lang.Float)
     props.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
-    props.put(LogConfig.MinCompactionLagMsProp, compactionLag: java.lang.Long)
+    props.put(LogConfig.MinCompactionLagMsProp, minCompactionLagMs: java.lang.Long)
+    props.put(LogConfig.MaxCompactionLagMsProp, maxCompactionLagMs: java.lang.Long)
     props ++= propertyOverrides
     props
   }
@@ -82,9 +85,10 @@ abstract class AbstractLogCleanerIntegrationTest {
                   numThreads: Int = 1,
                   backOffMs: Long = 15000L,
                   maxMessageSize: Int = defaultMaxMessageSize,
-                  compactionLag: Long = defaultCompactionLag,
+                  minCompactionLagMs: Long = defaultMinCompactionLagMs,
                   deleteDelay: Int = defaultDeleteDelay,
                   segmentSize: Int = defaultSegmentSize,
+                  maxCompactionLagMs: Long = defaultMaxCompactionLagMs,
                   cleanerIoBufferSize: Option[Int] = None,
                   propertyOverrides: Properties = new Properties()): LogCleaner = {
 
@@ -96,9 +100,10 @@ abstract class AbstractLogCleanerIntegrationTest {
       val logConfig = LogConfig(logConfigProperties(propertyOverrides,
         maxMessageSize = maxMessageSize,
         minCleanableDirtyRatio = minCleanableDirtyRatio,
-        compactionLag = compactionLag,
+        minCompactionLagMs = minCompactionLagMs,
         deleteDelay = deleteDelay,
-        segmentSize = segmentSize))
+        segmentSize = segmentSize,
+        maxCompactionLagMs = maxCompactionLagMs))
       val log = Log(dir,
         logConfig,
         logStartOffset = 0L,
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index bfee811..2d342fa 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -25,9 +25,10 @@ import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS
 import org.apache.kafka.common.record.{CompressionType, RecordBatch}
-import org.junit.Assert.{assertFalse, assertTrue, fail}
+import org.junit.Assert._
 import org.junit.Test
 
+import scala.collection.{Iterable, JavaConverters, Seq}
 import scala.collection.JavaConverters.mapAsScalaMapConverter
 
 /**
@@ -93,4 +94,95 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest {
     assertTrue(uncleanablePartitions.contains(topicPartitions(1)))
     assertFalse(uncleanablePartitions.contains(topicPartitions(2)))
   }
+
+  @Test
+  def testMaxLogCompactionLag(): Unit = {
+    val msPerHour = 60 * 60 * 1000
+
+    val minCompactionLagMs = 1 * msPerHour
+    val maxCompactionLagMs = 6 * msPerHour
+
+    val cleanerBackOffMs = 200L
+    val segmentSize = 512
+    val topicPartitions = Array(new TopicPartition("log", 0), new TopicPartition("log", 1), new TopicPartition("log", 2))
+    val minCleanableDirtyRatio = 1.0F
+
+    cleaner = makeCleaner(partitions = topicPartitions,
+      backOffMs = cleanerBackOffMs,
+      minCompactionLagMs = minCompactionLagMs,
+      segmentSize = segmentSize,
+      maxCompactionLagMs = maxCompactionLagMs,
+      minCleanableDirtyRatio = minCleanableDirtyRatio)
+    val log = cleaner.logs.get(topicPartitions(0))
+
+    val T0 = time.milliseconds
+    writeKeyDups(numKeys = 100, numDups = 3, log, CompressionType.NONE, timestamp = T0, startValue = 0, step = 1)
+
+    val startSizeBlock0 = log.size
+
+    val activeSegAtT0 = log.activeSegment
+
+    cleaner.startup()
+
+    // advance to a time still less than maxCompactionLagMs from start
+    time.sleep(maxCompactionLagMs/2)
+    Thread.sleep(5 * cleanerBackOffMs) // give cleaning thread a chance to _not_ clean
+    assertEquals("There should be no cleaning until the max compaction lag has passed", startSizeBlock0, log.size)
+
+    // advance to time a bit more than one maxCompactionLagMs from start
+    time.sleep(maxCompactionLagMs/2 + 1)
+    val T1 = time.milliseconds
+
+    // write the second block of data: all zero keys
+    val appends1 = writeKeyDups(numKeys = 100, numDups = 1, log, CompressionType.NONE, timestamp = T1, startValue = 0, step = 0)
+
+    // roll the active segment
+    log.roll()
+    val activeSegAtT1 = log.activeSegment
+    val firstBlockCleanableSegmentOffset = activeSegAtT0.baseOffset
+
+    // the first block should get cleaned
+    cleaner.awaitCleaned(new TopicPartition("log", 0), firstBlockCleanableSegmentOffset)
+
+    val read1 = readFromLog(log)
+    val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(new TopicPartition("log", 0))
+    assertTrue(s"log cleaner should have processed at least to offset $firstBlockCleanableSegmentOffset, " +
+      s"but lastCleaned=$lastCleaned", lastCleaned >= firstBlockCleanableSegmentOffset)
+
+    // minCleanableDirtyRatio will prevent the second block of data from being compacted
+    assertNotEquals("log should still contain non-zero keys", appends1, read1)
+
+    time.sleep(maxCompactionLagMs + 1)
+    // the second block should get cleaned. only zero keys left
+    cleaner.awaitCleaned(new TopicPartition("log", 0), activeSegAtT1.baseOffset)
+
+    val read2 = readFromLog(log)
+
+    assertEquals(s"log should only contains zero keys now", appends1, read2)
+
+    val lastCleaned2 = cleaner.cleanerManager.allCleanerCheckpoints(new TopicPartition("log", 0))
+    val secondBlockCleanableSegmentOffset = activeSegAtT1.baseOffset
+    assertTrue(s"log cleaner should have processed at least to offset $secondBlockCleanableSegmentOffset, " +
+      s"but lastCleaned=$lastCleaned2", lastCleaned2 >= secondBlockCleanableSegmentOffset)
+  }
+
+  private def readFromLog(log: Log): Iterable[(Int, Int)] = {
+    import JavaConverters._
+    for (segment <- log.logSegments; record <- segment.log.records.asScala) yield {
+      val key = TestUtils.readString(record.key).toInt
+      val value = TestUtils.readString(record.value).toInt
+      key -> value
+    }
+  }
+
+  private def writeKeyDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType, timestamp: Long, startValue: Int, step: Int): Seq[(Int, Int)] = {
+    var valCounter = startValue
+    for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
+      val curValue = valCounter
+      log.appendAsLeader(TestUtils.singletonRecords(value = curValue.toString.getBytes, codec = codec,
+        key = key.toString.getBytes, timestamp = timestamp), leaderEpoch = 0)
+      valCounter += step
+      (key, curValue)
+    }
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index 6e8c9b9..0232e57 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -35,8 +35,8 @@ import scala.collection._
 class LogCleanerLagIntegrationTest(compressionCodecName: String) extends AbstractLogCleanerIntegrationTest with Logging {
   val msPerHour = 60 * 60 * 1000
 
-  val compactionLag = 1 * msPerHour
-  assertTrue("compactionLag must be divisible by 2 for this test", compactionLag % 2 == 0)
+  val minCompactionLag = 1 * msPerHour
+  assertTrue("compactionLag must be divisible by 2 for this test", minCompactionLag % 2 == 0)
 
   val time = new MockTime(1400000000000L, 1000L)  // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs`
   val cleanerBackOffMs = 200L
@@ -50,7 +50,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Abstrac
   def cleanerTest(): Unit = {
     cleaner = makeCleaner(partitions = topicPartitions,
       backOffMs = cleanerBackOffMs,
-      compactionLag = compactionLag,
+      minCompactionLagMs = minCompactionLag,
       segmentSize = segmentSize)
     val log = cleaner.logs.get(topicPartitions(0))
 
@@ -69,13 +69,13 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Abstrac
 
     // T0 < t < T1
     // advance to a time still less than one compaction lag from start
-    time.sleep(compactionLag/2)
+    time.sleep(minCompactionLag/2)
     Thread.sleep(5 * cleanerBackOffMs) // give cleaning thread a chance to _not_ clean
     assertEquals("There should be no cleaning until the compaction lag has passed", startSizeBlock0, log.size)
 
     // t = T1 > T0 + compactionLag
     // advance to time a bit more than one compaction lag from start
-    time.sleep(compactionLag/2 + 1)
+    time.sleep(minCompactionLag/2 + 1)
     val T1 = time.milliseconds
 
     // write another block of data
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 66702d6..2cd7904 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -28,11 +28,11 @@ import org.scalatest.Assertions._
 
 class LogConfigTest {
 
-  /** 
-   * This test verifies that KafkaConfig object initialization does not depend on 
-   * LogConfig initialization. Bad things happen due to static initialization 
-   * order dependencies. For example, LogConfig.configDef ends up adding null 
-   * values in serverDefaultConfigNames. This test ensures that the mapping of 
+  /**
+   * This test verifies that KafkaConfig object initialization does not depend on
+   * LogConfig initialization. Bad things happen due to static initialization
+   * order dependencies. For example, LogConfig.configDef ends up adding null
+   * values in serverDefaultConfigNames. This test ensures that the mapping of
    * keys from LogConfig to KafkaConfig are not missing values.
    */
   @Test
@@ -82,6 +82,16 @@ class LogConfigTest {
   }
 
   @Test
+  def testInvalidCompactionLagConfig(): Unit = {
+    val props = new Properties
+    props.setProperty(LogConfig.MaxCompactionLagMsProp, "100")
+    props.setProperty(LogConfig.MinCompactionLagMsProp, "200")
+    intercept[Exception] {
+      LogConfig.validate(props)
+    }
+  }
+
+  @Test
   def shouldValidateThrottledReplicasConfig() {
     assertTrue(isValid("*"))
     assertTrue(isValid("* "))
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 1d7e687..9d28c1b 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -635,6 +635,7 @@ class KafkaConfigTest {
         case KafkaConfig.LogCleanerEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean")
         case KafkaConfig.LogCleanerDeleteRetentionMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.LogCleanerMinCompactionLagMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.LogCleanerMaxCompactionLagMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.LogCleanerMinCleanRatioProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.LogIndexSizeMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "3")
         case KafkaConfig.LogFlushIntervalMessagesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")