You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/03/27 01:58:13 UTC

[iotdb] 07/13: fix bugs & can run

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/fileScan
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 79e4a1cb5af7945926d0b88b652b636a38c2682a
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Mar 20 21:54:51 2023 +0800

    fix bugs & can run
---
 .../iotdb/commons/partition/DataPartition.java     | 16 +++++++++++
 .../db/engine/querycontext/QueryDataSource.java    |  8 ++++++
 .../operator/source/FileAggregationScanUtil.java   | 12 +++++++--
 .../apache/iotdb/db/mpp/plan/analyze/Analysis.java |  4 +++
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  7 +++--
 .../mpp/plan/analyze/GroupByLevelController.java   |  8 ++++--
 .../planner/distribution/ExchangeNodeAdder.java    | 12 +++++----
 .../plan/planner/distribution/SourceRewriter.java  |  3 +--
 .../plan/node/source/FileAggregationScanNode.java  | 31 +++++++++++++++++++---
 .../materializer/TsFileResourceMaterializer.java   |  2 +-
 10 files changed, 84 insertions(+), 19 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index f25f0a928f..b5e851e3ca 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -82,6 +82,22 @@ public class DataPartition extends Partition {
         .collect(Collectors.toList());
   }
 
+  public List<TRegionReplicaSet> getAllDataRegionReplicaSet() {
+    List<TRegionReplicaSet> regionReplicaSet = new ArrayList<>();
+    dataPartitionMap
+        .values()
+        .forEach(
+            tSeriesPartitionSlotMapMap -> {
+              tSeriesPartitionSlotMapMap
+                  .values()
+                  .forEach(
+                      tTimePartitionSlotListMap -> {
+                        tTimePartitionSlotListMap.values().forEach(regionReplicaSet::addAll);
+                      });
+            });
+    return regionReplicaSet;
+  }
+
   public List<TRegionReplicaSet> getDataRegionReplicaSetForWriting(
       String deviceName, List<TTimePartitionSlot> timePartitionSlotList) {
     // A list of data region replica sets will store data in a same time partition.
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
index 8793b2c135..012b243960 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
@@ -121,4 +121,12 @@ public class QueryDataSource {
     }
     this.unSeqFileOrderIndex = unSeqFileOrderIndex;
   }
+
+  public void constructOrderIndexes() {
+    int[] unSeqFileOrderIndex = new int[unseqResources.size()];
+    for (int i = 0; i < unseqResources.size(); i++) {
+      unSeqFileOrderIndex[i] = i;
+    }
+    this.unSeqFileOrderIndex = unSeqFileOrderIndex;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java
index 5a4ef5cb38..12290b4ebc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java
@@ -50,6 +50,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
+
 public class FileAggregationScanUtil {
 
   private final PartialPath pathPattern;
@@ -68,6 +70,8 @@ public class FileAggregationScanUtil {
 
   private final SeriesScanOptions scanOptions;
 
+  private boolean isCountStar;
+
   public FileAggregationScanUtil(
       PartialPath pathPattern,
       AggregationDescriptor aggregationDescriptor,
@@ -80,9 +84,13 @@ public class FileAggregationScanUtil {
     this.partialPathPool = new PartialPathPool();
     this.levels = levels;
     this.scanOptions = scanOptions;
+    this.isCountStar =
+        pathPattern.getMeasurement().equals(ONE_LEVEL_PATH_WILDCARD)
+            && aggregationDescriptor.getAggregationType() == TAggregationType.COUNT;
   }
 
   public void initQueryDataSource(QueryDataSource dataSource) {
+    dataSource.constructOrderIndexes();
     this.fileResourceMaterializer = new TsFileResourceMaterializer(dataSource);
   }
 
@@ -166,7 +174,7 @@ public class FileAggregationScanUtil {
 
     PartialPath groupedPath =
         partialPathPool.getGroupedPath(devicePath, timeseriesMetadata.getMeasurementId());
-    if (pathToAggregatorMap.containsKey(groupedPath)) {
+    if (!pathToAggregatorMap.containsKey(groupedPath)) {
       pathToAggregatorMap.put(
           groupedPath,
           new Aggregator(
@@ -248,7 +256,7 @@ public class FileAggregationScanUtil {
         return rawPathToGroupedPathMap.get(rawPathStr);
       }
       PartialPath groupedPath =
-          GroupByLevelController.groupPathByLevel(devicePath, measurementId, levels);
+          GroupByLevelController.groupPathByLevel(devicePath, measurementId, levels, isCountStar);
       rawPathToGroupedPathMap.put(rawPathStr, groupedPath);
       return groupedPath;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
index be471ba3b5..952541dd1b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
@@ -227,6 +227,10 @@ public class Analysis {
     return dataPartition.getDataRegionReplicaSet(deviceName, null);
   }
 
+  public List<TRegionReplicaSet> getPartitionInfo() {
+    return dataPartition.getAllDataRegionReplicaSet();
+  }
+
   public Statement getStatement() {
     return statement;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index f922ce89dd..5c03ff0763 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -230,6 +230,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       queryStatement.semanticCheck();
 
       if (queryStatement.isGroupByLevel()) {
+        analysis.setStatement(queryStatement);
+
         analysis.setLevels(queryStatement.getGroupByLevelComponent().getLevels());
 
         PartialPath pathPrefix = queryStatement.getFromComponent().getPrefixPaths().get(0);
@@ -258,10 +260,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
             new DatasetHeader(ColumnHeaderConstant.lastQueryColumnHeaders, true));
 
         Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
-        sgNameToQueryParamsMap.put(
-            "root.iov",
-            Collections.singletonList(
-                new DataPartitionQueryParam("root.iov.**", Collections.emptyList(), true, true)));
+        sgNameToQueryParamsMap.put("root.iov", Collections.emptyList());
         analysis.setDataPartitionInfo(partitionFetcher.getDataPartition(sgNameToQueryParamsMap));
 
         return analysis;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/GroupByLevelController.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/GroupByLevelController.java
index 632a5c776e..da57edac25 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/GroupByLevelController.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/GroupByLevelController.java
@@ -174,7 +174,7 @@ public class GroupByLevelController {
   }
 
   public static PartialPath groupPathByLevel(
-      PartialPath rawDevicePath, String measurement, int[] levels) {
+      PartialPath rawDevicePath, String measurement, int[] levels, boolean isCountStar) {
     String[] nodes = Arrays.copyOf(rawDevicePath.getNodes(), rawDevicePath.getNodes().length + 1);
     nodes[nodes.length - 1] = measurement;
 
@@ -193,7 +193,11 @@ public class GroupByLevelController {
         transformedNodes.add(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD);
       }
     }
-    transformedNodes.add(nodes[nodes.length - 1]);
+    if (isCountStar) {
+      transformedNodes.add(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD);
+    } else {
+      transformedNodes.add(nodes[nodes.length - 1]);
+    }
     return new PartialPath(transformedNodes.toArray(new String[0]));
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index 56dfda1cd7..b7c243661f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -52,6 +52,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.FileAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
@@ -186,6 +187,11 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
     return processNoChildSourceNode(node, context);
   }
 
+  @Override
+  public PlanNode visitFileAggregationScan(FileAggregationScanNode node, NodeGroupContext context) {
+    return processNoChildSourceNode(node, context);
+  }
+
   private PlanNode processNoChildSourceNode(SourceNode node, NodeGroupContext context) {
     context.putNodeDistribution(
         node.getPlanNodeId(),
@@ -276,11 +282,7 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
 
     MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone();
     List<PlanNode> visitedChildren = new ArrayList<>();
-    node.getChildren()
-        .forEach(
-            child -> {
-              visitedChildren.add(visit(child, context));
-            });
+    node.getChildren().forEach(child -> visitedChildren.add(visit(child, context)));
 
     TRegionReplicaSet dataRegion;
     NodeDistributionType distributionType;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index f61891e59c..c9cfeb1102 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -448,8 +448,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
   @Override
   public List<PlanNode> visitFileAggregationScan(
       FileAggregationScanNode node, DistributionPlanContext context) {
-    List<TRegionReplicaSet> dataDistribution =
-        analysis.getPartitionInfo(node.getPartitionPath(), node.getPartitionTimeFilter());
+    List<TRegionReplicaSet> dataDistribution = analysis.getPartitionInfo();
     if (dataDistribution.size() == 1) {
       node.setRegionReplicaSet(dataDistribution.get(0));
       return Collections.singletonList(node);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java
index 63158d132a..9c1eeaa20c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -45,6 +46,9 @@ public class FileAggregationScanNode extends SeriesSourceNode {
 
   private final int[] levels;
 
+  // The id of DataRegion where the node will run
+  private TRegionReplicaSet regionReplicaSet;
+
   public FileAggregationScanNode(
       PlanNodeId id,
       PartialPath pathPattern,
@@ -56,6 +60,16 @@ public class FileAggregationScanNode extends SeriesSourceNode {
     this.levels = levels;
   }
 
+  public FileAggregationScanNode(
+      PlanNodeId id,
+      PartialPath pathPattern,
+      AggregationDescriptor aggregationDescriptor,
+      int[] levels,
+      TRegionReplicaSet regionReplicaSet) {
+    this(id, pathPattern, aggregationDescriptor, levels);
+    this.regionReplicaSet = regionReplicaSet;
+  }
+
   public PartialPath getPathPattern() {
     return pathPattern;
   }
@@ -83,11 +97,13 @@ public class FileAggregationScanNode extends SeriesSourceNode {
   }
 
   @Override
-  public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {}
+  public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
+    this.regionReplicaSet = regionReplicaSet;
+  }
 
   @Override
   public TRegionReplicaSet getRegionReplicaSet() {
-    return null;
+    return regionReplicaSet;
   }
 
   @Override
@@ -114,7 +130,11 @@ public class FileAggregationScanNode extends SeriesSourceNode {
   @Override
   public PlanNode clone() {
     return new FileAggregationScanNode(
-        getPlanNodeId(), getPathPattern(), getAggregationDescriptor(), getLevels());
+        getPlanNodeId(),
+        getPathPattern(),
+        getAggregationDescriptor(),
+        getLevels(),
+        getRegionReplicaSet());
   }
 
   @Override
@@ -155,4 +175,9 @@ public class FileAggregationScanNode extends SeriesSourceNode {
     PlanNodeId planNodeId = PlanNodeId.deserialize(buffer);
     return new FileAggregationScanNode(planNodeId, pathPattern, aggregationDescriptor, levels);
   }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitFileAggregationScan(this, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/materializer/TsFileResourceMaterializer.java b/server/src/main/java/org/apache/iotdb/db/query/reader/materializer/TsFileResourceMaterializer.java
index e5e1c0bc77..efdec760ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/materializer/TsFileResourceMaterializer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/materializer/TsFileResourceMaterializer.java
@@ -42,7 +42,7 @@ public class TsFileResourceMaterializer {
 
   public TsFileResource next() {
     if (tsFileResourceSource.hasNextSeqResource(curSeqFileIndex, true)) {
-      TsFileResource nextFile = tsFileResourceSource.getSeqResourceByIndex(curUnSeqFileIndex);
+      TsFileResource nextFile = tsFileResourceSource.getSeqResourceByIndex(curSeqFileIndex);
       curSeqFileIndex++;
       return nextFile;
     } else if (tsFileResourceSource.hasNextUnseqResource(curUnSeqFileIndex)) {