Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/10/09 04:07:09 UTC

[iotdb] branch expr_vgraft updated: add more timers and timer config

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_vgraft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr_vgraft by this push:
     new 3329414c79 add more timers and timer config
3329414c79 is described below

commit 3329414c799d5cf51bfea60d8eb443a84513bb84
Author: Tian Jiang <jt...@163.com>
AuthorDate: Sun Oct 9 12:07:00 2022 +0800

    add more timers and timer config
---
 cluster/distribute-dc.sh                           |   4 +-
 .../apache/iotdb/cluster/config/ClusterConfig.java |  10 +++
 .../iotdb/cluster/config/ClusterDescriptor.java    |   5 ++
 .../iotdb/cluster/log/FragmentedLogDispatcher.java |   6 ++
 .../iotdb/cluster/log/IndirectLogDispatcher.java   |   7 +-
 .../apache/iotdb/cluster/log/LogDispatcher.java    |  71 ++++-----------
 .../apache/iotdb/cluster/log/VotingLogList.java    | 100 ++++++++++++++-------
 .../cluster/log/applier/AsyncDataLogApplier.java   |   8 +-
 .../iotdb/cluster/log/manage/RaftLogManager.java   |   9 +-
 .../log/sequencing/AsynchronousSequencer.java      |   8 +-
 .../log/sequencing/SynchronousSequencer.java       |  18 ++--
 .../handlers/caller/AppendNodeEntryHandler.java    |  46 +++++-----
 .../iotdb/cluster/server/member/RaftMember.java    |  94 +++++++------------
 .../apache/iotdb/cluster/server/monitor/Timer.java |  39 ++++++--
 .../apache/iotdb/cluster/utils/ClusterUtils.java   |   2 +-
 15 files changed, 215 insertions(+), 212 deletions(-)

diff --git a/cluster/distribute-dc.sh b/cluster/distribute-dc.sh
index 75757ebcb2..599b658324 100644
--- a/cluster/distribute-dc.sh
+++ b/cluster/distribute-dc.sh
@@ -1,7 +1,7 @@
 src_lib_path=/d/CodeRepo/iotdb/cluster/target/iotdb-cluster-0.14.0-SNAPSHOT/lib/iotdb*
 
-ips=(dc11 dc12 dc13 dc14 dc15 dc16 dc17 dc18)
-#ips=(dc11 dc12)
+ips=(dc15 dc16 dc17 dc18)
+#ips=(dc11 dc12 dc13 dc14 dc11 dc12)
 target_lib_path=/home/jt/iotdb_expr_vg/lib
 
 for ip in ${ips[*]}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index a9724b4280..3232ecd5b4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -221,6 +221,8 @@ public class ClusterConfig {
 
   private boolean useCRaft = false;
 
+  private boolean enableInstrumenting = true;
+
   /**
    * create a clusterConfig class. The internalIP will be set according to the server's hostname. If
    * there is something error for getting the ip of the hostname, then set the internalIp as
@@ -692,4 +694,12 @@ public class ClusterConfig {
   public void setUseCRaft(boolean useCRaft) {
     this.useCRaft = useCRaft;
   }
+
+  public boolean isEnableInstrumenting() {
+    return enableInstrumenting;
+  }
+
+  public void setEnableInstrumenting(boolean enableInstrumenting) {
+    this.enableInstrumenting = enableInstrumenting;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index 562cff9c07..42a5b80233 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -392,6 +392,11 @@ public class ClusterDescriptor {
         Boolean.parseBoolean(
             properties.getProperty("use_c_raft", String.valueOf(config.isUseCRaft()))));
 
+    config.setEnableInstrumenting(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_instrumenting", String.valueOf(config.isEnableInstrumenting()))));
+
     String consistencyLevel = properties.getProperty("consistency_level");
     if (consistencyLevel != null) {
       config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
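
The two hunks above wire the new switch through configuration: ClusterConfig gains an
enableInstrumenting flag (default true) with a getter/setter, and ClusterDescriptor
overrides it from an "enable_instrumenting" property when loading the cluster config.
A minimal sketch of reading the flag follows; it is illustrative only and not part of
the commit. Note that because Timer.ENABLE_INSTRUMENTING (see the Timer.java hunk
below) is a static final field initialized from this flag, the property must be set
before the Timer class is first loaded for a change to take effect:

    // Illustrative sketch, not part of this commit.
    import org.apache.iotdb.cluster.config.ClusterConfig;
    import org.apache.iotdb.cluster.config.ClusterDescriptor;

    public class InstrumentingFlagExample {
      public static void main(String[] args) {
        ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
        // Defaults to true; ClusterDescriptor overrides it from the
        // "enable_instrumenting" property at startup.
        System.out.println("instrumenting enabled: " + config.isEnableInstrumenting());
        // Only effective if done before the Timer class is initialized, since
        // Timer.ENABLE_INSTRUMENTING is a static final snapshot of this flag.
        config.setEnableInstrumenting(false);
      }
    }
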
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
index 49dcc642d5..fc31f792c6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
@@ -40,6 +40,10 @@ public class FragmentedLogDispatcher extends LogDispatcher {
   }
 
   public void offer(SendLogRequest request) {
+    if (!(request.getVotingLog().getLog() instanceof FragmentedLog)) {
+      super.offer(request);
+      return;
+    }
     // do serialization here to avoid taking LogManager for too long
 
     long startTime = Statistic.LOG_DISPATCHER_LOG_ENQUEUE.getOperationStartTime();
@@ -89,6 +93,8 @@ public class FragmentedLogDispatcher extends LogDispatcher {
       for (SendLogRequest request : currBatch) {
         Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
             request.getVotingLog().getLog().getEnqueueTime());
+        Statistic.LOG_DISPATCHER_FROM_CREATE_TO_DEQUEUE.calOperationCostTimeFromStart(
+            request.getVotingLog().getLog().getCreateTime());
         long start = Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
         request.getAppendEntryRequest().entry = request.getVotingLog().getLog().serialize();
         Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(start);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index 56b4cf2a50..c852575275 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -80,7 +80,7 @@ public class IndirectLogDispatcher extends LogDispatcher {
         BlockingQueue<SendLogRequest> logBlockingQueue;
         logBlockingQueue =
             new ArrayBlockingQueue<>(
-                ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
+                ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem(), true);
         nodesLogQueuesList.add(new Pair<>(node, logBlockingQueue));
       }
     }
@@ -92,10 +92,7 @@ public class IndirectLogDispatcher extends LogDispatcher {
                 pair.left,
                 n ->
                     IoTDBThreadPoolFactory.newCachedThreadPool(
-                        "LogDispatcher-"
-                            + member.getName()
-                            + "-"
-                            + ClusterUtils.nodeToString(pair.left)))
+                        "LogDispatcher-" + member.getName() + "-" + pair.left.nodeIdentifier))
             .submit(newDispatcherThread(pair.left, pair.right));
       }
     }
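
A detail in the hunk above that is easy to miss: the extra boolean passed to the
ArrayBlockingQueue constructor enables the JDK's fair access policy, so threads
blocked on insertion or removal are granted access in FIFO order rather than an
unspecified order. A standalone illustration of that constructor (the capacity and
element type below are placeholders; the dispatcher itself uses getMaxNumOfLogsInMem()
and SendLogRequest):

    // Illustrative sketch of ArrayBlockingQueue's fairness flag, not part of this commit.
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class FairQueueExample {
      public static void main(String[] args) throws InterruptedException {
        // true = fair: waiting producers/consumers are served in FIFO order
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024, true);
        queue.put("log-request");
        System.out.println(queue.take());
      }
    }
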
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index a79bb8098a..c6cfbd2703 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -60,9 +60,9 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.iotdb.cluster.server.monitor.Timer.Statistic.LOG_DISPATCHER_LOG_ENQUEUE_SINGLE;
 
 /**
  * A LogDispatcher serves a raft leader by queuing logs that the leader wants to send to its
@@ -111,10 +111,7 @@ public class LogDispatcher {
                 pair.left,
                 n ->
                     IoTDBThreadPoolFactory.newCachedThreadPool(
-                        "LogDispatcher-"
-                            + member.getName()
-                            + "-"
-                            + ClusterUtils.nodeToString(pair.left)))
+                        "LogDispatcher-" + member.getName() + "-" + pair.left.nodeIdentifier))
             .submit(newDispatcherThread(pair.left, pair.right));
       }
     }
@@ -135,11 +132,13 @@ public class LogDispatcher {
   }
 
   protected boolean addToQueue(BlockingQueue<SendLogRequest> nodeLogQueue, SendLogRequest request) {
+    long operationStartTime = LOG_DISPATCHER_LOG_ENQUEUE_SINGLE.getOperationStartTime();
     if (ClusterDescriptor.getInstance().getConfig().isWaitForSlowNode()) {
       long waitStart = System.currentTimeMillis();
       long waitTime = 1;
       while (System.currentTimeMillis() - waitStart < clusterConfig.getConnectionTimeoutInMS()) {
         if (nodeLogQueue.add(request)) {
+          LOG_DISPATCHER_LOG_ENQUEUE_SINGLE.calOperationCostTimeFromStart(operationStartTime);
           return true;
         } else {
           try {
@@ -150,9 +149,12 @@ public class LogDispatcher {
           }
         }
       }
+      LOG_DISPATCHER_LOG_ENQUEUE_SINGLE.calOperationCostTimeFromStart(operationStartTime);
       return false;
     } else {
-      return nodeLogQueue.add(request);
+      boolean added = nodeLogQueue.add(request);
+      LOG_DISPATCHER_LOG_ENQUEUE_SINGLE.calOperationCostTimeFromStart(operationStartTime);
+      return added;
     }
   }
 
@@ -209,31 +211,20 @@ public class LogDispatcher {
   public static class SendLogRequest {
 
     private VotingLog votingLog;
-    private AtomicBoolean leaderShipStale;
-    private AtomicLong newLeaderTerm;
     private AppendEntryRequest appendEntryRequest;
     private long enqueueTime;
     private Future<ByteBuffer> serializedLogFuture;
     private int quorumSize;
     private boolean isVerifier;
 
-    public SendLogRequest(
-        VotingLog log,
-        AtomicBoolean leaderShipStale,
-        AtomicLong newLeaderTerm,
-        AppendEntryRequest appendEntryRequest,
-        int quorumSize) {
+    public SendLogRequest(VotingLog log, AppendEntryRequest appendEntryRequest, int quorumSize) {
       this.setVotingLog(log);
-      this.setLeaderShipStale(leaderShipStale);
-      this.setNewLeaderTerm(newLeaderTerm);
       this.setAppendEntryRequest(appendEntryRequest);
       this.setQuorumSize(quorumSize);
     }
 
     public SendLogRequest(SendLogRequest request) {
       this.setVotingLog(request.votingLog);
-      this.setLeaderShipStale(request.leaderShipStale);
-      this.setNewLeaderTerm(request.newLeaderTerm);
       this.setAppendEntryRequest(request.appendEntryRequest);
       this.setQuorumSize(request.quorumSize);
       this.setEnqueueTime(request.enqueueTime);
@@ -256,22 +247,6 @@ public class LogDispatcher {
       this.enqueueTime = enqueueTime;
     }
 
-    public AtomicBoolean getLeaderShipStale() {
-      return leaderShipStale;
-    }
-
-    public void setLeaderShipStale(AtomicBoolean leaderShipStale) {
-      this.leaderShipStale = leaderShipStale;
-    }
-
-    public AtomicLong getNewLeaderTerm() {
-      return newLeaderTerm;
-    }
-
-    void setNewLeaderTerm(AtomicLong newLeaderTerm) {
-      this.newLeaderTerm = newLeaderTerm;
-    }
-
     public AppendEntryRequest getAppendEntryRequest() {
       return appendEntryRequest;
     }
@@ -330,7 +305,9 @@ public class LogDispatcher {
             SendLogRequest poll = logBlockingDeque.take();
             currBatch.add(poll);
             if (maxBatchSize > 1 && useBatchInLogCatchUp) {
-              logBlockingDeque.drainTo(currBatch, maxBatchSize - 1);
+              while (!logBlockingDeque.isEmpty() && currBatch.size() < maxBatchSize) {
+                currBatch.add(logBlockingDeque.take());
+              }
             }
           }
           if (logger.isDebugEnabled()) {
@@ -472,8 +449,6 @@ public class LogDispatcher {
           sendLogs(currBatch);
         } else {
           for (SendLogRequest batch : currBatch) {
-            Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENDING.calOperationCostTimeFromStart(
-                batch.getVotingLog().getLog().getCreateTime());
             sendLog(batch);
           }
         }
@@ -485,11 +460,7 @@ public class LogDispatcher {
     void sendLogSync(SendLogRequest logRequest) {
       AppendNodeEntryHandler handler =
           member.getAppendNodeEntryHandler(
-              logRequest.getVotingLog(),
-              receiver,
-              logRequest.leaderShipStale,
-              logRequest.newLeaderTerm,
-              logRequest.quorumSize);
+              logRequest.getVotingLog(), receiver, logRequest.quorumSize);
       // TODO add async interface
 
       int retries = 5;
@@ -535,11 +506,7 @@ public class LogDispatcher {
     private void sendLogAsync(SendLogRequest logRequest) {
       AppendNodeEntryHandler handler =
           member.getAppendNodeEntryHandler(
-              logRequest.getVotingLog(),
-              receiver,
-              logRequest.leaderShipStale,
-              logRequest.newLeaderTerm,
-              logRequest.quorumSize);
+              logRequest.getVotingLog(), receiver, logRequest.quorumSize);
 
       AsyncClient client = member.getAsyncClient(receiver);
       if (client != null) {
@@ -552,6 +519,8 @@ public class LogDispatcher {
     }
 
     void sendLog(SendLogRequest logRequest) {
+      Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENDING.calOperationCostTimeFromStart(
+          logRequest.getVotingLog().getLog().getCreateTime());
       if (logger.isDebugEnabled()) {
         Thread.currentThread()
             .setName(baseName + "-" + logRequest.getVotingLog().getLog().getCurrLogIndex());
@@ -586,11 +555,7 @@ public class LogDispatcher {
         for (SendLogRequest sendLogRequest : batch) {
           AppendNodeEntryHandler handler =
               member.getAppendNodeEntryHandler(
-                  sendLogRequest.getVotingLog(),
-                  receiver,
-                  sendLogRequest.getLeaderShipStale(),
-                  sendLogRequest.getNewLeaderTerm(),
-                  sendLogRequest.getQuorumSize());
+                  sendLogRequest.getVotingLog(), receiver, sendLogRequest.getQuorumSize());
           singleEntryHandlers.add(handler);
         }
       }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
index fde20d18bf..398a80eda9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
@@ -20,22 +20,24 @@
 package org.apache.iotdb.cluster.log;
 
 import org.apache.iotdb.cluster.exception.LogExecutionException;
+import org.apache.iotdb.cluster.log.manage.RaftLogManager;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.iotdb.cluster.server.monitor.Timer.Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER;
 
@@ -47,40 +49,65 @@ public class VotingLogList {
   private int quorumSize;
   private RaftMember member;
   private Map<Integer, Long> stronglyAcceptedIndices = new ConcurrentHashMap<>();
-  private ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+  private final ExecutorService service = Executors.newSingleThreadExecutor();
+  private AtomicLong newCommitIndex = new AtomicLong(-1);
 
   public VotingLogList(int quorumSize, RaftMember member) {
     this.quorumSize = quorumSize;
     this.member = member;
-    ScheduledExecutorUtil.safelyScheduleAtFixedRate(
-        service,
+    service.submit(
         () -> {
-          long newCommitIndex = computeNewCommitIndex();
-          if (newCommitIndex > member.getLogManager().getCommitLogIndex()) {
-            synchronized (member.getLogManager()) {
-              long operationStartTime = RAFT_SENDER_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
-              try {
-                member.getLogManager().commitTo(newCommitIndex);
-              } catch (LogExecutionException e) {
-                logger.error("Fail to commit {}", newCommitIndex, e);
+          try {
+            while (true) {
+              if (!tryCommit()) {
+                synchronized (newCommitIndex) {
+                  newCommitIndex.wait(1);
+                }
               }
-              RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(operationStartTime);
             }
+          } catch (Exception e) {
+            logger.error("Unexpected exception when updating commit index", e);
           }
-        },
-        0,
-        1,
-        TimeUnit.MILLISECONDS);
+        });
+  }
+
+  private boolean tryCommit() {
+    RaftLogManager logManager = member.getLogManager();
+
+    List<Log> entries = Collections.emptyList();
+    if (computeNewCommitIndex()
+        && logManager != null
+        && newCommitIndex.get() > logManager.getCommitLogIndex()) {
+      long start = Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.getOperationStartTime();
+      synchronized (logManager) {
+        Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
+            start);
+        long operationStartTime = RAFT_SENDER_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
+        try {
+          logManager.commitTo(newCommitIndex.get());
+        } catch (LogExecutionException e) {
+          logger.error("Fail to commit {}", newCommitIndex, e);
+        }
+        RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(operationStartTime);
+      }
+
+      return true;
+    } else {
+      return false;
+    }
   }
 
-  private long computeNewCommitIndex() {
+  public boolean computeNewCommitIndex() {
     List<Entry<Integer, Long>> nodeIndices = new ArrayList<>(stronglyAcceptedIndices.entrySet());
     if (nodeIndices.size() < quorumSize) {
-      return -1;
+      return false;
     }
     nodeIndices.sort(Entry.comparingByValue());
-    return nodeIndices.get(quorumSize - 1).getValue();
+    Long value = nodeIndices.get(quorumSize - 1).getValue();
+    long oldValue = newCommitIndex.getAndUpdate(oldV -> Math.max(value, oldV));
+    return value > oldValue;
   }
+
   /**
    * When an entry of index-term is strongly accepted by a node of acceptingNodeId, record the id in
    * all entries whose index <= the accepted entry. If any entry is accepted by a quorum, remove it
@@ -88,22 +115,31 @@ public class VotingLogList {
    *
    * @param index
    * @param term
-   * @param acceptingNodeId
+   * @param acceptingNode
    * @param signature
    * @return the lastly removed entry if any.
    */
   public void onStronglyAccept(long index, long term, Node acceptingNode, ByteBuffer signature) {
     logger.debug("{}-{} is strongly accepted by {}", index, term, acceptingNode);
 
-    stronglyAcceptedIndices.compute(
-        acceptingNode.nodeIdentifier,
-        (nid, idx) -> {
-          if (idx == null) {
-            return index;
-          } else {
-            return Math.max(index, idx);
-          }
-        });
+    Long newIndex =
+        stronglyAcceptedIndices.compute(
+            acceptingNode.nodeIdentifier,
+            (nid, oldIndex) -> {
+              if (oldIndex == null) {
+                return index;
+              } else {
+                if (index > oldIndex) {
+                  return index;
+                }
+                return oldIndex;
+              }
+            });
+    if (newIndex == index) {
+      synchronized (newCommitIndex) {
+        newCommitIndex.notifyAll();
+      }
+    }
   }
 
   public int totalAcceptedNodeNum(VotingLog log) {
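
The VotingLogList rewrite above replaces the 1 ms polling ScheduledExecutorService
with a single long-lived committer thread that sleeps on newCommitIndex and is woken
by onStronglyAccept whenever a node's strongly-accepted index advances. Below is a
stripped-down sketch of that notify-driven loop, with the actual commit work replaced
by a placeholder (class and method names here are illustrative, not the repository's):

    // Distilled illustration of the pattern used above; tryCommit() is a stub.
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicLong;

    public class NotifyDrivenCommitter {
      private final AtomicLong newCommitIndex = new AtomicLong(-1);
      private final ExecutorService service = Executors.newSingleThreadExecutor();

      public NotifyDrivenCommitter() {
        service.submit(() -> {
          try {
            while (true) {
              if (!tryCommit()) {
                // nothing new to commit: wait up to 1 ms or until a vote arrives
                synchronized (newCommitIndex) {
                  newCommitIndex.wait(1);
                }
              }
            }
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
      }

      /** Called from the vote handler when a strongly-accepted index advances. */
      public void onAdvance(long index) {
        long old = newCommitIndex.getAndUpdate(v -> Math.max(v, index));
        if (index > old) {
          synchronized (newCommitIndex) {
            newCommitIndex.notifyAll();
          }
        }
      }

      private boolean tryCommit() {
        // placeholder: commit entries up to newCommitIndex.get() and return
        // whether any progress was made
        return false;
      }
    }
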
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
index a717bf4543..97a621de04 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.log.applier;
 
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
@@ -34,7 +35,6 @@ import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
 import org.apache.iotdb.db.service.IoTDB;
 
 import org.slf4j.Logger;
@@ -168,8 +168,6 @@ public class AsyncDataLogApplier implements LogApplier {
     } else if (plan instanceof CreateTimeSeriesPlan) {
       PartialPath path = ((CreateTimeSeriesPlan) plan).getPath();
       sgPath = IoTDB.schemaProcessor.getBelongedStorageGroup(path);
-    } else if (plan instanceof DummyPlan) {
-      sgPath = new PartialPath("dummy", false);
     }
     return sgPath;
   }
@@ -217,7 +215,9 @@ public class AsyncDataLogApplier implements LogApplier {
 
   private class DataLogConsumer implements Runnable, Consumer<Log> {
 
-    private BlockingQueue<Log> logQueue = new ArrayBlockingQueue<>(4096);
+    private BlockingQueue<Log> logQueue =
+        new ArrayBlockingQueue<>(
+            ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
     private volatile long lastLogIndex;
     private volatile long lastAppliedLogIndex;
     private String name;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index 1a22b26a5d..eec035dc46 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -656,16 +656,15 @@ public abstract class RaftLogManager {
     }
 
     startTime = Statistic.RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS.getOperationStartTime();
+    for (Log entry : entries) {
+      Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_COMMIT.calOperationCostTimeFromStart(
+          entry.getCreateTime());
+    }
     try {
       // Operations here are so simple that the execution could be thought
       // success or fail together approximately.
       // TODO: make it real atomic
       getCommittedEntryManager().append(entries);
-      for (Log entry : entries) {
-        synchronized (entry) {
-          entry.notifyAll();
-        }
-      }
       Log lastLog = entries.get(entries.size() - 1);
       getUnCommittedEntryManager().stableTo(lastLog.getCurrLogIndex());
       commitIndex = lastLog.getCurrLogIndex();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
index a6fda19576..d2acf0fde9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
@@ -42,8 +42,6 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.iotdb.cluster.server.monitor.Timer.Statistic.RAFT_SENDER_SEQUENCE_LOG;
 
@@ -76,12 +74,8 @@ public class AsynchronousSequencer implements LogSequencer {
 
   public SendLogRequest enqueueSendLogRequest(Log log) {
     VotingLog votingLog = member.buildVotingLog(log);
-    AtomicBoolean leaderShipStale = new AtomicBoolean(false);
-    AtomicLong newLeaderTerm = new AtomicLong(member.getTerm().get());
 
-    SendLogRequest request =
-        new SendLogRequest(
-            votingLog, leaderShipStale, newLeaderTerm, null, member.getAllNodes().size() / 2);
+    SendLogRequest request = new SendLogRequest(votingLog, null, member.getAllNodes().size() / 2);
     try {
       if (!unsequencedLogQueue.offer(
           request,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
index f44a10a03e..c1e35aa4a8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
@@ -34,8 +34,6 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.sys.LogPlan;
 
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * SynchronizedSequencer performs sequencing by taking the monitor of a LogManager within the caller
@@ -71,6 +69,8 @@ public class SynchronousSequencer implements LogSequencer {
 
     while (true) {
       synchronized (logManager) {
+        long occupyStart =
+            Statistic.RAFT_SENDER_OCCUPY_LOG_MANAGER_IN_APPEND.getOperationStartTime();
         if (!IoTDBDescriptor.getInstance().getConfig().isEnableMemControl()
             || (logManager.getLastLogIndex() - logManager.getCommitLogIndex()
                 <= ClusterDescriptor.getInstance()
@@ -104,6 +104,8 @@ public class SynchronousSequencer implements LogSequencer {
               && ClusterDescriptor.getInstance().getConfig().isEnableWeakAcceptance())) {
             sendLogRequest = enqueueEntry(sendLogRequest);
           }
+          Statistic.RAFT_SENDER_OCCUPY_LOG_MANAGER_IN_APPEND.calOperationCostTimeFromStart(
+              occupyStart);
           break;
         }
         try {
@@ -116,6 +118,9 @@ public class SynchronousSequencer implements LogSequencer {
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
         }
+
+        Statistic.RAFT_SENDER_OCCUPY_LOG_MANAGER_IN_APPEND.calOperationCostTimeFromStart(
+            occupyStart);
       }
     }
 
@@ -134,19 +139,12 @@ public class SynchronousSequencer implements LogSequencer {
 
   private SendLogRequest buildSendLogRequest(Log log) {
     VotingLog votingLog = member.buildVotingLog(log);
-    AtomicBoolean leaderShipStale = new AtomicBoolean(false);
-    AtomicLong newLeaderTerm = new AtomicLong(member.getTerm().get());
 
     long startTime = Statistic.RAFT_SENDER_BUILD_APPEND_REQUEST.getOperationStartTime();
     AppendEntryRequest appendEntryRequest = member.buildAppendEntryRequest(log, false);
     Statistic.RAFT_SENDER_BUILD_APPEND_REQUEST.calOperationCostTimeFromStart(startTime);
 
-    return new SendLogRequest(
-        votingLog,
-        leaderShipStale,
-        newLeaderTerm,
-        appendEntryRequest,
-        member.getAllNodes().size() / 2);
+    return new SendLogRequest(votingLog, appendEntryRequest, member.getAllNodes().size() / 2);
   }
 
   public static class Factory implements LogSequencerFactory {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index c9a97388d4..cc4684481e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -32,8 +32,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.ConnectException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.iotdb.cluster.server.Response.RESPONSE_AGREE;
 import static org.apache.iotdb.cluster.server.Response.RESPONSE_LOG_MISMATCH;
@@ -52,9 +52,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
   private static final Logger logger = LoggerFactory.getLogger(AppendNodeEntryHandler.class);
 
   protected RaftMember member;
-  protected AtomicLong receiverTerm;
   protected VotingLog log;
-  protected AtomicBoolean leaderShipStale;
   protected Node directReceiver;
   protected int quorumSize;
 
@@ -68,6 +66,9 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
     }
   }
 
+  // TODO-remove
+  private static Map<Long, Integer> entryAcceptedTimes = new ConcurrentHashMap<>();
+
   @Override
   public void onComplete(AppendEntryResult response) {
     if (Timer.ENABLE_INSTRUMENTING) {
@@ -81,10 +82,6 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
 
     logger.debug(
         "{}: Append response {} from {} for log {}", member.getName(), response, trueReceiver, log);
-    if (leaderShipStale.get()) {
-      // someone has rejected this log because the leadership is stale
-      return;
-    }
 
     long resp = response.status;
 
@@ -96,22 +93,33 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
               log.getLog().getCurrLogTerm(),
               trueReceiver,
               response.signature);
+      Integer count =
+          entryAcceptedTimes.compute(
+              log.getLog().getCurrLogIndex(),
+              (index, cnt) -> {
+                if (cnt == null) {
+                  cnt = 1;
+                } else {
+                  cnt = cnt + 1;
+                }
+                return cnt;
+              });
+      if (count == quorumSize) {
+        Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.calOperationCostTimeFromStart(
+            log.getLog().getCreateTime());
+      }
+
       member.getPeer(trueReceiver).setMatchIndex(response.lastLogIndex);
     } else if (resp > 0) {
       // a response > 0 is the follower's term
       // the leadership is stale, wait for the new leader's heartbeat
-      long prevReceiverTerm = receiverTerm.get();
       logger.debug(
-          "{}: Received a rejection from {} because term is stale: {}/{}, log: {}",
+          "{}: Received a rejection from {} because term is stale: {}, log: {}",
           member.getName(),
           trueReceiver,
-          prevReceiverTerm,
           resp,
           log);
-      if (resp > prevReceiverTerm) {
-        receiverTerm.set(resp);
-      }
-      leaderShipStale.set(true);
+      member.stepDown(resp, false);
       synchronized (log) {
         log.notifyAll();
       }
@@ -186,18 +194,10 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
     this.member = member;
   }
 
-  public void setLeaderShipStale(AtomicBoolean leaderShipStale) {
-    this.leaderShipStale = leaderShipStale;
-  }
-
   public void setDirectReceiver(Node follower) {
     this.directReceiver = follower;
   }
 
-  public void setReceiverTerm(AtomicLong receiverTerm) {
-    this.receiverTerm = receiverTerm;
-  }
-
   public void setQuorumSize(int quorumSize) {
     this.quorumSize = quorumSize;
   }
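
In the AppendNodeEntryHandler hunks above, the handler now tracks per-index strong
accepts in a static ConcurrentHashMap (marked TODO-remove) and records
RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT exactly when the count first reaches quorumSize;
leadership staleness is handled by calling member.stepDown(resp, false) directly
instead of flagging a shared AtomicBoolean. As a side note, not part of the commit,
the compute-based counting is equivalent to the shorter merge idiom:

    // Equivalent counting idiom; entryAcceptedTimes, log and quorumSize are the
    // fields used in the hunk above.
    Integer count = entryAcceptedTimes.merge(log.getLog().getCurrLogIndex(), 1, Integer::sum);
    if (count == quorumSize) {
      Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.calOperationCostTimeFromStart(
          log.getLog().getCreateTime());
    }
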
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 6852499ff4..7985189b81 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -132,7 +132,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
@@ -739,17 +738,10 @@ public abstract class RaftMember implements RaftMemberMBean {
   }
 
   public void sendLogAsync(
-      VotingLog log,
-      Node node,
-      AtomicBoolean leaderShipStale,
-      AtomicLong newLeaderTerm,
-      AppendEntryRequest request,
-      int quorumSize,
-      boolean isVerifier) {
+      VotingLog log, Node node, AppendEntryRequest request, int quorumSize, boolean isVerifier) {
     AsyncClient client = getSendLogAsyncClient(node);
     if (client != null) {
-      AppendNodeEntryHandler handler =
-          getAppendNodeEntryHandler(log, node, leaderShipStale, newLeaderTerm, quorumSize);
+      AppendNodeEntryHandler handler = getAppendNodeEntryHandler(log, node, quorumSize);
       try {
         client.appendEntry(request, isVerifier, handler);
         logger.debug("{} sending a log to {}: {}", name, node, log);
@@ -1251,12 +1243,8 @@ public abstract class RaftMember implements RaftMemberMBean {
 
     try {
       AppendLogResult appendLogResult =
-          waitAppendResult(
-              sendLogRequest.getVotingLog(),
-              sendLogRequest.getLeaderShipStale(),
-              sendLogRequest.getNewLeaderTerm(),
-              sendLogRequest.getQuorumSize());
-      Timer.Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.calOperationCostTimeFromStart(
+          waitAppendResult(sendLogRequest.getVotingLog(), sendLogRequest.getQuorumSize());
+      Timer.Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_END.calOperationCostTimeFromStart(
           sendLogRequest.getVotingLog().getLog().getCreateTime());
       long startTime;
       switch (appendLogResult) {
@@ -1296,15 +1284,12 @@ public abstract class RaftMember implements RaftMemberMBean {
 
   public SendLogRequest buildSendLogRequest(Log log) {
     VotingLog votingLog = buildVotingLog(log);
-    AtomicBoolean leaderShipStale = new AtomicBoolean(false);
-    AtomicLong newLeaderTerm = new AtomicLong(term.get());
 
     long startTime = Statistic.RAFT_SENDER_BUILD_APPEND_REQUEST.getOperationStartTime();
     AppendEntryRequest appendEntryRequest = buildAppendEntryRequest(log, false);
     Statistic.RAFT_SENDER_BUILD_APPEND_REQUEST.calOperationCostTimeFromStart(startTime);
 
-    return new SendLogRequest(
-        votingLog, leaderShipStale, newLeaderTerm, appendEntryRequest, allNodes.size() / 2);
+    return new SendLogRequest(votingLog, appendEntryRequest, allNodes.size() / 2);
   }
 
   public VotingLog buildVotingLog(Log log) {
@@ -1711,6 +1696,9 @@ public abstract class RaftMember implements RaftMemberMBean {
   }
 
   private boolean canBeWeaklyAccepted(Log log) {
+    if (log instanceof FragmentedLog) {
+      return true;
+    }
     if (!(log instanceof RequestLog)) {
       return false;
     }
@@ -1726,6 +1714,8 @@ public abstract class RaftMember implements RaftMemberMBean {
   @SuppressWarnings({"java:S2445"}) // safe synchronized
   private void waitAppendResultLoop(VotingLog log, int quorumSize) {
     int totalAccepted = votingLogList.totalAcceptedNodeNum(log);
+    int weaklyAccepted = log.getWeaklyAcceptedNodeIds().size();
+    int stronglyAccepted = totalAccepted - weaklyAccepted;
     long nextTimeToPrint = 5000;
 
     long waitStart = System.nanoTime();
@@ -1736,7 +1726,7 @@ public abstract class RaftMember implements RaftMemberMBean {
     synchronized (log.getLog()) {
       while (log.getLog().getCurrLogIndex() == Long.MIN_VALUE
           || (!ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
-                      && getCommitIndex() < log.getLog().getCurrLogIndex()
+                      && stronglyAccepted < quorumSize
                   || ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
                       && log.getSignatures().size()
                           < TrustValueHolder.verifierGroupSize(allNodes.size()) / 2)
@@ -1768,6 +1758,8 @@ public abstract class RaftMember implements RaftMemberMBean {
           nextTimeToPrint *= 2;
         }
         totalAccepted = votingLogList.totalAcceptedNodeNum(log);
+        weaklyAccepted = log.getWeaklyAcceptedNodeIds().size();
+        stronglyAccepted = totalAccepted - weaklyAccepted;
       }
     }
     if (logger.isDebugEnabled()) {
@@ -1787,15 +1779,16 @@ public abstract class RaftMember implements RaftMemberMBean {
    * wait until "voteCounter" counts down to zero, which means the quorum has received the log, or
    * one follower tells the node that it is no longer a valid leader, or a timeout is triggered.
    */
-  protected AppendLogResult waitAppendResult(
-      VotingLog log, AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm, int quorumSize) {
+  protected AppendLogResult waitAppendResult(VotingLog log, int quorumSize) {
     // wait for the followers to vote
     long startTime = Timer.Statistic.RAFT_SENDER_VOTE_COUNTER.getOperationStartTime();
     int totalAccepted = votingLogList.totalAcceptedNodeNum(log);
+    int weaklyAccepted = log.getWeaklyAcceptedNodeIds().size();
+    int stronglyAccepted = totalAccepted - weaklyAccepted;
 
     if (log.getLog().getCurrLogIndex() == Long.MIN_VALUE
         || ((!ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
-                    && log.getLog().getCurrLogIndex() > getCommitIndex()
+                    && stronglyAccepted < quorumSize
                 || ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
                     && log.getSignatures().size()
                         < TrustValueHolder.verifierGroupSize(allNodes.size()) / 2)
@@ -1805,6 +1798,8 @@ public abstract class RaftMember implements RaftMemberMBean {
       waitAppendResultLoop(log, quorumSize);
     }
     totalAccepted = votingLogList.totalAcceptedNodeNum(log);
+    weaklyAccepted = log.getWeaklyAcceptedNodeIds().size();
+    stronglyAccepted = totalAccepted - weaklyAccepted;
 
     if (log.acceptedTime.get() != 0) {
       Statistic.RAFT_WAIT_AFTER_ACCEPTED.calOperationCostTimeFromStart(log.acceptedTime.get());
@@ -1812,8 +1807,7 @@ public abstract class RaftMember implements RaftMemberMBean {
     Timer.Statistic.RAFT_SENDER_VOTE_COUNTER.calOperationCostTimeFromStart(startTime);
 
     // a node has a larger term than the local node, so this node is no longer a valid leader
-    if (leaderShipStale.get()) {
-      stepDown(newLeaderTerm.get(), false);
+    if (term.get() != log.getLog().getCurrLogTerm()) {
       return AppendLogResult.LEADERSHIP_STALE;
     }
     // the node knows it is no long the leader from other requests
@@ -1821,12 +1815,12 @@ public abstract class RaftMember implements RaftMemberMBean {
       return AppendLogResult.LEADERSHIP_STALE;
     }
 
-    if (totalAccepted >= quorumSize && log.getLog().getCurrLogIndex() > getCommitIndex()) {
+    if (totalAccepted >= quorumSize && stronglyAccepted < quorumSize) {
       return AppendLogResult.WEAK_ACCEPT;
     }
 
     // cannot get enough agreements within a certain amount of time
-    if (log.getLog().getCurrLogIndex() > getCommitIndex()) {
+    if (totalAccepted < quorumSize) {
       return AppendLogResult.TIME_OUT;
     }
 
@@ -2092,11 +2086,6 @@ public abstract class RaftMember implements RaftMemberMBean {
     }
     logger.debug("{} sending a log to followers: {}", name, log);
 
-    // if a follower has larger term than this node, leaderShipStale will be set to true and
-    // newLeaderTerm will store the follower's term
-    AtomicBoolean leaderShipStale = new AtomicBoolean(false);
-    AtomicLong newLeaderTerm = new AtomicLong(term.get());
-
     AppendEntryRequest request = buildAppendEntryRequest(log.getLog(), true);
     log.getFailedNodeIds().clear();
     log.setHasFailed(false);
@@ -2107,9 +2096,7 @@ public abstract class RaftMember implements RaftMemberMBean {
         // follower will not be blocked
         for (Node node : allNodes) {
           appendLogThreadPool.submit(
-              () ->
-                  sendLogToFollower(
-                      log, node, leaderShipStale, newLeaderTerm, request, quorumSize, false));
+              () -> sendLogToFollower(log, node, request, quorumSize, false));
           if (character != NodeCharacter.LEADER) {
             return AppendLogResult.LEADERSHIP_STALE;
           }
@@ -2118,7 +2105,7 @@ public abstract class RaftMember implements RaftMemberMBean {
         // there is only one member, send to it within this thread to reduce thread switching
         // overhead
         for (Node node : allNodes) {
-          sendLogToFollower(log, node, leaderShipStale, newLeaderTerm, request, quorumSize, false);
+          sendLogToFollower(log, node, request, quorumSize, false);
           if (character != NodeCharacter.LEADER) {
             return AppendLogResult.LEADERSHIP_STALE;
           }
@@ -2130,18 +2117,12 @@ public abstract class RaftMember implements RaftMemberMBean {
       return AppendLogResult.TIME_OUT;
     }
 
-    return waitAppendResult(log, leaderShipStale, newLeaderTerm, quorumSize);
+    return waitAppendResult(log, quorumSize);
   }
 
   /** Send "log" to "node". */
   public void sendLogToFollower(
-      VotingLog log,
-      Node node,
-      AtomicBoolean leaderShipStale,
-      AtomicLong newLeaderTerm,
-      AppendEntryRequest request,
-      int quorumSize,
-      boolean isVerifier) {
+      VotingLog log, Node node, AppendEntryRequest request, int quorumSize, boolean isVerifier) {
     if (node.equals(thisNode)) {
       return;
     }
@@ -2163,9 +2144,9 @@ public abstract class RaftMember implements RaftMemberMBean {
     }
 
     if (config.isUseAsyncServer()) {
-      sendLogAsync(log, node, leaderShipStale, newLeaderTerm, request, quorumSize, isVerifier);
+      sendLogAsync(log, node, request, quorumSize, isVerifier);
     } else {
-      sendLogSync(log, node, leaderShipStale, newLeaderTerm, request, quorumSize, isVerifier);
+      sendLogSync(log, node, request, quorumSize, isVerifier);
     }
   }
 
@@ -2198,17 +2179,10 @@ public abstract class RaftMember implements RaftMemberMBean {
   }
 
   private void sendLogSync(
-      VotingLog log,
-      Node node,
-      AtomicBoolean leaderShipStale,
-      AtomicLong newLeaderTerm,
-      AppendEntryRequest request,
-      int quorumSize,
-      boolean isVerifier) {
+      VotingLog log, Node node, AppendEntryRequest request, int quorumSize, boolean isVerifier) {
     Client client = getSyncClient(node);
     if (client != null) {
-      AppendNodeEntryHandler handler =
-          getAppendNodeEntryHandler(log, node, leaderShipStale, newLeaderTerm, quorumSize);
+      AppendNodeEntryHandler handler = getAppendNodeEntryHandler(log, node, quorumSize);
       try {
         logger.debug("{} sending a log to {}: {}", name, node, log);
         long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
@@ -2230,17 +2204,11 @@ public abstract class RaftMember implements RaftMemberMBean {
   }
 
   public AppendNodeEntryHandler getAppendNodeEntryHandler(
-      VotingLog log,
-      Node node,
-      AtomicBoolean leaderShipStale,
-      AtomicLong newLeaderTerm,
-      int quorumSize) {
+      VotingLog log, Node node, int quorumSize) {
     AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
     handler.setDirectReceiver(node);
-    handler.setLeaderShipStale(leaderShipStale);
     handler.setLog(log);
     handler.setMember(this);
-    handler.setReceiverTerm(newLeaderTerm);
     handler.setQuorumSize(quorumSize);
     if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
       registerAppendLogHandler(
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index 7ded0c910e..9d771b6465 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -34,7 +34,8 @@ public class Timer {
 
   private static final Logger logger = LoggerFactory.getLogger(Timer.class);
 
-  public static final boolean ENABLE_INSTRUMENTING = true;
+  public static final boolean ENABLE_INSTRUMENTING =
+      ClusterDescriptor.getInstance().getConfig().isEnableInstrumenting();
 
   private static final String COORDINATOR = "Coordinator";
   private static final String META_GROUP_MEMBER = "Meta group member";
@@ -101,6 +102,12 @@ public class Timer {
         TIME_SCALE,
         RaftMember.USE_LOG_DISPATCHER,
         DATA_GROUP_MEMBER_LOCAL_EXECUTION),
+    RAFT_SENDER_OCCUPY_LOG_MANAGER_IN_APPEND(
+        RAFT_MEMBER_SENDER,
+        "occupy log manager in append",
+        TIME_SCALE,
+        RaftMember.USE_LOG_DISPATCHER,
+        DATA_GROUP_MEMBER_LOCAL_EXECUTION),
     RAFT_SENDER_APPEND_LOG_V2(
         RAFT_MEMBER_SENDER,
         "locally append log",
@@ -239,12 +246,6 @@ public class Timer {
         RAFT_MEMBER_SENDER, "in apply queue", TIME_SCALE, true, RAFT_SENDER_COMMIT_WAIT_LOG_APPLY),
     RAFT_SENDER_DATA_LOG_APPLY(
         RAFT_MEMBER_SENDER, "apply data log", TIME_SCALE, true, RAFT_SENDER_COMMIT_WAIT_LOG_APPLY),
-    RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT(
-        RAFT_MEMBER_SENDER,
-        "log from create to accept",
-        TIME_SCALE,
-        RaftMember.USE_LOG_DISPATCHER,
-        DATA_GROUP_MEMBER_LOCAL_EXECUTION),
     // raft member - receiver
     RAFT_RECEIVER_LOG_PARSE(
         RAFT_MEMBER_RECEIVER, "log parse", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
@@ -282,6 +283,12 @@ public class Timer {
         TIME_SCALE,
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_LOG_ENQUEUE_SINGLE(
+        LOG_DISPATCHER,
+        "enqueue (single)",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     LOG_DISPATCHER_LOG_IN_QUEUE(
         LOG_DISPATCHER,
         "in queue",
@@ -320,6 +327,24 @@ public class Timer {
         TIME_SCALE,
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT(
+        LOG_DISPATCHER,
+        "from create to accept",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_SENDER_LOG_FROM_CREATE_TO_COMMIT(
+        LOG_DISPATCHER,
+        "from create to commit",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_END(
+        LOG_DISPATCHER,
+        "from create to wait end",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     LOG_DISPATCHER_FROM_CREATE_TO_OK(
         LOG_DISPATCHER,
         "from create to OK",
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
index 02e8703e64..87e2d1475b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
@@ -389,6 +389,6 @@ public class ClusterUtils {
   }
 
   public static String nodeToString(Node node) {
-    return node.getInternalIp() + "-" + node.getMetaPort();
+    return node.getInternalIp() + ":" + node.getMetaPort();
   }
 }