You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/29 06:28:21 UTC
[iotdb] branch ml_0729_test updated: add cache logic in data region statemachine for multi leader write
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch ml_0729_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ml_0729_test by this push:
new bbf2bf1dff add cache logic in data region statemachine for multi leader write
bbf2bf1dff is described below
commit bbf2bf1dff1fbe128d6422fb0fcc48e4699f3ecf
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri Jul 29 14:27:18 2022 +0800
add cache logic in data region statemachine for multi leader write
---
.../common/request/IndexedConsensusRequest.java | 13 ++++++
.../multileader/MultiLeaderServerImpl.java | 5 ++-
.../service/MultiLeaderRPCServiceProcessor.java | 9 ++++-
.../statemachine/DataRegionStateMachine.java | 46 +++++++++++++++++++++-
4 files changed, 68 insertions(+), 5 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index de3aca433b..9c5112fa2b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -29,11 +29,20 @@ public class IndexedConsensusRequest implements IConsensusRequest {
/** we do not need to serialize these two fields as they are useless in other nodes. */
private final long searchIndex;
+ private final long syncIndex;
private final List<IConsensusRequest> requests;
public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) { // Constructor for callers that supply no sync index.
this.searchIndex = searchIndex;
this.requests = requests;
+ this.syncIndex = -1L; // No sync index was given, so -1 is stored in its place.
+ }
+
+ public IndexedConsensusRequest( // Overload that additionally records a caller-supplied syncIndex.
+ long searchIndex, long syncIndex, List<IConsensusRequest> requests) {
+ this.searchIndex = searchIndex;
+ this.requests = requests;
+ this.syncIndex = syncIndex; // Stored as given; exposed through getSyncIndex().
}
@Override
@@ -49,6 +58,10 @@ public class IndexedConsensusRequest implements IConsensusRequest {
return searchIndex;
}
+ public long getSyncIndex() { // Returns the sync index set at construction (-1 when the two-argument constructor was used).
+ return syncIndex;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 3e8cbbfb3b..8b21fa920e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -186,8 +186,9 @@ public class MultiLeaderServerImpl {
}
public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(
- List<IConsensusRequest> requests) {
- return new IndexedConsensusRequest(ConsensusReqReader.DEFAULT_SEARCH_INDEX, requests);
+ long syncIndex, List<IConsensusRequest> requests) {
+ return new IndexedConsensusRequest(
+ ConsensusReqReader.DEFAULT_SEARCH_INDEX, syncIndex, requests);
}
/**
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 69375872c1..b24be2a1e6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -81,8 +81,11 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
if (batch.getSearchIndex() != currentSearchIndex) {
statuses.add(
impl.getStateMachine()
- .write(impl.buildIndexedConsensusRequestForRemoteRequest(consensusRequests)));
+ .write(
+ impl.buildIndexedConsensusRequestForRemoteRequest(
+ currentSearchIndex, consensusRequests)));
consensusRequests = new ArrayList<>();
+ currentSearchIndex = batch.getSearchIndex();
}
consensusRequests.add(request);
}
@@ -90,7 +93,9 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
if (!consensusRequests.isEmpty()) {
statuses.add(
impl.getStateMachine()
- .write(impl.buildIndexedConsensusRequestForRemoteRequest(consensusRequests)));
+ .write(
+ impl.buildIndexedConsensusRequestForRemoteRequest(
+ currentSearchIndex, consensusRequests)));
}
}
logger.debug("Execute TSyncLogReq for {} with result {}", req.consensusGroupId, statuses);
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 285ad31d74..b56586ee80 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -42,12 +42,14 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDevic
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
+import java.util.PriorityQueue;
public class DataRegionStateMachine extends BaseStateMachine {
@@ -58,8 +60,12 @@ public class DataRegionStateMachine extends BaseStateMachine {
private DataRegion region;
+ private static final int MAX_REQUEST_CACHE_SIZE = 50;
+ private PriorityQueue<InsertNodeWrapper> requestCache;
+
public DataRegionStateMachine(DataRegion region) {
this.region = region;
+ this.requestCache = new PriorityQueue<>();
}
@Override
@@ -104,6 +110,38 @@ public class DataRegionStateMachine extends BaseStateMachine {
}
}
+ private InsertNode cacheAndGetLatestInsertNode(long syncIndex, InsertNode insertNode) { // Buffers insertNode keyed by syncIndex; returns one buffered node to execute, or null if none is released yet.
+ requestCache.add(new InsertNodeWrapper(syncIndex, insertNode)); // Always enqueue first, so the incoming node competes with the ones already buffered.
+ if (requestCache.size() >= MAX_REQUEST_CACHE_SIZE) { // Release a node only once the cache holds MAX_REQUEST_CACHE_SIZE entries.
+ return requestCache.poll().getInsertNode(); // poll() removes the queue head, i.e. the entry with the smallest syncIndex (see InsertNodeWrapper.compareTo) - not necessarily the node just added.
+ } else {
+ return null; // Below the threshold: the node stays buffered and the caller gets nothing to execute. NOTE(review): no path that flushes the remaining entries is visible in this diff - confirm one exists.
+ }
+ }
+
+ private static class InsertNodeWrapper implements Comparable<InsertNodeWrapper> { // Pairs an InsertNode with its syncIndex so the PriorityQueue in requestCache can order entries.
+ private final long syncIndex; // Ordering key used by compareTo.
+ private final InsertNode insertNode; // Payload handed back when the wrapper is polled from the queue.
+
+ public InsertNodeWrapper(long syncIndex, InsertNode insertNode) {
+ this.syncIndex = syncIndex;
+ this.insertNode = insertNode;
+ }
+
+ @Override
+ public int compareTo(@NotNull InsertNodeWrapper o) { // Natural ordering is ascending syncIndex; the wrapped node does not take part in the comparison.
+ return Long.compare(syncIndex, o.syncIndex);
+ }
+
+ public long getSyncIndex() { // Returns the ordering key given at construction.
+ return syncIndex;
+ }
+
+ public InsertNode getInsertNode() { // Returns the wrapped node unchanged.
+ return insertNode;
+ }
+ }
+
@Override
public TSStatus write(IConsensusRequest request) {
PlanNode planNode;
@@ -117,7 +155,13 @@ public class DataRegionStateMachine extends BaseStateMachine {
innerNode.setSearchIndex(indexedRequest.getSearchIndex());
insertNodes.add(innerNode);
}
- planNode = mergeInsertNodes(insertNodes);
+ planNode =
+ cacheAndGetLatestInsertNode(
+ indexedRequest.getSyncIndex(), mergeInsertNodes(insertNodes));
+ // TODO: tmp way to do the test
+ if (planNode == null) {
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
} else {
planNode = getPlanNode(request);
}