You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ne...@apache.org on 2022/08/10 10:07:10 UTC
[iotdb] branch master updated: [IOTDB-4045] Optimize DataNode query relevant interfaces (#6944)
This is an automated email from the ASF dual-hosted git repository.
neuyilan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f12e15eb03 [IOTDB-4045] Optimize DataNode query relevant interfaces (#6944)
f12e15eb03 is described below
commit f12e15eb03023e9c86d5c2e2be497fb7edc0f614
Author: 23931017wu <71...@users.noreply.github.com>
AuthorDate: Wed Aug 10 18:07:06 2022 +0800
[IOTDB-4045] Optimize DataNode query relevant interfaces (#6944)
---
.../confignode/manager/ClusterSchemaManager.java | 4 +--
.../iotdb/confignode/manager/ConfigManager.java | 2 +-
.../iotdb/confignode/manager/NodeManager.java | 38 ++++++++++++++++------
.../confignode/manager/PermissionManager.java | 2 +-
.../iotdb/confignode/manager/UDFManager.java | 4 +--
.../iotdb/confignode/manager/load/LoadManager.java | 16 ++++-----
.../manager/load/balancer/RegionBalancer.java | 2 +-
.../manager/load/balancer/RouteBalancer.java | 2 +-
.../iotdb/confignode/persistence/NodeInfo.java | 16 ++-------
.../procedure/env/ConfigNodeProcedureEnv.java | 4 +--
.../procedure/env/DataNodeRemoveHandler.java | 6 ++--
.../iotdb/confignode/persistence/NodeInfoTest.java | 4 +--
12 files changed, 54 insertions(+), 46 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index 638f6b5332..d203e18f5c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -407,7 +407,7 @@ public class ClusterSchemaManager {
// sync template set info to all dataNodes
TSStatus status;
List<TDataNodeConfiguration> allDataNodes =
- configManager.getNodeManager().getRegisteredDataNodes(-1);
+ configManager.getNodeManager().getRegisteredDataNodes();
for (TDataNodeConfiguration dataNodeInfo : allDataNodes) {
status =
SyncDataNodeClientPool.getInstance()
@@ -447,7 +447,7 @@ public class ClusterSchemaManager {
// get all dataNodes
List<TDataNodeConfiguration> allDataNodes =
- configManager.getNodeManager().getRegisteredDataNodes(-1);
+ configManager.getNodeManager().getRegisteredDataNodes();
// send rollbackReq
TSStatus status;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 3d9a04a89d..b8176c23b4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -240,7 +240,7 @@ public class ConfigManager implements IManager {
List<TConfigNodeLocation> configNodeLocations = getNodeManager().getRegisteredConfigNodes();
configNodeLocations.sort(Comparator.comparingInt(TConfigNodeLocation::getConfigNodeId));
List<TDataNodeLocation> dataNodeInfoLocations =
- getNodeManager().getRegisteredDataNodes(-1).stream()
+ getNodeManager().getRegisteredDataNodes().stream()
.map(TDataNodeConfiguration::getLocation)
.sorted(Comparator.comparingInt(TDataNodeLocation::getDataNodeId))
.collect(Collectors.toList());
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index cce00c155a..fd92da65c8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -60,6 +60,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
/** NodeManager manages cluster node addition and removal requests */
public class NodeManager {
@@ -205,17 +206,16 @@ public class NodeManager {
* Only leader use this interface
*
* @param dataNodeId Specific DataNodeId
- * @return All registered DataNodes if dataNodeId equals -1. And return the specific DataNode
- * otherwise.
+ * @return All registered DataNodes
*/
- public List<TDataNodeConfiguration> getRegisteredDataNodes(int dataNodeId) {
- return nodeInfo.getRegisteredDataNodes(dataNodeId);
+ public List<TDataNodeConfiguration> getRegisteredDataNodes() {
+ return nodeInfo.getRegisteredDataNodes();
}
- public Map<Integer, TDataNodeLocation> getRegisteredDataNodeLocations(int dataNodeId) {
+ public Map<Integer, TDataNodeLocation> getRegisteredDataNodeLocations() {
Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
nodeInfo
- .getRegisteredDataNodes(dataNodeId)
+ .getRegisteredDataNodes()
.forEach(
dataNodeConfiguration ->
dataNodeLocations.put(
@@ -235,7 +235,7 @@ public class NodeManager {
public List<TDataNodeInfo> getRegisteredDataNodeInfoList() {
List<TDataNodeInfo> dataNodeInfoList = new ArrayList<>();
- List<TDataNodeConfiguration> registeredDataNodes = this.getRegisteredDataNodes(-1);
+ List<TDataNodeConfiguration> registeredDataNodes = this.getRegisteredDataNodes();
if (registeredDataNodes != null) {
registeredDataNodes.forEach(
(dataNodeInfo) -> {
@@ -373,7 +373,13 @@ public class NodeManager {
public List<TSStatus> merge(TMergeReq req) {
Map<Integer, TDataNodeLocation> dataNodeLocationMap =
- configManager.getNodeManager().getRegisteredDataNodeLocations(req.dataNodeId);
+ configManager.getNodeManager().getRegisteredDataNodeLocations();
+ if (req.dataNodeId != -1) {
+ dataNodeLocationMap =
+ dataNodeLocationMap.entrySet().stream()
+ .filter((e) -> req.dataNodeId == e.getKey())
+ .collect(Collectors.toMap((e) -> e.getKey(), (e) -> e.getValue()));
+ }
List<TSStatus> dataNodeResponseStatus =
Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
AsyncDataNodeClientPool.getInstance()
@@ -384,7 +390,13 @@ public class NodeManager {
public List<TSStatus> flush(TFlushReq req) {
Map<Integer, TDataNodeLocation> dataNodeLocationMap =
- configManager.getNodeManager().getRegisteredDataNodeLocations(req.dataNodeId);
+ configManager.getNodeManager().getRegisteredDataNodeLocations();
+ if (req.dataNodeId != -1) {
+ dataNodeLocationMap =
+ dataNodeLocationMap.entrySet().stream()
+ .filter((e) -> req.dataNodeId == e.getKey())
+ .collect(Collectors.toMap((e) -> e.getKey(), (e) -> e.getValue()));
+ }
List<TSStatus> dataNodeResponseStatus =
Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
AsyncDataNodeClientPool.getInstance()
@@ -395,7 +407,13 @@ public class NodeManager {
public List<TSStatus> clearCache(TClearCacheReq req) {
Map<Integer, TDataNodeLocation> dataNodeLocationMap =
- configManager.getNodeManager().getRegisteredDataNodeLocations(req.dataNodeId);
+ configManager.getNodeManager().getRegisteredDataNodeLocations();
+ if (req.dataNodeId != -1) {
+ dataNodeLocationMap =
+ dataNodeLocationMap.entrySet().stream()
+ .filter((e) -> req.dataNodeId == e.getKey())
+ .collect(Collectors.toMap((e) -> e.getKey(), (e) -> e.getValue()));
+ }
List<TSStatus> dataNodeResponseStatus =
Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
AsyncDataNodeClientPool.getInstance()
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
index 87828532c2..a1e2ca29ef 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
@@ -100,7 +100,7 @@ public class PermissionManager {
*/
public TSStatus invalidateCache(String username, String roleName) {
List<TDataNodeConfiguration> allDataNodes =
- configManager.getNodeManager().getRegisteredDataNodes(-1);
+ configManager.getNodeManager().getRegisteredDataNodes();
TInvalidatePermissionCacheReq req = new TInvalidatePermissionCacheReq();
TSStatus status;
req.setUsername(username);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
index 32bf01f437..0309ace63a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
@@ -80,7 +80,7 @@ public class UDFManager {
private List<TSStatus> createFunctionOnDataNodes(
String functionName, String className, List<String> uris) {
final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
- configManager.getNodeManager().getRegisteredDataNodeLocations(-1);
+ configManager.getNodeManager().getRegisteredDataNodeLocations();
final List<TSStatus> dataNodeResponseStatus =
Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
final TCreateFunctionRequest request =
@@ -112,7 +112,7 @@ public class UDFManager {
private List<TSStatus> dropFunctionOnDataNodes(String functionName) {
final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
- configManager.getNodeManager().getRegisteredDataNodeLocations(-1);
+ configManager.getNodeManager().getRegisteredDataNodeLocations();
final List<TSStatus> dataNodeResponseStatus =
Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
final TDropFunctionRequest request = new TDropFunctionRequest(functionName);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index ff9835cebf..fca9a8267b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -334,7 +334,7 @@ public class LoadManager {
public void broadcastLatestRegionRouteMap() {
Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap = genLatestRegionRouteMap();
Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
- getOnlineDataNodes(-1)
+ getOnlineDataNodes()
.forEach(
onlineDataNode ->
dataNodeLocationMap.put(
@@ -358,7 +358,7 @@ public class LoadManager {
// Generate HeartbeatReq
THeartbeatReq heartbeatReq = genHeartbeatReq();
// Send heartbeat requests to all the registered DataNodes
- pingRegisteredDataNodes(heartbeatReq, getNodeManager().getRegisteredDataNodes(-1));
+ pingRegisteredDataNodes(heartbeatReq, getNodeManager().getRegisteredDataNodes());
// Send heartbeat requests to all the registered ConfigNodes
pingRegisteredConfigNodes(heartbeatReq, getNodeManager().getRegisteredConfigNodes());
}
@@ -451,8 +451,8 @@ public class LoadManager {
.collect(Collectors.toList());
}
- public List<TDataNodeConfiguration> getOnlineDataNodes(int dataNodeId) {
- return getNodeManager().getRegisteredDataNodes(dataNodeId).stream()
+ public List<TDataNodeConfiguration> getOnlineDataNodes() {
+ return getNodeManager().getRegisteredDataNodes().stream()
.filter(
registeredDataNode -> {
int id = registeredDataNode.getLocation().getDataNodeId();
@@ -473,8 +473,8 @@ public class LoadManager {
.collect(Collectors.toList());
}
- public List<TDataNodeConfiguration> getUnknownDataNodes(int dataNodeId) {
- return getNodeManager().getRegisteredDataNodes(dataNodeId).stream()
+ public List<TDataNodeConfiguration> getUnknownDataNodes() {
+ return getNodeManager().getRegisteredDataNodes().stream()
.filter(
registeredDataNode -> {
int id = registeredDataNode.getLocation().getDataNodeId();
@@ -511,7 +511,7 @@ public class LoadManager {
}
public int getRunningDataNodesNum() {
- List<TDataNodeConfiguration> allDataNodes = getOnlineDataNodes(-1);
+ List<TDataNodeConfiguration> allDataNodes = getOnlineDataNodes();
if (allDataNodes == null) {
return 0;
}
@@ -564,7 +564,7 @@ public class LoadManager {
}
public int getUnknownDataNodesNum() {
- List<TDataNodeConfiguration> allDataNodes = getUnknownDataNodes(-1);
+ List<TDataNodeConfiguration> allDataNodes = getUnknownDataNodes();
if (allDataNodes == null) {
return 0;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index d0c51cef53..155dbd4dda 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -70,7 +70,7 @@ public class RegionBalancer {
// TODO: After waiting for the IT framework to complete, change the following code to:
// List<TDataNodeInfo> onlineDataNodes = getLoadManager().getOnlineDataNodes(-1);
- List<TDataNodeConfiguration> registeredDataNodes = getNodeManager().getRegisteredDataNodes(-1);
+ List<TDataNodeConfiguration> registeredDataNodes = getNodeManager().getRegisteredDataNodes();
List<TRegionReplicaSet> allocatedRegions = getPartitionManager().getAllReplicaSets();
for (Map.Entry<String, Integer> entry : allotmentMap.entrySet()) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index a77fa10be2..639170945e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -95,7 +95,7 @@ public class RouteBalancer {
.getDataRegionConsensusProtocolClass()
.equals(ConsensusFactory.MultiLeaderConsensus)) {
// Latent router for MultiLeader consensus protocol
- lazyGreedyRouter.updateUnknownDataNodes(getLoadManager().getUnknownDataNodes(-1));
+ lazyGreedyRouter.updateUnknownDataNodes(getLoadManager().getUnknownDataNodes());
return lazyGreedyRouter;
} else if (policy.equals(leaderPolicy)) {
return new LeaderRouter(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index 1efbc648fb..73016a12f6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -297,22 +297,12 @@ public class NodeInfo implements SnapshotProcessor {
return result;
}
- /**
- * Return the specific registered DataNode
- *
- * @param dataNodeId Specific DataNodeId
- * @return All registered DataNodes if dataNodeId equals -1. And return the specific DataNode
- * otherwise.
- */
- public List<TDataNodeConfiguration> getRegisteredDataNodes(int dataNodeId) {
+ /** Return All registered DataNodes */
+ public List<TDataNodeConfiguration> getRegisteredDataNodes() {
List<TDataNodeConfiguration> result;
dataNodeInfoReadWriteLock.readLock().lock();
try {
- if (dataNodeId == -1) {
- result = new ArrayList<>(registeredDataNodes.values());
- } else {
- result = Collections.singletonList(registeredDataNodes.get(dataNodeId));
- }
+ result = new ArrayList<>(registeredDataNodes.values());
} finally {
dataNodeInfoReadWriteLock.readLock().unlock();
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index d1151f8b8e..0408fe45ea 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -120,7 +120,7 @@ public class ConfigNodeProcedureEnv {
return invalidCacheResult;
}
List<TDataNodeConfiguration> allDataNodes =
- configManager.getNodeManager().getRegisteredDataNodes(-1);
+ configManager.getNodeManager().getRegisteredDataNodes();
TInvalidateCacheReq invalidateCacheReq = new TInvalidateCacheReq();
invalidateCacheReq.setStorageGroup(true);
invalidateCacheReq.setFullPath(storageGroupName);
@@ -264,7 +264,7 @@ public class ConfigNodeProcedureEnv {
public void broadCastTheLatestConfigNodeGroup() {
AsyncDataNodeClientPool.getInstance()
.broadCastTheLatestConfigNodeGroup(
- configManager.getNodeManager().getRegisteredDataNodeLocations(-1),
+ configManager.getNodeManager().getRegisteredDataNodeLocations(),
configManager.getNodeManager().getRegisteredConfigNodes());
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index ced771f189..580d1f7519 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -87,7 +87,7 @@ public class DataNodeRemoveHandler {
"DataNodeRemoveService start send disable the Data Node to cluster, {}", disabledDataNode);
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
List<TEndPoint> otherOnlineDataNodes =
- configManager.getLoadManager().getOnlineDataNodes(-1).stream()
+ configManager.getLoadManager().getOnlineDataNodes().stream()
.map(TDataNodeConfiguration::getLocation)
.filter(loc -> !loc.equals(disabledDataNode))
.map(TDataNodeLocation::getInternalEndPoint)
@@ -232,7 +232,7 @@ public class DataNodeRemoveHandler {
private Optional<TDataNodeLocation> pickNewReplicaNodeForRegion(
List<TDataNodeLocation> regionReplicaNodes) {
- return configManager.getLoadManager().getOnlineDataNodes(-1).stream()
+ return configManager.getLoadManager().getOnlineDataNodes().stream()
.map(TDataNodeConfiguration::getLocation)
.filter(e -> !regionReplicaNodes.contains(e))
.findAny();
@@ -337,7 +337,7 @@ public class DataNodeRemoveHandler {
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
List<TDataNodeLocation> allDataNodes =
- configManager.getNodeManager().getRegisteredDataNodes(-1).stream()
+ configManager.getNodeManager().getRegisteredDataNodes().stream()
.map(TDataNodeConfiguration::getLocation)
.collect(Collectors.toList());
boolean hasNotExistNode =
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
index 65c2a41883..b2df82c2bf 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
@@ -78,7 +78,7 @@ public class NodeInfoTest {
nodeInfo.setDrainingDataNodes(drainingDataNodes_before);
int nextId = nodeInfo.getNextNodeId();
- List<TDataNodeConfiguration> onlineDataNodes_before = nodeInfo.getRegisteredDataNodes(-1);
+ List<TDataNodeConfiguration> onlineDataNodes_before = nodeInfo.getRegisteredDataNodes();
nodeInfo.processTakeSnapshot(snapshotDir);
nodeInfo.clear();
@@ -89,7 +89,7 @@ public class NodeInfoTest {
Set<TDataNodeLocation> drainingDataNodes_after = nodeInfo.getDrainingDataNodes();
Assert.assertEquals(drainingDataNodes_before, drainingDataNodes_after);
- List<TDataNodeConfiguration> onlineDataNodes_after = nodeInfo.getRegisteredDataNodes(-1);
+ List<TDataNodeConfiguration> onlineDataNodes_after = nodeInfo.getRegisteredDataNodes();
Assert.assertEquals(onlineDataNodes_before, onlineDataNodes_after);
}