You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/08 08:08:59 UTC

[iotdb] 01/01: change to parallel write in follower side

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_0808_test_exp1_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1d63c3e688feafd2000d52c102f97e44a3a6bc63
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Aug 8 16:08:45 2022 +0800

    change to parallel write in follower side
---
 .../org/apache/iotdb/consensus/IStateMachine.java  |   7 --
 .../iotdb/consensus/config/MultiLeaderConfig.java  |   2 +-
 .../service/MultiLeaderRPCServiceProcessor.java    |  24 ++---
 .../consensus/standalone/StandAloneServerImpl.java |  11 --
 .../apache/iotdb/consensus/EmptyStateMachine.java  |   9 --
 .../multileader/util/TestStateMachine.java         |   7 --
 .../apache/iotdb/consensus/ratis/TestUtils.java    |   7 --
 .../standalone/StandAloneConsensusTest.java        |   8 --
 .../consensus/statemachine/BaseStateMachine.java   |  12 ---
 .../statemachine/DataRegionStateMachine.java       | 119 +++++++++------------
 .../plan/planner/plan/node/write/InsertNode.java   |  10 ++
 11 files changed, 74 insertions(+), 142 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index 3b5e6b1796..203b7d2a6b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -25,10 +25,6 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
-
-import org.apache.thrift.async.AsyncMethodCallback;
 
 import javax.annotation.concurrent.ThreadSafe;
 
@@ -45,9 +41,6 @@ public interface IStateMachine {
 
   void stop();
 
-  void multiLeaderWriteAsync(
-      List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler);
-
   /**
    * apply a write-request from user
    *
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index 4a95a85661..1a9b4dac9f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -249,7 +249,7 @@ public class MultiLeaderConfig {
     public static class Builder {
       private int maxPendingRequestNumPerNode = 200;
       private int maxRequestPerBatch = 40;
-      private int maxPendingBatch = 1;
+      private int maxPendingBatch = 5;
       private int maxWaitingTimeForAccumulatingBatchInMs = 500;
       private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
       private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index ad1304a608..2e00442e37 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.StepTracker;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
 import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
 import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
@@ -40,7 +39,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
 
 public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIService.AsyncIface {
@@ -71,7 +69,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
         resultHandler.onComplete(new TSyncLogRes(Collections.singletonList(status)));
         return;
       }
-      List<IndexedConsensusRequest> indexedConsensusRequests = new LinkedList<>();
+      List<TSStatus> statuses = new ArrayList<>();
       // We use synchronized to ensure atomicity of executing multiple logs
       if (!req.getBatches().isEmpty()) {
         List<IConsensusRequest> consensusRequests = new ArrayList<>();
@@ -83,9 +81,11 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
                   : new ByteBufferConsensusRequest(batch.data);
           // merge TLogBatch with same search index into one request
           if (batch.getSearchIndex() != currentSearchIndex) {
-            indexedConsensusRequests.add(
-                impl.buildIndexedConsensusRequestForRemoteRequest(
-                    currentSearchIndex, consensusRequests));
+            statuses.add(
+                impl.getStateMachine()
+                    .write(
+                        impl.buildIndexedConsensusRequestForRemoteRequest(
+                            currentSearchIndex, consensusRequests)));
             consensusRequests = new ArrayList<>();
             currentSearchIndex = batch.getSearchIndex();
           }
@@ -93,14 +93,12 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
         }
         // write last request
         if (!consensusRequests.isEmpty()) {
-          indexedConsensusRequests.add(
-              impl.buildIndexedConsensusRequestForRemoteRequest(
-                  currentSearchIndex, consensusRequests));
+          statuses.add(
+              impl.getStateMachine()
+                  .write(
+                      impl.buildIndexedConsensusRequestForRemoteRequest(
+                          currentSearchIndex, consensusRequests)));
         }
-        long followerWriteRequestStartTime = System.nanoTime();
-        impl.getStateMachine().multiLeaderWriteAsync(indexedConsensusRequests, resultHandler);
-        StepTracker.trace(
-            "followerWriteRequest", 25, followerWriteRequestStartTime, System.nanoTime());
       }
     } catch (Exception e) {
       resultHandler.onError(e);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
index cd57b252ec..8438a5fbe4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
@@ -24,13 +24,8 @@ import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
-
-import org.apache.thrift.async.AsyncMethodCallback;
 
 import java.io.File;
-import java.util.List;
 
 public class StandAloneServerImpl implements IStateMachine {
 
@@ -62,12 +57,6 @@ public class StandAloneServerImpl implements IStateMachine {
     stateMachine.stop();
   }
 
-  @Override
-  public void multiLeaderWriteAsync(
-      List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {
-    throw new UnsupportedOperationException();
-  }
-
   @Override
   public TSStatus write(IConsensusRequest request) {
     return stateMachine.write(request);
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
index 86aeddd19c..01992f472f 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
@@ -22,13 +22,8 @@ package org.apache.iotdb.consensus;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
-
-import org.apache.thrift.async.AsyncMethodCallback;
 
 import java.io.File;
-import java.util.List;
 
 public class EmptyStateMachine implements IStateMachine, IStateMachine.EventApi {
 
@@ -38,10 +33,6 @@ public class EmptyStateMachine implements IStateMachine, IStateMachine.EventApi
   @Override
   public void stop() {}
 
-  @Override
-  public void multiLeaderWriteAsync(
-      List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {}
-
   @Override
   public TSStatus write(IConsensusRequest IConsensusRequest) {
     return new TSStatus(0);
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
index dc7b31f831..eab940cf87 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
@@ -25,14 +25,11 @@ import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
 import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
 import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.apache.thrift.async.AsyncMethodCallback;
-
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -68,10 +65,6 @@ public class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
   @Override
   public void stop() {}
 
-  @Override
-  public void multiLeaderWriteAsync(
-      List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {}
-
   @Override
   public TSStatus write(IConsensusRequest request) {
     synchronized (requestSets) {
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
index 72e741e6c9..d383fe902b 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -27,10 +27,7 @@ import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
 
-import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -89,10 +86,6 @@ public class TestUtils {
     @Override
     public void stop() {}
 
-    @Override
-    public void multiLeaderWriteAsync(
-        List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {}
-
     @Override
     public TSStatus write(IConsensusRequest request) {
       TestRequest testRequest;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index b9438177f5..9e0e13be6d 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -33,7 +33,6 @@ import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
@@ -41,10 +40,8 @@ import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
 import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
 
 import org.apache.ratis.util.FileUtils;
-import org.apache.thrift.async.AsyncMethodCallback;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -53,7 +50,6 @@ import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -101,10 +97,6 @@ public class StandAloneConsensusTest {
     @Override
     public void stop() {}
 
-    @Override
-    public void multiLeaderWriteAsync(
-        List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {}
-
     @Override
     public TSStatus write(IConsensusRequest request) {
       if (request instanceof ByteBufferConsensusRequest) {
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index 27d696129f..3bd74bd91a 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -23,31 +23,19 @@ import org.apache.iotdb.commons.StepTracker;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.wal.buffer.WALEntry;
 
-import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.ws.rs.NotSupportedException;
-
-import java.util.List;
-
 public abstract class BaseStateMachine implements IStateMachine, IStateMachine.EventApi {
 
   private static final Logger logger = LoggerFactory.getLogger(BaseStateMachine.class);
 
-  public void multiLeaderWriteAsync(
-      List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {
-    throw new NotSupportedException();
-  }
-
   protected FragmentInstance getFragmentInstance(IConsensusRequest request) {
     FragmentInstance instance;
     if (request instanceof ByteBufferConsensusRequest) {
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 4744163a83..4f53926670 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
+import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
 import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
 import org.apache.iotdb.db.consensus.statemachine.visitor.DataExecutionVisitor;
 import org.apache.iotdb.db.engine.StorageEngineV2;
@@ -44,14 +44,13 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDevic
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.apache.thrift.async.AsyncMethodCallback;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.ArrayList;
-import java.util.LinkedList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.PriorityQueue;
 
@@ -64,13 +63,15 @@ public class DataRegionStateMachine extends BaseStateMachine {
 
   private DataRegion region;
 
-  private static final int MAX_REQUEST_CACHE_SIZE = 1;
+  private static final int MAX_REQUEST_CACHE_SIZE = 5;
   private static final long CACHE_WINDOW_TIME_IN_MS = 10_000;
-  private final PriorityQueue<InsertNodeWrapper> requestCache;
+
+  private final PriorityQueue<InsertNode> requestCache;
+  private long nextSyncIndex = -1;
 
   public DataRegionStateMachine(DataRegion region) {
     this.region = region;
-    this.requestCache = new PriorityQueue<>();
+    this.requestCache = new PriorityQueue<>(Comparator.comparingLong(InsertNode::getSyncIndex));
   }
 
   @Override
@@ -115,31 +116,49 @@ public class DataRegionStateMachine extends BaseStateMachine {
     }
   }
 
-  private InsertNodeWrapper cacheAndGetLatestInsertNode(
-      long syncIndex,
-      List<InsertNode> insertNodes,
-      AsyncMethodCallback<TSyncLogRes> resultHandler) {
+  private TSStatus cacheAndInsertLatestNode(long syncIndex, InsertNode insertNode) {
+    long cacheRequestStartTime = System.nanoTime();
+    insertNode.setSyncIndex(syncIndex);
     synchronized (requestCache) {
-      requestCache.add(new InsertNodeWrapper(syncIndex, insertNodes, resultHandler));
+      requestCache.add(insertNode);
+      // If the peek is not held by the current thread, notify the corresponding thread to
+      // process the peek when the queue is full.
       if (requestCache.size() == MAX_REQUEST_CACHE_SIZE) {
-        return requestCache.poll();
+        requestCache.notifyAll();
       }
-      return null;
+      while (true) {
+        if (insertNode.getSyncIndex() == nextSyncIndex) {
+          requestCache.remove(insertNode);
+          nextSyncIndex++;
+          break;
+        }
+        if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
+            && requestCache.peek().getSyncIndex() == insertNode.getSyncIndex()) {
+          requestCache.remove();
+          nextSyncIndex = insertNode.getSyncIndex() + 1;
+          break;
+        }
+        try {
+          requestCache.wait(CACHE_WINDOW_TIME_IN_MS);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+      StepTracker.trace("cacheAndQueueRequest", cacheRequestStartTime, System.nanoTime());
+      logger.info("queue size {}, syncIndex = {}", requestCache.size(), insertNode.getSyncIndex());
+      TSStatus tsStatus = write(insertNode);
+      requestCache.notifyAll();
+      return tsStatus;
     }
   }
 
   private static class InsertNodeWrapper implements Comparable<InsertNodeWrapper> {
     private final long syncIndex;
-    private final List<InsertNode> insertNodes;
-    private final AsyncMethodCallback<TSyncLogRes> resultHandler;
+    private final InsertNode insertNode;
 
-    public InsertNodeWrapper(
-        long syncIndex,
-        List<InsertNode> insertNode,
-        AsyncMethodCallback<TSyncLogRes> resultHandler) {
+    public InsertNodeWrapper(long syncIndex, InsertNode insertNode) {
       this.syncIndex = syncIndex;
-      this.insertNodes = insertNode;
-      this.resultHandler = resultHandler;
+      this.insertNode = insertNode;
     }
 
     @Override
@@ -151,51 +170,8 @@ public class DataRegionStateMachine extends BaseStateMachine {
       return syncIndex;
     }
 
-    public List<InsertNode> getInsertNodes() {
-      return insertNodes;
-    }
-
-    public AsyncMethodCallback<TSyncLogRes> getResultHandler() {
-      return resultHandler;
-    }
-  }
-
-  public void multiLeaderWriteAsync(
-      List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {
-    long prepareStartTime = System.nanoTime();
-    List<TSStatus> statuses = new LinkedList<>();
-    try {
-      List<InsertNode> insertNodesInAllRequests = new LinkedList<>();
-      for (IndexedConsensusRequest indexedRequest : requests) {
-        List<InsertNode> insertNodesInOneRequest =
-            new ArrayList<>(indexedRequest.getRequests().size());
-        for (IConsensusRequest req : indexedRequest.getRequests()) {
-          // PlanNode in IndexedConsensusRequest should always be InsertNode
-          InsertNode innerNode = (InsertNode) getPlanNode(req);
-          innerNode.setSearchIndex(indexedRequest.getSearchIndex());
-          insertNodesInOneRequest.add(innerNode);
-        }
-        insertNodesInAllRequests.add(mergeInsertNodes(insertNodesInOneRequest));
-      }
-      long startTime = System.nanoTime();
-      InsertNodeWrapper insertNodeWrapper =
-          cacheAndGetLatestInsertNode(
-              requests.get(0).getSyncIndex(), insertNodesInAllRequests, resultHandler);
-      StepTracker.trace("cacheAndGet", 25, startTime, System.nanoTime());
-      StepTracker.trace("followerWritePrepare", 25, prepareStartTime, System.nanoTime());
-      long writeStartTime = System.nanoTime();
-      if (insertNodeWrapper != null) {
-        for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) {
-          statuses.add(write(insertNode));
-        }
-        insertNodeWrapper.resultHandler.onComplete(new TSyncLogRes(statuses));
-      } else {
-        logger.error("insertNodeWrapper is null");
-      }
-      StepTracker.trace("followerWriteInsert", 25, writeStartTime, System.nanoTime());
-    } catch (IllegalArgumentException e) {
-      logger.error(e.getMessage(), e);
-      resultHandler.onError(e);
+    public InsertNode getInsertNode() {
+      return insertNode;
     }
   }
 
@@ -212,7 +188,16 @@ public class DataRegionStateMachine extends BaseStateMachine {
           innerNode.setSearchIndex(indexedRequest.getSearchIndex());
           insertNodes.add(innerNode);
         }
-        planNode = mergeInsertNodes(insertNodes);
+        if (indexedRequest.getSearchIndex() == ConsensusReqReader.DEFAULT_SEARCH_INDEX) {
+
+          TSStatus status =
+              cacheAndInsertLatestNode(
+                  indexedRequest.getSyncIndex(), mergeInsertNodes(insertNodes));
+
+          return status;
+        } else {
+          planNode = mergeInsertNodes(insertNodes);
+        }
       } else {
         planNode = getPlanNode(request);
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index f5b5bddb52..787dfbf9ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -77,6 +77,8 @@ public abstract class InsertNode extends WritePlanNode {
    */
   protected long searchIndex = NO_CONSENSUS_INDEX;
 
+  protected long syncIndex = NO_CONSENSUS_INDEX;
+
   /** Physical address of data region after splitting */
   protected TRegionReplicaSet dataRegionReplicaSet;
 
@@ -153,6 +155,14 @@ public abstract class InsertNode extends WritePlanNode {
     return searchIndex;
   }
 
+  public void setSyncIndex(long syncIndex) {
+    this.syncIndex = syncIndex;
+  }
+
+  public long getSyncIndex() {
+    return syncIndex;
+  }
+
   /** Search index should start from 1 */
   public void setSearchIndex(long searchIndex) {
     this.searchIndex = searchIndex;