You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/09/21 20:31:55 UTC

[kafka] branch trunk updated: KAFKA-7400: Compacted topic segments that precede the log start offse… (#5646)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0bc7008  KAFKA-7400: Compacted topic segments that precede the log start offse… (#5646)
0bc7008 is described below

commit 0bc7008e7559337b588c60366d1b935eace59ffc
Author: Bob Barrett <bo...@outlook.com>
AuthorDate: Fri Sep 21 13:31:45 2018 -0700

    KAFKA-7400: Compacted topic segments that precede the log start offse… (#5646)
    
    * KAFKA-7400: Compacted topic segments that precede the log start offset are not cleaned up
    
    Currently we don't delete any log segments if the cleanup policy doesn't include delete. This patch changes the behavior so that it deletes log segments that fully precede the log start offset even when deletion is not enabled. Tested with unit tests to verify that LogManager.cleanupLogs now cleans up logs with cleanup.policy=compact and that Log.deleteOldSegments deletes segments that precede the start offset regardless of the cleanup policy.
    
    Reviewers: Dhruvil Shah <dh...@confluent.io>, Jason Gustafson <ja...@confluent.io>, Jun Rao <ju...@gmail.com>
---
 core/src/main/scala/kafka/log/Log.scala            | 13 +++++++----
 .../main/scala/kafka/log/LogCleanerManager.scala   |  5 +++--
 .../unit/kafka/log/LogCleanerManagerTest.scala     |  8 +++----
 .../test/scala/unit/kafka/log/LogManagerTest.scala | 17 ++++++++++++--
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 26 ++++++++++++++++++++++
 5 files changed, 57 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index afe151d..c9b877b 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1353,12 +1353,17 @@ class Log(@volatile var dir: File,
   }
 
   /**
-   * Delete any log segments that have either expired due to time based retention
-   * or because the log size is > retentionSize
+   * If topic deletion is enabled, delete any log segments that have either expired due to time based retention
+   * or because the log size is > retentionSize.
+   *
+   * Whether or not deletion is enabled, delete any log segments that are before the log start offset
    */
   def deleteOldSegments(): Int = {
-    if (!config.delete) return 0
-    deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
+    if (config.delete) {
+      deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
+    } else {
+      deleteLogStartOffsetBreachedSegments()
+    }
   }
 
   private def deleteRetentionMsBreachedSegments(): Int = {
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 83d902f..680fa94 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -171,12 +171,13 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   }
 
   /**
-    * Find any logs that have compact and delete enabled
+    * Find any logs that have compaction enabled. Include logs without delete enabled, as they may have segments
+    * that precede the start offset.
     */
   def deletableLogs(): Iterable[(TopicPartition, Log)] = {
     inLock(lock) {
       val toClean = logs.filter { case (topicPartition, log) =>
-        !inProgress.contains(topicPartition) && isCompactAndDelete(log)
+        !inProgress.contains(topicPartition) && log.config.compact
       }
       toClean.foreach { case (tp, _) => inProgress.put(tp, LogCleaningInProgress) }
       toClean
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 8cb2f9e..3653e28 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -77,17 +77,17 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
   }
 
   /**
-    * When looking for logs with segments ready to be deleted we shouldn't consider
-    * logs with cleanup.policy=compact as they shouldn't have segments truncated.
+    * When looking for logs with segments ready to be deleted we should consider
+    * logs with cleanup.policy=compact because they may have segments from before the log start offset
     */
   @Test
-  def testLogsWithSegmentsToDeleteShouldNotConsiderCleanupPolicyCompactLogs(): Unit = {
+  def testLogsWithSegmentsToDeleteShouldConsiderCleanupPolicyCompactLogs(): Unit = {
     val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
     val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
 
     val readyToDelete = cleanerManager.deletableLogs().size
-    assertEquals("should have 1 logs ready to be deleted", 0, readyToDelete)
+    assertEquals("should have 1 logs ready to be deleted", 1, readyToDelete)
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 38d6f71..ae8bc01 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -190,13 +190,26 @@ class LogManagerTest {
   }
 
   /**
-    * Ensures that LogManager only runs on logs with cleanup.policy=delete
+    * Ensures that LogManager doesn't run on logs with cleanup.policy=compact,delete
     * LogCleaner.CleanerThread handles all logs where compaction is enabled.
     */
   @Test
   def testDoesntCleanLogsWithCompactDeletePolicy() {
+    testDoesntCleanLogs(LogConfig.Compact + "," + LogConfig.Delete)
+  }
+
+  /**
+    * Ensures that LogManager doesn't run on logs with cleanup.policy=compact
+    * LogCleaner.CleanerThread handles all logs where compaction is enabled.
+    */
+  @Test
+  def testDoesntCleanLogsWithCompactPolicy() {
+    testDoesntCleanLogs(LogConfig.Compact)
+  }
+
+  private def testDoesntCleanLogs(policy: String) {
     val logProps = new Properties()
-    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete)
+    logProps.put(LogConfig.CleanupPolicyProp, policy)
     val log = logManager.getOrCreateLog(new TopicPartition(name, 0), LogConfig.fromProps(logConfig.originals, logProps))
     var offset = 0L
     for (_ <- 0 until 200) {
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 9a9bc61..1381bc6 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -2747,6 +2747,32 @@ class LogTest {
   }
 
   @Test
+  def shouldDeleteStartOffsetBreachedSegmentsWhenPolicyDoesNotIncludeDelete(): Unit = {
+    def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes, timestamp = 10L)
+    val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact")
+
+    // Create log with start offset ahead of the first log segment
+    val log = createLog(logDir, logConfig, brokerTopicStats, logStartOffset = 5L)
+
+    // append some messages to create some segments
+    for (_ <- 0 until 15)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
+
+    // Three segments should be created, with the first one entirely preceding the log start offset
+    assertEquals(3, log.logSegments.count(_ => true))
+    assertTrue(log.logSegments.slice(1, 2).head.baseOffset <= log.logStartOffset)
+
+    // The first segment, which is entirely before the log start offset, should be deleted
+    // Of the remaining the segments, the first can overlap the log start offset and the rest must have a base offset
+    // greater than the start offset
+    log.onHighWatermarkIncremented(log.logEndOffset)
+    log.deleteOldSegments()
+    assertEquals("There should be 2 segments remaining", 2, log.numberOfSegments)
+    assertTrue(log.logSegments.head.baseOffset <= log.logStartOffset)
+    assertTrue(log.logSegments.tail.forall(s => s.baseOffset > log.logStartOffset))
+  }
+
+  @Test
   def shouldApplyEpochToMessageOnAppendIfLeader() {
     val records = (0 until 50).toArray.map(id => new SimpleRecord(id.toString.getBytes))