Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/11/18 05:28:16 UTC

[iotdb] branch master updated: [IOTDB-2025] Fix count nodes and devices incorrectly in cluster (#4416)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fedbb92  [IOTDB-2025] Fix count nodes and devices incorrectly in cluster (#4416)
fedbb92 is described below

commit fedbb92a5b2dd7a20dc7a048e7a973b74b2540c9
Author: BaiJian <er...@hotmail.com>
AuthorDate: Thu Nov 18 13:27:43 2021 +0800

    [IOTDB-2025] Fix count nodes and devices incorrectly in cluster (#4416)
---
 .../iotdb/cluster/query/ClusterPlanExecutor.java   | 80 +++++++++++++++++++---
 .../iotdb/db/integration/IoTDBMetadataFetchIT.java | 17 ++++-
 2 files changed, 84 insertions(+), 13 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index e849787..3eb0228 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
@@ -63,8 +64,10 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -138,7 +141,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
     logger.debug("The storage groups of path {} are {}", path, sgPathMap.keySet());
     int ret;
     try {
-      ret = getDeviceCount(sgPathMap);
+      ret = getDeviceCount(sgPathMap, path);
     } catch (CheckConsistencyException e) {
       throw new MetadataException(e);
     }
@@ -146,24 +149,40 @@ public class ClusterPlanExecutor extends PlanExecutor {
     return ret;
   }
 
-  private int getDeviceCount(Map<String, String> sgPathMap)
+  private int getDeviceCount(Map<String, String> sgPathMap, PartialPath queryPath)
       throws CheckConsistencyException, MetadataException {
     AtomicInteger result = new AtomicInteger();
     // split the paths by the data group they belong to
     Map<PartitionGroup, List<String>> groupPathMap = new HashMap<>();
-    for (Entry<String, String> sgPathEntry : sgPathMap.entrySet()) {
-      String storageGroupName = sgPathEntry.getKey();
-      PartialPath pathUnderSG = new PartialPath(sgPathEntry.getValue());
+    for (String storageGroupName : sgPathMap.keySet()) {
+      PartialPath pathUnderSG = new PartialPath(storageGroupName);
       // find the data group that should hold the device schemas of the storage group
       PartitionGroup partitionGroup =
           metaGroupMember.getPartitionTable().route(storageGroupName, 0);
+      PartialPath targetPath;
+      // If the storage group path has at least as many nodes as queryPath, we query the device
+      // count of the storage group directly
+      if (pathUnderSG.getNodeLength() >= queryPath.getNodeLength()) {
+        targetPath = pathUnderSG;
+      } else {
+        // Otherwise, replace the prefix of queryPath with the storage group path to build the target path
+        String[] targetNodes = new String[queryPath.getNodeLength()];
+        for (int i = 0; i < queryPath.getNodeLength(); i++) {
+          if (i < pathUnderSG.getNodeLength()) {
+            targetNodes[i] = pathUnderSG.getNodes()[i];
+          } else {
+            targetNodes[i] = queryPath.getNodes()[i];
+          }
+        }
+        targetPath = new PartialPath(targetNodes);
+      }
       if (partitionGroup.contains(metaGroupMember.getThisNode())) {
         // this node is a member of the group, perform a local query after synchronizing with the
         // leader
         metaGroupMember
             .getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId())
             .syncLeaderWithConsistencyCheck(false);
-        int localResult = getLocalDeviceCount(pathUnderSG);
+        int localResult = getLocalDeviceCount(targetPath);
         logger.debug(
             "{}: get device count of {} locally, result {}",
             metaGroupMember.getName(),
@@ -174,7 +193,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
         // batch the queries of the same group to reduce communication
         groupPathMap
             .computeIfAbsent(partitionGroup, p -> new ArrayList<>())
-            .add(pathUnderSG.getFullPath());
+            .add(targetPath.getFullPath());
       }
     }
     if (groupPathMap.isEmpty()) {
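
The core of the device-count change above is a prefix replacement: when a matched storage group is shallower than the query path, the remaining nodes (wildcards included) are taken from the query pattern before devices are counted. A minimal standalone sketch of that idea, using plain String arrays instead of PartialPath (buildTargetNodes is an illustrative name, not part of the patch):

    // Sketch of the prefix replacement performed in getDeviceCount above.
    // Plain String[] stands in for PartialPath; this helper is illustrative only.
    static String[] buildTargetNodes(String[] sgNodes, String[] queryNodes) {
      // Storage group at least as deep as the query path: count devices under it directly.
      if (sgNodes.length >= queryNodes.length) {
        return sgNodes;
      }
      // Otherwise keep the storage group as the prefix and take the remaining
      // nodes (wildcards included) from the query path.
      String[] target = new String[queryNodes.length];
      for (int i = 0; i < queryNodes.length; i++) {
        target[i] = i < sgNodes.length ? sgNodes[i] : queryNodes[i];
      }
      return target;
    }

For example, with storage group root.sg and query pattern root.*.*.*, the target becomes root.sg.*.*.
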
@@ -284,14 +303,55 @@ public class ClusterPlanExecutor extends PlanExecutor {
       throw new MetadataException(e);
     }
 
-    Map<String, String> sgPathMap = IoTDB.metaManager.groupPathByStorageGroup(path);
+    // Append ** to the path so that storage groups having 'path' as a prefix are also matched,
+    // in case path doesn't already end with **.
+    // e.g. with SGs root.sg.a and root.sg.b and query path root.sg, we should return the map
+    // with keys root.sg.a and root.sg.b instead of an empty one.
+    PartialPath wildcardPath = path.concatNode(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD);
+    Map<String, String> sgPathMap = IoTDB.metaManager.groupPathByStorageGroup(wildcardPath);
     if (sgPathMap.isEmpty()) {
       throw new PathNotExistException(path.getFullPath());
     }
     logger.debug("The storage groups of path {} are {}", path, sgPathMap.keySet());
-    int ret;
+    int ret = 0;
     try {
-      ret = getPathCount(sgPathMap, level);
+      // level >= 0 is the COUNT NODE query
+      if (level >= 0) {
+        int prefixPartIdx = 0;
+        for (; prefixPartIdx < path.getNodeLength(); prefixPartIdx++) {
+          String currentPart = path.getNodes()[prefixPartIdx];
+          if (currentPart.equals(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD)) {
+            break;
+          } else if (currentPart.equals(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)) {
+            // The query only makes sense when level equals the level of the first *,
+            // e.g. root.sg.d1.* with level = 3.
+            if (level != prefixPartIdx) {
+              return 0;
+            }
+            break;
+          }
+        }
+        // if level is shallower than the last concrete level of the query path, there's no suitable node
+        if (level < prefixPartIdx - 1) {
+          return 0;
+        }
+        Set<String> deletedSg = new HashSet<>();
+        Set<PartialPath> matchedPath = new HashSet<>(0);
+        for (String sg : sgPathMap.keySet()) {
+          PartialPath p = new PartialPath(sg);
+          // if the storage group's own level is at least the queried level, its prefix at that
+          // level is already a suitable node and there's no need to query children nodes later
+          if (p.getNodeLength() - 1 >= level) {
+            deletedSg.add(sg);
+            matchedPath.add(new PartialPath(Arrays.copyOfRange(p.getNodes(), 0, level + 1)));
+          }
+        }
+        for (String sg : deletedSg) {
+          sgPathMap.remove(sg);
+        }
+        ret += matchedPath.size();
+      }
+      ret += getPathCount(sgPathMap, level);
     } catch (CheckConsistencyException e) {
       throw new MetadataException(e);
     }
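
Two things happen in the node-count change above: the query path is first extended with ** so that storage groups deeper than the given prefix are still matched, and for COUNT NODES (level >= 0) any storage group whose own path already reaches the requested level contributes its prefix at that level directly, while the remaining groups are still sent to getPathCount. A condensed, self-contained sketch of the level handling (String[] stands in for PartialPath, and a ToIntFunction stands in for the distributed getPathCount call; the names below are illustrative, not the actual IoTDB API):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.function.ToIntFunction;

    // Condensed sketch of the COUNT NODES branch of getNodesNumInGivenLevel above.
    // countNodesAtLevel and countUnderRemainingGroups are illustrative names.
    static int countNodesAtLevel(
        String[] queryNodes,
        List<String[]> storageGroups,   // groups matched after appending **
        int level,
        ToIntFunction<List<String[]>> countUnderRemainingGroups) {
      // Locate the end of the concrete (wildcard-free) prefix of the query path.
      int prefixEnd = 0;
      for (; prefixEnd < queryNodes.length; prefixEnd++) {
        String node = queryNodes[prefixEnd];
        if (node.equals("**")) {
          break;
        }
        if (node.equals("*")) {
          // A single-level wildcard only makes sense when it sits exactly at
          // the requested level, e.g. root.sg.d1.* with level = 3.
          if (level != prefixEnd) {
            return 0;
          }
          break;
        }
      }
      // A level shallower than the concrete prefix cannot match any node.
      if (level < prefixEnd - 1) {
        return 0;
      }
      // Storage groups that already reach the requested level each contribute
      // their (deduplicated) prefix at that level; the others still need a
      // child-node count below the storage group.
      Set<String> matchedPrefixes = new HashSet<>();
      List<String[]> remaining = new ArrayList<>();
      for (String[] sg : storageGroups) {
        if (sg.length - 1 >= level) {
          matchedPrefixes.add(String.join(".", Arrays.copyOfRange(sg, 0, level + 1)));
        } else {
          remaining.add(sg);
        }
      }
      return matchedPrefixes.size() + countUnderRemainingGroups.applyAsInt(remaining);
    }
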
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java
index d6424d9..da395be 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java
@@ -24,7 +24,10 @@ import org.apache.iotdb.jdbc.Config;
 
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
-import org.junit.*;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -549,8 +552,16 @@ public class IoTDBMetadataFetchIT {
             DriverManager.getConnection(
                 Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
         Statement statement = connection.createStatement()) {
-      String[] sqls = new String[] {"COUNT NODES root.** level=1"};
-      String[] standards = new String[] {"3,\n"};
+      String[] sqls =
+          new String[] {
+            "COUNT NODES root.** level=1",
+            "COUNT NODES root.ln level=1",
+            "COUNT NODES root.ln.wf01.** level=1",
+            "COUNT NODES root.ln.wf01.* level=2",
+            "COUNT NODES root.ln.wf01.* level=3",
+            "COUNT NODES root.ln.wf01.* level=4"
+          };
+      String[] standards = new String[] {"3,\n", "1,\n", "0,\n", "0,\n", "1,\n", "0,\n"};
       for (int n = 0; n < sqls.length; n++) {
         String sql = sqls[n];
         String standard = standards[n];
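
With root counted as level 0, the expected results line up with the logic above: COUNT NODES root.ln level=1 yields 1 (the root.ln prefix itself); COUNT NODES root.ln.wf01.* level=3 yields 1, while level=2 and level=4 yield 0 because the single-level wildcard must sit exactly at the requested level; and COUNT NODES root.ln.wf01.** level=1 yields 0 because the requested level is shallower than the concrete prefix root.ln.wf01.
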