You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/08 08:08:58 UTC

[iotdb] branch ml_0808_test_exp1_parallel created (now 1d63c3e688)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch ml_0808_test_exp1_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 1d63c3e688 change to parallel write in follower side

This branch includes the following new commits:

     new 1d63c3e688 change to parallel write in follower side

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: change to parallel write in follower side

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_0808_test_exp1_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1d63c3e688feafd2000d52c102f97e44a3a6bc63
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Aug 8 16:08:45 2022 +0800

    change to parallel write in follower side
---
 .../org/apache/iotdb/consensus/IStateMachine.java  |   7 --
 .../iotdb/consensus/config/MultiLeaderConfig.java  |   2 +-
 .../service/MultiLeaderRPCServiceProcessor.java    |  24 ++---
 .../consensus/standalone/StandAloneServerImpl.java |  11 --
 .../apache/iotdb/consensus/EmptyStateMachine.java  |   9 --
 .../multileader/util/TestStateMachine.java         |   7 --
 .../apache/iotdb/consensus/ratis/TestUtils.java    |   7 --
 .../standalone/StandAloneConsensusTest.java        |   8 --
 .../consensus/statemachine/BaseStateMachine.java   |  12 ---
 .../statemachine/DataRegionStateMachine.java       | 119 +++++++++------------
 .../plan/planner/plan/node/write/InsertNode.java   |  10 ++
 11 files changed, 74 insertions(+), 142 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index 3b5e6b1796..203b7d2a6b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -25,10 +25,6 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
-
-import org.apache.thrift.async.AsyncMethodCallback;
 
 import javax.annotation.concurrent.ThreadSafe;
 
@@ -45,9 +41,6 @@ public interface IStateMachine {
 
   void stop();
 
-  void multiLeaderWriteAsync(
-      List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler);
-
   /**
    * apply a write-request from user
    *
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index 4a95a85661..1a9b4dac9f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -249,7 +249,7 @@ public class MultiLeaderConfig {
     public static class Builder {
       private int maxPendingRequestNumPerNode = 200;
       private int maxRequestPerBatch = 40;
-      private int maxPendingBatch = 1;
+      private int maxPendingBatch = 5;
       private int maxWaitingTimeForAccumulatingBatchInMs = 500;
       private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
       private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index ad1304a608..2e00442e37 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.StepTracker;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
 import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
 import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
@@ -40,7 +39,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
 
 public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIService.AsyncIface {
@@ -71,7 +69,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
         resultHandler.onComplete(new TSyncLogRes(Collections.singletonList(status)));
         return;
       }
-      List<IndexedConsensusRequest> indexedConsensusRequests = new LinkedList<>();
+      List<TSStatus> statuses = new ArrayList<>();
       // We use synchronized to ensure atomicity of executing multiple logs
       if (!req.getBatches().isEmpty()) {
         List<IConsensusRequest> consensusRequests = new ArrayList<>();
@@ -83,9 +81,11 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
                   : new ByteBufferConsensusRequest(batch.data);
           // merge TLogBatch with same search index into one request
           if (batch.getSearchIndex() != currentSearchIndex) {
-            indexedConsensusRequests.add(
-                impl.buildIndexedConsensusRequestForRemoteRequest(
-                    currentSearchIndex, consensusRequests));
+            statuses.add(
+                impl.getStateMachine()
+                    .write(
+                        impl.buildIndexedConsensusRequestForRemoteRequest(
+                            currentSearchIndex, consensusRequests)));
             consensusRequests = new ArrayList<>();
             currentSearchIndex = batch.getSearchIndex();
           }
@@ -93,14 +93,12 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
         }
         // write last request
         if (!consensusRequests.isEmpty()) {
-          indexedConsensusRequests.add(
-              impl.buildIndexedConsensusRequestForRemoteRequest(
-                  currentSearchIndex, consensusRequests));
+          statuses.add(
+              impl.getStateMachine()
+                  .write(
+                      impl.buildIndexedConsensusRequestForRemoteRequest(
+                          currentSearchIndex, consensusRequests)));
         }
-        long followerWriteRequestStartTime = System.nanoTime();
-        impl.getStateMachine().multiLeaderWriteAsync(indexedConsensusRequests, resultHandler);
-        StepTracker.trace(
-            "followerWriteRequest", 25, followerWriteRequestStartTime, System.nanoTime());
       }
     } catch (Exception e) {
       resultHandler.onError(e);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
index cd57b252ec..8438a5fbe4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
@@ -24,13 +24,8 @@ import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
-
-import org.apache.thrift.async.AsyncMethodCallback;
 
 import java.io.File;
-import java.util.List;
 
 public class StandAloneServerImpl implements IStateMachine {
 
@@ -62,12 +57,6 @@ public class StandAloneServerImpl implements IStateMachine {
     stateMachine.stop();
   }
 
-  @Override
-  public void multiLeaderWriteAsync(
-      List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {
-    throw new UnsupportedOperationException();
-  }
-
   @Override
   public TSStatus write(IConsensusRequest request) {
     return stateMachine.write(request);
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
index 86aeddd19c..01992f472f 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
@@ -22,13 +22,8 @@ package org.apache.iotdb.consensus;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
-
-import org.apache.thrift.async.AsyncMethodCallback;
 
 import java.io.File;
-import java.util.List;
 
 public class EmptyStateMachine implements IStateMachine, IStateMachine.EventApi {
 
@@ -38,10 +33,6 @@ public class EmptyStateMachine implements IStateMachine, IStateMachine.EventApi
   @Override
   public void stop() {}
 
-  @Override
-  public void multiLeaderWriteAsync(
-      List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {}
-
   @Override
   public TSStatus write(IConsensusRequest IConsensusRequest) {
     return new TSStatus(0);
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
index dc7b31f831..eab940cf87 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
@@ -25,14 +25,11 @@ import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
 import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
 import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.apache.thrift.async.AsyncMethodCallback;
-
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -68,10 +65,6 @@ public class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
   @Override
   public void stop() {}
 
-  @Override
-  public void multiLeaderWriteAsync(
-      List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {}
-
   @Override
   public TSStatus write(IConsensusRequest request) {
     synchronized (requestSets) {
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
index 72e741e6c9..d383fe902b 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -27,10 +27,7 @@ import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
 
-import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -89,10 +86,6 @@ public class TestUtils {
     @Override
     public void stop() {}
 
-    @Override
-    public void multiLeaderWriteAsync(
-        List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {}
-
     @Override
     public TSStatus write(IConsensusRequest request) {
       TestRequest testRequest;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index b9438177f5..9e0e13be6d 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -33,7 +33,6 @@ import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
@@ -41,10 +40,8 @@ import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
 import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
 
 import org.apache.ratis.util.FileUtils;
-import org.apache.thrift.async.AsyncMethodCallback;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -53,7 +50,6 @@ import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -101,10 +97,6 @@ public class StandAloneConsensusTest {
     @Override
     public void stop() {}
 
-    @Override
-    public void multiLeaderWriteAsync(
-        List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {}
-
     @Override
     public TSStatus write(IConsensusRequest request) {
       if (request instanceof ByteBufferConsensusRequest) {
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index 27d696129f..3bd74bd91a 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -23,31 +23,19 @@ import org.apache.iotdb.commons.StepTracker;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.wal.buffer.WALEntry;
 
-import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.ws.rs.NotSupportedException;
-
-import java.util.List;
-
 public abstract class BaseStateMachine implements IStateMachine, IStateMachine.EventApi {
 
   private static final Logger logger = LoggerFactory.getLogger(BaseStateMachine.class);
 
-  public void multiLeaderWriteAsync(
-      List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {
-    throw new NotSupportedException();
-  }
-
   protected FragmentInstance getFragmentInstance(IConsensusRequest request) {
     FragmentInstance instance;
     if (request instanceof ByteBufferConsensusRequest) {
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 4744163a83..4f53926670 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
+import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
 import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
 import org.apache.iotdb.db.consensus.statemachine.visitor.DataExecutionVisitor;
 import org.apache.iotdb.db.engine.StorageEngineV2;
@@ -44,14 +44,13 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDevic
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.apache.thrift.async.AsyncMethodCallback;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.ArrayList;
-import java.util.LinkedList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.PriorityQueue;
 
@@ -64,13 +63,15 @@ public class DataRegionStateMachine extends BaseStateMachine {
 
   private DataRegion region;
 
-  private static final int MAX_REQUEST_CACHE_SIZE = 1;
+  private static final int MAX_REQUEST_CACHE_SIZE = 5;
   private static final long CACHE_WINDOW_TIME_IN_MS = 10_000;
-  private final PriorityQueue<InsertNodeWrapper> requestCache;
+
+  private final PriorityQueue<InsertNode> requestCache;
+  private long nextSyncIndex = -1;
 
   public DataRegionStateMachine(DataRegion region) {
     this.region = region;
-    this.requestCache = new PriorityQueue<>();
+    this.requestCache = new PriorityQueue<>(Comparator.comparingLong(InsertNode::getSyncIndex));
   }
 
   @Override
@@ -115,31 +116,49 @@ public class DataRegionStateMachine extends BaseStateMachine {
     }
   }
 
-  private InsertNodeWrapper cacheAndGetLatestInsertNode(
-      long syncIndex,
-      List<InsertNode> insertNodes,
-      AsyncMethodCallback<TSyncLogRes> resultHandler) {
+  private TSStatus cacheAndInsertLatestNode(long syncIndex, InsertNode insertNode) {
+    long cacheRequestStartTime = System.nanoTime();
+    insertNode.setSyncIndex(syncIndex);
     synchronized (requestCache) {
-      requestCache.add(new InsertNodeWrapper(syncIndex, insertNodes, resultHandler));
+      requestCache.add(insertNode);
+      // If the peek is not hold by current thread, it should notify the corresponding thread to
+      // process the peek when the queue is full
       if (requestCache.size() == MAX_REQUEST_CACHE_SIZE) {
-        return requestCache.poll();
+        requestCache.notifyAll();
       }
-      return null;
+      while (true) {
+        if (insertNode.getSyncIndex() == nextSyncIndex) {
+          requestCache.remove(insertNode);
+          nextSyncIndex++;
+          break;
+        }
+        if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
+            && requestCache.peek().getSyncIndex() == insertNode.getSyncIndex()) {
+          requestCache.remove();
+          nextSyncIndex = insertNode.getSyncIndex() + 1;
+          break;
+        }
+        try {
+          requestCache.wait(CACHE_WINDOW_TIME_IN_MS);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+      StepTracker.trace("cacheAndQueueRequest", cacheRequestStartTime, System.nanoTime());
+      logger.info("queue size {}, syncIndex = {}", requestCache.size(), insertNode.getSyncIndex());
+      TSStatus tsStatus = write(insertNode);
+      requestCache.notifyAll();
+      return tsStatus;
     }
   }
 
   private static class InsertNodeWrapper implements Comparable<InsertNodeWrapper> {
     private final long syncIndex;
-    private final List<InsertNode> insertNodes;
-    private final AsyncMethodCallback<TSyncLogRes> resultHandler;
+    private final InsertNode insertNode;
 
-    public InsertNodeWrapper(
-        long syncIndex,
-        List<InsertNode> insertNode,
-        AsyncMethodCallback<TSyncLogRes> resultHandler) {
+    public InsertNodeWrapper(long syncIndex, InsertNode insertNode) {
       this.syncIndex = syncIndex;
-      this.insertNodes = insertNode;
-      this.resultHandler = resultHandler;
+      this.insertNode = insertNode;
     }
 
     @Override
@@ -151,51 +170,8 @@ public class DataRegionStateMachine extends BaseStateMachine {
       return syncIndex;
     }
 
-    public List<InsertNode> getInsertNodes() {
-      return insertNodes;
-    }
-
-    public AsyncMethodCallback<TSyncLogRes> getResultHandler() {
-      return resultHandler;
-    }
-  }
-
-  public void multiLeaderWriteAsync(
-      List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {
-    long prepareStartTime = System.nanoTime();
-    List<TSStatus> statuses = new LinkedList<>();
-    try {
-      List<InsertNode> insertNodesInAllRequests = new LinkedList<>();
-      for (IndexedConsensusRequest indexedRequest : requests) {
-        List<InsertNode> insertNodesInOneRequest =
-            new ArrayList<>(indexedRequest.getRequests().size());
-        for (IConsensusRequest req : indexedRequest.getRequests()) {
-          // PlanNode in IndexedConsensusRequest should always be InsertNode
-          InsertNode innerNode = (InsertNode) getPlanNode(req);
-          innerNode.setSearchIndex(indexedRequest.getSearchIndex());
-          insertNodesInOneRequest.add(innerNode);
-        }
-        insertNodesInAllRequests.add(mergeInsertNodes(insertNodesInOneRequest));
-      }
-      long startTime = System.nanoTime();
-      InsertNodeWrapper insertNodeWrapper =
-          cacheAndGetLatestInsertNode(
-              requests.get(0).getSyncIndex(), insertNodesInAllRequests, resultHandler);
-      StepTracker.trace("cacheAndGet", 25, startTime, System.nanoTime());
-      StepTracker.trace("followerWritePrepare", 25, prepareStartTime, System.nanoTime());
-      long writeStartTime = System.nanoTime();
-      if (insertNodeWrapper != null) {
-        for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) {
-          statuses.add(write(insertNode));
-        }
-        insertNodeWrapper.resultHandler.onComplete(new TSyncLogRes(statuses));
-      } else {
-        logger.error("insertNodeWrapper is null");
-      }
-      StepTracker.trace("followerWriteInsert", 25, writeStartTime, System.nanoTime());
-    } catch (IllegalArgumentException e) {
-      logger.error(e.getMessage(), e);
-      resultHandler.onError(e);
+    public InsertNode getInsertNode() {
+      return insertNode;
     }
   }
 
@@ -212,7 +188,16 @@ public class DataRegionStateMachine extends BaseStateMachine {
           innerNode.setSearchIndex(indexedRequest.getSearchIndex());
           insertNodes.add(innerNode);
         }
-        planNode = mergeInsertNodes(insertNodes);
+        if (indexedRequest.getSearchIndex() == ConsensusReqReader.DEFAULT_SEARCH_INDEX) {
+
+          TSStatus status =
+              cacheAndInsertLatestNode(
+                  indexedRequest.getSyncIndex(), mergeInsertNodes(insertNodes));
+
+          return status;
+        } else {
+          planNode = mergeInsertNodes(insertNodes);
+        }
       } else {
         planNode = getPlanNode(request);
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index f5b5bddb52..787dfbf9ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -77,6 +77,8 @@ public abstract class InsertNode extends WritePlanNode {
    */
   protected long searchIndex = NO_CONSENSUS_INDEX;
 
+  protected long syncIndex = NO_CONSENSUS_INDEX;
+
   /** Physical address of data region after splitting */
   protected TRegionReplicaSet dataRegionReplicaSet;
 
@@ -153,6 +155,14 @@ public abstract class InsertNode extends WritePlanNode {
     return searchIndex;
   }
 
+  public void setSyncIndex(long syncIndex) {
+    this.syncIndex = syncIndex;
+  }
+
+  public long getSyncIndex() {
+    return syncIndex;
+  }
+
   /** Search index should start from 1 */
   public void setSearchIndex(long searchIndex) {
     this.searchIndex = searchIndex;