You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/02/10 01:26:41 UTC

[iotdb] branch expr_flow updated: add flow balancer

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_flow
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr_flow by this push:
     new f82c3fd620 add flow balancer
f82c3fd620 is described below

commit f82c3fd6209f85df520763a2fd041f4ff75a8868
Author: Tian Jiang <jt...@163.com>
AuthorDate: Fri Feb 10 09:27:59 2023 +0800

    add flow balancer
---
 .../expr/craft/FragmentedLogDispatcher.java        |   8 +-
 .../cluster/expr/flowcontrol/FlowBalancer.java     | 108 ++++++++++++++++++++-
 .../cluster/expr/flowcontrol/FlowMonitor.java      |  27 +++++-
 .../expr/flowcontrol/FlowMonitorManager.java       |   6 +-
 .../iotdb/cluster/log/IndirectLogDispatcher.java   |  12 +--
 .../apache/iotdb/cluster/log/LogDispatcher.java    |  43 ++++----
 .../cluster/server/member/DataGroupMember.java     |   3 +-
 .../iotdb/cluster/server/member/RaftMember.java    |   6 ++
 .../iotdb/cluster/server/monitor/NodeReport.java   |   9 +-
 9 files changed, 183 insertions(+), 39 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/craft/FragmentedLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/craft/FragmentedLogDispatcher.java
index a2075e45f7..31802333af 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/craft/FragmentedLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/craft/FragmentedLogDispatcher.java
@@ -25,11 +25,11 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
-import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map.Entry;
 import java.util.concurrent.BlockingQueue;
 
 public class FragmentedLogDispatcher extends LogDispatcher {
@@ -47,8 +47,8 @@ public class FragmentedLogDispatcher extends LogDispatcher {
     request.getVotingLog().getLog().setEnqueueTime(System.nanoTime());
 
     int i = 0;
-    for (Pair<Node, BlockingQueue<SendLogRequest>> entry : nodesLogQueuesList) {
-      BlockingQueue<SendLogRequest> nodeLogQueue = entry.right;
+    for (Entry<Node, BlockingQueue<SendLogRequest>> entry : nodesLogQueuesMap.entrySet()) {
+      BlockingQueue<SendLogRequest> nodeLogQueue = entry.getValue();
       SendLogRequest fragmentedRequest = new SendLogRequest(request);
       fragmentedRequest.setVotingLog(new VotingLog(request.getVotingLog()));
       fragmentedRequest
@@ -60,7 +60,7 @@ public class FragmentedLogDispatcher extends LogDispatcher {
       if (!addSucceeded) {
         logger.debug(
             "Log queue[{}] of {} is full, ignore the request to this node",
-            entry.left,
+            entry.getKey(),
             member.getName());
       }
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java
index 8159f807a4..9cb507e2d7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java
@@ -17,9 +17,115 @@
  * under the License.
  */
 
-
 package org.apache.iotdb.cluster.expr.flowcontrol;
 
+import org.apache.iotdb.cluster.log.LogDispatcher;
+import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
 public class FlowBalancer {
 
+  private static final Logger logger = LoggerFactory.getLogger(FlowBalancer.class);
+  private double maxFlow = 900_000_000;
+  private double minFlow = 10_000_000;
+  private int windowsToUse = 3;
+  private int flowBalanceIntervalMS = 1000;
+  private FlowMonitorManager flowMonitorManager = FlowMonitorManager.INSTANCE;
+  private LogDispatcher logDispatcher;
+  private RaftMember member;
+
+  private ScheduledExecutorService scheduledExecutorService;
+
+  public FlowBalancer(LogDispatcher logDispatcher, RaftMember member) {
+    this.logDispatcher = logDispatcher;
+    this.member = member;
+  }
+
+  public void start() {
+    scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+    ScheduledExecutorUtil.safelyScheduleAtFixedRate(
+        scheduledExecutorService,
+        this::rebalance,
+        flowBalanceIntervalMS,
+        flowBalanceIntervalMS,
+        TimeUnit.MILLISECONDS);
+  }
+
+  public void stop() {
+    scheduledExecutorService.shutdownNow();
+  }
+
+  private void rebalance() {
+    if (!member.isLeader()) {
+      return;
+    }
+
+    List<Node> followers = new ArrayList<>(member.getAllNodes());
+    followers.remove(member.getThisNode());
+
+    int nodeNum = member.getAllNodes().size();
+    int followerNum = nodeNum - 1;
+
+    double thisNodeFlow = flowMonitorManager.averageFlow(member.getThisNode(), windowsToUse);
+    double assumedFlow = thisNodeFlow * 1.1;
+    logger.info("Flow of this node: {}", thisNodeFlow);
+    Map<Node, BlockingQueue<SendLogRequest>> nodesLogQueuesMap =
+        logDispatcher.getNodesLogQueuesMap();
+    Map<Node, Double> nodesRate = logDispatcher.getNodesRate();
+
+    // sort followers according to their queue length
+    followers.sort(Comparator.comparing(node -> nodesLogQueuesMap.get(node).size()));
+    if (assumedFlow * followerNum > maxFlow) {
+      enterBurst(nodesRate, nodeNum, assumedFlow, followers);
+    } else {
+      exitBurst(followerNum, nodesRate, followers);
+    }
+    logDispatcher.updateRateLimiter();
+  }
+
+  private void enterBurst(
+      Map<Node, Double> nodesRate, int nodeNum, double assumedFlow, List<Node> followers) {
+    int followerNum = nodeNum - 1;
+    int quorumFollowerNum = nodeNum / 2;
+    double remainingFlow = maxFlow;
+    double quorumMaxFlow = maxFlow / quorumFollowerNum;
+    // distribute flow to quorum followers with the shortest queues
+    double flowToQuorum = Math.min(assumedFlow, quorumMaxFlow);
+    int i = 0;
+    for (; i < quorumFollowerNum; i++) {
+      Node node = followers.get(i);
+      nodesRate.put(node, flowToQuorum);
+      remainingFlow -= flowToQuorum;
+    }
+    double flowToRemaining = remainingFlow / (followerNum - quorumFollowerNum);
+    if (flowToRemaining < minFlow) {
+      flowToRemaining = minFlow;
+    }
+    for (; i < followerNum; i++) {
+      Node node = followers.get(i);
+      nodesRate.put(node, flowToRemaining);
+    }
+  }
+
+  private void exitBurst(int followerNum, Map<Node, Double> nodesRate, List<Node> followers) {
+    // lift flow limits
+    for (int i = 0; i < followerNum; i++) {
+      Node node = followers.get(i);
+      nodesRate.put(node, maxFlow);
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitor.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitor.java
index 867eaddd07..c3d9767166 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitor.java
@@ -34,13 +34,13 @@ import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayDeque;
 import java.util.Date;
-import java.util.Deque;
+import java.util.Iterator;
 
 public class FlowMonitor {
 
   private static final Logger logger = LoggerFactory.getLogger(FlowMonitor.class);
   private static final String FILE_SUFFIX = ".flow";
-  private Deque<Pair<Long, Long>> windows;
+  private ArrayDeque<Pair<Long, Long>> windows;
   private long currWindowStart;
   private long currWindowSum;
   private long windowInterval;
@@ -49,8 +49,7 @@ public class FlowMonitor {
   private BufferedWriter writer;
   private DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
-  public FlowMonitor(int maxWindowSize, long windowInterval, Node node)
-      throws IOException {
+  public FlowMonitor(int maxWindowSize, long windowInterval, Node node) throws IOException {
     this.maxWindowSize = maxWindowSize;
     this.windows = new ArrayDeque<>(maxWindowSize);
     this.windowInterval = windowInterval;
@@ -62,7 +61,9 @@ public class FlowMonitor {
     String path =
         IoTDBDescriptor.getInstance().getConfig().getSystemDir()
             + File.separator
-            + node.getInternalIp() + "-" + node.nodeIdentifier
+            + node.getInternalIp()
+            + "-"
+            + node.nodeIdentifier
             + FILE_SUFFIX;
     File file = new File(path);
     file.delete();
@@ -127,4 +128,20 @@ public class FlowMonitor {
     // update the current window
     currWindowSum += val;
   }
+
+  public double averageFlow(int windowsToUse) {
+    long flowSum = currWindowSum;
+    long intervalSum = System.currentTimeMillis() - currWindowStart;
+    Iterator<Pair<Long, Long>> windowIterator = windows.descendingIterator();
+    for (int i = 1; i < windowsToUse; i++) {
+      if (windowIterator.hasNext()) {
+        Pair<Long, Long> window = windowIterator.next();
+        flowSum += window.right;
+        intervalSum += windowInterval;
+      } else {
+        break;
+      }
+    }
+    return flowSum * 1.0 / intervalSum * 1000;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitorManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitorManager.java
index 0efbb1ffd5..7dad514c39 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitorManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitorManager.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.cluster.expr.flowcontrol;
 
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
-
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,4 +72,8 @@ public class FlowMonitorManager {
       logger.warn("Flow monitor {} is not registered", node);
     }
   }
+
+  public double averageFlow(Node node, int windowsToUse) {
+    return monitorMap.get(node).averageFlow(windowsToUse);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index c852575275..eb84287a55 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.cluster.utils.WeightedList;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,6 +38,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -81,19 +81,19 @@ public class IndirectLogDispatcher extends LogDispatcher {
         logBlockingQueue =
             new ArrayBlockingQueue<>(
                 ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem(), true);
-        nodesLogQueuesList.add(new Pair<>(node, logBlockingQueue));
+        nodesLogQueuesMap.put(node, logBlockingQueue);
       }
     }
 
     for (int i = 0; i < bindingThreadNum; i++) {
-      for (Pair<Node, BlockingQueue<SendLogRequest>> pair : nodesLogQueuesList) {
+      for (Entry<Node, BlockingQueue<SendLogRequest>> pair : nodesLogQueuesMap.entrySet()) {
         executorServices
             .computeIfAbsent(
-                pair.left,
+                pair.getKey(),
                 n ->
                     IoTDBThreadPoolFactory.newCachedThreadPool(
-                        "LogDispatcher-" + member.getName() + "-" + pair.left.nodeIdentifier))
-            .submit(newDispatcherThread(pair.left, pair.right));
+                        "LogDispatcher-" + member.getName() + "-" + pair.getKey().nodeIdentifier))
+            .submit(newDispatcherThread(pair.getKey(), pair.getValue()));
       }
     }
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 72e22181f1..30864444a6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -41,7 +41,6 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.util.concurrent.RateLimiter;
 import org.apache.thrift.TException;
@@ -80,7 +79,7 @@ public class LogDispatcher {
   protected RaftMember member;
   private static final ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
   protected boolean useBatchInLogCatchUp = clusterConfig.isUseBatchInLogCatchUp();
-  protected List<Pair<Node, BlockingQueue<SendLogRequest>>> nodesLogQueuesList = new ArrayList<>();
+  protected Map<Node, BlockingQueue<SendLogRequest>> nodesLogQueuesMap = new HashMap<>();
   protected Map<Node, Boolean> nodesEnabled;
   protected Map<Node, RateLimiter> nodesRateLimiter = new HashMap<>();
   protected Map<Node, Double> nodesRate = new HashMap<>();
@@ -99,7 +98,7 @@ public class LogDispatcher {
     createQueueAndBindingThreads();
   }
 
-  protected void updateRateLimiter() {
+  public void updateRateLimiter() {
     logger.info("Node rates: {}", nodesRate);
     for (Entry<Node, Double> nodeDoubleEntry : nodesRate.entrySet()) {
       nodesRateLimiter.get(nodeDoubleEntry.getKey()).setRate(nodeDoubleEntry.getValue());
@@ -115,7 +114,7 @@ public class LogDispatcher {
         logBlockingQueue =
             new ArrayBlockingQueue<>(
                 ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
-        nodesLogQueuesList.add(new Pair<>(node, logBlockingQueue));
+        nodesLogQueuesMap.put(node, logBlockingQueue);
         FlowMonitorManager.INSTANCE.register(node);
         nodesRateLimiter.put(node, RateLimiter.create(Double.MAX_VALUE));
         nodesRate.put(node, baseRate * i);
@@ -125,14 +124,14 @@ public class LogDispatcher {
     updateRateLimiter();
 
     for (i = 0; i < bindingThreadNum; i++) {
-      for (Pair<Node, BlockingQueue<SendLogRequest>> pair : nodesLogQueuesList) {
+      for (Entry<Node, BlockingQueue<SendLogRequest>> pair : nodesLogQueuesMap.entrySet()) {
         executorServices
             .computeIfAbsent(
-                pair.left,
+                pair.getKey(),
                 n ->
                     IoTDBThreadPoolFactory.newCachedThreadPool(
-                        "LogDispatcher-" + member.getName() + "-" + pair.left.nodeIdentifier))
-            .submit(newDispatcherThread(pair.left, pair.right));
+                        "LogDispatcher-" + member.getName() + "-" + pair.getKey().nodeIdentifier))
+            .submit(newDispatcherThread(pair.getKey(), pair.getValue()));
       }
     }
   }
@@ -191,30 +190,30 @@ public class LogDispatcher {
       verifiers = member.getTrustValueHolder().chooseVerifiers();
     }
 
-    for (Pair<Node, BlockingQueue<SendLogRequest>> entry : nodesLogQueuesList) {
-      if (nodesEnabled != null && !this.nodesEnabled.getOrDefault(entry.left, false)) {
+    for (Entry<Node, BlockingQueue<SendLogRequest>> entry : nodesLogQueuesMap.entrySet()) {
+      if (nodesEnabled != null && !this.nodesEnabled.getOrDefault(entry.getKey(), false)) {
         continue;
       }
 
-      if (clusterConfig.isUseVGRaft() && ClusterUtils.isNodeIn(entry.left, verifiers)) {
-        request = transformRequest(entry.left, request);
+      if (clusterConfig.isUseVGRaft() && ClusterUtils.isNodeIn(entry.getKey(), verifiers)) {
+        request = transformRequest(entry.getKey(), request);
         request.setVerifier(true);
       }
 
-      BlockingQueue<SendLogRequest> nodeLogQueue = entry.right;
+      BlockingQueue<SendLogRequest> nodeLogQueue = entry.getValue();
       try {
         boolean addSucceeded = addToQueue(nodeLogQueue, request);
 
         if (!addSucceeded) {
           logger.debug(
               "Log queue[{}] of {} is full, ignore the request to this node",
-              entry.left,
+              entry.getKey(),
               member.getName());
         }
       } catch (IllegalStateException e) {
         logger.debug(
             "Log queue[{}] of {} is full, ignore the request to this node",
-            entry.left,
+            entry.getKey(),
             member.getName());
       }
     }
@@ -486,8 +485,7 @@ public class LogDispatcher {
           Statistic.RAFT_CONCURRENT_SENDER.add(concurrentSender);
           result = client.appendEntry(logRequest.appendEntryRequest, logRequest.isVerifier);
           FlowMonitorManager.INSTANCE.report(
-              receiver,
-              logRequest.appendEntryRequest.entry.remaining());
+              receiver, logRequest.appendEntryRequest.entry.remaining());
           nodesRateLimiter.get(receiver).acquire(logRequest.appendEntryRequest.entry.remaining());
 
           Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENT.calOperationCostTimeFromStart(
@@ -529,8 +527,7 @@ public class LogDispatcher {
         try {
           client.appendEntry(logRequest.appendEntryRequest, logRequest.isVerifier, handler);
           FlowMonitorManager.INSTANCE.report(
-              receiver,
-              logRequest.appendEntryRequest.entry.remaining());
+              receiver, logRequest.appendEntryRequest.entry.remaining());
           nodesRateLimiter.get(receiver).acquire(logRequest.appendEntryRequest.entry.remaining());
         } catch (TException e) {
           handler.onError(e);
@@ -593,4 +590,12 @@ public class LogDispatcher {
       }
     }
   }
+
+  public Map<Node, Double> getNodesRate() {
+    return nodesRate;
+  }
+
+  public Map<Node, BlockingQueue<SendLogRequest>> getNodesLogQueuesMap() {
+    return nodesLogQueuesMap;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 7480ebc295..b318b15163 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -1023,7 +1023,8 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
             prevLastLogIndex,
             logManager.getMaxHaveAppliedCommitIndex(),
             logRelay != null ? logRelay.first() : null,
-            votingLogList.report());
+            votingLogList.report(),
+            getLogDispatcher().getNodesRate().toString());
     if (character == NodeCharacter.LEADER && config.isUseIndirectBroadcasting()) {
       dataMemberReport.setDirectToIndirectFollowerMap(
           ((IndirectLogDispatcher) getLogDispatcher()).getDirectToIndirectFollowerMap());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index d613d082d4..6bd6f6a318 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.expr.craft.FragmentedLog;
 import org.apache.iotdb.cluster.expr.craft.FragmentedLogDispatcher;
+import org.apache.iotdb.cluster.expr.flowcontrol.FlowBalancer;
 import org.apache.iotdb.cluster.expr.flowcontrol.FlowMonitorManager;
 import org.apache.iotdb.cluster.expr.vgraft.KeyManager;
 import org.apache.iotdb.cluster.expr.vgraft.TrustValueHolder;
@@ -300,6 +301,7 @@ public abstract class RaftMember implements RaftMemberMBean {
 
   //  VG-Raft related
   private TrustValueHolder trustValueHolder = null;
+  private FlowBalancer flowBalancer;
 
   protected RaftMember(IStateMachine stateMachine) {
     this.stateMachine = stateMachine;
@@ -329,6 +331,8 @@ public abstract class RaftMember implements RaftMemberMBean {
     startBackGroundThreads();
     setSkipElection(false);
     FlowMonitorManager.INSTANCE.register(thisNode);
+    flowBalancer = new FlowBalancer(logDispatcher, this);
+    flowBalancer.start();
     logger.info("{} started", name);
   }
 
@@ -451,6 +455,8 @@ public abstract class RaftMember implements RaftMemberMBean {
     catchUpService = null;
     heartBeatService = null;
     appendLogThreadPool = null;
+
+    flowBalancer.stop();
     logger.info("Member {} stopped", name);
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
index bacd4cf0ec..308b904662 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
@@ -207,6 +207,7 @@ public class NodeReport {
     long headerLatency;
     private Map<Node, List<Node>> directToIndirectFollowerMap;
     private String votingListReport;
+    private String flowLimiterReport;
 
     public DataMemberReport(
         NodeCharacter character,
@@ -223,7 +224,8 @@ public class NodeReport {
         long prevLastLogIndex,
         long maxAppliedLogIndex,
         RelayEntry nextToRelay,
-        String votingListReport) {
+        String votingListReport,
+        String flowLimiterReport) {
       super(
           character,
           leader,
@@ -240,6 +242,7 @@ public class NodeReport {
       this.header = header;
       this.headerLatency = headerLatency;
       this.votingListReport = votingListReport;
+      this.flowLimiterReport = flowLimiterReport;
     }
 
     @Override
@@ -279,7 +282,9 @@ public class NodeReport {
               + ", logIncrement="
               + (lastLogIndex - prevLastLogIndex)
               + ", "
-              + votingListReport;
+              + votingListReport
+              + ", "
+              + flowLimiterReport;
       if (directToIndirectFollowerMap != null) {
         s = s + ", relayMap=" + directToIndirectFollowerMap;
       }