You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/07/26 12:40:44 UTC

[iotdb] branch IOTDB-3771 updated: finish

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-3771
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/IOTDB-3771 by this push:
     new 1de0ef1e8e finish
1de0ef1e8e is described below

commit 1de0ef1e8e149204df69ddc5386516a658bf14e3
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Tue Jul 26 20:40:34 2022 +0800

    finish
---
 .../org/apache/iotdb/consensus/ratis/Utils.java    |   2 +
 server/pom.xml                                     |   6 +
 .../iotdb/db/engine/snapshot/SnapshotLoader.java   |  77 +++++-
 .../db/engine/snapshot/SnapshotLogAnalyzer.java    |  79 ++++++
 .../iotdb/db/engine/snapshot/SnapshotLogger.java   |  69 +++++
 .../iotdb/db/engine/snapshot/SnapshotTaker.java    | 306 +++++++++++++++------
 .../exception/DirectoryNotLegalException.java      |   7 +-
 7 files changed, 442 insertions(+), 104 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
index 7d3071ba98..9b8a80824b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
@@ -180,6 +180,8 @@ public class Utils {
         properties, config.getLeaderElection().getLeaderStepDownWaitTimeKey());
     RaftServerConfigKeys.LeaderElection.setPreVote(
         properties, config.getLeaderElection().isPreVote());
+    RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(properties, 5);
+    RaftServerConfigKeys.Log.setPurgeUptoSnapshotIndex(properties, true);
 
     RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(
         properties, config.getSnapshot().isAutoTriggerEnabled());
diff --git a/server/pom.xml b/server/pom.xml
index 0d4756d628..d6f5a70b22 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -239,6 +239,12 @@
             <artifactId>guava</artifactId>
             <version>[${guava.version},)</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>node-commons</artifactId>
+            <version>0.14.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java
index 8067da84d4..224af20e42 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java
@@ -24,12 +24,14 @@ import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.ArrayList;
@@ -42,6 +44,7 @@ public class SnapshotLoader {
   private String storageGroupName;
   private String snapshotPath;
   private String dataRegionId;
+  private SnapshotLogAnalyzer logAnalyzer;
 
   public SnapshotLoader(String snapshotPath, String storageGroupName, String dataRegionId) {
     this.snapshotPath = snapshotPath;
@@ -77,27 +80,54 @@ public class SnapshotLoader {
         storageGroupName,
         dataRegionId,
         snapshotPath);
+    File sourceDataDir = new File(snapshotPath);
+
+    File snapshotLogFile = null;
+    if (sourceDataDir.exists()) {
+      File[] files =
+          sourceDataDir.listFiles(
+              new FilenameFilter() {
+                @Override
+                public boolean accept(File dir, String name) {
+                  return name.equals(SnapshotLogger.SNAPSHOT_LOG_NAME);
+                }
+              });
+      if (files == null || files.length == 0) {
+        LOGGER.warn("Failed to find snapshot log file, cannot recover it");
+        return null;
+      } else if (files.length > 1) {
+        LOGGER.warn(
+            "Found more than one snapshot log file, cannot recover it. {}", Arrays.toString(files));
+        return null;
+      } else {
+        LOGGER.info("Reading snapshot log file {}", files[0]);
+        snapshotLogFile = files[0];
+      }
+    }
+
+    SnapshotLogger.SnapshotType type = null;
     try {
-      deleteAllFilesInDataDirs();
-      LOGGER.info("Remove all data files in original data dir");
-    } catch (IOException e) {
+      logAnalyzer = new SnapshotLogAnalyzer(snapshotLogFile);
+      type = logAnalyzer.getType();
+    } catch (Exception e) {
+      LOGGER.error("Exception occurs when reading snapshot file", e);
       return null;
     }
 
-    // move the snapshot data to data dir
-    File sourceDataDir = new File(snapshotPath);
-    if (sourceDataDir.exists()) {
+    try {
       try {
-        createLinksFromSnapshotDirToDataDir(sourceDataDir);
-        LOGGER.info("Move data files from snapshot to data directory");
-      } catch (IOException | DiskSpaceInsufficientException e) {
-        LOGGER.error(
-            "Exception occurs when creating links from snapshot directory to data directory", e);
+        deleteAllFilesInDataDirs();
+        LOGGER.info("Remove all data files in original data dir");
+      } catch (IOException e) {
         return null;
       }
-    }
 
-    return loadSnapshot();
+      createLinksFromSnapshotDirToDataDir();
+
+      return loadSnapshot();
+    } finally {
+      logAnalyzer.close();
+    }
   }
 
   private void deleteAllFilesInDataDirs() throws IOException {
@@ -255,4 +285,35 @@ public class SnapshotLoader {
       }
     }
   }
+
+  /**
+   * Restores the data files by creating a hard link for every pair recorded in the snapshot log:
+   * the link is created at the logged source path and points to the logged snapshot file.
+   * Entries that cannot be restored are logged and skipped.
+   */
+  private void createLinksFromSnapshotDirToDataDir() {
+    while (logAnalyzer.hasNext()) {
+      Pair<String, String> filesPath = logAnalyzer.getNextPairs();
+      if (filesPath == null) {
+        // the analyzer returns null for a malformed or unreadable line and has already logged it
+        continue;
+      }
+      File sourceFile = new File(filesPath.left);
+      File linkedFile = new File(filesPath.right);
+      if (!linkedFile.exists()) {
+        LOGGER.warn("Snapshot file {} does not exist, skip it", linkedFile);
+        continue;
+      }
+      File parentDir = sourceFile.getAbsoluteFile().getParentFile();
+      if (parentDir != null && !parentDir.exists() && !parentDir.mkdirs()) {
+        LOGGER.error("Failed to create folder {}", parentDir);
+        continue;
+      }
+      try {
+        Files.createLink(sourceFile.toPath(), linkedFile.toPath());
+      } catch (IOException e) {
+        LOGGER.error("Failed to create link from {} to {}", linkedFile, sourceFile, e);
+      }
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLogAnalyzer.java
new file mode 100644
index 0000000000..8ac0c4b904
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLogAnalyzer.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.snapshot;
+
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Reads a snapshot log written by {@link SnapshotLogger}: a line holding the snapshot type and
+ * lines holding one "sourceFile#linkFile" pair each.
+ */
+public class SnapshotLogAnalyzer implements AutoCloseable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SnapshotLogAnalyzer.class);
+  private final File snapshotLogFile;
+  private final BufferedReader reader;
+
+  /** Line fetched by {@link #hasNext()} that has not been consumed yet, or null. */
+  private String bufferedLine;
+
+  /**
+   * Opens the given snapshot log for reading.
+   *
+   * @param snapshotLogFile the log file to read
+   * @throws FileNotFoundException if the file does not exist or cannot be opened
+   */
+  public SnapshotLogAnalyzer(File snapshotLogFile) throws FileNotFoundException {
+    this.snapshotLogFile = snapshotLogFile;
+    // SnapshotLogger writes UTF-8 bytes, so decode as UTF-8 instead of the platform charset
+    this.reader =
+        new BufferedReader(
+            new InputStreamReader(new FileInputStream(snapshotLogFile), StandardCharsets.UTF_8));
+  }
+
+  /**
+   * Reads the next line of the log and parses it as the snapshot type. It is expected to be
+   * called once, before any file pair is read.
+   *
+   * @return the snapshot type recorded in the log
+   * @throws IOException if the log cannot be read or has no more lines
+   * @throws IllegalArgumentException if the line is not a valid snapshot type name
+   */
+  public SnapshotLogger.SnapshotType getType() throws IOException {
+    String typeLine = pollLine();
+    if (typeLine == null) {
+      throw new IOException("No snapshot type found in snapshot log " + snapshotLogFile);
+    }
+    return SnapshotLogger.SnapshotType.valueOf(typeLine);
+  }
+
+  /** Closes the underlying reader; a failure is logged instead of being thrown. */
+  @Override
+  public void close() {
+    try {
+      reader.close();
+    } catch (IOException e) {
+      LOGGER.error("Exception occurs when closing log analyzer", e);
+    }
+  }
+
+  /**
+   * Tells whether another line can be read from the log.
+   *
+   * @return true if a further line is available, false at end of file or on a read error
+   */
+  public boolean hasNext() {
+    if (bufferedLine != null) {
+      return true;
+    }
+    try {
+      // read ahead instead of relying on ready(), which is not an end-of-file test
+      bufferedLine = reader.readLine();
+    } catch (IOException e) {
+      return false;
+    }
+    return bufferedLine != null;
+  }
+
+  /**
+   * Reads the next "sourceFile#linkFile" line.
+   *
+   * @return the pair (left = source file path, right = link file path), or null if the log is
+   *     exhausted, cannot be read, or the line is malformed
+   */
+  public Pair<String, String> getNextPairs() {
+    String fileInfo;
+    try {
+      fileInfo = pollLine();
+    } catch (IOException e) {
+      LOGGER.error("Exception occurs when analyzing snapshot log", e);
+      return null;
+    }
+    if (fileInfo == null) {
+      // end of file reached, there is nothing left to split
+      return null;
+    }
+    String[] filesPath = fileInfo.split(SnapshotLogger.SPLIT_CHAR);
+    if (filesPath.length != 2) {
+      LOGGER.warn("Illegal file info: {} in snapshot log", fileInfo);
+      return null;
+    }
+    return new Pair<>(filesPath[0], filesPath[1]);
+  }
+
+  /**
+   * Returns the line buffered by {@link #hasNext()} if there is one, otherwise reads a new line
+   * from the log (null at end of file).
+   */
+  private String pollLine() throws IOException {
+    if (bufferedLine != null) {
+      String line = bufferedLine;
+      bufferedLine = null;
+      return line;
+    }
+    return reader.readLine();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLogger.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLogger.java
new file mode 100644
index 0000000000..dc44701741
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLogger.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.snapshot;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+/**
+ * Writes the snapshot log: a line holding the snapshot type and one "sourceFile#linkFile" line
+ * per logged file. Every entry is flushed as soon as it is written.
+ */
+public class SnapshotLogger implements AutoCloseable {
+  public static final String SNAPSHOT_LOG_NAME = "snapshot.log";
+  public static final String SPLIT_CHAR = "#";
+
+  private final File logFile;
+  private final BufferedOutputStream os;
+
+  /**
+   * Opens the log file for writing; existing content is overwritten.
+   *
+   * @param logFile the log file to write
+   * @throws FileNotFoundException if the file cannot be created or opened
+   */
+  public SnapshotLogger(File logFile) throws FileNotFoundException {
+    this.logFile = logFile;
+    os = new BufferedOutputStream(new FileOutputStream(logFile));
+  }
+
+  /** Closes the log stream. */
+  @Override
+  public void close() throws Exception {
+    os.close();
+  }
+
+  /**
+   * Appends one "sourceFile#linkFile" line and flushes it.
+   *
+   * <p>NOTE(review): the paths are written verbatim, so a path containing {@link #SPLIT_CHAR} or
+   * a line break produces an entry that cannot be split back into exactly two paths.
+   *
+   * @param sourceFile path of the original file
+   * @param linkFile path of the link created for it
+   * @throws IOException if writing to the log fails
+   */
+  public void logFile(String sourceFile, String linkFile) throws IOException {
+    os.write(sourceFile.getBytes(StandardCharsets.UTF_8));
+    os.write(SPLIT_CHAR.getBytes(StandardCharsets.UTF_8));
+    os.write(linkFile.getBytes(StandardCharsets.UTF_8));
+    os.write("\n".getBytes(StandardCharsets.UTF_8));
+    os.flush();
+  }
+
+  /**
+   * Appends a line holding the name of the snapshot type and flushes it.
+   *
+   * @param type the snapshot type to record
+   * @throws IOException if writing to the log fails
+   */
+  public void logSnapshotType(SnapshotType type) throws IOException {
+    os.write(String.valueOf(type).getBytes(StandardCharsets.UTF_8));
+    os.write("\n".getBytes(StandardCharsets.UTF_8));
+    os.flush();
+  }
+
+  /**
+   * Closes the log and removes the log file after a failed snapshot.
+   *
+   * @throws IOException if closing the stream or deleting the file fails
+   */
+  public void cleanUpWhenFailed() throws IOException {
+    try {
+      os.close();
+    } finally {
+      // delete even if closing failed, and tolerate a log file that was already removed
+      Files.deleteIfExists(logFile.toPath());
+    }
+  }
+
+  /** Kind of snapshot recorded in the log. */
+  public enum SnapshotType {
+    LOCAL_DISK,
+    REMOTE_DISK
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java
index 9297967df5..9e6d8c32ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java
@@ -30,7 +30,9 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedWriter;
 import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.LinkedList;
@@ -49,6 +51,7 @@ public class SnapshotTaker {
   public static String SNAPSHOT_FILE_INFO_SEP_STR = "_";
   private File seqBaseDir;
   private File unseqBaseDir;
+  private SnapshotLogger snapshotLogger;
 
   public SnapshotTaker(DataRegion dataRegion) {
     this.dataRegion = dataRegion;
@@ -64,7 +67,89 @@ public class SnapshotTaker {
       throw new DirectoryNotLegalException(
           String.format("%s already exists and is not empty", snapshotDirPath));
     }
+
+    if (!snapshotDir.exists() && !snapshotDir.mkdirs()) {
+      throw new IOException(String.format("Failed to create directory %s", snapshotDir));
+    }
+
     boolean inSameDisk = isSnapshotDirAndDataDirOnSameDisk(snapshotDir);
+
+    if (flushBeforeSnapshot) {
+      dataRegion.syncCloseAllWorkingTsFileProcessors();
+    }
+
+    File snapshotLog = new File(snapshotDir, SnapshotLogger.SNAPSHOT_LOG_NAME);
+    try {
+      snapshotLogger = new SnapshotLogger(snapshotLog);
+      boolean success = false;
+      if (inSameDisk) {
+        success = createSnapshotInLocalDisk(snapshotDir);
+      } else {
+        success = createSnapshotInAnotherDisk(snapshotDir);
+      }
+
+      LOGGER.info(
+          "Successfully take snapshot for {}-{}, snapshot directory is {}",
+          dataRegion.getLogicalStorageGroupName(),
+          dataRegion.getDataRegionId(),
+          snapshotDirPath);
+
+      return success;
+    } catch (Exception e) {
+      LOGGER.error(
+          "Exception occurs when taking snapshot for {}-{}",
+          dataRegion.getLogicalStorageGroupName(),
+          dataRegion.getDataRegionId(),
+          e);
+      return false;
+    } finally {
+      try {
+        snapshotLogger.close();
+      } catch (Exception e) {
+        LOGGER.error("Failed to close snapshot logger", e);
+      }
+    }
+  }
+
+  private List<String> getAllDataDirOfOnePartition(boolean sequence, long timePartition) {
+    String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+    List<String> resultDirs = new LinkedList<>();
+
+    for (String dataDir : dataDirs) {
+      resultDirs.add(
+          dataDir
+              + File.separator
+              + (sequence
+                  ? IoTDBConstant.SEQUENCE_FLODER_NAME
+                  : IoTDBConstant.UNSEQUENCE_FLODER_NAME)
+              + File.separator
+              + dataRegion.getLogicalStorageGroupName()
+              + File.separator
+              + dataRegion.getDataRegionId()
+              + File.separator
+              + timePartition
+              + File.separator);
+    }
+    return resultDirs;
+  }
+
+  private void cleanUpWhenFail(File snapshotDir) {
+    File[] files = snapshotDir.listFiles();
+    if (files != null) {
+      for (File file : files) {
+        if (!file.delete()) {
+          LOGGER.error("Failed to delete link file {} after failing to create snapshot", file);
+        }
+      }
+    }
+    try {
+      snapshotLogger.cleanUpWhenFailed();
+    } catch (IOException e) {
+      LOGGER.error("Failed to clean up log file", e);
+    }
+  }
+
+  private boolean createSnapshotInLocalDisk(File snapshotDir) {
     seqBaseDir =
         new File(
             snapshotDir,
@@ -82,23 +167,25 @@ public class SnapshotTaker {
                 + File.separator
                 + dataRegion.getDataRegionId());
 
-    if (!snapshotDir.exists() && !snapshotDir.mkdirs()) {
-      throw new IOException(String.format("Failed to create directory %s", snapshotDir));
-    }
-
-    if (flushBeforeSnapshot) {
-      dataRegion.syncCloseAllWorkingTsFileProcessors();
-    }
-
     List<Long> timePartitions = dataRegion.getTimePartitions();
     TsFileManager manager = dataRegion.getTsFileManager();
     manager.readLock();
     try {
+      try {
+        snapshotLogger.logSnapshotType(SnapshotLogger.SnapshotType.LOCAL_DISK);
+      } catch (IOException e) {
+        LOGGER.error("Fail to create snapshot", e);
+        cleanUpWhenFail(snapshotDir);
+        return false;
+      }
       for (Long timePartition : timePartitions) {
         List<String> seqDataDirs = getAllDataDirOfOnePartition(true, timePartition);
 
         try {
-          createFileSnapshot(seqDataDirs, true, timePartition);
+          for (String seqDataDir : seqDataDirs) {
+            createFileSnapshotToTargetOne(
+                new File(seqDataDir), new File(seqBaseDir, String.valueOf(timePartition)));
+          }
         } catch (IOException e) {
           LOGGER.error("Fail to create snapshot", e);
           cleanUpWhenFail(snapshotDir);
@@ -106,9 +193,11 @@ public class SnapshotTaker {
         }
 
         List<String> unseqDataDirs = getAllDataDirOfOnePartition(false, timePartition);
-
         try {
-          createFileSnapshot(unseqDataDirs, false, timePartition);
+          for (String unseqDataDir : unseqDataDirs) {
+            createFileSnapshotToTargetOne(
+                new File(unseqDataDir), new File(unseqBaseDir, String.valueOf(timePartition)));
+          }
         } catch (IOException e) {
           LOGGER.error("Fail to create snapshot", e);
           cleanUpWhenFail(snapshotDir);
@@ -118,94 +207,124 @@ public class SnapshotTaker {
     } finally {
       manager.readUnlock();
     }
-
-    LOGGER.info(
-        "Successfully take snapshot for {}-{}, snapshot directory is {}",
-        dataRegion.getLogicalStorageGroupName(),
-        dataRegion.getDataRegionId(),
-        snapshotDirPath);
-
     return true;
   }
 
-  private List<String> getAllDataDirOfOnePartition(boolean sequence, long timePartition) {
-    String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
-    List<String> resultDirs = new LinkedList<>();
-
-    for (String dataDir : dataDirs) {
-      resultDirs.add(
-          dataDir
-              + File.separator
-              + (sequence
-                  ? IoTDBConstant.SEQUENCE_FLODER_NAME
-                  : IoTDBConstant.UNSEQUENCE_FLODER_NAME)
-              + File.separator
-              + dataRegion.getLogicalStorageGroupName()
-              + File.separator
-              + dataRegion.getDataRegionId()
-              + File.separator
-              + timePartition
-              + File.separator);
-    }
-    return resultDirs;
-  }
-
-  private void createFileSnapshot(List<String> sourceDirPaths, boolean sequence, long timePartition)
-      throws IOException {
-    File timePartitionDir =
-        new File(sequence ? seqBaseDir : unseqBaseDir, String.valueOf(timePartition));
-    if (!timePartitionDir.exists() && !timePartitionDir.mkdirs()) {
-      throw new IOException(
-          String.format("%s not exists and cannot create it", timePartitionDir.getAbsolutePath()));
-    }
-
-    for (String sourceDirPath : sourceDirPaths) {
-      File sourceDir = new File(sourceDirPath);
-      if (!sourceDir.exists()) {
-        continue;
-      }
-      // Collect TsFile, TsFileResource, Mods, CompactionMods
-      File[] files =
-          sourceDir.listFiles(
-              (dir, name) ->
-                  name.endsWith(".tsfile")
-                      || name.endsWith(TsFileResource.RESOURCE_SUFFIX)
-                      || name.endsWith(ModificationFile.FILE_SUFFIX)
-                      || name.endsWith(ModificationFile.COMPACTION_FILE_SUFFIX)
-                      || name.endsWith(CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX)
-                      || name.endsWith(CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX)
-                      || name.endsWith(IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX)
-                      || name.endsWith(IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX));
-      if (files == null || files.length == 0) {
-        continue;
-      }
-
-      for (File file : files) {
-        File linkFile = new File(timePartitionDir, file.getName());
-        Files.createLink(linkFile.toPath(), file.toPath());
-      }
-    }
-  }
-
-  private void cleanUpWhenFail(File snapshotDir) {
-    File[] files = snapshotDir.listFiles();
-    if (files != null) {
-      for (File file : files) {
-        if (!file.delete()) {
-          LOGGER.error("Failed to delete link file {} after failing to create snapshot", file);
-        }
-      }
-    }
-  }
-
-  private void createSnapshotInLocalDisk(File snapshotDir) {}
-
-  private void createSnapshotInAnotherDisk(File snapshotDir) {}
+  /**
+   * Takes the snapshot when the snapshot directory cannot be hard-linked from the data
+   * directories: the files of every data directory are linked into a "snapshot" folder inside
+   * that same data directory, and each link is recorded in the snapshot log.
+   *
+   * @param snapshotDir the requested snapshot directory; its name is part of the path of the
+   *     per-data-directory snapshot folders
+   * @return true if every file was linked and logged, false if any step failed
+   */
+  private boolean createSnapshotInAnotherDisk(File snapshotDir) {
+    try {
+      // the loader parses the first log line as the snapshot type, so it must be written
+      // before any file pair
+      snapshotLogger.logSnapshotType(SnapshotLogger.SnapshotType.REMOTE_DISK);
+    } catch (IOException e) {
+      LOGGER.error("Fail to create snapshot", e);
+      cleanUpWhenFail(snapshotDir);
+      return false;
+    }
+
+    String regionPath =
+        dataRegion.getLogicalStorageGroupName() + File.separator + dataRegion.getDataRegionId();
+    List<Long> timePartitions = dataRegion.getTimePartitions();
+    TsFileManager manager = dataRegion.getTsFileManager();
+    manager.readLock();
+    try {
+      for (String dataDir : IoTDBDescriptor.getInstance().getConfig().getDataDirs()) {
+        File seqDir = new File(dataDir, "sequence" + File.separator + regionPath);
+        File unseqDir = new File(dataDir, "unsequence" + File.separator + regionPath);
+        File localSeqSnapshotDir =
+            new File(
+                dataDir,
+                "snapshot"
+                    + File.separator
+                    + "sequence"
+                    + File.separator
+                    + snapshotDir.getName()
+                    + File.separator
+                    + regionPath);
+        File localUnseqSnapshotDir =
+            new File(
+                dataDir,
+                "snapshot"
+                    + File.separator
+                    + "unsequence"
+                    + File.separator
+                    + snapshotDir.getName()
+                    + File.separator
+                    + regionPath);
+        // an already existing directory is fine, only a failed creation is an error
+        if (!localSeqSnapshotDir.exists() && !localSeqSnapshotDir.mkdirs()) {
+          LOGGER.warn("Failed to create local snapshot dir {}", localSeqSnapshotDir);
+          cleanUpWhenFail(snapshotDir);
+          return false;
+        }
+        for (long timePartition : timePartitions) {
+          String partitionName = String.valueOf(timePartition);
+          createFileSnapshotToTargetOne(
+              new File(seqDir, partitionName), new File(localSeqSnapshotDir, partitionName));
+          createFileSnapshotToTargetOne(
+              new File(unseqDir, partitionName),
+              new File(localUnseqSnapshotDir, partitionName));
+        }
+      }
+      return true;
+    } catch (IOException e) {
+      LOGGER.error(
+          "Failed to create snapshot for {}-{}",
+          dataRegion.getLogicalStorageGroupName(),
+          dataRegion.getDataRegionId(),
+          e);
+      // clean up only when the snapshot actually failed, never after a successful run
+      cleanUpWhenFail(snapshotDir);
+      return false;
+    } finally {
+      // always release the read lock taken above
+      manager.readUnlock();
+    }
+  }
+
+  /**
+   * Hard-links every snapshot-relevant file of sourceDir into targetDir and records each pair in
+   * the snapshot log. Does nothing if sourceDir cannot be listed.
+   *
+   * @throws IOException if targetDir cannot be created, or a link or log entry cannot be written
+   */
+  private void createFileSnapshotToTargetOne(File sourceDir, File targetDir) throws IOException {
+    File[] files =
+        sourceDir.listFiles(
+            (dir, name) ->
+                name.endsWith(".tsfile")
+                    || name.endsWith(TsFileResource.RESOURCE_SUFFIX)
+                    || name.endsWith(ModificationFile.FILE_SUFFIX)
+                    || name.endsWith(ModificationFile.COMPACTION_FILE_SUFFIX)
+                    || name.endsWith(CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX)
+                    || name.endsWith(CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX)
+                    || name.endsWith(IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX)
+                    || name.endsWith(IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX));
+    if (files == null) {
+      return;
+    }
+    if (!targetDir.exists() && !targetDir.mkdirs()) {
+      throw new IOException("Failed to create dir " + targetDir.getAbsolutePath());
+    }
+    for (File file : files) {
+      File targetFile = new File(targetDir, file.getName());
+      Files.createLink(targetFile.toPath(), file.toPath());
+      snapshotLogger.logFile(file.getAbsolutePath(), targetFile.getAbsolutePath());
+    }
+  }
 
   private boolean isSnapshotDirAndDataDirOnSameDisk(File snapshotDir) throws IOException {
-    String testFileName = "test";
+    String testFileName = "test.txt";
     File testFile = new File(snapshotDir, testFileName);
-    if (!testFile.createNewFile()) {
+    if (!testFile.createNewFile() || !createTestFile(testFile)) {
       throw new IOException(
           "Failed to test whether the data dir and snapshot dir is on the same disk");
     }
@@ -214,8 +333,13 @@ public class SnapshotTaker {
       File dirFile = new File(dataDir);
       File testSnapshotFile = new File(dirFile, testFileName);
       try {
-        Files.createLink(testFile.toPath(), testSnapshotFile.toPath());
+        Files.createLink(testSnapshotFile.toPath(), testFile.toPath());
       } catch (IOException e) {
+        LOGGER.error(
+            "Failed to create file in {}, test file exists:{}",
+            testSnapshotFile,
+            testFile.exists(),
+            e);
         return false;
       }
       testSnapshotFile.delete();
@@ -223,4 +347,22 @@ public class SnapshotTaker {
     testFile.delete();
     return true;
   }
+
+  /**
+   * Writes a few bytes into the probe file used by the same-disk check.
+   *
+   * @param testFile the probe file to write
+   * @return true if the content was written, false if any exception occurred
+   */
+  private boolean createTestFile(File testFile) {
+    try (BufferedWriter writer = new BufferedWriter(new FileWriter(testFile))) {
+      writer.write("test");
+      writer.flush();
+      return true;
+    } catch (Exception e) {
+      // report the cause: the caller only throws a generic IOException when this returns false
+      LOGGER.warn("Failed to write test file {}", testFile, e);
+      return false;
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/exception/DirectoryNotLegalException.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/exception/DirectoryNotLegalException.java
index bd4742d9e5..96eb9650dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/exception/DirectoryNotLegalException.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/exception/DirectoryNotLegalException.java
@@ -18,11 +18,8 @@
  */
 package org.apache.iotdb.db.engine.snapshot.exception;
 
-import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-public class DirectoryNotLegalException extends IoTDBException {
+public class DirectoryNotLegalException extends Exception {
   public DirectoryNotLegalException(String message) {
-    super(message, TSStatusCode.SNAPSHOT_DIR_NOT_LEGAL.getStatusCode());
+    super(message);
   }
 }