You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by sp...@apache.org on 2023/02/02 06:07:14 UTC

[iotdb] branch master updated: [IOTDB-5257] Optimize SyncLog Logic in IoT Consensus (#8863)

This is an automated email from the ASF dual-hosted git repository.

spricoder pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c06988797 [IOTDB-5257] Optimize SyncLog Logic in IoT Consensus (#8863)
5c06988797 is described below

commit 5c069887972f8332b10df41e2f1634b41d8de338
Author: ZhangHongYin <46...@users.noreply.github.com>
AuthorDate: Thu Feb 2 14:07:08 2023 +0800

    [IOTDB-5257] Optimize SyncLog Logic in IoT Consensus (#8863)
    
    * Refactor syncLog Logic
    
    * Fix the create of statemachine
    
    * Optimize comment
    
    * Fix deserialize
    
    * Fix format
    
    * Fix the implementation of getPlanNode
    
    * Optimize implementation
    
    * move base implementation to BaseStateMachine
    
    * Add deserialize
---
 .../statemachine/ConfigNodeRegionStateMachine.java |  20 +-
 .../org/apache/iotdb/consensus/IStateMachine.java  |  39 ++--
 .../DeserializedBatchIndexedConsensusRequest.java  |  65 ++++++
 .../iotdb/consensus/config/IoTConsensusConfig.java |  15 ++
 .../consensus/iot/IoTConsensusServerImpl.java      | 122 +++++++++-
 .../iot/client/IoTConsensusClientPool.java         |   4 +-
 .../iot/logdispatcher/IndexController.java         |   3 +-
 .../consensus/iot/logdispatcher/LogDispatcher.java |   4 +-
 .../consensus/iot/logdispatcher/SyncStatus.java    |   4 +-
 .../service/IoTConsensusRPCServiceProcessor.java   |   6 +-
 .../ratis/ApplicationStateMachineProxy.java        |   4 +-
 .../iotdb/consensus/simple/SimpleServerImpl.java   |   5 +
 .../apache/iotdb/consensus/EmptyStateMachine.java  |   5 +
 .../iotdb/consensus/iot/util/TestStateMachine.java |  29 ++-
 .../apache/iotdb/consensus/ratis/TestUtils.java    |  13 +-
 .../consensus/simple/SimpleConsensusTest.java      |   5 +
 .../db/consensus/DataRegionConsensusImpl.java      |  20 +-
 .../consensus/statemachine/BaseStateMachine.java   |   9 +-
 .../statemachine/DataRegionStateMachine.java       | 245 ++++-----------------
 .../IoTConsensusDataRegionStateMachine.java        | 111 ++++++++++
 .../statemachine/SchemaRegionStateMachine.java     |  11 +-
 21 files changed, 484 insertions(+), 255 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java
index db16cb9c83..16d75de30b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java
@@ -49,6 +49,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
+import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -96,28 +97,35 @@ public class ConfigNodeRegionStateMachine
 
   @Override
   public TSStatus write(IConsensusRequest request) {
-    ConfigPhysicalPlan plan;
+    return Optional.ofNullable(request)
+        .map(o -> write((ConfigPhysicalPlan) request))
+        .orElseGet(() -> new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()));
+  }
+
+  @Override
+  public IConsensusRequest deserializeRequest(IConsensusRequest request) {
+    IConsensusRequest result;
     if (request instanceof ByteBufferConsensusRequest) {
       try {
-        plan = ConfigPhysicalPlan.Factory.create(request.serializeToByteBuffer());
+        result = ConfigPhysicalPlan.Factory.create(request.serializeToByteBuffer());
       } catch (Throwable e) {
         LOGGER.error(
             "Deserialization error for write plan, request: {}, bytebuffer: {}",
             request,
             request.serializeToByteBuffer(),
             e);
-        return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+        return null;
       }
     } else if (request instanceof ConfigPhysicalPlan) {
-      plan = (ConfigPhysicalPlan) request;
+      result = request;
     } else {
       LOGGER.error(
           "Unexpected write plan, request: {}, bytebuffer: {}",
           request,
           request.serializeToByteBuffer());
-      return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      return null;
     }
-    return write(plan);
+    return result;
   }
 
   /** Transmit PhysicalPlan to confignode.service.executor.PlanExecutor */
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index 75f50721f9..056c897aff 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -47,18 +47,26 @@ public interface IStateMachine {
   }
 
   /**
-   * apply a write-request from user
+   * apply a deserialized write-request from user.
    *
-   * @param IConsensusRequest write request
+   * @param request deserialized request
    */
-  TSStatus write(IConsensusRequest IConsensusRequest);
+  TSStatus write(IConsensusRequest request);
 
   /**
-   * read local data and return
+   * deserialize IConsensusRequest.
    *
-   * @param IConsensusRequest read request
+   * @param request write request
+   * @return deserialized request
    */
-  DataSet read(IConsensusRequest IConsensusRequest);
+  IConsensusRequest deserializeRequest(IConsensusRequest request);
+
+  /**
+   * read local data and return.
+   *
+   * @param request read request
+   */
+  DataSet read(IConsensusRequest request);
 
   /**
    * Take a snapshot of current statemachine. All files are required to be stored under snapshotDir,
@@ -83,7 +91,7 @@ public interface IStateMachine {
   }
 
   /**
-   * Load the latest snapshot from given dir
+   * Load the latest snapshot from given dir.
    *
    * @param latestSnapshotRootDir dir where the latest snapshot sits
    */
@@ -110,14 +118,13 @@ public interface IStateMachine {
    * retry the operation until it succeed.
    */
   interface RetryPolicy {
-
-    /** Given the last write result, should we retry? */
+    /** whether we should retry according to the last write result. */
     default boolean shouldRetry(TSStatus writeResult) {
       return false;
     }
 
     /**
-     * Use the latest write result to update final write result
+     * Use the latest write result to update final write result.
      *
      * @param previousResult previous write result
      * @param retryResult latest write result
@@ -128,13 +135,13 @@ public interface IStateMachine {
     }
 
     /**
-     * sleep time before the next retry
+     * sleep time before the next retry.
      *
      * @return time in millis
      */
     default long getSleepTime() {
       return 100;
-    };
+    }
   }
 
   /** An optional API for event notifications. */
@@ -146,7 +153,9 @@ public interface IStateMachine {
      * @param groupId The id of this consensus group.
      * @param newLeaderId The id of the new leader node.
      */
-    default void notifyLeaderChanged(ConsensusGroupId groupId, int newLeaderId) {}
+    default void notifyLeaderChanged(ConsensusGroupId groupId, int newLeaderId) {
+      // do nothing default
+    }
 
     /**
      * Notify the {@link IStateMachine} a configuration change. This method will be invoked when a
@@ -156,7 +165,9 @@ public interface IStateMachine {
      * @param index index which is being updated
      * @param newConfiguration new configuration
      */
-    default void notifyConfigurationChanged(long term, long index, List<Peer> newConfiguration) {}
+    default void notifyConfigurationChanged(long term, long index, List<Peer> newConfiguration) {
+      // do nothing default
+    }
   }
 
   /**
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java
new file mode 100644
index 0000000000..abddbf25b2
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.common.request;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DeserializedBatchIndexedConsensusRequest
+    implements IConsensusRequest, Comparable<DeserializedBatchIndexedConsensusRequest> {
+  private final long startSyncIndex;
+  private final long endSyncIndex;
+  private final List<IConsensusRequest> insertNodes;
+
+  public DeserializedBatchIndexedConsensusRequest(
+      long startSyncIndex, long endSyncIndex, int size) {
+    this.startSyncIndex = startSyncIndex;
+    this.endSyncIndex = endSyncIndex;
+    // use arraylist here because we know the number of requests
+    this.insertNodes = new ArrayList<>(size);
+  }
+
+  public long getStartSyncIndex() {
+    return startSyncIndex;
+  }
+
+  public long getEndSyncIndex() {
+    return endSyncIndex;
+  }
+
+  public List<IConsensusRequest> getInsertNodes() {
+    return insertNodes;
+  }
+
+  public void add(IConsensusRequest insertNode) {
+    this.insertNodes.add(insertNode);
+  }
+
+  @Override
+  public int compareTo(DeserializedBatchIndexedConsensusRequest o) {
+    return Long.compare(startSyncIndex, o.startSyncIndex);
+  }
+
+  @Override
+  public ByteBuffer serializeToByteBuffer() {
+    return null;
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
index ae070852fc..9c676fdaa9 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
@@ -235,6 +235,7 @@ public class IoTConsensusConfig {
     private final int maxLogEntriesNumPerBatch;
     private final int maxSizePerBatch;
     private final int maxPendingBatchesNum;
+    private final long maxWaitingTimeForWaitBatchInMs;
     private final int maxWaitingTimeForAccumulatingBatchInMs;
     private final long basicRetryWaitTimeMs;
     private final long maxRetryWaitTimeMs;
@@ -248,6 +249,7 @@ public class IoTConsensusConfig {
         int maxLogEntriesNumPerBatch,
         int maxSizePerBatch,
         int maxPendingBatchesNum,
+        long maxWaitingTimeForWaitBatchInMs,
         int maxWaitingTimeForAccumulatingBatchInMs,
         long basicRetryWaitTimeMs,
         long maxRetryWaitTimeMs,
@@ -259,6 +261,7 @@ public class IoTConsensusConfig {
       this.maxLogEntriesNumPerBatch = maxLogEntriesNumPerBatch;
       this.maxSizePerBatch = maxSizePerBatch;
       this.maxPendingBatchesNum = maxPendingBatchesNum;
+      this.maxWaitingTimeForWaitBatchInMs = maxWaitingTimeForWaitBatchInMs;
       this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs;
       this.basicRetryWaitTimeMs = basicRetryWaitTimeMs;
       this.maxRetryWaitTimeMs = maxRetryWaitTimeMs;
@@ -281,6 +284,10 @@ public class IoTConsensusConfig {
       return maxPendingBatchesNum;
     }
 
+    public long getMaxWaitingTimeForWaitBatchInMs() {
+      return maxWaitingTimeForWaitBatchInMs;
+    }
+
     public int getMaxWaitingTimeForAccumulatingBatchInMs() {
       return maxWaitingTimeForAccumulatingBatchInMs;
     }
@@ -324,6 +331,7 @@ public class IoTConsensusConfig {
       // (IMPORTANT) Value of this variable should be the same with MAX_REQUEST_CACHE_SIZE
       // in DataRegionStateMachine
       private int maxPendingBatchesNum = 5;
+      private long maxWaitingTimeForWaitBatchInMs = 10 * 1000L;
       private int maxWaitingTimeForAccumulatingBatchInMs = 500;
       private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
       private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
@@ -348,6 +356,12 @@ public class IoTConsensusConfig {
         return this;
       }
 
+      public Replication.Builder setMaxWaitingTimeForWaitBatchInMs(
+          long maxWaitingTimeForWaitBatchInMs) {
+        this.maxWaitingTimeForWaitBatchInMs = maxWaitingTimeForWaitBatchInMs;
+        return this;
+      }
+
       public Replication.Builder setMaxWaitingTimeForAccumulatingBatchInMs(
           int maxWaitingTimeForAccumulatingBatchInMs) {
         this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs;
@@ -394,6 +408,7 @@ public class IoTConsensusConfig {
             maxLogEntriesNumPerBatch,
             maxSizePerBatch,
             maxPendingBatchesNum,
+            maxWaitingTimeForWaitBatchInMs,
             maxWaitingTimeForAccumulatingBatchInMs,
             basicRetryWaitTimeMs,
             maxRetryWaitTimeMs,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index f8a4316db9..e24478179a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.DeserializedBatchIndexedConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.config.IoTConsensusConfig;
@@ -75,7 +76,10 @@ import java.nio.file.Paths;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
@@ -93,6 +97,7 @@ public class IoTConsensusServerImpl {
 
   private final Peer thisNode;
   private final IStateMachine stateMachine;
+  private final ConcurrentHashMap<String, SyncLogCacheQueue> cacheQueueMap;
   private final Lock stateMachineLock = new ReentrantLock();
   private final Condition stateMachineCondition = stateMachineLock.newCondition();
   private final String storageDir;
@@ -121,6 +126,7 @@ public class IoTConsensusServerImpl {
     this.storageDir = storageDir;
     this.thisNode = thisNode;
     this.stateMachine = stateMachine;
+    this.cacheQueueMap = new ConcurrentHashMap<>();
     this.syncClientManager = syncClientManager;
     this.configuration = configuration;
     if (configuration.isEmpty()) {
@@ -155,7 +161,8 @@ public class IoTConsensusServerImpl {
   }
 
   /**
-   * records the index of the log and writes locally, and then asynchronous replication is performed
+   * records the index of the log and writes locally, and then asynchronous replication is
+   * performed.
    */
   public TSStatus write(IConsensusRequest request) {
     long consensusWriteStartTime = System.currentTimeMillis();
@@ -216,7 +223,8 @@ public class IoTConsensusServerImpl {
             indexedConsensusRequest.getSearchIndex());
       }
       // TODO wal and memtable
-      TSStatus result = stateMachine.write(indexedConsensusRequest);
+      IConsensusRequest planNode = stateMachine.deserializeRequest(indexedConsensusRequest);
+      TSStatus result = stateMachine.write(planNode);
       long writeToStateMachineEndTime = System.currentTimeMillis();
       // statistic the time of writing request into stateMachine
       MetricService.getInstance()
@@ -491,7 +499,8 @@ public class IoTConsensusServerImpl {
           // after current operation
           // TODO: (xingtanzjr) design more reliable way for IoTConsensus
           logger.error(
-              "cannot notify {} to build sync log channel. Please check the status of this node manually",
+              "cannot notify {} to build sync log channel. "
+                  + "Please check the status of this node manually",
               peer,
               e);
         }
@@ -800,4 +809,111 @@ public class IoTConsensusServerImpl {
       reader.setSafelyDeletedSearchIndex(Long.MAX_VALUE);
     }
   }
+
+  public TSStatus syncLog(String sourcePeerId, IConsensusRequest request) {
+    return cacheQueueMap
+        .computeIfAbsent(sourcePeerId, SyncLogCacheQueue::new)
+        .cacheAndInsertLatestNode((DeserializedBatchIndexedConsensusRequest) request);
+  }
+
+  /**
+   * This queue is used for writes of IoTConsensus SyncLog. With it, the follower applies writes
+   * in the same order as the leader. Besides guaranteeing order, it allows the deserialization
+   * of PlanNode to run concurrently.
+   */
+  private class SyncLogCacheQueue {
+    private final String sourcePeerId;
+    private final Lock queueLock = new ReentrantLock();
+    private final Condition queueSortCondition = queueLock.newCondition();
+    private final PriorityQueue<DeserializedBatchIndexedConsensusRequest> requestCache;
+    private long nextSyncIndex = -1;
+
+    public SyncLogCacheQueue(String sourcePeerId) {
+      this.sourcePeerId = sourcePeerId;
+      this.requestCache = new PriorityQueue<>();
+    }
+
+    /**
+     * This method is used for writes of IoTConsensus SyncLog. With it, the follower applies
+     * writes in the same order as the leader. Besides guaranteeing order, it allows the
+     * deserialization of PlanNode to run concurrently.
+     */
+    private TSStatus cacheAndInsertLatestNode(DeserializedBatchIndexedConsensusRequest request) {
+      queueLock.lock();
+      try {
+        requestCache.add(request);
+        // If the peek is not held by the current thread, it should notify the corresponding
+        // thread to process the peek when the queue is full
+        if (requestCache.size() == config.getReplication().getMaxPendingBatchesNum()
+            && requestCache.peek() != null
+            && requestCache.peek().getStartSyncIndex() != request.getStartSyncIndex()) {
+          queueSortCondition.signalAll();
+        }
+        while (true) {
+          // If current InsertNode is the next target InsertNode, write it
+          if (request.getStartSyncIndex() == nextSyncIndex) {
+            requestCache.remove(request);
+            nextSyncIndex = request.getEndSyncIndex() + 1;
+            break;
+          }
+          // If no write thread hits nextSyncIndex and the heap is full, write
+          // the peek request. This keeps the overall write order correct when nextSyncIndex
+          // is not set. We don't persist the value of nextSyncIndex, to reduce complexity.
+          // There are some cases in which nextSyncIndex is not set:
+          //   1. When the system has just started
+          //   2. When some exception occurs during SyncLog
+          if (requestCache.size() == config.getReplication().getMaxPendingBatchesNum()
+              && requestCache.peek() != null
+              && requestCache.peek().getStartSyncIndex() == request.getStartSyncIndex()) {
+            requestCache.remove();
+            nextSyncIndex = request.getEndSyncIndex() + 1;
+            break;
+          }
+          try {
+            boolean timeout =
+                !queueSortCondition.await(
+                    config.getReplication().getMaxWaitingTimeForWaitBatchInMs(),
+                    TimeUnit.MILLISECONDS);
+            if (timeout) {
+              // Although the timeout is triggered, the current thread cannot write its request
+              // if it does not hold the peek request; some other thread should hold the peek
+              // request. In this scenario, the current thread should go into await again and
+              // wait until its request becomes the peek request.
+              if (requestCache.peek() != null
+                  && requestCache.peek().getStartSyncIndex() == request.getStartSyncIndex()) {
+                // The current thread holds the peek request, so it can write the peek immediately.
+                logger.info(
+                    "waiting target request timeout. current index: {}, target index: {}",
+                    request.getStartSyncIndex(),
+                    nextSyncIndex);
+                requestCache.remove(request);
+                break;
+              }
+            }
+          } catch (InterruptedException e) {
+            logger.warn(
+                "current waiting is interrupted. SyncIndex: {}. Exception: {}",
+                request.getStartSyncIndex(),
+                e);
+            Thread.currentThread().interrupt();
+          }
+        }
+        logger.debug(
+            "source = {}, region = {}, queue size {}, startSyncIndex = {}, endSyncIndex = {}",
+            sourcePeerId,
+            consensusGroupId,
+            requestCache.size(),
+            request.getStartSyncIndex(),
+            request.getEndSyncIndex());
+        List<TSStatus> subStatus = new LinkedList<>();
+        for (IConsensusRequest insertNode : request.getInsertNodes()) {
+          subStatus.add(stateMachine.write(insertNode));
+        }
+        queueSortCondition.signalAll();
+        return new TSStatus().setSubStatus(subStatus);
+      } finally {
+        queueLock.unlock();
+      }
+    }
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
index 90e5aed12c..92b2cc28cf 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
@@ -32,7 +32,9 @@ import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
 
 public class IoTConsensusClientPool {
 
-  private IoTConsensusClientPool() {}
+  private IoTConsensusClientPool() {
+    // empty constructor
+  }
 
   public static class SyncIoTConsensusServiceClientPoolFactory
       implements IClientPoolFactory<TEndPoint, SyncIoTConsensusServiceClient> {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
index 7630849928..116a5f3c96 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
@@ -126,7 +126,8 @@ public class IndexController {
         // because it won't infect the correctness
         logger.info(
             "failed to flush sync index because previous version file {} does not exists. "
-                + "It may be caused by the target Peer is removed from current group. target file is {}",
+                + "It may be caused by the target Peer is removed from current group. "
+                + "target file is {}",
             oldFile.getAbsolutePath(),
             newFile.getAbsolutePath());
       }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 93de711b76..883952275a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -234,7 +234,7 @@ public class LogDispatcher {
       return bufferedEntries.size();
     }
 
-    /** try to offer a request into queue with memory control */
+    /** try to offer a request into queue with memory control. */
     public boolean offer(IndexedConsensusRequest indexedConsensusRequest) {
       if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest.getSerializedSize(), true)) {
         return false;
@@ -254,7 +254,7 @@ public class LogDispatcher {
       return success;
     }
 
-    /** try to remove a request from queue with memory control */
+    /** try to remove a request from queue with memory control. */
     private void releaseReservedMemory(IndexedConsensusRequest indexedConsensusRequest) {
       iotConsensusMemoryManager.free(indexedConsensusRequest.getSerializedSize());
     }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
index 891961d70b..3f0c1477d0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
@@ -39,7 +39,7 @@ public class SyncStatus {
     this.config = config;
   }
 
-  /** we may block here if the synchronization pipeline is full */
+  /** we may block here if the synchronization pipeline is full. */
   public void addNextBatch(Batch batch) throws InterruptedException {
     synchronized (this) {
       while (pendingBatches.size() >= config.getReplication().getMaxPendingBatchesNum()
@@ -86,7 +86,7 @@ public class SyncStatus {
     iotConsensusMemoryManager.free(size);
   }
 
-  /** Gets the first index that is not currently synchronized */
+  /** Gets the first index that is not currently synchronized. */
   public long getNextSendingIndex() {
     // we do not use ReentrantReadWriteLock because there will be only one thread reading this field
     synchronized (this) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index 6b24f596a2..0921d0a962 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IoTConsensusRequest;
 import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
 import org.apache.iotdb.consensus.iot.IoTConsensus;
@@ -114,7 +115,10 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Asy
                             : ByteBufferConsensusRequest::new)
                     .collect(Collectors.toList())));
       }
-      TSStatus writeStatus = impl.getStateMachine().write(logEntriesInThisBatch);
+      IConsensusRequest deserializedRequest =
+          impl.getStateMachine().deserializeRequest(logEntriesInThisBatch);
+      TSStatus writeStatus =
+          impl.syncLog(logEntriesInThisBatch.getSourcePeerId(), deserializedRequest);
       logger.debug(
           "execute TSyncLogEntriesReq for {} with result {}",
           req.consensusGroupId,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index c262996089..10acca5386 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -134,7 +134,9 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
         if (!firstTry) {
           Thread.sleep(retryPolicy.getSleepTime());
         }
-        TSStatus result = applicationStateMachine.write(applicationRequest);
+        IConsensusRequest deserializedRequest =
+            applicationStateMachine.deserializeRequest(applicationRequest);
+        TSStatus result = applicationStateMachine.write(deserializedRequest);
 
         if (firstTry) {
           finalStatus = result;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleServerImpl.java
index bb3ef2f557..3f2b68857c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleServerImpl.java
@@ -67,6 +67,11 @@ public class SimpleServerImpl implements IStateMachine {
     return stateMachine.write(request);
   }
 
+  @Override
+  public IConsensusRequest deserializeRequest(IConsensusRequest request) {
+    return request;
+  }
+
   @Override
   public DataSet read(IConsensusRequest request) {
     return stateMachine.read(request);
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
index 01992f472f..4580a865c2 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
@@ -38,6 +38,11 @@ public class EmptyStateMachine implements IStateMachine, IStateMachine.EventApi
     return new TSStatus(0);
   }
 
+  @Override
+  public IConsensusRequest deserializeRequest(IConsensusRequest request) {
+    return request;
+  }
+
   @Override
   public DataSet read(IConsensusRequest IConsensusRequest) {
     return null;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java
index 91e614bb72..2aa22b1616 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
+import org.apache.iotdb.consensus.common.request.DeserializedBatchIndexedConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader;
@@ -76,12 +77,12 @@ public class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
       if (request instanceof IndexedConsensusRequest) {
         writeOneRequest((IndexedConsensusRequest) request);
         return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-      } else if (request instanceof BatchIndexedConsensusRequest) {
-        BatchIndexedConsensusRequest batchIndexedConsensusRequest =
-            (BatchIndexedConsensusRequest) request;
+      } else if (request instanceof DeserializedBatchIndexedConsensusRequest) {
+        DeserializedBatchIndexedConsensusRequest batchIndexedConsensusRequest =
+            (DeserializedBatchIndexedConsensusRequest) request;
         List<TSStatus> subStatus = new ArrayList<>();
-        for (IndexedConsensusRequest innerRequest : batchIndexedConsensusRequest.getRequests()) {
-          writeOneRequest(innerRequest);
+        for (IConsensusRequest innerRequest : batchIndexedConsensusRequest.getInsertNodes()) {
+          writeOneRequest((IndexedConsensusRequest) innerRequest);
           subStatus.add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
         }
         return new TSStatus().setSubStatus(subStatus);
@@ -92,6 +93,24 @@ public class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
     }
   }
 
+  @Override
+  public IConsensusRequest deserializeRequest(IConsensusRequest request) {
+    if (request instanceof BatchIndexedConsensusRequest) {
+      BatchIndexedConsensusRequest consensusRequest = (BatchIndexedConsensusRequest) request;
+      DeserializedBatchIndexedConsensusRequest result =
+          new DeserializedBatchIndexedConsensusRequest(
+              consensusRequest.getStartSyncIndex(),
+              consensusRequest.getEndSyncIndex(),
+              consensusRequest.getRequests().size());
+      for (IndexedConsensusRequest r : consensusRequest.getRequests()) {
+        result.add(r);
+      }
+      return result;
+    } else {
+      return request;
+    }
+  }
+
   private void writeOneRequest(IndexedConsensusRequest indexedConsensusRequest) {
     List<IConsensusRequest> transformedRequest = new ArrayList<>();
     for (IConsensusRequest innerRequest : indexedConsensusRequest.getRequests()) {
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
index 0c66c38330..03682b0312 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -89,16 +89,21 @@ public class TestUtils {
 
     @Override
     public TSStatus write(IConsensusRequest request) {
+      if (((TestRequest) request).isIncr()) {
+        integer.incrementAndGet();
+      }
+      return new TSStatus(200);
+    }
+
+    @Override
+    public IConsensusRequest deserializeRequest(IConsensusRequest request) {
       TestRequest testRequest;
       if (request instanceof ByteBufferConsensusRequest) {
         testRequest = new TestRequest(request.serializeToByteBuffer());
       } else {
         testRequest = (TestRequest) request;
       }
-      if (testRequest.isIncr()) {
-        integer.incrementAndGet();
-      }
-      return new TSStatus(200);
+      return testRequest;
     }
 
     @Override
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
index 8bea3db3d5..558c8bdf4f 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
@@ -108,6 +108,11 @@ public class SimpleConsensusTest {
       return new TSStatus();
     }
 
+    @Override
+    public IConsensusRequest deserializeRequest(IConsensusRequest request) {
+      return request;
+    }
+
     @Override
     public DataSet read(IConsensusRequest request) {
       return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 104fbc087f..905a8275c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.consensus;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.IConsensus;
@@ -31,7 +32,9 @@ import org.apache.iotdb.consensus.config.RatisConfig.Snapshot;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.statemachine.DataRegionStateMachine;
+import org.apache.iotdb.db.consensus.statemachine.IoTConsensusDataRegionStateMachine;
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
@@ -48,7 +51,9 @@ public class DataRegionConsensusImpl {
 
   private static IConsensus INSTANCE = null;
 
-  private DataRegionConsensusImpl() {}
+  private DataRegionConsensusImpl() {
+    // intentionally empty; private so the class cannot be instantiated from outside
+  }
 
   // need to create instance before calling this method
   public static IConsensus getInstance() {
@@ -171,9 +176,7 @@ public class DataRegionConsensusImpl {
                                       .build())
                               .build())
                       .build(),
-                  gid ->
-                      new DataRegionStateMachine(
-                          StorageEngine.getInstance().getDataRegion((DataRegionId) gid)))
+                  DataRegionConsensusImpl::createDataRegionStateMachine)
               .orElseThrow(
                   () ->
                       new IllegalArgumentException(
@@ -183,4 +186,13 @@ public class DataRegionConsensusImpl {
     }
     return INSTANCE;
   }
+
+  private static DataRegionStateMachine createDataRegionStateMachine(ConsensusGroupId gid) {
+    DataRegion dataRegion = StorageEngine.getInstance().getDataRegion((DataRegionId) gid);
+    if (ConsensusFactory.IOT_CONSENSUS.equals(conf.getDataRegionConsensusProtocolClass())) {
+      return new IoTConsensusDataRegionStateMachine(dataRegion);
+    } else {
+      return new DataRegionStateMachine(dataRegion);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index f36429e702..5c9b4016ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -22,11 +22,9 @@ package org.apache.iotdb.db.consensus.statemachine;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IoTConsensusRequest;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
-import org.apache.iotdb.db.wal.buffer.WALEntry;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,12 +47,15 @@ public abstract class BaseStateMachine
     return instance;
   }
 
+  @Override
+  public IConsensusRequest deserializeRequest(IConsensusRequest request) {
+    return getPlanNode(request);
+  }
+
   protected PlanNode getPlanNode(IConsensusRequest request) {
     PlanNode node;
     if (request instanceof ByteBufferConsensusRequest) {
       node = PlanNodeType.deserialize(request.serializeToByteBuffer());
-    } else if (request instanceof IoTConsensusRequest) {
-      node = WALEntry.deserializeForConsensus(request.serializeToByteBuffer());
     } else if (request instanceof PlanNode) {
       node = (PlanNode) request;
     } else {
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index c2c8bd0b7e..f7a47cbc37 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.iot.wal.GetConsensusReqReaderPlan;
@@ -55,14 +54,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.PriorityQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
 public class DataRegionStateMachine extends BaseStateMachine {
@@ -72,24 +64,21 @@ public class DataRegionStateMachine extends BaseStateMachine {
   private static final FragmentInstanceManager QUERY_INSTANCE_MANAGER =
       FragmentInstanceManager.getInstance();
 
-  private DataRegion region;
-
-  private static final int MAX_REQUEST_CACHE_SIZE = 5;
-  private static final long CACHE_WINDOW_TIME_IN_MS =
-      IoTDBDescriptor.getInstance().getConfig().getCacheWindowTimeInMs();
-
-  private ConcurrentHashMap<String, SyncLogCacheQueue> cacheQueueMap;
+  protected DataRegion region;
 
   public DataRegionStateMachine(DataRegion region) {
     this.region = region;
-    this.cacheQueueMap = new ConcurrentHashMap<>();
   }
 
   @Override
-  public void start() {}
+  public void start() {
+    // do nothing
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    // do nothing
+  }
 
   @Override
   public boolean isReadOnly() {
@@ -151,146 +140,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
     }
   }
 
-  /**
-   * This method is used for write of IoTConsensus SyncLog. By this method, we can keep write order
-   * in follower the same as the leader. And besides order insurance, we can make the
-   * deserialization of PlanNode to be concurrent
-   */
-  private class SyncLogCacheQueue {
-    private final String sourcePeerId;
-    private final Lock queueLock = new ReentrantLock();
-    private final Condition queueSortCondition = queueLock.newCondition();
-    private final PriorityQueue<InsertNodeWrapper> requestCache;
-    private long nextSyncIndex = -1;
-
-    public SyncLogCacheQueue(String sourcePeerId, int queueSize, long timeout) {
-      this.sourcePeerId = sourcePeerId;
-      this.requestCache = new PriorityQueue<>();
-    }
-
-    /**
-     * This method is used for write of IoTConsensus SyncLog. By this method, we can keep write
-     * order in follower the same as the leader. And besides order insurance, we can make the
-     * deserialization of PlanNode to be concurrent
-     */
-    private TSStatus cacheAndInsertLatestNode(InsertNodeWrapper insertNodeWrapper) {
-      queueLock.lock();
-      try {
-        requestCache.add(insertNodeWrapper);
-        // If the peek is not hold by current thread, it should notify the corresponding thread to
-        // process the peek when the queue is full
-        if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
-            && requestCache.peek().getStartSyncIndex() != insertNodeWrapper.getStartSyncIndex()) {
-          queueSortCondition.signalAll();
-        }
-        while (true) {
-          // If current InsertNode is the next target InsertNode, write it
-          if (insertNodeWrapper.getStartSyncIndex() == nextSyncIndex) {
-            requestCache.remove(insertNodeWrapper);
-            nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1;
-            break;
-          }
-          // If all write thread doesn't hit nextSyncIndex and the heap is full, write
-          // the peek request. This is used to keep the whole write correct when nextSyncIndex
-          // is not set. We won't persist the value of nextSyncIndex to reduce the complexity.
-          // There are some cases that nextSyncIndex is not set:
-          //   1. When the system was just started
-          //   2. When some exception occurs during SyncLog
-          if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
-              && requestCache.peek().getStartSyncIndex() == insertNodeWrapper.getStartSyncIndex()) {
-            requestCache.remove();
-            nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1;
-            break;
-          }
-          try {
-            boolean timeout =
-                !queueSortCondition.await(CACHE_WINDOW_TIME_IN_MS, TimeUnit.MILLISECONDS);
-            if (timeout) {
-              // although the timeout is triggered, current thread cannot write its request
-              // if current thread does not hold the peek request. And there should be some
-              // other thread who hold the peek request. In this scenario, current thread
-              // should go into await again and wait until its request becoming peek request
-              if (requestCache.peek().getStartSyncIndex()
-                  == insertNodeWrapper.getStartSyncIndex()) {
-                // current thread hold the peek request thus it can write the peek immediately.
-                logger.info(
-                    "waiting target request timeout. current index: {}, target index: {}",
-                    insertNodeWrapper.getStartSyncIndex(),
-                    nextSyncIndex);
-                requestCache.remove(insertNodeWrapper);
-                break;
-              }
-            }
-          } catch (InterruptedException e) {
-            logger.warn(
-                "current waiting is interrupted. SyncIndex: {}. Exception: {}",
-                insertNodeWrapper.getStartSyncIndex(),
-                e);
-            Thread.currentThread().interrupt();
-          }
-        }
-        logger.debug(
-            "source = {}, region = {}, queue size {}, startSyncIndex = {}, endSyncIndex = {}",
-            sourcePeerId,
-            region.getDataRegionId(),
-            requestCache.size(),
-            insertNodeWrapper.getStartSyncIndex(),
-            insertNodeWrapper.getEndSyncIndex());
-        List<TSStatus> subStatus = new LinkedList<>();
-        for (PlanNode planNode : insertNodeWrapper.getInsertNodes()) {
-          subStatus.add(write(planNode));
-        }
-        queueSortCondition.signalAll();
-        return new TSStatus().setSubStatus(subStatus);
-      } finally {
-        queueLock.unlock();
-      }
-    }
-  }
-
-  private static class InsertNodeWrapper implements Comparable<InsertNodeWrapper> {
-    private final long startSyncIndex;
-    private final long endSyncIndex;
-    private final List<PlanNode> insertNodes;
-
-    public InsertNodeWrapper(long startSyncIndex, long endSyncIndex) {
-      this.startSyncIndex = startSyncIndex;
-      this.endSyncIndex = endSyncIndex;
-      this.insertNodes = new LinkedList<>();
-    }
-
-    @Override
-    public int compareTo(InsertNodeWrapper o) {
-      return Long.compare(startSyncIndex, o.startSyncIndex);
-    }
-
-    public void add(PlanNode insertNode) {
-      this.insertNodes.add(insertNode);
-    }
-
-    public long getStartSyncIndex() {
-      return startSyncIndex;
-    }
-
-    public long getEndSyncIndex() {
-      return endSyncIndex;
-    }
-
-    public List<PlanNode> getInsertNodes() {
-      return insertNodes;
-    }
-  }
-
-  private InsertNodeWrapper deserializeAndWrap(BatchIndexedConsensusRequest batchRequest) {
-    InsertNodeWrapper insertNodeWrapper =
-        new InsertNodeWrapper(batchRequest.getStartSyncIndex(), batchRequest.getEndSyncIndex());
-    for (IndexedConsensusRequest indexedRequest : batchRequest.getRequests()) {
-      insertNodeWrapper.add(grabInsertNode(indexedRequest));
-    }
-    return insertNodeWrapper;
-  }
-
-  private PlanNode grabInsertNode(IndexedConsensusRequest indexedRequest) {
+  protected PlanNode grabInsertNode(IndexedConsensusRequest indexedRequest) {
     List<InsertNode> insertNodes = new ArrayList<>(indexedRequest.getRequests().size());
     for (IConsensusRequest req : indexedRequest.getRequests()) {
       // PlanNode in IndexedConsensusRequest should always be InsertNode
@@ -305,63 +155,22 @@ public class DataRegionStateMachine extends BaseStateMachine {
         return planNode;
       } else {
         throw new IllegalArgumentException(
-            "PlanNodes in IndexedConsensusRequest are not InsertNode and the size of requests are larger than 1");
+            "PlanNodes in IndexedConsensusRequest are not InsertNode and "
+                + "the size of requests are larger than 1");
       }
     }
     return mergeInsertNodes(insertNodes);
   }
 
-  @Override
-  public List<Path> getSnapshotFiles(File latestSnapshotRootDir) {
-    try {
-      return new SnapshotLoader(
-              latestSnapshotRootDir.getAbsolutePath(),
-              region.getDatabaseName(),
-              region.getDataRegionId())
-          .getSnapshotFileInfo().stream().map(File::toPath).collect(Collectors.toList());
-    } catch (IOException e) {
-      logger.error(
-          "Meets error when getting snapshot files for {}-{}",
-          region.getDatabaseName(),
-          region.getDataRegionId(),
-          e);
-      return null;
-    }
-  }
-
-  @Override
-  public TSStatus write(IConsensusRequest request) {
-    PlanNode planNode;
-    try {
-      if (request instanceof IndexedConsensusRequest) {
-        IndexedConsensusRequest indexedRequest = (IndexedConsensusRequest) request;
-        planNode = grabInsertNode(indexedRequest);
-      } else if (request instanceof BatchIndexedConsensusRequest) {
-        InsertNodeWrapper insertNodeWrapper =
-            deserializeAndWrap((BatchIndexedConsensusRequest) request);
-        String sourcePeerId = ((BatchIndexedConsensusRequest) request).getSourcePeerId();
-        return cacheQueueMap
-            .computeIfAbsent(
-                sourcePeerId,
-                k -> new SyncLogCacheQueue(k, MAX_REQUEST_CACHE_SIZE, CACHE_WINDOW_TIME_IN_MS))
-            .cacheAndInsertLatestNode(insertNodeWrapper);
-      } else {
-        planNode = getPlanNode(request);
-      }
-      return write(planNode);
-    } catch (IllegalArgumentException e) {
-      logger.error(e.getMessage(), e);
-      return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
-    }
-  }
-
   /**
    * Merge insert nodes sharing same search index ( e.g. tablet-100, tablet-100, tablet-100 will be
    * merged to one multi-tablet). <br>
    * Notice: the continuity of insert nodes sharing same search index should be protected by the
    * upper layer.
+   *
+   * @throws RuntimeException if insertNodes is empty
    */
-  private InsertNode mergeInsertNodes(List<InsertNode> insertNodes) {
+  protected InsertNode mergeInsertNodes(List<InsertNode> insertNodes) {
     int size = insertNodes.size();
     if (size == 0) {
       throw new RuntimeException();
@@ -407,6 +216,34 @@ public class DataRegionStateMachine extends BaseStateMachine {
     return result;
   }
 
+  @Override
+  public List<Path> getSnapshotFiles(File latestSnapshotRootDir) {
+    try {
+      return new SnapshotLoader(
+              latestSnapshotRootDir.getAbsolutePath(),
+              region.getDatabaseName(),
+              region.getDataRegionId())
+          .getSnapshotFileInfo().stream().map(File::toPath).collect(Collectors.toList());
+    } catch (IOException e) {
+      logger.error(
+          "Meets error when getting snapshot files for {}-{}",
+          region.getDatabaseName(),
+          region.getDataRegionId(),
+          e);
+      return null;
+    }
+  }
+
+  @Override
+  public TSStatus write(IConsensusRequest request) {
+    try {
+      return write((PlanNode) request);
+    } catch (IllegalArgumentException e) {
+      logger.error(e.getMessage(), e);
+      return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+    }
+  }
+
   protected TSStatus write(PlanNode planNode) {
     return planNode.accept(new DataExecutionVisitor(), region);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
new file mode 100644
index 0000000000..26d52eed22
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.consensus.statemachine;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.DeserializedBatchIndexedConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IoTConsensusRequest;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.wal.buffer.WALEntry;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class IoTConsensusDataRegionStateMachine extends DataRegionStateMachine {
+
+  public Logger logger = LoggerFactory.getLogger(IoTConsensusDataRegionStateMachine.class);
+
+  public IoTConsensusDataRegionStateMachine(DataRegion region) {
+    super(region);
+  }
+
+  @Override
+  public TSStatus write(IConsensusRequest request) {
+    try {
+      if (request instanceof DeserializedBatchIndexedConsensusRequest) {
+        List<TSStatus> subStatus = new LinkedList<>();
+        for (IConsensusRequest consensusRequest :
+            ((DeserializedBatchIndexedConsensusRequest) request).getInsertNodes()) {
+          subStatus.add(write((PlanNode) consensusRequest));
+        }
+        return new TSStatus().setSubStatus(subStatus);
+      } else {
+        return write((PlanNode) request);
+      }
+    } catch (IllegalArgumentException e) {
+      logger.error(e.getMessage(), e);
+      return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+    }
+  }
+
+  /**
+   * Deserialize the request into a PlanNode or a DeserializedBatchIndexedConsensusRequest.
+   *
+   * @param request write request; a BatchIndexedConsensusRequest yields the batch form
+   */
+  @Override
+  public IConsensusRequest deserializeRequest(IConsensusRequest request) {
+    IConsensusRequest result;
+    if (request instanceof IndexedConsensusRequest) {
+      IndexedConsensusRequest indexedRequest = (IndexedConsensusRequest) request;
+      result = grabInsertNode(indexedRequest);
+    } else if (request instanceof BatchIndexedConsensusRequest) {
+      BatchIndexedConsensusRequest batchRequest = (BatchIndexedConsensusRequest) request;
+      DeserializedBatchIndexedConsensusRequest deserializedRequest =
+          new DeserializedBatchIndexedConsensusRequest(
+              batchRequest.getStartSyncIndex(),
+              batchRequest.getEndSyncIndex(),
+              batchRequest.getRequests().size());
+      for (IndexedConsensusRequest indexedRequest : batchRequest.getRequests()) {
+        deserializedRequest.add(grabInsertNode(indexedRequest));
+      }
+      result = deserializedRequest;
+    } else {
+      result = getPlanNode(request);
+    }
+    return result;
+  }
+
+  @Override
+  protected PlanNode getPlanNode(IConsensusRequest request) {
+    PlanNode node;
+    if (request instanceof ByteBufferConsensusRequest) {
+      node = PlanNodeType.deserialize(request.serializeToByteBuffer());
+    } else if (request instanceof IoTConsensusRequest) {
+      node = WALEntry.deserializeForConsensus(request.serializeToByteBuffer());
+    } else if (request instanceof PlanNode) {
+      node = (PlanNode) request;
+    } else {
+      logger.error("Unexpected IConsensusRequest : {}", request);
+      throw new IllegalArgumentException("Unexpected IConsensusRequest!");
+    }
+    return node;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index c324b35d07..9b20fbb13f 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.metadata.visitor.SchemaExecutionVisitor;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -47,10 +48,14 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
   }
 
   @Override
-  public void start() {}
+  public void start() {
+    // do nothing
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    // do nothing
+  }
 
   @Override
   public boolean isReadOnly() {
@@ -70,7 +75,7 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
   @Override
   public TSStatus write(IConsensusRequest request) {
     try {
-      return getPlanNode(request).accept(new SchemaExecutionVisitor(), schemaRegion);
+      return ((PlanNode) request).accept(new SchemaExecutionVisitor(), schemaRegion);
     } catch (IllegalArgumentException e) {
       logger.error(e.getMessage(), e);
       return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());