You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/05/19 01:24:20 UTC
[iotdb] branch master updated: [IOTDB-2977] Take and load snapshot for DataRegionStateMachine (#5923)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1ebeac5b6a [IOTDB-2977] Take and load snapshot for DataRegionStateMachine (#5923)
1ebeac5b6a is described below
commit 1ebeac5b6a9a3bfcfee6825ff8bdf7d22ab04e6f
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Thu May 19 09:24:14 2022 +0800
[IOTDB-2977] Take and load snapshot for DataRegionStateMachine (#5923)
---
.../iotdb/db/integration/IoTDBSnapshotIT.java | 259 +++++++++++++++++++++
.../statemachine/DataRegionStateMachine.java | 38 ++-
.../apache/iotdb/db/engine/StorageEngineV2.java | 13 ++
.../iotdb/db/engine/snapshot/SnapshotLoader.java | 196 ++++++++++++++++
.../iotdb/db/engine/snapshot/SnapshotTaker.java | 166 +++++++++++++
.../exception/DirectoryNotLegalException.java | 28 +++
.../iotdb/db/engine/storagegroup/DataRegion.java | 4 +
.../engine/storagegroup/TsFileNameGenerator.java | 2 +-
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
9 files changed, 703 insertions(+), 4 deletions(-)
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java
new file mode 100644
index 0000000000..9b72f49c5f
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
+import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.snapshot.SnapshotLoader;
+import org.apache.iotdb.db.engine.snapshot.SnapshotTaker;
+import org.apache.iotdb.db.engine.snapshot.exception.DirectoryNotLegalException;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.integration.env.EnvFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Integration tests for taking a full snapshot of a data region with {@link SnapshotTaker} and
+ * loading it back with {@link SnapshotLoader}.
+ *
+ * <p>All tests share one snapshot directory under {@code TestConstant.OUTPUT_DATA_DIR}; it is
+ * removed before and after every test so that the tests do not depend on their execution order.
+ */
+public class IoTDBSnapshotIT {
+  private static final String SG_NAME = "root.snapshotTest";
+  private static final String QUERY_ALL_SQL = "select ** from root";
+  private static final int DEVICE_NUM = 10;
+  private static final int POINT_NUM = 10;
+
+  @Before
+  public void setUp() throws Exception {
+    // A previous test (or an aborted run) may have left files in the shared snapshot directory.
+    deleteSnapshotDir();
+    EnvFactory.getEnv().initBeforeTest();
+    IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      EnvFactory.getEnv().cleanAfterTest();
+    } finally {
+      // Restore the configuration and remove the snapshot even if the environment cleanup fails.
+      IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(true);
+      deleteSnapshotDir();
+    }
+  }
+
+  /** Takes a snapshot into a non-existing directory and checks the produced link files. */
+  @Test
+  public void testTakeSnapshot()
+      throws SQLException, IllegalPathException, StorageEngineException, IOException,
+          DirectoryNotLegalException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      insertTestData(statement);
+
+      DataRegion region = StorageEngineV2.getInstance().getDataRegion(new DataRegionId(0));
+      File snapshotDir = getSnapshotDir();
+
+      Assert.assertTrue(
+          new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true));
+
+      Assert.assertTrue(snapshotDir.exists());
+      Assert.assertTrue(snapshotDir.isDirectory());
+      File[] seqTsfiles =
+          snapshotDir.listFiles((dir, name) -> name.endsWith(".tsfile") && name.startsWith("seq"));
+      File[] unseqTsfiles =
+          snapshotDir.listFiles(
+              (dir, name) -> name.endsWith(".tsfile") && name.startsWith("unseq"));
+      File[] tsfileResources =
+          snapshotDir.listFiles((dir, name) -> name.endsWith(".tsfile.resource"));
+      Assert.assertNotNull(seqTsfiles);
+      Assert.assertNotNull(unseqTsfiles);
+      Assert.assertNotNull(tsfileResources);
+      Assert.assertEquals(10, seqTsfiles.length);
+      Assert.assertEquals(10, unseqTsfiles.length);
+      Assert.assertEquals(20, tsfileResources.length);
+    }
+  }
+
+  /** Taking a snapshot into a directory that already contains a file must be rejected. */
+  @Test(expected = DirectoryNotLegalException.class)
+  public void testTakeSnapshotInNotEmptyDir()
+      throws SQLException, IOException, IllegalPathException, StorageEngineException,
+          DirectoryNotLegalException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      insertTestData(statement);
+
+      DataRegion region = StorageEngineV2.getInstance().getDataRegion(new DataRegionId(0));
+      File snapshotDir = createEmptySnapshotDir();
+
+      File tmpFile = new File(snapshotDir, "test");
+      Assert.assertTrue(tmpFile.createNewFile());
+
+      new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true);
+    }
+  }
+
+  /** The data visible after loading a snapshot must equal the data visible before taking it. */
+  @Test
+  public void testLoadSnapshot()
+      throws SQLException, MetadataException, StorageEngineException, DirectoryNotLegalException,
+          IOException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      insertTestData(statement);
+      Map<String, Integer> expected = queryAllData(statement);
+      Assert.assertFalse(expected.isEmpty());
+
+      DataRegion region = StorageEngineV2.getInstance().getDataRegion(new DataRegionId(0));
+      File snapshotDir = createEmptySnapshotDir();
+      Assert.assertTrue(
+          new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true));
+      StorageEngineV2.getInstance()
+          .setDataRegion(
+              new DataRegionId(0),
+              new SnapshotLoader(snapshotDir.getAbsolutePath(), SG_NAME, "0")
+                  .loadSnapshotForStateMachine());
+
+      ChunkCache.getInstance().clear();
+      TimeSeriesMetadataCache.getInstance().clear();
+      // Comparing whole maps also fails when rows are missing, not only when values differ.
+      Assert.assertEquals(expected, queryAllData(statement));
+    }
+  }
+
+  /** Same as {@link #testLoadSnapshot()}, but a merge is triggered right before the snapshot. */
+  @Test
+  public void testTakeAndLoadSnapshotWhenCompaction()
+      throws SQLException, MetadataException, StorageEngineException, InterruptedException,
+          DirectoryNotLegalException, IOException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      insertTestData(statement);
+      Map<String, Integer> expected = queryAllData(statement);
+      Assert.assertFalse(expected.isEmpty());
+
+      File snapshotDir = createEmptySnapshotDir();
+      IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(true);
+      statement.execute("merge");
+      DataRegion region = StorageEngineV2.getInstance().getDataRegion(new DataRegionId(0));
+      Assert.assertTrue(
+          new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true));
+      region.abortCompaction();
+      StorageEngineV2.getInstance()
+          .setDataRegion(
+              new DataRegionId(0),
+              new SnapshotLoader(snapshotDir.getAbsolutePath(), SG_NAME, "0")
+                  .loadSnapshotForStateMachine());
+      ChunkCache.getInstance().clear();
+      TimeSeriesMetadataCache.getInstance().clear();
+      Assert.assertEquals(expected, queryAllData(statement));
+    }
+  }
+
+  /**
+   * Creates the storage group and, for every device, writes the same timestamps twice with a flush
+   * after each round, so that each device ends up with a first and an overwriting second file.
+   */
+  private static void insertTestData(Statement statement) throws SQLException {
+    statement.execute("set storage group to " + SG_NAME);
+    for (int i = 0; i < DEVICE_NUM; ++i) {
+      for (int j = 0; j < POINT_NUM; ++j) {
+        statement.execute(
+            String.format("insert into %s.d%d(time, s) values (%d, %d)", SG_NAME, i, j, j));
+      }
+      statement.execute("flush");
+      for (int j = 0; j < POINT_NUM; ++j) {
+        statement.execute(
+            String.format("insert into %s.d%d(time, s) values (%d, %d)", SG_NAME, i, j, j + 1));
+      }
+      statement.execute("flush");
+    }
+  }
+
+  /**
+   * Reads every row of every test device.
+   *
+   * @return a map from {@code <time><measurement path>} to the value read for that cell
+   */
+  private static Map<String, Integer> queryAllData(Statement statement) throws SQLException {
+    Map<String, Integer> result = new HashMap<>();
+    try (ResultSet resultSet = statement.executeQuery(QUERY_ALL_SQL)) {
+      while (resultSet.next()) {
+        long time = resultSet.getLong("Time");
+        for (int i = 0; i < DEVICE_NUM; ++i) {
+          String measurement = SG_NAME + ".d" + i + ".s";
+          result.put(time + measurement, resultSet.getInt(measurement));
+        }
+      }
+    }
+    return result;
+  }
+
+  private static File getSnapshotDir() {
+    return new File(TestConstant.OUTPUT_DATA_DIR, "snapshot");
+  }
+
+  /** Returns the snapshot directory, creating it when it does not exist yet. */
+  private static File createEmptySnapshotDir() {
+    File snapshotDir = getSnapshotDir();
+    if (!snapshotDir.exists()) {
+      Assert.assertTrue(snapshotDir.mkdirs());
+    }
+    return snapshotDir;
+  }
+
+  private static void deleteSnapshotDir() throws IOException {
+    File snapshotDir = getSnapshotDir();
+    if (snapshotDir.exists()) {
+      FileUtils.forceDelete(snapshotDir);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 89110c4700..1decc0b84d 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -20,8 +20,12 @@
package org.apache.iotdb.db.consensus.statemachine;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.db.consensus.statemachine.visitor.DataExecutionVisitor;
+import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.engine.snapshot.SnapshotLoader;
+import org.apache.iotdb.db.engine.snapshot.SnapshotTaker;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
@@ -39,7 +43,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
private static final FragmentInstanceManager QUERY_INSTANCE_MANAGER =
FragmentInstanceManager.getInstance();
- private final DataRegion region;
+ private DataRegion region;
public DataRegionStateMachine(DataRegion region) {
this.region = region;
@@ -53,11 +57,39 @@ public class DataRegionStateMachine extends BaseStateMachine {
@Override
public boolean takeSnapshot(File snapshotDir) {
- return false;
+ // Delegates to SnapshotTaker for the region currently held by this state machine; the second
+ // argument (flushBeforeSnapshot = true) asks the taker to flush before linking files.
+ try {
+ return new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true);
+ } catch (Exception e) {
+ // Any failure is logged and reported to the caller as "snapshot not taken" instead of being
+ // propagated.
+ logger.error(
+ "Exception occurs when taking snapshot for {}-{} in {}",
+ region.getLogicalStorageGroupName(),
+ region.getDataRegionId(),
+ snapshotDir,
+ e);
+ return false;
+ }
}
@Override
- public void loadSnapshot(File latestSnapshotRootDir) {}
+ public void loadSnapshot(File latestSnapshotRootDir) {
+ // Build a replacement region from the snapshot, reusing the storage group name and region id
+ // of the region currently held by this state machine.
+ DataRegion newRegion =
+ new SnapshotLoader(
+ latestSnapshotRootDir.getAbsolutePath(),
+ region.getLogicalStorageGroupName(),
+ region.getDataRegionId())
+ .loadSnapshotForStateMachine();
+ // The loader signals failure by returning null; the current region is kept in that case.
+ if (newRegion == null) {
+ logger.error("Fail to load snapshot from {}", latestSnapshotRootDir);
+ return;
+ }
+ // NOTE(review): the field is replaced before the storage engine is updated. If setDataRegion
+ // (or the id parsing) throws below, the exception is only logged, so this state machine then
+ // holds the new region while the engine's map may still hold the old one - confirm this is
+ // acceptable.
+ this.region = newRegion;
+ try {
+ StorageEngineV2.getInstance()
+ .setDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionId())), region);
+ } catch (Exception e) {
+ logger.error("Exception occurs when replacing data region in storage engine.", e);
+ }
+ }
@Override
protected TSStatus write(FragmentInstance fragmentInstance) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
index 30a716512f..94ea120d20 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
@@ -601,6 +601,19 @@ public class StorageEngineV2 implements IService {
return dataRegionMap.get(regionId);
}
+ /**
+  * Registers {@code newRegion} under {@code regionId}, replacing any region registered before.
+  *
+  * <p>If a region is already registered for the id, its working TsFile processors are closed
+  * synchronously and its compaction is aborted before the replacement is stored.
+  *
+  * @param regionId id under which the region is registered
+  * @param newRegion region that replaces the current one
+  */
+ public void setDataRegion(DataRegionId regionId, DataRegion newRegion) {
+   // A single lookup avoids the gap between containsKey() and get(), in which the entry could
+   // disappear and leave a null reference to dereference.
+   DataRegion oldRegion = dataRegionMap.get(regionId);
+   if (oldRegion != null) {
+     oldRegion.syncCloseAllWorkingTsFileProcessors();
+     oldRegion.abortCompaction();
+   }
+   dataRegionMap.put(regionId, newRegion);
+ }
+
+ /** Returns the {@code fileFlushPolicy} held by this storage engine. */
+ public TsFileFlushPolicy getFileFlushPolicy() {
+ return fileFlushPolicy;
+ }
+
static class InstanceHolder {
private static final StorageEngineV2 INSTANCE = new StorageEngineV2();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java
new file mode 100644
index 0000000000..476a645533
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.snapshot;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Loads a snapshot written by {@link SnapshotTaker} back into the data directories of one data
+ * region and builds a {@link DataRegion} on top of the restored files.
+ */
+public class SnapshotLoader {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SnapshotLoader.class);
+  private final String storageGroupName;
+  private final String snapshotPath;
+  private final String dataRegionId;
+
+  /**
+   * @param snapshotPath directory that contains the snapshot files
+   * @param storageGroupName logical storage group the region belongs to
+   * @param dataRegionId id of the data region to restore
+   */
+  public SnapshotLoader(String snapshotPath, String storageGroupName, String dataRegionId) {
+    this.snapshotPath = snapshotPath;
+    this.storageGroupName = storageGroupName;
+    this.dataRegionId = dataRegionId;
+  }
+
+  /**
+   * Creates a new {@link DataRegion} for the configured storage group and region id.
+   *
+   * @return the new region, or {@code null} if its construction throws
+   */
+  private DataRegion loadSnapshot() {
+    try {
+      return new DataRegion(
+          IoTDBDescriptor.getInstance().getConfig().getSystemDir()
+              + File.separator
+              + "storage_groups"
+              + File.separator
+              + storageGroupName,
+          dataRegionId,
+          StorageEngineV2.getInstance().getFileFlushPolicy(),
+          storageGroupName);
+    } catch (Exception e) {
+      LOGGER.error("Exception occurs while load snapshot from {}", snapshotPath, e);
+      return null;
+    }
+  }
+
+  /**
+   * Replaces the data of the region with the content of the snapshot:
+   *
+   * <ol>
+   *   <li>check that the snapshot directory is usable,
+   *   <li>delete the region's current time partition directories in every data dir,
+   *   <li>hard-link the snapshot files into the data dirs,
+   *   <li>create a new {@link DataRegion} over the linked files.
+   * </ol>
+   *
+   * @return the newly loaded region, or {@code null} if any of the steps fails
+   */
+  public DataRegion loadSnapshotForStateMachine() {
+    File sourceDataDir = new File(snapshotPath);
+    // Validate the source before touching existing data: deleting first and then finding nothing
+    // to load would silently turn a bad path into an empty region.
+    if (!sourceDataDir.isDirectory()) {
+      LOGGER.error(
+          "Snapshot path {} does not exist or is not a directory, refuse to load it",
+          snapshotPath);
+      return null;
+    }
+
+    try {
+      deleteAllFilesInDataDirs();
+    } catch (IOException e) {
+      // already logged by deleteAllFilesInDataDirs
+      return null;
+    }
+
+    // move the snapshot data to data dir
+    String regionRelativeDir = storageGroupName + File.separator + dataRegionId;
+    String seqBaseDir = IoTDBConstant.SEQUENCE_FLODER_NAME + File.separator + regionRelativeDir;
+    String unseqBaseDir =
+        IoTDBConstant.UNSEQUENCE_FLODER_NAME + File.separator + regionRelativeDir;
+    try {
+      createLinksFromSnapshotDirToDataDir(sourceDataDir, seqBaseDir, unseqBaseDir);
+    } catch (IOException | DiskSpaceInsufficientException e) {
+      LOGGER.error(
+          "Exception occurs when creating links from snapshot directory to data directory", e);
+      return null;
+    }
+
+    return loadSnapshot();
+  }
+
+  /**
+   * Deletes every entry found directly under this region's sequence and unsequence directories in
+   * all configured data dirs.
+   *
+   * @throws IOException if an entry cannot be deleted; the failure is logged before rethrowing
+   */
+  private void deleteAllFilesInDataDirs() throws IOException {
+    String[] dataDirPaths = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+    String[] spaceFolderNames = {
+      IoTDBConstant.SEQUENCE_FLODER_NAME, IoTDBConstant.UNSEQUENCE_FLODER_NAME
+    };
+
+    // collect first, delete afterwards
+    List<File> timePartitions = new ArrayList<>();
+    for (String dataDirPath : dataDirPaths) {
+      for (String spaceFolderName : spaceFolderNames) {
+        File dataDirForThisRegion =
+            new File(
+                dataDirPath
+                    + File.separator
+                    + spaceFolderName
+                    + File.separator
+                    + storageGroupName
+                    + File.separator
+                    + dataRegionId);
+        // listFiles() yields null when the directory does not exist
+        File[] files = dataDirForThisRegion.listFiles();
+        if (files != null) {
+          timePartitions.addAll(Arrays.asList(files));
+        }
+      }
+    }
+
+    try {
+      for (File timePartition : timePartitions) {
+        FileUtils.forceDelete(timePartition);
+      }
+    } catch (IOException e) {
+      LOGGER.error(
+          "Exception occurs when deleting time partition directory for {}-{}",
+          storageGroupName,
+          dataRegionId,
+          e);
+      throw e;
+    }
+  }
+
+  /**
+   * Hard-links every snapshot file of this region into a data dir chosen by the {@link
+   * DirectoryManager}.
+   *
+   * <p>Snapshot file names have the form {@code <seq|unseq>_<storage group>_<region id>_<time
+   * partition>_<original file name>}. The leading part is matched against the known storage group
+   * and region id instead of splitting on the separator, so that a storage group name or an
+   * original file name containing the separator is still parsed correctly. Files that do not
+   * match this region are skipped.
+   *
+   * @param sourceDir snapshot directory
+   * @param seqBaseDir region directory for sequence files, relative to a data dir
+   * @param unseqBaseDir region directory for unsequence files, relative to a data dir
+   * @throws IOException if a target directory or a hard link cannot be created
+   * @throws DiskSpaceInsufficientException if no data dir can be chosen for a file
+   */
+  private void createLinksFromSnapshotDirToDataDir(
+      File sourceDir, String seqBaseDir, String unseqBaseDir)
+      throws IOException, DiskSpaceInsufficientException {
+    File[] files = sourceDir.listFiles();
+    if (files == null) {
+      return;
+    }
+    String separator = SnapshotTaker.SNAPSHOT_FILE_INFO_SEP_STR;
+    String regionInfix = separator + storageGroupName + separator + dataRegionId + separator;
+    String seqPrefix = "seq" + regionInfix;
+    String unseqPrefix = "unseq" + regionInfix;
+    for (File sourceFile : files) {
+      String snapshotFileName = sourceFile.getName();
+      boolean seq;
+      String partitionAndFileName;
+      if (snapshotFileName.startsWith(seqPrefix)) {
+        seq = true;
+        partitionAndFileName = snapshotFileName.substring(seqPrefix.length());
+      } else if (snapshotFileName.startsWith(unseqPrefix)) {
+        seq = false;
+        partitionAndFileName = snapshotFileName.substring(unseqPrefix.length());
+      } else {
+        LOGGER.debug("Skip {} because it is not a snapshot file of this region", sourceFile);
+        continue;
+      }
+      int separatorIndex = partitionAndFileName.indexOf(separator);
+      if (separatorIndex <= 0
+          || separatorIndex + separator.length() >= partitionAndFileName.length()) {
+        // time partition or original file name is missing
+        LOGGER.debug("Skip {} because its name is malformed", sourceFile);
+        continue;
+      }
+      String timePartition = partitionAndFileName.substring(0, separatorIndex);
+      String fileName = partitionAndFileName.substring(separatorIndex + separator.length());
+      String nextDataDir =
+          seq
+              ? DirectoryManager.getInstance().getNextFolderForSequenceFile()
+              : DirectoryManager.getInstance().getNextFolderForUnSequenceFile();
+      File baseDir = new File(nextDataDir, seq ? seqBaseDir : unseqBaseDir);
+      File targetDirForThisTimePartition = new File(baseDir, timePartition);
+      if (!targetDirForThisTimePartition.exists() && !targetDirForThisTimePartition.mkdirs()) {
+        throw new IOException(
+            String.format("Failed to make directory %s", targetDirForThisTimePartition));
+      }
+
+      File targetFile = new File(targetDirForThisTimePartition, fileName);
+      try {
+        Files.createLink(targetFile.toPath(), sourceFile.toPath());
+      } catch (IOException e) {
+        throw new IOException(
+            String.format("Failed to create hard link from %s to %s", sourceFile, targetFile), e);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java
new file mode 100644
index 0000000000..1842b1b830
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.snapshot;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.log.CompactionLogger;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.snapshot.exception.DirectoryNotLegalException;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * SnapshotTaker takes data snapshot for a DataRegion in one time. It does so by creating hard link
+ * for files or copying them. SnapshotTaker supports two different ways of snapshot: Full Snapshot
+ * and Incremental Snapshot. The former takes a snapshot for all files in an empty directory, and
+ * the latter takes a snapshot based on the snapshot that took before.
+ */
+public class SnapshotTaker {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SnapshotTaker.class);
+ private final DataRegion dataRegion;
+ public static String SNAPSHOT_FILE_INFO_SEP_STR = "_";
+
+ public SnapshotTaker(DataRegion dataRegion) {
+ this.dataRegion = dataRegion;
+ }
+
+ public boolean takeFullSnapshot(String snapshotDirPath, boolean flushBeforeSnapshot)
+ throws DirectoryNotLegalException, IOException {
+ File snapshotDir = new File(snapshotDirPath);
+ if (snapshotDir.exists()
+ && snapshotDir.listFiles() != null
+ && snapshotDir.listFiles().length > 0) {
+ // the directory should be empty or not exists
+ throw new DirectoryNotLegalException(
+ String.format("%s already exists and is not empty", snapshotDirPath));
+ }
+
+ if (!snapshotDir.exists() && !snapshotDir.mkdirs()) {
+ throw new IOException(String.format("Failed to create directory %s", snapshotDir));
+ }
+
+ if (flushBeforeSnapshot) {
+ dataRegion.syncCloseAllWorkingTsFileProcessors();
+ }
+
+ List<Long> timePartitions = dataRegion.getTimePartitions();
+ for (Long timePartition : timePartitions) {
+ List<String> seqDataDirs = getAllDataDirOfOnePartition(true, timePartition);
+
+ try {
+ createFileSnapshot(seqDataDirs, snapshotDir, true, timePartition);
+ } catch (IOException e) {
+ LOGGER.error("Fail to create snapshot", e);
+ File[] files = snapshotDir.listFiles();
+ if (files != null) {
+ for (File file : files) {
+ if (!file.delete()) {
+ LOGGER.error("Failed to delete link file {} after failing to create snapshot", file);
+ }
+ }
+ }
+ return false;
+ }
+
+ List<String> unseqDataDirs = getAllDataDirOfOnePartition(false, timePartition);
+
+ try {
+ createFileSnapshot(unseqDataDirs, snapshotDir, false, timePartition);
+ } catch (IOException e) {
+ LOGGER.error("Fail to create snapshot", e);
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private List<String> getAllDataDirOfOnePartition(boolean sequence, long timePartition) {
+ String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+ List<String> resultDirs = new LinkedList<>();
+
+ for (String dataDir : dataDirs) {
+ resultDirs.add(
+ dataDir
+ + File.separator
+ + (sequence
+ ? IoTDBConstant.SEQUENCE_FLODER_NAME
+ : IoTDBConstant.UNSEQUENCE_FLODER_NAME)
+ + File.separator
+ + dataRegion.getLogicalStorageGroupName()
+ + File.separator
+ + dataRegion.getDataRegionId()
+ + File.separator
+ + timePartition
+ + File.separator);
+ }
+ return resultDirs;
+ }
+
+ private void createFileSnapshot(
+ List<String> sourceDirPaths, File targetDir, boolean sequence, long timePartition)
+ throws IOException {
+ for (String sourceDirPath : sourceDirPaths) {
+ File sourceDir = new File(sourceDirPath);
+ if (!sourceDir.exists()) {
+ continue;
+ }
+ // Collect TsFile, TsFileResource, Mods, CompactionMods
+ File[] files =
+ sourceDir.listFiles(
+ (dir, name) ->
+ name.endsWith(".tsfile")
+ || name.endsWith(TsFileResource.RESOURCE_SUFFIX)
+ || name.endsWith(ModificationFile.FILE_SUFFIX)
+ || name.endsWith(ModificationFile.COMPACTION_FILE_SUFFIX)
+ || name.endsWith(CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX)
+ || name.endsWith(CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX)
+ || name.endsWith(IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX)
+ || name.endsWith(IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX));
+ if (files == null || files.length == 0) {
+ continue;
+ }
+
+ for (File file : files) {
+ String newFileName =
+ (sequence ? "seq" : "unseq")
+ + SNAPSHOT_FILE_INFO_SEP_STR
+ + dataRegion.getLogicalStorageGroupName()
+ + SNAPSHOT_FILE_INFO_SEP_STR
+ + dataRegion.getDataRegionId()
+ + SNAPSHOT_FILE_INFO_SEP_STR
+ + timePartition
+ + SNAPSHOT_FILE_INFO_SEP_STR
+ + file.getName();
+ File linkFile = new File(targetDir, newFileName);
+ Files.createLink(linkFile.toPath(), file.toPath());
+ }
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/exception/DirectoryNotLegalException.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/exception/DirectoryNotLegalException.java
new file mode 100644
index 0000000000..bd4742d9e5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/exception/DirectoryNotLegalException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.snapshot.exception;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+/**
+ * Signals that a directory is not acceptable as a snapshot directory. The exception always
+ * carries the status code of {@code TSStatusCode.SNAPSHOT_DIR_NOT_LEGAL}.
+ */
+public class DirectoryNotLegalException extends IoTDBException {
+ /**
+ * @param message description of why the directory was rejected
+ */
+ public DirectoryNotLegalException(String message) {
+ super(message, TSStatusCode.SNAPSHOT_DIR_NOT_LEGAL.getStatusCode());
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 1fbd2ab5e0..9fbc911655 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -3486,6 +3486,10 @@ public class DataRegion {
throws WriteProcessException;
}
+ /**
+ * Returns the keys of {@code partitionMaxFileVersions} copied into a new list, so the caller
+ * can iterate or modify the result without touching the map's key set.
+ */
+ public List<Long> getTimePartitions() {
+ return new ArrayList<>(partitionMaxFileVersions.keySet());
+ }
+
public String getInsertWriteLockHolder() {
return insertWriteLockHolder;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java
index b185838b32..946d827461 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java
@@ -71,7 +71,7 @@ public class TsFileNameGenerator {
time, version, innerSpaceCompactionCount, crossSpaceCompactionCount);
}
- private static String generateTsFileDir(
+ public static String generateTsFileDir(
boolean sequence,
String logicalStorageGroup,
String virtualStorageGroup,
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index f612f6732b..bb037da449 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -83,6 +83,7 @@ public enum TSStatusCode {
WRITE_PROCESS_ERROR(412),
WRITE_PROCESS_REJECT(413),
QUERY_ID_NOT_EXIST(414),
+ SNAPSHOT_DIR_NOT_LEGAL(415),
UNSUPPORTED_INDEX_FUNC_ERROR(421),
UNSUPPORTED_INDEX_TYPE_ERROR(422),