You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/03/27 01:58:19 UTC
[iotdb] 13/13: use agg instead of last
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/fileScan
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8cb7027ed056f37b09d1077a24e14c5732b203c0
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Mar 21 16:45:02 2023 +0800
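use agg instead of last (replace the last-query result format and LastQueryCollectNode with a single INT64 count aggregation column merged by an AggregationNode)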
---
.../source/FileAggregationScanOperator.java | 5 +++--
.../operator/source/FileAggregationScanUtil.java | 11 +--------
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 16 ++++++++-----
.../db/mpp/plan/planner/LogicalPlanBuilder.java | 15 ++++++-------
.../plan/planner/distribution/SourceRewriter.java | 19 ++++++++++++----
.../plan/node/source/FileAggregationScanNode.java | 26 +++++++++++++++++-----
6 files changed, 57 insertions(+), 35 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanOperator.java
index 0f61f0656b..23ed2cc12c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanOperator.java
@@ -22,14 +22,15 @@ package org.apache.iotdb.db.mpp.execution.operator.source;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryUtil;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import java.io.IOException;
+import java.util.Collections;
import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
@@ -51,7 +52,7 @@ public class FileAggregationScanOperator extends AbstractSourceOperator
this.aggregationScanUtil =
new FileAggregationScanUtil(
pathPattern, aggregationDescriptor, levels, new SeriesScanOptions.Builder().build());
- this.tsBlockBuilder = LastQueryUtil.createTsBlockBuilder();
+ this.tsBlockBuilder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT64));
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java
index 9267d68cb6..4b91a9f6ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java
@@ -41,7 +41,6 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
-import org.apache.iotdb.tsfile.utils.Binary;
import java.io.IOException;
import java.util.ArrayList;
@@ -122,20 +121,12 @@ public class FileAggregationScanUtil {
public TsBlock getAggregationResult(TsBlockBuilder builder) {
for (Map.Entry<PartialPath, Aggregator> entry : pathToAggregatorMap.entrySet()) {
builder.getTimeColumnBuilder().writeLong(0L);
- builder.getValueColumnBuilders()[0].writeBinary(
- Binary.valueOf(
- String.format(
- "%s(%s)",
- aggregationDescriptor.getAggregationFuncName(), entry.getKey().toString())));
Aggregator aggregator = entry.getValue();
ColumnBuilder[] columnBuilders = new ColumnBuilder[1];
- columnBuilders[0] = builder.getValueColumnBuilders()[1];
+ columnBuilders[0] = builder.getValueColumnBuilders()[0];
aggregator.outputResult(columnBuilders);
- builder.getValueColumnBuilders()[2].writeBinary(
- Binary.valueOf(aggregator.getOutputType()[0].toString()));
-
builder.declarePosition();
}
return builder.build();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 5c03ff0763..35a6801a33 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -250,14 +250,20 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
Set<Expression> aggregationExpressions = new HashSet<>();
FunctionExpression aggregationExpression =
new FunctionExpression(
- aggregationMeasurementExpression.getFunctionName(),
- aggregationMeasurementExpression.getFunctionAttributes(),
- Collections.singletonList(sourceExpression));
+ "count",
+ new LinkedHashMap<>(),
+ Collections.singletonList(
+ new TimeSeriesOperand(
+ new MeasurementPath("root.*.*.*.*.*.*", TSDataType.INT64))));
+ analyzeExpression(analysis, aggregationExpression);
aggregationExpressions.add(aggregationExpression);
analysis.setAggregationExpressions(aggregationExpressions);
analysis.setRespDatasetHeader(
- new DatasetHeader(ColumnHeaderConstant.lastQueryColumnHeaders, true));
+ new DatasetHeader(
+ Collections.singletonList(
+ new ColumnHeader(aggregationExpression.toString(), TSDataType.INT64)),
+ true));
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
sgNameToQueryParamsMap.put("root.iov", Collections.emptyList());
@@ -381,7 +387,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
// fetch partition information
analyzeDataPartition(analysis, queryStatement, schemaTree);
- } catch (StatementAnalyzeException e) {
+ } catch (StatementAnalyzeException | IllegalPathException e) {
logger.warn("Meet error when analyzing the query statement: ", e);
throw new StatementAnalyzeException(
"Meet error when analyzing the query statement: " + e.getMessage());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index 70b470ee42..7a14f750fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -1204,17 +1204,16 @@ public class LogicalPlanBuilder {
new AggregationDescriptor(
functionExpression.getFunctionName(),
AggregationStep.SINGLE,
- Collections.emptyList(),
+ Collections.singletonList(functionExpression.getExpressions().get(0)),
Collections.emptyMap());
+ updateTypeProvider(Collections.singletonList(functionExpression));
this.root =
new FileAggregationScanNode(
- context.getQueryId().genPlanNodeId(), pathPattern, aggregationDescriptor, levels);
-
- ColumnHeaderConstant.lastQueryColumnHeaders.forEach(
- columnHeader ->
- context
- .getTypeProvider()
- .setType(columnHeader.getColumnName(), columnHeader.getColumnType()));
+ context.getQueryId().genPlanNodeId(),
+ pathPattern,
+ aggregationDescriptor,
+ levels,
+ functionExpression);
return this;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index c9cfeb1102..376284d2a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -454,17 +454,28 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
return Collections.singletonList(node);
}
- LastQueryCollectNode lastQueryCollectNode =
- new LastQueryCollectNode(context.queryContext.getQueryId().genPlanNodeId());
+ CrossSeriesAggregationDescriptor rootAggregationDescriptor =
+ new CrossSeriesAggregationDescriptor(
+ "count",
+ AggregationStep.FINAL,
+ Collections.singletonList(node.getOutputExpression().getExpressions().get(0)),
+ Collections.emptyMap(),
+ node.getOutputExpression().getExpressions().get(0));
+ AggregationNode aggregationNode =
+ new AggregationNode(
+ context.queryContext.getQueryId().genPlanNodeId(),
+ Collections.singletonList(rootAggregationDescriptor),
+ null,
+ Ordering.ASC);
for (TRegionReplicaSet dataRegion : dataDistribution) {
FileAggregationScanNode split = (FileAggregationScanNode) node.clone();
split.setAggregationDescriptor(node.getAggregationDescriptor());
split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
split.setRegionReplicaSet(dataRegion);
- lastQueryCollectNode.addChild(split);
+ aggregationNode.addChild(split);
}
- return Collections.singletonList(lastQueryCollectNode);
+ return Collections.singletonList(aggregationNode);
}
private List<PlanNode> processSeriesAggregationSource(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java
index 5c997b9db7..ae4b2f6b2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
@@ -35,10 +36,9 @@ import com.google.common.collect.ImmutableList;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.List;
-import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
-
public class FileAggregationScanNode extends SeriesSourceNode {
private final PartialPath pathPattern;
@@ -50,15 +50,19 @@ public class FileAggregationScanNode extends SeriesSourceNode {
// The id of DataRegion where the node will run
private TRegionReplicaSet regionReplicaSet;
+ private final Expression outputExpression;
+
public FileAggregationScanNode(
PlanNodeId id,
PartialPath pathPattern,
AggregationDescriptor aggregationDescriptor,
- int[] levels) {
+ int[] levels,
+ Expression outputExpression) {
super(id);
this.pathPattern = pathPattern;
this.aggregationDescriptor = aggregationDescriptor;
this.levels = levels;
+ this.outputExpression = outputExpression;
}
public FileAggregationScanNode(
@@ -66,8 +70,9 @@ public class FileAggregationScanNode extends SeriesSourceNode {
PartialPath pathPattern,
AggregationDescriptor aggregationDescriptor,
int[] levels,
+ Expression outputExpression,
TRegionReplicaSet regionReplicaSet) {
- this(id, pathPattern, aggregationDescriptor, levels);
+ this(id, pathPattern, aggregationDescriptor, levels, outputExpression);
this.regionReplicaSet = regionReplicaSet;
}
@@ -87,6 +92,10 @@ public class FileAggregationScanNode extends SeriesSourceNode {
return levels;
}
+ public Expression getOutputExpression() {
+ return outputExpression;
+ }
+
@Override
public PartialPath getPartitionPath() {
return null;
@@ -135,12 +144,13 @@ public class FileAggregationScanNode extends SeriesSourceNode {
getPathPattern(),
getAggregationDescriptor(),
getLevels(),
+ getOutputExpression(),
getRegionReplicaSet());
}
@Override
public List<String> getOutputColumnNames() {
- return LAST_QUERY_HEADER_COLUMNS;
+ return Collections.singletonList(outputExpression.toString());
}
@Override
@@ -152,6 +162,7 @@ public class FileAggregationScanNode extends SeriesSourceNode {
for (int level : levels) {
ReadWriteIOUtils.write(level, buffer);
}
+ Expression.serialize(outputExpression, buffer);
}
@Override
@@ -163,6 +174,7 @@ public class FileAggregationScanNode extends SeriesSourceNode {
for (int level : levels) {
ReadWriteIOUtils.write(level, stream);
}
+ Expression.serialize(outputExpression, stream);
}
public static PlanNode deserialize(ByteBuffer buffer) {
@@ -173,8 +185,10 @@ public class FileAggregationScanNode extends SeriesSourceNode {
for (int i = 0; i < levelsSize; i++) {
levels[i] = ReadWriteIOUtils.readInt(buffer);
}
+ Expression outputExpression = Expression.deserialize(buffer);
PlanNodeId planNodeId = PlanNodeId.deserialize(buffer);
- return new FileAggregationScanNode(planNodeId, pathPattern, aggregationDescriptor, levels);
+ return new FileAggregationScanNode(
+ planNodeId, pathPattern, aggregationDescriptor, levels, outputExpression);
}
@Override