You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/05/28 07:06:10 UTC
[iotdb] branch master updated: [IOTDB-3258] implement count nodes (#6002)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 94bd0fe924 [IOTDB-3258] implement count nodes (#6002)
94bd0fe924 is described below
commit 94bd0fe924af8015e91383b09ff11fa2df5ce1e4
Author: ZhangHongYin <46...@users.noreply.github.com>
AuthorDate: Sat May 28 15:06:06 2022 +0800
[IOTDB-3258] implement count nodes (#6002)
---
.../consensus/request/ConfigRequest.java | 6 +-
.../consensus/request/ConfigRequestType.java | 3 +-
.../request/read/GetChildPathsPartitionReq.java | 68 -----------------
...itionReq.java => GetNodePathsPartitionReq.java} | 31 ++++++--
.../iotdb/confignode/manager/ConfigManager.java | 28 ++-----
.../apache/iotdb/confignode/manager/Manager.java | 9 +--
.../iotdb/confignode/manager/PartitionManager.java | 23 +-----
.../confignode/persistence/ClusterSchemaInfo.java | 15 ++++
.../executor/ConfigRequestExecutor.java | 54 +++++++-------
.../thrift/ConfigNodeRPCServiceProcessor.java | 10 +--
.../thrift/ConfigNodeRPCServiceProcessorTest.java | 59 +++++++++++++--
.../mtree/traverser/collector/MNodeCollector.java | 9 ++-
.../schema/NodeManageMemoryMergeOperator.java | 19 ++---
...Operator.java => NodePathsConvertOperator.java} | 50 ++++++-------
...anOperator.java => NodePathsCountOperator.java} | 65 +++++++---------
...rator.java => NodePathsSchemaScanOperator.java} | 34 ++++++---
.../apache/iotdb/db/mpp/plan/analyze/Analyzer.java | 31 ++++++--
.../mpp/plan/analyze/ClusterPartitionFetcher.java | 18 +++--
.../mpp/plan/analyze/FakePartitionFetcherImpl.java | 5 +-
.../db/mpp/plan/analyze/IPartitionFetcher.java | 10 ++-
.../plan/analyze/StandalonePartitionFetcher.java | 5 +-
.../memory/StatementMemorySourceVisitor.java | 14 ++++
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 61 ++++++++-------
.../db/mpp/plan/planner/LogicalPlanBuilder.java | 31 +++++---
.../iotdb/db/mpp/plan/planner/LogicalPlanner.java | 22 ++++--
.../planner/distribution/ExchangeNodeAdder.java | 5 ++
.../mpp/plan/planner/plan/node/PlanNodeType.java | 18 +++--
.../db/mpp/plan/planner/plan/node/PlanVisitor.java | 15 ++--
.../metedata/read/ChildNodesSchemaScanNode.java | 87 ----------------------
.../read/NodeManagementMemoryMergeNode.java | 20 +----
...oryMergeNode.java => NodePathsConvertNode.java} | 45 ++---------
...emoryMergeNode.java => NodePathsCountNode.java} | 45 ++---------
...aScanNode.java => NodePathsSchemaScanNode.java} | 24 ++++--
.../db/mpp/plan/statement/StatementVisitor.java | 5 ++
.../statement/metadata/CountNodesStatement.java | 10 +++
.../iotdb/db/mpp/plan/plan/LogicalPlannerTest.java | 45 ++++++++---
.../NodeManagementMemoryMergeNodeSerdeTest.java | 81 ++++++++++----------
.../src/main/thrift/confignode.thrift | 7 +-
38 files changed, 514 insertions(+), 573 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
index fd7d01ab92..92228a7d26 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.confignode.consensus.request;
import org.apache.iotdb.confignode.consensus.request.auth.AuthorReq;
import org.apache.iotdb.confignode.consensus.request.read.CountStorageGroupReq;
-import org.apache.iotdb.confignode.consensus.request.read.GetChildPathsPartitionReq;
import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeInfoReq;
import org.apache.iotdb.confignode.consensus.request.read.GetDataPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.read.GetNodePathsPartitionReq;
import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateDataPartitionReq;
import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaPartitionReq;
import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionReq;
@@ -191,8 +191,8 @@ public abstract class ConfigRequest implements IConsensusRequest {
case CreateFunction:
req = new CreateFunctionReq();
break;
- case GetChildPathsPartition:
- req = new GetChildPathsPartitionReq();
+ case GetNodePathsPartition:
+ req = new GetNodePathsPartitionReq();
break;
default:
throw new IOException("unknown PhysicalPlan type: " + typeNum);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
index 11fbc14ff4..f0cb20ea8c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
@@ -60,6 +60,5 @@ public enum ConfigRequestType {
ListRoleUsers,
ApplyConfigNode,
CreateFunction,
- GetChildPathsPartition,
- GetChildNodesPartition;
+ GetNodePathsPartition;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetChildPathsPartitionReq.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetChildPathsPartitionReq.java
deleted file mode 100644
index 292801bfd7..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetChildPathsPartitionReq.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.consensus.request.read;
-
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
-import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
-import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Objects;
-
-public class GetChildPathsPartitionReq extends ConfigRequest {
- private PartialPath partialPath;
-
- public GetChildPathsPartitionReq() {
- super(ConfigRequestType.GetChildPathsPartition);
- }
-
- public PartialPath getPartialPath() {
- return partialPath;
- }
-
- public void setPartialPath(PartialPath partialPath) {
- this.partialPath = partialPath;
- }
-
- @Override
- protected void serializeImpl(ByteBuffer buffer) {
- partialPath.serialize(buffer);
- }
-
- @Override
- protected void deserializeImpl(ByteBuffer buffer) throws IOException {
- partialPath = (PartialPath) PathDeserializeUtil.deserialize(buffer);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- GetChildPathsPartitionReq that = (GetChildPathsPartitionReq) o;
- return partialPath.equals(that.partialPath);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(partialPath);
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetChildNodesPartitionReq.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetNodePathsPartitionReq.java
similarity index 72%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetChildNodesPartitionReq.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetNodePathsPartitionReq.java
index 519b8a27e9..cb7abd836e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetChildNodesPartitionReq.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetNodePathsPartitionReq.java
@@ -28,11 +28,12 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
-public class GetChildNodesPartitionReq extends ConfigRequest {
+public class GetNodePathsPartitionReq extends ConfigRequest {
private PartialPath partialPath;
+ private int level = -1;
- public GetChildNodesPartitionReq() {
- super(ConfigRequestType.GetChildNodesPartition);
+ public GetNodePathsPartitionReq() {
+ super(ConfigRequestType.GetNodePathsPartition);
}
public PartialPath getPartialPath() {
@@ -43,26 +44,40 @@ public class GetChildNodesPartitionReq extends ConfigRequest {
this.partialPath = partialPath;
}
+ public int getLevel() {
+ return level;
+ }
+
+ public void setLevel(int level) {
+ this.level = level;
+ }
+
@Override
protected void serializeImpl(ByteBuffer buffer) {
partialPath.serialize(buffer);
+ buffer.putInt(level);
}
@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
partialPath = (PartialPath) PathDeserializeUtil.deserialize(buffer);
+ level = buffer.getInt();
}
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- GetChildNodesPartitionReq that = (GetChildNodesPartitionReq) o;
- return partialPath.equals(that.partialPath);
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ GetNodePathsPartitionReq that = (GetNodePathsPartitionReq) o;
+ return level == that.level && Objects.equals(partialPath, that.partialPath);
}
@Override
public int hashCode() {
- return Objects.hash(partialPath);
+ return Objects.hash(partialPath, level);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 83775e0fd8..23154f06a0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -29,10 +29,9 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
import org.apache.iotdb.confignode.consensus.request.auth.AuthorReq;
import org.apache.iotdb.confignode.consensus.request.read.CountStorageGroupReq;
-import org.apache.iotdb.confignode.consensus.request.read.GetChildNodesPartitionReq;
-import org.apache.iotdb.confignode.consensus.request.read.GetChildPathsPartitionReq;
import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeInfoReq;
import org.apache.iotdb.confignode.consensus.request.read.GetDataPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.read.GetNodePathsPartitionReq;
import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateDataPartitionReq;
import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaPartitionReq;
import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionReq;
@@ -386,26 +385,15 @@ public class ConfigManager implements Manager {
}
@Override
- public DataSet getChildPathsPartition(PartialPath partialPath) {
+ public DataSet getNodePathsPartition(PartialPath partialPath, Integer level) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- GetChildPathsPartitionReq getChildPathsPartitionReq = new GetChildPathsPartitionReq();
- getChildPathsPartitionReq.setPartialPath(partialPath);
- return partitionManager.getChildPathsPartition(getChildPathsPartitionReq);
- } else {
- SchemaNodeManagementResp dataSet = new SchemaNodeManagementResp();
- dataSet.setStatus(status);
- return dataSet;
- }
- }
-
- @Override
- public DataSet getChildNodesPartition(PartialPath partialPath) {
- TSStatus status = confirmLeader();
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- GetChildNodesPartitionReq getChildNodesPartitionReq = new GetChildNodesPartitionReq();
- getChildNodesPartitionReq.setPartialPath(partialPath);
- return partitionManager.getChildNodesPartition(getChildNodesPartitionReq);
+ GetNodePathsPartitionReq getNodePathsPartitionReq = new GetNodePathsPartitionReq();
+ getNodePathsPartitionReq.setPartialPath(partialPath);
+ if (null != level) {
+ getNodePathsPartitionReq.setLevel(level);
+ }
+ return partitionManager.getNodePathsPartition(getNodePathsPartitionReq);
} else {
SchemaNodeManagementResp dataSet = new SchemaNodeManagementResp();
dataSet.setStatus(status);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
index 47892d1fbc..b689f52258 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
@@ -167,14 +167,7 @@ public interface Manager {
*
* @return SchemaNodeManagementPartitionDataSet
*/
- DataSet getChildPathsPartition(PartialPath partialPath);
-
- /**
- * create SchemaNodeManagementPartition for child nodes node management
- *
- * @return SchemaNodeManagementPartitionDataSet
- */
- DataSet getChildNodesPartition(PartialPath partialPath);
+ DataSet getNodePathsPartition(PartialPath partialPath, Integer level);
/**
* Get DataPartition
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index a969a5ee62..d672f68cd4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -29,9 +29,8 @@ import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
import org.apache.iotdb.confignode.client.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConf;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import org.apache.iotdb.confignode.consensus.request.read.GetChildNodesPartitionReq;
-import org.apache.iotdb.confignode.consensus.request.read.GetChildPathsPartitionReq;
import org.apache.iotdb.confignode.consensus.request.read.GetDataPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.read.GetNodePathsPartitionReq;
import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateDataPartitionReq;
import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaPartitionReq;
import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionReq;
@@ -359,27 +358,13 @@ public class PartitionManager {
}
/**
- * Get ChildPathsPartition
+ * GetNodePathsPartition
*
- * @param physicalPlan GetChildNodesPartitionReq
+ * @param physicalPlan GetNodesPathsPartitionReq
* @return SchemaNodeManagementPartitionDataSet that contains only existing matched
* SchemaPartition and matched child paths aboveMtree
*/
- public DataSet getChildPathsPartition(GetChildPathsPartitionReq physicalPlan) {
- SchemaNodeManagementResp schemaNodeManagementResp;
- ConsensusReadResponse consensusReadResponse = getConsensusManager().read(physicalPlan);
- schemaNodeManagementResp = (SchemaNodeManagementResp) consensusReadResponse.getDataset();
- return schemaNodeManagementResp;
- }
-
- /**
- * Get ChildNodesPartition
- *
- * @param physicalPlan GetChildNodesPartitionReq
- * @return SchemaNodeManagementPartitionDataSet that contains only existing matched
- * SchemaPartition and matched child nodes aboveMtree
- */
- public DataSet getChildNodesPartition(GetChildNodesPartitionReq physicalPlan) {
+ public DataSet getNodePathsPartition(GetNodePathsPartitionReq physicalPlan) {
SchemaNodeManagementResp schemaNodeManagementResp;
ConsensusReadResponse consensusReadResponse = getConsensusManager().read(physicalPlan);
schemaNodeManagementResp = (SchemaNodeManagementResp) consensusReadResponse.getDataset();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java
index 53efa7453c..569789b0ec 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java
@@ -497,6 +497,21 @@ public class ClusterSchemaInfo implements SnapshotProcessor {
}
}
+ public Pair<List<PartialPath>, Set<PartialPath>> getNodesListInGivenLevel(
+ PartialPath partialPath, int level) {
+ Pair<List<PartialPath>, Set<PartialPath>> matchedPathsInNextLevel =
+ new Pair(new HashSet<>(), new HashSet<>());
+ storageGroupReadWriteLock.readLock().lock();
+ try {
+ matchedPathsInNextLevel = mTree.getNodesListInGivenLevel(partialPath, level, true, null);
+ } catch (MetadataException e) {
+ LOGGER.error("Error get matched paths in given level.", e);
+ } finally {
+ storageGroupReadWriteLock.readLock().unlock();
+ }
+ return matchedPathsInNextLevel;
+ }
+
public Pair<Set<String>, Set<PartialPath>> getChildNodePathInNextLevel(PartialPath partialPath) {
Pair<Set<String>, Set<PartialPath>> matchedPathsInNextLevel =
new Pair(new HashSet<>(), new HashSet<>());
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
index 1a008ffbcf..5db1567af4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
@@ -23,13 +23,11 @@ import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
-import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
import org.apache.iotdb.confignode.consensus.request.auth.AuthorReq;
import org.apache.iotdb.confignode.consensus.request.read.CountStorageGroupReq;
-import org.apache.iotdb.confignode.consensus.request.read.GetChildNodesPartitionReq;
-import org.apache.iotdb.confignode.consensus.request.read.GetChildPathsPartitionReq;
import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeInfoReq;
import org.apache.iotdb.confignode.consensus.request.read.GetDataPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.read.GetNodePathsPartitionReq;
import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionReq;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupReq;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
@@ -70,6 +68,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
public class ConfigRequestExecutor {
@@ -129,9 +128,8 @@ public class ConfigRequestExecutor {
return authorInfo.executeListUserRoles((AuthorReq) req);
case ListRoleUsers:
return authorInfo.executeListRoleUsers((AuthorReq) req);
- case GetChildPathsPartition:
- case GetChildNodesPartition:
- return getSchemaNodeManagementPartiton(req);
+ case GetNodePathsPartition:
+ return getSchemaNodeManagementPartition(req);
default:
throw new UnknownPhysicalPlanTypeException(req.getType());
}
@@ -260,35 +258,41 @@ public class ConfigRequestExecutor {
});
}
- private DataSet getSchemaNodeManagementPartiton(ConfigRequest req)
+ private DataSet getSchemaNodeManagementPartition(ConfigRequest req)
throws UnknownPhysicalPlanTypeException {
- Pair<Set<String>, Set<PartialPath>> matchedChildInNextLevel;
+ int level = -1;
+ PartialPath partialPath;
+ Set<String> alreadyMatchedNode;
+ Set<PartialPath> needMatchedNode;
List<String> matchedStorageGroups = new ArrayList<>();
- if (req.getType() == ConfigRequestType.GetChildPathsPartition) {
- GetChildPathsPartitionReq getChildPathsPartitionReq = (GetChildPathsPartitionReq) req;
- // Pair.left means already find matched child paths from aboveMtree,
- // Pair.right means need more info from DataNode's schemaRegion.
- matchedChildInNextLevel =
- clusterSchemaInfo.getChildNodePathInNextLevel(getChildPathsPartitionReq.getPartialPath());
- } else if (req.getType() == ConfigRequestType.GetChildNodesPartition) {
- GetChildNodesPartitionReq getChildNodesPartitionReq = (GetChildNodesPartitionReq) req;
-
- // Pair.left means already find matched child paths from aboveMtree,
- // Pair.right means need more info from DataNode's schemaRegion.
- matchedChildInNextLevel =
- clusterSchemaInfo.getChildNodeNameInNextLevel(getChildNodesPartitionReq.getPartialPath());
+ GetNodePathsPartitionReq getNodePathsPartitionReq = (GetNodePathsPartitionReq) req;
+ partialPath = getNodePathsPartitionReq.getPartialPath();
+ level = getNodePathsPartitionReq.getLevel();
+ if (-1 == level) {
+ // get child paths
+ Pair<Set<String>, Set<PartialPath>> matchedChildInNextLevel =
+ clusterSchemaInfo.getChildNodePathInNextLevel(partialPath);
+ alreadyMatchedNode = matchedChildInNextLevel.left;
+ needMatchedNode = matchedChildInNextLevel.right;
} else {
- throw new UnknownPhysicalPlanTypeException(req.getType());
+ // count nodes
+ Pair<List<PartialPath>, Set<PartialPath>> matchedChildInNextLevel =
+ clusterSchemaInfo.getNodesListInGivenLevel(partialPath, level);
+ alreadyMatchedNode =
+ matchedChildInNextLevel.left.stream()
+ .map(PartialPath::getFullPath)
+ .collect(Collectors.toSet());
+ needMatchedNode = matchedChildInNextLevel.right;
}
- matchedChildInNextLevel.right.forEach(
- childPath -> matchedStorageGroups.add(childPath.getFullPath()));
+
+ needMatchedNode.forEach(nodePath -> matchedStorageGroups.add(nodePath.getFullPath()));
SchemaNodeManagementResp schemaNodeManagementResp =
(SchemaNodeManagementResp)
partitionInfo.getSchemaNodeManagementPartition(matchedStorageGroups);
if (schemaNodeManagementResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- schemaNodeManagementResp.setMatchedNode(matchedChildInNextLevel.left);
+ schemaNodeManagementResp.setMatchedNode(alreadyMatchedNode);
}
return schemaNodeManagementResp;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index cbb0e54424..1d46688414 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -50,7 +50,6 @@ import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaResp;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.ConsensusManager;
import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
-import org.apache.iotdb.confignode.rpc.thrift.NodeManagementType;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
@@ -272,13 +271,8 @@ public class ConfigNodeRPCServiceProcessor implements ConfigIService.Iface {
PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
PartialPath partialPath = patternTree.splitToPathList().get(0);
SchemaNodeManagementResp schemaNodeManagementResp;
- if (req.getType() == NodeManagementType.CHILD_PATHS) {
- schemaNodeManagementResp =
- (SchemaNodeManagementResp) configManager.getChildPathsPartition(partialPath);
- } else {
- schemaNodeManagementResp =
- (SchemaNodeManagementResp) configManager.getChildNodesPartition(partialPath);
- }
+ schemaNodeManagementResp =
+ (SchemaNodeManagementResp) configManager.getNodePathsPartition(partialPath, req.getLevel());
TSchemaNodeManagementResp resp = new TSchemaNodeManagementResp();
schemaNodeManagementResp.convertToRpcSchemaNodeManagementPartitionResp(resp);
return resp;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
index 87ad5c0374..186863a878 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
@@ -33,6 +33,10 @@ import org.apache.iotdb.commons.exception.ConfigurationException;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
+import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
+import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
+import org.apache.iotdb.confignode.conf.ConfigNodeConf;
import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.conf.ConfigNodeStartupCheck;
@@ -49,6 +53,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionResp;
import org.apache.iotdb.confignode.rpc.thrift.TSetDataReplicationFactorReq;
@@ -90,6 +96,11 @@ public class ConfigNodeRPCServiceProcessorTest {
@BeforeClass
public static void beforeClass() throws StartupException, ConfigurationException, IOException {
+ final ConfigNodeConf configNodeConf = ConfigNodeDescriptor.getInstance().getConf();
+ UDFExecutableManager.setupAndGetInstance(
+ configNodeConf.getTemporaryLibDir(), configNodeConf.getUdfLibDir());
+ UDFClassLoaderManager.setupAndGetInstance(configNodeConf.getUdfLibDir());
+ UDFRegistrationService.setupAndGetInstance(configNodeConf.getSystemUdfDir());
ConfigNodeStartupCheck.getInstance().startUpCheck();
}
@@ -109,6 +120,9 @@ public class ConfigNodeRPCServiceProcessorTest {
@AfterClass
public static void afterClass() throws IOException {
+ UDFExecutableManager.getInstance().stop();
+ UDFClassLoaderManager.getInstance().stop();
+ UDFRegistrationService.getInstance().stop();
FileUtils.deleteFully(new File(ConfigNodeConstant.DATA_DIR));
}
@@ -147,7 +161,7 @@ public class ConfigNodeRPCServiceProcessorTest {
}
@Test
- public void registerAndQueryDataNodeTest() throws TException {
+ public void testRegisterAndQueryDataNode() throws TException {
registerDataNodes();
// test success re-register
@@ -201,7 +215,7 @@ public class ConfigNodeRPCServiceProcessorTest {
}
@Test
- public void setAndQueryStorageGroupTest() throws TException {
+ public void testSetAndQueryStorageGroup() throws TException {
TSStatus status;
final String sg0 = "root.sg0";
final String sg1 = "root.sg1";
@@ -303,7 +317,7 @@ public class ConfigNodeRPCServiceProcessorTest {
}
@Test
- public void getAndCreateSchemaPartitionTest()
+ public void testGetAndCreateSchemaPartition()
throws TException, IOException, IllegalPathException {
final String sg = "root.sg";
final String sg0 = "root.sg0";
@@ -508,7 +522,7 @@ public class ConfigNodeRPCServiceProcessorTest {
}
@Test
- public void getAndCreateDataPartitionTest() throws TException {
+ public void testGetAndCreateDataPartition() throws TException {
final String sg = "root.sg";
final int storageGroupNum = 2;
final int seriesPartitionSlotNum = 4;
@@ -575,7 +589,7 @@ public class ConfigNodeRPCServiceProcessorTest {
}
@Test
- public void permissionTest() throws TException {
+ public void testPermission() throws TException {
TSStatus status;
List<String> userList = new ArrayList<>();
@@ -912,7 +926,7 @@ public class ConfigNodeRPCServiceProcessorTest {
}
@Test
- public void deleteStorageGroupTest() throws TException {
+ public void testDeleteStorageGroup() throws TException {
TSStatus status;
final String sg0 = "root.sg0";
final String sg1 = "root.sg1";
@@ -938,7 +952,7 @@ public class ConfigNodeRPCServiceProcessorTest {
}
@Test
- public void deleteStorageGroupInvalidateCacheFailedTest() throws TException {
+ public void testDeleteStorageGroupInvalidateCacheFailed() throws TException {
TSStatus status;
final String sg0 = "root.sg0";
final String sg1 = "root.sg1";
@@ -1016,4 +1030,35 @@ public class ConfigNodeRPCServiceProcessorTest {
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
}
}
+
+ @Test
+ public void testGetSchemaNodeManagementPartition()
+ throws TException, IllegalPathException, IOException {
+ final String sg = "root.sg";
+ final int storageGroupNum = 2;
+
+ TSStatus status;
+ TSchemaNodeManagementReq nodeManagementReq;
+ TSchemaNodeManagementResp nodeManagementResp;
+
+ // register DataNodes
+ registerDataNodes();
+
+ // set StorageGroups
+ for (int i = 0; i < storageGroupNum; i++) {
+ TSetStorageGroupReq setReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg + i));
+ status = processor.setStorageGroup(setReq);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ }
+
+ ByteBuffer byteBuffer = generatePatternTreeBuffer(new String[] {"root"});
+ nodeManagementReq = new TSchemaNodeManagementReq(byteBuffer);
+ nodeManagementReq.setLevel(-1);
+ nodeManagementResp = processor.getSchemaNodeManagementPartition(nodeManagementReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), nodeManagementResp.getStatus().getCode());
+ Assert.assertEquals(2, nodeManagementResp.getMatchedNodeSize());
+ Assert.assertNotNull(nodeManagementResp.getSchemaRegionMap());
+ Assert.assertEquals(0, nodeManagementResp.getSchemaRegionMapSize());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/traverser/collector/MNodeCollector.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/traverser/collector/MNodeCollector.java
index 3deff71fad..f748631d55 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/traverser/collector/MNodeCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/traverser/collector/MNodeCollector.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.db.metadata.LocalSchemaProcessor.StorageGroupFilter;
import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mtree.store.IMTreeStore;
+import java.util.ArrayDeque;
+import java.util.Deque;
import java.util.HashSet;
import java.util.Set;
@@ -70,8 +72,10 @@ public abstract class MNodeCollector<T> extends CollectorTraverser<T> {
if (level < targetLevel) {
return false;
}
+ Deque<IMNode> stack = new ArrayDeque<>();
while (level > targetLevel) {
- node = node.getParent();
+ node = traverseContext.pop();
+ stack.push(node);
level--;
}
// record processed node so they will not be processed twice
@@ -79,6 +83,9 @@ public abstract class MNodeCollector<T> extends CollectorTraverser<T> {
processedNodes.add(node);
transferToResult(node);
}
+ while (!stack.isEmpty()) {
+ traverseContext.push(stack.pop());
+ }
return true;
} else {
transferToResult(node);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
index f6833b8f35..5532092483 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.execution.operator.schema;
-import org.apache.iotdb.confignode.rpc.thrift.NodeManagementType;
import org.apache.iotdb.db.mpp.common.header.HeaderConstant;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
@@ -39,15 +38,13 @@ public class NodeManageMemoryMergeOperator implements ProcessOperator {
private final OperatorContext operatorContext;
private Set<String> data;
private final Operator child;
- private NodeManagementType type;
private boolean isFinished;
public NodeManageMemoryMergeOperator(
- OperatorContext operatorContext, Set<String> data, Operator child, NodeManagementType type) {
+ OperatorContext operatorContext, Set<String> data, Operator child) {
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
this.data = data;
this.child = requireNonNull(child, "child operator is null");
- this.type = type;
isFinished = false;
}
@@ -65,18 +62,14 @@ public class NodeManageMemoryMergeOperator implements ProcessOperator {
public TsBlock next() {
isFinished = true;
TsBlock block = child.next();
- TsBlockBuilder tsBlockBuilder;
- if (type == NodeManagementType.CHILD_PATHS) {
- tsBlockBuilder = new TsBlockBuilder(HeaderConstant.showChildPathsHeader.getRespDataTypes());
- } else {
- tsBlockBuilder = new TsBlockBuilder(HeaderConstant.showChildNodesHeader.getRespDataTypes());
- }
- Set<String> childPaths = new TreeSet<>(data);
+ TsBlockBuilder tsBlockBuilder =
+ new TsBlockBuilder(HeaderConstant.showChildPathsHeader.getRespDataTypes());
+ Set<String> nodePaths = new TreeSet<>(data);
for (int i = 0; i < block.getPositionCount(); i++) {
- childPaths.add(block.getColumn(0).getBinary(i).toString());
+ nodePaths.add(block.getColumn(0).getBinary(i).toString());
}
- childPaths.forEach(
+ nodePaths.forEach(
path -> {
tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
tsBlockBuilder.getColumnBuilder(0).writeBinary(new Binary(path));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
similarity index 66%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
index f6833b8f35..3ad0619b7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -19,7 +19,8 @@
package org.apache.iotdb.db.mpp.execution.operator.schema;
-import org.apache.iotdb.confignode.rpc.thrift.NodeManagementType;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.common.header.HeaderConstant;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
@@ -29,25 +30,22 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.utils.Binary;
import com.google.common.util.concurrent.ListenableFuture;
-
-import java.util.Set;
-import java.util.TreeSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
-public class NodeManageMemoryMergeOperator implements ProcessOperator {
+public class NodePathsConvertOperator implements ProcessOperator {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(NodePathsConvertOperator.class);
+
private final OperatorContext operatorContext;
- private Set<String> data;
private final Operator child;
- private NodeManagementType type;
private boolean isFinished;
- public NodeManageMemoryMergeOperator(
- OperatorContext operatorContext, Set<String> data, Operator child, NodeManagementType type) {
+ public NodePathsConvertOperator(OperatorContext operatorContext, Operator child) {
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
- this.data = data;
this.child = requireNonNull(child, "child operator is null");
- this.type = type;
isFinished = false;
}
@@ -65,23 +63,23 @@ public class NodeManageMemoryMergeOperator implements ProcessOperator {
public TsBlock next() {
isFinished = true;
TsBlock block = child.next();
- TsBlockBuilder tsBlockBuilder;
- if (type == NodeManagementType.CHILD_PATHS) {
- tsBlockBuilder = new TsBlockBuilder(HeaderConstant.showChildPathsHeader.getRespDataTypes());
- } else {
- tsBlockBuilder = new TsBlockBuilder(HeaderConstant.showChildNodesHeader.getRespDataTypes());
- }
- Set<String> childPaths = new TreeSet<>(data);
+ TsBlockBuilder tsBlockBuilder =
+ new TsBlockBuilder(HeaderConstant.showChildNodesHeader.getRespDataTypes());
+
for (int i = 0; i < block.getPositionCount(); i++) {
- childPaths.add(block.getColumn(0).getBinary(i).toString());
+ String path = block.getColumn(0).getBinary(i).toString();
+ PartialPath partialPath;
+ try {
+ partialPath = new PartialPath(path);
+ } catch (IllegalPathException e) {
+ LOGGER.warn("Failed to convert node path to PartialPath {}", path);
+ continue;
+ }
+ tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
+ tsBlockBuilder.getColumnBuilder(0).writeBinary(new Binary(partialPath.getTailNode()));
+ tsBlockBuilder.declarePosition();
}
- childPaths.forEach(
- path -> {
- tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
- tsBlockBuilder.getColumnBuilder(0).writeBinary(new Binary(path));
- tsBlockBuilder.declarePosition();
- });
return tsBlockBuilder.build();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/ChildNodesSchemaScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
similarity index 50%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/ChildNodesSchemaScanOperator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
index f8af069197..6276c4d5eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/ChildNodesSchemaScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
@@ -19,33 +19,27 @@
package org.apache.iotdb.db.mpp.execution.operator.schema;
-import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.common.header.HeaderConstant;
-import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.execution.operator.source.SourceOperator;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.utils.Binary;
-import java.util.Set;
+import com.google.common.util.concurrent.ListenableFuture;
-public class ChildNodesSchemaScanOperator implements SourceOperator {
- private final PlanNodeId sourceId;
+import static java.util.Objects.requireNonNull;
- private final OperatorContext operatorContext;
-
- private final PartialPath partialPath;
+public class NodePathsCountOperator implements ProcessOperator {
+ private final OperatorContext operatorContext;
+ private final Operator child;
private boolean isFinished;
- public ChildNodesSchemaScanOperator(
- PlanNodeId sourceId, OperatorContext operatorContext, PartialPath partialPath) {
- this.sourceId = sourceId;
- this.operatorContext = operatorContext;
- this.partialPath = partialPath;
+ public NodePathsCountOperator(OperatorContext operatorContext, Operator child) {
+ this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
+ this.child = requireNonNull(child, "child operator is null");
+ isFinished = false;
}
@Override
@@ -53,41 +47,36 @@ public class ChildNodesSchemaScanOperator implements SourceOperator {
return operatorContext;
}
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return child.isBlocked();
+ }
+
@Override
public TsBlock next() {
isFinished = true;
+ TsBlock block = child.next();
TsBlockBuilder tsBlockBuilder =
- new TsBlockBuilder(HeaderConstant.showChildNodesHeader.getRespDataTypes());
- Set<String> childNames;
- try {
- childNames =
- ((SchemaDriverContext) operatorContext.getInstanceContext().getDriverContext())
- .getSchemaRegion()
- .getChildNodeNameInNextLevel(partialPath);
- } catch (MetadataException e) {
- throw new RuntimeException(e.getMessage(), e);
- }
- childNames.forEach(
- (path) -> {
- tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
- tsBlockBuilder.getColumnBuilder(0).writeBinary(new Binary(path));
- tsBlockBuilder.declarePosition();
- });
+ new TsBlockBuilder(HeaderConstant.countNodesHeader.getRespDataTypes());
+
+ tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
+ tsBlockBuilder.getColumnBuilder(0).writeInt(block.getPositionCount());
+ tsBlockBuilder.declarePosition();
return tsBlockBuilder.build();
}
@Override
public boolean hasNext() {
- return !isFinished;
+ return child.hasNext();
}
@Override
- public boolean isFinished() {
- return isFinished;
+ public void close() throws Exception {
+ child.close();
}
@Override
- public PlanNodeId getSourceId() {
- return sourceId;
+ public boolean isFinished() {
+ return isFinished || child.isFinished();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/ChildPathsSchemaScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java
similarity index 75%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/ChildPathsSchemaScanOperator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java
index ba187535ec..a35dff908b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/ChildPathsSchemaScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java
@@ -31,21 +31,25 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.utils.Binary;
import java.util.Set;
+import java.util.stream.Collectors;
-public class ChildPathsSchemaScanOperator implements SourceOperator {
+public class NodePathsSchemaScanOperator implements SourceOperator {
private final PlanNodeId sourceId;
private final OperatorContext operatorContext;
private final PartialPath partialPath;
+ private final int level;
+
private boolean isFinished;
- public ChildPathsSchemaScanOperator(
- PlanNodeId sourceId, OperatorContext operatorContext, PartialPath partialPath) {
+ public NodePathsSchemaScanOperator(
+ PlanNodeId sourceId, OperatorContext operatorContext, PartialPath partialPath, int level) {
this.sourceId = sourceId;
this.operatorContext = operatorContext;
this.partialPath = partialPath;
+ this.level = level;
}
@Override
@@ -58,16 +62,28 @@ public class ChildPathsSchemaScanOperator implements SourceOperator {
isFinished = true;
TsBlockBuilder tsBlockBuilder =
new TsBlockBuilder(HeaderConstant.showChildPathsHeader.getRespDataTypes());
- Set<String> childPaths;
+ Set<String> nodePaths;
+
try {
- childPaths =
- ((SchemaDriverContext) operatorContext.getInstanceContext().getDriverContext())
- .getSchemaRegion()
- .getChildNodePathInNextLevel(partialPath);
+ if (-1 == level) {
+ // show child paths
+ nodePaths =
+ ((SchemaDriverContext) operatorContext.getInstanceContext().getDriverContext())
+ .getSchemaRegion()
+ .getChildNodePathInNextLevel(partialPath);
+ } else {
+ nodePaths =
+ ((SchemaDriverContext) operatorContext.getInstanceContext().getDriverContext())
+ .getSchemaRegion().getNodesListInGivenLevel(partialPath, level, true, null).stream()
+ .map(PartialPath::getFullPath)
+ .collect(Collectors.toSet());
+ }
+
} catch (MetadataException e) {
throw new RuntimeException(e.getMessage(), e);
}
- childPaths.forEach(
+
+ nodePaths.forEach(
(path) -> {
tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
tsBlockBuilder.getColumnBuilder(0).writeBinary(new Binary(path));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index 2e2bca5fbf..eb7810e57b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.confignode.rpc.thrift.NodeManagementType;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
@@ -62,6 +61,7 @@ import org.apache.iotdb.db.mpp.plan.statement.literal.Literal;
import org.apache.iotdb.db.mpp.plan.statement.metadata.AlterTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountDevicesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountLevelTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CountNodesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
@@ -1285,6 +1285,29 @@ public class Analyzer {
return analysis;
}
+ @Override
+ public Analysis visitCountNodes(CountNodesStatement countStatement, MPPQueryContext context) {
+ Analysis analysis = new Analysis();
+ analysis.setStatement(countStatement);
+
+ SchemaNodeManagementPartition schemaNodeManagementPartition =
+ partitionFetcher.getSchemaNodeManagementPartitionWithLevel(
+ new PathPatternTree(countStatement.getPartialPath()), countStatement.getLevel());
+
+ if (schemaNodeManagementPartition == null) {
+ return analysis;
+ }
+ if (!schemaNodeManagementPartition.getMatchedNode().isEmpty()
+ && schemaNodeManagementPartition.getSchemaPartition().getSchemaPartitionMap().size()
+ == 0) {
+ analysis.setFinishQueryAfterAnalyze(true);
+ }
+ analysis.setMatchedNodes(schemaNodeManagementPartition.getMatchedNode());
+ analysis.setSchemaPartitionInfo(schemaNodeManagementPartition.getSchemaPartition());
+ analysis.setRespDatasetHeader(HeaderConstant.countNodesHeader);
+ return analysis;
+ }
+
@Override
public Analysis visitShowChildPaths(
ShowChildPathsStatement showChildPathsStatement, MPPQueryContext context) {
@@ -1293,8 +1316,7 @@ public class Analyzer {
SchemaNodeManagementPartition schemaNodeManagementPartition =
partitionFetcher.getSchemaNodeManagementPartition(
- new PathPatternTree(showChildPathsStatement.getPartialPath()),
- NodeManagementType.CHILD_PATHS);
+ new PathPatternTree(showChildPathsStatement.getPartialPath()));
if (schemaNodeManagementPartition == null) {
return analysis;
@@ -1318,8 +1340,7 @@ public class Analyzer {
SchemaNodeManagementPartition schemaNodeManagementPartition =
partitionFetcher.getSchemaNodeManagementPartition(
- new PathPatternTree(showChildNodesStatement.getPartialPath()),
- NodeManagementType.CHILD_NODES);
+ new PathPatternTree(showChildNodesStatement.getPartialPath()));
if (schemaNodeManagementPartition == null) {
return analysis;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
index 874530b04c..153b167c8d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.confignode.rpc.thrift.NodeManagementType;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
@@ -151,14 +150,14 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
}
@Override
- public SchemaNodeManagementPartition getSchemaNodeManagementPartition(
- PathPatternTree patternTree, NodeManagementType type) {
+ public SchemaNodeManagementPartition getSchemaNodeManagementPartitionWithLevel(
+ PathPatternTree patternTree, Integer level) {
try (ConfigNodeClient client =
configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
patternTree.constructTree();
TSchemaNodeManagementResp schemaNodeManagementResp =
client.getSchemaNodeManagementPartition(
- constructSchemaNodeManagementPartitionReq(patternTree, type));
+ constructSchemaNodeManagementPartitionReq(patternTree, level));
return parseSchemaNodeManagementPartitionResp(schemaNodeManagementResp);
} catch (TException | IOException e) {
@@ -360,14 +359,21 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
}
private TSchemaNodeManagementReq constructSchemaNodeManagementPartitionReq(
- PathPatternTree patternTree, NodeManagementType type) {
+ PathPatternTree patternTree, Integer level) {
PublicBAOS baos = new PublicBAOS();
try {
patternTree.serialize(baos);
ByteBuffer serializedPatternTree = ByteBuffer.allocate(baos.size());
serializedPatternTree.put(baos.getBuf(), 0, baos.size());
serializedPatternTree.flip();
- return new TSchemaNodeManagementReq(serializedPatternTree, type);
+ TSchemaNodeManagementReq schemaNodeManagementReq =
+ new TSchemaNodeManagementReq(serializedPatternTree);
+ if (null == level) {
+ schemaNodeManagementReq.setLevel(-1);
+ } else {
+ schemaNodeManagementReq.setLevel(level);
+ }
+ return schemaNodeManagementReq;
} catch (IOException e) {
throw new StatementAnalyzeException("An error occurred when serializing pattern tree");
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java
index 24add1c3c6..8738ae9cd9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
-import org.apache.iotdb.confignode.rpc.thrift.NodeManagementType;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
@@ -53,8 +52,8 @@ public class FakePartitionFetcherImpl implements IPartitionFetcher {
}
@Override
- public SchemaNodeManagementPartition getSchemaNodeManagementPartition(
- PathPatternTree patternTree, NodeManagementType type) {
+ public SchemaNodeManagementPartition getSchemaNodeManagementPartitionWithLevel(
+ PathPatternTree patternTree, Integer level) {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/IPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/IPartitionFetcher.java
index b6a6dce930..93f0b6a202 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/IPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/IPartitionFetcher.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
-import org.apache.iotdb.confignode.rpc.thrift.NodeManagementType;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import java.util.List;
@@ -34,8 +33,13 @@ public interface IPartitionFetcher {
SchemaPartition getOrCreateSchemaPartition(PathPatternTree patternTree);
- SchemaNodeManagementPartition getSchemaNodeManagementPartition(
- PathPatternTree patternTree, NodeManagementType type);
+ default SchemaNodeManagementPartition getSchemaNodeManagementPartition(
+ PathPatternTree patternTree) {
+ return getSchemaNodeManagementPartitionWithLevel(patternTree, null);
+ }
+
+ SchemaNodeManagementPartition getSchemaNodeManagementPartitionWithLevel(
+ PathPatternTree patternTree, Integer level);
DataPartition getDataPartition(Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandalonePartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandalonePartitionFetcher.java
index 34d2c545fb..bc0f36f78f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandalonePartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandalonePartitionFetcher.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.confignode.rpc.thrift.NodeManagementType;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.exception.DataRegionException;
@@ -68,8 +67,8 @@ public class StandalonePartitionFetcher implements IPartitionFetcher {
}
@Override
- public SchemaNodeManagementPartition getSchemaNodeManagementPartition(
- PathPatternTree patternTree, NodeManagementType type) {
+ public SchemaNodeManagementPartition getSchemaNodeManagementPartitionWithLevel(
+ PathPatternTree patternTree, Integer level) {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemorySourceVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemorySourceVisitor.java
index 220a952189..dfbbdd616b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemorySourceVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemorySourceVisitor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.common.header.HeaderConstant;
import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CountNodesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildNodesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildPathsStatement;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -72,4 +73,17 @@ public class StatementMemorySourceVisitor
return new StatementMemorySource(
tsBlockBuilder.build(), context.getAnalysis().getRespDatasetHeader());
}
+
+ @Override
+ public StatementMemorySource visitCountNodes(
+ CountNodesStatement countStatement, StatementMemorySourceContext context) {
+ TsBlockBuilder tsBlockBuilder =
+ new TsBlockBuilder(HeaderConstant.countNodesHeader.getRespDataTypes());
+ Set<String> matchedChildNodes = new TreeSet<>(context.getAnalysis().getMatchedNodes());
+ tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
+ tsBlockBuilder.getColumnBuilder(0).writeInt(matchedChildNodes.size());
+ tsBlockBuilder.declarePosition();
+ return new StatementMemorySource(
+ tsBlockBuilder.build(), context.getAnalysis().getRespDatasetHeader());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index e2a3176515..b5ee12fd41 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -82,13 +82,14 @@ import org.apache.iotdb.db.mpp.execution.operator.process.merge.MultiColumnMerge
import org.apache.iotdb.db.mpp.execution.operator.process.merge.NonOverlappedMultiColumnMerger;
import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.schema.ChildNodesSchemaScanOperator;
-import org.apache.iotdb.db.mpp.execution.operator.schema.ChildPathsSchemaScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.schema.CountMergeOperator;
import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesCountOperator;
import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesSchemaScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.schema.LevelTimeSeriesCountOperator;
import org.apache.iotdb.db.mpp.execution.operator.schema.NodeManageMemoryMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.NodePathsConvertOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.NodePathsCountOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.NodePathsSchemaScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaFetchMergeOperator;
import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaFetchScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaQueryMergeOperator;
@@ -106,13 +107,14 @@ import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.ChildNodesSchemaScanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.ChildPathsSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.CountSchemaMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.DevicesCountNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.DevicesSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.LevelTimeSeriesCountNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodeManagementMemoryMergeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodePathsConvertNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodePathsCountNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodePathsSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
@@ -355,10 +357,8 @@ public class LocalExecutionPlanner {
return visitTimeSeriesCount((TimeSeriesCountNode) node, context);
} else if (node instanceof LevelTimeSeriesCountNode) {
return visitLevelTimeSeriesCount((LevelTimeSeriesCountNode) node, context);
- } else if (node instanceof ChildPathsSchemaScanNode) {
- return visitChildPathsSchemaScan((ChildPathsSchemaScanNode) node, context);
- } else if (node instanceof ChildNodesSchemaScanNode) {
- return visitChildNodesSchemaScan((ChildNodesSchemaScanNode) node, context);
+ } else if (node instanceof NodePathsSchemaScanNode) {
+ return visitNodePathsSchemaScan((NodePathsSchemaScanNode) node, context);
}
return visitPlan(node, context);
}
@@ -471,41 +471,52 @@ public class LocalExecutionPlanner {
}
@Override
- public Operator visitChildPathsSchemaScan(
- ChildPathsSchemaScanNode node, LocalExecutionPlanContext context) {
+ public Operator visitNodePathsSchemaScan(
+ NodePathsSchemaScanNode node, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
context.instanceContext.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
- ChildPathsSchemaScanNode.class.getSimpleName());
- return new ChildPathsSchemaScanOperator(
- node.getPlanNodeId(), operatorContext, node.getPrefixPath());
+ NodePathsSchemaScanNode.class.getSimpleName());
+ return new NodePathsSchemaScanOperator(
+ node.getPlanNodeId(), operatorContext, node.getPrefixPath(), node.getLevel());
}
@Override
- public Operator visitChildNodesSchemaScan(
- ChildNodesSchemaScanNode node, LocalExecutionPlanContext context) {
- OperatorContext operatorContext =
+ public Operator visitNodeManagementMemoryMerge(
+ NodeManagementMemoryMergeNode node, LocalExecutionPlanContext context) {
+ Operator child = node.getChild().accept(this, context);
+ return new NodeManageMemoryMergeOperator(
context.instanceContext.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
- ChildNodesSchemaScanNode.class.getSimpleName());
- return new ChildNodesSchemaScanOperator(
- node.getPlanNodeId(), operatorContext, node.getPrefixPath());
+ NodeManageMemoryMergeOperator.class.getSimpleName()),
+ node.getData(),
+ child);
}
@Override
- public Operator visitNodeManagementMemoryMerge(
- NodeManagementMemoryMergeNode node, LocalExecutionPlanContext context) {
+ public Operator visitNodePathConvert(
+ NodePathsConvertNode node, LocalExecutionPlanContext context) {
Operator child = node.getChild().accept(this, context);
- return new NodeManageMemoryMergeOperator(
+ return new NodePathsConvertOperator(
context.instanceContext.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
NodeManageMemoryMergeOperator.class.getSimpleName()),
- node.getData(),
- child,
- node.getType());
+ child);
+ }
+
+ @Override
+ public Operator visitNodePathsCount(
+ NodePathsCountNode node, LocalExecutionPlanContext context) {
+ Operator child = node.getChild().accept(this, context);
+ return new NodePathsCountOperator(
+ context.instanceContext.addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ NodeManageMemoryMergeOperator.class.getSimpleName()),
+ child);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index b6b94da6b9..15ae9ccc84 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.mpp.plan.planner;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.confignode.rpc.thrift.NodeManagementType;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.utils.MetaUtils;
@@ -33,13 +32,14 @@ import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.ChildNodesSchemaScanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.ChildPathsSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.CountSchemaMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.DevicesCountNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.DevicesSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.LevelTimeSeriesCountNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodeManagementMemoryMergeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodePathsConvertNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodePathsCountNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodePathsSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
@@ -641,20 +641,31 @@ public class LogicalPlanBuilder {
return this;
}
- public LogicalPlanBuilder planChildPathsSchemaSource(PartialPath partialPath) {
- this.root = new ChildPathsSchemaScanNode(context.getQueryId().genPlanNodeId(), partialPath);
+ public LogicalPlanBuilder planNodePathsSchemaSource(PartialPath partialPath, Integer level) {
+ this.root =
+ new NodePathsSchemaScanNode(context.getQueryId().genPlanNodeId(), partialPath, level);
+ return this;
+ }
+
+ public LogicalPlanBuilder planNodePathsConvert() {
+ NodePathsConvertNode nodePathsConvertNode =
+ new NodePathsConvertNode(context.getQueryId().genPlanNodeId());
+ nodePathsConvertNode.addChild(this.getRoot());
+ this.root = nodePathsConvertNode;
return this;
}
- public LogicalPlanBuilder planChildNodesSchemaSource(PartialPath partialPath) {
- this.root = new ChildNodesSchemaScanNode(context.getQueryId().genPlanNodeId(), partialPath);
+ public LogicalPlanBuilder planNodePathsCount() {
+ NodePathsCountNode nodePathsCountNode =
+ new NodePathsCountNode(context.getQueryId().genPlanNodeId());
+ nodePathsCountNode.addChild(this.getRoot());
+ this.root = nodePathsCountNode;
return this;
}
- public LogicalPlanBuilder planNodeManagementMemoryMerge(
- Set<String> data, NodeManagementType type) {
+ public LogicalPlanBuilder planNodeManagementMemoryMerge(Set<String> data) {
NodeManagementMemoryMergeNode memorySourceNode =
- new NodeManagementMemoryMergeNode(context.getQueryId().genPlanNodeId(), data, type);
+ new NodeManagementMemoryMergeNode(context.getQueryId().genPlanNodeId(), data);
memorySourceNode.addChild(this.getRoot());
this.root = memorySourceNode;
return this;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index e247423ca5..1f008ab26d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.plan.planner;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.confignode.rpc.thrift.NodeManagementType;
import org.apache.iotdb.db.metadata.utils.TimeseriesVersionUtil;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
@@ -51,6 +50,7 @@ import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.AlterTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountDevicesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountLevelTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CountNodesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
@@ -488,6 +488,17 @@ public class LogicalPlanner {
.getRoot();
}
+ @Override
+ public PlanNode visitCountNodes(CountNodesStatement countStatement, MPPQueryContext context) {
+ LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
+ return planBuilder
+ .planNodePathsSchemaSource(countStatement.getPartialPath(), countStatement.getLevel())
+ .planSchemaQueryMerge(false)
+ .planNodeManagementMemoryMerge(analysis.getMatchedNodes())
+ .planNodePathsCount()
+ .getRoot();
+ }
+
@Override
public PlanNode visitInsertRows(
InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
@@ -584,9 +595,9 @@ public class LogicalPlanner {
ShowChildPathsStatement showChildPathsStatement, MPPQueryContext context) {
LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
return planBuilder
- .planChildPathsSchemaSource(showChildPathsStatement.getPartialPath())
+ .planNodePathsSchemaSource(showChildPathsStatement.getPartialPath(), -1)
.planSchemaQueryMerge(false)
- .planNodeManagementMemoryMerge(analysis.getMatchedNodes(), NodeManagementType.CHILD_PATHS)
+ .planNodeManagementMemoryMerge(analysis.getMatchedNodes())
.getRoot();
}
@@ -595,9 +606,10 @@ public class LogicalPlanner {
ShowChildNodesStatement showChildNodesStatement, MPPQueryContext context) {
LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
return planBuilder
- .planChildNodesSchemaSource(showChildNodesStatement.getPartialPath())
+ .planNodePathsSchemaSource(showChildNodesStatement.getPartialPath(), -1)
.planSchemaQueryMerge(false)
- .planNodeManagementMemoryMerge(analysis.getMatchedNodes(), NodeManagementType.CHILD_NODES)
+ .planNodeManagementMemoryMerge(analysis.getMatchedNodes())
+ .planNodePathsConvert()
.getRoot();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index f85931231b..c41c9b3ac0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -156,11 +156,13 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
return processNoChildSourceNode(node, context);
}
+ @Override
public PlanNode visitSeriesAggregationScan(
SeriesAggregationScanNode node, NodeGroupContext context) {
return processNoChildSourceNode(node, context);
}
+ @Override
public PlanNode visitAlignedSeriesAggregationScan(
AlignedSeriesAggregationScanNode node, NodeGroupContext context) {
return processNoChildSourceNode(node, context);
@@ -173,6 +175,7 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
return node.clone();
}
+ @Override
public PlanNode visitDeleteData(DeleteDataNode node, NodeGroupContext context) {
context.putNodeDistribution(
node.getPlanNodeId(),
@@ -180,6 +183,7 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
return node;
}
+ @Override
public PlanNode visitDeleteTimeseries(DeleteTimeSeriesNode node, NodeGroupContext context) {
List<PlanNode> visitedChildren = new ArrayList<>();
node.getChildren()
@@ -219,6 +223,7 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
return processMultiChildNode(node, context);
}
+ @Override
public PlanNode visitRowBasedSeriesAggregate(AggregationNode node, NodeGroupContext context) {
return processMultiChildNode(node, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index 5d344f1f95..4abac32e03 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -19,13 +19,14 @@
package org.apache.iotdb.db.mpp.plan.planner.plan.node;
import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.ChildNodesSchemaScanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.ChildPathsSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.CountSchemaMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.DevicesCountNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.DevicesSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.LevelTimeSeriesCountNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodeManagementMemoryMergeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodePathsConvertNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodePathsCountNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodePathsSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
@@ -111,15 +112,16 @@ public enum PlanNodeType {
TRANSFORM((short) 37),
DELETE_REGION((short) 38),
CREATE_MULTI_TIME_SERIES((short) 39),
- CHILD_PATHS_SCAN((short) 40),
- CHILD_NODES_SCAN((short) 41),
+ NODE_PATHS_SCAN((short) 40),
+ NODE_PATHS_CONVERT((short) 41),
NODE_MANAGEMENT_MEMORY_MERGE((short) 42),
INVALIDATE_SCHEMA_CACHE((short) 43),
DELETE_DATA((short) 44),
DELETE_TIMESERIES((short) 45),
LAST_QUERY_SCAN((short) 46),
ALIGNED_LAST_QUERY_SCAN((short) 47),
- LAST_QUERY_MERGE((short) 48);
+ LAST_QUERY_MERGE((short) 48),
+ NODE_PATHS_COUNT((short) 49);
private final short nodeType;
@@ -224,9 +226,9 @@ public enum PlanNodeType {
case 39:
return CreateMultiTimeSeriesNode.deserialize(buffer);
case 40:
- return ChildPathsSchemaScanNode.deserialize(buffer);
+ return NodePathsSchemaScanNode.deserialize(buffer);
case 41:
- return ChildNodesSchemaScanNode.deserialize(buffer);
+ return NodePathsConvertNode.deserialize(buffer);
case 42:
return NodeManagementMemoryMergeNode.deserialize(buffer);
case 43:
@@ -241,6 +243,8 @@ public enum PlanNodeType {
return AlignedLastQueryScanNode.deserialize(buffer);
case 48:
return LastQueryMergeNode.deserialize(buffer);
+ case 49:
+ return NodePathsCountNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 5f5e09d228..20ea3ab8aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -18,13 +18,14 @@
*/
package org.apache.iotdb.db.mpp.plan.planner.plan.node;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.ChildNodesSchemaScanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.ChildPathsSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.CountSchemaMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.DevicesCountNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.DevicesSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.LevelTimeSeriesCountNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodeManagementMemoryMergeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodePathsConvertNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodePathsCountNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodePathsSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
@@ -234,15 +235,19 @@ public abstract class PlanVisitor<R, C> {
return visitPlan(node, context);
}
- public R visitChildPathsSchemaScan(ChildPathsSchemaScanNode node, C context) {
+ public R visitNodePathsSchemaScan(NodePathsSchemaScanNode node, C context) {
return visitPlan(node, context);
}
- public R visitChildNodesSchemaScan(ChildNodesSchemaScanNode node, C context) {
+ public R visitNodeManagementMemoryMerge(NodeManagementMemoryMergeNode node, C context) {
return visitPlan(node, context);
}
- public R visitNodeManagementMemoryMerge(NodeManagementMemoryMergeNode node, C context) {
+ public R visitNodePathConvert(NodePathsConvertNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitNodePathsCount(NodePathsCountNode node, C context) {
return visitPlan(node, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/ChildNodesSchemaScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/ChildNodesSchemaScanNode.java
deleted file mode 100644
index ca42cf4cee..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/ChildNodesSchemaScanNode.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read;
-
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
-import org.apache.iotdb.db.mpp.common.header.HeaderConstant;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Objects;
-
-public class ChildNodesSchemaScanNode extends SchemaQueryScanNode {
- // the path could be a prefix path with wildcard
- private PartialPath prefixPath;
-
- public ChildNodesSchemaScanNode(PlanNodeId id, PartialPath prefixPath) {
- super(id);
- this.prefixPath = prefixPath;
- }
-
- public PartialPath getPrefixPath() {
- return prefixPath;
- }
-
- @Override
- public PlanNode clone() {
- return new ChildNodesSchemaScanNode(getPlanNodeId(), prefixPath);
- }
-
- @Override
- public List<String> getOutputColumnNames() {
- return HeaderConstant.showChildNodesHeader.getRespColumns();
- }
-
- @Override
- protected void serializeAttributes(ByteBuffer byteBuffer) {
- PlanNodeType.CHILD_NODES_SCAN.serialize(byteBuffer);
- prefixPath.serialize(byteBuffer);
- }
-
- public static PlanNode deserialize(ByteBuffer buffer) {
- PartialPath path = (PartialPath) PathDeserializeUtil.deserialize(buffer);
- PlanNodeId planNodeId = PlanNodeId.deserialize(buffer);
- return new ChildNodesSchemaScanNode(planNodeId, path);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- ChildNodesSchemaScanNode that = (ChildNodesSchemaScanNode) o;
- return prefixPath == that.prefixPath;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode(), prefixPath);
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/NodeManagementMemoryMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/NodeManagementMemoryMergeNode.java
index c202a120ed..f697d1c84d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/NodeManagementMemoryMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/NodeManagementMemoryMergeNode.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read;
-import org.apache.iotdb.confignode.rpc.thrift.NodeManagementType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
@@ -39,26 +38,15 @@ public class NodeManagementMemoryMergeNode extends ProcessNode {
private PlanNode child;
- private NodeManagementType type;
-
- public NodeManagementMemoryMergeNode(PlanNodeId id, Set<String> data, NodeManagementType type) {
+ public NodeManagementMemoryMergeNode(PlanNodeId id, Set<String> data) {
super(id);
this.data = data;
- this.type = type;
}
public Set<String> getData() {
return data;
}
- public NodeManagementType getType() {
- return type;
- }
-
- public void setType(NodeManagementType type) {
- this.type = type;
- }
-
public PlanNode getChild() {
return child;
}
@@ -75,7 +63,7 @@ public class NodeManagementMemoryMergeNode extends ProcessNode {
@Override
public PlanNode clone() {
- return new NodeManagementMemoryMergeNode(getPlanNodeId(), this.data, this.type);
+ return new NodeManagementMemoryMergeNode(getPlanNodeId(), this.data);
}
@Override
@@ -99,7 +87,6 @@ public class NodeManagementMemoryMergeNode extends ProcessNode {
int size = data.size();
ReadWriteIOUtils.write(size, byteBuffer);
data.forEach(node -> ReadWriteIOUtils.write(node, byteBuffer));
- byteBuffer.put((byte) type.ordinal());
}
public static NodeManagementMemoryMergeNode deserialize(ByteBuffer byteBuffer) {
@@ -108,8 +95,7 @@ public class NodeManagementMemoryMergeNode extends ProcessNode {
for (int i = 0; i < size; i++) {
data.add(ReadWriteIOUtils.readString(byteBuffer));
}
- NodeManagementType type = NodeManagementType.findByValue(byteBuffer.get());
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new NodeManagementMemoryMergeNode(planNodeId, data, type);
+ return new NodeManagementMemoryMergeNode(planNodeId, data);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/NodeManagementMemoryMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/NodePathsConvertNode.java
similarity index 58%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/NodeManagementMemoryMergeNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/NodePathsConvertNode.java
index c202a120ed..fd530bff84 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/NodeManagementMemoryMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/NodePathsConvertNode.java
@@ -19,44 +19,23 @@
package org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read;
-import org.apache.iotdb.confignode.rpc.thrift.NodeManagementType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ProcessNode;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import com.google.common.collect.ImmutableList;
import java.nio.ByteBuffer;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
-public class NodeManagementMemoryMergeNode extends ProcessNode {
- private final Set<String> data;
+public class NodePathsConvertNode extends ProcessNode {
private PlanNode child;
- private NodeManagementType type;
-
- public NodeManagementMemoryMergeNode(PlanNodeId id, Set<String> data, NodeManagementType type) {
+ public NodePathsConvertNode(PlanNodeId id) {
super(id);
- this.data = data;
- this.type = type;
- }
-
- public Set<String> getData() {
- return data;
- }
-
- public NodeManagementType getType() {
- return type;
- }
-
- public void setType(NodeManagementType type) {
- this.type = type;
}
public PlanNode getChild() {
@@ -75,7 +54,7 @@ public class NodeManagementMemoryMergeNode extends ProcessNode {
@Override
public PlanNode clone() {
- return new NodeManagementMemoryMergeNode(getPlanNodeId(), this.data, this.type);
+ return new NodePathsConvertNode(getPlanNodeId());
}
@Override
@@ -90,26 +69,16 @@ public class NodeManagementMemoryMergeNode extends ProcessNode {
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
- return visitor.visitNodeManagementMemoryMerge(this, context);
+ return visitor.visitNodePathConvert(this, context);
}
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
- PlanNodeType.NODE_MANAGEMENT_MEMORY_MERGE.serialize(byteBuffer);
- int size = data.size();
- ReadWriteIOUtils.write(size, byteBuffer);
- data.forEach(node -> ReadWriteIOUtils.write(node, byteBuffer));
- byteBuffer.put((byte) type.ordinal());
+ PlanNodeType.NODE_PATHS_CONVERT.serialize(byteBuffer);
}
- public static NodeManagementMemoryMergeNode deserialize(ByteBuffer byteBuffer) {
- Set<String> data = new HashSet<>();
- int size = byteBuffer.getInt();
- for (int i = 0; i < size; i++) {
- data.add(ReadWriteIOUtils.readString(byteBuffer));
- }
- NodeManagementType type = NodeManagementType.findByValue(byteBuffer.get());
+ public static NodePathsConvertNode deserialize(ByteBuffer byteBuffer) {
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new NodeManagementMemoryMergeNode(planNodeId, data, type);
+ return new NodePathsConvertNode(planNodeId);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/NodeManagementMemoryMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/NodePathsCountNode.java
similarity index 58%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/NodeManagementMemoryMergeNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/NodePathsCountNode.java
index c202a120ed..238039ded7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/NodeManagementMemoryMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/NodePathsCountNode.java
@@ -19,44 +19,23 @@
package org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read;
-import org.apache.iotdb.confignode.rpc.thrift.NodeManagementType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ProcessNode;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import com.google.common.collect.ImmutableList;
import java.nio.ByteBuffer;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
-public class NodeManagementMemoryMergeNode extends ProcessNode {
- private final Set<String> data;
+public class NodePathsCountNode extends ProcessNode {
private PlanNode child;
- private NodeManagementType type;
-
- public NodeManagementMemoryMergeNode(PlanNodeId id, Set<String> data, NodeManagementType type) {
+ public NodePathsCountNode(PlanNodeId id) {
super(id);
- this.data = data;
- this.type = type;
- }
-
- public Set<String> getData() {
- return data;
- }
-
- public NodeManagementType getType() {
- return type;
- }
-
- public void setType(NodeManagementType type) {
- this.type = type;
}
public PlanNode getChild() {
@@ -75,7 +54,7 @@ public class NodeManagementMemoryMergeNode extends ProcessNode {
@Override
public PlanNode clone() {
- return new NodeManagementMemoryMergeNode(getPlanNodeId(), this.data, this.type);
+ return new NodePathsCountNode(getPlanNodeId());
}
@Override
@@ -90,26 +69,16 @@ public class NodeManagementMemoryMergeNode extends ProcessNode {
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
- return visitor.visitNodeManagementMemoryMerge(this, context);
+ return visitor.visitNodePathsCount(this, context);
}
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
- PlanNodeType.NODE_MANAGEMENT_MEMORY_MERGE.serialize(byteBuffer);
- int size = data.size();
- ReadWriteIOUtils.write(size, byteBuffer);
- data.forEach(node -> ReadWriteIOUtils.write(node, byteBuffer));
- byteBuffer.put((byte) type.ordinal());
+ PlanNodeType.NODE_PATHS_COUNT.serialize(byteBuffer);
}
- public static NodeManagementMemoryMergeNode deserialize(ByteBuffer byteBuffer) {
- Set<String> data = new HashSet<>();
- int size = byteBuffer.getInt();
- for (int i = 0; i < size; i++) {
- data.add(ReadWriteIOUtils.readString(byteBuffer));
- }
- NodeManagementType type = NodeManagementType.findByValue(byteBuffer.get());
+ public static NodePathsCountNode deserialize(ByteBuffer byteBuffer) {
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new NodeManagementMemoryMergeNode(planNodeId, data, type);
+ return new NodePathsCountNode(planNodeId);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/ChildPathsSchemaScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/NodePathsSchemaScanNode.java
similarity index 76%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/ChildPathsSchemaScanNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/NodePathsSchemaScanNode.java
index db6218b201..fe6c8318d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/ChildPathsSchemaScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/NodePathsSchemaScanNode.java
@@ -30,22 +30,28 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
-public class ChildPathsSchemaScanNode extends SchemaQueryScanNode {
+public class NodePathsSchemaScanNode extends SchemaQueryScanNode {
// the path could be a prefix path with wildcard
private PartialPath prefixPath;
+ private int level = -1;
- public ChildPathsSchemaScanNode(PlanNodeId id, PartialPath prefixPath) {
+ public NodePathsSchemaScanNode(PlanNodeId id, PartialPath prefixPath, int level) {
super(id);
this.prefixPath = prefixPath;
+ this.level = level;
}
public PartialPath getPrefixPath() {
return prefixPath;
}
+ public int getLevel() {
+ return level;
+ }
+
@Override
public PlanNode clone() {
- return new ChildPathsSchemaScanNode(getPlanNodeId(), prefixPath);
+ return new NodePathsSchemaScanNode(getPlanNodeId(), prefixPath, level);
}
@Override
@@ -55,14 +61,16 @@ public class ChildPathsSchemaScanNode extends SchemaQueryScanNode {
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
- PlanNodeType.CHILD_PATHS_SCAN.serialize(byteBuffer);
+ PlanNodeType.NODE_PATHS_SCAN.serialize(byteBuffer);
prefixPath.serialize(byteBuffer);
+ byteBuffer.putInt(level);
}
public static PlanNode deserialize(ByteBuffer buffer) {
PartialPath path = (PartialPath) PathDeserializeUtil.deserialize(buffer);
+ int level = buffer.getInt();
PlanNodeId planNodeId = PlanNodeId.deserialize(buffer);
- return new ChildPathsSchemaScanNode(planNodeId, path);
+ return new NodePathsSchemaScanNode(planNodeId, path, level);
}
@Override
@@ -76,12 +84,12 @@ public class ChildPathsSchemaScanNode extends SchemaQueryScanNode {
if (!super.equals(o)) {
return false;
}
- ChildPathsSchemaScanNode that = (ChildPathsSchemaScanNode) o;
- return prefixPath == that.prefixPath;
+ NodePathsSchemaScanNode that = (NodePathsSchemaScanNode) o;
+ return level == that.level && Objects.equals(prefixPath, that.prefixPath);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), prefixPath);
+ return Objects.hash(super.hashCode(), prefixPath, level);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index e279ba9cb5..80a1111a02 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.AlterTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountDevicesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountLevelTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CountNodesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
@@ -187,6 +188,10 @@ public abstract class StatementVisitor<R, C> {
return visitStatement(countStatement, context);
}
+ public R visitCountNodes(CountNodesStatement countStatement, C context) {
+ return visitStatement(countStatement, context);
+ }
+
public R visitInsertRow(InsertRowStatement insertRowStatement, C context) {
return visitStatement(insertRowStatement, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CountNodesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CountNodesStatement.java
index 618e700c9f..740eeed7e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CountNodesStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CountNodesStatement.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.plan.statement.metadata;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
public class CountNodesStatement extends CountStatement {
private final int level;
@@ -28,4 +29,13 @@ public class CountNodesStatement extends CountStatement {
super(partialPath);
this.level = level;
}
+
+ public int getLevel() {
+ return level;
+ }
+
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitCountNodes(this, context);
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
index 06919ca3bf..1482ca1f93 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
@@ -32,10 +32,11 @@ import org.apache.iotdb.db.mpp.plan.plan.node.PlanNodeDeserializeHelper;
import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanner;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.ChildNodesSchemaScanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.ChildPathsSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.DevicesSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodeManagementMemoryMergeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodePathsConvertNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodePathsCountNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodePathsSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
@@ -63,6 +64,7 @@ import java.util.Map;
import static org.apache.iotdb.db.mpp.plan.plan.QueryLogicalPlanUtil.querySQLs;
import static org.apache.iotdb.db.mpp.plan.plan.QueryLogicalPlanUtil.sqlToPlanMap;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
public class LogicalPlannerTest {
@@ -545,6 +547,26 @@ public class LogicalPlannerTest {
}
}
+ @Test
+ public void testCountNodes() {
+ String sql = "COUNT NODES root.ln LEVEL=1";
+ try {
+ NodePathsCountNode nodePathsCountNode = (NodePathsCountNode) parseSQLToPlanNode(sql);
+ NodeManagementMemoryMergeNode nodeManagementMemoryMergeNode =
+ (NodeManagementMemoryMergeNode) nodePathsCountNode.getChildren().get(0);
+ SchemaQueryMergeNode schemaQueryMergeNode =
+ (SchemaQueryMergeNode) nodeManagementMemoryMergeNode.getChildren().get(0);
+ NodePathsSchemaScanNode nodePathsSchemaScanNode =
+ (NodePathsSchemaScanNode) schemaQueryMergeNode.getChildren().get(0);
+ Assert.assertNotNull(nodePathsSchemaScanNode);
+ Assert.assertEquals(new PartialPath("root.ln"), nodePathsSchemaScanNode.getPrefixPath());
+ Assert.assertEquals(1, nodePathsSchemaScanNode.getLevel());
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
@Test
public void testShowChildPaths() {
String sql = "SHOW CHILD PATHS root.ln";
@@ -553,10 +575,10 @@ public class LogicalPlannerTest {
(NodeManagementMemoryMergeNode) parseSQLToPlanNode(sql);
SchemaQueryMergeNode schemaQueryMergeNode =
(SchemaQueryMergeNode) memorySourceNode.getChildren().get(0);
- ChildPathsSchemaScanNode childPathsSchemaScanNode =
- (ChildPathsSchemaScanNode) schemaQueryMergeNode.getChildren().get(0);
- Assert.assertNotNull(childPathsSchemaScanNode);
- Assert.assertEquals(new PartialPath("root.ln"), childPathsSchemaScanNode.getPrefixPath());
+ NodePathsSchemaScanNode nodePathsSchemaScanNode =
+ (NodePathsSchemaScanNode) schemaQueryMergeNode.getChildren().get(0);
+ Assert.assertNotNull(nodePathsSchemaScanNode);
+ Assert.assertEquals(new PartialPath("root.ln"), nodePathsSchemaScanNode.getPrefixPath());
} catch (Exception e) {
e.printStackTrace();
fail();
@@ -567,14 +589,15 @@ public class LogicalPlannerTest {
public void testShowChildNodes() {
String sql = "SHOW CHILD NODES root.ln";
try {
+ NodePathsConvertNode nodePathsConvertNode = (NodePathsConvertNode) parseSQLToPlanNode(sql);
NodeManagementMemoryMergeNode memorySourceNode =
- (NodeManagementMemoryMergeNode) parseSQLToPlanNode(sql);
+ (NodeManagementMemoryMergeNode) nodePathsConvertNode.getChildren().get(0);
SchemaQueryMergeNode schemaQueryMergeNode =
(SchemaQueryMergeNode) memorySourceNode.getChildren().get(0);
- ChildNodesSchemaScanNode childNodesSchemaScanNode =
- (ChildNodesSchemaScanNode) schemaQueryMergeNode.getChildren().get(0);
- Assert.assertNotNull(childNodesSchemaScanNode);
- Assert.assertEquals(new PartialPath("root.ln"), childNodesSchemaScanNode.getPrefixPath());
+ NodePathsSchemaScanNode nodePathsSchemaScanNode =
+ (NodePathsSchemaScanNode) schemaQueryMergeNode.getChildren().get(0);
+ assertNotNull(nodePathsConvertNode);
+ Assert.assertEquals(new PartialPath("root.ln"), nodePathsSchemaScanNode.getPrefixPath());
} catch (Exception e) {
e.printStackTrace();
fail();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/metadata/read/NodeManagementMemoryMergeNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/metadata/read/NodeManagementMemoryMergeNodeSerdeTest.java
index d6dba5bd10..21ed8442e2 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/metadata/read/NodeManagementMemoryMergeNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/metadata/read/NodeManagementMemoryMergeNodeSerdeTest.java
@@ -22,14 +22,14 @@ package org.apache.iotdb.db.mpp.plan.plan.node.metadata.read;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.confignode.rpc.thrift.NodeManagementType;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.plan.plan.node.PlanNodeDeserializeHelper;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.ChildNodesSchemaScanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.ChildPathsSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodeManagementMemoryMergeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodePathsConvertNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodePathsCountNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodePathsSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
@@ -44,30 +44,8 @@ import java.util.Set;
public class NodeManagementMemoryMergeNodeSerdeTest {
@Test
- public void testChildPathsSerializeAndDeserialize() throws IllegalPathException {
- Set<String> data = new HashSet<>();
- data.add("root.ln");
- data.add("root.abc");
- NodeManagementMemoryMergeNode memorySourceNode =
- new NodeManagementMemoryMergeNode(
- new PlanNodeId("nodeManagementMerge"), data, NodeManagementType.CHILD_PATHS);
- SchemaQueryMergeNode schemaMergeNode = new SchemaQueryMergeNode(new PlanNodeId("schemaMerge"));
- ExchangeNode exchangeNode = new ExchangeNode(new PlanNodeId("exchange"));
- ChildPathsSchemaScanNode childPathsSchemaScanNode =
- new ChildPathsSchemaScanNode(new PlanNodeId("childPathsScan"), new PartialPath("root.ln"));
- FragmentSinkNode fragmentSinkNode = new FragmentSinkNode(new PlanNodeId("fragmentSink"));
- fragmentSinkNode.addChild(childPathsSchemaScanNode);
- fragmentSinkNode.setDownStream(
- new TEndPoint("127.0.0.1", 6667),
- new FragmentInstanceId(new PlanFragmentId("q", 1), "ds"),
- new PlanNodeId("test"));
- exchangeNode.addChild(schemaMergeNode);
- exchangeNode.setRemoteSourceNode(fragmentSinkNode);
- exchangeNode.setUpstream(
- new TEndPoint("127.0.0.1", 6667),
- new FragmentInstanceId(new PlanFragmentId("q", 1), "ds"),
- new PlanNodeId("test"));
- memorySourceNode.addChild(exchangeNode);
+ public void testNodePathsSerializeAndDeserialize() throws IllegalPathException {
+ NodeManagementMemoryMergeNode memorySourceNode = createNodeManagementMemoryMergeNode();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
memorySourceNode.serialize(byteBuffer);
byteBuffer.flip();
@@ -77,19 +55,45 @@ public class NodeManagementMemoryMergeNodeSerdeTest {
}
@Test
- public void testChildNodesSerializeAndDeserialize() throws IllegalPathException {
+ public void testNodeConvertSerializeAndDeserialize() throws IllegalPathException {
+ NodeManagementMemoryMergeNode memorySourceNode = createNodeManagementMemoryMergeNode();
+ NodePathsConvertNode node = new NodePathsConvertNode(new PlanNodeId("nodePathConvert"));
+ node.addChild(memorySourceNode);
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+ node.serialize(byteBuffer);
+ byteBuffer.flip();
+ NodePathsConvertNode node1 =
+ (NodePathsConvertNode) PlanNodeDeserializeHelper.deserialize(byteBuffer);
+ Assert.assertEquals(node, node1);
+ }
+
+ @Test
+ public void testNodeCountSerializeAndDeserialize() throws IllegalPathException {
+ NodeManagementMemoryMergeNode memorySourceNode = createNodeManagementMemoryMergeNode();
+ NodePathsCountNode node = new NodePathsCountNode(new PlanNodeId("nodePathCount"));
+ node.addChild(memorySourceNode);
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+ node.serialize(byteBuffer);
+ byteBuffer.flip();
+ NodePathsCountNode node1 =
+ (NodePathsCountNode) PlanNodeDeserializeHelper.deserialize(byteBuffer);
+ Assert.assertEquals(node, node1);
+ }
+
+ private NodeManagementMemoryMergeNode createNodeManagementMemoryMergeNode()
+ throws IllegalPathException {
Set<String> data = new HashSet<>();
- data.add("ln");
- data.add("abc");
+ data.add("root.ln");
+ data.add("root.abc");
NodeManagementMemoryMergeNode memorySourceNode =
- new NodeManagementMemoryMergeNode(
- new PlanNodeId("nodeManagementMerge"), data, NodeManagementType.CHILD_NODES);
+ new NodeManagementMemoryMergeNode(new PlanNodeId("nodeManagementMerge"), data);
SchemaQueryMergeNode schemaMergeNode = new SchemaQueryMergeNode(new PlanNodeId("schemaMerge"));
ExchangeNode exchangeNode = new ExchangeNode(new PlanNodeId("exchange"));
- ChildNodesSchemaScanNode childNodesSchemaScanNode =
- new ChildNodesSchemaScanNode(new PlanNodeId("childNodesScan"), new PartialPath("root.ln"));
+ NodePathsSchemaScanNode childPathsSchemaScanNode =
+ new NodePathsSchemaScanNode(
+ new PlanNodeId("NodePathsScan"), new PartialPath("root.ln"), -1);
FragmentSinkNode fragmentSinkNode = new FragmentSinkNode(new PlanNodeId("fragmentSink"));
- fragmentSinkNode.addChild(childNodesSchemaScanNode);
+ fragmentSinkNode.addChild(childPathsSchemaScanNode);
fragmentSinkNode.setDownStream(
new TEndPoint("127.0.0.1", 6667),
new FragmentInstanceId(new PlanFragmentId("q", 1), "ds"),
@@ -101,11 +105,6 @@ public class NodeManagementMemoryMergeNodeSerdeTest {
new FragmentInstanceId(new PlanFragmentId("q", 1), "ds"),
new PlanNodeId("test"));
memorySourceNode.addChild(exchangeNode);
- ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
- memorySourceNode.serialize(byteBuffer);
- byteBuffer.flip();
- NodeManagementMemoryMergeNode memorySourceNode1 =
- (NodeManagementMemoryMergeNode) PlanNodeDeserializeHelper.deserialize(byteBuffer);
- Assert.assertEquals(memorySourceNode, memorySourceNode1);
+ return memorySourceNode;
}
}
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index ab90f5bb33..42378a0985 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -119,14 +119,9 @@ struct TSchemaPartitionResp {
// Node Management
-enum NodeManagementType {
-CHILD_PATHS,
-CHILD_NODES
-}
-
struct TSchemaNodeManagementReq {
1: required binary pathPatternTree
- 2: required NodeManagementType type
+ 2: optional i32 level
}
struct TSchemaNodeManagementResp {