You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/10 12:57:55 UTC
[iotdb] branch master updated: [IOTDB-2658] Generate logical plan for query statement —— UT & Raw Data Query & Aggregation Query (#5469)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new cc78c84f99 [IOTDB-2658] Generate logical plan for query statement —— UT & Raw Data Query & Aggregation Query (#5469)
cc78c84f99 is described below
commit cc78c84f99aadcf5aa59dcbdba862171f267173b
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Sun Apr 10 20:57:51 2022 +0800
[IOTDB-2658] Generate logical plan for query statement —— UT & Raw Data Query & Aggregation Query (#5469)
---
.../exception/sql/StatementAnalyzeException.java | 6 +
.../db/mpp/common/filter/BasicFunctionFilter.java | 8 +-
.../iotdb/db/mpp/common/filter/InFilter.java | 6 +-
.../iotdb/db/mpp/common/filter/LikeFilter.java | 8 +-
.../iotdb/db/mpp/common/filter/QueryFilter.java | 19 +-
.../iotdb/db/mpp/common/filter/RegexpFilter.java | 8 +-
.../iotdb/db/mpp/common/schematree/SchemaTree.java | 6 +-
.../apache/iotdb/db/mpp/sql/analyze/Analysis.java | 21 +-
.../apache/iotdb/db/mpp/sql/analyze/Analyzer.java | 59 ++-
.../db/mpp/sql/analyze/FakeSchemaFetcherImpl.java | 51 +-
.../db/mpp/sql/planner/LocalExecutionPlanner.java | 7 +-
.../iotdb/db/mpp/sql/planner/LogicalPlanner.java | 236 +++------
.../iotdb/db/mpp/sql/planner/QueryPlanBuilder.java | 277 ++++++++++
.../plan/IOutputPlanNode.java} | 17 +-
.../db/mpp/sql/planner/plan/node/ColumnHeader.java | 88 ++++
.../db/mpp/sql/planner/plan/node/PlanNode.java | 2 -
.../plan/node/metedata/read/ShowDevicesNode.java | 5 -
.../node/metedata/write/AlterTimeSeriesNode.java | 5 -
.../plan/node/metedata/write/AuthorNode.java | 5 -
.../write/CreateAlignedTimeSeriesNode.java | 5 -
.../node/metedata/write/CreateTimeSeriesNode.java | 5 -
.../planner/plan/node/process/AggregateNode.java | 93 +++-
.../planner/plan/node/process/DeviceMergeNode.java | 79 ++-
.../planner/plan/node/process/ExchangeNode.java | 5 -
.../sql/planner/plan/node/process/FillNode.java | 17 +-
.../sql/planner/plan/node/process/FilterNode.java | 59 ++-
.../planner/plan/node/process/FilterNullNode.java | 63 ++-
.../plan/node/process/GroupByLevelNode.java | 91 ++--
.../sql/planner/plan/node/process/LimitNode.java | 41 +-
.../sql/planner/plan/node/process/OffsetNode.java | 37 +-
.../sql/planner/plan/node/process/SortNode.java | 17 +-
.../planner/plan/node/process/TimeJoinNode.java | 78 ++-
.../planner/plan/node/sink/FragmentSinkNode.java | 5 -
.../plan/node/source/SeriesAggregateScanNode.java | 133 +++--
.../planner/plan/node/source/SeriesScanNode.java | 78 ++-
.../sql/planner/plan/node/source/SourceNode.java | 22 -
.../plan/node/write/InsertMultiTabletsNode.java | 5 -
.../sql/planner/plan/node/write/InsertRowNode.java | 5 -
.../planner/plan/node/write/InsertRowsNode.java | 5 -
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 5 -
.../planner/plan/node/write/InsertTabletNode.java | 5 -
.../sql/rewriter/ColumnPaginationController.java | 46 +-
.../db/mpp/sql/rewriter/WildcardsRemover.java | 24 +-
.../db/mpp/sql/statement/StatementVisitor.java | 28 +
.../statement/component/GroupByLevelComponent.java | 5 +
.../component/GroupByLevelController.java | 16 +
.../sql/statement/component/SelectComponent.java | 16 +-
.../statement/crud/AggregationQueryStatement.java | 29 +-
.../mpp/sql/statement/crud/FillQueryStatement.java | 7 +-
.../statement/crud/GroupByFillQueryStatement.java | 5 +
.../sql/statement/crud/GroupByQueryStatement.java | 5 +
.../mpp/sql/statement/crud/LastQueryStatement.java | 7 +-
.../db/mpp/sql/statement/crud/QueryStatement.java | 41 +-
.../mpp/sql/statement/crud/UDAFQueryStatement.java | 7 +-
.../mpp/sql/statement/crud/UDTFQueryStatement.java | 6 +
.../apache/iotdb/db/qp/constant/SQLConstant.java | 4 +
.../iotdb/db/query/expression/Expression.java | 4 -
.../query/expression/binary/BinaryExpression.java | 7 -
.../db/query/expression/unary/ConstantOperand.java | 7 -
.../query/expression/unary/FunctionExpression.java | 11 -
.../query/expression/unary/LogicNotExpression.java | 7 -
.../query/expression/unary/NegationExpression.java | 7 -
.../query/expression/unary/TimeSeriesOperand.java | 8 -
.../org/apache/iotdb/db/utils/SchemaUtils.java | 2 +-
.../db/mpp/sql/plan/DistributionPlannerTest.java | 101 ++--
.../iotdb/db/mpp/sql/plan/LogicalPlannerTest.java | 103 +---
.../db/mpp/sql/plan/QueryLogicalPlanUtil.java | 567 +++++++++++++++++++++
.../read/expression/impl/BinaryExpression.java | 39 ++
.../read/expression/impl/GlobalTimeExpression.java | 20 +
.../expression/impl/SingleSeriesExpression.java | 20 +
70 files changed, 2143 insertions(+), 693 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/sql/StatementAnalyzeException.java b/server/src/main/java/org/apache/iotdb/db/exception/sql/StatementAnalyzeException.java
index efecd6de7e..4d4cea20cc 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/sql/StatementAnalyzeException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/sql/StatementAnalyzeException.java
@@ -36,4 +36,10 @@ public class StatementAnalyzeException extends IoTDBException {
filterOperator, filterType, FilterConstant.filterNames.get(filterType)),
TSStatusCode.LOGICAL_OPTIMIZE_ERROR.getStatusCode());
}
+
+ public StatementAnalyzeException(String type, String message) {
+ super(
+ String.format("Unsupported type: [%s]. %s", type, message),
+ TSStatusCode.LOGICAL_OPTIMIZE_ERROR.getStatusCode());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/BasicFunctionFilter.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/BasicFunctionFilter.java
index 46bb10ddfe..aec114cffb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/BasicFunctionFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/BasicFunctionFilter.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.db.mpp.common.filter;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.LogicalOperatorException;
import org.apache.iotdb.db.exception.sql.SQLParserException;
+import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.constant.FilterConstant;
import org.apache.iotdb.db.mpp.sql.constant.FilterConstant.FilterType;
@@ -75,7 +75,7 @@ public class BasicFunctionFilter extends FunctionFilter {
@Override
protected Pair<IUnaryExpression, String> transformToSingleQueryFilter(
Map<PartialPath, TSDataType> pathTSDataTypeHashMap)
- throws LogicalOperatorException, MetadataException {
+ throws StatementAnalyzeException, MetadataException {
TSDataType type = pathTSDataTypeHashMap.get(singlePath);
if (type == null) {
throw new MetadataException(
@@ -109,12 +109,12 @@ public class BasicFunctionFilter extends FunctionFilter {
? new Binary(value.substring(1, value.length() - 1))
: new Binary(value));
} else {
- throw new LogicalOperatorException(
+ throw new StatementAnalyzeException(
"For Basic operator,TEXT type only support EQUAL or NOTEQUAL operator");
}
break;
default:
- throw new LogicalOperatorException(type.toString(), "");
+ throw new StatementAnalyzeException(type.toString(), "");
}
return new Pair<>(ret, singlePath.getFullPath());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/InFilter.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/InFilter.java
index 4d31e28d3f..dbc3b818aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/InFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/InFilter.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.mpp.common.filter;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.LogicalOperatorException;
+import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.constant.FilterConstant.FilterType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -75,7 +75,7 @@ public class InFilter extends FunctionFilter {
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
protected Pair<IUnaryExpression, String> transformToSingleQueryFilter(
Map<PartialPath, TSDataType> pathTSDataTypeHashMap)
- throws LogicalOperatorException, MetadataException {
+ throws StatementAnalyzeException, MetadataException {
TSDataType type = pathTSDataTypeHashMap.get(singlePath);
if (type == null) {
throw new MetadataException(
@@ -131,7 +131,7 @@ public class InFilter extends FunctionFilter {
ret = In.getUnaryExpression(singlePath, binaryValues, not);
break;
default:
- throw new LogicalOperatorException(type.toString(), "");
+ throw new StatementAnalyzeException(type.toString(), "");
}
return new Pair<>(ret, singlePath.getFullPath());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/LikeFilter.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/LikeFilter.java
index bc8186bafd..659f744316 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/LikeFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/LikeFilter.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.mpp.common.filter;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.LogicalOperatorException;
+import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.constant.FilterConstant.FilterType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -51,7 +51,7 @@ public class LikeFilter extends FunctionFilter {
@Override
protected Pair<IUnaryExpression, String> transformToSingleQueryFilter(
Map<PartialPath, TSDataType> pathTSDataTypeHashMap)
- throws LogicalOperatorException, MetadataException {
+ throws StatementAnalyzeException, MetadataException {
TSDataType type = pathTSDataTypeHashMap.get(singlePath);
if (type == null) {
throw new MetadataException(
@@ -59,9 +59,9 @@ public class LikeFilter extends FunctionFilter {
}
IUnaryExpression ret;
if (type != TEXT) {
- throw new LogicalOperatorException(type.toString(), "Only TEXT is supported in 'Like'");
+ throw new StatementAnalyzeException(type.toString(), "Only TEXT is supported in 'Like'");
} else if (value.startsWith("\"") && value.endsWith("\"")) {
- throw new LogicalOperatorException(value, "Please use single quotation marks");
+ throw new StatementAnalyzeException(value, "Please use single quotation marks");
} else {
ret =
Like.getUnaryExpression(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/QueryFilter.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/QueryFilter.java
index e5e4b2b293..0b74f3d52d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/QueryFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/QueryFilter.java
@@ -19,8 +19,7 @@
package org.apache.iotdb.db.mpp.common.filter;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.LogicalOperatorException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.constant.FilterConstant;
import org.apache.iotdb.db.mpp.sql.constant.FilterConstant.FilterType;
@@ -124,18 +123,18 @@ public class QueryFilter implements Comparable<QueryFilter> {
* @param pathTSDataTypeHashMap
*/
public IExpression transformToExpression(Map<PartialPath, TSDataType> pathTSDataTypeHashMap)
- throws QueryProcessException {
+ throws StatementAnalyzeException {
if (isSingle) {
Pair<IUnaryExpression, String> ret;
try {
ret = transformToSingleQueryFilter(pathTSDataTypeHashMap);
} catch (MetadataException e) {
- throw new QueryProcessException(e);
+ throw new StatementAnalyzeException("Meet error when transformToSingleQueryFilter");
}
return ret.left;
} else {
if (childOperators.isEmpty()) {
- throw new LogicalOperatorException(
+ throw new StatementAnalyzeException(
String.valueOf(filterType), "this filter is not leaf, but it's empty");
}
IExpression retFilter = childOperators.get(0).transformToExpression(pathTSDataTypeHashMap);
@@ -150,7 +149,7 @@ public class QueryFilter implements Comparable<QueryFilter> {
retFilter = BinaryExpression.or(retFilter, currentFilter);
break;
default:
- throw new LogicalOperatorException(
+ throw new StatementAnalyzeException(
String.valueOf(filterType), "Maybe it means " + getFilterName());
}
}
@@ -168,9 +167,9 @@ public class QueryFilter implements Comparable<QueryFilter> {
*/
protected Pair<IUnaryExpression, String> transformToSingleQueryFilter(
Map<PartialPath, TSDataType> pathTSDataTypeHashMap)
- throws LogicalOperatorException, MetadataException {
+ throws StatementAnalyzeException, MetadataException {
if (childOperators.isEmpty()) {
- throw new LogicalOperatorException(
+ throw new StatementAnalyzeException(
String.valueOf(filterType),
"TransformToSingleFilter: this filter is not a leaf, but it's empty.");
}
@@ -183,7 +182,7 @@ public class QueryFilter implements Comparable<QueryFilter> {
for (int i = 1; i < childOperators.size(); i++) {
currentPair = childOperators.get(i).transformToSingleQueryFilter(pathTSDataTypeHashMap);
if (!path.equals(currentPair.right)) {
- throw new LogicalOperatorException(
+ throw new StatementAnalyzeException(
"TransformToSingleFilter: paths among children are not inconsistent: one is: "
+ path
+ ", another is: "
@@ -199,7 +198,7 @@ public class QueryFilter implements Comparable<QueryFilter> {
FilterFactory.or(retFilter.getFilter(), currentPair.left.getFilter()));
break;
default:
- throw new LogicalOperatorException(
+ throw new StatementAnalyzeException(
String.valueOf(filterType), "Maybe it means " + getFilterName());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/RegexpFilter.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/RegexpFilter.java
index 6001eed41e..5417a12d39 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/RegexpFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/RegexpFilter.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.mpp.common.filter;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.LogicalOperatorException;
+import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.constant.FilterConstant.FilterType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -51,7 +51,7 @@ public class RegexpFilter extends FunctionFilter {
@Override
protected Pair<IUnaryExpression, String> transformToSingleQueryFilter(
Map<PartialPath, TSDataType> pathTSDataTypeHashMap)
- throws LogicalOperatorException, MetadataException {
+ throws StatementAnalyzeException, MetadataException {
TSDataType type = pathTSDataTypeHashMap.get(singlePath);
if (type == null) {
throw new MetadataException(
@@ -59,9 +59,9 @@ public class RegexpFilter extends FunctionFilter {
}
IUnaryExpression ret;
if (type != TEXT) {
- throw new LogicalOperatorException(type.toString(), "Only TEXT is supported in 'Regexp'");
+ throw new StatementAnalyzeException(type.toString(), "Only TEXT is supported in 'Regexp'");
} else if (value.startsWith("\"") && value.endsWith("\"")) {
- throw new LogicalOperatorException(value, "Please use single quotation marks");
+ throw new StatementAnalyzeException(value, "Please use single quotation marks");
} else {
ret =
Regexp.getUnaryExpression(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
index 5543d3ab5f..240a6c1875 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
@@ -32,7 +32,11 @@ import java.util.List;
public class SchemaTree {
- private SchemaNode root;
+ private final SchemaNode root;
+
+ public SchemaTree(SchemaNode root) {
+ this.root = root;
+ }
/**
* Return all measurement paths for given path pattern and filter the result by slimit and offset.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
index 84f49b66ba..3a7b961bf4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
@@ -25,11 +25,10 @@ import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
/** Analysis used for planning a query. TODO: This class may need to store more info for a query. */
public class Analysis {
@@ -51,7 +50,7 @@ public class Analysis {
private SchemaTree schemaTree;
- private Map<String, Set<PartialPath>> deviceIdToPathsMap;
+ private IExpression queryFilter;
public List<RegionReplicaSet> getPartitionInfo(PartialPath seriesPath, Filter timefilter) {
// TODO: (xingtanzjr) implement the calculation of timePartitionIdList
@@ -82,14 +81,6 @@ public class Analysis {
this.schemaPartition = schemaPartition;
}
- public Map<String, Set<PartialPath>> getDeviceIdToPathsMap() {
- return deviceIdToPathsMap;
- }
-
- public void setDeviceIdToPathsMap(Map<String, Set<PartialPath>> deviceIdToPathsMap) {
- this.deviceIdToPathsMap = deviceIdToPathsMap;
- }
-
public SchemaTree getSchemaTree() {
return schemaTree;
}
@@ -97,4 +88,12 @@ public class Analysis {
public void setSchemaTree(SchemaTree schemaTree) {
this.schemaTree = schemaTree;
}
+
+ public IExpression getQueryFilter() {
+ return queryFilter;
+ }
+
+ public void setQueryFilter(IExpression expression) {
+ this.queryFilter = expression;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
index 4376baaa23..c0aaacb62e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
@@ -40,14 +40,26 @@ import org.apache.iotdb.db.mpp.sql.rewriter.RemoveNotOptimizer;
import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
+import org.apache.iotdb.db.mpp.sql.statement.component.ResultColumn;
import org.apache.iotdb.db.mpp.sql.statement.component.WhereCondition;
import org.apache.iotdb.db.mpp.sql.statement.crud.*;
import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.sys.AuthorStatement;
-
-import java.util.*;
+import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
/** Analyze the statement and generate Analysis. */
public class Analyzer {
@@ -93,16 +105,27 @@ public class Analyzer {
SchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree);
// bind metadata, remove wildcards, and apply SLIMIT & SOFFSET
- Map<String, Set<PartialPath>> deviceIdToPathsMap = new HashMap<>();
rewrittenStatement =
- (QueryStatement)
- new WildcardsRemover().rewrite(rewrittenStatement, schemaTree, deviceIdToPathsMap);
+ (QueryStatement) new WildcardsRemover().rewrite(rewrittenStatement, schemaTree);
// fetch partition information
+ Set<String> devicePathSet = new HashSet<>();
+ for (ResultColumn resultColumn : queryStatement.getSelectComponent().getResultColumns()) {
+ devicePathSet.addAll(
+ resultColumn.collectPaths().stream()
+ .map(PartialPath::getDevice)
+ .collect(Collectors.toList()));
+ }
+ if (queryStatement.getWhereCondition() != null) {
+ devicePathSet.addAll(
+ queryStatement.getWhereCondition().getQueryFilter().getPathSet().stream()
+ .map(PartialPath::getDevice)
+ .collect(Collectors.toList()));
+ }
List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
- for (String deviceId : deviceIdToPathsMap.keySet()) {
+ for (String devicePath : devicePathSet) {
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
- dataPartitionQueryParam.setDevicePath(deviceId);
+ dataPartitionQueryParam.setDevicePath(devicePath);
dataPartitionQueryParams.add(dataPartitionQueryParam);
}
DataPartition dataPartition =
@@ -115,13 +138,29 @@ public class Analyzer {
filter = new RemoveNotOptimizer().optimize(filter);
filter = new DnfFilterOptimizer().optimize(filter);
filter = new MergeSingleFilterOptimizer().optimize(filter);
- whereCondition.setQueryFilter(filter);
+
+ // transform QueryFilter to expression
+ List<PartialPath> filterPaths = new ArrayList<>(filter.getPathSet());
+ HashMap<PartialPath, TSDataType> pathTSDataTypeHashMap = new HashMap<>();
+ for (PartialPath filterPath : filterPaths) {
+ pathTSDataTypeHashMap.put(
+ filterPath,
+ SQLConstant.isReservedPath(filterPath)
+ ? TSDataType.INT64
+ : filterPath.getSeriesType());
+ }
+ IExpression expression = filter.transformToExpression(pathTSDataTypeHashMap);
+ expression =
+ ExpressionOptimizer.getInstance()
+ .optimize(expression, queryStatement.getSelectComponent().getDeduplicatedPaths());
+ analysis.setQueryFilter(expression);
}
analysis.setStatement(rewrittenStatement);
analysis.setSchemaTree(schemaTree);
- analysis.setDeviceIdToPathsMap(deviceIdToPathsMap);
analysis.setDataPartitionInfo(dataPartition);
- } catch (StatementAnalyzeException | PathNumOverLimitException e) {
+ } catch (StatementAnalyzeException
+ | PathNumOverLimitException
+ | QueryFilterOptimizationException e) {
e.printStackTrace();
}
return analysis;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java
index ed23e6e4d8..6e5ed07282 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java
@@ -20,22 +20,65 @@
package org.apache.iotdb.db.mpp.sql.analyze;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
-import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+import org.apache.iotdb.db.mpp.common.schematree.*;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.util.List;
public class FakeSchemaFetcherImpl implements ISchemaFetcher {
+
+ private final SchemaTree schemaTree = new SchemaTree(generateSchemaTree());
+
@Override
public SchemaTree fetchSchema(PathPatternTree patternTree) {
- return new SchemaTree();
+ return schemaTree;
}
@Override
public SchemaTree fetchSchemaWithAutoCreate(
PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes) {
- return null;
+ return schemaTree;
+ }
+
+ /**
+ * Generate the following tree: root.sg.d1.s1, root.sg.d1.s2(status) root.sg.d2.s1,
+ * root.sg.d2.s2(status) root.sg.d2.a.s1, root.sg.d2.a.s2(status)
+ *
+ * @return the root node of the generated schemTree
+ */
+ private SchemaNode generateSchemaTree() {
+ SchemaNode root = new SchemaInternalNode("root");
+
+ SchemaNode sg = new SchemaInternalNode("sg");
+ root.addChild("sg", sg);
+
+ SchemaEntityNode d1 = new SchemaEntityNode("d1");
+ sg.addChild("d1", d1);
+
+ SchemaMeasurementNode s1 =
+ new SchemaMeasurementNode("s1", new MeasurementSchema("s1", TSDataType.INT32));
+ d1.addChild("s1", s1);
+ SchemaMeasurementNode s2 =
+ new SchemaMeasurementNode("s2", new MeasurementSchema("s1", TSDataType.INT32));
+ s2.setAlias("status");
+ d1.addChild("s2", s2);
+ d1.addAliasChild("status", s2);
+
+ SchemaEntityNode d2 = new SchemaEntityNode("d2");
+ sg.addChild("d2", d2);
+ d2.addChild("s1", s1);
+ d2.addChild("s2", s2);
+ d2.addAliasChild("status", s2);
+
+ SchemaEntityNode a = new SchemaEntityNode("a");
+ a.setAligned(true);
+ d2.addChild("a", a);
+ a.addChild("s1", s1);
+ a.addChild("s2", s2);
+ a.addAliasChild("status", s2);
+
+ return root;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index 1dbc265c0c..cf40915784 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.mpp.buffer.DataBlockManager;
import org.apache.iotdb.db.mpp.buffer.DataBlockService;
import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
import org.apache.iotdb.db.mpp.execution.DataDriver;
import org.apache.iotdb.db.mpp.execution.DataDriverContext;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
@@ -56,6 +55,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanN
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.io.IOException;
@@ -168,7 +168,7 @@ public class LocalExecutionPlanner {
public Operator visitFilter(FilterNode node, LocalExecutionPlanContext context) {
PlanNode child = node.getChild();
- QueryFilter filterExpression = node.getPredicate();
+ IExpression filterExpression = node.getPredicate();
List<String> outputSymbols = node.getOutputColumnNames();
return super.visitFilter(node, context);
}
@@ -222,7 +222,8 @@ public class LocalExecutionPlanner {
context.getNextOperatorId(),
node.getPlanNodeId(),
TimeJoinOperator.class.getSimpleName());
- return new TimeJoinOperator(operatorContext, children, node.getMergeOrder(), node.getTypes());
+ return new TimeJoinOperator(
+ operatorContext, children, node.getMergeOrder(), node.getOutputColumnTypes());
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
index 2db6e48a2f..1b842beac9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.db.mpp.sql.planner;
import org.apache.iotdb.db.auth.AuthException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
-import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.optimization.PlanOptimizer;
import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
@@ -29,25 +29,12 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSer
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AuthorNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNullNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.GroupByLevelNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.SortNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowsNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
-import org.apache.iotdb.db.mpp.sql.statement.component.FillComponent;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
-import org.apache.iotdb.db.mpp.sql.statement.component.GroupByLevelComponent;
-import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
-import org.apache.iotdb.db.mpp.sql.statement.component.ResultColumn;
+import org.apache.iotdb.db.mpp.sql.statement.component.*;
import org.apache.iotdb.db.mpp.sql.statement.crud.*;
import org.apache.iotdb.db.mpp.sql.statement.crud.AggregationQueryStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.FillQueryStatement;
@@ -58,17 +45,15 @@ import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.sys.AuthorStatement;
-import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.read.expression.ExpressionType;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
/** Generate a logical plan for the statement. */
public class LogicalPlanner {
@@ -98,7 +83,7 @@ public class LogicalPlanner {
* This visitor is used to generate a logical plan for the statement and returns the {@link
* PlanNode}.
*/
- private class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryContext> {
+ private static class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryContext> {
private final Analysis analysis;
@@ -108,157 +93,83 @@ public class LogicalPlanner {
@Override
public PlanNode visitQuery(QueryStatement queryStatement, MPPQueryContext context) {
- PlanBuilder planBuilder = planSelectComponent(queryStatement);
-
- if (queryStatement.getWhereCondition() != null) {
- planBuilder =
- planQueryFilter(planBuilder, queryStatement.getWhereCondition().getQueryFilter());
- }
-
- if (queryStatement.isGroupByLevel()) {
- planBuilder =
- planGroupByLevel(
- planBuilder,
- ((AggregationQueryStatement) queryStatement).getGroupByLevelComponent());
- }
-
- if (queryStatement instanceof FillQueryStatement) {
- planBuilder =
- planFill(planBuilder, ((FillQueryStatement) queryStatement).getFillComponent());
- }
-
- planBuilder = planFilterNull(planBuilder, queryStatement.getFilterNullComponent());
- planBuilder = planSort(planBuilder, queryStatement.getResultOrder());
- planBuilder = planLimit(planBuilder, queryStatement.getRowLimit());
- planBuilder = planOffset(planBuilder, queryStatement.getRowOffset());
+ QueryPlanBuilder planBuilder = new QueryPlanBuilder(context);
+
+ planBuilder.planRawDataQuerySource(
+ queryStatement.getDeviceNameToPathsMap(),
+ queryStatement.getResultOrder(),
+ queryStatement.isAlignByDevice(),
+ analysis.getQueryFilter(),
+ queryStatement.getSelectedPathNames());
+
+ planBuilder.planFilterNull(queryStatement.getFilterNullComponent());
+ planBuilder.planLimit(queryStatement.getRowLimit());
+ planBuilder.planOffset(queryStatement.getRowOffset());
return planBuilder.getRoot();
}
- private PlanBuilder planSelectComponent(QueryStatement queryStatement) {
- // TODO: generate SourceNode for QueryFilter
- Map<String, Set<SourceNode>> deviceNameToSourceNodesMap = new HashMap<>();
-
- for (ResultColumn resultColumn : queryStatement.getSelectComponent().getResultColumns()) {
- Set<SourceNode> sourceNodes = planResultColumn(resultColumn);
- for (SourceNode sourceNode : sourceNodes) {
- String deviceName = sourceNode.getDeviceName();
- deviceNameToSourceNodesMap
- .computeIfAbsent(deviceName, k -> new HashSet<>())
- .add(sourceNode);
- }
- }
-
- if (queryStatement.isAlignByDevice()) {
- DeviceMergeNode deviceMergeNode = new DeviceMergeNode(context.getQueryId().genPlanNodeId());
- for (Map.Entry<String, Set<SourceNode>> entry : deviceNameToSourceNodesMap.entrySet()) {
- String deviceName = entry.getKey();
- List<PlanNode> planNodes = new ArrayList<>(entry.getValue());
- if (planNodes.size() == 1) {
- deviceMergeNode.addChildDeviceNode(deviceName, planNodes.get(0));
- } else {
- TimeJoinNode timeJoinNode =
- new TimeJoinNode(
- context.getQueryId().genPlanNodeId(),
- queryStatement.getResultOrder(),
- null,
- planNodes);
- deviceMergeNode.addChildDeviceNode(deviceName, timeJoinNode);
- }
- }
- return new PlanBuilder(deviceMergeNode);
- }
-
- List<PlanNode> planNodes =
- deviceNameToSourceNodesMap.entrySet().stream()
- .flatMap(entry -> entry.getValue().stream())
- .collect(Collectors.toList());
- TimeJoinNode timeJoinNode =
- new TimeJoinNode(
- context.getQueryId().genPlanNodeId(),
- queryStatement.getResultOrder(),
- null,
- planNodes);
- return new PlanBuilder(timeJoinNode);
- }
-
- private Set<SourceNode> planResultColumn(ResultColumn resultColumn) {
- Set<SourceNode> resultSourceNodeSet = new HashSet<>();
- resultColumn
- .getExpression()
- .collectPlanNode(resultSourceNodeSet, context.getQueryId().genPlanNodeId());
- return resultSourceNodeSet;
- }
-
- private PlanBuilder planQueryFilter(PlanBuilder planBuilder, QueryFilter queryFilter) {
- if (queryFilter == null) {
- return planBuilder;
+ @Override
+ public PlanNode visitAggregationQuery(
+ AggregationQueryStatement queryStatement, MPPQueryContext context) {
+ QueryPlanBuilder planBuilder = new QueryPlanBuilder(context);
+ Map<String, Map<PartialPath, Set<AggregationType>>> deviceNameToAggregationsMap;
+
+ if (analysis.getQueryFilter() != null
+ && analysis.getQueryFilter().getType() != ExpressionType.GLOBAL_TIME) {
+ // with value filter
+ planBuilder.planAggregationSourceWithValueFilter(
+ queryStatement.getDeviceNameToAggregationsMap(),
+ queryStatement.getDeviceNameToPathsMap(),
+ queryStatement.getResultOrder(),
+ queryStatement.isAlignByDevice(),
+ analysis.getQueryFilter(),
+ queryStatement.getSelectedPathNames());
+ } else {
+ // without value filter
+ planBuilder.planAggregationSourceWithoutValueFilter(
+ queryStatement.getDeviceNameToAggregationsMap(),
+ queryStatement.getResultOrder(),
+ queryStatement.isAlignByDevice(),
+ analysis.getQueryFilter());
}
- return planBuilder.withNewRoot(
- new FilterNode(context.getQueryId().genPlanNodeId(), planBuilder.getRoot(), queryFilter));
+ planBuilder.planGroupByLevel(queryStatement.getGroupByLevelComponent());
+ planBuilder.planFilterNull(queryStatement.getFilterNullComponent());
+ planBuilder.planLimit(queryStatement.getRowLimit());
+ planBuilder.planOffset(queryStatement.getRowOffset());
+ return planBuilder.getRoot();
}
- private PlanBuilder planGroupByLevel(
- PlanBuilder planBuilder, GroupByLevelComponent groupByLevelComponent) {
- if (groupByLevelComponent == null) {
- return planBuilder;
- }
-
- return planBuilder.withNewRoot(
- new GroupByLevelNode(
- context.getQueryId().genPlanNodeId(),
- planBuilder.getRoot(),
- groupByLevelComponent.getLevels(),
- groupByLevelComponent.getGroupedPathMap()));
+ @Override
+ public PlanNode visitGroupByQuery(
+ GroupByQueryStatement queryStatement, MPPQueryContext context) {
+ throw new UnsupportedOperationException();
}
- private PlanBuilder planFill(PlanBuilder planBuilder, FillComponent fillComponent) {
- // TODO: support Fill
- return planBuilder;
+ @Override
+ public PlanNode visitGroupByFillQuery(
+ GroupByFillQueryStatement queryStatement, MPPQueryContext context) {
+ throw new UnsupportedOperationException();
}
- private PlanBuilder planFilterNull(
- PlanBuilder planBuilder, FilterNullComponent filterNullComponent) {
- if (filterNullComponent == null) {
- return planBuilder;
- }
-
- return planBuilder.withNewRoot(
- new FilterNullNode(
- context.getQueryId().genPlanNodeId(),
- planBuilder.getRoot(),
- filterNullComponent.getWithoutPolicyType(),
- filterNullComponent.getWithoutNullColumns().stream()
- .map(Expression::getExpressionString)
- .collect(Collectors.toList())));
+ @Override
+ public PlanNode visitFillQuery(FillQueryStatement queryStatement, MPPQueryContext context) {
+ throw new UnsupportedOperationException();
}
- private PlanBuilder planSort(PlanBuilder planBuilder, OrderBy resultOrder) {
- if (resultOrder == null || resultOrder == OrderBy.TIMESTAMP_ASC) {
- return planBuilder;
- }
-
- return planBuilder.withNewRoot(
- new SortNode(
- context.getQueryId().genPlanNodeId(), planBuilder.getRoot(), null, resultOrder));
+ @Override
+ public PlanNode visitLastQuery(LastQueryStatement queryStatement, MPPQueryContext context) {
+ throw new UnsupportedOperationException();
}
- private PlanBuilder planLimit(PlanBuilder planBuilder, int rowLimit) {
- if (rowLimit == 0) {
- return planBuilder;
- }
-
- return planBuilder.withNewRoot(
- new LimitNode(context.getQueryId().genPlanNodeId(), rowLimit, planBuilder.getRoot()));
+ @Override
+ public PlanNode visitUDTFQuery(UDTFQueryStatement queryStatement, MPPQueryContext context) {
+ throw new UnsupportedOperationException();
}
- private PlanBuilder planOffset(PlanBuilder planBuilder, int rowOffset) {
- if (rowOffset == 0) {
- return planBuilder;
- }
-
- return planBuilder.withNewRoot(
- new OffsetNode(context.getQueryId().genPlanNodeId(), planBuilder.getRoot(), rowOffset));
+ @Override
+ public PlanNode visitUDAFQuery(UDAFQueryStatement queryStatement, MPPQueryContext context) {
+ throw new UnsupportedOperationException();
}
@Override
@@ -557,21 +468,4 @@ public class LogicalPlanner {
return insertRowsNode;
}
}
-
- private class PlanBuilder {
-
- private PlanNode root;
-
- public PlanBuilder(PlanNode root) {
- this.root = root;
- }
-
- public PlanNode getRoot() {
- return root;
- }
-
- public PlanBuilder withNewRoot(PlanNode newRoot) {
- return new PlanBuilder(newRoot);
- }
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java
new file mode 100644
index 0000000000..788998f87b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.planner;
+
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.*;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
+import org.apache.iotdb.db.mpp.sql.statement.component.GroupByLevelComponent;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class QueryPlanBuilder {
+
+ private PlanNode root;
+
+ private final MPPQueryContext context;
+
+ public QueryPlanBuilder(MPPQueryContext context) {
+ this.context = context;
+ }
+
+ public PlanNode getRoot() {
+ return root;
+ }
+
+ public void planRawDataQuerySource(
+ Map<String, Set<PartialPath>> deviceNameToPathsMap,
+ OrderBy scanOrder,
+ boolean isAlignByDevice,
+ IExpression queryFilter,
+ List<String> selectedPathList) {
+ Map<String, List<PlanNode>> deviceNameToSourceNodesMap = new HashMap<>();
+
+ for (Map.Entry<String, Set<PartialPath>> entry : deviceNameToPathsMap.entrySet()) {
+ String deviceName = entry.getKey();
+ Set<String> allSensors =
+ entry.getValue().stream().map(PartialPath::getMeasurement).collect(Collectors.toSet());
+ for (PartialPath path : entry.getValue()) {
+ deviceNameToSourceNodesMap
+ .computeIfAbsent(deviceName, k -> new ArrayList<>())
+ .add(
+ new SeriesScanNode(
+ context.getQueryId().genPlanNodeId(), path, allSensors, scanOrder));
+ }
+ }
+
+ if (isAlignByDevice) {
+ planDeviceMerge(deviceNameToSourceNodesMap, scanOrder, queryFilter, selectedPathList);
+ } else {
+ planTimeJoin(deviceNameToSourceNodesMap, scanOrder, queryFilter, selectedPathList);
+ }
+ }
+
+ public void planAggregationSourceWithoutValueFilter(
+ Map<String, Map<PartialPath, Set<AggregationType>>> deviceNameToAggregationsMap,
+ OrderBy scanOrder,
+ boolean isAlignByDevice,
+ IExpression queryFilter) {
+ Filter timeFilter = null;
+ if (queryFilter != null) {
+ timeFilter = ((GlobalTimeExpression) queryFilter).getFilter();
+ }
+
+ Map<String, List<PlanNode>> deviceNameToSourceNodesMap = new HashMap<>();
+ for (Map.Entry<String, Map<PartialPath, Set<AggregationType>>> entry :
+ deviceNameToAggregationsMap.entrySet()) {
+ String deviceName = entry.getKey();
+
+ for (PartialPath path : entry.getValue().keySet()) {
+ deviceNameToSourceNodesMap
+ .computeIfAbsent(deviceName, k -> new ArrayList<>())
+ .add(
+ new SeriesAggregateScanNode(
+ context.getQueryId().genPlanNodeId(),
+ path,
+ new ArrayList<>(entry.getValue().get(path)),
+ scanOrder,
+ timeFilter,
+ null));
+ }
+ }
+
+ if (isAlignByDevice) {
+ planDeviceMerge(deviceNameToSourceNodesMap, scanOrder, null, null);
+ } else {
+ planTimeJoin(deviceNameToSourceNodesMap, scanOrder, null, null);
+ }
+ }
+
+ public void planAggregationSourceWithValueFilter(
+ Map<String, Map<PartialPath, Set<AggregationType>>> deviceNameToAggregationsMap,
+ Map<String, Set<PartialPath>> deviceNameToPathsMap,
+ OrderBy scanOrder,
+ boolean isAlignByDevice,
+ IExpression queryFilter,
+ List<String> selectedPathList) {
+ Map<String, List<PlanNode>> deviceNameToSourceNodesMap = new HashMap<>();
+
+ for (Map.Entry<String, Set<PartialPath>> entry : deviceNameToPathsMap.entrySet()) {
+ String deviceName = entry.getKey();
+ Set<String> allSensors =
+ entry.getValue().stream().map(PartialPath::getMeasurement).collect(Collectors.toSet());
+ for (PartialPath path : entry.getValue()) {
+ deviceNameToSourceNodesMap
+ .computeIfAbsent(deviceName, k -> new ArrayList<>())
+ .add(
+ new SeriesScanNode(
+ context.getQueryId().genPlanNodeId(), path, allSensors, scanOrder));
+ }
+ }
+
+ if (isAlignByDevice) {
+ planDeviceMergeForAggregation(
+ deviceNameToAggregationsMap,
+ deviceNameToSourceNodesMap,
+ scanOrder,
+ queryFilter,
+ selectedPathList);
+ } else {
+ planTimeJoin(deviceNameToSourceNodesMap, scanOrder, queryFilter, selectedPathList);
+ Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
+ deviceNameToAggregationsMap.values().forEach(aggregateFuncMap::putAll);
+ this.root =
+ new AggregateNode(
+ context.getQueryId().genPlanNodeId(), this.getRoot(), aggregateFuncMap, null);
+ }
+ }
+
+ public void planTimeJoin(
+ Map<String, List<PlanNode>> deviceNameToSourceNodesMap,
+ OrderBy mergeOrder,
+ IExpression queryFilter,
+ List<String> selectedPathList) {
+ List<PlanNode> sourceNodes =
+ deviceNameToSourceNodesMap.entrySet().stream()
+ .flatMap(entry -> entry.getValue().stream())
+ .collect(Collectors.toList());
+ this.root = convergeWithTimeJoin(sourceNodes, mergeOrder, queryFilter, selectedPathList);
+ }
+
+ public void planDeviceMerge(
+ Map<String, List<PlanNode>> deviceNameToSourceNodesMap,
+ OrderBy mergeOrder,
+ IExpression queryFilter,
+ List<String> selectedPathList) {
+ DeviceMergeNode deviceMergeNode =
+ new DeviceMergeNode(context.getQueryId().genPlanNodeId(), mergeOrder);
+ for (Map.Entry<String, List<PlanNode>> entry : deviceNameToSourceNodesMap.entrySet()) {
+ String deviceName = entry.getKey();
+ List<PlanNode> planNodes = new ArrayList<>(entry.getValue());
+ deviceMergeNode.addChildDeviceNode(
+ deviceName, convergeWithTimeJoin(planNodes, mergeOrder, queryFilter, selectedPathList));
+ }
+ this.root = deviceMergeNode;
+ }
+
+ private void planDeviceMergeForAggregation(
+ Map<String, Map<PartialPath, Set<AggregationType>>> deviceNameToAggregationsMap,
+ Map<String, List<PlanNode>> deviceNameToSourceNodesMap,
+ OrderBy mergeOrder,
+ IExpression queryFilter,
+ List<String> selectedPathList) {
+ DeviceMergeNode deviceMergeNode =
+ new DeviceMergeNode(context.getQueryId().genPlanNodeId(), mergeOrder);
+ for (Map.Entry<String, List<PlanNode>> entry : deviceNameToSourceNodesMap.entrySet()) {
+ String deviceName = entry.getKey();
+ List<PlanNode> planNodes = new ArrayList<>(entry.getValue());
+ PlanNode timeJoinNode =
+ convergeWithTimeJoin(planNodes, mergeOrder, queryFilter, selectedPathList);
+ AggregateNode aggregateNode =
+ new AggregateNode(
+ context.getQueryId().genPlanNodeId(),
+ timeJoinNode,
+ deviceNameToAggregationsMap.get(deviceName),
+ null);
+ deviceMergeNode.addChildDeviceNode(deviceName, aggregateNode);
+ }
+ this.root = deviceMergeNode;
+ }
+
+ private PlanNode convergeWithTimeJoin(
+ List<PlanNode> sourceNodes,
+ OrderBy mergeOrder,
+ IExpression queryFilter,
+ List<String> outputColumnNames) {
+ PlanNode tmpNode;
+ if (sourceNodes.size() == 1) {
+ tmpNode = sourceNodes.get(0);
+ } else {
+ tmpNode = new TimeJoinNode(context.getQueryId().genPlanNodeId(), mergeOrder, sourceNodes);
+ }
+
+ if (queryFilter != null) {
+ tmpNode =
+ new FilterNode(
+ context.getQueryId().genPlanNodeId(), tmpNode, queryFilter, outputColumnNames);
+ }
+
+ return tmpNode;
+ }
+
+ public void planGroupByLevel(GroupByLevelComponent groupByLevelComponent) {
+ if (groupByLevelComponent == null) {
+ return;
+ }
+
+ this.root =
+ new GroupByLevelNode(
+ context.getQueryId().genPlanNodeId(),
+ this.getRoot(),
+ groupByLevelComponent.getLevels(),
+ groupByLevelComponent.getGroupedHeaderMap());
+ }
+
+ public void planFilterNull(FilterNullComponent filterNullComponent) {
+ if (filterNullComponent == null) {
+ return;
+ }
+
+ this.root =
+ new FilterNullNode(
+ context.getQueryId().genPlanNodeId(),
+ this.getRoot(),
+ filterNullComponent.getWithoutPolicyType(),
+ filterNullComponent.getWithoutNullColumns().stream()
+ .map(Expression::getExpressionString)
+ .collect(Collectors.toList()));
+ }
+
+ public void planLimit(int rowLimit) {
+ if (rowLimit == 0) {
+ return;
+ }
+
+ this.root = new LimitNode(context.getQueryId().genPlanNodeId(), this.getRoot(), rowLimit);
+ }
+
+ public void planOffset(int rowOffset) {
+ if (rowOffset == 0) {
+ return;
+ }
+
+ this.root = new OffsetNode(context.getQueryId().genPlanNodeId(), this.getRoot(), rowOffset);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDTFQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IOutputPlanNode.java
similarity index 68%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDTFQueryStatement.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IOutputPlanNode.java
index 79d3c75efa..f88ad5d70d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDTFQueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IOutputPlanNode.java
@@ -17,11 +17,18 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.statement.crud;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
-public class UDTFQueryStatement extends QueryStatement {
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
- public UDTFQueryStatement(QueryStatement queryStatement) {
- super(queryStatement);
- }
+import java.util.List;
+
+public interface IOutputPlanNode {
+
+ List<ColumnHeader> getOutputColumnHeaders();
+
+ List<String> getOutputColumnNames();
+
+ List<TSDataType> getOutputColumnTypes();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/ColumnHeader.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/ColumnHeader.java
new file mode 100644
index 0000000000..afb27badba
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/ColumnHeader.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.Objects;
+
+public class ColumnHeader {
+
+ private final String pathName;
+ private String functionName;
+ private final TSDataType dataType;
+
+ public ColumnHeader(String pathName, TSDataType dataType) {
+ this.pathName = pathName;
+ this.dataType = dataType;
+ }
+
+ public ColumnHeader(String pathName, String functionName, TSDataType dataType) {
+ this.pathName = pathName;
+ this.functionName = functionName.toLowerCase();
+ this.dataType = dataType;
+ }
+
+ public String getColumnName() {
+ if (functionName != null) {
+ return String.format("%s(%s)", functionName, pathName);
+ }
+ return pathName;
+ }
+
+ public TSDataType getColumnType() {
+ return dataType;
+ }
+
+ public ColumnHeader replacePathWithMeasurement() {
+ String measurement = null;
+ try {
+ measurement = new PartialPath(pathName).getMeasurement();
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ }
+ if (functionName != null) {
+ return new ColumnHeader(measurement, functionName, dataType);
+ }
+ return new ColumnHeader(measurement, dataType);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ColumnHeader that = (ColumnHeader) o;
+ return Objects.equals(pathName, that.pathName)
+ && Objects.equals(functionName, that.functionName)
+ && dataType == that.dataType;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(pathName, functionName, dataType);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
index 3fe488161c..9783d2ab2a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
@@ -74,8 +74,6 @@ public abstract class PlanNode {
public abstract int allowedChildCount();
- public abstract List<String> getOutputColumnNames();
-
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitPlan(this, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowDevicesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowDevicesNode.java
index c00130bf14..5c7c44261c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowDevicesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowDevicesNode.java
@@ -49,11 +49,6 @@ public class ShowDevicesNode extends ShowNode {
return NO_CHILD_ALLOWED;
}
- @Override
- public List<String> getOutputColumnNames() {
- return null;
- }
-
public static ShowDevicesNode deserialize(ByteBuffer byteBuffer) {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
index 492a1b892b..374279efad 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
@@ -135,11 +135,6 @@ public class AlterTimeSeriesNode extends PlanNode {
return NO_CHILD_ALLOWED;
}
- @Override
- public List<String> getOutputColumnNames() {
- return null;
- }
-
@Override
public void serialize(ByteBuffer byteBuffer) {
byteBuffer.putShort((short) PlanNodeType.ALTER_TIME_SERIES.ordinal());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AuthorNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AuthorNode.java
index 6ab7447804..7bb72a37fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AuthorNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AuthorNode.java
@@ -140,11 +140,6 @@ public class AuthorNode extends PlanNode {
return 0;
}
- @Override
- public List<String> getOutputColumnNames() {
- return null;
- }
-
@Override
public void serialize(ByteBuffer buffer) {
buffer.putInt(getPlanType(authorType));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
index 722c6575e7..98561ab0e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
@@ -158,11 +158,6 @@ public class CreateAlignedTimeSeriesNode extends PlanNode {
return NO_CHILD_ALLOWED;
}
- @Override
- public List<String> getOutputColumnNames() {
- return null;
- }
-
@Override
public void serialize(ByteBuffer byteBuffer) {
byteBuffer.putShort((short) PlanNodeType.CREATE_ALIGNED_TIME_SERIES.ordinal());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
index 5ba57ea192..db7f4f8c4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
@@ -156,11 +156,6 @@ public class CreateTimeSeriesNode extends PlanNode {
return NO_CHILD_ALLOWED;
}
- @Override
- public List<String> getOutputColumnNames() {
- return null;
- }
-
public static CreateTimeSeriesNode deserialize(ByteBuffer byteBuffer) {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
index 6105fbc2cb..b7d2502b35 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
@@ -18,55 +18,77 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
+import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import com.google.common.collect.ImmutableList;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
/**
* This node is used to aggregate required series from multiple sources. The source data will be
* input as a TsBlock, it may be raw data or partial aggregation result. This node will output the
* final series aggregated result represented by TsBlock.
*/
-public class AggregateNode extends ProcessNode {
- // The parameter of `group by time`
- // Its value will be null if there is no `group by time` clause,
- private GroupByTimeParameter groupByTimeParameter;
+public class AggregateNode extends ProcessNode implements IOutputPlanNode {
+
+ // The map from columns to corresponding aggregation functions on that column.
+ // KEY: The index of a column in the input {@link TsBlock}.
+ // VALUE: Aggregation functions on this column.
+ // (Currently, we only support one series in the aggregation function.)
+ private final Map<PartialPath, Set<AggregationType>> aggregateFuncMap;
+
+ // The parameter of `group by time`.
+ // Its value will be null if there is no `group by time` clause.
+ private final GroupByTimeParameter groupByTimeParameter;
- // The list of aggregation functions, each FunctionExpression will be output as one column of
- // result TsBlock
- // (Currently we only support one series in the aggregation function)
- // TODO: need consider whether it is suitable the aggregation function using FunctionExpression
- private Map<String, FunctionExpression> aggregateFuncMap;
+ private final List<ColumnHeader> columnHeaders = new ArrayList<>();
- private final List<PlanNode> children;
- private final List<String> columnNames;
+ private PlanNode child;
public AggregateNode(
PlanNodeId id,
- Map<String, FunctionExpression> aggregateFuncMap,
- List<PlanNode> children,
- List<String> columnNames) {
+ PlanNode child,
+ Map<PartialPath, Set<AggregationType>> aggregateFuncMap,
+ GroupByTimeParameter groupByTimeParameter) {
super(id);
+ this.child = child;
this.aggregateFuncMap = aggregateFuncMap;
- this.children = children;
- this.columnNames = columnNames;
+ this.groupByTimeParameter = groupByTimeParameter;
+ for (Map.Entry<PartialPath, Set<AggregationType>> entry : aggregateFuncMap.entrySet()) {
+ PartialPath path = entry.getKey();
+ columnHeaders.addAll(
+ entry.getValue().stream()
+ .map(
+ functionName ->
+ new ColumnHeader(
+ path.getFullPath(), functionName.name(), path.getSeriesType()))
+ .collect(Collectors.toList()));
+ }
}
@Override
public List<PlanNode> getChildren() {
- return children;
+ return ImmutableList.of(child);
}
@Override
public void addChild(PlanNode child) {
- throw new NotImplementedException("addChild of AggregateNode is not implemented");
+ this.child = child;
}
@Override
@@ -79,9 +101,19 @@ public class AggregateNode extends ProcessNode {
return CHILD_COUNT_NO_LIMIT;
}
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return columnHeaders;
+ }
+
@Override
public List<String> getOutputColumnNames() {
- return columnNames;
+ return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
+ }
+
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
}
@Override
@@ -95,4 +127,25 @@ public class AggregateNode extends ProcessNode {
@Override
public void serialize(ByteBuffer byteBuffer) {}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ AggregateNode that = (AggregateNode) o;
+ return Objects.equals(groupByTimeParameter, that.groupByTimeParameter)
+ && Objects.equals(aggregateFuncMap, that.aggregateFuncMap)
+ && Objects.equals(child, that.child);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(groupByTimeParameter, aggregateFuncMap, child);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
index c30f48e3ae..9b21f00198 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
@@ -19,17 +19,19 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.stream.Collectors;
/**
* DeviceMergeOperator is responsible for constructing a device-based view of a set of series. And
@@ -40,7 +42,8 @@ import java.util.Map;
* same between these TsBlocks. If the input TsBlock contains n columns, the device-based view will
* contain n+1 columns where the new column is Device column.
*/
-public class DeviceMergeNode extends ProcessNode {
+public class DeviceMergeNode extends ProcessNode implements IOutputPlanNode {
+
// The result output order that this operator
private OrderBy mergeOrder;
@@ -48,15 +51,15 @@ public class DeviceMergeNode extends ProcessNode {
// The without policy is able to be push down to the DeviceMergeNode because we can know whether a
// row contains
// null or not.
- private FilterNullPolicy filterNullPolicy;
+ private FilterNullComponent filterNullComponent;
// The map from deviceName to corresponding query result node responsible for that device.
// DeviceNode means the node whose output TsBlock contains the data belonged to one device.
- private Map<String, PlanNode> childDeviceNodeMap;
+ private Map<String, PlanNode> childDeviceNodeMap = new HashMap<>();
private List<PlanNode> children;
- private List<String> columnNames;
+ private final List<ColumnHeader> columnHeaders = new ArrayList<>();;
public DeviceMergeNode(PlanNodeId id) {
super(id);
@@ -88,9 +91,35 @@ public class DeviceMergeNode extends ProcessNode {
return CHILD_COUNT_NO_LIMIT;
}
+ public void addChildDeviceNode(String deviceName, PlanNode childNode) {
+ this.childDeviceNodeMap.put(deviceName, childNode);
+ this.children.add(childNode);
+ updateColumnHeaders(childNode);
+ }
+
+ private void updateColumnHeaders(PlanNode childNode) {
+ List<ColumnHeader> childColumnHeaders = ((IOutputPlanNode) childNode).getOutputColumnHeaders();
+ for (ColumnHeader columnHeader : childColumnHeaders) {
+ ColumnHeader tmpColumnHeader = columnHeader.replacePathWithMeasurement();
+ if (!columnHeaders.contains(tmpColumnHeader)) {
+ columnHeaders.add(tmpColumnHeader);
+ }
+ }
+ }
+
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return columnHeaders;
+ }
+
@Override
public List<String> getOutputColumnNames() {
- return columnNames;
+ return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
+ }
+
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
}
public OrderBy getMergeOrder() {
@@ -109,17 +138,6 @@ public class DeviceMergeNode extends ProcessNode {
@Override
public void serialize(ByteBuffer byteBuffer) {}
- public DeviceMergeNode(PlanNodeId id, Map<String, PlanNode> deviceNodeMap) {
- this(id);
- this.childDeviceNodeMap = deviceNodeMap;
- this.children.addAll(deviceNodeMap.values());
- }
-
- public void addChildDeviceNode(String deviceName, PlanNode childNode) {
- this.childDeviceNodeMap.put(deviceName, childNode);
- this.children.add(childNode);
- }
-
@TestOnly
public Pair<String, List<String>> print() {
String title = String.format("[DeviceMergeNode (%s)]", this.getPlanNodeId());
@@ -131,4 +149,25 @@ public class DeviceMergeNode extends ProcessNode {
public void setChildren(List<PlanNode> children) {
this.children = children;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ DeviceMergeNode that = (DeviceMergeNode) o;
+ return mergeOrder == that.mergeOrder
+ && filterNullComponent == that.filterNullComponent
+ && Objects.equals(childDeviceNodeMap, that.childDeviceNodeMap);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(mergeOrder, filterNullComponent, childDeviceNodeMap);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index 21a3374644..42a8f5880a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -82,11 +82,6 @@ public class ExchangeNode extends PlanNode {
this.upstreamPlanNodeId = nodeId;
}
- @Override
- public List<String> getOutputColumnNames() {
- return null;
- }
-
public static ExchangeNode deserialize(ByteBuffer byteBuffer) {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
index 984fb047e2..147f817f4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
@@ -19,10 +19,13 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
import com.google.common.collect.ImmutableList;
@@ -32,7 +35,7 @@ import java.util.ArrayList;
import java.util.List;
/** FillNode is used to fill the empty field in one row. */
-public class FillNode extends ProcessNode {
+public class FillNode extends ProcessNode implements IOutputPlanNode {
private PlanNode child;
@@ -68,9 +71,19 @@ public class FillNode extends ProcessNode {
return ONE_CHILD;
}
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return ((IOutputPlanNode) child).getOutputColumnHeaders();
+ }
+
@Override
public List<String> getOutputColumnNames() {
- return child.getOutputColumnNames();
+ return ((IOutputPlanNode) child).getOutputColumnNames();
+ }
+
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return ((IOutputPlanNode) child).getOutputColumnTypes();
}
public FillPolicy getFillPolicy() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
index 3347eead1b..46aff629dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
@@ -19,10 +19,13 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
+import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.utils.Pair;
import com.google.common.collect.ImmutableList;
@@ -30,22 +33,32 @@ import com.google.common.collect.ImmutableList;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
/** The FilterNode is responsible to filter the RowRecord from TsBlock. */
-public class FilterNode extends ProcessNode {
+public class FilterNode extends ProcessNode implements IOutputPlanNode {
private PlanNode child;
- private final QueryFilter predicate;
+ private final IExpression predicate;
- public FilterNode(PlanNodeId id, QueryFilter predicate) {
+ private List<ColumnHeader> columnHeaders;
+
+ public FilterNode(PlanNodeId id, IExpression predicate) {
super(id);
this.predicate = predicate;
}
- public FilterNode(PlanNodeId id, PlanNode child, QueryFilter predicate) {
+ public FilterNode(
+ PlanNodeId id, PlanNode child, IExpression predicate, List<String> outputColumnNames) {
this(id, predicate);
this.child = child;
+ this.columnHeaders =
+ ((IOutputPlanNode) child)
+ .getOutputColumnHeaders().stream()
+ .filter(columnHeader -> outputColumnNames.contains(columnHeader.getColumnName()))
+ .collect(Collectors.toList());
}
@Override
@@ -68,9 +81,19 @@ public class FilterNode extends ProcessNode {
return ONE_CHILD;
}
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return columnHeaders;
+ }
+
@Override
public List<String> getOutputColumnNames() {
- return child.getOutputColumnNames();
+ return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
+ }
+
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
}
@Override
@@ -85,7 +108,7 @@ public class FilterNode extends ProcessNode {
@Override
public void serialize(ByteBuffer byteBuffer) {}
- public QueryFilter getPredicate() {
+ public IExpression getPredicate() {
return predicate;
}
@@ -98,6 +121,28 @@ public class FilterNode extends ProcessNode {
String title = String.format("[FilterNode (%s)]", this.getPlanNodeId());
List<String> attributes = new ArrayList<>();
attributes.add("QueryFilter: " + this.getPredicate());
+ attributes.add("outputColumnNames: " + this.getOutputColumnNames());
return new Pair<>(title, attributes);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ FilterNode that = (FilterNode) o;
+ return Objects.equals(child, that.child)
+ && Objects.equals(predicate, that.predicate)
+ && Objects.equals(columnHeaders, that.columnHeaders);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(child, predicate, columnHeaders);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
index ef1a705b24..a2fb971b0d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
@@ -19,10 +19,13 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
import com.google.common.collect.ImmutableList;
@@ -30,25 +33,23 @@ import com.google.common.collect.ImmutableList;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
/** WithoutNode is used to discard specific rows from upstream node. */
-public class FilterNullNode extends ProcessNode {
+public class FilterNullNode extends ProcessNode implements IOutputPlanNode {
// The policy to discard the result from upstream operator
- private FilterNullPolicy discardPolicy;
+ private final FilterNullPolicy discardPolicy;
- private PlanNode child;
-
- private List<String> filterNullColumnNames;
+ private final List<String> filterNullColumnNames;
- public FilterNullNode(PlanNodeId id, PlanNode child) {
- super(id);
- this.child = child;
- }
+ private PlanNode child;
- public FilterNullNode(PlanNodeId id, FilterNullPolicy policy) {
+ public FilterNullNode(
+ PlanNodeId id, FilterNullPolicy policy, List<String> filterNullColumnNames) {
super(id);
this.discardPolicy = policy;
+ this.filterNullColumnNames = filterNullColumnNames;
}
public FilterNullNode(
@@ -56,9 +57,8 @@ public class FilterNullNode extends ProcessNode {
PlanNode child,
FilterNullPolicy discardPolicy,
List<String> filterNullColumnNames) {
- this(id, discardPolicy);
+ this(id, discardPolicy, filterNullColumnNames);
this.child = child;
- this.filterNullColumnNames = filterNullColumnNames;
}
@Override
@@ -73,7 +73,7 @@ public class FilterNullNode extends ProcessNode {
@Override
public PlanNode clone() {
- return new FilterNullNode(getPlanNodeId(), discardPolicy);
+ return new FilterNullNode(getPlanNodeId(), discardPolicy, filterNullColumnNames);
}
@Override
@@ -81,9 +81,19 @@ public class FilterNullNode extends ProcessNode {
return ONE_CHILD;
}
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return ((IOutputPlanNode) child).getOutputColumnHeaders();
+ }
+
@Override
public List<String> getOutputColumnNames() {
- return child.getOutputColumnNames();
+ return ((IOutputPlanNode) child).getOutputColumnNames();
+ }
+
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return ((IOutputPlanNode) child).getOutputColumnTypes();
}
public FilterNullPolicy getDiscardPolicy() {
@@ -106,10 +116,6 @@ public class FilterNullNode extends ProcessNode {
@Override
public void serialize(ByteBuffer byteBuffer) {}
- public void setFilterNullColumnNames(List<String> filterNullColumnNames) {
- this.filterNullColumnNames = filterNullColumnNames;
- }
-
@TestOnly
public Pair<String, List<String>> print() {
String title = String.format("[FilterNullNode (%s)]", this.getPlanNodeId());
@@ -118,4 +124,25 @@ public class FilterNullNode extends ProcessNode {
attributes.add("FilterNullColumnNames: " + this.getFilterNullColumnNames());
return new Pair<>(title, attributes);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ FilterNullNode that = (FilterNullNode) o;
+ return discardPolicy == that.discardPolicy
+ && Objects.equals(child, that.child)
+ && Objects.equals(filterNullColumnNames, that.filterNullColumnNames);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(discardPolicy, child, filterNullColumnNames);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
index a194ea05db..a8f41cfb71 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
@@ -19,10 +19,13 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
import java.nio.ByteBuffer;
@@ -30,34 +33,43 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
/**
* This node is responsible for the final aggregation merge operation. It will process the data from
* TsBlock row by row. For one row, it will rollup the fields which have the same aggregate function
* and belong to one bucket. Here, that two columns belong to one bucket means the partial paths of
- * device after rolling up in specific level are the same. For example, let's say there are two
- * columns `root.sg.d1.s1` and `root.sg.d2.s1`. If the group by level parameter is [0, 1], then
- * these two columns will belong to one bucket and the bucket name is `root.sg.*.s1`. If the group
- * by level parameter is [0, 2], then these two columns will not belong to one bucket. And the total
- * buckets are `root.*.d1.s1` and `root.*.d2.s1`
+ * device after rolling up in specific level are the same.
+ *
+ * <p>For example, let's say there are two columns `root.sg.d1.s1` and `root.sg.d2.s1`.
+ *
+ * <p>If the group by level parameter is [0, 1], then these two columns will belong to one bucket
+ * and the bucket name is `root.sg.*.s1`.
+ *
+ * <p>If the group by level parameter is [0, 2], then these two columns will not belong to one
+ * bucket. And the total buckets are `root.*.d1.s1` and `root.*.d2.s1`
*/
-public class GroupByLevelNode extends ProcessNode {
+public class GroupByLevelNode extends ProcessNode implements IOutputPlanNode {
- private PlanNode child;
+ private final int[] groupByLevels;
- private int[] groupByLevels;
+ private final Map<ColumnHeader, ColumnHeader> groupedPathMap;
- private List<String> columnNames;
+ private final PlanNode child;
- private Map<String, String> groupedPathMap;
+ private final List<ColumnHeader> columnHeaders;
public GroupByLevelNode(
- PlanNodeId id, PlanNode child, int[] groupByLevels, Map<String, String> groupedPathMap) {
+ PlanNodeId id,
+ PlanNode child,
+ int[] groupByLevels,
+ Map<ColumnHeader, ColumnHeader> groupedPathMap) {
super(id);
this.child = child;
this.groupByLevels = groupByLevels;
this.groupedPathMap = groupedPathMap;
- this.columnNames = new ArrayList<>(groupedPathMap.values());
+ this.columnHeaders = groupedPathMap.values().stream().distinct().collect(Collectors.toList());
}
@Override
@@ -80,9 +92,23 @@ public class GroupByLevelNode extends ProcessNode {
return CHILD_COUNT_NO_LIMIT;
}
+ public int[] getGroupByLevels() {
+ return groupByLevels;
+ }
+
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return columnHeaders;
+ }
+
@Override
public List<String> getOutputColumnNames() {
- return columnNames;
+ return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
+ }
+
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
}
@Override
@@ -97,22 +123,6 @@ public class GroupByLevelNode extends ProcessNode {
@Override
public void serialize(ByteBuffer byteBuffer) {}
- public int[] getGroupByLevels() {
- return groupByLevels;
- }
-
- public void setGroupByLevels(int[] groupByLevels) {
- this.groupByLevels = groupByLevels;
- }
-
- public List<String> getColumnNames() {
- return columnNames;
- }
-
- public void setColumnNames(List<String> columnNames) {
- this.columnNames = columnNames;
- }
-
@TestOnly
public Pair<String, List<String>> print() {
String title = String.format("[GroupByLevelNode (%s)]", this.getPlanNodeId());
@@ -121,4 +131,27 @@ public class GroupByLevelNode extends ProcessNode {
attributes.add("ColumnNames: " + this.getOutputColumnNames());
return new Pair<>(title, attributes);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ GroupByLevelNode that = (GroupByLevelNode) o;
+ return Objects.equals(child, that.child)
+ && Arrays.equals(groupByLevels, that.groupByLevels)
+ && Objects.equals(groupedPathMap, that.groupedPathMap);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(child, groupedPathMap);
+ result = 31 * result + Arrays.hashCode(groupByLevels);
+ return result;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
index b0c6a4e93f..0a6f971126 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
@@ -19,9 +19,12 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
import com.google.common.collect.ImmutableList;
@@ -29,12 +32,13 @@ import com.google.common.collect.ImmutableList;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
/** LimitNode is used to select top n result. It uses the default order of upstream nodes */
-public class LimitNode extends ProcessNode {
+public class LimitNode extends ProcessNode implements IOutputPlanNode {
// The limit count
- private int limit;
+ private final int limit;
private PlanNode child;
public LimitNode(PlanNodeId id, int limit) {
@@ -42,7 +46,7 @@ public class LimitNode extends ProcessNode {
this.limit = limit;
}
- public LimitNode(PlanNodeId id, int limit, PlanNode child) {
+ public LimitNode(PlanNodeId id, PlanNode child, int limit) {
this(id, limit);
this.child = child;
}
@@ -67,9 +71,19 @@ public class LimitNode extends ProcessNode {
return ONE_CHILD;
}
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return ((IOutputPlanNode) child).getOutputColumnHeaders();
+ }
+
@Override
public List<String> getOutputColumnNames() {
- return child.getOutputColumnNames();
+ return ((IOutputPlanNode) child).getOutputColumnNames();
+ }
+
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return ((IOutputPlanNode) child).getOutputColumnTypes();
}
@Override
@@ -107,4 +121,23 @@ public class LimitNode extends ProcessNode {
attributes.add("RowLimit: " + this.getLimit());
return new Pair<>(title, attributes);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ LimitNode that = (LimitNode) o;
+ return limit == that.limit && Objects.equals(child, that.child);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(limit, child);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
index f614a2cd3a..da0e0019f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
@@ -19,20 +19,24 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
/**
* OffsetNode is used to skip top n result from upstream nodes. It uses the default order of
* upstream nodes
*/
-public class OffsetNode extends ProcessNode {
+public class OffsetNode extends ProcessNode implements IOutputPlanNode {
// The limit count
private PlanNode child;
@@ -68,9 +72,19 @@ public class OffsetNode extends ProcessNode {
return ONE_CHILD;
}
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return ((IOutputPlanNode) child).getOutputColumnHeaders();
+ }
+
@Override
public List<String> getOutputColumnNames() {
- return null;
+ return ((IOutputPlanNode) child).getOutputColumnNames();
+ }
+
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return ((IOutputPlanNode) child).getOutputColumnTypes();
}
@Override
@@ -100,4 +114,23 @@ public class OffsetNode extends ProcessNode {
attributes.add("RowOffset: " + this.getOffset());
return new Pair<>(title, attributes);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ OffsetNode that = (OffsetNode) o;
+ return offset == that.offset && Objects.equals(child, that.child);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(child, offset);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
index 4744d1f938..33dfbf70d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
@@ -19,10 +19,13 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
import com.google.common.collect.ImmutableList;
@@ -35,7 +38,7 @@ import java.util.List;
* In general, the parameter in sortNode should be pushed down to the upstream operators. In our
* optimized logical query plan, the sortNode should not appear.
*/
-public class SortNode extends ProcessNode {
+public class SortNode extends ProcessNode implements IOutputPlanNode {
private PlanNode child;
@@ -74,9 +77,19 @@ public class SortNode extends ProcessNode {
return ONE_CHILD;
}
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return ((IOutputPlanNode) child).getOutputColumnHeaders();
+ }
+
@Override
public List<String> getOutputColumnNames() {
- return child.getOutputColumnNames();
+ return ((IOutputPlanNode) child).getOutputColumnNames();
+ }
+
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return ((IOutputPlanNode) child).getOutputColumnTypes();
}
public OrderBy getSortOrder() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
index be36edb3e3..9afafe41a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
@@ -30,18 +32,19 @@ import org.apache.iotdb.tsfile.utils.Pair;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
/**
- * TimeJoinOperator is responsible for join two or more TsBlock. The join algorithm is like outer
- * join by timestamp column. It will join two or more TsBlock by Timestamp column. The output result
- * of TimeJoinOperator is sorted by timestamp
+ * This node is responsible for join two or more TsBlock. The join algorithm is like outer join by
+ * timestamp column. It will join two or more TsBlock by Timestamp column. The output result of
+ * TimeJoinOperator is sorted by timestamp
*/
// TODO: define the TimeJoinMergeNode for distributed plan
-public class TimeJoinNode extends ProcessNode {
+public class TimeJoinNode extends ProcessNode implements IOutputPlanNode {
// This parameter indicates the order when executing multiway merge sort.
- private OrderBy mergeOrder;
+ private final OrderBy mergeOrder;
// The policy to decide whether a row should be discarded
// The without policy is able to be push down to the TimeJoinOperator because we can know whether
@@ -51,23 +54,18 @@ public class TimeJoinNode extends ProcessNode {
private List<PlanNode> children;
- // output columns' data type
- private List<TSDataType> types;
+ private final List<ColumnHeader> columnHeaders = new ArrayList<>();
- public TimeJoinNode(PlanNodeId id, OrderBy mergeOrder, FilterNullPolicy filterNullPolicy) {
+ public TimeJoinNode(PlanNodeId id, OrderBy mergeOrder) {
super(id);
this.mergeOrder = mergeOrder;
- this.filterNullPolicy = filterNullPolicy;
this.children = new ArrayList<>();
}
- public TimeJoinNode(
- PlanNodeId id,
- OrderBy mergeOrder,
- FilterNullPolicy filterNullPolicy,
- List<PlanNode> children) {
- this(id, mergeOrder, filterNullPolicy);
+ public TimeJoinNode(PlanNodeId id, OrderBy mergeOrder, List<PlanNode> children) {
+ this(id, mergeOrder);
this.children = children;
+ initColumnHeaders();
}
@Override
@@ -77,7 +75,7 @@ public class TimeJoinNode extends ProcessNode {
@Override
public PlanNode clone() {
- return new TimeJoinNode(getPlanNodeId(), this.mergeOrder, this.filterNullPolicy);
+ return new TimeJoinNode(getPlanNodeId(), this.mergeOrder);
}
@Override
@@ -85,11 +83,24 @@ public class TimeJoinNode extends ProcessNode {
return CHILD_COUNT_NO_LIMIT;
}
+ private void initColumnHeaders() {
+ for (PlanNode child : children) {
+ columnHeaders.addAll(((IOutputPlanNode) child).getOutputColumnHeaders());
+ }
+ }
+
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return columnHeaders;
+ }
+
@Override
public List<String> getOutputColumnNames() {
- return children.stream()
- .flatMap(child -> child.getOutputColumnNames().stream())
- .collect(Collectors.toList());
+ return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
+ }
+
+ public List<TSDataType> getOutputColumnTypes() {
+ return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
}
@Override
@@ -121,10 +132,6 @@ public class TimeJoinNode extends ProcessNode {
return filterNullPolicy;
}
- public void setMergeOrder(OrderBy mergeOrder) {
- this.mergeOrder = mergeOrder;
- }
-
public void setWithoutPolicy(FilterNullPolicy filterNullPolicy) {
this.filterNullPolicy = filterNullPolicy;
}
@@ -133,10 +140,6 @@ public class TimeJoinNode extends ProcessNode {
return "TimeJoinNode-" + this.getPlanNodeId();
}
- public List<TSDataType> getTypes() {
- return types;
- }
-
@TestOnly
public Pair<String, List<String>> print() {
String title = String.format("[TimeJoinNode (%s)]", this.getPlanNodeId());
@@ -147,4 +150,25 @@ public class TimeJoinNode extends ProcessNode {
+ (this.getFilterNullPolicy() == null ? "null" : this.getFilterNullPolicy()));
return new Pair<>(title, attributes);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ TimeJoinNode that = (TimeJoinNode) o;
+ return mergeOrder == that.mergeOrder
+ && filterNullPolicy == that.filterNullPolicy
+ && Objects.equals(children, that.children);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(mergeOrder, filterNullPolicy, children);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index d8562d144f..59dfc486ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -73,11 +73,6 @@ public class FragmentSinkNode extends SinkNode {
return ONE_CHILD;
}
- @Override
- public List<String> getOutputColumnNames() {
- return null;
- }
-
public static FragmentSinkNode deserialize(ByteBuffer byteBuffer) {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index 2f5d88b91b..5350b10d17 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -20,12 +20,17 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -34,15 +39,17 @@ import com.google.common.collect.ImmutableList;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
/**
- * SeriesAggregateOperator is responsible to do the aggregation calculation for one series. It will
- * read the target series and calculate the aggregation result by the aggregation digest or raw data
- * of this series.
+ * This node is responsible to do the aggregation calculation for one series. It will read the
+ * target series and calculate the aggregation result by the aggregation digest or raw data of this
+ * series.
*
* <p>The aggregation result will be represented as a TsBlock
*
- * <p>This operator will split data of the target series into many groups by time range and do the
+ * <p>This node will split data of the target series into many groups by time range and do the
* aggregation calculation for each group. Each result will be one row of the result TsBlock. The
* timestamp of each row is the start time of the time range group.
*
@@ -50,26 +57,49 @@ import java.util.List;
* represent the whole aggregation result of this series. And the timestamp will be 0, which is
* meaningless.
*/
-public class SeriesAggregateScanNode extends SourceNode {
+public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNode {
- // The parameter of `group by time`
- // Its value will be null if there is no `group by time` clause,
- private GroupByTimeParameter groupByTimeParameter;
+ // The series path and aggregation functions on this series.
+ // (Currently, we only support one series in the aggregation function)
+ private final PartialPath seriesPath;
+ private final List<AggregationType> aggregateFuncList;
- // The aggregation function, which contains the function name and related series.
- // (Currently we only support one series in the aggregation function)
- // TODO: need consider whether it is suitable the aggregation function using FunctionExpression
- private FunctionExpression aggregateFunc;
+ // The order to traverse the data.
+ // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
+ // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
+ private final OrderBy scanOrder;
- private Filter filter;
+ private final Filter timeFilter;
- private String columnName;
+ // The parameter of `group by time`
+ // Its value will be null if there is no `group by time` clause,
+ private final GroupByTimeParameter groupByTimeParameter;
+
+ private List<ColumnHeader> columnHeaders;
// The id of DataRegion where the node will run
private RegionReplicaSet regionReplicaSet;
- public SeriesAggregateScanNode(PlanNodeId id) {
+ public SeriesAggregateScanNode(
+ PlanNodeId id,
+ PartialPath seriesPath,
+ List<AggregationType> aggregateFuncList,
+ OrderBy scanOrder,
+ Filter timeFilter,
+ GroupByTimeParameter groupByTimeParameter) {
super(id);
+ this.seriesPath = seriesPath;
+ this.aggregateFuncList = aggregateFuncList;
+ this.scanOrder = scanOrder;
+ this.timeFilter = timeFilter;
+ this.groupByTimeParameter = groupByTimeParameter;
+ this.columnHeaders =
+ aggregateFuncList.stream()
+ .map(
+ functionType ->
+ new ColumnHeader(
+ seriesPath.getFullPath(), functionType.name(), seriesPath.getSeriesType()))
+ .collect(Collectors.toList());
}
@Override
@@ -91,19 +121,18 @@ public class SeriesAggregateScanNode extends SourceNode {
}
@Override
- public List<String> getOutputColumnNames() {
- return ImmutableList.of(columnName);
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return columnHeaders;
}
- public SeriesAggregateScanNode(PlanNodeId id, FunctionExpression aggregateFunc) {
- this(id);
- this.aggregateFunc = aggregateFunc;
+ @Override
+ public List<String> getOutputColumnNames() {
+ return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
}
- public SeriesAggregateScanNode(
- PlanNodeId id, FunctionExpression aggregateFunc, GroupByTimeParameter groupByTimeParameter) {
- this(id, aggregateFunc);
- this.groupByTimeParameter = groupByTimeParameter;
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
}
@Override
@@ -119,16 +148,6 @@ public class SeriesAggregateScanNode extends SourceNode {
this.regionReplicaSet = regionReplicaSet;
}
- @Override
- public String getDeviceName() {
- return aggregateFunc.getPaths().get(0).getDevice();
- }
-
- @Override
- protected String getExpressionString() {
- return aggregateFunc.getExpressionString();
- }
-
@Override
public void close() throws Exception {}
@@ -144,18 +163,50 @@ public class SeriesAggregateScanNode extends SourceNode {
@Override
public void serialize(ByteBuffer byteBuffer) {}
- // This method is used when do the PredicatePushDown.
- // The filter is not put in the constructor because the filter is only clear in the predicate
- // push-down stage
- public void setFilter(Filter filter) {
- this.filter = filter;
+ public PartialPath getSeriesPath() {
+ return seriesPath;
+ }
+
+ public List<AggregationType> getAggregateFuncList() {
+ return aggregateFuncList;
}
@TestOnly
public Pair<String, List<String>> print() {
String title = String.format("[SeriesAggregateScanNode (%s)]", this.getPlanNodeId());
List<String> attributes = new ArrayList<>();
- attributes.add("AggregateFunction: " + this.getExpressionString());
+ attributes.add("AggregateFunctions: " + this.getAggregateFuncList().toString());
return new Pair<>(title, attributes);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ SeriesAggregateScanNode that = (SeriesAggregateScanNode) o;
+ return Objects.equals(groupByTimeParameter, that.groupByTimeParameter)
+ && Objects.equals(seriesPath, that.seriesPath)
+ && Objects.equals(
+ aggregateFuncList.stream().sorted().collect(Collectors.toList()),
+ that.aggregateFuncList.stream().sorted().collect(Collectors.toList()))
+ && scanOrder == that.scanOrder
+ && Objects.equals(timeFilter, that.timeFilter);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ super.hashCode(),
+ groupByTimeParameter,
+ seriesPath,
+ aggregateFuncList.stream().sorted().collect(Collectors.toList()),
+ scanOrder,
+ timeFilter);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index 37ba66c7fc..8cd5b3e5bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -21,10 +21,13 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -33,6 +36,7 @@ import com.google.common.collect.ImmutableList;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
/**
@@ -42,10 +46,10 @@ import java.util.Set;
*
* <p>Children type: no child is allowed for SeriesScanNode
*/
-public class SeriesScanNode extends SourceNode {
+public class SeriesScanNode extends SourceNode implements IOutputPlanNode {
// The path of the target series which will be scanned.
- private PartialPath seriesPath;
+ private final PartialPath seriesPath;
// all the sensors in seriesPath's device of current query
private Set<String> allSensors;
@@ -67,7 +71,7 @@ public class SeriesScanNode extends SourceNode {
// offset for result set. The default value is 0
private int offset;
- private String columnName;
+ private ColumnHeader columnHeader;
// The id of DataRegion where the node will run
private RegionReplicaSet regionReplicaSet;
@@ -77,6 +81,15 @@ public class SeriesScanNode extends SourceNode {
this.seriesPath = seriesPath;
}
+ public SeriesScanNode(
+ PlanNodeId id, PartialPath seriesPath, Set<String> allSensors, OrderBy scanOrder) {
+ super(id);
+ this.seriesPath = seriesPath;
+ this.allSensors = allSensors;
+ this.scanOrder = scanOrder;
+ this.columnHeader = new ColumnHeader(seriesPath.getFullPath(), seriesPath.getSeriesType());
+ }
+
public SeriesScanNode(PlanNodeId id, PartialPath seriesPath, RegionReplicaSet regionReplicaSet) {
this(id, seriesPath);
this.regionReplicaSet = regionReplicaSet;
@@ -105,16 +118,6 @@ public class SeriesScanNode extends SourceNode {
this.regionReplicaSet = dataRegion;
}
- @Override
- public String getDeviceName() {
- return seriesPath.getDevice();
- }
-
- @Override
- protected String getExpressionString() {
- return seriesPath.getFullPath();
- }
-
public int getLimit() {
return limit;
}
@@ -123,10 +126,6 @@ public class SeriesScanNode extends SourceNode {
return offset;
}
- public void setScanOrder(OrderBy scanOrder) {
- this.scanOrder = scanOrder;
- }
-
public void setLimit(int limit) {
this.limit = limit;
}
@@ -153,9 +152,19 @@ public class SeriesScanNode extends SourceNode {
return NO_CHILD_ALLOWED;
}
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return ImmutableList.of(columnHeader);
+ }
+
@Override
public List<String> getOutputColumnNames() {
- return ImmutableList.of(columnName);
+ return ImmutableList.of(columnHeader.getColumnName());
+ }
+
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return ImmutableList.of(columnHeader.getColumnType());
}
public Set<String> getAllSensors() {
@@ -204,4 +213,37 @@ public class SeriesScanNode extends SourceNode {
attributes.add("scanOrder: " + this.getScanOrder());
return new Pair<>(title, attributes);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ SeriesScanNode that = (SeriesScanNode) o;
+ return limit == that.limit
+ && offset == that.offset
+ && Objects.equals(seriesPath, that.seriesPath)
+ && Objects.equals(allSensors, that.allSensors)
+ && scanOrder == that.scanOrder
+ && Objects.equals(timeFilter, that.timeFilter)
+ && Objects.equals(valueFilter, that.valueFilter);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ super.hashCode(),
+ seriesPath,
+ allSensors,
+ scanOrder,
+ timeFilter,
+ valueFilter,
+ limit,
+ offset);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
index 5fdf152202..8c3f2f48d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
@@ -33,26 +33,4 @@ public abstract class SourceNode extends PlanNode implements AutoCloseable {
public abstract RegionReplicaSet getDataRegionReplicaSet();
public abstract void setDataRegionReplicaSet(RegionReplicaSet regionReplicaSet);
-
- public abstract String getDeviceName();
-
- protected abstract String getExpressionString();
-
- @Override
- public final int hashCode() {
- return getExpressionString().hashCode();
- }
-
- @Override
- public final boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (!(o instanceof SourceNode)) {
- return false;
- }
-
- return getExpressionString().equals(o.toString());
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
index f646cc9760..ab15bdd0da 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -150,11 +150,6 @@ public class InsertMultiTabletsNode extends InsertNode {
return NO_CHILD_ALLOWED;
}
- @Override
- public List<String> getOutputColumnNames() {
- return null;
- }
-
public static InsertMultiTabletsNode deserialize(ByteBuffer byteBuffer) {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
index a1bcb7e5fa..002523c407 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
@@ -84,11 +84,6 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
return NO_CHILD_ALLOWED;
}
- @Override
- public List<String> getOutputColumnNames() {
- return null;
- }
-
public static InsertRowNode deserialize(ByteBuffer byteBuffer) {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
index 92ce7684c3..68a39651f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
@@ -101,11 +101,6 @@ public class InsertRowsNode extends InsertNode {
return NO_CHILD_ALLOWED;
}
- @Override
- public List<String> getOutputColumnNames() {
- return null;
- }
-
public static InsertRowsNode deserialize(ByteBuffer byteBuffer) {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index e9adb763dd..c09de08cd4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -97,11 +97,6 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
return NO_CHILD_ALLOWED;
}
- @Override
- public List<String> getOutputColumnNames() {
- return null;
- }
-
public static InsertRowsOfOneDeviceNode deserialize(ByteBuffer byteBuffer) {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
index 6e5d4d1e3a..05ecc06367 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
@@ -135,11 +135,6 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
return NO_CHILD_ALLOWED;
}
- @Override
- public List<String> getOutputColumnNames() {
- return null;
- }
-
public static InsertTabletNode deserialize(ByteBuffer byteBuffer) {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/rewriter/ColumnPaginationController.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/rewriter/ColumnPaginationController.java
index f479f63747..74e2c07692 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/rewriter/ColumnPaginationController.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/rewriter/ColumnPaginationController.java
@@ -32,6 +32,9 @@ public class ColumnPaginationController {
// series offset for result set. The default value is 0
private final int seriesOffset;
+ // for ALIGN BY DEVICE / DISABLE ALIGN / GROUP BY LEVEL / LAST, controller does is disabled
+ private final boolean isDisabled;
+
private int curLimit =
IoTDBDescriptor.getInstance().getConfig().getMaxQueryDeduplicatedPathNum() + 1;
private int curOffset;
@@ -39,17 +42,26 @@ public class ColumnPaginationController {
// records the path number that the SchemaTree totally returned
private int consumed = 0;
- public ColumnPaginationController(int seriesLimit, int seriesOffset) {
+ public ColumnPaginationController(int seriesLimit, int seriesOffset, boolean isDisabled) {
// for series limit, the default value is 0, which means no limit
this.curLimit = seriesLimit == 0 ? this.curLimit : Math.min(seriesLimit, this.curLimit);
this.seriesOffset = this.curOffset = seriesOffset;
+ this.isDisabled = isDisabled;
}
public int getCurLimit() {
+ if (isDisabled) {
+ return 0;
+ }
+
return curLimit;
}
public int getCurOffset() {
+ if (isDisabled) {
+ return 0;
+ }
+
return curOffset;
}
@@ -60,11 +72,19 @@ public class ColumnPaginationController {
> IoTDBDescriptor.getInstance().getConfig().getMaxQueryDeduplicatedPathNum()) {
throw new PathNumOverLimitException();
}
+ if (isDisabled) {
+ return false;
+ }
+
return curLimit == 0;
}
public void checkIfSoffsetIsExceeded(List<ResultColumn> resultColumns)
throws StatementAnalyzeException {
+ if (isDisabled) {
+ return;
+ }
+
if (consumed == 0 ? seriesOffset != 0 : resultColumns.isEmpty()) {
throw new StatementAnalyzeException(
String.format(
@@ -74,28 +94,52 @@ public class ColumnPaginationController {
}
public void consume(int limit, int offset) {
+ if (isDisabled) {
+ return;
+ }
+
consumed += offset;
curOffset -= Math.min(curOffset, offset);
curLimit -= limit;
}
public boolean hasCurOffset() {
+ if (isDisabled) {
+ return false;
+ }
+
return curOffset != 0;
}
public boolean hasCurLimit() {
+ if (isDisabled) {
+ return true;
+ }
+
return curLimit != 0;
}
public void decCurOffset() {
+ if (isDisabled) {
+ return;
+ }
+
curOffset--;
}
public void decCurLimit() {
+ if (isDisabled) {
+ return;
+ }
+
curLimit--;
}
public void incConsumed(int num) {
+ if (isDisabled) {
+ return;
+ }
+
consumed += num;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/rewriter/WildcardsRemover.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/rewriter/WildcardsRemover.java
index 6753a3ede2..f6c7fb327a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/rewriter/WildcardsRemover.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/rewriter/WildcardsRemover.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.mpp.sql.statement.component.GroupByLevelController;
import org.apache.iotdb.db.mpp.sql.statement.component.ResultColumn;
import org.apache.iotdb.db.mpp.sql.statement.component.WhereCondition;
import org.apache.iotdb.db.mpp.sql.statement.crud.AggregationQueryStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.LastQueryStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.query.expression.Expression;
@@ -60,20 +61,23 @@ public class WildcardsRemover {
*/
private boolean isPrefixMatch;
- public Statement rewrite(
- Statement statement, SchemaTree schemaTree, Map<String, Set<PartialPath>> deviceIdToPathsMap)
+ public Statement rewrite(Statement statement, SchemaTree schemaTree)
throws StatementAnalyzeException, PathNumOverLimitException {
QueryStatement queryStatement = (QueryStatement) statement;
this.paginationController =
new ColumnPaginationController(
- queryStatement.getSeriesLimit(), queryStatement.getSeriesOffset());
+ queryStatement.getSeriesLimit(),
+ queryStatement.getSeriesOffset(),
+ queryStatement.isAlignByDevice()
+ || queryStatement.disableAlign()
+ || queryStatement instanceof LastQueryStatement
+ || queryStatement.isGroupByLevel());
this.schemaTree = schemaTree;
this.isPrefixMatch = queryStatement.isPrefixMatchPath();
if (queryStatement.getIndexType() == null) {
// remove wildcards in SELECT clause
removeWildcardsInSelectPaths(queryStatement);
- deviceIdToPathsMap.putAll(queryStatement.getSelectComponent().getDeviceIdToPathsMap());
// remove wildcards in WITHOUT NULL clause
if (queryStatement.getFilterNullComponent() != null
@@ -84,7 +88,7 @@ public class WildcardsRemover {
// remove wildcards in WHERE clause
if (queryStatement.getWhereCondition() != null) {
- removeWildcardsInQueryFilter(queryStatement, deviceIdToPathsMap);
+ removeWildcardsInQueryFilter(queryStatement);
}
return queryStatement;
@@ -170,8 +174,7 @@ public class WildcardsRemover {
queryStatement.getFilterNullComponent().setWithoutNullColumns(resultExpressions);
}
- private void removeWildcardsInQueryFilter(
- QueryStatement queryStatement, Map<String, Set<PartialPath>> deviceIdToPathsMap)
+ private void removeWildcardsInQueryFilter(QueryStatement queryStatement)
throws StatementAnalyzeException {
WhereCondition whereCondition = queryStatement.getWhereCondition();
List<PartialPath> fromPaths = queryStatement.getFromComponent().getPrefixPaths();
@@ -180,10 +183,6 @@ public class WildcardsRemover {
whereCondition.setQueryFilter(
removeWildcardsInQueryFilter(whereCondition.getQueryFilter(), fromPaths, resultPaths));
whereCondition.getQueryFilter().setPathSet(resultPaths);
-
- for (PartialPath path : resultPaths) {
- deviceIdToPathsMap.computeIfAbsent(path.getDevice(), k -> new HashSet<>()).add(path);
- }
}
private QueryFilter removeWildcardsInQueryFilter(
@@ -279,6 +278,7 @@ public class WildcardsRemover {
paginationController.consume(pair.left.size(), pair.right);
return pair.left;
} catch (Exception e) {
+ e.printStackTrace();
throw new StatementAnalyzeException(
"error occurred when removing wildcard: " + e.getMessage());
}
@@ -292,7 +292,7 @@ public class WildcardsRemover {
List<List<Expression>> extendedExpressions = new ArrayList<>();
for (Expression originExpression : expressions) {
List<Expression> actualExpressions = new ArrayList<>();
- originExpression.removeWildcards(new WildcardsRemover(), actualExpressions);
+ originExpression.removeWildcards(this, actualExpressions);
if (actualExpressions.isEmpty()) {
// Let's ignore the eval of the function which has at least one non-existence series as
// input. See IOTDB-1212: https://github.com/apache/iotdb/pull/3101
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
index 69a52ef24d..77c796ce0a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
@@ -73,6 +73,34 @@ public abstract class StatementVisitor<R, C> {
return visitStatement(queryStatement, context);
}
+ public R visitAggregationQuery(AggregationQueryStatement queryStatement, C context) {
+ return visitQuery(queryStatement, context);
+ }
+
+ public R visitFillQuery(FillQueryStatement queryStatement, C context) {
+ return visitQuery(queryStatement, context);
+ }
+
+ public R visitGroupByQuery(GroupByQueryStatement queryStatement, C context) {
+ return visitQuery(queryStatement, context);
+ }
+
+ public R visitGroupByFillQuery(GroupByFillQueryStatement queryStatement, C context) {
+ return visitQuery(queryStatement, context);
+ }
+
+ public R visitLastQuery(LastQueryStatement queryStatement, C context) {
+ return visitQuery(queryStatement, context);
+ }
+
+ public R visitUDTFQuery(UDTFQueryStatement queryStatement, C context) {
+ return visitQuery(queryStatement, context);
+ }
+
+ public R visitUDAFQuery(UDAFQueryStatement queryStatement, C context) {
+ return visitQuery(queryStatement, context);
+ }
+
// Insert Statement
public R visitInsert(InsertStatement insertStatement, C context) {
return visitStatement(insertStatement, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByLevelComponent.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByLevelComponent.java
index 5b91a7bba4..6d02efdee4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByLevelComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByLevelComponent.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.sql.statement.component;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.statement.StatementNode;
import java.util.Map;
@@ -48,4 +49,8 @@ public class GroupByLevelComponent extends StatementNode {
public Map<String, String> getGroupedPathMap() {
return groupByLevelController.getGroupedPathMap();
}
+
+ public Map<ColumnHeader, ColumnHeader> getGroupedHeaderMap() {
+ return groupByLevelController.getGroupedHeaderMap();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByLevelController.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByLevelController.java
index f00b67dab9..aec0d007ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByLevelController.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByLevelController.java
@@ -22,11 +22,13 @@ package org.apache.iotdb.db.mpp.sql.statement.component;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.statement.crud.AggregationQueryStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.util.*;
@@ -49,6 +51,8 @@ public class GroupByLevelController {
int prevSize = 0;
/** count(root.sg.d1.s1) with level = 1 -> count(root.*.d1.s1) */
private final Map<String, String> groupedPathMap;
+
+ private final Map<ColumnHeader, ColumnHeader> groupedHeaderMap;
/** count(root.*.d1.s1) -> alias */
private Map<String, String> columnToAliasMap;
/**
@@ -63,6 +67,7 @@ public class GroupByLevelController {
this.limitPaths = seriesLimit > 0 ? new HashSet<>() : null;
this.offsetPaths = seriesOffset > 0 ? new HashSet<>() : null;
this.groupedPathMap = new LinkedHashMap<>();
+ this.groupedHeaderMap = new LinkedHashMap<>();
this.levels =
((AggregationQueryStatement) queryStatement).getGroupByLevelComponent().getLevels();
}
@@ -103,11 +108,17 @@ public class GroupByLevelController {
boolean isCountStar = countWildcardIterIndices.contains(idx++);
String groupedPath =
generatePartialPathByLevel(isCountStar, paths.get(0).getNodes(), levels);
+ TSDataType dataType = paths.get(0).getSeriesType();
String rawPath = String.format("%s(%s)", functionName, paths.get(0).getFullPath());
String pathWithFunction = String.format("%s(%s)", functionName, groupedPath);
+ ColumnHeader rawPathHeader =
+ new ColumnHeader(paths.get(0).getFullPath(), functionName, dataType);
+ ColumnHeader groupedPathHeader = new ColumnHeader(groupedPath, functionName, dataType);
+
if (seriesLimit == 0 && seriesOffset == 0) {
groupedPathMap.put(rawPath, pathWithFunction);
+ groupedHeaderMap.put(rawPathHeader, groupedPathHeader);
checkAliasAndUpdateAliasMap(rawColumn, pathWithFunction);
} else {
// We cannot judge whether the path after grouping exists until we add it to set
@@ -126,6 +137,7 @@ public class GroupByLevelController {
limitPaths.remove(pathWithFunction);
} else {
groupedPathMap.put(rawPath, pathWithFunction);
+ groupedHeaderMap.put(rawPathHeader, groupedPathHeader);
checkAliasAndUpdateAliasMap(rawColumn, pathWithFunction);
}
} else {
@@ -222,4 +234,8 @@ public class GroupByLevelController {
public Map<String, String> getGroupedPathMap() {
return groupedPathMap;
}
+
+ public Map<ColumnHeader, ColumnHeader> getGroupedHeaderMap() {
+ return groupedHeaderMap;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/SelectComponent.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/SelectComponent.java
index 5662881af4..b81b3244e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/SelectComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/SelectComponent.java
@@ -24,9 +24,15 @@ import org.apache.iotdb.db.mpp.sql.statement.StatementNode;
import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
+import org.apache.iotdb.tsfile.read.common.Path;
import java.time.ZoneId;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
/** This class maintains information of {@code SELECT} clause. */
public class SelectComponent extends StatementNode {
@@ -148,4 +154,12 @@ public class SelectComponent extends StatementNode {
}
return deviceIdToPathsCache;
}
+
+ public List<Path> getDeduplicatedPaths() {
+ Set<Path> deduplicatedPaths = new HashSet<>();
+ for (ResultColumn resultColumn : resultColumns) {
+ deduplicatedPaths.addAll(resultColumn.collectPaths());
+ }
+ return new ArrayList<>(deduplicatedPaths);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/AggregationQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/AggregationQueryStatement.java
index 6276f2f31f..1532c07a46 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/AggregationQueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/AggregationQueryStatement.java
@@ -20,13 +20,21 @@
package org.apache.iotdb.db.mpp.sql.statement.crud;
import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.sql.statement.component.GroupByLevelComponent;
import org.apache.iotdb.db.mpp.sql.statement.component.ResultColumn;
import org.apache.iotdb.db.mpp.sql.statement.component.SelectComponent;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
public class AggregationQueryStatement extends QueryStatement {
// GROUP BY LEVEL clause
@@ -53,11 +61,26 @@ public class AggregationQueryStatement extends QueryStatement {
return groupByLevelComponent != null && groupByLevelComponent.getLevels().length > 0;
}
+ public Map<String, Map<PartialPath, Set<AggregationType>>> getDeviceNameToAggregationsMap() {
+ Map<String, Map<PartialPath, Set<AggregationType>>> deviceNameToAggregationsMap =
+ new HashMap<>();
+ for (ResultColumn resultColumn : getSelectComponent().getResultColumns()) {
+ FunctionExpression expression = (FunctionExpression) resultColumn.getExpression();
+ PartialPath path = expression.getPaths().get(0);
+ String functionName = expression.getFunctionName();
+ deviceNameToAggregationsMap
+ .computeIfAbsent(path.getDevice(), key -> new HashMap<>())
+ .computeIfAbsent(path, key -> new HashSet<>())
+ .add(AggregationType.valueOf(functionName.toUpperCase()));
+ }
+ return deviceNameToAggregationsMap;
+ }
+
@Override
public void selfCheck() {
super.selfCheck();
- if (!DisableAlign()) {
+ if (disableAlign()) {
throw new SemanticException("AGGREGATION doesn't support disable align clause.");
}
checkSelectComponent(selectComponent);
@@ -87,4 +110,8 @@ public class AggregationQueryStatement extends QueryStatement {
}
}
}
+
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitAggregationQuery(this, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/FillQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/FillQueryStatement.java
index 8e693d174f..da72b8e1d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/FillQueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/FillQueryStatement.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.sql.statement.crud;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
import org.apache.iotdb.db.mpp.sql.constant.FilterConstant;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.sql.statement.component.FillComponent;
import org.apache.iotdb.db.qp.constant.SQLConstant;
@@ -47,7 +48,7 @@ public class FillQueryStatement extends QueryStatement {
public void selfCheck() {
super.selfCheck();
- if (DisableAlign()) {
+ if (disableAlign()) {
throw new SemanticException("FILL doesn't support disable align clause.");
}
@@ -70,4 +71,8 @@ public class FillQueryStatement extends QueryStatement {
throw new SemanticException("Slice query must select a single time point");
}
}
+
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitFillQuery(this, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/GroupByFillQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/GroupByFillQueryStatement.java
index 1932e2dc77..b3d87aa5dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/GroupByFillQueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/GroupByFillQueryStatement.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.sql.statement.crud;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.sql.statement.component.FillComponent;
public class GroupByFillQueryStatement extends GroupByQueryStatement {
@@ -40,4 +41,8 @@ public class GroupByFillQueryStatement extends GroupByQueryStatement {
public void setFillComponent(FillComponent fillComponent) {
this.fillComponent = fillComponent;
}
+
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitGroupByFillQuery(this, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/GroupByQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/GroupByQueryStatement.java
index f778dede0d..73191009d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/GroupByQueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/GroupByQueryStatement.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.sql.statement.crud;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.sql.statement.component.GroupByTimeComponent;
public class GroupByQueryStatement extends AggregationQueryStatement {
@@ -40,4 +41,8 @@ public class GroupByQueryStatement extends AggregationQueryStatement {
public void setGroupByTimeComponent(GroupByTimeComponent groupByTimeComponent) {
this.groupByTimeComponent = groupByTimeComponent;
}
+
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitGroupByQuery(this, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/LastQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/LastQueryStatement.java
index a541f6db03..0faa99a2c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/LastQueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/LastQueryStatement.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.sql.statement.crud;
import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.sql.statement.component.ResultColumn;
import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
@@ -42,7 +43,7 @@ public class LastQueryStatement extends QueryStatement {
throw new SemanticException("Last query doesn't support align by device.");
}
- if (DisableAlign()) {
+ if (disableAlign()) {
throw new SemanticException("Disable align cannot be applied to LAST query.");
}
@@ -53,4 +54,8 @@ public class LastQueryStatement extends QueryStatement {
}
}
}
+
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitLastQuery(this, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/QueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/QueryStatement.java
index 9b4336a392..3aa83a7c0d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/QueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/QueryStatement.java
@@ -25,15 +25,11 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.constant.StatementType;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
-import org.apache.iotdb.db.mpp.sql.statement.component.FromComponent;
-import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
-import org.apache.iotdb.db.mpp.sql.statement.component.ResultSetFormat;
-import org.apache.iotdb.db.mpp.sql.statement.component.SelectComponent;
-import org.apache.iotdb.db.mpp.sql.statement.component.WhereCondition;
+import org.apache.iotdb.db.mpp.sql.statement.component.*;
+import org.apache.iotdb.db.qp.constant.SQLConstant;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
+import java.util.stream.Collectors;
/**
* Base class of SELECT statement.
@@ -235,8 +231,8 @@ public class QueryStatement extends Statement {
return resultSetFormat == ResultSetFormat.ALIGN_BY_DEVICE;
}
- public boolean DisableAlign() {
- return resultSetFormat != ResultSetFormat.DISABLE_ALIGN;
+ public boolean disableAlign() {
+ return resultSetFormat == ResultSetFormat.DISABLE_ALIGN;
}
public boolean hasTimeSeriesGeneratingFunction() {
@@ -247,6 +243,31 @@ public class QueryStatement extends Statement {
return selectComponent.isHasUserDefinedAggregationFunction();
}
+ public Map<String, Set<PartialPath>> getDeviceNameToPathsMap() {
+ Map<String, Set<PartialPath>> deviceNameToPathsMap =
+ new HashMap<>(getSelectComponent().getDeviceIdToPathsMap());
+ if (getWhereCondition() != null) {
+ for (PartialPath path :
+ getWhereCondition().getQueryFilter().getPathSet().stream()
+ .filter(SQLConstant::isNotReservedPath)
+ .collect(Collectors.toList())) {
+ deviceNameToPathsMap.computeIfAbsent(path.getDevice(), k -> new HashSet<>()).add(path);
+ }
+ }
+ return deviceNameToPathsMap;
+ }
+
+ public List<String> getSelectedPathNames() {
+ Set<String> pathSet = new HashSet<>();
+ for (ResultColumn resultColumn : getSelectComponent().getResultColumns()) {
+ pathSet.addAll(
+ resultColumn.collectPaths().stream()
+ .map(PartialPath::getFullPath)
+ .collect(Collectors.toList()));
+ }
+ return new ArrayList<>(pathSet);
+ }
+
/** semantic check */
public void selfCheck() {
if (isAlignByDevice()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDAFQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDAFQueryStatement.java
index d2475e1e35..555e15eeec 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDAFQueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDAFQueryStatement.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.sql.statement.crud;
import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.sql.statement.component.ResultColumn;
import org.apache.iotdb.db.mpp.sql.statement.component.SelectComponent;
import org.apache.iotdb.db.query.expression.Expression;
@@ -48,7 +49,7 @@ public class UDAFQueryStatement extends QueryStatement {
public void selfCheck() {
super.selfCheck();
- if (!DisableAlign()) {
+ if (!disableAlign()) {
throw new SemanticException("AGGREGATION doesn't support disable align clause.");
}
checkSelectComponent(selectComponent);
@@ -88,4 +89,8 @@ public class UDAFQueryStatement extends QueryStatement {
checkEachExpression(childExp);
}
}
+
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitUDAFQuery(this, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDTFQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDTFQueryStatement.java
index 79d3c75efa..b2fb9166ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDTFQueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDTFQueryStatement.java
@@ -19,9 +19,15 @@
package org.apache.iotdb.db.mpp.sql.statement.crud;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
+
public class UDTFQueryStatement extends QueryStatement {
public UDTFQueryStatement(QueryStatement queryStatement) {
super(queryStatement);
}
+
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitUDTFQuery(this, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
index cbae289964..9ca03a20b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
@@ -292,6 +292,10 @@ public class SQLConstant {
return pathStr.equals(TIME_PATH);
}
+ public static boolean isNotReservedPath(PartialPath pathStr) {
+ return !pathStr.equals(TIME_PATH);
+ }
+
public static Set<String> getNativeFunctionNames() {
return NATIVE_FUNCTION_NAMES;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index dfdd47fbd1..fc6a1d6e0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.expression.unary.ConstantOperand;
@@ -85,8 +83,6 @@ public abstract class Expression {
public abstract void constructUdfExecutors(
Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId);
- public abstract void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId);
-
public abstract void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner);
public abstract IntermediateLayer constructIntermediateLayer(
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index 4e482905ca..b706a52f5d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.expression.Expression;
@@ -200,11 +198,6 @@ public abstract class BinaryExpression extends Expression {
rightExpression.collectPaths(pathSet);
}
- @Override
- public void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId) {
- // TODO: support nested expressions
- }
-
@Override
public void constructUdfExecutors(
Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
index 1cc9043ceb..0d32cfbad5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
@@ -22,8 +22,6 @@ package org.apache.iotdb.db.query.expression.unary;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.expression.Expression;
@@ -104,11 +102,6 @@ public class ConstantOperand extends Expression {
// Do nothing
}
- @Override
- public void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId) {
- // Do nothing
- }
-
@Override
public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
// Do nothing
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index ef77e955a5..564ce5bb09 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -25,9 +25,6 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
@@ -235,14 +232,6 @@ public class FunctionExpression extends Expression {
}
}
- @Override
- public void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId) {
- if (isBuiltInAggregationFunctionExpression) {
- planNodeSet.add(new SeriesAggregateScanNode(nodeId, this));
- }
- // TODO: support UDF
- }
-
@Override
public void constructUdfExecutors(
Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
index b1183df457..66893a0172 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.qp.utils.WildcardsRemover;
import org.apache.iotdb.db.query.expression.Expression;
@@ -129,11 +127,6 @@ public class LogicNotExpression extends Expression {
expression.constructUdfExecutors(expressionName2Executor, zoneId);
}
- @Override
- public void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId) {
- // TODO: support LogicNotExpression
- }
-
@Override
public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
expression.updateStatisticsForMemoryAssigner(memoryAssigner);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index fca985cf79..04065daa55 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.expression.Expression;
@@ -128,11 +126,6 @@ public class NegationExpression extends Expression {
expression.collectPaths(pathSet);
}
- @Override
- public void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId) {
- // TODO: support nested expressions
- }
-
@Override
public void constructUdfExecutors(
Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index f33d353ce1..a9c1052c01 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -24,9 +24,6 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.expression.Expression;
@@ -113,11 +110,6 @@ public class TimeSeriesOperand extends Expression {
pathSet.add(path);
}
- @Override
- public void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId) {
- planNodeSet.add(new SeriesScanNode(nodeId, path));
- }
-
@Override
public void constructUdfExecutors(
Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
index b75bb0f0c0..5b553dc73f 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
@@ -171,7 +171,7 @@ public class SchemaUtils {
return tsDataTypes;
}
- public static TSDataType getSeriesTypeByPath(MeasurementPath path, String aggregation) {
+ public static TSDataType getSeriesTypeByPath(PartialPath path, String aggregation) {
TSDataType dataType = getAggregationType(aggregation);
if (dataType != null) {
return dataType;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index c28e07ae85..7ad38eb34f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
import org.apache.iotdb.commons.partition.TimePartitionSlot;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
-import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
@@ -40,9 +40,10 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import com.google.common.collect.Sets;
import org.junit.Test;
import java.util.ArrayList;
@@ -59,18 +60,28 @@ public class DistributionPlannerTest {
public void TestRewriteSourceNode() throws IllegalPathException {
QueryId queryId = new QueryId("test_query");
- TimeJoinNode timeJoinNode =
- new TimeJoinNode(
- queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
+ TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
timeJoinNode.addChild(
- new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s1")));
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+ Sets.newHashSet("s1", "s2"),
+ OrderBy.TIMESTAMP_ASC));
timeJoinNode.addChild(
- new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s2")));
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
+ Sets.newHashSet("s1", "s2"),
+ OrderBy.TIMESTAMP_ASC));
timeJoinNode.addChild(
- new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d22.s1")));
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d22.s1", TSDataType.INT32),
+ Sets.newHashSet("s1"),
+ OrderBy.TIMESTAMP_ASC));
- LimitNode root = new LimitNode(queryId.genPlanNodeId(), 10, timeJoinNode);
+ LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
Analysis analysis = constructAnalysis();
@@ -83,18 +94,28 @@ public class DistributionPlannerTest {
@Test
public void TestAddExchangeNode() throws IllegalPathException {
QueryId queryId = new QueryId("test_query");
- TimeJoinNode timeJoinNode =
- new TimeJoinNode(
- queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
+ TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
timeJoinNode.addChild(
- new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s1")));
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+ Sets.newHashSet("s1", "s2"),
+ OrderBy.TIMESTAMP_ASC));
timeJoinNode.addChild(
- new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s2")));
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
+ Sets.newHashSet("s1", "s2"),
+ OrderBy.TIMESTAMP_ASC));
timeJoinNode.addChild(
- new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d22.s1")));
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d22.s1", TSDataType.INT32),
+ Sets.newHashSet("s1"),
+ OrderBy.TIMESTAMP_ASC));
- LimitNode root = new LimitNode(queryId.genPlanNodeId(), 10, timeJoinNode);
+ LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
Analysis analysis = constructAnalysis();
@@ -108,18 +129,28 @@ public class DistributionPlannerTest {
@Test
public void TestSplitFragment() throws IllegalPathException {
QueryId queryId = new QueryId("test_query");
- TimeJoinNode timeJoinNode =
- new TimeJoinNode(
- queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
+ TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
timeJoinNode.addChild(
- new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s1")));
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+ Sets.newHashSet("s1", "s2"),
+ OrderBy.TIMESTAMP_ASC));
timeJoinNode.addChild(
- new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s2")));
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
+ Sets.newHashSet("s1", "s2"),
+ OrderBy.TIMESTAMP_ASC));
timeJoinNode.addChild(
- new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d22.s1")));
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d22.s1", TSDataType.INT32),
+ Sets.newHashSet("s1"),
+ OrderBy.TIMESTAMP_ASC));
- LimitNode root = new LimitNode(queryId.genPlanNodeId(), 10, timeJoinNode);
+ LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
Analysis analysis = constructAnalysis();
@@ -136,18 +167,28 @@ public class DistributionPlannerTest {
@Test
public void TestParallelPlan() throws IllegalPathException {
QueryId queryId = new QueryId("test_query");
- TimeJoinNode timeJoinNode =
- new TimeJoinNode(
- queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
+ TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
timeJoinNode.addChild(
- new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s1")));
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+ Sets.newHashSet("s1", "s2"),
+ OrderBy.TIMESTAMP_ASC));
timeJoinNode.addChild(
- new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s2")));
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
+ Sets.newHashSet("s1", "s2"),
+ OrderBy.TIMESTAMP_ASC));
timeJoinNode.addChild(
- new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d333.s1")));
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d333.s1", TSDataType.INT32),
+ Sets.newHashSet("s1"),
+ OrderBy.TIMESTAMP_ASC));
- LimitNode root = new LimitNode(queryId.genPlanNodeId(), 10, timeJoinNode);
+ LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
Analysis analysis = constructAnalysis();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
index 87bea82d7a..baef5cba99 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
@@ -41,8 +42,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import java.nio.ByteBuffer;
@@ -51,68 +50,17 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
+import static org.apache.iotdb.db.mpp.sql.plan.QueryLogicalPlanUtil.querySQLs;
+import static org.apache.iotdb.db.mpp.sql.plan.QueryLogicalPlanUtil.sqlToPlanMap;
import static org.junit.Assert.fail;
public class LogicalPlannerTest {
- LogicalPlanPrinter planPrinter = new LogicalPlanPrinter();
-
- @Before
- public void setUp() {}
-
- @Test
- @Ignore
- public void rawDataQueryTest() {
- PlanNode root =
- parseSQLToPlanNode(
- "SELECT s1,s2 FROM root.sg1.d1 WHERE time > 10 and s2 > 100 WITHOUT NULL ANY(s1) LIMIT 1 OFFSET 10");
- System.out.println(planPrinter.print(root));
- // TODO: replace all paths to full paths
- Assert.assertEquals(
- "[OffsetNode (7)]\n"
- + " │ RowOffset: 10\n"
- + " └─[LimitNode (6)]\n"
- + " │ RowLimit: 1\n"
- + " └─[FilterNullNode (5)]\n"
- + " │ FilterNullPolicy: CONTAINS_NULL\n"
- + " │ FilterNullColumnNames: [s1]\n"
- + " └─[FilterNode (4)]\n"
- + " │ QueryFilter: [and [time>10][s2>100]]\n"
- + " └─[TimeJoinNode (3)]\n"
- + " │ MergeOrder: TIMESTAMP_ASC\n"
- + " │ FilterNullPolicy: null\n"
- + " └─[SeriesScanNode (1)]\n"
- + " │ SeriesPath: s1\n"
- + " │ scanOrder: TIMESTAMP_ASC\n"
- + " [SeriesScanNode (2)]\n"
- + " │ SeriesPath: s2\n"
- + " │ scanOrder: TIMESTAMP_ASC\n",
- planPrinter.print(root));
- }
-
@Test
- @Ignore
- public void aggregationQueryTest() {
- PlanNode root =
- parseSQLToPlanNode(
- "SELECT sum(s1), avg(s2) FROM root.sg1.d1 WHERE time > 10 LIMIT 1 OFFSET 10");
- System.out.println(planPrinter.print(root));
- // TODO: replace all paths to full paths
- Assert.assertEquals(
- "[OffsetNode (6)]\n"
- + " │ RowOffset: 10\n"
- + " └─[LimitNode (5)]\n"
- + " │ RowLimit: 1\n"
- + " └─[FilterNode (4)]\n"
- + " │ QueryFilter: [time>10]\n"
- + " └─[TimeJoinNode (3)]\n"
- + " │ MergeOrder: TIMESTAMP_ASC\n"
- + " │ FilterNullPolicy: null\n"
- + " └─[SeriesAggregateScanNode (2)]\n"
- + " │ AggregateFunction: avg(s2)\n"
- + " [SeriesAggregateScanNode (1)]\n"
- + " │ AggregateFunction: sum(s1)\n",
- planPrinter.print(root));
+ public void queryPlanTest() {
+ for (String sql : querySQLs) {
+ Assert.assertEquals(sqlToPlanMap.get(sql), parseSQLToPlanNode(sql));
+ }
}
@Test
@@ -418,26 +366,6 @@ public class LogicalPlannerTest {
}
}
- private PlanNode parseSQLToPlanNode(String sql) {
- PlanNode planNode = null;
- try {
- Statement statement =
- StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset());
- MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
- // TODO: do analyze after implementing ISchemaFetcher and IPartitionFetcher
- // Analyzer analyzer = new Analyzer(context);
- // Analysis analysis = analyzer.analyze(statement);
- Analysis analysis = new Analysis();
- analysis.setStatement(statement);
- LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
- planNode = planner.plan(analysis).getRootNode();
- } catch (Exception e) {
- e.printStackTrace();
- fail();
- }
- return planNode;
- }
-
@Test
public void authorTest() throws AuthException {
@@ -583,4 +511,21 @@ public class LogicalPlannerTest {
Assert.assertNotNull(authorNode);
Assert.assertEquals(AuthorOperator.AuthorType.LIST_ROLE_USERS, authorNode.getAuthorType());
}
+
+ private PlanNode parseSQLToPlanNode(String sql) {
+ PlanNode planNode = null;
+ try {
+ Statement statement =
+ StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset());
+ MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
+ Analyzer analyzer = new Analyzer(context);
+ Analysis analysis = analyzer.analyze(statement);
+ LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
+ planNode = planner.plan(analysis).getRootNode();
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ return planNode;
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryLogicalPlanUtil.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryLogicalPlanUtil.java
new file mode 100644
index 0000000000..97ab163c68
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryLogicalPlanUtil.java
@@ -0,0 +1,567 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.plan;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNullNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.ValueFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+
+import org.apache.commons.compress.utils.Sets;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/** This class generates logical plans for test cases of the query statements. */
+public class QueryLogicalPlanUtil {
+
+ // test cases of query statement
+ public static final List<String> querySQLs = new ArrayList<>();
+
+ // key: query statement; value: expected logical plan
+ public static final Map<String, PlanNode> sqlToPlanMap = new HashMap<>();
+
+ public static final Map<String, MeasurementPath> schemaMap = new HashMap<>();
+
+ static {
+ try {
+ schemaMap.put("root.sg.d1.s1", new MeasurementPath("root.sg.d1.s1", TSDataType.INT32));
+ schemaMap.put("root.sg.d1.s2", new MeasurementPath("root.sg.d1.s2", TSDataType.INT32));
+ schemaMap.put("root.sg.d2.s1", new MeasurementPath("root.sg.d2.s1", TSDataType.INT32));
+ schemaMap.put("root.sg.d2.s2", new MeasurementPath("root.sg.d2.s2", TSDataType.INT32));
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /* 1. Raw Data Query */
+ static {
+ String sql =
+ "SELECT s1 FROM root.sg.* WHERE time > 100 and s2 > 10 "
+ + "ORDER BY TIME DESC WITHOUT NULL ANY LIMIT 100 OFFSET 100 SLIMIT 1 SOFFSET 1";
+
+ List<PlanNode> sourceNodeList = new ArrayList<>();
+ sourceNodeList.add(
+ new SeriesScanNode(
+ new PlanNodeId("test_query_0"),
+ schemaMap.get("root.sg.d1.s2"),
+ Sets.newHashSet("s2"),
+ OrderBy.TIMESTAMP_DESC));
+ sourceNodeList.add(
+ new SeriesScanNode(
+ new PlanNodeId("test_query_1"),
+ schemaMap.get("root.sg.d2.s1"),
+ Sets.newHashSet("s1", "s2"),
+ OrderBy.TIMESTAMP_DESC));
+ sourceNodeList.add(
+ new SeriesScanNode(
+ new PlanNodeId("test_query_2"),
+ schemaMap.get("root.sg.d2.s2"),
+ Sets.newHashSet("s1", "s2"),
+ OrderBy.TIMESTAMP_DESC));
+
+ TimeJoinNode timeJoinNode =
+ new TimeJoinNode(new PlanNodeId("test_query_3"), OrderBy.TIMESTAMP_DESC, sourceNodeList);
+
+ IExpression leftExpression =
+ new SingleSeriesExpression(
+ schemaMap.get("root.sg.d1.s2"),
+ FilterFactory.and(ValueFilter.gt(10), TimeFilter.gt(100)));
+ IExpression rightExpression =
+ new SingleSeriesExpression(
+ schemaMap.get("root.sg.d2.s2"),
+ FilterFactory.and(ValueFilter.gt(10), TimeFilter.gt(100)));
+ IExpression expression = BinaryExpression.and(leftExpression, rightExpression);
+
+ FilterNode filterNode =
+ new FilterNode(
+ new PlanNodeId("test_query_4"),
+ timeJoinNode,
+ expression,
+ Collections.singletonList("root.sg.d2.s1"));
+
+ FilterNullNode filterNullNode =
+ new FilterNullNode(
+ new PlanNodeId("test_query_5"),
+ filterNode,
+ FilterNullPolicy.CONTAINS_NULL,
+ new ArrayList<>());
+
+ LimitNode limitNode = new LimitNode(new PlanNodeId("test_query_6"), filterNullNode, 100);
+ OffsetNode offsetNode = new OffsetNode(new PlanNodeId("test_query_7"), limitNode, 100);
+
+ querySQLs.add(sql);
+ sqlToPlanMap.put(sql, offsetNode);
+ }
+
+ /* 2. Raw Data Query (align by device) */
+ static {
+ String sql =
+ "SELECT * FROM root.sg.* WHERE time > 100 and s1 > 10 "
+ + "ORDER BY TIME DESC WITHOUT NULL ANY LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
+
+ List<PlanNode> sourceNodeList1 = new ArrayList<>();
+ List<PlanNode> sourceNodeList2 = new ArrayList<>();
+ sourceNodeList1.add(
+ new SeriesScanNode(
+ new PlanNodeId("test_query_0"),
+ schemaMap.get("root.sg.d1.s1"),
+ Sets.newHashSet("s1", "s2"),
+ OrderBy.TIMESTAMP_DESC));
+ sourceNodeList1.add(
+ new SeriesScanNode(
+ new PlanNodeId("test_query_1"),
+ schemaMap.get("root.sg.d1.s2"),
+ Sets.newHashSet("s1", "s2"),
+ OrderBy.TIMESTAMP_DESC));
+ sourceNodeList2.add(
+ new SeriesScanNode(
+ new PlanNodeId("test_query_2"),
+ schemaMap.get("root.sg.d2.s1"),
+ Sets.newHashSet("s1", "s2"),
+ OrderBy.TIMESTAMP_DESC));
+ sourceNodeList2.add(
+ new SeriesScanNode(
+ new PlanNodeId("test_query_3"),
+ schemaMap.get("root.sg.d2.s2"),
+ Sets.newHashSet("s1", "s2"),
+ OrderBy.TIMESTAMP_DESC));
+
+ TimeJoinNode timeJoinNode1 =
+ new TimeJoinNode(new PlanNodeId("test_query_4"), OrderBy.TIMESTAMP_DESC, sourceNodeList1);
+ TimeJoinNode timeJoinNode2 =
+ new TimeJoinNode(new PlanNodeId("test_query_5"), OrderBy.TIMESTAMP_DESC, sourceNodeList2);
+
+ IExpression leftExpression =
+ new SingleSeriesExpression(
+ schemaMap.get("root.sg.d1.s1"),
+ FilterFactory.and(ValueFilter.gt(10), TimeFilter.gt(100)));
+ IExpression rightExpression =
+ new SingleSeriesExpression(
+ schemaMap.get("root.sg.d2.s1"),
+ FilterFactory.and(ValueFilter.gt(10), TimeFilter.gt(100)));
+ IExpression expression = BinaryExpression.and(leftExpression, rightExpression);
+
+ List<String> outputColumnNames =
+ Arrays.asList("root.sg.d1.s1", "root.sg.d1.s2", "root.sg.d2.s1", "root.sg.d2.s2");
+ FilterNode filterNode1 =
+ new FilterNode(
+ new PlanNodeId("test_query_6"), timeJoinNode1, expression, outputColumnNames);
+ FilterNode filterNode2 =
+ new FilterNode(
+ new PlanNodeId("test_query_7"), timeJoinNode2, expression, outputColumnNames);
+
+ DeviceMergeNode deviceMergeNode =
+ new DeviceMergeNode(new PlanNodeId("test_query_8"), OrderBy.TIMESTAMP_DESC);
+ deviceMergeNode.addChildDeviceNode("root.sg.d1", filterNode1);
+ deviceMergeNode.addChildDeviceNode("root.sg.d2", filterNode2);
+
+ FilterNullNode filterNullNode =
+ new FilterNullNode(
+ new PlanNodeId("test_query_9"),
+ deviceMergeNode,
+ FilterNullPolicy.CONTAINS_NULL,
+ new ArrayList<>());
+
+ LimitNode limitNode = new LimitNode(new PlanNodeId("test_query_10"), filterNullNode, 100);
+ OffsetNode offsetNode = new OffsetNode(new PlanNodeId("test_query_11"), limitNode, 100);
+
+ querySQLs.add(sql);
+ sqlToPlanMap.put(sql, offsetNode);
+ }
+
+ /* 3. Aggregation Query (without value filter) */
+ static {
+ String sql =
+ "SELECT count(s1), max_value(s2), last_value(s1) FROM root.sg.* WHERE time > 100 "
+ + "GROUP BY LEVEL = 1 ORDER BY TIME DESC WITHOUT NULL ANY LIMIT 100 OFFSET 100";
+
+ List<PlanNode> sourceNodeList = new ArrayList<>();
+ List<AggregationType> aggregationTypeList1 =
+ Arrays.asList(AggregationType.COUNT, AggregationType.LAST_VALUE);
+ List<AggregationType> aggregationTypeList2 =
+ Collections.singletonList(AggregationType.MAX_VALUE);
+ Filter timeFilter = TimeFilter.gt(100);
+ sourceNodeList.add(
+ new SeriesAggregateScanNode(
+ new PlanNodeId("test_query_0"),
+ schemaMap.get("root.sg.d1.s1"),
+ aggregationTypeList1,
+ OrderBy.TIMESTAMP_DESC,
+ timeFilter,
+ null));
+ sourceNodeList.add(
+ new SeriesAggregateScanNode(
+ new PlanNodeId("test_query_1"),
+ schemaMap.get("root.sg.d1.s2"),
+ aggregationTypeList2,
+ OrderBy.TIMESTAMP_DESC,
+ timeFilter,
+ null));
+ sourceNodeList.add(
+ new SeriesAggregateScanNode(
+ new PlanNodeId("test_query_2"),
+ schemaMap.get("root.sg.d2.s1"),
+ aggregationTypeList1,
+ OrderBy.TIMESTAMP_DESC,
+ timeFilter,
+ null));
+ sourceNodeList.add(
+ new SeriesAggregateScanNode(
+ new PlanNodeId("test_query_3"),
+ schemaMap.get("root.sg.d2.s2"),
+ aggregationTypeList2,
+ OrderBy.TIMESTAMP_DESC,
+ timeFilter,
+ null));
+
+ TimeJoinNode timeJoinNode =
+ new TimeJoinNode(new PlanNodeId("test_query_4"), OrderBy.TIMESTAMP_DESC, sourceNodeList);
+
+ int[] groupByLevels = new int[] {1};
+ Map<ColumnHeader, ColumnHeader> groupedHeaderMap = new HashMap<>();
+ groupedHeaderMap.put(
+ new ColumnHeader("root.sg.d1.s1", AggregationType.COUNT.name(), TSDataType.INT32),
+ new ColumnHeader("root.sg.*.s1", AggregationType.COUNT.name(), TSDataType.INT32));
+ groupedHeaderMap.put(
+ new ColumnHeader("root.sg.d2.s1", AggregationType.COUNT.name(), TSDataType.INT32),
+ new ColumnHeader("root.sg.*.s1", AggregationType.COUNT.name(), TSDataType.INT32));
+ groupedHeaderMap.put(
+ new ColumnHeader("root.sg.d1.s2", AggregationType.MAX_VALUE.name(), TSDataType.INT32),
+ new ColumnHeader("root.sg.*.s2", AggregationType.MAX_VALUE.name(), TSDataType.INT32));
+ groupedHeaderMap.put(
+ new ColumnHeader("root.sg.d2.s2", AggregationType.MAX_VALUE.name(), TSDataType.INT32),
+ new ColumnHeader("root.sg.*.s2", AggregationType.MAX_VALUE.name(), TSDataType.INT32));
+ groupedHeaderMap.put(
+ new ColumnHeader("root.sg.d1.s1", AggregationType.LAST_VALUE.name(), TSDataType.INT32),
+ new ColumnHeader("root.sg.*.s1", AggregationType.LAST_VALUE.name(), TSDataType.INT32));
+ groupedHeaderMap.put(
+ new ColumnHeader("root.sg.d2.s1", AggregationType.LAST_VALUE.name(), TSDataType.INT32),
+ new ColumnHeader("root.sg.*.s1", AggregationType.LAST_VALUE.name(), TSDataType.INT32));
+ GroupByLevelNode groupByLevelNode =
+ new GroupByLevelNode(
+ new PlanNodeId("test_query_5"), timeJoinNode, groupByLevels, groupedHeaderMap);
+
+ FilterNullNode filterNullNode =
+ new FilterNullNode(
+ new PlanNodeId("test_query_6"),
+ groupByLevelNode,
+ FilterNullPolicy.CONTAINS_NULL,
+ new ArrayList<>());
+
+ LimitNode limitNode = new LimitNode(new PlanNodeId("test_query_7"), filterNullNode, 100);
+ OffsetNode offsetNode = new OffsetNode(new PlanNodeId("test_query_8"), limitNode, 100);
+
+ querySQLs.add(sql);
+ sqlToPlanMap.put(sql, offsetNode);
+ }
+
+ /* 4. Aggregation Query (without value filter and align by device) */
+ static {
+ String sql =
+ "SELECT count(s1), max_value(s2), last_value(s1) FROM root.sg.* WHERE time > 100 "
+ + "ORDER BY TIME DESC WITHOUT NULL ANY LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
+
+ List<PlanNode> sourceNodeList1 = new ArrayList<>();
+ List<PlanNode> sourceNodeList2 = new ArrayList<>();
+ List<AggregationType> aggregationTypeList1 =
+ Arrays.asList(AggregationType.COUNT, AggregationType.LAST_VALUE);
+ List<AggregationType> aggregationTypeList2 =
+ Collections.singletonList(AggregationType.MAX_VALUE);
+ Filter timeFilter = TimeFilter.gt(100);
+ sourceNodeList1.add(
+ new SeriesAggregateScanNode(
+ new PlanNodeId("test_query_0"),
+ schemaMap.get("root.sg.d1.s1"),
+ aggregationTypeList1,
+ OrderBy.TIMESTAMP_DESC,
+ timeFilter,
+ null));
+ sourceNodeList1.add(
+ new SeriesAggregateScanNode(
+ new PlanNodeId("test_query_1"),
+ schemaMap.get("root.sg.d1.s2"),
+ aggregationTypeList2,
+ OrderBy.TIMESTAMP_DESC,
+ timeFilter,
+ null));
+ sourceNodeList2.add(
+ new SeriesAggregateScanNode(
+ new PlanNodeId("test_query_2"),
+ schemaMap.get("root.sg.d2.s1"),
+ aggregationTypeList1,
+ OrderBy.TIMESTAMP_DESC,
+ timeFilter,
+ null));
+ sourceNodeList2.add(
+ new SeriesAggregateScanNode(
+ new PlanNodeId("test_query_3"),
+ schemaMap.get("root.sg.d2.s2"),
+ aggregationTypeList2,
+ OrderBy.TIMESTAMP_DESC,
+ timeFilter,
+ null));
+
+ TimeJoinNode timeJoinNode1 =
+ new TimeJoinNode(new PlanNodeId("test_query_4"), OrderBy.TIMESTAMP_DESC, sourceNodeList1);
+ TimeJoinNode timeJoinNode2 =
+ new TimeJoinNode(new PlanNodeId("test_query_5"), OrderBy.TIMESTAMP_DESC, sourceNodeList2);
+
+ DeviceMergeNode deviceMergeNode =
+ new DeviceMergeNode(new PlanNodeId("test_query_6"), OrderBy.TIMESTAMP_DESC);
+ deviceMergeNode.addChildDeviceNode("root.sg.d1", timeJoinNode1);
+ deviceMergeNode.addChildDeviceNode("root.sg.d2", timeJoinNode2);
+
+ FilterNullNode filterNullNode =
+ new FilterNullNode(
+ new PlanNodeId("test_query_7"),
+ deviceMergeNode,
+ FilterNullPolicy.CONTAINS_NULL,
+ new ArrayList<>());
+
+ LimitNode limitNode = new LimitNode(new PlanNodeId("test_query_8"), filterNullNode, 100);
+ OffsetNode offsetNode = new OffsetNode(new PlanNodeId("test_query_9"), limitNode, 100);
+
+ querySQLs.add(sql);
+ sqlToPlanMap.put(sql, offsetNode);
+ }
+
+ /* 5. Aggregation Query (with value filter) */
+ static {
+ String sql =
+ "SELECT count(s1), max_value(s2), last_value(s1) FROM root.sg.* WHERE time > 100 and s2 > 10 "
+ + "GROUP BY LEVEL = 1 ORDER BY TIME DESC WITHOUT NULL ANY LIMIT 100 OFFSET 100";
+
+ List<PlanNode> sourceNodeList = new ArrayList<>();
+ sourceNodeList.add(
+ new SeriesScanNode(
+ new PlanNodeId("test_query_0"),
+ schemaMap.get("root.sg.d1.s1"),
+ Sets.newHashSet("s1", "s2"),
+ OrderBy.TIMESTAMP_DESC));
+ sourceNodeList.add(
+ new SeriesScanNode(
+ new PlanNodeId("test_query_1"),
+ schemaMap.get("root.sg.d1.s2"),
+ Sets.newHashSet("s1", "s2"),
+ OrderBy.TIMESTAMP_DESC));
+ sourceNodeList.add(
+ new SeriesScanNode(
+ new PlanNodeId("test_query_2"),
+ schemaMap.get("root.sg.d2.s1"),
+ Sets.newHashSet("s1", "s2"),
+ OrderBy.TIMESTAMP_DESC));
+ sourceNodeList.add(
+ new SeriesScanNode(
+ new PlanNodeId("test_query_3"),
+ schemaMap.get("root.sg.d2.s2"),
+ Sets.newHashSet("s1", "s2"),
+ OrderBy.TIMESTAMP_DESC));
+
+ TimeJoinNode timeJoinNode =
+ new TimeJoinNode(new PlanNodeId("test_query_4"), OrderBy.TIMESTAMP_DESC, sourceNodeList);
+
+ IExpression leftExpression =
+ new SingleSeriesExpression(
+ schemaMap.get("root.sg.d1.s2"),
+ FilterFactory.and(ValueFilter.gt(10), TimeFilter.gt(100)));
+ IExpression rightExpression =
+ new SingleSeriesExpression(
+ schemaMap.get("root.sg.d2.s2"),
+ FilterFactory.and(ValueFilter.gt(10), TimeFilter.gt(100)));
+ IExpression expression = BinaryExpression.and(leftExpression, rightExpression);
+ List<String> outputColumnNames =
+ Arrays.asList("root.sg.d1.s1", "root.sg.d1.s2", "root.sg.d2.s1", "root.sg.d2.s2");
+ FilterNode filterNode =
+ new FilterNode(new PlanNodeId("test_query_5"), timeJoinNode, expression, outputColumnNames);
+
+ Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
+ aggregateFuncMap.put(
+ schemaMap.get("root.sg.d1.s1"),
+ Sets.newHashSet(AggregationType.COUNT, AggregationType.LAST_VALUE));
+ aggregateFuncMap.put(
+ schemaMap.get("root.sg.d1.s2"), Sets.newHashSet(AggregationType.MAX_VALUE));
+ aggregateFuncMap.put(
+ schemaMap.get("root.sg.d2.s1"),
+ Sets.newHashSet(AggregationType.COUNT, AggregationType.LAST_VALUE));
+ aggregateFuncMap.put(
+ schemaMap.get("root.sg.d2.s2"), Sets.newHashSet(AggregationType.MAX_VALUE));
+ AggregateNode aggregateNode =
+ new AggregateNode(new PlanNodeId("test_query_6"), filterNode, aggregateFuncMap, null);
+
+ int[] groupByLevels = new int[] {1};
+ Map<ColumnHeader, ColumnHeader> groupedHeaderMap = new HashMap<>();
+ groupedHeaderMap.put(
+ new ColumnHeader("root.sg.d1.s1", AggregationType.COUNT.name(), TSDataType.INT32),
+ new ColumnHeader("root.sg.*.s1", AggregationType.COUNT.name(), TSDataType.INT32));
+ groupedHeaderMap.put(
+ new ColumnHeader("root.sg.d2.s1", AggregationType.COUNT.name(), TSDataType.INT32),
+ new ColumnHeader("root.sg.*.s1", AggregationType.COUNT.name(), TSDataType.INT32));
+ groupedHeaderMap.put(
+ new ColumnHeader("root.sg.d1.s2", AggregationType.MAX_VALUE.name(), TSDataType.INT32),
+ new ColumnHeader("root.sg.*.s2", AggregationType.MAX_VALUE.name(), TSDataType.INT32));
+ groupedHeaderMap.put(
+ new ColumnHeader("root.sg.d2.s2", AggregationType.MAX_VALUE.name(), TSDataType.INT32),
+ new ColumnHeader("root.sg.*.s2", AggregationType.MAX_VALUE.name(), TSDataType.INT32));
+ groupedHeaderMap.put(
+ new ColumnHeader("root.sg.d1.s1", AggregationType.LAST_VALUE.name(), TSDataType.INT32),
+ new ColumnHeader("root.sg.*.s1", AggregationType.LAST_VALUE.name(), TSDataType.INT32));
+ groupedHeaderMap.put(
+ new ColumnHeader("root.sg.d2.s1", AggregationType.LAST_VALUE.name(), TSDataType.INT32),
+ new ColumnHeader("root.sg.*.s1", AggregationType.LAST_VALUE.name(), TSDataType.INT32));
+ GroupByLevelNode groupByLevelNode =
+ new GroupByLevelNode(
+ new PlanNodeId("test_query_5"), aggregateNode, groupByLevels, groupedHeaderMap);
+
+ FilterNullNode filterNullNode =
+ new FilterNullNode(
+ new PlanNodeId("test_query_7"),
+ groupByLevelNode,
+ FilterNullPolicy.CONTAINS_NULL,
+ new ArrayList<>());
+
+ LimitNode limitNode = new LimitNode(new PlanNodeId("test_query_8"), filterNullNode, 100);
+ OffsetNode offsetNode = new OffsetNode(new PlanNodeId("test_query_9"), limitNode, 100);
+
+ querySQLs.add(sql);
+ sqlToPlanMap.put(sql, offsetNode);
+ }
+
+ /* 6. Aggregation Query (with value filter and align by device) */
+ static {
+ String sql =
+ "SELECT count(s1), max_value(s2), last_value(s1) FROM root.sg.* WHERE time > 100 and s2 > 10 "
+ + "ORDER BY TIME DESC WITHOUT NULL ANY LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
+
+ List<PlanNode> sourceNodeList1 = new ArrayList<>();
+ List<PlanNode> sourceNodeList2 = new ArrayList<>();
+ sourceNodeList1.add(
+ new SeriesScanNode(
+ new PlanNodeId("test_query_0"),
+ schemaMap.get("root.sg.d1.s1"),
+ Sets.newHashSet("s1", "s2"),
+ OrderBy.TIMESTAMP_DESC));
+ sourceNodeList1.add(
+ new SeriesScanNode(
+ new PlanNodeId("test_query_1"),
+ schemaMap.get("root.sg.d1.s2"),
+ Sets.newHashSet("s1", "s2"),
+ OrderBy.TIMESTAMP_DESC));
+ sourceNodeList2.add(
+ new SeriesScanNode(
+ new PlanNodeId("test_query_2"),
+ schemaMap.get("root.sg.d2.s1"),
+ Sets.newHashSet("s1", "s2"),
+ OrderBy.TIMESTAMP_DESC));
+ sourceNodeList2.add(
+ new SeriesScanNode(
+ new PlanNodeId("test_query_3"),
+ schemaMap.get("root.sg.d2.s2"),
+ Sets.newHashSet("s1", "s2"),
+ OrderBy.TIMESTAMP_DESC));
+
+ TimeJoinNode timeJoinNode1 =
+ new TimeJoinNode(new PlanNodeId("test_query_4"), OrderBy.TIMESTAMP_DESC, sourceNodeList1);
+ TimeJoinNode timeJoinNode2 =
+ new TimeJoinNode(new PlanNodeId("test_query_5"), OrderBy.TIMESTAMP_DESC, sourceNodeList2);
+
+ IExpression leftExpression =
+ new SingleSeriesExpression(
+ schemaMap.get("root.sg.d1.s2"),
+ FilterFactory.and(ValueFilter.gt(10), TimeFilter.gt(100)));
+ IExpression rightExpression =
+ new SingleSeriesExpression(
+ schemaMap.get("root.sg.d2.s2"),
+ FilterFactory.and(ValueFilter.gt(10), TimeFilter.gt(100)));
+ IExpression expression = BinaryExpression.and(leftExpression, rightExpression);
+
+ List<String> outputColumnNames =
+ Arrays.asList("root.sg.d1.s1", "root.sg.d1.s2", "root.sg.d2.s1", "root.sg.d2.s2");
+ FilterNode filterNode1 =
+ new FilterNode(
+ new PlanNodeId("test_query_6"), timeJoinNode1, expression, outputColumnNames);
+ FilterNode filterNode2 =
+ new FilterNode(
+ new PlanNodeId("test_query_7"), timeJoinNode2, expression, outputColumnNames);
+
+ Map<PartialPath, Set<AggregationType>> aggregateFuncMap1 = new HashMap<>();
+ Map<PartialPath, Set<AggregationType>> aggregateFuncMap2 = new HashMap<>();
+ aggregateFuncMap1.put(
+ schemaMap.get("root.sg.d1.s1"),
+ Sets.newHashSet(AggregationType.COUNT, AggregationType.LAST_VALUE));
+ aggregateFuncMap1.put(
+ schemaMap.get("root.sg.d1.s2"), Sets.newHashSet(AggregationType.MAX_VALUE));
+ aggregateFuncMap2.put(
+ schemaMap.get("root.sg.d2.s1"),
+ Sets.newHashSet(AggregationType.COUNT, AggregationType.LAST_VALUE));
+ aggregateFuncMap2.put(
+ schemaMap.get("root.sg.d2.s2"), Sets.newHashSet(AggregationType.MAX_VALUE));
+ AggregateNode aggregateNode1 =
+ new AggregateNode(new PlanNodeId("test_query_8"), filterNode1, aggregateFuncMap1, null);
+ AggregateNode aggregateNode2 =
+ new AggregateNode(new PlanNodeId("test_query_9"), filterNode2, aggregateFuncMap2, null);
+
+ DeviceMergeNode deviceMergeNode =
+ new DeviceMergeNode(new PlanNodeId("test_query_10"), OrderBy.TIMESTAMP_DESC);
+ deviceMergeNode.addChildDeviceNode("root.sg.d1", aggregateNode1);
+ deviceMergeNode.addChildDeviceNode("root.sg.d2", aggregateNode2);
+
+ FilterNullNode filterNullNode =
+ new FilterNullNode(
+ new PlanNodeId("test_query_11"),
+ deviceMergeNode,
+ FilterNullPolicy.CONTAINS_NULL,
+ new ArrayList<>());
+
+ LimitNode limitNode = new LimitNode(new PlanNodeId("test_query_12"), filterNullNode, 100);
+ OffsetNode offsetNode = new OffsetNode(new PlanNodeId("test_query_13"), limitNode, 100);
+
+ querySQLs.add(sql);
+ sqlToPlanMap.put(sql, offsetNode);
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/BinaryExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/BinaryExpression.java
index 7b660d6570..8ae271f089 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/BinaryExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/BinaryExpression.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.tsfile.read.expression.IBinaryExpression;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import java.io.Serializable;
+import java.util.Objects;
public abstract class BinaryExpression implements IBinaryExpression, Serializable {
@@ -83,6 +84,25 @@ public abstract class BinaryExpression implements IBinaryExpression, Serializabl
public String toString() {
return "[" + left + " && " + right + "]";
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ AndExpression that = (AndExpression) o;
+ return Objects.equals(toString(), that.toString());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(toString());
+ }
}
protected static class OrExpression extends BinaryExpression {
@@ -129,5 +149,24 @@ public abstract class BinaryExpression implements IBinaryExpression, Serializabl
public String toString() {
return "[" + left + " || " + right + "]";
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ OrExpression that = (OrExpression) o;
+ return Objects.equals(toString(), that.toString());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(toString());
+ }
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/GlobalTimeExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/GlobalTimeExpression.java
index 3c07c061f0..ac2fd776c0 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/GlobalTimeExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/GlobalTimeExpression.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.tsfile.read.expression.IUnaryExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.io.Serializable;
+import java.util.Objects;
public class GlobalTimeExpression implements IUnaryExpression, Serializable {
@@ -58,4 +59,23 @@ public class GlobalTimeExpression implements IUnaryExpression, Serializable {
public String toString() {
return "[" + this.filter + "]";
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ GlobalTimeExpression that = (GlobalTimeExpression) o;
+ return Objects.equals(toString(), that.toString());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(toString());
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/SingleSeriesExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/SingleSeriesExpression.java
index f7b603beeb..1b7fb1888c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/SingleSeriesExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/SingleSeriesExpression.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.tsfile.read.expression.IUnaryExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.io.Serializable;
+import java.util.Objects;
public class SingleSeriesExpression implements IUnaryExpression, Serializable {
@@ -69,4 +70,23 @@ public class SingleSeriesExpression implements IUnaryExpression, Serializable {
public void setSeriesPath(Path seriesPath) {
this.seriesPath = seriesPath;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ SingleSeriesExpression that = (SingleSeriesExpression) o;
+ return Objects.equals(toString(), that.toString());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(toString());
+ }
}