You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/13 02:42:07 UTC
[iotdb] branch master updated: [IOTDB-2803][new cluster] Adapt show timeseries to mpp (#5418)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4fc9c7da54 [IOTDB-2803][new cluster] Adapt show timeseries to mpp (#5418)
4fc9c7da54 is described below
commit 4fc9c7da546f7f698f251747badab72fad6b4e25
Author: xinzhongtianxia <45...@qq.com>
AuthorDate: Wed Apr 13 10:42:01 2022 +0800
[IOTDB-2803][new cluster] Adapt show timeseries to mpp (#5418)
We will fix some issues later as our talk
---
.../apache/iotdb/cluster/query/ClusterPlanner.java | 1 +
.../iotdb/commons/partition/RegionReplicaSet.java | 1 +
.../statemachine/SchemaRegionStateMachine.java | 4 +-
.../apache/iotdb/db/mpp/execution/DataDriver.java | 3 +-
.../db/mpp/execution/FragmentInstanceManager.java | 4 +-
.../db/mpp/execution/SchemaDriverContext.java | 8 +-
.../operator/schema/DevicesSchemaScanOperator.java | 97 +++++++++
.../mpp/operator/schema/SchemaMergeOperator.java | 79 +++++++
.../db/mpp/operator/schema/SchemaScanOperator.java | 107 ++++++++++
.../schema/TimeSeriesSchemaScanOperator.java | 142 ++++++++++++
.../db/mpp/operator/source/SeriesScanOperator.java | 2 +-
.../apache/iotdb/db/mpp/sql/analyze/Analyzer.java | 27 +++
.../apache/iotdb/db/mpp/sql/parser/ASTVisitor.java | 5 +
.../db/mpp/sql/planner/DistributionPlanner.java | 124 +++++++++--
.../db/mpp/sql/planner/LocalExecutionPlanner.java | 63 +++++-
.../iotdb/db/mpp/sql/planner/LogicalPlanner.java | 47 ++++
.../iotdb/db/mpp/sql/planner/QueryPlanBuilder.java | 42 ++++
.../db/mpp/sql/planner/plan/PlanFragment.java | 10 +-
.../plan/SimpleFragmentParallelPlanner.java | 8 +-
.../db/mpp/sql/planner/plan/node/PlanNode.java | 1 +
.../db/mpp/sql/planner/plan/node/PlanNodeType.java | 15 +-
.../db/mpp/sql/planner/plan/node/PlanVisitor.java | 20 ++
.../node/metedata/read/DevicesSchemaScanNode.java | 104 +++++++++
.../plan/node/metedata/read/SchemaMergeNode.java | 76 +++++++
.../plan/node/metedata/read/SchemaScanNode.java | 119 +++++++++++
.../metedata/read/TimeSeriesSchemaScanNode.java | 141 ++++++++++++
.../planner/plan/node/process/ExchangeNode.java | 1 +
.../planner/plan/node/sink/FragmentSinkNode.java | 1 +
.../plan/node/source/SeriesAggregateScanNode.java | 4 +-
.../planner/plan/node/source/SeriesScanNode.java | 8 +-
.../sql/planner/plan/node/source/SourceNode.java | 4 +-
.../db/mpp/sql/statement/StatementVisitor.java | 10 +
.../db/mpp/sql/statement/crud/InsertStatement.java | 1 +
.../db/mpp/sql/statement/crud/QueryStatement.java | 1 +
.../metadata/AlterTimeSeriesStatement.java | 1 +
.../metadata/CreateAlignedTimeSeriesStatement.java | 1 +
.../metadata/CreateTimeSeriesStatement.java | 1 +
.../statement/metadata/ShowDevicesStatement.java | 6 +
.../mpp/sql/statement/metadata/ShowStatement.java | 10 +
.../metadata/ShowTimeSeriesStatement.java | 6 +
.../db/mpp/operator/SchemaScanOperatorTest.java | 237 +++++++++++++++++++++
.../db/mpp/sql/plan/DistributionPlannerTest.java | 81 +++++++
.../db/mpp/sql/plan/FragmentInstanceSerdeTest.java | 6 +-
.../iotdb/db/mpp/sql/plan/LogicalPlannerTest.java | 85 ++++++++
.../sql/plan/node/process/OffsetNodeSerdeTest.java | 6 +-
.../source/SeriesAggregateScanNodeSerdeTest.java | 2 +-
46 files changed, 1669 insertions(+), 53 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java
index d4a91cc588..5ee0126d3c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
public class ClusterPlanner extends Planner {
+ @Override
protected PhysicalPlan generatePhysicalPlanFromOperator(Operator operator)
throws QueryProcessException {
// from logical operator to physical plan
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
index dac39d2d7c..2217f8832c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
@@ -54,6 +54,7 @@ public class RegionReplicaSet {
this.consensusGroupId = consensusGroupId;
}
+ @Override
public String toString() {
return String.format(
"RegionReplicaSet[%s-%d]: %s",
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index b67ffe8e92..e5649f2733 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.db.metadata.Executor.SchemaVisitor;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
@@ -56,6 +57,7 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
@Override
protected DataSet read(FragmentInstance fragmentInstance) {
logger.info("Execute read plan in SchemaRegionStateMachine");
- return null;
+ return FragmentInstanceManager.getInstance()
+ .execSchemaQueryFragmentInstance(fragmentInstance, schemaRegion);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
index 2f5a663d4f..55a6d566f1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution;
+import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -284,7 +285,7 @@ public class DataDriver implements Driver {
}
}
- private ListenableFuture<Void> processInternal() throws IOException {
+ private ListenableFuture<Void> processInternal() throws IOException, IoTDBException {
ListenableFuture<Void> blocked = root.isBlocked();
if (!blocked.isDone()) {
return blocked;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
index ca17860d4b..1af0abfece 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
-import org.apache.iotdb.db.metadata.schemaregion.SchemaRegion;
+import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.schedule.FragmentInstanceScheduler;
import org.apache.iotdb.db.mpp.schedule.IFragmentInstanceScheduler;
@@ -111,7 +111,7 @@ public class FragmentInstanceManager {
}
public FragmentInstanceInfo execSchemaQueryFragmentInstance(
- FragmentInstance instance, SchemaRegion schemaRegion) {
+ FragmentInstance instance, ISchemaRegion schemaRegion) {
FragmentInstanceId instanceId = instance.getId();
FragmentInstanceExecution execution =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriverContext.java
index 162bd0ad59..a859df2b77 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriverContext.java
@@ -18,19 +18,19 @@
*/
package org.apache.iotdb.db.mpp.execution;
-import org.apache.iotdb.db.metadata.schemaregion.SchemaRegion;
+import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
public class SchemaDriverContext extends DriverContext {
- private final SchemaRegion schemaRegion;
+ private final ISchemaRegion schemaRegion;
public SchemaDriverContext(
- FragmentInstanceContext fragmentInstanceContext, SchemaRegion schemaRegion) {
+ FragmentInstanceContext fragmentInstanceContext, ISchemaRegion schemaRegion) {
super(fragmentInstanceContext);
this.schemaRegion = schemaRegion;
}
- public SchemaRegion getSchemaRegion() {
+ public ISchemaRegion getSchemaRegion() {
return schemaRegion;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/DevicesSchemaScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/DevicesSchemaScanOperator.java
new file mode 100644
index 0000000000..2284239cef
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/DevicesSchemaScanOperator.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.operator.schema;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.execution.SchemaDriverContext;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
+import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class DevicesSchemaScanOperator extends SchemaScanOperator {
+ private final boolean hasSgCol;
+
+ private static final TSDataType[] RESOURCE_TYPES_WITH_SG = {
+ TSDataType.TEXT, TSDataType.TEXT, TSDataType.BOOLEAN,
+ };
+
+ private static final TSDataType[] RESOURCE_TYPES = {
+ TSDataType.TEXT, TSDataType.BOOLEAN,
+ };
+
+ public DevicesSchemaScanOperator(
+ OperatorContext operatorContext,
+ int limit,
+ int offset,
+ PartialPath partialPath,
+ boolean isPrefixPath,
+ boolean hasSgCol,
+ List<String> columns) {
+ super(operatorContext, limit, offset, partialPath, isPrefixPath, columns);
+ this.hasSgCol = hasSgCol;
+ }
+
+ @Override
+ protected TsBlock createTsBlock() {
+ TsBlockBuilder builder =
+ new TsBlockBuilder(
+ hasSgCol ? Arrays.asList(RESOURCE_TYPES_WITH_SG) : Arrays.asList(RESOURCE_TYPES));
+ try {
+ ((SchemaDriverContext) operatorContext.getInstanceContext().getDriverContext())
+ .getSchemaRegion()
+ .getMatchedDevices(convertToPhysicalPlan())
+ .left
+ .forEach(device -> setColumns(device, builder));
+ } catch (MetadataException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ return builder.build();
+ }
+
+ // ToDo @xinzhongtianxia remove this temporary converter after mpp online
+ private ShowDevicesPlan convertToPhysicalPlan() {
+ return new ShowDevicesPlan(partialPath, limit, offset, hasSgCol);
+ }
+
+ private void setColumns(ShowDevicesResult device, TsBlockBuilder builder) {
+ builder.getTimeColumnBuilder().writeLong(0L);
+ builder.getColumnBuilder(0).writeBinary(new Binary(device.getName()));
+ if (hasSgCol) {
+ builder.getColumnBuilder(1).writeBinary(new Binary(device.getSgName()));
+ builder.getColumnBuilder(2).writeBoolean(device.isAligned());
+ } else {
+ builder.getColumnBuilder(1).writeBoolean(device.isAligned());
+ }
+ builder.declarePosition();
+ }
+
+ @Override
+ public PlanNodeId getSourceId() {
+ return null;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaMergeOperator.java
new file mode 100644
index 0000000000..aebc7c16f6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaMergeOperator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.operator.schema;
+
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.operator.process.ProcessOperator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import java.util.List;
+
+public class SchemaMergeOperator implements ProcessOperator {
+
+ protected OperatorContext operatorContext;
+ protected int limit;
+ protected int offset;
+ private final boolean[] noMoreTsBlocks;
+ private boolean isFinished;
+
+ private List<Operator> children;
+
+ public SchemaMergeOperator(OperatorContext operatorContext, List<Operator> children) {
+ this.operatorContext = operatorContext;
+ this.children = children;
+ noMoreTsBlocks = new boolean[children.size()];
+ }
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ @Override
+ public TsBlock next() {
+ // ToDo consider SHOW LATEST
+
+ for (int i = 0; i < children.size(); i++) {
+ if (!noMoreTsBlocks[i]) {
+ TsBlock tsBlock = children.get(i).next();
+ if (!children.get(i).hasNext()) {
+ noMoreTsBlocks[i] = true;
+ }
+ return tsBlock;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ for (int i = 0; i < children.size(); i++) {
+ if (!noMoreTsBlocks[i] && children.get(i).hasNext()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return !hasNext();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperator.java
new file mode 100644
index 0000000000..6fa48dc67c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperator.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.operator.schema;
+
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.operator.source.SourceOperator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import java.util.List;
+
+public abstract class SchemaScanOperator implements SourceOperator {
+
+ protected OperatorContext operatorContext;
+ protected TsBlock tsBlock;
+ private boolean hasCachedTsBlock;
+
+ protected int limit;
+ protected int offset;
+ protected PartialPath partialPath;
+ protected boolean isPrefixPath;
+ protected List<String> columns;
+
+ protected SchemaScanOperator(
+ OperatorContext operatorContext,
+ int limit,
+ int offset,
+ PartialPath partialPath,
+ boolean isPrefixPath,
+ List<String> columns) {
+ this.operatorContext = operatorContext;
+ this.limit = limit;
+ this.offset = offset;
+ this.partialPath = partialPath;
+ this.isPrefixPath = isPrefixPath;
+ this.columns = columns;
+ }
+
+ protected abstract TsBlock createTsBlock();
+
+ public PartialPath getPartialPath() {
+ return partialPath;
+ }
+
+ public int getLimit() {
+ return limit;
+ }
+
+ public int getOffset() {
+ return offset;
+ }
+
+ public void setLimit(int limit) {
+ this.limit = limit;
+ }
+
+ public void setOffset(int offset) {
+ this.offset = offset;
+ }
+
+ public boolean isPrefixPath() {
+ return isPrefixPath;
+ }
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ @Override
+ public TsBlock next() {
+ hasCachedTsBlock = false;
+ return tsBlock;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (tsBlock == null) {
+ tsBlock = createTsBlock();
+ if (tsBlock.getPositionCount() > 0) {
+ hasCachedTsBlock = true;
+ }
+ }
+ return hasCachedTsBlock;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return !hasNext();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/TimeSeriesSchemaScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/TimeSeriesSchemaScanOperator.java
new file mode 100644
index 0000000000..0a6fbce960
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/TimeSeriesSchemaScanOperator.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.operator.schema;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.execution.SchemaDriverContext;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
+import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class TimeSeriesSchemaScanOperator extends SchemaScanOperator {
+ private String key;
+ private String value;
+ private boolean isContains;
+
+ // if is true, the result will be sorted according to the inserting frequency of the timeseries
+ private boolean orderByHeat;
+
+ private static final TSDataType[] resourceTypes = {
+ TSDataType.TEXT,
+ TSDataType.TEXT,
+ TSDataType.TEXT,
+ TSDataType.TEXT,
+ TSDataType.TEXT,
+ TSDataType.TEXT,
+ TSDataType.TEXT,
+ TSDataType.TEXT
+ };
+
+ public TimeSeriesSchemaScanOperator(
+ OperatorContext operatorContext,
+ int limit,
+ int offset,
+ PartialPath partialPath,
+ String key,
+ String value,
+ boolean isContains,
+ boolean orderByHeat,
+ boolean isPrefixPath,
+ List<String> columns) {
+ super(operatorContext, limit, offset, partialPath, isPrefixPath, columns);
+ this.isContains = isContains;
+ this.key = key;
+ this.value = value;
+ this.orderByHeat = orderByHeat;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public boolean isContains() {
+ return isContains;
+ }
+
+ public boolean isOrderByHeat() {
+ return orderByHeat;
+ }
+
+ @Override
+ protected TsBlock createTsBlock() {
+ TsBlockBuilder builder = new TsBlockBuilder(Arrays.asList(resourceTypes));
+ try {
+ ((SchemaDriverContext) operatorContext.getInstanceContext().getDriverContext())
+ .getSchemaRegion()
+ .showTimeseries(convertToPhysicalPlan(), operatorContext.getInstanceContext())
+ .left
+ .forEach(series -> setColumns(series, builder));
+ } catch (MetadataException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ return builder.build();
+ }
+
+ // ToDo @xinzhongtianxia remove this temporary converter after mpp online
+ private ShowTimeSeriesPlan convertToPhysicalPlan() {
+ return new ShowTimeSeriesPlan(partialPath, isContains, key, value, limit, offset, orderByHeat);
+ }
+
+ private void setColumns(ShowTimeSeriesResult series, TsBlockBuilder builder) {
+ builder.getTimeColumnBuilder().writeLong(series.getLastTime());
+ writeValueColumn(builder, 0, series.getName());
+ writeValueColumn(builder, 1, series.getAlias());
+ writeValueColumn(builder, 2, series.getSgName());
+ writeValueColumn(builder, 3, series.getDataType().toString());
+ writeValueColumn(builder, 4, series.getEncoding().toString());
+ writeValueColumn(builder, 5, series.getCompressor().toString());
+ writeValueColumn(builder, 6, mapToString(series.getTag()));
+ writeValueColumn(builder, 7, mapToString(series.getAttribute()));
+ builder.declarePosition();
+ }
+
+ private void writeValueColumn(TsBlockBuilder builder, int columnIndex, String value) {
+ if (value == null) {
+ builder.getColumnBuilder(columnIndex).appendNull();
+ } else {
+ builder.getColumnBuilder(columnIndex).writeBinary(new Binary(value));
+ }
+ }
+
+ private String mapToString(Map<String, String> map) {
+ return map.entrySet().stream()
+ .map(e -> "\"" + e.getKey() + "\"" + ":" + "\"" + e.getValue() + "\"")
+ .collect(Collectors.joining(","));
+ }
+
+ @Override
+ public PlanNodeId getSourceId() {
+ return null;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
index 8e58e36b61..9e9946c323 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
@@ -114,7 +114,7 @@ public class SeriesScanOperator implements DataSourceOperator {
}
@Override
- public boolean isFinished() throws IOException {
+ public boolean isFinished() {
return finished || (finished = hasNext());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
index 65cb21cfd5..72b7104285 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.sql.analyze;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.PartitionInfo;
+import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.PathNumOverLimitException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -46,6 +47,8 @@ import org.apache.iotdb.db.mpp.sql.statement.crud.*;
import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowDevicesStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.sys.AuthorStatement;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
@@ -260,6 +263,30 @@ public class Analyzer {
return analysis;
}
+ @Override
+ public Analysis visitShowTimeSeries(
+ ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext context) {
+ SchemaPartition schemaPartitionInfo =
+ partitionFetcher.fetchSchemaPartitionInfo(
+ showTimeSeriesStatement.getPathPattern().getDevice());
+ Analysis analysis = new Analysis();
+ analysis.setStatement(showTimeSeriesStatement);
+ analysis.setSchemaPartitionInfo(schemaPartitionInfo);
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitShowDevices(
+ ShowDevicesStatement showDevicesStatement, MPPQueryContext context) {
+ SchemaPartition schemaPartitionInfo =
+ partitionFetcher.fetchSchemaPartitionInfo(
+ showDevicesStatement.getPathPattern().getFullPath());
+ Analysis analysis = new Analysis();
+ analysis.setStatement(showDevicesStatement);
+ analysis.setSchemaPartitionInfo(schemaPartitionInfo);
+ return analysis;
+ }
+
@Override
public Analysis visitCreateUser(AuthorStatement authorStatement, MPPQueryContext context) {
Analysis analysis = new Analysis();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/ASTVisitor.java
index d026f5a78c..f2e0efd751 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/ASTVisitor.java
@@ -57,6 +57,7 @@ import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowDevicesStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.sys.AuthorStatement;
import org.apache.iotdb.db.qp.constant.SQLConstant;
@@ -136,6 +137,10 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
if (ctx.DEBUG() != null) {
statement.setDebug(true);
}
+ if (statement instanceof ShowStatement) {
+ ((ShowStatement) statement)
+ .setPrefixPath(IoTDBConstant.ClientVersion.V_0_12.equals(clientVersion));
+ }
return statement;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 96341a2ca1..4ef647dc7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -22,8 +22,19 @@ import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
-import org.apache.iotdb.db.mpp.sql.planner.plan.*;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.*;
+import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.sql.planner.plan.IFragmentParallelPlaner;
+import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
+import org.apache.iotdb.db.mpp.sql.planner.plan.SimpleFragmentParallelPlanner;
+import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.SimplePlanNodeRewriter;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
@@ -31,9 +42,11 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanN
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeSet;
import java.util.stream.Collectors;
import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -122,6 +135,36 @@ public class DistributionPlanner {
return null;
}
+ @Override
+ public PlanNode visitMetaMerge(SchemaMergeNode node, DistributionPlanContext context) {
+ SchemaMergeNode root = (SchemaMergeNode) node.clone();
+ SchemaScanNode seed = (SchemaScanNode) node.getChildren().get(0);
+ TreeSet<RegionReplicaSet> schemaRegions =
+ new TreeSet<>(Comparator.comparingInt(region -> region.getConsensusGroupId().getId()));
+ analysis
+ .getSchemaPartitionInfo()
+ .getSchemaPartitionMap()
+ .forEach(
+ (storageGroup, deviceGroup) -> {
+ deviceGroup.forEach(
+ (deviceGroupId, schemaRegionReplicaSet) ->
+ schemaRegions.add(schemaRegionReplicaSet));
+ });
+ int count = schemaRegions.size();
+ schemaRegions.forEach(
+ region -> {
+ SchemaScanNode metaScanNode = (SchemaScanNode) seed.clone();
+ metaScanNode.setRegionReplicaSet(region);
+ if (count > 1) {
+ metaScanNode.setLimit(metaScanNode.getOffset() + metaScanNode.getLimit());
+ metaScanNode.setOffset(0);
+ }
+ root.addChild(metaScanNode);
+ });
+ return root;
+ }
+
+ @Override
public PlanNode visitTimeJoin(TimeJoinNode node, DistributionPlanContext context) {
TimeJoinNode root = (TimeJoinNode) node.clone();
@@ -140,7 +183,7 @@ public class DistributionPlanner {
for (RegionReplicaSet dataRegion : dataDistribution) {
SeriesScanNode split = (SeriesScanNode) handle.clone();
split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
- split.setDataRegionReplicaSet(dataRegion);
+ split.setRegionReplicaSet(dataRegion);
sources.add(split);
}
} else if (child instanceof SeriesAggregateScanNode) {
@@ -158,7 +201,7 @@ public class DistributionPlanner {
// Step 2: For the source nodes, group them by the DataRegion.
Map<RegionReplicaSet, List<SeriesScanNode>> sourceGroup =
- sources.stream().collect(Collectors.groupingBy(SeriesScanNode::getDataRegionReplicaSet));
+ sources.stream().collect(Collectors.groupingBy(SeriesScanNode::getRegionReplicaSet));
// Step 3: For the source nodes which belong to same data region, add a TimeJoinNode for them
// and make the
// new TimeJoinNode as the child of current TimeJoinNode
@@ -215,20 +258,59 @@ public class DistributionPlanner {
return node.cloneWithChildren(children);
}
+ @Override
+ public PlanNode visitMetaMerge(SchemaMergeNode node, NodeGroupContext context) {
+ node.getChildren()
+ .forEach(
+ child -> {
+ visit(child, context);
+ });
+ NodeDistribution nodeDistribution =
+ new NodeDistribution(NodeDistributionType.DIFFERENT_FROM_ALL_CHILDREN);
+ PlanNode newNode = node.clone();
+ nodeDistribution.region = calculateSchemaRegionByChildren(node.getChildren(), context);
+ context.putNodeDistribution(newNode.getPlanNodeId(), nodeDistribution);
+ node.getChildren()
+ .forEach(
+ child -> {
+ if (!nodeDistribution.region.equals(
+ context.getNodeDistribution(child.getPlanNodeId()).region)) {
+ ExchangeNode exchangeNode =
+ new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
+ exchangeNode.addChild(child);
+ newNode.addChild(exchangeNode);
+ } else {
+ newNode.addChild(child);
+ }
+ });
+ return newNode;
+ }
+
+ @Override
+ public PlanNode visitMetaScan(SchemaScanNode node, NodeGroupContext context) {
+ NodeDistribution nodeDistribution = new NodeDistribution(NodeDistributionType.NO_CHILD);
+ nodeDistribution.region = node.getRegionReplicaSet();
+ context.putNodeDistribution(node.getPlanNodeId(), nodeDistribution);
+ return node;
+ }
+
+ @Override
public PlanNode visitSeriesScan(SeriesScanNode node, NodeGroupContext context) {
context.putNodeDistribution(
node.getPlanNodeId(),
- new NodeDistribution(NodeDistributionType.NO_CHILD, node.getDataRegionReplicaSet()));
+ new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
return node.clone();
}
+ @Override
public PlanNode visitSeriesAggregate(SeriesAggregateScanNode node, NodeGroupContext context) {
context.putNodeDistribution(
node.getPlanNodeId(),
- new NodeDistribution(NodeDistributionType.NO_CHILD, node.getDataRegionReplicaSet()));
+ new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
return node.clone();
}
+ @Override
public PlanNode visitTimeJoin(TimeJoinNode node, NodeGroupContext context) {
TimeJoinNode newNode = (TimeJoinNode) node.clone();
List<PlanNode> visitedChildren = new ArrayList<>();
@@ -256,7 +338,7 @@ public class DistributionPlanner {
// parent.
visitedChildren.forEach(
child -> {
- if (!dataRegion.equals(context.getNodeDistribution(child.getPlanNodeId()).dataRegion)) {
+ if (!dataRegion.equals(context.getNodeDistribution(child.getPlanNodeId()).region)) {
ExchangeNode exchangeNode =
new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
exchangeNode.setChild(child);
@@ -272,7 +354,13 @@ public class DistributionPlanner {
List<PlanNode> children, NodeGroupContext context) {
// We always make the dataRegion of TimeJoinNode to be the same as its first child.
// TODO: (xingtanzjr) We need to implement more suitable policies here
- return context.getNodeDistribution(children.get(0).getPlanNodeId()).dataRegion;
+ return context.getNodeDistribution(children.get(0).getPlanNodeId()).region;
+ }
+
+ private RegionReplicaSet calculateSchemaRegionByChildren(
+ List<PlanNode> children, NodeGroupContext context) {
+ // We always make the schemaRegion of MetaMergeNode to be the same as its first child.
+ return context.getNodeDistribution(children.get(0).getPlanNodeId()).region;
}
private boolean nodeDistributionIsSame(List<PlanNode> children, NodeGroupContext context) {
@@ -280,7 +368,7 @@ public class DistributionPlanner {
NodeDistribution first = context.getNodeDistribution(children.get(0).getPlanNodeId());
for (int i = 1; i < children.size(); i++) {
NodeDistribution next = context.getNodeDistribution(children.get(i).getPlanNodeId());
- if (first.dataRegion == null || !first.dataRegion.equals(next.dataRegion)) {
+ if (first.region == null || !first.region.equals(next.region)) {
return false;
}
}
@@ -294,19 +382,19 @@ public class DistributionPlanner {
private class NodeGroupContext {
private MPPQueryContext queryContext;
- private Map<PlanNodeId, NodeDistribution> nodeDistribution;
+ private Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
public NodeGroupContext(MPPQueryContext queryContext) {
this.queryContext = queryContext;
- this.nodeDistribution = new HashMap<>();
+ this.nodeDistributionMap = new HashMap<>();
}
public void putNodeDistribution(PlanNodeId nodeId, NodeDistribution distribution) {
- this.nodeDistribution.put(nodeId, distribution);
+ this.nodeDistributionMap.put(nodeId, distribution);
}
public NodeDistribution getNodeDistribution(PlanNodeId nodeId) {
- return this.nodeDistribution.get(nodeId);
+ return this.nodeDistributionMap.get(nodeId);
}
}
@@ -319,11 +407,15 @@ public class DistributionPlanner {
private class NodeDistribution {
private NodeDistributionType type;
- private RegionReplicaSet dataRegion;
+ private RegionReplicaSet region;
+
+ private NodeDistribution(NodeDistributionType type, RegionReplicaSet region) {
+ this.type = type;
+ this.region = region;
+ }
- private NodeDistribution(NodeDistributionType type, RegionReplicaSet dataRegion) {
+ private NodeDistribution(NodeDistributionType type) {
this.type = type;
- this.dataRegion = dataRegion;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index 89112f2deb..74d796f8fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.mpp.sql.planner;
import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.metadata.schemaregion.SchemaRegion;
+import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.mpp.buffer.DataBlockManager;
import org.apache.iotdb.db.mpp.buffer.DataBlockService;
import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
@@ -36,11 +36,17 @@ import org.apache.iotdb.db.mpp.operator.Operator;
import org.apache.iotdb.db.mpp.operator.OperatorContext;
import org.apache.iotdb.db.mpp.operator.process.LimitOperator;
import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.operator.schema.DevicesSchemaScanOperator;
+import org.apache.iotdb.db.mpp.operator.schema.SchemaMergeOperator;
+import org.apache.iotdb.db.mpp.operator.schema.TimeSeriesSchemaScanOperator;
import org.apache.iotdb.db.mpp.operator.source.DataSourceOperator;
import org.apache.iotdb.db.mpp.operator.source.ExchangeOperator;
import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesSchemaScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
@@ -102,7 +108,7 @@ public class LocalExecutionPlanner {
}
public SchemaDriver plan(
- PlanNode plan, FragmentInstanceContext instanceContext, SchemaRegion schemaRegion) {
+ PlanNode plan, FragmentInstanceContext instanceContext, ISchemaRegion schemaRegion) {
SchemaDriverContext schemaDriverContext =
new SchemaDriverContext(instanceContext, schemaRegion);
@@ -150,6 +156,59 @@ public class LocalExecutionPlanner {
return seriesScanOperator;
}
+ @Override
+ public Operator visitTimeSeriesMetaScan(
+ TimeSeriesSchemaScanNode node, LocalExecutionPlanContext context) {
+ OperatorContext operatorContext =
+ context.instanceContext.addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ TimeSeriesSchemaScanOperator.class.getSimpleName());
+ return new TimeSeriesSchemaScanOperator(
+ operatorContext,
+ node.getLimit(),
+ node.getOffset(),
+ node.getPath(),
+ node.getKey(),
+ node.getValue(),
+ node.isContains(),
+ node.isOrderByHeat(),
+ node.isPrefixPath(),
+ node.getOutputColumnNames());
+ }
+
+ @Override
+ public Operator visitDevicesMetaScan(
+ DevicesSchemaScanNode node, LocalExecutionPlanContext context) {
+ OperatorContext operatorContext =
+ context.instanceContext.addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ DevicesSchemaScanOperator.class.getSimpleName());
+ return new DevicesSchemaScanOperator(
+ operatorContext,
+ node.getLimit(),
+ node.getOffset(),
+ node.getPath(),
+ node.isPrefixPath(),
+ node.isHasSgCol(),
+ node.getOutputColumnNames());
+ }
+
+ @Override
+ public Operator visitMetaMerge(SchemaMergeNode node, LocalExecutionPlanContext context) {
+ List<Operator> children =
+ node.getChildren().stream()
+ .map(n -> n.accept(this, context))
+ .collect(Collectors.toList());
+ OperatorContext operatorContext =
+ context.instanceContext.addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ SchemaMergeOperator.class.getSimpleName());
+ return new SchemaMergeOperator(operatorContext, children);
+ }
+
@Override
public Operator visitSeriesAggregate(
SeriesAggregateScanNode node, LocalExecutionPlanContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
index 56fef149c9..8f6dba3cfe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
@@ -38,12 +38,22 @@ import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.sql.statement.crud.*;
import org.apache.iotdb.db.mpp.sql.statement.crud.AggregationQueryStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.FillQueryStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.GroupByFillQueryStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.GroupByQueryStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowsOfOneDeviceStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.LastQueryStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.UDAFQueryStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.UDTFQueryStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowDevicesStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.sys.AuthorStatement;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.tsfile.read.expression.ExpressionType;
@@ -261,6 +271,43 @@ public class LogicalPlanner {
insertRowStatement.getValues());
}
+ @Override
+ public PlanNode visitShowTimeSeries(
+ ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext context) {
+ QueryPlanBuilder planBuilder = new QueryPlanBuilder(context);
+ planBuilder.planTimeSeriesMetaSource(
+ showTimeSeriesStatement.getPathPattern(),
+ showTimeSeriesStatement.getKey(),
+ showTimeSeriesStatement.getValue(),
+ showTimeSeriesStatement.getLimit(),
+ showTimeSeriesStatement.getOffset(),
+ showTimeSeriesStatement.isOrderByHeat(),
+ showTimeSeriesStatement.isContains(),
+ showTimeSeriesStatement.isPrefixPath());
+ planBuilder.planMetaMerge(showTimeSeriesStatement.isOrderByHeat());
+ if (showTimeSeriesStatement.getLimit() > 0) {
+ planBuilder.planOffset(showTimeSeriesStatement.getOffset());
+ planBuilder.planLimit(showTimeSeriesStatement.getLimit());
+ }
+ return planBuilder.getRoot();
+ }
+
+ @Override
+ public PlanNode visitShowDevices(
+ ShowDevicesStatement showDevicesStatement, MPPQueryContext context) {
+ QueryPlanBuilder planBuilder = new QueryPlanBuilder(context);
+ planBuilder.planDeviceMetaSource(
+ showDevicesStatement.getPathPattern(),
+ showDevicesStatement.getLimit(),
+ showDevicesStatement.getOffset(),
+ showDevicesStatement.isPrefixPath(),
+ showDevicesStatement.hasSgCol());
+ planBuilder.planMetaMerge(false);
+ planBuilder.planOffset(showDevicesStatement.getOffset());
+ planBuilder.planLimit(showDevicesStatement.getLimit());
+ return planBuilder.getRoot();
+ }
+
@Override
public PlanNode visitCreateUser(AuthorStatement authorStatement, MPPQueryContext context) {
return getNewAuthorNode(authorStatement, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java
index 788998f87b..2675206afb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java
@@ -22,6 +22,9 @@ package org.apache.iotdb.db.mpp.sql.planner;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesSchemaScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.*;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
@@ -274,4 +277,43 @@ public class QueryPlanBuilder {
this.root = new OffsetNode(context.getQueryId().genPlanNodeId(), this.getRoot(), rowOffset);
}
+
+ /** Meta Query* */
+ public void planTimeSeriesMetaSource(
+ PartialPath pathPattern,
+ String key,
+ String value,
+ int limit,
+ int offset,
+ boolean orderByHeat,
+ boolean contains,
+ boolean prefixPath) {
+ TimeSeriesSchemaScanNode timeSeriesMetaScanNode =
+ new TimeSeriesSchemaScanNode(
+ context.getQueryId().genPlanNodeId(),
+ pathPattern,
+ key,
+ value,
+ limit,
+ offset,
+ orderByHeat,
+ contains,
+ prefixPath);
+ this.root = timeSeriesMetaScanNode;
+ }
+
+ public void planDeviceMetaSource(
+ PartialPath pathPattern, int limit, int offset, boolean prefixPath, boolean hasSgCol) {
+ DevicesSchemaScanNode devicesMetaScanNode =
+ new DevicesSchemaScanNode(
+ context.getQueryId().genPlanNodeId(), pathPattern, limit, offset, prefixPath, hasSgCol);
+ this.root = devicesMetaScanNode;
+ }
+
+ public void planMetaMerge(boolean orderByHeat) {
+ SchemaMergeNode metaMergeNode =
+ new SchemaMergeNode(context.getQueryId().genPlanNodeId(), orderByHeat);
+ metaMergeNode.addChild(this.getRoot());
+ this.root = metaMergeNode;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
index 550ad1e0f8..0f00703c86 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
@@ -62,16 +62,16 @@ public class PlanFragment {
// In current version, one PlanFragment should contain at least one SourceNode,
// and the DataRegions of all SourceNodes should be same in one PlanFragment.
// So we can use the DataRegion of one SourceNode as the PlanFragment's DataRegion.
- public RegionReplicaSet getTargetDataRegion() {
- return getNodeDataRegion(root);
+ public RegionReplicaSet getTargetRegion() {
+ return getNodeRegion(root);
}
- private RegionReplicaSet getNodeDataRegion(PlanNode root) {
+ private RegionReplicaSet getNodeRegion(PlanNode root) {
if (root instanceof SourceNode) {
- return ((SourceNode) root).getDataRegionReplicaSet();
+ return ((SourceNode) root).getRegionReplicaSet();
}
for (PlanNode child : root.getChildren()) {
- RegionReplicaSet result = getNodeDataRegion(child);
+ RegionReplicaSet result = getNodeRegion(child);
if (result != null) {
return result;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index 2818661995..d3357ec275 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -92,19 +92,19 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
timeFilter,
queryContext.getQueryType());
- // Get the target DataRegion for origin PlanFragment, then its instance will be distributed one
+ // Get the target region for origin PlanFragment, then its instance will be distributed one
// of them.
- RegionReplicaSet dataRegion = fragment.getTargetDataRegion();
+ RegionReplicaSet regionReplicaSet = fragment.getTargetRegion();
// Set DataRegion and target host for the instance
// We need to store all the replica host in case of the scenario that the instance need to be
// redirected
// to another host when scheduling
- fragmentInstance.setRegionReplicaSet(dataRegion);
+ fragmentInstance.setRegionReplicaSet(regionReplicaSet);
// TODO: (xingtanzjr) We select the first Endpoint as the default target host for current
// instance
- fragmentInstance.setHostEndpoint(dataRegion.getDataNodeList().get(0).getEndPoint());
+ fragmentInstance.setHostEndpoint(regionReplicaSet.getDataNodeList().get(0).getEndPoint());
instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
fragmentInstanceList.add(fragmentInstance);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
index 3a17a08a75..5df9022b84 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
@@ -55,6 +55,7 @@ public abstract class PlanNode {
public abstract void addChild(PlanNode child);
+ @Override
public abstract PlanNode clone();
public PlanNode cloneWithChildren(List<PlanNode> children) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
index d9de5f02c9..85d9f78d2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesSchemaScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AuthorNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
@@ -65,12 +67,15 @@ public enum PlanNodeType {
INSERT_ROWS((short) 15),
INSERT_ROWS_OF_ONE_DEVICE((short) 16),
INSERT_MULTI_TABLET((short) 17),
- SHOW_DEVICES((short) 18),
+ DEVICES_SCHEMA_SCAN((short) 18),
CREATE_TIME_SERIES((short) 19),
EXCHANGE((short) 20),
AUTHOR((short) 21),
ALTER_TIME_SERIES((short) 22),
- CREATE_ALIGNED_TIME_SERIES((short) 23);
+ CREATE_ALIGNED_TIME_SERIES((short) 23),
+ TIME_SERIES_SCHEMA_SCAN((short) 24),
+ // TODO @xinzhongtianxia remove this
+ SHOW_DEVICES((short) 25);
private final short nodeType;
@@ -122,7 +127,7 @@ public enum PlanNodeType {
case 17:
return InsertMultiTabletsNode.deserialize(buffer);
case 18:
- return ShowDevicesNode.deserialize(buffer);
+ return DevicesSchemaScanNode.deserialize(buffer);
case 19:
return CreateTimeSeriesNode.deserialize(buffer);
case 20:
@@ -133,6 +138,10 @@ public enum PlanNodeType {
return AlterTimeSeriesNode.deserialize(buffer);
case 23:
return CreateAlignedTimeSeriesNode.deserialize(buffer);
+ case 24:
+ return TimeSeriesSchemaScanNode.deserialize(buffer);
+ case 25:
+ return ShowDevicesNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
index 76eecb8215..953db9f514 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
@@ -18,6 +18,10 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesSchemaScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
@@ -94,6 +98,22 @@ public abstract class PlanVisitor<R, C> {
return visitPlan(node, context);
}
+ public R visitMetaMerge(SchemaMergeNode node, C context) {
+ return visitPlan(node, context);
+ };
+
+ public R visitMetaScan(SchemaScanNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitTimeSeriesMetaScan(TimeSeriesSchemaScanNode node, C context) {
+ return visitMetaScan(node, context);
+ }
+
+ public R visitDevicesMetaScan(DevicesSchemaScanNode node, C context) {
+ return visitMetaScan(node, context);
+ }
+
public R visitFragmentSink(FragmentSinkNode node, C context) {
return visitPlan(node, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
new file mode 100644
index 0000000000..da381349ff
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_DEVICES;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_IS_ALIGNED;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
+
+public class DevicesSchemaScanNode extends SchemaScanNode {
+
+ private final boolean hasSgCol;
+
+ public DevicesSchemaScanNode(
+ PlanNodeId id,
+ PartialPath path,
+ int limit,
+ int offset,
+ boolean isPrefixPath,
+ boolean hasSgCol) {
+ super(id, path, limit, offset, isPrefixPath);
+ this.hasSgCol = hasSgCol;
+ }
+
+ public boolean isHasSgCol() {
+ return hasSgCol;
+ }
+
+ @Override
+ public List<PlanNode> getChildren() {
+ return null;
+ }
+
+ @Override
+ public void addChild(PlanNode child) {}
+
+ @Override
+ public PlanNode clone() {
+ return new DevicesSchemaScanNode(getPlanNodeId(), path, limit, offset, isPrefixPath, hasSgCol);
+ }
+
+ @Override
+ public int allowedChildCount() {
+ return NO_CHILD_ALLOWED;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ if (hasSgCol) {
+ return Arrays.asList(COLUMN_DEVICES, COLUMN_STORAGE_GROUP, COLUMN_IS_ALIGNED);
+ }
+ return Arrays.asList(COLUMN_DEVICES, COLUMN_IS_ALIGNED);
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+ PlanNodeType.DEVICES_SCHEMA_SCAN.serialize(byteBuffer);
+ ReadWriteIOUtils.write(getPlanNodeId().getId(), byteBuffer);
+ ReadWriteIOUtils.write(path.getFullPath(), byteBuffer);
+ ReadWriteIOUtils.write(limit, byteBuffer);
+ ReadWriteIOUtils.write(offset, byteBuffer);
+ ReadWriteIOUtils.write(isPrefixPath, byteBuffer);
+ ReadWriteIOUtils.write(hasSgCol, byteBuffer);
+ }
+
+ public static DevicesSchemaScanNode deserialize(ByteBuffer byteBuffer)
+ throws IllegalPathException {
+ String id = ReadWriteIOUtils.readString(byteBuffer);
+ PlanNodeId planNodeId = new PlanNodeId(id);
+ String fullPath = ReadWriteIOUtils.readString(byteBuffer);
+ PartialPath path = new PartialPath(fullPath);
+ int limit = ReadWriteIOUtils.readInt(byteBuffer);
+ int offset = ReadWriteIOUtils.readInt(byteBuffer);
+ boolean isPrefixPath = ReadWriteIOUtils.readBool(byteBuffer);
+ boolean hasSgCol = ReadWriteIOUtils.readBool(byteBuffer);
+ return new DevicesSchemaScanNode(planNodeId, path, limit, offset, isPrefixPath, hasSgCol);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java
new file mode 100644
index 0000000000..c315da6a7b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
+
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ProcessNode;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SchemaMergeNode extends ProcessNode {
+
+ private boolean orderByHeat;
+
+ private List<PlanNode> children;
+
+ public SchemaMergeNode(PlanNodeId id) {
+ super(id);
+ children = new ArrayList<>();
+ }
+
+ public SchemaMergeNode(PlanNodeId id, boolean orderByHeat) {
+ this(id);
+ this.orderByHeat = orderByHeat;
+ }
+
+ @Override
+ public List<PlanNode> getChildren() {
+ return children;
+ }
+
+ @Override
+ public void addChild(PlanNode child) {
+ this.children.add(child);
+ }
+
+ @Override
+ public PlanNode clone() {
+ return new SchemaMergeNode(getPlanNodeId(), this.orderByHeat);
+ }
+
+ @Override
+ public int allowedChildCount() {
+ return CHILD_COUNT_NO_LIMIT;
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {}
+
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {}
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitMetaMerge(this, context);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaScanNode.java
new file mode 100644
index 0000000000..04c2bdea11
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaScanNode.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
+
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public abstract class SchemaScanNode extends SourceNode {
+ protected int limit;
+ protected int offset;
+ protected PartialPath path;
+ private boolean hasLimit;
+ protected boolean isPrefixPath;
+
+ private RegionReplicaSet schemaRegionReplicaSet;
+
+ protected SchemaScanNode(
+ PlanNodeId id, PartialPath partialPath, int limit, int offset, boolean isPrefixPath) {
+ super(id);
+ this.path = partialPath;
+ setLimit(limit);
+ this.offset = offset;
+ this.isPrefixPath = isPrefixPath;
+ }
+
+ @Override
+ public void open() throws Exception {}
+
+ @Override
+ public int allowedChildCount() {
+ return NO_CHILD_ALLOWED;
+ }
+
+ @Override
+ public void close() throws Exception {}
+
+ public boolean isPrefixPath() {
+ return isPrefixPath;
+ }
+
+ public int getLimit() {
+ return limit;
+ }
+
+ public void setLimit(int limit) {
+ this.limit = limit;
+ if (limit == 0) {
+ hasLimit = false;
+ } else {
+ hasLimit = true;
+ }
+ }
+
+ @Override
+ public RegionReplicaSet getRegionReplicaSet() {
+ return schemaRegionReplicaSet;
+ }
+
+ @Override
+ public void setRegionReplicaSet(RegionReplicaSet schemaRegionReplicaSet) {
+ this.schemaRegionReplicaSet = schemaRegionReplicaSet;
+ }
+
+ public int getOffset() {
+ return offset;
+ }
+
+ public void setOffset(int offset) {
+ this.offset = offset;
+ }
+
+ public PartialPath getPath() {
+ return path;
+ }
+
+ public void setPath(PartialPath path) {
+ this.path = path;
+ }
+
+ public boolean isHasLimit() {
+ return hasLimit;
+ }
+
+ public void setHasLimit(boolean hasLimit) {
+ this.hasLimit = hasLimit;
+ }
+
+ public abstract List<String> getOutputColumnNames();
+
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {}
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitMetaScan(this, context);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java
new file mode 100644
index 0000000000..c7feee1a82
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_ATTRIBUTES;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TAGS;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_ALIAS;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_COMPRESSION;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_DATATYPE;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_ENCODING;
+
+public class TimeSeriesSchemaScanNode extends SchemaScanNode {
+
+ private final String key;
+ private final String value;
+ private final boolean isContains;
+
+ // if is true, the result will be sorted according to the inserting frequency of the timeseries
+ private final boolean orderByHeat;
+
+ public TimeSeriesSchemaScanNode(
+ PlanNodeId id,
+ PartialPath partialPath,
+ String key,
+ String value,
+ int limit,
+ int offset,
+ boolean orderByHeat,
+ boolean isContains,
+ boolean isPrefixPath) {
+ super(id, partialPath, limit, offset, isPrefixPath);
+ this.key = key;
+ this.value = value;
+ this.orderByHeat = orderByHeat;
+ this.isContains = isContains;
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+ PlanNodeType.TIME_SERIES_SCHEMA_SCAN.serialize(byteBuffer);
+ ReadWriteIOUtils.write(getPlanNodeId().getId(), byteBuffer);
+ ReadWriteIOUtils.write(path.getFullPath(), byteBuffer);
+ ReadWriteIOUtils.write(key, byteBuffer);
+ ReadWriteIOUtils.write(value, byteBuffer);
+ ReadWriteIOUtils.write(limit, byteBuffer);
+ ReadWriteIOUtils.write(offset, byteBuffer);
+ ReadWriteIOUtils.write(orderByHeat, byteBuffer);
+ ReadWriteIOUtils.write(isContains, byteBuffer);
+ ReadWriteIOUtils.write(isPrefixPath, byteBuffer);
+ }
+
+ public static TimeSeriesSchemaScanNode deserialize(ByteBuffer byteBuffer)
+ throws IllegalPathException {
+ String id = ReadWriteIOUtils.readString(byteBuffer);
+ PlanNodeId planNodeId = new PlanNodeId(id);
+ String fullPath = ReadWriteIOUtils.readString(byteBuffer);
+ PartialPath path = new PartialPath(fullPath);
+ String key = ReadWriteIOUtils.readString(byteBuffer);
+ String value = ReadWriteIOUtils.readString(byteBuffer);
+ int limit = ReadWriteIOUtils.readInt(byteBuffer);
+ int offset = ReadWriteIOUtils.readInt(byteBuffer);
+ boolean oderByHeat = ReadWriteIOUtils.readBool(byteBuffer);
+ boolean isContains = ReadWriteIOUtils.readBool(byteBuffer);
+ boolean isPrefixPath = ReadWriteIOUtils.readBool(byteBuffer);
+ return new TimeSeriesSchemaScanNode(
+ planNodeId, path, key, value, limit, offset, oderByHeat, isContains, isPrefixPath);
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public boolean isContains() {
+ return isContains;
+ }
+
+ public boolean isOrderByHeat() {
+ return orderByHeat;
+ }
+
+ @Override
+ public List<PlanNode> getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void addChild(PlanNode child) {}
+
+ @Override
+ public PlanNode clone() {
+ return new TimeSeriesSchemaScanNode(
+ getPlanNodeId(), path, key, value, limit, offset, orderByHeat, isContains, isPrefixPath);
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return Arrays.asList(
+ COLUMN_TIMESERIES,
+ COLUMN_TIMESERIES_ALIAS,
+ COLUMN_STORAGE_GROUP,
+ COLUMN_TIMESERIES_DATATYPE,
+ COLUMN_TIMESERIES_ENCODING,
+ COLUMN_TIMESERIES_COMPRESSION,
+ COLUMN_TAGS,
+ COLUMN_ATTRIBUTES);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index 41965c2273..ec0cb655d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -119,6 +119,7 @@ public class ExchangeNode extends PlanNode {
this.child = child;
}
+ @Override
public String toString() {
return String.format(
"ExchangeNode-%s: [SourceAddress:%s]", getPlanNodeId(), getSourceAddress());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 3c8f49637c..0fe26258e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -67,6 +67,7 @@ public class FragmentSinkNode extends SinkNode {
return sinkNode;
}
+ @Override
public void addChild(PlanNode child) {
this.child = child;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index 619625f54e..a3002ebeb9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -146,12 +146,12 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
public void open() throws Exception {}
@Override
- public RegionReplicaSet getDataRegionReplicaSet() {
+ public RegionReplicaSet getRegionReplicaSet() {
return this.regionReplicaSet;
}
@Override
- public void setDataRegionReplicaSet(RegionReplicaSet regionReplicaSet) {
+ public void setRegionReplicaSet(RegionReplicaSet regionReplicaSet) {
this.regionReplicaSet = regionReplicaSet;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index b6ff876dc3..3511a4c9c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -116,11 +116,12 @@ public class SeriesScanNode extends SourceNode implements IOutputPlanNode {
public void open() throws Exception {}
@Override
- public RegionReplicaSet getDataRegionReplicaSet() {
+ public RegionReplicaSet getRegionReplicaSet() {
return regionReplicaSet;
}
- public void setDataRegionReplicaSet(RegionReplicaSet dataRegion) {
+ @Override
+ public void setRegionReplicaSet(RegionReplicaSet dataRegion) {
this.regionReplicaSet = dataRegion;
}
@@ -270,10 +271,11 @@ public class SeriesScanNode extends SourceNode implements IOutputPlanNode {
return valueFilter;
}
+ @Override
public String toString() {
return String.format(
"SeriesScanNode-%s:[SeriesPath: %s, DataRegion: %s]",
- this.getPlanNodeId(), this.getSeriesPath(), this.getDataRegionReplicaSet());
+ this.getPlanNodeId(), this.getSeriesPath(), this.getRegionReplicaSet());
}
@TestOnly
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
index 8c3f2f48d1..5d674371e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
@@ -30,7 +30,7 @@ public abstract class SourceNode extends PlanNode implements AutoCloseable {
public abstract void open() throws Exception;
- public abstract RegionReplicaSet getDataRegionReplicaSet();
+ public abstract RegionReplicaSet getRegionReplicaSet();
- public abstract void setDataRegionReplicaSet(RegionReplicaSet regionReplicaSet);
+ public abstract void setRegionReplicaSet(RegionReplicaSet regionReplicaSet);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
index 77c796ce0a..afeaa71b6e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.db.mpp.sql.statement.crud.*;
import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowDevicesStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.sys.AuthorStatement;
/**
@@ -207,6 +209,14 @@ public abstract class StatementVisitor<R, C> {
return visitStatement(authorStatement, context);
}
+ public R visitShowTimeSeries(ShowTimeSeriesStatement showTimeSeriesStatement, C context) {
+ return visitStatement(showTimeSeriesStatement, context);
+ }
+
+ public R visitShowDevices(ShowDevicesStatement showDevicesStatement, C context) {
+ return visitStatement(showDevicesStatement, context);
+ }
+
public R visitInsertRow(InsertRowStatement insertRowStatement, C context) {
return visitStatement(insertRowStatement, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertStatement.java
index 1bcea3dec4..a85ebb8a32 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertStatement.java
@@ -83,6 +83,7 @@ public class InsertStatement extends Statement {
isAligned = aligned;
}
+ @Override
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
return visitor.visitInsert(this, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/QueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/QueryStatement.java
index 3aa83a7c0d..0dcc1a397e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/QueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/QueryStatement.java
@@ -284,6 +284,7 @@ public class QueryStatement extends Statement {
}
}
+ @Override
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
return visitor.visitQuery(this, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/AlterTimeSeriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/AlterTimeSeriesStatement.java
index ce56ed0461..5a4386fe16 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/AlterTimeSeriesStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/AlterTimeSeriesStatement.java
@@ -104,6 +104,7 @@ public class AlterTimeSeriesStatement extends Statement {
this.attributesMap = attributesMap;
}
+ @Override
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
return visitor.visitAlterTimeseries(this, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CreateAlignedTimeSeriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CreateAlignedTimeSeriesStatement.java
index 38773d1e90..2ec0f1f2b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CreateAlignedTimeSeriesStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CreateAlignedTimeSeriesStatement.java
@@ -166,6 +166,7 @@ public class CreateAlignedTimeSeriesStatement extends Statement {
this.tagOffsets.add(tagsOffset);
}
+ @Override
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
return visitor.visitCreateAlignedTimeseries(this, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CreateTimeSeriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CreateTimeSeriesStatement.java
index d66d5164c4..c875262e17 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CreateTimeSeriesStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/CreateTimeSeriesStatement.java
@@ -118,6 +118,7 @@ public class CreateTimeSeriesStatement extends Statement {
this.tags = tags;
}
+ @Override
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
return visitor.visitCreateTimeseries(this, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/ShowDevicesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/ShowDevicesStatement.java
index eafda50186..74a861e36e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/ShowDevicesStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/ShowDevicesStatement.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.sql.statement.metadata;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
/**
* SHOW DEVICES statement.
@@ -49,4 +50,9 @@ public class ShowDevicesStatement extends ShowStatement {
public boolean hasSgCol() {
return hasSgCol;
}
+
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitShowDevices(this, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/ShowStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/ShowStatement.java
index d60a9648f7..6baad7ad00 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/ShowStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/ShowStatement.java
@@ -27,6 +27,8 @@ public class ShowStatement extends Statement {
int limit = 0;
int offset = 0;
+ protected boolean isPrefixPath;
+
public ShowStatement() {
super();
statementType = StatementType.SHOW;
@@ -47,4 +49,12 @@ public class ShowStatement extends Statement {
public void setOffset(int offset) {
this.offset = offset;
}
+
+ public boolean isPrefixPath() {
+ return isPrefixPath;
+ }
+
+ public void setPrefixPath(boolean prefixPath) {
+ this.isPrefixPath = prefixPath;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/ShowTimeSeriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/ShowTimeSeriesStatement.java
index 5e5c4fc964..7f519b7e7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/ShowTimeSeriesStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/ShowTimeSeriesStatement.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.sql.statement.metadata;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
/**
* SHOW TIMESERIES statement.
@@ -78,4 +79,9 @@ public class ShowTimeSeriesStatement extends ShowStatement {
public boolean isOrderByHeat() {
return orderByHeat;
}
+
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitShowTimeSeries(this, context);
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SchemaScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SchemaScanOperatorTest.java
new file mode 100644
index 0000000000..ab960f0d22
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SchemaScanOperatorTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.operator;
+
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.LocalConfigManager;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.SchemaDriverContext;
+import org.apache.iotdb.db.mpp.operator.schema.DevicesSchemaScanOperator;
+import org.apache.iotdb.db.mpp.operator.schema.SchemaScanOperator;
+import org.apache.iotdb.db.mpp.operator.schema.TimeSeriesSchemaScanOperator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.apache.commons.lang.StringUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_ATTRIBUTES;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_DEVICES;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_IS_ALIGNED;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TAGS;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_ALIAS;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_COMPRESSION;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_DATATYPE;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_ENCODING;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class SchemaScanOperatorTest {
+ private static final String META_SCAN_OPERATOR_TEST_SG = "root.MetaScanOperatorTest";
+ private final List<String> deviceIds = new ArrayList<>();
+ private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+ private final List<TsFileResource> seqResources = new ArrayList<>();
+ private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+ @Before
+ public void setUp() throws MetadataException, IOException, WriteProcessException {
+ SeriesReaderTestUtil.setUp(
+ measurementSchemas, deviceIds, seqResources, unSeqResources, META_SCAN_OPERATOR_TEST_SG);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+ }
+
+ @Test
+ public void testDeviceMetaScanOperator() {
+ try {
+ QueryId queryId = new QueryId("stub_query");
+ AtomicReference<FragmentInstanceState> state =
+ new AtomicReference<>(FragmentInstanceState.RUNNING);
+ FragmentInstanceContext fragmentInstanceContext =
+ new FragmentInstanceContext(
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+ OperatorContext operatorContext =
+ fragmentInstanceContext.addOperatorContext(
+ 1, new PlanNodeId("1"), SchemaScanOperator.class.getSimpleName());
+ PartialPath partialPath = new PartialPath(META_SCAN_OPERATOR_TEST_SG + ".device0");
+ ISchemaRegion schemaRegion =
+ LocalConfigManager.getInstance().getBelongedSchemaRegion(partialPath);
+ operatorContext
+ .getInstanceContext()
+ .setDriverContext(new SchemaDriverContext(fragmentInstanceContext, schemaRegion));
+ List<String> columns = Arrays.asList(COLUMN_DEVICES, COLUMN_STORAGE_GROUP, COLUMN_IS_ALIGNED);
+ DevicesSchemaScanOperator deviceMetaScanOperator =
+ new DevicesSchemaScanOperator(
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ 10,
+ 0,
+ partialPath,
+ false,
+ true,
+ columns);
+ while (deviceMetaScanOperator.hasNext()) {
+ TsBlock tsBlock = deviceMetaScanOperator.next();
+ assertEquals(3, tsBlock.getValueColumnCount());
+ assertTrue(tsBlock.getColumn(0) instanceof BinaryColumn);
+ assertEquals(1, tsBlock.getPositionCount());
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ Assert.assertEquals(0, tsBlock.getTimeByIndex(i));
+ for (int j = 0; j < columns.size(); j++) {
+ switch (j) {
+ case 0:
+ assertEquals(
+ tsBlock.getColumn(j).getBinary(i).toString(),
+ META_SCAN_OPERATOR_TEST_SG + ".device0");
+ break;
+ case 1:
+ assertEquals(
+ tsBlock.getColumn(j).getBinary(i).toString(), META_SCAN_OPERATOR_TEST_SG);
+ break;
+ case 2:
+ assertFalse(tsBlock.getColumn(j).getBoolean(i));
+ break;
+ default:
+ break;
+ }
+ }
+ }
+ }
+ } catch (MetadataException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void testTimeSeriesMetaScanOperator() {
+ try {
+ QueryId queryId = new QueryId("stub_query");
+ AtomicReference<FragmentInstanceState> state =
+ new AtomicReference<>(FragmentInstanceState.RUNNING);
+ FragmentInstanceContext fragmentInstanceContext =
+ new FragmentInstanceContext(
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+ OperatorContext operatorContext =
+ fragmentInstanceContext.addOperatorContext(
+ 1, new PlanNodeId("1"), SchemaScanOperator.class.getSimpleName());
+ PartialPath partialPath = new PartialPath(META_SCAN_OPERATOR_TEST_SG + ".device0.*");
+ ISchemaRegion schemaRegion =
+ LocalConfigManager.getInstance().getBelongedSchemaRegion(partialPath);
+ operatorContext
+ .getInstanceContext()
+ .setDriverContext(new SchemaDriverContext(fragmentInstanceContext, schemaRegion));
+ List<String> columns =
+ Arrays.asList(
+ COLUMN_TIMESERIES,
+ COLUMN_TIMESERIES_ALIAS,
+ COLUMN_STORAGE_GROUP,
+ COLUMN_TIMESERIES_DATATYPE,
+ COLUMN_TIMESERIES_ENCODING,
+ COLUMN_TIMESERIES_COMPRESSION,
+ COLUMN_TAGS,
+ COLUMN_ATTRIBUTES);
+ TimeSeriesSchemaScanOperator timeSeriesMetaScanOperator =
+ new TimeSeriesSchemaScanOperator(
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ 10,
+ 0,
+ partialPath,
+ null,
+ null,
+ false,
+ false,
+ false,
+ columns);
+ while (timeSeriesMetaScanOperator.hasNext()) {
+ TsBlock tsBlock = timeSeriesMetaScanOperator.next();
+ assertEquals(8, tsBlock.getValueColumnCount());
+ assertTrue(tsBlock.getColumn(0) instanceof BinaryColumn);
+ assertEquals(10, tsBlock.getPositionCount());
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ Assert.assertEquals(0, tsBlock.getTimeByIndex(i));
+ for (int j = 0; j < columns.size(); j++) {
+ Binary binary =
+ tsBlock.getColumn(j).isNull(i) ? null : tsBlock.getColumn(j).getBinary(i);
+ String value = binary == null ? "null" : binary.toString();
+ switch (j) {
+ case 0:
+ Assert.assertTrue(value.startsWith(META_SCAN_OPERATOR_TEST_SG + ".device0"));
+ break;
+ case 1:
+ assertEquals("null", value);
+ break;
+ case 2:
+ assertEquals(META_SCAN_OPERATOR_TEST_SG, value);
+ break;
+ case 3:
+ assertEquals(TSDataType.INT32.toString(), value);
+ break;
+ case 4:
+ assertEquals(TSEncoding.PLAIN.toString(), value);
+ break;
+ case 5:
+ assertEquals(CompressionType.UNCOMPRESSED.toString(), value);
+ break;
+ case 6:
+ case 7:
+ assertTrue(StringUtils.isBlank(value));
+ default:
+ break;
+ }
+ }
+ }
+ }
+ } catch (MetadataException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 03db5bb4ff..51a2313f08 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -22,12 +22,15 @@ package org.apache.iotdb.db.mpp.sql.plan;
import org.apache.iotdb.commons.cluster.DataNodeLocation;
import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
import org.apache.iotdb.commons.partition.TimePartitionSlot;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
@@ -37,6 +40,9 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
@@ -91,6 +97,53 @@ public class DistributionPlannerTest {
assertEquals(newRoot.getChildren().get(0).getChildren().size(), 3);
}
+ @Test
+ public void testRewriteMetaSourceNode() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_query");
+ SchemaMergeNode metaMergeNode = new SchemaMergeNode(queryId.genPlanNodeId(), false);
+ metaMergeNode.addChild(
+ new TimeSeriesSchemaScanNode(
+ queryId.genPlanNodeId(),
+ new PartialPath("root.sg.d1.s1"),
+ null,
+ null,
+ 10,
+ 0,
+ false,
+ false,
+ false));
+ metaMergeNode.addChild(
+ new TimeSeriesSchemaScanNode(
+ queryId.genPlanNodeId(),
+ new PartialPath("root.sg.d1.s2"),
+ null,
+ null,
+ 10,
+ 0,
+ false,
+ false,
+ false));
+ metaMergeNode.addChild(
+ new TimeSeriesSchemaScanNode(
+ queryId.genPlanNodeId(),
+ new PartialPath("root.sg.d22.s1"),
+ null,
+ null,
+ 10,
+ 0,
+ false,
+ false,
+ false));
+ LimitNode root2 = new LimitNode(queryId.genPlanNodeId(), metaMergeNode, 10);
+ Analysis analysis = constructAnalysis();
+ DistributionPlanner planner2 =
+ new DistributionPlanner(
+ analysis, new LogicalQueryPlan(new MPPQueryContext(queryId), root2));
+ PlanNode newRoot2 = planner2.rewriteSource();
+ System.out.println(PlanNodeUtil.nodeToString(newRoot2));
+ assertEquals(newRoot2.getChildren().get(0).getChildren().size(), 2);
+ }
+
@Test
public void TestAddExchangeNode() throws IllegalPathException {
QueryId queryId = new QueryId("test_query");
@@ -265,6 +318,34 @@ public class DistributionPlannerTest {
dataPartition.setDataPartitionMap(dataPartitionMap);
analysis.setDataPartitionInfo(dataPartition);
+
+ // construct schema partition
+ SchemaPartition schemaPartition = new SchemaPartition();
+ Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> schemaPartitionMap = new HashMap<>();
+
+ RegionReplicaSet schemaRegion1 =
+ new RegionReplicaSet(
+ new SchemaRegionId(11),
+ Arrays.asList(
+ new DataNodeLocation(11, new Endpoint("192.0.1.1", 9000)),
+ new DataNodeLocation(12, new Endpoint("192.0.1.2", 9000))));
+ Map<SeriesPartitionSlot, RegionReplicaSet> schemaRegionMap = new HashMap<>();
+
+ RegionReplicaSet schemaRegion2 =
+ new RegionReplicaSet(
+ new SchemaRegionId(21),
+ Arrays.asList(
+ new DataNodeLocation(21, new Endpoint("192.0.1.1", 9000)),
+ new DataNodeLocation(22, new Endpoint("192.0.1.2", 9000))));
+
+ schemaRegionMap.put(new SeriesPartitionSlot(device1.length()), schemaRegion1);
+ schemaRegionMap.put(new SeriesPartitionSlot(device2.length()), schemaRegion2);
+ schemaRegionMap.put(new SeriesPartitionSlot(device3.length()), schemaRegion2);
+ schemaPartitionMap.put("root.sg", schemaRegionMap);
+ schemaPartition.setSchemaPartitionMap(schemaPartitionMap);
+
+ analysis.setDataPartitionInfo(dataPartition);
+ analysis.setSchemaPartitionInfo(schemaPartition);
return analysis;
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
index ce29fee307..ad1f71af7f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
@@ -119,17 +119,17 @@ public class FragmentInstanceSerdeTest {
timeJoinNode.setWithoutPolicy(FilterNullPolicy.CONTAINS_NULL);
SeriesScanNode seriesScanNode1 =
new SeriesScanNode(new PlanNodeId("SeriesScanNode1"), new MeasurementPath("root.sg.d1.s2"));
- seriesScanNode1.setDataRegionReplicaSet(
+ seriesScanNode1.setRegionReplicaSet(
new RegionReplicaSet(new DataRegionId(1), new ArrayList<>()));
seriesScanNode1.setScanOrder(OrderBy.TIMESTAMP_DESC);
SeriesScanNode seriesScanNode2 =
new SeriesScanNode(new PlanNodeId("SeriesScanNode2"), new MeasurementPath("root.sg.d2.s1"));
- seriesScanNode2.setDataRegionReplicaSet(
+ seriesScanNode2.setRegionReplicaSet(
new RegionReplicaSet(new DataRegionId(2), new ArrayList<>()));
seriesScanNode2.setScanOrder(OrderBy.TIMESTAMP_DESC);
SeriesScanNode seriesScanNode3 =
new SeriesScanNode(new PlanNodeId("SeriesScanNode3"), new MeasurementPath("root.sg.d2.s2"));
- seriesScanNode3.setDataRegionReplicaSet(
+ seriesScanNode3.setRegionReplicaSet(
new RegionReplicaSet(new DataRegionId(3), new ArrayList<>()));
seriesScanNode3.setScanOrder(OrderBy.TIMESTAMP_DESC);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
index f6b1a26cc2..b2c24aed5d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
@@ -29,10 +29,15 @@ import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesSchemaScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AuthorNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
@@ -365,6 +370,86 @@ public class LogicalPlannerTest {
}
}
+ @Test
+ public void testShowTimeSeries() {
+ String sql =
+ "SHOW LATEST TIMESERIES root.ln.wf01.wt01.status WHERE tagK = tagV limit 20 offset 10";
+
+ try {
+ LimitNode limitNode = (LimitNode) parseSQLToPlanNode(sql);
+ OffsetNode offsetNode = (OffsetNode) limitNode.getChild();
+ SchemaMergeNode metaMergeNode = (SchemaMergeNode) offsetNode.getChild();
+ metaMergeNode.getChildren().forEach(n -> System.out.println(n.toString()));
+ TimeSeriesSchemaScanNode showTimeSeriesNode =
+ (TimeSeriesSchemaScanNode) metaMergeNode.getChildren().get(0);
+ Assert.assertNotNull(showTimeSeriesNode);
+ Assert.assertEquals(
+ new PartialPath("root.ln.wf01.wt01.status"), showTimeSeriesNode.getPath());
+ Assert.assertEquals("root.ln.wf01.wt01", showTimeSeriesNode.getPath().getDevice());
+ Assert.assertTrue(showTimeSeriesNode.isOrderByHeat());
+ Assert.assertFalse(showTimeSeriesNode.isContains());
+ Assert.assertEquals("tagK", showTimeSeriesNode.getKey());
+ Assert.assertEquals("tagV", showTimeSeriesNode.getValue());
+ Assert.assertEquals(20, showTimeSeriesNode.getLimit());
+ Assert.assertEquals(10, showTimeSeriesNode.getOffset());
+ Assert.assertTrue(showTimeSeriesNode.isHasLimit());
+
+ // test serialize and deserialize
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+ showTimeSeriesNode.serialize(byteBuffer);
+ byteBuffer.flip();
+ TimeSeriesSchemaScanNode showTimeSeriesNode2 =
+ (TimeSeriesSchemaScanNode) PlanNodeType.deserialize(byteBuffer);
+ Assert.assertNotNull(showTimeSeriesNode2);
+ Assert.assertEquals(
+ new PartialPath("root.ln.wf01.wt01.status"), showTimeSeriesNode2.getPath());
+ Assert.assertEquals("root.ln.wf01.wt01", showTimeSeriesNode2.getPath().getDevice());
+ Assert.assertTrue(showTimeSeriesNode2.isOrderByHeat());
+ Assert.assertFalse(showTimeSeriesNode2.isContains());
+ Assert.assertEquals("tagK", showTimeSeriesNode2.getKey());
+ Assert.assertEquals("tagV", showTimeSeriesNode2.getValue());
+ Assert.assertEquals(20, showTimeSeriesNode2.getLimit());
+ Assert.assertEquals(10, showTimeSeriesNode2.getOffset());
+ Assert.assertTrue(showTimeSeriesNode2.isHasLimit());
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void testShowDevices() {
+ String sql = "SHOW DEVICES root.ln.wf01.wt01 WITH STORAGE GROUP limit 20 offset 10";
+ try {
+ LimitNode limitNode = (LimitNode) parseSQLToPlanNode(sql);
+ OffsetNode offsetNode = (OffsetNode) limitNode.getChild();
+ SchemaMergeNode metaMergeNode = (SchemaMergeNode) offsetNode.getChild();
+ DevicesSchemaScanNode showDevicesNode =
+ (DevicesSchemaScanNode) metaMergeNode.getChildren().get(0);
+ Assert.assertNotNull(showDevicesNode);
+ Assert.assertEquals(new PartialPath("root.ln.wf01.wt01"), showDevicesNode.getPath());
+ Assert.assertTrue(showDevicesNode.isHasSgCol());
+ Assert.assertEquals(20, showDevicesNode.getLimit());
+ Assert.assertEquals(10, showDevicesNode.getOffset());
+ Assert.assertTrue(showDevicesNode.isHasLimit());
+
+ // test serialize and deserialize
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+ showDevicesNode.serialize(byteBuffer);
+ byteBuffer.flip();
+ DevicesSchemaScanNode showDevicesNode2 =
+ (DevicesSchemaScanNode) PlanNodeType.deserialize(byteBuffer);
+ Assert.assertNotNull(showDevicesNode2);
+ Assert.assertEquals(new PartialPath("root.ln.wf01.wt01"), showDevicesNode2.getPath());
+ Assert.assertEquals(20, showDevicesNode2.getLimit());
+ Assert.assertEquals(10, showDevicesNode2.getOffset());
+ Assert.assertTrue(showDevicesNode2.isHasLimit());
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
@Test
public void authorTest() {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/OffsetNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/OffsetNodeSerdeTest.java
index 201800a4a3..0924bee3fd 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/OffsetNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/OffsetNodeSerdeTest.java
@@ -155,17 +155,17 @@ public class OffsetNodeSerdeTest {
timeJoinNode.setWithoutPolicy(FilterNullPolicy.CONTAINS_NULL);
SeriesScanNode seriesScanNode1 =
new SeriesScanNode(new PlanNodeId("SeriesScanNode1"), new MeasurementPath("root.sg.d1.s2"));
- seriesScanNode1.setDataRegionReplicaSet(
+ seriesScanNode1.setRegionReplicaSet(
new RegionReplicaSet(new DataRegionId(1), new ArrayList<>()));
seriesScanNode1.setScanOrder(OrderBy.TIMESTAMP_DESC);
SeriesScanNode seriesScanNode2 =
new SeriesScanNode(new PlanNodeId("SeriesScanNode2"), new MeasurementPath("root.sg.d2.s1"));
- seriesScanNode2.setDataRegionReplicaSet(
+ seriesScanNode2.setRegionReplicaSet(
new RegionReplicaSet(new DataRegionId(2), new ArrayList<>()));
seriesScanNode2.setScanOrder(OrderBy.TIMESTAMP_DESC);
SeriesScanNode seriesScanNode3 =
new SeriesScanNode(new PlanNodeId("SeriesScanNode3"), new MeasurementPath("root.sg.d2.s2"));
- seriesScanNode3.setDataRegionReplicaSet(
+ seriesScanNode3.setRegionReplicaSet(
new RegionReplicaSet(new DataRegionId(3), new ArrayList<>()));
seriesScanNode3.setScanOrder(OrderBy.TIMESTAMP_DESC);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesAggregateScanNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesAggregateScanNodeSerdeTest.java
index 39ccbe1193..b66c3e4e9f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesAggregateScanNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesAggregateScanNodeSerdeTest.java
@@ -58,7 +58,7 @@ public class SeriesAggregateScanNodeSerdeTest {
OrderBy.TIMESTAMP_ASC,
new In<String>(st, VALUE_FILTER, true),
null);
- seriesAggregateScanNode.setDataRegionReplicaSet(
+ seriesAggregateScanNode.setRegionReplicaSet(
new RegionReplicaSet(new DataRegionId(1), new ArrayList<>()));
ByteBuffer byteBuffer = ByteBuffer.allocate(2048);