You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/27 13:32:34 UTC

[iotdb] 01/07: complete source rewrite for DeviceViewNode

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/align_by_device_distribution
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d039dfae94bd9f63580bae5b9b2f830c4d020da7
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri May 27 16:23:40 2022 +0800

    complete source rewrite for DeviceViewNode
---
 .../apache/iotdb/db/mpp/plan/analyze/Analysis.java |  4 +
 .../plan/planner/distribution/SourceRewriter.java  | 90 +++++++++++++++++++++-
 .../planner/plan/node/process/DeviceViewNode.java  |  4 +
 3 files changed, 95 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
index 2e4eab4950..2a7137dd22 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
@@ -152,6 +152,10 @@ public class Analysis {
     return dataPartition.getDataRegionReplicaSet(seriesPath.getDevice(), null);
   }
 
+  public List<TRegionReplicaSet> getPartitionInfo(String deviceName, Filter globalTimeFilter) {
+    return dataPartition.getDataRegionReplicaSet(deviceName, null);
+  }
+
   public Statement getStatement() {
     return statement;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 65d4d6552f..02f95e0572 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -23,10 +23,12 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.partition.RegionReplicaSetInfo;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.SimplePlanNodeRewriter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.CountSchemaMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchMergeNode;
@@ -35,6 +37,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryM
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
@@ -74,9 +78,89 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     this.analysis = analysis;
   }
 
-  // TODO: (xingtanzjr) implement the method visitDeviceMergeNode()
-  public PlanNode visitDeviceMerge(TimeJoinNode node, DistributionPlanContext context) {
-    return null;
+  @Override
+  public PlanNode visitDeviceView(DeviceViewNode node, DistributionPlanContext context) {
+    checkArgument(
+        node.getDevices().size() == node.getChildren().size(),
+        "size of devices and its children in DeviceViewNode should be same");
+
+    Set<TRegionReplicaSet> relatedDataRegions = new HashSet<>();
+
+    List<DeviceViewSplit> deviceViewSplits = new ArrayList<>();
+    // Step 1: constructs DeviceViewSplit
+    for (int i = 0; i < node.getDevices().size(); i++) {
+      String device = node.getDevices().get(i);
+      PlanNode child = node.getChildren().get(i);
+      List<TRegionReplicaSet> regionReplicaSets =
+          analysis.getPartitionInfo(device, analysis.getGlobalTimeFilter());
+      deviceViewSplits.add(new DeviceViewSplit(device, child, regionReplicaSets));
+      relatedDataRegions.addAll(regionReplicaSets);
+    }
+
+    DeviceMergeNode deviceMergeNode =
+        new DeviceMergeNode(
+            context.queryContext.getQueryId().genPlanNodeId(),
+            node.getMergeOrders(),
+            node.getDevices());
+
+    // Step 2: Iterate all partition and create DeviceViewNode for each region
+    for (TRegionReplicaSet regionReplicaSet : relatedDataRegions) {
+      List<String> devices = new ArrayList<>();
+      List<PlanNode> children = new ArrayList<>();
+      for (DeviceViewSplit split : deviceViewSplits) {
+        if (split.needDistributeTo(regionReplicaSet)) {
+          devices.add(split.device);
+          children.add(split.buildPlanNodeInRegion(regionReplicaSet, context.queryContext));
+        }
+      }
+      DeviceViewNode regionDeviceViewNode =
+          new DeviceViewNode(
+              context.queryContext.getQueryId().genPlanNodeId(),
+              node.getMergeOrders(),
+              node.getOutputColumnNames(),
+              node.getDeviceToMeasurementIndexesMap());
+      for (int i = 0; i < devices.size(); i++) {
+        regionDeviceViewNode.addChildDeviceNode(devices.get(i), children.get(i));
+      }
+      deviceMergeNode.addChild(regionDeviceViewNode);
+    }
+
+    return deviceMergeNode;
+  }
+
+  private static class DeviceViewSplit {
+    protected String device;
+    protected PlanNode root;
+    protected Set<TRegionReplicaSet> dataPartitions;
+
+    protected DeviceViewSplit(
+        String device, PlanNode root, List<TRegionReplicaSet> dataPartitions) {
+      this.device = device;
+      this.root = root;
+      this.dataPartitions = new HashSet<>();
+      this.dataPartitions.addAll(dataPartitions);
+    }
+
+    protected PlanNode buildPlanNodeInRegion(
+        TRegionReplicaSet regionReplicaSet, MPPQueryContext context) {
+      return buildPlanNodeInRegion(this.root, regionReplicaSet, context);
+    }
+
+    protected boolean needDistributeTo(TRegionReplicaSet regionReplicaSet) {
+      return this.dataPartitions.contains(regionReplicaSet);
+    }
+
+    private PlanNode buildPlanNodeInRegion(
+        PlanNode root, TRegionReplicaSet regionReplicaSet, MPPQueryContext context) {
+      List<PlanNode> children =
+          root.getChildren().stream().map(PlanNodeUtil::deepCopy).collect(Collectors.toList());
+      PlanNode newRoot = root.cloneWithChildren(children);
+      newRoot.setPlanNodeId(context.getQueryId().genPlanNodeId());
+      if (newRoot instanceof SourceNode) {
+        ((SourceNode) root).setRegionReplicaSet(regionReplicaSet);
+      }
+      return newRoot;
+    }
   }
 
   public PlanNode visitDeleteTimeseries(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
index 589e666297..60de016a0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
@@ -117,6 +117,10 @@ public class DeviceViewNode extends ProcessNode {
         getPlanNodeId(), mergeOrders, outputColumnNames, devices, deviceToMeasurementIndexesMap);
   }
 
+  public List<OrderBy> getMergeOrders() {
+    return mergeOrders;
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
     return outputColumnNames;