You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/03/26 15:43:29 UTC

[iotdb] branch xingtanzjr/query_execution updated: fix the bug in Parallel plan stage

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/query_execution
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/xingtanzjr/query_execution by this push:
     new 4805516  fix the bug in Parallel plan stage
4805516 is described below

commit 4805516f2d27c9f2792a1b3ff9f551b8ea2cbd35
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Sat Mar 26 23:42:36 2022 +0800

    fix the bug in Parallel plan stage
---
 .../org/apache/iotdb/db/mpp/execution/QueryExecution.java   |  5 +++++
 .../mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java |  7 ++++++-
 .../db/mpp/sql/planner/plan/node/process/ExchangeNode.java  |  4 +++-
 .../db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java | 13 +++++++++++--
 4 files changed, 25 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 194802a..b35c88a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.execution;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.db.mpp.buffer.ISourceHandle;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
 import org.apache.iotdb.db.mpp.execution.scheduler.IScheduler;
@@ -54,6 +55,10 @@ public class QueryExecution {
   private LogicalQueryPlan logicalPlan;
   private DistributedQueryPlan distributedPlan;
 
+  // The result of QueryExecution will be written to the DataBlockManager in current Node.
+  // We use this SourceHandle to fetch the TsBlock from it.
+  private ISourceHandle resultHandle;
+
   public QueryExecution(Statement statement, MPPQueryContext context) {
     this.context = context;
     this.analysis = analyze(statement, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index 3603310..8178c29 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -72,7 +72,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
     // one by one
     int instanceIdx = 0;
     PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getRoot());
-    FragmentInstance fragmentInstance = new FragmentInstance(fragment, instanceIdx);
+    FragmentInstance fragmentInstance = new FragmentInstance(new PlanFragment(fragment.getId(), rootCopy), instanceIdx);
 
     // Get the target DataRegion for origin PlanFragment, then its instance will be distributed one
     // of them.
@@ -115,6 +115,11 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
     return instanceMap.get(planNodeMap.get(exchangeNodeId));
   }
 
+  private void addSinkNodeToRootPlanFragment() {
+    FragmentInstance rootInstance = instanceMap.get(subPlan.getPlanFragment().getId());
+
+  }
+
   private void recordPlanNodeRelation(PlanNode root, PlanFragmentId planFragmentId) {
     planNodeMap.put(root.getId(), planFragmentId);
     for (PlanNode child : root.getChildren()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index 80bbf41..0df6d36 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -60,7 +60,9 @@ public class ExchangeNode extends PlanNode {
   public PlanNode clone() {
     ExchangeNode node = new ExchangeNode(getId());
     if (remoteSourceNode != null) {
-      node.setRemoteSourceNode((FragmentSinkNode) remoteSourceNode.clone());
+      FragmentSinkNode remoteSourceNodeClone = (FragmentSinkNode) remoteSourceNode.clone();
+      remoteSourceNodeClone.setDownStreamNode(node);
+      node.setRemoteSourceNode(remoteSourceNode);
     }
     return node;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 3ab12c7..b9940c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
 
+import org.apache.commons.lang.Validate;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -51,12 +52,20 @@ public class FragmentSinkNode extends SinkNode {
 
   @Override
   public PlanNode clone() {
-    return null;
+    FragmentSinkNode sinkNode = new FragmentSinkNode(getId());
+    sinkNode.setDownStream(downStreamEndpoint, downStreamInstanceId, downStreamPlanNodeId);
+    sinkNode.setDownStreamNode(downStreamNode);
+    return sinkNode;
   }
 
   @Override
   public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+    Validate.isTrue(children == null || children.size() == 1, "Children size of FragmentSinkNode should be 0 or 1");
+    FragmentSinkNode sinkNode = (FragmentSinkNode) clone();
+    if (children != null) {
+      sinkNode.setChild(children.get(0));
+    }
+    return sinkNode;
   }
 
   @Override