You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/30 05:18:03 UTC
[iotdb] 01/01: add the logic to process other PlanNode besides InsertNode
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/fix_wirte_issue_ml
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 40f0613ab76f97e429ff5509fa19d19ad364704f
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Aug 30 13:17:44 2022 +0800
add the logic to process other PlanNode besides InsertNode
---
.../statemachine/DataRegionStateMachine.java | 28 +++++++++++++++-------
1 file changed, 19 insertions(+), 9 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index d5cf5e4533..c53bb069a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -196,8 +196,8 @@ public class DataRegionStateMachine extends BaseStateMachine {
insertNodeWrapper.getStartSyncIndex(),
insertNodeWrapper.getEndSyncIndex());
List<TSStatus> subStatus = new LinkedList<>();
- for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) {
- subStatus.add(write(insertNode));
+ for (PlanNode planNode : insertNodeWrapper.getInsertNodes()) {
+ subStatus.add(write(planNode));
}
queueSortCondition.signalAll();
return new TSStatus().setSubStatus(subStatus);
@@ -209,7 +209,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
private static class InsertNodeWrapper implements Comparable<InsertNodeWrapper> {
private final long startSyncIndex;
private final long endSyncIndex;
- private final List<InsertNode> insertNodes;
+ private final List<PlanNode> insertNodes;
public InsertNodeWrapper(long startSyncIndex, long endSyncIndex) {
this.startSyncIndex = startSyncIndex;
@@ -222,7 +222,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
return Long.compare(startSyncIndex, o.startSyncIndex);
}
- public void add(InsertNode insertNode) {
+ public void add(PlanNode insertNode) {
this.insertNodes.add(insertNode);
}
@@ -234,7 +234,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
return endSyncIndex;
}
- public List<InsertNode> getInsertNodes() {
+ public List<PlanNode> getInsertNodes() {
return insertNodes;
}
}
@@ -248,13 +248,23 @@ public class DataRegionStateMachine extends BaseStateMachine {
return insertNodeWrapper;
}
- private InsertNode grabInsertNode(IndexedConsensusRequest indexedRequest) {
+ private PlanNode grabInsertNode(IndexedConsensusRequest indexedRequest) {
List<InsertNode> insertNodes = new ArrayList<>(indexedRequest.getRequests().size());
for (IConsensusRequest req : indexedRequest.getRequests()) {
// PlanNode in IndexedConsensusRequest should always be InsertNode
- InsertNode innerNode = (InsertNode) getPlanNode(req);
- innerNode.setSearchIndex(indexedRequest.getSearchIndex());
- insertNodes.add(innerNode);
+ PlanNode planNode = getPlanNode(req);
+ if (planNode instanceof InsertNode) {
+ InsertNode innerNode = (InsertNode) planNode;
+ innerNode.setSearchIndex(indexedRequest.getSearchIndex());
+ insertNodes.add(innerNode);
+ } else if (indexedRequest.getRequests().size() == 1) {
+ // If the planNode is not InsertNode, it is expected that the IndexedConsensusRequest only
+ // contains one request
+ return planNode;
+ } else {
+ throw new IllegalArgumentException(
+ "PlanNodes in IndexedConsensusRequest are not InsertNode and the size of requests are larger than 1");
+ }
}
return mergeInsertNodes(insertNodes);
}