You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by wa...@apache.org on 2022/07/22 06:52:18 UTC

[iotdb] branch master updated: [IOTDB-3880] Optimize asyncclientpool retry logic (#6735)

This is an automated email from the ASF dual-hosted git repository.

wangchao316 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 842105470d [IOTDB-3880] Optimize asyncclientpool retry logic (#6735)
842105470d is described below

commit 842105470de4ad166d2307f4f05ffaa388e66d0c
Author: 任宇华 <79...@users.noreply.github.com>
AuthorDate: Fri Jul 22 14:52:12 2022 +0800

    [IOTDB-3880] Optimize asyncclientpool retry logic (#6735)
    
    [IOTDB-3880] Optimize asyncclientpool retry logic (#6735)
---
 .../confignode/client/DataNodeRequestType.java     |   3 +-
 .../async/datanode/AsyncDataNodeClientPool.java    | 221 ++++++++++++---------
 .../async/handlers/AbstractRetryHandler.java       |  33 +--
 .../client/async/handlers/CreateRegionHandler.java |  14 +-
 .../client/async/handlers/FlushHandler.java        |  16 +-
 .../async/handlers/FunctionManagementHandler.java  |  11 +-
 .../client/async/handlers/SetTTLHandler.java       |   7 +-
 .../handlers/UpdateRegionRouteMapHandler.java      |   9 +-
 .../confignode/manager/ClusterSchemaManager.java   |  30 +--
 .../iotdb/confignode/manager/NodeManager.java      |  43 ++--
 .../iotdb/confignode/manager/UDFManager.java       |  63 ++----
 .../iotdb/confignode/manager/load/LoadManager.java |  31 +--
 12 files changed, 219 insertions(+), 262 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index f9a96122a4..715ebfd8de 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -29,7 +29,8 @@ public enum DataNodeRequestType {
   STOP_DATA_NODE,
 
   SET_TTL,
-  CREATE_REGIONS,
+  CREATE_DATA_REGIONS,
+  CREATE_SCHEMA_REGIONS,
   CREATE_FUNCTION,
   DROP_FUNCTION,
   FLUSH,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
index 7475ea3516..91c10c5991 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
@@ -18,15 +18,15 @@
  */
 package org.apache.iotdb.confignode.client.async.datanode;
 
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.confignode.client.ConfigNodeClientPoolFactory;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
 import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
 import org.apache.iotdb.confignode.client.async.handlers.CreateRegionHandler;
 import org.apache.iotdb.confignode.client.async.handlers.DataNodeHeartbeatHandler;
@@ -49,7 +49,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /** Asynchronously send RPC requests to DataNodes. See mpp.thrift for more details. */
 public class AsyncDataNodeClientPool {
@@ -72,38 +71,63 @@ public class AsyncDataNodeClientPool {
    * receive the requests
    *
    * @param req request
-   * @param handlerMap Map<index, Handler>
-   * @param dataNodeLocations ConcurrentHashMap<index, TDataNodeLocation> The specific DataNodes
+   * @param dataNodeLocationMap Map<DataNodeId, TDataNodeLocation>
+   * @param requestType DataNodeRequestType
+   * @param dataNodeResponseStatus response list.Used by CREATE_FUNCTION,DROP_FUNCTION and FLUSH
    */
   public void sendAsyncRequestToDataNodeWithRetry(
       Object req,
-      Map<Integer, AbstractRetryHandler> handlerMap,
-      Map<Integer, TDataNodeLocation> dataNodeLocations) {
-    CountDownLatch countDownLatch = new CountDownLatch(dataNodeLocations.size());
-    if (dataNodeLocations.isEmpty()) {
+      Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+      DataNodeRequestType requestType,
+      List<TSStatus> dataNodeResponseStatus) {
+    if (dataNodeLocationMap.isEmpty()) {
       return;
     }
     for (int retry = 0; retry < retryNum; retry++) {
-      AbstractRetryHandler handler = null;
-      for (Map.Entry<Integer, TDataNodeLocation> entry : dataNodeLocations.entrySet()) {
-        handler = handlerMap.get(entry.getKey());
-        // If it is not the first request, then prove that this operation is a retry.
-        // The count of countDownLatch needs to be updated
-        if (retry != 0) {
-          handler.setCountDownLatch(countDownLatch);
+      CountDownLatch countDownLatch = new CountDownLatch(dataNodeLocationMap.size());
+      for (TDataNodeLocation targetDataNode : dataNodeLocationMap.values()) {
+        AbstractRetryHandler handler;
+        switch (requestType) {
+          case SET_TTL:
+            handler =
+                new SetTTLHandler(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
+            break;
+          case CREATE_FUNCTION:
+          case DROP_FUNCTION:
+            handler =
+                new FunctionManagementHandler(
+                    countDownLatch,
+                    requestType,
+                    targetDataNode,
+                    dataNodeLocationMap,
+                    dataNodeResponseStatus);
+            break;
+          case FLUSH:
+            handler =
+                new FlushHandler(
+                    countDownLatch,
+                    requestType,
+                    targetDataNode,
+                    dataNodeLocationMap,
+                    dataNodeResponseStatus);
+            break;
+          case UPDATE_REGION_ROUTE_MAP:
+            handler =
+                new UpdateRegionRouteMapHandler(
+                    countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
+            break;
+          default:
+            return;
         }
-        // send request
-        sendAsyncRequestToDataNode(entry.getValue(), req, handler, retry);
+        sendAsyncRequestToDataNode(targetDataNode, req, handler, retry);
       }
       try {
-        handler.getCountDownLatch().await();
+        countDownLatch.await();
       } catch (InterruptedException e) {
-        LOGGER.error("Interrupted during {} on ConfigNode", handler.getDataNodeRequestType());
+        LOGGER.error("Interrupted during {} on ConfigNode", requestType);
       }
       // Check if there is a node that fails to send the request, and retry if there is one
-      if (!handler.getDataNodeLocations().isEmpty()) {
-        countDownLatch = new CountDownLatch(handler.getDataNodeLocations().size());
-      } else {
+      if (dataNodeLocationMap.isEmpty()) {
         break;
       }
     }
@@ -118,21 +142,11 @@ public class AsyncDataNodeClientPool {
     try {
       client = clientManager.borrowClient(dataNodeLocation.getInternalEndPoint());
       switch (handler.getDataNodeRequestType()) {
-        case SET_TTL:
-          client.setTTL((TSetTTLReq) req, (SetTTLHandler) handler);
+        case CREATE_DATA_REGIONS:
+          client.createDataRegion((TCreateDataRegionReq) req, (CreateRegionHandler) handler);
           break;
-        case CREATE_REGIONS:
-          TConsensusGroupType regionType =
-              ((CreateRegionHandler) handler).getConsensusGroupId().getType();
-          if (regionType.equals(TConsensusGroupType.SchemaRegion)) {
-            client.createSchemaRegion(
-                (TCreateSchemaRegionReq) ((Map<Integer, Object>) req).get(handler.getIndex()),
-                (CreateRegionHandler) handler);
-          } else if (regionType.equals(TConsensusGroupType.DataRegion)) {
-            client.createDataRegion(
-                (TCreateDataRegionReq) ((Map<Integer, Object>) req).get(handler.getIndex()),
-                (CreateRegionHandler) handler);
-          }
+        case CREATE_SCHEMA_REGIONS:
+          client.createSchemaRegion((TCreateSchemaRegionReq) req, (CreateRegionHandler) handler);
           break;
         case CREATE_FUNCTION:
           client.createFunction((TCreateFunctionRequest) req, (FunctionManagementHandler) handler);
@@ -147,7 +161,6 @@ public class AsyncDataNodeClientPool {
           client.updateRegionCache((TRegionRouteReq) req, (UpdateRegionRouteMapHandler) handler);
           break;
         default:
-          return;
       }
     } catch (Exception e) {
       LOGGER.warn(
@@ -167,61 +180,85 @@ public class AsyncDataNodeClientPool {
    */
   public void createRegions(
       CreateRegionGroupsPlan createRegionGroupsPlan, Map<String, Long> ttlMap) {
-
-    // Number of regions to be created
-    int regionNum = 0;
-    // Assign an independent index to each Region
-    for (Map.Entry<String, List<TRegionReplicaSet>> entry :
-        createRegionGroupsPlan.getRegionGroupMap().entrySet()) {
-      for (TRegionReplicaSet regionReplicaSet : entry.getValue()) {
-        regionNum += regionReplicaSet.getDataNodeLocationsSize();
+    // Because different requests will be sent to the same node when createRegions,
+    // so for CreateRegions use Map<index, TDataNodeLocation>
+    Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
+    int index = 0;
+    // Count the datanodes to be sent
+    for (List<TRegionReplicaSet> regionReplicaSets :
+        createRegionGroupsPlan.getRegionGroupMap().values()) {
+      for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
+        for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
+          dataNodeLocationMap.put(index++, dataNodeLocation);
+        }
+      }
+    }
+    if (dataNodeLocationMap.isEmpty()) {
+      return;
+    }
+    for (int retry = 0; retry < retryNum; retry++) {
+      index = 0;
+      CountDownLatch countDownLatch = new CountDownLatch(dataNodeLocationMap.size());
+      for (Map.Entry<String, List<TRegionReplicaSet>> entry :
+          createRegionGroupsPlan.getRegionGroupMap().entrySet()) {
+        // Enumerate each RegionReplicaSet
+        for (TRegionReplicaSet regionReplicaSet : entry.getValue()) {
+          // Enumerate each Region
+          for (TDataNodeLocation targetDataNode : regionReplicaSet.getDataNodeLocations()) {
+            if (dataNodeLocationMap.containsKey(index)) {
+              AbstractRetryHandler handler;
+              switch (regionReplicaSet.getRegionId().getType()) {
+                case SchemaRegion:
+                  handler =
+                      new CreateRegionHandler(
+                          countDownLatch,
+                          DataNodeRequestType.CREATE_SCHEMA_REGIONS,
+                          regionReplicaSet.regionId,
+                          targetDataNode,
+                          dataNodeLocationMap,
+                          index++);
+                  sendAsyncRequestToDataNode(
+                      targetDataNode,
+                      genCreateSchemaRegionReq(entry.getKey(), regionReplicaSet),
+                      handler,
+                      retry);
+                  break;
+                case DataRegion:
+                  handler =
+                      new CreateRegionHandler(
+                          countDownLatch,
+                          DataNodeRequestType.CREATE_DATA_REGIONS,
+                          regionReplicaSet.regionId,
+                          targetDataNode,
+                          dataNodeLocationMap,
+                          index++);
+                  sendAsyncRequestToDataNode(
+                      targetDataNode,
+                      genCreateDataRegionReq(
+                          entry.getKey(), regionReplicaSet, ttlMap.get(entry.getKey())),
+                      handler,
+                      retry);
+                  break;
+                default:
+                  return;
+              }
+            } else {
+              index++;
+            }
+          }
+        }
+      }
+      try {
+        countDownLatch.await();
+      } catch (InterruptedException e) {
+        LOGGER.error("Interrupted during createRegions on ConfigNode");
+      }
+      // Check if there is a node that fails to send the request, and
+      // retry if there is one
+      if (dataNodeLocationMap.isEmpty()) {
+        break;
       }
     }
-    Map<Integer, AbstractRetryHandler> handlerMap = new ConcurrentHashMap<>();
-    ConcurrentHashMap<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
-    Map<Integer, Object> req = new ConcurrentHashMap<>();
-    AtomicInteger index = new AtomicInteger();
-    CountDownLatch latch = new CountDownLatch(regionNum);
-    createRegionGroupsPlan
-        .getRegionGroupMap()
-        .forEach(
-            (storageGroup, regionReplicaSets) -> {
-              // Enumerate each RegionReplicaSet
-              regionReplicaSets.forEach(
-                  regionReplicaSet -> {
-                    // Enumerate each Region
-                    regionReplicaSet
-                        .getDataNodeLocations()
-                        .forEach(
-                            dataNodeLocation -> {
-                              handlerMap.put(
-                                  index.get(),
-                                  new CreateRegionHandler(
-                                      index.get(),
-                                      latch,
-                                      regionReplicaSet.getRegionId(),
-                                      dataNodeLocation,
-                                      dataNodeLocations));
-
-                              switch (regionReplicaSet.getRegionId().getType()) {
-                                case SchemaRegion:
-                                  req.put(
-                                      index.get(),
-                                      genCreateSchemaRegionReq(storageGroup, regionReplicaSet));
-                                  break;
-                                case DataRegion:
-                                  req.put(
-                                      index.get(),
-                                      genCreateDataRegionReq(
-                                          storageGroup,
-                                          regionReplicaSet,
-                                          ttlMap.get(storageGroup)));
-                              }
-                              dataNodeLocations.put(index.getAndIncrement(), dataNodeLocation);
-                            });
-                  });
-            });
-    sendAsyncRequestToDataNodeWithRetry(req, handlerMap, dataNodeLocations);
   }
 
   private TCreateSchemaRegionReq genCreateSchemaRegionReq(
@@ -244,7 +281,7 @@ public class AsyncDataNodeClientPool {
   /**
    * Only used in LoadManager
    *
-   * @param endPoint
+   * @param endPoint The specific DataNode
    */
   public void getDataNodeHeartBeat(
       TEndPoint endPoint, THeartbeatReq req, DataNodeHeartbeatHandler handler) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AbstractRetryHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AbstractRetryHandler.java
index 518036dd19..6d882ecbd2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AbstractRetryHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AbstractRetryHandler.java
@@ -27,15 +27,14 @@ import java.util.concurrent.CountDownLatch;
 
 public abstract class AbstractRetryHandler {
 
-  protected final int index;
-
   protected CountDownLatch countDownLatch;
+
   /**
-   * Map<Index, TDataNodeLocation> The DataNode that successfully execute the request will be
-   * removed from this map
+   * Map<DataNodeId, TDataNodeLocation> The DataNode that successfully execute the request will be
+   * removed from this list
    */
-  protected Map<Integer, TDataNodeLocation> dataNodeLocations;
-
+  protected Map<Integer, TDataNodeLocation> dataNodeLocationMap;
+  /** Request type to DataNode */
   protected DataNodeRequestType dataNodeRequestType;
   /** Target DataNode */
   protected TDataNodeLocation targetDataNode;
@@ -44,32 +43,14 @@ public abstract class AbstractRetryHandler {
       CountDownLatch countDownLatch,
       DataNodeRequestType dataNodeRequestType,
       TDataNodeLocation targetDataNode,
-      Map<Integer, TDataNodeLocation> dataNodeLocations,
-      int index) {
+      Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
     this.countDownLatch = countDownLatch;
-    this.dataNodeLocations = dataNodeLocations;
+    this.dataNodeLocationMap = dataNodeLocationMap;
     this.dataNodeRequestType = dataNodeRequestType;
     this.targetDataNode = targetDataNode;
-    this.index = index;
-  }
-
-  public void setCountDownLatch(CountDownLatch countDownLatch) {
-    this.countDownLatch = countDownLatch;
-  }
-
-  public CountDownLatch getCountDownLatch() {
-    return countDownLatch;
-  }
-
-  public Map<Integer, TDataNodeLocation> getDataNodeLocations() {
-    return dataNodeLocations;
   }
 
   public DataNodeRequestType getDataNodeRequestType() {
     return dataNodeRequestType;
   }
-
-  public int getIndex() {
-    return index;
-  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/CreateRegionHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/CreateRegionHandler.java
index c1828b64ab..3cef7ed55a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/CreateRegionHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/CreateRegionHandler.java
@@ -38,23 +38,29 @@ public class CreateRegionHandler extends AbstractRetryHandler
 
   private static final Logger LOGGER = LoggerFactory.getLogger(CreateRegionHandler.class);
 
+  // The index of dataNodeLocations
+  // We use Index instead of DataNodeId because it is possible to send multiple createRegion
+  // requests to the same DataNode
+  private final int index;
   // Used for Logger
   private final TConsensusGroupId consensusGroupId;
 
   public CreateRegionHandler(
-      int index,
       CountDownLatch latch,
+      DataNodeRequestType requestType,
       TConsensusGroupId consensusGroupId,
       TDataNodeLocation targetDataNode,
-      Map<Integer, TDataNodeLocation> dataNodeLocations) {
-    super(latch, DataNodeRequestType.CREATE_REGIONS, targetDataNode, dataNodeLocations, index);
+      Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+      int index) {
+    super(latch, requestType, targetDataNode, dataNodeLocationMap);
     this.consensusGroupId = consensusGroupId;
+    this.index = index;
   }
 
   @Override
   public void onComplete(TSStatus tsStatus) {
     if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      getDataNodeLocations().remove(index);
+      dataNodeLocationMap.remove(index);
       LOGGER.info(
           String.format(
               "Successfully create %s on DataNode: %s",
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FlushHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FlushHandler.java
index f72cf7ee74..aa0c366ced 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FlushHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FlushHandler.java
@@ -39,13 +39,12 @@ public class FlushHandler extends AbstractRetryHandler implements AsyncMethodCal
   private final List<TSStatus> dataNodeResponseStatus;
 
   public FlushHandler(
-      TDataNodeLocation targetDataNode,
       CountDownLatch countDownLatch,
       DataNodeRequestType requestType,
-      List<TSStatus> dataNodeResponseStatus,
-      Map<Integer, TDataNodeLocation> dataNodeLocations,
-      int index) {
-    super(countDownLatch, requestType, targetDataNode, dataNodeLocations, index);
+      TDataNodeLocation targetDataNode,
+      Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+      List<TSStatus> dataNodeResponseStatus) {
+    super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
     this.dataNodeResponseStatus = dataNodeResponseStatus;
   }
 
@@ -53,10 +52,13 @@ public class FlushHandler extends AbstractRetryHandler implements AsyncMethodCal
   public void onComplete(TSStatus response) {
     if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       dataNodeResponseStatus.add(response);
-      dataNodeLocations.remove(index);
+      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
       LOGGER.info("Successfully Flush on DataNode: {}", targetDataNode);
     } else {
-      LOGGER.error("Failed to Flush on DataNode {}, {}", dataNodeLocations, response);
+      LOGGER.error(
+          "Failed to Flush on DataNode {}, {}",
+          dataNodeLocationMap.get(targetDataNode.getDataNodeId()),
+          response);
     }
     countDownLatch.countDown();
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FunctionManagementHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FunctionManagementHandler.java
index 306cea3801..e48545a8ae 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FunctionManagementHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FunctionManagementHandler.java
@@ -41,12 +41,11 @@ public class FunctionManagementHandler extends AbstractRetryHandler
 
   public FunctionManagementHandler(
       CountDownLatch countDownLatch,
-      TDataNodeLocation targetDataNode,
-      List<TSStatus> dataNodeResponseStatus,
       DataNodeRequestType requestType,
-      Map<Integer, TDataNodeLocation> dataNodeLocations,
-      int index) {
-    super(countDownLatch, requestType, targetDataNode, dataNodeLocations, index);
+      TDataNodeLocation targetDataNode,
+      Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+      List<TSStatus> dataNodeResponseStatus) {
+    super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
     this.dataNodeResponseStatus = dataNodeResponseStatus;
   }
 
@@ -54,7 +53,7 @@ public class FunctionManagementHandler extends AbstractRetryHandler
   public void onComplete(TSStatus response) {
     if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       dataNodeResponseStatus.add(response);
-      dataNodeLocations.remove(index);
+      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
       LOGGER.info("Successfully {} on DataNode: {}", dataNodeRequestType, targetDataNode);
     } else {
       LOGGER.info("Failed to {} on DataNode: {}", dataNodeRequestType, targetDataNode);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetTTLHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetTTLHandler.java
index e4fb7d79e6..6792a49e7f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetTTLHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetTTLHandler.java
@@ -38,15 +38,14 @@ public class SetTTLHandler extends AbstractRetryHandler implements AsyncMethodCa
       CountDownLatch countDownLatch,
       DataNodeRequestType requestType,
       TDataNodeLocation targetDataNode,
-      Map<Integer, TDataNodeLocation> dataNodeLocations,
-      int index) {
-    super(countDownLatch, requestType, targetDataNode, dataNodeLocations, index);
+      Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
+    super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
   }
 
   @Override
   public void onComplete(TSStatus response) {
     if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      getDataNodeLocations().remove(index);
+      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
       LOGGER.info("Successfully SetTTL on DataNode: {}", targetDataNode);
     } else {
       LOGGER.error("Failed to SetTTL on DataNode: {}, {}", targetDataNode, response);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java
index 5448f5f2e1..4877fb70a6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java
@@ -36,18 +36,17 @@ public class UpdateRegionRouteMapHandler extends AbstractRetryHandler
   private static final Logger LOGGER = LoggerFactory.getLogger(UpdateRegionRouteMapHandler.class);
 
   public UpdateRegionRouteMapHandler(
-      TDataNodeLocation targetDataNode,
       CountDownLatch countDownLatch,
       DataNodeRequestType requestType,
-      Map<Integer, TDataNodeLocation> dataNodeLocations,
-      int index) {
-    super(countDownLatch, requestType, targetDataNode, dataNodeLocations, index);
+      TDataNodeLocation targetDataNode,
+      Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
+    super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
   }
 
   @Override
   public void onComplete(TSStatus status) {
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      getDataNodeLocations().remove(targetDataNode);
+      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
       LOGGER.info("Successfully update the RegionRouteMap on DataNode: {}", targetDataNode);
     } else {
       LOGGER.error("Update RegionRouteMap on DataNode: {} failed", targetDataNode);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index ab88c4add0..a3eef8ecbe 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -26,8 +26,6 @@ import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
 import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
-import org.apache.iotdb.confignode.client.async.handlers.SetTTLHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.read.CountStorageGroupPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetNodesInSchemaTemplatePlan;
@@ -63,13 +61,10 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /** The ClusterSchemaManager Manages cluster schema read and write requests. */
 public class ClusterSchemaManager {
@@ -162,33 +157,22 @@ public class ClusterSchemaManager {
           TSStatusCode.STORAGE_GROUP_NOT_EXIST,
           "storageGroup " + setTTLPlan.getStorageGroup() + " does not exist");
     }
-
+    Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
     Set<TDataNodeLocation> dataNodeLocations =
         getPartitionManager()
             .getStorageGroupRelatedDataNodes(
                 setTTLPlan.getStorageGroup(), TConsensusGroupType.DataRegion);
+    dataNodeLocations.forEach(
+        dataNodeLocation ->
+            dataNodeLocationMap.put(dataNodeLocation.getDataNodeId(), dataNodeLocation));
     if (dataNodeLocations.size() > 0) {
-      CountDownLatch countDownLatch = new CountDownLatch(dataNodeLocations.size());
-      Map<Integer, AbstractRetryHandler> handler = new HashMap<>();
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
-      AtomicInteger index = new AtomicInteger();
       // TODO: Use procedure to protect SetTTL on DataNodes
-      for (TDataNodeLocation dataNodeLocation : dataNodeLocations) {
-        handler.put(
-            index.get(),
-            new SetTTLHandler(
-                countDownLatch,
-                DataNodeRequestType.SET_TTL,
-                dataNodeLocation,
-                dataNodeLocationMap,
-                index.get()));
-        dataNodeLocationMap.put(index.getAndIncrement(), dataNodeLocation);
-      }
       AsyncDataNodeClientPool.getInstance()
           .sendAsyncRequestToDataNodeWithRetry(
               new TSetTTLReq(setTTLPlan.getStorageGroup(), setTTLPlan.getTTL()),
-              handler,
-              dataNodeLocationMap);
+              dataNodeLocationMap,
+              DataNodeRequestType.SET_TTL,
+              null);
     }
 
     return getConsensusManager().write(setTTLPlan).getStatus();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index 03219060df..855bc4ec7a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -27,8 +27,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
 import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
-import org.apache.iotdb.confignode.client.async.handlers.FlushHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeConfigurationPlan;
@@ -55,13 +53,10 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
 /** NodeManager manages cluster node addition and removal requests */
@@ -179,6 +174,7 @@ public class NodeManager {
   public DataNodeConfigurationResp getDataNodeConfiguration(GetDataNodeConfigurationPlan req) {
     return (DataNodeConfigurationResp) getConsensusManager().read(req).getDataset();
   }
+
   /**
    * Only leader use this interface
    *
@@ -208,6 +204,18 @@ public class NodeManager {
     return nodeInfo.getRegisteredDataNodes(dataNodeId);
   }
 
+  public Map<Integer, TDataNodeLocation> getRegisteredDataNodeLocations(int dataNodeId) {
+    Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
+    nodeInfo
+        .getRegisteredDataNodes(dataNodeId)
+        .forEach(
+            dataNodeConfiguration ->
+                dataNodeLocations.put(
+                    dataNodeConfiguration.getLocation().getDataNodeId(),
+                    dataNodeConfiguration.getLocation()));
+    return dataNodeLocations;
+  }
+
   public List<TDataNodesInfo> getRegisteredDataNodesInfoList() {
     List<TDataNodesInfo> dataNodesLocations = new ArrayList<>();
     List<TDataNodeConfiguration> registeredDataNodes = this.getRegisteredDataNodes(-1);
@@ -425,28 +433,13 @@ public class NodeManager {
   }
 
   public List<TSStatus> flush(TFlushReq req) {
-    List<TDataNodeConfiguration> registeredDataNodes =
-        configManager.getNodeManager().getRegisteredDataNodes(req.dataNodeId);
+    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        configManager.getNodeManager().getRegisteredDataNodeLocations(req.dataNodeId);
     List<TSStatus> dataNodeResponseStatus =
-        Collections.synchronizedList(new ArrayList<>(registeredDataNodes.size()));
-    CountDownLatch countDownLatch = new CountDownLatch(registeredDataNodes.size());
-    Map<Integer, AbstractRetryHandler> handlerMap = new HashMap<>();
-    Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
-    AtomicInteger index = new AtomicInteger();
-    for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) {
-      handlerMap.put(
-          index.get(),
-          new FlushHandler(
-              dataNodeInfo.getLocation(),
-              countDownLatch,
-              DataNodeRequestType.FLUSH,
-              dataNodeResponseStatus,
-              dataNodeLocations,
-              index.get()));
-      dataNodeLocations.put(index.getAndIncrement(), dataNodeInfo.getLocation());
-    }
+        Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
     AsyncDataNodeClientPool.getInstance()
-        .sendAsyncRequestToDataNodeWithRetry(req, handlerMap, dataNodeLocations);
+        .sendAsyncRequestToDataNodeWithRetry(
+            req, dataNodeLocationMap, DataNodeRequestType.FLUSH, dataNodeResponseStatus);
     return dataNodeResponseStatus;
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
index f9c789d95d..32bf01f437 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
@@ -19,13 +19,10 @@
 
 package org.apache.iotdb.confignode.manager;
 
-import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
 import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
-import org.apache.iotdb.confignode.client.async.handlers.FunctionManagementHandler;
 import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionPlan;
 import org.apache.iotdb.confignode.consensus.request.write.DropFunctionPlan;
 import org.apache.iotdb.confignode.persistence.UDFInfo;
@@ -39,12 +36,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
 
 public class UDFManager {
 
@@ -86,30 +79,18 @@ public class UDFManager {
 
   private List<TSStatus> createFunctionOnDataNodes(
       String functionName, String className, List<String> uris) {
-    final List<TDataNodeConfiguration> registeredDataNodes =
-        configManager.getNodeManager().getRegisteredDataNodes(-1);
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        configManager.getNodeManager().getRegisteredDataNodeLocations(-1);
     final List<TSStatus> dataNodeResponseStatus =
-        Collections.synchronizedList(new ArrayList<>(registeredDataNodes.size()));
-    final CountDownLatch countDownLatch = new CountDownLatch(registeredDataNodes.size());
+        Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
     final TCreateFunctionRequest request =
         new TCreateFunctionRequest(functionName, className, uris);
-    Map<Integer, AbstractRetryHandler> handlerMap = new HashMap<>();
-    Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
-    AtomicInteger index = new AtomicInteger(0);
-    for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) {
-      handlerMap.put(
-          index.get(),
-          new FunctionManagementHandler(
-              countDownLatch,
-              dataNodeInfo.getLocation(),
-              dataNodeResponseStatus,
-              DataNodeRequestType.CREATE_FUNCTION,
-              dataNodeLocations,
-              index.get()));
-      dataNodeLocations.put(index.getAndIncrement(), dataNodeInfo.getLocation());
-    }
     AsyncDataNodeClientPool.getInstance()
-        .sendAsyncRequestToDataNodeWithRetry(request, handlerMap, dataNodeLocations);
+        .sendAsyncRequestToDataNodeWithRetry(
+            request,
+            dataNodeLocationMap,
+            DataNodeRequestType.CREATE_FUNCTION,
+            dataNodeResponseStatus);
     return dataNodeResponseStatus;
   }
 
@@ -130,29 +111,17 @@ public class UDFManager {
   }
 
   private List<TSStatus> dropFunctionOnDataNodes(String functionName) {
-    final List<TDataNodeConfiguration> registeredDataNodes =
-        configManager.getNodeManager().getRegisteredDataNodes(-1);
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        configManager.getNodeManager().getRegisteredDataNodeLocations(-1);
     final List<TSStatus> dataNodeResponseStatus =
-        Collections.synchronizedList(new ArrayList<>(registeredDataNodes.size()));
-    final CountDownLatch countDownLatch = new CountDownLatch(registeredDataNodes.size());
+        Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
     final TDropFunctionRequest request = new TDropFunctionRequest(functionName);
-    Map<Integer, AbstractRetryHandler> handlerMap = new HashMap<>();
-    Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
-    AtomicInteger index = new AtomicInteger(0);
-    for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) {
-      handlerMap.put(
-          index.get(),
-          new FunctionManagementHandler(
-              countDownLatch,
-              dataNodeInfo.getLocation(),
-              dataNodeResponseStatus,
-              DataNodeRequestType.DROP_FUNCTION,
-              dataNodeLocations,
-              index.get()));
-      dataNodeLocations.put(index.getAndIncrement(), dataNodeInfo.getLocation());
-    }
     AsyncDataNodeClientPool.getInstance()
-        .sendAsyncRequestToDataNodeWithRetry(request, handlerMap, dataNodeLocations);
+        .sendAsyncRequestToDataNodeWithRetry(
+            request,
+            dataNodeLocationMap,
+            DataNodeRequestType.DROP_FUNCTION,
+            dataNodeResponseStatus);
     return dataNodeResponseStatus;
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index b52b1cd55d..9886593212 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -35,10 +35,8 @@ import org.apache.iotdb.commons.partition.SchemaPartitionTable;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
 import org.apache.iotdb.confignode.client.async.confignode.AsyncConfigNodeClientPool;
 import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
 import org.apache.iotdb.confignode.client.async.handlers.ConfigNodeHeartbeatHandler;
 import org.apache.iotdb.confignode.client.async.handlers.DataNodeHeartbeatHandler;
-import org.apache.iotdb.confignode.client.async.handlers.UpdateRegionRouteMapHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
@@ -66,7 +64,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -299,30 +296,20 @@ public class LoadManager {
 
   private void broadcastLatestRegionRouteMap() {
     Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap = genLatestRegionRouteMap();
-    List<TDataNodeConfiguration> onlineDataNodes = getOnlineDataNodes(-1);
-    CountDownLatch latch = new CountDownLatch(onlineDataNodes.size());
+    Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
+    getOnlineDataNodes(-1)
+        .forEach(
+            onlineDataNode ->
+                dataNodeLocationMap.put(
+                    onlineDataNode.getLocation().getDataNodeId(), onlineDataNode.getLocation()));
 
     LOGGER.info("Begin to broadcast RegionRouteMap: {}", latestRegionRouteMap);
-    Map<Integer, AbstractRetryHandler> handlerMap = new HashMap<>();
-    Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
-    AtomicInteger index = new AtomicInteger();
-    onlineDataNodes.forEach(
-        dataNodeInfo -> {
-          handlerMap.put(
-              index.get(),
-              new UpdateRegionRouteMapHandler(
-                  dataNodeInfo.getLocation(),
-                  latch,
-                  DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
-                  dataNodeLocations,
-                  index.get()));
-          dataNodeLocations.put(index.getAndIncrement(), dataNodeInfo.getLocation());
-        });
     AsyncDataNodeClientPool.getInstance()
         .sendAsyncRequestToDataNodeWithRetry(
             new TRegionRouteReq(System.currentTimeMillis(), latestRegionRouteMap),
-            handlerMap,
-            dataNodeLocations);
+            dataNodeLocationMap,
+            DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
+            null);
     LOGGER.info("Broadcast the latest RegionRouteMap finished.");
   }