You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/01/21 11:43:10 UTC
[iotdb] branch master updated: [IOTDB-2462] Fix ALIGN BY DEVICE query in cluster mode (#4947)
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ae8e9da [IOTDB-2462] Fix ALIGN BY DEVICE query in cluster mode (#4947)
ae8e9da is described below
commit ae8e9dad8173df7dc7b94aba67fa1aba46b31dc1
Author: BaiJian <er...@hotmail.com>
AuthorDate: Fri Jan 21 19:42:36 2022 +0800
[IOTDB-2462] Fix ALIGN BY DEVICE query in cluster mode (#4947)
---
.../cluster/client/sync/SyncClientAdaptor.java | 4 +--
.../apache/iotdb/cluster/metadata/CMManager.java | 42 +++++++++++++---------
.../cluster/server/service/DataAsyncService.java | 7 ++--
.../server/service/DataGroupServiceImpls.java | 14 +++++---
.../cluster/server/service/DataSyncService.java | 5 +--
.../cluster/client/sync/SyncClientAdaptorTest.java | 7 ++--
.../org/apache/iotdb/db/metadata/MManager.java | 12 +------
.../db/tools/virtualsg/DeviceMappingViewer.java | 2 +-
.../iotdb/db/metadata/MManagerBasicTest.java | 10 +++---
thrift-cluster/src/main/thrift/cluster.thrift | 2 +-
10 files changed, 59 insertions(+), 46 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index 36ca50e..7f16551 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -381,12 +381,12 @@ public class SyncClientAdaptor {
}
public static Set<String> getAllDevices(
- AsyncDataClient client, RaftNode header, List<String> pathsToQuery)
+ AsyncDataClient client, RaftNode header, List<String> pathsToQuery, boolean isPrefixMatch)
throws InterruptedException, TException {
AtomicReference<Set<String>> remoteResult = new AtomicReference<>();
GenericHandler<Set<String>> handler = new GenericHandler<>(client.getNode(), remoteResult);
- client.getAllDevices(header, pathsToQuery, handler);
+ client.getAllDevices(header, pathsToQuery, isPrefixMatch, handler);
return handler.getResult(ClusterConstant.getReadOperationTimeoutMS());
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index dcdfcf1..6b2e15d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -923,12 +923,15 @@ public class CMManager extends MManager {
/**
* Get all devices after removing wildcards in the path
*
- * @param originPath a path potentially with wildcard
- * @return all paths after removing wildcards in the path
+ * @param originPath a path potentially with wildcard.
+ * @param isPrefixMatch if true, the path pattern is used to match prefix path
+ * @return all paths after removing wildcards in the path.
*/
- public Set<PartialPath> getMatchedDevices(PartialPath originPath) throws MetadataException {
+ @Override
+ public Set<PartialPath> getMatchedDevices(PartialPath originPath, boolean isPrefixMatch)
+ throws MetadataException {
Map<String, List<PartialPath>> sgPathMap = groupPathByStorageGroup(originPath);
- Set<PartialPath> ret = getMatchedDevices(sgPathMap);
+ Set<PartialPath> ret = getMatchedDevices(sgPathMap, isPrefixMatch);
logger.debug("The devices of path {} are {}", originPath, ret);
return ret;
}
@@ -1088,10 +1091,11 @@ public class CMManager extends MManager {
*
* @param sgPathMap the key is the storage group name and the value is the path pattern to be
* queried with storage group added
+ * @param isPrefixMatch if true, the path pattern is used to match prefix path
* @return a collection of all queried devices
*/
- private Set<PartialPath> getMatchedDevices(Map<String, List<PartialPath>> sgPathMap)
- throws MetadataException {
+ private Set<PartialPath> getMatchedDevices(
+ Map<String, List<PartialPath>> sgPathMap, boolean isPrefixMatch) throws MetadataException {
Set<PartialPath> result = new HashSet<>();
// split the paths by the data group they belong to
Map<PartitionGroup, List<String>> groupPathMap = new HashMap<>();
@@ -1113,7 +1117,7 @@ public class CMManager extends MManager {
}
Set<PartialPath> allDevices = new HashSet<>();
for (PartialPath path : paths) {
- allDevices.addAll(super.getMatchedDevices(path));
+ allDevices.addAll(super.getMatchedDevices(path, isPrefixMatch));
}
logger.debug(
"{}: get matched paths of {} locally, result {}",
@@ -1136,19 +1140,21 @@ public class CMManager extends MManager {
PartitionGroup partitionGroup = partitionGroupPathEntry.getKey();
List<String> pathsToQuery = partitionGroupPathEntry.getValue();
- result.addAll(getMatchedDevices(partitionGroup, pathsToQuery));
+ result.addAll(getMatchedDevices(partitionGroup, pathsToQuery, isPrefixMatch));
}
return result;
}
private Set<PartialPath> getMatchedDevices(
- PartitionGroup partitionGroup, List<String> pathsToQuery) throws MetadataException {
+ PartitionGroup partitionGroup, List<String> pathsToQuery, boolean isPrefixMatch)
+ throws MetadataException {
// choose the node with lowest latency or highest throughput
List<Node> coordinatedNodes = QueryCoordinator.getINSTANCE().reorderNodes(partitionGroup);
for (Node node : coordinatedNodes) {
try {
- Set<String> paths = getMatchedDevices(node, partitionGroup.getHeader(), pathsToQuery);
+ Set<String> paths =
+ getMatchedDevices(node, partitionGroup.getHeader(), pathsToQuery, isPrefixMatch);
logger.debug(
"{}: get matched paths of {} from {}, result {} for {}",
metaGroupMember.getName(),
@@ -1175,14 +1181,15 @@ public class CMManager extends MManager {
return Collections.emptySet();
}
- private Set<String> getMatchedDevices(Node node, RaftNode header, List<String> pathsToQuery)
+ private Set<String> getMatchedDevices(
+ Node node, RaftNode header, List<String> pathsToQuery, boolean isPrefixMatch)
throws IOException, TException, InterruptedException {
Set<String> paths;
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client =
ClusterIoTDB.getInstance()
.getAsyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
- paths = SyncClientAdaptor.getAllDevices(client, header, pathsToQuery);
+ paths = SyncClientAdaptor.getAllDevices(client, header, pathsToQuery, isPrefixMatch);
} else {
SyncDataClient syncDataClient = null;
try {
@@ -1190,7 +1197,7 @@ public class CMManager extends MManager {
ClusterIoTDB.getInstance()
.getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
try {
- paths = syncDataClient.getAllDevices(header, pathsToQuery);
+ paths = syncDataClient.getAllDevices(header, pathsToQuery, isPrefixMatch);
} catch (TException e) {
// the connection may be broken, close it to avoid it being reused
syncDataClient.close();
@@ -1321,12 +1328,15 @@ public class CMManager extends MManager {
/**
* Get the local devices that match any path in "paths". The result is deduplicated.
*
- * @param paths paths potentially contain wildcards
+ * @param paths paths potentially contain wildcards.
+ * @param isPrefixMatch if true, the path pattern is used to match prefix path.
+ * @return A HashSet instance which stores devices paths matching the given path pattern.
*/
- public Set<String> getAllDevices(List<String> paths) throws MetadataException {
+ public Set<String> getAllDevices(List<String> paths, boolean isPrefixMatch)
+ throws MetadataException {
Set<String> results = new HashSet<>();
for (String path : paths) {
- this.getMatchedDevices(new PartialPath(path)).stream()
+ this.getMatchedDevices(new PartialPath(path), isPrefixMatch).stream()
.map(PartialPath::getFullPath)
.forEach(results::add);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
index 2a218b8..eaa3ca8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
@@ -304,10 +304,13 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
@Override
public void getAllDevices(
- RaftNode header, List<String> path, AsyncMethodCallback<Set<String>> resultHandler) {
+ RaftNode header,
+ List<String> path,
+ boolean isPrefixMatch,
+ AsyncMethodCallback<Set<String>> resultHandler) {
try {
dataGroupMember.syncLeaderWithConsistencyCheck(false);
- resultHandler.onComplete(((CMManager) IoTDB.metaManager).getAllDevices(path));
+ resultHandler.onComplete(((CMManager) IoTDB.metaManager).getAllDevices(path, isPrefixMatch));
} catch (MetadataException | CheckConsistencyException e) {
resultHandler.onError(e);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
index 2fec073..9b15dc5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
@@ -282,11 +282,14 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void getAllDevices(
- RaftNode header, List<String> paths, AsyncMethodCallback<Set<String>> resultHandler) {
+ RaftNode header,
+ List<String> paths,
+ boolean isPrefixMatch,
+ AsyncMethodCallback<Set<String>> resultHandler) {
DataAsyncService service =
DataGroupEngine.getInstance().getDataAsyncService(header, resultHandler, "Get all devices");
if (service != null) {
- service.getAllDevices(header, paths, resultHandler);
+ service.getAllDevices(header, paths, isPrefixMatch, resultHandler);
}
}
@@ -519,8 +522,11 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
}
@Override
- public Set<String> getAllDevices(RaftNode header, List<String> path) throws TException {
- return DataGroupEngine.getInstance().getDataSyncService(header).getAllDevices(header, path);
+ public Set<String> getAllDevices(RaftNode header, List<String> path, boolean isPrefixMatch)
+ throws TException {
+ return DataGroupEngine.getInstance()
+ .getDataSyncService(header)
+ .getAllDevices(header, path, isPrefixMatch);
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index d023b79..6d15e48 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -293,10 +293,11 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
}
@Override
- public Set<String> getAllDevices(RaftNode header, List<String> path) throws TException {
+ public Set<String> getAllDevices(RaftNode header, List<String> path, boolean isPrefixMatch)
+ throws TException {
try {
dataGroupMember.syncLeaderWithConsistencyCheck(false);
- return ((CMManager) IoTDB.metaManager).getAllDevices(path);
+ return ((CMManager) IoTDB.metaManager).getAllDevices(path, isPrefixMatch);
} catch (MetadataException | CheckConsistencyException e) {
throw new TException(e);
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
index 1d14e9c..9618947 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
@@ -263,7 +263,10 @@ public class SyncClientAdaptorTest {
@Override
public void getAllDevices(
- RaftNode header, List<String> path, AsyncMethodCallback<Set<String>> resultHandler) {
+ RaftNode header,
+ List<String> path,
+ boolean isPrefixMatch,
+ AsyncMethodCallback<Set<String>> resultHandler) {
resultHandler.onComplete(new HashSet<>(path));
}
@@ -405,7 +408,7 @@ public class SyncClientAdaptorTest {
(int) SyncClientAdaptor.getPathCount(dataClient, TestUtils.getRaftNode(0, 0), paths, 0));
assertEquals(
new HashSet<>(paths),
- SyncClientAdaptor.getAllDevices(dataClient, TestUtils.getRaftNode(0, 0), paths));
+ SyncClientAdaptor.getAllDevices(dataClient, TestUtils.getRaftNode(0, 0), paths, false));
assertEquals(1L, (long) SyncClientAdaptor.getGroupByExecutor(dataClient, new GroupByRequest()));
assertEquals(fillResult, SyncClientAdaptor.previousFill(dataClient, new PreviousFillRequest()));
assertEquals(readFileResult, SyncClientAdaptor.readFile(dataClient, "a file", 0, 1000));
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 0c5f234..d1aaf1c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -1223,7 +1223,7 @@ public class MManager {
* to match prefix path. All timeseries start with the matched prefix path will be collected.
*
* @param pathPattern the pattern of the target devices.
- * @param isPrefixMatch if true, the path pattern is used to match prefix path
+ * @param isPrefixMatch if true, the path pattern is used to match prefix path.
* @return A HashSet instance which stores devices paths matching the given path pattern.
*/
public Set<PartialPath> getMatchedDevices(PartialPath pathPattern, boolean isPrefixMatch)
@@ -1232,16 +1232,6 @@ public class MManager {
}
/**
- * Get all device paths matching the path pattern.
- *
- * @param pathPattern the pattern of the target devices.
- * @return A HashSet instance which stores devices paths matching the given path pattern.
- */
- public Set<PartialPath> getMatchedDevices(PartialPath pathPattern) throws MetadataException {
- return getMatchedDevices(pathPattern, false);
- }
-
- /**
* Get all device paths and according storage group paths as ShowDevicesResult.
*
* @param plan ShowDevicesPlan which contains the path pattern and restriction params.
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/virtualsg/DeviceMappingViewer.java b/server/src/main/java/org/apache/iotdb/db/tools/virtualsg/DeviceMappingViewer.java
index cbfd948..33eb93d 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/virtualsg/DeviceMappingViewer.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/virtualsg/DeviceMappingViewer.java
@@ -43,7 +43,7 @@ public class DeviceMappingViewer {
MManager mManager = MManager.getInstance();
mManager.init();
- Set<PartialPath> partialPathSet = mManager.getMatchedDevices(new PartialPath("root.**"));
+ Set<PartialPath> partialPathSet = mManager.getMatchedDevices(new PartialPath("root.**"), false);
if (partialPathSet.isEmpty() && args.length == 1) {
System.out.println("no mlog in given system schema dir: " + args[0] + " please have a check");
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
index bdf8542..71c6233 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
@@ -479,7 +479,7 @@ public class MManagerBasicTest {
// prefix with *
assertEquals(
devices,
- manager.getMatchedDevices(new PartialPath("root.**")).stream()
+ manager.getMatchedDevices(new PartialPath("root.**"), false).stream()
.map(PartialPath::getFullPath)
.collect(Collectors.toSet()));
@@ -492,7 +492,7 @@ public class MManagerBasicTest {
// prefix with *
assertEquals(
devices,
- manager.getMatchedDevices(new PartialPath("root.**")).stream()
+ manager.getMatchedDevices(new PartialPath("root.**"), false).stream()
.map(PartialPath::getFullPath)
.collect(Collectors.toSet()));
@@ -506,7 +506,7 @@ public class MManagerBasicTest {
// prefix with *
assertEquals(
devices,
- recoverManager.getMatchedDevices(new PartialPath("root.**")).stream()
+ recoverManager.getMatchedDevices(new PartialPath("root.**"), false).stream()
.map(PartialPath::getFullPath)
.collect(Collectors.toSet()));
@@ -745,7 +745,7 @@ public class MManagerBasicTest {
// usual condition
assertEquals(
devices,
- manager.getMatchedDevices(new PartialPath("root.laptop.**")).stream()
+ manager.getMatchedDevices(new PartialPath("root.laptop.**"), false).stream()
.map(PartialPath::getFullPath)
.collect(Collectors.toSet()));
manager.setStorageGroup(new PartialPath("root.vehicle"));
@@ -759,7 +759,7 @@ public class MManagerBasicTest {
// prefix with *
assertEquals(
devices,
- manager.getMatchedDevices(new PartialPath("root.**")).stream()
+ manager.getMatchedDevices(new PartialPath("root.**"), false).stream()
.map(PartialPath::getFullPath)
.collect(Collectors.toSet()));
} catch (MetadataException e) {
diff --git a/thrift-cluster/src/main/thrift/cluster.thrift b/thrift-cluster/src/main/thrift/cluster.thrift
index 1bbd9e3..b444dc8 100644
--- a/thrift-cluster/src/main/thrift/cluster.thrift
+++ b/thrift-cluster/src/main/thrift/cluster.thrift
@@ -402,7 +402,7 @@ service TSDataService extends RaftService {
/**
* Given path patterns (paths with wildcard), return all devices they match.
**/
- set<string> getAllDevices(1:RaftNode header, 2:list<string> path)
+ set<string> getAllDevices(1:RaftNode header, 2:list<string> path, 3: bool isPrefixMatch)
/**
* Get the devices from the header according to the showDevicesPlan