You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/04/14 01:11:54 UTC

[iotdb] 01/02: add statistics

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a1006dc39d4f5bb2a73e678a1cdb7011a2d4880a
Author: Tian Jiang <jt...@163.com>
AuthorDate: Fri Apr 14 09:05:30 2023 +0800

    add statistics
---
 .../common/request/ByteBufferConsensusRequest.java |   5 +
 .../iotdb/consensus/natraft/RaftConsensus.java     |  31 ++++
 .../consensus/natraft/protocol/RaftConfig.java     |  55 +++++++
 .../consensus/natraft/protocol/RaftMember.java     |  83 +++++++---
 .../protocol/heartbeat/HeartbeatThread.java        |   4 +
 .../log/appender/SlidingWindowLogAppender.java     |   6 +-
 .../log/dispatch/AppendNodeEntryHandler.java       |   3 +
 .../protocol/log/dispatch/LogDispatcher.java       |  19 ++-
 .../log/dispatch/flowcontrol/FlowBalancer.java     |  60 +++++--
 .../log/dispatch/flowcontrol/FlowMonitor.java      |  86 ++++++----
 .../dispatch/flowcontrol/FlowMonitorManager.java   |  28 +++-
 .../log/dispatch/flowcontrol/FlowWindow.java}      |  26 ++-
 .../protocol/log/manager/RaftLogManager.java       |   3 +
 .../serialization/SyncLogDequeSerializer.java      |   1 +
 .../iotdb/consensus/natraft/utils/NodeReport.java  | 176 +++++++++++++++++++++
 .../iotdb/consensus/natraft/utils/Timer.java       |  23 ++-
 distribution/distribute-ecs.sh                     |  31 ++++
 .../plan/planner/plan/node/write/InsertNode.java   |  10 ++
 .../planner/plan/node/write/InsertTabletNode.java  |  43 +++++
 .../java/org/apache/iotdb/session/Session.java     |   5 +-
 20 files changed, 599 insertions(+), 99 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java
index 0310a4e3d3..b05a35a561 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java
@@ -39,4 +39,9 @@ public class ByteBufferConsensusRequest implements IConsensusRequest {
   public ByteBuffer serializeToByteBuffer() {
     return byteBuffer;
   }
+
+  @Override
+  public long estimateSize() {
+    return byteBuffer.remaining();
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
index ac1f88d2a1..2d4611b7be 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.consensus.natraft;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.RegisterManager;
@@ -48,6 +50,8 @@ import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.FlowMonitorManager;
 import org.apache.iotdb.consensus.natraft.service.RaftRPCService;
 import org.apache.iotdb.consensus.natraft.service.RaftRPCServiceProcessor;
+import org.apache.iotdb.consensus.natraft.utils.NodeReport;
+import org.apache.iotdb.consensus.natraft.utils.NodeReport.RaftMemberReport;
 import org.apache.iotdb.consensus.natraft.utils.StatusUtils;
 import org.apache.iotdb.consensus.natraft.utils.Timer;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -65,6 +69,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class RaftConsensus implements IConsensus {
@@ -79,6 +85,7 @@ public class RaftConsensus implements IConsensus {
   private final RegisterManager registerManager = new RegisterManager();
   private final RaftConfig config;
   private final IClientManager<TEndPoint, AsyncRaftServiceClient> clientManager;
+  private ScheduledExecutorService reportThread;
 
   public RaftConsensus(ConsensusConfig config, Registry registry) {
     this.thisNode = config.getThisNodeEndPoint();
@@ -109,6 +116,9 @@ public class RaftConsensus implements IConsensus {
                 () -> {
                   logger.info(Timer.Statistic.getReport());
                 }));
+    reportThread = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("NodeReportThread");
+    ScheduledExecutorUtil.safelyScheduleAtFixedRate(
+        reportThread, this::generateNodeReport, 5, 5, TimeUnit.SECONDS);
   }
 
   private void initAndRecover() throws IOException {
@@ -372,4 +382,25 @@ public class RaftConsensus implements IConsensus {
   public TEndPoint getThisNode() {
     return thisNode;
   }
+
+  private void generateNodeReport() {
+    if (logger.isInfoEnabled()) {
+      try {
+        NodeReport report = new NodeReport(thisNode);
+        List<RaftMemberReport> reports = new ArrayList<>();
+        for (RaftMember value : stateMachineMap.values()) {
+          RaftMemberReport raftMemberReport = value.genMemberReport();
+          if (raftMemberReport.getPrevLastLogIndex() != raftMemberReport.getLastLogIndex()) {
+            reports.add(raftMemberReport);
+          }
+        }
+        if (!reports.isEmpty()) {
+          report.setMemberReports(reports);
+          logger.info(report.toString());
+        }
+      } catch (Exception e) {
+        logger.error("exception occurred when generating node report", e);
+      }
+    }
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
index 3d2991da3e..be3e56d231 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
@@ -63,6 +63,7 @@ public class RaftConfig {
   private int raftLogBufferSize = 64 * 1024 * 1024;
   private int maxNumberOfLogsPerFetchOnDisk = 1000;
   private int maxRaftLogIndexSizeInMemory = 64 * 1024;
+  private int maxRaftLogPersistDataSizePerFile = 1024 * 1024 * 1024;
   private int maxNumberOfPersistRaftLogFiles = 128;
   private int maxPersistRaftLogNumberOnDisk = 10_000_000;
   private int flushRaftLogThreshold = 100_000;
@@ -71,6 +72,9 @@ public class RaftConfig {
   private boolean enableCompressedDispatching = true;
   private boolean ignoreStateMachine = false;
   private boolean onlyTestNetwork = false;
+  private boolean waitApply = true;
+  private double flowControlMinFlow = 10_000_000;
+  private double flowControlMaxFlow = 100_000_000;
   private CompressionType dispatchingCompressionType = CompressionType.SNAPPY;
   private ConsistencyLevel consistencyLevel = ConsistencyLevel.STRONG_CONSISTENCY;
   private RPCConfig rpcConfig;
@@ -400,6 +404,38 @@ public class RaftConfig {
     this.ignoreStateMachine = ignoreStateMachine;
   }
 
+  public double getFlowControlMinFlow() {
+    return flowControlMinFlow;
+  }
+
+  public void setFlowControlMinFlow(double flowControlMinFlow) {
+    this.flowControlMinFlow = flowControlMinFlow;
+  }
+
+  public double getFlowControlMaxFlow() {
+    return flowControlMaxFlow;
+  }
+
+  public void setFlowControlMaxFlow(double flowControlMaxFlow) {
+    this.flowControlMaxFlow = flowControlMaxFlow;
+  }
+
+  public boolean isWaitApply() {
+    return waitApply;
+  }
+
+  public void setWaitApply(boolean waitApply) {
+    this.waitApply = waitApply;
+  }
+
+  public int getMaxRaftLogPersistDataSizePerFile() {
+    return maxRaftLogPersistDataSizePerFile;
+  }
+
+  public void setMaxRaftLogPersistDataSizePerFile(int maxRaftLogPersistDataSizePerFile) {
+    this.maxRaftLogPersistDataSizePerFile = maxRaftLogPersistDataSizePerFile;
+  }
+
   public void loadProperties(Properties properties) {
     logger.debug("Loading properties: {}", properties);
 
@@ -476,6 +512,11 @@ public class RaftConfig {
             properties.getProperty(
                 "raft_log_buffer_size", String.valueOf(this.getRaftLogBufferSize()))));
 
+    this.setMaxRaftLogPersistDataSizePerFile(
+        Integer.parseInt(
+            properties.getProperty(
+                "raft_log_file_size", String.valueOf(this.getMaxRaftLogPersistDataSizePerFile()))));
+
     this.setLogNumInBatch(
         Integer.parseInt(
             properties.getProperty("log_batch_num", String.valueOf(this.getLogNumInBatch()))));
@@ -592,6 +633,20 @@ public class RaftConfig {
         Boolean.parseBoolean(
             properties.getProperty("only_test_network", String.valueOf(this.isOnlyTestNetwork()))));
 
+    this.setWaitApply(
+        Boolean.parseBoolean(
+            properties.getProperty("wait_apply", String.valueOf(this.isWaitApply()))));
+
+    this.setFlowControlMinFlow(
+        Double.parseDouble(
+            properties.getProperty(
+                "flow_control_min_flow", String.valueOf(this.getFlowControlMinFlow()))));
+
+    this.setFlowControlMaxFlow(
+        Double.parseDouble(
+            properties.getProperty(
+                "flow_control_max_flow", String.valueOf(this.getFlowControlMaxFlow()))));
+
     String consistencyLevel = properties.getProperty("consistency_level");
     if (consistencyLevel != null) {
       this.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 5271e793aa..8d84036592 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -73,6 +73,7 @@ import org.apache.iotdb.consensus.natraft.protocol.log.sequencing.SynchronousSeq
 import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.DirectorySnapshot;
 import org.apache.iotdb.consensus.natraft.utils.IOUtils;
 import org.apache.iotdb.consensus.natraft.utils.LogUtils;
+import org.apache.iotdb.consensus.natraft.utils.NodeReport.RaftMemberReport;
 import org.apache.iotdb.consensus.natraft.utils.NodeUtils;
 import org.apache.iotdb.consensus.natraft.utils.Response;
 import org.apache.iotdb.consensus.natraft.utils.StatusUtils;
@@ -176,6 +177,7 @@ public class RaftMember {
   private ExecutorService commitLogPool;
 
   private long lastCommitTaskTime;
+  private long lastReportIndex;
 
   /**
    * logDispatcher buff the logs orderly according to their log indexes and send them sequentially,
@@ -512,7 +514,9 @@ public class RaftMember {
     }
 
     AppendEntryResult response;
+    long startTime = Statistic.RAFT_RECEIVER_PARSE_ENTRY.getOperationStartTime();
     List<Entry> entries = LogUtils.parseEntries(request.entries, stateMachine);
+    Statistic.RAFT_RECEIVER_PARSE_ENTRY.calOperationCostTimeFromStart(startTime);
 
     response = logAppender.appendEntries(request.leaderCommit, request.term, entries);
 
@@ -644,7 +648,7 @@ public class RaftMember {
     Statistic.LOG_DISPATCHER_FROM_RECEIVE_TO_CREATE.add(entry.createTime - entry.receiveTime);
 
     if (config.isUseFollowerLoadBalance()) {
-      FlowMonitorManager.INSTANCE.report(thisNode, entry.estimateSize());
+      FlowMonitorManager.INSTANCE.report(thisNode.getEndpoint(), entry.estimateSize());
     }
 
     if (votingEntry == null) {
@@ -677,15 +681,15 @@ public class RaftMember {
     }
   }
 
-  private TSStatus includeLogNumbersInStatus(TSStatus status, Entry entry) {
+  private TSStatus includeLogNumbersInStatus(TSStatus status, long index, long term) {
     return status.setMessage(
         getRaftGroupId().getType().ordinal()
             + "-"
             + getRaftGroupId().getId()
             + "-"
-            + entry.getCurrLogIndex()
+            + index
             + "-"
-            + entry.getCurrLogTerm());
+            + term);
   }
 
   protected AppendLogResult waitAppendResult(VotingEntry votingEntry) {
@@ -748,28 +752,32 @@ public class RaftMember {
     }
     long waitTime = 1;
     AcceptedType acceptedType = votingLogList.computeAcceptedType(log);
-    synchronized (log.getEntry()) {
-      while (acceptedType == AcceptedType.NOT_ACCEPTED
-          && alreadyWait < config.getWriteOperationTimeoutMS()) {
+
+    while (acceptedType == AcceptedType.NOT_ACCEPTED
+        && alreadyWait < config.getWriteOperationTimeoutMS()) {
+      long startTime = Statistic.RAFT_SENDER_LOG_APPEND_WAIT.getOperationStartTime();
+      synchronized (log.getEntry()) {
         try {
           log.getEntry().wait(waitTime);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           logger.warn("Unexpected interruption when sending a log", e);
         }
-        acceptedType = votingLogList.computeAcceptedType(log);
-
-        alreadyWait = (System.nanoTime() - waitStart) / 1000000;
-        if (alreadyWait > nextTimeToPrint) {
-          logger.info(
-              "Still not receive enough votes for {}, weakly " + "accepted {}, wait {}ms ",
-              log,
-              log.getWeaklyAcceptedNodes(),
-              alreadyWait);
-          nextTimeToPrint *= 2;
-        }
+      }
+      Statistic.RAFT_SENDER_LOG_APPEND_WAIT.calOperationCostTimeFromStart(startTime);
+
+      acceptedType = votingLogList.computeAcceptedType(log);
+      alreadyWait = (System.nanoTime() - waitStart) / 1000000;
+      if (alreadyWait > nextTimeToPrint) {
+        logger.info(
+            "Still not receive enough votes for {}, weakly " + "accepted {}, wait {}ms ",
+            log,
+            log.getWeaklyAcceptedNodes(),
+            alreadyWait);
+        nextTimeToPrint *= 2;
       }
     }
+
     if (logger.isDebugEnabled()) {
       Thread.currentThread().setName(threadBaseName);
     }
@@ -973,7 +981,7 @@ public class RaftMember {
       logger.debug("{}: plan {} has no where to be forwarded", name, plan);
       return StatusUtils.NO_LEADER.deepCopy().setMessage("No leader to forward in: " + groupId);
     }
-    logger.debug("{}: Forward {} to node {}", name, plan, node);
+    logger.info("{}: Forward {} to node {}", name, plan, node);
 
     TSStatus status;
     status = forwardPlanAsync(plan, node, groupId);
@@ -1244,11 +1252,25 @@ public class RaftMember {
           System.nanoTime() - votingEntry.getEntry().createTime);
       switch (appendLogResult) {
         case WEAK_ACCEPT:
+          Statistic.RAFT_LEADER_WEAK_ACCEPT.add(1);
           return includeLogNumbersInStatus(
-              StatusUtils.getStatus(TSStatusCode.WEAKLY_ACCEPTED), votingEntry.getEntry());
+              StatusUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
+              votingEntry.getEntry().getCurrLogIndex(),
+              votingEntry.getEntry().getCurrLogTerm());
         case OK:
-          waitApply(votingEntry.getEntry());
-          return includeLogNumbersInStatus(StatusUtils.OK.deepCopy(), votingEntry.getEntry());
+          if (config.isWaitApply()) {
+            waitApply(votingEntry.getEntry());
+            return includeLogNumbersInStatus(
+                StatusUtils.OK.deepCopy(),
+                votingEntry.getEntry().getCurrLogIndex(),
+                votingEntry.getEntry().getCurrLogTerm());
+          } else {
+            return includeLogNumbersInStatus(
+                StatusUtils.OK.deepCopy(),
+                logManager.getAppliedIndex(),
+                logManager.getAppliedTerm());
+          }
+
         case TIME_OUT:
           logger.debug("{}: log {} timed out...", name, votingEntry.getEntry());
           break;
@@ -1345,4 +1367,21 @@ public class RaftMember {
       return StatusUtils.TIME_OUT;
     }
   }
+
+  public RaftMemberReport genMemberReport() {
+    long prevLastLogIndex = lastReportIndex;
+    lastReportIndex = logManager.getLastLogIndex();
+    return new RaftMemberReport(
+        status.role,
+        status.getLeader().get(),
+        status.getTerm().get(),
+        logManager.getLastLogTerm(),
+        lastReportIndex,
+        logManager.getCommitLogIndex(),
+        logManager.getCommitLogTerm(),
+        readOnly,
+        heartbeatThread.getLastHeartbeatReceivedTime(),
+        prevLastLogIndex,
+        logManager.getAppliedIndex());
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
index 660e6feff6..4f2bdc07b5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
@@ -368,4 +368,8 @@ public class HeartbeatThread implements Runnable {
   public Object getElectionWaitObject() {
     return electionWaitObject;
   }
+
+  public long getLastHeartbeatReceivedTime() {
+    return lastHeartbeatReceivedTime;
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
index 39b58cd424..9702b4b461 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
@@ -82,7 +82,7 @@ public class SlidingWindowLogAppender implements LogAppender {
     // check the next entry
     Entry entry = logWindow[pos];
     boolean nextMismatch = false;
-    if (logWindow[pos + 1] != null && pos < windowCapacity - 1) {
+    if (pos < windowCapacity - 1 && logWindow[pos + 1] != null) {
       long nextPrevTerm = logWindow[pos + 1].getPrevTerm();
       if (nextPrevTerm != entry.getCurrLogTerm()) {
         nextMismatch = true;
@@ -180,7 +180,9 @@ public class SlidingWindowLogAppender implements LogAppender {
 
     AppendEntryResult result = null;
     for (Entry entry : entries) {
+      long startTime = Statistic.RAFT_RECEIVER_APPEND_ONE_ENTRY.getOperationStartTime();
       result = appendEntry(leaderCommit, entry);
+      Statistic.RAFT_RECEIVER_APPEND_ONE_ENTRY.calOperationCostTimeFromStart(startTime);
 
       if (result.status != Response.RESPONSE_AGREE
           && result.status != Response.RESPONSE_STRONG_ACCEPT
@@ -215,8 +217,10 @@ public class SlidingWindowLogAppender implements LogAppender {
         checkLog(windowPos);
         if (windowPos == 0) {
           appended = flushWindow(result);
+          Statistic.RAFT_FOLLOWER_STRONG_ACCEPT.calOperationCostTimeFromStart(startTime);
         } else {
           result.status = Response.RESPONSE_WEAK_ACCEPT;
+          Statistic.RAFT_FOLLOWER_WEAK_ACCEPT.calOperationCostTimeFromStart(startTime);
         }
       } else {
         result.setStatus(Response.RESPONSE_OUT_OF_WINDOW);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
index 3c98156091..7d169e6e66 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
@@ -76,6 +76,9 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
     if (resp == RESPONSE_STRONG_ACCEPT || resp == RESPONSE_AGREE) {
       member.getVotingLogList().onStronglyAccept(votingEntry, trueReceiver);
       member.getStatus().getPeerMap().get(trueReceiver).setMatchIndex(response.lastLogIndex);
+      synchronized (votingEntry.getEntry()) {
+        votingEntry.getEntry().notifyAll();
+      }
     } else if (resp > 0) {
       // a response > 0 is the follower's term
       // the leadership is stale, wait for the new leader's heartbeat
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index 463f9f0748..38ad5ff228 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -96,9 +96,10 @@ public class LogDispatcher {
   }
 
   public void updateRateLimiter() {
-    logger.info("TEndPoint rates: {}", nodesRate);
+    logger.info("{}: TEndPoint rates: {}", member.getName(), nodesRate);
     for (Entry<Peer, Double> nodeDoubleEntry : nodesRate.entrySet()) {
-      nodesRateLimiter.get(nodeDoubleEntry.getKey()).setRate(nodeDoubleEntry.getValue());
+      nodesRateLimiter.put(
+          nodeDoubleEntry.getKey(), RateLimiter.create(nodeDoubleEntry.getValue()));
     }
   }
 
@@ -106,6 +107,7 @@ public class LogDispatcher {
     BlockingQueue<VotingEntry> logBlockingQueue;
     logBlockingQueue = new ArrayBlockingQueue<>(config.getMaxNumOfLogsInMem());
     nodesLogQueuesMap.put(node, logBlockingQueue);
+    nodesRate.put(node, Double.MAX_VALUE);
     nodesRateLimiter.put(node, RateLimiter.create(Double.MAX_VALUE));
 
     for (int i = 0; i < bindingThreadNum; i++) {
@@ -261,6 +263,13 @@ public class LogDispatcher {
 
         request.getAppendEntryRequest().entry = request.getEntry().serialize();
         request.getEntry().setByteSize(request.getAppendEntryRequest().entry.limit());
+        logger.debug(
+            "{}/{}={}",
+            request.getEntry().estimateSize(),
+            request.getAppendEntryRequest().entry.remaining(),
+            request.getEntry().estimateSize()
+                * 1.0
+                / request.getAppendEntryRequest().entry.remaining());
       }
     }
 
@@ -346,12 +355,12 @@ public class LogDispatcher {
 
         for (; logIndex < currBatch.size(); logIndex++) {
           VotingEntry entry = currBatch.get(logIndex);
-          long curSize = entry.getAppendEntryRequest().entry.array().length;
+          long curSize = entry.getAppendEntryRequest().entry.remaining();
           if (logSizeLimit - curSize - logSize <= IoTDBConstant.LEFT_SIZE_IN_REQUEST) {
             break;
           }
           logSize += curSize;
-          logList.add(entry.getAppendEntryRequest().entry);
+          logList.add(entry.getAppendEntryRequest().entry.slice());
           Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENDING.calOperationCostTimeFromStart(
               entry.getEntry().createTime);
         }
@@ -365,7 +374,7 @@ public class LogDispatcher {
         }
 
         if (config.isUseFollowerLoadBalance()) {
-          FlowMonitorManager.INSTANCE.report(receiver, logSize);
+          FlowMonitorManager.INSTANCE.report(receiver.getEndpoint(), logSize);
         }
         nodesRateLimiter.get(receiver).acquire((int) logSize);
       }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
index 3c838f81b7..bbc9fd66ee 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
@@ -34,6 +35,7 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -42,22 +44,26 @@ import java.util.concurrent.TimeUnit;
 public class FlowBalancer {
 
   private static final Logger logger = LoggerFactory.getLogger(FlowBalancer.class);
-  private double maxFlow = 900_000_000;
-  private double minFlow = 10_000_000;
+  private double maxFlow;
+  private double minFlow;
   private int windowsToUse;
   private double overestimateFactor;
   private int flowBalanceIntervalMS = 1000;
   private FlowMonitorManager flowMonitorManager = FlowMonitorManager.INSTANCE;
   private LogDispatcher logDispatcher;
   private RaftMember member;
-
   private ScheduledExecutorService scheduledExecutorService;
+  private volatile boolean inBurst = false;
+  private RaftConfig config;
 
   public FlowBalancer(LogDispatcher logDispatcher, RaftMember member, RaftConfig config) {
     this.logDispatcher = logDispatcher;
     this.member = member;
-    windowsToUse = config.getFollowerLoadBalanceWindowsToUse();
-    overestimateFactor = config.getFollowerLoadBalanceOverestimateFactor();
+    this.windowsToUse = config.getFollowerLoadBalanceWindowsToUse();
+    this.overestimateFactor = config.getFollowerLoadBalanceOverestimateFactor();
+    this.minFlow = config.getFlowControlMinFlow();
+    this.maxFlow = config.getFlowControlMaxFlow();
+    this.config = config;
   }
 
   public void start() {
@@ -84,25 +90,54 @@ public class FlowBalancer {
 
     int nodeNum = member.getAllNodes().size();
     int followerNum = nodeNum - 1;
+    long flowMonitorWindowInterval = config.getFlowMonitorWindowInterval();
 
-    double thisNodeFlow = flowMonitorManager.averageFlow(member.getThisNode(), windowsToUse);
-    double assumedFlow = thisNodeFlow * overestimateFactor;
-    logger.info("Flow of this node: {}", thisNodeFlow);
+    List<FlowWindow> latestWindows =
+        flowMonitorManager.getLatestWindows(member.getThisNode().getEndpoint(), windowsToUse);
+    if (latestWindows.size() < windowsToUse) {
+      return;
+    }
+    int burstWindowNum = 0;
+    for (FlowWindow latestWindow : latestWindows) {
+      double assumedFlow =
+          latestWindow.sum * 1.0 * flowMonitorWindowInterval / 1000 * overestimateFactor;
+      if (assumedFlow * followerNum > maxFlow) {
+        burstWindowNum++;
+      }
+    }
+    double assumedFlow =
+        latestWindows.stream().mapToLong(w -> w.sum).sum()
+            * 1.0
+            / latestWindows.size()
+            * flowMonitorWindowInterval
+            * overestimateFactor;
+
+    for (Entry<TEndPoint, FlowMonitor> entry : flowMonitorManager.getMonitorMap().entrySet()) {
+      logger.info(
+          "{}: Flow of {}: {}, {}, {}",
+          member.getName(),
+          entry.getKey(),
+          entry.getValue().getLatestWindows(windowsToUse),
+          entry.getValue().averageFlow(windowsToUse),
+          inBurst);
+    }
     Map<Peer, BlockingQueue<VotingEntry>> nodesLogQueuesMap = logDispatcher.getNodesLogQueuesMap();
     Map<Peer, Double> nodesRate = logDispatcher.getNodesRate();
 
     // sort followers according to their queue length
     followers.sort(Comparator.comparing(node -> nodesLogQueuesMap.get(node).size()));
-    if (assumedFlow * followerNum > maxFlow) {
+    if (burstWindowNum > latestWindows.size() / 2 && !inBurst) {
       enterBurst(nodesRate, nodeNum, assumedFlow, followers);
-    } else {
+      logDispatcher.updateRateLimiter();
+    } else if (burstWindowNum < latestWindows.size() / 2 && inBurst) {
       exitBurst(followerNum, nodesRate, followers);
+      logDispatcher.updateRateLimiter();
     }
-    logDispatcher.updateRateLimiter();
   }
 
   private void enterBurst(
       Map<Peer, Double> nodesRate, int nodeNum, double assumedFlow, List<Peer> followers) {
+    logger.info("{}: entering burst", member.getName());
     int followerNum = nodeNum - 1;
     int quorumFollowerNum = nodeNum / 2;
     double remainingFlow = maxFlow;
@@ -123,13 +158,16 @@ public class FlowBalancer {
       Peer node = followers.get(i);
       nodesRate.put(node, flowToRemaining);
     }
+    inBurst = true;
   }
 
   private void exitBurst(int followerNum, Map<Peer, Double> nodesRate, List<Peer> followers) {
+    logger.info("{}: exiting burst", member.getName());
     // lift flow limits
     for (int i = 0; i < followerNum; i++) {
       Peer node = followers.get(i);
       nodesRate.put(node, maxFlow);
     }
+    inBurst = false;
   }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitor.java
index 79ff88984b..5689a1effd 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitor.java
@@ -19,9 +19,8 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol;
 
-import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
-import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,24 +32,27 @@ import java.io.IOException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.Iterator;
+import java.util.List;
 
 public class FlowMonitor {
 
   private static final Logger logger = LoggerFactory.getLogger(FlowMonitor.class);
   private static final String FILE_SUFFIX = ".flow";
-  private ArrayDeque<Pair<Long, Long>> windows;
+  private ArrayDeque<FlowWindow> windows;
   private long currWindowStart;
   private long currWindowSum;
   private long windowInterval;
-  private Peer node;
+  private TEndPoint node;
   private int maxWindowSize;
   private BufferedWriter writer;
   private DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
   private RaftConfig config;
+  private boolean recordFlow = false;
 
-  public FlowMonitor(Peer node, RaftConfig config) throws IOException {
+  public FlowMonitor(TEndPoint node, RaftConfig config) throws IOException {
     this.maxWindowSize = config.getFlowMonitorMaxWindowNum();
     this.windows = new ArrayDeque<>(maxWindowSize);
     this.windowInterval = config.getFlowMonitorWindowInterval();
@@ -61,17 +63,14 @@ public class FlowMonitor {
 
   private void initSerializer() throws IOException {
     String path =
-        config.getStorageDir()
-            + File.separator
-            + node.getEndpoint().getIp()
-            + "-"
-            + node.getEndpoint().getPort()
-            + FILE_SUFFIX;
+        config.getStorageDir() + File.separator + node.getIp() + "-" + node.getPort() + FILE_SUFFIX;
     File file = new File(path);
     file.delete();
-    writer = new BufferedWriter(new FileWriter(file));
-    writer.write("Time,FlowSum");
-    writer.newLine();
+    if (recordFlow) {
+      writer = new BufferedWriter(new FileWriter(file));
+      writer.write("Time,FlowSum");
+      writer.newLine();
+    }
   }
 
   public void close() {
@@ -79,11 +78,13 @@ public class FlowMonitor {
     while (windows.size() > 0) {
       serializeWindow();
     }
-    try {
-      writer.close();
-      logger.info("Flow monitor {} is closed", node);
-    } catch (IOException e) {
-      logger.warn("Cannot close serializer of {}", node, e);
+    if (writer != null) {
+      try {
+        writer.close();
+        logger.info("Flow monitor {} is closed", node);
+      } catch (IOException e) {
+        logger.warn("Cannot close serializer of {}", node, e);
+      }
     }
   }
 
@@ -93,15 +94,17 @@ public class FlowMonitor {
   }
 
   private void serializeWindow() {
-    Pair<Long, Long> window = windows.removeFirst();
-    try {
-      String windowString =
-          String.format("%s,%d", dateFormat.format(new Date(window.left)), window.right);
-      logger.debug("New window {} serialized by {}", windowString, node);
-      writer.write(windowString);
-      writer.newLine();
-    } catch (IOException e) {
-      logger.warn("Cannot serialize window {} of {}", window, node, e);
+    FlowWindow window = windows.removeFirst();
+    if (writer != null) {
+      try {
+        String windowString =
+            String.format("%s,%d", dateFormat.format(new Date(window.start)), window.sum);
+        logger.debug("New window {} serialized by {}", windowString, node);
+        writer.write(windowString);
+        writer.newLine();
+      } catch (IOException e) {
+        logger.warn("Cannot serialize window {} of {}", window, node, e);
+      }
     }
   }
 
@@ -114,7 +117,7 @@ public class FlowMonitor {
   private void saveWindow() {
     if (currWindowSum != 0) {
       checkSize();
-      windows.add(new Pair<>(currWindowStart, currWindowSum));
+      windows.add(new FlowWindow(currWindowStart, currWindowSum));
       logger.debug("New window {},{} generated by {}", currWindowStart, currWindowSum, node);
     }
   }
@@ -134,11 +137,15 @@ public class FlowMonitor {
   public double averageFlow(int windowsToUse) {
     long flowSum = currWindowSum;
     long intervalSum = System.currentTimeMillis() - currWindowStart;
-    Iterator<Pair<Long, Long>> windowIterator = windows.descendingIterator();
+    if (intervalSum > windowInterval) {
+      intervalSum = windowInterval;
+    }
+
+    Iterator<FlowWindow> windowIterator = windows.descendingIterator();
     for (int i = 1; i < windowsToUse; i++) {
       if (windowIterator.hasNext()) {
-        Pair<Long, Long> window = windowIterator.next();
-        flowSum += window.right;
+        FlowWindow window = windowIterator.next();
+        flowSum += window.sum;
         intervalSum += windowInterval;
       } else {
         break;
@@ -146,4 +153,19 @@ public class FlowMonitor {
     }
     return flowSum * 1.0 / intervalSum * 1000;
   }
+
+  public List<FlowWindow> getLatestWindows(int windowNum) {
+    List<FlowWindow> result = new ArrayList<>();
+    result.add(new FlowWindow(currWindowStart, currWindowSum));
+    Iterator<FlowWindow> windowIterator = windows.descendingIterator();
+    for (int i = 1; i < windowNum; i++) {
+      if (windowIterator.hasNext()) {
+        FlowWindow window = windowIterator.next();
+        result.add(window);
+      } else {
+        break;
+      }
+    }
+    return result;
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitorManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitorManager.java
index ea7c0f19bf..726ef093db 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitorManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitorManager.java
@@ -19,13 +19,15 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol;
 
-import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -34,7 +36,7 @@ public class FlowMonitorManager {
   private static final Logger logger = LoggerFactory.getLogger(FlowMonitorManager.class);
   public static final FlowMonitorManager INSTANCE = new FlowMonitorManager();
 
-  private Map<Peer, FlowMonitor> monitorMap = new ConcurrentHashMap<>();
+  private Map<TEndPoint, FlowMonitor> monitorMap = new ConcurrentHashMap<>();
   private RaftConfig config;
 
   private FlowMonitorManager() {}
@@ -50,26 +52,36 @@ public class FlowMonitorManager {
     monitorMap.clear();
   }
 
-  public void report(Peer peer, long val) {
+  public void report(TEndPoint endPoint, long val) {
     FlowMonitor flowMonitor =
         monitorMap.computeIfAbsent(
-            peer,
+            endPoint,
             p -> {
               try {
                 return new FlowMonitor(p, config);
               } catch (IOException e) {
-                logger.warn("Cannot register flow monitor for {}", peer, e);
+                logger.warn("Cannot register flow monitor for {}", endPoint, e);
                 return null;
               }
             });
     if (flowMonitor != null) {
       flowMonitor.report(val);
     } else {
-      logger.warn("Flow monitor {} is not registered", peer);
+      logger.warn("Flow monitor {} is not registered", endPoint);
     }
   }
 
-  public double averageFlow(Peer peer, int windowsToUse) {
-    return monitorMap.get(peer).averageFlow(windowsToUse);
+  public double averageFlow(TEndPoint endPoint, int windowsToUse) {
+    FlowMonitor flowMonitor = monitorMap.get(endPoint);
+    return flowMonitor != null ? flowMonitor.averageFlow(windowsToUse) : 0.0;
+  }
+
+  public List<FlowWindow> getLatestWindows(TEndPoint endPoint, int windowNum) {
+    FlowMonitor flowMonitor = monitorMap.get(endPoint);
+    return flowMonitor != null ? flowMonitor.getLatestWindows(windowNum) : Collections.emptyList();
+  }
+
+  public Map<TEndPoint, FlowMonitor> getMonitorMap() {
+    return monitorMap;
   }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowWindow.java
similarity index 55%
copy from consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java
copy to consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowWindow.java
index 0310a4e3d3..157ecd398e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowWindow.java
@@ -16,27 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol;
 
-package org.apache.iotdb.consensus.common.request;
+public class FlowWindow {
+  public long start;
+  public long sum;
 
-import java.nio.ByteBuffer;
-
-/*
-In general, for the requests from the leader, we can directly strong-cast the class to reduce
-the cost of deserialization during the execution of the leader state machine. For the requests
-received by the followers, the responsibility of deserialization can generally be transferred
-to the state machine layer
-*/
-public class ByteBufferConsensusRequest implements IConsensusRequest {
-
-  private final ByteBuffer byteBuffer;
-
-  public ByteBufferConsensusRequest(ByteBuffer byteBuffer) {
-    this.byteBuffer = byteBuffer;
+  public FlowWindow(long start, long sum) {
+    this.start = start;
+    this.sum = sum;
   }
 
   @Override
-  public ByteBuffer serializeToByteBuffer() {
-    return byteBuffer;
+  public String toString() {
+    return "[" + +start + "," + sum + ']';
   }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index aee2fa76cc..ebb0a0f37a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -632,6 +632,9 @@ public abstract class RaftLogManager {
       if (entry.createTime != 0) {
         entry.committedTime = System.nanoTime();
         Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_COMMIT.add(entry.committedTime - entry.createTime);
+        synchronized (entry) {
+          entry.notify();
+        }
       }
       entry.setSerializationCache(null);
     }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index a9aaae7d83..24b6755771 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -157,6 +157,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     maxRaftLogIndexSizeInMemory = config.getMaxRaftLogIndexSizeInMemory();
     maxNumberOfPersistRaftLogFiles = config.getMaxNumberOfPersistRaftLogFiles();
     maxPersistRaftLogNumberOnDisk = config.getMaxPersistRaftLogNumberOnDisk();
+    maxRaftLogPersistDataSizePerFile = config.getMaxRaftLogPersistDataSizePerFile();
 
     this.logDataFileList = new ArrayList<>();
     this.logIndexFileList = new ArrayList<>();
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeReport.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeReport.java
new file mode 100644
index 0000000000..e6220a188d
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeReport.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.natraft.utils;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.natraft.protocol.RaftRole;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A node report collects the current runtime information of the local node, which contains: 1. The
+ * MetaMemberReport of the meta member. 2. The DataMemberReports of each data member.
+ */
+@SuppressWarnings("java:S107") // reports need enough parameters
+public class NodeReport {
+
+  private TEndPoint thisNode;
+  private List<RaftMemberReport> memberReports;
+
+  public NodeReport(TEndPoint thisNode) {
+    this.thisNode = thisNode;
+    memberReports = new ArrayList<>();
+  }
+
+  public void setMemberReports(List<RaftMemberReport> memberReports) {
+    this.memberReports = memberReports;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder stringBuilder = new StringBuilder();
+    stringBuilder.append("Report of ").append(thisNode).append(System.lineSeparator());
+    for (RaftMemberReport memberReport : memberReports) {
+      stringBuilder.append(memberReport).append(System.lineSeparator());
+    }
+    return stringBuilder.toString();
+  }
+
+  /**
+   * A RaftMemberReport contains the character, leader, term, last log term/index of a raft member.
+   */
+  public static class RaftMemberReport {
+    RaftRole character;
+    Peer leader;
+    long term;
+    long lastLogTerm;
+    long lastLogIndex;
+    long commitIndex;
+    long commitTerm;
+    boolean isReadOnly;
+    long lastHeartbeatReceivedTime;
+    long prevLastLogIndex;
+    long maxAppliedLogIndex;
+
+    public RaftMemberReport(
+        RaftRole character,
+        Peer leader,
+        long term,
+        long lastLogTerm,
+        long lastLogIndex,
+        long commitIndex,
+        long commitTerm,
+        boolean isReadOnly,
+        long lastHeartbeatReceivedTime,
+        long prevLastLogIndex,
+        long maxAppliedLogIndex) {
+      this.character = character;
+      this.leader = leader;
+      this.term = term;
+      this.lastLogTerm = lastLogTerm;
+      this.lastLogIndex = lastLogIndex;
+      this.commitIndex = commitIndex;
+      this.commitTerm = commitTerm;
+      this.isReadOnly = isReadOnly;
+      this.lastHeartbeatReceivedTime = lastHeartbeatReceivedTime;
+      this.prevLastLogIndex = prevLastLogIndex;
+      this.maxAppliedLogIndex = maxAppliedLogIndex;
+    }
+
+    @Override
+    public String toString() {
+      String transportCompressionReport = "";
+      return "RaftReport {\n"
+          + "character="
+          + character
+          + ", Leader="
+          + leader
+          + ", term="
+          + term
+          + ", lastLogTerm="
+          + lastLogTerm
+          + ", lastLogIndex="
+          + lastLogIndex
+          + ", commitIndex="
+          + commitIndex
+          + ", commitTerm="
+          + commitTerm
+          + ", appliedLogIndex="
+          + maxAppliedLogIndex
+          + ", readOnly="
+          + isReadOnly
+          + ", lastHeartbeat="
+          + (System.currentTimeMillis() - lastHeartbeatReceivedTime)
+          + "ms ago"
+          + ", logIncrement="
+          + (lastLogIndex - prevLastLogIndex)
+          + transportCompressionReport
+          + ", \n timer: "
+          + Timer.Statistic.getReport()
+          + '}';
+    }
+
+    public RaftRole getCharacter() {
+      return character;
+    }
+
+    public Peer getLeader() {
+      return leader;
+    }
+
+    public long getTerm() {
+      return term;
+    }
+
+    public long getLastLogTerm() {
+      return lastLogTerm;
+    }
+
+    public long getLastLogIndex() {
+      return lastLogIndex;
+    }
+
+    public long getCommitIndex() {
+      return commitIndex;
+    }
+
+    public long getCommitTerm() {
+      return commitTerm;
+    }
+
+    public boolean isReadOnly() {
+      return isReadOnly;
+    }
+
+    public long getLastHeartbeatReceivedTime() {
+      return lastHeartbeatReceivedTime;
+    }
+
+    public long getPrevLastLogIndex() {
+      return prevLastLogIndex;
+    }
+
+    public long getMaxAppliedLogIndex() {
+      return maxAppliedLogIndex;
+    }
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
index 9631b892fd..de89657540 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
@@ -196,6 +196,12 @@ public class Timer {
         TIME_SCALE,
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_RECEIVER_PARSE_ENTRY(
+        RAFT_MEMBER_RECEIVER,
+        "receiver parse entries",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     RAFT_RECEIVER_WAIT_FOR_PREV_LOG(
         RAFT_MEMBER_RECEIVER,
         "receiver wait for prev log",
@@ -220,6 +226,12 @@ public class Timer {
         TIME_SCALE,
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_RECEIVER_APPEND_ONE_ENTRY(
+        RAFT_MEMBER_RECEIVER,
+        "receiver append one entries",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     RAFT_RECEIVER_APPEND_ENTRY(
         RAFT_MEMBER_RECEIVER,
         "append entrys",
@@ -344,6 +356,12 @@ public class Timer {
         TIME_SCALE,
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_SENDER_LOG_APPEND_WAIT(
+        LOG_DISPATCHER,
+        "wait for being appended",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_APPEND_END(
         LOG_DISPATCHER,
         "from create to wait append end",
@@ -382,7 +400,10 @@ public class Timer {
     RAFT_RELAYED_LEVEL1_NUM(RAFT_MEMBER_SENDER, "level 1 relay node number", 1, true, ROOT),
     RAFT_RECEIVE_RELAY_ACK(RAFT_MEMBER_SENDER, "receive relay ack", 1, true, ROOT),
     RAFT_SENDER_OOW(RAFT_MEMBER_SENDER, "out of window", 1, true, ROOT),
-    RAFT_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "weak accept", 1, true, ROOT),
+    RAFT_LEADER_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "leader weak accept", 1, true, ROOT),
+    RAFT_FOLLOWER_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "follower weak accept", TIME_SCALE, true, ROOT),
+    RAFT_FOLLOWER_STRONG_ACCEPT(
+        RAFT_MEMBER_SENDER, "follower strong accept", TIME_SCALE, true, ROOT),
     RAFT_CONCURRENT_SENDER(RAFT_MEMBER_SENDER, "concurrent sender", 1, true, ROOT),
     RAFT_INDEX_BLOCKER(RAFT_MEMBER_SENDER, "index blocker", 1, true, ROOT),
     RAFT_APPEND_BLOCKER(RAFT_MEMBER_SENDER, "append blocker", 1, true, ROOT),
diff --git a/distribution/distribute-ecs.sh b/distribution/distribute-ecs.sh
new file mode 100644
index 0000000000..434e485e31
--- /dev/null
+++ b/distribution/distribute-ecs.sh
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+src_lib_path=/d/CodeRepo/iotdb2/distribution/target/apache-iotdb-1.1.0-SNAPSHOT-all-bin/apache-iotdb-1.1.0-SNAPSHOT-all-bin/lib/iotdb*
+
+ips=(ecs1 ecs2 ecs3 ecs4)
+#ips=(dc11 dc12 dc13 dc14 dc11 dc12)
+target_lib_path=/root/jt/iotdb_expr/apache-iotdb-1.1.0-SNAPSHOT-all-bin/lib/
+
+for ip in ${ips[*]}
+  do
+    ssh root@$ip "mkdir $target_lib_path"
+    scp -r $src_lib_path root@$ip:$target_lib_path
+  done
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index ae890e6a32..4b40393b25 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -399,4 +399,14 @@ public abstract class InsertNode extends WritePlanNode {
   public PartialPath conflictKey() {
     return devicePath;
   }
+
+  @Override
+  public long estimateSize() {
+    long size = 0;
+    size += devicePath.getFullPath().length() * 2L;
+    for (String measurement : measurements) {
+      size += measurement.length() * 2L;
+    }
+    return size;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
index 19e157493f..7fa6b8ba30 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
@@ -1159,4 +1159,47 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
       throw new SemanticException(e);
     }
   }
+
+  @Override
+  public long estimateSize() {
+    long size = super.estimateSize();
+
+    size += times.length * 8L;
+
+    for (int measurementIndex = 0; measurementIndex < columns.length; measurementIndex++) {
+      switch (dataTypes[measurementIndex]) {
+        case INT32:
+          int[] intValues = (int[]) columns[measurementIndex];
+          size += intValues.length * 4L;
+          break;
+        case INT64:
+          long[] longValues = (long[]) columns[measurementIndex];
+          size += longValues.length * 8L;
+          break;
+        case FLOAT:
+          float[] floatValues = (float[]) columns[measurementIndex];
+          size += floatValues.length * 4L;
+          break;
+        case DOUBLE:
+          double[] doubleValues = (double[]) columns[measurementIndex];
+          size += doubleValues.length * 8L;
+          break;
+        case BOOLEAN:
+          boolean[] boolValues = (boolean[]) columns[measurementIndex];
+          size += boolValues.length;
+          break;
+        case TEXT:
+          Binary[] binaryValues = (Binary[]) columns[measurementIndex];
+          for (Binary binaryValue : binaryValues) {
+            size += binaryValue.getLength();
+          }
+          break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format(DATATYPE_UNSUPPORTED, dataTypes[measurementIndex]));
+      }
+    }
+
+    return size;
+  }
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index d3edee11d5..6010d8a87f 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -746,7 +746,7 @@ public class Session implements ISession {
     } catch (RedirectException e) {
       handleQueryRedirection(e.getEndPoint());
       if (enableQueryRedirection) {
-        logger.debug(
+        logger.info(
             "{} redirect query {} to {}",
             defaultSessionConnection.getEndPoint(),
             sql,
@@ -795,7 +795,7 @@ public class Session implements ISession {
     } catch (RedirectException e) {
       handleQueryRedirection(e.getEndPoint());
       if (enableQueryRedirection) {
-        logger.debug("redirect query {} to {}", paths, e.getEndPoint());
+        logger.info("redirect query {} to {}", paths, e.getEndPoint());
         // retry
         try {
           return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime, timeOut);
@@ -1089,6 +1089,7 @@ public class Session implements ISession {
 
   private void handleRedirection(String deviceId, TEndPoint endpoint) {
     if (enableRedirection) {
+      logger.info("Redirect to {}", endpoint);
       // no need to redirection
       if (endpoint.ip.equals("0.0.0.0")) {
         return;