You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/04/07 01:16:55 UTC

[iotdb] branch expr_plus updated (87cc4b641c -> 1da27c431b)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch expr_plus
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from 87cc4b641c remove old interface
     new 1bac74e9ae fix ack leader with wrong receiver
     new 64011f9038 fix load calculation
     new 1da27c431b add relay_first_level_size config

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 cluster/distribute-dc.sh                           |   2 +-
 .../resources/conf/iotdb-cluster.properties        |   3 +-
 .../org/apache/iotdb/cluster/ClusterIoTDB.java     |  13 ++
 .../cluster/client/async/AsyncDataClient.java      |   4 -
 .../apache/iotdb/cluster/config/ClusterConfig.java |  20 ++
 .../iotdb/cluster/config/ClusterDescriptor.java    |  11 +
 .../org/apache/iotdb/cluster/expr/ExprBench.java   | 128 +++++++++--
 .../iotdb/cluster/log/IndirectLogDispatcher.java   | 160 ++++++++++----
 .../java/org/apache/iotdb/cluster/log/Log.java     |   2 +-
 .../org/apache/iotdb/cluster/log/LogAckSender.java | 188 ++++++++++++++++
 .../apache/iotdb/cluster/log/LogDispatcher.java    |  69 ++++--
 .../org/apache/iotdb/cluster/log/LogRelay.java     | 132 ++++++++++--
 .../apache/iotdb/cluster/log/VotingLogList.java    |  16 ++
 .../cluster/log/appender/BlockingLogAppender.java  |  47 ++--
 .../iotdb/cluster/log/appender/LogAppender.java    |   7 +-
 .../log/appender/SlidingWindowLogAppender.java     |  88 ++++++--
 .../cluster/log/logtypes/PhysicalPlanLog.java      |  11 +
 .../iotdb/cluster/log/manage/RaftLogManager.java   |  17 +-
 .../cluster/query/manage/QueryCoordinator.java     |   3 +-
 .../handlers/caller/AppendNodeEntryHandler.java    |  55 ++---
 .../server/handlers/caller/HeartbeatHandler.java   |  16 +-
 .../cluster/server/heartbeat/HeartbeatThread.java  |  18 +-
 .../cluster/server/member/DataGroupMember.java     |  36 ++--
 .../cluster/server/member/MetaGroupMember.java     |   3 +-
 .../iotdb/cluster/server/member/RaftMember.java    | 238 ++++++++-------------
 .../iotdb/cluster/server/monitor/NodeReport.java   |  95 ++++----
 .../iotdb/cluster/server/monitor/NodeStatus.java   |  55 +++++
 .../cluster/server/monitor/NodeStatusManager.java  |  16 +-
 .../apache/iotdb/cluster/server/monitor/Timer.java | 102 ++++++++-
 .../cluster/server/service/MetaSyncService.java    |   3 +-
 .../apache/iotdb/cluster/utils/WeightedList.java   |  87 ++++++++
 .../iotdb/cluster/utils/WindowStatistic.java}      |  35 ++-
 .../caller/AppendNodeEntryHandlerTest.java         |  12 +-
 thrift-cluster/src/main/thrift/cluster.thrift      |   3 +-
 34 files changed, 1291 insertions(+), 404 deletions(-)
 create mode 100644 cluster/src/main/java/org/apache/iotdb/cluster/log/LogAckSender.java
 create mode 100644 cluster/src/main/java/org/apache/iotdb/cluster/utils/WeightedList.java
 copy cluster/src/{test/java/org/apache/iotdb/cluster/utils/Constants.java => main/java/org/apache/iotdb/cluster/utils/WindowStatistic.java} (56%)


[iotdb] 01/03: fix ack leader with wrong receiver

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_plus
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1bac74e9ae4ca34a8cf49f86ca260e6e95e1151d
Author: jt <jt...@163.com>
AuthorDate: Tue Mar 29 11:28:17 2022 +0800

    fix ack leader with wrong receiver
---
 cluster/distribute-dc.sh                           |   2 +-
 .../resources/conf/iotdb-cluster.properties        |   3 +-
 .../org/apache/iotdb/cluster/ClusterIoTDB.java     |  19 +++
 .../cluster/client/async/AsyncDataClient.java      |   4 -
 .../apache/iotdb/cluster/config/ClusterConfig.java |  10 ++
 .../iotdb/cluster/config/ClusterDescriptor.java    |   6 +
 .../org/apache/iotdb/cluster/expr/ExprBench.java   | 128 +++++++++++---
 .../iotdb/cluster/log/IndirectLogDispatcher.java   | 137 ++++++++++-----
 .../java/org/apache/iotdb/cluster/log/Log.java     |   2 +-
 .../org/apache/iotdb/cluster/log/LogAckSender.java | 186 +++++++++++++++++++++
 .../apache/iotdb/cluster/log/LogDispatcher.java    |  65 +++++--
 .../org/apache/iotdb/cluster/log/LogRelay.java     |  51 +++++-
 .../apache/iotdb/cluster/log/VotingLogList.java    |  16 ++
 .../cluster/log/appender/BlockingLogAppender.java  |  47 ++++--
 .../iotdb/cluster/log/appender/LogAppender.java    |   7 +-
 .../log/appender/SlidingWindowLogAppender.java     |  88 ++++++++--
 .../cluster/log/logtypes/PhysicalPlanLog.java      |  11 ++
 .../iotdb/cluster/log/manage/RaftLogManager.java   |  17 +-
 .../cluster/query/manage/QueryCoordinator.java     |   3 +-
 .../handlers/caller/AppendNodeEntryHandler.java    |  55 +++---
 .../server/handlers/caller/HeartbeatHandler.java   |  16 +-
 .../cluster/server/heartbeat/HeartbeatThread.java  |  18 +-
 .../cluster/server/member/DataGroupMember.java     |   3 +-
 .../cluster/server/member/MetaGroupMember.java     |   3 +-
 .../iotdb/cluster/server/member/RaftMember.java    | 174 +++++++++----------
 .../iotdb/cluster/server/monitor/NodeReport.java   |  20 ++-
 .../iotdb/cluster/server/monitor/NodeStatus.java   |  34 ++++
 .../cluster/server/monitor/NodeStatusManager.java  |   9 +-
 .../apache/iotdb/cluster/server/monitor/Timer.java |  64 ++++++-
 .../cluster/server/service/MetaSyncService.java    |   5 +-
 .../apache/iotdb/cluster/utils/WeightedList.java   |  87 ++++++++++
 .../caller/AppendNodeEntryHandlerTest.java         |  12 +-
 thrift-cluster/src/main/thrift/cluster.thrift      |   3 +-
 33 files changed, 1018 insertions(+), 287 deletions(-)

diff --git a/cluster/distribute-dc.sh b/cluster/distribute-dc.sh
index 1eba6f4264..5e1af6b312 100644
--- a/cluster/distribute-dc.sh
+++ b/cluster/distribute-dc.sh
@@ -1,6 +1,6 @@
 src_lib_path=/e/codestore/incubator-iotdb2/cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/lib/iotdb*
 
-ips=(dc11 dc12 dc13 dc14 dc15 dc16 dc17 dc18)
+ips=(dc13 dc14 dc15 dc16 dc17 dc18)
 target_lib_path=/home/jt/iotdb_expr/lib
 
 for ip in ${ips[*]}
diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index ff363749b4..56566c327e 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -212,4 +212,5 @@ multi_raft_factor=1
 # replicas is high, which may make thread switching costly.
 # dispatcher_binding_thread_num=16
 
-use_indirect_broadcasting=true
\ No newline at end of file
+use_indirect_broadcasting=true
+optimize_indirect_broadcasting=false
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index 36c40d04bc..2e5b02cc58 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -45,6 +45,9 @@ import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.server.monitor.NodeReport;
+import org.apache.iotdb.cluster.server.monitor.NodeStatus;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.server.raft.DataRaftHeartBeatService;
 import org.apache.iotdb.cluster.server.raft.DataRaftService;
 import org.apache.iotdb.cluster.server.raft.MetaRaftHeartBeatService;
@@ -79,6 +82,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.HashSet;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -250,6 +254,7 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
     }
 
     // we start IoTDB kernel first. then we start the cluster module.
+    Runtime.getRuntime().addShutdownHook(new ShutdownHook());
     if (MODE_START.equals(mode)) {
       cluster.activeStartNodeMode();
     } else if (MODE_ADD.equals(mode)) {
@@ -287,6 +292,20 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
     return true;
   }
 
+  private static class ShutdownHook extends Thread {
+
+    @Override
+    public void run() {
+      logger.info(
+          "Total request fanout: {}",
+          Statistic.RAFT_SENDER_RELAY_LOG.getCnt() + Statistic.RAFT_SENDER_SEND_LOG.getCnt());
+      for (Entry<Node, NodeStatus> nodeNodeStatusEntry :
+          NodeStatusManager.getINSTANCE().getNodeStatusMap().entrySet()) {
+        logger.info("{}: {}", nodeNodeStatusEntry.getKey(), nodeNodeStatusEntry.getValue());
+      }
+    }
+  }
+
   private String clusterConfigCheck() {
     if (IoTDBDescriptor.getInstance().getConfig().isReplaceHostNameWithIp()) {
       try {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
index df6d5ead74..cb9c798b68 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
@@ -147,17 +147,13 @@ public class AsyncDataClient extends TSDataService.AsyncClient {
 
   public static class AsyncDataClientFactory extends AsyncBaseFactory<Node, AsyncDataClient> {
 
-    Exception createStack;
-
     public AsyncDataClientFactory(TProtocolFactory protocolFactory, ClientCategory category) {
       super(protocolFactory, category);
-      createStack = new Exception();
     }
 
     public AsyncDataClientFactory(
         TProtocolFactory protocolFactory, ClientCategory category, IClientManager clientManager) {
       super(protocolFactory, category, clientManager);
-      createStack = new Exception();
     }
 
     @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index c75237beca..187509b2ef 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -192,6 +192,8 @@ public class ClusterConfig {
 
   private int relaySenderNum = 8;
 
+  private boolean optimizeIndirectBroadcasting = false;
+
   /**
    * create a clusterConfig class. The internalIP will be set according to the server's hostname. If
    * there is something error for getting the ip of the hostname, then set the internalIp as
@@ -590,4 +592,12 @@ public class ClusterConfig {
   public void setRelaySenderNum(int relaySenderNum) {
     this.relaySenderNum = relaySenderNum;
   }
+
+  public boolean isOptimizeIndirectBroadcasting() {
+    return optimizeIndirectBroadcasting;
+  }
+
+  public void setOptimizeIndirectBroadcasting(boolean optimizeIndirectBroadcasting) {
+    this.optimizeIndirectBroadcasting = optimizeIndirectBroadcasting;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index c8bfbdc51f..a089e2d75a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -345,6 +345,12 @@ public class ClusterDescriptor {
             properties.getProperty(
                 "use_indirect_broadcasting", String.valueOf(config.isUseIndirectBroadcasting()))));
 
+    config.setOptimizeIndirectBroadcasting(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "optimize_indirect_broadcasting",
+                String.valueOf(config.isOptimizeIndirectBroadcasting()))));
+
     config.setRelaySenderNum(
         Integer.parseInt(
             properties.getProperty(
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
index 7673e116c7..d94957f74d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
@@ -28,20 +28,28 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
+import com.google.common.util.concurrent.RateLimiter;
 import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Random;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class ExprBench {
 
+  private static final Logger logger = LoggerFactory.getLogger(ExprBench.class);
+
   private AtomicLong requestCounter = new AtomicLong();
   private AtomicLong latencySum = new AtomicLong();
   private long maxLatency = 0;
@@ -49,45 +57,86 @@ public class ExprBench {
   private int workloadSize = 64 * 1024;
   private int printInterval = 1000;
   private ClientManager clientPool;
-  private Node target;
   private int maxRequestNum;
   private ExecutorService pool = Executors.newCachedThreadPool();
   private List<Node> nodeList = new ArrayList<>();
-  private int raftFactor = 1;
+  private int[] raftFactors;
+  private int[] rateLimits;
+  private List<EndPoint> endPoints = new ArrayList<>();
+  private Map<EndPoint, RateLimiter> rateLimiterMap = new ConcurrentHashMap<>();
+  private Map<EndPoint, Statistic> latencyMap = new ConcurrentHashMap<>();
 
   public ExprBench(Node target) {
-    this.target = target;
     clientPool = new ClientManager(false, Type.MetaGroupClient);
   }
 
+  private static class EndPoint {
+    private Node node;
+    private int raftId;
+
+    public EndPoint(Node node, int raftId) {
+      this.node = node;
+      this.raftId = raftId;
+    }
+
+    @Override
+    public String toString() {
+      return "EndPoint{" + "node=" + node.getInternalIp() + ", raftId=" + raftId + '}';
+    }
+  }
+
+  private static class Statistic {
+    private AtomicLong sum = new AtomicLong();
+    private AtomicLong cnt = new AtomicLong();
+
+    public void add(long val) {
+      sum.addAndGet(val);
+      cnt.incrementAndGet();
+    }
+
+    @Override
+    public String toString() {
+      return "{" + sum.get() + "," + cnt.get() + "," + (sum.get() * 1.0 / cnt.get()) + "}";
+    }
+  }
+
   public void benchmark() {
     long startTime = System.currentTimeMillis();
     for (int i = 0; i < threadNum; i++) {
       int finalI = i;
       pool.submit(
           () -> {
-            Random random = new Random(123456L + finalI);
+            int endPointIdx = finalI % endPoints.size();
             Client client = null;
-            try {
-              client = clientPool.borrowSyncClient(target, ClientCategory.META);
-            } catch (IOException e) {
-              e.printStackTrace();
-            }
+
             ExecutNonQueryReq request = new ExecutNonQueryReq();
             DummyPlan plan = new DummyPlan();
             plan.setWorkload(new byte[workloadSize]);
             plan.setNeedForward(true);
 
             ByteBuffer byteBuffer = ByteBuffer.allocate(workloadSize + 4096);
+            Map<EndPoint, Node> endPointLeaderMap = new HashMap<>();
 
+            Node target = null;
             long currRequsetNum = -1;
             while (true) {
 
-              if (raftFactor > 0) {
-                Node node = nodeList.get(random.nextInt(nodeList.size()));
-                int raftId = random.nextInt(raftFactor);
-                plan.setGroupIdentifier(ClusterUtils.nodeToString(node) + "#" + raftId);
+              EndPoint endPoint = endPoints.get(endPointIdx);
+              RateLimiter rateLimiter = rateLimiterMap.get(endPoint);
+              if (rateLimiter != null) {
+                rateLimiter.acquire(1);
               }
+
+              target = endPointLeaderMap.getOrDefault(endPoint, endPoint.node);
+              int raftId = endPoint.raftId;
+              plan.setGroupIdentifier(ClusterUtils.nodeToString(endPoint.node) + "#" + raftId);
+
+              try {
+                client = clientPool.borrowSyncClient(target, ClientCategory.META);
+              } catch (IOException e) {
+                e.printStackTrace();
+              }
+
               byteBuffer.clear();
               plan.serialize(byteBuffer);
               byteBuffer.flip();
@@ -96,12 +145,20 @@ public class ExprBench {
 
               long reqLatency = System.nanoTime();
               try {
-                client.executeNonQueryPlan(request);
+                TSStatus status = client.executeNonQueryPlan(request);
+                clientPool.returnSyncClient(client, target, ClientCategory.META);
+                if (status.isSetRedirectNode()) {
+                  Node leader = new Node().setInternalIp(status.redirectNode.ip).setMetaPort(8880);
+                  endPointLeaderMap.put(endPoint, leader);
+                  logger.info("Leader of {} is changed to {}", endPoint, leader);
+                }
+
                 currRequsetNum = requestCounter.incrementAndGet();
                 if (currRequsetNum > threadNum * 10) {
                   reqLatency = System.nanoTime() - reqLatency;
                   maxLatency = Math.max(maxLatency, reqLatency);
                   latencySum.addAndGet(reqLatency);
+                  latencyMap.get(endPoint).add(reqLatency);
                 }
               } catch (TException e) {
                 e.printStackTrace();
@@ -118,6 +175,7 @@ public class ExprBench {
                         currRequsetNum * workloadSize / (1024.0 * 1024.0) / elapsedTime,
                         maxLatency / 1000.0,
                         (latencySum.get() + 0.0) / currRequsetNum));
+                System.out.println(latencyMap);
               }
 
               if (currRequsetNum >= maxRequestNum) {
@@ -136,14 +194,12 @@ public class ExprBench {
   public static void main(String[] args) {
     ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(50000);
     Node target = new Node();
-    target.setInternalIp(args[0]);
-    target.setMetaPort(Integer.parseInt(args[1]));
     ExprBench bench = new ExprBench(target);
-    bench.maxRequestNum = Integer.parseInt(args[2]);
-    bench.threadNum = Integer.parseInt(args[3]);
-    bench.workloadSize = Integer.parseInt(args[4]) * 1024;
-    bench.printInterval = Integer.parseInt(args[5]);
-    String[] nodesSplit = args[6].split(",");
+    bench.maxRequestNum = Integer.parseInt(args[0]);
+    bench.threadNum = Integer.parseInt(args[1]);
+    bench.workloadSize = Integer.parseInt(args[2]) * 1024;
+    bench.printInterval = Integer.parseInt(args[3]);
+    String[] nodesSplit = args[4].split(",");
     for (String s : nodesSplit) {
       String[] nodeSplit = s.split(":");
       Node node = new Node();
@@ -151,8 +207,34 @@ public class ExprBench {
       node.setMetaPort(Integer.parseInt(nodeSplit[1]));
       bench.nodeList.add(node);
     }
-    bench.raftFactor = Integer.parseInt(args[7]);
+    String[] raftFactorSplit = args[5].split(",");
+    bench.raftFactors = new int[raftFactorSplit.length];
+    for (int i = 0; i < raftFactorSplit.length; i++) {
+      bench.raftFactors[i] = Integer.parseInt(raftFactorSplit[i]);
+    }
+    if (args.length >= 7) {
+      String[] ratesSplit = args[6].split(",");
+      bench.rateLimits = new int[ratesSplit.length];
+      for (int i = 0; i < ratesSplit.length; i++) {
+        bench.rateLimits[i] = Integer.parseInt(ratesSplit[i]);
+      }
+    }
+
+    List<Node> list = bench.nodeList;
+    for (int i = 0, listSize = list.size(); i < listSize; i++) {
+      Node node = list.get(i);
+      for (int j = 0; j < bench.raftFactors[i]; j++) {
+        EndPoint endPoint = new EndPoint(node, j);
+        bench.endPoints.add(endPoint);
+        bench.latencyMap.put(endPoint, new Statistic());
+        if (bench.rateLimits != null) {
+          bench.rateLimiterMap.put(endPoint, RateLimiter.create(bench.rateLimits[i]));
+        }
+      }
+    }
 
     bench.benchmark();
+
+    System.out.println(bench.latencyMap);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index 343cb3d21f..c02f911778 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -19,24 +19,25 @@
 
 package org.apache.iotdb.cluster.log;
 
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
-import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.cluster.utils.WeightedList;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
 /**
  * IndirectLogDispatcher sends entries only to a pre-selected subset of followers instead of all
@@ -45,10 +46,13 @@ import java.util.concurrent.TimeUnit;
 public class IndirectLogDispatcher extends LogDispatcher {
 
   private static final Logger logger = LoggerFactory.getLogger(IndirectLogDispatcher.class);
-  private Map<Node, List<Node>> directToIndirectFollowerMap;
+
+  private Map<Node, List<Node>> directToIndirectFollowerMap = new HashMap<>();
 
   public IndirectLogDispatcher(RaftMember member) {
     super(member);
+    recalculateDirectFollowerMap();
+    useBatchInLogCatchUp = false;
   }
 
   @Override
@@ -59,25 +63,101 @@ public class IndirectLogDispatcher extends LogDispatcher {
 
   @Override
   void createQueueAndBindingThreads() {
+    for (Node node : member.getAllNodes()) {
+      if (!ClusterUtils.isNodeEquals(node, member.getThisNode())) {
+        nodesEnabled.put(node, false);
+        nodesLogQueues.put(node, createQueueAndBindingThread(node));
+      }
+    }
+  }
+
+  @Override
+  public void offer(SendLogRequest request) {
+    super.offer(request);
     recalculateDirectFollowerMap();
   }
 
+  @Override
+  protected SendLogRequest transformRequest(Node node, SendLogRequest request) {
+    SendLogRequest newRequest = new SendLogRequest(request);
+    // copy the RPC request so each request can have different sub-receivers but the same log
+    // binary and other fields
+    newRequest.setAppendEntryRequest(new AppendEntryRequest(newRequest.getAppendEntryRequest()));
+    newRequest.getAppendEntryRequest().setSubReceivers(directToIndirectFollowerMap.get(node));
+    return newRequest;
+  }
+
   public void recalculateDirectFollowerMap() {
     List<Node> allNodes = new ArrayList<>(member.getAllNodes());
     allNodes.removeIf(n -> ClusterUtils.isNodeEquals(n, member.getThisNode()));
-    QueryCoordinator instance = QueryCoordinator.getINSTANCE();
-    List<Node> orderedNodes = instance.reorderNodes(allNodes);
-    synchronized (this) {
-      executorService.shutdown();
-      try {
-        executorService.awaitTermination(10, TimeUnit.SECONDS);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        logger.warn("Dispatcher thread pool of {} cannot be shutdown within 10s", member);
+    Collections.shuffle(allNodes);
+    List<Node> orderedNodes = allNodes;
+
+    nodesEnabled.clear();
+    directToIndirectFollowerMap.clear();
+
+    if (ClusterDescriptor.getInstance().getConfig().isOptimizeIndirectBroadcasting()) {
+      QueryCoordinator instance = QueryCoordinator.getINSTANCE();
+      orderedNodes = instance.reorderNodes(allNodes);
+      long thisLoad =
+          Statistic.RAFT_SENDER_SEND_LOG.getCnt() + Statistic.RAFT_SEND_RELAY.getCnt() + 1;
+      long minLoad =
+          NodeStatusManager.getINSTANCE()
+                  .getNodeStatus(orderedNodes.get(0), false)
+                  .getStatus()
+                  .fanoutRequestNum
+              + 1;
+      double loadFactor = 1.05;
+      WeightedList<Node> firstLevelCandidates = new WeightedList<>();
+      firstLevelCandidates.insert(
+          orderedNodes.get(0),
+          1.0
+              / (NodeStatusManager.getINSTANCE()
+                      .getNodeStatus(orderedNodes.get(0), false)
+                      .getStatus()
+                      .fanoutRequestNum
+                  + 1));
+      int firstLevelSize = 1;
+
+      for (int i = 1, orderedNodesSize = orderedNodes.size(); i < orderedNodesSize; i++) {
+        Node orderedNode = orderedNodes.get(i);
+        long nodeLoad =
+            NodeStatusManager.getINSTANCE()
+                    .getNodeStatus(orderedNode, false)
+                    .getStatus()
+                    .fanoutRequestNum
+                + 1;
+        if (nodeLoad * 1.0 <= minLoad * loadFactor) {
+          firstLevelCandidates.insert(orderedNode, 1.0 / nodeLoad);
+        }
+        if (nodeLoad > thisLoad) {
+          firstLevelSize = (int) Math.max(firstLevelSize, nodeLoad / thisLoad);
+        }
+      }
+
+      if (firstLevelSize > firstLevelCandidates.size()) {
+        firstLevelSize = firstLevelCandidates.size();
       }
-      executorService = Executors.newCachedThreadPool();
 
-      directToIndirectFollowerMap = new HashMap<>();
+      List<Node> firstLevelNodes = firstLevelCandidates.select(firstLevelSize);
+
+      Map<Node, List<Node>> secondLevelNodeMap = new HashMap<>();
+      orderedNodes.removeAll(firstLevelNodes);
+      for (int i = 0; i < orderedNodes.size(); i++) {
+        Node firstLevelNode = firstLevelNodes.get(i % firstLevelSize);
+        secondLevelNodeMap
+            .computeIfAbsent(firstLevelNode, n -> new ArrayList<>())
+            .add(orderedNodes.get(i));
+      }
+
+      for (Node firstLevelNode : firstLevelNodes) {
+        directToIndirectFollowerMap.put(
+            firstLevelNode,
+            secondLevelNodeMap.getOrDefault(firstLevelNode, Collections.emptyList()));
+        nodesEnabled.put(firstLevelNode, true);
+      }
+
+    } else {
       for (int i = 0, j = orderedNodes.size() - 1; i <= j; i++, j--) {
         if (i != j) {
           directToIndirectFollowerMap.put(
@@ -85,31 +165,10 @@ public class IndirectLogDispatcher extends LogDispatcher {
         } else {
           directToIndirectFollowerMap.put(orderedNodes.get(i), Collections.emptyList());
         }
+        nodesEnabled.put(orderedNodes.get(i), true);
       }
     }
 
-    for (Node node : directToIndirectFollowerMap.keySet()) {
-      nodesLogQueues.put(node, createQueueAndBindingThread(node));
-    }
-  }
-
-  class DispatcherThread extends LogDispatcher.DispatcherThread {
-
-    DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> logBlockingDeque) {
-      super(receiver, logBlockingDeque);
-    }
-
-    @Override
-    void sendLog(SendLogRequest logRequest) {
-      logRequest.getAppendEntryRequest().setSubReceivers(directToIndirectFollowerMap.get(receiver));
-      super.sendLog(logRequest);
-    }
-
-    @Override
-    protected AppendEntriesRequest prepareRequest(
-        List<ByteBuffer> logList, List<SendLogRequest> currBatch, int firstIndex) {
-      return super.prepareRequest(logList, currBatch, firstIndex)
-          .setSubReceivers(directToIndirectFollowerMap.get(receiver));
-    }
+    logger.debug("New relay map: {}", directToIndirectFollowerMap);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index efa095032a..c2b8b70259 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -50,7 +50,7 @@ public abstract class Log implements Comparable<Log> {
 
   private int byteSize = 0;
 
-  public static int getDefaultBufferSize() {
+  public int getDefaultBufferSize() {
     return DEFAULT_BUFFER_SIZE;
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogAckSender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogAckSender.java
new file mode 100644
index 0000000000..3441a01432
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogAckSender.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log;
+
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.cluster.utils.ClientUtils;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
+
+public class LogAckSender {
+
+  private static final Logger logger = LoggerFactory.getLogger(LogAckSender.class);
+
+  private ExecutorService ackSenderPool;
+  private RaftNode header;
+  private RaftMember member;
+  private BlockingQueue<AckRequest> requestQueue = new ArrayBlockingQueue<>(4096);
+  private String baseThreadName;
+
+  public LogAckSender(RaftMember member) {
+    this.member = member;
+    this.header = member.getHeader();
+    ackSenderPool = IoTDBThreadPoolFactory.newSingleThreadExecutor(member.getName() + "-ACKSender");
+    ackSenderPool.submit(this::appendAckLeaderTask);
+  }
+
+  public static class AckRequest {
+
+    private Node leader;
+    private long index;
+    private long term;
+    private long response;
+
+    public AckRequest(Node leader, long index, long term, long response) {
+      this.leader = leader;
+      this.index = index;
+      this.term = term;
+      this.response = response;
+    }
+  }
+
+  public void offer(Node leader, long index, long term, long response) {
+    requestQueue.add(new AckRequest(leader, index, term, response));
+  }
+
+  private void appendAckLeaderTask() {
+    List<AckRequest> ackRequestList = new ArrayList<>();
+    baseThreadName = Thread.currentThread().getName();
+    try {
+      while (!Thread.interrupted()) {
+        ackRequestList.clear();
+        synchronized (requestQueue) {
+          AckRequest req = requestQueue.take();
+          ackRequestList.add(req);
+          requestQueue.drainTo(ackRequestList);
+        }
+
+        appendAckLeader(ackRequestList);
+        Thread.sleep(10);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private void appendAckLeader(List<AckRequest> requests) {
+    if (requests.isEmpty()) {
+      return;
+    }
+    int index = 0;
+    AckRequest requestToSend = null;
+    for (; index < requests.size(); index++) {
+      if (requestToSend == null) {
+        requestToSend = requests.get(index);
+      } else {
+        AckRequest currRequest = requests.get(index);
+
+        if (requestToSend.term == currRequest.term
+            && requestToSend.index >= currRequest.index
+            && (requestToSend.response == currRequest.response
+                || requestToSend.response == Response.RESPONSE_STRONG_ACCEPT)) {
+          // currRequest has the same term as requestToSend and a response that requestToSend
+          // covers (equal response, or requestToSend is STRONG_ACCEPT), with a smaller or
+          // equal index, so it is subsumed by requestToSend; skip it and continue
+        } else if (requestToSend.term == currRequest.term
+            && requestToSend.index < currRequest.index
+            && (requestToSend.response == currRequest.response
+                || currRequest.response == Response.RESPONSE_STRONG_ACCEPT)) {
+          // currRequest has the same term as requestToSend and its response covers requestToSend's
+          // (equal response, or currRequest is STRONG_ACCEPT), with a larger index, so it replaces requestToSend
+          requestToSend = currRequest;
+        } else {
+          // the requests cannot cover each other, send requestToSend first
+          appendAckLeader(
+              requestToSend.leader,
+              requestToSend.index,
+              requestToSend.term,
+              requestToSend.response);
+          requestToSend = currRequest;
+        }
+      }
+    }
+
+    if (requestToSend != null) {
+      appendAckLeader(
+          requestToSend.leader, requestToSend.index, requestToSend.term, requestToSend.response);
+    }
+  }
+
+  private void appendAckLeader(Node leader, long index, long term, long response) {
+    long ackStartTime = Statistic.RAFT_RECEIVER_APPEND_ACK.getOperationStartTime();
+    AppendEntryResult result = new AppendEntryResult();
+    result.setLastLogIndex(index);
+    result.setLastLogTerm(term);
+    result.status = response;
+    result.setHeader(header);
+    result.setReceiver(member.getThisNode());
+
+    Client syncClient = null;
+    try {
+      if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+        GenericHandler<Void> handler = new GenericHandler<>(leader, null);
+        member.getAsyncClient(leader).acknowledgeAppendEntry(result, handler);
+      } else {
+        syncClient = member.getSyncClient(leader);
+        syncClient.acknowledgeAppendEntry(result);
+      }
+    } catch (TException e) {
+      logger.warn("Cannot send ack of {}-{} to leader {}", index, term, leader, e);
+    } finally {
+      if (syncClient != null) {
+        ClientUtils.putBackSyncClient(syncClient);
+      }
+    }
+    Thread.currentThread().setName(baseThreadName + "-" + index + "-" + response);
+    Statistic.RAFT_SEND_RELAY_ACK.add(1);
+    Statistic.RAFT_RECEIVER_APPEND_ACK.calOperationCostTimeFromStart(ackStartTime);
+  }
+
+  public void stop() {
+    ackSenderPool.shutdownNow();
+    try {
+      ackSenderPool.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME_S, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      logger.error("Unexpected interruption when waiting for ackSenderPool to end", e);
+    }
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 2410b2feaa..3e659a1468 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -30,6 +30,8 @@ import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
 import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.NodeStatus;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
@@ -58,6 +60,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -72,8 +75,9 @@ public class LogDispatcher {
   private static final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
   RaftMember member;
   private static final ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
-  private boolean useBatchInLogCatchUp = clusterConfig.isUseBatchInLogCatchUp();
+  protected boolean useBatchInLogCatchUp = clusterConfig.isUseBatchInLogCatchUp();
   Map<Node, BlockingQueue<SendLogRequest>> nodesLogQueues = new HashMap<>();
+  Map<Node, Boolean> nodesEnabled = new HashMap<>();
   ExecutorService executorService;
   private static ExecutorService serializationService =
       IoTDBThreadPoolFactory.newFixedThreadPoolWithDaemonThread(
@@ -81,17 +85,21 @@ public class LogDispatcher {
 
   public static int bindingThreadNum = clusterConfig.getDispatcherBindingThreadNum();
   public static int maxBatchSize = 10;
+  public static AtomicInteger concurrentSenderNum = new AtomicInteger();
 
   public LogDispatcher(RaftMember member) {
     this.member = member;
     executorService =
-        IoTDBThreadPoolFactory.newCachedThreadPool("LogDispatcher-" + member.getName());
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            bindingThreadNum * (member.getAllNodes().size() - 1),
+            "LogDispatcher-" + member.getName());
     createQueueAndBindingThreads();
   }
 
   void createQueueAndBindingThreads() {
     for (Node node : member.getAllNodes()) {
       if (!ClusterUtils.isNodeEquals(node, member.getThisNode())) {
+        nodesEnabled.put(node, true);
         nodesLogQueues.put(node, createQueueAndBindingThread(node));
       }
     }
@@ -109,15 +117,27 @@ public class LogDispatcher {
     return byteBuffer;
   }
 
+  protected SendLogRequest transformRequest(Node node, SendLogRequest request) {
+    return request;
+  }
+
   public void offer(SendLogRequest request) {
     // do serialization here to avoid taking LogManager for too long
     if (!nodesLogQueues.isEmpty()) {
-      request.serializedLogFuture = serializationService.submit(() -> serializeTask(request));
+      SendLogRequest finalRequest = request;
+      request.serializedLogFuture = serializationService.submit(() -> serializeTask(finalRequest));
     }
 
     long startTime = Statistic.LOG_DISPATCHER_LOG_ENQUEUE.getOperationStartTime();
     request.getVotingLog().getLog().setEnqueueTime(System.nanoTime());
     for (Entry<Node, BlockingQueue<SendLogRequest>> entry : nodesLogQueues.entrySet()) {
+      boolean nodeEnabled = this.nodesEnabled.getOrDefault(entry.getKey(), false);
+      if (!nodeEnabled) {
+        continue;
+      }
+
+      request = transformRequest(entry.getKey(), request);
+
       BlockingQueue<SendLogRequest> nodeLogQueue = entry.getValue();
       try {
         boolean addSucceeded;
@@ -173,6 +193,7 @@ public class LogDispatcher {
 
   public static class SendLogRequest {
 
+    private AppendNodeEntryHandler handler;
     private VotingLog votingLog;
     private AtomicBoolean leaderShipStale;
     private AtomicLong newLeaderTerm;
@@ -201,6 +222,7 @@ public class LogDispatcher {
       this.setAppendEntryRequest(request.appendEntryRequest);
       this.setQuorumSize(request.quorumSize);
       this.setEnqueueTime(request.enqueueTime);
+      this.serializedLogFuture = request.serializedLogFuture;
     }
 
     public VotingLog getVotingLog() {
@@ -265,28 +287,27 @@ public class LogDispatcher {
     private Peer peer;
     Client syncClient;
     AsyncClient asyncClient;
+    private String baseName;
 
     DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> logBlockingDeque) {
       this.receiver = receiver;
       this.logBlockingDeque = logBlockingDeque;
-      this.peer =
-          member
-              .getPeerMap()
-              .computeIfAbsent(receiver, r -> new Peer(member.getLogManager().getLastLogIndex()));
+      this.peer = member.getPeer(receiver);
       if (!clusterConfig.isUseAsyncServer()) {
         syncClient = member.getSyncClient(receiver);
       }
+      baseName = "LogDispatcher-" + member.getName() + "-" + receiver;
     }
 
     @Override
     public void run() {
-      Thread.currentThread().setName("LogDispatcher-" + member.getName() + "-" + receiver);
+      Thread.currentThread().setName(baseName);
       try {
         while (!Thread.interrupted()) {
           synchronized (logBlockingDeque) {
             SendLogRequest poll = logBlockingDeque.take();
             currBatch.add(poll);
-            if (maxBatchSize > 1) {
+            if (maxBatchSize > 1 && useBatchInLogCatchUp) {
               logBlockingDeque.drainTo(currBatch, maxBatchSize - 1);
             }
           }
@@ -310,6 +331,8 @@ public class LogDispatcher {
       for (SendLogRequest request : currBatch) {
         Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
             request.getVotingLog().getLog().getEnqueueTime());
+        Statistic.LOG_DISPATCHER_FROM_CREATE_TO_DEQUEUE.calOperationCostTimeFromStart(
+            request.getVotingLog().getLog().getCreateTime());
         long start = Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
         request.getAppendEntryRequest().entry = request.serializedLogFuture.get();
         Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(start);
@@ -427,6 +450,8 @@ public class LogDispatcher {
           sendLogs(currBatch);
         } else {
           for (SendLogRequest batch : currBatch) {
+            Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENDING.calOperationCostTimeFromStart(
+                batch.getVotingLog().getLog().getCreateTime());
             sendLog(batch);
           }
         }
@@ -442,21 +467,29 @@ public class LogDispatcher {
               receiver,
               logRequest.leaderShipStale,
               logRequest.newLeaderTerm,
-              peer,
               logRequest.quorumSize);
       // TODO add async interface
       int retries = 5;
       try {
         long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
         for (int i = 0; i < retries; i++) {
-          logRequest.getVotingLog().getFailedNodeIds().remove(receiver.nodeIdentifier);
-          logRequest.getVotingLog().getStronglyAcceptedNodeIds().remove(Integer.MAX_VALUE);
+          int concurrentSender = concurrentSenderNum.incrementAndGet();
+          Statistic.RAFT_CONCURRENT_SENDER.add(concurrentSender);
           AppendEntryResult result = syncClient.appendEntry(logRequest.appendEntryRequest);
+          concurrentSenderNum.decrementAndGet();
           if (result.status == Response.RESPONSE_OUT_OF_WINDOW) {
             Thread.sleep(100);
+            Statistic.RAFT_SENDER_OOW.add(1);
           } else {
-            Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(operationStartTime);
+            long sendLogTime =
+                Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(operationStartTime);
+            NodeStatus nodeStatus = NodeStatusManager.getINSTANCE().getNodeStatus(receiver, false);
+            nodeStatus.getSendEntryLatencySum().addAndGet(sendLogTime);
+            nodeStatus.getSendEntryNum().incrementAndGet();
+
+            long handleStart = Statistic.RAFT_SENDER_HANDLE_SEND_RESULT.getOperationStartTime();
             handler.onComplete(result);
+            Statistic.RAFT_SENDER_HANDLE_SEND_RESULT.calOperationCostTimeFromStart(handleStart);
             break;
           }
         }
@@ -477,7 +510,6 @@ public class LogDispatcher {
               receiver,
               logRequest.leaderShipStale,
               logRequest.newLeaderTerm,
-              peer,
               logRequest.quorumSize);
 
       AsyncClient client = member.getAsyncClient(receiver);
@@ -491,11 +523,15 @@ public class LogDispatcher {
     }
 
     void sendLog(SendLogRequest logRequest) {
+      Thread.currentThread()
+          .setName(baseName + "-" + logRequest.getVotingLog().getLog().getCurrLogIndex());
       if (clusterConfig.isUseAsyncServer()) {
         sendLogAsync(logRequest);
       } else {
         sendLogSync(logRequest);
       }
+      Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENT.calOperationCostTimeFromStart(
+          logRequest.getVotingLog().getLog().getCreateTime());
     }
 
     class AppendEntriesHandler implements AsyncMethodCallback<AppendEntryResult> {
@@ -511,7 +547,6 @@ public class LogDispatcher {
                   receiver,
                   sendLogRequest.getLeaderShipStale(),
                   sendLogRequest.getNewLeaderTerm(),
-                  peer,
                   sendLogRequest.getQuorumSize());
           singleEntryHandlers.add(handler);
         }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
index 69645586d7..5972fba588 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
@@ -25,18 +25,22 @@ import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 /** LogRelay is used by followers to forward entries from the leader to other followers. */
 public class LogRelay {
 
+  private static final Logger logger = LoggerFactory.getLogger(LogRelay.class);
+
   private ConcurrentSkipListSet<RelayEntry> entryHeap = new ConcurrentSkipListSet<>();
   private static final int RELAY_NUMBER =
       ClusterDescriptor.getInstance().getConfig().getRelaySenderNum();
@@ -46,11 +50,8 @@ public class LogRelay {
   public LogRelay(RaftMember raftMember) {
     this.raftMember = raftMember;
     relaySenders =
-        Executors.newFixedThreadPool(
-            RELAY_NUMBER,
-            new ThreadFactoryBuilder()
-                .setNameFormat(raftMember.getName() + "-RelaySender-%d")
-                .build());
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            RELAY_NUMBER, raftMember.getName() + "-RelaySender");
     for (int i = 0; i < RELAY_NUMBER; i++) {
       relaySenders.submit(new RelayThread());
     }
@@ -65,6 +66,7 @@ public class LogRelay {
   }
 
   private void offer(RelayEntry entry) {
+    long operationStartTime = Statistic.RAFT_SENDER_RELAY_OFFER_LOG.getOperationStartTime();
     synchronized (entryHeap) {
       while (entryHeap.size()
           > ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem()) {
@@ -77,6 +79,7 @@ public class LogRelay {
       entryHeap.add(entry);
       entryHeap.notifyAll();
     }
+    Statistic.RAFT_SENDER_RELAY_OFFER_LOG.calOperationCostTimeFromStart(operationStartTime);
   }
 
   public void offer(AppendEntriesRequest request, List<Node> receivers) {
@@ -87,6 +90,7 @@ public class LogRelay {
 
     @Override
     public void run() {
+      String baseName = Thread.currentThread().getName();
       while (!Thread.interrupted()) {
         RelayEntry relayEntry;
         synchronized (entryHeap) {
@@ -102,9 +106,25 @@ public class LogRelay {
           }
         }
 
+        logger.debug("Relaying {}", relayEntry);
+
         if (relayEntry.singleRequest != null) {
+          Thread.currentThread()
+              .setName(
+                  baseName
+                      + "-"
+                      + (relayEntry.singleRequest.prevLogIndex + 1)
+                      + "-"
+                      + relayEntry.receivers);
           raftMember.sendLogToSubFollowers(relayEntry.singleRequest, relayEntry.receivers);
         } else if (relayEntry.batchRequest != null) {
+          Thread.currentThread()
+              .setName(
+                  baseName
+                      + "-"
+                      + (relayEntry.batchRequest.prevLogIndex + 1)
+                      + "-"
+                      + relayEntry.receivers);
           raftMember.sendLogsToSubFollowers(relayEntry.batchRequest, relayEntry.receivers);
         }
 
@@ -138,6 +158,13 @@ public class LogRelay {
       return 0;
     }
 
+    @Override
+    public String toString() {
+      long index = singleRequest != null ? singleRequest.prevLogIndex : batchRequest.prevLogIndex;
+      index++;
+      return "RelayEntry{" + index + "," + receivers + "}";
+    }
+
     @Override
     public boolean equals(Object o) {
       if (this == o) {
@@ -153,7 +180,7 @@ public class LogRelay {
 
     @Override
     public int hashCode() {
-      return Objects.hash(singleRequest);
+      return Objects.hash(singleRequest, batchRequest);
     }
 
     @Override
@@ -161,4 +188,12 @@ public class LogRelay {
       return Long.compare(this.getIndex(), o.getIndex());
     }
   }
+
+  public RelayEntry first() {
+    try {
+      return entryHeap.isEmpty() ? null : entryHeap.first();
+    } catch (NoSuchElementException e) {
+      return null;
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
index 59bc40bf25..7b1dde2d3e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
@@ -20,14 +20,20 @@
 package org.apache.iotdb.cluster.log;
 
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.tsfile.utils.Pair;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.List;
 
 public class VotingLogList {
 
+  private static final Logger logger = LoggerFactory.getLogger(VotingLogList.class);
+
   private List<VotingLog> logList = new ArrayList<>();
   private volatile long currTerm = -1;
   private int quorumSize;
@@ -63,6 +69,7 @@ public class VotingLogList {
    * @return the lastly removed entry if any.
    */
   public void onStronglyAccept(long index, long term, int acceptingNodeId) {
+    logger.debug("{}-{} is strongly accepted by {}", index, term, acceptingNodeId);
     int lastEntryIndexToCommit = -1;
 
     List<VotingLog> acceptedLogs;
@@ -93,6 +100,15 @@ public class VotingLogList {
     }
 
     if (lastEntryIndexToCommit != -1) {
+      Log lastLog = acceptedLogs.get(acceptedLogs.size() - 1).log;
+      synchronized (member.getLogManager()) {
+        try {
+          member.getLogManager().commitTo(lastLog.getCurrLogIndex());
+        } catch (LogExecutionException e) {
+          logger.error("Fail to commit {}", lastLog, e);
+        }
+      }
+
       for (VotingLog acceptedLog : acceptedLogs) {
         synchronized (acceptedLog) {
           acceptedLog.acceptedTime.set(System.nanoTime());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
index d2d9056a16..a08b542f93 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.cluster.log.appender;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.member.RaftMember;
@@ -30,6 +32,7 @@ import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.Buffer;
 import java.util.List;
 
 /**
@@ -57,24 +60,28 @@ public class BlockingLogAppender implements LogAppender {
    * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
    *     .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
    */
-  public AppendEntryResult appendEntry(
-      long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
-    long resp = checkPrevLogIndex(prevLogIndex);
+  public AppendEntryResult appendEntry(AppendEntryRequest request, Log log) {
+    long resp = checkPrevLogIndex(request.prevLogIndex);
     if (resp != Response.RESPONSE_AGREE) {
       return new AppendEntryResult(resp).setHeader(member.getHeader());
     }
 
-    long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
     long success;
     AppendEntryResult result = new AppendEntryResult();
     synchronized (logManager) {
-      success = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, log);
+      success =
+          logManager.maybeAppend(
+              request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log);
       if (success != -1) {
         result.setLastLogIndex(logManager.getLastLogIndex());
         result.setLastLogTerm(logManager.getLastLogTerm());
+        if (request.isSetSubReceivers() && !request.getSubReceivers().isEmpty()) {
+          request.entry.rewind();
+          member.getLogRelay().offer(request, request.subReceivers);
+        }
       }
     }
-    Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
+
     if (success != -1) {
       logger.debug("{} append a new log {}", member.getName(), log);
       result.status = Response.RESPONSE_STRONG_ACCEPT;
@@ -82,6 +89,7 @@ public class BlockingLogAppender implements LogAppender {
       // the incoming log points to an illegal position, reject it
       result.status = Response.RESPONSE_LOG_MISMATCH;
     }
+    result.setHeader(request.getHeader());
     return result;
   }
 
@@ -134,36 +142,43 @@ public class BlockingLogAppender implements LogAppender {
    * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
    *     .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
    */
-  public AppendEntryResult appendEntries(
-      long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) {
+  public AppendEntryResult appendEntries(AppendEntriesRequest request, List<Log> logs) {
     logger.debug(
         "{}, prevLogIndex={}, prevLogTerm={}, leaderCommit={}",
         member.getName(),
-        prevLogIndex,
-        prevLogTerm,
-        leaderCommit);
+        request.prevLogIndex,
+        request.prevLogTerm,
+        request.leaderCommit);
     if (logs.isEmpty()) {
       return new AppendEntryResult(Response.RESPONSE_AGREE).setHeader(member.getHeader());
     }
 
-    long resp = checkPrevLogIndex(prevLogIndex);
+    long resp = checkPrevLogIndex(request.prevLogIndex);
     if (resp != Response.RESPONSE_AGREE) {
       return new AppendEntryResult(resp).setHeader(member.getHeader());
     }
 
     AppendEntryResult result = new AppendEntryResult();
     synchronized (logManager) {
-      long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
-      resp = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, logs);
-      Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
+      resp =
+          logManager.maybeAppend(
+              request.prevLogIndex, request.prevLogTerm, request.leaderCommit, logs);
       if (resp != -1) {
         if (logger.isDebugEnabled()) {
           logger.debug(
-              "{} append a new log list {}, commit to {}", member.getName(), logs, leaderCommit);
+              "{} append a new log list {}, commit to {}",
+              member.getName(),
+              logs,
+              request.leaderCommit);
         }
         result.status = Response.RESPONSE_STRONG_ACCEPT;
         result.setLastLogIndex(logManager.getLastLogIndex());
         result.setLastLogTerm(logManager.getLastLogTerm());
+
+        if (request.isSetSubReceivers()) {
+          request.entries.forEach(Buffer::rewind);
+          member.getLogRelay().offer(request, request.subReceivers);
+        }
       } else {
         // the incoming log points to an illegal position, reject it
         result.status = Response.RESPONSE_LOG_MISMATCH;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/LogAppender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/LogAppender.java
index 01f16daf09..ca0eea3bf0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/LogAppender.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/LogAppender.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.cluster.log.appender;
 
 import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
 
 import java.util.List;
@@ -30,10 +32,9 @@ import java.util.List;
  */
 public interface LogAppender {
 
-  AppendEntryResult appendEntries(
-      long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs);
+  AppendEntryResult appendEntries(AppendEntriesRequest request, List<Log> logs);
 
-  AppendEntryResult appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log);
+  AppendEntryResult appendEntry(AppendEntryRequest request, Log log);
 
   void reset();
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
index f2a19e8039..1ab08449bb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
@@ -22,17 +22,24 @@ package org.apache.iotdb.cluster.log.appender;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.Buffer;
 import java.util.Arrays;
 import java.util.List;
 
 public class SlidingWindowLogAppender implements LogAppender {
 
+  private static final Logger logger = LoggerFactory.getLogger(SlidingWindowLogAppender.class);
+
   private int windowCapacity = ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem();
   private int windowLength = 0;
   private Log[] logWindow = new Log[windowCapacity];
@@ -115,25 +122,43 @@ public class SlidingWindowLogAppender implements LogAppender {
 
     // flush [0, flushPos)
     List<Log> logs = Arrays.asList(logWindow).subList(0, flushPos);
+    logger.debug(
+        "Flushing {} entries to log, first {}, last {}",
+        logs.size(),
+        logs.get(0),
+        logs.get(logs.size() - 1));
     long success =
         logManager.maybeAppend(windowPrevLogIndex, windowPrevLogTerm, leaderCommit, logs);
     if (success != -1) {
-      System.arraycopy(logWindow, flushPos, logWindow, 0, windowCapacity - flushPos);
-      System.arraycopy(prevTerms, flushPos, prevTerms, 0, windowCapacity - flushPos);
-      for (int i = 1; i <= flushPos; i++) {
-        logWindow[windowCapacity - i] = null;
-      }
+      moveWindowRightward(flushPos);
     }
-    firstPosPrevIndex = logManager.getLastLogIndex();
     result.status = Response.RESPONSE_STRONG_ACCEPT;
     result.setLastLogIndex(firstPosPrevIndex);
     result.setLastLogTerm(logManager.getLastLogTerm());
     return success;
   }
 
+  private void moveWindowRightward(int step) {
+    System.arraycopy(logWindow, step, logWindow, 0, windowCapacity - step);
+    System.arraycopy(prevTerms, step, prevTerms, 0, windowCapacity - step);
+    for (int i = 1; i <= step; i++) {
+      logWindow[windowCapacity - i] = null;
+    }
+    firstPosPrevIndex = logManager.getLastLogIndex();
+  }
+
+  private void moveWindowLeftward(int step) {
+    int length = Math.max(windowCapacity - step, 0);
+    System.arraycopy(logWindow, 0, logWindow, step, length);
+    System.arraycopy(prevTerms, 0, prevTerms, step, length);
+    for (int i = 0; i < length; i++) {
+      logWindow[i] = null;
+    }
+    firstPosPrevIndex = logManager.getLastLogIndex();
+  }
+
   @Override
-  public AppendEntryResult appendEntries(
-      long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) {
+  public AppendEntryResult appendEntries(AppendEntriesRequest request, List<Log> logs) {
     if (logs.isEmpty()) {
       return new AppendEntryResult(Response.RESPONSE_AGREE)
           .setHeader(member.getPartitionGroup().getHeader());
@@ -141,24 +166,56 @@ public class SlidingWindowLogAppender implements LogAppender {
 
     AppendEntryResult result = null;
     for (Log log : logs) {
-      result = appendEntry(prevLogIndex, prevLogTerm, leaderCommit, log);
+      result = appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log);
 
       if (result.status != Response.RESPONSE_AGREE
           && result.status != Response.RESPONSE_STRONG_ACCEPT
           && result.status != Response.RESPONSE_WEAK_ACCEPT) {
         return result;
       }
-      prevLogIndex = log.getCurrLogIndex();
-      prevLogTerm = log.getCurrLogTerm();
+      request.prevLogIndex = log.getCurrLogIndex();
+      request.prevLogTerm = log.getCurrLogTerm();
+    }
+    if (request.isSetSubReceivers()) {
+      request.entries.forEach(Buffer::rewind);
+      member.getLogRelay().offer(request, request.subReceivers);
     }
 
     return result;
   }
 
   @Override
-  public AppendEntryResult appendEntry(
+  public AppendEntryResult appendEntry(AppendEntryRequest request, Log log) {
+
+    AppendEntryResult result = null;
+    long start = System.currentTimeMillis();
+    long retryTime = 0;
+    long maxRetry = 10000;
+    while (result == null
+        || result.status == Response.RESPONSE_OUT_OF_WINDOW && retryTime < maxRetry) {
+      result = appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log);
+      retryTime = System.currentTimeMillis() - start;
+      if (result.status == Response.RESPONSE_OUT_OF_WINDOW && retryTime < maxRetry) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+    }
+    result.setHeader(request.getHeader());
+
+    if (request.isSetSubReceivers() && !request.getSubReceivers().isEmpty()) {
+      request.entry.rewind();
+      member.getLogRelay().offer(request, request.subReceivers);
+    }
+
+    return result;
+  }
+
+  private AppendEntryResult appendEntry(
       long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
-    long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
     long appendedPos = 0;
 
     AppendEntryResult result = new AppendEntryResult();
@@ -170,6 +227,7 @@ public class SlidingWindowLogAppender implements LogAppender {
         result.status = Response.RESPONSE_STRONG_ACCEPT;
         result.setLastLogIndex(logManager.getLastLogIndex());
         result.setLastLogTerm(logManager.getLastLogTerm());
+        moveWindowLeftward(-windowPos);
       } else if (windowPos < windowCapacity) {
         // the new entry falls into the window
         logWindow[windowPos] = log;
@@ -186,14 +244,12 @@ public class SlidingWindowLogAppender implements LogAppender {
 
         Statistic.RAFT_WINDOW_LENGTH.add(windowLength);
       } else {
-        Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
         result.setStatus(Response.RESPONSE_OUT_OF_WINDOW);
         result.setHeader(member.getPartitionGroup().getHeader());
         return result;
       }
     }
 
-    Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
     if (appendedPos == -1) {
       // the incoming log points to an illegal position, reject it
       result.status = Response.RESPONSE_LOG_MISMATCH;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
index 1f3882b398..5d5793d350 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.log.logtypes;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import org.slf4j.Logger;
@@ -46,6 +47,16 @@ public class PhysicalPlanLog extends Log {
     this.plan = plan;
   }
 
+  @Override
+  public int getDefaultBufferSize() {
+    if (plan instanceof DummyPlan) {
+      int workloadSize =
+          ((DummyPlan) plan).getWorkload() == null ? 0 : ((DummyPlan) plan).getWorkload().length;
+      return workloadSize + 512;
+    }
+    return DEFAULT_BUFFER_SIZE;
+  }
+
   @Override
   public ByteBuffer serialize() {
     PublicBAOS byteArrayOutputStream = new PublicBAOS(getDefaultBufferSize());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index 4b2309281d..406245582c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -682,8 +682,15 @@ public abstract class RaftLogManager {
           "There are too many unapplied logs [{}], wait for a while to avoid memory overflow",
           unappliedLogSize);
       try {
-        Thread.sleep(
-            unappliedLogSize - ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
+        synchronized (changeApplyCommitIndexCond) {
+          changeApplyCommitIndexCond.wait(
+              Math.min(
+                  (unappliedLogSize
+                              - ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem())
+                          / 10
+                      + 1,
+                  1000));
+        }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
@@ -954,7 +961,7 @@ public abstract class RaftLogManager {
   }
 
   public void checkAppliedLogIndex() {
-    while (!Thread.currentThread().isInterrupted()) {
+    while (!Thread.interrupted()) {
       try {
         doCheckAppliedLogIndex();
       } catch (IndexOutOfBoundsException e) {
@@ -976,7 +983,7 @@ public abstract class RaftLogManager {
           || nextToCheckIndex > getCommittedEntryManager().getLastIndex()
           || (blockAppliedCommitIndex > 0 && blockAppliedCommitIndex < nextToCheckIndex)) {
         // avoid spinning
-        Thread.sleep(0);
+        Thread.sleep(100);
         return;
       }
       Log log = getCommittedEntryManager().getEntry(nextToCheckIndex);
@@ -992,7 +999,7 @@ public abstract class RaftLogManager {
         synchronized (log) {
           while (!log.isApplied() && maxHaveAppliedCommitIndex < log.getCurrLogIndex()) {
             // wait until the log is applied or a newer snapshot is installed
-            log.wait(1);
+            log.wait(10);
           }
         }
       }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java
index 21ec9f92ad..8353dda742 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java
@@ -37,7 +37,8 @@ public class QueryCoordinator {
   private static final QueryCoordinator INSTANCE = new QueryCoordinator();
   private static final NodeStatusManager STATUS_MANAGER = NodeStatusManager.getINSTANCE();
 
-  private final Comparator<Node> nodeComparator = Comparator.comparing(this::getNodeStatus);
+  private final Comparator<Node> nodeComparator =
+      Comparator.comparingLong(o -> getNodeStatus(o).getStatus().fanoutRequestNum);
 
   private QueryCoordinator() {
     // singleton class
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index 7e703a6dc1..7f8c7df4f4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.cluster.log.VotingLog;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -56,8 +55,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
   protected AtomicLong receiverTerm;
   protected VotingLog log;
   protected AtomicBoolean leaderShipStale;
-  protected Node receiver;
-  protected Peer peer;
+  protected Node directReceiver;
   protected int quorumSize;
 
   // nano start time when the send begins
@@ -79,8 +77,11 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
       // the request already failed
       return;
     }
+
+    Node trueReceiver = response.isSetReceiver() ? response.receiver : directReceiver;
+
     logger.debug(
-        "{}: Append response {} from {} for log {}", member.getName(), response, receiver, log);
+        "{}: Append response {} from {} for log {}", member.getName(), response, trueReceiver, log);
     if (leaderShipStale.get()) {
       // someone has rejected this log because the leadership is stale
       return;
@@ -94,8 +95,8 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
           .onStronglyAccept(
               log.getLog().getCurrLogIndex(),
               log.getLog().getCurrLogTerm(),
-              receiver.nodeIdentifier);
-      peer.setMatchIndex(Math.max(log.getLog().getCurrLogIndex(), peer.getMatchIndex()));
+              trueReceiver.nodeIdentifier);
+      member.getPeer(trueReceiver).setMatchIndex(response.lastLogIndex);
     } else if (resp > 0) {
       // a response > 0 is the follower's term
       // the leader ship is stale, wait for the new leader's heartbeat
@@ -103,7 +104,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
       logger.debug(
           "{}: Received a rejection from {} because term is stale: {}/{}, log: {}",
           member.getName(),
-          receiver,
+          trueReceiver,
           prevReceiverTerm,
           resp,
           log);
@@ -120,14 +121,10 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
       }
     } else if (resp == RESPONSE_WEAK_ACCEPT) {
       synchronized (log) {
-        log.getWeaklyAcceptedNodeIds().add(receiver.nodeIdentifier);
+        log.getWeaklyAcceptedNodeIds().add(trueReceiver.nodeIdentifier);
         if (log.getWeaklyAcceptedNodeIds().size() + log.getStronglyAcceptedNodeIds().size()
             >= quorumSize) {
           log.acceptedTime.set(System.nanoTime());
-          if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
-            member.removeAppendLogHandler(
-                new Pair<>(log.getLog().getCurrLogIndex(), log.getLog().getCurrLogTerm()));
-          }
         }
         log.notifyAll();
       }
@@ -135,13 +132,20 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
       // e.g., Response.RESPONSE_LOG_MISMATCH
       if (resp == RESPONSE_LOG_MISMATCH || resp == RESPONSE_OUT_OF_WINDOW) {
         logger.debug(
-            "{}: The log {} is rejected by {} because: {}", member.getName(), log, receiver, resp);
+            "{}: The log {} is rejected by {} because: {}",
+            member.getName(),
+            log,
+            trueReceiver,
+            resp);
       } else {
         logger.warn(
-            "{}: The log {} is rejected by {} because: {}", member.getName(), log, receiver, resp);
+            "{}: The log {} is rejected by {} because: {}",
+            member.getName(),
+            log,
+            trueReceiver,
+            resp);
+        onFail(trueReceiver);
       }
-
-      onFail();
     }
     // rejected because the receiver's logs are stale or the receiver has no cluster info, just
     // wait for the heartbeat to handle
@@ -154,17 +158,18 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
           "{}: Cannot append log {}: cannot connect to {}: {}",
           member.getName(),
           log,
-          receiver,
+          directReceiver,
           exception.getMessage());
     } else {
-      logger.warn("{}: Cannot append log {} to {}", member.getName(), log, receiver, exception);
+      logger.warn(
+          "{}: Cannot append log {} to {}", member.getName(), log, directReceiver, exception);
     }
-    onFail();
+    onFail(directReceiver);
   }
 
-  private void onFail() {
+  private void onFail(Node trueReceiver) {
     synchronized (log) {
-      log.getFailedNodeIds().add(receiver.nodeIdentifier);
+      log.getFailedNodeIds().add(trueReceiver.nodeIdentifier);
       if (log.getFailedNodeIds().size() > quorumSize) {
         // quorum members have failed, there is no need to wait for others
         log.getStronglyAcceptedNodeIds().add(Integer.MAX_VALUE);
@@ -189,12 +194,8 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
     this.leaderShipStale = leaderShipStale;
   }
 
-  public void setPeer(Peer peer) {
-    this.peer = peer;
-  }
-
-  public void setReceiver(Node follower) {
-    this.receiver = follower;
+  public void setDirectReceiver(Node follower) {
+    this.directReceiver = follower;
   }
 
   public void setReceiverTerm(AtomicLong receiverTerm) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
index af4fbc5445..f89cd56241 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
@@ -53,8 +53,8 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
   @Override
   public void onComplete(HeartBeatResponse resp) {
     long followerTerm = resp.getTerm();
-    if (logger.isDebugEnabled()) {
-      logger.debug(
+    if (logger.isTraceEnabled()) {
+      logger.trace(
           "{}: Received a heartbeat response {} for last log index {}",
           memberName,
           followerTerm,
@@ -89,8 +89,8 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
     long lastLogTerm = resp.getLastLogTerm();
     long localLastLogIdx = localMember.getLogManager().getLastLogIndex();
     long localLastLogTerm = localMember.getLogManager().getLastLogTerm();
-    if (logger.isDebugEnabled()) {
-      logger.debug(
+    if (logger.isTraceEnabled()) {
+      logger.trace(
           "{}: Node {} is still alive, log index: {}/{}, log term: {}/{}",
           memberName,
           follower,
@@ -100,11 +100,7 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
           localLastLogTerm);
     }
 
-    Peer peer =
-        localMember
-            .getPeerMap()
-            .computeIfAbsent(
-                follower, k -> new Peer(localMember.getLogManager().getLastLogIndex()));
+    Peer peer = localMember.getPeer(follower);
     if (!localMember.getLogManager().isLogUpToDate(lastLogTerm, lastLogIdx)
         || !localMember.getLogManager().matchTerm(lastLogTerm, lastLogIdx)) {
       // the follower is not up-to-date
@@ -119,7 +115,7 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
       if (lastLogIdx == peer.getLastHeartBeatIndex()) {
         // the follower's lastLogIndex is unchanged, increase inconsistent counter
         int inconsistentNum = peer.incInconsistentHeartbeatNum();
-        if (inconsistentNum >= 10) {
+        if (inconsistentNum >= 1000) {
           logger.info(
               "{}: catching up node {}, index-term: {}-{}/{}-{}, peer match index {}",
               memberName,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
index c5032966ea..acc34cf6c0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
@@ -101,7 +101,7 @@ public class HeartbeatThread implements Runnable {
               localMember.setCharacter(NodeCharacter.ELECTOR);
               localMember.setLeader(ClusterConstant.EMPTY_NODE);
             } else {
-              logger.debug(
+              logger.trace(
                   "{}: Heartbeat from leader {} is still valid",
                   memberName,
                   localMember.getLeader());
@@ -158,7 +158,7 @@ public class HeartbeatThread implements Runnable {
   @SuppressWarnings("java:S2445")
   private void sendHeartbeats(Collection<Node> nodes) {
     if (logger.isDebugEnabled()) {
-      logger.debug(
+      logger.trace(
           "{}: Send heartbeat to {} followers, commit log index = {}",
           memberName,
           nodes.size() - 1,
@@ -200,7 +200,7 @@ public class HeartbeatThread implements Runnable {
     if (client != null) {
       // connecting to the local node results in a null
       try {
-        logger.debug("{}: Sending heartbeat to {}", memberName, node);
+        logger.trace("{}: Sending heartbeat to {}", memberName, node);
         client.sendHeartbeat(request, new HeartbeatHandler(localMember, node));
       } catch (Exception e) {
         logger.warn("{}: Cannot send heart beat to node {}", memberName, node, e);
@@ -231,7 +231,7 @@ public class HeartbeatThread implements Runnable {
               Client client = localMember.getSyncHeartbeatClient(node);
               if (client != null) {
                 try {
-                  logger.debug("{}: Sending heartbeat to {}", memberName, node);
+                  logger.trace("{}: Sending heartbeat to {}", memberName, node);
                   HeartBeatResponse heartBeatResponse = client.sendHeartbeat(req);
                   heartbeatHandler.onComplete(heartBeatResponse);
                 } catch (TTransportException e) {
@@ -266,6 +266,16 @@ public class HeartbeatThread implements Runnable {
       logger.info("{}: Winning the election because the node is the only node.", memberName);
     }
 
+    if (!ClusterUtils.isNodeEquals(
+        localMember.getThisNode(), localMember.getPartitionGroup().getHeader().node)) {
+      long electionWait = getElectionRandomWaitMs();
+      logger.info(
+          "{}: Sleep {}ms before the first election as this node is not the preferred " + "leader",
+          memberName,
+          electionWait);
+      Thread.sleep(electionWait);
+    }
+
     // the election goes on until this node becomes a follower or a leader
     while (localMember.getCharacter() == NodeCharacter.ELECTOR) {
       startElection();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index c43401af57..3114182960 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -1006,7 +1006,8 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
         NodeStatusManager.getINSTANCE().getLastResponseLatency(getHeader().getNode()),
         lastHeartbeatReceivedTime,
         prevLastLogIndex,
-        logManager.getMaxHaveAppliedCommitIndex());
+        logManager.getMaxHaveAppliedCommitIndex(),
+        logRelay != null ? logRelay.first() : null);
   }
 
   @TestOnly
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index e07452cbd1..637ce44542 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -1800,7 +1800,8 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
         readOnly,
         lastHeartbeatReceivedTime,
         prevLastLogIndex,
-        logManager.getMaxHaveAppliedCommitIndex());
+        logManager.getMaxHaveAppliedCommitIndex(),
+        logRelay != null ? logRelay.first() : null);
   }
 
   /**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 4df17e12e9..db75f3b1c2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.cluster.log.FragmentedLogDispatcher;
 import org.apache.iotdb.cluster.log.HardState;
 import org.apache.iotdb.cluster.log.IndirectLogDispatcher;
 import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.LogAckSender;
 import org.apache.iotdb.cluster.log.LogDispatcher;
 import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
 import org.apache.iotdb.cluster.log.LogParser;
@@ -72,6 +73,7 @@ import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
 import org.apache.iotdb.cluster.server.handlers.forwarder.IndirectAppendHandler;
+import org.apache.iotdb.cluster.server.monitor.NodeStatus;
 import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.monitor.Timer;
@@ -110,7 +112,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.SocketTimeoutException;
-import java.nio.Buffer;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -132,6 +133,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
+import static org.apache.iotdb.cluster.log.LogDispatcher.concurrentSenderNum;
 
 /**
  * RaftMember process the common raft logic like leader election, log appending, catch-up and so on.
@@ -287,6 +289,10 @@ public abstract class RaftMember implements RaftMemberMBean {
 
   protected LogRelay logRelay;
 
+  protected LogAckSender ackSender;
+
+  private ThreadLocal<String> threadBaseName = new ThreadLocal<>();
+
   protected RaftMember() {}
 
   protected RaftMember(String name, ClientManager clientManager) {
@@ -325,6 +331,8 @@ public abstract class RaftMember implements RaftMemberMBean {
             getName() + "-SerialToParallel");
     commitLogPool = IoTDBThreadPoolFactory.newSingleThreadExecutor("RaftCommitLog");
 
+    ackSender = new LogAckSender(this);
+
     if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
       logRelay = new LogRelay(this);
     }
@@ -416,6 +424,10 @@ public abstract class RaftMember implements RaftMemberMBean {
     if (logRelay != null) {
       logRelay.stop();
     }
+    if (ackSender != null) {
+      ackSender.stop();
+    }
+
     leader.set(ClusterConstant.EMPTY_NODE);
     catchUpService = null;
     heartBeatService = null;
@@ -572,16 +584,22 @@ public abstract class RaftMember implements RaftMemberMBean {
     return Response.RESPONSE_AGREE;
   }
 
+  private String getThreadBaseName() {
+    if (threadBaseName.get() == null) {
+      threadBaseName.set(Thread.currentThread().getName());
+    }
+    return threadBaseName.get();
+  }
   /**
    * Process an AppendEntryRequest. First check the term of the leader, then parse the log and
    * finally see if we can find a position to append the log.
    */
   public AppendEntryResult appendEntry(AppendEntryRequest request) throws UnknownLogTypeException {
+    long operationStartTime = Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.getOperationStartTime();
+    Thread.currentThread()
+        .setName(getThreadBaseName() + "-appending-" + (request.prevLogIndex + 1));
     AppendEntryResult result = appendEntryInternal(request);
-    if (request.isSetSubReceivers()) {
-      request.entry.rewind();
-      logRelay.offer(request, request.subReceivers);
-    }
+    Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.calOperationCostTimeFromStart(operationStartTime);
     return result;
   }
 
@@ -600,54 +618,19 @@ public abstract class RaftMember implements RaftMemberMBean {
     log.setByteSize(logByteSize);
     Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.calOperationCostTimeFromStart(startTime);
 
-    AppendEntryResult result =
-        getLogAppender()
-            .appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log);
-    result.setHeader(request.getHeader());
+    long appendStartTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
+    AppendEntryResult result = getLogAppender().appendEntry(request, log);
+    Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(appendStartTime);
 
     logger.debug("{} AppendEntryRequest of {} completed with result {}", name, log, result.status);
 
     if (!request.isFromLeader) {
-      appendAckLeader(request.leader, log, result.status);
-      Statistic.RAFT_SEND_RELAY_ACK.add(1);
+      ackSender.offer(request.leader, log.getCurrLogIndex(), log.getCurrLogTerm(), result.status);
     }
 
     return result;
   }
 
-  private void appendAckLeader(Node leader, Log log, long response) {
-    AppendEntryResult result = new AppendEntryResult();
-    result.setLastLogIndex(log.getCurrLogIndex());
-    result.setLastLogTerm(log.getCurrLogTerm());
-    result.status = response;
-    result.setHeader(getHeader());
-
-    Client syncClient = null;
-    try {
-      if (config.isUseAsyncServer()) {
-        GenericHandler<Void> handler = new GenericHandler<>(leader, null);
-        getAsyncClient(leader).acknowledgeAppendEntry(result, handler);
-      } else {
-        syncClient = getSyncClient(leader);
-        syncClient.acknowledgeAppendEntry(result);
-      }
-    } catch (TException e) {
-      logger.warn("Cannot send ack of {} to leader {}", log, leader, e);
-    } finally {
-      if (syncClient != null) {
-        ClientUtils.putBackSyncClient(syncClient);
-      }
-    }
-  }
-
-  public AppendEntryResult appendEntryIndirect(AppendEntryRequest request, List<Node> subFollowers)
-      throws UnknownLogTypeException {
-    AppendEntryResult result = appendEntry(request);
-    request.entry.rewind();
-    logRelay.offer(request, subFollowers);
-    return result;
-  }
-
   public void sendLogToSubFollowers(AppendEntryRequest request, List<Node> subFollowers) {
     request.setIsFromLeader(false);
     request.setSubReceiversIsSet(false);
@@ -658,8 +641,19 @@ public abstract class RaftMember implements RaftMemberMBean {
           getAsyncClient(subFollower)
               .appendEntry(request, new IndirectAppendHandler(subFollower, request));
         } else {
+          long operationStartTime = Statistic.RAFT_SENDER_RELAY_LOG.getOperationStartTime();
           syncClient = getSyncClient(subFollower);
+
+          int concurrentSender = concurrentSenderNum.incrementAndGet();
+          Statistic.RAFT_CONCURRENT_SENDER.add(concurrentSender);
           syncClient.appendEntry(request);
+          concurrentSenderNum.decrementAndGet();
+
+          long sendLogTime =
+              Statistic.RAFT_SENDER_RELAY_LOG.calOperationCostTimeFromStart(operationStartTime);
+          NodeStatus nodeStatus = NodeStatusManager.getINSTANCE().getNodeStatus(subFollower, false);
+          nodeStatus.getSendEntryLatencySum().addAndGet(sendLogTime);
+          nodeStatus.getSendEntryNum().incrementAndGet();
         }
       } catch (TException e) {
         logger.error("Cannot send {} to {}", request, subFollower, e);
@@ -697,12 +691,7 @@ public abstract class RaftMember implements RaftMemberMBean {
   /** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
   public AppendEntryResult appendEntries(AppendEntriesRequest request)
       throws UnknownLogTypeException {
-    AppendEntryResult result = appendEntriesInternal(request);
-    if (request.isSetSubReceivers()) {
-      request.entries.forEach(Buffer::rewind);
-      logRelay.offer(request, request.subReceivers);
-    }
-    return result;
+    return appendEntriesInternal(request);
   }
 
   /** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
@@ -736,9 +725,10 @@ public abstract class RaftMember implements RaftMemberMBean {
 
     Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.calOperationCostTimeFromStart(startTime);
 
-    response =
-        getLogAppender()
-            .appendEntries(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, logs);
+    long appendStartTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
+    response = getLogAppender().appendEntries(request, logs);
+    Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(appendStartTime);
+
     if (logger.isDebugEnabled()) {
       logger.debug(
           "{} AppendEntriesRequest of log size {} completed with result {}",
@@ -750,7 +740,8 @@ public abstract class RaftMember implements RaftMemberMBean {
     if (!request.isFromLeader) {
       // TODO: use batch ack
       for (Log log : logs) {
-        appendAckLeader(request.leader, log, response.status);
+        ackSender.offer(
+            request.leader, log.getCurrLogIndex(), log.getCurrLogTerm(), response.status);
       }
       Statistic.RAFT_SEND_RELAY_ACK.add(1);
     }
@@ -771,12 +762,11 @@ public abstract class RaftMember implements RaftMemberMBean {
       AtomicBoolean leaderShipStale,
       AtomicLong newLeaderTerm,
       AppendEntryRequest request,
-      Peer peer,
       int quorumSize) {
     AsyncClient client = getSendLogAsyncClient(node);
     if (client != null) {
       AppendNodeEntryHandler handler =
-          getAppendNodeEntryHandler(log, node, leaderShipStale, newLeaderTerm, peer, quorumSize);
+          getAppendNodeEntryHandler(log, node, leaderShipStale, newLeaderTerm, quorumSize);
       try {
         client.appendEntry(request, handler);
         logger.debug("{} sending a log to {}: {}", name, node, log);
@@ -1414,8 +1404,8 @@ public abstract class RaftMember implements RaftMemberMBean {
     }
   }
 
-  public Map<Node, Peer> getPeerMap() {
-    return peerMap;
+  public Peer getPeer(Node node) {
+    return peerMap.computeIfAbsent(node, r -> new Peer(getLogManager().getLastLogIndex()));
   }
 
   /** @return true if there is a log whose index is "index" and term is "term", false otherwise */
@@ -1752,9 +1742,10 @@ public abstract class RaftMember implements RaftMemberMBean {
     int totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
     long nextTimeToPrint = 5000;
 
-    long waitStart = System.currentTimeMillis();
+    long waitStart = System.nanoTime();
     long alreadyWait = 0;
 
+    String threadBaseName = Thread.currentThread().getName();
     synchronized (log) {
       while (log.getLog().getCurrLogIndex() == Long.MIN_VALUE
           || stronglyAcceptedNodeNum < quorumSize
@@ -1769,7 +1760,16 @@ public abstract class RaftMember implements RaftMemberMBean {
           Thread.currentThread().interrupt();
           logger.warn("Unexpected interruption when sending a log", e);
         }
-        alreadyWait = System.currentTimeMillis() - waitStart;
+        Thread.currentThread()
+            .setName(
+                threadBaseName
+                    + "-waiting-"
+                    + log.getLog().getCurrLogIndex()
+                    + "-"
+                    + log.getStronglyAcceptedNodeIds()
+                    + "-"
+                    + log.getWeaklyAcceptedNodeIds());
+        alreadyWait = (System.nanoTime() - waitStart) / 1000000;
         if (alreadyWait > nextTimeToPrint) {
           logger.info(
               "Still not receive enough votes for {}, strongly accepted {}, weakly "
@@ -1780,6 +1780,7 @@ public abstract class RaftMember implements RaftMemberMBean {
               log.getStronglyAcceptedNodeIds(),
               log.getWeaklyAcceptedNodeIds(),
               votingLogList.size(),
+              alreadyWait,
               (log.getLog().getSequenceStartTime() - waitStart) / 1000000,
               (log.getLog().getEnqueueTime() - waitStart) / 1000000,
               (log.acceptedTime.get() - waitStart) / 1000000);
@@ -1790,6 +1791,7 @@ public abstract class RaftMember implements RaftMemberMBean {
         totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
       }
     }
+    Thread.currentThread().setName(threadBaseName);
 
     if (alreadyWait > 15000) {
       logger.info(
@@ -1820,6 +1822,7 @@ public abstract class RaftMember implements RaftMemberMBean {
                 || (totalAccepted < quorumSize)
                 || votingLogList.size() > config.getMaxNumOfLogsInMem())
             && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE))) {
+
       waitAppendResultLoop(log, quorumSize);
     }
     stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
@@ -2177,9 +2180,9 @@ public abstract class RaftMember implements RaftMemberMBean {
     }
 
     if (config.isUseAsyncServer()) {
-      sendLogAsync(log, node, leaderShipStale, newLeaderTerm, request, peer, quorumSize);
+      sendLogAsync(log, node, leaderShipStale, newLeaderTerm, request, quorumSize);
     } else {
-      sendLogSync(log, node, leaderShipStale, newLeaderTerm, request, peer, quorumSize);
+      sendLogSync(log, node, leaderShipStale, newLeaderTerm, request, quorumSize);
     }
   }
 
@@ -2217,12 +2220,11 @@ public abstract class RaftMember implements RaftMemberMBean {
       AtomicBoolean leaderShipStale,
       AtomicLong newLeaderTerm,
       AppendEntryRequest request,
-      Peer peer,
       int quorumSize) {
     Client client = getSyncClient(node);
     if (client != null) {
       AppendNodeEntryHandler handler =
-          getAppendNodeEntryHandler(log, node, leaderShipStale, newLeaderTerm, peer, quorumSize);
+          getAppendNodeEntryHandler(log, node, leaderShipStale, newLeaderTerm, quorumSize);
       try {
         logger.debug("{} sending a log to {}: {}", name, node, log);
         long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
@@ -2248,14 +2250,12 @@ public abstract class RaftMember implements RaftMemberMBean {
       Node node,
       AtomicBoolean leaderShipStale,
       AtomicLong newLeaderTerm,
-      Peer peer,
       int quorumSize) {
     AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
-    handler.setReceiver(node);
+    handler.setDirectReceiver(node);
     handler.setLeaderShipStale(leaderShipStale);
     handler.setLog(log);
     handler.setMember(this);
-    handler.setPeer(peer);
     handler.setReceiverTerm(newLeaderTerm);
     handler.setQuorumSize(quorumSize);
     if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
@@ -2283,26 +2283,22 @@ public abstract class RaftMember implements RaftMemberMBean {
   private long checkRequestTerm(long leaderTerm, Node leader) {
     long localTerm;
 
-    synchronized (logManager) {
-      // if the request comes before the heartbeat arrives, the local term may be smaller than the
-      // leader term
-      localTerm = term.get();
-      if (leaderTerm < localTerm) {
-        logger.debug(
-            "{} rejected the AppendEntriesRequest for term: {}/{}", name, leaderTerm, localTerm);
-        return localTerm;
+    // if the request comes before the heartbeat arrives, the local term may be smaller than the
+    // leader term
+    localTerm = term.get();
+    if (leaderTerm < localTerm) {
+      logger.debug(
+          "{} rejected the AppendEntriesRequest for term: {}/{}", name, leaderTerm, localTerm);
+      return localTerm;
+    } else {
+      if (leaderTerm > localTerm) {
+        stepDown(leaderTerm, true);
       } else {
-        if (leaderTerm > localTerm) {
-          stepDown(leaderTerm, true);
-        } else {
-          lastHeartbeatReceivedTime = System.currentTimeMillis();
-        }
-        setLeader(leader);
-        if (character != NodeCharacter.FOLLOWER) {
-          term.notifyAll();
-        }
+        lastHeartbeatReceivedTime = System.currentTimeMillis();
       }
+      setLeader(leader);
     }
+
     logger.debug("{} accepted the AppendEntryRequest for term: {}", name, localTerm);
     return Response.RESPONSE_AGREE;
   }
@@ -2324,17 +2320,19 @@ public abstract class RaftMember implements RaftMemberMBean {
    * @param ack acknowledgement from an indirect receiver.
    */
   public void acknowledgeAppendLog(AppendEntryResult ack) {
+    long operationStartTime = Statistic.RAFT_RECEIVER_HANDLE_APPEND_ACK.getOperationStartTime();
     AppendNodeEntryHandler appendNodeEntryHandler =
         sentLogHandlers.get(new Pair<>(ack.lastLogIndex, ack.lastLogTerm));
+    Statistic.RAFT_RECEIVE_RELAY_ACK.add(1);
     if (appendNodeEntryHandler != null) {
       appendNodeEntryHandler.onComplete(ack);
-      Statistic.RAFT_RECEIVE_RELAY_ACK.add(1);
     }
+    Statistic.RAFT_RECEIVER_HANDLE_APPEND_ACK.calOperationCostTimeFromStart(operationStartTime);
   }
 
   public void registerAppendLogHandler(
       Pair<Long, Long> indexTerm, AppendNodeEntryHandler appendNodeEntryHandler) {
-    sentLogHandlers.put(indexTerm, appendNodeEntryHandler);
+    sentLogHandlers.putIfAbsent(indexTerm, appendNodeEntryHandler);
   }
 
   public void removeAppendLogHandler(Pair<Long, Long> indexTerm) {
@@ -2404,4 +2402,8 @@ public abstract class RaftMember implements RaftMemberMBean {
   public void setClientManager(ClientManager clientManager) {
     this.clientManager = clientManager;
   }
+
+  public LogRelay getLogRelay() {
+    return logRelay;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
index 10538be31e..cc284f0bb4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.server.monitor;
 
+import org.apache.iotdb.cluster.log.LogRelay.RelayEntry;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.server.NodeCharacter;
@@ -78,6 +79,7 @@ public class NodeReport {
     long lastHeartbeatReceivedTime;
     long prevLastLogIndex;
     long maxAppliedLogIndex;
+    RelayEntry nextToRelay;
 
     RaftMemberReport(
         NodeCharacter character,
@@ -90,7 +92,8 @@ public class NodeReport {
         boolean isReadOnly,
         long lastHeartbeatReceivedTime,
         long prevLastLogIndex,
-        long maxAppliedLogIndex) {
+        long maxAppliedLogIndex,
+        RelayEntry nextToRelay) {
       this.character = character;
       this.leader = leader;
       this.term = term;
@@ -102,6 +105,7 @@ public class NodeReport {
       this.lastHeartbeatReceivedTime = lastHeartbeatReceivedTime;
       this.prevLastLogIndex = prevLastLogIndex;
       this.maxAppliedLogIndex = maxAppliedLogIndex;
+      this.nextToRelay = nextToRelay;
     }
   }
 
@@ -119,7 +123,8 @@ public class NodeReport {
         boolean isReadOnly,
         long lastHeartbeatReceivedTime,
         long prevLastLogIndex,
-        long maxAppliedLogIndex) {
+        long maxAppliedLogIndex,
+        RelayEntry nextToRelay) {
       super(
           character,
           leader,
@@ -131,7 +136,8 @@ public class NodeReport {
           isReadOnly,
           lastHeartbeatReceivedTime,
           prevLastLogIndex,
-          maxAppliedLogIndex);
+          maxAppliedLogIndex,
+          nextToRelay);
     }
 
     @Override
@@ -212,7 +218,8 @@ public class NodeReport {
         long headerLatency,
         long lastHeartbeatReceivedTime,
         long prevLastLogIndex,
-        long maxAppliedLogIndex) {
+        long maxAppliedLogIndex,
+        RelayEntry nextToRelay) {
       super(
           character,
           leader,
@@ -224,7 +231,8 @@ public class NodeReport {
           isReadOnly,
           lastHeartbeatReceivedTime,
           prevLastLogIndex,
-          maxAppliedLogIndex);
+          maxAppliedLogIndex,
+          nextToRelay);
       this.header = header;
       this.headerLatency = headerLatency;
     }
@@ -254,6 +262,8 @@ public class NodeReport {
           + maxAppliedLogIndex
           + ", readOnly="
           + isReadOnly
+          + ", nextToRelay="
+          + nextToRelay
           + ", headerLatency="
           + headerLatency
           + "ns"
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
index 4524667b9d..395048eb80 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.server.monitor;
 import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
 
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
 
 /** NodeStatus contains the last-known spec and load of a node in the cluster. */
 @SuppressWarnings("java:S1135")
@@ -53,6 +54,9 @@ public class NodeStatus implements Comparable<NodeStatus> {
   // its lastDeactivatedTime is too old.
   private long lastDeactivatedTime;
 
+  private AtomicLong sendEntryNum = new AtomicLong();
+  private AtomicLong sendEntryLatencySum = new AtomicLong();
+
   // TODO-Cluster: decide what should be contained in NodeStatus and how two compare two NodeStatus
   @Override
   public int compareTo(NodeStatus o) {
@@ -115,4 +119,34 @@ public class NodeStatus implements Comparable<NodeStatus> {
     return isActivated
         || (System.currentTimeMillis() - lastDeactivatedTime) > DEACTIVATION_VALID_INTERVAL_MS;
   }
+
+  public AtomicLong getSendEntryNum() {
+    return sendEntryNum;
+  }
+
+  public AtomicLong getSendEntryLatencySum() {
+    return sendEntryLatencySum;
+  }
+
+  @Override
+  public String toString() {
+    return "NodeStatus{"
+        + "status="
+        + status
+        + ", lastUpdateTime="
+        + lastUpdateTime
+        + ", lastResponseLatency="
+        + lastResponseLatency
+        + ", isActivated="
+        + isActivated
+        + ", lastDeactivatedTime="
+        + lastDeactivatedTime
+        + ", sendEntryNum="
+        + sendEntryNum
+        + ", sendEntryLatencySum="
+        + sendEntryLatencySum
+        + ", sendEntryLatencyAvg="
+        + (sendEntryLatencySum.get() * 1.0 / sendEntryNum.get())
+        + '}';
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
index 6989bf97a0..eb79a84d2a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
@@ -34,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.ConnectException;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -46,7 +47,7 @@ public class NodeStatusManager {
 
   private static final Logger logger = LoggerFactory.getLogger(NodeStatusManager.class);
   // a status is considered stale if it is older than one minute and should be updated
-  private static final long NODE_STATUS_UPDATE_INTERVAL_MS = 60 * 1000L;
+  private static final long NODE_STATUS_UPDATE_INTERVAL_MS = 1 * 10L;
   private static final NodeStatusManager INSTANCE = new NodeStatusManager();
 
   private MetaGroupMember metaGroupMember;
@@ -147,7 +148,7 @@ public class NodeStatusManager {
     } else {
       nodeStatus.setLastResponseLatency(Long.MAX_VALUE);
     }
-    logger.info(
+    logger.debug(
         "NodeStatus of {} is updated, status: {}, response time: {}",
         node,
         nodeStatus.getStatus(),
@@ -180,4 +181,8 @@ public class NodeStatusManager {
   public boolean isActivated(Node node) {
     return getNodeStatus(node, false).isActivated();
   }
+
+  public Map<Node, NodeStatus> getNodeStatusMap() {
+    return Collections.unmodifiableMap(nodeStatusMap);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index 4169935f64..13b8ec7bc3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -146,6 +146,16 @@ public class Timer {
         RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
     RAFT_SENDER_SEND_LOG(
         RAFT_MEMBER_SENDER, "send log", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+    RAFT_SENDER_HANDLE_SEND_RESULT(
+        RAFT_MEMBER_SENDER,
+        "handle send log result",
+        TIME_SCALE,
+        true,
+        RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+    RAFT_SENDER_RELAY_OFFER_LOG(
+        RAFT_MEMBER_SENDER, "relay offer log", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+    RAFT_SENDER_RELAY_LOG(
+        RAFT_MEMBER_SENDER, "relay log", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
     RAFT_SENDER_VOTE_COUNTER(
         RAFT_MEMBER_SENDER,
         "wait for votes",
@@ -237,14 +247,26 @@ public class Timer {
         RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
     RAFT_RECEIVER_APPEND_ENTRY(
         RAFT_MEMBER_RECEIVER, "append entrys", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
-    RAFT_RECEIVER_INDEX_DIFF(RAFT_MEMBER_RECEIVER, "index diff", 1.0, true, ROOT),
-    // log dispatcher
-    LOG_DISPATCHER_FROM_CREATE_TO_ENQUEUE(
-        LOG_DISPATCHER,
-        "from create to queue",
+    RAFT_RECEIVER_APPEND_ACK(
+        RAFT_MEMBER_RECEIVER,
+        "ack append entrys",
         TIME_SCALE,
         true,
-        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+        RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+    RAFT_RECEIVER_APPEND_ENTRY_FULL(
+        RAFT_MEMBER_RECEIVER,
+        "append entrys(full)",
+        TIME_SCALE,
+        true,
+        RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+    RAFT_RECEIVER_HANDLE_APPEND_ACK(
+        RAFT_MEMBER_SENDER,
+        "handle append entrys ack",
+        TIME_SCALE,
+        true,
+        RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+    RAFT_RECEIVER_INDEX_DIFF(RAFT_MEMBER_RECEIVER, "index diff", 1.0, true, ROOT),
+    // log dispatcher
     LOG_DISPATCHER_LOG_ENQUEUE(
         LOG_DISPATCHER,
         "enqueue",
@@ -265,6 +287,24 @@ public class Timer {
         TIME_SCALE,
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_FROM_CREATE_TO_ENQUEUE(
+        LOG_DISPATCHER,
+        "from create to queue",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_FROM_CREATE_TO_DEQUEUE(
+        LOG_DISPATCHER,
+        "from create to dequeue",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_FROM_CREATE_TO_SENDING(
+        LOG_DISPATCHER,
+        "from create to sending",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     LOG_DISPATCHER_FROM_CREATE_TO_SENT(
         LOG_DISPATCHER,
         "from create to sent",
@@ -288,7 +328,9 @@ public class Timer {
     RAFT_SEND_RELAY_ACK(RAFT_MEMBER_RECEIVER, "send relay ack", 1, true, ROOT),
     RAFT_RECEIVE_RELAY_ACK(RAFT_MEMBER_SENDER, "receive relay ack", 1, true, ROOT),
     RAFT_WAIT_AFTER_ACCEPTED(RAFT_MEMBER_SENDER, "wait after accepted", TIME_SCALE, true, ROOT),
-    RAFT_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "weak accept", 1, true, ROOT);
+    RAFT_SENDER_OOW(RAFT_MEMBER_SENDER, "out of window", 1, true, ROOT),
+    RAFT_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "weak accept", 1, true, ROOT),
+    RAFT_CONCURRENT_SENDER(RAFT_MEMBER_SENDER, "concurrent sender", 1, true, ROOT);
 
     String className;
     String blockName;
@@ -335,11 +377,13 @@ public class Timer {
      * This method equals `add(System.nanoTime() - start)`. We wrap `System.nanoTime()` in this
      * method to avoid unnecessary calls when instrumenting is disabled.
      */
-    public void calOperationCostTimeFromStart(long startTime) {
+    public long calOperationCostTimeFromStart(long startTime) {
       if (ENABLE_INSTRUMENTING && startTime != Long.MIN_VALUE && startTime != 0) {
         long consumed = System.nanoTime() - startTime;
         add(consumed);
+        return consumed;
       }
+      return 0;
     }
 
     /** WARN: no current safety guarantee. */
@@ -363,6 +407,10 @@ public class Timer {
       double avg = s / cnt;
       return String.format("%s - %s: %.2f, %d, %.2f, %d", className, blockName, s, cnt, avg, max);
     }
+
+    public long getCnt() {
+      return counter.get();
+    }
   }
 
   public static String getReport() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index 7a6eb2c0c4..486c50959e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
 import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 
@@ -160,7 +161,9 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
    */
   @Override
   public TNodeStatus queryNodeStatus() {
-    return new TNodeStatus();
+    return new TNodeStatus()
+        .setFanoutRequestNum(
+            Statistic.RAFT_SENDER_SEND_LOG.getCnt() + Statistic.RAFT_SEND_RELAY.getCnt());
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/WeightedList.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/WeightedList.java
new file mode 100644
index 0000000000..b2b94aa2e0
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/WeightedList.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.utils;
+
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+public class WeightedList<T> {
+  private List<Pair<T, Double>> elements = new ArrayList<>();
+  private Random random = new Random();
+  private double[] probabilities;
+  private double weightSum = 0.0;
+
+  public void insert(T t, Double weight) {
+    elements.add(new Pair<>(t, weight));
+    weightSum += weight;
+  }
+
+  public List<T> select(int num) {
+    List<T> rst = new ArrayList<>(num);
+    if (num >= elements.size()) {
+      for (Pair<T, Double> element : elements) {
+        rst.add(element.left);
+      }
+      elements.clear();
+      weightSum = 0.0;
+    } else {
+      for (int i = 0; i < num; i++) {
+        rst.add(select());
+      }
+    }
+
+    return rst;
+  }
+
+  public T select() {
+    if (elements.isEmpty()) {
+      return null;
+    }
+
+    if (probabilities == null || probabilities.length < elements.size()) {
+      probabilities = new double[elements.size()];
+    }
+
+    probabilities[0] = elements.get(0).right / weightSum;
+    for (int i = 1; i < elements.size(); i++) {
+      probabilities[i] = elements.get(i).right / weightSum + probabilities[i - 1];
+    }
+
+    double p = random.nextDouble();
+    int selectedIndex = elements.size() - 1;
+    for (int i = 0; i < elements.size() - 1; i++) {
+      if (p <= probabilities[i]) {
+        selectedIndex = i;
+        break;
+      }
+    }
+
+    Pair<T, Double> rst = elements.remove(selectedIndex);
+    weightSum -= rst.right;
+    return rst.left;
+  }
+
+  public int size() {
+    return elements.size();
+  }
+}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
index 77111cb0f3..3c82a4241a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
@@ -78,8 +78,7 @@ public class AppendNodeEntryHandlerTest {
         handler.setLog(votingLog);
         handler.setMember(member);
         handler.setReceiverTerm(receiverTerm);
-        handler.setReceiver(TestUtils.getNode(i));
-        handler.setPeer(peer);
+        handler.setDirectReceiver(TestUtils.getNode(i));
         handler.setQuorumSize(ClusterDescriptor.getInstance().getConfig().getReplicationNum() / 2);
         long resp = i >= 5 ? Response.RESPONSE_AGREE : Response.RESPONSE_LOG_MISMATCH;
         AppendEntryResult result = new AppendEntryResult();
@@ -114,8 +113,7 @@ public class AppendNodeEntryHandlerTest {
       handler.setLog(votingLog);
       handler.setMember(member);
       handler.setReceiverTerm(receiverTerm);
-      handler.setReceiver(TestUtils.getNode(i));
-      handler.setPeer(peer);
+      handler.setDirectReceiver(TestUtils.getNode(i));
       handler.setQuorumSize(ClusterDescriptor.getInstance().getConfig().getReplicationNum() / 2);
       AppendEntryResult result = new AppendEntryResult();
       result.setStatus(Response.RESPONSE_AGREE);
@@ -141,8 +139,7 @@ public class AppendNodeEntryHandlerTest {
       handler.setLog(votingLog);
       handler.setMember(member);
       handler.setReceiverTerm(receiverTerm);
-      handler.setReceiver(TestUtils.getNode(0));
-      handler.setPeer(peer);
+      handler.setDirectReceiver(TestUtils.getNode(0));
       handler.setQuorumSize(ClusterDescriptor.getInstance().getConfig().getReplicationNum() / 2);
       new Thread(() -> handler.onComplete(new AppendEntryResult(100L))).start();
       votingLog.wait();
@@ -168,8 +165,7 @@ public class AppendNodeEntryHandlerTest {
       handler.setLog(votingLog);
       handler.setMember(member);
       handler.setReceiverTerm(receiverTerm);
-      handler.setReceiver(TestUtils.getNode(0));
-      handler.setPeer(peer);
+      handler.setDirectReceiver(TestUtils.getNode(0));
       handler.setQuorumSize(ClusterDescriptor.getInstance().getConfig().getReplicationNum() / 2);
       handler.onError(new TestException());
 
diff --git a/thrift-cluster/src/main/thrift/cluster.thrift b/thrift-cluster/src/main/thrift/cluster.thrift
index ea8edf73d9..f627c13621 100644
--- a/thrift-cluster/src/main/thrift/cluster.thrift
+++ b/thrift-cluster/src/main/thrift/cluster.thrift
@@ -235,7 +235,7 @@ struct PreviousFillRequest {
 
 // the spec and load of a node, for query coordinating
 struct TNodeStatus {
-
+  1: required long fanoutRequestNum
 }
 
 struct GetAggrResultRequest {
@@ -283,6 +283,7 @@ struct AppendEntryResult {
   2: optional i64 lastLogTerm;
   3: optional i64 lastLogIndex;
   4: optional RaftNode header;
+  5: optional Node receiver;
 }
 
 


[iotdb] 03/03: add relay_first_level_size config

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_plus
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1da27c431b059cc1c6184075436a13334e381582
Author: jt <jt...@163.com>
AuthorDate: Thu Apr 7 09:15:49 2022 +0800

    add relay_first_level_size config
---
 .../org/apache/iotdb/cluster/ClusterIoTDB.java     |  14 +--
 .../apache/iotdb/cluster/config/ClusterConfig.java |  10 ++
 .../iotdb/cluster/config/ClusterDescriptor.java    |   5 +
 .../iotdb/cluster/log/IndirectLogDispatcher.java   | 109 ++++++++++++---------
 .../org/apache/iotdb/cluster/log/LogAckSender.java |   8 +-
 .../apache/iotdb/cluster/log/LogDispatcher.java    |   6 +-
 .../org/apache/iotdb/cluster/log/LogRelay.java     |  81 ++++++++++++++-
 .../cluster/server/heartbeat/HeartbeatThread.java  |   2 +-
 .../cluster/server/member/DataGroupMember.java     |  37 ++++---
 .../iotdb/cluster/server/member/RaftMember.java    |  90 +++--------------
 .../iotdb/cluster/server/monitor/NodeReport.java   |  79 ++++++++-------
 .../iotdb/cluster/server/monitor/NodeStatus.java   |  20 ++++
 .../cluster/server/monitor/NodeStatusManager.java  |   9 +-
 .../apache/iotdb/cluster/server/monitor/Timer.java |  28 +++++-
 .../cluster/server/service/MetaSyncService.java    |   4 +-
 .../iotdb/cluster/utils/WindowStatistic.java       |  55 +++++++++++
 16 files changed, 364 insertions(+), 193 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index a626b88425..f399c78b2b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -45,9 +45,8 @@ import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.server.monitor.NodeReport;
-import org.apache.iotdb.cluster.server.monitor.NodeStatus;
 import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
-import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.raft.DataRaftHeartBeatService;
 import org.apache.iotdb.cluster.server.raft.DataRaftService;
 import org.apache.iotdb.cluster.server.raft.MetaRaftHeartBeatService;
@@ -82,7 +81,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.HashSet;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -216,6 +214,7 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
         report.setMetaMemberReport(metaGroupMember.genMemberReport());
         report.setDataMemberReportList(dataGroupEngine.genMemberReports());
         logger.info(report.toString());
+        NodeStatusManager.getINSTANCE().report();
       } catch (Exception e) {
         logger.error("exception occurred when generating node report", e);
       }
@@ -296,13 +295,8 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
 
     @Override
     public void run() {
-      logger.info(
-          "Total request fanout: {}",
-          Statistic.RAFT_RECEIVER_RELAY_LOG.getCnt() + Statistic.RAFT_SENDER_SEND_LOG.getCnt());
-      for (Entry<Node, NodeStatus> nodeNodeStatusEntry :
-          NodeStatusManager.getINSTANCE().getNodeStatusMap().entrySet()) {
-        logger.info("{}: {}", nodeNodeStatusEntry.getKey(), nodeNodeStatusEntry.getValue());
-      }
+      logger.info(Timer.getReport());
+      NodeStatusManager.getINSTANCE().report();
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 187509b2ef..0f265dee76 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -192,6 +192,8 @@ public class ClusterConfig {
 
   private int relaySenderNum = 8;
 
+  private int relayFirstLevelSize = 1;
+
   private boolean optimizeIndirectBroadcasting = false;
 
   /**
@@ -600,4 +602,12 @@ public class ClusterConfig {
   public void setOptimizeIndirectBroadcasting(boolean optimizeIndirectBroadcasting) {
     this.optimizeIndirectBroadcasting = optimizeIndirectBroadcasting;
   }
+
+  public int getRelayFirstLevelSize() {
+    return relayFirstLevelSize;
+  }
+
+  public void setRelayFirstLevelSize(int relayFirstLevelSize) {
+    this.relayFirstLevelSize = relayFirstLevelSize;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index a089e2d75a..8e9f8934bc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -356,6 +356,11 @@ public class ClusterDescriptor {
             properties.getProperty(
                 "relay_sender_number", String.valueOf(config.getRelaySenderNum()))));
 
+    config.setRelayFirstLevelSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "relay_first_level_size", String.valueOf(config.getRelayFirstLevelSize()))));
+
     String consistencyLevel = properties.getProperty("consistency_level");
     if (consistencyLevel != null) {
       config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index d32fb405e3..ecbd1dbabe 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.NodeStatus;
 import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
@@ -38,6 +39,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.iotdb.cluster.server.monitor.Timer.Statistic.RAFT_RELAYED_LEVEL1_NUM;
 
 /**
  * IndirectLogDispatcher sends entries only to a pre-selected subset of followers instead of all
@@ -47,7 +51,9 @@ public class IndirectLogDispatcher extends LogDispatcher {
 
   private static final Logger logger = LoggerFactory.getLogger(IndirectLogDispatcher.class);
 
-  private Map<Node, List<Node>> directToIndirectFollowerMap = new HashMap<>();
+  private Map<Node, List<Node>> directToIndirectFollowerMap = new ConcurrentHashMap<>();
+  private long dispatchedEntryNum;
+  private int recalculateMapInterval = 1;
 
   public IndirectLogDispatcher(RaftMember member) {
     super(member);
@@ -74,7 +80,10 @@ public class IndirectLogDispatcher extends LogDispatcher {
   @Override
   public void offer(SendLogRequest request) {
     super.offer(request);
-    recalculateDirectFollowerMap();
+    dispatchedEntryNum++;
+    if (dispatchedEntryNum % recalculateMapInterval == 0) {
+      recalculateDirectFollowerMap();
+    }
   }
 
   @Override
@@ -87,6 +96,15 @@ public class IndirectLogDispatcher extends LogDispatcher {
     return newRequest;
   }
 
+  private double getNodeWeight(Node node, double maxLatency) {
+    NodeStatus status = NodeStatusManager.getINSTANCE().getNodeStatus(node, false);
+    //    return 1.0
+    //        / (status.getStatus().fanoutRequestNum + 1);
+    double pow = Math.pow(100.0, maxLatency / status.getSendEntryLatencyStatistic().getAvg());
+    status.setRelayWeight(pow);
+    return pow;
+  }
+
   public void recalculateDirectFollowerMap() {
     List<Node> allNodes = new ArrayList<>(member.getAllNodes());
     allNodes.removeIf(n -> ClusterUtils.isNodeEquals(n, member.getThisNode()));
@@ -96,28 +114,31 @@ public class IndirectLogDispatcher extends LogDispatcher {
     nodesEnabled.clear();
     directToIndirectFollowerMap.clear();
 
+    int firstLevelSize = ClusterDescriptor.getInstance().getConfig().getRelayFirstLevelSize();
+    List<Node> firstLevelNodes;
+
     if (ClusterDescriptor.getInstance().getConfig().isOptimizeIndirectBroadcasting()) {
       QueryCoordinator instance = QueryCoordinator.getINSTANCE();
       orderedNodes = instance.reorderNodes(allNodes);
-      long thisLoad =
-          Statistic.RAFT_SENDER_SEND_LOG.getCnt() + Statistic.RAFT_RECEIVER_RELAY_LOG.getCnt() + 1;
-      long minLoad =
+      long thisLoad = Statistic.getTotalFanout() + 1;
+      double maxLatency =
           NodeStatusManager.getINSTANCE()
-                  .getNodeStatus(orderedNodes.get(0), false)
-                  .getStatus()
-                  .fanoutRequestNum
-              + 1;
-      double loadFactor = 1.05;
+              .getNodeStatus(orderedNodes.get(0), false)
+              .getSendEntryLatencyStatistic()
+              .getAvg();
+      for (int i = 1, orderedNodesSize = orderedNodes.size(); i < orderedNodesSize; i++) {
+        maxLatency =
+            Double.max(
+                maxLatency,
+                NodeStatusManager.getINSTANCE()
+                    .getNodeStatus(orderedNodes.get(i), false)
+                    .getSendEntryLatencyStatistic()
+                    .getAvg());
+      }
+
       WeightedList<Node> firstLevelCandidates = new WeightedList<>();
       firstLevelCandidates.insert(
-          orderedNodes.get(0),
-          1.0
-              / (NodeStatusManager.getINSTANCE()
-                      .getNodeStatus(orderedNodes.get(0), false)
-                      .getStatus()
-                      .fanoutRequestNum
-                  + 1));
-      int firstLevelSize = 1;
+          orderedNodes.get(0), getNodeWeight(orderedNodes.get(0), maxLatency));
 
       for (int i = 1, orderedNodesSize = orderedNodes.size(); i < orderedNodesSize; i++) {
         Node orderedNode = orderedNodes.get(i);
@@ -127,9 +148,7 @@ public class IndirectLogDispatcher extends LogDispatcher {
                     .getStatus()
                     .fanoutRequestNum
                 + 1;
-        if (nodeLoad * 1.0 <= minLoad * loadFactor) {
-          firstLevelCandidates.insert(orderedNode, 1.0 / nodeLoad);
-        }
+        firstLevelCandidates.insert(orderedNode, getNodeWeight(orderedNode, maxLatency));
         if (nodeLoad > thisLoad) {
           firstLevelSize = (int) Math.max(firstLevelSize, nodeLoad / thisLoad);
         }
@@ -139,36 +158,34 @@ public class IndirectLogDispatcher extends LogDispatcher {
         firstLevelSize = firstLevelCandidates.size();
       }
 
-      List<Node> firstLevelNodes = firstLevelCandidates.select(firstLevelSize);
-
-      Map<Node, List<Node>> secondLevelNodeMap = new HashMap<>();
-      orderedNodes.removeAll(firstLevelNodes);
-      for (int i = 0; i < orderedNodes.size(); i++) {
-        Node firstLevelNode = firstLevelNodes.get(i % firstLevelSize);
-        secondLevelNodeMap
-            .computeIfAbsent(firstLevelNode, n -> new ArrayList<>())
-            .add(orderedNodes.get(i));
+      firstLevelNodes = firstLevelCandidates.select(firstLevelSize);
+    } else {
+      firstLevelNodes = new ArrayList<>(orderedNodes.subList(0, firstLevelSize));
+      if (firstLevelSize > orderedNodes.size()) {
+        firstLevelSize = orderedNodes.size();
       }
+    }
 
-      for (Node firstLevelNode : firstLevelNodes) {
-        directToIndirectFollowerMap.put(
-            firstLevelNode,
-            secondLevelNodeMap.getOrDefault(firstLevelNode, Collections.emptyList()));
-        nodesEnabled.put(firstLevelNode, true);
-      }
+    Map<Node, List<Node>> secondLevelNodeMap = new HashMap<>();
+    orderedNodes.removeAll(firstLevelNodes);
+    for (int i = 0; i < orderedNodes.size(); i++) {
+      Node firstLevelNode = firstLevelNodes.get(i % firstLevelSize);
+      secondLevelNodeMap
+          .computeIfAbsent(firstLevelNode, n -> new ArrayList<>())
+          .add(orderedNodes.get(i));
+    }
 
-    } else {
-      for (int i = 0, j = orderedNodes.size() - 1; i <= j; i++, j--) {
-        if (i != j) {
-          directToIndirectFollowerMap.put(
-              orderedNodes.get(i), Collections.singletonList(orderedNodes.get(j)));
-        } else {
-          directToIndirectFollowerMap.put(orderedNodes.get(i), Collections.emptyList());
-        }
-        nodesEnabled.put(orderedNodes.get(i), true);
-      }
+    for (Node firstLevelNode : firstLevelNodes) {
+      directToIndirectFollowerMap.put(
+          firstLevelNode, secondLevelNodeMap.getOrDefault(firstLevelNode, Collections.emptyList()));
+      nodesEnabled.put(firstLevelNode, true);
     }
 
+    RAFT_RELAYED_LEVEL1_NUM.add(directToIndirectFollowerMap.size());
     logger.debug("New relay map: {}", directToIndirectFollowerMap);
   }
+
+  public Map<Node, List<Node>> getDirectToIndirectFollowerMap() {
+    return Collections.unmodifiableMap(directToIndirectFollowerMap);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogAckSender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogAckSender.java
index 3441a01432..3a15dca6d4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogAckSender.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogAckSender.java
@@ -57,8 +57,10 @@ public class LogAckSender {
   public LogAckSender(RaftMember member) {
     this.member = member;
     this.header = member.getHeader();
-    ackSenderPool = IoTDBThreadPoolFactory.newSingleThreadExecutor(member.getName() + "-ACKSender");
-    ackSenderPool.submit(this::appendAckLeaderTask);
+    ackSenderPool = IoTDBThreadPoolFactory.newFixedThreadPool(4, member.getName() + "-ACKSender");
+    for (int i = 0; i < 4; i++) {
+      ackSenderPool.submit(this::appendAckLeaderTask);
+    }
   }
 
   public static class AckRequest {
@@ -93,7 +95,7 @@ public class LogAckSender {
         }
 
         appendAckLeader(ackRequestList);
-        Thread.sleep(10);
+        // Thread.sleep(10);
       }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 3e659a1468..f1c50952bc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -193,7 +193,6 @@ public class LogDispatcher {
 
   public static class SendLogRequest {
 
-    private AppendNodeEntryHandler handler;
     private VotingLog votingLog;
     private AtomicBoolean leaderShipStale;
     private AtomicLong newLeaderTerm;
@@ -469,6 +468,10 @@ public class LogDispatcher {
               logRequest.newLeaderTerm,
               logRequest.quorumSize);
       // TODO add async interface
+      if (syncClient == null) {
+        syncClient = member.getSyncClient(receiver);
+      }
+
       int retries = 5;
       try {
         long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
@@ -486,6 +489,7 @@ public class LogDispatcher {
             NodeStatus nodeStatus = NodeStatusManager.getINSTANCE().getNodeStatus(receiver, false);
             nodeStatus.getSendEntryLatencySum().addAndGet(sendLogTime);
             nodeStatus.getSendEntryNum().incrementAndGet();
+            nodeStatus.getSendEntryLatencyStatistic().add(sendLogTime);
 
             long handleStart = Statistic.RAFT_SENDER_HANDLE_SEND_RESULT.getOperationStartTime();
             handler.onComplete(result);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
index 842f2eb3ca..350e3fed0b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
@@ -23,10 +23,16 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.server.handlers.forwarder.IndirectAppendHandler;
 import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.NodeStatus;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,6 +42,8 @@ import java.util.Objects;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
 
+import static org.apache.iotdb.cluster.log.LogDispatcher.concurrentSenderNum;
+
 /** LogRelay is used by followers to forward entries from the leader to other followers. */
 public class LogRelay {
 
@@ -51,8 +59,8 @@ public class LogRelay {
     this.raftMember = raftMember;
     relaySenders =
         IoTDBThreadPoolFactory.newFixedThreadPool(
-            RELAY_NUMBER, raftMember.getName() + "-RelaySender");
-    for (int i = 0; i < RELAY_NUMBER; i++) {
+            RELAY_NUMBER + 4, raftMember.getName() + "-RelaySender");
+    for (int i = 0; i < 4; i++) {
       relaySenders.submit(new RelayThread());
     }
   }
@@ -116,7 +124,7 @@ public class LogRelay {
                       + (relayEntry.singleRequest.prevLogIndex + 1)
                       + "-"
                       + relayEntry.receivers);
-          raftMember.sendLogToSubFollowers(relayEntry.singleRequest, relayEntry.receivers);
+          sendLogToSubFollowers(relayEntry.singleRequest, relayEntry.receivers);
         } else if (relayEntry.batchRequest != null) {
           Thread.currentThread()
               .setName(
@@ -125,7 +133,7 @@ public class LogRelay {
                       + (relayEntry.batchRequest.prevLogIndex + 1)
                       + "-"
                       + relayEntry.receivers);
-          raftMember.sendLogsToSubFollowers(relayEntry.batchRequest, relayEntry.receivers);
+          sendLogsToSubFollowers(relayEntry.batchRequest, relayEntry.receivers);
         }
 
         Statistic.RAFT_RELAYED_ENTRY.add(1);
@@ -133,6 +141,71 @@ public class LogRelay {
     }
   }
 
+  public void sendLogToSubFollowers(AppendEntryRequest request, List<Node> subFollowers) {
+    request.setIsFromLeader(false);
+    request.setSubReceiversIsSet(false);
+    for (Node subFollower : subFollowers) {
+      relaySenders.submit(
+          () -> {
+            Client syncClient = null;
+            try {
+              if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+                raftMember
+                    .getAsyncClient(subFollower)
+                    .appendEntry(request, new IndirectAppendHandler(subFollower, request));
+              } else {
+                long operationStartTime = Statistic.RAFT_RECEIVER_RELAY_LOG.getOperationStartTime();
+                syncClient = raftMember.getSyncClient(subFollower);
+
+                int concurrentSender = concurrentSenderNum.incrementAndGet();
+                Statistic.RAFT_CONCURRENT_SENDER.add(concurrentSender);
+                syncClient.appendEntry(request);
+                concurrentSenderNum.decrementAndGet();
+
+                long sendLogTime =
+                    Statistic.RAFT_RECEIVER_RELAY_LOG.calOperationCostTimeFromStart(
+                        operationStartTime);
+                NodeStatus nodeStatus =
+                    NodeStatusManager.getINSTANCE().getNodeStatus(subFollower, false);
+                nodeStatus.getSendEntryLatencySum().addAndGet(sendLogTime);
+                nodeStatus.getSendEntryNum().incrementAndGet();
+                nodeStatus.getSendEntryLatencyStatistic().add(sendLogTime);
+              }
+            } catch (TException e) {
+              logger.error("Cannot send {} to {}", request, subFollower, e);
+            } finally {
+              if (syncClient != null) {
+                ClientUtils.putBackSyncClient(syncClient);
+              }
+            }
+          });
+    }
+  }
+
+  public void sendLogsToSubFollowers(AppendEntriesRequest request, List<Node> subFollowers) {
+    request.setIsFromLeader(false);
+    request.setSubReceiversIsSet(false);
+    for (Node subFollower : subFollowers) {
+      Client syncClient = null;
+      try {
+        if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+          raftMember
+              .getAsyncClient(subFollower)
+              .appendEntries(request, new IndirectAppendHandler(subFollower, request));
+        } else {
+          syncClient = raftMember.getSyncClient(subFollower);
+          syncClient.appendEntries(request);
+        }
+      } catch (TException e) {
+        logger.error("Cannot send {} to {}", request, subFollower, e);
+      } finally {
+        if (syncClient != null) {
+          ClientUtils.putBackSyncClient(syncClient);
+        }
+      }
+    }
+  }
+
   public static class RelayEntry implements Comparable<RelayEntry> {
 
     private AppendEntryRequest singleRequest;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
index acc34cf6c0..266f767b96 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
@@ -268,7 +268,7 @@ public class HeartbeatThread implements Runnable {
 
     if (!ClusterUtils.isNodeEquals(
         localMember.getThisNode(), localMember.getPartitionGroup().getHeader().node)) {
-      long electionWait = getElectionRandomWaitMs();
+      long electionWait = getElectionRandomWaitMs() + 5000;
       logger.info(
           "{}: Sleep {}ms before the first election as this node is not the preferred " + "leader",
           memberName,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 3114182960..11f0cf0a4a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.exception.SnapshotInstallationException;
 import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
+import org.apache.iotdb.cluster.log.IndirectLogDispatcher;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.LogParser;
@@ -993,21 +994,27 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
   public DataMemberReport genReport() {
     long prevLastLogIndex = lastReportedLogIndex;
     lastReportedLogIndex = logManager.getLastLogIndex();
-    return new DataMemberReport(
-        character,
-        leader.get(),
-        term.get(),
-        logManager.getLastLogTerm(),
-        lastReportedLogIndex,
-        logManager.getCommitLogIndex(),
-        logManager.getCommitLogTerm(),
-        getHeader(),
-        readOnly,
-        NodeStatusManager.getINSTANCE().getLastResponseLatency(getHeader().getNode()),
-        lastHeartbeatReceivedTime,
-        prevLastLogIndex,
-        logManager.getMaxHaveAppliedCommitIndex(),
-        logRelay != null ? logRelay.first() : null);
+    DataMemberReport dataMemberReport =
+        new DataMemberReport(
+            character,
+            leader.get(),
+            term.get(),
+            logManager.getLastLogTerm(),
+            lastReportedLogIndex,
+            logManager.getCommitLogIndex(),
+            logManager.getCommitLogTerm(),
+            getHeader(),
+            readOnly,
+            NodeStatusManager.getINSTANCE().getLastResponseLatency(getHeader().getNode()),
+            lastHeartbeatReceivedTime,
+            prevLastLogIndex,
+            logManager.getMaxHaveAppliedCommitIndex(),
+            logRelay != null ? logRelay.first() : null);
+    if (character == NodeCharacter.LEADER && config.isUseIndirectBroadcasting()) {
+      dataMemberReport.setDirectToIndirectFollowerMap(
+          ((IndirectLogDispatcher) getLogDispatcher()).getDirectToIndirectFollowerMap());
+    }
+    return dataMemberReport;
   }
 
   @TestOnly
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 0d605d3f46..f554a69048 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -72,8 +72,6 @@ import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
-import org.apache.iotdb.cluster.server.handlers.forwarder.IndirectAppendHandler;
-import org.apache.iotdb.cluster.server.monitor.NodeStatus;
 import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.monitor.Timer;
@@ -133,7 +131,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
-import static org.apache.iotdb.cluster.log.LogDispatcher.concurrentSenderNum;
 
 /**
  * RaftMember process the common raft logic like leader election, log appending, catch-up and so on.
@@ -631,63 +628,6 @@ public abstract class RaftMember implements RaftMemberMBean {
     return result;
   }
 
-  public void sendLogToSubFollowers(AppendEntryRequest request, List<Node> subFollowers) {
-    request.setIsFromLeader(false);
-    request.setSubReceiversIsSet(false);
-    for (Node subFollower : subFollowers) {
-      Client syncClient = null;
-      try {
-        if (config.isUseAsyncServer()) {
-          getAsyncClient(subFollower)
-              .appendEntry(request, new IndirectAppendHandler(subFollower, request));
-        } else {
-          long operationStartTime = Statistic.RAFT_RECEIVER_RELAY_LOG.getOperationStartTime();
-          syncClient = getSyncClient(subFollower);
-
-          int concurrentSender = concurrentSenderNum.incrementAndGet();
-          Statistic.RAFT_CONCURRENT_SENDER.add(concurrentSender);
-          syncClient.appendEntry(request);
-          concurrentSenderNum.decrementAndGet();
-
-          long sendLogTime =
-              Statistic.RAFT_RECEIVER_RELAY_LOG.calOperationCostTimeFromStart(operationStartTime);
-          NodeStatus nodeStatus = NodeStatusManager.getINSTANCE().getNodeStatus(subFollower, false);
-          nodeStatus.getSendEntryLatencySum().addAndGet(sendLogTime);
-          nodeStatus.getSendEntryNum().incrementAndGet();
-        }
-      } catch (TException e) {
-        logger.error("Cannot send {} to {}", request, subFollower, e);
-      } finally {
-        if (syncClient != null) {
-          ClientUtils.putBackSyncClient(syncClient);
-        }
-      }
-    }
-  }
-
-  public void sendLogsToSubFollowers(AppendEntriesRequest request, List<Node> subFollowers) {
-    request.setIsFromLeader(false);
-    request.setSubReceiversIsSet(false);
-    for (Node subFollower : subFollowers) {
-      Client syncClient = null;
-      try {
-        if (config.isUseAsyncServer()) {
-          getAsyncClient(subFollower)
-              .appendEntries(request, new IndirectAppendHandler(subFollower, request));
-        } else {
-          syncClient = getSyncClient(subFollower);
-          syncClient.appendEntries(request);
-        }
-      } catch (TException e) {
-        logger.error("Cannot send {} to {}", request, subFollower, e);
-      } finally {
-        if (syncClient != null) {
-          ClientUtils.putBackSyncClient(syncClient);
-        }
-      }
-    }
-  }
-
   /** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
   public AppendEntryResult appendEntries(AppendEntriesRequest request)
       throws UnknownLogTypeException {
@@ -1860,20 +1800,22 @@ public abstract class RaftMember implements RaftMemberMBean {
   @SuppressWarnings("java:S2445")
   protected void commitLog(Log log) throws LogExecutionException {
     long startTime;
-    if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
-      startTime = Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.getOperationStartTime();
-      synchronized (logManager) {
-        Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
-            startTime);
-        if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
-          startTime = Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
-          logManager.commitTo(log.getCurrLogIndex());
-          Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(startTime);
-        }
-        startTime = Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.getOperationStartTime();
-      }
-      Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.calOperationCostTimeFromStart(startTime);
-    }
+    //    if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
+    //      startTime =
+    // Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.getOperationStartTime();
+    //      synchronized (logManager) {
+    //        Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
+    //            startTime);
+    //        if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
+    //          startTime = Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
+    //          logManager.commitTo(log.getCurrLogIndex());
+    //
+    // Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(startTime);
+    //        }
+    //        startTime = Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.getOperationStartTime();
+    //      }
+    //      Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.calOperationCostTimeFromStart(startTime);
+    //    }
 
     // when using async applier, the log here may not be applied. To return the execution
     // result, we must wait until the log is applied.
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
index cc284f0bb4..2734b19b7b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.rpc.RpcTransportFactory;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * A node report collects the current runtime information of the local node, which contains: 1. The
@@ -204,6 +205,7 @@ public class NodeReport {
   public static class DataMemberReport extends RaftMemberReport {
     RaftNode header;
     long headerLatency;
+    private Map<Node, List<Node>> directToIndirectFollowerMap;
 
     public DataMemberReport(
         NodeCharacter character,
@@ -239,40 +241,49 @@ public class NodeReport {
 
     @Override
     public String toString() {
-      return "DataMemberReport{"
-          + "header="
-          + header.getNode()
-          + ", raftId="
-          + header.getRaftId()
-          + ", character="
-          + character
-          + ", Leader="
-          + leader
-          + ", term="
-          + term
-          + ", lastLogTerm="
-          + lastLogTerm
-          + ", lastLogIndex="
-          + lastLogIndex
-          + ", commitIndex="
-          + commitIndex
-          + ", commitTerm="
-          + commitTerm
-          + ", appliedLogIndex="
-          + maxAppliedLogIndex
-          + ", readOnly="
-          + isReadOnly
-          + ", nextToRelay="
-          + nextToRelay
-          + ", headerLatency="
-          + headerLatency
-          + "ns"
-          + ", lastHeartbeat="
-          + (System.currentTimeMillis() - lastHeartbeatReceivedTime)
-          + "ms ago"
-          + ", logIncrement="
-          + (lastLogIndex - prevLastLogIndex)
-          + '}';
+      String s =
+          "DataMemberReport{"
+              + "header="
+              + header.getNode()
+              + ", raftId="
+              + header.getRaftId()
+              + ", character="
+              + character
+              + ", Leader="
+              + leader
+              + ", term="
+              + term
+              + ", lastLogTerm="
+              + lastLogTerm
+              + ", lastLogIndex="
+              + lastLogIndex
+              + ", commitIndex="
+              + commitIndex
+              + ", commitTerm="
+              + commitTerm
+              + ", appliedLogIndex="
+              + maxAppliedLogIndex
+              + ", readOnly="
+              + isReadOnly
+              + ", nextToRelay="
+              + nextToRelay
+              + ", headerLatency="
+              + headerLatency
+              + "ns"
+              + ", lastHeartbeat="
+              + (System.currentTimeMillis() - lastHeartbeatReceivedTime)
+              + "ms ago"
+              + ", logIncrement="
+              + (lastLogIndex - prevLastLogIndex);
+      if (directToIndirectFollowerMap != null) {
+        s = s + ", relayMap=" + directToIndirectFollowerMap;
+      }
+      s = s + '}';
+      return s;
+    }
+
+    public void setDirectToIndirectFollowerMap(Map<Node, List<Node>> directToIndirectFollowerMap) {
+      this.directToIndirectFollowerMap = directToIndirectFollowerMap;
     }
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
index 6f4039456b..e1456e307b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.cluster.server.monitor;
 
 import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
+import org.apache.iotdb.cluster.utils.WindowStatistic;
 
 import java.util.Date;
 import java.util.Objects;
@@ -57,6 +58,9 @@ public class NodeStatus implements Comparable<NodeStatus> {
 
   private AtomicLong sendEntryNum = new AtomicLong();
   private AtomicLong sendEntryLatencySum = new AtomicLong();
+  private WindowStatistic sendEntryLatencyStatistic = new WindowStatistic();
+
+  private double relayWeight;
 
   // TODO-Cluster: decide what should be contained in NodeStatus and how two compare two NodeStatus
   @Override
@@ -107,6 +111,14 @@ public class NodeStatus implements Comparable<NodeStatus> {
     this.lastResponseLatency = lastResponseLatency;
   }
 
+  public double getRelayWeight() {
+    return relayWeight;
+  }
+
+  public void setRelayWeight(double relayWeight) {
+    this.relayWeight = relayWeight;
+  }
+
   public void activate() {
     isActivated = true;
   }
@@ -129,6 +141,10 @@ public class NodeStatus implements Comparable<NodeStatus> {
     return sendEntryLatencySum;
   }
 
+  public WindowStatistic getSendEntryLatencyStatistic() {
+    return sendEntryLatencyStatistic;
+  }
+
   @Override
   public String toString() {
     return "NodeStatus{"
@@ -148,6 +164,10 @@ public class NodeStatus implements Comparable<NodeStatus> {
         + sendEntryLatencySum
         + ", sendEntryLatencyAvg="
         + (sendEntryLatencySum.get() * 1.0 / sendEntryNum.get())
+        + ", latestSendEntryLatencyAvg="
+        + sendEntryLatencyStatistic.getAvg()
+        + ", nodeRelayWeight="
+        + relayWeight
         + '}';
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
index eb79a84d2a..4edcbb8d57 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import java.net.ConnectException;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -47,7 +48,7 @@ public class NodeStatusManager {
 
   private static final Logger logger = LoggerFactory.getLogger(NodeStatusManager.class);
   // a status is considered stale if it is older than one minute and should be updated
-  private static final long NODE_STATUS_UPDATE_INTERVAL_MS = 1 * 10L;
+  private static final long NODE_STATUS_UPDATE_INTERVAL_MS = 1 * 1000L;
   private static final NodeStatusManager INSTANCE = new NodeStatusManager();
 
   private MetaGroupMember metaGroupMember;
@@ -185,4 +186,10 @@ public class NodeStatusManager {
   public Map<Node, NodeStatus> getNodeStatusMap() {
     return Collections.unmodifiableMap(nodeStatusMap);
   }
+
+  public void report() {
+    for (Entry<Node, NodeStatus> nodeNodeStatusEntry : getNodeStatusMap().entrySet()) {
+      logger.info("{}: {}", nodeNodeStatusEntry.getKey(), nodeNodeStatusEntry.getValue());
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index 72892bdb56..7ded0c910e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.server.monitor;
 
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.utils.WindowStatistic;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -334,6 +335,7 @@ public class Timer {
     RAFT_WINDOW_LENGTH(RAFT_MEMBER_RECEIVER, "window length", 1, true, ROOT),
     RAFT_RELAYED_ENTRY(RAFT_MEMBER_RECEIVER, "number of relayed entries", 1, true, ROOT),
     RAFT_SEND_RELAY_ACK(RAFT_MEMBER_RECEIVER, "send relay ack", 1, true, ROOT),
+    RAFT_RELAYED_LEVEL1_NUM(RAFT_MEMBER_SENDER, "level 1 relay node number", 1, true, ROOT),
     RAFT_RECEIVE_RELAY_ACK(RAFT_MEMBER_SENDER, "receive relay ack", 1, true, ROOT),
     RAFT_WAIT_AFTER_ACCEPTED(RAFT_MEMBER_SENDER, "wait after accepted", TIME_SCALE, true, ROOT),
     RAFT_SENDER_OOW(RAFT_MEMBER_SENDER, "out of window", 1, true, ROOT),
@@ -344,6 +346,7 @@ public class Timer {
     String blockName;
     AtomicLong sum = new AtomicLong(0);
     AtomicLong counter = new AtomicLong(0);
+    private WindowStatistic latestWindow = new WindowStatistic();
     long max;
     double scale;
     boolean valid;
@@ -370,6 +373,7 @@ public class Timer {
         sum.addAndGet(val);
         counter.incrementAndGet();
         max = Math.max(max, val);
+        latestWindow.add(val);
       }
     }
 
@@ -399,6 +403,7 @@ public class Timer {
       sum.set(0);
       counter.set(0);
       max = 0;
+      latestWindow.reset();
     }
 
     /** WARN: no current safety guarantee. */
@@ -413,12 +418,28 @@ public class Timer {
       double s = sum.get() / scale;
       long cnt = counter.get();
       double avg = s / cnt;
-      return String.format("%s - %s: %.2f, %d, %.2f, %d", className, blockName, s, cnt, avg, max);
+      return String.format(
+          "%s - %s: %.2f, %d, %.2f, %d, %.2f",
+          className, blockName, s, cnt, avg, max, latestWindow.getAvg());
     }
 
     public long getCnt() {
       return counter.get();
     }
+
+    public long getSum() {
+      return sum.get();
+    }
+
+    public static long getTotalFanout() {
+      return Statistic.RAFT_SENDER_SEND_LOG.getCnt() + Statistic.RAFT_RECEIVER_RELAY_LOG.getCnt();
+    }
+
+    public static double getSendLatency() {
+      return (Statistic.RAFT_SENDER_SEND_LOG.getSum() + Statistic.RAFT_RECEIVER_RELAY_LOG.getSum())
+          * 1.0
+          / (Statistic.RAFT_SENDER_SEND_LOG.getCnt() + Statistic.RAFT_RECEIVER_RELAY_LOG.getCnt());
+    }
   }
 
   public static String getReport() {
@@ -427,6 +448,11 @@ public class Timer {
     }
     StringBuilder result = new StringBuilder();
     printTo(Statistic.ROOT, result);
+    result.append(System.lineSeparator());
+    result.append(
+        String.format(
+            "Total request fanout: %d, send entry latency: %f",
+            Statistic.getTotalFanout(), Statistic.getSendLatency()));
     return result.toString();
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index f8df9d8d83..7887cc8084 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@ -161,9 +161,7 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
    */
   @Override
   public TNodeStatus queryNodeStatus() {
-    return new TNodeStatus()
-        .setFanoutRequestNum(
-            Statistic.RAFT_SENDER_SEND_LOG.getCnt() + Statistic.RAFT_RECEIVER_RELAY_LOG.getCnt());
+    return new TNodeStatus().setFanoutRequestNum(Statistic.getTotalFanout());
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/WindowStatistic.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/WindowStatistic.java
new file mode 100644
index 0000000000..854d974a95
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/WindowStatistic.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.utils;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+public class WindowStatistic {
+
+  private static final int DEFAULT_LENGTH = 1000;
+
+  private Queue<Long> values = new ArrayDeque<>();
+  private volatile long sum;
+  private volatile int cnt;
+  private int windowLength = DEFAULT_LENGTH;
+
+  public synchronized void add(long val) {
+    values.add(val);
+    sum += val;
+    cnt++;
+
+    if (cnt > windowLength) {
+      Long poll = values.poll();
+      sum -= poll;
+      cnt--;
+    }
+  }
+
+  public double getAvg() {
+    return cnt == 0 ? 0.0 : sum * 1.0 / cnt;
+  }
+
+  public synchronized void reset() {
+    values.clear();
+    cnt = 0;
+    sum = 0;
+  }
+}


[iotdb] 02/03: fix load calculation

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_plus
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 64011f9038275a2549b5ce51837975fd4443ae50
Author: jt <jt...@163.com>
AuthorDate: Tue Mar 29 12:06:31 2022 +0800

    fix load calculation
---
 .../java/org/apache/iotdb/cluster/ClusterIoTDB.java    |  2 +-
 .../iotdb/cluster/log/IndirectLogDispatcher.java       |  2 +-
 .../java/org/apache/iotdb/cluster/log/LogRelay.java    |  6 +++---
 .../apache/iotdb/cluster/server/member/RaftMember.java |  4 ++--
 .../iotdb/cluster/server/monitor/NodeStatus.java       |  3 ++-
 .../org/apache/iotdb/cluster/server/monitor/Timer.java | 18 +++++++++++++-----
 .../iotdb/cluster/server/service/MetaSyncService.java  |  2 +-
 7 files changed, 23 insertions(+), 14 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index 2e5b02cc58..a626b88425 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -298,7 +298,7 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
     public void run() {
       logger.info(
           "Total request fanout: {}",
-          Statistic.RAFT_SENDER_RELAY_LOG.getCnt() + Statistic.RAFT_SENDER_SEND_LOG.getCnt());
+          Statistic.RAFT_RECEIVER_RELAY_LOG.getCnt() + Statistic.RAFT_SENDER_SEND_LOG.getCnt());
       for (Entry<Node, NodeStatus> nodeNodeStatusEntry :
           NodeStatusManager.getINSTANCE().getNodeStatusMap().entrySet()) {
         logger.info("{}: {}", nodeNodeStatusEntry.getKey(), nodeNodeStatusEntry.getValue());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index c02f911778..d32fb405e3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -100,7 +100,7 @@ public class IndirectLogDispatcher extends LogDispatcher {
       QueryCoordinator instance = QueryCoordinator.getINSTANCE();
       orderedNodes = instance.reorderNodes(allNodes);
       long thisLoad =
-          Statistic.RAFT_SENDER_SEND_LOG.getCnt() + Statistic.RAFT_SEND_RELAY.getCnt() + 1;
+          Statistic.RAFT_SENDER_SEND_LOG.getCnt() + Statistic.RAFT_RECEIVER_RELAY_LOG.getCnt() + 1;
       long minLoad =
           NodeStatusManager.getINSTANCE()
                   .getNodeStatus(orderedNodes.get(0), false)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
index 5972fba588..842f2eb3ca 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
@@ -66,7 +66,7 @@ public class LogRelay {
   }
 
   private void offer(RelayEntry entry) {
-    long operationStartTime = Statistic.RAFT_SENDER_RELAY_OFFER_LOG.getOperationStartTime();
+    long operationStartTime = Statistic.RAFT_RECEIVER_RELAY_OFFER_LOG.getOperationStartTime();
     synchronized (entryHeap) {
       while (entryHeap.size()
           > ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem()) {
@@ -79,7 +79,7 @@ public class LogRelay {
       entryHeap.add(entry);
       entryHeap.notifyAll();
     }
-    Statistic.RAFT_SENDER_RELAY_OFFER_LOG.calOperationCostTimeFromStart(operationStartTime);
+    Statistic.RAFT_RECEIVER_RELAY_OFFER_LOG.calOperationCostTimeFromStart(operationStartTime);
   }
 
   public void offer(AppendEntriesRequest request, List<Node> receivers) {
@@ -128,7 +128,7 @@ public class LogRelay {
           raftMember.sendLogsToSubFollowers(relayEntry.batchRequest, relayEntry.receivers);
         }
 
-        Statistic.RAFT_SEND_RELAY.add(1);
+        Statistic.RAFT_RELAYED_ENTRY.add(1);
       }
     }
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index db75f3b1c2..0d605d3f46 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -641,7 +641,7 @@ public abstract class RaftMember implements RaftMemberMBean {
           getAsyncClient(subFollower)
               .appendEntry(request, new IndirectAppendHandler(subFollower, request));
         } else {
-          long operationStartTime = Statistic.RAFT_SENDER_RELAY_LOG.getOperationStartTime();
+          long operationStartTime = Statistic.RAFT_RECEIVER_RELAY_LOG.getOperationStartTime();
           syncClient = getSyncClient(subFollower);
 
           int concurrentSender = concurrentSenderNum.incrementAndGet();
@@ -650,7 +650,7 @@ public abstract class RaftMember implements RaftMemberMBean {
           concurrentSenderNum.decrementAndGet();
 
           long sendLogTime =
-              Statistic.RAFT_SENDER_RELAY_LOG.calOperationCostTimeFromStart(operationStartTime);
+              Statistic.RAFT_RECEIVER_RELAY_LOG.calOperationCostTimeFromStart(operationStartTime);
           NodeStatus nodeStatus = NodeStatusManager.getINSTANCE().getNodeStatus(subFollower, false);
           nodeStatus.getSendEntryLatencySum().addAndGet(sendLogTime);
           nodeStatus.getSendEntryNum().incrementAndGet();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
index 395048eb80..6f4039456b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.server.monitor;
 
 import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
 
+import java.util.Date;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -134,7 +135,7 @@ public class NodeStatus implements Comparable<NodeStatus> {
         + "status="
         + status
         + ", lastUpdateTime="
-        + lastUpdateTime
+        + new Date(lastUpdateTime)
         + ", lastResponseLatency="
         + lastResponseLatency
         + ", isActivated="
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index 13b8ec7bc3..72892bdb56 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -152,10 +152,18 @@ public class Timer {
         TIME_SCALE,
         true,
         RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
-    RAFT_SENDER_RELAY_OFFER_LOG(
-        RAFT_MEMBER_SENDER, "relay offer log", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
-    RAFT_SENDER_RELAY_LOG(
-        RAFT_MEMBER_SENDER, "relay log", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+    RAFT_RECEIVER_RELAY_OFFER_LOG(
+        RAFT_MEMBER_RECEIVER,
+        "relay offer log",
+        TIME_SCALE,
+        true,
+        RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+    RAFT_RECEIVER_RELAY_LOG(
+        RAFT_MEMBER_RECEIVER,
+        "relay entry to a follower",
+        TIME_SCALE,
+        true,
+        RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
     RAFT_SENDER_VOTE_COUNTER(
         RAFT_MEMBER_SENDER,
         "wait for votes",
@@ -324,7 +332,7 @@ public class Timer {
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     RAFT_WINDOW_LENGTH(RAFT_MEMBER_RECEIVER, "window length", 1, true, ROOT),
-    RAFT_SEND_RELAY(RAFT_MEMBER_RECEIVER, "send relay entries", 1, true, ROOT),
+    RAFT_RELAYED_ENTRY(RAFT_MEMBER_RECEIVER, "number of relayed entries", 1, true, ROOT),
     RAFT_SEND_RELAY_ACK(RAFT_MEMBER_RECEIVER, "send relay ack", 1, true, ROOT),
     RAFT_RECEIVE_RELAY_ACK(RAFT_MEMBER_SENDER, "receive relay ack", 1, true, ROOT),
     RAFT_WAIT_AFTER_ACCEPTED(RAFT_MEMBER_SENDER, "wait after accepted", TIME_SCALE, true, ROOT),
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index 486c50959e..f8df9d8d83 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@ -163,7 +163,7 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
   public TNodeStatus queryNodeStatus() {
     return new TNodeStatus()
         .setFanoutRequestNum(
-            Statistic.RAFT_SENDER_SEND_LOG.getCnt() + Statistic.RAFT_SEND_RELAY.getCnt());
+            Statistic.RAFT_SENDER_SEND_LOG.getCnt() + Statistic.RAFT_RECEIVER_RELAY_LOG.getCnt());
   }
 
   @Override