Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2021/03/10 00:18:49 UTC

[GitHub] [bookkeeper] sursingh opened a new pull request #2641: Add checksum validation for SST files

sursingh opened a new pull request #2641:
URL: https://github.com/apache/bookkeeper/pull/2641


   
   ### Motivation
   
   Normally, SST files are immutable, so an SST file from a previous checkpoint
   can be reused in subsequent checkpoints. We rely on this to avoid
   unnecessary uploads of SST files.
   
   However, there are scenarios in which comparing only the file names does not
   work. The checkpoint process might not complete (for example, due to a crash
   or restart), and in such cases stale SST files are left behind. When the
   storage container restarts, it is correctly restored from the previous
   checkpoint. When we then checkpoint this new state, new SST files are
   created, and they may have the same names as the stale files. Since we
   compare only the SST file names, we assume that these files are already
   available in the checkpoint store.
   
   At best, the sizes of the new files will not match and the restore will
   fail. But if the sizes do match, the restore will succeed and the state
   store will contain invalid data.
   
   ### Changes
   
   With this change, we compute a checksum for each SST file and append it to
   the file name when the SST file is uploaded. This ensures that the correct
   files are always uploaded.
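A minimal sketch of appending a content checksum to an SST file name before upload. The `<name>_<checksum>` layout, the choice of SHA-256, and the helper names are assumptions for illustration only; the PR's `CheckpointFile` imports Guava's `Hashing`, and its exact name format is not shown in this thread.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class ChecksumNaming {

    // Stream the file through SHA-256 so large SST files are not loaded into memory.
    static String checksum(Path file) throws IOException {
        MessageDigest md;
        try {
            md = MessageDigest.getInstance("SHA-256");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
        try (InputStream in = Files.newInputStream(file)) {
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                md.update(buf, 0, n);
            }
        }
        // Hex-encode the digest; %02x treats each byte as unsigned.
        StringBuilder hex = new StringBuilder();
        for (byte b : md.digest()) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    // Hypothetical remote name: the original file name, "_", then the checksum.
    static String remoteName(Path file) throws IOException {
        return file.getFileName().toString() + "_" + checksum(file);
    }
}
```

Because the remote name now depends on the contents, a stale file and a new file that share `000009.sst` map to different remote names, so the name comparison no longer confuses them.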
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bookkeeper] merlimat merged pull request #2641: Add checksum validation for SST files

Posted by GitBox <gi...@apache.org>.
merlimat merged pull request #2641:
URL: https://github.com/apache/bookkeeper/pull/2641


   





[GitHub] [bookkeeper] sursingh commented on a change in pull request #2641: Add checksum validation for SST files

Posted by GitBox <gi...@apache.org>.
sursingh commented on a change in pull request #2641:
URL: https://github.com/apache/bookkeeper/pull/2641#discussion_r591738969



##########
File path: stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksdbRestoreTask.java
##########
@@ -67,65 +67,10 @@ public void restore(String checkpointId, CheckpointMetadata metadata) throws Sta
         }
     }
 
-    private List<String> getFilesToCopy(String checkpointId,
-                                        File checkpointedDir,
-                                        CheckpointMetadata metadata) throws IOException {
-        if (!checkpointedDir.exists()) {
-            Files.createDirectories(
-                Paths.get(checkpointedDir.getAbsolutePath()));
-        }
-
-        List<String> filesToCopy = Lists.newArrayListWithExpectedSize(metadata.getFilesCount());
-        for (String fileName : metadata.getFilesList()) {
-            File localFile = new File(checkpointedDir, fileName);
-            if (!localFile.exists()) {
-                filesToCopy.add(fileName);
-                continue;
-            }
-
-            String srcFile;
-            if (RocksUtils.isSstFile(localFile)) {
-                srcFile = RocksUtils.getDestSstPath(dbPrefix, localFile);
-            } else {
-                srcFile = RocksUtils.getDestPath(dbPrefix, checkpointId, localFile);
-            }
-
-            long srcFileLength = checkpointStore.getFileLength(srcFile);
-            long localFileLength = localFile.length();
-            if (srcFileLength != localFileLength) {
-                filesToCopy.add(fileName);
-            }
-        }
-
-        return filesToCopy;
-    }
-
     private void copyFilesFromRemote(String checkpointId,
-                                     File checkpointedDir,
-                                     List<String> remoteFiles) throws IOException {
-        for (String file : remoteFiles) {
-            copyFileFromRemote(checkpointId, checkpointedDir, file);
+                                     List<CheckpointFile> remoteFiles) throws IOException {
+        for (CheckpointFile file : remoteFiles) {
+            file.copyFromRemote(checkpointStore, dbPrefix, checkpointId);

Review comment:
       No, that will not work. The files need to be renamed to `*.sst` for the RocksDB restore process to work. There are also other places where we check whether the name ends with `.sst` to determine whether a file is an SST file.
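The rename requirement above can be sketched as a mapping from the remote name back to the local name RocksDB expects. This assumes a hypothetical `<name>.sst_<checksum>` remote layout (not confirmed by the thread); names without a checksum suffix, as in old checkpoints, pass through unchanged.

```java
// Hypothetical helpers; the real remote naming scheme may differ.
final class RestoreNaming {

    private static final String SST_SUFFIX = ".sst";

    // Recover "000009.sst" from "000009.sst_<checksum>". Names without the
    // "_<checksum>" part (old checkpoints, MANIFEST, CURRENT, ...) are unchanged.
    static String localName(String remoteName) {
        int idx = remoteName.lastIndexOf(SST_SUFFIX + "_");
        if (idx < 0) {
            return remoteName;
        }
        return remoteName.substring(0, idx + SST_SUFFIX.length());
    }

    // Suffix-based check, like the "ends with .sst" checks mentioned above.
    static boolean isSstFile(String name) {
        return name.endsWith(SST_SUFFIX);
    }
}
```

Note that `isSstFile("000009.sst_<checksum>")` is false, which is why the file must be renamed locally before RocksDB and the suffix-based checks see it.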







[GitHub] [bookkeeper] sursingh commented on a change in pull request #2641: Add checksum validation for SST files

Posted by GitBox <gi...@apache.org>.
sursingh commented on a change in pull request #2641:
URL: https://github.com/apache/bookkeeper/pull/2641#discussion_r592822551



##########
File path: stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksdbRestoreTask.java
##########
     private void copyFilesFromRemote(String checkpointId,
-                                     File checkpointedDir,
-                                     List<String> remoteFiles) throws IOException {
-        for (String file : remoteFiles) {
-            copyFileFromRemote(checkpointId, checkpointedDir, file);
+                                     List<CheckpointFile> remoteFiles) throws IOException {
+        for (CheckpointFile file : remoteFiles) {
+            file.copyFromRemote(checkpointStore, dbPrefix, checkpointId);

Review comment:
       I don't think that will help here.







[GitHub] [bookkeeper] sursingh commented on a change in pull request #2641: Add checksum validation for SST files

Posted by GitBox <gi...@apache.org>.
sursingh commented on a change in pull request #2641:
URL: https://github.com/apache/bookkeeper/pull/2641#discussion_r591626406



##########
File path: stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointerTest.java
##########
@@ -349,6 +347,65 @@ public void testCheckpointOrder() throws Exception {
         }
     }
 
+    @Test
+    public void testStaleSSTFile() throws Exception {
+        final int numKvs = 100;
+        TestStateStore testStore = new TestStateStore(
+            runtime.getMethodName(), localDir, remoteDir, true, false);
+
+        store.close();
+
+        testStore.enableCheckpoints(true);
+        testStore.init();
+
+        testStore.addNumKVs("transaction-1", numKvs, 0);
+        // create base checkpoint
+        String baseCheckpoint = testStore.checkpoint("checkpoint-1");
+        testStore.restore();
+
+        testStore.addNumKVs("transaction-2", numKvs, 100);
+        // create failed checkpoint
+        String failedCheckpoint = testStore.checkpoint("checkpoint-2");
+        // Remove metadata from the checkpoint to signal failure
+
+        CheckpointInfo checkpoint = testStore.getLatestCheckpoint();
+        testStore.corruptCheckpoint(checkpoint);
+
+        // restore : this should restore from base checkpoint
+        testStore.destroyLocal();
+        testStore.restore();
+        assertEquals("transaction-1", testStore.get("transaction-id"));
+
+        testStore.addNumKVs("transaction-3", numKvs * 3, 200);
+        // create another test checkpoint
+        String newCheckpoint = testStore.checkpoint("checkpoint-3");
+
+        // Ensure latest checkpoint can be restored.
+        testStore.destroyLocal();
+        testStore.restore();
+        assertEquals("transaction-3", testStore.get("transaction-id"));
+    }
+
+    @Test
+    public void testRestoreOldCheckpointWithoutChecksum() throws Exception {

Review comment:
       @eolivelli: This is enabled by default. The change does handle restoring from old checkpoints that have no checksum. Andrey's concern is about downgrading from the new version to an old version that does not support checksums. In that case, we will not be able to restore from the checkpoint. I will look into it and see whether we can support this seamlessly.
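Restoring old checkpoints without checksums can be sketched as a fallback when reading the metadata: use the checksum-aware entries when present, otherwise treat each plain name as a file with no checksum. The types below are hypothetical stand-ins for the generated `CheckpointMetadata` / `FileInfo` protobuf classes, not their real API.

```java
import java.util.ArrayList;
import java.util.List;

final class MetadataCompat {

    // Hypothetical stand-in for FileInfo; checksum is null for old checkpoints.
    static final class FileEntry {
        final String name;
        final String checksum;

        FileEntry(String name, String checksum) {
            this.name = name;
            this.checksum = checksum;
        }
    }

    // Hypothetical stand-in for CheckpointMetadata: old writers fill only
    // "files", checksum-aware writers fill "fileInfos".
    static final class Metadata {
        final List<String> files;
        final List<FileEntry> fileInfos;

        Metadata(List<String> files, List<FileEntry> fileInfos) {
            this.files = files;
            this.fileInfos = fileInfos;
        }
    }

    // Prefer checksum-aware entries; fall back to plain names for old checkpoints.
    static List<FileEntry> filesToRestore(Metadata md) {
        if (!md.fileInfos.isEmpty()) {
            return md.fileInfos;
        }
        List<FileEntry> out = new ArrayList<>();
        for (String name : md.files) {
            out.add(new FileEntry(name, null));
        }
        return out;
    }
}
```

The reverse direction (an old reader facing a checkpoint that only has `fileInfos`) is what this fallback cannot fix, which is the downgrade concern raised here.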

##########
File path: stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/CheckpointFile.java
##########
@@ -0,0 +1,212 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint;
+
+import com.google.common.hash.Hashing;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
+import org.apache.bookkeeper.statelib.impl.rocksdb.RocksUtils;
+import org.apache.bookkeeper.stream.proto.kv.store.CheckpointMetadata;
+import org.apache.bookkeeper.stream.proto.kv.store.FileInfo;
+
+/**
+ * CheckpointFile encapsulates the attributes and operations for a file in checkpoint.
+ */
+@Slf4j
+public class CheckpointFile {
+    private final File file;
+    private final String checksum;
+    private final boolean isSstFile;
+
+    CheckpointFile(File file) {
+        this.file = file;
+        this.checksum = computeChecksum();
+        this.isSstFile = RocksUtils.isSstFile(this.file);
+    }
+
+    CheckpointFile(File checkpointDir, String filename) {
+        this.file = new File(checkpointDir, filename);
+        this.checksum = null;

Review comment:
       Thanks. Will look into it.

##########
File path: stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/CheckpointFile.java
##########
+    CheckpointFile(File file) {
+        this.file = file;
+        this.checksum = computeChecksum();

Review comment:
       I can add the timestamp to the invalid checksum.







[GitHub] [bookkeeper] sursingh commented on pull request #2641: Add checksum validation for SST files

Posted by GitBox <gi...@apache.org>.
sursingh commented on pull request #2641:
URL: https://github.com/apache/bookkeeper/pull/2641#issuecomment-802085803


   rerun failed tests





[GitHub] [bookkeeper] dlg99 commented on a change in pull request #2641: Add checksum validation for SST files

Posted by GitBox <gi...@apache.org>.
dlg99 commented on a change in pull request #2641:
URL: https://github.com/apache/bookkeeper/pull/2641#discussion_r591677921



##########
File path: stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksdbRestoreTask.java
##########
     private void copyFilesFromRemote(String checkpointId,
-                                     File checkpointedDir,
-                                     List<String> remoteFiles) throws IOException {
-        for (String file : remoteFiles) {
-            copyFileFromRemote(checkpointId, checkpointedDir, file);
+                                     List<CheckpointFile> remoteFiles) throws IOException {
+        for (CheckpointFile file : remoteFiles) {
+            file.copyFromRemote(checkpointStore, dbPrefix, checkpointId);

Review comment:
       I see, thank you. 
   
   Suppose the name+checksum gets added to both `FileInfos` and `Files`.
   Would that be good enough for an older bookie to use the checkpoint?







[GitHub] [bookkeeper] dlg99 commented on a change in pull request #2641: Add checksum validation for SST files

Posted by GitBox <gi...@apache.org>.
dlg99 commented on a change in pull request #2641:
URL: https://github.com/apache/bookkeeper/pull/2641#discussion_r592576912



##########
File path: stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksdbRestoreTask.java
##########
     private void copyFilesFromRemote(String checkpointId,
-                                     File checkpointedDir,
-                                     List<String> remoteFiles) throws IOException {
-        for (String file : remoteFiles) {
-            copyFileFromRemote(checkpointId, checkpointedDir, file);
+                                     List<CheckpointFile> remoteFiles) throws IOException {
+        for (CheckpointFile file : remoteFiles) {
+            file.copyFromRemote(checkpointStore, dbPrefix, checkpointId);

Review comment:
       Would it help if we used a relative path there, like `./{checksum}/../00009.sst`?
   I am not sure whether [normalize()](https://docs.oracle.com/javase/8/docs/api/java/nio/file/Path.html#normalize--) happens automatically when the new file is created.
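A quick check of the normalization question: neither `Paths.get` nor `java.io.File` normalizes `..` segments on its own; `Path.normalize()` has to be called explicitly. If the raw path is handed to the operating system instead, POSIX path resolution walks through the `{checksum}` component as a directory, so that directory typically has to exist. The `abc123` segment below is a hypothetical stand-in for a checksum.

```java
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;

final class RelativePathCheck {

    // Explicit, purely lexical normalization (no file system access).
    static String normalized(String p) {
        return Paths.get(p).normalize().toString();
    }

    public static void main(String[] args) {
        String raw = "./abc123/../00009.sst";
        Path path = Paths.get(raw);
        System.out.println(path);                     // ./abc123/../00009.sst (kept as written)
        System.out.println(new File(raw).getPath());  // ./abc123/../00009.sst (File does not normalize either)
        System.out.println(normalized(raw));          // 00009.sst
        System.out.println(path.getFileName());       // 00009.sst
    }
}
```

Since the last path element is still `00009.sst` either way, the checksum in an intermediate `..` segment would not survive into the local file name, which may be why the author doubts it helps.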
   







[GitHub] [bookkeeper] sursingh commented on a change in pull request #2641: Add checksum validation for SST files

Posted by GitBox <gi...@apache.org>.
sursingh commented on a change in pull request #2641:
URL: https://github.com/apache/bookkeeper/pull/2641#discussion_r591625882



##########
File path: stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksdbCheckpointTask.java
##########
@@ -144,64 +149,38 @@ public String checkpoint(byte[] txid) throws StateStoreException {
         }
     }
 
-    private List<File> getFilesToCopy(File checkpointedDir) throws IOException {
-        File[] files = checkpointedDir.listFiles();
-
-        List<File> fileToCopy = Lists.newArrayListWithExpectedSize(files.length);
-        for (File file : files) {
-            if (RocksUtils.isSstFile(file)) {
-                // sst files
-                String destSstPath = RocksUtils.getDestSstPath(dbPrefix, file);
-                // TODO: do more validation on the file
-                if (!checkpointStore.fileExists(destSstPath)) {
-                    fileToCopy.add(file);
-                }
-            } else {
-                fileToCopy.add(file);
-            }
-        }
-        return fileToCopy;
-    }
-
-    private void copyFilesToDest(String checkpointId, List<File> files) throws IOException {
-        for (File file : files) {
-            copyFileToDest(checkpointId, file);
-        }
-
-    }
-
     /**
      * All sst files are copied to checkpoint location first.
      */
-    private void copyFileToDest(String checkpointId, File file) throws IOException {
-        String destPath = RocksUtils.getDestPath(dbPrefix, checkpointId, file);
-        try (OutputStream os = checkpointStore.openOutputStream(destPath)) {
-            Files.copy(file, os);
+    private void copyFilesToDest(String checkpointId, List<CheckpointFile> files) throws IOException {
+        for (CheckpointFile file : files) {
+            file.copyToRemote(checkpointStore, dbPrefix, checkpointId);
         }
     }
 
     /**
      * Move the sst files to a common location.
      */
-    private void finalizeCopyFiles(String checkpointId, List<File> files) throws IOException {
-        for (File file : files) {
-            if (RocksUtils.isSstFile(file)) {
-                String destSstTempPath = RocksUtils.getDestTempSstPath(
-                    dbPrefix, checkpointId, file);
-                String destSstPath = RocksUtils.getDestSstPath(dbPrefix, file);
-                checkpointStore.rename(destSstTempPath, destSstPath);
-            }
+    private void finalizeCopyFiles(String checkpointId,
+                                   List<CheckpointFile> files,
+                                   boolean enableChecksum) throws IOException {
+        for (CheckpointFile file : files) {
+            file.finalize(checkpointStore, dbPrefix, checkpointId, enableChecksum);
         }
     }
 
-    private void finalizeCheckpoint(String checkpointId,
-                                    File checkpointedDir,
-                                    byte[] txid) throws IOException {
-        File[] files = checkpointedDir.listFiles();
+    private void finalizeCheckpoint(List<CheckpointFile> files,
+                                    String checkpointId,
+                                    byte[] txid,
+                                    boolean enableChecksum) throws IOException {
 
         CheckpointMetadata.Builder metadataBuilder = CheckpointMetadata.newBuilder();
-        for (File file : files) {
-            metadataBuilder.addFiles(file.getName());
+        for (CheckpointFile file : files) {
+            if (enableChecksum) {
+                metadataBuilder.addFileInfos(file.getFileInfo());
+            } else {
+                metadataBuilder.addFiles(file.getName());

Review comment:
       I will look into adding an option to retain the files without checksums, with a config setting to control it. This can be enabled by default.







[GitHub] [bookkeeper] sursingh commented on a change in pull request #2641: Add checksum validation for SST files

Posted by GitBox <gi...@apache.org>.
sursingh commented on a change in pull request #2641:
URL: https://github.com/apache/bookkeeper/pull/2641#discussion_r591626945



##########
File path: stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksdbRestoreTask.java
##########
     private void copyFilesFromRemote(String checkpointId,
-                                     File checkpointedDir,
-                                     List<String> remoteFiles) throws IOException {
-        for (String file : remoteFiles) {
-            copyFileFromRemote(checkpointId, checkpointedDir, file);
+                                     List<CheckpointFile> remoteFiles) throws IOException {
+        for (CheckpointFile file : remoteFiles) {
+            file.copyFromRemote(checkpointStore, dbPrefix, checkpointId);

Review comment:
       RocksDB already validates the contents of SST files. Here we want to ensure that the correct SST file is used with the checkpoint. Essentially, we are giving each SST file a unique name based on its contents.
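Content-based names also change the upload decision: a file is skipped only when a remote file with the same name *and* checksum already exists. A minimal sketch, assuming a hypothetical `<name>_<checksum>` remote layout and an in-memory set standing in for the checkpoint store's listing:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class UploadPlanner {

    // localChecksums: local SST file name -> content checksum.
    // remoteNames: names already present in the (hypothetical) checkpoint store.
    // Returns the remote names that still have to be uploaded.
    static List<String> toUpload(Map<String, String> localChecksums, Set<String> remoteNames) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> e : localChecksums.entrySet()) {
            String remote = e.getKey() + "_" + e.getValue();
            if (!remoteNames.contains(remote)) {
                // New name or same name with different contents: upload it.
                out.add(remote);
            }
        }
        return out;
    }
}
```

A stale `000009.sst_aaa` left by a failed checkpoint no longer satisfies the check for a new `000009.sst` whose checksum is `bbb`, while unchanged files are still deduplicated.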







[GitHub] [bookkeeper] sursingh commented on pull request #2641: Add checksum validation for SST files

Posted by GitBox <gi...@apache.org>.
sursingh commented on pull request #2641:
URL: https://github.com/apache/bookkeeper/pull/2641#issuecomment-797144957


   @dlg99: Added support for downgrading to an older version. There is a config knob, enabled by default, that creates files with regular names so that older versions can restore from the checkpoint.
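The `finalizeCheckpoint` hunk quoted earlier in this thread writes either checksum-aware `FileInfo` entries or plain file names, depending on `enableChecksum`. A minimal sketch of that branch with plain Java types (hypothetical stand-ins, not the generated protobuf builder); how the downgrade knob described here maps onto this flag is not shown in the thread.

```java
import java.util.ArrayList;
import java.util.List;

final class MetadataWriterSketch {

    // Hypothetical stand-in for a checkpoint file with its content checksum.
    static final class FileEntry {
        final String name;
        final String checksum;

        FileEntry(String name, String checksum) {
            this.name = name;
            this.checksum = checksum;
        }
    }

    // Hypothetical stand-in for CheckpointMetadata: exactly one list gets filled.
    static final class Metadata {
        final List<String> files = new ArrayList<>();
        final List<FileEntry> fileInfos = new ArrayList<>();
    }

    // Mirrors the quoted branch: checksum-aware entries when enabled,
    // plain names (readable by older versions) otherwise.
    static Metadata build(List<FileEntry> checkpointFiles, boolean enableChecksum) {
        Metadata md = new Metadata();
        for (FileEntry f : checkpointFiles) {
            if (enableChecksum) {
                md.fileInfos.add(f);
            } else {
                md.files.add(f.name);
            }
        }
        return md;
    }
}
```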





[GitHub] [bookkeeper] eolivelli commented on a change in pull request #2641: Add checksum validation for SST files

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #2641:
URL: https://github.com/apache/bookkeeper/pull/2641#discussion_r591165841



##########
File path: stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointerTest.java
##########
+    @Test
+    public void testRestoreOldCheckpointWithoutChecksum() throws Exception {

Review comment:
       +1
   Yes, please add a compatibility test.
   Also, can you please state in the description how to enable this feature?
   
   Would it be possible to enable this feature by default and handle old SST files that are missing the checksum?
   Otherwise, I bet that no one will use this feature.




[GitHub] [bookkeeper] dlg99 commented on a change in pull request #2641: Add checksum validation for SST files

Posted by GitBox <gi...@apache.org>.
dlg99 commented on a change in pull request #2641:
URL: https://github.com/apache/bookkeeper/pull/2641#discussion_r590871590



##########
File path: stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/CheckpointFile.java
##########
@@ -0,0 +1,212 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint;
+
+import com.google.common.hash.Hashing;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
+import org.apache.bookkeeper.statelib.impl.rocksdb.RocksUtils;
+import org.apache.bookkeeper.stream.proto.kv.store.CheckpointMetadata;
+import org.apache.bookkeeper.stream.proto.kv.store.FileInfo;
+
+/**
+ * CheckpointFile encapsulates the attributes and operations for a file in checkpoint.
+ */
+@Slf4j
+public class CheckpointFile {
+    private final File file;
+    private final String checksum;
+    private final boolean isSstFile;
+
+    CheckpointFile(File file) {
+        this.file = file;
+        this.checksum = computeChecksum();
+        this.isSstFile = RocksUtils.isSstFile(this.file);
+    }
+
+    CheckpointFile(File checkpointDir, String filename) {
+        this.file = new File(checkpointDir, filename);
+        this.checksum = null;
+        this.isSstFile = RocksUtils.isSstFile(this.file);
+    }
+
+    CheckpointFile(File checkpointDir, FileInfo fileInfo) {
+        this.file = new File(checkpointDir, fileInfo.getName());
+        this.checksum = fileInfo.getChecksum();
+        this.isSstFile = RocksUtils.isSstFile(this.file);
+    }
+
+    @Override
+    public boolean equals(Object o) {

Review comment:
      FYI, the Lombok `@EqualsAndHashCode` annotation on the class can take care of both `equals` and `hashCode`.
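For reference, here is a hand-written equivalent of the field-based equality that the annotation would replace. This is a sketch using a simplified, hypothetical stand-in class (`CheckpointFileSketch`), and it assumes all three fields take part in equality, which is Lombok's default. Lombok's generated code differs in detail but has the same equality semantics.

```java
import java.io.File;
import java.util.Objects;

// Simplified, hypothetical stand-in for CheckpointFile. Annotating the real
// class with Lombok's @EqualsAndHashCode would generate field-based methods
// with equivalent equality semantics to the two hand-written ones below.
final class CheckpointFileSketch {
    private final File file;
    private final String checksum;   // null when no checksum is known
    private final boolean isSstFile;

    CheckpointFileSketch(File file, String checksum, boolean isSstFile) {
        this.file = file;
        this.checksum = checksum;
        this.isSstFile = isSstFile;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof CheckpointFileSketch)) {
            return false;
        }
        CheckpointFileSketch other = (CheckpointFileSketch) o;
        // Null-safe comparison of every field.
        return isSstFile == other.isSstFile
            && Objects.equals(file, other.file)
            && Objects.equals(checksum, other.checksum);
    }

    @Override
    public int hashCode() {
        // Must use the same fields as equals().
        return Objects.hash(file, checksum, isSstFile);
    }
}
```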

##########
File path: stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksdbCheckpointTask.java
##########
@@ -144,64 +149,38 @@ public String checkpoint(byte[] txid) throws StateStoreException {
         }
     }
 
-    private List<File> getFilesToCopy(File checkpointedDir) throws IOException {
-        File[] files = checkpointedDir.listFiles();
-
-        List<File> fileToCopy = Lists.newArrayListWithExpectedSize(files.length);
-        for (File file : files) {
-            if (RocksUtils.isSstFile(file)) {
-                // sst files
-                String destSstPath = RocksUtils.getDestSstPath(dbPrefix, file);
-                // TODO: do more validation on the file
-                if (!checkpointStore.fileExists(destSstPath)) {
-                    fileToCopy.add(file);
-                }
-            } else {
-                fileToCopy.add(file);
-            }
-        }
-        return fileToCopy;
-    }
-
-    private void copyFilesToDest(String checkpointId, List<File> files) throws IOException {
-        for (File file : files) {
-            copyFileToDest(checkpointId, file);
-        }
-
-    }
-
     /**
      * All sst files are copied to checkpoint location first.
      */
-    private void copyFileToDest(String checkpointId, File file) throws IOException {
-        String destPath = RocksUtils.getDestPath(dbPrefix, checkpointId, file);
-        try (OutputStream os = checkpointStore.openOutputStream(destPath)) {
-            Files.copy(file, os);
+    private void copyFilesToDest(String checkpointId, List<CheckpointFile> files) throws IOException {
+        for (CheckpointFile file : files) {
+            file.copyToRemote(checkpointStore, dbPrefix, checkpointId);
         }
     }
 
     /**
      * Move the sst files to a common location.
      */
-    private void finalizeCopyFiles(String checkpointId, List<File> files) throws IOException {
-        for (File file : files) {
-            if (RocksUtils.isSstFile(file)) {
-                String destSstTempPath = RocksUtils.getDestTempSstPath(
-                    dbPrefix, checkpointId, file);
-                String destSstPath = RocksUtils.getDestSstPath(dbPrefix, file);
-                checkpointStore.rename(destSstTempPath, destSstPath);
-            }
+    private void finalizeCopyFiles(String checkpointId,
+                                   List<CheckpointFile> files,
+                                   boolean enableChecksum) throws IOException {
+        for (CheckpointFile file : files) {
+            file.finalize(checkpointStore, dbPrefix, checkpointId, enableChecksum);
         }
     }
 
-    private void finalizeCheckpoint(String checkpointId,
-                                    File checkpointedDir,
-                                    byte[] txid) throws IOException {
-        File[] files = checkpointedDir.listFiles();
+    private void finalizeCheckpoint(List<CheckpointFile> files,
+                                    String checkpointId,
+                                    byte[] txid,
+                                    boolean enableChecksum) throws IOException {
 
         CheckpointMetadata.Builder metadataBuilder = CheckpointMetadata.newBuilder();
-        for (File file : files) {
-            metadataBuilder.addFiles(file.getName());
+        for (CheckpointFile file : files) {
+            if (enableChecksum) {
+                metadataBuilder.addFileInfos(file.getFileInfo());
+            } else {
+                metadataBuilder.addFiles(file.getName());

Review comment:
      As I understand it, moving this into the `else` branch, combined with the use of `getNameWithChecksum()`, is not backwards compatible.
   For example, take BK v.N (without checksum support) and v.N+1 (with checksum support):
   upgrade from N to N+1, experiment, downgrade => the bookie cannot restore the checkpoint.
   That makes this a major version upgrade with no downgrade path.
   If this is the only way to proceed, we'll need to discuss it on the mailing list and provide documentation/upgrade instructions (backups etc.), or possibly do something else.
   
   Making this backwards compatible also lets you avoid the `enableChecksum` check everywhere:
   always save both the checksums (`FileInfos`) and `Files`, and always validate when checksums are present. The `getNameWithChecksum()` trick won't work with that, though.
   One option would be to save both files, with and without the checksum in the name, unless "disableCompatibility" is explicitly enabled in the config.
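A minimal sketch of the "always write both" idea on the metadata side. It assumes a simplified, hypothetical plain-Java stand-in (`MetadataSketch`) instead of the generated protobuf `CheckpointMetadata`: the legacy name list is always written, checksums are recorded alongside it, and a reader that finds no checksum for a file skips validation. It does not cover the remote SST naming question.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for CheckpointMetadata: `files` mirrors the legacy
// list older releases read; `fileInfos` maps file name -> checksum and is
// only consulted by newer releases.
final class MetadataSketch {
    private final List<String> files = new ArrayList<>();
    private final Map<String, String> fileInfos = new LinkedHashMap<>();

    // Writer: always record the plain name, and the checksum when known,
    // so a downgraded bookie can still find every file by name.
    void addFile(String name, String checksum) {
        files.add(name);
        if (checksum != null) {
            fileInfos.put(name, checksum);
        }
    }

    // Reader: expected checksum, or null when the checkpoint predates
    // checksums (or the file has none) and validation should be skipped.
    String expectedChecksum(String name) {
        return fileInfos.get(name);
    }

    // The list of files to restore comes from the legacy field in both versions.
    List<String> filesToRestore() {
        return Collections.unmodifiableList(files);
    }
}
```

Because older releases only read the name list, they can still enumerate every file. Only the newer reader consults the checksum map.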
   

##########
File path: stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointerTest.java
##########
@@ -349,6 +347,65 @@ public void testCheckpointOrder() throws Exception {
         }
     }
 
+    @Test
+    public void testStaleSSTFile() throws Exception {
+        final int numKvs = 100;
+        TestStateStore testStore = new TestStateStore(
+            runtime.getMethodName(), localDir, remoteDir, true, false);
+
+        store.close();
+
+        testStore.enableCheckpoints(true);
+        testStore.init();
+
+        testStore.addNumKVs("transaction-1", numKvs, 0);
+        // create base checkpoint
+        String baseCheckpoint = testStore.checkpoint("checkpoint-1");
+        testStore.restore();
+
+        testStore.addNumKVs("transaction-2", numKvs, 100);
+        // create failed checkpoint
+        String failedCheckpoint = testStore.checkpoint("checkpoint-2");
+        // Remove metadata from the checkpoint to signal failure
+
+        CheckpointInfo checkpoint = testStore.getLatestCheckpoint();
+        testStore.corruptCheckpoint(checkpoint);
+
+        // restore : this should restore from base checkpoint
+        testStore.destroyLocal();
+        testStore.restore();
+        assertEquals("transaction-1", testStore.get("transaction-id"));
+
+        testStore.addNumKVs("transaction-3", numKvs * 3, 200);
+        // create another test checkpoint
+        String newCheckpoint = testStore.checkpoint("checkpoint-3");
+
+        // Ensure latest checkpoint can be restored.
+        testStore.destroyLocal();
+        testStore.restore();
+        assertEquals("transaction-3", testStore.get("transaction-id"));
+    }
+
+    @Test
+    public void testRestoreOldCheckpointWithoutChecksum() throws Exception {

Review comment:
      Please add a backwards compatibility test (create a checkpoint with checksums enabled, then restore it with checksums disabled).

##########
File path: stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksdbRestoreTask.java
##########
@@ -67,65 +67,10 @@ public void restore(String checkpointId, CheckpointMetadata metadata) throws Sta
         }
     }
 
-    private List<String> getFilesToCopy(String checkpointId,
-                                        File checkpointedDir,
-                                        CheckpointMetadata metadata) throws IOException {
-        if (!checkpointedDir.exists()) {
-            Files.createDirectories(
-                Paths.get(checkpointedDir.getAbsolutePath()));
-        }
-
-        List<String> filesToCopy = Lists.newArrayListWithExpectedSize(metadata.getFilesCount());
-        for (String fileName : metadata.getFilesList()) {
-            File localFile = new File(checkpointedDir, fileName);
-            if (!localFile.exists()) {
-                filesToCopy.add(fileName);
-                continue;
-            }
-
-            String srcFile;
-            if (RocksUtils.isSstFile(localFile)) {
-                srcFile = RocksUtils.getDestSstPath(dbPrefix, localFile);
-            } else {
-                srcFile = RocksUtils.getDestPath(dbPrefix, checkpointId, localFile);
-            }
-
-            long srcFileLength = checkpointStore.getFileLength(srcFile);
-            long localFileLength = localFile.length();
-            if (srcFileLength != localFileLength) {
-                filesToCopy.add(fileName);
-            }
-        }
-
-        return filesToCopy;
-    }
-
     private void copyFilesFromRemote(String checkpointId,
-                                     File checkpointedDir,
-                                     List<String> remoteFiles) throws IOException {
-        for (String file : remoteFiles) {
-            copyFileFromRemote(checkpointId, checkpointedDir, file);
+                                     List<CheckpointFile> remoteFiles) throws IOException {
+        for (CheckpointFile file : remoteFiles) {
+            file.copyFromRemote(checkpointStore, dbPrefix, checkpointId);

Review comment:
      I am probably missing something obvious.
   I expected checksum validation after copying a file from remote. Is it happening somewhere else (implicitly via `equals`, as part of some map/set check)?
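One way the post-copy check could look, as a sketch: after each file is copied from the checkpoint store, hash the local copy and compare it with the checksum recorded in the metadata. The class and method names are hypothetical, and SHA-256 via the JDK's `MessageDigest` is an assumption; the PR's own `computeChecksum()` uses Guava's `Hashing` with whichever function it picked. A null expected checksum is treated as a checkpoint written before checksums existed.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper; not part of the PR.
final class RestoreVerifier {
    private RestoreVerifier() {}

    // Hex-encoded SHA-256 of a local file, streamed so large SST files
    // are never loaded into memory at once.
    static String sha256Hex(Path path) throws IOException {
        MessageDigest digest;
        try {
            digest = MessageDigest.getInstance("SHA-256");
        } catch (NoSuchAlgorithmException e) {
            throw new IOException("SHA-256 not available", e);
        }
        byte[] buf = new byte[64 * 1024];
        try (InputStream in = Files.newInputStream(path)) {
            int n;
            while ((n = in.read(buf)) != -1) {
                digest.update(buf, 0, n);
            }
        }
        StringBuilder sb = new StringBuilder();
        for (byte b : digest.digest()) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    // Call right after copying a file from the checkpoint store.
    static void verifyAfterCopy(Path localFile, String expectedChecksum) throws IOException {
        if (expectedChecksum == null) {
            return; // legacy checkpoint: nothing to compare against
        }
        String actual = sha256Hex(localFile);
        if (!actual.equals(expectedChecksum)) {
            // Do not leave a corrupt file behind for RocksDB to open.
            Files.deleteIfExists(localFile);
            throw new IOException("Checksum mismatch for " + localFile
                + ": expected " + expectedChecksum + ", got " + actual);
        }
    }
}
```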

##########
File path: stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/CheckpointFile.java
##########
@@ -0,0 +1,212 @@
+@Slf4j
+public class CheckpointFile {
+    private final File file;
+    private final String checksum;
+    private final boolean isSstFile;
+
+    CheckpointFile(File file) {
+        this.file = file;
+        this.checksum = computeChecksum();

Review comment:
      What if the checksum is `"invalid"`, as returned by `computeChecksum()` in case of error?
   That is, if it is invalid both when saved to the checkpoint and when verified later, will the code pass the checksum check?
   Something like "invalid_<timestamp_here>" would avoid that, though I bet there is a more elegant solution.

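A sketch of one alternative to a sentinel string, using a hypothetical helper rather than the PR's `computeChecksum()`: failure is represented as an empty `Optional`, and two checksums only match when both were actually computed, so an unreadable file can never "match" another unreadable file. CRC32 is used purely for brevity, and `Files.readAllBytes` would be replaced by a streaming digest for large SST files. Letting the `IOException` propagate out of the constructor would be an equally valid choice.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Optional;
import java.util.zip.CRC32;

// Hypothetical helper; not part of the PR.
final class ChecksumSketch {
    private ChecksumSketch() {}

    // Failure is an empty Optional instead of a magic string, so callers
    // are forced to decide what an uncomputable checksum means.
    static Optional<String> tryComputeChecksum(File file) {
        try {
            CRC32 crc = new CRC32();
            crc.update(Files.readAllBytes(file.toPath()));
            return Optional.of(Long.toHexString(crc.getValue()));
        } catch (IOException e) {
            return Optional.empty();
        }
    }

    // Only two successfully computed, equal checksums count as a match.
    static boolean matches(Optional<String> a, Optional<String> b) {
        return a.isPresent() && b.isPresent() && a.get().equals(b.get());
    }
}
```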
##########
File path: stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/CheckpointFile.java
##########
@@ -0,0 +1,212 @@
+@Slf4j
+public class CheckpointFile {
+    private final File file;
+    private final String checksum;
+    private final boolean isSstFile;
+
+    CheckpointFile(File file) {
+        this.file = file;
+        this.checksum = computeChecksum();
+        this.isSstFile = RocksUtils.isSstFile(this.file);
+    }
+
+    CheckpointFile(File checkpointDir, String filename) {
+        this.file = new File(checkpointDir, filename);
+        this.checksum = null;

Review comment:
      Leaving the checksum `null` is a very non-obvious side effect of using this constructor.
   I'd love to see it expressed more explicitly, but I don't have a better idea than something like:
   ```
   // checksum
   CheckpointFile cp = (new CheckpointFile(.....)).withChecksum();
   
   // no checksum
   CheckpointFile cp = new CheckpointFile(.....);
   
   ```
   
   What do you think?
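Expanding on that suggestion, a sketch with hypothetical names: a private constructor, a static factory for the name-only case, and an immutable `withChecksum()` that returns a copy. The presence or absence of a checksum then becomes visible at the call site instead of depending on which constructor overload was chosen.

```java
import java.io.File;

// Hypothetical rework of CheckpointFile's constructors; not the PR's code.
final class CheckpointFileRef {
    private final File file;
    private final String checksum; // null means "no checksum recorded"

    private CheckpointFileRef(File file, String checksum) {
        this.file = file;
        this.checksum = checksum;
    }

    // Name-only reference, e.g. for entries of an old checkpoint.
    static CheckpointFileRef withoutChecksum(File dir, String name) {
        return new CheckpointFileRef(new File(dir, name), null);
    }

    // Returns a copy carrying the checksum; this instance is unchanged.
    CheckpointFileRef withChecksum(String checksum) {
        if (checksum == null || checksum.isEmpty()) {
            throw new IllegalArgumentException("checksum must be non-empty");
        }
        return new CheckpointFileRef(file, checksum);
    }

    boolean hasChecksum() {
        return checksum != null;
    }

    File file() {
        return file;
    }
}
```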




[GitHub] [bookkeeper] sursingh commented on a change in pull request #2641: Add checksum validation for SST files

Posted by GitBox <gi...@apache.org>.
sursingh commented on a change in pull request #2641:
URL: https://github.com/apache/bookkeeper/pull/2641#discussion_r592000403



##########
File path: stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksdbRestoreTask.java
##########
@@ -67,65 +67,10 @@ public void restore(String checkpointId, CheckpointMetadata metadata) throws Sta
         }
     }
 
-    private List<String> getFilesToCopy(String checkpointId,
-                                        File checkpointedDir,
-                                        CheckpointMetadata metadata) throws IOException {
-        if (!checkpointedDir.exists()) {
-            Files.createDirectories(
-                Paths.get(checkpointedDir.getAbsolutePath()));
-        }
-
-        List<String> filesToCopy = Lists.newArrayListWithExpectedSize(metadata.getFilesCount());
-        for (String fileName : metadata.getFilesList()) {
-            File localFile = new File(checkpointedDir, fileName);
-            if (!localFile.exists()) {
-                filesToCopy.add(fileName);
-                continue;
-            }
-
-            String srcFile;
-            if (RocksUtils.isSstFile(localFile)) {
-                srcFile = RocksUtils.getDestSstPath(dbPrefix, localFile);
-            } else {
-                srcFile = RocksUtils.getDestPath(dbPrefix, checkpointId, localFile);
-            }
-
-            long srcFileLength = checkpointStore.getFileLength(srcFile);
-            long localFileLength = localFile.length();
-            if (srcFileLength != localFileLength) {
-                filesToCopy.add(fileName);
-            }
-        }
-
-        return filesToCopy;
-    }
-
     private void copyFilesFromRemote(String checkpointId,
-                                     File checkpointedDir,
-                                     List<String> remoteFiles) throws IOException {
-        for (String file : remoteFiles) {
-            copyFileFromRemote(checkpointId, checkpointedDir, file);
+                                     List<CheckpointFile> remoteFiles) throws IOException {
+        for (CheckpointFile file : remoteFiles) {
+            file.copyFromRemote(checkpointStore, dbPrefix, checkpointId);

Review comment:
       Let me see if I can add some clarity.
   * The restore code gets the list of files from the `metadata` file (the first file we restore). All files other than SST files are stored under the checkpoint directory; SST files, however, are stored under the common "ssts" directory. The code uses the file extension to determine whether a file is an SST file or not. If it is an SST file, the code copies it from the common "ssts" folder. If we add the checksum to the file names in the `files` field of the metadata, this code will fail and will not be able to copy the files from the checkpoint store.
   * We could work around this by prefixing the checksum (e.g. 00009.sst -> {checksum}_00009.sst). The code would then correctly detect the file as an SST file and copy it from the common "ssts" folder. However, these files would still need to be renamed locally ({checksum}_00009.sst -> 00009.sst), because those are the names RocksDB looks for. Since the previous version has no code to do that, this will not work either.
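The naming constraint described above can be sketched with a few string helpers. Everything here is hypothetical: the real logic lives in `RocksUtils` (`isSstFile`, `getDestSstPath`, `getDestPath`), and the exact remote layout (in particular `<dbPrefix>/<checkpointId>/<name>` for non-SST files) is an assumption. The sketch shows that a `{checksum}_` prefix still passes the extension check and lands in the shared `ssts` directory, while the local copy must be renamed back for RocksDB.

```java
// Hypothetical path helpers illustrating the layout described in the thread.
final class SstNaming {
    private SstNaming() {}

    // Extension-based detection, as the restore code does.
    static boolean isSst(String name) {
        return name.endsWith(".sst");
    }

    // Assumed layout: SST files in a shared "ssts" directory,
    // everything else under the per-checkpoint directory.
    static String remotePath(String dbPrefix, String checkpointId, String name) {
        return isSst(name)
            ? dbPrefix + "/ssts/" + name
            : dbPrefix + "/" + checkpointId + "/" + name;
    }

    // "{checksum}_00009.sst" still ends in ".sst", so detection keeps working.
    static String withChecksumPrefix(String checksum, String name) {
        return checksum + "_" + name;
    }

    // RocksDB expects the plain name locally, so the prefix must be stripped.
    // Assumes checksums contain no '_' (e.g. hex) and plain SST names are numeric.
    static String localName(String remoteName) {
        int idx = remoteName.indexOf('_');
        return (idx >= 0 && isSst(remoteName)) ? remoteName.substring(idx + 1) : remoteName;
    }
}
```

An older release only has the first three steps; it would fetch `{checksum}_00009.sst` correctly but never perform the `localName` rename.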
   



