You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/02/09 02:39:56 UTC

[iotdb] 01/01: add flow monitor

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_flow
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit da6ea12fea7f1126bf42157a35915c2af656a1c1
Author: Tian Jiang <jt...@163.com>
AuthorDate: Thu Feb 9 10:41:11 2023 +0800

    add flow monitor
---
 cluster/collect-dc.sh                              |  12 ++
 .../org/apache/iotdb/cluster/ClusterIoTDB.java     |   3 +
 .../apache/iotdb/cluster/config/ClusterConfig.java |  11 ++
 .../iotdb/cluster/config/ClusterConstant.java      |   2 +-
 .../cluster/expr/flowcontrol/FlowBalancer.java     |  19 +--
 .../cluster/expr/flowcontrol/FlowMonitor.java      | 130 +++++++++++++++++++++
 .../expr/flowcontrol/FlowMonitorManager.java       |  75 ++++++++++++
 .../java/org/apache/iotdb/cluster/log/Log.java     |   4 +
 .../apache/iotdb/cluster/log/LogDispatcher.java    |  29 ++++-
 .../apache/iotdb/cluster/log/VotingLogList.java    |   8 +-
 .../iotdb/cluster/log/logtypes/RequestLog.java     |   5 +
 .../handlers/caller/AppendNodeEntryHandler.java    |   1 +
 .../cluster/server/member/DataGroupMember.java     |   3 +-
 .../iotdb/cluster/server/member/RaftMember.java    |   4 +
 .../iotdb/cluster/server/monitor/NodeReport.java   |   9 +-
 .../apache/iotdb/cluster/server/monitor/Timer.java |  23 +++-
 .../common/request/IConsensusRequest.java          |   4 +
 .../apache/iotdb/db/qp/physical/sys/DummyPlan.java |   5 +
 18 files changed, 319 insertions(+), 28 deletions(-)

diff --git a/cluster/collect-dc.sh b/cluster/collect-dc.sh
new file mode 100644
index 0000000000..58aa0b67dc
--- /dev/null
+++ b/cluster/collect-dc.sh
@@ -0,0 +1,12 @@
+src_path=/home/jt/iotdb_expr_vg/data/system/*.flow
+
+ips=(dc16)
+#ips=(dc11 dc12 dc13 dc14 dc11 dc12)
+target_path=/d/CodeRepo/iotdb/cluster/target/flow
+
+rm $target_path/*
+for ip in ${ips[*]}
+  do
+    mkdir $target_path
+    scp -r jt@$ip:$src_path $target_path
+  done
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index dfa2427c7e..d4bbccbef7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.coordinator.Coordinator;
 import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
 import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.expr.flowcontrol.FlowMonitorManager;
 import org.apache.iotdb.cluster.expr.vgraft.KeyManager;
 import org.apache.iotdb.cluster.impl.PlanBasedStateMachine;
 import org.apache.iotdb.cluster.metadata.CSchemaProcessor;
@@ -313,6 +314,8 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
       logger.info("Index diff: {}", Statistic.RAFT_RECEIVER_INDEX_DIFF);
       logger.info("Follower time: {}", Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL);
       logger.info("Window length: {}", Statistic.RAFT_WINDOW_LENGTH);
+
+      FlowMonitorManager.INSTANCE.close();
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 3232ecd5b4..8c897e5552 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -223,6 +223,9 @@ public class ClusterConfig {
 
   private boolean enableInstrumenting = true;
 
+  private int flowMonitorMaxWindowSize = 1000;
+  private long flowMonitorWindowInterval = 1000;
+
   /**
    * create a clusterConfig class. The internalIP will be set according to the server's hostname. If
    * there is something error for getting the ip of the hostname, then set the internalIp as
@@ -702,4 +705,12 @@ public class ClusterConfig {
   public void setEnableInstrumenting(boolean enableInstrumenting) {
     this.enableInstrumenting = enableInstrumenting;
   }
+
+  public int getFlowMonitorMaxWindowSize() {
+    return flowMonitorMaxWindowSize;
+  }
+
+  public long getFlowMonitorWindowInterval() {
+    return flowMonitorWindowInterval;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
index 27684e4d3d..2cd81a1e74 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
@@ -53,7 +53,7 @@ public class ClusterConstant {
    * every "REPORT_INTERVAL_SEC" seconds, a reporter thread will print the status of all raft
    * members in this node.
    */
-  public static final int REPORT_INTERVAL_SEC = 2;
+  public static final int REPORT_INTERVAL_SEC = 1;
 
   /**
    * during snapshot, hardlinks of data files are created to for downloading. hardlinks will be
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java
similarity index 56%
copy from consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java
index daa2a7d7d0..8159f807a4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java
@@ -17,22 +17,9 @@
  * under the License.
  */
 
-package org.apache.iotdb.consensus.common.request;
 
-import java.nio.ByteBuffer;
+package org.apache.iotdb.cluster.expr.flowcontrol;
+
+public class FlowBalancer {
 
-public interface IConsensusRequest {
-  /**
-   * Serialize all the data to a ByteBuffer.
-   *
-   * <p>In a specific implementation, ByteBuf or PublicBAOS can be used to reduce the number of
-   * memory copies.
-   *
-   * <p>To improve efficiency, a specific implementation could return a DirectByteBuffer to reduce
-   * the memory copy required to send an RPC
-   *
-   * <p>Note: The implementation needs to ensure that the data in the returned Bytebuffer cannot be
-   * changed or an error may occur
-   */
-  ByteBuffer serializeToByteBuffer();
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitor.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitor.java
new file mode 100644
index 0000000000..867eaddd07
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitor.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.expr.flowcontrol;
+
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayDeque;
+import java.util.Date;
+import java.util.Deque;
+
+public class FlowMonitor {
+
+  private static final Logger logger = LoggerFactory.getLogger(FlowMonitor.class);
+  private static final String FILE_SUFFIX = ".flow";
+  private Deque<Pair<Long, Long>> windows;
+  private long currWindowStart;
+  private long currWindowSum;
+  private long windowInterval;
+  private Node node;
+  private int maxWindowSize;
+  private BufferedWriter writer;
+  private DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+  public FlowMonitor(int maxWindowSize, long windowInterval, Node node)
+      throws IOException {
+    this.maxWindowSize = maxWindowSize;
+    this.windows = new ArrayDeque<>(maxWindowSize);
+    this.windowInterval = windowInterval;
+    this.node = node;
+    initSerializer();
+  }
+
+  private void initSerializer() throws IOException {
+    String path =
+        IoTDBDescriptor.getInstance().getConfig().getSystemDir()
+            + File.separator
+            + node.getInternalIp() + "-" + node.nodeIdentifier
+            + FILE_SUFFIX;
+    File file = new File(path);
+    file.delete();
+    writer = new BufferedWriter(new FileWriter(file));
+    writer.write("Time,FlowSum");
+    writer.newLine();
+  }
+
+  public void close() {
+    saveWindow();
+    while (windows.size() > 0) {
+      serializeWindow();
+    }
+    try {
+      writer.close();
+      logger.info("Flow monitor {} is closed", node);
+    } catch (IOException e) {
+      logger.warn("Cannot close serializer of {}", node, e);
+    }
+  }
+
+  private void resetWindow(long newWindowStart) {
+    currWindowStart = newWindowStart;
+    currWindowSum = 0;
+  }
+
+  private void serializeWindow() {
+    Pair<Long, Long> window = windows.removeFirst();
+    try {
+      String windowString =
+          String.format("%s,%d", dateFormat.format(new Date(window.left)), window.right);
+      logger.debug("New window {} serialized by {}", windowString, node);
+      writer.write(windowString);
+      writer.newLine();
+    } catch (IOException e) {
+      logger.warn("Cannot serialize window {} of {}", window, node, e);
+    }
+  }
+
+  private void checkSize() {
+    if (windows.size() == maxWindowSize) {
+      serializeWindow();
+    }
+  }
+
+  private void saveWindow() {
+    if (currWindowSum != 0) {
+      checkSize();
+      windows.add(new Pair<>(currWindowStart, currWindowSum));
+      logger.debug("New window {},{} generated by {}", currWindowStart, currWindowSum, node);
+    }
+  }
+
+  public synchronized void report(long val) {
+    long currTime = System.currentTimeMillis();
+    long targetWindowStart = currTime - currTime % windowInterval;
+    if (targetWindowStart != currWindowStart) {
+      // save the current window and start a new window
+      saveWindow();
+      resetWindow(targetWindowStart);
+    }
+    // update the current window
+    currWindowSum += val;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitorManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitorManager.java
new file mode 100644
index 0000000000..0efbb1ffd5
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowMonitorManager.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.expr.flowcontrol;
+
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class FlowMonitorManager {
+
+  private static final Logger logger = LoggerFactory.getLogger(FlowMonitorManager.class);
+  public static final FlowMonitorManager INSTANCE = new FlowMonitorManager();
+
+  private Map<Node, FlowMonitor> monitorMap = new ConcurrentHashMap<>();
+  private ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
+
+  private FlowMonitorManager() {}
+
+  public void close() {
+    for (FlowMonitor flowMonitor : monitorMap.values()) {
+      flowMonitor.close();
+    }
+    monitorMap.clear();
+  }
+
+  public void register(Node node) {
+    logger.info("Registering flow monitor {}", node);
+    monitorMap.computeIfAbsent(
+        node,
+        n -> {
+          try {
+            return new FlowMonitor(
+                clusterConfig.getFlowMonitorMaxWindowSize(),
+                clusterConfig.getFlowMonitorWindowInterval(),
+                n);
+          } catch (IOException e) {
+            logger.warn("Cannot register flow monitor for {}", node, e);
+            return null;
+          }
+        });
+  }
+
+  public void report(Node node, long val) {
+    FlowMonitor flowMonitor = monitorMap.get(node);
+    if (flowMonitor != null) {
+      flowMonitor.report(val);
+    } else {
+      logger.warn("Flow monitor {} is not registered", node);
+    }
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index c2b8b70259..1aa28833f1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -170,4 +170,8 @@ public abstract class Log implements Comparable<Log> {
   public void setSequenceStartTime(long sequenceStartTime) {
     this.sequenceStartTime = sequenceStartTime;
   }
+
+  public long estimateSize() {
+    return 0;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 89ce46e41f..72e22181f1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.log;
 
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.expr.flowcontrol.FlowMonitorManager;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
@@ -42,6 +43,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.tsfile.utils.Pair;
 
+import com.google.common.util.concurrent.RateLimiter;
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
@@ -80,6 +82,8 @@ public class LogDispatcher {
   protected boolean useBatchInLogCatchUp = clusterConfig.isUseBatchInLogCatchUp();
   protected List<Pair<Node, BlockingQueue<SendLogRequest>>> nodesLogQueuesList = new ArrayList<>();
   protected Map<Node, Boolean> nodesEnabled;
+  protected Map<Node, RateLimiter> nodesRateLimiter = new HashMap<>();
+  protected Map<Node, Double> nodesRate = new HashMap<>();
   protected Map<Node, ExecutorService> executorServices = new HashMap<>();
   protected ExecutorService resultHandlerThread =
       IoTDBThreadPoolFactory.newFixedThreadPool(2, "AppendResultHandler");
@@ -95,7 +99,16 @@ public class LogDispatcher {
     createQueueAndBindingThreads();
   }
 
+  protected void updateRateLimiter() {
+    logger.info("Node rates: {}", nodesRate);
+    for (Entry<Node, Double> nodeDoubleEntry : nodesRate.entrySet()) {
+      nodesRateLimiter.get(nodeDoubleEntry.getKey()).setRate(nodeDoubleEntry.getValue());
+    }
+  }
+
   void createQueueAndBindingThreads() {
+    double baseRate = 300_000_000.0;
+    int i = 1;
     for (Node node : member.getAllNodes()) {
       if (!ClusterUtils.isNodeEquals(node, member.getThisNode())) {
         BlockingQueue<SendLogRequest> logBlockingQueue;
@@ -103,10 +116,15 @@ public class LogDispatcher {
             new ArrayBlockingQueue<>(
                 ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
         nodesLogQueuesList.add(new Pair<>(node, logBlockingQueue));
+        FlowMonitorManager.INSTANCE.register(node);
+        nodesRateLimiter.put(node, RateLimiter.create(Double.MAX_VALUE));
+        nodesRate.put(node, baseRate * i);
+        i += 100;
       }
     }
+    updateRateLimiter();
 
-    for (int i = 0; i < bindingThreadNum; i++) {
+    for (i = 0; i < bindingThreadNum; i++) {
       for (Pair<Node, BlockingQueue<SendLogRequest>> pair : nodesLogQueuesList) {
         executorServices
             .computeIfAbsent(
@@ -467,6 +485,11 @@ public class LogDispatcher {
           int concurrentSender = concurrentSenderNum.incrementAndGet();
           Statistic.RAFT_CONCURRENT_SENDER.add(concurrentSender);
           result = client.appendEntry(logRequest.appendEntryRequest, logRequest.isVerifier);
+          FlowMonitorManager.INSTANCE.report(
+              receiver,
+              logRequest.appendEntryRequest.entry.remaining());
+          nodesRateLimiter.get(receiver).acquire(logRequest.appendEntryRequest.entry.remaining());
+
           Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENT.calOperationCostTimeFromStart(
               logRequest.getVotingLog().getLog().getCreateTime());
           concurrentSenderNum.decrementAndGet();
@@ -505,6 +528,10 @@ public class LogDispatcher {
       if (client != null) {
         try {
           client.appendEntry(logRequest.appendEntryRequest, logRequest.isVerifier, handler);
+          FlowMonitorManager.INSTANCE.report(
+              receiver,
+              logRequest.appendEntryRequest.entry.remaining());
+          nodesRateLimiter.get(receiver).acquire(logRequest.appendEntryRequest.entry.remaining());
         } catch (TException e) {
           handler.onError(e);
         }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
index 96bc3907e5..4d5bf6052c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
@@ -90,7 +90,7 @@ public class VotingLogList {
       return false;
     }
     nodeIndices.sort(Entry.comparingByValue());
-    Long value = nodeIndices.get(quorumSize - 1).getValue();
+    Long value = nodeIndices.get(nodeIndices.size() - quorumSize).getValue();
     long oldValue = newCommitIndex.getAndUpdate(oldV -> Math.max(value, oldV));
     return value > oldValue;
   }
@@ -137,4 +137,10 @@ public class VotingLogList {
     }
     return num;
   }
+
+  public String report() {
+    return String.format(
+        "Nodes accepted indices: %s, new commitIndex: %d",
+        stronglyAcceptedIndices, newCommitIndex.get());
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RequestLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RequestLog.java
index 8f66c52763..5fa3b1aa05 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RequestLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RequestLog.java
@@ -136,4 +136,9 @@ public class RequestLog extends Log {
   public int hashCode() {
     return Objects.hash(super.hashCode(), request);
   }
+
+  @Override
+  public long estimateSize() {
+    return request.estimateSize();
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index 3dc18e32a6..33f4063492 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -71,6 +71,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
 
   @Override
   public void onComplete(AppendEntryResult response) {
+
     if (Timer.ENABLE_INSTRUMENTING) {
       Statistic.RAFT_SENDER_SEND_LOG_ASYNC.calOperationCostTimeFromStart(sendStart);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 3795718199..7480ebc295 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -1022,7 +1022,8 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
             lastHeartbeatReceivedTime,
             prevLastLogIndex,
             logManager.getMaxHaveAppliedCommitIndex(),
-            logRelay != null ? logRelay.first() : null);
+            logRelay != null ? logRelay.first() : null,
+            votingLogList.report());
     if (character == NodeCharacter.LEADER && config.isUseIndirectBroadcasting()) {
       dataMemberReport.setDirectToIndirectFollowerMap(
           ((IndirectLogDispatcher) getLogDispatcher()).getDirectToIndirectFollowerMap());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 4851dc3b35..d613d082d4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.expr.craft.FragmentedLog;
 import org.apache.iotdb.cluster.expr.craft.FragmentedLogDispatcher;
+import org.apache.iotdb.cluster.expr.flowcontrol.FlowMonitorManager;
 import org.apache.iotdb.cluster.expr.vgraft.KeyManager;
 import org.apache.iotdb.cluster.expr.vgraft.TrustValueHolder;
 import org.apache.iotdb.cluster.log.CommitLogCallback;
@@ -327,6 +328,7 @@ public abstract class RaftMember implements RaftMemberMBean {
 
     startBackGroundThreads();
     setSkipElection(false);
+    FlowMonitorManager.INSTANCE.register(thisNode);
     logger.info("{} started", name);
   }
 
@@ -1248,6 +1250,8 @@ public abstract class RaftMember implements RaftMemberMBean {
 
     // assign term and index to the new log and append it
     SendLogRequest sendLogRequest = logSequencer.sequence(log);
+    FlowMonitorManager.INSTANCE.report(thisNode, log.estimateSize());
+
     if (sendLogRequest == null) {
       return StatusUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
index 2734b19b7b..bacd4cf0ec 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
@@ -206,6 +206,7 @@ public class NodeReport {
     RaftNode header;
     long headerLatency;
     private Map<Node, List<Node>> directToIndirectFollowerMap;
+    private String votingListReport;
 
     public DataMemberReport(
         NodeCharacter character,
@@ -221,7 +222,8 @@ public class NodeReport {
         long lastHeartbeatReceivedTime,
         long prevLastLogIndex,
         long maxAppliedLogIndex,
-        RelayEntry nextToRelay) {
+        RelayEntry nextToRelay,
+        String votingListReport) {
       super(
           character,
           leader,
@@ -237,6 +239,7 @@ public class NodeReport {
           nextToRelay);
       this.header = header;
       this.headerLatency = headerLatency;
+      this.votingListReport = votingListReport;
     }
 
     @Override
@@ -274,7 +277,9 @@ public class NodeReport {
               + (System.currentTimeMillis() - lastHeartbeatReceivedTime)
               + "ms ago"
               + ", logIncrement="
-              + (lastLogIndex - prevLastLogIndex);
+              + (lastLogIndex - prevLastLogIndex)
+              + ", "
+              + votingListReport;
       if (directToIndirectFollowerMap != null) {
         s = s + ", relayMap=" + directToIndirectFollowerMap;
       }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index 448524441c..2d2b860e77 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.cluster.server.monitor;
 
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.cluster.utils.WindowStatistic;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -375,8 +374,10 @@ public class Timer {
     String blockName;
     AtomicLong sum = new AtomicLong(0);
     AtomicLong counter = new AtomicLong(0);
-    private WindowStatistic latestWindow = new WindowStatistic();
+    AtomicLong intervalSum = new AtomicLong(0);
+    AtomicLong intervalCounter = new AtomicLong(0);
     long max;
+    long intervalMax;
     double scale;
     boolean valid;
     int level;
@@ -401,8 +402,10 @@ public class Timer {
       if (ENABLE_INSTRUMENTING) {
         sum.addAndGet(val);
         counter.incrementAndGet();
+        intervalSum.addAndGet(val);
+        intervalCounter.incrementAndGet();
         max = Math.max(max, val);
-        latestWindow.add(val);
+        intervalMax = Math.max(intervalMax, val);
       }
     }
 
@@ -432,7 +435,9 @@ public class Timer {
       sum.set(0);
       counter.set(0);
       max = 0;
-      latestWindow.reset();
+      intervalCounter.set(0);
+      intervalSum.set(0);
+      intervalMax = 0;
     }
 
     /** WARN: no current safety guarantee. */
@@ -446,10 +451,16 @@ public class Timer {
     public String toString() {
       double s = sum.get() / scale;
       long cnt = counter.get();
+      double intervalS = intervalSum.get() / scale;
+      long intervalCnt = intervalCounter.get();
       double avg = s / cnt;
+      double intervalAvg = intervalS / intervalCnt;
+      intervalSum.set(0);
+      intervalCounter.set(0);
+      intervalMax = 0;
       return String.format(
-          "%s - %s: %.2f, %d, %.2f, %d, %.2f",
-          className, blockName, s, cnt, avg, max, latestWindow.getAvg());
+          "%s - %s: %.4f(%.4f), %d(%d), %.4f(%.4f), %d(%d)",
+          className, blockName, s, intervalS, cnt, intervalCnt, avg, intervalAvg, max, intervalMax);
     }
 
     public long getCnt() {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
index daa2a7d7d0..f2729471d5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
@@ -35,4 +35,8 @@ public interface IConsensusRequest {
    * changed or an error may occur
    */
   ByteBuffer serializeToByteBuffer();
+
+  default long estimateSize() {
+    return 0;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DummyPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DummyPlan.java
index 8a3ec92681..473bf9584e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DummyPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DummyPlan.java
@@ -117,4 +117,9 @@ public class DummyPlan extends PhysicalPlan {
   public String toString() {
     return "ExprPlan";
   }
+
+  @Override
+  public long estimateSize() {
+    return workload.length;
+  }
 }