You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/01/21 11:43:10 UTC

[iotdb] branch master updated: [IOTDB-2462] Fix ALIGN BY DEVICE query in cluster mode (#4947)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ae8e9da  [IOTDB-2462] Fix ALIGN BY DEVICE query in cluster mode (#4947)
ae8e9da is described below

commit ae8e9dad8173df7dc7b94aba67fa1aba46b31dc1
Author: BaiJian <er...@hotmail.com>
AuthorDate: Fri Jan 21 19:42:36 2022 +0800

    [IOTDB-2462] Fix ALIGN BY DEVICE query in cluster mode (#4947)
---
 .../cluster/client/sync/SyncClientAdaptor.java     |  4 +--
 .../apache/iotdb/cluster/metadata/CMManager.java   | 42 +++++++++++++---------
 .../cluster/server/service/DataAsyncService.java   |  7 ++--
 .../server/service/DataGroupServiceImpls.java      | 14 +++++---
 .../cluster/server/service/DataSyncService.java    |  5 +--
 .../cluster/client/sync/SyncClientAdaptorTest.java |  7 ++--
 .../org/apache/iotdb/db/metadata/MManager.java     | 12 +------
 .../db/tools/virtualsg/DeviceMappingViewer.java    |  2 +-
 .../iotdb/db/metadata/MManagerBasicTest.java       | 10 +++---
 thrift-cluster/src/main/thrift/cluster.thrift      |  2 +-
 10 files changed, 59 insertions(+), 46 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index 36ca50e..7f16551 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -381,12 +381,12 @@ public class SyncClientAdaptor {
   }
 
   public static Set<String> getAllDevices(
-      AsyncDataClient client, RaftNode header, List<String> pathsToQuery)
+      AsyncDataClient client, RaftNode header, List<String> pathsToQuery, boolean isPrefixMatch)
       throws InterruptedException, TException {
     AtomicReference<Set<String>> remoteResult = new AtomicReference<>();
     GenericHandler<Set<String>> handler = new GenericHandler<>(client.getNode(), remoteResult);
 
-    client.getAllDevices(header, pathsToQuery, handler);
+    client.getAllDevices(header, pathsToQuery, isPrefixMatch, handler);
     return handler.getResult(ClusterConstant.getReadOperationTimeoutMS());
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index dcdfcf1..6b2e15d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -923,12 +923,15 @@ public class CMManager extends MManager {
   /**
    * Get all devices after removing wildcards in the path
    *
-   * @param originPath a path potentially with wildcard
-   * @return all paths after removing wildcards in the path
+   * @param originPath a path potentially with wildcard.
+   * @param isPrefixMatch if true, the path pattern is used to match prefix path
+   * @return all paths after removing wildcards in the path.
    */
-  public Set<PartialPath> getMatchedDevices(PartialPath originPath) throws MetadataException {
+  @Override
+  public Set<PartialPath> getMatchedDevices(PartialPath originPath, boolean isPrefixMatch)
+      throws MetadataException {
     Map<String, List<PartialPath>> sgPathMap = groupPathByStorageGroup(originPath);
-    Set<PartialPath> ret = getMatchedDevices(sgPathMap);
+    Set<PartialPath> ret = getMatchedDevices(sgPathMap, isPrefixMatch);
     logger.debug("The devices of path {} are {}", originPath, ret);
     return ret;
   }
@@ -1088,10 +1091,11 @@ public class CMManager extends MManager {
    *
    * @param sgPathMap the key is the storage group name and the value is the path pattern to be
    *     queried with storage group added
+   * @param isPrefixMatch if true, the path pattern is used to match prefix path
    * @return a collection of all queried devices
    */
-  private Set<PartialPath> getMatchedDevices(Map<String, List<PartialPath>> sgPathMap)
-      throws MetadataException {
+  private Set<PartialPath> getMatchedDevices(
+      Map<String, List<PartialPath>> sgPathMap, boolean isPrefixMatch) throws MetadataException {
     Set<PartialPath> result = new HashSet<>();
     // split the paths by the data group they belong to
     Map<PartitionGroup, List<String>> groupPathMap = new HashMap<>();
@@ -1113,7 +1117,7 @@ public class CMManager extends MManager {
         }
         Set<PartialPath> allDevices = new HashSet<>();
         for (PartialPath path : paths) {
-          allDevices.addAll(super.getMatchedDevices(path));
+          allDevices.addAll(super.getMatchedDevices(path, isPrefixMatch));
         }
         logger.debug(
             "{}: get matched paths of {} locally, result {}",
@@ -1136,19 +1140,21 @@ public class CMManager extends MManager {
       PartitionGroup partitionGroup = partitionGroupPathEntry.getKey();
       List<String> pathsToQuery = partitionGroupPathEntry.getValue();
 
-      result.addAll(getMatchedDevices(partitionGroup, pathsToQuery));
+      result.addAll(getMatchedDevices(partitionGroup, pathsToQuery, isPrefixMatch));
     }
 
     return result;
   }
 
   private Set<PartialPath> getMatchedDevices(
-      PartitionGroup partitionGroup, List<String> pathsToQuery) throws MetadataException {
+      PartitionGroup partitionGroup, List<String> pathsToQuery, boolean isPrefixMatch)
+      throws MetadataException {
     // choose the node with lowest latency or highest throughput
     List<Node> coordinatedNodes = QueryCoordinator.getINSTANCE().reorderNodes(partitionGroup);
     for (Node node : coordinatedNodes) {
       try {
-        Set<String> paths = getMatchedDevices(node, partitionGroup.getHeader(), pathsToQuery);
+        Set<String> paths =
+            getMatchedDevices(node, partitionGroup.getHeader(), pathsToQuery, isPrefixMatch);
         logger.debug(
             "{}: get matched paths of {} from {}, result {} for {}",
             metaGroupMember.getName(),
@@ -1175,14 +1181,15 @@ public class CMManager extends MManager {
     return Collections.emptySet();
   }
 
-  private Set<String> getMatchedDevices(Node node, RaftNode header, List<String> pathsToQuery)
+  private Set<String> getMatchedDevices(
+      Node node, RaftNode header, List<String> pathsToQuery, boolean isPrefixMatch)
       throws IOException, TException, InterruptedException {
     Set<String> paths;
     if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
       AsyncDataClient client =
           ClusterIoTDB.getInstance()
               .getAsyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
-      paths = SyncClientAdaptor.getAllDevices(client, header, pathsToQuery);
+      paths = SyncClientAdaptor.getAllDevices(client, header, pathsToQuery, isPrefixMatch);
     } else {
       SyncDataClient syncDataClient = null;
       try {
@@ -1190,7 +1197,7 @@ public class CMManager extends MManager {
             ClusterIoTDB.getInstance()
                 .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
         try {
-          paths = syncDataClient.getAllDevices(header, pathsToQuery);
+          paths = syncDataClient.getAllDevices(header, pathsToQuery, isPrefixMatch);
         } catch (TException e) {
           // the connection may be broken, close it to avoid it being reused
           syncDataClient.close();
@@ -1321,12 +1328,15 @@ public class CMManager extends MManager {
   /**
    * Get the local devices that match any path in "paths". The result is deduplicated.
    *
-   * @param paths paths potentially contain wildcards
+   * @param paths paths potentially contain wildcards.
+   * @param isPrefixMatch if true, the path pattern is used to match prefix path.
+   * @return A HashSet instance which stores devices paths matching the given path pattern.
    */
-  public Set<String> getAllDevices(List<String> paths) throws MetadataException {
+  public Set<String> getAllDevices(List<String> paths, boolean isPrefixMatch)
+      throws MetadataException {
     Set<String> results = new HashSet<>();
     for (String path : paths) {
-      this.getMatchedDevices(new PartialPath(path)).stream()
+      this.getMatchedDevices(new PartialPath(path), isPrefixMatch).stream()
           .map(PartialPath::getFullPath)
           .forEach(results::add);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
index 2a218b8..eaa3ca8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
@@ -304,10 +304,13 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
 
   @Override
   public void getAllDevices(
-      RaftNode header, List<String> path, AsyncMethodCallback<Set<String>> resultHandler) {
+      RaftNode header,
+      List<String> path,
+      boolean isPrefixMatch,
+      AsyncMethodCallback<Set<String>> resultHandler) {
     try {
       dataGroupMember.syncLeaderWithConsistencyCheck(false);
-      resultHandler.onComplete(((CMManager) IoTDB.metaManager).getAllDevices(path));
+      resultHandler.onComplete(((CMManager) IoTDB.metaManager).getAllDevices(path, isPrefixMatch));
     } catch (MetadataException | CheckConsistencyException e) {
       resultHandler.onError(e);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
index 2fec073..9b15dc5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
@@ -282,11 +282,14 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void getAllDevices(
-      RaftNode header, List<String> paths, AsyncMethodCallback<Set<String>> resultHandler) {
+      RaftNode header,
+      List<String> paths,
+      boolean isPrefixMatch,
+      AsyncMethodCallback<Set<String>> resultHandler) {
     DataAsyncService service =
         DataGroupEngine.getInstance().getDataAsyncService(header, resultHandler, "Get all devices");
     if (service != null) {
-      service.getAllDevices(header, paths, resultHandler);
+      service.getAllDevices(header, paths, isPrefixMatch, resultHandler);
     }
   }
 
@@ -519,8 +522,11 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
   }
 
   @Override
-  public Set<String> getAllDevices(RaftNode header, List<String> path) throws TException {
-    return DataGroupEngine.getInstance().getDataSyncService(header).getAllDevices(header, path);
+  public Set<String> getAllDevices(RaftNode header, List<String> path, boolean isPrefixMatch)
+      throws TException {
+    return DataGroupEngine.getInstance()
+        .getDataSyncService(header)
+        .getAllDevices(header, path, isPrefixMatch);
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index d023b79..6d15e48 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -293,10 +293,11 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
   }
 
   @Override
-  public Set<String> getAllDevices(RaftNode header, List<String> path) throws TException {
+  public Set<String> getAllDevices(RaftNode header, List<String> path, boolean isPrefixMatch)
+      throws TException {
     try {
       dataGroupMember.syncLeaderWithConsistencyCheck(false);
-      return ((CMManager) IoTDB.metaManager).getAllDevices(path);
+      return ((CMManager) IoTDB.metaManager).getAllDevices(path, isPrefixMatch);
     } catch (MetadataException | CheckConsistencyException e) {
       throw new TException(e);
     }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
index 1d14e9c..9618947 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
@@ -263,7 +263,10 @@ public class SyncClientAdaptorTest {
 
           @Override
           public void getAllDevices(
-              RaftNode header, List<String> path, AsyncMethodCallback<Set<String>> resultHandler) {
+              RaftNode header,
+              List<String> path,
+              boolean isPrefixMatch,
+              AsyncMethodCallback<Set<String>> resultHandler) {
             resultHandler.onComplete(new HashSet<>(path));
           }
 
@@ -405,7 +408,7 @@ public class SyncClientAdaptorTest {
         (int) SyncClientAdaptor.getPathCount(dataClient, TestUtils.getRaftNode(0, 0), paths, 0));
     assertEquals(
         new HashSet<>(paths),
-        SyncClientAdaptor.getAllDevices(dataClient, TestUtils.getRaftNode(0, 0), paths));
+        SyncClientAdaptor.getAllDevices(dataClient, TestUtils.getRaftNode(0, 0), paths, false));
     assertEquals(1L, (long) SyncClientAdaptor.getGroupByExecutor(dataClient, new GroupByRequest()));
     assertEquals(fillResult, SyncClientAdaptor.previousFill(dataClient, new PreviousFillRequest()));
     assertEquals(readFileResult, SyncClientAdaptor.readFile(dataClient, "a file", 0, 1000));
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 0c5f234..d1aaf1c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -1223,7 +1223,7 @@ public class MManager {
    * to match prefix path. All timeseries start with the matched prefix path will be collected.
    *
    * @param pathPattern the pattern of the target devices.
-   * @param isPrefixMatch if true, the path pattern is used to match prefix path
+   * @param isPrefixMatch if true, the path pattern is used to match prefix path.
    * @return A HashSet instance which stores devices paths matching the given path pattern.
    */
   public Set<PartialPath> getMatchedDevices(PartialPath pathPattern, boolean isPrefixMatch)
@@ -1232,16 +1232,6 @@ public class MManager {
   }
 
   /**
-   * Get all device paths matching the path pattern.
-   *
-   * @param pathPattern the pattern of the target devices.
-   * @return A HashSet instance which stores devices paths matching the given path pattern.
-   */
-  public Set<PartialPath> getMatchedDevices(PartialPath pathPattern) throws MetadataException {
-    return getMatchedDevices(pathPattern, false);
-  }
-
-  /**
    * Get all device paths and according storage group paths as ShowDevicesResult.
    *
    * @param plan ShowDevicesPlan which contains the path pattern and restriction params.
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/virtualsg/DeviceMappingViewer.java b/server/src/main/java/org/apache/iotdb/db/tools/virtualsg/DeviceMappingViewer.java
index cbfd948..33eb93d 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/virtualsg/DeviceMappingViewer.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/virtualsg/DeviceMappingViewer.java
@@ -43,7 +43,7 @@ public class DeviceMappingViewer {
     MManager mManager = MManager.getInstance();
     mManager.init();
 
-    Set<PartialPath> partialPathSet = mManager.getMatchedDevices(new PartialPath("root.**"));
+    Set<PartialPath> partialPathSet = mManager.getMatchedDevices(new PartialPath("root.**"), false);
 
     if (partialPathSet.isEmpty() && args.length == 1) {
       System.out.println("no mlog in given system schema dir: " + args[0] + " please have a check");
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
index bdf8542..71c6233 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
@@ -479,7 +479,7 @@ public class MManagerBasicTest {
       // prefix with *
       assertEquals(
           devices,
-          manager.getMatchedDevices(new PartialPath("root.**")).stream()
+          manager.getMatchedDevices(new PartialPath("root.**"), false).stream()
               .map(PartialPath::getFullPath)
               .collect(Collectors.toSet()));
 
@@ -492,7 +492,7 @@ public class MManagerBasicTest {
       // prefix with *
       assertEquals(
           devices,
-          manager.getMatchedDevices(new PartialPath("root.**")).stream()
+          manager.getMatchedDevices(new PartialPath("root.**"), false).stream()
               .map(PartialPath::getFullPath)
               .collect(Collectors.toSet()));
 
@@ -506,7 +506,7 @@ public class MManagerBasicTest {
       // prefix with *
       assertEquals(
           devices,
-          recoverManager.getMatchedDevices(new PartialPath("root.**")).stream()
+          recoverManager.getMatchedDevices(new PartialPath("root.**"), false).stream()
               .map(PartialPath::getFullPath)
               .collect(Collectors.toSet()));
 
@@ -745,7 +745,7 @@ public class MManagerBasicTest {
       // usual condition
       assertEquals(
           devices,
-          manager.getMatchedDevices(new PartialPath("root.laptop.**")).stream()
+          manager.getMatchedDevices(new PartialPath("root.laptop.**"), false).stream()
               .map(PartialPath::getFullPath)
               .collect(Collectors.toSet()));
       manager.setStorageGroup(new PartialPath("root.vehicle"));
@@ -759,7 +759,7 @@ public class MManagerBasicTest {
       // prefix with *
       assertEquals(
           devices,
-          manager.getMatchedDevices(new PartialPath("root.**")).stream()
+          manager.getMatchedDevices(new PartialPath("root.**"), false).stream()
               .map(PartialPath::getFullPath)
               .collect(Collectors.toSet()));
     } catch (MetadataException e) {
diff --git a/thrift-cluster/src/main/thrift/cluster.thrift b/thrift-cluster/src/main/thrift/cluster.thrift
index 1bbd9e3..b444dc8 100644
--- a/thrift-cluster/src/main/thrift/cluster.thrift
+++ b/thrift-cluster/src/main/thrift/cluster.thrift
@@ -402,7 +402,7 @@ service TSDataService extends RaftService {
   /**
    * Given path patterns (paths with wildcard), return all devices they match.
    **/
-  set<string> getAllDevices(1:RaftNode header, 2:list<string> path)
+  set<string> getAllDevices(1:RaftNode header, 2:list<string> path, 3: bool isPrefixMatch)
 
   /**
    * Get the devices from the header according to the showDevicesPlan