You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/03/27 01:58:10 UTC

[iotdb] 04/13: finish

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/fileScan
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ac917e467f041fd72f1a5afb9b6953bd7461c2bb
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Thu Mar 16 16:35:56 2023 +0800

    finish
---
 .../operator/source/FileAggregationScanUtil.java   | 173 +++++++++++++++++++--
 .../mpp/plan/analyze/GroupByLevelController.java   |  25 +++
 .../iotdb/tsfile/read/TsFileSequenceReader.java    |   2 +-
 .../iotdb/tsfile/read/filter/basic/Filter.java     |   8 +
 4 files changed, 194 insertions(+), 14 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java
index be136d531c..2f34285f7c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java
@@ -19,43 +19,237 @@

 package org.apache.iotdb.db.mpp.execution.operator.source;

+import org.apache.iotdb.common.rpc.thrift.TAggregationType;
+import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.plan.analyze.GroupByLevelController;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
+import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.reader.materializer.TsFileResourceMaterializer;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.IPageReader;

-import java.util.Comparator;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeSet;

+/**
+ * Scans the TsFiles of a {@link QueryDataSource} one file at a time and routes the data of every
+ * series to the aggregation of its group-by-level path. Series statistics are used directly when
+ * they decide the query filter; otherwise the scan descends to chunk statistics and, if those are
+ * still inconclusive, to the filtered page data.
+ */
 public class FileAggregationScanUtil {

-  private final Map<PartialPath, List<Aggregator>> pathToAggregatorsMap;
+  /** Aggregator of each grouped path; missing entries are created as COUNT aggregators. */
+  private final Map<PartialPath, Aggregator> pathToAggregatorMap;

   private final TsFileResourceMaterializer fileResourceMaterializer;

-  private final TreeSet<ChunkMetadata> chunkMetadataList =
-      new TreeSet<>(
-          Comparator.comparingLong(ChunkMetadata::getVersion)
-              .thenComparingLong(ChunkMetadata::getOffsetOfChunkHeader));
+  /**
+   * Chunks of the file being consumed whose statistics do not decide the query filter, keyed by
+   * device path; they are unpacked page by page at the end of {@link #consume()}.
+   */
+  private final Map<PartialPath, List<IChunkMetadata>> chunkMetadataMap;
+
+  private final PartialPathPool partialPathPool;
+
+  /** Path levels kept when grouping series paths (see {@link GroupByLevelController}). */
+  private final int[] levels;
+
+  private final SeriesScanOptions scanOptions;

+  /**
+   * Creates a scan over the TsFiles of {@code dataSource}.
+   *
+   * @param pathToAggregatorMap aggregator of each grouped path; entries for grouped paths seen for
+   *     the first time are added during the scan, so the map must be mutable
+   * @param dataSource the TsFiles to scan
+   * @param levels path levels kept when grouping series paths
+   * @param scanOptions supplies the global time filter and the query filter
+   */
   public FileAggregationScanUtil(
-      Map<PartialPath, List<Aggregator>> pathToAggregatorsMap, QueryDataSource dataSource) {
-    this.pathToAggregatorsMap = pathToAggregatorsMap;
+      Map<PartialPath, Aggregator> pathToAggregatorMap,
+      QueryDataSource dataSource,
+      int[] levels,
+      SeriesScanOptions scanOptions) {
+    this.pathToAggregatorMap = pathToAggregatorMap;
     this.fileResourceMaterializer = new TsFileResourceMaterializer(dataSource);
+    this.chunkMetadataMap = new HashMap<>();
+    this.partialPathPool = new PartialPathPool();
+    this.levels = levels;
+    this.scanOptions = scanOptions;
   }

+  /** Returns whether there is another TsFile left to consume. */
   public boolean hasNextFile() {
     return fileResourceMaterializer.hasNext();
   }

-  public void consume() {
+  /**
+   * Consumes the next TsFile: the data of every series is passed to updateAggregationResult for
+   * its grouped path, as series, chunk or page statistics where those decide the query filter and
+   * as filtered page data otherwise.
+   *
+   * @throws IOException if the file or its metadata cannot be read
+   */
+  public void consume() throws IOException {
     TsFileResource nextFile = fileResourceMaterializer.next();
+    // chunkMetadataMap must only hold chunks of the current file: drop whatever a previous
+    // call left behind so it is never aggregated twice.
+    chunkMetadataMap.clear();
+    // NOTE(review): confirm whether readers obtained from FileReaderManager must be reference
+    // counted for the duration of this scan.
+    TsFileSequenceReader reader =
+        FileReaderManager.getInstance().get(nextFile.getTsFilePath(), nextFile.isClosed());
+    for (String device : reader.getAllDevices()) {
+      PartialPath devicePath = partialPathPool.get(device);
+      List<TimeseriesMetadata> timeseriesMetadataList = reader.getDeviceTimeseriesMetadata(device);
+      for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) {
+        consumeTimeseriesMetadata(devicePath, timeseriesMetadata);
+      }
+    }
+
+    // Resolve the chunks whose statistics could not decide the filter by reading their pages.
+    for (Map.Entry<PartialPath, List<IChunkMetadata>> entry : chunkMetadataMap.entrySet()) {
+      PartialPath devicePath = entry.getKey();
+      for (IChunkMetadata chunkMetadata : entry.getValue()) {
+        unpackChunkMetadata(devicePath, chunkMetadata);
+      }
+    }
+  }
+
+  /**
+   * Reads the pages of a chunk whose statistics did not decide the query filter. A page is fed as
+   * statistics when the filter is absent or allSatisfy holds, as filtered data when allNotSatisfy
+   * does not hold, and skipped otherwise.
+   */
+  private void unpackChunkMetadata(PartialPath devicePath, IChunkMetadata chunkMetadata)
+      throws IOException {
+    PartialPath groupedPath =
+        partialPathPool.getGroupedPath(devicePath, chunkMetadata.getMeasurementUid());
+    Filter queryFilter = scanOptions.getQueryFilter();
+
+    List<IPageReader> pageReaderList =
+        FileLoaderUtils.loadPageReaderList(chunkMetadata, scanOptions.getGlobalTimeFilter());
+    for (IPageReader pageReader : pageReaderList) {
+      Statistics statistics = pageReader.getStatistics();
+      if (queryFilter == null || queryFilter.allSatisfy(statistics)) {
+        updateAggregationResult(groupedPath, statistics);
+      } else if (!queryFilter.allNotSatisfy(statistics)) {
+        pageReader.setFilter(queryFilter);
+        updateAggregationResult(groupedPath, pageReader.getAllSatisfiedData());
+      }
+      // Otherwise no point of the page can satisfy the filter and the page is skipped.
+    }
+  }
+
+  /**
+   * Ensures the grouped path of a series has an aggregator, then feeds the series statistics if
+   * they fully satisfy the query filter, skips the series if no point can match, and otherwise
+   * descends to its chunk metadata.
+   */
+  private void consumeTimeseriesMetadata(
+      PartialPath devicePath, TimeseriesMetadata timeseriesMetadata) {
+    PartialPath groupedPath =
+        partialPathPool.getGroupedPath(devicePath, timeseriesMetadata.getMeasurementId());
+    // Only create an aggregator for a grouped path seen for the first time: an existing one may
+    // already hold results of other series or earlier files and must not be replaced.
+    pathToAggregatorMap.computeIfAbsent(
+        groupedPath,
+        key ->
+            new Aggregator(
+                AccumulatorFactory.createAccumulator(
+                    TAggregationType.COUNT,
+                    timeseriesMetadata.getTSDataType(),
+                    Collections.emptyList(),
+                    Collections.emptyMap(),
+                    true),
+                AggregationStep.SINGLE));
+
+    Filter queryFilter = scanOptions.getQueryFilter();
+    Statistics statistics = timeseriesMetadata.getStatistics();
+    if (queryFilter == null || queryFilter.allSatisfy(statistics)) {
+      updateAggregationResult(groupedPath, statistics);
+    } else if (!queryFilter.allNotSatisfy(statistics)) {
+      consumeChunkMetadataList(devicePath, groupedPath, timeseriesMetadata.getChunkMetadataList());
+    }
+    // Otherwise no point of the series can satisfy the filter and the series is skipped.
+  }
+
+  /**
+   * Feeds the statistics of every chunk that fully satisfies the query filter, skips chunks where
+   * no point can match, and defers the remaining chunks to {@link #chunkMetadataMap} so their
+   * pages are read at the end of {@link #consume()}.
+   */
+  private void consumeChunkMetadataList(
+      PartialPath devicePath, PartialPath groupedPath, List<IChunkMetadata> chunkMetadataList) {
+    Filter queryFilter = scanOptions.getQueryFilter();
+    for (IChunkMetadata chunkMetadata : chunkMetadataList) {
+      Statistics statistics = chunkMetadata.getStatistics();
+      if (queryFilter == null || queryFilter.allSatisfy(statistics)) {
+        updateAggregationResult(groupedPath, statistics);
+      } else if (!queryFilter.allNotSatisfy(statistics)) {
+        chunkMetadataMap.computeIfAbsent(devicePath, key -> new ArrayList<>()).add(chunkMetadata);
+      }
+    }
+  }
+
+  // TODO: not implemented yet, so the statistics passed in are currently discarded.
+  private void updateAggregationResult(PartialPath groupedPath, Statistics statistics) {}
+
+  // TODO: not implemented yet, so the filtered page data passed in is currently discarded.
+  private void updateAggregationResult(PartialPath groupedPath, TsBlock tsBlock) {}
+
+  /**
+   * Caches parsed device paths and the grouped path of every (device, measurement) pair so each is
+   * built at most once per scan.
+   */
+  private class PartialPathPool {
+    private final Map<String, PartialPath> pool = new HashMap<>();
+    private final Map<PartialPath, Map<String, PartialPath>> groupedPathCache = new HashMap<>();
+
+    /** Returns the cached {@link PartialPath} for {@code pathStr}, parsing it on first use. */
+    public PartialPath get(String pathStr) {
+      PartialPath path = pool.get(pathStr);
+      if (path == null) {
+        try {
+          path = new PartialPath(pathStr);
+        } catch (IllegalPathException e) {
+          // Caching null would only defer the failure to an obscure NullPointerException later.
+          throw new IllegalArgumentException("Illegal device path in TsFile: " + pathStr, e);
+        }
+        pool.put(pathStr, path);
+      }
+      return path;
+    }

-    List<ITimeSeriesMetadata> timeSeriesMetadata;
+    /**
+     * Returns the path that series {@code measurementId} of {@code devicePath} is grouped into,
+     * computing it with {@link GroupByLevelController#groupPathByLevel} on first use.
+     */
+    public PartialPath getGroupedPath(PartialPath devicePath, String measurementId) {
+      // Keyed by device path and measurement separately so that distinct series can never
+      // collide on a concatenated string key.
+      return groupedPathCache
+          .computeIfAbsent(devicePath, key -> new HashMap<>())
+          .computeIfAbsent(
+              measurementId,
+              key -> GroupByLevelController.groupPathByLevel(devicePath, measurementId, levels));
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/GroupByLevelController.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/GroupByLevelController.java
index 8641e3542e..632a5c776e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/GroupByLevelController.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/GroupByLevelController.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -172,6 +173,39 @@ public class GroupByLevelController {
     return groupedPath;
   }
 
+  /**
+   * Groups the series {@code measurement} of device {@code rawDevicePath} by path level: the first
+   * node and the measurement are kept, and each node in between is kept only if its index is listed
+   * in {@code levels}, being replaced by {@code IoTDBConstant.ONE_LEVEL_PATH_WILDCARD} otherwise.
+   *
+   * @param rawDevicePath the device the series belongs to
+   * @param measurement the measurement (last node) of the series
+   * @param levels node indexes to keep
+   * @return the grouped series path
+   */
+  public static PartialPath groupPathByLevel(
+      PartialPath rawDevicePath, String measurement, int[] levels) {
+    Set<Integer> keptLevels = new HashSet<>();
+    for (int level : levels) {
+      keptLevels.add(level);
+    }
+
+    String[] deviceNodes = rawDevicePath.getNodes();
+    // Full series path: the device nodes followed by the measurement.
+    String[] seriesNodes = Arrays.copyOf(deviceNodes, deviceNodes.length + 1);
+    int measurementIndex = deviceNodes.length;
+    seriesNodes[measurementIndex] = measurement;
+
+    List<String> groupedNodes = new ArrayList<>(seriesNodes.length);
+    groupedNodes.add(seriesNodes[0]);
+    for (int level = 1; level < measurementIndex; level++) {
+      groupedNodes.add(
+          keptLevels.contains(level) ? seriesNodes[level] : IoTDBConstant.ONE_LEVEL_PATH_WILDCARD);
+    }
+    groupedNodes.add(measurement);
+    return new PartialPath(groupedNodes.toArray(new String[0]));
+  }
+
   public Map<Expression, Set<Expression>> getGroupedExpressionToRawExpressionsMap() {
     return groupedExpressionToRawExpressionsMap;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index de57b49e1e..531e41f045 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -1129,7 +1129,12 @@ public class TsFileSequenceReader implements AutoCloseable {
   }
 
-  /* This method will not only deserialize the TimeseriesMetadata, but also all the chunk metadata list meanwhile. */
-  private List<TimeseriesMetadata> getDeviceTimeseriesMetadata(String device) throws IOException {
+  /**
+   * Deserializes the TimeseriesMetadata of {@code device} together with all of their chunk
+   * metadata lists.
+   *
+   * <p>Public so that callers outside this reader can scan a device's series metadata directly.
+   */
+  public List<TimeseriesMetadata> getDeviceTimeseriesMetadata(String device) throws IOException {
     MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex();
     Pair<MetadataIndexEntry, Long> metadataIndexPair =
         getMetadataAndEndOffset(metadataIndexNode, device, true, true);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/Filter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/Filter.java
index dc0d479c83..9bee46c445 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/Filter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/Filter.java
@@ -38,6 +38,28 @@ public interface Filter {
    */
   boolean satisfy(Statistics statistics);
 
+  /**
+   * Returns whether every point summarized by {@code statistics} is guaranteed to satisfy this
+   * filter, so that the statistics can be used in place of the underlying data.
+   *
+   * <p>The default never makes that guarantee, which is always safe: callers then inspect the data
+   * at a finer granularity. Filters that can prove it from the statistics should override this.
+   */
+  default boolean allSatisfy(Statistics statistics) {
+    return false;
+  }
+
+  /**
+   * Returns whether no point summarized by {@code statistics} can satisfy this filter, so that the
+   * underlying data can be skipped.
+   *
+   * <p>The default never claims this, which is always safe. Filters that can prove it from the
+   * statistics should override this.
+   */
+  default boolean allNotSatisfy(Statistics statistics) {
+    return false;
+  }
+
   /**
    * To examine whether the single point(with time and value) is satisfied with the filter.
    *