You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/12 13:26:03 UTC
[iotdb] branch master updated: [IOTDB-3079] Implememtation of complete LogicalPlanner (#5858)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3eaf0547c3 [IOTDB-3079] Implememtation of complete LogicalPlanner (#5858)
3eaf0547c3 is described below
commit 3eaf0547c35d778f6418615d8ddec51861590d78
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Thu May 12 21:25:58 2022 +0800
[IOTDB-3079] Implememtation of complete LogicalPlanner (#5858)
---
.../apache/iotdb/db/metadata/utils/MetaUtils.java | 30 +
.../apache/iotdb/db/mpp/plan/analyze/Analysis.java | 26 +-
.../apache/iotdb/db/mpp/plan/analyze/Analyzer.java | 59 +-
.../db/mpp/plan/analyze/ClusterSchemaFetcher.java | 1 +
.../db/mpp/plan/analyze/ExpressionAnalyzer.java | 16 +
.../db/mpp/plan/analyze/FakeSchemaFetcherImpl.java | 16 +-
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 8 +-
.../db/mpp/plan/planner/LogicalPlanBuilder.java | 433 ++++++++--
.../iotdb/db/mpp/plan/planner/LogicalPlanner.java | 160 +++-
.../planner/plan/node/process/DeviceViewNode.java | 88 ++-
.../plan/node/process/GroupByLevelNode.java | 4 +-
.../source/AlignedSeriesAggregationScanNode.java | 11 +-
.../plan/node/source/AlignedSeriesScanNode.java | 4 +
.../node/source/SeriesAggregationScanNode.java | 52 +-
.../planner/plan/node/source/SeriesScanNode.java | 40 +-
.../plan/parameter/AggregationDescriptor.java | 10 +-
.../plan/parameter/GroupByTimeParameter.java | 8 +
.../org/apache/iotdb/db/utils/SchemaUtils.java | 52 ++
.../db/mpp/plan/plan/DistributionPlannerTest.java | 15 -
.../mpp/plan/plan/FragmentInstanceSerdeTest.java | 16 +-
.../db/mpp/plan/plan/QueryLogicalPlanUtil.java | 878 +++++++++++++--------
.../node/process/AggregationNodeSerdeTest.java | 1 -
.../plan/node/process/DeviceViewNodeSerdeTest.java | 4 +-
.../node/process/GroupByLevelNodeSerdeTest.java | 3 -
.../plan/plan/node/process/LimitNodeSerdeTest.java | 2 -
.../plan/node/process/OffsetNodeSerdeTest.java | 2 -
.../plan/plan/node/process/SortNodeSerdeTest.java | 2 -
.../plan/node/process/TimeJoinNodeSerdeTest.java | 3 -
.../source/SeriesAggregationScanNodeSerdeTest.java | 1 -
.../plan/node/source/SeriesScanNodeSerdeTest.java | 2 -
30 files changed, 1410 insertions(+), 537 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
index e231c174d0..ad2778960c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.tsfile.read.common.Path;
import java.util.ArrayList;
@@ -189,4 +190,33 @@ public class MetaUtils {
}
return alignedPathToAggrIndexesMap;
}
+
+ public static Map<PartialPath, List<AggregationDescriptor>> groupAlignedAggregations(
+ Map<PartialPath, List<AggregationDescriptor>> pathToAggregations) {
+ Map<PartialPath, List<AggregationDescriptor>> result = new HashMap<>();
+ List<AggregationDescriptor> alignedPathAggregations = new ArrayList<>();
+ AlignedPath alignedPath = null;
+ for (PartialPath path : pathToAggregations.keySet()) {
+ MeasurementPath measurementPath = (MeasurementPath) path;
+ if (!measurementPath.isUnderAlignedEntity()) {
+ result
+ .computeIfAbsent(measurementPath, key -> new ArrayList<>())
+ .addAll(pathToAggregations.get(path));
+ alignedPath = null;
+ alignedPathAggregations.clear();
+ } else {
+ if (alignedPath == null || !alignedPath.equals(measurementPath.getDevice())) {
+ alignedPath = new AlignedPath(measurementPath);
+ alignedPathAggregations.addAll(pathToAggregations.get(path));
+ } else {
+ alignedPath.addMeasurement(measurementPath);
+ alignedPathAggregations.addAll(pathToAggregations.get(path));
+ }
+ }
+ }
+ if (alignedPath != null) {
+ result.put(alignedPath, alignedPathAggregations);
+ }
+ return result;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
index 3c1f8b3b64..892d24cf77 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
@@ -57,11 +57,11 @@ public class Analysis {
// map from device name to series/aggregation under this device
private Map<String, Set<Expression>> sourceExpressions;
- //
+ // expression of output column to be calculated
private Set<Expression> selectExpressions;
// all aggregations that need to be calculated
- private Map<String, Set<Expression>> AggregationExpressions;
+ private Map<String, Set<Expression>> aggregationExpressions;
// map from grouped path name to list of input aggregation in `GROUP BY LEVEL` clause
private Map<Expression, Set<Expression>> groupByLevelExpressions;
@@ -77,17 +77,22 @@ public class Analysis {
private Expression queryFilter;
+ // map from device name to query filter under this device (used in ALIGN BY DEVICE)
private Map<String, Expression> deviceToQueryFilter;
// indicate is there a value filter
private boolean hasValueFilter = false;
- // a global time filter used in `initQueryDataSource`
+ // a global time filter used in `initQueryDataSource` and filter push down
private Filter globalTimeFilter;
// header of result dataset
private DatasetHeader respDatasetHeader;
+ // e.g. [s1,s2,s3] is query, but [s1, s3] exists in device1, then device1 -> [1, 3], s1 is 1 but
+ // not 0 because device is the first column
+ private Map<String, List<Integer>> deviceToMeasurementIndexesMap;
+
public Analysis() {}
public List<TRegionReplicaSet> getPartitionInfo(PartialPath seriesPath, Filter timefilter) {
@@ -173,11 +178,11 @@ public class Analysis {
}
public Map<String, Set<Expression>> getAggregationExpressions() {
- return AggregationExpressions;
+ return aggregationExpressions;
}
public void setAggregationExpressions(Map<String, Set<Expression>> aggregationExpressions) {
- AggregationExpressions = aggregationExpressions;
+ this.aggregationExpressions = aggregationExpressions;
}
public Map<Expression, Set<Expression>> getGroupByLevelExpressions() {
@@ -204,7 +209,7 @@ public class Analysis {
this.fillDescriptor = fillDescriptor;
}
- public boolean isHasValueFilter() {
+ public boolean hasValueFilter() {
return hasValueFilter;
}
@@ -235,4 +240,13 @@ public class Analysis {
public void setGroupByTimeParameter(GroupByTimeParameter groupByTimeParameter) {
this.groupByTimeParameter = groupByTimeParameter;
}
+
+ public void setDeviceToMeasurementIndexesMap(
+ Map<String, List<Integer>> deviceToMeasurementIndexesMap) {
+ this.deviceToMeasurementIndexesMap = deviceToMeasurementIndexesMap;
+ }
+
+ public Map<String, List<Integer>> getDeviceToMeasurementIndexesMap() {
+ return deviceToMeasurementIndexesMap;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index 6217f23185..f52ba6fd77 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -79,6 +79,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -144,7 +145,7 @@ public class Analyzer {
}
List<Pair<Expression, String>> outputExpressions;
- Set<Expression> selectExpressions = new HashSet<>();
+ Set<Expression> selectExpressions = new LinkedHashSet<>();
Map<String, Set<Expression>> sourceExpressions = new HashMap<>();
// Example 1: select s1, s1 + s2 as t, udf(udf(s1)) from root.sg.d1
// outputExpressions: [<root.sg.d1.s1,null>, <root.sg.d1.s1 + root.sg.d1.s2,t>,
@@ -175,9 +176,34 @@ public class Analyzer {
// a set that contains all measurement names,
Set<String> measurementSet = new HashSet<>();
if (queryStatement.isAlignByDevice()) {
+ Map<String, Set<String>> deviceToMeasurementsMap = new HashMap<>();
outputExpressions =
analyzeFrom(
- queryStatement, schemaTree, deviceSchemaInfos, selectExpressions, measurementSet);
+ queryStatement,
+ schemaTree,
+ deviceSchemaInfos,
+ selectExpressions,
+ deviceToMeasurementsMap,
+ measurementSet);
+
+ Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
+ List<String> allMeasurements =
+ outputExpressions.stream()
+ .map(Pair::getLeft)
+ .map(Expression::getExpressionString)
+ .distinct()
+ .collect(Collectors.toList());
+ for (String deviceName : deviceToMeasurementsMap.keySet()) {
+ List<String> measurementsUnderDeivce =
+ new ArrayList<>(deviceToMeasurementsMap.get(deviceName));
+ List<Integer> indexes = new ArrayList<>();
+ for (String measurement : measurementsUnderDeivce) {
+ indexes.add(
+ allMeasurements.indexOf(measurement) + 1); // add 1 to skip the device column
+ }
+ deviceToMeasurementIndexesMap.put(deviceName, indexes);
+ }
+ analysis.setDeviceToMeasurementIndexesMap(deviceToMeasurementIndexesMap);
} else {
outputExpressions = analyzeSelect(queryStatement, schemaTree);
selectExpressions =
@@ -329,10 +355,15 @@ public class Analyzer {
SchemaTree schemaTree,
List<DeviceSchemaInfo> allDeviceSchemaInfos,
Set<Expression> selectExpressions,
+ Map<String, Set<String>> deviceToMeasurementsMap,
Set<String> measurementSet) {
// device path patterns in FROM clause
List<PartialPath> devicePatternList = queryStatement.getFromComponent().getPrefixPaths();
+ // a list of measurement name with alias (null if alias not exist)
+ List<Pair<Expression, String>> measurementWithAliasList =
+ getAllMeasurements(queryStatement, measurementSet);
+
// a list contains all selected paths
List<MeasurementPath> allSelectedPaths = new ArrayList<>();
for (PartialPath devicePattern : devicePatternList) {
@@ -370,10 +401,6 @@ public class Analyzer {
// if not, throw a SemanticException
measurementNameToPathsMap.values().forEach(this::checkDataTypeConsistencyInAlignByDevice);
- // a list of measurement name with alias (null if alias not exist)
- List<Pair<Expression, String>> measurementWithAliasList =
- getAllMeasurements(queryStatement, measurementSet);
-
// apply SLIMIT & SOFFSET and set outputExpressions & selectExpressions
List<Pair<Expression, String>> outputExpressions = new ArrayList<>();
ColumnPaginationController paginationController =
@@ -399,6 +426,9 @@ public class Analyzer {
measurementAliasPair.left, measurementPath);
typeProvider.setType(tmpExpression.getExpressionString(), dataType);
selectExpressions.add(tmpExpression);
+ deviceToMeasurementsMap
+ .computeIfAbsent(measurementPath.getDevice(), key -> new LinkedHashSet<>())
+ .add(measurementAliasPair.left.getExpressionString());
}
paginationController.consumeLimit();
} else {
@@ -431,6 +461,9 @@ public class Analyzer {
expressionWithoutAlias, measurementPath);
typeProvider.setType(tmpExpression.getExpressionString(), dataType);
selectExpressions.add(tmpExpression);
+ deviceToMeasurementsMap
+ .computeIfAbsent(measurementPath.getDevice(), key -> new LinkedHashSet<>())
+ .add(expressionWithoutAlias.getExpressionString());
}
paginationController.consumeLimit();
} else {
@@ -459,6 +492,9 @@ public class Analyzer {
measurementAliasPair.left, measurementPath);
typeProvider.setType(tmpExpression.getExpressionString(), dataType);
selectExpressions.add(tmpExpression);
+ deviceToMeasurementsMap
+ .computeIfAbsent(measurementPath.getDevice(), key -> new LinkedHashSet<>())
+ .add(replacedMeasurement.getExpressionString());
}
paginationController.consumeLimit();
} else {
@@ -531,7 +567,7 @@ public class Analyzer {
sourceExpressions
.computeIfAbsent(
ExpressionAnalyzer.getDeviceNameInSourceExpression(sourceExpression),
- key -> new HashSet<>())
+ key -> new LinkedHashSet<>())
.add(sourceExpression);
}
}
@@ -668,7 +704,12 @@ public class Analyzer {
QueryStatement queryStatement, List<Pair<Expression, String>> outputExpressions) {
boolean isIgnoreTimestamp =
queryStatement.isAggregationQuery() && !queryStatement.isGroupByTime();
- List<ColumnHeader> columnHeaders =
+ List<ColumnHeader> columnHeaders = new ArrayList<>();
+ if (queryStatement.isAlignByDevice()) {
+ columnHeaders.add(new ColumnHeader(HeaderConstant.COLUMN_DEVICE, TSDataType.TEXT, null));
+ typeProvider.setType(HeaderConstant.COLUMN_DEVICE, TSDataType.TEXT);
+ }
+ columnHeaders.addAll(
outputExpressions.stream()
.map(
expressionWithAlias -> {
@@ -676,7 +717,7 @@ public class Analyzer {
String alias = expressionWithAlias.right;
return new ColumnHeader(columnName, typeProvider.getType(columnName), alias);
})
- .collect(Collectors.toList());
+ .collect(Collectors.toList()));
return new DatasetHeader(columnHeaders, isIgnoreTimestamp);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index e6f24a546e..61b6c3a99c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -317,6 +317,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
}
createAlignedTimeSeriesStatement.setEncodings(encodings);
createAlignedTimeSeriesStatement.setCompressors(compressors);
+ createAlignedTimeSeriesStatement.setAliasList(null);
executeCreateStatement(createAlignedTimeSeriesStatement);
} else {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
index 324ad66949..0578ce582e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
@@ -762,6 +762,22 @@ public class ExpressionAnalyzer {
}
}
+ /** Check for arithmetic expression, logical expression, UDF. Returns true if it exists. */
+ public static boolean checkIsNeedTransform(Expression expression) {
+ if (expression instanceof BinaryExpression) {
+ return true;
+ } else if (expression instanceof UnaryExpression) {
+ return true;
+ } else if (expression instanceof FunctionExpression) {
+ return !expression.isBuiltInAggregationFunctionExpression();
+ } else if (expression instanceof TimeSeriesOperand) {
+ return false;
+ } else {
+ throw new IllegalArgumentException(
+ "unsupported expression type: " + expression.getExpressionType());
+ }
+ }
+
/////////////////////////////////////////////////////////////////////////////////////////////////
// Method can only be used in source expression
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
index 68b6e687f4..c0046625a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
@@ -60,23 +60,29 @@ public class FakeSchemaFetcherImpl implements ISchemaFetcher {
SchemaNode sg = new SchemaInternalNode("sg");
root.addChild("sg", sg);
- SchemaEntityNode d1 = new SchemaEntityNode("d1");
- sg.addChild("d1", d1);
-
SchemaMeasurementNode s1 =
new SchemaMeasurementNode("s1", new MeasurementSchema("s1", TSDataType.INT32));
- d1.addChild("s1", s1);
SchemaMeasurementNode s2 =
- new SchemaMeasurementNode("s2", new MeasurementSchema("s1", TSDataType.INT32));
+ new SchemaMeasurementNode("s2", new MeasurementSchema("s2", TSDataType.DOUBLE));
+ SchemaMeasurementNode s3 =
+ new SchemaMeasurementNode("s3", new MeasurementSchema("s3", TSDataType.BOOLEAN));
+ SchemaMeasurementNode s4 =
+ new SchemaMeasurementNode("s4", new MeasurementSchema("s4", TSDataType.TEXT));
s2.setAlias("status");
+
+ SchemaEntityNode d1 = new SchemaEntityNode("d1");
+ sg.addChild("d1", d1);
+ d1.addChild("s1", s1);
d1.addChild("s2", s2);
d1.addAliasChild("status", s2);
+ d1.addChild("s3", s3);
SchemaEntityNode d2 = new SchemaEntityNode("d2");
sg.addChild("d2", d2);
d2.addChild("s1", s1);
d2.addChild("s2", s2);
d2.addAliasChild("status", s2);
+ d2.addChild("s4", s4);
SchemaEntityNode a = new SchemaEntityNode("a");
a.setAligned(true);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index ac215e0727..3273638f04 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -402,7 +402,13 @@ public class LocalExecutionPlanner {
node.getChildren().stream()
.map(child -> child.accept(this, context))
.collect(Collectors.toList());
- return new DeviceViewOperator(operatorContext, node.getDevices(), children, null, null);
+ List<List<Integer>> deviceColumnIndex =
+ node.getDevices().stream()
+ .map(deviceName -> node.getDeviceToMeasurementIndexesMap().get(deviceName))
+ .collect(Collectors.toList());
+ List<TSDataType> outputColumnTypes = getOutputColumnTypes(node, context.getTypeProvider());
+ return new DeviceViewOperator(
+ operatorContext, node.getDevices(), children, deviceColumnIndex, outputColumnTypes);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index 62a8b7f83e..b0c1c48441 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -21,9 +21,13 @@ package org.apache.iotdb.db.mpp.plan.planner;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.utils.MetaUtils;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.plan.analyze.ExpressionAnalyzer;
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.CountSchemaMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.DevicesCountNode;
@@ -34,17 +38,40 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchS
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesCountNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNullNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTimeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FilterNullParameter;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.query.expression.multi.FunctionExpression;
+import org.apache.iotdb.db.utils.SchemaUtils;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.commons.lang.Validate;
+
+import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -65,79 +92,385 @@ public class LogicalPlanBuilder {
return root;
}
- public LogicalPlanBuilder planRawDataQuerySource(
- Map<String, Set<Expression>> deviceNameToPathsMap,
- OrderBy scanOrder,
- boolean isAlignByDevice) {
- Map<String, List<PlanNode>> deviceNameToSourceNodesMap = new HashMap<>();
+ public LogicalPlanBuilder withNewRoot(PlanNode newRoot) {
+ this.root = newRoot;
+ return this;
+ }
- for (Map.Entry<String, Set<Expression>> entry : deviceNameToPathsMap.entrySet()) {
- String deviceName = entry.getKey();
- Set<String> allSensors =
- entry.getValue().stream()
+ public LogicalPlanBuilder planRawDataSource(
+ Map<String, Set<Expression>> deviceNameToSourceExpressions,
+ OrderBy scanOrder,
+ Filter timeFilter) {
+ List<PlanNode> sourceNodeList = new ArrayList<>();
+ for (Set<Expression> sourceExpressionList :
+ deviceNameToSourceExpressions.values()) { // for each device
+ List<PartialPath> selectedPaths =
+ sourceExpressionList.stream()
.map(expression -> ((TimeSeriesOperand) expression).getPath())
- .map(PartialPath::getMeasurement)
- .collect(Collectors.toSet());
- for (Expression expression : entry.getValue()) {
- PartialPath path = ((TimeSeriesOperand) expression).getPath();
- deviceNameToSourceNodesMap
- .computeIfAbsent(deviceName, k -> new ArrayList<>())
- .add(
- new SeriesScanNode(
- context.getQueryId().genPlanNodeId(),
- (MeasurementPath) path,
- allSensors,
- scanOrder));
+ .collect(Collectors.toList());
+ List<PartialPath> groupedPaths = MetaUtils.groupAlignedPaths(selectedPaths);
+ for (PartialPath path : groupedPaths) {
+ if (path instanceof MeasurementPath) { // non-aligned series
+ SeriesScanNode seriesScanNode =
+ new SeriesScanNode(
+ context.getQueryId().genPlanNodeId(), (MeasurementPath) path, scanOrder);
+ seriesScanNode.setTimeFilter(timeFilter);
+ sourceNodeList.add(seriesScanNode);
+ } else if (path instanceof AlignedPath) { // aligned series
+ AlignedSeriesScanNode alignedSeriesScanNode =
+ new AlignedSeriesScanNode(
+ context.getQueryId().genPlanNodeId(), (AlignedPath) path, scanOrder);
+ alignedSeriesScanNode.setTimeFilter(timeFilter);
+ sourceNodeList.add(alignedSeriesScanNode);
+ } else {
+ throw new IllegalArgumentException("unexpected path type");
+ }
}
}
- if (isAlignByDevice) {
- planDeviceMerge(deviceNameToSourceNodesMap, scanOrder);
- } else {
- planTimeJoin(deviceNameToSourceNodesMap, scanOrder);
+ this.root = convergeWithTimeJoin(sourceNodeList, scanOrder);
+ return this;
+ }
+
+ public LogicalPlanBuilder planAggregationSource(
+ Map<String, Set<Expression>> deviceNameToSourceExpressions,
+ OrderBy scanOrder,
+ Filter timeFilter,
+ GroupByTimeParameter groupByTimeParameter,
+ Map<String, Set<Expression>> aggregationExpressions,
+ Map<Expression, Set<Expression>> groupByLevelExpressions,
+ TypeProvider typeProvider) {
+ AggregationStep curStep =
+ (groupByLevelExpressions != null
+ || (groupByTimeParameter != null && groupByTimeParameter.hasOverlap()))
+ ? AggregationStep.PARTIAL
+ : AggregationStep.SINGLE;
+
+ List<PlanNode> sourceNodeList = new ArrayList<>();
+ for (Set<Expression> sourceExpressionList :
+ deviceNameToSourceExpressions.values()) { // for each device
+ Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new HashMap<>();
+ Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new HashMap<>();
+ for (Expression sourceExpression : sourceExpressionList) {
+ AggregationType aggregationFunction =
+ AggregationType.valueOf(
+ ((FunctionExpression) sourceExpression).getFunctionName().toUpperCase());
+ AggregationDescriptor aggregationDescriptor =
+ new AggregationDescriptor(
+ aggregationFunction, curStep, sourceExpression.getExpressions());
+ if (curStep.isOutputPartial()) {
+ updateTypeProviderByPartialAggregation(aggregationDescriptor, typeProvider);
+ }
+ PartialPath selectPath =
+ ((TimeSeriesOperand) sourceExpression.getExpressions().get(0)).getPath();
+ if (SchemaUtils.isConsistentWithScanOrder(aggregationFunction, scanOrder)) {
+ ascendingAggregations
+ .computeIfAbsent(selectPath, key -> new ArrayList<>())
+ .add(aggregationDescriptor);
+ } else {
+ descendingAggregations
+ .computeIfAbsent(selectPath, key -> new ArrayList<>())
+ .add(aggregationDescriptor);
+ }
+ }
+
+ Map<PartialPath, List<AggregationDescriptor>> groupedAscendingAggregations =
+ MetaUtils.groupAlignedAggregations(ascendingAggregations);
+ Map<PartialPath, List<AggregationDescriptor>> groupedDescendingAggregations =
+ MetaUtils.groupAlignedAggregations(descendingAggregations);
+ for (Map.Entry<PartialPath, List<AggregationDescriptor>> pathAggregationsEntry :
+ groupedAscendingAggregations.entrySet()) {
+ sourceNodeList.add(
+ createAggregationScanNode(
+ pathAggregationsEntry.getKey(),
+ pathAggregationsEntry.getValue(),
+ scanOrder,
+ groupByTimeParameter,
+ timeFilter));
+ }
+ for (Map.Entry<PartialPath, List<AggregationDescriptor>> pathAggregationsEntry :
+ groupedDescendingAggregations.entrySet()) {
+ sourceNodeList.add(
+ createAggregationScanNode(
+ pathAggregationsEntry.getKey(),
+ pathAggregationsEntry.getValue(),
+ scanOrder,
+ groupByTimeParameter,
+ timeFilter));
+ }
}
+ if (curStep.isOutputPartial()) {
+ if (groupByTimeParameter != null && groupByTimeParameter.hasOverlap()) {
+ curStep =
+ groupByLevelExpressions != null ? AggregationStep.INTERMEDIATE : AggregationStep.FINAL;
+ this.root =
+ createGroupByTimeNode(
+ sourceNodeList, aggregationExpressions, groupByTimeParameter, curStep);
+
+ if (groupByLevelExpressions != null) {
+ curStep = AggregationStep.FINAL;
+ this.root =
+ createGroupByTLevelNode(this.root.getChildren(), groupByLevelExpressions, curStep);
+ }
+ } else {
+ if (groupByLevelExpressions != null) {
+ curStep = AggregationStep.FINAL;
+ this.root = createGroupByTLevelNode(sourceNodeList, groupByLevelExpressions, curStep);
+ }
+ }
+ } else {
+ this.root = convergeWithTimeJoin(sourceNodeList, scanOrder);
+ }
return this;
}
- public void planTimeJoin(
- Map<String, List<PlanNode>> deviceNameToSourceNodesMap, OrderBy mergeOrder) {
- List<PlanNode> sourceNodes =
- deviceNameToSourceNodesMap.entrySet().stream()
- .flatMap(entry -> entry.getValue().stream())
- .collect(Collectors.toList());
- this.root = convergeWithTimeJoin(sourceNodes, mergeOrder);
+ private void updateTypeProviderByPartialAggregation(
+ AggregationDescriptor aggregationDescriptor, TypeProvider typeProvider) {
+ List<AggregationType> splitAggregations =
+ SchemaUtils.splitPartialAggregation(aggregationDescriptor.getAggregationType());
+ PartialPath path =
+ ((TimeSeriesOperand) aggregationDescriptor.getInputExpressions().get(0)).getPath();
+ for (AggregationType aggregationType : splitAggregations) {
+ String functionName = aggregationType.toString().toLowerCase();
+ typeProvider.setType(
+ String.format("%s(%s)", functionName, path.getFullPath()),
+ SchemaUtils.getSeriesTypeByPath(path, functionName));
+ }
}
- public void planDeviceMerge(
- Map<String, List<PlanNode>> deviceNameToSourceNodesMap, OrderBy mergeOrder) {
- List<String> measurements =
- deviceNameToSourceNodesMap.values().stream()
- .flatMap(List::stream)
- .map(node -> ((SeriesScanNode) node).getSeriesPath().getMeasurement())
- .distinct()
- .collect(Collectors.toList());
+ private PlanNode convergeWithTimeJoin(List<PlanNode> sourceNodes, OrderBy mergeOrder) {
+ PlanNode tmpNode;
+ if (sourceNodes.size() == 1) {
+ tmpNode = sourceNodes.get(0);
+ } else {
+ tmpNode = new TimeJoinNode(context.getQueryId().genPlanNodeId(), mergeOrder, sourceNodes);
+ }
+ return tmpNode;
+ }
+
+ public LogicalPlanBuilder planDeviceView(
+ Map<String, PlanNode> deviceNameToSourceNodesMap,
+ List<String> outputColumnNames,
+ Map<String, List<Integer>> deviceToMeasurementIndexesMap,
+ OrderBy mergeOrder) {
DeviceViewNode deviceViewNode =
new DeviceViewNode(
context.getQueryId().genPlanNodeId(),
Arrays.asList(OrderBy.DEVICE_ASC, mergeOrder),
- measurements);
- for (Map.Entry<String, List<PlanNode>> entry : deviceNameToSourceNodesMap.entrySet()) {
+ outputColumnNames,
+ deviceToMeasurementIndexesMap);
+ for (Map.Entry<String, PlanNode> entry : deviceNameToSourceNodesMap.entrySet()) {
String deviceName = entry.getKey();
- List<PlanNode> planNodes = new ArrayList<>(entry.getValue());
- deviceViewNode.addChildDeviceNode(deviceName, convergeWithTimeJoin(planNodes, mergeOrder));
+ PlanNode subPlan = entry.getValue();
+ deviceViewNode.addChildDeviceNode(deviceName, subPlan);
}
+
this.root = deviceViewNode;
+ return this;
}
- private PlanNode convergeWithTimeJoin(List<PlanNode> sourceNodes, OrderBy mergeOrder) {
- PlanNode tmpNode;
- if (sourceNodes.size() == 1) {
- tmpNode = sourceNodes.get(0);
+ public LogicalPlanBuilder planGroupByLevel(
+ Map<Expression, Set<Expression>> groupByLevelExpressions, AggregationStep curStep) {
+ if (groupByLevelExpressions == null) {
+ return this;
+ }
+
+ this.root =
+ createGroupByTLevelNode(
+ Collections.singletonList(this.getRoot()), groupByLevelExpressions, curStep);
+ return this;
+ }
+
+ public LogicalPlanBuilder planAggregation(
+ Map<String, Set<Expression>> aggregationExpressions,
+ GroupByTimeParameter groupByTimeParameter,
+ AggregationStep curStep,
+ TypeProvider typeProvider) {
+ if (aggregationExpressions == null) {
+ return this;
+ }
+
+ List<AggregationDescriptor> aggregationDescriptorList =
+ constructAggregationDescriptorList(aggregationExpressions, curStep);
+ if (curStep.isOutputPartial()) {
+ aggregationDescriptorList.forEach(
+ aggregationDescriptor -> {
+ updateTypeProviderByPartialAggregation(aggregationDescriptor, typeProvider);
+ });
+ }
+ this.root =
+ new AggregationNode(
+ context.getQueryId().genPlanNodeId(),
+ Collections.singletonList(this.getRoot()),
+ aggregationDescriptorList,
+ groupByTimeParameter);
+ return this;
+ }
+
+ public LogicalPlanBuilder planGroupByTime(
+ Map<String, Set<Expression>> aggregationExpressions,
+ GroupByTimeParameter groupByTimeParameter,
+ AggregationStep curStep) {
+ if (aggregationExpressions == null) {
+ return this;
+ }
+
+ this.root =
+ createGroupByTimeNode(
+ Collections.singletonList(this.getRoot()),
+ aggregationExpressions,
+ groupByTimeParameter,
+ curStep);
+ return this;
+ }
+
+ private PlanNode createGroupByTimeNode(
+ List<PlanNode> children,
+ Map<String, Set<Expression>> aggregationExpressions,
+ GroupByTimeParameter groupByTimeParameter,
+ AggregationStep curStep) {
+ List<AggregationDescriptor> aggregationDescriptorList =
+ constructAggregationDescriptorList(aggregationExpressions, curStep);
+ return new GroupByTimeNode(
+ context.getQueryId().genPlanNodeId(),
+ children,
+ aggregationDescriptorList,
+ groupByTimeParameter);
+ }
+
+ private PlanNode createGroupByTLevelNode(
+ List<PlanNode> children,
+ Map<Expression, Set<Expression>> groupByLevelExpressions,
+ AggregationStep curStep) {
+ List<String> outputColumnNames = new ArrayList<>();
+ List<AggregationDescriptor> aggregationDescriptorList = new ArrayList<>();
+ for (Expression groupedExpression : groupByLevelExpressions.keySet()) {
+ AggregationType aggregationFunction =
+ AggregationType.valueOf(
+ ((FunctionExpression) groupedExpression).getFunctionName().toUpperCase());
+ outputColumnNames.add(groupedExpression.getExpressionString());
+ aggregationDescriptorList.add(
+ new AggregationDescriptor(
+ aggregationFunction,
+ curStep,
+ new ArrayList<>(groupByLevelExpressions.get(groupedExpression))));
+ }
+ return new GroupByLevelNode(
+ context.getQueryId().genPlanNodeId(),
+ children,
+ aggregationDescriptorList,
+ outputColumnNames);
+ }
+
+ private PlanNode createAggregationScanNode(
+ PartialPath selectPath,
+ List<AggregationDescriptor> aggregationDescriptorList,
+ OrderBy scanOrder,
+ GroupByTimeParameter groupByTimeParameter,
+ Filter timeFilter) {
+ if (selectPath instanceof MeasurementPath) { // non-aligned series
+ SeriesAggregationScanNode seriesAggregationScanNode =
+ new SeriesAggregationScanNode(
+ context.getQueryId().genPlanNodeId(),
+ (MeasurementPath) selectPath,
+ aggregationDescriptorList,
+ scanOrder,
+ groupByTimeParameter);
+ seriesAggregationScanNode.setTimeFilter(timeFilter);
+ return seriesAggregationScanNode;
+ } else if (selectPath instanceof AlignedPath) { // aligned series
+ AlignedSeriesAggregationScanNode alignedSeriesAggregationScanNode =
+ new AlignedSeriesAggregationScanNode(
+ context.getQueryId().genPlanNodeId(),
+ (AlignedPath) selectPath,
+ aggregationDescriptorList,
+ scanOrder,
+ groupByTimeParameter);
+ alignedSeriesAggregationScanNode.setTimeFilter(timeFilter);
+ return alignedSeriesAggregationScanNode;
} else {
- tmpNode = new TimeJoinNode(context.getQueryId().genPlanNodeId(), mergeOrder, sourceNodes);
+ throw new IllegalArgumentException("unexpected path type");
}
- return tmpNode;
+ }
+
+ private List<AggregationDescriptor> constructAggregationDescriptorList(
+ Map<String, Set<Expression>> aggregationExpressions, AggregationStep curStep) {
+ return aggregationExpressions.values().stream()
+ .flatMap(Set::stream)
+ .map(
+ expression -> {
+ Validate.isTrue(expression instanceof FunctionExpression);
+ AggregationType aggregationFunction =
+ AggregationType.valueOf(
+ ((FunctionExpression) expression).getFunctionName().toUpperCase());
+ return new AggregationDescriptor(
+ aggregationFunction, curStep, expression.getExpressions());
+ })
+ .collect(Collectors.toList());
+ }
+
+ public LogicalPlanBuilder planFilterAndTransform(
+ Expression queryFilter,
+ Set<Expression> selectExpressions,
+ boolean isGroupByTime,
+ ZoneId zoneId) {
+ if (queryFilter == null) {
+ return this;
+ }
+
+ this.root =
+ new FilterNode(
+ context.getQueryId().genPlanNodeId(),
+ this.getRoot(),
+ selectExpressions.toArray(new Expression[0]),
+ queryFilter,
+ isGroupByTime,
+ zoneId);
+ return this;
+ }
+
+ public LogicalPlanBuilder planTransform(
+ Set<Expression> selectExpressions, boolean isGroupByTime, ZoneId zoneId) {
+ boolean needTransform = false;
+ for (Expression expression : selectExpressions) {
+ if (ExpressionAnalyzer.checkIsNeedTransform(expression)) {
+ needTransform = true;
+ break;
+ }
+ }
+ if (!needTransform) {
+ return this;
+ }
+
+ this.root =
+ new TransformNode(
+ context.getQueryId().genPlanNodeId(),
+ this.getRoot(),
+ selectExpressions.toArray(new Expression[0]),
+ isGroupByTime,
+ zoneId);
+ return this;
+ }
+
+ public LogicalPlanBuilder planFilterNull(FilterNullParameter filterNullParameter) {
+ if (filterNullParameter == null) {
+ return this;
+ }
+
+ this.root =
+ new FilterNullNode(
+ context.getQueryId().genPlanNodeId(), this.getRoot(), filterNullParameter);
+ return this;
+ }
+
+ public LogicalPlanBuilder planFill(FillDescriptor fillDescriptor) {
+ if (fillDescriptor == null) {
+ return this;
+ }
+
+ this.root = new FillNode(context.getQueryId().genPlanNodeId(), this.getRoot(), fillDescriptor);
+ return this;
}
public LogicalPlanBuilder planLimit(int rowLimit) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index 064c59971f..befcdb71f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
@@ -48,9 +49,17 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement
import org.apache.iotdb.db.mpp.plan.statement.metadata.SchemaFetchStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTimeSeriesStatement;
+import org.apache.iotdb.db.query.expression.Expression;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
/** Generate a logical plan for the statement. */
public class LogicalPlanner {
@@ -96,14 +105,149 @@ public class LogicalPlanner {
@Override
public PlanNode visitQuery(QueryStatement queryStatement, MPPQueryContext context) {
- return new LogicalPlanBuilder(context)
- .planRawDataQuerySource(
- analysis.getSourceExpressions(),
- queryStatement.getResultOrder(),
- queryStatement.isAlignByDevice())
- .planOffset(queryStatement.getRowOffset())
- .planLimit(queryStatement.getRowLimit())
- .getRoot();
+ LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
+
+ if (queryStatement.isAlignByDevice()) {
+ Map<String, PlanNode> deviceToSubPlanMap = new HashMap<>();
+ for (String deviceName : analysis.getSourceExpressions().keySet()) {
+ LogicalPlanBuilder subPlanBuilder = new LogicalPlanBuilder(context);
+ subPlanBuilder =
+ subPlanBuilder.withNewRoot(
+ visitQueryBody(
+ queryStatement,
+ Maps.asMap(
+ Sets.newHashSet(deviceName),
+ (key) -> analysis.getSourceExpressions().get(key)),
+ Maps.asMap(
+ Sets.newHashSet(deviceName),
+ (key) -> analysis.getAggregationExpressions().get(key)),
+ analysis.getSourceExpressions().get(deviceName),
+ analysis.getDeviceToQueryFilter() != null
+ ? analysis.getDeviceToQueryFilter().get(deviceName)
+ : null,
+ context));
+ deviceToSubPlanMap.put(deviceName, subPlanBuilder.getRoot());
+ }
+ // convert to ALIGN BY DEVICE view
+ planBuilder =
+ planBuilder.planDeviceView(
+ deviceToSubPlanMap,
+ analysis.getRespDatasetHeader().getRespColumns().stream()
+ .distinct()
+ .collect(Collectors.toList()),
+ analysis.getDeviceToMeasurementIndexesMap(),
+ queryStatement.getResultOrder());
+ } else {
+ planBuilder =
+ planBuilder.withNewRoot(
+ visitQueryBody(
+ queryStatement,
+ analysis.getSourceExpressions(),
+ analysis.getAggregationExpressions(),
+ analysis.getSelectExpressions(),
+ analysis.getQueryFilter(),
+ context));
+ }
+
+ // other common upstream node
+ planBuilder =
+ planBuilder
+ .planFilterNull(analysis.getFilterNullParameter())
+ .planFill(analysis.getFillDescriptor())
+ .planOffset(queryStatement.getRowOffset())
+ .planLimit(queryStatement.getRowLimit());
+
+ return planBuilder.getRoot();
+ }
+
+ public PlanNode visitQueryBody(
+ QueryStatement queryStatement,
+ Map<String, Set<Expression>> sourceExpressions,
+ Map<String, Set<Expression>> aggregationExpressions,
+ Set<Expression> selectExpressions,
+ Expression queryFilter,
+ MPPQueryContext context) {
+ LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
+ boolean isRawDataSource =
+ !queryStatement.isAggregationQuery()
+ || (queryStatement.isAggregationQuery() && analysis.hasValueFilter());
+
+ // plan data source node
+ if (isRawDataSource) {
+ planBuilder =
+ planBuilder.planRawDataSource(
+ sourceExpressions, queryStatement.getResultOrder(), analysis.getGlobalTimeFilter());
+
+ if (queryStatement.isAggregationQuery()) {
+ if (analysis.hasValueFilter()) {
+ planBuilder =
+ planBuilder.planFilterAndTransform(
+ queryFilter,
+ sourceExpressions.values().stream()
+ .flatMap(Set::stream)
+ .collect(Collectors.toSet()),
+ queryStatement.isGroupByTime(),
+ queryStatement.getSelectComponent().getZoneId());
+ }
+
+ boolean outputPartial =
+ queryStatement.isGroupByLevel()
+ || (queryStatement.isGroupByTime()
+ && analysis.getGroupByTimeParameter().hasOverlap());
+ AggregationStep curStep = outputPartial ? AggregationStep.PARTIAL : AggregationStep.FINAL;
+ planBuilder =
+ planBuilder.planAggregation(
+ aggregationExpressions,
+ analysis.getGroupByTimeParameter(),
+ curStep,
+ analysis.getTypeProvider());
+
+ if (curStep.isOutputPartial()) {
+ if (queryStatement.isGroupByTime() && analysis.getGroupByTimeParameter().hasOverlap()) {
+ curStep =
+ queryStatement.isGroupByLevel()
+ ? AggregationStep.INTERMEDIATE
+ : AggregationStep.FINAL;
+ planBuilder =
+ planBuilder.planGroupByTime(
+ aggregationExpressions, analysis.getGroupByTimeParameter(), curStep);
+ }
+
+ if (queryStatement.isGroupByLevel()) {
+ curStep = AggregationStep.FINAL;
+ planBuilder =
+ planBuilder.planGroupByLevel(analysis.getGroupByLevelExpressions(), curStep);
+ }
+ }
+ } else {
+ if (analysis.hasValueFilter()) {
+ planBuilder =
+ planBuilder.planFilterAndTransform(
+ queryFilter,
+ selectExpressions,
+ queryStatement.isGroupByTime(),
+ queryStatement.getSelectComponent().getZoneId());
+ } else {
+ planBuilder =
+ planBuilder.planTransform(
+ selectExpressions,
+ queryStatement.isGroupByTime(),
+ queryStatement.getSelectComponent().getZoneId());
+ }
+ }
+ } else {
+ planBuilder =
+ planBuilder.planAggregationSource(
+ sourceExpressions,
+ queryStatement.getResultOrder(),
+ analysis.getGlobalTimeFilter(),
+ analysis.getGroupByTimeParameter(),
+ aggregationExpressions,
+ analysis.getGroupByLevelExpressions(),
+ analysis.getTypeProvider());
+ }
+
+ return planBuilder.getRoot();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
index d9c049a389..589e666297 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
@@ -27,7 +27,9 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
/**
@@ -50,21 +52,35 @@ public class DeviceViewNode extends ProcessNode {
// each child node whose output TsBlock contains the data belonged to one device.
private final List<PlanNode> children = new ArrayList<>();
- // measurement columns in result output
- private final List<String> measurements;
+ // Device column and measurement columns in result output
+ private final List<String> outputColumnNames;
- public DeviceViewNode(PlanNodeId id, List<OrderBy> mergeOrders, List<String> measurements) {
+ // e.g. [s1,s2,s3] is query, but [s1, s3] exists in device1, then device1 -> [1, 3], s1 is 1 but
+ // not 0 because device is the first column
+ private Map<String, List<Integer>> deviceToMeasurementIndexesMap;
+
+ public DeviceViewNode(
+ PlanNodeId id,
+ List<OrderBy> mergeOrders,
+ List<String> outputColumnNames,
+ Map<String, List<Integer>> deviceToMeasurementIndexesMap) {
super(id);
this.mergeOrders = mergeOrders;
- this.measurements = measurements;
+ this.outputColumnNames = outputColumnNames;
+ this.deviceToMeasurementIndexesMap = deviceToMeasurementIndexesMap;
}
public DeviceViewNode(
- PlanNodeId id, List<OrderBy> mergeOrders, List<String> measurements, List<String> devices) {
+ PlanNodeId id,
+ List<OrderBy> mergeOrders,
+ List<String> outputColumnNames,
+ List<String> devices,
+ Map<String, List<Integer>> deviceToMeasurementIndexesMap) {
super(id);
this.mergeOrders = mergeOrders;
- this.measurements = measurements;
+ this.outputColumnNames = outputColumnNames;
this.devices.addAll(devices);
+ this.deviceToMeasurementIndexesMap = deviceToMeasurementIndexesMap;
}
public void addChildDeviceNode(String deviceName, PlanNode childNode) {
@@ -76,6 +92,10 @@ public class DeviceViewNode extends ProcessNode {
return devices;
}
+ public Map<String, List<Integer>> getDeviceToMeasurementIndexesMap() {
+ return deviceToMeasurementIndexesMap;
+ }
+
@Override
public List<PlanNode> getChildren() {
return children;
@@ -93,12 +113,13 @@ public class DeviceViewNode extends ProcessNode {
@Override
public PlanNode clone() {
- return new DeviceViewNode(getPlanNodeId(), mergeOrders, measurements, devices);
+ return new DeviceViewNode(
+ getPlanNodeId(), mergeOrders, outputColumnNames, devices, deviceToMeasurementIndexesMap);
}
@Override
public List<String> getOutputColumnNames() {
- return measurements;
+ return outputColumnNames;
}
@Override
@@ -111,25 +132,33 @@ public class DeviceViewNode extends ProcessNode {
PlanNodeType.DEVICE_VIEW.serialize(byteBuffer);
ReadWriteIOUtils.write(mergeOrders.get(0).ordinal(), byteBuffer);
ReadWriteIOUtils.write(mergeOrders.get(1).ordinal(), byteBuffer);
- ReadWriteIOUtils.write(measurements.size(), byteBuffer);
- for (String measurement : measurements) {
- ReadWriteIOUtils.write(measurement, byteBuffer);
+ ReadWriteIOUtils.write(outputColumnNames.size(), byteBuffer);
+ for (String column : outputColumnNames) {
+ ReadWriteIOUtils.write(column, byteBuffer);
}
ReadWriteIOUtils.write(devices.size(), byteBuffer);
for (String deviceName : devices) {
ReadWriteIOUtils.write(deviceName, byteBuffer);
}
+ ReadWriteIOUtils.write(deviceToMeasurementIndexesMap.size(), byteBuffer);
+ for (Map.Entry<String, List<Integer>> entry : deviceToMeasurementIndexesMap.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), byteBuffer);
+ ReadWriteIOUtils.write(entry.getValue().size(), byteBuffer);
+ for (Integer index : entry.getValue()) {
+ ReadWriteIOUtils.write(index, byteBuffer);
+ }
+ }
}
public static DeviceViewNode deserialize(ByteBuffer byteBuffer) {
List<OrderBy> mergeOrders = new ArrayList<>();
mergeOrders.add(OrderBy.values()[ReadWriteIOUtils.readInt(byteBuffer)]);
mergeOrders.add(OrderBy.values()[ReadWriteIOUtils.readInt(byteBuffer)]);
- int measurementsSize = ReadWriteIOUtils.readInt(byteBuffer);
- List<String> measurements = new ArrayList<>();
- while (measurementsSize > 0) {
- measurements.add(ReadWriteIOUtils.readString(byteBuffer));
- measurementsSize--;
+ int columnSize = ReadWriteIOUtils.readInt(byteBuffer);
+ List<String> outputColumnNames = new ArrayList<>();
+ while (columnSize > 0) {
+ outputColumnNames.add(ReadWriteIOUtils.readString(byteBuffer));
+ columnSize--;
}
int devicesSize = ReadWriteIOUtils.readInt(byteBuffer);
List<String> devices = new ArrayList<>();
@@ -137,8 +166,22 @@ public class DeviceViewNode extends ProcessNode {
devices.add(ReadWriteIOUtils.readString(byteBuffer));
devicesSize--;
}
+ int mapSize = ReadWriteIOUtils.readInt(byteBuffer);
+ Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>(mapSize);
+ while (mapSize > 0) {
+ String deviceName = ReadWriteIOUtils.readString(byteBuffer);
+ int listSize = ReadWriteIOUtils.readInt(byteBuffer);
+ List<Integer> indexes = new ArrayList<>(listSize);
+ while (listSize > 0) {
+ indexes.add(ReadWriteIOUtils.readInt(byteBuffer));
+ listSize--;
+ }
+ deviceToMeasurementIndexesMap.put(deviceName, indexes);
+ mapSize--;
+ }
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new DeviceViewNode(planNodeId, mergeOrders, measurements, devices);
+ return new DeviceViewNode(
+ planNodeId, mergeOrders, outputColumnNames, devices, deviceToMeasurementIndexesMap);
}
@Override
@@ -156,11 +199,18 @@ public class DeviceViewNode extends ProcessNode {
return mergeOrders.equals(that.mergeOrders)
&& devices.equals(that.devices)
&& children.equals(that.children)
- && measurements.equals(that.measurements);
+ && outputColumnNames.equals(that.outputColumnNames)
+ && deviceToMeasurementIndexesMap.equals(that.deviceToMeasurementIndexesMap);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), mergeOrders, devices, children, measurements);
+ return Objects.hash(
+ super.hashCode(),
+ mergeOrders,
+ devices,
+ children,
+ outputColumnNames,
+ deviceToMeasurementIndexesMap);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
index d4a6ae36eb..e072d6d3f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
@@ -162,11 +162,11 @@ public class GroupByLevelNode extends AggregationNode {
return false;
}
GroupByLevelNode that = (GroupByLevelNode) o;
- return outputColumnNames.equals(that.outputColumnNames) && children.equals(that.children);
+ return outputColumnNames.equals(that.outputColumnNames);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), outputColumnNames, children);
+ return Objects.hash(super.hashCode(), outputColumnNames);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
index 88bdc54be6..c57e6692b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
@@ -81,9 +81,11 @@ public class AlignedSeriesAggregationScanNode extends SourceNode {
PlanNodeId id,
AlignedPath alignedPath,
List<AggregationDescriptor> aggregationDescriptorList,
- OrderBy scanOrder) {
+ OrderBy scanOrder,
+ @Nullable GroupByTimeParameter groupByTimeParameter) {
this(id, alignedPath, aggregationDescriptorList);
this.scanOrder = scanOrder;
+ this.groupByTimeParameter = groupByTimeParameter;
}
public AlignedSeriesAggregationScanNode(
@@ -94,9 +96,8 @@ public class AlignedSeriesAggregationScanNode extends SourceNode {
@Nullable Filter timeFilter,
@Nullable GroupByTimeParameter groupByTimeParameter,
TRegionReplicaSet dataRegionReplicaSet) {
- this(id, alignedPath, aggregationDescriptorList, scanOrder);
+ this(id, alignedPath, aggregationDescriptorList, scanOrder, groupByTimeParameter);
this.timeFilter = timeFilter;
- this.groupByTimeParameter = groupByTimeParameter;
this.regionReplicaSet = dataRegionReplicaSet;
}
@@ -117,6 +118,10 @@ public class AlignedSeriesAggregationScanNode extends SourceNode {
return timeFilter;
}
+ public void setTimeFilter(@Nullable Filter timeFilter) {
+ this.timeFilter = timeFilter;
+ }
+
@Nullable
public GroupByTimeParameter getGroupByTimeParameter() {
return groupByTimeParameter;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
index fd1df944d3..749c653e0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
@@ -107,6 +107,10 @@ public class AlignedSeriesScanNode extends SourceNode {
return timeFilter;
}
+ public void setTimeFilter(@Nullable Filter timeFilter) {
+ this.timeFilter = timeFilter;
+ }
+
@Nullable
public Filter getValueFilter() {
return valueFilter;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
index f3963c5036..abf3e5b8d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
-import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -40,10 +39,8 @@ import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Objects;
-import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -70,9 +67,6 @@ public class SeriesAggregationScanNode extends SourceNode {
// result TsBlock
private final List<AggregationDescriptor> aggregationDescriptorList;
- // all the sensors in seriesPath's device of current query
- private final Set<String> allSensors;
-
// The order to traverse the data.
// Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
// The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
@@ -91,11 +85,9 @@ public class SeriesAggregationScanNode extends SourceNode {
public SeriesAggregationScanNode(
PlanNodeId id,
MeasurementPath seriesPath,
- List<AggregationDescriptor> aggregationDescriptorList,
- Set<String> allSensors) {
+ List<AggregationDescriptor> aggregationDescriptorList) {
super(id);
this.seriesPath = seriesPath;
- this.allSensors = allSensors;
this.aggregationDescriptorList = aggregationDescriptorList;
}
@@ -103,24 +95,23 @@ public class SeriesAggregationScanNode extends SourceNode {
PlanNodeId id,
MeasurementPath seriesPath,
List<AggregationDescriptor> aggregationDescriptorList,
- Set<String> allSensors,
- OrderBy scanOrder) {
- this(id, seriesPath, aggregationDescriptorList, allSensors);
+ OrderBy scanOrder,
+ @Nullable GroupByTimeParameter groupByTimeParameter) {
+ this(id, seriesPath, aggregationDescriptorList);
this.scanOrder = scanOrder;
+ this.groupByTimeParameter = groupByTimeParameter;
}
public SeriesAggregationScanNode(
PlanNodeId id,
MeasurementPath seriesPath,
List<AggregationDescriptor> aggregationDescriptorList,
- Set<String> allSensors,
OrderBy scanOrder,
@Nullable Filter timeFilter,
@Nullable GroupByTimeParameter groupByTimeParameter,
TRegionReplicaSet dataRegionReplicaSet) {
- this(id, seriesPath, aggregationDescriptorList, allSensors, scanOrder);
+ this(id, seriesPath, aggregationDescriptorList, scanOrder, groupByTimeParameter);
this.timeFilter = timeFilter;
- this.groupByTimeParameter = groupByTimeParameter;
this.regionReplicaSet = dataRegionReplicaSet;
}
@@ -128,15 +119,15 @@ public class SeriesAggregationScanNode extends SourceNode {
return scanOrder;
}
- public Set<String> getAllSensors() {
- return allSensors;
- }
-
@Nullable
public Filter getTimeFilter() {
return timeFilter;
}
+ public void setTimeFilter(@Nullable Filter timeFilter) {
+ this.timeFilter = timeFilter;
+ }
+
@Nullable
public GroupByTimeParameter getGroupByTimeParameter() {
return groupByTimeParameter;
@@ -150,16 +141,6 @@ public class SeriesAggregationScanNode extends SourceNode {
return aggregationDescriptorList;
}
- public List<AggregationType> getAggregateFuncList() {
- return aggregationDescriptorList.stream()
- .map(AggregationDescriptor::getAggregationType)
- .collect(Collectors.toList());
- }
-
- public void setTimeFilter(Filter timeFilter) {
- this.timeFilter = timeFilter;
- }
-
@Override
public List<PlanNode> getChildren() {
return ImmutableList.of();
@@ -181,7 +162,6 @@ public class SeriesAggregationScanNode extends SourceNode {
getPlanNodeId(),
getSeriesPath(),
getAggregationDescriptorList(),
- getAllSensors(),
getScanOrder(),
getTimeFilter(),
getGroupByTimeParameter(),
@@ -225,10 +205,6 @@ public class SeriesAggregationScanNode extends SourceNode {
for (AggregationDescriptor aggregationDescriptor : aggregationDescriptorList) {
aggregationDescriptor.serialize(byteBuffer);
}
- ReadWriteIOUtils.write(allSensors.size(), byteBuffer);
- for (String sensor : allSensors) {
- ReadWriteIOUtils.write(sensor, byteBuffer);
- }
ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);
if (timeFilter == null) {
ReadWriteIOUtils.write((byte) 0, byteBuffer);
@@ -252,11 +228,6 @@ public class SeriesAggregationScanNode extends SourceNode {
for (int i = 0; i < aggregateDescriptorSize; i++) {
aggregationDescriptorList.add(AggregationDescriptor.deserialize(byteBuffer));
}
- int allSensorsSize = ReadWriteIOUtils.readInt(byteBuffer);
- Set<String> allSensors = new HashSet<>();
- for (int i = 0; i < allSensorsSize; i++) {
- allSensors.add(ReadWriteIOUtils.readString(byteBuffer));
- }
OrderBy scanOrder = OrderBy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
byte isNull = ReadWriteIOUtils.readByte(byteBuffer);
Filter timeFilter = null;
@@ -275,7 +246,6 @@ public class SeriesAggregationScanNode extends SourceNode {
planNodeId,
partialPath,
aggregationDescriptorList,
- allSensors,
scanOrder,
timeFilter,
groupByTimeParameter,
@@ -296,7 +266,6 @@ public class SeriesAggregationScanNode extends SourceNode {
SeriesAggregationScanNode that = (SeriesAggregationScanNode) o;
return seriesPath.equals(that.seriesPath)
&& aggregationDescriptorList.equals(that.aggregationDescriptorList)
- && allSensors.equals(that.allSensors)
&& scanOrder == that.scanOrder
&& Objects.equals(timeFilter, that.timeFilter)
&& Objects.equals(groupByTimeParameter, that.groupByTimeParameter)
@@ -309,7 +278,6 @@ public class SeriesAggregationScanNode extends SourceNode {
super.hashCode(),
seriesPath,
aggregationDescriptorList,
- allSensors,
scanOrder,
timeFilter,
groupByTimeParameter,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesScanNode.java
index 539c5109fe..3b3fe8f05c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesScanNode.java
@@ -33,14 +33,11 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import com.google.common.collect.ImmutableList;
-import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
-import java.util.HashSet;
import java.util.List;
import java.util.Objects;
-import java.util.Set;
/**
* SeriesScanNode is responsible for read data a specific series. When reading data, the
@@ -54,9 +51,6 @@ public class SeriesScanNode extends SourceNode {
// The path of the target series which will be scanned.
private final MeasurementPath seriesPath;
- // all the sensors in seriesPath's device of current query
- @Nonnull private final Set<String> allSensors;
-
// The order to traverse the data.
// Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
// The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
@@ -77,30 +71,26 @@ public class SeriesScanNode extends SourceNode {
// The id of DataRegion where the node will run
private TRegionReplicaSet regionReplicaSet;
- public SeriesScanNode(
- PlanNodeId id, MeasurementPath seriesPath, @Nonnull Set<String> allSensors) {
+ public SeriesScanNode(PlanNodeId id, MeasurementPath seriesPath) {
super(id);
this.seriesPath = seriesPath;
- this.allSensors = allSensors;
}
- public SeriesScanNode(
- PlanNodeId id, MeasurementPath seriesPath, Set<String> allSensors, OrderBy scanOrder) {
- this(id, seriesPath, allSensors);
+ public SeriesScanNode(PlanNodeId id, MeasurementPath seriesPath, OrderBy scanOrder) {
+ this(id, seriesPath);
this.scanOrder = scanOrder;
}
public SeriesScanNode(
PlanNodeId id,
MeasurementPath seriesPath,
- Set<String> allSensors,
OrderBy scanOrder,
@Nullable Filter timeFilter,
@Nullable Filter valueFilter,
int limit,
int offset,
TRegionReplicaSet dataRegionReplicaSet) {
- this(id, seriesPath, allSensors, scanOrder);
+ this(id, seriesPath, scanOrder);
this.timeFilter = timeFilter;
this.valueFilter = valueFilter;
this.limit = limit;
@@ -140,11 +130,6 @@ public class SeriesScanNode extends SourceNode {
this.offset = offset;
}
- @Nonnull
- public Set<String> getAllSensors() {
- return allSensors;
- }
-
public OrderBy getScanOrder() {
return scanOrder;
}
@@ -162,6 +147,10 @@ public class SeriesScanNode extends SourceNode {
return timeFilter;
}
+ public void setTimeFilter(@Nullable Filter timeFilter) {
+ this.timeFilter = timeFilter;
+ }
+
@Nullable
public Filter getValueFilter() {
return valueFilter;
@@ -187,7 +176,6 @@ public class SeriesScanNode extends SourceNode {
return new SeriesScanNode(
getPlanNodeId(),
getSeriesPath(),
- getAllSensors(),
getScanOrder(),
getTimeFilter(),
getValueFilter(),
@@ -210,10 +198,6 @@ public class SeriesScanNode extends SourceNode {
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.SERIES_SCAN.serialize(byteBuffer);
seriesPath.serialize(byteBuffer);
- ReadWriteIOUtils.write(allSensors.size(), byteBuffer);
- for (String sensor : allSensors) {
- ReadWriteIOUtils.write(sensor, byteBuffer);
- }
ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);
if (timeFilter == null) {
ReadWriteIOUtils.write((byte) 0, byteBuffer);
@@ -234,11 +218,6 @@ public class SeriesScanNode extends SourceNode {
public static SeriesScanNode deserialize(ByteBuffer byteBuffer) {
MeasurementPath partialPath = (MeasurementPath) PathDeserializeUtil.deserialize(byteBuffer);
- int allSensorSize = ReadWriteIOUtils.readInt(byteBuffer);
- Set<String> allSensors = new HashSet<>();
- for (int i = 0; i < allSensorSize; i++) {
- allSensors.add(ReadWriteIOUtils.readString(byteBuffer));
- }
OrderBy scanOrder = OrderBy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
byte isNull = ReadWriteIOUtils.readByte(byteBuffer);
Filter timeFilter = null;
@@ -258,7 +237,6 @@ public class SeriesScanNode extends SourceNode {
return new SeriesScanNode(
planNodeId,
partialPath,
- allSensors,
scanOrder,
timeFilter,
valueFilter,
@@ -289,7 +267,6 @@ public class SeriesScanNode extends SourceNode {
return limit == that.limit
&& offset == that.offset
&& seriesPath.equals(that.seriesPath)
- && allSensors.equals(that.allSensors)
&& scanOrder == that.scanOrder
&& Objects.equals(timeFilter, that.timeFilter)
&& Objects.equals(valueFilter, that.valueFilter)
@@ -301,7 +278,6 @@ public class SeriesScanNode extends SourceNode {
return Objects.hash(
super.hashCode(),
seriesPath,
- allSensors,
scanOrder,
timeFilter,
valueFilter,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
index e7f0b42e36..1f1923a6b2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
@@ -34,7 +34,7 @@ public class AggregationDescriptor {
private final AggregationType aggregationType;
// indicate the input and output type
- private final AggregationStep step;
+ private AggregationStep step;
/**
* Input of aggregation function. Currently, we only support one series in the aggregation
@@ -63,6 +63,14 @@ public class AggregationDescriptor {
return step;
}
+ public void setStep(AggregationStep step) {
+ this.step = step;
+ }
+
+ public List<Expression> getInputExpressions() {
+ return inputExpressions;
+ }
+
public void serialize(ByteBuffer byteBuffer) {
ReadWriteIOUtils.write(aggregationType.ordinal(), byteBuffer);
step.serialize(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByTimeParameter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByTimeParameter.java
index 468b36e6a2..1210bc55ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByTimeParameter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByTimeParameter.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
import java.util.Objects;
+import static org.apache.iotdb.db.qp.utils.DatetimeUtils.MS_TO_MONTH;
+
/** The parameter of `GROUP BY TIME` */
public class GroupByTimeParameter {
@@ -135,6 +137,12 @@ public class GroupByTimeParameter {
this.leftCRightO = leftCRightO;
}
+ public boolean hasOverlap() {
+ long tmpInterval = isIntervalByMonth ? interval * MS_TO_MONTH : interval;
+ long tmpSlidingStep = isSlidingStepByMonth ? slidingStep * MS_TO_MONTH : slidingStep;
+ return tmpInterval > tmpSlidingStep;
+ }
+
public void serialize(ByteBuffer buffer) {
ReadWriteIOUtils.write(startTime, buffer);
ReadWriteIOUtils.write(endTime, buffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
index 72f275653a..ae6923c08a 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
@@ -27,7 +27,9 @@ import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -42,6 +44,7 @@ import org.slf4j.LoggerFactory;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
@@ -205,6 +208,33 @@ public class SchemaUtils {
}
}
+ /**
+ * judge whether the order of aggregation calculation is consistent with the order of traversing
+ * data
+ */
+ public static boolean isConsistentWithScanOrder(
+ AggregationType aggregationFunction, OrderBy scanOrder) {
+ boolean ascending = scanOrder == OrderBy.TIMESTAMP_ASC;
+ switch (aggregationFunction) {
+ case MIN_TIME:
+ case FIRST_VALUE:
+ return ascending;
+ case MAX_TIME:
+ case LAST_VALUE:
+ return !ascending;
+ case SUM:
+ case MIN_VALUE:
+ case MAX_VALUE:
+ case EXTREME:
+ case COUNT:
+ case AVG:
+ return true;
+ default:
+ throw new IllegalArgumentException(
+ String.format("Invalid Aggregation function: %s", aggregationFunction));
+ }
+ }
+
/**
* If e or one of its recursive causes is a PathNotExistException or StorageGroupNotSetException,
* return such an exception or null if it cannot be found.
@@ -233,4 +263,26 @@ public class SchemaUtils {
String.format("encoding %s does not support %s", encoding, dataType), true);
}
}
+
+ public static List<AggregationType> splitPartialAggregation(AggregationType aggregationType) {
+ switch (aggregationType) {
+ case FIRST_VALUE:
+ return Collections.singletonList(AggregationType.MIN_TIME);
+ case LAST_VALUE:
+ return Collections.singletonList(AggregationType.MAX_TIME);
+ case AVG:
+ return Arrays.asList(AggregationType.COUNT, AggregationType.SUM);
+ case SUM:
+ case MIN_VALUE:
+ case MAX_VALUE:
+ case EXTREME:
+ case COUNT:
+ case MIN_TIME:
+ case MAX_TIME:
+ return Collections.emptyList();
+ default:
+ throw new IllegalArgumentException(
+ String.format("Invalid Aggregation function: %s", aggregationType));
+ }
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
index 83ec2ff10b..e553a72666 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
@@ -55,7 +55,6 @@ import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import com.google.common.collect.Sets;
import org.junit.Test;
import java.util.ArrayList;
@@ -75,7 +74,6 @@ public class DistributionPlannerTest {
new SeriesScanNode(
queryId.genPlanNodeId(),
new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
- Sets.newHashSet("s1", "s2"),
OrderBy.TIMESTAMP_ASC);
Analysis analysis = constructAnalysis();
@@ -96,7 +94,6 @@ public class DistributionPlannerTest {
new SeriesScanNode(
queryId.genPlanNodeId(),
new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
- Sets.newHashSet("s1", "s2"),
OrderBy.TIMESTAMP_ASC);
Analysis analysis = constructAnalysis();
@@ -119,19 +116,16 @@ public class DistributionPlannerTest {
new SeriesScanNode(
queryId.genPlanNodeId(),
new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
- Sets.newHashSet("s1", "s2"),
OrderBy.TIMESTAMP_ASC));
timeJoinNode.addChild(
new SeriesScanNode(
queryId.genPlanNodeId(),
new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
- Sets.newHashSet("s1", "s2"),
OrderBy.TIMESTAMP_ASC));
timeJoinNode.addChild(
new SeriesScanNode(
queryId.genPlanNodeId(),
new MeasurementPath("root.sg.d22.s1", TSDataType.INT32),
- Sets.newHashSet("s1"),
OrderBy.TIMESTAMP_ASC));
LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
@@ -200,19 +194,16 @@ public class DistributionPlannerTest {
new SeriesScanNode(
queryId.genPlanNodeId(),
new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
- Sets.newHashSet("s1", "s2"),
OrderBy.TIMESTAMP_ASC));
timeJoinNode.addChild(
new SeriesScanNode(
queryId.genPlanNodeId(),
new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
- Sets.newHashSet("s1", "s2"),
OrderBy.TIMESTAMP_ASC));
timeJoinNode.addChild(
new SeriesScanNode(
queryId.genPlanNodeId(),
new MeasurementPath("root.sg.d22.s1", TSDataType.INT32),
- Sets.newHashSet("s1"),
OrderBy.TIMESTAMP_ASC));
LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
@@ -240,19 +231,16 @@ public class DistributionPlannerTest {
new SeriesScanNode(
queryId.genPlanNodeId(),
new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
- Sets.newHashSet("s1", "s2"),
OrderBy.TIMESTAMP_ASC));
timeJoinNode.addChild(
new SeriesScanNode(
queryId.genPlanNodeId(),
new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
- Sets.newHashSet("s1", "s2"),
OrderBy.TIMESTAMP_ASC));
timeJoinNode.addChild(
new SeriesScanNode(
queryId.genPlanNodeId(),
new MeasurementPath("root.sg.d22.s1", TSDataType.INT32),
- Sets.newHashSet("s1"),
OrderBy.TIMESTAMP_ASC));
LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
@@ -278,19 +266,16 @@ public class DistributionPlannerTest {
new SeriesScanNode(
queryId.genPlanNodeId(),
new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
- Sets.newHashSet("s1", "s2"),
OrderBy.TIMESTAMP_ASC));
timeJoinNode.addChild(
new SeriesScanNode(
queryId.genPlanNodeId(),
new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
- Sets.newHashSet("s1", "s2"),
OrderBy.TIMESTAMP_ASC));
timeJoinNode.addChild(
new SeriesScanNode(
queryId.genPlanNodeId(),
new MeasurementPath("root.sg.d333.s1", TSDataType.INT32),
- Sets.newHashSet("s1"),
OrderBy.TIMESTAMP_ASC));
LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
index 0f16730f2e..53950b092a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
@@ -41,7 +41,6 @@ import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Sets;
import org.junit.Test;
import java.nio.ByteBuffer;
@@ -124,28 +123,19 @@ public class FragmentInstanceSerdeTest {
TimeJoinNode timeJoinNode =
new TimeJoinNode(new PlanNodeId("TimeJoinNode"), OrderBy.TIMESTAMP_DESC);
SeriesScanNode seriesScanNode1 =
- new SeriesScanNode(
- new PlanNodeId("SeriesScanNode1"),
- new MeasurementPath("root.sg.d1.s2"),
- Sets.newHashSet("s2"));
+ new SeriesScanNode(new PlanNodeId("SeriesScanNode1"), new MeasurementPath("root.sg.d1.s2"));
seriesScanNode1.setRegionReplicaSet(
new TRegionReplicaSet(
new TConsensusGroupId(TConsensusGroupType.DataRegion, 1), new ArrayList<>()));
seriesScanNode1.setScanOrder(OrderBy.TIMESTAMP_DESC);
SeriesScanNode seriesScanNode2 =
- new SeriesScanNode(
- new PlanNodeId("SeriesScanNode2"),
- new MeasurementPath("root.sg.d2.s1"),
- Sets.newHashSet("s1", "s2"));
+ new SeriesScanNode(new PlanNodeId("SeriesScanNode2"), new MeasurementPath("root.sg.d2.s1"));
seriesScanNode2.setRegionReplicaSet(
new TRegionReplicaSet(
new TConsensusGroupId(TConsensusGroupType.DataRegion, 2), new ArrayList<>()));
seriesScanNode2.setScanOrder(OrderBy.TIMESTAMP_DESC);
SeriesScanNode seriesScanNode3 =
- new SeriesScanNode(
- new PlanNodeId("SeriesScanNode3"),
- new MeasurementPath("root.sg.d2.s2"),
- Sets.newHashSet("s1", "s2"));
+ new SeriesScanNode(new PlanNodeId("SeriesScanNode3"), new MeasurementPath("root.sg.d2.s2"));
seriesScanNode3.setRegionReplicaSet(
new TRegionReplicaSet(
new TConsensusGroupId(TConsensusGroupType.DataRegion, 3), new ArrayList<>()));
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
index c500cd278e..6248ed7791 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
@@ -20,7 +20,10 @@
package org.apache.iotdb.db.mpp.plan.plan;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.common.header.HeaderConstant;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
@@ -30,14 +33,16 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNullNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ProjectNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.statement.component.FilterNullPolicy;
import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
+import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.expression.binary.GreaterThanExpression;
@@ -45,17 +50,17 @@ import org.apache.iotdb.db.query.expression.binary.LogicAndExpression;
import org.apache.iotdb.db.query.expression.leaf.ConstantOperand;
import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.query.expression.leaf.TimestampOperand;
+import org.apache.iotdb.db.query.expression.multi.FunctionExpression;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.commons.compress.utils.Sets;
-
-import java.time.ZoneId;
+import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -68,46 +73,69 @@ public class QueryLogicalPlanUtil {
// key: query statement; value: expected logical plan
public static final Map<String, PlanNode> sqlToPlanMap = new HashMap<>();
- public static final Map<String, MeasurementPath> schemaMap = new HashMap<>();
+ public static final Map<String, PartialPath> schemaMap = new HashMap<>();
static {
try {
schemaMap.put("root.sg.d1.s1", new MeasurementPath("root.sg.d1.s1", TSDataType.INT32));
- schemaMap.put("root.sg.d1.s2", new MeasurementPath("root.sg.d1.s2", TSDataType.INT32));
+ schemaMap.put("root.sg.d1.s2", new MeasurementPath("root.sg.d1.s2", TSDataType.DOUBLE));
+ schemaMap.put("root.sg.d1.s3", new MeasurementPath("root.sg.d1.s3", TSDataType.BOOLEAN));
schemaMap.put("root.sg.d2.s1", new MeasurementPath("root.sg.d2.s1", TSDataType.INT32));
- schemaMap.put("root.sg.d2.s2", new MeasurementPath("root.sg.d2.s2", TSDataType.INT32));
+ schemaMap.put("root.sg.d2.s2", new MeasurementPath("root.sg.d2.s2", TSDataType.DOUBLE));
+ schemaMap.put("root.sg.d2.s4", new MeasurementPath("root.sg.d2.s4", TSDataType.TEXT));
+
+ MeasurementPath aS1 = new MeasurementPath("root.sg.d2.a.s1", TSDataType.INT32);
+ MeasurementPath aS2 = new MeasurementPath("root.sg.d2.a.s2", TSDataType.DOUBLE);
+ AlignedPath alignedPath =
+ new AlignedPath(
+ "root.sg.d2.a",
+ Arrays.asList("s1", "s2"),
+ Arrays.asList(aS1.getMeasurementSchema(), aS2.getMeasurementSchema()));
+ aS1.setUnderAlignedEntity(true);
+ aS2.setUnderAlignedEntity(true);
+ schemaMap.put("root.sg.d2.a.s1", aS1);
+ schemaMap.put("root.sg.d2.a.s2", aS2);
+ schemaMap.put("root.sg.d2.a", alignedPath);
} catch (IllegalPathException e) {
e.printStackTrace();
}
}
- /* 0. Simple Query */
+ /* Simple Query */
static {
- String sql = "SELECT * FROM root.sg.d1 LIMIT 10 OFFSET 10";
+ String sql = "SELECT ** FROM root.sg.d2 LIMIT 10 OFFSET 10";
List<PlanNode> sourceNodeList = new ArrayList<>();
sourceNodeList.add(
new SeriesScanNode(
new PlanNodeId("0"),
- schemaMap.get("root.sg.d1.s1"),
- Sets.newHashSet("s1", "s2"),
+ (MeasurementPath) schemaMap.get("root.sg.d2.s1"),
OrderBy.TIMESTAMP_ASC));
sourceNodeList.add(
new SeriesScanNode(
new PlanNodeId("1"),
- schemaMap.get("root.sg.d1.s2"),
- Sets.newHashSet("s1", "s2"),
+ (MeasurementPath) schemaMap.get("root.sg.d2.s2"),
+ OrderBy.TIMESTAMP_ASC));
+ sourceNodeList.add(
+ new SeriesScanNode(
+ new PlanNodeId("2"),
+ (MeasurementPath) schemaMap.get("root.sg.d2.s4"),
+ OrderBy.TIMESTAMP_ASC));
+ sourceNodeList.add(
+ new AlignedSeriesScanNode(
+ new PlanNodeId("3"),
+ (AlignedPath) schemaMap.get("root.sg.d2.a"),
OrderBy.TIMESTAMP_ASC));
TimeJoinNode timeJoinNode =
- new TimeJoinNode(new PlanNodeId("2"), OrderBy.TIMESTAMP_ASC, sourceNodeList);
- OffsetNode offsetNode = new OffsetNode(new PlanNodeId("3"), timeJoinNode, 10);
- LimitNode limitNode = new LimitNode(new PlanNodeId("4"), offsetNode, 10);
+ new TimeJoinNode(new PlanNodeId("4"), OrderBy.TIMESTAMP_ASC, sourceNodeList);
+ OffsetNode offsetNode = new OffsetNode(new PlanNodeId("5"), timeJoinNode, 10);
+ LimitNode limitNode = new LimitNode(new PlanNodeId("6"), offsetNode, 10);
querySQLs.add(sql);
sqlToPlanMap.put(sql, limitNode);
}
- /* 1. Raw Data Query */
+ /* Raw Data Query */
static {
String sql =
"SELECT s1 FROM root.sg.* WHERE time > 100 and s2 > 10 "
@@ -116,25 +144,24 @@ public class QueryLogicalPlanUtil {
List<PlanNode> sourceNodeList = new ArrayList<>();
sourceNodeList.add(
new SeriesScanNode(
- new PlanNodeId("test_query_0"),
- schemaMap.get("root.sg.d1.s2"),
- Sets.newHashSet("s2"),
+ new PlanNodeId("0"),
+ (MeasurementPath) schemaMap.get("root.sg.d1.s2"),
OrderBy.TIMESTAMP_DESC));
sourceNodeList.add(
new SeriesScanNode(
- new PlanNodeId("test_query_1"),
- schemaMap.get("root.sg.d2.s1"),
- Sets.newHashSet("s1", "s2"),
+ new PlanNodeId("1"),
+ (MeasurementPath) schemaMap.get("root.sg.d2.s1"),
OrderBy.TIMESTAMP_DESC));
sourceNodeList.add(
new SeriesScanNode(
- new PlanNodeId("test_query_2"),
- schemaMap.get("root.sg.d2.s2"),
- Sets.newHashSet("s1", "s2"),
+ new PlanNodeId("2"),
+ (MeasurementPath) schemaMap.get("root.sg.d2.s2"),
OrderBy.TIMESTAMP_DESC));
+ sourceNodeList.forEach(
+ planNode -> ((SeriesScanNode) planNode).setTimeFilter(TimeFilter.gt(100)));
TimeJoinNode timeJoinNode =
- new TimeJoinNode(new PlanNodeId("test_query_3"), OrderBy.TIMESTAMP_DESC, sourceNodeList);
+ new TimeJoinNode(new PlanNodeId("3"), OrderBy.TIMESTAMP_DESC, sourceNodeList);
GreaterThanExpression timeFilter =
new GreaterThanExpression(
@@ -145,155 +172,282 @@ public class QueryLogicalPlanUtil {
new ConstantOperand(TSDataType.INT32, "10"));
GreaterThanExpression valueFilter2 =
new GreaterThanExpression(
- new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")),
new ConstantOperand(TSDataType.INT32, "10"));
- LogicAndExpression expression =
- new LogicAndExpression(
- new LogicAndExpression(timeFilter, valueFilter1),
- new LogicAndExpression(timeFilter, valueFilter2));
+ LogicAndExpression predicate =
+ new LogicAndExpression(timeFilter, new LogicAndExpression(valueFilter1, valueFilter2));
FilterNode filterNode =
new FilterNode(
- new PlanNodeId("test_query_4"),
+ new PlanNodeId("4"),
timeJoinNode,
- new Expression[] {
- new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")),
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")),
- },
- expression,
+ new Expression[] {new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))},
+ predicate,
false,
- ZoneId.systemDefault());
-
- ProjectNode projectNode =
- new ProjectNode(
- new PlanNodeId("test_query_4"), filterNode, Collections.singletonList("root.sg.d2.s1"));
+ ZonedDateTime.now().getOffset());
FilterNullNode filterNullNode =
new FilterNullNode(
- new PlanNodeId("test_query_5"),
- projectNode,
+ new PlanNodeId("5"),
+ filterNode,
FilterNullPolicy.CONTAINS_NULL,
- new ArrayList<>());
+ Collections.singletonList(new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))));
- OffsetNode offsetNode = new OffsetNode(new PlanNodeId("test_query_6"), filterNullNode, 100);
- LimitNode limitNode = new LimitNode(new PlanNodeId("test_query_7"), offsetNode, 100);
+ OffsetNode offsetNode = new OffsetNode(new PlanNodeId("6"), filterNullNode, 100);
+ LimitNode limitNode = new LimitNode(new PlanNodeId("7"), offsetNode, 100);
- // querySQLs.add(sql);
- // sqlToPlanMap.put(sql, limitNode);
+ querySQLs.add(sql);
+ sqlToPlanMap.put(sql, limitNode);
}
- /* 2. Raw Data Query (align by device) */
+ /* Raw Data Query (align by device) */
static {
String sql =
"SELECT * FROM root.sg.* WHERE time > 100 and s1 > 10 "
- + "ORDER BY TIME DESC WITHOUT NULL ANY LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
+ + "ORDER BY TIME DESC LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
List<PlanNode> sourceNodeList1 = new ArrayList<>();
- List<PlanNode> sourceNodeList2 = new ArrayList<>();
sourceNodeList1.add(
new SeriesScanNode(
- new PlanNodeId("test_query_0"),
- schemaMap.get("root.sg.d1.s1"),
- Sets.newHashSet("s1", "s2"),
+ new PlanNodeId("0"),
+ (MeasurementPath) schemaMap.get("root.sg.d1.s3"),
OrderBy.TIMESTAMP_DESC));
sourceNodeList1.add(
new SeriesScanNode(
- new PlanNodeId("test_query_1"),
- schemaMap.get("root.sg.d1.s2"),
- Sets.newHashSet("s1", "s2"),
- OrderBy.TIMESTAMP_DESC));
- sourceNodeList2.add(
- new SeriesScanNode(
- new PlanNodeId("test_query_2"),
- schemaMap.get("root.sg.d2.s1"),
- Sets.newHashSet("s1", "s2"),
+ new PlanNodeId("1"),
+ (MeasurementPath) schemaMap.get("root.sg.d1.s1"),
OrderBy.TIMESTAMP_DESC));
- sourceNodeList2.add(
+ sourceNodeList1.add(
new SeriesScanNode(
- new PlanNodeId("test_query_3"),
- schemaMap.get("root.sg.d2.s2"),
- Sets.newHashSet("s1", "s2"),
+ new PlanNodeId("2"),
+ (MeasurementPath) schemaMap.get("root.sg.d1.s2"),
OrderBy.TIMESTAMP_DESC));
+ sourceNodeList1.forEach(
+ planNode -> ((SeriesScanNode) planNode).setTimeFilter(TimeFilter.gt(100)));
TimeJoinNode timeJoinNode1 =
- new TimeJoinNode(new PlanNodeId("test_query_4"), OrderBy.TIMESTAMP_DESC, sourceNodeList1);
- TimeJoinNode timeJoinNode2 =
- new TimeJoinNode(new PlanNodeId("test_query_5"), OrderBy.TIMESTAMP_DESC, sourceNodeList2);
+ new TimeJoinNode(new PlanNodeId("3"), OrderBy.TIMESTAMP_DESC, sourceNodeList1);
GreaterThanExpression timeFilter =
new GreaterThanExpression(
new TimestampOperand(), new ConstantOperand(TSDataType.INT64, "100"));
GreaterThanExpression valueFilter1 =
new GreaterThanExpression(
- new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
- new ConstantOperand(TSDataType.INT32, "10"));
- GreaterThanExpression valueFilter2 =
- new GreaterThanExpression(
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")),
- new ConstantOperand(TSDataType.INT32, "10"));
- LogicAndExpression expression =
- new LogicAndExpression(
- new LogicAndExpression(timeFilter, valueFilter1),
- new LogicAndExpression(timeFilter, valueFilter2));
+ new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")),
+ new ConstantOperand(TSDataType.INT64, "10"));
+ LogicAndExpression predicate1 = new LogicAndExpression(timeFilter, valueFilter1);
FilterNode filterNode1 =
new FilterNode(
- new PlanNodeId("test_query_6"),
+ new PlanNodeId("4"),
timeJoinNode1,
new Expression[] {
+ new TimeSeriesOperand(schemaMap.get("root.sg.d1.s3")),
new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")),
- new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
+ new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2"))
},
- expression,
+ predicate1,
false,
- ZoneId.systemDefault());
+ ZonedDateTime.now().getOffset());
+
+ List<PlanNode> sourceNodeList2 = new ArrayList<>();
+ sourceNodeList2.add(
+ new SeriesScanNode(
+ new PlanNodeId("5"),
+ (MeasurementPath) schemaMap.get("root.sg.d2.s4"),
+ OrderBy.TIMESTAMP_DESC));
+ sourceNodeList2.add(
+ new SeriesScanNode(
+ new PlanNodeId("6"),
+ (MeasurementPath) schemaMap.get("root.sg.d2.s1"),
+ OrderBy.TIMESTAMP_DESC));
+ sourceNodeList2.add(
+ new SeriesScanNode(
+ new PlanNodeId("7"),
+ (MeasurementPath) schemaMap.get("root.sg.d2.s2"),
+ OrderBy.TIMESTAMP_DESC));
+ sourceNodeList2.forEach(
+ planNode -> ((SeriesScanNode) planNode).setTimeFilter(TimeFilter.gt(100)));
+
+ TimeJoinNode timeJoinNode2 =
+ new TimeJoinNode(new PlanNodeId("8"), OrderBy.TIMESTAMP_DESC, sourceNodeList2);
+
+ GreaterThanExpression valueFilter2 =
+ new GreaterThanExpression(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")),
+ new ConstantOperand(TSDataType.INT32, "10"));
+ LogicAndExpression predicate2 = new LogicAndExpression(timeFilter, valueFilter2);
+
FilterNode filterNode2 =
new FilterNode(
- new PlanNodeId("test_query_7"),
+ new PlanNodeId("9"),
timeJoinNode2,
new Expression[] {
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.s4")),
new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")),
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")),
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))
},
- expression,
+ predicate2,
false,
- ZoneId.systemDefault());
+ ZonedDateTime.now().getOffset());
+ Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
+ deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 3, 4));
+ deviceToMeasurementIndexesMap.put("root.sg.d2", Arrays.asList(2, 3, 4));
DeviceViewNode deviceViewNode =
new DeviceViewNode(
- new PlanNodeId("test_query_8"),
+ new PlanNodeId("10"),
Arrays.asList(OrderBy.DEVICE_ASC, OrderBy.TIMESTAMP_DESC),
- Arrays.asList("s1", "s2"));
+ Arrays.asList(HeaderConstant.COLUMN_DEVICE, "s3", "s4", "s1", "s2"),
+ deviceToMeasurementIndexesMap);
deviceViewNode.addChildDeviceNode("root.sg.d1", filterNode1);
deviceViewNode.addChildDeviceNode("root.sg.d2", filterNode2);
- FilterNullNode filterNullNode =
- new FilterNullNode(
- new PlanNodeId("test_query_9"),
- deviceViewNode,
- FilterNullPolicy.CONTAINS_NULL,
- new ArrayList<>());
+ OffsetNode offsetNode = new OffsetNode(new PlanNodeId("11"), deviceViewNode, 100);
+ LimitNode limitNode = new LimitNode(new PlanNodeId("12"), offsetNode, 100);
+
+ querySQLs.add(sql);
+ sqlToPlanMap.put(sql, limitNode);
+ }
+
+ /* Simple Aggregation Query */
+ static {
+ String sql =
+ "SELECT last_value(s1), first_value(s1), sum(s2) FROM root.sg.** WHERE time > 100 LIMIT 10 OFFSET 10";
+
+ List<PlanNode> sourceNodeList = new ArrayList<>();
+ Filter timeFilter = TimeFilter.gt(100);
+ sourceNodeList.add(
+ new SeriesAggregationScanNode(
+ new PlanNodeId("0"),
+ (MeasurementPath) schemaMap.get("root.sg.d1.s1"),
+ Collections.singletonList(
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE,
+ AggregationStep.SINGLE,
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))))),
+ OrderBy.TIMESTAMP_ASC,
+ null));
+ sourceNodeList.add(
+ new SeriesAggregationScanNode(
+ new PlanNodeId("1"),
+ (MeasurementPath) schemaMap.get("root.sg.d1.s2"),
+ Collections.singletonList(
+ new AggregationDescriptor(
+ AggregationType.SUM,
+ AggregationStep.SINGLE,
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2"))))),
+ OrderBy.TIMESTAMP_ASC,
+ null));
+ sourceNodeList.add(
+ new SeriesAggregationScanNode(
+ new PlanNodeId("2"),
+ (MeasurementPath) schemaMap.get("root.sg.d1.s1"),
+ Collections.singletonList(
+ new AggregationDescriptor(
+ AggregationType.LAST_VALUE,
+ AggregationStep.SINGLE,
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))))),
+ OrderBy.TIMESTAMP_ASC,
+ null));
+ sourceNodeList.add(
+ new SeriesAggregationScanNode(
+ new PlanNodeId("3"),
+ (MeasurementPath) schemaMap.get("root.sg.d2.s1"),
+ Collections.singletonList(
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE,
+ AggregationStep.SINGLE,
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))))),
+ OrderBy.TIMESTAMP_ASC,
+ null));
+ sourceNodeList.add(
+ new SeriesAggregationScanNode(
+ new PlanNodeId("4"),
+ (MeasurementPath) schemaMap.get("root.sg.d2.s2"),
+ Collections.singletonList(
+ new AggregationDescriptor(
+ AggregationType.SUM,
+ AggregationStep.SINGLE,
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))))),
+ OrderBy.TIMESTAMP_ASC,
+ null));
+ sourceNodeList.add(
+ new SeriesAggregationScanNode(
+ new PlanNodeId("5"),
+ (MeasurementPath) schemaMap.get("root.sg.d2.s1"),
+ Collections.singletonList(
+ new AggregationDescriptor(
+ AggregationType.LAST_VALUE,
+ AggregationStep.SINGLE,
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))))),
+ OrderBy.TIMESTAMP_ASC,
+ null));
+ sourceNodeList.add(
+ new AlignedSeriesAggregationScanNode(
+ new PlanNodeId("6"),
+ (AlignedPath) schemaMap.get("root.sg.d2.a"),
+ Arrays.asList(
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE,
+ AggregationStep.SINGLE,
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s1")))),
+ new AggregationDescriptor(
+ AggregationType.SUM,
+ AggregationStep.SINGLE,
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s2"))))),
+ OrderBy.TIMESTAMP_ASC,
+ null));
+ sourceNodeList.add(
+ new AlignedSeriesAggregationScanNode(
+ new PlanNodeId("7"),
+ new AlignedPath((MeasurementPath) schemaMap.get("root.sg.d2.a.s1")),
+ Collections.singletonList(
+ new AggregationDescriptor(
+ AggregationType.LAST_VALUE,
+ AggregationStep.SINGLE,
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s1"))))),
+ OrderBy.TIMESTAMP_ASC,
+ null));
+ sourceNodeList.forEach(
+ node -> {
+ if (node instanceof SeriesAggregationScanNode) {
+ ((SeriesAggregationScanNode) node).setTimeFilter(timeFilter);
+ } else {
+ ((AlignedSeriesAggregationScanNode) node).setTimeFilter(timeFilter);
+ }
+ });
- OffsetNode offsetNode = new OffsetNode(new PlanNodeId("test_query_10"), filterNullNode, 100);
- LimitNode limitNode = new LimitNode(new PlanNodeId("test_query_11"), offsetNode, 100);
+ TimeJoinNode timeJoinNode =
+ new TimeJoinNode(new PlanNodeId("8"), OrderBy.TIMESTAMP_ASC, sourceNodeList);
+ OffsetNode offsetNode = new OffsetNode(new PlanNodeId("9"), timeJoinNode, 10);
+ LimitNode limitNode = new LimitNode(new PlanNodeId("10"), offsetNode, 10);
- // querySQLs.add(sql);
- // sqlToPlanMap.put(sql, limitNode);
+ querySQLs.add(sql);
+ sqlToPlanMap.put(sql, limitNode);
}
- /* 3. Aggregation Query (without value filter) */
+ /* Aggregation Query (without value filter) */
static {
String sql =
- "SELECT count(s1), max_value(s2), last_value(s1) FROM root.sg.* WHERE time > 100 "
- + "GROUP BY LEVEL = 1 ORDER BY TIME DESC WITHOUT NULL ANY LIMIT 100 OFFSET 100";
+ "SELECT count(s1), max_value(s2), last_value(s1) FROM root.sg.** WHERE time > 100 "
+ + "GROUP BY LEVEL = 1 ORDER BY TIME DESC LIMIT 100 OFFSET 100";
List<PlanNode> sourceNodeList = new ArrayList<>();
Filter timeFilter = TimeFilter.gt(100);
sourceNodeList.add(
new SeriesAggregationScanNode(
- new PlanNodeId("test_query_0"),
- schemaMap.get("root.sg.d1.s1"),
+ new PlanNodeId("0"),
+ (MeasurementPath) schemaMap.get("root.sg.d1.s1"),
Arrays.asList(
new AggregationDescriptor(
AggregationType.COUNT,
@@ -305,24 +459,24 @@ public class QueryLogicalPlanUtil {
AggregationStep.PARTIAL,
Collections.singletonList(
new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))))),
- Sets.newHashSet("s1", "s2"),
- OrderBy.TIMESTAMP_DESC));
+ OrderBy.TIMESTAMP_DESC,
+ null));
sourceNodeList.add(
new SeriesAggregationScanNode(
- new PlanNodeId("test_query_1"),
- schemaMap.get("root.sg.d1.s2"),
+ new PlanNodeId("1"),
+ (MeasurementPath) schemaMap.get("root.sg.d1.s2"),
Collections.singletonList(
new AggregationDescriptor(
AggregationType.MAX_VALUE,
AggregationStep.PARTIAL,
Collections.singletonList(
new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2"))))),
- Sets.newHashSet("s1", "s2"),
- OrderBy.TIMESTAMP_DESC));
+ OrderBy.TIMESTAMP_DESC,
+ null));
sourceNodeList.add(
new SeriesAggregationScanNode(
- new PlanNodeId("test_query_0"),
- schemaMap.get("root.sg.d2.s1"),
+ new PlanNodeId("2"),
+ (MeasurementPath) schemaMap.get("root.sg.d2.s1"),
Arrays.asList(
new AggregationDescriptor(
AggregationType.COUNT,
@@ -334,193 +488,269 @@ public class QueryLogicalPlanUtil {
AggregationStep.PARTIAL,
Collections.singletonList(
new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))))),
- Sets.newHashSet("s1", "s2"),
- OrderBy.TIMESTAMP_DESC));
+ OrderBy.TIMESTAMP_DESC,
+ null));
sourceNodeList.add(
new SeriesAggregationScanNode(
- new PlanNodeId("test_query_1"),
- schemaMap.get("root.sg.d2.s2"),
+ new PlanNodeId("3"),
+ (MeasurementPath) schemaMap.get("root.sg.d2.s2"),
Collections.singletonList(
new AggregationDescriptor(
AggregationType.MAX_VALUE,
AggregationStep.PARTIAL,
Collections.singletonList(
new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))))),
- Sets.newHashSet("s1", "s2"),
- OrderBy.TIMESTAMP_DESC));
- sourceNodeList.forEach(node -> ((SeriesAggregationScanNode) node).setTimeFilter(timeFilter));
+ OrderBy.TIMESTAMP_DESC,
+ null));
+ sourceNodeList.add(
+ new AlignedSeriesAggregationScanNode(
+ new PlanNodeId("4"),
+ (AlignedPath) schemaMap.get("root.sg.d2.a"),
+ Arrays.asList(
+ new AggregationDescriptor(
+ AggregationType.LAST_VALUE,
+ AggregationStep.PARTIAL,
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s1")))),
+ new AggregationDescriptor(
+ AggregationType.COUNT,
+ AggregationStep.PARTIAL,
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s1")))),
+ new AggregationDescriptor(
+ AggregationType.MAX_VALUE,
+ AggregationStep.PARTIAL,
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s2"))))),
+ OrderBy.TIMESTAMP_DESC,
+ null));
+ sourceNodeList.forEach(
+ node -> {
+ if (node instanceof SeriesAggregationScanNode) {
+ ((SeriesAggregationScanNode) node).setTimeFilter(timeFilter);
+ } else {
+ ((AlignedSeriesAggregationScanNode) node).setTimeFilter(timeFilter);
+ }
+ });
GroupByLevelNode groupByLevelNode =
new GroupByLevelNode(
- new PlanNodeId("test_query_5"),
+ new PlanNodeId("5"),
sourceNodeList,
Arrays.asList(
new AggregationDescriptor(
AggregationType.COUNT,
AggregationStep.FINAL,
Arrays.asList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")),
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")))),
+ new FunctionExpression(
+ SQLConstant.COUNT,
+ new LinkedHashMap<>(),
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")))),
+ new FunctionExpression(
+ SQLConstant.COUNT,
+ new LinkedHashMap<>(),
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")))))),
+ new AggregationDescriptor(
+ AggregationType.COUNT,
+ AggregationStep.FINAL,
+ Collections.singletonList(
+ new FunctionExpression(
+ SQLConstant.COUNT,
+ new LinkedHashMap<>(),
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s1")))))),
new AggregationDescriptor(
AggregationType.MAX_VALUE,
AggregationStep.FINAL,
Arrays.asList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")))),
+ new FunctionExpression(
+ SQLConstant.MAX_VALUE,
+ new LinkedHashMap<>(),
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")))),
+ new FunctionExpression(
+ SQLConstant.MAX_VALUE,
+ new LinkedHashMap<>(),
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")))))),
+ new AggregationDescriptor(
+ AggregationType.MAX_VALUE,
+ AggregationStep.FINAL,
+ Collections.singletonList(
+ new FunctionExpression(
+ SQLConstant.MAX_VALUE,
+ new LinkedHashMap<>(),
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s2")))))),
new AggregationDescriptor(
AggregationType.LAST_VALUE,
AggregationStep.FINAL,
Arrays.asList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")),
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))))),
+ new FunctionExpression(
+ SQLConstant.LAST_VALUE,
+ new LinkedHashMap<>(),
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")))),
+ new FunctionExpression(
+ SQLConstant.LAST_VALUE,
+ new LinkedHashMap<>(),
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")))))),
+ new AggregationDescriptor(
+ AggregationType.LAST_VALUE,
+ AggregationStep.FINAL,
+ Collections.singletonList(
+ new FunctionExpression(
+ SQLConstant.LAST_VALUE,
+ new LinkedHashMap<>(),
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s1"))))))),
Arrays.asList(
- "count(root.sg.*.s1)", "max_value(root.sg.*.s2)", "last_value(root.sg.*.s1)"));
+ "count(root.sg.*.s1)",
+ "count(root.sg.*.*.s1)",
+ "max_value(root.sg.*.s2)",
+ "max_value(root.sg.*.*.s2)",
+ "last_value(root.sg.*.s1)",
+ "last_value(root.sg.*.*.s1)"));
- FilterNullNode filterNullNode =
- new FilterNullNode(
- new PlanNodeId("test_query_6"),
- groupByLevelNode,
- FilterNullPolicy.CONTAINS_NULL,
- new ArrayList<>());
+ OffsetNode offsetNode = new OffsetNode(new PlanNodeId("6"), groupByLevelNode, 100);
+ LimitNode limitNode = new LimitNode(new PlanNodeId("7"), offsetNode, 100);
- OffsetNode offsetNode = new OffsetNode(new PlanNodeId("test_query_7"), filterNullNode, 100);
- LimitNode limitNode = new LimitNode(new PlanNodeId("test_query_8"), offsetNode, 100);
-
- // querySQLs.add(sql);
- // sqlToPlanMap.put(sql, limitNode);
+ querySQLs.add(sql);
+ sqlToPlanMap.put(sql, limitNode);
}
- /* 4. Aggregation Query (without value filter and align by device) */
+ /* Aggregation Query (without value filter and align by device) */
static {
String sql =
"SELECT count(s1), max_value(s2), last_value(s1) FROM root.sg.* WHERE time > 100 "
- + "ORDER BY TIME DESC WITHOUT NULL ANY LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
+ + "ORDER BY TIME DESC LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
- List<PlanNode> sourceNodeList1 = new ArrayList<>();
- List<PlanNode> sourceNodeList2 = new ArrayList<>();
Filter timeFilter = TimeFilter.gt(100);
+ List<PlanNode> sourceNodeList1 = new ArrayList<>();
sourceNodeList1.add(
new SeriesAggregationScanNode(
- new PlanNodeId("test_query_0"),
- schemaMap.get("root.sg.d1.s1"),
+ new PlanNodeId("0"),
+ (MeasurementPath) schemaMap.get("root.sg.d1.s1"),
Arrays.asList(
new AggregationDescriptor(
AggregationType.COUNT,
- AggregationStep.PARTIAL,
+ AggregationStep.SINGLE,
Collections.singletonList(
new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")))),
new AggregationDescriptor(
AggregationType.LAST_VALUE,
- AggregationStep.PARTIAL,
+ AggregationStep.SINGLE,
Collections.singletonList(
new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))))),
- Sets.newHashSet("s1", "s2"),
- OrderBy.TIMESTAMP_DESC));
+ OrderBy.TIMESTAMP_DESC,
+ null));
sourceNodeList1.add(
new SeriesAggregationScanNode(
- new PlanNodeId("test_query_1"),
- schemaMap.get("root.sg.d1.s2"),
+ new PlanNodeId("1"),
+ (MeasurementPath) schemaMap.get("root.sg.d1.s2"),
Collections.singletonList(
new AggregationDescriptor(
AggregationType.MAX_VALUE,
- AggregationStep.PARTIAL,
+ AggregationStep.SINGLE,
Collections.singletonList(
new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2"))))),
- Sets.newHashSet("s1", "s2"),
- OrderBy.TIMESTAMP_DESC));
+ OrderBy.TIMESTAMP_DESC,
+ null));
+ sourceNodeList1.forEach(node -> ((SeriesAggregationScanNode) node).setTimeFilter(timeFilter));
+
+ TimeJoinNode timeJoinNode1 =
+ new TimeJoinNode(new PlanNodeId("2"), OrderBy.TIMESTAMP_DESC, sourceNodeList1);
+
+ List<PlanNode> sourceNodeList2 = new ArrayList<>();
sourceNodeList2.add(
new SeriesAggregationScanNode(
- new PlanNodeId("test_query_0"),
- schemaMap.get("root.sg.d2.s1"),
+ new PlanNodeId("3"),
+ (MeasurementPath) schemaMap.get("root.sg.d2.s1"),
Arrays.asList(
new AggregationDescriptor(
AggregationType.COUNT,
- AggregationStep.PARTIAL,
+ AggregationStep.SINGLE,
Collections.singletonList(
new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")))),
new AggregationDescriptor(
AggregationType.LAST_VALUE,
- AggregationStep.PARTIAL,
+ AggregationStep.SINGLE,
Collections.singletonList(
new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))))),
- Sets.newHashSet("s1", "s2"),
- OrderBy.TIMESTAMP_DESC));
+ OrderBy.TIMESTAMP_DESC,
+ null));
sourceNodeList2.add(
new SeriesAggregationScanNode(
- new PlanNodeId("test_query_1"),
- schemaMap.get("root.sg.d2.s2"),
+ new PlanNodeId("4"),
+ (MeasurementPath) schemaMap.get("root.sg.d2.s2"),
Collections.singletonList(
new AggregationDescriptor(
AggregationType.MAX_VALUE,
- AggregationStep.PARTIAL,
+ AggregationStep.SINGLE,
Collections.singletonList(
new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))))),
- Sets.newHashSet("s1", "s2"),
- OrderBy.TIMESTAMP_DESC));
- sourceNodeList1.forEach(node -> ((SeriesAggregationScanNode) node).setTimeFilter(timeFilter));
+ OrderBy.TIMESTAMP_DESC,
+ null));
sourceNodeList2.forEach(node -> ((SeriesAggregationScanNode) node).setTimeFilter(timeFilter));
- TimeJoinNode timeJoinNode1 =
- new TimeJoinNode(new PlanNodeId("test_query_4"), OrderBy.TIMESTAMP_DESC, sourceNodeList1);
TimeJoinNode timeJoinNode2 =
- new TimeJoinNode(new PlanNodeId("test_query_5"), OrderBy.TIMESTAMP_DESC, sourceNodeList2);
+ new TimeJoinNode(new PlanNodeId("5"), OrderBy.TIMESTAMP_DESC, sourceNodeList2);
+ Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
+ deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 2, 3));
+ deviceToMeasurementIndexesMap.put("root.sg.d2", Arrays.asList(1, 2, 3));
DeviceViewNode deviceViewNode =
new DeviceViewNode(
- new PlanNodeId("test_query_6"),
+ new PlanNodeId("6"),
Arrays.asList(OrderBy.DEVICE_ASC, OrderBy.TIMESTAMP_DESC),
- Arrays.asList("count(s1)", "max_value(s2)", "last_value(s1)"));
+ Arrays.asList(
+ HeaderConstant.COLUMN_DEVICE, "count(s1)", "max_value(s2)", "last_value(s1)"),
+ deviceToMeasurementIndexesMap);
deviceViewNode.addChildDeviceNode("root.sg.d1", timeJoinNode1);
deviceViewNode.addChildDeviceNode("root.sg.d2", timeJoinNode2);
- FilterNullNode filterNullNode =
- new FilterNullNode(
- new PlanNodeId("test_query_7"),
- deviceViewNode,
- FilterNullPolicy.CONTAINS_NULL,
- new ArrayList<>());
-
- OffsetNode offsetNode = new OffsetNode(new PlanNodeId("test_query_8"), filterNullNode, 100);
- LimitNode limitNode = new LimitNode(new PlanNodeId("test_query_9"), offsetNode, 100);
+ OffsetNode offsetNode = new OffsetNode(new PlanNodeId("7"), deviceViewNode, 100);
+ LimitNode limitNode = new LimitNode(new PlanNodeId("8"), offsetNode, 100);
- // querySQLs.add(sql);
- // sqlToPlanMap.put(sql, limitNode);
+ querySQLs.add(sql);
+ sqlToPlanMap.put(sql, limitNode);
}
- /* 5. Aggregation Query (with value filter) */
+ /* Aggregation Query (with value filter) */
static {
String sql =
"SELECT count(s1), max_value(s2), last_value(s1) FROM root.sg.* WHERE time > 100 and s2 > 10 "
- + "GROUP BY LEVEL = 1 ORDER BY TIME DESC WITHOUT NULL ANY LIMIT 100 OFFSET 100";
+ + "GROUP BY LEVEL = 1 ORDER BY TIME DESC LIMIT 100 OFFSET 100";
List<PlanNode> sourceNodeList = new ArrayList<>();
sourceNodeList.add(
new SeriesScanNode(
- new PlanNodeId("test_query_0"),
- schemaMap.get("root.sg.d1.s1"),
- Sets.newHashSet("s1", "s2"),
+ new PlanNodeId("0"),
+ (MeasurementPath) schemaMap.get("root.sg.d1.s1"),
OrderBy.TIMESTAMP_DESC));
sourceNodeList.add(
new SeriesScanNode(
- new PlanNodeId("test_query_1"),
- schemaMap.get("root.sg.d1.s2"),
- Sets.newHashSet("s1", "s2"),
+ new PlanNodeId("1"),
+ (MeasurementPath) schemaMap.get("root.sg.d1.s2"),
OrderBy.TIMESTAMP_DESC));
sourceNodeList.add(
new SeriesScanNode(
- new PlanNodeId("test_query_2"),
- schemaMap.get("root.sg.d2.s1"),
- Sets.newHashSet("s1", "s2"),
+ new PlanNodeId("2"),
+ (MeasurementPath) schemaMap.get("root.sg.d2.s1"),
OrderBy.TIMESTAMP_DESC));
sourceNodeList.add(
new SeriesScanNode(
- new PlanNodeId("test_query_3"),
- schemaMap.get("root.sg.d2.s2"),
- Sets.newHashSet("s1", "s2"),
+ new PlanNodeId("3"),
+ (MeasurementPath) schemaMap.get("root.sg.d2.s2"),
OrderBy.TIMESTAMP_DESC));
+ sourceNodeList.forEach(
+ planNode -> ((SeriesScanNode) planNode).setTimeFilter(TimeFilter.gt(100)));
TimeJoinNode timeJoinNode =
- new TimeJoinNode(new PlanNodeId("test_query_4"), OrderBy.TIMESTAMP_DESC, sourceNodeList);
+ new TimeJoinNode(new PlanNodeId("4"), OrderBy.TIMESTAMP_DESC, sourceNodeList);
GreaterThanExpression timeFilter =
new GreaterThanExpression(
@@ -533,27 +763,25 @@ public class QueryLogicalPlanUtil {
new GreaterThanExpression(
new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")),
new ConstantOperand(TSDataType.INT32, "10"));
- LogicAndExpression expression =
- new LogicAndExpression(
- new LogicAndExpression(timeFilter, valueFilter1),
- new LogicAndExpression(timeFilter, valueFilter2));
+ LogicAndExpression predicate =
+ new LogicAndExpression(timeFilter, new LogicAndExpression(valueFilter1, valueFilter2));
FilterNode filterNode =
new FilterNode(
- new PlanNodeId("test_query_5"),
+ new PlanNodeId("5"),
timeJoinNode,
new Expression[] {
- new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")),
- new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")),
+ new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")),
new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")),
+ new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
},
- expression,
+ predicate,
false,
- ZoneId.systemDefault());
+ ZonedDateTime.now().getOffset());
AggregationNode aggregationNode =
new AggregationNode(
- new PlanNodeId("test_query_6"),
+ new PlanNodeId("6"),
Collections.singletonList(filterNode),
Arrays.asList(
new AggregationDescriptor(
@@ -562,108 +790,112 @@ public class QueryLogicalPlanUtil {
Collections.singletonList(
new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")))),
new AggregationDescriptor(
- AggregationType.LAST_VALUE,
+ AggregationType.MAX_VALUE,
AggregationStep.PARTIAL,
Collections.singletonList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")))),
+ new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")))),
new AggregationDescriptor(
- AggregationType.MAX_VALUE,
+ AggregationType.LAST_VALUE,
AggregationStep.PARTIAL,
Collections.singletonList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")))),
+ new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")))),
new AggregationDescriptor(
AggregationType.COUNT,
AggregationStep.PARTIAL,
Collections.singletonList(
new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")))),
new AggregationDescriptor(
- AggregationType.LAST_VALUE,
+ AggregationType.MAX_VALUE,
AggregationStep.PARTIAL,
Collections.singletonList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")))),
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")))),
new AggregationDescriptor(
- AggregationType.MAX_VALUE,
+ AggregationType.LAST_VALUE,
AggregationStep.PARTIAL,
Collections.singletonList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))))));
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))))));
GroupByLevelNode groupByLevelNode =
new GroupByLevelNode(
- new PlanNodeId("test_query_7"),
+ new PlanNodeId("7"),
Collections.singletonList(aggregationNode),
Arrays.asList(
new AggregationDescriptor(
AggregationType.COUNT,
AggregationStep.FINAL,
Arrays.asList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")),
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")))),
+ new FunctionExpression(
+ SQLConstant.COUNT,
+ new LinkedHashMap<>(),
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")))),
+ new FunctionExpression(
+ SQLConstant.COUNT,
+ new LinkedHashMap<>(),
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")))))),
new AggregationDescriptor(
- AggregationType.LAST_VALUE,
+ AggregationType.MAX_VALUE,
AggregationStep.FINAL,
Arrays.asList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")),
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")))),
+ new FunctionExpression(
+ SQLConstant.MAX_VALUE,
+ new LinkedHashMap<>(),
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")))),
+ new FunctionExpression(
+ SQLConstant.MAX_VALUE,
+ new LinkedHashMap<>(),
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")))))),
new AggregationDescriptor(
- AggregationType.MAX_VALUE,
+ AggregationType.LAST_VALUE,
AggregationStep.FINAL,
Arrays.asList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))))),
+ new FunctionExpression(
+ SQLConstant.LAST_VALUE,
+ new LinkedHashMap<>(),
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")))),
+ new FunctionExpression(
+ SQLConstant.LAST_VALUE,
+ new LinkedHashMap<>(),
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))))))),
Arrays.asList(
"count(root.sg.*.s1)", "max_value(root.sg.*.s2)", "last_value(root.sg.*.s1)"));
- FilterNullNode filterNullNode =
- new FilterNullNode(
- new PlanNodeId("test_query_8"),
- groupByLevelNode,
- FilterNullPolicy.CONTAINS_NULL,
- new ArrayList<>());
+ OffsetNode offsetNode = new OffsetNode(new PlanNodeId("8"), groupByLevelNode, 100);
+ LimitNode limitNode = new LimitNode(new PlanNodeId("9"), offsetNode, 100);
- OffsetNode offsetNode = new OffsetNode(new PlanNodeId("test_query_9"), filterNullNode, 100);
- LimitNode limitNode = new LimitNode(new PlanNodeId("test_query_10"), offsetNode, 100);
-
- // querySQLs.add(sql);
- // sqlToPlanMap.put(sql, limitNode);
+ querySQLs.add(sql);
+ sqlToPlanMap.put(sql, limitNode);
}
- /* 6. Aggregation Query (with value filter and align by device) */
+ /* Aggregation Query (with value filter and align by device) */
static {
String sql =
"SELECT count(s1), max_value(s2), last_value(s1) FROM root.sg.* WHERE time > 100 and s2 > 10 "
- + "ORDER BY TIME DESC WITHOUT NULL ANY LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
+ + "ORDER BY TIME DESC LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
List<PlanNode> sourceNodeList1 = new ArrayList<>();
- List<PlanNode> sourceNodeList2 = new ArrayList<>();
sourceNodeList1.add(
new SeriesScanNode(
- new PlanNodeId("test_query_0"),
- schemaMap.get("root.sg.d1.s1"),
- Sets.newHashSet("s1", "s2"),
+ new PlanNodeId("0"),
+ (MeasurementPath) schemaMap.get("root.sg.d1.s1"),
OrderBy.TIMESTAMP_DESC));
sourceNodeList1.add(
new SeriesScanNode(
- new PlanNodeId("test_query_1"),
- schemaMap.get("root.sg.d1.s2"),
- Sets.newHashSet("s1", "s2"),
- OrderBy.TIMESTAMP_DESC));
- sourceNodeList2.add(
- new SeriesScanNode(
- new PlanNodeId("test_query_2"),
- schemaMap.get("root.sg.d2.s1"),
- Sets.newHashSet("s1", "s2"),
- OrderBy.TIMESTAMP_DESC));
- sourceNodeList2.add(
- new SeriesScanNode(
- new PlanNodeId("test_query_3"),
- schemaMap.get("root.sg.d2.s2"),
- Sets.newHashSet("s1", "s2"),
+ new PlanNodeId("1"),
+ (MeasurementPath) schemaMap.get("root.sg.d1.s2"),
OrderBy.TIMESTAMP_DESC));
+ sourceNodeList1.forEach(
+ planNode -> {
+ ((SeriesScanNode) planNode).setTimeFilter(TimeFilter.gt(100));
+ });
TimeJoinNode timeJoinNode1 =
- new TimeJoinNode(new PlanNodeId("test_query_4"), OrderBy.TIMESTAMP_DESC, sourceNodeList1);
- TimeJoinNode timeJoinNode2 =
- new TimeJoinNode(new PlanNodeId("test_query_5"), OrderBy.TIMESTAMP_DESC, sourceNodeList2);
+ new TimeJoinNode(new PlanNodeId("2"), OrderBy.TIMESTAMP_DESC, sourceNodeList1);
GreaterThanExpression timeFilter =
new GreaterThanExpression(
@@ -671,99 +903,117 @@ public class QueryLogicalPlanUtil {
GreaterThanExpression valueFilter1 =
new GreaterThanExpression(
new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
- new ConstantOperand(TSDataType.INT32, "10"));
- GreaterThanExpression valueFilter2 =
- new GreaterThanExpression(
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")),
- new ConstantOperand(TSDataType.INT32, "10"));
- LogicAndExpression expression =
- new LogicAndExpression(
- new LogicAndExpression(timeFilter, valueFilter1),
- new LogicAndExpression(timeFilter, valueFilter2));
+ new ConstantOperand(TSDataType.INT64, "10"));
+ LogicAndExpression predicate1 = new LogicAndExpression(timeFilter, valueFilter1);
FilterNode filterNode1 =
new FilterNode(
- new PlanNodeId("test_query_6"),
+ new PlanNodeId("3"),
timeJoinNode1,
new Expression[] {
new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")),
new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
},
- expression,
- false,
- ZoneId.systemDefault());
- FilterNode filterNode2 =
- new FilterNode(
- new PlanNodeId("test_query_7"),
- timeJoinNode2,
- new Expression[] {
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")),
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")),
- },
- expression,
+ predicate1,
false,
- ZoneId.systemDefault());
+ ZonedDateTime.now().getOffset());
AggregationNode aggregationNode1 =
new AggregationNode(
- new PlanNodeId("test_query_8"),
+ new PlanNodeId("4"),
Collections.singletonList(filterNode1),
Arrays.asList(
new AggregationDescriptor(
AggregationType.COUNT,
- AggregationStep.PARTIAL,
+ AggregationStep.FINAL,
Collections.singletonList(
new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")))),
new AggregationDescriptor(
- AggregationType.LAST_VALUE,
- AggregationStep.PARTIAL,
+ AggregationType.MAX_VALUE,
+ AggregationStep.FINAL,
Collections.singletonList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")))),
+ new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")))),
new AggregationDescriptor(
- AggregationType.MAX_VALUE,
- AggregationStep.PARTIAL,
+ AggregationType.LAST_VALUE,
+ AggregationStep.FINAL,
Collections.singletonList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2"))))));
+ new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))))));
+
+ List<PlanNode> sourceNodeList2 = new ArrayList<>();
+ sourceNodeList2.add(
+ new SeriesScanNode(
+ new PlanNodeId("5"),
+ (MeasurementPath) schemaMap.get("root.sg.d2.s1"),
+ OrderBy.TIMESTAMP_DESC));
+ sourceNodeList2.add(
+ new SeriesScanNode(
+ new PlanNodeId("6"),
+ (MeasurementPath) schemaMap.get("root.sg.d2.s2"),
+ OrderBy.TIMESTAMP_DESC));
+ sourceNodeList2.forEach(
+ planNode -> {
+ ((SeriesScanNode) planNode).setTimeFilter(TimeFilter.gt(100));
+ });
+
+ TimeJoinNode timeJoinNode2 =
+ new TimeJoinNode(new PlanNodeId("7"), OrderBy.TIMESTAMP_DESC, sourceNodeList2);
+
+ GreaterThanExpression valueFilter2 =
+ new GreaterThanExpression(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")),
+ new ConstantOperand(TSDataType.INT32, "10"));
+ LogicAndExpression predicate2 = new LogicAndExpression(timeFilter, valueFilter2);
+
+ FilterNode filterNode2 =
+ new FilterNode(
+ new PlanNodeId("8"),
+ timeJoinNode2,
+ new Expression[] {
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")),
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")),
+ },
+ predicate2,
+ false,
+ ZonedDateTime.now().getOffset());
+
AggregationNode aggregationNode2 =
new AggregationNode(
- new PlanNodeId("test_query_9"),
+ new PlanNodeId("9"),
Collections.singletonList(filterNode2),
Arrays.asList(
new AggregationDescriptor(
AggregationType.COUNT,
- AggregationStep.PARTIAL,
+ AggregationStep.FINAL,
Collections.singletonList(
new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")))),
new AggregationDescriptor(
- AggregationType.LAST_VALUE,
- AggregationStep.PARTIAL,
+ AggregationType.MAX_VALUE,
+ AggregationStep.FINAL,
Collections.singletonList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")))),
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")))),
new AggregationDescriptor(
- AggregationType.MAX_VALUE,
- AggregationStep.PARTIAL,
+ AggregationType.LAST_VALUE,
+ AggregationStep.FINAL,
Collections.singletonList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))))));
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))))));
+ Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
+ deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 2, 3));
+ deviceToMeasurementIndexesMap.put("root.sg.d2", Arrays.asList(1, 2, 3));
DeviceViewNode deviceViewNode =
new DeviceViewNode(
- new PlanNodeId("test_query_10"),
+ new PlanNodeId("10"),
Arrays.asList(OrderBy.DEVICE_ASC, OrderBy.TIMESTAMP_DESC),
- Arrays.asList("count(s1)", "max_value(s2)", "last_value(s1)"));
+ Arrays.asList(
+ HeaderConstant.COLUMN_DEVICE, "count(s1)", "max_value(s2)", "last_value(s1)"),
+ deviceToMeasurementIndexesMap);
deviceViewNode.addChildDeviceNode("root.sg.d1", aggregationNode1);
deviceViewNode.addChildDeviceNode("root.sg.d2", aggregationNode2);
- FilterNullNode filterNullNode =
- new FilterNullNode(
- new PlanNodeId("test_query_11"),
- deviceViewNode,
- FilterNullPolicy.CONTAINS_NULL,
- new ArrayList<>());
-
- OffsetNode offsetNode = new OffsetNode(new PlanNodeId("test_query_12"), filterNullNode, 100);
- LimitNode limitNode = new LimitNode(new PlanNodeId("test_query_13"), offsetNode, 100);
+ OffsetNode offsetNode = new OffsetNode(new PlanNodeId("11"), deviceViewNode, 100);
+ LimitNode limitNode = new LimitNode(new PlanNodeId("12"), offsetNode, 100);
- // querySQLs.add(sql);
- // sqlToPlanMap.put(sql, limitNode);
+ querySQLs.add(sql);
+ sqlToPlanMap.put(sql, limitNode);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/AggregationNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/AggregationNodeSerdeTest.java
index e93fcf1fb4..e1ead56759 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/AggregationNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/AggregationNodeSerdeTest.java
@@ -63,7 +63,6 @@ public class AggregationNodeSerdeTest {
AggregationStep.INTERMEDIATE,
Collections.singletonList(
new TimeSeriesOperand(new PartialPath("root.sg.d1.s1"))))),
- Sets.newHashSet("s1"),
OrderBy.TIMESTAMP_ASC,
new In<>(Sets.newHashSet("s1", "s2"), VALUE_FILTER, true),
groupByTimeParameter,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/DeviceViewNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/DeviceViewNodeSerdeTest.java
index d2fa325a0c..b8ed4919fa 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/DeviceViewNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/DeviceViewNodeSerdeTest.java
@@ -29,6 +29,7 @@ import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.HashMap;
import static org.junit.Assert.assertEquals;
@@ -43,7 +44,8 @@ public class DeviceViewNodeSerdeTest {
new DeviceViewNode(
new PlanNodeId("TestDeviceMergeNode"),
Arrays.asList(OrderBy.DEVICE_ASC, OrderBy.TIMESTAMP_ASC),
- Arrays.asList("s1", "s2"));
+ Arrays.asList("s1", "s2"),
+ new HashMap<>());
deviceViewNode.addChildDeviceNode("root.sg.d1", timeJoinNode1);
deviceViewNode.addChildDeviceNode("root.sg.d2", timeJoinNode2);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByLevelNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByLevelNodeSerdeTest.java
index 931183c13a..7e94ffff86 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByLevelNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByLevelNodeSerdeTest.java
@@ -36,7 +36,6 @@ import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.commons.compress.utils.Sets;
import org.junit.Test;
import java.nio.ByteBuffer;
@@ -62,7 +61,6 @@ public class GroupByLevelNodeSerdeTest {
AggregationStep.FINAL,
Collections.singletonList(
new TimeSeriesOperand(new PartialPath("root.sg.d1.s1"))))),
- Sets.newHashSet("s1"),
OrderBy.TIMESTAMP_ASC,
null,
groupByTimeParameter,
@@ -78,7 +76,6 @@ public class GroupByLevelNodeSerdeTest {
AggregationStep.FINAL,
Collections.singletonList(
new TimeSeriesOperand(new PartialPath("root.sg.d2.s1"))))),
- Sets.newHashSet("s1"),
OrderBy.TIMESTAMP_ASC,
null,
groupByTimeParameter,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/LimitNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/LimitNodeSerdeTest.java
index bf263094e3..42348c2911 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/LimitNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/LimitNodeSerdeTest.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
-import com.google.common.collect.Sets;
import org.junit.Test;
import java.nio.ByteBuffer;
@@ -48,7 +47,6 @@ public class LimitNodeSerdeTest {
new SeriesScanNode(
new PlanNodeId("TestSeriesScanNode"),
new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
- Sets.newHashSet("s1"),
OrderBy.TIMESTAMP_DESC,
new GroupByFilter(1, 2, 3, 4),
null,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/OffsetNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/OffsetNodeSerdeTest.java
index 7b506f5946..0f410299d8 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/OffsetNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/OffsetNodeSerdeTest.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
-import com.google.common.collect.Sets;
import org.junit.Test;
import java.nio.ByteBuffer;
@@ -47,7 +46,6 @@ public class OffsetNodeSerdeTest {
new SeriesScanNode(
new PlanNodeId("TestSeriesScanNode"),
new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
- Sets.newHashSet("s1"),
OrderBy.TIMESTAMP_DESC,
new GroupByFilter(1, 2, 3, 4),
null,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/SortNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/SortNodeSerdeTest.java
index 76ec69aaae..66b4943736 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/SortNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/SortNodeSerdeTest.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
-import com.google.common.collect.Sets;
import org.junit.Test;
import java.nio.ByteBuffer;
@@ -47,7 +46,6 @@ public class SortNodeSerdeTest {
new SeriesScanNode(
new PlanNodeId("TestSeriesScanNode"),
new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
- Sets.newHashSet("s1"),
OrderBy.TIMESTAMP_DESC,
new GroupByFilter(1, 2, 3, 4),
null,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/TimeJoinNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/TimeJoinNodeSerdeTest.java
index 82e5283ccf..96224b8873 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/TimeJoinNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/TimeJoinNodeSerdeTest.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.ValueFilter;
-import com.google.common.collect.Sets;
import org.junit.Test;
import java.nio.ByteBuffer;
@@ -47,7 +46,6 @@ public class TimeJoinNodeSerdeTest {
new SeriesScanNode(
new PlanNodeId("TestSeriesScanNode"),
new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
- Sets.newHashSet("s1", "s2"),
OrderBy.TIMESTAMP_DESC,
TimeFilter.gt(100),
null,
@@ -59,7 +57,6 @@ public class TimeJoinNodeSerdeTest {
new SeriesScanNode(
new PlanNodeId("TestSeriesScanNode"),
new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
- Sets.newHashSet("s1", "s2"),
OrderBy.TIMESTAMP_DESC,
null,
ValueFilter.gt(100),
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/SeriesAggregationScanNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/SeriesAggregationScanNodeSerdeTest.java
index 1ea22b4e2f..072cf1a0a4 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/SeriesAggregationScanNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/SeriesAggregationScanNodeSerdeTest.java
@@ -64,7 +64,6 @@ public class SeriesAggregationScanNodeSerdeTest {
new PlanNodeId("TestSeriesAggregateScanNode"),
new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN),
aggregationDescriptorList,
- Sets.newHashSet("s1"),
OrderBy.TIMESTAMP_ASC,
new In<>(Sets.newHashSet("s1", "s2"), VALUE_FILTER, true),
groupByTimeParameter,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/SeriesScanNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/SeriesScanNodeSerdeTest.java
index a40fa7c997..0ce1190d11 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/SeriesScanNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/SeriesScanNodeSerdeTest.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
-import com.google.common.collect.Sets;
import org.junit.Test;
import java.nio.ByteBuffer;
@@ -47,7 +46,6 @@ public class SeriesScanNodeSerdeTest {
new SeriesScanNode(
new PlanNodeId("TestSeriesScanNode"),
new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
- Sets.newHashSet("s1"),
OrderBy.TIMESTAMP_DESC,
new GroupByFilter(1, 2, 3, 4),
null,