Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/04/07 01:16:56 UTC

[iotdb] 01/03: fix ack leader with wrong receiver

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_plus
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1bac74e9ae4ca34a8cf49f86ca260e6e95e1151d
Author: jt <jt...@163.com>
AuthorDate: Tue Mar 29 11:28:17 2022 +0800

    fix ack leader with wrong receiver
---
 cluster/distribute-dc.sh                           |   2 +-
 .../resources/conf/iotdb-cluster.properties        |   3 +-
 .../org/apache/iotdb/cluster/ClusterIoTDB.java     |  19 +++
 .../cluster/client/async/AsyncDataClient.java      |   4 -
 .../apache/iotdb/cluster/config/ClusterConfig.java |  10 ++
 .../iotdb/cluster/config/ClusterDescriptor.java    |   6 +
 .../org/apache/iotdb/cluster/expr/ExprBench.java   | 128 +++++++++++---
 .../iotdb/cluster/log/IndirectLogDispatcher.java   | 137 ++++++++++-----
 .../java/org/apache/iotdb/cluster/log/Log.java     |   2 +-
 .../org/apache/iotdb/cluster/log/LogAckSender.java | 186 +++++++++++++++++++++
 .../apache/iotdb/cluster/log/LogDispatcher.java    |  65 +++++--
 .../org/apache/iotdb/cluster/log/LogRelay.java     |  51 +++++-
 .../apache/iotdb/cluster/log/VotingLogList.java    |  16 ++
 .../cluster/log/appender/BlockingLogAppender.java  |  47 ++++--
 .../iotdb/cluster/log/appender/LogAppender.java    |   7 +-
 .../log/appender/SlidingWindowLogAppender.java     |  88 ++++++++--
 .../cluster/log/logtypes/PhysicalPlanLog.java      |  11 ++
 .../iotdb/cluster/log/manage/RaftLogManager.java   |  17 +-
 .../cluster/query/manage/QueryCoordinator.java     |   3 +-
 .../handlers/caller/AppendNodeEntryHandler.java    |  55 +++---
 .../server/handlers/caller/HeartbeatHandler.java   |  16 +-
 .../cluster/server/heartbeat/HeartbeatThread.java  |  18 +-
 .../cluster/server/member/DataGroupMember.java     |   3 +-
 .../cluster/server/member/MetaGroupMember.java     |   3 +-
 .../iotdb/cluster/server/member/RaftMember.java    | 174 +++++++++----------
 .../iotdb/cluster/server/monitor/NodeReport.java   |  20 ++-
 .../iotdb/cluster/server/monitor/NodeStatus.java   |  34 ++++
 .../cluster/server/monitor/NodeStatusManager.java  |   9 +-
 .../apache/iotdb/cluster/server/monitor/Timer.java |  64 ++++++-
 .../cluster/server/service/MetaSyncService.java    |   5 +-
 .../apache/iotdb/cluster/utils/WeightedList.java   |  87 ++++++++++
 .../caller/AppendNodeEntryHandlerTest.java         |  12 +-
 thrift-cluster/src/main/thrift/cluster.thrift      |   3 +-
 33 files changed, 1018 insertions(+), 287 deletions(-)

diff --git a/cluster/distribute-dc.sh b/cluster/distribute-dc.sh
index 1eba6f4264..5e1af6b312 100644
--- a/cluster/distribute-dc.sh
+++ b/cluster/distribute-dc.sh
@@ -1,6 +1,6 @@
 src_lib_path=/e/codestore/incubator-iotdb2/cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/lib/iotdb*
 
-ips=(dc11 dc12 dc13 dc14 dc15 dc16 dc17 dc18)
+ips=(dc13 dc14 dc15 dc16 dc17 dc18)
 target_lib_path=/home/jt/iotdb_expr/lib
 
 for ip in ${ips[*]}
diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index ff363749b4..56566c327e 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -212,4 +212,5 @@ multi_raft_factor=1
 # replicas is high, which may make thread switching costly.
 # dispatcher_binding_thread_num=16
 
-use_indirect_broadcasting=true
\ No newline at end of file
+use_indirect_broadcasting=true
+optimize_indirect_broadcasting=false
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index 36c40d04bc..2e5b02cc58 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -45,6 +45,9 @@ import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.server.monitor.NodeReport;
+import org.apache.iotdb.cluster.server.monitor.NodeStatus;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.server.raft.DataRaftHeartBeatService;
 import org.apache.iotdb.cluster.server.raft.DataRaftService;
 import org.apache.iotdb.cluster.server.raft.MetaRaftHeartBeatService;
@@ -79,6 +82,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.HashSet;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -250,6 +254,7 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
     }
 
     // we start IoTDB kernel first. then we start the cluster module.
+    Runtime.getRuntime().addShutdownHook(new ShutdownHook());
     if (MODE_START.equals(mode)) {
       cluster.activeStartNodeMode();
     } else if (MODE_ADD.equals(mode)) {
@@ -287,6 +292,20 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
     return true;
   }
 
+  private static class ShutdownHook extends Thread {
+
+    @Override
+    public void run() {
+      logger.info(
+          "Total request fanout: {}",
+          Statistic.RAFT_SENDER_RELAY_LOG.getCnt() + Statistic.RAFT_SENDER_SEND_LOG.getCnt());
+      for (Entry<Node, NodeStatus> nodeNodeStatusEntry :
+          NodeStatusManager.getINSTANCE().getNodeStatusMap().entrySet()) {
+        logger.info("{}: {}", nodeNodeStatusEntry.getKey(), nodeNodeStatusEntry.getValue());
+      }
+    }
+  }
+
   private String clusterConfigCheck() {
     if (IoTDBDescriptor.getInstance().getConfig().isReplaceHostNameWithIp()) {
       try {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
index df6d5ead74..cb9c798b68 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
@@ -147,17 +147,13 @@ public class AsyncDataClient extends TSDataService.AsyncClient {
 
   public static class AsyncDataClientFactory extends AsyncBaseFactory<Node, AsyncDataClient> {
 
-    Exception createStack;
-
     public AsyncDataClientFactory(TProtocolFactory protocolFactory, ClientCategory category) {
       super(protocolFactory, category);
-      createStack = new Exception();
     }
 
     public AsyncDataClientFactory(
         TProtocolFactory protocolFactory, ClientCategory category, IClientManager clientManager) {
       super(protocolFactory, category, clientManager);
-      createStack = new Exception();
     }
 
     @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index c75237beca..187509b2ef 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -192,6 +192,8 @@ public class ClusterConfig {
 
   private int relaySenderNum = 8;
 
+  private boolean optimizeIndirectBroadcasting = false;
+
   /**
    * create a clusterConfig class. The internalIP will be set according to the server's hostname. If
    * there is something error for getting the ip of the hostname, then set the internalIp as
@@ -590,4 +592,12 @@ public class ClusterConfig {
   public void setRelaySenderNum(int relaySenderNum) {
     this.relaySenderNum = relaySenderNum;
   }
+
+  public boolean isOptimizeIndirectBroadcasting() {
+    return optimizeIndirectBroadcasting;
+  }
+
+  public void setOptimizeIndirectBroadcasting(boolean optimizeIndirectBroadcasting) {
+    this.optimizeIndirectBroadcasting = optimizeIndirectBroadcasting;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index c8bfbdc51f..a089e2d75a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -345,6 +345,12 @@ public class ClusterDescriptor {
             properties.getProperty(
                 "use_indirect_broadcasting", String.valueOf(config.isUseIndirectBroadcasting()))));
 
+    config.setOptimizeIndirectBroadcasting(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "optimize_indirect_broadcasting",
+                String.valueOf(config.isOptimizeIndirectBroadcasting()))));
+
     config.setRelaySenderNum(
         Integer.parseInt(
             properties.getProperty(
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
index 7673e116c7..d94957f74d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
@@ -28,20 +28,28 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
+import com.google.common.util.concurrent.RateLimiter;
 import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Random;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class ExprBench {
 
+  private static final Logger logger = LoggerFactory.getLogger(ExprBench.class);
+
   private AtomicLong requestCounter = new AtomicLong();
   private AtomicLong latencySum = new AtomicLong();
   private long maxLatency = 0;
@@ -49,45 +57,86 @@ public class ExprBench {
   private int workloadSize = 64 * 1024;
   private int printInterval = 1000;
   private ClientManager clientPool;
-  private Node target;
   private int maxRequestNum;
   private ExecutorService pool = Executors.newCachedThreadPool();
   private List<Node> nodeList = new ArrayList<>();
-  private int raftFactor = 1;
+  private int[] raftFactors;
+  private int[] rateLimits;
+  private List<EndPoint> endPoints = new ArrayList<>();
+  private Map<EndPoint, RateLimiter> rateLimiterMap = new ConcurrentHashMap<>();
+  private Map<EndPoint, Statistic> latencyMap = new ConcurrentHashMap<>();
 
   public ExprBench(Node target) {
-    this.target = target;
     clientPool = new ClientManager(false, Type.MetaGroupClient);
   }
 
+  private static class EndPoint {
+    private Node node;
+    private int raftId;
+
+    public EndPoint(Node node, int raftId) {
+      this.node = node;
+      this.raftId = raftId;
+    }
+
+    @Override
+    public String toString() {
+      return "EndPoint{" + "node=" + node.getInternalIp() + ", raftId=" + raftId + '}';
+    }
+  }
+
+  private static class Statistic {
+    private AtomicLong sum = new AtomicLong();
+    private AtomicLong cnt = new AtomicLong();
+
+    public void add(long val) {
+      sum.addAndGet(val);
+      cnt.incrementAndGet();
+    }
+
+    @Override
+    public String toString() {
+      return "{" + sum.get() + "," + cnt.get() + "," + (sum.get() * 1.0 / cnt.get()) + "}";
+    }
+  }
+
   public void benchmark() {
     long startTime = System.currentTimeMillis();
     for (int i = 0; i < threadNum; i++) {
       int finalI = i;
       pool.submit(
           () -> {
-            Random random = new Random(123456L + finalI);
+            int endPointIdx = finalI % endPoints.size();
             Client client = null;
-            try {
-              client = clientPool.borrowSyncClient(target, ClientCategory.META);
-            } catch (IOException e) {
-              e.printStackTrace();
-            }
+
             ExecutNonQueryReq request = new ExecutNonQueryReq();
             DummyPlan plan = new DummyPlan();
             plan.setWorkload(new byte[workloadSize]);
             plan.setNeedForward(true);
 
             ByteBuffer byteBuffer = ByteBuffer.allocate(workloadSize + 4096);
+            Map<EndPoint, Node> endPointLeaderMap = new HashMap<>();
 
+            Node target = null;
             long currRequsetNum = -1;
             while (true) {
 
-              if (raftFactor > 0) {
-                Node node = nodeList.get(random.nextInt(nodeList.size()));
-                int raftId = random.nextInt(raftFactor);
-                plan.setGroupIdentifier(ClusterUtils.nodeToString(node) + "#" + raftId);
+              EndPoint endPoint = endPoints.get(endPointIdx);
+              RateLimiter rateLimiter = rateLimiterMap.get(endPoint);
+              if (rateLimiter != null) {
+                rateLimiter.acquire(1);
               }
+
+              target = endPointLeaderMap.getOrDefault(endPoint, endPoint.node);
+              int raftId = endPoint.raftId;
+              plan.setGroupIdentifier(ClusterUtils.nodeToString(endPoint.node) + "#" + raftId);
+
+              try {
+                client = clientPool.borrowSyncClient(target, ClientCategory.META);
+              } catch (IOException e) {
+                e.printStackTrace();
+              }
+
               byteBuffer.clear();
               plan.serialize(byteBuffer);
               byteBuffer.flip();
@@ -96,12 +145,20 @@ public class ExprBench {
 
               long reqLatency = System.nanoTime();
               try {
-                client.executeNonQueryPlan(request);
+                TSStatus status = client.executeNonQueryPlan(request);
+                clientPool.returnSyncClient(client, target, ClientCategory.META);
+                if (status.isSetRedirectNode()) {
+                  Node leader = new Node().setInternalIp(status.redirectNode.ip).setMetaPort(8880);
+                  endPointLeaderMap.put(endPoint, leader);
+                  logger.info("Leader of {} is changed to {}", endPoint, leader);
+                }
+
                 currRequsetNum = requestCounter.incrementAndGet();
                 if (currRequsetNum > threadNum * 10) {
                   reqLatency = System.nanoTime() - reqLatency;
                   maxLatency = Math.max(maxLatency, reqLatency);
                   latencySum.addAndGet(reqLatency);
+                  latencyMap.get(endPoint).add(reqLatency);
                 }
               } catch (TException e) {
                 e.printStackTrace();
@@ -118,6 +175,7 @@ public class ExprBench {
                         currRequsetNum * workloadSize / (1024.0 * 1024.0) / elapsedTime,
                         maxLatency / 1000.0,
                         (latencySum.get() + 0.0) / currRequsetNum));
+                System.out.println(latencyMap);
               }
 
               if (currRequsetNum >= maxRequestNum) {
@@ -136,14 +194,12 @@ public class ExprBench {
   public static void main(String[] args) {
     ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(50000);
     Node target = new Node();
-    target.setInternalIp(args[0]);
-    target.setMetaPort(Integer.parseInt(args[1]));
     ExprBench bench = new ExprBench(target);
-    bench.maxRequestNum = Integer.parseInt(args[2]);
-    bench.threadNum = Integer.parseInt(args[3]);
-    bench.workloadSize = Integer.parseInt(args[4]) * 1024;
-    bench.printInterval = Integer.parseInt(args[5]);
-    String[] nodesSplit = args[6].split(",");
+    bench.maxRequestNum = Integer.parseInt(args[0]);
+    bench.threadNum = Integer.parseInt(args[1]);
+    bench.workloadSize = Integer.parseInt(args[2]) * 1024;
+    bench.printInterval = Integer.parseInt(args[3]);
+    String[] nodesSplit = args[4].split(",");
     for (String s : nodesSplit) {
       String[] nodeSplit = s.split(":");
       Node node = new Node();
@@ -151,8 +207,34 @@ public class ExprBench {
       node.setMetaPort(Integer.parseInt(nodeSplit[1]));
       bench.nodeList.add(node);
     }
-    bench.raftFactor = Integer.parseInt(args[7]);
+    String[] raftFactorSplit = args[5].split(",");
+    bench.raftFactors = new int[raftFactorSplit.length];
+    for (int i = 0; i < raftFactorSplit.length; i++) {
+      bench.raftFactors[i] = Integer.parseInt(raftFactorSplit[i]);
+    }
+    if (args.length >= 7) {
+      String[] ratesSplit = args[6].split(",");
+      bench.rateLimits = new int[ratesSplit.length];
+      for (int i = 0; i < ratesSplit.length; i++) {
+        bench.rateLimits[i] = Integer.parseInt(ratesSplit[i]);
+      }
+    }
+
+    List<Node> list = bench.nodeList;
+    for (int i = 0, listSize = list.size(); i < listSize; i++) {
+      Node node = list.get(i);
+      for (int j = 0; j < bench.raftFactors[i]; j++) {
+        EndPoint endPoint = new EndPoint(node, j);
+        bench.endPoints.add(endPoint);
+        bench.latencyMap.put(endPoint, new Statistic());
+        if (bench.rateLimits != null) {
+          bench.rateLimiterMap.put(endPoint, RateLimiter.create(bench.rateLimits[i]));
+        }
+      }
+    }
 
     bench.benchmark();
+
+    System.out.println(bench.latencyMap);
   }
 }
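
Note on the revised ExprBench entry point: the positional arguments shifted
(the old explicit target ip/port pair is gone, and per-node raft factors plus
optional per-node rate limits were added). A minimal launcher sketch, assuming
hypothetical hosts, ports, and values (none of these figures come from the
commit):

    // Sketch only: argument order expected by the new ExprBench.main.
    public class ExprBenchLauncher {
      public static void main(String[] ignored) {
        String[] args = {
          "1000000",             // args[0]: maxRequestNum
          "16",                  // args[1]: threadNum
          "64",                  // args[2]: workload size in KB
          "1000",                // args[3]: printInterval
          "dc13:9003,dc14:9003", // args[4]: nodes as internalIp:metaPort (hypothetical)
          "2,2",                 // args[5]: raft factor per node
          "5000,5000"            // args[6]: optional requests/s rate limit per node
        };
        org.apache.iotdb.cluster.expr.ExprBench.main(args);
      }
    }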
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index 343cb3d21f..c02f911778 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -19,24 +19,25 @@
 
 package org.apache.iotdb.cluster.log;
 
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
-import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.cluster.utils.WeightedList;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
 /**
  * IndirectLogDispatcher sends entries only to a pre-selected subset of followers instead of all
@@ -45,10 +46,13 @@ import java.util.concurrent.TimeUnit;
 public class IndirectLogDispatcher extends LogDispatcher {
 
   private static final Logger logger = LoggerFactory.getLogger(IndirectLogDispatcher.class);
-  private Map<Node, List<Node>> directToIndirectFollowerMap;
+
+  private Map<Node, List<Node>> directToIndirectFollowerMap = new HashMap<>();
 
   public IndirectLogDispatcher(RaftMember member) {
     super(member);
+    recalculateDirectFollowerMap();
+    useBatchInLogCatchUp = false;
   }
 
   @Override
@@ -59,25 +63,101 @@ public class IndirectLogDispatcher extends LogDispatcher {
 
   @Override
   void createQueueAndBindingThreads() {
+    for (Node node : member.getAllNodes()) {
+      if (!ClusterUtils.isNodeEquals(node, member.getThisNode())) {
+        nodesEnabled.put(node, false);
+        nodesLogQueues.put(node, createQueueAndBindingThread(node));
+      }
+    }
+  }
+
+  @Override
+  public void offer(SendLogRequest request) {
+    super.offer(request);
     recalculateDirectFollowerMap();
   }
 
+  @Override
+  protected SendLogRequest transformRequest(Node node, SendLogRequest request) {
+    SendLogRequest newRequest = new SendLogRequest(request);
+    // copy the RPC request so each request can have different sub-receivers but the same log
+    // binary and other fields
+    newRequest.setAppendEntryRequest(new AppendEntryRequest(newRequest.getAppendEntryRequest()));
+    newRequest.getAppendEntryRequest().setSubReceivers(directToIndirectFollowerMap.get(node));
+    return newRequest;
+  }
+
   public void recalculateDirectFollowerMap() {
     List<Node> allNodes = new ArrayList<>(member.getAllNodes());
     allNodes.removeIf(n -> ClusterUtils.isNodeEquals(n, member.getThisNode()));
-    QueryCoordinator instance = QueryCoordinator.getINSTANCE();
-    List<Node> orderedNodes = instance.reorderNodes(allNodes);
-    synchronized (this) {
-      executorService.shutdown();
-      try {
-        executorService.awaitTermination(10, TimeUnit.SECONDS);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        logger.warn("Dispatcher thread pool of {} cannot be shutdown within 10s", member);
+    Collections.shuffle(allNodes);
+    List<Node> orderedNodes = allNodes;
+
+    nodesEnabled.clear();
+    directToIndirectFollowerMap.clear();
+
+    if (ClusterDescriptor.getInstance().getConfig().isOptimizeIndirectBroadcasting()) {
+      QueryCoordinator instance = QueryCoordinator.getINSTANCE();
+      orderedNodes = instance.reorderNodes(allNodes);
+      long thisLoad =
+          Statistic.RAFT_SENDER_SEND_LOG.getCnt() + Statistic.RAFT_SEND_RELAY.getCnt() + 1;
+      long minLoad =
+          NodeStatusManager.getINSTANCE()
+                  .getNodeStatus(orderedNodes.get(0), false)
+                  .getStatus()
+                  .fanoutRequestNum
+              + 1;
+      double loadFactor = 1.05;
+      WeightedList<Node> firstLevelCandidates = new WeightedList<>();
+      firstLevelCandidates.insert(
+          orderedNodes.get(0),
+          1.0
+              / (NodeStatusManager.getINSTANCE()
+                      .getNodeStatus(orderedNodes.get(0), false)
+                      .getStatus()
+                      .fanoutRequestNum
+                  + 1));
+      int firstLevelSize = 1;
+
+      for (int i = 1, orderedNodesSize = orderedNodes.size(); i < orderedNodesSize; i++) {
+        Node orderedNode = orderedNodes.get(i);
+        long nodeLoad =
+            NodeStatusManager.getINSTANCE()
+                    .getNodeStatus(orderedNode, false)
+                    .getStatus()
+                    .fanoutRequestNum
+                + 1;
+        if (nodeLoad * 1.0 <= minLoad * loadFactor) {
+          firstLevelCandidates.insert(orderedNode, 1.0 / nodeLoad);
+        }
+        if (nodeLoad > thisLoad) {
+          firstLevelSize = (int) Math.max(firstLevelSize, nodeLoad / thisLoad);
+        }
+      }
+
+      if (firstLevelSize > firstLevelCandidates.size()) {
+        firstLevelSize = firstLevelCandidates.size();
       }
-      executorService = Executors.newCachedThreadPool();
 
-      directToIndirectFollowerMap = new HashMap<>();
+      List<Node> firstLevelNodes = firstLevelCandidates.select(firstLevelSize);
+
+      Map<Node, List<Node>> secondLevelNodeMap = new HashMap<>();
+      orderedNodes.removeAll(firstLevelNodes);
+      for (int i = 0; i < orderedNodes.size(); i++) {
+        Node firstLevelNode = firstLevelNodes.get(i % firstLevelSize);
+        secondLevelNodeMap
+            .computeIfAbsent(firstLevelNode, n -> new ArrayList<>())
+            .add(orderedNodes.get(i));
+      }
+
+      for (Node firstLevelNode : firstLevelNodes) {
+        directToIndirectFollowerMap.put(
+            firstLevelNode,
+            secondLevelNodeMap.getOrDefault(firstLevelNode, Collections.emptyList()));
+        nodesEnabled.put(firstLevelNode, true);
+      }
+
+    } else {
       for (int i = 0, j = orderedNodes.size() - 1; i <= j; i++, j--) {
         if (i != j) {
           directToIndirectFollowerMap.put(
@@ -85,31 +165,10 @@ public class IndirectLogDispatcher extends LogDispatcher {
         } else {
           directToIndirectFollowerMap.put(orderedNodes.get(i), Collections.emptyList());
         }
+        nodesEnabled.put(orderedNodes.get(i), true);
       }
     }
 
-    for (Node node : directToIndirectFollowerMap.keySet()) {
-      nodesLogQueues.put(node, createQueueAndBindingThread(node));
-    }
-  }
-
-  class DispatcherThread extends LogDispatcher.DispatcherThread {
-
-    DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> logBlockingDeque) {
-      super(receiver, logBlockingDeque);
-    }
-
-    @Override
-    void sendLog(SendLogRequest logRequest) {
-      logRequest.getAppendEntryRequest().setSubReceivers(directToIndirectFollowerMap.get(receiver));
-      super.sendLog(logRequest);
-    }
-
-    @Override
-    protected AppendEntriesRequest prepareRequest(
-        List<ByteBuffer> logList, List<SendLogRequest> currBatch, int firstIndex) {
-      return super.prepareRequest(logList, currBatch, firstIndex)
-          .setSubReceivers(directToIndirectFollowerMap.get(receiver));
-    }
+    logger.debug("New relay map: {}", directToIndirectFollowerMap);
   }
 }
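
The non-optimized branch above pairs nodes from both ends of the shuffled list
so the leader contacts only the first half directly. A self-contained sketch
of that pairing, assuming the elided put() arguments map orderedNodes.get(i)
to a singleton list of orderedNodes.get(j):

    import java.util.*;

    // Hedged sketch of the default relay pairing: with five followers the
    // leader sends to n1, n2, n3 directly; n1 forwards to n5 and n2 to n4.
    public class RelayPairingDemo {
      public static void main(String[] args) {
        List<String> ordered = Arrays.asList("n1", "n2", "n3", "n4", "n5");
        Map<String, List<String>> relayMap = new LinkedHashMap<>();
        for (int i = 0, j = ordered.size() - 1; i <= j; i++, j--) {
          relayMap.put(
              ordered.get(i),
              i != j
                  ? Collections.singletonList(ordered.get(j))
                  : Collections.<String>emptyList());
        }
        System.out.println(relayMap); // {n1=[n5], n2=[n4], n3=[]}
      }
    }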
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index efa095032a..c2b8b70259 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -50,7 +50,7 @@ public abstract class Log implements Comparable<Log> {
 
   private int byteSize = 0;
 
-  public static int getDefaultBufferSize() {
+  public int getDefaultBufferSize() {
     return DEFAULT_BUFFER_SIZE;
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogAckSender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogAckSender.java
new file mode 100644
index 0000000000..3441a01432
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogAckSender.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log;
+
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.cluster.utils.ClientUtils;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
+
+public class LogAckSender {
+
+  private static final Logger logger = LoggerFactory.getLogger(LogAckSender.class);
+
+  private ExecutorService ackSenderPool;
+  private RaftNode header;
+  private RaftMember member;
+  private BlockingQueue<AckRequest> requestQueue = new ArrayBlockingQueue<>(4096);
+  private String baseThreadName;
+
+  public LogAckSender(RaftMember member) {
+    this.member = member;
+    this.header = member.getHeader();
+    ackSenderPool = IoTDBThreadPoolFactory.newSingleThreadExecutor(member.getName() + "-ACKSender");
+    ackSenderPool.submit(this::appendAckLeaderTask);
+  }
+
+  public static class AckRequest {
+
+    private Node leader;
+    private long index;
+    private long term;
+    private long response;
+
+    public AckRequest(Node leader, long index, long term, long response) {
+      this.leader = leader;
+      this.index = index;
+      this.term = term;
+      this.response = response;
+    }
+  }
+
+  public void offer(Node leader, long index, long term, long response) {
+    requestQueue.add(new AckRequest(leader, index, term, response));
+  }
+
+  private void appendAckLeaderTask() {
+    List<AckRequest> ackRequestList = new ArrayList<>();
+    baseThreadName = Thread.currentThread().getName();
+    try {
+      while (!Thread.interrupted()) {
+        ackRequestList.clear();
+        synchronized (requestQueue) {
+          AckRequest req = requestQueue.take();
+          ackRequestList.add(req);
+          requestQueue.drainTo(ackRequestList);
+        }
+
+        appendAckLeader(ackRequestList);
+        Thread.sleep(10);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private void appendAckLeader(List<AckRequest> requests) {
+    if (requests.isEmpty()) {
+      return;
+    }
+    int index = 0;
+    AckRequest requestToSend = null;
+    for (; index < requests.size(); index++) {
+      if (requestToSend == null) {
+        requestToSend = requests.get(index);
+      } else {
+        AckRequest currRequest = requests.get(index);
+
+        if (requestToSend.term == currRequest.term
+            && requestToSend.index >= currRequest.index
+            && (requestToSend.response == currRequest.response
+                || requestToSend.response == Response.RESPONSE_STRONG_ACCEPT)) {
+          // currRequest has the same term and a smaller or equal index, and its
+          // response is identical or covered by requestToSend's strong accept,
+          // so requestToSend subsumes it; nothing more to do
+        } else if (requestToSend.term == currRequest.term
+            && requestToSend.index < currRequest.index
+            && (requestToSend.response == currRequest.response
+                || currRequest.response == Response.RESPONSE_STRONG_ACCEPT)) {
+          // currRequest has the same term and a larger index, and its response
+          // is identical or itself a strong accept, so it can replace requestToSend
+          requestToSend = currRequest;
+        } else {
+          // the requests cannot cover each other, send requestToSend first
+          appendAckLeader(
+              requestToSend.leader,
+              requestToSend.index,
+              requestToSend.term,
+              requestToSend.response);
+          requestToSend = currRequest;
+        }
+      }
+    }
+
+    if (requestToSend != null) {
+      appendAckLeader(
+          requestToSend.leader, requestToSend.index, requestToSend.term, requestToSend.response);
+    }
+  }
+
+  private void appendAckLeader(Node leader, long index, long term, long response) {
+    long ackStartTime = Statistic.RAFT_RECEIVER_APPEND_ACK.getOperationStartTime();
+    AppendEntryResult result = new AppendEntryResult();
+    result.setLastLogIndex(index);
+    result.setLastLogTerm(term);
+    result.status = response;
+    result.setHeader(header);
+    result.setReceiver(member.getThisNode());
+
+    Client syncClient = null;
+    try {
+      if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+        GenericHandler<Void> handler = new GenericHandler<>(leader, null);
+        member.getAsyncClient(leader).acknowledgeAppendEntry(result, handler);
+      } else {
+        syncClient = member.getSyncClient(leader);
+        syncClient.acknowledgeAppendEntry(result);
+      }
+    } catch (TException e) {
+      logger.warn("Cannot send ack of {}-{} to leader {}", index, term, leader, e);
+    } finally {
+      if (syncClient != null) {
+        ClientUtils.putBackSyncClient(syncClient);
+      }
+    }
+    Thread.currentThread().setName(baseThreadName + "-" + index + "-" + response);
+    Statistic.RAFT_SEND_RELAY_ACK.add(1);
+    Statistic.RAFT_RECEIVER_APPEND_ACK.calOperationCostTimeFromStart(ackStartTime);
+  }
+
+  public void stop() {
+    ackSenderPool.shutdownNow();
+    try {
+      ackSenderPool.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME_S, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      logger.error("Unexpected interruption when waiting for ackSenderPool to end", e);
+    }
+  }
+}
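
The batching loop in appendAckLeader(List) relies on a coverage rule: within
one term, a strong accept for index k also acknowledges every index <= k. A
minimal sketch of that predicate, using a placeholder response constant rather
than the real Response values:

    // Sketch: when covers(a, b) holds, sending ack a makes ack b redundant.
    public class AckCoalesceDemo {
      static final long STRONG_ACCEPT = 1; // placeholder, not the real constant

      static boolean covers(long idxA, long termA, long respA,
                            long idxB, long termB, long respB) {
        return termA == termB && idxA >= idxB
            && (respA == respB || respA == STRONG_ACCEPT);
      }

      public static void main(String[] args) {
        System.out.println(covers(7, 1, STRONG_ACCEPT, 5, 1, STRONG_ACCEPT)); // true
        System.out.println(covers(5, 1, STRONG_ACCEPT, 7, 1, STRONG_ACCEPT)); // false
      }
    }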
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 2410b2feaa..3e659a1468 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -30,6 +30,8 @@ import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
 import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.NodeStatus;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
@@ -58,6 +60,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -72,8 +75,9 @@ public class LogDispatcher {
   private static final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
   RaftMember member;
   private static final ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
-  private boolean useBatchInLogCatchUp = clusterConfig.isUseBatchInLogCatchUp();
+  protected boolean useBatchInLogCatchUp = clusterConfig.isUseBatchInLogCatchUp();
   Map<Node, BlockingQueue<SendLogRequest>> nodesLogQueues = new HashMap<>();
+  Map<Node, Boolean> nodesEnabled = new HashMap<>();
   ExecutorService executorService;
   private static ExecutorService serializationService =
       IoTDBThreadPoolFactory.newFixedThreadPoolWithDaemonThread(
@@ -81,17 +85,21 @@ public class LogDispatcher {
 
   public static int bindingThreadNum = clusterConfig.getDispatcherBindingThreadNum();
   public static int maxBatchSize = 10;
+  public static AtomicInteger concurrentSenderNum = new AtomicInteger();
 
   public LogDispatcher(RaftMember member) {
     this.member = member;
     executorService =
-        IoTDBThreadPoolFactory.newCachedThreadPool("LogDispatcher-" + member.getName());
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            bindingThreadNum * (member.getAllNodes().size() - 1),
+            "LogDispatcher-" + member.getName());
     createQueueAndBindingThreads();
   }
 
   void createQueueAndBindingThreads() {
     for (Node node : member.getAllNodes()) {
       if (!ClusterUtils.isNodeEquals(node, member.getThisNode())) {
+        nodesEnabled.put(node, true);
         nodesLogQueues.put(node, createQueueAndBindingThread(node));
       }
     }
@@ -109,15 +117,27 @@ public class LogDispatcher {
     return byteBuffer;
   }
 
+  protected SendLogRequest transformRequest(Node node, SendLogRequest request) {
+    return request;
+  }
+
   public void offer(SendLogRequest request) {
     // do serialization here to avoid taking LogManager for too long
     if (!nodesLogQueues.isEmpty()) {
-      request.serializedLogFuture = serializationService.submit(() -> serializeTask(request));
+      SendLogRequest finalRequest = request;
+      request.serializedLogFuture = serializationService.submit(() -> serializeTask(finalRequest));
     }
 
     long startTime = Statistic.LOG_DISPATCHER_LOG_ENQUEUE.getOperationStartTime();
     request.getVotingLog().getLog().setEnqueueTime(System.nanoTime());
     for (Entry<Node, BlockingQueue<SendLogRequest>> entry : nodesLogQueues.entrySet()) {
+      boolean nodeEnabled = this.nodesEnabled.getOrDefault(entry.getKey(), false);
+      if (!nodeEnabled) {
+        continue;
+      }
+
+      request = transformRequest(entry.getKey(), request);
+
       BlockingQueue<SendLogRequest> nodeLogQueue = entry.getValue();
       try {
         boolean addSucceeded;
@@ -173,6 +193,7 @@ public class LogDispatcher {
 
   public static class SendLogRequest {
 
+    private AppendNodeEntryHandler handler;
     private VotingLog votingLog;
     private AtomicBoolean leaderShipStale;
     private AtomicLong newLeaderTerm;
@@ -201,6 +222,7 @@ public class LogDispatcher {
       this.setAppendEntryRequest(request.appendEntryRequest);
       this.setQuorumSize(request.quorumSize);
       this.setEnqueueTime(request.enqueueTime);
+      this.serializedLogFuture = request.serializedLogFuture;
     }
 
     public VotingLog getVotingLog() {
@@ -265,28 +287,27 @@ public class LogDispatcher {
     private Peer peer;
     Client syncClient;
     AsyncClient asyncClient;
+    private String baseName;
 
     DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> logBlockingDeque) {
       this.receiver = receiver;
       this.logBlockingDeque = logBlockingDeque;
-      this.peer =
-          member
-              .getPeerMap()
-              .computeIfAbsent(receiver, r -> new Peer(member.getLogManager().getLastLogIndex()));
+      this.peer = member.getPeer(receiver);
       if (!clusterConfig.isUseAsyncServer()) {
         syncClient = member.getSyncClient(receiver);
       }
+      baseName = "LogDispatcher-" + member.getName() + "-" + receiver;
     }
 
     @Override
     public void run() {
-      Thread.currentThread().setName("LogDispatcher-" + member.getName() + "-" + receiver);
+      Thread.currentThread().setName(baseName);
       try {
         while (!Thread.interrupted()) {
           synchronized (logBlockingDeque) {
             SendLogRequest poll = logBlockingDeque.take();
             currBatch.add(poll);
-            if (maxBatchSize > 1) {
+            if (maxBatchSize > 1 && useBatchInLogCatchUp) {
               logBlockingDeque.drainTo(currBatch, maxBatchSize - 1);
             }
           }
@@ -310,6 +331,8 @@ public class LogDispatcher {
       for (SendLogRequest request : currBatch) {
         Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
             request.getVotingLog().getLog().getEnqueueTime());
+        Statistic.LOG_DISPATCHER_FROM_CREATE_TO_DEQUEUE.calOperationCostTimeFromStart(
+            request.getVotingLog().getLog().getCreateTime());
         long start = Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
         request.getAppendEntryRequest().entry = request.serializedLogFuture.get();
         Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(start);
@@ -427,6 +450,8 @@ public class LogDispatcher {
           sendLogs(currBatch);
         } else {
           for (SendLogRequest batch : currBatch) {
+            Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENDING.calOperationCostTimeFromStart(
+                batch.getVotingLog().getLog().getCreateTime());
             sendLog(batch);
           }
         }
@@ -442,21 +467,29 @@ public class LogDispatcher {
               receiver,
               logRequest.leaderShipStale,
               logRequest.newLeaderTerm,
-              peer,
               logRequest.quorumSize);
       // TODO add async interface
       int retries = 5;
       try {
         long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
         for (int i = 0; i < retries; i++) {
-          logRequest.getVotingLog().getFailedNodeIds().remove(receiver.nodeIdentifier);
-          logRequest.getVotingLog().getStronglyAcceptedNodeIds().remove(Integer.MAX_VALUE);
+          int concurrentSender = concurrentSenderNum.incrementAndGet();
+          Statistic.RAFT_CONCURRENT_SENDER.add(concurrentSender);
           AppendEntryResult result = syncClient.appendEntry(logRequest.appendEntryRequest);
+          concurrentSenderNum.decrementAndGet();
           if (result.status == Response.RESPONSE_OUT_OF_WINDOW) {
             Thread.sleep(100);
+            Statistic.RAFT_SENDER_OOW.add(1);
           } else {
-            Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(operationStartTime);
+            long sendLogTime =
+                Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(operationStartTime);
+            NodeStatus nodeStatus = NodeStatusManager.getINSTANCE().getNodeStatus(receiver, false);
+            nodeStatus.getSendEntryLatencySum().addAndGet(sendLogTime);
+            nodeStatus.getSendEntryNum().incrementAndGet();
+
+            long handleStart = Statistic.RAFT_SENDER_HANDLE_SEND_RESULT.getOperationStartTime();
             handler.onComplete(result);
+            Statistic.RAFT_SENDER_HANDLE_SEND_RESULT.calOperationCostTimeFromStart(handleStart);
             break;
           }
         }
@@ -477,7 +510,6 @@ public class LogDispatcher {
               receiver,
               logRequest.leaderShipStale,
               logRequest.newLeaderTerm,
-              peer,
               logRequest.quorumSize);
 
       AsyncClient client = member.getAsyncClient(receiver);
@@ -491,11 +523,15 @@ public class LogDispatcher {
     }
 
     void sendLog(SendLogRequest logRequest) {
+      Thread.currentThread()
+          .setName(baseName + "-" + logRequest.getVotingLog().getLog().getCurrLogIndex());
       if (clusterConfig.isUseAsyncServer()) {
         sendLogAsync(logRequest);
       } else {
         sendLogSync(logRequest);
       }
+      Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENT.calOperationCostTimeFromStart(
+          logRequest.getVotingLog().getLog().getCreateTime());
     }
 
     class AppendEntriesHandler implements AsyncMethodCallback<AppendEntryResult> {
@@ -511,7 +547,6 @@ public class LogDispatcher {
                   receiver,
                   sendLogRequest.getLeaderShipStale(),
                   sendLogRequest.getNewLeaderTerm(),
-                  peer,
                   sendLogRequest.getQuorumSize());
           singleEntryHandlers.add(handler);
         }
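
The new transformRequest hook is what lets IndirectLogDispatcher reuse this
class: the base dispatcher forwards requests untouched, while the indirect
subclass clones each request per direct follower so every copy can carry its
own sub-receiver list. A simplified, self-contained sketch of the pattern
(types reduced to plain strings; not the real classes):

    import java.util.*;

    public class TransformHookDemo {
      static class Request {
        List<String> subReceivers = Collections.emptyList();
        Request() {}
        Request(Request other) { this.subReceivers = other.subReceivers; }
      }

      static class Dispatcher {
        Request transformRequest(String node, Request r) { return r; } // base: no-op
      }

      static class IndirectDispatcher extends Dispatcher {
        Map<String, List<String>> relayMap =
            Collections.singletonMap("n1", Arrays.asList("n4", "n5"));

        @Override
        Request transformRequest(String node, Request r) {
          Request copy = new Request(r); // copy so per-node lists don't clash
          copy.subReceivers = relayMap.getOrDefault(node, Collections.<String>emptyList());
          return copy;
        }
      }

      public static void main(String[] args) {
        Request base = new Request();
        System.out.println(
            new IndirectDispatcher().transformRequest("n1", base).subReceivers); // [n4, n5]
        System.out.println(base.subReceivers); // [] -- the original is untouched
      }
    }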
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
index 69645586d7..5972fba588 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
@@ -25,18 +25,22 @@ import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 /** LogRelay is used by followers to forward entries from the leader to other followers. */
 public class LogRelay {
 
+  private static final Logger logger = LoggerFactory.getLogger(LogRelay.class);
+
   private ConcurrentSkipListSet<RelayEntry> entryHeap = new ConcurrentSkipListSet<>();
   private static final int RELAY_NUMBER =
       ClusterDescriptor.getInstance().getConfig().getRelaySenderNum();
@@ -46,11 +50,8 @@ public class LogRelay {
   public LogRelay(RaftMember raftMember) {
     this.raftMember = raftMember;
     relaySenders =
-        Executors.newFixedThreadPool(
-            RELAY_NUMBER,
-            new ThreadFactoryBuilder()
-                .setNameFormat(raftMember.getName() + "-RelaySender-%d")
-                .build());
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            RELAY_NUMBER, raftMember.getName() + "-RelaySender");
     for (int i = 0; i < RELAY_NUMBER; i++) {
       relaySenders.submit(new RelayThread());
     }
@@ -65,6 +66,7 @@ public class LogRelay {
   }
 
   private void offer(RelayEntry entry) {
+    long operationStartTime = Statistic.RAFT_SENDER_RELAY_OFFER_LOG.getOperationStartTime();
     synchronized (entryHeap) {
       while (entryHeap.size()
           > ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem()) {
@@ -77,6 +79,7 @@ public class LogRelay {
       entryHeap.add(entry);
       entryHeap.notifyAll();
     }
+    Statistic.RAFT_SENDER_RELAY_OFFER_LOG.calOperationCostTimeFromStart(operationStartTime);
   }
 
   public void offer(AppendEntriesRequest request, List<Node> receivers) {
@@ -87,6 +90,7 @@ public class LogRelay {
 
     @Override
     public void run() {
+      String baseName = Thread.currentThread().getName();
       while (!Thread.interrupted()) {
         RelayEntry relayEntry;
         synchronized (entryHeap) {
@@ -102,9 +106,25 @@ public class LogRelay {
           }
         }
 
+        logger.debug("Relaying {}", relayEntry);
+
         if (relayEntry.singleRequest != null) {
+          Thread.currentThread()
+              .setName(
+                  baseName
+                      + "-"
+                      + (relayEntry.singleRequest.prevLogIndex + 1)
+                      + "-"
+                      + relayEntry.receivers);
           raftMember.sendLogToSubFollowers(relayEntry.singleRequest, relayEntry.receivers);
         } else if (relayEntry.batchRequest != null) {
+          Thread.currentThread()
+              .setName(
+                  baseName
+                      + "-"
+                      + (relayEntry.batchRequest.prevLogIndex + 1)
+                      + "-"
+                      + relayEntry.receivers);
           raftMember.sendLogsToSubFollowers(relayEntry.batchRequest, relayEntry.receivers);
         }
 
@@ -138,6 +158,13 @@ public class LogRelay {
       return 0;
     }
 
+    @Override
+    public String toString() {
+      long index = singleRequest != null ? singleRequest.prevLogIndex : batchRequest.prevLogIndex;
+      index++;
+      return "RelayEntry{" + index + "," + receivers + "}";
+    }
+
     @Override
     public boolean equals(Object o) {
       if (this == o) {
@@ -153,7 +180,7 @@ public class LogRelay {
 
     @Override
     public int hashCode() {
-      return Objects.hash(singleRequest);
+      return Objects.hash(singleRequest, batchRequest);
     }
 
     @Override
@@ -161,4 +188,12 @@ public class LogRelay {
       return Long.compare(this.getIndex(), o.getIndex());
     }
   }
+
+  public RelayEntry first() {
+    try {
+      return entryHeap.isEmpty() ? null : entryHeap.first();
+    } catch (NoSuchElementException e) {
+      return null;
+    }
+  }
 }
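
The try/catch in first() is needed because isEmpty() and first() are not
atomic on a ConcurrentSkipListSet: a relay thread can drain the set between
the two calls, making first() throw even though isEmpty() just returned
false. A minimal sketch of the safe-peek idiom:

    import java.util.NoSuchElementException;
    import java.util.concurrent.ConcurrentSkipListSet;

    public class SafePeekDemo {
      public static void main(String[] args) {
        ConcurrentSkipListSet<Long> heap = new ConcurrentSkipListSet<>();
        Long head;
        try {
          // another thread may empty the set between these two calls
          head = heap.isEmpty() ? null : heap.first();
        } catch (NoSuchElementException e) {
          head = null; // treat a concurrent drain the same as an empty set
        }
        System.out.println(head); // null here, since nothing was offered
      }
    }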
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
index 59bc40bf25..7b1dde2d3e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
@@ -20,14 +20,20 @@
 package org.apache.iotdb.cluster.log;
 
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.tsfile.utils.Pair;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.List;
 
 public class VotingLogList {
 
+  private static final Logger logger = LoggerFactory.getLogger(VotingLogList.class);
+
   private List<VotingLog> logList = new ArrayList<>();
   private volatile long currTerm = -1;
   private int quorumSize;
@@ -63,6 +69,7 @@ public class VotingLogList {
    * @return the lastly removed entry if any.
    */
   public void onStronglyAccept(long index, long term, int acceptingNodeId) {
+    logger.debug("{}-{} is strongly accepted by {}", index, term, acceptingNodeId);
     int lastEntryIndexToCommit = -1;
 
     List<VotingLog> acceptedLogs;
@@ -93,6 +100,15 @@ public class VotingLogList {
     }
 
     if (lastEntryIndexToCommit != -1) {
+      Log lastLog = acceptedLogs.get(acceptedLogs.size() - 1).log;
+      synchronized (member.getLogManager()) {
+        try {
+          member.getLogManager().commitTo(lastLog.getCurrLogIndex());
+        } catch (LogExecutionException e) {
+          logger.error("Fail to commit {}", lastLog, e);
+        }
+      }
+
       for (VotingLog acceptedLog : acceptedLogs) {
         synchronized (acceptedLog) {
           acceptedLog.acceptedTime.set(System.nanoTime());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
index d2d9056a16..a08b542f93 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.cluster.log.appender;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.member.RaftMember;
@@ -30,6 +32,7 @@ import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.Buffer;
 import java.util.List;
 
 /**
@@ -57,24 +60,28 @@ public class BlockingLogAppender implements LogAppender {
    * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
    *     .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
    */
-  public AppendEntryResult appendEntry(
-      long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
-    long resp = checkPrevLogIndex(prevLogIndex);
+  public AppendEntryResult appendEntry(AppendEntryRequest request, Log log) {
+    long resp = checkPrevLogIndex(request.prevLogIndex);
     if (resp != Response.RESPONSE_AGREE) {
       return new AppendEntryResult(resp).setHeader(member.getHeader());
     }
 
-    long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
     long success;
     AppendEntryResult result = new AppendEntryResult();
     synchronized (logManager) {
-      success = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, log);
+      success =
+          logManager.maybeAppend(
+              request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log);
       if (success != -1) {
         result.setLastLogIndex(logManager.getLastLogIndex());
         result.setLastLogTerm(logManager.getLastLogTerm());
+        if (request.isSetSubReceivers() && !request.getSubReceivers().isEmpty()) {
+          request.entry.rewind();
+          member.getLogRelay().offer(request, request.subReceivers);
+        }
       }
     }
-    Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
+
     if (success != -1) {
       logger.debug("{} append a new log {}", member.getName(), log);
       result.status = Response.RESPONSE_STRONG_ACCEPT;
@@ -82,6 +89,7 @@ public class BlockingLogAppender implements LogAppender {
       // the incoming log points to an illegal position, reject it
       result.status = Response.RESPONSE_LOG_MISMATCH;
     }
+    result.setHeader(request.getHeader());
     return result;
   }
 
@@ -134,36 +142,43 @@ public class BlockingLogAppender implements LogAppender {
    * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
    *     .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
    */
-  public AppendEntryResult appendEntries(
-      long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) {
+  public AppendEntryResult appendEntries(AppendEntriesRequest request, List<Log> logs) {
     logger.debug(
         "{}, prevLogIndex={}, prevLogTerm={}, leaderCommit={}",
         member.getName(),
-        prevLogIndex,
-        prevLogTerm,
-        leaderCommit);
+        request.prevLogIndex,
+        request.prevLogTerm,
+        request.leaderCommit);
     if (logs.isEmpty()) {
       return new AppendEntryResult(Response.RESPONSE_AGREE).setHeader(member.getHeader());
     }
 
-    long resp = checkPrevLogIndex(prevLogIndex);
+    long resp = checkPrevLogIndex(request.prevLogIndex);
     if (resp != Response.RESPONSE_AGREE) {
       return new AppendEntryResult(resp).setHeader(member.getHeader());
     }
 
     AppendEntryResult result = new AppendEntryResult();
     synchronized (logManager) {
-      long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
-      resp = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, logs);
-      Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
+      resp =
+          logManager.maybeAppend(
+              request.prevLogIndex, request.prevLogTerm, request.leaderCommit, logs);
       if (resp != -1) {
         if (logger.isDebugEnabled()) {
           logger.debug(
-              "{} append a new log list {}, commit to {}", member.getName(), logs, leaderCommit);
+              "{} append a new log list {}, commit to {}",
+              member.getName(),
+              logs,
+              request.leaderCommit);
         }
         result.status = Response.RESPONSE_STRONG_ACCEPT;
         result.setLastLogIndex(logManager.getLastLogIndex());
         result.setLastLogTerm(logManager.getLastLogTerm());
+
+        if (request.isSetSubReceivers()) {
+          request.entries.forEach(Buffer::rewind);
+          member.getLogRelay().offer(request, request.subReceivers);
+        }
       } else {
         // the incoming log points to an illegal position, reject it
         result.status = Response.RESPONSE_LOG_MISMATCH;
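
The request.entry.rewind() before handing the request to the LogRelay matters
presumably because deserializing the entry advanced the ByteBuffer's position;
without a rewind, relayed sub-receivers would see an exhausted buffer. A
minimal illustration of the rewind semantics:

    import java.nio.ByteBuffer;

    public class RewindDemo {
      public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putLong(42L);
        buf.flip();
        buf.getLong();                     // reading consumes the buffer...
        buf.rewind();                      // ...rewind resets the position
        System.out.println(buf.getLong()); // 42
      }
    }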
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/LogAppender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/LogAppender.java
index 01f16daf09..ca0eea3bf0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/LogAppender.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/LogAppender.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.cluster.log.appender;
 
 import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
 
 import java.util.List;
@@ -30,10 +32,9 @@ import java.util.List;
  */
 public interface LogAppender {
 
-  AppendEntryResult appendEntries(
-      long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs);
+  AppendEntryResult appendEntries(AppendEntriesRequest request, List<Log> logs);
 
-  AppendEntryResult appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log);
+  AppendEntryResult appendEntry(AppendEntryRequest request, Log log);
 
   void reset();
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
index f2a19e8039..1ab08449bb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
@@ -22,17 +22,24 @@ package org.apache.iotdb.cluster.log.appender;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.Buffer;
 import java.util.Arrays;
 import java.util.List;
 
 public class SlidingWindowLogAppender implements LogAppender {
 
+  private static final Logger logger = LoggerFactory.getLogger(SlidingWindowLogAppender.class);
+
   private int windowCapacity = ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem();
   private int windowLength = 0;
   private Log[] logWindow = new Log[windowCapacity];
@@ -115,25 +122,43 @@ public class SlidingWindowLogAppender implements LogAppender {
 
     // flush [0, flushPos)
     List<Log> logs = Arrays.asList(logWindow).subList(0, flushPos);
+    if (!logs.isEmpty() && logger.isDebugEnabled()) {
+      logger.debug(
+          "Flushing {} entries to log, first {}, last {}",
+          logs.size(),
+          logs.get(0),
+          logs.get(logs.size() - 1));
+    }
     long success =
         logManager.maybeAppend(windowPrevLogIndex, windowPrevLogTerm, leaderCommit, logs);
     if (success != -1) {
-      System.arraycopy(logWindow, flushPos, logWindow, 0, windowCapacity - flushPos);
-      System.arraycopy(prevTerms, flushPos, prevTerms, 0, windowCapacity - flushPos);
-      for (int i = 1; i <= flushPos; i++) {
-        logWindow[windowCapacity - i] = null;
-      }
+      moveWindowRightward(flushPos);
     }
-    firstPosPrevIndex = logManager.getLastLogIndex();
     result.status = Response.RESPONSE_STRONG_ACCEPT;
     result.setLastLogIndex(firstPosPrevIndex);
     result.setLastLogTerm(logManager.getLastLogTerm());
     return success;
   }
 
+  private void moveWindowRightward(int step) {
+    System.arraycopy(logWindow, step, logWindow, 0, windowCapacity - step);
+    System.arraycopy(prevTerms, step, prevTerms, 0, windowCapacity - step);
+    for (int i = 1; i <= step; i++) {
+      logWindow[windowCapacity - i] = null;
+    }
+    firstPosPrevIndex = logManager.getLastLogIndex();
+  }
+
+  private void moveWindowLeftward(int step) {
+    int length = Math.max(windowCapacity - step, 0);
+    System.arraycopy(logWindow, 0, logWindow, step, length);
+    System.arraycopy(prevTerms, 0, prevTerms, step, length);
+    // only the first `step` slots were vacated by the copy; clearing more
+    // would wipe entries that were just shifted
+    for (int i = 0; i < Math.min(step, windowCapacity); i++) {
+      logWindow[i] = null;
+    }
+    firstPosPrevIndex = logManager.getLastLogIndex();
+  }
+
   @Override
-  public AppendEntryResult appendEntries(
-      long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) {
+  public AppendEntryResult appendEntries(AppendEntriesRequest request, List<Log> logs) {
     if (logs.isEmpty()) {
       return new AppendEntryResult(Response.RESPONSE_AGREE)
           .setHeader(member.getPartitionGroup().getHeader());
@@ -141,24 +166,56 @@ public class SlidingWindowLogAppender implements LogAppender {
 
     AppendEntryResult result = null;
     for (Log log : logs) {
-      result = appendEntry(prevLogIndex, prevLogTerm, leaderCommit, log);
+      result = appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log);
 
       if (result.status != Response.RESPONSE_AGREE
           && result.status != Response.RESPONSE_STRONG_ACCEPT
           && result.status != Response.RESPONSE_WEAK_ACCEPT) {
         return result;
       }
-      prevLogIndex = log.getCurrLogIndex();
-      prevLogTerm = log.getCurrLogTerm();
+      request.prevLogIndex = log.getCurrLogIndex();
+      request.prevLogTerm = log.getCurrLogTerm();
+    }
+    if (request.isSetSubReceivers()) {
+      request.entries.forEach(Buffer::rewind);
+      member.getLogRelay().offer(request, request.subReceivers);
     }
 
     return result;
   }
 
   @Override
-  public AppendEntryResult appendEntry(
+  public AppendEntryResult appendEntry(AppendEntryRequest request, Log log) {
+
+    AppendEntryResult result = null;
+    long start = System.currentTimeMillis();
+    long waitedTime = 0;
+    // bounds the total time spent waiting for the window to open, in ms,
+    // not the number of retries
+    long maxWaitTimeMs = 10000;
+    while (result == null
+        || (result.status == Response.RESPONSE_OUT_OF_WINDOW && waitedTime < maxWaitTimeMs)) {
+      result = appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log);
+      waitedTime = System.currentTimeMillis() - start;
+      if (result.status == Response.RESPONSE_OUT_OF_WINDOW && waitedTime < maxWaitTimeMs) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+    }
+    result.setHeader(request.getHeader());
+
+    if (request.isSetSubReceivers() && !request.getSubReceivers().isEmpty()) {
+      request.entry.rewind();
+      member.getLogRelay().offer(request, request.subReceivers);
+    }
+
+    return result;
+  }
+
+  private AppendEntryResult appendEntry(
       long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
-    long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
     long appendedPos = 0;
 
     AppendEntryResult result = new AppendEntryResult();
@@ -170,6 +227,7 @@ public class SlidingWindowLogAppender implements LogAppender {
         result.status = Response.RESPONSE_STRONG_ACCEPT;
         result.setLastLogIndex(logManager.getLastLogIndex());
         result.setLastLogTerm(logManager.getLastLogTerm());
+        moveWindowLeftward(-windowPos);
       } else if (windowPos < windowCapacity) {
         // the new entry falls into the window
         logWindow[windowPos] = log;
@@ -186,14 +244,12 @@ public class SlidingWindowLogAppender implements LogAppender {
 
         Statistic.RAFT_WINDOW_LENGTH.add(windowLength);
       } else {
-        Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
         result.setStatus(Response.RESPONSE_OUT_OF_WINDOW);
         result.setHeader(member.getPartitionGroup().getHeader());
         return result;
       }
     }
 
-    Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
     if (appendedPos == -1) {
       // the incoming log points to an illegal position, reject it
       result.status = Response.RESPONSE_LOG_MISMATCH;
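
A side note: the array shift in moveWindowRightward above is easy to verify in
isolation. A standalone sketch of the rightward move, assuming a capacity-4
window from which two entries were just flushed:

    import java.util.Arrays;

    public class WindowShiftDemo {
      public static void main(String[] args) {
        String[] logWindow = {"e5", "e6", "e7", "e8"};
        int windowCapacity = logWindow.length;
        int step = 2; // two entries were flushed to the log manager

        // surviving entries move to the front; the vacated tail is cleared
        System.arraycopy(logWindow, step, logWindow, 0, windowCapacity - step);
        for (int i = 1; i <= step; i++) {
          logWindow[windowCapacity - i] = null;
        }
        System.out.println(Arrays.toString(logWindow)); // [e7, e8, null, null]
      }
    }
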
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
index 1f3882b398..5d5793d350 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.log.logtypes;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import org.slf4j.Logger;
@@ -46,6 +47,16 @@ public class PhysicalPlanLog extends Log {
     this.plan = plan;
   }
 
+  @Override
+  public int getDefaultBufferSize() {
+    if (plan instanceof DummyPlan) {
+      int workloadSize =
+          ((DummyPlan) plan).getWorkload() == null ? 0 : ((DummyPlan) plan).getWorkload().length;
+      return workloadSize + 512;
+    }
+    return DEFAULT_BUFFER_SIZE;
+  }
+
   @Override
   public ByteBuffer serialize() {
     PublicBAOS byteArrayOutputStream = new PublicBAOS(getDefaultBufferSize());
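
A side note on getDefaultBufferSize above: pre-sizing the serialization buffer
to the workload length plus a small margin for headers avoids repeated internal
array growth while serializing. A standalone sketch of the same sizing rule,
with java.io.ByteArrayOutputStream standing in for PublicBAOS:

    import java.io.ByteArrayOutputStream;

    public class BufferSizingDemo {
      public static void main(String[] args) {
        byte[] workload = new byte[64 * 1024]; // stand-in for DummyPlan's workload
        int defaultSize = workload.length + 512; // same sizing rule as the patch
        ByteArrayOutputStream out = new ByteArrayOutputStream(defaultSize);
        out.write(workload, 0, workload.length); // no intermediate regrowth copies
        System.out.println("bytes written: " + out.size());
      }
    }
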
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index 4b2309281d..406245582c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -682,8 +682,15 @@ public abstract class RaftLogManager {
           "There are too many unapplied logs [{}], wait for a while to avoid memory overflow",
           unappliedLogSize);
       try {
-        Thread.sleep(
-            unappliedLogSize - ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
+        synchronized (changeApplyCommitIndexCond) {
+          changeApplyCommitIndexCond.wait(
+              Math.min(
+                  (unappliedLogSize
+                              - ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem())
+                          / 10
+                      + 1,
+                  1000));
+        }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
@@ -954,7 +961,7 @@ public abstract class RaftLogManager {
   }
 
   public void checkAppliedLogIndex() {
-    while (!Thread.currentThread().isInterrupted()) {
+    while (!Thread.interrupted()) {
       try {
         doCheckAppliedLogIndex();
       } catch (IndexOutOfBoundsException e) {
@@ -976,7 +983,7 @@ public abstract class RaftLogManager {
           || nextToCheckIndex > getCommittedEntryManager().getLastIndex()
           || (blockAppliedCommitIndex > 0 && blockAppliedCommitIndex < nextToCheckIndex)) {
         // avoid spinning
-        Thread.sleep(0);
+        Thread.sleep(100);
         return;
       }
       Log log = getCommittedEntryManager().getEntry(nextToCheckIndex);
@@ -992,7 +999,7 @@ public abstract class RaftLogManager {
         synchronized (log) {
           while (!log.isApplied() && maxHaveAppliedCommitIndex < log.getCurrLogIndex()) {
             // wait until the log is applied or a newer snapshot is installed
-            log.wait(1);
+            log.wait(10);
           }
         }
       }
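
A side note: the hunk above replaces a fixed Thread.sleep with a bounded wait
on changeApplyCommitIndexCond, so the applier can wake the blocked writer as
soon as the backlog shrinks instead of after a fixed delay. A standalone sketch
of that handshake, with illustrative sizes:

    public class BackpressureDemo {
      private final Object cond = new Object();
      private volatile long unappliedLogSize = 50_000;
      private static final long MAX_LOGS_IN_MEM = 10_000;

      void producerWait() throws InterruptedException {
        while (unappliedLogSize > MAX_LOGS_IN_MEM) {
          synchronized (cond) {
            // same cap as the patch: backlog/10 + 1 ms, at most 1000 ms
            cond.wait(Math.min((unappliedLogSize - MAX_LOGS_IN_MEM) / 10 + 1, 1000));
          }
        }
      }

      void onLogsApplied(long applied) {
        unappliedLogSize -= applied;
        synchronized (cond) {
          cond.notifyAll(); // wake the producer as soon as the backlog shrinks
        }
      }

      public static void main(String[] args) throws InterruptedException {
        BackpressureDemo demo = new BackpressureDemo();
        new Thread(
                () -> {
                  try {
                    Thread.sleep(50);
                  } catch (InterruptedException e) {
                    return;
                  }
                  demo.onLogsApplied(45_000); // applier catches up, producer wakes early
                })
            .start();
        demo.producerWait();
        System.out.println("backlog drained, producer resumes");
      }
    }
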
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java
index 21ec9f92ad..8353dda742 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java
@@ -37,7 +37,8 @@ public class QueryCoordinator {
   private static final QueryCoordinator INSTANCE = new QueryCoordinator();
   private static final NodeStatusManager STATUS_MANAGER = NodeStatusManager.getINSTANCE();
 
-  private final Comparator<Node> nodeComparator = Comparator.comparing(this::getNodeStatus);
+  private final Comparator<Node> nodeComparator =
+      Comparator.comparingLong(o -> getNodeStatus(o).getStatus().fanoutRequestNum);
 
   private QueryCoordinator() {
     // singleton class
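
A side note: query replicas are now ordered by the fanout load a node reports
rather than by NodeStatus.compareTo. A standalone sketch of the comparingLong
ordering, with stand-in types instead of the project's Node/NodeStatus:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class NodeOrderingDemo {
      static final class FakeNode {
        final String name;
        final long fanoutRequestNum;

        FakeNode(String name, long fanout) {
          this.name = name;
          this.fanoutRequestNum = fanout;
        }
      }

      public static void main(String[] args) {
        List<FakeNode> nodes = new ArrayList<>();
        nodes.add(new FakeNode("dc13", 120));
        nodes.add(new FakeNode("dc14", 40));
        nodes.add(new FakeNode("dc15", 75));
        // least-loaded node first, mirroring the new comparator
        nodes.sort(Comparator.comparingLong(n -> n.fanoutRequestNum));
        nodes.forEach(n -> System.out.println(n.name + " -> " + n.fanoutRequestNum));
      }
    }
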
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index 7e703a6dc1..7f8c7df4f4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.cluster.log.VotingLog;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -56,8 +55,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
   protected AtomicLong receiverTerm;
   protected VotingLog log;
   protected AtomicBoolean leaderShipStale;
-  protected Node receiver;
-  protected Peer peer;
+  protected Node directReceiver;
   protected int quorumSize;
 
   // nano start time when the send begins
@@ -79,8 +77,11 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
       // the request already failed
       return;
     }
+
+    Node trueReceiver = response.isSetReceiver() ? response.receiver : directReceiver;
+
     logger.debug(
-        "{}: Append response {} from {} for log {}", member.getName(), response, receiver, log);
+        "{}: Append response {} from {} for log {}", member.getName(), response, trueReceiver, log);
     if (leaderShipStale.get()) {
       // someone has rejected this log because the leadership is stale
       return;
@@ -94,8 +95,8 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
           .onStronglyAccept(
               log.getLog().getCurrLogIndex(),
               log.getLog().getCurrLogTerm(),
-              receiver.nodeIdentifier);
-      peer.setMatchIndex(Math.max(log.getLog().getCurrLogIndex(), peer.getMatchIndex()));
+              trueReceiver.nodeIdentifier);
+      long newMatchIndex =
+          Math.max(response.lastLogIndex, member.getPeer(trueReceiver).getMatchIndex());
+      member.getPeer(trueReceiver).setMatchIndex(newMatchIndex);
     } else if (resp > 0) {
       // a response > 0 is the follower's term
       // the leader ship is stale, wait for the new leader's heartbeat
@@ -103,7 +104,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
       logger.debug(
           "{}: Received a rejection from {} because term is stale: {}/{}, log: {}",
           member.getName(),
-          receiver,
+          trueReceiver,
           prevReceiverTerm,
           resp,
           log);
@@ -120,14 +121,10 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
       }
     } else if (resp == RESPONSE_WEAK_ACCEPT) {
       synchronized (log) {
-        log.getWeaklyAcceptedNodeIds().add(receiver.nodeIdentifier);
+        log.getWeaklyAcceptedNodeIds().add(trueReceiver.nodeIdentifier);
         if (log.getWeaklyAcceptedNodeIds().size() + log.getStronglyAcceptedNodeIds().size()
             >= quorumSize) {
           log.acceptedTime.set(System.nanoTime());
-          if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
-            member.removeAppendLogHandler(
-                new Pair<>(log.getLog().getCurrLogIndex(), log.getLog().getCurrLogTerm()));
-          }
         }
         log.notifyAll();
       }
@@ -135,13 +132,20 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
       // e.g., Response.RESPONSE_LOG_MISMATCH
       if (resp == RESPONSE_LOG_MISMATCH || resp == RESPONSE_OUT_OF_WINDOW) {
         logger.debug(
-            "{}: The log {} is rejected by {} because: {}", member.getName(), log, receiver, resp);
+            "{}: The log {} is rejected by {} because: {}",
+            member.getName(),
+            log,
+            trueReceiver,
+            resp);
       } else {
         logger.warn(
-            "{}: The log {} is rejected by {} because: {}", member.getName(), log, receiver, resp);
+            "{}: The log {} is rejected by {} because: {}",
+            member.getName(),
+            log,
+            trueReceiver,
+            resp);
+        onFail(trueReceiver);
       }
-
-      onFail();
     }
     // rejected because the receiver's logs are stale or the receiver has no cluster info, just
     // wait for the heartbeat to handle
@@ -154,17 +158,18 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
           "{}: Cannot append log {}: cannot connect to {}: {}",
           member.getName(),
           log,
-          receiver,
+          directReceiver,
           exception.getMessage());
     } else {
-      logger.warn("{}: Cannot append log {} to {}", member.getName(), log, receiver, exception);
+      logger.warn(
+          "{}: Cannot append log {} to {}", member.getName(), log, directReceiver, exception);
     }
-    onFail();
+    onFail(directReceiver);
   }
 
-  private void onFail() {
+  private void onFail(Node trueReceiver) {
     synchronized (log) {
-      log.getFailedNodeIds().add(receiver.nodeIdentifier);
+      log.getFailedNodeIds().add(trueReceiver.nodeIdentifier);
       if (log.getFailedNodeIds().size() > quorumSize) {
         // quorum members have failed, there is no need to wait for others
         log.getStronglyAcceptedNodeIds().add(Integer.MAX_VALUE);
@@ -189,12 +194,8 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
     this.leaderShipStale = leaderShipStale;
   }
 
-  public void setPeer(Peer peer) {
-    this.peer = peer;
-  }
-
-  public void setReceiver(Node follower) {
-    this.receiver = follower;
+  public void setDirectReceiver(Node follower) {
+    this.directReceiver = follower;
   }
 
   public void setReceiverTerm(AtomicLong receiverTerm) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
index af4fbc5445..f89cd56241 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
@@ -53,8 +53,8 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
   @Override
   public void onComplete(HeartBeatResponse resp) {
     long followerTerm = resp.getTerm();
-    if (logger.isDebugEnabled()) {
-      logger.debug(
+    if (logger.isTraceEnabled()) {
+      logger.trace(
           "{}: Received a heartbeat response {} for last log index {}",
           memberName,
           followerTerm,
@@ -89,8 +89,8 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
     long lastLogTerm = resp.getLastLogTerm();
     long localLastLogIdx = localMember.getLogManager().getLastLogIndex();
     long localLastLogTerm = localMember.getLogManager().getLastLogTerm();
-    if (logger.isDebugEnabled()) {
-      logger.debug(
+    if (logger.isTraceEnabled()) {
+      logger.trace(
           "{}: Node {} is still alive, log index: {}/{}, log term: {}/{}",
           memberName,
           follower,
@@ -100,11 +100,7 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
           localLastLogTerm);
     }
 
-    Peer peer =
-        localMember
-            .getPeerMap()
-            .computeIfAbsent(
-                follower, k -> new Peer(localMember.getLogManager().getLastLogIndex()));
+    Peer peer = localMember.getPeer(follower);
     if (!localMember.getLogManager().isLogUpToDate(lastLogTerm, lastLogIdx)
         || !localMember.getLogManager().matchTerm(lastLogTerm, lastLogIdx)) {
       // the follower is not up-to-date
@@ -119,7 +115,7 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
       if (lastLogIdx == peer.getLastHeartBeatIndex()) {
         // the follower's lastLogIndex is unchanged, increase inconsistent counter
         int inconsistentNum = peer.incInconsistentHeartbeatNum();
-        if (inconsistentNum >= 10) {
+        if (inconsistentNum >= 1000) {
           logger.info(
               "{}: catching up node {}, index-term: {}-{}/{}-{}, peer match index {}",
               memberName,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
index c5032966ea..acc34cf6c0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
@@ -101,7 +101,7 @@ public class HeartbeatThread implements Runnable {
               localMember.setCharacter(NodeCharacter.ELECTOR);
               localMember.setLeader(ClusterConstant.EMPTY_NODE);
             } else {
-              logger.debug(
+              logger.trace(
                   "{}: Heartbeat from leader {} is still valid",
                   memberName,
                   localMember.getLeader());
@@ -158,7 +158,7 @@ public class HeartbeatThread implements Runnable {
   @SuppressWarnings("java:S2445")
   private void sendHeartbeats(Collection<Node> nodes) {
-    if (logger.isDebugEnabled()) {
-      logger.debug(
+    if (logger.isTraceEnabled()) {
+      logger.trace(
           "{}: Send heartbeat to {} followers, commit log index = {}",
           memberName,
           nodes.size() - 1,
@@ -200,7 +200,7 @@ public class HeartbeatThread implements Runnable {
     if (client != null) {
       // connecting to the local node results in a null
       try {
-        logger.debug("{}: Sending heartbeat to {}", memberName, node);
+        logger.trace("{}: Sending heartbeat to {}", memberName, node);
         client.sendHeartbeat(request, new HeartbeatHandler(localMember, node));
       } catch (Exception e) {
         logger.warn("{}: Cannot send heart beat to node {}", memberName, node, e);
@@ -231,7 +231,7 @@ public class HeartbeatThread implements Runnable {
               Client client = localMember.getSyncHeartbeatClient(node);
               if (client != null) {
                 try {
-                  logger.debug("{}: Sending heartbeat to {}", memberName, node);
+                  logger.trace("{}: Sending heartbeat to {}", memberName, node);
                   HeartBeatResponse heartBeatResponse = client.sendHeartbeat(req);
                   heartbeatHandler.onComplete(heartBeatResponse);
                 } catch (TTransportException e) {
@@ -266,6 +266,16 @@ public class HeartbeatThread implements Runnable {
       logger.info("{}: Winning the election because the node is the only node.", memberName);
     }
 
+    if (!ClusterUtils.isNodeEquals(
+        localMember.getThisNode(), localMember.getPartitionGroup().getHeader().node)) {
+      long electionWait = getElectionRandomWaitMs();
+      logger.info(
+          "{}: Sleep {}ms before the first election as this node is not the preferred " + "leader",
+          memberName,
+          electionWait);
+      Thread.sleep(electionWait);
+    }
+
     // the election goes on until this node becomes a follower or a leader
     while (localMember.getCharacter() == NodeCharacter.ELECTOR) {
       startElection();
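
A side note: the block added above staggers elections so that the group header
(the preferred leader) tends to start, and win, the first election. A
standalone sketch of the idea; getElectionRandomWaitMs() is not shown in this
patch, so the wait bounds below are illustrative:

    import java.util.concurrent.ThreadLocalRandom;

    public class ElectionBackoffDemo {
      public static void main(String[] args) throws InterruptedException {
        boolean isPreferredLeader = false; // e.g. thisNode equals the group header
        if (!isPreferredLeader) {
          long electionWait = ThreadLocalRandom.current().nextLong(500, 2000);
          System.out.println("Sleep " + electionWait + "ms before the first election");
          Thread.sleep(electionWait);
        }
        // ...then enter the election loop...
      }
    }
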
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index c43401af57..3114182960 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -1006,7 +1006,8 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
         NodeStatusManager.getINSTANCE().getLastResponseLatency(getHeader().getNode()),
         lastHeartbeatReceivedTime,
         prevLastLogIndex,
-        logManager.getMaxHaveAppliedCommitIndex());
+        logManager.getMaxHaveAppliedCommitIndex(),
+        logRelay != null ? logRelay.first() : null);
   }
 
   @TestOnly
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index e07452cbd1..637ce44542 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -1800,7 +1800,8 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
         readOnly,
         lastHeartbeatReceivedTime,
         prevLastLogIndex,
-        logManager.getMaxHaveAppliedCommitIndex());
+        logManager.getMaxHaveAppliedCommitIndex(),
+        logRelay != null ? logRelay.first() : null);
   }
 
   /**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 4df17e12e9..db75f3b1c2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.cluster.log.FragmentedLogDispatcher;
 import org.apache.iotdb.cluster.log.HardState;
 import org.apache.iotdb.cluster.log.IndirectLogDispatcher;
 import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.LogAckSender;
 import org.apache.iotdb.cluster.log.LogDispatcher;
 import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
 import org.apache.iotdb.cluster.log.LogParser;
@@ -72,6 +73,7 @@ import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
 import org.apache.iotdb.cluster.server.handlers.forwarder.IndirectAppendHandler;
+import org.apache.iotdb.cluster.server.monitor.NodeStatus;
 import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.monitor.Timer;
@@ -110,7 +112,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.SocketTimeoutException;
-import java.nio.Buffer;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -132,6 +133,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
+import static org.apache.iotdb.cluster.log.LogDispatcher.concurrentSenderNum;
 
 /**
  * RaftMember process the common raft logic like leader election, log appending, catch-up and so on.
@@ -287,6 +289,10 @@ public abstract class RaftMember implements RaftMemberMBean {
 
   protected LogRelay logRelay;
 
+  protected LogAckSender ackSender;
+
+  private ThreadLocal<String> threadBaseName = new ThreadLocal<>();
+
   protected RaftMember() {}
 
   protected RaftMember(String name, ClientManager clientManager) {
@@ -325,6 +331,8 @@ public abstract class RaftMember implements RaftMemberMBean {
             getName() + "-SerialToParallel");
     commitLogPool = IoTDBThreadPoolFactory.newSingleThreadExecutor("RaftCommitLog");
 
+    ackSender = new LogAckSender(this);
+
     if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
       logRelay = new LogRelay(this);
     }
@@ -416,6 +424,10 @@ public abstract class RaftMember implements RaftMemberMBean {
     if (logRelay != null) {
       logRelay.stop();
     }
+    if (ackSender != null) {
+      ackSender.stop();
+    }
+
     leader.set(ClusterConstant.EMPTY_NODE);
     catchUpService = null;
     heartBeatService = null;
@@ -572,16 +584,22 @@ public abstract class RaftMember implements RaftMemberMBean {
     return Response.RESPONSE_AGREE;
   }
 
+  private String getThreadBaseName() {
+    if (threadBaseName.get() == null) {
+      threadBaseName.set(Thread.currentThread().getName());
+    }
+    return threadBaseName.get();
+  }
+
   /**
    * Process an AppendEntryRequest. First check the term of the leader, then parse the log and
    * finally see if we can find a position to append the log.
    */
   public AppendEntryResult appendEntry(AppendEntryRequest request) throws UnknownLogTypeException {
+    long operationStartTime = Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.getOperationStartTime();
+    Thread.currentThread()
+        .setName(getThreadBaseName() + "-appending-" + (request.prevLogIndex + 1));
     AppendEntryResult result = appendEntryInternal(request);
-    if (request.isSetSubReceivers()) {
-      request.entry.rewind();
-      logRelay.offer(request, request.subReceivers);
-    }
+    Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.calOperationCostTimeFromStart(operationStartTime);
     return result;
   }
 
@@ -600,54 +618,19 @@ public abstract class RaftMember implements RaftMemberMBean {
     log.setByteSize(logByteSize);
     Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.calOperationCostTimeFromStart(startTime);
 
-    AppendEntryResult result =
-        getLogAppender()
-            .appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log);
-    result.setHeader(request.getHeader());
+    long appendStartTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
+    AppendEntryResult result = getLogAppender().appendEntry(request, log);
+    Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(appendStartTime);
 
     logger.debug("{} AppendEntryRequest of {} completed with result {}", name, log, result.status);
 
     if (!request.isFromLeader) {
-      appendAckLeader(request.leader, log, result.status);
-      Statistic.RAFT_SEND_RELAY_ACK.add(1);
+      ackSender.offer(request.leader, log.getCurrLogIndex(), log.getCurrLogTerm(), result.status);
     }
 
     return result;
   }
 
-  private void appendAckLeader(Node leader, Log log, long response) {
-    AppendEntryResult result = new AppendEntryResult();
-    result.setLastLogIndex(log.getCurrLogIndex());
-    result.setLastLogTerm(log.getCurrLogTerm());
-    result.status = response;
-    result.setHeader(getHeader());
-
-    Client syncClient = null;
-    try {
-      if (config.isUseAsyncServer()) {
-        GenericHandler<Void> handler = new GenericHandler<>(leader, null);
-        getAsyncClient(leader).acknowledgeAppendEntry(result, handler);
-      } else {
-        syncClient = getSyncClient(leader);
-        syncClient.acknowledgeAppendEntry(result);
-      }
-    } catch (TException e) {
-      logger.warn("Cannot send ack of {} to leader {}", log, leader, e);
-    } finally {
-      if (syncClient != null) {
-        ClientUtils.putBackSyncClient(syncClient);
-      }
-    }
-  }
-
-  public AppendEntryResult appendEntryIndirect(AppendEntryRequest request, List<Node> subFollowers)
-      throws UnknownLogTypeException {
-    AppendEntryResult result = appendEntry(request);
-    request.entry.rewind();
-    logRelay.offer(request, subFollowers);
-    return result;
-  }
-
   public void sendLogToSubFollowers(AppendEntryRequest request, List<Node> subFollowers) {
     request.setIsFromLeader(false);
     request.setSubReceiversIsSet(false);
@@ -658,8 +641,19 @@ public abstract class RaftMember implements RaftMemberMBean {
           getAsyncClient(subFollower)
               .appendEntry(request, new IndirectAppendHandler(subFollower, request));
         } else {
+          long operationStartTime = Statistic.RAFT_SENDER_RELAY_LOG.getOperationStartTime();
           syncClient = getSyncClient(subFollower);
+
+          int concurrentSender = concurrentSenderNum.incrementAndGet();
+          Statistic.RAFT_CONCURRENT_SENDER.add(concurrentSender);
+          try {
-          syncClient.appendEntry(request);
+            syncClient.appendEntry(request);
+          } finally {
+            // decrement in a finally block so a thrown TException cannot leak the counter
+            concurrentSenderNum.decrementAndGet();
+          }
+
+          long sendLogTime =
+              Statistic.RAFT_SENDER_RELAY_LOG.calOperationCostTimeFromStart(operationStartTime);
+          NodeStatus nodeStatus = NodeStatusManager.getINSTANCE().getNodeStatus(subFollower, false);
+          nodeStatus.getSendEntryLatencySum().addAndGet(sendLogTime);
+          nodeStatus.getSendEntryNum().incrementAndGet();
         }
       } catch (TException e) {
         logger.error("Cannot send {} to {}", request, subFollower, e);
@@ -697,12 +691,7 @@ public abstract class RaftMember implements RaftMemberMBean {
   /** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
   public AppendEntryResult appendEntries(AppendEntriesRequest request)
       throws UnknownLogTypeException {
-    AppendEntryResult result = appendEntriesInternal(request);
-    if (request.isSetSubReceivers()) {
-      request.entries.forEach(Buffer::rewind);
-      logRelay.offer(request, request.subReceivers);
-    }
-    return result;
+    return appendEntriesInternal(request);
   }
 
   /** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
@@ -736,9 +725,10 @@ public abstract class RaftMember implements RaftMemberMBean {
 
     Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.calOperationCostTimeFromStart(startTime);
 
-    response =
-        getLogAppender()
-            .appendEntries(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, logs);
+    long appendStartTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
+    response = getLogAppender().appendEntries(request, logs);
+    Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(appendStartTime);
+
     if (logger.isDebugEnabled()) {
       logger.debug(
           "{} AppendEntriesRequest of log size {} completed with result {}",
@@ -750,7 +740,8 @@ public abstract class RaftMember implements RaftMemberMBean {
     if (!request.isFromLeader) {
       // TODO: use batch ack
       for (Log log : logs) {
-        appendAckLeader(request.leader, log, response.status);
+        ackSender.offer(
+            request.leader, log.getCurrLogIndex(), log.getCurrLogTerm(), response.status);
       }
       Statistic.RAFT_SEND_RELAY_ACK.add(1);
     }
@@ -771,12 +762,11 @@ public abstract class RaftMember implements RaftMemberMBean {
       AtomicBoolean leaderShipStale,
       AtomicLong newLeaderTerm,
       AppendEntryRequest request,
-      Peer peer,
       int quorumSize) {
     AsyncClient client = getSendLogAsyncClient(node);
     if (client != null) {
       AppendNodeEntryHandler handler =
-          getAppendNodeEntryHandler(log, node, leaderShipStale, newLeaderTerm, peer, quorumSize);
+          getAppendNodeEntryHandler(log, node, leaderShipStale, newLeaderTerm, quorumSize);
       try {
         client.appendEntry(request, handler);
         logger.debug("{} sending a log to {}: {}", name, node, log);
@@ -1414,8 +1404,8 @@ public abstract class RaftMember implements RaftMemberMBean {
     }
   }
 
-  public Map<Node, Peer> getPeerMap() {
-    return peerMap;
+  public Peer getPeer(Node node) {
+    return peerMap.computeIfAbsent(node, r -> new Peer(getLogManager().getLastLogIndex()));
   }
 
   /** @return true if there is a log whose index is "index" and term is "term", false otherwise */
@@ -1752,9 +1742,10 @@ public abstract class RaftMember implements RaftMemberMBean {
     int totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
     long nextTimeToPrint = 5000;
 
-    long waitStart = System.currentTimeMillis();
+    long waitStart = System.nanoTime();
     long alreadyWait = 0;
 
+    String threadBaseName = Thread.currentThread().getName();
     synchronized (log) {
       while (log.getLog().getCurrLogIndex() == Long.MIN_VALUE
           || stronglyAcceptedNodeNum < quorumSize
@@ -1769,7 +1760,16 @@ public abstract class RaftMember implements RaftMemberMBean {
           Thread.currentThread().interrupt();
           logger.warn("Unexpected interruption when sending a log", e);
         }
-        alreadyWait = System.currentTimeMillis() - waitStart;
+        Thread.currentThread()
+            .setName(
+                threadBaseName
+                    + "-waiting-"
+                    + log.getLog().getCurrLogIndex()
+                    + "-"
+                    + log.getStronglyAcceptedNodeIds()
+                    + "-"
+                    + log.getWeaklyAcceptedNodeIds());
+        alreadyWait = (System.nanoTime() - waitStart) / 1000000;
         if (alreadyWait > nextTimeToPrint) {
           logger.info(
               "Still not receive enough votes for {}, strongly accepted {}, weakly "
@@ -1780,6 +1780,7 @@ public abstract class RaftMember implements RaftMemberMBean {
               log.getStronglyAcceptedNodeIds(),
               log.getWeaklyAcceptedNodeIds(),
               votingLogList.size(),
+              alreadyWait,
               (log.getLog().getSequenceStartTime() - waitStart) / 1000000,
               (log.getLog().getEnqueueTime() - waitStart) / 1000000,
               (log.acceptedTime.get() - waitStart) / 1000000);
@@ -1790,6 +1791,7 @@ public abstract class RaftMember implements RaftMemberMBean {
         totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
       }
     }
+    Thread.currentThread().setName(threadBaseName);
 
     if (alreadyWait > 15000) {
       logger.info(
@@ -1820,6 +1822,7 @@ public abstract class RaftMember implements RaftMemberMBean {
                 || (totalAccepted < quorumSize)
                 || votingLogList.size() > config.getMaxNumOfLogsInMem())
             && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE))) {
+
       waitAppendResultLoop(log, quorumSize);
     }
     stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
@@ -2177,9 +2180,9 @@ public abstract class RaftMember implements RaftMemberMBean {
     }
 
     if (config.isUseAsyncServer()) {
-      sendLogAsync(log, node, leaderShipStale, newLeaderTerm, request, peer, quorumSize);
+      sendLogAsync(log, node, leaderShipStale, newLeaderTerm, request, quorumSize);
     } else {
-      sendLogSync(log, node, leaderShipStale, newLeaderTerm, request, peer, quorumSize);
+      sendLogSync(log, node, leaderShipStale, newLeaderTerm, request, quorumSize);
     }
   }
 
@@ -2217,12 +2220,11 @@ public abstract class RaftMember implements RaftMemberMBean {
       AtomicBoolean leaderShipStale,
       AtomicLong newLeaderTerm,
       AppendEntryRequest request,
-      Peer peer,
       int quorumSize) {
     Client client = getSyncClient(node);
     if (client != null) {
       AppendNodeEntryHandler handler =
-          getAppendNodeEntryHandler(log, node, leaderShipStale, newLeaderTerm, peer, quorumSize);
+          getAppendNodeEntryHandler(log, node, leaderShipStale, newLeaderTerm, quorumSize);
       try {
         logger.debug("{} sending a log to {}: {}", name, node, log);
         long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
@@ -2248,14 +2250,12 @@ public abstract class RaftMember implements RaftMemberMBean {
       Node node,
       AtomicBoolean leaderShipStale,
       AtomicLong newLeaderTerm,
-      Peer peer,
       int quorumSize) {
     AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
-    handler.setReceiver(node);
+    handler.setDirectReceiver(node);
     handler.setLeaderShipStale(leaderShipStale);
     handler.setLog(log);
     handler.setMember(this);
-    handler.setPeer(peer);
     handler.setReceiverTerm(newLeaderTerm);
     handler.setQuorumSize(quorumSize);
     if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
@@ -2283,26 +2283,22 @@ public abstract class RaftMember implements RaftMemberMBean {
   private long checkRequestTerm(long leaderTerm, Node leader) {
     long localTerm;
 
-    synchronized (logManager) {
-      // if the request comes before the heartbeat arrives, the local term may be smaller than the
-      // leader term
-      localTerm = term.get();
-      if (leaderTerm < localTerm) {
-        logger.debug(
-            "{} rejected the AppendEntriesRequest for term: {}/{}", name, leaderTerm, localTerm);
-        return localTerm;
+    // if the request comes before the heartbeat arrives, the local term may be smaller than the
+    // leader term
+    localTerm = term.get();
+    if (leaderTerm < localTerm) {
+      logger.debug(
+          "{} rejected the AppendEntriesRequest for term: {}/{}", name, leaderTerm, localTerm);
+      return localTerm;
+    } else {
+      if (leaderTerm > localTerm) {
+        stepDown(leaderTerm, true);
       } else {
-        if (leaderTerm > localTerm) {
-          stepDown(leaderTerm, true);
-        } else {
-          lastHeartbeatReceivedTime = System.currentTimeMillis();
-        }
-        setLeader(leader);
-        if (character != NodeCharacter.FOLLOWER) {
-          term.notifyAll();
-        }
+        lastHeartbeatReceivedTime = System.currentTimeMillis();
       }
+      setLeader(leader);
     }
+
     logger.debug("{} accepted the AppendEntryRequest for term: {}", name, localTerm);
     return Response.RESPONSE_AGREE;
   }
@@ -2324,17 +2320,19 @@ public abstract class RaftMember implements RaftMemberMBean {
    * @param ack acknowledgement from an indirect receiver.
    */
   public void acknowledgeAppendLog(AppendEntryResult ack) {
+    long operationStartTime = Statistic.RAFT_RECEIVER_HANDLE_APPEND_ACK.getOperationStartTime();
     AppendNodeEntryHandler appendNodeEntryHandler =
         sentLogHandlers.get(new Pair<>(ack.lastLogIndex, ack.lastLogTerm));
+    Statistic.RAFT_RECEIVE_RELAY_ACK.add(1);
     if (appendNodeEntryHandler != null) {
       appendNodeEntryHandler.onComplete(ack);
-      Statistic.RAFT_RECEIVE_RELAY_ACK.add(1);
     }
+    Statistic.RAFT_RECEIVER_HANDLE_APPEND_ACK.calOperationCostTimeFromStart(operationStartTime);
   }
 
   public void registerAppendLogHandler(
       Pair<Long, Long> indexTerm, AppendNodeEntryHandler appendNodeEntryHandler) {
-    sentLogHandlers.put(indexTerm, appendNodeEntryHandler);
+    sentLogHandlers.putIfAbsent(indexTerm, appendNodeEntryHandler);
   }
 
   public void removeAppendLogHandler(Pair<Long, Long> indexTerm) {
@@ -2404,4 +2402,8 @@ public abstract class RaftMember implements RaftMemberMBean {
   public void setClientManager(ClientManager clientManager) {
     this.clientManager = clientManager;
   }
+
+  public LogRelay getLogRelay() {
+    return logRelay;
+  }
 }
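
A side note: appendAckLeader (removed above) sent one synchronous RPC per
appended entry. Its replacement, LogAckSender, is a new file whose body is not
shown in this mail; judging from its call sites (offer(...) on the appending
path, stop() on shutdown), it presumably queues acks and drains them from a
background thread. A minimal sketch of that queue-and-drain pattern, not the
actual class:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class AckSenderSketch {
      static final class Ack {
        final long index, term, status;

        Ack(long index, long term, long status) {
          this.index = index;
          this.term = term;
          this.status = status;
        }
      }

      private final BlockingQueue<Ack> acks = new LinkedBlockingQueue<>();
      private volatile boolean stopped = false;

      public void offer(long index, long term, long status) {
        acks.offer(new Ack(index, term, status)); // appending thread returns at once
      }

      public void start() {
        Thread drainer =
            new Thread(
                () -> {
                  while (!stopped) {
                    try {
                      send(acks.take());
                    } catch (InterruptedException e) {
                      Thread.currentThread().interrupt();
                      return;
                    }
                  }
                },
                "AckSenderSketch");
        drainer.setDaemon(true); // sketch only; the real class manages its own shutdown
        drainer.start();
      }

      public void stop() {
        stopped = true;
      }

      private void send(Ack ack) {
        // in the real code: build an AppendEntryResult and RPC it to the leader
        System.out.println("ack index=" + ack.index + " term=" + ack.term + " status=" + ack.status);
      }

      public static void main(String[] args) throws InterruptedException {
        AckSenderSketch sender = new AckSenderSketch();
        sender.start();
        sender.offer(42, 7, 0);
        Thread.sleep(100); // give the drainer a moment before the JVM exits
        sender.stop();
      }
    }
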
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
index 10538be31e..cc284f0bb4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.server.monitor;
 
+import org.apache.iotdb.cluster.log.LogRelay.RelayEntry;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.server.NodeCharacter;
@@ -78,6 +79,7 @@ public class NodeReport {
     long lastHeartbeatReceivedTime;
     long prevLastLogIndex;
     long maxAppliedLogIndex;
+    RelayEntry nextToRelay;
 
     RaftMemberReport(
         NodeCharacter character,
@@ -90,7 +92,8 @@ public class NodeReport {
         boolean isReadOnly,
         long lastHeartbeatReceivedTime,
         long prevLastLogIndex,
-        long maxAppliedLogIndex) {
+        long maxAppliedLogIndex,
+        RelayEntry nextToRelay) {
       this.character = character;
       this.leader = leader;
       this.term = term;
@@ -102,6 +105,7 @@ public class NodeReport {
       this.lastHeartbeatReceivedTime = lastHeartbeatReceivedTime;
       this.prevLastLogIndex = prevLastLogIndex;
       this.maxAppliedLogIndex = maxAppliedLogIndex;
+      this.nextToRelay = nextToRelay;
     }
   }
 
@@ -119,7 +123,8 @@ public class NodeReport {
         boolean isReadOnly,
         long lastHeartbeatReceivedTime,
         long prevLastLogIndex,
-        long maxAppliedLogIndex) {
+        long maxAppliedLogIndex,
+        RelayEntry nextToRelay) {
       super(
           character,
           leader,
@@ -131,7 +136,8 @@ public class NodeReport {
           isReadOnly,
           lastHeartbeatReceivedTime,
           prevLastLogIndex,
-          maxAppliedLogIndex);
+          maxAppliedLogIndex,
+          nextToRelay);
     }
 
     @Override
@@ -212,7 +218,8 @@ public class NodeReport {
         long headerLatency,
         long lastHeartbeatReceivedTime,
         long prevLastLogIndex,
-        long maxAppliedLogIndex) {
+        long maxAppliedLogIndex,
+        RelayEntry nextToRelay) {
       super(
           character,
           leader,
@@ -224,7 +231,8 @@ public class NodeReport {
           isReadOnly,
           lastHeartbeatReceivedTime,
           prevLastLogIndex,
-          maxAppliedLogIndex);
+          maxAppliedLogIndex,
+          nextToRelay);
       this.header = header;
       this.headerLatency = headerLatency;
     }
@@ -254,6 +262,8 @@ public class NodeReport {
           + maxAppliedLogIndex
           + ", readOnly="
           + isReadOnly
+          + ", nextToRelay="
+          + nextToRelay
           + ", headerLatency="
           + headerLatency
           + "ns"
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
index 4524667b9d..395048eb80 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.server.monitor;
 import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
 
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
 
 /** NodeStatus contains the last-known spec and load of a node in the cluster. */
 @SuppressWarnings("java:S1135")
@@ -53,6 +54,9 @@ public class NodeStatus implements Comparable<NodeStatus> {
   // its lastDeactivatedTime is too old.
   private long lastDeactivatedTime;
 
+  private AtomicLong sendEntryNum = new AtomicLong();
+  private AtomicLong sendEntryLatencySum = new AtomicLong();
+
   // TODO-Cluster: decide what should be contained in NodeStatus and how two compare two NodeStatus
   @Override
   public int compareTo(NodeStatus o) {
@@ -115,4 +119,34 @@ public class NodeStatus implements Comparable<NodeStatus> {
     return isActivated
         || (System.currentTimeMillis() - lastDeactivatedTime) > DEACTIVATION_VALID_INTERVAL_MS;
   }
+
+  public AtomicLong getSendEntryNum() {
+    return sendEntryNum;
+  }
+
+  public AtomicLong getSendEntryLatencySum() {
+    return sendEntryLatencySum;
+  }
+
+  @Override
+  public String toString() {
+    return "NodeStatus{"
+        + "status="
+        + status
+        + ", lastUpdateTime="
+        + lastUpdateTime
+        + ", lastResponseLatency="
+        + lastResponseLatency
+        + ", isActivated="
+        + isActivated
+        + ", lastDeactivatedTime="
+        + lastDeactivatedTime
+        + ", sendEntryNum="
+        + sendEntryNum
+        + ", sendEntryLatencySum="
+        + sendEntryLatencySum
+        + ", sendEntryLatencyAvg="
+        + (sendEntryNum.get() == 0
+            ? 0.0
+            : sendEntryLatencySum.get() * 1.0 / sendEntryNum.get())
+        + '}';
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
index 6989bf97a0..eb79a84d2a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
@@ -34,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.ConnectException;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -46,7 +47,7 @@ public class NodeStatusManager {
 
   private static final Logger logger = LoggerFactory.getLogger(NodeStatusManager.class);
-  // a status is considered stale if it is older than one minute and should be updated
-  private static final long NODE_STATUS_UPDATE_INTERVAL_MS = 60 * 1000L;
+  // a status is considered stale if it is older than this interval and should be updated
+  private static final long NODE_STATUS_UPDATE_INTERVAL_MS = 10L;
   private static final NodeStatusManager INSTANCE = new NodeStatusManager();
 
   private MetaGroupMember metaGroupMember;
@@ -147,7 +148,7 @@ public class NodeStatusManager {
     } else {
       nodeStatus.setLastResponseLatency(Long.MAX_VALUE);
     }
-    logger.info(
+    logger.debug(
         "NodeStatus of {} is updated, status: {}, response time: {}",
         node,
         nodeStatus.getStatus(),
@@ -180,4 +181,8 @@ public class NodeStatusManager {
   public boolean isActivated(Node node) {
     return getNodeStatus(node, false).isActivated();
   }
+
+  public Map<Node, NodeStatus> getNodeStatusMap() {
+    return Collections.unmodifiableMap(nodeStatusMap);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index 4169935f64..13b8ec7bc3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -146,6 +146,16 @@ public class Timer {
         RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
     RAFT_SENDER_SEND_LOG(
         RAFT_MEMBER_SENDER, "send log", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+    RAFT_SENDER_HANDLE_SEND_RESULT(
+        RAFT_MEMBER_SENDER,
+        "handle send log result",
+        TIME_SCALE,
+        true,
+        RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+    RAFT_SENDER_RELAY_OFFER_LOG(
+        RAFT_MEMBER_SENDER, "relay offer log", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+    RAFT_SENDER_RELAY_LOG(
+        RAFT_MEMBER_SENDER, "relay log", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
     RAFT_SENDER_VOTE_COUNTER(
         RAFT_MEMBER_SENDER,
         "wait for votes",
@@ -237,14 +247,26 @@ public class Timer {
         RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
     RAFT_RECEIVER_APPEND_ENTRY(
         RAFT_MEMBER_RECEIVER, "append entrys", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
-    RAFT_RECEIVER_INDEX_DIFF(RAFT_MEMBER_RECEIVER, "index diff", 1.0, true, ROOT),
-    // log dispatcher
-    LOG_DISPATCHER_FROM_CREATE_TO_ENQUEUE(
-        LOG_DISPATCHER,
-        "from create to queue",
+    RAFT_RECEIVER_APPEND_ACK(
+        RAFT_MEMBER_RECEIVER,
+        "ack append entrys",
         TIME_SCALE,
         true,
-        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+        RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+    RAFT_RECEIVER_APPEND_ENTRY_FULL(
+        RAFT_MEMBER_RECEIVER,
+        "append entrys(full)",
+        TIME_SCALE,
+        true,
+        RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+    RAFT_RECEIVER_HANDLE_APPEND_ACK(
+        RAFT_MEMBER_SENDER,
+        "handle append entrys ack",
+        TIME_SCALE,
+        true,
+        RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+    RAFT_RECEIVER_INDEX_DIFF(RAFT_MEMBER_RECEIVER, "index diff", 1.0, true, ROOT),
+    // log dispatcher
     LOG_DISPATCHER_LOG_ENQUEUE(
         LOG_DISPATCHER,
         "enqueue",
@@ -265,6 +287,24 @@ public class Timer {
         TIME_SCALE,
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_FROM_CREATE_TO_ENQUEUE(
+        LOG_DISPATCHER,
+        "from create to queue",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_FROM_CREATE_TO_DEQUEUE(
+        LOG_DISPATCHER,
+        "from create to dequeue",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_FROM_CREATE_TO_SENDING(
+        LOG_DISPATCHER,
+        "from create to sending",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     LOG_DISPATCHER_FROM_CREATE_TO_SENT(
         LOG_DISPATCHER,
         "from create to sent",
@@ -288,7 +328,9 @@ public class Timer {
     RAFT_SEND_RELAY_ACK(RAFT_MEMBER_RECEIVER, "send relay ack", 1, true, ROOT),
     RAFT_RECEIVE_RELAY_ACK(RAFT_MEMBER_SENDER, "receive relay ack", 1, true, ROOT),
     RAFT_WAIT_AFTER_ACCEPTED(RAFT_MEMBER_SENDER, "wait after accepted", TIME_SCALE, true, ROOT),
-    RAFT_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "weak accept", 1, true, ROOT);
+    RAFT_SENDER_OOW(RAFT_MEMBER_SENDER, "out of window", 1, true, ROOT),
+    RAFT_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "weak accept", 1, true, ROOT),
+    RAFT_CONCURRENT_SENDER(RAFT_MEMBER_SENDER, "concurrent sender", 1, true, ROOT);
 
     String className;
     String blockName;
@@ -335,11 +377,13 @@ public class Timer {
      * This method equals `add(System.nanoTime() - start)`. We wrap `System.nanoTime()` in this
      * method to avoid unnecessary calls when instrumenting is disabled.
      */
-    public void calOperationCostTimeFromStart(long startTime) {
+    public long calOperationCostTimeFromStart(long startTime) {
       if (ENABLE_INSTRUMENTING && startTime != Long.MIN_VALUE && startTime != 0) {
         long consumed = System.nanoTime() - startTime;
         add(consumed);
+        return consumed;
       }
+      return 0;
     }
 
    /** WARN: no concurrency safety guarantee. */
@@ -363,6 +407,10 @@ public class Timer {
       double avg = s / cnt;
       return String.format("%s - %s: %.2f, %d, %.2f, %d", className, blockName, s, cnt, avg, max);
     }
+
+    public long getCnt() {
+      return counter.get();
+    }
   }
 
   public static String getReport() {
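
A side note: calOperationCostTimeFromStart changing from void to long lets call
sites reuse the measured nanoseconds, e.g. to feed the per-node send-latency
counters added to NodeStatus. A standalone sketch of the pattern:

    import java.util.concurrent.atomic.AtomicLong;

    public class TimerPatternDemo {
      static final AtomicLong sum = new AtomicLong();
      static final AtomicLong cnt = new AtomicLong();

      static long getOperationStartTime() {
        return System.nanoTime();
      }

      static long calOperationCostTimeFromStart(long startTime) {
        long consumed = System.nanoTime() - startTime;
        sum.addAndGet(consumed);
        cnt.incrementAndGet();
        return consumed; // the patch adds this return so callers can reuse the value
      }

      public static void main(String[] args) throws InterruptedException {
        long start = getOperationStartTime();
        Thread.sleep(5); // the measured operation
        long elapsed = calOperationCostTimeFromStart(start);
        System.out.printf(
            "elapsed=%dns, avg=%.0fns over %d ops%n",
            elapsed, sum.get() * 1.0 / cnt.get(), cnt.get());
      }
    }
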
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index 7a6eb2c0c4..486c50959e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
 import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 
@@ -160,7 +161,9 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
    */
   @Override
   public TNodeStatus queryNodeStatus() {
-    return new TNodeStatus();
+    return new TNodeStatus()
+        .setFanoutRequestNum(
+            Statistic.RAFT_SENDER_SEND_LOG.getCnt() + Statistic.RAFT_SEND_RELAY.getCnt());
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/WeightedList.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/WeightedList.java
new file mode 100644
index 0000000000..b2b94aa2e0
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/WeightedList.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.utils;
+
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
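+/**
+ * A list supporting weighted random selection without replacement: each select()
+ * draws one element with probability proportional to its weight and removes it
+ * from the list. Not thread-safe.
+ */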
+public class WeightedList<T> {
+  private List<Pair<T, Double>> elements = new ArrayList<>();
+  private Random random = new Random();
+  private double[] probabilities;
+  private double weightSum = 0.0;
+
+  public void insert(T t, Double weight) {
+    elements.add(new Pair<>(t, weight));
+    weightSum += weight;
+  }
+
+  public List<T> select(int num) {
+    List<T> rst = new ArrayList<>(num);
+    if (num >= elements.size()) {
+      for (Pair<T, Double> element : elements) {
+        rst.add(element.left);
+      }
+      elements.clear();
+      weightSum = 0.0;
+    } else {
+      for (int i = 0; i < num; i++) {
+        rst.add(select());
+      }
+    }
+
+    return rst;
+  }
+
+  public T select() {
+    if (elements.isEmpty()) {
+      return null;
+    }
+
+    if (probabilities == null || probabilities.length < elements.size()) {
+      probabilities = new double[elements.size()];
+    }
+
+    probabilities[0] = elements.get(0).right / weightSum;
+    for (int i = 1; i < elements.size(); i++) {
+      probabilities[i] = elements.get(i).right / weightSum + probabilities[i - 1];
+    }
+
+    double p = random.nextDouble();
+    int selectedIndex = elements.size() - 1;
+    for (int i = 0; i < elements.size() - 1; i++) {
+      if (p <= probabilities[i]) {
+        selectedIndex = i;
+        break;
+      }
+    }
+
+    Pair<T, Double> rst = elements.remove(selectedIndex);
+    weightSum -= rst.right;
+    return rst.left;
+  }
+
+  public int size() {
+    return elements.size();
+  }
+}
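
For reference, a minimal, self-contained sketch of how WeightedList behaves
(the node names and weights are illustrative only): heavier elements are
drawn more often, and each draw removes its element, so repeated calls
sample without replacement.

    import java.util.List;

    import org.apache.iotdb.cluster.utils.WeightedList;

    public class WeightedListDemo {
      public static void main(String[] args) {
        WeightedList<String> nodes = new WeightedList<>();
        nodes.insert("dc13", 4.0); // lightly loaded node -> larger weight
        nodes.insert("dc14", 1.0);
        nodes.insert("dc15", 1.0);
        // Draw two nodes; "dc13" is picked first with probability 4/6.
        List<String> picked = nodes.select(2);
        System.out.println(picked + " picked, " + nodes.size() + " left");
      }
    }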
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
index 77111cb0f3..3c82a4241a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
@@ -78,8 +78,7 @@ public class AppendNodeEntryHandlerTest {
         handler.setLog(votingLog);
         handler.setMember(member);
         handler.setReceiverTerm(receiverTerm);
-        handler.setReceiver(TestUtils.getNode(i));
-        handler.setPeer(peer);
+        handler.setDirectReceiver(TestUtils.getNode(i));
         handler.setQuorumSize(ClusterDescriptor.getInstance().getConfig().getReplicationNum() / 2);
         long resp = i >= 5 ? Response.RESPONSE_AGREE : Response.RESPONSE_LOG_MISMATCH;
         AppendEntryResult result = new AppendEntryResult();
@@ -114,8 +113,7 @@ public class AppendNodeEntryHandlerTest {
       handler.setLog(votingLog);
       handler.setMember(member);
       handler.setReceiverTerm(receiverTerm);
-      handler.setReceiver(TestUtils.getNode(i));
-      handler.setPeer(peer);
+      handler.setDirectReceiver(TestUtils.getNode(i));
       handler.setQuorumSize(ClusterDescriptor.getInstance().getConfig().getReplicationNum() / 2);
       AppendEntryResult result = new AppendEntryResult();
       result.setStatus(Response.RESPONSE_AGREE);
@@ -141,8 +139,7 @@ public class AppendNodeEntryHandlerTest {
       handler.setLog(votingLog);
       handler.setMember(member);
       handler.setReceiverTerm(receiverTerm);
-      handler.setReceiver(TestUtils.getNode(0));
-      handler.setPeer(peer);
+      handler.setDirectReceiver(TestUtils.getNode(0));
       handler.setQuorumSize(ClusterDescriptor.getInstance().getConfig().getReplicationNum() / 2);
       new Thread(() -> handler.onComplete(new AppendEntryResult(100L))).start();
       votingLog.wait();
@@ -168,8 +165,7 @@ public class AppendNodeEntryHandlerTest {
       handler.setLog(votingLog);
       handler.setMember(member);
       handler.setReceiverTerm(receiverTerm);
-      handler.setReceiver(TestUtils.getNode(0));
-      handler.setPeer(peer);
+      handler.setDirectReceiver(TestUtils.getNode(0));
       handler.setQuorumSize(ClusterDescriptor.getInstance().getConfig().getReplicationNum() / 2);
       handler.onError(new TestException());
 
diff --git a/thrift-cluster/src/main/thrift/cluster.thrift b/thrift-cluster/src/main/thrift/cluster.thrift
index ea8edf73d9..f627c13621 100644
--- a/thrift-cluster/src/main/thrift/cluster.thrift
+++ b/thrift-cluster/src/main/thrift/cluster.thrift
@@ -235,7 +235,7 @@ struct PreviousFillRequest {
 
 // the spec and load of a node, for query coordinating
 struct TNodeStatus {
-
+  1: required long fanoutRequestNum
 }
 
 struct GetAggrResultRequest {
@@ -283,6 +283,7 @@ struct AppendEntryResult {
   2: optional i64 lastLogTerm;
   3: optional i64 lastLogIndex;
   4: optional RaftNode header;
+  5: optional Node receiver;
 }
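
The new optional receiver field ties back to the commit subject ("fix ack
leader with wrong receiver"): when an entry is relayed, the node that
finally appends it may ack the leader directly, so the response must name
who actually appended; the handler's direct receiver (set via
setDirectReceiver above) is only the first hop. A hedged sketch of the
leader-side handling; apart from the generated thrift accessors
isSetReceiver()/getReceiver(), the names below are assumptions, not the
commit's exact members:

    import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
    import org.apache.iotdb.cluster.rpc.thrift.Node;

    class AckAttributionSketch {
      private Node directReceiver; // the first hop the leader sent to

      void onComplete(AppendEntryResult result) {
        // Prefer the receiver carried in the result: with relaying, the
        // node that appended the entry can differ from the direct receiver.
        Node acker = result.isSetReceiver() ? result.getReceiver() : directReceiver;
        // ... count acker towards the quorum of the pending log ...
      }
    }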