You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/28 12:33:01 UTC

[GitHub] [kafka] socutes opened a new pull request #11138: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

socutes opened a new pull request #11138:
URL: https://github.com/apache/kafka/pull/11138


   Change the snapshot API so that SnapshotWriter and SnapshotReader are interfaces. Change the existing types SnapshotWriter and SnapshotReader to use a different name and to implement the interfaces introduced by this issue.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] socutes edited a comment on pull request #11138: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
socutes edited a comment on pull request #11138:
URL: https://github.com/apache/kafka/pull/11138#issuecomment-898870304


   Thank you for your advice. At the code level, the final result should be the same. It's just that changing the number of lines of code might make it less, and I don't know if I have to.  @dengziming 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dengziming commented on pull request #11138: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
dengziming commented on pull request #11138:
URL: https://github.com/apache/kafka/pull/11138#issuecomment-897275322


   Currently we add a `FileSnapshotReader` class and copy some original code to it, a better way is to rename `SnapshotReader` to `FileSnapshotReader` and add a `SnapshotReader` Interface which can reduce code line changes tremendously.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] socutes commented on a change in pull request #11138: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
socutes commented on a change in pull request #11138:
URL: https://github.com/apache/kafka/pull/11138#discussion_r686544266



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##########
@@ -17,218 +17,67 @@
 
 package org.apache.kafka.snapshot;
 
-import org.apache.kafka.common.memory.MemoryPool;
-import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.raft.OffsetAndEpoch;
-import org.apache.kafka.server.common.serialization.RecordSerde;
-import org.apache.kafka.raft.internals.BatchAccumulator;
-import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
 import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.common.message.SnapshotFooterRecord;
-import org.apache.kafka.common.record.ControlRecordUtils;
 
-import java.util.Optional;
 import java.util.List;
-import java.util.function.Supplier;
 
 /**
- * A type for writing a snapshot for a given end offset and epoch.
- *
- * A snapshot writer can be used to append objects until freeze is called. When freeze is
- * called the snapshot is validated and marked as immutable. After freeze is called any
- * append will fail with an exception.
- *
- * It is assumed that the content of the snapshot represents all of the records for the
- * topic partition from offset 0 up to but not including the end offset in the snapshot
- * id.
- *
- * @see org.apache.kafka.raft.KafkaRaftClient#createSnapshot(long, int, long)
+ * Interface of the snapshot writer
  */
-final public class SnapshotWriter<T> implements AutoCloseable {
-    final private RawSnapshotWriter snapshot;
-    final private BatchAccumulator<T> accumulator;
-    final private Time time;
-    final private long lastContainedLogTimestamp;
-
-    private SnapshotWriter(
-        RawSnapshotWriter snapshot,
-        int maxBatchSize,
-        MemoryPool memoryPool,
-        Time time,
-        long lastContainedLogTimestamp,
-        CompressionType compressionType,
-        RecordSerde<T> serde
-    ) {
-        this.snapshot = snapshot;
-        this.time = time;
-        this.lastContainedLogTimestamp = lastContainedLogTimestamp;
-
-        this.accumulator = new BatchAccumulator<>(
-            snapshot.snapshotId().epoch,
-            0,
-            Integer.MAX_VALUE,
-            maxBatchSize,
-            memoryPool,
-            time,
-            compressionType,
-            serde
-        );
-    }
-
-    /**
-     * Adds a {@link SnapshotHeaderRecord} to snapshot
-     *
-     * @throws IllegalStateException if the snapshot is not empty
-     */
-    private void initializeSnapshotWithHeader() {
-        if (snapshot.sizeInBytes() != 0) {
-            String message = String.format(
-                "Initializing writer with a non-empty snapshot: id = '%s'.",
-                snapshot.snapshotId()
-            );
-            throw new IllegalStateException(message);
-        }
-
-        SnapshotHeaderRecord headerRecord = new SnapshotHeaderRecord()
-            .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION)
-            .setLastContainedLogTimestamp(lastContainedLogTimestamp);
-        accumulator.appendSnapshotHeaderMessage(headerRecord, time.milliseconds());
-        accumulator.forceDrain();
-    }
-
-    /**
-     * Adds a {@link SnapshotFooterRecord} to the snapshot
-     *
-     * No more records should be appended to the snapshot after calling this method
-     */
-    private void finalizeSnapshotWithFooter() {
-        SnapshotFooterRecord footerRecord = new SnapshotFooterRecord()
-            .setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION);
-        accumulator.appendSnapshotFooterMessage(footerRecord, time.milliseconds());
-        accumulator.forceDrain();
-    }
-
-    /**
-     * Create an instance of this class and initialize
-     * the underlying snapshot with {@link SnapshotHeaderRecord}
-     *
-     * @param snapshot a lambda to create the low level snapshot writer
-     * @param maxBatchSize the maximum size in byte for a batch
-     * @param memoryPool the memory pool for buffer allocation
-     * @param time the clock implementation
-     * @param lastContainedLogTimestamp The append time of the highest record contained in this snapshot
-     * @param compressionType the compression algorithm to use
-     * @param serde the record serialization and deserialization implementation
-     * @return {@link Optional}{@link SnapshotWriter}
-     */
-    public static <T> Optional<SnapshotWriter<T>> createWithHeader(
-        Supplier<Optional<RawSnapshotWriter>> supplier,
-        int maxBatchSize,
-        MemoryPool memoryPool,
-        Time snapshotTime,
-        long lastContainedLogTimestamp,
-        CompressionType compressionType,
-        RecordSerde<T> serde
-    ) {
-        Optional<SnapshotWriter<T>> writer = supplier.get().map(snapshot -> {
-            return new SnapshotWriter<T>(
-                    snapshot,
-                    maxBatchSize,
-                    memoryPool,
-                    snapshotTime,
-                    lastContainedLogTimestamp,
-                    CompressionType.NONE,
-                    serde);
-        });
-        writer.ifPresent(SnapshotWriter::initializeSnapshotWithHeader);
-        return writer;
-    }
+public interface SnapshotWriter<T> extends AutoCloseable {
 
     /**
      * Returns the end offset and epoch for the snapshot.
      */
-    public OffsetAndEpoch snapshotId() {
-        return snapshot.snapshotId();
-    }
+    OffsetAndEpoch snapshotId();
 
     /**
      * Returns the last log offset which is represented in the snapshot.
      */
-    public long lastContainedLogOffset() {
-        return snapshot.snapshotId().offset - 1;
-    }
+    long lastContainedLogOffset();
 
     /**
      * Returns the epoch of the last log offset which is represented in the snapshot.
      */
-    public int lastContainedLogEpoch() {
-        return snapshot.snapshotId().epoch;
-    }
+    int lastContainedLogEpoch();
 
     /**
      * Returns true if the snapshot has been frozen, otherwise false is returned.
-     *
+     * <p>
      * Modification to the snapshot are not allowed once it is frozen.
      */
-    public boolean isFrozen() {
-        return snapshot.isFrozen();
-    }
+    boolean isFrozen();
 
     /**
      * Appends a list of values to the snapshot.
-     *
+     * <p>
      * The list of record passed are guaranteed to get written together.
      *
      * @param records the list of records to append to the snapshot
      * @throws IllegalStateException if append is called when isFrozen is true
      */
-    public void append(List<T> records) {
-        if (snapshot.isFrozen()) {
-            String message = String.format(
-                "Append not supported. Snapshot is already frozen: id = '%s'.",
-                snapshot.snapshotId()
-            );
-
-            throw new IllegalStateException(message);
-        }
-
-        accumulator.append(snapshot.snapshotId().epoch, records);
-
-        if (accumulator.needsDrain(time.milliseconds())) {
-            appendBatches(accumulator.drain());
-        }
-    }
+    void append(List<T> records);
 
     /**
      * Freezes the snapshot by flushing all pending writes and marking it as immutable.
-     *
+     * <p>
      * Also adds a {@link SnapshotFooterRecord} to the end of the snapshot
      */
-    public void freeze() {
-        finalizeSnapshotWithFooter();
-        appendBatches(accumulator.drain());
-        snapshot.freeze();
-        accumulator.close();
-    }
+    void freeze();
 
     /**
      * Closes the snapshot writer.
      *
      * If close is called without first calling freeze the snapshot is aborted.
      */
-    public void close() {
-        snapshot.close();
-        accumulator.close();
-    }
+    void close();
 
-    private void appendBatches(List<CompletedBatch<T>> batches) {
-        try {
-            for (CompletedBatch<T> batch : batches) {
-                snapshot.append(batch.data);
-            }
-        } finally {
-            batches.forEach(CompletedBatch::release);
-        }
-    }
+    /**
+     * Adds a {@link SnapshotHeaderRecord} to snapshot
+     *
+     * @throws IllegalStateException if the snapshot is not empty
+     */
+    void initializeSnapshotWithHeader();

Review comment:
       Thanks for the warning. It really shouldn't be done. fixed!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] socutes commented on a change in pull request #11138: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
socutes commented on a change in pull request #11138:
URL: https://github.com/apache/kafka/pull/11138#discussion_r686544266



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##########
@@ -17,218 +17,67 @@
 
 package org.apache.kafka.snapshot;
 
-import org.apache.kafka.common.memory.MemoryPool;
-import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.raft.OffsetAndEpoch;
-import org.apache.kafka.server.common.serialization.RecordSerde;
-import org.apache.kafka.raft.internals.BatchAccumulator;
-import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
 import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.common.message.SnapshotFooterRecord;
-import org.apache.kafka.common.record.ControlRecordUtils;
 
-import java.util.Optional;
 import java.util.List;
-import java.util.function.Supplier;
 
 /**
- * A type for writing a snapshot for a given end offset and epoch.
- *
- * A snapshot writer can be used to append objects until freeze is called. When freeze is
- * called the snapshot is validated and marked as immutable. After freeze is called any
- * append will fail with an exception.
- *
- * It is assumed that the content of the snapshot represents all of the records for the
- * topic partition from offset 0 up to but not including the end offset in the snapshot
- * id.
- *
- * @see org.apache.kafka.raft.KafkaRaftClient#createSnapshot(long, int, long)
+ * Interface of the snapshot writer
  */
-final public class SnapshotWriter<T> implements AutoCloseable {
-    final private RawSnapshotWriter snapshot;
-    final private BatchAccumulator<T> accumulator;
-    final private Time time;
-    final private long lastContainedLogTimestamp;
-
-    private SnapshotWriter(
-        RawSnapshotWriter snapshot,
-        int maxBatchSize,
-        MemoryPool memoryPool,
-        Time time,
-        long lastContainedLogTimestamp,
-        CompressionType compressionType,
-        RecordSerde<T> serde
-    ) {
-        this.snapshot = snapshot;
-        this.time = time;
-        this.lastContainedLogTimestamp = lastContainedLogTimestamp;
-
-        this.accumulator = new BatchAccumulator<>(
-            snapshot.snapshotId().epoch,
-            0,
-            Integer.MAX_VALUE,
-            maxBatchSize,
-            memoryPool,
-            time,
-            compressionType,
-            serde
-        );
-    }
-
-    /**
-     * Adds a {@link SnapshotHeaderRecord} to snapshot
-     *
-     * @throws IllegalStateException if the snapshot is not empty
-     */
-    private void initializeSnapshotWithHeader() {
-        if (snapshot.sizeInBytes() != 0) {
-            String message = String.format(
-                "Initializing writer with a non-empty snapshot: id = '%s'.",
-                snapshot.snapshotId()
-            );
-            throw new IllegalStateException(message);
-        }
-
-        SnapshotHeaderRecord headerRecord = new SnapshotHeaderRecord()
-            .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION)
-            .setLastContainedLogTimestamp(lastContainedLogTimestamp);
-        accumulator.appendSnapshotHeaderMessage(headerRecord, time.milliseconds());
-        accumulator.forceDrain();
-    }
-
-    /**
-     * Adds a {@link SnapshotFooterRecord} to the snapshot
-     *
-     * No more records should be appended to the snapshot after calling this method
-     */
-    private void finalizeSnapshotWithFooter() {
-        SnapshotFooterRecord footerRecord = new SnapshotFooterRecord()
-            .setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION);
-        accumulator.appendSnapshotFooterMessage(footerRecord, time.milliseconds());
-        accumulator.forceDrain();
-    }
-
-    /**
-     * Create an instance of this class and initialize
-     * the underlying snapshot with {@link SnapshotHeaderRecord}
-     *
-     * @param snapshot a lambda to create the low level snapshot writer
-     * @param maxBatchSize the maximum size in byte for a batch
-     * @param memoryPool the memory pool for buffer allocation
-     * @param time the clock implementation
-     * @param lastContainedLogTimestamp The append time of the highest record contained in this snapshot
-     * @param compressionType the compression algorithm to use
-     * @param serde the record serialization and deserialization implementation
-     * @return {@link Optional}{@link SnapshotWriter}
-     */
-    public static <T> Optional<SnapshotWriter<T>> createWithHeader(
-        Supplier<Optional<RawSnapshotWriter>> supplier,
-        int maxBatchSize,
-        MemoryPool memoryPool,
-        Time snapshotTime,
-        long lastContainedLogTimestamp,
-        CompressionType compressionType,
-        RecordSerde<T> serde
-    ) {
-        Optional<SnapshotWriter<T>> writer = supplier.get().map(snapshot -> {
-            return new SnapshotWriter<T>(
-                    snapshot,
-                    maxBatchSize,
-                    memoryPool,
-                    snapshotTime,
-                    lastContainedLogTimestamp,
-                    CompressionType.NONE,
-                    serde);
-        });
-        writer.ifPresent(SnapshotWriter::initializeSnapshotWithHeader);
-        return writer;
-    }
+public interface SnapshotWriter<T> extends AutoCloseable {
 
     /**
      * Returns the end offset and epoch for the snapshot.
      */
-    public OffsetAndEpoch snapshotId() {
-        return snapshot.snapshotId();
-    }
+    OffsetAndEpoch snapshotId();
 
     /**
      * Returns the last log offset which is represented in the snapshot.
      */
-    public long lastContainedLogOffset() {
-        return snapshot.snapshotId().offset - 1;
-    }
+    long lastContainedLogOffset();
 
     /**
      * Returns the epoch of the last log offset which is represented in the snapshot.
      */
-    public int lastContainedLogEpoch() {
-        return snapshot.snapshotId().epoch;
-    }
+    int lastContainedLogEpoch();
 
     /**
      * Returns true if the snapshot has been frozen, otherwise false is returned.
-     *
+     * <p>
      * Modification to the snapshot are not allowed once it is frozen.
      */
-    public boolean isFrozen() {
-        return snapshot.isFrozen();
-    }
+    boolean isFrozen();
 
     /**
      * Appends a list of values to the snapshot.
-     *
+     * <p>
      * The list of record passed are guaranteed to get written together.
      *
      * @param records the list of records to append to the snapshot
      * @throws IllegalStateException if append is called when isFrozen is true
      */
-    public void append(List<T> records) {
-        if (snapshot.isFrozen()) {
-            String message = String.format(
-                "Append not supported. Snapshot is already frozen: id = '%s'.",
-                snapshot.snapshotId()
-            );
-
-            throw new IllegalStateException(message);
-        }
-
-        accumulator.append(snapshot.snapshotId().epoch, records);
-
-        if (accumulator.needsDrain(time.milliseconds())) {
-            appendBatches(accumulator.drain());
-        }
-    }
+    void append(List<T> records);
 
     /**
      * Freezes the snapshot by flushing all pending writes and marking it as immutable.
-     *
+     * <p>
      * Also adds a {@link SnapshotFooterRecord} to the end of the snapshot
      */
-    public void freeze() {
-        finalizeSnapshotWithFooter();
-        appendBatches(accumulator.drain());
-        snapshot.freeze();
-        accumulator.close();
-    }
+    void freeze();
 
     /**
      * Closes the snapshot writer.
      *
      * If close is called without first calling freeze the snapshot is aborted.
      */
-    public void close() {
-        snapshot.close();
-        accumulator.close();
-    }
+    void close();
 
-    private void appendBatches(List<CompletedBatch<T>> batches) {
-        try {
-            for (CompletedBatch<T> batch : batches) {
-                snapshot.append(batch.data);
-            }
-        } finally {
-            batches.forEach(CompletedBatch::release);
-        }
-    }
+    /**
+     * Adds a {@link SnapshotHeaderRecord} to snapshot
+     *
+     * @throws IllegalStateException if the snapshot is not empty
+     */
+    void initializeSnapshotWithHeader();

Review comment:
       Thanks for the warning. It really shouldn't be done. fixed! @showuon 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] socutes edited a comment on pull request #11138: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
socutes edited a comment on pull request #11138:
URL: https://github.com/apache/kafka/pull/11138#issuecomment-898870304


   Thank you for your advice. I know what you mean. At the code level, the final result should be the same. It's just that changing the number of lines of code might make it less, and I don't know if I have to.  @dengziming 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #11138: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11138:
URL: https://github.com/apache/kafka/pull/11138#discussion_r685850971



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##########
@@ -17,218 +17,67 @@
 
 package org.apache.kafka.snapshot;
 
-import org.apache.kafka.common.memory.MemoryPool;
-import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.raft.OffsetAndEpoch;
-import org.apache.kafka.server.common.serialization.RecordSerde;
-import org.apache.kafka.raft.internals.BatchAccumulator;
-import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
 import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.common.message.SnapshotFooterRecord;
-import org.apache.kafka.common.record.ControlRecordUtils;
 
-import java.util.Optional;
 import java.util.List;
-import java.util.function.Supplier;
 
 /**
- * A type for writing a snapshot for a given end offset and epoch.
- *
- * A snapshot writer can be used to append objects until freeze is called. When freeze is
- * called the snapshot is validated and marked as immutable. After freeze is called any
- * append will fail with an exception.
- *
- * It is assumed that the content of the snapshot represents all of the records for the
- * topic partition from offset 0 up to but not including the end offset in the snapshot
- * id.
- *
- * @see org.apache.kafka.raft.KafkaRaftClient#createSnapshot(long, int, long)
+ * Interface of the snapshot writer
  */
-final public class SnapshotWriter<T> implements AutoCloseable {
-    final private RawSnapshotWriter snapshot;
-    final private BatchAccumulator<T> accumulator;
-    final private Time time;
-    final private long lastContainedLogTimestamp;
-
-    private SnapshotWriter(
-        RawSnapshotWriter snapshot,
-        int maxBatchSize,
-        MemoryPool memoryPool,
-        Time time,
-        long lastContainedLogTimestamp,
-        CompressionType compressionType,
-        RecordSerde<T> serde
-    ) {
-        this.snapshot = snapshot;
-        this.time = time;
-        this.lastContainedLogTimestamp = lastContainedLogTimestamp;
-
-        this.accumulator = new BatchAccumulator<>(
-            snapshot.snapshotId().epoch,
-            0,
-            Integer.MAX_VALUE,
-            maxBatchSize,
-            memoryPool,
-            time,
-            compressionType,
-            serde
-        );
-    }
-
-    /**
-     * Adds a {@link SnapshotHeaderRecord} to snapshot
-     *
-     * @throws IllegalStateException if the snapshot is not empty
-     */
-    private void initializeSnapshotWithHeader() {
-        if (snapshot.sizeInBytes() != 0) {
-            String message = String.format(
-                "Initializing writer with a non-empty snapshot: id = '%s'.",
-                snapshot.snapshotId()
-            );
-            throw new IllegalStateException(message);
-        }
-
-        SnapshotHeaderRecord headerRecord = new SnapshotHeaderRecord()
-            .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION)
-            .setLastContainedLogTimestamp(lastContainedLogTimestamp);
-        accumulator.appendSnapshotHeaderMessage(headerRecord, time.milliseconds());
-        accumulator.forceDrain();
-    }
-
-    /**
-     * Adds a {@link SnapshotFooterRecord} to the snapshot
-     *
-     * No more records should be appended to the snapshot after calling this method
-     */
-    private void finalizeSnapshotWithFooter() {
-        SnapshotFooterRecord footerRecord = new SnapshotFooterRecord()
-            .setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION);
-        accumulator.appendSnapshotFooterMessage(footerRecord, time.milliseconds());
-        accumulator.forceDrain();
-    }
-
-    /**
-     * Create an instance of this class and initialize
-     * the underlying snapshot with {@link SnapshotHeaderRecord}
-     *
-     * @param snapshot a lambda to create the low level snapshot writer
-     * @param maxBatchSize the maximum size in byte for a batch
-     * @param memoryPool the memory pool for buffer allocation
-     * @param time the clock implementation
-     * @param lastContainedLogTimestamp The append time of the highest record contained in this snapshot
-     * @param compressionType the compression algorithm to use
-     * @param serde the record serialization and deserialization implementation
-     * @return {@link Optional}{@link SnapshotWriter}
-     */
-    public static <T> Optional<SnapshotWriter<T>> createWithHeader(
-        Supplier<Optional<RawSnapshotWriter>> supplier,
-        int maxBatchSize,
-        MemoryPool memoryPool,
-        Time snapshotTime,
-        long lastContainedLogTimestamp,
-        CompressionType compressionType,
-        RecordSerde<T> serde
-    ) {
-        Optional<SnapshotWriter<T>> writer = supplier.get().map(snapshot -> {
-            return new SnapshotWriter<T>(
-                    snapshot,
-                    maxBatchSize,
-                    memoryPool,
-                    snapshotTime,
-                    lastContainedLogTimestamp,
-                    CompressionType.NONE,
-                    serde);
-        });
-        writer.ifPresent(SnapshotWriter::initializeSnapshotWithHeader);
-        return writer;
-    }
+public interface SnapshotWriter<T> extends AutoCloseable {
 
     /**
      * Returns the end offset and epoch for the snapshot.
      */
-    public OffsetAndEpoch snapshotId() {
-        return snapshot.snapshotId();
-    }
+    OffsetAndEpoch snapshotId();
 
     /**
      * Returns the last log offset which is represented in the snapshot.
      */
-    public long lastContainedLogOffset() {
-        return snapshot.snapshotId().offset - 1;
-    }
+    long lastContainedLogOffset();
 
     /**
      * Returns the epoch of the last log offset which is represented in the snapshot.
      */
-    public int lastContainedLogEpoch() {
-        return snapshot.snapshotId().epoch;
-    }
+    int lastContainedLogEpoch();
 
     /**
      * Returns true if the snapshot has been frozen, otherwise false is returned.
-     *
+     * <p>
      * Modification to the snapshot are not allowed once it is frozen.
      */
-    public boolean isFrozen() {
-        return snapshot.isFrozen();
-    }
+    boolean isFrozen();
 
     /**
      * Appends a list of values to the snapshot.
-     *
+     * <p>
      * The list of record passed are guaranteed to get written together.
      *
      * @param records the list of records to append to the snapshot
      * @throws IllegalStateException if append is called when isFrozen is true
      */
-    public void append(List<T> records) {
-        if (snapshot.isFrozen()) {
-            String message = String.format(
-                "Append not supported. Snapshot is already frozen: id = '%s'.",
-                snapshot.snapshotId()
-            );
-
-            throw new IllegalStateException(message);
-        }
-
-        accumulator.append(snapshot.snapshotId().epoch, records);
-
-        if (accumulator.needsDrain(time.milliseconds())) {
-            appendBatches(accumulator.drain());
-        }
-    }
+    void append(List<T> records);
 
     /**
      * Freezes the snapshot by flushing all pending writes and marking it as immutable.
-     *
+     * <p>
      * Also adds a {@link SnapshotFooterRecord} to the end of the snapshot
      */
-    public void freeze() {
-        finalizeSnapshotWithFooter();
-        appendBatches(accumulator.drain());
-        snapshot.freeze();
-        accumulator.close();
-    }
+    void freeze();
 
     /**
      * Closes the snapshot writer.
      *
      * If close is called without first calling freeze the snapshot is aborted.
      */
-    public void close() {
-        snapshot.close();
-        accumulator.close();
-    }
+    void close();
 
-    private void appendBatches(List<CompletedBatch<T>> batches) {
-        try {
-            for (CompletedBatch<T> batch : batches) {
-                snapshot.append(batch.data);
-            }
-        } finally {
-            batches.forEach(CompletedBatch::release);
-        }
-    }
+    /**
+     * Adds a {@link SnapshotHeaderRecord} to snapshot
+     *
+     * @throws IllegalStateException if the snapshot is not empty
+     */
+    void initializeSnapshotWithHeader();

Review comment:
       This method was originally `private` method. Why should we expose it as an interface method?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] socutes closed pull request #11138: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
socutes closed pull request #11138:
URL: https://github.com/apache/kafka/pull/11138


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] socutes commented on pull request #11138: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
socutes commented on pull request #11138:
URL: https://github.com/apache/kafka/pull/11138#issuecomment-888270734


   @showuon @jsancio please help review this pr. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] socutes commented on pull request #11138: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
socutes commented on pull request #11138:
URL: https://github.com/apache/kafka/pull/11138#issuecomment-895843976


   @dengziming 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] socutes commented on pull request #11138: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

Posted by GitBox <gi...@apache.org>.
socutes commented on pull request #11138:
URL: https://github.com/apache/kafka/pull/11138#issuecomment-898870304


   Thank you for your advice. At the code level, the final result should be the same. It's just that changing the number of lines of code might make it less, and I don't know if I have to.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org