You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/03/06 03:25:18 UTC

[iotdb] branch native_raft updated: implement membership change funcitons

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 0381fb3f68 implement membership change funcitons
0381fb3f68 is described below

commit 0381fb3f68b5430703c0ec58d8db65edb9c5880a
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon Mar 6 11:26:46 2023 +0800

    implement membership change funcitons
---
 .../iotdb/consensus/natraft/RaftConsensus.java     |  59 +++++++--
 .../natraft/client/SyncClientAdaptor.java          |   8 ++
 .../consensus/natraft/protocol/RaftMember.java     | 136 +++++++++++++++++++--
 .../natraft/protocol/log/applier/BaseApplier.java  |  10 +-
 .../log/dispatch/AppendNodeEntryHandler.java       |   4 -
 .../protocol/log/dispatch/LogDispatcher.java       |  25 +++-
 .../manager/DirectorySnapshotRaftLogManager.java   |   7 +-
 .../protocol/log/manager/RaftLogManager.java       |  10 +-
 .../serialization/SyncLogDequeSerializer.java      |  10 --
 .../natraft/service/RaftRPCServiceProcessor.java   |   7 ++
 .../iotdb/consensus/natraft/utils/NodeUtils.java   |   1 +
 .../iotdb/consensus/natraft/utils/StatusUtils.java |  18 +++
 thrift-raft/src/main/thrift/raft.thrift            |  22 ++--
 13 files changed, 267 insertions(+), 50 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
index 3aad5abedc..8c3231d21c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.FlowMonitorManager;
 import org.apache.iotdb.consensus.natraft.service.RaftRPCService;
 import org.apache.iotdb.consensus.natraft.service.RaftRPCServiceProcessor;
+import org.apache.iotdb.consensus.natraft.utils.StatusUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -129,7 +130,8 @@ public class RaftConsensus implements IConsensus {
                   null,
                   consensusGroupId,
                   registry.apply(consensusGroupId),
-                  clientManager);
+                  clientManager,
+                  this::onMemberRemoved);
           stateMachineMap.put(consensusGroupId, raftMember);
           raftMember.start();
         }
@@ -179,6 +181,9 @@ public class RaftConsensus implements IConsensus {
     }
   }
 
+  public void onMemberRemoved(ConsensusGroupId groupId) {
+    stateMachineMap.remove(groupId);
+  }
 
   public boolean createNewMemberIfAbsent(ConsensusGroupId groupId, Peer thisPeer,
       List<Peer> peers, List<Peer> newPeers) {
@@ -194,7 +199,8 @@ public class RaftConsensus implements IConsensus {
           }
           RaftMember impl =
               new RaftMember(
-                  path, config, thisPeer, peers, newPeers, groupId, registry.apply(groupId), clientManager);
+                  path, config, thisPeer, peers, newPeers, groupId, registry.apply(groupId),
+                  clientManager, this::onMemberRemoved);
           impl.start();
           return impl;
         });
@@ -250,32 +256,69 @@ public class RaftConsensus implements IConsensus {
 
   @Override
   public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+    RaftMember impl = stateMachineMap.get(groupId);
+    if (impl == null) {
+      return ConsensusGenericResponse.newBuilder()
+          .setException(new ConsensusGroupNotExistException(groupId))
+          .build();
+    }
+    return StatusUtils.toGenericResponse(impl.addPeer(peer));
   }
 
   @Override
   public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer peer) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+    RaftMember impl = stateMachineMap.get(groupId);
+    if (impl == null) {
+      return ConsensusGenericResponse.newBuilder()
+          .setException(new ConsensusGroupNotExistException(groupId))
+          .build();
+    }
+    return StatusUtils.toGenericResponse(impl.removePeer(peer));
   }
 
   @Override
   public ConsensusGenericResponse updatePeer(ConsensusGroupId groupId, Peer oldPeer, Peer newPeer) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
+    RaftMember impl = stateMachineMap.get(groupId);
+    if (impl == null) {
+      return ConsensusGenericResponse.newBuilder()
+          .setException(new ConsensusGroupNotExistException(groupId))
+          .build();
+    }
+    return StatusUtils.toGenericResponse(impl.updatePeer(oldPeer, newPeer));
   }
 
   @Override
   public ConsensusGenericResponse changePeer(ConsensusGroupId groupId, List<Peer> newPeers) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+    RaftMember impl = stateMachineMap.get(groupId);
+    if (impl == null) {
+      return ConsensusGenericResponse.newBuilder()
+          .setException(new ConsensusGroupNotExistException(groupId))
+          .build();
+    }
+    return StatusUtils.toGenericResponse(impl.changeConfig(newPeers));
   }
 
   @Override
   public ConsensusGenericResponse transferLeader(ConsensusGroupId groupId, Peer newLeader) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+    RaftMember impl = stateMachineMap.get(groupId);
+    if (impl == null) {
+      return ConsensusGenericResponse.newBuilder()
+          .setException(new ConsensusGroupNotExistException(groupId))
+          .build();
+    }
+    return StatusUtils.toGenericResponse(impl.transferLeader(newLeader));
   }
 
   @Override
   public ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+    RaftMember impl = stateMachineMap.get(groupId);
+    if (impl == null) {
+      return ConsensusGenericResponse.newBuilder()
+          .setException(new ConsensusGroupNotExistException(groupId))
+          .build();
+    }
+    impl.triggerSnapshot();
+    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
   }
 
   @Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
index fef5e19c26..a5be4ae46a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
@@ -83,6 +83,14 @@ public class SyncClientAdaptor {
     return matchTermHandler.getResult(config.getConnectionTimeoutInMS());
   }
 
+  /** Sends forceElection for {@code groupId} via {@code client} and waits for the status. */
+  public static TSStatus forceElection(AsyncRaftServiceClient client, ConsensusGroupId groupId)
+      throws TException, InterruptedException {
+    GenericHandler<TSStatus> handler = new GenericHandler<>(client.getEndpoint());
+    client.forceElection(groupId.convertToTConsensusGroupId(), handler);
+    return handler.getResult(config.getConnectionTimeoutInMS());
+  }
+
   public static ByteBuffer readFile(
       AsyncRaftServiceClient client, String remotePath, long offset, int fetchSize)
       throws InterruptedException, TException {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 02e14a5ce0..9ac0f1ed86 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -23,12 +23,14 @@
 package org.apache.iotdb.consensus.natraft.protocol;
 
 import java.util.Collections;
+import java.util.function.Consumer;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
@@ -193,6 +195,7 @@ public class RaftMember {
   protected LogSequencer logSequencer;
   private volatile LogAppender logAppender;
   private FlowBalancer flowBalancer;
+  private Consumer<ConsensusGroupId> onRemove;
 
   public RaftMember(
       String storageDir,
@@ -202,7 +205,8 @@ public class RaftMember {
       List<Peer> newNodes,
       ConsensusGroupId groupId,
       IStateMachine stateMachine,
-      IClientManager<TEndPoint, AsyncRaftServiceClient> clientManager) {
+      IClientManager<TEndPoint, AsyncRaftServiceClient> clientManager,
+      Consumer<ConsensusGroupId> onRemove) {
     this.config = config;
     this.storageDir = storageDir;
     initConfig();
@@ -226,23 +230,67 @@ public class RaftMember {
 
     this.clientManager = clientManager;
     this.stateMachine = stateMachine;
-    this.logManager =
-        new DirectorySnapshotRaftLogManager(
-            new SyncLogDequeSerializer(groupId, config),
-            new AsyncLogApplier(new BaseApplier(stateMachine), name, config),
-            name,
-            stateMachine,
-            config);
+
     this.votingLogList = new VotingLogList(this);
     this.logAppender = appenderFactory.create(this, config);
     this.logSequencer = SEQUENCER_FACTORY.create(this, config);
     this.logDispatcher = new LogDispatcher(this, config);
     this.heartbeatReqHandler = new HeartbeatReqHandler(this);
     this.electionReqHandler = new ElectionReqHandler(this);
+    this.logManager =
+        new DirectorySnapshotRaftLogManager(
+            new SyncLogDequeSerializer(groupId, config),
+            new AsyncLogApplier(new BaseApplier(stateMachine, this), name, config),
+            name,
+            stateMachine,
+            config,
+            this::examineUnappliedEntry);
+    this.onRemove = onRemove;
 
     initPeerMap();
   }
 
+  public void applyConfigChange(ConfigChangeEntry configChangeEntry) {
+    List<Peer> newNodes = configChangeEntry.getNewPeers();
+    if (!newNodes.equals(this.newNodes)) {
+      return;
+    }
+
+    if (newNodes.contains(thisNode)) {
+      applyNewNodes();
+    }
+  }
+
+  public void applyNewNodes() {
+    logManager.getLock().writeLock().lock(); // before try: finally must only unlock a held lock
+    try {
+      logDispatcher.applyNewNodes();
+      allNodes = newNodes; // the pending configuration becomes the current one
+      newNodes = null;
+      persistConfiguration();
+    } finally {
+      logManager.getLock().writeLock().unlock();
+    }
+  }
+
+  public void remove() {
+    stop();
+    FileUtils.deleteDirectory(new File(storageDir));
+    onRemove.accept(groupId);
+  }
+
+  /**
+   * Adopts, via setNewNodes, the peers of the last ConfigChangeEntry in {@code entries}, if any.
+   */
+  private void examineUnappliedEntry(List<Entry> entries) {
+    for (int i = entries.size() - 1; i >= 0; i--) {
+      if (entries.get(i) instanceof ConfigChangeEntry) {
+        setNewNodes(((ConfigChangeEntry) entries.get(i)).getNewPeers());
+        return;
+      }
+    }
+  }
+
   public void recoverConfiguration() {
     ByteBuffer buffer;
     try {
@@ -1239,4 +1287,76 @@ public class RaftMember {
     newPeers.add(newPeer);
     return changeConfig(newPeers);
   }
+
+  public TSStatus removePeer(Peer toRemove) {
+    List<Peer> allNodes = getAllNodes();
+    if (!allNodes.contains(toRemove)) {
+      return StatusUtils.OK.deepCopy().setMessage("Peer already removed");
+    }
+
+    List<Peer> newPeers = new ArrayList<>(allNodes);
+    newPeers.remove(toRemove);
+    return changeConfig(newPeers);
+  }
+
+  public TSStatus updatePeer(Peer oldPeer, Peer newPeer) {
+    List<Peer> allNodes = getAllNodes();
+    if (!allNodes.contains(oldPeer)) {
+      return StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy().setMessage("Peer already removed");
+    }
+    if (allNodes.contains(newPeer)) {
+      return StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy().setMessage("Peer already exists");
+    }
+
+    List<Peer> newPeers = new ArrayList<>(allNodes);
+    newPeers.remove(oldPeer);
+    newPeers.add(newPeer);
+    return changeConfig(newPeers);
+  }
+
+  public void triggerSnapshot() {
+    logManager.takeSnapshot(this);
+  }
+
+  /** Requests a forced election on {@code peer}; returns OK at once if peer is this node. */
+  public TSStatus transferLeader(Peer peer) {
+    if (thisNode.equals(peer)) {
+      return StatusUtils.OK;
+    }
+    if (!isLeader()) {
+      return StatusUtils.NO_LEADER.deepCopy().setMessage("This node is not a leader");
+    }
+    if (!allNodes.contains(peer)) {
+      return StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy().setMessage("Peer not in this group");
+    }
+    AsyncRaftServiceClient client = getClient(peer.getEndpoint());
+    try {
+      return SyncClientAdaptor.forceElection(client, groupId);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore the flag so callers can observe it
+      throw new RuntimeException(e);
+    } catch (TException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Zeroes the last-heartbeat time, notifies the heartbeat thread, then polls up to 10s. */
+  public TSStatus forceElection() {
+    if (isLeader()) {
+      return StatusUtils.OK;
+    }
+    heartbeatThread.setLastHeartbeatReceivedTime(0);
+    heartbeatThread.notifyHeartbeat();
+    long waitStart = System.currentTimeMillis();
+    long maxWait = 10_000L;
+    while (!isLeader() && (System.currentTimeMillis() - waitStart) < maxWait) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // restore the flag; report the wait as timed out
+        return StatusUtils.TIME_OUT;
+      }
+    }
+    return isLeader() ? StatusUtils.OK : StatusUtils.TIME_OUT;
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/BaseApplier.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/BaseApplier.java
index 6832400798..646159877f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/BaseApplier.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/BaseApplier.java
@@ -24,17 +24,21 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
+import org.apache.iotdb.consensus.natraft.protocol.log.logtype.ConfigChangeEntry;
 import org.apache.iotdb.consensus.natraft.protocol.log.logtype.RequestEntry;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 /** BaseApplier use PlanExecutor to execute PhysicalPlans. */
 public class BaseApplier implements LogApplier {
 
-  IStateMachine stateMachine;
+  protected IStateMachine stateMachine;
+  protected RaftMember member;
 
-  public BaseApplier(IStateMachine stateMachine) {
+  public BaseApplier(IStateMachine stateMachine, RaftMember member) {
     this.stateMachine = stateMachine;
+    this.member = member;
   }
 
   @TestOnly
@@ -53,6 +57,8 @@ public class BaseApplier implements LogApplier {
         if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           e.setException(new ConsensusException(status.message + ":" + status.code));
         }
+      } else if (e instanceof ConfigChangeEntry) {
+        member.applyConfigChange(((ConfigChangeEntry) e));
       }
     } catch (Exception ex) {
       e.setException(ex);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
index 2b5d92d390..7b5f9f5dc2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
@@ -50,7 +50,6 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
   protected RaftMember member;
   protected VotingEntry log;
   protected Peer directReceiver;
-  protected int quorumSize;
 
   public AppendNodeEntryHandler() {}
 
@@ -150,7 +149,4 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
     this.directReceiver = follower;
   }
 
-  public void setQuorumSize(int quorumSize) {
-    this.quorumSize = quorumSize;
-  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index 44939e9f61..07f3bc3929 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -187,6 +187,26 @@ public class LogDispatcher {
     return new DispatcherThread(node, logBlockingQueue);
   }
 
+  public void applyNewNodes() {
+    allNodes = newNodes;
+    newNodes = null;
+
+    List<Peer> nodesToRemove = new ArrayList<>();
+    for (Entry<Peer, ExecutorService> entry : executorServices.entrySet()) {
+      if (!allNodes.contains(entry.getKey())) {
+        nodesToRemove.add(entry.getKey());
+      }
+    }
+    for (Peer peer : nodesToRemove) {
+      ExecutorService executorService = executorServices.remove(peer);
+      executorService.shutdownNow();
+      nodesRate.remove(peer);
+      nodesRateLimiter.remove(peer);
+      nodesEnabled.remove(peer);
+      nodesLogQueuesMap.remove(peer);
+    }
+  }
+
   protected class DispatcherThread implements Runnable {
 
     Peer receiver;
@@ -312,12 +332,11 @@ public class LogDispatcher {
     }
 
     public AppendNodeEntryHandler getAppendNodeEntryHandler(
-        VotingEntry log, Peer node, int quorumSize) {
+        VotingEntry log, Peer node) {
       AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
       handler.setDirectReceiver(node);
       handler.setLog(log);
       handler.setMember(member);
-      handler.setQuorumSize(quorumSize);
       return handler;
     }
 
@@ -329,7 +348,7 @@ public class LogDispatcher {
         singleEntryHandlers = new ArrayList<>(batch.size());
         for (VotingEntry sendLogRequest : batch) {
           AppendNodeEntryHandler handler =
-              getAppendNodeEntryHandler(sendLogRequest, receiver, sendLogRequest.getQuorumSize());
+              getAppendNodeEntryHandler(sendLogRequest, receiver);
           singleEntryHandlers.add(handler);
         }
       }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
index ddf771a41b..befe8ef655 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
@@ -19,10 +19,12 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.manager;
 
+import java.util.function.Consumer;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
+import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
 import org.apache.iotdb.consensus.natraft.protocol.log.applier.LogApplier;
 import org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.StableEntryManager;
 import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.DirectorySnapshot;
@@ -45,8 +47,9 @@ public class DirectorySnapshotRaftLogManager extends RaftLogManager {
       LogApplier applier,
       String name,
       IStateMachine stateMachine,
-      RaftConfig config) {
-    super(stableEntryManager, applier, name, stateMachine, config);
+      RaftConfig config,
+      Consumer<List<Entry>> unappliedEntryExaminer) {
+    super(stableEntryManager, applier, name, stateMachine, config, unappliedEntryExaminer);
   }
 
   @Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index 6c1be21199..3fdcf963d2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.manager;
 
+import java.util.function.Consumer;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.consensus.IStateMachine;
@@ -111,7 +112,8 @@ public abstract class RaftLogManager {
       LogApplier applier,
       String name,
       IStateMachine stateMachine,
-      RaftConfig config) {
+      RaftConfig config,
+      Consumer<List<Entry>> unappliedEntryExaminer) {
     this.logApplier = applier;
     this.name = name;
     this.stateMachine = stateMachine;
@@ -119,7 +121,7 @@ public abstract class RaftLogManager {
     this.config = config;
 
     initConf();
-    initEntries();
+    initEntries(unappliedEntryExaminer);
 
     this.blockedUnappliedLogList = new CopyOnWriteArrayList<>();
 
@@ -154,9 +156,11 @@ public abstract class RaftLogManager {
     }
   }
 
-  private void initEntries() {
+  private void initEntries(Consumer<List<Entry>> unappliedEntryExaminer) {
     LogManagerMeta meta = stableEntryManager.getMeta();
     List<Entry> allEntriesAfterAppliedIndex = stableEntryManager.getAllEntriesAfterAppliedIndex();
+    unappliedEntryExaminer.accept(allEntriesAfterAppliedIndex);
+
     entries = new ArrayList<>();
     if (!allEntriesAfterAppliedIndex.isEmpty()) {
       entries.addAll(allEntriesAfterAppliedIndex);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index c8dc1d2933..4df1d4faf2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -206,16 +206,6 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     return systemDir + File.separator + groupId + File.separator + "raftLog" + File.separator;
   }
 
-  @TestOnly
-  String getLogDir() {
-    return logDir;
-  }
-
-  @TestOnly
-  File getMetaFile() {
-    return metaFile;
-  }
-
   /** for log tools */
   @Override
   public LogManagerMeta getMeta() {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
index 9b22f4cc00..08793449fd 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
@@ -179,6 +179,13 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
     resultHandler.onComplete(member.requestCommitIndex());
   }
 
+  @Override
+  public void forceElection(TConsensusGroupId groupId, AsyncMethodCallback<TSStatus> resultHandler)
+      throws TException {
+    RaftMember member = getMember(groupId);
+    resultHandler.onComplete(member.forceElection());
+  }
+
   @Override
   public void readFile(
       String filePath, long offset, int length, AsyncMethodCallback<ByteBuffer> resultHandler) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java
index 94a30dd51e..dc1ec45817 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java
@@ -28,4 +28,5 @@ public class NodeUtils {
     nodeUnion.addAll(newNodes);
     return nodeUnion;
   }
+
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/StatusUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/StatusUtils.java
index d7f0323627..ba01f7983f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/StatusUtils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/StatusUtils.java
@@ -21,7 +21,12 @@ package org.apache.iotdb.consensus.natraft.utils;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse.Builder;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient.Factory;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.checkerframework.checker.units.qual.C;
 
 public class StatusUtils {
 
@@ -90,4 +95,17 @@ public class StatusUtils {
     newStatus.setRedirectNode(redirectedNode);
     return newStatus;
   }
+
+  /** Maps a TSStatus to a response: success on SUCCESS_STATUS, else a ConsensusException. */
+  public static ConsensusGenericResponse toGenericResponse(TSStatus status) {
+    Builder builder = ConsensusGenericResponse.newBuilder();
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      builder.setSuccess(true);
+    } else {
+      builder.setSuccess(false);
+      builder.setException(
+          new ConsensusException(String.format("%d:%s", status.getCode(), status.getMessage())));
+    }
+    return builder.build(); // the populated builder; a fresh one would drop the result above
+  }
 }
diff --git a/thrift-raft/src/main/thrift/raft.thrift b/thrift-raft/src/main/thrift/raft.thrift
index a518a239e0..e2e6fa7e80 100644
--- a/thrift-raft/src/main/thrift/raft.thrift
+++ b/thrift-raft/src/main/thrift/raft.thrift
@@ -166,14 +166,16 @@ service RaftService {
   void ping()
 
   /**
-    * Ask the leader for its commit index, used to check whether the node has caught up with the
-    * leader.
-    **/
-    RequestCommitIndexResponse requestCommitIndex(1:common.TConsensusGroupId groupId)
-
-    /**
-      * Read a chunk of a file from the client. If the remaining of the file does not have enough
-      * bytes, only the remaining will be returned.
-      **/
-      binary readFile(1:string filePath, 2:i64 offset, 3:i32 length)
+   * Ask the leader for its commit index, used to check whether the node has caught up with the
+   * leader.
+   **/
+   RequestCommitIndexResponse requestCommitIndex(1:common.TConsensusGroupId groupId)
+
+   /**
+   * Read a chunk of a file from the client. If the remaining of the file does not have enough
+   * bytes, only the remaining will be returned.
+   **/
+   binary readFile(1:string filePath, 2:i64 offset, 3:i32 length)
+
+   common.TSStatus forceElection(1:common.TConsensusGroupId groupId)
 }
\ No newline at end of file