You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/03/27 01:58:12 UTC
[iotdb] 06/13: finish FE
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/fileScan
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4f70135562b83530bc677b3db52ed1a77a3263d4
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Mar 20 16:29:06 2023 +0800
finish FE
---
.../commons/partition/DataPartitionQueryParam.java | 6 -
.../source/FileAggregationScanOperator.java | 107 ++++++++++++++
.../operator/source/FileAggregationScanUtil.java | 58 ++++++--
.../apache/iotdb/db/mpp/plan/analyze/Analysis.java | 10 ++
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 38 +++++
.../db/mpp/plan/planner/LogicalPlanBuilder.java | 25 ++++
.../db/mpp/plan/planner/LogicalPlanVisitor.java | 9 ++
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 26 ++++
.../plan/planner/distribution/SourceRewriter.java | 24 ++++
.../mpp/plan/planner/plan/node/PlanNodeType.java | 6 +-
.../db/mpp/plan/planner/plan/node/PlanVisitor.java | 5 +
.../plan/node/source/FileAggregationScanNode.java | 158 +++++++++++++++++++++
.../common/block/column/BinaryColumnBuilder.java | 52 +++++++
13 files changed, 509 insertions(+), 15 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
index 4ed3cd8dc4..6925fb2191 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
@@ -36,12 +36,6 @@ public class DataPartitionQueryParam {
// (timePartitionSlotList.get(timePartitionSlotList.size() - 1), +oo)
private boolean needRightAll = false;
- public DataPartitionQueryParam(
- String devicePath, List<TTimePartitionSlot> timePartitionSlotList) {
- this.devicePath = devicePath;
- this.timePartitionSlotList = timePartitionSlotList;
- }
-
public DataPartitionQueryParam(
String devicePath,
List<TTimePartitionSlot> timePartitionSlotList,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanOperator.java
new file mode 100644
index 0000000000..083332ad9c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanOperator.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.source;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryUtil;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import java.io.IOException;
+
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
+public class FileAggregationScanOperator extends AbstractSourceOperator
+ implements DataSourceOperator, SourceOperator {
+
+ private final FileAggregationScanUtil aggregationScanUtil;
+
+ private final TsBlockBuilder tsBlockBuilder;
+
+ public FileAggregationScanOperator(
+ OperatorContext context,
+ PlanNodeId sourceId,
+ PartialPath pathPattern,
+ AggregationDescriptor aggregationDescriptor,
+ int[] levels) {
+ this.sourceId = sourceId;
+ this.operatorContext = context;
+ this.aggregationScanUtil =
+ new FileAggregationScanUtil(
+ pathPattern, aggregationDescriptor, levels, new SeriesScanOptions.Builder().build());
+ this.tsBlockBuilder = LastQueryUtil.createTsBlockBuilder();
+ }
+
+ @Override
+ public void initQueryDataSource(QueryDataSource dataSource) {
+ aggregationScanUtil.initQueryDataSource(dataSource);
+ }
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ @Override
+ public PlanNodeId getSourceId() {
+ return sourceId;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return aggregationScanUtil.hasNextFile();
+ }
+
+ @Override
+ public TsBlock next() {
+ tsBlockBuilder.reset();
+ try {
+ aggregationScanUtil.consume();
+ return aggregationScanUtil.getAggregationResult(tsBlockBuilder);
+ } catch (IOException e) {
+ throw new RuntimeException("Error happened while scanning the file", e);
+ }
+ }
+
+ @Override
+ public boolean isFinished() {
+ return !hasNextWithTimer();
+ }
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java
index ff12114388..5a4ef5cb38 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.plan.analyze.GroupByLevelController;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.reader.materializer.TsFileResourceMaterializer;
@@ -37,8 +37,11 @@ import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
+import org.apache.iotdb.tsfile.utils.Binary;
import java.io.IOException;
import java.util.ArrayList;
@@ -49,9 +52,13 @@ import java.util.Map;
public class FileAggregationScanUtil {
+ private final PartialPath pathPattern;
+
+ private final AggregationDescriptor aggregationDescriptor;
+
private final Map<PartialPath, Aggregator> pathToAggregatorMap;
- private final TsFileResourceMaterializer fileResourceMaterializer;
+ private TsFileResourceMaterializer fileResourceMaterializer;
private final Map<PartialPath, List<IChunkMetadata>> chunkMetadataMap;
@@ -62,23 +69,30 @@ public class FileAggregationScanUtil {
private final SeriesScanOptions scanOptions;
public FileAggregationScanUtil(
- Map<PartialPath, Aggregator> pathToAggregatorMap,
- QueryDataSource dataSource,
+ PartialPath pathPattern,
+ AggregationDescriptor aggregationDescriptor,
int[] levels,
SeriesScanOptions scanOptions) {
- this.pathToAggregatorMap = pathToAggregatorMap;
- this.fileResourceMaterializer = new TsFileResourceMaterializer(dataSource);
+ this.pathPattern = pathPattern;
+ this.aggregationDescriptor = aggregationDescriptor;
+ this.pathToAggregatorMap = new HashMap<>();
this.chunkMetadataMap = new HashMap<>();
this.partialPathPool = new PartialPathPool();
this.levels = levels;
this.scanOptions = scanOptions;
}
+ public void initQueryDataSource(QueryDataSource dataSource) {
+ this.fileResourceMaterializer = new TsFileResourceMaterializer(dataSource);
+ }
+
public boolean hasNextFile() {
return fileResourceMaterializer.hasNext();
}
public void consume() throws IOException {
+ pathToAggregatorMap.clear();
+
TsFileResource nextFile = fileResourceMaterializer.next();
TsFileSequenceReader reader =
FileReaderManager.getInstance().get(nextFile.getTsFilePath(), nextFile.isClosed());
@@ -99,6 +113,28 @@ public class FileAggregationScanUtil {
}
}
+ public TsBlock getAggregationResult(TsBlockBuilder builder) {
+ for (Map.Entry<PartialPath, Aggregator> entry : pathToAggregatorMap.entrySet()) {
+ builder.getTimeColumnBuilder().writeLong(0L);
+ builder.getValueColumnBuilders()[0].writeBinary(
+ Binary.valueOf(
+ String.format(
+ "%s(%s)",
+ aggregationDescriptor.getAggregationFuncName(), entry.getKey().toString())));
+
+ Aggregator aggregator = entry.getValue();
+ ColumnBuilder[] columnBuilders = new ColumnBuilder[1];
+ columnBuilders[0] = builder.getValueColumnBuilders()[1];
+ aggregator.outputResult(columnBuilders);
+
+ builder.getValueColumnBuilders()[2].writeBinary(
+ Binary.valueOf(aggregator.getOutputType()[0].toString()));
+
+ builder.declarePosition();
+ }
+ return builder.build();
+ }
+
private void unpackChunkMetadata(PartialPath devicePath, IChunkMetadata chunkMetadata)
throws IOException {
PartialPath groupedPath =
@@ -123,6 +159,11 @@ public class FileAggregationScanUtil {
private void consumeTimeseriesMetadata(
PartialPath devicePath, TimeseriesMetadata timeseriesMetadata) {
+ PartialPath fullPath = devicePath.concatNode(timeseriesMetadata.getMeasurementId());
+ if (!pathPattern.matchFullPath(fullPath)) {
+ return;
+ }
+
PartialPath groupedPath =
partialPathPool.getGroupedPath(devicePath, timeseriesMetadata.getMeasurementId());
if (pathToAggregatorMap.containsKey(groupedPath)) {
@@ -130,12 +171,13 @@ public class FileAggregationScanUtil {
groupedPath,
new Aggregator(
AccumulatorFactory.createAccumulator(
- TAggregationType.COUNT,
+ TAggregationType.valueOf(
+ aggregationDescriptor.getAggregationFuncName().toUpperCase()),
timeseriesMetadata.getTSDataType(),
Collections.emptyList(),
Collections.emptyMap(),
true),
- AggregationStep.SINGLE));
+ aggregationDescriptor.getStep()));
}
Filter queryFilter = scanOptions.getQueryFilter();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
index bf15b2a826..be471ba3b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
@@ -83,6 +83,8 @@ public class Analysis {
// Query Analysis (used in ALIGN BY TIME)
/////////////////////////////////////////////////////////////////////////////////////////////////
+ private int[] levels;
+
// map from device name to series/aggregation under this device
private Set<Expression> sourceExpressions;
@@ -586,4 +588,12 @@ public class Analysis {
public Map<NodeRef<Expression>, TSDataType> getExpressionTypes() {
return expressionTypes;
}
+
+ public void setLevels(int[] levels) {
+ this.levels = levels;
+ }
+
+ public int[] getLevels() {
+ return levels;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index a6bc60a90e..f922ce89dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -229,6 +229,44 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
// check for semantic errors
queryStatement.semanticCheck();
+ if (queryStatement.isGroupByLevel()) {
+ analysis.setLevels(queryStatement.getGroupByLevelComponent().getLevels());
+
+ PartialPath pathPrefix = queryStatement.getFromComponent().getPrefixPaths().get(0);
+ FunctionExpression aggregationMeasurementExpression =
+ (FunctionExpression)
+ queryStatement.getSelectComponent().getResultColumns().get(0).getExpression();
+ TimeSeriesOperand sourceMeasurementExpression =
+ (TimeSeriesOperand) aggregationMeasurementExpression.getExpressions().get(0);
+
+ Set<Expression> sourceExpressions = new HashSet<>();
+ TimeSeriesOperand sourceExpression =
+ new TimeSeriesOperand(pathPrefix.concatPath(sourceMeasurementExpression.getPath()));
+ sourceExpressions.add(sourceExpression);
+ analysis.setSourceExpressions(sourceExpressions);
+
+ Set<Expression> aggregationExpressions = new HashSet<>();
+ FunctionExpression aggregationExpression =
+ new FunctionExpression(
+ aggregationMeasurementExpression.getFunctionName(),
+ aggregationMeasurementExpression.getFunctionAttributes(),
+ Collections.singletonList(sourceExpression));
+ aggregationExpressions.add(aggregationExpression);
+ analysis.setAggregationExpressions(aggregationExpressions);
+
+ analysis.setRespDatasetHeader(
+ new DatasetHeader(ColumnHeaderConstant.lastQueryColumnHeaders, true));
+
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
+ sgNameToQueryParamsMap.put(
+ "root.iov",
+ Collections.singletonList(
+ new DataPartitionQueryParam("root.iov.**", Collections.emptyList(), true, true)));
+ analysis.setDataPartitionInfo(partitionFetcher.getDataPartition(sgNameToQueryParamsMap));
+
+ return analysis;
+ }
+
// concat path and construct path pattern tree
PathPatternTree patternTree = new PathPatternTree();
queryStatement =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index f2ba699208..70b470ee42 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -73,6 +73,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.FileAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
@@ -1192,4 +1193,28 @@ public class LogicalPlanBuilder {
this.root = new ShowQueriesNode(context.getQueryId().genPlanNodeId(), dataNodeLocation);
return this;
}
+
+ public LogicalPlanBuilder planFileAggregation(
+ Set<Expression> sourceExpressions, Set<Expression> aggregationExpressions, int[] levels) {
+ PartialPath pathPattern =
+ ((TimeSeriesOperand) (new ArrayList<>(sourceExpressions).get(0))).getPath();
+ FunctionExpression functionExpression =
+ (FunctionExpression) (new ArrayList<>(aggregationExpressions).get(0));
+ AggregationDescriptor aggregationDescriptor =
+ new AggregationDescriptor(
+ functionExpression.getFunctionName(),
+ AggregationStep.SINGLE,
+ Collections.emptyList(),
+ Collections.emptyMap());
+ this.root =
+ new FileAggregationScanNode(
+ context.getQueryId().genPlanNodeId(), pathPattern, aggregationDescriptor, levels);
+
+ ColumnHeaderConstant.lastQueryColumnHeaders.forEach(
+ columnHeader ->
+ context
+ .getTypeProvider()
+ .setType(columnHeader.getColumnName(), columnHeader.getColumnType()));
+ return this;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index 5e503cb6c6..5c0749d634 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -109,6 +109,15 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
public PlanNode visitQuery(QueryStatement queryStatement, MPPQueryContext context) {
LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
+ if (queryStatement.isGroupByLevel()) {
+ return planBuilder
+ .planFileAggregation(
+ analysis.getSourceExpressions(),
+ analysis.getAggregationExpressions(),
+ analysis.getLevels())
+ .getRoot();
+ }
+
if (queryStatement.isLastQuery()) {
return planBuilder
.planLast(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 0e556f45bb..1d3851cb1f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -120,6 +120,7 @@ import org.apache.iotdb.db.mpp.execution.operator.sink.ShuffleHelperOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.FileAggregationScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.ShowQueriesOperator;
@@ -182,6 +183,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.ShuffleSinkNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.FileAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
@@ -487,6 +489,30 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
return seriesAggregationScanOperator;
}
+ @Override
+ public Operator visitFileAggregationScan(
+ FileAggregationScanNode node, LocalExecutionPlanContext context) {
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ FileAggregationScanOperator.class.getSimpleName());
+ FileAggregationScanOperator fileAggregationScanOperator =
+ new FileAggregationScanOperator(
+ operatorContext,
+ node.getPlanNodeId(),
+ node.getPathPattern(),
+ node.getAggregationDescriptor(),
+ node.getLevels());
+
+ ((DataDriverContext) context.getDriverContext()).addSourceOperator(fileAggregationScanOperator);
+ context.getDriverContext().setInputDriver(true);
+ context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+ return fileAggregationScanOperator;
+ }
+
@Override
public Operator visitSchemaQueryOrderByHeat(
SchemaQueryOrderByHeatNode node, LocalExecutionPlanContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 22efafdb10..f61891e59c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -51,6 +51,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.FileAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
@@ -444,6 +445,29 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
return processSeriesAggregationSource(node, context);
}
+ @Override
+ public List<PlanNode> visitFileAggregationScan(
+ FileAggregationScanNode node, DistributionPlanContext context) {
+ List<TRegionReplicaSet> dataDistribution =
+ analysis.getPartitionInfo(node.getPartitionPath(), node.getPartitionTimeFilter());
+ if (dataDistribution.size() == 1) {
+ node.setRegionReplicaSet(dataDistribution.get(0));
+ return Collections.singletonList(node);
+ }
+
+ LastQueryCollectNode lastQueryCollectNode =
+ new LastQueryCollectNode(context.queryContext.getQueryId().genPlanNodeId());
+
+ for (TRegionReplicaSet dataRegion : dataDistribution) {
+ FileAggregationScanNode split = (FileAggregationScanNode) node.clone();
+ split.setAggregationDescriptor(node.getAggregationDescriptor());
+ split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+ split.setRegionReplicaSet(dataRegion);
+ lastQueryCollectNode.addChild(split);
+ }
+ return Collections.singletonList(lastQueryCollectNode);
+ }
+
private List<PlanNode> processSeriesAggregationSource(
SeriesAggregationSourceNode node, DistributionPlanContext context) {
List<TRegionReplicaSet> dataDistribution =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index 9b75d9e4a1..20661f7571 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -77,6 +77,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.ShuffleSinkNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.FileAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
@@ -165,7 +166,8 @@ public enum PlanNodeType {
INTERNAL_BATCH_ACTIVATE_TEMPLATE((short) 68),
INTERNAL_CREATE_MULTI_TIMESERIES((short) 69),
IDENTITY_SINK((short) 70),
- SHUFFLE_SINK((short) 71);
+ SHUFFLE_SINK((short) 71),
+ FILE_AGGREGATION((short) 72);
public static final int BYTES = Short.BYTES;
@@ -356,6 +358,8 @@ public enum PlanNodeType {
return IdentitySinkNode.deserialize(buffer);
case 71:
return ShuffleSinkNode.deserialize(buffer);
+ case 72:
+ return FileAggregationScanNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index cf3fbe02af..245b79f955 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -77,6 +77,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.ShuffleSinkNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.FileAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
@@ -125,6 +126,10 @@ public abstract class PlanVisitor<R, C> {
return visitSourceNode(node, context);
}
+ public R visitFileAggregationScan(FileAggregationScanNode node, C context) {
+ return visitSourceNode(node, context);
+ }
+
public R visitDeviceView(DeviceViewNode node, C context) {
return visitMultiChildProcess(node, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java
new file mode 100644
index 0000000000..63158d132a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import com.google.common.collect.ImmutableList;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
+
+public class FileAggregationScanNode extends SeriesSourceNode {
+
+ private final PartialPath pathPattern;
+
+ private AggregationDescriptor aggregationDescriptor;
+
+ private final int[] levels;
+
+ public FileAggregationScanNode(
+ PlanNodeId id,
+ PartialPath pathPattern,
+ AggregationDescriptor aggregationDescriptor,
+ int[] levels) {
+ super(id);
+ this.pathPattern = pathPattern;
+ this.aggregationDescriptor = aggregationDescriptor;
+ this.levels = levels;
+ }
+
+ public PartialPath getPathPattern() {
+ return pathPattern;
+ }
+
+ public AggregationDescriptor getAggregationDescriptor() {
+ return aggregationDescriptor;
+ }
+
+ public void setAggregationDescriptor(AggregationDescriptor aggregationDescriptor) {
+ this.aggregationDescriptor = aggregationDescriptor;
+ }
+
+ public int[] getLevels() {
+ return levels;
+ }
+
+ @Override
+ public PartialPath getPartitionPath() {
+ return null;
+ }
+
+ @Override
+ public Filter getPartitionTimeFilter() {
+ return null;
+ }
+
+ @Override
+ public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {}
+
+ @Override
+ public TRegionReplicaSet getRegionReplicaSet() {
+ return null;
+ }
+
+ @Override
+ public void open() throws Exception {}
+
+ @Override
+ public void close() throws Exception {}
+
+ @Override
+ public List<PlanNode> getChildren() {
+ return ImmutableList.of();
+ }
+
+ @Override
+ public int allowedChildCount() {
+ return NO_CHILD_ALLOWED;
+ }
+
+ @Override
+ public void addChild(PlanNode child) {
+ throw new UnsupportedOperationException("no child is allowed for FileAggregationScanNode");
+ }
+
+ @Override
+ public PlanNode clone() {
+ return new FileAggregationScanNode(
+ getPlanNodeId(), getPathPattern(), getAggregationDescriptor(), getLevels());
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return LAST_QUERY_HEADER_COLUMNS;
+ }
+
+ @Override
+ protected void serializeAttributes(ByteBuffer buffer) {
+ PlanNodeType.FILE_AGGREGATION.serialize(buffer);
+ pathPattern.serialize(buffer);
+ aggregationDescriptor.serialize(buffer);
+ ReadWriteIOUtils.write(levels.length, buffer);
+ for (int level : levels) {
+ ReadWriteIOUtils.write(level, buffer);
+ }
+ }
+
+ @Override
+ protected void serializeAttributes(DataOutputStream stream) throws IOException {
+ PlanNodeType.FILE_AGGREGATION.serialize(stream);
+ pathPattern.serialize(stream);
+ aggregationDescriptor.serialize(stream);
+ ReadWriteIOUtils.write(levels.length, stream);
+ for (int level : levels) {
+ ReadWriteIOUtils.write(level, stream);
+ }
+ }
+
+ public static PlanNode deserialize(ByteBuffer buffer) {
+ PartialPath pathPattern = PartialPath.deserialize(buffer);
+ AggregationDescriptor aggregationDescriptor = AggregationDescriptor.deserialize(buffer);
+ int levelsSize = ReadWriteIOUtils.readInt(buffer);
+ int[] levels = new int[levelsSize];
+ for (int i = 0; i < levelsSize; i++) {
+ levels[i] = ReadWriteIOUtils.readInt(buffer);
+ }
+ PlanNodeId planNodeId = PlanNodeId.deserialize(buffer);
+ return new FileAggregationScanNode(planNodeId, pathPattern, aggregationDescriptor, levels);
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java
index ad1e05ac61..40bb95a7cd 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java
@@ -151,4 +151,56 @@ public class BinaryColumnBuilder implements ColumnBuilder {
private void updateArraysDataSize() {
arraysRetainedSizeInBytes = sizeOf(valueIsNull) + sizeOf(values);
}
+
+ @Override
+ public ColumnBuilder writeInt(int value) {
+ if (values.length <= positionCount) {
+ growCapacity();
+ }
+
+ values[positionCount] = Binary.valueOf(String.valueOf(value));
+
+ hasNonNullValue = true;
+ positionCount++;
+ return this;
+ }
+
+ @Override
+ public ColumnBuilder writeLong(long value) {
+ if (values.length <= positionCount) {
+ growCapacity();
+ }
+
+ values[positionCount] = Binary.valueOf(String.valueOf(value));
+
+ hasNonNullValue = true;
+ positionCount++;
+ return this;
+ }
+
+ @Override
+ public ColumnBuilder writeFloat(float value) {
+ if (values.length <= positionCount) {
+ growCapacity();
+ }
+
+ values[positionCount] = Binary.valueOf(String.valueOf(value));
+
+ hasNonNullValue = true;
+ positionCount++;
+ return this;
+ }
+
+ @Override
+ public ColumnBuilder writeDouble(double value) {
+ if (values.length <= positionCount) {
+ growCapacity();
+ }
+
+ values[positionCount] = Binary.valueOf(String.valueOf(value));
+
+ hasNonNullValue = true;
+ positionCount++;
+ return this;
+ }
}