You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2024/04/23 16:09:23 UTC
(iotdb) 01/03: finish
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/AggregationPushDown
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit edb5a4950ad3a163cfe378b0cb1a5aef2c5526b8
Author: liuminghui233 <54...@qq.com>
AuthorDate: Tue Apr 23 23:56:10 2024 +0800
finish
---
.../plan/optimization/AggregationPushDown.java | 303 ++++++++++++++++
.../plan/planner/LogicalPlanBuilder.java | 397 +--------------------
.../plan/planner/LogicalPlanVisitor.java | 181 +++-------
.../queryengine/plan/planner/LogicalPlanner.java | 6 +-
.../plan/planner/OperatorTreeGenerator.java | 248 +++++++------
.../planner/distribution/ExchangeNodeAdder.java | 6 +
.../plan/planner/plan/node/PlanNodeType.java | 5 +
.../plan/planner/plan/node/PlanVisitor.java | 5 +
.../plan/node/process/RawDataAggregationNode.java | 322 +++++++++++++++++
9 files changed, 828 insertions(+), 645 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
new file mode 100644
index 00000000000..77d13471a95
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.optimization;
+
+import org.apache.iotdb.common.rpc.thrift.TAggregationType;
+import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationSourceNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanSourceNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.schemaengine.schemaregion.utils.MetaUtils;
+import org.apache.iotdb.db.utils.SchemaUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME;
+
+public class AggregationPushDown implements PlanOptimizer {
+
+  /**
+   * Rewrites the logical plan so that eligible aggregations are answered by aggregation scan
+   * nodes instead of aggregating raw data.
+   *
+   * <p>The plan is returned untouched when the statement is not a query, is not an aggregation
+   * query, uses a GROUP BY variant other than GROUP BY TIME, or when {@code cannotUseStatistics}
+   * reports that the aggregations cannot be pushed down.
+   *
+   * @param plan root of the logical plan to optimize
+   * @param analysis analysis result of the statement being planned
+   * @param context query context; its query id is used to generate ids for new plan nodes
+   * @return the rewritten plan, or {@code plan} itself when the optimization does not apply
+   */
+  @Override
+  public PlanNode optimize(PlanNode plan, Analysis analysis, MPPQueryContext context) {
+    if (analysis.getStatement().getType() != StatementType.QUERY) {
+      return plan;
+    }
+
+    QueryStatement queryStatement = analysis.getQueryStatement();
+    if (!queryStatement.isAggregationQuery()) {
+      return plan;
+    }
+    if (queryStatement.isGroupBy() && !queryStatement.isGroupByTime()) {
+      return plan;
+    }
+    // Evaluated last on purpose: it inspects the aggregation expressions and may throw.
+    if (cannotUseStatistics(queryStatement, analysis)) {
+      return plan;
+    }
+
+    Rewriter rewriter = new Rewriter();
+    RewriterContext rewriterContext =
+        new RewriterContext(analysis, context, queryStatement.isAlignByDevice());
+    return plan.accept(rewriter, rewriterContext);
+  }
+
+  /**
+   * Selects the aggregation and source-transform expressions to inspect and delegates to the
+   * set-based overload.
+   *
+   * <p>For ALIGN BY DEVICE queries only the first device of {@code analysis.getDeviceList()} is
+   * inspected (presumably every device carries the same kind of aggregations - TODO confirm).
+   *
+   * @param queryStatement the query being planned
+   * @param analysis analysis result holding the expressions of the query
+   * @return {@code true} if the aggregations cannot be pushed down
+   */
+  private boolean cannotUseStatistics(QueryStatement queryStatement, Analysis analysis) {
+    if (!queryStatement.isAlignByDevice()) {
+      return cannotUseStatistics(
+          analysis.getAggregationExpressions(), analysis.getSourceTransformExpressions());
+    }
+
+    // check any of the devices
+    String representativeDevice = analysis.getDeviceList().get(0).getDevice();
+    return cannotUseStatistics(
+        analysis.getDeviceToAggregationExpressions().get(representativeDevice),
+        analysis.getDeviceToSourceTransformExpressions().get(representativeDevice));
+  }
+
+  /**
+   * Decides whether the given aggregations rule out aggregation push-down.
+   *
+   * <p>Push-down is refused for external (user-defined) aggregation functions, for built-in
+   * functions for which {@code BuiltinAggregationFunction.canUseStatistics} returns false, and
+   * for {@code count_time} unless every source series is aligned and all of them belong to the
+   * same device. A {@code count_time} expression settles the answer immediately; expressions
+   * after it are not inspected.
+   *
+   * @param aggregationExpressions aggregation expressions of the query (or of one device)
+   * @param sourceTransformExpressions source expressions consulted for {@code count_time}
+   * @return {@code true} if push-down must not be applied
+   * @throws IllegalArgumentException if an aggregation expression is not a FunctionExpression
+   */
+  private boolean cannotUseStatistics(
+      Set<Expression> aggregationExpressions, Set<Expression> sourceTransformExpressions) {
+    for (Expression aggregation : aggregationExpressions) {
+      if (!(aggregation instanceof FunctionExpression)) {
+        throw new IllegalArgumentException(
+            String.format("Invalid Aggregation Expression: %s", aggregation.getExpressionString()));
+      }
+      FunctionExpression function = (FunctionExpression) aggregation;
+
+      // Disable statistics optimization of UDAF for now
+      if (function.isExternalAggregationFunctionExpression()) {
+        return true;
+      }
+
+      String functionName = function.getFunctionName();
+      if (COUNT_TIME.equalsIgnoreCase(functionName)) {
+        String seenDevice = "";
+        for (Expression source : sourceTransformExpressions) {
+          PartialPath sourcePath = ((TimeSeriesOperand) source).getPath();
+          if (!(sourcePath instanceof AlignedPath)
+              && !((MeasurementPath) sourcePath).isUnderAlignedEntity()) {
+            return true;
+          }
+
+          String device = sourcePath.getDevice();
+          if (StringUtils.isEmpty(seenDevice)) {
+            seenDevice = device;
+          } else if (!seenDevice.equalsIgnoreCase(device)) {
+            // count_time from only one aligned device can use AlignedSeriesAggScan
+            return true;
+          }
+        }
+        return false;
+      }
+
+      if (!BuiltinAggregationFunction.canUseStatistics(functionName)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+ private static class Rewriter extends PlanVisitor<PlanNode, RewriterContext> {
+
+    /**
+     * Fallback for every node type this rewriter has no dedicated handler for.
+     *
+     * @throws IllegalArgumentException always, naming the unexpected node
+     */
+    @Override
+    public PlanNode visitPlan(PlanNode node, RewriterContext context) {
+      String message = "Unexpected plan node: " + node;
+      throw new IllegalArgumentException(message);
+    }
+
+    /**
+     * Rewrites the only child of {@code node} and re-attaches the result in place.
+     *
+     * @return the same {@code node} instance, now pointing at the rewritten child
+     */
+    @Override
+    public PlanNode visitSingleChildProcess(SingleChildProcessNode node, RewriterContext context) {
+      node.setChild(node.getChild().accept(this, context));
+      return node;
+    }
+
+    /**
+     * Rewrites every child of {@code node}, preserving their order, and installs the rewritten
+     * children on the node.
+     *
+     * @return the same {@code node} instance with its children replaced
+     */
+    @Override
+    public PlanNode visitMultiChildProcess(MultiChildProcessNode node, RewriterContext context) {
+      List<PlanNode> rewritten = new ArrayList<>();
+      node.getChildren().forEach(child -> rewritten.add(child.accept(this, context)));
+      node.setChildren(rewritten);
+      return node;
+    }
+
+    /**
+     * Replaces a raw-data aggregation that sits directly on series scans (or on a full outer time
+     * join) with aggregation scan nodes, one per source path.
+     *
+     * <p>Descriptors are split into three groups keyed by their input path: {@code count_time}
+     * descriptors, descriptors that can be evaluated in the node's scan order, and descriptors
+     * that need the reverse order. The reverse-order group only exists when there is no GROUP BY
+     * TIME parameter; with GROUP BY TIME every non-{@code count_time} descriptor is evaluated in
+     * the node's scan order. When any {@code count_time} descriptor is present, only the
+     * {@code count_time} group is turned into scan nodes for the scan-order pass.
+     *
+     * @param node the aggregation node to push down
+     * @param context rewriter state used to generate plan node ids
+     * @return the replacement sub-plan, or {@code node} itself when its child is neither a
+     *     FullOuterTimeJoinNode nor a SeriesScanSourceNode
+     * @throws IllegalArgumentException if a descriptor does not have exactly one input expression
+     *     that is a TimeSeriesOperand
+     */
+    @Override
+    public PlanNode visitRawDataAggregation(RawDataAggregationNode node, RewriterContext context) {
+      PlanNode child = node.getChild();
+      if (!(child instanceof FullOuterTimeJoinNode || child instanceof SeriesScanSourceNode)) {
+        // cannot push down
+        return node;
+      }
+      // TODO: when the child is a single SeriesScanSourceNode (only one source), check partition
+
+      Ordering scanOrder = node.getScanOrder();
+      GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+      // Splitting by scan direction is only done when there is no GROUP BY TIME.
+      boolean needCheckAscending = groupByTimeParameter == null;
+
+      Map<PartialPath, List<AggregationDescriptor>> sourceToAscendingAggregationsMap =
+          new HashMap<>();
+      Map<PartialPath, List<AggregationDescriptor>> sourceToDescendingAggregationsMap =
+          new HashMap<>();
+      Map<PartialPath, List<AggregationDescriptor>> sourceToCountTimeAggregationsMap =
+          new HashMap<>();
+      for (AggregationDescriptor aggregationDescriptor : node.getAggregationDescriptorList()) {
+        checkArgument(
+            aggregationDescriptor.getInputExpressions().size() == 1
+                && aggregationDescriptor.getInputExpressions().get(0)
+                    instanceof TimeSeriesOperand);
+        PartialPath path =
+            ((TimeSeriesOperand) aggregationDescriptor.getInputExpressions().get(0)).getPath();
+
+        Map<PartialPath, List<AggregationDescriptor>> targetMap;
+        if (aggregationDescriptor.getAggregationType().equals(TAggregationType.COUNT_TIME)) {
+          targetMap = sourceToCountTimeAggregationsMap;
+        } else if (!needCheckAscending
+            || SchemaUtils.isConsistentWithScanOrder(
+                aggregationDescriptor.getAggregationType(), scanOrder)) {
+          // With GROUP BY TIME the descending group is never turned into scan nodes below, so
+          // every descriptor must be routed here or it would silently disappear from the plan.
+          targetMap = sourceToAscendingAggregationsMap;
+        } else {
+          targetMap = sourceToDescendingAggregationsMap;
+        }
+        targetMap.computeIfAbsent(path, key -> new ArrayList<>()).add(aggregationDescriptor);
+      }
+
+      List<PlanNode> sourceNodeList = new ArrayList<>();
+      Map<PartialPath, List<AggregationDescriptor>> groupedSourceToAscendingAggregations;
+      if (!sourceToCountTimeAggregationsMap.isEmpty()) {
+        groupedSourceToAscendingAggregations = sourceToCountTimeAggregationsMap;
+      } else {
+        groupedSourceToAscendingAggregations =
+            MetaUtils.groupAlignedAggregations(sourceToAscendingAggregationsMap);
+      }
+      for (Map.Entry<PartialPath, List<AggregationDescriptor>> sourceAggregationsEntry :
+          groupedSourceToAscendingAggregations.entrySet()) {
+        sourceNodeList.add(
+            createAggregationScanNode(
+                sourceAggregationsEntry.getKey(),
+                sourceAggregationsEntry.getValue(),
+                scanOrder,
+                groupByTimeParameter,
+                context));
+      }
+
+      if (needCheckAscending) {
+        Map<PartialPath, List<AggregationDescriptor>> groupedSourceToDescendingAggregations =
+            MetaUtils.groupAlignedAggregations(sourceToDescendingAggregationsMap);
+        for (Map.Entry<PartialPath, List<AggregationDescriptor>> sourceAggregationsEntry :
+            groupedSourceToDescendingAggregations.entrySet()) {
+          sourceNodeList.add(
+              createAggregationScanNode(
+                  sourceAggregationsEntry.getKey(),
+                  sourceAggregationsEntry.getValue(),
+                  scanOrder.reverse(),
+                  null,
+                  context));
+        }
+      }
+
+      PlanNode resultNode = convergeWithTimeJoin(sourceNodeList, scanOrder, context);
+      // The replacement must expose the output columns of the aggregation node it replaces, not
+      // the raw series columns of that node's child.
+      return planProject(resultNode, node, context);
+    }
+
+    /**
+     * Builds the aggregation scan node matching the concrete type of {@code selectPath}.
+     *
+     * @param selectPath series to scan; a MeasurementPath or an AlignedPath
+     * @param aggregationDescriptorList aggregations to evaluate on that series
+     * @param scanOrder order in which the series is scanned
+     * @param groupByTimeParameter GROUP BY TIME parameter handed to the scan node; may be null
+     * @param context rewriter state used to generate the plan node id
+     * @return a SeriesAggregationScanNode or an AlignedSeriesAggregationScanNode
+     * @throws IllegalArgumentException if {@code selectPath} is of neither supported type
+     */
+    private SeriesAggregationSourceNode createAggregationScanNode(
+        PartialPath selectPath,
+        List<AggregationDescriptor> aggregationDescriptorList,
+        Ordering scanOrder,
+        GroupByTimeParameter groupByTimeParameter,
+        RewriterContext context) {
+      if (selectPath instanceof MeasurementPath) {
+        // non-aligned series
+        MeasurementPath measurementPath = (MeasurementPath) selectPath;
+        return new SeriesAggregationScanNode(
+            context.genPlanNodeId(),
+            measurementPath,
+            aggregationDescriptorList,
+            scanOrder,
+            groupByTimeParameter);
+      }
+      if (selectPath instanceof AlignedPath) {
+        // aligned series
+        AlignedPath alignedPath = (AlignedPath) selectPath;
+        return new AlignedSeriesAggregationScanNode(
+            context.genPlanNodeId(),
+            alignedPath,
+            aggregationDescriptorList,
+            scanOrder,
+            groupByTimeParameter);
+      }
+      throw new IllegalArgumentException("unexpected path type");
+    }
+
+    /**
+     * Combines the given source nodes into a single plan node.
+     *
+     * @param sourceNodes nodes to combine
+     * @param mergeOrder ordering handed to the join node
+     * @param context rewriter state used to generate the plan node id
+     * @return the sole element when exactly one node is given, otherwise a new
+     *     FullOuterTimeJoinNode over all of them
+     */
+    private PlanNode convergeWithTimeJoin(
+        List<PlanNode> sourceNodes, Ordering mergeOrder, RewriterContext context) {
+      if (sourceNodes.size() == 1) {
+        // a single source needs no join on top of it
+        return sourceNodes.get(0);
+      }
+      return new FullOuterTimeJoinNode(context.genPlanNodeId(), mergeOrder, sourceNodes);
+    }
+
+    /**
+     * For ALIGN BY DEVICE queries, wraps {@code resultNode} in a ProjectNode when its output
+     * columns differ from those of {@code rawNode}.
+     *
+     * @param resultNode the rewritten sub-plan
+     * @param rawNode the node whose output column names the result is compared against
+     * @param context rewriter state (align-by-device flag, plan node id generation)
+     * @return {@code resultNode} unchanged when the query is not ALIGN BY DEVICE or the column
+     *     names already match, otherwise a ProjectNode over {@code resultNode} carrying
+     *     {@code rawNode}'s output column names
+     */
+    private PlanNode planProject(PlanNode resultNode, PlanNode rawNode, RewriterContext context) {
+      if (!context.isAlignByDevice()) {
+        return resultNode;
+      }
+      List<String> expectedColumnNames = rawNode.getOutputColumnNames();
+      if (expectedColumnNames.equals(resultNode.getOutputColumnNames())) {
+        return resultNode;
+      }
+      return new ProjectNode(context.genPlanNodeId(), resultNode, expectedColumnNames);
+    }
+ }
+
+  /** State shared by the {@code Rewriter} while it walks a plan. */
+  private static class RewriterContext {
+
+    private final boolean isAlignByDevice;
+    private final QueryId queryId;
+
+    /**
+     * @param analysis analysis of the query; not read by this constructor
+     * @param context query context supplying the query id
+     * @param isAlignByDevice whether the query being optimized is an ALIGN BY DEVICE query
+     */
+    public RewriterContext(Analysis analysis, MPPQueryContext context, boolean isAlignByDevice) {
+      this.isAlignByDevice = isAlignByDevice;
+      this.queryId = context.getQueryId();
+    }
+
+    /** Returns whether the query being optimized is an ALIGN BY DEVICE query. */
+    public boolean isAlignByDevice() {
+      return this.isAlignByDevice;
+    }
+
+    /** Returns a plan node id generated by the query id of the current query. */
+    public PlanNodeId genPlanNodeId() {
+      return this.queryId.genPlanNodeId();
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index 15b890cb2ec..1f4dbb34b07 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSchemaNode;
import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
@@ -56,19 +55,18 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.Sche
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryOrderByHeatNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesCountNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewIntoNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FillNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByLevelNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByTagNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.IntoNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
@@ -78,11 +76,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullO
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationSourceNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
@@ -103,8 +98,6 @@ import org.apache.iotdb.db.schemaengine.template.Template;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.db.utils.columngenerator.parameter.SlidingTimeColumnGeneratorParameter;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.commons.lang3.Validate;
@@ -113,7 +106,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@@ -123,14 +115,12 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
-import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.DEVICE;
import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionTypeAnalyzer.analyzeExpression;
import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode.LIMIT_VALUE_USE_TOP_K;
import static org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy.LINEAR;
-import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME;
import static org.apache.iotdb.db.utils.constant.SqlConstant.LAST_VALUE;
import static org.apache.iotdb.db.utils.constant.SqlConstant.MAX_TIME;
@@ -378,7 +368,7 @@ public class LogicalPlanBuilder {
.planRawDataSource(
sourceExpressions, Ordering.DESC, 0, 0, analysis.isLastLevelUseWildcard())
.planWhereAndSourceTransform(null, sourceTransformExpressions, false, Ordering.DESC)
- .planAggregation(
+ .planRawDataAggregation(
new LinkedHashSet<>(Arrays.asList(maxTimeAgg, lastValueAgg)),
null,
analysis.getGroupByTimeParameter(),
@@ -397,304 +387,6 @@ public class LogicalPlanBuilder {
}
}
- public LogicalPlanBuilder planAggregationSource(
- AggregationStep curStep,
- Ordering scanOrder,
- GroupByTimeParameter groupByTimeParameter,
- Set<Expression> aggregationExpressions,
- Set<Expression> sourceTransformExpressions,
- Map<Expression, Set<Expression>> crossGroupByAggregations,
- List<String> tagKeys,
- Map<List<String>, LinkedHashMap<Expression, List<Expression>>>
- tagValuesToGroupedTimeseriesOperands) {
- boolean needCheckAscending = groupByTimeParameter == null;
- Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new HashMap<>();
- Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new HashMap<>();
- Map<PartialPath, List<AggregationDescriptor>> countTimeAggregations = new HashMap<>();
- for (Expression aggregationExpression : aggregationExpressions) {
- createAggregationDescriptor(
- (FunctionExpression) aggregationExpression,
- curStep,
- scanOrder,
- needCheckAscending,
- ascendingAggregations,
- descendingAggregations,
- countTimeAggregations);
- }
-
- List<PlanNode> sourceNodeList =
- constructSourceNodeFromAggregationDescriptors(
- ascendingAggregations,
- descendingAggregations,
- countTimeAggregations,
- scanOrder,
- groupByTimeParameter);
- updateTypeProvider(aggregationExpressions);
- updateTypeProvider(sourceTransformExpressions);
-
- return convergeAggregationSource(
- sourceNodeList,
- curStep,
- scanOrder,
- groupByTimeParameter,
- aggregationExpressions,
- crossGroupByAggregations,
- tagKeys,
- tagValuesToGroupedTimeseriesOperands);
- }
-
- public LogicalPlanBuilder planAggregationSourceWithIndexAdjust(
- AggregationStep curStep,
- Ordering scanOrder,
- GroupByTimeParameter groupByTimeParameter,
- Set<Expression> aggregationExpressions,
- Set<Expression> sourceTransformExpressions,
- Map<Expression, Set<Expression>> crossGroupByExpressions,
- List<Integer> deviceViewInputIndexes,
- boolean outputEndTime) {
- checkArgument(
- aggregationExpressions.size() <= deviceViewInputIndexes.size(),
- "Each aggregate should correspond to a column of output.");
-
- boolean needCheckAscending = groupByTimeParameter == null;
- Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new HashMap<>();
- Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new HashMap<>();
- Map<AggregationDescriptor, Integer> aggregationToIndexMap = new HashMap<>();
- Map<PartialPath, List<AggregationDescriptor>> countTimeAggregations = new HashMap<>();
-
- // If need output endTime, the first index is used by __endTime
- int index = outputEndTime ? 1 : 0;
- for (Expression aggregationExpression : aggregationExpressions) {
- AggregationDescriptor aggregationDescriptor =
- createAggregationDescriptor(
- (FunctionExpression) aggregationExpression,
- curStep,
- scanOrder,
- needCheckAscending,
- ascendingAggregations,
- descendingAggregations,
- countTimeAggregations);
- aggregationToIndexMap.put(aggregationDescriptor, deviceViewInputIndexes.get(index));
- index++;
- }
-
- List<PlanNode> sourceNodeList =
- constructSourceNodeFromAggregationDescriptors(
- ascendingAggregations,
- descendingAggregations,
- countTimeAggregations,
- scanOrder,
- groupByTimeParameter);
- updateTypeProvider(aggregationExpressions);
- updateTypeProvider(sourceTransformExpressions);
-
- if (!curStep.isOutputPartial()) {
- // update measurementIndexes
- deviceViewInputIndexes.clear();
- if (outputEndTime) {
- deviceViewInputIndexes.add(1);
- }
- deviceViewInputIndexes.addAll(
- sourceNodeList.stream()
- .map(
- planNode ->
- ((SeriesAggregationSourceNode) planNode).getAggregationDescriptorList())
- .flatMap(List::stream)
- .map(aggregationToIndexMap::get)
- .collect(Collectors.toList()));
- }
-
- return convergeAggregationSource(
- sourceNodeList,
- curStep,
- scanOrder,
- groupByTimeParameter,
- aggregationExpressions,
- crossGroupByExpressions,
- null,
- null);
- }
-
- private AggregationDescriptor createAggregationDescriptor(
- FunctionExpression sourceExpression,
- AggregationStep curStep,
- Ordering scanOrder,
- boolean needCheckAscending,
- Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations,
- Map<PartialPath, List<AggregationDescriptor>> descendingAggregations,
- Map<PartialPath, List<AggregationDescriptor>> countTimeAggregations) {
- AggregationDescriptor aggregationDescriptor =
- new AggregationDescriptor(
- sourceExpression.getFunctionName(),
- curStep,
- sourceExpression.getExpressions(),
- sourceExpression.getFunctionAttributes());
- if (curStep.isOutputPartial()) {
- updateTypeProviderByPartialAggregation(aggregationDescriptor, context.getTypeProvider());
- }
-
- if (COUNT_TIME.equalsIgnoreCase(sourceExpression.getFunctionName())) {
- Map<String, Pair<List<String>, List<IMeasurementSchema>>> map = new HashMap<>();
- for (Expression expression : sourceExpression.getCountTimeExpressions()) {
- TimeSeriesOperand ts = (TimeSeriesOperand) expression;
- PartialPath path = ts.getPath();
- Pair<List<String>, List<IMeasurementSchema>> pair =
- map.computeIfAbsent(
- path.getDevice(), k -> new Pair<>(new ArrayList<>(), new ArrayList<>()));
- pair.left.add(path.getMeasurement());
- try {
- pair.right.add(path.getMeasurementSchema());
- } catch (MetadataException ex) {
- throw new RuntimeException(ex);
- }
- }
-
- for (Map.Entry<String, Pair<List<String>, List<IMeasurementSchema>>> entry : map.entrySet()) {
- String device = entry.getKey();
- Pair<List<String>, List<IMeasurementSchema>> pair = entry.getValue();
- AlignedPath alignedPath = null;
- try {
- alignedPath = new AlignedPath(device, pair.left, pair.right);
- } catch (IllegalPathException e) {
- throw new RuntimeException(e);
- }
- countTimeAggregations.put(alignedPath, Collections.singletonList(aggregationDescriptor));
- }
-
- return aggregationDescriptor;
- }
-
- PartialPath selectPath =
- ((TimeSeriesOperand) sourceExpression.getExpressions().get(0)).getPath();
- if (!needCheckAscending
- || SchemaUtils.isConsistentWithScanOrder(
- aggregationDescriptor.getAggregationType(), scanOrder)) {
- ascendingAggregations
- .computeIfAbsent(selectPath, key -> new ArrayList<>())
- .add(aggregationDescriptor);
- } else {
- descendingAggregations
- .computeIfAbsent(selectPath, key -> new ArrayList<>())
- .add(aggregationDescriptor);
- }
- return aggregationDescriptor;
- }
-
- private List<PlanNode> constructSourceNodeFromAggregationDescriptors(
- Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations,
- Map<PartialPath, List<AggregationDescriptor>> descendingAggregations,
- Map<PartialPath, List<AggregationDescriptor>> countTimeAggregations,
- Ordering scanOrder,
- GroupByTimeParameter groupByTimeParameter) {
-
- List<PlanNode> sourceNodeList = new ArrayList<>();
- boolean needCheckAscending = groupByTimeParameter == null;
- Map<PartialPath, List<AggregationDescriptor>> groupedAscendingAggregations = null;
- if (!countTimeAggregations.isEmpty()) {
- groupedAscendingAggregations = countTimeAggregations;
- } else {
- groupedAscendingAggregations = MetaUtils.groupAlignedAggregations(ascendingAggregations);
- }
-
- for (Map.Entry<PartialPath, List<AggregationDescriptor>> pathAggregationsEntry :
- groupedAscendingAggregations.entrySet()) {
- sourceNodeList.add(
- createAggregationScanNode(
- pathAggregationsEntry.getKey(),
- pathAggregationsEntry.getValue(),
- scanOrder,
- groupByTimeParameter));
- }
-
- if (needCheckAscending) {
- Map<PartialPath, List<AggregationDescriptor>> groupedDescendingAggregations =
- MetaUtils.groupAlignedAggregations(descendingAggregations);
- for (Map.Entry<PartialPath, List<AggregationDescriptor>> pathAggregationsEntry :
- groupedDescendingAggregations.entrySet()) {
- sourceNodeList.add(
- createAggregationScanNode(
- pathAggregationsEntry.getKey(),
- pathAggregationsEntry.getValue(),
- scanOrder.reverse(),
- null));
- }
- }
- return sourceNodeList;
- }
-
- private LogicalPlanBuilder convergeAggregationSource(
- List<PlanNode> sourceNodeList,
- AggregationStep curStep,
- Ordering scanOrder,
- GroupByTimeParameter groupByTimeParameter,
- Set<Expression> aggregationExpressions,
- Map<Expression, Set<Expression>> crossGroupByExpressions,
- List<String> tagKeys,
- Map<List<String>, LinkedHashMap<Expression, List<Expression>>>
- tagValuesToGroupedTimeseriesOperands) {
- if (curStep.isOutputPartial()) {
- if (groupByTimeParameter != null && groupByTimeParameter.hasOverlap()) {
- curStep =
- crossGroupByExpressions != null ? AggregationStep.INTERMEDIATE : AggregationStep.FINAL;
-
- this.root = convergeWithTimeJoin(sourceNodeList, scanOrder);
-
- this.root =
- createSlidingWindowAggregationNode(
- this.getRoot(), aggregationExpressions, groupByTimeParameter, curStep, scanOrder);
-
- if (crossGroupByExpressions != null) {
- curStep = AggregationStep.FINAL;
- if (tagKeys != null) {
- this.root =
- createGroupByTagNode(
- tagKeys,
- tagValuesToGroupedTimeseriesOperands,
- crossGroupByExpressions.keySet(),
- Collections.singletonList(this.getRoot()),
- curStep,
- groupByTimeParameter,
- scanOrder);
- } else {
- this.root =
- createGroupByTLevelNode(
- Collections.singletonList(this.getRoot()),
- crossGroupByExpressions,
- curStep,
- groupByTimeParameter,
- scanOrder);
- }
- }
- } else {
- if (tagKeys != null) {
- curStep = AggregationStep.FINAL;
- this.root =
- createGroupByTagNode(
- tagKeys,
- tagValuesToGroupedTimeseriesOperands,
- crossGroupByExpressions.keySet(),
- sourceNodeList,
- curStep,
- groupByTimeParameter,
- scanOrder);
- } else if (crossGroupByExpressions != null) {
- curStep = AggregationStep.FINAL;
- this.root =
- createGroupByTLevelNode(
- sourceNodeList,
- crossGroupByExpressions,
- curStep,
- groupByTimeParameter,
- scanOrder);
- }
- }
- } else {
- this.root = convergeWithTimeJoin(sourceNodeList, scanOrder);
- }
-
- return this;
- }
-
public static void updateTypeProviderByPartialAggregation(
AggregationDescriptor aggregationDescriptor, TypeProvider typeProvider) {
if (aggregationDescriptor.getAggregationType() == TAggregationType.UDAF) {
@@ -977,7 +669,7 @@ public class LogicalPlanBuilder {
return this;
}
- public LogicalPlanBuilder planAggregation(
+ public LogicalPlanBuilder planRawDataAggregation(
Set<Expression> aggregationExpressions,
Expression groupByExpression,
GroupByTimeParameter groupByTimeParameter,
@@ -999,9 +691,9 @@ public class LogicalPlanBuilder {
aggregationDescriptor, context.getTypeProvider()));
}
this.root =
- new AggregationNode(
+ new RawDataAggregationNode(
context.getQueryId().genPlanNodeId(),
- Collections.singletonList(this.getRoot()),
+ this.getRoot(),
aggregationDescriptorList,
groupByTimeParameter,
groupByParameter,
@@ -1076,85 +768,6 @@ public class LogicalPlanBuilder {
scanOrder);
}
- private PlanNode createGroupByTagNode(
- List<String> tagKeys,
- Map<List<String>, LinkedHashMap<Expression, List<Expression>>>
- tagValuesToGroupedTimeseriesOperands,
- Collection<Expression> groupByTagOutputExpressions,
- List<PlanNode> children,
- AggregationStep curStep,
- GroupByTimeParameter groupByTimeParameter,
- Ordering scanOrder) {
- Map<List<String>, List<CrossSeriesAggregationDescriptor>> tagValuesToAggregationDescriptors =
- new HashMap<>();
- for (List<String> tagValues : tagValuesToGroupedTimeseriesOperands.keySet()) {
- LinkedHashMap<Expression, List<Expression>> groupedTimeseriesOperands =
- tagValuesToGroupedTimeseriesOperands.get(tagValues);
- List<CrossSeriesAggregationDescriptor> aggregationDescriptors = new ArrayList<>();
-
- // Bind an AggregationDescriptor for each GroupByTagOutputExpression
- for (Expression groupByTagOutputExpression : groupByTagOutputExpressions) {
- boolean added = false;
- for (Expression expression : groupedTimeseriesOperands.keySet()) {
- if (expression.equals(groupByTagOutputExpression)) {
- String functionName = ((FunctionExpression) expression).getFunctionName();
- CrossSeriesAggregationDescriptor aggregationDescriptor =
- new CrossSeriesAggregationDescriptor(
- functionName,
- curStep,
- groupedTimeseriesOperands.get(expression),
- ((FunctionExpression) expression).getFunctionAttributes(),
- expression.getExpressions());
- aggregationDescriptors.add(aggregationDescriptor);
- added = true;
- break;
- }
- }
- if (!added) {
- aggregationDescriptors.add(null);
- }
- }
- tagValuesToAggregationDescriptors.put(tagValues, aggregationDescriptors);
- }
-
- updateTypeProvider(groupByTagOutputExpressions);
- updateTypeProviderWithConstantType(tagKeys, TSDataType.TEXT);
- return new GroupByTagNode(
- context.getQueryId().genPlanNodeId(),
- children,
- groupByTimeParameter,
- scanOrder,
- tagKeys,
- tagValuesToAggregationDescriptors,
- groupByTagOutputExpressions.stream()
- .map(Expression::getExpressionString)
- .collect(Collectors.toList()));
- }
-
- private SeriesAggregationSourceNode createAggregationScanNode(
- PartialPath selectPath,
- List<AggregationDescriptor> aggregationDescriptorList,
- Ordering scanOrder,
- GroupByTimeParameter groupByTimeParameter) {
- if (selectPath instanceof MeasurementPath) { // non-aligned series
- return new SeriesAggregationScanNode(
- context.getQueryId().genPlanNodeId(),
- (MeasurementPath) selectPath,
- aggregationDescriptorList,
- scanOrder,
- groupByTimeParameter);
- } else if (selectPath instanceof AlignedPath) { // aligned series
- return new AlignedSeriesAggregationScanNode(
- context.getQueryId().genPlanNodeId(),
- (AlignedPath) selectPath,
- aggregationDescriptorList,
- scanOrder,
- groupByTimeParameter);
- } else {
- throw new IllegalArgumentException("unexpected path type");
- }
- }
-
private List<AggregationDescriptor> constructAggregationDescriptorList(
Set<Expression> aggregationExpressions, AggregationStep curStep) {
return aggregationExpressions.stream()
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index 87c04426010..2c371732f4f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -18,18 +18,12 @@
*/
package org.apache.iotdb.db.queryengine.plan.planner;
-import org.apache.iotdb.commons.path.AlignedPath;
-import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
-import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
-import org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
-import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
-import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
import org.apache.iotdb.db.queryengine.plan.expression.visitor.TransformToViewExpressionVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.ExplainAnalyzeNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -95,8 +89,6 @@ import org.apache.iotdb.db.schemaengine.template.Template;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.commons.lang3.StringUtils;
-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -105,7 +97,6 @@ import java.util.Map;
import java.util.Set;
import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
-import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME;
/**
* This visitor is used to generate a logical plan for the statement and returns the {@link
@@ -179,7 +170,6 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
analysis.getDeviceToGroupByExpression() != null
? analysis.getDeviceToGroupByExpression().get(deviceName)
: null,
- analysis.getDeviceViewInputIndexesMap().get(deviceName),
context));
// order by device, expression, push down sortOperator
if (queryStatement.needPushDownSort()) {
@@ -210,7 +200,6 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
analysis.getWhereExpression(),
analysis.getAggregationExpressions(),
analysis.getGroupByExpression(),
- null,
context));
}
@@ -255,7 +244,6 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
Expression whereExpression,
Set<Expression> aggregationExpressions,
Expression groupByExpression,
- List<Integer> deviceViewInputIndexes,
MPPQueryContext context) {
LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
if (aggregationExpressions == null) {
@@ -275,96 +263,56 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
queryStatement.getResultTimeOrder());
} else {
// aggregation query
- boolean isRawDataSource =
- analysis.hasValueFilter()
- || analysis.hasGroupByParameter()
- || needTransform(sourceTransformExpressions)
- || cannotUseStatistics(aggregationExpressions, sourceTransformExpressions);
- if (queryStatement.isOutputEndTime()) {
- context.getTypeProvider().setType(ENDTIME, TSDataType.INT64);
- }
- AggregationStep curStep;
- if (isRawDataSource) {
- planBuilder =
- planBuilder
- .planRawDataSource(
- sourceExpressions,
- queryStatement.getResultTimeOrder(),
- 0,
- 0,
- analysis.isLastLevelUseWildcard())
- .planWhereAndSourceTransform(
- whereExpression,
- sourceTransformExpressions,
- queryStatement.isGroupByTime(),
- queryStatement.getResultTimeOrder());
-
- boolean outputPartial =
- queryStatement.isGroupByLevel()
- || (queryStatement.isGroupByTime()
- && analysis.getGroupByTimeParameter().hasOverlap());
- curStep = outputPartial ? AggregationStep.PARTIAL : AggregationStep.SINGLE;
+ planBuilder =
+ planBuilder
+ .planRawDataSource(
+ sourceExpressions,
+ queryStatement.getResultTimeOrder(),
+ 0,
+ 0,
+ analysis.isLastLevelUseWildcard())
+ .planWhereAndSourceTransform(
+ whereExpression,
+ sourceTransformExpressions,
+ queryStatement.isGroupByTime(),
+ queryStatement.getResultTimeOrder());
+
+ boolean outputPartial =
+ queryStatement.isGroupByLevel()
+ || (queryStatement.isGroupByTime()
+ && analysis.getGroupByTimeParameter().hasOverlap());
+ AggregationStep curStep = outputPartial ? AggregationStep.PARTIAL : AggregationStep.SINGLE;
+ planBuilder =
+ planBuilder.planRawDataAggregation(
+ aggregationExpressions,
+ groupByExpression,
+ analysis.getGroupByTimeParameter(),
+ analysis.getGroupByParameter(),
+ queryStatement.isOutputEndTime(),
+ curStep,
+ queryStatement.getResultTimeOrder());
+
+ if (queryStatement.isGroupByTime() && analysis.getGroupByTimeParameter().hasOverlap()) {
+ curStep =
+ queryStatement.isGroupByLevel() ? AggregationStep.INTERMEDIATE : AggregationStep.FINAL;
planBuilder =
- planBuilder.planAggregation(
+ planBuilder.planSlidingWindowAggregation(
aggregationExpressions,
- groupByExpression,
analysis.getGroupByTimeParameter(),
- analysis.getGroupByParameter(),
- queryStatement.isOutputEndTime(),
curStep,
queryStatement.getResultTimeOrder());
+ }
- if (queryStatement.isGroupByTime() && analysis.getGroupByTimeParameter().hasOverlap()) {
- curStep =
- queryStatement.isGroupByLevel()
- ? AggregationStep.INTERMEDIATE
- : AggregationStep.FINAL;
- planBuilder =
- planBuilder.planSlidingWindowAggregation(
- aggregationExpressions,
- analysis.getGroupByTimeParameter(),
- curStep,
- queryStatement.getResultTimeOrder());
- }
-
- if (queryStatement.isGroupByLevel()) {
- planBuilder =
- planBuilder.planGroupByLevel(
- analysis.getCrossGroupByExpressions(),
- analysis.getGroupByTimeParameter(),
- queryStatement.getResultTimeOrder());
- }
- } else {
- curStep =
- (analysis.getCrossGroupByExpressions() != null
- || (analysis.getGroupByTimeParameter() != null
- && analysis.getGroupByTimeParameter().hasOverlap()))
- ? AggregationStep.PARTIAL
- : AggregationStep.SINGLE;
-
+ if (queryStatement.isGroupByLevel()) {
planBuilder =
- deviceViewInputIndexes == null
- ? planBuilder.planAggregationSource(
- curStep,
- queryStatement.getResultTimeOrder(),
- analysis.getGroupByTimeParameter(),
- aggregationExpressions,
- sourceTransformExpressions,
- analysis.getCrossGroupByExpressions(),
- analysis.getTagKeys(),
- analysis.getTagValuesToGroupedTimeseriesOperands())
- : planBuilder.planAggregationSourceWithIndexAdjust(
- curStep,
- queryStatement.getResultTimeOrder(),
- analysis.getGroupByTimeParameter(),
- aggregationExpressions,
- sourceTransformExpressions,
- analysis.getCrossGroupByExpressions(),
- deviceViewInputIndexes,
- queryStatement.isOutputEndTime());
+ planBuilder.planGroupByLevel(
+ analysis.getCrossGroupByExpressions(),
+ analysis.getGroupByTimeParameter(),
+ queryStatement.getResultTimeOrder());
}
if (queryStatement.isGroupByTime() && queryStatement.isOutputEndTime()) {
+ context.getTypeProvider().setType(ENDTIME, TSDataType.INT64);
planBuilder =
planBuilder.planEndTimeColumnInject(
analysis.getGroupByTimeParameter(),
@@ -395,55 +343,6 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
return 0;
}
- private boolean needTransform(Set<Expression> expressions) {
- for (Expression expression : expressions) {
- if (ExpressionAnalyzer.checkIsNeedTransform(expression)) {
- return true;
- }
- }
- return false;
- }
-
- private boolean cannotUseStatistics(
- Set<Expression> expressions, Set<Expression> sourceTransformExpressions) {
- for (Expression expression : expressions) {
-
- if (expression instanceof FunctionExpression) {
- FunctionExpression functionExpression = (FunctionExpression) expression;
- // Disable statistics optimization of UDAF for now
- if (functionExpression.isExternalAggregationFunctionExpression()) {
- return true;
- }
-
- if (COUNT_TIME.equalsIgnoreCase(functionExpression.getFunctionName())) {
- String alignedDeviceId = "";
- for (Expression countTimeExpression : sourceTransformExpressions) {
- TimeSeriesOperand ts = (TimeSeriesOperand) countTimeExpression;
- if (!(ts.getPath() instanceof AlignedPath
- || ((MeasurementPath) ts.getPath()).isUnderAlignedEntity())) {
- return true;
- }
- if (StringUtils.isEmpty(alignedDeviceId)) {
- alignedDeviceId = ts.getPath().getDevice();
- } else if (!alignedDeviceId.equalsIgnoreCase(ts.getPath().getDevice())) {
- // count_time from only one aligned device can use AlignedSeriesAggScan
- return true;
- }
- }
- return false;
- }
-
- if (!BuiltinAggregationFunction.canUseStatistics(functionExpression.getFunctionName())) {
- return true;
- }
- } else {
- throw new IllegalArgumentException(
- String.format("Invalid Aggregation Expression: %s", expression.getExpressionString()));
- }
- }
- return false;
- }
-
@Override
public PlanNode visitCreateTimeseries(
CreateTimeSeriesStatement createTimeSeriesStatement, MPPQueryContext context) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanner.java
index 78d939f019f..5f460f0267d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanner.java
@@ -21,12 +21,13 @@ package org.apache.iotdb.db.queryengine.plan.planner;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.optimization.AggregationPushDown;
import org.apache.iotdb.db.queryengine.plan.optimization.PlanOptimizer;
import org.apache.iotdb.db.queryengine.plan.optimization.PredicatePushDown;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
-import java.util.Collections;
+import java.util.Arrays;
import java.util.List;
import static org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.LOGICAL_PLANNER;
@@ -35,7 +36,8 @@ import static org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.LOGI
public class LogicalPlanner {
private final MPPQueryContext context;
- private final List<PlanOptimizer> optimizers = Collections.singletonList(new PredicatePushDown());
+ private final List<PlanOptimizer> optimizers =
+ Arrays.asList(new PredicatePushDown(), new AggregationPushDown());
public LogicalPlanner(MPPQueryContext context) {
this.context = context;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index a6d043f5ad2..9517179c98a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -192,6 +192,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortN
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
@@ -1753,11 +1754,12 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
}
@Override
- public Operator visitAggregation(AggregationNode node, LocalExecutionPlanContext context) {
+ public Operator visitRawDataAggregation(
+ RawDataAggregationNode node, LocalExecutionPlanContext context) {
checkArgument(
!node.getAggregationDescriptorList().isEmpty(),
"Aggregation descriptorList cannot be empty");
- List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context);
+ Operator child = node.getChild().accept(this, context);
boolean ascending = node.getScanOrder() == Ordering.ASC;
List<Aggregator> aggregators = new ArrayList<>();
Map<String, List<InputLocation>> layout = makeLayout(node);
@@ -1779,128 +1781,154 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
descriptor.getStep(),
inputLocationList));
}
- boolean inputRaw = node.getAggregationDescriptorList().get(0).getStep().isInputRaw();
+
GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
GroupByParameter groupByParameter = node.getGroupByParameter();
- if (inputRaw) {
- checkArgument(children.size() == 1, "rawDataAggregateOperator can only accept one input");
- OperatorContext operatorContext =
- context
- .getDriverContext()
- .addOperatorContext(
- context.getNextOperatorId(),
- node.getPlanNodeId(),
- RawDataAggregationOperator.class.getSimpleName());
-
- ITimeRangeIterator timeRangeIterator =
- initTimeRangeIterator(groupByTimeParameter, ascending, true);
- long maxReturnSize =
- calculateMaxAggregationResultSize(
- aggregationDescriptors, timeRangeIterator, context.getTypeProvider());
-
- // groupByParameter and groupByTimeParameter
- if (groupByParameter != null) {
- WindowType windowType = groupByParameter.getWindowType();
-
- WindowParameter windowParameter;
- switch (windowType) {
- case VARIATION_WINDOW:
- Expression groupByVariationExpression = node.getGroupByExpression();
- if (groupByVariationExpression == null) {
- throw new IllegalArgumentException("groupByVariationExpression can't be null");
- }
- String controlColumn = groupByVariationExpression.getExpressionString();
- TSDataType controlColumnType = context.getTypeProvider().getType(controlColumn);
- windowParameter =
- new VariationWindowParameter(
- controlColumnType,
- layout.get(controlColumn).get(0).getValueColumnIndex(),
- node.isOutputEndTime(),
- ((GroupByVariationParameter) groupByParameter).isIgnoringNull(),
- ((GroupByVariationParameter) groupByParameter).getDelta());
- break;
- case CONDITION_WINDOW:
- Expression groupByConditionExpression = node.getGroupByExpression();
- if (groupByConditionExpression == null) {
- throw new IllegalArgumentException("groupByConditionExpression can't be null");
- }
- windowParameter =
- new ConditionWindowParameter(
- node.isOutputEndTime(),
- ((GroupByConditionParameter) groupByParameter).isIgnoringNull(),
- layout
- .get(groupByConditionExpression.getExpressionString())
- .get(0)
- .getValueColumnIndex(),
- ((GroupByConditionParameter) groupByParameter).getKeepExpression());
- break;
- case SESSION_WINDOW:
- windowParameter =
- new SessionWindowParameter(
- ((GroupBySessionParameter) groupByParameter).getTimeInterval(),
- node.isOutputEndTime());
- break;
- case COUNT_WINDOW:
- Expression groupByCountExpression = node.getGroupByExpression();
- if (groupByCountExpression == null) {
- throw new IllegalArgumentException("groupByCountExpression can't be null");
- }
- windowParameter =
- new CountWindowParameter(
- ((GroupByCountParameter) groupByParameter).getCountNumber(),
- layout
- .get(groupByCountExpression.getExpressionString())
- .get(0)
- .getValueColumnIndex(),
- node.isOutputEndTime(),
- ((GroupByCountParameter) groupByParameter).isIgnoreNull());
- break;
- default:
- throw new IllegalArgumentException("Unsupported window type");
- }
- return new RawDataAggregationOperator(
- operatorContext,
- aggregators,
- timeRangeIterator,
- children.get(0),
- ascending,
- maxReturnSize,
- windowParameter);
- }
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ RawDataAggregationOperator.class.getSimpleName());
+
+ ITimeRangeIterator timeRangeIterator =
+ initTimeRangeIterator(groupByTimeParameter, ascending, true);
+ long maxReturnSize =
+ calculateMaxAggregationResultSize(
+ aggregationDescriptors, timeRangeIterator, context.getTypeProvider());
- WindowParameter windowParameter = new TimeWindowParameter(node.isOutputEndTime());
+ // groupByParameter and groupByTimeParameter
+ if (groupByParameter != null) {
+ WindowType windowType = groupByParameter.getWindowType();
+
+ WindowParameter windowParameter;
+ switch (windowType) {
+ case VARIATION_WINDOW:
+ Expression groupByVariationExpression = node.getGroupByExpression();
+ if (groupByVariationExpression == null) {
+ throw new IllegalArgumentException("groupByVariationExpression can't be null");
+ }
+ String controlColumn = groupByVariationExpression.getExpressionString();
+ TSDataType controlColumnType = context.getTypeProvider().getType(controlColumn);
+ windowParameter =
+ new VariationWindowParameter(
+ controlColumnType,
+ layout.get(controlColumn).get(0).getValueColumnIndex(),
+ node.isOutputEndTime(),
+ ((GroupByVariationParameter) groupByParameter).isIgnoringNull(),
+ ((GroupByVariationParameter) groupByParameter).getDelta());
+ break;
+ case CONDITION_WINDOW:
+ Expression groupByConditionExpression = node.getGroupByExpression();
+ if (groupByConditionExpression == null) {
+ throw new IllegalArgumentException("groupByConditionExpression can't be null");
+ }
+ windowParameter =
+ new ConditionWindowParameter(
+ node.isOutputEndTime(),
+ ((GroupByConditionParameter) groupByParameter).isIgnoringNull(),
+ layout
+ .get(groupByConditionExpression.getExpressionString())
+ .get(0)
+ .getValueColumnIndex(),
+ ((GroupByConditionParameter) groupByParameter).getKeepExpression());
+ break;
+ case SESSION_WINDOW:
+ windowParameter =
+ new SessionWindowParameter(
+ ((GroupBySessionParameter) groupByParameter).getTimeInterval(),
+ node.isOutputEndTime());
+ break;
+ case COUNT_WINDOW:
+ Expression groupByCountExpression = node.getGroupByExpression();
+ if (groupByCountExpression == null) {
+ throw new IllegalArgumentException("groupByCountExpression can't be null");
+ }
+ windowParameter =
+ new CountWindowParameter(
+ ((GroupByCountParameter) groupByParameter).getCountNumber(),
+ layout
+ .get(groupByCountExpression.getExpressionString())
+ .get(0)
+ .getValueColumnIndex(),
+ node.isOutputEndTime(),
+ ((GroupByCountParameter) groupByParameter).isIgnoreNull());
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported window type");
+ }
return new RawDataAggregationOperator(
operatorContext,
aggregators,
timeRangeIterator,
- children.get(0),
+ child,
ascending,
maxReturnSize,
windowParameter);
- } else {
- OperatorContext operatorContext =
- context
- .getDriverContext()
- .addOperatorContext(
- context.getNextOperatorId(),
- node.getPlanNodeId(),
- AggregationOperator.class.getSimpleName());
+ }
- ITimeRangeIterator timeRangeIterator =
- initTimeRangeIterator(groupByTimeParameter, ascending, true);
- long maxReturnSize =
- calculateMaxAggregationResultSize(
- aggregationDescriptors, timeRangeIterator, context.getTypeProvider());
+ WindowParameter windowParameter = new TimeWindowParameter(node.isOutputEndTime());
+ return new RawDataAggregationOperator(
+ operatorContext,
+ aggregators,
+ timeRangeIterator,
+ child,
+ ascending,
+ maxReturnSize,
+ windowParameter);
+ }
- return new AggregationOperator(
- operatorContext,
- aggregators,
- timeRangeIterator,
- children,
- node.isOutputEndTime(),
- maxReturnSize);
+ @Override
+ public Operator visitAggregation(AggregationNode node, LocalExecutionPlanContext context) {
+ checkArgument(
+ !node.getAggregationDescriptorList().isEmpty(),
+ "Aggregation descriptorList cannot be empty");
+ List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context);
+ boolean ascending = node.getScanOrder() == Ordering.ASC;
+ List<Aggregator> aggregators = new ArrayList<>();
+ Map<String, List<InputLocation>> layout = makeLayout(node);
+ List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList();
+ for (AggregationDescriptor descriptor : node.getAggregationDescriptorList()) {
+ List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout);
+ aggregators.add(
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(
+ descriptor.getAggregationFuncName(),
+ descriptor.getAggregationType(),
+ descriptor.getInputExpressions().stream()
+ .map(x -> context.getTypeProvider().getType(x.getExpressionString()))
+ .collect(Collectors.toList()),
+ descriptor.getInputExpressions(),
+ descriptor.getInputAttributes(),
+ ascending,
+ descriptor.getStep().isInputRaw()),
+ descriptor.getStep(),
+ inputLocationList));
}
+
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ AggregationOperator.class.getSimpleName());
+
+ ITimeRangeIterator timeRangeIterator =
+ initTimeRangeIterator(node.getGroupByTimeParameter(), ascending, true);
+ long maxReturnSize =
+ calculateMaxAggregationResultSize(
+ aggregationDescriptors, timeRangeIterator, context.getTypeProvider());
+
+ return new AggregationOperator(
+ operatorContext,
+ aggregators,
+ timeRangeIterator,
+ children,
+ node.isOutputEndTime(),
+ maxReturnSize);
}
private List<InputLocation[]> calcInputLocationList(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index 3ae7a3f126f..958e85592ea 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -46,6 +46,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
@@ -381,6 +382,11 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
return processOneChildNode(node, context);
}
+ @Override
+ public PlanNode visitRawDataAggregation(RawDataAggregationNode node, NodeGroupContext context) {
+ return processOneChildNode(node, context);
+ }
+
@Override
public PlanNode visitExplainAnalyze(ExplainAnalyzeNode node, NodeGroupContext context) {
ExplainAnalyzeNode newNode = (ExplainAnalyzeNode) node.clone();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index bda66843a97..c96d87573b9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -78,6 +78,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
@@ -209,6 +210,8 @@ public enum PlanNodeType {
EXPLAIN_ANALYZE((short) 90),
PIPE_OPERATE_SCHEMA_QUEUE_REFERENCE((short) 91),
+
+ RAW_DATA_AGGREGATION((short) 92),
;
public static final int BYTES = Short.BYTES;
@@ -440,6 +443,8 @@ public enum PlanNodeType {
return ExplainAnalyzeNode.deserialize(buffer);
case 91:
return PipeOperateSchemaQueueNode.deserialize(buffer);
+ case 92:
+ return RawDataAggregationNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index 608ca14df24..932a9d22597 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -76,6 +76,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortN
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
@@ -222,6 +223,10 @@ public abstract class PlanVisitor<R, C> {
return visitSingleChildProcess(node, context);
}
+ public R visitRawDataAggregation(RawDataAggregationNode node, C context) {
+ return visitSingleChildProcess(node, context);
+ }
+
// two child -----------------------------------------------------------------------------------
public R visitTwoChildProcess(TwoChildProcessNode node, C context) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RawDataAggregationNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RawDataAggregationNode.java
new file mode 100644
index 00000000000..ab2683743a3
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RawDataAggregationNode.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process;
+
+import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByParameter;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode.getDeduplicatedDescriptors;
+
+/**
+ * Single-child process node that carries the parameters of an aggregation: the list of {@link
+ * AggregationDescriptor}s (passed through {@code AggregationNode.getDeduplicatedDescriptors} on
+ * construction), the optional {@code group by time} / {@code group by} parameters, an optional
+ * group-by expression, the scan order and whether an end-time column is part of the output.
+ */
+public class RawDataAggregationNode extends SingleChildProcessNode {
+
+  // The list of aggregate functions; each AggregationDescriptor will be output as one or two
+  // columns of the result TsBlock.
+  protected List<AggregationDescriptor> aggregationDescriptorList;
+
+  // The parameter of `group by time`.
+  // Its value will be null if there is no `group by time` clause.
+  @Nullable protected GroupByTimeParameter groupByTimeParameter;
+
+  // The parameter of `group by`.
+  // Its value will be null if there is no `group by` clause.
+  @Nullable protected GroupByParameter groupByParameter;
+
+  // In some situations of the `group by` clause, groupByExpression is required.
+  // It will be null if the clause doesn't refer to any expression.
+  @Nullable protected Expression groupByExpression;
+
+  protected Ordering scanOrder;
+
+  protected boolean outputEndTime = false;
+
+  /**
+   * Creates a node without a child, without {@code group by} parameter or expression, and with
+   * {@code outputEndTime} set to {@code false}.
+   *
+   * @param id the plan node id
+   * @param aggregationDescriptorList the aggregation descriptors
+   * @param groupByTimeParameter the {@code group by time} parameter, or {@code null}
+   * @param scanOrder the scan order
+   */
+  public RawDataAggregationNode(
+      PlanNodeId id,
+      List<AggregationDescriptor> aggregationDescriptorList,
+      @Nullable GroupByTimeParameter groupByTimeParameter,
+      Ordering scanOrder) {
+    this(id, aggregationDescriptorList, groupByTimeParameter, null, null, false, scanOrder);
+  }
+
+  /**
+   * Creates a node with the given child, without {@code group by} parameter or expression, and
+   * with {@code outputEndTime} set to {@code false}.
+   *
+   * @param id the plan node id
+   * @param child the child node
+   * @param aggregationDescriptorList the aggregation descriptors
+   * @param groupByTimeParameter the {@code group by time} parameter, or {@code null}
+   * @param scanOrder the scan order
+   */
+  public RawDataAggregationNode(
+      PlanNodeId id,
+      PlanNode child,
+      List<AggregationDescriptor> aggregationDescriptorList,
+      @Nullable GroupByTimeParameter groupByTimeParameter,
+      Ordering scanOrder) {
+    this(
+        id, child, aggregationDescriptorList, groupByTimeParameter, null, null, false, scanOrder);
+  }
+
+  /**
+   * Creates a fully specified node without a child.
+   *
+   * @param id the plan node id
+   * @param aggregationDescriptorList the aggregation descriptors
+   * @param groupByTimeParameter the {@code group by time} parameter, or {@code null}
+   * @param groupByParameter the {@code group by} parameter, or {@code null}
+   * @param groupByExpression the group-by expression, or {@code null}
+   * @param outputEndTime whether the end-time column is added to the output columns
+   * @param scanOrder the scan order
+   */
+  public RawDataAggregationNode(
+      PlanNodeId id,
+      List<AggregationDescriptor> aggregationDescriptorList,
+      @Nullable GroupByTimeParameter groupByTimeParameter,
+      @Nullable GroupByParameter groupByParameter,
+      @Nullable Expression groupByExpression,
+      boolean outputEndTime,
+      Ordering scanOrder) {
+    super(id);
+    this.aggregationDescriptorList = getDeduplicatedDescriptors(aggregationDescriptorList);
+    this.groupByTimeParameter = groupByTimeParameter;
+    this.groupByParameter = groupByParameter;
+    this.groupByExpression = groupByExpression;
+    this.outputEndTime = outputEndTime;
+    this.scanOrder = scanOrder;
+  }
+
+  /**
+   * Creates a fully specified node with the given child.
+   *
+   * @param id the plan node id
+   * @param child the child node
+   * @param aggregationDescriptorList the aggregation descriptors
+   * @param groupByTimeParameter the {@code group by time} parameter, or {@code null}
+   * @param groupByParameter the {@code group by} parameter, or {@code null}
+   * @param groupByExpression the group-by expression, or {@code null}
+   * @param outputEndTime whether the end-time column is added to the output columns
+   * @param scanOrder the scan order
+   */
+  public RawDataAggregationNode(
+      PlanNodeId id,
+      PlanNode child,
+      List<AggregationDescriptor> aggregationDescriptorList,
+      @Nullable GroupByTimeParameter groupByTimeParameter,
+      @Nullable GroupByParameter groupByParameter,
+      @Nullable Expression groupByExpression,
+      boolean outputEndTime,
+      Ordering scanOrder) {
+    super(id, child);
+    this.aggregationDescriptorList = getDeduplicatedDescriptors(aggregationDescriptorList);
+    this.groupByTimeParameter = groupByTimeParameter;
+    this.groupByParameter = groupByParameter;
+    this.groupByExpression = groupByExpression;
+    this.outputEndTime = outputEndTime;
+    this.scanOrder = scanOrder;
+  }
+
+  /** Returns the aggregation descriptors held by this node. */
+  public List<AggregationDescriptor> getAggregationDescriptorList() {
+    return aggregationDescriptorList;
+  }
+
+  /** Returns the {@code group by time} parameter, or {@code null} if none was set. */
+  @Nullable
+  public GroupByTimeParameter getGroupByTimeParameter() {
+    return groupByTimeParameter;
+  }
+
+  /** Returns the {@code group by} parameter, or {@code null} if none was set. */
+  @Nullable
+  public GroupByParameter getGroupByParameter() {
+    return groupByParameter;
+  }
+
+  /** Returns the scan order. */
+  public Ordering getScanOrder() {
+    return scanOrder;
+  }
+
+  /** Returns whether the end-time column is added to the output columns. */
+  public boolean isOutputEndTime() {
+    return outputEndTime;
+  }
+
+  /** Sets whether the end-time column is added to the output columns. */
+  public void setOutputEndTime(boolean outputEndTime) {
+    this.outputEndTime = outputEndTime;
+  }
+
+  /** Returns the group-by expression, or {@code null} if none was set. */
+  @Nullable
+  public Expression getGroupByExpression() {
+    return groupByExpression;
+  }
+
+  @Override
+  public PlanNodeType getType() {
+    return PlanNodeType.RAW_DATA_AGGREGATION;
+  }
+
+  /**
+   * Creates a new node with the same id and attributes as this one. The child-less constructor is
+   * used, so the child of this node is not carried over to the copy.
+   */
+  @Override
+  public PlanNode clone() {
+    return new RawDataAggregationNode(
+        getPlanNodeId(),
+        getAggregationDescriptorList(),
+        getGroupByTimeParameter(),
+        getGroupByParameter(),
+        getGroupByExpression(),
+        outputEndTime,
+        getScanOrder());
+  }
+
+  /**
+   * Returns the output column names: {@code ColumnHeaderConstant.ENDTIME} first when {@code
+   * outputEndTime} is set, followed by the output column names of every descriptor in list order.
+   */
+  @Override
+  public List<String> getOutputColumnNames() {
+    List<String> outputColumnNames = new ArrayList<>();
+    if (outputEndTime) {
+      outputColumnNames.add(ColumnHeaderConstant.ENDTIME);
+    }
+    outputColumnNames.addAll(
+        aggregationDescriptorList.stream()
+            .map(AggregationDescriptor::getOutputColumnNames)
+            .flatMap(List::stream)
+            .collect(Collectors.toList()));
+
+    return outputColumnNames;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitRawDataAggregation(this, context);
+  }
+
+  /**
+   * Writes this node's attributes to the buffer in the following order: node type tag, descriptor
+   * count, each descriptor, then {@code groupByTimeParameter}, {@code groupByParameter} and {@code
+   * groupByExpression} (each preceded by a presence byte: 0 = absent, 1 = present), {@code
+   * outputEndTime}, and the ordinal of {@code scanOrder}.
+   *
+   * <p>The layout must stay in sync with {@link #serializeAttributes(DataOutputStream)} and {@link
+   * #deserialize(ByteBuffer)}.
+   */
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.RAW_DATA_AGGREGATION.serialize(byteBuffer);
+    ReadWriteIOUtils.write(aggregationDescriptorList.size(), byteBuffer);
+    for (AggregationDescriptor aggregationDescriptor : aggregationDescriptorList) {
+      aggregationDescriptor.serialize(byteBuffer);
+    }
+    if (groupByTimeParameter == null) {
+      ReadWriteIOUtils.write((byte) 0, byteBuffer);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, byteBuffer);
+      groupByTimeParameter.serialize(byteBuffer);
+    }
+    if (groupByParameter == null) {
+      ReadWriteIOUtils.write((byte) 0, byteBuffer);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, byteBuffer);
+      groupByParameter.serialize(byteBuffer);
+    }
+    if (groupByExpression == null) {
+      ReadWriteIOUtils.write((byte) 0, byteBuffer);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, byteBuffer);
+      Expression.serialize(groupByExpression, byteBuffer);
+    }
+    ReadWriteIOUtils.write(outputEndTime, byteBuffer);
+    ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);
+  }
+
+  /**
+   * Stream counterpart of {@link #serializeAttributes(ByteBuffer)}; writes the same fields in the
+   * same order.
+   *
+   * @throws IOException if writing to the stream fails
+   */
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.RAW_DATA_AGGREGATION.serialize(stream);
+    ReadWriteIOUtils.write(aggregationDescriptorList.size(), stream);
+    for (AggregationDescriptor aggregationDescriptor : aggregationDescriptorList) {
+      aggregationDescriptor.serialize(stream);
+    }
+    if (groupByTimeParameter == null) {
+      ReadWriteIOUtils.write((byte) 0, stream);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, stream);
+      groupByTimeParameter.serialize(stream);
+    }
+    if (groupByParameter == null) {
+      ReadWriteIOUtils.write((byte) 0, stream);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, stream);
+      groupByParameter.serialize(stream);
+    }
+    if (groupByExpression == null) {
+      ReadWriteIOUtils.write((byte) 0, stream);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, stream);
+      Expression.serialize(groupByExpression, stream);
+    }
+    ReadWriteIOUtils.write(outputEndTime, stream);
+    ReadWriteIOUtils.write(scanOrder.ordinal(), stream);
+  }
+
+  /**
+   * Rebuilds a node (without child) from the buffer. Reading starts at the descriptor count; the
+   * node type tag written by {@code serializeAttributes} is not read here (presumably consumed by
+   * the caller before dispatching to this method - TODO confirm).
+   *
+   * @param byteBuffer the buffer positioned at the descriptor count
+   * @return the deserialized node
+   */
+  public static RawDataAggregationNode deserialize(ByteBuffer byteBuffer) {
+    int descriptorSize = ReadWriteIOUtils.readInt(byteBuffer);
+    List<AggregationDescriptor> aggregationDescriptorList = new ArrayList<>();
+    for (int i = 0; i < descriptorSize; i++) {
+      aggregationDescriptorList.add(AggregationDescriptor.deserialize(byteBuffer));
+    }
+
+    // Each optional field is preceded by a presence byte: 1 means the field follows.
+    byte presenceFlag = ReadWriteIOUtils.readByte(byteBuffer);
+    GroupByTimeParameter groupByTimeParameter = null;
+    if (presenceFlag == 1) {
+      groupByTimeParameter = GroupByTimeParameter.deserialize(byteBuffer);
+    }
+    presenceFlag = ReadWriteIOUtils.readByte(byteBuffer);
+    GroupByParameter groupByParameter = null;
+    if (presenceFlag == 1) {
+      groupByParameter = GroupByParameter.deserialize(byteBuffer);
+    }
+    presenceFlag = ReadWriteIOUtils.readByte(byteBuffer);
+    Expression groupByExpression = null;
+    if (presenceFlag == 1) {
+      groupByExpression = Expression.deserialize(byteBuffer);
+    }
+
+    boolean outputEndTime = ReadWriteIOUtils.readBool(byteBuffer);
+    Ordering scanOrder = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+    // The plan node id is read after all attributes.
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new RawDataAggregationNode(
+        planNodeId,
+        aggregationDescriptorList,
+        groupByTimeParameter,
+        groupByParameter,
+        groupByExpression,
+        outputEndTime,
+        scanOrder);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    RawDataAggregationNode that = (RawDataAggregationNode) o;
+    // outputEndTime is a primitive: compare with == instead of boxing through Objects.equals.
+    return outputEndTime == that.outputEndTime
+        && scanOrder == that.scanOrder
+        && Objects.equals(aggregationDescriptorList, that.aggregationDescriptorList)
+        && Objects.equals(groupByTimeParameter, that.groupByTimeParameter)
+        && Objects.equals(groupByParameter, that.groupByParameter)
+        && Objects.equals(groupByExpression, that.groupByExpression);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        super.hashCode(),
+        aggregationDescriptorList,
+        groupByTimeParameter,
+        groupByParameter,
+        groupByExpression,
+        outputEndTime,
+        scanOrder);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("RawDataAggregationNode-%s", getPlanNodeId());
+  }
+}