You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/09/17 13:41:03 UTC

[kafka] branch 3.3 updated: KAFKA-14238; KRaft metadata log should not delete segment past the latest snapshot (#12655)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 74c4bbfaf9 KAFKA-14238;  KRaft metadata log should not delete segment past the latest snapshot (#12655)
74c4bbfaf9 is described below

commit 74c4bbfaf9a7da16baf0219df2613901aa706639
Author: José Armando García Sancio <js...@users.noreply.github.com>
AuthorDate: Sat Sep 17 06:30:50 2022 -0700

    KAFKA-14238;  KRaft metadata log should not delete segment past the latest snapshot (#12655)
    
    Disable segment deletion based on size and time by setting the KRaft metadata log's `RetentionMsProp` and `RetentionBytesProp` to `-1`. This will cause `UnifiedLog.deleteRetentionMsBreachedSegments` and `UnifiedLog.deleteRetentionSizeBreachedSegments` to short circuit instead of deleting segments.
    
    Without this changes the included test would fail. This happens because `deleteRetentionMsBreachedSegments` is able to delete past the `logStartOffset`. Deleting past the `logStartOffset` would violate the invariant that if the `logStartOffset` is greater than 0 then there is a snapshot with an end offset greater than or equal to the log start offset.
    
    Reviewers: Luke Chen <sh...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../main/scala/kafka/raft/KafkaMetadataLog.scala   | 26 ++++++++---
 .../scala/kafka/raft/KafkaMetadataLogTest.scala    | 54 +++++++++++++++++++++-
 2 files changed, 72 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index 83b8bee444..d112f3b581 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -382,7 +382,7 @@ final class KafkaMetadataLog private (
   }
 
   /**
-   * Perform cleaning of old snapshots and log segments based on size.
+   * Perform cleaning of old snapshots and log segments based on size and time.
    *
    * If our configured retention size has been violated, we perform cleaning as follows:
    *
@@ -556,15 +556,29 @@ object KafkaMetadataLog {
     config: MetadataLogConfig
   ): KafkaMetadataLog = {
     val props = new Properties()
-    props.put(LogConfig.MaxMessageBytesProp, config.maxBatchSizeInBytes.toString)
-    props.put(LogConfig.SegmentBytesProp, Int.box(config.logSegmentBytes))
-    props.put(LogConfig.SegmentMsProp, Long.box(config.logSegmentMillis))
-    props.put(LogConfig.FileDeleteDelayMsProp, Int.box(Defaults.FileDeleteDelayMs))
+    props.setProperty(LogConfig.MaxMessageBytesProp, config.maxBatchSizeInBytes.toString)
+    props.setProperty(LogConfig.SegmentBytesProp, config.logSegmentBytes.toString)
+    props.setProperty(LogConfig.SegmentMsProp, config.logSegmentMillis.toString)
+    props.setProperty(LogConfig.FileDeleteDelayMsProp, Defaults.FileDeleteDelayMs.toString)
+
+    // Disable time and byte retention when deleting segments
+    props.setProperty(LogConfig.RetentionMsProp, "-1")
+    props.setProperty(LogConfig.RetentionBytesProp, "-1")
     LogConfig.validateValues(props)
     val defaultLogConfig = LogConfig(props)
 
     if (config.logSegmentBytes < config.logSegmentMinBytes) {
-      throw new InvalidConfigurationException(s"Cannot set $MetadataLogSegmentBytesProp below ${config.logSegmentMinBytes}")
+      throw new InvalidConfigurationException(
+        s"Cannot set $MetadataLogSegmentBytesProp below ${config.logSegmentMinBytes}: ${config.logSegmentBytes}"
+      )
+    } else if (defaultLogConfig.retentionMs >= 0) {
+      throw new InvalidConfigurationException(
+        s"Cannot set ${LogConfig.RetentionMsProp} above -1: ${defaultLogConfig.retentionMs}."
+      )
+    } else if (defaultLogConfig.retentionSize >= 0) {
+      throw new InvalidConfigurationException(
+        s"Cannot set ${LogConfig.RetentionBytesProp} above -1: ${defaultLogConfig.retentionSize}."
+      )
     }
 
     val log = UnifiedLog(
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index bf37719f73..49b1206606 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -66,13 +66,13 @@ final class KafkaMetadataLogTest {
     props.put(MetadataLogSegmentMillisProp, Int.box(10 * 1024))
     assertThrows(classOf[InvalidConfigurationException], () => {
       val kafkaConfig = KafkaConfig.fromProps(props)
-      val metadataConfig = MetadataLogConfig.apply(kafkaConfig, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, KafkaRaftClient.MAX_FETCH_SIZE_BYTES)
+      val metadataConfig = MetadataLogConfig(kafkaConfig, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, KafkaRaftClient.MAX_FETCH_SIZE_BYTES)
       buildMetadataLog(tempDir, mockTime, metadataConfig)
     })
 
     props.put(MetadataLogSegmentMinBytesProp, Int.box(10240))
     val kafkaConfig = KafkaConfig.fromProps(props)
-    val metadataConfig = MetadataLogConfig.apply(kafkaConfig, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, KafkaRaftClient.MAX_FETCH_SIZE_BYTES)
+    val metadataConfig = MetadataLogConfig(kafkaConfig, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, KafkaRaftClient.MAX_FETCH_SIZE_BYTES)
     buildMetadataLog(tempDir, mockTime, metadataConfig)
   }
 
@@ -869,6 +869,56 @@ final class KafkaMetadataLogTest {
       })
     })
   }
+
+  @Test
+  def testSegmentsLessThanLatestSnapshot(): Unit = {
+    val config = DefaultMetadataLogConfig.copy(
+      logSegmentBytes = 10240,
+      logSegmentMinBytes = 10240,
+      logSegmentMillis = 10 * 1000,
+      retentionMaxBytes = 10240,
+      retentionMillis = 60 * 1000,
+      maxBatchSizeInBytes = 200
+    )
+    val log = buildMetadataLog(tempDir, mockTime, config)
+
+    // Generate enough data to cause a segment roll
+    for (_ <- 0 to 2000) {
+      append(log, 10, 1)
+    }
+    log.updateHighWatermark(new LogOffsetMetadata(log.endOffset.offset))
+
+    // The clean up code requires that there are at least two snapshots
+    // Generate first snapshots that includes the first segment by using the base offset of the second segment
+    val snapshotId1 = new OffsetAndEpoch(
+      log.log.logSegments.drop(1).head.baseOffset,
+      1
+    )
+    TestUtils.resource(log.storeSnapshot(snapshotId1).get()) { snapshot =>
+      snapshot.freeze()
+    }
+    // Generate second snapshots that includes the second segment by using the base offset of the third segment
+    val snapshotId2 = new OffsetAndEpoch(
+      log.log.logSegments.drop(2).head.baseOffset,
+      1
+    )
+    TestUtils.resource(log.storeSnapshot(snapshotId2).get()) { snapshot =>
+      snapshot.freeze()
+    }
+
+    // Sleep long enough to trigger a possible segment delete because of the default retention
+    val defaultLogRetentionMs = Defaults.RetentionMs * 2
+    mockTime.sleep(defaultLogRetentionMs)
+
+    assertTrue(log.maybeClean())
+    assertEquals(1, log.snapshotCount())
+    assertTrue(log.startOffset > 0, s"${log.startOffset} must be greater than 0")
+    val latestSnapshotOffset = log.latestSnapshotId().get.offset
+    assertTrue(
+      latestSnapshotOffset >= log.startOffset,
+      s"latest snapshot offset ($latestSnapshotOffset) must be >= log start offset (${log.startOffset})"
+    )
+  }
 }
 
 object KafkaMetadataLogTest {