You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/24 10:04:06 UTC
[iotdb] branch master updated: [IOTDB-4027] Support cross disk link in RatisConsensus (#7056)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7ba111cebb [IOTDB-4027] Support cross disk link in RatisConsensus (#7056)
7ba111cebb is described below
commit 7ba111cebbd6e2861086f780470715db2bc37b4d
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Wed Aug 24 18:04:00 2022 +0800
[IOTDB-4027] Support cross disk link in RatisConsensus (#7056)
---
.../org/apache/iotdb/consensus/IStateMachine.java | 12 ++--
.../org/apache/iotdb/consensus/common/Utils.java | 77 ++++++++++++++++++++++
.../iotdb/consensus/ratis/SnapshotStorage.java | 51 ++------------
.../apache/iotdb/consensus/ratis/SnapshotTest.java | 63 ++++++++++++++++++
.../statemachine/DataRegionStateMachine.java | 6 +-
5 files changed, 157 insertions(+), 52 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index 3989272dee..9ce532c999 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -24,11 +24,13 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.Utils;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import javax.annotation.concurrent.ThreadSafe;
import java.io.File;
+import java.nio.file.Path;
import java.util.List;
import java.util.function.Function;
@@ -76,18 +78,18 @@ public interface IStateMachine {
void loadSnapshot(File latestSnapshotRootDir);
/**
- * given a snapshot dir, ask statemachine to provide all snapshot files.
+ * Given a snapshot dir, ask the statemachine to provide all snapshot files. By default, it lists
+ * all regular files recursively under latestSnapshotRootDir.
*
* <p>DataRegion may take snapshot at a different disk and only store a log file containing file
* paths. So statemachine is required to read the log file and provide the real snapshot file
* paths.
*
* @param latestSnapshotRootDir dir where the latest snapshot sits
- * @return List of real snapshot files. If the returned list is null, consensus implementations
- * will visit and add all files under this give latestSnapshotRootDir.
+ * @return List of real snapshot files. The default implementation returns an empty list when
+ * walking the directory throws an IOException. If an implementation returns null,
+ * SnapshotStorage#getLatestSnapshot returns null as well.
*/
- default List<File> getSnapshotFiles(File latestSnapshotRootDir) {
- return null;
+ default List<Path> getSnapshotFiles(File latestSnapshotRootDir) {
+ return Utils.listAllRegularFilesRecursively(latestSnapshotRootDir);
}
/** An optional API for event notifications. */
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java
new file mode 100644
index 0000000000..96d381949e
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.common;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.FileVisitor;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * File-system helpers shared by the consensus implementations.
+ *
+ * <p>This class only holds static helpers and is not meant to be instantiated.
+ */
+public final class Utils {
+  private static final Logger logger = LoggerFactory.getLogger(Utils.class);
+
+  private Utils() {
+    // static utility class
+  }
+
+  /**
+   * Collects every regular file found under {@code rootDir}, descending into sub-directories.
+   *
+   * <p>If a single entry cannot be visited, the walk is terminated and the files collected up to
+   * that point are returned. If the walk itself throws an {@link IOException}, an empty list is
+   * returned instead.
+   *
+   * @param rootDir directory to start the traversal from
+   * @return the regular files discovered so far, or an empty list when the walk throws
+   */
+  public static List<Path> listAllRegularFilesRecursively(File rootDir) {
+    List<Path> allFiles = new ArrayList<>();
+    try {
+      Files.walkFileTree(
+          rootDir.toPath(),
+          new FileVisitor<Path>() {
+            @Override
+            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
+                throws IOException {
+              return FileVisitResult.CONTINUE;
+            }
+
+            @Override
+            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
+                throws IOException {
+              // directories, symbolic links and other special entries are skipped
+              if (attrs.isRegularFile()) {
+                allFiles.add(file);
+              }
+              return FileVisitResult.CONTINUE;
+            }
+
+            @Override
+            public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
+              // The exception is passed as the trailing Throwable rather than as a placeholder
+              // argument, so the message has no dangling "{}" and the stack trace is kept.
+              logger.warn("visit file {} failed", file.toAbsolutePath(), exc);
+              return FileVisitResult.TERMINATE;
+            }
+
+            @Override
+            public FileVisitResult postVisitDirectory(Path dir, IOException exc)
+                throws IOException {
+              // a directory-iteration error reported here is deliberately not propagated
+              return FileVisitResult.CONTINUE;
+            }
+          });
+    } catch (IOException ioException) {
+      logger.error("IOException occurred during listing snapshot directory: ", ioException);
+      return Collections.emptyList();
+    }
+    return allFiles;
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
index de0f70a427..60d117dee3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
@@ -34,14 +34,10 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
-import java.nio.file.FileVisitResult;
-import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
public class SnapshotStorage implements StateMachineStorage {
@@ -107,46 +103,6 @@ public class SnapshotStorage implements StateMachineStorage {
return i < 0 ? null : snapshots[i].toFile();
}
- private List<Path> getAllFilesUnder(File rootDir) {
- List<Path> allFiles = new ArrayList<>();
- try {
- Files.walkFileTree(
- rootDir.toPath(),
- new FileVisitor<Path>() {
- @Override
- public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
- throws IOException {
- return FileVisitResult.CONTINUE;
- }
-
- @Override
- public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
- throws IOException {
- if (attrs.isRegularFile()) {
- allFiles.add(file);
- }
- return FileVisitResult.CONTINUE;
- }
-
- @Override
- public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
- logger.info("visit file {} failed due to {}", file.toAbsolutePath(), exc);
- return FileVisitResult.TERMINATE;
- }
-
- @Override
- public FileVisitResult postVisitDirectory(Path dir, IOException exc)
- throws IOException {
- return FileVisitResult.CONTINUE;
- }
- });
- } catch (IOException ioException) {
- logger.error("IOException occurred during listing snapshot directory: ", ioException);
- return Collections.emptyList();
- }
- return allFiles;
- }
-
@Override
public SnapshotInfo getLatestSnapshot() {
File latestSnapshotDir = findLatestSnapshotDir();
@@ -155,8 +111,13 @@ public class SnapshotStorage implements StateMachineStorage {
}
TermIndex snapshotTermIndex = Utils.getTermIndexFromDir(latestSnapshotDir);
+ List<Path> actualSnapshotFiles = applicationStateMachine.getSnapshotFiles(latestSnapshotDir);
+ if (actualSnapshotFiles == null) {
+ return null;
+ }
+
List<FileInfo> fileInfos = new ArrayList<>();
- for (Path file : getAllFilesUnder(latestSnapshotDir)) {
+ for (Path file : actualSnapshotFiles) {
if (file.endsWith(".md5")) {
continue;
}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
index 204a6725c4..fc89eebd4d 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
@@ -30,7 +30,14 @@ import org.junit.Before;
import org.junit.Test;
import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+import java.util.Scanner;
public class SnapshotTest {
@@ -132,4 +139,60 @@ public class SnapshotTest {
+ ".ratis_meta."
+ termIndexMeta;
}
+
+ /**
+  * Test state machine that simulates a cross-disk-link snapshot: the snapshot directory holds the
+  * real snapshot file plus a "record" file whose first line is the absolute path of that file.
+  */
+ static class CrossDiskLinkStatemachine extends TestUtils.IntegerCounter {
+   /**
+    * Creates the "snapshot" file and a "record" file pointing at it inside {@code snapshotDir}.
+    *
+    * @param snapshotDir directory in which both files are created
+    * @return always {@code true}; any failure is raised as an {@link AssertionError}
+    */
+   @Override
+   public boolean takeSnapshot(File snapshotDir) {
+     File snapshotRaw = new File(snapshotDir.getAbsolutePath() + File.separator + "snapshot");
+     File snapshotRecord = new File(snapshotDir.getAbsolutePath() + File.separator + "record");
+     try {
+       Assert.assertTrue(snapshotRaw.createNewFile());
+       // try-with-resources closes the writer even when write() throws
+       try (FileWriter writer = new FileWriter(snapshotRecord)) {
+         writer.write(snapshotRaw.getAbsolutePath());
+       }
+     } catch (IOException ioException) {
+       // A half-written snapshot must fail the test instead of being reported as success.
+       throw new AssertionError(
+           "failed to create snapshot files under " + snapshotDir, ioException);
+     }
+     return true;
+   }
+
+   /**
+    * Reads the "record" file under {@code latestSnapshotRootDir} and returns the single path
+    * stored on its first line.
+    *
+    * @param latestSnapshotRootDir directory that contains the "record" file
+    * @return a one-element list holding the recorded snapshot path
+    */
+   @Override
+   public List<Path> getSnapshotFiles(File latestSnapshotRootDir) {
+     File log = new File(latestSnapshotRootDir.getAbsolutePath() + File.separator + "record");
+     Assert.assertTrue(log.exists());
+     // try-with-resources closes the scanner even when nextLine() throws
+     try (Scanner scanner = new Scanner(log)) {
+       return Collections.singletonList(Paths.get(scanner.nextLine()));
+     } catch (FileNotFoundException e) {
+       throw new AssertionError("snapshot record file is not readable: " + log, e);
+     }
+   }
+ }
+
+ /**
+  * Takes a snapshot through the proxy with a {@link CrossDiskLinkStatemachine} and checks that
+  * the latest snapshot exposes exactly one file, whose path equals the expected snapshot file.
+  */
+ @Test
+ public void testCrossDiskLinkSnapshot() throws Exception {
+   ApplicationStateMachineProxy proxy =
+       new ApplicationStateMachineProxy(new CrossDiskLinkStatemachine(), null);
+
+   proxy.initialize(null, null, new EmptyStorageWithOnlySMDir());
+   proxy.notifyTermIndexUpdated(20, 1005);
+   proxy.takeSnapshot();
+   String expectedSnapshotName =
+       CrossDiskLinkStatemachine.ensureSnapshotFileName(testDir, "20_1005");
+   File expectedSnapshotFile = new File(expectedSnapshotName);
+   // JUnit's assertEquals takes (expected, actual); keeping that order makes failure messages
+   // report the right value as "expected".
+   Assert.assertEquals(1, proxy.getLatestSnapshot().getFiles().size());
+   Assert.assertEquals(
+       expectedSnapshotFile.getAbsolutePath(),
+       proxy.getLatestSnapshot().getFiles().get(0).getPath().toFile().getAbsolutePath());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 6c52fa73c3..d5cf5e4533 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -57,6 +58,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
public class DataRegionStateMachine extends BaseStateMachine {
@@ -258,13 +260,13 @@ public class DataRegionStateMachine extends BaseStateMachine {
}
@Override
- public List<File> getSnapshotFiles(File latestSnapshotRootDir) {
+ public List<Path> getSnapshotFiles(File latestSnapshotRootDir) {
try {
return new SnapshotLoader(
latestSnapshotRootDir.getAbsolutePath(),
region.getStorageGroupName(),
region.getDataRegionId())
- .getSnapshotFileInfo();
+ .getSnapshotFileInfo().stream().map(File::toPath).collect(Collectors.toList());
} catch (IOException e) {
logger.error(
"Meets error when getting snapshot files for {}-{}",