You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/03/07 08:27:30 UTC

[iotdb] branch native_raft updated: fix issues

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 7aca6e85c0 fix issues
7aca6e85c0 is described below

commit 7aca6e85c098d1b643e312ee36b7037409497e91
Author: Tian Jiang <jt...@163.com>
AuthorDate: Tue Mar 7 16:29:17 2023 +0800

    fix issues
---
 .../common/request/IConsensusRequest.java          |  14 +
 .../iotdb/consensus/natraft/RaftConsensus.java     |  23 +-
 .../natraft/client/AsyncRaftServiceClient.java     |   3 +-
 .../consensus/natraft/client/GenericHandler.java   |   2 +-
 .../natraft/client/SyncClientAdaptor.java          |   3 +-
 .../consensus/natraft/protocol/RaftConfig.java     |   4 +-
 .../consensus/natraft/protocol/RaftMember.java     | 148 +++----
 .../protocol/heartbeat/ElectionRespHandler.java    |  12 +-
 .../natraft/protocol/heartbeat/ElectionState.java  |  30 +-
 .../protocol/heartbeat/HeartbeatThread.java        |  35 +-
 .../consensus/natraft/protocol/log/Entry.java      |  31 +-
 .../natraft/protocol/log/VotingEntry.java          |   8 +-
 .../protocol/log/appender/BlockingLogAppender.java |  44 +-
 .../log/appender/SlidingWindowLogAppender.java     |  16 +-
 .../protocol/log/applier/AsyncLogApplier.java      |  71 ++--
 .../natraft/protocol/log/applier/BaseApplier.java  |   5 +
 .../natraft/protocol/log/applier/LogApplier.java   |   3 +
 .../log/dispatch/AppendNodeEntryHandler.java       |   7 +-
 .../protocol/log/dispatch/LogDispatcher.java       |  51 +--
 .../protocol/log/dispatch/VotingLogList.java       |  35 +-
 .../protocol/log/logtype/ConfigChangeEntry.java    |   5 +-
 .../manager/DirectorySnapshotRaftLogManager.java   |   2 +-
 .../protocol/log/manager/RaftLogManager.java       |  18 +-
 .../log/sequencing/LogSequencerFactory.java        |   1 -
 .../log/sequencing/SynchronousSequencer.java       |   4 +-
 .../protocol/log/snapshot/DirectorySnapshot.java   |   8 +-
 .../natraft/protocol/log/snapshot/Snapshot.java    |   8 +-
 .../natraft/service/RaftRPCServiceProcessor.java   |  12 +-
 .../iotdb/consensus/natraft/utils/LogUtils.java    |  11 +-
 .../iotdb/consensus/natraft/utils/NodeUtils.java   |  23 +-
 .../iotdb/consensus/natraft/utils/StatusUtils.java |   2 -
 .../iotdb/consensus/natraft/utils/Timer.java       | 447 +++++++++++++++++++++
 .../plan/planner/plan/node/write/InsertNode.java   |   5 +
 33 files changed, 808 insertions(+), 283 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
index f2729471d5..f18fd7413d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
@@ -19,9 +19,12 @@
 
 package org.apache.iotdb.consensus.common.request;
 
+import org.apache.iotdb.commons.path.PartialPath;
+
 import java.nio.ByteBuffer;
 
 public interface IConsensusRequest {
+
   /**
    * Serialize all the data to a ByteBuffer.
    *
@@ -39,4 +42,15 @@ public interface IConsensusRequest {
   default long estimateSize() {
     return 0;
   }
+
+  /**
+   * If two requests return the same conflictKey or one of them returns null, they cannot be
+   * executed in parallel in the same region. Otherwise, the two requests with different
+   * conflictKeys can be executed in parallel.
+   *
+   * @return a conflict key identifying requests that cannot be executed in parallel.
+   */
+  default PartialPath conflictKey() {
+    return null;
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
index 8c3231d21c..a07afb1e97 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.Flow
 import org.apache.iotdb.consensus.natraft.service.RaftRPCService;
 import org.apache.iotdb.consensus.natraft.service.RaftRPCServiceProcessor;
 import org.apache.iotdb.consensus.natraft.utils.StatusUtils;
+import org.apache.iotdb.consensus.natraft.utils.Timer;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -102,6 +103,12 @@ public class RaftConsensus implements IConsensus {
     } catch (StartupException e) {
       throw new IOException(e);
     }
+    Runtime.getRuntime()
+        .addShutdownHook(
+            new Thread(
+                () -> {
+                  logger.info(Timer.Statistic.getReport());
+                }));
   }
 
   private void initAndRecover() throws IOException {
@@ -185,8 +192,8 @@ public class RaftConsensus implements IConsensus {
     stateMachineMap.remove(groupId);
   }
 
-  public boolean createNewMemberIfAbsent(ConsensusGroupId groupId, Peer thisPeer,
-      List<Peer> peers, List<Peer> newPeers) {
+  public boolean createNewMemberIfAbsent(
+      ConsensusGroupId groupId, Peer thisPeer, List<Peer> peers, List<Peer> newPeers) {
     AtomicBoolean exist = new AtomicBoolean(true);
     stateMachineMap.computeIfAbsent(
         groupId,
@@ -199,8 +206,15 @@ public class RaftConsensus implements IConsensus {
           }
           RaftMember impl =
               new RaftMember(
-                  path, config, thisPeer, peers, newPeers, groupId, registry.apply(groupId),
-                  clientManager, this::onMemberRemoved);
+                  path,
+                  config,
+                  thisPeer,
+                  peers,
+                  newPeers,
+                  groupId,
+                  registry.apply(groupId),
+                  clientManager,
+                  this::onMemberRemoved);
           impl.start();
           return impl;
         });
@@ -348,7 +362,6 @@ public class RaftConsensus implements IConsensus {
     return stateMachineMap.get(groupId);
   }
 
-
   public int getThisNodeId() {
     return thisNodeId;
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/AsyncRaftServiceClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/AsyncRaftServiceClient.java
index 2366d0e886..43f70d2b04 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/AsyncRaftServiceClient.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/AsyncRaftServiceClient.java
@@ -92,7 +92,8 @@ public class AsyncRaftServiceClient extends RaftService.AsyncClient {
   @Override
   public void onError(Exception e) {
     if (e.getCause() instanceof NoMemberException
-        || e instanceof TApplicationException && e.getMessage().contains("No such member")) {
+        || e instanceof TApplicationException
+            && (e.getMessage() != null && e.getMessage().contains("No such member"))) {
       logger.debug(e.getMessage());
       ___currentMethod = null;
       returnSelf();
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/GenericHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/GenericHandler.java
index f5b3370ac7..d3f77dd23e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/GenericHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/GenericHandler.java
@@ -71,7 +71,7 @@ public class GenericHandler<T> implements AsyncMethodCallback<T> {
     while (elapsedTime < timeout) {
       if (result.get() == null && getException() == null) {
         synchronized (result) {
-          result.wait(1000);
+          result.wait(1);
         }
       } else {
         break;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
index a5be4ae46a..82196bf9fa 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
@@ -83,8 +83,7 @@ public class SyncClientAdaptor {
     return matchTermHandler.getResult(config.getConnectionTimeoutInMS());
   }
 
-  public static TSStatus forceElection(
-      AsyncRaftServiceClient client, ConsensusGroupId groupId)
+  public static TSStatus forceElection(AsyncRaftServiceClient client, ConsensusGroupId groupId)
       throws TException, InterruptedException {
     GenericHandler<TSStatus> matchTermHandler = new GenericHandler<>(client.getEndpoint());
     client.forceElection(groupId.convertToTConsensusGroupId(), matchTermHandler);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
index b29d51c951..78fb556cbd 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
@@ -31,14 +31,14 @@ import java.util.concurrent.TimeUnit;
 
 public class RaftConfig {
 
-  private boolean enableWeakAcceptance = true;
+  private boolean enableWeakAcceptance = false;
   private int maxNumOfLogsInMem = 10000;
   private int minNumOfLogsInMem = 1000;
   private long maxMemorySizeForRaftLog = 512 * 1024 * 1024L;
   private int logDeleteCheckIntervalSecond = 1;
   private boolean enableRaftLogPersistence = true;
   private int catchUpTimeoutMS = 60_000;
-  private boolean useFollowerSlidingWindow = true;
+  private boolean useFollowerSlidingWindow = false;
   private int uncommittedRaftLogNumForRejectThreshold = 10000;
   private int heartbeatIntervalMs = 1000;
   private int electionTimeoutMs = 20_000;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 9ac0f1ed86..77a199e515 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -1,29 +1,27 @@
 /*
 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
-
-
- */
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+
+
+*/
 
 package org.apache.iotdb.consensus.natraft.protocol;
 
-import java.util.Collections;
-import java.util.function.Consumer;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
@@ -78,6 +76,7 @@ import org.apache.iotdb.consensus.natraft.utils.LogUtils;
 import org.apache.iotdb.consensus.natraft.utils.NodeUtils;
 import org.apache.iotdb.consensus.natraft.utils.Response;
 import org.apache.iotdb.consensus.natraft.utils.StatusUtils;
+import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
 import org.apache.iotdb.consensus.raft.thrift.ElectionRequest;
@@ -100,6 +99,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
@@ -107,6 +107,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
 
 public class RaftMember {
 
@@ -132,9 +133,7 @@ public class RaftMember {
    */
   private final Object waitLeaderCondition = new Object();
 
-  /**
-   * the lock is to make sure that only one thread can apply snapshot at the same time
-   */
+  /** the lock is to make sure that only one thread can apply snapshot at the same time */
   private final Lock snapshotApplyLock = new ReentrantLock();
   /**
    * when the commit progress is updated by a heartbeat, this object is notified so that we may know
@@ -143,10 +142,9 @@ public class RaftMember {
   private final Object syncLock = new Object();
 
   protected Peer thisNode;
-  /**
-   * the nodes that belong to the same raft group as thisNode.
-   */
+  /** the nodes that belong to the same raft group as thisNode. */
   protected volatile List<Peer> allNodes;
+
   protected volatile List<Peer> newNodes;
 
   protected ConsensusGroupId groupId;
@@ -155,9 +153,7 @@ public class RaftMember {
 
   protected RaftStatus status = new RaftStatus();
 
-  /**
-   * the raft logs are all stored and maintained in the log manager
-   */
+  /** the raft logs are all stored and maintained in the log manager */
   protected RaftLogManager logManager;
 
   protected HeartbeatThread heartbeatThread;
@@ -177,15 +173,12 @@ public class RaftMember {
   protected IClientManager<TEndPoint, AsyncRaftServiceClient> clientManager;
 
   protected CatchUpManager catchUpManager;
-  /**
-   * a thread pool that is used to do commit log tasks asynchronous in heartbeat thread
-   */
+  /** a thread pool that is used to do commit log tasks asynchronous in heartbeat thread */
   private ExecutorService commitLogPool;
 
   /**
    * logDispatcher buff the logs orderly according to their log indexes and send them sequentially,
-   * which avoids the followers receiving out-of-order logs, forcing them to wait for previous
-   * logs.
+   * which avoids the followers receiving out-of-order logs, forcing them to wait for previous logs.
    */
   private volatile LogDispatcher logDispatcher;
 
@@ -209,7 +202,6 @@ public class RaftMember {
       Consumer<ConsensusGroupId> onRemove) {
     this.config = config;
     this.storageDir = storageDir;
-    initConfig();
 
     this.thisNode = thisNode;
     this.allNodes = allNodes;
@@ -232,9 +224,7 @@ public class RaftMember {
     this.stateMachine = stateMachine;
 
     this.votingLogList = new VotingLogList(this);
-    this.logAppender = appenderFactory.create(this, config);
-    this.logSequencer = SEQUENCER_FACTORY.create(this, config);
-    this.logDispatcher = new LogDispatcher(this, config);
+    votingLogList.setEnableWeakAcceptance(config.isEnableWeakAcceptance());
     this.heartbeatReqHandler = new HeartbeatReqHandler(this);
     this.electionReqHandler = new ElectionReqHandler(this);
     this.logManager =
@@ -245,6 +235,9 @@ public class RaftMember {
             stateMachine,
             config,
             this::examineUnappliedEntry);
+    this.logAppender = appenderFactory.create(this, config);
+    this.logSequencer = SEQUENCER_FACTORY.create(this, config);
+    this.logDispatcher = new LogDispatcher(this, config);
     this.onRemove = onRemove;
 
     initPeerMap();
@@ -282,7 +275,7 @@ public class RaftMember {
   private void examineUnappliedEntry(List<Entry> entries) {
     ConfigChangeEntry configChangeEntry = null;
     for (Entry entry : entries) {
-      if (entry instanceof  ConfigChangeEntry) {
+      if (entry instanceof ConfigChangeEntry) {
         configChangeEntry = (ConfigChangeEntry) entry;
       }
     }
@@ -362,10 +355,6 @@ public class RaftMember {
     }
   }
 
-  private void initConfig() {
-    votingLogList.setEnableWeakAcceptance(config.isEnableWeakAcceptance());
-  }
-
   public void initPeerMap() {
     status.peerMap = new ConcurrentHashMap<>();
     for (Peer peer : allNodes) {
@@ -509,9 +498,7 @@ public class RaftMember {
     return appendEntriesInternal(request);
   }
 
-  /**
-   * Similar to appendEntry, while the incoming load is batch of logs instead of a single log.
-   */
+  /** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
   private AppendEntryResult appendEntriesInternal(AppendEntriesRequest request)
       throws UnknownLogTypeException {
     logger.debug("{} received an AppendEntriesRequest", name);
@@ -588,7 +575,7 @@ public class RaftMember {
 
   private boolean checkLogSize(Entry entry) {
     return !config.isEnableRaftLogPersistence()
-        || entry.serialize().capacity() + Integer.BYTES < config.getRaftLogBufferSize();
+        || entry.estimateSize() < config.getRaftLogBufferSize();
   }
 
   public boolean isReadOnly() {
@@ -635,11 +622,12 @@ public class RaftMember {
       } else if (request != null) {
         return forwardRequest(request, leader.getEndpoint(), leader.getGroupId());
       } else {
-        return new TSStatus().setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())
+        return new TSStatus()
+            .setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())
             .setRedirectNode(leader.getEndpoint());
       }
     }
-    return StatusUtils.OK;
+    return null;
   }
 
   public TSStatus processRequest(IConsensusRequest request) {
@@ -648,12 +636,13 @@ public class RaftMember {
     }
 
     TSStatus tsStatus = ensureLeader(request);
-    if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+    if (tsStatus != null) {
       return tsStatus;
     }
 
     logger.debug("{}: Processing request {}", name, request);
     Entry entry = new RequestEntry(request);
+    entry.receiveTime = System.nanoTime();
 
     // just like processPlanLocally,we need to check the size of log
     if (!checkLogSize(entry)) {
@@ -665,6 +654,9 @@ public class RaftMember {
 
     // assign term and index to the new log and append it
     VotingEntry votingEntry = logSequencer.sequence(entry);
+    entry.createTime = System.nanoTime();
+    Statistic.LOG_DISPATCHER_FROM_RECEIVE_TO_CREATE.add(entry.createTime - entry.receiveTime);
+
     if (config.isUseFollowerLoadBalance()) {
       FlowMonitorManager.INSTANCE.report(thisNode, entry.estimateSize());
     }
@@ -690,6 +682,9 @@ public class RaftMember {
         }
       }
     }
+    entry.waitEndTime = System.nanoTime();
+    Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_APPLY_END.add(
+        entry.waitEndTime - entry.createTime);
     if (entry.getException() != null) {
       throw new LogExecutionException(entry.getException());
     }
@@ -697,14 +692,20 @@ public class RaftMember {
 
   private TSStatus includeLogNumbersInStatus(TSStatus status, Entry entry) {
     return status.setMessage(
-        getRaftGroupId() + "-" + entry.getCurrLogIndex() + "-" + entry.getCurrLogTerm());
+        getRaftGroupId().getType().ordinal()
+            + "-"
+            + getRaftGroupId().getId()
+            + "-"
+            + entry.getCurrLogIndex()
+            + "-"
+            + entry.getCurrLogTerm());
   }
 
   protected AppendLogResult waitAppendResult(VotingEntry votingEntry) {
     // wait for the followers to vote
 
     AcceptedType acceptedType = votingLogList.computeAcceptedType(votingEntry);
-    if (votingLogList.computeAcceptedType(votingEntry) == AcceptedType.NOT_ACCEPTED) {
+    if (acceptedType == AcceptedType.NOT_ACCEPTED) {
       acceptedType = waitAppendResultLoop(votingEntry);
     }
 
@@ -760,15 +761,16 @@ public class RaftMember {
     }
     long waitTime = 1;
     AcceptedType acceptedType = votingLogList.computeAcceptedType(log);
-    synchronized (log.getEntry()) {
-      while (votingLogList.computeAcceptedType(log) == AcceptedType.NOT_ACCEPTED
+    synchronized (log) {
+      while (acceptedType == AcceptedType.NOT_ACCEPTED
           && alreadyWait < config.getWriteOperationTimeoutMS()) {
         try {
-          log.getEntry().wait(waitTime);
+          log.wait(waitTime);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           logger.warn("Unexpected interruption when sending a log", e);
         }
+        acceptedType = votingLogList.computeAcceptedType(log);
         waitTime = waitTime * 2;
 
         alreadyWait = (System.nanoTime() - waitStart) / 1000000;
@@ -851,7 +853,7 @@ public class RaftMember {
    * it.
    *
    * @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
-   *                                   value after timeout
+   *     value after timeout
    */
   protected void waitUntilCatchUp(CheckConsistency checkConsistency)
       throws CheckConsistencyException {
@@ -882,7 +884,7 @@ public class RaftMember {
    * sync local applyId to leader commitId
    *
    * @param leaderCommitId leader commit id
-   * @param fastFail       if enabled, when log differ too much, return false directly.
+   * @param fastFail if enabled, when log differ too much, return false directly.
    */
   public void syncLocalApply(long leaderCommitId, boolean fastFail) {
     long startTime = System.currentTimeMillis();
@@ -927,9 +929,7 @@ public class RaftMember {
         waitedTime);
   }
 
-  /**
-   * Wait until the leader of this node becomes known or time out.
-   */
+  /** Wait until the leader of this node becomes known or time out. */
   public void waitLeader() {
     long startTime = System.currentTimeMillis();
     while (status.leader.get() == null || status.leader.get() == null) {
@@ -966,9 +966,7 @@ public class RaftMember {
     return handler.getResult(config.getConnectionTimeoutInMS());
   }
 
-  /**
-   * @return true if there is a log whose index is "index" and term is "term", false otherwise
-   */
+  /** @return true if there is a log whose index is "index" and term is "term", false otherwise */
   public boolean matchLog(long index, long term) {
     boolean matched = logManager.matchTerm(term, index);
     logger.debug("Log {}-{} matched: {}", index, term, matched);
@@ -978,10 +976,10 @@ public class RaftMember {
   /**
    * Forward a non-query plan to a node using the default client.
    *
-   * @param plan    a non-query plan
-   * @param node    cannot be the local node
+   * @param plan a non-query plan
+   * @param node cannot be the local node
    * @param groupId must be set for data group communication, set to null for meta group
-   *                communication
+   *     communication
    * @return a TSStatus indicating if the forwarding is successful.
    */
   public TSStatus forwardRequest(IConsensusRequest plan, TEndPoint node, ConsensusGroupId groupId) {
@@ -1002,13 +1000,14 @@ public class RaftMember {
       this.status.leader.set(null);
       waitLeader();
     }
+    status.setRedirectNode(node);
     return status;
   }
 
   /**
    * Forward a non-query plan to "receiver" using "client".
    *
-   * @param plan    a non-query plan
+   * @param plan a non-query plan
    * @param groupId to determine which DataGroupMember of "receiver" will process the request.
    * @return a TSStatus indicating if the forwarding is successful.
    */
@@ -1027,10 +1026,10 @@ public class RaftMember {
   public TSStatus forwardPlanAsync(
       IConsensusRequest request,
       TEndPoint receiver,
-      ConsensusGroupId header,
+      ConsensusGroupId groupId,
       AsyncRaftServiceClient client) {
     try {
-      TSStatus tsStatus = SyncClientAdaptor.executeRequest(client, request, header, receiver);
+      TSStatus tsStatus = SyncClientAdaptor.executeRequest(client, request, groupId, receiver);
       if (tsStatus == null) {
         tsStatus = StatusUtils.TIME_OUT;
         logger.warn(MSG_FORWARD_TIMEOUT, name, request, receiver);
@@ -1222,8 +1221,8 @@ public class RaftMember {
     try {
       logManager.getLock().writeLock().lock();
       if (this.newNodes != null) {
-        return new TSStatus(TSStatusCode.CONFIGURATION_ERROR.getStatusCode()).setMessage(
-            "Last configuration change in progress");
+        return new TSStatus(TSStatusCode.CONFIGURATION_ERROR.getStatusCode())
+            .setMessage("Last configuration change in progress");
       }
       ConfigChangeEntry e = new ConfigChangeEntry(oldNodes, newNodes);
       Entry lastEntry = logManager.getLastEntry();
@@ -1254,8 +1253,9 @@ public class RaftMember {
 
   private TSStatus waitForEntryResult(VotingEntry votingEntry) {
     try {
-      AppendLogResult appendLogResult =
-          waitAppendResult(votingEntry);
+      AppendLogResult appendLogResult = waitAppendResult(votingEntry);
+      Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_APPEND_END.add(
+          System.nanoTime() - votingEntry.getEntry().createTime);
       switch (appendLogResult) {
         case WEAK_ACCEPT:
           return includeLogNumbersInStatus(
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionRespHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionRespHandler.java
index 301cb19ad4..1f3ba68695 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionRespHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionRespHandler.java
@@ -28,8 +28,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.ConnectException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.iotdb.consensus.natraft.utils.Response.RESPONSE_AGREE;
 import static org.apache.iotdb.consensus.natraft.utils.Response.RESPONSE_LEADER_STILL_ONLINE;
@@ -50,10 +48,7 @@ public class ElectionRespHandler implements AsyncMethodCallback<Long> {
   private ElectionState electionState;
 
   public ElectionRespHandler(
-      RaftMember raftMember,
-      Peer voter,
-      long currTerm,
-      ElectionState electionState) {
+      RaftMember raftMember, Peer voter, long currTerm, ElectionState electionState) {
     this.raftMember = raftMember;
     this.voter = voter;
     this.currTerm = currTerm;
@@ -71,10 +66,7 @@ public class ElectionRespHandler implements AsyncMethodCallback<Long> {
 
     if (voterResp == RESPONSE_AGREE) {
       electionState.onAccept(voter);
-      logger.info(
-          "{}: Received a grant vote from {}",
-          memberName,
-          voter);
+      logger.info("{}: Received a grant vote from {}", memberName, voter);
       if (electionState.isAccepted()) {
         // the election is valid
         logger.info("{}: Election {} is won", memberName, currTerm);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionState.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionState.java
index 407252fe2c..bcb2a656b4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionState.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionState.java
@@ -1,9 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.iotdb.consensus.natraft.protocol.heartbeat;
 
+import org.apache.iotdb.consensus.common.Peer;
+
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import org.apache.iotdb.consensus.common.Peer;
 
 public class ElectionState {
   private List<Peer> currNodes;
@@ -31,8 +51,8 @@ public class ElectionState {
     if (newNodes != null && newNodes.contains(node)) {
       acceptedNewNodes.add(node);
     }
-    if (acceptedCurrNodes.size() >= currNodes.size() / 2 &&
-        (newNodes == null || (acceptedNewNodes.size() >= newNodes.size() / 2))) {
+    if (acceptedCurrNodes.size() >= currNodes.size() / 2
+        && (newNodes == null || (acceptedNewNodes.size() >= newNodes.size() / 2))) {
       accepted = true;
       synchronized (this) {
         this.notifyAll();
@@ -47,8 +67,8 @@ public class ElectionState {
     if (newNodes != null && newNodes.contains(node)) {
       rejectedNewNodes.add(node);
     }
-    if (rejectedCurrNodes.size() >= currNodes.size() / 2 + 1 &&
-        (newNodes == null || (rejectedNewNodes.size() >= newNodes.size() / 2 + 1))) {
+    if (rejectedCurrNodes.size() >= currNodes.size() / 2 + 1
+        && (newNodes == null || (rejectedNewNodes.size() >= newNodes.size() / 2 + 1))) {
       rejected = true;
       synchronized (this) {
         this.notifyAll();
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
index 5939f1c577..660e6feff6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.heartbeat;
 
-import java.util.Collections;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
@@ -37,8 +36,6 @@ import java.util.Collection;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * HeartbeatThread takes the responsibility to send heartbeats (when this node is a leader), check
@@ -168,9 +165,7 @@ public class HeartbeatThread implements Runnable {
     logger.info("{}: End elections", memberName);
   }
 
-  /**
-   * Send each node (except the local node) in the group of the member a heartbeat.
-   */
+  /** Send each node (except the local node) in the group of the member a heartbeat. */
   protected void sendHeartbeats() {
     try {
       localMember.getLogManager().getLock().readLock().lock();
@@ -186,9 +181,7 @@ public class HeartbeatThread implements Runnable {
     sendHeartbeats(localMember.getAllNodes());
   }
 
-  /**
-   * Send each node (except the local node) in list a heartbeat.
-   */
+  /** Send each node (except the local node) in list a heartbeat. */
   @SuppressWarnings("java:S2445")
   private void sendHeartbeats(Collection<Peer> nodes) {
     logger.debug(
@@ -298,13 +291,10 @@ public class HeartbeatThread implements Runnable {
     electionRequest.setLastLogIndex(localMember.getLogManager().getLastLogIndex());
     electionRequest.setGroupId(localMember.getRaftGroupId().convertToTConsensusGroupId());
 
-    ElectionState electionState = new ElectionState(localMember.getAllNodes(),
-        localMember.getNewNodes());
+    ElectionState electionState =
+        new ElectionState(localMember.getAllNodes(), localMember.getNewNodes());
 
-    requestVote(
-        electionState,
-        electionRequest,
-        nextTerm);
+    requestVote(electionState, electionRequest, nextTerm);
 
     try {
       logger.info(
@@ -333,13 +323,10 @@ public class HeartbeatThread implements Runnable {
    * Any against vote will set the flag "electionTerminated" to true and ends the election.
    */
   @SuppressWarnings("java:S2445")
-  private void requestVote(
-      ElectionState electionState,
-      ElectionRequest request,
-      long nextTerm) {
+  private void requestVote(ElectionState electionState, ElectionRequest request, long nextTerm) {
 
-    Collection<Peer> peers = NodeUtils.unionNodes(electionState.getCurrNodes(),
-        electionState.getNewNodes());
+    Collection<Peer> peers =
+        NodeUtils.unionNodes(electionState.getCurrNodes(), electionState.getNewNodes());
     // avoid concurrent modification
     for (Peer node : peers) {
       if (node.equals(localMember.getThisNode())) {
@@ -347,11 +334,7 @@ public class HeartbeatThread implements Runnable {
       }
 
       ElectionRespHandler handler =
-          new ElectionRespHandler(
-              localMember,
-              node,
-              nextTerm,
-              electionState);
+          new ElectionRespHandler(localMember, node, nextTerm, electionState);
       requestVoteAsync(node, handler, request);
     }
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
index 1748518078..fec0bc7035 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.consensus.natraft.protocol.log;
 
+import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
+
 import java.nio.ByteBuffer;
 import java.util.Comparator;
 import java.util.Objects;
@@ -45,6 +47,14 @@ public abstract class Entry implements Comparable<Entry> {
   private volatile Exception exception;
 
   private long byteSize = 0;
+  private boolean fromThisNode = false;
+
+  public long receiveTime;
+  public long createTime;
+  public long acceptedTime;
+  public long committedTime;
+  public long applyTime;
+  public long waitEndTime;
 
   public int getDefaultSerializationBufferSize() {
     return DEFAULT_SERIALIZATION_BUFFER_SIZE;
@@ -87,9 +97,16 @@ public abstract class Entry implements Comparable<Entry> {
   }
 
   public void setApplied(boolean applied) {
-    synchronized (this) {
-      this.applied = applied;
-      this.notifyAll();
+    this.applied = applied;
+
+    if (createTime != 0) {
+      applyTime = System.nanoTime();
+      Statistic.LOG_DISPATCHER_FROM_CREATE_TO_APPLIED.add(applyTime - createTime);
+    }
+    if (fromThisNode) {
+      synchronized (this) {
+        this.notifyAll();
+      }
     }
   }
 
@@ -142,4 +159,12 @@ public abstract class Entry implements Comparable<Entry> {
   public void setPrevTerm(long prevTerm) {
     this.prevTerm = prevTerm;
   }
+
+  public boolean isFromThisNode() {
+    return fromThisNode;
+  }
+
+  public void setFromThisNode(boolean fromThisNode) {
+    this.fromThisNode = fromThisNode;
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java
index 30be376f74..0d6cf613bf 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java
@@ -19,8 +19,6 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log;
 
-import java.util.List;
-import java.util.Map;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryRequest;
@@ -28,6 +26,8 @@ import org.apache.iotdb.consensus.raft.thrift.AppendEntryRequest;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Future;
 
@@ -51,8 +51,8 @@ public class VotingEntry {
       RaftConfig config) {
     this.entry = entry;
     if (config.isUseFollowerSlidingWindow()) {
-      weaklyAcceptedNodes = new HashSet<>(
-          currNodes.size() + (newNodes != null ? newNodes.size() : 0));
+      weaklyAcceptedNodes =
+          new HashSet<>(currNodes.size() + (newNodes != null ? newNodes.size() : 0));
     }
     this.setAppendEntryRequest(appendEntryRequest);
     this.currNodes = currNodes;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
index d8b3ae16f3..ed4cb9bff0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
@@ -61,7 +61,7 @@ public class BlockingLogAppender implements LogAppender {
    * and append "log" to it. Otherwise report a log mismatch.
    *
    * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
-   * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+   *     .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
    */
   public AppendEntryResult appendEntry(AppendEntryRequest request, Entry log) {
     long resp = checkPrevLogIndex(request.prevLogIndex);
@@ -79,10 +79,9 @@ public class BlockingLogAppender implements LogAppender {
           <= config.getUnAppliedRaftLogNumForRejectThreshold()) {
         success =
             logManager.maybeAppend(
-                request.prevLogIndex,
-                request.prevLogTerm,
-                request.leaderCommit,
-                Collections.singletonList(log));
+                request.prevLogIndex, request.prevLogTerm, Collections.singletonList(log));
+        member.tryUpdateCommitIndex(
+            request.getTerm(), request.leaderCommit, logManager.getTerm(request.leaderCommit));
         break;
       }
       try {
@@ -107,9 +106,7 @@ public class BlockingLogAppender implements LogAppender {
     return result;
   }
 
-  /**
-   * Wait until all logs before "prevLogIndex" arrive or a timeout is reached.
-   */
+  /** Wait until all logs before "prevLogIndex" arrive or a timeout is reached. */
   private boolean waitForPrevLog(long prevLogIndex) {
     long waitStart = System.currentTimeMillis();
     long alreadyWait = 0;
@@ -154,7 +151,7 @@ public class BlockingLogAppender implements LogAppender {
    *
    * @param logs append logs
    * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
-   * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+   *     .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
    */
   public AppendEntryResult appendEntries(AppendEntriesRequest request, List<Entry> logs) {
     logger.debug(
@@ -186,8 +183,10 @@ public class BlockingLogAppender implements LogAppender {
     while (true) {
       if ((logManager.getCommitLogIndex() - logManager.getAppliedIndex())
           <= config.getUnAppliedRaftLogNumForRejectThreshold()) {
-        resp = lastConfigEntry == null ? appendWithoutConfigChange(request, logs, result)
-            : appendWithConfigChange(request, logs, result, lastConfigEntry);
+        resp =
+            lastConfigEntry == null
+                ? appendWithoutConfigChange(request, logs, result)
+                : appendWithConfigChange(request, logs, result, lastConfigEntry);
         break;
       }
 
@@ -206,14 +205,15 @@ public class BlockingLogAppender implements LogAppender {
     return result;
   }
 
-  protected long appendWithConfigChange(AppendEntriesRequest request, List<Entry> logs,
-      AppendEntryResult result, ConfigChangeEntry configChangeEntry) {
+  protected long appendWithConfigChange(
+      AppendEntriesRequest request,
+      List<Entry> logs,
+      AppendEntryResult result,
+      ConfigChangeEntry configChangeEntry) {
     long resp;
     try {
       logManager.getLock().writeLock().lock();
-      resp =
-          logManager.maybeAppend(
-              request.prevLogIndex, request.prevLogTerm, request.leaderCommit, logs);
+      resp = logManager.maybeAppend(request.prevLogIndex, request.prevLogTerm, logs);
 
       if (resp != -1) {
         if (logger.isDebugEnabled()) {
@@ -228,6 +228,8 @@ public class BlockingLogAppender implements LogAppender {
         result.setLastLogIndex(logManager.getLastLogIndex());
         result.setLastLogTerm(logManager.getLastLogTerm());
         member.setNewNodes(configChangeEntry.getNewPeers());
+        member.tryUpdateCommitIndex(
+            request.getTerm(), request.leaderCommit, logManager.getTerm(request.leaderCommit));
       } else {
         // the incoming log points to an illegal position, reject it
         result.status = Response.RESPONSE_LOG_MISMATCH;
@@ -238,11 +240,9 @@ public class BlockingLogAppender implements LogAppender {
     return resp;
   }
 
-  protected long appendWithoutConfigChange(AppendEntriesRequest request, List<Entry> logs,
-      AppendEntryResult result) {
-    long resp =
-        logManager.maybeAppend(
-            request.prevLogIndex, request.prevLogTerm, request.leaderCommit, logs);
+  protected long appendWithoutConfigChange(
+      AppendEntriesRequest request, List<Entry> logs, AppendEntryResult result) {
+    long resp = logManager.maybeAppend(request.prevLogIndex, request.prevLogTerm, logs);
     if (resp != -1) {
       if (logger.isDebugEnabled()) {
         logger.debug(
@@ -254,6 +254,8 @@ public class BlockingLogAppender implements LogAppender {
       result.status = Response.RESPONSE_STRONG_ACCEPT;
       result.setLastLogIndex(logManager.getLastLogIndex());
       result.setLastLogTerm(logManager.getLastLogTerm());
+      member.tryUpdateCommitIndex(
+          request.getTerm(), request.leaderCommit, logManager.getTerm(request.leaderCommit));
 
     } else {
       // the incoming log points to an illegal position, reject it
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
index 4572895c49..ab89f354cf 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
@@ -138,11 +138,10 @@ public class SlidingWindowLogAppender implements LogAppender {
       // TODO: Consider memory footprint to execute a precise rejection
       if ((logManager.getCommitLogIndex() - logManager.getAppliedIndex())
           <= config.getUnAppliedRaftLogNumForRejectThreshold()) {
-        synchronized (logManager) {
-          success =
-              logManager.maybeAppend(windowPrevLogIndex, windowPrevLogTerm, leaderCommit, logs);
-          break;
-        }
+        success = logManager.maybeAppend(windowPrevLogIndex, windowPrevLogTerm, logs);
+        member.tryUpdateCommitIndex(
+            member.getStatus().getTerm().get(), leaderCommit, logManager.getTerm(leaderCommit));
+        break;
       }
       try {
         TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
@@ -211,13 +210,14 @@ public class SlidingWindowLogAppender implements LogAppender {
     long appendedPos = 0;
 
     AppendEntryResult result = new AppendEntryResult();
-    synchronized (logManager) {
+    synchronized (this) {
       int windowPos = (int) (entry.getCurrLogIndex() - logManager.getLastLogIndex() - 1);
       if (windowPos < 0) {
         // the new entry may replace an appended entry
         appendedPos =
-            logManager.maybeAppend(
-                prevLogIndex, prevLogTerm, leaderCommit, Collections.singletonList(entry));
+            logManager.maybeAppend(prevLogIndex, prevLogTerm, Collections.singletonList(entry));
+        member.tryUpdateCommitIndex(
+            member.getStatus().getTerm().get(), leaderCommit, logManager.getTerm(leaderCommit));
         result.status = Response.RESPONSE_STRONG_ACCEPT;
         result.setLastLogIndex(logManager.getLastLogIndex());
         result.setLastLogTerm(logManager.getLastLogTerm());
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
index 3f988e78f4..14745a510d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
@@ -21,28 +21,26 @@ package org.apache.iotdb.consensus.natraft.protocol.log.applier;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
 import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
+import org.apache.iotdb.consensus.natraft.protocol.log.logtype.RequestEntry;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.function.Consumer;
 
 public class AsyncLogApplier implements LogApplier {
 
   private static final Logger logger = LoggerFactory.getLogger(AsyncLogApplier.class);
-  private static final int CONCURRENT_CONSUMER_NUM = Runtime.getRuntime().availableProcessors();
-  private RaftConfig config;
+  private static final int CONCURRENT_CONSUMER_NUM = 4;
   private LogApplier embeddedApplier;
-  private Map<PartialPath, DataLogConsumer> consumerMap;
+  private DataLogConsumer[] consumers;
   private ExecutorService consumerPool;
   private String name;
 
@@ -54,11 +52,14 @@ public class AsyncLogApplier implements LogApplier {
 
   public AsyncLogApplier(LogApplier embeddedApplier, String name, RaftConfig config) {
     this.embeddedApplier = embeddedApplier;
-    consumerMap = new HashMap<>();
+    consumers = new DataLogConsumer[CONCURRENT_CONSUMER_NUM];
     consumerPool =
         IoTDBThreadPoolFactory.newFixedThreadPool(CONCURRENT_CONSUMER_NUM, "ApplierThread");
+    for (int i = 0; i < consumers.length; i++) {
+      consumers[i] = new DataLogConsumer(name + "-" + i, config.getMaxNumOfLogsInMem());
+      consumerPool.submit(consumers[i]);
+    }
     this.name = name;
-    this.config = config;
   }
 
   @Override
@@ -71,12 +72,18 @@ public class AsyncLogApplier implements LogApplier {
   // the consumers will never be drained
   public synchronized void apply(Entry e) {
 
-    PartialPath logKey = getLogKey(e);
-
-    if (logKey != null) {
-      // this plan only affects one sg, so we can run it with other plans in parallel
-      provideLogToConsumers(logKey, e);
-      return;
+    if (e instanceof RequestEntry) {
+      RequestEntry requestEntry = (RequestEntry) e;
+      IConsensusRequest request = requestEntry.getRequest();
+      request = getStateMachine().deserializeRequest(request);
+      requestEntry.setRequest(request);
+
+      PartialPath logKey = getLogKey(request);
+      if (logKey != null) {
+        // this plan only affects one sg, so we can run it with other plans in parallel
+        provideLogToConsumers(logKey, e);
+        return;
+      }
     }
 
     logger.debug("{}: {} is waiting for consumers to drain", name, e);
@@ -84,13 +91,12 @@ public class AsyncLogApplier implements LogApplier {
     applyInternal(e);
   }
 
-  private PartialPath getLogKey(Entry e) {
-    // TODO-raft: implement
-    return null;
+  private PartialPath getLogKey(IConsensusRequest e) {
+    return e.conflictKey();
   }
 
   private void provideLogToConsumers(PartialPath planKey, Entry e) {
-    consumerMap.computeIfAbsent(planKey, d -> new DataLogConsumer(name + "-" + d)).accept(e);
+    consumers[Math.abs(planKey.hashCode()) % CONCURRENT_CONSUMER_NUM].accept(e);
   }
 
   private void drainConsumers() {
@@ -108,7 +114,7 @@ public class AsyncLogApplier implements LogApplier {
   }
 
   private boolean allConsumersEmpty() {
-    for (DataLogConsumer consumer : consumerMap.values()) {
+    for (DataLogConsumer consumer : consumers) {
       if (!consumer.isEmpty()) {
         if (logger.isDebugEnabled()) {
           logger.debug("Consumer not empty: {}", consumer);
@@ -125,14 +131,14 @@ public class AsyncLogApplier implements LogApplier {
 
   private class DataLogConsumer implements Runnable, Consumer<Entry> {
 
-    private BlockingQueue<Entry> logQueue = new ArrayBlockingQueue<>(config.getMaxNumOfLogsInMem());
+    private BlockingQueue<Entry> logQueue;
     private volatile long lastLogIndex;
     private volatile long lastAppliedLogIndex;
     private String name;
-    private Future<?> future;
 
-    public DataLogConsumer(String name) {
+    public DataLogConsumer(String name, int queueCapacity) {
       this.name = name;
+      this.logQueue = new ArrayBlockingQueue<>(queueCapacity);
     }
 
     public boolean isEmpty() {
@@ -174,20 +180,6 @@ public class AsyncLogApplier implements LogApplier {
 
     @Override
     public void accept(Entry e) {
-      if (future == null || future.isCancelled() || future.isDone()) {
-        if (future != null) {
-          try {
-            future.get();
-          } catch (InterruptedException ex) {
-            logger.error("Last applier thread exits unexpectedly", ex);
-            Thread.currentThread().interrupt();
-          } catch (ExecutionException ex) {
-            logger.error("Last applier thread exits unexpectedly", ex);
-          }
-        }
-        future = consumerPool.submit(this);
-      }
-
       try {
         lastLogIndex = e.getCurrLogIndex();
         logQueue.put(e);
@@ -214,4 +206,9 @@ public class AsyncLogApplier implements LogApplier {
           + '}';
     }
   }
+
+  @Override
+  public IStateMachine getStateMachine() {
+    return embeddedApplier.getStateMachine();
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/BaseApplier.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/BaseApplier.java
index 646159877f..c38e5fc260 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/BaseApplier.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/BaseApplier.java
@@ -71,4 +71,9 @@ public class BaseApplier implements LogApplier {
     request = stateMachine.deserializeRequest(request);
     return stateMachine.write(request);
   }
+
+  @Override
+  public IStateMachine getStateMachine() {
+    return stateMachine;
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/LogApplier.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/LogApplier.java
index e2ddfdacf0..c2e0821502 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/LogApplier.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/LogApplier.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.applier;
 
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
 
 /** LogApplier applies the log to the local node to make it take effect. */
@@ -34,4 +35,6 @@ public interface LogApplier {
   void apply(Entry e);
 
   default void close() {}
+
+  IStateMachine getStateMachine();
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
index 7b5f9f5dc2..4d90e0d9b2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
@@ -69,10 +69,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
     long resp = response.status;
 
     if (resp == RESPONSE_STRONG_ACCEPT || resp == RESPONSE_AGREE) {
-      member
-          .getVotingLogList()
-          .onStronglyAccept(
-              log, trueReceiver);
+      member.getVotingLogList().onStronglyAccept(log, trueReceiver);
 
       member.getStatus().getPeerMap().get(trueReceiver).setMatchIndex(response.lastLogIndex);
     } else if (resp > 0) {
@@ -136,7 +133,6 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
     }
   }
 
-
   public void setLog(VotingEntry log) {
     this.log = log;
   }
@@ -148,5 +144,4 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
   public void setDirectReceiver(Peer follower) {
     this.directReceiver = follower;
   }
-
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index 07f3bc3929..27ea860aad 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -19,11 +19,6 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
 
-import static org.apache.iotdb.consensus.natraft.utils.NodeUtils.unionNodes;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.utils.TestOnly;
@@ -38,13 +33,13 @@ import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
 
 import com.google.common.util.concurrent.RateLimiter;
-import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -55,6 +50,8 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.iotdb.consensus.natraft.utils.NodeUtils.unionNodes;
+
 /**
  * A LogDispatcher serves a raft leader by queuing logs that the leader wants to send to its
  * followers and send the logs in an ordered manner so that the followers will not wait for previous
@@ -101,9 +98,6 @@ public class LogDispatcher {
     }
   }
 
-
-
-
   void createQueue(Peer node) {
     BlockingQueue<VotingEntry> logBlockingQueue;
     logBlockingQueue = new ArrayBlockingQueue<>(config.getMaxNumOfLogsInMem());
@@ -112,9 +106,7 @@ public class LogDispatcher {
 
     for (int i = 0; i < bindingThreadNum; i++) {
       executorServices
-          .computeIfAbsent(
-              node,
-              n -> createPool(node))
+          .computeIfAbsent(node, n -> createPool(node))
           .submit(newDispatcherThread(node, logBlockingQueue));
     }
   }
@@ -134,7 +126,7 @@ public class LogDispatcher {
   void createQueueAndBindingThreads(Collection<Peer> peers) {
     for (Peer node : peers) {
       if (!node.equals(member.getThisNode())) {
-       createQueue(node);
+        createQueue(node);
       }
     }
     updateRateLimiter();
@@ -154,7 +146,13 @@ public class LogDispatcher {
   }
 
   protected boolean addToQueue(BlockingQueue<VotingEntry> nodeLogQueue, VotingEntry request) {
-    return nodeLogQueue.add(request);
+    synchronized (nodeLogQueue) {
+      boolean added = nodeLogQueue.add(request);
+      if (added) {
+        nodeLogQueue.notifyAll();
+      }
+      return added;
+    }
   }
 
   public void offer(VotingEntry request) {
@@ -228,12 +226,13 @@ public class LogDispatcher {
       try {
         while (!Thread.interrupted()) {
           synchronized (logBlockingDeque) {
-            VotingEntry poll = logBlockingDeque.take();
-            currBatch.add(poll);
-            if (maxBatchSize > 1) {
-              while (!logBlockingDeque.isEmpty() && currBatch.size() < maxBatchSize) {
-                currBatch.add(logBlockingDeque.take());
-              }
+            VotingEntry poll = logBlockingDeque.poll();
+            if (poll != null) {
+              currBatch.add(poll);
+              logBlockingDeque.drainTo(currBatch, maxBatchSize);
+            } else {
+              logBlockingDeque.wait(10);
+              continue;
             }
           }
           if (logger.isDebugEnabled()) {
@@ -300,7 +299,11 @@ public class LogDispatcher {
       return request;
     }
 
-    private void sendLogs(List<VotingEntry> currBatch) throws TException {
+    private void sendLogs(List<VotingEntry> currBatch) {
+      if (currBatch.isEmpty()) {
+        return;
+      }
+
       int logIndex = 0;
       logger.debug(
           "send logs from index {} to {}",
@@ -331,8 +334,7 @@ public class LogDispatcher {
       }
     }
 
-    public AppendNodeEntryHandler getAppendNodeEntryHandler(
-        VotingEntry log, Peer node) {
+    public AppendNodeEntryHandler getAppendNodeEntryHandler(VotingEntry log, Peer node) {
       AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
       handler.setDirectReceiver(node);
       handler.setLog(log);
@@ -347,8 +349,7 @@ public class LogDispatcher {
       private AppendEntriesHandler(List<VotingEntry> batch) {
         singleEntryHandlers = new ArrayList<>(batch.size());
         for (VotingEntry sendLogRequest : batch) {
-          AppendNodeEntryHandler handler =
-              getAppendNodeEntryHandler(sendLogRequest, receiver);
+          AppendNodeEntryHandler handler = getAppendNodeEntryHandler(sendLogRequest, receiver);
           singleEntryHandlers.add(handler);
         }
       }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
index 7a097e1f4d..15f845abbe 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
@@ -19,17 +19,20 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.exception.LogExecutionException;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
 import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
+import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
 public class VotingLogList {
 
   private static final Logger logger = LoggerFactory.getLogger(VotingLogList.class);
@@ -80,7 +83,8 @@ public class VotingLogList {
    * @return the lastly removed entry if any.
    */
   public void onStronglyAccept(VotingEntry entry, Peer acceptingNode) {
-    logger.debug("{} is strongly accepted by {}", entry, acceptingNode);
+    logger.debug(
+        "{} is strongly accepted by {}; {}", entry, acceptingNode, stronglyAcceptedIndices);
     long currLogIndex = entry.getEntry().getCurrLogIndex();
 
     Long newIndex =
@@ -96,6 +100,9 @@ public class VotingLogList {
                 return oldIndex;
               }
             });
+    entry.getEntry().acceptedTime = System.nanoTime();
+    Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.add(
+        entry.getEntry().acceptedTime - entry.getEntry().createTime);
     if (newIndex == currLogIndex) {
       tryCommit(entry);
     }
@@ -119,16 +126,16 @@ public class VotingLogList {
     if (enableWeakAcceptance) {
       int currNodeQuorumNum = votingEntry.currNodesQuorumNum();
       int newNodeQuorumNum = votingEntry.newNodesQuorumNum();
-      int stronglyAcceptedNumByCurrNodes = votingEntry.stronglyAcceptedNumByCurrNodes(
-          stronglyAcceptedIndices);
-      int stronglyAcceptedNumByNewNodes = votingEntry.stronglyAcceptedNumByNewNodes(
-          stronglyAcceptedIndices);
-      int weaklyAcceptedNumByCurrNodes = votingEntry.weaklyAcceptedNumByCurrNodes(
-          stronglyAcceptedIndices);
-      int weaklyAcceptedNumByNewNodes = votingEntry.weaklyAcceptedNumByNewNodes(
-          stronglyAcceptedIndices);
-      if ((weaklyAcceptedNumByCurrNodes + stronglyAcceptedNumByCurrNodes) >= currNodeQuorumNum &&
-          (weaklyAcceptedNumByNewNodes + stronglyAcceptedNumByNewNodes) >= newNodeQuorumNum) {
+      int stronglyAcceptedNumByCurrNodes =
+          votingEntry.stronglyAcceptedNumByCurrNodes(stronglyAcceptedIndices);
+      int stronglyAcceptedNumByNewNodes =
+          votingEntry.stronglyAcceptedNumByNewNodes(stronglyAcceptedIndices);
+      int weaklyAcceptedNumByCurrNodes =
+          votingEntry.weaklyAcceptedNumByCurrNodes(stronglyAcceptedIndices);
+      int weaklyAcceptedNumByNewNodes =
+          votingEntry.weaklyAcceptedNumByNewNodes(stronglyAcceptedIndices);
+      if ((weaklyAcceptedNumByCurrNodes + stronglyAcceptedNumByCurrNodes) >= currNodeQuorumNum
+          && (weaklyAcceptedNumByNewNodes + stronglyAcceptedNumByNewNodes) >= newNodeQuorumNum) {
         return AcceptedType.WEAKLY_ACCEPTED;
       }
     }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
index 6cecfa1b9e..1e59fccc49 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
@@ -19,14 +19,15 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.logtype;
 
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
+
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
 
 public class ConfigChangeEntry extends Entry {
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
index befe8ef655..a76653dac4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.manager;
 
-import java.util.function.Consumer;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
@@ -34,6 +33,7 @@ import org.apache.iotdb.consensus.natraft.utils.IOUtils;
 import java.io.File;
 import java.nio.file.Path;
 import java.util.List;
+import java.util.function.Consumer;
 
 public class DirectorySnapshotRaftLogManager extends RaftLogManager {
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index 3fdcf963d2..804328305d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.manager;
 
-import java.util.function.Consumer;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.consensus.IStateMachine;
@@ -33,6 +32,7 @@ import org.apache.iotdb.consensus.natraft.protocol.log.logtype.EmptyEntry;
 import org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.LogManagerMeta;
 import org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.StableEntryManager;
 import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.Snapshot;
+import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +48,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
 
 public abstract class RaftLogManager {
 
@@ -330,12 +331,11 @@ public abstract class RaftLogManager {
    *
    * @param lastIndex leader's matchIndex for this follower node
    * @param lastTerm the entry's term which index is leader's matchIndex for this follower node
-   * @param leaderCommit leader's commitIndex
    * @param entries entries sent from the leader node Note that the leader must ensure
    *     entries[0].index = lastIndex + 1
    * @return -1 if the entries cannot be appended, otherwise the last index of new entries
    */
-  public long maybeAppend(long lastIndex, long lastTerm, long leaderCommit, List<Entry> entries) {
+  public long maybeAppend(long lastIndex, long lastTerm, List<Entry> entries) {
     try {
       lock.writeLock().lock();
       if (matchTerm(lastTerm, lastIndex)) {
@@ -362,11 +362,6 @@ public abstract class RaftLogManager {
           long offset = lastIndex + 1;
           append(entries.subList((int) (ci - offset), entries.size()));
         }
-        try {
-          commitTo(Math.min(leaderCommit, newLastIndex));
-        } catch (LogExecutionException e) {
-          // exceptions are ignored on follower side
-        }
         return newLastIndex;
       }
       return -1;
@@ -590,6 +585,13 @@ public abstract class RaftLogManager {
         // Cluster could continue provide service when exception is thrown here
         getStableEntryManager().append(entries, appliedIndex);
       }
+      for (Entry entry : entries) {
+        if (entry.createTime != 0) {
+          entry.committedTime = System.nanoTime();
+          Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_COMMIT.add(
+              entry.committedTime - entry.createTime);
+        }
+      }
     } catch (IOException e) {
       // The exception will block the raft service continue accept log.
       // TODO: Notify user that the persisted logs before these entries(include) are corrupted.
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/LogSequencerFactory.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/LogSequencerFactory.java
index ed65209947..18f12ec8fe 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/LogSequencerFactory.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/LogSequencerFactory.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.consensus.natraft.protocol.log.sequencing;
 
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
-import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
 
 public interface LogSequencerFactory {
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
index aaba4ae2c6..624bf0af85 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
@@ -47,8 +47,6 @@ public class SynchronousSequencer implements LogSequencer {
     this.config = config;
   }
 
-
-
   @Override
   public VotingEntry sequence(Entry e) {
     VotingEntry votingEntry = null;
@@ -68,6 +66,7 @@ public class SynchronousSequencer implements LogSequencer {
           e.setCurrLogTerm(member.getStatus().getTerm().get());
           e.setCurrLogIndex(lastIndex + 1);
           e.setPrevTerm(lastTerm);
+          e.setFromThisNode(true);
 
           // logDispatcher will serialize log, and set log size, and we will use the size after it
           logManager.append(Collections.singletonList(e));
@@ -101,7 +100,6 @@ public class SynchronousSequencer implements LogSequencer {
     return votingEntry;
   }
 
-
   public static class Factory implements LogSequencerFactory {
 
     @Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
index 359bb1393b..e27fdb86c4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
@@ -4,7 +4,6 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.snapshot;
 
-import java.io.DataOutputStream;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.common.Peer;
@@ -12,13 +11,14 @@ import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
 import org.apache.iotdb.consensus.natraft.client.SyncClientAdaptor;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.rpc.TSStatusCode;
-
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -37,9 +37,7 @@ public class DirectorySnapshot extends Snapshot {
   private TEndPoint source;
   private String memberName;
 
-  public DirectorySnapshot() {
-
-  }
+  public DirectorySnapshot() {}
 
   public DirectorySnapshot(File directory, List<Path> filePaths, List<Peer> peers) {
     this.directory = directory;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/Snapshot.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/Snapshot.java
index b5aa2ae455..7ade751b0b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/Snapshot.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/Snapshot.java
@@ -19,15 +19,15 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.snapshot;
 
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * As we can only hold a certain amount of logs in memory, when the logs' size exceed the memory
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
index 08793449fd..df31eeba40 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
@@ -61,8 +61,7 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
     this.consensus = consensus;
   }
 
-  public void handleClientExit() {
-  }
+  public void handleClientExit() {}
 
   private RaftMember getMember(TConsensusGroupId groupId) throws TException {
     RaftMember member = consensus.getMember(Factory.createFromTConsensusGroupId(groupId));
@@ -98,16 +97,19 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
         ConfigChangeEntry lastConfigChangeEntry = findFirstConfigChangeEntry(request);
         if (lastConfigChangeEntry != null) {
           Peer thisPeer = new Peer(groupId, consensus.getThisNodeId(), consensus.getThisNode());
-          consensus.createNewMemberIfAbsent(groupId, thisPeer, lastConfigChangeEntry.getOldPeers(),
+          consensus.createNewMemberIfAbsent(
+              groupId,
+              thisPeer,
+              lastConfigChangeEntry.getOldPeers(),
               lastConfigChangeEntry.getNewPeers());
           return consensus.getMember(groupId);
         }
       } catch (UnknownLogTypeException e) {
         throw new TException(e.getMessage());
       }
-
+      throw new NoMemberException("No such member of: " + tgroupId);
     }
-    throw new NoMemberException("No such member of: " + tgroupId);
+    return member;
   }
 
   @Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
index d9f137277f..745e1d5fef 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
@@ -17,17 +17,18 @@
  * under the License.
  */
 
-
 package org.apache.iotdb.consensus.natraft.utils;
 
-import java.nio.ByteBuffer;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
 import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryRequest;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
+
 public class LogUtils {
 
   private static final Logger logger = LoggerFactory.getLogger(LogUtils.class);
@@ -41,8 +42,8 @@ public class LogUtils {
     return votingEntry;
   }
 
-  public static AppendEntryRequest buildAppendEntryRequest(Entry entry, boolean serializeNow,
-      RaftMember member) {
+  public static AppendEntryRequest buildAppendEntryRequest(
+      Entry entry, boolean serializeNow, RaftMember member) {
     AppendEntryRequest request = new AppendEntryRequest();
     request.setTerm(member.getStatus().getTerm().get());
     if (serializeNow) {
@@ -76,6 +77,4 @@ public class LogUtils {
     }
     return sendLogRequest;
   }
-
-
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java
index dc1ec45817..d149827c64 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java
@@ -1,11 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.iotdb.consensus.natraft.utils;
 
+import org.apache.iotdb.consensus.common.Peer;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import org.apache.iotdb.consensus.common.Peer;
 
 public class NodeUtils {
 
@@ -28,5 +48,4 @@ public class NodeUtils {
     nodeUnion.addAll(newNodes);
     return nodeUnion;
   }
-
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/StatusUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/StatusUtils.java
index ba01f7983f..7227b18940 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/StatusUtils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/StatusUtils.java
@@ -24,9 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse.Builder;
 import org.apache.iotdb.consensus.exception.ConsensusException;
-import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient.Factory;
 import org.apache.iotdb.rpc.TSStatusCode;
-import org.checkerframework.checker.units.qual.C;
 
 public class StatusUtils {
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
new file mode 100644
index 0000000000..ba3a85ac31
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.natraft.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class Timer {
+
+  private static final Logger logger = LoggerFactory.getLogger(Timer.class);
+
+  public static final boolean ENABLE_INSTRUMENTING = true;
+
+  private static final String COORDINATOR = "Coordinator";
+  private static final String META_GROUP_MEMBER = "Meta group member";
+  private static final String DATA_GROUP_MEMBER = "Data group member";
+  private static final String RAFT_MEMBER_SENDER = " Raft member(sender)";
+  private static final String RAFT_MEMBER_RECEIVER = " Raft member(receiver)";
+  private static final String LOG_DISPATCHER = "Log dispatcher";
+
+  // convert nano to milli
+  private static final double TIME_SCALE = 1_000_000.0;
+
+  public enum Statistic {
+    // A dummy root for the convenience of prints
+    ROOT("ClassName", "BlockName", TIME_SCALE, true, null),
+    // coordinator
+    COORDINATOR_EXECUTE_NON_QUERY(COORDINATOR, "execute non query", TIME_SCALE, true, ROOT),
+
+    // meta group member
+    META_GROUP_MEMBER_EXECUTE_NON_QUERY(
+        META_GROUP_MEMBER, "execute non query", TIME_SCALE, true, COORDINATOR_EXECUTE_NON_QUERY),
+    META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP(
+        META_GROUP_MEMBER,
+        "execute in local group",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY),
+    META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_REMOTE_GROUP(
+        META_GROUP_MEMBER,
+        "execute in remote group",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY),
+    // data group member
+    DATA_GROUP_MEMBER_LOCAL_EXECUTION(
+        DATA_GROUP_MEMBER,
+        "execute locally",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    DATA_GROUP_MEMBER_WAIT_LEADER(
+        DATA_GROUP_MEMBER,
+        "wait for leader",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    DATA_GROUP_MEMBER_FORWARD_PLAN(
+        DATA_GROUP_MEMBER,
+        "forward to leader",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    // raft member - sender
+    RAFT_SENDER_SEQUENCE_LOG(
+        RAFT_MEMBER_SENDER, "sequence log", TIME_SCALE, true, META_GROUP_MEMBER_EXECUTE_NON_QUERY),
+    RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2(
+        RAFT_MEMBER_SENDER,
+        "compete for log manager before append",
+        TIME_SCALE,
+        true,
+        DATA_GROUP_MEMBER_LOCAL_EXECUTION),
+    RAFT_SENDER_OCCUPY_LOG_MANAGER_IN_APPEND(
+        RAFT_MEMBER_SENDER,
+        "occupy log manager in append",
+        TIME_SCALE,
+        true,
+        DATA_GROUP_MEMBER_LOCAL_EXECUTION),
+    RAFT_SENDER_APPEND_LOG_V2(
+        RAFT_MEMBER_SENDER,
+        "locally append log",
+        TIME_SCALE,
+        true,
+        DATA_GROUP_MEMBER_LOCAL_EXECUTION),
+    RAFT_SENDER_BUILD_LOG_REQUEST(
+        RAFT_MEMBER_SENDER,
+        "build SendLogRequest",
+        TIME_SCALE,
+        true,
+        DATA_GROUP_MEMBER_LOCAL_EXECUTION),
+    RAFT_SENDER_BUILD_APPEND_REQUEST(
+        RAFT_MEMBER_SENDER,
+        "build AppendEntryRequest",
+        TIME_SCALE,
+        true,
+        RAFT_SENDER_BUILD_LOG_REQUEST),
+    RAFT_SENDER_OFFER_LOG(
+        RAFT_MEMBER_SENDER,
+        "offer log to dispatcher",
+        TIME_SCALE,
+        true,
+        DATA_GROUP_MEMBER_LOCAL_EXECUTION),
+    RAFT_SENDER_COMMIT_LOG(
+        RAFT_MEMBER_SENDER,
+        "locally commit log",
+        TIME_SCALE,
+        true,
+        DATA_GROUP_MEMBER_LOCAL_EXECUTION),
+    RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT(
+        RAFT_MEMBER_SENDER,
+        "compete for log manager before commit",
+        TIME_SCALE,
+        true,
+        RAFT_SENDER_COMMIT_LOG),
+    RAFT_COMMIT_LOG_IN_MANAGER(
+        RAFT_MEMBER_SENDER, "commit log in log manager", TIME_SCALE, true, RAFT_SENDER_COMMIT_LOG),
+    RAFT_SENDER_EXIT_LOG_MANAGER(
+        RAFT_MEMBER_SENDER,
+        "exiting log manager synchronizer",
+        TIME_SCALE,
+        true,
+        RAFT_SENDER_COMMIT_LOG),
+    RAFT_SENDER_COMMIT_GET_LOGS(
+        RAFT_MEMBER_SENDER,
+        "get logs to be committed",
+        TIME_SCALE,
+        true,
+        RAFT_COMMIT_LOG_IN_MANAGER),
+    RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS(
+        RAFT_MEMBER_SENDER,
+        "delete logs exceeding capacity",
+        TIME_SCALE,
+        true,
+        RAFT_COMMIT_LOG_IN_MANAGER),
+    RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS(
+        RAFT_MEMBER_SENDER,
+        "append and stable committed logs",
+        TIME_SCALE,
+        true,
+        RAFT_COMMIT_LOG_IN_MANAGER),
+    RAFT_SENDER_COMMIT_APPLY_LOGS(
+        RAFT_MEMBER_SENDER,
+        "apply after committing logs",
+        TIME_SCALE,
+        true,
+        RAFT_COMMIT_LOG_IN_MANAGER),
+    RAFT_SENDER_COMMIT_TO_CONSUMER_LOGS(
+        RAFT_MEMBER_SENDER,
+        "provide log to consumer",
+        TIME_SCALE,
+        true,
+        RAFT_SENDER_COMMIT_APPLY_LOGS),
+    RAFT_SENDER_COMMIT_EXCLUSIVE_LOGS(
+        RAFT_MEMBER_SENDER,
+        "apply logs that cannot run in parallel",
+        TIME_SCALE,
+        true,
+        RAFT_SENDER_COMMIT_APPLY_LOGS),
+    RAFT_SENDER_COMMIT_WAIT_LOG_APPLY(
+        RAFT_MEMBER_SENDER, "wait until log is applied", TIME_SCALE, true, RAFT_SENDER_COMMIT_LOG),
+    RAFT_SENDER_IN_APPLY_QUEUE(
+        RAFT_MEMBER_SENDER, "in apply queue", TIME_SCALE, true, RAFT_SENDER_COMMIT_WAIT_LOG_APPLY),
+    RAFT_SENDER_DATA_LOG_APPLY(
+        RAFT_MEMBER_SENDER, "apply data log", TIME_SCALE, true, RAFT_SENDER_COMMIT_WAIT_LOG_APPLY),
+    // raft member - receiver
+    RAFT_RECEIVER_WAIT_FOR_PREV_LOG(
+        RAFT_MEMBER_RECEIVER,
+        "receiver wait for prev log",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_RECEIVER_APPEND_ENTRY(
+        RAFT_MEMBER_RECEIVER,
+        "append entrys",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_RECEIVER_APPEND_ACK(
+        RAFT_MEMBER_RECEIVER,
+        "ack append entrys",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_RECEIVER_APPEND_ENTRY_FULL(
+        RAFT_MEMBER_RECEIVER,
+        "append entrys(full)",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_RECEIVER_HANDLE_APPEND_ACK(
+        RAFT_MEMBER_SENDER,
+        "handle append entrys ack",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_RECEIVER_INDEX_DIFF(RAFT_MEMBER_RECEIVER, "index diff", 1.0, true, ROOT),
+    // log dispatcher
+    LOG_DISPATCHER_LOG_ENQUEUE(
+        LOG_DISPATCHER,
+        "enqueue",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_LOG_ENQUEUE_SINGLE(
+        LOG_DISPATCHER,
+        "enqueue (single)",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_LOG_IN_QUEUE(
+        LOG_DISPATCHER,
+        "in queue",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_LOG_BATCH_SIZE(
+        LOG_DISPATCHER, "batch size", 1, true, META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_FROM_RECEIVE_TO_CREATE(
+        LOG_DISPATCHER,
+        "from receive to create",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_FROM_CREATE_TO_ENQUEUE(
+        LOG_DISPATCHER,
+        "from create to queue",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_FROM_CREATE_TO_DEQUEUE(
+        LOG_DISPATCHER,
+        "from create to dequeue",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_FROM_CREATE_TO_SENDING(
+        LOG_DISPATCHER,
+        "from create to sending",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_FROM_CREATE_TO_SENT(
+        LOG_DISPATCHER,
+        "from create to sent",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT(
+        LOG_DISPATCHER,
+        "from create to accept",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_SENDER_LOG_FROM_CREATE_TO_COMMIT(
+        LOG_DISPATCHER,
+        "from create to commit",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_FROM_CREATE_TO_APPLIED(
+        LOG_DISPATCHER,
+        "from create to OK",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_APPEND_END(
+        LOG_DISPATCHER,
+        "from create to wait append end",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_APPLY_END(
+        LOG_DISPATCHER,
+        "from create to wait apply end",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_TOTAL(
+        LOG_DISPATCHER,
+        "total process time",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_WINDOW_LENGTH(RAFT_MEMBER_RECEIVER, "window length", 1, true, ROOT),
+    RAFT_RELAYED_ENTRY(RAFT_MEMBER_RECEIVER, "number of relayed entries", 1, true, ROOT),
+    RAFT_SEND_RELAY_ACK(RAFT_MEMBER_RECEIVER, "send relay ack", 1, true, ROOT),
+    RAFT_SENT_ENTRY_SIZE(RAFT_MEMBER_SENDER, "sent entry size", 1, true, ROOT),
+    DISPATCHER_QUEUE_LENGTH(RAFT_MEMBER_SENDER, "dispatcher queue length", 1, true, ROOT),
+    RAFT_RELAYED_LEVEL1_NUM(RAFT_MEMBER_SENDER, "level 1 relay node number", 1, true, ROOT),
+    RAFT_RECEIVE_RELAY_ACK(RAFT_MEMBER_SENDER, "receive relay ack", 1, true, ROOT),
+    RAFT_SENDER_OOW(RAFT_MEMBER_SENDER, "out of window", 1, true, ROOT),
+    RAFT_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "weak accept", 1, true, ROOT),
+    RAFT_CONCURRENT_SENDER(RAFT_MEMBER_SENDER, "concurrent sender", 1, true, ROOT),
+    RAFT_INDEX_BLOCKER(RAFT_MEMBER_SENDER, "index blocker", 1, true, ROOT),
+    RAFT_APPEND_BLOCKER(RAFT_MEMBER_SENDER, "append blocker", 1, true, ROOT),
+    RAFT_APPLY_BLOCKER(RAFT_MEMBER_SENDER, "apply blocker", 1, true, ROOT);
+
+    String className;
+    String blockName;
+    AtomicLong sum = new AtomicLong(0);
+    AtomicLong counter = new AtomicLong(0);
+    AtomicLong intervalSum = new AtomicLong(0);
+    AtomicLong intervalCounter = new AtomicLong(0);
+    long max;
+    long intervalMax;
+    double scale;
+    boolean valid;
+    int level;
+    Statistic parent;
+    List<Statistic> children = new ArrayList<>();
+
+    Statistic(String className, String blockName, double scale, boolean valid, Statistic parent) {
+      this.className = className;
+      this.blockName = blockName;
+      this.scale = scale;
+      this.valid = valid;
+      this.parent = parent;
+      if (parent == null) {
+        level = -1;
+      } else {
+        level = parent.level + 1;
+        parent.children.add(this);
+      }
+    }
+
+    public void add(long val) {
+      if (ENABLE_INSTRUMENTING) {
+        sum.addAndGet(val);
+        counter.incrementAndGet();
+        intervalSum.addAndGet(val);
+        intervalCounter.incrementAndGet();
+        max = Math.max(max, val);
+        intervalMax = Math.max(intervalMax, val);
+      }
+    }
+
+    /** @return System.nanoTime() if ENABLE_INSTRUMENTING is true, else Long.MIN_VALUE */
+    public long getOperationStartTime() {
+      if (ENABLE_INSTRUMENTING) {
+        return System.nanoTime();
+      }
+      return Long.MIN_VALUE;
+    }
+
+    /**
+     * This method equals `add(System.nanoTime() - startTime)`. We wrap `System.nanoTime()` in this
+     * method to avoid unnecessary calls when instrumenting is disabled.
+     */
+    public long calOperationCostTimeFromStart(long startTime) {
+      if (ENABLE_INSTRUMENTING && startTime != Long.MIN_VALUE && startTime != 0) {
+        long consumed = System.nanoTime() - startTime;
+        add(consumed);
+        return consumed;
+      }
+      return 0;
+    }
+
+    /** WARN: no concurrency safety guarantee. */
+    public void reset() {
+      sum.set(0);
+      counter.set(0);
+      max = 0;
+      intervalCounter.set(0);
+      intervalSum.set(0);
+      intervalMax = 0;
+    }
+
+    /** WARN: no concurrency safety guarantee. */
+    public static void resetAll() {
+      for (Statistic value : values()) {
+        value.reset();
+      }
+    }
+
+    @Override
+    public String toString() {
+      double s = sum.get() / scale;
+      long cnt = counter.get();
+      double intervalS = intervalSum.get() / scale;
+      long intervalCnt = intervalCounter.get();
+      double avg = s / cnt;
+      double intervalAvg = intervalS / intervalCnt;
+      intervalSum.set(0);
+      intervalCounter.set(0);
+      intervalMax = 0;
+      return String.format(
+          "%s - %s: %.4f(%.4f), %d(%d), %.4f(%.4f), %d(%d)",
+          className, blockName, s, intervalS, cnt, intervalCnt, avg, intervalAvg, max, intervalMax);
+    }
+
+    public long getCnt() {
+      return counter.get();
+    }
+
+    public long getSum() {
+      return sum.get();
+    }
+
+    public static String getReport() {
+      if (!ENABLE_INSTRUMENTING) {
+        return "";
+      }
+      StringBuilder result = new StringBuilder();
+      printTo(Statistic.ROOT, result);
+      return result.toString();
+    }
+
+    private static void printTo(Statistic currNode, StringBuilder out) {
+      if (currNode != Statistic.ROOT && currNode.valid) {
+        if (currNode.counter.get() != 0) {
+          indent(out, currNode.level);
+          out.append(currNode).append("\n");
+        }
+      }
+      for (Statistic child : currNode.children) {
+        printTo(child, out);
+      }
+    }
+
+    private static void indent(StringBuilder out, int indents) {
+      for (int i = 0; i < indents; i++) {
+        out.append("  ");
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index 0994935c95..ae890e6a32 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -394,4 +394,9 @@ public abstract class InsertNode extends WritePlanNode {
     result = 31 * result + Arrays.hashCode(dataTypes);
     return result;
   }
+
+  @Override
+  public PartialPath conflictKey() {
+    return devicePath;
+  }
 }