You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by wa...@apache.org on 2022/07/16 06:26:42 UTC

[iotdb] branch master updated: [IOTDB-3720] Unify retry logic of AsyncClientPool in ConfigNode (#6670)

This is an automated email from the ASF dual-hosted git repository.

wangchao316 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d0d298e986 [IOTDB-3720] Unify retry logic of AsyncClientPool in ConfigNode (#6670)
d0d298e986 is described below

commit d0d298e98673b44154a54c454abde973ee956a3b
Author: 任宇华 <79...@users.noreply.github.com>
AuthorDate: Sat Jul 16 14:26:37 2022 +0800

    [IOTDB-3720] Unify retry logic of AsyncClientPool in ConfigNode (#6670)
    
    [IOTDB-3720] Unify retry logic of AsyncClientPool in ConfigNode (#6670)
---
 .../confignode/client/AsyncDataNodeClientPool.java | 343 ---------------------
 .../confignode/client/ConfigNodeRequestType.java   |  10 +-
 .../confignode/client/DataNodeRequestType.java     |  21 +-
 .../confignode}/AsyncConfigNodeClientPool.java     |   4 +-
 .../async/datanode/AsyncDataNodeClientPool.java    | 283 +++++++++++++++++
 .../async/handlers/AbstractRetryHandler.java       |  75 +++++
 .../handlers/ConfigNodeHeartbeatHandler.java       |   2 +-
 .../{ => async}/handlers/CreateRegionHandler.java  |  43 ++-
 .../handlers/DataNodeHeartbeatHandler.java         |   2 +-
 .../client/{ => async}/handlers/FlushHandler.java  |  34 +-
 .../handlers/FunctionManagementHandler.java        |  37 ++-
 .../client/{ => async}/handlers/SetTTLHandler.java |  34 +-
 .../handlers/UpdateRegionRouteMapHandler.java      |  31 +-
 .../confignode}/SyncConfigNodeClientPool.java      |  15 +-
 .../datanode}/SyncDataNodeClientPool.java          |  23 +-
 .../confignode/conf/ConfigNodeRemoveCheck.java     |   6 +-
 .../confignode/manager/ClusterSchemaManager.java   |  41 ++-
 .../confignode/manager/DataNodeRemoveManager.java  |  15 +-
 .../iotdb/confignode/manager/NodeManager.java      |  36 ++-
 .../iotdb/confignode/manager/PartitionManager.java |   2 +-
 .../confignode/manager/PermissionManager.java      |   6 +-
 .../iotdb/confignode/manager/UDFManager.java       |  74 ++---
 .../iotdb/confignode/manager/load/LoadManager.java |  45 +--
 .../procedure/env/ConfigNodeProcedureEnv.java      |  20 +-
 .../iotdb/confignode/service/ConfigNode.java       |   6 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   6 +-
 26 files changed, 651 insertions(+), 563 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
deleted file mode 100644
index 475f75c98c..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.client;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TFlushReq;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
-import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
-import org.apache.iotdb.confignode.client.handlers.CreateRegionHandler;
-import org.apache.iotdb.confignode.client.handlers.DataNodeHeartbeatHandler;
-import org.apache.iotdb.confignode.client.handlers.FlushHandler;
-import org.apache.iotdb.confignode.client.handlers.FunctionManagementHandler;
-import org.apache.iotdb.confignode.client.handlers.SetTTLHandler;
-import org.apache.iotdb.confignode.client.handlers.UpdateRegionRouteMapHandler;
-import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionRequest;
-import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
-import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
-
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.BitSet;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.CountDownLatch;
-
-/** Asynchronously send RPC requests to DataNodes. See mpp.thrift for more details. */
-public class AsyncDataNodeClientPool {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(AsyncDataNodeClientPool.class);
-
-  private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> clientManager;
-
-  private AsyncDataNodeClientPool() {
-    clientManager =
-        new IClientManager.Factory<TEndPoint, AsyncDataNodeInternalServiceClient>()
-            .createClientManager(
-                new ConfigNodeClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory());
-  }
-
-  /**
-   * Execute CreateRegionsReq asynchronously
-   *
-   * @param createRegionGroupsPlan CreateRegionsReq
-   * @param ttlMap Map<StorageGroupName, TTL>
-   */
-  public void createRegions(
-      CreateRegionGroupsPlan createRegionGroupsPlan, Map<String, Long> ttlMap) {
-
-    // TODO: Unify retry logic
-
-    // Index of each Region
-    int index = 0;
-    // Number of regions to be created
-    int regionNum = 0;
-    // Map<TConsensusGroupId, Map<DataNodeId, index>>
-    Map<TConsensusGroupId, Map<Integer, Integer>> indexMap = new TreeMap<>();
-    // Assign an independent index to each Region
-    for (Map.Entry<String, List<TRegionReplicaSet>> entry :
-        createRegionGroupsPlan.getRegionGroupMap().entrySet()) {
-      for (TRegionReplicaSet regionReplicaSet : entry.getValue()) {
-        regionNum += regionReplicaSet.getDataNodeLocationsSize();
-        for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
-          indexMap
-              .computeIfAbsent(regionReplicaSet.getRegionId(), idMap -> new TreeMap<>())
-              .put(dataNodeLocation.getDataNodeId(), index);
-          index += 1;
-        }
-      }
-    }
-
-    BitSet bitSet = new BitSet(regionNum);
-    for (int retry = 0; retry < 3; retry++) {
-      CountDownLatch latch = new CountDownLatch(regionNum - bitSet.cardinality());
-      createRegionGroupsPlan
-          .getRegionGroupMap()
-          .forEach(
-              (storageGroup, regionReplicaSets) -> {
-                // Enumerate each RegionReplicaSet
-                regionReplicaSets.forEach(
-                    regionReplicaSet -> {
-                      // Enumerate each Region
-                      regionReplicaSet
-                          .getDataNodeLocations()
-                          .forEach(
-                              dataNodeLocation -> {
-                                // Skip those created successfully
-                                if (!bitSet.get(
-                                    indexMap
-                                        .get(regionReplicaSet.getRegionId())
-                                        .get(dataNodeLocation.getDataNodeId()))) {
-                                  TEndPoint endPoint = dataNodeLocation.getInternalEndPoint();
-                                  CreateRegionHandler handler =
-                                      new CreateRegionHandler(
-                                          indexMap
-                                              .get(regionReplicaSet.getRegionId())
-                                              .get(dataNodeLocation.getDataNodeId()),
-                                          bitSet,
-                                          latch,
-                                          regionReplicaSet.getRegionId(),
-                                          dataNodeLocation);
-
-                                  switch (regionReplicaSet.getRegionId().getType()) {
-                                    case SchemaRegion:
-                                      createSchemaRegion(
-                                          endPoint,
-                                          genCreateSchemaRegionReq(storageGroup, regionReplicaSet),
-                                          handler);
-                                      break;
-                                    case DataRegion:
-                                      createDataRegion(
-                                          endPoint,
-                                          genCreateDataRegionReq(
-                                              storageGroup,
-                                              regionReplicaSet,
-                                              ttlMap.get(storageGroup)),
-                                          handler);
-                                  }
-                                }
-                              });
-                    });
-              });
-
-      try {
-        // Waiting until this batch of create requests done
-        latch.await();
-      } catch (InterruptedException e) {
-        LOGGER.error("ClusterSchemaManager was interrupted during create Regions on DataNodes", e);
-      }
-
-      if (bitSet.cardinality() == regionNum) {
-        // Break if all creations success
-        break;
-      }
-    }
-
-    if (bitSet.cardinality() < regionNum) {
-      LOGGER.error(
-          "Failed to create some SchemaRegions or DataRegions on DataNodes. Please check former logs.");
-    }
-  }
-
-  private TCreateSchemaRegionReq genCreateSchemaRegionReq(
-      String storageGroup, TRegionReplicaSet regionReplicaSet) {
-    // TODO: Add a retry logic
-    TCreateSchemaRegionReq req = new TCreateSchemaRegionReq();
-    req.setStorageGroup(storageGroup);
-    req.setRegionReplicaSet(regionReplicaSet);
-    return req;
-  }
-
-  /**
-   * Create a SchemaRegion on specific DataNode
-   *
-   * @param endPoint The specific DataNode
-   */
-  private void createSchemaRegion(
-      TEndPoint endPoint, TCreateSchemaRegionReq req, CreateRegionHandler handler) {
-    // TODO: Add a retry logic
-    AsyncDataNodeInternalServiceClient client;
-    try {
-      client = clientManager.borrowClient(endPoint);
-      client.createSchemaRegion(req, handler);
-    } catch (IOException e) {
-      LOGGER.error("Can't connect to DataNode {}", endPoint, e);
-    } catch (TException e) {
-      LOGGER.error("Create SchemaRegion on DataNode {} failed", endPoint, e);
-    }
-  }
-
-  private TCreateDataRegionReq genCreateDataRegionReq(
-      String storageGroup, TRegionReplicaSet regionReplicaSet, long TTL) {
-    // TODO: Add a retry logic
-    TCreateDataRegionReq req = new TCreateDataRegionReq();
-    req.setStorageGroup(storageGroup);
-    req.setRegionReplicaSet(regionReplicaSet);
-    req.setTtl(TTL);
-    return req;
-  }
-
-  /**
-   * Create a DataRegion on specific DataNode
-   *
-   * @param endPoint The specific DataNode
-   */
-  public void createDataRegion(
-      TEndPoint endPoint, TCreateDataRegionReq req, CreateRegionHandler handler) {
-    // TODO: Add a retry logic
-    AsyncDataNodeInternalServiceClient client;
-    try {
-      client = clientManager.borrowClient(endPoint);
-      client.createDataRegion(req, handler);
-    } catch (IOException e) {
-      LOGGER.error("Can't connect to DataNode {}", endPoint, e);
-    } catch (TException e) {
-      LOGGER.error("Create DataRegion on DataNode {} failed", endPoint, e);
-    }
-  }
-
-  /**
-   * Only used in LoadManager
-   *
-   * @param endPoint The specific DataNode
-   */
-  public void getDataNodeHeartBeat(
-      TEndPoint endPoint, THeartbeatReq req, DataNodeHeartbeatHandler handler) {
-    // TODO: Add a retry logic
-    AsyncDataNodeInternalServiceClient client;
-    try {
-      client = clientManager.borrowClient(endPoint);
-      client.getDataNodeHeartBeat(req, handler);
-    } catch (Exception e) {
-      LOGGER.error("Asking DataNode: {}, for heartbeat failed", endPoint, e);
-    }
-  }
-
-  /**
-   * Always call this interface when a DataNode is restarted or removed
-   *
-   * @param endPoint The specific DataNode
-   */
-  public void resetClient(TEndPoint endPoint) {
-    clientManager.clear(endPoint);
-  }
-
-  /**
-   * Only used in UDFManager
-   *
-   * @param endPoint The specific DataNode
-   */
-  public void createFunction(
-      TEndPoint endPoint, TCreateFunctionRequest request, FunctionManagementHandler handler) {
-    // TODO: Add a retry logic
-    try {
-      clientManager.borrowClient(endPoint).createFunction(request, handler);
-    } catch (Exception e) {
-      LOGGER.error("Failed to asking DataNode to create function: {}", endPoint, e);
-    }
-  }
-
-  /**
-   * Only used in UDFManager
-   *
-   * @param endPoint The specific DataNode
-   */
-  public void dropFunction(
-      TEndPoint endPoint, TDropFunctionRequest request, FunctionManagementHandler handler) {
-    // TODO: Add a retry logic
-    try {
-      clientManager.borrowClient(endPoint).dropFunction(request, handler);
-    } catch (Exception e) {
-      LOGGER.error("Failed to asking DataNode to create function: {}", endPoint, e);
-    }
-  }
-
-  /**
-   * Flush on specific DataNode
-   *
-   * @param endPoint The specific DataNode
-   */
-  public void flush(TEndPoint endPoint, TFlushReq flushReq, FlushHandler handler) {
-    // TODO: Add a retry logic
-    try {
-      clientManager.borrowClient(endPoint).flush(flushReq, handler);
-    } catch (Exception e) {
-      LOGGER.error("Failed to asking DataNode to flush: {}", endPoint, e);
-    }
-  }
-
-  /**
-   * Set TTL on specific DataNode
-   *
-   * @param endPoint The specific DataNode
-   */
-  public void setTTL(TEndPoint endPoint, TSetTTLReq setTTLReq, SetTTLHandler handler) {
-    // TODO: Add a retry logic
-    try {
-      clientManager.borrowClient(endPoint).setTTL(setTTLReq, handler);
-    } catch (IOException e) {
-      LOGGER.error("Can't connect to DataNode {}", endPoint, e);
-    } catch (TException e) {
-      LOGGER.error("Set TTL on DataNode {} failed", endPoint, e);
-    }
-  }
-
-  /**
-   * Update the RegionRouteMap cache on specific DataNode
-   *
-   * @param endPoint The specificDataNode
-   */
-  public void updateRegionRouteMap(
-      TEndPoint endPoint, TRegionRouteReq regionRouteReq, UpdateRegionRouteMapHandler handler) {
-    // TODO: Add a retry logic
-    try {
-      clientManager.borrowClient(endPoint).updateRegionCache(regionRouteReq, handler);
-    } catch (IOException e) {
-      LOGGER.error("Can't connect to DataNode {}", endPoint, e);
-    } catch (TException e) {
-      LOGGER.error("Update RegionRouteMap on DataNode {} failed", endPoint, e);
-    }
-  }
-
-  // TODO: Is the ClientPool must be a singleton?
-  private static class ClientPoolHolder {
-
-    private static final AsyncDataNodeClientPool INSTANCE = new AsyncDataNodeClientPool();
-
-    private ClientPoolHolder() {
-      // Empty constructor
-    }
-  }
-
-  public static AsyncDataNodeClientPool getInstance() {
-    return ClientPoolHolder.INSTANCE;
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
index c2592ceb56..d547759700 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
@@ -20,9 +20,9 @@
 package org.apache.iotdb.confignode.client;
 
 public enum ConfigNodeRequestType {
-  addConsensusGroup,
-  notifyRegisterSuccess,
-  registerConfigNode,
-  removeConfigNode,
-  stopConfigNode;
+  ADD_CONSENSUS_GROUP,
+  NOTIFY_REGISTER_SUCCESS,
+  REGISTER_CONFIG_NODE,
+  REMOVE_CONFIG_NODE,
+  STOP_CONFIG_NODE;
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index be28c12a04..f9a96122a4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -20,11 +20,18 @@
 package org.apache.iotdb.confignode.client;
 
 public enum DataNodeRequestType {
-  deleteRegions,
-  invalidatePartitionCache,
-  invalidatePermissionCache,
-  invalidateSchemaCache,
-  migrateRegion,
-  disableDataNode,
-  stopDataNode;
+  DELETE_REGIONS,
+  INVALIDATE_PARTITION_CACHE,
+  INVALIDATE_PERMISSION_CACHE,
+  INVALIDATE_SCHEMA_CACHE,
+  MIGRATE_REGION,
+  DISABLE_DATA_NODE,
+  STOP_DATA_NODE,
+
+  SET_TTL,
+  CREATE_REGIONS,
+  CREATE_FUNCTION,
+  DROP_FUNCTION,
+  FLUSH,
+  UPDATE_REGION_ROUTE_MAP
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncConfigNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/confignode/AsyncConfigNodeClientPool.java
similarity index 94%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncConfigNodeClientPool.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/confignode/AsyncConfigNodeClientPool.java
index 00e37493dd..620cdcc4f3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncConfigNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/confignode/AsyncConfigNodeClientPool.java
@@ -16,12 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.client;
+package org.apache.iotdb.confignode.client.async.confignode;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.async.AsyncConfigNodeIServiceClient;
-import org.apache.iotdb.confignode.client.handlers.ConfigNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.async.handlers.ConfigNodeHeartbeatHandler;
 import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 
 import org.slf4j.Logger;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
new file mode 100644
index 0000000000..7475ea3516
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.client.async.datanode;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TFlushReq;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
+import org.apache.iotdb.confignode.client.ConfigNodeClientPoolFactory;
+import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
+import org.apache.iotdb.confignode.client.async.handlers.CreateRegionHandler;
+import org.apache.iotdb.confignode.client.async.handlers.DataNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.async.handlers.FlushHandler;
+import org.apache.iotdb.confignode.client.async.handlers.FunctionManagementHandler;
+import org.apache.iotdb.confignode.client.async.handlers.SetTTLHandler;
+import org.apache.iotdb.confignode.client.async.handlers.UpdateRegionRouteMapHandler;
+import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionRequest;
+import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Asynchronously send RPC requests to DataNodes. See mpp.thrift for more details. */
+public class AsyncDataNodeClientPool {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(AsyncDataNodeClientPool.class);
+
+  private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> clientManager;
+
+  private final int retryNum = 6;
+
+  private AsyncDataNodeClientPool() {
+    clientManager =
+        new IClientManager.Factory<TEndPoint, AsyncDataNodeInternalServiceClient>()
+            .createClientManager(
+                new ConfigNodeClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory());
+  }
+
+  /**
+   * Send asynchronous requests to the specified DataNodes, and retry the DataNodes that failed to
+   * receive the requests
+   *
+   * @param req request
+   * @param handlerMap Map<index, Handler>
+   * @param dataNodeLocations ConcurrentHashMap<index, TDataNodeLocation> The specific DataNodes
+   */
+  public void sendAsyncRequestToDataNodeWithRetry(
+      Object req,
+      Map<Integer, AbstractRetryHandler> handlerMap,
+      Map<Integer, TDataNodeLocation> dataNodeLocations) {
+    CountDownLatch countDownLatch = new CountDownLatch(dataNodeLocations.size());
+    if (dataNodeLocations.isEmpty()) {
+      return;
+    }
+    for (int retry = 0; retry < retryNum; retry++) {
+      AbstractRetryHandler handler = null;
+      for (Map.Entry<Integer, TDataNodeLocation> entry : dataNodeLocations.entrySet()) {
+        handler = handlerMap.get(entry.getKey());
+        // If this is not the first attempt, this operation is a retry, so the handler
+        // must be given the countDownLatch that was re-created for this retry round
+        if (retry != 0) {
+          handler.setCountDownLatch(countDownLatch);
+        }
+        // send request
+        sendAsyncRequestToDataNode(entry.getValue(), req, handler, retry);
+      }
+      try {
+        handler.getCountDownLatch().await();
+      } catch (InterruptedException e) {
+        LOGGER.error("Interrupted during {} on ConfigNode", handler.getDataNodeRequestType());
+      }
+      // Check if there is a node that fails to send the request, and retry if there is one
+      if (!handler.getDataNodeLocations().isEmpty()) {
+        countDownLatch = new CountDownLatch(handler.getDataNodeLocations().size());
+      } else {
+        break;
+      }
+    }
+  }
+
+  public void sendAsyncRequestToDataNode(
+      TDataNodeLocation dataNodeLocation,
+      Object req,
+      AbstractRetryHandler handler,
+      int retryCount) {
+    AsyncDataNodeInternalServiceClient client;
+    try {
+      client = clientManager.borrowClient(dataNodeLocation.getInternalEndPoint());
+      switch (handler.getDataNodeRequestType()) {
+        case SET_TTL:
+          client.setTTL((TSetTTLReq) req, (SetTTLHandler) handler);
+          break;
+        case CREATE_REGIONS:
+          TConsensusGroupType regionType =
+              ((CreateRegionHandler) handler).getConsensusGroupId().getType();
+          if (regionType.equals(TConsensusGroupType.SchemaRegion)) {
+            client.createSchemaRegion(
+                (TCreateSchemaRegionReq) ((Map<Integer, Object>) req).get(handler.getIndex()),
+                (CreateRegionHandler) handler);
+          } else if (regionType.equals(TConsensusGroupType.DataRegion)) {
+            client.createDataRegion(
+                (TCreateDataRegionReq) ((Map<Integer, Object>) req).get(handler.getIndex()),
+                (CreateRegionHandler) handler);
+          }
+          break;
+        case CREATE_FUNCTION:
+          client.createFunction((TCreateFunctionRequest) req, (FunctionManagementHandler) handler);
+          break;
+        case DROP_FUNCTION:
+          client.dropFunction((TDropFunctionRequest) req, (FunctionManagementHandler) handler);
+          break;
+        case FLUSH:
+          client.flush((TFlushReq) req, (FlushHandler) handler);
+          break;
+        case UPDATE_REGION_ROUTE_MAP:
+          client.updateRegionCache((TRegionRouteReq) req, (UpdateRegionRouteMapHandler) handler);
+          break;
+        default:
+          return;
+      }
+    } catch (Exception e) {
+      LOGGER.warn(
+          "{} failed on ConfigNode {}, because {}, retrying {}...",
+          handler.getDataNodeRequestType(),
+          dataNodeLocation.getInternalEndPoint(),
+          e.getMessage(),
+          retryCount);
+    }
+  }
+
+  /**
+   * Execute CreateRegionsReq asynchronously
+   *
+   * @param createRegionGroupsPlan CreateRegionsReq
+   * @param ttlMap Map<StorageGroupName, TTL>
+   */
+  public void createRegions(
+      CreateRegionGroupsPlan createRegionGroupsPlan, Map<String, Long> ttlMap) {
+
+    // Number of regions to be created
+    int regionNum = 0;
+    // Count one Region per DataNode location in every RegionReplicaSet
+    for (Map.Entry<String, List<TRegionReplicaSet>> entry :
+        createRegionGroupsPlan.getRegionGroupMap().entrySet()) {
+      for (TRegionReplicaSet regionReplicaSet : entry.getValue()) {
+        regionNum += regionReplicaSet.getDataNodeLocationsSize();
+      }
+    }
+    Map<Integer, AbstractRetryHandler> handlerMap = new ConcurrentHashMap<>();
+    ConcurrentHashMap<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
+    Map<Integer, Object> req = new ConcurrentHashMap<>();
+    AtomicInteger index = new AtomicInteger();
+    CountDownLatch latch = new CountDownLatch(regionNum);
+    createRegionGroupsPlan
+        .getRegionGroupMap()
+        .forEach(
+            (storageGroup, regionReplicaSets) -> {
+              // Enumerate each RegionReplicaSet
+              regionReplicaSets.forEach(
+                  regionReplicaSet -> {
+                    // Enumerate each Region
+                    regionReplicaSet
+                        .getDataNodeLocations()
+                        .forEach(
+                            dataNodeLocation -> {
+                              handlerMap.put(
+                                  index.get(),
+                                  new CreateRegionHandler(
+                                      index.get(),
+                                      latch,
+                                      regionReplicaSet.getRegionId(),
+                                      dataNodeLocation,
+                                      dataNodeLocations));
+
+                              switch (regionReplicaSet.getRegionId().getType()) {
+                                case SchemaRegion:
+                                  req.put(
+                                      index.get(),
+                                      genCreateSchemaRegionReq(storageGroup, regionReplicaSet));
+                                  break;
+                                case DataRegion:
+                                  req.put(
+                                      index.get(),
+                                      genCreateDataRegionReq(
+                                          storageGroup,
+                                          regionReplicaSet,
+                                          ttlMap.get(storageGroup)));
+                              }
+                              dataNodeLocations.put(index.getAndIncrement(), dataNodeLocation);
+                            });
+                  });
+            });
+    sendAsyncRequestToDataNodeWithRetry(req, handlerMap, dataNodeLocations);
+  }
+
+  private TCreateSchemaRegionReq genCreateSchemaRegionReq(
+      String storageGroup, TRegionReplicaSet regionReplicaSet) {
+    TCreateSchemaRegionReq req = new TCreateSchemaRegionReq();
+    req.setStorageGroup(storageGroup);
+    req.setRegionReplicaSet(regionReplicaSet);
+    return req;
+  }
+
+  private TCreateDataRegionReq genCreateDataRegionReq(
+      String storageGroup, TRegionReplicaSet regionReplicaSet, long TTL) {
+    TCreateDataRegionReq req = new TCreateDataRegionReq();
+    req.setStorageGroup(storageGroup);
+    req.setRegionReplicaSet(regionReplicaSet);
+    req.setTtl(TTL);
+    return req;
+  }
+
+  /**
+   * Only used in LoadManager
+   *
+   * @param endPoint The specific DataNode
+   */
+  public void getDataNodeHeartBeat(
+      TEndPoint endPoint, THeartbeatReq req, DataNodeHeartbeatHandler handler) {
+    // TODO: Add a retry logic
+    AsyncDataNodeInternalServiceClient client;
+    try {
+      client = clientManager.borrowClient(endPoint);
+      client.getDataNodeHeartBeat(req, handler);
+    } catch (Exception e) {
+      LOGGER.error("Asking DataNode: {}, for heartbeat failed", endPoint, e);
+    }
+  }
+
+  /**
+   * Always call this interface when a DataNode is restarted or removed
+   *
+   * @param endPoint The specific DataNode
+   */
+  public void resetClient(TEndPoint endPoint) {
+    clientManager.clear(endPoint);
+  }
+
+  // TODO: Must the ClientPool be a singleton?
+  private static class ClientPoolHolder {
+
+    private static final AsyncDataNodeClientPool INSTANCE = new AsyncDataNodeClientPool();
+
+    private ClientPoolHolder() {
+      // Empty constructor
+    }
+  }
+
+  public static AsyncDataNodeClientPool getInstance() {
+    return ClientPoolHolder.INSTANCE;
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AbstractRetryHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AbstractRetryHandler.java
new file mode 100644
index 0000000000..518036dd19
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AbstractRetryHandler.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.client.async.handlers;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+public abstract class AbstractRetryHandler {
+
+  protected final int index; // key used for this handler's DataNode in dataNodeLocations
+
+  protected CountDownLatch countDownLatch; // subclasses count it down once per callback
+  /**
+   * {@code Map<Index, TDataNodeLocation>}: a DataNode that successfully executes the request is
+   * removed from this map
+   */
+  protected Map<Integer, TDataNodeLocation> dataNodeLocations;
+
+  protected DataNodeRequestType dataNodeRequestType;
+  /** Target DataNode */
+  protected TDataNodeLocation targetDataNode;
+
+  public AbstractRetryHandler(
+      CountDownLatch countDownLatch,
+      DataNodeRequestType dataNodeRequestType,
+      TDataNodeLocation targetDataNode,
+      Map<Integer, TDataNodeLocation> dataNodeLocations,
+      int index) {
+    this.countDownLatch = countDownLatch;
+    this.dataNodeLocations = dataNodeLocations;
+    this.dataNodeRequestType = dataNodeRequestType;
+    this.targetDataNode = targetDataNode;
+    this.index = index;
+  }
+
+  public void setCountDownLatch(CountDownLatch countDownLatch) {
+    this.countDownLatch = countDownLatch;
+  }
+
+  public CountDownLatch getCountDownLatch() {
+    return countDownLatch;
+  }
+
+  public Map<Integer, TDataNodeLocation> getDataNodeLocations() {
+    return dataNodeLocations;
+  }
+
+  public DataNodeRequestType getDataNodeRequestType() {
+    return dataNodeRequestType;
+  }
+
+  public int getIndex() {
+    return index;
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/ConfigNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/ConfigNodeHeartbeatHandler.java
similarity index 97%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/ConfigNodeHeartbeatHandler.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/ConfigNodeHeartbeatHandler.java
index d177a13cad..7eca734b83 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/ConfigNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/ConfigNodeHeartbeatHandler.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.client.handlers;
+package org.apache.iotdb.confignode.client.async.handlers;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.confignode.manager.load.heartbeat.ConfigNodeHeartbeatCache;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/CreateRegionHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/CreateRegionHandler.java
similarity index 75%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/CreateRegionHandler.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/CreateRegionHandler.java
index fbcca3e121..c1828b64ab 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/CreateRegionHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/CreateRegionHandler.java
@@ -16,69 +16,58 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.client.handlers;
+package org.apache.iotdb.confignode.client.async.handlers;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.BitSet;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
 /** Only use CreateRegionHandler when the LoadManager wants to create Regions */
-public class CreateRegionHandler implements AsyncMethodCallback<TSStatus> {
+public class CreateRegionHandler extends AbstractRetryHandler
+    implements AsyncMethodCallback<TSStatus> {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(CreateRegionHandler.class);
 
-  // Mark BitSet when successfully create
-  private final int index;
-  private final BitSet bitSet;
-
-  // Used to protect asynchronous creation
-  private final CountDownLatch latch;
-
   // Used for Logger
   private final TConsensusGroupId consensusGroupId;
-  private final TDataNodeLocation dataNodeLocation;
 
   public CreateRegionHandler(
       int index,
-      BitSet bitSet,
       CountDownLatch latch,
       TConsensusGroupId consensusGroupId,
-      TDataNodeLocation dataNodeLocation) {
-    this.index = index;
-    this.bitSet = bitSet;
-    this.latch = latch;
+      TDataNodeLocation targetDataNode,
+      Map<Integer, TDataNodeLocation> dataNodeLocations) {
+    super(latch, DataNodeRequestType.CREATE_REGIONS, targetDataNode, dataNodeLocations, index);
     this.consensusGroupId = consensusGroupId;
-    this.dataNodeLocation = dataNodeLocation;
   }
 
   @Override
   public void onComplete(TSStatus tsStatus) {
     if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      synchronized (bitSet) {
-        bitSet.set(index);
-      }
+      getDataNodeLocations().remove(index);
       LOGGER.info(
           String.format(
               "Successfully create %s on DataNode: %s",
-              ConsensusGroupId.formatTConsensusGroupId(consensusGroupId), dataNodeLocation));
+              ConsensusGroupId.formatTConsensusGroupId(consensusGroupId), targetDataNode));
     } else {
       LOGGER.error(
           String.format(
               "Create %s on DataNode: %s failed, %s",
               ConsensusGroupId.formatTConsensusGroupId(consensusGroupId),
-              dataNodeLocation,
+              targetDataNode,
               tsStatus));
     }
-    latch.countDown();
+    countDownLatch.countDown();
   }
 
   @Override
@@ -86,7 +75,11 @@ public class CreateRegionHandler implements AsyncMethodCallback<TSStatus> {
     LOGGER.error(
         String.format(
             "Create %s on DataNode: %s failed, %s",
-            ConsensusGroupId.formatTConsensusGroupId(consensusGroupId), dataNodeLocation, e));
-    latch.countDown();
+            ConsensusGroupId.formatTConsensusGroupId(consensusGroupId), targetDataNode, e));
+    countDownLatch.countDown();
+  }
+
+  public TConsensusGroupId getConsensusGroupId() {
+    return consensusGroupId;
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
similarity index 98%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
index d6c02d3712..a148c1cc14 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.client.handlers;
+package org.apache.iotdb.confignode.client.async.handlers;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/FlushHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FlushHandler.java
similarity index 61%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/FlushHandler.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FlushHandler.java
index e45ec58525..f72cf7ee74 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/FlushHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FlushHandler.java
@@ -16,37 +16,49 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.client.handlers;
+package org.apache.iotdb.confignode.client.async.handlers;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
-public class FlushHandler implements AsyncMethodCallback<TSStatus> {
+public class FlushHandler extends AbstractRetryHandler implements AsyncMethodCallback<TSStatus> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(FlushHandler.class);
 
-  private final TDataNodeLocation dataNodeLocation;
-  private final CountDownLatch countDownLatch;
   private final List<TSStatus> dataNodeResponseStatus;
 
   public FlushHandler(
-      TDataNodeLocation dataNodeLocation,
+      TDataNodeLocation targetDataNode,
       CountDownLatch countDownLatch,
-      List<TSStatus> dataNodeResponseStatus) {
-    this.dataNodeLocation = dataNodeLocation;
-    this.countDownLatch = countDownLatch;
+      DataNodeRequestType requestType,
+      List<TSStatus> dataNodeResponseStatus,
+      Map<Integer, TDataNodeLocation> dataNodeLocations,
+      int index) {
+    super(countDownLatch, requestType, targetDataNode, dataNodeLocations, index);
     this.dataNodeResponseStatus = dataNodeResponseStatus;
   }
 
   @Override
   public void onComplete(TSStatus response) {
+    if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      dataNodeResponseStatus.add(response);
+      dataNodeLocations.remove(index); // succeeded: remove this DataNode from the map
+      LOGGER.info("Successfully Flush on DataNode: {}", targetDataNode);
+    } else {
+      LOGGER.error("Failed to Flush on DataNode {}, {}", targetDataNode, response);
+    }
     countDownLatch.countDown();
-    dataNodeResponseStatus.add(response);
   }
 
   @Override
@@ -57,9 +69,9 @@ public class FlushHandler implements AsyncMethodCallback<TSStatus> {
             RpcUtils.getStatus(
                 TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
                 "Flush error on DataNode: {id="
-                    + dataNodeLocation.getDataNodeId()
+                    + targetDataNode.getDataNodeId()
                     + ", internalEndPoint="
-                    + dataNodeLocation.getInternalEndPoint()
+                    + targetDataNode.getInternalEndPoint()
                     + "}"
                     + exception.getMessage())));
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/FunctionManagementHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FunctionManagementHandler.java
similarity index 53%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/FunctionManagementHandler.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FunctionManagementHandler.java
index 90a4967e12..306cea3801 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/FunctionManagementHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FunctionManagementHandler.java
@@ -17,34 +17,48 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.client.handlers;
+package org.apache.iotdb.confignode.client.async.handlers;
 
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
-public class FunctionManagementHandler implements AsyncMethodCallback<TSStatus> {
+public class FunctionManagementHandler extends AbstractRetryHandler
+    implements AsyncMethodCallback<TSStatus> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(FunctionManagementHandler.class);
 
-  private final CountDownLatch countDownLatch;
   private final List<TSStatus> dataNodeResponseStatus;
-  private final String ip;
-  private final int port;
 
   public FunctionManagementHandler(
-      CountDownLatch countDownLatch, List<TSStatus> dataNodeResponseStatus, String ip, int port) {
-    this.countDownLatch = countDownLatch;
+      CountDownLatch countDownLatch,
+      TDataNodeLocation targetDataNode,
+      List<TSStatus> dataNodeResponseStatus,
+      DataNodeRequestType requestType,
+      Map<Integer, TDataNodeLocation> dataNodeLocations,
+      int index) {
+    super(countDownLatch, requestType, targetDataNode, dataNodeLocations, index);
     this.dataNodeResponseStatus = dataNodeResponseStatus;
-    this.ip = ip;
-    this.port = port;
   }
 
   @Override
   public void onComplete(TSStatus response) {
-    dataNodeResponseStatus.add(response);
+    if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      dataNodeResponseStatus.add(response);
+      dataNodeLocations.remove(index); // succeeded: remove this DataNode from the map
+      LOGGER.info("Successfully {} on DataNode: {}", dataNodeRequestType, targetDataNode);
+    } else {
+      LOGGER.error("Failed to {} on DataNode: {}", dataNodeRequestType, targetDataNode);
+    }
     countDownLatch.countDown();
   }
 
@@ -52,7 +66,8 @@ public class FunctionManagementHandler implements AsyncMethodCallback<TSStatus>
   public void onError(Exception exception) {
     dataNodeResponseStatus.add(
         new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
-            .setMessage("DataNode[" + ip + ":" + port + "] " + exception.getMessage()));
+            .setMessage(targetDataNode + " " + exception.getMessage()));
+    LOGGER.error("Failed to {} on DataNode: {}", dataNodeRequestType, targetDataNode, exception);
     countDownLatch.countDown();
   }
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/SetTTLHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetTTLHandler.java
similarity index 56%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/SetTTLHandler.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetTTLHandler.java
index c31efe1030..e4fb7d79e6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/SetTTLHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetTTLHandler.java
@@ -16,43 +16,47 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.client.handlers;
+package org.apache.iotdb.confignode.client.async.handlers;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
-public class SetTTLHandler implements AsyncMethodCallback<TSStatus> {
+public class SetTTLHandler extends AbstractRetryHandler implements AsyncMethodCallback<TSStatus> {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SetTTLHandler.class);
 
-  private final TDataNodeLocation dataNodeLocation;
-  private final CountDownLatch latch;
-
-  public SetTTLHandler(TDataNodeLocation dataNodeLocation, CountDownLatch latch) {
-    this.dataNodeLocation = dataNodeLocation;
-    this.latch = latch;
+  public SetTTLHandler(
+      CountDownLatch countDownLatch,
+      DataNodeRequestType requestType,
+      TDataNodeLocation targetDataNode,
+      Map<Integer, TDataNodeLocation> dataNodeLocations,
+      int index) {
+    super(countDownLatch, requestType, targetDataNode, dataNodeLocations, index);
   }
 
   @Override
-  public void onComplete(TSStatus status) {
-    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      LOGGER.info("Successfully SetTTL on DataNode: {}", dataNodeLocation);
+  public void onComplete(TSStatus response) {
+    if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      getDataNodeLocations().remove(index);
+      LOGGER.info("Successfully SetTTL on DataNode: {}", targetDataNode);
     } else {
-      LOGGER.error("Failed to SetTTL on DataNode: {}, {}", dataNodeLocation, status);
+      LOGGER.error("Failed to SetTTL on DataNode: {}, {}", targetDataNode, response);
     }
-    latch.countDown();
+    countDownLatch.countDown();
   }
 
   @Override
   public void onError(Exception e) {
-    latch.countDown();
-    LOGGER.error("Failed to SetTTL on DataNode: {}", dataNodeLocation);
+    countDownLatch.countDown();
+    LOGGER.error("Failed to SetTTL on DataNode: {}", targetDataNode, e); // keep the cause
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/UpdateRegionRouteMapHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java
similarity index 68%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/UpdateRegionRouteMapHandler.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java
index aa97397943..5448f5f2e1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/UpdateRegionRouteMapHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java
@@ -16,43 +16,48 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.client.handlers;
+package org.apache.iotdb.confignode.client.async.handlers;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
-public class UpdateRegionRouteMapHandler implements AsyncMethodCallback<TSStatus> {
+public class UpdateRegionRouteMapHandler extends AbstractRetryHandler
+    implements AsyncMethodCallback<TSStatus> {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(UpdateRegionRouteMapHandler.class);
 
-  private final TDataNodeLocation dataNodeLocation;
-  private final CountDownLatch latch;
-
-  public UpdateRegionRouteMapHandler(TDataNodeLocation dataNodeLocation, CountDownLatch latch) {
-    this.dataNodeLocation = dataNodeLocation;
-    this.latch = latch;
+  public UpdateRegionRouteMapHandler(
+      TDataNodeLocation targetDataNode,
+      CountDownLatch countDownLatch,
+      DataNodeRequestType requestType,
+      Map<Integer, TDataNodeLocation> dataNodeLocations,
+      int index) {
+    super(countDownLatch, requestType, targetDataNode, dataNodeLocations, index);
   }
 
   @Override
   public void onComplete(TSStatus status) {
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      LOGGER.info("Successfully update the RegionRouteMap on DataNode: {}", dataNodeLocation);
+      getDataNodeLocations().remove(index); // the map is keyed by index, not by location
+      LOGGER.info("Successfully update the RegionRouteMap on DataNode: {}", targetDataNode);
     } else {
-      LOGGER.error("Update RegionRouteMap on DataNode: {} failed", dataNodeLocation);
+      LOGGER.error("Update RegionRouteMap on DataNode: {} failed", targetDataNode);
     }
-    latch.countDown();
+    countDownLatch.countDown();
   }
 
   @Override
   public void onError(Exception e) {
-    LOGGER.error("Update RegionRouteMap on DataNode: {} failed", dataNodeLocation);
-    latch.countDown();
+    LOGGER.error("Update RegionRouteMap on DataNode: {} failed", targetDataNode, e);
+    countDownLatch.countDown();
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/confignode/SyncConfigNodeClientPool.java
similarity index 94%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/sync/confignode/SyncConfigNodeClientPool.java
index 514cd1eab1..2e607a7de0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/confignode/SyncConfigNodeClientPool.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.client;
+package org.apache.iotdb.confignode.client.sync.confignode;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
@@ -65,24 +66,24 @@ public class SyncConfigNodeClientPool {
     }
   }
 
-  public Object sendSyncRequestToConfigNode(
+  public Object sendSyncRequestToConfigNodeWithRetry(
       TEndPoint endPoint, Object req, ConfigNodeRequestType requestType) {
     Throwable lastException = null;
     for (int retry = 0; retry < retryNum; retry++) {
       try (SyncConfigNodeIServiceClient client = clientManager.borrowClient(endPoint)) {
         switch (requestType) {
-          case registerConfigNode:
+          case REGISTER_CONFIG_NODE:
             // Only use registerConfigNode when the ConfigNode is first startup.
             return client.registerConfigNode((TConfigNodeRegisterReq) req);
-          case addConsensusGroup:
+          case ADD_CONSENSUS_GROUP:
             addConsensusGroup((List<TConfigNodeLocation>) req, client);
             return null;
-          case notifyRegisterSuccess:
+          case NOTIFY_REGISTER_SUCCESS:
             client.notifyRegisterSuccess();
             return null;
-          case removeConfigNode:
+          case REMOVE_CONFIG_NODE:
             return removeConfigNode((TConfigNodeLocation) req, client);
-          case stopConfigNode:
+          case STOP_CONFIG_NODE:
             // Only use stopConfigNode when the ConfigNode is removed.
             return client.stopConfigNode((TConfigNodeLocation) req);
           default:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
similarity index 93%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/SyncDataNodeClientPool.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
index 668cbc248c..3610d2403b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.client;
+package org.apache.iotdb.confignode.client.sync.datanode;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -25,6 +25,8 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.confignode.client.ConfigNodeClientPoolFactory;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
 import org.apache.iotdb.mpp.rpc.thrift.TAddConsensusGroup;
 import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
@@ -62,25 +64,25 @@ public class SyncDataNodeClientPool {
                 new ConfigNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
   }
 
-  public TSStatus sendSyncRequestToDataNode(
+  public TSStatus sendSyncRequestToDataNodeWithRetry(
       TEndPoint endPoint, Object req, DataNodeRequestType requestType) {
     Throwable lastException = null;
     for (int retry = 0; retry < retryNum; retry++) {
       try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(endPoint)) {
         switch (requestType) {
-          case invalidatePartitionCache:
+          case INVALIDATE_PARTITION_CACHE:
             return client.invalidatePartitionCache((TInvalidateCacheReq) req);
-          case invalidateSchemaCache:
+          case INVALIDATE_SCHEMA_CACHE:
             return client.invalidateSchemaCache((TInvalidateCacheReq) req);
-          case deleteRegions:
+          case DELETE_REGIONS:
             return client.deleteRegion((TConsensusGroupId) req);
-          case invalidatePermissionCache:
+          case INVALIDATE_PERMISSION_CACHE:
             return client.invalidatePermissionCache((TInvalidatePermissionCacheReq) req);
-          case migrateRegion:
+          case MIGRATE_REGION:
             return client.migrateRegion((TMigrateRegionReq) req);
-          case disableDataNode:
+          case DISABLE_DATA_NODE:
             return client.disableDataNode((TDisableDataNodeReq) req);
-          case stopDataNode:
+          case STOP_DATA_NODE:
             return client.stopDataNode();
           default:
             return RpcUtils.getStatus(
@@ -125,7 +127,8 @@ public class SyncDataNodeClientPool {
     for (TConsensusGroupId regionId : regionIds) {
       LOGGER.debug("Delete region {} ", regionId);
       final TSStatus status =
-          sendSyncRequestToDataNode(endPoint, regionId, DataNodeRequestType.deleteRegions);
+          sendSyncRequestToDataNodeWithRetry(
+              endPoint, regionId, DataNodeRequestType.DELETE_REGIONS);
       if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         LOGGER.info("DELETE Region {} successfully", regionId);
         deletedRegionSet.removeIf(k -> k.getRegionId().equals(regionId));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java
index cc53114cb1..894e712374 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.BadNodeUrlException;
 import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
-import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.confignode.SyncConfigNodeClientPool;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -77,10 +77,10 @@ public class ConfigNodeRemoveCheck {
       status =
           (TSStatus)
               SyncConfigNodeClientPool.getInstance()
-                  .sendSyncRequestToConfigNode(
+                  .sendSyncRequestToConfigNodeWithRetry(
                       configNodeLocation.getInternalEndPoint(),
                       nodeLocation,
-                      ConfigNodeRequestType.removeConfigNode);
+                      ConfigNodeRequestType.REMOVE_CONFIG_NODE);
       if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         break;
       }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index 4c2302d3a7..ab88c4add0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -24,8 +24,10 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.handlers.SetTTLHandler;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
+import org.apache.iotdb.confignode.client.async.handlers.SetTTLHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.read.CountStorageGroupPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetNodesInSchemaTemplatePlan;
@@ -61,10 +63,13 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /** The ClusterSchemaManager Manages cluster schema read and write requests. */
 public class ClusterSchemaManager {
@@ -163,23 +168,27 @@ public class ClusterSchemaManager {
             .getStorageGroupRelatedDataNodes(
                 setTTLPlan.getStorageGroup(), TConsensusGroupType.DataRegion);
     if (dataNodeLocations.size() > 0) {
+      CountDownLatch countDownLatch = new CountDownLatch(dataNodeLocations.size());
+      Map<Integer, AbstractRetryHandler> handler = new HashMap<>();
+      Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
+      AtomicInteger index = new AtomicInteger();
       // TODO: Use procedure to protect SetTTL on DataNodes
-      CountDownLatch latch = new CountDownLatch(dataNodeLocations.size());
       for (TDataNodeLocation dataNodeLocation : dataNodeLocations) {
-        SetTTLHandler handler = new SetTTLHandler(dataNodeLocation, latch);
-        AsyncDataNodeClientPool.getInstance()
-            .setTTL(
-                dataNodeLocation.getInternalEndPoint(),
-                new TSetTTLReq(setTTLPlan.getStorageGroup(), setTTLPlan.getTTL()),
-                handler);
-      }
-
-      try {
-        // Waiting until this batch of SetTTL requests done
-        latch.await();
-      } catch (InterruptedException e) {
-        LOGGER.error("ClusterSchemaManager was interrupted during SetTTL on DataNodes", e);
+        handler.put(
+            index.get(),
+            new SetTTLHandler(
+                countDownLatch,
+                DataNodeRequestType.SET_TTL,
+                dataNodeLocation,
+                dataNodeLocationMap,
+                index.get()));
+        dataNodeLocationMap.put(index.getAndIncrement(), dataNodeLocation);
       }
+      AsyncDataNodeClientPool.getInstance()
+          .sendAsyncRequestToDataNodeWithRetry(
+              new TSetTTLReq(setTTLPlan.getStorageGroup(), setTTLPlan.getTTL()),
+              handler,
+              dataNodeLocationMap);
     }
 
     return getConsensusManager().write(setTTLPlan).getStatus();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeRemoveManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeRemoveManager.java
index d2756389d9..010beee521 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeRemoveManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeRemoveManager.java
@@ -28,9 +28,9 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.enums.DataNodeRemoveState;
 import org.apache.iotdb.commons.enums.RegionMigrateState;
-import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
 import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.UpdateRegionLocationPlan;
 import org.apache.iotdb.confignode.consensus.response.DataNodeToStatusResp;
@@ -364,7 +364,8 @@ public class DataNodeRemoveManager {
       TDisableDataNodeReq disableReq = new TDisableDataNodeReq(disabledDataNode);
       status =
           SyncDataNodeClientPool.getInstance()
-              .sendSyncRequestToDataNode(server, disableReq, DataNodeRequestType.disableDataNode);
+              .sendSyncRequestToDataNodeWithRetry(
+                  server, disableReq, DataNodeRequestType.DISABLE_DATA_NODE);
       if (!isSucceed(status)) {
         return status;
       }
@@ -505,8 +506,8 @@ public class DataNodeRemoveManager {
     migrateRegionReq.setNewLeaderNode(newLeaderNode.get());
     status =
         SyncDataNodeClientPool.getInstance()
-            .sendSyncRequestToDataNode(
-                node.getInternalEndPoint(), migrateRegionReq, DataNodeRequestType.migrateRegion);
+            .sendSyncRequestToDataNodeWithRetry(
+                node.getInternalEndPoint(), migrateRegionReq, DataNodeRequestType.MIGRATE_REGION);
     // maybe send rpc failed
     if (isFailed(status)) {
       return status;
@@ -648,8 +649,8 @@ public class DataNodeRemoveManager {
     AsyncDataNodeClientPool.getInstance().resetClient(dataNode.getInternalEndPoint());
     TSStatus status =
         SyncDataNodeClientPool.getInstance()
-            .sendSyncRequestToDataNode(
-                dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.stopDataNode);
+            .sendSyncRequestToDataNodeWithRetry(
+                dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.STOP_DATA_NODE);
     LOGGER.info("stop Data Node {} result: {}", dataNode, status);
     return status;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index 4e01617060..1e726ca8a4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -25,8 +25,10 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodesInfo;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
-import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.handlers.FlushHandler;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
+import org.apache.iotdb.confignode.client.async.handlers.FlushHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeInfoPlan;
@@ -53,10 +55,14 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
 /** NodeManager manages cluster node addition and removal requests */
@@ -452,19 +458,23 @@ public class NodeManager {
     List<TSStatus> dataNodeResponseStatus =
         Collections.synchronizedList(new ArrayList<>(registeredDataNodes.size()));
     CountDownLatch countDownLatch = new CountDownLatch(registeredDataNodes.size());
+    Map<Integer, AbstractRetryHandler> handlerMap = new HashMap<>();
+    Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
+    AtomicInteger index = new AtomicInteger();
     for (TDataNodeInfo dataNodeInfo : registeredDataNodes) {
-      AsyncDataNodeClientPool.getInstance()
-          .flush(
-              dataNodeInfo.getLocation().getInternalEndPoint(),
-              req,
-              new FlushHandler(dataNodeInfo.getLocation(), countDownLatch, dataNodeResponseStatus));
-    }
-    try {
-      countDownLatch.await();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOGGER.error("NodeManager was interrupted during flushing on data nodes", e);
+      handlerMap.put(
+          index.get(),
+          new FlushHandler(
+              dataNodeInfo.getLocation(),
+              countDownLatch,
+              DataNodeRequestType.FLUSH,
+              dataNodeResponseStatus,
+              dataNodeLocations,
+              index.get()));
+      dataNodeLocations.put(index.getAndIncrement(), dataNodeInfo.getLocation());
     }
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetry(req, handlerMap, dataNodeLocations);
     return dataNodeResponseStatus;
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index 428a9fa5f4..bd95b0e155 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
 import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
-import org.apache.iotdb.confignode.client.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.read.GetDataPartitionPlan;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
index d6d5da8bc1..0fdbae755b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.confignode.manager;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
 import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
 import org.apache.iotdb.confignode.consensus.response.PermissionInfoResp;
@@ -107,10 +107,10 @@ public class PermissionManager {
     for (TDataNodeInfo dataNodeInfo : allDataNodes) {
       status =
           SyncDataNodeClientPool.getInstance()
-              .sendSyncRequestToDataNode(
+              .sendSyncRequestToDataNodeWithRetry(
                   dataNodeInfo.getLocation().getInternalEndPoint(),
                   req,
-                  DataNodeRequestType.invalidatePermissionCache);
+                  DataNodeRequestType.INVALIDATE_PERMISSION_CACHE);
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
index 22b8f112d8..68daf6461a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
@@ -20,10 +20,12 @@
 package org.apache.iotdb.confignode.manager;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.handlers.FunctionManagementHandler;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
+import org.apache.iotdb.confignode.client.async.handlers.FunctionManagementHandler;
 import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionPlan;
 import org.apache.iotdb.confignode.consensus.request.write.DropFunctionPlan;
 import org.apache.iotdb.confignode.persistence.UDFInfo;
@@ -37,8 +39,12 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class UDFManager {
 
@@ -87,24 +93,23 @@ public class UDFManager {
     final CountDownLatch countDownLatch = new CountDownLatch(registeredDataNodes.size());
     final TCreateFunctionRequest request =
         new TCreateFunctionRequest(functionName, className, uris);
-
+    Map<Integer, AbstractRetryHandler> handlerMap = new HashMap<>();
+    Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
+    AtomicInteger index = new AtomicInteger(0);
     for (TDataNodeInfo dataNodeInfo : registeredDataNodes) {
-      final TEndPoint endPoint = dataNodeInfo.getLocation().getInternalEndPoint();
-      AsyncDataNodeClientPool.getInstance()
-          .createFunction(
-              endPoint,
-              request,
-              new FunctionManagementHandler(
-                  countDownLatch, dataNodeResponseStatus, endPoint.getIp(), endPoint.getPort()));
+      handlerMap.put(
+          index.get(),
+          new FunctionManagementHandler(
+              countDownLatch,
+              dataNodeInfo.getLocation(),
+              dataNodeResponseStatus,
+              DataNodeRequestType.CREATE_FUNCTION,
+              dataNodeLocations,
+              index.get()));
+      dataNodeLocations.put(index.getAndIncrement(), dataNodeInfo.getLocation());
     }
-
-    try {
-      countDownLatch.await();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOGGER.error("UDFManager was interrupted during creating functions on data nodes", e);
-    }
-
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetry(request, handlerMap, dataNodeLocations);
     return dataNodeResponseStatus;
   }
 
@@ -131,24 +136,23 @@ public class UDFManager {
         Collections.synchronizedList(new ArrayList<>(registeredDataNodes.size()));
     final CountDownLatch countDownLatch = new CountDownLatch(registeredDataNodes.size());
     final TDropFunctionRequest request = new TDropFunctionRequest(functionName);
-
+    Map<Integer, AbstractRetryHandler> handlerMap = new HashMap<>();
+    Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
+    AtomicInteger index = new AtomicInteger(0);
     for (TDataNodeInfo dataNodeInfo : registeredDataNodes) {
-      final TEndPoint endPoint = dataNodeInfo.getLocation().getInternalEndPoint();
-      AsyncDataNodeClientPool.getInstance()
-          .dropFunction(
-              endPoint,
-              request,
-              new FunctionManagementHandler(
-                  countDownLatch, dataNodeResponseStatus, endPoint.getIp(), endPoint.getPort()));
+      handlerMap.put(
+          index.get(),
+          new FunctionManagementHandler(
+              countDownLatch,
+              dataNodeInfo.getLocation(),
+              dataNodeResponseStatus,
+              DataNodeRequestType.DROP_FUNCTION,
+              dataNodeLocations,
+              index.get()));
+      dataNodeLocations.put(index.getAndIncrement(), dataNodeInfo.getLocation());
     }
-
-    try {
-      countDownLatch.await();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOGGER.error("UDFManager was interrupted during dropping functions on data nodes", e);
-    }
-
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetry(request, handlerMap, dataNodeLocations);
     return dataNodeResponseStatus;
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 348744ff8c..6beed810d3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
@@ -31,11 +32,13 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
-import org.apache.iotdb.confignode.client.AsyncConfigNodeClientPool;
-import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.handlers.ConfigNodeHeartbeatHandler;
-import org.apache.iotdb.confignode.client.handlers.DataNodeHeartbeatHandler;
-import org.apache.iotdb.confignode.client.handlers.UpdateRegionRouteMapHandler;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.confignode.AsyncConfigNodeClientPool;
+import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
+import org.apache.iotdb.confignode.client.async.handlers.ConfigNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.async.handlers.DataNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.async.handlers.UpdateRegionRouteMapHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
@@ -300,20 +303,26 @@ public class LoadManager {
     CountDownLatch latch = new CountDownLatch(onlineDataNodes.size());
 
     LOGGER.info("Begin to broadcast RegionRouteMap: {}", latestRegionRouteMap);
-
+    Map<Integer, AbstractRetryHandler> handlerMap = new HashMap<>();
+    Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
+    AtomicInteger index = new AtomicInteger();
     onlineDataNodes.forEach(
-        dataNodeInfo ->
-            AsyncDataNodeClientPool.getInstance()
-                .updateRegionRouteMap(
-                    dataNodeInfo.getLocation().getInternalEndPoint(),
-                    new TRegionRouteReq(System.currentTimeMillis(), latestRegionRouteMap),
-                    new UpdateRegionRouteMapHandler(dataNodeInfo.getLocation(), latch)));
-
-    try {
-      latch.await();
-    } catch (InterruptedException e) {
-      LOGGER.warn("Broadcast the latest RegionRouteMap was interrupted!");
-    }
+        dataNodeInfo -> {
+          handlerMap.put(
+              index.get(),
+              new UpdateRegionRouteMapHandler(
+                  dataNodeInfo.getLocation(),
+                  latch,
+                  DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
+                  dataNodeLocations,
+                  index.get()));
+          dataNodeLocations.put(index.getAndIncrement(), dataNodeInfo.getLocation());
+        });
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetry(
+            new TRegionRouteReq(System.currentTimeMillis(), latestRegionRouteMap),
+            handlerMap,
+            dataNodeLocations);
     LOGGER.info("Broadcast the latest RegionRouteMap finished.");
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 1a6986813f..631a31cf03 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -24,8 +24,8 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
-import org.apache.iotdb.confignode.client.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.confignode.SyncConfigNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
 import org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupPlan;
 import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupPlan;
 import org.apache.iotdb.confignode.exception.AddPeerException;
@@ -115,16 +115,16 @@ public class ConfigNodeProcedureEnv {
     for (TDataNodeInfo dataNodeInfo : allDataNodes) {
       final TSStatus invalidateSchemaStatus =
           SyncDataNodeClientPool.getInstance()
-              .sendSyncRequestToDataNode(
+              .sendSyncRequestToDataNodeWithRetry(
                   dataNodeInfo.getLocation().getInternalEndPoint(),
                   invalidateCacheReq,
-                  DataNodeRequestType.invalidateSchemaCache);
+                  DataNodeRequestType.INVALIDATE_SCHEMA_CACHE);
       final TSStatus invalidatePartitionStatus =
           SyncDataNodeClientPool.getInstance()
-              .sendSyncRequestToDataNode(
+              .sendSyncRequestToDataNodeWithRetry(
                   dataNodeInfo.getLocation().getInternalEndPoint(),
                   invalidateCacheReq,
-                  DataNodeRequestType.invalidatePartitionCache);
+                  DataNodeRequestType.INVALIDATE_PARTITION_CACHE);
       if (!verifySucceed(invalidatePartitionStatus, invalidateSchemaStatus)) {
         LOG.error(
             "Invalidate cache failed, invalidate partition cache status is {}, invalidate schema cache status is {}",
@@ -151,10 +151,10 @@ public class ConfigNodeProcedureEnv {
         new ArrayList<>(configManager.getNodeManager().getRegisteredConfigNodes());
     configNodeLocations.add(tConfigNodeLocation);
     SyncConfigNodeClientPool.getInstance()
-        .sendSyncRequestToConfigNode(
+        .sendSyncRequestToConfigNodeWithRetry(
             tConfigNodeLocation.getInternalEndPoint(),
             configNodeLocations,
-            ConfigNodeRequestType.addConsensusGroup);
+            ConfigNodeRequestType.ADD_CONSENSUS_GROUP);
   }
 
   /**
@@ -183,10 +183,10 @@ public class ConfigNodeProcedureEnv {
    */
   public void notifyRegisterSuccess(TConfigNodeLocation configNodeLocation) {
     SyncConfigNodeClientPool.getInstance()
-        .sendSyncRequestToConfigNode(
+        .sendSyncRequestToConfigNodeWithRetry(
             configNodeLocation.getInternalEndPoint(),
             null,
-            ConfigNodeRequestType.notifyRegisterSuccess);
+            ConfigNodeRequestType.NOTIFY_REGISTER_SUCCESS);
   }
 
   public ReentrantLock getAddConfigNodeLock() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index 2b39975316..3aa8035c17 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
 import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
-import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.confignode.SyncConfigNodeClientPool;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
@@ -192,8 +192,8 @@ public class ConfigNode implements ConfigNodeMBean {
       TConfigNodeRegisterResp resp =
           (TConfigNodeRegisterResp)
               SyncConfigNodeClientPool.getInstance()
-                  .sendSyncRequestToConfigNode(
-                      targetConfigNode, req, ConfigNodeRequestType.registerConfigNode);
+                  .sendSyncRequestToConfigNodeWithRetry(
+                      targetConfigNode, req, ConfigNodeRequestType.REGISTER_CONFIG_NODE);
       if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         conf.setPartitionRegionId(resp.getPartitionRegionId().getId());
         break;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 118c98a2ec..298c287d7f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
-import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.confignode.SyncConfigNodeClientPool;
 import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
@@ -461,10 +461,10 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
       status =
           (TSStatus)
               SyncConfigNodeClientPool.getInstance()
-                  .sendSyncRequestToConfigNode(
+                  .sendSyncRequestToConfigNodeWithRetry(
                       configNodeLocation.getInternalEndPoint(),
                       configNodeLocation,
-                      ConfigNodeRequestType.stopConfigNode);
+                      ConfigNodeRequestType.STOP_CONFIG_NODE);
     }
 
     // Print log to record the ConfigNode that performs the RemoveConfigNodeRequest