You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/24 10:04:06 UTC

[iotdb] branch master updated: [IOTDB-4027] Support cross disk link in RatisConsensus (#7056)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ba111cebb [IOTDB-4027] Support cross disk link in RatisConsensus (#7056)
7ba111cebb is described below

commit 7ba111cebbd6e2861086f780470715db2bc37b4d
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Wed Aug 24 18:04:00 2022 +0800

    [IOTDB-4027] Support cross disk link in RatisConsensus (#7056)
---
 .../org/apache/iotdb/consensus/IStateMachine.java  | 12 ++--
 .../org/apache/iotdb/consensus/common/Utils.java   | 77 ++++++++++++++++++++++
 .../iotdb/consensus/ratis/SnapshotStorage.java     | 51 ++------------
 .../apache/iotdb/consensus/ratis/SnapshotTest.java | 63 ++++++++++++++++++
 .../statemachine/DataRegionStateMachine.java       |  6 +-
 5 files changed, 157 insertions(+), 52 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index 3989272dee..9ce532c999 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -24,11 +24,13 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.Utils;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 
 import javax.annotation.concurrent.ThreadSafe;
 
 import java.io.File;
+import java.nio.file.Path;
 import java.util.List;
 import java.util.function.Function;
 
@@ -76,18 +78,18 @@ public interface IStateMachine {
   void loadSnapshot(File latestSnapshotRootDir);
 
   /**
-   * given a snapshot dir, ask statemachine to provide all snapshot files.
+   * Given a snapshot dir, ask the statemachine to provide all snapshot files. By default, it lists
+   * all regular files recursively under latestSnapshotRootDir.
    *
    * <p>DataRegion may take snapshot at a different disk and only store a log file containing file
    * paths. So statemachine is required to read the log file and provide the real snapshot file
    * paths.
    *
    * @param latestSnapshotRootDir dir where the latest snapshot sits
-   * @return List of real snapshot files. If the returned list is null, consensus implementations
-   *     will visit and add all files under this give latestSnapshotRootDir.
+   * @return list of real snapshot file paths; the default implementation never returns null.
    */
-  default List<File> getSnapshotFiles(File latestSnapshotRootDir) {
-    return null;
+  default List<Path> getSnapshotFiles(File latestSnapshotRootDir) {
+    return Utils.listAllRegularFilesRecursively(latestSnapshotRootDir);
   }
 
   /** An optional API for event notifications. */
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java
new file mode 100644
index 0000000000..96d381949e
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.common;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.FileVisitor;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/** File-listing helper shared by the consensus module. */
+public class Utils {
+  private static final Logger logger = LoggerFactory.getLogger(Utils.class);
+
+  /** Lists every regular file under rootDir; empty on IOException, partial if a visit fails. */
+  public static List<Path> listAllRegularFilesRecursively(File rootDir) {
+    List<Path> allFiles = new ArrayList<>();
+    try {
+      Files.walkFileTree(
+          rootDir.toPath(),
+          new FileVisitor<Path>() {
+            @Override
+            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
+              return FileVisitResult.CONTINUE;
+            }
+
+            @Override
+            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
+              if (attrs.isRegularFile()) {
+                allFiles.add(file);
+              }
+              return FileVisitResult.CONTINUE;
+            }
+
+            @Override
+            public FileVisitResult visitFileFailed(Path file, IOException exc) {
+              // SLF4J logs a trailing Throwable as the exception, so it needs no "{}".
+              logger.info("visit file {} failed", file.toAbsolutePath(), exc);
+              return FileVisitResult.TERMINATE;
+            }
+
+            @Override
+            public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
+              return FileVisitResult.CONTINUE;
+            }
+          });
+    } catch (IOException ioException) {
+      logger.error("IOException occurred during listing snapshot directory: ", ioException);
+      return Collections.emptyList();
+    }
+    return allFiles;
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
index de0f70a427..60d117dee3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
@@ -34,14 +34,10 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.DirectoryStream;
-import java.nio.file.FileVisitResult;
-import java.nio.file.FileVisitor;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 
 public class SnapshotStorage implements StateMachineStorage {
@@ -107,46 +103,6 @@ public class SnapshotStorage implements StateMachineStorage {
     return i < 0 ? null : snapshots[i].toFile();
   }
 
-  private List<Path> getAllFilesUnder(File rootDir) {
-    List<Path> allFiles = new ArrayList<>();
-    try {
-      Files.walkFileTree(
-          rootDir.toPath(),
-          new FileVisitor<Path>() {
-            @Override
-            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
-                throws IOException {
-              return FileVisitResult.CONTINUE;
-            }
-
-            @Override
-            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
-                throws IOException {
-              if (attrs.isRegularFile()) {
-                allFiles.add(file);
-              }
-              return FileVisitResult.CONTINUE;
-            }
-
-            @Override
-            public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
-              logger.info("visit file {} failed due to {}", file.toAbsolutePath(), exc);
-              return FileVisitResult.TERMINATE;
-            }
-
-            @Override
-            public FileVisitResult postVisitDirectory(Path dir, IOException exc)
-                throws IOException {
-              return FileVisitResult.CONTINUE;
-            }
-          });
-    } catch (IOException ioException) {
-      logger.error("IOException occurred during listing snapshot directory: ", ioException);
-      return Collections.emptyList();
-    }
-    return allFiles;
-  }
-
   @Override
   public SnapshotInfo getLatestSnapshot() {
     File latestSnapshotDir = findLatestSnapshotDir();
@@ -155,8 +111,13 @@ public class SnapshotStorage implements StateMachineStorage {
     }
     TermIndex snapshotTermIndex = Utils.getTermIndexFromDir(latestSnapshotDir);
 
+    List<Path> actualSnapshotFiles = applicationStateMachine.getSnapshotFiles(latestSnapshotDir);
+    if (actualSnapshotFiles == null) {
+      return null;
+    }
+
     List<FileInfo> fileInfos = new ArrayList<>();
-    for (Path file : getAllFilesUnder(latestSnapshotDir)) {
+    for (Path file : actualSnapshotFiles) {
       if (file.endsWith(".md5")) {
         continue;
       }
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
index 204a6725c4..fc89eebd4d 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
@@ -30,7 +30,14 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
 import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+import java.util.Scanner;
 
 public class SnapshotTest {
 
@@ -132,4 +139,60 @@ public class SnapshotTest {
         + ".ratis_meta."
         + termIndexMeta;
   }
+
+  /**
+   * Simulates a cross-disk snapshot: takeSnapshot writes the snapshot file plus a "record" file
+   * holding its absolute path, and getSnapshotFiles resolves the real file from that record.
+   */
+  static class CrossDiskLinkStatemachine extends TestUtils.IntegerCounter {
+    /** Creates an empty "snapshot" file and a "record" file containing its absolute path. */
+    @Override
+    public boolean takeSnapshot(File snapshotDir) {
+      File snapshotRaw = new File(snapshotDir.getAbsolutePath() + File.separator + "snapshot");
+      File snapshotRecord = new File(snapshotDir.getAbsolutePath() + File.separator + "record");
+      try {
+        Assert.assertTrue(snapshotRaw.createNewFile());
+        // try-with-resources closes the writer even when write() throws
+        try (FileWriter writer = new FileWriter(snapshotRecord)) {
+          writer.write(snapshotRaw.getAbsolutePath());
+        }
+      } catch (IOException ioException) {
+        ioException.printStackTrace();
+        return false; // do not report success when the record could not be written
+      }
+      return true;
+    }
+
+    /**
+     * Returns the single path stored on the first line of the "record" file; fails the test if
+     * that file cannot be opened.
+     */
+    @Override
+    public List<Path> getSnapshotFiles(File latestSnapshotRootDir) {
+      File log = new File(latestSnapshotRootDir.getAbsolutePath() + File.separator + "record");
+      Assert.assertTrue(log.exists());
+      try (Scanner scanner = new Scanner(log)) {
+        return Collections.singletonList(Paths.get(scanner.nextLine()));
+      } catch (FileNotFoundException e) {
+        throw new AssertionError("record file not found: " + log, e);
+      }
+    }
+  }
+
+  /** The latest snapshot must contain exactly one file, whose path matches the expected name. */
+  @Test
+  public void testCrossDiskLinkSnapshot() throws Exception {
+    ApplicationStateMachineProxy proxy =
+        new ApplicationStateMachineProxy(new CrossDiskLinkStatemachine(), null);
+    proxy.initialize(null, null, new EmptyStorageWithOnlySMDir());
+    proxy.notifyTermIndexUpdated(20, 1005);
+    proxy.takeSnapshot();
+    File expectedSnapshotFile =
+        new File(CrossDiskLinkStatemachine.ensureSnapshotFileName(testDir, "20_1005"));
+    // assertEquals takes (expected, actual); the order matters for failure messages
+    Assert.assertEquals(1, proxy.getLatestSnapshot().getFiles().size());
+    Assert.assertEquals(
+        expectedSnapshotFile.getAbsolutePath(),
+        proxy.getLatestSnapshot().getFiles().get(0).getPath().toFile().getAbsolutePath());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 6c52fa73c3..d5cf5e4533 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
@@ -57,6 +58,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 public class DataRegionStateMachine extends BaseStateMachine {
 
@@ -258,13 +260,13 @@ public class DataRegionStateMachine extends BaseStateMachine {
   }
 
   @Override
-  public List<File> getSnapshotFiles(File latestSnapshotRootDir) {
+  public List<Path> getSnapshotFiles(File latestSnapshotRootDir) {
     try {
       return new SnapshotLoader(
               latestSnapshotRootDir.getAbsolutePath(),
               region.getStorageGroupName(),
               region.getDataRegionId())
-          .getSnapshotFileInfo();
+          .getSnapshotFileInfo().stream().map(File::toPath).collect(Collectors.toList());
     } catch (IOException e) {
       logger.error(
           "Meets error when getting snapshot files for {}-{}",