Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/16 22:27:52 UTC

[GitHub] [kafka] jsancio opened a new pull request, #12655: Kafka 14238 kraft segment delete

jsancio opened a new pull request, #12655:
URL: https://github.com/apache/kafka/pull/12655

   Disable segment deletion based on size and time by setting the KRaft metadata log's `RetentionMsProp` and `RetentionBytesProp` to `-1`. This causes `UnifiedLog.deleteRetentionMsBreachedSegments` and `UnifiedLog.deleteRetentionSizeBreachedSegments` to short-circuit instead of deleting segments.
   
   Without this change, the included test would fail. This happens because `deleteRetentionMsBreachedSegments` can delete segments past the `logStartOffset`. Deleting past the `logStartOffset` would violate the invariant that, if the `logStartOffset` is greater than 0, there is a snapshot with an end offset greater than or equal to the log start offset.
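The short-circuit described above can be illustrated with a simplified model. This is a hypothetical sketch, not `UnifiedLog`'s code: `Segment`, `RetentionModel`, `timeBreached` and `sizeBreached` are illustrative names, and the model ignores details such as the active segment and the `logStartOffset`. It only shows that a negative retention value makes both checks select nothing.

```scala
// Hypothetical, simplified model of time- and size-based retention.
// Not Kafka's API: names and fields are illustrative only.
final case class Segment(baseOffset: Long, largestTimestampMs: Long, sizeInBytes: Long)

object RetentionModel {
  // Oldest segments whose newest record is older than retentionMs.
  // A negative retentionMs disables time-based deletion entirely.
  def timeBreached(segments: Seq[Segment], nowMs: Long, retentionMs: Long): Seq[Segment] =
    if (retentionMs < 0) Seq.empty
    else segments.takeWhile(s => nowMs - s.largestTimestampMs > retentionMs)

  // Oldest segments that can be removed while the remaining size stays >= retentionBytes.
  // A negative retentionBytes disables size-based deletion entirely.
  def sizeBreached(segments: Seq[Segment], retentionBytes: Long): Seq[Segment] =
    if (retentionBytes < 0) Seq.empty
    else {
      val excess = segments.map(_.sizeInBytes).sum - retentionBytes
      // Running total of bytes freed by deleting segments oldest-first.
      val freed = segments.scanLeft(0L)(_ + _.sizeInBytes).tail
      segments.zip(freed).takeWhile { case (_, f) => f <= excess }.map(_._1)
    }
}
```

With `retentionMs = -1`, `timeBreached` returns an empty sequence regardless of segment age, which is the behavior the change relies on for the metadata log.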
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #12655: KAFKA-14238; KRaft metadata log should not delete segment past the latest snapshot

Posted by GitBox <gi...@apache.org>.
showuon commented on PR #12655:
URL: https://github.com/apache/kafka/pull/12655#issuecomment-1249973833

   > Rerunning build. One of the configurations didn't finish.
   
   I believe this PR will fix the issue: https://github.com/apache/kafka/pull/12639 . I think we should merge it soon to fix our PR gating and trunk/3.3 build status!




[GitHub] [kafka] ijuma commented on a diff in pull request #12655: KAFKA-14238; KRaft metadata log should not delete segment past the latest snapshot

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #12655:
URL: https://github.com/apache/kafka/pull/12655#discussion_r973518261


##########
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala:
##########
@@ -556,15 +556,29 @@ object KafkaMetadataLog {
     config: MetadataLogConfig
   ): KafkaMetadataLog = {
     val props = new Properties()
-    props.put(LogConfig.MaxMessageBytesProp, config.maxBatchSizeInBytes.toString)
-    props.put(LogConfig.SegmentBytesProp, Int.box(config.logSegmentBytes))
-    props.put(LogConfig.SegmentMsProp, Long.box(config.logSegmentMillis))
-    props.put(LogConfig.FileDeleteDelayMsProp, Int.box(Defaults.FileDeleteDelayMs))
+    props.setProperty(LogConfig.MaxMessageBytesProp, config.maxBatchSizeInBytes.toString)
+    props.setProperty(LogConfig.SegmentBytesProp, config.logSegmentBytes.toString)
+    props.setProperty(LogConfig.SegmentMsProp, config.logSegmentMillis.toString)
+    props.setProperty(LogConfig.FileDeleteDelayMsProp, Defaults.FileDeleteDelayMs.toString)
+
+    // Disable time and byte retention when deleting segments
+    props.setProperty(LogConfig.RetentionMsProp, "-1")
+    props.setProperty(LogConfig.RetentionBytesProp, "-1")
     LogConfig.validateValues(props)
     val defaultLogConfig = LogConfig(props)
 
     if (config.logSegmentBytes < config.logSegmentMinBytes) {
-      throw new InvalidConfigurationException(s"Cannot set $MetadataLogSegmentBytesProp below ${config.logSegmentMinBytes}")
+      throw new InvalidConfigurationException(
+        s"Cannot set $MetadataLogSegmentBytesProp below ${config.logSegmentMinBytes}: ${config.logSegmentBytes}"
+      )
+    } else if (defaultLogConfig.retentionMs >= 0) {

Review Comment:
   Is this really `else if`? Or should it just be `if`?
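Separately from the `else if` question, this hunk replaces `props.put` with boxed numbers by `props.setProperty` with strings. The PR does not state the motivation, but one general `java.util.Properties` behavior is relevant: `getProperty` returns only `String` values and yields `null` for entries stored with `put` as other types. A small sketch; the object name `PropertiesDemo` is hypothetical, and the keys are Kafka's `segment.bytes` and `segment.ms` topic config names used only as plain strings:

```scala
import java.util.Properties

object PropertiesDemo {
  // Stores one value with put (boxed Int) and one with setProperty (String),
  // then reads both back through getProperty.
  def readBack(): (String, String) = {
    val props = new Properties()
    props.put("segment.bytes", Int.box(10240))        // stored as java.lang.Integer
    props.setProperty("segment.ms", 10000L.toString)   // stored as java.lang.String
    (props.getProperty("segment.bytes"), props.getProperty("segment.ms"))
  }
}
```

Here `readBack()` returns `(null, "10000")`: the `Integer` entry is still in the table (for example via `props.get`), but `getProperty` does not see it.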





[GitHub] [kafka] jsancio commented on a diff in pull request #12655: KAFKA-14238; KRaft metadata log should not delete segment past the latest snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12655:
URL: https://github.com/apache/kafka/pull/12655#discussion_r973504381


##########
core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala:
##########
@@ -869,6 +869,56 @@ final class KafkaMetadataLogTest {
       })
     })
   }
+
+  @Test
+  def testSegmentsLessThanLatestSnapshot(): Unit = {
+    val config = DefaultMetadataLogConfig.copy(
+      logSegmentBytes = 10240,
+      logSegmentMinBytes = 10240,
+      logSegmentMillis = 10 * 1000,
+      retentionMaxBytes = 10240,
+      retentionMillis = 60 * 1000,
+      maxBatchSizeInBytes = 200
+    )
+    val log = buildMetadataLog(tempDir, mockTime, config)
+
+    // Generate enough data to cause a segment roll
+    for (_ <- 0 to 2000) {
+      append(log, 10, 1)
+    }
+    log.updateHighWatermark(new LogOffsetMetadata(log.endOffset.offset))
+
+    // The clean up code requires that there are at least two snapshots
+    // Generate first snapshots that includes the first segment by using the base offset of the second segment
+    val snapshotId1 = new OffsetAndEpoch(
+      log.log.logSegments.drop(1).head.baseOffset,
+      1
+    )
+    TestUtils.resource(log.storeSnapshot(snapshotId1).get()) { snapshot =>
+      snapshot.freeze()
+    }
+    // Generate second snapshots that includes the second segment by using the base offset of the third segment
+    val snapshotId2 = new OffsetAndEpoch(
+      log.log.logSegments.drop(2).head.baseOffset,
+      1
+    )
+    TestUtils.resource(log.storeSnapshot(snapshotId2).get()) { snapshot =>
+      snapshot.freeze()
+    }
+
+    // Sleep long enough to trigger a possible segment delete because of the default retention
+    val defaultLogRetentionMs = Defaults.RetentionMs * 2
+    mockTime.sleep(defaultLogRetentionMs)
+
+    assertTrue(log.maybeClean())
+    assertEquals(1, log.snapshotCount())
+    assertTrue(log.startOffset > 0, s"${log.startOffset} must be greater than 0")
+    val latestSnapshotOffset = log.latestSnapshotId().get.offset
+    assertTrue(
+      latestSnapshotOffset >= log.startOffset,
+      s"latest snapshot offset ($latestSnapshotOffset) must be >= log start offset (${log.startOffset})"
+    )

Review Comment:
   Without this change, this check fails with:
   ```
   > Task :core:test FAILED
   kafka.raft.KafkaMetadataLogTest.testSegmentLessThanLatestSnapshot() failed, log available in /home/jsancio/work/kafka/core/build/reports/testOutput/kafka.raft.KafkaMetadataLogTest.testSegmentLessThanLatestSnapshot().test.stdout
   KafkaMetadataLogTest > testSegmentNotDeleteWithoutSnapshot() FAILED
       org.opentest4j.AssertionFailedError: latest snapshot offset (1440) must be >= log start offset (20010) ==> expected: <true> but was: <false>
           at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
           at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
           at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
           at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
           at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
           at kafka.raft.KafkaMetadataLogTest.testSegmentLessThanLatestSnapshot(KafkaMetadataLogTest.scala:921)
   ```
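The invariant the failing assertion checks can be written as a small predicate. A minimal sketch, where `SnapshotInvariant.holds` and its parameters are hypothetical names: the log start offset is either 0, or some snapshot's end offset is at or past it.

```scala
// Hypothetical predicate for the invariant described in the PR:
// if logStartOffset > 0, some snapshot must end at or after logStartOffset.
object SnapshotInvariant {
  def holds(logStartOffset: Long, snapshotEndOffsets: Seq[Long]): Boolean =
    logStartOffset == 0L || snapshotEndOffsets.exists(_ >= logStartOffset)
}
```

Plugging in the numbers from the failure above, `SnapshotInvariant.holds(20010L, Seq(1440L))` evaluates to `false`: the log start offset moved well past the only remaining snapshot.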





[GitHub] [kafka] hachikuji merged pull request #12655: KAFKA-14238; KRaft metadata log should not delete segment past the latest snapshot

Posted by GitBox <gi...@apache.org>.
hachikuji merged PR #12655:
URL: https://github.com/apache/kafka/pull/12655




[GitHub] [kafka] jsancio commented on a diff in pull request #12655: KAFKA-14238; KRaft metadata log should not delete segment past the latest snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12655:
URL: https://github.com/apache/kafka/pull/12655#discussion_r973519728


##########
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala:
##########
@@ -556,15 +556,29 @@ object KafkaMetadataLog {
     config: MetadataLogConfig
   ): KafkaMetadataLog = {
     val props = new Properties()
-    props.put(LogConfig.MaxMessageBytesProp, config.maxBatchSizeInBytes.toString)
-    props.put(LogConfig.SegmentBytesProp, Int.box(config.logSegmentBytes))
-    props.put(LogConfig.SegmentMsProp, Long.box(config.logSegmentMillis))
-    props.put(LogConfig.FileDeleteDelayMsProp, Int.box(Defaults.FileDeleteDelayMs))
+    props.setProperty(LogConfig.MaxMessageBytesProp, config.maxBatchSizeInBytes.toString)
+    props.setProperty(LogConfig.SegmentBytesProp, config.logSegmentBytes.toString)
+    props.setProperty(LogConfig.SegmentMsProp, config.logSegmentMillis.toString)
+    props.setProperty(LogConfig.FileDeleteDelayMsProp, Defaults.FileDeleteDelayMs.toString)
+
+    // Disable time and byte retention when deleting segments
+    props.setProperty(LogConfig.RetentionMsProp, "-1")
+    props.setProperty(LogConfig.RetentionBytesProp, "-1")
     LogConfig.validateValues(props)
     val defaultLogConfig = LogConfig(props)
 
     if (config.logSegmentBytes < config.logSegmentMinBytes) {
-      throw new InvalidConfigurationException(s"Cannot set $MetadataLogSegmentBytesProp below ${config.logSegmentMinBytes}")
+      throw new InvalidConfigurationException(
+        s"Cannot set $MetadataLogSegmentBytesProp below ${config.logSegmentMinBytes}: ${config.logSegmentBytes}"
+      )
+    } else if (defaultLogConfig.retentionMs >= 0) {

Review Comment:
   It is the same. I learned to do this because it lowers the cyclomatic complexity. It looks like the static analyzer understands `else if` but not `throw` when computing that value.
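The two styles discussed in this thread behave the same because every branch throws. A minimal sketch with the same shape as the quoted hunk; the exception type `InvalidConfig`, the methods `chained`, `independent` and `outcome`, and the messages are hypothetical, not the PR's code:

```scala
// Hypothetical validation with the same shape as the quoted hunk.
// Exception type, parameter names, and messages are illustrative, not the PR's.
final class InvalidConfig(message: String) extends RuntimeException(message)

object ValidationStyles {
  // Style used in the PR: one if / else-if chain in which every branch throws.
  def chained(segmentBytes: Int, minSegmentBytes: Int, retentionMs: Long): Unit =
    if (segmentBytes < minSegmentBytes) {
      throw new InvalidConfig(s"segment bytes below $minSegmentBytes: $segmentBytes")
    } else if (retentionMs >= 0) {
      throw new InvalidConfig(s"retention must be disabled: $retentionMs")
    }

  // Style suggested in review: independent checks, each guarding its own condition.
  def independent(segmentBytes: Int, minSegmentBytes: Int, retentionMs: Long): Unit = {
    if (segmentBytes < minSegmentBytes)
      throw new InvalidConfig(s"segment bytes below $minSegmentBytes: $segmentBytes")
    if (retentionMs >= 0)
      throw new InvalidConfig(s"retention must be disabled: $retentionMs")
  }

  // Runs a check and returns the message of the exception it throws, if any.
  def outcome(check: => Unit): Option[String] =
    try { check; None }
    catch { case e: InvalidConfig => Some(e.getMessage) }
}
```

In both styles the first failing condition wins, so the choice comes down to readability versus the cyclomatic-complexity score mentioned above.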





[GitHub] [kafka] jsancio commented on pull request #12655: KAFKA-14238; KRaft metadata log should not delete segment past the latest snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on PR #12655:
URL: https://github.com/apache/kafka/pull/12655#issuecomment-1249966244

   Rerunning build. One of the configurations didn't finish.




[GitHub] [kafka] jsancio commented on a diff in pull request #12655: KAFKA-14238; KRaft metadata log should not delete segment past the latest snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12655:
URL: https://github.com/apache/kafka/pull/12655#discussion_r973503737


##########
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala:
##########
@@ -556,15 +556,29 @@ object KafkaMetadataLog {
     config: MetadataLogConfig
   ): KafkaMetadataLog = {
     val props = new Properties()
-    props.put(LogConfig.MaxMessageBytesProp, config.maxBatchSizeInBytes.toString)
-    props.put(LogConfig.SegmentBytesProp, Int.box(config.logSegmentBytes))
-    props.put(LogConfig.SegmentMsProp, Long.box(config.logSegmentMillis))
-    props.put(LogConfig.FileDeleteDelayMsProp, Int.box(Defaults.FileDeleteDelayMs))
+    props.setProperty(LogConfig.MaxMessageBytesProp, config.maxBatchSizeInBytes.toString)
+    props.setProperty(LogConfig.SegmentBytesProp, config.logSegmentBytes.toString)
+    props.setProperty(LogConfig.SegmentMsProp, config.logSegmentMillis.toString)
+    props.setProperty(LogConfig.FileDeleteDelayMsProp, Defaults.FileDeleteDelayMs.toString)
+
+    // Disable time and byte retention when deleting segments
+    props.setProperty(LogConfig.RetentionMsProp, "-1")
+    props.setProperty(LogConfig.RetentionBytesProp, "-1")

Review Comment:
   The long-term solution is to also implement the feature documented in KIP-630, tracked in https://issues.apache.org/jira/browse/KAFKA-14241
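This thread does not describe what KAFKA-14241 implements. As a hypothetical illustration of the invariant from the PR description (not the KIP-630 design), deletion can be bounded by the latest snapshot: a segment is removable only if the next segment's base offset is at or below the snapshot's end offset, so every record it holds is covered by the snapshot. This mirrors how the test above picks snapshot ids from the base offset of the following segment. `SnapshotBoundedDeletion` and its methods are illustrative names.

```scala
// Hypothetical sketch: bound segment deletion by the latest snapshot's end offset.
// baseOffsets are the segments' base offsets in ascending order; the last one is the
// active segment, which is never deleted because it has no following segment.
object SnapshotBoundedDeletion {
  // Number of leading segments whose records all lie below snapshotEndOffset.
  def deletableCount(baseOffsets: IndexedSeq[Long], snapshotEndOffset: Long): Int =
    baseOffsets
      .zip(baseOffsets.drop(1))
      .takeWhile { case (_, nextBase) => nextBase <= snapshotEndOffset }
      .size

  // Log start offset after deleting those segments: the base of the first retained segment.
  def newLogStartOffset(baseOffsets: IndexedSeq[Long], snapshotEndOffset: Long): Long = {
    require(baseOffsets.nonEmpty, "expected at least one segment")
    baseOffsets(deletableCount(baseOffsets, snapshotEndOffset))
  }
}
```

With segments at base offsets 0, 100, 200 and 300 and a snapshot ending at 200, `newLogStartOffset` returns 200, so deleting segments never moves the log start offset past the snapshot's end offset.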





[GitHub] [kafka] ijuma commented on a diff in pull request #12655: KAFKA-14238; KRaft metadata log should not delete segment past the latest snapshot

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #12655:
URL: https://github.com/apache/kafka/pull/12655#discussion_r973518782


##########
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala:
##########
@@ -556,15 +556,29 @@ object KafkaMetadataLog {
     config: MetadataLogConfig
   ): KafkaMetadataLog = {
     val props = new Properties()
-    props.put(LogConfig.MaxMessageBytesProp, config.maxBatchSizeInBytes.toString)
-    props.put(LogConfig.SegmentBytesProp, Int.box(config.logSegmentBytes))
-    props.put(LogConfig.SegmentMsProp, Long.box(config.logSegmentMillis))
-    props.put(LogConfig.FileDeleteDelayMsProp, Int.box(Defaults.FileDeleteDelayMs))
+    props.setProperty(LogConfig.MaxMessageBytesProp, config.maxBatchSizeInBytes.toString)
+    props.setProperty(LogConfig.SegmentBytesProp, config.logSegmentBytes.toString)
+    props.setProperty(LogConfig.SegmentMsProp, config.logSegmentMillis.toString)
+    props.setProperty(LogConfig.FileDeleteDelayMsProp, Defaults.FileDeleteDelayMs.toString)
+
+    // Disable time and byte retention when deleting segments
+    props.setProperty(LogConfig.RetentionMsProp, "-1")
+    props.setProperty(LogConfig.RetentionBytesProp, "-1")
     LogConfig.validateValues(props)
     val defaultLogConfig = LogConfig(props)
 
     if (config.logSegmentBytes < config.logSegmentMinBytes) {
-      throw new InvalidConfigurationException(s"Cannot set $MetadataLogSegmentBytesProp below ${config.logSegmentMinBytes}")
+      throw new InvalidConfigurationException(
+        s"Cannot set $MetadataLogSegmentBytesProp below ${config.logSegmentMinBytes}: ${config.logSegmentBytes}"
+      )
+    } else if (defaultLogConfig.retentionMs >= 0) {

Review Comment:
   I guess the behavior is the same since we always throw an exception in each block, but it seems a bit odd to use `else if` for unrelated conditions. Feel free to ignore though. :)


