You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/02/10 01:26:41 UTC
[iotdb] branch expr_flow updated: add flow balancer
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr_flow
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr_flow by this push:
new f82c3fd620 add flow balancer
f82c3fd620 is described below
commit f82c3fd6209f85df520763a2fd041f4ff75a8868
Author: Tian Jiang <jt...@163.com>
AuthorDate: Fri Feb 10 09:27:59 2023 +0800
add flow balancer
---
.../expr/craft/FragmentedLogDispatcher.java | 8 +-
.../cluster/expr/flowcontrol/FlowBalancer.java | 108 ++++++++++++++++++++-
.../cluster/expr/flowcontrol/FlowMonitor.java | 27 +++++-
.../expr/flowcontrol/FlowMonitorManager.java | 6 +-
.../iotdb/cluster/log/IndirectLogDispatcher.java | 12 +--
.../apache/iotdb/cluster/log/LogDispatcher.java | 43 ++++----
.../cluster/server/member/DataGroupMember.java | 3 +-
.../iotdb/cluster/server/member/RaftMember.java | 6 ++
.../iotdb/cluster/server/monitor/NodeReport.java | 9 +-
9 files changed, 183 insertions(+), 39 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/craft/FragmentedLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/craft/FragmentedLogDispatcher.java
index a2075e45f7..31802333af 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/craft/FragmentedLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/craft/FragmentedLogDispatcher.java
@@ -25,11 +25,11 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
public class FragmentedLogDispatcher extends LogDispatcher {
@@ -47,8 +47,8 @@ public class FragmentedLogDispatcher extends LogDispatcher {
request.getVotingLog().getLog().setEnqueueTime(System.nanoTime());
int i = 0;
- for (Pair<Node, BlockingQueue<SendLogRequest>> entry : nodesLogQueuesList) {
- BlockingQueue<SendLogRequest> nodeLogQueue = entry.right;
+ for (Entry<Node, BlockingQueue<SendLogRequest>> entry : nodesLogQueuesMap.entrySet()) {
+ BlockingQueue<SendLogRequest> nodeLogQueue = entry.getValue();
SendLogRequest fragmentedRequest = new SendLogRequest(request);
fragmentedRequest.setVotingLog(new VotingLog(request.getVotingLog()));
fragmentedRequest
@@ -60,7 +60,7 @@ public class FragmentedLogDispatcher extends LogDispatcher {
if (!addSucceeded) {
logger.debug(
"Log queue[{}] of {} is full, ignore the request to this node",
- entry.left,
+ entry.getKey(),
member.getName());
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java
index 8159f807a4..9cb507e2d7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java
@@ -17,9 +17,115 @@
* under the License.
*/
-
package org.apache.iotdb.cluster.expr.flowcontrol;
+import org.apache.iotdb.cluster.log.LogDispatcher;
+import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Periodically re-distributes the leader's outbound log-dispatching rate among its followers.
+ *
+ * <p>Every {@code flowBalanceIntervalMS} milliseconds, if the owning member is the leader, the
+ * balancer reads this node's recent average flow from {@link FlowMonitorManager}. When sending
+ * that flow (plus a 10% margin) to every follower would exceed {@code maxFlow}, the followers
+ * with the shortest dispatch queues are favoured and the others share what is left ("burst"
+ * mode); otherwise every follower's limit is lifted to {@code maxFlow}. The rates are written
+ * into the dispatcher's rate map and applied through {@link LogDispatcher#updateRateLimiter()}.
+ */
public class FlowBalancer {
+  private static final Logger logger = LoggerFactory.getLogger(FlowBalancer.class);
+  // upper bound of the total flow handed out to followers, in the unit of
+  // FlowMonitorManager#averageFlow (reported amount per second)
+  private double maxFlow = 900_000_000;
+  // lower bound of the flow granted to a single non-quorum follower during a burst
+  private double minFlow = 10_000_000;
+  // number of monitor windows (including the current one) averaged to estimate the flow
+  private int windowsToUse = 3;
+  private int flowBalanceIntervalMS = 1000;
+  private FlowMonitorManager flowMonitorManager = FlowMonitorManager.INSTANCE;
+  private final LogDispatcher logDispatcher;
+  private final RaftMember member;
+
+  private ScheduledExecutorService scheduledExecutorService;
+
+  /**
+   * @param logDispatcher the dispatcher whose per-node rates are adjusted
+   * @param member the raft member owning the dispatcher; consulted for leadership and membership
+   */
+  public FlowBalancer(LogDispatcher logDispatcher, RaftMember member) {
+    this.logDispatcher = logDispatcher;
+    this.member = member;
+  }
+
+  /** Starts the periodic rebalancing task. Calling it while already started is a no-op. */
+  public synchronized void start() {
+    if (scheduledExecutorService != null) {
+      // already running; creating a second executor would leak the first one
+      return;
+    }
+    scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+    ScheduledExecutorUtil.safelyScheduleAtFixedRate(
+        scheduledExecutorService,
+        this::rebalance,
+        flowBalanceIntervalMS,
+        flowBalanceIntervalMS,
+        TimeUnit.MILLISECONDS);
+  }
+
+  /** Stops the periodic rebalancing task. Safe to call when {@link #start()} was never called. */
+  public synchronized void stop() {
+    if (scheduledExecutorService != null) {
+      scheduledExecutorService.shutdownNow();
+      scheduledExecutorService = null;
+    }
+  }
+
+  /** A follower together with the length of its dispatch queue at the moment it was sampled. */
+  private static final class FollowerLoad {
+    private final Node node;
+    private final int queueLength;
+
+    private FollowerLoad(Node node, int queueLength) {
+      this.node = node;
+      this.queueLength = queueLength;
+    }
+  }
+
+  /** Recomputes the rate of every follower this leader dispatches to and applies the result. */
+  private void rebalance() {
+    if (!member.isLeader()) {
+      return;
+    }
+
+    // take ONE snapshot of the membership so that all counts below agree with each other
+    List<Node> allNodes = new ArrayList<>(member.getAllNodes());
+    int nodeNum = allNodes.size();
+    Node thisNode = member.getThisNode();
+
+    double thisNodeFlow = flowMonitorManager.averageFlow(thisNode, windowsToUse);
+    double assumedFlow = thisNodeFlow * 1.1;
+    logger.info("Flow of this node: {}", thisNodeFlow);
+    Map<Node, BlockingQueue<SendLogRequest>> nodesLogQueuesMap =
+        logDispatcher.getNodesLogQueuesMap();
+    Map<Node, Double> nodesRate = logDispatcher.getNodesRate();
+
+    // Sample each queue length exactly once. Sorting on the live queue size is unsafe because
+    // dispatcher threads change it concurrently, which may make the comparator inconsistent.
+    List<FollowerLoad> loads = new ArrayList<>(nodeNum);
+    for (Node node : allNodes) {
+      if (node.equals(thisNode)) {
+        continue;
+      }
+      BlockingQueue<SendLogRequest> queue = nodesLogQueuesMap.get(node);
+      // Only nodes the dispatcher already manages are re-rated: a node without a queue or
+      // without an existing rate entry would make updateRateLimiter() hit a missing limiter.
+      if (queue == null || !nodesRate.containsKey(node)) {
+        continue;
+      }
+      loads.add(new FollowerLoad(node, queue.size()));
+    }
+
+    // sort followers according to their (sampled) queue length, shortest first
+    loads.sort(Comparator.comparingInt(load -> load.queueLength));
+    List<Node> followers = new ArrayList<>(loads.size());
+    for (FollowerLoad load : loads) {
+      followers.add(load.node);
+    }
+
+    if (assumedFlow * followers.size() > maxFlow) {
+      enterBurst(nodesRate, nodeNum, assumedFlow, followers);
+    } else {
+      exitBurst(nodesRate, followers);
+    }
+    logDispatcher.updateRateLimiter();
+  }
+
+  /**
+   * Distributes {@code maxFlow} unevenly: the {@code nodeNum / 2} followers with the shortest
+   * queues each get up to {@code assumedFlow}, the others evenly share the rest but never less
+   * than {@code minFlow}.
+   *
+   * @param nodesRate the dispatcher's rate map to be updated
+   * @param nodeNum size of the raft group, used to derive the number of quorum followers
+   * @param assumedFlow the flow each follower is expected to need
+   * @param followers followers sorted by ascending queue length
+   */
+  private void enterBurst(
+      Map<Node, Double> nodesRate, int nodeNum, double assumedFlow, List<Node> followers) {
+    int followerNum = followers.size();
+    // never index past the followers that are actually available
+    int quorumFollowerNum = Math.min(nodeNum / 2, followerNum);
+    double remainingFlow = maxFlow;
+    int i = 0;
+    if (quorumFollowerNum > 0) {
+      double quorumMaxFlow = maxFlow / quorumFollowerNum;
+      // distribute flow to quorum followers with the shortest queues
+      double flowToQuorum = Math.min(assumedFlow, quorumMaxFlow);
+      for (; i < quorumFollowerNum; i++) {
+        nodesRate.put(followers.get(i), flowToQuorum);
+        remainingFlow -= flowToQuorum;
+      }
+    }
+
+    int remainingFollowerNum = followerNum - quorumFollowerNum;
+    if (remainingFollowerNum <= 0) {
+      // nobody left to share the remainder; also avoids dividing by zero
+      return;
+    }
+    double flowToRemaining = Math.max(remainingFlow / remainingFollowerNum, minFlow);
+    for (; i < followerNum; i++) {
+      nodesRate.put(followers.get(i), flowToRemaining);
+    }
+  }
+
+  /**
+   * Lifts the flow limits: every follower may use up to {@code maxFlow}.
+   *
+   * @param nodesRate the dispatcher's rate map to be updated
+   * @param followers the followers whose limits are lifted
+   */
+  private void exitBurst(Map<Node, Double> nodesRate, List<Node> followers) {
+    for (Node node : followers) {
+      nodesRate.put(node, maxFlow);
+    }
+  }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitor.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitor.java
index 867eaddd07..c3d9767166 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitor.java
@@ -34,13 +34,13 @@ import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayDeque;
import java.util.Date;
-import java.util.Deque;
+import java.util.Iterator;
public class FlowMonitor {
private static final Logger logger = LoggerFactory.getLogger(FlowMonitor.class);
private static final String FILE_SUFFIX = ".flow";
- private Deque<Pair<Long, Long>> windows;
+ private ArrayDeque<Pair<Long, Long>> windows;
private long currWindowStart;
private long currWindowSum;
private long windowInterval;
@@ -49,8 +49,7 @@ public class FlowMonitor {
private BufferedWriter writer;
private DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- public FlowMonitor(int maxWindowSize, long windowInterval, Node node)
- throws IOException {
+ public FlowMonitor(int maxWindowSize, long windowInterval, Node node) throws IOException {
this.maxWindowSize = maxWindowSize;
this.windows = new ArrayDeque<>(maxWindowSize);
this.windowInterval = windowInterval;
@@ -62,7 +61,9 @@ public class FlowMonitor {
String path =
IoTDBDescriptor.getInstance().getConfig().getSystemDir()
+ File.separator
- + node.getInternalIp() + "-" + node.nodeIdentifier
+ + node.getInternalIp()
+ + "-"
+ + node.nodeIdentifier
+ FILE_SUFFIX;
File file = new File(path);
file.delete();
@@ -127,4 +128,20 @@ public class FlowMonitor {
// update the current window
currWindowSum += val;
}
+
+  /**
+   * Computes the average flow over the current window and up to {@code windowsToUse - 1} of the
+   * most recently stored windows.
+   *
+   * <p>NOTE(review): this reads {@code windows} and {@code currWindowSum} without any visible
+   * synchronization; confirm that callers on other threads are coordinated with the method that
+   * updates them.
+   *
+   * @param windowsToUse how many windows to average, counting the current (unfinished) one
+   * @return the accumulated value per second, or 0 when no time has elapsed yet
+   */
+  public double averageFlow(int windowsToUse) {
+    long flowSum = currWindowSum;
+    // elapsed part of the current window, in milliseconds
+    long intervalSum = System.currentTimeMillis() - currWindowStart;
+    // walk the stored windows from the newest to the oldest
+    Iterator<Pair<Long, Long>> windowIterator = windows.descendingIterator();
+    for (int i = 1; i < windowsToUse && windowIterator.hasNext(); i++) {
+      Pair<Long, Long> window = windowIterator.next();
+      flowSum += window.right;
+      intervalSum += windowInterval;
+    }
+    if (intervalSum <= 0) {
+      // No measurable time has passed (or the clock moved backwards): dividing would yield
+      // NaN, Infinity or a negative rate, none of which is a usable flow.
+      return 0.0;
+    }
+    // convert from "per millisecond" to "per second"
+    return flowSum * 1.0 / intervalSum * 1000;
+  }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitorManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitorManager.java
index 0efbb1ffd5..7dad514c39 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitorManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitorManager.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.cluster.expr.flowcontrol;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
-
import org.apache.iotdb.cluster.rpc.thrift.Node;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,4 +72,8 @@ public class FlowMonitorManager {
logger.warn("Flow monitor {} is not registered", node);
}
}
+
+  /**
+   * Returns the average flow recorded for {@code node}.
+   *
+   * @param node the node whose monitor is queried
+   * @param windowsToUse how many monitor windows to average
+   * @return the average flow of the node, or 0 if no monitor is registered for it
+   */
+  public double averageFlow(Node node, int windowsToUse) {
+    FlowMonitor flowMonitor = monitorMap.get(node);
+    if (flowMonitor == null) {
+      // an unregistered node is reported, not dereferenced
+      logger.warn("Flow monitor {} is not registered", node);
+      return 0.0;
+    }
+    return flowMonitor.averageFlow(windowsToUse);
+  }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index c852575275..eb84287a55 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.cluster.utils.WeightedList;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +38,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -81,19 +81,19 @@ public class IndirectLogDispatcher extends LogDispatcher {
logBlockingQueue =
new ArrayBlockingQueue<>(
ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem(), true);
- nodesLogQueuesList.add(new Pair<>(node, logBlockingQueue));
+ nodesLogQueuesMap.put(node, logBlockingQueue);
}
}
for (int i = 0; i < bindingThreadNum; i++) {
- for (Pair<Node, BlockingQueue<SendLogRequest>> pair : nodesLogQueuesList) {
+ for (Entry<Node, BlockingQueue<SendLogRequest>> pair : nodesLogQueuesMap.entrySet()) {
executorServices
.computeIfAbsent(
- pair.left,
+ pair.getKey(),
n ->
IoTDBThreadPoolFactory.newCachedThreadPool(
- "LogDispatcher-" + member.getName() + "-" + pair.left.nodeIdentifier))
- .submit(newDispatcherThread(pair.left, pair.right));
+ "LogDispatcher-" + member.getName() + "-" + pair.getKey().nodeIdentifier))
+ .submit(newDispatcherThread(pair.getKey(), pair.getValue()));
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 72e22181f1..30864444a6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -41,7 +41,6 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.tsfile.utils.Pair;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.thrift.TException;
@@ -80,7 +79,7 @@ public class LogDispatcher {
protected RaftMember member;
private static final ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
protected boolean useBatchInLogCatchUp = clusterConfig.isUseBatchInLogCatchUp();
- protected List<Pair<Node, BlockingQueue<SendLogRequest>>> nodesLogQueuesList = new ArrayList<>();
+ protected Map<Node, BlockingQueue<SendLogRequest>> nodesLogQueuesMap = new HashMap<>();
protected Map<Node, Boolean> nodesEnabled;
protected Map<Node, RateLimiter> nodesRateLimiter = new HashMap<>();
protected Map<Node, Double> nodesRate = new HashMap<>();
@@ -99,7 +98,7 @@ public class LogDispatcher {
createQueueAndBindingThreads();
}
- protected void updateRateLimiter() {
+ public void updateRateLimiter() {
logger.info("Node rates: {}", nodesRate);
for (Entry<Node, Double> nodeDoubleEntry : nodesRate.entrySet()) {
nodesRateLimiter.get(nodeDoubleEntry.getKey()).setRate(nodeDoubleEntry.getValue());
@@ -115,7 +114,7 @@ public class LogDispatcher {
logBlockingQueue =
new ArrayBlockingQueue<>(
ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
- nodesLogQueuesList.add(new Pair<>(node, logBlockingQueue));
+ nodesLogQueuesMap.put(node, logBlockingQueue);
FlowMonitorManager.INSTANCE.register(node);
nodesRateLimiter.put(node, RateLimiter.create(Double.MAX_VALUE));
nodesRate.put(node, baseRate * i);
@@ -125,14 +124,14 @@ public class LogDispatcher {
updateRateLimiter();
for (i = 0; i < bindingThreadNum; i++) {
- for (Pair<Node, BlockingQueue<SendLogRequest>> pair : nodesLogQueuesList) {
+ for (Entry<Node, BlockingQueue<SendLogRequest>> pair : nodesLogQueuesMap.entrySet()) {
executorServices
.computeIfAbsent(
- pair.left,
+ pair.getKey(),
n ->
IoTDBThreadPoolFactory.newCachedThreadPool(
- "LogDispatcher-" + member.getName() + "-" + pair.left.nodeIdentifier))
- .submit(newDispatcherThread(pair.left, pair.right));
+ "LogDispatcher-" + member.getName() + "-" + pair.getKey().nodeIdentifier))
+ .submit(newDispatcherThread(pair.getKey(), pair.getValue()));
}
}
}
@@ -191,30 +190,30 @@ public class LogDispatcher {
verifiers = member.getTrustValueHolder().chooseVerifiers();
}
- for (Pair<Node, BlockingQueue<SendLogRequest>> entry : nodesLogQueuesList) {
- if (nodesEnabled != null && !this.nodesEnabled.getOrDefault(entry.left, false)) {
+ for (Entry<Node, BlockingQueue<SendLogRequest>> entry : nodesLogQueuesMap.entrySet()) {
+ if (nodesEnabled != null && !this.nodesEnabled.getOrDefault(entry.getKey(), false)) {
continue;
}
- if (clusterConfig.isUseVGRaft() && ClusterUtils.isNodeIn(entry.left, verifiers)) {
- request = transformRequest(entry.left, request);
+ if (clusterConfig.isUseVGRaft() && ClusterUtils.isNodeIn(entry.getKey(), verifiers)) {
+ request = transformRequest(entry.getKey(), request);
request.setVerifier(true);
}
- BlockingQueue<SendLogRequest> nodeLogQueue = entry.right;
+ BlockingQueue<SendLogRequest> nodeLogQueue = entry.getValue();
try {
boolean addSucceeded = addToQueue(nodeLogQueue, request);
if (!addSucceeded) {
logger.debug(
"Log queue[{}] of {} is full, ignore the request to this node",
- entry.left,
+ entry.getKey(),
member.getName());
}
} catch (IllegalStateException e) {
logger.debug(
"Log queue[{}] of {} is full, ignore the request to this node",
- entry.left,
+ entry.getKey(),
member.getName());
}
}
@@ -486,8 +485,7 @@ public class LogDispatcher {
Statistic.RAFT_CONCURRENT_SENDER.add(concurrentSender);
result = client.appendEntry(logRequest.appendEntryRequest, logRequest.isVerifier);
FlowMonitorManager.INSTANCE.report(
- receiver,
- logRequest.appendEntryRequest.entry.remaining());
+ receiver, logRequest.appendEntryRequest.entry.remaining());
nodesRateLimiter.get(receiver).acquire(logRequest.appendEntryRequest.entry.remaining());
Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENT.calOperationCostTimeFromStart(
@@ -529,8 +527,7 @@ public class LogDispatcher {
try {
client.appendEntry(logRequest.appendEntryRequest, logRequest.isVerifier, handler);
FlowMonitorManager.INSTANCE.report(
- receiver,
- logRequest.appendEntryRequest.entry.remaining());
+ receiver, logRequest.appendEntryRequest.entry.remaining());
nodesRateLimiter.get(receiver).acquire(logRequest.appendEntryRequest.entry.remaining());
} catch (TException e) {
handler.onError(e);
@@ -593,4 +590,12 @@ public class LogDispatcher {
}
}
}
+
+  /**
+   * Exposes the per-node rate table of this dispatcher.
+   *
+   * @return the live (not copied) map from node to its configured rate; changes made to it take
+   *     effect on the rate limiters the next time {@link #updateRateLimiter()} is called
+   */
+  public Map<Node, Double> getNodesRate() {
+    return this.nodesRate;
+  }
+
+  /**
+   * Exposes the dispatch queues of this dispatcher.
+   *
+   * @return the live (not copied) map from node to the queue holding requests waiting to be sent
+   *     to that node
+   */
+  public Map<Node, BlockingQueue<SendLogRequest>> getNodesLogQueuesMap() {
+    return this.nodesLogQueuesMap;
+  }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 7480ebc295..b318b15163 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -1023,7 +1023,8 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
prevLastLogIndex,
logManager.getMaxHaveAppliedCommitIndex(),
logRelay != null ? logRelay.first() : null,
- votingLogList.report());
+ votingLogList.report(),
+ getLogDispatcher().getNodesRate().toString());
if (character == NodeCharacter.LEADER && config.isUseIndirectBroadcasting()) {
dataMemberReport.setDirectToIndirectFollowerMap(
((IndirectLogDispatcher) getLogDispatcher()).getDirectToIndirectFollowerMap());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index d613d082d4..6bd6f6a318 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.cluster.exception.LogExecutionException;
import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.expr.craft.FragmentedLog;
import org.apache.iotdb.cluster.expr.craft.FragmentedLogDispatcher;
+import org.apache.iotdb.cluster.expr.flowcontrol.FlowBalancer;
import org.apache.iotdb.cluster.expr.flowcontrol.FlowMonitorManager;
import org.apache.iotdb.cluster.expr.vgraft.KeyManager;
import org.apache.iotdb.cluster.expr.vgraft.TrustValueHolder;
@@ -300,6 +301,7 @@ public abstract class RaftMember implements RaftMemberMBean {
// VG-Raft related
private TrustValueHolder trustValueHolder = null;
+ private FlowBalancer flowBalancer;
protected RaftMember(IStateMachine stateMachine) {
this.stateMachine = stateMachine;
@@ -329,6 +331,8 @@ public abstract class RaftMember implements RaftMemberMBean {
startBackGroundThreads();
setSkipElection(false);
FlowMonitorManager.INSTANCE.register(thisNode);
+ flowBalancer = new FlowBalancer(logDispatcher, this);
+ flowBalancer.start();
logger.info("{} started", name);
}
@@ -451,6 +455,8 @@ public abstract class RaftMember implements RaftMemberMBean {
catchUpService = null;
heartBeatService = null;
appendLogThreadPool = null;
+
+ flowBalancer.stop();
logger.info("Member {} stopped", name);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
index bacd4cf0ec..308b904662 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
@@ -207,6 +207,7 @@ public class NodeReport {
long headerLatency;
private Map<Node, List<Node>> directToIndirectFollowerMap;
private String votingListReport;
+ private String flowLimiterReport;
public DataMemberReport(
NodeCharacter character,
@@ -223,7 +224,8 @@ public class NodeReport {
long prevLastLogIndex,
long maxAppliedLogIndex,
RelayEntry nextToRelay,
- String votingListReport) {
+ String votingListReport,
+ String flowLimiterReport) {
super(
character,
leader,
@@ -240,6 +242,7 @@ public class NodeReport {
this.header = header;
this.headerLatency = headerLatency;
this.votingListReport = votingListReport;
+ this.flowLimiterReport = flowLimiterReport;
}
@Override
@@ -279,7 +282,9 @@ public class NodeReport {
+ ", logIncrement="
+ (lastLogIndex - prevLastLogIndex)
+ ", "
- + votingListReport;
+ + votingListReport
+ + ", "
+ + flowLimiterReport;
if (directToIndirectFollowerMap != null) {
s = s + ", relayMap=" + directToIndirectFollowerMap;
}