You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/02/13 01:21:17 UTC
[iotdb] branch expr_flow updated: update flow balancer
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr_flow
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr_flow by this push:
new 9b2a5058b6 update flow balancer
9b2a5058b6 is described below
commit 9b2a5058b630c6924124240cfc392cedcf2260cd
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon Feb 13 09:22:37 2023 +0800
update flow balancer
---
cluster/collect-log-dc.sh | 13 ++
.../apache/iotdb/cluster/config/ClusterConfig.java | 37 +++
.../iotdb/cluster/config/ClusterDescriptor.java | 22 ++
.../org/apache/iotdb/cluster/expr/ExprBench.java | 253 +++++++++++++--------
.../cluster/expr/flowcontrol/FlowBalancer.java | 10 +-
.../apache/iotdb/cluster/log/LogDispatcher.java | 16 +-
.../iotdb/cluster/server/member/RaftMember.java | 10 +-
.../apache/iotdb/cluster/server/monitor/Timer.java | 6 +-
8 files changed, 262 insertions(+), 105 deletions(-)
diff --git a/cluster/collect-log-dc.sh b/cluster/collect-log-dc.sh
new file mode 100644
index 0000000000..b4339e0773
--- /dev/null
+++ b/cluster/collect-log-dc.sh
@@ -0,0 +1,13 @@
+src_path=/home/jt/iotdb_expr_vg/logs/*
+
+ips=(dc16 dc17 dc18)
+#ips=(dc11 dc12 dc13 dc14 dc11 dc12)
+target_path=/d/CodeRepo/iotdb/cluster/target/logs
+
+mkdir $target_path
+rm $target_path/*
+for ip in ${ips[*]}
+ do
+ mkdir $target_path/$ip
+ scp -r jt@$ip:$src_path $target_path/$ip
+ done
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 8c897e5552..d26f0e4de3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -225,6 +225,10 @@ public class ClusterConfig {
private int flowMonitorMaxWindowSize = 1000;
private long flowMonitorWindowInterval = 1000;
+ private boolean useFollowerLoadBalance = false;
+ private int followerLoadBalanceWindowsToUse = 3;
+ private double followerLoadBalanceOverestimateFactor = 1.1;
+ private int logDispatcherBatchSize = 10;
/**
* create a clusterConfig class. The internalIP will be set according to the server's hostname. If
@@ -713,4 +717,37 @@ public class ClusterConfig {
public long getFlowMonitorWindowInterval() {
return flowMonitorWindowInterval;
}
+
+ public boolean isUseFollowerLoadBalance() {
+ return useFollowerLoadBalance;
+ }
+
+ public void setUseFollowerLoadBalance(boolean useFollowerLoadBalance) {
+ this.useFollowerLoadBalance = useFollowerLoadBalance;
+ }
+
+ public int getFollowerLoadBalanceWindowsToUse() {
+ return followerLoadBalanceWindowsToUse;
+ }
+
+ public void setFollowerLoadBalanceWindowsToUse(int followerLoadBalanceWindowsToUse) {
+ this.followerLoadBalanceWindowsToUse = followerLoadBalanceWindowsToUse;
+ }
+
+ public double getFollowerLoadBalanceOverestimateFactor() {
+ return followerLoadBalanceOverestimateFactor;
+ }
+
+ public void setFollowerLoadBalanceOverestimateFactor(
+ double followerLoadBalanceOverestimateFactor) {
+ this.followerLoadBalanceOverestimateFactor = followerLoadBalanceOverestimateFactor;
+ }
+
+ public int getLogDispatcherBatchSize() {
+ return logDispatcherBatchSize;
+ }
+
+ public void setLogDispatcherBatchSize(int logDispatcherBatchSize) {
+ this.logDispatcherBatchSize = logDispatcherBatchSize;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index 42a5b80233..6b1c757f97 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -397,6 +397,28 @@ public class ClusterDescriptor {
properties.getProperty(
"enable_instrumenting", String.valueOf(config.isEnableInstrumenting()))));
+ config.setUseFollowerLoadBalance(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "use_follower_load_balance", String.valueOf(config.isUseFollowerLoadBalance()))));
+
+ config.setFollowerLoadBalanceWindowsToUse(
+ Integer.parseInt(
+ properties.getProperty(
+ "follower_load_balance_windows_to_use",
+ String.valueOf(config.getFollowerLoadBalanceWindowsToUse()))));
+
+ config.setFollowerLoadBalanceOverestimateFactor(
+ Double.parseDouble(
+ properties.getProperty(
+ "follower_load_balance_overestimate_factor",
+ String.valueOf(config.getFollowerLoadBalanceOverestimateFactor()))));
+
+ config.setLogDispatcherBatchSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "log_dispatcher_batch_size", String.valueOf(config.getLogDispatcherBatchSize()))));
+
String consistencyLevel = properties.getProperty("consistency_level");
if (consistencyLevel != null) {
config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
index 8042a03493..b14ece54dd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
@@ -37,10 +37,14 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -52,11 +56,15 @@ public class ExprBench {
private AtomicLong requestCounter = new AtomicLong();
private AtomicLong latencySum = new AtomicLong();
+ private AtomicLong burstRequestCounter = new AtomicLong();
+ private AtomicLong burstLatencySum = new AtomicLong();
private long maxLatency = 0;
private int threadNum = 64;
private int workloadSize = 64 * 1024;
private int printInterval = 1000;
private ClientManager clientPool;
+ private long maxRunningSecond;
+ private long burstInterval;
private int maxRequestNum;
private ExecutorService pool = Executors.newCachedThreadPool();
private List<Node> nodeList = new ArrayList<>();
@@ -65,12 +73,16 @@ public class ExprBench {
private List<EndPoint> endPoints = new ArrayList<>();
private Map<EndPoint, RateLimiter> rateLimiterMap = new ConcurrentHashMap<>();
private Map<EndPoint, Statistic> latencyMap = new ConcurrentHashMap<>();
+ private long startTime;
+ private volatile boolean duringBurst = false;
+ private DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public ExprBench(Node target) {
clientPool = new ClientManager(false, Type.MetaGroupClient);
}
private static class EndPoint {
+
private Node node;
private int raftId;
@@ -86,6 +98,7 @@ public class ExprBench {
}
private static class Statistic {
+
private AtomicLong sum = new AtomicLong();
private AtomicLong cnt = new AtomicLong();
@@ -100,91 +113,142 @@ public class ExprBench {
}
}
+ private void benchmarkTask(int taskId) {
+ int endPointIdx = taskId % endPoints.size();
+ Client client = null;
+
+ ExecutNonQueryReq request = new ExecutNonQueryReq();
+ DummyPlan plan = new DummyPlan();
+ plan.setWorkload(new byte[workloadSize]);
+ plan.setNeedForward(true);
+
+ ByteBuffer byteBuffer = ByteBuffer.allocate(workloadSize + 4096);
+ Map<EndPoint, Node> endPointLeaderMap = new HashMap<>();
+
+ Node target = null;
+ long currRequsetNum = -1;
+ while (true) {
+ EndPoint endPoint = endPoints.get(endPointIdx);
+ RateLimiter rateLimiter = rateLimiterMap.get(endPoint);
+ if (rateLimiter != null) {
+ rateLimiter.acquire(1);
+ }
+
+ target = endPointLeaderMap.getOrDefault(endPoint, endPoint.node);
+ int raftId = endPoint.raftId;
+ plan.setGroupIdentifier(ClusterUtils.nodeToString(endPoint.node) + "#" + raftId);
+
+ try {
+ client = clientPool.borrowSyncClient(target, ClientCategory.META);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ byteBuffer.clear();
+ plan.serialize(byteBuffer);
+ byteBuffer.flip();
+ request.planBytes = byteBuffer;
+ request.setPlanBytesIsSet(true);
+
+ long reqLatency = System.nanoTime();
+ try {
+ TSStatus status = client.executeNonQueryPlan(request);
+ clientPool.returnSyncClient(client, target, ClientCategory.META);
+ if (status.isSetRedirectNode()) {
+ Node leader = new Node().setInternalIp(status.redirectNode.ip).setMetaPort(8880);
+ endPointLeaderMap.put(endPoint, leader);
+ logger.debug("Leader of {} is changed to {}", endPoint, leader);
+ }
+
+ currRequsetNum = requestCounter.incrementAndGet();
+ if (currRequsetNum > threadNum * 10L) {
+ reqLatency = System.nanoTime() - reqLatency;
+ maxLatency = Math.max(maxLatency, reqLatency);
+ latencySum.addAndGet(reqLatency);
+ latencyMap.get(endPoint).add(reqLatency);
+ if (duringBurst) {
+ burstRequestCounter.incrementAndGet();
+ burstLatencySum.addAndGet(reqLatency);
+ }
+ }
+ } catch (TException e) {
+ e.printStackTrace();
+ }
+
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ if (currRequsetNum % printInterval == 0) {
+ System.out.println(
+ String.format(
+ "%s %d %d %f(%f) %f %f",
+ dateFormat.format(new Date(System.currentTimeMillis())),
+ elapsedTime,
+ currRequsetNum,
+ (currRequsetNum + 0.0) / elapsedTime,
+ currRequsetNum * workloadSize / (1024.0 * 1024.0) / elapsedTime,
+ maxLatency / 1000.0,
+ (latencySum.get() + 0.0) / currRequsetNum));
+ System.out.println(latencyMap);
+ }
+
+ if (currRequsetNum >= maxRequestNum || elapsedTime / 1000 >= maxRunningSecond) {
+ break;
+ }
+ }
+ }
+
+ private void insertBurst() {
+ long burstStart = maxRunningSecond / 2 - burstInterval / 2;
+ long burstEnd = maxRunningSecond / 2 + burstInterval / 2;
+
+ long elapsedTime = (System.currentTimeMillis() - startTime) / 1000;
+ while (elapsedTime < burstStart) {
+ try {
+ Thread.sleep(1000);
+ elapsedTime = (System.currentTimeMillis() - startTime) / 1000;
+ } catch (InterruptedException e) {
+ logger.warn("Unexpected interruption");
+ }
+ }
+ duringBurst = true;
+ System.out.printf("Burst starts");
+ for (Entry<EndPoint, RateLimiter> endPointRateLimiterEntry : rateLimiterMap.entrySet()) {
+ RateLimiter rateLimiter = endPointRateLimiterEntry.getValue();
+ rateLimiter.setRate(rateLimiter.getRate() * 2);
+ }
+
+ while (elapsedTime < burstEnd) {
+ try {
+ Thread.sleep(1000);
+ elapsedTime = (System.currentTimeMillis() - startTime) / 1000;
+ } catch (InterruptedException e) {
+ logger.warn("Unexpected interruption");
+ }
+ }
+ duringBurst = false;
+ System.out.printf("Burst ends");
+ for (Entry<EndPoint, RateLimiter> endPointRateLimiterEntry : rateLimiterMap.entrySet()) {
+ RateLimiter rateLimiter = endPointRateLimiterEntry.getValue();
+ rateLimiter.setRate(rateLimiter.getRate() / 2);
+ }
+ }
+
public void benchmark() {
- long startTime = System.currentTimeMillis();
+ startTime = System.currentTimeMillis();
for (int i = 0; i < threadNum; i++) {
- int finalI = i;
- pool.submit(
- () -> {
- int endPointIdx = finalI % endPoints.size();
- Client client = null;
-
- ExecutNonQueryReq request = new ExecutNonQueryReq();
- DummyPlan plan = new DummyPlan();
- plan.setWorkload(new byte[workloadSize]);
- plan.setNeedForward(true);
-
- ByteBuffer byteBuffer = ByteBuffer.allocate(workloadSize + 4096);
- Map<EndPoint, Node> endPointLeaderMap = new HashMap<>();
-
- Node target = null;
- long currRequsetNum = -1;
- while (true) {
-
- EndPoint endPoint = endPoints.get(endPointIdx);
- RateLimiter rateLimiter = rateLimiterMap.get(endPoint);
- if (rateLimiter != null) {
- rateLimiter.acquire(1);
- }
-
- target = endPointLeaderMap.getOrDefault(endPoint, endPoint.node);
- int raftId = endPoint.raftId;
- plan.setGroupIdentifier(ClusterUtils.nodeToString(endPoint.node) + "#" + raftId);
-
- try {
- client = clientPool.borrowSyncClient(target, ClientCategory.META);
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- byteBuffer.clear();
- plan.serialize(byteBuffer);
- byteBuffer.flip();
- request.planBytes = byteBuffer;
- request.setPlanBytesIsSet(true);
-
- long reqLatency = System.nanoTime();
- try {
- TSStatus status = client.executeNonQueryPlan(request);
- clientPool.returnSyncClient(client, target, ClientCategory.META);
- if (status.isSetRedirectNode()) {
- Node leader = new Node().setInternalIp(status.redirectNode.ip).setMetaPort(8880);
- endPointLeaderMap.put(endPoint, leader);
- logger.debug("Leader of {} is changed to {}", endPoint, leader);
- }
-
- currRequsetNum = requestCounter.incrementAndGet();
- if (currRequsetNum > threadNum * 10) {
- reqLatency = System.nanoTime() - reqLatency;
- maxLatency = Math.max(maxLatency, reqLatency);
- latencySum.addAndGet(reqLatency);
- latencyMap.get(endPoint).add(reqLatency);
- }
- } catch (TException e) {
- e.printStackTrace();
- }
-
- if (currRequsetNum % printInterval == 0) {
- long elapsedTime = System.currentTimeMillis() - startTime;
- System.out.println(
- String.format(
- "%d %d %f(%f) %f %f",
- elapsedTime,
- currRequsetNum,
- (currRequsetNum + 0.0) / elapsedTime,
- currRequsetNum * workloadSize / (1024.0 * 1024.0) / elapsedTime,
- maxLatency / 1000.0,
- (latencySum.get() + 0.0) / currRequsetNum));
- System.out.println(latencyMap);
- }
-
- if (currRequsetNum >= maxRequestNum) {
- break;
- }
- }
- });
+ int taskId = i;
+ pool.submit(() -> benchmarkTask(taskId));
}
pool.shutdown();
+ if (burstInterval > 0) {
+ insertBurst();
+ }
+ while (!pool.isTerminated()) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ logger.warn("Unexpected interruption");
+ }
+ }
}
public void setMaxRequestNum(int maxRequestNum) {
@@ -195,11 +259,13 @@ public class ExprBench {
ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(50000);
Node target = new Node();
ExprBench bench = new ExprBench(target);
- bench.maxRequestNum = Integer.parseInt(args[0]);
- bench.threadNum = Integer.parseInt(args[1]);
- bench.workloadSize = Integer.parseInt(args[2]) * 1024;
- bench.printInterval = Integer.parseInt(args[3]);
- String[] nodesSplit = args[4].split(",");
+ bench.maxRunningSecond = Integer.parseInt(args[0]);
+ bench.burstInterval = Integer.parseInt(args[1]);
+ bench.maxRequestNum = Integer.parseInt(args[2]);
+ bench.threadNum = Integer.parseInt(args[3]);
+ bench.workloadSize = Integer.parseInt(args[4]) * 1024;
+ bench.printInterval = Integer.parseInt(args[5]);
+ String[] nodesSplit = args[6].split(",");
for (String s : nodesSplit) {
String[] nodeSplit = s.split(":");
Node node = new Node();
@@ -207,13 +273,13 @@ public class ExprBench {
node.setMetaPort(Integer.parseInt(nodeSplit[1]));
bench.nodeList.add(node);
}
- String[] raftFactorSplit = args[5].split(",");
+ String[] raftFactorSplit = args[7].split(",");
bench.raftFactors = new int[raftFactorSplit.length];
for (int i = 0; i < raftFactorSplit.length; i++) {
bench.raftFactors[i] = Integer.parseInt(raftFactorSplit[i]);
}
- if (args.length >= 7) {
- String[] ratesSplit = args[6].split(",");
+ if (args.length >= 9) {
+ String[] ratesSplit = args[8].split(",");
bench.rateLimits = new int[ratesSplit.length];
for (int i = 0; i < ratesSplit.length; i++) {
bench.rateLimits[i] = Integer.parseInt(ratesSplit[i]);
@@ -236,5 +302,14 @@ public class ExprBench {
bench.benchmark();
System.out.println(bench.latencyMap);
+ if (bench.burstInterval > 0) {
+ long burstRequest = bench.burstRequestCounter.get();
+ long burstLatencySum = bench.burstLatencySum.get();
+ double burstAvgLatency = burstLatencySum * 1.0 / burstRequest;
+ double burstThroughput = burstRequest * 1.0 / bench.burstInterval;
+ System.out.printf(
+ "Statistics during burst: num request %d, throughput %f, latency %f",
+ burstRequest, burstThroughput, burstAvgLatency);
+ }
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java
index 9cb507e2d7..3afaa2b535 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.cluster.expr.flowcontrol;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.log.LogDispatcher;
import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -42,7 +43,10 @@ public class FlowBalancer {
private static final Logger logger = LoggerFactory.getLogger(FlowBalancer.class);
private double maxFlow = 900_000_000;
private double minFlow = 10_000_000;
- private int windowsToUse = 3;
+ private int windowsToUse =
+ ClusterDescriptor.getInstance().getConfig().getFollowerLoadBalanceWindowsToUse();
+ private double overestimateFactor =
+ ClusterDescriptor.getInstance().getConfig().getFollowerLoadBalanceOverestimateFactor();
private int flowBalanceIntervalMS = 1000;
private FlowMonitorManager flowMonitorManager = FlowMonitorManager.INSTANCE;
private LogDispatcher logDispatcher;
@@ -81,7 +85,7 @@ public class FlowBalancer {
int followerNum = nodeNum - 1;
double thisNodeFlow = flowMonitorManager.averageFlow(member.getThisNode(), windowsToUse);
- double assumedFlow = thisNodeFlow * 1.1;
+ double assumedFlow = thisNodeFlow * overestimateFactor;
logger.info("Flow of this node: {}", thisNodeFlow);
Map<Node, BlockingQueue<SendLogRequest>> nodesLogQueuesMap =
logDispatcher.getNodesLogQueuesMap();
@@ -108,7 +112,7 @@ public class FlowBalancer {
int i = 0;
for (; i < quorumFollowerNum; i++) {
Node node = followers.get(i);
- nodesRate.put(node, flowToQuorum);
+ nodesRate.put(node, maxFlow);
remainingFlow -= flowToQuorum;
}
double flowToRemaining = remainingFlow / (followerNum - quorumFollowerNum);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 30864444a6..dd7dfca348 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -106,8 +106,6 @@ public class LogDispatcher {
}
void createQueueAndBindingThreads() {
- double baseRate = 300_000_000.0;
- int i = 1;
for (Node node : member.getAllNodes()) {
if (!ClusterUtils.isNodeEquals(node, member.getThisNode())) {
BlockingQueue<SendLogRequest> logBlockingQueue;
@@ -117,13 +115,11 @@ public class LogDispatcher {
nodesLogQueuesMap.put(node, logBlockingQueue);
FlowMonitorManager.INSTANCE.register(node);
nodesRateLimiter.put(node, RateLimiter.create(Double.MAX_VALUE));
- nodesRate.put(node, baseRate * i);
- i += 100;
}
}
updateRateLimiter();
- for (i = 0; i < bindingThreadNum; i++) {
+ for (int i = 0; i < bindingThreadNum; i++) {
for (Entry<Node, BlockingQueue<SendLogRequest>> pair : nodesLogQueuesMap.entrySet()) {
executorServices
.computeIfAbsent(
@@ -420,20 +416,24 @@ public class LogDispatcher {
currBatch.get(0).getVotingLog().getLog().getCurrLogIndex(),
currBatch.get(currBatch.size() - 1).getVotingLog().getLog().getCurrLogIndex());
while (logIndex < currBatch.size()) {
- long logSize = IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize();
+ long logSize = 0;
+ long logSizeLimit = IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize();
List<ByteBuffer> logList = new ArrayList<>();
int prevIndex = logIndex;
for (; logIndex < currBatch.size(); logIndex++) {
long curSize = currBatch.get(logIndex).getAppendEntryRequest().entry.array().length;
- if (logSize - curSize <= IoTDBConstant.LEFT_SIZE_IN_REQUEST) {
+ if (logSizeLimit - curSize - logSize <= IoTDBConstant.LEFT_SIZE_IN_REQUEST) {
break;
}
- logSize -= curSize;
+ logSize += curSize;
logList.add(currBatch.get(logIndex).getAppendEntryRequest().entry);
}
AppendEntriesRequest appendEntriesRequest = prepareRequest(logList, currBatch, prevIndex);
+ FlowMonitorManager.INSTANCE.report(receiver, logSize);
+ nodesRateLimiter.get(receiver).acquire((int) logSize);
+
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
} else {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 6bd6f6a318..6fa543fe8d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -331,8 +331,10 @@ public abstract class RaftMember implements RaftMemberMBean {
startBackGroundThreads();
setSkipElection(false);
FlowMonitorManager.INSTANCE.register(thisNode);
- flowBalancer = new FlowBalancer(logDispatcher, this);
- flowBalancer.start();
+ if (config.isUseFollowerLoadBalance()) {
+ flowBalancer = new FlowBalancer(logDispatcher, this);
+ flowBalancer.start();
+ }
logger.info("{} started", name);
}
@@ -456,7 +458,9 @@ public abstract class RaftMember implements RaftMemberMBean {
heartBeatService = null;
appendLogThreadPool = null;
- flowBalancer.stop();
+ if (flowBalancer != null) {
+ flowBalancer.stop();
+ }
logger.info("Member {} stopped", name);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index 2d2b860e77..f127a2885f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -498,8 +498,10 @@ public class Timer {
private static void printTo(Statistic currNode, StringBuilder out) {
if (currNode != Statistic.ROOT && currNode.valid) {
- indent(out, currNode.level);
- out.append(currNode).append("\n");
+ if (currNode.counter.get() != 0) {
+ indent(out, currNode.level);
+ out.append(currNode).append("\n");
+ }
}
for (Statistic child : currNode.children) {
printTo(child, out);