You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/03/27 01:58:12 UTC

[iotdb] 06/13: finish FE

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/fileScan
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4f70135562b83530bc677b3db52ed1a77a3263d4
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Mar 20 16:29:06 2023 +0800

    finish FE
---
 .../commons/partition/DataPartitionQueryParam.java |   6 -
 .../source/FileAggregationScanOperator.java        | 107 ++++++++++++++
 .../operator/source/FileAggregationScanUtil.java   |  58 ++++++--
 .../apache/iotdb/db/mpp/plan/analyze/Analysis.java |  10 ++
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  38 +++++
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    |  25 ++++
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    |   9 ++
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  26 ++++
 .../plan/planner/distribution/SourceRewriter.java  |  24 ++++
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |   6 +-
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |   5 +
 .../plan/node/source/FileAggregationScanNode.java  | 158 +++++++++++++++++++++
 .../common/block/column/BinaryColumnBuilder.java   |  52 +++++++
 13 files changed, 509 insertions(+), 15 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
index 4ed3cd8dc4..6925fb2191 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
@@ -36,12 +36,6 @@ public class DataPartitionQueryParam {
   // (timePartitionSlotList.get(timePartitionSlotList.size() - 1), +oo)
   private boolean needRightAll = false;
 
-  public DataPartitionQueryParam(
-      String devicePath, List<TTimePartitionSlot> timePartitionSlotList) {
-    this.devicePath = devicePath;
-    this.timePartitionSlotList = timePartitionSlotList;
-  }
-
   public DataPartitionQueryParam(
       String devicePath,
       List<TTimePartitionSlot> timePartitionSlotList,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanOperator.java
new file mode 100644
index 0000000000..083332ad9c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanOperator.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.source;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryUtil;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import java.io.IOException;
+
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
+public class FileAggregationScanOperator extends AbstractSourceOperator
+    implements DataSourceOperator, SourceOperator {
+
+  private final FileAggregationScanUtil aggregationScanUtil;
+
+  private final TsBlockBuilder tsBlockBuilder;
+
+  public FileAggregationScanOperator(
+      OperatorContext context,
+      PlanNodeId sourceId,
+      PartialPath pathPattern,
+      AggregationDescriptor aggregationDescriptor,
+      int[] levels) {
+    this.sourceId = sourceId;
+    this.operatorContext = context;
+    this.aggregationScanUtil =
+        new FileAggregationScanUtil(
+            pathPattern, aggregationDescriptor, levels, new SeriesScanOptions.Builder().build());
+    this.tsBlockBuilder = LastQueryUtil.createTsBlockBuilder();
+  }
+
+  @Override
+  public void initQueryDataSource(QueryDataSource dataSource) {
+    aggregationScanUtil.initQueryDataSource(dataSource);
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public PlanNodeId getSourceId() {
+    return sourceId;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return aggregationScanUtil.hasNextFile();
+  }
+
+  @Override
+  public TsBlock next() {
+    tsBlockBuilder.reset();
+    try {
+      aggregationScanUtil.consume();
+      return aggregationScanUtil.getAggregationResult(tsBlockBuilder);
+    } catch (IOException e) {
+      throw new RuntimeException("Error happened while scanning the file", e);
+    }
+  }
+
+  @Override
+  public boolean isFinished() {
+    return !hasNextWithTimer();
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java
index ff12114388..5a4ef5cb38 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.plan.analyze.GroupByLevelController;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.reader.materializer.TsFileResourceMaterializer;
@@ -37,8 +37,11 @@ import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
+import org.apache.iotdb.tsfile.utils.Binary;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -49,9 +52,13 @@ import java.util.Map;
 
 public class FileAggregationScanUtil {
 
+  private final PartialPath pathPattern;
+
+  private final AggregationDescriptor aggregationDescriptor;
+
   private final Map<PartialPath, Aggregator> pathToAggregatorMap;
 
-  private final TsFileResourceMaterializer fileResourceMaterializer;
+  private TsFileResourceMaterializer fileResourceMaterializer;
 
   private final Map<PartialPath, List<IChunkMetadata>> chunkMetadataMap;
 
@@ -62,23 +69,30 @@ public class FileAggregationScanUtil {
   private final SeriesScanOptions scanOptions;
 
   public FileAggregationScanUtil(
-      Map<PartialPath, Aggregator> pathToAggregatorMap,
-      QueryDataSource dataSource,
+      PartialPath pathPattern,
+      AggregationDescriptor aggregationDescriptor,
       int[] levels,
       SeriesScanOptions scanOptions) {
-    this.pathToAggregatorMap = pathToAggregatorMap;
-    this.fileResourceMaterializer = new TsFileResourceMaterializer(dataSource);
+    this.pathPattern = pathPattern;
+    this.aggregationDescriptor = aggregationDescriptor;
+    this.pathToAggregatorMap = new HashMap<>();
     this.chunkMetadataMap = new HashMap<>();
     this.partialPathPool = new PartialPathPool();
     this.levels = levels;
     this.scanOptions = scanOptions;
   }
 
+  public void initQueryDataSource(QueryDataSource dataSource) {
+    this.fileResourceMaterializer = new TsFileResourceMaterializer(dataSource);
+  }
+
   public boolean hasNextFile() {
     return fileResourceMaterializer.hasNext();
   }
 
   public void consume() throws IOException {
+    pathToAggregatorMap.clear();
+
     TsFileResource nextFile = fileResourceMaterializer.next();
     TsFileSequenceReader reader =
         FileReaderManager.getInstance().get(nextFile.getTsFilePath(), nextFile.isClosed());
@@ -99,6 +113,28 @@ public class FileAggregationScanUtil {
     }
   }
 
+  public TsBlock getAggregationResult(TsBlockBuilder builder) {
+    for (Map.Entry<PartialPath, Aggregator> entry : pathToAggregatorMap.entrySet()) {
+      builder.getTimeColumnBuilder().writeLong(0L);
+      builder.getValueColumnBuilders()[0].writeBinary(
+          Binary.valueOf(
+              String.format(
+                  "%s(%s)",
+                  aggregationDescriptor.getAggregationFuncName(), entry.getKey().toString())));
+
+      Aggregator aggregator = entry.getValue();
+      ColumnBuilder[] columnBuilders = new ColumnBuilder[1];
+      columnBuilders[0] = builder.getValueColumnBuilders()[1];
+      aggregator.outputResult(columnBuilders);
+
+      builder.getValueColumnBuilders()[2].writeBinary(
+          Binary.valueOf(aggregator.getOutputType()[0].toString()));
+
+      builder.declarePosition();
+    }
+    return builder.build();
+  }
+
   private void unpackChunkMetadata(PartialPath devicePath, IChunkMetadata chunkMetadata)
       throws IOException {
     PartialPath groupedPath =
@@ -123,6 +159,11 @@ public class FileAggregationScanUtil {
 
   private void consumeTimeseriesMetadata(
       PartialPath devicePath, TimeseriesMetadata timeseriesMetadata) {
+    PartialPath fullPath = devicePath.concatNode(timeseriesMetadata.getMeasurementId());
+    if (!pathPattern.matchFullPath(fullPath)) {
+      return;
+    }
+
     PartialPath groupedPath =
         partialPathPool.getGroupedPath(devicePath, timeseriesMetadata.getMeasurementId());
     if (pathToAggregatorMap.containsKey(groupedPath)) {
@@ -130,12 +171,13 @@ public class FileAggregationScanUtil {
           groupedPath,
           new Aggregator(
               AccumulatorFactory.createAccumulator(
-                  TAggregationType.COUNT,
+                  TAggregationType.valueOf(
+                      aggregationDescriptor.getAggregationFuncName().toUpperCase()),
                   timeseriesMetadata.getTSDataType(),
                   Collections.emptyList(),
                   Collections.emptyMap(),
                   true),
-              AggregationStep.SINGLE));
+              aggregationDescriptor.getStep()));
     }
 
     Filter queryFilter = scanOptions.getQueryFilter();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
index bf15b2a826..be471ba3b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
@@ -83,6 +83,8 @@ public class Analysis {
   // Query Analysis (used in ALIGN BY TIME)
   /////////////////////////////////////////////////////////////////////////////////////////////////
 
+  private int[] levels;
+
   // map from device name to series/aggregation under this device
   private Set<Expression> sourceExpressions;
 
@@ -586,4 +588,12 @@ public class Analysis {
   public Map<NodeRef<Expression>, TSDataType> getExpressionTypes() {
     return expressionTypes;
   }
+
+  public void setLevels(int[] levels) {
+    this.levels = levels;
+  }
+
+  public int[] getLevels() {
+    return levels;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index a6bc60a90e..f922ce89dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -229,6 +229,44 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       // check for semantic errors
       queryStatement.semanticCheck();
 
+      if (queryStatement.isGroupByLevel()) {
+        analysis.setLevels(queryStatement.getGroupByLevelComponent().getLevels());
+
+        PartialPath pathPrefix = queryStatement.getFromComponent().getPrefixPaths().get(0);
+        FunctionExpression aggregationMeasurementExpression =
+            (FunctionExpression)
+                queryStatement.getSelectComponent().getResultColumns().get(0).getExpression();
+        TimeSeriesOperand sourceMeasurementExpression =
+            (TimeSeriesOperand) aggregationMeasurementExpression.getExpressions().get(0);
+
+        Set<Expression> sourceExpressions = new HashSet<>();
+        TimeSeriesOperand sourceExpression =
+            new TimeSeriesOperand(pathPrefix.concatPath(sourceMeasurementExpression.getPath()));
+        sourceExpressions.add(sourceExpression);
+        analysis.setSourceExpressions(sourceExpressions);
+
+        Set<Expression> aggregationExpressions = new HashSet<>();
+        FunctionExpression aggregationExpression =
+            new FunctionExpression(
+                aggregationMeasurementExpression.getFunctionName(),
+                aggregationMeasurementExpression.getFunctionAttributes(),
+                Collections.singletonList(sourceExpression));
+        aggregationExpressions.add(aggregationExpression);
+        analysis.setAggregationExpressions(aggregationExpressions);
+
+        analysis.setRespDatasetHeader(
+            new DatasetHeader(ColumnHeaderConstant.lastQueryColumnHeaders, true));
+
+        Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
+        sgNameToQueryParamsMap.put(
+            "root.iov",
+            Collections.singletonList(
+                new DataPartitionQueryParam("root.iov.**", Collections.emptyList(), true, true)));
+        analysis.setDataPartitionInfo(partitionFetcher.getDataPartition(sgNameToQueryParamsMap));
+
+        return analysis;
+      }
+
       // concat path and construct path pattern tree
       PathPatternTree patternTree = new PathPatternTree();
       queryStatement =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index f2ba699208..70b470ee42 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -73,6 +73,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.FileAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
@@ -1192,4 +1193,28 @@ public class LogicalPlanBuilder {
     this.root = new ShowQueriesNode(context.getQueryId().genPlanNodeId(), dataNodeLocation);
     return this;
   }
+
+  public LogicalPlanBuilder planFileAggregation(
+      Set<Expression> sourceExpressions, Set<Expression> aggregationExpressions, int[] levels) {
+    PartialPath pathPattern =
+        ((TimeSeriesOperand) (new ArrayList<>(sourceExpressions).get(0))).getPath();
+    FunctionExpression functionExpression =
+        (FunctionExpression) (new ArrayList<>(aggregationExpressions).get(0));
+    AggregationDescriptor aggregationDescriptor =
+        new AggregationDescriptor(
+            functionExpression.getFunctionName(),
+            AggregationStep.SINGLE,
+            Collections.emptyList(),
+            Collections.emptyMap());
+    this.root =
+        new FileAggregationScanNode(
+            context.getQueryId().genPlanNodeId(), pathPattern, aggregationDescriptor, levels);
+
+    ColumnHeaderConstant.lastQueryColumnHeaders.forEach(
+        columnHeader ->
+            context
+                .getTypeProvider()
+                .setType(columnHeader.getColumnName(), columnHeader.getColumnType()));
+    return this;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index 5e503cb6c6..5c0749d634 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -109,6 +109,15 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
   public PlanNode visitQuery(QueryStatement queryStatement, MPPQueryContext context) {
     LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
 
+    if (queryStatement.isGroupByLevel()) {
+      return planBuilder
+          .planFileAggregation(
+              analysis.getSourceExpressions(),
+              analysis.getAggregationExpressions(),
+              analysis.getLevels())
+          .getRoot();
+    }
+
     if (queryStatement.isLastQuery()) {
       return planBuilder
           .planLast(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 0e556f45bb..1d3851cb1f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -120,6 +120,7 @@ import org.apache.iotdb.db.mpp.execution.operator.sink.ShuffleHelperOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesAggregationScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.FileAggregationScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.ShowQueriesOperator;
@@ -182,6 +183,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.ShuffleSinkNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.FileAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
@@ -487,6 +489,30 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     return seriesAggregationScanOperator;
   }
 
+  @Override
+  public Operator visitFileAggregationScan(
+      FileAggregationScanNode node, LocalExecutionPlanContext context) {
+    OperatorContext operatorContext =
+        context
+            .getDriverContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                FileAggregationScanOperator.class.getSimpleName());
+    FileAggregationScanOperator fileAggregationScanOperator =
+        new FileAggregationScanOperator(
+            operatorContext,
+            node.getPlanNodeId(),
+            node.getPathPattern(),
+            node.getAggregationDescriptor(),
+            node.getLevels());
+
+    ((DataDriverContext) context.getDriverContext()).addSourceOperator(fileAggregationScanOperator);
+    context.getDriverContext().setInputDriver(true);
+    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+    return fileAggregationScanOperator;
+  }
+
   @Override
   public Operator visitSchemaQueryOrderByHeat(
       SchemaQueryOrderByHeatNode node, LocalExecutionPlanContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 22efafdb10..f61891e59c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -51,6 +51,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.FileAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
@@ -444,6 +445,29 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     return processSeriesAggregationSource(node, context);
   }
 
+  @Override
+  public List<PlanNode> visitFileAggregationScan(
+      FileAggregationScanNode node, DistributionPlanContext context) {
+    List<TRegionReplicaSet> dataDistribution =
+        analysis.getPartitionInfo(node.getPartitionPath(), node.getPartitionTimeFilter());
+    if (dataDistribution.size() == 1) {
+      node.setRegionReplicaSet(dataDistribution.get(0));
+      return Collections.singletonList(node);
+    }
+
+    LastQueryCollectNode lastQueryCollectNode =
+        new LastQueryCollectNode(context.queryContext.getQueryId().genPlanNodeId());
+
+    for (TRegionReplicaSet dataRegion : dataDistribution) {
+      FileAggregationScanNode split = (FileAggregationScanNode) node.clone();
+      split.setAggregationDescriptor(node.getAggregationDescriptor());
+      split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+      split.setRegionReplicaSet(dataRegion);
+      lastQueryCollectNode.addChild(split);
+    }
+    return Collections.singletonList(lastQueryCollectNode);
+  }
+
   private List<PlanNode> processSeriesAggregationSource(
       SeriesAggregationSourceNode node, DistributionPlanContext context) {
     List<TRegionReplicaSet> dataDistribution =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index 9b75d9e4a1..20661f7571 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -77,6 +77,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.ShuffleSinkNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.FileAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
@@ -165,7 +166,8 @@ public enum PlanNodeType {
   INTERNAL_BATCH_ACTIVATE_TEMPLATE((short) 68),
   INTERNAL_CREATE_MULTI_TIMESERIES((short) 69),
   IDENTITY_SINK((short) 70),
-  SHUFFLE_SINK((short) 71);
+  SHUFFLE_SINK((short) 71),
+  FILE_AGGREGATION((short) 72);
 
   public static final int BYTES = Short.BYTES;
 
@@ -356,6 +358,8 @@ public enum PlanNodeType {
         return IdentitySinkNode.deserialize(buffer);
       case 71:
         return ShuffleSinkNode.deserialize(buffer);
+      case 72:
+        return FileAggregationScanNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index cf3fbe02af..245b79f955 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -77,6 +77,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.ShuffleSinkNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.FileAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
@@ -125,6 +126,10 @@ public abstract class PlanVisitor<R, C> {
     return visitSourceNode(node, context);
   }
 
+  public R visitFileAggregationScan(FileAggregationScanNode node, C context) {
+    return visitSourceNode(node, context);
+  }
+
   public R visitDeviceView(DeviceViewNode node, C context) {
     return visitMultiChildProcess(node, context);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java
new file mode 100644
index 0000000000..63158d132a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import com.google.common.collect.ImmutableList;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
+
+public class FileAggregationScanNode extends SeriesSourceNode {
+
+  private final PartialPath pathPattern;
+
+  private AggregationDescriptor aggregationDescriptor;
+
+  private final int[] levels;
+
+  public FileAggregationScanNode(
+      PlanNodeId id,
+      PartialPath pathPattern,
+      AggregationDescriptor aggregationDescriptor,
+      int[] levels) {
+    super(id);
+    this.pathPattern = pathPattern;
+    this.aggregationDescriptor = aggregationDescriptor;
+    this.levels = levels;
+  }
+
+  public PartialPath getPathPattern() {
+    return pathPattern;
+  }
+
+  public AggregationDescriptor getAggregationDescriptor() {
+    return aggregationDescriptor;
+  }
+
+  public void setAggregationDescriptor(AggregationDescriptor aggregationDescriptor) {
+    this.aggregationDescriptor = aggregationDescriptor;
+  }
+
+  public int[] getLevels() {
+    return levels;
+  }
+
+  @Override
+  public PartialPath getPartitionPath() {
+    return null;
+  }
+
+  @Override
+  public Filter getPartitionTimeFilter() {
+    return null;
+  }
+
+  @Override
+  public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {}
+
+  @Override
+  public TRegionReplicaSet getRegionReplicaSet() {
+    return null;
+  }
+
+  @Override
+  public void open() throws Exception {}
+
+  @Override
+  public void close() throws Exception {}
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return ImmutableList.of();
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return NO_CHILD_ALLOWED;
+  }
+
+  @Override
+  public void addChild(PlanNode child) {
+    throw new UnsupportedOperationException("no child is allowed for FileAggregationScanNode");
+  }
+
+  @Override
+  public PlanNode clone() {
+    return new FileAggregationScanNode(
+        getPlanNodeId(), getPathPattern(), getAggregationDescriptor(), getLevels());
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return LAST_QUERY_HEADER_COLUMNS;
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer buffer) {
+    PlanNodeType.FILE_AGGREGATION.serialize(buffer);
+    pathPattern.serialize(buffer);
+    aggregationDescriptor.serialize(buffer);
+    ReadWriteIOUtils.write(levels.length, buffer);
+    for (int level : levels) {
+      ReadWriteIOUtils.write(level, buffer);
+    }
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.FILE_AGGREGATION.serialize(stream);
+    pathPattern.serialize(stream);
+    aggregationDescriptor.serialize(stream);
+    ReadWriteIOUtils.write(levels.length, stream);
+    for (int level : levels) {
+      ReadWriteIOUtils.write(level, stream);
+    }
+  }
+
+  public static PlanNode deserialize(ByteBuffer buffer) {
+    PartialPath pathPattern = PartialPath.deserialize(buffer);
+    AggregationDescriptor aggregationDescriptor = AggregationDescriptor.deserialize(buffer);
+    int levelsSize = ReadWriteIOUtils.readInt(buffer);
+    int[] levels = new int[levelsSize];
+    for (int i = 0; i < levelsSize; i++) {
+      levels[i] = ReadWriteIOUtils.readInt(buffer);
+    }
+    PlanNodeId planNodeId = PlanNodeId.deserialize(buffer);
+    return new FileAggregationScanNode(planNodeId, pathPattern, aggregationDescriptor, levels);
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java
index ad1e05ac61..40bb95a7cd 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java
@@ -151,4 +151,56 @@ public class BinaryColumnBuilder implements ColumnBuilder {
   private void updateArraysDataSize() {
     arraysRetainedSizeInBytes = sizeOf(valueIsNull) + sizeOf(values);
   }
+
+  @Override
+  public ColumnBuilder writeInt(int value) {
+    if (values.length <= positionCount) {
+      growCapacity();
+    }
+
+    values[positionCount] = Binary.valueOf(String.valueOf(value));
+
+    hasNonNullValue = true;
+    positionCount++;
+    return this;
+  }
+
+  @Override
+  public ColumnBuilder writeLong(long value) {
+    if (values.length <= positionCount) {
+      growCapacity();
+    }
+
+    values[positionCount] = Binary.valueOf(String.valueOf(value));
+
+    hasNonNullValue = true;
+    positionCount++;
+    return this;
+  }
+
+  @Override
+  public ColumnBuilder writeFloat(float value) {
+    if (values.length <= positionCount) {
+      growCapacity();
+    }
+
+    values[positionCount] = Binary.valueOf(String.valueOf(value));
+
+    hasNonNullValue = true;
+    positionCount++;
+    return this;
+  }
+
+  @Override
+  public ColumnBuilder writeDouble(double value) {
+    if (values.length <= positionCount) {
+      growCapacity();
+    }
+
+    values[positionCount] = Binary.valueOf(String.valueOf(value));
+
+    hasNonNullValue = true;
+    positionCount++;
+    return this;
+  }
 }