You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/01/05 13:22:08 UTC
[iotdb] branch master updated: [IOTDB-5364] Refactor Count TimeSeries Group By Level based on SchemaReader (#8759)
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new afc58a0ad3 [IOTDB-5364] Refactor Count TimeSeries Group By Level based on SchemaReader (#8759)
afc58a0ad3 is described below
commit afc58a0ad3362a1124804deaa04086daef6d3965
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Thu Jan 5 21:22:02 2023 +0800
[IOTDB-5364] Refactor Count TimeSeries Group By Level based on SchemaReader (#8759)
[IOTDB-5364] Refactor Count TimeSeries Group By Level based on SchemaReader (#8759)
---
.../iotdb/db/it/schema/IoTDBMetadataFetchIT.java | 29 +----
.../schemaregion/rocksdb/RSchemaRegion.java | 58 ---------
.../metadata/tagSchemaRegion/TagSchemaRegion.java | 18 ---
.../iotdb/db/metadata/mtree/IMTreeBelowSG.java | 11 --
.../db/metadata/mtree/MTreeBelowSGCachedImpl.java | 32 -----
.../db/metadata/mtree/MTreeBelowSGMemoryImpl.java | 32 -----
.../counter/MeasurementGroupByLevelCounter.java | 124 -------------------
.../db/metadata/schemaregion/ISchemaRegion.java | 26 ----
.../schemaregion/SchemaRegionMemoryImpl.java | 28 -----
.../schemaregion/SchemaRegionSchemaFileImpl.java | 28 -----
...or.java => CountGroupByLevelMergeOperator.java} | 80 ++++++------
.../operator/schema/CountMergeOperator.java | 36 ------
.../schema/LevelTimeSeriesCountOperator.java | 103 ++++++++++------
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 7 +-
.../iotdb/db/metadata/mtree/MTreeBelowSGTest.java | 71 -----------
.../schemaRegion/SchemaRegionBasicTest.java | 46 +++----
.../schemaRegion/SchemaRegionTestUtil.java | 30 +++++
...ava => CountGroupByLevelMergeOperatorTest.java} | 135 ++++++++++++++-------
.../operator/schema/SchemaCountOperatorTest.java | 108 +++++++++++++----
19 files changed, 335 insertions(+), 667 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBMetadataFetchIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBMetadataFetchIT.java
index a5fadb78f0..bd410f12b7 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBMetadataFetchIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBMetadataFetchIT.java
@@ -509,31 +509,10 @@ public class IoTDBMetadataFetchIT {
};
Set<String>[] standards =
new Set[] {
- new HashSet<>(
- Arrays.asList(
- "root.ln,0,", "root.ln1,0,", "root.ln2,0,", "root.sg,1,", "root.sg1,1,")),
- new HashSet<>(
- Arrays.asList(
- "root.ln.wf01.wt01,0,",
- "root.ln.wf01.wt02,0,",
- "root.ln1.wf01.wt01,0,",
- "root.ln2.wf01.wt01,0,",
- "root.sg.d.status,1,",
- "root.sg1.d.status,0,")),
- new HashSet<>(
- Arrays.asList(
- "root.ln.wf01,0,",
- "root.ln1.wf01,0,",
- "root.ln2.wf01,0,",
- "root.sg.d,1,",
- "root.sg1.d,1,")),
- new HashSet<>(
- Arrays.asList(
- "root.ln.wf01,0,",
- "root.ln1.wf01,0,",
- "root.ln2.wf01,0,",
- "root.sg.d,0,",
- "root.sg1.d,0,")),
+ new HashSet<>(Arrays.asList("root.sg,1,", "root.sg1,1,")),
+ new HashSet<>(Collections.singletonList("root.sg.d.status,1,")),
+ new HashSet<>(Arrays.asList("root.sg.d,1,", "root.sg1.d,1,")),
+ Collections.emptySet(),
};
for (int n = 0; n < sqls.length; n++) {
String sql = sqls[n];
diff --git a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
index e145067685..e1fb5b1da7 100644
--- a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
+++ b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -826,7 +826,6 @@ public class RSchemaRegion implements ISchemaRegion {
return atomicInteger.get();
}
- @Override
public Map<PartialPath, Long> getMeasurementCountGroupByLevel(
PartialPath pathPattern, int level, boolean isPrefixMatch) throws MetadataException {
Map<PartialPath, Long> result = new ConcurrentHashMap<>();
@@ -852,63 +851,6 @@ public class RSchemaRegion implements ISchemaRegion {
return result;
}
- @Override
- public Map<PartialPath, Long> getMeasurementCountGroupByLevel(
- PartialPath pathPattern,
- int level,
- boolean isPrefixMatch,
- String key,
- String value,
- boolean isContains)
- throws MetadataException {
- Map<PartialPath, Long> result = new ConcurrentHashMap<>();
- Map<MeasurementPath, Pair<Map<String, String>, Map<String, String>>> measurementPathsAndTags =
- getMatchedMeasurementPathWithTags(pathPattern.getNodes());
- BiFunction<byte[], byte[], Boolean> function;
- if (!measurementPathsAndTags.isEmpty()) {
- function =
- (a, b) -> {
- String k = new String(a);
- String partialName = splitToPartialNameByLevel(k, level);
- if (partialName != null) {
- PartialPath path = null;
- try {
- path = new PartialPath(partialName);
- } catch (IllegalPathException e) {
- logger.warn(e.getMessage());
- }
- if (!measurementPathsAndTags.keySet().contains(partialName)) {
- result.put(path, result.get(path));
- } else {
- result.putIfAbsent(path, 0L);
- result.put(path, result.get(path) + 1);
- }
- }
- return true;
- };
- } else {
- function =
- (a, b) -> {
- String k = new String(a);
- String partialName = splitToPartialNameByLevel(k, level);
- if (partialName != null) {
- PartialPath path = null;
- try {
- path = new PartialPath(partialName);
- } catch (IllegalPathException e) {
- logger.warn(e.getMessage());
- }
- result.putIfAbsent(path, 0L);
- }
- return true;
- };
- }
- traverseOutcomeBasins(
- pathPattern.getNodes(), MAX_PATH_DEPTH, function, new Character[] {NODE_TYPE_MEASUREMENT});
-
- return result;
- }
-
private String splitToPartialNameByLevel(String innerName, int level) {
StringBuilder stringBuilder = new StringBuilder(ROOT_STRING);
boolean currentIsFlag;
diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
index 9955502d6f..b95cb02a6b 100644
--- a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
+++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
@@ -335,24 +335,6 @@ public class TagSchemaRegion implements ISchemaRegion {
throw new UnsupportedOperationException("deleteTimeseriesInBlackList");
}
- @Override
- public Map<PartialPath, Long> getMeasurementCountGroupByLevel(
- PartialPath pathPattern, int level, boolean isPrefixMatch) throws MetadataException {
- throw new UnsupportedOperationException("getMeasurementCountGroupByLevel");
- }
-
- @Override
- public Map<PartialPath, Long> getMeasurementCountGroupByLevel(
- PartialPath pathPattern,
- int level,
- boolean isPrefixMatch,
- String key,
- String value,
- boolean isContains)
- throws MetadataException {
- throw new UnsupportedOperationException("getMeasurementCountGroupByLevel");
- }
-
@Override
public List<PartialPath> getNodesListInGivenLevel(
PartialPath pathPattern, int nodeLevel, boolean isPrefixMatch) throws MetadataException {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/IMTreeBelowSG.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/IMTreeBelowSG.java
index 143a1451ac..d68eba0312 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/IMTreeBelowSG.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/IMTreeBelowSG.java
@@ -196,17 +196,6 @@ public interface IMTreeBelowSG {
List<PartialPath> getNodesListInGivenLevel(
PartialPath pathPattern, int nodeLevel, boolean isPrefixMatch) throws MetadataException;
- Map<PartialPath, Long> getMeasurementCountGroupByLevel(
- PartialPath pathPattern, int level, boolean isPrefixMatch) throws MetadataException;
-
- Map<PartialPath, Long> getMeasurementCountGroupByLevel(
- PartialPath pathPattern,
- int level,
- boolean isPrefixMatch,
- List<String> timeseries,
- boolean hasTag)
- throws MetadataException;
-
/**
* Get node by the path
*
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
index 5d4dce01cf..c0ffbc9e69 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
@@ -46,7 +46,6 @@ import org.apache.iotdb.db.metadata.mtree.traverser.collector.EntityCollector;
import org.apache.iotdb.db.metadata.mtree.traverser.collector.MNodeCollector;
import org.apache.iotdb.db.metadata.mtree.traverser.collector.MeasurementCollector;
import org.apache.iotdb.db.metadata.mtree.traverser.counter.CounterTraverser;
-import org.apache.iotdb.db.metadata.mtree.traverser.counter.MeasurementGroupByLevelCounter;
import org.apache.iotdb.db.metadata.plan.schemaregion.read.IShowDevicesPlan;
import org.apache.iotdb.db.metadata.plan.schemaregion.read.IShowTimeSeriesPlan;
import org.apache.iotdb.db.metadata.plan.schemaregion.result.ShowDevicesResult;
@@ -95,7 +94,6 @@ import static org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCAR
* <li>Interfaces for Device info Query
* <li>Interfaces for timeseries, measurement and schema info Query
* <li>Interfaces for Level Node info Query
- * <li>Interfaces and Implementation for metadata count
* </ol>
* <li>Interfaces and Implementation for MNode Query
* <li>Interfaces and Implementation for Template check
@@ -838,36 +836,6 @@ public class MTreeBelowSGCachedImpl implements IMTreeBelowSG {
}
// endregion
- // region Interfaces and Implementation for metadata count
-
- @Override
- public Map<PartialPath, Long> getMeasurementCountGroupByLevel(
- PartialPath pathPattern, int level, boolean isPrefixMatch) throws MetadataException {
- MeasurementGroupByLevelCounter counter =
- new MeasurementGroupByLevelCounter(storageGroupMNode, pathPattern, store, level);
- counter.setPrefixMatch(isPrefixMatch);
- counter.traverse();
- return counter.getResult();
- }
-
- @Override
- public Map<PartialPath, Long> getMeasurementCountGroupByLevel(
- PartialPath pathPattern,
- int level,
- boolean isPrefixMatch,
- List<String> timeseries,
- boolean hasTag)
- throws MetadataException {
- MeasurementGroupByLevelCounter counter =
- new MeasurementGroupByLevelCounter(
- storageGroupMNode, pathPattern, store, level, timeseries, hasTag);
- counter.setPrefixMatch(isPrefixMatch);
- counter.traverse();
- return counter.getResult();
- }
-
- // endregion
-
// endregion
// region Interfaces and Implementation for MNode Query
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index 23d3d94597..fbd357c1d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -46,7 +46,6 @@ import org.apache.iotdb.db.metadata.mtree.traverser.collector.EntityCollector;
import org.apache.iotdb.db.metadata.mtree.traverser.collector.MNodeCollector;
import org.apache.iotdb.db.metadata.mtree.traverser.collector.MeasurementCollector;
import org.apache.iotdb.db.metadata.mtree.traverser.counter.CounterTraverser;
-import org.apache.iotdb.db.metadata.mtree.traverser.counter.MeasurementGroupByLevelCounter;
import org.apache.iotdb.db.metadata.plan.schemaregion.read.IShowDevicesPlan;
import org.apache.iotdb.db.metadata.plan.schemaregion.read.IShowTimeSeriesPlan;
import org.apache.iotdb.db.metadata.plan.schemaregion.result.ShowDevicesResult;
@@ -95,7 +94,6 @@ import static org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCAR
* <li>Interfaces for Device info Query
* <li>Interfaces for timeseries, measurement and schema info Query
* <li>Interfaces for Level Node info Query
- * <li>Interfaces and Implementation for metadata count
* </ol>
* <li>Interfaces and Implementation for MNode Query
* <li>Interfaces and Implementation for Template check
@@ -782,36 +780,6 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
}
// endregion
- // region Interfaces and Implementation for metadata count
-
- @Override
- public Map<PartialPath, Long> getMeasurementCountGroupByLevel(
- PartialPath pathPattern, int level, boolean isPrefixMatch) throws MetadataException {
- MeasurementGroupByLevelCounter counter =
- new MeasurementGroupByLevelCounter(storageGroupMNode, pathPattern, store, level);
- counter.setPrefixMatch(isPrefixMatch);
- counter.traverse();
- return counter.getResult();
- }
-
- @Override
- public Map<PartialPath, Long> getMeasurementCountGroupByLevel(
- PartialPath pathPattern,
- int level,
- boolean isPrefixMatch,
- List<String> timeseries,
- boolean hasTag)
- throws MetadataException {
- MeasurementGroupByLevelCounter counter =
- new MeasurementGroupByLevelCounter(
- storageGroupMNode, pathPattern, store, level, timeseries, hasTag);
- counter.setPrefixMatch(isPrefixMatch);
- counter.traverse();
- return counter.getResult();
- }
-
- // endregion
-
// endregion
// region Interfaces and Implementation for MNode Query
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/traverser/counter/MeasurementGroupByLevelCounter.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/traverser/counter/MeasurementGroupByLevelCounter.java
deleted file mode 100644
index 698905d8d6..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/traverser/counter/MeasurementGroupByLevelCounter.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.metadata.mtree.traverser.counter;
-
-import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.metadata.mnode.IMNode;
-import org.apache.iotdb.db.metadata.mtree.store.IMTreeStore;
-import org.apache.iotdb.db.metadata.mtree.traverser.Traverser;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class MeasurementGroupByLevelCounter extends Traverser {
-
- // level query option
- private int groupByLevel;
-
- private Map<PartialPath, Long> result = new HashMap<>();
- private List<String> timeseries = new ArrayList<>();
- private boolean hasTag = false;
-
- // path representing current branch and matching the pattern and level
- private PartialPath path;
-
- public MeasurementGroupByLevelCounter(
- IMNode startNode, PartialPath path, IMTreeStore store, int groupByLevel)
- throws MetadataException {
- super(startNode, path, store);
- this.groupByLevel = groupByLevel;
- checkLevelAboveSG();
- }
-
- public MeasurementGroupByLevelCounter(
- IMNode startNode,
- PartialPath path,
- IMTreeStore store,
- int groupByLevel,
- List<String> timeseries,
- boolean hasTag)
- throws MetadataException {
- super(startNode, path, store);
- this.groupByLevel = groupByLevel;
- this.timeseries = timeseries;
- this.hasTag = hasTag;
- checkLevelAboveSG();
- }
-
- /**
- * The traverser may start traversing from a storageGroupMNode, which is an InternalMNode of the
- * whole MTree.
- */
- private void checkLevelAboveSG() {
- if (groupByLevel >= startLevel) {
- return;
- }
- IMNode parent = startNode.getParent();
- int level = startLevel;
- while (parent != null) {
- level--;
- if (level == groupByLevel) {
- path = parent.getPartialPath();
- result.putIfAbsent(path, 0L);
- break;
- }
- parent = parent.getParent();
- }
- }
-
- @Override
- protected boolean processInternalMatchedMNode(IMNode node, int idx, int level)
- throws MetadataException {
- if (level == groupByLevel) {
- path = node.getPartialPath();
- result.putIfAbsent(path, 0L);
- }
- return false;
- }
-
- @Override
- protected boolean processFullMatchedMNode(IMNode node, int idx, int level)
- throws MetadataException {
- if (level == groupByLevel) {
- path = node.getPartialPath();
- result.putIfAbsent(path, 0L);
- }
- if (!node.isMeasurement()) {
- return false;
- }
- if (hasTag && !timeseries.contains(node.getFullPath())) {
- return true;
- }
- if (level >= groupByLevel) {
- result.put(path, result.get(path) + 1);
- }
- return true;
- }
-
- public Map<PartialPath, Long> getResult() {
- return result;
- }
-
- public void setResult(Map<PartialPath, Long> result) {
- this.result = result;
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
index e32dbc1390..54df443754 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
@@ -60,7 +60,6 @@ import java.util.Set;
* <li>Interfaces for Timeseries operation
* <li>Interfaces for metadata info Query
* <ol>
- * <li>Interfaces for metadata count
* <li>Interfaces for level Node info Query
* <li>Interfaces for Entity/Device info Query
* <li>Interfaces for timeseries, measurement and schema info Query
@@ -174,31 +173,6 @@ public interface ISchemaRegion {
// region Interfaces for metadata info Query
- // region Interfaces for metadata count
-
- /**
- * The measurements will be grouped by the node in given level and then counted for each group. If
- * no measurements found, but the path is contained in the group, then this path will also be
- * returned with measurements count zero.
- *
- * @param pathPattern
- * @param level the level you want to group by
- * @param isPrefixMatch using pathPattern as prefix matched path if set true
- * @return return a map from PartialPath to the count of matched measurements
- */
- Map<PartialPath, Long> getMeasurementCountGroupByLevel(
- PartialPath pathPattern, int level, boolean isPrefixMatch) throws MetadataException;
-
- Map<PartialPath, Long> getMeasurementCountGroupByLevel(
- PartialPath pathPattern,
- int level,
- boolean isPrefixMatch,
- String key,
- String value,
- boolean isContains)
- throws MetadataException;
- // endregion
-
// region Interfaces for level Node info Query
/**
* Get paths of nodes in given level and matching the pathPattern.
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index df606c0517..dc3530ef79 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -112,7 +112,6 @@ import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARA
* <li>Interfaces for auto create device
* <li>Interfaces for metadata info Query
* <ol>
- * <li>Interfaces for metadata count
* <li>Interfaces for level Node info Query
* <li>Interfaces for Entity/Device info Query
* <li>Interfaces for timeseries, measurement and schema info Query
@@ -881,33 +880,6 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
// region Interfaces for metadata info Query
- // region Interfaces for metadata count
-
- @Override
- public Map<PartialPath, Long> getMeasurementCountGroupByLevel(
- PartialPath pathPattern, int level, boolean isPrefixMatch) throws MetadataException {
- return mtree.getMeasurementCountGroupByLevel(pathPattern, level, isPrefixMatch);
- }
-
- @Override
- public Map<PartialPath, Long> getMeasurementCountGroupByLevel(
- PartialPath pathPattern,
- int level,
- boolean isPrefixMatch,
- String key,
- String value,
- boolean isContains)
- throws MetadataException {
- return mtree.getMeasurementCountGroupByLevel(
- pathPattern,
- level,
- isPrefixMatch,
- tagManager.getMatchedTimeseriesInIndex(key, value, isContains),
- true);
- }
-
- // endregion
-
// region Interfaces for level Node info Query
@Override
public List<PartialPath> getNodesListInGivenLevel(
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index 9ad3184e8b..1300ee88e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -109,7 +109,6 @@ import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARA
* <li>Interfaces for auto create device
* <li>Interfaces for metadata info Query
* <ol>
- * <li>Interfaces for metadata count
* <li>Interfaces for level Node info Query
* <li>Interfaces for Entity/Device info Query
* <li>Interfaces for timeseries, measurement and schema info Query
@@ -940,33 +939,6 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
// region Interfaces for metadata info Query
- // region Interfaces for metadata count
-
- @Override
- public Map<PartialPath, Long> getMeasurementCountGroupByLevel(
- PartialPath pathPattern, int level, boolean isPrefixMatch) throws MetadataException {
- return mtree.getMeasurementCountGroupByLevel(pathPattern, level, isPrefixMatch);
- }
-
- @Override
- public Map<PartialPath, Long> getMeasurementCountGroupByLevel(
- PartialPath pathPattern,
- int level,
- boolean isPrefixMatch,
- String key,
- String value,
- boolean isContains)
- throws MetadataException {
- return mtree.getMeasurementCountGroupByLevel(
- pathPattern,
- level,
- isPrefixMatch,
- tagManager.getMatchedTimeseriesInIndex(key, value, isContains),
- true);
- }
-
- // endregion
-
// region Interfaces for level Node info Query
@Override
public List<PartialPath> getNodesListInGivenLevel(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelMergeOperator.java
similarity index 73%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelMergeOperator.java
index 32f215716e..6fc1ef5fb2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelMergeOperator.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.schema;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
@@ -31,7 +32,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -39,27 +39,29 @@ import java.util.NoSuchElementException;
import static com.google.common.util.concurrent.Futures.successfulAsList;
-public class CountMergeOperator implements ProcessOperator {
+public class CountGroupByLevelMergeOperator implements ProcessOperator {
+
private final PlanNodeId planNodeId;
private final OperatorContext operatorContext;
- private final TsBlock[] childrenTsBlocks;
+ private final List<Operator> children;
- private List<TsBlock> resultTsBlockList;
- private int currentIndex = 0;
+ private final boolean[] childrenHasNext;
- private final List<Operator> children;
+ private final Map<String, Long> countMap = new HashMap<>();
- private final boolean isGroupByLevel;
+ private List<TsBlock> resultTsBlockList;
- public CountMergeOperator(
+ private int currentIndex = 0;
+
+ public CountGroupByLevelMergeOperator(
PlanNodeId planNodeId, OperatorContext operatorContext, List<Operator> children) {
this.planNodeId = planNodeId;
this.operatorContext = operatorContext;
this.children = children;
- isGroupByLevel = children.get(0) instanceof LevelTimeSeriesCountOperator;
- childrenTsBlocks = new TsBlock[children.size()];
+ childrenHasNext = new boolean[children.size()];
+ Arrays.fill(childrenHasNext, true);
}
@Override
@@ -71,7 +73,7 @@ public class CountMergeOperator implements ProcessOperator {
public ListenableFuture<?> isBlocked() {
List<ListenableFuture<?>> listenableFutureList = new ArrayList<>(children.size());
for (int i = 0; i < children.size(); i++) {
- if (childrenTsBlocks[i] == null) {
+ if (childrenHasNext[i]) {
ListenableFuture<?> blocked = children.get(i).isBlocked();
if (!blocked.isDone()) {
listenableFutureList.add(blocked);
@@ -91,22 +93,24 @@ public class CountMergeOperator implements ProcessOperator {
currentIndex++;
return resultTsBlockList.get(currentIndex - 1);
}
- boolean allChildrenReady = true;
+
+ boolean allChildrenConsumed = true;
for (int i = 0; i < children.size(); i++) {
- if (childrenTsBlocks[i] == null) {
- // when this operator is not blocked, it means all children that have not return TsBlock is
+ if (childrenHasNext[i]) {
+ // when this operator is not blocked, it means all children that have remaining TsBlock is
// not blocked.
if (children.get(i).hasNextWithTimer()) {
+ allChildrenConsumed = false;
TsBlock tsBlock = children.get(i).nextWithTimer();
- if (tsBlock == null || tsBlock.isEmpty()) {
- allChildrenReady = false;
- } else {
- childrenTsBlocks[i] = tsBlock;
+ if (tsBlock != null && !tsBlock.isEmpty()) {
+ consumeChildrenTsBlock(tsBlock);
}
+ } else {
+ childrenHasNext[i] = false;
}
}
}
- if (allChildrenReady) {
+ if (allChildrenConsumed) {
generateResultTsBlockList();
currentIndex++;
return resultTsBlockList.get(currentIndex - 1);
@@ -115,36 +119,15 @@ public class CountMergeOperator implements ProcessOperator {
}
}
- private void generateResultTsBlockList() {
- if (isGroupByLevel) {
- generateResultWithGroupByLevel();
- } else {
- generateResultWithoutGroupByLevel();
- }
- }
-
- private void generateResultWithoutGroupByLevel() {
- long totalCount = 0;
- for (TsBlock tsBlock : childrenTsBlocks) {
- long count = tsBlock.getColumn(0).getLong(0);
- totalCount += count;
+ private void consumeChildrenTsBlock(TsBlock tsBlock) {
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ String columnName = tsBlock.getColumn(0).getBinary(i).getStringValue();
+ long count = tsBlock.getColumn(1).getLong(i);
+ countMap.put(columnName, countMap.getOrDefault(columnName, 0L) + count);
}
- TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT64));
- tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
- tsBlockBuilder.getColumnBuilder(0).writeLong(totalCount);
- tsBlockBuilder.declarePosition();
- this.resultTsBlockList = Collections.singletonList(tsBlockBuilder.build());
}
- private void generateResultWithGroupByLevel() {
- Map<String, Long> countMap = new HashMap<>();
- for (TsBlock tsBlock : childrenTsBlocks) {
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- String columnName = tsBlock.getColumn(0).getBinary(i).getStringValue();
- long count = tsBlock.getColumn(1).getLong(i);
- countMap.put(columnName, countMap.getOrDefault(columnName, 0L) + count);
- }
- }
+ private void generateResultTsBlockList() {
this.resultTsBlockList =
SchemaTsBlockUtil.transferSchemaResultToTsBlockList(
countMap.entrySet().iterator(),
@@ -155,6 +138,11 @@ public class CountMergeOperator implements ProcessOperator {
tsBlockBuilder.getColumnBuilder(1).writeLong(entry.getValue());
tsBlockBuilder.declarePosition();
});
+ if (resultTsBlockList.isEmpty()) {
+ TsBlockBuilder tsBlockBuilder =
+ new TsBlockBuilder(Arrays.asList(TSDataType.TEXT, TSDataType.INT64));
+ resultTsBlockList.add(tsBlockBuilder.build());
+ }
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
index 32f215716e..177017f2cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
@@ -25,16 +25,12 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.utils.Binary;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.NoSuchElementException;
import static com.google.common.util.concurrent.Futures.successfulAsList;
@@ -50,14 +46,11 @@ public class CountMergeOperator implements ProcessOperator {
private final List<Operator> children;
- private final boolean isGroupByLevel;
-
public CountMergeOperator(
PlanNodeId planNodeId, OperatorContext operatorContext, List<Operator> children) {
this.planNodeId = planNodeId;
this.operatorContext = operatorContext;
this.children = children;
- isGroupByLevel = children.get(0) instanceof LevelTimeSeriesCountOperator;
childrenTsBlocks = new TsBlock[children.size()];
}
@@ -116,14 +109,6 @@ public class CountMergeOperator implements ProcessOperator {
}
private void generateResultTsBlockList() {
- if (isGroupByLevel) {
- generateResultWithGroupByLevel();
- } else {
- generateResultWithoutGroupByLevel();
- }
- }
-
- private void generateResultWithoutGroupByLevel() {
long totalCount = 0;
for (TsBlock tsBlock : childrenTsBlocks) {
long count = tsBlock.getColumn(0).getLong(0);
@@ -136,27 +121,6 @@ public class CountMergeOperator implements ProcessOperator {
this.resultTsBlockList = Collections.singletonList(tsBlockBuilder.build());
}
- private void generateResultWithGroupByLevel() {
- Map<String, Long> countMap = new HashMap<>();
- for (TsBlock tsBlock : childrenTsBlocks) {
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- String columnName = tsBlock.getColumn(0).getBinary(i).getStringValue();
- long count = tsBlock.getColumn(1).getLong(i);
- countMap.put(columnName, countMap.getOrDefault(columnName, 0L) + count);
- }
- }
- this.resultTsBlockList =
- SchemaTsBlockUtil.transferSchemaResultToTsBlockList(
- countMap.entrySet().iterator(),
- Arrays.asList(TSDataType.TEXT, TSDataType.INT64),
- (entry, tsBlockBuilder) -> {
- tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
- tsBlockBuilder.getColumnBuilder(0).writeBinary(new Binary(entry.getKey()));
- tsBlockBuilder.getColumnBuilder(1).writeLong(entry.getValue());
- tsBlockBuilder.declarePosition();
- });
- }
-
@Override
public boolean hasNext() {
return resultTsBlockList == null || currentIndex < resultTsBlockList.size();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
index ab33978776..c92cffef53 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.db.mpp.execution.operator.schema;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.impl.read.SchemaRegionReadPlanFactory;
+import org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
+import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
@@ -29,8 +32,11 @@ import org.apache.iotdb.db.mpp.execution.operator.source.SourceOperator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.utils.Binary;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -39,19 +45,21 @@ import java.util.stream.Collectors;
import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
public class LevelTimeSeriesCountOperator implements SourceOperator {
+
+ private static final int DEFAULT_BATCH_SIZE = 1000;
+
private final PlanNodeId sourceId;
private final OperatorContext operatorContext;
+ private final List<TSDataType> outputDataTypes;
+
private final PartialPath partialPath;
private final boolean isPrefixPath;
- private final int level;
private final String key;
private final String value;
private final boolean isContains;
+ private final int level;
- private List<TsBlock> tsBlockList;
- private int currentIndex = 0;
-
- private final List<TSDataType> outputDataTypes;
+ private ISchemaReader<ITimeSeriesSchemaInfo> timeSeriesReader;
public LevelTimeSeriesCountOperator(
PlanNodeId sourceId,
@@ -91,51 +99,63 @@ public class LevelTimeSeriesCountOperator implements SourceOperator {
if (!hasNext()) {
throw new NoSuchElementException();
}
- currentIndex++;
- return tsBlockList.get(currentIndex - 1);
+ return generateResult();
}
@Override
public boolean hasNext() {
- if (tsBlockList == null) {
- createTsBlockList();
+ if (timeSeriesReader == null) {
+ timeSeriesReader = createTimeSeriesReader();
}
-
- return currentIndex < tsBlockList.size();
+ return timeSeriesReader.hasNext();
}
- public void createTsBlockList() {
- Map<PartialPath, Long> countMap;
+ public ISchemaReader<ITimeSeriesSchemaInfo> createTimeSeriesReader() {
try {
- if (key != null && value != null) {
- countMap =
- ((SchemaDriverContext) operatorContext.getInstanceContext().getDriverContext())
- .getSchemaRegion()
- .getMeasurementCountGroupByLevel(
- partialPath, level, isPrefixPath, key, value, isContains);
- } else {
- countMap =
- ((SchemaDriverContext) operatorContext.getInstanceContext().getDriverContext())
- .getSchemaRegion()
- .getMeasurementCountGroupByLevel(partialPath, level, isPrefixPath);
- }
-
+ return ((SchemaDriverContext) operatorContext.getInstanceContext().getDriverContext())
+ .getSchemaRegion()
+ .getTimeSeriesReader(
+ SchemaRegionReadPlanFactory.getShowTimeSeriesPlan(
+ partialPath, null, isContains, key, value, 0, 0, isPrefixPath));
} catch (MetadataException e) {
throw new RuntimeException(e.getMessage(), e);
}
+ }
+
+ private TsBlock generateResult() {
+ Map<PartialPath, Long> countMap = new HashMap<>();
+ ITimeSeriesSchemaInfo timeSeriesSchemaInfo;
+ PartialPath path;
+ PartialPath levelPath;
+ while (timeSeriesReader.hasNext()) {
+ timeSeriesSchemaInfo = timeSeriesReader.next();
+ path = timeSeriesSchemaInfo.getPartialPath();
+ if (path.getNodeLength() < level) {
+ continue;
+ }
+ levelPath = new PartialPath(Arrays.copyOf(path.getNodes(), level + 1));
+ countMap.compute(
+ levelPath,
+ (k, v) -> {
+ if (v == null) {
+ return 1L;
+ } else {
+ return v + 1;
+ }
+ });
+ if (countMap.size() == DEFAULT_BATCH_SIZE) {
+ break;
+ }
+ }
- tsBlockList =
- SchemaTsBlockUtil.transferSchemaResultToTsBlockList(
- countMap.entrySet().iterator(),
- outputDataTypes,
- (entry, tsBlockBuilder) -> {
- tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
- tsBlockBuilder
- .getColumnBuilder(0)
- .writeBinary(new Binary(entry.getKey().getFullPath()));
- tsBlockBuilder.getColumnBuilder(1).writeLong(entry.getValue());
- tsBlockBuilder.declarePosition();
- });
+ TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(outputDataTypes);
+ for (Map.Entry<PartialPath, Long> entry : countMap.entrySet()) {
+ tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
+ tsBlockBuilder.getColumnBuilder(0).writeBinary(new Binary(entry.getKey().getFullPath()));
+ tsBlockBuilder.getColumnBuilder(1).writeLong(entry.getValue());
+ tsBlockBuilder.declarePosition();
+ }
+ return tsBlockBuilder.build();
}
@Override
@@ -157,4 +177,11 @@ public class LevelTimeSeriesCountOperator implements SourceOperator {
public long calculateRetainedSizeAfterCallingNext() {
return 0L;
}
+
+ @Override
+ public void close() throws Exception {
+ if (timeSeriesReader != null) {
+ timeSeriesReader.close();
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index b75dfc6cab..e3400d0120 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -96,6 +96,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryOperator
import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQuerySortOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryUtil;
import org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.CountGroupByLevelMergeOperator;
import org.apache.iotdb.db.mpp.execution.operator.schema.CountMergeOperator;
import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesCountOperator;
import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesSchemaScanOperator;
@@ -534,7 +535,11 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
node.getPlanNodeId(),
CountMergeOperator.class.getSimpleName());
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
- return new CountMergeOperator(node.getPlanNodeId(), operatorContext, children);
+ if (children.get(0) instanceof LevelTimeSeriesCountOperator) {
+ return new CountGroupByLevelMergeOperator(node.getPlanNodeId(), operatorContext, children);
+ } else {
+ return new CountMergeOperator(node.getPlanNodeId(), operatorContext, children);
+ }
}
@Override
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGTest.java
index 757265d26c..fd05ee80f3 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGTest.java
@@ -44,7 +44,6 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
@@ -503,74 +502,4 @@ public abstract class MTreeBelowSGTest {
Assert.assertEquals(
2, storageGroup.getNodesListInGivenLevel(new PartialPath("root.*.*.s1"), 2, false).size());
}
-
- @Test
- public void testGetMeasurementCountGroupByLevel() throws Exception {
- storageGroup = getStorageGroup(new PartialPath("root.sg"));
- storageGroup.createTimeseries(
- new PartialPath("root.sg.a1.s1"),
- TSDataType.INT32,
- TSEncoding.PLAIN,
- CompressionType.GZIP,
- null,
- null);
- storageGroup.createTimeseries(
- new PartialPath("root.sg.a1.d1.s1"),
- TSDataType.INT32,
- TSEncoding.PLAIN,
- CompressionType.GZIP,
- null,
- null);
- storageGroup.createTimeseries(
- new PartialPath("root.sg.a1.d1.s2"),
- TSDataType.INT32,
- TSEncoding.PLAIN,
- CompressionType.GZIP,
- null,
- null);
-
- storageGroup.createTimeseries(
- new PartialPath("root.sg.a2.s1"),
- TSDataType.INT32,
- TSEncoding.PLAIN,
- CompressionType.GZIP,
- null,
- null);
- storageGroup.createTimeseries(
- new PartialPath("root.sg.a2.d1.s1"),
- TSDataType.INT32,
- TSEncoding.PLAIN,
- CompressionType.GZIP,
- null,
- null);
-
- PartialPath pattern = new PartialPath("root.sg.**");
- Map<PartialPath, Long> result = storageGroup.getMeasurementCountGroupByLevel(pattern, 2, false);
- assertEquals(2, result.size());
- assertEquals(3, (long) result.get(new PartialPath("root.sg.a1")));
- assertEquals(2, (long) result.get(new PartialPath("root.sg.a2")));
-
- result = storageGroup.getMeasurementCountGroupByLevel(pattern, 3, false);
- assertEquals(4, result.size());
- assertEquals(1, (long) result.get(new PartialPath("root.sg.a1.s1")));
- assertEquals(2, (long) result.get(new PartialPath("root.sg.a1.d1")));
- assertEquals(1, (long) result.get(new PartialPath("root.sg.a2.s1")));
- assertEquals(1, (long) result.get(new PartialPath("root.sg.a2.d1")));
-
- result = storageGroup.getMeasurementCountGroupByLevel(pattern, 5, false);
- assertEquals(0, result.size());
-
- pattern = new PartialPath("root.**.s1");
- result = storageGroup.getMeasurementCountGroupByLevel(pattern, 2, false);
- assertEquals(2, result.size());
- assertEquals(2, (long) result.get(new PartialPath("root.sg.a1")));
- assertEquals(2, (long) result.get(new PartialPath("root.sg.a2")));
-
- result = storageGroup.getMeasurementCountGroupByLevel(pattern, 3, false);
- assertEquals(4, result.size());
- assertEquals(1, (long) result.get(new PartialPath("root.sg.a1.s1")));
- assertEquals(1, (long) result.get(new PartialPath("root.sg.a1.d1")));
- assertEquals(1, (long) result.get(new PartialPath("root.sg.a2.s1")));
- assertEquals(1, (long) result.get(new PartialPath("root.sg.a2.d1")));
- }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionBasicTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionBasicTest.java
index 58f5b0286b..707df2c821 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionBasicTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionBasicTest.java
@@ -52,6 +52,7 @@ import java.util.stream.IntStream;
import static org.apache.iotdb.db.metadata.schemaRegion.SchemaRegionTestUtil.getAllTimeseriesCount;
import static org.apache.iotdb.db.metadata.schemaRegion.SchemaRegionTestUtil.getDevicesNum;
+import static org.apache.iotdb.db.metadata.schemaRegion.SchemaRegionTestUtil.getMeasurementCountGroupByLevel;
/**
* This class define test cases for {@link ISchemaRegion}. All test cases will be run in both Memory
@@ -384,89 +385,82 @@ public class SchemaRegionBasicTest extends AbstractSchemaRegionTest {
expected.put(new PartialPath("root"), (long) 6);
Assert.assertEquals(
expected,
- schemaRegion.getMeasurementCountGroupByLevel(new PartialPath("root.**"), 0, false));
+ getMeasurementCountGroupByLevel(schemaRegion, new PartialPath("root.**"), 0, false));
expected.clear();
expected.put(new PartialPath("root.laptop"), (long) 1);
Assert.assertEquals(
expected,
- schemaRegion.getMeasurementCountGroupByLevel(new PartialPath("root.laptop.*"), 1, false));
+ getMeasurementCountGroupByLevel(schemaRegion, new PartialPath("root.laptop.*"), 1, false));
expected.clear();
expected.put(new PartialPath("root.laptop.d0"), (long) 1);
Assert.assertEquals(
expected,
- schemaRegion.getMeasurementCountGroupByLevel(new PartialPath("root.laptop.d0"), 2, false));
- expected.clear();
-
- expected.put(new PartialPath("root.laptop.d1"), (long) 0);
- Assert.assertEquals(
- expected,
- schemaRegion.getMeasurementCountGroupByLevel(new PartialPath("root.laptop.d1"), 2, false));
+ getMeasurementCountGroupByLevel(schemaRegion, new PartialPath("root.laptop.d0"), 2, false));
expected.clear();
expected.put(new PartialPath("root.laptop.d1"), (long) 2);
Assert.assertEquals(
expected,
- schemaRegion.getMeasurementCountGroupByLevel(
- new PartialPath("root.laptop.d1.*"), 2, false));
+ getMeasurementCountGroupByLevel(
+ schemaRegion, new PartialPath("root.laptop.d1.*"), 2, false));
expected.clear();
expected.put(new PartialPath("root.laptop.d1"), (long) 3);
Assert.assertEquals(
expected,
- schemaRegion.getMeasurementCountGroupByLevel(
- new PartialPath("root.laptop.d1.**"), 2, false));
+ getMeasurementCountGroupByLevel(
+ schemaRegion, new PartialPath("root.laptop.d1.**"), 2, false));
expected.clear();
expected.put(new PartialPath("root.laptop.d2"), (long) 2);
Assert.assertEquals(
expected,
- schemaRegion.getMeasurementCountGroupByLevel(
- new PartialPath("root.laptop.d2.*"), 2, false));
+ getMeasurementCountGroupByLevel(
+ schemaRegion, new PartialPath("root.laptop.d2.*"), 2, false));
expected.clear();
expected.put(new PartialPath("root.laptop"), (long) 2);
Assert.assertEquals(
expected,
- schemaRegion.getMeasurementCountGroupByLevel(
- new PartialPath("root.laptop.*.s1"), 1, false));
+ getMeasurementCountGroupByLevel(
+ schemaRegion, new PartialPath("root.laptop.*.s1"), 1, false));
expected.clear();
- expected.put(new PartialPath("root.laptop.d0"), (long) 0);
expected.put(new PartialPath("root.laptop.d1"), (long) 1);
expected.put(new PartialPath("root.laptop.d2"), (long) 1);
Assert.assertEquals(
expected,
- schemaRegion.getMeasurementCountGroupByLevel(
- new PartialPath("root.laptop.*.s1"), 2, false));
+ getMeasurementCountGroupByLevel(
+ schemaRegion, new PartialPath("root.laptop.*.s1"), 2, false));
expected.clear();
expected.put(new PartialPath("root.laptop"), (long) 1);
Assert.assertEquals(
expected,
- schemaRegion.getMeasurementCountGroupByLevel(
- new PartialPath("root.laptop.*.s2"), 1, false));
+ getMeasurementCountGroupByLevel(
+ schemaRegion, new PartialPath("root.laptop.*.s2"), 1, false));
expected.clear();
- expected.put(new PartialPath("root.laptop.d0"), (long) 0);
expected.put(new PartialPath("root.laptop.d1"), (long) 2);
expected.put(new PartialPath("root.laptop.d2"), (long) 2);
Assert.assertEquals(
expected,
- schemaRegion.getMeasurementCountGroupByLevel(new PartialPath("root.laptop.*.*"), 2, false));
+ getMeasurementCountGroupByLevel(
+ schemaRegion, new PartialPath("root.laptop.*.*"), 2, false));
expected.clear();
// for prefix matched path
expected.put(new PartialPath("root"), (long) 6);
Assert.assertEquals(
- expected, schemaRegion.getMeasurementCountGroupByLevel(new PartialPath("root"), 0, true));
+ expected, getMeasurementCountGroupByLevel(schemaRegion, new PartialPath("root"), 0, true));
expected.clear();
expected.put(new PartialPath("root.laptop.d1"), (long) 3);
Assert.assertEquals(
expected,
- schemaRegion.getMeasurementCountGroupByLevel(new PartialPath("root.laptop.d1"), 2, true));
+ getMeasurementCountGroupByLevel(schemaRegion, new PartialPath("root.laptop.d1"), 2, true));
expected.clear();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTestUtil.java b/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTestUtil.java
index d5fc8e8b58..671f2d53b0 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTestUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTestUtil.java
@@ -32,6 +32,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -179,6 +181,34 @@ public class SchemaRegionTestUtil {
}
}
+ public static Map<PartialPath, Long> getMeasurementCountGroupByLevel(
+ ISchemaRegion schemaRegion, PartialPath pathPattern, int level, boolean isPrefixMatch) {
+ try (ISchemaReader<ITimeSeriesSchemaInfo> timeSeriesReader =
+ schemaRegion.getTimeSeriesReader(
+ SchemaRegionReadPlanFactory.getShowTimeSeriesPlan(
+ pathPattern, null, false, null, null, 0, 0, isPrefixMatch)); ) {
+ Map<PartialPath, Long> countMap = new HashMap<>();
+ while (timeSeriesReader.hasNext()) {
+ ITimeSeriesSchemaInfo timeSeriesSchemaInfo = timeSeriesReader.next();
+ PartialPath path = timeSeriesSchemaInfo.getPartialPath();
+ if (path.getNodeLength() < level) {
+ continue;
+ }
+ countMap.compute(
+ new PartialPath(Arrays.copyOf(path.getNodes(), level + 1)),
+ (k, v) -> {
+ if (v == null) {
+ return 1L;
+ }
+ return v + 1;
+ });
+ }
+ return countMap;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
public static List<String> getPathsUsingTemplate(
ISchemaRegion schemaRegion, PartialPath pathPattern, int templateId)
throws MetadataException {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelMergeOperatorTest.java
similarity index 51%
rename from server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperatorTest.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelMergeOperatorTest.java
index 6049de92ff..43dbece707 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelMergeOperatorTest.java
@@ -19,8 +19,11 @@
package org.apache.iotdb.db.mpp.execution.operator.schema;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.impl.read.SchemaRegionReadPlanFactory;
+import org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
+import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
@@ -36,21 +39,22 @@ import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-public class CountMergeOperatorTest {
- private static final String COUNT_MERGE_OPERATOR_TEST_SG = "root.CountMergeOperatorTest";
+public class CountGroupByLevelMergeOperatorTest {
+ private static final String OPERATOR_TEST_SG = "root.CountGroupByLevelMergeOperatorTest";
@Test
public void testCountMergeOperator() {
@@ -68,21 +72,7 @@ public class CountMergeOperatorTest {
OperatorContext operatorContext =
fragmentInstanceContext.addOperatorContext(
1, planNodeId, LevelTimeSeriesCountOperator.class.getSimpleName());
- ISchemaRegion schemaRegion = Mockito.mock(ISchemaRegion.class);
- Map<PartialPath, Long> sgResult = new HashMap<>();
- for (int i = 0; i < 10; i++) {
- sgResult.put(new PartialPath(COUNT_MERGE_OPERATOR_TEST_SG + ".device" + i), 10L);
- }
- Mockito.when(
- schemaRegion.getMeasurementCountGroupByLevel(
- new PartialPath(COUNT_MERGE_OPERATOR_TEST_SG), 2, true))
- .thenReturn(sgResult);
- Mockito.when(
- schemaRegion.getMeasurementCountGroupByLevel(
- new PartialPath(COUNT_MERGE_OPERATOR_TEST_SG + ".device2"), 2, true))
- .thenReturn(
- Collections.singletonMap(
- new PartialPath(COUNT_MERGE_OPERATOR_TEST_SG + ".device2"), 10L));
+ ISchemaRegion schemaRegion = mockSchemaRegion();
operatorContext
.getInstanceContext()
.setDriverContext(new SchemaDriverContext(fragmentInstanceContext, schemaRegion));
@@ -90,50 +80,113 @@ public class CountMergeOperatorTest {
new LevelTimeSeriesCountOperator(
planNodeId,
fragmentInstanceContext.getOperatorContexts().get(0),
- new PartialPath(COUNT_MERGE_OPERATOR_TEST_SG),
+ new PartialPath(OPERATOR_TEST_SG + ".device2"),
true,
2,
null,
null,
false);
+
LevelTimeSeriesCountOperator timeSeriesCountOperator2 =
new LevelTimeSeriesCountOperator(
planNodeId,
fragmentInstanceContext.getOperatorContexts().get(0),
- new PartialPath(COUNT_MERGE_OPERATOR_TEST_SG + ".device2"),
+ new PartialPath(OPERATOR_TEST_SG),
true,
2,
null,
null,
false);
- CountMergeOperator countMergeOperator =
- new CountMergeOperator(
+
+ CountGroupByLevelMergeOperator mergeOperator =
+ new CountGroupByLevelMergeOperator(
planNodeId,
fragmentInstanceContext.getOperatorContexts().get(0),
Arrays.asList(timeSeriesCountOperator1, timeSeriesCountOperator2));
- TsBlock tsBlock = null;
- Assert.assertTrue(countMergeOperator.isBlocked().isDone());
- while (countMergeOperator.hasNext()) {
- tsBlock = countMergeOperator.next();
- if (tsBlock != null) {
- assertFalse(countMergeOperator.hasNext());
+
+ Assert.assertTrue(mergeOperator.isBlocked().isDone());
+
+ List<TsBlock> tsBlocks = new ArrayList<>();
+ while (mergeOperator.hasNext()) {
+ TsBlock tsBlock = mergeOperator.next();
+ if (tsBlock == null || tsBlock.isEmpty()) {
+ continue;
}
+ tsBlocks.add(tsBlock);
}
- assertNotNull(tsBlock);
- for (int i = 0; i < 10; i++) {
- String path = tsBlock.getColumn(0).getBinary(i).getStringValue();
- assertTrue(path.startsWith(COUNT_MERGE_OPERATOR_TEST_SG + ".device"));
- if (path.equals(COUNT_MERGE_OPERATOR_TEST_SG + ".device2")) {
- assertEquals(20, tsBlock.getColumn(1).getLong(i));
- } else {
- assertEquals(10, tsBlock.getColumn(1).getLong(i));
+ assertFalse(tsBlocks.isEmpty());
+
+ Set<String> pathSet = new HashSet<>(2001);
+ for (TsBlock tsBlock : tsBlocks) {
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ String path = tsBlock.getColumn(0).getBinary(i).getStringValue();
+ pathSet.add(path);
+ assertTrue(path.startsWith(OPERATOR_TEST_SG));
+ if (path.equals(OPERATOR_TEST_SG + ".device2")) {
+ assertEquals(10, tsBlock.getColumn(1).getLong(i));
+ } else {
+ assertEquals(1, tsBlock.getColumn(1).getLong(i));
+ }
}
}
- } catch (MetadataException e) {
+
+ Assert.assertEquals(2001, pathSet.size());
+ } catch (Exception e) {
e.printStackTrace();
fail();
} finally {
instanceNotificationExecutor.shutdown();
}
}
+
+ private ISchemaRegion mockSchemaRegion() throws Exception {
+ ISchemaRegion schemaRegion = Mockito.mock(ISchemaRegion.class);
+ ISchemaReader<ITimeSeriesSchemaInfo> schemaReader =
+ mockSchemaReader(10, OPERATOR_TEST_SG + ".device2");
+ Mockito.when(
+ schemaRegion.getTimeSeriesReader(
+ SchemaRegionReadPlanFactory.getShowTimeSeriesPlan(
+ new PartialPath(OPERATOR_TEST_SG + ".device2"),
+ null,
+ false,
+ null,
+ null,
+ 0,
+ 0,
+ true)))
+ .thenReturn(schemaReader);
+ schemaReader = mockSchemaReader(2000, OPERATOR_TEST_SG);
+ Mockito.when(
+ schemaRegion.getTimeSeriesReader(
+ SchemaRegionReadPlanFactory.getShowTimeSeriesPlan(
+ new PartialPath(OPERATOR_TEST_SG), null, false, null, null, 0, 0, true)))
+ .thenReturn(schemaReader);
+ return schemaRegion;
+ }
+
+ private ISchemaReader<ITimeSeriesSchemaInfo> mockSchemaReader(int expectedNum, String prefix)
+ throws IllegalPathException {
+ List<ITimeSeriesSchemaInfo> timeSeriesSchemaInfoList = new ArrayList<>(expectedNum);
+ for (int i = 0; i < expectedNum; i++) {
+ ITimeSeriesSchemaInfo timeSeriesSchemaInfo = Mockito.mock(ITimeSeriesSchemaInfo.class);
+ Mockito.when(timeSeriesSchemaInfo.getPartialPath())
+ .thenReturn(new PartialPath(prefix + ".d" + i + ".s"));
+ timeSeriesSchemaInfoList.add(timeSeriesSchemaInfo);
+ }
+ Iterator<ITimeSeriesSchemaInfo> iterator = timeSeriesSchemaInfoList.iterator();
+ return new ISchemaReader<ITimeSeriesSchemaInfo>() {
+ @Override
+ public void close() throws Exception {}
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public ITimeSeriesSchemaInfo next() {
+ return iterator.next();
+ }
+ };
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperatorTest.java
index 4f0ceaf0cd..e4d001a3f1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperatorTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.execution.operator.schema;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.metadata.plan.schemaregion.impl.read.SchemaRegionReadPlanFactory;
@@ -41,10 +42,8 @@ import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.ExecutorService;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
@@ -236,18 +235,7 @@ public class SchemaCountOperatorTest {
fragmentInstanceContext.addOperatorContext(
1, planNodeId, LevelTimeSeriesCountOperator.class.getSimpleName());
PartialPath partialPath = new PartialPath(SCHEMA_COUNT_OPERATOR_TEST_SG);
- ISchemaRegion schemaRegion = Mockito.mock(ISchemaRegion.class);
-
- Map<PartialPath, Long> result = new HashMap<>();
- for (int i = 0; i < 10; i++) {
- result.put(new PartialPath(SCHEMA_COUNT_OPERATOR_TEST_SG + ".device" + i), 10L);
- }
-
- Mockito.when(schemaRegion.getMeasurementCountGroupByLevel(partialPath, 2, true))
- .thenReturn(result);
- Mockito.when(schemaRegion.getMeasurementCountGroupByLevel(partialPath, 1, true))
- .thenReturn(
- Collections.singletonMap(new PartialPath(SCHEMA_COUNT_OPERATOR_TEST_SG), 100L));
+ ISchemaRegion schemaRegion = mockSchemaRegion();
operatorContext
.getInstanceContext()
@@ -263,11 +251,9 @@ public class SchemaCountOperatorTest {
null,
false);
TsBlock tsBlock = null;
- while (timeSeriesCountOperator.hasNext()) {
- tsBlock = timeSeriesCountOperator.next();
- assertFalse(timeSeriesCountOperator.hasNext());
- }
- assertNotNull(tsBlock);
+ List<TsBlock> tsBlockList = collectResult(timeSeriesCountOperator);
+ assertEquals(1, tsBlockList.size());
+ tsBlock = tsBlockList.get(0);
for (int i = 0; i < 10; i++) {
String path = tsBlock.getColumn(0).getBinary(i).getStringValue();
@@ -279,22 +265,92 @@ public class SchemaCountOperatorTest {
new LevelTimeSeriesCountOperator(
planNodeId,
fragmentInstanceContext.getOperatorContexts().get(0),
- partialPath,
- true,
+ new PartialPath(SCHEMA_COUNT_OPERATOR_TEST_SG + ".**"),
+ false,
1,
null,
null,
false);
- while (timeSeriesCountOperator2.hasNext()) {
- tsBlock = timeSeriesCountOperator2.next();
- }
- assertNotNull(tsBlock);
+ tsBlockList = collectResult(timeSeriesCountOperator2);
+ assertEquals(1, tsBlockList.size());
+ tsBlock = tsBlockList.get(0);
+
assertEquals(100, tsBlock.getColumn(1).getLong(0));
- } catch (MetadataException e) {
+ } catch (Exception e) {
e.printStackTrace();
fail();
} finally {
instanceNotificationExecutor.shutdown();
}
}
+
+ private List<TsBlock> collectResult(LevelTimeSeriesCountOperator operator) {
+ List<TsBlock> tsBlocks = new ArrayList<>();
+ while (operator.hasNext()) {
+ TsBlock tsBlock = operator.next();
+ if (tsBlock == null || tsBlock.isEmpty()) {
+ continue;
+ }
+ tsBlocks.add(tsBlock);
+ }
+ return tsBlocks;
+ }
+
+ private ISchemaRegion mockSchemaRegion() throws Exception {
+ ISchemaRegion schemaRegion = Mockito.mock(ISchemaRegion.class);
+ ISchemaReader<ITimeSeriesSchemaInfo> schemaReader = mockSchemaReader();
+ Mockito.when(
+ schemaRegion.getTimeSeriesReader(
+ SchemaRegionReadPlanFactory.getShowTimeSeriesPlan(
+ new PartialPath(SCHEMA_COUNT_OPERATOR_TEST_SG),
+ null,
+ false,
+ null,
+ null,
+ 0,
+ 0,
+ true)))
+ .thenReturn(schemaReader);
+ schemaReader = mockSchemaReader();
+ Mockito.when(
+ schemaRegion.getTimeSeriesReader(
+ SchemaRegionReadPlanFactory.getShowTimeSeriesPlan(
+ new PartialPath(SCHEMA_COUNT_OPERATOR_TEST_SG + ".**"),
+ null,
+ false,
+ null,
+ null,
+ 0,
+ 0,
+ false)))
+ .thenReturn(schemaReader);
+ return schemaRegion;
+ }
+
+ private ISchemaReader<ITimeSeriesSchemaInfo> mockSchemaReader() throws IllegalPathException {
+ List<ITimeSeriesSchemaInfo> timeSeriesSchemaInfoList = new ArrayList<>(1000);
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 10; j++) {
+ ITimeSeriesSchemaInfo timeSeriesSchemaInfo = Mockito.mock(ITimeSeriesSchemaInfo.class);
+ Mockito.when(timeSeriesSchemaInfo.getPartialPath())
+ .thenReturn(new PartialPath(SCHEMA_COUNT_OPERATOR_TEST_SG + ".device" + i + ".s" + j));
+ timeSeriesSchemaInfoList.add(timeSeriesSchemaInfo);
+ }
+ }
+ Iterator<ITimeSeriesSchemaInfo> iterator = timeSeriesSchemaInfoList.iterator();
+ return new ISchemaReader<ITimeSeriesSchemaInfo>() {
+ @Override
+ public void close() throws Exception {}
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public ITimeSeriesSchemaInfo next() {
+ return iterator.next();
+ }
+ };
+ }
}