Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/28 19:30:59 UTC

[GitHub] [kafka] jsancio opened a new pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

jsancio opened a new pull request #10786:
URL: https://github.com/apache/kafka/pull/10786


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji merged pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #10786:
URL: https://github.com/apache/kafka/pull/10786


   





[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r649273990



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -233,18 +233,40 @@ final class KafkaMetadataLog private (
     log.topicId.get
   }
 
-  override def createSnapshot(snapshotId: OffsetAndEpoch): RawSnapshotWriter = {
-    // Do not let the state machine create snapshots older than the latest snapshot
-    latestSnapshotId().ifPresent { latest =>
-      if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) {
-        // Since snapshots are less than the high-watermark absolute offset comparison is okay.
-        throw new IllegalArgumentException(
-          s"Attempting to create a snapshot ($snapshotId) that is not greater than the latest snapshot ($latest)"
-        )
-      }
+  override def createSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotWriter] = {
+    if (snapshots.contains(snapshotId)) {
+      Optional.empty()
+    } else {
+      Optional.of(FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this)))
+    }
+  }
+
+  override def createSnapshotFromEndOffset(endOffset: Long): Optional[RawSnapshotWriter] = {
+    val highWatermarkOffset = highWatermark.offset
+    if (endOffset > highWatermarkOffset) {
+      throw new IllegalArgumentException(
+        s"Cannot create a snapshot for an end offset ($endOffset) greater than the high-watermark ($highWatermarkOffset)"
+      )
+    }
+
+    if (endOffset < startOffset) {
+      throw new IllegalArgumentException(
+        s"Cannot create a snapshot for an end offset ($endOffset) less than the log start offset ($startOffset)"
+      )
+    }
+
+    val epoch = log.leaderEpochCache.flatMap(_.findEpochEntryByEndOffset(endOffset)) match {
+      case Some(epochEntry) =>
+        epochEntry.epoch
+      case None =>
+        // Assume that the end offset falls in the current epoch since based on the check above:

Review comment:
       I removed this code. To avoid scanning the leader epoch cache, I reverted the snapshot creation API so that both the offset and the epoch are passed to `createSnapshot`. The new code just validates that the given offset and epoch are valid according to the record batches in the log and the leader epoch cache.
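A minimal sketch of the kind of check described above. It assumes the snapshot id's offset is an exclusive end offset and that a valid epoch is the epoch of the last record the snapshot contains (offset `endOffset - 1`). `SnapshotIdValidator` and its `TreeMap`-based epoch cache are hypothetical stand-ins, not Kafka's `KafkaMetadataLog` or leader epoch cache. Because it looks up the record at `endOffset - 1`, this sketch rejects a snapshot that contains no records; a real implementation would need an explicit rule for that case.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch; not Kafka's implementation.
final class SnapshotIdValidator {
    private final long logStartOffset;
    private final long highWatermark;
    // Stand-in for the leader epoch cache: start offset of each epoch -> epoch.
    private final NavigableMap<Long, Integer> epochStartOffsets = new TreeMap<>();

    SnapshotIdValidator(long logStartOffset, long highWatermark) {
        this.logStartOffset = logStartOffset;
        this.highWatermark = highWatermark;
    }

    void addEpoch(int epoch, long startOffset) {
        epochStartOffsets.put(startOffset, epoch);
    }

    /** Throws IllegalArgumentException if (endOffset, epoch) is not a valid snapshot id. */
    void validate(long endOffset, int epoch) {
        if (endOffset > highWatermark) {
            throw new IllegalArgumentException(String.format(
                "Cannot create a snapshot for an end offset (%d) greater than the high-watermark (%d)",
                endOffset, highWatermark));
        }
        if (endOffset < logStartOffset) {
            throw new IllegalArgumentException(String.format(
                "Cannot create a snapshot for an end offset (%d) less than the log start offset (%d)",
                endOffset, logStartOffset));
        }
        // The end offset is exclusive, so the last record in the snapshot is at endOffset - 1.
        // lowerEntry finds the latest epoch that started strictly before endOffset.
        Map.Entry<Long, Integer> entry = epochStartOffsets.lowerEntry(endOffset);
        if (entry == null || entry.getValue() != epoch) {
            throw new IllegalArgumentException(String.format(
                "Snapshot epoch (%d) does not match the epoch of the record at offset %d",
                epoch, endOffset - 1));
        }
    }
}
```

With the log layout from `testCreateSnapshotValidation` (offsets 0 to 9 in epoch 1, offsets 10 to 19 in epoch 3, high-watermark 20), this sketch accepts the six ids that test uses and rejects ids such as `(10, 3)`, `(20, 5)` and `(20, 2)`.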







[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r651387182



##########
File path: raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
##########
@@ -437,14 +437,89 @@ public void testCreateSnapshot() throws IOException {
         appendBatch(numberOfRecords, epoch);
         log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords));
 
-        try (RawSnapshotWriter snapshot = log.createSnapshot(snapshotId)) {
+        try (RawSnapshotWriter snapshot = log.createSnapshot(snapshotId, true).get()) {
             snapshot.freeze();
         }
 
         RawSnapshotReader snapshot = log.readSnapshot(snapshotId).get();
         assertEquals(0, snapshot.sizeInBytes());
     }
 
+    @Test
+    public void testCreateSnapshotValidation() {
+        int numberOfRecords = 10;
+        int firstEpoch = 1;
+        int secondEpoch = 3;
+
+        appendBatch(numberOfRecords, firstEpoch);
+        appendBatch(numberOfRecords, secondEpoch);
+        log.updateHighWatermark(new LogOffsetMetadata(2 * numberOfRecords));
+
+        // Test snapshot id for the first epoch
+        try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(numberOfRecords, firstEpoch), true).get()) { }
+        try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(numberOfRecords - 1, firstEpoch), true).get()) { }
+        try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(1, firstEpoch), true).get()) { }
+
+        // Test snapshot id for the second epoch
+        try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(2 * numberOfRecords, secondEpoch), true).get()) { }
+        try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(2 * numberOfRecords - 1, secondEpoch), true).get()) { }
+        try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(numberOfRecords + 1, secondEpoch), true).get()) { }
+    }
+
+    @Test
+    public void testCreateSnapshotLaterThanHighWatermark() {
+        int numberOfRecords = 10;
+        int epoch = 1;
+
+        appendBatch(numberOfRecords, epoch);
+        log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords));
+
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> log.createSnapshot(new OffsetAndEpoch(numberOfRecords + 1, epoch), true)
+        );
+    }
+
+    @Test
+    public void testCreateSnapshotBeforeLogStartOffset() {

Review comment:
       Added a few more tests: one for a much larger epoch, one for a much smaller epoch, and one for a missing epoch.







[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r651387879



##########
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
##########
@@ -230,12 +230,18 @@ default long truncateToEndOffset(OffsetAndEpoch endOffset) {
      * Create a writable snapshot for the given snapshot id.
      *
      * See {@link RawSnapshotWriter} for details on how to use this object. The caller of
-     * this method is responsible for invoking {@link RawSnapshotWriter#close()}.
+     * this method is responsible for invoking {@link RawSnapshotWriter#close()}. If a
+     * snapshot already exists then return an {@link Optional#empty()}.
      *
      * @param snapshotId the end offset and epoch that identifies the snapshot
-     * @return a writable snapshot
+     * @param validate validate the snapshot id against the log
+     * @return a writable snapshot if it doesn't already exists
+     * @throws IllegalArgumentException if validate is true and end offset is greater than the
+     *         high-watermark
+     * @throws IllegalArgumentException if validate is true and end offset is less than the log
+     *         start offset
      */
-    RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId);
+    Optional<RawSnapshotWriter> createSnapshot(OffsetAndEpoch snapshotId, boolean validate);

Review comment:
       Done. Used these suggestions. I couldn't think of better names.
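The javadoc in this hunk changes `createSnapshot` to return `Optional.empty()` when the snapshot already exists, while the caller remains responsible for `close()`. A minimal caller-side sketch of that contract follows; `SnapshotKey`, `SimpleSnapshotWriter`, `InMemorySnapshotStore` and `SnapshotCaller` are hypothetical in-memory stand-ins, not Kafka's `OffsetAndEpoch` or `RawSnapshotWriter`.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for OffsetAndEpoch.
record SnapshotKey(long offset, int epoch) {}

// Hypothetical stand-in for RawSnapshotWriter; close() is narrowed to throw no checked exception.
interface SimpleSnapshotWriter extends AutoCloseable {
    void freeze();

    @Override
    void close();
}

// Hypothetical in-memory store following the documented contract:
// return Optional.empty() if a snapshot with this id already exists.
final class InMemorySnapshotStore {
    private final Set<SnapshotKey> frozen = new HashSet<>();

    Optional<SimpleSnapshotWriter> createSnapshot(SnapshotKey id) {
        if (frozen.contains(id)) {
            return Optional.empty();
        }
        return Optional.of(new SimpleSnapshotWriter() {
            @Override
            public void freeze() {
                frozen.add(id); // in this sketch a snapshot becomes visible only once frozen
            }

            @Override
            public void close() {
                // A real writer would release its resources here.
            }
        });
    }

    boolean contains(SnapshotKey id) {
        return frozen.contains(id);
    }
}

final class SnapshotCaller {
    /** Writes a snapshot unless one with the same id exists; returns whether it wrote one. */
    static boolean writeIfAbsent(InMemorySnapshotStore store, SnapshotKey id) {
        Optional<SimpleSnapshotWriter> maybeWriter = store.createSnapshot(id);
        if (maybeWriter.isEmpty()) {
            return false; // a snapshot with this id already exists
        }
        // The caller owns the writer, so close it even if writing fails.
        try (SimpleSnapshotWriter writer = maybeWriter.get()) {
            // ... append records here ...
            writer.freeze();
        }
        return true;
    }
}
```

A second request for the same id comes back empty, so the caller can skip regenerating a snapshot that is already on disk.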







[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r647596275



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -247,6 +247,34 @@ final class KafkaMetadataLog private (
     FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this))
   }
 
+  override def createSnapshotFromEndOffset(endOffset: Long): RawSnapshotWriter = {
+    val highWatermarkOffset = highWatermark.offset
+    if (endOffset > highWatermarkOffset) {
+      throw new IllegalArgumentException(
+        s"Cannot create a snapshot for an end offset ($endOffset) greater than the high-watermark ($highWatermarkOffset)"
+      )
+    }
+
+    if (endOffset < startOffset) {
+      throw new IllegalArgumentException(
+        s"Cannot create a snapshot for an end offset ($endOffset) less or equal to the log start offset ($startOffset)"

Review comment:
       Fixed as part of the merge commit.







[GitHub] [kafka] hachikuji commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r651330764



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##########
@@ -85,6 +85,20 @@ public OffsetAndEpoch snapshotId() {
         return snapshot.snapshotId();
     }
 
+    /**
+     * Returns the last log offset which is represented in the snapshot.
+     */
+    public long lastOffsetFromLog() {

Review comment:
       Maybe something like `lastIncludedOffset` or `lastContainedOffset`?

##########
File path: raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
##########
@@ -437,14 +437,89 @@ public void testCreateSnapshot() throws IOException {
         appendBatch(numberOfRecords, epoch);
         log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords));
 
-        try (RawSnapshotWriter snapshot = log.createSnapshot(snapshotId)) {
+        try (RawSnapshotWriter snapshot = log.createSnapshot(snapshotId, true).get()) {
             snapshot.freeze();
         }
 
         RawSnapshotReader snapshot = log.readSnapshot(snapshotId).get();
         assertEquals(0, snapshot.sizeInBytes());
     }
 
+    @Test
+    public void testCreateSnapshotValidation() {
+        int numberOfRecords = 10;
+        int firstEpoch = 1;
+        int secondEpoch = 3;
+
+        appendBatch(numberOfRecords, firstEpoch);
+        appendBatch(numberOfRecords, secondEpoch);
+        log.updateHighWatermark(new LogOffsetMetadata(2 * numberOfRecords));
+
+        // Test snapshot id for the first epoch
+        try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(numberOfRecords, firstEpoch), true).get()) { }
+        try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(numberOfRecords - 1, firstEpoch), true).get()) { }
+        try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(1, firstEpoch), true).get()) { }
+
+        // Test snapshot id for the second epoch
+        try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(2 * numberOfRecords, secondEpoch), true).get()) { }
+        try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(2 * numberOfRecords - 1, secondEpoch), true).get()) { }
+        try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(numberOfRecords + 1, secondEpoch), true).get()) { }
+    }
+
+    @Test
+    public void testCreateSnapshotLaterThanHighWatermark() {
+        int numberOfRecords = 10;
+        int epoch = 1;
+
+        appendBatch(numberOfRecords, epoch);
+        log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords));
+
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> log.createSnapshot(new OffsetAndEpoch(numberOfRecords + 1, epoch), true)
+        );
+    }
+
+    @Test
+    public void testCreateSnapshotBeforeLogStartOffset() {

Review comment:
       Worth adding any test cases for an invalid epoch?

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -180,14 +181,17 @@ default void beginShutdown() {}
     void resign(int epoch);
 
     /**
-     * Create a writable snapshot file for a given offset and epoch.
+     * Create a writable snapshot file for a commmitted offset.

Review comment:
       nit: one extra 'm'

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1101,7 +1101,7 @@ private boolean handleFetchResponse(
                         partitionResponse.snapshotId().epoch()
                     );
 
-                    state.setFetchingSnapshot(Optional.of(log.createSnapshot(snapshotId)));
+                    state.setFetchingSnapshot(log.createSnapshot(snapshotId, false));

Review comment:
       Might be worth a brief comment that the snapshot is expected to be well ahead of the current log, so we have to skip validation.

##########
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
##########
@@ -230,12 +230,18 @@ default long truncateToEndOffset(OffsetAndEpoch endOffset) {
      * Create a writable snapshot for the given snapshot id.
      *
      * See {@link RawSnapshotWriter} for details on how to use this object. The caller of
-     * this method is responsible for invoking {@link RawSnapshotWriter#close()}.
+     * this method is responsible for invoking {@link RawSnapshotWriter#close()}. If a
+     * snapshot already exists then return an {@link Optional#empty()}.
      *
      * @param snapshotId the end offset and epoch that identifies the snapshot
-     * @return a writable snapshot
+     * @param validate validate the snapshot id against the log
+     * @return a writable snapshot if it doesn't already exists
+     * @throws IllegalArgumentException if validate is true and end offset is greater than the
+     *         high-watermark
+     * @throws IllegalArgumentException if validate is true and end offset is less than the log
+     *         start offset
      */
-    RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId);
+    Optional<RawSnapshotWriter> createSnapshot(OffsetAndEpoch snapshotId, boolean validate);

Review comment:
       I don't feel too strongly about it, but I wonder if it is worth having a separate API instead of the boolean parameter here. For example, maybe `createNewSnapshot` vs `storeSnapshot` or something like that.
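A rough sketch of the two-method shape suggested here, reusing the names from the comment (`createNewSnapshot`, `storeSnapshot`) on a hypothetical in-memory log. `SnapshotRef` and `SketchLog` are assumptions for illustration. For brevity the sketch records the id as soon as it is requested, rather than when the snapshot is frozen, and returns the id itself instead of a writer.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical snapshot id; not Kafka's OffsetAndEpoch.
record SnapshotRef(long endOffset, int epoch) {}

// Hypothetical in-memory log illustrating two methods instead of a boolean flag.
final class SketchLog {
    private final long logStartOffset;
    private final long highWatermark;
    private final Set<SnapshotRef> snapshots = new HashSet<>();

    SketchLog(long logStartOffset, long highWatermark) {
        this.logStartOffset = logStartOffset;
        this.highWatermark = highWatermark;
    }

    /** Locally generated snapshot: must fall within [logStartOffset, highWatermark]. */
    Optional<SnapshotRef> createNewSnapshot(SnapshotRef id) {
        if (id.endOffset() < logStartOffset || id.endOffset() > highWatermark) {
            throw new IllegalArgumentException(
                "Snapshot " + id + " is outside [" + logStartOffset + ", " + highWatermark + "]");
        }
        return storeSnapshot(id);
    }

    /** Snapshot fetched from the leader: may be ahead of the local log, so no range check. */
    Optional<SnapshotRef> storeSnapshot(SnapshotRef id) {
        // Set.add returns false if the id is already present.
        return snapshots.add(id) ? Optional.of(id) : Optional.empty();
    }
}
```

In this shape each call site states its intent by name: a locally generated snapshot goes through the validating method, while a snapshot downloaded from the leader (the case discussed for `handleFetchResponse` above) uses the unvalidated one.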







[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r651374491



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##########
@@ -85,6 +85,20 @@ public OffsetAndEpoch snapshotId() {
         return snapshot.snapshotId();
     }
 
+    /**
+     * Returns the last log offset which is represented in the snapshot.
+     */
+    public long lastOffsetFromLog() {

Review comment:
       One of the users of this API confused this offset with the offsets returned in the `Batch` objects produced by the `SnapshotReader` iterator. I wanted to make it clear that this offset and epoch refer to the offset and epoch found in the `ReplicatedLog` or in `handleCommit`.
   
   By contrast, the offsets reported by the `Batch` objects from the `SnapshotReader` iterator are unrelated to the log's offsets.
   
   How about `lastContainedLogOffset` and `lastContainedLogEpoch`?







[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r651390233



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2285,7 +2287,11 @@ QuorumState quorum() {
     }
 
     public OptionalLong highWatermark() {
-        return quorum.highWatermark().isPresent() ? OptionalLong.of(quorum.highWatermark().get().offset) : OptionalLong.empty();
+        if (quorum.highWatermark().isPresent()) {
+            return OptionalLong.of(quorum.highWatermark().get().offset);
+        } else {
+            return OptionalLong.empty();
+        }

Review comment:
       No functional change here. Just a formatting change. Always found this line hard to read and I had to fix it :smile: 
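For comparison, the same conversion can also be written by mapping the `Optional` once instead of calling `quorum.highWatermark()` repeatedly. This is a small sketch with a hypothetical `HighWatermarkMetadata` record standing in for the quorum's high-watermark type; it behaves like the if/else above as long as `highWatermark()` returns the same value on each call.

```java
import java.util.Optional;
import java.util.OptionalLong;

// Hypothetical stand-in for the quorum's high-watermark metadata.
record HighWatermarkMetadata(long offset) {}

final class HighWatermarkExample {
    static OptionalLong highWatermark(Optional<HighWatermarkMetadata> hw) {
        // Optional<HighWatermarkMetadata> -> Optional<OptionalLong> -> OptionalLong
        return hw.map(m -> OptionalLong.of(m.offset())).orElse(OptionalLong.empty());
    }
}
```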







[GitHub] [kafka] jsancio commented on pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

Posted by GitBox <gi...@apache.org>.
jsancio commented on pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#issuecomment-858848250


   Created a Jira for renaming the types SnapshotWriter and SnapshotReader and adding interfaces with the same names in their place.
   https://issues.apache.org/jira/browse/KAFKA-12932






[GitHub] [kafka] jsancio commented on pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

Posted by GitBox <gi...@apache.org>.
jsancio commented on pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#issuecomment-858750625


   @hachikuji thanks for the review. Updated the PR to address your comments.
   
   cc @cmccabe 





[GitHub] [kafka] hachikuji commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r651378054



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##########
@@ -85,6 +85,20 @@ public OffsetAndEpoch snapshotId() {
         return snapshot.snapshotId();
     }
 
+    /**
+     * Returns the last log offset which is represented in the snapshot.
+     */
+    public long lastOffsetFromLog() {

Review comment:
       Yeah, makes sense. I was sort of considering if it would be useful to have a `SnapshotId` object. Currently we use `OffsetAndEpoch` in other cases, but maybe a separate object would let us have better names. It would also let us define inclusive and exclusive methods.
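A minimal sketch of the `SnapshotId` idea with explicit inclusive and exclusive accessors. It assumes, as the tests in this PR suggest (a snapshot at offset 20 over a log holding offsets 0 to 19), that the id's offset is an exclusive end offset, and that the id's epoch is the epoch of the last contained record. `SnapshotId` and its method names are hypothetical; Kafka currently uses `OffsetAndEpoch`.

```java
// Hypothetical value type; not part of Kafka.
record SnapshotId(long endOffset, int epoch) {
    SnapshotId {
        if (endOffset < 0) {
            throw new IllegalArgumentException("endOffset must be non-negative: " + endOffset);
        }
    }

    /** Exclusive bound: offset of the first log record NOT included in the snapshot. */
    long endOffsetExclusive() {
        return endOffset;
    }

    /** Inclusive bound: offset of the last log record included in the snapshot. */
    long lastContainedLogOffset() {
        return endOffset - 1;
    }

    /** Epoch of the last log record included in the snapshot (assumed to be the id's epoch). */
    int lastContainedLogEpoch() {
        return epoch;
    }

    /** True if the log record at this offset is reflected in the snapshot. */
    boolean containsLogOffset(long logOffset) {
        return logOffset < endOffset;
    }
}
```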
   
    







[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r649320552



##########
File path: metadata/src/main/java/org/apache/kafka/controller/SnapshotGenerator.java
##########
@@ -74,56 +74,49 @@ String name() {
         this.batch = null;
         this.section = null;
         this.numRecords = 0;
-        this.numWriteTries = 0;
     }
 
     /**
      * Returns the epoch of the snapshot that we are generating.
      */
     long epoch() {
-        return writer.epoch();
+        return writer.lastOffset();

Review comment:
       Yes, but the names are not great. I renamed `SnapshotGenerator.epoch` and `SnapshotWriter.lastOffset` to `lastOffsetFromLog`. This should make it clear that the offsets of the batches in the snapshot are independent of the last log offset included in the snapshot.







[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r649281362



##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1009,7 +999,7 @@ private QuorumController(LogContext logContext,
             snapshotRegistry, sessionTimeoutNs, replicaPlacer);
         this.featureControl = new FeatureControlManager(supportedFeatures, snapshotRegistry);
         this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
-        this.snapshotGeneratorManager = new SnapshotGeneratorManager(snapshotWriterBuilder);
+        this.snapshotGeneratorManager = new SnapshotGeneratorManager(raftClient::createSnapshot);

Review comment:
       Fair enough. Removing the `BiFunction` from the constructor. `SnapshotGeneratorManager` is an inner class so it should have access to the `raftClient`.
   
   > Was this done for testing or something?
   
   I am not sure why this was added. It is not used in tests. I think the previous code didn't have access to the `raftClient` because this code was merged before the dependency between the `metadata` project and the `raft` project was reversed.
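A minimal sketch of the inner-class point: a non-static inner class holds an implicit reference to its enclosing instance, so it can call the outer object's `raftClient` directly instead of receiving a `BiFunction` in its constructor. `SnapshotSource`, `SketchController` and the `String` return type are hypothetical simplifications, not the actual `QuorumController` code.

```java
// Hypothetical stand-in for the raft client's snapshot-creation method.
interface SnapshotSource {
    String createSnapshot(long offset, int epoch);
}

final class SketchController {
    private final SnapshotSource raftClient;
    private final SnapshotGeneratorManager snapshotGeneratorManager;

    SketchController(SnapshotSource raftClient) {
        this.raftClient = raftClient;
        // No function argument: the inner class reaches raftClient through SketchController.this.
        this.snapshotGeneratorManager = new SnapshotGeneratorManager();
    }

    String beginSnapshot(long offset, int epoch) {
        return snapshotGeneratorManager.createSnapshot(offset, epoch);
    }

    // Non-static inner class: each instance is bound to the SketchController that created it.
    final class SnapshotGeneratorManager {
        String createSnapshot(long offset, int epoch) {
            return raftClient.createSnapshot(offset, epoch);
        }
    }
}
```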







[GitHub] [kafka] dengziming commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r646282221



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -247,6 +247,34 @@ final class KafkaMetadataLog private (
     FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this))
   }
 
+  override def createSnapshotFromEndOffset(endOffset: Long): RawSnapshotWriter = {
+    val highWatermarkOffset = highWatermark.offset
+    if (endOffset > highWatermarkOffset) {
+      throw new IllegalArgumentException(
+        s"Cannot create a snapshot for an end offset ($endOffset) greater than the high-watermark ($highWatermarkOffset)"
+      )
+    }
+
+    if (endOffset < startOffset) {
+      throw new IllegalArgumentException(
+        s"Cannot create a snapshot for an end offset ($endOffset) less or equal to the log start offset ($startOffset)"

Review comment:
       nit: the code is `endOffset < startOffset` but the log is `less or equal to`.







[GitHub] [kafka] hachikuji commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r647803084



##########
File path: metadata/src/main/java/org/apache/kafka/controller/SnapshotGenerator.java
##########
@@ -74,56 +74,49 @@ String name() {
         this.batch = null;
         this.section = null;
         this.numRecords = 0;
-        this.numWriteTries = 0;
     }
 
     /**
      * Returns the epoch of the snapshot that we are generating.
      */
     long epoch() {
-        return writer.epoch();
+        return writer.lastOffset();

Review comment:
       Is this correct? Seems likely to cause confusion if it is.

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1009,7 +999,7 @@ private QuorumController(LogContext logContext,
             snapshotRegistry, sessionTimeoutNs, replicaPlacer);
         this.featureControl = new FeatureControlManager(supportedFeatures, snapshotRegistry);
         this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
-        this.snapshotGeneratorManager = new SnapshotGeneratorManager(snapshotWriterBuilder);
+        this.snapshotGeneratorManager = new SnapshotGeneratorManager(raftClient::createSnapshot);

Review comment:
       Passing through the function is a tad odd. We actually could just use the implicit reference to `raftClient`. Was this done for testing or something?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -233,18 +233,40 @@ final class KafkaMetadataLog private (
     log.topicId.get
   }
 
-  override def createSnapshot(snapshotId: OffsetAndEpoch): RawSnapshotWriter = {
-    // Do not let the state machine create snapshots older than the latest snapshot
-    latestSnapshotId().ifPresent { latest =>
-      if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) {
-        // Since snapshots are less than the high-watermark absolute offset comparison is okay.
-        throw new IllegalArgumentException(
-          s"Attempting to create a snapshot ($snapshotId) that is not greater than the latest snapshot ($latest)"
-        )
-      }
+  override def createSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotWriter] = {
+    if (snapshots.contains(snapshotId)) {
+      Optional.empty()
+    } else {
+      Optional.of(FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this)))
+    }
+  }
+
+  override def createSnapshotFromEndOffset(endOffset: Long): Optional[RawSnapshotWriter] = {
+    val highWatermarkOffset = highWatermark.offset
+    if (endOffset > highWatermarkOffset) {
+      throw new IllegalArgumentException(
+        s"Cannot create a snapshot for an end offset ($endOffset) greater than the high-watermark ($highWatermarkOffset)"
+      )
+    }
+
+    if (endOffset < startOffset) {
+      throw new IllegalArgumentException(
+        s"Cannot create a snapshot for an end offset ($endOffset) less than the log start offset ($startOffset)"
+      )
+    }
+
+    val epoch = log.leaderEpochCache.flatMap(_.findEpochEntryByEndOffset(endOffset)) match {
+      case Some(epochEntry) =>
+        epochEntry.epoch
+      case None =>
+        // Assume that the end offset falls in the current epoch since based on the check above:

Review comment:
       This confuses me a little bit. The logic in `findEpochEntryByEndOffset` returns the first epoch which has a start offset less than the end offset. Wouldn't that already cover the case of the current epoch? It seems like the case that is uncovered is when the offset is smaller than the start offset of the first cached epoch, but that should be an error?
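To make the lookup under discussion concrete, here is a minimal sketch of the behavior the comment describes, using a hypothetical `TreeMap` from each epoch's start offset to the epoch (not Kafka's leader epoch cache or its `findEpochEntryByEndOffset`). `lowerEntry(endOffset)` returns the latest epoch whose start offset is strictly less than the end offset, so an end offset inside the current epoch is already covered; the only empty result is an end offset at or before the first cached epoch's start offset.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.OptionalInt;
import java.util.TreeMap;

// Hypothetical simplified epoch cache; not Kafka's implementation.
final class EpochCacheSketch {
    private final NavigableMap<Long, Integer> startOffsetToEpoch = new TreeMap<>();

    void assign(int epoch, long startOffset) {
        startOffsetToEpoch.put(startOffset, epoch);
    }

    /** Latest epoch whose start offset is strictly less than endOffset, if any. */
    OptionalInt epochForEndOffset(long endOffset) {
        Map.Entry<Long, Integer> entry = startOffsetToEpoch.lowerEntry(endOffset);
        return entry == null ? OptionalInt.empty() : OptionalInt.of(entry.getValue());
    }
}
```

In this sketch the missing-entry case is exactly the "before the first cached epoch" case, which supports treating it as an error rather than defaulting to the current epoch.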







[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r651387678



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##########
@@ -85,6 +85,20 @@ public OffsetAndEpoch snapshotId() {
         return snapshot.snapshotId();
     }
 
+    /**
+     * Returns the last log offset which is represented in the snapshot.
+     */
+    public long lastOffsetFromLog() {

Review comment:
       I am up for that. Do you mind if I file a jira for this?



