You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/09/17 13:41:03 UTC
[kafka] branch 3.3 updated: KAFKA-14238; KRaft metadata log should not delete segment past the latest snapshot (#12655)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 74c4bbfaf9 KAFKA-14238; KRaft metadata log should not delete segment past the latest snapshot (#12655)
74c4bbfaf9 is described below
commit 74c4bbfaf9a7da16baf0219df2613901aa706639
Author: José Armando García Sancio <js...@users.noreply.github.com>
AuthorDate: Sat Sep 17 06:30:50 2022 -0700
KAFKA-14238; KRaft metadata log should not delete segment past the latest snapshot (#12655)
Disable segment deletion based on size and time by setting the KRaft metadata log's `RetentionMsProp` and `RetentionBytesProp` to `-1`. This will cause `UnifiedLog.deleteRetentionMsBreachedSegments` and `UnifiedLog.deleteRetentionSizeBreachedSegments` to short circuit instead of deleting segments.
Without this change, the included test would fail. This happens because `deleteRetentionMsBreachedSegments` is able to delete past the `logStartOffset`. Deleting past the `logStartOffset` would violate the invariant that if the `logStartOffset` is greater than 0 then there is a snapshot with an end offset greater than or equal to the log start offset.
Reviewers: Luke Chen <sh...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
.../main/scala/kafka/raft/KafkaMetadataLog.scala | 26 ++++++++---
.../scala/kafka/raft/KafkaMetadataLogTest.scala | 54 +++++++++++++++++++++-
2 files changed, 72 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index 83b8bee444..d112f3b581 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -382,7 +382,7 @@ final class KafkaMetadataLog private (
}
/**
- * Perform cleaning of old snapshots and log segments based on size.
+ * Perform cleaning of old snapshots and log segments based on size and time.
*
* If our configured retention size has been violated, we perform cleaning as follows:
*
@@ -556,15 +556,29 @@ object KafkaMetadataLog {
config: MetadataLogConfig
): KafkaMetadataLog = {
val props = new Properties()
- props.put(LogConfig.MaxMessageBytesProp, config.maxBatchSizeInBytes.toString)
- props.put(LogConfig.SegmentBytesProp, Int.box(config.logSegmentBytes))
- props.put(LogConfig.SegmentMsProp, Long.box(config.logSegmentMillis))
- props.put(LogConfig.FileDeleteDelayMsProp, Int.box(Defaults.FileDeleteDelayMs))
+ props.setProperty(LogConfig.MaxMessageBytesProp, config.maxBatchSizeInBytes.toString)
+ props.setProperty(LogConfig.SegmentBytesProp, config.logSegmentBytes.toString)
+ props.setProperty(LogConfig.SegmentMsProp, config.logSegmentMillis.toString)
+ props.setProperty(LogConfig.FileDeleteDelayMsProp, Defaults.FileDeleteDelayMs.toString)
+
+ // Disable time and byte retention when deleting segments
+ props.setProperty(LogConfig.RetentionMsProp, "-1")
+ props.setProperty(LogConfig.RetentionBytesProp, "-1")
LogConfig.validateValues(props)
val defaultLogConfig = LogConfig(props)
if (config.logSegmentBytes < config.logSegmentMinBytes) {
- throw new InvalidConfigurationException(s"Cannot set $MetadataLogSegmentBytesProp below ${config.logSegmentMinBytes}")
+ throw new InvalidConfigurationException(
+ s"Cannot set $MetadataLogSegmentBytesProp below ${config.logSegmentMinBytes}: ${config.logSegmentBytes}"
+ )
+ } else if (defaultLogConfig.retentionMs >= 0) {
+ throw new InvalidConfigurationException(
+ s"Cannot set ${LogConfig.RetentionMsProp} above -1: ${defaultLogConfig.retentionMs}."
+ )
+ } else if (defaultLogConfig.retentionSize >= 0) {
+ throw new InvalidConfigurationException(
+ s"Cannot set ${LogConfig.RetentionBytesProp} above -1: ${defaultLogConfig.retentionSize}."
+ )
}
val log = UnifiedLog(
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index bf37719f73..49b1206606 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -66,13 +66,13 @@ final class KafkaMetadataLogTest {
props.put(MetadataLogSegmentMillisProp, Int.box(10 * 1024))
assertThrows(classOf[InvalidConfigurationException], () => {
val kafkaConfig = KafkaConfig.fromProps(props)
- val metadataConfig = MetadataLogConfig.apply(kafkaConfig, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, KafkaRaftClient.MAX_FETCH_SIZE_BYTES)
+ val metadataConfig = MetadataLogConfig(kafkaConfig, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, KafkaRaftClient.MAX_FETCH_SIZE_BYTES)
buildMetadataLog(tempDir, mockTime, metadataConfig)
})
props.put(MetadataLogSegmentMinBytesProp, Int.box(10240))
val kafkaConfig = KafkaConfig.fromProps(props)
- val metadataConfig = MetadataLogConfig.apply(kafkaConfig, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, KafkaRaftClient.MAX_FETCH_SIZE_BYTES)
+ val metadataConfig = MetadataLogConfig(kafkaConfig, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, KafkaRaftClient.MAX_FETCH_SIZE_BYTES)
buildMetadataLog(tempDir, mockTime, metadataConfig)
}
@@ -869,6 +869,56 @@ final class KafkaMetadataLogTest {
})
})
}
+
+ @Test
+ def testSegmentsLessThanLatestSnapshot(): Unit = {
+ val config = DefaultMetadataLogConfig.copy(
+ logSegmentBytes = 10240,
+ logSegmentMinBytes = 10240,
+ logSegmentMillis = 10 * 1000,
+ retentionMaxBytes = 10240,
+ retentionMillis = 60 * 1000,
+ maxBatchSizeInBytes = 200
+ )
+ val log = buildMetadataLog(tempDir, mockTime, config)
+
+ // Generate enough data to cause a segment roll
+ for (_ <- 0 to 2000) {
+ append(log, 10, 1)
+ }
+ log.updateHighWatermark(new LogOffsetMetadata(log.endOffset.offset))
+
+ // The clean up code requires that there are at least two snapshots
+ // Generate first snapshots that includes the first segment by using the base offset of the second segment
+ val snapshotId1 = new OffsetAndEpoch(
+ log.log.logSegments.drop(1).head.baseOffset,
+ 1
+ )
+ TestUtils.resource(log.storeSnapshot(snapshotId1).get()) { snapshot =>
+ snapshot.freeze()
+ }
+ // Generate second snapshots that includes the second segment by using the base offset of the third segment
+ val snapshotId2 = new OffsetAndEpoch(
+ log.log.logSegments.drop(2).head.baseOffset,
+ 1
+ )
+ TestUtils.resource(log.storeSnapshot(snapshotId2).get()) { snapshot =>
+ snapshot.freeze()
+ }
+
+ // Sleep long enough to trigger a possible segment delete because of the default retention
+ val defaultLogRetentionMs = Defaults.RetentionMs * 2
+ mockTime.sleep(defaultLogRetentionMs)
+
+ assertTrue(log.maybeClean())
+ assertEquals(1, log.snapshotCount())
+ assertTrue(log.startOffset > 0, s"${log.startOffset} must be greater than 0")
+ val latestSnapshotOffset = log.latestSnapshotId().get.offset
+ assertTrue(
+ latestSnapshotOffset >= log.startOffset,
+ s"latest snapshot offset ($latestSnapshotOffset) must be >= log start offset (${log.startOffset})"
+ )
+ }
}
object KafkaMetadataLogTest {