You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/03/23 08:11:17 UTC
[iotdb] branch master updated: [IOTDB-1240]support show node
command in cluster version (#2866)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 77cdc38 [IOTDB-1240]support show node command in cluster version (#2866)
77cdc38 is described below
commit 77cdc38eed2560fbac4a526acff3d3f6da772b3e
Author: Yifu Zhou <ef...@gmail.com>
AuthorDate: Tue Mar 23 16:10:54 2021 +0800
[IOTDB-1240]support show node command in cluster version (#2866)
* support show child nodes in cluster version
* do not use * to import
* fix bug
* rerun ci
* add IT
Co-authored-by: YIFU ZHOU <>
---
.../cluster/client/sync/SyncClientAdaptor.java | 17 +++++
.../apache/iotdb/cluster/metadata/CMManager.java | 4 +
.../iotdb/cluster/query/ClusterPlanExecutor.java | 86 ++++++++++++++++++++++
.../iotdb/cluster/server/DataClusterServer.java | 13 ++++
.../caller/GetChildNodeNextLevelHandler.java | 59 +++++++++++++++
.../cluster/server/service/DataAsyncService.java | 11 +++
.../cluster/server/service/DataSyncService.java | 10 +++
.../cluster/client/sync/SyncClientAdaptorTest.java | 9 +++
thrift/src/main/thrift/cluster.thrift | 5 ++
9 files changed, 214 insertions(+)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index d518c62..6d73265 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.cluster.server.handlers.caller.GetChildNodeNextLevelHandler;
import org.apache.iotdb.cluster.server.handlers.caller.GetChildNodeNextLevelPathHandler;
import org.apache.iotdb.cluster.server.handlers.caller.GetDevicesHandler;
import org.apache.iotdb.cluster.server.handlers.caller.GetNodesListHandler;
@@ -180,6 +181,22 @@ public class SyncClientAdaptor {
return response.get();
}
+  /**
+   * Sends an asynchronous {@code getChildNodeInNextLevel} request and blocks the calling thread
+   * until the callback stores a reply or the read-operation timeout elapses.
+   *
+   * @param client the asynchronous data client used to send the request
+   * @param header the header node passed along with the request
+   * @param path the path whose next-level child nodes are requested
+   * @return the set stored by the callback, or {@code null} if none was stored when the wait ended
+   * @throws TException if the client call fails
+   * @throws InterruptedException if the calling thread is interrupted while waiting
+   */
+  public static Set<String> getChildNodeInNextLevel(
+      AsyncDataClient client, Node header, String path) throws TException, InterruptedException {
+    AtomicReference<Set<String>> childNodes = new AtomicReference<>(null);
+    GetChildNodeNextLevelHandler callback = new GetChildNodeNextLevelHandler();
+    callback.setContact(client.getNode());
+    callback.setResponse(childNodes);
+
+    client.getChildNodeInNextLevel(header, path, callback);
+    synchronized (childNodes) {
+      // the callback may already have run, so only block while nothing has been stored yet
+      if (childNodes.get() == null) {
+        childNodes.wait(RaftServer.getReadOperationTimeoutMS());
+      }
+    }
+    return childNodes.get();
+  }
+
public static Set<String> getNextChildren(AsyncDataClient client, Node header, String path)
throws TException, InterruptedException {
GetChildNodeNextLevelPathHandler handler = new GetChildNodeNextLevelPathHandler();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index bf121d2..97904bf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -1411,6 +1411,10 @@ public class CMManager extends MManager {
.collect(Collectors.toList());
}
+  /**
+   * Convenience overload that takes the path as a plain string.
+   *
+   * @param path the path, wrapped in a {@link PartialPath} before delegating
+   * @return the result of the {@link PartialPath} overload for that path
+   * @throws MetadataException propagated from building the path or from the delegated lookup
+   */
+  public Set<String> getChildNodeInNextLevel(String path) throws MetadataException {
+    PartialPath partialPath = new PartialPath(path);
+    return getChildNodeInNextLevel(partialPath);
+  }
+
public Set<String> getChildNodePathInNextLevel(String path) throws MetadataException {
return getChildNodePathInNextLevel(new PartialPath(path));
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index fd5f638..ca16cfb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -383,6 +383,92 @@ public class ClusterPlanExecutor extends PlanExecutor {
}
+  /**
+   * Collects the next-level child nodes of {@code path} from every global partition group, one
+   * task per group on a fixed-size thread pool, and merges them into a single sorted set.
+   *
+   * <p>A group that fails is logged once and contributes nothing to the result; the other groups
+   * are still merged.
+   *
+   * @param path the path whose next-level child nodes are requested
+   * @return the union of the child nodes reported by all groups that answered
+   */
@Override
+  protected Set<String> getNodeNextChildren(PartialPath path) throws MetadataException {
+    ConcurrentSkipListSet<String> resultSet = new ConcurrentSkipListSet<>();
+    List<PartitionGroup> globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups();
+    ExecutorService pool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
+    List<Future<Void>> futureList = new ArrayList<>();
+    for (PartitionGroup group : globalGroups) {
+      futureList.add(
+          pool.submit(
+              () -> {
+                try {
+                  Set<String> nextChildrenNodes = getChildNodeInNextLevel(group, path);
+                  if (nextChildrenNodes != null) {
+                    resultSet.addAll(nextChildrenNodes);
+                  } else {
+                    // no node of the group delivered a result
+                    logger.error("Fail to get next children nodes of {} from {}", path, group);
+                  }
+                } catch (CheckConsistencyException e) {
+                  // logged here only; the null branch above is not reached on this path, so the
+                  // failure is no longer reported twice
+                  logger.error("Fail to get next children nodes of {} from {}", path, group, e);
+                }
+                return null;
+              }));
+    }
+    waitForThreadPool(futureList, pool, "getChildNodeInNextLevel()");
+    return resultSet;
+  }
+
+  /**
+   * Fetches the next-level child nodes of {@code path} from one partition group, reading locally
+   * when this node belongs to the group and asking the group's nodes remotely otherwise.
+   *
+   * @param group the partition group to query
+   * @param path the path whose next-level child nodes are requested
+   * @return the child nodes reported for the group; the remote lookup may return {@code null}
+   * @throws CheckConsistencyException propagated from the local lookup
+   */
+  private Set<String> getChildNodeInNextLevel(PartitionGroup group, PartialPath path)
+      throws CheckConsistencyException {
+    boolean isLocalGroup = group.contains(metaGroupMember.getThisNode());
+    return isLocalGroup
+        ? getLocalChildNodeInNextLevel(group, path)
+        : getRemoteChildNodeInNextLevel(group, path);
+  }
+
+  /**
+   * Reads the next-level child nodes of {@code path} from the local metadata manager after the
+   * local member of the group has synchronized with its leader.
+   *
+   * @param group the partition group this node belongs to
+   * @param path the path whose next-level child nodes are requested
+   * @return the local child nodes, or an empty set if the metadata lookup fails
+   * @throws CheckConsistencyException if the leader synchronization check fails
+   */
+  private Set<String> getLocalChildNodeInNextLevel(PartitionGroup group, PartialPath path)
+      throws CheckConsistencyException {
+    Node header = group.getHeader();
+    DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header);
+    localDataMember.syncLeaderWithConsistencyCheck(false);
+    try {
+      return IoTDB.metaManager.getChildNodeInNextLevel(path);
+    } catch (MetadataException e) {
+      // pass the exception to the logger so the cause of the failure is not lost
+      logger.error("Cannot get next children nodes of {} from {} locally", path, group, e);
+      return Collections.emptySet();
+    }
+  }
+
+  /**
+   * Asks the nodes of a remote partition group, one after another, for the next-level child
+   * nodes of {@code path} and stops at the first node that returns a non-null set.
+   *
+   * <p>Depending on the cluster configuration the request goes through an asynchronous client
+   * (wrapped by {@link SyncClientAdaptor}) or a synchronous client that is closed after the call.
+   * Connection and RPC failures are logged and the next node is tried. If the thread is
+   * interrupted, its interrupt flag is restored and no further node is contacted.
+   *
+   * @param group the remote partition group to query
+   * @param path the path whose next-level child nodes are requested
+   * @return the child nodes from the first node that answered, or {@code null} if none did
+   */
+  private Set<String> getRemoteChildNodeInNextLevel(PartitionGroup group, PartialPath path) {
+    Set<String> nextChildrenNodes = null;
+    for (Node node : group) {
+      try {
+        if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+          AsyncDataClient client =
+              metaGroupMember
+                  .getClientProvider()
+                  .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
+          nextChildrenNodes =
+              SyncClientAdaptor.getChildNodeInNextLevel(
+                  client, group.getHeader(), path.getFullPath());
+        } else {
+          try (SyncDataClient syncDataClient =
+              metaGroupMember
+                  .getClientProvider()
+                  .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
+            nextChildrenNodes =
+                syncDataClient.getChildNodeInNextLevel(group.getHeader(), path.getFullPath());
+          }
+        }
+        if (nextChildrenNodes != null) {
+          break;
+        }
+      } catch (IOException e) {
+        logger.error(LOG_FAIL_CONNECT, node, e);
+      } catch (TException e) {
+        logger.error("Error occurs when getting next children nodes in node {}.", node, e);
+      } catch (InterruptedException e) {
+        logger.error("Interrupted when getting next children nodes in node {}.", node, e);
+        Thread.currentThread().interrupt();
+        // an interrupted thread should stop working instead of contacting the remaining nodes
+        break;
+      }
+    }
+    return nextChildrenNodes;
+  }
+
+ @Override
protected Set<String> getPathNextChildren(PartialPath path) throws MetadataException {
ConcurrentSkipListSet<String> resultSet = new ConcurrentSkipListSet<>();
ExecutorService pool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 03a78a9..de815b3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -437,6 +437,14 @@ public class DataClusterServer extends RaftServer
}
+  /**
+   * Forwards an asynchronous {@code getChildNodeInNextLevel} request to the data service looked
+   * up for {@code header}.
+   *
+   * @param header the header node used to look up the data service
+   * @param path the path whose next-level child nodes are requested
+   * @param resultHandler the callback handed to the data service
+   */
@Override
+  public void getChildNodeInNextLevel(
+      Node header, String path, AsyncMethodCallback<Set<String>> resultHandler) {
+    getDataAsyncService(header, resultHandler, "Get child node in next level")
+        .getChildNodeInNextLevel(header, path, resultHandler);
+  }
+
+ @Override
public void getChildNodePathInNextLevel(
Node header, String path, AsyncMethodCallback<Set<String>> resultHandler) {
DataAsyncService service =
@@ -767,6 +775,11 @@ public class DataClusterServer extends RaftServer
}
+ /**
+ * Forwards a synchronous getChildNodeInNextLevel request to the data service looked up for
+ * the given header and returns that service's result.
+ *
+ * @param header the header node used to look up the data service
+ * @param path the path whose next-level child nodes are requested
+ * @return the set returned by the data service
+ * @throws TException propagated from the data service
+ */
@Override
+ public Set<String> getChildNodeInNextLevel(Node header, String path) throws TException {
+ return getDataSyncService(header).getChildNodeInNextLevel(header, path);
+ }
+
+ @Override
public Set<String> getChildNodePathInNextLevel(Node header, String path) throws TException {
return getDataSyncService(header).getChildNodePathInNextLevel(header, path);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/GetChildNodeNextLevelHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/GetChildNodeNextLevelHandler.java
new file mode 100644
index 0000000..f3bad14
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/GetChildNodeNextLevelHandler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.server.handlers.caller;
+
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Callback for the asynchronous {@code getChildNodeInNextLevel} request. A successful reply is
+ * stored in a shared {@link AtomicReference}; on both success and failure every thread waiting
+ * on that reference is woken up.
+ */
+public class GetChildNodeNextLevelHandler implements AsyncMethodCallback<Set<String>> {
+
+  private static final Logger logger = LoggerFactory.getLogger(GetChildNodeNextLevelHandler.class);
+
+  /** The node the request was sent to; only used in log messages. */
+  private Node contact;
+  /** Holder for the reply; it is also the monitor that waiting threads are notified on. */
+  private AtomicReference<Set<String>> result;
+
+  /**
+   * Stores the reply and wakes up all threads waiting on the result holder.
+   *
+   * @param resp the child nodes returned by the contacted node
+   */
+  @Override
+  public void onComplete(Set<String> resp) {
+    logger.info("Received child node next level from {}", contact);
+    synchronized (result) {
+      result.set(resp);
+      result.notifyAll();
+    }
+  }
+
+  /**
+   * Logs the failure and wakes up all waiting threads. The result holder is left untouched, so a
+   * waiter observes the value it held before and does not have to sit out its full timeout.
+   *
+   * @param exception the failure reported for the request
+   */
+  @Override
+  public void onError(Exception exception) {
+    logger.warn("Cannot get child node next level from {}, because", contact, exception);
+    // guard against a handler whose result holder was never set; there is nobody to wake then
+    if (result != null) {
+      synchronized (result) {
+        result.notifyAll();
+      }
+    }
+  }
+
+  /**
+   * Sets the holder that receives the reply and serves as the notification monitor.
+   *
+   * @param response the shared result holder
+   */
+  public void setResponse(AtomicReference<Set<String>> response) {
+    this.result = response;
+  }
+
+  /**
+   * Sets the node the request is sent to.
+   *
+   * @param contact the contacted node, used for logging
+   */
+  public void setContact(Node contact) {
+    this.contact = contact;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
index 99fe858..256feb2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
@@ -270,6 +270,17 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
}
+  /**
+   * Runs the leader consistency check on the data group member, then reports the next-level
+   * child nodes of {@code path} from the metadata manager to the callback. A failed consistency
+   * check or metadata lookup is reported through {@code onError}.
+   *
+   * @param header part of the RPC signature; not read by this method
+   * @param path the path whose next-level child nodes are requested
+   * @param resultHandler the callback that receives the result or the failure
+   */
@Override
+  public void getChildNodeInNextLevel(
+      Node header, String path, AsyncMethodCallback<Set<String>> resultHandler) {
+    try {
+      dataGroupMember.syncLeaderWithConsistencyCheck(false);
+      Set<String> childNodes = ((CMManager) IoTDB.metaManager).getChildNodeInNextLevel(path);
+      resultHandler.onComplete(childNodes);
+    } catch (CheckConsistencyException | MetadataException e) {
+      resultHandler.onError(e);
+    }
+  }
+
+ @Override
public void getChildNodePathInNextLevel(
Node header, String path, AsyncMethodCallback<Set<String>> resultHandler) {
try {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index b3c5610..e0a5dab 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -263,6 +263,16 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
}
+  /**
+   * Runs the leader consistency check on the data group member, then returns the next-level
+   * child nodes of {@code path} from the metadata manager.
+   *
+   * @param header part of the RPC signature; not read by this method
+   * @param path the path whose next-level child nodes are requested
+   * @return the child nodes returned by the metadata manager
+   * @throws TException wrapping a failed consistency check or metadata lookup
+   */
@Override
+  public Set<String> getChildNodeInNextLevel(Node header, String path) throws TException {
+    try {
+      dataGroupMember.syncLeaderWithConsistencyCheck(false);
+      CMManager metaManager = (CMManager) IoTDB.metaManager;
+      return metaManager.getChildNodeInNextLevel(path);
+    } catch (CheckConsistencyException | MetadataException e) {
+      throw new TException(e);
+    }
+  }
+
+ @Override
public Set<String> getChildNodePathInNextLevel(Node header, String path) throws TException {
try {
dataGroupMember.syncLeaderWithConsistencyCheck(false);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
index 608c8a0..0b37d9d 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
@@ -180,6 +180,12 @@ public class SyncClientAdaptorTest {
}
+          /** Test stub: immediately completes the callback with the fixed set {"1", "2", "3"}. */
@Override
+          public void getChildNodeInNextLevel(
+              Node header, String path, AsyncMethodCallback<Set<String>> resultHandler) {
+            Set<String> cannedChildren = new HashSet<>(Arrays.asList("1", "2", "3"));
+            resultHandler.onComplete(cannedChildren);
+          }
+
+ @Override
public void getChildNodePathInNextLevel(
Node header, String path, AsyncMethodCallback<Set<String>> resultHandler) {
resultHandler.onComplete(new HashSet<>(Arrays.asList("1", "2", "3")));
@@ -358,6 +364,9 @@ public class SyncClientAdaptorTest {
SyncClientAdaptor.getNodeList(dataClient, TestUtils.getNode(0), "root", 0));
assertEquals(
new HashSet<>(Arrays.asList("1", "2", "3")),
+ SyncClientAdaptor.getChildNodeInNextLevel(dataClient, TestUtils.getNode(0), "root"));
+ assertEquals(
+ new HashSet<>(Arrays.asList("1", "2", "3")),
SyncClientAdaptor.getNextChildren(dataClient, TestUtils.getNode(0), "root"));
assertEquals(
getAllMeasurementSchemaResult,
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index a8d3baf..1c2b95a 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -373,6 +373,11 @@ service TSDataService extends RaftService {
list<string> getNodeList(1:Node header, 2:string path, 3:int nodeLevel)
+ /**
+ * Given a path pattern (a path that may contain wildcards), return the child nodes in the
+ * next level of the nodes it matches
+ **/
+ set<string> getChildNodeInNextLevel(1: Node header, 2: string path)
+
set<string> getChildNodePathInNextLevel(1: Node header, 2: string path)
binary getAllMeasurementSchema(1: Node header, 2: binary planBinary)