Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/27 17:12:57 UTC

[GitHub] [kafka] jsancio opened a new pull request #9512: [DRAFT] - KAFKA-10394: generate snapshot

jsancio opened a new pull request #9512:
URL: https://github.com/apache/kafka/pull/9512


   This PR depends on https://github.com/apache/kafka/pull/9505. I will rebase this PR once https://github.com/apache/kafka/pull/9505 has been merged.
   
   This PR adds support for generating snapshots for KIP-630.
   
   1. It introduces the interface `SnapshotWriter` and implementation `KafkaSnapshotWriter` for creating snapshots on disk.
   2. It introduces the interface `SnapshotReader` and implementation `KafkaSnapshotReader` for reading snapshots from disk.
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r518882604



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+// TODO: Write documentation for this type and all of the methods
+public interface SnapshotReader extends Closeable, Iterable<RecordBatch> {

Review comment:
       Ok, I think I see what you're saying. I guess I was not expecting that we would need a separate low-level interface when we already have `FileRecords` and `MemoryRecords`. I think `Records` already gives us a way to read from arbitrary positions:
   ```java
       long writeTo(GatheringByteChannel channel, long position, int length) throws IOException;
   ```
   But there is no `Records` implementation currently that allows unaligned writes. We only have this:
   ```java
       public int append(MemoryRecords records) throws IOException;
   ```
   Perhaps it would be possible to extend `Records` to provide what we need instead of creating a new interface?
   
   As for the naming, I wonder if we can reserve the nicer `SnapshotXXX` names for the state machine. What I would really like is a common type that both `handleSnapshot` and `handleCommit` can use, since both callbacks just need to provide a way to read through a set of records. I was thinking it could be `BatchReader`, but it doesn't have to be if there is something better.
   
   (By the way, it's not too clear to me why we need `freeze` when we already have `close`.)
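A minimal sketch of the common read type suggested above, assuming a hypothetical `BatchReader<T>` that iterates over decoded batches and is closed when the caller is done. `Batch`, `BatchReader`, and `ListBatchReader` are illustrative names, not Kafka's API; the point is that one consumer (`drain` here) can serve both a `handleCommit`-style and a `handleSnapshot`-style callback.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class BatchReaderDemo {
    // A consumer that works the same whether batches come from committed log data or a snapshot.
    static <T> List<T> drain(BatchReader<T> reader) {
        List<T> values = new ArrayList<>();
        try (BatchReader<T> r = reader) {
            while (r.hasNext()) {
                values.addAll(r.next().records);
            }
        }
        return values;
    }

    public static void main(String[] args) {
        List<Batch<String>> batches = Arrays.asList(
            new Batch<>(0L, Arrays.asList("a", "b")),
            new Batch<>(2L, Arrays.asList("c")));
        ListBatchReader<String> reader = new ListBatchReader<>(batches);
        System.out.println(drain(reader));     // [a, b, c]
        System.out.println(reader.isClosed()); // true
    }
}

// Hypothetical: one batch of decoded records plus the offset of its first record.
final class Batch<T> {
    final long baseOffset;
    final List<T> records;

    Batch(long baseOffset, List<T> records) {
        this.baseOffset = baseOffset;
        this.records = records;
    }
}

// Hypothetical shared type for handleCommit and handleSnapshot; close() signals "done reading".
interface BatchReader<T> extends Iterator<Batch<T>>, Closeable {
    @Override
    void close(); // no checked exception, so callers need no IOException handling
}

// In-memory implementation, e.g. for tests.
final class ListBatchReader<T> implements BatchReader<T> {
    private final Iterator<Batch<T>> batches;
    private boolean closed = false;

    ListBatchReader(List<Batch<T>> batches) {
        this.batches = batches.iterator();
    }

    @Override
    public boolean hasNext() {
        return !closed && batches.hasNext();
    }

    @Override
    public Batch<T> next() {
        if (closed) {
            throw new IllegalStateException("Reader is closed");
        }
        return batches.next();
    }

    @Override
    public void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}
```

Letting `close` mean "finished reading" gives the raft client a single signal for both callbacks, without needing a separate method.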







[GitHub] [kafka] ijuma commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r535574184



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+public final class FileRawSnapshotWriter implements RawSnapshotWriter {
+    private final Path tempSnapshotPath;
+    private final FileChannel channel;
+    private final OffsetAndEpoch snapshotId;
+    private boolean frozen = false;
+
+    private FileRawSnapshotWriter(
+        Path tempSnapshotPath,
+        FileChannel channel,
+        OffsetAndEpoch snapshotId
+    ) {
+        this.tempSnapshotPath = tempSnapshotPath;
+        this.channel = channel;
+        this.snapshotId = snapshotId;
+    }
+
+    @Override
+    public OffsetAndEpoch snapshotId() {
+        return snapshotId;
+    }
+
+    @Override
+    public long sizeInBytes() throws IOException {
+        return channel.size();
+    }
+
+    @Override
+    public void append(ByteBuffer buffer) throws IOException {
+        if (frozen) {
+            throw new IllegalStateException(
+                String.format("Append not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
+            );
+        }
+
+        Utils.writeFully(channel, buffer);
+    }
+
+    @Override
+    public boolean isFrozen() {
+        return frozen;
+    }
+
+    @Override
+    public void freeze() throws IOException {
+        channel.close();
+        frozen = true;
+
+        // Set readonly and ignore the result
+        if (!tempSnapshotPath.toFile().setReadOnly()) {
+            throw new IOException(String.format("Unable to set file (%s) as read-only", tempSnapshotPath));
+        }
+
+        Path destination = Snapshots.moveRename(tempSnapshotPath, snapshotId);
+        Files.move(tempSnapshotPath, destination, StandardCopyOption.ATOMIC_MOVE);

Review comment:
       Also, I would not rely on reading the code to decide one way or the other. You'd want to test it too.
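Following the suggestion to test rather than infer, a minimal sketch of such a test. It assumes a hypothetical `freeze(channel, temp, dest)` that mirrors the three steps in the diff (close the channel, mark the file read-only, atomic rename) and checks the observable results in a real temporary directory; the file names are placeholders. The read-only check inspects POSIX permission bits, only where supported, rather than `Files.isWritable`, which can report true for a privileged user.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.PosixFilePermission;

class FreezeSketchTest {
    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("snapshot-test");
        Path temp = dir.resolve("snapshot.part");       // hypothetical temp name
        Path dest = dir.resolve("snapshot.checkpoint"); // hypothetical final name

        FileChannel channel = FileChannel.open(temp, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
        ByteBuffer data = ByteBuffer.wrap(new byte[] {1, 2, 3, 4, 5});
        while (data.hasRemaining()) {
            channel.write(data);
        }

        freeze(channel, temp, dest);

        // Check observable effects instead of inferring them from the code.
        check(!channel.isOpen(), "channel should be closed");
        check(!Files.exists(temp), "temp file should be gone after the move");
        check(Files.size(dest) == 5, "contents should survive the move");
        if (Files.getFileStore(dest).supportsFileAttributeView("posix")) {
            check(!Files.getPosixFilePermissions(dest).contains(PosixFilePermission.OWNER_WRITE),
                "owner write bit should be cleared");
        }
        System.out.println("freeze checks passed");
    }

    // Hypothetical stand-in for FileRawSnapshotWriter.freeze(): close, mark read-only, rename.
    static void freeze(FileChannel channel, Path temp, Path dest) throws IOException {
        channel.close();
        if (!temp.toFile().setReadOnly()) {
            throw new IOException("Unable to set file (" + temp + ") as read-only");
        }
        Files.move(temp, dest, StandardCopyOption.ATOMIC_MOVE);
    }

    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }
}
```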







[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r517703078



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+// TODO: Write documentation for this type and all of the methods
+public interface SnapshotReader extends Closeable, Iterable<RecordBatch> {

Review comment:
       This comment applies to some of your other observations. At a high level, there are four use cases that we need to design and implement: two for the Raft implementation and two for the state machine.
   
   ### Raft implementation
   These types are internal to the raft implementation and don't have to be exposed to the state machine.
   #### Leader Use Case
   The leader needs to be able to send part of the snapshot over the network. Something like this:
   ```java
   interface SnapshotReader extends Closeable {
       long transferTo(long position, long maxBytes, WritableByteChannel channel) throws IOException;
       int read(ByteBuffer buffer, long position) throws IOException;
   }
   ```
   #### Follower Use Case
   The followers need to be able to copy bytes from the network and validate the snapshot on disk when fetching is done.
   ```java
   interface SnapshotWriter extends Closeable {
       void append(ByteBuffer buffer);
       void validate();
       void freeze();
   }
   ```
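The leader-side operations map closely onto `FileChannel`. A minimal, hypothetical sketch, assuming the snapshot is a single file: `FileSnapshotReaderSketch` is an illustrative name, not a class from this PR. `transferTo` delegates to `FileChannel.transferTo` (which may be zero-copy) and `read` to the positional `FileChannel.read`; neither changes the channel's position, so fetches for different ranges need not coordinate on shared state.

```java
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class LeaderReadDemo {
    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("snapshot", ".checkpoint");
        Files.write(file, "0123456789".getBytes(StandardCharsets.US_ASCII));
        try (FileSnapshotReaderSketch reader = new FileSnapshotReaderSketch(file)) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            long sent = reader.transferTo(3, 4, Channels.newChannel(out));
            System.out.println(sent + " " + new String(out.toByteArray(), StandardCharsets.US_ASCII)); // 4 3456
        }
    }
}

// Hypothetical leader-side reader that serves byte ranges of one snapshot file.
final class FileSnapshotReaderSketch implements Closeable {
    private final FileChannel channel;

    FileSnapshotReaderSketch(Path path) throws IOException {
        this.channel = FileChannel.open(path, StandardOpenOption.READ);
    }

    long sizeInBytes() throws IOException {
        return channel.size();
    }

    // Sends at most maxBytes starting at position; returns the number of bytes sent.
    long transferTo(long position, long maxBytes, WritableByteChannel target) throws IOException {
        long count = Math.min(maxBytes, Math.max(0L, channel.size() - position));
        long sent = 0;
        while (sent < count) {
            long n = channel.transferTo(position + sent, count - sent, target);
            if (n <= 0) {
                break; // target accepted nothing (e.g. a full non-blocking socket); caller can retry
            }
            sent += n;
        }
        return sent;
    }

    // Positional read: may fill less than buffer.remaining(); returns -1 at end of file.
    int read(ByteBuffer buffer, long position) throws IOException {
        return channel.read(buffer, position);
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }
}
```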
   ### State machine implementation
   These types are exposed to the state machine.
   #### Load Snapshot
   The state machine needs to be able to load/scan the entire snapshot. The state machine can use `close` to tell the raft client that it finished loading the snapshot. This will be implemented in a future PR, but it could look like this:
   ```java
   interface BatchedSnapshotReader<T> extends Iterable<Iterable<T>>, Closeable {
   }
   ```
   #### Generate Snapshot
   The state machine needs to be able to generate a snapshot by appending records/values and marking the snapshot as immutable (`freeze`) when it is done.
   ```java
   interface BatchedSnapshotWriter<T> extends Closeable {
      void append(Iterable<T> records);
      void freeze();
   }
   ```
   
   `SnapshotWriter` and `SnapshotReader` need to be interfaces because we will have a real implementation and a mocked implementation for testing. These two types are internal to raft and are not exposed to the state machine.
   
   `BatchedSnapshotReader` and `BatchedSnapshotWriter` can depend on `SnapshotWriter` and `SnapshotReader` to reuse some code, but this is not strictly required. These two types don't have to be interfaces if they delegate the IO to `SnapshotWriter` and `SnapshotReader`.
   
   What do you think @hachikuji?
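A minimal sketch of the delegation idea above, assuming a hypothetical raft-internal `RawWriterSketch` (append, freeze) and a state-machine-facing `BatchedSnapshotWriterSketch<T>` that only serializes values and hands the bytes down. The length-prefixed framing is invented for illustration and is not Kafka's record batch format. Because all IO goes through the raw interface, the batched writer can be a concrete class and still be tested against an in-memory raw writer.

```java
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class BatchedWriterDemo {
    public static void main(String[] args) throws IOException {
        InMemoryRawWriter raw = new InMemoryRawWriter();
        Function<String, byte[]> utf8 = s -> s.getBytes(StandardCharsets.UTF_8);
        try (BatchedSnapshotWriterSketch<String> writer = new BatchedSnapshotWriterSketch<>(raw, utf8)) {
            writer.append(Arrays.asList("a", "bc"));
            writer.freeze();
        }
        // 4-byte count + (4 + 1) + (4 + 2) = 15 bytes
        System.out.println(raw.bytes().length + " " + raw.isFrozen()); // 15 true
    }
}

// Hypothetical low-level writer, standing in for the raft-internal SnapshotWriter.
interface RawWriterSketch extends Closeable {
    void append(ByteBuffer buffer) throws IOException;
    void freeze() throws IOException;
    boolean isFrozen();
}

// In-memory raw writer, as a test double might be.
final class InMemoryRawWriter implements RawWriterSketch {
    private final ByteArrayOutputStream data = new ByteArrayOutputStream();
    private boolean frozen = false;

    @Override public void append(ByteBuffer buffer) {
        if (frozen) throw new IllegalStateException("Snapshot is already frozen");
        byte[] chunk = new byte[buffer.remaining()];
        buffer.get(chunk);
        data.write(chunk, 0, chunk.length);
    }
    @Override public void freeze() { frozen = true; }
    @Override public boolean isFrozen() { return frozen; }
    @Override public void close() { }
    byte[] bytes() { return data.toByteArray(); }
}

// Hypothetical state-machine-facing writer: serializes values, delegates all IO to the raw writer.
final class BatchedSnapshotWriterSketch<T> implements Closeable {
    private final RawWriterSketch raw;
    private final Function<T, byte[]> serializer;

    BatchedSnapshotWriterSketch(RawWriterSketch raw, Function<T, byte[]> serializer) {
        this.raw = raw;
        this.serializer = serializer;
    }

    // Illustrative framing only (not Kafka's format): [count][len][bytes][len][bytes]...
    void append(Iterable<T> records) throws IOException {
        List<byte[]> encoded = new ArrayList<>();
        int size = Integer.BYTES;
        for (T record : records) {
            byte[] bytes = serializer.apply(record);
            encoded.add(bytes);
            size += Integer.BYTES + bytes.length;
        }
        ByteBuffer buffer = ByteBuffer.allocate(size);
        buffer.putInt(encoded.size());
        for (byte[] bytes : encoded) {
            buffer.putInt(bytes.length).put(bytes);
        }
        buffer.flip();
        raw.append(buffer);
    }

    void freeze() throws IOException {
        raw.freeze();
    }

    @Override
    public void close() throws IOException {
        raw.close();
    }
}
```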







[GitHub] [kafka] hachikuji commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r534486002



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.NumberFormat;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+final class Snapshots {
+    private static final String SNAPSHOT_DIR = "snapshots";

Review comment:
       Not sure if the snapshots will have much of an impact on this. It's not like we'll be storing more than a couple of them.







[GitHub] [kafka] lbradstreet commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r534590620



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.NumberFormat;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+final class Snapshots {
+    private static final String SNAPSHOT_DIR = "snapshots";

Review comment:
       @jsancio @hachikuji we saw DeleteRecords calls become pretty expensive and tie up request handler threads on large partition directories, because each call scanned the directory to find deletable snapshot files. If we keep references to these snapshots in memory after the log is opened, placing them in the same directory won't be a big deal. That's what we ended up doing for the producer state snapshots, and it worked well.
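A minimal sketch of the approach described above, assuming a hypothetical `SnapshotIndexSketch` that lists the partition directory once when the log is opened and then serves deletions from an in-memory sorted map instead of rescanning the directory. The `<offset>-<epoch>.checkpoint` naming is an assumption for illustration.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

class SnapshotIndexDemo {
    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("partition");
        for (String name : new String[] {"10-1.checkpoint", "20-1.checkpoint", "30-2.checkpoint", "00000.log"}) {
            Files.createFile(dir.resolve(name));
        }
        SnapshotIndexSketch index = SnapshotIndexSketch.load(dir); // the only directory scan
        int deleted = index.deleteBefore(25);                       // no directory listing here
        System.out.println(deleted + " " + index.endOffsets());     // 2 [30]
    }
}

// Hypothetical in-memory index of snapshot files, keyed by end offset.
final class SnapshotIndexSketch {
    private static final String SUFFIX = ".checkpoint";
    private final ConcurrentSkipListMap<Long, Path> byEndOffset = new ConcurrentSkipListMap<>();

    // Scans the directory once, e.g. when the log is opened. Assumes "<offset>-<epoch>.checkpoint" names.
    static SnapshotIndexSketch load(Path dir) throws IOException {
        SnapshotIndexSketch index = new SnapshotIndexSketch();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir, "*" + SUFFIX)) {
            for (Path file : files) {
                String name = file.getFileName().toString();
                int dash = name.indexOf('-');
                if (dash > 0) {
                    index.add(Long.parseLong(name.substring(0, dash)), file);
                }
            }
        }
        return index;
    }

    // Called when a new snapshot is frozen, so the index stays current without rescanning.
    void add(long endOffset, Path file) {
        byEndOffset.put(endOffset, file);
    }

    // Deletes snapshots whose end offset is strictly below the given offset.
    int deleteBefore(long offset) throws IOException {
        ConcurrentNavigableMap<Long, Path> stale = byEndOffset.headMap(offset, false);
        int deleted = 0;
        for (Map.Entry<Long, Path> entry : stale.entrySet()) {
            if (Files.deleteIfExists(entry.getValue())) {
                deleted++;
            }
            byEndOffset.remove(entry.getKey());
        }
        return deleted;
    }

    NavigableSet<Long> endOffsets() {
        return byEndOffset.keySet();
    }
}
```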







[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r517703625



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+// TODO: Write documentation for this type and all of the methods
+public interface SnapshotReader extends Closeable, Iterable<RecordBatch> {
+
+    public OffsetAndEpoch snapshotId();
+
+    public long sizeInBytes();
+
+    public Iterator<RecordBatch> iterator();
+
+    public int read(ByteBuffer buffer, long position) throws IOException;

Review comment:
       I think I addressed this in this comment: https://github.com/apache/kafka/pull/9512#discussion_r517703078. If you agree, let's move the conversation there.







[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r535482262



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##########
@@ -74,18 +74,19 @@ public void freeze() throws IOException {
         frozen = true;
 
         // Set readonly and ignore the result
-        if (!path.toFile().setReadOnly()) {
-            throw new IOException(String.format("Unable to set file %s as read-only", path));
+        if (!tempSnapshotPath.toFile().setReadOnly()) {
+            throw new IOException(String.format("Unable to set file (%s) as read-only", tempSnapshotPath));
         }
 
-        Path destination = Snapshots.moveRename(path, snapshotId);
-        Files.move(path, destination, StandardCopyOption.ATOMIC_MOVE);
+        Path destination = Snapshots.moveRename(tempSnapshotPath, snapshotId);
+        Files.move(tempSnapshotPath, destination, StandardCopyOption.ATOMIC_MOVE);
     }
 
     @Override
     public void close() throws IOException {
         channel.close();
-        Files.deleteIfExists(path);
+        // This is a noop if freeze was called before calling close
+        Files.deleteIfExists(tempSnapshotPath);

Review comment:
       Yes!







[GitHub] [kafka] ijuma commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r535572033



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+public final class FileRawSnapshotWriter implements RawSnapshotWriter {
+    private final Path tempSnapshotPath;
+    private final FileChannel channel;
+    private final OffsetAndEpoch snapshotId;
+    private boolean frozen = false;
+
+    private FileRawSnapshotWriter(
+        Path tempSnapshotPath,
+        FileChannel channel,
+        OffsetAndEpoch snapshotId
+    ) {
+        this.tempSnapshotPath = tempSnapshotPath;
+        this.channel = channel;
+        this.snapshotId = snapshotId;
+    }
+
+    @Override
+    public OffsetAndEpoch snapshotId() {
+        return snapshotId;
+    }
+
+    @Override
+    public long sizeInBytes() throws IOException {
+        return channel.size();
+    }
+
+    @Override
+    public void append(ByteBuffer buffer) throws IOException {
+        if (frozen) {
+            throw new IllegalStateException(
+                String.format("Append not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
+            );
+        }
+
+        Utils.writeFully(channel, buffer);
+    }
+
+    @Override
+    public boolean isFrozen() {
+        return frozen;
+    }
+
+    @Override
+    public void freeze() throws IOException {
+        channel.close();
+        frozen = true;
+
+        // Set readonly and ignore the result
+        if (!tempSnapshotPath.toFile().setReadOnly()) {
+            throw new IOException(String.format("Unable to set file (%s) as read-only", tempSnapshotPath));
+        }
+
+        Path destination = Snapshots.moveRename(tempSnapshotPath, snapshotId);
+        Files.move(tempSnapshotPath, destination, StandardCopyOption.ATOMIC_MOVE);

Review comment:
       It's not only Windows; NFS also has some restrictions. Why don't you want to use the utility method?
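Kafka's `org.apache.kafka.common.utils.Utils` class provides an `atomicMoveWithFallback(Path, Path)` helper; assuming that is the utility being referred to, below is a self-contained sketch of the same pattern (not a copy of Kafka's implementation). It tries an atomic rename first and, if the file system rejects it, falls back to a non-atomic move that replaces any existing target, so the move still happens on file systems that cannot do it atomically.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class AtomicMoveSketch {
    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("move-test");
        Path src = Files.write(dir.resolve("snapshot.part"), "data".getBytes(StandardCharsets.UTF_8));
        Path dst = dir.resolve("snapshot.checkpoint");
        atomicMoveWithFallback(src, dst);
        System.out.println(Files.exists(src) + " " + Files.exists(dst)); // false true
    }

    // Prefer an atomic rename; if the file system rejects it (for example some NFS or
    // Windows setups), fall back to a plain move that replaces any existing target.
    static void atomicMoveWithFallback(Path source, Path target) throws IOException {
        try {
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException atomicFailure) {
            try {
                Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
            } catch (IOException fallbackFailure) {
                fallbackFailure.addSuppressed(atomicFailure); // keep both causes for debugging
                throw fallbackFailure;
            }
        }
    }
}
```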







[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r534581287



##########
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##########
@@ -472,4 +489,110 @@ private EpochStartOffset(int epoch, long startOffset) {
         }
     }
 
+    final class MockRawSnapshotWriter implements RawSnapshotWriter {
+        private final OffsetAndEpoch snapshotId;
+        private ByteBuffer data;

Review comment:
       Cool. Yeah, let's use that. So many util goodies.







[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r519211721



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotReader.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+/**
+ * Interface for reading snapshots as a sequence of records.
+ */
+public interface RawSnapshotReader extends Closeable, Iterable<RecordBatch> {
+    /**
+     * Returns the end offset and epoch for the snapshot.
+     */
+    public OffsetAndEpoch snapshotId();
+
+    /**
+     * Returns the number of bytes for the snapshot.
+     *
+     * @throws IOException for any IO error while reading the size
+     */
+    public long sizeInBytes() throws IOException;
+
+    /**
+     * Reads bytes from position into the given buffer.
+     *
+     * It is not guaranteed that the given buffer will be filled.
+     *
+     * @param buffer byte buffer to put the read bytes
+     * @param position the starting position in the snapshot to read
+     * @return the number of bytes read
+     * @throws IOException for any IO error while reading the snapshot
+     */
+    public int read(ByteBuffer buffer, long position) throws IOException;

Review comment:
       As @hachikuji and I discussed offline, it is very likely that this method will change to:
   
   ```java
   public BaseRecords slice(long position) throws IOException;
   ```
   
   That change would come after we implement https://issues.apache.org/jira/browse/KAFKA-10694.
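Until the `slice` change lands, the `read(ByteBuffer, long)` contract quoted above allows a partial read, so a caller that needs a complete range has to loop. A minimal sketch, assuming the snapshot is backed by a `FileChannel`; `readFully` is a hypothetical helper written for illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class ReadFullyDemo {
    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("snapshot", ".checkpoint");
        Files.write(file, new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate(16);
            System.out.println(readFully(channel, buffer, 4)); // 6: only six bytes remain after position 4
        }
    }

    // Repeats the positional read until the buffer is full or end of file is reached,
    // because a single FileChannel.read(buffer, position) may return fewer bytes than requested.
    static int readFully(FileChannel channel, ByteBuffer buffer, long position) throws IOException {
        int total = 0;
        while (buffer.hasRemaining()) {
            int n = channel.read(buffer, position + total);
            if (n < 0) {
                break; // end of file
            }
            total += n;
        }
        return total;
    }
}
```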







[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r534587344



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.NumberFormat;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+final class Snapshots {
+    private static final String SNAPSHOT_DIR = "snapshots";

Review comment:
       Done. Removed the `snapshots` directory.







[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r533829583



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.NumberFormat;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+final class Snapshots {
+    private static final String SNAPSHOT_DIR = "snapshots";
+    private static final String SUFFIX =  ".snapshot";

Review comment:
       Good catch. I mentioned using `.checkpoint` in KIP-630 but forgot to change it here. I'll change it to that, but let me know if you have a preference.





[GitHub] [kafka] hachikuji commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r534607264



##########
File path: raft/src/test/java/org/apache/kafka/snapshot/FileRawSnapshotTest.java
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.stream.IntStream;
+import org.apache.kafka.common.record.BufferSupplier.GrowableBufferSupplier;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public final class FileRawSnapshotTest {
+    @Test
+    public void testWritingSnapshot() throws IOException {
+        Path tempDir = TestUtils.tempDirectory().toPath();
+        OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(10L, 3);
+        int bufferSize = 256;
+        int batches = 10;
+        int expectedSize = 0;
+
+        try (FileRawSnapshotWriter snapshot = FileRawSnapshotWriter.create(tempDir, offsetAndEpoch)) {
+            assertEquals(0, snapshot.sizeInBytes());
+
+            MemoryRecords records = buildRecords(new ByteBuffer[] {ByteBuffer.wrap(randomBytes(bufferSize))});

Review comment:
       nit: can just do `buildRecords(ByteBuffer.wrap(randomBytes(bufferSize)))`
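
   The nit works if `buildRecords` takes a `ByteBuffer...` varargs parameter (which the suggestion implies), because Java then wraps a single argument in an array automatically. A minimal sketch with a hypothetical stand-in that only totals buffer sizes instead of building `MemoryRecords`:

   ```java
   import java.nio.ByteBuffer;

   public final class VarargsSketch {
       // Hypothetical stand-in for the test's buildRecords helper; it only
       // sums the remaining bytes and does not consume the buffers.
       public static int buildRecords(ByteBuffer... buffers) {
           int total = 0;
           for (ByteBuffer buffer : buffers) {
               total += buffer.remaining();
           }
           return total;
       }
   }
   ```

   Both `buildRecords(new ByteBuffer[] {buf})` and `buildRecords(buf)` resolve to the same call, so the explicit array adds nothing.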

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+public final class FileRawSnapshotWriter implements RawSnapshotWriter {
+    private final Path tempSnapshotPath;
+    private final FileChannel channel;
+    private final OffsetAndEpoch snapshotId;
+    private boolean frozen = false;
+
+    private FileRawSnapshotWriter(
+        Path tempSnapshotPath,
+        FileChannel channel,
+        OffsetAndEpoch snapshotId
+    ) {
+        this.tempSnapshotPath = tempSnapshotPath;
+        this.channel = channel;
+        this.snapshotId = snapshotId;
+    }
+
+    @Override
+    public OffsetAndEpoch snapshotId() {
+        return snapshotId;
+    }
+
+    @Override
+    public long sizeInBytes() throws IOException {
+        return channel.size();
+    }
+
+    @Override
+    public void append(ByteBuffer buffer) throws IOException {
+        if (frozen) {
+            throw new IllegalStateException(
+                String.format("Append not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
+            );
+        }
+
+        Utils.writeFully(channel, buffer);
+    }
+
+    @Override
+    public boolean isFrozen() {
+        return frozen;
+    }
+
+    @Override
+    public void freeze() throws IOException {
+        channel.close();
+        frozen = true;
+
+        // Set readonly and ignore the result
+        if (!tempSnapshotPath.toFile().setReadOnly()) {
+            throw new IOException(String.format("Unable to set file (%s) as read-only", tempSnapshotPath));
+        }
+
+        Path destination = Snapshots.moveRename(tempSnapshotPath, snapshotId);
+        Files.move(tempSnapshotPath, destination, StandardCopyOption.ATOMIC_MOVE);

Review comment:
       It might be useful to review the history behind `Utils.atomicMoveWithFallback`. It's not clear to me why this case is different from the other situations in which it is used.
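
   For context, a minimal sketch of the atomic-move-with-fallback pattern using only `java.nio.file`. This is not Kafka's `Utils.atomicMoveWithFallback` source, just the general idea as I understand it: `Files.move` with `ATOMIC_MOVE` can fail (for example with `AtomicMoveNotSupportedException`, or when the target exists on some platforms), so the helper retries with a non-atomic, replacing move. The class and method names are hypothetical.

   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.StandardCopyOption;

   public final class AtomicMoveSketch {
       /**
        * Moves source to target atomically when possible; otherwise falls back
        * to a non-atomic move that replaces the target. Returns true if the
        * atomic move succeeded.
        */
       public static boolean moveWithFallback(Path source, Path target) throws IOException {
           try {
               Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
               return true;
           } catch (IOException atomicFailure) {
               try {
                   // Not atomic: a crash during this move could leave partial state.
                   Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
                   return false;
               } catch (IOException fallbackFailure) {
                   // Keep the original cause visible to the caller.
                   fallbackFailure.addSuppressed(atomicFailure);
                   throw fallbackFailure;
               }
           }
       }
   }
   ```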

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##########
@@ -74,18 +74,19 @@ public void freeze() throws IOException {
         frozen = true;
 
         // Set readonly and ignore the result
-        if (!path.toFile().setReadOnly()) {
-            throw new IOException(String.format("Unable to set file %s as read-only", path));
+        if (!tempSnapshotPath.toFile().setReadOnly()) {
+            throw new IOException(String.format("Unable to set file (%s) as read-only", tempSnapshotPath));
         }
 
-        Path destination = Snapshots.moveRename(path, snapshotId);
-        Files.move(path, destination, StandardCopyOption.ATOMIC_MOVE);
+        Path destination = Snapshots.moveRename(tempSnapshotPath, snapshotId);
+        Files.move(tempSnapshotPath, destination, StandardCopyOption.ATOMIC_MOVE);
     }
 
     @Override
     public void close() throws IOException {
         channel.close();
-        Files.deleteIfExists(path);
+        // This is a noop if freeze was called before calling close
+        Files.deleteIfExists(tempSnapshotPath);

Review comment:
       Would it be useful to call this in a `finally`?
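
   A minimal sketch of what that could look like, using a hypothetical stripped-down writer that keeps only the fields `close` needs. The `finally` block deletes the temporary file even if closing the channel throws; note that if both calls throw, the delete exception is the one that propagates.

   ```java
   import java.io.IOException;
   import java.nio.channels.FileChannel;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.StandardOpenOption;

   public final class TempFileWriterSketch implements AutoCloseable {
       private final Path tempSnapshotPath;
       private final FileChannel channel;

       public TempFileWriterSketch(Path tempSnapshotPath) throws IOException {
           this.tempSnapshotPath = tempSnapshotPath;
           this.channel = FileChannel.open(tempSnapshotPath,
               StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
       }

       @Override
       public void close() throws IOException {
           try {
               channel.close();
           } finally {
               // Runs even if channel.close() throws. In the real writer this is
               // a no-op once freeze() has moved the temporary file away.
               Files.deleteIfExists(tempSnapshotPath);
           }
       }
   }
   ```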

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Iterator;
+import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+public final class FileRawSnapshotReader implements RawSnapshotReader {
+    private final FileRecords fileRecords;
+    private final OffsetAndEpoch snapshotId;
+
+    private FileRawSnapshotReader(FileRecords fileRecords, OffsetAndEpoch snapshotId) {
+        this.fileRecords = fileRecords;
+        this.snapshotId = snapshotId;
+    }
+
+    @Override
+    public OffsetAndEpoch snapshotId() {
+        return snapshotId;
+    }
+
+    @Override
+    public long sizeInBytes() {
+        return fileRecords.sizeInBytes();
+    }
+
+    @Override
+    public Iterator<RecordBatch> iterator() {
+        return new Iterator<RecordBatch>() {

Review comment:
       In that case, I would probably just do the cast to avoid the useless wrapper. Maybe we could create a helper like this in `Utils`:
   
   ```java
       @SuppressWarnings("unchecked")
       public static <S, T extends S> Iterator<S> covariantCast(Iterator<T> iterator) {
           return (Iterator<S>) iterator;
       }
   ```
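
   A self-contained sketch of how such a helper could be used. The class name is hypothetical, and the method is `public static` so callers outside `Utils` can reach it. The unchecked cast is safe because an `Iterator` only hands elements out, so an `Iterator<T>` can be read as an `Iterator<S>` for any supertype `S`. In the reader, the same idea would presumably let an iterator of `FileChannelRecordBatch` be returned as an `Iterator<RecordBatch>` without a wrapper.

   ```java
   import java.util.Iterator;
   import java.util.List;

   public final class CovariantCastSketch {
       @SuppressWarnings("unchecked")
       public static <S, T extends S> Iterator<S> covariantCast(Iterator<T> iterator) {
           // Unchecked but safe: iterators only produce elements.
           return (Iterator<S>) iterator;
       }

       public static long sum(Iterator<Number> numbers) {
           long total = 0;
           while (numbers.hasNext()) {
               total += numbers.next().longValue();
           }
           return total;
       }

       /** Example caller: an API that wants Iterator<Number>, given a List<Integer>. */
       public static long sumOf(List<Integer> ints) {
           // No wrapper Iterator needed to view Iterator<Integer> as Iterator<Number>.
           Iterator<Number> numbers = covariantCast(ints.iterator());
           return sum(numbers);
       }
   }
   ```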





[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r517672247



##########
File path: core/src/main/scala/kafka/snapshot/KafkaSnapshotReader.scala
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.snapshot
+
+import java.nio.ByteBuffer
+import java.nio.file.Path
+import java.util.{Iterator => JIterator}
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.record.FileRecords
+import org.apache.kafka.raft.OffsetAndEpoch
+import org.apache.kafka.snapshot.SnapshotReader
+
+final class KafkaSnapshotReader private (fileRecords: FileRecords, snapshotId: OffsetAndEpoch) extends SnapshotReader {

Review comment:
       Sounds good. I'll move this code over and implement it in Java.





[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r518396177



##########
File path: core/src/main/scala/kafka/snapshot/KafkaSnapshotReader.scala
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.snapshot
+
+import java.nio.ByteBuffer
+import java.nio.file.Path
+import java.util.{Iterator => JIterator}
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.record.FileRecords
+import org.apache.kafka.raft.OffsetAndEpoch
+import org.apache.kafka.snapshot.SnapshotReader
+
+final class KafkaSnapshotReader private (fileRecords: FileRecords, snapshotId: OffsetAndEpoch) extends SnapshotReader {

Review comment:
       Done.





[GitHub] [kafka] hachikuji commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r537757487



##########
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##########
@@ -472,4 +490,106 @@ private EpochStartOffset(int epoch, long startOffset) {
         }
     }
 
+    final class MockRawSnapshotWriter implements RawSnapshotWriter {
+        private final OffsetAndEpoch snapshotId;
+        private ByteBufferOutputStream data;
+        private boolean frozen;
+
+        public MockRawSnapshotWriter(OffsetAndEpoch snapshotId) {
+            this.snapshotId = snapshotId;
+            this.data = new ByteBufferOutputStream(0);
+            this.frozen = false;
+        }
+
+        @Override
+        public OffsetAndEpoch snapshotId() {
+            return snapshotId;
+        }
+
+        @Override
+        public long sizeInBytes() {
+            return data.position();
+        }
+
+        @Override
+        public void append(ByteBuffer buffer) {
+            if (frozen) {
+                throw new RuntimeException("Snapshot is already frozen " + snapshotId);
+            }
+
+            data.write(buffer);
+        }
+
+        @Override
+        public boolean isFrozen() {
+            return frozen;
+        }
+
+        @Override
+        public void freeze() {
+            if (frozen) {
+                throw new RuntimeException("Snapshot is already frozen " + snapshotId);
+            }
+
+            frozen = true;
+            ByteBuffer buffer = data.buffer();
+            buffer.flip();
+
+            snapshots.putIfAbsent(snapshotId, new MockRawSnapshotReader(snapshotId, buffer));
+        }
+
+        @Override
+        public void close() {}
+    }
+
+    final static class MockRawSnapshotReader implements RawSnapshotReader {
+        private final OffsetAndEpoch snapshotId;
+        private final MemoryRecords data;
+
+        MockRawSnapshotReader(OffsetAndEpoch snapshotId, ByteBuffer data) {
+            this.snapshotId = snapshotId;
+            this.data = MemoryRecords.readableRecords(data);
+        }
+
+        @Override
+        public OffsetAndEpoch snapshotId() {
+            return snapshotId;
+        }
+
+        @Override
+        public long sizeInBytes() {
+            return data.sizeInBytes();
+        }
+
+        @Override
+        public Iterator<RecordBatch> iterator() {
+            return new Iterator<RecordBatch>() {

Review comment:
       nit: can we use `covariantCast` here?





[GitHub] [kafka] hachikuji commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r534485293



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.NumberFormat;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+final class Snapshots {
+    private static final String SNAPSHOT_DIR = "snapshots";
+    private static final String SUFFIX =  ".snapshot";

Review comment:
       Sounds fine to me.





[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r518916392



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+// TODO: Write documentation for this type and all of the methods
+public interface SnapshotReader extends Closeable, Iterable<RecordBatch> {

Review comment:
       > Perhaps it would be possible to extend Records to provide what we need instead of creating a new interface?
   
   I think you are right that we don't need `SnapshotReader`, since `Records` provides all of the functionality we need. `SnapshotReader` was added so that we didn't expose all of the mutating APIs in `FileRecords`. Snapshots are supposed to be immutable once frozen.
   
   I think we still want `SnapshotWriter` or something similar, since it provides 1. raw writes of bytes and 2. `freeze`, which optionally marks the snapshot as immutable if the validation passes.
   
   For 1., I don't think we should add raw writes to `Records` or `FileRecords`: in essence we would be exposing an unsafe API, and the user would need to make sure that they are writing the correct data.
   
   For 2., I think we can avoid introducing a new type/interface and instead add that functionality as a static method in `Snapshots.java`. Unit tests (mock tests) may be difficult with this code organization.
   
   > As far as the naming, I wonder if we can reserve the nicer SnapshotXXX names for the state machine.
   
   Yes. I'll use the prettier name for the type exposed to the state machine.
   
   > Really what I would like is a common type that can be used by both handleSnapshot and handleCommit since both callbacks just need to provide a way to read through a set of records. I was thinking it could be BatchReader, but it doesn't have to be if there is something better.
   
   I think this should be possible. I haven't implemented this part, so I don't have all of the details. I think the requirements for the type sent through `handleSnapshot` are a subset of the requirements for the type sent through `handleCommit`. In particular, the Raft Client (`ListenerContext`) only needs to know when the snapshot has been fully read (e.g. `close`). The Raft Client doesn't need to know the "last seen offset + 1", since it already knows the `SnapshotId` sent through `handleSnapshot`.
   
   > (By the way, it's not too clear to me why we need freeze when we already have close.)
   
   I wanted to allow the user to abort a snapshot. If `close` is called without calling `freeze` then the partial snapshot is deleted and not made immutable.
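
   The freeze/close contract described above can be sketched with a hypothetical in-memory writer (not the PR's `RawSnapshotWriter` implementation; the map stands in for frozen snapshots on disk): `freeze` publishes the snapshot and rejects further appends, while `close` without a prior `freeze` simply drops the partial data, which is how a caller aborts.

   ```java
   import java.io.ByteArrayOutputStream;
   import java.util.Map;

   /** Hypothetical writer illustrating freeze-to-publish and close-to-abort. */
   public final class AbortableSnapshotWriter implements AutoCloseable {
       private final String snapshotId;
       private final Map<String, byte[]> published; // stands in for frozen snapshots on disk
       private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
       private boolean frozen = false;

       public AbortableSnapshotWriter(String snapshotId, Map<String, byte[]> published) {
           this.snapshotId = snapshotId;
           this.published = published;
       }

       public void append(byte[] bytes) {
           if (frozen) {
               throw new IllegalStateException("Snapshot is already frozen: " + snapshotId);
           }
           pending.write(bytes, 0, bytes.length);
       }

       public void freeze() {
           if (frozen) {
               throw new IllegalStateException("Snapshot is already frozen: " + snapshotId);
           }
           frozen = true;
           // Publish a copy; later appends are rejected, so it stays immutable.
           published.putIfAbsent(snapshotId, pending.toByteArray());
       }

       @Override
       public void close() {
           // Without a prior freeze() nothing was published, so dropping the
           // buffered bytes is all it takes to abort. After freeze() this only frees memory.
           pending.reset();
       }
   }
   ```

   In the file-based writer in this PR, the same contract maps to renaming the temporary file in `freeze` and deleting it in `close`.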





[GitHub] [kafka] hachikuji commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r516989380



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+// TODO: Write documentation for this type and all of the methods
+public interface SnapshotReader extends Closeable, Iterable<RecordBatch> {
+
+    public OffsetAndEpoch snapshotId();
+
+    public long sizeInBytes();
+
+    public Iterator<RecordBatch> iterator();
+
+    public int read(ByteBuffer buffer, long position) throws IOException;

Review comment:
       So I guess we need this in order to be able to serve `FetchSnapshot` requests. I am wondering if it would be better to pull it into a separate object. Maybe we can have something like this:
   ```java
   interface SnapshotReader extends BatchReader {}
   
   interface Snapshot extends Iterable<Batch> {
     SnapshotReader iterator();
     int read(ByteBuffer buffer, long position) throws IOException;
   }
   ```
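
   For reference, the positional `read(ByteBuffer buffer, long position)` that `FetchSnapshot` needs maps naturally onto `FileChannel.read(ByteBuffer, long)`, which reads at an absolute offset without changing the channel's own position, so concurrent fetches at different offsets don't interfere. A minimal sketch; the `readChunk` helper and its fill-until-full loop are assumptions, not the PR's code.

   ```java
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.FileChannel;

   public final class PositionalReadSketch {
       /**
        * Fills buffer with bytes from channel starting at the absolute position,
        * stopping early at end of file. Returns the number of bytes read.
        */
       public static int readChunk(FileChannel channel, ByteBuffer buffer, long position) throws IOException {
           int total = 0;
           while (buffer.hasRemaining()) {
               int n = channel.read(buffer, position + total);
               if (n <= 0) {
                   break; // -1 means the position is at or past end of file
               }
               total += n;
           }
           return total;
       }
   }
   ```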

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+// TODO: Write documentation for this type and all of the methods
+public interface SnapshotReader extends Closeable, Iterable<RecordBatch> {

Review comment:
       It would be nice if we could figure out how to consolidate this and `BatchReader`. It seems like it should be doable.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/BatchedSnapshotWriter.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.raft.RecordSerde;
+import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
+import org.apache.kafka.raft.internals.BatchAccumulator;
+
+// TODO: Write documentation for this type and all of the methods
+final public class BatchedSnapshotWriter<T> implements Closeable {

Review comment:
       The types surprised me a bit. I would have expected this to extend `SnapshotWriter`. I'm wondering why we don't push the type `<T>` up to `SnapshotWriter` and try to keep `Records` out of the interfaces?
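
   One way to read this suggestion, as a hypothetical sketch rather than the API this PR ended up with: make the writer generic in the state machine's record type `T` and hide serialization behind a small serde, so no `Records` type appears in the interface. The PR imports a `RecordSerde`; this sketch uses a simpler made-up `Serde` instead, and it skips the batching that `BatchedSnapshotWriter` presumably does with `BatchAccumulator`.

   ```java
   import java.io.ByteArrayOutputStream;
   import java.util.List;

   public final class TypedSnapshotSketch {
       /** Hypothetical serializer for the state machine's record type. */
       public interface Serde<T> {
           byte[] serialize(T record);
       }

       /** Writer typed by record; no Records/MemoryRecords in its signature. */
       public interface SnapshotWriter<T> extends AutoCloseable {
           void append(List<T> records);
           void freeze();
       }

       public static final class InMemorySnapshotWriter<T> implements SnapshotWriter<T> {
           private final Serde<T> serde;
           private final ByteArrayOutputStream out = new ByteArrayOutputStream();
           private boolean frozen = false;

           public InMemorySnapshotWriter(Serde<T> serde) {
               this.serde = serde;
           }

           @Override
           public void append(List<T> records) {
               if (frozen) {
                   throw new IllegalStateException("Snapshot is already frozen");
               }
               for (T record : records) {
                   byte[] bytes = serde.serialize(record);
                   out.write(bytes, 0, bytes.length);
               }
           }

           @Override
           public void freeze() {
               frozen = true;
           }

           public int sizeInBytes() {
               return out.size();
           }

           @Override
           public void close() {}
       }
   }
   ```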

##########
File path: core/src/main/scala/kafka/snapshot/KafkaSnapshotReader.scala
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.snapshot
+
+import java.nio.ByteBuffer
+import java.nio.file.Path
+import java.util.{Iterator => JIterator}
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.record.FileRecords
+import org.apache.kafka.raft.OffsetAndEpoch
+import org.apache.kafka.snapshot.SnapshotReader
+
+final class KafkaSnapshotReader private (fileRecords: FileRecords, snapshotId: OffsetAndEpoch) extends SnapshotReader {

Review comment:
       So far we have preferred to keep implementations in the `raft` package except in cases where we depended on something that was in the server code (such as `Log`). Is there any reason to do snapshots differently? I think we would still be able to use the snapshot code from `raft` for other internal use cases (like __consumer_offsets).





[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r533833428



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.NumberFormat;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+final class Snapshots {
+    private static final String SNAPSHOT_DIR = "snapshots";

Review comment:
       I think this depends on whether we need to scan the snapshot directory. Unfortunately, I don't have a concrete answer at the moment. When we implement the remaining changes to the raft client (log truncation, updating the log start offset and LEO), we may need to scan the snapshot/checkpoint folder to determine the greatest log start offset and LEO. @lbradstreet suggested storing them in a separate directory during the KIP-630 review process, since Kafka already has a few files in the partition log directory.
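Whichever directory is chosen, the scan described here could look like the sketch below: list the directory once, keep only names that parse as snapshot files, and track the greatest end offset. The `<20-digit offset>-<10-digit epoch>.snap` naming and the `SnapshotDirScan` helper are assumptions for illustration; the PR's actual naming lives in `Snapshots` and is not shown in this excerpt.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class SnapshotDirScan {
    // Hypothetical file name layout: "<20-digit end offset>-<10-digit epoch>.snap".
    private static final Pattern NAME = Pattern.compile("(\\d{20})-(\\d{10})\\.snap");

    /** Result of a scan: the greatest snapshot end offset and its epoch. */
    static final class Found {
        final long endOffset;
        final long epoch;

        Found(long endOffset, long epoch) {
            this.endOffset = endOffset;
            this.epoch = epoch;
        }
    }

    /** Lists dir once and returns the snapshot with the greatest end offset, if any. */
    static Optional<Found> latest(Path dir) throws IOException {
        Found best = null;
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path entry : entries) {
                Matcher m = NAME.matcher(entry.getFileName().toString());
                if (!m.matches()) {
                    continue; // log segments, indexes, producer state files, temp files, ...
                }
                Found candidate = new Found(Long.parseLong(m.group(1)), Long.parseLong(m.group(2)));
                // Prefer the greatest end offset; break ties with the greater epoch.
                if (best == null || candidate.endOffset > best.endOffset
                        || (candidate.endOffset == best.endOffset && candidate.epoch > best.epoch)) {
                    best = candidate;
                }
            }
        }
        return Optional.ofNullable(best);
    }
}
```

Because the filter is name-based, the same scan works for a dedicated `snapshots` subdirectory or for the flat partition directory.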







[GitHub] [kafka] hachikuji merged pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #9512:
URL: https://github.com/apache/kafka/pull/9512


   





[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r519211586



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotWriter.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+/**
+ * Interface for writing snapshot as a sequence of records.
+ */
+public interface RawSnapshotWriter extends Closeable {
+    /**
+     * Returns the end offset and epoch for the snapshot.
+     */
+    public OffsetAndEpoch snapshotId();
+
+    /**
+     * Returns the number of bytes for the snapshot.
+     *
+     * @throws IOException for any IO error while reading the size
+     */
+    public long sizeInBytes() throws IOException;
+
+    /**
+     * Fully appends the buffer to the snapshot.
+     *
+     * If the method returns without an exception, the given buffer was fully written to the
+     * snapshot.
+     *
+     * @param buffer the buffer to append
+     * @throws IOException for any IO error during append
+     */
+    public void append(ByteBuffer buffer) throws IOException;

Review comment:
       As @hachikuji and I discussed offline, it is very likely that this method will change to:
   
   ```java
   public void append(MemoryRecords records) throws IOException;
   ```
   
   once we implement https://issues.apache.org/jira/browse/KAFKA-10694
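The `append` contract ("Fully appends the buffer to the snapshot") matters because the `WritableByteChannel` contract allows some channels to write only part of a buffer in a single call. A minimal sketch of the loop a helper along the lines of `Utils.writeFully` needs; `FullAppend` is a hypothetical name, not Kafka's code:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

final class FullAppend {
    /**
     * Writes every remaining byte of buffer to channel. A single write() may be
     * partial, so keep looping until the buffer is drained; any IOException
     * propagates, so a normal return means the whole buffer was written.
     */
    static void writeFully(WritableByteChannel channel, ByteBuffer buffer) throws IOException {
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
    }
}
```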







[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r534592624



##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -100,4 +102,15 @@ default void handleResign() {}
      */
     CompletableFuture<Void> shutdown(int timeoutMs);
 
+    /**
+     * Create a writable snapshot file for a given offset and epoch.
+     *
+     * The RaftClient assumes that the returned snapshot will contain the records up to but
+     * not including the end offset in the snapshot id. See {@link SnapshotWriter} for
+     * details on how to use this object.
+     *
+     * @param snapshotId the end offset and epoch that identifies the snapshot

Review comment:
       Created https://issues.apache.org/jira/browse/KAFKA-10800







[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r519213671



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+// TODO: Write documentation for this type and all of the methods
+public interface SnapshotReader extends Closeable, Iterable<RecordBatch> {

Review comment:
       @hachikuji and I discussed this offline. The latest PR should reflect our discussion.
   
   The interfaces `RawSnapshotWriter` and `RawSnapshotReader` are expected to change after we address https://issues.apache.org/jira/browse/KAFKA-10694







[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r533835857



##########
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
##########
@@ -149,6 +152,29 @@ default OptionalLong truncateToEndOffset(OffsetAndEpoch endOffset) {
         return OptionalLong.of(truncationOffset);
     }
 
+    /**
+     * Create a writable snapshot for the given snapshot id.
+     *
+     * See {@link RawSnapshotWriter} for details on how to use this object.
+     *
+     * @param snapshotId the end offset and epoch that identifies the snapshot
+     * @return a writable snapshot
+     */
+    RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) throws IOException;

Review comment:
       Yeah. I think we will need that when we implement deleting snapshot. Do you mind if I add this later?
   
   Also, I think we are going to need a `readLatestSnapshot()` when the state machine (controller or broker) needs to load the latest valid snapshot. I was planning to add this later when the case was clear to me.







[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r519211390



##########
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##########
@@ -32,25 +32,25 @@
  * This class is responsible for managing the current state of this node and ensuring only
  * valid state transitions.
  *
- * Unattached =>
+ * Unattached transitions to:

Review comment:
       Changes to this file are unrelated to this PR. Made them here so that `docsJar` would succeed.







[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r535474040



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+public final class FileRawSnapshotWriter implements RawSnapshotWriter {
+    private final Path tempSnapshotPath;
+    private final FileChannel channel;
+    private final OffsetAndEpoch snapshotId;
+    private boolean frozen = false;
+
+    private FileRawSnapshotWriter(
+        Path tempSnapshotPath,
+        FileChannel channel,
+        OffsetAndEpoch snapshotId
+    ) {
+        this.tempSnapshotPath = tempSnapshotPath;
+        this.channel = channel;
+        this.snapshotId = snapshotId;
+    }
+
+    @Override
+    public OffsetAndEpoch snapshotId() {
+        return snapshotId;
+    }
+
+    @Override
+    public long sizeInBytes() throws IOException {
+        return channel.size();
+    }
+
+    @Override
+    public void append(ByteBuffer buffer) throws IOException {
+        if (frozen) {
+            throw new IllegalStateException(
+                String.format("Append not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
+            );
+        }
+
+        Utils.writeFully(channel, buffer);
+    }
+
+    @Override
+    public boolean isFrozen() {
+        return frozen;
+    }
+
+    @Override
+    public void freeze() throws IOException {
+        channel.close();
+        frozen = true;
+
+        // Set read-only; fail if the permission change does not succeed
+        if (!tempSnapshotPath.toFile().setReadOnly()) {
+            throw new IOException(String.format("Unable to set file (%s) as read-only", tempSnapshotPath));
+        }
+
+        Path destination = Snapshots.moveRename(tempSnapshotPath, snapshotId);
+        Files.move(tempSnapshotPath, destination, StandardCopyOption.ATOMIC_MOVE);

Review comment:
       I investigated the issue a bit. It looks like this utility was introduced to replace these lines in `OffsetCheckpoint.scala`:
   ```scala
   // swap new offset checkpoint file with previous one
   if(!temp.renameTo(file)) {
       // renameTo() fails on Windows if the destination file exists.
       file.delete()
       if(!temp.renameTo(file))
           throw new IOException(...)
   }
   ```
   
   Looking at the JDK implementation for Windows, `ATOMIC_MOVE` should work on Windows even if the target file exists: https://github.com/openjdk/jdk/blob/master/src/java.base/windows/classes/sun/nio/fs/WindowsFileCopy.java#L298-L312
   
   In other words, I think we can keep the code as is in this PR.
   
   cc @ijuma 
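A sketch of the freeze sequence under discussion: write to a temporary file in the destination directory, set it read-only, then rename it over the final name with `ATOMIC_MOVE`. `FreezeSketch`, `writeAndFreeze` and the `.part` temp suffix are hypothetical. Per the `Files.move` documentation, whether `ATOMIC_MOVE` replaces an existing target is implementation specific; on Linux and other Unix-like systems the JDK performs it with `rename(2)`, which replaces the target, and the Windows case is what the linked JDK source covers. Unlike the PR's `freeze()`, the sketch also calls `FileChannel.force(true)` before closing, an extra durability step added here for illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

final class FreezeSketch {
    /**
     * Writes data to a temporary file next to destination, makes it read-only,
     * then renames it into place with ATOMIC_MOVE. Readers see either the old
     * file or the complete new one, never a partially written file.
     */
    static void writeAndFreeze(Path destination, byte[] data) throws IOException {
        // Same directory as the destination so the move is a plain rename.
        Path temp = destination.resolveSibling(destination.getFileName() + ".part");
        // CREATE_NEW fails if a stale temp file is left over from an earlier attempt.
        try (FileChannel channel = FileChannel.open(temp,
                StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
            ByteBuffer buffer = ByteBuffer.wrap(data);
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
            channel.force(true); // flush contents before the rename makes them visible
        }
        if (!temp.toFile().setReadOnly()) {
            throw new IOException("Unable to set " + temp + " as read-only");
        }
        Files.move(temp, destination, StandardCopyOption.ATOMIC_MOVE);
    }
}
```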







[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r534577659



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+public final class FileRawSnapshotWriter implements RawSnapshotWriter {
+    private final Path path;
+    private final FileChannel channel;
+    private final OffsetAndEpoch snapshotId;
+    private boolean frozen = false;
+
+    private FileRawSnapshotWriter(
+        Path path,
+        FileChannel channel,
+        OffsetAndEpoch snapshotId
+    ) {
+        this.path = path;
+        this.channel = channel;
+        this.snapshotId = snapshotId;
+    }
+
+    @Override
+    public OffsetAndEpoch snapshotId() {
+        return snapshotId;
+    }
+
+    @Override
+    public long sizeInBytes() throws IOException {
+        return channel.size();
+    }
+
+    @Override
+    public void append(ByteBuffer buffer) throws IOException {
+        if (frozen) {
+            throw new IllegalStateException(
+                String.format("Append not supported. Snapshot is already frozen: id = %s; path = %s", snapshotId, path)
+            );
+        }
+
+        Utils.writeFully(channel, buffer);
+    }
+
+    @Override
+    public boolean isFrozen() {
+        return frozen;
+    }
+
+    @Override
+    public void freeze() throws IOException {
+        channel.close();
+        frozen = true;
+
+        // Set read-only; fail if the permission change does not succeed
+        if (!path.toFile().setReadOnly()) {
+            throw new IOException(String.format("Unable to set file %s as read-only", path));
+        }
+
+        Path destination = Snapshots.moveRename(path, snapshotId);
+        Files.move(path, destination, StandardCopyOption.ATOMIC_MOVE);

Review comment:
       I prefer to keep it this way. The atomic move (rename) should always succeed because we guarantee that the source and destination are in the same directory. I would rather have Kafka throw an exception than have the raft client or state machine see a partial snapshot file because a file copy was performed instead of a file rename.
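To make the trade-off concrete, a minimal sketch of a rename that refuses to degrade: it checks that source and target share a directory and turns `AtomicMoveNotSupportedException` into an error instead of retrying without `ATOMIC_MOVE`. By contrast, `Utils.atomicMoveWithFallback` retries with a plain `Files.move(..., REPLACE_EXISTING)` when the atomic move fails, and a non-atomic move across file stores can become a copy followed by a delete. `StrictRename` and `renameOrFail` are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class StrictRename {
    /**
     * Renames source to target atomically or fails. There is deliberately no
     * non-atomic fallback, so a reader can never observe a half-copied file.
     */
    static void renameOrFail(Path source, Path target) throws IOException {
        Path sourceDir = source.toAbsolutePath().normalize().getParent();
        Path targetDir = target.toAbsolutePath().normalize().getParent();
        if (!sourceDir.equals(targetDir)) {
            throw new IllegalArgumentException(
                "Snapshot rename must stay within one directory: " + source + " -> " + target);
        }
        try {
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // Surface the failure instead of degrading to a non-atomic move or copy.
            throw new IOException("Atomic rename not supported for " + source, e);
        }
    }
}
```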







[GitHub] [kafka] lbradstreet commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r534590620



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.NumberFormat;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+final class Snapshots {
+    private static final String SNAPSHOT_DIR = "snapshots";

Review comment:
       @jsancio @hachikuji we saw DeleteRecords calls become pretty expensive and tie up request handler threads when they were made on big partition directories, because each call scanned the directory to find deletable snapshot files. If we keep references to these snapshots in memory after the log is opened, this won't be a big deal. That's what we ended up doing for the producer state snapshots, and it worked well.
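One way to follow this suggestion, sketched under assumptions: build an in-memory sorted index of snapshot ids once when the log is opened, update it on each freeze, and answer "latest snapshot" and "which snapshots are deletable" from memory instead of listing the directory on every request. `SnapshotIndex`, its `Id` type, and the keep-the-latest retention rule are hypothetical, not part of this PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.TreeSet;

final class SnapshotIndex {
    // Hypothetical snapshot id: exclusive end offset plus epoch.
    static final class Id {
        final long endOffset;
        final int epoch;

        Id(long endOffset, int epoch) {
            this.endOffset = endOffset;
            this.epoch = epoch;
        }
    }

    private final TreeSet<Id> ids = new TreeSet<>(
        Comparator.comparingLong((Id id) -> id.endOffset).thenComparingInt(id -> id.epoch));

    /** Called once per file during the startup scan and after each successful freeze. */
    synchronized void add(Id id) {
        ids.add(id);
    }

    /** Latest snapshot, answered without touching the file system. */
    synchronized Optional<Id> latest() {
        return ids.isEmpty() ? Optional.empty() : Optional.of(ids.last());
    }

    /**
     * Removes and returns snapshots whose end offset is strictly below logStartOffset,
     * always keeping the latest one; the caller deletes the returned files.
     */
    synchronized List<Id> removeBefore(long logStartOffset) {
        List<Id> removed = new ArrayList<>();
        while (ids.size() > 1 && ids.first().endOffset < logStartOffset) {
            removed.add(ids.pollFirst());
        }
        return Collections.unmodifiableList(removed);
    }
}
```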







[GitHub] [kafka] hachikuji commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r534487681



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.NumberFormat;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+final class Snapshots {
+    private static final String SNAPSHOT_DIR = "snapshots";

Review comment:
       My preference is probably to keep things flat for now until we figure out what we want the long-term structure to look like. I guess it comes down to the implementation, but intuitively, the only time we'd need to scan would be on startup and we have to do that anyway.







[GitHub] [kafka] hachikuji commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r533732789



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.NumberFormat;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+final class Snapshots {
+    private static final String SNAPSHOT_DIR = "snapshots";
+    private static final String SUFFIX =  ".snapshot";

Review comment:
       This suffix is used for producer state snapshots already. Maybe we could use `.snap` or something like that.
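For illustration only, a file name helper with a distinct suffix. It zero-pads the offset with `java.text.NumberFormat` (which `Snapshots` imports, though the exact format it produces is not shown in this excerpt) and shows that a suffix check for `.snap` does not match producer state files ending in `.snapshot`. The `<offset>-<epoch>.snap` layout and the helper names are assumptions; `Locale.ROOT` is used so the digits do not depend on the default locale.

```java
import java.nio.file.Path;
import java.text.NumberFormat;
import java.util.Locale;

final class SnapshotFileNames {
    // Illustrative suffix following the `.snap` suggestion; producer state
    // snapshots already use ".snapshot".
    static final String SUFFIX = ".snap";

    private static String pad(long value, int digits) {
        // NumberFormat is not thread-safe, so create one per call.
        NumberFormat format = NumberFormat.getInstance(Locale.ROOT);
        format.setMinimumIntegerDigits(digits);
        format.setMaximumFractionDigits(0);
        format.setGroupingUsed(false);
        return format.format(value);
    }

    /** Hypothetical layout: "<20-digit end offset>-<10-digit epoch>.snap". */
    static String fileName(long endOffset, int epoch) {
        return pad(endOffset, 20) + "-" + pad(epoch, 10) + SUFFIX;
    }

    /** True only for files that use the raft snapshot suffix. */
    static boolean isRaftSnapshot(Path path) {
        return path.getFileName().toString().endsWith(SUFFIX);
    }
}
```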

##########
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
##########
@@ -149,6 +152,29 @@ default OptionalLong truncateToEndOffset(OffsetAndEpoch endOffset) {
         return OptionalLong.of(truncationOffset);
     }
 
+    /**
+     * Create a writable snapshot for the given snapshot id.
+     *
+     * See {@link RawSnapshotWriter} for details on how to use this object.
+     *
+     * @param snapshotId the end offset and epoch that identifies the snapshot
+     * @return a writable snapshot
+     */
+    RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) throws IOException;

Review comment:
       Do we also need an api to list snapshots?

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+public final class FileRawSnapshotWriter implements RawSnapshotWriter {
+    private final Path path;
+    private final FileChannel channel;
+    private final OffsetAndEpoch snapshotId;
+    private boolean frozen = false;
+
+    private FileRawSnapshotWriter(
+        Path path,
+        FileChannel channel,
+        OffsetAndEpoch snapshotId
+    ) {
+        this.path = path;
+        this.channel = channel;
+        this.snapshotId = snapshotId;
+    }
+
+    @Override
+    public OffsetAndEpoch snapshotId() {
+        return snapshotId;
+    }
+
+    @Override
+    public long sizeInBytes() throws IOException {
+        return channel.size();
+    }
+
+    @Override
+    public void append(ByteBuffer buffer) throws IOException {
+        if (frozen) {
+            throw new IllegalStateException(
+                String.format("Append not supported. Snapshot is already frozen: id = %s; path = %s", snapshotId, path)
+            );
+        }
+
+        Utils.writeFully(channel, buffer);
+    }
+
+    @Override
+    public boolean isFrozen() {
+        return frozen;
+    }
+
+    @Override
+    public void freeze() throws IOException {
+        channel.close();
+        frozen = true;
+
+        // Set read-only; fail if the permission change does not succeed
+        if (!path.toFile().setReadOnly()) {
+            throw new IOException(String.format("Unable to set file %s as read-only", path));
+        }
+
+        Path destination = Snapshots.moveRename(path, snapshotId);
+        Files.move(path, destination, StandardCopyOption.ATOMIC_MOVE);

Review comment:
       Wonder if we should consider using `Utils.atomicMoveWithFallback`?

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.NumberFormat;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+final class Snapshots {
+    private static final String SNAPSHOT_DIR = "snapshots";

Review comment:
       Is there a tangible benefit to separating snapshots into a new directory? Currently the log directory is a flat structure. I'm wondering if we should stick with convention.

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -100,4 +102,15 @@ default void handleResign() {}
      */
     CompletableFuture<Void> shutdown(int timeoutMs);
 
+    /**
+     * Create a writable snapshot file for a given offset and epoch.
+     *
+     * The RaftClient assumes that the returned snapshot will contain the records up to but
+     * not including the end offset in the snapshot id. See {@link SnapshotWriter} for
+     * details on how to use this object.
+     *
+     * @param snapshotId the end offset and epoch that identifies the snapshot

Review comment:
       Do we assume the offset is below the high watermark?
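If the answer is yes, `createSnapshot` could reject ids past the high watermark. Because the snapshot holds records up to but not including the end offset, an end offset equal to the high watermark still covers only committed records. A hypothetical validation; `SnapshotIdValidation` and its rules are invented for this sketch:

```java
import java.util.OptionalLong;

final class SnapshotIdValidation {
    /**
     * Hypothetical check for createSnapshot: the snapshot holds records up to but
     * not including endOffset, so every included record is committed only if
     * endOffset <= highWatermark. An unknown high watermark rejects the request.
     */
    static void validate(long endOffset, OptionalLong highWatermark) {
        if (!highWatermark.isPresent()) {
            throw new IllegalStateException("Cannot create a snapshot before the high watermark is known");
        }
        if (endOffset > highWatermark.getAsLong()) {
            throw new IllegalArgumentException(String.format(
                "Snapshot end offset %d is greater than the high watermark %d",
                endOffset, highWatermark.getAsLong()));
        }
    }
}
```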

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Iterator;
+import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+public final class FileRawSnapshotReader implements RawSnapshotReader {
+    private final FileRecords fileRecords;
+    private final OffsetAndEpoch snapshotId;
+
+    private FileRawSnapshotReader(FileRecords fileRecords, OffsetAndEpoch snapshotId) {
+        this.fileRecords = fileRecords;
+        this.snapshotId = snapshotId;
+    }
+
+    @Override
+    public OffsetAndEpoch snapshotId() {
+        return snapshotId;
+    }
+
+    @Override
+    public long sizeInBytes() {
+        return fileRecords.sizeInBytes();
+    }
+
+    @Override
+    public Iterator<RecordBatch> iterator() {
+        return new Iterator<RecordBatch>() {

Review comment:
       nit: kind of annoying to wrap the iterator just to avoid the generic warning. An alternative might be to use `<? extends RecordBatch>` in the interface.
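A minimal sketch of the wildcard alternative. It assumes the reader interface stops extending `Iterable` and instead exposes a hypothetical `batches()` method. `Batch`, `FileBatch`, `RawReader` and `ListRawReader` are made-up stand-ins for `RecordBatch`, `FileChannelRecordBatch`, `RawSnapshotReader` and `FileRawSnapshotReader`, so this is not the PR's code:

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for RecordBatch.
interface Batch {
    long baseOffset();
}

// Hypothetical stand-in for FileChannelRecordBatch.
final class FileBatch implements Batch {
    private final long baseOffset;

    FileBatch(long baseOffset) {
        this.baseOffset = baseOffset;
    }

    @Override
    public long baseOffset() {
        return baseOffset;
    }
}

// Hypothetical reader interface: the wildcard lets implementations return an
// iterator over a subtype without wrapping it.
interface RawReader {
    Iterator<? extends Batch> batches();
}

final class ListRawReader implements RawReader {
    private final List<FileBatch> batches;

    ListRawReader(List<FileBatch> batches) {
        this.batches = batches;
    }

    // Covariant return: Iterator<FileBatch> is a subtype of Iterator<? extends Batch>.
    @Override
    public Iterator<FileBatch> batches() {
        return batches.iterator();
    }
}
```

The trade-off is that callers lose the enhanced `for` loop, since the type is no longer `Iterable`.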

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.raft.RecordSerde;
+import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
+import org.apache.kafka.raft.internals.BatchAccumulator;
+
+/**
+ * A type for writing a snapshot for a given end offset and epoch.
+ *
+ * A snapshot writer can be used to append objects until freeze is called. When freeze is
+ * called, the snapshot is validated and marked as immutable. After freeze is called, any
+ * append will fail with an exception.
+ *
+ * It is assumed that the content of the snapshot represents all of the records for the
+ * topic partition from offset 0 up to but not including the end offset in the snapshot
+ * id.
+ *
+ * @see org.apache.kafka.raft.RaftClient#createSnapshot(OffsetAndEpoch)
+ */
+final public class SnapshotWriter<T> implements Closeable {
+    final private RawSnapshotWriter snapshot;
+    final private BatchAccumulator<T> accumulator;
+    final private Time time;
+
+    /**
+     * Initializes a new instance of the class.
+     *
+     * @param snapshot the low level snapshot writer
+     * @param maxBatchSize the maximum size in bytes for a batch
+     * @param memoryPool the memory pool for buffer allocation
+     * @param time the clock implementation
+     * @param compressionType the compression algorithm to use
+     * @param serde the record serialization and deserialization implementation
+     */
+    public SnapshotWriter(
+        RawSnapshotWriter snapshot,
+        int maxBatchSize,
+        MemoryPool memoryPool,
+        Time time,
+        CompressionType compressionType,
+        RecordSerde<T> serde
+    ) {
+        this.snapshot = snapshot;
+        this.time = time;
+
+        this.accumulator = new BatchAccumulator<>(
+            snapshot.snapshotId().epoch,
+            0,
+            Integer.MAX_VALUE,
+            maxBatchSize,
+            memoryPool,
+            time,
+            compressionType,
+            serde
+        );
+    }
+
+    /**
+     * Returns the end offset and epoch for the snapshot.
+     */
+    public OffsetAndEpoch snapshotId() {
+        return snapshot.snapshotId();
+    }
+
+    /**
+     * Returns true if the snapshot has been frozen, otherwise false is returned.
+     *
+     * Modifications to the snapshot are not allowed once it is frozen.
+     */
+    public boolean isFrozen() {
+        return snapshot.isFrozen();
+    }
+
+    /**
+     * Appends a list of values to the snapshot.
+     *
+     * The records in the list are guaranteed to be written together.
+     *
+     * @param records the list of records to append to the snapshot
+     * @throws IOException for any IO error while appending
+     * @throws IllegalStateException if append is called when isFrozen is true
+     */
+    public void append(List<T> records) throws IOException {
+        if (snapshot.isFrozen()) {
+            String message = String.format(
+                "Append not supported. Snapshot is already frozen: id = %s.",
+                snapshot.snapshotId()
+            );
+
+            throw new IllegalStateException(message);
+        }
+
+        accumulator.append(snapshot.snapshotId().epoch, records);
+
+        if (!accumulator.needsDrain(time.milliseconds())) {

Review comment:
       nit: I think this would be more natural:
   ```java
   if (accumulator.needsDrain(time.milliseconds())) {
     appendBatches(accumulator.drain());
   }
   ```
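To show the suggested control flow end to end, here is a toy model of the append path. `ToyAccumulator` is a hypothetical, heavily simplified stand-in for `BatchAccumulator`: it limits batches by record count instead of bytes and has no memory pool or compression. The clock is passed in as `nowMs` rather than read from a `Time` instance, so the example stays deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for BatchAccumulator: batches are capped by
// record count instead of bytes, with no memory pool and no compression.
final class ToyAccumulator<T> {
    private final int maxBatchRecords;
    private final long lingerMs;
    private final List<List<T>> completed = new ArrayList<>();
    private List<T> current = new ArrayList<>();
    private long currentStartMs = 0;

    ToyAccumulator(int maxBatchRecords, long lingerMs) {
        this.maxBatchRecords = maxBatchRecords;
        this.lingerMs = lingerMs;
    }

    void append(List<T> records, long nowMs) {
        // Records from a single append call always land in the same batch.
        if (!current.isEmpty() && current.size() + records.size() > maxBatchRecords) {
            completeCurrent();
        }
        if (current.isEmpty()) {
            currentStartMs = nowMs;
        }
        current.addAll(records);
    }

    // True if a batch is complete or the open batch has lingered long enough.
    boolean needsDrain(long nowMs) {
        return !completed.isEmpty()
            || (!current.isEmpty() && nowMs - currentStartMs >= lingerMs);
    }

    List<List<T>> drain() {
        completeCurrent();
        List<List<T>> result = new ArrayList<>(completed);
        completed.clear();
        return result;
    }

    private void completeCurrent() {
        if (!current.isEmpty()) {
            completed.add(Collections.unmodifiableList(current));
            current = new ArrayList<>();
        }
    }
}

final class ToySnapshotWriter<T> {
    private final ToyAccumulator<T> accumulator;
    private final List<List<T>> written = new ArrayList<>();

    ToySnapshotWriter(ToyAccumulator<T> accumulator) {
        this.accumulator = accumulator;
    }

    void append(List<T> records, long nowMs) {
        accumulator.append(records, nowMs);
        // The positive condition suggested in the review.
        if (accumulator.needsDrain(nowMs)) {
            appendBatches(accumulator.drain());
        }
    }

    private void appendBatches(List<List<T>> batches) {
        written.addAll(batches);
    }

    List<List<T>> written() {
        return Collections.unmodifiableList(written);
    }
}
```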

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+public final class FileRawSnapshotWriter implements RawSnapshotWriter {
+    private final Path path;
+    private final FileChannel channel;
+    private final OffsetAndEpoch snapshotId;
+    private boolean frozen = false;
+
+    private FileRawSnapshotWriter(
+        Path path,
+        FileChannel channel,
+        OffsetAndEpoch snapshotId
+    ) {
+        this.path = path;
+        this.channel = channel;
+        this.snapshotId = snapshotId;
+    }
+
+    @Override
+    public OffsetAndEpoch snapshotId() {
+        return snapshotId;
+    }
+
+    @Override
+    public long sizeInBytes() throws IOException {
+        return channel.size();
+    }
+
+    @Override
+    public void append(ByteBuffer buffer) throws IOException {
+        if (frozen) {
+            throw new IllegalStateException(
+                String.format("Append not supported. Snapshot is already frozen: id = %s; path = %s", snapshotId, path)
+            );
+        }
+
+        Utils.writeFully(channel, buffer);
+    }
+
+    @Override
+    public boolean isFrozen() {
+        return frozen;
+    }
+
+    @Override
+    public void freeze() throws IOException {
+        channel.close();
+        frozen = true;
+
+        // Set the file as read-only; fail if that is not possible
+        if (!path.toFile().setReadOnly()) {
+            throw new IOException(String.format("Unable to set file %s as read-only", path));
+        }
+
+        Path destination = Snapshots.moveRename(path, snapshotId);
+        Files.move(path, destination, StandardCopyOption.ATOMIC_MOVE);
+    }
+
+    @Override
+    public void close() throws IOException {
+        channel.close();
+        Files.deleteIfExists(path);

Review comment:
       Maybe we can just use a better name for `path`, since it makes this code look suspicious.
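One way to address the naming concern, sketched under the assumption that the writer stages bytes in a temporary file and renames it on freeze, which is what `freeze()` above appears to do via `Snapshots.moveRename`. `TempFileSnapshotWriter`, `tempPath` and `finalPath` are hypothetical names. With them, it is visible that `close()` deletes only the staging file.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Hypothetical writer: bytes go to tempPath until freeze() renames it to finalPath.
final class TempFileSnapshotWriter implements AutoCloseable {
    private final Path tempPath;
    private final Path finalPath;
    private final FileChannel channel;
    private boolean frozen = false;

    TempFileSnapshotWriter(Path tempPath, Path finalPath) throws IOException {
        this.tempPath = tempPath;
        this.finalPath = finalPath;
        this.channel = FileChannel.open(tempPath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
    }

    void append(ByteBuffer buffer) throws IOException {
        if (frozen) {
            throw new IllegalStateException(
                String.format("Append not supported. Snapshot is already frozen: path = %s", finalPath));
        }
        // FileChannel.write may write fewer bytes than remain, so loop.
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
    }

    void freeze() throws IOException {
        channel.force(true);
        channel.close();
        if (!tempPath.toFile().setReadOnly()) {
            throw new IOException(String.format("Unable to set file %s as read-only", tempPath));
        }
        Files.move(tempPath, finalPath, StandardCopyOption.ATOMIC_MOVE);
        // Marked frozen only after the move succeeds.
        frozen = true;
    }

    @Override
    public void close() throws IOException {
        channel.close();
        // Only the temporary file is ever deleted. After a successful freeze() it has
        // already been moved, so this is a no-op and finalPath is left intact.
        Files.deleteIfExists(tempPath);
    }
}
```

Unlike the PR's `freeze()`, this sketch sets `frozen` only after the move, so a failed rename leaves the writer unfrozen and `close()` cleans up the staging file.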

##########
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##########
@@ -472,4 +489,110 @@ private EpochStartOffset(int epoch, long startOffset) {
         }
     }
 
+    final class MockRawSnapshotWriter implements RawSnapshotWriter {
+        private final OffsetAndEpoch snapshotId;
+        private ByteBuffer data;

Review comment:
       We could probably use `ByteBufferOutputStream` which already handles expansion.
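A rough sketch of what the mock could look like with a stream that grows on demand. To keep it dependency-free, it uses the JDK's `ByteArrayOutputStream` as a stand-in for Kafka's `ByteBufferOutputStream`. The class name `InMemoryRawSnapshotWriter` and its methods are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Hypothetical mock raw writer; the growable stream replaces manual ByteBuffer resizing.
final class InMemoryRawSnapshotWriter {
    private final ByteArrayOutputStream data = new ByteArrayOutputStream();
    private boolean frozen = false;

    void append(ByteBuffer buffer) {
        if (frozen) {
            throw new IllegalStateException("Append not supported. Snapshot is already frozen");
        }
        // Copy the remaining bytes; this works for heap and direct buffers alike.
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        data.write(bytes, 0, bytes.length);
    }

    long sizeInBytes() {
        return data.size();
    }

    boolean isFrozen() {
        return frozen;
    }

    void freeze() {
        frozen = true;
    }

    // Read-only view of everything appended so far.
    ByteBuffer contents() {
        return ByteBuffer.wrap(data.toByteArray()).asReadOnlyBuffer();
    }
}
```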







[GitHub] [kafka] hachikuji commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r534485088



##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -100,4 +102,15 @@ default void handleResign() {}
      */
     CompletableFuture<Void> shutdown(int timeoutMs);
 
+    /**
+     * Create a writable snapshot file for a given offset and epoch.
+     *
+     * The RaftClient assumes that the snapshot returned will contain the records up to but
+     * not including the end offset in the snapshot id. See {@link SnapshotWriter} for
+     * details on how to use this object.
+     *
+     * @param snapshotId the end offset and epoch that identifies the snapshot

Review comment:
       Just curious. Thought it might be worth mentioning the expectation in the javadoc.







[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r517703953



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/BatchedSnapshotWriter.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.raft.RecordSerde;
+import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
+import org.apache.kafka.raft.internals.BatchAccumulator;
+
+// TODO: Write documentation for this type and all of the methods
+final public class BatchedSnapshotWriter<T> implements Closeable {

Review comment:
       I cover some of the motivation for this here: https://github.com/apache/kafka/pull/9512#discussion_r517703078.
   
   Let's move the conversation there if you agree.







[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r517703078



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+// TODO: Write documentation for this type and all of the methods
+public interface SnapshotReader extends Closeable, Iterable<RecordBatch> {

Review comment:
       This comment applies to some of your other observations. At a high level, there are four use cases that we need to design and implement: two for the Raft implementation and two for the state machine.
   
   ### Raft implementation
   These types are internal to the raft implementation and don't have to be exposed to the state machine.
   #### Leader Use Case
   The leader needs to be able to send a part of the snapshot over the network. Something like this:
   ```java
   interface SnapshotReader extends Closeable {
       long transferTo(long position, long maxBytes, WritableByteChannel channel);
       int read(ByteBuffer buffer, long position);
   }
   ```
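A minimal file-backed sketch of the leader-side methods above. It assumes the snapshot is a single file and relies on `java.nio.channels.FileChannel`'s own `transferTo` and positional `read`. The class name `FileSnapshotChunkReader` is hypothetical.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical leader-side reader for a snapshot stored as a single file.
final class FileSnapshotChunkReader implements AutoCloseable {
    private final FileChannel channel;

    FileSnapshotChunkReader(Path path) throws IOException {
        this.channel = FileChannel.open(path, StandardOpenOption.READ);
    }

    long sizeInBytes() throws IOException {
        return channel.size();
    }

    // Sends up to maxBytes starting at position; returns the bytes actually sent,
    // which can be fewer than requested.
    long transferTo(long position, long maxBytes, WritableByteChannel target) throws IOException {
        long count = Math.min(maxBytes, Math.max(0L, channel.size() - position));
        return channel.transferTo(position, count, target);
    }

    // Positional read: does not move the channel's position; returns -1 at end of file.
    int read(ByteBuffer buffer, long position) throws IOException {
        return channel.read(buffer, position);
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }
}
```

Because `transferTo` may send fewer bytes than requested, a caller filling a fetch response should loop on the returned count.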
   #### Follower Use Case
   The followers need to be able to copy bytes from the network and validate the snapshot on disk when fetching is done.
   ```java
   interface SnapshotWriter extends Closeable {
       void append(ByteBuffer buffer);
       void validate();
       void freeze();
   }
   ```
   ### State machine implementation
   These types are exposed to the state machine.
   #### Load Snapshot
   The state machine needs to be able to load/scan the entire snapshot. The state machine can use `close` to tell the raft client that it finished loading the snapshot. This will be implemented in a future PR, but it could look like this:
   ```java
   interface BatchedSnapshotReader<T> extends Iterable<Iterable<T>>, Closeable {
   }
   ```
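A small sketch of how a state machine might consume that shape. It assumes `close()` is narrowed so that it does not throw `IOException`. `ListBatchedSnapshotReader` and `SummingStateMachine` are hypothetical and exist only to show the nested iteration and the `close()` signal.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Shape proposed above; close() is narrowed to not throw IOException in this sketch.
interface BatchedSnapshotReader<T> extends Iterable<Iterable<T>>, Closeable {
    @Override
    void close();
}

// Hypothetical in-memory implementation, e.g. for tests.
final class ListBatchedSnapshotReader<T> implements BatchedSnapshotReader<T> {
    private final List<Iterable<T>> batches;
    private boolean closed = false;

    ListBatchedSnapshotReader(List<? extends Iterable<T>> batches) {
        this.batches = new ArrayList<>(batches);
    }

    @Override
    public Iterator<Iterable<T>> iterator() {
        return batches.iterator();
    }

    @Override
    public void close() {
        // A real implementation would tell the Raft client that loading is finished.
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

// Hypothetical state machine that sums every value in the snapshot.
final class SummingStateMachine {
    private long total = 0;

    void load(BatchedSnapshotReader<Long> reader) {
        // try-with-resources signals completion by calling close().
        try (BatchedSnapshotReader<Long> snapshot = reader) {
            for (Iterable<Long> batch : snapshot) {
                for (long value : batch) {
                    total += value;
                }
            }
        }
    }

    long total() {
        return total;
    }
}
```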
   #### Generate Snapshot
   The state machine needs to be able to generate a snapshot by appending records/values and marking the snapshot as immutable (`freeze`) when it is done.
   ```java
   interface BatchedSnapshotWriter<T> extends Closeable {
      void append(Iterable<T> records);
      void freeze();
   }
   ```
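A hypothetical in-memory implementation of the generate-snapshot interface, useful for seeing the `freeze` contract. Appends after `freeze()` fail, and closing an unfrozen writer discards its contents, mirroring what `FileRawSnapshotWriter.close()` does with its file. The `String` snapshot id stands in for `OffsetAndEpoch`, and `close()` is narrowed to not throw `IOException`.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Shape proposed above; close() is narrowed to not throw IOException in this sketch.
interface BatchedSnapshotWriter<T> extends Closeable {
    void append(Iterable<T> records);

    void freeze();

    @Override
    void close();
}

// Hypothetical in-memory implementation; the String id stands in for OffsetAndEpoch.
final class InMemoryBatchedSnapshotWriter<T> implements BatchedSnapshotWriter<T> {
    private final String snapshotId;
    private final List<List<T>> batches = new ArrayList<>();
    private boolean frozen = false;

    InMemoryBatchedSnapshotWriter(String snapshotId) {
        this.snapshotId = snapshotId;
    }

    @Override
    public void append(Iterable<T> records) {
        if (frozen) {
            throw new IllegalStateException(String.format(
                "Append not supported. Snapshot is already frozen: id = %s.", snapshotId));
        }
        // Each call becomes one batch, so its records stay together.
        List<T> batch = new ArrayList<>();
        records.forEach(batch::add);
        batches.add(Collections.unmodifiableList(batch));
    }

    @Override
    public void freeze() {
        frozen = true;
    }

    @Override
    public void close() {
        // Closing without freezing discards the partial snapshot.
        if (!frozen) {
            batches.clear();
        }
    }

    List<List<T>> batches() {
        return Collections.unmodifiableList(batches);
    }
}
```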
   
   ### Notes
   
   `SnapshotWriter` and `SnapshotReader` need to be interfaces because we will have a real implementation and a mocked implementation for testing. These two types are internal to raft and are not exposed to the state machine.
   
   `BatchedSnapshotReader` and `BatchedSnapshotWriter` can depend on `SnapshotWriter` and `SnapshotReader` to reuse some code, but this is not strictly required. These two types don't have to be interfaces if they delegate the IO to `SnapshotWriter` and `SnapshotReader`.
   
   What do you think @hachikuji?







[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r533828329



##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -100,4 +102,15 @@ default void handleResign() {}
      */
     CompletableFuture<Void> shutdown(int timeoutMs);
 
+    /**
+     * Create a writable snapshot file for a given offset and epoch.
+     *
+     * The RaftClient assumes that the snapshot returned will contain the records up to but
+     * not including the end offset in the snapshot id. See {@link SnapshotWriter} for
+     * details on how to use this object.
+     *
+     * @param snapshotId the end offset and epoch that identifies the snapshot

Review comment:
       Is there a specific reason why you are asking this? We don't currently check for this. I will add a check, and we can relax it later if needed.







[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r534489751



##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -100,4 +102,15 @@ default void handleResign() {}
      */
     CompletableFuture<Void> shutdown(int timeoutMs);
 
+    /**
+     * Create a writable snapshot file for a given offset and epoch.
+     *
+     * The RaftClient assumes that the snapshot returned will contain the records up to but
+     * not including the end offset in the snapshot id. See {@link SnapshotWriter} for
+     * details on how to use this object.
+     *
+     * @param snapshotId the end offset and epoch that identifies the snapshot

Review comment:
       Yeah. I'll create a Jira to validate the snapshot id. We should perform at least the following validations:
   1. It is less than the high watermark.
   2. It is a valid epoch and end offset based on the log. I think we can use the leader epoch cache to check this.
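A sketch of the two checks, under stated assumptions. The leader epoch cache is modeled as a hypothetical `NavigableMap` from epoch to the first offset written in that epoch, and the snapshot's epoch is assumed to be the epoch of its last record (`endOffset - 1`). Because the end offset is exclusive, the sketch also accepts `endOffset == highWatermark`; that boundary is an assumption too. `SnapshotId` and `SnapshotIdValidator` are made-up names, not Kafka APIs.

```java
import java.util.Map;
import java.util.NavigableMap;

// Hypothetical stand-in for OffsetAndEpoch; endOffset is exclusive.
final class SnapshotId {
    final long endOffset;
    final int epoch;

    SnapshotId(long endOffset, int epoch) {
        this.endOffset = endOffset;
        this.epoch = epoch;
    }
}

final class SnapshotIdValidator {
    // Hypothetical leader epoch cache: epoch -> first offset written in that epoch.
    private final NavigableMap<Integer, Long> epochStartOffsets;

    SnapshotIdValidator(NavigableMap<Integer, Long> epochStartOffsets) {
        this.epochStartOffsets = epochStartOffsets;
    }

    void validate(SnapshotId id, long highWatermark) {
        // 1. Only committed records may be in the snapshot. The end offset is
        //    exclusive, so endOffset == highWatermark is accepted (an assumption).
        if (id.endOffset > highWatermark) {
            throw new IllegalArgumentException(String.format(
                "Snapshot end offset %d is above the high watermark %d", id.endOffset, highWatermark));
        }
        // 2. The epoch must be known, and the last record (endOffset - 1) must fall in it.
        Long start = epochStartOffsets.get(id.epoch);
        if (start == null) {
            throw new IllegalArgumentException(String.format("Unknown epoch %d", id.epoch));
        }
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(id.epoch);
        boolean inEpoch = id.endOffset > start && (next == null || id.endOffset <= next.getValue());
        if (!inEpoch) {
            throw new IllegalArgumentException(String.format(
                "End offset %d does not belong to epoch %d", id.endOffset, id.epoch));
        }
    }
}
```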







[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r534572296



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Iterator;
+import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+public final class FileRawSnapshotReader implements RawSnapshotReader {
+    private final FileRecords fileRecords;
+    private final OffsetAndEpoch snapshotId;
+
+    private FileRawSnapshotReader(FileRecords fileRecords, OffsetAndEpoch snapshotId) {
+        this.fileRecords = fileRecords;
+        this.snapshotId = snapshotId;
+    }
+
+    @Override
+    public OffsetAndEpoch snapshotId() {
+        return snapshotId;
+    }
+
+    @Override
+    public long sizeInBytes() {
+        return fileRecords.sizeInBytes();
+    }
+
+    @Override
+    public Iterator<RecordBatch> iterator() {
+        return new Iterator<RecordBatch>() {

Review comment:
       I am not sure if we can do that. The interface `RawSnapshotReader` extends `Iterable<RecordBatch>`, and `Iterable` declares the method as `Iterator<T> iterator()`. As you point out, they should have declared it as `Iterator<? extends T> iterator()`, since an `Iterator` only produces values and is therefore safe to treat covariantly.
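If the wrapper stays, one option is to write it once as a generic helper rather than as an anonymous class in each reader. `Iterators.widen` below is a hypothetical utility, not part of Kafka or the JDK. It needs no unchecked cast because the adapter only hands values out.

```java
import java.util.Iterator;

// Hypothetical utility class; not part of Kafka or the JDK.
final class Iterators {
    private Iterators() {
    }

    // Views an Iterator<? extends T> as an Iterator<T>. No unchecked cast is
    // needed: every value returned by next() is already a T.
    static <T> Iterator<T> widen(Iterator<? extends T> delegate) {
        return new Iterator<T>() {
            @Override
            public boolean hasNext() {
                return delegate.hasNext();
            }

            @Override
            public T next() {
                return delegate.next();
            }

            @Override
            public void remove() {
                delegate.remove();
            }
        };
    }
}
```

`FileRawSnapshotReader.iterator()` could then return `Iterators.widen(...)` applied to whatever `Iterator<FileChannelRecordBatch>` it already builds.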



