You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/03/27 01:58:19 UTC

[iotdb] 13/13: use agg instead of last

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/fileScan
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8cb7027ed056f37b09d1077a24e14c5732b203c0
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Mar 21 16:45:02 2023 +0800

    use agg instead of last
---
 .../source/FileAggregationScanOperator.java        |  5 +++--
 .../operator/source/FileAggregationScanUtil.java   | 11 +--------
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  | 16 ++++++++-----
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    | 15 ++++++-------
 .../plan/planner/distribution/SourceRewriter.java  | 19 ++++++++++++----
 .../plan/node/source/FileAggregationScanNode.java  | 26 +++++++++++++++++-----
 6 files changed, 57 insertions(+), 35 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanOperator.java
index 0f61f0656b..23ed2cc12c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanOperator.java
@@ -22,14 +22,15 @@ package org.apache.iotdb.db.mpp.execution.operator.source;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryUtil;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 
 import java.io.IOException;
+import java.util.Collections;
 
 import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
@@ -51,7 +52,7 @@ public class FileAggregationScanOperator extends AbstractSourceOperator
     this.aggregationScanUtil =
         new FileAggregationScanUtil(
             pathPattern, aggregationDescriptor, levels, new SeriesScanOptions.Builder().build());
-    this.tsBlockBuilder = LastQueryUtil.createTsBlockBuilder();
+    this.tsBlockBuilder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT64));
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java
index 9267d68cb6..4b91a9f6ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java
@@ -41,7 +41,6 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
-import org.apache.iotdb.tsfile.utils.Binary;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -122,20 +121,12 @@ public class FileAggregationScanUtil {
   public TsBlock getAggregationResult(TsBlockBuilder builder) {
     for (Map.Entry<PartialPath, Aggregator> entry : pathToAggregatorMap.entrySet()) {
       builder.getTimeColumnBuilder().writeLong(0L);
-      builder.getValueColumnBuilders()[0].writeBinary(
-          Binary.valueOf(
-              String.format(
-                  "%s(%s)",
-                  aggregationDescriptor.getAggregationFuncName(), entry.getKey().toString())));
 
       Aggregator aggregator = entry.getValue();
       ColumnBuilder[] columnBuilders = new ColumnBuilder[1];
-      columnBuilders[0] = builder.getValueColumnBuilders()[1];
+      columnBuilders[0] = builder.getValueColumnBuilders()[0];
       aggregator.outputResult(columnBuilders);
 
-      builder.getValueColumnBuilders()[2].writeBinary(
-          Binary.valueOf(aggregator.getOutputType()[0].toString()));
-
       builder.declarePosition();
     }
     return builder.build();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 5c03ff0763..35a6801a33 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -250,14 +250,20 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
         Set<Expression> aggregationExpressions = new HashSet<>();
         FunctionExpression aggregationExpression =
             new FunctionExpression(
-                aggregationMeasurementExpression.getFunctionName(),
-                aggregationMeasurementExpression.getFunctionAttributes(),
-                Collections.singletonList(sourceExpression));
+                "count",
+                new LinkedHashMap<>(),
+                Collections.singletonList(
+                    new TimeSeriesOperand(
+                        new MeasurementPath("root.*.*.*.*.*.*", TSDataType.INT64))));
+        analyzeExpression(analysis, aggregationExpression);
         aggregationExpressions.add(aggregationExpression);
         analysis.setAggregationExpressions(aggregationExpressions);
 
         analysis.setRespDatasetHeader(
-            new DatasetHeader(ColumnHeaderConstant.lastQueryColumnHeaders, true));
+            new DatasetHeader(
+                Collections.singletonList(
+                    new ColumnHeader(aggregationExpression.toString(), TSDataType.INT64)),
+                true));
 
         Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
         sgNameToQueryParamsMap.put("root.iov", Collections.emptyList());
@@ -381,7 +387,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       // fetch partition information
       analyzeDataPartition(analysis, queryStatement, schemaTree);
 
-    } catch (StatementAnalyzeException e) {
+    } catch (StatementAnalyzeException | IllegalPathException e) {
       logger.warn("Meet error when analyzing the query statement: ", e);
       throw new StatementAnalyzeException(
           "Meet error when analyzing the query statement: " + e.getMessage());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index 70b470ee42..7a14f750fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -1204,17 +1204,16 @@ public class LogicalPlanBuilder {
         new AggregationDescriptor(
             functionExpression.getFunctionName(),
             AggregationStep.SINGLE,
-            Collections.emptyList(),
+            Collections.singletonList(functionExpression.getExpressions().get(0)),
             Collections.emptyMap());
+    updateTypeProvider(Collections.singletonList(functionExpression));
     this.root =
         new FileAggregationScanNode(
-            context.getQueryId().genPlanNodeId(), pathPattern, aggregationDescriptor, levels);
-
-    ColumnHeaderConstant.lastQueryColumnHeaders.forEach(
-        columnHeader ->
-            context
-                .getTypeProvider()
-                .setType(columnHeader.getColumnName(), columnHeader.getColumnType()));
+            context.getQueryId().genPlanNodeId(),
+            pathPattern,
+            aggregationDescriptor,
+            levels,
+            functionExpression);
     return this;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index c9cfeb1102..376284d2a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -454,17 +454,28 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
       return Collections.singletonList(node);
     }
 
-    LastQueryCollectNode lastQueryCollectNode =
-        new LastQueryCollectNode(context.queryContext.getQueryId().genPlanNodeId());
+    CrossSeriesAggregationDescriptor rootAggregationDescriptor =
+        new CrossSeriesAggregationDescriptor(
+            "count",
+            AggregationStep.FINAL,
+            Collections.singletonList(node.getOutputExpression().getExpressions().get(0)),
+            Collections.emptyMap(),
+            node.getOutputExpression().getExpressions().get(0));
+    AggregationNode aggregationNode =
+        new AggregationNode(
+            context.queryContext.getQueryId().genPlanNodeId(),
+            Collections.singletonList(rootAggregationDescriptor),
+            null,
+            Ordering.ASC);
 
     for (TRegionReplicaSet dataRegion : dataDistribution) {
       FileAggregationScanNode split = (FileAggregationScanNode) node.clone();
       split.setAggregationDescriptor(node.getAggregationDescriptor());
       split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
       split.setRegionReplicaSet(dataRegion);
-      lastQueryCollectNode.addChild(split);
+      aggregationNode.addChild(split);
     }
-    return Collections.singletonList(lastQueryCollectNode);
+    return Collections.singletonList(aggregationNode);
   }
 
   private List<PlanNode> processSeriesAggregationSource(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java
index 5c997b9db7..ae4b2f6b2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathDeserializeUtil;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
@@ -35,10 +36,9 @@ import com.google.common.collect.ImmutableList;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.List;
 
-import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
-
 public class FileAggregationScanNode extends SeriesSourceNode {
 
   private final PartialPath pathPattern;
@@ -50,15 +50,19 @@ public class FileAggregationScanNode extends SeriesSourceNode {
   // The id of DataRegion where the node will run
   private TRegionReplicaSet regionReplicaSet;
 
+  private final Expression outputExpression;
+
   public FileAggregationScanNode(
       PlanNodeId id,
       PartialPath pathPattern,
       AggregationDescriptor aggregationDescriptor,
-      int[] levels) {
+      int[] levels,
+      Expression outputExpression) {
     super(id);
     this.pathPattern = pathPattern;
     this.aggregationDescriptor = aggregationDescriptor;
     this.levels = levels;
+    this.outputExpression = outputExpression;
   }
 
   public FileAggregationScanNode(
@@ -66,8 +70,9 @@ public class FileAggregationScanNode extends SeriesSourceNode {
       PartialPath pathPattern,
       AggregationDescriptor aggregationDescriptor,
       int[] levels,
+      Expression outputExpression,
       TRegionReplicaSet regionReplicaSet) {
-    this(id, pathPattern, aggregationDescriptor, levels);
+    this(id, pathPattern, aggregationDescriptor, levels, outputExpression);
     this.regionReplicaSet = regionReplicaSet;
   }
 
@@ -87,6 +92,10 @@ public class FileAggregationScanNode extends SeriesSourceNode {
     return levels;
   }
 
+  public Expression getOutputExpression() {
+    return outputExpression;
+  }
+
   @Override
   public PartialPath getPartitionPath() {
     return null;
@@ -135,12 +144,13 @@ public class FileAggregationScanNode extends SeriesSourceNode {
         getPathPattern(),
         getAggregationDescriptor(),
         getLevels(),
+        getOutputExpression(),
         getRegionReplicaSet());
   }
 
   @Override
   public List<String> getOutputColumnNames() {
-    return LAST_QUERY_HEADER_COLUMNS;
+    return Collections.singletonList(outputExpression.toString());
   }
 
   @Override
@@ -152,6 +162,7 @@ public class FileAggregationScanNode extends SeriesSourceNode {
     for (int level : levels) {
       ReadWriteIOUtils.write(level, buffer);
     }
+    Expression.serialize(outputExpression, buffer);
   }
 
   @Override
@@ -163,6 +174,7 @@ public class FileAggregationScanNode extends SeriesSourceNode {
     for (int level : levels) {
       ReadWriteIOUtils.write(level, stream);
     }
+    Expression.serialize(outputExpression, stream);
   }
 
   public static PlanNode deserialize(ByteBuffer buffer) {
@@ -173,8 +185,10 @@ public class FileAggregationScanNode extends SeriesSourceNode {
     for (int i = 0; i < levelsSize; i++) {
       levels[i] = ReadWriteIOUtils.readInt(buffer);
     }
+    Expression outputExpression = Expression.deserialize(buffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(buffer);
-    return new FileAggregationScanNode(planNodeId, pathPattern, aggregationDescriptor, levels);
+    return new FileAggregationScanNode(
+        planNodeId, pathPattern, aggregationDescriptor, levels, outputExpression);
   }
 
   @Override