You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/05/04 11:44:12 UTC
[iotdb] branch master updated: [IOTDB-3094] Consensus Snapshot interface redesign (#5788)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a3e46a1be1 [IOTDB-3094] Consensus Snapshot interface redesign (#5788)
a3e46a1be1 is described below
commit a3e46a1be15419d165c6c0049f8b9fdad25cc41b
Author: SzyWilliam <48...@users.noreply.github.com>
AuthorDate: Wed May 4 19:44:07 2022 +0800
[IOTDB-3094] Consensus Snapshot interface redesign (#5788)
* consensus snapshot interface redesigned
* revise
---
.../statemachine/PartitionRegionStateMachine.java | 14 +--
.../org/apache/iotdb/consensus/IStateMachine.java | 40 ++-----
.../iotdb/consensus/common/SnapshotMeta.java | 54 ----------
.../ratis/ApplicationStateMachineProxy.java | 20 ++--
.../iotdb/consensus/ratis/SnapshotStorage.java | 56 ++++++++--
.../org/apache/iotdb/consensus/ratis/Utils.java | 14 +--
.../consensus/standalone/StandAloneServerImpl.java | 20 +---
.../apache/iotdb/consensus/EmptyStateMachine.java | 14 +--
.../apache/iotdb/consensus/ratis/TestUtils.java | 116 ++++-----------------
.../standalone/StandAloneConsensusTest.java | 13 +--
.../statemachine/DataRegionStateMachine.java | 14 +--
.../statemachine/SchemaRegionStateMachine.java | 14 +--
12 files changed, 105 insertions(+), 284 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index c2cd3c9df5..cb5048070a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeExc
import org.apache.iotdb.confignode.service.executor.ConfigRequestExecutor;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.consensus.common.SnapshotMeta;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -35,7 +34,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
/** Statemachine for PartitionRegion */
public class PartitionRegionStateMachine implements IStateMachine {
@@ -99,20 +97,12 @@ public class PartitionRegionStateMachine implements IStateMachine {
}
@Override
- public boolean takeSnapshot(ByteBuffer metadata, File snapshotDir) {
+ public boolean takeSnapshot(File snapshotDir) {
return false;
}
@Override
- public SnapshotMeta getLatestSnapshot(File snapshotDir) {
- return null;
- }
-
- @Override
- public void loadSnapshot(SnapshotMeta latest) {}
-
- @Override
- public void cleanUpOldSnapshots(File snapshotDir) {}
+ public void loadSnapshot(File latestSnapshotRootDir) {}
/** Transmit PhysicalPlan to confignode.service.executor.PlanExecutor */
protected DataSet read(ConfigRequest plan) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index c090fa2a35..e12622f2ad 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -22,13 +22,11 @@ package org.apache.iotdb.consensus;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.consensus.common.SnapshotMeta;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import javax.annotation.concurrent.ThreadSafe;
import java.io.File;
-import java.nio.ByteBuffer;
import java.util.function.Function;
@ThreadSafe
@@ -55,40 +53,18 @@ public interface IStateMachine {
DataSet read(IConsensusRequest IConsensusRequest);
/**
- * IConsensus will periodically take the snapshot on both log and statemachine Data
+ * Take a snapshot of current statemachine. All files are required to be stored under snapshotDir,
+ * which is a sub-directory of the StorageDir in Consensus
*
- * @param metadata the metadata IConsensus want IStateMachine to preserve. NOTICE: the more
- * updated snapshot will have lexicographically larger metadata. This property should be
- * guaranteed by every IConsensus implementation. IStateMachine can use the metadata to sort
- * or label snapshot. e.g, metadata is byteBuffer("123_456"), the statemachine can create a
- * directory ${snapshotDir}/123_456/ and store all files under this directory
- * @param snapshotDir the root dir of snapshot files
- * @return true if snapshot successfully taken
+ * @param snapshotDir required storage dir
+ * @return true if snapshot is successfully taken
*/
- boolean takeSnapshot(ByteBuffer metadata, File snapshotDir);
+ boolean takeSnapshot(File snapshotDir);
/**
- * When recover from crash / leader installSnapshot to follower, this method is called.
- * IStateMachine is required to find the latest snapshot in snapshotDir.
+ * Load the latest snapshot from given dir
*
- * @param snapshotDir the root dir of snapshot files
- * @return latest snapshot info (metadata + snapshot files)
+ * @param latestSnapshotRootDir dir where the latest snapshot sits
*/
- SnapshotMeta getLatestSnapshot(File snapshotDir);
-
- /**
- * When recover from crash / follower installSnapshot from leader, this method is called.
- * IStateMachine is required to load the given snapshot.
- *
- * @param latest is the latest snapshot given
- */
- void loadSnapshot(SnapshotMeta latest);
-
- /**
- * IConsensus will periodically clean up old snapshots. This method is called to inform
- * IStateMachine to remove out-dated snapshot.
- *
- * @param snapshotDir the root dir of snapshot files
- */
- void cleanUpOldSnapshots(File snapshotDir);
+ void loadSnapshot(File latestSnapshotRootDir);
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/SnapshotMeta.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/SnapshotMeta.java
deleted file mode 100644
index e2fd0188d2..0000000000
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/SnapshotMeta.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.consensus.common;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-public class SnapshotMeta {
- /**
- * metadata is the IConsensus metadata given when take this snapshot. More updated snapshot will
- * have lexicographically larger metadata.
- */
- private ByteBuffer metadata;
-
- private List<File> snapshotFiles;
-
- public SnapshotMeta(ByteBuffer metadata, List<File> snapshotFiles) {
- this.metadata = metadata;
- this.snapshotFiles = snapshotFiles;
- }
-
- public ByteBuffer getMetadata() {
- return metadata;
- }
-
- public void setMetadata(ByteBuffer metadata) {
- this.metadata = metadata;
- }
-
- public List<File> getSnapshotFiles() {
- return snapshotFiles;
- }
-
- public void setSnapshotFiles(List<File> snapshotFiles) {
- this.snapshotFiles = snapshotFiles;
- }
-}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index 317029a740..123bd4f13d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.consensus.ratis;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.consensus.common.SnapshotMeta;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
@@ -41,7 +40,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
public class ApplicationStateMachineProxy extends BaseStateMachine {
@@ -66,14 +64,14 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
() -> {
snapshotStorage.init(storage);
this.statemachineDir = snapshotStorage.getStateMachineDir();
- loadSnapshot(applicationStateMachine.getLatestSnapshot(statemachineDir));
+ loadSnapshot(snapshotStorage.findLatestSnapshotDir());
});
}
@Override
public void reinitialize() throws IOException {
setLastAppliedTermIndex(null);
- loadSnapshot(applicationStateMachine.getLatestSnapshot(statemachineDir));
+ loadSnapshot(snapshotStorage.findLatestSnapshotDir());
if (getLifeCycleState() == LifeCycle.State.PAUSED) {
getLifeCycle().transition(LifeCycle.State.STARTING);
getLifeCycle().transition(LifeCycle.State.RUNNING);
@@ -136,8 +134,9 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
}
// require the application statemachine to take the latest snapshot
- ByteBuffer metadata = Utils.getMetadataFromTermIndex(lastApplied);
- boolean success = applicationStateMachine.takeSnapshot(metadata, statemachineDir);
+ String metadata = Utils.getMetadataFromTermIndex(lastApplied);
+ File snapshotDir = snapshotStorage.getSnapshotDir(metadata);
+ boolean success = applicationStateMachine.takeSnapshot(snapshotDir);
if (!success) {
return RaftLog.INVALID_LOG_INDEX;
}
@@ -145,15 +144,14 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
return lastApplied.getIndex();
}
- private void loadSnapshot(SnapshotMeta snapshot) {
- if (snapshot == null) {
+ private void loadSnapshot(File latestSnapshotDir) {
+ if (latestSnapshotDir == null) {
return;
}
// require the application statemachine to load the latest snapshot
- applicationStateMachine.loadSnapshot(snapshot);
- ByteBuffer metadata = snapshot.getMetadata();
- TermIndex snapshotTermIndex = Utils.getTermIndexFromMetadata(metadata);
+ applicationStateMachine.loadSnapshot(latestSnapshotDir);
+ TermIndex snapshotTermIndex = Utils.getTermIndexFromDir(latestSnapshotDir);
updateLastAppliedTermIndex(snapshotTermIndex.getTerm(), snapshotTermIndex.getIndex());
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
index c50a4b3446..8d6d2224a3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.consensus.ratis;
import org.apache.iotdb.consensus.IStateMachine;
-import org.apache.iotdb.consensus.common.SnapshotMeta;
import org.apache.ratis.io.MD5Hash;
import org.apache.ratis.server.protocol.TermIndex;
@@ -29,15 +28,20 @@ import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.SnapshotRetentionPolicy;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.statemachine.impl.FileListSnapshotInfo;
+import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.MD5FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
/**
* TODO: Warning, currently in Ratis 2.2.0, there is a bug in installSnapshot. In subsequent
@@ -60,16 +64,46 @@ public class SnapshotStorage implements StateMachineStorage {
this.stateMachineDir = raftStorage.getStorageDir().getStateMachineDir();
}
+ private Path[] getSortedSnapshotDirPaths() {
+ ArrayList<Path> snapshotPaths = new ArrayList<>();
+ try (DirectoryStream<Path> stream = Files.newDirectoryStream(stateMachineDir.toPath())) {
+ for (Path path : stream) {
+ snapshotPaths.add(path);
+ }
+ } catch (IOException exception) {
+ logger.warn("cannot construct snapshot directory stream ", exception);
+ return null;
+ }
+
+ Path[] pathArray = snapshotPaths.toArray(new Path[0]);
+ Arrays.sort(
+ pathArray,
+ (o1, o2) -> {
+ String index1 = o1.toFile().getName().split("_")[1];
+ String index2 = o2.toFile().getName().split("_")[1];
+ return Long.compare(Long.parseLong(index1), Long.parseLong(index2));
+ });
+ return pathArray;
+ }
+
+ public File findLatestSnapshotDir() {
+ Path[] snapshots = getSortedSnapshotDirPaths();
+ if (snapshots == null || snapshots.length == 0) {
+ return null;
+ }
+ return snapshots[snapshots.length - 1].toFile();
+ }
+
@Override
public SnapshotInfo getLatestSnapshot() {
- SnapshotMeta snapshotMeta = applicationStateMachine.getLatestSnapshot(stateMachineDir);
- if (snapshotMeta == null) {
+ File latestSnapshotDir = findLatestSnapshotDir();
+ if (latestSnapshotDir == null) {
return null;
}
- TermIndex snapshotTermIndex = Utils.getTermIndexFromMetadata(snapshotMeta.getMetadata());
+ TermIndex snapshotTermIndex = Utils.getTermIndexFromDir(latestSnapshotDir);
List<FileInfo> fileInfos = new ArrayList<>();
- for (File file : snapshotMeta.getSnapshotFiles()) {
+ for (File file : Objects.requireNonNull(latestSnapshotDir.listFiles())) {
Path filePath = file.toPath();
MD5Hash fileHash = null;
try {
@@ -91,10 +125,20 @@ public class SnapshotStorage implements StateMachineStorage {
@Override
public void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy)
throws IOException {
- applicationStateMachine.cleanUpOldSnapshots(stateMachineDir);
+ Path[] sortedSnapshotDirs = getSortedSnapshotDirPaths();
+ if (sortedSnapshotDirs == null || sortedSnapshotDirs.length == 0) {
+ return;
+ }
+ for (int i = 0; i < sortedSnapshotDirs.length - 1; i++) {
+ FileUtils.deleteFully(sortedSnapshotDirs[i]);
+ }
}
public File getStateMachineDir() {
return stateMachineDir;
}
+
+ public File getSnapshotDir(String snapshotMetadata) {
+ return new File(stateMachineDir.getAbsolutePath() + File.separator + snapshotMetadata);
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
index c0683572a9..0c3e5d984f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
@@ -31,9 +31,8 @@ import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.TByteBuffer;
+import java.io.File;
import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.Charset;
public class Utils {
private static final int tempBufferSize = 1024;
@@ -120,15 +119,12 @@ public class Utils {
return status;
}
- public static ByteBuffer getMetadataFromTermIndex(TermIndex termIndex) {
- String ordinal = String.format("%d_%d", termIndex.getTerm(), termIndex.getIndex());
- return ByteBuffer.wrap(ordinal.getBytes());
+ public static String getMetadataFromTermIndex(TermIndex termIndex) {
+ return String.format("%d_%d", termIndex.getTerm(), termIndex.getIndex());
}
- public static TermIndex getTermIndexFromMetadata(ByteBuffer metadata) {
- Charset charset = Charset.defaultCharset();
- CharBuffer charBuffer = charset.decode(metadata);
- String ordinal = charBuffer.toString();
+ public static TermIndex getTermIndexFromDir(File snapshotDir) {
+ String ordinal = snapshotDir.getName();
String[] items = ordinal.split("_");
return TermIndex.valueOf(Long.parseLong(items[0]), Long.parseLong(items[1]));
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
index 4c55e9676b..309b86371b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
@@ -23,11 +23,9 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.SnapshotMeta;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import java.io.File;
-import java.nio.ByteBuffer;
public class StandAloneServerImpl implements IStateMachine {
@@ -68,22 +66,12 @@ public class StandAloneServerImpl implements IStateMachine {
}
@Override
- public boolean takeSnapshot(ByteBuffer metadata, File snapshotDir) {
- return stateMachine.takeSnapshot(metadata, snapshotDir);
+ public boolean takeSnapshot(File snapshotDir) {
+ return stateMachine.takeSnapshot(snapshotDir);
}
@Override
- public SnapshotMeta getLatestSnapshot(File snapshotDir) {
- return stateMachine.getLatestSnapshot(snapshotDir);
- }
-
- @Override
- public void loadSnapshot(SnapshotMeta latest) {
- stateMachine.loadSnapshot(latest);
- }
-
- @Override
- public void cleanUpOldSnapshots(File snapshotDir) {
- stateMachine.cleanUpOldSnapshots(snapshotDir);
+ public void loadSnapshot(File latestSnapshotRootDir) {
+ stateMachine.loadSnapshot(latestSnapshotRootDir);
}
}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
index 118b057d1a..e1ea8fa1d4 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
@@ -21,11 +21,9 @@ package org.apache.iotdb.consensus;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.consensus.common.SnapshotMeta;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import java.io.File;
-import java.nio.ByteBuffer;
public class EmptyStateMachine implements IStateMachine {
@@ -46,18 +44,10 @@ public class EmptyStateMachine implements IStateMachine {
}
@Override
- public boolean takeSnapshot(ByteBuffer metadata, File snapshotDir) {
+ public boolean takeSnapshot(File snapshotDir) {
return false;
}
@Override
- public SnapshotMeta getLatestSnapshot(File snapshotDir) {
- return null;
- }
-
- @Override
- public void loadSnapshot(SnapshotMeta latest) {}
-
- @Override
- public void cleanUpOldSnapshots(File snapshotDir) {}
+ public void loadSnapshot(File latestSnapshotRootDir) {}
}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
index 962dd84d95..eca57c4a19 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -19,27 +19,20 @@
package org.apache.iotdb.consensus.ratis;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.consensus.common.SnapshotMeta;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.ratis.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.Scanner;
import java.util.concurrent.atomic.AtomicInteger;
@@ -97,107 +90,36 @@ public class TestUtils {
return dataSet;
}
- public static synchronized String ensureSnapshotFileName(File snapshotDir, String metadata) {
- File dir = new File(snapshotDir + File.separator + metadata);
- if (!(dir.exists() && dir.isDirectory())) {
- dir.mkdirs();
- }
- return dir.getPath() + File.separator + "snapshot." + metadata;
- }
-
@Override
- public boolean takeSnapshot(ByteBuffer metadata, File snapshotDir) {
- /**
- * When IStateMachine take the snapshot, it can directly use the metadata to name the snapshot
- * file. It's guaranteed that more up-to-date snapshot will have lexicographically larger
- * metadata.
- */
- String tempFilePath = snapshotDir + File.separator + ".tmp";
- String filePath = ensureSnapshotFileName(snapshotDir, new String(metadata.array()));
-
- File tempFile = new File(tempFilePath);
-
- try {
- FileWriter writer = new FileWriter(tempFile);
+ public boolean takeSnapshot(File snapshotDir) {
+ File snapshot = new File(snapshotDir.getAbsolutePath() + File.separator + "snapshot");
+ try (FileWriter writer = new FileWriter(snapshot)) {
writer.write(String.valueOf(integer.get()));
- writer.close();
- tempFile.renameTo(new File(filePath));
} catch (IOException e) {
- logger.error("take snapshot failed ", e);
+ logger.error("cannot open file writer of {}", snapshot);
return false;
}
return true;
}
- private Object[] getSortedPaths(File rootDir) {
- /**
- * When looking for the latest snapshot inside the directory, just list all filenames and sort
- * them.
- */
- ArrayList<Path> paths = new ArrayList<>();
- try {
- DirectoryStream<Path> stream = Files.newDirectoryStream(rootDir.toPath());
- for (Path path : stream) {
- paths.add(path);
- }
- } catch (IOException e) {
- logger.error("read directory failed ", e);
- }
-
- Object[] pathArray = paths.toArray();
- Arrays.sort(
- pathArray,
- new Comparator<Object>() {
- @Override
- public int compare(Object o1, Object o2) {
- Path path1 = (Path) o1;
- Path path2 = (Path) o2;
- String index1 = path1.toFile().getName().split("_")[1];
- String index2 = path2.toFile().getName().split("_")[1];
- return Long.compare(Long.parseLong(index1), Long.parseLong(index2));
- }
- });
- return pathArray;
- }
-
- @Override
- public SnapshotMeta getLatestSnapshot(File snapshotDir) {
- Object[] pathArray = getSortedPaths(snapshotDir);
- if (pathArray.length == 0) {
- return null;
- }
- Path max = (Path) pathArray[pathArray.length - 1];
- String dirName = max.toFile().getName();
- File snapshotFile =
- new File(max.toFile().getAbsolutePath() + File.separator + "snapshot." + dirName);
-
- String ordinal = snapshotFile.getName().split("\\.")[1];
- ByteBuffer metadata = ByteBuffer.wrap(ordinal.getBytes());
- return new SnapshotMeta(metadata, Collections.singletonList(snapshotFile));
- }
-
@Override
- public void loadSnapshot(SnapshotMeta latest) {
- try {
- Scanner scanner = new Scanner(latest.getSnapshotFiles().get(0));
- int snapshotValue = Integer.parseInt(scanner.next());
- integer.set(snapshotValue);
- scanner.close();
- } catch (IOException e) {
- logger.error("read file failed ", e);
+ public void loadSnapshot(File latestSnapshotRootDir) {
+ File snapshot =
+ new File(latestSnapshotRootDir.getAbsolutePath() + File.separator + "snapshot");
+ try (Scanner scanner = new Scanner(snapshot)) {
+ integer.set(Integer.parseInt(scanner.next()));
+ } catch (FileNotFoundException e) {
+ logger.error("cannot find snapshot file {}", snapshot);
}
}
- @Override
- public void cleanUpOldSnapshots(File snapshotDir) {
- Object[] paths = getSortedPaths(snapshotDir);
- for (int i = 0; i < paths.length - 1; i++) {
- try {
- FileUtils.deleteFully((Path) paths[i]);
- } catch (IOException e) {
- logger.error("delete file failed ", e);
- }
+ @TestOnly
+ public static synchronized String ensureSnapshotFileName(File snapshotDir, String metadata) {
+ File dir = new File(snapshotDir + File.separator + metadata);
+ if (!(dir.exists() && dir.isDirectory())) {
+ dir.mkdirs();
}
+ return dir.getPath() + File.separator + "snapshot";
}
}
}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index 1222598444..5e814f7151 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.SnapshotMeta;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
@@ -114,20 +113,12 @@ public class StandAloneConsensusTest {
}
@Override
- public boolean takeSnapshot(ByteBuffer metadata, File snapshotDir) {
+ public boolean takeSnapshot(File snapshotDir) {
return false;
}
@Override
- public SnapshotMeta getLatestSnapshot(File snapshotDir) {
- return null;
- }
-
- @Override
- public void loadSnapshot(SnapshotMeta latest) {}
-
- @Override
- public void cleanUpOldSnapshots(File snapshotDir) {}
+ public void loadSnapshot(File latestSnapshotRootDir) {}
}
@Before
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 35691c213b..a5aebf39e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.consensus.statemachine;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.consensus.common.SnapshotMeta;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
@@ -39,7 +38,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.nio.ByteBuffer;
import java.util.Arrays;
public class DataRegionStateMachine extends BaseStateMachine {
@@ -62,20 +60,12 @@ public class DataRegionStateMachine extends BaseStateMachine {
public void stop() {}
@Override
- public boolean takeSnapshot(ByteBuffer metadata, File snapshotDir) {
+ public boolean takeSnapshot(File snapshotDir) {
return false;
}
@Override
- public SnapshotMeta getLatestSnapshot(File snapshotDir) {
- return null;
- }
-
- @Override
- public void loadSnapshot(SnapshotMeta latest) {}
-
- @Override
- public void cleanUpOldSnapshots(File snapshotDir) {}
+ public void loadSnapshot(File latestSnapshotRootDir) {}
@Override
protected TSStatus write(FragmentInstance fragmentInstance) {
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index 68277cce8a..52f7c1e22b 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.consensus.statemachine;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.consensus.common.SnapshotMeta;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.visitor.SchemaExecutionVisitor;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
@@ -32,7 +31,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.nio.ByteBuffer;
public class SchemaRegionStateMachine extends BaseStateMachine {
@@ -53,20 +51,12 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
public void stop() {}
@Override
- public boolean takeSnapshot(ByteBuffer metadata, File snapshotDir) {
+ public boolean takeSnapshot(File snapshotDir) {
return false;
}
@Override
- public SnapshotMeta getLatestSnapshot(File snapshotDir) {
- return null;
- }
-
- @Override
- public void loadSnapshot(SnapshotMeta latest) {}
-
- @Override
- public void cleanUpOldSnapshots(File snapshotDir) {}
+ public void loadSnapshot(File latestSnapshotRootDir) {}
@Override
protected TSStatus write(FragmentInstance fragmentInstance) {