You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/27 13:12:43 UTC
[iotdb] branch master updated: [IOTDB-2803][new cluster][mpp] Adapt count devices / timeseries to mpp (#5654)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 71fc0921db [IOTDB-2803][new cluster][mpp] Adapt count devices / timeseries to mpp (#5654)
71fc0921db is described below
commit 71fc0921dbe6920943d8b874032ace34bb6ef26b
Author: xinzhongtianxia <45...@qq.com>
AuthorDate: Wed Apr 27 21:12:38 2022 +0800
[IOTDB-2803][new cluster][mpp] Adapt count devices / timeseries to mpp (#5654)
---
.../iotdb/db/mpp/common/header/DatasetHeader.java | 6 +
.../iotdb/db/mpp/common/header/HeaderConstant.java | 40 ++++
.../db/mpp/operator/schema/CountMergeOperator.java | 133 +++++++++++++
.../mpp/operator/schema/DevicesCountOperator.java | 90 +++++++++
.../operator/schema/DevicesSchemaScanOperator.java | 20 +-
.../schema/LevelTimeSeriesCountOperator.java | 100 ++++++++++
.../mpp/operator/schema/SchemaMergeOperator.java | 31 ++-
.../operator/schema/TimeSeriesCountOperator.java | 90 +++++++++
.../schema/TimeSeriesSchemaScanOperator.java | 17 +-
.../apache/iotdb/db/mpp/sql/analyze/Analyzer.java | 57 +++++-
.../iotdb/db/mpp/sql/constant/StatementType.java | 4 +-
.../apache/iotdb/db/mpp/sql/parser/ASTVisitor.java | 63 ++++++
.../db/mpp/sql/planner/DistributionPlanner.java | 47 ++++-
.../db/mpp/sql/planner/LocalExecutionPlanner.java | 85 +++++++-
.../iotdb/db/mpp/sql/planner/LogicalPlanner.java | 35 ++++
.../iotdb/db/mpp/sql/planner/QueryPlanBuilder.java | 33 +++-
.../db/mpp/sql/planner/plan/node/PlanNodeType.java | 22 ++-
.../db/mpp/sql/planner/plan/node/PlanVisitor.java | 24 ++-
...MergeNode.java => AbstractSchemaMergeNode.java} | 55 ++----
...emaMergeNode.java => CountSchemaMergeNode.java} | 62 +-----
...{SchemaMergeNode.java => DevicesCountNode.java} | 70 +++----
.../node/metedata/read/DevicesSchemaScanNode.java | 24 +--
...ScanNode.java => LevelTimeSeriesCountNode.java} | 66 +++----
.../plan/node/metedata/read/SchemaFetchNode.java | 15 --
.../plan/node/metedata/read/SchemaScanNode.java | 28 ++-
...maMergeNode.java => SeriesSchemaMergeNode.java} | 50 +----
...hemaFetchNode.java => TimeSeriesCountNode.java} | 73 +++----
.../metedata/read/TimeSeriesSchemaScanNode.java | 16 +-
.../db/mpp/sql/statement/StatementVisitor.java | 15 ++
.../statement/metadata/CountDevicesStatement.java | 34 ++++
.../metadata/CountLevelTimeSeriesStatement.java | 41 ++++
.../statement/metadata/CountNodesStatement.java | 31 +++
.../mpp/sql/statement/metadata/CountStatement.java | 47 +++++
.../metadata/CountStorageGroupStatement.java | 30 +++
.../metadata/CountTimeSeriesStatement.java | 34 ++++
...eratorTest.java => CountMergeOperatorTest.java} | 177 ++++++-----------
...ratorTest.java => SchemaCountOperatorTest.java} | 215 +++++++++------------
.../operator/schema/SchemaScanOperatorTest.java | 3 +-
.../db/mpp/sql/plan/DistributionPlannerTest.java | 4 +-
.../iotdb/db/mpp/sql/plan/LogicalPlannerTest.java | 6 +-
.../read/DeviceSchemaScanNodeSerdeTest.java | 5 +-
...erdeTest.java => SchemaCountNodeSerdeTest.java} | 67 ++++---
.../read/TimeSeriesSchemaScanNodeSerdeTest.java | 5 +-
43 files changed, 1430 insertions(+), 640 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeader.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeader.java
index ab2febd6d8..9d3e519221 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeader.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeader.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.mpp.common.header;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
import com.google.common.primitives.Bytes;
import java.util.ArrayList;
@@ -72,6 +74,10 @@ public class DatasetHeader {
.collect(Collectors.toList());
}
+  /** Returns the data type of every column header, in the order of {@code columnHeaders}. */
+  public List<TSDataType> getRespDataTypes() {
+    List<TSDataType> dataTypes = new ArrayList<>(columnHeaders.size());
+    for (ColumnHeader header : columnHeaders) {
+      dataTypes.add(header.getColumnType());
+    }
+    return dataTypes;
+  }
+
public List<Byte> getRespAliasColumns() {
BitSet aliasMap = new BitSet();
for (int i = 0; i < columnHeaders.size(); ++i) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/HeaderConstant.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/HeaderConstant.java
index 1751cf6a8c..f4bac2051e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/HeaderConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/HeaderConstant.java
@@ -41,12 +41,52 @@ public class HeaderConstant {
public static final String COLUMN_ATTRIBUTES = "attributes";
public static final String COLUMN_IS_ALIGNED = "isAligned";
+ // column names for count statement
+ public static final String COLUMN_COLUMN = "column";
+ public static final String COLUMN_COUNT_DEVICES = "count(devices)";
+ public static final String COLUMN_COUNT_NODES = "count(nodes)";
+ public static final String COLUMN_COUNT_TIMESERIES = "count(timeseries)";
+ public static final String COLUMN_COUNT_STORAGE_GROUP = "count(storage group)";
+
// dataset header for schema statement
public static final DatasetHeader showTimeSeriesHeader;
public static final DatasetHeader showDevicesHeader;
public static final DatasetHeader showDevicesWithSgHeader;
public static final DatasetHeader showStorageGroupHeader;
+ // dataset header for count statement
+ public static final DatasetHeader countStorageGroupHeader;
+ public static final DatasetHeader countNodesHeader;
+ public static final DatasetHeader countDevicesHeader;
+ public static final DatasetHeader countTimeSeriesHeader;
+ public static final DatasetHeader countLevelTimeSeriesHeader;
+
+  static {
+    // Headers for the plain count statements (storage groups, nodes, devices, timeseries):
+    // each consists of exactly one INT32 column whose name says what is being counted, and
+    // each passes true as the second DatasetHeader argument.
+    ColumnHeader sgCount = new ColumnHeader(COLUMN_COUNT_STORAGE_GROUP, TSDataType.INT32);
+    countStorageGroupHeader = new DatasetHeader(Collections.singletonList(sgCount), true);
+
+    ColumnHeader nodeCount = new ColumnHeader(COLUMN_COUNT_NODES, TSDataType.INT32);
+    countNodesHeader = new DatasetHeader(Collections.singletonList(nodeCount), true);
+
+    ColumnHeader deviceCount = new ColumnHeader(COLUMN_COUNT_DEVICES, TSDataType.INT32);
+    countDevicesHeader = new DatasetHeader(Collections.singletonList(deviceCount), true);
+
+    ColumnHeader seriesCount = new ColumnHeader(COLUMN_COUNT_TIMESERIES, TSDataType.INT32);
+    countTimeSeriesHeader = new DatasetHeader(Collections.singletonList(seriesCount), true);
+
+    // Header for the level-wise timeseries count: a TEXT column (COLUMN_COLUMN)
+    // followed by the INT32 count column.
+    countLevelTimeSeriesHeader =
+        new DatasetHeader(
+            Arrays.asList(
+                new ColumnHeader(COLUMN_COLUMN, TSDataType.TEXT),
+                new ColumnHeader(COLUMN_COUNT_TIMESERIES, TSDataType.INT32)),
+            true);
+  }
+
static {
showTimeSeriesHeader =
new DatasetHeader(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/CountMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/CountMergeOperator.java
new file mode 100644
index 0000000000..4107c804ac
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/CountMergeOperator.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.operator.schema;
+
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.operator.process.ProcessOperator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Merges the results of child count operators into a single block. All children are drained in
+ * one {@link #next()} call, after which this operator reports itself as finished.
+ */
+public class CountMergeOperator implements ProcessOperator {
+  private final PlanNodeId planNodeId;
+  private final OperatorContext operatorContext;
+  private final List<Operator> children;
+
+  private boolean isFinished;
+
+  public CountMergeOperator(
+      PlanNodeId planNodeId, OperatorContext operatorContext, List<Operator> children) {
+    this.planNodeId = planNodeId;
+    this.operatorContext = operatorContext;
+    this.children = children;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  /** Returns the future of the first child that is still blocked, or NOT_BLOCKED if none is. */
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    for (Operator child : children) {
+      ListenableFuture<Void> blocked = child.isBlocked();
+      if (!blocked.isDone()) {
+        return blocked;
+      }
+    }
+    return NOT_BLOCKED;
+  }
+
+  /**
+   * Produces the single merged block and marks this operator as finished. The type of the first
+   * child decides whether counts are merged per column name or summed into one total.
+   */
+  @Override
+  public TsBlock next() {
+    isFinished = true;
+    // NOTE(review): assumes at least one child; children.get(0) throws on an empty list.
+    if (children.get(0) instanceof LevelTimeSeriesCountOperator) {
+      return nextWithGroupByLevel();
+    }
+    return nextWithoutGroupByLevel();
+  }
+
+  /** Calls next() once on every child that reports hasNext() and collects the returned blocks. */
+  private List<TsBlock> fetchChildBlocks() {
+    List<TsBlock> tsBlocks = new ArrayList<>(children.size());
+    for (Operator child : children) {
+      if (child.hasNext()) {
+        tsBlocks.add(child.next());
+      }
+    }
+    return tsBlocks;
+  }
+
+  /** Sums the first value of column 0 of every child block into a one-row INT32 block. */
+  private TsBlock nextWithoutGroupByLevel() {
+    int totalCount = 0;
+    for (TsBlock tsBlock : fetchChildBlocks()) {
+      totalCount += tsBlock.getColumn(0).getInt(0);
+    }
+    TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+    tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
+    tsBlockBuilder.getColumnBuilder(0).writeInt(totalCount);
+    tsBlockBuilder.declarePosition();
+    return tsBlockBuilder.build();
+  }
+
+  /**
+   * Sums the counts per column name across all child blocks (column 0 holds the name, column 1
+   * the count) and writes one row per distinct name.
+   */
+  private TsBlock nextWithGroupByLevel() {
+    Map<String, Integer> countMap = new HashMap<>();
+    for (TsBlock tsBlock : fetchChildBlocks()) {
+      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+        String columnName = tsBlock.getColumn(0).getBinary(i).getStringValue();
+        countMap.merge(columnName, tsBlock.getColumn(1).getInt(i), Integer::sum);
+      }
+    }
+    TsBlockBuilder tsBlockBuilder =
+        new TsBlockBuilder(Arrays.asList(TSDataType.TEXT, TSDataType.INT32));
+    countMap.forEach(
+        (column, count) -> {
+          tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
+          tsBlockBuilder.getColumnBuilder(0).writeBinary(new Binary(column));
+          tsBlockBuilder.getColumnBuilder(1).writeInt(count);
+          tsBlockBuilder.declarePosition();
+        });
+    return tsBlockBuilder.build();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return !isFinished;
+  }
+
+  @Override
+  public boolean isFinished() {
+    return isFinished;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/DevicesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/DevicesCountOperator.java
new file mode 100644
index 0000000000..0752708eb9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/DevicesCountOperator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.operator.schema;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.header.HeaderConstant;
+import org.apache.iotdb.db.mpp.execution.SchemaDriverContext;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.operator.source.SourceOperator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+/** Source operator that emits a single block holding the device count for a path. */
+public class DevicesCountOperator implements SourceOperator {
+  private final PlanNodeId sourceId;
+  private final OperatorContext operatorContext;
+  private final PartialPath partialPath;
+  private final boolean isPrefixPath;
+
+  private boolean isFinished;
+
+  public DevicesCountOperator(
+      PlanNodeId sourceId,
+      OperatorContext operatorContext,
+      PartialPath partialPath,
+      boolean isPrefixPath) {
+    this.sourceId = sourceId;
+    this.operatorContext = operatorContext;
+    this.partialPath = partialPath;
+    this.isPrefixPath = isPrefixPath;
+  }
+
+  @Override
+  public PlanNodeId getSourceId() {
+    return sourceId;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  /**
+   * Marks the operator as finished and returns a one-row block whose only column holds the value
+   * of {@code getDevicesNum(partialPath, isPrefixPath)} from the schema region.
+   *
+   * @throws RuntimeException wrapping any MetadataException raised by the schema region
+   */
+  @Override
+  public TsBlock next() {
+    isFinished = true;
+    TsBlockBuilder resultBuilder =
+        new TsBlockBuilder(HeaderConstant.countDevicesHeader.getRespDataTypes());
+    final int deviceCount;
+    try {
+      SchemaDriverContext driverContext =
+          (SchemaDriverContext) operatorContext.getInstanceContext().getDriverContext();
+      deviceCount = driverContext.getSchemaRegion().getDevicesNum(partialPath, isPrefixPath);
+    } catch (MetadataException e) {
+      throw new RuntimeException(e.getMessage(), e);
+    }
+    resultBuilder.getTimeColumnBuilder().writeLong(0L);
+    resultBuilder.getColumnBuilder(0).writeInt(deviceCount);
+    resultBuilder.declarePosition();
+    return resultBuilder.build();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return !isFinished;
+  }
+
+  @Override
+  public boolean isFinished() {
+    return isFinished;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/DevicesSchemaScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/DevicesSchemaScanOperator.java
index c13fcf97d2..1f47a23df2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/DevicesSchemaScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/DevicesSchemaScanOperator.java
@@ -20,29 +20,19 @@ package org.apache.iotdb.db.mpp.operator.schema;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.header.HeaderConstant;
import org.apache.iotdb.db.mpp.execution.SchemaDriverContext;
import org.apache.iotdb.db.mpp.operator.OperatorContext;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.utils.Binary;
-import java.util.Arrays;
-
public class DevicesSchemaScanOperator extends SchemaScanOperator {
private final boolean hasSgCol;
- private static final TSDataType[] RESOURCE_TYPES_WITH_SG = {
- TSDataType.TEXT, TSDataType.TEXT, TSDataType.BOOLEAN,
- };
-
- private static final TSDataType[] RESOURCE_TYPES = {
- TSDataType.TEXT, TSDataType.BOOLEAN,
- };
-
public DevicesSchemaScanOperator(
PlanNodeId sourceId,
OperatorContext operatorContext,
@@ -59,7 +49,9 @@ public class DevicesSchemaScanOperator extends SchemaScanOperator {
protected TsBlock createTsBlock() {
TsBlockBuilder builder =
new TsBlockBuilder(
- hasSgCol ? Arrays.asList(RESOURCE_TYPES_WITH_SG) : Arrays.asList(RESOURCE_TYPES));
+ hasSgCol
+ ? HeaderConstant.showDevicesWithSgHeader.getRespDataTypes()
+ : HeaderConstant.showDevicesHeader.getRespDataTypes());
try {
((SchemaDriverContext) operatorContext.getInstanceContext().getDriverContext())
.getSchemaRegion()
@@ -82,9 +74,9 @@ public class DevicesSchemaScanOperator extends SchemaScanOperator {
builder.getColumnBuilder(0).writeBinary(new Binary(device.getName()));
if (hasSgCol) {
builder.getColumnBuilder(1).writeBinary(new Binary(device.getSgName()));
- builder.getColumnBuilder(2).writeBoolean(device.isAligned());
+ builder.getColumnBuilder(2).writeBinary(new Binary(String.valueOf(device.isAligned())));
} else {
- builder.getColumnBuilder(1).writeBoolean(device.isAligned());
+ builder.getColumnBuilder(1).writeBinary(new Binary(String.valueOf(device.isAligned())));
}
builder.declarePosition();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/LevelTimeSeriesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/LevelTimeSeriesCountOperator.java
new file mode 100644
index 0000000000..87d422c72a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/LevelTimeSeriesCountOperator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.operator.schema;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.header.HeaderConstant;
+import org.apache.iotdb.db.mpp.execution.SchemaDriverContext;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.operator.source.SourceOperator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import java.util.Map;
+
+/** Source operator that emits a single block of per-path timeseries counts for one level. */
+public class LevelTimeSeriesCountOperator implements SourceOperator {
+  private final PlanNodeId sourceId;
+  private final OperatorContext operatorContext;
+  private final PartialPath partialPath;
+  private final boolean isPrefixPath;
+  private final int level;
+
+  private boolean isFinished;
+
+  public LevelTimeSeriesCountOperator(
+      PlanNodeId sourceId,
+      OperatorContext operatorContext,
+      PartialPath partialPath,
+      boolean isPrefixPath,
+      int level) {
+    this.sourceId = sourceId;
+    this.operatorContext = operatorContext;
+    this.partialPath = partialPath;
+    this.isPrefixPath = isPrefixPath;
+    this.level = level;
+  }
+
+  @Override
+  public PlanNodeId getSourceId() {
+    return sourceId;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  /**
+   * Marks the operator as finished and returns one row per entry of the map produced by {@code
+   * getMeasurementCountGroupByLevel}: column 0 is the entry's full path, column 1 its count.
+   *
+   * @throws RuntimeException wrapping any MetadataException raised by the schema region
+   */
+  @Override
+  public TsBlock next() {
+    isFinished = true;
+    TsBlockBuilder resultBuilder =
+        new TsBlockBuilder(HeaderConstant.countLevelTimeSeriesHeader.getRespDataTypes());
+    final Map<PartialPath, Integer> countsByPath;
+    try {
+      SchemaDriverContext ctx =
+          (SchemaDriverContext) operatorContext.getInstanceContext().getDriverContext();
+      countsByPath =
+          ctx.getSchemaRegion().getMeasurementCountGroupByLevel(partialPath, level, isPrefixPath);
+    } catch (MetadataException e) {
+      throw new RuntimeException(e.getMessage(), e);
+    }
+    for (Map.Entry<PartialPath, Integer> entry : countsByPath.entrySet()) {
+      resultBuilder.getTimeColumnBuilder().writeLong(0L);
+      resultBuilder.getColumnBuilder(0).writeBinary(new Binary(entry.getKey().getFullPath()));
+      resultBuilder.getColumnBuilder(1).writeInt(entry.getValue());
+      resultBuilder.declarePosition();
+    }
+    return resultBuilder.build();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return !isFinished;
+  }
+
+  @Override
+  public boolean isFinished() {
+    return isFinished;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaMergeOperator.java
index aebc7c16f6..e28a509762 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaMergeOperator.java
@@ -21,21 +21,23 @@ package org.apache.iotdb.db.mpp.operator.schema;
import org.apache.iotdb.db.mpp.operator.Operator;
import org.apache.iotdb.db.mpp.operator.OperatorContext;
import org.apache.iotdb.db.mpp.operator.process.ProcessOperator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import com.google.common.util.concurrent.ListenableFuture;
+
import java.util.List;
public class SchemaMergeOperator implements ProcessOperator {
-
- protected OperatorContext operatorContext;
- protected int limit;
- protected int offset;
+ private final PlanNodeId planNodeId;
+ private final OperatorContext operatorContext;
private final boolean[] noMoreTsBlocks;
- private boolean isFinished;
- private List<Operator> children;
+ private final List<Operator> children;
- public SchemaMergeOperator(OperatorContext operatorContext, List<Operator> children) {
+ public SchemaMergeOperator(
+ PlanNodeId planNodeId, OperatorContext operatorContext, List<Operator> children) {
+ this.planNodeId = planNodeId;
this.operatorContext = operatorContext;
this.children = children;
noMoreTsBlocks = new boolean[children.size()];
@@ -48,7 +50,7 @@ public class SchemaMergeOperator implements ProcessOperator {
@Override
public TsBlock next() {
- // ToDo consider SHOW LATEST
+ // ToDo @xinzhongtianxia consider SHOW LATEST
for (int i = 0; i < children.size(); i++) {
if (!noMoreTsBlocks[i]) {
@@ -72,6 +74,19 @@ public class SchemaMergeOperator implements ProcessOperator {
return false;
}
+  /**
+   * Returns the future of the first child that is not yet marked in {@code noMoreTsBlocks} and is
+   * still blocked, or {@code NOT_BLOCKED} when no such child exists.
+   */
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    for (int childIndex = 0; childIndex < children.size(); childIndex++) {
+      if (noMoreTsBlocks[childIndex]) {
+        continue;
+      }
+      ListenableFuture<Void> childFuture = children.get(childIndex).isBlocked();
+      if (!childFuture.isDone()) {
+        return childFuture;
+      }
+    }
+    return NOT_BLOCKED;
+  }
+
@Override
public boolean isFinished() {
return !hasNext();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/TimeSeriesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/TimeSeriesCountOperator.java
new file mode 100644
index 0000000000..77cbbfdd44
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/TimeSeriesCountOperator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.operator.schema;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.header.HeaderConstant;
+import org.apache.iotdb.db.mpp.execution.SchemaDriverContext;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.operator.source.SourceOperator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+/** Source operator that emits a single block holding the timeseries count for a path. */
+public class TimeSeriesCountOperator implements SourceOperator {
+  private final PlanNodeId sourceId;
+  private final OperatorContext operatorContext;
+  private final PartialPath partialPath;
+  private final boolean isPrefixPath;
+
+  private boolean isFinished;
+
+  public TimeSeriesCountOperator(
+      PlanNodeId sourceId,
+      OperatorContext operatorContext,
+      PartialPath partialPath,
+      boolean isPrefixPath) {
+    this.sourceId = sourceId;
+    this.operatorContext = operatorContext;
+    this.partialPath = partialPath;
+    this.isPrefixPath = isPrefixPath;
+  }
+
+  @Override
+  public PlanNodeId getSourceId() {
+    return sourceId;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  /**
+   * Marks the operator as finished and returns a one-row block whose only column holds the value
+   * of {@code getAllTimeseriesCount(partialPath, isPrefixPath)} from the schema region.
+   *
+   * @throws RuntimeException wrapping any MetadataException raised by the schema region
+   */
+  @Override
+  public TsBlock next() {
+    isFinished = true;
+    TsBlockBuilder resultBuilder =
+        new TsBlockBuilder(HeaderConstant.countTimeSeriesHeader.getRespDataTypes());
+    final int total;
+    try {
+      SchemaDriverContext driverContext =
+          (SchemaDriverContext) operatorContext.getInstanceContext().getDriverContext();
+      total = driverContext.getSchemaRegion().getAllTimeseriesCount(partialPath, isPrefixPath);
+    } catch (MetadataException e) {
+      throw new RuntimeException(e.getMessage(), e);
+    }
+    resultBuilder.getTimeColumnBuilder().writeLong(0L);
+    resultBuilder.getColumnBuilder(0).writeInt(total);
+    resultBuilder.declarePosition();
+    return resultBuilder.build();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return !isFinished;
+  }
+
+  @Override
+  public boolean isFinished() {
+    return isFinished;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/TimeSeriesSchemaScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/TimeSeriesSchemaScanOperator.java
index 39dcf0c741..bad51e39cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/TimeSeriesSchemaScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/TimeSeriesSchemaScanOperator.java
@@ -20,17 +20,16 @@ package org.apache.iotdb.db.mpp.operator.schema;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.header.HeaderConstant;
import org.apache.iotdb.db.mpp.execution.SchemaDriverContext;
import org.apache.iotdb.db.mpp.operator.OperatorContext;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.utils.Binary;
-import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;
@@ -42,17 +41,6 @@ public class TimeSeriesSchemaScanOperator extends SchemaScanOperator {
// if is true, the result will be sorted according to the inserting frequency of the timeseries
private boolean orderByHeat;
- private static final TSDataType[] resourceTypes = {
- TSDataType.TEXT,
- TSDataType.TEXT,
- TSDataType.TEXT,
- TSDataType.TEXT,
- TSDataType.TEXT,
- TSDataType.TEXT,
- TSDataType.TEXT,
- TSDataType.TEXT
- };
-
public TimeSeriesSchemaScanOperator(
PlanNodeId planNodeId,
OperatorContext operatorContext,
@@ -89,7 +77,8 @@ public class TimeSeriesSchemaScanOperator extends SchemaScanOperator {
@Override
protected TsBlock createTsBlock() {
- TsBlockBuilder builder = new TsBlockBuilder(Arrays.asList(resourceTypes));
+ TsBlockBuilder builder =
+ new TsBlockBuilder(HeaderConstant.showTimeSeriesHeader.getRespDataTypes());
try {
((SchemaDriverContext) operatorContext.getInstanceContext().getDriverContext())
.getSchemaRegion()
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
index 309e0be0b2..a0c55fdcf0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.sql.analyze;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaPartition;
@@ -49,6 +50,9 @@ import org.apache.iotdb.db.mpp.sql.statement.crud.InsertStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.CountDevicesStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.CountLevelTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.CountTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.SchemaFetchStatement;
@@ -519,7 +523,10 @@ public class Analyzer {
SchemaPartition schemaPartitionInfo =
partitionFetcher.getSchemaPartition(
- new PathPatternTree(showDevicesStatement.getPathPattern().concatNode("*")));
+ new PathPatternTree(
+ showDevicesStatement
+ .getPathPattern()
+ .concatNode(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)));
analysis.setSchemaPartitionInfo(schemaPartitionInfo);
analysis.setRespDatasetHeader(
@@ -537,5 +544,53 @@ public class Analyzer {
analysis.setSchemaPartitionInfo(schemaFetchStatement.getSchemaPartition());
return analysis;
}
+
+    /**
+     * Builds the analysis for a count-devices statement: records the statement, looks up the
+     * schema partitions and sets {@code HeaderConstant.countDevicesHeader} as response header.
+     */
+    @Override
+    public Analysis visitCountDevices(
+        CountDevicesStatement countDevicesStatement, MPPQueryContext context) {
+      // The partition lookup uses the statement's path extended by a one-level wildcard.
+      PathPatternTree patternTree =
+          new PathPatternTree(
+              countDevicesStatement
+                  .getPartialPath()
+                  .concatNode(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
+
+      Analysis analysis = new Analysis();
+      analysis.setStatement(countDevicesStatement);
+      analysis.setSchemaPartitionInfo(partitionFetcher.getSchemaPartition(patternTree));
+      analysis.setRespDatasetHeader(HeaderConstant.countDevicesHeader);
+      return analysis;
+    }
+
+    /**
+     * Builds the analysis for a count-timeseries statement: records the statement, looks up the
+     * schema partitions and sets {@code HeaderConstant.countTimeSeriesHeader} as response header.
+     */
+    @Override
+    public Analysis visitCountTimeSeries(
+        CountTimeSeriesStatement countTimeSeriesStatement, MPPQueryContext context) {
+      // The statement's path is used unchanged as the pattern for the partition lookup.
+      PathPatternTree patternTree =
+          new PathPatternTree(countTimeSeriesStatement.getPartialPath());
+
+      Analysis analysis = new Analysis();
+      analysis.setStatement(countTimeSeriesStatement);
+      analysis.setSchemaPartitionInfo(partitionFetcher.getSchemaPartition(patternTree));
+      analysis.setRespDatasetHeader(HeaderConstant.countTimeSeriesHeader);
+      return analysis;
+    }
+
+    /**
+     * Builds the analysis for a level-wise count-timeseries statement: records the statement,
+     * looks up the schema partitions and sets {@code HeaderConstant.countLevelTimeSeriesHeader}
+     * as response header.
+     */
+    @Override
+    public Analysis visitCountLevelTimeSeries(
+        CountLevelTimeSeriesStatement countLevelTimeSeriesStatement, MPPQueryContext context) {
+      // The statement's path is used unchanged as the pattern for the partition lookup.
+      PathPatternTree patternTree =
+          new PathPatternTree(countLevelTimeSeriesStatement.getPartialPath());
+
+      Analysis analysis = new Analysis();
+      analysis.setStatement(countLevelTimeSeriesStatement);
+      analysis.setSchemaPartitionInfo(partitionFetcher.getSchemaPartition(patternTree));
+      analysis.setRespDatasetHeader(HeaderConstant.countLevelTimeSeriesHeader);
+      return analysis;
+    }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/constant/StatementType.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/constant/StatementType.java
index 28ce35957a..6613946424 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/constant/StatementType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/constant/StatementType.java
@@ -133,5 +133,7 @@ public enum StatementType {
SHOW_QUERY_RESOURCE,
FETCH_SCHEMA,
- FETCH_SCHEMA_WITH_AUTO_CREATE
+ FETCH_SCHEMA_WITH_AUTO_CREATE,
+
+ COUNT
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/ASTVisitor.java
index 2f4a4d8f8a..57d1066391 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/ASTVisitor.java
@@ -54,6 +54,11 @@ import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.UDAFQueryStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.UDTFQueryStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.CountDevicesStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.CountLevelTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.CountNodesStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.CountStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.CountTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
@@ -66,6 +71,10 @@ import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
import org.apache.iotdb.db.qp.sql.IoTDBSqlParser;
import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.ConstantContext;
+import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.CountDevicesContext;
+import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.CountNodesContext;
+import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.CountStorageGroupContext;
+import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.CountTimeseriesContext;
import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.ExpressionContext;
import org.apache.iotdb.db.qp.sql.IoTDBSqlParserBaseVisitor;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
@@ -433,6 +442,60 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
return showDevicesStatement;
}
+ // Count Devices ========================================================================
+
+ /**
+ * Converts a parsed count-devices clause into a {@link CountDevicesStatement}. When no prefix
+ * path is given, the path built from {@code SQLConstant.getSingleRootArray()} is used.
+ */
+ @Override
+ public Statement visitCountDevices(CountDevicesContext ctx) {
+ PartialPath path =
+ ctx.prefixPath() != null
+ ? parsePrefixPath(ctx.prefixPath())
+ : new PartialPath(SQLConstant.getSingleRootArray());
+ return new CountDevicesStatement(path);
+ }
+
+ // Count TimeSeries ========================================================================
+ /**
+ * Converts a parsed count-timeseries clause into a statement. When no prefix path is given, the
+ * path built from {@code SQLConstant.getSingleRootArray()} is used. If an integer literal is
+ * present it is parsed as the level and a {@link CountLevelTimeSeriesStatement} is returned;
+ * otherwise a {@link CountTimeSeriesStatement} is returned.
+ */
+ @Override
+ public Statement visitCountTimeseries(CountTimeseriesContext ctx) {
+ PartialPath path =
+ ctx.prefixPath() != null
+ ? parsePrefixPath(ctx.prefixPath())
+ : new PartialPath(SQLConstant.getSingleRootArray());
+ // No level literal: plain count over the path.
+ if (ctx.INTEGER_LITERAL() == null) {
+ return new CountTimeSeriesStatement(path);
+ }
+ int level = Integer.parseInt(ctx.INTEGER_LITERAL().getText());
+ return new CountLevelTimeSeriesStatement(path, level);
+ }
+
+ // Count Nodes ========================================================================
+ /**
+ * Converts a parsed count-nodes clause into a {@link CountNodesStatement}. When no prefix path
+ * is given, the path built from {@code SQLConstant.getSingleRootArray()} is used; the integer
+ * literal of the clause is parsed as the level.
+ */
+ @Override
+ public Statement visitCountNodes(CountNodesContext ctx) {
+ PartialPath path =
+ ctx.prefixPath() != null
+ ? parsePrefixPath(ctx.prefixPath())
+ : new PartialPath(SQLConstant.getSingleRootArray());
+ int level = Integer.parseInt(ctx.INTEGER_LITERAL().getText());
+ return new CountNodesStatement(path, level);
+ }
+
+ // Count StorageGroup ========================================================================
+ /**
+ * Converts a parsed count-storage-group clause into a {@link CountStatement}. When no prefix
+ * path is given, the path built from {@code SQLConstant.getSingleRootArray()} is used.
+ */
+ @Override
+ public Statement visitCountStorageGroup(CountStorageGroupContext ctx) {
+ PartialPath path =
+ ctx.prefixPath() != null
+ ? parsePrefixPath(ctx.prefixPath())
+ : new PartialPath(SQLConstant.getSingleRootArray());
+ return new CountStatement(path);
+ }
+
/** Data Manipulation Language (DML) */
// Select Statement ========================================================================
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 513941954e..3c9329c469 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -33,9 +33,11 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.SimplePlanNodeRewriter;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.AbstractSchemaMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.CountSchemaMergeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaFetchNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SeriesSchemaMergeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
@@ -46,8 +48,10 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
@@ -145,8 +149,8 @@ public class DistributionPlanner {
}
@Override
- public PlanNode visitSchemaMerge(SchemaMergeNode node, DistributionPlanContext context) {
- SchemaMergeNode root = (SchemaMergeNode) node.clone();
+ public PlanNode visitSchemaMerge(SeriesSchemaMergeNode node, DistributionPlanContext context) {
+ SeriesSchemaMergeNode root = (SeriesSchemaMergeNode) node.clone();
SchemaScanNode seed = (SchemaScanNode) node.getChildren().get(0);
TreeSet<TRegionReplicaSet> schemaRegions =
new TreeSet<>(Comparator.comparingInt(region -> region.getRegionId().getId()));
@@ -163,6 +167,7 @@ public class DistributionPlanner {
schemaRegions.forEach(
region -> {
SchemaScanNode schemaScanNode = (SchemaScanNode) seed.clone();
+ schemaScanNode.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
schemaScanNode.setRegionReplicaSet(region);
if (count > 1) {
schemaScanNode.setLimit(schemaScanNode.getOffset() + schemaScanNode.getLimit());
@@ -173,6 +178,30 @@ public class DistributionPlanner {
return root;
}
+ /**
+ * Rewrites a count-merge node for distribution: the merge node is cloned, and its first child
+ * is used as a template that is cloned once per distinct schema region replica set found in
+ * the analysis' schema partition map. Each clone receives a freshly generated plan node id and
+ * its target region before being attached to the cloned merge node.
+ */
+ @Override
+ public PlanNode visitCountMerge(CountSchemaMergeNode node, DistributionPlanContext context) {
+ CountSchemaMergeNode mergeRoot = (CountSchemaMergeNode) node.clone();
+ SchemaScanNode template = (SchemaScanNode) node.getChildren().get(0);
+ // Collect the distinct replica sets across every storage group.
+ Set<TRegionReplicaSet> targetRegions = new HashSet<>();
+ analysis
+ .getSchemaPartitionInfo()
+ .getSchemaPartitionMap()
+ .values()
+ .forEach(deviceGroup -> targetRegions.addAll(deviceGroup.values()));
+ // One scan child per region, each with its own plan node id.
+ for (TRegionReplicaSet replicaSet : targetRegions) {
+ SchemaScanNode regionScan = (SchemaScanNode) template.clone();
+ regionScan.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+ regionScan.setRegionReplicaSet(replicaSet);
+ mergeRoot.addChild(regionScan);
+ }
+ return mergeRoot;
+ }
+
// TODO: (xingtanzjr) a temporary way to resolve the distribution of single SeriesScanNode issue
@Override
public PlanNode visitSeriesScan(SeriesScanNode node, DistributionPlanContext context) {
@@ -300,7 +329,12 @@ public class DistributionPlanner {
}
@Override
- public PlanNode visitSchemaMerge(SchemaMergeNode node, NodeGroupContext context) {
+ public PlanNode visitSchemaMerge(SeriesSchemaMergeNode node, NodeGroupContext context) {
+ return internalVisitSchemaMerge(node, context);
+ }
+
+ private PlanNode internalVisitSchemaMerge(
+ AbstractSchemaMergeNode node, NodeGroupContext context) {
node.getChildren()
.forEach(
child -> {
@@ -327,6 +361,11 @@ public class DistributionPlanner {
return newNode;
}
+ /**
+ * Handles a count-merge node by delegating to {@code internalVisitSchemaMerge}, the same helper
+ * used for series schema merge nodes.
+ */
+ @Override
+ public PlanNode visitCountMerge(CountSchemaMergeNode node, NodeGroupContext context) {
+ return internalVisitSchemaMerge(node, context);
+ }
+
@Override
public PlanNode visitSchemaScan(SchemaScanNode node, NodeGroupContext context) {
NodeDistribution nodeDistribution = new NodeDistribution(NodeDistributionType.NO_CHILD);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index 70cf5d296a..39594be9b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -37,9 +37,13 @@ import org.apache.iotdb.db.mpp.operator.process.LimitOperator;
import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
import org.apache.iotdb.db.mpp.operator.process.merge.ColumnMerger;
import org.apache.iotdb.db.mpp.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.operator.schema.CountMergeOperator;
+import org.apache.iotdb.db.mpp.operator.schema.DevicesCountOperator;
import org.apache.iotdb.db.mpp.operator.schema.DevicesSchemaScanOperator;
+import org.apache.iotdb.db.mpp.operator.schema.LevelTimeSeriesCountOperator;
import org.apache.iotdb.db.mpp.operator.schema.SchemaFetchOperator;
import org.apache.iotdb.db.mpp.operator.schema.SchemaMergeOperator;
+import org.apache.iotdb.db.mpp.operator.schema.TimeSeriesCountOperator;
import org.apache.iotdb.db.mpp.operator.schema.TimeSeriesSchemaScanOperator;
import org.apache.iotdb.db.mpp.operator.source.DataSourceOperator;
import org.apache.iotdb.db.mpp.operator.source.ExchangeOperator;
@@ -47,9 +51,14 @@ import org.apache.iotdb.db.mpp.operator.source.SeriesAggregateScanOperator;
import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.CountSchemaMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesCountNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesSchemaScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.LevelTimeSeriesCountNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaFetchNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SeriesSchemaMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesCountNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
@@ -160,6 +169,22 @@ public class LocalExecutionPlanner {
return seriesScanOperator;
}
+ /**
+ * Dispatches a generic schema scan node to the visit method of its concrete subtype. Nodes
+ * that match none of the known subtypes are passed to {@code visitPlan}.
+ */
+ @Override
+ public Operator visitSchemaScan(SchemaScanNode node, LocalExecutionPlanContext context) {
+ if (node instanceof TimeSeriesSchemaScanNode) {
+ return visitTimeSeriesSchemaScan((TimeSeriesSchemaScanNode) node, context);
+ }
+ if (node instanceof DevicesSchemaScanNode) {
+ return visitDevicesSchemaScan((DevicesSchemaScanNode) node, context);
+ }
+ if (node instanceof DevicesCountNode) {
+ return visitDevicesCount((DevicesCountNode) node, context);
+ }
+ if (node instanceof TimeSeriesCountNode) {
+ return visitTimeSeriesCount((TimeSeriesCountNode) node, context);
+ }
+ if (node instanceof LevelTimeSeriesCountNode) {
+ return visitLevelTimeSeriesCount((LevelTimeSeriesCountNode) node, context);
+ }
+ // Unknown subtype: fall back to the generic handler.
+ return visitPlan(node, context);
+ }
+
@Override
public Operator visitTimeSeriesSchemaScan(
TimeSeriesSchemaScanNode node, LocalExecutionPlanContext context) {
@@ -200,7 +225,8 @@ public class LocalExecutionPlanner {
}
@Override
- public Operator visitSchemaMerge(SchemaMergeNode node, LocalExecutionPlanContext context) {
+ public Operator visitSchemaMerge(
+ SeriesSchemaMergeNode node, LocalExecutionPlanContext context) {
List<Operator> children =
node.getChildren().stream()
.map(n -> n.accept(this, context))
@@ -210,7 +236,60 @@ public class LocalExecutionPlanner {
context.getNextOperatorId(),
node.getPlanNodeId(),
SchemaMergeOperator.class.getSimpleName());
- return new SchemaMergeOperator(operatorContext, children);
+ return new SchemaMergeOperator(node.getPlanNodeId(), operatorContext, children);
+ }
+
+ /**
+ * Builds a {@link CountMergeOperator} over the operators produced by visiting every child of
+ * the given count-merge node.
+ */
+ @Override
+ public Operator visitCountMerge(CountSchemaMergeNode node, LocalExecutionPlanContext context) {
+ List<Operator> children =
+ node.getChildren().stream()
+ .map(n -> n.accept(this, context))
+ .collect(Collectors.toList());
+ // Register the context under the operator's class name, as visitSchemaMerge does, instead of
+ // the plan node's class name.
+ OperatorContext operatorContext =
+ context.instanceContext.addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ CountMergeOperator.class.getSimpleName());
+ return new CountMergeOperator(node.getPlanNodeId(), operatorContext, children);
+ }
+
+ /**
+ * Builds a {@link DevicesCountOperator} from the node's path and prefix-path flag.
+ */
+ @Override
+ public Operator visitDevicesCount(DevicesCountNode node, LocalExecutionPlanContext context) {
+ // The context was previously registered as "CountSchemaMergeNode" (copy-paste slip); use the
+ // class name of the operator that is actually created here.
+ OperatorContext operatorContext =
+ context.instanceContext.addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ DevicesCountOperator.class.getSimpleName());
+ return new DevicesCountOperator(
+ node.getPlanNodeId(), operatorContext, node.getPath(), node.isPrefixPath());
+ }
+
+ /**
+ * Builds a {@link TimeSeriesCountOperator} from the node's path and prefix-path flag.
+ */
+ @Override
+ public Operator visitTimeSeriesCount(
+ TimeSeriesCountNode node, LocalExecutionPlanContext context) {
+ // Register the context under the operator's class name, as visitSchemaMerge does, instead of
+ // the plan node's class name.
+ OperatorContext operatorContext =
+ context.instanceContext.addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ TimeSeriesCountOperator.class.getSimpleName());
+ return new TimeSeriesCountOperator(
+ node.getPlanNodeId(), operatorContext, node.getPath(), node.isPrefixPath());
+ }
+
+ /**
+ * Builds a {@link LevelTimeSeriesCountOperator} from the node's path, prefix-path flag and
+ * level.
+ */
+ @Override
+ public Operator visitLevelTimeSeriesCount(
+ LevelTimeSeriesCountNode node, LocalExecutionPlanContext context) {
+ // Register the context under the operator's class name, as visitSchemaMerge does, instead of
+ // the plan node's class name.
+ OperatorContext operatorContext =
+ context.instanceContext.addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ LevelTimeSeriesCountOperator.class.getSimpleName());
+ return new LevelTimeSeriesCountOperator(
+ node.getPlanNodeId(),
+ operatorContext,
+ node.getPath(),
+ node.isPrefixPath(),
+ node.getLevel());
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
index 9b16bc9ee2..230bfc8d3f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
@@ -47,6 +47,9 @@ import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.UDAFQueryStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.UDTFQueryStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.CountDevicesStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.CountLevelTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.CountTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.SchemaFetchStatement;
@@ -307,6 +310,38 @@ public class LogicalPlanner {
return planBuilder.getRoot();
}
+ /**
+ * Produces the logical plan for a count-devices statement: a devices-count source built from
+ * the statement's path and prefix-path flag, topped by a count-merge node.
+ */
+ @Override
+ public PlanNode visitCountDevices(
+ CountDevicesStatement countDevicesStatement, MPPQueryContext context) {
+ QueryPlanBuilder builder = new QueryPlanBuilder(context);
+ // Source first, then the merge node that becomes the new root.
+ builder.planDevicesCountSource(
+ countDevicesStatement.getPartialPath(), countDevicesStatement.isPrefixPath());
+ builder.planCountMerge();
+ return builder.getRoot();
+ }
+
+ /**
+ * Produces the logical plan for a count-timeseries statement: a timeseries-count source built
+ * from the statement's path and prefix-path flag, topped by a count-merge node.
+ */
+ @Override
+ public PlanNode visitCountTimeSeries(
+ CountTimeSeriesStatement countTimeSeriesStatement, MPPQueryContext context) {
+ QueryPlanBuilder builder = new QueryPlanBuilder(context);
+ // Source first, then the merge node that becomes the new root.
+ builder.planTimeSeriesCountSource(
+ countTimeSeriesStatement.getPartialPath(), countTimeSeriesStatement.isPrefixPath());
+ builder.planCountMerge();
+ return builder.getRoot();
+ }
+
+ /**
+ * Produces the logical plan for a level-based count-timeseries statement: a level-count source
+ * built from the statement's path, prefix-path flag and level, topped by a count-merge node.
+ */
+ @Override
+ public PlanNode visitCountLevelTimeSeries(
+ CountLevelTimeSeriesStatement countLevelTimeSeriesStatement, MPPQueryContext context) {
+ QueryPlanBuilder builder = new QueryPlanBuilder(context);
+ // Source first, then the merge node that becomes the new root.
+ builder.planLevelTimeSeriesCountSource(
+ countLevelTimeSeriesStatement.getPartialPath(),
+ countLevelTimeSeriesStatement.isPrefixPath(),
+ countLevelTimeSeriesStatement.getLevel());
+ builder.planCountMerge();
+ return builder.getRoot();
+ }
+
@Override
public PlanNode visitInsertRows(
InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java
index eae36dc667..db9b24f755 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java
@@ -23,9 +23,13 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.CountSchemaMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesCountNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesSchemaScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.LevelTimeSeriesCountNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaFetchNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SeriesSchemaMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesCountNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
@@ -325,8 +329,8 @@ public class QueryPlanBuilder {
}
public void planSchemaMerge(boolean orderByHeat) {
- SchemaMergeNode schemaMergeNode =
- new SchemaMergeNode(context.getQueryId().genPlanNodeId(), orderByHeat);
+ SeriesSchemaMergeNode schemaMergeNode =
+ new SeriesSchemaMergeNode(context.getQueryId().genPlanNodeId(), orderByHeat);
schemaMergeNode.addChild(this.getRoot());
this.root = schemaMergeNode;
}
@@ -334,4 +338,27 @@ public class QueryPlanBuilder {
public void planSchemaFetchSource(PathPatternTree patternTree) {
this.root = new SchemaFetchNode(context.getQueryId().genPlanNodeId(), patternTree);
}
+
+ /**
+ * Wraps the current root in a new {@link CountSchemaMergeNode}: the existing root becomes the
+ * merge node's child and the merge node becomes the new root.
+ */
+ public void planCountMerge() {
+ CountSchemaMergeNode mergeRoot =
+ new CountSchemaMergeNode(context.getQueryId().genPlanNodeId());
+ mergeRoot.addChild(this.getRoot());
+ this.root = mergeRoot;
+ }
+
+ /**
+ * Sets the plan root to a new {@link DevicesCountNode} with a freshly generated plan node id.
+ *
+ * @param partialPath path passed to the node
+ * @param prefixPath prefix-path flag passed to the node
+ */
+ public void planDevicesCountSource(PartialPath partialPath, boolean prefixPath) {
+ this.root = new DevicesCountNode(context.getQueryId().genPlanNodeId(), partialPath, prefixPath);
+ }
+
+ /**
+ * Sets the plan root to a new {@link TimeSeriesCountNode} with a freshly generated plan node
+ * id.
+ *
+ * @param partialPath path passed to the node
+ * @param prefixPath prefix-path flag passed to the node
+ */
+ public void planTimeSeriesCountSource(PartialPath partialPath, boolean prefixPath) {
+ this.root =
+ new TimeSeriesCountNode(context.getQueryId().genPlanNodeId(), partialPath, prefixPath);
+ }
+
+ /**
+ * Sets the plan root to a new {@link LevelTimeSeriesCountNode} with a freshly generated plan
+ * node id.
+ *
+ * @param partialPath path passed to the node
+ * @param prefixPath prefix-path flag passed to the node
+ * @param level level passed to the node
+ */
+ public void planLevelTimeSeriesCountSource(
+ PartialPath partialPath, boolean prefixPath, int level) {
+ this.root =
+ new LevelTimeSeriesCountNode(
+ context.getQueryId().genPlanNodeId(), partialPath, prefixPath, level);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
index bb02df4cb3..74d4657136 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
@@ -19,9 +19,13 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.CountSchemaMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesCountNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesSchemaScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.LevelTimeSeriesCountNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaFetchNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SeriesSchemaMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesCountNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
@@ -77,7 +81,11 @@ public enum PlanNodeType {
TIME_SERIES_SCHEMA_SCAN((short) 23),
SCHEMA_FETCH((short) 24),
SCHEMA_MERGE((short) 25),
- STORAGE_GROUP_SCHEMA_SCAN((short) 26);
+ STORAGE_GROUP_SCHEMA_SCAN((short) 26),
+ DEVICES_COUNT((short) 27),
+ TIME_SERIES_COUNT((short) 28),
+ LEVEL_TIME_SERIES_COUNT((short) 29),
+ COUNT_MERGE((short) 30);
private final short nodeType;
@@ -156,7 +164,15 @@ public enum PlanNodeType {
case 24:
return SchemaFetchNode.deserialize(buffer);
case 25:
- return SchemaMergeNode.deserialize(buffer);
+ return SeriesSchemaMergeNode.deserialize(buffer);
+ case 27:
+ return DevicesCountNode.deserialize(buffer);
+ case 28:
+ return TimeSeriesCountNode.deserialize(buffer);
+ case 29:
+ return LevelTimeSeriesCountNode.deserialize(buffer);
+ case 30:
+ return CountSchemaMergeNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
index b526c4da97..aedc0c7b8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
@@ -18,10 +18,14 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.CountSchemaMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesCountNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesSchemaScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.LevelTimeSeriesCountNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaFetchNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SeriesSchemaMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesCountNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
@@ -101,7 +105,7 @@ public abstract class PlanVisitor<R, C> {
return visitPlan(node, context);
}
- public R visitSchemaMerge(SchemaMergeNode node, C context) {
+ public R visitSchemaMerge(SeriesSchemaMergeNode node, C context) {
return visitPlan(node, context);
};
@@ -117,6 +121,22 @@ public abstract class PlanVisitor<R, C> {
return visitPlan(node, context);
}
+ /** Visits a {@link DevicesCountNode}; the default implementation delegates to visitPlan. */
+ public R visitDevicesCount(DevicesCountNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ /** Visits a {@link TimeSeriesCountNode}; the default implementation delegates to visitPlan. */
+ public R visitTimeSeriesCount(TimeSeriesCountNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ /**
+ * Visits a {@link LevelTimeSeriesCountNode}; the default implementation delegates to
+ * visitPlan.
+ */
+ public R visitLevelTimeSeriesCount(LevelTimeSeriesCountNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ /** Visits a {@link CountSchemaMergeNode}; the default implementation delegates to visitPlan. */
+ public R visitCountMerge(CountSchemaMergeNode node, C context) {
+ return visitPlan(node, context);
+ }
+
public R visitFragmentSink(FragmentSinkNode node, C context) {
return visitPlan(node, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/AbstractSchemaMergeNode.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/AbstractSchemaMergeNode.java
index 1914825994..733052a202 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/AbstractSchemaMergeNode.java
@@ -16,36 +16,28 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ProcessNode;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
-public class SchemaMergeNode extends ProcessNode {
-
- private boolean orderByHeat;
+public abstract class AbstractSchemaMergeNode extends ProcessNode {
- private List<PlanNode> children;
+ private final List<PlanNode> children;
- public SchemaMergeNode(PlanNodeId id) {
+ public AbstractSchemaMergeNode(PlanNodeId id) {
super(id);
children = new ArrayList<>();
}
- public SchemaMergeNode(PlanNodeId id, boolean orderByHeat) {
- this(id);
- this.orderByHeat = orderByHeat;
- }
-
@Override
public List<PlanNode> getChildren() {
return children;
@@ -53,12 +45,7 @@ public class SchemaMergeNode extends ProcessNode {
@Override
public void addChild(PlanNode child) {
- this.children.add(child);
- }
-
- @Override
- public PlanNode clone() {
- return new SchemaMergeNode(getPlanNodeId(), this.orderByHeat);
+ children.add(child);
}
@Override
@@ -68,31 +55,25 @@ public class SchemaMergeNode extends ProcessNode {
@Override
public List<ColumnHeader> getOutputColumnHeaders() {
- return null;
+ if (children.size() > 0) {
+ return children.get(0).getOutputColumnHeaders();
+ }
+ return Collections.emptyList();
}
@Override
public List<String> getOutputColumnNames() {
- return null;
+ if (children.size() > 0) {
+ return children.get(0).getOutputColumnNames();
+ }
+ return Collections.emptyList();
}
@Override
public List<TSDataType> getOutputColumnTypes() {
- return null;
- }
-
- @Override
- protected void serializeAttributes(ByteBuffer byteBuffer) {
- PlanNodeType.SCHEMA_MERGE.serialize(byteBuffer);
- }
-
- public static SchemaMergeNode deserialize(ByteBuffer byteBuffer) {
- PlanNodeId id = PlanNodeId.deserialize(byteBuffer);
- return new SchemaMergeNode(id);
- }
-
- @Override
- public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
- return visitor.visitSchemaMerge(this, context);
+ if (children.size() > 0) {
+ return children.get(0).getOutputColumnTypes();
+ }
+ return Collections.emptyList();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/CountSchemaMergeNode.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/CountSchemaMergeNode.java
index 1914825994..d5c76a0e35 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/CountSchemaMergeNode.java
@@ -16,83 +16,39 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
-import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ProcessNode;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-public class SchemaMergeNode extends ProcessNode {
-
- private boolean orderByHeat;
- private List<PlanNode> children;
+public class CountSchemaMergeNode extends AbstractSchemaMergeNode {
- public SchemaMergeNode(PlanNodeId id) {
+ public CountSchemaMergeNode(PlanNodeId id) {
super(id);
- children = new ArrayList<>();
- }
-
- public SchemaMergeNode(PlanNodeId id, boolean orderByHeat) {
- this(id);
- this.orderByHeat = orderByHeat;
- }
-
- @Override
- public List<PlanNode> getChildren() {
- return children;
- }
-
- @Override
- public void addChild(PlanNode child) {
- this.children.add(child);
}
@Override
public PlanNode clone() {
- return new SchemaMergeNode(getPlanNodeId(), this.orderByHeat);
- }
-
- @Override
- public int allowedChildCount() {
- return CHILD_COUNT_NO_LIMIT;
- }
-
- @Override
- public List<ColumnHeader> getOutputColumnHeaders() {
- return null;
- }
-
- @Override
- public List<String> getOutputColumnNames() {
- return null;
- }
-
- @Override
- public List<TSDataType> getOutputColumnTypes() {
- return null;
+ return new CountSchemaMergeNode(getPlanNodeId());
}
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
- PlanNodeType.SCHEMA_MERGE.serialize(byteBuffer);
+ PlanNodeType.COUNT_MERGE.serialize(byteBuffer);
}
- public static SchemaMergeNode deserialize(ByteBuffer byteBuffer) {
- PlanNodeId id = PlanNodeId.deserialize(byteBuffer);
- return new SchemaMergeNode(id);
+ public static PlanNode deserialize(ByteBuffer byteBuffer) {
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+ return new CountSchemaMergeNode(planNodeId);
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
- return visitor.visitSchemaMerge(this, context);
+ return visitor.visitCountMerge(this, context);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesCountNode.java
similarity index 53%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesCountNode.java
index 1914825994..50ade9de68 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesCountNode.java
@@ -16,83 +16,65 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.HeaderConstant;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ProcessNode;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.List;
-public class SchemaMergeNode extends ProcessNode {
-
- private boolean orderByHeat;
-
- private List<PlanNode> children;
-
- public SchemaMergeNode(PlanNodeId id) {
- super(id);
- children = new ArrayList<>();
- }
+public class DevicesCountNode extends SchemaScanNode {
- public SchemaMergeNode(PlanNodeId id, boolean orderByHeat) {
- this(id);
- this.orderByHeat = orderByHeat;
- }
-
- @Override
- public List<PlanNode> getChildren() {
- return children;
- }
-
- @Override
- public void addChild(PlanNode child) {
- this.children.add(child);
+ public DevicesCountNode(PlanNodeId id, PartialPath partialPath, boolean isPrefixPath) {
+ super(id, partialPath, isPrefixPath);
}
@Override
public PlanNode clone() {
- return new SchemaMergeNode(getPlanNodeId(), this.orderByHeat);
- }
-
- @Override
- public int allowedChildCount() {
- return CHILD_COUNT_NO_LIMIT;
+ return new DevicesCountNode(getPlanNodeId(), path, isPrefixPath);
}
@Override
public List<ColumnHeader> getOutputColumnHeaders() {
- return null;
+ return HeaderConstant.countDevicesHeader.getColumnHeaders();
}
@Override
public List<String> getOutputColumnNames() {
- return null;
+ return HeaderConstant.countDevicesHeader.getRespColumns();
}
@Override
public List<TSDataType> getOutputColumnTypes() {
- return null;
+ return HeaderConstant.countDevicesHeader.getRespDataTypes();
}
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
- PlanNodeType.SCHEMA_MERGE.serialize(byteBuffer);
- }
-
- public static SchemaMergeNode deserialize(ByteBuffer byteBuffer) {
- PlanNodeId id = PlanNodeId.deserialize(byteBuffer);
- return new SchemaMergeNode(id);
+ PlanNodeType.DEVICES_COUNT.serialize(byteBuffer);
+ ReadWriteIOUtils.write(path.getFullPath(), byteBuffer);
+ ReadWriteIOUtils.write(isPrefixPath, byteBuffer);
}
- @Override
- public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
- return visitor.visitSchemaMerge(this, context);
+ public static PlanNode deserialize(ByteBuffer buffer) {
+ String fullPath = ReadWriteIOUtils.readString(buffer);
+ PartialPath path;
+ try {
+ path = new PartialPath(fullPath);
+ } catch (IllegalPathException e) {
+ throw new IllegalArgumentException("Cannot deserialize DevicesCountNode", e);
+ }
+ boolean isPrefixPath = ReadWriteIOUtils.readBool(buffer);
+ PlanNodeId planNodeId = PlanNodeId.deserialize(buffer);
+ return new DevicesCountNode(planNodeId, path, isPrefixPath);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
index a66513953a..5c93ab6087 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.HeaderConstant;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
@@ -50,14 +51,6 @@ public class DevicesSchemaScanNode extends SchemaScanNode {
return hasSgCol;
}
- @Override
- public List<PlanNode> getChildren() {
- return null;
- }
-
- @Override
- public void addChild(PlanNode child) {}
-
@Override
public PlanNode clone() {
return new DevicesSchemaScanNode(getPlanNodeId(), path, limit, offset, isPrefixPath, hasSgCol);
@@ -65,17 +58,26 @@ public class DevicesSchemaScanNode extends SchemaScanNode {
@Override
public List<ColumnHeader> getOutputColumnHeaders() {
- return null;
+ if (hasSgCol) {
+ return HeaderConstant.showDevicesWithSgHeader.getColumnHeaders();
+ }
+ return HeaderConstant.showDevicesHeader.getColumnHeaders();
}
@Override
public List<String> getOutputColumnNames() {
- return null;
+ if (hasSgCol) {
+ return HeaderConstant.showDevicesWithSgHeader.getRespColumns();
+ }
+ return HeaderConstant.showDevicesHeader.getRespColumns();
}
@Override
public List<TSDataType> getOutputColumnTypes() {
- return null;
+ if (hasSgCol) {
+ return HeaderConstant.showDevicesWithSgHeader.getRespDataTypes();
+ }
+ return HeaderConstant.showDevicesHeader.getRespDataTypes();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/LevelTimeSeriesCountNode.java
similarity index 60%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/LevelTimeSeriesCountNode.java
index a66513953a..f1846959e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/LevelTimeSeriesCountNode.java
@@ -16,11 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.HeaderConstant;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
@@ -31,77 +33,59 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
-public class DevicesSchemaScanNode extends SchemaScanNode {
-
- private final boolean hasSgCol;
+public class LevelTimeSeriesCountNode extends SchemaScanNode {
+ private final int level;
- public DevicesSchemaScanNode(
- PlanNodeId id,
- PartialPath path,
- int limit,
- int offset,
- boolean isPrefixPath,
- boolean hasSgCol) {
- super(id, path, limit, offset, isPrefixPath);
- this.hasSgCol = hasSgCol;
+ public LevelTimeSeriesCountNode(
+ PlanNodeId id, PartialPath partialPath, boolean isPrefixPath, int level) {
+ super(id, partialPath, isPrefixPath);
+ this.level = level;
}
- public boolean isHasSgCol() {
- return hasSgCol;
+ public int getLevel() {
+ return level;
}
- @Override
- public List<PlanNode> getChildren() {
- return null;
- }
-
- @Override
- public void addChild(PlanNode child) {}
-
@Override
public PlanNode clone() {
- return new DevicesSchemaScanNode(getPlanNodeId(), path, limit, offset, isPrefixPath, hasSgCol);
+ return new LevelTimeSeriesCountNode(getPlanNodeId(), path, isPrefixPath, level);
}
@Override
public List<ColumnHeader> getOutputColumnHeaders() {
- return null;
+ return HeaderConstant.countLevelTimeSeriesHeader.getColumnHeaders();
}
@Override
public List<String> getOutputColumnNames() {
- return null;
+ return HeaderConstant.countLevelTimeSeriesHeader.getRespColumns();
}
@Override
public List<TSDataType> getOutputColumnTypes() {
- return null;
+ return HeaderConstant.countLevelTimeSeriesHeader.getRespDataTypes();
}
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
- PlanNodeType.DEVICES_SCHEMA_SCAN.serialize(byteBuffer);
+ PlanNodeType.LEVEL_TIME_SERIES_COUNT.serialize(byteBuffer);
ReadWriteIOUtils.write(path.getFullPath(), byteBuffer);
- ReadWriteIOUtils.write(limit, byteBuffer);
- ReadWriteIOUtils.write(offset, byteBuffer);
ReadWriteIOUtils.write(isPrefixPath, byteBuffer);
- ReadWriteIOUtils.write(hasSgCol, byteBuffer);
+ ReadWriteIOUtils.write(level, byteBuffer);
}
- public static DevicesSchemaScanNode deserialize(ByteBuffer byteBuffer) {
- String fullPath = ReadWriteIOUtils.readString(byteBuffer);
+ public static PlanNode deserialize(ByteBuffer buffer) {
+ String fullPath = ReadWriteIOUtils.readString(buffer);
PartialPath path;
try {
path = new PartialPath(fullPath);
} catch (IllegalPathException e) {
- throw new IllegalArgumentException("Cannot deserialize DevicesSchemaScanNode", e);
+ throw new IllegalArgumentException("Cannot deserialize LevelTimeSeriesCountNode", e);
}
- int limit = ReadWriteIOUtils.readInt(byteBuffer);
- int offset = ReadWriteIOUtils.readInt(byteBuffer);
- boolean isPrefixPath = ReadWriteIOUtils.readBool(byteBuffer);
- boolean hasSgCol = ReadWriteIOUtils.readBool(byteBuffer);
- PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new DevicesSchemaScanNode(planNodeId, path, limit, offset, isPrefixPath, hasSgCol);
+ boolean isPrefixPath = ReadWriteIOUtils.readBool(buffer);
+ int level = ReadWriteIOUtils.readInt(buffer);
+ PlanNodeId planNodeId = PlanNodeId.deserialize(buffer);
+ return new LevelTimeSeriesCountNode(planNodeId, path, isPrefixPath, level);
}
@Override
@@ -115,12 +99,12 @@ public class DevicesSchemaScanNode extends SchemaScanNode {
if (!super.equals(o)) {
return false;
}
- DevicesSchemaScanNode that = (DevicesSchemaScanNode) o;
- return hasSgCol == that.hasSgCol;
+ LevelTimeSeriesCountNode that = (LevelTimeSeriesCountNode) o;
+ return level == that.level;
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), hasSgCol);
+ return Objects.hash(super.hashCode(), level);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java
index 2afbfa4b95..a2bbbae1df 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java
@@ -27,8 +27,6 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import com.google.common.collect.ImmutableList;
-
import java.nio.ByteBuffer;
import java.util.List;
@@ -45,24 +43,11 @@ public class SchemaFetchNode extends SchemaScanNode {
return patternTree;
}
- @Override
- public List<PlanNode> getChildren() {
- return ImmutableList.of();
- }
-
- @Override
- public void addChild(PlanNode child) {}
-
@Override
public PlanNode clone() {
return new SchemaFetchNode(getPlanNodeId(), patternTree);
}
- @Override
- public int allowedChildCount() {
- return 0;
- }
-
@Override
public List<ColumnHeader> getOutputColumnHeaders() {
return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaScanNode.java
index d1605ba8fd..b67d887b35 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaScanNode.java
@@ -20,9 +20,13 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
+import java.util.Collections;
+import java.util.List;
import java.util.Objects;
public abstract class SchemaScanNode extends SourceNode {
@@ -35,12 +39,7 @@ public abstract class SchemaScanNode extends SourceNode {
private TRegionReplicaSet schemaRegionReplicaSet;
protected SchemaScanNode(PlanNodeId id) {
- super(id);
- limit = 0;
- offset = 0;
- path = null;
- hasLimit = false;
- isPrefixPath = false;
+ this(id, null, false);
}
protected SchemaScanNode(
@@ -52,6 +51,10 @@ public abstract class SchemaScanNode extends SourceNode {
this.isPrefixPath = isPrefixPath;
}
+ protected SchemaScanNode(PlanNodeId id, PartialPath partialPath, boolean isPrefixPath) {
+ this(id, partialPath, 0, 0, isPrefixPath);
+ }
+
@Override
public void open() throws Exception {}
@@ -60,6 +63,14 @@ public abstract class SchemaScanNode extends SourceNode {
return NO_CHILD_ALLOWED;
}
+ @Override
+ public List<PlanNode> getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void addChild(PlanNode child) {}
+
@Override
public void close() throws Exception {}
@@ -114,6 +125,11 @@ public abstract class SchemaScanNode extends SourceNode {
this.hasLimit = hasLimit;
}
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitSchemaScan(this, context);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SeriesSchemaMergeNode.java
similarity index 58%
rename from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SeriesSchemaMergeNode.java
index 1914825994..ef382dcb93 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SeriesSchemaMergeNode.java
@@ -18,67 +18,29 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
-import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ProcessNode;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-public class SchemaMergeNode extends ProcessNode {
+public class SeriesSchemaMergeNode extends AbstractSchemaMergeNode {
private boolean orderByHeat;
- private List<PlanNode> children;
-
- public SchemaMergeNode(PlanNodeId id) {
+ public SeriesSchemaMergeNode(PlanNodeId id) {
super(id);
- children = new ArrayList<>();
}
- public SchemaMergeNode(PlanNodeId id, boolean orderByHeat) {
+ public SeriesSchemaMergeNode(PlanNodeId id, boolean orderByHeat) {
this(id);
this.orderByHeat = orderByHeat;
}
- @Override
- public List<PlanNode> getChildren() {
- return children;
- }
-
- @Override
- public void addChild(PlanNode child) {
- this.children.add(child);
- }
-
@Override
public PlanNode clone() {
- return new SchemaMergeNode(getPlanNodeId(), this.orderByHeat);
- }
-
- @Override
- public int allowedChildCount() {
- return CHILD_COUNT_NO_LIMIT;
- }
-
- @Override
- public List<ColumnHeader> getOutputColumnHeaders() {
- return null;
- }
-
- @Override
- public List<String> getOutputColumnNames() {
- return null;
- }
-
- @Override
- public List<TSDataType> getOutputColumnTypes() {
- return null;
+ return new SeriesSchemaMergeNode(getPlanNodeId(), this.orderByHeat);
}
@Override
@@ -86,9 +48,9 @@ public class SchemaMergeNode extends ProcessNode {
PlanNodeType.SCHEMA_MERGE.serialize(byteBuffer);
}
- public static SchemaMergeNode deserialize(ByteBuffer byteBuffer) {
+ public static SeriesSchemaMergeNode deserialize(ByteBuffer byteBuffer) {
PlanNodeId id = PlanNodeId.deserialize(byteBuffer);
- return new SchemaMergeNode(id);
+ return new SeriesSchemaMergeNode(id);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesCountNode.java
similarity index 53%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesCountNode.java
index 2afbfa4b95..e179a91fe6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesCountNode.java
@@ -19,85 +19,62 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.common.header.HeaderConstant;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-
-import com.google.common.collect.ImmutableList;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
import java.util.List;
-public class SchemaFetchNode extends SchemaScanNode {
-
- private final PathPatternTree patternTree;
-
- public SchemaFetchNode(PlanNodeId id, PathPatternTree patternTree) {
- super(id);
- this.patternTree = patternTree;
- }
-
- public PathPatternTree getPatternTree() {
- return patternTree;
- }
+public class TimeSeriesCountNode extends SchemaScanNode {
- @Override
- public List<PlanNode> getChildren() {
- return ImmutableList.of();
+ public TimeSeriesCountNode(PlanNodeId id, PartialPath partialPath, boolean isPrefixPath) {
+ super(id, partialPath, isPrefixPath);
}
- @Override
- public void addChild(PlanNode child) {}
-
@Override
public PlanNode clone() {
- return new SchemaFetchNode(getPlanNodeId(), patternTree);
- }
-
- @Override
- public int allowedChildCount() {
- return 0;
+ return new TimeSeriesCountNode(getPlanNodeId(), path, isPrefixPath);
}
@Override
public List<ColumnHeader> getOutputColumnHeaders() {
- return null;
+ return HeaderConstant.countTimeSeriesHeader.getColumnHeaders();
}
@Override
public List<String> getOutputColumnNames() {
- return null;
+ return HeaderConstant.countTimeSeriesHeader.getRespColumns();
}
@Override
public List<TSDataType> getOutputColumnTypes() {
- return null;
+ return HeaderConstant.countTimeSeriesHeader.getRespDataTypes();
}
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
- PlanNodeType.SCHEMA_FETCH.serialize(byteBuffer);
- patternTree.serialize(byteBuffer);
+ PlanNodeType.TIME_SERIES_COUNT.serialize(byteBuffer);
+ ReadWriteIOUtils.write(path.getFullPath(), byteBuffer);
+ ReadWriteIOUtils.write(isPrefixPath, byteBuffer);
}
- public static SchemaFetchNode deserialize(ByteBuffer byteBuffer) {
- PathPatternTree patternTree = PathPatternTree.deserialize(byteBuffer);
- PlanNodeId id = PlanNodeId.deserialize(byteBuffer);
- return new SchemaFetchNode(id, patternTree);
- }
-
- @Override
- public void open() throws Exception {}
-
- @Override
- public void close() throws Exception {}
-
- @Override
- public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
- return visitor.visitSchemaFetch(this, context);
+ public static PlanNode deserialize(ByteBuffer buffer) {
+ String fullPath = ReadWriteIOUtils.readString(buffer);
+ PartialPath path;
+ try {
+ path = new PartialPath(fullPath);
+ } catch (IllegalPathException e) {
+ throw new IllegalArgumentException("Cannot deserialize TimeSeriesCountNode", e);
+ }
+ boolean isPrefixPath = ReadWriteIOUtils.readBool(buffer);
+ PlanNodeId planNodeId = PlanNodeId.deserialize(buffer);
+ return new TimeSeriesCountNode(planNodeId, path, isPrefixPath);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java
index c789cc64f3..702c412886 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.HeaderConstant;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
@@ -28,7 +29,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
-import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -109,14 +109,6 @@ public class TimeSeriesSchemaScanNode extends SchemaScanNode {
return orderByHeat;
}
- @Override
- public List<PlanNode> getChildren() {
- return Collections.emptyList();
- }
-
- @Override
- public void addChild(PlanNode child) {}
-
@Override
public PlanNode clone() {
return new TimeSeriesSchemaScanNode(
@@ -125,17 +117,17 @@ public class TimeSeriesSchemaScanNode extends SchemaScanNode {
@Override
public List<ColumnHeader> getOutputColumnHeaders() {
- return null;
+ return HeaderConstant.showTimeSeriesHeader.getColumnHeaders();
}
@Override
public List<String> getOutputColumnNames() {
- return null;
+ return HeaderConstant.showTimeSeriesHeader.getRespColumns();
}
@Override
public List<TSDataType> getOutputColumnTypes() {
- return null;
+ return HeaderConstant.showTimeSeriesHeader.getRespDataTypes();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
index 1e38fcbc8b..5ef13bb304 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
@@ -34,6 +34,9 @@ import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.UDAFQueryStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.UDTFQueryStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.CountDevicesStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.CountLevelTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.CountTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.SchemaFetchStatement;
@@ -149,6 +152,18 @@ public abstract class StatementVisitor<R, C> {
return visitStatement(showDevicesStatement, context);
}
+ public R visitCountDevices(CountDevicesStatement countStatement, C context) {
+ return visitStatement(countStatement, context);
+ }
+
+ public R visitCountTimeSeries(CountTimeSeriesStatement countStatement, C context) {
+ return visitStatement(countStatement, context);
+ }
+
+ public R visitCountLevelTimeSeries(CountLevelTimeSeriesStatement countStatement, C context) {
+ return visitStatement(countStatement, context);
+ }
+
public R visitInsertRow(InsertRowStatement insertRowStatement, C context) {
return visitStatement(insertRowStatement, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CountDevicesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CountDevicesStatement.java
new file mode 100644
index 0000000000..4539eed81b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CountDevicesStatement.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.statement.metadata;
+
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
+
+public class CountDevicesStatement extends CountStatement {
+ public CountDevicesStatement(PartialPath partialPath) {
+ super(partialPath);
+ }
+
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitCountDevices(this, context);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CountLevelTimeSeriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CountLevelTimeSeriesStatement.java
new file mode 100644
index 0000000000..6c225264a9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CountLevelTimeSeriesStatement.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.statement.metadata;
+
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
+
+public class CountLevelTimeSeriesStatement extends CountStatement {
+ private final int level;
+
+ public CountLevelTimeSeriesStatement(PartialPath partialPath, int level) {
+ super(partialPath);
+ this.level = level;
+ }
+
+ public int getLevel() {
+ return level;
+ }
+
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitCountLevelTimeSeries(this, context);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CountNodesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CountNodesStatement.java
new file mode 100644
index 0000000000..5af0257c7d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CountNodesStatement.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.statement.metadata;
+
+import org.apache.iotdb.db.metadata.path.PartialPath;
+
+public class CountNodesStatement extends CountStatement {
+ private final int level;
+
+ public CountNodesStatement(PartialPath partialPath, int level) {
+ super(partialPath);
+ this.level = level;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CountStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CountStatement.java
new file mode 100644
index 0000000000..888dbed5f4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CountStatement.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.statement.metadata;
+
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.constant.StatementType;
+
+/**
+ * COUNT statement.
+ *
+ * <p>Here is the syntax definition:
+ *
+ * <p>COUNT {STORAGE GROUP | DEVICES | TIMESERIES | NODES} [prefixPath] [GROUP BY] LEVEL = level
+ *
+ * <p>This class stores the path of the statement and marks the statement type as {@link
+ * StatementType#COUNT}. More specific COUNT statements extend it.
+ */
+public class CountStatement extends ShowStatement {
+ /** Path this statement operates on; readable and replaceable through the accessors below. */
+ protected PartialPath partialPath;
+
+ /**
+ * Stores the given path and sets the statement type to {@link StatementType#COUNT}.
+ *
+ * @param partialPath path to store; kept as-is, no validation is performed here
+ */
+ public CountStatement(PartialPath partialPath) {
+ this.partialPath = partialPath;
+ setType(StatementType.COUNT);
+ }
+
+ /** Returns the currently stored path. */
+ public PartialPath getPartialPath() {
+ return partialPath;
+ }
+
+ /** Replaces the stored path with {@code partialPath}. */
+ public void setPartialPath(PartialPath partialPath) {
+ this.partialPath = partialPath;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CountStorageGroupStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CountStorageGroupStatement.java
new file mode 100644
index 0000000000..26f9fc5111
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CountStorageGroupStatement.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.statement.metadata;
+
+import org.apache.iotdb.db.metadata.path.PartialPath;
+
+/**
+ * Statement class named for the STORAGE GROUP variant of the COUNT syntax described on {@link
+ * CountStatement}.
+ *
+ * <p>The path lives in the {@code partialPath} field inherited from {@link CountStatement}. This
+ * class intentionally declares no field of the same name: such a field would hide the inherited
+ * one, would never be assigned by the constructor, and would therefore always read as {@code
+ * null} from code inside this class.
+ */
+public class CountStorageGroupStatement extends CountStatement {
+
+  /**
+   * @param partialPath path forwarded unchanged to the {@link CountStatement} constructor
+   */
+  public CountStorageGroupStatement(PartialPath partialPath) {
+    super(partialPath);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CountTimeSeriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CountTimeSeriesStatement.java
new file mode 100644
index 0000000000..472b470587
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CountTimeSeriesStatement.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.statement.metadata;
+
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
+
+/**
+ * COUNT statement variant that dispatches to {@link StatementVisitor#visitCountTimeSeries}.
+ *
+ * <p>It adds no state of its own; the path is held by {@link CountStatement}.
+ */
+public class CountTimeSeriesStatement extends CountStatement {
+ /** Forwards {@code partialPath} unchanged to the {@link CountStatement} constructor. */
+ public CountTimeSeriesStatement(PartialPath partialPath) {
+ super(partialPath);
+ }
+
+ /** Passes this statement and {@code context} to the visitor and returns the visitor's result. */
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitCountTimeSeries(this, context);
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/CountMergeOperatorTest.java
similarity index 51%
copy from server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
copy to server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/CountMergeOperatorTest.java
index 6b3874b11d..e377980fa2 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/CountMergeOperatorTest.java
@@ -35,17 +35,10 @@ import org.apache.iotdb.db.mpp.operator.OperatorContext;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
-import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.commons.lang.StringUtils;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -55,24 +48,15 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_ATTRIBUTES;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_DEVICES;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_IS_ALIGNED;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TAGS;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_ALIAS;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_COMPRESSION;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_DATATYPE;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_ENCODING;
import static org.apache.iotdb.db.mpp.execution.FragmentInstanceContext.createFragmentInstanceContext;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-public class SchemaScanOperatorTest {
- private static final String META_SCAN_OPERATOR_TEST_SG = "root.MetaScanOperatorTest";
+public class CountMergeOperatorTest {
+ private static final String COUNT_MERGE_OPERATOR_TEST_SG = "root.CountMergeOperatorTest";
private final List<String> deviceIds = new ArrayList<>();
private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
@@ -82,7 +66,7 @@ public class SchemaScanOperatorTest {
@Before
public void setUp() throws MetadataException, IOException, WriteProcessException {
SeriesReaderTestUtil.setUp(
- measurementSchemas, deviceIds, seqResources, unSeqResources, META_SCAN_OPERATOR_TEST_SG);
+ measurementSchemas, deviceIds, seqResources, unSeqResources, COUNT_MERGE_OPERATOR_TEST_SG);
}
@After
@@ -91,7 +75,7 @@ public class SchemaScanOperatorTest {
}
@Test
- public void testDeviceMetaScanOperator() {
+ public void testTimeSeriesCountOperator() {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
@@ -105,8 +89,8 @@ public class SchemaScanOperatorTest {
PlanNodeId planNodeId = queryId.genPlanNodeId();
OperatorContext operatorContext =
fragmentInstanceContext.addOperatorContext(
- 1, planNodeId, SchemaScanOperator.class.getSimpleName());
- PartialPath partialPath = new PartialPath(META_SCAN_OPERATOR_TEST_SG + ".device0");
+ 1, planNodeId, TimeSeriesCountOperator.class.getSimpleName());
+ PartialPath partialPath = new PartialPath(COUNT_MERGE_OPERATOR_TEST_SG);
ISchemaRegion schemaRegion =
SchemaEngine.getInstance()
.getSchemaRegion(
@@ -114,43 +98,25 @@ public class SchemaScanOperatorTest {
operatorContext
.getInstanceContext()
.setDriverContext(new SchemaDriverContext(fragmentInstanceContext, schemaRegion));
- List<String> columns = Arrays.asList(COLUMN_DEVICES, COLUMN_STORAGE_GROUP, COLUMN_IS_ALIGNED);
- DevicesSchemaScanOperator deviceMetaScanOperator =
- new DevicesSchemaScanOperator(
+ TimeSeriesCountOperator timeSeriesCountOperator =
+ new TimeSeriesCountOperator(
+ planNodeId, fragmentInstanceContext.getOperatorContexts().get(0), partialPath, true);
+ TsBlock tsBlock = null;
+ while (timeSeriesCountOperator.hasNext()) {
+ tsBlock = timeSeriesCountOperator.next();
+ }
+ assertNotNull(tsBlock);
+ assertEquals(100, tsBlock.getColumn(0).getInt(0));
+ TimeSeriesCountOperator timeSeriesCountOperator2 =
+ new TimeSeriesCountOperator(
planNodeId,
fragmentInstanceContext.getOperatorContexts().get(0),
- 10,
- 0,
- partialPath,
- false,
- true);
- while (deviceMetaScanOperator.hasNext()) {
- TsBlock tsBlock = deviceMetaScanOperator.next();
- assertEquals(3, tsBlock.getValueColumnCount());
- assertTrue(tsBlock.getColumn(0) instanceof BinaryColumn);
- assertEquals(1, tsBlock.getPositionCount());
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- Assert.assertEquals(0, tsBlock.getTimeByIndex(i));
- for (int j = 0; j < columns.size(); j++) {
- switch (j) {
- case 0:
- assertEquals(
- tsBlock.getColumn(j).getBinary(i).toString(),
- META_SCAN_OPERATOR_TEST_SG + ".device0");
- break;
- case 1:
- assertEquals(
- tsBlock.getColumn(j).getBinary(i).toString(), META_SCAN_OPERATOR_TEST_SG);
- break;
- case 2:
- assertFalse(tsBlock.getColumn(j).getBoolean(i));
- break;
- default:
- break;
- }
- }
- }
- }
+ new PartialPath(COUNT_MERGE_OPERATOR_TEST_SG + ".device1.*"),
+ false);
+ tsBlock = timeSeriesCountOperator2.next();
+ assertFalse(timeSeriesCountOperator2.hasNext());
+ assertTrue(timeSeriesCountOperator2.isFinished());
+ assertEquals(10, tsBlock.getColumn(0).getInt(0));
} catch (MetadataException e) {
e.printStackTrace();
fail();
@@ -160,7 +126,7 @@ public class SchemaScanOperatorTest {
}
@Test
- public void testTimeSeriesMetaScanOperator() {
+ public void testCountMergeOperator() {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
@@ -174,74 +140,47 @@ public class SchemaScanOperatorTest {
PlanNodeId planNodeId = queryId.genPlanNodeId();
OperatorContext operatorContext =
fragmentInstanceContext.addOperatorContext(
- 1, planNodeId, SchemaScanOperator.class.getSimpleName());
- PartialPath partialPath = new PartialPath(META_SCAN_OPERATOR_TEST_SG + ".device0.*");
+ 1, planNodeId, LevelTimeSeriesCountOperator.class.getSimpleName());
ISchemaRegion schemaRegion =
SchemaEngine.getInstance()
.getSchemaRegion(
- LocalConfigNode.getInstance().getBelongedSchemaRegionId(partialPath));
+ LocalConfigNode.getInstance()
+ .getBelongedSchemaRegionId(new PartialPath(COUNT_MERGE_OPERATOR_TEST_SG)));
operatorContext
.getInstanceContext()
.setDriverContext(new SchemaDriverContext(fragmentInstanceContext, schemaRegion));
- List<String> columns =
- Arrays.asList(
- COLUMN_TIMESERIES,
- COLUMN_TIMESERIES_ALIAS,
- COLUMN_STORAGE_GROUP,
- COLUMN_TIMESERIES_DATATYPE,
- COLUMN_TIMESERIES_ENCODING,
- COLUMN_TIMESERIES_COMPRESSION,
- COLUMN_TAGS,
- COLUMN_ATTRIBUTES);
- TimeSeriesSchemaScanOperator timeSeriesMetaScanOperator =
- new TimeSeriesSchemaScanOperator(
+ LevelTimeSeriesCountOperator timeSeriesCountOperator1 =
+ new LevelTimeSeriesCountOperator(
planNodeId,
fragmentInstanceContext.getOperatorContexts().get(0),
- 10,
- 0,
- partialPath,
- null,
- null,
- false,
- false,
- false);
- while (timeSeriesMetaScanOperator.hasNext()) {
- TsBlock tsBlock = timeSeriesMetaScanOperator.next();
- assertEquals(8, tsBlock.getValueColumnCount());
- assertTrue(tsBlock.getColumn(0) instanceof BinaryColumn);
- assertEquals(10, tsBlock.getPositionCount());
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- Assert.assertEquals(0, tsBlock.getTimeByIndex(i));
- for (int j = 0; j < columns.size(); j++) {
- Binary binary =
- tsBlock.getColumn(j).isNull(i) ? null : tsBlock.getColumn(j).getBinary(i);
- String value = binary == null ? "null" : binary.toString();
- switch (j) {
- case 0:
- Assert.assertTrue(value.startsWith(META_SCAN_OPERATOR_TEST_SG + ".device0"));
- break;
- case 1:
- assertEquals("null", value);
- break;
- case 2:
- assertEquals(META_SCAN_OPERATOR_TEST_SG, value);
- break;
- case 3:
- assertEquals(TSDataType.INT32.toString(), value);
- break;
- case 4:
- assertEquals(TSEncoding.PLAIN.toString(), value);
- break;
- case 5:
- assertEquals(CompressionType.UNCOMPRESSED.toString(), value);
- break;
- case 6:
- case 7:
- assertTrue(StringUtils.isBlank(value));
- default:
- break;
- }
- }
+ new PartialPath(COUNT_MERGE_OPERATOR_TEST_SG),
+ true,
+ 2);
+ LevelTimeSeriesCountOperator timeSeriesCountOperator2 =
+ new LevelTimeSeriesCountOperator(
+ planNodeId,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ new PartialPath(COUNT_MERGE_OPERATOR_TEST_SG + ".device2"),
+ true,
+ 2);
+ CountMergeOperator countMergeOperator =
+ new CountMergeOperator(
+ planNodeId,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ Arrays.asList(timeSeriesCountOperator1, timeSeriesCountOperator2));
+ TsBlock tsBlock = null;
+ while (countMergeOperator.hasNext()) {
+ tsBlock = countMergeOperator.next();
+ assertFalse(countMergeOperator.hasNext());
+ }
+ assertNotNull(tsBlock);
+ for (int i = 0; i < 10; i++) {
+ String path = tsBlock.getColumn(0).getBinary(i).getStringValue();
+ assertTrue(path.startsWith(COUNT_MERGE_OPERATOR_TEST_SG + ".device"));
+ if (path.equals(COUNT_MERGE_OPERATOR_TEST_SG + ".device2")) {
+ assertEquals(20, tsBlock.getColumn(1).getInt(i));
+ } else {
+ assertEquals(10, tsBlock.getColumn(1).getInt(i));
}
}
} catch (MetadataException e) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaCountOperatorTest.java
similarity index 52%
copy from server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
copy to server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaCountOperatorTest.java
index 6b3874b11d..3b5ce82f6d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaCountOperatorTest.java
@@ -35,44 +35,27 @@ import org.apache.iotdb.db.mpp.operator.OperatorContext;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
-import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.commons.lang.StringUtils;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_ATTRIBUTES;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_DEVICES;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_IS_ALIGNED;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TAGS;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_ALIAS;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_COMPRESSION;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_DATATYPE;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_ENCODING;
import static org.apache.iotdb.db.mpp.execution.FragmentInstanceContext.createFragmentInstanceContext;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-public class SchemaScanOperatorTest {
- private static final String META_SCAN_OPERATOR_TEST_SG = "root.MetaScanOperatorTest";
+public class SchemaCountOperatorTest {
+ private static final String SCHEMA_COUNT_OPERATOR_TEST_SG = "root.SchemaCountOperatorTest";
private final List<String> deviceIds = new ArrayList<>();
private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
@@ -82,7 +65,7 @@ public class SchemaScanOperatorTest {
@Before
public void setUp() throws MetadataException, IOException, WriteProcessException {
SeriesReaderTestUtil.setUp(
- measurementSchemas, deviceIds, seqResources, unSeqResources, META_SCAN_OPERATOR_TEST_SG);
+ measurementSchemas, deviceIds, seqResources, unSeqResources, SCHEMA_COUNT_OPERATOR_TEST_SG);
}
@After
@@ -91,7 +74,7 @@ public class SchemaScanOperatorTest {
}
@Test
- public void testDeviceMetaScanOperator() {
+ public void testDeviceCountOperator() {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
@@ -105,8 +88,8 @@ public class SchemaScanOperatorTest {
PlanNodeId planNodeId = queryId.genPlanNodeId();
OperatorContext operatorContext =
fragmentInstanceContext.addOperatorContext(
- 1, planNodeId, SchemaScanOperator.class.getSimpleName());
- PartialPath partialPath = new PartialPath(META_SCAN_OPERATOR_TEST_SG + ".device0");
+ 1, planNodeId, DevicesCountOperator.class.getSimpleName());
+ PartialPath partialPath = new PartialPath(SCHEMA_COUNT_OPERATOR_TEST_SG);
ISchemaRegion schemaRegion =
SchemaEngine.getInstance()
.getSchemaRegion(
@@ -114,43 +97,66 @@ public class SchemaScanOperatorTest {
operatorContext
.getInstanceContext()
.setDriverContext(new SchemaDriverContext(fragmentInstanceContext, schemaRegion));
- List<String> columns = Arrays.asList(COLUMN_DEVICES, COLUMN_STORAGE_GROUP, COLUMN_IS_ALIGNED);
- DevicesSchemaScanOperator deviceMetaScanOperator =
- new DevicesSchemaScanOperator(
+ DevicesCountOperator devicesCountOperator =
+ new DevicesCountOperator(
+ planNodeId, fragmentInstanceContext.getOperatorContexts().get(0), partialPath, true);
+ // Drain the operator and keep the last block it produced.
+ TsBlock tsBlock = null;
+ while (devicesCountOperator.hasNext()) {
+ tsBlock = devicesCountOperator.next();
+ }
+ assertNotNull(tsBlock);
+ // JUnit's assertEquals takes (expected, actual); 10 is the expected count.
+ assertEquals(10, tsBlock.getColumn(0).getInt(0));
+ } catch (MetadataException e) {
+ e.printStackTrace();
+ fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+
+ @Test
+ public void testTimeSeriesCountOperator() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId = queryId.genPlanNodeId();
+ OperatorContext operatorContext =
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId, TimeSeriesCountOperator.class.getSimpleName());
+ PartialPath partialPath = new PartialPath(SCHEMA_COUNT_OPERATOR_TEST_SG);
+ ISchemaRegion schemaRegion =
+ SchemaEngine.getInstance()
+ .getSchemaRegion(
+ LocalConfigNode.getInstance().getBelongedSchemaRegionId(partialPath));
+ operatorContext
+ .getInstanceContext()
+ .setDriverContext(new SchemaDriverContext(fragmentInstanceContext, schemaRegion));
+ TimeSeriesCountOperator timeSeriesCountOperator =
+ new TimeSeriesCountOperator(
+ planNodeId, fragmentInstanceContext.getOperatorContexts().get(0), partialPath, true);
+ TsBlock tsBlock = null;
+ while (timeSeriesCountOperator.hasNext()) {
+ tsBlock = timeSeriesCountOperator.next();
+ }
+ assertNotNull(tsBlock);
+ assertEquals(100, tsBlock.getColumn(0).getInt(0));
+ TimeSeriesCountOperator timeSeriesCountOperator2 =
+ new TimeSeriesCountOperator(
planNodeId,
fragmentInstanceContext.getOperatorContexts().get(0),
- 10,
- 0,
- partialPath,
- false,
- true);
- while (deviceMetaScanOperator.hasNext()) {
- TsBlock tsBlock = deviceMetaScanOperator.next();
- assertEquals(3, tsBlock.getValueColumnCount());
- assertTrue(tsBlock.getColumn(0) instanceof BinaryColumn);
- assertEquals(1, tsBlock.getPositionCount());
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- Assert.assertEquals(0, tsBlock.getTimeByIndex(i));
- for (int j = 0; j < columns.size(); j++) {
- switch (j) {
- case 0:
- assertEquals(
- tsBlock.getColumn(j).getBinary(i).toString(),
- META_SCAN_OPERATOR_TEST_SG + ".device0");
- break;
- case 1:
- assertEquals(
- tsBlock.getColumn(j).getBinary(i).toString(), META_SCAN_OPERATOR_TEST_SG);
- break;
- case 2:
- assertFalse(tsBlock.getColumn(j).getBoolean(i));
- break;
- default:
- break;
- }
- }
- }
- }
+ new PartialPath(SCHEMA_COUNT_OPERATOR_TEST_SG + ".device1.*"),
+ false);
+ tsBlock = timeSeriesCountOperator2.next();
+ assertFalse(timeSeriesCountOperator2.hasNext());
+ assertTrue(timeSeriesCountOperator2.isFinished());
+ assertEquals(10, tsBlock.getColumn(0).getInt(0));
} catch (MetadataException e) {
e.printStackTrace();
fail();
@@ -160,7 +166,7 @@ public class SchemaScanOperatorTest {
}
@Test
- public void testTimeSeriesMetaScanOperator() {
+ public void testLevelTimeSeriesCountOperator() {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
@@ -174,8 +180,8 @@ public class SchemaScanOperatorTest {
PlanNodeId planNodeId = queryId.genPlanNodeId();
OperatorContext operatorContext =
fragmentInstanceContext.addOperatorContext(
- 1, planNodeId, SchemaScanOperator.class.getSimpleName());
- PartialPath partialPath = new PartialPath(META_SCAN_OPERATOR_TEST_SG + ".device0.*");
+ 1, planNodeId, LevelTimeSeriesCountOperator.class.getSimpleName());
+ PartialPath partialPath = new PartialPath(SCHEMA_COUNT_OPERATOR_TEST_SG);
ISchemaRegion schemaRegion =
SchemaEngine.getInstance()
.getSchemaRegion(
@@ -183,67 +189,38 @@ public class SchemaScanOperatorTest {
operatorContext
.getInstanceContext()
.setDriverContext(new SchemaDriverContext(fragmentInstanceContext, schemaRegion));
- List<String> columns =
- Arrays.asList(
- COLUMN_TIMESERIES,
- COLUMN_TIMESERIES_ALIAS,
- COLUMN_STORAGE_GROUP,
- COLUMN_TIMESERIES_DATATYPE,
- COLUMN_TIMESERIES_ENCODING,
- COLUMN_TIMESERIES_COMPRESSION,
- COLUMN_TAGS,
- COLUMN_ATTRIBUTES);
- TimeSeriesSchemaScanOperator timeSeriesMetaScanOperator =
- new TimeSeriesSchemaScanOperator(
+ LevelTimeSeriesCountOperator timeSeriesCountOperator =
+ new LevelTimeSeriesCountOperator(
planNodeId,
fragmentInstanceContext.getOperatorContexts().get(0),
- 10,
- 0,
partialPath,
- null,
- null,
- false,
- false,
- false);
- while (timeSeriesMetaScanOperator.hasNext()) {
- TsBlock tsBlock = timeSeriesMetaScanOperator.next();
- assertEquals(8, tsBlock.getValueColumnCount());
- assertTrue(tsBlock.getColumn(0) instanceof BinaryColumn);
- assertEquals(10, tsBlock.getPositionCount());
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- Assert.assertEquals(0, tsBlock.getTimeByIndex(i));
- for (int j = 0; j < columns.size(); j++) {
- Binary binary =
- tsBlock.getColumn(j).isNull(i) ? null : tsBlock.getColumn(j).getBinary(i);
- String value = binary == null ? "null" : binary.toString();
- switch (j) {
- case 0:
- Assert.assertTrue(value.startsWith(META_SCAN_OPERATOR_TEST_SG + ".device0"));
- break;
- case 1:
- assertEquals("null", value);
- break;
- case 2:
- assertEquals(META_SCAN_OPERATOR_TEST_SG, value);
- break;
- case 3:
- assertEquals(TSDataType.INT32.toString(), value);
- break;
- case 4:
- assertEquals(TSEncoding.PLAIN.toString(), value);
- break;
- case 5:
- assertEquals(CompressionType.UNCOMPRESSED.toString(), value);
- break;
- case 6:
- case 7:
- assertTrue(StringUtils.isBlank(value));
- default:
- break;
- }
- }
- }
+ true,
+ 2);
+ TsBlock tsBlock = null;
+ while (timeSeriesCountOperator.hasNext()) {
+ tsBlock = timeSeriesCountOperator.next();
+ assertFalse(timeSeriesCountOperator.hasNext());
+ }
+ assertNotNull(tsBlock);
+
+ for (int i = 0; i < 10; i++) {
+ String path = tsBlock.getColumn(0).getBinary(i).getStringValue();
+ assertTrue(path.startsWith(SCHEMA_COUNT_OPERATOR_TEST_SG + ".device"));
+ assertEquals(10, tsBlock.getColumn(1).getInt(i));
+ }
+
+ LevelTimeSeriesCountOperator timeSeriesCountOperator2 =
+ new LevelTimeSeriesCountOperator(
+ planNodeId,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ partialPath,
+ true,
+ 1);
+ while (timeSeriesCountOperator2.hasNext()) {
+ tsBlock = timeSeriesCountOperator2.next();
}
+ assertNotNull(tsBlock);
+ assertEquals(100, tsBlock.getColumn(1).getInt(0));
} catch (MetadataException e) {
e.printStackTrace();
fail();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
index 6b3874b11d..2757747573 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
@@ -67,7 +67,6 @@ import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_DATA
import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_ENCODING;
import static org.apache.iotdb.db.mpp.execution.FragmentInstanceContext.createFragmentInstanceContext;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -143,7 +142,7 @@ public class SchemaScanOperatorTest {
tsBlock.getColumn(j).getBinary(i).toString(), META_SCAN_OPERATOR_TEST_SG);
break;
case 2:
- assertFalse(tsBlock.getColumn(j).getBoolean(i));
+ assertEquals("false", tsBlock.getColumn(j).getBinary(i).toString());
break;
default:
break;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index f8bfecbda2..63e0e59836 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -43,7 +43,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SeriesSchemaMergeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
@@ -147,7 +147,7 @@ public class DistributionPlannerTest {
@Test
public void testRewriteMetaSourceNode() throws IllegalPathException {
QueryId queryId = new QueryId("test_query");
- SchemaMergeNode metaMergeNode = new SchemaMergeNode(queryId.genPlanNodeId(), false);
+ SeriesSchemaMergeNode metaMergeNode = new SeriesSchemaMergeNode(queryId.genPlanNodeId(), false);
metaMergeNode.addChild(
new TimeSeriesSchemaScanNode(
queryId.genPlanNodeId(),
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
index e8d5f0ec65..a0e35fc38f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
@@ -33,7 +33,7 @@ import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesSchemaScanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SeriesSchemaMergeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
@@ -381,7 +381,7 @@ public class LogicalPlannerTest {
try {
LimitNode limitNode = (LimitNode) parseSQLToPlanNode(sql);
OffsetNode offsetNode = (OffsetNode) limitNode.getChild();
- SchemaMergeNode metaMergeNode = (SchemaMergeNode) offsetNode.getChild();
+ SeriesSchemaMergeNode metaMergeNode = (SeriesSchemaMergeNode) offsetNode.getChild();
metaMergeNode.getChildren().forEach(n -> System.out.println(n.toString()));
TimeSeriesSchemaScanNode showTimeSeriesNode =
(TimeSeriesSchemaScanNode) metaMergeNode.getChildren().get(0);
@@ -426,7 +426,7 @@ public class LogicalPlannerTest {
try {
LimitNode limitNode = (LimitNode) parseSQLToPlanNode(sql);
OffsetNode offsetNode = (OffsetNode) limitNode.getChild();
- SchemaMergeNode metaMergeNode = (SchemaMergeNode) offsetNode.getChild();
+ SeriesSchemaMergeNode metaMergeNode = (SeriesSchemaMergeNode) offsetNode.getChild();
DevicesSchemaScanNode showDevicesNode =
(DevicesSchemaScanNode) metaMergeNode.getChildren().get(0);
Assert.assertNotNull(showDevicesNode);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/metadata/read/DeviceSchemaScanNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/metadata/read/DeviceSchemaScanNodeSerdeTest.java
index 166150ffb8..3d6f428d4e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/metadata/read/DeviceSchemaScanNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/metadata/read/DeviceSchemaScanNodeSerdeTest.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesSchemaScanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SeriesSchemaMergeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
@@ -43,7 +43,8 @@ public class DeviceSchemaScanNodeSerdeTest {
public void testSerializeAndDeserialize() throws IllegalPathException {
OffsetNode offsetNode = new OffsetNode(new PlanNodeId("offset"), 10);
LimitNode limitNode = new LimitNode(new PlanNodeId("limit"), 10);
- SchemaMergeNode schemaMergeNode = new SchemaMergeNode(new PlanNodeId("schemaMerge"));
+ SeriesSchemaMergeNode schemaMergeNode =
+ new SeriesSchemaMergeNode(new PlanNodeId("schemaMerge"));
ExchangeNode exchangeNode = new ExchangeNode(new PlanNodeId("exchange"));
DevicesSchemaScanNode devicesSchemaScanNode =
new DevicesSchemaScanNode(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/metadata/read/TimeSeriesSchemaScanNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/metadata/read/SchemaCountNodeSerdeTest.java
similarity index 52%
copy from server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/metadata/read/TimeSeriesSchemaScanNodeSerdeTest.java
copy to server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/metadata/read/SchemaCountNodeSerdeTest.java
index 5d322a751f..a2e1d10ed5 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/metadata/read/TimeSeriesSchemaScanNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/metadata/read/SchemaCountNodeSerdeTest.java
@@ -25,11 +25,10 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.CountSchemaMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesCountNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.LevelTimeSeriesCountNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
import org.junit.Assert;
@@ -37,43 +36,57 @@ import org.junit.Test;
import java.nio.ByteBuffer;
-public class TimeSeriesSchemaScanNodeSerdeTest {
+public class SchemaCountNodeSerdeTest {
@Test
- public void testSerializeAndDeserialize() throws IllegalPathException {
- OffsetNode offsetNode = new OffsetNode(new PlanNodeId("offset"), 10);
- LimitNode limitNode = new LimitNode(new PlanNodeId("limit"), 10);
- SchemaMergeNode schemaMergeNode = new SchemaMergeNode(new PlanNodeId("schemaMerge"));
+ public void testDevicesCountSerializeAndDeserialize() throws IllegalPathException {
+ CountSchemaMergeNode countMergeNode = new CountSchemaMergeNode(new PlanNodeId("countMerge"));
ExchangeNode exchangeNode = new ExchangeNode(new PlanNodeId("exchange"));
- TimeSeriesSchemaScanNode timeSeriesSchemaScanNode =
- new TimeSeriesSchemaScanNode(
- new PlanNodeId("timeSeriesSchemaScan"),
- new PartialPath("root.sg.device.sensor"),
- null,
- null,
- 10,
- 0,
- false,
- false,
- false);
+ DevicesCountNode devicesCountNode =
+ new DevicesCountNode(
+ new PlanNodeId("devicesCount"), new PartialPath("root.sg.device"), true);
FragmentSinkNode fragmentSinkNode = new FragmentSinkNode(new PlanNodeId("fragmentSink"));
- fragmentSinkNode.addChild(timeSeriesSchemaScanNode);
+ fragmentSinkNode.addChild(devicesCountNode);
fragmentSinkNode.setDownStream(
new TEndPoint("127.0.0.1", 6667),
new FragmentInstanceId(new PlanFragmentId("q", 1), "ds"),
new PlanNodeId("test"));
- exchangeNode.addChild(schemaMergeNode);
+ exchangeNode.addChild(countMergeNode);
exchangeNode.setRemoteSourceNode(fragmentSinkNode);
exchangeNode.setUpstream(
new TEndPoint("127.0.0.1", 6667),
new FragmentInstanceId(new PlanFragmentId("q", 1), "ds"),
new PlanNodeId("test"));
- offsetNode.addChild(exchangeNode);
- limitNode.addChild(offsetNode);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
- limitNode.serialize(byteBuffer);
+ exchangeNode.serialize(byteBuffer);
byteBuffer.flip();
- LimitNode limitNode1 = (LimitNode) PlanNodeDeserializeHelper.deserialize(byteBuffer);
- Assert.assertEquals(limitNode, limitNode1);
+ ExchangeNode exchangeNode1 = (ExchangeNode) PlanNodeDeserializeHelper.deserialize(byteBuffer);
+ Assert.assertEquals(exchangeNode, exchangeNode1);
+ }
+
+ @Test
+ public void testTimeSeriesCountSerializeAndDeserialize() throws IllegalPathException {
+ CountSchemaMergeNode countMergeNode = new CountSchemaMergeNode(new PlanNodeId("countMerge"));
+ ExchangeNode exchangeNode = new ExchangeNode(new PlanNodeId("exchange"));
+ LevelTimeSeriesCountNode levelTimeSeriesCountNode =
+ new LevelTimeSeriesCountNode(
+ new PlanNodeId("timeseriesCount"), new PartialPath("root.sg.device"), true, 10);
+ FragmentSinkNode fragmentSinkNode = new FragmentSinkNode(new PlanNodeId("fragmentSink"));
+ fragmentSinkNode.addChild(levelTimeSeriesCountNode);
+ fragmentSinkNode.setDownStream(
+ new TEndPoint("127.0.0.1", 6667),
+ new FragmentInstanceId(new PlanFragmentId("q", 1), "ds"),
+ new PlanNodeId("test"));
+ exchangeNode.addChild(countMergeNode);
+ exchangeNode.setRemoteSourceNode(fragmentSinkNode);
+ exchangeNode.setUpstream(
+ new TEndPoint("127.0.0.1", 6667),
+ new FragmentInstanceId(new PlanFragmentId("q", 1), "ds"),
+ new PlanNodeId("test"));
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+ exchangeNode.serialize(byteBuffer);
+ byteBuffer.flip();
+ ExchangeNode exchangeNode1 = (ExchangeNode) PlanNodeDeserializeHelper.deserialize(byteBuffer);
+ Assert.assertEquals(exchangeNode, exchangeNode1);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/metadata/read/TimeSeriesSchemaScanNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/metadata/read/TimeSeriesSchemaScanNodeSerdeTest.java
index 5d322a751f..b731033c8c 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/metadata/read/TimeSeriesSchemaScanNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/metadata/read/TimeSeriesSchemaScanNodeSerdeTest.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SeriesSchemaMergeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
@@ -43,7 +43,8 @@ public class TimeSeriesSchemaScanNodeSerdeTest {
public void testSerializeAndDeserialize() throws IllegalPathException {
OffsetNode offsetNode = new OffsetNode(new PlanNodeId("offset"), 10);
LimitNode limitNode = new LimitNode(new PlanNodeId("limit"), 10);
- SchemaMergeNode schemaMergeNode = new SchemaMergeNode(new PlanNodeId("schemaMerge"));
+ SeriesSchemaMergeNode schemaMergeNode =
+ new SeriesSchemaMergeNode(new PlanNodeId("schemaMerge"));
ExchangeNode exchangeNode = new ExchangeNode(new PlanNodeId("exchange"));
TimeSeriesSchemaScanNode timeSeriesSchemaScanNode =
new TimeSeriesSchemaScanNode(