You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/04/25 15:08:12 UTC
[iotdb] branch master updated: [IOTDB-2968] RatisConsensus snapshot implementation (#5623)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 94bded4094 [IOTDB-2968] RatisConsensus snapshot implementation (#5623)
94bded4094 is described below
commit 94bded40942d6d7bef28f0c8af7120fcb6c53015
Author: SzyWilliam <48...@users.noreply.github.com>
AuthorDate: Mon Apr 25 23:08:07 2022 +0800
[IOTDB-2968] RatisConsensus snapshot implementation (#5623)
* ratis snapshot impl
* format
* close resource
add warning
add comments
* fix reviews
* add snapshot UT
* add license
* fix
* fix
* fix reviews
* fix reviews
* fix reviews
---
.../statemachine/PartitionRegionStateMachine.java | 4 +-
.../ratis/ApplicationStateMachineProxy.java | 81 +++++++-
.../iotdb/consensus/ratis/RatisConsensus.java | 5 +
.../iotdb/consensus/ratis/SnapshotStorage.java | 100 ++++++++++
.../org/apache/iotdb/consensus/ratis/Utils.java | 17 ++
.../consensus/standalone/StandAloneServerImpl.java | 4 +-
.../consensus/statemachine/EmptyStateMachine.java | 4 +-
.../consensus/statemachine/IStateMachine.java | 6 +-
.../iotdb/consensus/ratis/RatisConsensusTest.java | 134 ++++----------
.../apache/iotdb/consensus/ratis/SnapshotTest.java | 120 ++++++++++++
.../apache/iotdb/consensus/ratis/TestUtils.java | 203 +++++++++++++++++++++
.../standalone/StandAloneConsensusTest.java | 4 +-
.../statemachine/DataRegionStateMachine.java | 4 +-
.../statemachine/SchemaRegionStateMachine.java | 4 +-
14 files changed, 585 insertions(+), 105 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index 73aa4410d1..a55f83432f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -99,7 +99,9 @@ public class PartitionRegionStateMachine implements IStateMachine {
}
@Override
- public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+ public boolean takeSnapshot(ByteBuffer metadata, File snapshotDir) {
+ // No snapshot is taken here; returning false reports that no snapshot was produced.
+ return false;
+ }
@Override
public SnapshotMeta getLatestSnapshot(File snapshotDir) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index 450ff481b6..53113293a0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -20,32 +20,75 @@ package org.apache.iotdb.consensus.ratis;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.SnapshotMeta;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.statemachine.IStateMachine;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.util.LifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
public class ApplicationStateMachineProxy extends BaseStateMachine {
- private final IStateMachine applicationStateMachine;
private final Logger logger = LoggerFactory.getLogger(ApplicationStateMachineProxy.class);
+ private final IStateMachine applicationStateMachine;
+
+ // Raft Storage sub dir for statemachine data, default (_sm)
+ private File statemachineDir;
+ private final SnapshotStorage snapshotStorage;
public ApplicationStateMachineProxy(IStateMachine stateMachine) {
applicationStateMachine = stateMachine;
+ snapshotStorage = new SnapshotStorage(applicationStateMachine);
applicationStateMachine.start();
}
+ /**
+ * Initializes the snapshot storage from {@code storage}, records its state machine directory, and
+ * loads the latest snapshot (if any) reported by the application state machine for that
+ * directory. All of this runs inside the LifeCycle start transition.
+ */
+ @Override
+ public void initialize(RaftServer raftServer, RaftGroupId raftGroupId, RaftStorage storage)
+ throws IOException {
+ getLifeCycle()
+ .startAndTransition(
+ () -> {
+ snapshotStorage.init(storage);
+ this.statemachineDir = snapshotStorage.getStateMachineDir();
+ loadSnapshot(applicationStateMachine.getLatestSnapshot(statemachineDir));
+ });
+ }
+
+ /**
+ * Clears the last applied term/index and reloads the latest snapshot from the state machine
+ * directory. If no snapshot is found, the last applied term/index stays null. A PAUSED state
+ * machine is moved back to RUNNING (via STARTING). NOTE(review): presumably invoked by Ratis after
+ * a snapshot install — confirm.
+ */
+ @Override
+ public void reinitialize() throws IOException {
+ setLastAppliedTermIndex(null);
+ loadSnapshot(applicationStateMachine.getLatestSnapshot(statemachineDir));
+ if (getLifeCycleState() == LifeCycle.State.PAUSED) {
+ getLifeCycle().transition(LifeCycle.State.STARTING);
+ getLifeCycle().transition(LifeCycle.State.RUNNING);
+ }
+ }
+
+ /** Moves the LifeCycle through PAUSING to PAUSED. */
+ @Override
+ public void pause() {
+ getLifeCycle().transition(LifeCycle.State.PAUSING);
+ getLifeCycle().transition(LifeCycle.State.PAUSED);
+ }
+
@Override
public void close() throws IOException {
- applicationStateMachine.stop();
+ getLifeCycle().checkStateAndClose(applicationStateMachine::stop);
}
@Override
@@ -84,4 +127,38 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
DataSet result = applicationStateMachine.read(requestMessage.getActualRequest());
return CompletableFuture.completedFuture(new ResponseMessage(result));
}
+
+  /**
+   * Asks the application state machine to take a snapshot at the last applied term/index.
+   *
+   * @return the log index covered by the new snapshot, or {@link RaftLog#INVALID_LOG_INDEX} if
+   *     nothing has been applied yet or the application state machine reports failure
+   */
+  @Override
+  public long takeSnapshot() throws IOException {
+    final TermIndex lastApplied = getLastAppliedTermIndex();
+    // lastApplied can be null, e.g. after reinitialize() reset it and no snapshot was loaded
+    if (lastApplied == null || lastApplied.getTerm() <= 0 || lastApplied.getIndex() <= 0) {
+      return RaftLog.INVALID_LOG_INDEX;
+    }
+
+    // require the application statemachine to take the latest snapshot; the "term_index"
+    // metadata lets it name/label the snapshot
+    ByteBuffer metadata = Utils.getMetadataFromTermIndex(lastApplied);
+    boolean success = applicationStateMachine.takeSnapshot(metadata, statemachineDir);
+    if (!success) {
+      logger.warn("application state machine failed to take snapshot at {}", lastApplied);
+      return RaftLog.INVALID_LOG_INDEX;
+    }
+
+    return lastApplied.getIndex();
+  }
+
+  /**
+   * Restores the application state machine from {@code snapshot} and advances the last applied
+   * term/index to the one encoded in the snapshot metadata. Does nothing if {@code snapshot} is
+   * null.
+   */
+  private void loadSnapshot(SnapshotMeta snapshot) {
+    if (snapshot == null) {
+      return;
+    }
+
+    // decode from a duplicate, before the application gets the snapshot, so that whatever the
+    // application does with the metadata buffer (e.g. reading it) cannot corrupt the decoding
+    TermIndex snapshotTermIndex =
+        Utils.getTermIndexFromMetadata(snapshot.getMetadata().duplicate());
+
+    // require the application statemachine to load the latest snapshot
+    applicationStateMachine.loadSnapshot(snapshot);
+    updateLastAppliedTermIndex(snapshotTermIndex.getTerm(), snapshotTermIndex.getIndex());
+  }
+
+  /** Returns the storage that exposes the application's snapshots to Ratis. */
+  @Override
+  public StateMachineStorage getStateMachineStorage() {
+    return snapshotStorage;
+  }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index d34eab94f1..0e4e368f03 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -98,6 +98,10 @@ class RatisConsensus implements IConsensus {
private static final int DEFAULT_PRIORITY = 0;
private static final int LEADER_PRIORITY = 1;
+ /**
+ * @param ratisStorageDir root storage directory of this RatisConsensus peer; every Raft group
+ * hosted by the peer stores its data under this shared root dir
+ */
public RatisConsensus(TEndPoint endpoint, File ratisStorageDir, IStateMachine.Registry registry)
throws IOException {
// create a RaftPeer as endpoint of comm
@@ -105,6 +109,7 @@ class RatisConsensus implements IConsensus {
myself = Utils.toRaftPeer(endpoint, DEFAULT_PRIORITY);
RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(ratisStorageDir));
+ RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(properties, true);
// set the port which server listen to in RaftProperty object
final int port = NetUtils.createSocketAddr(address).getPort();
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
new file mode 100644
index 0000000000..2fbc7916ca
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.SnapshotMeta;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+
+import org.apache.ratis.io.MD5Hash;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.statemachine.SnapshotRetentionPolicy;
+import org.apache.ratis.statemachine.StateMachineStorage;
+import org.apache.ratis.statemachine.impl.FileListSnapshotInfo;
+import org.apache.ratis.util.MD5FileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * TODO: Warning, currently in Ratis 2.2.0, there is a bug in installSnapshot. In subsequent
+ * installSnapshot, a follower may fail to install while the leader assume it success. This bug will
+ * be triggered when the snapshot threshold is low. This is fixed in current Ratis Master, and
+ * hopefully will be introduced in Ratis 2.3.0.
+ */
+public class SnapshotStorage implements StateMachineStorage {
+ private IStateMachine applicationStateMachine;
+
+ private File stateMachineDir;
+ private final Logger logger = LoggerFactory.getLogger(SnapshotStorage.class);
+
+ public SnapshotStorage(IStateMachine applicationStateMachine) {
+ this.applicationStateMachine = applicationStateMachine;
+ }
+
+ @Override
+ public void init(RaftStorage raftStorage) throws IOException {
+ this.stateMachineDir = raftStorage.getStorageDir().getStateMachineDir();
+ }
+
+ @Override
+ public SnapshotInfo getLatestSnapshot() {
+ SnapshotMeta snapshotMeta = applicationStateMachine.getLatestSnapshot(stateMachineDir);
+ if (snapshotMeta == null) {
+ return null;
+ }
+ TermIndex snapshotTermIndex = Utils.getTermIndexFromMetadata(snapshotMeta.getMetadata());
+
+ List<FileInfo> fileInfos = new ArrayList<>();
+ for (File file : snapshotMeta.getSnapshotFiles()) {
+ Path filePath = file.toPath();
+ MD5Hash fileHash = null;
+ try {
+ fileHash = MD5FileUtil.computeMd5ForFile(file);
+ } catch (IOException e) {
+ logger.error("read file info failed for snapshot file ", e);
+ }
+ FileInfo fileInfo = new FileInfo(filePath, fileHash);
+ fileInfos.add(fileInfo);
+ }
+
+ return new FileListSnapshotInfo(
+ fileInfos, snapshotTermIndex.getTerm(), snapshotTermIndex.getIndex());
+ }
+
+ @Override
+ public void format() throws IOException {}
+
+ @Override
+ public void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy)
+ throws IOException {
+ applicationStateMachine.cleanUpOldSnapshots(stateMachineDir);
+ }
+
+ public File getStateMachineDir() {
+ return stateMachineDir;
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
index ff3210717b..f1842a6dea 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
@@ -28,12 +28,15 @@ import org.apache.iotdb.consensus.common.Peer;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.TByteBuffer;
import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@@ -160,4 +163,18 @@ public class Utils {
status.read(protocol);
return status;
}
+
+  /**
+   * Encodes a Raft term/index pair as snapshot metadata: the UTF-8 bytes of {@code
+   * "<term>_<index>"}, e.g. "123_456".
+   *
+   * <p>NOTE: the numbers are not zero-padded, so encoded values do not sort lexicographically in
+   * term/index order (e.g. "9_90" is greater than "10_100" as a string); compare the decoded
+   * numbers instead.
+   */
+  public static ByteBuffer getMetadataFromTermIndex(TermIndex termIndex) {
+    // plain concatenation: unlike String.format("%d"), it never emits locale-specific digits
+    String ordinal = termIndex.getTerm() + "_" + termIndex.getIndex();
+    return ByteBuffer.wrap(ordinal.getBytes(StandardCharsets.UTF_8));
+  }
+
+  /**
+   * Decodes metadata produced by {@link #getMetadataFromTermIndex(TermIndex)} into a TermIndex.
+   *
+   * <p>The buffer is read through a duplicate, so the caller's position is not advanced and the
+   * same metadata can be decoded again.
+   *
+   * @throws IllegalArgumentException if the metadata is not of the form {@code "<term>_<index>"}
+   */
+  public static TermIndex getTermIndexFromMetadata(ByteBuffer metadata) {
+    Charset charset = StandardCharsets.UTF_8;
+    CharBuffer charBuffer = charset.decode(metadata.duplicate());
+    String ordinal = charBuffer.toString();
+    String[] items = ordinal.split("_");
+    if (items.length != 2) {
+      throw new IllegalArgumentException("Malformed snapshot metadata: " + ordinal);
+    }
+    return TermIndex.valueOf(Long.parseLong(items[0]), Long.parseLong(items[1]));
+  }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
index bb6220a769..be0dbaa6c7 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
@@ -68,7 +68,9 @@ public class StandAloneServerImpl implements IStateMachine {
}
@Override
- public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+ public boolean takeSnapshot(ByteBuffer metadata, File snapshotDir) {
+ // No snapshot is taken here; returning false reports that no snapshot was produced.
+ return false;
+ }
@Override
public SnapshotMeta getLatestSnapshot(File snapshotDir) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
index 1f66bf1d87..8693d1f6ce 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
@@ -46,7 +46,9 @@ public class EmptyStateMachine implements IStateMachine {
}
@Override
- public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+ public boolean takeSnapshot(ByteBuffer metadata, File snapshotDir) {
+ // No snapshot is taken here; returning false reports that no snapshot was produced.
+ return false;
+ }
@Override
public SnapshotMeta getLatestSnapshot(File snapshotDir) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java
index 5279e235a4..aab3e17bcd 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java
@@ -57,10 +57,12 @@ public interface IStateMachine {
* @param metadata the metadata IConsensus want IStateMachine to preserve. NOTICE: the more
* updated snapshot will have lexicographically larger metadata. This property should be
* guaranteed by every IConsensus implementation. IStateMachine can use the metadata to sort
- * or label snapshot.
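+ * or label snapshot. E.g., if metadata is byteBuffer("123_456"), the statemachine can create a
+ * directory ${snapshotDir}/123_456/ and store all files under this directory. NOTE: RatisConsensus
+ * encodes metadata as "term_index" without zero padding, so those strings are NOT ordered
+ * lexicographically (e.g. "9_90" is greater than "10_100" as a string); compare the numbers.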
* @param snapshotDir the root dir of snapshot files
+ * @return true if snapshot successfully taken
*/
- void takeSnapshot(ByteBuffer metadata, File snapshotDir);
+ boolean takeSnapshot(ByteBuffer metadata, File snapshotDir);
/**
* When recover from crash / leader installSnapshot to follower, this method is called.
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index ce9817b6f8..282141b878 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -19,20 +19,15 @@
package org.apache.iotdb.consensus.ratis;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.common.ConsensusGroup;
-import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.SnapshotMeta;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
import org.apache.ratis.util.FileUtils;
import org.junit.After;
@@ -50,79 +45,11 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
public class RatisConsensusTest {
private static final String RATIS_CLASS_NAME = "org.apache.iotdb.consensus.ratis.RatisConsensus";
- private static class TestDataSet implements DataSet {
- private int number;
-
- public void setNumber(int number) {
- this.number = number;
- }
-
- public int getNumber() {
- return number;
- }
- }
-
- private static class TestRequest {
- private final int cmd;
-
- public TestRequest(ByteBuffer buffer) {
- cmd = buffer.getInt();
- }
-
- public boolean isIncr() {
- return cmd == 1;
- }
- }
-
- private static class IntegerCounter implements IStateMachine {
- AtomicInteger integer;
-
- @Override
- public void start() {
- integer = new AtomicInteger(0);
- }
-
- @Override
- public void stop() {}
-
- @Override
- public TSStatus write(IConsensusRequest IConsensusRequest) {
- ByteBufferConsensusRequest request = (ByteBufferConsensusRequest) IConsensusRequest;
- TestRequest testRequest = new TestRequest(request.getContent());
- if (testRequest.isIncr()) {
- integer.incrementAndGet();
- }
- return new TSStatus(200);
- }
-
- @Override
- public DataSet read(IConsensusRequest IConsensusRequest) {
- TestDataSet dataSet = new TestDataSet();
- dataSet.setNumber(integer.get());
- return dataSet;
- }
-
- @Override
- public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
-
- @Override
- public SnapshotMeta getLatestSnapshot(File snapshotDir) {
- return null;
- }
-
- @Override
- public void loadSnapshot(SnapshotMeta latest) {}
-
- @Override
- public void cleanUpOldSnapshots(File snapshotDir) {}
- }
-
private ConsensusGroupId gid;
private List<Peer> peers;
private List<File> peersStorage;
@@ -133,6 +60,22 @@ public class RatisConsensusTest {
private Peer peer2;
CountDownLatch latch;
+  /**
+   * Builds one RatisConsensus instance per test peer, each backed by a fresh IntegerCounter,
+   * appends it to {@code servers} and starts it.
+   */
+  private void makeServers() throws IOException {
+    for (int peerIndex = 0; peerIndex < 3; peerIndex++) {
+      Peer peer = peers.get(peerIndex);
+      File storage = peersStorage.get(peerIndex);
+      servers.add(
+          ConsensusFactory.getConsensusImpl(
+                  RATIS_CLASS_NAME,
+                  peer.getEndpoint(),
+                  storage,
+                  groupId -> new TestUtils.IntegerCounter())
+              .orElseThrow(
+                  () ->
+                      new IllegalArgumentException(
+                          String.format(ConsensusFactory.CONSTRUCT_FAILED_MSG, RATIS_CLASS_NAME))));
+      servers.get(peerIndex).start();
+    }
+  }
+
@Before
public void setUp() throws IOException {
gid = new DataRegionId(1);
@@ -152,19 +95,7 @@ public class RatisConsensusTest {
}
group = new ConsensusGroup(gid, peers);
servers = new ArrayList<>();
- for (int i = 0; i < 3; i++) {
- servers.add(
- ConsensusFactory.getConsensusImpl(
- RATIS_CLASS_NAME,
- peers.get(i).getEndpoint(),
- peersStorage.get(i),
- groupId -> new IntegerCounter())
- .orElseThrow(
- () ->
- new IllegalArgumentException(
- String.format(ConsensusFactory.CONSTRUCT_FAILED_MSG, RATIS_CLASS_NAME))));
- servers.get(i).start();
- }
+ makeServers();
}
@After
@@ -180,15 +111,15 @@ public class RatisConsensusTest {
@Test
public void basicConsensus() throws Exception {
- // 4. Add a new group
+ // 1. Add a new group
servers.get(0).addConsensusGroup(group.getGroupId(), group.getPeers());
servers.get(1).addConsensusGroup(group.getGroupId(), group.getPeers());
servers.get(2).addConsensusGroup(group.getGroupId(), group.getPeers());
- // 5. Do Consensus 10
+ // 2. Do Consensus 10
doConsensus(servers.get(0), group.getGroupId(), 10, 10);
- // 6. Remove two Peers from Group (peer 0 and peer 2)
+ // 3. Remove two Peers from Group (peer 0 and peer 2)
// transfer the leader to peer1
servers.get(0).transferLeader(gid, peer1);
Assert.assertTrue(servers.get(1).isLeader(gid));
@@ -200,10 +131,10 @@ public class RatisConsensusTest {
servers.get(2).removeConsensusGroup(gid);
Assert.assertEquals(servers.get(1).getLeader(gid).getEndpoint(), peers.get(1).getEndpoint());
- // 7. try consensus again with one peer
+ // 4. try consensus again with one peer
doConsensus(servers.get(1), gid, 10, 20);
- // 8. add two peers back
+ // 5. add two peers back
// first notify these new peers, let them initialize
servers.get(0).addConsensusGroup(gid, peers);
servers.get(2).addConsensusGroup(gid, peers);
@@ -211,17 +142,30 @@ public class RatisConsensusTest {
servers.get(1).addPeer(gid, peer0);
servers.get(1).addPeer(gid, peer2);
- // 9. try consensus with all 3 peers
+ // 6. try consensus with all 3 peers
doConsensus(servers.get(2), gid, 10, 30);
- // 10. again, group contains only peer0
+ // 7. again, group contains only peer0
servers.get(0).transferLeader(group.getGroupId(), peer0);
servers.get(0).changePeer(group.getGroupId(), Collections.singletonList(peer0));
servers.get(1).removeConsensusGroup(group.getGroupId());
servers.get(2).removeConsensusGroup(group.getGroupId());
- // 11. try consensus with only peer0
+ // 8. try consensus with only peer0
doConsensus(servers.get(0), gid, 10, 40);
+
+ // 9. shutdown all the servers
+ for (IConsensus consensus : servers) {
+ consensus.stop();
+ }
+ servers.clear();
+
+ // 10. start again and verify the snapshot
+ makeServers();
+ servers.get(0).addConsensusGroup(group.getGroupId(), group.getPeers());
+ servers.get(1).addConsensusGroup(group.getGroupId(), group.getPeers());
+ servers.get(2).addConsensusGroup(group.getGroupId(), group.getPeers());
+ doConsensus(servers.get(0), gid, 10, 50);
}
private void doConsensus(IConsensus consensus, ConsensusGroupId gid, int count, int target)
@@ -275,7 +219,7 @@ public class RatisConsensusTest {
// Check we reached a consensus
ConsensusReadResponse response = leader.read(gid, getReq);
- TestDataSet result = (TestDataSet) response.getDataset();
+ TestUtils.TestDataSet result = (TestUtils.TestDataSet) response.getDataset();
Assert.assertEquals(target, result.getNumber());
}
}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
new file mode 100644
index 0000000000..3af8c5df5e
--- /dev/null
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.server.storage.RaftStorageDirectory;
+import org.apache.ratis.server.storage.RaftStorageMetadataFile;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.util.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+/** Unit test of ApplicationStateMachineProxy snapshot taking, lookup and cleanup. */
+public class SnapshotTest {
+
+  private static final File testDir = new File("target" + File.separator + "sm");
+
+  // Mock Storage which only provides the state machine dir
+  private static class EmptyStorageWithOnlySMDir implements RaftStorage {
+
+    @Override
+    public RaftStorageDirectory getStorageDir() {
+      return new RaftStorageDirectory() {
+        @Override
+        public File getRoot() {
+          return null;
+        }
+
+        @Override
+        public boolean isHealthy() {
+          return false;
+        }
+
+        @Override
+        public File getStateMachineDir() {
+          return testDir;
+        }
+      };
+    }
+
+    @Override
+    public RaftStorageMetadataFile getMetadataFile() {
+      return null;
+    }
+
+    @Override
+    public RaftServerConfigKeys.Log.CorruptionPolicy getLogCorruptionPolicy() {
+      return null;
+    }
+
+    @Override
+    public void close() throws IOException {}
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    FileUtils.deleteFully(testDir);
+    FileUtils.createDirectories(testDir);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    FileUtils.deleteFully(testDir);
+  }
+
+  @Test
+  public void testSnapshot() throws Exception {
+    ApplicationStateMachineProxy proxy =
+        new ApplicationStateMachineProxy(new TestUtils.IntegerCounter());
+
+    proxy.initialize(null, null, new EmptyStorageWithOnlySMDir());
+
+    // take a snapshot at term 421, index 616
+    proxy.notifyTermIndexUpdated(421, 616);
+    String snapshotFilename = TestUtils.IntegerCounter.ensureSnapshotFileName(testDir, "421_616");
+    long index = proxy.takeSnapshot();
+    // JUnit convention: expected value first, actual second
+    Assert.assertEquals(616, index);
+    Assert.assertTrue(new File(snapshotFilename).exists());
+
+    // take a snapshot at term 616, index 4217
+    proxy.notifyTermIndexUpdated(616, 4217);
+    String snapshotFilenameLatest =
+        TestUtils.IntegerCounter.ensureSnapshotFileName(testDir, "616_4217");
+    long indexLatest = proxy.takeSnapshot();
+    Assert.assertEquals(4217, indexLatest);
+    Assert.assertTrue(new File(snapshotFilenameLatest).exists());
+
+    // query the latest snapshot
+    SnapshotInfo info = proxy.getLatestSnapshot();
+    Assert.assertNotNull(info);
+    Assert.assertEquals(616, info.getTerm());
+    Assert.assertEquals(4217, info.getIndex());
+    Assert.assertTrue(info.getFiles().get(0).getPath().endsWith(snapshotFilenameLatest));
+
+    // clean up: only the latest snapshot must survive
+    proxy.getStateMachineStorage().cleanupOldSnapshots(null);
+    Assert.assertFalse(new File(snapshotFilename).exists());
+    Assert.assertTrue(new File(snapshotFilenameLatest).exists());
+  }
+}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
new file mode 100644
index 0000000000..fe209ba019
--- /dev/null
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.SnapshotMeta;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+
+import org.apache.ratis.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Scanner;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TestUtils {
+ static class TestDataSet implements DataSet {
+ private int number;
+
+ public void setNumber(int number) {
+ this.number = number;
+ }
+
+ public int getNumber() {
+ return number;
+ }
+ }
+
+ static class TestRequest {
+ private final int cmd;
+
+ public TestRequest(ByteBuffer buffer) {
+ cmd = buffer.getInt();
+ }
+
+ public boolean isIncr() {
+ return cmd == 1;
+ }
+ }
+
+ static class IntegerCounter implements IStateMachine {
+ private AtomicInteger integer;
+ private final Logger logger = LoggerFactory.getLogger(IntegerCounter.class);
+
+ @Override
+ public void start() {
+ integer = new AtomicInteger(0);
+ }
+
+ @Override
+ public void stop() {}
+
+ @Override
+ public TSStatus write(IConsensusRequest IConsensusRequest) {
+ ByteBufferConsensusRequest request = (ByteBufferConsensusRequest) IConsensusRequest;
+ TestRequest testRequest = new TestRequest(request.getContent());
+ if (testRequest.isIncr()) {
+ integer.incrementAndGet();
+ }
+ return new TSStatus(200);
+ }
+
+ @Override
+ public DataSet read(IConsensusRequest IConsensusRequest) {
+ TestDataSet dataSet = new TestDataSet();
+ dataSet.setNumber(integer.get());
+ return dataSet;
+ }
+
+ public static synchronized String ensureSnapshotFileName(File snapshotDir, String metadata) {
+ File dir = new File(snapshotDir + File.separator + metadata);
+ if (!(dir.exists() && dir.isDirectory())) {
+ dir.mkdirs();
+ }
+ return dir.getPath() + File.separator + "snapshot." + metadata;
+ }
+
+ @Override
+ public boolean takeSnapshot(ByteBuffer metadata, File snapshotDir) {
+ /**
+ * When IStateMachine take the snapshot, it can directly use the metadata to name the snapshot
+ * file. It's guaranteed that more up-to-date snapshot will have lexicographically larger
+ * metadata.
+ */
+ String tempFilePath = snapshotDir + File.separator + ".tmp";
+ String filePath = ensureSnapshotFileName(snapshotDir, new String(metadata.array()));
+
+ File tempFile = new File(tempFilePath);
+
+ try {
+ FileWriter writer = new FileWriter(tempFile);
+ writer.write(String.valueOf(integer.get()));
+ writer.close();
+ tempFile.renameTo(new File(filePath));
+ } catch (IOException e) {
+ logger.error("take snapshot failed ", e);
+ return false;
+ }
+ return true;
+ }
+
+ private Object[] getSortedPaths(File rootDir) {
+ /**
+ * When looking for the latest snapshot inside the directory, just list all filenames and sort
+ * them.
+ */
+ ArrayList<Path> paths = new ArrayList<>();
+ try {
+ DirectoryStream<Path> stream = Files.newDirectoryStream(rootDir.toPath());
+ for (Path path : stream) {
+ paths.add(path);
+ }
+ } catch (IOException e) {
+ logger.error("read directory failed ", e);
+ }
+
+ Object[] pathArray = paths.toArray();
+ Arrays.sort(
+ pathArray,
+ new Comparator<Object>() {
+ @Override
+ public int compare(Object o1, Object o2) {
+ Path path1 = (Path) o1;
+ Path path2 = (Path) o2;
+ String index1 = path1.toFile().getName().split("_")[1];
+ String index2 = path2.toFile().getName().split("_")[1];
+ return Long.compare(Long.parseLong(index1), Long.parseLong(index2));
+ }
+ });
+ return pathArray;
+ }
+
+ @Override
+ public SnapshotMeta getLatestSnapshot(File snapshotDir) {
+ Object[] pathArray = getSortedPaths(snapshotDir);
+ if (pathArray.length == 0) {
+ return null;
+ }
+ Path max = (Path) pathArray[pathArray.length - 1];
+ String dirName = max.toFile().getName();
+ File snapshotFile =
+ new File(max.toFile().getAbsolutePath() + File.separator + "snapshot." + dirName);
+
+ String ordinal = snapshotFile.getName().split("\\.")[1];
+ ByteBuffer metadata = ByteBuffer.wrap(ordinal.getBytes());
+ return new SnapshotMeta(metadata, Collections.singletonList(snapshotFile));
+ }
+
+ @Override
+ public void loadSnapshot(SnapshotMeta latest) {
+ try {
+ Scanner scanner = new Scanner(latest.getSnapshotFiles().get(0));
+ int snapshotValue = Integer.parseInt(scanner.next());
+ integer.set(snapshotValue);
+ scanner.close();
+ } catch (IOException e) {
+ logger.error("read file failed ", e);
+ }
+ }
+
+ @Override
+ public void cleanUpOldSnapshots(File snapshotDir) {
+ Object[] paths = getSortedPaths(snapshotDir);
+ for (int i = 0; i < paths.length - 1; i++) {
+ try {
+ FileUtils.deleteFully((Path) paths[i]);
+ } catch (IOException e) {
+ logger.error("delete file failed ", e);
+ }
+ }
+ }
+ }
+}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index 1482f21c04..0275868843 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -112,7 +112,9 @@ public class StandAloneConsensusTest {
}
@Override
- public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+ public boolean takeSnapshot(ByteBuffer metadata, File snapshotDir) {
+ // No snapshot is taken here; returning false reports that no snapshot was produced.
+ return false;
+ }
@Override
public SnapshotMeta getLatestSnapshot(File snapshotDir) {
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 0d99a2d6c2..789cbeafa5 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -62,7 +62,9 @@ public class DataRegionStateMachine extends BaseStateMachine {
public void stop() {}
@Override
- public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+ public boolean takeSnapshot(ByteBuffer metadata, File snapshotDir) {
+ // No snapshot is taken here; returning false reports that no snapshot was produced.
+ return false;
+ }
@Override
public SnapshotMeta getLatestSnapshot(File snapshotDir) {
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index 26f24b03da..c5c753bfa2 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -53,7 +53,9 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
public void stop() {}
@Override
- public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+ public boolean takeSnapshot(ByteBuffer metadata, File snapshotDir) {
+ // No snapshot is taken here; returning false reports that no snapshot was produced.
+ return false;
+ }
@Override
public SnapshotMeta getLatestSnapshot(File snapshotDir) {