You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/27 13:32:34 UTC
[iotdb] 01/07: complete source rewrite for DeviceViewNode
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/align_by_device_distribution
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d039dfae94bd9f63580bae5b9b2f830c4d020da7
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri May 27 16:23:40 2022 +0800
complete source rewrite for DeviceViewNode
---
.../apache/iotdb/db/mpp/plan/analyze/Analysis.java | 4 +
.../plan/planner/distribution/SourceRewriter.java | 90 +++++++++++++++++++++-
.../planner/plan/node/process/DeviceViewNode.java | 4 +
3 files changed, 95 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
index 2e4eab4950..2a7137dd22 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
@@ -152,6 +152,10 @@ public class Analysis {
return dataPartition.getDataRegionReplicaSet(seriesPath.getDevice(), null);
}
+  /**
+   * Looks up the data-region replica sets registered for a device.
+   *
+   * <p>The {@code globalTimeFilter} argument is accepted but not consulted: {@code null} is always
+   * handed to {@code getDataRegionReplicaSet} as its second argument.
+   *
+   * @param deviceName device whose data regions are requested
+   * @param globalTimeFilter currently unused by this method
+   * @return whatever the data partition reports for {@code deviceName}
+   */
+  public List<TRegionReplicaSet> getPartitionInfo(String deviceName, Filter globalTimeFilter) {
+    List<TRegionReplicaSet> replicaSets =
+        dataPartition.getDataRegionReplicaSet(deviceName, null);
+    return replicaSets;
+  }
+
public Statement getStatement() {
return statement;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 65d4d6552f..02f95e0572 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -23,10 +23,12 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.partition.RegionReplicaSetInfo;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.SimplePlanNodeRewriter;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.CountSchemaMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchMergeNode;
@@ -35,6 +37,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryM
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
@@ -74,9 +78,89 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
this.analysis = analysis;
}
- // TODO: (xingtanzjr) implement the method visitDeviceMergeNode()
- public PlanNode visitDeviceMerge(TimeJoinNode node, DistributionPlanContext context) {
- return null;
+  /**
+   * Splits a {@link DeviceViewNode} by data region.
+   *
+   * <p>The result is a {@link DeviceMergeNode} with one child {@link DeviceViewNode} per region
+   * that holds data of at least one of the node's devices. Each of those children only contains
+   * the devices (and a region-specific copy of their sub-plans) located in that region.
+   *
+   * @param node the device view to distribute; it must have exactly one child per device
+   * @param context distribution context supplying the query context used to generate node ids
+   * @return the newly built {@link DeviceMergeNode}
+   */
+  @Override
+  public PlanNode visitDeviceView(DeviceViewNode node, DistributionPlanContext context) {
+    checkArgument(
+        node.getDevices().size() == node.getChildren().size(),
+        "size of devices and its children in DeviceViewNode should be same");
+
+    List<String> allDevices = node.getDevices();
+    List<PlanNode> subPlans = node.getChildren();
+
+    // Step 1: pair every device with its sub-plan and the regions that hold its data,
+    // collecting the union of all regions along the way
+    List<DeviceViewSplit> splits = new ArrayList<>();
+    Set<TRegionReplicaSet> allRegions = new HashSet<>();
+    for (int idx = 0; idx < allDevices.size(); idx++) {
+      String deviceName = allDevices.get(idx);
+      List<TRegionReplicaSet> deviceRegions =
+          analysis.getPartitionInfo(deviceName, analysis.getGlobalTimeFilter());
+      splits.add(new DeviceViewSplit(deviceName, subPlans.get(idx), deviceRegions));
+      allRegions.addAll(deviceRegions);
+    }
+
+    DeviceMergeNode mergeRoot =
+        new DeviceMergeNode(
+            context.queryContext.getQueryId().genPlanNodeId(),
+            node.getMergeOrders(),
+            node.getDevices());
+
+    // Step 2: build one DeviceViewNode per region from the splits located in that region
+    for (TRegionReplicaSet region : allRegions) {
+      // The per-device sub-plans are built first so that their node ids are generated before
+      // the id of the region-level DeviceViewNode.
+      List<String> regionDevices = new ArrayList<>();
+      List<PlanNode> regionSubPlans = new ArrayList<>();
+      for (DeviceViewSplit split : splits) {
+        if (!split.needDistributeTo(region)) {
+          continue;
+        }
+        regionDevices.add(split.device);
+        regionSubPlans.add(split.buildPlanNodeInRegion(region, context.queryContext));
+      }
+
+      DeviceViewNode regionView =
+          new DeviceViewNode(
+              context.queryContext.getQueryId().genPlanNodeId(),
+              node.getMergeOrders(),
+              node.getOutputColumnNames(),
+              node.getDeviceToMeasurementIndexesMap());
+      for (int idx = 0; idx < regionDevices.size(); idx++) {
+        regionView.addChildDeviceNode(regionDevices.get(idx), regionSubPlans.get(idx));
+      }
+      mergeRoot.addChild(regionView);
+    }
+
+    return mergeRoot;
+  }
+
+  /**
+   * Pairs one device of a {@link DeviceViewNode} with the root of its sub-plan and the set of data
+   * regions that hold the device's data.
+   */
+  private static class DeviceViewSplit {
+    protected final String device;
+    protected final PlanNode root;
+    protected final Set<TRegionReplicaSet> dataPartitions;
+
+    /**
+     * @param device name of the device
+     * @param root root of the device's sub-plan in the original DeviceViewNode
+     * @param dataPartitions regions holding the device's data; copied into a set
+     */
+    protected DeviceViewSplit(
+        String device, PlanNode root, List<TRegionReplicaSet> dataPartitions) {
+      this.device = device;
+      this.root = root;
+      this.dataPartitions = new HashSet<>(dataPartitions);
+    }
+
+    /**
+     * Builds a copy of this split's sub-plan for the given region.
+     *
+     * @param regionReplicaSet region the copy is built for
+     * @param context query context used to generate the id of the copied root
+     * @return the copied root node
+     */
+    protected PlanNode buildPlanNodeInRegion(
+        TRegionReplicaSet regionReplicaSet, MPPQueryContext context) {
+      return buildPlanNodeInRegion(this.root, regionReplicaSet, context);
+    }
+
+    /** Returns whether the device of this split has data in {@code regionReplicaSet}. */
+    protected boolean needDistributeTo(TRegionReplicaSet regionReplicaSet) {
+      return this.dataPartitions.contains(regionReplicaSet);
+    }
+
+    /**
+     * Clones {@code root} on top of deep-copied children, gives the clone a fresh plan-node id
+     * and, if the clone is a {@link SourceNode}, points it at {@code regionReplicaSet}.
+     */
+    private PlanNode buildPlanNodeInRegion(
+        PlanNode root, TRegionReplicaSet regionReplicaSet, MPPQueryContext context) {
+      List<PlanNode> children =
+          root.getChildren().stream().map(PlanNodeUtil::deepCopy).collect(Collectors.toList());
+      PlanNode newRoot = root.cloneWithChildren(children);
+      newRoot.setPlanNodeId(context.getQueryId().genPlanNodeId());
+      if (newRoot instanceof SourceNode) {
+        // The region must be set on the copy. The original root is shared by every region this
+        // split is distributed to, so mutating it would leave the returned node without the
+        // target region.
+        ((SourceNode) newRoot).setRegionReplicaSet(regionReplicaSet);
+      }
+      // NOTE(review): only the top-level node is re-targeted here; the deep-copied children are
+      // not visited. Confirm whether nested source nodes also need the region (and fresh ids).
+      return newRoot;
+    }
+  }
public PlanNode visitDeleteTimeseries(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
index 589e666297..60de016a0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
@@ -117,6 +117,10 @@ public class DeviceViewNode extends ProcessNode {
getPlanNodeId(), mergeOrders, outputColumnNames, devices, deviceToMeasurementIndexesMap);
}
+  /**
+   * Returns the merge orders of this node.
+   *
+   * @return the {@code mergeOrders} list held by this node (the same instance, not a copy)
+   */
+  public List<OrderBy> getMergeOrders() {
+    return this.mergeOrders;
+  }
+
@Override
public List<String> getOutputColumnNames() {
return outputColumnNames;