You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/03/27 01:58:10 UTC
[iotdb] 04/13: finish
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/fileScan
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ac917e467f041fd72f1a5afb9b6953bd7461c2bb
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Thu Mar 16 16:35:56 2023 +0800
finish
---
.../operator/source/FileAggregationScanUtil.java | 173 +++++++++++++++++++--
.../mpp/plan/analyze/GroupByLevelController.java | 25 +++
.../iotdb/tsfile/read/TsFileSequenceReader.java | 2 +-
.../iotdb/tsfile/read/filter/basic/Filter.java | 8 +
4 files changed, 194 insertions(+), 14 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java
index be136d531c..2f34285f7c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java
@@ -19,43 +19,190 @@
package org.apache.iotdb.db.mpp.execution.operator.source;
+import org.apache.iotdb.common.rpc.thrift.TAggregationType;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.plan.analyze.GroupByLevelController;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
+import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.reader.materializer.TsFileResourceMaterializer;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.IPageReader;
-import java.util.Comparator;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.TreeSet;
public class FileAggregationScanUtil {
- private final Map<PartialPath, List<Aggregator>> pathToAggregatorsMap;
+  /**
+   * Aggregator for every grouped (group-by-level) path. An entry is created in {@link
+   * #consumeTimeseriesMetadata} the first time a series mapping to that grouped path is seen and
+   * is never replaced afterwards.
+   */
+  private final Map<PartialPath, Aggregator> pathToAggregatorMap;
private final TsFileResourceMaterializer fileResourceMaterializer;
- private final TreeSet<ChunkMetadata> chunkMetadataList =
- new TreeSet<>(
- Comparator.comparingLong(ChunkMetadata::getVersion)
- .thenComparingLong(ChunkMetadata::getOffsetOfChunkHeader));
+
+  /**
+   * Chunks of the file currently being consumed whose statistics the query filter could neither
+   * fully accept nor fully reject, keyed by device. Their pages are examined in a second pass. The
+   * map is reset at the start of every {@link #consume()} so no chunk is aggregated twice.
+   */
+  private final Map<PartialPath, List<IChunkMetadata>> chunkMetadataMap;
+
+  /** Caches device paths and raw-to-grouped path translations across all consumed files. */
+  private final PartialPathPool partialPathPool;
+
+  /** Node indices kept when grouping; other intermediate nodes become a one-level wildcard. */
+  private final int[] levels;
+
+  /** Supplies the global time filter (page loading) and the query filter (statistics checks). */
+  private final SeriesScanOptions scanOptions;
public FileAggregationScanUtil(
- Map<PartialPath, List<Aggregator>> pathToAggregatorsMap, QueryDataSource dataSource) {
- this.pathToAggregatorsMap = pathToAggregatorsMap;
+      Map<PartialPath, Aggregator> pathToAggregatorMap,
+      QueryDataSource dataSource,
+      int[] levels,
+      SeriesScanOptions scanOptions) {
+    this.pathToAggregatorMap = pathToAggregatorMap;
this.fileResourceMaterializer = new TsFileResourceMaterializer(dataSource);
+    this.chunkMetadataMap = new HashMap<>();
+    this.partialPathPool = new PartialPathPool();
+    this.levels = levels;
+    this.scanOptions = scanOptions;
}
public boolean hasNextFile() {
return fileResourceMaterializer.hasNext();
}
- public void consume() {
+  /**
+   * Consumes the next TsFile of the data source, feeding every series it contains into the
+   * aggregator of the series' grouped path.
+   *
+   * <p>Statistics are used at the coarsest level the query filter allows. Timeseries metadata is
+   * checked first, then chunk metadata. Only chunks the filter cannot decide from their
+   * statistics are opened page by page.
+   *
+   * @throws IOException if the file or its metadata cannot be read
+   */
+  public void consume() throws IOException {
TsFileResource nextFile = fileResourceMaterializer.next();
+    // Undecided chunks collected for a previous file have already been unpacked; drop them so
+    // they are not aggregated a second time.
+    chunkMetadataMap.clear();
+
+    TsFileSequenceReader reader =
+        FileReaderManager.getInstance().get(nextFile.getTsFilePath(), nextFile.isClosed());
+    for (String device : reader.getAllDevices()) {
+      PartialPath devicePath = partialPathPool.get(device);
+      for (TimeseriesMetadata timeseriesMetadata : reader.getDeviceTimeseriesMetadata(device)) {
+        consumeTimeseriesMetadata(devicePath, timeseriesMetadata);
+      }
+    }
+
+    for (Map.Entry<PartialPath, List<IChunkMetadata>> entry : chunkMetadataMap.entrySet()) {
+      PartialPath devicePath = entry.getKey();
+      for (IChunkMetadata chunkMetadata : entry.getValue()) {
+        unpackChunkMetadata(devicePath, chunkMetadata);
+      }
+    }
+  }
+
+  /**
+   * Reads the pages of a chunk the query filter could not decide from chunk statistics. Each page
+   * is aggregated from its statistics when the filter accepts it entirely, skipped when the filter
+   * rejects it entirely, and otherwise aggregated from its filtered data.
+   */
+  private void unpackChunkMetadata(PartialPath devicePath, IChunkMetadata chunkMetadata)
+      throws IOException {
+    PartialPath groupedPath =
+        partialPathPool.getGroupedPath(devicePath, chunkMetadata.getMeasurementUid());
+    Filter queryFilter = scanOptions.getQueryFilter();
+
+    List<IPageReader> pageReaderList =
+        FileLoaderUtils.loadPageReaderList(chunkMetadata, scanOptions.getGlobalTimeFilter());
+    for (IPageReader pageReader : pageReaderList) {
+      Statistics statistics = pageReader.getStatistics();
+      if (queryFilter == null || queryFilter.allSatisfy(statistics)) {
+        updateAggregationResult(groupedPath, statistics);
+      } else if (!queryFilter.allNotSatisfy(statistics)) {
+        // Undecidable from statistics: let the page reader evaluate the filter point by point.
+        pageReader.setFilter(queryFilter);
+        updateAggregationResult(groupedPath, pageReader.getAllSatisfiedData());
+      }
+      // Pages the filter rejects entirely are skipped.
+    }
+  }
+
+  /**
+   * Ensures an aggregator exists for the series' grouped path. It then aggregates the series from
+   * its timeseries-level statistics, or descends to chunk level when the query filter cannot decide
+   * the series as a whole.
+   */
+  private void consumeTimeseriesMetadata(
+      PartialPath devicePath, TimeseriesMetadata timeseriesMetadata) {
+    PartialPath groupedPath =
+        partialPathPool.getGroupedPath(devicePath, timeseriesMetadata.getMeasurementId());
+    // Create the aggregator only the first time a grouped path is seen. Replacing an existing one
+    // would throw away everything already aggregated for that group.
+    // NOTE(review): the aggregation type is hard-coded to COUNT here.
+    pathToAggregatorMap.computeIfAbsent(
+        groupedPath,
+        path ->
+            new Aggregator(
+                AccumulatorFactory.createAccumulator(
+                    TAggregationType.COUNT,
+                    timeseriesMetadata.getTSDataType(),
+                    Collections.emptyList(),
+                    Collections.emptyMap(),
+                    true),
+                AggregationStep.SINGLE));
+
+    Filter queryFilter = scanOptions.getQueryFilter();
+    Statistics statistics = timeseriesMetadata.getStatistics();
+    if (queryFilter == null || queryFilter.allSatisfy(statistics)) {
+      updateAggregationResult(groupedPath, statistics);
+    } else if (!queryFilter.allNotSatisfy(statistics)) {
+      consumeChunkMetadataList(devicePath, groupedPath, timeseriesMetadata.getChunkMetadataList());
+    }
+    // A series the filter rejects entirely is skipped.
+  }
+
+  /**
+   * Aggregates each chunk from its statistics when the query filter accepts it entirely, and skips
+   * it when the filter rejects it entirely. Otherwise the chunk is deferred to the page-level pass
+   * in {@link #consume()}.
+   */
+  private void consumeChunkMetadataList(
+      PartialPath devicePath, PartialPath groupedPath, List<IChunkMetadata> chunkMetadataList) {
+    Filter queryFilter = scanOptions.getQueryFilter();
+    for (IChunkMetadata chunkMetadata : chunkMetadataList) {
+      Statistics statistics = chunkMetadata.getStatistics();
+      if (queryFilter == null || queryFilter.allSatisfy(statistics)) {
+        updateAggregationResult(groupedPath, statistics);
+      } else if (!queryFilter.allNotSatisfy(statistics)) {
+        chunkMetadataMap.computeIfAbsent(devicePath, key -> new ArrayList<>()).add(chunkMetadata);
+      }
+    }
+  }
+
+  // NOTE(review): both overloads below are still empty, so no result is accumulated yet.
+  private void updateAggregationResult(PartialPath groupedPath, Statistics statistics) {}
+
+  private void updateAggregationResult(PartialPath groupedPath, TsBlock tsBlock) {}
+
+  /**
+   * Cache of {@link PartialPath} objects for one scan. It holds device paths parsed from device ids
+   * and the grouped path computed for each (device, measurement) pair.
+   */
+  private class PartialPathPool {
+    private final Map<String, PartialPath> pool = new HashMap<>();
+
+    // Keyed by device path first and measurement second, so distinct raw series never share a
+    // cache entry.
+    private final Map<PartialPath, Map<String, PartialPath>> rawPathToGroupedPathMap =
+        new HashMap<>();
+
+    /**
+     * Returns the cached path parsed from {@code pathStr}, parsing and caching it on first use.
+     *
+     * @throws IllegalArgumentException if {@code pathStr} is not a legal path
+     */
+    public PartialPath get(String pathStr) {
+      PartialPath path = pool.get(pathStr);
+      if (path == null) {
+        try {
+          path = new PartialPath(pathStr);
+        } catch (IllegalPathException e) {
+          // Fail fast. Caching null here would only surface later as an NPE far from the cause.
+          throw new IllegalArgumentException("Illegal device path: " + pathStr, e);
+        }
+        pool.put(pathStr, path);
+      }
+      return path;
+    }
- List<ITimeSeriesMetadata> timeSeriesMetadata;
+
+    /** Returns the cached group-by-level path of measurement {@code measurementId} of a device. */
+    public PartialPath getGroupedPath(PartialPath devicePath, String measurementId) {
+      return rawPathToGroupedPathMap
+          .computeIfAbsent(devicePath, device -> new HashMap<>())
+          .computeIfAbsent(
+              measurementId,
+              measurement ->
+                  GroupByLevelController.groupPathByLevel(devicePath, measurement, levels));
+    }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/GroupByLevelController.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/GroupByLevelController.java
index 8641e3542e..632a5c776e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/GroupByLevelController.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/GroupByLevelController.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.tsfile.utils.Pair;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -172,6 +173,30 @@ public class GroupByLevelController {
return groupedPath;
}
+  /**
+   * Builds the group-by-level path of the series formed by {@code rawDevicePath} plus {@code
+   * measurement}.
+   *
+   * <p>The first node and the measurement are always kept. An intermediate node at index {@code k}
+   * is kept only when {@code k} appears in {@code levels}. Every other intermediate node is
+   * replaced by {@link IoTDBConstant#ONE_LEVEL_PATH_WILDCARD}.
+   *
+   * @param rawDevicePath device path of the raw series
+   * @param measurement measurement name of the raw series
+   * @param levels node indices to keep
+   * @return the grouped series path
+   */
+  public static PartialPath groupPathByLevel(
+      PartialPath rawDevicePath, String measurement, int[] levels) {
+    String[] deviceNodes = rawDevicePath.getNodes();
+    int measurementIndex = deviceNodes.length;
+    String[] seriesNodes = Arrays.copyOf(deviceNodes, measurementIndex + 1);
+    seriesNodes[measurementIndex] = measurement;
+
+    Set<Integer> keptLevels = new HashSet<>();
+    for (int level : levels) {
+      keptLevels.add(level);
+    }
+
+    List<String> groupedNodes = new ArrayList<>(seriesNodes.length);
+    groupedNodes.add(seriesNodes[0]);
+    for (int k = 1; k < measurementIndex; k++) {
+      groupedNodes.add(
+          keptLevels.contains(k) ? seriesNodes[k] : IoTDBConstant.ONE_LEVEL_PATH_WILDCARD);
+    }
+    groupedNodes.add(seriesNodes[measurementIndex]);
+    return new PartialPath(groupedNodes.toArray(new String[0]));
+  }
+
+  /**
+   * Returns the {@code groupedExpressionToRawExpressionsMap} field itself, not a defensive copy.
+   */
public Map<Expression, Set<Expression>> getGroupedExpressionToRawExpressionsMap() {
return groupedExpressionToRawExpressionsMap;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index de57b49e1e..531e41f045 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -1129,7 +1129,7 @@ public class TsFileSequenceReader implements AutoCloseable {
}
/* This method will not only deserialize the TimeseriesMetadata, but also all the chunk metadata list meanwhile. */
- private List<TimeseriesMetadata> getDeviceTimeseriesMetadata(String device) throws IOException {
+ public List<TimeseriesMetadata> getDeviceTimeseriesMetadata(String device) throws IOException {
MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex();
Pair<MetadataIndexEntry, Long> metadataIndexPair =
getMetadataAndEndOffset(metadataIndexNode, device, true, true);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/Filter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/Filter.java
index dc0d479c83..9bee46c445 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/Filter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/Filter.java
@@ -38,6 +38,14 @@ public interface Filter {
*/
boolean satisfy(Statistics statistics);
+  /**
+   * Returns whether every point summarized by {@code statistics} is guaranteed to satisfy this
+   * filter, so the statistics can be used directly without reading the data.
+   *
+   * <p>The default answers {@code false} ("cannot tell"), which is always safe: callers then fall
+   * back to finer-grained checks. Returning {@code true} for a filter that has not proven it
+   * would make callers ignore the filter entirely. Override only when the claim can be proven.
+   */
+  default boolean allSatisfy(Statistics statistics) {
+    return false;
+  }
+
+  /**
+   * Returns whether no point summarized by {@code statistics} can satisfy this filter, so the
+   * corresponding data can be skipped entirely.
+   *
+   * <p>The default reuses the existing statistics check: when {@link #satisfy(Statistics)} reports
+   * that the statistics cannot match, nothing they summarize can match either.
+   */
+  default boolean allNotSatisfy(Statistics statistics) {
+    return !satisfy(statistics);
+  }
+
/**
* To examine whether the single point(with time and value) is satisfied with the filter.
*