You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/30 05:18:03 UTC

[iotdb] 01/01: add the logic to process other PlanNode besides InsertNode

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/fix_wirte_issue_ml
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 40f0613ab76f97e429ff5509fa19d19ad364704f
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Aug 30 13:17:44 2022 +0800

    add the logic to process other PlanNode besides InsertNode
---
 .../statemachine/DataRegionStateMachine.java       | 28 +++++++++++++++-------
 1 file changed, 19 insertions(+), 9 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index d5cf5e4533..c53bb069a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -196,8 +196,8 @@ public class DataRegionStateMachine extends BaseStateMachine {
           insertNodeWrapper.getStartSyncIndex(),
           insertNodeWrapper.getEndSyncIndex());
       List<TSStatus> subStatus = new LinkedList<>();
-      for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) {
-        subStatus.add(write(insertNode));
+      for (PlanNode planNode : insertNodeWrapper.getInsertNodes()) {
+        subStatus.add(write(planNode));
       }
       queueSortCondition.signalAll();
       return new TSStatus().setSubStatus(subStatus);
@@ -209,7 +209,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
   private static class InsertNodeWrapper implements Comparable<InsertNodeWrapper> {
     private final long startSyncIndex;
     private final long endSyncIndex;
-    private final List<InsertNode> insertNodes;
+    private final List<PlanNode> insertNodes;
 
     public InsertNodeWrapper(long startSyncIndex, long endSyncIndex) {
       this.startSyncIndex = startSyncIndex;
@@ -222,7 +222,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
       return Long.compare(startSyncIndex, o.startSyncIndex);
     }
 
-    public void add(InsertNode insertNode) {
+    public void add(PlanNode insertNode) {
       this.insertNodes.add(insertNode);
     }
 
@@ -234,7 +234,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
       return endSyncIndex;
     }
 
-    public List<InsertNode> getInsertNodes() {
+    public List<PlanNode> getInsertNodes() {
       return insertNodes;
     }
   }
@@ -248,13 +248,23 @@ public class DataRegionStateMachine extends BaseStateMachine {
     return insertNodeWrapper;
   }
 
-  private InsertNode grabInsertNode(IndexedConsensusRequest indexedRequest) {
+  private PlanNode grabInsertNode(IndexedConsensusRequest indexedRequest) {
     List<InsertNode> insertNodes = new ArrayList<>(indexedRequest.getRequests().size());
     for (IConsensusRequest req : indexedRequest.getRequests()) {
       // PlanNode in IndexedConsensusRequest should always be InsertNode
-      InsertNode innerNode = (InsertNode) getPlanNode(req);
-      innerNode.setSearchIndex(indexedRequest.getSearchIndex());
-      insertNodes.add(innerNode);
+      PlanNode planNode = getPlanNode(req);
+      if (planNode instanceof InsertNode) {
+        InsertNode innerNode = (InsertNode) planNode;
+        innerNode.setSearchIndex(indexedRequest.getSearchIndex());
+        insertNodes.add(innerNode);
+      } else if (indexedRequest.getRequests().size() == 1) {
+        // If the planNode is not InsertNode, it is expected that the IndexedConsensusRequest only
+        // contains one request
+        return planNode;
+      } else {
+        throw new IllegalArgumentException(
+            "PlanNodes in IndexedConsensusRequest are not InsertNode and the size of requests are larger than 1");
+      }
     }
     return mergeInsertNodes(insertNodes);
   }