You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/05/19 01:24:20 UTC

[iotdb] branch master updated: [IOTDB-2977] Take and load snapshot for DataRegionStateMachine (#5923)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ebeac5b6a [IOTDB-2977] Take and load snapshot for DataRegionStateMachine (#5923)
1ebeac5b6a is described below

commit 1ebeac5b6a9a3bfcfee6825ff8bdf7d22ab04e6f
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Thu May 19 09:24:14 2022 +0800

    [IOTDB-2977] Take and load snapshot for DataRegionStateMachine (#5923)
---
 .../iotdb/db/integration/IoTDBSnapshotIT.java      | 259 +++++++++++++++++++++
 .../statemachine/DataRegionStateMachine.java       |  38 ++-
 .../apache/iotdb/db/engine/StorageEngineV2.java    |  13 ++
 .../iotdb/db/engine/snapshot/SnapshotLoader.java   | 196 ++++++++++++++++
 .../iotdb/db/engine/snapshot/SnapshotTaker.java    | 166 +++++++++++++
 .../exception/DirectoryNotLegalException.java      |  28 +++
 .../iotdb/db/engine/storagegroup/DataRegion.java   |   4 +
 .../engine/storagegroup/TsFileNameGenerator.java   |   2 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 9 files changed, 703 insertions(+), 4 deletions(-)

diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java
new file mode 100644
index 0000000000..9b72f49c5f
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
+import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.snapshot.SnapshotLoader;
+import org.apache.iotdb.db.engine.snapshot.SnapshotTaker;
+import org.apache.iotdb.db.engine.snapshot.exception.DirectoryNotLegalException;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.integration.env.EnvFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Integration tests for taking a snapshot of a data region with {@link SnapshotTaker} and loading
+ * it back with {@link SnapshotLoader}.
+ *
+ * <p>All tests share one snapshot directory below {@link TestConstant#OUTPUT_DATA_DIR}; it is
+ * wiped before each use and again in {@link #tearDown()} so that tests cannot influence each other.
+ */
+public class IoTDBSnapshotIT {
+  private static final String SG_NAME = "root.snapshotTest";
+  private static final int DEVICE_NUM = 10;
+  private static final int POINT_NUM = 10;
+
+  @Before
+  public void setUp() throws Exception {
+    EnvFactory.getEnv().initBeforeTest();
+    IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      EnvFactory.getEnv().cleanAfterTest();
+    } finally {
+      IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(true);
+      // never leave snapshot files behind for the next test
+      resetSnapshotDir();
+    }
+  }
+
+  /** Removes the shared snapshot directory if it exists and returns its (non-existing) path. */
+  private static File resetSnapshotDir() throws IOException {
+    File snapshotDir = new File(TestConstant.OUTPUT_DATA_DIR, "snapshot");
+    if (snapshotDir.exists()) {
+      FileUtils.forceDelete(snapshotDir);
+    }
+    return snapshotDir;
+  }
+
+  /**
+   * Creates the storage group and, for every device, writes the same timestamps twice with a flush
+   * after each round, so that each device ends up with one sequence and one unsequence file.
+   */
+  private static void prepareData(Statement statement) throws SQLException {
+    statement.execute("set storage group to " + SG_NAME);
+    for (int i = 0; i < DEVICE_NUM; ++i) {
+      insertPoints(statement, i, 0);
+      statement.execute("flush");
+      insertPoints(statement, i, 1);
+      statement.execute("flush");
+    }
+  }
+
+  /** Inserts timestamps 0..POINT_NUM-1 for one device; the value is timestamp + valueOffset. */
+  private static void insertPoints(Statement statement, int device, int valueOffset)
+      throws SQLException {
+    for (int j = 0; j < POINT_NUM; ++j) {
+      statement.execute(
+          String.format(
+              "insert into %s.d%d(time, s) values (%d, %d)",
+              SG_NAME, device, j, j + valueOffset));
+    }
+  }
+
+  /** Reads every point of every device, keyed by timestamp followed by the series path. */
+  private static Map<String, Integer> queryAllPoints(Statement statement) throws SQLException {
+    Map<String, Integer> points = new HashMap<>();
+    try (ResultSet resultSet = statement.executeQuery("select ** from root")) {
+      while (resultSet.next()) {
+        long time = resultSet.getLong("Time");
+        for (int i = 0; i < DEVICE_NUM; ++i) {
+          String measurement = SG_NAME + ".d" + i + ".s";
+          points.put(time + measurement, resultSet.getInt(measurement));
+        }
+      }
+    }
+    return points;
+  }
+
+  @Test
+  public void testTakeSnapshot()
+      throws SQLException, IllegalPathException, StorageEngineException, IOException,
+          DirectoryNotLegalException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      prepareData(statement);
+
+      DataRegion region = StorageEngineV2.getInstance().getDataRegion(new DataRegionId(0));
+      File snapshotDir = resetSnapshotDir();
+
+      Assert.assertTrue(
+          new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true));
+
+      Assert.assertTrue(snapshotDir.exists());
+      Assert.assertTrue(snapshotDir.isDirectory());
+      File[] seqTsfiles =
+          snapshotDir.listFiles((dir, name) -> name.endsWith(".tsfile") && name.startsWith("seq"));
+      File[] unseqTsfiles =
+          snapshotDir.listFiles(
+              (dir, name) -> name.endsWith(".tsfile") && name.startsWith("unseq"));
+      File[] tsfileResources =
+          snapshotDir.listFiles((dir, name) -> name.endsWith(".tsfile.resource"));
+      Assert.assertNotNull(seqTsfiles);
+      Assert.assertNotNull(unseqTsfiles);
+      Assert.assertNotNull(tsfileResources);
+      Assert.assertEquals(DEVICE_NUM, seqTsfiles.length);
+      Assert.assertEquals(DEVICE_NUM, unseqTsfiles.length);
+      Assert.assertEquals(2 * DEVICE_NUM, tsfileResources.length);
+    }
+  }
+
+  @Test(expected = DirectoryNotLegalException.class)
+  public void testTakeSnapshotInNotEmptyDir()
+      throws SQLException, IOException, IllegalPathException, StorageEngineException,
+          DirectoryNotLegalException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      prepareData(statement);
+
+      DataRegion region = StorageEngineV2.getInstance().getDataRegion(new DataRegionId(0));
+      File snapshotDir = resetSnapshotDir();
+      Assert.assertTrue(snapshotDir.mkdirs());
+
+      // a single stray file is enough to make the target directory illegal
+      File tmpFile = new File(snapshotDir, "test");
+      Assert.assertTrue(tmpFile.createNewFile());
+
+      new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true);
+    }
+  }
+
+  @Test
+  public void testLoadSnapshot()
+      throws SQLException, MetadataException, StorageEngineException, DirectoryNotLegalException,
+          IOException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      prepareData(statement);
+      Map<String, Integer> expected = queryAllPoints(statement);
+      Assert.assertFalse(expected.isEmpty());
+
+      DataRegion region = StorageEngineV2.getInstance().getDataRegion(new DataRegionId(0));
+      File snapshotDir = resetSnapshotDir();
+      Assert.assertTrue(snapshotDir.mkdirs());
+      Assert.assertTrue(
+          new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true));
+      StorageEngineV2.getInstance()
+          .setDataRegion(
+              new DataRegionId(0),
+              new SnapshotLoader(snapshotDir.getAbsolutePath(), SG_NAME, "0")
+                  .loadSnapshotForStateMachine());
+
+      ChunkCache.getInstance().clear();
+      TimeSeriesMetadataCache.getInstance().clear();
+      // comparing whole maps also catches rows that went missing after the reload
+      Assert.assertEquals(expected, queryAllPoints(statement));
+    }
+  }
+
+  @Test
+  public void testTakeAndLoadSnapshotWhenCompaction()
+      throws SQLException, MetadataException, StorageEngineException, InterruptedException,
+          DirectoryNotLegalException, IOException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      prepareData(statement);
+      Map<String, Integer> expected = queryAllPoints(statement);
+      Assert.assertFalse(expected.isEmpty());
+
+      File snapshotDir = resetSnapshotDir();
+      Assert.assertTrue(snapshotDir.mkdirs());
+      IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(true);
+      statement.execute("merge");
+      DataRegion region = StorageEngineV2.getInstance().getDataRegion(new DataRegionId(0));
+      Assert.assertTrue(
+          new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true));
+      region.abortCompaction();
+      StorageEngineV2.getInstance()
+          .setDataRegion(
+              new DataRegionId(0),
+              new SnapshotLoader(snapshotDir.getAbsolutePath(), SG_NAME, "0")
+                  .loadSnapshotForStateMachine());
+      ChunkCache.getInstance().clear();
+      TimeSeriesMetadataCache.getInstance().clear();
+      // comparing whole maps also catches rows that went missing after the reload
+      Assert.assertEquals(expected, queryAllPoints(statement));
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 89110c4700..1decc0b84d 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -20,8 +20,12 @@
 package org.apache.iotdb.db.consensus.statemachine;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.db.consensus.statemachine.visitor.DataExecutionVisitor;
+import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.engine.snapshot.SnapshotLoader;
+import org.apache.iotdb.db.engine.snapshot.SnapshotTaker;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
@@ -39,7 +43,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
   private static final FragmentInstanceManager QUERY_INSTANCE_MANAGER =
       FragmentInstanceManager.getInstance();
 
-  private final DataRegion region;
+  private DataRegion region;
 
   public DataRegionStateMachine(DataRegion region) {
     this.region = region;
@@ -53,11 +57,39 @@ public class DataRegionStateMachine extends BaseStateMachine {
 
   @Override
   public boolean takeSnapshot(File snapshotDir) {
-    return false;
+    try {
+      return new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true);
+    } catch (Exception e) {
+      logger.error(
+          "Exception occurs when taking snapshot for {}-{} in {}",
+          region.getLogicalStorageGroupName(),
+          region.getDataRegionId(),
+          snapshotDir,
+          e);
+      return false;
+    }
   }
 
   @Override
-  public void loadSnapshot(File latestSnapshotRootDir) {}
+  public void loadSnapshot(File latestSnapshotRootDir) {
+    DataRegion newRegion =
+        new SnapshotLoader(
+                latestSnapshotRootDir.getAbsolutePath(),
+                region.getLogicalStorageGroupName(),
+                region.getDataRegionId())
+            .loadSnapshotForStateMachine();
+    if (newRegion == null) {
+      logger.error("Fail to load snapshot from {}", latestSnapshotRootDir);
+      return;
+    }
+    this.region = newRegion;
+    try {
+      StorageEngineV2.getInstance()
+          .setDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionId())), region);
+    } catch (Exception e) {
+      logger.error("Exception occurs when replacing data region in storage engine.", e);
+    }
+  }
 
   @Override
   protected TSStatus write(FragmentInstance fragmentInstance) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
index 30a716512f..94ea120d20 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
@@ -601,6 +601,19 @@ public class StorageEngineV2 implements IService {
     return dataRegionMap.get(regionId);
   }
 
+  /**
+   * Registers {@code newRegion} under {@code regionId}. If a region is already registered for
+   * that id, its working TsFile processors are closed and its compaction is aborted before it is
+   * replaced.
+   *
+   * @param regionId id of the data region to replace
+   * @param newRegion region that becomes visible under {@code regionId}
+   */
+  public void setDataRegion(DataRegionId regionId, DataRegion newRegion) {
+    // a single lookup avoids the containsKey/get gap in which the entry could disappear
+    DataRegion oldRegion = dataRegionMap.get(regionId);
+    if (oldRegion != null) {
+      oldRegion.syncCloseAllWorkingTsFileProcessors();
+      oldRegion.abortCompaction();
+    }
+    dataRegionMap.put(regionId, newRegion);
+  }
+
+  /** Returns the {@code fileFlushPolicy} held by this storage engine. */
+  public TsFileFlushPolicy getFileFlushPolicy() {
+    return fileFlushPolicy;
+  }
+
   static class InstanceHolder {
 
     private static final StorageEngineV2 INSTANCE = new StorageEngineV2();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java
new file mode 100644
index 0000000000..476a645533
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.snapshot;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Rebuilds a {@link DataRegion} from a snapshot directory written by {@link SnapshotTaker}.
+ *
+ * <p>Snapshot files are expected to be named {@code
+ * <seq|unseq>_<storageGroup>_<dataRegionId>_<timePartition>_<fileName>}, using {@link
+ * SnapshotTaker#SNAPSHOT_FILE_INFO_SEP_STR} as separator.
+ */
+public class SnapshotLoader {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SnapshotLoader.class);
+  private final String storageGroupName;
+  private final String snapshotPath;
+  private final String dataRegionId;
+
+  /**
+   * @param snapshotPath directory that contains the snapshot files
+   * @param storageGroupName logical storage group of the region to load
+   * @param dataRegionId id of the region to load
+   */
+  public SnapshotLoader(String snapshotPath, String storageGroupName, String dataRegionId) {
+    this.snapshotPath = snapshotPath;
+    this.storageGroupName = storageGroupName;
+    this.dataRegionId = dataRegionId;
+  }
+
+  /**
+   * Constructs a new DataRegion for this storage group and region id.
+   *
+   * @return the new region, or null if its construction threw
+   */
+  private DataRegion loadSnapshot() {
+    try {
+      return new DataRegion(
+          IoTDBDescriptor.getInstance().getConfig().getSystemDir()
+              + File.separator
+              + "storage_groups"
+              + File.separator
+              + storageGroupName,
+          dataRegionId,
+          StorageEngineV2.getInstance().getFileFlushPolicy(),
+          storageGroupName);
+    } catch (Exception e) {
+      LOGGER.error("Exception occurs while load snapshot from {}", snapshotPath, e);
+      return null;
+    }
+  }
+
+  /**
+   * Loads the snapshot in three steps: 1. delete the existing data of this region, 2. hard-link
+   * the snapshot files into the data directories, 3. construct a DataRegion on top of them.
+   *
+   * @return the loaded region, or null if any of the steps failed
+   */
+  public DataRegion loadSnapshotForStateMachine() {
+    try {
+      deleteAllFilesInDataDirs();
+    } catch (IOException e) {
+      // already logged by deleteAllFilesInDataDirs
+      return null;
+    }
+
+    // link the snapshot data into the data dirs
+    File sourceDataDir = new File(snapshotPath);
+    if (sourceDataDir.exists()) {
+      String seqBaseDir = regionRelativePath(IoTDBConstant.SEQUENCE_FLODER_NAME);
+      String unseqBaseDir = regionRelativePath(IoTDBConstant.UNSEQUENCE_FLODER_NAME);
+      try {
+        createLinksFromSnapshotDirToDataDir(sourceDataDir, seqBaseDir, unseqBaseDir);
+      } catch (IOException | DiskSpaceInsufficientException e) {
+        LOGGER.error(
+            "Exception occurs when creating links from snapshot directory to data directory", e);
+        return null;
+      }
+    }
+
+    return loadSnapshot();
+  }
+
+  /** Returns {@code <folderName>/<storageGroupName>/<dataRegionId>}. */
+  private String regionRelativePath(String folderName) {
+    return folderName + File.separator + storageGroupName + File.separator + dataRegionId;
+  }
+
+  /** Adds every entry found directly below {@code regionDir} to {@code collected}. */
+  private static void collectChildren(File regionDir, List<File> collected) {
+    if (!regionDir.exists()) {
+      return;
+    }
+    File[] children = regionDir.listFiles();
+    if (children != null) {
+      collected.addAll(Arrays.asList(children));
+    }
+  }
+
+  /**
+   * Deletes every time partition directory of this region from the sequence and unsequence
+   * folders of all configured data dirs.
+   *
+   * @throws IOException if one of the directories cannot be deleted
+   */
+  private void deleteAllFilesInDataDirs() throws IOException {
+    String[] dataDirPaths = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+
+    List<File> timePartitions = new ArrayList<>();
+    for (String dataDirPath : dataDirPaths) {
+      collectChildren(
+          new File(
+              dataDirPath + File.separator + regionRelativePath(IoTDBConstant.SEQUENCE_FLODER_NAME)),
+          timePartitions);
+      collectChildren(
+          new File(
+              dataDirPath
+                  + File.separator
+                  + regionRelativePath(IoTDBConstant.UNSEQUENCE_FLODER_NAME)),
+          timePartitions);
+    }
+
+    try {
+      for (File timePartition : timePartitions) {
+        FileUtils.forceDelete(timePartition);
+      }
+    } catch (IOException e) {
+      LOGGER.error(
+          "Exception occurs when deleting time partition directory for {}-{}",
+          storageGroupName,
+          dataRegionId,
+          e);
+      throw e;
+    }
+  }
+
+  /**
+   * Creates, for every snapshot file of this region, a hard link in the matching time partition
+   * directory below the next sequence or unsequence data folder.
+   *
+   * <p>File names are matched against the known {@code <seq|unseq>_<storageGroup>_<regionId>_}
+   * prefix instead of being split blindly on the separator, so storage group names and file names
+   * that contain the separator themselves are handled correctly. Files that do not follow the
+   * naming scheme are skipped.
+   *
+   * @param sourceDir the snapshot directory
+   * @param seqBaseDir path of this region relative to a data dir, for sequence files
+   * @param unseqBaseDir path of this region relative to a data dir, for unsequence files
+   * @throws IOException if a target directory or a hard link cannot be created
+   * @throws DiskSpaceInsufficientException if no data folder can be chosen for a file
+   */
+  private void createLinksFromSnapshotDirToDataDir(
+      File sourceDir, String seqBaseDir, String unseqBaseDir)
+      throws IOException, DiskSpaceInsufficientException {
+    File[] files = sourceDir.listFiles();
+    if (files == null) {
+      return;
+    }
+    String sep = SnapshotTaker.SNAPSHOT_FILE_INFO_SEP_STR;
+    String regionInfix = sep + storageGroupName + sep + dataRegionId + sep;
+    String seqPrefix = "seq" + regionInfix;
+    String unseqPrefix = "unseq" + regionInfix;
+    for (File sourceFile : files) {
+      String snapshotFileName = sourceFile.getName();
+      boolean seq;
+      String partitionAndName;
+      if (snapshotFileName.startsWith(seqPrefix)) {
+        seq = true;
+        partitionAndName = snapshotFileName.substring(seqPrefix.length());
+      } else if (snapshotFileName.startsWith(unseqPrefix)) {
+        seq = false;
+        partitionAndName = snapshotFileName.substring(unseqPrefix.length());
+      } else {
+        LOGGER.warn("Skip {} because it is not a snapshot file of this region", sourceFile);
+        continue;
+      }
+      // what is left is <timePartition><sep><fileName>; only the first separator is structural
+      int sepIndex = partitionAndName.indexOf(sep);
+      if (sepIndex <= 0 || sepIndex + sep.length() >= partitionAndName.length()) {
+        LOGGER.warn("Skip {} because its name does not contain a time partition", sourceFile);
+        continue;
+      }
+      String timePartition = partitionAndName.substring(0, sepIndex);
+      String fileName = partitionAndName.substring(sepIndex + sep.length());
+      String nextDataDir =
+          seq
+              ? DirectoryManager.getInstance().getNextFolderForSequenceFile()
+              : DirectoryManager.getInstance().getNextFolderForUnSequenceFile();
+      File baseDir = new File(nextDataDir, seq ? seqBaseDir : unseqBaseDir);
+      File targetDirForThisTimePartition = new File(baseDir, timePartition);
+      if (!targetDirForThisTimePartition.exists() && !targetDirForThisTimePartition.mkdirs()) {
+        throw new IOException(
+            String.format("Failed to make directory %s", targetDirForThisTimePartition));
+      }
+
+      File targetFile = new File(targetDirForThisTimePartition, fileName);
+      try {
+        Files.createLink(targetFile.toPath(), sourceFile.toPath());
+      } catch (IOException e) {
+        throw new IOException(
+            String.format("Failed to create hard link from %s to %s", sourceFile, targetFile), e);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java
new file mode 100644
index 0000000000..1842b1b830
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.snapshot;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.log.CompactionLogger;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.snapshot.exception.DirectoryNotLegalException;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * SnapshotTaker takes data snapshot for a DataRegion in one time. It does so by creating hard link
+ * for files or copying them. SnapshotTaker supports two different ways of snapshot: Full Snapshot
+ * and Incremental Snapshot. The former takes a snapshot for all files in an empty directory, and
+ * the latter takes a snapshot based on the snapshot that took before.
+ */
+public class SnapshotTaker {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SnapshotTaker.class);
+  private final DataRegion dataRegion;
+  public static String SNAPSHOT_FILE_INFO_SEP_STR = "_";
+
+  public SnapshotTaker(DataRegion dataRegion) {
+    this.dataRegion = dataRegion;
+  }
+
+  public boolean takeFullSnapshot(String snapshotDirPath, boolean flushBeforeSnapshot)
+      throws DirectoryNotLegalException, IOException {
+    File snapshotDir = new File(snapshotDirPath);
+    if (snapshotDir.exists()
+        && snapshotDir.listFiles() != null
+        && snapshotDir.listFiles().length > 0) {
+      // the directory should be empty or not exists
+      throw new DirectoryNotLegalException(
+          String.format("%s already exists and is not empty", snapshotDirPath));
+    }
+
+    if (!snapshotDir.exists() && !snapshotDir.mkdirs()) {
+      throw new IOException(String.format("Failed to create directory %s", snapshotDir));
+    }
+
+    if (flushBeforeSnapshot) {
+      dataRegion.syncCloseAllWorkingTsFileProcessors();
+    }
+
+    List<Long> timePartitions = dataRegion.getTimePartitions();
+    for (Long timePartition : timePartitions) {
+      List<String> seqDataDirs = getAllDataDirOfOnePartition(true, timePartition);
+
+      try {
+        createFileSnapshot(seqDataDirs, snapshotDir, true, timePartition);
+      } catch (IOException e) {
+        LOGGER.error("Fail to create snapshot", e);
+        File[] files = snapshotDir.listFiles();
+        if (files != null) {
+          for (File file : files) {
+            if (!file.delete()) {
+              LOGGER.error("Failed to delete link file {} after failing to create snapshot", file);
+            }
+          }
+        }
+        return false;
+      }
+
+      List<String> unseqDataDirs = getAllDataDirOfOnePartition(false, timePartition);
+
+      try {
+        createFileSnapshot(unseqDataDirs, snapshotDir, false, timePartition);
+      } catch (IOException e) {
+        LOGGER.error("Fail to create snapshot", e);
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private List<String> getAllDataDirOfOnePartition(boolean sequence, long timePartition) {
+    String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+    List<String> resultDirs = new LinkedList<>();
+
+    for (String dataDir : dataDirs) {
+      resultDirs.add(
+          dataDir
+              + File.separator
+              + (sequence
+                  ? IoTDBConstant.SEQUENCE_FLODER_NAME
+                  : IoTDBConstant.UNSEQUENCE_FLODER_NAME)
+              + File.separator
+              + dataRegion.getLogicalStorageGroupName()
+              + File.separator
+              + dataRegion.getDataRegionId()
+              + File.separator
+              + timePartition
+              + File.separator);
+    }
+    return resultDirs;
+  }
+
+  private void createFileSnapshot(
+      List<String> sourceDirPaths, File targetDir, boolean sequence, long timePartition)
+      throws IOException {
+    for (String sourceDirPath : sourceDirPaths) {
+      File sourceDir = new File(sourceDirPath);
+      if (!sourceDir.exists()) {
+        continue;
+      }
+      // Collect TsFile, TsFileResource, Mods, CompactionMods
+      File[] files =
+          sourceDir.listFiles(
+              (dir, name) ->
+                  name.endsWith(".tsfile")
+                      || name.endsWith(TsFileResource.RESOURCE_SUFFIX)
+                      || name.endsWith(ModificationFile.FILE_SUFFIX)
+                      || name.endsWith(ModificationFile.COMPACTION_FILE_SUFFIX)
+                      || name.endsWith(CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX)
+                      || name.endsWith(CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX)
+                      || name.endsWith(IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX)
+                      || name.endsWith(IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX));
+      if (files == null || files.length == 0) {
+        continue;
+      }
+
+      for (File file : files) {
+        String newFileName =
+            (sequence ? "seq" : "unseq")
+                + SNAPSHOT_FILE_INFO_SEP_STR
+                + dataRegion.getLogicalStorageGroupName()
+                + SNAPSHOT_FILE_INFO_SEP_STR
+                + dataRegion.getDataRegionId()
+                + SNAPSHOT_FILE_INFO_SEP_STR
+                + timePartition
+                + SNAPSHOT_FILE_INFO_SEP_STR
+                + file.getName();
+        File linkFile = new File(targetDir, newFileName);
+        Files.createLink(linkFile.toPath(), file.toPath());
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/exception/DirectoryNotLegalException.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/exception/DirectoryNotLegalException.java
new file mode 100644
index 0000000000..bd4742d9e5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/exception/DirectoryNotLegalException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.snapshot.exception;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+/**
+ * Exception carrying the {@link TSStatusCode#SNAPSHOT_DIR_NOT_LEGAL} status code; SnapshotTaker
+ * throws it when the target snapshot directory already exists and is not empty.
+ */
+public class DirectoryNotLegalException extends IoTDBException {
+  public DirectoryNotLegalException(String message) {
+    super(message, TSStatusCode.SNAPSHOT_DIR_NOT_LEGAL.getStatusCode());
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 1fbd2ab5e0..9fbc911655 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -3486,6 +3486,10 @@ public class DataRegion {
         throws WriteProcessException;
   }
 
+  /**
+   * Returns the keys of {@code partitionMaxFileVersions} as a new list, so callers can iterate
+   * over it without touching the underlying map.
+   */
+  public List<Long> getTimePartitions() {
+    List<Long> timePartitions = new ArrayList<>(partitionMaxFileVersions.keySet());
+    return timePartitions;
+  }
+
   public String getInsertWriteLockHolder() {
     return insertWriteLockHolder;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java
index b185838b32..946d827461 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java
@@ -71,7 +71,7 @@ public class TsFileNameGenerator {
             time, version, innerSpaceCompactionCount, crossSpaceCompactionCount);
   }
 
-  private static String generateTsFileDir(
+  public static String generateTsFileDir(
       boolean sequence,
       String logicalStorageGroup,
       String virtualStorageGroup,
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index f612f6732b..bb037da449 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -83,6 +83,7 @@ public enum TSStatusCode {
   WRITE_PROCESS_ERROR(412),
   WRITE_PROCESS_REJECT(413),
   QUERY_ID_NOT_EXIST(414),
+  SNAPSHOT_DIR_NOT_LEGAL(415),
 
   UNSUPPORTED_INDEX_FUNC_ERROR(421),
   UNSUPPORTED_INDEX_TYPE_ERROR(422),