You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/03/23 08:11:17 UTC

[iotdb] branch master updated: [IOTDB-1240]support show node command in cluster version (#2866)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 77cdc38  [IOTDB-1240]support show node command in cluster version (#2866)
77cdc38 is described below

commit 77cdc38eed2560fbac4a526acff3d3f6da772b3e
Author: Yifu Zhou <ef...@gmail.com>
AuthorDate: Tue Mar 23 16:10:54 2021 +0800

    [IOTDB-1240]support show node command in cluster version (#2866)
    
    * support show child nodes in cluster version
    
    * do not use * to import
    
    * fix bug
    
    * rerun ci
    
    * add IT
    
    Co-authored-by: YIFU ZHOU <>
---
 .../cluster/client/sync/SyncClientAdaptor.java     | 17 +++++
 .../apache/iotdb/cluster/metadata/CMManager.java   |  4 +
 .../iotdb/cluster/query/ClusterPlanExecutor.java   | 86 ++++++++++++++++++++++
 .../iotdb/cluster/server/DataClusterServer.java    | 13 ++++
 .../caller/GetChildNodeNextLevelHandler.java       | 59 +++++++++++++++
 .../cluster/server/service/DataAsyncService.java   | 11 +++
 .../cluster/server/service/DataSyncService.java    | 10 +++
 .../cluster/client/sync/SyncClientAdaptorTest.java |  9 +++
 thrift/src/main/thrift/cluster.thrift              |  5 ++
 9 files changed, 214 insertions(+)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index d518c62..6d73265 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
 import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
 import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.cluster.server.handlers.caller.GetChildNodeNextLevelHandler;
 import org.apache.iotdb.cluster.server.handlers.caller.GetChildNodeNextLevelPathHandler;
 import org.apache.iotdb.cluster.server.handlers.caller.GetDevicesHandler;
 import org.apache.iotdb.cluster.server.handlers.caller.GetNodesListHandler;
@@ -180,6 +181,22 @@ public class SyncClientAdaptor {
     return response.get();
   }
 
+  public static Set<String> getChildNodeInNextLevel(
+      AsyncDataClient client, Node header, String path) throws TException, InterruptedException {
+    GetChildNodeNextLevelHandler handler = new GetChildNodeNextLevelHandler();
+    AtomicReference<Set<String>> response = new AtomicReference<>(null);
+    handler.setResponse(response);
+    handler.setContact(client.getNode());
+
+    client.getChildNodeInNextLevel(header, path, handler);
+    synchronized (response) {
+      if (response.get() == null) {
+        response.wait(RaftServer.getReadOperationTimeoutMS());
+      }
+    }
+    return response.get();
+  }
+
   public static Set<String> getNextChildren(AsyncDataClient client, Node header, String path)
       throws TException, InterruptedException {
     GetChildNodeNextLevelPathHandler handler = new GetChildNodeNextLevelPathHandler();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index bf121d2..97904bf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -1411,6 +1411,10 @@ public class CMManager extends MManager {
         .collect(Collectors.toList());
   }
 
+  public Set<String> getChildNodeInNextLevel(String path) throws MetadataException {
+    return getChildNodeInNextLevel(new PartialPath(path));
+  }
+
   public Set<String> getChildNodePathInNextLevel(String path) throws MetadataException {
     return getChildNodePathInNextLevel(new PartialPath(path));
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index fd5f638..ca16cfb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -383,6 +383,92 @@ public class ClusterPlanExecutor extends PlanExecutor {
   }
 
   @Override
+  protected Set<String> getNodeNextChildren(PartialPath path) throws MetadataException {
+    ConcurrentSkipListSet<String> resultSet = new ConcurrentSkipListSet<>();
+    List<PartitionGroup> globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups();
+    ExecutorService pool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
+    List<Future<Void>> futureList = new ArrayList<>();
+    for (PartitionGroup group : globalGroups) {
+      futureList.add(
+          pool.submit(
+              () -> {
+                Set<String> nextChildrenNodes = null;
+                try {
+                  nextChildrenNodes = getChildNodeInNextLevel(group, path);
+                } catch (CheckConsistencyException e) {
+                  logger.error("Fail to get next children nodes of {} from {}", path, group, e);
+                }
+                if (nextChildrenNodes != null) {
+                  resultSet.addAll(nextChildrenNodes);
+                } else {
+                  logger.error("Fail to get next children nodes of {} from {}", path, group);
+                }
+                return null;
+              }));
+    }
+    waitForThreadPool(futureList, pool, "getChildNodeInNextLevel()");
+    return resultSet;
+  }
+
+  private Set<String> getChildNodeInNextLevel(PartitionGroup group, PartialPath path)
+      throws CheckConsistencyException {
+    if (group.contains(metaGroupMember.getThisNode())) {
+      return getLocalChildNodeInNextLevel(group, path);
+    } else {
+      return getRemoteChildNodeInNextLevel(group, path);
+    }
+  }
+
+  private Set<String> getLocalChildNodeInNextLevel(PartitionGroup group, PartialPath path)
+      throws CheckConsistencyException {
+    Node header = group.getHeader();
+    DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header);
+    localDataMember.syncLeaderWithConsistencyCheck(false);
+    try {
+      return IoTDB.metaManager.getChildNodeInNextLevel(path);
+    } catch (MetadataException e) {
+      logger.error("Cannot not get next children nodes of {} from {} locally", path, group);
+      return Collections.emptySet();
+    }
+  }
+
+  private Set<String> getRemoteChildNodeInNextLevel(PartitionGroup group, PartialPath path) {
+    Set<String> nextChildrenNodes = null;
+    for (Node node : group) {
+      try {
+        if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+          AsyncDataClient client =
+              metaGroupMember
+                  .getClientProvider()
+                  .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
+          nextChildrenNodes =
+              SyncClientAdaptor.getChildNodeInNextLevel(
+                  client, group.getHeader(), path.getFullPath());
+        } else {
+          try (SyncDataClient syncDataClient =
+              metaGroupMember
+                  .getClientProvider()
+                  .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
+            nextChildrenNodes =
+                syncDataClient.getChildNodeInNextLevel(group.getHeader(), path.getFullPath());
+          }
+        }
+        if (nextChildrenNodes != null) {
+          break;
+        }
+      } catch (IOException e) {
+        logger.error(LOG_FAIL_CONNECT, node, e);
+      } catch (TException e) {
+        logger.error("Error occurs when getting node lists in node {}.", node, e);
+      } catch (InterruptedException e) {
+        logger.error("Interrupted when getting node lists in node {}.", node, e);
+        Thread.currentThread().interrupt();
+      }
+    }
+    return nextChildrenNodes;
+  }
+
+  @Override
   protected Set<String> getPathNextChildren(PartialPath path) throws MetadataException {
     ConcurrentSkipListSet<String> resultSet = new ConcurrentSkipListSet<>();
     ExecutorService pool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 03a78a9..de815b3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -437,6 +437,14 @@ public class DataClusterServer extends RaftServer
   }
 
   @Override
+  public void getChildNodeInNextLevel(
+      Node header, String path, AsyncMethodCallback<Set<String>> resultHandler) {
+    DataAsyncService service =
+        getDataAsyncService(header, resultHandler, "Get child node in next level");
+    service.getChildNodeInNextLevel(header, path, resultHandler);
+  }
+
+  @Override
   public void getChildNodePathInNextLevel(
       Node header, String path, AsyncMethodCallback<Set<String>> resultHandler) {
     DataAsyncService service =
@@ -767,6 +775,11 @@ public class DataClusterServer extends RaftServer
   }
 
   @Override
+  public Set<String> getChildNodeInNextLevel(Node header, String path) throws TException {
+    return getDataSyncService(header).getChildNodeInNextLevel(header, path);
+  }
+
+  @Override
   public Set<String> getChildNodePathInNextLevel(Node header, String path) throws TException {
     return getDataSyncService(header).getChildNodePathInNextLevel(header, path);
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/GetChildNodeNextLevelHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/GetChildNodeNextLevelHandler.java
new file mode 100644
index 0000000..f3bad14
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/GetChildNodeNextLevelHandler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.server.handlers.caller;
+
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class GetChildNodeNextLevelHandler implements AsyncMethodCallback<Set<String>> {
+
+  private static final Logger logger = LoggerFactory.getLogger(GetChildNodeNextLevelHandler.class);
+
+  private Node contact;
+  private AtomicReference<Set<String>> result;
+
+  @Override
+  public void onComplete(Set<String> resp) {
+    logger.info("Received child node next level from {}", contact);
+    synchronized (result) {
+      result.set(resp);
+      result.notifyAll();
+    }
+  }
+
+  @Override
+  public void onError(Exception exception) {
+    logger.warn("Cannot get child node next level from {}, because", contact, exception);
+  }
+
+  public void setResponse(AtomicReference<Set<String>> response) {
+    this.result = response;
+  }
+
+  public void setContact(Node contact) {
+    this.contact = contact;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
index 99fe858..256feb2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
@@ -270,6 +270,17 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
   }
 
   @Override
+  public void getChildNodeInNextLevel(
+      Node header, String path, AsyncMethodCallback<Set<String>> resultHandler) {
+    try {
+      dataGroupMember.syncLeaderWithConsistencyCheck(false);
+      resultHandler.onComplete(((CMManager) IoTDB.metaManager).getChildNodeInNextLevel(path));
+    } catch (CheckConsistencyException | MetadataException e) {
+      resultHandler.onError(e);
+    }
+  }
+
+  @Override
   public void getChildNodePathInNextLevel(
       Node header, String path, AsyncMethodCallback<Set<String>> resultHandler) {
     try {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index b3c5610..e0a5dab 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -263,6 +263,16 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
   }
 
   @Override
+  public Set<String> getChildNodeInNextLevel(Node header, String path) throws TException {
+    try {
+      dataGroupMember.syncLeaderWithConsistencyCheck(false);
+      return ((CMManager) IoTDB.metaManager).getChildNodeInNextLevel(path);
+    } catch (CheckConsistencyException | MetadataException e) {
+      throw new TException(e);
+    }
+  }
+
+  @Override
   public Set<String> getChildNodePathInNextLevel(Node header, String path) throws TException {
     try {
       dataGroupMember.syncLeaderWithConsistencyCheck(false);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
index 608c8a0..0b37d9d 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
@@ -180,6 +180,12 @@ public class SyncClientAdaptorTest {
           }
 
           @Override
+          public void getChildNodeInNextLevel(
+              Node header, String path, AsyncMethodCallback<Set<String>> resultHandler) {
+            resultHandler.onComplete(new HashSet<>(Arrays.asList("1", "2", "3")));
+          }
+
+          @Override
           public void getChildNodePathInNextLevel(
               Node header, String path, AsyncMethodCallback<Set<String>> resultHandler) {
             resultHandler.onComplete(new HashSet<>(Arrays.asList("1", "2", "3")));
@@ -358,6 +364,9 @@ public class SyncClientAdaptorTest {
         SyncClientAdaptor.getNodeList(dataClient, TestUtils.getNode(0), "root", 0));
     assertEquals(
         new HashSet<>(Arrays.asList("1", "2", "3")),
+        SyncClientAdaptor.getChildNodeInNextLevel(dataClient, TestUtils.getNode(0), "root"));
+    assertEquals(
+        new HashSet<>(Arrays.asList("1", "2", "3")),
         SyncClientAdaptor.getNextChildren(dataClient, TestUtils.getNode(0), "root"));
     assertEquals(
         getAllMeasurementSchemaResult,
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index a8d3baf..1c2b95a 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -373,6 +373,11 @@ service TSDataService extends RaftService {
 
   list<string> getNodeList(1:Node header, 2:string path, 3:int nodeLevel)
 
+  /**
+   * Given path patterns(paths with wildcard), return all children nodes they match
+   **/
+  set<string> getChildNodeInNextLevel(1: Node header, 2: string path)
+
   set<string> getChildNodePathInNextLevel(1: Node header, 2: string path)
 
   binary getAllMeasurementSchema(1: Node header, 2: binary planBinary)