You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/24 09:47:41 UTC
[iotdb] branch xingtanzjr/delete_timeseries_plan updated: complete basic distribution
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/delete_timeseries_plan
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xingtanzjr/delete_timeseries_plan by this push:
new 3eae1e0349 complete basic distribution
3eae1e0349 is described below
commit 3eae1e0349a751d2ea6a56dc2a7d16b38cbcc968
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue May 24 17:46:56 2022 +0800
complete basic distribution
---
.../commons/partition/RegionReplicaSetInfo.java | 8 +-
.../iotdb/commons/partition/SchemaPartition.java | 13 +
.../planner/distribution/DistributionPlanner.java | 3 +
.../planner/distribution/ExchangeNodeAdder.java | 28 +
.../plan/planner/distribution/SourceRewriter.java | 57 +-
.../iotdb/db/mpp/plan/planner/plan/SubPlan.java | 2 +-
.../db/mpp/plan/planner/plan/node/PlanVisitor.java | 5 +
.../node/metedata/write/DeleteTimeSeriesNode.java | 32 +-
.../planner/plan/node/write/DeleteDataNode.java | 27 +-
.../metadata/DeleteTimeSeriesStatement.java | 2 +-
.../db/mpp/plan/plan/DistributionPlannerTest.java | 934 ---------------------
.../distribution/AggregationDistributionTest.java | 412 +++++++++
.../db/mpp/plan/plan/distribution/BasicTest.java | 378 +++++++++
.../plan/distribution/DeleteTimeseriesTest.java | 71 ++
.../iotdb/db/mpp/plan/plan/distribution/util.java | 191 +++++
15 files changed, 1213 insertions(+), 950 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSetInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSetInfo.java
index 298547d866..f2017fdb24 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSetInfo.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSetInfo.java
@@ -22,15 +22,17 @@ package org.apache.iotdb.commons.partition;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
public class RegionReplicaSetInfo {
private TRegionReplicaSet regionReplicaSet;
- private List<String> ownedStorageGroups;
+ private Set<String> ownedStorageGroups;
public RegionReplicaSetInfo(TRegionReplicaSet regionReplicaSet) {
this.regionReplicaSet = regionReplicaSet;
- this.ownedStorageGroups = new ArrayList<>();
+ this.ownedStorageGroups = new HashSet<>();
}
public void addStorageGroup(String storageGroup) {
@@ -42,6 +44,6 @@ public class RegionReplicaSetInfo {
}
public List<String> getOwnedStorageGroups() {
- return ownedStorageGroups;
+ return new ArrayList<>(ownedStorageGroups);
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
index 2b05700add..200f6107b5 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
@@ -223,6 +223,19 @@ public class SchemaPartition extends Partition {
return result;
}
+ public List<RegionReplicaSetInfo> getSchemaDistributionInfo() {
+ Map<TRegionReplicaSet, RegionReplicaSetInfo> distributionMap = new HashMap<>();
+ schemaPartitionMap.forEach(
+ (storageGroup, partition) -> {
+ for (TRegionReplicaSet regionReplicaSet : partition.values()) {
+ distributionMap
+ .computeIfAbsent(regionReplicaSet, RegionReplicaSetInfo::new)
+ .addStorageGroup(storageGroup);
+ }
+ });
+ return new ArrayList<>(distributionMap.values());
+ }
+
private void writeMap(
Map<TSeriesPartitionSlot, TRegionReplicaSet> valueMap,
OutputStream outputStream,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 91ca95a5f9..3be4a6daf6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
@@ -66,7 +67,9 @@ public class DistributionPlanner {
public DistributedQueryPlan planFragments() {
PlanNode rootAfterRewrite = rewriteSource();
+ System.out.println(PlanNodeUtil.nodeToString(rootAfterRewrite));
PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
+ System.out.println(PlanNodeUtil.nodeToString(rootWithExchange));
if (analysis.getStatement() instanceof QueryStatement) {
analysis
.getRespDatasetHeader()
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index 781c84b28e..daef502c7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchM
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
@@ -39,6 +40,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNo
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import java.util.ArrayList;
import java.util.Collections;
@@ -155,6 +157,32 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
return node.clone();
}
+ public PlanNode visitDeleteData(DeleteDataNode node, NodeGroupContext context) {
+ context.putNodeDistribution(
+ node.getPlanNodeId(),
+ new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
+ return node;
+ }
+
+ public PlanNode visitDeleteTimeseries(DeleteTimeSeriesNode node, NodeGroupContext context) {
+ List<PlanNode> visitedChildren = new ArrayList<>();
+ node.getChildren()
+ .forEach(
+ child -> {
+ visitedChildren.add(visit(child, context));
+ });
+ node.getChildren().clear();
+ visitedChildren.forEach(
+ child -> {
+ ExchangeNode exchangeNode =
+ new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
+ exchangeNode.setChild(child);
+ exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
+ node.addChild(exchangeNode);
+ });
+ return node;
+ }
+
@Override
public PlanNode visitTimeJoin(TimeJoinNode node, NodeGroupContext context) {
return processMultiChildNode(node, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 752383eff2..d0ba535c26 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -60,6 +60,8 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
+import static com.google.common.base.Preconditions.checkArgument;
+
public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanContext> {
private Analysis analysis;
@@ -75,7 +77,44 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
public PlanNode visitDeleteTimeseries(
DeleteTimeSeriesNode node, DistributionPlanContext context) {
- return null;
+ // Step 1: split DeleteDataNode by partition
+ checkArgument(node.getChildren().size() == 1, "DeleteTimeSeriesNode should have 1 child");
+ checkArgument(
+ node.getChildren().get(0) instanceof DeleteDataNode,
+ "Child of DeleteTimeSeriesNode should be DeleteDataNode");
+
+ DeleteDataNode deleteDataNode = (DeleteDataNode) node.getChildren().get(0);
+ List<DeleteDataNode> deleteDataNodes = splitDeleteDataNode(deleteDataNode, context);
+
+ // Step 2: split DeleteTimeseriesNode by partition
+ List<DeleteTimeSeriesNode> deleteTimeSeriesNodes = splitDeleteTimeseries(node, context);
+
+ // Step 3: construct them as a Tree
+ checkArgument(
+ deleteTimeSeriesNodes.size() > 0,
+ "Size of DeleteTimeseriesNode splits should be larger than 0");
+ deleteDataNodes.forEach(split -> deleteTimeSeriesNodes.get(0).addChild(split));
+ for (int i = 1; i < deleteTimeSeriesNodes.size(); i++) {
+ deleteTimeSeriesNodes.get(i).addChild(deleteTimeSeriesNodes.get(i - 1));
+ }
+ return deleteTimeSeriesNodes.get(deleteTimeSeriesNodes.size() - 1);
+ }
+
+ private List<DeleteTimeSeriesNode> splitDeleteTimeseries(
+ DeleteTimeSeriesNode node, DistributionPlanContext context) {
+ List<DeleteTimeSeriesNode> ret = new ArrayList<>();
+ List<PartialPath> rawPaths = node.getPathList();
+ List<RegionReplicaSetInfo> relatedRegions =
+ analysis.getSchemaPartitionInfo().getSchemaDistributionInfo();
+ for (RegionReplicaSetInfo regionReplicaSetInfo : relatedRegions) {
+ List<PartialPath> newPaths =
+ getRelatedPaths(rawPaths, regionReplicaSetInfo.getOwnedStorageGroups());
+ DeleteTimeSeriesNode split =
+ new DeleteTimeSeriesNode(context.queryContext.getQueryId().genPlanNodeId(), newPaths);
+ split.setRegionReplicaSet(regionReplicaSetInfo.getRegionReplicaSet());
+ ret.add(split);
+ }
+ return ret;
}
private List<DeleteDataNode> splitDeleteDataNode(
@@ -84,9 +123,19 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
List<PartialPath> rawPaths = node.getPathList();
List<RegionReplicaSetInfo> relatedRegions =
analysis.getDataPartitionInfo().getDataDistributionInfo();
- for (RegionReplicaSetInfo regionReplicaSetInfo : relatedRegions) {}
-
- return null;
+ for (RegionReplicaSetInfo regionReplicaSetInfo : relatedRegions) {
+ List<PartialPath> newPaths =
+ getRelatedPaths(rawPaths, regionReplicaSetInfo.getOwnedStorageGroups());
+ DeleteDataNode split =
+ new DeleteDataNode(
+ context.queryContext.getQueryId().genPlanNodeId(),
+ context.queryContext.getQueryId(),
+ newPaths,
+ regionReplicaSetInfo.getOwnedStorageGroups());
+ split.setRegionReplicaSet(regionReplicaSetInfo.getRegionReplicaSet());
+ ret.add(split);
+ }
+ return ret;
}
private List<PartialPath> getRelatedPaths(List<PartialPath> paths, List<String> storageGroups) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/SubPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/SubPlan.java
index b922c37c8a..7ac6f92d0a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/SubPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/SubPlan.java
@@ -62,7 +62,7 @@ public class SubPlan {
result.add(this.planFragment);
this.children.forEach(
child -> {
- result.add(child.getPlanFragment());
+ result.addAll(child.getPlanFragmentList());
});
return result;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 4c7f1a5587..2af623bfbe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -59,6 +59,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNo
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
@@ -260,4 +261,8 @@ public abstract class PlanVisitor<R, C> {
public R visitDeleteTimeseries(DeleteTimeSeriesNode node, C context) {
return visitPlan(node, context);
}
+
+ public R visitDeleteData(DeleteDataNode node, C context) {
+ return visitPlan(node, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/DeleteTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/DeleteTimeSeriesNode.java
index 05356144cb..454db8da22 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/DeleteTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/DeleteTimeSeriesNode.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
@@ -36,9 +37,13 @@ public class DeleteTimeSeriesNode extends PlanNode implements IPartitionRelatedN
private final List<PartialPath> pathList;
+ private TRegionReplicaSet regionReplicaSet;
+ private List<PlanNode> children;
+
public DeleteTimeSeriesNode(PlanNodeId id, List<PartialPath> pathList) {
super(id);
this.pathList = pathList;
+ this.children = new ArrayList<>();
}
public List<PartialPath> getPathList() {
@@ -47,11 +52,13 @@ public class DeleteTimeSeriesNode extends PlanNode implements IPartitionRelatedN
@Override
public List<PlanNode> getChildren() {
- return null;
+ return children;
}
@Override
- public void addChild(PlanNode child) {}
+ public void addChild(PlanNode child) {
+ children.add(child);
+ }
@Override
public PlanNode clone() {
@@ -60,7 +67,7 @@ public class DeleteTimeSeriesNode extends PlanNode implements IPartitionRelatedN
@Override
public int allowedChildCount() {
- return 0;
+ return CHILD_COUNT_NO_LIMIT;
}
@Override
@@ -68,6 +75,11 @@ public class DeleteTimeSeriesNode extends PlanNode implements IPartitionRelatedN
return null;
}
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitDeleteTimeseries(this, context);
+ }
+
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.DELETE_TIMESERIES.serialize(byteBuffer);
@@ -89,6 +101,18 @@ public class DeleteTimeSeriesNode extends PlanNode implements IPartitionRelatedN
@Override
public TRegionReplicaSet getRegionReplicaSet() {
- return null;
+ return regionReplicaSet;
+ }
+
+ public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
+ this.regionReplicaSet = regionReplicaSet;
+ }
+
+ public String toString() {
+ return String.format(
+ "DeleteTimeseriesNode-%s: %s. Region: %s",
+ getPlanNodeId(),
+ pathList,
+ regionReplicaSet == null ? "Not Assigned" : regionReplicaSet.getRegionId());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java
index 3e16c1021e..a594bba0c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
@@ -39,6 +40,8 @@ public class DeleteDataNode extends PlanNode implements IPartitionRelatedNode {
private final List<PartialPath> pathList;
private final List<String> storageGroups;
+ private TRegionReplicaSet regionReplicaSet;
+
public DeleteDataNode(
PlanNodeId id, QueryId queryId, List<PartialPath> pathList, List<String> storageGroups) {
super(id);
@@ -53,7 +56,7 @@ public class DeleteDataNode extends PlanNode implements IPartitionRelatedNode {
@Override
public List<PlanNode> getChildren() {
- return null;
+ return new ArrayList<>();
}
@Override
@@ -66,7 +69,7 @@ public class DeleteDataNode extends PlanNode implements IPartitionRelatedNode {
@Override
public int allowedChildCount() {
- return 0;
+ return NO_CHILD_ALLOWED;
}
@Override
@@ -104,9 +107,18 @@ public class DeleteDataNode extends PlanNode implements IPartitionRelatedNode {
return new DeleteDataNode(planNodeId, queryId, pathList, storageGroups);
}
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitDeleteData(this, context);
+ }
+
@Override
public TRegionReplicaSet getRegionReplicaSet() {
- return null;
+ return regionReplicaSet;
+ }
+
+ public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
+ this.regionReplicaSet = regionReplicaSet;
}
public QueryId getQueryId() {
@@ -116,4 +128,13 @@ public class DeleteDataNode extends PlanNode implements IPartitionRelatedNode {
public List<String> getStorageGroups() {
return storageGroups;
}
+
+ public String toString() {
+ return String.format(
+ "DeleteDataNode-%s[ Paths: %s, StorageGroups: %s, Region: %s ]",
+ getPlanNodeId(),
+ pathList,
+ storageGroups,
+ regionReplicaSet == null ? "Not Assigned" : regionReplicaSet.getRegionId());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/DeleteTimeSeriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/DeleteTimeSeriesStatement.java
index edcccd58e0..99cb5b57ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/DeleteTimeSeriesStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/DeleteTimeSeriesStatement.java
@@ -31,7 +31,7 @@ public class DeleteTimeSeriesStatement extends Statement {
@Override
public List<PartialPath> getPaths() {
- return null;
+ return partialPaths;
}
public void setPartialPaths(List<PartialPath> partialPaths) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
deleted file mode 100644
index 58f0153825..0000000000
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
+++ /dev/null
@@ -1,934 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.mpp.plan.plan;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.partition.DataPartition;
-import org.apache.iotdb.commons.partition.SchemaPartition;
-import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.metadata.path.AlignedPath;
-import org.apache.iotdb.db.metadata.path.MeasurementPath;
-import org.apache.iotdb.db.mpp.common.MPPQueryContext;
-import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
-import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
-import org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanner;
-import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
-import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
-import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
-import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
-import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
-import org.apache.iotdb.db.query.aggregation.AggregationType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class DistributionPlannerTest {
-
- @Test
- public void testSingleSeriesScan() throws IllegalPathException {
- QueryId queryId = new QueryId("test_query");
- SeriesScanNode root =
- new SeriesScanNode(
- queryId.genPlanNodeId(),
- new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
- OrderBy.TIMESTAMP_ASC);
-
- Analysis analysis = constructAnalysis();
-
- MPPQueryContext context =
- new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
- DistributionPlanner planner =
- new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
- DistributedQueryPlan plan = planner.planFragments();
- assertEquals(2, plan.getInstances().size());
- }
-
- @Test
- public void testSingleSeriesScanRewriteSource() throws IllegalPathException {
- QueryId queryId = new QueryId("test_query");
- SeriesScanNode root =
- new SeriesScanNode(
- queryId.genPlanNodeId(),
- new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
- OrderBy.TIMESTAMP_ASC);
-
- Analysis analysis = constructAnalysis();
-
- MPPQueryContext context =
- new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
- DistributionPlanner planner =
- new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
- PlanNode rootAfterRewrite = planner.rewriteSource();
- assertEquals(2, rootAfterRewrite.getChildren().size());
- }
-
- @Test
- public void testRewriteSourceNode() throws IllegalPathException {
- QueryId queryId = new QueryId("test_query");
-
- TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
-
- timeJoinNode.addChild(
- new SeriesScanNode(
- queryId.genPlanNodeId(),
- new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
- OrderBy.TIMESTAMP_ASC));
- timeJoinNode.addChild(
- new SeriesScanNode(
- queryId.genPlanNodeId(),
- new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
- OrderBy.TIMESTAMP_ASC));
- timeJoinNode.addChild(
- new SeriesScanNode(
- queryId.genPlanNodeId(),
- new MeasurementPath("root.sg.d22.s1", TSDataType.INT32),
- OrderBy.TIMESTAMP_ASC));
-
- LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
-
- Analysis analysis = constructAnalysis();
-
- DistributionPlanner planner =
- new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(queryId), root));
- PlanNode newRoot = planner.rewriteSource();
- assertEquals(4, newRoot.getChildren().get(0).getChildren().size());
- }
-
- @Test
- public void testRewriteMetaSourceNode() throws IllegalPathException {
- QueryId queryId = new QueryId("test_query");
- SchemaQueryMergeNode metaMergeNode = new SchemaQueryMergeNode(queryId.genPlanNodeId(), false);
- metaMergeNode.addChild(
- new TimeSeriesSchemaScanNode(
- queryId.genPlanNodeId(),
- new PartialPath("root.sg.d1.s1"),
- null,
- null,
- 10,
- 0,
- false,
- false,
- false));
- metaMergeNode.addChild(
- new TimeSeriesSchemaScanNode(
- queryId.genPlanNodeId(),
- new PartialPath("root.sg.d1.s2"),
- null,
- null,
- 10,
- 0,
- false,
- false,
- false));
- metaMergeNode.addChild(
- new TimeSeriesSchemaScanNode(
- queryId.genPlanNodeId(),
- new PartialPath("root.sg.d22.s1"),
- null,
- null,
- 10,
- 0,
- false,
- false,
- false));
- LimitNode root2 = new LimitNode(queryId.genPlanNodeId(), metaMergeNode, 10);
- Analysis analysis = constructAnalysis();
- DistributionPlanner planner2 =
- new DistributionPlanner(
- analysis, new LogicalQueryPlan(new MPPQueryContext(queryId), root2));
- PlanNode newRoot2 = planner2.rewriteSource();
- assertEquals(newRoot2.getChildren().get(0).getChildren().size(), 2);
- }
-
- @Test
- public void testAddExchangeNode() throws IllegalPathException {
- QueryId queryId = new QueryId("test_query");
- TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
-
- timeJoinNode.addChild(
- new SeriesScanNode(
- queryId.genPlanNodeId(),
- new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
- OrderBy.TIMESTAMP_ASC));
- timeJoinNode.addChild(
- new SeriesScanNode(
- queryId.genPlanNodeId(),
- new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
- OrderBy.TIMESTAMP_ASC));
- timeJoinNode.addChild(
- new SeriesScanNode(
- queryId.genPlanNodeId(),
- new MeasurementPath("root.sg.d22.s1", TSDataType.INT32),
- OrderBy.TIMESTAMP_ASC));
-
- LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
-
- Analysis analysis = constructAnalysis();
-
- DistributionPlanner planner =
- new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(queryId), root));
- PlanNode rootAfterRewrite = planner.rewriteSource();
- PlanNode rootWithExchange = planner.addExchangeNode(rootAfterRewrite);
- assertEquals(4, rootWithExchange.getChildren().get(0).getChildren().size());
- int exchangeNodeCount = 0;
- for (PlanNode child : rootWithExchange.getChildren().get(0).getChildren()) {
- exchangeNodeCount += child instanceof ExchangeNode ? 1 : 0;
- }
- assertEquals(2, exchangeNodeCount);
- }
-
- @Test
- public void testSplitFragment() throws IllegalPathException {
- QueryId queryId = new QueryId("test_query");
- TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
-
- timeJoinNode.addChild(
- new SeriesScanNode(
- queryId.genPlanNodeId(),
- new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
- OrderBy.TIMESTAMP_ASC));
- timeJoinNode.addChild(
- new SeriesScanNode(
- queryId.genPlanNodeId(),
- new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
- OrderBy.TIMESTAMP_ASC));
- timeJoinNode.addChild(
- new SeriesScanNode(
- queryId.genPlanNodeId(),
- new MeasurementPath("root.sg.d22.s1", TSDataType.INT32),
- OrderBy.TIMESTAMP_ASC));
-
- LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
-
- Analysis analysis = constructAnalysis();
-
- MPPQueryContext context =
- new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
- DistributionPlanner planner =
- new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
- PlanNode rootAfterRewrite = planner.rewriteSource();
- PlanNode rootWithExchange = planner.addExchangeNode(rootAfterRewrite);
- SubPlan subPlan = planner.splitFragment(rootWithExchange);
- assertEquals(subPlan.getChildren().size(), 2);
- }
-
- @Test
- public void testParallelPlan() throws IllegalPathException {
- QueryId queryId = new QueryId("test_query");
- TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
-
- timeJoinNode.addChild(
- new SeriesScanNode(
- queryId.genPlanNodeId(),
- new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
- OrderBy.TIMESTAMP_ASC));
- timeJoinNode.addChild(
- new SeriesScanNode(
- queryId.genPlanNodeId(),
- new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
- OrderBy.TIMESTAMP_ASC));
- timeJoinNode.addChild(
- new SeriesScanNode(
- queryId.genPlanNodeId(),
- new MeasurementPath("root.sg.d333.s1", TSDataType.INT32),
- OrderBy.TIMESTAMP_ASC));
-
- LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
-
- Analysis analysis = constructAnalysis();
-
- MPPQueryContext context =
- new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
- DistributionPlanner planner =
- new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
- DistributedQueryPlan plan = planner.planFragments();
- assertEquals(3, plan.getInstances().size());
- }
-
- @Test
- public void testTimeJoinAggregationSinglePerRegion() throws IllegalPathException {
- QueryId queryId = new QueryId("test_query_time_join_aggregation");
- TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
- String d1s1Path = "root.sg.d1.s1";
- timeJoinNode.addChild(genAggregationSourceNode(queryId, d1s1Path, AggregationType.COUNT));
-
- String d2s1Path = "root.sg.d22.s1";
- timeJoinNode.addChild(genAggregationSourceNode(queryId, d2s1Path, AggregationType.COUNT));
-
- Analysis analysis = constructAnalysis();
- MPPQueryContext context =
- new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
- DistributionPlanner planner =
- new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
- DistributedQueryPlan plan = planner.planFragments();
- assertEquals(3, plan.getInstances().size());
- Map<String, AggregationStep> expectedStep = new HashMap<>();
- expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
- expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
- List<FragmentInstance> fragmentInstances = plan.getInstances();
- fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
- }
-
- private void verifyAggregationStep(Map<String, AggregationStep> expected, PlanNode root) {
- if (root == null) {
- return;
- }
- if (root instanceof SeriesAggregationSourceNode) {
- SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) root;
- List<AggregationDescriptor> descriptorList = handle.getAggregationDescriptorList();
- descriptorList.forEach(
- d -> {
- assertEquals(expected.get(handle.getPartitionPath().getFullPath()), d.getStep());
- });
- }
- root.getChildren().forEach(child -> verifyAggregationStep(expected, child));
- }
-
- @Test
- public void testTimeJoinAggregationMultiPerRegion() throws IllegalPathException {
- QueryId queryId = new QueryId("test_query_time_join_aggregation");
- TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
- String d1s1Path = "root.sg.d1.s1";
- timeJoinNode.addChild(genAggregationSourceNode(queryId, d1s1Path, AggregationType.COUNT));
-
- String d3s1Path = "root.sg.d333.s1";
- timeJoinNode.addChild(genAggregationSourceNode(queryId, d3s1Path, AggregationType.COUNT));
-
- Analysis analysis = constructAnalysis();
- MPPQueryContext context =
- new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
- DistributionPlanner planner =
- new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
- DistributedQueryPlan plan = planner.planFragments();
- assertEquals(3, plan.getInstances().size());
- Map<String, AggregationStep> expectedStep = new HashMap<>();
- expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
- expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
- List<FragmentInstance> fragmentInstances = plan.getInstances();
- fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
- }
-
- @Test
- public void testTimeJoinAggregationMultiPerRegion2() throws IllegalPathException {
- QueryId queryId = new QueryId("test_query_time_join_aggregation");
- TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
- String d3s1Path = "root.sg.d333.s1";
- timeJoinNode.addChild(genAggregationSourceNode(queryId, d3s1Path, AggregationType.COUNT));
-
- String d4s1Path = "root.sg.d4444.s1";
- timeJoinNode.addChild(genAggregationSourceNode(queryId, d4s1Path, AggregationType.COUNT));
- Analysis analysis = constructAnalysis();
- MPPQueryContext context =
- new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
- DistributionPlanner planner =
- new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
- DistributedQueryPlan plan = planner.planFragments();
- assertEquals(2, plan.getInstances().size());
- Map<String, AggregationStep> expectedStep = new HashMap<>();
- expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
- expectedStep.put(d4s1Path, AggregationStep.PARTIAL);
- List<FragmentInstance> fragmentInstances = plan.getInstances();
- fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
- }
-
- @Test
- public void testGroupByLevelWithTwoChildren() throws IllegalPathException {
- QueryId queryId = new QueryId("test_group_by_level_two_children");
- String d1s1Path = "root.sg.d1.s1";
- String d2s1Path = "root.sg.d22.s1";
- String groupedPath = "root.sg.*.s1";
-
- GroupByLevelNode groupByLevelNode =
- new GroupByLevelNode(
- new PlanNodeId("TestGroupByLevelNode"),
- Arrays.asList(
- genAggregationSourceNode(queryId, d1s1Path, AggregationType.COUNT),
- genAggregationSourceNode(queryId, d2s1Path, AggregationType.COUNT)),
- Collections.singletonList(
- new GroupByLevelDescriptor(
- AggregationType.COUNT,
- AggregationStep.FINAL,
- Arrays.asList(
- new TimeSeriesOperand(new PartialPath(d1s1Path)),
- new TimeSeriesOperand(new PartialPath(d2s1Path))),
- new TimeSeriesOperand(new PartialPath(groupedPath)))));
- Analysis analysis = constructAnalysis();
- MPPQueryContext context =
- new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
- DistributionPlanner planner =
- new DistributionPlanner(analysis, new LogicalQueryPlan(context, groupByLevelNode));
- DistributedQueryPlan plan = planner.planFragments();
- assertEquals(3, plan.getInstances().size());
- Map<String, AggregationStep> expectedStep = new HashMap<>();
- expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
- expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
- List<FragmentInstance> fragmentInstances = plan.getInstances();
- fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
- }
-
- @Test
- public void testAggregationWithMultiGroupByLevelNode() throws IllegalPathException {
- QueryId queryId = new QueryId("test_group_by_level_two_children");
- String d3s1Path = "root.sg.d333.s1";
- String d4s1Path = "root.sg.d4444.s1";
- String groupedPath = "root.sg.*.s1";
-
- GroupByLevelNode groupByLevelNode =
- new GroupByLevelNode(
- new PlanNodeId("TestGroupByLevelNode"),
- Arrays.asList(
- genAggregationSourceNode(queryId, d3s1Path, AggregationType.COUNT),
- genAggregationSourceNode(queryId, d4s1Path, AggregationType.COUNT)),
- Collections.singletonList(
- new GroupByLevelDescriptor(
- AggregationType.COUNT,
- AggregationStep.FINAL,
- Arrays.asList(
- new TimeSeriesOperand(new PartialPath(d3s1Path)),
- new TimeSeriesOperand(new PartialPath(d4s1Path))),
- new TimeSeriesOperand(new PartialPath(groupedPath)))));
- Analysis analysis = constructAnalysis();
- MPPQueryContext context =
- new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
- DistributionPlanner planner =
- new DistributionPlanner(analysis, new LogicalQueryPlan(context, groupByLevelNode));
- DistributedQueryPlan plan = planner.planFragments();
- assertEquals(2, plan.getInstances().size());
- Map<String, AggregationStep> expectedStep = new HashMap<>();
- expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
- expectedStep.put(d4s1Path, AggregationStep.PARTIAL);
- List<FragmentInstance> fragmentInstances = plan.getInstances();
- fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
-
- Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
- expectedDescriptorValue.put(groupedPath, Arrays.asList(groupedPath, d3s1Path, d4s1Path));
- verifyGroupByLevelDescriptor(
- expectedDescriptorValue,
- (GroupByLevelNode) fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
-
- Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
- expectedDescriptorValue2.put(groupedPath, Arrays.asList(d3s1Path, d4s1Path));
- verifyGroupByLevelDescriptor(
- expectedDescriptorValue2,
- (GroupByLevelNode) fragmentInstances.get(1).getFragment().getRoot().getChildren().get(0));
- }
-
- @Test
- public void testGroupByLevelTwoSeries() throws IllegalPathException {
- QueryId queryId = new QueryId("test_group_by_level_two_series");
- String d1s1Path = "root.sg.d1.s1";
- String d1s2Path = "root.sg.d1.s2";
- String groupedPathS1 = "root.sg.*.s1";
- String groupedPathS2 = "root.sg.*.s2";
-
- GroupByLevelNode groupByLevelNode =
- new GroupByLevelNode(
- new PlanNodeId("TestGroupByLevelNode"),
- Arrays.asList(
- genAggregationSourceNode(queryId, d1s1Path, AggregationType.COUNT),
- genAggregationSourceNode(queryId, d1s2Path, AggregationType.COUNT)),
- Arrays.asList(
- new GroupByLevelDescriptor(
- AggregationType.COUNT,
- AggregationStep.FINAL,
- Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s1Path))),
- new TimeSeriesOperand(new PartialPath(groupedPathS1))),
- new GroupByLevelDescriptor(
- AggregationType.COUNT,
- AggregationStep.FINAL,
- Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s2Path))),
- new TimeSeriesOperand(new PartialPath(groupedPathS2)))));
- Analysis analysis = constructAnalysis();
- MPPQueryContext context =
- new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
- DistributionPlanner planner =
- new DistributionPlanner(analysis, new LogicalQueryPlan(context, groupByLevelNode));
- DistributedQueryPlan plan = planner.planFragments();
- assertEquals(2, plan.getInstances().size());
- Map<String, AggregationStep> expectedStep = new HashMap<>();
- expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
- expectedStep.put(d1s2Path, AggregationStep.PARTIAL);
- List<FragmentInstance> fragmentInstances = plan.getInstances();
- fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
-
- Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
- expectedDescriptorValue.put(groupedPathS1, Arrays.asList(groupedPathS1, d1s1Path));
- expectedDescriptorValue.put(groupedPathS2, Arrays.asList(groupedPathS2, d1s2Path));
- verifyGroupByLevelDescriptor(
- expectedDescriptorValue,
- (GroupByLevelNode) fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
-
- Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
- expectedDescriptorValue2.put(groupedPathS1, Collections.singletonList(d1s1Path));
- expectedDescriptorValue2.put(groupedPathS2, Collections.singletonList(d1s2Path));
- verifyGroupByLevelDescriptor(
- expectedDescriptorValue2,
- (GroupByLevelNode) fragmentInstances.get(1).getFragment().getRoot().getChildren().get(0));
- }
-
- @Test
- public void testGroupByLevel2Series2Devices3Regions() throws IllegalPathException {
- QueryId queryId = new QueryId("test_group_by_level_two_series");
- String d1s1Path = "root.sg.d1.s1";
- String d1s2Path = "root.sg.d1.s2";
- String d2s1Path = "root.sg.d22.s1";
- String groupedPathS1 = "root.sg.*.s1";
- String groupedPathS2 = "root.sg.*.s2";
-
- GroupByLevelNode groupByLevelNode =
- new GroupByLevelNode(
- new PlanNodeId("TestGroupByLevelNode"),
- Arrays.asList(
- genAggregationSourceNode(queryId, d1s1Path, AggregationType.COUNT),
- genAggregationSourceNode(queryId, d1s2Path, AggregationType.COUNT),
- genAggregationSourceNode(queryId, d2s1Path, AggregationType.COUNT)),
- Arrays.asList(
- new GroupByLevelDescriptor(
- AggregationType.COUNT,
- AggregationStep.FINAL,
- Arrays.asList(
- new TimeSeriesOperand(new PartialPath(d1s1Path)),
- new TimeSeriesOperand(new PartialPath(d2s1Path))),
- new TimeSeriesOperand(new PartialPath(groupedPathS1))),
- new GroupByLevelDescriptor(
- AggregationType.COUNT,
- AggregationStep.FINAL,
- Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s2Path))),
- new TimeSeriesOperand(new PartialPath(groupedPathS2)))));
- Analysis analysis = constructAnalysis();
- MPPQueryContext context =
- new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
- DistributionPlanner planner =
- new DistributionPlanner(analysis, new LogicalQueryPlan(context, groupByLevelNode));
- DistributedQueryPlan plan = planner.planFragments();
- assertEquals(3, plan.getInstances().size());
- Map<String, AggregationStep> expectedStep = new HashMap<>();
- expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
- expectedStep.put(d1s2Path, AggregationStep.PARTIAL);
- expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
- List<FragmentInstance> fragmentInstances = plan.getInstances();
- fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
-
- Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
- expectedDescriptorValue.put(groupedPathS1, Arrays.asList(groupedPathS1, d1s1Path, d2s1Path));
- expectedDescriptorValue.put(groupedPathS2, Arrays.asList(groupedPathS2, d1s2Path));
- verifyGroupByLevelDescriptor(
- expectedDescriptorValue,
- (GroupByLevelNode) fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
-
- Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
- expectedDescriptorValue2.put(groupedPathS1, Collections.singletonList(d1s1Path));
- expectedDescriptorValue2.put(groupedPathS2, Collections.singletonList(d1s2Path));
- verifyGroupByLevelDescriptor(
- expectedDescriptorValue2,
- (GroupByLevelNode) fragmentInstances.get(2).getFragment().getRoot().getChildren().get(0));
- }
-
- @Test
- public void testAggregation1Series1Region() throws IllegalPathException {
- QueryId queryId = new QueryId("test_aggregation_1_series_1_region");
- String d2s1Path = "root.sg.d22.s1";
-
- PlanNode root = genAggregationSourceNode(queryId, d2s1Path, AggregationType.COUNT);
- Analysis analysis = constructAnalysis();
- MPPQueryContext context =
- new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
- DistributionPlanner planner =
- new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
- DistributedQueryPlan plan = planner.planFragments();
- assertEquals(1, plan.getInstances().size());
- assertEquals(root, plan.getInstances().get(0).getFragment().getRoot().getChildren().get(0));
- }
-
- private void verifyGroupByLevelDescriptor(
- Map<String, List<String>> expected, GroupByLevelNode node) {
- List<GroupByLevelDescriptor> descriptors = node.getGroupByLevelDescriptors();
- assertEquals(expected.size(), descriptors.size());
- for (GroupByLevelDescriptor descriptor : descriptors) {
- String outputExpression = descriptor.getOutputExpression().getExpressionString();
- assertEquals(expected.get(outputExpression).size(), descriptor.getInputExpressions().size());
- for (Expression inputExpression : descriptor.getInputExpressions()) {
- assertTrue(expected.get(outputExpression).contains(inputExpression.getExpressionString()));
- }
- }
- }
-
- private SeriesAggregationSourceNode genAggregationSourceNode(
- QueryId queryId, String path, AggregationType type) throws IllegalPathException {
- List<AggregationDescriptor> descriptors = new ArrayList<>();
- descriptors.add(
- new AggregationDescriptor(
- type,
- AggregationStep.FINAL,
- Collections.singletonList(new TimeSeriesOperand(new PartialPath(path)))));
-
- return new SeriesAggregationScanNode(
- queryId.genPlanNodeId(), new MeasurementPath(path, TSDataType.INT32), descriptors);
- }
-
- @Test
- public void testParallelPlanWithAlignedSeries() throws IllegalPathException {
- QueryId queryId = new QueryId("test_query_aligned");
- TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
-
- timeJoinNode.addChild(
- new AlignedSeriesScanNode(
- queryId.genPlanNodeId(),
- new AlignedPath("root.sg.d1", Arrays.asList("s1", "s2")),
- OrderBy.TIMESTAMP_ASC));
- timeJoinNode.addChild(
- new SeriesScanNode(
- queryId.genPlanNodeId(),
- new MeasurementPath("root.sg.d333.s1", TSDataType.INT32),
- OrderBy.TIMESTAMP_ASC));
-
- LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
- Analysis analysis = constructAnalysis();
-
- MPPQueryContext context =
- new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
- DistributionPlanner planner =
- new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
- DistributedQueryPlan plan = planner.planFragments();
- assertEquals(3, plan.getInstances().size());
- }
-
- @Test
- public void testSingleAlignedSeries() throws IllegalPathException {
- QueryId queryId = new QueryId("test_query_aligned");
- TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
-
- timeJoinNode.addChild(
- new AlignedSeriesScanNode(
- queryId.genPlanNodeId(),
- new AlignedPath("root.sg.d22", Arrays.asList("s1", "s2")),
- OrderBy.TIMESTAMP_ASC));
-
- LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
- Analysis analysis = constructAnalysis();
-
- MPPQueryContext context =
- new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
- DistributionPlanner planner =
- new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
- DistributedQueryPlan plan = planner.planFragments();
- assertEquals(1, plan.getInstances().size());
- }
-
- @Test
- public void testInsertRowNodeParallelPlan() throws IllegalPathException {
- QueryId queryId = new QueryId("test_write");
- InsertRowNode insertRowNode =
- new InsertRowNode(
- queryId.genPlanNodeId(),
- new PartialPath("root.sg.d1"),
- false,
- new String[] {
- "s1",
- },
- new TSDataType[] {TSDataType.INT32},
- 1L,
- new Object[] {10},
- false);
- insertRowNode.setMeasurementSchemas(
- new MeasurementSchema[] {
- new MeasurementSchema("s1", TSDataType.INT32),
- });
-
- Analysis analysis = constructAnalysis();
-
- MPPQueryContext context =
- new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
- context.setQueryType(QueryType.WRITE);
- DistributionPlanner planner =
- new DistributionPlanner(analysis, new LogicalQueryPlan(context, insertRowNode));
- DistributedQueryPlan plan = planner.planFragments();
- assertEquals(1, plan.getInstances().size());
- }
-
- @Test
- public void testInsertRowsNodeParallelPlan() throws IllegalPathException {
- QueryId queryId = new QueryId("test_write");
- InsertRowNode insertRowNode1 =
- new InsertRowNode(
- queryId.genPlanNodeId(),
- new PartialPath("root.sg.d1"),
- false,
- new String[] {"s1"},
- new TSDataType[] {TSDataType.INT32},
- 1L,
- new Object[] {10},
- false);
- insertRowNode1.setMeasurementSchemas(
- new MeasurementSchema[] {
- new MeasurementSchema("s1", TSDataType.INT32),
- });
-
- InsertRowNode insertRowNode2 =
- new InsertRowNode(
- queryId.genPlanNodeId(),
- new PartialPath("root.sg.d1"),
- false,
- new String[] {"s1"},
- new TSDataType[] {TSDataType.INT32},
- 100000L,
- new Object[] {10},
- false);
- insertRowNode2.setMeasurementSchemas(
- new MeasurementSchema[] {
- new MeasurementSchema("s1", TSDataType.INT32),
- });
-
- InsertRowsNode node = new InsertRowsNode(queryId.genPlanNodeId());
- node.setInsertRowNodeList(Arrays.asList(insertRowNode1, insertRowNode2));
- node.setInsertRowNodeIndexList(Arrays.asList(0, 1));
-
- Analysis analysis = constructAnalysis();
-
- MPPQueryContext context =
- new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
- context.setQueryType(QueryType.WRITE);
- DistributionPlanner planner =
- new DistributionPlanner(analysis, new LogicalQueryPlan(context, node));
- DistributedQueryPlan plan = planner.planFragments();
- assertEquals(1, plan.getInstances().size());
- }
-
- private Analysis constructAnalysis() throws IllegalPathException {
-
- SeriesPartitionExecutor executor =
- SeriesPartitionExecutor.getSeriesPartitionExecutor(
- IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
- IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
- Analysis analysis = new Analysis();
-
- String device1 = "root.sg.d1";
- String device2 = "root.sg.d22";
- String device3 = "root.sg.d333";
- String device4 = "root.sg.d4444";
-
- DataPartition dataPartition =
- new DataPartition(
- IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
- IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
-
- Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
- dataPartitionMap = new HashMap<>();
- Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> sgPartitionMap =
- new HashMap<>();
-
- List<TRegionReplicaSet> d1DataRegions = new ArrayList<>();
- d1DataRegions.add(
- new TRegionReplicaSet(
- new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
- Arrays.asList(
- new TDataNodeLocation()
- .setDataNodeId(11)
- .setExternalEndPoint(new TEndPoint("192.0.1.1", 9000)),
- new TDataNodeLocation()
- .setDataNodeId(12)
- .setExternalEndPoint(new TEndPoint("192.0.1.2", 9000)))));
- d1DataRegions.add(
- new TRegionReplicaSet(
- new TConsensusGroupId(TConsensusGroupType.DataRegion, 2),
- Arrays.asList(
- new TDataNodeLocation()
- .setDataNodeId(21)
- .setExternalEndPoint(new TEndPoint("192.0.2.1", 9000)),
- new TDataNodeLocation()
- .setDataNodeId(22)
- .setExternalEndPoint(new TEndPoint("192.0.2.2", 9000)))));
- Map<TTimePartitionSlot, List<TRegionReplicaSet>> d1DataRegionMap = new HashMap<>();
- d1DataRegionMap.put(new TTimePartitionSlot(), d1DataRegions);
-
- List<TRegionReplicaSet> d2DataRegions = new ArrayList<>();
- d2DataRegions.add(
- new TRegionReplicaSet(
- new TConsensusGroupId(TConsensusGroupType.DataRegion, 3),
- Arrays.asList(
- new TDataNodeLocation()
- .setDataNodeId(31)
- .setExternalEndPoint(new TEndPoint("192.0.3.1", 9000)),
- new TDataNodeLocation()
- .setDataNodeId(32)
- .setExternalEndPoint(new TEndPoint("192.0.3.2", 9000)))));
- Map<TTimePartitionSlot, List<TRegionReplicaSet>> d2DataRegionMap = new HashMap<>();
- d2DataRegionMap.put(new TTimePartitionSlot(), d2DataRegions);
-
- List<TRegionReplicaSet> d3DataRegions = new ArrayList<>();
- d3DataRegions.add(
- new TRegionReplicaSet(
- new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
- Arrays.asList(
- new TDataNodeLocation()
- .setDataNodeId(11)
- .setExternalEndPoint(new TEndPoint("192.0.1.1", 9000)),
- new TDataNodeLocation()
- .setDataNodeId(12)
- .setExternalEndPoint(new TEndPoint("192.0.1.2", 9000)))));
- d3DataRegions.add(
- new TRegionReplicaSet(
- new TConsensusGroupId(TConsensusGroupType.DataRegion, 4),
- Arrays.asList(
- new TDataNodeLocation()
- .setDataNodeId(41)
- .setExternalEndPoint(new TEndPoint("192.0.4.1", 9000)),
- new TDataNodeLocation()
- .setDataNodeId(42)
- .setExternalEndPoint(new TEndPoint("192.0.4.2", 9000)))));
- Map<TTimePartitionSlot, List<TRegionReplicaSet>> d3DataRegionMap = new HashMap<>();
- d3DataRegionMap.put(new TTimePartitionSlot(), d3DataRegions);
-
- List<TRegionReplicaSet> d4DataRegions = new ArrayList<>();
- d4DataRegions.add(
- new TRegionReplicaSet(
- new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
- Arrays.asList(
- new TDataNodeLocation()
- .setDataNodeId(11)
- .setExternalEndPoint(new TEndPoint("192.0.1.1", 9000)),
- new TDataNodeLocation()
- .setDataNodeId(12)
- .setExternalEndPoint(new TEndPoint("192.0.1.2", 9000)))));
- d4DataRegions.add(
- new TRegionReplicaSet(
- new TConsensusGroupId(TConsensusGroupType.DataRegion, 4),
- Arrays.asList(
- new TDataNodeLocation()
- .setDataNodeId(41)
- .setExternalEndPoint(new TEndPoint("192.0.4.1", 9000)),
- new TDataNodeLocation()
- .setDataNodeId(42)
- .setExternalEndPoint(new TEndPoint("192.0.4.2", 9000)))));
- Map<TTimePartitionSlot, List<TRegionReplicaSet>> d4DataRegionMap = new HashMap<>();
- d4DataRegionMap.put(new TTimePartitionSlot(), d4DataRegions);
-
- sgPartitionMap.put(executor.getSeriesPartitionSlot(device1), d1DataRegionMap);
- sgPartitionMap.put(executor.getSeriesPartitionSlot(device2), d2DataRegionMap);
- sgPartitionMap.put(executor.getSeriesPartitionSlot(device3), d3DataRegionMap);
- sgPartitionMap.put(executor.getSeriesPartitionSlot(device4), d4DataRegionMap);
-
- dataPartitionMap.put("root.sg", sgPartitionMap);
-
- dataPartition.setDataPartitionMap(dataPartitionMap);
-
- analysis.setDataPartitionInfo(dataPartition);
-
- // construct AggregationExpression for GroupByLevel
- Map<String, Set<Expression>> aggregationExpression = new HashMap<>();
- Set<Expression> s1Expression = new HashSet<>();
- s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d1.s1")));
- s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d22.s1")));
- s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d333.s1")));
- s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d4444.s1")));
-
- Set<Expression> s2Expression = new HashSet<>();
- s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d1.s2")));
- s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d22.s2")));
- s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d333.s2")));
- s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d4444.s2")));
-
- aggregationExpression.put("root.sg.*.s1", s1Expression);
- aggregationExpression.put("root.sg.*.s2", s2Expression);
- // analysis.setAggregationExpressions(aggregationExpression);
-
- // construct schema partition
- SchemaPartition schemaPartition =
- new SchemaPartition(
- IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
- IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
- Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap = new HashMap<>();
-
- TRegionReplicaSet schemaRegion1 =
- new TRegionReplicaSet(
- new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 11),
- Arrays.asList(
- new TDataNodeLocation()
- .setDataNodeId(11)
- .setExternalEndPoint(new TEndPoint("192.0.1.1", 9000)),
- new TDataNodeLocation()
- .setDataNodeId(12)
- .setExternalEndPoint(new TEndPoint("192.0.1.2", 9000))));
- Map<TSeriesPartitionSlot, TRegionReplicaSet> schemaRegionMap = new HashMap<>();
-
- TRegionReplicaSet schemaRegion2 =
- new TRegionReplicaSet(
- new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 21),
- Arrays.asList(
- new TDataNodeLocation()
- .setDataNodeId(21)
- .setExternalEndPoint(new TEndPoint("192.0.1.1", 9000)),
- new TDataNodeLocation()
- .setDataNodeId(22)
- .setExternalEndPoint(new TEndPoint("192.0.1.2", 9000))));
-
- schemaRegionMap.put(executor.getSeriesPartitionSlot(device1), schemaRegion1);
- schemaRegionMap.put(executor.getSeriesPartitionSlot(device2), schemaRegion2);
- schemaRegionMap.put(executor.getSeriesPartitionSlot(device3), schemaRegion2);
- schemaPartitionMap.put("root.sg", schemaRegionMap);
- schemaPartition.setSchemaPartitionMap(schemaPartitionMap);
-
- analysis.setDataPartitionInfo(dataPartition);
- analysis.setSchemaPartitionInfo(schemaPartition);
- return analysis;
- }
-}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
new file mode 100644
index 0000000000..0991684f2d
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.plan.distribution;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanner;
+import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class AggregationDistributionTest {
+ @Test
+ public void testTimeJoinAggregationSinglePerRegion() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_query_time_join_aggregation");
+ TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
+ String d1s1Path = "root.sg.d1.s1";
+ timeJoinNode.addChild(genAggregationSourceNode(queryId, d1s1Path, AggregationType.COUNT));
+
+ String d2s1Path = "root.sg.d22.s1";
+ timeJoinNode.addChild(genAggregationSourceNode(queryId, d2s1Path, AggregationType.COUNT));
+
+ Analysis analysis = util.constructAnalysis();
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(3, plan.getInstances().size());
+ Map<String, AggregationStep> expectedStep = new HashMap<>();
+ expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+ expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
+ List<FragmentInstance> fragmentInstances = plan.getInstances();
+ fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+ }
+
+ private void verifyAggregationStep(Map<String, AggregationStep> expected, PlanNode root) {
+ if (root == null) {
+ return;
+ }
+ if (root instanceof SeriesAggregationSourceNode) {
+ SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) root;
+ List<AggregationDescriptor> descriptorList = handle.getAggregationDescriptorList();
+ descriptorList.forEach(
+ d -> {
+ assertEquals(expected.get(handle.getPartitionPath().getFullPath()), d.getStep());
+ });
+ }
+ root.getChildren().forEach(child -> verifyAggregationStep(expected, child));
+ }
+
+ @Test
+ public void testTimeJoinAggregationMultiPerRegion() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_query_time_join_aggregation");
+ TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
+ String d1s1Path = "root.sg.d1.s1";
+ timeJoinNode.addChild(genAggregationSourceNode(queryId, d1s1Path, AggregationType.COUNT));
+
+ String d3s1Path = "root.sg.d333.s1";
+ timeJoinNode.addChild(genAggregationSourceNode(queryId, d3s1Path, AggregationType.COUNT));
+
+ Analysis analysis = util.constructAnalysis();
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(3, plan.getInstances().size());
+ Map<String, AggregationStep> expectedStep = new HashMap<>();
+ expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+ expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
+ List<FragmentInstance> fragmentInstances = plan.getInstances();
+ fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+ }
+
+ @Test
+ public void testTimeJoinAggregationMultiPerRegion2() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_query_time_join_aggregation");
+ TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
+ String d3s1Path = "root.sg.d333.s1";
+ timeJoinNode.addChild(genAggregationSourceNode(queryId, d3s1Path, AggregationType.COUNT));
+
+ String d4s1Path = "root.sg.d4444.s1";
+ timeJoinNode.addChild(genAggregationSourceNode(queryId, d4s1Path, AggregationType.COUNT));
+ Analysis analysis = util.constructAnalysis();
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(2, plan.getInstances().size());
+ Map<String, AggregationStep> expectedStep = new HashMap<>();
+ expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
+ expectedStep.put(d4s1Path, AggregationStep.PARTIAL);
+ List<FragmentInstance> fragmentInstances = plan.getInstances();
+ fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+ }
+
+ @Test
+ public void testGroupByLevelWithTwoChildren() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_group_by_level_two_children");
+ String d1s1Path = "root.sg.d1.s1";
+ String d2s1Path = "root.sg.d22.s1";
+ String groupedPath = "root.sg.*.s1";
+
+ GroupByLevelNode groupByLevelNode =
+ new GroupByLevelNode(
+ new PlanNodeId("TestGroupByLevelNode"),
+ Arrays.asList(
+ genAggregationSourceNode(queryId, d1s1Path, AggregationType.COUNT),
+ genAggregationSourceNode(queryId, d2s1Path, AggregationType.COUNT)),
+ Collections.singletonList(
+ new GroupByLevelDescriptor(
+ AggregationType.COUNT,
+ AggregationStep.FINAL,
+ Arrays.asList(
+ new TimeSeriesOperand(new PartialPath(d1s1Path)),
+ new TimeSeriesOperand(new PartialPath(d2s1Path))),
+ new TimeSeriesOperand(new PartialPath(groupedPath)))));
+ Analysis analysis = util.constructAnalysis();
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, groupByLevelNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(3, plan.getInstances().size());
+ Map<String, AggregationStep> expectedStep = new HashMap<>();
+ expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+ expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
+ List<FragmentInstance> fragmentInstances = plan.getInstances();
+ fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+ }
+
+ @Test
+ public void testAggregationWithMultiGroupByLevelNode() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_group_by_level_two_children");
+ String d3s1Path = "root.sg.d333.s1";
+ String d4s1Path = "root.sg.d4444.s1";
+ String groupedPath = "root.sg.*.s1";
+
+ GroupByLevelNode groupByLevelNode =
+ new GroupByLevelNode(
+ new PlanNodeId("TestGroupByLevelNode"),
+ Arrays.asList(
+ genAggregationSourceNode(queryId, d3s1Path, AggregationType.COUNT),
+ genAggregationSourceNode(queryId, d4s1Path, AggregationType.COUNT)),
+ Collections.singletonList(
+ new GroupByLevelDescriptor(
+ AggregationType.COUNT,
+ AggregationStep.FINAL,
+ Arrays.asList(
+ new TimeSeriesOperand(new PartialPath(d3s1Path)),
+ new TimeSeriesOperand(new PartialPath(d4s1Path))),
+ new TimeSeriesOperand(new PartialPath(groupedPath)))));
+ Analysis analysis = util.constructAnalysis();
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, groupByLevelNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(2, plan.getInstances().size());
+ Map<String, AggregationStep> expectedStep = new HashMap<>();
+ expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
+ expectedStep.put(d4s1Path, AggregationStep.PARTIAL);
+ List<FragmentInstance> fragmentInstances = plan.getInstances();
+ fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+
+ Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
+ expectedDescriptorValue.put(groupedPath, Arrays.asList(groupedPath, d3s1Path, d4s1Path));
+ verifyGroupByLevelDescriptor(
+ expectedDescriptorValue,
+ (GroupByLevelNode) fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
+
+ Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
+ expectedDescriptorValue2.put(groupedPath, Arrays.asList(d3s1Path, d4s1Path));
+ verifyGroupByLevelDescriptor(
+ expectedDescriptorValue2,
+ (GroupByLevelNode) fragmentInstances.get(1).getFragment().getRoot().getChildren().get(0));
+ }
+
+ @Test
+ public void testGroupByLevelTwoSeries() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_group_by_level_two_series");
+ String d1s1Path = "root.sg.d1.s1";
+ String d1s2Path = "root.sg.d1.s2";
+ String groupedPathS1 = "root.sg.*.s1";
+ String groupedPathS2 = "root.sg.*.s2";
+
+ GroupByLevelNode groupByLevelNode =
+ new GroupByLevelNode(
+ new PlanNodeId("TestGroupByLevelNode"),
+ Arrays.asList(
+ genAggregationSourceNode(queryId, d1s1Path, AggregationType.COUNT),
+ genAggregationSourceNode(queryId, d1s2Path, AggregationType.COUNT)),
+ Arrays.asList(
+ new GroupByLevelDescriptor(
+ AggregationType.COUNT,
+ AggregationStep.FINAL,
+ Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s1Path))),
+ new TimeSeriesOperand(new PartialPath(groupedPathS1))),
+ new GroupByLevelDescriptor(
+ AggregationType.COUNT,
+ AggregationStep.FINAL,
+ Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s2Path))),
+ new TimeSeriesOperand(new PartialPath(groupedPathS2)))));
+ Analysis analysis = util.constructAnalysis();
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, groupByLevelNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(2, plan.getInstances().size());
+ Map<String, AggregationStep> expectedStep = new HashMap<>();
+ expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+ expectedStep.put(d1s2Path, AggregationStep.PARTIAL);
+ List<FragmentInstance> fragmentInstances = plan.getInstances();
+ fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+
+ Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
+ expectedDescriptorValue.put(groupedPathS1, Arrays.asList(groupedPathS1, d1s1Path));
+ expectedDescriptorValue.put(groupedPathS2, Arrays.asList(groupedPathS2, d1s2Path));
+ verifyGroupByLevelDescriptor(
+ expectedDescriptorValue,
+ (GroupByLevelNode) fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
+
+ Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
+ expectedDescriptorValue2.put(groupedPathS1, Collections.singletonList(d1s1Path));
+ expectedDescriptorValue2.put(groupedPathS2, Collections.singletonList(d1s2Path));
+ verifyGroupByLevelDescriptor(
+ expectedDescriptorValue2,
+ (GroupByLevelNode) fragmentInstances.get(1).getFragment().getRoot().getChildren().get(0));
+ }
+
+ @Test
+ public void testGroupByLevel2Series2Devices3Regions() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_group_by_level_two_series");
+ String d1s1Path = "root.sg.d1.s1";
+ String d1s2Path = "root.sg.d1.s2";
+ String d2s1Path = "root.sg.d22.s1";
+ String groupedPathS1 = "root.sg.*.s1";
+ String groupedPathS2 = "root.sg.*.s2";
+
+ GroupByLevelNode groupByLevelNode =
+ new GroupByLevelNode(
+ new PlanNodeId("TestGroupByLevelNode"),
+ Arrays.asList(
+ genAggregationSourceNode(queryId, d1s1Path, AggregationType.COUNT),
+ genAggregationSourceNode(queryId, d1s2Path, AggregationType.COUNT),
+ genAggregationSourceNode(queryId, d2s1Path, AggregationType.COUNT)),
+ Arrays.asList(
+ new GroupByLevelDescriptor(
+ AggregationType.COUNT,
+ AggregationStep.FINAL,
+ Arrays.asList(
+ new TimeSeriesOperand(new PartialPath(d1s1Path)),
+ new TimeSeriesOperand(new PartialPath(d2s1Path))),
+ new TimeSeriesOperand(new PartialPath(groupedPathS1))),
+ new GroupByLevelDescriptor(
+ AggregationType.COUNT,
+ AggregationStep.FINAL,
+ Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s2Path))),
+ new TimeSeriesOperand(new PartialPath(groupedPathS2)))));
+ Analysis analysis = util.constructAnalysis();
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, groupByLevelNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(3, plan.getInstances().size());
+ Map<String, AggregationStep> expectedStep = new HashMap<>();
+ expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+ expectedStep.put(d1s2Path, AggregationStep.PARTIAL);
+ expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
+ List<FragmentInstance> fragmentInstances = plan.getInstances();
+ fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+
+ Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
+ expectedDescriptorValue.put(groupedPathS1, Arrays.asList(groupedPathS1, d1s1Path, d2s1Path));
+ expectedDescriptorValue.put(groupedPathS2, Arrays.asList(groupedPathS2, d1s2Path));
+ verifyGroupByLevelDescriptor(
+ expectedDescriptorValue,
+ (GroupByLevelNode) fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
+
+ Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
+ expectedDescriptorValue2.put(groupedPathS1, Collections.singletonList(d1s1Path));
+ expectedDescriptorValue2.put(groupedPathS2, Collections.singletonList(d1s2Path));
+ verifyGroupByLevelDescriptor(
+ expectedDescriptorValue2,
+ (GroupByLevelNode) fragmentInstances.get(2).getFragment().getRoot().getChildren().get(0));
+ }
+
+ @Test
+ public void testAggregation1Series1Region() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_aggregation_1_series_1_region");
+ String d2s1Path = "root.sg.d22.s1";
+
+ PlanNode root = genAggregationSourceNode(queryId, d2s1Path, AggregationType.COUNT);
+ Analysis analysis = util.constructAnalysis();
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(1, plan.getInstances().size());
+ assertEquals(root, plan.getInstances().get(0).getFragment().getRoot().getChildren().get(0));
+ }
+
+ private void verifyGroupByLevelDescriptor(
+ Map<String, List<String>> expected, GroupByLevelNode node) {
+ List<GroupByLevelDescriptor> descriptors = node.getGroupByLevelDescriptors();
+ assertEquals(expected.size(), descriptors.size());
+ for (GroupByLevelDescriptor descriptor : descriptors) {
+ String outputExpression = descriptor.getOutputExpression().getExpressionString();
+ assertEquals(expected.get(outputExpression).size(), descriptor.getInputExpressions().size());
+ for (Expression inputExpression : descriptor.getInputExpressions()) {
+ assertTrue(expected.get(outputExpression).contains(inputExpression.getExpressionString()));
+ }
+ }
+ }
+
+ private SeriesAggregationSourceNode genAggregationSourceNode(
+ QueryId queryId, String path, AggregationType type) throws IllegalPathException {
+ List<AggregationDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(
+ new AggregationDescriptor(
+ type,
+ AggregationStep.FINAL,
+ Collections.singletonList(new TimeSeriesOperand(new PartialPath(path)))));
+
+ return new SeriesAggregationScanNode(
+ queryId.genPlanNodeId(), new MeasurementPath(path, TSDataType.INT32), descriptors);
+ }
+
+ @Test
+ public void testParallelPlanWithAlignedSeries() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_query_aligned");
+ TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
+
+ timeJoinNode.addChild(
+ new AlignedSeriesScanNode(
+ queryId.genPlanNodeId(),
+ new AlignedPath("root.sg.d1", Arrays.asList("s1", "s2")),
+ OrderBy.TIMESTAMP_ASC));
+ timeJoinNode.addChild(
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d333.s1", TSDataType.INT32),
+ OrderBy.TIMESTAMP_ASC));
+
+ LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
+ Analysis analysis = util.constructAnalysis();
+
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(3, plan.getInstances().size());
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/BasicTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/BasicTest.java
new file mode 100644
index 0000000000..8437ec06c3
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/BasicTest.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.plan.distribution;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanner;
+import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+public class BasicTest {
+
+ @Test
+ public void testSingleSeriesScan() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_query");
+ SeriesScanNode root =
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+ OrderBy.TIMESTAMP_ASC);
+
+ Analysis analysis = util.constructAnalysis();
+
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(2, plan.getInstances().size());
+ }
+
+ @Test
+ public void testSingleSeriesScanRewriteSource() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_query");
+ SeriesScanNode root =
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+ OrderBy.TIMESTAMP_ASC);
+
+ Analysis analysis = util.constructAnalysis();
+
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
+ PlanNode rootAfterRewrite = planner.rewriteSource();
+ assertEquals(2, rootAfterRewrite.getChildren().size());
+ }
+
+ @Test
+ public void testRewriteSourceNode() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_query");
+
+ TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
+
+ timeJoinNode.addChild(
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+ OrderBy.TIMESTAMP_ASC));
+ timeJoinNode.addChild(
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
+ OrderBy.TIMESTAMP_ASC));
+ timeJoinNode.addChild(
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d22.s1", TSDataType.INT32),
+ OrderBy.TIMESTAMP_ASC));
+
+ LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
+
+ Analysis analysis = util.constructAnalysis();
+
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(queryId), root));
+ PlanNode newRoot = planner.rewriteSource();
+ assertEquals(4, newRoot.getChildren().get(0).getChildren().size());
+ }
+
+ @Test
+ public void testRewriteMetaSourceNode() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_query");
+ SchemaQueryMergeNode metaMergeNode = new SchemaQueryMergeNode(queryId.genPlanNodeId(), false);
+ metaMergeNode.addChild(
+ new TimeSeriesSchemaScanNode(
+ queryId.genPlanNodeId(),
+ new PartialPath("root.sg.d1.s1"),
+ null,
+ null,
+ 10,
+ 0,
+ false,
+ false,
+ false));
+ metaMergeNode.addChild(
+ new TimeSeriesSchemaScanNode(
+ queryId.genPlanNodeId(),
+ new PartialPath("root.sg.d1.s2"),
+ null,
+ null,
+ 10,
+ 0,
+ false,
+ false,
+ false));
+ metaMergeNode.addChild(
+ new TimeSeriesSchemaScanNode(
+ queryId.genPlanNodeId(),
+ new PartialPath("root.sg.d22.s1"),
+ null,
+ null,
+ 10,
+ 0,
+ false,
+ false,
+ false));
+ LimitNode root2 = new LimitNode(queryId.genPlanNodeId(), metaMergeNode, 10);
+ Analysis analysis = util.constructAnalysis();
+ DistributionPlanner planner2 =
+ new DistributionPlanner(
+ analysis, new LogicalQueryPlan(new MPPQueryContext(queryId), root2));
+ PlanNode newRoot2 = planner2.rewriteSource();
+ assertEquals(newRoot2.getChildren().get(0).getChildren().size(), 2);
+ }
+
+ @Test
+ public void testAddExchangeNode() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_query");
+ TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
+
+ timeJoinNode.addChild(
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+ OrderBy.TIMESTAMP_ASC));
+ timeJoinNode.addChild(
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
+ OrderBy.TIMESTAMP_ASC));
+ timeJoinNode.addChild(
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d22.s1", TSDataType.INT32),
+ OrderBy.TIMESTAMP_ASC));
+
+ LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
+
+ Analysis analysis = util.constructAnalysis();
+
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(queryId), root));
+ PlanNode rootAfterRewrite = planner.rewriteSource();
+ PlanNode rootWithExchange = planner.addExchangeNode(rootAfterRewrite);
+ assertEquals(4, rootWithExchange.getChildren().get(0).getChildren().size());
+ int exchangeNodeCount = 0;
+ for (PlanNode child : rootWithExchange.getChildren().get(0).getChildren()) {
+ exchangeNodeCount += child instanceof ExchangeNode ? 1 : 0;
+ }
+ assertEquals(2, exchangeNodeCount);
+ }
+
+ @Test
+ public void testSplitFragment() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_query");
+ TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
+
+ timeJoinNode.addChild(
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+ OrderBy.TIMESTAMP_ASC));
+ timeJoinNode.addChild(
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
+ OrderBy.TIMESTAMP_ASC));
+ timeJoinNode.addChild(
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d22.s1", TSDataType.INT32),
+ OrderBy.TIMESTAMP_ASC));
+
+ LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
+
+ Analysis analysis = util.constructAnalysis();
+
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
+ PlanNode rootAfterRewrite = planner.rewriteSource();
+ PlanNode rootWithExchange = planner.addExchangeNode(rootAfterRewrite);
+ SubPlan subPlan = planner.splitFragment(rootWithExchange);
+ assertEquals(subPlan.getChildren().size(), 2);
+ }
+
+ @Test
+ public void testParallelPlan() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_query");
+ TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
+
+ timeJoinNode.addChild(
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+ OrderBy.TIMESTAMP_ASC));
+ timeJoinNode.addChild(
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
+ OrderBy.TIMESTAMP_ASC));
+ timeJoinNode.addChild(
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d333.s1", TSDataType.INT32),
+ OrderBy.TIMESTAMP_ASC));
+
+ LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
+
+ Analysis analysis = util.constructAnalysis();
+
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(3, plan.getInstances().size());
+ }
+
+ @Test
+ public void testSingleAlignedSeries() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_query_aligned");
+ TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
+
+ timeJoinNode.addChild(
+ new AlignedSeriesScanNode(
+ queryId.genPlanNodeId(),
+ new AlignedPath("root.sg.d22", Arrays.asList("s1", "s2")),
+ OrderBy.TIMESTAMP_ASC));
+
+ LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
+ Analysis analysis = util.constructAnalysis();
+
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(1, plan.getInstances().size());
+ }
+
+ @Test
+ public void testInsertRowNodeParallelPlan() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_write");
+ InsertRowNode insertRowNode =
+ new InsertRowNode(
+ queryId.genPlanNodeId(),
+ new PartialPath("root.sg.d1"),
+ false,
+ new String[] {
+ "s1",
+ },
+ new TSDataType[] {TSDataType.INT32},
+ 1L,
+ new Object[] {10},
+ false);
+ insertRowNode.setMeasurementSchemas(
+ new MeasurementSchema[] {
+ new MeasurementSchema("s1", TSDataType.INT32),
+ });
+
+ Analysis analysis = util.constructAnalysis();
+
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ context.setQueryType(QueryType.WRITE);
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, insertRowNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(1, plan.getInstances().size());
+ }
+
+ @Test
+ public void testInsertRowsNodeParallelPlan() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_write");
+ InsertRowNode insertRowNode1 =
+ new InsertRowNode(
+ queryId.genPlanNodeId(),
+ new PartialPath("root.sg.d1"),
+ false,
+ new String[] {"s1"},
+ new TSDataType[] {TSDataType.INT32},
+ 1L,
+ new Object[] {10},
+ false);
+ insertRowNode1.setMeasurementSchemas(
+ new MeasurementSchema[] {
+ new MeasurementSchema("s1", TSDataType.INT32),
+ });
+
+ InsertRowNode insertRowNode2 =
+ new InsertRowNode(
+ queryId.genPlanNodeId(),
+ new PartialPath("root.sg.d1"),
+ false,
+ new String[] {"s1"},
+ new TSDataType[] {TSDataType.INT32},
+ 100000L,
+ new Object[] {10},
+ false);
+ insertRowNode2.setMeasurementSchemas(
+ new MeasurementSchema[] {
+ new MeasurementSchema("s1", TSDataType.INT32),
+ });
+
+ InsertRowsNode node = new InsertRowsNode(queryId.genPlanNodeId());
+ node.setInsertRowNodeList(Arrays.asList(insertRowNode1, insertRowNode2));
+ node.setInsertRowNodeIndexList(Arrays.asList(0, 1));
+
+ Analysis analysis = util.constructAnalysis();
+
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ context.setQueryType(QueryType.WRITE);
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, node));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(1, plan.getInstances().size());
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/DeleteTimeseriesTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/DeleteTimeseriesTest.java
new file mode 100644
index 0000000000..1656dce37d
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/DeleteTimeseriesTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.plan.distribution;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanner;
+import org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanner;
+import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class DeleteTimeseriesTest {
+
+ @Test
+ public void test1DataRegion1SchemaRegion() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_group_by_level_two_series");
+ String d2s1Path = "root.sg.d22.s1";
+ List<PartialPath> paths = Collections.singletonList(new PartialPath(d2s1Path));
+ Analysis analysis = util.constructAnalysis();
+ LogicalQueryPlan logicalQueryPlan = constructLogicalPlan(queryId, paths, analysis);
+
+ System.out.println(PlanNodeUtil.nodeToString(logicalQueryPlan.getRootNode()));
+
+ DistributionPlanner planner = new DistributionPlanner(analysis, logicalQueryPlan);
+ DistributedQueryPlan distributedQueryPlan = planner.planFragments();
+ distributedQueryPlan.getInstances().forEach(System.out::println);
+ Assert.assertEquals(6, distributedQueryPlan.getInstances().size());
+ }
+
+ private LogicalQueryPlan constructLogicalPlan(
+ QueryId queryId, List<PartialPath> paths, Analysis analysis) throws IllegalPathException {
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ DeleteTimeSeriesStatement statement = new DeleteTimeSeriesStatement();
+
+ analysis.setStatement(statement);
+ statement.setPartialPaths(paths);
+ LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
+ return planner.plan(analysis);
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/util.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/util.java
new file mode 100644
index 0000000000..76af829fe1
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/util.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.plan.distribution;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class util {
+ public static Analysis constructAnalysis() throws IllegalPathException {
+
+ SeriesPartitionExecutor executor =
+ SeriesPartitionExecutor.getSeriesPartitionExecutor(
+ IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+ IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+ Analysis analysis = new Analysis();
+
+ String device1 = "root.sg.d1";
+ String device2 = "root.sg.d22";
+ String device3 = "root.sg.d333";
+ String device4 = "root.sg.d4444";
+
+ TRegionReplicaSet dataRegion1 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
+ Arrays.asList(
+ genDataNodeLocation(11, "192.0.1.1"), genDataNodeLocation(12, "192.0.1.2")));
+
+ TRegionReplicaSet dataRegion2 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 2),
+ Arrays.asList(
+ genDataNodeLocation(21, "192.0.2.1"), genDataNodeLocation(22, "192.0.2.2")));
+
+ TRegionReplicaSet dataRegion3 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 3),
+ Arrays.asList(
+ genDataNodeLocation(31, "192.0.3.1"), genDataNodeLocation(32, "192.0.3.2")));
+
+ TRegionReplicaSet dataRegion4 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 4),
+ Arrays.asList(
+ genDataNodeLocation(41, "192.0.4.1"), genDataNodeLocation(42, "192.0.4.2")));
+
+ TRegionReplicaSet dataRegion5 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 5),
+ Arrays.asList(
+ genDataNodeLocation(51, "192.0.5.1"), genDataNodeLocation(52, "192.0.5.2")));
+
+ DataPartition dataPartition =
+ new DataPartition(
+ IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+ IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+
+ Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
+ dataPartitionMap = new HashMap<>();
+ Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> sgPartitionMap =
+ new HashMap<>();
+
+ List<TRegionReplicaSet> d1DataRegions = new ArrayList<>();
+ d1DataRegions.add(dataRegion1);
+ d1DataRegions.add(dataRegion2);
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>> d1DataRegionMap = new HashMap<>();
+ d1DataRegionMap.put(new TTimePartitionSlot(), d1DataRegions);
+
+ List<TRegionReplicaSet> d2DataRegions = new ArrayList<>();
+ d2DataRegions.add(dataRegion3);
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>> d2DataRegionMap = new HashMap<>();
+ d2DataRegionMap.put(new TTimePartitionSlot(), d2DataRegions);
+
+ List<TRegionReplicaSet> d3DataRegions = new ArrayList<>();
+ d3DataRegions.add(dataRegion1);
+ d3DataRegions.add(dataRegion4);
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>> d3DataRegionMap = new HashMap<>();
+ d3DataRegionMap.put(new TTimePartitionSlot(), d3DataRegions);
+
+ List<TRegionReplicaSet> d4DataRegions = new ArrayList<>();
+ d4DataRegions.add(dataRegion1);
+ d4DataRegions.add(dataRegion4);
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>> d4DataRegionMap = new HashMap<>();
+ d4DataRegionMap.put(new TTimePartitionSlot(), d4DataRegions);
+
+ sgPartitionMap.put(executor.getSeriesPartitionSlot(device1), d1DataRegionMap);
+ sgPartitionMap.put(executor.getSeriesPartitionSlot(device2), d2DataRegionMap);
+ sgPartitionMap.put(executor.getSeriesPartitionSlot(device3), d3DataRegionMap);
+ sgPartitionMap.put(executor.getSeriesPartitionSlot(device4), d4DataRegionMap);
+
+ dataPartitionMap.put("root.sg", sgPartitionMap);
+
+ dataPartition.setDataPartitionMap(dataPartitionMap);
+
+ analysis.setDataPartitionInfo(dataPartition);
+
+ // construct AggregationExpression for GroupByLevel
+ Map<String, Set<Expression>> aggregationExpression = new HashMap<>();
+ Set<Expression> s1Expression = new HashSet<>();
+ s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d1.s1")));
+ s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d22.s1")));
+ s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d333.s1")));
+ s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d4444.s1")));
+
+ Set<Expression> s2Expression = new HashSet<>();
+ s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d1.s2")));
+ s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d22.s2")));
+ s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d333.s2")));
+ s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d4444.s2")));
+
+ aggregationExpression.put("root.sg.*.s1", s1Expression);
+ aggregationExpression.put("root.sg.*.s2", s2Expression);
+ // analysis.setAggregationExpressions(aggregationExpression);
+
+ // construct schema partition
+ SchemaPartition schemaPartition =
+ new SchemaPartition(
+ IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+ IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+ Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap = new HashMap<>();
+
+ TRegionReplicaSet schemaRegion1 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 11),
+ Arrays.asList(
+ genDataNodeLocation(11, "192.0.1.1"), genDataNodeLocation(12, "192.0.1.2")));
+
+ TRegionReplicaSet schemaRegion2 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 21),
+ Arrays.asList(
+ genDataNodeLocation(21, "192.0.2.1"), genDataNodeLocation(22, "192.0.2.2")));
+
+ Map<TSeriesPartitionSlot, TRegionReplicaSet> schemaRegionMap = new HashMap<>();
+ schemaRegionMap.put(executor.getSeriesPartitionSlot(device1), schemaRegion1);
+ schemaRegionMap.put(executor.getSeriesPartitionSlot(device2), schemaRegion2);
+ schemaRegionMap.put(executor.getSeriesPartitionSlot(device3), schemaRegion2);
+ schemaPartitionMap.put("root.sg", schemaRegionMap);
+ schemaPartition.setSchemaPartitionMap(schemaPartitionMap);
+
+ analysis.setDataPartitionInfo(dataPartition);
+ analysis.setSchemaPartitionInfo(schemaPartition);
+ return analysis;
+ }
+
+ private static TDataNodeLocation genDataNodeLocation(int dataNodeId, String ip) {
+ return new TDataNodeLocation()
+ .setDataNodeId(dataNodeId)
+ .setExternalEndPoint(new TEndPoint(ip, 9000))
+ .setDataBlockManagerEndPoint(new TEndPoint(ip, 9001))
+ .setInternalEndPoint(new TEndPoint(ip, 9002));
+ }
+}