You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/05/03 07:09:42 UTC

[iotdb] branch master updated: [IOTDB-3076]Optimize StandAloneConsensus read/write performance && consensus modu… (#5768)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e6274c11e [IOTDB-3076]Optimize StandAloneConsensus read/write performance && consensus modu… (#5768)
5e6274c11e is described below

commit 5e6274c11e3f9fe99df03200ebd4588917456d29
Author: Potato <TX...@gmail.com>
AuthorDate: Tue May 3 15:09:37 2022 +0800

    [IOTDB-3076]Optimize StandAloneConsensus read/write performance && consensus modu… (#5768)
    
    * Optimize StandAloneConsensus read/write performance && consensus module code refactor
    
    * refactor
---
 .../statemachine/PartitionRegionStateMachine.java  |   2 +-
 .../apache/iotdb/consensus/ConsensusFactory.java   |   1 -
 .../{statemachine => }/IStateMachine.java          |   5 +-
 .../exception/IllegalPeerEndpointException.java    |  32 ++++++
 .../ratis/ApplicationStateMachineProxy.java        |   2 +-
 .../iotdb/consensus/ratis/RatisConsensus.java      |   2 +-
 .../iotdb/consensus/ratis/RequestMessage.java      |   4 -
 .../iotdb/consensus/ratis/SnapshotStorage.java     |   4 +-
 .../org/apache/iotdb/consensus/ratis/Utils.java    |  49 +--------
 .../consensus/standalone/StandAloneConsensus.java  | 116 ++++++++++-----------
 .../consensus/standalone/StandAloneServerImpl.java |  14 ++-
 .../apache/iotdb/consensus}/EmptyStateMachine.java |   2 +-
 .../apache/iotdb/consensus/ratis/TestUtils.java    |   2 +-
 .../iotdb/consensus/standalone/RecoveryTest.java   |   4 +-
 .../standalone/StandAloneConsensusTest.java        |  18 +++-
 .../iotdb/commons/consensus/ConsensusGroupId.java  |  82 ++++++++-------
 .../iotdb/commons/consensus/DataRegionId.java      |  39 +------
 .../iotdb/commons/consensus/PartitionRegionId.java |  39 +------
 .../iotdb/commons/consensus/SchemaRegionId.java    |  39 +------
 .../apache/iotdb/commons/ConsensusGroupIdTest.java |   4 +-
 .../consensus/statemachine/BaseStateMachine.java   |   2 +-
 .../service/thrift/impl/InternalServiceImpl.java   |   2 +-
 .../iotdb/db/service/InternalServiceImplTest.java  |   4 +-
 23 files changed, 180 insertions(+), 288 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index fa14e859f0..e429f61f20 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -22,11 +22,11 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
 import org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeException;
 import org.apache.iotdb.confignode.service.executor.ConfigRequestExecutor;
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.SnapshotMeta;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java b/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
index d9364ad13e..efc40e31e6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.consensus;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
similarity index 97%
rename from consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index aab3e17bcd..c090fa2a35 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.consensus.statemachine;
+package org.apache.iotdb.consensus;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -25,10 +25,13 @@ import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.SnapshotMeta;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 
+import javax.annotation.concurrent.ThreadSafe;
+
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.function.Function;
 
+@ThreadSafe
 public interface IStateMachine {
 
   interface Registry extends Function<ConsensusGroupId, IStateMachine> {}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/exception/IllegalPeerEndpointException.java b/consensus/src/main/java/org/apache/iotdb/consensus/exception/IllegalPeerEndpointException.java
new file mode 100644
index 0000000000..7591345acc
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/exception/IllegalPeerEndpointException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.exception;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+
+public class IllegalPeerEndpointException extends ConsensusException {
+
+  public IllegalPeerEndpointException(TEndPoint currentNode, TEndPoint newNode) {
+    super(
+        String.format(
+            "Illegal creation for node %s in node %s in StandAloneConsensus Mode",
+            newNode, currentNode));
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index 53113293a0..317029a740 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -19,11 +19,11 @@
 package org.apache.iotdb.consensus.ratis;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.SnapshotMeta;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.Message;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index d022f096cc..f8d65fa0b0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.IClientPoolFactory;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
@@ -39,7 +40,6 @@ import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
 import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
 import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
 import org.apache.commons.pool2.KeyedObjectPool;
 import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
index 9dc73d815c..242573e5be 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
@@ -23,15 +23,11 @@ import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 
 public class RequestMessage implements Message {
 
-  private final Logger logger = LoggerFactory.getLogger(RequestMessage.class);
-
   private final IConsensusRequest actualRequest;
   private volatile ByteString serializedContent;
   private static final int DEFAULT_BUFFER_SIZE = 2048 * 10;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
index 2fbc7916ca..c50a4b3446 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
@@ -18,8 +18,8 @@
  */
 package org.apache.iotdb.consensus.ratis;
 
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.SnapshotMeta;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
 import org.apache.ratis.io.MD5Hash;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -46,7 +46,7 @@ import java.util.List;
  * hopefully will be introduced in Ratis 2.3.0.
  */
 public class SnapshotStorage implements StateMachineStorage {
-  private IStateMachine applicationStateMachine;
+  private final IStateMachine applicationStateMachine;
 
   private File stateMachineDir;
   private final Logger logger = LoggerFactory.getLogger(SnapshotStorage.class);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
index 5ec86b994b..c0683572a9 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
@@ -21,9 +21,6 @@ package org.apache.iotdb.consensus.ratis;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
-import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.consensus.PartitionRegionId;
-import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.consensus.common.Peer;
 
 import org.apache.ratis.protocol.RaftGroupId;
@@ -41,9 +38,6 @@ import java.nio.charset.Charset;
 public class Utils {
   private static final int tempBufferSize = 1024;
   private static final byte PADDING_MAGIC = 0x47;
-  private static final long DataRegionType = 0x01;
-  private static final long SchemaRegionType = 0x02;
-  private static final long PartitionRegionType = 0x03;
 
   public static String IPAddress(TEndPoint endpoint) {
     return String.format("%s:%d", endpoint.getIp(), endpoint.getPort());
@@ -52,30 +46,9 @@ public class Utils {
   /** Encode the ConsensusGroupId into 6 bytes 2 Bytes for Group Type 4 Bytes for Group ID */
   public static long groupEncode(ConsensusGroupId consensusGroupId) {
     // use abbreviations to prevent overflow
-    long groupType = 0L;
-    switch (consensusGroupId.getType()) {
-      case DataRegion:
-        {
-          groupType = DataRegionType;
-          break;
-        }
-      case SchemaRegion:
-        {
-          groupType = SchemaRegionType;
-          break;
-        }
-      case PartitionRegion:
-        {
-          groupType = PartitionRegionType;
-          break;
-        }
-      default:
-        {
-          return -1;
-        }
-    }
+    long groupType = consensusGroupId.getType().getValue();
     long groupCode = groupType << 32;
-    groupCode += (long) consensusGroupId.getId();
+    groupCode += consensusGroupId.getId();
     return groupCode;
   }
 
@@ -127,20 +100,7 @@ public class Utils {
     ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.BYTES);
     byteBuffer.put(padded, 12, 4);
     byteBuffer.flip();
-    int gid = byteBuffer.getInt();
-    ConsensusGroupId id;
-
-    if (type == DataRegionType) {
-      id = new DataRegionId(gid);
-    } else if (type == PartitionRegionType) {
-      id = new PartitionRegionId(gid);
-    } else if (type == SchemaRegionType) {
-      id = new SchemaRegionId(gid);
-    } else {
-      throw new IllegalArgumentException(
-          String.format("Unexpected consensusGroupId Type %d", type));
-    }
-    return id;
+    return ConsensusGroupId.Factory.create((int) type, byteBuffer.getInt());
   }
 
   public static ByteBuffer serializeTSStatus(TSStatus status) throws TException {
@@ -162,8 +122,7 @@ public class Utils {
 
   public static ByteBuffer getMetadataFromTermIndex(TermIndex termIndex) {
     String ordinal = String.format("%d_%d", termIndex.getTerm(), termIndex.getIndex());
-    ByteBuffer metadata = ByteBuffer.wrap(ordinal.getBytes());
-    return metadata;
+    return ByteBuffer.wrap(ordinal.getBytes());
   }
 
   public static TermIndex getTermIndexFromMetadata(ByteBuffer metadata) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
index f0d15221f2..4c2100d1e1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
@@ -21,10 +21,10 @@ package org.apache.iotdb.consensus.standalone;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.IConsensus;
-import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.IStateMachine.Registry;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
@@ -32,9 +32,11 @@ import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
 import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
-import org.apache.iotdb.consensus.statemachine.IStateMachine.Registry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -43,20 +45,19 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A simple consensus implementation, which can be used when replicaNum is 1.
  *
  * <p>Notice: The stateMachine needs to implement WAL itself to ensure recovery after a restart
- *
- * <p>any module can use `IConsensus consensusImpl = new StandAloneConsensus(id -> new
- * EmptyStateMachine());` to perform an initialization implementation.
  */
 class StandAloneConsensus implements IConsensus {
 
+  private final Logger logger = LoggerFactory.getLogger(StandAloneConsensus.class);
+
   private final TEndPoint thisNode;
   private final File storageDir;
   private final IStateMachine.Registry registry;
@@ -71,16 +72,21 @@ class StandAloneConsensus implements IConsensus {
 
   @Override
   public void start() throws IOException {
-    if (!this.storageDir.exists()) {
-      storageDir.mkdirs();
+    initAndRecover();
+  }
+
+  private void initAndRecover() throws IOException {
+    if (!storageDir.exists()) {
+      if (!storageDir.mkdirs()) {
+        logger.warn("Unable to create consensus dir at {}", storageDir);
+      }
     } else {
       try (DirectoryStream<Path> stream = Files.newDirectoryStream(storageDir.toPath())) {
         for (Path path : stream) {
-          String filename = path.getFileName().toString();
-          String[] items = filename.split("_");
-          TConsensusGroupType type = TConsensusGroupType.valueOf(items[0]);
-          ConsensusGroupId consensusGroupId = ConsensusGroupId.Factory.createEmpty(type);
-          consensusGroupId.setId(Integer.parseInt(items[1]));
+          String[] items = path.getFileName().toString().split("_");
+          ConsensusGroupId consensusGroupId =
+              ConsensusGroupId.Factory.create(
+                  TConsensusGroupType.valueOf(items[0]).getValue(), Integer.parseInt(items[1]));
           TEndPoint endPoint = new TEndPoint(items[2], Integer.parseInt(items[3]));
           stateMachineMap.put(
               consensusGroupId,
@@ -96,40 +102,24 @@ class StandAloneConsensus implements IConsensus {
 
   @Override
   public ConsensusWriteResponse write(ConsensusGroupId groupId, IConsensusRequest request) {
-    AtomicReference<TSStatus> result = new AtomicReference<>();
-    stateMachineMap.computeIfPresent(
-        groupId,
-        (k, v) -> {
-          // TODO make Statemachine thread-safe to avoid thread-safe ways like this that may affect
-          // performance
-          result.set(v.write(request));
-          return v;
-        });
-    if (result.get() == null) {
+    StandAloneServerImpl impl = stateMachineMap.get(groupId);
+    if (impl == null) {
       return ConsensusWriteResponse.newBuilder()
           .setException(new ConsensusGroupNotExistException(groupId))
           .build();
     }
-    return ConsensusWriteResponse.newBuilder().setStatus(result.get()).build();
+    return ConsensusWriteResponse.newBuilder().setStatus(impl.write(request)).build();
   }
 
   @Override
   public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest request) {
-    AtomicReference<DataSet> result = new AtomicReference<>();
-    stateMachineMap.computeIfPresent(
-        groupId,
-        (k, v) -> {
-          // TODO make Statemachine thread-safe to avoid thread-safe ways like this that may affect
-          // performance
-          result.set(v.read(request));
-          return v;
-        });
-    if (result.get() == null) {
+    StandAloneServerImpl impl = stateMachineMap.get(groupId);
+    if (impl == null) {
       return ConsensusReadResponse.newBuilder()
           .setException(new ConsensusGroupNotExistException(groupId))
           .build();
     }
-    return ConsensusReadResponse.newBuilder().setDataSet(result.get()).build();
+    return ConsensusReadResponse.newBuilder().setDataSet(impl.read(request)).build();
   }
 
   @Override
@@ -140,6 +130,11 @@ class StandAloneConsensus implements IConsensus {
           .setException(new IllegalPeerNumException(consensusGroupSize))
           .build();
     }
+    if (!Objects.equals(thisNode, peers.get(0).getEndpoint())) {
+      return ConsensusGenericResponse.newBuilder()
+          .setException(new IllegalPeerEndpointException(thisNode, peers.get(0).getEndpoint()))
+          .build();
+    }
     AtomicBoolean exist = new AtomicBoolean(true);
     stateMachineMap.computeIfAbsent(
         groupId,
@@ -148,19 +143,11 @@ class StandAloneConsensus implements IConsensus {
           StandAloneServerImpl impl =
               new StandAloneServerImpl(peers.get(0), registry.apply(groupId));
           impl.start();
-          String groupPath =
-              storageDir
-                  + File.separator
-                  + groupId.getType()
-                  + "_"
-                  + groupId.getId()
-                  + "_"
-                  + peers.get(0).getEndpoint().ip
-                  + "_"
-                  + peers.get(0).getEndpoint().port;
-          File file = new File(groupPath);
-          file.mkdirs();
-
+          String path = buildPeerDir(groupId);
+          File file = new File(path);
+          if (!file.mkdirs()) {
+            logger.warn("Unable to create consensus dir for group {} at {}", groupId, path);
+          }
           return impl;
         });
     if (exist.get()) {
@@ -177,20 +164,13 @@ class StandAloneConsensus implements IConsensus {
     stateMachineMap.computeIfPresent(
         groupId,
         (k, v) -> {
-          String groupPath =
-              storageDir
-                  + File.separator
-                  + groupId.getType()
-                  + "_"
-                  + groupId.getId()
-                  + "_"
-                  + thisNode.ip
-                  + "_"
-                  + thisNode.port;
-          File file = new File(groupPath);
-          file.delete();
           exist.set(true);
           v.stop();
+          String path = buildPeerDir(groupId);
+          File file = new File(path);
+          if (!file.delete()) {
+            logger.warn("Unable to delete consensus dir for group {} at {}", groupId, path);
+          }
           return null;
         });
 
@@ -239,4 +219,16 @@ class StandAloneConsensus implements IConsensus {
     }
     return new Peer(groupId, thisNode);
   }
+
+  private String buildPeerDir(ConsensusGroupId groupId) {
+    return storageDir
+        + File.separator
+        + groupId.getType()
+        + "_"
+        + groupId.getId()
+        + "_"
+        + thisNode.getIp()
+        + "_"
+        + thisNode.getPort();
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
index be0dbaa6c7..4c55e9676b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
@@ -20,11 +20,11 @@
 package org.apache.iotdb.consensus.standalone;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.SnapshotMeta;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
 import java.io.File;
 import java.nio.ByteBuffer;
@@ -69,17 +69,21 @@ public class StandAloneServerImpl implements IStateMachine {
 
   @Override
   public boolean takeSnapshot(ByteBuffer metadata, File snapshotDir) {
-    return false;
+    return stateMachine.takeSnapshot(metadata, snapshotDir);
   }
 
   @Override
   public SnapshotMeta getLatestSnapshot(File snapshotDir) {
-    return null;
+    return stateMachine.getLatestSnapshot(snapshotDir);
   }
 
   @Override
-  public void loadSnapshot(SnapshotMeta latest) {}
+  public void loadSnapshot(SnapshotMeta latest) {
+    stateMachine.loadSnapshot(latest);
+  }
 
   @Override
-  public void cleanUpOldSnapshots(File snapshotDir) {}
+  public void cleanUpOldSnapshots(File snapshotDir) {
+    stateMachine.cleanUpOldSnapshots(snapshotDir);
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
similarity index 97%
rename from consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
rename to consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
index 8693d1f6ce..118b057d1a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.consensus.statemachine;
+package org.apache.iotdb.consensus;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.common.DataSet;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
index fe209ba019..962dd84d95 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -19,11 +19,11 @@
 package org.apache.iotdb.consensus.ratis;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.SnapshotMeta;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
 import org.apache.ratis.util.FileUtils;
 import org.slf4j.Logger;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
index 0c9f0e74f6..a166893b6b 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
@@ -22,11 +22,11 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.consensus.EmptyStateMachine;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
-import org.apache.iotdb.consensus.statemachine.EmptyStateMachine;
 
 import org.apache.ratis.util.FileUtils;
 import org.junit.After;
@@ -48,7 +48,7 @@ public class RecoveryTest {
     consensusImpl =
         ConsensusFactory.getConsensusImpl(
                 STANDALONE_CONSENSUS_CLASS_NAME,
-                new TEndPoint("localhost", 9000),
+                new TEndPoint("0.0.0.0", 9000),
                 new File("./target/recovery"),
                 gid -> new EmptyStateMachine())
             .orElseThrow(
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index 986e347356..1222598444 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -26,7 +26,9 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.PartitionRegionId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.consensus.EmptyStateMachine;
 import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.SnapshotMeta;
@@ -36,9 +38,8 @@ import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
 import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
-import org.apache.iotdb.consensus.statemachine.EmptyStateMachine;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
 import org.apache.ratis.util.FileUtils;
 import org.junit.After;
@@ -134,7 +135,7 @@ public class StandAloneConsensusTest {
     consensusImpl =
         ConsensusFactory.getConsensusImpl(
                 STANDALONE_CONSENSUS_CLASS_NAME,
-                new TEndPoint("localhost", 6667),
+                new TEndPoint("0.0.0.0", 6667),
                 new File("./target/standalone"),
                 gid -> {
                   switch (gid.getType()) {
@@ -186,11 +187,18 @@ public class StandAloneConsensusTest {
     assertTrue(response3.getException() instanceof IllegalPeerNumException);
 
     ConsensusGenericResponse response4 =
+        consensusImpl.addConsensusGroup(
+            dataRegionId,
+            Collections.singletonList(new Peer(dataRegionId, new TEndPoint("0.0.0.1", 6667))));
+    assertFalse(response4.isSuccess());
+    assertTrue(response4.getException() instanceof IllegalPeerEndpointException);
+
+    ConsensusGenericResponse response5 =
         consensusImpl.addConsensusGroup(
             schemaRegionId,
             Collections.singletonList(new Peer(schemaRegionId, new TEndPoint("0.0.0.0", 6667))));
-    assertTrue(response4.isSuccess());
-    assertNull(response4.getException());
+    assertTrue(response5.isSuccess());
+    assertNull(response5.getException());
   }
 
   @Test
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
index aee4bcee55..f5c64f2ad0 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
@@ -22,53 +22,63 @@ package org.apache.iotdb.commons.consensus;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 
-public interface ConsensusGroupId {
+import java.util.Objects;
 
-  // return specific id
-  int getId();
+// This abstract class hides the term `ConsensusGroup` from IoTDB's StorageEngine/SchemaEngine
+public abstract class ConsensusGroupId {
 
-  void setId(int id);
+  protected int id;
+
+  // return specific id
+  public int getId() {
+    return id;
+  }
 
   // return specific type
-  TConsensusGroupType getType();
+  public abstract TConsensusGroupType getType();
 
-  class Factory {
-    public static ConsensusGroupId createEmpty(TConsensusGroupType type) {
+  @Override
+  public int hashCode() {
+    return Objects.hash(getType(), getId());
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ConsensusGroupId that = (ConsensusGroupId) o;
+    return getId() == that.getId() && getType() == that.getType();
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s[%d]", getType(), getId());
+  }
+
+  public static class Factory {
+
+    public static ConsensusGroupId create(int type, int id) {
       ConsensusGroupId groupId;
-      switch (type) {
-        case DataRegion:
-          groupId = new DataRegionId();
-          break;
-        case SchemaRegion:
-          groupId = new SchemaRegionId();
-          break;
-        case PartitionRegion:
-          groupId = new PartitionRegionId();
-          break;
-        default:
-          throw new IllegalArgumentException("unrecognized id type " + type);
+      if (type == TConsensusGroupType.DataRegion.getValue()) {
+        groupId = new DataRegionId(id);
+      } else if (type == TConsensusGroupType.SchemaRegion.getValue()) {
+        groupId = new SchemaRegionId(id);
+      } else if (type == TConsensusGroupType.PartitionRegion.getValue()) {
+        groupId = new PartitionRegionId(id);
+      } else {
+        throw new IllegalArgumentException(
+            "Unrecognized TConsensusGroupType: " + type + " with id = " + id);
       }
       return groupId;
     }
 
-    public static ConsensusGroupId convertFromTConsensusGroupId(
+    public static ConsensusGroupId createFromTConsensusGroupId(
         TConsensusGroupId tConsensusGroupId) {
-      ConsensusGroupId groupId = createEmpty(tConsensusGroupId.getType());
-      groupId.setId(tConsensusGroupId.getId());
-      return groupId;
-    }
-
-    public static TConsensusGroupId convertToTConsensusGroupId(ConsensusGroupId consensusGroupId) {
-      TConsensusGroupId result = new TConsensusGroupId();
-      if (consensusGroupId instanceof SchemaRegionId) {
-        result.setType(TConsensusGroupType.SchemaRegion);
-      } else if (consensusGroupId instanceof DataRegionId) {
-        result.setType(TConsensusGroupType.DataRegion);
-      } else if (consensusGroupId instanceof PartitionRegionId) {
-        result.setType(TConsensusGroupType.PartitionRegion);
-      }
-      result.setId(consensusGroupId.getId());
-      return result;
+      return create(tConsensusGroupId.getType().getValue(), tConsensusGroupId.getId());
     }
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
index 81fed3fe62..03ec5e4d7d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
@@ -21,51 +21,14 @@ package org.apache.iotdb.commons.consensus;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 
-import java.util.Objects;
-
-public class DataRegionId implements ConsensusGroupId {
-
-  private int id;
-
-  public DataRegionId() {}
+public class DataRegionId extends ConsensusGroupId {
 
   public DataRegionId(int id) {
     this.id = id;
   }
 
-  @Override
-  public int getId() {
-    return id;
-  }
-
-  @Override
-  public void setId(int id) {
-    this.id = id;
-  }
-
   @Override
   public TConsensusGroupType getType() {
     return TConsensusGroupType.DataRegion;
   }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    DataRegionId that = (DataRegionId) o;
-    return id == that.id;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(id, TConsensusGroupType.DataRegion);
-  }
-
-  public String toString() {
-    return String.format("%s[%d]", getType(), getId());
-  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
index 6f317547ed..82cb7a2fed 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
@@ -21,51 +21,14 @@ package org.apache.iotdb.commons.consensus;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 
-import java.util.Objects;
-
-public class PartitionRegionId implements ConsensusGroupId {
-
-  private int id;
-
-  public PartitionRegionId() {}
+public class PartitionRegionId extends ConsensusGroupId {
 
   public PartitionRegionId(int id) {
     this.id = id;
   }
 
-  @Override
-  public int getId() {
-    return id;
-  }
-
-  @Override
-  public void setId(int id) {
-    this.id = id;
-  }
-
   @Override
   public TConsensusGroupType getType() {
     return TConsensusGroupType.PartitionRegion;
   }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    PartitionRegionId that = (PartitionRegionId) o;
-    return id == that.id;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(id, TConsensusGroupType.PartitionRegion);
-  }
-
-  public String toString() {
-    return String.format("%s[%d]", getType(), getId());
-  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
index 61c4ca80c5..48b9cbf820 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
@@ -21,51 +21,14 @@ package org.apache.iotdb.commons.consensus;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 
-import java.util.Objects;
-
-public class SchemaRegionId implements ConsensusGroupId {
-
-  private int id;
-
-  public SchemaRegionId() {}
+public class SchemaRegionId extends ConsensusGroupId {
 
   public SchemaRegionId(int id) {
     this.id = id;
   }
 
-  @Override
-  public int getId() {
-    return id;
-  }
-
-  @Override
-  public void setId(int id) {
-    this.id = id;
-  }
-
   @Override
   public TConsensusGroupType getType() {
     return TConsensusGroupType.SchemaRegion;
   }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    SchemaRegionId that = (SchemaRegionId) o;
-    return id == that.id;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(id, TConsensusGroupType.SchemaRegion);
-  }
-
-  public String toString() {
-    return String.format("%s[%d]", getType(), getId());
-  }
 }
diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/ConsensusGroupIdTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/ConsensusGroupIdTest.java
index af19aea35e..f00b6dbb9c 100644
--- a/node-commons/src/test/java/org/apache/iotdb/commons/ConsensusGroupIdTest.java
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/ConsensusGroupIdTest.java
@@ -34,14 +34,14 @@ public class ConsensusGroupIdTest {
   @Test
   public void TestCreate() throws IOException {
     ConsensusGroupId dataRegionId =
-        ConsensusGroupId.Factory.convertFromTConsensusGroupId(
+        ConsensusGroupId.Factory.createFromTConsensusGroupId(
             new TConsensusGroupId(TConsensusGroupType.DataRegion, 1));
     Assert.assertTrue(dataRegionId instanceof DataRegionId);
     Assert.assertEquals(1, dataRegionId.getId());
     Assert.assertEquals(TConsensusGroupType.DataRegion, dataRegionId.getType());
 
     ConsensusGroupId schemaRegionId =
-        ConsensusGroupId.Factory.convertFromTConsensusGroupId(
+        ConsensusGroupId.Factory.createFromTConsensusGroupId(
             new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 2));
     Assert.assertTrue(schemaRegionId instanceof SchemaRegionId);
     Assert.assertEquals(2, schemaRegionId.getId());
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index 1fdaf2c7d0..2298d69272 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -20,10 +20,10 @@
 package org.apache.iotdb.db.consensus.statemachine;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.rpc.TSStatusCode;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index b4be544d91..7f0b3a9489 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -89,7 +89,7 @@ public class InternalServiceImpl implements InternalService.Iface {
   public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req) {
     QueryType type = QueryType.valueOf(req.queryType);
     ConsensusGroupId groupId =
-        ConsensusGroupId.Factory.convertFromTConsensusGroupId(req.getConsensusGroupId());
+        ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
     switch (type) {
       case READ:
         ConsensusReadResponse readResp =
diff --git a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index 990c20f933..00bf02cb0b 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@ -85,7 +85,7 @@ public class InternalServiceImplTest {
     TRegionReplicaSet regionReplicaSet = genRegionReplicaSet();
     ConsensusImpl.getInstance()
         .addConsensusGroup(
-            ConsensusGroupId.Factory.convertFromTConsensusGroupId(regionReplicaSet.getRegionId()),
+            ConsensusGroupId.Factory.createFromTConsensusGroupId(regionReplicaSet.getRegionId()),
             genPeerList(regionReplicaSet));
     internalServiceImpl = new InternalServiceImpl();
   }
@@ -95,7 +95,7 @@ public class InternalServiceImplTest {
     TRegionReplicaSet regionReplicaSet = genRegionReplicaSet();
     ConsensusImpl.getInstance()
         .removeConsensusGroup(
-            ConsensusGroupId.Factory.convertFromTConsensusGroupId(regionReplicaSet.getRegionId()));
+            ConsensusGroupId.Factory.createFromTConsensusGroupId(regionReplicaSet.getRegionId()));
     FileUtils.deleteFully(new File(conf.getConsensusDir()));
   }