You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/03/27 01:58:13 UTC
[iotdb] 07/13: fix bugs & can run
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/fileScan
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 79e4a1cb5af7945926d0b88b652b636a38c2682a
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Mar 20 21:54:51 2023 +0800
fix bugs & can run
---
.../iotdb/commons/partition/DataPartition.java | 16 +++++++++++
.../db/engine/querycontext/QueryDataSource.java | 8 ++++++
.../operator/source/FileAggregationScanUtil.java | 12 +++++++--
.../apache/iotdb/db/mpp/plan/analyze/Analysis.java | 4 +++
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 7 +++--
.../mpp/plan/analyze/GroupByLevelController.java | 8 ++++--
.../planner/distribution/ExchangeNodeAdder.java | 12 +++++----
.../plan/planner/distribution/SourceRewriter.java | 3 +--
.../plan/node/source/FileAggregationScanNode.java | 31 +++++++++++++++++++---
.../materializer/TsFileResourceMaterializer.java | 2 +-
10 files changed, 84 insertions(+), 19 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index f25f0a928f..b5e851e3ca 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -82,6 +82,22 @@ public class DataPartition extends Partition {
.collect(Collectors.toList());
}
+ public List<TRegionReplicaSet> getAllDataRegionReplicaSet() {
+ List<TRegionReplicaSet> regionReplicaSet = new ArrayList<>();
+ dataPartitionMap
+ .values()
+ .forEach(
+ tSeriesPartitionSlotMapMap -> {
+ tSeriesPartitionSlotMapMap
+ .values()
+ .forEach(
+ tTimePartitionSlotListMap -> {
+ tTimePartitionSlotListMap.values().forEach(regionReplicaSet::addAll);
+ });
+ });
+ return regionReplicaSet;
+ }
+
public List<TRegionReplicaSet> getDataRegionReplicaSetForWriting(
String deviceName, List<TTimePartitionSlot> timePartitionSlotList) {
// A list of data region replica sets will store data in a same time partition.
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
index 8793b2c135..012b243960 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
@@ -121,4 +121,12 @@ public class QueryDataSource {
}
this.unSeqFileOrderIndex = unSeqFileOrderIndex;
}
+
+ public void constructOrderIndexes() {
+ int[] unSeqFileOrderIndex = new int[unseqResources.size()];
+ for (int i = 0; i < unseqResources.size(); i++) {
+ unSeqFileOrderIndex[i] = i;
+ }
+ this.unSeqFileOrderIndex = unSeqFileOrderIndex;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java
index 5a4ef5cb38..12290b4ebc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java
@@ -50,6 +50,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
+
public class FileAggregationScanUtil {
private final PartialPath pathPattern;
@@ -68,6 +70,8 @@ public class FileAggregationScanUtil {
private final SeriesScanOptions scanOptions;
+ private boolean isCountStar;
+
public FileAggregationScanUtil(
PartialPath pathPattern,
AggregationDescriptor aggregationDescriptor,
@@ -80,9 +84,13 @@ public class FileAggregationScanUtil {
this.partialPathPool = new PartialPathPool();
this.levels = levels;
this.scanOptions = scanOptions;
+ this.isCountStar =
+ pathPattern.getMeasurement().equals(ONE_LEVEL_PATH_WILDCARD)
+ && aggregationDescriptor.getAggregationType() == TAggregationType.COUNT;
}
public void initQueryDataSource(QueryDataSource dataSource) {
+ dataSource.constructOrderIndexes();
this.fileResourceMaterializer = new TsFileResourceMaterializer(dataSource);
}
@@ -166,7 +174,7 @@ public class FileAggregationScanUtil {
PartialPath groupedPath =
partialPathPool.getGroupedPath(devicePath, timeseriesMetadata.getMeasurementId());
- if (pathToAggregatorMap.containsKey(groupedPath)) {
+ if (!pathToAggregatorMap.containsKey(groupedPath)) {
pathToAggregatorMap.put(
groupedPath,
new Aggregator(
@@ -248,7 +256,7 @@ public class FileAggregationScanUtil {
return rawPathToGroupedPathMap.get(rawPathStr);
}
PartialPath groupedPath =
- GroupByLevelController.groupPathByLevel(devicePath, measurementId, levels);
+ GroupByLevelController.groupPathByLevel(devicePath, measurementId, levels, isCountStar);
rawPathToGroupedPathMap.put(rawPathStr, groupedPath);
return groupedPath;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
index be471ba3b5..952541dd1b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
@@ -227,6 +227,10 @@ public class Analysis {
return dataPartition.getDataRegionReplicaSet(deviceName, null);
}
+ public List<TRegionReplicaSet> getPartitionInfo() {
+ return dataPartition.getAllDataRegionReplicaSet();
+ }
+
public Statement getStatement() {
return statement;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index f922ce89dd..5c03ff0763 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -230,6 +230,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
queryStatement.semanticCheck();
if (queryStatement.isGroupByLevel()) {
+ analysis.setStatement(queryStatement);
+
analysis.setLevels(queryStatement.getGroupByLevelComponent().getLevels());
PartialPath pathPrefix = queryStatement.getFromComponent().getPrefixPaths().get(0);
@@ -258,10 +260,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
new DatasetHeader(ColumnHeaderConstant.lastQueryColumnHeaders, true));
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
- sgNameToQueryParamsMap.put(
- "root.iov",
- Collections.singletonList(
- new DataPartitionQueryParam("root.iov.**", Collections.emptyList(), true, true)));
+ sgNameToQueryParamsMap.put("root.iov", Collections.emptyList());
analysis.setDataPartitionInfo(partitionFetcher.getDataPartition(sgNameToQueryParamsMap));
return analysis;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/GroupByLevelController.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/GroupByLevelController.java
index 632a5c776e..da57edac25 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/GroupByLevelController.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/GroupByLevelController.java
@@ -174,7 +174,7 @@ public class GroupByLevelController {
}
public static PartialPath groupPathByLevel(
- PartialPath rawDevicePath, String measurement, int[] levels) {
+ PartialPath rawDevicePath, String measurement, int[] levels, boolean isCountStar) {
String[] nodes = Arrays.copyOf(rawDevicePath.getNodes(), rawDevicePath.getNodes().length + 1);
nodes[nodes.length - 1] = measurement;
@@ -193,7 +193,11 @@ public class GroupByLevelController {
transformedNodes.add(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD);
}
}
- transformedNodes.add(nodes[nodes.length - 1]);
+ if (isCountStar) {
+ transformedNodes.add(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD);
+ } else {
+ transformedNodes.add(nodes[nodes.length - 1]);
+ }
return new PartialPath(transformedNodes.toArray(new String[0]));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index 56dfda1cd7..b7c243661f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -52,6 +52,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.FileAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
@@ -186,6 +187,11 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
return processNoChildSourceNode(node, context);
}
+ @Override
+ public PlanNode visitFileAggregationScan(FileAggregationScanNode node, NodeGroupContext context) {
+ return processNoChildSourceNode(node, context);
+ }
+
private PlanNode processNoChildSourceNode(SourceNode node, NodeGroupContext context) {
context.putNodeDistribution(
node.getPlanNodeId(),
@@ -276,11 +282,7 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone();
List<PlanNode> visitedChildren = new ArrayList<>();
- node.getChildren()
- .forEach(
- child -> {
- visitedChildren.add(visit(child, context));
- });
+ node.getChildren().forEach(child -> visitedChildren.add(visit(child, context)));
TRegionReplicaSet dataRegion;
NodeDistributionType distributionType;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index f61891e59c..c9cfeb1102 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -448,8 +448,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
@Override
public List<PlanNode> visitFileAggregationScan(
FileAggregationScanNode node, DistributionPlanContext context) {
- List<TRegionReplicaSet> dataDistribution =
- analysis.getPartitionInfo(node.getPartitionPath(), node.getPartitionTimeFilter());
+ List<TRegionReplicaSet> dataDistribution = analysis.getPartitionInfo();
if (dataDistribution.size() == 1) {
node.setRegionReplicaSet(dataDistribution.get(0));
return Collections.singletonList(node);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java
index 63158d132a..9c1eeaa20c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -45,6 +46,9 @@ public class FileAggregationScanNode extends SeriesSourceNode {
private final int[] levels;
+ // The replica set of the DataRegion where this node will run
+ private TRegionReplicaSet regionReplicaSet;
+
public FileAggregationScanNode(
PlanNodeId id,
PartialPath pathPattern,
@@ -56,6 +60,16 @@ public class FileAggregationScanNode extends SeriesSourceNode {
this.levels = levels;
}
+ public FileAggregationScanNode(
+ PlanNodeId id,
+ PartialPath pathPattern,
+ AggregationDescriptor aggregationDescriptor,
+ int[] levels,
+ TRegionReplicaSet regionReplicaSet) {
+ this(id, pathPattern, aggregationDescriptor, levels);
+ this.regionReplicaSet = regionReplicaSet;
+ }
+
public PartialPath getPathPattern() {
return pathPattern;
}
@@ -83,11 +97,13 @@ public class FileAggregationScanNode extends SeriesSourceNode {
}
@Override
- public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {}
+ public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
+ this.regionReplicaSet = regionReplicaSet;
+ }
@Override
public TRegionReplicaSet getRegionReplicaSet() {
- return null;
+ return regionReplicaSet;
}
@Override
@@ -114,7 +130,11 @@ public class FileAggregationScanNode extends SeriesSourceNode {
@Override
public PlanNode clone() {
return new FileAggregationScanNode(
- getPlanNodeId(), getPathPattern(), getAggregationDescriptor(), getLevels());
+ getPlanNodeId(),
+ getPathPattern(),
+ getAggregationDescriptor(),
+ getLevels(),
+ getRegionReplicaSet());
}
@Override
@@ -155,4 +175,9 @@ public class FileAggregationScanNode extends SeriesSourceNode {
PlanNodeId planNodeId = PlanNodeId.deserialize(buffer);
return new FileAggregationScanNode(planNodeId, pathPattern, aggregationDescriptor, levels);
}
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitFileAggregationScan(this, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/materializer/TsFileResourceMaterializer.java b/server/src/main/java/org/apache/iotdb/db/query/reader/materializer/TsFileResourceMaterializer.java
index e5e1c0bc77..efdec760ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/materializer/TsFileResourceMaterializer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/materializer/TsFileResourceMaterializer.java
@@ -42,7 +42,7 @@ public class TsFileResourceMaterializer {
public TsFileResource next() {
if (tsFileResourceSource.hasNextSeqResource(curSeqFileIndex, true)) {
- TsFileResource nextFile = tsFileResourceSource.getSeqResourceByIndex(curUnSeqFileIndex);
+ TsFileResource nextFile = tsFileResourceSource.getSeqResourceByIndex(curSeqFileIndex);
curSeqFileIndex++;
return nextFile;
} else if (tsFileResourceSource.hasNextUnseqResource(curUnSeqFileIndex)) {