You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2021/03/10 02:27:16 UTC

[GitHub] [bookkeeper] dlg99 commented on a change in pull request #2641: Add checksum validation for SST files

dlg99 commented on a change in pull request #2641:
URL: https://github.com/apache/bookkeeper/pull/2641#discussion_r590871590



##########
File path: stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/CheckpointFile.java
##########
@@ -0,0 +1,212 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint;
+
+import com.google.common.hash.Hashing;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
+import org.apache.bookkeeper.statelib.impl.rocksdb.RocksUtils;
+import org.apache.bookkeeper.stream.proto.kv.store.CheckpointMetadata;
+import org.apache.bookkeeper.stream.proto.kv.store.FileInfo;
+
+/**
+ * CheckpointFile encapsulates the attributes and operations for a file in checkpoint.
+ */
+@Slf4j
+public class CheckpointFile {
+    private final File file;
+    private final String checksum;
+    private final boolean isSstFile;
+
+    CheckpointFile(File file) {
+        this.file = file;
+        this.checksum = computeChecksum();
+        this.isSstFile = RocksUtils.isSstFile(this.file);
+    }
+
+    CheckpointFile(File checkpointDir, String filename) {
+        this.file = new File(checkpointDir, filename);
+        this.checksum = null;
+        this.isSstFile = RocksUtils.isSstFile(this.file);
+    }
+
+    CheckpointFile(File checkpointDir, FileInfo fileInfo) {
+        this.file = new File(checkpointDir, fileInfo.getName());
+        this.checksum = fileInfo.getChecksum();
+        this.isSstFile = RocksUtils.isSstFile(this.file);
+    }
+
+    @Override
+    public boolean equals(Object o) {

Review comment:
       fyi `@EqualsAndHashCode` lombok annotation on the class can take care of both equals and hashCode

##########
File path: stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksdbCheckpointTask.java
##########
@@ -144,64 +149,38 @@ public String checkpoint(byte[] txid) throws StateStoreException {
         }
     }
 
-    private List<File> getFilesToCopy(File checkpointedDir) throws IOException {
-        File[] files = checkpointedDir.listFiles();
-
-        List<File> fileToCopy = Lists.newArrayListWithExpectedSize(files.length);
-        for (File file : files) {
-            if (RocksUtils.isSstFile(file)) {
-                // sst files
-                String destSstPath = RocksUtils.getDestSstPath(dbPrefix, file);
-                // TODO: do more validation on the file
-                if (!checkpointStore.fileExists(destSstPath)) {
-                    fileToCopy.add(file);
-                }
-            } else {
-                fileToCopy.add(file);
-            }
-        }
-        return fileToCopy;
-    }
-
-    private void copyFilesToDest(String checkpointId, List<File> files) throws IOException {
-        for (File file : files) {
-            copyFileToDest(checkpointId, file);
-        }
-
-    }
-
     /**
      * All sst files are copied to checkpoint location first.
      */
-    private void copyFileToDest(String checkpointId, File file) throws IOException {
-        String destPath = RocksUtils.getDestPath(dbPrefix, checkpointId, file);
-        try (OutputStream os = checkpointStore.openOutputStream(destPath)) {
-            Files.copy(file, os);
+    private void copyFilesToDest(String checkpointId, List<CheckpointFile> files) throws IOException {
+        for (CheckpointFile file : files) {
+            file.copyToRemote(checkpointStore, dbPrefix, checkpointId);
         }
     }
 
     /**
      * Move the sst files to a common location.
      */
-    private void finalizeCopyFiles(String checkpointId, List<File> files) throws IOException {
-        for (File file : files) {
-            if (RocksUtils.isSstFile(file)) {
-                String destSstTempPath = RocksUtils.getDestTempSstPath(
-                    dbPrefix, checkpointId, file);
-                String destSstPath = RocksUtils.getDestSstPath(dbPrefix, file);
-                checkpointStore.rename(destSstTempPath, destSstPath);
-            }
+    private void finalizeCopyFiles(String checkpointId,
+                                   List<CheckpointFile> files,
+                                   boolean enableChecksum) throws IOException {
+        for (CheckpointFile file : files) {
+            file.finalize(checkpointStore, dbPrefix, checkpointId, enableChecksum);
         }
     }
 
-    private void finalizeCheckpoint(String checkpointId,
-                                    File checkpointedDir,
-                                    byte[] txid) throws IOException {
-        File[] files = checkpointedDir.listFiles();
+    private void finalizeCheckpoint(List<CheckpointFile> files,
+                                    String checkpointId,
+                                    byte[] txid,
+                                    boolean enableChecksum) throws IOException {
 
         CheckpointMetadata.Builder metadataBuilder = CheckpointMetadata.newBuilder();
-        for (File file : files) {
-            metadataBuilder.addFiles(file.getName());
+        for (CheckpointFile file : files) {
+            if (enableChecksum) {
+                metadataBuilder.addFileInfos(file.getFileInfo());
+            } else {
+                metadataBuilder.addFiles(file.getName());

Review comment:
       As I understand, this (moving into `else` part) + use of getNameWithChecksum() is not backwards compatible.
   As in: BK v.N (without checksum support) vs v.N+1 (with checksum)
   upgrade from N to N+1, experiment, downgrade => bookie cannot restore the checkpoint.
   This is a major version upgrade, no path to downgrade etc.
   If this is the only option to proceed we'll need to discuss this in the mailing list + provide documentation/upgrade instructions (backup etc), possibly do something else.
   
   Making this backwards compatible also lets you avoid the need for `enableChecksum` check everywhere.
   Always save both the checksums (FileInfos) and Files, and always validate when checksums are present; the trick with getNameWithChecksum() won't work in that case, though.
   One option would be to save both files - with and without the checksum in the name - unless "disableCompatibility" is explicitly turned on in the config.
   

##########
File path: stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointerTest.java
##########
@@ -349,6 +347,65 @@ public void testCheckpointOrder() throws Exception {
         }
     }
 
+    @Test
+    public void testStaleSSTFile() throws Exception {
+        final int numKvs = 100;
+        TestStateStore testStore = new TestStateStore(
+            runtime.getMethodName(), localDir, remoteDir, true, false);
+
+        store.close();
+
+        testStore.enableCheckpoints(true);
+        testStore.init();
+
+        testStore.addNumKVs("transaction-1", numKvs, 0);
+        // create base checkpoint
+        String baseCheckpoint = testStore.checkpoint("checkpoint-1");
+        testStore.restore();
+
+        testStore.addNumKVs("transaction-2", numKvs, 100);
+        // create failed checkpoint
+        String failedCheckpoint = testStore.checkpoint("checkpoint-2");
+        // Remove metadata from the checkpoint to signal failure
+
+        CheckpointInfo checkpoint = testStore.getLatestCheckpoint();
+        testStore.corruptCheckpoint(checkpoint);
+
+        // restore : this should restore from base checkpoint
+        testStore.destroyLocal();
+        testStore.restore();
+        assertEquals("transaction-1", testStore.get("transaction-id"));
+
+        testStore.addNumKVs("transaction-3", numKvs * 3, 200);
+        // create another test checkpoint
+        String newCheckpoint = testStore.checkpoint("checkpoint-3");
+
+        // Ensure latest checkpoint can be restored.
+        testStore.destroyLocal();
+        testStore.restore();
+        assertEquals("transaction-3", testStore.get("transaction-id"));
+    }
+
+    @Test
+    public void testRestoreOldCheckpointWithoutChecksum() throws Exception {

Review comment:
       Please add a test for backwards compatibility (checkpoint with checksum, restore when checksum is disabled), or a backwards compat test.

##########
File path: stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksdbRestoreTask.java
##########
@@ -67,65 +67,10 @@ public void restore(String checkpointId, CheckpointMetadata metadata) throws Sta
         }
     }
 
-    private List<String> getFilesToCopy(String checkpointId,
-                                        File checkpointedDir,
-                                        CheckpointMetadata metadata) throws IOException {
-        if (!checkpointedDir.exists()) {
-            Files.createDirectories(
-                Paths.get(checkpointedDir.getAbsolutePath()));
-        }
-
-        List<String> filesToCopy = Lists.newArrayListWithExpectedSize(metadata.getFilesCount());
-        for (String fileName : metadata.getFilesList()) {
-            File localFile = new File(checkpointedDir, fileName);
-            if (!localFile.exists()) {
-                filesToCopy.add(fileName);
-                continue;
-            }
-
-            String srcFile;
-            if (RocksUtils.isSstFile(localFile)) {
-                srcFile = RocksUtils.getDestSstPath(dbPrefix, localFile);
-            } else {
-                srcFile = RocksUtils.getDestPath(dbPrefix, checkpointId, localFile);
-            }
-
-            long srcFileLength = checkpointStore.getFileLength(srcFile);
-            long localFileLength = localFile.length();
-            if (srcFileLength != localFileLength) {
-                filesToCopy.add(fileName);
-            }
-        }
-
-        return filesToCopy;
-    }
-
     private void copyFilesFromRemote(String checkpointId,
-                                     File checkpointedDir,
-                                     List<String> remoteFiles) throws IOException {
-        for (String file : remoteFiles) {
-            copyFileFromRemote(checkpointId, checkpointedDir, file);
+                                     List<CheckpointFile> remoteFiles) throws IOException {
+        for (CheckpointFile file : remoteFiles) {
+            file.copyFromRemote(checkpointStore, dbPrefix, checkpointId);

Review comment:
       I am probably missing something obvious.
   I expected checksum validation after the file is copied from remote. Is it happening at some other place (e.g. implicitly via equals, as part of some map/set check)?

##########
File path: stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/CheckpointFile.java
##########
@@ -0,0 +1,212 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint;
+
+import com.google.common.hash.Hashing;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
+import org.apache.bookkeeper.statelib.impl.rocksdb.RocksUtils;
+import org.apache.bookkeeper.stream.proto.kv.store.CheckpointMetadata;
+import org.apache.bookkeeper.stream.proto.kv.store.FileInfo;
+
+/**
+ * CheckpointFile encapsulates the attributes and operations for a file in checkpoint.
+ */
+@Slf4j
+public class CheckpointFile {
+    private final File file;
+    private final String checksum;
+    private final boolean isSstFile;
+
+    CheckpointFile(File file) {
+        this.file = file;
+        this.checksum = computeChecksum();

Review comment:
       What if the checksum is `"invalid"`, as returned by `computeChecksum()` in case of an error?
   I.e. if it is `"invalid"` when saved to the checkpoint and `"invalid"` again when verified later, will the code pass the checksum check?
   Something like "invalid_<timestamp_here>" would avoid that, though I bet there is a more elegant solution.

##########
File path: stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/CheckpointFile.java
##########
@@ -0,0 +1,212 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint;
+
+import com.google.common.hash.Hashing;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
+import org.apache.bookkeeper.statelib.impl.rocksdb.RocksUtils;
+import org.apache.bookkeeper.stream.proto.kv.store.CheckpointMetadata;
+import org.apache.bookkeeper.stream.proto.kv.store.FileInfo;
+
+/**
+ * CheckpointFile encapsulates the attributes and operations for a file in checkpoint.
+ */
+@Slf4j
+public class CheckpointFile {
+    private final File file;
+    private final String checksum;
+    private final boolean isSstFile;
+
+    CheckpointFile(File file) {
+        this.file = file;
+        this.checksum = computeChecksum();
+        this.isSstFile = RocksUtils.isSstFile(this.file);
+    }
+
+    CheckpointFile(File checkpointDir, String filename) {
+        this.file = new File(checkpointDir, filename);
+        this.checksum = null;

Review comment:
       This is a very non-obvious side-effect of using this constructor. 
   I'd love to see it expressed more explicitly but don't have better ideas than doing something like  
   ```
   // checksum
   CheckpointFile cp = (new CheckpointFile(.....)).withChecksum();
   
   // no checksum
   CheckpointFile cp = new CheckpointFile(.....);
   
   ```
   
   What do you think?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org