You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/05/02 09:05:43 UTC

[iotdb] branch issue_3076 updated (57d74643ea -> 870949223b)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch issue_3076
in repository https://gitbox.apache.org/repos/asf/iotdb.git


 discard 57d74643ea Optimize StandAloneConsensus read/write performance && consensus module code refactor
     new 870949223b Optimize StandAloneConsensus read/write performance && consensus module code refactor

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force) and generates a
repository containing something like this:

 * -- * -- B -- O -- O -- O   (57d74643ea)
            \
             N -- N -- N   refs/heads/issue_3076 (870949223b)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/iotdb/consensus/exception/IllegalPeerEndpointException.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


[iotdb] 01/01: Optimize StandAloneConsensus read/write performance && consensus module code refactor

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch issue_3076
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 870949223ba80c5f92d10798866f4edb84176c08
Author: LebronAl <TX...@gmail.com>
AuthorDate: Mon May 2 16:57:13 2022 +0800

    Optimize StandAloneConsensus read/write performance && consensus module code refactor
---
 .../statemachine/PartitionRegionStateMachine.java  |   2 +-
 .../apache/iotdb/consensus/ConsensusFactory.java   |   1 -
 .../{statemachine => }/IStateMachine.java          |   5 +-
 .../exception/IllegalPeerEndpointException.java    |  32 ++++++
 .../ratis/ApplicationStateMachineProxy.java        |   2 +-
 .../iotdb/consensus/ratis/RatisConsensus.java      |   2 +-
 .../iotdb/consensus/ratis/RequestMessage.java      |   4 -
 .../iotdb/consensus/ratis/SnapshotStorage.java     |   4 +-
 .../org/apache/iotdb/consensus/ratis/Utils.java    |   3 +-
 .../consensus/standalone/StandAloneConsensus.java  | 116 ++++++++++-----------
 .../consensus/standalone/StandAloneServerImpl.java |  14 ++-
 .../apache/iotdb/consensus}/EmptyStateMachine.java |   2 +-
 .../apache/iotdb/consensus/ratis/TestUtils.java    |   2 +-
 .../iotdb/consensus/standalone/RecoveryTest.java   |   4 +-
 .../standalone/StandAloneConsensusTest.java        |  18 +++-
 .../iotdb/commons/consensus/ConsensusGroupId.java  |  69 +++++++-----
 .../iotdb/commons/consensus/DataRegionId.java      |  39 +------
 .../iotdb/commons/consensus/PartitionRegionId.java |  39 +------
 .../iotdb/commons/consensus/SchemaRegionId.java    |  39 +------
 .../apache/iotdb/commons/ConsensusGroupIdTest.java |   4 +-
 .../consensus/statemachine/BaseStateMachine.java   |   2 +-
 .../service/thrift/impl/InternalServiceImpl.java   |   2 +-
 .../iotdb/db/service/InternalServiceImplTest.java  |   4 +-
 23 files changed, 172 insertions(+), 237 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index fa14e859f0..e429f61f20 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -22,11 +22,11 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
 import org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeException;
 import org.apache.iotdb.confignode.service.executor.ConfigRequestExecutor;
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.SnapshotMeta;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java b/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
index d9364ad13e..efc40e31e6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.consensus;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
similarity index 97%
rename from consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index aab3e17bcd..c090fa2a35 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.consensus.statemachine;
+package org.apache.iotdb.consensus;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -25,10 +25,13 @@ import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.SnapshotMeta;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 
+import javax.annotation.concurrent.ThreadSafe;
+
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.function.Function;
 
+@ThreadSafe
 public interface IStateMachine {
 
   interface Registry extends Function<ConsensusGroupId, IStateMachine> {}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/exception/IllegalPeerEndpointException.java b/consensus/src/main/java/org/apache/iotdb/consensus/exception/IllegalPeerEndpointException.java
new file mode 100644
index 0000000000..7591345acc
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/exception/IllegalPeerEndpointException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.exception;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+
+public class IllegalPeerEndpointException extends ConsensusException {
+
+  public IllegalPeerEndpointException(TEndPoint currentNode, TEndPoint newNode) {
+    super(
+        String.format(
+            "Illegal creation for node %s in node %s in StandAloneConsensus Mode",
+            newNode, currentNode));
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index 53113293a0..317029a740 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -19,11 +19,11 @@
 package org.apache.iotdb.consensus.ratis;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.SnapshotMeta;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.Message;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index d022f096cc..f8d65fa0b0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.IClientPoolFactory;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
@@ -39,7 +40,6 @@ import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
 import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
 import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
 import org.apache.commons.pool2.KeyedObjectPool;
 import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
index 9dc73d815c..242573e5be 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
@@ -23,15 +23,11 @@ import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 
 public class RequestMessage implements Message {
 
-  private final Logger logger = LoggerFactory.getLogger(RequestMessage.class);
-
   private final IConsensusRequest actualRequest;
   private volatile ByteString serializedContent;
   private static final int DEFAULT_BUFFER_SIZE = 2048 * 10;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
index 2fbc7916ca..c50a4b3446 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
@@ -18,8 +18,8 @@
  */
 package org.apache.iotdb.consensus.ratis;
 
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.SnapshotMeta;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
 import org.apache.ratis.io.MD5Hash;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -46,7 +46,7 @@ import java.util.List;
  * hopefully will be introduced in Ratis 2.3.0.
  */
 public class SnapshotStorage implements StateMachineStorage {
-  private IStateMachine applicationStateMachine;
+  private final IStateMachine applicationStateMachine;
 
   private File stateMachineDir;
   private final Logger logger = LoggerFactory.getLogger(SnapshotStorage.class);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
index f1842a6dea..96674e5592 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
@@ -166,8 +166,7 @@ public class Utils {
 
   public static ByteBuffer getMetadataFromTermIndex(TermIndex termIndex) {
     String ordinal = String.format("%d_%d", termIndex.getTerm(), termIndex.getIndex());
-    ByteBuffer metadata = ByteBuffer.wrap(ordinal.getBytes());
-    return metadata;
+    return ByteBuffer.wrap(ordinal.getBytes());
   }
 
   public static TermIndex getTermIndexFromMetadata(ByteBuffer metadata) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
index f0d15221f2..8d79c1b3fb 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
@@ -21,10 +21,10 @@ package org.apache.iotdb.consensus.standalone;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.IConsensus;
-import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.IStateMachine.Registry;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
@@ -32,9 +32,11 @@ import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
 import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
-import org.apache.iotdb.consensus.statemachine.IStateMachine.Registry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -43,20 +45,19 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A simple consensus implementation, which can be used when replicaNum is 1.
  *
  * <p>Notice: The stateMachine needs to implement WAL itself to ensure recovery after a restart
- *
- * <p>any module can use `IConsensus consensusImpl = new StandAloneConsensus(id -> new
- * EmptyStateMachine());` to perform an initialization implementation.
  */
 class StandAloneConsensus implements IConsensus {
 
+  private final Logger logger = LoggerFactory.getLogger(StandAloneConsensus.class);
+
   private final TEndPoint thisNode;
   private final File storageDir;
   private final IStateMachine.Registry registry;
@@ -71,16 +72,21 @@ class StandAloneConsensus implements IConsensus {
 
   @Override
   public void start() throws IOException {
-    if (!this.storageDir.exists()) {
-      storageDir.mkdirs();
+    initAndRecover();
+  }
+
+  private void initAndRecover() throws IOException {
+    if (!storageDir.exists()) {
+      if (!storageDir.mkdirs()) {
+        logger.warn("Unable to create consensus dir at {}", storageDir);
+      }
     } else {
       try (DirectoryStream<Path> stream = Files.newDirectoryStream(storageDir.toPath())) {
         for (Path path : stream) {
-          String filename = path.getFileName().toString();
-          String[] items = filename.split("_");
-          TConsensusGroupType type = TConsensusGroupType.valueOf(items[0]);
-          ConsensusGroupId consensusGroupId = ConsensusGroupId.Factory.createEmpty(type);
-          consensusGroupId.setId(Integer.parseInt(items[1]));
+          String[] items = path.getFileName().toString().split("_");
+          ConsensusGroupId consensusGroupId =
+              ConsensusGroupId.Factory.create(
+                  TConsensusGroupType.valueOf(items[0]), Integer.parseInt(items[1]));
           TEndPoint endPoint = new TEndPoint(items[2], Integer.parseInt(items[3]));
           stateMachineMap.put(
               consensusGroupId,
@@ -96,40 +102,24 @@ class StandAloneConsensus implements IConsensus {
 
   @Override
   public ConsensusWriteResponse write(ConsensusGroupId groupId, IConsensusRequest request) {
-    AtomicReference<TSStatus> result = new AtomicReference<>();
-    stateMachineMap.computeIfPresent(
-        groupId,
-        (k, v) -> {
-          // TODO make Statemachine thread-safe to avoid thread-safe ways like this that may affect
-          // performance
-          result.set(v.write(request));
-          return v;
-        });
-    if (result.get() == null) {
+    StandAloneServerImpl impl = stateMachineMap.get(groupId);
+    if (impl == null) {
       return ConsensusWriteResponse.newBuilder()
           .setException(new ConsensusGroupNotExistException(groupId))
           .build();
     }
-    return ConsensusWriteResponse.newBuilder().setStatus(result.get()).build();
+    return ConsensusWriteResponse.newBuilder().setStatus(impl.write(request)).build();
   }
 
   @Override
   public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest request) {
-    AtomicReference<DataSet> result = new AtomicReference<>();
-    stateMachineMap.computeIfPresent(
-        groupId,
-        (k, v) -> {
-          // TODO make Statemachine thread-safe to avoid thread-safe ways like this that may affect
-          // performance
-          result.set(v.read(request));
-          return v;
-        });
-    if (result.get() == null) {
+    StandAloneServerImpl impl = stateMachineMap.get(groupId);
+    if (impl == null) {
       return ConsensusReadResponse.newBuilder()
           .setException(new ConsensusGroupNotExistException(groupId))
           .build();
     }
-    return ConsensusReadResponse.newBuilder().setDataSet(result.get()).build();
+    return ConsensusReadResponse.newBuilder().setDataSet(impl.read(request)).build();
   }
 
   @Override
@@ -140,6 +130,11 @@ class StandAloneConsensus implements IConsensus {
           .setException(new IllegalPeerNumException(consensusGroupSize))
           .build();
     }
+    if (!Objects.equals(thisNode, peers.get(0).getEndpoint())) {
+      return ConsensusGenericResponse.newBuilder()
+          .setException(new IllegalPeerEndpointException(thisNode, peers.get(0).getEndpoint()))
+          .build();
+    }
     AtomicBoolean exist = new AtomicBoolean(true);
     stateMachineMap.computeIfAbsent(
         groupId,
@@ -148,19 +143,11 @@ class StandAloneConsensus implements IConsensus {
           StandAloneServerImpl impl =
               new StandAloneServerImpl(peers.get(0), registry.apply(groupId));
           impl.start();
-          String groupPath =
-              storageDir
-                  + File.separator
-                  + groupId.getType()
-                  + "_"
-                  + groupId.getId()
-                  + "_"
-                  + peers.get(0).getEndpoint().ip
-                  + "_"
-                  + peers.get(0).getEndpoint().port;
-          File file = new File(groupPath);
-          file.mkdirs();
-
+          String path = buildPeerDir(groupId);
+          File file = new File(path);
+          if (!file.mkdirs()) {
+            logger.warn("Unable to create consensus dir for group {} at {}", groupId, path);
+          }
           return impl;
         });
     if (exist.get()) {
@@ -177,20 +164,13 @@ class StandAloneConsensus implements IConsensus {
     stateMachineMap.computeIfPresent(
         groupId,
         (k, v) -> {
-          String groupPath =
-              storageDir
-                  + File.separator
-                  + groupId.getType()
-                  + "_"
-                  + groupId.getId()
-                  + "_"
-                  + thisNode.ip
-                  + "_"
-                  + thisNode.port;
-          File file = new File(groupPath);
-          file.delete();
           exist.set(true);
           v.stop();
+          String path = buildPeerDir(groupId);
+          File file = new File(path);
+          if (!file.delete()) {
+            logger.warn("Unable to delete consensus dir for group {} at {}", groupId, path);
+          }
           return null;
         });
 
@@ -239,4 +219,16 @@ class StandAloneConsensus implements IConsensus {
     }
     return new Peer(groupId, thisNode);
   }
+
+  private String buildPeerDir(ConsensusGroupId groupId) {
+    return storageDir
+        + File.separator
+        + groupId.getType()
+        + "_"
+        + groupId.getId()
+        + "_"
+        + thisNode.getIp()
+        + "_"
+        + thisNode.getPort();
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
index be0dbaa6c7..4c55e9676b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
@@ -20,11 +20,11 @@
 package org.apache.iotdb.consensus.standalone;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.SnapshotMeta;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
 import java.io.File;
 import java.nio.ByteBuffer;
@@ -69,17 +69,21 @@ public class StandAloneServerImpl implements IStateMachine {
 
   @Override
   public boolean takeSnapshot(ByteBuffer metadata, File snapshotDir) {
-    return false;
+    return stateMachine.takeSnapshot(metadata, snapshotDir);
   }
 
   @Override
   public SnapshotMeta getLatestSnapshot(File snapshotDir) {
-    return null;
+    return stateMachine.getLatestSnapshot(snapshotDir);
   }
 
   @Override
-  public void loadSnapshot(SnapshotMeta latest) {}
+  public void loadSnapshot(SnapshotMeta latest) {
+    stateMachine.loadSnapshot(latest);
+  }
 
   @Override
-  public void cleanUpOldSnapshots(File snapshotDir) {}
+  public void cleanUpOldSnapshots(File snapshotDir) {
+    stateMachine.cleanUpOldSnapshots(snapshotDir);
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
similarity index 97%
rename from consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
rename to consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
index 8693d1f6ce..118b057d1a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.consensus.statemachine;
+package org.apache.iotdb.consensus;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.common.DataSet;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
index fe209ba019..962dd84d95 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -19,11 +19,11 @@
 package org.apache.iotdb.consensus.ratis;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.SnapshotMeta;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
 import org.apache.ratis.util.FileUtils;
 import org.slf4j.Logger;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
index 0c9f0e74f6..a166893b6b 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
@@ -22,11 +22,11 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.consensus.EmptyStateMachine;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
-import org.apache.iotdb.consensus.statemachine.EmptyStateMachine;
 
 import org.apache.ratis.util.FileUtils;
 import org.junit.After;
@@ -48,7 +48,7 @@ public class RecoveryTest {
     consensusImpl =
         ConsensusFactory.getConsensusImpl(
                 STANDALONE_CONSENSUS_CLASS_NAME,
-                new TEndPoint("localhost", 9000),
+                new TEndPoint("0.0.0.0", 9000),
                 new File("./target/recovery"),
                 gid -> new EmptyStateMachine())
             .orElseThrow(
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index 986e347356..1222598444 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -26,7 +26,9 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.PartitionRegionId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.consensus.EmptyStateMachine;
 import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.SnapshotMeta;
@@ -36,9 +38,8 @@ import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
 import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
-import org.apache.iotdb.consensus.statemachine.EmptyStateMachine;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
 import org.apache.ratis.util.FileUtils;
 import org.junit.After;
@@ -134,7 +135,7 @@ public class StandAloneConsensusTest {
     consensusImpl =
         ConsensusFactory.getConsensusImpl(
                 STANDALONE_CONSENSUS_CLASS_NAME,
-                new TEndPoint("localhost", 6667),
+                new TEndPoint("0.0.0.0", 6667),
                 new File("./target/standalone"),
                 gid -> {
                   switch (gid.getType()) {
@@ -186,11 +187,18 @@ public class StandAloneConsensusTest {
     assertTrue(response3.getException() instanceof IllegalPeerNumException);
 
     ConsensusGenericResponse response4 =
+        consensusImpl.addConsensusGroup(
+            dataRegionId,
+            Collections.singletonList(new Peer(dataRegionId, new TEndPoint("0.0.0.1", 6667))));
+    assertFalse(response4.isSuccess());
+    assertTrue(response4.getException() instanceof IllegalPeerEndpointException);
+
+    ConsensusGenericResponse response5 =
         consensusImpl.addConsensusGroup(
             schemaRegionId,
             Collections.singletonList(new Peer(schemaRegionId, new TEndPoint("0.0.0.0", 6667))));
-    assertTrue(response4.isSuccess());
-    assertNull(response4.getException());
+    assertTrue(response5.isSuccess());
+    assertNull(response5.getException());
   }
 
   @Test
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
index aee4bcee55..adb1824c35 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
@@ -22,53 +22,66 @@ package org.apache.iotdb.commons.consensus;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 
-public interface ConsensusGroupId {
+import java.util.Objects;
 
-  // return specific id
-  int getId();
+// we abstract this class to hide word `ConsensusGroup` for IoTDB StorageEngine/SchemaEngine
+public abstract class ConsensusGroupId {
 
-  void setId(int id);
+  protected int id;
+
+  // return specific id
+  public int getId() {
+    return id;
+  }
 
   // return specific type
-  TConsensusGroupType getType();
+  public abstract TConsensusGroupType getType();
 
-  class Factory {
-    public static ConsensusGroupId createEmpty(TConsensusGroupType type) {
+  @Override
+  public int hashCode() {
+    return Objects.hash(getType(), getId());
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ConsensusGroupId that = (ConsensusGroupId) o;
+    return getId() == that.getId() && getType() == that.getType();
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s[%d]", getType(), getId());
+  }
+
+  public static class Factory {
+
+    public static ConsensusGroupId create(TConsensusGroupType type, int id) {
       ConsensusGroupId groupId;
       switch (type) {
         case DataRegion:
-          groupId = new DataRegionId();
+          groupId = new DataRegionId(id);
           break;
         case SchemaRegion:
-          groupId = new SchemaRegionId();
+          groupId = new SchemaRegionId(id);
           break;
         case PartitionRegion:
-          groupId = new PartitionRegionId();
+          groupId = new PartitionRegionId(id);
           break;
         default:
-          throw new IllegalArgumentException("unrecognized id type " + type);
+          throw new IllegalArgumentException("unrecognized id type " + id);
       }
       return groupId;
     }
 
-    public static ConsensusGroupId convertFromTConsensusGroupId(
+    public static ConsensusGroupId createFromTConsensusGroupId(
         TConsensusGroupId tConsensusGroupId) {
-      ConsensusGroupId groupId = createEmpty(tConsensusGroupId.getType());
-      groupId.setId(tConsensusGroupId.getId());
-      return groupId;
-    }
-
-    public static TConsensusGroupId convertToTConsensusGroupId(ConsensusGroupId consensusGroupId) {
-      TConsensusGroupId result = new TConsensusGroupId();
-      if (consensusGroupId instanceof SchemaRegionId) {
-        result.setType(TConsensusGroupType.SchemaRegion);
-      } else if (consensusGroupId instanceof DataRegionId) {
-        result.setType(TConsensusGroupType.DataRegion);
-      } else if (consensusGroupId instanceof PartitionRegionId) {
-        result.setType(TConsensusGroupType.PartitionRegion);
-      }
-      result.setId(consensusGroupId.getId());
-      return result;
+      return create(tConsensusGroupId.getType(), tConsensusGroupId.getId());
     }
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
index 81fed3fe62..03ec5e4d7d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
@@ -21,51 +21,14 @@ package org.apache.iotdb.commons.consensus;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 
-import java.util.Objects;
-
-public class DataRegionId implements ConsensusGroupId {
-
-  private int id;
-
-  public DataRegionId() {}
+public class DataRegionId extends ConsensusGroupId {
 
   public DataRegionId(int id) {
     this.id = id;
   }
 
-  @Override
-  public int getId() {
-    return id;
-  }
-
-  @Override
-  public void setId(int id) {
-    this.id = id;
-  }
-
   @Override
   public TConsensusGroupType getType() {
     return TConsensusGroupType.DataRegion;
   }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    DataRegionId that = (DataRegionId) o;
-    return id == that.id;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(id, TConsensusGroupType.DataRegion);
-  }
-
-  public String toString() {
-    return String.format("%s[%d]", getType(), getId());
-  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
index 6f317547ed..82cb7a2fed 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
@@ -21,51 +21,14 @@ package org.apache.iotdb.commons.consensus;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 
-import java.util.Objects;
-
-public class PartitionRegionId implements ConsensusGroupId {
-
-  private int id;
-
-  public PartitionRegionId() {}
+public class PartitionRegionId extends ConsensusGroupId {
 
   public PartitionRegionId(int id) {
     this.id = id;
   }
 
-  @Override
-  public int getId() {
-    return id;
-  }
-
-  @Override
-  public void setId(int id) {
-    this.id = id;
-  }
-
   @Override
   public TConsensusGroupType getType() {
     return TConsensusGroupType.PartitionRegion;
   }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    PartitionRegionId that = (PartitionRegionId) o;
-    return id == that.id;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(id, TConsensusGroupType.PartitionRegion);
-  }
-
-  public String toString() {
-    return String.format("%s[%d]", getType(), getId());
-  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
index 61c4ca80c5..48b9cbf820 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
@@ -21,51 +21,14 @@ package org.apache.iotdb.commons.consensus;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 
-import java.util.Objects;
-
-public class SchemaRegionId implements ConsensusGroupId {
-
-  private int id;
-
-  public SchemaRegionId() {}
+public class SchemaRegionId extends ConsensusGroupId {
 
   public SchemaRegionId(int id) {
     this.id = id;
   }
 
-  @Override
-  public int getId() {
-    return id;
-  }
-
-  @Override
-  public void setId(int id) {
-    this.id = id;
-  }
-
   @Override
   public TConsensusGroupType getType() {
     return TConsensusGroupType.SchemaRegion;
   }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    SchemaRegionId that = (SchemaRegionId) o;
-    return id == that.id;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(id, TConsensusGroupType.SchemaRegion);
-  }
-
-  public String toString() {
-    return String.format("%s[%d]", getType(), getId());
-  }
 }
diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/ConsensusGroupIdTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/ConsensusGroupIdTest.java
index af19aea35e..f00b6dbb9c 100644
--- a/node-commons/src/test/java/org/apache/iotdb/commons/ConsensusGroupIdTest.java
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/ConsensusGroupIdTest.java
@@ -34,14 +34,14 @@ public class ConsensusGroupIdTest {
   @Test
   public void TestCreate() throws IOException {
     ConsensusGroupId dataRegionId =
-        ConsensusGroupId.Factory.convertFromTConsensusGroupId(
+        ConsensusGroupId.Factory.createFromTConsensusGroupId(
             new TConsensusGroupId(TConsensusGroupType.DataRegion, 1));
     Assert.assertTrue(dataRegionId instanceof DataRegionId);
     Assert.assertEquals(1, dataRegionId.getId());
     Assert.assertEquals(TConsensusGroupType.DataRegion, dataRegionId.getType());
 
     ConsensusGroupId schemaRegionId =
-        ConsensusGroupId.Factory.convertFromTConsensusGroupId(
+        ConsensusGroupId.Factory.createFromTConsensusGroupId(
             new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 2));
     Assert.assertTrue(schemaRegionId instanceof SchemaRegionId);
     Assert.assertEquals(2, schemaRegionId.getId());
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index 873d652013..a19291477e 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -20,10 +20,10 @@
 package org.apache.iotdb.db.consensus.statemachine;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.rpc.TSStatusCode;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index 86810c8372..672b5206fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -89,7 +89,7 @@ public class InternalServiceImpl implements InternalService.Iface {
   public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req) {
     QueryType type = QueryType.valueOf(req.queryType);
     ConsensusGroupId groupId =
-        ConsensusGroupId.Factory.convertFromTConsensusGroupId(req.getConsensusGroupId());
+        ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
     switch (type) {
       case READ:
         ConsensusReadResponse readResp =
diff --git a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index bdedbbf4d2..027260d071 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@ -85,7 +85,7 @@ public class InternalServiceImplTest {
     TRegionReplicaSet regionReplicaSet = genRegionReplicaSet();
     ConsensusImpl.getInstance()
         .addConsensusGroup(
-            ConsensusGroupId.Factory.convertFromTConsensusGroupId(regionReplicaSet.getRegionId()),
+            ConsensusGroupId.Factory.createFromTConsensusGroupId(regionReplicaSet.getRegionId()),
             genPeerList(regionReplicaSet));
     internalServiceImpl = new InternalServiceImpl();
   }
@@ -95,7 +95,7 @@ public class InternalServiceImplTest {
     TRegionReplicaSet regionReplicaSet = genRegionReplicaSet();
     ConsensusImpl.getInstance()
         .removeConsensusGroup(
-            ConsensusGroupId.Factory.convertFromTConsensusGroupId(regionReplicaSet.getRegionId()));
+            ConsensusGroupId.Factory.createFromTConsensusGroupId(regionReplicaSet.getRegionId()));
     FileUtils.deleteFully(new File(conf.getConsensusDir()));
   }