You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/01 20:07:10 UTC

[GitHub] [kafka] jsancio commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

jsancio commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662550101



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1048,9 +1058,14 @@ object KafkaConfig {
       .defineInternal(InitialBrokerRegistrationTimeoutMsProp, INT, Defaults.InitialBrokerRegistrationTimeoutMs, null, MEDIUM, InitialBrokerRegistrationTimeoutMsDoc)
       .defineInternal(BrokerHeartbeatIntervalMsProp, INT, Defaults.BrokerHeartbeatIntervalMs, null, MEDIUM, BrokerHeartbeatIntervalMsDoc)
       .defineInternal(BrokerSessionTimeoutMsProp, INT, Defaults.BrokerSessionTimeoutMs, null, MEDIUM, BrokerSessionTimeoutMsDoc)
-      .defineInternal(MetadataLogDirProp, STRING, null, null, HIGH, MetadataLogDirDoc)
       .defineInternal(ControllerListenerNamesProp, STRING, null, null, HIGH, ControllerListenerNamesDoc)
       .defineInternal(SaslMechanismControllerProtocolProp, STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, null, HIGH, SaslMechanismControllerProtocolDoc)
+      .defineInternal(MetadataLogDirProp, STRING, null, null, HIGH, MetadataLogDirDoc)
+      .defineInternal(MetadataLogSegmentBytesProp, INT, Defaults.LogSegmentBytes, atLeast(Records.LOG_OVERHEAD), HIGH, MetadataLogSegmentBytesDoc)
+      .defineInternal(MetadataLogSegmentMillisProp, LONG, Defaults.LogRollHours * 60 * 60 * 1000L, null, HIGH, MetadataLogSegmentMillisDoc)
+      .defineInternal(MetadataMaxRetentionBytesProp, LONG, Defaults.LogRetentionBytes, null, HIGH, MetadataMaxRetentionBytesDoc)
+      .defineInternal(MetadataMaxRetentionMillisProp, LONG, null, null, HIGH, MetadataMaxRetentionMillisDoc)

Review comment:
       Are we going to keep the properties added in this PR as internal after 3.0. If not, let's just make them public now.
   
   Do we need a default value to the retention milliseconds property?

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2651,7 +2651,7 @@ object Log extends Logging {
    * @tparam T the type of object held within the iterator
    * @return Some(iterator.next) if a next element exists, None otherwise.
    */
-  private def nextOption[T](iterator: Iterator[T]): Option[T] = {
+  def nextOption[T](iterator: Iterator[T]): Option[T] = {

Review comment:
       I don't think you need to make this public. Scala's `Iterator` has `nextOption`.

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +315,142 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot proceeds this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread safe and assumes a lock on the snapshots collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotId: OffsetAndEpoch): Boolean = {

Review comment:
       Why change and replace the implementation for `deleteBeforeSnapshot`? For example, why not always delete every snapshot that is less than `nextSnapshotId` when the `if` statement predicate is true?
   
   For example, `log.deleteOldSegments()` deletes every segment that is less than the log start offset. Why not also delete every snapshot that is less than the log start offset which is the same as `nextSnapshotId`?
   
   This deletes both segments and snapshot. The old name of `deleteBeforeSnapshot` seems more accurate.

##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -817,11 +863,19 @@ object KafkaMetadataLogTest {
     }
   }
 
+  val DefaultMetadataLogConfig = new MetadataLogConfig(

Review comment:
       This is a `case class` so new not needed or recommended.

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2153,14 +2154,36 @@ private boolean maybeCompleteShutdown(long currentTimeMs) {
         return false;
     }
 
-    private void maybeDeleteBeforeSnapshot() {
-        log.latestSnapshotId().ifPresent(snapshotId -> {
-            quorum.highWatermark().ifPresent(highWatermark -> {
-                if (highWatermark.offset >= snapshotId.offset) {
-                    log.deleteBeforeSnapshot(snapshotId);
+    /**
+     * A simple timer based log cleaner
+     */
+    private static class RaftMetadataLogCleaner {
+        private final Logger logger;
+        private final Timer timer;
+        private final long delayMs;
+        private final Runnable cleaner;
+
+        RaftMetadataLogCleaner(Logger logger, Time time, long delayMs, Runnable cleaner) {
+            this.logger = logger;
+            this.timer = time.timer(delayMs);
+            this.delayMs = delayMs;
+            this.cleaner = cleaner;
+        }
+
+        public boolean maybeClean(long currentTimeMs) {
+            timer.update(currentTimeMs);
+            if (timer.isExpired()) {

Review comment:
       Do we need this timer because `log.maybeClean` is expensive even when there are no snapshots to clean?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +313,152 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot proceeds this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread safe and assumes a lock on the snapshots collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotIdOpt: Option[OffsetAndEpoch]): Boolean = {
+    nextSnapshotIdOpt.exists { nextSnapshotId =>
+      if (snapshots.contains(snapshotId) &&
+          snapshots.contains(nextSnapshotId) &&
+          startOffset < nextSnapshotId.offset &&
+          snapshotId.offset < nextSnapshotId.offset &&
+          log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) {
+        log.deleteOldSegments()
+        val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+        snapshots.remove(snapshotId) match {
+          case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot)
+          case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.")
+        }
+        removeSnapshots(forgotten)
+        true
+      } else {
+        false
+      }
+    }
+  }
 
-          // Delete all segments that have a "last offset" less than the log start offset
-          log.deleteOldSegments()
+  /**
+   * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+    snapshots.keys.toSeq.flatMap {
+      snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes())}
+    }
+  }
 
-          // Forget snapshots less than the log start offset
-          (true, forgetSnapshotsBefore(logStartSnapshotId))
-        case _ =>
-          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records
+   */
+  private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {

Review comment:
       Sounds good. Can we move this implementation to `SnapshotReader`? It will match what we are doing here https://github.com/apache/kafka/pull/10946/files#diff-83295dbf4af9755c79987b390f72f53dda47af9f82eb8305755d90da18d5b9f2R85

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2089,7 +2090,7 @@ private long pollUnattachedAsObserver(UnattachedState state, long currentTimeMs)
     }
 
     private long pollCurrentState(long currentTimeMs) {
-        maybeDeleteBeforeSnapshot();
+        snapshotCleaner.maybeClean(currentTimeMs);

Review comment:
       We need to communicate the cleaner's timeout to the poll method so that knows for how long to wait in the message queue.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org