You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/03/28 01:20:34 UTC

[iotdb] branch native_raft updated: improve lock utilization

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 411c2852c3 improve lock utilization
411c2852c3 is described below

commit 411c2852c3eb11bf40e8cb55ad89e1fa5eacf3b4
Author: Tian Jiang <jt...@163.com>
AuthorDate: Tue Mar 28 09:22:29 2023 +0800

    improve lock utilization
---
 .../iotdb/consensus/config/ConsensusConfig.java    |  2 +
 .../apache/iotdb/consensus/config/RPCConfig.java   | 60 +++++++++++++---
 .../iotdb/consensus/natraft/RaftConsensus.java     |  3 +
 .../natraft/client/AsyncRaftServiceClient.java     | 18 ++++-
 .../natraft/client/RaftConsensusClientPool.java    |  4 +-
 .../consensus/natraft/protocol/RaftConfig.java     | 27 ++++++++
 .../consensus/natraft/protocol/RaftMember.java     | 41 +++++------
 .../protocol/heartbeat/HeartbeatReqHandler.java    |  6 +-
 .../consensus/natraft/protocol/log/Entry.java      | 21 +++++-
 .../consensus/natraft/protocol/log/LogParser.java  |  8 ++-
 .../natraft/protocol/log/VotingEntry.java          | 21 ------
 .../protocol/log/appender/BlockingLogAppender.java | 70 +++++++------------
 .../natraft/protocol/log/appender/LogAppender.java |  3 +-
 .../log/appender/SlidingWindowLogAppender.java     | 78 +++++++++------------
 .../natraft/protocol/log/applier/BaseApplier.java  |  8 ++-
 .../protocol/log/catchup/LogCatchUpTask.java       | 12 ----
 .../log/dispatch/AppendNodeEntryHandler.java       | 43 +++++++-----
 .../protocol/log/dispatch/LogDispatcher.java       | 39 ++---------
 .../protocol/log/logtype/ConfigChangeEntry.java    |  4 +-
 .../natraft/protocol/log/logtype/EmptyEntry.java   |  4 +-
 .../natraft/protocol/log/logtype/RequestEntry.java |  4 +-
 .../protocol/log/manager/RaftLogManager.java       | 80 +++++++++++++++++-----
 .../serialization/SyncLogDequeSerializer.java      |  2 +-
 .../protocol/log/snapshot/DirectorySnapshot.java   |  8 ++-
 .../natraft/service/RaftRPCServiceProcessor.java   |  4 +-
 .../iotdb/consensus/natraft/utils/LogUtils.java    | 20 ++----
 .../iotdb/consensus/natraft/utils/Timer.java       | 36 ++++++++++
 .../db/mpp/plan/parser/StatementGenerator.java     | 13 ++--
 .../planner/plan/node/write/InsertTabletNode.java  |  4 +-
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   | 31 ++++++++-
 .../org/apache/iotdb/db/utils/SetThreadName.java   | 22 +++---
 .../java/org/apache/iotdb/session/Session.java     | 17 ++++-
 .../apache/iotdb/session/util/SessionUtils.java    | 23 +++++++
 thrift-raft/src/main/thrift/raft.thrift            | 27 +++-----
 thrift/src/main/thrift/client.thrift               |  2 +
 35 files changed, 469 insertions(+), 296 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
index fed43984fe..8740be6857 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
@@ -57,6 +57,8 @@ public class ConsensusConfig {
             .setRpcMaxConcurrentClientNum(
                 ioTConsensusConfig.getRpc().getRpcMaxConcurrentClientNum())
             .setThriftMaxFrameSize(ioTConsensusConfig.getRpc().getThriftMaxFrameSize())
+            .setCoreClientNumForEachNode(ioTConsensusConfig.getRpc().getCoreClientNumForEachNode())
+            .setMaxClientNumForEachNode(ioTConsensusConfig.getRpc().getMaxClientNumForEachNode())
             .build();
     this.properties = properties;
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/RPCConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/RPCConfig.java
index 495519cdfe..98c29bbf0a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/RPCConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/RPCConfig.java
@@ -4,17 +4,21 @@
 
 package org.apache.iotdb.consensus.config;
 
+import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
+
 import java.util.concurrent.TimeUnit;
 
 public class RPCConfig {
-  private final int rpcSelectorThreadNum;
-  private final int rpcMinConcurrentClientNum;
-  private final int rpcMaxConcurrentClientNum;
-  private final int thriftServerAwaitTimeForStopService;
-  private final boolean isRpcThriftCompressionEnabled;
-  private final int selectorNumOfClientManager;
-  private final int connectionTimeoutInMs;
-  private final int thriftMaxFrameSize;
+  private int rpcSelectorThreadNum;
+  private int rpcMinConcurrentClientNum;
+  private int rpcMaxConcurrentClientNum;
+  private int thriftServerAwaitTimeForStopService;
+  private boolean isRpcThriftCompressionEnabled;
+  private int selectorNumOfClientManager;
+  private int connectionTimeoutInMs;
+  private int thriftMaxFrameSize;
+  private int coreClientNumForEachNode;
+  private int maxClientNumForEachNode;
 
   private RPCConfig(
       int rpcSelectorThreadNum,
@@ -24,7 +28,9 @@ public class RPCConfig {
       boolean isRpcThriftCompressionEnabled,
       int selectorNumOfClientManager,
       int connectionTimeoutInMs,
-      int thriftMaxFrameSize) {
+      int thriftMaxFrameSize,
+      int coreClientNumForEachNode,
+      int maxClientNumForEachNode) {
     this.rpcSelectorThreadNum = rpcSelectorThreadNum;
     this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
     this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum;
@@ -33,6 +39,8 @@ public class RPCConfig {
     this.selectorNumOfClientManager = selectorNumOfClientManager;
     this.connectionTimeoutInMs = connectionTimeoutInMs;
     this.thriftMaxFrameSize = thriftMaxFrameSize;
+    this.coreClientNumForEachNode = coreClientNumForEachNode;
+    this.maxClientNumForEachNode = maxClientNumForEachNode;
   }
 
   public int getRpcSelectorThreadNum() {
@@ -81,6 +89,10 @@ public class RPCConfig {
     private int connectionTimeoutInMs = (int) TimeUnit.SECONDS.toMillis(20);
     private int thriftMaxFrameSize = 536870912;
 
+    private int coreClientNumForEachNode = DefaultProperty.CORE_CLIENT_NUM_FOR_EACH_NODE;
+
+    private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
+
     public RPCConfig.Builder setRpcSelectorThreadNum(int rpcSelectorThreadNum) {
       this.rpcSelectorThreadNum = rpcSelectorThreadNum;
       return this;
@@ -122,6 +134,16 @@ public class RPCConfig {
       return this;
     }
 
+    public RPCConfig.Builder setCoreClientNumForEachNode(int coreClientNumForEachNode) {
+      this.coreClientNumForEachNode = coreClientNumForEachNode;
+      return this;
+    }
+
+    public RPCConfig.Builder setMaxClientNumForEachNode(int maxClientNumForEachNode) {
+      this.maxClientNumForEachNode = maxClientNumForEachNode;
+      return this;
+    }
+
     public RPCConfig build() {
       return new RPCConfig(
           rpcSelectorThreadNum,
@@ -131,7 +153,25 @@ public class RPCConfig {
           isRpcThriftCompressionEnabled,
           selectorNumOfClientManager,
           connectionTimeoutInMs,
-          thriftMaxFrameSize);
+          thriftMaxFrameSize,
+          coreClientNumForEachNode,
+          maxClientNumForEachNode);
     }
   }
+
+  public int getCoreClientNumForEachNode() {
+    return coreClientNumForEachNode;
+  }
+
+  public void setCoreClientNumForEachNode(int coreClientNumForEachNode) {
+    this.coreClientNumForEachNode = coreClientNumForEachNode;
+  }
+
+  public int getMaxClientNumForEachNode() {
+    return maxClientNumForEachNode;
+  }
+
+  public void setMaxClientNumForEachNode(int maxClientNumForEachNode) {
+    this.maxClientNumForEachNode = maxClientNumForEachNode;
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
index a07afb1e97..ac1f88d2a1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
@@ -156,6 +156,9 @@ public class RaftConsensus implements IConsensus {
 
   @Override
   public ConsensusWriteResponse write(ConsensusGroupId groupId, IConsensusRequest request) {
+    if (config.isOnlyTestNetwork()) {
+      return ConsensusWriteResponse.newBuilder().setStatus(StatusUtils.OK).build();
+    }
     RaftMember impl = stateMachineMap.get(groupId);
     if (impl == null) {
       return ConsensusWriteResponse.newBuilder()
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/AsyncRaftServiceClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/AsyncRaftServiceClient.java
index 43f70d2b04..3533c90585 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/AsyncRaftServiceClient.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/AsyncRaftServiceClient.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.ConnectException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class AsyncRaftServiceClient extends RaftService.AsyncClient {
 
@@ -44,6 +45,8 @@ public class AsyncRaftServiceClient extends RaftService.AsyncClient {
 
   private final TEndPoint endpoint;
   private final ClientManager<TEndPoint, AsyncRaftServiceClient> clientManager;
+  private static final AtomicInteger createCnt = new AtomicInteger();
+  private static final AtomicInteger closeCnt = new AtomicInteger();
 
   public AsyncRaftServiceClient(
       TProtocolFactory protocolFactory,
@@ -58,11 +61,17 @@ public class AsyncRaftServiceClient extends RaftService.AsyncClient {
         TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), connectionTimeout));
     this.endpoint = endpoint;
     this.clientManager = clientManager;
+    if (createCnt.incrementAndGet() % 1000 == 0) {
+      logger.info("Created {} clients", createCnt.get(), new Exception());
+    }
   }
 
   public void close() {
     ___transport.close();
     ___currentMethod = null;
+    if (closeCnt.incrementAndGet() % 1000 == 0) {
+      logger.info("Closed {} clients", closeCnt.get(), new Exception());
+    }
   }
 
   /**
@@ -110,6 +119,13 @@ public class AsyncRaftServiceClient extends RaftService.AsyncClient {
     } catch (Exception e) {
       if (!(e.getCause() instanceof ConnectException)) {
         logger.info("Unexpected exception occurs in {} :", this, e);
+      } else {
+        logger.debug("Cannot connect: {}", e.getMessage());
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException ex) {
+          // ignore
+        }
       }
       return false;
     }
@@ -117,7 +133,7 @@ public class AsyncRaftServiceClient extends RaftService.AsyncClient {
 
   @Override
   public String toString() {
-    return String.format("AsyncConfigNodeIServiceClient{%s}", endpoint);
+    return String.format("AsyncRaftServiceClient{%s}", endpoint);
   }
 
   public static class Factory extends AsyncThriftClientFactory<TEndPoint, AsyncRaftServiceClient> {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/RaftConsensusClientPool.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/RaftConsensusClientPool.java
index d393a5c8b9..87db31f4c0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/RaftConsensusClientPool.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/RaftConsensusClientPool.java
@@ -58,8 +58,8 @@ public class RaftConsensusClientPool {
                   .build(),
               RAFT_CONSENSUS_CLIENT_POOL_THREAD_NAME),
           new ClientPoolProperty.Builder<AsyncRaftServiceClient>()
-              .setMaxClientNumForEachNode(config.getMaxClientPerNode())
-              .setCoreClientNumForEachNode(config.getMaxIdleClientPerNode())
+              .setMaxClientNumForEachNode(config.getRpcConfig().getMaxClientNumForEachNode())
+              .setCoreClientNumForEachNode(config.getRpcConfig().getCoreClientNumForEachNode())
               .build()
               .getConfig());
     }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
index 91ca23a15f..3d2991da3e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
@@ -69,6 +69,8 @@ public class RaftConfig {
   private long maxSyncLogLag = 100_000;
   private int syncLeaderMaxWaitMs = 30_000;
   private boolean enableCompressedDispatching = true;
+  private boolean ignoreStateMachine = false;
+  private boolean onlyTestNetwork = false;
   private CompressionType dispatchingCompressionType = CompressionType.SNAPPY;
   private ConsistencyLevel consistencyLevel = ConsistencyLevel.STRONG_CONSISTENCY;
   private RPCConfig rpcConfig;
@@ -374,6 +376,14 @@ public class RaftConfig {
     this.enableCompressedDispatching = enableCompressedDispatching;
   }
 
+  public boolean isOnlyTestNetwork() {
+    return onlyTestNetwork;
+  }
+
+  public void setOnlyTestNetwork(boolean onlyTestNetwork) {
+    this.onlyTestNetwork = onlyTestNetwork;
+  }
+
   public CompressionType getDispatchingCompressionType() {
     return dispatchingCompressionType;
   }
@@ -382,6 +392,14 @@ public class RaftConfig {
     this.dispatchingCompressionType = dispatchingCompressionType;
   }
 
+  public boolean isIgnoreStateMachine() {
+    return ignoreStateMachine;
+  }
+
+  public void setIgnoreStateMachine(boolean ignoreStateMachine) {
+    this.ignoreStateMachine = ignoreStateMachine;
+  }
+
   public void loadProperties(Properties properties) {
     logger.debug("Loading properties: {}", properties);
 
@@ -565,6 +583,15 @@ public class RaftConfig {
             properties.getProperty(
                 "default_boolean_encoding", this.getDispatchingCompressionType().toString())));
 
+    this.setIgnoreStateMachine(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "ignore_state_machine", String.valueOf(this.isIgnoreStateMachine()))));
+
+    this.setOnlyTestNetwork(
+        Boolean.parseBoolean(
+            properties.getProperty("only_test_network", String.valueOf(this.isOnlyTestNetwork()))));
+
     String consistencyLevel = properties.getProperty("consistency_level");
     if (consistencyLevel != null) {
       this.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 4aef915e65..5271e793aa 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -175,6 +175,8 @@ public class RaftMember {
   /** a thread pool that is used to do commit log tasks asynchronous in heartbeat thread */
   private ExecutorService commitLogPool;
 
+  private long lastCommitTaskTime;
+
   /**
    * logDispatcher buff the logs orderly according to their log indexes and send them sequentially,
    * which avoids the followers receiving out-of-order logs, forcing them to wait for previous logs.
@@ -448,7 +450,9 @@ public class RaftMember {
   }
 
   public void tryUpdateCommitIndex(long leaderTerm, long commitIndex, long commitTerm) {
-    if (leaderTerm >= status.term.get() && logManager.getCommitLogIndex() < commitIndex) {
+    if (leaderTerm >= status.term.get()
+        && logManager.getCommitLogIndex() < commitIndex
+        && System.currentTimeMillis() - lastCommitTaskTime > 100) {
       // there are more local logs that can be committed, commit them in a ThreadPool so the
       // heartbeat response will not be blocked
       CommitLogTask commitLogTask = new CommitLogTask(logManager, commitIndex, commitTerm);
@@ -457,18 +461,8 @@ public class RaftMember {
       // node catch up
       if (commitLogPool != null && !commitLogPool.isShutdown()) {
         commitLogPool.submit(commitLogTask);
+        lastCommitTaskTime = System.currentTimeMillis();
       }
-
-      logger.debug(
-          "{}: Inconsistent log found, leaderCommit: {}-{}, localCommit: {}-{}, "
-              + "localLast: {}-{}",
-          name,
-          commitIndex,
-          commitTerm,
-          logManager.getCommitLogIndex(),
-          logManager.getCommitLogTerm(),
-          logManager.getLastLogIndex(),
-          logManager.getLastLogTerm());
     }
   }
 
@@ -518,11 +512,9 @@ public class RaftMember {
     }
 
     AppendEntryResult response;
-    List<Entry> entries = LogUtils.parseEntries(request.entries);
+    List<Entry> entries = LogUtils.parseEntries(request.entries, stateMachine);
 
-    response =
-        logAppender.appendEntries(
-            request.prevLogIndex, request.prevLogTerm, request.leaderCommit, request.term, entries);
+    response = logAppender.appendEntries(request.leaderCommit, request.term, entries);
 
     if (logger.isDebugEnabled()) {
       logger.debug(
@@ -590,6 +582,10 @@ public class RaftMember {
     return allNodes;
   }
 
+  public RaftConfig getConfig() {
+    return config;
+  }
+
   protected enum AppendLogResult {
     OK,
     TIME_OUT,
@@ -609,7 +605,7 @@ public class RaftMember {
     if (!isLeader()) {
       Peer leader = getLeader();
       if (leader == null) {
-        return StatusUtils.NO_LEADER;
+        return StatusUtils.NO_LEADER.deepCopy().setMessage("No leader in: " + groupId);
       } else if (request != null) {
         return forwardRequest(request, leader.getEndpoint(), leader.getGroupId());
       } else {
@@ -655,7 +651,11 @@ public class RaftMember {
       return StatusUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT);
     }
 
-    return waitForEntryResult(votingEntry);
+    TSStatus tsStatus1 = waitForEntryResult(votingEntry);
+    entry.waitEndTime = System.nanoTime();
+    Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_APPLY_END.add(
+        entry.waitEndTime - entry.createTime);
+    return tsStatus1;
   }
 
   protected void waitApply(Entry entry) throws LogExecutionException {
@@ -672,9 +672,6 @@ public class RaftMember {
         }
       }
     }
-    entry.waitEndTime = System.nanoTime();
-    Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_APPLY_END.add(
-        entry.waitEndTime - entry.createTime);
     if (entry.getException() != null) {
       throw new LogExecutionException(entry.getException());
     }
@@ -974,7 +971,7 @@ public class RaftMember {
   public TSStatus forwardRequest(IConsensusRequest plan, TEndPoint node, ConsensusGroupId groupId) {
     if (node == null || node.equals(thisNode.getEndpoint())) {
       logger.debug("{}: plan {} has no where to be forwarded", name, plan);
-      return StatusUtils.NO_LEADER;
+      return StatusUtils.NO_LEADER.deepCopy().setMessage("No leader to forward in: " + groupId);
     }
     logger.debug("{}: Forward {} to node {}", name, plan, node);
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatReqHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatReqHandler.java
index 615a34224b..f7c30e60aa 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatReqHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatReqHandler.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId.Factory;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.consensus.natraft.protocol.RaftRole;
+import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
 import org.apache.iotdb.consensus.natraft.utils.Response;
 import org.apache.iotdb.consensus.raft.thrift.HeartBeatRequest;
 import org.apache.iotdb.consensus.raft.thrift.HeartBeatResponse;
@@ -92,8 +93,9 @@ public class HeartbeatReqHandler {
 
       response.setTerm(Response.RESPONSE_AGREE);
       // tell the leader the local log progress, so it may decide whether to perform a catch-up
-      response.setLastLogIndex(member.getLogManager().getLastLogIndex());
-      response.setLastLogTerm(member.getLogManager().getLastLogTerm());
+      Entry lastEntry = member.getLogManager().getLastEntryUnsafe();
+      response.setLastLogIndex(lastEntry.getCurrLogIndex());
+      response.setLastLogTerm(lastEntry.getCurrLogTerm());
       response.setCommitIndex(member.getLogManager().getCommitLogIndex());
 
       // if the snapshot apply lock is held, it means that a snapshot is installing now.
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
index fec0bc7035..208678c6b2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
@@ -55,12 +55,23 @@ public abstract class Entry implements Comparable<Entry> {
   public long committedTime;
   public long applyTime;
   public long waitEndTime;
+  private ByteBuffer serializationCache;
 
   public int getDefaultSerializationBufferSize() {
     return DEFAULT_SERIALIZATION_BUFFER_SIZE;
   }
 
-  public abstract ByteBuffer serialize();
+  protected abstract ByteBuffer serializeInternal();
+
+  public ByteBuffer serialize() {
+    ByteBuffer cache = serializationCache;
+    if (cache != null) {
+      return cache.slice();
+    }
+    ByteBuffer byteBuffer = serializeInternal();
+    serializationCache = byteBuffer;
+    return byteBuffer.slice();
+  };
 
   public abstract void deserialize(ByteBuffer buffer);
 
@@ -167,4 +178,12 @@ public abstract class Entry implements Comparable<Entry> {
   public void setFromThisNode(boolean fromThisNode) {
     this.fromThisNode = fromThisNode;
   }
+
+  public ByteBuffer getSerializationCache() {
+    return serializationCache;
+  }
+
+  public void setSerializationCache(ByteBuffer serializationCache) {
+    this.serializationCache = serializationCache;
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/LogParser.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/LogParser.java
index 84ffc3b59e..3f49a6576f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/LogParser.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/LogParser.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log;
 
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.natraft.exception.UnknownLogTypeException;
 import org.apache.iotdb.consensus.natraft.protocol.log.Entry.Types;
 import org.apache.iotdb.consensus.natraft.protocol.log.logtype.EmptyEntry;
@@ -43,7 +44,7 @@ public class LogParser {
     return INSTANCE;
   }
 
-  public Entry parse(ByteBuffer buffer) throws UnknownLogTypeException {
+  public Entry parse(ByteBuffer buffer, IStateMachine stateMachine) throws UnknownLogTypeException {
     if (logger.isDebugEnabled()) {
       logger.debug("Received a log buffer, pos:{}, limit:{}", buffer.position(), buffer.limit());
     }
@@ -55,11 +56,15 @@ public class LogParser {
       throw new UnknownLogTypeException(typeInt);
     }
     logger.debug("The log type is {}", type);
+    int startPos = buffer.position();
     Entry log;
     switch (type) {
       case CLIENT_REQUEST:
         RequestEntry requestLog = new RequestEntry();
         requestLog.deserialize(buffer);
+        if (stateMachine != null) {
+          requestLog.setRequest(stateMachine.deserializeRequest(requestLog.getRequest()));
+        }
         log = requestLog;
         break;
       case EMPTY:
@@ -70,6 +75,7 @@ public class LogParser {
       default:
         throw new IllegalArgumentException(type.toString());
     }
+    log.setByteSize(buffer.position() - startPos);
     logger.debug("Parsed a log {}", log);
     return log;
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java
index 0d6cf613bf..b654a3a872 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java
@@ -23,22 +23,18 @@ import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryRequest;
 
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Future;
 
 public class VotingEntry {
 
   protected Entry entry;
   // for NB-Raft
   protected Set<Peer> weaklyAcceptedNodes;
-  private boolean hasFailed;
   private AppendEntryRequest appendEntryRequest;
-  private Future<ByteBuffer> serializedLogFuture;
   protected List<Peer> currNodes;
   protected List<Peer> newNodes;
   private boolean isStronglyAccepted;
@@ -59,15 +55,6 @@ public class VotingEntry {
     this.newNodes = newNodes;
   }
 
-  public VotingEntry(VotingEntry another) {
-    this.entry = another.entry;
-    this.weaklyAcceptedNodes = another.weaklyAcceptedNodes;
-    this.setAppendEntryRequest(another.appendEntryRequest);
-    this.setSerializedLogFuture(another.getSerializedLogFuture());
-    this.currNodes = another.currNodes;
-    this.newNodes = another.newNodes;
-  }
-
   public Entry getEntry() {
     return entry;
   }
@@ -97,14 +84,6 @@ public class VotingEntry {
     this.appendEntryRequest = appendEntryRequest;
   }
 
-  public Future<ByteBuffer> getSerializedLogFuture() {
-    return serializedLogFuture;
-  }
-
-  public void setSerializedLogFuture(Future<ByteBuffer> serializedLogFuture) {
-    this.serializedLogFuture = serializedLogFuture;
-  }
-
   public int currNodesQuorumNum() {
     return currNodes.size() / 2 + 1;
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
index 2496ce316b..c6333b7e75 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
@@ -82,15 +82,12 @@ public class BlockingLogAppender implements LogAppender {
     return alreadyWait <= config.getWriteOperationTimeoutMS();
   }
 
-  protected long checkPrevLogIndex(long prevLogIndex) {
+  protected boolean checkPrevLogIndex(long prevLogIndex) {
     long lastLogIndex = logManager.getLastLogIndex();
-    if (lastLogIndex < prevLogIndex && !waitForPrevLog(prevLogIndex)) {
-      // there are logs missing between the incoming log and the local last log, and such logs
-      // did not come within a timeout, report a mismatch to the sender and it shall fix this
-      // through catch-up
-      return Response.RESPONSE_LOG_MISMATCH;
-    }
-    return Response.RESPONSE_AGREE;
+    // return false only when logs between the local last log and the incoming log are missing
+    // and do not arrive within the timeout; the caller then reports a mismatch to the sender,
+    // which shall fix this through catch-up
+    return lastLogIndex >= prevLogIndex || waitForPrevLog(prevLogIndex);
   }
 
   /**
@@ -101,25 +98,19 @@ public class BlockingLogAppender implements LogAppender {
    * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
    *     .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
    */
-  public AppendEntryResult appendEntries(
-      long prevLogIndex, long prevLogTerm, long leaderCommit, long term, List<Entry> logs) {
-    logger.debug(
-        "{}, prevLogIndex={}, prevLogTerm={}, leaderCommit={}",
-        member.getName(),
-        prevLogIndex,
-        prevLogTerm,
-        leaderCommit);
+  public AppendEntryResult appendEntries(long leaderCommit, long term, List<Entry> logs) {
+    logger.debug("{}, entries={}, leaderCommit={}", member.getName(), logs, leaderCommit);
     if (logs.isEmpty()) {
       return new AppendEntryResult(Response.RESPONSE_AGREE)
           .setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
     }
 
     long startTime = Statistic.RAFT_RECEIVER_WAIT_FOR_PREV_LOG.getOperationStartTime();
-    long resp = checkPrevLogIndex(prevLogIndex);
+    boolean resp = checkPrevLogIndex(logs.get(0).getCurrLogIndex() - 1);
     Statistic.RAFT_RECEIVER_WAIT_FOR_PREV_LOG.calOperationCostTimeFromStart(startTime);
 
-    if (resp != Response.RESPONSE_AGREE) {
-      return new AppendEntryResult(resp)
+    if (!resp) {
+      return new AppendEntryResult(Response.RESPONSE_LOG_MISMATCH)
           .setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
     }
 
@@ -136,12 +127,11 @@ public class BlockingLogAppender implements LogAppender {
     while (true) {
       if ((logManager.getCommitLogIndex() - logManager.getAppliedIndex())
           <= config.getUnAppliedRaftLogNumForRejectThreshold()) {
-        resp =
-            lastConfigEntry == null
-                ? appendWithoutConfigChange(
-                    prevLogIndex, prevLogTerm, leaderCommit, term, logs, result)
-                : appendWithConfigChange(
-                    prevLogIndex, prevLogTerm, leaderCommit, term, logs, result, lastConfigEntry);
+        if (lastConfigEntry == null) {
+          appendWithoutConfigChange(leaderCommit, term, logs, result);
+        } else {
+          appendWithConfigChange(leaderCommit, term, logs, result, lastConfigEntry);
+        }
         break;
       }
 
@@ -161,20 +151,18 @@ public class BlockingLogAppender implements LogAppender {
     return result;
   }
 
-  protected long appendWithConfigChange(
-      long prevLogIndex,
-      long prevLogTerm,
+  protected boolean appendWithConfigChange(
       long leaderCommit,
       long term,
       List<Entry> logs,
       AppendEntryResult result,
       ConfigChangeEntry configChangeEntry) {
-    long resp;
+    boolean resp;
     try {
       logManager.getLock().writeLock().lock();
-      resp = logManager.maybeAppend(prevLogIndex, prevLogTerm, logs);
+      resp = logManager.maybeAppend(logs);
 
-      if (resp != -1) {
+      if (resp) {
         if (logger.isDebugEnabled()) {
           logger.debug(
               "{} append a new log list {}, commit to {}", member.getName(), logs, leaderCommit);
@@ -182,9 +170,8 @@ public class BlockingLogAppender implements LogAppender {
 
         result.status = Response.RESPONSE_STRONG_ACCEPT;
         result.setLastLogIndex(logManager.getLastLogIndex());
-        result.setLastLogTerm(logManager.getLastLogTerm());
         member.setNewNodes(configChangeEntry.getNewPeers());
-        member.tryUpdateCommitIndex(term, leaderCommit, logManager.getTerm(leaderCommit));
+        member.tryUpdateCommitIndex(term, leaderCommit, -1);
       } else {
         // the incoming log points to an illegal position, reject it
         result.status = Response.RESPONSE_LOG_MISMATCH;
@@ -195,24 +182,17 @@ public class BlockingLogAppender implements LogAppender {
     return resp;
   }
 
-  protected long appendWithoutConfigChange(
-      long prevLogIndex,
-      long prevLogTerm,
-      long leaderCommit,
-      long term,
-      List<Entry> logs,
-      AppendEntryResult result) {
-    long resp = logManager.maybeAppend(prevLogIndex, prevLogTerm, logs);
-    if (resp != -1) {
+  protected boolean appendWithoutConfigChange(
+      long leaderCommit, long term, List<Entry> logs, AppendEntryResult result) {
+    boolean resp = logManager.maybeAppend(logs);
+    if (resp) {
       if (logger.isDebugEnabled()) {
         logger.debug(
             "{} append a new log list {}, commit to {}", member.getName(), logs, leaderCommit);
       }
       result.status = Response.RESPONSE_STRONG_ACCEPT;
       result.setLastLogIndex(logManager.getLastLogIndex());
-      result.setLastLogTerm(logManager.getLastLogTerm());
-      member.tryUpdateCommitIndex(term, leaderCommit, logManager.getTerm(leaderCommit));
-
+      member.tryUpdateCommitIndex(term, leaderCommit, -1);
     } else {
       // the incoming log points to an illegal position, reject it
       result.status = Response.RESPONSE_LOG_MISMATCH;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/LogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/LogAppender.java
index 1d0bb8f56d..cccd152448 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/LogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/LogAppender.java
@@ -30,8 +30,7 @@ import java.util.List;
  */
 public interface LogAppender {
 
-  AppendEntryResult appendEntries(
-      long prevLogIndex, long prevLogTerm, long leaderCommit, long term, List<Entry> entries);
+  AppendEntryResult appendEntries(long leaderCommit, long term, List<Entry> entries);
 
   void reset();
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
index 6157ab6ea1..39b58cd424 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
 import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
 import org.apache.iotdb.consensus.natraft.utils.Response;
+import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
 
 import org.slf4j.Logger;
@@ -33,6 +34,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class SlidingWindowLogAppender implements LogAppender {
 
@@ -42,18 +44,16 @@ public class SlidingWindowLogAppender implements LogAppender {
   private int windowLength = 0;
   private Entry[] logWindow;
   private long firstPosPrevIndex;
-  private long[] prevTerms;
-
   private RaftMember member;
   private RaftLogManager logManager;
   private RaftConfig config;
+  private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
   public SlidingWindowLogAppender(RaftMember member, RaftConfig config) {
     this.member = member;
     this.logManager = member.getLogManager();
     windowCapacity = config.getMaxNumOfLogsInMem();
     logWindow = new Entry[windowCapacity];
-    prevTerms = new long[windowCapacity];
     this.config = config;
     reset();
   }
@@ -69,8 +69,8 @@ public class SlidingWindowLogAppender implements LogAppender {
 
   private void checkLogPrev(int pos) {
     // check the previous entry
-    long prevLogTerm = prevTerms[pos];
-    if (pos > 0) {
+    long prevLogTerm = logWindow[pos].getPrevTerm();
+    if (pos > 0 && logWindow[pos - 1] != null) {
       Entry prev = logWindow[pos - 1];
       if (prev != null && prev.getCurrLogTerm() != prevLogTerm) {
         logWindow[pos - 1] = null;
@@ -82,8 +82,8 @@ public class SlidingWindowLogAppender implements LogAppender {
     // check the next entry
     Entry entry = logWindow[pos];
     boolean nextMismatch = false;
-    if (pos < windowCapacity - 1) {
-      long nextPrevTerm = prevTerms[pos + 1];
+    if (pos < windowCapacity - 1 && logWindow[pos + 1] != null) { // check bounds before reading
+      long nextPrevTerm = logWindow[pos + 1].getPrevTerm();
       if (nextPrevTerm != entry.getCurrLogTerm()) {
         nextMismatch = true;
       }
@@ -106,9 +106,7 @@ public class SlidingWindowLogAppender implements LogAppender {
    * Flush window range [0, flushPos) into the LogManager, where flushPos is the first null position
    * in the window.
    */
-  private long flushWindow(AppendEntryResult result, long leaderCommit) {
-    long windowPrevLogIndex = firstPosPrevIndex;
-    long windowPrevLogTerm = prevTerms[0];
+  private boolean flushWindow(AppendEntryResult result) {
 
     int flushPos = 0;
     for (; flushPos < windowCapacity; flushPos++) {
@@ -126,14 +124,12 @@ public class SlidingWindowLogAppender implements LogAppender {
         logs.get(logs.size() - 1));
 
     long startWaitingTime = System.currentTimeMillis();
-    long success;
+    boolean success;
     while (true) {
       // TODO: Consider memory footprint to execute a precise rejection
       if ((logManager.getCommitLogIndex() - logManager.getAppliedIndex())
           <= config.getUnAppliedRaftLogNumForRejectThreshold()) {
-        success = logManager.maybeAppend(windowPrevLogIndex, windowPrevLogTerm, logs);
-        member.tryUpdateCommitIndex(
-            member.getStatus().getTerm().get(), leaderCommit, logManager.getTerm(leaderCommit));
+        success = logManager.maybeAppend(logs);
         break;
       }
       try {
@@ -141,24 +137,25 @@ public class SlidingWindowLogAppender implements LogAppender {
         if (System.currentTimeMillis() - startWaitingTime
             > config.getMaxWaitingTimeWhenInsertBlocked()) {
           result.status = Response.RESPONSE_TOO_BUSY;
-          return -1;
+          return false;
         }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
     }
-    if (success != -1) {
+    if (success) {
       moveWindowRightward(flushPos, logs.get(logs.size() - 1).getCurrLogIndex());
+      result.status = Response.RESPONSE_STRONG_ACCEPT;
+      return true;
+    } else {
+      result.status = Response.RESPONSE_LOG_MISMATCH;
+      logger.warn("Cannot flush the window to log");
+      return false;
     }
-    result.status = Response.RESPONSE_STRONG_ACCEPT;
-    result.setLastLogIndex(firstPosPrevIndex);
-    result.setLastLogTerm(logManager.getLastLogTerm());
-    return success;
   }
 
   private void moveWindowRightward(int step, long newIndex) {
     System.arraycopy(logWindow, step, logWindow, 0, windowCapacity - step);
-    System.arraycopy(prevTerms, step, prevTerms, 0, windowCapacity - step);
     for (int i = 1; i <= step; i++) {
       logWindow[windowCapacity - i] = null;
     }
@@ -168,7 +165,6 @@ public class SlidingWindowLogAppender implements LogAppender {
   private void moveWindowLeftward(int step) {
     int length = Math.max(windowCapacity - step, 0);
     System.arraycopy(logWindow, 0, logWindow, step, length);
-    System.arraycopy(prevTerms, 0, prevTerms, step, length);
     for (int i = 0; i < length; i++) {
       logWindow[i] = null;
     }
@@ -176,8 +172,7 @@ public class SlidingWindowLogAppender implements LogAppender {
   }
 
   @Override
-  public AppendEntryResult appendEntries(
-      long prevLogIndex, long prevLogTerm, long leaderCommit, long term, List<Entry> entries) {
+  public AppendEntryResult appendEntries(long leaderCommit, long term, List<Entry> entries) {
     if (entries.isEmpty()) {
       return new AppendEntryResult(Response.RESPONSE_AGREE)
           .setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
@@ -185,51 +180,44 @@ public class SlidingWindowLogAppender implements LogAppender {
 
     AppendEntryResult result = null;
     for (Entry entry : entries) {
-      result = appendEntry(prevLogIndex, prevLogTerm, leaderCommit, entry);
+      result = appendEntry(leaderCommit, entry);
 
       if (result.status != Response.RESPONSE_AGREE
           && result.status != Response.RESPONSE_STRONG_ACCEPT
           && result.status != Response.RESPONSE_WEAK_ACCEPT) {
         return result;
       }
-      prevLogIndex = entry.getCurrLogIndex();
-      prevLogTerm = entry.getCurrLogTerm();
     }
 
     return result;
   }
 
-  private AppendEntryResult appendEntry(
-      long prevLogIndex, long prevLogTerm, long leaderCommit, Entry entry) {
-    long appendedPos = 0;
+  private AppendEntryResult appendEntry(long leaderCommit, Entry entry) {
+    boolean appended = false;
 
     AppendEntryResult result = new AppendEntryResult();
+    long prevLogIndex = entry.getCurrLogIndex() - 1;
+    long startTime = Statistic.RAFT_RECEIVER_WAIT_FOR_WINDOW.getOperationStartTime();
     synchronized (this) {
-      int windowPos = (int) (entry.getCurrLogIndex() - logManager.getLastLogIndex() - 1);
+      Statistic.RAFT_RECEIVER_WAIT_FOR_WINDOW.calOperationCostTimeFromStart(startTime);
+      int windowPos = (int) (prevLogIndex - firstPosPrevIndex);
       if (windowPos < 0) {
         // the new entry may replace an appended entry
-        appendedPos =
-            logManager.maybeAppend(prevLogIndex, prevLogTerm, Collections.singletonList(entry));
-        member.tryUpdateCommitIndex(
-            member.getStatus().getTerm().get(), leaderCommit, logManager.getTerm(leaderCommit));
-        result.status = Response.RESPONSE_STRONG_ACCEPT;
-        result.setLastLogIndex(logManager.getLastLogIndex());
-        result.setLastLogTerm(logManager.getLastLogTerm());
+        appended = logManager.maybeAppend(Collections.singletonList(entry));
         moveWindowLeftward(-windowPos);
+        result.status = appended ? Response.RESPONSE_STRONG_ACCEPT : Response.RESPONSE_LOG_MISMATCH;
       } else if (windowPos < windowCapacity) {
         // the new entry falls into the window
         logWindow[windowPos] = entry;
-        prevTerms[windowPos] = prevLogTerm;
         if (windowLength < windowPos + 1) {
           windowLength = windowPos + 1;
         }
         checkLog(windowPos);
         if (windowPos == 0) {
-          appendedPos = flushWindow(result, leaderCommit);
+          appended = flushWindow(result);
         } else {
           result.status = Response.RESPONSE_WEAK_ACCEPT;
         }
-
       } else {
         result.setStatus(Response.RESPONSE_OUT_OF_WINDOW);
         result.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
@@ -237,9 +225,9 @@ public class SlidingWindowLogAppender implements LogAppender {
       }
     }
 
-    if (appendedPos == -1) {
-      // the incoming log points to an illegal position, reject it
-      result.status = Response.RESPONSE_LOG_MISMATCH;
+    if (appended) {
+      result.setLastLogIndex(logManager.getLastEntryIndexUnsafe());
+      member.tryUpdateCommitIndex(member.getStatus().getTerm().get(), leaderCommit, -1);
     }
     return result;
   }
@@ -247,9 +235,7 @@ public class SlidingWindowLogAppender implements LogAppender {
   @Override
   public void reset() {
     this.firstPosPrevIndex = logManager.getLastLogIndex();
-    this.prevTerms[0] = logManager.getLastLogTerm();
     logWindow = new Entry[windowCapacity];
-    prevTerms = new long[windowCapacity];
     windowLength = 0;
   }
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/BaseApplier.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/BaseApplier.java
index c38e5fc260..ed2b4dddba 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/BaseApplier.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/BaseApplier.java
@@ -53,9 +53,11 @@ public class BaseApplier implements LogApplier {
       if (e instanceof RequestEntry) {
         RequestEntry requestLog = (RequestEntry) e;
         IConsensusRequest request = requestLog.getRequest();
-        TSStatus status = applyRequest(request);
-        if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-          e.setException(new ConsensusException(status.message + ":" + status.code));
+        if (!member.getConfig().isIgnoreStateMachine()) {
+          TSStatus status = applyRequest(request);
+          if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            e.setException(new ConsensusException(status.message + ":" + status.code));
+          }
         }
       } else if (e instanceof ConfigChangeEntry) {
         member.applyConfigChange(((ConfigChangeEntry) e));
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/LogCatchUpTask.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/LogCatchUpTask.java
index 13e770f9e7..454ab7a6c9 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/LogCatchUpTask.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/LogCatchUpTask.java
@@ -79,18 +79,6 @@ public class LogCatchUpTask implements Callable<Boolean> {
     request.setTerm(raftMember.getStatus().getTerm().get());
 
     request.setEntries(logList);
-    // set index for raft
-    request.setPrevLogIndex(logs.get(startPos).getCurrLogIndex() - 1);
-    if (startPos != 0) {
-      request.setPrevLogTerm(logs.get(startPos - 1).getCurrLogTerm());
-    } else {
-      try {
-        request.setPrevLogTerm(
-            raftMember.getLogManager().getTerm(logs.get(0).getCurrLogIndex() - 1));
-      } catch (Exception e) {
-        logger.error("getTerm failed for newly append entries", e);
-      }
-    }
     logger.debug("{}, node={} catchup request={}", raftMember.getName(), node, request);
     return request;
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
index 4d90e0d9b2..3c98156091 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
+import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
 
 import org.apache.thrift.TApplicationException;
@@ -48,7 +49,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
   private static final Logger logger = LoggerFactory.getLogger(AppendNodeEntryHandler.class);
 
   protected RaftMember member;
-  protected VotingEntry log;
+  protected VotingEntry votingEntry;
   protected Peer directReceiver;
 
   public AppendNodeEntryHandler() {}
@@ -64,13 +65,16 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
             : directReceiver;
 
     logger.debug(
-        "{}: Append response {} from {} for log {}", member.getName(), response, trueReceiver, log);
+        "{}: Append response {} from {} for log {}",
+        member.getName(),
+        response,
+        trueReceiver,
+        votingEntry);
 
     long resp = response.status;
 
     if (resp == RESPONSE_STRONG_ACCEPT || resp == RESPONSE_AGREE) {
-      member.getVotingLogList().onStronglyAccept(log, trueReceiver);
-
+      member.getVotingLogList().onStronglyAccept(votingEntry, trueReceiver);
       member.getStatus().getPeerMap().get(trueReceiver).setMatchIndex(response.lastLogIndex);
     } else if (resp > 0) {
       // a response > 0 is the follower's term
@@ -80,15 +84,18 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
           member.getName(),
           trueReceiver,
           resp,
-          log);
+          votingEntry);
       member.stepDown(resp, null);
-      synchronized (log) {
-        log.notifyAll();
+      synchronized (votingEntry.getEntry()) {
+        votingEntry.getEntry().notifyAll();
       }
     } else if (resp == RESPONSE_WEAK_ACCEPT) {
-      synchronized (log) {
-        log.addWeaklyAcceptedNodes(trueReceiver);
-        log.notifyAll();
+      votingEntry.getEntry().acceptedTime = System.nanoTime();
+      Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.add(
+          votingEntry.getEntry().acceptedTime - votingEntry.getEntry().createTime);
+      synchronized (votingEntry.getEntry()) {
+        votingEntry.addWeaklyAcceptedNodes(trueReceiver);
+        votingEntry.getEntry().notifyAll();
       }
     } else {
       // e.g., Response.RESPONSE_LOG_MISMATCH
@@ -96,14 +103,14 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
         logger.debug(
             "{}: The log {} is rejected by {} because: {}",
             member.getName(),
-            log,
+            votingEntry,
             trueReceiver,
             resp);
       } else {
         logger.warn(
             "{}: The log {} is rejected by {} because: {}",
             member.getName(),
-            log,
+            votingEntry,
             trueReceiver,
             resp);
       }
@@ -124,17 +131,21 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
       logger.debug(
           "{}: Cannot append log {}: cannot connect to {}: {}",
           member.getName(),
-          log,
+          votingEntry,
           directReceiver,
           exception.getMessage());
     } else {
       logger.warn(
-          "{}: Cannot append log {} to {}", member.getName(), log, directReceiver, exception);
+          "{}: Cannot append log {} to {}",
+          member.getName(),
+          votingEntry,
+          directReceiver,
+          exception);
     }
   }
 
-  public void setLog(VotingEntry log) {
-    this.log = log;
+  public void setVotingEntry(VotingEntry votingEntry) {
+    this.votingEntry = votingEntry;
   }
 
   public void setMember(RaftMember member) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index 5b5995064c..463f9f0748 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -75,8 +75,6 @@ public class LogDispatcher {
   protected Map<Peer, RateLimiter> nodesRateLimiter = new HashMap<>();
   protected Map<Peer, Double> nodesRate = new HashMap<>();
   protected Map<Peer, ExecutorService> executorServices = new HashMap<>();
-  protected ExecutorService resultHandlerThread =
-      IoTDBThreadPoolFactory.newFixedThreadPool(2, "AppendResultHandler");
   protected boolean queueOrdered;
   protected boolean enableCompressedDispatching;
   protected ICompressor compressor;
@@ -91,9 +89,6 @@ public class LogDispatcher {
     this.enableCompressedDispatching = config.isEnableCompressedDispatching();
     this.compressor = ICompressor.getCompressor(config.getDispatchingCompressionType());
     this.bindingThreadNum = config.getDispatcherBindingThreadNum();
-    if (!queueOrdered) {
-      maxBatchSize = 1;
-    }
     this.allNodes = member.getAllNodes();
     this.newNodes = member.getNewNodes();
     createQueueAndBindingThreads(unionNodes(allNodes, newNodes));
@@ -151,7 +146,6 @@ public class LogDispatcher {
         logger.warn("Cannot shut down dispatcher pool of {}-{}", member.getName(), entry.getKey());
       }
     }
-    resultHandlerThread.shutdownNow();
   }
 
   protected boolean addToQueue(BlockingQueue<VotingEntry> nodeLogQueue, VotingEntry request) {
@@ -238,7 +232,7 @@ public class LogDispatcher {
             VotingEntry poll = logBlockingDeque.poll();
             if (poll != null) {
               currBatch.add(poll);
-              logBlockingDeque.drainTo(currBatch, maxBatchSize);
+              logBlockingDeque.drainTo(currBatch, maxBatchSize - 1);
             } else {
               logBlockingDeque.wait(10);
               continue;
@@ -309,48 +303,28 @@ public class LogDispatcher {
       }
     }
 
-    protected AppendEntriesRequest prepareRequest(
-        List<ByteBuffer> logList, List<VotingEntry> currBatch, int firstIndex) {
+    protected AppendEntriesRequest prepareRequest(List<ByteBuffer> logList) {
       AppendEntriesRequest request = new AppendEntriesRequest();
 
       request.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
       request.setLeader(member.getThisNode().getEndpoint());
       request.setLeaderId(member.getThisNode().getNodeId());
       request.setLeaderCommit(member.getLogManager().getCommitLogIndex());
-
       request.setTerm(member.getStatus().getTerm().get());
-
       request.setEntries(logList);
-      // set index for raft
-      request.setPrevLogIndex(currBatch.get(firstIndex).getEntry().getCurrLogIndex() - 1);
-      try {
-        request.setPrevLogTerm(currBatch.get(firstIndex).getAppendEntryRequest().prevLogTerm);
-      } catch (Exception e) {
-        logger.error("getTerm failed for newly append entries", e);
-      }
       return request;
     }
 
-    protected AppendCompressedEntriesRequest prepareCompressedRequest(
-        List<ByteBuffer> logList, List<VotingEntry> currBatch, int firstIndex) {
+    protected AppendCompressedEntriesRequest prepareCompressedRequest(List<ByteBuffer> logList) {
       AppendCompressedEntriesRequest request = new AppendCompressedEntriesRequest();
 
       request.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
       request.setLeader(member.getThisNode().getEndpoint());
       request.setLeaderId(member.getThisNode().getNodeId());
       request.setLeaderCommit(member.getLogManager().getCommitLogIndex());
-
       request.setTerm(member.getStatus().getTerm().get());
-
       request.setEntryBytes(LogUtils.compressEntries(logList, compressor));
       request.setCompressionType((byte) compressor.getType().ordinal());
-      // set index for raft
-      request.setPrevLogIndex(currBatch.get(firstIndex).getEntry().getCurrLogIndex() - 1);
-      try {
-        request.setPrevLogTerm(currBatch.get(firstIndex).getAppendEntryRequest().prevLogTerm);
-      } catch (Exception e) {
-        logger.error("getTerm failed for newly append entries", e);
-      }
       return request;
     }
 
@@ -383,11 +357,10 @@ public class LogDispatcher {
         }
 
         if (!enableCompressedDispatching) {
-          AppendEntriesRequest appendEntriesRequest = prepareRequest(logList, currBatch, prevIndex);
+          AppendEntriesRequest appendEntriesRequest = prepareRequest(logList);
           appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
         } else {
-          AppendCompressedEntriesRequest appendEntriesRequest =
-              prepareCompressedRequest(logList, currBatch, prevIndex);
+          AppendCompressedEntriesRequest appendEntriesRequest = prepareCompressedRequest(logList);
           appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
         }
 
@@ -401,7 +374,7 @@ public class LogDispatcher {
     public AppendNodeEntryHandler getAppendNodeEntryHandler(VotingEntry log, Peer node) {
       AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
       handler.setDirectReceiver(node);
-      handler.setLog(log);
+      handler.setVotingEntry(log);
       handler.setMember(member);
       return handler;
     }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
index 1e59fccc49..62cad1f6b8 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
@@ -40,13 +40,14 @@ public class ConfigChangeEntry extends Entry {
   }
 
   @Override
-  public ByteBuffer serialize() {
+  protected ByteBuffer serializeInternal() {
     ByteArrayOutputStream byteArrayOutputStream =
         new ByteArrayOutputStream(getDefaultSerializationBufferSize());
     try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
       dataOutputStream.writeByte((byte) Types.EMPTY.ordinal());
       dataOutputStream.writeLong(getCurrLogIndex());
       dataOutputStream.writeLong(getCurrLogTerm());
+      dataOutputStream.writeLong(getPrevTerm());
 
       dataOutputStream.writeInt(oldPeers.size());
       for (Peer oldPeer : oldPeers) {
@@ -66,6 +67,7 @@ public class ConfigChangeEntry extends Entry {
   public void deserialize(ByteBuffer buffer) {
     setCurrLogIndex(buffer.getLong());
     setCurrLogTerm(buffer.getLong());
+    setPrevTerm(buffer.getLong());
 
     int size = buffer.getInt();
     oldPeers = new ArrayList<>(size);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/EmptyEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/EmptyEntry.java
index 28a8724115..db6d7f40da 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/EmptyEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/EmptyEntry.java
@@ -36,13 +36,14 @@ public class EmptyEntry extends Entry {
   }
 
   @Override
-  public ByteBuffer serialize() {
+  protected ByteBuffer serializeInternal() {
     ByteArrayOutputStream byteArrayOutputStream =
         new ByteArrayOutputStream(getDefaultSerializationBufferSize());
     try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
       dataOutputStream.writeByte((byte) Types.EMPTY.ordinal());
       dataOutputStream.writeLong(getCurrLogIndex());
       dataOutputStream.writeLong(getCurrLogTerm());
+      dataOutputStream.writeLong(getPrevTerm());
     } catch (IOException e) {
       // unreachable
     }
@@ -53,6 +54,7 @@ public class EmptyEntry extends Entry {
   public void deserialize(ByteBuffer buffer) {
     setCurrLogIndex(buffer.getLong());
     setCurrLogTerm(buffer.getLong());
+    setPrevTerm(buffer.getLong());
   }
 
   @Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
index a81bb5924b..42f1a2659f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
@@ -47,13 +47,14 @@ public class RequestEntry extends Entry {
   }
 
   @Override
-  public ByteBuffer serialize() {
+  protected ByteBuffer serializeInternal() {
     PublicBAOS byteArrayOutputStream = new PublicBAOS(getDefaultSerializationBufferSize());
     try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
       dataOutputStream.writeByte((byte) CLIENT_REQUEST.ordinal());
 
       dataOutputStream.writeLong(getCurrLogIndex());
       dataOutputStream.writeLong(getCurrLogTerm());
+      dataOutputStream.writeLong(getPrevTerm());
 
       ByteBuffer byteBuffer = request.serializeToByteBuffer();
       byteBuffer.rewind();
@@ -80,6 +81,7 @@ public class RequestEntry extends Entry {
   public void deserialize(ByteBuffer buffer) {
     setCurrLogIndex(buffer.getLong());
     setCurrLogTerm(buffer.getLong());
+    setPrevTerm(buffer.getLong());
     int len = buffer.getInt();
     byte[] bytes = new byte[len];
     buffer.get(bytes);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index 5f2c4f6f04..aee2fa76cc 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -249,12 +249,7 @@ public abstract class RaftLogManager {
    * @return lastIndex
    */
   public long getLastLogIndex() {
-    try {
-      lock.readLock().lock();
-      return entries.get(entries.size() - 1).getCurrLogIndex();
-    } finally {
-      lock.readLock().unlock();
-    }
+    return getLastEntry().getCurrLogIndex();
   }
 
   public Entry getLastEntry() {
@@ -266,6 +261,29 @@ public abstract class RaftLogManager {
     }
   }
 
+  public Entry getLastEntryUnsafe() {
+    while (true) {
+      if (entries.isEmpty()) {
+        logger.error("{} should at least have one entry", name);
+        return getLastEntry();
+      }
+      try {
+        return entries.get(entries.size() - 1);
+      } catch (IndexOutOfBoundsException e) {
+        // ignore
+      }
+    }
+  }
+
+  public long getLastEntryIndexUnsafe() {
+    Entry lastEntryUnsafe = getLastEntryUnsafe();
+    if (lastEntryUnsafe != null) {
+      return lastEntryUnsafe.getCurrLogIndex();
+    } else {
+      return getLastLogIndex();
+    }
+  }
+
   /**
    * Returns the term for given index.
    *
@@ -329,17 +347,23 @@ public abstract class RaftLogManager {
    * Used by follower node to support leader's complicated log replication rpc parameters and try to
    * commit entries.
    *
-   * @param lastIndex leader's matchIndex for this follower node
-   * @param lastTerm the entry's term which index is leader's matchIndex for this follower node
    * @param entries entries sent from the leader node Note that the leader must ensure
    *     entries[0].index = lastIndex + 1
    * @return -1 if the entries cannot be appended, otherwise the last index of new entries
    */
-  public long maybeAppend(long lastIndex, long lastTerm, List<Entry> entries) {
+  public boolean maybeAppend(List<Entry> entries) {
+    if (entries.isEmpty()) {
+      return true;
+    }
+
+    long lastIndex = entries.get(0).getCurrLogIndex() - 1;
+    long lastTerm = entries.get(0).getPrevTerm();
+    long startTime = Statistic.RAFT_RECEIVER_WAIT_LOCK.getOperationStartTime();
     try {
       lock.writeLock().lock();
+      Statistic.RAFT_RECEIVER_WAIT_LOCK.calOperationCostTimeFromStart(startTime);
+      startTime = Statistic.RAFT_RECEIVER_APPEND_INTERNAL.getOperationStartTime();
       if (matchTerm(lastTerm, lastIndex)) {
-        long newLastIndex = lastIndex + entries.size();
         long ci = findConflict(entries);
         if (ci <= commitIndex) {
           if (ci != -1) {
@@ -357,14 +381,25 @@ public abstract class RaftLogManager {
                   entries.size() - 1);
             }
           }
-
         } else {
           long offset = lastIndex + 1;
-          append(entries.subList((int) (ci - offset), entries.size()), false);
+          try {
+            append(entries.subList((int) (ci - offset), entries.size()), false);
+          } catch (IllegalArgumentException e) {
+            logger.error(
+                "Appending {}, ci {}, offset {}, lastIndex {}, localLast {}",
+                entries,
+                ci,
+                offset,
+                lastIndex,
+                getLastLogIndex());
+            throw e;
+          }
         }
-        return newLastIndex;
+        Statistic.RAFT_RECEIVER_APPEND_INTERNAL.calOperationCostTimeFromStart(startTime);
+        return true;
       }
-      return -1;
+      return false;
     } finally {
       lock.writeLock().unlock();
     }
@@ -378,7 +413,7 @@ public abstract class RaftLogManager {
    * @return the newly generated lastIndex
    */
   public long append(List<Entry> appendingEntries, boolean isLeader) {
-    if (entries.isEmpty()) {
+    if (appendingEntries.isEmpty()) {
       return getLastLogIndex();
     }
 
@@ -410,7 +445,7 @@ public abstract class RaftLogManager {
       }
     }
 
-    if (!isLeader) {
+    if (!isLeader && !config.isUseFollowerSlidingWindow()) {
       // log update condition is to inform follower appending threads that the log is updated, and
       // the leader does not concern it
       Object logUpdateCondition =
@@ -427,11 +462,12 @@ public abstract class RaftLogManager {
    * Used by leader node to try to commit entries.
    *
    * @param leaderCommit leader's commitIndex
-   * @param term the entry's term which index is leaderCommit in leader's log module
+   * @param term the term of the entry at index leaderCommit in the leader's log; if term is -1,
+   *     the commit follows a successful append and the term check is skipped
    * @return true or false
    */
   public boolean maybeCommit(long leaderCommit, long term) {
-    if (leaderCommit > commitIndex && matchTerm(term, leaderCommit)) {
+    if (leaderCommit > commitIndex && (term == -1 || matchTerm(term, leaderCommit))) {
       try {
         commitTo(leaderCommit);
       } catch (LogExecutionException e) {
@@ -597,6 +633,7 @@ public abstract class RaftLogManager {
         entry.committedTime = System.nanoTime();
         Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_COMMIT.add(entry.committedTime - entry.createTime);
       }
+      entry.setSerializationCache(null);
     }
   }
 
@@ -612,9 +649,12 @@ public abstract class RaftLogManager {
 
     try {
       lock.writeLock().lock();
+      long startTime = Statistic.RAFT_SENDER_COMMIT_HOLD_LOCK.getOperationStartTime();
       long lo = commitIndex + 1;
       long hi = newCommitIndex + 1;
+      long getLogStart = Statistic.RAFT_SENDER_GET_LOG_FOR_COMMIT.getOperationStartTime();
       List<Entry> entries = new ArrayList<>(getEntries(lo, hi));
+      Statistic.RAFT_SENDER_GET_LOG_FOR_COMMIT.calOperationCostTimeFromStart(getLogStart);
 
       if (entries.isEmpty()) {
         return;
@@ -629,6 +669,7 @@ public abstract class RaftLogManager {
       checkCompaction(entries);
       commitEntries(entries);
       applyEntries(entries);
+      Statistic.RAFT_SENDER_COMMIT_HOLD_LOCK.calOperationCostTimeFromStart(startTime);
     } finally {
       lock.writeLock().unlock();
     }
@@ -686,6 +727,9 @@ public abstract class RaftLogManager {
     }
     try {
       logApplier.apply(entry);
+      if (entry.createTime != 0) {
+        Statistic.LOG_DISPATCHER_FROM_CREATE_TO_APPLIER.add(System.nanoTime() - entry.createTime);
+      }
     } catch (Exception e) {
       entry.setException(e);
       entry.setApplied(true);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index e348541ce2..a9aaae7d83 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -1353,7 +1353,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
         byte[] bytes = ReadWriteIOUtils.readBytes(bufferedInputStream, logBlockSize);
         ByteBuffer uncompressed = ByteBuffer.wrap(unCompressor.uncompress(bytes));
         while (uncompressed.remaining() > 0) {
-          Entry e = parser.parse(uncompressed);
+          Entry e = parser.parse(uncompressed, null);
           result.add(e);
         }
         currentReadOffset = currentReadOffset + Integer.BYTES + logBlockSize;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
index e27fdb86c4..9287f3b7dd 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
@@ -54,10 +54,14 @@ public class DirectorySnapshot extends Snapshot {
     serializeBase(dataOutputStream);
 
     try {
-      dataOutputStream.writeBytes(directory.getAbsolutePath());
+      byte[] bytes = directory.getAbsolutePath().getBytes(StandardCharsets.UTF_8);
+      dataOutputStream.writeInt(bytes.length);
+      dataOutputStream.write(bytes);
       dataOutputStream.writeInt(filePaths.size());
       for (Path filePath : filePaths) {
-        dataOutputStream.writeBytes(filePath.toString());
+        byte[] pathBytes = filePath.toString().getBytes(StandardCharsets.UTF_8);
+        dataOutputStream.writeInt(pathBytes.length);
+        dataOutputStream.write(pathBytes);
       }
     } catch (IOException e) {
       // unreachable
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
index fced0f868a..564a3a954d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
@@ -81,7 +81,7 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
       throws UnknownLogTypeException {
     ConfigChangeEntry configChangeEntry = null;
     for (ByteBuffer entryBuffer : request.entries) {
-      Entry entry = LogParser.getINSTANCE().parse(entryBuffer);
+      Entry entry = LogParser.getINSTANCE().parse(entryBuffer, null);
       if (entry instanceof ConfigChangeEntry) {
         configChangeEntry = (ConfigChangeEntry) entry;
         break;
@@ -160,8 +160,6 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
     decompressedRequest
         .setTerm(request.getTerm())
         .setLeader(request.leader)
-        .setPrevLogIndex(request.prevLogIndex)
-        .setPrevLogTerm(request.prevLogTerm)
         .setLeaderCommit(request.leaderCommit)
         .setGroupId(request.groupId)
         .setLeaderId(request.leaderId);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
index c536763f37..5c7e2d6659 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.consensus.natraft.utils;
 
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.natraft.exception.UnknownLogTypeException;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
@@ -62,21 +63,12 @@ public class LogUtils {
       entry.setByteSize(byteBuffer.array().length);
       request.entry = byteBuffer;
     }
-    try {
-      if (entry.getPrevTerm() != -1) {
-        request.setPrevLogTerm(entry.getPrevTerm());
-      } else {
-        request.setPrevLogTerm(member.getLogManager().getTerm(entry.getCurrLogIndex() - 1));
-      }
-    } catch (Exception e) {
-      logger.error("getTerm failed for newly append entries", e);
-    }
+
     request.setLeader(member.getThisNode().getEndpoint());
     request.setLeaderId(member.getThisNode().getNodeId());
     // don't need lock because even if it's larger than the commitIndex when appending this log to
     // logManager, the follower can handle the larger commitIndex with no effect
     request.setLeaderCommit(member.getLogManager().getCommitLogIndex());
-    request.setPrevLogIndex(entry.getCurrLogIndex() - 1);
     request.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
 
     return request;
@@ -138,14 +130,16 @@ public class LogUtils {
     return buffers;
   }
 
-  public static List<Entry> parseEntries(List<ByteBuffer> buffers) throws UnknownLogTypeException {
+  public static List<Entry> parseEntries(List<ByteBuffer> buffers, IStateMachine stateMachine)
+      throws UnknownLogTypeException {
     List<Entry> entries = new ArrayList<>();
     for (ByteBuffer buffer : buffers) {
       buffer.mark();
       Entry e;
       try {
-        e = LogParser.getINSTANCE().parse(buffer);
-        e.setByteSize(buffer.limit() - buffer.position());
+        e = LogParser.getINSTANCE().parse(buffer, stateMachine);
+        buffer.reset();
+        e.setSerializationCache(buffer);
       } catch (BufferUnderflowException ex) {
         buffer.reset();
         throw ex;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
index 3a35b9dab6..9631b892fd 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
@@ -202,6 +202,24 @@ public class Timer {
         TIME_SCALE,
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_RECEIVER_WAIT_FOR_WINDOW(
+        RAFT_MEMBER_RECEIVER,
+        "receiver wait for window",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_RECEIVER_WAIT_LOCK(
+        RAFT_MEMBER_RECEIVER,
+        "receiver wait for lock",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_RECEIVER_APPEND_INTERNAL(
+        RAFT_MEMBER_RECEIVER,
+        "append entry (internal)",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     RAFT_RECEIVER_APPEND_ENTRY(
         RAFT_MEMBER_RECEIVER,
         "append entrys",
@@ -290,18 +308,36 @@ public class Timer {
         TIME_SCALE,
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_SENDER_GET_LOG_FOR_COMMIT(
+        LOG_DISPATCHER,
+        "get log for commit",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     RAFT_SENDER_LOG_FROM_CREATE_TO_READY_COMMIT(
         LOG_DISPATCHER,
         "from create to ready commit",
         TIME_SCALE,
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_SENDER_COMMIT_HOLD_LOCK(
+        LOG_DISPATCHER,
+        "commit hold lock",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     RAFT_SENDER_LOG_FROM_CREATE_TO_COMMIT(
         LOG_DISPATCHER,
         "from create to committed",
         TIME_SCALE,
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_FROM_CREATE_TO_APPLIER(
+        LOG_DISPATCHER,
+        "from create to applier",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     LOG_DISPATCHER_FROM_CREATE_TO_APPLIED(
         LOG_DISPATCHER,
         "from create to applied",
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index 7bbb716f14..dc2ee8e971 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -108,6 +108,7 @@ import java.util.Map;
 
 /** Convert SQL and RPC requests to {@link Statement}. */
 public class StatementGenerator {
+
   // TODO @spricoder optimize the method adding metrics
   public static Statement createStatement(String sql, ZoneId zoneId) {
     return invokeParser(sql, zoneId);
@@ -278,13 +279,15 @@ public class StatementGenerator {
     insertStatement.setDevicePath(new PartialPath(insertTabletReq.getPrefixPath()));
     insertStatement.setMeasurements(insertTabletReq.getMeasurements().toArray(new String[0]));
     insertStatement.setTimes(
-        QueryDataSetUtils.readTimesFromBuffer(insertTabletReq.timestamps, insertTabletReq.size));
+        QueryDataSetUtils.readTimesFromBuffer(
+            insertTabletReq.timestamps, insertTabletReq.size, insertTabletReq.compression));
     insertStatement.setColumns(
         QueryDataSetUtils.readTabletValuesFromBuffer(
             insertTabletReq.values,
             insertTabletReq.types,
             insertTabletReq.types.size(),
-            insertTabletReq.size));
+            insertTabletReq.size,
+            insertTabletReq.compression));
     insertStatement.setBitMaps(
         QueryDataSetUtils.readBitMapsFromBuffer(
             insertTabletReq.values, insertTabletReq.types.size(), insertTabletReq.size));
@@ -310,13 +313,15 @@ public class StatementGenerator {
       insertTabletStatement.setDevicePath(new PartialPath(req.prefixPaths.get(i)));
       insertTabletStatement.setMeasurements(req.measurementsList.get(i).toArray(new String[0]));
       insertTabletStatement.setTimes(
-          QueryDataSetUtils.readTimesFromBuffer(req.timestampsList.get(i), req.sizeList.get(i)));
+          QueryDataSetUtils.readTimesFromBuffer(
+              req.timestampsList.get(i), req.sizeList.get(i), req.compression));
       insertTabletStatement.setColumns(
           QueryDataSetUtils.readTabletValuesFromBuffer(
               req.valuesList.get(i),
               req.typesList.get(i),
               req.measurementsList.get(i).size(),
-              req.sizeList.get(i)));
+              req.sizeList.get(i),
+              req.compression));
       insertTabletStatement.setBitMaps(
           QueryDataSetUtils.readBitMapsFromBuffer(
               req.valuesList.get(i), req.measurementsList.get(i).size(), req.sizeList.get(i)));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
index c023792b95..19e157493f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
@@ -699,7 +699,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
 
     rowCount = buffer.getInt();
     times = new long[rowCount];
-    times = QueryDataSetUtils.readTimesFromBuffer(buffer, rowCount);
+    times = QueryDataSetUtils.readTimesFromBuffer(buffer, rowCount, null);
 
     boolean hasBitMaps = BytesUtils.byteToBool(buffer.get());
     if (hasBitMaps) {
@@ -969,7 +969,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
 
     rowCount = buffer.getInt();
     times = new long[rowCount];
-    times = QueryDataSetUtils.readTimesFromBuffer(buffer, rowCount);
+    times = QueryDataSetUtils.readTimesFromBuffer(buffer, rowCount, null);
 
     boolean hasBitMaps = BytesUtils.byteToBool(buffer.get());
     if (hasBitMaps) {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 6baee72738..5b21eb2fa3 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.db.utils;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -271,7 +273,8 @@ public class QueryDataSetUtils {
     return new Pair<>(res, !queryExecution.hasNextResult());
   }
 
-  public static long[] readTimesFromBuffer(ByteBuffer buffer, int size) {
+  public static long[] readTimesFromBuffer(ByteBuffer buffer, int size, String compression) {
+    buffer = uncompressBuffer(buffer, compression);
     long[] times = new long[size];
     for (int i = 0; i < size; i++) {
       times[i] = buffer.getLong();
@@ -324,8 +327,32 @@ public class QueryDataSetUtils {
     return bitMaps;
   }
 
+  public static ByteBuffer uncompressBuffer(ByteBuffer buffer, String compression) {
+    if (compression != null) { // compression is resolved via CompressionType.valueOf
+      IUnCompressor unCompressor =
+          IUnCompressor.getUnCompressor(CompressionType.valueOf(compression));
+      try { // buffer.array() below needs an accessible backing array and throws otherwise
+        int uncompressedLength =
+            unCompressor.getUncompressedLength(
+                buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+        byte[] uncompressedBuffer = new byte[uncompressedLength];
+        unCompressor.uncompress( // input is the remaining bytes of buffer, output starts at 0
+            buffer.array(),
+            buffer.arrayOffset() + buffer.position(),
+            buffer.remaining(),
+            uncompressedBuffer,
+            0);
+        return ByteBuffer.wrap(uncompressedBuffer); // buffer's own position is left untouched
+      } catch (IOException e) {
+        throw new RuntimeException(e); // rethrown unchecked with the IOException as cause
+      }
+    }
+    return buffer; // null compression: the input buffer is returned as is
+  }
+
   public static Object[] readTabletValuesFromBuffer(
-      ByteBuffer buffer, List<Integer> types, int columns, int size) {
+      ByteBuffer buffer, List<Integer> types, int columns, int size, String compression) {
+    buffer = uncompressBuffer(buffer, compression);
     TSDataType[] dataTypes = new TSDataType[types.size()];
     for (int i = 0; i < dataTypes.length; i++) {
       dataTypes[i] = TSDataType.values()[types.get(i)];
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SetThreadName.java b/server/src/main/java/org/apache/iotdb/db/utils/SetThreadName.java
index 43adca20b2..02ec81bbbb 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SetThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SetThreadName.java
@@ -20,24 +20,22 @@ package org.apache.iotdb.db.utils;
 
 import java.io.Closeable;
 
-import static java.util.Objects.requireNonNull;
-
 public class SetThreadName implements Closeable {
-  private final String originalThreadName;
+  // private final String originalThreadName;
 
   public SetThreadName(String suffix) {
-    requireNonNull(suffix, "suffix is null");
-    originalThreadName = Thread.currentThread().getName();
-    int index = originalThreadName.indexOf("$");
-    if (index < 0) {
-      Thread.currentThread().setName(originalThreadName + '$' + suffix);
-    } else {
-      Thread.currentThread().setName(originalThreadName.substring(0, index) + '$' + suffix);
-    }
+    //    requireNonNull(suffix, "suffix is null");
+    //    originalThreadName = Thread.currentThread().getName();
+    //    int index = originalThreadName.indexOf("$");
+    //    if (index < 0) {
+    //      Thread.currentThread().setName(originalThreadName + '$' + suffix);
+    //    } else {
+    //      Thread.currentThread().setName(originalThreadName.substring(0, index) + '$' + suffix);
+    //    }
   }
 
   @Override
   public void close() {
-    Thread.currentThread().setName(originalThreadName);
+    //    Thread.currentThread().setName(originalThreadName);
   }
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 5043ef89e4..d3edee11d5 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -141,6 +141,7 @@ public class Session implements ISession {
 
   // The version number of the client which used for compatibility in the server
   protected Version version;
+  protected CompressionType compressionType = CompressionType.SNAPPY;
 
   public Session(String host, int rpcPort) {
     this(
@@ -2509,9 +2510,19 @@ public class Session implements ISession {
 
     request.setPrefixPath(tablet.deviceId);
     request.setIsAligned(isAligned);
-    request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
-    request.setValues(SessionUtils.getValueBuffer(tablet));
-    request.setSize(tablet.rowSize);
+    try {
+      request.setTimestamps(SessionUtils.getTimeBuffer(tablet, compressionType));
+      request.setValues(SessionUtils.getValueBuffer(tablet, compressionType));
+      request.setSize(tablet.rowSize);
+      if (compressionType != CompressionType.UNCOMPRESSED) {
+        request.setCompression(compressionType.toString());
+      }
+    } catch (IOException e) {
+      request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
+      request.setValues(SessionUtils.getValueBuffer(tablet));
+      request.setSize(tablet.rowSize);
+    }
+
     return request;
   }
 
diff --git a/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
index 32ed19add9..0bbd3d0cb9 100644
--- a/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
+++ b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.session.util;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.compress.ICompressor;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
@@ -33,6 +35,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -272,4 +275,24 @@ public class SessionUtils {
       throw new NumberFormatException("NodeUrl Incorrect format");
     }
   }
+
+  public static byte[] getTimeBuffer(Tablet tablet, CompressionType compressionType)
+      throws IOException {
+    ByteBuffer timeBuffer = getTimeBuffer(tablet); // output of the one-argument overload
+    ICompressor compressor = ICompressor.getCompressor(compressionType);
+    return compressor.compress( // compresses only the remaining bytes of timeBuffer
+        timeBuffer.array(),
+        timeBuffer.arrayOffset() + timeBuffer.position(),
+        timeBuffer.remaining());
+  }
+
+  public static byte[] getValueBuffer(Tablet tablet, CompressionType compressionType)
+      throws IOException {
+    ByteBuffer valueBuffer = getValueBuffer(tablet); // output of the one-argument overload
+    ICompressor compressor = ICompressor.getCompressor(compressionType);
+    return compressor.compress( // compresses only the remaining bytes of valueBuffer
+        valueBuffer.array(),
+        valueBuffer.arrayOffset() + valueBuffer.position(),
+        valueBuffer.remaining());
+  }
 }
diff --git a/thrift-raft/src/main/thrift/raft.thrift b/thrift-raft/src/main/thrift/raft.thrift
index 67466b2080..95eaeb441d 100644
--- a/thrift-raft/src/main/thrift/raft.thrift
+++ b/thrift-raft/src/main/thrift/raft.thrift
@@ -24,32 +24,27 @@ struct AppendEntriesRequest {
   1: required i64 term // leader's
   2: required common.TEndPoint leader
   3: required list<binary> entries // data
-  4: required i64 prevLogIndex
-  5: required i64 prevLogTerm
-  6: required i64 leaderCommit
-  7: required common.TConsensusGroupId groupId
-  8: required i32 leaderId
+  4: required i64 leaderCommit
+  5: required common.TConsensusGroupId groupId
+  6: required i32 leaderId
 }
 
 struct AppendCompressedEntriesRequest {
   1: required i64 term // leader's
   2: required common.TEndPoint leader
   3: required binary entryBytes // data
-  4: required i64 prevLogIndex
-  5: required i64 prevLogTerm
-  6: required i64 leaderCommit
-  7: required common.TConsensusGroupId groupId
-  8: required i32 leaderId
-  9: required i8 compressionType
+  4: required i64 leaderCommit
+  5: required common.TConsensusGroupId groupId
+  6: required i32 leaderId
+  7: required i8 compressionType
 }
 
 struct AppendEntryResult {
   1: required i64 status;
-  2: optional i64 lastLogTerm;
-  3: optional i64 lastLogIndex;
-  4: optional common.TConsensusGroupId groupId;
-  5: optional common.TEndPoint receiver;
-  6: optional i32 receiverId;
+  2: optional i64 lastLogIndex;
+  3: optional common.TConsensusGroupId groupId;
+  4: optional common.TEndPoint receiver;
+  5: optional i32 receiverId;
 }
 
 // leader -> follower
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index 357589db82..e888554fb0 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -233,6 +233,7 @@ struct TSInsertTabletReq {
   6: required list<i32> types
   7: required i32 size
   8: optional bool isAligned
+  9: optional string compression
 }
 
 struct TSInsertTabletsReq {
@@ -244,6 +245,7 @@ struct TSInsertTabletsReq {
   6: required list<list<i32>> typesList
   7: required list<i32> sizeList
   8: optional bool isAligned
+  9: optional string compression
 }
 
 struct TSInsertRecordsReq {