You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/12/01 01:16:07 UTC

[iotdb] 07/09: fix client in LogDispatcher

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 224002987d8f8bedefb3bfecb6730dcadf7ffba1
Author: jt <jt...@163.com>
AuthorDate: Mon Nov 8 18:25:16 2021 +0800

    fix client in LogDispatcher
---
 .../org/apache/iotdb/cluster/expr/ExprBench.java   |  88 ++++++++--------
 .../org/apache/iotdb/cluster/expr/ExprServer.java  |   5 +
 .../apache/iotdb/cluster/expr/VotingLogList.java   |   6 ++
 .../iotdb/cluster/log/IndirectLogDispatcher.java   |   8 +-
 .../java/org/apache/iotdb/cluster/log/Log.java     |   7 +-
 .../apache/iotdb/cluster/log/LogDispatcher.java    |  87 ++++++++++------
 .../cluster/log/logtypes/PhysicalPlanLog.java      |  14 ++-
 .../cluster/server/member/DataGroupMember.java     |   7 +-
 .../cluster/server/member/MetaGroupMember.java     |   9 +-
 .../iotdb/cluster/server/member/RaftMember.java    | 111 +++++++++++++--------
 .../apache/iotdb/cluster/server/monitor/Timer.java |  36 ++++++-
 .../cluster/server/member/MetaGroupMemberTest.java |   4 +-
 12 files changed, 251 insertions(+), 131 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
index e3f7b57..d0891a3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
@@ -32,6 +32,8 @@ import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol.Factory;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class ExprBench {
@@ -44,6 +46,7 @@ public class ExprBench {
   private SyncClientPool clientPool;
   private Node target;
   private int maxRequestNum;
+  private ExecutorService pool = Executors.newCachedThreadPool();
 
   public ExprBench(Node target) {
     this.target = target;
@@ -54,52 +57,51 @@ public class ExprBench {
   public void benchmark() {
     long startTime = System.currentTimeMillis();
     for (int i = 0; i < threadNum; i++) {
-      new Thread(
-              () -> {
-                Client client = clientPool.getClient(target);
-                ExecutNonQueryReq request = new ExecutNonQueryReq();
-                DummyPlan plan = new DummyPlan();
-                plan.setWorkload(new byte[workloadSize]);
-                plan.setNeedForward(true);
-                ByteBuffer byteBuffer = ByteBuffer.allocate(workloadSize + 4096);
-                plan.serialize(byteBuffer);
-                byteBuffer.flip();
-                request.setPlanBytes(byteBuffer);
-                long currRequsetNum = -1;
-                while (true) {
+      pool.submit(
+          () -> {
+            Client client = clientPool.getClient(target);
+            ExecutNonQueryReq request = new ExecutNonQueryReq();
+            DummyPlan plan = new DummyPlan();
+            plan.setWorkload(new byte[workloadSize]);
+            plan.setNeedForward(true);
+            ByteBuffer byteBuffer = ByteBuffer.allocate(workloadSize + 4096);
+            plan.serialize(byteBuffer);
+            byteBuffer.flip();
+            request.setPlanBytes(byteBuffer);
+            long currRequsetNum = -1;
+            while (true) {
 
-                  long reqLatency = System.nanoTime();
-                  try {
-                    client.executeNonQueryPlan(request);
-                    currRequsetNum = requestCounter.incrementAndGet();
-                    if (currRequsetNum > threadNum * 10) {
-                      reqLatency = System.nanoTime() - reqLatency;
-                      maxLatency = Math.max(maxLatency, reqLatency);
-                      latencySum.addAndGet(reqLatency);
-                    }
-                  } catch (TException e) {
-                    e.printStackTrace();
-                  }
+              long reqLatency = System.nanoTime();
+              try {
+                client.executeNonQueryPlan(request);
+                currRequsetNum = requestCounter.incrementAndGet();
+                if (currRequsetNum > threadNum * 10) {
+                  reqLatency = System.nanoTime() - reqLatency;
+                  maxLatency = Math.max(maxLatency, reqLatency);
+                  latencySum.addAndGet(reqLatency);
+                }
+              } catch (TException e) {
+                e.printStackTrace();
+              }
 
-                  if (currRequsetNum % 1000 == 0) {
-                    long elapsedTime = System.currentTimeMillis() - startTime;
-                    System.out.println(
-                        String.format(
-                            "%d %d %f(%f) %f %f",
-                            elapsedTime,
-                            currRequsetNum,
-                            (currRequsetNum + 0.0) / elapsedTime,
-                            currRequsetNum * workloadSize / (1024.0 * 1024.0) / elapsedTime,
-                            maxLatency / 1000.0,
-                            (latencySum.get() + 0.0) / currRequsetNum));
-                  }
+              if (currRequsetNum % 1000 == 0) {
+                long elapsedTime = System.currentTimeMillis() - startTime;
+                System.out.println(
+                    String.format(
+                        "%d %d %f(%f) %f %f",
+                        elapsedTime,
+                        currRequsetNum,
+                        (currRequsetNum + 0.0) / elapsedTime,
+                        currRequsetNum * workloadSize / (1024.0 * 1024.0) / elapsedTime,
+                        maxLatency / 1000.0,
+                        (latencySum.get() + 0.0) / currRequsetNum));
+              }
 
-                  if (currRequsetNum >= maxRequestNum) {
-                    break;
-                  }
-                }
-              })
-          .start();
+              if (currRequsetNum >= maxRequestNum) {
+                break;
+              }
+            }
+          });
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
index 4381746..6aac585 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.expr;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
 import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogDispatcher;
 import org.apache.iotdb.cluster.server.MetaClusterServer;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
@@ -79,6 +80,8 @@ public class ExprServer extends MetaClusterServer {
     boolean useSW = Boolean.parseBoolean(args[5]);
     boolean enableWeakAcceptance = Boolean.parseBoolean(args[6]);
     boolean enableCommitReturn = Boolean.parseBoolean(args[7]);
+    int maxBatchSize = Integer.parseInt(args[8]);
+    int defaultLogBufferSize = Integer.parseInt(args[9]);
 
     ClusterDescriptor.getInstance().getConfig().setSeedNodeUrls(Arrays.asList(allNodeStr));
     ClusterDescriptor.getInstance().getConfig().setInternalMetaPort(port);
@@ -89,10 +92,12 @@ public class ExprServer extends MetaClusterServer {
     RaftMember.USE_LOG_DISPATCHER = true;
     RaftMember.USE_INDIRECT_LOG_DISPATCHER = useIndirectDispatcher;
     LogDispatcher.bindingThreadNum = dispatcherThreadNum;
+    LogDispatcher.maxBatchSize = maxBatchSize;
     ExprMember.bypassRaft = bypassRaft;
     ExprMember.useSlidingWindow = useSW;
     ExprMember.ENABLE_WEAK_ACCEPTANCE = enableWeakAcceptance;
     ExprMember.ENABLE_COMMIT_RETURN = enableCommitReturn;
+    Log.DEFAULT_BUFFER_SIZE = defaultLogBufferSize * 1024 + 512;
 
     ExprServer server = new ExprServer();
     server.start();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
index 28eb19d..7e13336 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class VotingLogList {
+
   private static final Logger logger = LoggerFactory.getLogger(VotingLogList.class);
 
   private List<VotingLog> logList = new ArrayList<>();
@@ -75,6 +76,11 @@ public class VotingLogList {
           if (votingLog.getStronglyAcceptedNodeIds().size() >= quorumSize) {
             lastEntryIndexToCommit = i;
           }
+          if (votingLog.getStronglyAcceptedNodeIds().size()
+                  + votingLog.getWeaklyAcceptedNodeIds().size()
+              >= quorumSize) {
+            votingLog.acceptedTime = System.nanoTime();
+          }
         } else if (votingLog.getLog().getCurrLogIndex() > index) {
           break;
         }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index 65496c3..767736d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -87,9 +87,9 @@ public class IndirectLogDispatcher extends LogDispatcher {
       }
     }
 
-    for (Node node : directToIndirectFollowerMap.keySet()) {
-      nodeLogQueues.add(createQueueAndBindingThread(node));
-    }
+    //    for (Node node : directToIndirectFollowerMap.keySet()) {
+    //      nodeLogQueues.add(createQueueAndBindingThread(node));
+    //    }
   }
 
   class DispatcherThread extends LogDispatcher.DispatcherThread {
@@ -110,7 +110,7 @@ public class IndirectLogDispatcher extends LogDispatcher {
           logRequest.getAppendEntryRequest(),
           logRequest.getQuorumSize(),
           directToIndirectFollowerMap.get(receiver));
-      Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_END.calOperationCostTimeFromStart(
+      Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENT.calOperationCostTimeFromStart(
           logRequest.getVotingLog().getLog().getCreateTime());
     }
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index e70c326..4845bba 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -32,7 +32,8 @@ public abstract class Log implements Comparable<Log> {
   private static final Comparator<Log> COMPARATOR =
       Comparator.comparingLong(Log::getCurrLogIndex).thenComparing(Log::getCurrLogTerm);
 
-  protected static final int DEFAULT_BUFFER_SIZE = 4096;
+  // make this configurable or adaptive
+  public static int DEFAULT_BUFFER_SIZE = 16 * 1024;
   private long currLogIndex;
   private long currLogTerm;
 
@@ -51,6 +52,10 @@ public abstract class Log implements Comparable<Log> {
 
   public abstract void deserialize(ByteBuffer buffer);
 
+  public void serialize(ByteBuffer buffer) {
+    buffer.put(serialize());
+  }
+
   public enum Types {
     // DO CHECK LogParser when you add a new type of log
     ADD_NODE,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 6c3d2073..9558714 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -31,10 +31,12 @@ import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.CommonUtils;
 import org.apache.iotdb.db.utils.TestOnly;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -68,13 +70,14 @@ public class LogDispatcher {
   RaftMember member;
   private ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
   private boolean useBatchInLogCatchUp = clusterConfig.isUseBatchInLogCatchUp();
-  List<BlockingQueue<SendLogRequest>> nodeLogQueues = new ArrayList<>();
+  List<BlockingQueue<SendLogRequest>> nodesLogQueues = new ArrayList<>();
   ExecutorService executorService;
   private static ExecutorService serializationService =
       Executors.newFixedThreadPool(
-          Runtime.getRuntime().availableProcessors() * 2,
+          CommonUtils.getCpuCores() * 2,
           new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DispatcherEncoder-%d").build());
   public static int bindingThreadNum = 1;
+  public static int maxBatchSize = 1;
 
   public LogDispatcher(RaftMember member) {
     this.member = member;
@@ -85,7 +88,7 @@ public class LogDispatcher {
   void createQueueAndBindingThreads() {
     for (Node node : member.getAllNodes()) {
       if (!ClusterUtils.isNodeEquals(node, member.getThisNode())) {
-        nodeLogQueues.add(createQueueAndBindingThread(node));
+        nodesLogQueues.add(createQueueAndBindingThread(node));
       }
     }
   }
@@ -96,20 +99,22 @@ public class LogDispatcher {
     executorService.awaitTermination(10, TimeUnit.SECONDS);
   }
 
+  private ByteBuffer serializeTask(SendLogRequest request) {
+    ByteBuffer byteBuffer = request.getVotingLog().getLog().serialize();
+    request.getVotingLog().getLog().setByteSize(byteBuffer.capacity());
+    return byteBuffer;
+  }
+
   public void offer(SendLogRequest request) {
     // do serialization here to avoid taking LogManager for too long
-    if (!nodeLogQueues.isEmpty()) {
-      request.serializedLogFuture =
-          serializationService.submit(
-              () -> {
-                ByteBuffer byteBuffer = request.getVotingLog().getLog().serialize();
-                request.getVotingLog().getLog().setByteSize(byteBuffer.array().length);
-                return byteBuffer;
-              });
+    if (!nodesLogQueues.isEmpty()) {
+      request.serializedLogFuture = serializationService.submit(() -> serializeTask(request));
     }
 
-    for (int i = 0; i < nodeLogQueues.size(); i++) {
-      BlockingQueue<SendLogRequest> nodeLogQueue = nodeLogQueues.get(i);
+    long startTime = Statistic.LOG_DISPATCHER_LOG_ENQUEUE.getOperationStartTime();
+    request.getVotingLog().getLog().setEnqueueTime(System.nanoTime());
+    for (int i = 0; i < nodesLogQueues.size(); i++) {
+      BlockingQueue<SendLogRequest> nodeLogQueue = nodesLogQueues.get(i);
       try {
         boolean addSucceeded;
         if (ClusterDescriptor.getInstance().getConfig().isWaitForSlowNode()) {
@@ -135,10 +140,17 @@ public class LogDispatcher {
         Thread.currentThread().interrupt();
       }
     }
+    Statistic.LOG_DISPATCHER_LOG_ENQUEUE.calOperationCostTimeFromStart(startTime);
+
+    if (Timer.ENABLE_INSTRUMENTING) {
+      Statistic.LOG_DISPATCHER_FROM_CREATE_TO_ENQUEUE.calOperationCostTimeFromStart(
+          request.getVotingLog().getLog().getCreateTime());
+    }
   }
 
   BlockingQueue<SendLogRequest> createQueueAndBindingThread(Node node) {
-    BlockingQueue<SendLogRequest> logBlockingQueue =
+    BlockingQueue<SendLogRequest> logBlockingQueue;
+    logBlockingQueue =
         new ArrayBlockingQueue<>(
             ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
     for (int i = 0; i < bindingThreadNum; i++) {
@@ -234,6 +246,7 @@ public class LogDispatcher {
     private BlockingQueue<SendLogRequest> logBlockingDeque;
     private List<SendLogRequest> currBatch = new ArrayList<>();
     private Peer peer;
+    Client client;
 
     DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> logBlockingDeque) {
       this.receiver = receiver;
@@ -242,6 +255,7 @@ public class LogDispatcher {
           member
               .getPeerMap()
               .computeIfAbsent(receiver, r -> new Peer(member.getLogManager().getLastLogIndex()));
+      client = member.getSyncClient(receiver);
     }
 
     @Override
@@ -252,15 +266,20 @@ public class LogDispatcher {
           synchronized (logBlockingDeque) {
             SendLogRequest poll = logBlockingDeque.take();
             currBatch.add(poll);
-            if (useBatchInLogCatchUp) {
-              logBlockingDeque.drainTo(currBatch);
+            if (maxBatchSize > 1) {
+              logBlockingDeque.drainTo(currBatch, maxBatchSize - 1);
             }
           }
           if (logger.isDebugEnabled()) {
             logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
           }
+          Statistic.LOG_DISPATCHER_LOG_BATCH_SIZE.add(currBatch.size());
           for (SendLogRequest request : currBatch) {
+            Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
+                request.getVotingLog().getLog().getEnqueueTime());
+            long start = Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
             request.getAppendEntryRequest().entry = request.serializedLogFuture.get();
+            Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(start);
           }
           sendBatchLogs(currBatch);
           currBatch.clear();
@@ -364,8 +383,6 @@ public class LogDispatcher {
             break;
           }
           logSize -= curSize;
-          Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
-              currBatch.get(logIndex).getVotingLog().getLog().getCreateTime());
           logList.add(currBatch.get(logIndex).getAppendEntryRequest().entry);
         }
 
@@ -376,7 +393,7 @@ public class LogDispatcher {
           appendEntriesSync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
         }
         for (; prevIndex < logIndex; prevIndex++) {
-          Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_END.calOperationCostTimeFromStart(
+          Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENT.calOperationCostTimeFromStart(
               currBatch.get(prevIndex).getVotingLog().getLog().getCreateTime());
         }
       }
@@ -397,17 +414,27 @@ public class LogDispatcher {
     }
 
     void sendLog(SendLogRequest logRequest) {
-      Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
-          logRequest.getVotingLog().getLog().getCreateTime());
-      member.sendLogToFollower(
-          logRequest.getVotingLog(),
-          receiver,
-          logRequest.getLeaderShipStale(),
-          logRequest.getNewLeaderTerm(),
-          logRequest.getAppendEntryRequest(),
-          logRequest.getQuorumSize());
-      Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_END.calOperationCostTimeFromStart(
-          logRequest.getVotingLog().getLog().getCreateTime());
+      AppendNodeEntryHandler handler =
+          member.getAppendNodeEntryHandler(
+              logRequest.getVotingLog(),
+              receiver,
+              logRequest.leaderShipStale,
+              logRequest.newLeaderTerm,
+              peer,
+              logRequest.quorumSize);
+      try {
+        long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
+        AppendEntryResult result = client.appendEntry(logRequest.appendEntryRequest);
+        Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(operationStartTime);
+        handler.onComplete(result);
+      } catch (TException e) {
+        client.getInputProtocol().getTransport().close();
+        ClientUtils.putBackSyncClient(client);
+        client = member.getSyncClient(receiver);
+        handler.onError(e);
+      } catch (Exception e) {
+        handler.onError(e);
+      }
     }
 
     class AppendEntriesHandler implements AsyncMethodCallback<AppendEntryResult> {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
index ad24794..7ecdf5f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
@@ -22,11 +22,11 @@ package org.apache.iotdb.cluster.log.logtypes;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -48,7 +48,7 @@ public class PhysicalPlanLog extends Log {
 
   @Override
   public ByteBuffer serialize() {
-    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(DEFAULT_BUFFER_SIZE);
+    PublicBAOS byteArrayOutputStream = new PublicBAOS(DEFAULT_BUFFER_SIZE);
     try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
       dataOutputStream.writeByte((byte) PHYSICAL_PLAN.ordinal());
 
@@ -60,7 +60,15 @@ public class PhysicalPlanLog extends Log {
       // unreachable
     }
 
-    return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+    return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+  }
+
+  @Override
+  public void serialize(ByteBuffer buffer) {
+    buffer.put((byte) PHYSICAL_PLAN.ordinal());
+    buffer.putLong(getCurrLogIndex());
+    buffer.putLong(getCurrLogTerm());
+    plan.serialize(buffer);
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index eeea9d6..292dafe 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.LogParser;
 import org.apache.iotdb.cluster.log.Snapshot;
+import org.apache.iotdb.cluster.log.VotingLog;
 import org.apache.iotdb.cluster.log.applier.AsyncDataLogApplier;
 import org.apache.iotdb.cluster.log.applier.DataLogApplier;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
@@ -617,16 +618,18 @@ public class DataGroupMember extends RaftMember {
       return false;
     }
     CloseFileLog log = new CloseFileLog(storageGroupName, partitionId, isSeq);
+    VotingLog votingLog;
     synchronized (logManager) {
       log.setCurrLogTerm(getTerm().get());
       log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
 
       logManager.append(log);
-
+      votingLog = buildVotingLog(log);
+      votingLogList.insert(votingLog);
       logger.info("Send the close file request of {} to other nodes", log);
     }
     try {
-      return appendLogInGroup(log);
+      return appendLogInGroup(votingLog);
     } catch (LogExecutionException e) {
       logger.error("Cannot close partition {}#{} seq:{}", storageGroupName, partitionId, isSeq, e);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 9dafebe..ad0f55c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
 import org.apache.iotdb.cluster.exception.SnapshotInstallationException;
 import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
 import org.apache.iotdb.cluster.log.LogApplier;
+import org.apache.iotdb.cluster.log.VotingLog;
 import org.apache.iotdb.cluster.log.applier.MetaLogApplier;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
@@ -938,6 +939,7 @@ public class MetaGroupMember extends RaftMember {
     }
 
     AddNodeLog addNodeLog = new AddNodeLog();
+    VotingLog votingLog = buildVotingLog(addNodeLog);
     // node adding is serialized to reduce potential concurrency problem
     synchronized (logManager) {
       // update partition table
@@ -954,6 +956,7 @@ public class MetaGroupMember extends RaftMember {
       addNodeLog.setNewNode(newNode);
 
       logManager.append(addNodeLog);
+      votingLogList.insert(votingLog);
     }
 
     int retryTime = 0;
@@ -963,7 +966,7 @@ public class MetaGroupMember extends RaftMember {
           name,
           newNode,
           retryTime);
-      AppendLogResult result = sendLogToFollowers(addNodeLog);
+      AppendLogResult result = sendLogToFollowers(votingLog);
       switch (result) {
         case OK:
           commitLog(addNodeLog);
@@ -1693,6 +1696,7 @@ public class MetaGroupMember extends RaftMember {
     }
 
     RemoveNodeLog removeNodeLog = new RemoveNodeLog();
+    VotingLog votingLog = buildVotingLog(removeNodeLog);
     // node removal must be serialized to reduce potential concurrency problem
     synchronized (logManager) {
       // update partition table
@@ -1708,6 +1712,7 @@ public class MetaGroupMember extends RaftMember {
       removeNodeLog.setRemovedNode(target);
 
       logManager.append(removeNodeLog);
+      votingLogList.insert(votingLog);
     }
 
     int retryTime = 0;
@@ -1717,7 +1722,7 @@ public class MetaGroupMember extends RaftMember {
           name,
           target,
           retryTime);
-      AppendLogResult result = sendLogToFollowers(removeNodeLog);
+      AppendLogResult result = sendLogToFollowers(votingLog);
       switch (result) {
         case OK:
           commitLog(removeNodeLog);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 829e76b..81d4ff3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -1130,38 +1130,43 @@ public abstract class RaftMember {
     long startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG.getOperationStartTime();
 
     Log log;
+    if (plan instanceof LogPlan) {
+      try {
+        log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
+      } catch (UnknownLogTypeException e) {
+        logger.error("Can not parse LogPlan {}", plan, e);
+        return StatusUtils.PARSE_LOG_ERROR;
+      }
+    } else {
+      log = new PhysicalPlanLog();
+      ((PhysicalPlanLog) log).setPlan(plan);
+      plan.setIndex(logManager.getLastLogIndex() + 1);
+    }
+    // if a single log exceeds the threshold
+    // we need to return error code to the client as in server mode
+    //    if (log.serialize().capacity() + Integer.BYTES
+    //        >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+    //      logger.error(
+    //          "Log cannot fit into buffer, please increase raft_log_buffer_size;"
+    //              + "or reduce the size of requests you send.");
+    //      return StatusUtils.INTERNAL_ERROR;
+    //    }
+
     // assign term and index to the new log and append it
+    VotingLog votingLog;
     synchronized (logManager) {
-      if (plan instanceof LogPlan) {
-        try {
-          log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
-        } catch (UnknownLogTypeException e) {
-          logger.error("Can not parse LogPlan {}", plan, e);
-          return StatusUtils.PARSE_LOG_ERROR;
-        }
-      } else {
-        log = new PhysicalPlanLog();
-        ((PhysicalPlanLog) log).setPlan(plan);
-        plan.setIndex(logManager.getLastLogIndex() + 1);
-      }
       log.setCurrLogTerm(getTerm().get());
       log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
-
-      // if a single log exceeds the threshold
-      // we need to return error code to the client as in server mode
-      if (log.serialize().capacity() + Integer.BYTES
-          >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
-        logger.error(
-            "Log cannot fit into buffer, please increase raft_log_buffer_size;"
-                + "or reduce the size of requests you send.");
-        return StatusUtils.INTERNAL_ERROR;
-      }
       logManager.append(log);
+      votingLog = buildVotingLog(log);
+      votingLogList.insert(votingLog);
     }
+    log.setCreateTime(System.nanoTime());
+
     Timer.Statistic.RAFT_SENDER_APPEND_LOG.calOperationCostTimeFromStart(startTime);
 
     try {
-      if (appendLogInGroup(log)) {
+      if (appendLogInGroup(votingLog)) {
         return StatusUtils.OK;
       }
     } catch (LogExecutionException e) {
@@ -1171,6 +1176,7 @@ public abstract class RaftMember {
   }
 
   protected TSStatus processPlanLocallyV2(PhysicalPlan plan) {
+    long totalStartTime = System.nanoTime();
     logger.debug("{}: Processing plan {}", name, plan);
     if (readOnly) {
       return StatusUtils.NODE_READ_ONLY;
@@ -1190,13 +1196,13 @@ public abstract class RaftMember {
     }
 
     // just like processPlanLocally,we need to check the size of log
-    if (log.serialize().capacity() + Integer.BYTES
-        >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
-      logger.error(
-          "Log cannot fit into buffer, please increase raft_log_buffer_size;"
-              + "or reduce the size of requests you send.");
-      return StatusUtils.INTERNAL_ERROR;
-    }
+    //    if (log.serialize().capacity() + Integer.BYTES
+    //        >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+    //      logger.error(
+    //          "Log cannot fit into buffer, please increase raft_log_buffer_size;"
+    //              + "or reduce the size of requests you send.");
+    //      return StatusUtils.INTERNAL_ERROR;
+    //    }
 
     // assign term and index to the new log and append it
     SendLogRequest sendLogRequest = logSequencer.sequence(log);
@@ -1215,12 +1221,18 @@ public abstract class RaftMember {
         case WEAK_ACCEPT:
           // TODO: change to weak
           Statistic.RAFT_WEAK_ACCEPT.add(1);
+          Statistic.LOG_DISPATCHER_FROM_CREATE_TO_OK.calOperationCostTimeFromStart(
+              log.getCreateTime());
+          Statistic.LOG_DISPATCHER_TOTAL.calOperationCostTimeFromStart(totalStartTime);
           return StatusUtils.OK;
         case OK:
           logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
           startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
           commitLog(log);
           Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
+          Statistic.LOG_DISPATCHER_FROM_CREATE_TO_OK.calOperationCostTimeFromStart(
+              log.getCreateTime());
+          Statistic.LOG_DISPATCHER_TOTAL.calOperationCostTimeFromStart(totalStartTime);
           return StatusUtils.OK;
         case TIME_OUT:
           logger.debug("{}: log {} timed out...", name, log);
@@ -1610,11 +1622,12 @@ public abstract class RaftMember {
       VotingLog log, AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm, int quorumSize) {
     // wait for the followers to vote
     long startTime = Timer.Statistic.RAFT_SENDER_VOTE_COUNTER.getOperationStartTime();
-    long nextTimeToPrint = 3000;
+    long nextTimeToPrint = 15000;
 
     int stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
     int weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
     int totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
+
     synchronized (log) {
       long waitStart = System.currentTimeMillis();
       long alreadyWait = 0;
@@ -1644,7 +1657,7 @@ public abstract class RaftMember {
         totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
       }
 
-      if (alreadyWait > 3000) {
+      if (alreadyWait > 15000) {
         logger.info(
             "Slow entry {}, strongly accepted {}, weakly " + "accepted {}, waited time {}ms",
             log,
@@ -1683,9 +1696,9 @@ public abstract class RaftMember {
 
   @SuppressWarnings("java:S2445")
   protected void commitLog(Log log) throws LogExecutionException {
-    long startTime =
-        Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.getOperationStartTime();
+    long startTime;
     if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
+      startTime = Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.getOperationStartTime();
       synchronized (logManager) {
         if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
           Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
@@ -1693,9 +1706,12 @@ public abstract class RaftMember {
 
           startTime = Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
           logManager.commitTo(log.getCurrLogIndex());
+          Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(startTime);
+
+          startTime = Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.getOperationStartTime();
         }
       }
-      Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(startTime);
+      Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.calOperationCostTimeFromStart(startTime);
     }
     if (ENABLE_COMMIT_RETURN) {
       return;
@@ -1752,9 +1768,11 @@ public abstract class RaftMember {
     AppendEntryRequest request = new AppendEntryRequest();
     request.setTerm(term.get());
     if (serializeNow) {
+      long start = Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
       ByteBuffer byteBuffer = log.serialize();
       log.setByteSize(byteBuffer.array().length);
-      request.setEntry(byteBuffer);
+      request.entry = byteBuffer;
+      Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(start);
     }
     request.setLeader(getThisNode());
     // don't need lock because even if it's larger than the commitIndex when appending this log to
@@ -1840,12 +1858,13 @@ public abstract class RaftMember {
    *
    * @return true if the log is accepted by the quorum of the group, false otherwise
    */
-  boolean appendLogInGroup(Log log) throws LogExecutionException {
+  boolean appendLogInGroup(VotingLog log) throws LogExecutionException {
+    long totalStartTime = Statistic.LOG_DISPATCHER_TOTAL.getOperationStartTime();
     if (allNodes.size() == 1) {
       // single node group, no followers
       long startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
       logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
-      commitLog(log);
+      commitLog(log.getLog());
       Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
       return true;
     }
@@ -1861,11 +1880,19 @@ public abstract class RaftMember {
       AppendLogResult result = sendLogToFollowers(log);
       Timer.Statistic.RAFT_SENDER_SEND_LOG_TO_FOLLOWERS.calOperationCostTimeFromStart(startTime);
       switch (result) {
+        case WEAK_ACCEPT:
+          // TODO: change to weak
+          Statistic.RAFT_WEAK_ACCEPT.add(1);
+          Statistic.LOG_DISPATCHER_FROM_CREATE_TO_OK.calOperationCostTimeFromStart(
+              log.getLog().getCreateTime());
+          Statistic.LOG_DISPATCHER_TOTAL.calOperationCostTimeFromStart(totalStartTime);
+          return true;
         case OK:
           startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
           logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
-          commitLog(log);
+          commitLog(log.getLog());
           Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
+          Statistic.LOG_DISPATCHER_TOTAL.calOperationCostTimeFromStart(totalStartTime);
           return true;
         case TIME_OUT:
           logger.debug("{}: log {} timed out, retrying...", name, log);
@@ -1893,14 +1920,14 @@ public abstract class RaftMember {
    *
    * @return an AppendLogResult
    */
-  protected AppendLogResult sendLogToFollowers(Log log) {
+  protected AppendLogResult sendLogToFollowers(VotingLog log) {
     int requiredQuorum = allNodes.size() / 2;
     if (requiredQuorum <= 0) {
       // use half of the members' size as the quorum
-      return sendLogToFollowers(buildVotingLog(log), allNodes.size() / 2);
+      return sendLogToFollowers(log, allNodes.size() / 2);
     } else {
       // make sure quorum does not exceed the number of members - 1
-      return sendLogToFollowers(buildVotingLog(log), Math.min(requiredQuorum, allNodes.size() - 1));
+      return sendLogToFollowers(log, Math.min(requiredQuorum, allNodes.size() - 1));
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index c0afaa7..c794211 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -170,6 +170,12 @@ public class Timer {
         TIME_SCALE,
         RaftMember.USE_LOG_DISPATCHER,
         RAFT_SENDER_COMMIT_LOG),
+    RAFT_SENDER_EXIT_LOG_MANAGER(
+        RAFT_MEMBER_SENDER,
+        "exiting log manager synchronizer",
+        TIME_SCALE,
+        RaftMember.USE_LOG_DISPATCHER,
+        RAFT_SENDER_COMMIT_LOG),
     RAFT_SENDER_COMMIT_GET_LOGS(
         RAFT_MEMBER_SENDER,
         "get logs to be committed",
@@ -231,15 +237,41 @@ public class Timer {
         RAFT_MEMBER_RECEIVER, "append entrys", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
     RAFT_RECEIVER_INDEX_DIFF(RAFT_MEMBER_RECEIVER, "index diff", 1.0, true, ROOT),
     // log dispatcher
+    LOG_DISPATCHER_FROM_CREATE_TO_ENQUEUE(
+        LOG_DISPATCHER,
+        "from create to queue",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_LOG_ENQUEUE(
+        LOG_DISPATCHER,
+        "enqueue",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     LOG_DISPATCHER_LOG_IN_QUEUE(
         LOG_DISPATCHER,
         "in queue",
         TIME_SCALE,
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
-    LOG_DISPATCHER_FROM_CREATE_TO_END(
+    LOG_DISPATCHER_LOG_BATCH_SIZE(
+        LOG_DISPATCHER, "batch size", 1, true, META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_FROM_CREATE_TO_SENT(
+        LOG_DISPATCHER,
+        "from create to sent",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_FROM_CREATE_TO_OK(
+        LOG_DISPATCHER,
+        "from create to OK",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_TOTAL(
         LOG_DISPATCHER,
-        "from create to end",
+        "total process time",
         TIME_SCALE,
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 58e51e6..9c5c1ab 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -35,7 +35,7 @@ import org.apache.iotdb.cluster.exception.EmptyIntervalException;
 import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
 import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
-import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.VotingLog;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
@@ -230,7 +230,7 @@ public class MetaGroupMemberTest extends BaseMember {
           }
 
           @Override
-          protected AppendLogResult sendLogToFollowers(Log log) {
+          protected AppendLogResult sendLogToFollowers(VotingLog log) {
             return AppendLogResult.OK;
           }