Posted to commits@kafka.apache.org by ju...@apache.org on 2016/08/25 15:16:01 UTC

kafka git commit: KAFKA-4015; Change cleanup.policy config to accept a list of valid policies

Repository: kafka
Updated Branches:
  refs/heads/trunk 151288fe1 -> 36044fdc5


KAFKA-4015; Change cleanup.policy config to accept a list of valid policies

Change cleanup.policy to accept a comma-separated list of valid policies.
Updated LogCleaner.CleanerThread to also run deletion for any topics configured with compact,delete.
Ensure Log.deleteSegments only runs when delete is enabled.
Added integration and unit tests to cover the new option.
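
For illustration, a minimal sketch of the per-topic usage this enables (property values are examples only; LogConfig as in the diff below):

    import java.util.Properties
    import kafka.log.LogConfig

    val logProps = new Properties()
    logProps.put(LogConfig.RetentionMsProp, 100000: Integer)    // retention still drives the "delete" half
    logProps.put(LogConfig.CleanupPolicyProp, "compact,delete") // the new comma-separated form
    val config = LogConfig(logProps)
    // config.compact and config.delete are now both true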

Author: Damian Guy <da...@gmail.com>

Reviewers: Grant Henke <gr...@gmail.com>, Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #1742 from dguy/kafka-4015


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/36044fdc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/36044fdc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/36044fdc

Branch: refs/heads/trunk
Commit: 36044fdc544f1b6b4829558691e0e7d280abce99
Parents: 151288f
Author: Damian Guy <da...@gmail.com>
Authored: Thu Aug 25 08:15:55 2016 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Aug 25 08:15:55 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/common/config/ConfigDef.java   |  25 ++++
 .../kafka/common/config/ConfigDefTest.java      |   1 +
 core/src/main/scala/kafka/log/Log.scala         |  50 ++++++-
 core/src/main/scala/kafka/log/LogCleaner.scala  |  18 ++-
 .../scala/kafka/log/LogCleanerManager.scala     |  38 +++++-
 core/src/main/scala/kafka/log/LogConfig.scala   |   8 +-
 core/src/main/scala/kafka/log/LogManager.scala  |  40 +-----
 .../main/scala/kafka/server/KafkaConfig.scala   |   7 +-
 .../test/scala/unit/kafka/KafkaConfigTest.scala |   9 +-
 .../test/scala/unit/kafka/log/CleanerTest.scala |   6 +-
 .../kafka/log/LogCleanerIntegrationTest.scala   |  56 +++++++-
 .../unit/kafka/log/LogCleanerManagerTest.scala  | 117 +++++++++++++++++
 .../scala/unit/kafka/log/LogManagerTest.scala   |  25 ++++
 .../src/test/scala/unit/kafka/log/LogTest.scala | 130 +++++++++++++++++--
 14 files changed, 452 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/36044fdc/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 073f474..7174736 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -808,6 +808,31 @@ public class ConfigDef {
         }
     }
 
+    public static class ValidList implements Validator {
+
+        ValidString validString;
+
+        private ValidList(List<String> validStrings) {
+            this.validString = new ValidString(validStrings);
+        }
+
+        public static ValidList in(String... validStrings) {
+            return new ValidList(Arrays.asList(validStrings));
+        }
+
+        @Override
+        public void ensureValid(final String name, final Object value) {
+            List<String> values = (List<String>) value;
+            for (String string : values) {
+                validString.ensureValid(name, string);
+            }
+        }
+
+        public String toString() {
+            return validString.toString();
+        }
+    }
+
     public static class ValidString implements Validator {
         List<String> validStrings;
 

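A quick sketch of the new validator in isolation (assuming, as with ValidString, a ConfigException on an invalid element):

    import org.apache.kafka.common.config.ConfigDef.ValidList

    val validator = ValidList.in("compact", "delete")
    // every element of the supplied list must be one of the allowed strings
    validator.ensureValid("cleanup.policy", java.util.Arrays.asList("compact", "delete")) // ok
    // asList("compact", "bogus") here would fail validation
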
http://git-wip-us.apache.org/repos/asf/kafka/blob/36044fdc/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index 0ed0f1a..0b781fd 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -135,6 +135,7 @@ public class ConfigDefTest {
         testValidators(Type.INT, Range.between(0, 10), 5, new Object[]{1, 5, 9}, new Object[]{-1, 11, null});
         testValidators(Type.STRING, ValidString.in("good", "values", "default"), "default",
                 new Object[]{"good", "values", "default"}, new Object[]{"bad", "inputs", null});
+        testValidators(Type.LIST, ConfigDef.ValidList.in("1", "2", "3"), "1", new Object[]{"1", "2", "3"}, new Object[]{"4", "5", "6"});
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/36044fdc/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index b4aa470..d343d6f 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -656,14 +656,9 @@ class Log(val dir: File,
    * @param predicate A function that takes in a single log segment and returns true iff it is deletable
    * @return The number of segments deleted
    */
-  def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
+  private def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
     lock synchronized {
-      //find any segments that match the user-supplied predicate UNLESS it is the final segment
-      //and it is empty (since we would just end up re-creating it)
-      val lastEntry = segments.lastEntry
-      val deletable =
-        if (lastEntry == null) Seq.empty
-        else logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastEntry.getValue.baseOffset || s.size > 0))
+      val deletable = deletableSegments(predicate)
       val numToDelete = deletable.size
       if (numToDelete > 0) {
         // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
@@ -677,6 +672,47 @@ class Log(val dir: File,
   }
 
   /**
+    * Find segments starting from the oldest until the user-supplied predicate is false.
+    * A final segment that is empty will never be returned (since we would just end up re-creating it).
+    * @param predicate A function that takes in a single log segment and returns true iff it is deletable
+    * @return the segments ready to be deleted
+    */
+  private def deletableSegments(predicate: LogSegment => Boolean) = {
+    val lastEntry = segments.lastEntry
+    if (lastEntry == null) Seq.empty
+    else logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastEntry.getValue.baseOffset || s.size > 0))
+  }
+
+  /**
+    * Delete any log segments that have either expired due to time based retention
+    * or because the log size is > retentionSize
+    */
+  def deleteOldSegments(): Int = {
+    if (!config.delete) return 0
+    deleteRetenionMsBreachedSegments() + deleteRetentionSizeBreachedSegments()
+  }
+
+  private def deleteRetenionMsBreachedSegments() : Int = {
+    if (config.retentionMs < 0) return 0
+    val startMs = time.milliseconds
+    deleteOldSegments(startMs - _.largestTimestamp > config.retentionMs)
+  }
+
+  private def deleteRetentionSizeBreachedSegments() : Int = {
+    if (config.retentionSize < 0 || size < config.retentionSize) return 0
+    var diff = size - config.retentionSize
+    def shouldDelete(segment: LogSegment) = {
+      if (diff - segment.size >= 0) {
+        diff -= segment.size
+        true
+      } else {
+        false
+      }
+    }
+    deleteOldSegments(shouldDelete)
+  }
+
+  /**
    * The size of the log in bytes
    */
   def size: Long = logSegments.map(_.size).sum

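The size-based half deletes oldest-first only while the remaining excess still covers an entire segment, so a log can settle slightly above retentionSize. A standalone sketch of that walk, with segments reduced to their sizes (not the real Log API):

    // Standalone sketch of the size-based walk; segments are reduced to their sizes.
    def sizesToDelete(segmentSizes: Seq[Long], retentionSize: Long): Seq[Long] = {
      val totalSize = segmentSizes.sum
      if (retentionSize < 0 || totalSize < retentionSize) return Seq.empty
      var diff = totalSize - retentionSize
      segmentSizes.takeWhile { size =>
        if (diff - size >= 0) { diff -= size; true } else false
      }
    }

    // sizesToDelete(Seq(100L, 100L, 100L), 150L) == Seq(100L):
    // one segment goes, the log settles at 200 bytes, just above the target.
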
http://git-wip-us.apache.org/repos/asf/kafka/blob/36044fdc/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index ef880e6..5e3e662 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -28,6 +28,7 @@ import kafka.message._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
 
+import scala.Iterable
 import scala.collection._
 
 /**
@@ -226,10 +227,9 @@ class LogCleaner(val config: CleanerConfig,
      * Clean a log if there is a dirty log available, otherwise sleep for a bit
      */
     private def cleanOrSleep() {
-      cleanerManager.grabFilthiestLog() match {
+      val cleaned = cleanerManager.grabFilthiestCompactedLog() match {
         case None =>
-          // there are no cleanable logs, sleep a while
-          backOffWaitLatch.await(config.backOffMs, TimeUnit.MILLISECONDS)
+          false
         case Some(cleanable) =>
           // there's a log, clean it
           var endOffset = cleanable.firstDirtyOffset
@@ -241,7 +241,19 @@ class LogCleaner(val config: CleanerConfig,
           } finally {
             cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
           }
+          true
       }
+      val deletable: Iterable[(TopicAndPartition, Log)] = cleanerManager.deletableLogs()
+      deletable.foreach{
+        case (topicPartition, log) =>
+          try {
+            log.deleteOldSegments()
+          } finally {
+            cleanerManager.doneDeleting(topicPartition)
+          }
+      }
+      if (!cleaned)
+        backOffWaitLatch.await(config.backOffMs, TimeUnit.MILLISECONDS)
     }
     
     /**

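For orientation, the reworked pass has this shape: compaction is attempted first, retention deletion runs on every pass for eligible logs, and the back-off happens only when no compaction work was found. An illustrative skeleton (callbacks stand in for the real calls; not the actual signatures):

    // Illustrative skeleton only; the callbacks stand in for
    // grabFilthiestCompactedLog/clean, deletableLogs/deleteOldSegments,
    // and the back-off latch.
    def cleanerPass(tryCompactOne: () => Boolean,
                    runRetentionDeletes: () => Unit,
                    backOff: () => Unit): Unit = {
      val cleaned = tryCompactOne() // false when no filthy log was available
      runRetentionDeletes()         // always runs, regardless of compaction
      if (!cleaned) backOff()       // sleep only when there was no compaction work
    }
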
http://git-wip-us.apache.org/repos/asf/kafka/blob/36044fdc/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 60bcba6..c1068f8 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -42,7 +42,7 @@ private[log] case object LogCleaningPaused extends LogCleaningState
  *  requested to be resumed.
  */
 private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging with KafkaMetricsGroup {
-  
+
   override val loggerName = classOf[LogCleaner].getName
 
   // package-private for testing
@@ -75,13 +75,13 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
     * every time off the full set of logs to allow logs to be dynamically added to the pool of logs
     * the log manager maintains.
     */
-  def grabFilthiestLog(): Option[LogToClean] = {
+  def grabFilthiestCompactedLog(): Option[LogToClean] = {
     inLock(lock) {
       val lastClean = allCleanerCheckpoints()
       val dirtyLogs = logs.filter {
-        case (topicAndPartition, log) => log.config.compact  // skip any logs marked for delete rather than dedupe
+        case (_, log) => log.config.compact  // match logs that are marked as compacted
       }.filterNot {
-        case (topicAndPartition, log) => inProgress.contains(topicAndPartition) // skip any logs already in-progress
+        case (topicAndPartition, _) => inProgress.contains(topicAndPartition) // skip any logs already in-progress
       }.map {
         case (topicAndPartition, log) => // create a LogToClean instance for each
           // if the log segments are abnormally truncated and hence the checkpointed offset
@@ -90,7 +90,9 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
           val firstDirtyOffset = {
             val offset = lastClean.getOrElse(topicAndPartition, logStartOffset)
             if (offset < logStartOffset) {
-              warn("Resetting first dirty offset to log start offset %d since the checkpointed offset %d is invalid."
+              // don't bother with the warning if compact and delete are enabled.
+              if (!isCompactAndDelete(log))
+                warn("Resetting first dirty offset to log start offset %d since the checkpointed offset %d is invalid."
                     .format(logStartOffset, offset))
               logStartOffset
             } else {
@@ -114,6 +116,26 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   }
 
   /**
+    * Find any logs that have compact and delete enabled
+    */
+  def deletableLogs(): Iterable[(TopicAndPartition, Log)] = {
+    inLock(lock) {
+      val toClean = logs.filterNot {
+        case (topicAndPartition, log) => inProgress.contains(topicAndPartition)
+      }.filter {
+        case (topicAndPartition, log) => isCompactAndDelete(log)
+      }
+      toClean.foreach{x => inProgress.put(x._1, LogCleaningInProgress)}
+      toClean
+    }
+
+  }
+
+  def isCompactAndDelete(log: Log): Boolean = {
+    log.config.compact && log.config.delete
+  }
+
+  /**
    *  Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of
    *  the partition is aborted.
    *  This is implemented by first abortAndPausing and then resuming the cleaning of the partition.
@@ -239,4 +261,10 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
       }
     }
   }
+
+  def doneDeleting(topicAndPartition: TopicAndPartition): Unit = {
+    inLock(lock) {
+      inProgress.remove(topicAndPartition)
+    }
+  }
 }

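Note the contract around the new methods: deletableLogs() marks every log it returns as in-progress, so each grab must be paired with doneDeleting() or the log is never offered again. A hedged sketch (cleanerManager built as in LogCleanerManagerTest below):

    val first = cleanerManager.deletableLogs()   // marks each returned log in-progress
    val second = cleanerManager.deletableLogs()  // empty: those logs are still marked
    first.foreach { case (tp, _) => cleanerManager.doneDeleting(tp) } // release the marks
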
http://git-wip-us.apache.org/repos/asf/kafka/blob/36044fdc/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 1b1ecb7..a01ecc4 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -20,7 +20,6 @@ package kafka.log
 import java.util.Properties
 
 import scala.collection.JavaConverters._
-
 import kafka.api.ApiVersion
 import kafka.message.{BrokerCompressionCodec, Message}
 import kafka.server.KafkaConfig
@@ -31,7 +30,7 @@ import org.apache.kafka.common.utils.Utils
 import java.util.Locale
 
 import scala.collection.mutable
-import org.apache.kafka.common.config.ConfigDef.ConfigKey
+import org.apache.kafka.common.config.ConfigDef.{ConfigKey, ValidList}
 
 object Defaults {
   val SegmentSize = kafka.server.Defaults.LogSegmentBytes
@@ -75,7 +74,8 @@ case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfi
   val fileDeleteDelayMs = getLong(LogConfig.FileDeleteDelayMsProp)
   val deleteRetentionMs = getLong(LogConfig.DeleteRetentionMsProp)
   val minCleanableRatio = getDouble(LogConfig.MinCleanableDirtyRatioProp)
-  val compact = getString(LogConfig.CleanupPolicyProp).toLowerCase(Locale.ROOT) != LogConfig.Delete
+  val compact = getList(LogConfig.CleanupPolicyProp).asScala.map(_.toLowerCase(Locale.ROOT)).contains(LogConfig.Compact)
+  val delete = getList(LogConfig.CleanupPolicyProp).asScala.map(_.toLowerCase(Locale.ROOT)).contains(LogConfig.Delete)
   val uncleanLeaderElectionEnable = getBoolean(LogConfig.UncleanLeaderElectionEnableProp)
   val minInSyncReplicas = getInt(LogConfig.MinInSyncReplicasProp)
   val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase(Locale.ROOT)
@@ -257,7 +257,7 @@ object LogConfig {
         KafkaConfig.LogDeleteDelayMsProp)
       .define(MinCleanableDirtyRatioProp, DOUBLE, Defaults.MinCleanableDirtyRatio, between(0, 1), MEDIUM,
         MinCleanableRatioDoc, KafkaConfig.LogCleanerMinCleanRatioProp)
-      .define(CleanupPolicyProp, STRING, Defaults.Compact, in(Compact, Delete), MEDIUM, CompactDoc,
+      .define(CleanupPolicyProp, LIST, Defaults.Compact, ValidList.in(LogConfig.Compact, LogConfig.Delete), MEDIUM, CompactDoc,
         KafkaConfig.LogCleanupPolicyProp)
       .define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable,
         MEDIUM, UncleanLeaderElectionEnableDoc, KafkaConfig.UncleanLeaderElectionEnableProp)

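Since cleanup.policy is now a LIST, compact and delete are plain membership tests over the parsed values. A small sketch of how the three spellings decompose (LogConfig defaults fill in everything else):

    import java.util.Properties
    import kafka.log.LogConfig

    def policyFlags(policy: String): (Boolean, Boolean) = {
      val props = new Properties()
      props.put(LogConfig.CleanupPolicyProp, policy)
      val config = LogConfig(props)
      (config.compact, config.delete)
    }

    // policyFlags("delete")         == (false, true)
    // policyFlags("compact")        == (true, false)
    // policyFlags("compact,delete") == (true, true)
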
http://git-wip-us.apache.org/repos/asf/kafka/blob/36044fdc/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index e6c60b9..7806eda 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -187,10 +187,10 @@ class LogManager(val logDirs: Array[File],
     /* Schedule the cleanup task to delete old logs */
     if(scheduler != null) {
       info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
-      scheduler.schedule("kafka-log-retention", 
-                         cleanupLogs, 
-                         delay = InitialTaskDelayMs, 
-                         period = retentionCheckMs, 
+      scheduler.schedule("kafka-log-retention",
+                         cleanupLogs,
+                         delay = InitialTaskDelayMs,
+                         period = retentionCheckMs,
                          TimeUnit.MILLISECONDS)
       info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
       scheduler.schedule("kafka-log-flusher", 
@@ -417,36 +417,8 @@ class LogManager(val logDirs: Array[File],
   }
 
   /**
-   * Runs through the log removing segments older than a certain age
-   */
-  private def cleanupExpiredSegments(log: Log): Int = {
-    if (log.config.retentionMs < 0)
-      return 0
-    val startMs = time.milliseconds
-    log.deleteOldSegments(startMs - _.largestTimestamp > log.config.retentionMs)
-  }
-
-  /**
-   *  Runs through the log removing segments until the size of the log
-   *  is at least logRetentionSize bytes in size
-   */
-  private def cleanupSegmentsToMaintainSize(log: Log): Int = {
-    if(log.config.retentionSize < 0 || log.size < log.config.retentionSize)
-      return 0
-    var diff = log.size - log.config.retentionSize
-    def shouldDelete(segment: LogSegment) = {
-      if(diff - segment.size >= 0) {
-        diff -= segment.size
-        true
-      } else {
-        false
-      }
-    }
-    log.deleteOldSegments(shouldDelete)
-  }
-
-  /**
    * Delete any eligible logs. Return the number of segments deleted.
+   * Only consider logs that are not compacted.
    */
   def cleanupLogs() {
     debug("Beginning log cleanup...")
@@ -454,7 +426,7 @@ class LogManager(val logDirs: Array[File],
     val startMs = time.milliseconds
     for(log <- allLogs; if !log.config.compact) {
       debug("Garbage collecting '" + log.name + "'")
-      total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
+      total += log.deleteOldSegments()
     }
     debug("Log cleanup completed. " + total + " files deleted in " +
                   (time.milliseconds - startMs) / 1000 + " seconds")

http://git-wip-us.apache.org/repos/asf/kafka/blob/36044fdc/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index b31f596..42ae8e5 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -27,6 +27,7 @@ import kafka.log.LogConfig
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet}
 import kafka.utils.CoreUtils
 import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.ConfigDef.ValidList
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SslConfigs}
 import org.apache.kafka.common.metrics.MetricsReporter
@@ -422,7 +423,7 @@ object KafkaConfig {
 
   val LogRetentionBytesDoc = "The maximum size of the log before deleting it"
   val LogCleanupIntervalMsDoc = "The frequency in milliseconds that the log cleaner checks whether any log is eligible for deletion"
-  val LogCleanupPolicyDoc = "The default cleanup policy for segments beyond the retention window, must be either \"delete\" or \"compact\""
+  val LogCleanupPolicyDoc = "The default cleanup policy for segments beyond the retention window. A comma separated list of valid policies. Valid policies are: \"delete\" and \"compact\""
   val LogCleanerThreadsDoc = "The number of background threads to use for log cleaning"
   val LogCleanerIoMaxBytesPerSecondDoc = "The log cleaner will be throttled so that the sum of its read and write i/o will be less than this value on average"
   val LogCleanerDedupeBufferSizeDoc = "The total memory used for log deduplication across all cleaner threads"
@@ -622,7 +623,7 @@ object KafkaConfig {
 
       .define(LogRetentionBytesProp, LONG, Defaults.LogRetentionBytes, HIGH, LogRetentionBytesDoc)
       .define(LogCleanupIntervalMsProp, LONG, Defaults.LogCleanupIntervalMs, atLeast(1), MEDIUM, LogCleanupIntervalMsDoc)
-      .define(LogCleanupPolicyProp, STRING, Defaults.LogCleanupPolicy, in(Defaults.Compact, Defaults.Delete), MEDIUM, LogCleanupPolicyDoc)
+      .define(LogCleanupPolicyProp, LIST, Defaults.LogCleanupPolicy, ValidList.in(Defaults.Compact, Defaults.Delete), MEDIUM, LogCleanupPolicyDoc)
       .define(LogCleanerThreadsProp, INT, Defaults.LogCleanerThreads, atLeast(0), MEDIUM, LogCleanerThreadsDoc)
       .define(LogCleanerIoMaxBytesPerSecondProp, DOUBLE, Defaults.LogCleanerIoMaxBytesPerSecond, MEDIUM, LogCleanerIoMaxBytesPerSecondDoc)
       .define(LogCleanerDedupeBufferSizeProp, LONG, Defaults.LogCleanerDedupeBufferSize, MEDIUM, LogCleanerDedupeBufferSizeDoc)
@@ -823,7 +824,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val logFlushSchedulerIntervalMs = getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp)
   val logFlushOffsetCheckpointIntervalMs = getInt(KafkaConfig.LogFlushOffsetCheckpointIntervalMsProp).toLong
   val logCleanupIntervalMs = getLong(KafkaConfig.LogCleanupIntervalMsProp)
-  val logCleanupPolicy = getString(KafkaConfig.LogCleanupPolicyProp)
+  val logCleanupPolicy = getList(KafkaConfig.LogCleanupPolicyProp)
   val offsetsRetentionMinutes = getInt(KafkaConfig.OffsetsRetentionMinutesProp)
   val offsetsRetentionCheckIntervalMs = getLong(KafkaConfig.OffsetsRetentionCheckIntervalMsProp)
   val logRetentionBytes = getLong(KafkaConfig.LogRetentionBytesProp)

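The broker-level log.cleanup.policy parses the same way; a minimal sketch mirroring KafkaConfigTest below (propertiesFile is an assumed, pre-existing broker config file):

    import kafka.Kafka
    import kafka.server.KafkaConfig

    // `propertiesFile` is assumed to point at a valid broker config.
    val cfg = KafkaConfig.fromProps(Kafka.getPropsFromArgs(
      Array(propertiesFile, "--override", "log.cleanup.policy=compact,delete")))
    // cfg.logCleanupPolicy == java.util.Arrays.asList("compact", "delete")
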
http://git-wip-us.apache.org/repos/asf/kafka/blob/36044fdc/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index 806c704..5942104 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -16,8 +16,9 @@
  */
 package kafka
 
-import java.io.{FileOutputStream, File}
+import java.io.{File, FileOutputStream}
 import java.security.Permission
+import java.util
 
 import kafka.server.KafkaConfig
 import org.apache.kafka.common.config.SslConfigs
@@ -69,12 +70,12 @@ class KafkaTest {
     // We should be also able to set completely new property
     val config3 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact")))
     assertEquals(1, config3.brokerId)
-    assertEquals("compact", config3.logCleanupPolicy)
+    assertEquals(util.Arrays.asList("compact"), config3.logCleanupPolicy)
 
     // We should be also able to set several properties
-    val config4 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact", "--override", "broker.id=2")))
+    val config4 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact,delete", "--override", "broker.id=2")))
     assertEquals(2, config4.brokerId)
-    assertEquals("compact", config4.logCleanupPolicy)
+    assertEquals(util.Arrays.asList("compact","delete"), config4.logCleanupPolicy)
   }
 
   @Test(expected = classOf[ExitCalled])

http://git-wip-us.apache.org/repos/asf/kafka/blob/36044fdc/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 5b0ce9a..4f116ab 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -68,18 +68,18 @@ class CleanerTest extends JUnitSuite {
       log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
     val keysFound = keysInLog(log)
     assertEquals(0L until log.logEndOffset, keysFound)
-    
+
     // pretend we have the following keys
     val keys = immutable.ListSet(1, 3, 5, 7, 9)
     val map = new FakeOffsetMap(Int.MaxValue)
     keys.foreach(k => map.put(key(k), Long.MaxValue))
-    
+
     // clean the log
     cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L)
     val shouldRemain = keysInLog(log).filter(!keys.contains(_))
     assertEquals(shouldRemain, keysInLog(log))
   }
-  
+
   @Test
   def testCleaningWithDeletes() {
     val cleaner = makeCleaner(Int.MaxValue)

http://git-wip-us.apache.org/repos/asf/kafka/blob/36044fdc/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 825a55b..0449be5 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -33,6 +33,7 @@ import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
 
+import scala.Seq
 import scala.collection._
 import scala.util.Random
 
@@ -90,7 +91,47 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     assertFalse(checkpoints.contains(topics(0)))
   }
 
-  // returns (value, ByteBufferMessageSet)
+  @Test
+  def testCleansCombinedCompactAndDeleteTopic() {
+    val logProps = new Properties()
+    val retentionMs: Integer = 100000
+    logProps.put(LogConfig.RetentionMsProp, retentionMs: Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, "compact,delete")
+
+    def runCleanerAndCheckCompacted(numKeys: Int): (Log, Seq[(Int, String)]) = {
+      cleaner = makeCleaner(parts = 1, propertyOverrides = logProps, logCleanerBackOffMillis = 100L)
+      val log = cleaner.logs.get(topics(0))
+
+      val messages: Seq[(Int, String)] = writeDups(numKeys = numKeys, numDups = 3, log = log, codec = codec)
+      val startSize = log.size
+
+      val firstDirty = log.activeSegment.baseOffset
+      cleaner.startup()
+
+      // should compact the log
+      checkLastCleaned("log", 0, firstDirty)
+      val compactedSize = log.logSegments.map(_.size).sum
+      assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize)
+      (log, messages)
+    }
+
+
+    val (log, _) = runCleanerAndCheckCompacted(100)
+    // should delete old segments
+    log.logSegments.foreach(_.lastModified = time.milliseconds - (2 * retentionMs))
+
+    TestUtils.waitUntilTrue(() => log.numberOfSegments == 1, "There should only be 1 segment remaining", 10000L)
+    assertEquals(1, log.numberOfSegments)
+
+    cleaner.shutdown()
+
+    // run the cleaner again to make sure there are no issues post deletion
+    val (log2, messages) = runCleanerAndCheckCompacted(20)
+    val read = readFromLog(log2)
+    assertEquals("Contents of the map shouldn't change", messages.toMap, read.toMap)
+  }
+
+  // returns (value, ByteBufferMessageSet)
   private def createLargeSingleMessageSet(key: Int, messageFormatVersion: Byte): (String, ByteBufferMessageSet) = {
     def messageValue(length: Int): String = {
       val random = new Random(0)
@@ -119,7 +160,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     cleaner = makeCleaner(parts = 3, maxMessageSize = maxMessageSize)
 
     val log = cleaner.logs.get(topics(0))
-    val props = logConfigProperties(maxMessageSize)
+    val props = logConfigProperties(maxMessageSize = maxMessageSize)
     props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_9_0.version)
     log.config = new LogConfig(props)
 
@@ -197,7 +238,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     Utils.delete(logDir)
   }
 
-  private def logConfigProperties(maxMessageSize: Int, minCleanableDirtyRatio: Float = 0.0F): Properties = {
+  private def logConfigProperties(propertyOverrides: Properties = new Properties(), maxMessageSize: Int, minCleanableDirtyRatio: Float = 0.0F): Properties = {
     val props = new Properties()
     props.put(LogConfig.MaxMessageBytesProp, maxMessageSize: java.lang.Integer)
     props.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
@@ -205,6 +246,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     props.put(LogConfig.FileDeleteDelayMsProp, deleteDelay: java.lang.Integer)
     props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
     props.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: java.lang.Float)
+    props.putAll(propertyOverrides)
     props
   }
   
@@ -213,8 +255,8 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
                           minCleanableDirtyRatio: Float = 0.0F,
                           numThreads: Int = 1,
                           maxMessageSize: Int = 128,
-                          defaultPolicy: String = "compact",
-                          policyOverrides: Map[String, String] = Map()): LogCleaner = {
+                          logCleanerBackOffMillis: Long = 15000L,
+                          propertyOverrides: Properties = new Properties()): LogCleaner = {
     
     // create partitions and add them to the pool
     val logs = new Pool[TopicAndPartition, Log]()
@@ -223,14 +265,14 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
       dir.mkdirs()
 
       val log = new Log(dir = dir,
-                        LogConfig(logConfigProperties(maxMessageSize, minCleanableDirtyRatio)),
+                        LogConfig(logConfigProperties(propertyOverrides, maxMessageSize, minCleanableDirtyRatio)),
                         recoveryPoint = 0L,
                         scheduler = time.scheduler,
                         time = time)
       logs.put(TopicAndPartition("log", i), log)      
     }
   
-    new LogCleaner(CleanerConfig(numThreads = numThreads, ioBufferSize = maxMessageSize / 2, maxMessageSize = maxMessageSize),
+    new LogCleaner(CleanerConfig(numThreads = numThreads, ioBufferSize = maxMessageSize / 2, maxMessageSize = maxMessageSize, backOffMs = logCleanerBackOffMillis),
                    logDirs = Array(logDir),
                    logs = logs,
                    time = time)

http://git-wip-us.apache.org/repos/asf/kafka/blob/36044fdc/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
new file mode 100644
index 0000000..5508d69
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -0,0 +1,117 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.log
+
+import java.io.File
+import java.util.Properties
+
+import kafka.common.TopicAndPartition
+import kafka.message.ByteBufferMessageSet
+import kafka.utils.{MockTime, Pool, TestUtils}
+import org.apache.kafka.common.utils.Utils
+import org.junit.Assert._
+import org.junit.{After, Test}
+import org.scalatest.junit.JUnitSuite
+
+class LogCleanerManagerTest extends JUnitSuite {
+
+  val tmpDir = TestUtils.tempDir()
+  val logDir = TestUtils.randomPartitionLogDir(tmpDir)
+  val time = new MockTime()
+
+  @After
+  def tearDown() {
+    Utils.delete(tmpDir)
+  }
+
+  /**
+    * When checking for logs with segments ready for deletion
+    * we shouldn't consider logs where cleanup.policy=delete
+    * as they are handled by the LogManager
+    */
+  @Test
+  def testLogsWithSegmentsToDeleteShouldNotConsiderCleanupPolicyDeleteLogs() {
+    val messageSet = TestUtils.singleMessageSet("test".getBytes)
+    val log: Log = createLog(messageSet.sizeInBytes * 5, LogConfig.Delete)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    val readyToDelete = cleanerManager.deletableLogs().size
+    assertEquals("should have 0 logs ready to be deleted", 0, readyToDelete)
+  }
+
+  /**
+    * We should find logs with segments ready to be deleted when cleanup.policy=compact,delete
+    */
+  @Test
+  def testLogsWithSegmentsToDeleteShouldConsiderCleanupPolicyCompactDeleteLogs(): Unit = {
+    val messageSet = TestUtils.singleMessageSet("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(messageSet.sizeInBytes * 5, LogConfig.Compact + "," + LogConfig.Delete)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    val readyToDelete = cleanerManager.deletableLogs().size
+    assertEquals("should have 1 logs ready to be deleted", 1, readyToDelete)
+  }
+
+  /**
+    * When looking for logs with segments ready to be deleted we shouldn't consider
+    * logs with cleanup.policy=compact as they shouldn't have segments truncated.
+    */
+  @Test
+  def testLogsWithSegmentsToDeleteShouldNotConsiderCleanupPolicyCompactLogs(): Unit = {
+    val messageSet = TestUtils.singleMessageSet("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(messageSet.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    val readyToDelete = cleanerManager.deletableLogs().size
+    assertEquals("should have 1 logs ready to be deleted", 0, readyToDelete)
+  }
+
+
+  def createCleanerManager(log: Log): LogCleanerManager = {
+    val logs = new Pool[TopicAndPartition, Log]()
+    logs.put(TopicAndPartition("log", 0), log)
+    val cleanerManager = new LogCleanerManager(Array(logDir), logs)
+    cleanerManager
+  }
+
+  def appendMessagesAndExpireSegments(set: ByteBufferMessageSet, log: Log): Unit = {
+    // append some messages to create some segments
+    for (i <- 0 until 100)
+      log.append(set)
+
+    // expire all segments
+    log.logSegments.foreach(_.lastModified = time.milliseconds - 1000)
+  }
+
+  def createLog(segmentSize: Int, cleanupPolicy: String = "delete"): Log = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, segmentSize: Integer)
+    logProps.put(LogConfig.RetentionMsProp, 1: Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy)
+
+    val config = LogConfig(logProps)
+    val partitionDir = new File(logDir, "log-0")
+    val log = new Log(partitionDir,
+      config,
+      recoveryPoint = 0L,
+      time.scheduler,
+      time)
+    log
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/36044fdc/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index dc4cc79..95a1cb5 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -159,6 +159,31 @@ class LogManagerTest {
   }
 
   /**
+    * Ensures that LogManager only runs on logs with cleanup.policy=delete
+    * LogCleaner.CleanerThread handles all logs where compaction is enabled.
+    */
+  @Test
+  def testDoesntCleanLogsWithCompactDeletePolicy() {
+    val logProps = new Properties()
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete)
+    val log = logManager.createLog(TopicAndPartition(name, 0), LogConfig.fromProps(logConfig.originals, logProps))
+    var offset = 0L
+    for(i <- 0 until 200) {
+      val set = TestUtils.singleMessageSet("test".getBytes(), key="test".getBytes())
+      val info = log.append(set)
+      offset = info.lastOffset
+    }
+
+    val numSegments = log.numberOfSegments
+    assertTrue("There should be more than one segment now.", log.numberOfSegments > 1)
+
+    log.logSegments.foreach(_.log.file.setLastModified(time.milliseconds))
+
+    time.sleep(maxLogAgeMs + 1)
+    assertEquals("number of segments shouldn't have changed", numSegments, log.numberOfSegments)
+  }
+
+  /**
    * Test that flush is invoked by the background scheduler thread.
    */
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/36044fdc/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 2466ef2..7f6ef6e 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -35,7 +35,7 @@ class LogTest extends JUnitSuite {
 
   val tmpDir = TestUtils.tempDir()
   val logDir = TestUtils.randomPartitionLogDir(tmpDir)
-  val time = new MockTime(100)
+  val time = new MockTime()
   var config: KafkaConfig = null
   val logConfig = LogConfig()
 
@@ -340,19 +340,20 @@ class LogTest extends JUnitSuite {
       // first test a log segment starting at 0
       val logProps = new Properties()
       logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
+      logProps.put(LogConfig.RetentionMsProp, 0: java.lang.Integer)
       val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
       for(i <- 0 until messagesToAppend)
-        log.append(TestUtils.singleMessageSet(i.toString.getBytes))
+        log.append(TestUtils.singleMessageSet(payload = i.toString.getBytes, timestamp = time.milliseconds - 10))
 
       val currOffset = log.logEndOffset
       assertEquals(currOffset, messagesToAppend)
 
       // time goes by; the log file is deleted
-      log.deleteOldSegments(_ => true)
+      log.deleteOldSegments()
 
       assertEquals("Deleting segments shouldn't have changed the logEndOffset", currOffset, log.logEndOffset)
       assertEquals("We should still have one segment left", 1, log.numberOfSegments)
-      assertEquals("Further collection shouldn't delete anything", 0, log.deleteOldSegments(_ => true))
+      assertEquals("Further collection shouldn't delete anything", 0, log.deleteOldSegments())
       assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset)
       assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append",
                    currOffset,
@@ -816,6 +817,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
     logProps.put(LogConfig.IndexIntervalBytesProp, 10000: java.lang.Integer)
     logProps.put(LogConfig.FileDeleteDelayMsProp, asyncDeleteMs: java.lang.Integer)
+    logProps.put(LogConfig.RetentionMsProp, 0: java.lang.Integer)
     val config = LogConfig(logProps)
 
     val log = new Log(logDir,
@@ -831,7 +833,10 @@ class LogTest extends JUnitSuite {
     // files should be renamed
     val segments = log.logSegments.toArray
     val oldFiles = segments.map(_.log.file) ++ segments.map(_.index.file)
-    log.deleteOldSegments((s) => true)
+    // expire all segments
+    log.logSegments.foreach(_.lastModified = time.milliseconds - 1000L)
+
+    log.deleteOldSegments()
 
     assertEquals("Only one segment should remain.", 1, log.numberOfSegments)
     assertTrue("All log and index files should end in .deleted", segments.forall(_.log.file.getName.endsWith(Log.DeletedFileSuffix)) &&
@@ -855,6 +860,7 @@ class LogTest extends JUnitSuite {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
+    logProps.put(LogConfig.RetentionMsProp, 0: java.lang.Integer)
     val config = LogConfig(logProps)
     var log = new Log(logDir,
                       config,
@@ -866,7 +872,9 @@ class LogTest extends JUnitSuite {
     for(i <- 0 until 100)
       log.append(set)
 
-    log.deleteOldSegments((s) => true)
+    // expire all segments
+    log.logSegments.foreach(_.lastModified = time.milliseconds - 1000)
+    log.deleteOldSegments()
     log.close()
 
     log = new Log(logDir,
@@ -1052,6 +1060,7 @@ class LogTest extends JUnitSuite {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
+    logProps.put(LogConfig.RetentionMsProp, 0: java.lang.Integer)
     val config = LogConfig(logProps)
     val log = new Log(logDir,
       config,
@@ -1063,7 +1072,9 @@ class LogTest extends JUnitSuite {
     for (i <- 0 until 100)
       log.append(set)
 
-    log.deleteOldSegments(_ => true)
+    // expire all segments
+    log.logSegments.foreach(_.lastModified = time.milliseconds - 1000)
+    log.deleteOldSegments()
     assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
 
     // append some messages to create some segments
@@ -1072,6 +1083,109 @@ class LogTest extends JUnitSuite {
 
     log.delete()
     assertEquals("The number of segments should be 0", 0, log.numberOfSegments)
-    assertEquals("The number of deleted segments shoud be zero.", 0, log.deleteOldSegments(_ => true))
+    assertEquals("The number of deleted segments should be zero.", 0, log.deleteOldSegments())
+  }
+
+
+  @Test
+  def shouldDeleteSizeBasedSegments() {
+    val set = TestUtils.singleMessageSet("test".getBytes)
+    val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 10)
+
+    // append some messages to create some segments
+    for (i <- 0 until 15)
+      log.append(set)
+
+    log.deleteOldSegments()
+    assertEquals("should have 2 segments", 2, log.numberOfSegments)
+  }
+
+  @Test
+  def shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize() {
+    val set = TestUtils.singleMessageSet("test".getBytes)
+    val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 15)
+
+    // append some messages to create some segments
+    for (i <- 0 until 15)
+      log.append(set)
+
+    log.deleteOldSegments()
+    assertEquals("should have 3 segments", 3, log.numberOfSegments)
+  }
+
+  @Test
+  def shouldDeleteTimeBasedSegmentsReadyToBeDeleted() {
+    val set = TestUtils.singleMessageSet("test".getBytes, timestamp = 10)
+    val log = createLog(set.sizeInBytes, retentionMs = 10000)
+
+    // append some messages to create some segments
+    for (i <- 0 until 15)
+      log.append(set)
+
+    log.deleteOldSegments()
+    assertEquals("There should be 1 segment remaining", 1, log.numberOfSegments)
+  }
+
+  @Test
+  def shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted() {
+    val set = TestUtils.singleMessageSet("test".getBytes, timestamp = time.milliseconds)
+    val log = createLog(set.sizeInBytes, retentionMs = 10000000)
+
+    // append some messages to create some segments
+    for (i <- 0 until 15)
+      log.append(set)
+
+    log.deleteOldSegments()
+    assertEquals("There should be 3 segments remaining", 3, log.numberOfSegments)
+  }
+
+  @Test
+  def shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete() {
+    val set = TestUtils.singleMessageSet("test".getBytes, key = "test".getBytes(), timestamp = 10L)
+    val log = createLog(set.sizeInBytes,
+      retentionMs = 10000,
+      cleanupPolicy = "compact")
+
+    // append some messages to create some segments
+    for (i <- 0 until 15)
+      log.append(set)
+
+    // mark the oldest segment as older than retention.ms
+    log.logSegments.head.lastModified = time.milliseconds - 20000
+
+    val segments = log.numberOfSegments
+    log.deleteOldSegments()
+    assertEquals("There should be 3 segments remaining", segments, log.numberOfSegments)
+  }
+
+  @Test
+  def shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete() {
+    val set = TestUtils.singleMessageSet("test".getBytes, key = "test".getBytes, timestamp = 10L)
+    val log = createLog(set.sizeInBytes,
+      retentionMs = 10000,
+      cleanupPolicy = "compact,delete")
+
+    // append some messages to create some segments
+    for (i <- 0 until 15)
+      log.append(set)
+
+    log.deleteOldSegments()
+    assertEquals("There should be 1 segment remaining", 1, log.numberOfSegments)
+  }
+
+  def createLog(messageSizeInBytes: Int, retentionMs: Int = -1,
+                retentionBytes: Int = -1, cleanupPolicy: String = "delete"): Log = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, messageSizeInBytes * 5: Integer)
+    logProps.put(LogConfig.RetentionMsProp, retentionMs: Integer)
+    logProps.put(LogConfig.RetentionBytesProp, retentionBytes: Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy)
+    val config = LogConfig(logProps)
+    val log = new Log(logDir,
+      config,
+      recoveryPoint = 0L,
+      time.scheduler,
+      time)
+    log
   }
 }