You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/04/25 15:08:12 UTC

[iotdb] branch master updated: [IOTDB-2968] RatisConsensus snapshot implementation (#5623)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 94bded4094 [IOTDB-2968] RatisConsensus snapshot implementation (#5623)
94bded4094 is described below

commit 94bded40942d6d7bef28f0c8af7120fcb6c53015
Author: SzyWilliam <48...@users.noreply.github.com>
AuthorDate: Mon Apr 25 23:08:07 2022 +0800

    [IOTDB-2968] RatisConsensus snapshot implementation (#5623)
    
    * ratis snapshot impl
    
    * format
    
    * close resource
    
    add warning
    
    add comments
    
    * fix reviews
    
    * add snapshot UT
    
    * add license
    
    * fix
    
    * fix
    
    * fix reviews
    
    * fix reviews
    
    * fix reviews
---
 .../statemachine/PartitionRegionStateMachine.java  |   4 +-
 .../ratis/ApplicationStateMachineProxy.java        |  81 +++++++-
 .../iotdb/consensus/ratis/RatisConsensus.java      |   5 +
 .../iotdb/consensus/ratis/SnapshotStorage.java     | 100 ++++++++++
 .../org/apache/iotdb/consensus/ratis/Utils.java    |  17 ++
 .../consensus/standalone/StandAloneServerImpl.java |   4 +-
 .../consensus/statemachine/EmptyStateMachine.java  |   4 +-
 .../consensus/statemachine/IStateMachine.java      |   6 +-
 .../iotdb/consensus/ratis/RatisConsensusTest.java  | 134 ++++----------
 .../apache/iotdb/consensus/ratis/SnapshotTest.java | 120 ++++++++++++
 .../apache/iotdb/consensus/ratis/TestUtils.java    | 203 +++++++++++++++++++++
 .../standalone/StandAloneConsensusTest.java        |   4 +-
 .../statemachine/DataRegionStateMachine.java       |   4 +-
 .../statemachine/SchemaRegionStateMachine.java     |   4 +-
 14 files changed, 585 insertions(+), 105 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index 73aa4410d1..a55f83432f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -99,7 +99,9 @@ public class PartitionRegionStateMachine implements IStateMachine {
   }
 
   @Override
-  public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+  public boolean takeSnapshot(ByteBuffer metadata, File snapshotDir) {
+    return false;
+  }
 
   @Override
   public SnapshotMeta getLatestSnapshot(File snapshotDir) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index 450ff481b6..53113293a0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -20,32 +20,75 @@ package org.apache.iotdb.consensus.ratis;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.SnapshotMeta;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.StateMachineStorage;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.util.LifeCycle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 
 public class ApplicationStateMachineProxy extends BaseStateMachine {
-  private final IStateMachine applicationStateMachine;
   private final Logger logger = LoggerFactory.getLogger(ApplicationStateMachineProxy.class);
+  private final IStateMachine applicationStateMachine;
+
+  // Raft Storage sub dir for statemachine data, default (_sm)
+  private File statemachineDir;
+  private final SnapshotStorage snapshotStorage;
 
   public ApplicationStateMachineProxy(IStateMachine stateMachine) {
     applicationStateMachine = stateMachine;
+    snapshotStorage = new SnapshotStorage(applicationStateMachine);
     applicationStateMachine.start();
   }
 
+  @Override
+  public void initialize(RaftServer raftServer, RaftGroupId raftGroupId, RaftStorage storage)
+      throws IOException {
+    getLifeCycle()
+        .startAndTransition(
+            () -> {
+              snapshotStorage.init(storage);
+              this.statemachineDir = snapshotStorage.getStateMachineDir();
+              loadSnapshot(applicationStateMachine.getLatestSnapshot(statemachineDir));
+            });
+  }
+
+  @Override
+  public void reinitialize() throws IOException {
+    setLastAppliedTermIndex(null);
+    loadSnapshot(applicationStateMachine.getLatestSnapshot(statemachineDir));
+    if (getLifeCycleState() == LifeCycle.State.PAUSED) {
+      getLifeCycle().transition(LifeCycle.State.STARTING);
+      getLifeCycle().transition(LifeCycle.State.RUNNING);
+    }
+  }
+
+  @Override
+  public void pause() {
+    getLifeCycle().transition(LifeCycle.State.PAUSING);
+    getLifeCycle().transition(LifeCycle.State.PAUSED);
+  }
+
   @Override
   public void close() throws IOException {
-    applicationStateMachine.stop();
+    getLifeCycle().checkStateAndClose(applicationStateMachine::stop);
   }
 
   @Override
@@ -84,4 +127,38 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
     DataSet result = applicationStateMachine.read(requestMessage.getActualRequest());
     return CompletableFuture.completedFuture(new ResponseMessage(result));
   }
+
+  @Override
+  public long takeSnapshot() throws IOException {
+    final TermIndex lastApplied = getLastAppliedTermIndex();
+    if (lastApplied.getTerm() <= 0 || lastApplied.getIndex() <= 0) {
+      return RaftLog.INVALID_LOG_INDEX;
+    }
+
+    // require the application statemachine to take the latest snapshot
+    ByteBuffer metadata = Utils.getMetadataFromTermIndex(lastApplied);
+    boolean success = applicationStateMachine.takeSnapshot(metadata, statemachineDir);
+    if (!success) {
+      return RaftLog.INVALID_LOG_INDEX;
+    }
+
+    return lastApplied.getIndex();
+  }
+
+  private void loadSnapshot(SnapshotMeta snapshot) {
+    if (snapshot == null) {
+      return;
+    }
+
+    // require the application statemachine to load the latest snapshot
+    applicationStateMachine.loadSnapshot(snapshot);
+    ByteBuffer metadata = snapshot.getMetadata();
+    TermIndex snapshotTermIndex = Utils.getTermIndexFromMetadata(metadata);
+    updateLastAppliedTermIndex(snapshotTermIndex.getTerm(), snapshotTermIndex.getIndex());
+  }
+
+  @Override
+  public StateMachineStorage getStateMachineStorage() {
+    return snapshotStorage;
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index d34eab94f1..0e4e368f03 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -98,6 +98,10 @@ class RatisConsensus implements IConsensus {
   private static final int DEFAULT_PRIORITY = 0;
   private static final int LEADER_PRIORITY = 1;
 
+  /**
+   * @param ratisStorageDir the root dir shared by all consensus groups of this RatisConsensus
+   *     peer
+   */
   public RatisConsensus(TEndPoint endpoint, File ratisStorageDir, IStateMachine.Registry registry)
       throws IOException {
     // create a RaftPeer as endpoint of comm
@@ -105,6 +109,7 @@ class RatisConsensus implements IConsensus {
     myself = Utils.toRaftPeer(endpoint, DEFAULT_PRIORITY);
 
     RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(ratisStorageDir));
+    RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(properties, true);
 
     // set the port which server listen to in RaftProperty object
     final int port = NetUtils.createSocketAddr(address).getPort();
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
new file mode 100644
index 0000000000..2fbc7916ca
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.SnapshotMeta;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+
+import org.apache.ratis.io.MD5Hash;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.statemachine.SnapshotRetentionPolicy;
+import org.apache.ratis.statemachine.StateMachineStorage;
+import org.apache.ratis.statemachine.impl.FileListSnapshotInfo;
+import org.apache.ratis.util.MD5FileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * TODO: Warning, currently in Ratis 2.2.0, there is a bug in installSnapshot. In subsequent
+ * installSnapshot calls, a follower may fail to install the snapshot while the leader assumes it
+ * succeeded. This bug is triggered when the snapshot threshold is low. It is fixed on the current
+ * Ratis master branch and will hopefully be released in Ratis 2.3.0.
+ */
+public class SnapshotStorage implements StateMachineStorage {
+  private IStateMachine applicationStateMachine;
+
+  private File stateMachineDir;
+  private final Logger logger = LoggerFactory.getLogger(SnapshotStorage.class);
+
+  public SnapshotStorage(IStateMachine applicationStateMachine) {
+    this.applicationStateMachine = applicationStateMachine;
+  }
+
+  @Override
+  public void init(RaftStorage raftStorage) throws IOException {
+    this.stateMachineDir = raftStorage.getStorageDir().getStateMachineDir();
+  }
+
+  @Override
+  public SnapshotInfo getLatestSnapshot() {
+    SnapshotMeta snapshotMeta = applicationStateMachine.getLatestSnapshot(stateMachineDir);
+    if (snapshotMeta == null) {
+      return null;
+    }
+    TermIndex snapshotTermIndex = Utils.getTermIndexFromMetadata(snapshotMeta.getMetadata());
+
+    List<FileInfo> fileInfos = new ArrayList<>();
+    for (File file : snapshotMeta.getSnapshotFiles()) {
+      Path filePath = file.toPath();
+      MD5Hash fileHash = null;
+      try {
+        fileHash = MD5FileUtil.computeMd5ForFile(file);
+      } catch (IOException e) {
+        logger.error("read file info failed for snapshot file ", e);
+      }
+      FileInfo fileInfo = new FileInfo(filePath, fileHash);
+      fileInfos.add(fileInfo);
+    }
+
+    return new FileListSnapshotInfo(
+        fileInfos, snapshotTermIndex.getTerm(), snapshotTermIndex.getIndex());
+  }
+
+  @Override
+  public void format() throws IOException {}
+
+  @Override
+  public void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy)
+      throws IOException {
+    applicationStateMachine.cleanUpOldSnapshots(stateMachineDir);
+  }
+
+  public File getStateMachineDir() {
+    return stateMachineDir;
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
index ff3210717b..f1842a6dea 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
@@ -28,12 +28,15 @@ import org.apache.iotdb.consensus.common.Peer;
 
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.transport.TByteBuffer;
 
 import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
@@ -160,4 +163,18 @@ public class Utils {
     status.read(protocol);
     return status;
   }
+
+  public static ByteBuffer getMetadataFromTermIndex(TermIndex termIndex) {
+    String ordinal = String.format("%d_%d", termIndex.getTerm(), termIndex.getIndex());
+    ByteBuffer metadata = ByteBuffer.wrap(ordinal.getBytes());
+    return metadata;
+  }
+
+  public static TermIndex getTermIndexFromMetadata(ByteBuffer metadata) {
+    Charset charset = Charset.defaultCharset();
+    CharBuffer charBuffer = charset.decode(metadata);
+    String ordinal = charBuffer.toString();
+    String[] items = ordinal.split("_");
+    return TermIndex.valueOf(Long.parseLong(items[0]), Long.parseLong(items[1]));
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
index bb6220a769..be0dbaa6c7 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
@@ -68,7 +68,9 @@ public class StandAloneServerImpl implements IStateMachine {
   }
 
   @Override
-  public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+  public boolean takeSnapshot(ByteBuffer metadata, File snapshotDir) {
+    return false;
+  }
 
   @Override
   public SnapshotMeta getLatestSnapshot(File snapshotDir) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
index 1f66bf1d87..8693d1f6ce 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
@@ -46,7 +46,9 @@ public class EmptyStateMachine implements IStateMachine {
   }
 
   @Override
-  public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+  public boolean takeSnapshot(ByteBuffer metadata, File snapshotDir) {
+    return false;
+  }
 
   @Override
   public SnapshotMeta getLatestSnapshot(File snapshotDir) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java
index 5279e235a4..aab3e17bcd 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java
@@ -57,10 +57,12 @@ public interface IStateMachine {
    * @param metadata the metadata IConsensus want IStateMachine to preserve. NOTICE: the more
    *     updated snapshot will have lexicographically larger metadata. This property should be
    *     guaranteed by every IConsensus implementation. IStateMachine can use the metadata to sort
-   *     or label snapshot.
+   *     or label snapshot. e.g., if metadata is byteBuffer("123_456"), the statemachine can create
+   *     a directory ${snapshotDir}/123_456/ and store all files under this directory.
    * @param snapshotDir the root dir of snapshot files
+   * @return true if the snapshot was taken successfully
    */
-  void takeSnapshot(ByteBuffer metadata, File snapshotDir);
+  boolean takeSnapshot(ByteBuffer metadata, File snapshotDir);
 
   /**
    * When recover from crash / leader installSnapshot to follower, this method is called.
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index ce9817b6f8..282141b878 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -19,20 +19,15 @@
 package org.apache.iotdb.consensus.ratis;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroup;
-import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.SnapshotMeta;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
 import org.apache.ratis.util.FileUtils;
 import org.junit.After;
@@ -50,79 +45,11 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 public class RatisConsensusTest {
 
   private static final String RATIS_CLASS_NAME = "org.apache.iotdb.consensus.ratis.RatisConsensus";
 
-  private static class TestDataSet implements DataSet {
-    private int number;
-
-    public void setNumber(int number) {
-      this.number = number;
-    }
-
-    public int getNumber() {
-      return number;
-    }
-  }
-
-  private static class TestRequest {
-    private final int cmd;
-
-    public TestRequest(ByteBuffer buffer) {
-      cmd = buffer.getInt();
-    }
-
-    public boolean isIncr() {
-      return cmd == 1;
-    }
-  }
-
-  private static class IntegerCounter implements IStateMachine {
-    AtomicInteger integer;
-
-    @Override
-    public void start() {
-      integer = new AtomicInteger(0);
-    }
-
-    @Override
-    public void stop() {}
-
-    @Override
-    public TSStatus write(IConsensusRequest IConsensusRequest) {
-      ByteBufferConsensusRequest request = (ByteBufferConsensusRequest) IConsensusRequest;
-      TestRequest testRequest = new TestRequest(request.getContent());
-      if (testRequest.isIncr()) {
-        integer.incrementAndGet();
-      }
-      return new TSStatus(200);
-    }
-
-    @Override
-    public DataSet read(IConsensusRequest IConsensusRequest) {
-      TestDataSet dataSet = new TestDataSet();
-      dataSet.setNumber(integer.get());
-      return dataSet;
-    }
-
-    @Override
-    public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
-
-    @Override
-    public SnapshotMeta getLatestSnapshot(File snapshotDir) {
-      return null;
-    }
-
-    @Override
-    public void loadSnapshot(SnapshotMeta latest) {}
-
-    @Override
-    public void cleanUpOldSnapshots(File snapshotDir) {}
-  }
-
   private ConsensusGroupId gid;
   private List<Peer> peers;
   private List<File> peersStorage;
@@ -133,6 +60,22 @@ public class RatisConsensusTest {
   private Peer peer2;
   CountDownLatch latch;
 
+  private void makeServers() throws IOException {
+    for (int i = 0; i < 3; i++) {
+      servers.add(
+          ConsensusFactory.getConsensusImpl(
+                  RATIS_CLASS_NAME,
+                  peers.get(i).getEndpoint(),
+                  peersStorage.get(i),
+                  groupId -> new TestUtils.IntegerCounter())
+              .orElseThrow(
+                  () ->
+                      new IllegalArgumentException(
+                          String.format(ConsensusFactory.CONSTRUCT_FAILED_MSG, RATIS_CLASS_NAME))));
+      servers.get(i).start();
+    }
+  }
+
   @Before
   public void setUp() throws IOException {
     gid = new DataRegionId(1);
@@ -152,19 +95,7 @@ public class RatisConsensusTest {
     }
     group = new ConsensusGroup(gid, peers);
     servers = new ArrayList<>();
-    for (int i = 0; i < 3; i++) {
-      servers.add(
-          ConsensusFactory.getConsensusImpl(
-                  RATIS_CLASS_NAME,
-                  peers.get(i).getEndpoint(),
-                  peersStorage.get(i),
-                  groupId -> new IntegerCounter())
-              .orElseThrow(
-                  () ->
-                      new IllegalArgumentException(
-                          String.format(ConsensusFactory.CONSTRUCT_FAILED_MSG, RATIS_CLASS_NAME))));
-      servers.get(i).start();
-    }
+    makeServers();
   }
 
   @After
@@ -180,15 +111,15 @@ public class RatisConsensusTest {
   @Test
   public void basicConsensus() throws Exception {
 
-    // 4. Add a new group
+    // 1. Add a new group
     servers.get(0).addConsensusGroup(group.getGroupId(), group.getPeers());
     servers.get(1).addConsensusGroup(group.getGroupId(), group.getPeers());
     servers.get(2).addConsensusGroup(group.getGroupId(), group.getPeers());
 
-    // 5. Do Consensus 10
+    // 2. Do Consensus 10
     doConsensus(servers.get(0), group.getGroupId(), 10, 10);
 
-    // 6. Remove two Peers from Group (peer 0 and peer 2)
+    // 3. Remove two Peers from Group (peer 0 and peer 2)
     // transfer the leader to peer1
     servers.get(0).transferLeader(gid, peer1);
     Assert.assertTrue(servers.get(1).isLeader(gid));
@@ -200,10 +131,10 @@ public class RatisConsensusTest {
     servers.get(2).removeConsensusGroup(gid);
     Assert.assertEquals(servers.get(1).getLeader(gid).getEndpoint(), peers.get(1).getEndpoint());
 
-    // 7. try consensus again with one peer
+    // 4. try consensus again with one peer
     doConsensus(servers.get(1), gid, 10, 20);
 
-    // 8. add two peers back
+    // 5. add two peers back
     // first notify these new peers, let them initialize
     servers.get(0).addConsensusGroup(gid, peers);
     servers.get(2).addConsensusGroup(gid, peers);
@@ -211,17 +142,30 @@ public class RatisConsensusTest {
     servers.get(1).addPeer(gid, peer0);
     servers.get(1).addPeer(gid, peer2);
 
-    // 9. try consensus with all 3 peers
+    // 6. try consensus with all 3 peers
     doConsensus(servers.get(2), gid, 10, 30);
 
-    // 10. again, group contains only peer0
+    // 7. again, group contains only peer0
     servers.get(0).transferLeader(group.getGroupId(), peer0);
     servers.get(0).changePeer(group.getGroupId(), Collections.singletonList(peer0));
     servers.get(1).removeConsensusGroup(group.getGroupId());
     servers.get(2).removeConsensusGroup(group.getGroupId());
 
-    // 11. try consensus with only peer0
+    // 8. try consensus with only peer0
     doConsensus(servers.get(0), gid, 10, 40);
+
+    // 9. shutdown all the servers
+    for (IConsensus consensus : servers) {
+      consensus.stop();
+    }
+    servers.clear();
+
+    // 10. start again and verify the snapshot
+    makeServers();
+    servers.get(0).addConsensusGroup(group.getGroupId(), group.getPeers());
+    servers.get(1).addConsensusGroup(group.getGroupId(), group.getPeers());
+    servers.get(2).addConsensusGroup(group.getGroupId(), group.getPeers());
+    doConsensus(servers.get(0), gid, 10, 50);
   }
 
   private void doConsensus(IConsensus consensus, ConsensusGroupId gid, int count, int target)
@@ -275,7 +219,7 @@ public class RatisConsensusTest {
 
     // Check we reached a consensus
     ConsensusReadResponse response = leader.read(gid, getReq);
-    TestDataSet result = (TestDataSet) response.getDataset();
+    TestUtils.TestDataSet result = (TestUtils.TestDataSet) response.getDataset();
     Assert.assertEquals(target, result.getNumber());
   }
 }
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
new file mode 100644
index 0000000000..3af8c5df5e
--- /dev/null
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.server.storage.RaftStorageDirectory;
+import org.apache.ratis.server.storage.RaftStorageMetadataFile;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.util.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+public class SnapshotTest {
+
+  private static final File testDir = new File("target" + File.separator + "sm");
+
+  // Mock Storage which only provides the state machine dir
+  private static class EmptyStorageWithOnlySMDir implements RaftStorage {
+
+    @Override
+    public RaftStorageDirectory getStorageDir() {
+      return new RaftStorageDirectory() {
+        @Override
+        public File getRoot() {
+          return null;
+        }
+
+        @Override
+        public boolean isHealthy() {
+          return false;
+        }
+
+        @Override
+        public File getStateMachineDir() {
+          return testDir;
+        }
+      };
+    }
+
+    @Override
+    public RaftStorageMetadataFile getMetadataFile() {
+      return null;
+    }
+
+    @Override
+    public RaftServerConfigKeys.Log.CorruptionPolicy getLogCorruptionPolicy() {
+      return null;
+    }
+
+    @Override
+    public void close() throws IOException {}
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    FileUtils.deleteFully(testDir);
+    FileUtils.createDirectories(testDir);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    FileUtils.deleteFully(testDir);
+  }
+
+  @Test
+  public void testSnapshot() throws Exception {
+    ApplicationStateMachineProxy proxy =
+        new ApplicationStateMachineProxy(new TestUtils.IntegerCounter());
+
+    proxy.initialize(null, null, new EmptyStorageWithOnlySMDir());
+
+    // take a snapshot at 421-616
+    proxy.notifyTermIndexUpdated(421, 616);
+    String snapshotFilename = TestUtils.IntegerCounter.ensureSnapshotFileName(testDir, "421_616");
+    long index = proxy.takeSnapshot();
+    Assert.assertEquals(index, 616);
+    Assert.assertTrue(new File(snapshotFilename).exists());
+
+    // take a snapshot at 616-4217
+    proxy.notifyTermIndexUpdated(616, 4217);
+    String snapshotFilenameLatest =
+        TestUtils.IntegerCounter.ensureSnapshotFileName(testDir, "616_4217");
+    long indexLatest = proxy.takeSnapshot();
+    Assert.assertEquals(indexLatest, 4217);
+    Assert.assertTrue(new File(snapshotFilenameLatest).exists());
+
+    // query the latest snapshot
+    SnapshotInfo info = proxy.getLatestSnapshot();
+    Assert.assertEquals(info.getTerm(), 616);
+    Assert.assertEquals(info.getIndex(), 4217);
+    Assert.assertTrue(info.getFiles().get(0).getPath().endsWith(snapshotFilenameLatest));
+
+    // clean up
+    proxy.getStateMachineStorage().cleanupOldSnapshots(null);
+    Assert.assertFalse(new File(snapshotFilename).exists());
+    Assert.assertTrue(new File(snapshotFilenameLatest).exists());
+  }
+}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
new file mode 100644
index 0000000000..fe209ba019
--- /dev/null
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.SnapshotMeta;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+
+import org.apache.ratis.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Scanner;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TestUtils {
+  /** Minimal {@link DataSet} implementation that carries a single integer value. */
+  static class TestDataSet implements DataSet {
+    /** The value carried by this data set. */
+    private int number;
+
+    /** Returns the stored value. */
+    public int getNumber() {
+      return this.number;
+    }
+
+    /** Stores {@code number} as the value of this data set. */
+    public void setNumber(int number) {
+      this.number = number;
+    }
+  }
+
+  /** A command decoded from one int read out of a request buffer. */
+  static class TestRequest {
+    /** Command code that {@link #isIncr()} recognises as an increment. */
+    private static final int INCR_CMD = 1;
+
+    private final int cmd;
+
+    /**
+     * Reads one int from {@code buffer} (relative read, so the buffer position advances) and
+     * keeps it as the command code.
+     */
+    public TestRequest(ByteBuffer buffer) {
+      this.cmd = buffer.getInt();
+    }
+
+    /** Returns {@code true} when the command code is 1. */
+    public boolean isIncr() {
+      return INCR_CMD == cmd;
+    }
+  }
+
+  static class IntegerCounter implements IStateMachine {
+    private AtomicInteger integer;
+    private final Logger logger = LoggerFactory.getLogger(IntegerCounter.class);
+
+    @Override
+    public void start() {
+      integer = new AtomicInteger(0);
+    }
+
+    @Override
+    public void stop() {}
+
+    @Override
+    public TSStatus write(IConsensusRequest IConsensusRequest) {
+      ByteBufferConsensusRequest request = (ByteBufferConsensusRequest) IConsensusRequest;
+      TestRequest testRequest = new TestRequest(request.getContent());
+      if (testRequest.isIncr()) {
+        integer.incrementAndGet();
+      }
+      return new TSStatus(200);
+    }
+
+    @Override
+    public DataSet read(IConsensusRequest IConsensusRequest) {
+      TestDataSet dataSet = new TestDataSet();
+      dataSet.setNumber(integer.get());
+      return dataSet;
+    }
+
+    public static synchronized String ensureSnapshotFileName(File snapshotDir, String metadata) {
+      File dir = new File(snapshotDir + File.separator + metadata);
+      if (!(dir.exists() && dir.isDirectory())) {
+        dir.mkdirs();
+      }
+      return dir.getPath() + File.separator + "snapshot." + metadata;
+    }
+
+    @Override
+    public boolean takeSnapshot(ByteBuffer metadata, File snapshotDir) {
+      /**
+       * When IStateMachine take the snapshot, it can directly use the metadata to name the snapshot
+       * file. It's guaranteed that more up-to-date snapshot will have lexicographically larger
+       * metadata.
+       */
+      String tempFilePath = snapshotDir + File.separator + ".tmp";
+      String filePath = ensureSnapshotFileName(snapshotDir, new String(metadata.array()));
+
+      File tempFile = new File(tempFilePath);
+
+      try {
+        FileWriter writer = new FileWriter(tempFile);
+        writer.write(String.valueOf(integer.get()));
+        writer.close();
+        tempFile.renameTo(new File(filePath));
+      } catch (IOException e) {
+        logger.error("take snapshot failed ", e);
+        return false;
+      }
+      return true;
+    }
+
+    private Object[] getSortedPaths(File rootDir) {
+      /**
+       * When looking for the latest snapshot inside the directory, just list all filenames and sort
+       * them.
+       */
+      ArrayList<Path> paths = new ArrayList<>();
+      try {
+        DirectoryStream<Path> stream = Files.newDirectoryStream(rootDir.toPath());
+        for (Path path : stream) {
+          paths.add(path);
+        }
+      } catch (IOException e) {
+        logger.error("read directory failed ", e);
+      }
+
+      Object[] pathArray = paths.toArray();
+      Arrays.sort(
+          pathArray,
+          new Comparator<Object>() {
+            @Override
+            public int compare(Object o1, Object o2) {
+              Path path1 = (Path) o1;
+              Path path2 = (Path) o2;
+              String index1 = path1.toFile().getName().split("_")[1];
+              String index2 = path2.toFile().getName().split("_")[1];
+              return Long.compare(Long.parseLong(index1), Long.parseLong(index2));
+            }
+          });
+      return pathArray;
+    }
+
+    @Override
+    public SnapshotMeta getLatestSnapshot(File snapshotDir) {
+      Object[] pathArray = getSortedPaths(snapshotDir);
+      if (pathArray.length == 0) {
+        return null;
+      }
+      Path max = (Path) pathArray[pathArray.length - 1];
+      String dirName = max.toFile().getName();
+      File snapshotFile =
+          new File(max.toFile().getAbsolutePath() + File.separator + "snapshot." + dirName);
+
+      String ordinal = snapshotFile.getName().split("\\.")[1];
+      ByteBuffer metadata = ByteBuffer.wrap(ordinal.getBytes());
+      return new SnapshotMeta(metadata, Collections.singletonList(snapshotFile));
+    }
+
+    @Override
+    public void loadSnapshot(SnapshotMeta latest) {
+      try {
+        Scanner scanner = new Scanner(latest.getSnapshotFiles().get(0));
+        int snapshotValue = Integer.parseInt(scanner.next());
+        integer.set(snapshotValue);
+        scanner.close();
+      } catch (IOException e) {
+        logger.error("read file failed ", e);
+      }
+    }
+
+    @Override
+    public void cleanUpOldSnapshots(File snapshotDir) {
+      Object[] paths = getSortedPaths(snapshotDir);
+      for (int i = 0; i < paths.length - 1; i++) {
+        try {
+          FileUtils.deleteFully((Path) paths[i]);
+        } catch (IOException e) {
+          logger.error("delete file failed ", e);
+        }
+      }
+    }
+  }
+}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index 1482f21c04..0275868843 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -112,7 +112,9 @@ public class StandAloneConsensusTest {
     }
 
     @Override
-    public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+    public boolean takeSnapshot(ByteBuffer metadata, File snapshotDir) {
+      // Stub: nothing is written to snapshotDir; the method unconditionally returns false.
+      return false;
+    }
 
     @Override
     public SnapshotMeta getLatestSnapshot(File snapshotDir) {
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 0d99a2d6c2..789cbeafa5 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -62,7 +62,9 @@ public class DataRegionStateMachine extends BaseStateMachine {
   public void stop() {}
 
   @Override
-  public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+  public boolean takeSnapshot(ByteBuffer metadata, File snapshotDir) {
+    // Stub: nothing is written to snapshotDir; the method unconditionally returns false.
+    return false;
+  }
 
   @Override
   public SnapshotMeta getLatestSnapshot(File snapshotDir) {
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index 26f24b03da..c5c753bfa2 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -53,7 +53,9 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
   public void stop() {}
 
   @Override
-  public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+  public boolean takeSnapshot(ByteBuffer metadata, File snapshotDir) {
+    // Stub: nothing is written to snapshotDir; the method unconditionally returns false.
+    return false;
+  }
 
   @Override
   public SnapshotMeta getLatestSnapshot(File snapshotDir) {