You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/03/02 09:07:43 UTC

[iotdb] branch native_raft updated: update election

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 2a506c5570 update election
2a506c5570 is described below

commit 2a506c55708848ebd7d40ab2ded49b9222238ec9
Author: Tian Jiang <jt...@163.com>
AuthorDate: Thu Mar 2 17:09:11 2023 +0800

    update election
---
 .../iotdb/consensus/natraft/RaftConsensus.java     | 48 +++++++----
 .../consensus/natraft/protocol/RaftMember.java     | 20 +++--
 .../protocol/heartbeat/ElectionRespHandler.java    | 50 +++---------
 .../natraft/protocol/heartbeat/ElectionState.java  | 81 ++++++++++++++++++
 .../protocol/heartbeat/HeartbeatThread.java        | 95 +++++++++-------------
 .../protocol/log/appender/BlockingLogAppender.java | 72 +++++++++++++---
 .../natraft/protocol/log/catchup/CatchUpTask.java  |  2 +-
 .../protocol/log/dispatch/LogDispatcher.java       | 14 +---
 .../protocol/log/logtype/ConfigChangeEntry.java    |  8 ++
 .../manager/DirectorySnapshotRaftLogManager.java   |  8 +-
 .../protocol/log/manager/RaftLogManager.java       |  3 +-
 .../protocol/log/snapshot/DirectorySnapshot.java   | 48 +++++------
 .../natraft/protocol/log/snapshot/Snapshot.java    | 39 +++++++++
 .../natraft/service/RaftRPCServiceProcessor.java   | 48 ++++++++++-
 .../iotdb/consensus/natraft/utils/NodeUtils.java   | 13 +++
 15 files changed, 376 insertions(+), 173 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
index 416bf1dca3..3aad5abedc 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
@@ -179,20 +179,9 @@ public class RaftConsensus implements IConsensus {
     }
   }
 
-  @Override
-  public ConsensusGenericResponse createPeer(ConsensusGroupId groupId, List<Peer> peers) {
-    int consensusGroupSize = peers.size();
-    if (consensusGroupSize == 0) {
-      return ConsensusGenericResponse.newBuilder()
-          .setException(new IllegalPeerNumException(consensusGroupSize))
-          .build();
-    }
-    Peer thisPeer = new Peer(groupId, thisNodeId, thisNode);
-    if (!peers.contains(thisPeer)) {
-      return ConsensusGenericResponse.newBuilder()
-          .setException(new IllegalPeerEndpointException(thisNode, peers))
-          .build();
-    }
+
+  public boolean createNewMemberIfAbsent(ConsensusGroupId groupId, Peer thisPeer,
+      List<Peer> peers, List<Peer> newPeers) {
     AtomicBoolean exist = new AtomicBoolean(true);
     stateMachineMap.computeIfAbsent(
         groupId,
@@ -205,11 +194,29 @@ public class RaftConsensus implements IConsensus {
           }
           RaftMember impl =
               new RaftMember(
-                  path, config, thisPeer, peers, null, groupId, registry.apply(groupId), clientManager);
+                  path, config, thisPeer, peers, newPeers, groupId, registry.apply(groupId), clientManager);
           impl.start();
           return impl;
         });
-    if (exist.get()) {
+    return !exist.get();
+  }
+
+  @Override
+  public ConsensusGenericResponse createPeer(ConsensusGroupId groupId, List<Peer> peers) {
+    int consensusGroupSize = peers.size();
+    if (consensusGroupSize == 0) {
+      return ConsensusGenericResponse.newBuilder()
+          .setException(new IllegalPeerNumException(consensusGroupSize))
+          .build();
+    }
+    Peer thisPeer = new Peer(groupId, thisNodeId, thisNode);
+    if (!peers.contains(thisPeer)) {
+      return ConsensusGenericResponse.newBuilder()
+          .setException(new IllegalPeerEndpointException(thisNode, peers))
+          .build();
+    }
+
+    if (!createNewMemberIfAbsent(groupId, thisPeer, peers, null)) {
       return ConsensusGenericResponse.newBuilder()
           .setException(new ConsensusGroupAlreadyExistException(groupId))
           .build();
@@ -297,4 +304,13 @@ public class RaftConsensus implements IConsensus {
   public RaftMember getMember(ConsensusGroupId groupId) {
     return stateMachineMap.get(groupId);
   }
+
+
+  public int getThisNodeId() {
+    return thisNodeId;
+  }
+
+  public TEndPoint getThisNode() {
+    return thisNode;
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index b6fe3de5e5..02e14a5ce0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -144,7 +144,7 @@ public class RaftMember {
   /**
    * the nodes that belong to the same raft group as thisNode.
    */
-  protected List<Peer> allNodes;
+  protected volatile List<Peer> allNodes;
   protected volatile List<Peer> newNodes;
 
   protected ConsensusGroupId groupId;
@@ -1015,7 +1015,7 @@ public class RaftMember {
 
     DirectorySnapshot directorySnapshot;
     try {
-      directorySnapshot = new DirectorySnapshot(null, null);
+      directorySnapshot = new DirectorySnapshot();
       directorySnapshot.deserialize(snapshotBytes);
       directorySnapshot.setSource(source);
       directorySnapshot.setMemberName(name);
@@ -1140,6 +1140,11 @@ public class RaftMember {
     return newNodes;
   }
 
+  public void setNewNodes(List<Peer> newNodes) {
+    logDispatcher.setNewNodes(newNodes); // hand over the incoming list, not the previous field
+    this.newNodes = newNodes;
+  }
+
   public AsyncRaftServiceClient getHeartbeatClient(TEndPoint node) {
     try {
       return clientManager.borrowClient(node);
@@ -1158,7 +1163,7 @@ public class RaftMember {
     }
   }
 
-  public TSStatus changeConfig(List<Peer> newConfig) {
+  public TSStatus changeConfig(List<Peer> newNodes) {
     TSStatus tsStatus = ensureLeader(null);
     if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       return tsStatus;
@@ -1168,11 +1173,11 @@ public class RaftMember {
     VotingEntry votingEntry;
     try {
       logManager.getLock().writeLock().lock();
-      if (newNodes != null) {
+      if (this.newNodes != null) {
         return new TSStatus(TSStatusCode.CONFIGURATION_ERROR.getStatusCode()).setMessage(
             "Last configuration change in progress");
       }
-      ConfigChangeEntry e = new ConfigChangeEntry(oldNodes, newConfig);
+      ConfigChangeEntry e = new ConfigChangeEntry(oldNodes, newNodes);
       Entry lastEntry = logManager.getLastEntry();
       long lastIndex = lastEntry.getCurrLogIndex();
       long lastTerm = lastEntry.getCurrLogTerm();
@@ -1184,15 +1189,14 @@ public class RaftMember {
       logManager.append(Collections.singletonList(e));
       votingEntry = LogUtils.buildVotingLog(e, this);
 
-      logDispatcher.setNewNodes(newNodes);
-      this.newNodes = newNodes;
+      setNewNodes(newNodes);
 
       logDispatcher.offer(votingEntry);
     } finally {
       logManager.getLock().writeLock().unlock();
     }
 
-    List<Peer> addedNodes = NodeUtils.computeAddedNodes(oldNodes, newNodes);
+    List<Peer> addedNodes = NodeUtils.computeAddedNodes(oldNodes, this.newNodes);
     for (Peer addedNode : addedNodes) {
       catchUp(addedNode, 0);
     }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionRespHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionRespHandler.java
index 52192083ef..301cb19ad4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionRespHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionRespHandler.java
@@ -47,59 +47,36 @@ public class ElectionRespHandler implements AsyncMethodCallback<Long> {
   private String memberName;
   private Peer voter;
   private long currTerm;
-  private AtomicInteger requiredVoteNum;
-  private AtomicBoolean terminated;
-  // when set to true, the elector wins the election
-  private AtomicBoolean electionValid;
-  private AtomicInteger failingVoteCounter;
+  private ElectionState electionState;
 
   public ElectionRespHandler(
       RaftMember raftMember,
       Peer voter,
       long currTerm,
-      AtomicInteger requiredVoteNum,
-      AtomicBoolean terminated,
-      AtomicBoolean electionValid,
-      AtomicInteger failingVoteCounter) {
+      ElectionState electionState) {
     this.raftMember = raftMember;
     this.voter = voter;
     this.currTerm = currTerm;
-    this.requiredVoteNum = requiredVoteNum;
-    this.terminated = terminated;
-    this.electionValid = electionValid;
+    this.electionState = electionState;
     this.memberName = raftMember.getName();
-    this.failingVoteCounter = failingVoteCounter;
   }
 
   @Override
   public void onComplete(Long resp) {
     long voterResp = resp;
 
-    if (terminated.get()) {
-      // a voter has rejected this election, which means the term or the log id falls behind
-      // this node is not able to be the leader
-      logger.info(
-          "{}: Terminated election received a election response {} from {}",
-          memberName,
-          voterResp,
-          voter);
+    if (electionState.isAccepted() || electionState.isRejected()) {
       return;
     }
 
     if (voterResp == RESPONSE_AGREE) {
-      long remaining = requiredVoteNum.decrementAndGet();
+      electionState.onAccept(voter);
       logger.info(
-          "{}: Received a grant vote from {}, remaining votes to succeed: {}",
+          "{}: Received a grant vote from {}",
           memberName,
-          voter,
-          remaining);
-      if (remaining == 0) {
+          voter);
+      if (electionState.isAccepted()) {
         // the election is valid
-        electionValid.set(true);
-        terminated.set(true);
-        synchronized (terminated) {
-          terminated.notifyAll();
-        }
         logger.info("{}: Election {} is won", memberName, currTerm);
       }
       // still need more votes
@@ -121,8 +98,7 @@ public class ElectionRespHandler implements AsyncMethodCallback<Long> {
             voterResp);
         raftMember.stepDown(voterResp, null);
         // the election is rejected
-        terminated.set(true);
-        terminated.notifyAll();
+        electionState.setRejected(true);
       }
     }
   }
@@ -141,12 +117,6 @@ public class ElectionRespHandler implements AsyncMethodCallback<Long> {
   }
 
   private void onFail() {
-    int failingVoteRemaining = failingVoteCounter.decrementAndGet();
-    if (failingVoteRemaining <= 0) {
-      synchronized (terminated) {
-        // wake up heartbeat thread to start the next election
-        terminated.notifyAll();
-      }
-    }
+    electionState.onReject(voter);
   }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionState.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionState.java
new file mode 100644
index 0000000000..407252fe2c
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionState.java
@@ -0,0 +1,81 @@
+package org.apache.iotdb.consensus.natraft.protocol.heartbeat;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.iotdb.consensus.common.Peer;
+
+/**
+ * Vote tally of one election over the current node list and, if non-null, the new node list.
+ * Mutators are synchronized: vote handlers and the waiting election thread share this object.
+ */
+public class ElectionState {
+  private final List<Peer> currNodes;
+  private final List<Peer> newNodes;
+  private final Set<Peer> acceptedCurrNodes;
+  private final Set<Peer> acceptedNewNodes;
+  private final Set<Peer> rejectedCurrNodes;
+  private final Set<Peer> rejectedNewNodes;
+  private volatile boolean accepted = false;
+  private volatile boolean rejected = false;
+
+  public ElectionState(List<Peer> currNodes, List<Peer> newNodes) {
+    this.currNodes = currNodes;
+    this.newNodes = newNodes;
+    acceptedCurrNodes = new HashSet<>(currNodes.size());
+    rejectedCurrNodes = new HashSet<>(currNodes.size());
+    acceptedNewNodes = newNodes != null ? new HashSet<>(newNodes.size()) : null;
+    rejectedNewNodes = newNodes != null ? new HashSet<>(newNodes.size()) : null;
+  }
+
+  /** Records a grant; accepted once each list has size / 2 grants (own vote is not counted). */
+  public synchronized void onAccept(Peer node) {
+    if (currNodes.contains(node)) {
+      acceptedCurrNodes.add(node);
+    }
+    if (newNodes != null && newNodes.contains(node)) {
+      acceptedNewNodes.add(node);
+    }
+    if (acceptedCurrNodes.size() >= currNodes.size() / 2 &&
+        (newNodes == null || (acceptedNewNodes.size() >= newNodes.size() / 2))) {
+      accepted = true;
+      notifyAll();
+    }
+  }
+
+  /** Records a failed vote; a failed majority in either list already rules out acceptance. */
+  public synchronized void onReject(Peer node) {
+    if (currNodes.contains(node)) {
+      rejectedCurrNodes.add(node);
+    }
+    if (newNodes != null && newNodes.contains(node)) {
+      rejectedNewNodes.add(node);
+    }
+    if (rejectedCurrNodes.size() >= currNodes.size() / 2 + 1 ||
+        (newNodes != null && rejectedNewNodes.size() >= newNodes.size() / 2 + 1)) {
+      rejected = true;
+      notifyAll();
+    }
+  }
+
+  public List<Peer> getCurrNodes() {
+    return currNodes;
+  }
+
+  public List<Peer> getNewNodes() {
+    return newNodes;
+  }
+
+  public boolean isAccepted() {
+    return accepted;
+  }
+
+  public boolean isRejected() {
+    return rejected;
+  }
+
+  public synchronized void setRejected(boolean rejected) {
+    this.rejected = rejected;
+    notifyAll();
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
index ef64f15e42..5939f1c577 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
@@ -19,12 +19,14 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.heartbeat;
 
+import java.util.Collections;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.consensus.natraft.protocol.RaftRole;
+import org.apache.iotdb.consensus.natraft.utils.NodeUtils;
 import org.apache.iotdb.consensus.raft.thrift.ElectionRequest;
 import org.apache.iotdb.consensus.raft.thrift.HeartBeatRequest;
 
@@ -166,7 +168,9 @@ public class HeartbeatThread implements Runnable {
     logger.info("{}: End elections", memberName);
   }
 
-  /** Send each node (except the local node) in the group of the member a heartbeat. */
+  /**
+   * Send each node (except the local node) in the group of the member a heartbeat.
+   */
   protected void sendHeartbeats() {
     try {
       localMember.getLogManager().getLock().readLock().lock();
@@ -182,7 +186,9 @@ public class HeartbeatThread implements Runnable {
     sendHeartbeats(localMember.getAllNodes());
   }
 
-  /** Send each node (except the local node) in list a heartbeat. */
+  /**
+   * Send each node (except the local node) in list a heartbeat.
+   */
   @SuppressWarnings("java:S2445")
   private void sendHeartbeats(Collection<Peer> nodes) {
     logger.debug(
@@ -266,13 +272,6 @@ public class HeartbeatThread implements Runnable {
       return;
     }
 
-    int quorumNum = localMember.getAllNodes().size() / 2;
-    // set to true when the election has a result (rejected or succeeded)
-    AtomicBoolean electionTerminated = new AtomicBoolean(false);
-    // set to true when the election is won
-    AtomicBoolean electionValid = new AtomicBoolean(false);
-    // a decreasing vote counter
-    AtomicInteger quorum = new AtomicInteger(quorumNum);
     long nextTerm;
     try {
       localMember.getLogManager().getLock().writeLock().lock();
@@ -287,11 +286,10 @@ public class HeartbeatThread implements Runnable {
     }
 
     // the number of votes needed to become a leader,
-    // quorumNum should be equal to localMember.getAllNodes().size() / 2 + 1,
+    // in principle it equals localMember.getAllNodes().size() / 2 + 1,
     // but since it doesn’t need to vote for itself here, it directly decreases 1
-    logger.info("{}: Election {} starts, quorum: {}", memberName, nextTerm, quorumNum);
-    // NOTICE, failingVoteCounter should be equal to quorumNum + 1
-    AtomicInteger failingVoteCounter = new AtomicInteger(quorumNum + 1);
+    logger.info("{}: Election {} starts", memberName, nextTerm);
+    // NOTICE, granted and failed votes are both tallied by the ElectionState created below
 
     electionRequest.setTerm(nextTerm);
     electionRequest.setElector(localMember.getThisNode().getEndpoint());
@@ -300,21 +298,20 @@ public class HeartbeatThread implements Runnable {
     electionRequest.setLastLogIndex(localMember.getLogManager().getLastLogIndex());
     electionRequest.setGroupId(localMember.getRaftGroupId().convertToTConsensusGroupId());
 
+    ElectionState electionState = new ElectionState(localMember.getAllNodes(),
+        localMember.getNewNodes());
+
     requestVote(
-        localMember.getAllNodes(),
+        electionState,
         electionRequest,
-        nextTerm,
-        quorum,
-        electionTerminated,
-        electionValid,
-        failingVoteCounter);
+        nextTerm);
 
     try {
       logger.info(
           "{}: Wait for {}ms until election time out", memberName, config.getElectionTimeoutMs());
-      synchronized (electionTerminated) {
-        electionWaitObject = electionTerminated;
-        electionTerminated.wait(config.getElectionTimeoutMs());
+      synchronized (electionState) {
+        electionWaitObject = electionState;
+        electionState.wait(config.getElectionTimeoutMs());
       }
       electionWaitObject = null;
     } catch (InterruptedException e) {
@@ -323,8 +320,7 @@ public class HeartbeatThread implements Runnable {
     }
 
     // if the election times out, the remaining votes do not matter
-    electionTerminated.set(true);
-    if (electionValid.get()) {
+    if (electionState.isAccepted()) {
       logger.info("{}: Election {} accepted", memberName, nextTerm);
       localMember.getStatus().setRole(RaftRole.LEADER);
       localMember.getStatus().getLeader().set(localMember.getThisNode());
@@ -332,44 +328,31 @@ public class HeartbeatThread implements Runnable {
   }
 
   /**
-   * Request a vote from each of the "nodes". Each for vote will decrease the counter "quorum" and
-   * when it reaches 0, the flag "electionValid" and "electionTerminated" will be set to true. Any
-   * against vote will set the flag "electionTerminated" to true and ends the election.
-   *
-   * @param nodes
-   * @param request
-   * @param nextTerm the term of the election
-   * @param quorum
-   * @param electionTerminated
-   * @param electionValid
+   * Request a vote from every node in the union of the current and the new node list, except the
+   * local node. Each response is reported to "electionState" through an ElectionRespHandler, and
+   * the state decides when the election is accepted or rejected.
    */
   @SuppressWarnings("java:S2445")
   private void requestVote(
-      Collection<Peer> nodes,
+      ElectionState electionState,
       ElectionRequest request,
-      long nextTerm,
-      AtomicInteger quorum,
-      AtomicBoolean electionTerminated,
-      AtomicBoolean electionValid,
-      AtomicInteger failingVoteCounter) {
-    synchronized (nodes) {
-      // avoid concurrent modification
-      for (Peer node : nodes) {
-        if (node.equals(localMember.getThisNode())) {
-          continue;
-        }
+      long nextTerm) {
 
-        ElectionRespHandler handler =
-            new ElectionRespHandler(
-                localMember,
-                node,
-                nextTerm,
-                quorum,
-                electionTerminated,
-                electionValid,
-                failingVoteCounter);
-        requestVoteAsync(node, handler, request);
+    Collection<Peer> peers = NodeUtils.unionNodes(electionState.getCurrNodes(),
+        electionState.getNewNodes());
+    // ask every node of the union except the local one for its vote
+    for (Peer node : peers) {
+      if (node.equals(localMember.getThisNode())) {
+        continue;
       }
+
+      ElectionRespHandler handler =
+          new ElectionRespHandler(
+              localMember,
+              node,
+              nextTerm,
+              electionState);
+      requestVoteAsync(node, handler, request);
     }
   }
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
index 6c5e54d1d6..d8b3ae16f3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.consensus.natraft.protocol.log.appender;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
+import org.apache.iotdb.consensus.natraft.protocol.log.logtype.ConfigChangeEntry;
 import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
 import org.apache.iotdb.consensus.natraft.utils.Response;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
@@ -60,7 +61,7 @@ public class BlockingLogAppender implements LogAppender {
    * and append "log" to it. Otherwise report a log mismatch.
    *
    * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
-   *     .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+   * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
    */
   public AppendEntryResult appendEntry(AppendEntryRequest request, Entry log) {
     long resp = checkPrevLogIndex(request.prevLogIndex);
@@ -106,7 +107,9 @@ public class BlockingLogAppender implements LogAppender {
     return result;
   }
 
-  /** Wait until all logs before "prevLogIndex" arrive or a timeout is reached. */
+  /**
+   * Wait until all logs before "prevLogIndex" arrive or a timeout is reached.
+   */
   private boolean waitForPrevLog(long prevLogIndex) {
     long waitStart = System.currentTimeMillis();
     long alreadyWait = 0;
@@ -151,7 +154,7 @@ public class BlockingLogAppender implements LogAppender {
    *
    * @param logs append logs
    * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
-   *     .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+   * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
    */
   public AppendEntryResult appendEntries(AppendEntriesRequest request, List<Entry> logs) {
     logger.debug(
@@ -171,18 +174,21 @@ public class BlockingLogAppender implements LogAppender {
           .setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
     }
 
+    ConfigChangeEntry lastConfigEntry = null;
+    for (Entry log : logs) {
+      if (log instanceof ConfigChangeEntry) {
+        lastConfigEntry = ((ConfigChangeEntry) log);
+      }
+    }
+
     AppendEntryResult result = new AppendEntryResult();
     long startWaitingTime = System.currentTimeMillis();
     while (true) {
-      synchronized (logManager) {
-        // TODO: Consider memory footprint to execute a precise rejection
-        if ((logManager.getCommitLogIndex() - logManager.getAppliedIndex())
-            <= config.getUnAppliedRaftLogNumForRejectThreshold()) {
-          resp =
-              logManager.maybeAppend(
-                  request.prevLogIndex, request.prevLogTerm, request.leaderCommit, logs);
-          break;
-        }
+      if ((logManager.getCommitLogIndex() - logManager.getAppliedIndex())
+          <= config.getUnAppliedRaftLogNumForRejectThreshold()) {
+        resp = lastConfigEntry == null ? appendWithoutConfigChange(request, logs, result)
+            : appendWithConfigChange(request, logs, result, lastConfigEntry);
+        break;
       }
 
       try {
@@ -197,6 +203,46 @@ public class BlockingLogAppender implements LogAppender {
       }
     }
 
+    return result;
+  }
+
+  /** Appends "logs" under the log manager's write lock and records the new peers on success. */
+  protected long appendWithConfigChange(AppendEntriesRequest request, List<Entry> logs,
+      AppendEntryResult result, ConfigChangeEntry configChangeEntry) {
+    // take the lock before entering try, so finally never unlocks a lock that was not acquired
+    logManager.getLock().writeLock().lock();
+    try {
+      long resp =
+          logManager.maybeAppend(
+              request.prevLogIndex, request.prevLogTerm, request.leaderCommit, logs);
+      if (resp != -1) {
+        if (logger.isDebugEnabled()) {
+          logger.debug(
+              "{} append a new log list {}, commit to {}",
+              member.getName(),
+              logs,
+              request.leaderCommit);
+        }
+
+        result.status = Response.RESPONSE_STRONG_ACCEPT;
+        result.setLastLogIndex(logManager.getLastLogIndex());
+        result.setLastLogTerm(logManager.getLastLogTerm());
+        member.setNewNodes(configChangeEntry.getNewPeers());
+      } else {
+        // the incoming log points to an illegal position, reject it
+        result.status = Response.RESPONSE_LOG_MISMATCH;
+      }
+      return resp;
+    } finally {
+      logManager.getLock().writeLock().unlock();
+    }
+  }
+
+  protected long appendWithoutConfigChange(AppendEntriesRequest request, List<Entry> logs,
+      AppendEntryResult result) {
+    long resp =
+        logManager.maybeAppend(
+            request.prevLogIndex, request.prevLogTerm, request.leaderCommit, logs);
     if (resp != -1) {
       if (logger.isDebugEnabled()) {
         logger.debug(
@@ -213,7 +259,7 @@ public class BlockingLogAppender implements LogAppender {
       // the incoming log points to an illegal position, reject it
       result.status = Response.RESPONSE_LOG_MISMATCH;
     }
-    return result;
+    return resp;
   }
 
   @Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java
index e7ae9e2dca..15ecbca8b9 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java
@@ -299,7 +299,7 @@ public class CatchUpTask implements Runnable {
   }
 
   private void doSnapshot() {
-    raftMember.getLogManager().takeSnapshot();
+    raftMember.getLogManager().takeSnapshot(raftMember);
     snapshot = raftMember.getLogManager().getSnapshot(peerInfo.getMatchIndex());
     if (logger.isInfoEnabled()) {
       logger.info(
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index f7f5c7ba10..44939e9f61 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
 
+import static org.apache.iotdb.consensus.natraft.utils.NodeUtils.unionNodes;
+
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
@@ -89,7 +91,7 @@ public class LogDispatcher {
     }
     this.allNodes = member.getAllNodes();
     this.newNodes = member.getNewNodes();
-    createQueueAndBindingThreads(unionNodes());
+    createQueueAndBindingThreads(unionNodes(allNodes, newNodes));
   }
 
   public void updateRateLimiter() {
@@ -99,15 +101,7 @@ public class LogDispatcher {
     }
   }
 
-  private Collection<Peer> unionNodes() {
-    if (newNodes == null) {
-      return allNodes;
-    }
-    Set<Peer> nodeUnion = new HashSet<>();
-    nodeUnion.addAll(allNodes);
-    nodeUnion.addAll(newNodes);
-    return nodeUnion;
-  }
+
 
 
   void createQueue(Peer node) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
index 8f63f3142f..6cecfa1b9e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
@@ -77,4 +77,12 @@ public class ConfigChangeEntry extends Entry {
       newPeers.add(Peer.deserialize(buffer));
     }
   }
+
+  public List<Peer> getNewPeers() {
+    return newPeers;
+  }
+
+  public List<Peer> getOldPeers() {
+    return oldPeers;
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
index d6a032d32a..ddf771a41b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
@@ -20,7 +20,9 @@
 package org.apache.iotdb.consensus.natraft.protocol.log.manager;
 
 import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
+import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.consensus.natraft.protocol.log.applier.LogApplier;
 import org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.StableEntryManager;
 import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.DirectorySnapshot;
@@ -53,7 +55,7 @@ public class DirectorySnapshotRaftLogManager extends RaftLogManager {
   }
 
   @Override
-  public void takeSnapshot() {
+  public void takeSnapshot(RaftMember member) {
     latestSnapshotDir =
         new File(
             config.getStorageDir()
@@ -61,17 +63,19 @@ public class DirectorySnapshotRaftLogManager extends RaftLogManager {
                 + getName()
                 + "-snapshot-"
                 + System.currentTimeMillis());
+    List<Peer> currNodes;
     try {
       lock.readLock().lock();
       snapshotIndex = getAppliedIndex();
       snapshotTerm = getAppliedTerm();
+      currNodes = member.getAllNodes();
     } finally {
       lock.readLock().unlock();
     }
     stateMachine.takeSnapshot(latestSnapshotDir);
     List<Path> snapshotFiles = stateMachine.getSnapshotFiles(latestSnapshotDir);
     snapshotFiles.addAll(IOUtils.collectPaths(latestSnapshotDir));
-    directorySnapshot = new DirectorySnapshot(latestSnapshotDir, snapshotFiles);
+    directorySnapshot = new DirectorySnapshot(latestSnapshotDir, snapshotFiles, currNodes);
     directorySnapshot.setLastLogIndex(snapshotIndex);
     directorySnapshot.setLastLogTerm(snapshotTerm);
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index 682bec242a..6c1be21199 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.natraft.exception.LogExecutionException;
 import org.apache.iotdb.consensus.natraft.protocol.HardState;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
+import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
 import org.apache.iotdb.consensus.natraft.protocol.log.applier.LogApplier;
 import org.apache.iotdb.consensus.natraft.protocol.log.logtype.EmptyEntry;
@@ -199,7 +200,7 @@ public abstract class RaftLogManager {
    *
    * @throws IOException timeout exception
    */
-  public abstract void takeSnapshot();
+  public abstract void takeSnapshot(RaftMember member);
 
   /**
    * Update the raftNode's hardState(currentTerm,voteFor) and flush to disk.
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
index d1fd917684..359bb1393b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
@@ -4,13 +4,16 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.snapshot;
 
+import java.io.DataOutputStream;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
 import org.apache.iotdb.consensus.natraft.client.SyncClientAdaptor;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,43 +37,51 @@ public class DirectorySnapshot extends Snapshot {
   private TEndPoint source;
   private String memberName;
 
-  public DirectorySnapshot(File directory, List<Path> filePaths) {
+  public DirectorySnapshot() {
+
+  }
+
+  public DirectorySnapshot(File directory, List<Path> filePaths, List<Peer> peers) {
     this.directory = directory;
     this.filePaths = filePaths;
+    this.currNodes = peers;
   }
 
+  /**
+   * Serializes the base fields (see serializeBase), then the directory's absolute path, the file
+   * count and every file path. Each string is written as an int length followed by its UTF-8
+   * bytes, the length-prefixed layout that {@link #deserialize(ByteBuffer)} consumes.
+   */
   @Override
   public ByteBuffer serialize() {
 
-    byte[] rootBytes = directory.getAbsolutePath().getBytes(StandardCharsets.UTF_8);
-    int bufferSize = Long.BYTES * 2 + rootBytes.length + Integer.BYTES * 2;
-    byte[][] filePathBytes = new byte[filePaths.size()][];
-    for (int i = 0; i < filePaths.size(); i++) {
-      Path path = filePaths.get(i);
-      byte[] bytes = path.toString().getBytes(StandardCharsets.UTF_8);
-      filePathBytes[i] = bytes;
-      bufferSize += Integer.BYTES;
-      bufferSize += bytes.length;
-    }
+    PublicBAOS baos = new PublicBAOS();
+    DataOutputStream dataOutputStream = new DataOutputStream(baos);
 
-    ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
-    buffer.putLong(lastLogIndex);
-    buffer.putLong(lastLogTerm);
-    buffer.putInt(rootBytes.length);
-    buffer.put(rootBytes);
-    buffer.putInt(filePaths.size());
-    for (byte[] relativeFileByte : filePathBytes) {
-      buffer.putInt(relativeFileByte.length);
-      buffer.put(relativeFileByte);
+    serializeBase(dataOutputStream);
+
+    try {
+      // DataOutputStream.writeBytes emits no length and drops the high byte of every char, so the
+      // paths are written explicitly as length-prefixed UTF-8
+      byte[] rootBytes = directory.getAbsolutePath().getBytes(StandardCharsets.UTF_8);
+      dataOutputStream.writeInt(rootBytes.length);
+      dataOutputStream.write(rootBytes);
+      dataOutputStream.writeInt(filePaths.size());
+      for (Path filePath : filePaths) {
+        byte[] pathBytes = filePath.toString().getBytes(StandardCharsets.UTF_8);
+        dataOutputStream.writeInt(pathBytes.length);
+        dataOutputStream.write(pathBytes);
+      }
+    } catch (IOException e) {
+      // unreachable
     }
-    buffer.flip();
-    return buffer;
+    return ByteBuffer.wrap(baos.getBuf(), 0, baos.size());
   }
 
   @Override
   public void deserialize(ByteBuffer buffer) {
-    lastLogIndex = buffer.getLong();
-    lastLogTerm = buffer.getLong();
+    deserializeBase(buffer);
+
     int size = buffer.getInt();
     byte[] bytes = new byte[size];
     buffer.get(bytes);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/Snapshot.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/Snapshot.java
index 8414ad5483..b5aa2ae455 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/Snapshot.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/Snapshot.java
@@ -19,7 +19,12 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.snapshot;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 
 import java.nio.ByteBuffer;
@@ -35,6 +40,7 @@ public abstract class Snapshot {
 
   protected long lastLogIndex;
   protected long lastLogTerm;
+  protected List<Peer> currNodes;
 
   public abstract ByteBuffer serialize();
 
@@ -71,4 +77,57 @@ public abstract class Snapshot {
   public String toString() {
     return String.format("%d-%d", lastLogIndex, lastLogTerm);
   }
+
+  /** Returns the node list carried by this snapshot; null if it was never assigned. */
+  public List<Peer> getCurrNodes() {
+    return currNodes;
+  }
+
+  /** Replaces the node list carried by this snapshot. */
+  public void setCurrNodes(List<Peer> currNodes) {
+    this.currNodes = currNodes;
+  }
+
+  /**
+   * Writes the fields shared by all snapshots: lastLogIndex, lastLogTerm, the node count and
+   * then each node. A null node list is written as a count of zero.
+   *
+   * @param dataOutputStream the stream to write to
+   * @throws IllegalStateException if writing to the stream fails
+   */
+  protected void serializeBase(DataOutputStream dataOutputStream) {
+    try {
+      dataOutputStream.writeLong(lastLogIndex);
+      dataOutputStream.writeLong(lastLogTerm);
+
+      // currNodes has no initializer, so it is null until a constructor or setter assigns it
+      if (currNodes == null) {
+        dataOutputStream.writeInt(0);
+        return;
+      }
+      dataOutputStream.writeInt(currNodes.size());
+      for (Peer currNode : currNodes) {
+        currNode.serialize(dataOutputStream);
+      }
+    } catch (IOException e) {
+      // not expected for in-memory streams; fail loudly rather than leave a truncated image
+      throw new IllegalStateException("Failed to serialize snapshot " + this, e);
+    }
+  }
+
+  /**
+   * Reads the fields written by serializeBase and replaces currNodes with a newly built list.
+   *
+   * @param buffer the buffer to read from, starting at its current position
+   */
+  protected void deserializeBase(ByteBuffer buffer) {
+    lastLogIndex = buffer.getLong();
+    lastLogTerm = buffer.getLong();
+
+    int size = buffer.getInt();
+    currNodes = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      currNodes.add(Peer.deserialize(buffer));
+    }
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
index cefecbf9a2..9b22f4cc00 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
@@ -21,11 +21,16 @@ package org.apache.iotdb.consensus.natraft.service;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId.Factory;
+import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.natraft.RaftConsensus;
 import org.apache.iotdb.consensus.natraft.exception.UnknownLogTypeException;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
+import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
+import org.apache.iotdb.consensus.natraft.protocol.log.LogParser;
+import org.apache.iotdb.consensus.natraft.protocol.log.logtype.ConfigChangeEntry;
 import org.apache.iotdb.consensus.natraft.utils.IOUtils;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
@@ -56,7 +61,8 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
     this.consensus = consensus;
   }
 
-  public void handleClientExit() {}
+  public void handleClientExit() {
+  }
 
   private RaftMember getMember(TConsensusGroupId groupId) throws TException {
     RaftMember member = consensus.getMember(Factory.createFromTConsensusGroupId(groupId));
@@ -66,6 +72,61 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
     return member;
   }
 
+  /**
+   * Find the first config change entry among the entries of the request.
+   *
+   * @return the first ConfigChangeEntry in the request, or null if there is none
+   * @throws UnknownLogTypeException propagated from LogParser while parsing the entries
+   */
+  private ConfigChangeEntry findFirstConfigChangeEntry(AppendEntriesRequest request)
+      throws UnknownLogTypeException {
+    if (request.entries == null) {
+      return null;
+    }
+    for (ByteBuffer entryBuffer : request.entries) {
+      // parse a duplicate so the request's buffers keep their positions for the later
+      // member.appendEntries(request) call
+      Entry entry = LogParser.getINSTANCE().parse(entryBuffer.duplicate());
+      if (entry instanceof ConfigChangeEntry) {
+        return (ConfigChangeEntry) entry;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get the associated member, or, if this node has none for the group, create it from the first
+   * config change entry in the request (if any).
+   *
+   * @throws NoMemberException if there is no member and none could be created from the request
+   * @throws TException if an entry of the request has an unknown log type
+   */
+  private RaftMember getMemberOrCreate(TConsensusGroupId tgroupId, AppendEntriesRequest request)
+      throws TException {
+    ConsensusGroupId groupId = Factory.createFromTConsensusGroupId(tgroupId);
+    RaftMember member = consensus.getMember(groupId);
+    if (member != null) {
+      return member;
+    }
+
+    try {
+      ConfigChangeEntry configChange = findFirstConfigChangeEntry(request);
+      if (configChange != null) {
+        Peer thisPeer = new Peer(groupId, consensus.getThisNodeId(), consensus.getThisNode());
+        consensus.createNewMemberIfAbsent(
+            groupId, thisPeer, configChange.getOldPeers(), configChange.getNewPeers());
+        member = consensus.getMember(groupId);
+      }
+    } catch (UnknownLogTypeException e) {
+      throw new TException(e.getMessage(), e);
+    }
+
+    if (member == null) {
+      throw new NoMemberException("No such member of: " + tgroupId);
+    }
+    return member;
+  }
+
   @Override
   public void sendHeartbeat(
       HeartBeatRequest request, AsyncMethodCallback<HeartBeatResponse> resultHandler)
@@ -89,7 +133,7 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
   public void appendEntries(
       AppendEntriesRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler)
       throws TException {
-    RaftMember member = getMember(request.groupId);
+    RaftMember member = getMemberOrCreate(request.groupId, request);
     try {
       resultHandler.onComplete(member.appendEntries(request));
     } catch (UnknownLogTypeException e) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java
index 229a9bc81e..94a30dd51e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java
@@ -1,7 +1,10 @@
 package org.apache.iotdb.consensus.natraft.utils;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import org.apache.iotdb.consensus.common.Peer;
 
 public class NodeUtils {
@@ -15,4 +18,22 @@ public class NodeUtils {
     }
     return addedNode;
   }
+
+  /**
+   * Merge two node lists without duplicates.
+   *
+   * @param currNodes the current nodes; must be non-null when newNodes is non-null
+   * @param newNodes the new nodes, may be null
+   * @return currNodes itself if newNodes is null, otherwise a new set holding every node of both
+   *     lists
+   */
+  public static Collection<Peer> unionNodes(List<Peer> currNodes, List<Peer> newNodes) {
+    if (newNodes != null) {
+      Set<Peer> merged = new HashSet<>();
+      merged.addAll(currNodes);
+      merged.addAll(newNodes);
+      return merged;
+    }
+    return currNodes;
+  }
 }