You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/11/24 11:35:03 UTC

[iotdb] branch master updated: [IOTDB-5010] DataRegion StateMachine support for Ratis Snapshot (#8110)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ac3b408da8 [IOTDB-5010] DataRegion StateMachine support for Ratis Snapshot (#8110)
ac3b408da8 is described below

commit ac3b408da857217b4f5112d0dd322bb905c4e0fd
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Thu Nov 24 19:34:58 2022 +0800

    [IOTDB-5010] DataRegion StateMachine support for Ratis Snapshot (#8110)
---
 consensus/pom.xml                                  |  9 ++++++-
 .../org/apache/iotdb/consensus/IStateMachine.java  | 13 ++++++++-
 .../ratis/ApplicationStateMachineProxy.java        |  3 ++-
 .../iotdb/consensus/ratis/SnapshotStorage.java     | 31 ++++++++++++++++++----
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 12 +++++++++
 .../statemachine/DataRegionStateMachine.java       | 15 +++++++++--
 .../iotdb/db/engine/snapshot/SnapshotTaker.java    | 17 +++++++-----
 7 files changed, 83 insertions(+), 17 deletions(-)

diff --git a/consensus/pom.xml b/consensus/pom.xml
index 35785f12bd..618087fb2c 100644
--- a/consensus/pom.xml
+++ b/consensus/pom.xml
@@ -32,11 +32,18 @@
     <properties>
         <maven.compiler.source>8</maven.compiler.source>
         <maven.compiler.target>8</maven.compiler.target>
-        <ratis.version>2.4.0</ratis.version>
+        <ratis.version>2.4.1</ratis.version>
         <consensus.test.skip>false</consensus.test.skip>
         <consensus.it.skip>${consensus.test.skip}</consensus.it.skip>
         <consensus.ut.skip>${consensus.test.skip}</consensus.ut.skip>
     </properties>
+    <repositories>
+        <repository>
+            <id>ratis-241-rc2</id>
+            <name>Apache Ratis 2.4.1 rc2</name>
+            <url>https://repository.apache.org/content/repositories/orgapacheratis-1134</url>
+        </repository>
+    </repositories>
     <dependencies>
         <!-- https://mvnrepository.com/artifact/org.apache.ratis/ratis-server -->
         <dependency>
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index 72d8b50e55..75f50721f9 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -74,10 +74,11 @@ public interface IStateMachine {
    * data of the snapshot will be stored under `data folder/snapshot/snapshotId`.
    *
    * @param snapshotDir required storage dir
+   * @param snapshotTmpId temporary id of the snapshot
    * @param snapshotId the id of the snapshot
    * @return true if snapshot is successfully taken
    */
-  default boolean takeSnapshot(File snapshotDir, String snapshotId) {
+  default boolean takeSnapshot(File snapshotDir, String snapshotTmpId, String snapshotId) {
     return takeSnapshot(snapshotDir);
   }
 
@@ -170,4 +171,14 @@ public interface IStateMachine {
   default IStateMachine.EventApi event() {
     return (IStateMachine.EventApi) this;
   }
+
+  /**
+   * Since Ratis 2.4.1, RatisConsensus allows a state machine to customize its snapshot storage.
+   * Currently, only DataRegionStateMachine uses this interface.
+   *
+   * @return the state machine's snapshot root, or null if it does not customize one
+   */
+  default File getSnapshotRoot() {
+    return null;
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index 76d958466c..4a503e7ce2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -209,7 +209,8 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
     }
 
     boolean applicationTakeSnapshotSuccess =
-        applicationStateMachine.takeSnapshot(snapshotTmpDir, metadata);
+        applicationStateMachine.takeSnapshot(
+            snapshotTmpDir, snapshotStorage.getSnapshotTmpId(metadata), metadata);
     if (!applicationTakeSnapshotSuccess) {
       deleteIncompleteSnapshot(snapshotTmpDir);
       return RaftLog.INVALID_LOG_INDEX;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
index 2917ee7e19..f1f64c21c9 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
@@ -39,6 +39,7 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class SnapshotStorage implements StateMachineStorage {
@@ -63,7 +64,7 @@ public class SnapshotStorage implements StateMachineStorage {
 
   private Path[] getSortedSnapshotDirPaths() {
     ArrayList<Path> snapshotPaths = new ArrayList<>();
-    try (DirectoryStream<Path> stream = Files.newDirectoryStream(stateMachineDir.toPath())) {
+    try (DirectoryStream<Path> stream = Files.newDirectoryStream(getStateMachineDir().toPath())) {
       for (Path path : stream) {
         if (path.toFile().isDirectory() && !path.toFile().getName().startsWith(TMP_PREFIX)) {
           snapshotPaths.add(path);
@@ -111,7 +112,13 @@ public class SnapshotStorage implements StateMachineStorage {
       if (file.endsWith(".md5")) {
         continue;
       }
-      FileInfo fileInfo = new FileInfo(file, null);
+      FileInfo fileInfo = null;
+      try {
+        fileInfo = new FileInfo(file.toRealPath(), null);
+      } catch (IOException e) {
+        logger.warn("{} cannot resolve real path of {} due to {}", this, file, e);
+        return null;
+      }
       fileInfos.add(fileInfo);
     }
 
@@ -161,15 +168,29 @@ public class SnapshotStorage implements StateMachineStorage {
   }
 
   public File getStateMachineDir() {
-    return stateMachineDir;
+    return Optional.ofNullable(getSnapshotDir()).orElse(stateMachineDir);
   }
 
   public File getSnapshotDir(String snapshotMetadata) {
-    return new File(stateMachineDir.getAbsolutePath() + File.separator + snapshotMetadata);
+    return new File(getStateMachineDir().getAbsolutePath() + File.separator + snapshotMetadata);
   }
 
   public File getSnapshotTmpDir(String snapshotMetadata) {
     return new File(
-        stateMachineDir.getAbsolutePath() + File.separator + TMP_PREFIX + snapshotMetadata);
+        getStateMachineDir().getAbsolutePath() + File.separator + TMP_PREFIX + snapshotMetadata);
+  }
+
+  public String getSnapshotTmpId(String snapshotMetadata) {
+    return TMP_PREFIX + snapshotMetadata;
+  }
+
+  @Override
+  public File getSnapshotDir() {
+    return applicationStateMachine.getSnapshotRoot();
+  }
+
+  @Override
+  public File getTmpDir() {
+    return getSnapshotDir() == null ? null : new File(getSnapshotDir(), TMP_PREFIX);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 6f84eb6608..1500ba927f 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -293,6 +293,13 @@ public class IoTDBConfig {
   /** Strategy of multiple directories. */
   private String multiDirStrategyClassName = null;
 
+  private String ratisDataRegionSnapshotDir =
+      IoTDBConstant.DEFAULT_BASE_DIR
+          + File.separator
+          + IoTDBConstant.DATA_FOLDER_NAME
+          + File.separator
+          + IoTDBConstant.SNAPSHOT_FOLDER_NAME;
+
   /** Consensus directory. */
   private String consensusDir = IoTDBConstant.DEFAULT_BASE_DIR + File.separator + "consensus";
 
@@ -1148,6 +1155,7 @@ public class IoTDBConfig {
     tracingDir = addDataHomeDir(tracingDir);
     consensusDir = addDataHomeDir(consensusDir);
     dataRegionConsensusDir = addDataHomeDir(dataRegionConsensusDir);
+    ratisDataRegionSnapshotDir = addDataHomeDir(ratisDataRegionSnapshotDir);
     schemaRegionConsensusDir = addDataHomeDir(schemaRegionConsensusDir);
     indexRootFolder = addDataHomeDir(indexRootFolder);
     extDir = addDataHomeDir(extDir);
@@ -1350,6 +1358,10 @@ public class IoTDBConfig {
     this.queryDir = queryDir;
   }
 
+  public String getRatisDataRegionSnapshotDir() {
+    return ratisDataRegionSnapshotDir;
+  }
+
   public String getConsensusDir() {
     return consensusDir;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 6d2516964d..d605a2fd06 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -112,10 +112,10 @@ public class DataRegionStateMachine extends BaseStateMachine {
   }
 
   @Override
-  public boolean takeSnapshot(File snapshotDir, String snapshotId) {
+  public boolean takeSnapshot(File snapshotDir, String snapshotTmpId, String snapshotId) {
     try {
       return new SnapshotTaker(region)
-          .takeFullSnapshot(snapshotDir.getAbsolutePath(), snapshotId, true);
+          .takeFullSnapshot(snapshotDir.getAbsolutePath(), snapshotTmpId, snapshotId, true);
     } catch (Exception e) {
       logger.error(
           "Exception occurs when taking snapshot for {}-{} in {}",
@@ -444,4 +444,15 @@ public class DataRegionStateMachine extends BaseStateMachine {
     // TODO implement this
     return super.getSleepTime();
   }
+
+  @Override
+  public File getSnapshotRoot() {
+    String snapshotDir = IoTDBDescriptor.getInstance().getConfig().getRatisDataRegionSnapshotDir();
+    try {
+      return new File(snapshotDir).getCanonicalFile();
+    } catch (IOException e) {
+      logger.warn("{}: cannot get the canonical file of {} due to {}", this, snapshotDir, e);
+      return null;
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java
index 175b8c5344..80132d5e60 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java
@@ -58,11 +58,14 @@ public class SnapshotTaker {
       throws DirectoryNotLegalException, IOException {
     File snapshotDir = new File(snapshotDirPath);
     String snapshotId = snapshotDir.getName();
-    return takeFullSnapshot(snapshotDirPath, snapshotId, flushBeforeSnapshot);
+    return takeFullSnapshot(snapshotDirPath, snapshotId, snapshotId, flushBeforeSnapshot);
   }
 
   public boolean takeFullSnapshot(
-      String snapshotDirPath, String snapshotId, boolean flushBeforeSnapshot)
+      String snapshotDirPath,
+      String tempSnapshotId,
+      String finalSnapshotId,
+      boolean flushBeforeSnapshot)
       throws DirectoryNotLegalException, IOException {
     File snapshotDir = new File(snapshotDirPath);
     if (snapshotDir.exists()
@@ -81,7 +84,7 @@ public class SnapshotTaker {
     try {
       snapshotLogger = new SnapshotLogger(snapshotLog);
       boolean success;
-      snapshotLogger.logSnapshotId(snapshotId);
+      snapshotLogger.logSnapshotId(finalSnapshotId);
 
       try {
         readLockTheFile();
@@ -93,8 +96,8 @@ public class SnapshotTaker {
             dataRegion.writeUnlock();
           }
         }
-        success = createSnapshot(seqFiles, snapshotId);
-        success = createSnapshot(unseqFiles, snapshotId) && success;
+        success = createSnapshot(seqFiles, tempSnapshotId);
+        success = success && createSnapshot(unseqFiles, tempSnapshotId);
       } finally {
         readUnlockTheFile();
       }
@@ -104,14 +107,14 @@ public class SnapshotTaker {
             "Failed to take snapshot for {}-{}, clean up",
             dataRegion.getStorageGroupName(),
             dataRegion.getDataRegionId());
-        cleanUpWhenFail(snapshotId);
+        cleanUpWhenFail(finalSnapshotId);
       } else {
         snapshotLogger.logEnd();
         LOGGER.info(
             "Successfully take snapshot for {}-{}, snapshot directory is {}",
             dataRegion.getStorageGroupName(),
             dataRegion.getDataRegionId(),
-            snapshotDir.getParentFile().getAbsolutePath() + File.separator + snapshotId);
+            snapshotDir.getParentFile().getAbsolutePath() + File.separator + finalSnapshotId);
       }
 
       return success;