You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/02/13 01:21:17 UTC

[iotdb] branch expr_flow updated: update flow balancer

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_flow
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr_flow by this push:
     new 9b2a5058b6 update flow balancer
9b2a5058b6 is described below

commit 9b2a5058b630c6924124240cfc392cedcf2260cd
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon Feb 13 09:22:37 2023 +0800

    update flow balancer
---
 cluster/collect-log-dc.sh                          |  13 ++
 .../apache/iotdb/cluster/config/ClusterConfig.java |  37 +++
 .../iotdb/cluster/config/ClusterDescriptor.java    |  22 ++
 .../org/apache/iotdb/cluster/expr/ExprBench.java   | 253 +++++++++++++--------
 .../cluster/expr/flowcontrol/FlowBalancer.java     |  10 +-
 .../apache/iotdb/cluster/log/LogDispatcher.java    |  16 +-
 .../iotdb/cluster/server/member/RaftMember.java    |  10 +-
 .../apache/iotdb/cluster/server/monitor/Timer.java |   6 +-
 8 files changed, 262 insertions(+), 105 deletions(-)

diff --git a/cluster/collect-log-dc.sh b/cluster/collect-log-dc.sh
new file mode 100644
index 0000000000..b4339e0773
--- /dev/null
+++ b/cluster/collect-log-dc.sh
@@ -0,0 +1,13 @@
+src_path=/home/jt/iotdb_expr_vg/logs/*
+
+ips=(dc16 dc17 dc18)
+#ips=(dc11 dc12 dc13 dc14 dc11 dc12)
+target_path=/d/CodeRepo/iotdb/cluster/target/logs
+
+mkdir $target_path
+rm $target_path/*
+for ip in ${ips[*]}
+  do
+    mkdir $target_path/$ip
+    scp -r jt@$ip:$src_path $target_path/$ip
+  done
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 8c897e5552..d26f0e4de3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -225,6 +225,10 @@ public class ClusterConfig {
 
   private int flowMonitorMaxWindowSize = 1000;
   private long flowMonitorWindowInterval = 1000;
+  private boolean useFollowerLoadBalance = false;
+  private int followerLoadBalanceWindowsToUse = 3;
+  private double followerLoadBalanceOverestimateFactor = 1.1;
+  private int logDispatcherBatchSize = 10;
 
   /**
    * create a clusterConfig class. The internalIP will be set according to the server's hostname. If
@@ -713,4 +717,37 @@ public class ClusterConfig {
   public long getFlowMonitorWindowInterval() {
     return flowMonitorWindowInterval;
   }
+
+  public boolean isUseFollowerLoadBalance() {
+    return useFollowerLoadBalance;
+  }
+
+  public void setUseFollowerLoadBalance(boolean useFollowerLoadBalance) {
+    this.useFollowerLoadBalance = useFollowerLoadBalance;
+  }
+
+  public int getFollowerLoadBalanceWindowsToUse() {
+    return followerLoadBalanceWindowsToUse;
+  }
+
+  public void setFollowerLoadBalanceWindowsToUse(int followerLoadBalanceWindowsToUse) {
+    this.followerLoadBalanceWindowsToUse = followerLoadBalanceWindowsToUse;
+  }
+
+  public double getFollowerLoadBalanceOverestimateFactor() {
+    return followerLoadBalanceOverestimateFactor;
+  }
+
+  public void setFollowerLoadBalanceOverestimateFactor(
+      double followerLoadBalanceOverestimateFactor) {
+    this.followerLoadBalanceOverestimateFactor = followerLoadBalanceOverestimateFactor;
+  }
+
+  public int getLogDispatcherBatchSize() {
+    return logDispatcherBatchSize;
+  }
+
+  public void setLogDispatcherBatchSize(int logDispatcherBatchSize) {
+    this.logDispatcherBatchSize = logDispatcherBatchSize;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index 42a5b80233..6b1c757f97 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -397,6 +397,28 @@ public class ClusterDescriptor {
             properties.getProperty(
                 "enable_instrumenting", String.valueOf(config.isEnableInstrumenting()))));
 
+    config.setUseFollowerLoadBalance(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "use_follower_load_balance", String.valueOf(config.isUseFollowerLoadBalance()))));
+
+    config.setFollowerLoadBalanceWindowsToUse(
+        Integer.parseInt(
+            properties.getProperty(
+                "follower_load_balance_windows_to_use",
+                String.valueOf(config.getFollowerLoadBalanceWindowsToUse()))));
+
+    config.setFollowerLoadBalanceOverestimateFactor(
+        Double.parseDouble(
+            properties.getProperty(
+                "follower_load_balance_overestimate_factor",
+                String.valueOf(config.getFollowerLoadBalanceOverestimateFactor()))));
+
+    config.setLogDispatcherBatchSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "log_dispatcher_batch_size", String.valueOf(config.getLogDispatcherBatchSize()))));
+
     String consistencyLevel = properties.getProperty("consistency_level");
     if (consistencyLevel != null) {
       config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
index 8042a03493..b14ece54dd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
@@ -37,10 +37,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -52,11 +56,15 @@ public class ExprBench {
 
   private AtomicLong requestCounter = new AtomicLong();
   private AtomicLong latencySum = new AtomicLong();
+  private AtomicLong burstRequestCounter = new AtomicLong();
+  private AtomicLong burstLatencySum = new AtomicLong();
   private long maxLatency = 0;
   private int threadNum = 64;
   private int workloadSize = 64 * 1024;
   private int printInterval = 1000;
   private ClientManager clientPool;
+  private long maxRunningSecond;
+  private long burstInterval;
   private int maxRequestNum;
   private ExecutorService pool = Executors.newCachedThreadPool();
   private List<Node> nodeList = new ArrayList<>();
@@ -65,12 +73,16 @@ public class ExprBench {
   private List<EndPoint> endPoints = new ArrayList<>();
   private Map<EndPoint, RateLimiter> rateLimiterMap = new ConcurrentHashMap<>();
   private Map<EndPoint, Statistic> latencyMap = new ConcurrentHashMap<>();
+  private long startTime;
+  private volatile boolean duringBurst = false;
+  private DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
   public ExprBench(Node target) {
     clientPool = new ClientManager(false, Type.MetaGroupClient);
   }
 
   private static class EndPoint {
+
     private Node node;
     private int raftId;
 
@@ -86,6 +98,7 @@ public class ExprBench {
   }
 
   private static class Statistic {
+
     private AtomicLong sum = new AtomicLong();
     private AtomicLong cnt = new AtomicLong();
 
@@ -100,91 +113,142 @@ public class ExprBench {
     }
   }
 
+  private void benchmarkTask(int taskId) {
+    int endPointIdx = taskId % endPoints.size();
+    Client client = null;
+
+    ExecutNonQueryReq request = new ExecutNonQueryReq();
+    DummyPlan plan = new DummyPlan();
+    plan.setWorkload(new byte[workloadSize]);
+    plan.setNeedForward(true);
+
+    ByteBuffer byteBuffer = ByteBuffer.allocate(workloadSize + 4096);
+    Map<EndPoint, Node> endPointLeaderMap = new HashMap<>();
+
+    Node target = null;
+    long currRequsetNum = -1;
+    while (true) {
+      EndPoint endPoint = endPoints.get(endPointIdx);
+      RateLimiter rateLimiter = rateLimiterMap.get(endPoint);
+      if (rateLimiter != null) {
+        rateLimiter.acquire(1);
+      }
+
+      target = endPointLeaderMap.getOrDefault(endPoint, endPoint.node);
+      int raftId = endPoint.raftId;
+      plan.setGroupIdentifier(ClusterUtils.nodeToString(endPoint.node) + "#" + raftId);
+
+      try {
+        client = clientPool.borrowSyncClient(target, ClientCategory.META);
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+
+      byteBuffer.clear();
+      plan.serialize(byteBuffer);
+      byteBuffer.flip();
+      request.planBytes = byteBuffer;
+      request.setPlanBytesIsSet(true);
+
+      long reqLatency = System.nanoTime();
+      try {
+        TSStatus status = client.executeNonQueryPlan(request);
+        clientPool.returnSyncClient(client, target, ClientCategory.META);
+        if (status.isSetRedirectNode()) {
+          Node leader = new Node().setInternalIp(status.redirectNode.ip).setMetaPort(8880);
+          endPointLeaderMap.put(endPoint, leader);
+          logger.debug("Leader of {} is changed to {}", endPoint, leader);
+        }
+
+        currRequsetNum = requestCounter.incrementAndGet();
+        if (currRequsetNum > threadNum * 10L) {
+          reqLatency = System.nanoTime() - reqLatency;
+          maxLatency = Math.max(maxLatency, reqLatency);
+          latencySum.addAndGet(reqLatency);
+          latencyMap.get(endPoint).add(reqLatency);
+          if (duringBurst) {
+            burstRequestCounter.incrementAndGet();
+            burstLatencySum.addAndGet(reqLatency);
+          }
+        }
+      } catch (TException e) {
+        e.printStackTrace();
+      }
+
+      long elapsedTime = System.currentTimeMillis() - startTime;
+      if (currRequsetNum % printInterval == 0) {
+        System.out.println(
+            String.format(
+                "%s %d %d %f(%f) %f %f",
+                dateFormat.format(new Date(System.currentTimeMillis())),
+                elapsedTime,
+                currRequsetNum,
+                (currRequsetNum + 0.0) / elapsedTime,
+                currRequsetNum * workloadSize / (1024.0 * 1024.0) / elapsedTime,
+                maxLatency / 1000.0,
+                (latencySum.get() + 0.0) / currRequsetNum));
+        System.out.println(latencyMap);
+      }
+
+      if (currRequsetNum >= maxRequestNum || elapsedTime / 1000 >= maxRunningSecond) {
+        break;
+      }
+    }
+  }
+
+  private void insertBurst() {
+    long burstStart = maxRunningSecond / 2 - burstInterval / 2;
+    long burstEnd = maxRunningSecond / 2 + burstInterval / 2;
+
+    long elapsedTime = (System.currentTimeMillis() - startTime) / 1000;
+    while (elapsedTime < burstStart) {
+      try {
+        Thread.sleep(1000);
+        elapsedTime = (System.currentTimeMillis() - startTime) / 1000;
+      } catch (InterruptedException e) {
+        logger.warn("Unexpected interruption");
+      }
+    }
+    duringBurst = true;
+    System.out.printf("Burst starts");
+    for (Entry<EndPoint, RateLimiter> endPointRateLimiterEntry : rateLimiterMap.entrySet()) {
+      RateLimiter rateLimiter = endPointRateLimiterEntry.getValue();
+      rateLimiter.setRate(rateLimiter.getRate() * 2);
+    }
+
+    while (elapsedTime < burstEnd) {
+      try {
+        Thread.sleep(1000);
+        elapsedTime = (System.currentTimeMillis() - startTime) / 1000;
+      } catch (InterruptedException e) {
+        logger.warn("Unexpected interruption");
+      }
+    }
+    duringBurst = false;
+    System.out.printf("Burst ends");
+    for (Entry<EndPoint, RateLimiter> endPointRateLimiterEntry : rateLimiterMap.entrySet()) {
+      RateLimiter rateLimiter = endPointRateLimiterEntry.getValue();
+      rateLimiter.setRate(rateLimiter.getRate() / 2);
+    }
+  }
+
   public void benchmark() {
-    long startTime = System.currentTimeMillis();
+    startTime = System.currentTimeMillis();
     for (int i = 0; i < threadNum; i++) {
-      int finalI = i;
-      pool.submit(
-          () -> {
-            int endPointIdx = finalI % endPoints.size();
-            Client client = null;
-
-            ExecutNonQueryReq request = new ExecutNonQueryReq();
-            DummyPlan plan = new DummyPlan();
-            plan.setWorkload(new byte[workloadSize]);
-            plan.setNeedForward(true);
-
-            ByteBuffer byteBuffer = ByteBuffer.allocate(workloadSize + 4096);
-            Map<EndPoint, Node> endPointLeaderMap = new HashMap<>();
-
-            Node target = null;
-            long currRequsetNum = -1;
-            while (true) {
-
-              EndPoint endPoint = endPoints.get(endPointIdx);
-              RateLimiter rateLimiter = rateLimiterMap.get(endPoint);
-              if (rateLimiter != null) {
-                rateLimiter.acquire(1);
-              }
-
-              target = endPointLeaderMap.getOrDefault(endPoint, endPoint.node);
-              int raftId = endPoint.raftId;
-              plan.setGroupIdentifier(ClusterUtils.nodeToString(endPoint.node) + "#" + raftId);
-
-              try {
-                client = clientPool.borrowSyncClient(target, ClientCategory.META);
-              } catch (IOException e) {
-                e.printStackTrace();
-              }
-
-              byteBuffer.clear();
-              plan.serialize(byteBuffer);
-              byteBuffer.flip();
-              request.planBytes = byteBuffer;
-              request.setPlanBytesIsSet(true);
-
-              long reqLatency = System.nanoTime();
-              try {
-                TSStatus status = client.executeNonQueryPlan(request);
-                clientPool.returnSyncClient(client, target, ClientCategory.META);
-                if (status.isSetRedirectNode()) {
-                  Node leader = new Node().setInternalIp(status.redirectNode.ip).setMetaPort(8880);
-                  endPointLeaderMap.put(endPoint, leader);
-                  logger.debug("Leader of {} is changed to {}", endPoint, leader);
-                }
-
-                currRequsetNum = requestCounter.incrementAndGet();
-                if (currRequsetNum > threadNum * 10) {
-                  reqLatency = System.nanoTime() - reqLatency;
-                  maxLatency = Math.max(maxLatency, reqLatency);
-                  latencySum.addAndGet(reqLatency);
-                  latencyMap.get(endPoint).add(reqLatency);
-                }
-              } catch (TException e) {
-                e.printStackTrace();
-              }
-
-              if (currRequsetNum % printInterval == 0) {
-                long elapsedTime = System.currentTimeMillis() - startTime;
-                System.out.println(
-                    String.format(
-                        "%d %d %f(%f) %f %f",
-                        elapsedTime,
-                        currRequsetNum,
-                        (currRequsetNum + 0.0) / elapsedTime,
-                        currRequsetNum * workloadSize / (1024.0 * 1024.0) / elapsedTime,
-                        maxLatency / 1000.0,
-                        (latencySum.get() + 0.0) / currRequsetNum));
-                System.out.println(latencyMap);
-              }
-
-              if (currRequsetNum >= maxRequestNum) {
-                break;
-              }
-            }
-          });
+      int taskId = i;
+      pool.submit(() -> benchmarkTask(taskId));
     }
     pool.shutdown();
+    if (burstInterval > 0) {
+      insertBurst();
+    }
+    while (!pool.isTerminated()) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        logger.warn("Unexpected interruption");
+      }
+    }
   }
 
   public void setMaxRequestNum(int maxRequestNum) {
@@ -195,11 +259,13 @@ public class ExprBench {
     ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(50000);
     Node target = new Node();
     ExprBench bench = new ExprBench(target);
-    bench.maxRequestNum = Integer.parseInt(args[0]);
-    bench.threadNum = Integer.parseInt(args[1]);
-    bench.workloadSize = Integer.parseInt(args[2]) * 1024;
-    bench.printInterval = Integer.parseInt(args[3]);
-    String[] nodesSplit = args[4].split(",");
+    bench.maxRunningSecond = Integer.parseInt(args[0]);
+    bench.burstInterval = Integer.parseInt(args[1]);
+    bench.maxRequestNum = Integer.parseInt(args[2]);
+    bench.threadNum = Integer.parseInt(args[3]);
+    bench.workloadSize = Integer.parseInt(args[4]) * 1024;
+    bench.printInterval = Integer.parseInt(args[5]);
+    String[] nodesSplit = args[6].split(",");
     for (String s : nodesSplit) {
       String[] nodeSplit = s.split(":");
       Node node = new Node();
@@ -207,13 +273,13 @@ public class ExprBench {
       node.setMetaPort(Integer.parseInt(nodeSplit[1]));
       bench.nodeList.add(node);
     }
-    String[] raftFactorSplit = args[5].split(",");
+    String[] raftFactorSplit = args[7].split(",");
     bench.raftFactors = new int[raftFactorSplit.length];
     for (int i = 0; i < raftFactorSplit.length; i++) {
       bench.raftFactors[i] = Integer.parseInt(raftFactorSplit[i]);
     }
-    if (args.length >= 7) {
-      String[] ratesSplit = args[6].split(",");
+    if (args.length >= 9) {
+      String[] ratesSplit = args[8].split(",");
       bench.rateLimits = new int[ratesSplit.length];
       for (int i = 0; i < ratesSplit.length; i++) {
         bench.rateLimits[i] = Integer.parseInt(ratesSplit[i]);
@@ -236,5 +302,14 @@ public class ExprBench {
     bench.benchmark();
 
     System.out.println(bench.latencyMap);
+    if (bench.burstInterval > 0) {
+      long burstRequest = bench.burstRequestCounter.get();
+      long burstLatencySum = bench.burstLatencySum.get();
+      double burstAvgLatency = burstLatencySum * 1.0 / burstRequest;
+      double burstThroughput = burstRequest * 1.0 / bench.burstInterval;
+      System.out.printf(
+          "Statistics during burst: num request %d, throughput %f, latency %f",
+          burstRequest, burstThroughput, burstAvgLatency);
+    }
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java
index 9cb507e2d7..3afaa2b535 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.expr.flowcontrol;
 
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.log.LogDispatcher;
 import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -42,7 +43,10 @@ public class FlowBalancer {
   private static final Logger logger = LoggerFactory.getLogger(FlowBalancer.class);
   private double maxFlow = 900_000_000;
   private double minFlow = 10_000_000;
-  private int windowsToUse = 3;
+  private int windowsToUse =
+      ClusterDescriptor.getInstance().getConfig().getFollowerLoadBalanceWindowsToUse();
+  private double overestimateFactor =
+      ClusterDescriptor.getInstance().getConfig().getFollowerLoadBalanceOverestimateFactor();
   private int flowBalanceIntervalMS = 1000;
   private FlowMonitorManager flowMonitorManager = FlowMonitorManager.INSTANCE;
   private LogDispatcher logDispatcher;
@@ -81,7 +85,7 @@ public class FlowBalancer {
     int followerNum = nodeNum - 1;
 
     double thisNodeFlow = flowMonitorManager.averageFlow(member.getThisNode(), windowsToUse);
-    double assumedFlow = thisNodeFlow * 1.1;
+    double assumedFlow = thisNodeFlow * overestimateFactor;
     logger.info("Flow of this node: {}", thisNodeFlow);
     Map<Node, BlockingQueue<SendLogRequest>> nodesLogQueuesMap =
         logDispatcher.getNodesLogQueuesMap();
@@ -108,7 +112,7 @@ public class FlowBalancer {
     int i = 0;
     for (; i < quorumFollowerNum; i++) {
       Node node = followers.get(i);
-      nodesRate.put(node, flowToQuorum);
+      nodesRate.put(node, maxFlow);
       remainingFlow -= flowToQuorum;
     }
     double flowToRemaining = remainingFlow / (followerNum - quorumFollowerNum);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 30864444a6..dd7dfca348 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -106,8 +106,6 @@ public class LogDispatcher {
   }
 
   void createQueueAndBindingThreads() {
-    double baseRate = 300_000_000.0;
-    int i = 1;
     for (Node node : member.getAllNodes()) {
       if (!ClusterUtils.isNodeEquals(node, member.getThisNode())) {
         BlockingQueue<SendLogRequest> logBlockingQueue;
@@ -117,13 +115,11 @@ public class LogDispatcher {
         nodesLogQueuesMap.put(node, logBlockingQueue);
         FlowMonitorManager.INSTANCE.register(node);
         nodesRateLimiter.put(node, RateLimiter.create(Double.MAX_VALUE));
-        nodesRate.put(node, baseRate * i);
-        i += 100;
       }
     }
     updateRateLimiter();
 
-    for (i = 0; i < bindingThreadNum; i++) {
+    for (int i = 0; i < bindingThreadNum; i++) {
       for (Entry<Node, BlockingQueue<SendLogRequest>> pair : nodesLogQueuesMap.entrySet()) {
         executorServices
             .computeIfAbsent(
@@ -420,20 +416,24 @@ public class LogDispatcher {
           currBatch.get(0).getVotingLog().getLog().getCurrLogIndex(),
           currBatch.get(currBatch.size() - 1).getVotingLog().getLog().getCurrLogIndex());
       while (logIndex < currBatch.size()) {
-        long logSize = IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize();
+        long logSize = 0;
+        long logSizeLimit = IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize();
         List<ByteBuffer> logList = new ArrayList<>();
         int prevIndex = logIndex;
 
         for (; logIndex < currBatch.size(); logIndex++) {
           long curSize = currBatch.get(logIndex).getAppendEntryRequest().entry.array().length;
-          if (logSize - curSize <= IoTDBConstant.LEFT_SIZE_IN_REQUEST) {
+          if (logSizeLimit - curSize - logSize <= IoTDBConstant.LEFT_SIZE_IN_REQUEST) {
             break;
           }
-          logSize -= curSize;
+          logSize += curSize;
           logList.add(currBatch.get(logIndex).getAppendEntryRequest().entry);
         }
 
         AppendEntriesRequest appendEntriesRequest = prepareRequest(logList, currBatch, prevIndex);
+        FlowMonitorManager.INSTANCE.report(receiver, logSize);
+        nodesRateLimiter.get(receiver).acquire((int) logSize);
+
         if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
           appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
         } else {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 6bd6f6a318..6fa543fe8d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -331,8 +331,10 @@ public abstract class RaftMember implements RaftMemberMBean {
     startBackGroundThreads();
     setSkipElection(false);
     FlowMonitorManager.INSTANCE.register(thisNode);
-    flowBalancer = new FlowBalancer(logDispatcher, this);
-    flowBalancer.start();
+    if (config.isUseFollowerLoadBalance()) {
+      flowBalancer = new FlowBalancer(logDispatcher, this);
+      flowBalancer.start();
+    }
     logger.info("{} started", name);
   }
 
@@ -456,7 +458,9 @@ public abstract class RaftMember implements RaftMemberMBean {
     heartBeatService = null;
     appendLogThreadPool = null;
 
-    flowBalancer.stop();
+    if (flowBalancer != null) {
+      flowBalancer.stop();
+    }
     logger.info("Member {} stopped", name);
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index 2d2b860e77..f127a2885f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -498,8 +498,10 @@ public class Timer {
 
   private static void printTo(Statistic currNode, StringBuilder out) {
     if (currNode != Statistic.ROOT && currNode.valid) {
-      indent(out, currNode.level);
-      out.append(currNode).append("\n");
+      if (currNode.counter.get() != 0) {
+        indent(out, currNode.level);
+        out.append(currNode).append("\n");
+      }
     }
     for (Statistic child : currNode.children) {
       printTo(child, out);