You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/03/01 03:09:50 UTC

[iotdb] branch native_raft updated: temp save

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 96bb581305 temp save
96bb581305 is described below

commit 96bb58130552866561de859c24e75ed05350636b
Author: Tian Jiang <jt...@163.com>
AuthorDate: Wed Mar 1 11:11:18 2023 +0800

    temp save
---
 .../iotdb/consensus/natraft/RaftConsensus.java     |   3 +-
 .../consensus/natraft/protocol/RaftMember.java     | 325 +++++++++++++--------
 .../consensus/natraft/protocol/log/Entry.java      |   3 +-
 .../natraft/protocol/log/VotingEntry.java          | 183 ++++++++++++
 .../consensus/natraft/protocol/log/VotingLog.java  | 119 --------
 .../log/dispatch/AppendNodeEntryHandler.java       |  28 +-
 .../protocol/log/dispatch/LogDispatcher.java       | 118 +++++---
 .../protocol/log/dispatch/VotingLogList.java       | 106 ++++---
 .../log/dispatch/flowcontrol/FlowBalancer.java     |   4 +-
 .../protocol/log/logtype/ConfigChangeEntry.java    |  80 +++++
 .../protocol/log/sequencing/LogSequencer.java      |   7 +-
 .../log/sequencing/LogSequencerFactory.java        |   2 +-
 .../log/sequencing/SynchronousSequencer.java       |  89 +-----
 .../iotdb/consensus/natraft/utils/LogUtils.java    |  81 +++++
 .../iotdb/consensus/natraft/utils/NodeUtils.java   |  18 ++
 15 files changed, 727 insertions(+), 439 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
index 03ff338c79..416bf1dca3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
@@ -126,6 +126,7 @@ public class RaftConsensus implements IConsensus {
                   config,
                   new Peer(consensusGroupId, thisNodeId, thisNode),
                   new ArrayList<>(),
+                  null,
                   consensusGroupId,
                   registry.apply(consensusGroupId),
                   clientManager);
@@ -204,7 +205,7 @@ public class RaftConsensus implements IConsensus {
           }
           RaftMember impl =
               new RaftMember(
-                  path, config, thisPeer, peers, groupId, registry.apply(groupId), clientManager);
+                  path, config, thisPeer, peers, null, groupId, registry.apply(groupId), clientManager);
           impl.start();
           return impl;
         });
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index e0019b89c5..b6fe3de5e5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -1,27 +1,28 @@
 /*
 
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-
-
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+
+
+ */
 
 package org.apache.iotdb.consensus.natraft.protocol;
 
+import java.util.Collections;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
@@ -47,7 +48,7 @@ import org.apache.iotdb.consensus.natraft.protocol.heartbeat.HeartbeatReqHandler
 import org.apache.iotdb.consensus.natraft.protocol.heartbeat.HeartbeatThread;
 import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
 import org.apache.iotdb.consensus.natraft.protocol.log.LogParser;
-import org.apache.iotdb.consensus.natraft.protocol.log.VotingLog;
+import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
 import org.apache.iotdb.consensus.natraft.protocol.log.appender.BlockingLogAppender;
 import org.apache.iotdb.consensus.natraft.protocol.log.appender.LogAppender;
 import org.apache.iotdb.consensus.natraft.protocol.log.appender.LogAppenderFactory;
@@ -56,8 +57,10 @@ import org.apache.iotdb.consensus.natraft.protocol.log.applier.BaseApplier;
 import org.apache.iotdb.consensus.natraft.protocol.log.catchup.CatchUpManager;
 import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.LogDispatcher;
 import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.VotingLogList;
+import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.VotingLogList.AcceptedType;
 import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.FlowBalancer;
 import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.FlowMonitorManager;
+import org.apache.iotdb.consensus.natraft.protocol.log.logtype.ConfigChangeEntry;
 import org.apache.iotdb.consensus.natraft.protocol.log.logtype.RequestEntry;
 import org.apache.iotdb.consensus.natraft.protocol.log.manager.CommitLogCallback;
 import org.apache.iotdb.consensus.natraft.protocol.log.manager.CommitLogTask;
@@ -69,6 +72,8 @@ import org.apache.iotdb.consensus.natraft.protocol.log.sequencing.LogSequencerFa
 import org.apache.iotdb.consensus.natraft.protocol.log.sequencing.SynchronousSequencer;
 import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.DirectorySnapshot;
 import org.apache.iotdb.consensus.natraft.utils.IOUtils;
+import org.apache.iotdb.consensus.natraft.utils.LogUtils;
+import org.apache.iotdb.consensus.natraft.utils.NodeUtils;
 import org.apache.iotdb.consensus.natraft.utils.Response;
 import org.apache.iotdb.consensus.natraft.utils.StatusUtils;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
@@ -108,7 +113,6 @@ public class RaftMember {
   private static final Logger logger = LoggerFactory.getLogger(RaftMember.class);
 
   private RaftConfig config;
-  private boolean enableWeakAcceptance;
   protected static final LogAppenderFactory appenderFactory = new BlockingLogAppender.Factory();
 
   protected static final LogSequencerFactory SEQUENCER_FACTORY = new SynchronousSequencer.Factory();
@@ -119,7 +123,6 @@ public class RaftMember {
   private static final String MSG_NO_LEADER_COMMIT_INDEX =
       "{}: Cannot request commit index from {}";
   private static final String MSG_NO_LEADER_IN_SYNC = "{}: No leader is found when synchronizing";
-  public static final String MSG_LOG_IS_ACCEPTED = "{}: log {} is accepted";
 
   /**
    * when the leader of this node changes, the condition will be notified so other threads that wait
@@ -127,17 +130,22 @@ public class RaftMember {
    */
   private final Object waitLeaderCondition = new Object();
 
-  /** the lock is to make sure that only one thread can apply snapshot at the same time */
+  /**
+   * the lock is to make sure that only one thread can apply snapshot at the same time
+   */
   private final Lock snapshotApplyLock = new ReentrantLock();
   /**
    * when the commit progress is updated by a heartbeat, this object is notified so that we may know
    * if this node is up-to-date with the leader, and whether the given consistency is reached
    */
-  private Object syncLock = new Object();
+  private final Object syncLock = new Object();
 
   protected Peer thisNode;
-  /** the nodes that belong to the same raft group as thisNode. */
+  /**
+   * the nodes that belong to the same raft group as thisNode.
+   */
   protected List<Peer> allNodes;
+  protected volatile List<Peer> newNodes;
 
   protected ConsensusGroupId groupId;
   protected String name;
@@ -145,7 +153,9 @@ public class RaftMember {
 
   protected RaftStatus status = new RaftStatus();
 
-  /** the raft logs are all stored and maintained in the log manager */
+  /**
+   * the raft logs are all stored and maintained in the log manager
+   */
   protected RaftLogManager logManager;
 
   protected HeartbeatThread heartbeatThread;
@@ -157,11 +167,6 @@ public class RaftMember {
    * candidates for weak consistency reads and provide snapshots for the new data holders
    */
   volatile boolean readOnly = false;
-  /**
-   * lastLogIndex when generating the previous member report, to show the log ingestion rate of the
-   * member by comparing it with the current last log index.
-   */
-  long lastReportedLogIndex;
 
   /**
    * client manager that provides reusable Thrift clients to connect to other RaftMembers and
@@ -170,12 +175,15 @@ public class RaftMember {
   protected IClientManager<TEndPoint, AsyncRaftServiceClient> clientManager;
 
   protected CatchUpManager catchUpManager;
-  /** a thread pool that is used to do commit log tasks asynchronous in heartbeat thread */
+  /**
+   * a thread pool that is used to do commit log tasks asynchronous in heartbeat thread
+   */
   private ExecutorService commitLogPool;
 
   /**
    * logDispatcher buff the logs orderly according to their log indexes and send them sequentially,
-   * which avoids the followers receiving out-of-order logs, forcing them to wait for previous logs.
+   * which avoids the followers receiving out-of-order logs, forcing them to wait for previous
+   * logs.
    */
   private volatile LogDispatcher logDispatcher;
 
@@ -191,6 +199,7 @@ public class RaftMember {
       RaftConfig config,
       Peer thisNode,
       List<Peer> allNodes,
+      List<Peer> newNodes,
       ConsensusGroupId groupId,
       IStateMachine stateMachine,
       IClientManager<TEndPoint, AsyncRaftServiceClient> clientManager) {
@@ -205,6 +214,7 @@ public class RaftMember {
     } else {
       persistConfiguration();
     }
+    this.newNodes = newNodes;
 
     this.groupId = groupId;
     this.name =
@@ -223,9 +233,9 @@ public class RaftMember {
             name,
             stateMachine,
             config);
-    this.votingLogList = new VotingLogList(allNodes.size() / 2, this);
+    this.votingLogList = new VotingLogList(this);
     this.logAppender = appenderFactory.create(this, config);
-    this.logSequencer = SEQUENCER_FACTORY.create(this, logManager, config);
+    this.logSequencer = SEQUENCER_FACTORY.create(this, config);
     this.logDispatcher = new LogDispatcher(this, config);
     this.heartbeatReqHandler = new HeartbeatReqHandler(this);
     this.electionReqHandler = new ElectionReqHandler(this);
@@ -305,7 +315,7 @@ public class RaftMember {
   }
 
   private void initConfig() {
-    this.enableWeakAcceptance = config.isEnableWeakAcceptance();
+    votingLogList.setEnableWeakAcceptance(config.isEnableWeakAcceptance());
   }
 
   public void initPeerMap() {
@@ -451,7 +461,9 @@ public class RaftMember {
     return appendEntriesInternal(request);
   }
 
-  /** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
+  /**
+   * Similar to appendEntry, while the incoming load is batch of logs instead of a single log.
+   */
   private AppendEntryResult appendEntriesInternal(AppendEntriesRequest request)
       throws UnknownLogTypeException {
     logger.debug("{} received an AppendEntriesRequest", name);
@@ -563,11 +575,7 @@ public class RaftMember {
     return Objects.equals(status.leader.get(), thisNode);
   }
 
-  public TSStatus processRequest(IConsensusRequest request) {
-    if (readOnly) {
-      return StatusUtils.NODE_READ_ONLY;
-    }
-
+  private TSStatus ensureLeader(IConsensusRequest request) {
     if (getLeader() == null) {
       waitLeader();
     }
@@ -576,10 +584,25 @@ public class RaftMember {
       Peer leader = getLeader();
       if (leader == null) {
         return StatusUtils.NO_LEADER;
-      } else {
+      } else if (request != null) {
         return forwardRequest(request, leader.getEndpoint(), leader.getGroupId());
+      } else {
+        return new TSStatus().setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())
+            .setRedirectNode(leader.getEndpoint());
       }
     }
+    return StatusUtils.OK;
+  }
+
+  public TSStatus processRequest(IConsensusRequest request) {
+    if (readOnly) {
+      return StatusUtils.NODE_READ_ONLY;
+    }
+
+    TSStatus tsStatus = ensureLeader(request);
+    if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return tsStatus;
+    }
 
     logger.debug("{}: Processing request {}", name, request);
     Entry entry = new RequestEntry(request);
@@ -593,37 +616,16 @@ public class RaftMember {
     }
 
     // assign term and index to the new log and append it
-    VotingLog sendLogRequest = logSequencer.sequence(entry);
+    VotingEntry votingEntry = logSequencer.sequence(entry);
     if (config.isUseFollowerLoadBalance()) {
       FlowMonitorManager.INSTANCE.report(thisNode, entry.estimateSize());
     }
 
-    if (sendLogRequest == null) {
+    if (votingEntry == null) {
       return StatusUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT);
     }
 
-    try {
-      AppendLogResult appendLogResult =
-          waitAppendResult(sendLogRequest, sendLogRequest.getQuorumSize());
-      switch (appendLogResult) {
-        case WEAK_ACCEPT:
-          return includeLogNumbersInStatus(
-              StatusUtils.getStatus(TSStatusCode.WEAKLY_ACCEPTED), entry);
-        case OK:
-          waitApply(entry);
-          return includeLogNumbersInStatus(StatusUtils.OK.deepCopy(), entry);
-        case TIME_OUT:
-          logger.debug("{}: log {} timed out...", name, entry);
-          break;
-        case LEADERSHIP_STALE:
-          // abort the appending, the new leader will fix the local logs by catch-up
-        default:
-          break;
-      }
-    } catch (LogExecutionException e) {
-      return handleLogExecutionException(entry, IOUtils.getRootCause(e));
-    }
-    return StatusUtils.getStatus(TSStatusCode.TIME_OUT);
+    return waitForEntryResult(votingEntry);
   }
 
   protected void waitApply(Entry entry) throws LogExecutionException {
@@ -650,23 +652,16 @@ public class RaftMember {
         getRaftGroupId() + "-" + entry.getCurrLogIndex() + "-" + entry.getCurrLogTerm());
   }
 
-  protected AppendLogResult waitAppendResult(VotingLog log, int quorumSize) {
+  protected AppendLogResult waitAppendResult(VotingEntry votingEntry) {
     // wait for the followers to vote
-    int totalAccepted = votingLogList.totalAcceptedNodeNum(log);
-    int weaklyAccepted = log.getWeaklyAcceptedNodes().size();
-    int stronglyAccepted = totalAccepted - weaklyAccepted;
-
-    if (log.getEntry().getCurrLogIndex() == Long.MIN_VALUE
-        || ((stronglyAccepted < quorumSize
-            || (!enableWeakAcceptance || (totalAccepted < quorumSize)) && !log.isHasFailed()))) {
-      waitAppendResultLoop(log, quorumSize);
+
+    AcceptedType acceptedType = votingLogList.computeAcceptedType(votingEntry);
+    if (votingLogList.computeAcceptedType(votingEntry) == AcceptedType.NOT_ACCEPTED) {
+      acceptedType = waitAppendResultLoop(votingEntry);
     }
-    totalAccepted = votingLogList.totalAcceptedNodeNum(log);
-    weaklyAccepted = log.getWeaklyAcceptedNodes().size();
-    stronglyAccepted = totalAccepted - weaklyAccepted;
 
     // a node has a larger status.term than the local node, so this node is no longer a valid leader
-    if (status.term.get() != log.getEntry().getCurrLogTerm()) {
+    if (status.term.get() != votingEntry.getEntry().getCurrLogTerm()) {
       return AppendLogResult.LEADERSHIP_STALE;
     }
     // the node knows it is no longer the leader from other requests
@@ -674,18 +669,17 @@ public class RaftMember {
       return AppendLogResult.LEADERSHIP_STALE;
     }
 
-    if (totalAccepted >= quorumSize && stronglyAccepted < quorumSize) {
+    if (acceptedType == AcceptedType.WEAKLY_ACCEPTED) {
       return AppendLogResult.WEAK_ACCEPT;
     }
 
-    // cannot get enough agreements within a certain amount of time
-    if (totalAccepted < quorumSize) {
-      logger.info("{} failed because {} < {}", log, totalAccepted, quorumSize);
-      return AppendLogResult.TIME_OUT;
+    if (acceptedType == AcceptedType.STRONGLY_ACCEPTED) {
+      return AppendLogResult.OK;
     }
 
-    // voteCounter has counted down to zero
-    return AppendLogResult.OK;
+    // cannot get enough agreements within a certain amount of time
+    logger.info("{} failed", votingEntry);
+    return AppendLogResult.TIME_OUT;
   }
 
   protected TSStatus handleLogExecutionException(Object log, Throwable cause) {
@@ -705,23 +699,22 @@ public class RaftMember {
    * one follower tells the node that it is no longer a valid leader, or a timeout is triggered.
    */
   @SuppressWarnings({"java:S2445"}) // safe synchronized
-  private void waitAppendResultLoop(VotingLog log, int quorumSize) {
-    int totalAccepted = votingLogList.totalAcceptedNodeNum(log);
-    int weaklyAccepted = log.getWeaklyAcceptedNodes().size();
-    int stronglyAccepted = totalAccepted - weaklyAccepted;
-    long nextTimeToPrint = 5000;
+  private AcceptedType waitAppendResultLoop(VotingEntry log) {
 
+    long nextTimeToPrint = 5000;
     long waitStart = System.nanoTime();
     long alreadyWait = 0;
 
     String threadBaseName = Thread.currentThread().getName();
+    if (logger.isDebugEnabled()) {
+      Thread.currentThread()
+          .setName(threadBaseName + "-waiting-" + log.getEntry().getCurrLogIndex());
+    }
     long waitTime = 1;
+    AcceptedType acceptedType = votingLogList.computeAcceptedType(log);
     synchronized (log.getEntry()) {
-      while (log.getEntry().getCurrLogIndex() == Long.MIN_VALUE
-          || (stronglyAccepted < quorumSize
-              && (!(enableWeakAcceptance && (totalAccepted >= quorumSize))
-                  && alreadyWait < config.getWriteOperationTimeoutMS()
-                  && !log.isHasFailed()))) {
+      while (votingLogList.computeAcceptedType(log) == AcceptedType.NOT_ACCEPTED
+          && alreadyWait < config.getWriteOperationTimeoutMS()) {
         try {
           log.getEntry().wait(waitTime);
         } catch (InterruptedException e) {
@@ -739,9 +732,6 @@ public class RaftMember {
               alreadyWait);
           nextTimeToPrint *= 2;
         }
-        totalAccepted = votingLogList.totalAcceptedNodeNum(log);
-        weaklyAccepted = log.getWeaklyAcceptedNodes().size();
-        stronglyAccepted = totalAccepted - weaklyAccepted;
       }
     }
     if (logger.isDebugEnabled()) {
@@ -755,6 +745,7 @@ public class RaftMember {
           log.getWeaklyAcceptedNodes(),
           alreadyWait);
     }
+    return acceptedType;
   }
 
   public ConsensusWriteResponse executeForwardedRequest(IConsensusRequest request) {
@@ -790,32 +781,31 @@ public class RaftMember {
     }
   }
 
-  public boolean syncLeader(CheckConsistency checkConsistency) throws CheckConsistencyException {
+  public void syncLeader(CheckConsistency checkConsistency) throws CheckConsistencyException {
     if (status.role == RaftRole.LEADER) {
-      return true;
+      return;
     }
     waitLeader();
     if (status.leader.get() == null || status.leader.get() == null) {
       // the leader has not been elected, we must assume the node falls behind
       logger.warn(MSG_NO_LEADER_IN_SYNC, name);
-      return false;
+      return;
     }
     if (status.role == RaftRole.LEADER) {
-      return true;
+      return;
     }
     logger.debug("{}: try synchronizing with the leader {}", name, status.leader.get());
-    return waitUntilCatchUp(checkConsistency);
+    waitUntilCatchUp(checkConsistency);
   }
 
   /**
    * Request the leader's commit index and wait until the local commit index becomes not less than
    * it.
    *
-   * @return true if this node has caught up before timeout, false otherwise
    * @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
-   *     value after timeout
+   *                                   value after timeout
    */
-  protected boolean waitUntilCatchUp(CheckConsistency checkConsistency)
+  protected void waitUntilCatchUp(CheckConsistency checkConsistency)
       throws CheckConsistencyException {
     long leaderCommitId = Long.MIN_VALUE;
     RequestCommitIndexResponse response;
@@ -826,7 +816,7 @@ public class RaftMember {
       tryUpdateCommitIndex(
           response.getTerm(), response.getCommitLogIndex(), response.getCommitLogTerm());
 
-      return syncLocalApply(leaderCommitId, true);
+      syncLocalApply(leaderCommitId, true);
     } catch (TException e) {
       logger.error(MSG_NO_LEADER_COMMIT_INDEX, name, status.leader.get(), e);
     } catch (InterruptedException e) {
@@ -838,17 +828,15 @@ public class RaftMember {
       }
     }
     logger.debug("Start to sync with leader, leader commit id is {}", leaderCommitId);
-    return false;
   }
 
   /**
    * sync local applyId to leader commitId
    *
    * @param leaderCommitId leader commit id
-   * @param fastFail if enabled, when log differ too much, return false directly.
-   * @return true if leaderCommitId <= localAppliedId
+   * @param fastFail       if enabled, return immediately when the local log lags too far behind.
    */
-  public boolean syncLocalApply(long leaderCommitId, boolean fastFail) {
+  public void syncLocalApply(long leaderCommitId, boolean fastFail) {
     long startTime = System.currentTimeMillis();
     long waitedTime = 0;
     long localAppliedId;
@@ -856,7 +844,7 @@ public class RaftMember {
     if (fastFail && leaderCommitId - logManager.getAppliedIndex() > config.getMaxSyncLogLag()) {
       logger.info(
           "{}: The raft log of this member is too backward to provide service directly.", name);
-      return false;
+      return;
     }
 
     while (waitedTime < config.getSyncLeaderMaxWaitMs()) {
@@ -870,7 +858,7 @@ public class RaftMember {
             logger.debug(
                 "{}: synchronized to target index {} after {}ms", name, leaderCommitId, waitedTime);
           }
-          return true;
+          return;
         }
         // wait for next heartbeat to catch up
         // the local node will not perform a commit here according to the leaderCommitId because
@@ -889,10 +877,11 @@ public class RaftMember {
         name,
         leaderCommitId,
         waitedTime);
-    return false;
   }
 
-  /** Wait until the leader of this node becomes known or time out. */
+  /**
+   * Wait until the leader of this node becomes known or time out.
+   */
   public void waitLeader() {
     long startTime = System.currentTimeMillis();
     while (status.leader.get() == null || status.leader.get() == null) {
@@ -929,7 +918,9 @@ public class RaftMember {
     return handler.getResult(config.getConnectionTimeoutInMS());
   }
 
-  /** @return true if there is a log whose index is "index" and term is "term", false otherwise */
+  /**
+   * @return true if there is a log whose index is "index" and term is "term", false otherwise
+   */
   public boolean matchLog(long index, long term) {
     boolean matched = logManager.matchTerm(term, index);
     logger.debug("Log {}-{} matched: {}", index, term, matched);
@@ -939,14 +930,14 @@ public class RaftMember {
   /**
    * Forward a non-query plan to a node using the default client.
    *
-   * @param plan a non-query plan
-   * @param node cannot be the local node
+   * @param plan    a non-query plan
+   * @param node    cannot be the local node
    * @param groupId must be set for data group communication, set to null for meta group
-   *     communication
+   *                communication
    * @return a TSStatus indicating if the forwarding is successful.
    */
   public TSStatus forwardRequest(IConsensusRequest plan, TEndPoint node, ConsensusGroupId groupId) {
-    if (node == null || node.equals(thisNode)) {
+    if (node == null || node.equals(thisNode.getEndpoint())) {
       logger.debug("{}: plan {} has no where to be forwarded", name, plan);
       return StatusUtils.NO_LEADER;
     }
@@ -957,7 +948,7 @@ public class RaftMember {
     if (status.getCode() == TSStatusCode.TIME_OUT.getStatusCode()
         && (groupId == null || groupId.equals(getRaftGroupId()))
         && (this.status.leader.get() != null)
-        && this.status.leader.get().equals(node)) {
+        && this.status.leader.get().getEndpoint().equals(node)) {
       // leader is down, trigger a new election by resetting heartbeat
       heartbeatThread.setLastHeartbeatReceivedTime(-1);
       this.status.leader.set(null);
@@ -969,7 +960,7 @@ public class RaftMember {
   /**
    * Forward a non-query plan to "receiver" using "client".
    *
-   * @param plan a non-query plan
+   * @param plan    a non-query plan
    * @param groupId to determine which DataGroupMember of "receiver" will process the request.
    * @return a TSStatus indicating if the forwarding is successful.
    */
@@ -1105,8 +1096,8 @@ public class RaftMember {
     return logDispatcher;
   }
 
-  public VotingLog buildVotingLog(Entry e) {
-    return new VotingLog(e, allNodes.size(), null, allNodes.size() / 2, config);
+  public VotingEntry buildVotingLog(Entry e) {
+    return new VotingEntry(e, null, allNodes, newNodes, config);
   }
 
   public HeartbeatThread getHeartbeatThread() {
@@ -1145,6 +1136,10 @@ public class RaftMember {
     return allNodes;
   }
 
+  public List<Peer> getNewNodes() {
+    return newNodes;
+  }
+
   public AsyncRaftServiceClient getHeartbeatClient(TEndPoint node) {
     try {
       return clientManager.borrowClient(node);
@@ -1162,4 +1157,82 @@ public class RaftMember {
       return null;
     }
   }
+
+  /**
+   * Leader-side configuration change: appends a ConfigChangeEntry (old nodes -> newConfig),
+   * offers it to the dispatcher, calls catchUp for the added peers and waits for the result.
+   * Returns ensureLeader's non-success status, or CONFIGURATION_ERROR while newNodes is set.
+   */
+  public TSStatus changeConfig(List<Peer> newConfig) {
+    TSStatus tsStatus = ensureLeader(null);
+    if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return tsStatus;
+    }
+    List<Peer> oldNodes = new ArrayList<>(allNodes);
+    VotingEntry votingEntry;
+    // acquire before "try" so that "finally" never unlocks a lock that was not taken
+    logManager.getLock().writeLock().lock();
+    try {
+      if (newNodes != null) {
+        return new TSStatus(TSStatusCode.CONFIGURATION_ERROR.getStatusCode()).setMessage(
+            "Last configuration change in progress");
+      }
+      ConfigChangeEntry e = new ConfigChangeEntry(oldNodes, newConfig);
+      Entry lastEntry = logManager.getLastEntry();
+      e.setCurrLogTerm(getStatus().getTerm().get());
+      e.setCurrLogIndex(lastEntry.getCurrLogIndex() + 1);
+      e.setPrevTerm(lastEntry.getCurrLogTerm());
+      logManager.append(Collections.singletonList(e));
+      votingEntry = LogUtils.buildVotingLog(e, this);
+
+      // the field is null here (checked above), so the requested config must be published
+      logDispatcher.setNewNodes(newConfig);
+      this.newNodes = newConfig;
+
+      logDispatcher.offer(votingEntry);
+    } finally {
+      logManager.getLock().writeLock().unlock();
+    }
+
+    for (Peer addedNode : NodeUtils.computeAddedNodes(oldNodes, newConfig)) {
+      catchUp(addedNode, 0);
+    }
+    return waitForEntryResult(votingEntry);
+  }
+
+  /** Waits for the vote on votingEntry and converts the AppendLogResult into a TSStatus. */
+  private TSStatus waitForEntryResult(VotingEntry votingEntry) {
+    try {
+      switch (waitAppendResult(votingEntry)) {
+        case OK:
+          // waitApply runs before the success status is built
+          waitApply(votingEntry.getEntry());
+          return includeLogNumbersInStatus(StatusUtils.OK.deepCopy(), votingEntry.getEntry());
+        case WEAK_ACCEPT:
+          TSStatus weaklyAccepted = StatusUtils.getStatus(TSStatusCode.WEAKLY_ACCEPTED);
+          return includeLogNumbersInStatus(weaklyAccepted, votingEntry.getEntry());
+        case TIME_OUT:
+          logger.debug("{}: log {} timed out...", name, votingEntry.getEntry());
+          break;
+        default:
+          // LEADERSHIP_STALE and anything else: abort, the new leader fixes logs by catch-up
+          break;
+      }
+    } catch (LogExecutionException e) {
+      return handleLogExecutionException(votingEntry.getEntry(), IOUtils.getRootCause(e));
+    }
+    // every path that did not return above is reported as a timeout
+    return StatusUtils.getStatus(TSStatusCode.TIME_OUT);
+  }
+
+  /** Adds newPeer to the group via changeConfig; answers OK right away if it is already in. */
+  public TSStatus addPeer(Peer newPeer) {
+    List<Peer> currentPeers = getAllNodes();
+    if (!currentPeers.contains(newPeer)) {
+      List<Peer> extendedPeers = new ArrayList<>(currentPeers);
+      extendedPeers.add(newPeer);
+      return changeConfig(extendedPeers);
+    }
+    return StatusUtils.OK.deepCopy().setMessage("Peer already exists");
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
index 1a357441d5..1748518078 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
@@ -61,7 +61,8 @@ public abstract class Entry implements Comparable<Entry> {
   public enum Types {
     // DO CHECK LogParser when you add a new type of log
     CLIENT_REQUEST,
-    EMPTY
+    EMPTY,
+    CONFIG_CHANGE
   }
 
   public long getCurrLogIndex() {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java
new file mode 100644
index 0000000000..30be376f74
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.natraft.protocol.log;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
+import org.apache.iotdb.consensus.raft.thrift.AppendEntryRequest;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+/**
+ * An {@link Entry} that is being replicated, together with the request used to send it and the
+ * node lists against which its acceptance is counted.
+ *
+ * <p>{@code currNodes} is the node list the entry is voted on; {@code newNodes} is an additional
+ * node list (or null when there is none). When {@code newNodes} is present, the entry must reach a
+ * quorum in both lists to be considered accepted.
+ *
+ * <p>This class does no locking of its own; callers that add weakly accepted nodes are expected to
+ * provide the necessary synchronization.
+ */
+public class VotingEntry {
+
+  protected Entry entry;
+  // for NB-Raft; stays null unless the follower sliding window is enabled or a weak acceptance
+  // is actually recorded
+  protected Set<Peer> weaklyAcceptedNodes;
+  private AppendEntryRequest appendEntryRequest;
+  private Future<ByteBuffer> serializedLogFuture;
+  protected List<Peer> currNodes;
+  // null when there is no second node list to vote on
+  protected List<Peer> newNodes;
+  // caches a positive result of isStronglyAccepted(Map); never reset to false
+  private boolean isStronglyAccepted;
+
+  /**
+   * @param entry the entry to replicate
+   * @param appendEntryRequest the request that carries the entry
+   * @param currNodes the node list the entry is voted on
+   * @param newNodes a second node list that must also accept the entry, or null
+   * @param config decides whether the weakly-accepted set is created up front
+   */
+  public VotingEntry(
+      Entry entry,
+      AppendEntryRequest appendEntryRequest,
+      List<Peer> currNodes,
+      List<Peer> newNodes,
+      RaftConfig config) {
+    this.entry = entry;
+    if (config.isUseFollowerSlidingWindow()) {
+      weaklyAcceptedNodes = new HashSet<>(
+          currNodes.size() + (newNodes != null ? newNodes.size() : 0));
+    }
+    this.setAppendEntryRequest(appendEntryRequest);
+    this.currNodes = currNodes;
+    this.newNodes = newNodes;
+  }
+
+  /**
+   * Shallow copy: the entry, the weakly-accepted set and both node lists are shared with {@code
+   * another}. The cached strongly-accepted flag is not copied and will be recomputed on demand.
+   */
+  public VotingEntry(VotingEntry another) {
+    this.entry = another.entry;
+    this.weaklyAcceptedNodes = another.weaklyAcceptedNodes;
+    this.setAppendEntryRequest(another.appendEntryRequest);
+    this.setSerializedLogFuture(another.getSerializedLogFuture());
+    this.currNodes = another.currNodes;
+    this.newNodes = another.newNodes;
+  }
+
+  public Entry getEntry() {
+    return entry;
+  }
+
+  public void setEntry(Entry entry) {
+    this.entry = entry;
+  }
+
+  /** @return the nodes that weakly accepted the entry; an empty set when none were recorded */
+  public Set<Peer> getWeaklyAcceptedNodes() {
+    return weaklyAcceptedNodes != null ? weaklyAcceptedNodes : Collections.emptySet();
+  }
+
+  /**
+   * Records that {@code node} weakly accepted the entry.
+   *
+   * <p>The set is created lazily so that this call does not fail with a NullPointerException when
+   * the entry was built without the follower sliding window.
+   */
+  public void addWeaklyAcceptedNodes(Peer node) {
+    if (weaklyAcceptedNodes == null) {
+      weaklyAcceptedNodes = new HashSet<>();
+    }
+    weaklyAcceptedNodes.add(node);
+  }
+
+  @Override
+  public String toString() {
+    // String.valueOf keeps logging safe even if the entry has been set to null
+    return String.valueOf(entry);
+  }
+
+  public AppendEntryRequest getAppendEntryRequest() {
+    return appendEntryRequest;
+  }
+
+  public void setAppendEntryRequest(AppendEntryRequest appendEntryRequest) {
+    this.appendEntryRequest = appendEntryRequest;
+  }
+
+  public Future<ByteBuffer> getSerializedLogFuture() {
+    return serializedLogFuture;
+  }
+
+  public void setSerializedLogFuture(Future<ByteBuffer> serializedLogFuture) {
+    this.serializedLogFuture = serializedLogFuture;
+  }
+
+  /** @return the majority size of {@code currNodes} */
+  public int currNodesQuorumNum() {
+    return currNodes.size() / 2 + 1;
+  }
+
+  /** @return the majority size of {@code newNodes}, or 0 when there are no new nodes */
+  public int newNodesQuorumNum() {
+    return newNodes != null ? newNodes.size() / 2 + 1 : 0;
+  }
+
+  /**
+   * Checks whether a quorum of {@code currNodes} and, if present, a quorum of {@code newNodes}
+   * have strongly accepted this entry. A positive answer is cached.
+   *
+   * @param stronglyAcceptedIndices for each node, the largest index it has strongly accepted
+   */
+  public boolean isStronglyAccepted(Map<Peer, Long> stronglyAcceptedIndices) {
+    if (isStronglyAccepted) {
+      return true;
+    }
+    boolean acceptedByCurrNodes =
+        stronglyAcceptedNumByCurrNodes(stronglyAcceptedIndices) >= currNodesQuorumNum();
+    boolean acceptedByNewNodes =
+        stronglyAcceptedNumByNewNodes(stronglyAcceptedIndices) >= newNodesQuorumNum();
+    if (acceptedByCurrNodes && acceptedByNewNodes) {
+      // only ever flip the cache to true, so a stale concurrent computation cannot undo it
+      isStronglyAccepted = true;
+      return true;
+    }
+    return false;
+  }
+
+  /** @return how many of {@code currNodes} have strongly accepted an index >= this entry's */
+  public int stronglyAcceptedNumByCurrNodes(Map<Peer, Long> stronglyAcceptedIndices) {
+    return countStronglyAccepted(currNodes, stronglyAcceptedIndices);
+  }
+
+  /** @return how many of {@code newNodes} have strongly accepted this entry; 0 without new nodes */
+  public int stronglyAcceptedNumByNewNodes(Map<Peer, Long> stronglyAcceptedIndices) {
+    if (!hasNewNodes()) {
+      return 0;
+    }
+    return countStronglyAccepted(newNodes, stronglyAcceptedIndices);
+  }
+
+  /**
+   * @return how many of {@code currNodes} have weakly accepted this entry without having strongly
+   *     accepted it, so that weak and strong counts never include the same node twice
+   */
+  public int weaklyAcceptedNumByCurrNodes(Map<Peer, Long> stronglyAcceptedIndices) {
+    return countWeaklyAccepted(currNodes, stronglyAcceptedIndices);
+  }
+
+  /**
+   * @return how many of {@code newNodes} have weakly accepted this entry without having strongly
+   *     accepted it; 0 without new nodes
+   */
+  public int weaklyAcceptedNumByNewNodes(Map<Peer, Long> stronglyAcceptedIndices) {
+    if (!hasNewNodes()) {
+      return 0;
+    }
+    // count against the new node list, matching stronglyAcceptedNumByNewNodes
+    return countWeaklyAccepted(newNodes, stronglyAcceptedIndices);
+  }
+
+  public boolean hasNewNodes() {
+    return newNodes != null;
+  }
+
+  /** Counts the nodes in {@code nodes} whose strongly accepted index covers this entry. */
+  private int countStronglyAccepted(List<Peer> nodes, Map<Peer, Long> stronglyAcceptedIndices) {
+    int num = 0;
+    for (Peer node : nodes) {
+      if (stronglyAcceptedIndices.getOrDefault(node, -1L) >= entry.getCurrLogIndex()) {
+        num++;
+      }
+    }
+    return num;
+  }
+
+  /** Counts the nodes in {@code nodes} that accepted this entry weakly but not yet strongly. */
+  private int countWeaklyAccepted(List<Peer> nodes, Map<Peer, Long> stronglyAcceptedIndices) {
+    // the getter substitutes an empty set when nothing was recorded, avoiding a null dereference
+    Set<Peer> weakNodes = getWeaklyAcceptedNodes();
+    int num = 0;
+    for (Peer node : nodes) {
+      if (weakNodes.contains(node)
+          && stronglyAcceptedIndices.getOrDefault(node, -1L) < entry.getCurrLogIndex()) {
+        num++;
+      }
+    }
+    return num;
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingLog.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingLog.java
deleted file mode 100644
index 45022cd328..0000000000
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingLog.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.consensus.natraft.protocol.log;
-
-import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
-import org.apache.iotdb.consensus.raft.thrift.AppendEntryRequest;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.Future;
-
-public class VotingLog {
-
-  protected Entry entry;
-  // for NB-Raft
-  protected Set<Peer> weaklyAcceptedNodes;
-  protected Set<Peer> failedNodes;
-  private boolean hasFailed;
-  private AppendEntryRequest appendEntryRequest;
-  private Future<ByteBuffer> serializedLogFuture;
-  private int quorumSize;
-
-  public VotingLog(
-      Entry entry,
-      int groupSize,
-      AppendEntryRequest appendEntryRequest,
-      int quorumSize,
-      RaftConfig config) {
-    this.entry = entry;
-    failedNodes = new HashSet<>(groupSize);
-    if (config.isUseFollowerSlidingWindow()) {
-      weaklyAcceptedNodes = new HashSet<>(groupSize);
-    }
-    this.setAppendEntryRequest(appendEntryRequest);
-    this.setQuorumSize(quorumSize);
-  }
-
-  public VotingLog(VotingLog another) {
-    this.entry = another.entry;
-    this.weaklyAcceptedNodes = another.weaklyAcceptedNodes;
-    this.failedNodes = another.failedNodes;
-    this.setAppendEntryRequest(another.appendEntryRequest);
-    this.setQuorumSize(another.quorumSize);
-    this.setSerializedLogFuture(another.getSerializedLogFuture());
-  }
-
-  public Entry getEntry() {
-    return entry;
-  }
-
-  public void setEntry(Entry entry) {
-    this.entry = entry;
-  }
-
-  public Set<Peer> getWeaklyAcceptedNodes() {
-    return weaklyAcceptedNodes != null ? weaklyAcceptedNodes : Collections.emptySet();
-  }
-
-  @Override
-  public String toString() {
-    return entry.toString();
-  }
-
-  public Set<Peer> getFailedNodes() {
-    return failedNodes;
-  }
-
-  public boolean isHasFailed() {
-    return hasFailed;
-  }
-
-  public void setHasFailed(boolean hasFailed) {
-    this.hasFailed = hasFailed;
-  }
-
-  public AppendEntryRequest getAppendEntryRequest() {
-    return appendEntryRequest;
-  }
-
-  public void setAppendEntryRequest(AppendEntryRequest appendEntryRequest) {
-    this.appendEntryRequest = appendEntryRequest;
-  }
-
-  public Future<ByteBuffer> getSerializedLogFuture() {
-    return serializedLogFuture;
-  }
-
-  public void setSerializedLogFuture(Future<ByteBuffer> serializedLogFuture) {
-    this.serializedLogFuture = serializedLogFuture;
-  }
-
-  public int getQuorumSize() {
-    return quorumSize;
-  }
-
-  public void setQuorumSize(int quorumSize) {
-    this.quorumSize = quorumSize;
-  }
-}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
index d00f70c43d..2b5d92d390 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
-import org.apache.iotdb.consensus.natraft.protocol.log.VotingLog;
+import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
 
 import org.apache.thrift.TApplicationException;
@@ -48,7 +48,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
   private static final Logger logger = LoggerFactory.getLogger(AppendNodeEntryHandler.class);
 
   protected RaftMember member;
-  protected VotingLog log;
+  protected VotingEntry log;
   protected Peer directReceiver;
   protected int quorumSize;
 
@@ -56,10 +56,6 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
 
   @Override
   public void onComplete(AppendEntryResult response) {
-    if (log.isHasFailed()) {
-      return;
-    }
-
     Peer trueReceiver =
         response.isSetReceiver()
             ? new Peer(
@@ -77,7 +73,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
       member
           .getVotingLogList()
           .onStronglyAccept(
-              log.getEntry().getCurrLogIndex(), log.getEntry().getCurrLogTerm(), trueReceiver);
+              log, trueReceiver);
 
       member.getStatus().getPeerMap().get(trueReceiver).setMatchIndex(response.lastLogIndex);
     } else if (resp > 0) {
@@ -95,7 +91,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
       }
     } else if (resp == RESPONSE_WEAK_ACCEPT) {
       synchronized (log) {
-        log.getWeaklyAcceptedNodes().add(trueReceiver);
+        log.addWeaklyAcceptedNodes(trueReceiver);
         log.notifyAll();
       }
     } else {
@@ -114,7 +110,6 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
             log,
             trueReceiver,
             resp);
-        onFail(trueReceiver);
       }
     }
     // rejected because the receiver's logs are stale or the receiver has no cluster info, just
@@ -140,23 +135,10 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
       logger.warn(
           "{}: Cannot append log {} to {}", member.getName(), log, directReceiver, exception);
     }
-    onFail(directReceiver);
   }
 
-  private void onFail(Peer trueReceiver) {
-    synchronized (log.getEntry()) {
-      log.getFailedNodes().add(trueReceiver);
-      if (log.getFailedNodes().size() > quorumSize) {
-        // quorum members have failed, there is no need to wait for others
-        logger.warn(
-            "{} failed because too many replicas have failed: {}", log, log.getFailedNodes());
-        log.setHasFailed(true);
-        log.getEntry().notifyAll();
-      }
-    }
-  }
 
-  public void setLog(VotingLog log) {
+  public void setLog(VotingEntry log) {
     this.log = log;
   }
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index 6b3787db73..f7f5c7ba10 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
 
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.utils.TestOnly;
@@ -27,7 +30,7 @@ import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
 import org.apache.iotdb.consensus.natraft.client.SyncClientAdaptor;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
-import org.apache.iotdb.consensus.natraft.protocol.log.VotingLog;
+import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
 import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.FlowMonitorManager;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
@@ -62,7 +65,9 @@ public class LogDispatcher {
   private static final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
   protected RaftMember member;
   private RaftConfig config;
-  protected Map<Peer, BlockingQueue<VotingLog>> nodesLogQueuesMap = new HashMap<>();
+  protected List<Peer> allNodes;
+  protected List<Peer> newNodes;
+  protected Map<Peer, BlockingQueue<VotingEntry>> nodesLogQueuesMap = new HashMap<>();
   protected Map<Peer, Boolean> nodesEnabled;
   protected Map<Peer, RateLimiter> nodesRateLimiter = new HashMap<>();
   protected Map<Peer, Double> nodesRate = new HashMap<>();
@@ -82,7 +87,9 @@ public class LogDispatcher {
     if (!queueOrdered) {
       maxBatchSize = 1;
     }
-    createQueueAndBindingThreads();
+    this.allNodes = member.getAllNodes();
+    this.newNodes = member.getNewNodes();
+    createQueueAndBindingThreads(unionNodes());
   }
 
   public void updateRateLimiter() {
@@ -92,33 +99,51 @@ public class LogDispatcher {
     }
   }
 
-  void createQueueAndBindingThreads() {
-    for (Peer node : member.getAllNodes()) {
-      if (!node.equals(member.getThisNode())) {
-        BlockingQueue<VotingLog> logBlockingQueue;
-        logBlockingQueue = new ArrayBlockingQueue<>(config.getMaxNumOfLogsInMem());
-        nodesLogQueuesMap.put(node, logBlockingQueue);
-        nodesRateLimiter.put(node, RateLimiter.create(Double.MAX_VALUE));
-      }
+  /**
+   * Returns every node this dispatcher may need to send entries to: {@code allNodes} alone when
+   * there are no new nodes, otherwise the de-duplicated union of {@code allNodes} and {@code
+   * newNodes}.
+   */
+  private Collection<Peer> unionNodes() {
+    if (newNodes != null) {
+      // a set removes the nodes that appear in both lists
+      Set<Peer> merged = new HashSet<>();
+      merged.addAll(allNodes);
+      merged.addAll(newNodes);
+      return merged;
+    }
+    return allNodes;
+  }
+
+
+  void createQueue(Peer node) {
+    BlockingQueue<VotingEntry> logBlockingQueue;
+    logBlockingQueue = new ArrayBlockingQueue<>(config.getMaxNumOfLogsInMem());
+    nodesLogQueuesMap.put(node, logBlockingQueue);
+    nodesRateLimiter.put(node, RateLimiter.create(Double.MAX_VALUE));
 
     for (int i = 0; i < bindingThreadNum; i++) {
-      for (Entry<Peer, BlockingQueue<VotingLog>> pair : nodesLogQueuesMap.entrySet()) {
-        executorServices
-            .computeIfAbsent(
-                pair.getKey(),
-                n ->
-                    IoTDBThreadPoolFactory.newCachedThreadPool(
-                        "LogDispatcher-"
-                            + member.getName()
-                            + "-"
-                            + pair.getKey().getEndpoint().getIp()
-                            + "-"
-                            + pair.getKey().getEndpoint().getPort()))
-            .submit(newDispatcherThread(pair.getKey(), pair.getValue()));
+      executorServices
+          .computeIfAbsent(
+              node,
+              n -> createPool(node))
+          .submit(newDispatcherThread(node, logBlockingQueue));
+    }
+  }
+
+  /**
+   * Creates the cached thread pool that runs the dispatcher threads of one receiver. The pool is
+   * named "LogDispatcher-" followed by the member name and the receiver's ip, port and node id,
+   * separated by "-".
+   */
+  ExecutorService createPool(Peer node) {
+    StringBuilder poolName = new StringBuilder("LogDispatcher-");
+    poolName.append(member.getName()).append("-");
+    poolName.append(node.getEndpoint().getIp()).append("-");
+    poolName.append(node.getEndpoint().getPort()).append("-");
+    poolName.append(node.getNodeId());
+    return IoTDBThreadPoolFactory.newCachedThreadPool(poolName.toString());
+  }
+
+  void createQueueAndBindingThreads(Collection<Peer> peers) {
+    for (Peer node : peers) {
+      if (!node.equals(member.getThisNode())) {
+       createQueue(node);
       }
     }
+    updateRateLimiter();
   }
 
   @TestOnly
@@ -134,18 +159,18 @@ public class LogDispatcher {
     resultHandlerThread.shutdownNow();
   }
 
-  protected boolean addToQueue(BlockingQueue<VotingLog> nodeLogQueue, VotingLog request) {
+  protected boolean addToQueue(BlockingQueue<VotingEntry> nodeLogQueue, VotingEntry request) {
     return nodeLogQueue.add(request);
   }
 
-  public void offer(VotingLog request) {
+  public void offer(VotingEntry request) {
 
-    for (Entry<Peer, BlockingQueue<VotingLog>> entry : nodesLogQueuesMap.entrySet()) {
+    for (Entry<Peer, BlockingQueue<VotingEntry>> entry : nodesLogQueuesMap.entrySet()) {
       if (nodesEnabled != null && !this.nodesEnabled.getOrDefault(entry.getKey(), false)) {
         continue;
       }
 
-      BlockingQueue<VotingLog> nodeLogQueue = entry.getValue();
+      BlockingQueue<VotingEntry> nodeLogQueue = entry.getValue();
       try {
         boolean addSucceeded = addToQueue(nodeLogQueue, request);
 
@@ -164,18 +189,18 @@ public class LogDispatcher {
     }
   }
 
-  DispatcherThread newDispatcherThread(Peer node, BlockingQueue<VotingLog> logBlockingQueue) {
+  DispatcherThread newDispatcherThread(Peer node, BlockingQueue<VotingEntry> logBlockingQueue) {
     return new DispatcherThread(node, logBlockingQueue);
   }
 
   protected class DispatcherThread implements Runnable {
 
     Peer receiver;
-    private final BlockingQueue<VotingLog> logBlockingDeque;
-    protected List<VotingLog> currBatch = new ArrayList<>();
+    private final BlockingQueue<VotingEntry> logBlockingDeque;
+    protected List<VotingEntry> currBatch = new ArrayList<>();
     private final String baseName;
 
-    protected DispatcherThread(Peer receiver, BlockingQueue<VotingLog> logBlockingDeque) {
+    protected DispatcherThread(Peer receiver, BlockingQueue<VotingEntry> logBlockingDeque) {
       this.receiver = receiver;
       this.logBlockingDeque = logBlockingDeque;
       baseName = "LogDispatcher-" + member.getName() + "-" + receiver;
@@ -189,7 +214,7 @@ public class LogDispatcher {
       try {
         while (!Thread.interrupted()) {
           synchronized (logBlockingDeque) {
-            VotingLog poll = logBlockingDeque.take();
+            VotingEntry poll = logBlockingDeque.take();
             currBatch.add(poll);
             if (maxBatchSize > 1) {
               while (!logBlockingDeque.isEmpty() && currBatch.size() < maxBatchSize) {
@@ -216,7 +241,7 @@ public class LogDispatcher {
     }
 
     protected void serializeEntries() throws InterruptedException {
-      for (VotingLog request : currBatch) {
+      for (VotingEntry request : currBatch) {
 
         request.getAppendEntryRequest().entry = request.getEntry().serialize();
         request.getEntry().setByteSize(request.getAppendEntryRequest().entry.limit());
@@ -224,7 +249,7 @@ public class LogDispatcher {
     }
 
     private void appendEntriesAsync(
-        List<ByteBuffer> logList, AppendEntriesRequest request, List<VotingLog> currBatch) {
+        List<ByteBuffer> logList, AppendEntriesRequest request, List<VotingEntry> currBatch) {
       AsyncMethodCallback<AppendEntryResult> handler = new AppendEntriesHandler(currBatch);
       AsyncRaftServiceClient client = member.getClient(receiver.getEndpoint());
       try {
@@ -240,7 +265,7 @@ public class LogDispatcher {
     }
 
     protected AppendEntriesRequest prepareRequest(
-        List<ByteBuffer> logList, List<VotingLog> currBatch, int firstIndex) {
+        List<ByteBuffer> logList, List<VotingEntry> currBatch, int firstIndex) {
       AppendEntriesRequest request = new AppendEntriesRequest();
 
       request.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
@@ -261,7 +286,7 @@ public class LogDispatcher {
       return request;
     }
 
-    private void sendLogs(List<VotingLog> currBatch) throws TException {
+    private void sendLogs(List<VotingEntry> currBatch) throws TException {
       int logIndex = 0;
       logger.debug(
           "send logs from index {} to {}",
@@ -293,7 +318,7 @@ public class LogDispatcher {
     }
 
     public AppendNodeEntryHandler getAppendNodeEntryHandler(
-        VotingLog log, Peer node, int quorumSize) {
+        VotingEntry log, Peer node, int quorumSize) {
       AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
       handler.setDirectReceiver(node);
       handler.setLog(log);
@@ -306,9 +331,9 @@ public class LogDispatcher {
 
       private final List<AsyncMethodCallback<AppendEntryResult>> singleEntryHandlers;
 
-      private AppendEntriesHandler(List<VotingLog> batch) {
+      private AppendEntriesHandler(List<VotingEntry> batch) {
         singleEntryHandlers = new ArrayList<>(batch.size());
-        for (VotingLog sendLogRequest : batch) {
+        for (VotingEntry sendLogRequest : batch) {
           AppendNodeEntryHandler handler =
               getAppendNodeEntryHandler(sendLogRequest, receiver, sendLogRequest.getQuorumSize());
           singleEntryHandlers.add(handler);
@@ -335,7 +360,16 @@ public class LogDispatcher {
     return nodesRate;
   }
 
-  public Map<Peer, BlockingQueue<VotingLog>> getNodesLogQueuesMap() {
+  public Map<Peer, BlockingQueue<VotingEntry>> getNodesLogQueuesMap() {
     return nodesLogQueuesMap;
   }
+
+  /**
+   * Records the new node list and starts dispatching to the nodes in it that are not yet served by
+   * this dispatcher.
+   *
+   * @param newNodes the new node list, or null when there is none (the same meaning {@code
+   *     unionNodes()} gives to a null list)
+   */
+  public void setNewNodes(List<Peer> newNodes) {
+    this.newNodes = newNodes;
+    if (newNodes == null) {
+      // nothing to dispatch to; only the field needed clearing
+      return;
+    }
+    for (Peer newNode : newNodes) {
+      // Skip this node (as createQueueAndBindingThreads does), nodes of the current list, and
+      // nodes that already own a queue: createQueue would otherwise replace the existing queue
+      // and submit another set of dispatcher threads for the same receiver.
+      if (!newNode.equals(member.getThisNode())
+          && !allNodes.contains(newNode)
+          && !nodesLogQueuesMap.containsKey(newNode)) {
+        createQueue(newNode);
+      }
+    }
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
index 40533616d5..7a097e1f4d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
@@ -19,39 +19,34 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.exception.LogExecutionException;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
-import org.apache.iotdb.consensus.natraft.protocol.log.VotingLog;
+import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
 import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
 public class VotingLogList {
 
   private static final Logger logger = LoggerFactory.getLogger(VotingLogList.class);
-  private int quorumSize;
   private RaftMember member;
   private Map<Peer, Long> stronglyAcceptedIndices = new ConcurrentHashMap<>();
   private AtomicLong newCommitIndex = new AtomicLong(-1);
+  private boolean enableWeakAcceptance = false;
 
-  public VotingLogList(int quorumSize, RaftMember member) {
-    this.quorumSize = quorumSize;
+  public VotingLogList(RaftMember member) {
     this.member = member;
+    stronglyAcceptedIndices.put(member.getThisNode(), Long.MAX_VALUE);
   }
 
-  private boolean tryCommit() {
+  private boolean tryCommit(VotingEntry entry) {
     RaftLogManager logManager = member.getLogManager();
 
-    if (computeNewCommitIndex()
+    if (computeNewCommitIndex(entry)
         && logManager != null
         && newCommitIndex.get() > logManager.getCommitLogIndex()) {
       try {
@@ -65,15 +60,16 @@ public class VotingLogList {
     }
   }
 
-  public boolean computeNewCommitIndex() {
-    List<Entry<Peer, Long>> nodeIndices = new ArrayList<>(stronglyAcceptedIndices.entrySet());
-    if (nodeIndices.size() < quorumSize) {
+  public boolean computeNewCommitIndex(VotingEntry entry) {
+    long currLogIndex = entry.getEntry().getCurrLogIndex();
+    if (newCommitIndex.get() >= currLogIndex) {
+      return false;
+    }
+    if (entry.isStronglyAccepted(stronglyAcceptedIndices)) {
+      return currLogIndex > newCommitIndex.getAndUpdate(ov -> Math.max(ov, currLogIndex));
+    } else {
       return false;
     }
-    nodeIndices.sort(Entry.comparingByValue());
-    Long value = nodeIndices.get(nodeIndices.size() - quorumSize).getValue();
-    long oldValue = newCommitIndex.getAndUpdate(oldV -> Math.max(value, oldV));
-    return value > oldValue;
   }
 
   /**
@@ -81,41 +77,28 @@ public class VotingLogList {
    * all entries whose index <= the accepted entry. If any entry is accepted by a quorum, remove it
    * from the list.
    *
-   * @param index
-   * @param term
-   * @param acceptingNode
    * @return the lastly removed entry if any.
    */
-  public void onStronglyAccept(long index, long term, Peer acceptingNode) {
-    logger.debug("{}-{} is strongly accepted by {}", index, term, acceptingNode);
+  public void onStronglyAccept(VotingEntry entry, Peer acceptingNode) {
+    logger.debug("{} is strongly accepted by {}", entry, acceptingNode);
+    long currLogIndex = entry.getEntry().getCurrLogIndex();
 
     Long newIndex =
         stronglyAcceptedIndices.compute(
             acceptingNode,
             (nid, oldIndex) -> {
               if (oldIndex == null) {
-                return index;
+                return currLogIndex;
               } else {
-                if (index > oldIndex) {
-                  return index;
+                if (currLogIndex > oldIndex) {
+                  return currLogIndex;
                 }
                 return oldIndex;
               }
             });
-    if (newIndex == index) {
-      tryCommit();
-    }
-  }
-
-  public int totalAcceptedNodeNum(VotingLog log) {
-    long index = log.getEntry().getCurrLogIndex();
-    int num = log.getWeaklyAcceptedNodes().size();
-    for (Entry<Peer, Long> entry : stronglyAcceptedIndices.entrySet()) {
-      if (entry.getValue() >= index) {
-        num++;
-      }
+    if (newIndex == currLogIndex) {
+      tryCommit(entry);
     }
-    return num;
   }
 
   public String report() {
@@ -123,4 +106,43 @@ public class VotingLogList {
         "Nodes accepted indices: %s, new commitIndex: %d",
         stronglyAcceptedIndices, newCommitIndex.get());
   }
+
+  /**
+   * Classifies how far the given entry has been accepted.
+   *
+   * @param votingEntry the entry to classify
+   * @return {@code NOT_ACCEPTED} when the entry's index is {@code Long.MIN_VALUE} or no quorum is
+   *     reached; {@code STRONGLY_ACCEPTED} when the new commit index has reached the entry's
+   *     index; {@code WEAKLY_ACCEPTED} when weak acceptance is enabled and weak plus strong
+   *     acceptances reach the quorum of the current nodes and of the new nodes
+   */
+  public AcceptedType computeAcceptedType(VotingEntry votingEntry) {
+    long entryIndex = votingEntry.getEntry().getCurrLogIndex();
+    if (entryIndex == Long.MIN_VALUE) {
+      return AcceptedType.NOT_ACCEPTED;
+    }
+    if (newCommitIndex.get() >= entryIndex) {
+      return AcceptedType.STRONGLY_ACCEPTED;
+    }
+    if (!enableWeakAcceptance) {
+      return AcceptedType.NOT_ACCEPTED;
+    }
+
+    int currQuorum = votingEntry.currNodesQuorumNum();
+    int newQuorum = votingEntry.newNodesQuorumNum();
+    int strongByCurr = votingEntry.stronglyAcceptedNumByCurrNodes(stronglyAcceptedIndices);
+    int strongByNew = votingEntry.stronglyAcceptedNumByNewNodes(stronglyAcceptedIndices);
+    int weakByCurr = votingEntry.weaklyAcceptedNumByCurrNodes(stronglyAcceptedIndices);
+    int weakByNew = votingEntry.weaklyAcceptedNumByNewNodes(stronglyAcceptedIndices);
+
+    // both node lists must reach their quorum when weak and strong acceptances are summed
+    boolean currQuorumReached = weakByCurr + strongByCurr >= currQuorum;
+    boolean newQuorumReached = weakByNew + strongByNew >= newQuorum;
+    return currQuorumReached && newQuorumReached
+        ? AcceptedType.WEAKLY_ACCEPTED
+        : AcceptedType.NOT_ACCEPTED;
+  }
+
+  /**
+   * Turns weak acceptance on or off. While it is off, {@code computeAcceptedType} never returns
+   * {@code WEAKLY_ACCEPTED}.
+   */
+  public void setEnableWeakAcceptance(boolean enableWeakAcceptance) {
+    this.enableWeakAcceptance = enableWeakAcceptance;
+  }
+
+  /** Acceptance level of a voting entry, as computed by {@code computeAcceptedType}. */
+  public enum AcceptedType {
+    // the entry's index is Long.MIN_VALUE, or the required quorums have not been reached
+    NOT_ACCEPTED,
+    // the new commit index has reached the entry's index
+    STRONGLY_ACCEPTED,
+    // weak acceptance is enabled and weak plus strong acceptances reach the required quorums
+    WEAKLY_ACCEPTED
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
index 14a1b20eb8..3c838f81b7 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.consensus.natraft.protocol.RaftRole;
-import org.apache.iotdb.consensus.natraft.protocol.log.VotingLog;
+import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
 import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.LogDispatcher;
 
 import org.slf4j.Logger;
@@ -88,7 +88,7 @@ public class FlowBalancer {
     double thisNodeFlow = flowMonitorManager.averageFlow(member.getThisNode(), windowsToUse);
     double assumedFlow = thisNodeFlow * overestimateFactor;
     logger.info("Flow of this node: {}", thisNodeFlow);
-    Map<Peer, BlockingQueue<VotingLog>> nodesLogQueuesMap = logDispatcher.getNodesLogQueuesMap();
+    Map<Peer, BlockingQueue<VotingEntry>> nodesLogQueuesMap = logDispatcher.getNodesLogQueuesMap();
     Map<Peer, Double> nodesRate = logDispatcher.getNodesRate();
 
     // sort followers according to their queue length
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
new file mode 100644
index 0000000000..8f63f3142f
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.natraft.protocol.log.logtype;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
+
+/**
+ * A log entry holding two peer lists: the peers before a configuration change and the peers
+ * after it.
+ */
+public class ConfigChangeEntry extends Entry {
+
+  private List<Peer> oldPeers;
+  private List<Peer> newPeers;
+
+  /**
+   * Creates an entry from the peer lists before and after a change.
+   *
+   * @param oldPeers the peer list before the change
+   * @param newPeers the peer list after the change
+   */
+  public ConfigChangeEntry(List<Peer> oldPeers, List<Peer> newPeers) {
+    this.oldPeers = oldPeers;
+    this.newPeers = newPeers;
+  }
+
+  /**
+   * Serializes this entry as one type byte, the log index, the log term, then the old peer list
+   * and the new peer list, each preceded by its size.
+   *
+   * @return a buffer wrapping the serialized bytes
+   * @throws IllegalStateException if writing to the in-memory stream fails
+   */
+  @Override
+  public ByteBuffer serialize() {
+    ByteArrayOutputStream byteArrayOutputStream =
+        new ByteArrayOutputStream(getDefaultSerializationBufferSize());
+    try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
+      // NOTE(review): the entry is tagged with Types.EMPTY; confirm whether a dedicated type tag
+      // is needed so that a reader can tell it apart from an empty entry
+      dataOutputStream.writeByte((byte) Types.EMPTY.ordinal());
+      dataOutputStream.writeLong(getCurrLogIndex());
+      dataOutputStream.writeLong(getCurrLogTerm());
+
+      writePeers(oldPeers, dataOutputStream);
+      writePeers(newPeers, dataOutputStream);
+    } catch (IOException e) {
+      // not expected for an in-memory stream; fail loudly rather than return a truncated entry
+      throw new IllegalStateException("Failed to serialize ConfigChangeEntry", e);
+    }
+    return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+  }
+
+  /**
+   * Restores the log index, the log term and both peer lists from {@code buffer}.
+   *
+   * <p>The type byte written by {@link #serialize()} is not read here; presumably the caller has
+   * already consumed it -- TODO confirm.
+   *
+   * @param buffer the buffer positioned at the log index
+   */
+  @Override
+  public void deserialize(ByteBuffer buffer) {
+    setCurrLogIndex(buffer.getLong());
+    setCurrLogTerm(buffer.getLong());
+
+    oldPeers = readPeers(buffer);
+    newPeers = readPeers(buffer);
+  }
+
+  /** Writes the size of {@code peers} followed by each peer. */
+  private static void writePeers(List<Peer> peers, DataOutputStream stream) throws IOException {
+    stream.writeInt(peers.size());
+    for (Peer peer : peers) {
+      peer.serialize(stream);
+    }
+  }
+
+  /** Reads a size and then that many peers. */
+  private static List<Peer> readPeers(ByteBuffer buffer) {
+    int size = buffer.getInt();
+    List<Peer> peers = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      peers.add(Peer.deserialize(buffer));
+    }
+    return peers;
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/LogSequencer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/LogSequencer.java
index 328b74099a..1eb79d1789 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/LogSequencer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/LogSequencer.java
@@ -20,8 +20,7 @@
 package org.apache.iotdb.consensus.natraft.protocol.log.sequencing;
 
 import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
-import org.apache.iotdb.consensus.natraft.protocol.log.VotingLog;
-import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
+import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
 
 /**
  * LogSequencer assigns a unique index and associated term to a log entry and offers the entry to a
@@ -36,9 +35,7 @@ public interface LogSequencer {
    * @param e a log entry that is not yet indexed.
    * @return A SendLogRequest through which the caller can monitor the status of the sending entry.
    */
-  VotingLog sequence(Entry e);
-
-  void setLogManager(RaftLogManager logManager);
+  VotingEntry sequence(Entry e);
 
   void close();
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/LogSequencerFactory.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/LogSequencerFactory.java
index 62c73f6955..ed65209947 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/LogSequencerFactory.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/LogSequencerFactory.java
@@ -25,5 +25,5 @@ import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
 
 public interface LogSequencerFactory {
 
-  LogSequencer create(RaftMember member, RaftLogManager logManager, RaftConfig config);
+  LogSequencer create(RaftMember member, RaftConfig config);
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
index ab85f0e725..aaba4ae2c6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
@@ -22,17 +22,15 @@ package org.apache.iotdb.consensus.natraft.protocol.log.sequencing;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
-import org.apache.iotdb.consensus.natraft.protocol.log.VotingLog;
+import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
 import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
-import org.apache.iotdb.consensus.raft.thrift.AppendEntryRequest;
+import org.apache.iotdb.consensus.natraft.utils.LogUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * SynchronizedSequencer performs sequencing by taking the monitor of a LogManager within the caller
@@ -42,35 +40,24 @@ public class SynchronousSequencer implements LogSequencer {
 
   private static final Logger logger = LoggerFactory.getLogger(SynchronousSequencer.class);
   private RaftMember member;
-  private RaftLogManager logManager;
   private RaftConfig config;
 
-  public SynchronousSequencer(RaftMember member, RaftLogManager logManager, RaftConfig config) {
+  public SynchronousSequencer(RaftMember member, RaftConfig config) {
     this.member = member;
-    this.logManager = logManager;
     this.config = config;
   }
 
-  private VotingLog enqueueEntry(VotingLog sendLogRequest) {
 
-    if (member.getAllNodes().size() > 1) {
-      member.getLogDispatcher().offer(sendLogRequest);
-    }
-    return sendLogRequest;
-  }
-
-  private static AtomicLong indexBlockCounter = new AtomicLong();
 
   @Override
-  public VotingLog sequence(Entry e) {
-    VotingLog sendLogRequest = null;
+  public VotingEntry sequence(Entry e) {
+    VotingEntry votingEntry = null;
 
     long startWaitingTime = System.currentTimeMillis();
-
+    RaftLogManager logManager = member.getLogManager();
     while (true) {
       try {
         logManager.getLock().writeLock().lock();
-        indexBlockCounter.decrementAndGet();
         Entry lastEntry = logManager.getLastEntry();
         long lastIndex = lastEntry.getCurrLogIndex();
         long lastTerm = lastEntry.getCurrLogTerm();
@@ -85,10 +72,10 @@ public class SynchronousSequencer implements LogSequencer {
           // logDispatcher will serialize log, and set log size, and we will use the size after it
           logManager.append(Collections.singletonList(e));
 
-          sendLogRequest = buildSendLogRequest(e);
+          votingEntry = LogUtils.buildVotingLog(e, member);
 
           if (!(config.isUseFollowerSlidingWindow() && config.isEnableWeakAcceptance())) {
-            sendLogRequest = enqueueEntry(sendLogRequest);
+            votingEntry = LogUtils.enqueueEntry(votingEntry, member);
           }
           break;
         }
@@ -108,70 +95,18 @@ public class SynchronousSequencer implements LogSequencer {
     }
 
     if (config.isUseFollowerSlidingWindow() && config.isEnableWeakAcceptance()) {
-      sendLogRequest = enqueueEntry(sendLogRequest);
+      votingEntry = LogUtils.enqueueEntry(votingEntry, member);
     }
 
-    return sendLogRequest;
-  }
-
-  @Override
-  public void setLogManager(RaftLogManager logManager) {
-    this.logManager = logManager;
+    return votingEntry;
   }
 
-  private VotingLog buildSendLogRequest(Entry e) {
-    VotingLog votingLog = member.buildVotingLog(e);
-
-    AppendEntryRequest appendEntryRequest = buildAppendEntryRequest(e, false);
-    votingLog.setAppendEntryRequest(appendEntryRequest);
-
-    return votingLog;
-  }
-
-  public AppendEntryRequest buildAppendEntryRequest(Entry e, boolean serializeNow) {
-    AppendEntryRequest request = buildAppendEntryRequestBasic(e, serializeNow);
-    request = buildAppendEntryRequestExtended(request, e, serializeNow);
-    return request;
-  }
-
-  protected AppendEntryRequest buildAppendEntryRequestBasic(Entry entry, boolean serializeNow) {
-    AppendEntryRequest request = new AppendEntryRequest();
-    request.setTerm(member.getStatus().getTerm().get());
-    if (serializeNow) {
-      ByteBuffer byteBuffer = entry.serialize();
-      entry.setByteSize(byteBuffer.array().length);
-      request.entry = byteBuffer;
-    }
-    try {
-      if (entry.getPrevTerm() != -1) {
-        request.setPrevLogTerm(entry.getPrevTerm());
-      } else {
-        request.setPrevLogTerm(logManager.getTerm(entry.getCurrLogIndex() - 1));
-      }
-    } catch (Exception e) {
-      logger.error("getTerm failed for newly append entries", e);
-    }
-    request.setLeader(member.getThisNode().getEndpoint());
-    request.setLeaderId(member.getThisNode().getNodeId());
-    // don't need lock because even if it's larger than the commitIndex when appending this log to
-    // logManager, the follower can handle the larger commitIndex with no effect
-    request.setLeaderCommit(logManager.getCommitLogIndex());
-    request.setPrevLogIndex(entry.getCurrLogIndex() - 1);
-    request.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
-
-    return request;
-  }
-
-  protected AppendEntryRequest buildAppendEntryRequestExtended(
-      AppendEntryRequest request, Entry e, boolean serializeNow) {
-    return request;
-  }
 
   public static class Factory implements LogSequencerFactory {
 
     @Override
-    public LogSequencer create(RaftMember member, RaftLogManager logManager, RaftConfig config) {
-      return new SynchronousSequencer(member, logManager, config);
+    public LogSequencer create(RaftMember member, RaftConfig config) {
+      return new SynchronousSequencer(member, config);
     }
   }
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
new file mode 100644
index 0000000000..d9f137277f
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.natraft.utils;
+
+import java.nio.ByteBuffer;
+import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
+import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
+import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
+import org.apache.iotdb.consensus.raft.thrift.AppendEntryRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Static helpers for turning log entries into voting entries and append requests. */
+public final class LogUtils {
+
+  private static final Logger logger = LoggerFactory.getLogger(LogUtils.class);
+
+  private LogUtils() {
+    // static helpers only; not meant to be instantiated
+  }
+
+  /**
+   * Wraps an entry into a voting entry and attaches the append request that carries it.
+   *
+   * <p>The request is built with {@code serializeNow == false}, so the entry is not serialized
+   * by this method.
+   *
+   * @param e the entry to wrap
+   * @param member the member that creates the voting entry and supplies the request fields
+   * @return the voting entry with its append request set
+   */
+  public static VotingEntry buildVotingLog(Entry e, RaftMember member) {
+    VotingEntry votingEntry = member.buildVotingLog(e);
+    votingEntry.setAppendEntryRequest(buildAppendEntryRequest(e, false, member));
+    return votingEntry;
+  }
+
+  /**
+   * Builds the append request for one entry from the current state of {@code member}.
+   *
+   * @param entry the entry the request is built for
+   * @param serializeNow if true, the entry is serialized into the request and its byte size is
+   *     recorded on the entry; if false, the request's entry field is left unset
+   * @param member the member providing the term, log manager, identity and group id
+   * @return the populated request
+   */
+  public static AppendEntryRequest buildAppendEntryRequest(
+      Entry entry, boolean serializeNow, RaftMember member) {
+    AppendEntryRequest request = new AppendEntryRequest();
+    request.setTerm(member.getStatus().getTerm().get());
+    if (serializeNow) {
+      ByteBuffer byteBuffer = entry.serialize();
+      entry.setByteSize(byteBuffer.array().length);
+      request.entry = byteBuffer;
+    }
+    try {
+      // use the previous term recorded on the entry unless it is -1; then look it up in the log
+      if (entry.getPrevTerm() != -1) {
+        request.setPrevLogTerm(entry.getPrevTerm());
+      } else {
+        request.setPrevLogTerm(member.getLogManager().getTerm(entry.getCurrLogIndex() - 1));
+      }
+    } catch (Exception e) {
+      // the failure is only logged: the request is still returned, without prevLogTerm set
+      logger.error("getTerm failed for newly append entries", e);
+    }
+    request.setLeader(member.getThisNode().getEndpoint());
+    request.setLeaderId(member.getThisNode().getNodeId());
+    // don't need lock because even if it's larger than the commitIndex when appending this log to
+    // logManager, the follower can handle the larger commitIndex with no effect
+    request.setLeaderCommit(member.getLogManager().getCommitLogIndex());
+    request.setPrevLogIndex(entry.getCurrLogIndex() - 1);
+    request.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
+
+    return request;
+  }
+
+  /**
+   * Offers a voting entry to the member's log dispatcher when the group has more than one node;
+   * otherwise nothing is dispatched.
+   *
+   * @param sendLogRequest the voting entry to dispatch
+   * @param member the member whose node list and dispatcher are used
+   * @return {@code sendLogRequest}, unchanged
+   */
+  public static VotingEntry enqueueEntry(VotingEntry sendLogRequest, RaftMember member) {
+    if (member.getAllNodes().size() > 1) {
+      member.getLogDispatcher().offer(sendLogRequest);
+    }
+    return sendLogRequest;
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java
new file mode 100644
index 0000000000..229a9bc81e
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.natraft.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.consensus.common.Peer;
+
+/** Static helpers for comparing peer lists. */
+public final class NodeUtils {
+
+  private NodeUtils() {
+    // static helpers only; not meant to be instantiated
+  }
+
+  /**
+   * Collects the peers that appear in {@code newNodes} but not in {@code oldNodes}.
+   *
+   * @param oldNodes the peer list to compare against
+   * @param newNodes the peer list whose additions are wanted
+   * @return a new list of the peers of {@code newNodes} for which {@code oldNodes.contains}
+   *     is false, in the iteration order of {@code newNodes}
+   */
+  public static List<Peer> computeAddedNodes(List<Peer> oldNodes, List<Peer> newNodes) {
+    List<Peer> addedNodes = new ArrayList<>();
+    for (Peer newNode : newNodes) {
+      if (!oldNodes.contains(newNode)) {
+        addedNodes.add(newNode);
+      }
+    }
+    return addedNodes;
+  }
+}