You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/04/07 01:16:58 UTC

[iotdb] 03/03: add relay_first_level_size config

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_plus
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1da27c431b059cc1c6184075436a13334e381582
Author: jt <jt...@163.com>
AuthorDate: Thu Apr 7 09:15:49 2022 +0800

    add relay_first_level_size config
---
 .../org/apache/iotdb/cluster/ClusterIoTDB.java     |  14 +--
 .../apache/iotdb/cluster/config/ClusterConfig.java |  10 ++
 .../iotdb/cluster/config/ClusterDescriptor.java    |   5 +
 .../iotdb/cluster/log/IndirectLogDispatcher.java   | 109 ++++++++++++---------
 .../org/apache/iotdb/cluster/log/LogAckSender.java |   8 +-
 .../apache/iotdb/cluster/log/LogDispatcher.java    |   6 +-
 .../org/apache/iotdb/cluster/log/LogRelay.java     |  81 ++++++++++++++-
 .../cluster/server/heartbeat/HeartbeatThread.java  |   2 +-
 .../cluster/server/member/DataGroupMember.java     |  37 ++++---
 .../iotdb/cluster/server/member/RaftMember.java    |  90 +++--------------
 .../iotdb/cluster/server/monitor/NodeReport.java   |  79 ++++++++-------
 .../iotdb/cluster/server/monitor/NodeStatus.java   |  20 ++++
 .../cluster/server/monitor/NodeStatusManager.java  |   9 +-
 .../apache/iotdb/cluster/server/monitor/Timer.java |  28 +++++-
 .../cluster/server/service/MetaSyncService.java    |   4 +-
 .../iotdb/cluster/utils/WindowStatistic.java       |  55 +++++++++++
 16 files changed, 364 insertions(+), 193 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index a626b88425..f399c78b2b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -45,9 +45,8 @@ import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.server.monitor.NodeReport;
-import org.apache.iotdb.cluster.server.monitor.NodeStatus;
 import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
-import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.raft.DataRaftHeartBeatService;
 import org.apache.iotdb.cluster.server.raft.DataRaftService;
 import org.apache.iotdb.cluster.server.raft.MetaRaftHeartBeatService;
@@ -82,7 +81,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.HashSet;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -216,6 +214,7 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
         report.setMetaMemberReport(metaGroupMember.genMemberReport());
         report.setDataMemberReportList(dataGroupEngine.genMemberReports());
         logger.info(report.toString());
+        NodeStatusManager.getINSTANCE().report();
       } catch (Exception e) {
         logger.error("exception occurred when generating node report", e);
       }
@@ -296,13 +295,8 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
 
     @Override
     public void run() {
-      logger.info(
-          "Total request fanout: {}",
-          Statistic.RAFT_RECEIVER_RELAY_LOG.getCnt() + Statistic.RAFT_SENDER_SEND_LOG.getCnt());
-      for (Entry<Node, NodeStatus> nodeNodeStatusEntry :
-          NodeStatusManager.getINSTANCE().getNodeStatusMap().entrySet()) {
-        logger.info("{}: {}", nodeNodeStatusEntry.getKey(), nodeNodeStatusEntry.getValue());
-      }
+      logger.info(Timer.getReport());
+      NodeStatusManager.getINSTANCE().report();
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 187509b2ef..0f265dee76 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -192,6 +192,8 @@ public class ClusterConfig {
 
   private int relaySenderNum = 8;
 
+  private int relayFirstLevelSize = 1;
+
   private boolean optimizeIndirectBroadcasting = false;
 
   /**
@@ -600,4 +602,12 @@ public class ClusterConfig {
   public void setOptimizeIndirectBroadcasting(boolean optimizeIndirectBroadcasting) {
     this.optimizeIndirectBroadcasting = optimizeIndirectBroadcasting;
   }
+
+  public int getRelayFirstLevelSize() {
+    return relayFirstLevelSize;
+  }
+
+  public void setRelayFirstLevelSize(int relayFirstLevelSize) {
+    this.relayFirstLevelSize = relayFirstLevelSize;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index a089e2d75a..8e9f8934bc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -356,6 +356,11 @@ public class ClusterDescriptor {
             properties.getProperty(
                 "relay_sender_number", String.valueOf(config.getRelaySenderNum()))));
 
+    config.setRelayFirstLevelSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "relay_first_level_size", String.valueOf(config.getRelayFirstLevelSize()))));
+
     String consistencyLevel = properties.getProperty("consistency_level");
     if (consistencyLevel != null) {
       config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index d32fb405e3..ecbd1dbabe 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.NodeStatus;
 import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
@@ -38,6 +39,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.iotdb.cluster.server.monitor.Timer.Statistic.RAFT_RELAYED_LEVEL1_NUM;
 
 /**
  * IndirectLogDispatcher sends entries only to a pre-selected subset of followers instead of all
@@ -47,7 +51,9 @@ public class IndirectLogDispatcher extends LogDispatcher {
 
   private static final Logger logger = LoggerFactory.getLogger(IndirectLogDispatcher.class);
 
-  private Map<Node, List<Node>> directToIndirectFollowerMap = new HashMap<>();
+  private Map<Node, List<Node>> directToIndirectFollowerMap = new ConcurrentHashMap<>();
+  private long dispatchedEntryNum;
+  private int recalculateMapInterval = 1;
 
   public IndirectLogDispatcher(RaftMember member) {
     super(member);
@@ -74,7 +80,10 @@ public class IndirectLogDispatcher extends LogDispatcher {
   @Override
   public void offer(SendLogRequest request) {
     super.offer(request);
-    recalculateDirectFollowerMap();
+    dispatchedEntryNum++;
+    if (dispatchedEntryNum % recalculateMapInterval == 0) {
+      recalculateDirectFollowerMap();
+    }
   }
 
   @Override
@@ -87,6 +96,15 @@ public class IndirectLogDispatcher extends LogDispatcher {
     return newRequest;
   }
 
+  private double getNodeWeight(Node node, double maxLatency) {
+    NodeStatus status = NodeStatusManager.getINSTANCE().getNodeStatus(node, false);
+    //    return 1.0
+    //        / (status.getStatus().fanoutRequestNum + 1);
+    double pow = Math.pow(100.0, maxLatency / status.getSendEntryLatencyStatistic().getAvg());
+    status.setRelayWeight(pow);
+    return pow;
+  }
+
   public void recalculateDirectFollowerMap() {
     List<Node> allNodes = new ArrayList<>(member.getAllNodes());
     allNodes.removeIf(n -> ClusterUtils.isNodeEquals(n, member.getThisNode()));
@@ -96,28 +114,31 @@ public class IndirectLogDispatcher extends LogDispatcher {
     nodesEnabled.clear();
     directToIndirectFollowerMap.clear();
 
+    int firstLevelSize = ClusterDescriptor.getInstance().getConfig().getRelayFirstLevelSize();
+    List<Node> firstLevelNodes;
+
     if (ClusterDescriptor.getInstance().getConfig().isOptimizeIndirectBroadcasting()) {
       QueryCoordinator instance = QueryCoordinator.getINSTANCE();
       orderedNodes = instance.reorderNodes(allNodes);
-      long thisLoad =
-          Statistic.RAFT_SENDER_SEND_LOG.getCnt() + Statistic.RAFT_RECEIVER_RELAY_LOG.getCnt() + 1;
-      long minLoad =
+      long thisLoad = Statistic.getTotalFanout() + 1;
+      double maxLatency =
           NodeStatusManager.getINSTANCE()
-                  .getNodeStatus(orderedNodes.get(0), false)
-                  .getStatus()
-                  .fanoutRequestNum
-              + 1;
-      double loadFactor = 1.05;
+              .getNodeStatus(orderedNodes.get(0), false)
+              .getSendEntryLatencyStatistic()
+              .getAvg();
+      for (int i = 1, orderedNodesSize = orderedNodes.size(); i < orderedNodesSize; i++) {
+        maxLatency =
+            Double.max(
+                maxLatency,
+                NodeStatusManager.getINSTANCE()
+                    .getNodeStatus(orderedNodes.get(i), false)
+                    .getSendEntryLatencyStatistic()
+                    .getAvg());
+      }
+
       WeightedList<Node> firstLevelCandidates = new WeightedList<>();
       firstLevelCandidates.insert(
-          orderedNodes.get(0),
-          1.0
-              / (NodeStatusManager.getINSTANCE()
-                      .getNodeStatus(orderedNodes.get(0), false)
-                      .getStatus()
-                      .fanoutRequestNum
-                  + 1));
-      int firstLevelSize = 1;
+          orderedNodes.get(0), getNodeWeight(orderedNodes.get(0), maxLatency));
 
       for (int i = 1, orderedNodesSize = orderedNodes.size(); i < orderedNodesSize; i++) {
         Node orderedNode = orderedNodes.get(i);
@@ -127,9 +148,7 @@ public class IndirectLogDispatcher extends LogDispatcher {
                     .getStatus()
                     .fanoutRequestNum
                 + 1;
-        if (nodeLoad * 1.0 <= minLoad * loadFactor) {
-          firstLevelCandidates.insert(orderedNode, 1.0 / nodeLoad);
-        }
+        firstLevelCandidates.insert(orderedNode, getNodeWeight(orderedNode, maxLatency));
         if (nodeLoad > thisLoad) {
           firstLevelSize = (int) Math.max(firstLevelSize, nodeLoad / thisLoad);
         }
@@ -139,36 +158,34 @@ public class IndirectLogDispatcher extends LogDispatcher {
         firstLevelSize = firstLevelCandidates.size();
       }
 
-      List<Node> firstLevelNodes = firstLevelCandidates.select(firstLevelSize);
-
-      Map<Node, List<Node>> secondLevelNodeMap = new HashMap<>();
-      orderedNodes.removeAll(firstLevelNodes);
-      for (int i = 0; i < orderedNodes.size(); i++) {
-        Node firstLevelNode = firstLevelNodes.get(i % firstLevelSize);
-        secondLevelNodeMap
-            .computeIfAbsent(firstLevelNode, n -> new ArrayList<>())
-            .add(orderedNodes.get(i));
+      firstLevelNodes = firstLevelCandidates.select(firstLevelSize);
+    } else {
+      firstLevelNodes = new ArrayList<>(orderedNodes.subList(0, firstLevelSize));
+      if (firstLevelSize > orderedNodes.size()) {
+        firstLevelSize = orderedNodes.size();
       }
+    }
 
-      for (Node firstLevelNode : firstLevelNodes) {
-        directToIndirectFollowerMap.put(
-            firstLevelNode,
-            secondLevelNodeMap.getOrDefault(firstLevelNode, Collections.emptyList()));
-        nodesEnabled.put(firstLevelNode, true);
-      }
+    Map<Node, List<Node>> secondLevelNodeMap = new HashMap<>();
+    orderedNodes.removeAll(firstLevelNodes);
+    for (int i = 0; i < orderedNodes.size(); i++) {
+      Node firstLevelNode = firstLevelNodes.get(i % firstLevelSize);
+      secondLevelNodeMap
+          .computeIfAbsent(firstLevelNode, n -> new ArrayList<>())
+          .add(orderedNodes.get(i));
+    }
 
-    } else {
-      for (int i = 0, j = orderedNodes.size() - 1; i <= j; i++, j--) {
-        if (i != j) {
-          directToIndirectFollowerMap.put(
-              orderedNodes.get(i), Collections.singletonList(orderedNodes.get(j)));
-        } else {
-          directToIndirectFollowerMap.put(orderedNodes.get(i), Collections.emptyList());
-        }
-        nodesEnabled.put(orderedNodes.get(i), true);
-      }
+    for (Node firstLevelNode : firstLevelNodes) {
+      directToIndirectFollowerMap.put(
+          firstLevelNode, secondLevelNodeMap.getOrDefault(firstLevelNode, Collections.emptyList()));
+      nodesEnabled.put(firstLevelNode, true);
     }
 
+    RAFT_RELAYED_LEVEL1_NUM.add(directToIndirectFollowerMap.size());
     logger.debug("New relay map: {}", directToIndirectFollowerMap);
   }
+
+  public Map<Node, List<Node>> getDirectToIndirectFollowerMap() {
+    return Collections.unmodifiableMap(directToIndirectFollowerMap);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogAckSender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogAckSender.java
index 3441a01432..3a15dca6d4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogAckSender.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogAckSender.java
@@ -57,8 +57,10 @@ public class LogAckSender {
   public LogAckSender(RaftMember member) {
     this.member = member;
     this.header = member.getHeader();
-    ackSenderPool = IoTDBThreadPoolFactory.newSingleThreadExecutor(member.getName() + "-ACKSender");
-    ackSenderPool.submit(this::appendAckLeaderTask);
+    ackSenderPool = IoTDBThreadPoolFactory.newFixedThreadPool(4, member.getName() + "-ACKSender");
+    for (int i = 0; i < 4; i++) {
+      ackSenderPool.submit(this::appendAckLeaderTask);
+    }
   }
 
   public static class AckRequest {
@@ -93,7 +95,7 @@ public class LogAckSender {
         }
 
         appendAckLeader(ackRequestList);
-        Thread.sleep(10);
+        // Thread.sleep(10);
       }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 3e659a1468..f1c50952bc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -193,7 +193,6 @@ public class LogDispatcher {
 
   public static class SendLogRequest {
 
-    private AppendNodeEntryHandler handler;
     private VotingLog votingLog;
     private AtomicBoolean leaderShipStale;
     private AtomicLong newLeaderTerm;
@@ -469,6 +468,10 @@ public class LogDispatcher {
               logRequest.newLeaderTerm,
               logRequest.quorumSize);
       // TODO add async interface
+      if (syncClient == null) {
+        syncClient = member.getSyncClient(receiver);
+      }
+
       int retries = 5;
       try {
         long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
@@ -486,6 +489,7 @@ public class LogDispatcher {
             NodeStatus nodeStatus = NodeStatusManager.getINSTANCE().getNodeStatus(receiver, false);
             nodeStatus.getSendEntryLatencySum().addAndGet(sendLogTime);
             nodeStatus.getSendEntryNum().incrementAndGet();
+            nodeStatus.getSendEntryLatencyStatistic().add(sendLogTime);
 
             long handleStart = Statistic.RAFT_SENDER_HANDLE_SEND_RESULT.getOperationStartTime();
             handler.onComplete(result);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
index 842f2eb3ca..350e3fed0b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
@@ -23,10 +23,16 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.server.handlers.forwarder.IndirectAppendHandler;
 import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.NodeStatus;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,6 +42,8 @@ import java.util.Objects;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
 
+import static org.apache.iotdb.cluster.log.LogDispatcher.concurrentSenderNum;
+
 /** LogRelay is used by followers to forward entries from the leader to other followers. */
 public class LogRelay {
 
@@ -51,8 +59,8 @@ public class LogRelay {
     this.raftMember = raftMember;
     relaySenders =
         IoTDBThreadPoolFactory.newFixedThreadPool(
-            RELAY_NUMBER, raftMember.getName() + "-RelaySender");
-    for (int i = 0; i < RELAY_NUMBER; i++) {
+            RELAY_NUMBER + 4, raftMember.getName() + "-RelaySender");
+    for (int i = 0; i < 4; i++) {
       relaySenders.submit(new RelayThread());
     }
   }
@@ -116,7 +124,7 @@ public class LogRelay {
                       + (relayEntry.singleRequest.prevLogIndex + 1)
                       + "-"
                       + relayEntry.receivers);
-          raftMember.sendLogToSubFollowers(relayEntry.singleRequest, relayEntry.receivers);
+          sendLogToSubFollowers(relayEntry.singleRequest, relayEntry.receivers);
         } else if (relayEntry.batchRequest != null) {
           Thread.currentThread()
               .setName(
@@ -125,7 +133,7 @@ public class LogRelay {
                       + (relayEntry.batchRequest.prevLogIndex + 1)
                       + "-"
                       + relayEntry.receivers);
-          raftMember.sendLogsToSubFollowers(relayEntry.batchRequest, relayEntry.receivers);
+          sendLogsToSubFollowers(relayEntry.batchRequest, relayEntry.receivers);
         }
 
         Statistic.RAFT_RELAYED_ENTRY.add(1);
@@ -133,6 +141,71 @@ public class LogRelay {
     }
   }
 
+  public void sendLogToSubFollowers(AppendEntryRequest request, List<Node> subFollowers) {
+    request.setIsFromLeader(false);
+    request.setSubReceiversIsSet(false);
+    for (Node subFollower : subFollowers) {
+      relaySenders.submit(
+          () -> {
+            Client syncClient = null;
+            try {
+              if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+                raftMember
+                    .getAsyncClient(subFollower)
+                    .appendEntry(request, new IndirectAppendHandler(subFollower, request));
+              } else {
+                long operationStartTime = Statistic.RAFT_RECEIVER_RELAY_LOG.getOperationStartTime();
+                syncClient = raftMember.getSyncClient(subFollower);
+
+                int concurrentSender = concurrentSenderNum.incrementAndGet();
+                Statistic.RAFT_CONCURRENT_SENDER.add(concurrentSender);
+                syncClient.appendEntry(request);
+                concurrentSenderNum.decrementAndGet();
+
+                long sendLogTime =
+                    Statistic.RAFT_RECEIVER_RELAY_LOG.calOperationCostTimeFromStart(
+                        operationStartTime);
+                NodeStatus nodeStatus =
+                    NodeStatusManager.getINSTANCE().getNodeStatus(subFollower, false);
+                nodeStatus.getSendEntryLatencySum().addAndGet(sendLogTime);
+                nodeStatus.getSendEntryNum().incrementAndGet();
+                nodeStatus.getSendEntryLatencyStatistic().add(sendLogTime);
+              }
+            } catch (TException e) {
+              logger.error("Cannot send {} to {}", request, subFollower, e);
+            } finally {
+              if (syncClient != null) {
+                ClientUtils.putBackSyncClient(syncClient);
+              }
+            }
+          });
+    }
+  }
+
+  public void sendLogsToSubFollowers(AppendEntriesRequest request, List<Node> subFollowers) {
+    request.setIsFromLeader(false);
+    request.setSubReceiversIsSet(false);
+    for (Node subFollower : subFollowers) {
+      Client syncClient = null;
+      try {
+        if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+          raftMember
+              .getAsyncClient(subFollower)
+              .appendEntries(request, new IndirectAppendHandler(subFollower, request));
+        } else {
+          syncClient = raftMember.getSyncClient(subFollower);
+          syncClient.appendEntries(request);
+        }
+      } catch (TException e) {
+        logger.error("Cannot send {} to {}", request, subFollower, e);
+      } finally {
+        if (syncClient != null) {
+          ClientUtils.putBackSyncClient(syncClient);
+        }
+      }
+    }
+  }
+
   public static class RelayEntry implements Comparable<RelayEntry> {
 
     private AppendEntryRequest singleRequest;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
index acc34cf6c0..266f767b96 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
@@ -268,7 +268,7 @@ public class HeartbeatThread implements Runnable {
 
     if (!ClusterUtils.isNodeEquals(
         localMember.getThisNode(), localMember.getPartitionGroup().getHeader().node)) {
-      long electionWait = getElectionRandomWaitMs();
+      long electionWait = getElectionRandomWaitMs() + 5000;
       logger.info(
           "{}: Sleep {}ms before the first election as this node is not the preferred " + "leader",
           memberName,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 3114182960..11f0cf0a4a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.exception.SnapshotInstallationException;
 import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
+import org.apache.iotdb.cluster.log.IndirectLogDispatcher;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.LogParser;
@@ -993,21 +994,27 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
   public DataMemberReport genReport() {
     long prevLastLogIndex = lastReportedLogIndex;
     lastReportedLogIndex = logManager.getLastLogIndex();
-    return new DataMemberReport(
-        character,
-        leader.get(),
-        term.get(),
-        logManager.getLastLogTerm(),
-        lastReportedLogIndex,
-        logManager.getCommitLogIndex(),
-        logManager.getCommitLogTerm(),
-        getHeader(),
-        readOnly,
-        NodeStatusManager.getINSTANCE().getLastResponseLatency(getHeader().getNode()),
-        lastHeartbeatReceivedTime,
-        prevLastLogIndex,
-        logManager.getMaxHaveAppliedCommitIndex(),
-        logRelay != null ? logRelay.first() : null);
+    DataMemberReport dataMemberReport =
+        new DataMemberReport(
+            character,
+            leader.get(),
+            term.get(),
+            logManager.getLastLogTerm(),
+            lastReportedLogIndex,
+            logManager.getCommitLogIndex(),
+            logManager.getCommitLogTerm(),
+            getHeader(),
+            readOnly,
+            NodeStatusManager.getINSTANCE().getLastResponseLatency(getHeader().getNode()),
+            lastHeartbeatReceivedTime,
+            prevLastLogIndex,
+            logManager.getMaxHaveAppliedCommitIndex(),
+            logRelay != null ? logRelay.first() : null);
+    if (character == NodeCharacter.LEADER && config.isUseIndirectBroadcasting()) {
+      dataMemberReport.setDirectToIndirectFollowerMap(
+          ((IndirectLogDispatcher) getLogDispatcher()).getDirectToIndirectFollowerMap());
+    }
+    return dataMemberReport;
   }
 
   @TestOnly
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 0d605d3f46..f554a69048 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -72,8 +72,6 @@ import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
-import org.apache.iotdb.cluster.server.handlers.forwarder.IndirectAppendHandler;
-import org.apache.iotdb.cluster.server.monitor.NodeStatus;
 import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.monitor.Timer;
@@ -133,7 +131,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
-import static org.apache.iotdb.cluster.log.LogDispatcher.concurrentSenderNum;
 
 /**
  * RaftMember process the common raft logic like leader election, log appending, catch-up and so on.
@@ -631,63 +628,6 @@ public abstract class RaftMember implements RaftMemberMBean {
     return result;
   }
 
-  public void sendLogToSubFollowers(AppendEntryRequest request, List<Node> subFollowers) {
-    request.setIsFromLeader(false);
-    request.setSubReceiversIsSet(false);
-    for (Node subFollower : subFollowers) {
-      Client syncClient = null;
-      try {
-        if (config.isUseAsyncServer()) {
-          getAsyncClient(subFollower)
-              .appendEntry(request, new IndirectAppendHandler(subFollower, request));
-        } else {
-          long operationStartTime = Statistic.RAFT_RECEIVER_RELAY_LOG.getOperationStartTime();
-          syncClient = getSyncClient(subFollower);
-
-          int concurrentSender = concurrentSenderNum.incrementAndGet();
-          Statistic.RAFT_CONCURRENT_SENDER.add(concurrentSender);
-          syncClient.appendEntry(request);
-          concurrentSenderNum.decrementAndGet();
-
-          long sendLogTime =
-              Statistic.RAFT_RECEIVER_RELAY_LOG.calOperationCostTimeFromStart(operationStartTime);
-          NodeStatus nodeStatus = NodeStatusManager.getINSTANCE().getNodeStatus(subFollower, false);
-          nodeStatus.getSendEntryLatencySum().addAndGet(sendLogTime);
-          nodeStatus.getSendEntryNum().incrementAndGet();
-        }
-      } catch (TException e) {
-        logger.error("Cannot send {} to {}", request, subFollower, e);
-      } finally {
-        if (syncClient != null) {
-          ClientUtils.putBackSyncClient(syncClient);
-        }
-      }
-    }
-  }
-
-  public void sendLogsToSubFollowers(AppendEntriesRequest request, List<Node> subFollowers) {
-    request.setIsFromLeader(false);
-    request.setSubReceiversIsSet(false);
-    for (Node subFollower : subFollowers) {
-      Client syncClient = null;
-      try {
-        if (config.isUseAsyncServer()) {
-          getAsyncClient(subFollower)
-              .appendEntries(request, new IndirectAppendHandler(subFollower, request));
-        } else {
-          syncClient = getSyncClient(subFollower);
-          syncClient.appendEntries(request);
-        }
-      } catch (TException e) {
-        logger.error("Cannot send {} to {}", request, subFollower, e);
-      } finally {
-        if (syncClient != null) {
-          ClientUtils.putBackSyncClient(syncClient);
-        }
-      }
-    }
-  }
-
   /** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
   public AppendEntryResult appendEntries(AppendEntriesRequest request)
       throws UnknownLogTypeException {
@@ -1860,20 +1800,22 @@ public abstract class RaftMember implements RaftMemberMBean {
   @SuppressWarnings("java:S2445")
   protected void commitLog(Log log) throws LogExecutionException {
     long startTime;
-    if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
-      startTime = Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.getOperationStartTime();
-      synchronized (logManager) {
-        Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
-            startTime);
-        if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
-          startTime = Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
-          logManager.commitTo(log.getCurrLogIndex());
-          Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(startTime);
-        }
-        startTime = Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.getOperationStartTime();
-      }
-      Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.calOperationCostTimeFromStart(startTime);
-    }
+    //    if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
+    //      startTime =
+    // Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.getOperationStartTime();
+    //      synchronized (logManager) {
+    //        Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
+    //            startTime);
+    //        if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
+    //          startTime = Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
+    //          logManager.commitTo(log.getCurrLogIndex());
+    //
+    // Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(startTime);
+    //        }
+    //        startTime = Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.getOperationStartTime();
+    //      }
+    //      Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.calOperationCostTimeFromStart(startTime);
+    //    }
 
     // when using async applier, the log here may not be applied. To return the execution
     // result, we must wait until the log is applied.
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
index cc284f0bb4..2734b19b7b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.rpc.RpcTransportFactory;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * A node report collects the current runtime information of the local node, which contains: 1. The
@@ -204,6 +205,7 @@ public class NodeReport {
   public static class DataMemberReport extends RaftMemberReport {
     RaftNode header;
     long headerLatency;
+    private Map<Node, List<Node>> directToIndirectFollowerMap;
 
     public DataMemberReport(
         NodeCharacter character,
@@ -239,40 +241,49 @@ public class NodeReport {
 
     @Override
     public String toString() {
-      return "DataMemberReport{"
-          + "header="
-          + header.getNode()
-          + ", raftId="
-          + header.getRaftId()
-          + ", character="
-          + character
-          + ", Leader="
-          + leader
-          + ", term="
-          + term
-          + ", lastLogTerm="
-          + lastLogTerm
-          + ", lastLogIndex="
-          + lastLogIndex
-          + ", commitIndex="
-          + commitIndex
-          + ", commitTerm="
-          + commitTerm
-          + ", appliedLogIndex="
-          + maxAppliedLogIndex
-          + ", readOnly="
-          + isReadOnly
-          + ", nextToRelay="
-          + nextToRelay
-          + ", headerLatency="
-          + headerLatency
-          + "ns"
-          + ", lastHeartbeat="
-          + (System.currentTimeMillis() - lastHeartbeatReceivedTime)
-          + "ms ago"
-          + ", logIncrement="
-          + (lastLogIndex - prevLastLogIndex)
-          + '}';
+      String s =
+          "DataMemberReport{"
+              + "header="
+              + header.getNode()
+              + ", raftId="
+              + header.getRaftId()
+              + ", character="
+              + character
+              + ", Leader="
+              + leader
+              + ", term="
+              + term
+              + ", lastLogTerm="
+              + lastLogTerm
+              + ", lastLogIndex="
+              + lastLogIndex
+              + ", commitIndex="
+              + commitIndex
+              + ", commitTerm="
+              + commitTerm
+              + ", appliedLogIndex="
+              + maxAppliedLogIndex
+              + ", readOnly="
+              + isReadOnly
+              + ", nextToRelay="
+              + nextToRelay
+              + ", headerLatency="
+              + headerLatency
+              + "ns"
+              + ", lastHeartbeat="
+              + (System.currentTimeMillis() - lastHeartbeatReceivedTime)
+              + "ms ago"
+              + ", logIncrement="
+              + (lastLogIndex - prevLastLogIndex);
+      if (directToIndirectFollowerMap != null) {
+        s = s + ", relayMap=" + directToIndirectFollowerMap;
+      }
+      s = s + '}';
+      return s;
+    }
+
+    public void setDirectToIndirectFollowerMap(Map<Node, List<Node>> directToIndirectFollowerMap) {
+      this.directToIndirectFollowerMap = directToIndirectFollowerMap;
     }
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
index 6f4039456b..e1456e307b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.cluster.server.monitor;
 
 import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
+import org.apache.iotdb.cluster.utils.WindowStatistic;
 
 import java.util.Date;
 import java.util.Objects;
@@ -57,6 +58,9 @@ public class NodeStatus implements Comparable<NodeStatus> {
 
   private AtomicLong sendEntryNum = new AtomicLong();
   private AtomicLong sendEntryLatencySum = new AtomicLong();
+  private WindowStatistic sendEntryLatencyStatistic = new WindowStatistic();
+
+  private double relayWeight;
 
   // TODO-Cluster: decide what should be contained in NodeStatus and how two compare two NodeStatus
   @Override
@@ -107,6 +111,14 @@ public class NodeStatus implements Comparable<NodeStatus> {
     this.lastResponseLatency = lastResponseLatency;
   }
 
+  public double getRelayWeight() {
+    return relayWeight;
+  }
+
+  public void setRelayWeight(double relayWeight) {
+    this.relayWeight = relayWeight;
+  }
+
   public void activate() {
     isActivated = true;
   }
@@ -129,6 +141,10 @@ public class NodeStatus implements Comparable<NodeStatus> {
     return sendEntryLatencySum;
   }
 
+  public WindowStatistic getSendEntryLatencyStatistic() {
+    return sendEntryLatencyStatistic;
+  }
+
   @Override
   public String toString() {
     return "NodeStatus{"
@@ -148,6 +164,10 @@ public class NodeStatus implements Comparable<NodeStatus> {
         + sendEntryLatencySum
         + ", sendEntryLatencyAvg="
         + (sendEntryLatencySum.get() * 1.0 / sendEntryNum.get())
+        + ", latestSendEntryLatencyAvg="
+        + sendEntryLatencyStatistic.getAvg()
+        + ", nodeRelayWeight="
+        + relayWeight
         + '}';
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
index eb79a84d2a..4edcbb8d57 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import java.net.ConnectException;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -47,7 +48,7 @@ public class NodeStatusManager {
 
   private static final Logger logger = LoggerFactory.getLogger(NodeStatusManager.class);
   // a status is considered stale if it is older than one minute and should be updated
-  private static final long NODE_STATUS_UPDATE_INTERVAL_MS = 1 * 10L;
+  private static final long NODE_STATUS_UPDATE_INTERVAL_MS = 1 * 1000L;
   private static final NodeStatusManager INSTANCE = new NodeStatusManager();
 
   private MetaGroupMember metaGroupMember;
@@ -185,4 +186,10 @@ public class NodeStatusManager {
   public Map<Node, NodeStatus> getNodeStatusMap() {
     return Collections.unmodifiableMap(nodeStatusMap);
   }
+
+  public void report() {
+    for (Entry<Node, NodeStatus> nodeNodeStatusEntry : getNodeStatusMap().entrySet()) {
+      logger.info("{}: {}", nodeNodeStatusEntry.getKey(), nodeNodeStatusEntry.getValue());
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index 72892bdb56..7ded0c910e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.server.monitor;
 
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.utils.WindowStatistic;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -334,6 +335,7 @@ public class Timer {
     RAFT_WINDOW_LENGTH(RAFT_MEMBER_RECEIVER, "window length", 1, true, ROOT),
     RAFT_RELAYED_ENTRY(RAFT_MEMBER_RECEIVER, "number of relayed entries", 1, true, ROOT),
     RAFT_SEND_RELAY_ACK(RAFT_MEMBER_RECEIVER, "send relay ack", 1, true, ROOT),
+    RAFT_RELAYED_LEVEL1_NUM(RAFT_MEMBER_SENDER, "level 1 relay node number", 1, true, ROOT),
     RAFT_RECEIVE_RELAY_ACK(RAFT_MEMBER_SENDER, "receive relay ack", 1, true, ROOT),
     RAFT_WAIT_AFTER_ACCEPTED(RAFT_MEMBER_SENDER, "wait after accepted", TIME_SCALE, true, ROOT),
     RAFT_SENDER_OOW(RAFT_MEMBER_SENDER, "out of window", 1, true, ROOT),
@@ -344,6 +346,7 @@ public class Timer {
     String blockName;
     AtomicLong sum = new AtomicLong(0);
     AtomicLong counter = new AtomicLong(0);
+    private WindowStatistic latestWindow = new WindowStatistic();
     long max;
     double scale;
     boolean valid;
@@ -370,6 +373,7 @@ public class Timer {
         sum.addAndGet(val);
         counter.incrementAndGet();
         max = Math.max(max, val);
+        latestWindow.add(val);
       }
     }
 
@@ -399,6 +403,7 @@ public class Timer {
       sum.set(0);
       counter.set(0);
       max = 0;
+      latestWindow.reset();
     }
 
     /** WARN: no current safety guarantee. */
@@ -413,12 +418,28 @@ public class Timer {
       double s = sum.get() / scale;
       long cnt = counter.get();
       double avg = s / cnt;
-      return String.format("%s - %s: %.2f, %d, %.2f, %d", className, blockName, s, cnt, avg, max);
+      return String.format(
+          "%s - %s: %.2f, %d, %.2f, %d, %.2f",
+          className, blockName, s, cnt, avg, max, latestWindow.getAvg());
     }
 
     public long getCnt() {
       return counter.get();
     }
+
+    public long getSum() {
+      return sum.get();
+    }
+
+    public static long getTotalFanout() {
+      return Statistic.RAFT_SENDER_SEND_LOG.getCnt() + Statistic.RAFT_RECEIVER_RELAY_LOG.getCnt();
+    }
+
+    public static double getSendLatency() {
+      return (Statistic.RAFT_SENDER_SEND_LOG.getSum() + Statistic.RAFT_RECEIVER_RELAY_LOG.getSum())
+          * 1.0
+          / (Statistic.RAFT_SENDER_SEND_LOG.getCnt() + Statistic.RAFT_RECEIVER_RELAY_LOG.getCnt());
+    }
   }
 
   public static String getReport() {
@@ -427,6 +448,11 @@ public class Timer {
     }
     StringBuilder result = new StringBuilder();
     printTo(Statistic.ROOT, result);
+    result.append(System.lineSeparator());
+    result.append(
+        String.format(
+            "Total request fanout: %d, send entry latency: %f",
+            Statistic.getTotalFanout(), Statistic.getSendLatency()));
     return result.toString();
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index f8df9d8d83..7887cc8084 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@ -161,9 +161,7 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
    */
   @Override
   public TNodeStatus queryNodeStatus() {
-    return new TNodeStatus()
-        .setFanoutRequestNum(
-            Statistic.RAFT_SENDER_SEND_LOG.getCnt() + Statistic.RAFT_RECEIVER_RELAY_LOG.getCnt());
+    return new TNodeStatus().setFanoutRequestNum(Statistic.getTotalFanout());
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/WindowStatistic.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/WindowStatistic.java
new file mode 100644
index 0000000000..854d974a95
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/WindowStatistic.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.utils;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+public class WindowStatistic {
+
+  private static final int DEFAULT_LENGTH = 1000;
+
+  private Queue<Long> values = new ArrayDeque<>();
+  private volatile long sum;
+  private volatile int cnt;
+  private int windowLength = DEFAULT_LENGTH;
+
+  public synchronized void add(long val) {
+    values.add(val);
+    sum += val;
+    cnt++;
+
+    if (cnt > windowLength) {
+      Long poll = values.poll();
+      sum -= poll;
+      cnt--;
+    }
+  }
+
+  public double getAvg() {
+    return cnt == 0 ? 0.0 : sum * 1.0 / cnt;
+  }
+
+  public synchronized void reset() {
+    values.clear();
+    cnt = 0;
+    sum = 0;
+  }
+}