You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/03/26 15:43:29 UTC
[iotdb] branch xingtanzjr/query_execution updated: fix the bug in Parallel plan stage
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/query_execution
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xingtanzjr/query_execution by this push:
new 4805516 fix the bug in Parallel plan stage
4805516 is described below
commit 4805516f2d27c9f2792a1b3ff9f551b8ea2cbd35
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Sat Mar 26 23:42:36 2022 +0800
fix the bug in Parallel plan stage
---
.../org/apache/iotdb/db/mpp/execution/QueryExecution.java | 5 +++++
.../mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java | 7 ++++++-
.../db/mpp/sql/planner/plan/node/process/ExchangeNode.java | 4 +++-
.../db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java | 13 +++++++++++--
4 files changed, 25 insertions(+), 4 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 194802a..b35c88a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.execution;
import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.db.mpp.buffer.ISourceHandle;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
import org.apache.iotdb.db.mpp.execution.scheduler.IScheduler;
@@ -54,6 +55,10 @@ public class QueryExecution {
private LogicalQueryPlan logicalPlan;
private DistributedQueryPlan distributedPlan;
+ // The result of QueryExecution will be written to the DataBlockManager in the current Node.
+ // We use this SourceHandle to fetch the TsBlock from that DataBlockManager.
+ private ISourceHandle resultHandle;
+
public QueryExecution(Statement statement, MPPQueryContext context) {
this.context = context;
this.analysis = analyze(statement, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index 3603310..8178c29 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -72,7 +72,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
// one by one
int instanceIdx = 0;
PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getRoot());
- FragmentInstance fragmentInstance = new FragmentInstance(fragment, instanceIdx);
+ FragmentInstance fragmentInstance = new FragmentInstance(new PlanFragment(fragment.getId(), rootCopy), instanceIdx);
// Get the target DataRegion for origin PlanFragment, then its instance will be distributed one
// of them.
@@ -115,6 +115,11 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
return instanceMap.get(planNodeMap.get(exchangeNodeId));
}
+ private void addSinkNodeToRootPlanFragment() {
+ FragmentInstance rootInstance = instanceMap.get(subPlan.getPlanFragment().getId());
+
+ }
+
private void recordPlanNodeRelation(PlanNode root, PlanFragmentId planFragmentId) {
planNodeMap.put(root.getId(), planFragmentId);
for (PlanNode child : root.getChildren()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index 80bbf41..0df6d36 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -60,7 +60,9 @@ public class ExchangeNode extends PlanNode {
public PlanNode clone() {
ExchangeNode node = new ExchangeNode(getId());
if (remoteSourceNode != null) {
- node.setRemoteSourceNode((FragmentSinkNode) remoteSourceNode.clone());
+ FragmentSinkNode remoteSourceNodeClone = (FragmentSinkNode) remoteSourceNode.clone();
+ remoteSourceNodeClone.setDownStreamNode(node);
+ node.setRemoteSourceNode(remoteSourceNode);
}
return node;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 3ab12c7..b9940c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
+import org.apache.commons.lang.Validate;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -51,12 +52,20 @@ public class FragmentSinkNode extends SinkNode {
@Override
public PlanNode clone() {
- return null;
+ FragmentSinkNode sinkNode = new FragmentSinkNode(getId());
+ sinkNode.setDownStream(downStreamEndpoint, downStreamInstanceId, downStreamPlanNodeId);
+ sinkNode.setDownStreamNode(downStreamNode);
+ return sinkNode;
}
@Override
public PlanNode cloneWithChildren(List<PlanNode> children) {
- return null;
+ Validate.isTrue(children == null || children.size() == 1, "Children size of FragmentSinkNode should be 0 or 1");
+ FragmentSinkNode sinkNode = (FragmentSinkNode) clone();
+ if (children != null) {
+ sinkNode.setChild(children.get(0));
+ }
+ return sinkNode;
}
@Override