You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by wa...@apache.org on 2022/05/06 02:18:56 UTC

[iotdb] branch master updated: [IOTDB-3104] Add Consensus Module StateMachine Event API (#5806)

This is an automated email from the ASF dual-hosted git repository.

wangchao316 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f9b3b0f8d2 [IOTDB-3104] Add Consensus Module StateMachine Event API (#5806)
f9b3b0f8d2 is described below

commit f9b3b0f8d2b8a8ce07b65a37aacfb2b16ed003a7
Author: Potato <TX...@gmail.com>
AuthorDate: Fri May 6 10:18:52 2022 +0800

    [IOTDB-3104] Add Consensus Module StateMachine Event API (#5806)
    
    [IOTDB-3104] Add Consensus Module StateMachine Event API (#5806)
---
 .../statemachine/PartitionRegionStateMachine.java  |  2 +-
 .../org/apache/iotdb/consensus/IStateMachine.java  | 40 +++++++++++++++-
 .../org/apache/iotdb/consensus/common/Peer.java    |  5 ++
 .../ratis/ApplicationStateMachineProxy.java        | 36 +++++++++++++--
 .../iotdb/consensus/ratis/RatisConsensus.java      | 53 ++++++++++++----------
 .../org/apache/iotdb/consensus/ratis/Utils.java    | 47 +++++++++++++------
 .../apache/iotdb/consensus/EmptyStateMachine.java  |  2 +-
 .../iotdb/consensus/ratis/RatisConsensusTest.java  | 13 +++++-
 .../apache/iotdb/consensus/ratis/SnapshotTest.java |  2 +-
 .../apache/iotdb/consensus/ratis/TestUtils.java    | 37 ++++++++++++++-
 .../apache/iotdb/consensus/ratis/UtilsTest.java    |  4 +-
 .../standalone/StandAloneConsensusTest.java        |  2 +-
 .../consensus/statemachine/BaseStateMachine.java   |  2 +-
 13 files changed, 194 insertions(+), 51 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index cb5048070a..0eb69d74e4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -36,7 +36,7 @@ import java.io.File;
 import java.io.IOException;
 
 /** Statemachine for PartitionRegion */
-public class PartitionRegionStateMachine implements IStateMachine {
+public class PartitionRegionStateMachine implements IStateMachine, IStateMachine.EventApi {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PartitionRegionStateMachine.class);
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index e12622f2ad..203b7d2a6b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -19,14 +19,17 @@
 
 package org.apache.iotdb.consensus;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 
 import javax.annotation.concurrent.ThreadSafe;
 
 import java.io.File;
+import java.util.List;
 import java.util.function.Function;
 
 @ThreadSafe
@@ -54,7 +57,7 @@ public interface IStateMachine {
 
   /**
    * Take a snapshot of current statemachine. All files are required to be stored under snapshotDir,
-   * which is a sub-directory of the StorageDir in Consensus
+   * which is a subdirectory of the StorageDir in Consensus
    *
    * @param snapshotDir required storage dir
    * @return true if snapshot is successfully taken
@@ -67,4 +70,39 @@ public interface IStateMachine {
    * @param latestSnapshotRootDir dir where the latest snapshot sits
    */
   void loadSnapshot(File latestSnapshotRootDir);
+
+  /** An optional API for event notifications. */
+  interface EventApi {
+    /**
+     * Notify the {@link IStateMachine} that a new leader has been elected. Note that the new leader
+     * can possibly be this server.
+     *
+     * @param groupId The id of this consensus group.
+     * @param newLeader The id of the new leader.
+     */
+    default void notifyLeaderChanged(ConsensusGroupId groupId, TEndPoint newLeader) {}
+
+    /**
+     * Notify the {@link IStateMachine} a configuration change. This method will be invoked when a
+     * newConfiguration is processed.
+     *
+     * @param term term of the current log entry
+     * @param index index which is being updated
+     * @param newConfiguration new configuration
+     */
+    default void notifyConfigurationChanged(long term, long index, List<Peer> newConfiguration) {}
+  }
+
+  /**
+   * Get the {@link IStateMachine.EventApi} object.
+   *
+   * <p>If this {@link IStateMachine} chooses to support the optional {@link
+   * IStateMachine.EventApi}, it may either implement {@link IStateMachine.EventApi} directly or
+   * override this method to return an {@link IStateMachine.EventApi} object.
+   *
+   * @return The {@link IStateMachine.EventApi} object.
+   */
+  default IStateMachine.EventApi event() {
+    return (IStateMachine.EventApi) this;
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
index db537e9022..99379156fb 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
@@ -59,4 +59,9 @@ public class Peer {
   public int hashCode() {
     return Objects.hash(groupId, endpoint);
   }
+
+  @Override
+  public String toString() {
+    return "Peer{" + "groupId=" + groupId + ", endpoint=" + endpoint + '}';
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index 123bd4f13d..3617a36b0c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -25,8 +25,11 @@ import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 
 import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
@@ -49,11 +52,13 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
   // Raft Storage sub dir for statemachine data, default (_sm)
   private File statemachineDir;
   private final SnapshotStorage snapshotStorage;
+  private final RaftGroupId groupId;
 
-  public ApplicationStateMachineProxy(IStateMachine stateMachine) {
+  public ApplicationStateMachineProxy(IStateMachine stateMachine, RaftGroupId id) {
     applicationStateMachine = stateMachine;
     snapshotStorage = new SnapshotStorage(applicationStateMachine);
     applicationStateMachine.start();
+    groupId = id;
   }
 
   @Override
@@ -69,7 +74,7 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
   }
 
   @Override
-  public void reinitialize() throws IOException {
+  public void reinitialize() {
     setLastAppliedTermIndex(null);
     loadSnapshot(snapshotStorage.findLatestSnapshotDir());
     if (getLifeCycleState() == LifeCycle.State.PAUSED) {
@@ -136,11 +141,15 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
     // require the application statemachine to take the latest snapshot
     String metadata = Utils.getMetadataFromTermIndex(lastApplied);
     File snapshotDir = snapshotStorage.getSnapshotDir(metadata);
+    snapshotDir.mkdir();
+    if (!snapshotDir.isDirectory()) {
+      logger.error("Unable to create snapshotDir at {}", snapshotDir);
+      return RaftLog.INVALID_LOG_INDEX;
+    }
     boolean success = applicationStateMachine.takeSnapshot(snapshotDir);
     if (!success) {
       return RaftLog.INVALID_LOG_INDEX;
     }
-
     return lastApplied.getIndex();
   }
 
@@ -159,4 +168,25 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
   public StateMachineStorage getStateMachineStorage() {
     return snapshotStorage;
   }
+
+  @Override
+  public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) {
+    applicationStateMachine
+        .event()
+        .notifyLeaderChanged(
+            Utils.fromRaftGroupIdToConsensusGroupId(groupMemberId.getGroupId()),
+            Utils.formRaftPeerIdToTEndPoint(newLeaderId));
+  }
+
+  @Override
+  public void notifyConfigurationChanged(
+      long term, long index, RaftConfigurationProto newRaftConfiguration) {
+    applicationStateMachine
+        .event()
+        .notifyConfigurationChanged(
+            term,
+            index,
+            Utils.fromRaftProtoListAndRaftGroupIdToPeers(
+                newRaftConfiguration.getPeersList(), groupId));
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 96b3b766e3..8d644bbc81 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.client.ClientPoolProperty;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.IClientPoolFactory;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
@@ -109,7 +110,7 @@ class RatisConsensus implements IConsensus {
   public RatisConsensus(TEndPoint endpoint, File ratisStorageDir, IStateMachine.Registry registry)
       throws IOException {
     String address = Utils.IPAddress(endpoint);
-    myself = Utils.toRaftPeer(endpoint, DEFAULT_PRIORITY);
+    myself = Utils.fromTEndPointAndPriorityToRaftPeer(endpoint, DEFAULT_PRIORITY);
 
     RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(ratisStorageDir));
     RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(properties, true);
@@ -126,7 +127,8 @@ class RatisConsensus implements IConsensus {
             .setStateMachineRegistry(
                 raftGroupId ->
                     new ApplicationStateMachineProxy(
-                        registry.apply(Utils.toConsensusGroupId(raftGroupId))))
+                        registry.apply(Utils.fromRaftGroupIdToConsensusGroupId(raftGroupId)),
+                        raftGroupId))
             .build();
   }
 
@@ -150,7 +152,7 @@ class RatisConsensus implements IConsensus {
       ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
 
     // pre-condition: group exists and myself server serves this group
-    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
     RaftGroup raftGroup = getGroupInfo(raftGroupId);
     if (raftGroup == null || !raftGroup.getPeers().contains(myself)) {
       return failedWrite(new ConsensusGroupNotExistException(groupId));
@@ -198,7 +200,7 @@ class RatisConsensus implements IConsensus {
     }
 
     if (suggestedLeader != null) {
-      TEndPoint leaderEndPoint = Utils.getEndpoint(suggestedLeader);
+      TEndPoint leaderEndPoint = Utils.formRaftPeerIdToTEndPoint(suggestedLeader.getId());
       writeResult.setRedirectNode(new TEndPoint(leaderEndPoint.getIp(), leaderEndPoint.getPort()));
     }
 
@@ -209,7 +211,7 @@ class RatisConsensus implements IConsensus {
   @Override
   public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
 
-    RaftGroup group = getGroupInfo(Utils.toRatisGroupId(groupId));
+    RaftGroup group = getGroupInfo(Utils.fromConsensusGroupIdToRaftGroupId(groupId));
     if (group == null || !group.getPeers().contains(myself)) {
       return failedRead(new ConsensusGroupNotExistException(groupId));
     }
@@ -278,7 +280,7 @@ class RatisConsensus implements IConsensus {
    */
   @Override
   public ConsensusGenericResponse removeConsensusGroup(ConsensusGroupId groupId) {
-    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
     RaftGroup raftGroup = getGroupInfo(raftGroupId);
 
     // pre-conditions: group exists and myself in this group
@@ -318,9 +320,9 @@ class RatisConsensus implements IConsensus {
    */
   @Override
   public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) {
-    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
     RaftGroup group = getGroupInfo(raftGroupId);
-    RaftPeer peerToAdd = Utils.toRaftPeer(peer, DEFAULT_PRIORITY);
+    RaftPeer peerToAdd = Utils.fromTEndPointAndPriorityToRaftPeer(peer, DEFAULT_PRIORITY);
 
     // pre-conditions: group exists and myself in this group
     if (group == null || !group.getPeers().contains(myself)) {
@@ -353,9 +355,9 @@ class RatisConsensus implements IConsensus {
    */
   @Override
   public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer peer) {
-    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
     RaftGroup group = getGroupInfo(raftGroupId);
-    RaftPeer peerToRemove = Utils.toRaftPeer(peer, DEFAULT_PRIORITY);
+    RaftPeer peerToRemove = Utils.fromTEndPointAndPriorityToRaftPeer(peer, DEFAULT_PRIORITY);
 
     // pre-conditions: group exists and myself in this group
     if (group == null || !group.getPeers().contains(myself)) {
@@ -413,14 +415,14 @@ class RatisConsensus implements IConsensus {
 
     // first fetch the newest information
 
-    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
     RaftGroup raftGroup = getGroupInfo(raftGroupId);
 
     if (raftGroup == null) {
       return failed(new ConsensusGroupNotExistException(groupId));
     }
 
-    RaftPeer newRaftLeader = Utils.toRaftPeer(newLeader, LEADER_PRIORITY);
+    RaftPeer newRaftLeader = Utils.fromTEndPointAndPriorityToRaftPeer(newLeader, LEADER_PRIORITY);
 
     ArrayList<RaftPeer> newConfiguration = new ArrayList<>();
     for (RaftPeer raftPeer : raftGroup.getPeers()) {
@@ -428,7 +430,9 @@ class RatisConsensus implements IConsensus {
         newConfiguration.add(newRaftLeader);
       } else {
         // degrade every other peer to default priority
-        newConfiguration.add(Utils.toRaftPeer(Utils.getEndpoint(raftPeer), DEFAULT_PRIORITY));
+        newConfiguration.add(
+            Utils.fromTEndPointAndPriorityToRaftPeer(
+                Utils.formRaftPeerIdToTEndPoint(raftPeer.getId()), DEFAULT_PRIORITY));
       }
     }
 
@@ -459,7 +463,7 @@ class RatisConsensus implements IConsensus {
 
   @Override
   public boolean isLeader(ConsensusGroupId groupId) {
-    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
 
     boolean isLeader;
     try {
@@ -475,10 +479,10 @@ class RatisConsensus implements IConsensus {
   @Override
   public Peer getLeader(ConsensusGroupId groupId) {
     if (isLeader(groupId)) {
-      return new Peer(groupId, Utils.parseFromRatisId(myself.getId().toString()));
+      return new Peer(groupId, Utils.formRaftPeerIdToTEndPoint(myself.getId()));
     }
 
-    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
     RaftClient client;
     try {
       client = server.getDivision(raftGroupId).getRaftClient();
@@ -486,7 +490,7 @@ class RatisConsensus implements IConsensus {
       logger.warn("cannot find raft client for group " + groupId);
       return null;
     }
-    TEndPoint leaderEndpoint = Utils.parseFromRatisId(client.getLeaderId().toString());
+    TEndPoint leaderEndpoint = Utils.formRaftPeerIdToTEndPoint(client.getLeaderId());
     return new Peer(groupId, leaderEndpoint);
   }
 
@@ -513,7 +517,7 @@ class RatisConsensus implements IConsensus {
         .setServerId(server.getId())
         .setClientId(localFakeId)
         .setCallId(localFakeCallId.incrementAndGet())
-        .setGroupId(Utils.toRatisGroupId(groupId))
+        .setGroupId(Utils.fromConsensusGroupIdToRaftGroupId(groupId))
         .setType(type)
         .setMessage(message)
         .build();
@@ -536,11 +540,9 @@ class RatisConsensus implements IConsensus {
   }
 
   private RaftGroup buildRaftGroup(ConsensusGroupId groupId, List<Peer> peers) {
-    List<RaftPeer> raftPeers =
-        peers.stream()
-            .map(peer -> Utils.toRaftPeer(peer, DEFAULT_PRIORITY))
-            .collect(Collectors.toList());
-    return RaftGroup.valueOf(Utils.toRatisGroupId(groupId), raftPeers);
+    return RaftGroup.valueOf(
+        Utils.fromConsensusGroupIdToRaftGroupId(groupId),
+        Utils.fromPeersAndPriorityToRaftPeers(peers, DEFAULT_PRIORITY));
   }
 
   private RatisClient getRaftClient(RaftGroup group) throws IOException {
@@ -575,6 +577,11 @@ class RatisConsensus implements IConsensus {
     return reply;
   }
 
+  @TestOnly
+  public RaftServer getServer() {
+    return server;
+  }
+
   private class RatisClientPoolFactory implements IClientPoolFactory<RaftGroup, RatisClient> {
     @Override
     public KeyedObjectPool<RaftGroup, RatisClient> createClientPool(
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
index 0c3e5d984f..e1773c7f80 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
@@ -23,8 +23,10 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.Peer;
 
+import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.thrift.TException;
@@ -33,6 +35,8 @@ import org.apache.thrift.transport.TByteBuffer;
 
 import java.io.File;
 import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
 
 public class Utils {
   private static final int tempBufferSize = 1024;
@@ -42,7 +46,7 @@ public class Utils {
     return String.format("%s:%d", endpoint.getIp(), endpoint.getPort());
   }
 
-  /** Encode the ConsensusGroupId into 6 bytes 2 Bytes for Group Type 4 Bytes for Group ID */
+  /** Encode the ConsensusGroupId into 6 bytes: 2 Bytes for Group Type and 4 Bytes for Group ID */
   public static long groupEncode(ConsensusGroupId consensusGroupId) {
     // use abbreviations to prevent overflow
     long groupType = consensusGroupId.getType().getValue();
@@ -51,36 +55,49 @@ public class Utils {
     return groupCode;
   }
 
-  public static String RatisPeerId(TEndPoint endpoint) {
-    return String.format("%s-%d", endpoint.getIp(), endpoint.getPort());
+  public static RaftPeerId fromTEndPointToRaftPeerId(TEndPoint endpoint) {
+    return RaftPeerId.valueOf(String.format("%s-%d", endpoint.getIp(), endpoint.getPort()));
   }
 
-  public static TEndPoint parseFromRatisId(String ratisId) {
-    String[] items = ratisId.split("-");
+  public static TEndPoint formRaftPeerIdToTEndPoint(RaftPeerId id) {
+    String[] items = id.toString().split("-");
+    return new TEndPoint(items[0], Integer.parseInt(items[1]));
+  }
+
+  public static TEndPoint formRaftPeerProtoToTEndPoint(RaftPeerProto proto) {
+    String[] items = proto.getAddress().split(":");
     return new TEndPoint(items[0], Integer.parseInt(items[1]));
   }
 
   // priority is used as ordinal of leader election
-  public static RaftPeer toRaftPeer(TEndPoint endpoint, int priority) {
+  public static RaftPeer fromTEndPointAndPriorityToRaftPeer(TEndPoint endpoint, int priority) {
     return RaftPeer.newBuilder()
-        .setId(RatisPeerId(endpoint))
+        .setId(fromTEndPointToRaftPeerId(endpoint))
         .setAddress(IPAddress(endpoint))
         .setPriority(priority)
         .build();
   }
 
-  public static RaftPeer toRaftPeer(Peer peer, int priority) {
-    return toRaftPeer(peer.getEndpoint(), priority);
+  public static RaftPeer fromTEndPointAndPriorityToRaftPeer(Peer peer, int priority) {
+    return fromTEndPointAndPriorityToRaftPeer(peer.getEndpoint(), priority);
+  }
+
+  public static List<RaftPeer> fromPeersAndPriorityToRaftPeers(List<Peer> peers, int priority) {
+    return peers.stream()
+        .map(peer -> Utils.fromTEndPointAndPriorityToRaftPeer(peer, priority))
+        .collect(Collectors.toList());
   }
 
-  public static TEndPoint getEndpoint(RaftPeer raftPeer) {
-    String address = raftPeer.getAddress(); // ip:port
-    String[] split = address.split(":");
-    return new TEndPoint(split[0], Integer.parseInt(split[1]));
+  public static List<Peer> fromRaftProtoListAndRaftGroupIdToPeers(
+      List<RaftPeerProto> raftProtoList, RaftGroupId id) {
+    ConsensusGroupId consensusGroupId = Utils.fromRaftGroupIdToConsensusGroupId(id);
+    return raftProtoList.stream()
+        .map(peer -> new Peer(consensusGroupId, Utils.formRaftPeerProtoToTEndPoint(peer)))
+        .collect(Collectors.toList());
   }
 
   /** Given ConsensusGroupId, generate a deterministic RaftGroupId current scheme: */
-  public static RaftGroupId toRatisGroupId(ConsensusGroupId consensusGroupId) {
+  public static RaftGroupId fromConsensusGroupIdToRaftGroupId(ConsensusGroupId consensusGroupId) {
     long groupCode = groupEncode(consensusGroupId);
     byte[] bGroupCode = ByteBuffer.allocate(Long.BYTES).putLong(groupCode).array();
     byte[] bPaddedGroupName = new byte[16];
@@ -93,7 +110,7 @@ public class Utils {
   }
 
   /** Given raftGroupId, decrypt ConsensusGroupId out of it */
-  public static ConsensusGroupId toConsensusGroupId(RaftGroupId raftGroupId) {
+  public static ConsensusGroupId fromRaftGroupIdToConsensusGroupId(RaftGroupId raftGroupId) {
     byte[] padded = raftGroupId.toByteString().toByteArray();
     long type = (padded[10] << 8) + padded[11];
     ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.BYTES);
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
index e1ea8fa1d4..01992f472f 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 
 import java.io.File;
 
-public class EmptyStateMachine implements IStateMachine {
+public class EmptyStateMachine implements IStateMachine, IStateMachine.EventApi {
 
   @Override
   public void start() {}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index 2b3e952823..4eee289586 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -54,6 +54,7 @@ public class RatisConsensusTest {
   private List<Peer> peers;
   private List<File> peersStorage;
   private List<IConsensus> servers;
+  private List<TestUtils.IntegerCounter> stateMachines;
   private ConsensusGroup group;
   private Peer peer0;
   private Peer peer1;
@@ -62,12 +63,14 @@ public class RatisConsensusTest {
 
   private void makeServers() throws IOException {
     for (int i = 0; i < 3; i++) {
+      stateMachines.add(new TestUtils.IntegerCounter());
+      int finalI = i;
       servers.add(
           ConsensusFactory.getConsensusImpl(
                   RATIS_CLASS_NAME,
                   peers.get(i).getEndpoint(),
                   peersStorage.get(i),
-                  groupId -> new TestUtils.IntegerCounter())
+                  groupId -> stateMachines.get(finalI))
               .orElseThrow(
                   () ->
                       new IllegalArgumentException(
@@ -95,6 +98,7 @@ public class RatisConsensusTest {
     }
     group = new ConsensusGroup(gid, peers);
     servers = new ArrayList<>();
+    stateMachines = new ArrayList<>();
     makeServers();
   }
 
@@ -130,6 +134,9 @@ public class RatisConsensusTest {
     servers.get(0).removeConsensusGroup(gid);
     servers.get(2).removeConsensusGroup(gid);
     Assert.assertEquals(servers.get(1).getLeader(gid).getEndpoint(), peers.get(1).getEndpoint());
+    Assert.assertEquals(stateMachines.get(1).getLeaderEndpoint(), peers.get(1).getEndpoint());
+    Assert.assertEquals(stateMachines.get(1).getConfiguration().size(), 1);
+    Assert.assertEquals(stateMachines.get(1).getConfiguration().get(0), peers.get(1));
 
     // 4. try consensus again with one peer
     doConsensus(servers.get(1), gid, 10, 20);
@@ -141,6 +148,7 @@ public class RatisConsensusTest {
     // then use addPeer to inform the group leader of configuration change
     servers.get(1).addPeer(gid, peer0);
     servers.get(1).addPeer(gid, peer2);
+    Assert.assertEquals(stateMachines.get(1).getConfiguration().size(), 3);
 
     // 6. try consensus with all 3 peers
     doConsensus(servers.get(2), gid, 10, 30);
@@ -150,6 +158,9 @@ public class RatisConsensusTest {
     servers.get(0).changePeer(group.getGroupId(), Collections.singletonList(peer0));
     servers.get(1).removeConsensusGroup(group.getGroupId());
     servers.get(2).removeConsensusGroup(group.getGroupId());
+    Assert.assertEquals(stateMachines.get(0).getLeaderEndpoint(), peers.get(0).getEndpoint());
+    Assert.assertEquals(stateMachines.get(0).getConfiguration().size(), 1);
+    Assert.assertEquals(stateMachines.get(0).getConfiguration().get(0), peers.get(0));
 
     // 8. try consensus with only peer0
     doConsensus(servers.get(0), gid, 10, 40);
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
index 3af8c5df5e..e42f981706 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
@@ -87,7 +87,7 @@ public class SnapshotTest {
   @Test
   public void testSnapshot() throws Exception {
     ApplicationStateMachineProxy proxy =
-        new ApplicationStateMachineProxy(new TestUtils.IntegerCounter());
+        new ApplicationStateMachineProxy(new TestUtils.IntegerCounter(), null);
 
     proxy.initialize(null, null, new EmptyStorageWithOnlySMDir());
 
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
index eca57c4a19..0e36ea878e 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -18,10 +18,13 @@
  */
 package org.apache.iotdb.consensus.ratis;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 
@@ -33,6 +36,7 @@ import java.io.FileNotFoundException;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.Scanner;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -61,9 +65,11 @@ public class TestUtils {
     }
   }
 
-  static class IntegerCounter implements IStateMachine {
+  static class IntegerCounter implements IStateMachine, IStateMachine.EventApi {
     private AtomicInteger integer;
     private final Logger logger = LoggerFactory.getLogger(IntegerCounter.class);
+    private TEndPoint leaderEndpoint;
+    private List<Peer> configuration;
 
     @Override
     public void start() {
@@ -113,6 +119,27 @@ public class TestUtils {
       }
     }
 
+    @Override
+    public void notifyLeaderChanged(ConsensusGroupId groupId, TEndPoint newLeader) {
+      this.leaderEndpoint = newLeader;
+      System.out.println("---------newLeader-----------");
+      System.out.println(groupId);
+      System.out.println(newLeader);
+      System.out.println("----------------------");
+    }
+
+    @Override
+    public void notifyConfigurationChanged(long term, long index, List<Peer> newConfiguration) {
+      this.configuration = newConfiguration;
+      System.out.println("----------newConfiguration------------");
+      System.out.println("term : " + term);
+      System.out.println("index : " + index);
+      for (Peer peer : newConfiguration) {
+        System.out.println(peer);
+      }
+      System.out.println("----------------------");
+    }
+
     @TestOnly
     public static synchronized String ensureSnapshotFileName(File snapshotDir, String metadata) {
       File dir = new File(snapshotDir + File.separator + metadata);
@@ -121,5 +148,13 @@ public class TestUtils {
       }
       return dir.getPath() + File.separator + "snapshot";
     }
+
+    public TEndPoint getLeaderEndpoint() {
+      return leaderEndpoint;
+    }
+
+    public List<Peer> getConfiguration() {
+      return configuration;
+    }
   }
 }
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/UtilsTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/UtilsTest.java
index 9d11b7967e..794c23289a 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/UtilsTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/UtilsTest.java
@@ -29,8 +29,8 @@ public class UtilsTest {
   @Test
   public void testEncryption() {
     ConsensusGroupId raw = new PartitionRegionId(100);
-    RaftGroupId id = Utils.toRatisGroupId(raw);
-    ConsensusGroupId cgid = Utils.toConsensusGroupId(id);
+    RaftGroupId id = Utils.fromConsensusGroupIdToRaftGroupId(raw);
+    ConsensusGroupId cgid = Utils.fromRaftGroupIdToConsensusGroupId(id);
     Assert.assertEquals(raw.getId(), cgid.getId());
     Assert.assertEquals(raw.getType(), cgid.getType());
   }
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index 5e814f7151..776257e2f8 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -82,7 +82,7 @@ public class StandAloneConsensusTest {
     }
   }
 
-  private static class TestStateMachine implements IStateMachine {
+  private static class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
 
     private final boolean direction;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index 2298d69272..e46ad09a99 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class BaseStateMachine implements IStateMachine {
+public abstract class BaseStateMachine implements IStateMachine, IStateMachine.EventApi {
 
   private static final Logger logger = LoggerFactory.getLogger(BaseStateMachine.class);