You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/02/28 03:53:23 UTC

[iotdb] 02/02: fix bugs

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch geely_car_0206
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 627c840a6bae57b2ef8f5fabb077621649fb881b
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Feb 28 11:53:12 2023 +0800

    fix bugs
---
 .../planner/distribution/DistributionPlanner.java     | 19 ++++++++-----------
 .../distribution/SimpleFragmentParallelPlanner.java   | 16 ++++++----------
 .../iotdb/db/mpp/plan/planner/plan/PlanFragment.java  | 18 ------------------
 3 files changed, 14 insertions(+), 39 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 4733371321..3c6faaca6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -93,7 +93,7 @@ public class DistributionPlanner {
                 && (((QueryStatement) analysis.getStatement()).isAlignByDevice()),
             root);
     PlanNode newRoot = adder.visit(root, nodeGroupContext);
-    adjustUpStream(root, nodeGroupContext);
+    adjustUpStream(newRoot, nodeGroupContext);
     return newRoot;
   }
 
@@ -129,16 +129,13 @@ public class DistributionPlanner {
         MultiChildrenSinkNode newChild =
             memo.computeIfAbsent(
                 regionOfChild,
-                tRegionReplicaSet -> {
-                  MultiChildrenSinkNode result =
-                      needShuffleSinkNode
-                          ? new ShuffleSinkNode(context.queryContext.getQueryId().genPlanNodeId())
-                          : new IdentitySinkNode(context.queryContext.getQueryId().genPlanNodeId());
-                  result.addChild(exchangeNode.getChild());
-                  result.addDownStreamChannelLocation(
-                      new DownStreamChannelLocation(exchangeNode.getPlanNodeId().toString()));
-                  return result;
-                });
+                tRegionReplicaSet ->
+                    needShuffleSinkNode
+                        ? new ShuffleSinkNode(context.queryContext.getQueryId().genPlanNodeId())
+                        : new IdentitySinkNode(context.queryContext.getQueryId().genPlanNodeId()));
+        newChild.addChild(exchangeNode.getChild());
+        newChild.addDownStreamChannelLocation(
+            new DownStreamChannelLocation(exchangeNode.getPlanNodeId().toString()));
         exchangeNode.setChild(newChild);
         exchangeNode.setIndexOfUpstreamSinkHandle(newChild.getCurrentLastIndex());
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index d86c96d452..6361a91f98 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -34,12 +34,12 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.MultiChildrenSinkNode;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,7 +64,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
   // Record all the FragmentInstances belonged to same PlanFragment
   Map<PlanFragmentId, FragmentInstance> instanceMap;
   // Record which PlanFragment the PlanNode belongs
-  Map<PlanNodeId, PlanFragmentId> planNodeMap;
+  Map<PlanNodeId, Pair<PlanFragmentId, PlanNode>> planNodeMap;
   List<FragmentInstance> fragmentInstanceList;
 
   public SimpleFragmentParallelPlanner(
@@ -93,13 +93,10 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
   }
 
   private void produceFragmentInstance(PlanFragment fragment) {
-    // If one PlanFragment will produce several FragmentInstance, the instanceIdx will be increased
-    // one by one
-    PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getPlanNodeTree());
     Filter timeFilter = analysis.getGlobalTimeFilter();
     FragmentInstance fragmentInstance =
         new FragmentInstance(
-            new PlanFragment(fragment.getId(), rootCopy),
+            fragment,
             fragment.getId().genFragmentInstanceId(),
             timeFilter,
             queryContext.getQueryType(),
@@ -217,8 +214,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
                       downStreamInstance.getId().toThrift());
 
                   // Set upstream info for corresponding ExchangeNode in downstream FragmentInstance
-                  PlanNode downStreamExchangeNode =
-                      downStreamInstance.getFragment().getPlanNodeById(downStreamNodeId);
+                  PlanNode downStreamExchangeNode = planNodeMap.get(downStreamNodeId).right;
                   ((ExchangeNode) downStreamExchangeNode)
                       .setUpstream(
                           instance.getHostDataNode().getMPPDataExchangeEndPoint(),
@@ -230,11 +226,11 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
   }
 
   private FragmentInstance findDownStreamInstance(PlanNodeId exchangeNodeId) {
-    return instanceMap.get(planNodeMap.get(exchangeNodeId));
+    return instanceMap.get(planNodeMap.get(exchangeNodeId).left);
   }
 
   private void recordPlanNodeRelation(PlanNode root, PlanFragmentId planFragmentId) {
-    planNodeMap.put(root.getPlanNodeId(), planFragmentId);
+    planNodeMap.put(root.getPlanNodeId(), new Pair<>(planFragmentId, root));
     for (PlanNode child : root.getChildren()) {
       recordPlanNodeRelation(child, planFragmentId);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
index aac15e7940..5fd8b79a7a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.planner.SubPlanTypeExtractor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.VirtualSourceNode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -133,23 +132,6 @@ public class PlanFragment {
     return null;
   }
 
-  public PlanNode getPlanNodeById(PlanNodeId nodeId) {
-    return getPlanNodeById(planNodeTree, nodeId);
-  }
-
-  private PlanNode getPlanNodeById(PlanNode root, PlanNodeId nodeId) {
-    if (root.getPlanNodeId().equals(nodeId)) {
-      return root;
-    }
-    for (PlanNode child : root.getChildren()) {
-      PlanNode node = getPlanNodeById(child, nodeId);
-      if (node != null) {
-        return node;
-      }
-    }
-    return null;
-  }
-
   public void serialize(ByteBuffer byteBuffer) {
     id.serialize(byteBuffer);
     planNodeTree.serialize(byteBuffer);