Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/12/01 01:16:06 UTC

[iotdb] 06/09: merge sequencer

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e6c29bcaac1a34971a3922b3c9ef6e351ebacb3e
Author: jt <jt...@163.com>
AuthorDate: Mon Nov 1 15:45:26 2021 +0800

    merge sequencer
---
 .../apache/iotdb/cluster/config/ClusterConfig.java |   3 +-
 .../apache/iotdb/cluster/expr/SequencerExpr.java   | 339 +++------------------
 .../iotdb/cluster/log/manage/RaftLogManager.java   |   4 +-
 .../log/sequencing/AsynchronousSequencer.java      |  33 +-
 .../log/sequencing/LogSequencerFactory.java        |   1 -
 .../log/sequencing/SynchronousSequencer.java       |  12 +-
 .../cluster/server/member/DataGroupMember.java     |   1 -
 .../cluster/server/member/MetaGroupMember.java     |   1 -
 .../iotdb/cluster/server/member/RaftMember.java    | 137 +++------
 9 files changed, 123 insertions(+), 408 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index c9cbc59..420f7ca 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -183,8 +183,7 @@ public class ClusterConfig {
 
   private boolean useIndirectBroadcasting = false;
 
-  private boolean useAsyncSequencing = false;
-
+  private boolean useAsyncSequencing = true;
 
   /**
    * create a clusterConfig class. The internalIP will be set according to the server's hostname. If
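
The flipped default above makes asynchronous sequencing the out-of-the-box behavior; the flag is consumed once, at class-load time, by the SEQUENCER_FACTORY ternary in the RaftMember hunk further down. A minimal sketch of that selection pattern, using hypothetical stand-in types rather than the real org.apache.iotdb.cluster classes:

    // Sketch only: how a boolean config flag can pick the sequencer
    // implementation once, at class-load time, as RaftMember does below with
    // SEQUENCER_FACTORY. All names here are simplified stand-ins.
    interface Sequencer {
      void sequence(String log);
    }

    class SyncSequencer implements Sequencer {
      public void sequence(String log) { /* append under a lock, in the caller's thread */ }
    }

    class AsyncSequencer implements Sequencer {
      public void sequence(String log) { /* enqueue; a background thread assigns indexes */ }
    }

    public class SequencerSelection {
      // mirrors ClusterConfig.useAsyncSequencing, now defaulting to true
      static final boolean USE_ASYNC_SEQUENCING = true;

      static final Sequencer SEQUENCER =
          USE_ASYNC_SEQUENCING ? new AsyncSequencer() : new SyncSequencer();
    }

Because the selected factory is static and final, the choice is fixed for the lifetime of the process; changing the config value requires a restart.
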
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/SequencerExpr.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/SequencerExpr.java
index 7dae6a7..4629962 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/SequencerExpr.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/SequencerExpr.java
@@ -19,23 +19,9 @@
 
 package org.apache.iotdb.cluster.expr;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.exception.LogExecutionException;
-import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
-import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
-import org.apache.iotdb.cluster.log.LogParser;
-import org.apache.iotdb.cluster.log.VotingLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.manage.MetaSingleSnapshotLogManager;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
@@ -47,265 +33,34 @@ import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.cluster.server.monitor.Timer;
-import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
-import org.apache.iotdb.cluster.utils.IOUtils;
-import org.apache.iotdb.cluster.utils.StatusUtils;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
-import org.apache.iotdb.db.qp.physical.sys.LogPlan;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class SequencerExpr extends MetaGroupMember {
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
-  private static final Logger logger = LoggerFactory.getLogger(SequencerExpr.class);
+public class SequencerExpr extends MetaGroupMember {
 
-  private int v2ThreadNum = 2000;
-  private int v3ThreadNum = 0000;
+  private int threadNum = 1000;
   private AtomicLong reqCnt = new AtomicLong();
 
-  private BlockingQueue<SendLogRequest> nonsequencedLogQueue = new ArrayBlockingQueue<>(
-      4096);
-
   public SequencerExpr() {
-    LogApplier applier = new LogApplier() {
-      @Override
-      public void apply(Log log) {
-        log.setApplied(true);
-      }
-
-      @Override
-      public void close() {
-
-      }
-    };
+    LogApplier applier =
+        new LogApplier() {
+          @Override
+          public void apply(Log log) {
+            log.setApplied(true);
+          }
+
+          @Override
+          public void close() {}
+        };
     logManager = new MetaSingleSnapshotLogManager(applier, this);
 
-    new Thread(this::sequenceLog).start();
-    new Thread(this::sequenceLog).start();
-    new Thread(this::sequenceLog).start();
-    new Thread(this::sequenceLog).start();
     reportThread = Executors.newSingleThreadScheduledExecutor();
     reportThread.scheduleAtFixedRate(
         this::generateNodeReport, REPORT_INTERVAL_SEC, REPORT_INTERVAL_SEC, TimeUnit.SECONDS);
-  }
-
-  private TSStatus processPlanLocallyV2(PhysicalPlan plan) {
-    logger.debug("{}: Processing plan {}", getName(), plan);
-    // assign term and index to the new log and append it
-    SendLogRequest sendLogRequest;
-
-    Log log;
-    if (plan instanceof LogPlan) {
-      try {
-        log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
-      } catch (UnknownLogTypeException e) {
-        logger.error("Can not parse LogPlan {}", plan, e);
-        return StatusUtils.PARSE_LOG_ERROR;
-      }
-    } else {
-      log = new PhysicalPlanLog();
-      ((PhysicalPlanLog) log).setPlan(plan);
-    }
-
-    if (log.serialize().capacity() + Integer.BYTES
-        >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
-      logger.error(
-          "Log cannot fit into buffer, please increase raft_log_buffer_size;"
-              + "or reduce the size of requests you send.");
-      return StatusUtils.INTERNAL_ERROR;
-    }
-
-    long startTime =
-        Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.getOperationStartTime();
-    synchronized (logManager) {
-      Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.calOperationCostTimeFromStart(
-          startTime);
-
-      plan.setIndex(logManager.getLastLogIndex() + 1);
-      log.setCurrLogTerm(getTerm().get());
-      log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
-
-      startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
-      // just like processPlanLocally,we need to check the size of log
-
-      // logDispatcher will serialize log, and set log size, and we will use the size after it
-      logManager.append(log);
-      Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.calOperationCostTimeFromStart(startTime);
-
-      startTime = Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.getOperationStartTime();
-      sendLogRequest = buildSendLogRequest(log);
-      Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.calOperationCostTimeFromStart(startTime);
-
-      startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
-      log.setCreateTime(System.nanoTime());
-      votingLogList.insert(sendLogRequest.getVotingLog());
-      getLogDispatcher().offer(sendLogRequest);
-      Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
-    }
-
-    try {
-      AppendLogResult appendLogResult =
-          waitAppendResult(
-              sendLogRequest.getVotingLog(),
-              sendLogRequest.getLeaderShipStale(),
-              sendLogRequest.getNewLeaderTerm(),
-              sendLogRequest.getQuorumSize());
-      Timer.Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.calOperationCostTimeFromStart(
-          sendLogRequest.getVotingLog().getLog().getCreateTime());
-
-      switch (appendLogResult) {
-        case WEAK_ACCEPT:
-          // TODO: change to weak
-          Statistic.RAFT_WEAK_ACCEPT.add(1);
-          return StatusUtils.OK;
-        case OK:
-          logger.debug(MSG_LOG_IS_ACCEPTED, getName(), log);
-          startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
-          commitLog(log);
-          Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
-          return StatusUtils.OK;
-        case TIME_OUT:
-          logger.debug("{}: log {} timed out...", getName(), log);
-          break;
-        case LEADERSHIP_STALE:
-          // abort the appending, the new leader will fix the local logs by catch-up
-        default:
-          break;
-      }
-    } catch (Exception e) {
-      return handleLogExecutionException(log, IOUtils.getRootCause(e));
-    }
-    return StatusUtils.TIME_OUT;
-  }
-
-  public SendLogRequest enqueueSendLogRequest(Log log) {
-    VotingLog votingLog = buildVotingLog(log);
-    AtomicBoolean leaderShipStale = new AtomicBoolean(false);
-    AtomicLong newLeaderTerm = new AtomicLong(term.get());
-
-    SendLogRequest sendLogRequest = new SendLogRequest(
-        votingLog, leaderShipStale, newLeaderTerm, null, allNodes.size() / 2);
-    try {
-      nonsequencedLogQueue.put(sendLogRequest);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-    return sendLogRequest;
-  }
-
-  private void sequenceLog(List<SendLogRequest> sendLogRequests) {
-    long startTime;
-    synchronized (logManager) {
-      for (SendLogRequest sendLogRequest : sendLogRequests) {
-        Log log = sendLogRequest.getVotingLog().getLog();
-        log.setCurrLogTerm(getTerm().get());
-        log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
-        if (log instanceof PhysicalPlanLog) {
-          ((PhysicalPlanLog) log).getPlan().setIndex(log.getCurrLogIndex());
-        }
-
-        startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
-        // just like processPlanLocally,we need to check the size of log
-
-        // logDispatcher will serialize log, and set log size, and we will use the size after it
-        logManager.append(log);
-        Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.calOperationCostTimeFromStart(startTime);
-
-        AppendEntryRequest appendEntryRequest = buildAppendEntryRequest(log, false);
-        sendLogRequest.setAppendEntryRequest(appendEntryRequest);
-
-        startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
-        log.setCreateTime(System.nanoTime());
-        votingLogList.insert(sendLogRequest.getVotingLog());
-        getLogDispatcher().offer(sendLogRequest);
-        Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
-      }
-    }
-    sendLogRequests.clear();
-  }
-
-  private void sequenceLog() {
-    List<SendLogRequest> sendLogRequests = new ArrayList<>();
-    while (!Thread.interrupted()) {
-      try {
-        synchronized (nonsequencedLogQueue) {
-          SendLogRequest request = nonsequencedLogQueue.take();
-          sendLogRequests.add(request);
-          nonsequencedLogQueue.drainTo(sendLogRequests);
-        }
-
-        sequenceLog(sendLogRequests);
-      } catch (InterruptedException e) {
-        return;
-      }
-    }
-  }
-
-  private TSStatus processPlanLocallyV3(PhysicalPlan plan) {
-    logger.debug("{}: Processing plan {}", getName(), plan);
-    // assign term and index to the new log and append it
-    SendLogRequest sendLogRequest;
-
-    Log log;
-    if (plan instanceof LogPlan) {
-      try {
-        log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
-      } catch (UnknownLogTypeException e) {
-        logger.error("Can not parse LogPlan {}", plan, e);
-        return StatusUtils.PARSE_LOG_ERROR;
-      }
-    } else {
-      log = new PhysicalPlanLog();
-      ((PhysicalPlanLog) log).setPlan(plan);
-    }
-
-    if (log.serialize().capacity() + Integer.BYTES
-        >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
-      logger.error(
-          "Log cannot fit into buffer, please increase raft_log_buffer_size;"
-              + "or reduce the size of requests you send.");
-      return StatusUtils.INTERNAL_ERROR;
-    }
-
-    long startTime;
-    sendLogRequest = enqueueSendLogRequest(log);
-
-    try {
-      AppendLogResult appendLogResult =
-          waitAppendResult(
-              sendLogRequest.getVotingLog(),
-              sendLogRequest.getLeaderShipStale(),
-              sendLogRequest.getNewLeaderTerm(),
-              sendLogRequest.getQuorumSize());
-      Timer.Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.calOperationCostTimeFromStart(
-          sendLogRequest.getVotingLog().getLog().getCreateTime());
-
-      switch (appendLogResult) {
-        case WEAK_ACCEPT:
-          // TODO: change to weak
-          Statistic.RAFT_WEAK_ACCEPT.add(1);
-          return StatusUtils.OK;
-        case OK:
-          logger.debug(MSG_LOG_IS_ACCEPTED, getName(), log);
-          startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
-          commitLog(log);
-          Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
-          return StatusUtils.OK;
-        case TIME_OUT:
-          logger.debug("{}: log {} timed out...", getName(), log);
-          break;
-        case LEADERSHIP_STALE:
-          // abort the appending, the new leader will fix the local logs by catch-up
-        default:
-          break;
-      }
-    } catch (Exception e) {
-      return handleLogExecutionException(log, IOUtils.getRootCause(e));
-    }
-    return StatusUtils.TIME_OUT;
+    logSequencer = SEQUENCER_FACTORY.create(this, logManager);
   }
 
   @Override
@@ -323,43 +78,36 @@ public class SequencerExpr extends MetaGroupMember {
     };
   }
 
-  private void decentralizedSequencing() {
-    for (int i = 0; i < v2ThreadNum; i++) {
-      new Thread(() -> {
-        while (true) {
-          reqCnt.incrementAndGet();
-          DummyPlan dummyPlan = new DummyPlan();
-          processPlanLocallyV2(dummyPlan);
-        }
-      }).start();
-    }
-  }
-
-  private void centralizedSequencing() {
-    for (int i = 0; i < v3ThreadNum; i++) {
-      new Thread(() -> {
-        while (true) {
-          reqCnt.incrementAndGet();
-          DummyPlan dummyPlan = new DummyPlan();
-          processPlanLocallyV3(dummyPlan);
-        }
-      }).start();
+  private void sequencing() {
+    for (int i = 0; i < threadNum; i++) {
+      new Thread(
+              () -> {
+                while (true) {
+                  reqCnt.incrementAndGet();
+                  DummyPlan dummyPlan = new DummyPlan();
+                  processPlanLocallyV2(dummyPlan);
+                }
+              })
+          .start();
     }
   }
 
   private void startMonitor() {
-    new Thread(() -> {
-      long startTime = System.currentTimeMillis();
-      while (true) {
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-          e.printStackTrace();
-        }
-        long consumedTime = System.currentTimeMillis() - startTime;
-        System.out.println("" + consumedTime + ", " + (reqCnt.get() * 1.0 / consumedTime * 1000L));
-      }
-    }).start();
+    new Thread(
+            () -> {
+              long startTime = System.currentTimeMillis();
+              while (true) {
+                try {
+                  Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                  e.printStackTrace();
+                }
+                long consumedTime = System.currentTimeMillis() - startTime;
+                System.out.println(
+                    "" + consumedTime + ", " + (reqCnt.get() * 1.0 / consumedTime * 1000L));
+              }
+            })
+        .start();
   }
 
   public static void main(String[] args) {
@@ -372,8 +120,7 @@ public class SequencerExpr extends MetaGroupMember {
       group.add(new Node().setNodeIdentifier(i).setMetaPort(i));
     }
     sequencerExpr.setAllNodes(group);
-    sequencerExpr.centralizedSequencing();
-    sequencerExpr.decentralizedSequencing();
+    sequencerExpr.sequencing();
     sequencerExpr.startMonitor();
   }
 }
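
With the bespoke V2/V3 benchmark paths deleted, SequencerExpr reduces to: spawn threadNum writer threads that each loop processPlanLocallyV2(new DummyPlan()) while bumping a shared counter, plus a monitor thread that prints elapsed milliseconds and requests per second once a second. A self-contained sketch of that measurement harness (the real class drives a RaftMember instead of the no-op shown here):

    import java.util.concurrent.atomic.AtomicLong;

    // Simplified stand-alone sketch of the SequencerExpr benchmark shape:
    // N writer threads bump a shared counter per "request", and a monitor
    // loop prints "elapsedMs, requests/second" once a second.
    public class ThroughputProbe {
      static final AtomicLong reqCnt = new AtomicLong();

      public static void main(String[] args) {
        int threadNum = 4; // SequencerExpr uses 1000
        for (int i = 0; i < threadNum; i++) {
          new Thread(() -> {
            while (true) {
              reqCnt.incrementAndGet(); // stand-in for processPlanLocallyV2(new DummyPlan())
            }
          }).start();
        }
        long startTime = System.currentTimeMillis();
        while (true) {
          try {
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            return;
          }
          long consumedTime = System.currentTimeMillis() - startTime;
          System.out.println(consumedTime + ", " + (reqCnt.get() * 1.0 / consumedTime * 1000L));
        }
      }
    }
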
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index ac60e44..4b445df 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -976,6 +976,8 @@ public abstract class RaftLogManager {
     while (!Thread.currentThread().isInterrupted()) {
       try {
         doCheckAppliedLogIndex();
+      } catch (IndexOutOfBoundsException e) {
+        // ignore
       } catch (Exception e) {
         logger.error("{}, an exception occurred when checking the applied log index", name, e);
       }
@@ -998,7 +1000,7 @@ public abstract class RaftLogManager {
       }
       Log log = getCommittedEntryManager().getEntry(nextToCheckIndex);
       if (log == null || log.getCurrLogIndex() != nextToCheckIndex) {
-        logger.warn(
+        logger.debug(
             "{}, get log error when checking the applied log index, log={}, nextToCheckIndex={}",
             name,
             log,
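
The two RaftLogManager tweaks make the applied-index checker tolerant of the races that asynchronous sequencing introduces: a transient IndexOutOfBoundsException is swallowed, and a missing entry is logged at debug rather than warn, since it usually just means the entry has not been appended yet. A simplified sketch of the tolerant loop, with a plain List standing in for the committed-entry store:

    import java.util.List;

    // Sketch of the tolerant checker loop: reads can race with appends, so an
    // IndexOutOfBoundsException is treated as "entry not there yet" rather
    // than an error. Types are simplified stand-ins for the real manager.
    class AppliedIndexChecker implements Runnable {
      private final List<String> committedEntries;
      private long nextToCheckIndex = 0;

      AppliedIndexChecker(List<String> committedEntries) {
        this.committedEntries = committedEntries;
      }

      public void run() {
        while (!Thread.currentThread().isInterrupted()) {
          try {
            String entry = committedEntries.get((int) nextToCheckIndex); // may race with appends
            if (entry != null) {
              nextToCheckIndex++; // entry present, move to the next index
            }
          } catch (IndexOutOfBoundsException e) {
            // ignore: the entry has not been appended yet; retry on the next pass
          } catch (Exception e) {
            System.err.println("exception when checking the applied log index: " + e);
          }
        }
      }
    }
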
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
index 63cc38b..0def4d0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
@@ -19,16 +19,9 @@
 
 package org.apache.iotdb.cluster.log.sequencing;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
+import org.apache.iotdb.cluster.log.VotingLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
@@ -36,9 +29,18 @@ import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
 public class AsynchronousSequencer implements LogSequencer {
 
   private static final Logger logger = LoggerFactory.getLogger(AsynchronousSequencer.class);
@@ -51,8 +53,7 @@ public class AsynchronousSequencer implements LogSequencer {
 
   private BlockingQueue<SendLogRequest> unsequencedLogQueue;
 
-  public AsynchronousSequencer(RaftMember member,
-      RaftLogManager logManager) {
+  public AsynchronousSequencer(RaftMember member, RaftLogManager logManager) {
     this.member = member;
     this.logManager = logManager;
     unsequencedLogQueue = new ArrayBlockingQueue<>(4096);
@@ -62,12 +63,13 @@ public class AsynchronousSequencer implements LogSequencer {
   }
 
   public SendLogRequest enqueueSendLogRequest(Log log) {
-    AtomicInteger voteCounter = new AtomicInteger(member.getAllNodes().size() / 2);
+    VotingLog votingLog = member.buildVotingLog(log);
     AtomicBoolean leaderShipStale = new AtomicBoolean(false);
     AtomicLong newLeaderTerm = new AtomicLong(member.getTerm().get());
 
-    SendLogRequest request = new SendLogRequest(log, voteCounter, leaderShipStale, newLeaderTerm,
-        null);
+    SendLogRequest request =
+        new SendLogRequest(
+            votingLog, leaderShipStale, newLeaderTerm, null, member.getAllNodes().size() / 2);
     try {
       unsequencedLogQueue.put(request);
     } catch (InterruptedException e) {
@@ -81,7 +83,7 @@ public class AsynchronousSequencer implements LogSequencer {
     long startTime;
     synchronized (logManager) {
       for (SendLogRequest sendLogRequest : sendLogRequests) {
-        Log log = sendLogRequest.getLog();
+        Log log = sendLogRequest.getVotingLog().getLog();
         log.setCurrLogTerm(member.getTerm().get());
         log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
         if (log instanceof PhysicalPlanLog) {
@@ -100,6 +102,7 @@ public class AsynchronousSequencer implements LogSequencer {
 
         startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
         log.setCreateTime(System.nanoTime());
+        member.getVotingLogList().insert(sendLogRequest.getVotingLog());
         member.getLogDispatcher().offer(sendLogRequest);
         Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
       }
@@ -127,7 +130,7 @@ public class AsynchronousSequencer implements LogSequencer {
 
   @Override
   public SendLogRequest sequence(Log log) {
-    return null;
+    return enqueueSendLogRequest(log);
   }
 
   @Override
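
AsynchronousSequencer.sequence(Log) previously returned null and is now wired to enqueueSendLogRequest, completing the pipeline: callers enqueue unsequenced requests into a bounded queue, and a background thread drains them in batches, assigning term and index under the logManager lock before handing each request to the dispatcher. A stand-alone sketch of that producer/consumer shape (Runnable stands in for SendLogRequest):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Sketch of the producer/consumer shape used by AsynchronousSequencer:
    // many writer threads enqueue unsequenced requests, one background thread
    // drains the queue in batches and sequences them under a single lock.
    public class BatchSequencer {
      private final BlockingQueue<Runnable> unsequencedQueue = new ArrayBlockingQueue<>(4096);
      private final Object logManagerLock = new Object();

      public BatchSequencer() {
        new Thread(this::sequenceLoop).start();
      }

      // called from many writer threads; mirrors enqueueSendLogRequest
      public void enqueue(Runnable request) throws InterruptedException {
        unsequencedQueue.put(request);
      }

      private void sequenceLoop() {
        List<Runnable> batch = new ArrayList<>();
        while (!Thread.interrupted()) {
          try {
            batch.add(unsequencedQueue.take()); // block for the first request
            unsequencedQueue.drainTo(batch);    // then grab everything pending
          } catch (InterruptedException e) {
            return;
          }
          synchronized (logManagerLock) {
            // in the real sequencer: assign consecutive log indexes, append,
            // build the AppendEntryRequest, and offer to the log dispatcher
            batch.forEach(Runnable::run);
          }
          batch.clear();
        }
      }
    }

Batching under one lock acquisition amortizes contention on logManager, which is the point of the asynchronous variant.
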
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencerFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencerFactory.java
index 627ef84..cf5b3cc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencerFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencerFactory.java
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-
 package org.apache.iotdb.cluster.log.sequencing;
 
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
index 4da61b5..f7e6704 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.log.sequencing;
 
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
+import org.apache.iotdb.cluster.log.VotingLog;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.server.member.RaftMember;
@@ -28,7 +29,6 @@ import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -70,6 +70,7 @@ public class SynchronousSequencer implements LogSequencer {
 
       startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
       log.setCreateTime(System.nanoTime());
+      member.getVotingLogList().insert(sendLogRequest.getVotingLog());
       member.getLogDispatcher().offer(sendLogRequest);
       Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
     }
@@ -82,7 +83,7 @@ public class SynchronousSequencer implements LogSequencer {
   }
 
   private SendLogRequest buildSendLogRequest(Log log) {
-    AtomicInteger voteCounter = new AtomicInteger(member.getAllNodes().size() / 2);
+    VotingLog votingLog = member.buildVotingLog(log);
     AtomicBoolean leaderShipStale = new AtomicBoolean(false);
     AtomicLong newLeaderTerm = new AtomicLong(member.getTerm().get());
 
@@ -90,7 +91,12 @@ public class SynchronousSequencer implements LogSequencer {
     AppendEntryRequest appendEntryRequest = member.buildAppendEntryRequest(log, false);
     Statistic.RAFT_SENDER_BUILD_APPEND_REQUEST.calOperationCostTimeFromStart(startTime);
 
-    return new SendLogRequest(log, voteCounter, leaderShipStale, newLeaderTerm, appendEntryRequest);
+    return new SendLogRequest(
+        votingLog,
+        leaderShipStale,
+        newLeaderTerm,
+        appendEntryRequest,
+        member.getAllNodes().size() / 2);
   }
 
   public static class Factory implements LogSequencerFactory {
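
Both sequencers now build a VotingLog via member.buildVotingLog(log) and pass an explicit quorum size of member.getAllNodes().size() / 2 instead of the old raw AtomicInteger vote counter; they also register the VotingLog with the member's voting-log list before dispatch. A toy sketch of the quorum bookkeeping this implies (VotingLogSketch is a hypothetical stand-in, not the real VotingLog API):

    // Sketch of the quorum arithmetic above: with n nodes, the leader counts
    // itself, so acknowledgements from floor(n/2) followers form a majority.
    class VotingLogSketch {
      private int remainingAcks;

      VotingLogSketch(int clusterSize) {
        this.remainingAcks = clusterSize / 2; // the quorumSize passed to SendLogRequest
      }

      synchronized boolean ackReceived() {
        return --remainingAcks <= 0; // true once a majority is reached
      }
    }
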
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index a00420f..eeea9d6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -43,7 +43,6 @@ import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.log.manage.FilePartitionedSnapshotLogManager;
 import org.apache.iotdb.cluster.log.manage.PartitionedSnapshotLogManager;
-import org.apache.iotdb.cluster.log.sequencing.SynchronousSequencer;
 import org.apache.iotdb.cluster.log.snapshot.FileSnapshot;
 import org.apache.iotdb.cluster.log.snapshot.PartitionedSnapshot;
 import org.apache.iotdb.cluster.log.snapshot.PullSnapshotTask;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 81abb87..9dafebe 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -43,7 +43,6 @@ import org.apache.iotdb.cluster.log.applier.MetaLogApplier;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.log.manage.MetaSingleSnapshotLogManager;
-import org.apache.iotdb.cluster.log.sequencing.SynchronousSequencer;
 import org.apache.iotdb.cluster.log.snapshot.MetaSimpleSnapshot;
 import org.apache.iotdb.cluster.partition.NodeAdditionResult;
 import org.apache.iotdb.cluster.partition.NodeRemovalResult;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 2835cf3..829e76b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -41,7 +41,6 @@ import org.apache.iotdb.cluster.log.VotingLog;
 import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
-import org.apache.iotdb.cluster.log.sequencing.AsynchronousSequencer;
 import org.apache.iotdb.cluster.log.sequencing.AsynchronousSequencer.Factory;
 import org.apache.iotdb.cluster.log.sequencing.LogSequencer;
 import org.apache.iotdb.cluster.log.sequencing.LogSequencerFactory;
@@ -124,8 +123,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
 
 /**
- * RaftMember process the common raft logic like leader election, log appending, catch-up and so
- * on.
+ * RaftMember process the common raft logic like leader election, log appending, catch-up and so on.
  */
 @SuppressWarnings("java:S3077") // reference volatile is enough
 public abstract class RaftMember {
@@ -137,7 +135,8 @@ public abstract class RaftMember {
   public static boolean ENABLE_COMMIT_RETURN = false;
 
   protected static final LogSequencerFactory SEQUENCER_FACTORY =
-      ClusterDescriptor.getInstance().getConfig().isUseAsyncSequencing() ? new Factory()
+      ClusterDescriptor.getInstance().getConfig().isUseAsyncSequencing()
+          ? new Factory()
           : new SynchronousSequencer.Factory();
 
   private static final String MSG_FORWARD_TIMEOUT = "{}: Forward {} to {} time out";
@@ -158,32 +157,22 @@ public abstract class RaftMember {
    * on this may be woken.
    */
   private final Object waitLeaderCondition = new Object();
-  /**
-   * the lock is to make sure that only one thread can apply snapshot at the same time
-   */
+  /** the lock is to make sure that only one thread can apply snapshot at the same time */
   private final Object snapshotApplyLock = new Object();
 
   private final Object heartBeatWaitObject = new Object();
 
   protected Node thisNode = ClusterConstant.EMPTY_NODE;
 
-  /**
-   * the nodes that belong to the same raft group as thisNode.
-   */
+  /** the nodes that belong to the same raft group as thisNode. */
   protected PartitionGroup allNodes;
 
   ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
-  /**
-   * the name of the member, to distinguish several members in the logs.
-   */
+  /** the name of the member, to distinguish several members in the logs. */
   String name;
-  /**
-   * to choose nodes to send request of joining cluster randomly.
-   */
+  /** to choose nodes to send request of joining cluster randomly. */
   Random random = new Random();
-  /**
-   * when the node is a leader, this map is used to track log progress of each follower.
-   */
+  /** when the node is a leader, this map is used to track log progress of each follower. */
   Map<Node, Peer> peerMap;
   /**
    * the current term of the node, this object also works as lock of some transactions of the member
@@ -208,8 +197,8 @@ public abstract class RaftMember {
   /** the raft logs are all stored and maintained in the log manager */
   protected RaftLogManager logManager;
 
-  /**s
-   * the single thread pool that runs the heartbeat thread, which send heartbeats to the follower
+  /**
+   * s the single thread pool that runs the heartbeat thread, which send heartbeats to the follower
    * when this node is a leader, or start elections when this node is an elector.
    */
   ExecutorService heartBeatService;
@@ -224,9 +213,7 @@ public abstract class RaftMember {
    * member by comparing it with the current last log index.
    */
   long lastReportedLogIndex;
-  /**
-   * the thread pool that runs catch-up tasks
-   */
+  /** the thread pool that runs catch-up tasks */
   private ExecutorService catchUpService;
   /**
    * lastCatchUpResponseTime records when is the latest response of each node's catch-up. There
@@ -262,26 +249,20 @@ public abstract class RaftMember {
    * one slow node.
    */
   private ExecutorService serialToParallelPool;
-  /**
-   * a thread pool that is used to do commit log tasks asynchronous in heartbeat thread
-   */
+  /** a thread pool that is used to do commit log tasks asynchronous in heartbeat thread */
   private ExecutorService commitLogPool;
 
   /**
    * logDispatcher buff the logs orderly according to their log indexes and send them sequentially,
-   * which avoids the followers receiving out-of-order logs, forcing them to wait for previous
-   * logs.
+   * which avoids the followers receiving out-of-order logs, forcing them to wait for previous logs.
    */
   private LogDispatcher logDispatcher;
 
-  /**
-   * If this node can not be the leader, this parameter will be set true.
-   */
+  /** If this node can not be the leader, this parameter will be set true. */
   private volatile boolean skipElection = false;
 
   /**
-   * localExecutor is used to directly execute plans like load configuration in the underlying
-   * IoTDB
+   * localExecutor is used to directly execute plans like load configuration in the underlying IoTDB
    */
   protected PlanExecutor localExecutor;
 
@@ -842,22 +823,16 @@ public abstract class RaftMember {
     return lastCatchUpResponseTime;
   }
 
-  /**
-   * Sub-classes will add their own process of HeartBeatResponse in this method.
-   */
-  public void processValidHeartbeatResp(HeartBeatResponse response, Node receiver) {
-  }
+  /** Sub-classes will add their own process of HeartBeatResponse in this method. */
+  public void processValidHeartbeatResp(HeartBeatResponse response, Node receiver) {}
 
-  /**
-   * The actions performed when the node wins in an election (becoming a leader).
-   */
-  public void onElectionWins() {
-  }
+  /** The actions performed when the node wins in an election (becoming a leader). */
+  public void onElectionWins() {}
 
   /**
    * Update the followers' log by sending logs whose index >= followerLastMatchedLogIndex to the
-   * follower. If some of the required logs are removed, also send the snapshot. <br> notice that if
-   * a part of data is in the snapshot, then it is not in the logs.
+   * follower. If some of the required logs are removed, also send the snapshot. <br>
+   * notice that if a part of data is in the snapshot, then it is not in the logs.
    */
   public void catchUp(Node follower, long lastLogIdx) {
     // for one follower, there is at most one ongoing catch-up, so the same data will not be sent
@@ -944,9 +919,7 @@ public abstract class RaftMember {
     }
   }
 
-  /**
-   * call back after syncLeader
-   */
+  /** call back after syncLeader */
   public interface CheckConsistency {
 
     /**
@@ -955,7 +928,7 @@ public abstract class RaftMember {
      * @param leaderCommitId leader commit id
      * @param localAppliedId local applied id
      * @throws CheckConsistencyException maybe throw CheckConsistencyException, which is defined in
-     *                                   implements.
+     *     implements.
      */
     void postCheckConsistency(long leaderCommitId, long localAppliedId)
         throws CheckConsistencyException;
@@ -964,7 +937,8 @@ public abstract class RaftMember {
   public static class MidCheckConsistency implements CheckConsistency {
 
     /**
-     * if leaderCommitId - localAppliedId > MaxReadLogLag, will throw CHECK_MID_CONSISTENCY_EXCEPTION
+     * if leaderCommitId - localAppliedId > MaxReadLogLag, will throw
+     * CHECK_MID_CONSISTENCY_EXCEPTION
      *
      * @param leaderCommitId leader commit id
      * @param localAppliedId local applied id
@@ -976,7 +950,7 @@ public abstract class RaftMember {
       if (leaderCommitId == Long.MAX_VALUE
           || leaderCommitId == Long.MIN_VALUE
           || leaderCommitId - localAppliedId
-          > ClusterDescriptor.getInstance().getConfig().getMaxReadLogLag()) {
+              > ClusterDescriptor.getInstance().getConfig().getMaxReadLogLag()) {
         throw CheckConsistencyException.CHECK_MID_CONSISTENCY_EXCEPTION;
       }
     }
@@ -1009,7 +983,7 @@ public abstract class RaftMember {
    * @param checkConsistency check after syncleader
    * @return true if the node has caught up, false otherwise
    * @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
-   *                                   value after timeout
+   *     value after timeout
    */
   public boolean syncLeader(CheckConsistency checkConsistency) throws CheckConsistencyException {
     if (character == NodeCharacter.LEADER) {
@@ -1028,9 +1002,7 @@ public abstract class RaftMember {
     return waitUntilCatchUp(checkConsistency);
   }
 
-  /**
-   * Wait until the leader of this node becomes known or time out.
-   */
+  /** Wait until the leader of this node becomes known or time out. */
   public void waitLeader() {
     long startTime = System.currentTimeMillis();
     while (leader.get() == null || ClusterConstant.EMPTY_NODE.equals(leader.get())) {
@@ -1057,7 +1029,7 @@ public abstract class RaftMember {
    *
    * @return true if this node has caught up before timeout, false otherwise
    * @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
-   *                                   value after timeout
+   *     value after timeout
    */
   protected boolean waitUntilCatchUp(CheckConsistency checkConsistency)
       throws CheckConsistencyException {
@@ -1090,7 +1062,7 @@ public abstract class RaftMember {
    * sync local applyId to leader commitId
    *
    * @param leaderCommitId leader commit id
-   * @param fastFail       if enable, when log differ too much, return false directly.
+   * @param fastFail if enable, when log differ too much, return false directly.
    * @return true if leaderCommitId <= localAppliedId
    */
   public boolean syncLocalApply(long leaderCommitId, boolean fastFail) {
@@ -1144,7 +1116,7 @@ public abstract class RaftMember {
    * call this method. Will commit the log locally and send it to followers
    *
    * @return OK if over half of the followers accept the log or null if the leadership is lost
-   * during the appending
+   *     during the appending
    */
   public TSStatus processPlanLocally(PhysicalPlan plan) {
     if (USE_LOG_DISPATCHER) {
@@ -1198,7 +1170,7 @@ public abstract class RaftMember {
     return StatusUtils.TIME_OUT;
   }
 
-  private TSStatus processPlanLocallyV2(PhysicalPlan plan) {
+  protected TSStatus processPlanLocallyV2(PhysicalPlan plan) {
     logger.debug("{}: Processing plan {}", name, plan);
     if (readOnly) {
       return StatusUtils.NODE_READ_ONLY;
@@ -1277,7 +1249,7 @@ public abstract class RaftMember {
         votingLog, leaderShipStale, newLeaderTerm, appendEntryRequest, allNodes.size() / 2);
   }
 
-  protected VotingLog buildVotingLog(Log log) {
+  public VotingLog buildVotingLog(Log log) {
     return new VotingLog(log, allNodes.size());
   }
 
@@ -1369,9 +1341,7 @@ public abstract class RaftMember {
     return peerMap;
   }
 
-  /**
-   * @return true if there is a log whose index is "index" and term is "term", false otherwise
-   */
+  /** @return true if there is a log whose index is "index" and term is "term", false otherwise */
   public boolean matchLog(long index, long term) {
     boolean matched = logManager.matchTerm(term, index);
     logger.debug("Log {}-{} matched: {}", index, term, matched);
@@ -1390,18 +1360,15 @@ public abstract class RaftMember {
     return syncLock;
   }
 
-  /**
-   * Sub-classes will add their own process of HeartBeatRequest in this method.
-   */
-  void processValidHeartbeatReq(HeartBeatRequest request, HeartBeatResponse response) {
-  }
+  /** Sub-classes will add their own process of HeartBeatRequest in this method. */
+  void processValidHeartbeatReq(HeartBeatRequest request, HeartBeatResponse response) {}
 
   /**
    * Verify the validity of an ElectionRequest, and make itself a follower of the elector if the
    * request is valid.
    *
    * @return Response.RESPONSE_AGREE if the elector is valid or the local term if the elector has a
-   * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
+   *     smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
    */
   long checkElectorLogProgress(ElectionRequest electionRequest) {
 
@@ -1445,7 +1412,7 @@ public abstract class RaftMember {
    * lastLogIndex is smaller than the voter's Otherwise accept the election.
    *
    * @return Response.RESPONSE_AGREE if the elector is valid or the local term if the elector has a
-   * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
+   *     smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
    */
   long checkLogProgress(long lastLogIndex, long lastLogTerm) {
     long response;
@@ -1462,10 +1429,10 @@ public abstract class RaftMember {
   /**
    * Forward a non-query plan to a node using the default client.
    *
-   * @param plan   a non-query plan
-   * @param node   cannot be the local node
+   * @param plan a non-query plan
+   * @param node cannot be the local node
    * @param header must be set for data group communication, set to null for meta group
-   *               communication
+   *     communication
    * @return a TSStatus indicating if the forwarding is successful.
    */
   public TSStatus forwardPlan(PhysicalPlan plan, Node node, RaftNode header) {
@@ -1496,7 +1463,7 @@ public abstract class RaftMember {
   /**
    * Forward a non-query plan to "receiver" using "client".
    *
-   * @param plan   a non-query plan
+   * @param plan a non-query plan
    * @param header to determine which DataGroupMember of "receiver" will process the request.
    * @return a TSStatus indicating if the forwarding is successful.
    */
@@ -1578,7 +1545,7 @@ public abstract class RaftMember {
    * Get an asynchronous thrift client of the given node.
    *
    * @return an asynchronous thrift client or null if the caller tries to connect the local node or
-   * the node cannot be reached.
+   *     the node cannot be reached.
    */
   public AsyncClient getAsyncClient(Node node) {
     return getAsyncClient(node, asyncClientPool, true);
@@ -1812,7 +1779,7 @@ public abstract class RaftMember {
    * heartbeat timer.
    *
    * @param fromLeader true if the request is from a leader, false if the request is from an
-   *                   elector.
+   *     elector.
    */
   public void stepDown(long newTerm, boolean fromLeader) {
     synchronized (term) {
@@ -1844,9 +1811,7 @@ public abstract class RaftMember {
     this.thisNode = thisNode;
   }
 
-  /**
-   * @return the header of the data raft group or null if this is in a meta group.
-   */
+  /** @return the header of the data raft group or null if this is in a meta group. */
   public RaftNode getHeader() {
     return null;
   }
@@ -2005,9 +1970,7 @@ public abstract class RaftMember {
         log, node, leaderShipStale, newLeaderTerm, request, quorumSize, Collections.emptyList());
   }
 
-  /**
-   * Send "log" to "node".
-   */
+  /** Send "log" to "node". */
   public void sendLogToFollower(
       VotingLog log,
       Node node,
@@ -2140,7 +2103,7 @@ public abstract class RaftMember {
    * and append "log" to it. Otherwise report a log mismatch.
    *
    * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
-   * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+   *     .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
    */
   protected AppendEntryResult appendEntry(
       long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
@@ -2170,9 +2133,7 @@ public abstract class RaftMember {
     return result;
   }
 
-  /**
-   * Wait until all logs before "prevLogIndex" arrive or a timeout is reached.
-   */
+  /** Wait until all logs before "prevLogIndex" arrive or a timeout is reached. */
   private boolean waitForPrevLog(long prevLogIndex) {
     long waitStart = System.currentTimeMillis();
     long alreadyWait = 0;
@@ -2218,7 +2179,7 @@ public abstract class RaftMember {
    *
    * @param logs append logs
    * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
-   * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+   *     .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
    */
   protected AppendEntryResult appendEntries(
       long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) {
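
Finally, RaftMember widens processPlanLocallyV2 to protected and buildVotingLog to public so that SequencerExpr and the sequencers can reuse them. For reference, the append-result handling those callers rely on (the switch deleted from SequencerExpr above) follows this shape, with simplified stand-in types:

    // Sketch of the append-result handling in processPlanLocallyV2:
    // WEAK_ACCEPT and OK both succeed (OK additionally commits locally);
    // TIME_OUT and LEADERSHIP_STALE fall through to a timeout status.
    enum AppendLogResult { WEAK_ACCEPT, OK, TIME_OUT, LEADERSHIP_STALE }

    class AppendResultHandling {
      static String handle(AppendLogResult result) {
        switch (result) {
          case WEAK_ACCEPT:
            return "OK";          // accepted by a weak quorum, no local commit yet
          case OK:
            // commitLog(log) runs here in the real method before returning
            return "OK";
          case TIME_OUT:
          case LEADERSHIP_STALE:  // a new leader will fix local logs by catch-up
          default:
            return "TIME_OUT";
        }
      }
    }

Note that WEAK_ACCEPT returns success without committing, which matches the "TODO: change to weak" left in the original switch.
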