You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ne...@apache.org on 2022/08/10 10:07:10 UTC

[iotdb] branch master updated: [IOTDB-4045] Optimize DataNode query relevant interfaces (#6944)

This is an automated email from the ASF dual-hosted git repository.

neuyilan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f12e15eb03 [IOTDB-4045] Optimize DataNode query relevant interfaces (#6944)
f12e15eb03 is described below

commit f12e15eb03023e9c86d5c2e2be497fb7edc0f614
Author: 23931017wu <71...@users.noreply.github.com>
AuthorDate: Wed Aug 10 18:07:06 2022 +0800

    [IOTDB-4045] Optimize DataNode query relevant interfaces (#6944)
---
 .../confignode/manager/ClusterSchemaManager.java   |  4 +--
 .../iotdb/confignode/manager/ConfigManager.java    |  2 +-
 .../iotdb/confignode/manager/NodeManager.java      | 38 ++++++++++++++++------
 .../confignode/manager/PermissionManager.java      |  2 +-
 .../iotdb/confignode/manager/UDFManager.java       |  4 +--
 .../iotdb/confignode/manager/load/LoadManager.java | 16 ++++-----
 .../manager/load/balancer/RegionBalancer.java      |  2 +-
 .../manager/load/balancer/RouteBalancer.java       |  2 +-
 .../iotdb/confignode/persistence/NodeInfo.java     | 16 ++-------
 .../procedure/env/ConfigNodeProcedureEnv.java      |  4 +--
 .../procedure/env/DataNodeRemoveHandler.java       |  6 ++--
 .../iotdb/confignode/persistence/NodeInfoTest.java |  4 +--
 12 files changed, 54 insertions(+), 46 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index 638f6b5332..d203e18f5c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -407,7 +407,7 @@ public class ClusterSchemaManager {
     // sync template set info to all dataNodes
     TSStatus status;
     List<TDataNodeConfiguration> allDataNodes =
-        configManager.getNodeManager().getRegisteredDataNodes(-1);
+        configManager.getNodeManager().getRegisteredDataNodes();
     for (TDataNodeConfiguration dataNodeInfo : allDataNodes) {
       status =
           SyncDataNodeClientPool.getInstance()
@@ -447,7 +447,7 @@ public class ClusterSchemaManager {
 
     // get all dataNodes
     List<TDataNodeConfiguration> allDataNodes =
-        configManager.getNodeManager().getRegisteredDataNodes(-1);
+        configManager.getNodeManager().getRegisteredDataNodes();
 
     // send rollbackReq
     TSStatus status;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 3d9a04a89d..b8176c23b4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -240,7 +240,7 @@ public class ConfigManager implements IManager {
       List<TConfigNodeLocation> configNodeLocations = getNodeManager().getRegisteredConfigNodes();
       configNodeLocations.sort(Comparator.comparingInt(TConfigNodeLocation::getConfigNodeId));
       List<TDataNodeLocation> dataNodeInfoLocations =
-          getNodeManager().getRegisteredDataNodes(-1).stream()
+          getNodeManager().getRegisteredDataNodes().stream()
               .map(TDataNodeConfiguration::getLocation)
               .sorted(Comparator.comparingInt(TDataNodeLocation::getDataNodeId))
               .collect(Collectors.toList());
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index cce00c155a..fd92da65c8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -60,6 +60,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 /** NodeManager manages cluster node addition and removal requests */
 public class NodeManager {
@@ -205,17 +206,16 @@ public class NodeManager {
    * Only leader use this interface
    *
    * @param dataNodeId Specific DataNodeId
-   * @return All registered DataNodes if dataNodeId equals -1. And return the specific DataNode
-   *     otherwise.
+   * @return All registered DataNodes
    */
-  public List<TDataNodeConfiguration> getRegisteredDataNodes(int dataNodeId) {
-    return nodeInfo.getRegisteredDataNodes(dataNodeId);
+  public List<TDataNodeConfiguration> getRegisteredDataNodes() {
+    return nodeInfo.getRegisteredDataNodes();
   }
 
-  public Map<Integer, TDataNodeLocation> getRegisteredDataNodeLocations(int dataNodeId) {
+  public Map<Integer, TDataNodeLocation> getRegisteredDataNodeLocations() {
     Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
     nodeInfo
-        .getRegisteredDataNodes(dataNodeId)
+        .getRegisteredDataNodes()
         .forEach(
             dataNodeConfiguration ->
                 dataNodeLocations.put(
@@ -235,7 +235,7 @@ public class NodeManager {
 
   public List<TDataNodeInfo> getRegisteredDataNodeInfoList() {
     List<TDataNodeInfo> dataNodeInfoList = new ArrayList<>();
-    List<TDataNodeConfiguration> registeredDataNodes = this.getRegisteredDataNodes(-1);
+    List<TDataNodeConfiguration> registeredDataNodes = this.getRegisteredDataNodes();
     if (registeredDataNodes != null) {
       registeredDataNodes.forEach(
           (dataNodeInfo) -> {
@@ -373,7 +373,13 @@ public class NodeManager {
 
   public List<TSStatus> merge(TMergeReq req) {
     Map<Integer, TDataNodeLocation> dataNodeLocationMap =
-        configManager.getNodeManager().getRegisteredDataNodeLocations(req.dataNodeId);
+        configManager.getNodeManager().getRegisteredDataNodeLocations();
+    if (req.dataNodeId != -1) {
+      dataNodeLocationMap =
+          dataNodeLocationMap.entrySet().stream()
+              .filter((e) -> req.dataNodeId == e.getKey())
+              .collect(Collectors.toMap((e) -> e.getKey(), (e) -> e.getValue()));
+    }
     List<TSStatus> dataNodeResponseStatus =
         Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
     AsyncDataNodeClientPool.getInstance()
@@ -384,7 +390,13 @@ public class NodeManager {
 
   public List<TSStatus> flush(TFlushReq req) {
     Map<Integer, TDataNodeLocation> dataNodeLocationMap =
-        configManager.getNodeManager().getRegisteredDataNodeLocations(req.dataNodeId);
+        configManager.getNodeManager().getRegisteredDataNodeLocations();
+    if (req.dataNodeId != -1) {
+      dataNodeLocationMap =
+          dataNodeLocationMap.entrySet().stream()
+              .filter((e) -> req.dataNodeId == e.getKey())
+              .collect(Collectors.toMap((e) -> e.getKey(), (e) -> e.getValue()));
+    }
     List<TSStatus> dataNodeResponseStatus =
         Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
     AsyncDataNodeClientPool.getInstance()
@@ -395,7 +407,13 @@ public class NodeManager {
 
   public List<TSStatus> clearCache(TClearCacheReq req) {
     Map<Integer, TDataNodeLocation> dataNodeLocationMap =
-        configManager.getNodeManager().getRegisteredDataNodeLocations(req.dataNodeId);
+        configManager.getNodeManager().getRegisteredDataNodeLocations();
+    if (req.dataNodeId != -1) {
+      dataNodeLocationMap =
+          dataNodeLocationMap.entrySet().stream()
+              .filter((e) -> req.dataNodeId == e.getKey())
+              .collect(Collectors.toMap((e) -> e.getKey(), (e) -> e.getValue()));
+    }
     List<TSStatus> dataNodeResponseStatus =
         Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
     AsyncDataNodeClientPool.getInstance()
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
index 87828532c2..a1e2ca29ef 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
@@ -100,7 +100,7 @@ public class PermissionManager {
    */
   public TSStatus invalidateCache(String username, String roleName) {
     List<TDataNodeConfiguration> allDataNodes =
-        configManager.getNodeManager().getRegisteredDataNodes(-1);
+        configManager.getNodeManager().getRegisteredDataNodes();
     TInvalidatePermissionCacheReq req = new TInvalidatePermissionCacheReq();
     TSStatus status;
     req.setUsername(username);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
index 32bf01f437..0309ace63a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
@@ -80,7 +80,7 @@ public class UDFManager {
   private List<TSStatus> createFunctionOnDataNodes(
       String functionName, String className, List<String> uris) {
     final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
-        configManager.getNodeManager().getRegisteredDataNodeLocations(-1);
+        configManager.getNodeManager().getRegisteredDataNodeLocations();
     final List<TSStatus> dataNodeResponseStatus =
         Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
     final TCreateFunctionRequest request =
@@ -112,7 +112,7 @@ public class UDFManager {
 
   private List<TSStatus> dropFunctionOnDataNodes(String functionName) {
     final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
-        configManager.getNodeManager().getRegisteredDataNodeLocations(-1);
+        configManager.getNodeManager().getRegisteredDataNodeLocations();
     final List<TSStatus> dataNodeResponseStatus =
         Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
     final TDropFunctionRequest request = new TDropFunctionRequest(functionName);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index ff9835cebf..fca9a8267b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -334,7 +334,7 @@ public class LoadManager {
   public void broadcastLatestRegionRouteMap() {
     Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap = genLatestRegionRouteMap();
     Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
-    getOnlineDataNodes(-1)
+    getOnlineDataNodes()
         .forEach(
             onlineDataNode ->
                 dataNodeLocationMap.put(
@@ -358,7 +358,7 @@ public class LoadManager {
       // Generate HeartbeatReq
       THeartbeatReq heartbeatReq = genHeartbeatReq();
       // Send heartbeat requests to all the registered DataNodes
-      pingRegisteredDataNodes(heartbeatReq, getNodeManager().getRegisteredDataNodes(-1));
+      pingRegisteredDataNodes(heartbeatReq, getNodeManager().getRegisteredDataNodes());
       // Send heartbeat requests to all the registered ConfigNodes
       pingRegisteredConfigNodes(heartbeatReq, getNodeManager().getRegisteredConfigNodes());
     }
@@ -451,8 +451,8 @@ public class LoadManager {
         .collect(Collectors.toList());
   }
 
-  public List<TDataNodeConfiguration> getOnlineDataNodes(int dataNodeId) {
-    return getNodeManager().getRegisteredDataNodes(dataNodeId).stream()
+  public List<TDataNodeConfiguration> getOnlineDataNodes() {
+    return getNodeManager().getRegisteredDataNodes().stream()
         .filter(
             registeredDataNode -> {
               int id = registeredDataNode.getLocation().getDataNodeId();
@@ -473,8 +473,8 @@ public class LoadManager {
         .collect(Collectors.toList());
   }
 
-  public List<TDataNodeConfiguration> getUnknownDataNodes(int dataNodeId) {
-    return getNodeManager().getRegisteredDataNodes(dataNodeId).stream()
+  public List<TDataNodeConfiguration> getUnknownDataNodes() {
+    return getNodeManager().getRegisteredDataNodes().stream()
         .filter(
             registeredDataNode -> {
               int id = registeredDataNode.getLocation().getDataNodeId();
@@ -511,7 +511,7 @@ public class LoadManager {
   }
 
   public int getRunningDataNodesNum() {
-    List<TDataNodeConfiguration> allDataNodes = getOnlineDataNodes(-1);
+    List<TDataNodeConfiguration> allDataNodes = getOnlineDataNodes();
     if (allDataNodes == null) {
       return 0;
     }
@@ -564,7 +564,7 @@ public class LoadManager {
   }
 
   public int getUnknownDataNodesNum() {
-    List<TDataNodeConfiguration> allDataNodes = getUnknownDataNodes(-1);
+    List<TDataNodeConfiguration> allDataNodes = getUnknownDataNodes();
     if (allDataNodes == null) {
       return 0;
     }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index d0c51cef53..155dbd4dda 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -70,7 +70,7 @@ public class RegionBalancer {
 
     // TODO: After waiting for the IT framework to complete, change the following code to:
     //  List<TDataNodeInfo> onlineDataNodes = getLoadManager().getOnlineDataNodes(-1);
-    List<TDataNodeConfiguration> registeredDataNodes = getNodeManager().getRegisteredDataNodes(-1);
+    List<TDataNodeConfiguration> registeredDataNodes = getNodeManager().getRegisteredDataNodes();
     List<TRegionReplicaSet> allocatedRegions = getPartitionManager().getAllReplicaSets();
 
     for (Map.Entry<String, Integer> entry : allotmentMap.entrySet()) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index a77fa10be2..639170945e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -95,7 +95,7 @@ public class RouteBalancer {
             .getDataRegionConsensusProtocolClass()
             .equals(ConsensusFactory.MultiLeaderConsensus)) {
           // Latent router for MultiLeader consensus protocol
-          lazyGreedyRouter.updateUnknownDataNodes(getLoadManager().getUnknownDataNodes(-1));
+          lazyGreedyRouter.updateUnknownDataNodes(getLoadManager().getUnknownDataNodes());
           return lazyGreedyRouter;
         } else if (policy.equals(leaderPolicy)) {
           return new LeaderRouter(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index 1efbc648fb..73016a12f6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -297,22 +297,12 @@ public class NodeInfo implements SnapshotProcessor {
     return result;
   }
 
-  /**
-   * Return the specific registered DataNode
-   *
-   * @param dataNodeId Specific DataNodeId
-   * @return All registered DataNodes if dataNodeId equals -1. And return the specific DataNode
-   *     otherwise.
-   */
-  public List<TDataNodeConfiguration> getRegisteredDataNodes(int dataNodeId) {
+  /** Returns all registered DataNodes. */
+  public List<TDataNodeConfiguration> getRegisteredDataNodes() {
     List<TDataNodeConfiguration> result;
     dataNodeInfoReadWriteLock.readLock().lock();
     try {
-      if (dataNodeId == -1) {
-        result = new ArrayList<>(registeredDataNodes.values());
-      } else {
-        result = Collections.singletonList(registeredDataNodes.get(dataNodeId));
-      }
+      result = new ArrayList<>(registeredDataNodes.values());
     } finally {
       dataNodeInfoReadWriteLock.readLock().unlock();
     }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index d1151f8b8e..0408fe45ea 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -120,7 +120,7 @@ public class ConfigNodeProcedureEnv {
       return invalidCacheResult;
     }
     List<TDataNodeConfiguration> allDataNodes =
-        configManager.getNodeManager().getRegisteredDataNodes(-1);
+        configManager.getNodeManager().getRegisteredDataNodes();
     TInvalidateCacheReq invalidateCacheReq = new TInvalidateCacheReq();
     invalidateCacheReq.setStorageGroup(true);
     invalidateCacheReq.setFullPath(storageGroupName);
@@ -264,7 +264,7 @@ public class ConfigNodeProcedureEnv {
   public void broadCastTheLatestConfigNodeGroup() {
     AsyncDataNodeClientPool.getInstance()
         .broadCastTheLatestConfigNodeGroup(
-            configManager.getNodeManager().getRegisteredDataNodeLocations(-1),
+            configManager.getNodeManager().getRegisteredDataNodeLocations(),
             configManager.getNodeManager().getRegisteredConfigNodes());
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index ced771f189..580d1f7519 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -87,7 +87,7 @@ public class DataNodeRemoveHandler {
         "DataNodeRemoveService start send disable the Data Node to cluster, {}", disabledDataNode);
     TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     List<TEndPoint> otherOnlineDataNodes =
-        configManager.getLoadManager().getOnlineDataNodes(-1).stream()
+        configManager.getLoadManager().getOnlineDataNodes().stream()
             .map(TDataNodeConfiguration::getLocation)
             .filter(loc -> !loc.equals(disabledDataNode))
             .map(TDataNodeLocation::getInternalEndPoint)
@@ -232,7 +232,7 @@ public class DataNodeRemoveHandler {
 
   private Optional<TDataNodeLocation> pickNewReplicaNodeForRegion(
       List<TDataNodeLocation> regionReplicaNodes) {
-    return configManager.getLoadManager().getOnlineDataNodes(-1).stream()
+    return configManager.getLoadManager().getOnlineDataNodes().stream()
         .map(TDataNodeConfiguration::getLocation)
         .filter(e -> !regionReplicaNodes.contains(e))
         .findAny();
@@ -337,7 +337,7 @@ public class DataNodeRemoveHandler {
     TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
 
     List<TDataNodeLocation> allDataNodes =
-        configManager.getNodeManager().getRegisteredDataNodes(-1).stream()
+        configManager.getNodeManager().getRegisteredDataNodes().stream()
             .map(TDataNodeConfiguration::getLocation)
             .collect(Collectors.toList());
     boolean hasNotExistNode =
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
index 65c2a41883..b2df82c2bf 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
@@ -78,7 +78,7 @@ public class NodeInfoTest {
     nodeInfo.setDrainingDataNodes(drainingDataNodes_before);
 
     int nextId = nodeInfo.getNextNodeId();
-    List<TDataNodeConfiguration> onlineDataNodes_before = nodeInfo.getRegisteredDataNodes(-1);
+    List<TDataNodeConfiguration> onlineDataNodes_before = nodeInfo.getRegisteredDataNodes();
 
     nodeInfo.processTakeSnapshot(snapshotDir);
     nodeInfo.clear();
@@ -89,7 +89,7 @@ public class NodeInfoTest {
     Set<TDataNodeLocation> drainingDataNodes_after = nodeInfo.getDrainingDataNodes();
     Assert.assertEquals(drainingDataNodes_before, drainingDataNodes_after);
 
-    List<TDataNodeConfiguration> onlineDataNodes_after = nodeInfo.getRegisteredDataNodes(-1);
+    List<TDataNodeConfiguration> onlineDataNodes_after = nodeInfo.getRegisteredDataNodes();
     Assert.assertEquals(onlineDataNodes_before, onlineDataNodes_after);
   }