You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/09/26 08:33:05 UTC
[iotdb] 03/04: refactor LogicalPlanner
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/RefactorAnalyzer
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b880760c6466d7b6b6b05c97f6fe9c5bb782ff54
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Sep 26 16:31:33 2022 +0800
refactor LogicalPlanner
---
.../db/mpp/plan/planner/LogicalPlanBuilder.java | 132 ++++-------
.../db/mpp/plan/planner/LogicalPlanVisitor.java | 252 ++++++---------------
2 files changed, 119 insertions(+), 265 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index ba0ff63e5d..997386a4d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -97,7 +97,6 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
import static org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant.COLUMN_DEVICE;
@@ -200,18 +199,16 @@ public class LogicalPlanBuilder {
}
public LogicalPlanBuilder planAggregationSource(
- Set<Expression> sourceExpressions,
AggregationStep curStep,
Ordering scanOrder,
Filter timeFilter,
GroupByTimeParameter groupByTimeParameter,
Set<Expression> aggregationExpressions,
- Set<Expression> aggregationTransformExpressions,
Map<Expression, Set<Expression>> groupByLevelExpressions) {
boolean needCheckAscending = groupByTimeParameter == null;
Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new HashMap<>();
Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new HashMap<>();
- for (Expression sourceExpression : sourceExpressions) {
+ for (Expression sourceExpression : aggregationExpressions) {
createAggregationDescriptor(
(FunctionExpression) sourceExpression,
curStep,
@@ -228,73 +225,7 @@ public class LogicalPlanBuilder {
scanOrder,
timeFilter,
groupByTimeParameter);
- updateTypeProvider(sourceExpressions);
- updateTypeProvider(aggregationTransformExpressions);
-
- return convergeAggregationSource(
- sourceNodeList,
- curStep,
- scanOrder,
- groupByTimeParameter,
- aggregationExpressions,
- groupByLevelExpressions);
- }
-
- public LogicalPlanBuilder planAggregationSourceWithIndexAdjust(
- Set<Expression> sourceExpressions,
- AggregationStep curStep,
- Ordering scanOrder,
- Filter timeFilter,
- GroupByTimeParameter groupByTimeParameter,
- List<Integer> measurementIndexes,
- Set<Expression> aggregationExpressions,
- Set<Expression> aggregationTransformExpressions,
- Map<Expression, Set<Expression>> groupByLevelExpressions) {
- checkArgument(
- sourceExpressions.size() == measurementIndexes.size(),
- "Each aggregate should correspond to a column of output.");
-
- boolean needCheckAscending = groupByTimeParameter == null;
- Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new HashMap<>();
- Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new HashMap<>();
- Map<AggregationDescriptor, Integer> aggregationToMeasurementIndexMap = new HashMap<>();
-
- int index = 0;
- for (Expression sourceExpression : sourceExpressions) {
- AggregationDescriptor aggregationDescriptor =
- createAggregationDescriptor(
- (FunctionExpression) sourceExpression,
- curStep,
- scanOrder,
- needCheckAscending,
- ascendingAggregations,
- descendingAggregations);
- aggregationToMeasurementIndexMap.put(aggregationDescriptor, measurementIndexes.get(index));
- index++;
- }
-
- List<PlanNode> sourceNodeList =
- constructSourceNodeFromAggregationDescriptors(
- ascendingAggregations,
- descendingAggregations,
- scanOrder,
- timeFilter,
- groupByTimeParameter);
- updateTypeProvider(sourceExpressions);
- updateTypeProvider(aggregationTransformExpressions);
-
- if (!curStep.isOutputPartial()) {
- // update measurementIndexes
- measurementIndexes.clear();
- measurementIndexes.addAll(
- sourceNodeList.stream()
- .map(
- planNode ->
- ((SeriesAggregationSourceNode) planNode).getAggregationDescriptorList())
- .flatMap(List::stream)
- .map(aggregationToMeasurementIndexMap::get)
- .collect(Collectors.toList()));
- }
+ updateTypeProvider(aggregationExpressions);
return convergeAggregationSource(
sourceNodeList,
@@ -305,7 +236,7 @@ public class LogicalPlanBuilder {
groupByLevelExpressions);
}
- private AggregationDescriptor createAggregationDescriptor(
+ private void createAggregationDescriptor(
FunctionExpression sourceExpression,
AggregationStep curStep,
Ordering scanOrder,
@@ -331,7 +262,6 @@ public class LogicalPlanBuilder {
.computeIfAbsent(selectPath, key -> new ArrayList<>())
.add(aggregationDescriptor);
}
- return aggregationDescriptor;
}
private List<PlanNode> constructSourceNodeFromAggregationDescriptors(
@@ -458,11 +388,13 @@ public class LogicalPlanBuilder {
public LogicalPlanBuilder planDeviceView(
Map<String, PlanNode> deviceNameToSourceNodesMap,
- Set<Expression> deviceViewOutput,
+ Set<Expression> deviceViewOutputExpressions,
Map<String, List<Integer>> deviceToMeasurementIndexesMap,
Ordering mergeOrder) {
List<String> outputColumnNames =
- deviceViewOutput.stream().map(Expression::getExpressionString).collect(Collectors.toList());
+ deviceViewOutputExpressions.stream()
+ .map(Expression::getExpressionString)
+ .collect(Collectors.toList());
DeviceViewNode deviceViewNode =
new DeviceViewNode(
context.getQueryId().genPlanNodeId(),
@@ -480,7 +412,7 @@ public class LogicalPlanBuilder {
}
context.getTypeProvider().setType(COLUMN_DEVICE, TSDataType.TEXT);
- updateTypeProvider(deviceViewOutput);
+ updateTypeProvider(deviceViewOutputExpressions);
this.root = deviceViewNode;
return this;
@@ -488,7 +420,6 @@ public class LogicalPlanBuilder {
public LogicalPlanBuilder planGroupByLevel(
Map<Expression, Set<Expression>> groupByLevelExpressions,
- AggregationStep curStep,
GroupByTimeParameter groupByTimeParameter,
Ordering scanOrder) {
if (groupByLevelExpressions == null) {
@@ -499,7 +430,7 @@ public class LogicalPlanBuilder {
createGroupByTLevelNode(
Collections.singletonList(this.getRoot()),
groupByLevelExpressions,
- curStep,
+ AggregationStep.FINAL,
groupByTimeParameter,
scanOrder);
return this;
@@ -641,12 +572,12 @@ public class LogicalPlanBuilder {
}
public LogicalPlanBuilder planFilterAndTransform(
- Expression queryFilter,
+ Expression filterExpression,
Set<Expression> selectExpressions,
boolean isGroupByTime,
ZoneId zoneId,
Ordering scanOrder) {
- if (queryFilter == null || selectExpressions.isEmpty()) {
+ if (filterExpression == null || selectExpressions.isEmpty()) {
return this;
}
@@ -655,7 +586,7 @@ public class LogicalPlanBuilder {
context.getQueryId().genPlanNodeId(),
this.getRoot(),
selectExpressions.toArray(new Expression[0]),
- queryFilter,
+ filterExpression,
isGroupByTime,
zoneId,
scanOrder);
@@ -664,12 +595,9 @@ public class LogicalPlanBuilder {
}
public LogicalPlanBuilder planTransform(
- Set<Expression> transformExpressions,
- boolean isGroupByTime,
- ZoneId zoneId,
- Ordering scanOrder) {
+ Set<Expression> selectExpressions, boolean isGroupByTime, ZoneId zoneId, Ordering scanOrder) {
boolean needTransform = false;
- for (Expression expression : transformExpressions) {
+ for (Expression expression : selectExpressions) {
if (ExpressionAnalyzer.checkIsNeedTransform(expression)) {
needTransform = true;
break;
@@ -683,11 +611,11 @@ public class LogicalPlanBuilder {
new TransformNode(
context.getQueryId().genPlanNodeId(),
this.getRoot(),
- transformExpressions.toArray(new Expression[0]),
+ selectExpressions.toArray(new Expression[0]),
isGroupByTime,
zoneId,
scanOrder);
- updateTypeProvider(transformExpressions);
+ updateTypeProvider(selectExpressions);
return this;
}
@@ -720,6 +648,34 @@ public class LogicalPlanBuilder {
return this;
}
+ public LogicalPlanBuilder planHaving(
+ Expression havingExpression,
+ Set<Expression> selectExpressions,
+ boolean isGroupByTime,
+ ZoneId zoneId,
+ Ordering scanOrder) {
+ if (havingExpression != null) {
+ return planFilterAndTransform(
+ havingExpression, selectExpressions, isGroupByTime, zoneId, scanOrder);
+ } else {
+ return planTransform(selectExpressions, isGroupByTime, zoneId, scanOrder);
+ }
+ }
+
+ public LogicalPlanBuilder planWhereAndSourceTransform(
+ Expression whereExpression,
+ Set<Expression> sourceTransformExpressions,
+ boolean isGroupByTime,
+ ZoneId zoneId,
+ Ordering scanOrder) {
+ if (whereExpression != null) {
+ return planFilterAndTransform(
+ whereExpression, sourceTransformExpressions, isGroupByTime, zoneId, scanOrder);
+ } else {
+ return planTransform(sourceTransformExpressions, isGroupByTime, zoneId, scanOrder);
+ }
+ }
+
/** Meta Query* */
public LogicalPlanBuilder planTimeSeriesSchemaSource(
PartialPath pathPattern,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index d3e99cd82f..4e2f0af2d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -67,7 +67,6 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplate
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathsUsingTemplateStatement;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -112,18 +111,12 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
subPlanBuilder.withNewRoot(
visitQueryBody(
queryStatement,
- analysis.getDeviceToIsRawDataSource().get(deviceName),
analysis.getDeviceToSourceExpressions().get(deviceName),
- analysis.getDeviceToAggregationExpressions().get(deviceName),
analysis.getDeviceToSourceTransformExpressions().get(deviceName),
- analysis.hasValueFilter()
- ? analysis.getDeviceToSourceExpressions().get(deviceName)
- : Collections.emptySet(),
analysis.getDeviceToWhereExpression() != null
? analysis.getDeviceToWhereExpression().get(deviceName)
: null,
- null,
- analysis.getDeviceToMeasurementIndexesMap().get(deviceName),
+ analysis.getDeviceToAggregationExpressions().get(deviceName),
context));
deviceToSubPlanMap.put(deviceName, subPlanBuilder.getRoot());
}
@@ -131,42 +124,34 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
planBuilder =
planBuilder.planDeviceView(
deviceToSubPlanMap,
- analysis.getTransformInput(),
+ analysis.getDeviceViewOutputExpressions(),
analysis.getDeviceToMeasurementIndexesMap(),
queryStatement.getResultTimeOrder());
- if (queryStatement.hasHaving()) {
- planBuilder.planFilterAndTransform(
- analysis.getHavingExpression(),
- analysis.getTransformOutput(),
- queryStatement.isGroupByTime(),
- queryStatement.getSelectComponent().getZoneId(),
- queryStatement.getResultTimeOrder());
- } else {
- planBuilder.planTransform(
- analysis.getTransformOutput(),
- queryStatement.isGroupByTime(),
- queryStatement.getSelectComponent().getZoneId(),
- queryStatement.getResultTimeOrder());
- }
} else {
planBuilder =
planBuilder.withNewRoot(
visitQueryBody(
queryStatement,
- analysis.isRawDataSource(),
analysis.getSourceExpressions(),
- analysis.getAggregationExpressions(),
analysis.getSourceTransformExpressions(),
- analysis.getSelectExpressions(),
analysis.getWhereExpression(),
- analysis.getHavingExpression(),
- null,
+ analysis.getAggregationExpressions(),
context));
}
- // other common upstream node
+ // other upstream node
planBuilder =
planBuilder
+ .planGroupByLevel(
+ analysis.getGroupByLevelExpressions(),
+ analysis.getGroupByTimeParameter(),
+ queryStatement.getResultTimeOrder())
+ .planHaving(
+ analysis.getHavingExpression(),
+ analysis.getSelectExpressions(),
+ queryStatement.isGroupByTime(),
+ queryStatement.getSelectComponent().getZoneId(),
+ queryStatement.getResultTimeOrder())
.planFill(analysis.getFillDescriptor(), queryStatement.getResultTimeOrder())
.planOffset(queryStatement.getRowOffset())
.planLimit(queryStatement.getRowLimit());
@@ -176,48 +161,51 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
public PlanNode visitQueryBody(
QueryStatement queryStatement,
- boolean isRawDataSource,
Set<Expression> sourceExpressions,
+ Set<Expression> sourceTransformExpressions,
+ Expression whereExpression,
Set<Expression> aggregationExpressions,
- Set<Expression> aggregationTransformExpressions,
- Set<Expression> transformExpressions,
- Expression queryFilter,
- Expression havingExpression,
- List<Integer> measurementIndexes, // only used in ALIGN BY DEVICE
MPPQueryContext context) {
LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
- // plan data source node
- if (isRawDataSource) {
+ if (aggregationExpressions == null) {
+ // raw data query
planBuilder =
- planBuilder.planRawDataSource(
- sourceExpressions,
- queryStatement.getResultTimeOrder(),
- analysis.getGlobalTimeFilter());
-
- if (queryStatement.isAggregationQuery()) {
- if (analysis.hasValueFilter()) {
- planBuilder =
- planBuilder.planFilterAndTransform(
- queryFilter,
- aggregationTransformExpressions,
+ planBuilder
+ .planRawDataSource(
+ sourceExpressions,
+ queryStatement.getResultTimeOrder(),
+ analysis.getGlobalTimeFilter())
+ .planWhereAndSourceTransform(
+ whereExpression,
+ sourceTransformExpressions,
queryStatement.isGroupByTime(),
queryStatement.getSelectComponent().getZoneId(),
queryStatement.getResultTimeOrder());
- } else {
- planBuilder =
- planBuilder.planTransform(
- aggregationTransformExpressions,
- queryStatement.isGroupByTime(),
- queryStatement.getSelectComponent().getZoneId(),
- queryStatement.getResultTimeOrder());
- }
+ } else {
+ // aggregation query
+ boolean isRawDataSource =
+ (whereExpression != null) || needTransform(sourceTransformExpressions);
+ AggregationStep curStep;
+ if (isRawDataSource) {
+ planBuilder =
+ planBuilder
+ .planRawDataSource(
+ sourceExpressions,
+ queryStatement.getResultTimeOrder(),
+ analysis.getGlobalTimeFilter())
+ .planWhereAndSourceTransform(
+ whereExpression,
+ sourceTransformExpressions,
+ queryStatement.isGroupByTime(),
+ queryStatement.getSelectComponent().getZoneId(),
+ queryStatement.getResultTimeOrder());
boolean outputPartial =
queryStatement.isGroupByLevel()
|| (queryStatement.isGroupByTime()
&& analysis.getGroupByTimeParameter().hasOverlap());
- AggregationStep curStep = outputPartial ? AggregationStep.PARTIAL : AggregationStep.SINGLE;
+ curStep = outputPartial ? AggregationStep.PARTIAL : AggregationStep.SINGLE;
planBuilder =
planBuilder.planAggregation(
aggregationExpressions,
@@ -225,147 +213,57 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
curStep,
queryStatement.getResultTimeOrder());
- if (curStep.isOutputPartial()) {
- if (queryStatement.isGroupByTime() && analysis.getGroupByTimeParameter().hasOverlap()) {
- curStep =
- queryStatement.isGroupByLevel()
- ? AggregationStep.INTERMEDIATE
- : AggregationStep.FINAL;
- planBuilder =
- planBuilder.planSlidingWindowAggregation(
- aggregationExpressions,
- analysis.getGroupByTimeParameter(),
- curStep,
- queryStatement.getResultTimeOrder());
- }
-
- if (queryStatement.isGroupByLevel()) {
- curStep = AggregationStep.FINAL;
- planBuilder =
- planBuilder.planGroupByLevel(
- analysis.getGroupByLevelExpressions(),
- curStep,
- analysis.getGroupByTimeParameter(),
- queryStatement.getResultTimeOrder());
- }
+ if (queryStatement.isGroupByTime() && analysis.getGroupByTimeParameter().hasOverlap()) {
+ curStep =
+ queryStatement.isGroupByLevel()
+ ? AggregationStep.INTERMEDIATE
+ : AggregationStep.FINAL;
+ planBuilder =
+ planBuilder.planSlidingWindowAggregation(
+ aggregationExpressions,
+ analysis.getGroupByTimeParameter(),
+ curStep,
+ queryStatement.getResultTimeOrder());
}
if (queryStatement.isGroupByLevel()) {
- planBuilder = // plan Having with GroupByLevel
- planBuilder.planFilterAndTransform(
- havingExpression,
- analysis.getGroupByLevelExpressions().keySet(),
- queryStatement.isGroupByTime(),
- queryStatement.getSelectComponent().getZoneId(),
- queryStatement.getResultTimeOrder());
- } else {
- if (havingExpression != null) {
- planBuilder = // plan Having without GroupByLevel
- planBuilder.planFilterAndTransform(
- havingExpression,
- transformExpressions,
- queryStatement.isGroupByTime(),
- queryStatement.getSelectComponent().getZoneId(),
- queryStatement.getResultTimeOrder());
- } else {
- planBuilder = // no Having
- planBuilder.planTransform(
- transformExpressions,
- queryStatement.isGroupByTime(),
- queryStatement.getSelectComponent().getZoneId(),
- queryStatement.getResultTimeOrder());
- }
- }
- } else {
- if (analysis.hasValueFilter()) {
planBuilder =
- planBuilder.planFilterAndTransform(
- queryFilter,
- transformExpressions,
- queryStatement.isGroupByTime(),
- queryStatement.getSelectComponent().getZoneId(),
- queryStatement.getResultTimeOrder());
- } else {
- planBuilder =
- planBuilder.planTransform(
- transformExpressions,
- queryStatement.isGroupByTime(),
- queryStatement.getSelectComponent().getZoneId(),
+ planBuilder.planGroupByLevel(
+ analysis.getGroupByLevelExpressions(),
+ analysis.getGroupByTimeParameter(),
queryStatement.getResultTimeOrder());
}
- }
- } else {
- AggregationStep curStep =
- (analysis.getGroupByLevelExpressions() != null
- || (analysis.getGroupByTimeParameter() != null
- && analysis.getGroupByTimeParameter().hasOverlap()))
- ? AggregationStep.PARTIAL
- : AggregationStep.SINGLE;
-
- boolean needTransform = false;
- for (Expression expression : transformExpressions) {
- if (ExpressionAnalyzer.checkIsNeedTransform(expression)) {
- needTransform = true;
- break;
- }
- }
-
- if (!needTransform && measurementIndexes != null) {
- planBuilder =
- planBuilder.planAggregationSourceWithIndexAdjust(
- sourceExpressions,
- curStep,
- queryStatement.getResultTimeOrder(),
- analysis.getGlobalTimeFilter(),
- analysis.getGroupByTimeParameter(),
- measurementIndexes,
- aggregationExpressions,
- aggregationTransformExpressions,
- analysis.getGroupByLevelExpressions());
} else {
+ curStep =
+ (analysis.getGroupByLevelExpressions() != null
+ || (analysis.getGroupByTimeParameter() != null
+ && analysis.getGroupByTimeParameter().hasOverlap()))
+ ? AggregationStep.PARTIAL
+ : AggregationStep.SINGLE;
+
planBuilder =
planBuilder.planAggregationSource(
- sourceExpressions,
curStep,
queryStatement.getResultTimeOrder(),
analysis.getGlobalTimeFilter(),
analysis.getGroupByTimeParameter(),
aggregationExpressions,
- aggregationTransformExpressions,
analysis.getGroupByLevelExpressions());
-
- if (queryStatement.isGroupByLevel()) {
- planBuilder = // plan Having with GroupByLevel
- planBuilder.planFilterAndTransform(
- havingExpression,
- analysis.getGroupByLevelExpressions().keySet(),
- queryStatement.isGroupByTime(),
- queryStatement.getSelectComponent().getZoneId(),
- queryStatement.getResultTimeOrder());
- } else {
- if (havingExpression != null) {
- planBuilder = // plan Having without GroupByLevel
- planBuilder.planFilterAndTransform(
- havingExpression,
- transformExpressions,
- queryStatement.isGroupByTime(),
- queryStatement.getSelectComponent().getZoneId(),
- queryStatement.getResultTimeOrder());
- } else {
- planBuilder = // no Having
- planBuilder.planTransform(
- transformExpressions,
- queryStatement.isGroupByTime(),
- queryStatement.getSelectComponent().getZoneId(),
- queryStatement.getResultTimeOrder());
- }
- }
}
}
return planBuilder.getRoot();
}
+ private boolean needTransform(Set<Expression> expressions) {
+ for (Expression expression : expressions) {
+ if (ExpressionAnalyzer.checkIsNeedTransform(expression)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
@Override
public PlanNode visitCreateTimeseries(
CreateTimeSeriesStatement createTimeSeriesStatement, MPPQueryContext context) {