Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/08 21:24:31 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

hachikuji commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r647803084



##########
File path: metadata/src/main/java/org/apache/kafka/controller/SnapshotGenerator.java
##########
@@ -74,56 +74,49 @@ String name() {
         this.batch = null;
         this.section = null;
         this.numRecords = 0;
-        this.numWriteTries = 0;
     }
 
     /**
      * Returns the epoch of the snapshot that we are generating.
      */
     long epoch() {
-        return writer.epoch();
+        return writer.lastOffset();

Review comment:
       Is this correct? Returning `writer.lastOffset()` from a method named `epoch()` seems likely to cause confusion if it is.
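To make the naming concern concrete, here is a minimal sketch. It assumes a snapshot is identified by an (offset, epoch) pair, similar in spirit to the `OffsetAndEpoch` type used elsewhere in this PR. `SnapshotIdSketch` and `SnapshotGeneratorSketch` are hypothetical names, not Kafka classes. The point is only that the two values are distinct quantities, so each gets its own accessor.

```java
// Hypothetical value type pairing a snapshot's last offset with its epoch.
// Similar in spirit to OffsetAndEpoch, but this is NOT Kafka's class.
final class SnapshotIdSketch {
    final long lastOffset;
    final int epoch;

    SnapshotIdSketch(long lastOffset, int epoch) {
        this.lastOffset = lastOffset;
        this.epoch = epoch;
    }
}

// Hypothetical generator exposing the two quantities through separate accessors.
final class SnapshotGeneratorSketch {
    private final SnapshotIdSketch id;

    SnapshotGeneratorSketch(SnapshotIdSketch id) {
        this.id = id;
    }

    // Returns the epoch, never an offset, so callers comparing epochs get epochs.
    int epoch() {
        return id.epoch;
    }

    // Returns the offset under its own name.
    long lastOffset() {
        return id.lastOffset;
    }
}
```

Consider a snapshot at offset 100 written in epoch 3. Here `epoch()` returns 3. An `epoch()` that delegated to `lastOffset()` would return 100 instead, and a caller comparing epochs could easily misread that value.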

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1009,7 +999,7 @@ private QuorumController(LogContext logContext,
             snapshotRegistry, sessionTimeoutNs, replicaPlacer);
         this.featureControl = new FeatureControlManager(supportedFeatures, snapshotRegistry);
         this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
-        this.snapshotGeneratorManager = new SnapshotGeneratorManager(snapshotWriterBuilder);
+        this.snapshotGeneratorManager = new SnapshotGeneratorManager(raftClient::createSnapshot);

Review comment:
       Passing the function through is a bit odd; we could just use the implicit reference to `raftClient`. Was this done for testing, or for some other reason?
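The two wiring options the comment contrasts can be sketched as follows. Every type here is a hypothetical stub: `RaftClientStub`, `SnapshotWriterStub`, `SnapshotWriterFactory` and both manager classes are illustrations, and the real `createSnapshot` signature may differ. In option A, the constructor receives a method reference, as in `new SnapshotGeneratorManager(raftClient::createSnapshot)`. In option B, a non-static inner class reaches `raftClient` through the implicit reference to its enclosing instance. The stub's `createSnapshot` returns `Optional.empty()` for a duplicate offset, mirroring the `KafkaMetadataLog.createSnapshot` change in this PR.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for a snapshot writer.
final class SnapshotWriterStub {
    final long endOffset;

    SnapshotWriterStub(long endOffset) {
        this.endOffset = endOffset;
    }
}

// Hypothetical Raft client stub: refuses (empty Optional) a second snapshot at the same offset.
final class RaftClientStub {
    private final Set<Long> created = new HashSet<>();

    Optional<SnapshotWriterStub> createSnapshot(long endOffset) {
        if (!created.add(endOffset)) {
            return Optional.empty();
        }
        return Optional.of(new SnapshotWriterStub(endOffset));
    }
}

// Hypothetical factory shape that a method reference like raftClient::createSnapshot satisfies.
@FunctionalInterface
interface SnapshotWriterFactory {
    Optional<SnapshotWriterStub> create(long endOffset);
}

// Option A: the manager depends only on the function it is handed.
final class FactoryGeneratorManager {
    private final SnapshotWriterFactory factory;

    FactoryGeneratorManager(SnapshotWriterFactory factory) {
        this.factory = factory;
    }

    boolean begin(long endOffset) {
        return factory.create(endOffset).isPresent();
    }
}

final class ControllerSketch {
    private final RaftClientStub raftClient = new RaftClientStub();

    // Option A wiring: pass the method reference through the constructor.
    final FactoryGeneratorManager injected = new FactoryGeneratorManager(raftClient::createSnapshot);

    // Option B: a non-static inner class uses the enclosing instance's raftClient directly.
    final class InnerGeneratorManager {
        boolean begin(long endOffset) {
            return raftClient.createSnapshot(endOffset).isPresent();
        }
    }

    final InnerGeneratorManager inner = new InnerGeneratorManager();
}
```

Option A lets a test hand the manager a lambda, such as `offset -> Optional.empty()`, without building a Raft client. That is plausibly the testing motivation the question asks about. Option B drops the extra indirection at the cost of tying the manager to the enclosing controller.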

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -233,18 +233,40 @@ final class KafkaMetadataLog private (
     log.topicId.get
   }
 
-  override def createSnapshot(snapshotId: OffsetAndEpoch): RawSnapshotWriter = {
-    // Do not let the state machine create snapshots older than the latest snapshot
-    latestSnapshotId().ifPresent { latest =>
-      if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) {
-        // Since snapshots are less than the high-watermark absolute offset comparison is okay.
-        throw new IllegalArgumentException(
-          s"Attempting to create a snapshot ($snapshotId) that is not greater than the latest snapshot ($latest)"
-        )
-      }
+  override def createSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotWriter] = {
+    if (snapshots.contains(snapshotId)) {
+      Optional.empty()
+    } else {
+      Optional.of(FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this)))
+    }
+  }
+
+  override def createSnapshotFromEndOffset(endOffset: Long): Optional[RawSnapshotWriter] = {
+    val highWatermarkOffset = highWatermark.offset
+    if (endOffset > highWatermarkOffset) {
+      throw new IllegalArgumentException(
+        s"Cannot create a snapshot for an end offset ($endOffset) greater than the high-watermark ($highWatermarkOffset)"
+      )
+    }
+
+    if (endOffset < startOffset) {
+      throw new IllegalArgumentException(
+        s"Cannot create a snapshot for an end offset ($endOffset) less than the log start offset ($startOffset)"
+      )
+    }
+
+    val epoch = log.leaderEpochCache.flatMap(_.findEpochEntryByEndOffset(endOffset)) match {
+      case Some(epochEntry) =>
+        epochEntry.epoch
+      case None =>
+        // Assume that the end offset falls in the current epoch since based on the check above:

Review comment:
       This confuses me a little. The logic in `findEpochEntryByEndOffset` returns the first epoch whose start offset is less than the end offset. Wouldn't that already cover the case of the current epoch? It seems the only uncovered case is when the end offset is smaller than the start offset of the first cached epoch, but shouldn't that be an error?
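The reviewer's reasoning can be checked with a small model. This is a hypothetical Java stand-in, not Kafka's `LeaderEpochFileCache`. It assumes the cache maps each epoch to its start offset. It also assumes the lookup walks epochs from newest to oldest and returns the first one whose start offset is strictly less than the end offset, which is how the comment describes `findEpochEntryByEndOffset`. Under those assumptions, the current epoch is found whenever the end offset lies past its start. The only empty result is therefore an end offset at or before the start of the oldest cached epoch. The hypothetical `epochForSnapshot` treats that case as an error, as the comment suggests.

```java
import java.util.Map;
import java.util.OptionalInt;
import java.util.TreeMap;

// Hypothetical stand-in for a leader epoch cache: epoch -> start offset of that epoch.
final class EpochCacheSketch {
    private final TreeMap<Integer, Long> startOffsetByEpoch = new TreeMap<>();

    void assign(int epoch, long startOffset) {
        startOffsetByEpoch.put(epoch, startOffset);
    }

    // Walk epochs from newest to oldest and return the first whose start offset is
    // strictly less than endOffset (the behaviour described in the review comment).
    OptionalInt findEpochByEndOffset(long endOffset) {
        for (Map.Entry<Integer, Long> entry : startOffsetByEpoch.descendingMap().entrySet()) {
            if (entry.getValue() < endOffset) {
                return OptionalInt.of(entry.getKey());
            }
        }
        return OptionalInt.empty();
    }

    // An empty lookup means endOffset is at or before the oldest cached epoch's start,
    // which is rejected here instead of being assumed to fall in the current epoch.
    int epochForSnapshot(long endOffset) {
        return findEpochByEndOffset(endOffset).orElseThrow(() ->
            new IllegalArgumentException(
                "End offset " + endOffset + " is not after the start of the first cached epoch"));
    }
}
```

Suppose epochs 1, 2 and 3 start at offsets 5, 10 and 25. An end offset of 30 resolves to epoch 3, the current epoch, without any special case. An end offset of 25 resolves to epoch 2, and an end offset of 3 produces an empty result.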




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org