You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/08 08:08:59 UTC
[iotdb] 01/01: change to parallel write in follower side
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch ml_0808_test_exp1_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1d63c3e688feafd2000d52c102f97e44a3a6bc63
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Aug 8 16:08:45 2022 +0800
change to parallel write in follower side
---
.../org/apache/iotdb/consensus/IStateMachine.java | 7 --
.../iotdb/consensus/config/MultiLeaderConfig.java | 2 +-
.../service/MultiLeaderRPCServiceProcessor.java | 24 ++---
.../consensus/standalone/StandAloneServerImpl.java | 11 --
.../apache/iotdb/consensus/EmptyStateMachine.java | 9 --
.../multileader/util/TestStateMachine.java | 7 --
.../apache/iotdb/consensus/ratis/TestUtils.java | 7 --
.../standalone/StandAloneConsensusTest.java | 8 --
.../consensus/statemachine/BaseStateMachine.java | 12 ---
.../statemachine/DataRegionStateMachine.java | 119 +++++++++------------
.../plan/planner/plan/node/write/InsertNode.java | 10 ++
11 files changed, 74 insertions(+), 142 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index 3b5e6b1796..203b7d2a6b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -25,10 +25,6 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
-
-import org.apache.thrift.async.AsyncMethodCallback;
import javax.annotation.concurrent.ThreadSafe;
@@ -45,9 +41,6 @@ public interface IStateMachine {
void stop();
- void multiLeaderWriteAsync(
- List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler);
-
/**
* apply a write-request from user
*
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index 4a95a85661..1a9b4dac9f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -249,7 +249,7 @@ public class MultiLeaderConfig {
public static class Builder {
private int maxPendingRequestNumPerNode = 200;
private int maxRequestPerBatch = 40;
- private int maxPendingBatch = 1;
+ private int maxPendingBatch = 5;
private int maxWaitingTimeForAccumulatingBatchInMs = 500;
private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index ad1304a608..2e00442e37 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.StepTracker;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
@@ -40,7 +39,6 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.LinkedList;
import java.util.List;
public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIService.AsyncIface {
@@ -71,7 +69,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
resultHandler.onComplete(new TSyncLogRes(Collections.singletonList(status)));
return;
}
- List<IndexedConsensusRequest> indexedConsensusRequests = new LinkedList<>();
+ List<TSStatus> statuses = new ArrayList<>();
// We use synchronized to ensure atomicity of executing multiple logs
if (!req.getBatches().isEmpty()) {
List<IConsensusRequest> consensusRequests = new ArrayList<>();
@@ -83,9 +81,11 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
: new ByteBufferConsensusRequest(batch.data);
// merge TLogBatch with same search index into one request
if (batch.getSearchIndex() != currentSearchIndex) {
- indexedConsensusRequests.add(
- impl.buildIndexedConsensusRequestForRemoteRequest(
- currentSearchIndex, consensusRequests));
+ statuses.add(
+ impl.getStateMachine()
+ .write(
+ impl.buildIndexedConsensusRequestForRemoteRequest(
+ currentSearchIndex, consensusRequests)));
consensusRequests = new ArrayList<>();
currentSearchIndex = batch.getSearchIndex();
}
@@ -93,14 +93,12 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
}
// write last request
if (!consensusRequests.isEmpty()) {
- indexedConsensusRequests.add(
- impl.buildIndexedConsensusRequestForRemoteRequest(
- currentSearchIndex, consensusRequests));
+ statuses.add(
+ impl.getStateMachine()
+ .write(
+ impl.buildIndexedConsensusRequestForRemoteRequest(
+ currentSearchIndex, consensusRequests)));
}
- long followerWriteRequestStartTime = System.nanoTime();
- impl.getStateMachine().multiLeaderWriteAsync(indexedConsensusRequests, resultHandler);
- StepTracker.trace(
- "followerWriteRequest", 25, followerWriteRequestStartTime, System.nanoTime());
}
} catch (Exception e) {
resultHandler.onError(e);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
index cd57b252ec..8438a5fbe4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
@@ -24,13 +24,8 @@ import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
-
-import org.apache.thrift.async.AsyncMethodCallback;
import java.io.File;
-import java.util.List;
public class StandAloneServerImpl implements IStateMachine {
@@ -62,12 +57,6 @@ public class StandAloneServerImpl implements IStateMachine {
stateMachine.stop();
}
- @Override
- public void multiLeaderWriteAsync(
- List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {
- throw new UnsupportedOperationException();
- }
-
@Override
public TSStatus write(IConsensusRequest request) {
return stateMachine.write(request);
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
index 86aeddd19c..01992f472f 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
@@ -22,13 +22,8 @@ package org.apache.iotdb.consensus;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
-
-import org.apache.thrift.async.AsyncMethodCallback;
import java.io.File;
-import java.util.List;
public class EmptyStateMachine implements IStateMachine, IStateMachine.EventApi {
@@ -38,10 +33,6 @@ public class EmptyStateMachine implements IStateMachine, IStateMachine.EventApi
@Override
public void stop() {}
- @Override
- public void multiLeaderWriteAsync(
- List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {}
-
@Override
public TSStatus write(IConsensusRequest IConsensusRequest) {
return new TSStatus(0);
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
index dc7b31f831..eab940cf87 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
@@ -25,14 +25,11 @@ import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.thrift.async.AsyncMethodCallback;
-
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -68,10 +65,6 @@ public class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
@Override
public void stop() {}
- @Override
- public void multiLeaderWriteAsync(
- List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {}
-
@Override
public TSStatus write(IConsensusRequest request) {
synchronized (requestSets) {
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
index 72e741e6c9..d383fe902b 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -27,10 +27,7 @@ import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
-import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,10 +86,6 @@ public class TestUtils {
@Override
public void stop() {}
- @Override
- public void multiLeaderWriteAsync(
- List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {}
-
@Override
public TSStatus write(IConsensusRequest request) {
TestRequest testRequest;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index b9438177f5..9e0e13be6d 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -33,7 +33,6 @@ import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.consensus.config.ConsensusConfig;
@@ -41,10 +40,8 @@ import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
import org.apache.ratis.util.FileUtils;
-import org.apache.thrift.async.AsyncMethodCallback;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -53,7 +50,6 @@ import java.io.File;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
-import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -101,10 +97,6 @@ public class StandAloneConsensusTest {
@Override
public void stop() {}
- @Override
- public void multiLeaderWriteAsync(
- List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {}
-
@Override
public TSStatus write(IConsensusRequest request) {
if (request instanceof ByteBufferConsensusRequest) {
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index 27d696129f..3bd74bd91a 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -23,31 +23,19 @@ import org.apache.iotdb.commons.StepTracker;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.wal.buffer.WALEntry;
-import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.ws.rs.NotSupportedException;
-
-import java.util.List;
-
public abstract class BaseStateMachine implements IStateMachine, IStateMachine.EventApi {
private static final Logger logger = LoggerFactory.getLogger(BaseStateMachine.class);
- public void multiLeaderWriteAsync(
- List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {
- throw new NotSupportedException();
- }
-
protected FragmentInstance getFragmentInstance(IConsensusRequest request) {
FragmentInstance instance;
if (request instanceof ByteBufferConsensusRequest) {
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 4744163a83..4f53926670 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
+import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
import org.apache.iotdb.db.consensus.statemachine.visitor.DataExecutionVisitor;
import org.apache.iotdb.db.engine.StorageEngineV2;
@@ -44,14 +44,13 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDevic
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.thrift.async.AsyncMethodCallback;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
-import java.util.LinkedList;
+import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
@@ -64,13 +63,15 @@ public class DataRegionStateMachine extends BaseStateMachine {
private DataRegion region;
- private static final int MAX_REQUEST_CACHE_SIZE = 1;
+ private static final int MAX_REQUEST_CACHE_SIZE = 5;
private static final long CACHE_WINDOW_TIME_IN_MS = 10_000;
- private final PriorityQueue<InsertNodeWrapper> requestCache;
+
+ private final PriorityQueue<InsertNode> requestCache;
+ private long nextSyncIndex = -1;
public DataRegionStateMachine(DataRegion region) {
this.region = region;
- this.requestCache = new PriorityQueue<>();
+ this.requestCache = new PriorityQueue<>(Comparator.comparingLong(InsertNode::getSyncIndex));
}
@Override
@@ -115,31 +116,49 @@ public class DataRegionStateMachine extends BaseStateMachine {
}
}
- private InsertNodeWrapper cacheAndGetLatestInsertNode(
- long syncIndex,
- List<InsertNode> insertNodes,
- AsyncMethodCallback<TSyncLogRes> resultHandler) {
+ private TSStatus cacheAndInsertLatestNode(long syncIndex, InsertNode insertNode) {
+ long cacheRequestStartTime = System.nanoTime();
+ insertNode.setSyncIndex(syncIndex);
synchronized (requestCache) {
- requestCache.add(new InsertNodeWrapper(syncIndex, insertNodes, resultHandler));
+ requestCache.add(insertNode);
+ // If the peek is not held by the current thread, notify the corresponding thread so that it
+ // can process the peek once the queue is full
if (requestCache.size() == MAX_REQUEST_CACHE_SIZE) {
- return requestCache.poll();
+ requestCache.notifyAll();
}
- return null;
+ while (true) {
+ if (insertNode.getSyncIndex() == nextSyncIndex) {
+ requestCache.remove(insertNode);
+ nextSyncIndex++;
+ break;
+ }
+ if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
+ && requestCache.peek().getSyncIndex() == insertNode.getSyncIndex()) {
+ requestCache.remove();
+ nextSyncIndex = insertNode.getSyncIndex() + 1;
+ break;
+ }
+ try {
+ requestCache.wait(CACHE_WINDOW_TIME_IN_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ StepTracker.trace("cacheAndQueueRequest", cacheRequestStartTime, System.nanoTime());
+ logger.info("queue size {}, syncIndex = {}", requestCache.size(), insertNode.getSyncIndex());
+ TSStatus tsStatus = write(insertNode);
+ requestCache.notifyAll();
+ return tsStatus;
}
}
private static class InsertNodeWrapper implements Comparable<InsertNodeWrapper> {
private final long syncIndex;
- private final List<InsertNode> insertNodes;
- private final AsyncMethodCallback<TSyncLogRes> resultHandler;
+ private final InsertNode insertNode;
- public InsertNodeWrapper(
- long syncIndex,
- List<InsertNode> insertNode,
- AsyncMethodCallback<TSyncLogRes> resultHandler) {
+ public InsertNodeWrapper(long syncIndex, InsertNode insertNode) {
this.syncIndex = syncIndex;
- this.insertNodes = insertNode;
- this.resultHandler = resultHandler;
+ this.insertNode = insertNode;
}
@Override
@@ -151,51 +170,8 @@ public class DataRegionStateMachine extends BaseStateMachine {
return syncIndex;
}
- public List<InsertNode> getInsertNodes() {
- return insertNodes;
- }
-
- public AsyncMethodCallback<TSyncLogRes> getResultHandler() {
- return resultHandler;
- }
- }
-
- public void multiLeaderWriteAsync(
- List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {
- long prepareStartTime = System.nanoTime();
- List<TSStatus> statuses = new LinkedList<>();
- try {
- List<InsertNode> insertNodesInAllRequests = new LinkedList<>();
- for (IndexedConsensusRequest indexedRequest : requests) {
- List<InsertNode> insertNodesInOneRequest =
- new ArrayList<>(indexedRequest.getRequests().size());
- for (IConsensusRequest req : indexedRequest.getRequests()) {
- // PlanNode in IndexedConsensusRequest should always be InsertNode
- InsertNode innerNode = (InsertNode) getPlanNode(req);
- innerNode.setSearchIndex(indexedRequest.getSearchIndex());
- insertNodesInOneRequest.add(innerNode);
- }
- insertNodesInAllRequests.add(mergeInsertNodes(insertNodesInOneRequest));
- }
- long startTime = System.nanoTime();
- InsertNodeWrapper insertNodeWrapper =
- cacheAndGetLatestInsertNode(
- requests.get(0).getSyncIndex(), insertNodesInAllRequests, resultHandler);
- StepTracker.trace("cacheAndGet", 25, startTime, System.nanoTime());
- StepTracker.trace("followerWritePrepare", 25, prepareStartTime, System.nanoTime());
- long writeStartTime = System.nanoTime();
- if (insertNodeWrapper != null) {
- for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) {
- statuses.add(write(insertNode));
- }
- insertNodeWrapper.resultHandler.onComplete(new TSyncLogRes(statuses));
- } else {
- logger.error("insertNodeWrapper is null");
- }
- StepTracker.trace("followerWriteInsert", 25, writeStartTime, System.nanoTime());
- } catch (IllegalArgumentException e) {
- logger.error(e.getMessage(), e);
- resultHandler.onError(e);
+ public InsertNode getInsertNode() {
+ return insertNode;
}
}
@@ -212,7 +188,16 @@ public class DataRegionStateMachine extends BaseStateMachine {
innerNode.setSearchIndex(indexedRequest.getSearchIndex());
insertNodes.add(innerNode);
}
- planNode = mergeInsertNodes(insertNodes);
+ if (indexedRequest.getSearchIndex() == ConsensusReqReader.DEFAULT_SEARCH_INDEX) {
+
+ TSStatus status =
+ cacheAndInsertLatestNode(
+ indexedRequest.getSyncIndex(), mergeInsertNodes(insertNodes));
+
+ return status;
+ } else {
+ planNode = mergeInsertNodes(insertNodes);
+ }
} else {
planNode = getPlanNode(request);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index f5b5bddb52..787dfbf9ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -77,6 +77,8 @@ public abstract class InsertNode extends WritePlanNode {
*/
protected long searchIndex = NO_CONSENSUS_INDEX;
+ protected long syncIndex = NO_CONSENSUS_INDEX;
+
/** Physical address of data region after splitting */
protected TRegionReplicaSet dataRegionReplicaSet;
@@ -153,6 +155,14 @@ public abstract class InsertNode extends WritePlanNode {
return searchIndex;
}
+ public void setSyncIndex(long syncIndex) {
+ this.syncIndex = syncIndex;
+ }
+
+ public long getSyncIndex() {
+ return syncIndex;
+ }
+
/** Search index should start from 1 */
public void setSearchIndex(long searchIndex) {
this.searchIndex = searchIndex;