You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/02/09 02:39:56 UTC
[iotdb] 01/01: add flow monitor
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr_flow
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit da6ea12fea7f1126bf42157a35915c2af656a1c1
Author: Tian Jiang <jt...@163.com>
AuthorDate: Thu Feb 9 10:41:11 2023 +0800
add flow monitor
---
cluster/collect-dc.sh | 12 ++
.../org/apache/iotdb/cluster/ClusterIoTDB.java | 3 +
.../apache/iotdb/cluster/config/ClusterConfig.java | 11 ++
.../iotdb/cluster/config/ClusterConstant.java | 2 +-
.../cluster/expr/flowcontrol/FlowBalancer.java | 19 +--
.../cluster/expr/flowcontrol/FlowMonitor.java | 130 +++++++++++++++++++++
.../expr/flowcontrol/FlowMonitorManager.java | 75 ++++++++++++
.../java/org/apache/iotdb/cluster/log/Log.java | 4 +
.../apache/iotdb/cluster/log/LogDispatcher.java | 29 ++++-
.../apache/iotdb/cluster/log/VotingLogList.java | 8 +-
.../iotdb/cluster/log/logtypes/RequestLog.java | 5 +
.../handlers/caller/AppendNodeEntryHandler.java | 1 +
.../cluster/server/member/DataGroupMember.java | 3 +-
.../iotdb/cluster/server/member/RaftMember.java | 4 +
.../iotdb/cluster/server/monitor/NodeReport.java | 9 +-
.../apache/iotdb/cluster/server/monitor/Timer.java | 23 +++-
.../common/request/IConsensusRequest.java | 4 +
.../apache/iotdb/db/qp/physical/sys/DummyPlan.java | 5 +
18 files changed, 319 insertions(+), 28 deletions(-)
diff --git a/cluster/collect-dc.sh b/cluster/collect-dc.sh
new file mode 100644
index 0000000000..58aa0b67dc
--- /dev/null
+++ b/cluster/collect-dc.sh
@@ -0,0 +1,12 @@
+src_path=/home/jt/iotdb_expr_vg/data/system/*.flow
+
+ips=(dc16)
+#ips=(dc11 dc12 dc13 dc14 dc11 dc12)
+target_path=/d/CodeRepo/iotdb/cluster/target/flow
+
+rm $target_path/*
+for ip in ${ips[*]}
+ do
+ mkdir $target_path
+ scp -r jt@$ip:$src_path $target_path
+ done
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index dfa2427c7e..d4bbccbef7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.coordinator.Coordinator;
import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.expr.flowcontrol.FlowMonitorManager;
import org.apache.iotdb.cluster.expr.vgraft.KeyManager;
import org.apache.iotdb.cluster.impl.PlanBasedStateMachine;
import org.apache.iotdb.cluster.metadata.CSchemaProcessor;
@@ -313,6 +314,8 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
logger.info("Index diff: {}", Statistic.RAFT_RECEIVER_INDEX_DIFF);
logger.info("Follower time: {}", Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL);
logger.info("Window length: {}", Statistic.RAFT_WINDOW_LENGTH);
+
+ FlowMonitorManager.INSTANCE.close();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 3232ecd5b4..8c897e5552 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -223,6 +223,9 @@ public class ClusterConfig {
private boolean enableInstrumenting = true;
+ private int flowMonitorMaxWindowSize = 1000;
+ private long flowMonitorWindowInterval = 1000;
+
/**
* create a clusterConfig class. The internalIP will be set according to the server's hostname. If
* there is something error for getting the ip of the hostname, then set the internalIp as
@@ -702,4 +705,12 @@ public class ClusterConfig {
public void setEnableInstrumenting(boolean enableInstrumenting) {
this.enableInstrumenting = enableInstrumenting;
}
+
+ public int getFlowMonitorMaxWindowSize() {
+ return flowMonitorMaxWindowSize;
+ }
+
+ public long getFlowMonitorWindowInterval() {
+ return flowMonitorWindowInterval;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
index 27684e4d3d..2cd81a1e74 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
@@ -53,7 +53,7 @@ public class ClusterConstant {
* every "REPORT_INTERVAL_SEC" seconds, a reporter thread will print the status of all raft
* members in this node.
*/
- public static final int REPORT_INTERVAL_SEC = 2;
+ public static final int REPORT_INTERVAL_SEC = 1;
/**
* during snapshot, hardlinks of data files are created to for downloading. hardlinks will be
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java
similarity index 56%
copy from consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java
index daa2a7d7d0..8159f807a4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java
@@ -17,22 +17,9 @@
* under the License.
*/
-package org.apache.iotdb.consensus.common.request;
-import java.nio.ByteBuffer;
+package org.apache.iotdb.cluster.expr.flowcontrol;
+
+public class FlowBalancer {
-public interface IConsensusRequest {
- /**
- * Serialize all the data to a ByteBuffer.
- *
- * <p>In a specific implementation, ByteBuf or PublicBAOS can be used to reduce the number of
- * memory copies.
- *
- * <p>To improve efficiency, a specific implementation could return a DirectByteBuffer to reduce
- * the memory copy required to send an RPC
- *
- * <p>Note: The implementation needs to ensure that the data in the returned Bytebuffer cannot be
- * changed or an error may occur
- */
- ByteBuffer serializeToByteBuffer();
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitor.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitor.java
new file mode 100644
index 0000000000..867eaddd07
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitor.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.expr.flowcontrol;
+
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayDeque;
+import java.util.Date;
+import java.util.Deque;
+
+public class FlowMonitor {
+
+ private static final Logger logger = LoggerFactory.getLogger(FlowMonitor.class);
+ private static final String FILE_SUFFIX = ".flow";
+ private Deque<Pair<Long, Long>> windows;
+ private long currWindowStart;
+ private long currWindowSum;
+ private long windowInterval;
+ private Node node;
+ private int maxWindowSize;
+ private BufferedWriter writer;
+ private DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+ public FlowMonitor(int maxWindowSize, long windowInterval, Node node)
+ throws IOException {
+ this.maxWindowSize = maxWindowSize;
+ this.windows = new ArrayDeque<>(maxWindowSize);
+ this.windowInterval = windowInterval;
+ this.node = node;
+ initSerializer();
+ }
+
+ private void initSerializer() throws IOException {
+ String path =
+ IoTDBDescriptor.getInstance().getConfig().getSystemDir()
+ + File.separator
+ + node.getInternalIp() + "-" + node.nodeIdentifier
+ + FILE_SUFFIX;
+ File file = new File(path);
+ file.delete();
+ writer = new BufferedWriter(new FileWriter(file));
+ writer.write("Time,FlowSum");
+ writer.newLine();
+ }
+
+ public void close() {
+ saveWindow();
+ while (windows.size() > 0) {
+ serializeWindow();
+ }
+ try {
+ writer.close();
+ logger.info("Flow monitor {} is closed", node);
+ } catch (IOException e) {
+ logger.warn("Cannot close serializer of {}", node, e);
+ }
+ }
+
+ private void resetWindow(long newWindowStart) {
+ currWindowStart = newWindowStart;
+ currWindowSum = 0;
+ }
+
+ private void serializeWindow() {
+ Pair<Long, Long> window = windows.removeFirst();
+ try {
+ String windowString =
+ String.format("%s,%d", dateFormat.format(new Date(window.left)), window.right);
+ logger.debug("New window {} serialized by {}", windowString, node);
+ writer.write(windowString);
+ writer.newLine();
+ } catch (IOException e) {
+ logger.warn("Cannot serialize window {} of {}", window, node, e);
+ }
+ }
+
+ private void checkSize() {
+ if (windows.size() == maxWindowSize) {
+ serializeWindow();
+ }
+ }
+
+ private void saveWindow() {
+ if (currWindowSum != 0) {
+ checkSize();
+ windows.add(new Pair<>(currWindowStart, currWindowSum));
+ logger.debug("New window {},{} generated by {}", currWindowStart, currWindowSum, node);
+ }
+ }
+
+ public synchronized void report(long val) {
+ long currTime = System.currentTimeMillis();
+ long targetWindowStart = currTime - currTime % windowInterval;
+ if (targetWindowStart != currWindowStart) {
+ // save the current window and start a new window
+ saveWindow();
+ resetWindow(targetWindowStart);
+ }
+ // update the current window
+ currWindowSum += val;
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitorManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitorManager.java
new file mode 100644
index 0000000000..0efbb1ffd5
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitorManager.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.expr.flowcontrol;
+
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class FlowMonitorManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(FlowMonitorManager.class);
+ public static final FlowMonitorManager INSTANCE = new FlowMonitorManager();
+
+ private Map<Node, FlowMonitor> monitorMap = new ConcurrentHashMap<>();
+ private ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
+
+ private FlowMonitorManager() {}
+
+ public void close() {
+ for (FlowMonitor flowMonitor : monitorMap.values()) {
+ flowMonitor.close();
+ }
+ monitorMap.clear();
+ }
+
+ public void register(Node node) {
+ logger.info("Registering flow monitor {}", node);
+ monitorMap.computeIfAbsent(
+ node,
+ n -> {
+ try {
+ return new FlowMonitor(
+ clusterConfig.getFlowMonitorMaxWindowSize(),
+ clusterConfig.getFlowMonitorWindowInterval(),
+ n);
+ } catch (IOException e) {
+ logger.warn("Cannot register flow monitor for {}", node, e);
+ return null;
+ }
+ });
+ }
+
+ public void report(Node node, long val) {
+ FlowMonitor flowMonitor = monitorMap.get(node);
+ if (flowMonitor != null) {
+ flowMonitor.report(val);
+ } else {
+ logger.warn("Flow monitor {} is not registered", node);
+ }
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index c2b8b70259..1aa28833f1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -170,4 +170,8 @@ public abstract class Log implements Comparable<Log> {
public void setSequenceStartTime(long sequenceStartTime) {
this.sequenceStartTime = sequenceStartTime;
}
+
+ public long estimateSize() {
+ return 0;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 89ce46e41f..72e22181f1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.log;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.expr.flowcontrol.FlowMonitorManager;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
@@ -42,6 +43,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.tsfile.utils.Pair;
+import com.google.common.util.concurrent.RateLimiter;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
@@ -80,6 +82,8 @@ public class LogDispatcher {
protected boolean useBatchInLogCatchUp = clusterConfig.isUseBatchInLogCatchUp();
protected List<Pair<Node, BlockingQueue<SendLogRequest>>> nodesLogQueuesList = new ArrayList<>();
protected Map<Node, Boolean> nodesEnabled;
+ protected Map<Node, RateLimiter> nodesRateLimiter = new HashMap<>();
+ protected Map<Node, Double> nodesRate = new HashMap<>();
protected Map<Node, ExecutorService> executorServices = new HashMap<>();
protected ExecutorService resultHandlerThread =
IoTDBThreadPoolFactory.newFixedThreadPool(2, "AppendResultHandler");
@@ -95,7 +99,16 @@ public class LogDispatcher {
createQueueAndBindingThreads();
}
+ protected void updateRateLimiter() {
+ logger.info("Node rates: {}", nodesRate);
+ for (Entry<Node, Double> nodeDoubleEntry : nodesRate.entrySet()) {
+ nodesRateLimiter.get(nodeDoubleEntry.getKey()).setRate(nodeDoubleEntry.getValue());
+ }
+ }
+
void createQueueAndBindingThreads() {
+ double baseRate = 300_000_000.0;
+ int i = 1;
for (Node node : member.getAllNodes()) {
if (!ClusterUtils.isNodeEquals(node, member.getThisNode())) {
BlockingQueue<SendLogRequest> logBlockingQueue;
@@ -103,10 +116,15 @@ public class LogDispatcher {
new ArrayBlockingQueue<>(
ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
nodesLogQueuesList.add(new Pair<>(node, logBlockingQueue));
+ FlowMonitorManager.INSTANCE.register(node);
+ nodesRateLimiter.put(node, RateLimiter.create(Double.MAX_VALUE));
+ nodesRate.put(node, baseRate * i);
+ i += 100;
}
}
+ updateRateLimiter();
- for (int i = 0; i < bindingThreadNum; i++) {
+ for (i = 0; i < bindingThreadNum; i++) {
for (Pair<Node, BlockingQueue<SendLogRequest>> pair : nodesLogQueuesList) {
executorServices
.computeIfAbsent(
@@ -467,6 +485,11 @@ public class LogDispatcher {
int concurrentSender = concurrentSenderNum.incrementAndGet();
Statistic.RAFT_CONCURRENT_SENDER.add(concurrentSender);
result = client.appendEntry(logRequest.appendEntryRequest, logRequest.isVerifier);
+ FlowMonitorManager.INSTANCE.report(
+ receiver,
+ logRequest.appendEntryRequest.entry.remaining());
+ nodesRateLimiter.get(receiver).acquire(logRequest.appendEntryRequest.entry.remaining());
+
Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENT.calOperationCostTimeFromStart(
logRequest.getVotingLog().getLog().getCreateTime());
concurrentSenderNum.decrementAndGet();
@@ -505,6 +528,10 @@ public class LogDispatcher {
if (client != null) {
try {
client.appendEntry(logRequest.appendEntryRequest, logRequest.isVerifier, handler);
+ FlowMonitorManager.INSTANCE.report(
+ receiver,
+ logRequest.appendEntryRequest.entry.remaining());
+ nodesRateLimiter.get(receiver).acquire(logRequest.appendEntryRequest.entry.remaining());
} catch (TException e) {
handler.onError(e);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
index 96bc3907e5..4d5bf6052c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
@@ -90,7 +90,7 @@ public class VotingLogList {
return false;
}
nodeIndices.sort(Entry.comparingByValue());
- Long value = nodeIndices.get(quorumSize - 1).getValue();
+ Long value = nodeIndices.get(nodeIndices.size() - quorumSize).getValue();
long oldValue = newCommitIndex.getAndUpdate(oldV -> Math.max(value, oldV));
return value > oldValue;
}
@@ -137,4 +137,10 @@ public class VotingLogList {
}
return num;
}
+
+ public String report() {
+ return String.format(
+ "Nodes accepted indices: %s, new commitIndex: %d",
+ stronglyAcceptedIndices, newCommitIndex.get());
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RequestLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RequestLog.java
index 8f66c52763..5fa3b1aa05 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RequestLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RequestLog.java
@@ -136,4 +136,9 @@ public class RequestLog extends Log {
public int hashCode() {
return Objects.hash(super.hashCode(), request);
}
+
+ @Override
+ public long estimateSize() {
+ return request.estimateSize();
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index 3dc18e32a6..33f4063492 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -71,6 +71,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
@Override
public void onComplete(AppendEntryResult response) {
+
if (Timer.ENABLE_INSTRUMENTING) {
Statistic.RAFT_SENDER_SEND_LOG_ASYNC.calOperationCostTimeFromStart(sendStart);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 3795718199..7480ebc295 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -1022,7 +1022,8 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
lastHeartbeatReceivedTime,
prevLastLogIndex,
logManager.getMaxHaveAppliedCommitIndex(),
- logRelay != null ? logRelay.first() : null);
+ logRelay != null ? logRelay.first() : null,
+ votingLogList.report());
if (character == NodeCharacter.LEADER && config.isUseIndirectBroadcasting()) {
dataMemberReport.setDirectToIndirectFollowerMap(
((IndirectLogDispatcher) getLogDispatcher()).getDirectToIndirectFollowerMap());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 4851dc3b35..d613d082d4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.cluster.exception.LogExecutionException;
import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.expr.craft.FragmentedLog;
import org.apache.iotdb.cluster.expr.craft.FragmentedLogDispatcher;
+import org.apache.iotdb.cluster.expr.flowcontrol.FlowMonitorManager;
import org.apache.iotdb.cluster.expr.vgraft.KeyManager;
import org.apache.iotdb.cluster.expr.vgraft.TrustValueHolder;
import org.apache.iotdb.cluster.log.CommitLogCallback;
@@ -327,6 +328,7 @@ public abstract class RaftMember implements RaftMemberMBean {
startBackGroundThreads();
setSkipElection(false);
+ FlowMonitorManager.INSTANCE.register(thisNode);
logger.info("{} started", name);
}
@@ -1248,6 +1250,8 @@ public abstract class RaftMember implements RaftMemberMBean {
// assign term and index to the new log and append it
SendLogRequest sendLogRequest = logSequencer.sequence(log);
+ FlowMonitorManager.INSTANCE.report(thisNode, log.estimateSize());
+
if (sendLogRequest == null) {
return StatusUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
index 2734b19b7b..bacd4cf0ec 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
@@ -206,6 +206,7 @@ public class NodeReport {
RaftNode header;
long headerLatency;
private Map<Node, List<Node>> directToIndirectFollowerMap;
+ private String votingListReport;
public DataMemberReport(
NodeCharacter character,
@@ -221,7 +222,8 @@ public class NodeReport {
long lastHeartbeatReceivedTime,
long prevLastLogIndex,
long maxAppliedLogIndex,
- RelayEntry nextToRelay) {
+ RelayEntry nextToRelay,
+ String votingListReport) {
super(
character,
leader,
@@ -237,6 +239,7 @@ public class NodeReport {
nextToRelay);
this.header = header;
this.headerLatency = headerLatency;
+ this.votingListReport = votingListReport;
}
@Override
@@ -274,7 +277,9 @@ public class NodeReport {
+ (System.currentTimeMillis() - lastHeartbeatReceivedTime)
+ "ms ago"
+ ", logIncrement="
- + (lastLogIndex - prevLastLogIndex);
+ + (lastLogIndex - prevLastLogIndex)
+ + ", "
+ + votingListReport;
if (directToIndirectFollowerMap != null) {
s = s + ", relayMap=" + directToIndirectFollowerMap;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index 448524441c..2d2b860e77 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.cluster.server.monitor;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.cluster.utils.WindowStatistic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -375,8 +374,10 @@ public class Timer {
String blockName;
AtomicLong sum = new AtomicLong(0);
AtomicLong counter = new AtomicLong(0);
- private WindowStatistic latestWindow = new WindowStatistic();
+ AtomicLong intervalSum = new AtomicLong(0);
+ AtomicLong intervalCounter = new AtomicLong(0);
long max;
+ long intervalMax;
double scale;
boolean valid;
int level;
@@ -401,8 +402,10 @@ public class Timer {
if (ENABLE_INSTRUMENTING) {
sum.addAndGet(val);
counter.incrementAndGet();
+ intervalSum.addAndGet(val);
+ intervalCounter.incrementAndGet();
max = Math.max(max, val);
- latestWindow.add(val);
+ intervalMax = Math.max(intervalMax, val);
}
}
@@ -432,7 +435,9 @@ public class Timer {
sum.set(0);
counter.set(0);
max = 0;
- latestWindow.reset();
+ intervalCounter.set(0);
+ intervalSum.set(0);
+ intervalMax = 0;
}
/** WARN: no current safety guarantee. */
@@ -446,10 +451,16 @@ public class Timer {
public String toString() {
double s = sum.get() / scale;
long cnt = counter.get();
+ double intervalS = intervalSum.get() / scale;
+ long intervalCnt = intervalCounter.get();
double avg = s / cnt;
+ double intervalAvg = intervalS / intervalCnt;
+ intervalSum.set(0);
+ intervalCounter.set(0);
+ intervalMax = 0;
return String.format(
- "%s - %s: %.2f, %d, %.2f, %d, %.2f",
- className, blockName, s, cnt, avg, max, latestWindow.getAvg());
+ "%s - %s: %.4f(%.4f), %d(%d), %.4f(%.4f), %d(%d)",
+ className, blockName, s, intervalS, cnt, intervalCnt, avg, intervalAvg, max, intervalMax);
}
public long getCnt() {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
index daa2a7d7d0..f2729471d5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
@@ -35,4 +35,8 @@ public interface IConsensusRequest {
* changed or an error may occur
*/
ByteBuffer serializeToByteBuffer();
+
+ default long estimateSize() {
+ return 0;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DummyPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DummyPlan.java
index 8a3ec92681..473bf9584e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DummyPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DummyPlan.java
@@ -117,4 +117,9 @@ public class DummyPlan extends PhysicalPlan {
public String toString() {
return "ExprPlan";
}
+
+ @Override
+ public long estimateSize() {
+ return workload.length;
+ }
}