You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/11/18 05:28:16 UTC
[iotdb] branch master updated: [IOTDB-2025] Fix count nodes and devices incorrectly in cluster (#4416)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fedbb92 [IOTDB-2025] Fix count nodes and devices incorrectly in cluster (#4416)
fedbb92 is described below
commit fedbb92a5b2dd7a20dc7a048e7a973b74b2540c9
Author: BaiJian <er...@hotmail.com>
AuthorDate: Thu Nov 18 13:27:43 2021 +0800
[IOTDB-2025] Fix count nodes and devices incorrectly in cluster (#4416)
---
.../iotdb/cluster/query/ClusterPlanExecutor.java | 80 +++++++++++++++++++---
.../iotdb/db/integration/IoTDBMetadataFetchIT.java | 17 ++++-
2 files changed, 84 insertions(+), 13 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index e849787..3eb0228 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
@@ -63,8 +64,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -138,7 +141,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
logger.debug("The storage groups of path {} are {}", path, sgPathMap.keySet());
int ret;
try {
- ret = getDeviceCount(sgPathMap);
+ ret = getDeviceCount(sgPathMap, path);
} catch (CheckConsistencyException e) {
throw new MetadataException(e);
}
@@ -146,24 +149,40 @@ public class ClusterPlanExecutor extends PlanExecutor {
return ret;
}
- private int getDeviceCount(Map<String, String> sgPathMap)
+ private int getDeviceCount(Map<String, String> sgPathMap, PartialPath queryPath)
throws CheckConsistencyException, MetadataException {
AtomicInteger result = new AtomicInteger();
// split the paths by the data group they belong to
Map<PartitionGroup, List<String>> groupPathMap = new HashMap<>();
- for (Entry<String, String> sgPathEntry : sgPathMap.entrySet()) {
- String storageGroupName = sgPathEntry.getKey();
- PartialPath pathUnderSG = new PartialPath(sgPathEntry.getValue());
+ for (String storageGroupName : sgPathMap.keySet()) {
+ PartialPath pathUnderSG = new PartialPath(storageGroupName);
// find the data group that should hold the device schemas of the storage group
PartitionGroup partitionGroup =
metaGroupMember.getPartitionTable().route(storageGroupName, 0);
+ PartialPath targetPath;
+ // If the storage group path has at least as many nodes as queryPath, we query the device
+ // count of the storage group directly
+ if (pathUnderSG.getNodeLength() >= queryPath.getNodeLength()) {
+ targetPath = pathUnderSG;
+ } else {
+ // Otherwise we replace the prefix of queryPath with the storage group path to build the target path
+ String[] targetNodes = new String[queryPath.getNodeLength()];
+ for (int i = 0; i < queryPath.getNodeLength(); i++) {
+ if (i < pathUnderSG.getNodeLength()) {
+ targetNodes[i] = pathUnderSG.getNodes()[i];
+ } else {
+ targetNodes[i] = queryPath.getNodes()[i];
+ }
+ }
+ targetPath = new PartialPath(targetNodes);
+ }
if (partitionGroup.contains(metaGroupMember.getThisNode())) {
// this node is a member of the group, perform a local query after synchronizing with the
// leader
metaGroupMember
.getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId())
.syncLeaderWithConsistencyCheck(false);
- int localResult = getLocalDeviceCount(pathUnderSG);
+ int localResult = getLocalDeviceCount(targetPath);
logger.debug(
"{}: get device count of {} locally, result {}",
metaGroupMember.getName(),
@@ -174,7 +193,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
// batch the queries of the same group to reduce communication
groupPathMap
.computeIfAbsent(partitionGroup, p -> new ArrayList<>())
- .add(pathUnderSG.getFullPath());
+ .add(targetPath.getFullPath());
}
}
if (groupPathMap.isEmpty()) {
@@ -284,14 +303,55 @@ public class ClusterPlanExecutor extends PlanExecutor {
throw new MetadataException(e);
}
- Map<String, String> sgPathMap = IoTDB.metaManager.groupPathByStorageGroup(path);
+ // Here we append a ** to the path to query the storage groups which have the prefix as 'path',
+ // if path doesn't end with **.
+ // e.g. we have SG root.sg.a and root.sg.b, the query path is root.sg, we should return the map
+ // with key root.sg.a and root.sg.b instead of an empty one.
+ PartialPath wildcardPath = path.concatNode(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD);
+ Map<String, String> sgPathMap = IoTDB.metaManager.groupPathByStorageGroup(wildcardPath);
if (sgPathMap.isEmpty()) {
throw new PathNotExistException(path.getFullPath());
}
logger.debug("The storage groups of path {} are {}", path, sgPathMap.keySet());
- int ret;
+ int ret = 0;
try {
- ret = getPathCount(sgPathMap, level);
+ // level >= 0 is the COUNT NODE query
+ if (level >= 0) {
+ int prefixPartIdx = 0;
+ for (; prefixPartIdx < path.getNodeLength(); prefixPartIdx++) {
+ String currentPart = path.getNodes()[prefixPartIdx];
+ if (currentPart.equals(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD)) {
+ break;
+ } else if (currentPart.equals(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)) {
+ // The query only makes sense when level equals the (0-based) index of the first *,
+ // e.g. for root.sg.d1.* only level = 3 can match; any other level yields 0.
+ if (level != prefixPartIdx) {
+ return 0;
+ }
+ break;
+ }
+ }
+ // if level is less than the query path level, there's no suitable node
+ if (level < prefixPartIdx - 1) {
+ return 0;
+ }
+ Set<String> deletedSg = new HashSet<>();
+ Set<PartialPath> matchedPath = new HashSet<>(0);
+ for (String sg : sgPathMap.keySet()) {
+ PartialPath p = new PartialPath(sg);
+ // if the storage group path level is greater than or equal to the query level, then its
+ // prefix up to that level is a suitable node and there's no need to query children nodes later
+ if (p.getNodeLength() - 1 >= level) {
+ deletedSg.add(sg);
+ matchedPath.add(new PartialPath(Arrays.copyOfRange(p.getNodes(), 0, level + 1)));
+ }
+ }
+ for (String sg : deletedSg) {
+ sgPathMap.remove(sg);
+ }
+ ret += matchedPath.size();
+ }
+ ret += getPathCount(sgPathMap, level);
} catch (CheckConsistencyException e) {
throw new MetadataException(e);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java
index d6424d9..da395be 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java
@@ -24,7 +24,10 @@ import org.apache.iotdb.jdbc.Config;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
-import org.junit.*;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -549,8 +552,16 @@ public class IoTDBMetadataFetchIT {
DriverManager.getConnection(
Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
- String[] sqls = new String[] {"COUNT NODES root.** level=1"};
- String[] standards = new String[] {"3,\n"};
+ String[] sqls =
+ new String[] {
+ "COUNT NODES root.** level=1",
+ "COUNT NODES root.ln level=1",
+ "COUNT NODES root.ln.wf01.** level=1",
+ "COUNT NODES root.ln.wf01.* level=2",
+ "COUNT NODES root.ln.wf01.* level=3",
+ "COUNT NODES root.ln.wf01.* level=4"
+ };
+ String[] standards = new String[] {"3,\n", "1,\n", "0,\n", "0,\n", "1,\n", "0,\n"};
for (int n = 0; n < sqls.length; n++) {
String sql = sqls[n];
String standard = standards[n];