Posted to commits@kafka.apache.org by ju...@apache.org on 2021/08/13 22:39:09 UTC

[kafka] branch trunk updated: KAFKA-13194: bound cleaning by both LSO and HWM when firstUnstableOffsetMetadata is None (#11199)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 64b8e17  KAFKA-13194: bound cleaning by both LSO and HWM when firstUnstableOffsetMetadata is None (#11199)
64b8e17 is described below

commit 64b8e17827251174490678dd296ce2c1a79ff5ef
Author: Lucas Bradstreet <lu...@confluent.io>
AuthorDate: Fri Aug 13 15:35:59 2021 -0700

    KAFKA-13194: bound cleaning by both LSO and HWM when firstUnstableOffsetMetadata is None (#11199)
    
    When the high watermark is contained in a non-active segment, cleaning is not correctly bounded by the hwm. This means that uncommitted records may overwrite committed data. I've separated out the bounding-point tests to check the hwm case in addition to the existing active segment case.
    
    Reviewers: Jun Rao <ju...@gmail.com>
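
To make the core change concrete, here is a minimal sketch of the bound computation before and after this commit (simplified from LogCleanerManager.cleanableOffsets in the diff below; the compaction-lag bound is omitted):

    // Before: with no open transaction, firstUnstableOffset is None, so
    // nothing bounded cleaning at the hwm and compaction could reach past
    // committed data, all the way up to the active segment.
    val before: Option[Long] = log.firstUnstableOffset

    // After: lastStableOffset falls back to the high watermark when
    // firstUnstableOffsetMetadata is None, so the bound is always defined.
    val after: Option[Long] = Some(log.lastStableOffset)

    // Simplified shape of the final computation:
    val firstUncleanableDirtyOffset: Long =
      Seq(after, Option(log.activeSegment.baseOffset)).flatten.min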
---
 .../main/scala/kafka/log/LogCleanerManager.scala   | 11 ++++----
 .../log/AbstractLogCleanerIntegrationTest.scala    |  2 ++
 .../unit/kafka/log/LogCleanerIntegrationTest.scala |  2 ++
 .../kafka/log/LogCleanerLagIntegrationTest.scala   |  2 ++
 .../unit/kafka/log/LogCleanerManagerTest.scala     | 30 +++++++++++++++++++++-
 .../LogCleanerParameterizedIntegrationTest.scala   |  6 +++++
 6 files changed, 47 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 7e14184..c0030b1 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -590,13 +590,14 @@ private[log] object LogCleanerManager extends Logging {
 
     val minCompactionLagMs = math.max(log.config.compactionLagMs, 0L)
 
-    // find first segment that cannot be cleaned
-    // neither the active segment, nor segments with any messages closer to the head of the log than the minimum compaction lag time
-    // may be cleaned
+    // Find the first segment that cannot be cleaned. We cannot clean past:
+    // 1. The active segment
+    // 2. The last stable offset (including the high watermark)
+    // 3. Any segments closer to the head of the log than the minimum compaction lag time
     val firstUncleanableDirtyOffset: Long = Seq(
 
-      // we do not clean beyond the first unstable offset
-      log.firstUnstableOffset,
+      // we do not clean beyond the last stable offset
+      Some(log.lastStableOffset),
 
       // the active segment is always uncleanable
       Option(log.activeSegment.baseOffset),
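
The fix relies on a property of the two accessors: firstUnstableOffset is None whenever no transaction is in flight, while lastStableOffset is always defined and never exceeds the high watermark. A rough sketch of that accessor, assuming a simplification of the real kafka.log.Log logic (LogOffsetMetadata reduced to a plain Long):

    // assumed simplification of kafka.log.Log, not the actual implementation
    def lastStableOffset: Long = firstUnstableOffsetMetadata match {
      // an open transaction below the hwm bounds the LSO
      case Some(firstUnstable) if firstUnstable < highWatermark => firstUnstable
      // otherwise fall back to the high watermark, so cleaning can never
      // pass records that are not yet committed
      case _ => highWatermark
    }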
diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index 2fc7942..ba10336 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -139,6 +139,8 @@ abstract class AbstractLogCleanerIntegrationTest {
       val value = counter.toString
       val appendInfo = log.appendAsLeader(TestUtils.singletonRecords(value = value.toString.getBytes, codec = codec,
         key = key.toString.getBytes, magicValue = magicValue), leaderEpoch = 0)
+      // move LSO forward to increase compaction bound
+      log.updateHighWatermark(log.logEndOffset)
       incCounter()
       (key, value, appendInfo.firstOffset.get.messageOffset)
     }
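
The same one-line addition recurs in every integration test below. Since cleaning is now bounded by the LSO, a test that appends records without ever advancing the high watermark would leave the entire log uncleanable. The pattern, roughly (records here stands in for whatever the test appends):

    // advance the hwm, and hence the LSO since no transactions are open,
    // so the appended records fall inside the cleanable range
    val appendInfo = log.appendAsLeader(records, leaderEpoch = 0)
    log.updateHighWatermark(log.logEndOffset)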
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 16b219c..5a65fe3 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -192,6 +192,8 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with K
       val curValue = valCounter
       log.appendAsLeader(TestUtils.singletonRecords(value = curValue.toString.getBytes, codec = codec,
         key = key.toString.getBytes, timestamp = timestamp), leaderEpoch = 0)
+      // move LSO forward to increase compaction bound
+      log.updateHighWatermark(log.logEndOffset)
       valCounter += step
       (key, curValue)
     }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index c077542..7e0a33d 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -106,6 +106,8 @@ class LogCleanerLagIntegrationTest extends AbstractLogCleanerIntegrationTest wit
       val count = counter
       log.appendAsLeader(TestUtils.singletonRecords(value = counter.toString.getBytes, codec = codec,
               key = key.toString.getBytes, timestamp = timestamp), leaderEpoch = 0)
+      // move LSO forward to increase compaction bound
+      log.updateHighWatermark(log.logEndOffset)
       incCounter()
       (key, count)
     }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 93d9713..e20e661 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -529,7 +529,7 @@ class LogCleanerManagerTest extends Logging {
   }
 
   /**
-    * Test computation of cleanable range with no minimum compaction lag settings active
+    * Test computation of cleanable range with no minimum compaction lag settings active where bounded by LSO
     */
   @Test
   def testCleanableOffsetsForNone(): Unit = {
@@ -541,6 +541,30 @@ class LogCleanerManagerTest extends Logging {
     while(log.numberOfSegments < 8)
       log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0)
 
+    log.updateHighWatermark(50)
+
+    val lastCleanOffset = Some(0L)
+    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
+    assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
+    assertEquals(log.highWatermark, log.lastStableOffset, "The high watermark equals the last stable offset as no transactions are in progress")
+    assertEquals(log.lastStableOffset, cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable offset is bounded by the last stable offset.")
+  }
+
+  /**
+   * Test computation of cleanable range with no minimum compaction lag settings active where bounded by active segment
+   */
+  @Test
+  def testCleanableOffsetsActiveSegment(): Unit = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    while(log.numberOfSegments < 8)
+      log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0)
+
+    log.updateHighWatermark(log.logEndOffset)
+
     val lastCleanOffset = Some(0L)
     val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
     assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
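
Worked through with the numbers in these two tests (1 KiB segments, sequential offsets): testCleanableOffsetsForNone parks the hwm at offset 50 inside a non-active segment, so the LSO is the tighter bound; testCleanableOffsetsActiveSegment advances the hwm to the log end, so the active segment's base offset wins, presumably checked by the assertions in the unchanged context that follows. Illustratively (base offset value assumed):

    // ForNone:        min(LSO = 50, activeSegment.baseOffset)           = 50
    // ActiveSegment:  min(LSO = logEndOffset, activeSegment.baseOffset) = activeSegment.baseOffset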
@@ -571,6 +595,8 @@ class LogCleanerManagerTest extends Logging {
     while (log.numberOfSegments < 8)
       log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t1), leaderEpoch = 0)
 
+    log.updateHighWatermark(log.logEndOffset)
+
     val lastCleanOffset = Some(0L)
     val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
     assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
@@ -594,6 +620,8 @@ class LogCleanerManagerTest extends Logging {
     while (log.numberOfSegments < 8)
       log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t0), leaderEpoch = 0)
 
+    log.updateHighWatermark(log.logEndOffset)
+
     time.sleep(compactionLag + 1)
 
     val lastCleanOffset = Some(0L)
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
index 1471ff1..70ac47e 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
@@ -66,6 +66,8 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
     checkLogAfterAppendingDups(log, startSize, appends)
 
     val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
+    // move LSO forward to increase compaction bound
+    log.updateHighWatermark(log.logEndOffset)
     val largeMessageOffset = appendInfo.firstOffset.get.messageOffset
 
     val dups = writeDups(startKey = largeMessageKey + 1, numKeys = 100, numDups = 3, log = log, codec = codec)
@@ -166,6 +168,8 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
     val appends2: Seq[(Int, String, Long)] = {
       val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
       val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
+      // move LSO forward to increase compaction bound
+      log.updateHighWatermark(log.logEndOffset)
       val largeMessageOffset = appendInfo.firstOffset.map(_.messageOffset).get
 
       // also add some messages with version 1 and version 2 to check that we handle mixed format versions correctly
@@ -311,6 +315,8 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
     }
 
     val appendInfo = log.appendAsLeader(MemoryRecords.withRecords(magicValue, codec, records: _*), leaderEpoch = 0)
+    // move LSO forward to increase compaction bound
+    log.updateHighWatermark(log.logEndOffset)
     val offsets = appendInfo.firstOffset.get.messageOffset to appendInfo.lastOffset
 
     kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) }