You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/02/28 03:53:23 UTC
[iotdb] 02/02: fix bufs
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch geely_car_0206
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 627c840a6bae57b2ef8f5fabb077621649fb881b
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Feb 28 11:53:12 2023 +0800
fix bufs
---
.../planner/distribution/DistributionPlanner.java | 19 ++++++++-----------
.../distribution/SimpleFragmentParallelPlanner.java | 16 ++++++----------
.../iotdb/db/mpp/plan/planner/plan/PlanFragment.java | 18 ------------------
3 files changed, 14 insertions(+), 39 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 4733371321..3c6faaca6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -93,7 +93,7 @@ public class DistributionPlanner {
&& (((QueryStatement) analysis.getStatement()).isAlignByDevice()),
root);
PlanNode newRoot = adder.visit(root, nodeGroupContext);
- adjustUpStream(root, nodeGroupContext);
+ adjustUpStream(newRoot, nodeGroupContext);
return newRoot;
}
@@ -129,16 +129,13 @@ public class DistributionPlanner {
MultiChildrenSinkNode newChild =
memo.computeIfAbsent(
regionOfChild,
- tRegionReplicaSet -> {
- MultiChildrenSinkNode result =
- needShuffleSinkNode
- ? new ShuffleSinkNode(context.queryContext.getQueryId().genPlanNodeId())
- : new IdentitySinkNode(context.queryContext.getQueryId().genPlanNodeId());
- result.addChild(exchangeNode.getChild());
- result.addDownStreamChannelLocation(
- new DownStreamChannelLocation(exchangeNode.getPlanNodeId().toString()));
- return result;
- });
+ tRegionReplicaSet ->
+ needShuffleSinkNode
+ ? new ShuffleSinkNode(context.queryContext.getQueryId().genPlanNodeId())
+ : new IdentitySinkNode(context.queryContext.getQueryId().genPlanNodeId()));
+ newChild.addChild(exchangeNode.getChild());
+ newChild.addDownStreamChannelLocation(
+ new DownStreamChannelLocation(exchangeNode.getPlanNodeId().toString()));
exchangeNode.setChild(newChild);
exchangeNode.setIndexOfUpstreamSinkHandle(newChild.getCurrentLastIndex());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index d86c96d452..6361a91f98 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -34,12 +34,12 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.MultiChildrenSinkNode;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,7 +64,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
// Record all the FragmentInstances belonged to same PlanFragment
Map<PlanFragmentId, FragmentInstance> instanceMap;
// Record which PlanFragment the PlanNode belongs
- Map<PlanNodeId, PlanFragmentId> planNodeMap;
+ Map<PlanNodeId, Pair<PlanFragmentId, PlanNode>> planNodeMap;
List<FragmentInstance> fragmentInstanceList;
public SimpleFragmentParallelPlanner(
@@ -93,13 +93,10 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
}
private void produceFragmentInstance(PlanFragment fragment) {
- // If one PlanFragment will produce several FragmentInstance, the instanceIdx will be increased
- // one by one
- PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getPlanNodeTree());
Filter timeFilter = analysis.getGlobalTimeFilter();
FragmentInstance fragmentInstance =
new FragmentInstance(
- new PlanFragment(fragment.getId(), rootCopy),
+ fragment,
fragment.getId().genFragmentInstanceId(),
timeFilter,
queryContext.getQueryType(),
@@ -217,8 +214,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
downStreamInstance.getId().toThrift());
// Set upstream info for corresponding ExchangeNode in downstream FragmentInstance
- PlanNode downStreamExchangeNode =
- downStreamInstance.getFragment().getPlanNodeById(downStreamNodeId);
+ PlanNode downStreamExchangeNode = planNodeMap.get(downStreamNodeId).right;
((ExchangeNode) downStreamExchangeNode)
.setUpstream(
instance.getHostDataNode().getMPPDataExchangeEndPoint(),
@@ -230,11 +226,11 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
}
private FragmentInstance findDownStreamInstance(PlanNodeId exchangeNodeId) {
- return instanceMap.get(planNodeMap.get(exchangeNodeId));
+ return instanceMap.get(planNodeMap.get(exchangeNodeId).left);
}
private void recordPlanNodeRelation(PlanNode root, PlanFragmentId planFragmentId) {
- planNodeMap.put(root.getPlanNodeId(), planFragmentId);
+ planNodeMap.put(root.getPlanNodeId(), new Pair<>(planFragmentId, root));
for (PlanNode child : root.getChildren()) {
recordPlanNodeRelation(child, planFragmentId);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
index aac15e7940..5fd8b79a7a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
import org.apache.iotdb.db.mpp.plan.planner.SubPlanTypeExtractor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.VirtualSourceNode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -133,23 +132,6 @@ public class PlanFragment {
return null;
}
- public PlanNode getPlanNodeById(PlanNodeId nodeId) {
- return getPlanNodeById(planNodeTree, nodeId);
- }
-
- private PlanNode getPlanNodeById(PlanNode root, PlanNodeId nodeId) {
- if (root.getPlanNodeId().equals(nodeId)) {
- return root;
- }
- for (PlanNode child : root.getChildren()) {
- PlanNode node = getPlanNodeById(child, nodeId);
- if (node != null) {
- return node;
- }
- }
- return null;
- }
-
public void serialize(ByteBuffer byteBuffer) {
id.serialize(byteBuffer);
planNodeTree.serialize(byteBuffer);