You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/11/23 08:10:04 UTC

[GitHub] [kafka] socutes opened a new pull request #11529: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

socutes opened a new pull request #11529:
URL: https://github.com/apache/kafka/pull/11529


   Change the snapshot API so that SnapshotWriter and SnapshotReader are interfaces. Change the existing types SnapshotWriter and SnapshotReader to use a different name and to implement the interfaces introduced by this issue.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] socutes commented on a change in pull request #11529: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
socutes commented on a change in pull request #11529:
URL: https://github.com/apache/kafka/pull/11529#discussion_r757876976



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileSnapshotReader.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import org.apache.kafka.raft.OffsetAndEpoch;
+import java.util.Iterator;
+import org.apache.kafka.raft.Batch;
+
+/**
+ * Interface of the snapshot reader
+ */
+public interface FileSnapshotReader<T> extends AutoCloseable, Iterator<Batch<T>> {

Review comment:
       > Excuse the confusion as to what was expected in this change. Hope it is clear now.
   
   I'm really sorry to misunderstand what you mean. I think I know how to do that, okay!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on a change in pull request #11529: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11529:
URL: https://github.com/apache/kafka/pull/11529#discussion_r756176949



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileSnapshotReader.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import org.apache.kafka.raft.OffsetAndEpoch;
+import java.util.Iterator;
+import org.apache.kafka.raft.Batch;
+
+/**
+ * Interface of the snapshot reader
+ */
+public interface FileSnapshotReader<T> extends AutoCloseable, Iterator<Batch<T>> {

Review comment:
       How about naming this interface `SnapshotReader` and renaming the class `SnapshotReader` to `RecordsSnapshotReader`

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileSnapshotWriter.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+
+import java.util.List;
+public interface FileSnapshotWriter<T> extends AutoCloseable {

Review comment:
       How about naming this interface `SnapshotWriter` and renaming the class `SnapshotWriter` to `RecordsSnapshotWriter`

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileSnapshotWriter.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+
+import java.util.List;
+public interface FileSnapshotWriter<T> extends AutoCloseable {

Review comment:
       Let's _move_ the documentation in `class SnapshotWriter` to this interface.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileSnapshotWriter.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+
+import java.util.List;
+public interface FileSnapshotWriter<T> extends AutoCloseable {
+    /**
+     * Returns the end offset and epoch for the snapshot.
+     */
+    OffsetAndEpoch snapshotId();
+
+    /**
+     * Returns the last log offset which is represented in the snapshot.
+     */
+    long lastContainedLogOffset();
+
+    /**
+     * Returns the epoch of the last log offset which is represented in the snapshot.
+     */
+    int lastContainedLogEpoch();
+
+    /**
+     * Returns true if the snapshot has been frozen, otherwise false is returned.
+     * <p>
+     * Modification to the snapshot are not allowed once it is frozen.
+     */
+    boolean isFrozen();
+
+    /**
+     * Appends a list of values to the snapshot.
+     * <p>

Review comment:
       I think we can remove the `<p>` HTML tag. This occurs 3 times in this file.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileSnapshotReader.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import org.apache.kafka.raft.OffsetAndEpoch;
+import java.util.Iterator;
+import org.apache.kafka.raft.Batch;

Review comment:
       Please fix the import order by sorting these lines.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileSnapshotWriter.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+
+import java.util.List;

Review comment:
       Please fix the import formatting. Need a space between the `package` line and the `import` lines. Need a space betwee the `import` lines and the `interface` line.
   
   Need to also short the import lines.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##########
@@ -74,6 +74,7 @@ private SnapshotWriter(
             compressionType,
             serde
         );
+        initializeSnapshotWithHeader();

Review comment:
       Ideally, we shouldn't change the semantic of this type in this PR. Why are we moving this here from the `static` method?

##########
File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
##########
@@ -123,8 +123,14 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
     val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "rob")
     val user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "batman")
+    val user4 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "segment")

Review comment:
       Are the changes to this file needed?

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileSnapshotReader.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import org.apache.kafka.raft.OffsetAndEpoch;
+import java.util.Iterator;
+import org.apache.kafka.raft.Batch;
+
+/**
+ * Interface of the snapshot reader
+ */

Review comment:
       Let's _move_ the documentation in `class SnapshotReader` to this interface.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on a change in pull request #11529: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11529:
URL: https://github.com/apache/kafka/pull/11529#discussion_r757955172



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.raft.internals.BatchAccumulator;
+import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+import org.apache.kafka.common.record.ControlRecordUtils;
+
+import java.util.Optional;
+import java.util.List;
+import java.util.function.Supplier;
+
+final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> {
+    final private RawSnapshotWriter snapshot;
+    final private BatchAccumulator<T> accumulator;
+    final private Time time;
+    final private long lastContainedLogTimestamp;
+
+    private RecordsSnapshotWriter(
+        RawSnapshotWriter snapshot,
+        int maxBatchSize,
+        MemoryPool memoryPool,
+        Time time,
+        long lastContainedLogTimestamp,
+        CompressionType compressionType,
+        RecordSerde<T> serde
+    ) {
+        this.snapshot = snapshot;
+        this.time = time;
+        this.lastContainedLogTimestamp = lastContainedLogTimestamp;
+
+        this.accumulator = new BatchAccumulator<>(
+            snapshot.snapshotId().epoch,
+            0,
+            Integer.MAX_VALUE,
+            maxBatchSize,
+            memoryPool,
+            time,
+            compressionType,
+            serde
+        );
+    }
+
+    /**
+     * Adds a {@link SnapshotHeaderRecord} to snapshot
+     *
+     * @throws IllegalStateException if the snapshot is not empty
+     */
+    public void initializeSnapshotWithHeader() {
+        if (snapshot.sizeInBytes() != 0) {
+            String message = String.format(
+                "Initializing writer with a non-empty snapshot: id = '%s'.",
+                snapshot.snapshotId()
+            );
+            throw new IllegalStateException(message);
+        }
+
+        SnapshotHeaderRecord headerRecord = new SnapshotHeaderRecord()
+            .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION)
+            .setLastContainedLogTimestamp(lastContainedLogTimestamp);
+        accumulator.appendSnapshotHeaderMessage(headerRecord, time.milliseconds());
+        accumulator.forceDrain();
+    }
+
+    /**
+     * Adds a {@link SnapshotFooterRecord} to the snapshot
+     *
+     * No more records should be appended to the snapshot after calling this method
+     */
+    private void finalizeSnapshotWithFooter() {
+        SnapshotFooterRecord footerRecord = new SnapshotFooterRecord()
+            .setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION);
+        accumulator.appendSnapshotFooterMessage(footerRecord, time.milliseconds());
+        accumulator.forceDrain();
+    }
+
+    /**
+     * Create an instance of this class and initialize
+     * the underlying snapshot with {@link SnapshotHeaderRecord}
+     *
+     * @param snapshot a lambda to create the low level snapshot writer
+     * @param maxBatchSize the maximum size in byte for a batch
+     * @param memoryPool the memory pool for buffer allocation
+     * @param time the clock implementation
+     * @param lastContainedLogTimestamp The append time of the highest record contained in this snapshot
+     * @param compressionType the compression algorithm to use
+     * @param serde the record serialization and deserialization implementation
+     * @return {@link Optional}{@link RecordsSnapshotWriter}
+     */
+    public static <T> Optional<SnapshotWriter<T>> createWithHeader(
+        Supplier<Optional<RawSnapshotWriter>> supplier,
+        int maxBatchSize,
+        MemoryPool memoryPool,
+        Time snapshotTime,
+        long lastContainedLogTimestamp,
+        CompressionType compressionType,
+        RecordSerde<T> serde
+    ) {
+        Optional<SnapshotWriter<T>> writer = supplier.get().map(snapshot -> {
+            return new RecordsSnapshotWriter<T>(
+                    snapshot,
+                    maxBatchSize,
+                    memoryPool,
+                    snapshotTime,
+                    lastContainedLogTimestamp,
+                    CompressionType.NONE,
+                    serde);
+        });
+        writer.ifPresent(SnapshotWriter::initializeSnapshotWithHeader);
+        return writer;
+    }
+
+    /**
+     * Returns the end offset and epoch for the snapshot.
+     */
+    public OffsetAndEpoch snapshotId() {

Review comment:
       Add the `@Overrride` annotation to this method.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.raft.internals.BatchAccumulator;
+import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+import org.apache.kafka.common.record.ControlRecordUtils;
+
+import java.util.Optional;
+import java.util.List;
+import java.util.function.Supplier;
+
+final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> {
+    final private RawSnapshotWriter snapshot;
+    final private BatchAccumulator<T> accumulator;
+    final private Time time;
+    final private long lastContainedLogTimestamp;
+
+    private RecordsSnapshotWriter(
+        RawSnapshotWriter snapshot,
+        int maxBatchSize,
+        MemoryPool memoryPool,
+        Time time,
+        long lastContainedLogTimestamp,
+        CompressionType compressionType,
+        RecordSerde<T> serde
+    ) {
+        this.snapshot = snapshot;
+        this.time = time;
+        this.lastContainedLogTimestamp = lastContainedLogTimestamp;
+
+        this.accumulator = new BatchAccumulator<>(
+            snapshot.snapshotId().epoch,
+            0,
+            Integer.MAX_VALUE,
+            maxBatchSize,
+            memoryPool,
+            time,
+            compressionType,
+            serde
+        );
+    }
+
+    /**
+     * Adds a {@link SnapshotHeaderRecord} to snapshot
+     *
+     * @throws IllegalStateException if the snapshot is not empty
+     */
+    public void initializeSnapshotWithHeader() {
+        if (snapshot.sizeInBytes() != 0) {
+            String message = String.format(
+                "Initializing writer with a non-empty snapshot: id = '%s'.",
+                snapshot.snapshotId()
+            );
+            throw new IllegalStateException(message);
+        }
+
+        SnapshotHeaderRecord headerRecord = new SnapshotHeaderRecord()
+            .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION)
+            .setLastContainedLogTimestamp(lastContainedLogTimestamp);
+        accumulator.appendSnapshotHeaderMessage(headerRecord, time.milliseconds());
+        accumulator.forceDrain();
+    }
+
+    /**
+     * Adds a {@link SnapshotFooterRecord} to the snapshot
+     *
+     * No more records should be appended to the snapshot after calling this method
+     */
+    private void finalizeSnapshotWithFooter() {
+        SnapshotFooterRecord footerRecord = new SnapshotFooterRecord()
+            .setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION);
+        accumulator.appendSnapshotFooterMessage(footerRecord, time.milliseconds());
+        accumulator.forceDrain();
+    }
+
+    /**
+     * Create an instance of this class and initialize
+     * the underlying snapshot with {@link SnapshotHeaderRecord}
+     *
+     * @param snapshot a lambda to create the low level snapshot writer
+     * @param maxBatchSize the maximum size in byte for a batch
+     * @param memoryPool the memory pool for buffer allocation
+     * @param time the clock implementation
+     * @param lastContainedLogTimestamp The append time of the highest record contained in this snapshot
+     * @param compressionType the compression algorithm to use
+     * @param serde the record serialization and deserialization implementation
+     * @return {@link Optional}{@link RecordsSnapshotWriter}
+     */
+    public static <T> Optional<SnapshotWriter<T>> createWithHeader(
+        Supplier<Optional<RawSnapshotWriter>> supplier,
+        int maxBatchSize,
+        MemoryPool memoryPool,
+        Time snapshotTime,
+        long lastContainedLogTimestamp,
+        CompressionType compressionType,
+        RecordSerde<T> serde
+    ) {
+        Optional<SnapshotWriter<T>> writer = supplier.get().map(snapshot -> {
+            return new RecordsSnapshotWriter<T>(
+                    snapshot,
+                    maxBatchSize,
+                    memoryPool,
+                    snapshotTime,
+                    lastContainedLogTimestamp,
+                    CompressionType.NONE,
+                    serde);
+        });
+        writer.ifPresent(SnapshotWriter::initializeSnapshotWithHeader);
+        return writer;
+    }
+
+    /**
+     * Returns the end offset and epoch for the snapshot.
+     */
+    public OffsetAndEpoch snapshotId() {
+        return snapshot.snapshotId();
+    }
+
+    /**
+     * Returns the last log offset which is represented in the snapshot.
+     */
+    public long lastContainedLogOffset() {

Review comment:
       Add the `@Overrride` annotation to this method.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.raft.internals.BatchAccumulator;
+import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+import org.apache.kafka.common.record.ControlRecordUtils;
+
+import java.util.Optional;
+import java.util.List;
+import java.util.function.Supplier;
+
+final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> {
+    final private RawSnapshotWriter snapshot;
+    final private BatchAccumulator<T> accumulator;
+    final private Time time;
+    final private long lastContainedLogTimestamp;
+
+    private RecordsSnapshotWriter(
+        RawSnapshotWriter snapshot,
+        int maxBatchSize,
+        MemoryPool memoryPool,
+        Time time,
+        long lastContainedLogTimestamp,
+        CompressionType compressionType,
+        RecordSerde<T> serde
+    ) {
+        this.snapshot = snapshot;
+        this.time = time;
+        this.lastContainedLogTimestamp = lastContainedLogTimestamp;
+
+        this.accumulator = new BatchAccumulator<>(
+            snapshot.snapshotId().epoch,
+            0,
+            Integer.MAX_VALUE,
+            maxBatchSize,
+            memoryPool,
+            time,
+            compressionType,
+            serde
+        );
+    }
+
+    /**
+     * Adds a {@link SnapshotHeaderRecord} to snapshot
+     *
+     * @throws IllegalStateException if the snapshot is not empty
+     */
+    public void initializeSnapshotWithHeader() {
+        if (snapshot.sizeInBytes() != 0) {
+            String message = String.format(
+                "Initializing writer with a non-empty snapshot: id = '%s'.",
+                snapshot.snapshotId()
+            );
+            throw new IllegalStateException(message);
+        }
+
+        SnapshotHeaderRecord headerRecord = new SnapshotHeaderRecord()
+            .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION)
+            .setLastContainedLogTimestamp(lastContainedLogTimestamp);
+        accumulator.appendSnapshotHeaderMessage(headerRecord, time.milliseconds());
+        accumulator.forceDrain();
+    }
+
+    /**
+     * Adds a {@link SnapshotFooterRecord} to the snapshot
+     *
+     * No more records should be appended to the snapshot after calling this method
+     */
+    private void finalizeSnapshotWithFooter() {
+        SnapshotFooterRecord footerRecord = new SnapshotFooterRecord()
+            .setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION);
+        accumulator.appendSnapshotFooterMessage(footerRecord, time.milliseconds());
+        accumulator.forceDrain();
+    }
+
+    /**
+     * Create an instance of this class and initialize
+     * the underlying snapshot with {@link SnapshotHeaderRecord}
+     *
+     * @param snapshot a lambda to create the low level snapshot writer
+     * @param maxBatchSize the maximum size in byte for a batch
+     * @param memoryPool the memory pool for buffer allocation
+     * @param time the clock implementation
+     * @param lastContainedLogTimestamp The append time of the highest record contained in this snapshot
+     * @param compressionType the compression algorithm to use
+     * @param serde the record serialization and deserialization implementation
+     * @return {@link Optional}{@link RecordsSnapshotWriter}
+     */
+    public static <T> Optional<SnapshotWriter<T>> createWithHeader(
+        Supplier<Optional<RawSnapshotWriter>> supplier,
+        int maxBatchSize,
+        MemoryPool memoryPool,
+        Time snapshotTime,
+        long lastContainedLogTimestamp,
+        CompressionType compressionType,
+        RecordSerde<T> serde
+    ) {
+        Optional<SnapshotWriter<T>> writer = supplier.get().map(snapshot -> {
+            return new RecordsSnapshotWriter<T>(
+                    snapshot,
+                    maxBatchSize,
+                    memoryPool,
+                    snapshotTime,
+                    lastContainedLogTimestamp,
+                    CompressionType.NONE,
+                    serde);
+        });
+        writer.ifPresent(SnapshotWriter::initializeSnapshotWithHeader);
+        return writer;
+    }
+
+    /**
+     * Returns the end offset and epoch for the snapshot.
+     */
+    public OffsetAndEpoch snapshotId() {
+        return snapshot.snapshotId();
+    }
+
+    /**
+     * Returns the last log offset which is represented in the snapshot.
+     */
+    public long lastContainedLogOffset() {
+        return snapshot.snapshotId().offset - 1;
+    }
+
+    /**
+     * Returns the epoch of the last log offset which is represented in the snapshot.
+     */
+    public int lastContainedLogEpoch() {
+        return snapshot.snapshotId().epoch;
+    }
+
+    /**
+     * Returns true if the snapshot has been frozen, otherwise false is returned.
+     *
+     * Modification to the snapshot are not allowed once it is frozen.
+     */
+    public boolean isFrozen() {
+        return snapshot.isFrozen();
+    }
+
+    /**
+     * Appends a list of values to the snapshot.
+     *
+     * The list of record passed are guaranteed to get written together.
+     *
+     * @param records the list of records to append to the snapshot
+     * @throws IllegalStateException if append is called when isFrozen is true
+     */
+    public void append(List<T> records) {
+        if (snapshot.isFrozen()) {
+            String message = String.format(
+                "Append not supported. Snapshot is already frozen: id = '%s'.",
+                snapshot.snapshotId()
+            );
+
+            throw new IllegalStateException(message);
+        }
+
+        accumulator.append(snapshot.snapshotId().epoch, records);
+
+        if (accumulator.needsDrain(time.milliseconds())) {
+            appendBatches(accumulator.drain());
+        }
+    }
+
+    /**
+     * Freezes the snapshot by flushing all pending writes and marking it as immutable.
+     *
+     * Also adds a {@link SnapshotFooterRecord} to the end of the snapshot
+     */
+    public void freeze() {
+        finalizeSnapshotWithFooter();
+        appendBatches(accumulator.drain());
+        snapshot.freeze();
+        accumulator.close();
+    }
+
+    /**
+     * Closes the snapshot writer.
+     *
+     * If close is called without first calling freeze the snapshot is aborted.
+     */
+    public void close() {

Review comment:
       Add the `@Overrride` annotation to this method.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##########
@@ -183,52 +65,25 @@ public boolean isFrozen() {
      * @param records the list of records to append to the snapshot
      * @throws IllegalStateException if append is called when isFrozen is true
      */
-    public void append(List<T> records) {
-        if (snapshot.isFrozen()) {
-            String message = String.format(
-                "Append not supported. Snapshot is already frozen: id = '%s'.",
-                snapshot.snapshotId()
-            );
-
-            throw new IllegalStateException(message);
-        }
-
-        accumulator.append(snapshot.snapshotId().epoch, records);
-
-        if (accumulator.needsDrain(time.milliseconds())) {
-            appendBatches(accumulator.drain());
-        }
-    }
+    void append(List<T> records);
 
     /**
      * Freezes the snapshot by flushing all pending writes and marking it as immutable.
      *
      * Also adds a {@link SnapshotFooterRecord} to the end of the snapshot
      */
-    public void freeze() {
-        finalizeSnapshotWithFooter();
-        appendBatches(accumulator.drain());
-        snapshot.freeze();
-        accumulator.close();
-    }
+    void freeze();
 
     /**
      * Closes the snapshot writer.
      *
      * If close is called without first calling freeze the snapshot is aborted.
      */
-    public void close() {
-        snapshot.close();
-        accumulator.close();
-    }
+    void close();
+
+    /**
+     * Initialize the snapshot with header
+     */
+    void initializeSnapshotWithHeader();

Review comment:
       Let's remove this method `initializeSnapshotWithHeader` from the interface.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.raft.internals.BatchAccumulator;
+import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+import org.apache.kafka.common.record.ControlRecordUtils;
+
+import java.util.Optional;
+import java.util.List;
+import java.util.function.Supplier;
+
+final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> {
+    final private RawSnapshotWriter snapshot;
+    final private BatchAccumulator<T> accumulator;
+    final private Time time;
+    final private long lastContainedLogTimestamp;
+
+    private RecordsSnapshotWriter(
+        RawSnapshotWriter snapshot,
+        int maxBatchSize,
+        MemoryPool memoryPool,
+        Time time,
+        long lastContainedLogTimestamp,
+        CompressionType compressionType,
+        RecordSerde<T> serde
+    ) {
+        this.snapshot = snapshot;
+        this.time = time;
+        this.lastContainedLogTimestamp = lastContainedLogTimestamp;
+
+        this.accumulator = new BatchAccumulator<>(
+            snapshot.snapshotId().epoch,
+            0,
+            Integer.MAX_VALUE,
+            maxBatchSize,
+            memoryPool,
+            time,
+            compressionType,
+            serde
+        );
+    }
+
+    /**
+     * Adds a {@link SnapshotHeaderRecord} to snapshot
+     *
+     * @throws IllegalStateException if the snapshot is not empty
+     */
+    public void initializeSnapshotWithHeader() {
+        if (snapshot.sizeInBytes() != 0) {
+            String message = String.format(
+                "Initializing writer with a non-empty snapshot: id = '%s'.",
+                snapshot.snapshotId()
+            );
+            throw new IllegalStateException(message);
+        }
+
+        SnapshotHeaderRecord headerRecord = new SnapshotHeaderRecord()
+            .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION)
+            .setLastContainedLogTimestamp(lastContainedLogTimestamp);
+        accumulator.appendSnapshotHeaderMessage(headerRecord, time.milliseconds());
+        accumulator.forceDrain();
+    }
+
+    /**
+     * Adds a {@link SnapshotFooterRecord} to the snapshot
+     *
+     * No more records should be appended to the snapshot after calling this method
+     */
+    private void finalizeSnapshotWithFooter() {
+        SnapshotFooterRecord footerRecord = new SnapshotFooterRecord()
+            .setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION);
+        accumulator.appendSnapshotFooterMessage(footerRecord, time.milliseconds());
+        accumulator.forceDrain();
+    }
+
+    /**
+     * Create an instance of this class and initialize
+     * the underlying snapshot with {@link SnapshotHeaderRecord}
+     *
+     * @param snapshot a lambda to create the low level snapshot writer
+     * @param maxBatchSize the maximum size in byte for a batch
+     * @param memoryPool the memory pool for buffer allocation
+     * @param time the clock implementation
+     * @param lastContainedLogTimestamp The append time of the highest record contained in this snapshot
+     * @param compressionType the compression algorithm to use
+     * @param serde the record serialization and deserialization implementation
+     * @return {@link Optional}{@link RecordsSnapshotWriter}
+     */
+    public static <T> Optional<SnapshotWriter<T>> createWithHeader(
+        Supplier<Optional<RawSnapshotWriter>> supplier,
+        int maxBatchSize,
+        MemoryPool memoryPool,
+        Time snapshotTime,
+        long lastContainedLogTimestamp,
+        CompressionType compressionType,
+        RecordSerde<T> serde
+    ) {
+        Optional<SnapshotWriter<T>> writer = supplier.get().map(snapshot -> {
+            return new RecordsSnapshotWriter<T>(
+                    snapshot,
+                    maxBatchSize,
+                    memoryPool,
+                    snapshotTime,
+                    lastContainedLogTimestamp,
+                    CompressionType.NONE,
+                    serde);
+        });
+        writer.ifPresent(SnapshotWriter::initializeSnapshotWithHeader);
+        return writer;
+    }
+
+    /**
+     * Returns the end offset and epoch for the snapshot.
+     */
+    public OffsetAndEpoch snapshotId() {
+        return snapshot.snapshotId();
+    }
+
+    /**
+     * Returns the last log offset which is represented in the snapshot.
+     */
+    public long lastContainedLogOffset() {
+        return snapshot.snapshotId().offset - 1;
+    }
+
+    /**
+     * Returns the epoch of the last log offset which is represented in the snapshot.
+     */
+    public int lastContainedLogEpoch() {
+        return snapshot.snapshotId().epoch;
+    }
+
+    /**
+     * Returns true if the snapshot has been frozen, otherwise false is returned.
+     *
+     * Modification to the snapshot are not allowed once it is frozen.
+     */
+    public boolean isFrozen() {
+        return snapshot.isFrozen();
+    }
+
+    /**
+     * Appends a list of values to the snapshot.
+     *
+     * The list of record passed are guaranteed to get written together.
+     *
+     * @param records the list of records to append to the snapshot
+     * @throws IllegalStateException if append is called when isFrozen is true
+     */
+    public void append(List<T> records) {

Review comment:
       Add the `@Overrride` annotation to this method.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.raft.internals.BatchAccumulator;
+import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+import org.apache.kafka.common.record.ControlRecordUtils;
+
+import java.util.Optional;
+import java.util.List;
+import java.util.function.Supplier;
+
+final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> {
+    final private RawSnapshotWriter snapshot;
+    final private BatchAccumulator<T> accumulator;
+    final private Time time;
+    final private long lastContainedLogTimestamp;
+
+    private RecordsSnapshotWriter(
+        RawSnapshotWriter snapshot,
+        int maxBatchSize,
+        MemoryPool memoryPool,
+        Time time,
+        long lastContainedLogTimestamp,
+        CompressionType compressionType,
+        RecordSerde<T> serde
+    ) {
+        this.snapshot = snapshot;
+        this.time = time;
+        this.lastContainedLogTimestamp = lastContainedLogTimestamp;
+
+        this.accumulator = new BatchAccumulator<>(
+            snapshot.snapshotId().epoch,
+            0,
+            Integer.MAX_VALUE,
+            maxBatchSize,
+            memoryPool,
+            time,
+            compressionType,
+            serde
+        );
+    }
+
+    /**
+     * Adds a {@link SnapshotHeaderRecord} to snapshot
+     *
+     * @throws IllegalStateException if the snapshot is not empty
+     */
+    public void initializeSnapshotWithHeader() {
+        if (snapshot.sizeInBytes() != 0) {
+            String message = String.format(
+                "Initializing writer with a non-empty snapshot: id = '%s'.",
+                snapshot.snapshotId()
+            );
+            throw new IllegalStateException(message);
+        }
+
+        SnapshotHeaderRecord headerRecord = new SnapshotHeaderRecord()
+            .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION)
+            .setLastContainedLogTimestamp(lastContainedLogTimestamp);
+        accumulator.appendSnapshotHeaderMessage(headerRecord, time.milliseconds());
+        accumulator.forceDrain();
+    }
+
+    /**
+     * Adds a {@link SnapshotFooterRecord} to the snapshot
+     *
+     * No more records should be appended to the snapshot after calling this method
+     */
+    private void finalizeSnapshotWithFooter() {
+        SnapshotFooterRecord footerRecord = new SnapshotFooterRecord()
+            .setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION);
+        accumulator.appendSnapshotFooterMessage(footerRecord, time.milliseconds());
+        accumulator.forceDrain();
+    }
+
+    /**
+     * Create an instance of this class and initialize
+     * the underlying snapshot with {@link SnapshotHeaderRecord}
+     *
+     * @param snapshot a lambda to create the low level snapshot writer
+     * @param maxBatchSize the maximum size in byte for a batch
+     * @param memoryPool the memory pool for buffer allocation
+     * @param time the clock implementation
+     * @param lastContainedLogTimestamp The append time of the highest record contained in this snapshot
+     * @param compressionType the compression algorithm to use
+     * @param serde the record serialization and deserialization implementation
+     * @return {@link Optional}{@link RecordsSnapshotWriter}
+     */
+    public static <T> Optional<SnapshotWriter<T>> createWithHeader(
+        Supplier<Optional<RawSnapshotWriter>> supplier,
+        int maxBatchSize,
+        MemoryPool memoryPool,
+        Time snapshotTime,
+        long lastContainedLogTimestamp,
+        CompressionType compressionType,
+        RecordSerde<T> serde
+    ) {
+        Optional<SnapshotWriter<T>> writer = supplier.get().map(snapshot -> {
+            return new RecordsSnapshotWriter<T>(
+                    snapshot,
+                    maxBatchSize,
+                    memoryPool,
+                    snapshotTime,
+                    lastContainedLogTimestamp,
+                    CompressionType.NONE,
+                    serde);
+        });
+        writer.ifPresent(SnapshotWriter::initializeSnapshotWithHeader);
+        return writer;
+    }
+
+    /**
+     * Returns the end offset and epoch for the snapshot.
+     */
+    public OffsetAndEpoch snapshotId() {
+        return snapshot.snapshotId();
+    }
+
+    /**
+     * Returns the last log offset which is represented in the snapshot.
+     */
+    public long lastContainedLogOffset() {
+        return snapshot.snapshotId().offset - 1;
+    }
+
+    /**
+     * Returns the epoch of the last log offset which is represented in the snapshot.
+     */
+    public int lastContainedLogEpoch() {
+        return snapshot.snapshotId().epoch;
+    }
+
+    /**
+     * Returns true if the snapshot has been frozen, otherwise false is returned.
+     *
+     * Modification to the snapshot are not allowed once it is frozen.
+     */
+    public boolean isFrozen() {

Review comment:
       Add the `@Overrride` annotation to this method.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.raft.internals.BatchAccumulator;
+import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+import org.apache.kafka.common.record.ControlRecordUtils;
+
+import java.util.Optional;
+import java.util.List;
+import java.util.function.Supplier;
+
+final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> {
+    final private RawSnapshotWriter snapshot;
+    final private BatchAccumulator<T> accumulator;
+    final private Time time;
+    final private long lastContainedLogTimestamp;
+
+    private RecordsSnapshotWriter(
+        RawSnapshotWriter snapshot,
+        int maxBatchSize,
+        MemoryPool memoryPool,
+        Time time,
+        long lastContainedLogTimestamp,
+        CompressionType compressionType,
+        RecordSerde<T> serde
+    ) {
+        this.snapshot = snapshot;
+        this.time = time;
+        this.lastContainedLogTimestamp = lastContainedLogTimestamp;
+
+        this.accumulator = new BatchAccumulator<>(
+            snapshot.snapshotId().epoch,
+            0,
+            Integer.MAX_VALUE,
+            maxBatchSize,
+            memoryPool,
+            time,
+            compressionType,
+            serde
+        );
+    }
+
+    /**
+     * Adds a {@link SnapshotHeaderRecord} to snapshot
+     *
+     * @throws IllegalStateException if the snapshot is not empty
+     */
+    public void initializeSnapshotWithHeader() {

Review comment:
       Let's keep this as a private method.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.raft.internals.BatchAccumulator;
+import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+import org.apache.kafka.common.record.ControlRecordUtils;
+
+import java.util.Optional;
+import java.util.List;
+import java.util.function.Supplier;
+
+final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> {
+    final private RawSnapshotWriter snapshot;
+    final private BatchAccumulator<T> accumulator;
+    final private Time time;
+    final private long lastContainedLogTimestamp;
+
+    private RecordsSnapshotWriter(
+        RawSnapshotWriter snapshot,
+        int maxBatchSize,
+        MemoryPool memoryPool,
+        Time time,
+        long lastContainedLogTimestamp,
+        CompressionType compressionType,
+        RecordSerde<T> serde
+    ) {
+        this.snapshot = snapshot;
+        this.time = time;
+        this.lastContainedLogTimestamp = lastContainedLogTimestamp;
+
+        this.accumulator = new BatchAccumulator<>(
+            snapshot.snapshotId().epoch,
+            0,
+            Integer.MAX_VALUE,
+            maxBatchSize,
+            memoryPool,
+            time,
+            compressionType,
+            serde
+        );
+    }
+
+    /**
+     * Adds a {@link SnapshotHeaderRecord} to snapshot
+     *
+     * @throws IllegalStateException if the snapshot is not empty
+     */
+    public void initializeSnapshotWithHeader() {
+        if (snapshot.sizeInBytes() != 0) {
+            String message = String.format(
+                "Initializing writer with a non-empty snapshot: id = '%s'.",
+                snapshot.snapshotId()
+            );
+            throw new IllegalStateException(message);
+        }
+
+        SnapshotHeaderRecord headerRecord = new SnapshotHeaderRecord()
+            .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION)
+            .setLastContainedLogTimestamp(lastContainedLogTimestamp);
+        accumulator.appendSnapshotHeaderMessage(headerRecord, time.milliseconds());
+        accumulator.forceDrain();
+    }
+
+    /**
+     * Adds a {@link SnapshotFooterRecord} to the snapshot
+     *
+     * No more records should be appended to the snapshot after calling this method
+     */
+    private void finalizeSnapshotWithFooter() {
+        SnapshotFooterRecord footerRecord = new SnapshotFooterRecord()
+            .setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION);
+        accumulator.appendSnapshotFooterMessage(footerRecord, time.milliseconds());
+        accumulator.forceDrain();
+    }
+
+    /**
+     * Create an instance of this class and initialize
+     * the underlying snapshot with {@link SnapshotHeaderRecord}
+     *
+     * @param snapshot a lambda to create the low level snapshot writer
+     * @param maxBatchSize the maximum size in byte for a batch
+     * @param memoryPool the memory pool for buffer allocation
+     * @param time the clock implementation
+     * @param lastContainedLogTimestamp The append time of the highest record contained in this snapshot
+     * @param compressionType the compression algorithm to use
+     * @param serde the record serialization and deserialization implementation
+     * @return {@link Optional}{@link RecordsSnapshotWriter}
+     */
+    public static <T> Optional<SnapshotWriter<T>> createWithHeader(
+        Supplier<Optional<RawSnapshotWriter>> supplier,
+        int maxBatchSize,
+        MemoryPool memoryPool,
+        Time snapshotTime,
+        long lastContainedLogTimestamp,
+        CompressionType compressionType,
+        RecordSerde<T> serde
+    ) {
+        Optional<SnapshotWriter<T>> writer = supplier.get().map(snapshot -> {
+            return new RecordsSnapshotWriter<T>(
+                    snapshot,
+                    maxBatchSize,
+                    memoryPool,
+                    snapshotTime,
+                    lastContainedLogTimestamp,
+                    CompressionType.NONE,
+                    serde);
+        });
+        writer.ifPresent(SnapshotWriter::initializeSnapshotWithHeader);
+        return writer;
+    }
+
+    /**
+     * Returns the end offset and epoch for the snapshot.
+     */
+    public OffsetAndEpoch snapshotId() {
+        return snapshot.snapshotId();
+    }
+
+    /**
+     * Returns the last log offset which is represented in the snapshot.
+     */
+    public long lastContainedLogOffset() {
+        return snapshot.snapshotId().offset - 1;
+    }
+
+    /**
+     * Returns the epoch of the last log offset which is represented in the snapshot.
+     */
+    public int lastContainedLogEpoch() {
+        return snapshot.snapshotId().epoch;
+    }
+
+    /**
+     * Returns true if the snapshot has been frozen, otherwise false is returned.
+     *
+     * Modification to the snapshot are not allowed once it is frozen.
+     */
+    public boolean isFrozen() {
+        return snapshot.isFrozen();
+    }
+
+    /**
+     * Appends a list of values to the snapshot.
+     *
+     * The list of record passed are guaranteed to get written together.
+     *
+     * @param records the list of records to append to the snapshot
+     * @throws IllegalStateException if append is called when isFrozen is true
+     */
+    public void append(List<T> records) {
+        if (snapshot.isFrozen()) {
+            String message = String.format(
+                "Append not supported. Snapshot is already frozen: id = '%s'.",
+                snapshot.snapshotId()
+            );
+
+            throw new IllegalStateException(message);
+        }
+
+        accumulator.append(snapshot.snapshotId().epoch, records);
+
+        if (accumulator.needsDrain(time.milliseconds())) {
+            appendBatches(accumulator.drain());
+        }
+    }
+
+    /**
+     * Freezes the snapshot by flushing all pending writes and marking it as immutable.
+     *
+     * Also adds a {@link SnapshotFooterRecord} to the end of the snapshot
+     */
+    public void freeze() {

Review comment:
       Add the `@Overrride` annotation to this method.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.raft.internals.BatchAccumulator;
+import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+import org.apache.kafka.common.record.ControlRecordUtils;
+
+import java.util.Optional;
+import java.util.List;
+import java.util.function.Supplier;
+
+final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> {
+    final private RawSnapshotWriter snapshot;
+    final private BatchAccumulator<T> accumulator;
+    final private Time time;
+    final private long lastContainedLogTimestamp;
+
+    private RecordsSnapshotWriter(
+        RawSnapshotWriter snapshot,
+        int maxBatchSize,
+        MemoryPool memoryPool,
+        Time time,
+        long lastContainedLogTimestamp,
+        CompressionType compressionType,
+        RecordSerde<T> serde
+    ) {
+        this.snapshot = snapshot;
+        this.time = time;
+        this.lastContainedLogTimestamp = lastContainedLogTimestamp;
+
+        this.accumulator = new BatchAccumulator<>(
+            snapshot.snapshotId().epoch,
+            0,
+            Integer.MAX_VALUE,
+            maxBatchSize,
+            memoryPool,
+            time,
+            compressionType,
+            serde
+        );
+    }
+
+    /**
+     * Adds a {@link SnapshotHeaderRecord} to snapshot
+     *
+     * @throws IllegalStateException if the snapshot is not empty
+     */

Review comment:
       Please remove the documentation from all of the methods that override methods already documented in `SnapshotWriter`.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.raft.internals.RecordsIterator;
+
+public final class RecordsSnapshotReader<T> implements SnapshotReader<T> {
+    private final OffsetAndEpoch snapshotId;
+    private final RecordsIterator<T> iterator;
+
+    private Optional<Batch<T>> nextBatch = Optional.empty();
+    private OptionalLong lastContainedLogTimestamp = OptionalLong.empty();
+
+    private RecordsSnapshotReader(
+        OffsetAndEpoch snapshotId,
+        RecordsIterator<T> iterator
+    ) {
+        this.snapshotId = snapshotId;
+        this.iterator = iterator;
+    }
+
+    /**
+     * Returns the end offset and epoch for the snapshot.
+     */
+    public OffsetAndEpoch snapshotId() {

Review comment:
       Add the `@Overrride` annotation to this method.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.raft.internals.BatchAccumulator;
+import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+import org.apache.kafka.common.record.ControlRecordUtils;
+
+import java.util.Optional;
+import java.util.List;
+import java.util.function.Supplier;
+
+final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> {
+    final private RawSnapshotWriter snapshot;
+    final private BatchAccumulator<T> accumulator;
+    final private Time time;
+    final private long lastContainedLogTimestamp;
+
+    private RecordsSnapshotWriter(
+        RawSnapshotWriter snapshot,
+        int maxBatchSize,
+        MemoryPool memoryPool,
+        Time time,
+        long lastContainedLogTimestamp,
+        CompressionType compressionType,
+        RecordSerde<T> serde
+    ) {
+        this.snapshot = snapshot;
+        this.time = time;
+        this.lastContainedLogTimestamp = lastContainedLogTimestamp;
+
+        this.accumulator = new BatchAccumulator<>(
+            snapshot.snapshotId().epoch,
+            0,
+            Integer.MAX_VALUE,
+            maxBatchSize,
+            memoryPool,
+            time,
+            compressionType,
+            serde
+        );
+    }
+
+    /**
+     * Adds a {@link SnapshotHeaderRecord} to snapshot
+     *
+     * @throws IllegalStateException if the snapshot is not empty
+     */
+    public void initializeSnapshotWithHeader() {
+        if (snapshot.sizeInBytes() != 0) {
+            String message = String.format(
+                "Initializing writer with a non-empty snapshot: id = '%s'.",
+                snapshot.snapshotId()
+            );
+            throw new IllegalStateException(message);
+        }
+
+        SnapshotHeaderRecord headerRecord = new SnapshotHeaderRecord()
+            .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION)
+            .setLastContainedLogTimestamp(lastContainedLogTimestamp);
+        accumulator.appendSnapshotHeaderMessage(headerRecord, time.milliseconds());
+        accumulator.forceDrain();
+    }
+
+    /**
+     * Adds a {@link SnapshotFooterRecord} to the snapshot
+     *
+     * No more records should be appended to the snapshot after calling this method
+     */
+    private void finalizeSnapshotWithFooter() {
+        SnapshotFooterRecord footerRecord = new SnapshotFooterRecord()
+            .setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION);
+        accumulator.appendSnapshotFooterMessage(footerRecord, time.milliseconds());
+        accumulator.forceDrain();
+    }
+
+    /**
+     * Create an instance of this class and initialize
+     * the underlying snapshot with {@link SnapshotHeaderRecord}
+     *
+     * @param snapshot a lambda to create the low level snapshot writer
+     * @param maxBatchSize the maximum size in byte for a batch
+     * @param memoryPool the memory pool for buffer allocation
+     * @param time the clock implementation
+     * @param lastContainedLogTimestamp The append time of the highest record contained in this snapshot
+     * @param compressionType the compression algorithm to use
+     * @param serde the record serialization and deserialization implementation
+     * @return {@link Optional}{@link RecordsSnapshotWriter}
+     */
+    public static <T> Optional<SnapshotWriter<T>> createWithHeader(
+        Supplier<Optional<RawSnapshotWriter>> supplier,
+        int maxBatchSize,
+        MemoryPool memoryPool,
+        Time snapshotTime,
+        long lastContainedLogTimestamp,
+        CompressionType compressionType,
+        RecordSerde<T> serde
+    ) {
+        Optional<SnapshotWriter<T>> writer = supplier.get().map(snapshot -> {
+            return new RecordsSnapshotWriter<T>(
+                    snapshot,
+                    maxBatchSize,
+                    memoryPool,
+                    snapshotTime,
+                    lastContainedLogTimestamp,
+                    CompressionType.NONE,
+                    serde);
+        });
+        writer.ifPresent(SnapshotWriter::initializeSnapshotWithHeader);
+        return writer;

Review comment:
       When you make `initializeSnapshotWithHeader` private, you may need to slightly change this implementation. E.g.:
   
   ```java
           return supplier.get().map(snapshot -> {
               RecordsSnapshotWriter<T> writer = new RecordsSnapshotWriter<>(
                       snapshot,
                       maxBatchSize,
                       memoryPool,
                       snapshotTime,
                       lastContainedLogTimestamp,
                       CompressionType.NONE,
                       serde);
               writer.initializeSnapshotWithHeader();
   
               return writer;
           });
   ```

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.raft.internals.BatchAccumulator;
+import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+import org.apache.kafka.common.record.ControlRecordUtils;
+
+import java.util.Optional;
+import java.util.List;
+import java.util.function.Supplier;
+
+final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> {
+    final private RawSnapshotWriter snapshot;
+    final private BatchAccumulator<T> accumulator;
+    final private Time time;
+    final private long lastContainedLogTimestamp;
+
+    private RecordsSnapshotWriter(
+        RawSnapshotWriter snapshot,
+        int maxBatchSize,
+        MemoryPool memoryPool,
+        Time time,
+        long lastContainedLogTimestamp,
+        CompressionType compressionType,
+        RecordSerde<T> serde
+    ) {
+        this.snapshot = snapshot;
+        this.time = time;
+        this.lastContainedLogTimestamp = lastContainedLogTimestamp;
+
+        this.accumulator = new BatchAccumulator<>(
+            snapshot.snapshotId().epoch,
+            0,
+            Integer.MAX_VALUE,
+            maxBatchSize,
+            memoryPool,
+            time,
+            compressionType,
+            serde
+        );
+    }
+
+    /**
+     * Adds a {@link SnapshotHeaderRecord} to snapshot
+     *
+     * @throws IllegalStateException if the snapshot is not empty
+     */
+    public void initializeSnapshotWithHeader() {
+        if (snapshot.sizeInBytes() != 0) {
+            String message = String.format(
+                "Initializing writer with a non-empty snapshot: id = '%s'.",
+                snapshot.snapshotId()
+            );
+            throw new IllegalStateException(message);
+        }
+
+        SnapshotHeaderRecord headerRecord = new SnapshotHeaderRecord()
+            .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION)
+            .setLastContainedLogTimestamp(lastContainedLogTimestamp);
+        accumulator.appendSnapshotHeaderMessage(headerRecord, time.milliseconds());
+        accumulator.forceDrain();
+    }
+
+    /**
+     * Adds a {@link SnapshotFooterRecord} to the snapshot
+     *
+     * No more records should be appended to the snapshot after calling this method
+     */
+    private void finalizeSnapshotWithFooter() {
+        SnapshotFooterRecord footerRecord = new SnapshotFooterRecord()
+            .setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION);
+        accumulator.appendSnapshotFooterMessage(footerRecord, time.milliseconds());
+        accumulator.forceDrain();
+    }
+
+    /**
+     * Create an instance of this class and initialize
+     * the underlying snapshot with {@link SnapshotHeaderRecord}
+     *
+     * @param snapshot a lambda to create the low level snapshot writer
+     * @param maxBatchSize the maximum size in byte for a batch
+     * @param memoryPool the memory pool for buffer allocation
+     * @param time the clock implementation
+     * @param lastContainedLogTimestamp The append time of the highest record contained in this snapshot
+     * @param compressionType the compression algorithm to use
+     * @param serde the record serialization and deserialization implementation
+     * @return {@link Optional}{@link RecordsSnapshotWriter}
+     */
+    public static <T> Optional<SnapshotWriter<T>> createWithHeader(
+        Supplier<Optional<RawSnapshotWriter>> supplier,
+        int maxBatchSize,
+        MemoryPool memoryPool,
+        Time snapshotTime,
+        long lastContainedLogTimestamp,
+        CompressionType compressionType,
+        RecordSerde<T> serde
+    ) {
+        Optional<SnapshotWriter<T>> writer = supplier.get().map(snapshot -> {
+            return new RecordsSnapshotWriter<T>(
+                    snapshot,
+                    maxBatchSize,
+                    memoryPool,
+                    snapshotTime,
+                    lastContainedLogTimestamp,
+                    CompressionType.NONE,
+                    serde);
+        });
+        writer.ifPresent(SnapshotWriter::initializeSnapshotWithHeader);
+        return writer;
+    }
+
+    /**
+     * Returns the end offset and epoch for the snapshot.
+     */
+    public OffsetAndEpoch snapshotId() {
+        return snapshot.snapshotId();
+    }
+
+    /**
+     * Returns the last log offset which is represented in the snapshot.
+     */
+    public long lastContainedLogOffset() {
+        return snapshot.snapshotId().offset - 1;
+    }
+
+    /**
+     * Returns the epoch of the last log offset which is represented in the snapshot.
+     */
+    public int lastContainedLogEpoch() {

Review comment:
       Add the `@Overrride` annotation to this method.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.raft.internals.RecordsIterator;
+
+public final class RecordsSnapshotReader<T> implements SnapshotReader<T> {
+    private final OffsetAndEpoch snapshotId;
+    private final RecordsIterator<T> iterator;
+
+    private Optional<Batch<T>> nextBatch = Optional.empty();
+    private OptionalLong lastContainedLogTimestamp = OptionalLong.empty();
+
+    private RecordsSnapshotReader(
+        OffsetAndEpoch snapshotId,
+        RecordsIterator<T> iterator
+    ) {
+        this.snapshotId = snapshotId;
+        this.iterator = iterator;
+    }
+
+    /**
+     * Returns the end offset and epoch for the snapshot.
+     */
+    public OffsetAndEpoch snapshotId() {
+        return snapshotId;
+    }
+
+    /**
+     * Returns the last log offset which is represented in the snapshot.
+     */
+    public long lastContainedLogOffset() {
+        return snapshotId.offset - 1;
+    }
+
+    /**
+     * Returns the epoch of the last log offset which is represented in the snapshot.
+     */
+    public int lastContainedLogEpoch() {
+        return snapshotId.epoch;
+    }
+
+    /**
+     * Returns the timestamp of the last log offset which is represented in the snapshot.
+     */
+    public long lastContainedLogTimestamp() {

Review comment:
       Add the `@Overrride` annotation to this method.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.raft.internals.RecordsIterator;
+
+public final class RecordsSnapshotReader<T> implements SnapshotReader<T> {
+    private final OffsetAndEpoch snapshotId;
+    private final RecordsIterator<T> iterator;
+
+    private Optional<Batch<T>> nextBatch = Optional.empty();
+    private OptionalLong lastContainedLogTimestamp = OptionalLong.empty();
+
+    private RecordsSnapshotReader(
+        OffsetAndEpoch snapshotId,
+        RecordsIterator<T> iterator
+    ) {
+        this.snapshotId = snapshotId;
+        this.iterator = iterator;
+    }
+
+    /**
+     * Returns the end offset and epoch for the snapshot.
+     */
+    public OffsetAndEpoch snapshotId() {
+        return snapshotId;
+    }
+
+    /**
+     * Returns the last log offset which is represented in the snapshot.
+     */
+    public long lastContainedLogOffset() {
+        return snapshotId.offset - 1;
+    }
+
+    /**
+     * Returns the epoch of the last log offset which is represented in the snapshot.
+     */
+    public int lastContainedLogEpoch() {

Review comment:
       Add the `@Overrride` annotation to this method.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.raft.internals.RecordsIterator;
+
+public final class RecordsSnapshotReader<T> implements SnapshotReader<T> {
+    private final OffsetAndEpoch snapshotId;
+    private final RecordsIterator<T> iterator;
+
+    private Optional<Batch<T>> nextBatch = Optional.empty();
+    private OptionalLong lastContainedLogTimestamp = OptionalLong.empty();
+
+    private RecordsSnapshotReader(
+        OffsetAndEpoch snapshotId,
+        RecordsIterator<T> iterator
+    ) {
+        this.snapshotId = snapshotId;
+        this.iterator = iterator;
+    }
+
+    /**
+     * Returns the end offset and epoch for the snapshot.
+     */
+    public OffsetAndEpoch snapshotId() {
+        return snapshotId;
+    }
+
+    /**
+     * Returns the last log offset which is represented in the snapshot.
+     */
+    public long lastContainedLogOffset() {

Review comment:
       Add the `@Overrride` annotation to this method.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.raft.internals.RecordsIterator;
+
+public final class RecordsSnapshotReader<T> implements SnapshotReader<T> {
+    private final OffsetAndEpoch snapshotId;
+    private final RecordsIterator<T> iterator;
+
+    private Optional<Batch<T>> nextBatch = Optional.empty();
+    private OptionalLong lastContainedLogTimestamp = OptionalLong.empty();
+
+    private RecordsSnapshotReader(
+        OffsetAndEpoch snapshotId,
+        RecordsIterator<T> iterator
+    ) {
+        this.snapshotId = snapshotId;
+        this.iterator = iterator;
+    }
+
+    /**
+     * Returns the end offset and epoch for the snapshot.
+     */

Review comment:
       Please remove the documentation from all of the methods that override methods already documented in `SnapshotReader`.
   

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.raft.internals.RecordsIterator;
+
+public final class RecordsSnapshotReader<T> implements SnapshotReader<T> {
+    private final OffsetAndEpoch snapshotId;
+    private final RecordsIterator<T> iterator;
+
+    private Optional<Batch<T>> nextBatch = Optional.empty();
+    private OptionalLong lastContainedLogTimestamp = OptionalLong.empty();
+
+    private RecordsSnapshotReader(
+        OffsetAndEpoch snapshotId,
+        RecordsIterator<T> iterator
+    ) {
+        this.snapshotId = snapshotId;
+        this.iterator = iterator;
+    }
+
+    /**
+     * Returns the end offset and epoch for the snapshot.
+     */
+    public OffsetAndEpoch snapshotId() {
+        return snapshotId;
+    }
+
+    /**
+     * Returns the last log offset which is represented in the snapshot.
+     */
+    public long lastContainedLogOffset() {
+        return snapshotId.offset - 1;
+    }
+
+    /**
+     * Returns the epoch of the last log offset which is represented in the snapshot.
+     */
+    public int lastContainedLogEpoch() {
+        return snapshotId.epoch;
+    }
+
+    /**
+     * Returns the timestamp of the last log offset which is represented in the snapshot.
+     */
+    public long lastContainedLogTimestamp() {
+        if (!lastContainedLogTimestamp.isPresent()) {
+            nextBatch.ifPresent(batch -> {
+                throw new IllegalStateException(
+                    String.format(
+                        "nextBatch was present when last contained log timestamp was not present",
+                        batch
+                    )
+                );
+            });
+            nextBatch = nextBatch();
+        }
+
+        return lastContainedLogTimestamp.getAsLong();
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (!nextBatch.isPresent()) {
+            nextBatch = nextBatch();
+        }
+
+        return nextBatch.isPresent();
+    }
+
+    @Override
+    public Batch<T> next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException("Snapshot reader doesn't have any more elements");
+        }
+
+        Batch<T> batch = nextBatch.get();
+        nextBatch = Optional.empty();
+
+        return batch;
+    }
+
+    /**
+     * Closes the snapshot reader.
+     */
+    public void close() {

Review comment:
       Add the `@Overrride` annotation to this method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio merged pull request #11529: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
jsancio merged pull request #11529:
URL: https://github.com/apache/kafka/pull/11529


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] socutes edited a comment on pull request #11529: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
socutes edited a comment on pull request #11529:
URL: https://github.com/apache/kafka/pull/11529#issuecomment-982225008


   > Looks great @socutes . Let's resolve the conflict against `trunk` and I think we should be ready to merge after that.
   
   PTAL. Thank you very much for your review. I think I took care of it on my end.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] socutes commented on a change in pull request #11529: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
socutes commented on a change in pull request #11529:
URL: https://github.com/apache/kafka/pull/11529#discussion_r757224604



##########
File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
##########
@@ -123,8 +123,14 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
     val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "rob")
     val user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "batman")
+    val user4 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "segment")

Review comment:
       Yes, you're right! 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] socutes commented on pull request #11529: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
socutes commented on pull request #11529:
URL: https://github.com/apache/kafka/pull/11529#issuecomment-981312323


   @jsancio PTAL. The recommended changes have been updated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on a change in pull request #11529: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11529:
URL: https://github.com/apache/kafka/pull/11529#discussion_r756184332



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileSnapshotWriter.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+
+import java.util.List;

Review comment:
       Please fix the import formatting. Need a space between the `package` line and the `import` lines. Need a space betwee the `import` lines and the `interface` line.
   
   Need to also sort the import lines.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on a change in pull request #11529: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11529:
URL: https://github.com/apache/kafka/pull/11529#discussion_r757663204



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
##########
@@ -14,17 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.snapshot;
 
+import org.apache.kafka.raft.Batch;
 import org.apache.kafka.raft.OffsetAndEpoch;
+
 import java.util.Iterator;
-import org.apache.kafka.raft.Batch;
 
 /**
- * Interface of the snapshot reader
+ * A type for reading an immutable snapshot.
+ *
+ * A snapshot reader can be used to scan through all of the objects T in a snapshot. It
+ * is assumed that the content of the snapshot represents all of the objects T for the topic
+ * partition from offset 0 up to but not including the end offset in the snapshot id.
+ *
+ * The offsets ({@code baseOffset()} and {@code lastOffset()} stored in {@code Batch<T>}
+ * objects returned by this iterator are independent of the offset of the records in the
+ * log used to generate this batch.
+ *
+ * Use {@code lastContainedLogOffset()} and {@code lastContainedLogEpoch()} to query which
+ * offsets and epoch from the log are included in this snapshot. Both of these values are
+ * inclusive.
  */
-public interface FileSnapshotReader<T> extends AutoCloseable, Iterator<Batch<T>> {
+public interface RecordsSnapshotReader<T> extends AutoCloseable, Iterator<Batch<T>> {

Review comment:
       This interface should be named `public interface SnapshotReader<T> ...`.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
##########
@@ -37,14 +52,12 @@
 
     /**
      * Returns true if the snapshot has been frozen, otherwise false is returned.
-     * <p>

Review comment:
       Please a newline between the two Java Doc section. This comment applies to two other places in this file. E.g.
   ```
       /**
        * Returns true if the snapshot has been frozen, otherwise false is returned.
        *
        * Modification to the snapshot are not allowed once it is frozen.
        */
   ```

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -366,15 +367,15 @@ void createSnapshotGenerator(long committedOffset, int committedEpoch, long comm
                     )
                 );
             }
-            Optional<SnapshotWriter<ApiMessageAndVersion>> writer = raftClient.createSnapshot(
+            Optional<RecordsSnapshotWriter<ApiMessageAndVersion>> writer = raftClient.createSnapshot(
                 committedOffset,
                 committedEpoch,
                 committedTimestamp
             );
             if (writer.isPresent()) {
                 generator = new SnapshotGenerator(
                     logContext,
-                    writer.get(),
+                        (SnapshotWriter<ApiMessageAndVersion>) writer.get(),

Review comment:
       Please remove this cast. With the changes I suggested in my other comments the type of `writer` will be `Optional<SnapshotWriter>`. `SnapshotGenerator` should expect the type `SnapshotWriter<ApiMessageAndVersion>` for the second argument to the constructor.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
##########
@@ -15,11 +15,26 @@
  * limitations under the License.
  */
 package org.apache.kafka.snapshot;
+
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.common.message.SnapshotFooterRecord;
 
 import java.util.List;
-public interface FileSnapshotWriter<T> extends AutoCloseable {
+
+/**
+ * A type for writing a snapshot for a given end offset and epoch.
+ *
+ * A snapshot writer can be used to append objects until freeze is called. When freeze is
+ * called the snapshot is validated and marked as immutable. After freeze is called any
+ * append will fail with an exception.
+ *
+ * It is assumed that the content of the snapshot represents all of the records for the
+ * topic partition from offset 0 up to but not including the end offset in the snapshot
+ * id.
+ *
+ * @see org.apache.kafka.raft.KafkaRaftClient#createSnapshot(long, int, long)
+ */
+public interface RecordsSnapshotWriter<T> extends AutoCloseable {

Review comment:
       This interface should be name `public interface SnapshotWriter<T> ...`.

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -49,23 +49,23 @@
 
         /**
          * Callback which is invoked when the Listener needs to load a snapshot.
-         * It is the responsibility of this implementation to invoke {@link FileSnapshotReader#close()}
+         * It is the responsibility of this implementation to invoke {@link RecordsSnapshotReader#close()}
          * after consuming the reader.
          *
          * When handling this call, the implementation must assume that all previous calls
          * to {@link #handleCommit} contain invalid data.
          *
          * @param reader snapshot reader instance which must be iterated and closed
          */
-        void handleSnapshot(FileSnapshotReader<T> reader);
+        void handleSnapshot(RecordsSnapshotReader<T> reader);

Review comment:
       The type of the `reader` argument should be `SnapshotReader<T>`.

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -219,5 +219,5 @@ default void beginShutdown() {}
      * @throws IllegalArgumentException if the committed offset is greater than the high-watermark
      *         or less than the log start offset.
      */
-    Optional<FileSnapshotWriter<T>> createSnapshot(long committedOffset, int committedEpoch, long lastContainedLogTime);
+    Optional<RecordsSnapshotWriter<T>> createSnapshot(long committedOffset, int committedEpoch, long lastContainedLogTime);

Review comment:
       The returned type should be `Optional<SnapshotWriter<T>>`.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
##########
@@ -27,22 +27,7 @@
 import org.apache.kafka.server.common.serialization.RecordSerde;
 import org.apache.kafka.raft.internals.RecordsIterator;
 
-/**
- * A type for reading an immutable snapshot.
- *
- * A snapshot reader can be used to scan through all of the objects T in a snapshot. It
- * is assumed that the content of the snapshot represents all of the objects T for the topic
- * partition from offset 0 up to but not including the end offset in the snapshot id.
- *
- * The offsets ({@code baseOffset()} and {@code lastOffset()} stored in {@code Batch<T>}
- * objects returned by this iterator are independent of the offset of the records in the
- * log used to generate this batch.
- *
- * Use {@code lastContainedLogOffset()} and {@code lastContainedLogEpoch()} to query which
- * offsets and epoch from the log are included in this snapshot. Both of these values are
- * inclusive.
- */
-public final class SnapshotReader<T> implements FileSnapshotReader<T> {
+public final class SnapshotReader<T> implements RecordsSnapshotReader<T> {

Review comment:
       Please change the name of this class to `RecordsSnapshotReader<T>`.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##########
@@ -32,20 +32,7 @@
 import java.util.List;
 import java.util.function.Supplier;
 
-/**
- * A type for writing a snapshot for a given end offset and epoch.
- *
- * A snapshot writer can be used to append objects until freeze is called. When freeze is
- * called the snapshot is validated and marked as immutable. After freeze is called any
- * append will fail with an exception.
- *
- * It is assumed that the content of the snapshot represents all of the records for the
- * topic partition from offset 0 up to but not including the end offset in the snapshot
- * id.
- *
- * @see org.apache.kafka.raft.KafkaRaftClient#createSnapshot(long, int, long)
- */
-final public class SnapshotWriter<T> implements FileSnapshotWriter<T> {
+final public class SnapshotWriter<T> implements RecordsSnapshotWriter<T> {

Review comment:
       Please change the name of this class to `RecordsSnapshotWriter<T>`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] socutes commented on pull request #11529: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
socutes commented on pull request #11529:
URL: https://github.com/apache/kafka/pull/11529#issuecomment-982225008


   Thank you very much for your review. I think I took care of it on my end.
   
   > Looks great @socutes . Let's resolve the conflict against `trunk` and I think we should be ready to merge after that.
   
   PTAL. Thank you very much for your review. I think I took care of it on my end.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] socutes commented on pull request #11529: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
socutes commented on pull request #11529:
URL: https://github.com/apache/kafka/pull/11529#issuecomment-976288375


   @jsancio @cmccabe  ,PTAL, thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] socutes commented on a change in pull request #11529: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
socutes commented on a change in pull request #11529:
URL: https://github.com/apache/kafka/pull/11529#discussion_r757882490



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
##########
@@ -37,14 +52,12 @@
 
     /**
      * Returns true if the snapshot has been frozen, otherwise false is returned.
-     * <p>

Review comment:
       Thank you. I often ignore this formatting problem




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] socutes commented on pull request #11529: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
socutes commented on pull request #11529:
URL: https://github.com/apache/kafka/pull/11529#issuecomment-977674849


   @dajac @showuon  PTAL, thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] socutes commented on pull request #11529: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
socutes commented on pull request #11529:
URL: https://github.com/apache/kafka/pull/11529#issuecomment-979758153


   @jsancio  Thank you very much for your review. The modification has been made according to your suggestion, please help to review it again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] socutes commented on pull request #11529: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
socutes commented on pull request #11529:
URL: https://github.com/apache/kafka/pull/11529#issuecomment-981071032


   @jsancio PTAL.  I tried to modify it as I understood it, because changing SnapshotReader from a class to an interface required a lot of code to add or remove.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] socutes commented on pull request #11529: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
socutes commented on pull request #11529:
URL: https://github.com/apache/kafka/pull/11529#issuecomment-981062241


   > Excuse the confusion as to what was expected in this change. Hope it is clear now.
   
   I'm really sorry to misunderstand what you mean. I think I know how to do that, okay!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on pull request #11529: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
jsancio commented on pull request #11529:
URL: https://github.com/apache/kafka/pull/11529#issuecomment-982159193


   Looks great @socutes . Let's resolve the conflict against `trunk` and I think we should be ready to merge after that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org