Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/10 19:50:14 UTC

[GitHub] [kafka] mumrah opened a new pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

mumrah opened a new pull request #10864:
URL: https://github.com/apache/kafka/pull/10864


   This PR includes changes to KafkaRaftClient and KafkaMetadataLog to support periodic cleaning of old log segments and snapshots. 
   
   TODO the rest of the description
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662517293



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -16,34 +16,41 @@
  */
 package kafka.raft
 
-import java.io.File
-import java.nio.file.{Files, NoSuchFileException, Path}
-import java.util.{Optional, Properties}
 import kafka.api.ApiVersion
 import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, SnapshotGenerated}
-import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, LogDirFailureChannel, RequestLocal}
+import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, KafkaConfig, LogDirFailureChannel, RequestLocal}
 import kafka.utils.{CoreUtils, Logging, Scheduler}
+import org.apache.kafka.common.config.AbstractConfig
 import org.apache.kafka.common.record.{MemoryRecords, Records}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
 import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
 
+import java.io.File
+import java.nio.file.{Files, NoSuchFileException, Path}
+import java.util.{Optional, Properties}
 import scala.annotation.nowarn
 import scala.collection.mutable
 import scala.compat.java8.OptionConverters._
 
 final class KafkaMetadataLog private (
-  log: Log,
+  val log: Log,
+  time: Time,
   scheduler: Scheduler,
   // Access to this object needs to be synchronized because it is used by the snapshotting thread to notify the
   // polling thread when snapshots are created. This object is also used to store any opened snapshot reader.
   snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],
   topicPartition: TopicPartition,
   maxFetchSizeInBytes: Int,
-  val fileDeleteDelayMs: Long // Visible for testing,
+  // Visible for testing

Review comment:
       I'll clean this up by moving some of these into the MetadataLogConfig object




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r660880726



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +315,142 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot follows this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread safe and assumes a lock on the snapshots collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotId: OffsetAndEpoch): Boolean = {
+    if (snapshots.contains(snapshotId) &&
+        snapshots.contains(nextSnapshotId) &&
+        startOffset < nextSnapshotId.offset &&
+        snapshotId.offset < nextSnapshotId.offset &&
+        log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) {
+      log.deleteOldSegments()
+      val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+      snapshots.remove(snapshotId) match {
+        case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot)
+        case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.")
+      }
+      removeSnapshots(forgotten)
+      true
+    } else {
+      false
+    }
+  }
 
-          // Delete all segments that have a "last offset" less than the log start offset
-          log.deleteOldSegments()
+  /**
+   * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+    snapshots.keys.toSeq.flatMap {
+      snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes())}
+    }
+  }
 
-          // Forget snapshots less than the log start offset
-          (true, forgetSnapshotsBefore(logStartSnapshotId))
-        case _ =>
-          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records
+   */
+  private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {
+    readSnapshot(snapshotId).asScala.flatMap { reader =>
+      val it = reader.records().batchIterator()
+      if (it.hasNext) {
+        Some(it.next.maxTimestamp())
+      } else {
+        None
       }
     }
+  }
 
-    removeSnapshots(forgottenSnapshots)
-    deleted
+  /**
+   * Perform cleaning of old snapshots and log segments based on size.
+   *
+   * If our configured retention size has been violated, we perform cleaning as follows:
+   *
+   * <li>Find oldest snapshot and delete it</li>
+   * <li>Advance log start offset to end of next oldest snapshot</li>
+   * <li>Delete log segments which wholly precede the new log start offset</li>
+   *
+   * This process is repeated until the retention size is no longer violated, or until only
+   * a single snapshot remains.
+   */
+  override def maybeClean(): Boolean = {
+    snapshots synchronized {
+      cleanSnapshotsRetentionSize() || cleanSnapshotsRetentionMs()
+    }
+  }
+
+  /**
+   * Iterate through the snapshots and test the given predicate to see if we should attempt to delete it. Since
+   * we have some additional invariants regarding snapshots and log segments we cannot simply delete a snapshot in
+   * all cases.
+   *
+   * For the given predicate, we are testing if the snapshot identified by the first argument should be deleted.
+   */
+  private def cleanSnapshots(predicate: (OffsetAndEpoch, OffsetAndEpoch) => Boolean): Boolean = {
+    if (snapshots.size < 2)
+      return false;
+
+    var didClean = false
+    snapshots.keys.toSeq.sliding(2).toSeq.takeWhile {
+      case Seq(snapshot: OffsetAndEpoch, nextSnapshot: OffsetAndEpoch) =>
+        if (predicate(snapshot, nextSnapshot) && deleteSnapshot(snapshot, nextSnapshot)) {
+          didClean = true
+          true
+        } else {
+          false
+        }
+      case _ => false // Shouldn't get here with sliding(2)
+    }
+    didClean
+  }

Review comment:
       @jsancio how does this look? The `sliding(2)` covers the case of not deleting the last snapshot, so the predicate and `deleteSnapshot` no longer need the `Option` parameter.
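
The pairwise iteration discussed in this comment can be sketched in plain Java. This is only an illustration, not the PR's code: `PairwiseCleanSketch`, `clean`, and the use of bare `Long` offsets in place of `OffsetAndEpoch` are all assumptions made for the example, and the loop stands in for Scala's `sliding(2)` followed by `takeWhile`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;
import java.util.function.BiPredicate;

class PairwiseCleanSketch {
    // Hypothetical stand-in for the snapshot collection: a sorted set of offsets.
    // shouldDelete plays the role of `predicate(...) && deleteSnapshot(...)`.
    static List<Long> clean(TreeSet<Long> snapshots, BiPredicate<Long, Long> shouldDelete) {
        List<Long> deleted = new ArrayList<>();
        // Copy the ids so that removing from the set while looping is safe.
        List<Long> ids = new ArrayList<>(snapshots);
        // Visit adjacent pairs (ids[i], ids[i + 1]). The newest id only ever
        // appears as the second element, so it is never a deletion candidate.
        for (int i = 0; i + 1 < ids.size(); i++) {
            Long current = ids.get(i);
            Long next = ids.get(i + 1);
            if (!shouldDelete.test(current, next)) {
                break; // like takeWhile: stop at the first snapshot that is kept
            }
            snapshots.remove(current);
            deleted.add(current);
        }
        return deleted;
    }

    public static void main(String[] args) {
        TreeSet<Long> snapshots = new TreeSet<>(Arrays.asList(100L, 200L, 300L));
        List<Long> deleted = clean(snapshots, (current, next) -> true);
        // Even with an always-true predicate, the newest snapshot (300) survives.
        System.out.println("deleted=" + deleted + " remaining=" + snapshots);
    }
}
```

With fewer than two snapshots the loop body never runs, which corresponds to the `snapshots.size < 2` guard in the diff.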







[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r660837401



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -358,6 +485,7 @@ final class KafkaMetadataLog private (
     expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]]
   ): Unit = {
     expiredSnapshots.foreach { case (snapshotId, _) =>
+      info(s"Marking snapshot $snapshotId for deletion")

Review comment:
       I assumed the class included some common log info like that, but it looks like it doesn't. I'll add it here and open a separate PR to add some basic fields to the logIdent.







[GitHub] [kafka] cmccabe commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662470041



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -16,34 +16,41 @@
  */
 package kafka.raft
 
-import java.io.File
-import java.nio.file.{Files, NoSuchFileException, Path}
-import java.util.{Optional, Properties}
 import kafka.api.ApiVersion
 import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, SnapshotGenerated}
-import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, LogDirFailureChannel, RequestLocal}
+import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, KafkaConfig, LogDirFailureChannel, RequestLocal}
 import kafka.utils.{CoreUtils, Logging, Scheduler}
+import org.apache.kafka.common.config.AbstractConfig
 import org.apache.kafka.common.record.{MemoryRecords, Records}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
 import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
 
+import java.io.File
+import java.nio.file.{Files, NoSuchFileException, Path}
+import java.util.{Optional, Properties}
 import scala.annotation.nowarn
 import scala.collection.mutable
 import scala.compat.java8.OptionConverters._
 
 final class KafkaMetadataLog private (
-  log: Log,
+  val log: Log,
+  time: Time,
   scheduler: Scheduler,
   // Access to this object needs to be synchronized because it is used by the snapshotting thread to notify the
   // polling thread when snapshots are created. This object is also used to store any opened snapshot reader.
   snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],
   topicPartition: TopicPartition,
   maxFetchSizeInBytes: Int,
-  val fileDeleteDelayMs: Long // Visible for testing,
+  // Visible for testing
+  val fileDeleteDelayMs: Long,
+  val retentionSize: Long,
+  val retentionMs: Long,
 ) extends ReplicatedLog with Logging {
 
+  this.logIdent = s"[MetadataLog partition=$topicPartition, dir=${log.dir.getParent}] "

Review comment:
       You need to include the node ID here. Otherwise you'll have a bunch of confusing identical log prefixes for each node when running junit tests.
   
   I also think including the log directory here is a bad idea, since it can be very long, and there is only one per node anyway. So it's just noise, really.
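
A minimal sketch of the kind of prefix this comment asks for, with the node ID included and the log directory dropped. The class name, the `nodeId=` field label, and the `metadata-0` partition string are hypothetical choices for illustration; the source does not show the prefix that was finally adopted.

```java
class LogPrefixSketch {
    // Builds a log prefix that distinguishes nodes sharing one JVM (e.g. in JUnit tests).
    // The exact field names and ordering are assumptions, not the PR's format.
    static String logPrefix(int nodeId, String topicPartition) {
        return String.format("[MetadataLog partition=%s, nodeId=%d] ", topicPartition, nodeId);
    }

    public static void main(String[] args) {
        // Two nodes logging the same event now produce distinguishable lines.
        System.out.println(logPrefix(1, "metadata-0") + "Marking snapshot for deletion");
        System.out.println(logPrefix(2, "metadata-0") + "Marking snapshot for deletion");
    }
}
```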







[GitHub] [kafka] cmccabe commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662643681



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1048,9 +1058,14 @@ object KafkaConfig {
       .defineInternal(InitialBrokerRegistrationTimeoutMsProp, INT, Defaults.InitialBrokerRegistrationTimeoutMs, null, MEDIUM, InitialBrokerRegistrationTimeoutMsDoc)
       .defineInternal(BrokerHeartbeatIntervalMsProp, INT, Defaults.BrokerHeartbeatIntervalMs, null, MEDIUM, BrokerHeartbeatIntervalMsDoc)
       .defineInternal(BrokerSessionTimeoutMsProp, INT, Defaults.BrokerSessionTimeoutMs, null, MEDIUM, BrokerSessionTimeoutMsDoc)
-      .defineInternal(MetadataLogDirProp, STRING, null, null, HIGH, MetadataLogDirDoc)
       .defineInternal(ControllerListenerNamesProp, STRING, null, null, HIGH, ControllerListenerNamesDoc)
       .defineInternal(SaslMechanismControllerProtocolProp, STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, null, HIGH, SaslMechanismControllerProtocolDoc)
+      .defineInternal(MetadataLogDirProp, STRING, null, null, HIGH, MetadataLogDirDoc)
+      .defineInternal(MetadataLogSegmentBytesProp, INT, Defaults.LogSegmentBytes, atLeast(Records.LOG_OVERHEAD), HIGH, MetadataLogSegmentBytesDoc)
+      .defineInternal(MetadataLogSegmentMillisProp, LONG, Defaults.LogRollHours * 60 * 60 * 1000L, null, HIGH, MetadataLogSegmentMillisDoc)
+      .defineInternal(MetadataMaxRetentionBytesProp, LONG, Defaults.LogRetentionBytes, null, HIGH, MetadataMaxRetentionBytesDoc)
+      .defineInternal(MetadataMaxRetentionMillisProp, LONG, null, null, HIGH, MetadataMaxRetentionMillisDoc)

Review comment:
       They'll be public in 3.0.
   
   We can do it in this PR or later -- doesn't matter to me.







[GitHub] [kafka] cmccabe commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662494186



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +313,152 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot follows this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread safe and assumes a lock on the snapshots collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotIdOpt: Option[OffsetAndEpoch]): Boolean = {
+    nextSnapshotIdOpt.exists { nextSnapshotId =>
+      if (snapshots.contains(snapshotId) &&
+          snapshots.contains(nextSnapshotId) &&
+          startOffset < nextSnapshotId.offset &&
+          snapshotId.offset < nextSnapshotId.offset &&
+          log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) {
+        log.deleteOldSegments()
+        val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+        snapshots.remove(snapshotId) match {
+          case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot)
+          case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.")
+        }
+        removeSnapshots(forgotten)
+        true
+      } else {
+        false
+      }
+    }
+  }
 
-          // Delete all segments that have a "last offset" less than the log start offset
-          log.deleteOldSegments()
+  /**
+   * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+    snapshots.keys.toSeq.flatMap {
+      snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes())}
+    }
+  }
 
-          // Forget snapshots less than the log start offset
-          (true, forgetSnapshotsBefore(logStartSnapshotId))
-        case _ =>
-          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records
+   */
+  private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {

Review comment:
       I think we're not populating that field yet, so we'd have to fix that first...
   Let's do this in a follow-on, as we discussed.







[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662623797



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +315,142 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot follows this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread safe and assumes a lock on the snapshots collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotId: OffsetAndEpoch): Boolean = {

Review comment:
       I think it's mostly a side effect of how this PR evolved. Now that we don't really even need `nextSnapshotId` when determining if a snapshot can be deleted, I can probably change this delete method to be more like it was originally.







[GitHub] [kafka] cmccabe commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662469119



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -16,34 +16,41 @@
  */
 package kafka.raft
 
-import java.io.File
-import java.nio.file.{Files, NoSuchFileException, Path}
-import java.util.{Optional, Properties}
 import kafka.api.ApiVersion
 import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, SnapshotGenerated}
-import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, LogDirFailureChannel, RequestLocal}
+import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, KafkaConfig, LogDirFailureChannel, RequestLocal}
 import kafka.utils.{CoreUtils, Logging, Scheduler}
+import org.apache.kafka.common.config.AbstractConfig
 import org.apache.kafka.common.record.{MemoryRecords, Records}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
 import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
 
+import java.io.File
+import java.nio.file.{Files, NoSuchFileException, Path}
+import java.util.{Optional, Properties}
 import scala.annotation.nowarn
 import scala.collection.mutable
 import scala.compat.java8.OptionConverters._
 
 final class KafkaMetadataLog private (
-  log: Log,
+  val log: Log,
+  time: Time,
   scheduler: Scheduler,
   // Access to this object needs to be synchronized because it is used by the snapshotting thread to notify the
   // polling thread when snapshots are created. This object is also used to store any opened snapshot reader.
   snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],
   topicPartition: TopicPartition,
   maxFetchSizeInBytes: Int,
-  val fileDeleteDelayMs: Long // Visible for testing,
+  // Visible for testing

Review comment:
       These values are immutable, so do they even need to be private, even if testing wasn't an issue?







[GitHub] [kafka] cmccabe commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662635099



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2153,14 +2154,36 @@ private boolean maybeCompleteShutdown(long currentTimeMs) {
         return false;
     }
 
-    private void maybeDeleteBeforeSnapshot() {
-        log.latestSnapshotId().ifPresent(snapshotId -> {
-            quorum.highWatermark().ifPresent(highWatermark -> {
-                if (highWatermark.offset >= snapshotId.offset) {
-                    log.deleteBeforeSnapshot(snapshotId);
+    /**
+     * A simple timer based log cleaner
+     */
+    private static class RaftMetadataLogCleaner {
+        private final Logger logger;
+        private final Timer timer;
+        private final long delayMs;
+        private final Runnable cleaner;
+
+        RaftMetadataLogCleaner(Logger logger, Time time, long delayMs, Runnable cleaner) {
+            this.logger = logger;
+            this.timer = time.timer(delayMs);
+            this.delayMs = delayMs;
+            this.cleaner = cleaner;
+        }
+
+        public boolean maybeClean(long currentTimeMs) {
+            timer.update(currentTimeMs);
+            if (timer.isExpired()) {

Review comment:
       I think `cleaner#clean` is expensive since it looks at all file sizes (among other things). 
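
Because the cleaning pass is described here as expensive, gating it behind a delay keeps it off the hot polling path. The sketch below shows that idea with a plain `long` deadline instead of Kafka's `Timer`; `TimedCleanerSketch` and its fields are hypothetical names, and the real `RaftMetadataLogCleaner` in the diff may behave differently after the lines quoted above.

```java
import java.util.concurrent.atomic.AtomicInteger;

class TimedCleanerSketch {
    private final long delayMs;
    private final Runnable cleaner;
    private long nextRunMs;

    TimedCleanerSketch(long startMs, long delayMs, Runnable cleaner) {
        this.delayMs = delayMs;
        this.cleaner = cleaner;
        this.nextRunMs = startMs + delayMs;
    }

    // Intended to be called on every poll; the expensive cleaner only runs
    // once the delay has elapsed since the previous run (or since start).
    boolean maybeClean(long currentTimeMs) {
        if (currentTimeMs < nextRunMs) {
            return false; // cheap path: nothing but a comparison
        }
        try {
            cleaner.run();
        } finally {
            // Re-arm even if the cleaner throws, so a failure does not cause a retry on every poll.
            nextRunMs = currentTimeMs + delayMs;
        }
        return true;
    }

    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();
        TimedCleanerSketch sketch = new TimedCleanerSketch(0L, 1000L, runs::incrementAndGet);
        // Poll every 500 ms of simulated time; the cleaner fires at 1000 and 2000.
        for (long now = 0L; now <= 2000L; now += 500L) {
            sketch.maybeClean(now);
        }
        System.out.println("runs=" + runs.get());
    }
}
```

Passing the current time in as an argument, as the diff's `maybeClean(long currentTimeMs)` does, also makes the gating easy to test with simulated clocks.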







[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r660836052



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +313,152 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot follows this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread safe and assumes a lock on the snapshots collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotIdOpt: Option[OffsetAndEpoch]): Boolean = {
+    nextSnapshotIdOpt.exists { nextSnapshotId =>
+      if (snapshots.contains(snapshotId) &&
+          snapshots.contains(nextSnapshotId) &&
+          startOffset < nextSnapshotId.offset &&
+          snapshotId.offset < nextSnapshotId.offset &&
+          log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) {
+        log.deleteOldSegments()
+        val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+        snapshots.remove(snapshotId) match {
+          case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot)
+          case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.")
+        }
+        removeSnapshots(forgotten)
+        true
+      } else {
+        false
+      }
+    }
+  }
 
-          // Delete all segments that have a "last offset" less than the log start offset
-          log.deleteOldSegments()
+  /**
+   * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+    snapshots.keys.toSeq.flatMap {
+      snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes())}
+    }
+  }
 
-          // Forget snapshots less than the log start offset
-          (true, forgetSnapshotsBefore(logStartSnapshotId))
-        case _ =>
-          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records
+   */
+  private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {
+    readSnapshot(snapshotId).asScala.flatMap { reader =>
+      val it = reader.records().batchIterator()
+      if (it.hasNext) {
+        Some(it.next.maxTimestamp())
+      } else {
+        None
       }
     }
+  }
 
-    removeSnapshots(forgottenSnapshots)
-    deleted
+  /**
+   * Perform cleaning of old snapshots and log segments based on size.
+   *
+   * If our configured retention size has been violated, we perform cleaning as follows:
+   *
+   * <li>Find oldest snapshot and delete it</li>
+   * <li>Advance log start offset to end of next oldest snapshot</li>
+   * <li>Delete log segments which wholly precede the new log start offset</li>
+   *
+   * This process is repeated until the retention size is no longer violated, or until only
+   * a single snapshot remains.
+   */
+  override def maybeClean(): Boolean = {
+    snapshots synchronized {
+      cleanSnapshotsRetentionSize() || cleanSnapshotsRetentionMs()
+    }
+  }
+
+  /**
+   * Iterate through the snapshots and test the given predicate to see if we should attempt to delete it. Since
+   * we have some additional invariants regarding snapshots and log segments we cannot simply delete a snapshot in
+   * all cases.
+   *
+   * For the given predicate, we are testing if the snapshot identified by the first argument should be deleted.
+   */
+  private def cleanSnapshots(predicate: (OffsetAndEpoch, Option[OffsetAndEpoch]) => Boolean): Boolean = {
+    val snapshotIterator = snapshots.keys.iterator
+    var snapshotOpt = Log.nextOption(snapshotIterator)
+    var didClean = false
+    while (snapshotOpt.isDefined) {
+      val snapshot = snapshotOpt.get
+      val nextOpt = Log.nextOption(snapshotIterator)
+      if (predicate(snapshot, nextOpt)) {
+        if (deleteSnapshot(snapshot, nextOpt)) {

Review comment:
       That's a good point, I'll clean this up a bit







[GitHub] [kafka] dengziming commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r654253929



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1392,7 +1392,7 @@ class Log(@volatile private var _dir: File,
    *                  (if there is one) and returns true iff it is deletable
    * @return The number of segments deleted
    */
-  private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean,
+   def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean,
                                 reason: SegmentDeletionReason): Int = {

Review comment:
       nit: fix indentation







[GitHub] [kafka] cmccabe commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662470041



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -16,34 +16,41 @@
  */
 package kafka.raft
 
-import java.io.File
-import java.nio.file.{Files, NoSuchFileException, Path}
-import java.util.{Optional, Properties}
 import kafka.api.ApiVersion
 import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, SnapshotGenerated}
-import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, LogDirFailureChannel, RequestLocal}
+import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, KafkaConfig, LogDirFailureChannel, RequestLocal}
 import kafka.utils.{CoreUtils, Logging, Scheduler}
+import org.apache.kafka.common.config.AbstractConfig
 import org.apache.kafka.common.record.{MemoryRecords, Records}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
 import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
 
+import java.io.File
+import java.nio.file.{Files, NoSuchFileException, Path}
+import java.util.{Optional, Properties}
 import scala.annotation.nowarn
 import scala.collection.mutable
 import scala.compat.java8.OptionConverters._
 
 final class KafkaMetadataLog private (
-  log: Log,
+  val log: Log,
+  time: Time,
   scheduler: Scheduler,
   // Access to this object needs to be synchronized because it is used by the snapshotting thread to notify the
   // polling thread when snapshots are created. This object is also used to store any opened snapshot reader.
   snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],
   topicPartition: TopicPartition,
   maxFetchSizeInBytes: Int,
-  val fileDeleteDelayMs: Long // Visible for testing,
+  // Visible for testing
+  val fileDeleteDelayMs: Long,
+  val retentionSize: Long,
+  val retentionMs: Long,
 ) extends ReplicatedLog with Logging {
 
+  this.logIdent = s"[MetadataLog partition=$topicPartition, dir=${log.dir.getParent}] "

Review comment:
       You need to include the node ID here. Otherwise you'll have a bunch of confusing identical log prefixes for each node when running junit tests.
   
   I also think including the log directory here is a bad idea. I suppose if there are multiple metadata log directories then this could be OK, but we don't support that configuration yet.







[GitHub] [kafka] jsancio commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662550101



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1048,9 +1058,14 @@ object KafkaConfig {
       .defineInternal(InitialBrokerRegistrationTimeoutMsProp, INT, Defaults.InitialBrokerRegistrationTimeoutMs, null, MEDIUM, InitialBrokerRegistrationTimeoutMsDoc)
       .defineInternal(BrokerHeartbeatIntervalMsProp, INT, Defaults.BrokerHeartbeatIntervalMs, null, MEDIUM, BrokerHeartbeatIntervalMsDoc)
       .defineInternal(BrokerSessionTimeoutMsProp, INT, Defaults.BrokerSessionTimeoutMs, null, MEDIUM, BrokerSessionTimeoutMsDoc)
-      .defineInternal(MetadataLogDirProp, STRING, null, null, HIGH, MetadataLogDirDoc)
       .defineInternal(ControllerListenerNamesProp, STRING, null, null, HIGH, ControllerListenerNamesDoc)
       .defineInternal(SaslMechanismControllerProtocolProp, STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, null, HIGH, SaslMechanismControllerProtocolDoc)
+      .defineInternal(MetadataLogDirProp, STRING, null, null, HIGH, MetadataLogDirDoc)
+      .defineInternal(MetadataLogSegmentBytesProp, INT, Defaults.LogSegmentBytes, atLeast(Records.LOG_OVERHEAD), HIGH, MetadataLogSegmentBytesDoc)
+      .defineInternal(MetadataLogSegmentMillisProp, LONG, Defaults.LogRollHours * 60 * 60 * 1000L, null, HIGH, MetadataLogSegmentMillisDoc)
+      .defineInternal(MetadataMaxRetentionBytesProp, LONG, Defaults.LogRetentionBytes, null, HIGH, MetadataMaxRetentionBytesDoc)
+      .defineInternal(MetadataMaxRetentionMillisProp, LONG, null, null, HIGH, MetadataMaxRetentionMillisDoc)

Review comment:
       Are we going to keep the properties added in this PR as internal after 3.0? If not, let's just make them public now.
   
   Do we need a default value for the retention milliseconds property?

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2651,7 +2651,7 @@ object Log extends Logging {
    * @tparam T the type of object held within the iterator
    * @return Some(iterator.next) if a next element exists, None otherwise.
    */
-  private def nextOption[T](iterator: Iterator[T]): Option[T] = {
+  def nextOption[T](iterator: Iterator[T]): Option[T] = {

Review comment:
       I don't think you need to make this public. Scala's `Iterator` has `nextOption`.

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +315,142 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot follows this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread safe and assumes a lock on the snapshots collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotId: OffsetAndEpoch): Boolean = {

Review comment:
       Why change and replace the implementation for `deleteBeforeSnapshot`? For example, why not always delete every snapshot that is less than `nextSnapshotId` when the `if` statement predicate is true?
   
   For example, `log.deleteOldSegments()` deletes every segment that is less than the log start offset. Why not also delete every snapshot that is less than the log start offset which is the same as `nextSnapshotId`?
   
   This method deletes both segments and snapshots, so the old name, `deleteBeforeSnapshot`, seems more accurate.
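
A minimal sketch of the alternative raised in this comment: removing every snapshot below the new log start in one step, rather than one snapshot per call. It is written in Java with hypothetical names (`ForgetBeforeSketch`, `forgetBefore`) and plain `Long` offsets standing in for `OffsetAndEpoch`; it is not code from this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

final class ForgetBeforeSketch {
    /**
     * Removes every entry whose key is strictly less than logStartId and
     * returns the removed keys, oldest first. Hypothetical helper.
     */
    static <K, V> List<K> forgetBefore(TreeMap<K, V> snapshots, K logStartId) {
        // headMap(..., false) is a live view of the keys below logStartId.
        List<K> forgotten = new ArrayList<>(snapshots.headMap(logStartId, false).keySet());
        // Clearing the view removes those entries from the backing map.
        snapshots.headMap(logStartId, false).clear();
        return forgotten;
    }
}
```

The snapshot at `logStartId` itself is kept, which matches the invariant that the latest snapshot is never removed.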

##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -817,11 +863,19 @@ object KafkaMetadataLogTest {
     }
   }
 
+  val DefaultMetadataLogConfig = new MetadataLogConfig(

Review comment:
       This is a `case class`, so `new` is neither needed nor recommended.

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2153,14 +2154,36 @@ private boolean maybeCompleteShutdown(long currentTimeMs) {
         return false;
     }
 
-    private void maybeDeleteBeforeSnapshot() {
-        log.latestSnapshotId().ifPresent(snapshotId -> {
-            quorum.highWatermark().ifPresent(highWatermark -> {
-                if (highWatermark.offset >= snapshotId.offset) {
-                    log.deleteBeforeSnapshot(snapshotId);
+    /**
+     * A simple timer based log cleaner
+     */
+    private static class RaftMetadataLogCleaner {
+        private final Logger logger;
+        private final Timer timer;
+        private final long delayMs;
+        private final Runnable cleaner;
+
+        RaftMetadataLogCleaner(Logger logger, Time time, long delayMs, Runnable cleaner) {
+            this.logger = logger;
+            this.timer = time.timer(delayMs);
+            this.delayMs = delayMs;
+            this.cleaner = cleaner;
+        }
+
+        public boolean maybeClean(long currentTimeMs) {
+            timer.update(currentTimeMs);
+            if (timer.isExpired()) {

Review comment:
       Do we need this timer because `log.maybeClean` is expensive even when there are no snapshots to clean?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +313,152 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot follows this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread safe and assumes a lock on the snapshots collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotIdOpt: Option[OffsetAndEpoch]): Boolean = {
+    nextSnapshotIdOpt.exists { nextSnapshotId =>
+      if (snapshots.contains(snapshotId) &&
+          snapshots.contains(nextSnapshotId) &&
+          startOffset < nextSnapshotId.offset &&
+          snapshotId.offset < nextSnapshotId.offset &&
+          log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) {
+        log.deleteOldSegments()
+        val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+        snapshots.remove(snapshotId) match {
+          case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot)
+          case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.")
+        }
+        removeSnapshots(forgotten)
+        true
+      } else {
+        false
+      }
+    }
+  }
 
-          // Delete all segments that have a "last offset" less than the log start offset
-          log.deleteOldSegments()
+  /**
+   * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+    snapshots.keys.toSeq.flatMap {
+      snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes())}
+    }
+  }
 
-          // Forget snapshots less than the log start offset
-          (true, forgetSnapshotsBefore(logStartSnapshotId))
-        case _ =>
-          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records
+   */
+  private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {

Review comment:
       Sounds good. Can we move this implementation to `SnapshotReader`? It will match what we are doing here https://github.com/apache/kafka/pull/10946/files#diff-83295dbf4af9755c79987b390f72f53dda47af9f82eb8305755d90da18d5b9f2R85

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2089,7 +2090,7 @@ private long pollUnattachedAsObserver(UnattachedState state, long currentTimeMs)
     }
 
     private long pollCurrentState(long currentTimeMs) {
-        maybeDeleteBeforeSnapshot();
+        snapshotCleaner.maybeClean(currentTimeMs);

Review comment:
       We need to communicate the cleaner's timeout to the poll method so that it knows how long to wait on the message queue.
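
One way to read this suggestion, sketched in Java under stated assumptions: the cleaner returns the time remaining until its next run, and the caller folds that value into its poll timeout. The class name `TimedCleanerSketch` and its fields are hypothetical, and a plain deadline replaces Kafka's `Timer` to keep the example self-contained.

```java
final class TimedCleanerSketch {
    private final long delayMs;
    private final Runnable cleaner;
    private long deadlineMs;

    TimedCleanerSketch(long nowMs, long delayMs, Runnable cleaner) {
        this.delayMs = delayMs;
        this.cleaner = cleaner;
        this.deadlineMs = nowMs + delayMs;
    }

    /**
     * Runs the cleaner if the deadline has passed, then returns the number of
     * milliseconds until the next scheduled run.
     */
    long maybeClean(long currentTimeMs) {
        if (currentTimeMs >= deadlineMs) {
            cleaner.run();
            deadlineMs = currentTimeMs + delayMs;
        }
        return deadlineMs - currentTimeMs;
    }
}
```

A caller could then bound its wait with something like `Math.min(stateTimeoutMs, cleaner.maybeClean(currentTimeMs))`, where `stateTimeoutMs` is whatever the current state's poll would otherwise return.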







[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r649482956



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -294,19 +298,14 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
+  override def deleteBeforeSnapshot(deleteBeforeSnapshotId: OffsetAndEpoch): Boolean = {

Review comment:
       Since this is no longer used to increase the Log Start Offset, I think this method can go away.







[GitHub] [kafka] jsancio commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r660808793



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -16,32 +16,37 @@
  */
 package kafka.raft
 
-import java.io.File
-import java.nio.file.{Files, NoSuchFileException, Path}
-import java.util.{Optional, Properties}
 import kafka.api.ApiVersion
 import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, SnapshotGenerated}
-import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, LogDirFailureChannel, RequestLocal}
+import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, KafkaConfig, LogDirFailureChannel, RequestLocal}
 import kafka.utils.{CoreUtils, Logging, Scheduler}
+import org.apache.kafka.common.config.AbstractConfig
 import org.apache.kafka.common.record.{MemoryRecords, Records}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
 import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
 
+import java.io.File
+import java.nio.file.{Files, NoSuchFileException, Path}
+import java.util.{Optional, Properties}
 import scala.annotation.nowarn
 import scala.collection.mutable
 import scala.compat.java8.OptionConverters._
 
 final class KafkaMetadataLog private (
-  log: Log,
+  val log: Log,
+  time: Time,
   scheduler: Scheduler,
   // Access to this object needs to be synchronized because it is used by the snapshotting thread to notify the
   // polling thread when snapshots are created. This object is also used to store any opened snapshot reader.
-  snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],
+  val snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],

Review comment:
       I don't think we should do this. Access to this data needs to be synchronized.
   
   It is also very likely that this is an implementation detail that will change in the near future.
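
As an illustration of the concern, one alternative to exposing the map is to keep it private and hand out copies taken under the lock. This is a sketch in Java with hypothetical names (`SnapshotRegistrySketch`, `snapshotIds`) and `Long` offsets in place of `OffsetAndEpoch`; it is not the PR's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

final class SnapshotRegistrySketch {
    // Guarded by its own monitor; never exposed directly to callers.
    private final TreeMap<Long, String> snapshots = new TreeMap<>();

    void add(long offset, String fileName) {
        synchronized (snapshots) {
            snapshots.put(offset, fileName);
        }
    }

    /** Returns a point-in-time copy of the snapshot offsets, oldest first. */
    List<Long> snapshotIds() {
        synchronized (snapshots) {
            return new ArrayList<>(snapshots.keySet());
        }
    }
}
```

Tests can assert on the returned copy without touching the shared map, and the internal representation stays free to change.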

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##########
@@ -244,10 +246,10 @@ public void testFetchRequestOffsetLessThanLogStart() throws Exception {
             assertEquals(snapshotId, snapshot.snapshotId());
             snapshot.freeze();
         }
-
+        context.log.deleteBeforeSnapshot(snapshotId);
         context.client.poll();
 
-        assertEquals(snapshotId.offset, context.log.startOffset());
+        //context.log.logStartOffset(snapshotId.offset);

Review comment:
       Commented out code.

##########
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##########
@@ -316,6 +316,11 @@ public void flush() {
         lastFlushedOffset = endOffset().offset;
     }
 
+    @Override
+    public boolean maybeClean() {
+        return false;
+    }

Review comment:
       Are you planning to implement this and use this instead of `deleteBeforeSnapshot` below? Most of the tests in the `raft` module depend on `MockLog` having the same semantics as `KafkaMetadataLog`.

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +313,152 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot follows this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread safe and assumes a lock on the snapshots collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotIdOpt: Option[OffsetAndEpoch]): Boolean = {
+    nextSnapshotIdOpt.exists { nextSnapshotId =>
+      if (snapshots.contains(snapshotId) &&
+          snapshots.contains(nextSnapshotId) &&
+          startOffset < nextSnapshotId.offset &&
+          snapshotId.offset < nextSnapshotId.offset &&
+          log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) {
+        log.deleteOldSegments()
+        val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+        snapshots.remove(snapshotId) match {
+          case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot)
+          case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.")
+        }
+        removeSnapshots(forgotten)
+        true
+      } else {
+        false
+      }
+    }
+  }
 
-          // Delete all segments that have a "last offset" less than the log start offset
-          log.deleteOldSegments()
+  /**
+   * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+    snapshots.keys.toSeq.flatMap {
+      snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes())}
+    }
+  }
 
-          // Forget snapshots less than the log start offset
-          (true, forgetSnapshotsBefore(logStartSnapshotId))
-        case _ =>
-          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records
+   */
+  private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {
+    readSnapshot(snapshotId).asScala.flatMap { reader =>
+      val it = reader.records().batchIterator()
+      if (it.hasNext) {
+        Some(it.next.maxTimestamp())
+      } else {
+        None
       }
     }
+  }
 
-    removeSnapshots(forgottenSnapshots)
-    deleted
+  /**
+   * Perform cleaning of old snapshots and log segments based on size.
+   *
+   * If our configured retention size has been violated, we perform cleaning as follows:
+   *
+   * <li>Find oldest snapshot and delete it</li>
+   * <li>Advance log start offset to end of next oldest snapshot</li>
+   * <li>Delete log segments which wholly precede the new log start offset</li>
+   *
+   * This process is repeated until the retention size is no longer violated, or until only
+   * a single snapshot remains.
+   */
+  override def maybeClean(): Boolean = {
+    snapshots synchronized {
+      cleanSnapshotsRetentionSize() || cleanSnapshotsRetentionMs()
+    }
+  }
+
+  /**
+   * Iterate through the snapshots and test the given predicate to see if we should attempt to delete it. Since
+   * we have some additional invariants regarding snapshots and log segments we cannot simply delete a snapshot in
+   * all cases.
+   *
+   * For the given predicate, we are testing if the snapshot identified by the first argument should be deleted.
+   */
+  private def cleanSnapshots(predicate: (OffsetAndEpoch, Option[OffsetAndEpoch]) => Boolean): Boolean = {
+    val snapshotIterator = snapshots.keys.iterator
+    var snapshotOpt = Log.nextOption(snapshotIterator)
+    var didClean = false
+    while (snapshotOpt.isDefined) {
+      val snapshot = snapshotOpt.get
+      val nextOpt = Log.nextOption(snapshotIterator)
+      if (predicate(snapshot, nextOpt)) {
+        if (deleteSnapshot(snapshot, nextOpt)) {

Review comment:
       `deleteSnapshot` is a no-op if `nextOpt` is `None`. If we move that check here, I think we can simplify the code a bit and, for example, change the predicate to `(OffsetAndEpoch, OffsetAndEpoch) => Boolean`.
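
A rough sketch of the simplification described in this comment, in Java: the loop itself stops when there is no successor, so both the predicate and the delete callback take two non-optional ids. All names here (`SnapshotCleanerSketch`, `cleanSnapshots`, `shouldDelete`, `delete`) are hypothetical, and the method returns the deleted ids rather than a `Boolean` purely to make the behavior easy to inspect.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiPredicate;

final class SnapshotCleanerSketch {
    /**
     * Walks snapshot ids oldest first. Stops at the first snapshot that the
     * predicate spares, that fails to delete, or that has no successor, so
     * the latest snapshot is never passed to either callback as "current".
     * Callers should pass a copy of the ids if delete mutates the source.
     */
    static <T> List<T> cleanSnapshots(Iterable<T> sortedIds,
                                      BiPredicate<T, T> shouldDelete,
                                      BiPredicate<T, T> delete) {
        List<T> deleted = new ArrayList<>();
        Iterator<T> it = sortedIds.iterator();
        if (!it.hasNext()) {
            return deleted;
        }
        T current = it.next();
        while (it.hasNext()) {
            T next = it.next();
            if (!shouldDelete.test(current, next) || !delete.test(current, next)) {
                break;
            }
            deleted.add(current);
            current = next;
        }
        return deleted;
    }
}
```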

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +313,152 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot follows this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread safe and assumes a lock on the snapshots collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotIdOpt: Option[OffsetAndEpoch]): Boolean = {
+    nextSnapshotIdOpt.exists { nextSnapshotId =>
+      if (snapshots.contains(snapshotId) &&
+          snapshots.contains(nextSnapshotId) &&
+          startOffset < nextSnapshotId.offset &&
+          snapshotId.offset < nextSnapshotId.offset &&
+          log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) {
+        log.deleteOldSegments()
+        val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+        snapshots.remove(snapshotId) match {
+          case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot)
+          case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.")
+        }
+        removeSnapshots(forgotten)
+        true
+      } else {
+        false
+      }
+    }
+  }
 
-          // Delete all segments that have a "last offset" less than the log start offset
-          log.deleteOldSegments()
+  /**
+   * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+    snapshots.keys.toSeq.flatMap {
+      snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes())}
+    }
+  }
 
-          // Forget snapshots less than the log start offset
-          (true, forgetSnapshotsBefore(logStartSnapshotId))
-        case _ =>
-          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records
+   */
+  private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {
+    readSnapshot(snapshotId).asScala.flatMap { reader =>
+      val it = reader.records().batchIterator()
+      if (it.hasNext) {
+        Some(it.next.maxTimestamp())
+      } else {
+        None
       }
     }
+  }
 
-    removeSnapshots(forgottenSnapshots)
-    deleted
+  /**
+   * Perform cleaning of old snapshots and log segments based on size.
+   *
+   * If our configured retention size has been violated, we perform cleaning as follows:
+   *
+   * <li>Find oldest snapshot and delete it</li>
+   * <li>Advance log start offset to end of next oldest snapshot</li>
+   * <li>Delete log segments which wholly precede the new log start offset</li>
+   *
+   * This process is repeated until the retention size is no longer violated, or until only
+   * a single snapshot remains.
+   */
+  override def maybeClean(): Boolean = {
+    snapshots synchronized {
+      cleanSnapshotsRetentionSize() || cleanSnapshotsRetentionMs()
+    }
+  }
+
+  /**
+   * Iterate through the snapshots and test the given predicate to see if we should attempt to delete it. Since
+   * we have some additional invariants regarding snapshots and log segments we cannot simply delete a snapshot in
+   * all cases.
+   *
+   * For the given predicate, we are testing if the snapshot identified by the first argument should be deleted.
+   */
+  private def cleanSnapshots(predicate: (OffsetAndEpoch, Option[OffsetAndEpoch]) => Boolean): Boolean = {
+    val snapshotIterator = snapshots.keys.iterator
+    var snapshotOpt = Log.nextOption(snapshotIterator)
+    var didClean = false
+    while (snapshotOpt.isDefined) {
+      val snapshot = snapshotOpt.get
+      val nextOpt = Log.nextOption(snapshotIterator)
+      if (predicate(snapshot, nextOpt)) {
+        if (deleteSnapshot(snapshot, nextOpt)) {
+          didClean = true
+          snapshotOpt = nextOpt
+        } else {
+          snapshotOpt = None
+        }
+      } else {
+        snapshotOpt = None
+      }
+    }
+    didClean
+  }
+
+  private def cleanSnapshotsRetentionMs(): Boolean = {
+    if (retentionMs < 0)
+      return false
+
+    // If the timestamp of the first batch in the _next_ snapshot exceeds retention time, then we infer that
+    // the current snapshot also exceeds retention time. We make this inference to avoid reading the full snapshot.

Review comment:
       I see. Is this code simplified now that we include `LastContainedLogTimestamp` in the header of the snapshot?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +313,152 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot follows this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread safe and assumes a lock on the snapshots collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotIdOpt: Option[OffsetAndEpoch]): Boolean = {
+    nextSnapshotIdOpt.exists { nextSnapshotId =>
+      if (snapshots.contains(snapshotId) &&
+          snapshots.contains(nextSnapshotId) &&
+          startOffset < nextSnapshotId.offset &&
+          snapshotId.offset < nextSnapshotId.offset &&
+          log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) {
+        log.deleteOldSegments()
+        val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+        snapshots.remove(snapshotId) match {
+          case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot)
+          case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.")
+        }
+        removeSnapshots(forgotten)
+        true
+      } else {
+        false
+      }
+    }
+  }
 
-          // Delete all segments that have a "last offset" less than the log start offset
-          log.deleteOldSegments()
+  /**
+   * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+    snapshots.keys.toSeq.flatMap {
+      snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes())}
+    }
+  }
 
-          // Forget snapshots less than the log start offset
-          (true, forgetSnapshotsBefore(logStartSnapshotId))
-        case _ =>
-          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records
+   */
+  private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {

Review comment:
       PR https://github.com/apache/kafka/pull/10899 added a header that includes the `LastContainedLogTimestamp`. Should we change this method to use that instead?
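
A minimal sketch of the time-based check once each snapshot carries a single timestamp in its header. It is a dependency-free illustration, not the Kafka implementation: `TimeRetentionSketch`, `expiredSnapshots`, and the map of offsets to timestamps are hypothetical names, and the "keep at least one snapshot" rule is taken from the Javadoc quoted in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: snapshots are keyed by end offset and map to the
// timestamp read from their header (e.g. a lastContainedLogTimestamp value).
class TimeRetentionSketch {
    // Returns the offsets of snapshots that exceed retention, oldest first.
    static List<Long> expiredSnapshots(TreeMap<Long, Long> timestampsByOffset,
                                       long nowMs,
                                       long retentionMs) {
        List<Long> expired = new ArrayList<>();
        if (retentionMs < 0) {
            return expired; // mirrors the `retentionMs < 0` early return in the diff
        }
        int remaining = timestampsByOffset.size();
        for (Map.Entry<Long, Long> entry : timestampsByOffset.entrySet()) {
            // Stop at the first snapshot still within retention, or when only
            // the latest snapshot is left.
            if (remaining <= 1 || nowMs - entry.getValue() <= retentionMs) {
                break;
            }
            expired.add(entry.getKey());
            remaining--;
        }
        return expired;
    }
}
```

With a header timestamp available, the check no longer needs to infer a snapshot's age from the first batch of the next snapshot.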

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -358,6 +485,7 @@ final class KafkaMetadataLog private (
     expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]]
   ): Unit = {
     expiredSnapshots.foreach { case (snapshotId, _) =>
+      info(s"Marking snapshot $snapshotId for deletion")

Review comment:
       Let's include either the topic partition or the log dir of this log in this message.







[GitHub] [kafka] dengziming commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r663445933



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,28 +311,151 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
+  /**
+   * Delete snapshots that come before a given snapshot ID. This is done by advancing the log start offset to the given
+   * snapshot and cleaning old log segments.
+   *
+   * This will only happen if the following invariants all hold true:
+   *
+   * <li>The given snapshot precedes the latest snapshot</li>
+   * <li>The offset of the given snapshot is greater than the log start offset</li>
+   * <li>The log layer can advance the offset to the given snapshot</li>
+   *
+   * This method is thread-safe
+   */
+  override def deleteBeforeSnapshot(snapshotId: OffsetAndEpoch): Boolean = {
     val (deleted, forgottenSnapshots) = snapshots synchronized {
       latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-
-          // Delete all segments that have a "last offset" less than the log start offset
-          log.deleteOldSegments()
-
-          // Forget snapshots less than the log start offset
-          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case Some(latestSnapshotId) if
+          snapshots.contains(snapshotId) &&
+          startOffset < snapshotId.offset &&
+          snapshotId.offset <= latestSnapshotId.offset &&
+          log.maybeIncrementLogStartOffset(snapshotId.offset, SnapshotGenerated) =>
+            // Delete all segments that have a "last offset" less than the log start offset
+            log.deleteOldSegments()
+            // Remove older snapshots from the snapshots cache
+            (true, forgetSnapshotsBefore(snapshotId))
         case _ =>
-          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+            (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
       }
     }
-
     removeSnapshots(forgottenSnapshots)
     deleted
   }
 
+  /**
+   * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+    snapshots.keys.toSeq.flatMap {
+      snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes())}
+    }
+  }
+
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records
+   */
+  private def readSnapshotTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {
+    readSnapshot(snapshotId).asScala.flatMap { reader =>
+      val batchIterator = reader.records().batchIterator()
+
+      val firstBatch = batchIterator.next()
+      val records = firstBatch.streamingIterator(new BufferSupplier.GrowableBufferSupplier())
+      if (firstBatch.isControlBatch) {
+        val header = ControlRecordUtils.deserializedSnapshotHeaderRecord(records.next());
+        Some(header.lastContainedLogTimestamp())
+      } else {
+        warn("Did not find control record at beginning of snapshot")
+        None
+      }
+    }
+  }
+
+  /**
+   * Perform cleaning of old snapshots and log segments based on size.
+   *
+   * If our configured retention size has been violated, we perform cleaning as follows:
+   *
+   * <li>Find oldest snapshot and delete it</li>
+   * <li>Advance log start offset to end of next oldest snapshot</li>
+   * <li>Delete log segments which wholly precede the new log start offset</li>
+   *
+   * This process is repeated until the retention size is no longer violated, or until only
+   * a single snapshot remains.
+   */
+  override def maybeClean(): Boolean = {
+    snapshots synchronized {
+      cleanSnapshotsRetentionSize() || cleanSnapshotsRetentionMs()
+    }
+  }
+
+  /**
+   * Iterate through the snapshots and test the given predicate to see if we should attempt to delete it. Since
+   * we have some additional invariants regarding snapshots and log segments we cannot simply delete a snapshot in
+   * all cases.
+   *
+   * For the given predicate, we are testing if the snapshot identified by the first argument should be deleted.
+   */
+  private def cleanSnapshots(predicate: (OffsetAndEpoch) => Boolean): Boolean = {
+    if (snapshots.size < 2)
+      return false;

Review comment:
       nit: unnecessary ";"

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,28 +311,151 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
+  /**
+   * Delete snapshots that come before a given snapshot ID. This is done by advancing the log start offset to the given
+   * snapshot and cleaning old log segments.
+   *
+   * This will only happen if the following invariants all hold true:
+   *
+   * <li>The given snapshot precedes the latest snapshot</li>
+   * <li>The offset of the given snapshot is greater than the log start offset</li>
+   * <li>The log layer can advance the offset to the given snapshot</li>
+   *
+   * This method is thread-safe
+   */
+  override def deleteBeforeSnapshot(snapshotId: OffsetAndEpoch): Boolean = {
     val (deleted, forgottenSnapshots) = snapshots synchronized {
       latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-
-          // Delete all segments that have a "last offset" less than the log start offset
-          log.deleteOldSegments()
-
-          // Forget snapshots less than the log start offset
-          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case Some(latestSnapshotId) if
+          snapshots.contains(snapshotId) &&
+          startOffset < snapshotId.offset &&
+          snapshotId.offset <= latestSnapshotId.offset &&
+          log.maybeIncrementLogStartOffset(snapshotId.offset, SnapshotGenerated) =>
+            // Delete all segments that have a "last offset" less than the log start offset
+            log.deleteOldSegments()
+            // Remove older snapshots from the snapshots cache
+            (true, forgetSnapshotsBefore(snapshotId))
         case _ =>
-          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+            (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
       }
     }
-
     removeSnapshots(forgottenSnapshots)
     deleted
   }
 
+  /**
+   * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+    snapshots.keys.toSeq.flatMap {
+      snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes())}
+    }
+  }
+
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records
+   */
+  private def readSnapshotTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {
+    readSnapshot(snapshotId).asScala.flatMap { reader =>
+      val batchIterator = reader.records().batchIterator()
+
+      val firstBatch = batchIterator.next()
+      val records = firstBatch.streamingIterator(new BufferSupplier.GrowableBufferSupplier())
+      if (firstBatch.isControlBatch) {
+        val header = ControlRecordUtils.deserializedSnapshotHeaderRecord(records.next());
+        Some(header.lastContainedLogTimestamp())
+      } else {
+        warn("Did not find control record at beginning of snapshot")
+        None
+      }
+    }
+  }
+
+  /**
+   * Perform cleaning of old snapshots and log segments based on size.
+   *
+   * If our configured retention size has been violated, we perform cleaning as follows:
+   *
+   * <li>Find oldest snapshot and delete it</li>
+   * <li>Advance log start offset to end of next oldest snapshot</li>
+   * <li>Delete log segments which wholly precede the new log start offset</li>
+   *
+   * This process is repeated until the retention size is no longer violated, or until only
+   * a single snapshot remains.
+   */
+  override def maybeClean(): Boolean = {
+    snapshots synchronized {
+      cleanSnapshotsRetentionSize() || cleanSnapshotsRetentionMs()

Review comment:
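       Can we use `|` instead of `||` here? With `||`, `cleanSnapshotsRetentionMs()` is skipped whenever `cleanSnapshotsRetentionSize()` returns true.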







[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r664594269



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,28 +311,151 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
+  /**
+   * Delete snapshots that come before a given snapshot ID. This is done by advancing the log start offset to the given
+   * snapshot and cleaning old log segments.
+   *
+   * This will only happen if the following invariants all hold true:
+   *
+   * <li>The given snapshot precedes the latest snapshot</li>
+   * <li>The offset of the given snapshot is greater than the log start offset</li>
+   * <li>The log layer can advance the offset to the given snapshot</li>
+   *
+   * This method is thread-safe
+   */
+  override def deleteBeforeSnapshot(snapshotId: OffsetAndEpoch): Boolean = {
     val (deleted, forgottenSnapshots) = snapshots synchronized {
       latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-
-          // Delete all segments that have a "last offset" less than the log start offset
-          log.deleteOldSegments()
-
-          // Forget snapshots less than the log start offset
-          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case Some(latestSnapshotId) if
+          snapshots.contains(snapshotId) &&
+          startOffset < snapshotId.offset &&
+          snapshotId.offset <= latestSnapshotId.offset &&
+          log.maybeIncrementLogStartOffset(snapshotId.offset, SnapshotGenerated) =>
+            // Delete all segments that have a "last offset" less than the log start offset
+            log.deleteOldSegments()
+            // Remove older snapshots from the snapshots cache
+            (true, forgetSnapshotsBefore(snapshotId))
         case _ =>
-          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+            (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
       }
     }
-
     removeSnapshots(forgottenSnapshots)
     deleted
   }
 
+  /**
+   * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+    snapshots.keys.toSeq.flatMap {
+      snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes())}
+    }
+  }
+
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records
+   */
+  private def readSnapshotTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {
+    readSnapshot(snapshotId).asScala.flatMap { reader =>
+      val batchIterator = reader.records().batchIterator()
+
+      val firstBatch = batchIterator.next()
+      val records = firstBatch.streamingIterator(new BufferSupplier.GrowableBufferSupplier())
+      if (firstBatch.isControlBatch) {
+        val header = ControlRecordUtils.deserializedSnapshotHeaderRecord(records.next());
+        Some(header.lastContainedLogTimestamp())
+      } else {
+        warn("Did not find control record at beginning of snapshot")
+        None
+      }
+    }
+  }
+
+  /**
+   * Perform cleaning of old snapshots and log segments based on size.
+   *
+   * If our configured retention size has been violated, we perform cleaning as follows:
+   *
+   * <li>Find oldest snapshot and delete it</li>
+   * <li>Advance log start offset to end of next oldest snapshot</li>
+   * <li>Delete log segments which wholly precede the new log start offset</li>
+   *
+   * This process is repeated until the retention size is no longer violated, or until only
+   * a single snapshot remains.
+   */
+  override def maybeClean(): Boolean = {
+    snapshots synchronized {
+      cleanSnapshotsRetentionSize() || cleanSnapshotsRetentionMs()

Review comment:
       Good catch, @dengziming. We should always try to clean both ways. I'll rewrite this part, since I don't like using `|` (it's fairly uncommon).
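
A minimal sketch of the difference being discussed, written in plain Java. `CleanBothSketch`, `cleanBySize`, and `cleanByTime` are hypothetical stand-ins for the two retention methods, and evaluating both results into locals is only one possible way to avoid `|`; it is not necessarily the rewrite that was merged.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the two retention checks; not the Kafka implementation.
class CleanBothSketch {
    // Records which checks actually ran, in order.
    final List<String> calls = new ArrayList<>();

    boolean cleanBySize() {   // stands in for cleanSnapshotsRetentionSize()
        calls.add("size");
        return true;          // pretend size-based cleaning removed something
    }

    boolean cleanByTime() {   // stands in for cleanSnapshotsRetentionMs()
        calls.add("time");
        return false;
    }

    // Short-circuit: the time-based check is skipped when the size-based one returns true.
    boolean maybeCleanShortCircuit() {
        return cleanBySize() || cleanByTime();
    }

    // Evaluate both checks first, then combine the results.
    boolean maybeCleanBoth() {
        boolean cleanedBySize = cleanBySize();
        boolean cleanedByTime = cleanByTime();
        return cleanedBySize || cleanedByTime;
    }
}
```

In the second variant both checks always run, and the method still reports whether either of them cleaned anything.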







[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r660879928



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -358,6 +485,7 @@ final class KafkaMetadataLog private (
     expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]]
   ): Unit = {
     expiredSnapshots.foreach { case (snapshotId, _) =>
+      info(s"Marking snapshot $snapshotId for deletion")

Review comment:
       I went ahead and added a `logIdent` to this class.







[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r664605539



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2153,14 +2154,36 @@ private boolean maybeCompleteShutdown(long currentTimeMs) {
         return false;
     }
 
-    private void maybeDeleteBeforeSnapshot() {
-        log.latestSnapshotId().ifPresent(snapshotId -> {
-            quorum.highWatermark().ifPresent(highWatermark -> {
-                if (highWatermark.offset >= snapshotId.offset) {
-                    log.deleteBeforeSnapshot(snapshotId);
+    /**
+     * A simple timer based log cleaner
+     */
+    private static class RaftMetadataLogCleaner {
+        private final Logger logger;
+        private final Timer timer;
+        private final long delayMs;
+        private final Runnable cleaner;
+
+        RaftMetadataLogCleaner(Logger logger, Time time, long delayMs, Runnable cleaner) {
+            this.logger = logger;
+            this.timer = time.timer(delayMs);
+            this.delayMs = delayMs;
+            this.cleaner = cleaner;
+        }
+
+        public boolean maybeClean(long currentTimeMs) {
+            timer.update(currentTimeMs);
+            if (timer.isExpired()) {

Review comment:
       Yeah, it seemed prudent to check the cleaning conditions no more often than we do the actual cleaning.
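
A minimal, dependency-free sketch of the timer-gated idea: both the condition check and the cleaning happen at most once per delay interval. The PR itself uses Kafka's `Timer` utility; `TimedCleanerSketch` and its `nextRunMs` bookkeeping are hypothetical and only illustrate the gating.

```java
// Hypothetical sketch of a timer-gated cleaner; not the class from the PR.
class TimedCleanerSketch {
    private final long delayMs;
    private final Runnable cleaner;
    private long nextRunMs;

    TimedCleanerSketch(long startMs, long delayMs, Runnable cleaner) {
        this.delayMs = delayMs;
        this.cleaner = cleaner;
        this.nextRunMs = startMs + delayMs;
    }

    // Runs the cleaner only when the delay has elapsed; returns whether it ran.
    boolean maybeClean(long currentTimeMs) {
        if (currentTimeMs < nextRunMs) {
            return false;
        }
        try {
            cleaner.run();
        } finally {
            // Reschedule even if the cleaner throws, so a failure does not
            // cause a retry on every poll.
            nextRunMs = currentTimeMs + delayMs;
        }
        return true;
    }
}
```

Calling `maybeClean` from a polling loop is then cheap between intervals, because the retention conditions are not evaluated at all until the delay expires.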







[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662506190



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +315,142 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot follows this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread safe and assumes a lock on the snapshots collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotId: OffsetAndEpoch): Boolean = {
+    if (snapshots.contains(snapshotId) &&
+        snapshots.contains(nextSnapshotId) &&
+        startOffset < nextSnapshotId.offset &&
+        snapshotId.offset < nextSnapshotId.offset &&
+        log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) {
+      log.deleteOldSegments()
+      val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+      snapshots.remove(snapshotId) match {
+        case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot)
+        case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.")
+      }
+      removeSnapshots(forgotten)
+      true
+    } else {
+      false
+    }
+  }
 
-          // Delete all segments that have a "last offset" less than the log start offset
-          log.deleteOldSegments()
+  /**
+   * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+    snapshots.keys.toSeq.flatMap {
+      snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes())}
+    }
+  }
 
-          // Forget snapshots less than the log start offset
-          (true, forgetSnapshotsBefore(logStartSnapshotId))
-        case _ =>
-          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records
+   */
+  private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {
+    readSnapshot(snapshotId).asScala.flatMap { reader =>
+      val it = reader.records().batchIterator()
+      if (it.hasNext) {
+        Some(it.next.maxTimestamp())
+      } else {
+        None
       }
     }
+  }
 
-    removeSnapshots(forgottenSnapshots)
-    deleted
+  /**
+   * Perform cleaning of old snapshots and log segments based on size.
+   *
+   * If our configured retention size has been violated, we perform cleaning as follows:
+   *
+   * <li>Find oldest snapshot and delete it</li>
+   * <li>Advance log start offset to end of next oldest snapshot</li>
+   * <li>Delete log segments which wholly precede the new log start offset</li>
+   *
+   * This process is repeated until the retention size is no longer violated, or until only
+   * a single snapshot remains.
+   */
+  override def maybeClean(): Boolean = {
+    snapshots synchronized {
+      cleanSnapshotsRetentionSize() || cleanSnapshotsRetentionMs()
+    }
+  }
+
+  /**
+   * Iterate through the snapshots and test the given predicate to see if we should attempt to delete it. Since
+   * we have some additional invariants regarding snapshots and log segments we cannot simply delete a snapshot in
+   * all cases.
+   *
+   * For the given predicate, we are testing if the snapshot identified by the first argument should be deleted.
+   */
+  private def cleanSnapshots(predicate: (OffsetAndEpoch, OffsetAndEpoch) => Boolean): Boolean = {
+    if (snapshots.size < 2)
+      return false;
+
+    var didClean = false
+    snapshots.keys.toSeq.sliding(2).toSeq.takeWhile {
+      case Seq(snapshot: OffsetAndEpoch, nextSnapshot: OffsetAndEpoch) =>
+        if (predicate(snapshot, nextSnapshot) && deleteSnapshot(snapshot, nextSnapshot)) {
+          didClean = true
+          true
+        } else {
+          false
+        }
+      case _ => false // Shouldn't get here with sliding(2)
+    }
+    didClean
+  }
+
+  private def cleanSnapshotsRetentionMs(): Boolean = {
+    if (retentionMs < 0)
+      return false
+
+    // If the timestamp of the first batch in the _next_ snapshot exceeds retention time, then we infer that
+    // the current snapshot also exceeds retention time. We make this inference to avoid reading the full snapshot.
+    def shouldClean(snapshotId: OffsetAndEpoch, nextSnapshotId: OffsetAndEpoch): Boolean = {
+      val now = time.milliseconds()
+      firstBatchMaxTimestamp(nextSnapshotId).exists { timestamp =>
+        if (now - timestamp > retentionMs) {
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    cleanSnapshots(shouldClean)
+  }
+
+  private def cleanSnapshotsRetentionSize(): Boolean = {

Review comment:
       A few lines down, when we decide whether to clean a snapshot, we look at the total size of the log and the snapshots: https://github.com/apache/kafka/pull/10864/files/af08e3c62e2953feee5afc07e5b35962f8b306d4#diff-b332f85b04775c821226b6f704e91d51f9647f29ba73dace65b99cf36f6b9ceaR444-R449
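
A minimal sketch of a size-based check that counts the log and the snapshots together. Everything here is a hypothetical simplification: `SizeRetentionSketch` and its parameters are illustrative names, the log size is treated as fixed (in the real code, advancing the log start offset can also drop segments), and treating a negative limit as "disabled" is an assumption modeled on the `retentionMs < 0` check in the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of size-based retention; sizes and offsets are plain longs.
class SizeRetentionSketch {
    // Returns the offsets of the snapshots that would be deleted, oldest first.
    static List<Long> snapshotsToDelete(TreeMap<Long, Long> snapshotSizesByOffset,
                                        long logSizeBytes,
                                        long retentionMaxBytes) {
        List<Long> toDelete = new ArrayList<>();
        if (retentionMaxBytes < 0) {
            return toDelete; // assumption: a negative limit disables size-based cleaning
        }
        // Total footprint = log size plus the size of every snapshot.
        long total = logSizeBytes;
        for (long size : snapshotSizesByOffset.values()) {
            total += size;
        }
        int remaining = snapshotSizesByOffset.size();
        for (Map.Entry<Long, Long> entry : snapshotSizesByOffset.entrySet()) {
            // Stop once the limit is respected or only the latest snapshot is left.
            if (total <= retentionMaxBytes || remaining <= 1) {
                break;
            }
            toDelete.add(entry.getKey());
            total -= entry.getValue();
            remaining--;
        }
        return toDelete;
    }
}
```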
   
   







[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r664870271



##########
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##########
@@ -316,6 +316,11 @@ public void flush() {
         lastFlushedOffset = endOffset().offset;
     }
 
+    @Override
+    public boolean maybeClean() {
+        return false;
+    }

Review comment:
       I'm going to address this further in a follow-up PR.







[GitHub] [kafka] cmccabe commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662643922



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +315,142 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot follows this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread safe and assumes a lock on the snapshots collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotId: OffsetAndEpoch): Boolean = {

Review comment:
       Yes, it seems reasonable to just keep `deleteBeforeSnapshot`







[GitHub] [kafka] cmccabe commented on pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#issuecomment-875089636


   Merged





[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r660839303



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -16,32 +16,37 @@
  */
 package kafka.raft
 
-import java.io.File
-import java.nio.file.{Files, NoSuchFileException, Path}
-import java.util.{Optional, Properties}
 import kafka.api.ApiVersion
 import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, SnapshotGenerated}
-import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, LogDirFailureChannel, RequestLocal}
+import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, KafkaConfig, LogDirFailureChannel, RequestLocal}
 import kafka.utils.{CoreUtils, Logging, Scheduler}
+import org.apache.kafka.common.config.AbstractConfig
 import org.apache.kafka.common.record.{MemoryRecords, Records}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
 import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
 
+import java.io.File
+import java.nio.file.{Files, NoSuchFileException, Path}
+import java.util.{Optional, Properties}
 import scala.annotation.nowarn
 import scala.collection.mutable
 import scala.compat.java8.OptionConverters._
 
 final class KafkaMetadataLog private (
-  log: Log,
+  val log: Log,
+  time: Time,
   scheduler: Scheduler,
   // Access to this object needs to be synchronized because it is used by the snapshotting thread to notify the
   // polling thread when snapshots are created. This object is also used to store any opened snapshot reader.
-  snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],
+  val snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],

Review comment:
       Fair enough. I need a way to get the number of snapshots from the test code. Maybe just a package-private accessor like:
   
   ```scala
   private[raft] def snapshotCount(): Int = { ... } 
   ```
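
For illustration, here is a minimal Java sketch of the same idea: keep the collection private and expose only a narrow, package-private count for tests. `SnapshotCache`, `add`, and the `Long`/`Optional<String>` types are hypothetical stand-ins, not Kafka's classes; Java's package-private visibility is used as a rough analogue of Scala's `private[raft]`.

```java
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical stand-in for the snapshot cache; not Kafka's KafkaMetadataLog.
class SnapshotCache {
    // Keyed by snapshot end offset; the value models an optionally opened reader.
    private final TreeMap<Long, Optional<String>> snapshots = new TreeMap<>();

    void add(long endOffset) {
        synchronized (snapshots) {
            snapshots.put(endOffset, Optional.empty());
        }
    }

    // Package-private: visible to tests in the same package, hidden from other callers.
    int snapshotCount() {
        synchronized (snapshots) {
            return snapshots.size();
        }
    }

    public static void main(String[] args) {
        SnapshotCache cache = new SnapshotCache();
        cache.add(100L);
        cache.add(200L);
        System.out.println("snapshots: " + cache.snapshotCount());
    }
}
```

Exposing a count rather than the map itself keeps callers from mutating the collection without holding its lock.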







[GitHub] [kafka] cmccabe edited a comment on pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
cmccabe edited a comment on pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#issuecomment-875089636


   Merged to trunk and 3.0.
   
   Thanks, @mumrah.





[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r660836532



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +313,152 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot follows this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread safe and assumes a lock on the snapshots collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotIdOpt: Option[OffsetAndEpoch]): Boolean = {
+    nextSnapshotIdOpt.exists { nextSnapshotId =>
+      if (snapshots.contains(snapshotId) &&
+          snapshots.contains(nextSnapshotId) &&
+          startOffset < nextSnapshotId.offset &&
+          snapshotId.offset < nextSnapshotId.offset &&
+          log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) {
+        log.deleteOldSegments()
+        val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+        snapshots.remove(snapshotId) match {
+          case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot)
+          case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.")
+        }
+        removeSnapshots(forgotten)
+        true
+      } else {
+        false
+      }
+    }
+  }
 
-          // Delete all segments that have a "last offset" less than the log start offset
-          log.deleteOldSegments()
+  /**
+   * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+    snapshots.keys.toSeq.flatMap {
+      snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes())}
+    }
+  }
 
-          // Forget snapshots less than the log start offset
-          (true, forgetSnapshotsBefore(logStartSnapshotId))
-        case _ =>
-          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records
+   */
+  private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {
+    readSnapshot(snapshotId).asScala.flatMap { reader =>
+      val it = reader.records().batchIterator()
+      if (it.hasNext) {
+        Some(it.next.maxTimestamp())
+      } else {
+        None
       }
     }
+  }
 
-    removeSnapshots(forgottenSnapshots)
-    deleted
+  /**
+   * Perform cleaning of old snapshots and log segments based on size.
+   *
+   * If our configured retention size has been violated, we perform cleaning as follows:
+   *
+   * <li>Find oldest snapshot and delete it</li>
+   * <li>Advance log start offset to end of next oldest snapshot</li>
+   * <li>Delete log segments which wholly precede the new log start offset</li>
+   *
+   * This process is repeated until the retention size is no longer violated, or until only
+   * a single snapshot remains.
+   */
+  override def maybeClean(): Boolean = {
+    snapshots synchronized {
+      cleanSnapshotsRetentionSize() || cleanSnapshotsRetentionMs()
+    }
+  }
+
+  /**
+   * Iterate through the snapshots and test the given predicate to see if we should attempt to delete it. Since
+   * we have some additional invariants regarding snapshots and log segments we cannot simply delete a snapshot in
+   * all cases.
+   *
+   * For the given predicate, we are testing if the snapshot identified by the first argument should be deleted.
+   */
+  private def cleanSnapshots(predicate: (OffsetAndEpoch, Option[OffsetAndEpoch]) => Boolean): Boolean = {
+    val snapshotIterator = snapshots.keys.iterator
+    var snapshotOpt = Log.nextOption(snapshotIterator)
+    var didClean = false
+    while (snapshotOpt.isDefined) {
+      val snapshot = snapshotOpt.get
+      val nextOpt = Log.nextOption(snapshotIterator)
+      if (predicate(snapshot, nextOpt)) {
+        if (deleteSnapshot(snapshot, nextOpt)) {
+          didClean = true
+          snapshotOpt = nextOpt
+        } else {
+          snapshotOpt = None
+        }
+      } else {
+        snapshotOpt = None
+      }
+    }
+    didClean
+  }
+
+  private def cleanSnapshotsRetentionMs(): Boolean = {
+    if (retentionMs < 0)
+      return false
+
+    // If the timestamp of the first batch in the _next_ snapshot exceeds retention time, then we infer that
+    // the current snapshot also exceeds retention time. We make this inference to avoid reading the full snapshot.

Review comment:
       Yes, we can and should make use of that timestamp. This was my interim solution in case that change didn't make 3.0 :) 
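
The interim inference in the quoted diff can be sketched in Java as follows. This is a simplified model, not the PR's code: `RetentionMsCheck` and its parameter names are hypothetical, and the timestamp is passed in directly instead of being read from a snapshot. It assumes, as the diff comment does, that the next snapshot was written after the current one, so if the next snapshot's first batch is older than the retention time, the current snapshot is too.

```java
import java.util.OptionalLong;

// Hypothetical helper modelling the time-based check from the quoted diff.
class RetentionMsCheck {
    // Returns true when the current snapshot can be treated as expired, judged by
    // the first-batch timestamp of the snapshot that follows it.
    static boolean shouldClean(long nowMs, OptionalLong nextFirstBatchTimestampMs, long retentionMs) {
        if (retentionMs < 0) {
            return false; // a negative retention disables time-based cleaning
        }
        if (!nextFirstBatchTimestampMs.isPresent()) {
            return false; // no next snapshot, or it has no records: keep the current one
        }
        return nowMs - nextFirstBatchTimestampMs.getAsLong() > retentionMs;
    }

    public static void main(String[] args) {
        System.out.println(shouldClean(10_000L, OptionalLong.of(1_000L), 5_000L));
        System.out.println(shouldClean(10_000L, OptionalLong.of(6_000L), 5_000L));
    }
}
```

The check only needs one batch header from the next snapshot, which is why it avoids reading a full snapshot.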







[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662517391



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -16,34 +16,41 @@
  */
 package kafka.raft
 
-import java.io.File
-import java.nio.file.{Files, NoSuchFileException, Path}
-import java.util.{Optional, Properties}
 import kafka.api.ApiVersion
 import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, SnapshotGenerated}
-import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, LogDirFailureChannel, RequestLocal}
+import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, KafkaConfig, LogDirFailureChannel, RequestLocal}
 import kafka.utils.{CoreUtils, Logging, Scheduler}
+import org.apache.kafka.common.config.AbstractConfig
 import org.apache.kafka.common.record.{MemoryRecords, Records}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
 import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
 
+import java.io.File
+import java.nio.file.{Files, NoSuchFileException, Path}
+import java.util.{Optional, Properties}
 import scala.annotation.nowarn
 import scala.collection.mutable
 import scala.compat.java8.OptionConverters._
 
 final class KafkaMetadataLog private (
-  log: Log,
+  val log: Log,
+  time: Time,
   scheduler: Scheduler,
   // Access to this object needs to be synchronized because it is used by the snapshotting thread to notify the
   // polling thread when snapshots are created. This object is also used to store any opened snapshot reader.
   snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],
   topicPartition: TopicPartition,
   maxFetchSizeInBytes: Int,
-  val fileDeleteDelayMs: Long // Visible for testing,
+  // Visible for testing
+  val fileDeleteDelayMs: Long,
+  val retentionSize: Long,
+  val retentionMs: Long,
 ) extends ReplicatedLog with Logging {
 
+  this.logIdent = s"[MetadataLog partition=$topicPartition, dir=${log.dir.getParent}] "

Review comment:
       Sounds good







[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662505337



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +315,142 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot follows this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread safe and assumes a lock on the snapshots collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotId: OffsetAndEpoch): Boolean = {
+    if (snapshots.contains(snapshotId) &&
+        snapshots.contains(nextSnapshotId) &&
+        startOffset < nextSnapshotId.offset &&
+        snapshotId.offset < nextSnapshotId.offset &&
+        log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) {
+      log.deleteOldSegments()
+      val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+      snapshots.remove(snapshotId) match {
+        case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot)
+        case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.")
+      }
+      removeSnapshots(forgotten)
+      true
+    } else {
+      false
+    }
+  }
 
-          // Delete all segments that have a "last offset" less than the log start offset
-          log.deleteOldSegments()
+  /**
+   * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {

Review comment:
       This calls `FileRawSnapshotReader#sizeInBytes`, which in turn calls `FileRecords#sizeInBytes`, which just calls the `FileChannel` method you mention. The `readSnapshot` method doesn't actually read the snapshot; it only returns the reader object (maybe we should rename it for clarity).







[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662622540



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1048,9 +1058,14 @@ object KafkaConfig {
       .defineInternal(InitialBrokerRegistrationTimeoutMsProp, INT, Defaults.InitialBrokerRegistrationTimeoutMs, null, MEDIUM, InitialBrokerRegistrationTimeoutMsDoc)
       .defineInternal(BrokerHeartbeatIntervalMsProp, INT, Defaults.BrokerHeartbeatIntervalMs, null, MEDIUM, BrokerHeartbeatIntervalMsDoc)
       .defineInternal(BrokerSessionTimeoutMsProp, INT, Defaults.BrokerSessionTimeoutMs, null, MEDIUM, BrokerSessionTimeoutMsDoc)
-      .defineInternal(MetadataLogDirProp, STRING, null, null, HIGH, MetadataLogDirDoc)
       .defineInternal(ControllerListenerNamesProp, STRING, null, null, HIGH, ControllerListenerNamesDoc)
       .defineInternal(SaslMechanismControllerProtocolProp, STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, null, HIGH, SaslMechanismControllerProtocolDoc)
+      .defineInternal(MetadataLogDirProp, STRING, null, null, HIGH, MetadataLogDirDoc)
+      .defineInternal(MetadataLogSegmentBytesProp, INT, Defaults.LogSegmentBytes, atLeast(Records.LOG_OVERHEAD), HIGH, MetadataLogSegmentBytesDoc)
+      .defineInternal(MetadataLogSegmentMillisProp, LONG, Defaults.LogRollHours * 60 * 60 * 1000L, null, HIGH, MetadataLogSegmentMillisDoc)
+      .defineInternal(MetadataMaxRetentionBytesProp, LONG, Defaults.LogRetentionBytes, null, HIGH, MetadataMaxRetentionBytesDoc)
+      .defineInternal(MetadataMaxRetentionMillisProp, LONG, null, null, HIGH, MetadataMaxRetentionMillisDoc)

Review comment:
       I'm fine making the properties non-internal in this PR.
   
   As for the retention millis default, how about we set it to 7 days (same as default log retention)?
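
As a quick check of the arithmetic, a 7-day retention expressed in milliseconds can be computed as below. `RetentionDefaults` and its members are hypothetical names for illustration; the hours-to-milliseconds expression mirrors the `Defaults.LogRollHours * 60 * 60 * 1000L` form in the quoted diff.

```java
import java.time.Duration;

// Hypothetical holder for the proposed default; not Kafka's Defaults object.
class RetentionDefaults {
    // 7 days expressed in milliseconds.
    static final long SEVEN_DAYS_MS = Duration.ofDays(7).toMillis();

    // The trailing long literal widens the final multiplication to long.
    static long hoursToMillis(int hours) {
        return hours * 60 * 60 * 1000L;
    }

    public static void main(String[] args) {
        System.out.println(SEVEN_DAYS_MS);
        System.out.println(hoursToMillis(7 * 24));
    }
}
```

Both expressions give 604,800,000 ms.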







[GitHub] [kafka] cmccabe closed pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
cmccabe closed pull request #10864:
URL: https://github.com/apache/kafka/pull/10864


   





[GitHub] [kafka] cmccabe commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662498528



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +315,142 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot follows this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread safe and assumes a lock on the snapshots collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotId: OffsetAndEpoch): Boolean = {
+    if (snapshots.contains(snapshotId) &&
+        snapshots.contains(nextSnapshotId) &&
+        startOffset < nextSnapshotId.offset &&
+        snapshotId.offset < nextSnapshotId.offset &&
+        log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) {
+      log.deleteOldSegments()
+      val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+      snapshots.remove(snapshotId) match {
+        case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot)
+        case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.")
+      }
+      removeSnapshots(forgotten)
+      true
+    } else {
+      false
+    }
+  }
 
-          // Delete all segments that have a "last offset" less than the log start offset
-          log.deleteOldSegments()
+  /**
+   * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+    snapshots.keys.toSeq.flatMap {
+      snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes())}
+    }
+  }
 
-          // Forget snapshots less than the log start offset
-          (true, forgetSnapshotsBefore(logStartSnapshotId))
-        case _ =>
-          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records
+   */
+  private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {
+    readSnapshot(snapshotId).asScala.flatMap { reader =>
+      val it = reader.records().batchIterator()
+      if (it.hasNext) {
+        Some(it.next.maxTimestamp())
+      } else {
+        None
       }
     }
+  }
 
-    removeSnapshots(forgottenSnapshots)
-    deleted
+  /**
+   * Perform cleaning of old snapshots and log segments based on size.
+   *
+   * If our configured retention size has been violated, we perform cleaning as follows:
+   *
+   * <li>Find oldest snapshot and delete it</li>
+   * <li>Advance log start offset to end of next oldest snapshot</li>
+   * <li>Delete log segments which wholly precede the new log start offset</li>
+   *
+   * This process is repeated until the retention size is no longer violated, or until only
+   * a single snapshot remains.
+   */
+  override def maybeClean(): Boolean = {
+    snapshots synchronized {
+      cleanSnapshotsRetentionSize() || cleanSnapshotsRetentionMs()
+    }
+  }
+
+  /**
+   * Iterate through the snapshots and test the given predicate to see if we should attempt to delete it. Since
+   * we have some additional invariants regarding snapshots and log segments we cannot simply delete a snapshot in
+   * all cases.
+   *
+   * For the given predicate, we are testing if the snapshot identified by the first argument should be deleted.
+   */
+  private def cleanSnapshots(predicate: (OffsetAndEpoch, OffsetAndEpoch) => Boolean): Boolean = {
+    if (snapshots.size < 2)
+      return false;
+
+    var didClean = false
+    snapshots.keys.toSeq.sliding(2).toSeq.takeWhile {
+      case Seq(snapshot: OffsetAndEpoch, nextSnapshot: OffsetAndEpoch) =>
+        if (predicate(snapshot, nextSnapshot) && deleteSnapshot(snapshot, nextSnapshot)) {
+          didClean = true
+          true
+        } else {
+          false
+        }
+      case _ => false // Shouldn't get here with sliding(2)
+    }
+    didClean
+  }
+
+  private def cleanSnapshotsRetentionMs(): Boolean = {
+    if (retentionMs < 0)
+      return false
+
+    // If the timestamp of the first batch in the _next_ snapshot exceeds retention time, then we infer that
+    // the current snapshot also exceeds retention time. We make this inference to avoid reading the full snapshot.
+    def shouldClean(snapshotId: OffsetAndEpoch, nextSnapshotId: OffsetAndEpoch): Boolean = {
+      val now = time.milliseconds()
+      firstBatchMaxTimestamp(nextSnapshotId).exists { timestamp =>
+        if (now - timestamp > retentionMs) {
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    cleanSnapshots(shouldClean)
+  }
+
+  private def cleanSnapshotsRetentionSize(): Boolean = {

Review comment:
       It seems like we should be looking at the size of all files in the log directory, right? If we just look at snapshot sizes, log sizes could push us over our max.
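
One way to read this suggestion is to compare the combined size of every file in the log directory, snapshots and log segments alike, against the retention limit. The Java sketch below shows that measurement only; `LogDirSize` and its methods are hypothetical, it does not recurse into subdirectories, and treating a negative limit as "unlimited" is an assumption made here for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

// Hypothetical helper; not part of the PR.
class LogDirSize {
    // Sums the sizes of the regular files directly inside dir (no recursion).
    static long totalBytes(Path dir) {
        try (Stream<Path> files = Files.list(dir)) {
            return files.filter(Files::isRegularFile)
                        .mapToLong(LogDirSize::sizeOf)
                        .sum();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static long sizeOf(Path file) {
        try {
            return Files.size(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Assumption: a negative limit means size-based retention is disabled.
    static boolean exceedsRetention(Path dir, long retentionBytes) {
        return retentionBytes >= 0 && totalBytes(dir) > retentionBytes;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("metadata-log");
        Files.write(dir.resolve("segment.log"), new byte[10]);
        Files.write(dir.resolve("snapshot.checkpoint"), new byte[5]);
        System.out.println(totalBytes(dir) + " bytes, over limit: " + exceedsRetention(dir, 12L));
    }
}
```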







[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662623797



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +315,142 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot follows this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread safe and assumes a lock on the snapshots collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotId: OffsetAndEpoch): Boolean = {

Review comment:
       I think it's mostly a side effect of how this PR evolved. 
   
   Edit: 
   I can revert most of this method to be like it was originally. Since we'll be calling it for one snapshot at a time, starting with the oldest, I think the behavior will be the same. 
   
   Are you thinking of other use cases for `deleteBeforeSnapshot`, like truncation maybe?
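
The "one snapshot at a time, starting with the oldest" behavior can be modelled with a small Java sketch. It is a simplification under stated assumptions: snapshots are reduced to their end offsets in a `TreeSet`, `SnapshotCleaningSketch.clean` is a hypothetical name, and the log start offset and segment deletion from the quoted diff are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;
import java.util.function.BiPredicate;

// Hypothetical model of oldest-first cleaning; snapshots are reduced to end offsets.
class SnapshotCleaningSketch {
    // Deletes snapshots oldest-first while the predicate approves, and returns the
    // deleted offsets. The newest snapshot is never deleted: it has no successor.
    static List<Long> clean(TreeSet<Long> snapshotOffsets, BiPredicate<Long, Long> shouldDelete) {
        List<Long> deleted = new ArrayList<>();
        while (snapshotOffsets.size() >= 2) {
            Long oldest = snapshotOffsets.first();
            Long next = snapshotOffsets.higher(oldest);
            if (!shouldDelete.test(oldest, next)) {
                break; // stop at the first snapshot that must be kept
            }
            snapshotOffsets.remove(oldest);
            deleted.add(oldest);
        }
        return deleted;
    }

    public static void main(String[] args) {
        TreeSet<Long> offsets = new TreeSet<>(Arrays.asList(100L, 200L, 300L, 400L));
        List<Long> deleted = clean(offsets, (current, next) -> next <= 300L);
        System.out.println("deleted=" + deleted + " remaining=" + offsets);
    }
}
```

Because each step removes only the oldest entry and stops at the first refusal, repeated single deletions leave the same set as deleting everything before a chosen snapshot, provided the predicate accepts a contiguous prefix.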
   







[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662620593



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2651,7 +2651,7 @@ object Log extends Logging {
    * @tparam T the type of object held within the iterator
    * @return Some(iterator.next) if a next element exists, None otherwise.
    */
-  private def nextOption[T](iterator: Iterator[T]): Option[T] = {
+  def nextOption[T](iterator: Iterator[T]): Option[T] = {

Review comment:
       I was originally using the Scala one, but it wasn't compiling for some reason. It turns out I don't need it anyway, so I can revert this change.







[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r664604339



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2089,7 +2090,7 @@ private long pollUnattachedAsObserver(UnattachedState state, long currentTimeMs)
     }
 
     private long pollCurrentState(long currentTimeMs) {
-        maybeDeleteBeforeSnapshot();
+        snapshotCleaner.maybeClean(currentTimeMs);

Review comment:
       Ok, I see the issue. I'll add something like:
   
   ```java
           long pollStateTimeoutMs = pollCurrentState(currentTimeMs);
           long cleaningTimeoutMs = snapshotCleaner.maybeClean(currentTimeMs);
           long pollTimeoutMs = Math.min(pollStateTimeoutMs, cleaningTimeoutMs);
   ```
   
   in the poll method. 
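
A self-contained Java sketch of the pattern in the snippet above: the cleaner reports how long until it next needs to run, and the poll timeout is the smaller of that value and the state machine's own timeout. `PollTimeoutSketch`, the nested `Cleaner`, and its fixed-interval scheduling are hypothetical; only the `Math.min` combination comes from the comment.

```java
// Hypothetical model of combining two timeouts in a poll loop.
class PollTimeoutSketch {
    // Runs at most once per interval and reports the milliseconds until its next run.
    static final class Cleaner {
        private final long intervalMs;
        private long nextRunMs;
        int runs = 0;

        Cleaner(long intervalMs, long startMs) {
            this.intervalMs = intervalMs;
            this.nextRunMs = startMs + intervalMs;
        }

        long maybeClean(long currentTimeMs) {
            if (currentTimeMs >= nextRunMs) {
                runs++; // the actual cleaning work would happen here
                nextRunMs = currentTimeMs + intervalMs;
            }
            return nextRunMs - currentTimeMs;
        }
    }

    // The poll loop should sleep no longer than the earlier of the two deadlines.
    static long pollTimeoutMs(long pollStateTimeoutMs, Cleaner cleaner, long currentTimeMs) {
        long cleaningTimeoutMs = cleaner.maybeClean(currentTimeMs);
        return Math.min(pollStateTimeoutMs, cleaningTimeoutMs);
    }

    public static void main(String[] args) {
        Cleaner cleaner = new Cleaner(1_000L, 0L);
        System.out.println(pollTimeoutMs(5_000L, cleaner, 400L));
        System.out.println(pollTimeoutMs(5_000L, cleaner, 1_000L));
    }
}
```

Taking the minimum means a long state timeout cannot delay the next cleaning pass.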







[GitHub] [kafka] cmccabe commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662493849



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +315,142 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot follows this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread safe and assumes a lock on the snapshots collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotId: OffsetAndEpoch): Boolean = {
+    if (snapshots.contains(snapshotId) &&
+        snapshots.contains(nextSnapshotId) &&
+        startOffset < nextSnapshotId.offset &&
+        snapshotId.offset < nextSnapshotId.offset &&
+        log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) {
+      log.deleteOldSegments()
+      val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+      snapshots.remove(snapshotId) match {
+        case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot)
+        case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.")
+      }
+      removeSnapshots(forgotten)
+      true
+    } else {
+      false
+    }
+  }
 
-          // Delete all segments that have a "last offset" less than the log start offset
-          log.deleteOldSegments()
+  /**
+   * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {

Review comment:
       Let's just look at the file size on disk. Reading and deserializing the entire snapshot seems like huge overkill.
   
   There should be a `FileChannel` method that can get the file size in O(1).
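
For reference, a minimal Java sketch of getting a file's size through `FileChannel.size()`, which asks the file system for the length rather than reading the contents. The `SnapshotFileSize` class and the temporary file are illustrative only and are not how the PR wires this up.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper; not part of the PR.
class SnapshotFileSize {
    // Opens the file read-only and returns its current size in bytes.
    static long sizeInBytes(Path file) {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            return channel.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("snapshot", ".checkpoint");
        Files.write(file, new byte[1234]);
        System.out.println(sizeInBytes(file) + " bytes");
    }
}
```

`Files.size(path)` is a shorter alternative when no channel is already open.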







[GitHub] [kafka] cmccabe commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662522112



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +315,142 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot follows this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread safe and assumes a lock on the snapshots collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotId: OffsetAndEpoch): Boolean = {
+    if (snapshots.contains(snapshotId) &&
+        snapshots.contains(nextSnapshotId) &&
+        startOffset < nextSnapshotId.offset &&
+        snapshotId.offset < nextSnapshotId.offset &&
+        log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) {
+      log.deleteOldSegments()
+      val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+      snapshots.remove(snapshotId) match {
+        case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot)
+        case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.")
+      }
+      removeSnapshots(forgotten)
+      true
+    } else {
+      false
+    }
+  }
 
-          // Delete all segments that have a "last offset" less than the log start offset
-          log.deleteOldSegments()
+  /**
+   * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+    snapshots.keys.toSeq.flatMap {
+      snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes())}
+    }
+  }
 
-          // Forget snapshots less than the log start offset
-          (true, forgetSnapshotsBefore(logStartSnapshotId))
-        case _ =>
-          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records
+   */
+  private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {
+    readSnapshot(snapshotId).asScala.flatMap { reader =>
+      val it = reader.records().batchIterator()
+      if (it.hasNext) {
+        Some(it.next.maxTimestamp())
+      } else {
+        None
       }
     }
+  }
 
-    removeSnapshots(forgottenSnapshots)
-    deleted
+  /**
+   * Perform cleaning of old snapshots and log segments based on size.
+   *
+   * If our configured retention size has been violated, we perform cleaning as follows:
+   *
+   * <li>Find oldest snapshot and delete it</li>
+   * <li>Advance log start offset to end of next oldest snapshot</li>
+   * <li>Delete log segments which wholly precede the new log start offset</li>
+   *
+   * This process is repeated until the retention size is no longer violated, or until only
+   * a single snapshot remains.
+   */
+  override def maybeClean(): Boolean = {
+    snapshots synchronized {
+      cleanSnapshotsRetentionSize() || cleanSnapshotsRetentionMs()
+    }
+  }
+
+  /**
+   * Iterate through the snapshots and test the given predicate to see if we should attempt to delete each one. Since
+   * we have some additional invariants regarding snapshots and log segments we cannot simply delete a snapshot in
+   * all cases.
+   *
+   * For the given predicate, we are testing if the snapshot identified by the first argument should be deleted.
+   */
+  private def cleanSnapshots(predicate: (OffsetAndEpoch, OffsetAndEpoch) => Boolean): Boolean = {
+    if (snapshots.size < 2)
+      return false;
+
+    var didClean = false
+    snapshots.keys.toSeq.sliding(2).toSeq.takeWhile {
+      case Seq(snapshot: OffsetAndEpoch, nextSnapshot: OffsetAndEpoch) =>
+        if (predicate(snapshot, nextSnapshot) && deleteSnapshot(snapshot, nextSnapshot)) {
+          didClean = true
+          true
+        } else {
+          false
+        }
+      case _ => false // Shouldn't get here with sliding(2)
+    }
+    didClean
+  }
+
+  private def cleanSnapshotsRetentionMs(): Boolean = {
+    if (retentionMs < 0)
+      return false
+
+    // If the timestamp of the first batch in the _next_ snapshot exceeds retention time, then we infer that
+    // the current snapshot also exceeds retention time. We make this inference to avoid reading the full snapshot.
+    def shouldClean(snapshotId: OffsetAndEpoch, nextSnapshotId: OffsetAndEpoch): Boolean = {
+      val now = time.milliseconds()
+      firstBatchMaxTimestamp(nextSnapshotId).exists { timestamp =>
+        if (now - timestamp > retentionMs) {
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    cleanSnapshots(shouldClean)
+  }
+
+  private def cleanSnapshotsRetentionSize(): Boolean = {

Review comment:
       Thanks. I missed that earlier.







[GitHub] [kafka] cmccabe commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662643427



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2089,7 +2090,7 @@ private long pollUnattachedAsObserver(UnattachedState state, long currentTimeMs)
     }
 
     private long pollCurrentState(long currentTimeMs) {
-        maybeDeleteBeforeSnapshot();
+        snapshotCleaner.maybeClean(currentTimeMs);

Review comment:
       Yes, it would be good if maybeClean could return how long the cleaner(manager) wants to wait.
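
One way this could look, as a rough sketch only: the class name, the fixed-interval policy and the constructor arguments are all assumptions, not the PR's code. The idea is that `maybeClean` runs the cleaning `Runnable` when it is due and always returns the number of milliseconds until the next run:

```java
// Hypothetical sketch; names and the fixed-interval policy are assumptions.
public class CleanerManagerSketch {
    private final long intervalMs;
    private final Runnable cleaner;
    private long nextCleanTimeMs;

    public CleanerManagerSketch(long intervalMs, long startTimeMs, Runnable cleaner) {
        this.intervalMs = intervalMs;
        this.cleaner = cleaner;
        this.nextCleanTimeMs = startTimeMs + intervalMs;
    }

    // Runs the cleaner if it is due, then returns how many milliseconds
    // the caller may wait before the next cleaning is due.
    public long maybeClean(long currentTimeMs) {
        if (currentTimeMs >= nextCleanTimeMs) {
            cleaner.run();
            nextCleanTimeMs = currentTimeMs + intervalMs;
        }
        return nextCleanTimeMs - currentTimeMs;
    }

    public static void main(String[] args) {
        int[] runs = {0};
        CleanerManagerSketch manager = new CleanerManagerSketch(1000L, 0L, () -> runs[0]++);
        System.out.println(manager.maybeClean(400L));  // prints 600 (not due yet)
        System.out.println(manager.maybeClean(1000L)); // prints 1000 (cleaned, next run one interval away)
        System.out.println(runs[0]);                   // prints 1
    }
}
```

The caller in `pollCurrentState` could then presumably take the minimum of this value and its other timeouts when deciding how long to poll.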







[GitHub] [kafka] cmccabe commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r652028042



##########
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##########
@@ -253,7 +253,8 @@ class KafkaRaftManager[T](
       time,
       scheduler,
       maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
-      maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+      maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
+      config = MetadataLogConfig(config)

Review comment:
       Should maxBatchSizeInBytes be part of MetadataLogConfig?
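
To make the question concrete, here is a hypothetical shape for a config object that carries the size limits alongside the retention settings. Only the names `maxBatchSizeInBytes` and `maxFetchSizeInBytes` come from the diff; the class name, the retention field names and the values in `main` are placeholders assumed for illustration:

```java
// Hypothetical sketch of a combined config; not the PR's MetadataLogConfig.
public final class MetadataLogConfigSketch {
    public final long retentionSizeBytes; // assumed name for the size-based retention limit
    public final long retentionMs;        // assumed name for the time-based retention limit
    public final int maxBatchSizeInBytes;
    public final int maxFetchSizeInBytes;

    public MetadataLogConfigSketch(long retentionSizeBytes,
                                   long retentionMs,
                                   int maxBatchSizeInBytes,
                                   int maxFetchSizeInBytes) {
        if (maxBatchSizeInBytes <= 0 || maxFetchSizeInBytes <= 0) {
            throw new IllegalArgumentException("Batch and fetch sizes must be positive");
        }
        this.retentionSizeBytes = retentionSizeBytes;
        this.retentionMs = retentionMs;
        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
        this.maxFetchSizeInBytes = maxFetchSizeInBytes;
    }

    public static void main(String[] args) {
        // Placeholder values, chosen only to exercise the constructor.
        MetadataLogConfigSketch config =
            new MetadataLogConfigSketch(100L * 1024 * 1024, 60_000L, 1024, 2048);
        System.out.println(config.maxBatchSizeInBytes); // prints 1024
        System.out.println(config.maxFetchSizeInBytes); // prints 2048
    }
}
```

With something like this, the call site would pass a single `config` argument instead of separate size parameters.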







[GitHub] [kafka] cmccabe commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662509722



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2153,14 +2154,36 @@ private boolean maybeCompleteShutdown(long currentTimeMs) {
         return false;
     }
 
-    private void maybeDeleteBeforeSnapshot() {
-        log.latestSnapshotId().ifPresent(snapshotId -> {
-            quorum.highWatermark().ifPresent(highWatermark -> {
-                if (highWatermark.offset >= snapshotId.offset) {
-                    log.deleteBeforeSnapshot(snapshotId);
+    /**
+     * A simple timer based log cleaner
+     */
+    private static class RaftMetadataLogCleaner {

Review comment:
       How about "CleanerManager" instead of "Cleaner"? The Runnable is what actually does the cleaning, right?



