You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/10/11 02:12:54 UTC

[iotdb] branch master updated: [IOTDB-4396] Refactor AbstractRetryHandler in ConfigNode (#7534)

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e8bbc70c5b [IOTDB-4396] Refactor AbstractRetryHandler in ConfigNode (#7534)
e8bbc70c5b is described below

commit e8bbc70c5be333284c41110f88d8466b220a7e59
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Tue Oct 11 10:12:48 2022 +0800

    [IOTDB-4396] Refactor AbstractRetryHandler in ConfigNode (#7534)
---
 .../AsyncConfigNodeHeartbeatClientPool.java        |   4 +-
 .../client/async/AsyncDataNodeClientPool.java      | 284 +++++++++++
 .../AsyncDataNodeHeartbeatClientPool.java          |   4 +-
 .../async/datanode/AsyncDataNodeClientPool.java    | 545 ---------------------
 .../async/handlers/AbstractRetryHandler.java       |  75 ---
 .../client/async/handlers/AsyncClientHandler.java  | 201 ++++++++
 .../client/async/handlers/ClearCacheHandler.java   |  81 ---
 .../handlers/ConstructSchemaBlackListHandler.java  |  95 ----
 .../client/async/handlers/CreateRegionHandler.java |  91 ----
 .../DeleteDataForDeleteTimeSeriesHandler.java      | 103 ----
 .../async/handlers/DeleteTimeSeriesHandler.java    |  93 ----
 .../async/handlers/FunctionManagementHandler.java  |  72 ---
 .../InvalidateMatchedSchemaCacheHandler.java       |  95 ----
 .../async/handlers/LoadConfigurationHandler.java   |  82 ----
 .../client/async/handlers/MergeHandler.java        |  83 ----
 .../handlers/RollbackSchemaBlackListHandler.java   |  99 ----
 .../async/handlers/SetSystemStatusHandler.java     |  82 ----
 .../client/async/handlers/SetTTLHandler.java       |  61 ---
 .../async/handlers/TriggerManagementHandler.java   |  72 ---
 .../handlers/UpdateConfigNodeGroupHandler.java     |  66 ---
 .../handlers/UpdateRegionRouteMapHandler.java      |  63 ---
 .../ConfigNodeHeartbeatHandler.java                |   2 +-
 .../{ => heartbeat}/DataNodeHeartbeatHandler.java  |   2 +-
 .../handlers/rpc/AbstractAsyncRPCHandler.java      |  84 ++++
 .../AsyncTSStatusRPCHandler.java}                  |  64 +--
 .../handlers/rpc/DeleteTimeSeriesRPCHandler.java   |  84 ++++
 .../FetchSchemaBlackListRPCHandler.java}           |  66 +--
 .../client/async/task/AbstractDataNodeTask.java    |  53 --
 .../task/ConstructSchemaBlackListDataNodeTask.java |  45 --
 .../DeleteDataForDeleteTimeSeriesDataNodeTask.java |  46 --
 .../async/task/DeleteTimeSeriesDataNodeTask.java   |  45 --
 .../task/FetchSchemaBlackListDataNodeTask.java     |  46 --
 .../InvalidateMatchedSchemaCacheDataNodeTask.java  |  46 --
 .../task/RollbackSchemaBlackListDataNodeTask.java  |  45 --
 .../{confignode => }/SyncConfigNodeClientPool.java |   2 +-
 .../{datanode => }/SyncDataNodeClientPool.java     |   2 +-
 .../confignode/conf/ConfigNodeRemoveCheck.java     |   2 +-
 .../confignode/manager/ClusterSchemaManager.java   |  51 +-
 .../confignode/manager/PermissionManager.java      |   2 +-
 .../iotdb/confignode/manager/ProcedureManager.java |  21 +-
 .../iotdb/confignode/manager/UDFManager.java       |  33 +-
 .../iotdb/confignode/manager/load/LoadManager.java |  15 +-
 .../iotdb/confignode/manager/node/NodeManager.java |  71 ++-
 .../manager/partition/PartitionManager.java        |   5 +-
 .../procedure/env/ConfigNodeProcedureEnv.java      | 198 ++++++--
 .../procedure/env/DataNodeRemoveHandler.java       |   4 +-
 .../procedure/impl/CreateTriggerProcedure.java     |   1 +
 .../procedure/impl/DropTriggerProcedure.java       |   1 +
 .../impl/{ => node}/AbstractNodeProcedure.java     |   4 +-
 .../impl/{ => node}/AddConfigNodeProcedure.java    |   2 +-
 .../impl/{ => node}/RemoveConfigNodeProcedure.java |   2 +-
 .../impl/{ => node}/RemoveDataNodeProcedure.java   |   3 +-
 .../CreateRegionGroupsProcedure.java               |  37 +-
 .../DeleteStorageGroupProcedure.java               |   3 +-
 .../DeleteTimeSeriesProcedure.java                 | 111 ++---
 .../{ => statemachine}/RegionMigrateProcedure.java |   3 +-
 .../statemachine}/StateMachineProcedure.java       |   3 +-
 .../procedure/store/ProcedureFactory.java          |  14 +-
 .../iotdb/confignode/service/ConfigNode.java       |   2 +-
 .../request/ConfigPhysicalPlanSerDeTest.java       |   7 +-
 .../procedure/entity/SimpleSTMProcedure.java       |   2 +-
 .../procedure/entity/StuckSTMProcedure.java        |   2 +-
 .../impl/CreateRegionGroupsProcedureTest.java      |   5 +-
 .../impl/DeleteStorageGroupProcedureTest.java      |   1 +
 .../impl/DeleteTimeSeriesProcedureTest.java        |   1 +
 65 files changed, 1074 insertions(+), 2515 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/confignode/AsyncConfigNodeHeartbeatClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
similarity index 94%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/async/confignode/AsyncConfigNodeHeartbeatClientPool.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
index 37dd3a5555..8dd67da315 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/confignode/AsyncConfigNodeHeartbeatClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
@@ -16,13 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.client.async.confignode;
+package org.apache.iotdb.confignode.client.async;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.async.AsyncConfigNodeHeartbeatServiceClient;
-import org.apache.iotdb.confignode.client.async.handlers.ConfigNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
 
 public class AsyncConfigNodeHeartbeatClientPool {
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
new file mode 100644
index 0000000000..98fe826e93
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.client.async;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TFlushReq;
+import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
+import org.apache.iotdb.confignode.client.async.handlers.rpc.AsyncTSStatusRPCHandler;
+import org.apache.iotdb.confignode.client.async.handlers.rpc.DeleteTimeSeriesRPCHandler;
+import org.apache.iotdb.confignode.client.async.handlers.rpc.FetchSchemaBlackListRPCHandler;
+import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteTimeSeriesReq;
+import org.apache.iotdb.mpp.rpc.thrift.TDeleteTimeSeriesReq;
+import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionRequest;
+import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
+import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
+import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
+import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Asynchronously send RPC requests to DataNodes. See mpp.thrift for more details. */
+public class AsyncDataNodeClientPool {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(AsyncDataNodeClientPool.class);
+
+  private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> clientManager;
+
+  private static final int MAX_RETRY_NUM = 6;
+
+  private AsyncDataNodeClientPool() {
+    clientManager =
+        new IClientManager.Factory<TEndPoint, AsyncDataNodeInternalServiceClient>()
+            .createClientManager(
+                new ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory());
+  }
+
+  /**
+   * Send asynchronous requests to the specified DataNodes
+   *
+   * <p>Notice: The DataNodes that failed to receive the requests will be reconnected
+   *
+   * @param clientHandler {@code <RequestType, ResponseType>} which will also contain the result
+   */
+  public void sendAsyncRequestToDataNodeWithRetry(AsyncClientHandler<?, ?> clientHandler) {
+    if (clientHandler.getRequestIndices().isEmpty()) {
+      return;
+    }
+
+    DataNodeRequestType requestType = clientHandler.getRequestType();
+    for (int retry = 0; retry < MAX_RETRY_NUM; retry++) {
+      // Always Reset CountDownLatch first
+      clientHandler.resetCountDownLatch();
+
+      // Send requests to all targetDataNodes
+      for (int requestId : clientHandler.getRequestIndices()) {
+        TDataNodeLocation targetDataNode = clientHandler.getDataNodeLocation(requestId);
+        sendAsyncRequestToDataNode(clientHandler, requestId, targetDataNode, retry);
+      }
+
+      // Wait for this batch of asynchronous RPC requests to finish
+      try {
+        clientHandler.getCountDownLatch().await();
+      } catch (InterruptedException e) {
+        LOGGER.error("Interrupted during {} on ConfigNode", requestType);
+      }
+
+      // Check whether any DataNode failed to execute the request, and retry if so
+      if (clientHandler.getRequestIndices().isEmpty()) {
+        return;
+      }
+    }
+  }
+
+  private void sendAsyncRequestToDataNode(
+      AsyncClientHandler<?, ?> clientHandler,
+      int requestId,
+      TDataNodeLocation targetDataNode,
+      int retryCount) {
+
+    try {
+      AsyncDataNodeInternalServiceClient client;
+      client = clientManager.borrowClient(targetDataNode.getInternalEndPoint());
+
+      switch (clientHandler.getRequestType()) {
+        case SET_TTL:
+          client.setTTL(
+              (TSetTTLReq) clientHandler.getRequest(requestId),
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
+        case CREATE_DATA_REGION:
+          client.createDataRegion(
+              (TCreateDataRegionReq) clientHandler.getRequest(requestId),
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
+        case CREATE_SCHEMA_REGION:
+          client.createSchemaRegion(
+              (TCreateSchemaRegionReq) clientHandler.getRequest(requestId),
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
+        case CREATE_FUNCTION:
+          client.createFunction(
+              (TCreateFunctionRequest) clientHandler.getRequest(requestId),
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
+        case DROP_FUNCTION:
+          client.dropFunction(
+              (TDropFunctionRequest) clientHandler.getRequest(requestId),
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
+        case CREATE_TRIGGER_INSTANCE:
+          client.createTriggerInstance(
+              (TCreateTriggerInstanceReq) clientHandler.getRequest(requestId),
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
+        case DROP_TRIGGER_INSTANCE:
+          client.dropTriggerInstance(
+              (TDropTriggerInstanceReq) clientHandler.getRequest(requestId),
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
+        case ACTIVE_TRIGGER_INSTANCE:
+          client.activeTriggerInstance(
+              (TActiveTriggerInstanceReq) clientHandler.getRequest(requestId),
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
+        case INACTIVE_TRIGGER_INSTANCE:
+          client.inactiveTriggerInstance(
+              (TInactiveTriggerInstanceReq) clientHandler.getRequest(requestId),
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
+        case MERGE:
+        case FULL_MERGE:
+          client.merge(
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
+        case FLUSH:
+          client.flush(
+              (TFlushReq) clientHandler.getRequest(requestId),
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
+        case CLEAR_CACHE:
+          client.clearCache(
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
+        case LOAD_CONFIGURATION:
+          client.loadConfiguration(
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
+        case SET_SYSTEM_STATUS:
+          client.setSystemStatus(
+              (String) clientHandler.getRequest(requestId),
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
+        case UPDATE_REGION_ROUTE_MAP:
+          client.updateRegionCache(
+              (TRegionRouteReq) clientHandler.getRequest(requestId),
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
+        case BROADCAST_LATEST_CONFIG_NODE_GROUP:
+          client.updateConfigNodeGroup(
+              (TUpdateConfigNodeGroupReq) clientHandler.getRequest(requestId),
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
+        case CONSTRUCT_SCHEMA_BLACK_LIST:
+          client.constructSchemaBlackList(
+              (TConstructSchemaBlackListReq) clientHandler.getRequest(requestId),
+              (DeleteTimeSeriesRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
+        case ROLLBACK_SCHEMA_BLACK_LIST:
+          client.rollbackSchemaBlackList(
+              (TRollbackSchemaBlackListReq) clientHandler.getRequest(requestId),
+              (DeleteTimeSeriesRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
+        case FETCH_SCHEMA_BLACK_LIST:
+          client.fetchSchemaBlackList(
+              (TFetchSchemaBlackListReq) clientHandler.getRequest(requestId),
+              (FetchSchemaBlackListRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
+        case INVALIDATE_MATCHED_SCHEMA_CACHE:
+          client.invalidateMatchedSchemaCache(
+              (TInvalidateMatchedSchemaCacheReq) clientHandler.getRequest(requestId),
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
+        case DELETE_DATA_FOR_DELETE_TIMESERIES:
+          client.deleteDataForDeleteTimeSeries(
+              (TDeleteDataForDeleteTimeSeriesReq) clientHandler.getRequest(requestId),
+              (DeleteTimeSeriesRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
+        case DELETE_TIMESERIES:
+          client.deleteTimeSeries(
+              (TDeleteTimeSeriesReq) clientHandler.getRequest(requestId),
+              (DeleteTimeSeriesRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
+        default:
+          LOGGER.error(
+              "Unexpected DataNode Request Type: {} when sendAsyncRequestToDataNode",
+              clientHandler.getRequestType());
+      }
+    } catch (Exception e) {
+      LOGGER.warn(
+          "{} failed on DataNode {}, because {}, retrying {}...",
+          clientHandler.getRequestType(),
+          targetDataNode.getInternalEndPoint(),
+          e.getMessage(),
+          retryCount);
+    }
+  }
+
+  /**
+   * Always call this interface when a DataNode is restarted or removed
+   *
+   * @param endPoint The specific DataNode
+   */
+  public void resetClient(TEndPoint endPoint) {
+    clientManager.clear(endPoint);
+  }
+
+  // TODO: Must the ClientPool be a singleton?
+  private static class ClientPoolHolder {
+
+    private static final AsyncDataNodeClientPool INSTANCE = new AsyncDataNodeClientPool();
+
+    private ClientPoolHolder() {
+      // Empty constructor
+    }
+  }
+
+  public static AsyncDataNodeClientPool getInstance() {
+    return ClientPoolHolder.INSTANCE;
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeHeartbeatClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
similarity index 94%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeHeartbeatClientPool.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
index b1a51a505e..a42616c722 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeHeartbeatClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
@@ -16,13 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.client.async.datanode;
+package org.apache.iotdb.confignode.client.async;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.async.AsyncDataNodeHeartbeatServiceClient;
-import org.apache.iotdb.confignode.client.async.handlers.DataNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
 
 /** Asynchronously send RPC requests to DataNodes. See mpp.thrift for more details. */
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
deleted file mode 100644
index 746fed9db4..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
+++ /dev/null
@@ -1,545 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.client.async.datanode;
-
-import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TFlushReq;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
-import org.apache.iotdb.commons.client.ClientPoolFactory;
-import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
-import org.apache.iotdb.confignode.client.async.handlers.ClearCacheHandler;
-import org.apache.iotdb.confignode.client.async.handlers.ConstructSchemaBlackListHandler;
-import org.apache.iotdb.confignode.client.async.handlers.CreateRegionHandler;
-import org.apache.iotdb.confignode.client.async.handlers.DeleteDataForDeleteTimeSeriesHandler;
-import org.apache.iotdb.confignode.client.async.handlers.DeleteTimeSeriesHandler;
-import org.apache.iotdb.confignode.client.async.handlers.FetchSchemaBlackLsitHandler;
-import org.apache.iotdb.confignode.client.async.handlers.FlushHandler;
-import org.apache.iotdb.confignode.client.async.handlers.FunctionManagementHandler;
-import org.apache.iotdb.confignode.client.async.handlers.InvalidateMatchedSchemaCacheHandler;
-import org.apache.iotdb.confignode.client.async.handlers.LoadConfigurationHandler;
-import org.apache.iotdb.confignode.client.async.handlers.MergeHandler;
-import org.apache.iotdb.confignode.client.async.handlers.RollbackSchemaBlackListHandler;
-import org.apache.iotdb.confignode.client.async.handlers.SetSystemStatusHandler;
-import org.apache.iotdb.confignode.client.async.handlers.SetTTLHandler;
-import org.apache.iotdb.confignode.client.async.handlers.TriggerManagementHandler;
-import org.apache.iotdb.confignode.client.async.handlers.UpdateConfigNodeGroupHandler;
-import org.apache.iotdb.confignode.client.async.handlers.UpdateRegionRouteMapHandler;
-import org.apache.iotdb.confignode.client.async.task.AbstractDataNodeTask;
-import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
-import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteTimeSeriesReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDeleteTimeSeriesReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionRequest;
-import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
-import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
-import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
-import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
-import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-/** Asynchronously send RPC requests to DataNodes. See mpp.thrift for more details. */
-public class AsyncDataNodeClientPool {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(AsyncDataNodeClientPool.class);
-
-  private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> clientManager;
-
-  private static final int MAX_RETRY_NUM = 6;
-
-  private AsyncDataNodeClientPool() {
-    clientManager =
-        new IClientManager.Factory<TEndPoint, AsyncDataNodeInternalServiceClient>()
-            .createClientManager(
-                new ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory());
-  }
-
-  /**
-   * Send asynchronize requests to the specific DataNodes, and reconnect the DataNode that failed to
-   * receive the requests
-   *
-   * @param req request
-   * @param dataNodeLocationMap Map<DataNodeId, TDataNodeLocation>
-   * @param requestType DataNodeRequestType
-   * @param dataNodeResponseStatus response list.Used by CREATE_FUNCTION,DROP_FUNCTION and FLUSH
-   */
-  public void sendAsyncRequestToDataNodeWithRetry(
-      Object req,
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap,
-      DataNodeRequestType requestType,
-      List<TSStatus> dataNodeResponseStatus) {
-    if (dataNodeLocationMap.isEmpty()) {
-      return;
-    }
-    for (int retry = 0; retry < MAX_RETRY_NUM; retry++) {
-      CountDownLatch countDownLatch = new CountDownLatch(dataNodeLocationMap.size());
-      for (TDataNodeLocation targetDataNode : dataNodeLocationMap.values()) {
-        AbstractRetryHandler handler;
-        switch (requestType) {
-          case SET_TTL:
-            handler =
-                new SetTTLHandler(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
-            break;
-          case CREATE_FUNCTION:
-          case DROP_FUNCTION:
-            handler =
-                new FunctionManagementHandler(
-                    countDownLatch,
-                    requestType,
-                    targetDataNode,
-                    dataNodeLocationMap,
-                    dataNodeResponseStatus);
-            break;
-          case CREATE_TRIGGER_INSTANCE:
-          case DROP_TRIGGER_INSTANCE:
-          case ACTIVE_TRIGGER_INSTANCE:
-          case INACTIVE_TRIGGER_INSTANCE:
-            handler =
-                new TriggerManagementHandler(
-                    countDownLatch,
-                    requestType,
-                    targetDataNode,
-                    dataNodeLocationMap,
-                    dataNodeResponseStatus);
-            break;
-          case FULL_MERGE:
-          case MERGE:
-            handler =
-                new MergeHandler(
-                    countDownLatch,
-                    requestType,
-                    targetDataNode,
-                    dataNodeLocationMap,
-                    dataNodeResponseStatus);
-            break;
-          case FLUSH:
-            handler =
-                new FlushHandler(
-                    countDownLatch,
-                    requestType,
-                    targetDataNode,
-                    dataNodeLocationMap,
-                    dataNodeResponseStatus);
-            break;
-          case CLEAR_CACHE:
-            handler =
-                new ClearCacheHandler(
-                    countDownLatch,
-                    requestType,
-                    targetDataNode,
-                    dataNodeLocationMap,
-                    dataNodeResponseStatus);
-            break;
-          case LOAD_CONFIGURATION:
-            handler =
-                new LoadConfigurationHandler(
-                    countDownLatch,
-                    requestType,
-                    targetDataNode,
-                    dataNodeLocationMap,
-                    dataNodeResponseStatus);
-            break;
-          case SET_SYSTEM_STATUS:
-            handler =
-                new SetSystemStatusHandler(
-                    countDownLatch,
-                    requestType,
-                    targetDataNode,
-                    dataNodeLocationMap,
-                    dataNodeResponseStatus);
-            break;
-          case UPDATE_REGION_ROUTE_MAP:
-            handler =
-                new UpdateRegionRouteMapHandler(
-                    countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
-            break;
-          case BROADCAST_LATEST_CONFIG_NODE_GROUP:
-            handler =
-                new UpdateConfigNodeGroupHandler(
-                    countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
-            break;
-          case CONSTRUCT_SCHEMA_BLACK_LIST:
-            handler =
-                new ConstructSchemaBlackListHandler(
-                    countDownLatch, targetDataNode, dataNodeLocationMap);
-            break;
-          case ROLLBACK_SCHEMA_BLACK_LIST:
-            handler =
-                new RollbackSchemaBlackListHandler(
-                    countDownLatch, targetDataNode, dataNodeLocationMap);
-            break;
-          case FETCH_SCHEMA_BLACK_LIST:
-            handler =
-                new FetchSchemaBlackLsitHandler(
-                    countDownLatch, targetDataNode, dataNodeLocationMap);
-            break;
-          case INVALIDATE_MATCHED_SCHEMA_CACHE:
-            handler =
-                new InvalidateMatchedSchemaCacheHandler(
-                    countDownLatch, targetDataNode, dataNodeLocationMap);
-            break;
-          case DELETE_DATA_FOR_DELETE_TIMESERIES:
-            handler =
-                new DeleteDataForDeleteTimeSeriesHandler(
-                    countDownLatch, targetDataNode, dataNodeLocationMap);
-            break;
-          case DELETE_TIMESERIES:
-            handler =
-                new DeleteTimeSeriesHandler(countDownLatch, targetDataNode, dataNodeLocationMap);
-            break;
-          default:
-            return;
-        }
-        sendAsyncRequestToDataNode(targetDataNode, req, handler, retry);
-      }
-      try {
-        countDownLatch.await();
-      } catch (InterruptedException e) {
-        LOGGER.error("Interrupted during {} on ConfigNode", requestType);
-      }
-      // Check if there is a node that fails to send the request, and retry if there is one
-      if (dataNodeLocationMap.isEmpty()) {
-        break;
-      }
-    }
-  }
-
-  public void sendAsyncRequestToDataNodeWithRetry( // Sends req to every DataNode listed by dataNodeTask, for at most MAX_RETRY_NUM rounds
-      Object req, AbstractDataNodeTask<?> dataNodeTask) {
-    Map<Integer, TDataNodeLocation> dataNodeLocationMap = dataNodeTask.getDataNodeLocationMap();
-    if (dataNodeLocationMap.isEmpty()) { // No target DataNode, nothing to send
-      return;
-    }
-    for (int retry = 0; retry < MAX_RETRY_NUM; retry++) {
-      CountDownLatch countDownLatch = new CountDownLatch(dataNodeLocationMap.size()); // One count per target of this round
-      for (TDataNodeLocation targetDataNode : dataNodeLocationMap.values()) {
-        AbstractRetryHandler handler = dataNodeTask.getSingleRequestHandler();
-        handler.setCountDownLatch(countDownLatch);
-        handler.setTargetDataNode(targetDataNode);
-        sendAsyncRequestToDataNode(targetDataNode, req, handler, retry);
-      }
-
-      try {
-        countDownLatch.await(); // Wait until the latch of this round reaches zero
-      } catch (InterruptedException e) {
-        LOGGER.error("Interrupted during {} on ConfigNode", dataNodeTask.getDataNodeRequestType()); // NOTE(review): the interrupt flag is not restored and the loop keeps going
-      }
-      // Stop once no target is left in the map; any DataNode still listed is sent the request again in the next round
-      if (dataNodeLocationMap.isEmpty()) {
-        break;
-      }
-    }
-  }
-
-  public void sendAsyncRequestToDataNode( // Borrows a client for dataNodeLocation and issues the RPC selected by the handler's request type
-      TDataNodeLocation dataNodeLocation,
-      Object req,
-      AbstractRetryHandler handler,
-      int retryCount) {
-    AsyncDataNodeInternalServiceClient client;
-    try {
-      client = clientManager.borrowClient(dataNodeLocation.getInternalEndPoint());
-      switch (handler.getDataNodeRequestType()) { // req and handler are cast to the concrete types each RPC expects
-        case SET_TTL:
-          client.setTTL((TSetTTLReq) req, (SetTTLHandler) handler);
-          break;
-        case CREATE_DATA_REGION:
-          client.createDataRegion((TCreateDataRegionReq) req, (CreateRegionHandler) handler);
-          break;
-        case CREATE_SCHEMA_REGION:
-          client.createSchemaRegion((TCreateSchemaRegionReq) req, (CreateRegionHandler) handler);
-          break;
-        case CREATE_FUNCTION:
-          client.createFunction((TCreateFunctionRequest) req, (FunctionManagementHandler) handler);
-          break;
-        case DROP_FUNCTION:
-          client.dropFunction((TDropFunctionRequest) req, (FunctionManagementHandler) handler);
-          break;
-        case CREATE_TRIGGER_INSTANCE:
-          client.createTriggerInstance(
-              (TCreateTriggerInstanceReq) req, (TriggerManagementHandler) handler);
-          break;
-        case DROP_TRIGGER_INSTANCE:
-          client.dropTriggerInstance(
-              (TDropTriggerInstanceReq) req, (TriggerManagementHandler) handler);
-          break;
-        case ACTIVE_TRIGGER_INSTANCE:
-          client.activeTriggerInstance(
-              (TActiveTriggerInstanceReq) req, (TriggerManagementHandler) handler);
-          break;
-        case INACTIVE_TRIGGER_INSTANCE:
-          client.inactiveTriggerInstance(
-              (TInactiveTriggerInstanceReq) req, (TriggerManagementHandler) handler);
-          break;
-        case MERGE:
-        case FULL_MERGE: // Both merge types go through client.merge, which takes no request object here
-          client.merge((MergeHandler) handler);
-          break;
-        case FLUSH:
-          client.flush((TFlushReq) req, (FlushHandler) handler);
-          break;
-        case CLEAR_CACHE:
-          client.clearCache((ClearCacheHandler) handler);
-          break;
-        case LOAD_CONFIGURATION:
-          client.loadConfiguration((LoadConfigurationHandler) handler);
-          break;
-        case SET_SYSTEM_STATUS:
-          client.setSystemStatus((String) req, (SetSystemStatusHandler) handler);
-          break;
-        case UPDATE_REGION_ROUTE_MAP:
-          client.updateRegionCache((TRegionRouteReq) req, (UpdateRegionRouteMapHandler) handler);
-          break;
-        case BROADCAST_LATEST_CONFIG_NODE_GROUP:
-          client.updateConfigNodeGroup(
-              (TUpdateConfigNodeGroupReq) req, (UpdateConfigNodeGroupHandler) handler);
-          break;
-        case CONSTRUCT_SCHEMA_BLACK_LIST:
-          client.constructSchemaBlackList(
-              (TConstructSchemaBlackListReq) req, (ConstructSchemaBlackListHandler) handler);
-          break;
-        case ROLLBACK_SCHEMA_BLACK_LIST:
-          client.rollbackSchemaBlackList(
-              (TRollbackSchemaBlackListReq) req, (RollbackSchemaBlackListHandler) handler);
-          break;
-        case FETCH_SCHEMA_BLACK_LIST:
-          client.fetchSchemaBlackList(
-              (TFetchSchemaBlackListReq) req, (FetchSchemaBlackLsitHandler) handler);
-          break;
-        case INVALIDATE_MATCHED_SCHEMA_CACHE:
-          client.invalidateMatchedSchemaCache(
-              (TInvalidateMatchedSchemaCacheReq) req,
-              (InvalidateMatchedSchemaCacheHandler) handler);
-          break;
-        case DELETE_DATA_FOR_DELETE_TIMESERIES:
-          client.deleteDataForDeleteTimeSeries(
-              (TDeleteDataForDeleteTimeSeriesReq) req,
-              (DeleteDataForDeleteTimeSeriesHandler) handler);
-          break;
-        case DELETE_TIMESERIES:
-          client.deleteTimeSeries((TDeleteTimeSeriesReq) req, (DeleteTimeSeriesHandler) handler);
-          break;
-        default: // Unknown request type: only logged, no RPC is issued
-          LOGGER.error(
-              "Unexpected DataNode Request Type: {} when sendAsyncRequestToDataNode",
-              handler.getDataNodeRequestType());
-      }
-    } catch (Exception e) { // NOTE(review): only logs; nothing here counts the caller's latch down, so confirm callers cannot block in await()
-      LOGGER.warn(
-          "{} failed on ConfigNode {}, because {}, retrying {}...", // NOTE(review): the second argument is the DataNode's internal endpoint
-          handler.getDataNodeRequestType(),
-          dataNodeLocation.getInternalEndPoint(),
-          e.getMessage(),
-          retryCount); // retryCount is used for this log message only
-    }
-  }
-
-  /**
-   * Execute CreateRegionGroupsPlan asynchronously: one create request per replica of every RegionGroup in the plan, re-sent for at most MAX_RETRY_NUM rounds
-   *
-   * @param ttlMap Map<StorageGroupName, TTL>, read only when creating DataRegions
-   * @return Map<RegionId, replica set holding only the DataNodes whose request is still pending after the last round>, empty if none
-   */
-  public Map<TConsensusGroupId, TRegionReplicaSet> createRegionGroups(
-      CreateRegionGroupsPlan createRegionGroupsPlan, Map<String, Long> ttlMap) {
-    // Because different requests will be sent to the same node when createRegions,
-    // so for CreateRegions use Map<index, TDataNodeLocation>
-    Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
-    int index = 0;
-    // Count the DataNodes to be sent: every replica gets its own index, in iteration order of the plan
-    for (List<TRegionReplicaSet> regionReplicaSets :
-        createRegionGroupsPlan.getRegionGroupMap().values()) {
-      for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
-        for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
-          dataNodeLocationMap.put(index++, dataNodeLocation);
-        }
-      }
-    }
-    if (dataNodeLocationMap.isEmpty()) { // The plan contains no replica, so nothing can fail
-      return new HashMap<>();
-    }
-    for (int retry = 0; retry < MAX_RETRY_NUM; retry++) {
-      index = 0; // Restart the numbering so each replica maps to the same index as in the counting pass
-      CountDownLatch countDownLatch = new CountDownLatch(dataNodeLocationMap.size());
-      for (Map.Entry<String, List<TRegionReplicaSet>> entry :
-          createRegionGroupsPlan.getRegionGroupMap().entrySet()) {
-        // Enumerate each RegionReplicaSet of this storage group
-        for (TRegionReplicaSet regionReplicaSet : entry.getValue()) {
-          // Enumerate each replica (target DataNode) of the RegionReplicaSet
-          for (TDataNodeLocation targetDataNode : regionReplicaSet.getDataNodeLocations()) {
-            if (dataNodeLocationMap.containsKey(index)) { // Only (re)send requests whose index is still pending
-              AbstractRetryHandler handler;
-              switch (regionReplicaSet.getRegionId().getType()) {
-                case SchemaRegion:
-                  handler =
-                      new CreateRegionHandler(
-                          countDownLatch,
-                          DataNodeRequestType.CREATE_SCHEMA_REGION,
-                          regionReplicaSet.regionId,
-                          targetDataNode,
-                          dataNodeLocationMap,
-                          index++);
-                  sendAsyncRequestToDataNode(
-                      targetDataNode,
-                      genCreateSchemaRegionReq(entry.getKey(), regionReplicaSet),
-                      handler,
-                      retry);
-                  break;
-                case DataRegion:
-                  handler =
-                      new CreateRegionHandler(
-                          countDownLatch,
-                          DataNodeRequestType.CREATE_DATA_REGION,
-                          regionReplicaSet.regionId,
-                          targetDataNode,
-                          dataNodeLocationMap,
-                          index++);
-                  sendAsyncRequestToDataNode(
-                      targetDataNode,
-                      genCreateDataRegionReq(
-                          entry.getKey(), regionReplicaSet, ttlMap.get(entry.getKey())), // NOTE(review): unboxed to long, throws NullPointerException if ttlMap lacks the storage group
-                      handler,
-                      retry);
-                  break;
-                default: // NOTE(review): this path neither sends a request nor advances index, although the latch counted the entry; confirm it is unreachable
-                  break;
-              }
-            } else {
-              index++; // Skipped replica: keep the numbering aligned
-            }
-          }
-        }
-      }
-      try {
-        countDownLatch.await();
-      } catch (InterruptedException e) {
-        LOGGER.error("Interrupted during createRegions on ConfigNode"); // NOTE(review): the interrupt flag is not restored
-      }
-      // Stop once no index is left in the map; otherwise the
-      // requests of the remaining indices are sent again in the next round
-      if (dataNodeLocationMap.isEmpty()) {
-        break;
-      }
-    }
-
-    // Filter RegionGroups that weren't created successfully: walk the plan in the same order and collect the replicas whose index remains
-    index = 0;
-    Map<TConsensusGroupId, TRegionReplicaSet> failedRegions = new HashMap<>();
-    for (List<TRegionReplicaSet> regionReplicaSets :
-        createRegionGroupsPlan.getRegionGroupMap().values()) {
-      for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
-        for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
-          if (dataNodeLocationMap.containsKey(index)) {
-            failedRegions
-                .computeIfAbsent(
-                    regionReplicaSet.getRegionId(),
-                    empty -> new TRegionReplicaSet().setRegionId(regionReplicaSet.getRegionId()))
-                .addToDataNodeLocations(dataNodeLocation);
-          }
-          index += 1;
-        }
-      }
-    }
-    return failedRegions;
-  }
-
-  private TCreateSchemaRegionReq genCreateSchemaRegionReq( // Builds a create-SchemaRegion request from the storage group name and the replica set
-      String storageGroup, TRegionReplicaSet regionReplicaSet) {
-    TCreateSchemaRegionReq req = new TCreateSchemaRegionReq();
-    req.setStorageGroup(storageGroup);
-    req.setRegionReplicaSet(regionReplicaSet);
-    return req;
-  }
-
-  private TCreateDataRegionReq genCreateDataRegionReq( // Builds a create-DataRegion request from the storage group name, the replica set and the TTL
-      String storageGroup, TRegionReplicaSet regionReplicaSet, long TTL) {
-    TCreateDataRegionReq req = new TCreateDataRegionReq();
-    req.setStorageGroup(storageGroup);
-    req.setRegionReplicaSet(regionReplicaSet);
-    req.setTtl(TTL);
-    return req;
-  }
-
-  /**
-   * Notify all DataNodes when the capacity of the ConfigNodeGroup is expanded or reduced
-   *
-   * @param registeredDataNodeLocationMap Map<Integer, TDataNodeLocation> of the DataNodes to notify; nothing is sent when it is null
-   * @param registeredConfigNodes List<TConfigNodeLocation>, the latest ConfigNodeGroup carried by the request
-   */
-  public void broadCastTheLatestConfigNodeGroup(
-      Map<Integer, TDataNodeLocation> registeredDataNodeLocationMap,
-      List<TConfigNodeLocation> registeredConfigNodes) {
-    if (registeredDataNodeLocationMap != null) {
-      TUpdateConfigNodeGroupReq updateConfigNodeGroupReq =
-          new TUpdateConfigNodeGroupReq(registeredConfigNodes);
-      LOGGER.info("Begin to broadcast the latest configNodeGroup: {}", registeredConfigNodes);
-      sendAsyncRequestToDataNodeWithRetry(
-          updateConfigNodeGroupReq,
-          registeredDataNodeLocationMap,
-          DataNodeRequestType.BROADCAST_LATEST_CONFIG_NODE_GROUP,
-          null); // presumably the response-status list, unused for this broadcast; TODO confirm against the overload's signature
-      LOGGER.info("Broadcast the latest configNodeGroup finished.");
-    }
-  }
-
-  /**
-   * Always call this interface when a DataNode is restarted or removed
-   *
-   * @param endPoint The endpoint of the specific DataNode; it is passed to clientManager.clear
-   */
-  public void resetClient(TEndPoint endPoint) {
-    clientManager.clear(endPoint);
-  }
-
-  // TODO: Must the ClientPool be a singleton?
-  private static class ClientPoolHolder { // Holder whose only member is the single shared AsyncDataNodeClientPool
-
-    private static final AsyncDataNodeClientPool INSTANCE = new AsyncDataNodeClientPool();
-
-    private ClientPoolHolder() {
-      // Empty constructor; private, so code outside the enclosing class cannot instantiate the holder
-    }
-  }
-
-  public static AsyncDataNodeClientPool getInstance() { // Returns the shared pool held by ClientPoolHolder
-    return ClientPoolHolder.INSTANCE;
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AbstractRetryHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AbstractRetryHandler.java
deleted file mode 100644
index b3a5fd9328..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AbstractRetryHandler.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-public abstract class AbstractRetryHandler { // Common state of the retry handlers: latch, pending-target map, request type and target DataNode
-
-  protected CountDownLatch countDownLatch; // Not set by the two-argument constructor; supply it via setCountDownLatch
-
-  /**
-   * Map<DataNodeId, TDataNodeLocation> The DataNode that successfully executes the request will be
-   * removed from this map
-   */
-  protected Map<Integer, TDataNodeLocation> dataNodeLocationMap;
-  /** Request type to DataNode */
-  protected DataNodeRequestType dataNodeRequestType;
-  /** Target DataNode */
-  protected TDataNodeLocation targetDataNode;
-
-  public AbstractRetryHandler( // Fully initialised handler for one request to one target DataNode
-      CountDownLatch countDownLatch,
-      DataNodeRequestType dataNodeRequestType,
-      TDataNodeLocation targetDataNode,
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
-    this.countDownLatch = countDownLatch;
-    this.dataNodeLocationMap = dataNodeLocationMap;
-    this.dataNodeRequestType = dataNodeRequestType;
-    this.targetDataNode = targetDataNode;
-  }
-
-  public AbstractRetryHandler( // Leaves countDownLatch and targetDataNode null until the setters below are called
-      DataNodeRequestType dataNodeRequestType,
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
-    this.dataNodeLocationMap = dataNodeLocationMap;
-    this.dataNodeRequestType = dataNodeRequestType;
-  }
-
-  public DataNodeRequestType getDataNodeRequestType() {
-    return dataNodeRequestType;
-  }
-
-  public Map<Integer, TDataNodeLocation> getDataNodeLocationMap() { // Returns the map itself, not a copy
-    return dataNodeLocationMap;
-  }
-
-  public void setCountDownLatch(CountDownLatch countDownLatch) {
-    this.countDownLatch = countDownLatch;
-  }
-
-  public void setTargetDataNode(TDataNodeLocation targetDataNode) {
-    this.targetDataNode = targetDataNode;
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
new file mode 100644
index 0000000000..60c7878fe4
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.client.async.handlers;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.handlers.rpc.AbstractAsyncRPCHandler;
+import org.apache.iotdb.confignode.client.async.handlers.rpc.AsyncTSStatusRPCHandler;
+import org.apache.iotdb.confignode.client.async.handlers.rpc.DeleteTimeSeriesRPCHandler;
+import org.apache.iotdb.confignode.client.async.handlers.rpc.FetchSchemaBlackListRPCHandler;
+import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Asynchronous Client handler: holds the requests, target DataNodes, responses and CountDownLatch of one batch of asynchronous RPCs of a single request type
+ *
+ * @param <Q> ClassName of RPC request
+ * @param <R> ClassName of RPC response
+ */
+public class AsyncClientHandler<Q, R> {
+
+  // Type of RPC request; it also selects the RPC handler class in createAsyncRPCHandler
+  protected final DataNodeRequestType requestType;
+
+  /**
+   * Map key: The indices of asynchronous RPC requests
+   *
+   * <p>Map value: The corresponding RPC request
+   */
+  private final Map<Integer, Q> requestMap;
+
+  /**
+   * Map key: The indices of asynchronous RPC requests
+   *
+   * <p>Map value: The target DataNodes of corresponding indices
+   *
+   * <p>Every kind of AsyncHandler removes its targetDataNode from the dataNodeLocationMap only
+   * if its corresponding RPC request succeeds
+   */
+  private final Map<Integer, TDataNodeLocation> dataNodeLocationMap;
+
+  /**
+   * Map key: The indices(targetDataNode's ID) of asynchronous RPC requests
+   *
+   * <p>Map value: The response of corresponding indices
+   *
+   * <p>Every kind of AsyncHandler adds its response to the responseMap after its corresponding RPC
+   * request has finished
+   */
+  private final Map<Integer, R> responseMap;
+
+  private CountDownLatch countDownLatch; // Stays null until resetCountDownLatch() is called
+
+  /** Constructor for custom requests: all maps start empty, fill them via putRequest and putDataNodeLocation */
+  public AsyncClientHandler(DataNodeRequestType requestType) {
+    this.requestType = requestType;
+    this.requestMap = new ConcurrentHashMap<>();
+    this.dataNodeLocationMap = new ConcurrentHashMap<>();
+    this.responseMap = new ConcurrentHashMap<>();
+  }
+
+  public void putRequest(int requestId, Q request) { // Registers (or replaces) the request stored under requestId
+    requestMap.put(requestId, request);
+  }
+
+  public void putDataNodeLocation(int requestId, TDataNodeLocation dataNodeLocation) { // Registers (or replaces) the target stored under requestId
+    dataNodeLocationMap.put(requestId, dataNodeLocation);
+  }
+
+  /** Constructor for null requests: requestMap stays empty and the given dataNodeLocationMap is used directly, not copied */
+  public AsyncClientHandler(
+      DataNodeRequestType requestType, Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
+    this.requestType = requestType;
+    this.dataNodeLocationMap = dataNodeLocationMap;
+
+    this.requestMap = new ConcurrentHashMap<>();
+    this.responseMap = new ConcurrentHashMap<>();
+  }
+
+  /** Constructor for unique request: the same request object is registered under every key of the given dataNodeLocationMap */
+  public AsyncClientHandler(
+      DataNodeRequestType requestType,
+      Q request,
+      Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
+    this.requestType = requestType;
+    this.dataNodeLocationMap = dataNodeLocationMap;
+
+    this.requestMap = new ConcurrentHashMap<>();
+    this.dataNodeLocationMap
+        .keySet()
+        .forEach(dataNodeId -> this.requestMap.put(dataNodeId, request));
+
+    this.responseMap = new ConcurrentHashMap<>();
+  }
+
+  public DataNodeRequestType getRequestType() {
+    return requestType;
+  }
+
+  public List<Integer> getRequestIndices() { // Copy of the keys currently present in dataNodeLocationMap
+    return new ArrayList<>(dataNodeLocationMap.keySet());
+  }
+
+  public Q getRequest(int requestId) { // null when no request is stored under requestId
+    return requestMap.get(requestId);
+  }
+
+  public TDataNodeLocation getDataNodeLocation(int requestId) { // null when requestId is not (or no longer) in dataNodeLocationMap
+    return dataNodeLocationMap.get(requestId);
+  }
+
+  public List<R> getResponseList() { // Copy of the responses collected so far
+    return new ArrayList<>(responseMap.values());
+  }
+
+  public Map<Integer, R> getResponseMap() { // Returns the map itself, not a copy
+    return responseMap;
+  }
+
+  /** Always reset CountDownLatch before retry: the new latch counts the entries currently in dataNodeLocationMap */
+  public void resetCountDownLatch() {
+    countDownLatch = new CountDownLatch(dataNodeLocationMap.size());
+  }
+
+  public CountDownLatch getCountDownLatch() {
+    return countDownLatch;
+  }
+
+  public AbstractAsyncRPCHandler<?> createAsyncRPCHandler( // Creates the callback for one request; the handler class is chosen by requestType
+      int requestId, TDataNodeLocation targetDataNode) {
+    switch (requestType) {
+      case CONSTRUCT_SCHEMA_BLACK_LIST:
+      case ROLLBACK_SCHEMA_BLACK_LIST:
+      case DELETE_DATA_FOR_DELETE_TIMESERIES:
+      case DELETE_TIMESERIES:
+        return new DeleteTimeSeriesRPCHandler(
+            requestType,
+            requestId,
+            targetDataNode,
+            dataNodeLocationMap,
+            (Map<Integer, TSStatus>) responseMap, // NOTE(review): unchecked cast, relies on R being TSStatus for these request types
+            countDownLatch);
+      case FETCH_SCHEMA_BLACK_LIST:
+        return new FetchSchemaBlackListRPCHandler(
+            requestType,
+            requestId,
+            targetDataNode,
+            dataNodeLocationMap,
+            (Map<Integer, TFetchSchemaBlackListResp>) responseMap, // NOTE(review): unchecked cast, relies on R being TFetchSchemaBlackListResp
+            countDownLatch);
+      case SET_TTL:
+      case CREATE_DATA_REGION:
+      case CREATE_SCHEMA_REGION:
+      case CREATE_FUNCTION:
+      case DROP_FUNCTION:
+      case CREATE_TRIGGER_INSTANCE:
+      case DROP_TRIGGER_INSTANCE:
+      case ACTIVE_TRIGGER_INSTANCE:
+      case INACTIVE_TRIGGER_INSTANCE:
+      case MERGE:
+      case FULL_MERGE:
+      case FLUSH:
+      case CLEAR_CACHE:
+      case LOAD_CONFIGURATION:
+      case SET_SYSTEM_STATUS:
+      case UPDATE_REGION_ROUTE_MAP:
+      case BROADCAST_LATEST_CONFIG_NODE_GROUP:
+      case INVALIDATE_MATCHED_SCHEMA_CACHE:
+      default: // Every type listed above, and any unlisted one, gets the generic TSStatus handler
+        return new AsyncTSStatusRPCHandler(
+            requestType,
+            requestId,
+            targetDataNode,
+            dataNodeLocationMap,
+            (Map<Integer, TSStatus>) responseMap, // NOTE(review): unchecked cast, relies on R being TSStatus
+            countDownLatch);
+    }
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/ClearCacheHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/ClearCacheHandler.java
deleted file mode 100644
index 67a02843e9..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/ClearCacheHandler.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-public class ClearCacheHandler extends AbstractRetryHandler // Thrift callback for a Clear Cache request sent to one DataNode
-    implements AsyncMethodCallback<TSStatus> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(ClearCacheHandler.class);
-
-  private final List<TSStatus> dataNodeResponseStatus; // Caller-supplied result list: one entry per successful response or RPC error
-
-  public ClearCacheHandler(
-      CountDownLatch countDownLatch,
-      DataNodeRequestType requestType,
-      TDataNodeLocation targetDataNode,
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap,
-      List<TSStatus> dataNodeResponseStatus) {
-    super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
-    this.dataNodeResponseStatus = dataNodeResponseStatus;
-  }
-
-  @Override
-  public void onComplete(TSStatus response) { // Success: record the status and drop the target from the map; otherwise only log and keep the target listed
-    if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      dataNodeResponseStatus.add(response);
-      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
-      LOGGER.info("Successfully Clear Cache on DataNode: {}", targetDataNode);
-    } else {
-      LOGGER.error(
-          "Failed to Clear Cache on DataNode {}, {}",
-          dataNodeLocationMap.get(targetDataNode.getDataNodeId()),
-          response);
-    }
-    countDownLatch.countDown(); // Counted down last, after the result has been recorded
-  }
-
-  @Override
-  public void onError(Exception exception) { // RPC failure: record an error status; the target stays in dataNodeLocationMap
-    dataNodeResponseStatus.add(
-        new TSStatus(
-            RpcUtils.getStatus(
-                TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
-                "Clear Cache error on DataNode: {id="
-                    + targetDataNode.getDataNodeId()
-                    + ", internalEndPoint="
-                    + targetDataNode.getInternalEndPoint()
-                    + "}"
-                    + exception.getMessage())));
-    countDownLatch.countDown(); // Must come after the add: a thread released from await() may read the list immediately
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/ConstructSchemaBlackListHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/ConstructSchemaBlackListHandler.java
deleted file mode 100644
index bce6a47d5b..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/ConstructSchemaBlackListHandler.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-public class ConstructSchemaBlackListHandler extends AbstractRetryHandler // Thrift callback for a construct-schema-black-list request sent to one DataNode
-    implements AsyncMethodCallback<TSStatus> {
-
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(ConstructSchemaBlackListHandler.class);
-
-  private final Map<Integer, TSStatus> dataNodeResponseStatusMap; // DataNodeId -> latest status received (or synthesized on error)
-
-  public ConstructSchemaBlackListHandler( // Latch and target are not set here; supply them through the inherited setters
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap,
-      Map<Integer, TSStatus> dataNodeResponseStatusMap) {
-    super(DataNodeRequestType.CONSTRUCT_SCHEMA_BLACK_LIST, dataNodeLocationMap);
-    this.dataNodeResponseStatusMap = dataNodeResponseStatusMap;
-  }
-
-  public ConstructSchemaBlackListHandler( // Uses a private status map that this class does not expose
-      CountDownLatch countDownLatch,
-      TDataNodeLocation targetDataNode,
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
-    super(
-        countDownLatch,
-        DataNodeRequestType.CONSTRUCT_SCHEMA_BLACK_LIST,
-        targetDataNode,
-        dataNodeLocationMap);
-    this.dataNodeResponseStatusMap = new ConcurrentHashMap<>();
-  }
-
-  @Override
-  public void onComplete(TSStatus tsStatus) { // Every response is recorded; SUCCESS_STATUS and MULTIPLE_ERROR both drop the target from the map
-    dataNodeResponseStatusMap.put(targetDataNode.getDataNodeId(), tsStatus);
-    if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
-      LOGGER.info("Successfully construct schema black list on DataNode: {}", targetDataNode);
-    } else if (tsStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
-      LOGGER.error(
-          "Failed to construct schema black list on DataNode {}, {}", targetDataNode, tsStatus);
-    } else { // Any other code: the target stays listed in dataNodeLocationMap
-      LOGGER.error(
-          "Failed to construct schema black list on DataNode {}, {}", targetDataNode, tsStatus);
-    }
-    countDownLatch.countDown(); // Counted down last, after the result has been recorded
-  }
-
-  @Override
-  public void onError(Exception e) { // RPC failure: store an error status for the target; it stays in dataNodeLocationMap
-    dataNodeResponseStatusMap.put(
-        targetDataNode.getDataNodeId(),
-        new TSStatus(
-            RpcUtils.getStatus(
-                TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
-                "Construct schema black list error on DataNode: {id="
-                    + targetDataNode.getDataNodeId()
-                    + ", internalEndPoint="
-                    + targetDataNode.getInternalEndPoint()
-                    + "}"
-                    + e.getMessage())));
-    countDownLatch.countDown(); // Must come after the put: a thread released from await() may read the map immediately
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/CreateRegionHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/CreateRegionHandler.java
deleted file mode 100644
index 3cef7ed55a..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/CreateRegionHandler.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.consensus.ConsensusGroupId;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-/** Only use CreateRegionHandler when the LoadManager wants to create Regions */
-public class CreateRegionHandler extends AbstractRetryHandler
-    implements AsyncMethodCallback<TSStatus> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(CreateRegionHandler.class);
-
-  // The index of dataNodeLocations
-  // We use Index instead of DataNodeId because it is possible to send multiple createRegion
-  // requests to the same DataNode
-  private final int index;
-  // Used for Logger
-  private final TConsensusGroupId consensusGroupId;
-
-  public CreateRegionHandler(
-      CountDownLatch latch,
-      DataNodeRequestType requestType,
-      TConsensusGroupId consensusGroupId,
-      TDataNodeLocation targetDataNode,
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap,
-      int index) {
-    super(latch, requestType, targetDataNode, dataNodeLocationMap);
-    this.consensusGroupId = consensusGroupId;
-    this.index = index;
-  }
-
-  @Override
-  public void onComplete(TSStatus tsStatus) {
-    if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      dataNodeLocationMap.remove(index);
-      LOGGER.info(
-          String.format(
-              "Successfully create %s on DataNode: %s",
-              ConsensusGroupId.formatTConsensusGroupId(consensusGroupId), targetDataNode));
-    } else {
-      LOGGER.error(
-          String.format(
-              "Create %s on DataNode: %s failed, %s",
-              ConsensusGroupId.formatTConsensusGroupId(consensusGroupId),
-              targetDataNode,
-              tsStatus));
-    }
-    countDownLatch.countDown();
-  }
-
-  @Override
-  public void onError(Exception e) {
-    LOGGER.error(
-        String.format(
-            "Create %s on DataNode: %s failed, %s",
-            ConsensusGroupId.formatTConsensusGroupId(consensusGroupId), targetDataNode, e));
-    countDownLatch.countDown();
-  }
-
-  public TConsensusGroupId getConsensusGroupId() {
-    return consensusGroupId;
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DeleteDataForDeleteTimeSeriesHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DeleteDataForDeleteTimeSeriesHandler.java
deleted file mode 100644
index d72fee586a..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DeleteDataForDeleteTimeSeriesHandler.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-public class DeleteDataForDeleteTimeSeriesHandler extends AbstractRetryHandler
-    implements AsyncMethodCallback<TSStatus> {
-
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(DeleteDataForDeleteTimeSeriesHandler.class);
-
-  private Map<Integer, TSStatus> dataNodeResponseStatusMap;
-
-  public DeleteDataForDeleteTimeSeriesHandler(
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap,
-      Map<Integer, TSStatus> dataNodeResponseStatusMap) {
-    super(DataNodeRequestType.DELETE_DATA_FOR_DELETE_TIMESERIES, dataNodeLocationMap);
-    this.dataNodeResponseStatusMap = dataNodeResponseStatusMap;
-  }
-
-  public DeleteDataForDeleteTimeSeriesHandler(
-      CountDownLatch countDownLatch,
-      TDataNodeLocation targetDataNode,
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
-    super(
-        countDownLatch,
-        DataNodeRequestType.DELETE_DATA_FOR_DELETE_TIMESERIES,
-        targetDataNode,
-        dataNodeLocationMap);
-    this.dataNodeResponseStatusMap = new ConcurrentHashMap<>();
-  }
-
-  public Map<Integer, TSStatus> getDataNodeResponseStatusMap() {
-    return dataNodeResponseStatusMap;
-  }
-
-  @Override
-  public void onComplete(TSStatus tsStatus) {
-    dataNodeResponseStatusMap.put(targetDataNode.getDataNodeId(), tsStatus);
-    if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
-      LOGGER.info("Successfully delete data for delete timeseries on DataNode: {}", targetDataNode);
-    } else if (tsStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
-      LOGGER.error(
-          "Failed to delete data for delete timeseries on DataNode {}, {}",
-          targetDataNode,
-          tsStatus);
-    } else {
-      LOGGER.error(
-          "Failed to delete data for delete timeseries on DataNode {}, {}",
-          targetDataNode,
-          tsStatus);
-    }
-    countDownLatch.countDown();
-  }
-
-  @Override
-  public void onError(Exception e) {
-    countDownLatch.countDown();
-    dataNodeResponseStatusMap.put(
-        targetDataNode.dataNodeId,
-        new TSStatus(
-            RpcUtils.getStatus(
-                TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
-                "Delete data for delete timeseries error on DataNode: {id="
-                    + targetDataNode.getDataNodeId()
-                    + ", internalEndPoint="
-                    + targetDataNode.getInternalEndPoint()
-                    + "}"
-                    + e.getMessage())));
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DeleteTimeSeriesHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DeleteTimeSeriesHandler.java
deleted file mode 100644
index e7c0cea47b..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DeleteTimeSeriesHandler.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-public class DeleteTimeSeriesHandler extends AbstractRetryHandler
-    implements AsyncMethodCallback<TSStatus> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(DeleteTimeSeriesHandler.class);
-
-  private Map<Integer, TSStatus> dataNodeResponseStatusMap;
-
-  public DeleteTimeSeriesHandler(
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap,
-      Map<Integer, TSStatus> dataNodeResponseStatusMap) {
-    super(DataNodeRequestType.DELETE_TIMESERIES, dataNodeLocationMap);
-    this.dataNodeResponseStatusMap = dataNodeResponseStatusMap;
-  }
-
-  public DeleteTimeSeriesHandler(
-      CountDownLatch countDownLatch,
-      TDataNodeLocation targetDataNode,
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
-    super(
-        countDownLatch, DataNodeRequestType.DELETE_TIMESERIES, targetDataNode, dataNodeLocationMap);
-    dataNodeResponseStatusMap = new ConcurrentHashMap<>();
-  }
-
-  public Map<Integer, TSStatus> getDataNodeResponseStatusMap() {
-    return dataNodeResponseStatusMap;
-  }
-
-  @Override
-  public void onComplete(TSStatus tsStatus) {
-    dataNodeResponseStatusMap.put(targetDataNode.getDataNodeId(), tsStatus);
-    if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
-      LOGGER.info("Successfully delete timeseries on DataNode: {}", targetDataNode);
-    } else if (tsStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
-      LOGGER.error("Failed to delete timeseries on DataNode {}, {}", targetDataNode, tsStatus);
-    } else {
-      LOGGER.error("Failed to delete timeseries on DataNode {}, {}", targetDataNode, tsStatus);
-    }
-    countDownLatch.countDown();
-  }
-
-  @Override
-  public void onError(Exception e) {
-    countDownLatch.countDown();
-    dataNodeResponseStatusMap.put(
-        targetDataNode.dataNodeId,
-        new TSStatus(
-            RpcUtils.getStatus(
-                TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
-                "Delete timeseries error on DataNode: {id="
-                    + targetDataNode.getDataNodeId()
-                    + ", internalEndPoint="
-                    + targetDataNode.getInternalEndPoint()
-                    + "}"
-                    + e.getMessage())));
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FunctionManagementHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FunctionManagementHandler.java
deleted file mode 100644
index e48545a8ae..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FunctionManagementHandler.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-public class FunctionManagementHandler extends AbstractRetryHandler
-    implements AsyncMethodCallback<TSStatus> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(FunctionManagementHandler.class);
-
-  private final List<TSStatus> dataNodeResponseStatus;
-
-  public FunctionManagementHandler(
-      CountDownLatch countDownLatch,
-      DataNodeRequestType requestType,
-      TDataNodeLocation targetDataNode,
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap,
-      List<TSStatus> dataNodeResponseStatus) {
-    super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
-    this.dataNodeResponseStatus = dataNodeResponseStatus;
-  }
-
-  @Override
-  public void onComplete(TSStatus response) {
-    if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      dataNodeResponseStatus.add(response);
-      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
-      LOGGER.info("Successfully {} on DataNode: {}", dataNodeRequestType, targetDataNode);
-    } else {
-      LOGGER.info("Failed to {} on DataNode: {}", dataNodeRequestType, targetDataNode);
-    }
-    countDownLatch.countDown();
-  }
-
-  @Override
-  public void onError(Exception exception) {
-    dataNodeResponseStatus.add(
-        new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
-            .setMessage(targetDataNode + exception.getMessage()));
-    LOGGER.info("Failed to {} on DataNode: {}", dataNodeRequestType, targetDataNode);
-    countDownLatch.countDown();
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/InvalidateMatchedSchemaCacheHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/InvalidateMatchedSchemaCacheHandler.java
deleted file mode 100644
index b620560fad..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/InvalidateMatchedSchemaCacheHandler.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-public class InvalidateMatchedSchemaCacheHandler extends AbstractRetryHandler
-    implements AsyncMethodCallback<TSStatus> {
-
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(InvalidateMatchedSchemaCacheHandler.class);
-
-  private Map<Integer, TSStatus> dataNodeResponseStatusMap;
-
-  public InvalidateMatchedSchemaCacheHandler(
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap,
-      Map<Integer, TSStatus> dataNodeResponseStatusMap) {
-    super(DataNodeRequestType.INVALIDATE_MATCHED_SCHEMA_CACHE, dataNodeLocationMap);
-    this.dataNodeResponseStatusMap = dataNodeResponseStatusMap;
-  }
-
-  public InvalidateMatchedSchemaCacheHandler(
-      CountDownLatch countDownLatch,
-      TDataNodeLocation targetDataNode,
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
-    super(
-        countDownLatch,
-        DataNodeRequestType.INVALIDATE_MATCHED_SCHEMA_CACHE,
-        targetDataNode,
-        dataNodeLocationMap);
-    this.dataNodeResponseStatusMap = new ConcurrentHashMap<>();
-  }
-
-  public Map<Integer, TSStatus> getDataNodeResponseStatusMap() {
-    return dataNodeResponseStatusMap;
-  }
-
-  @Override
-  public void onComplete(TSStatus tsStatus) {
-    dataNodeResponseStatusMap.put(targetDataNode.getDataNodeId(), tsStatus);
-    if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
-      LOGGER.info("Successfully invalidate matched schema cache on DataNode: {}", targetDataNode);
-    } else {
-      LOGGER.error(
-          "Failed to invalidate matched schema cache on DataNode {}, {}", targetDataNode, tsStatus);
-    }
-    countDownLatch.countDown();
-  }
-
-  @Override
-  public void onError(Exception e) {
-    countDownLatch.countDown();
-    dataNodeResponseStatusMap.put(
-        targetDataNode.getDataNodeId(),
-        new TSStatus(
-            RpcUtils.getStatus(
-                TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
-                "Invalidate matched schema cache error on DataNode: {id="
-                    + targetDataNode.getDataNodeId()
-                    + ", internalEndPoint="
-                    + targetDataNode.getInternalEndPoint()
-                    + "}"
-                    + e.getMessage())));
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/LoadConfigurationHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/LoadConfigurationHandler.java
deleted file mode 100644
index 198ce3f5f6..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/LoadConfigurationHandler.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-public class LoadConfigurationHandler extends AbstractRetryHandler
-    implements AsyncMethodCallback<TSStatus> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(LoadConfigurationHandler.class);
-
-  private final List<TSStatus> dataNodeResponseStatus;
-
-  public LoadConfigurationHandler(
-      CountDownLatch countDownLatch,
-      DataNodeRequestType requestType,
-      TDataNodeLocation targetDataNode,
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap,
-      List<TSStatus> dataNodeResponseStatus) {
-    super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
-    this.dataNodeResponseStatus = dataNodeResponseStatus;
-  }
-
-  @Override
-  public void onComplete(TSStatus response) {
-    if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      dataNodeResponseStatus.add(response);
-      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
-      LOGGER.info("Successfully Load Configuration on DataNode: {}", targetDataNode);
-    } else {
-      dataNodeResponseStatus.add(response);
-      LOGGER.error(
-          "Failed to Load Configuration on DataNode {}, {}",
-          dataNodeLocationMap.get(targetDataNode.getDataNodeId()),
-          response);
-    }
-    countDownLatch.countDown();
-  }
-
-  @Override
-  public void onError(Exception exception) {
-    countDownLatch.countDown();
-    dataNodeResponseStatus.add(
-        new TSStatus(
-            RpcUtils.getStatus(
-                TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
-                "Load Configuration error on DataNode: {id="
-                    + targetDataNode.getDataNodeId()
-                    + ", internalEndPoint="
-                    + targetDataNode.getInternalEndPoint()
-                    + "}"
-                    + exception.getMessage())));
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/MergeHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/MergeHandler.java
deleted file mode 100644
index 5ed81310de..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/MergeHandler.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-public class MergeHandler extends AbstractRetryHandler implements AsyncMethodCallback<TSStatus> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(MergeHandler.class);
-
-  private final List<TSStatus> dataNodeResponseStatus;
-
-  public MergeHandler(
-      CountDownLatch countDownLatch,
-      DataNodeRequestType dataNodeRequestType,
-      TDataNodeLocation targetDataNode,
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap,
-      List<TSStatus> dataNodeResponseStatus) {
-    super(countDownLatch, dataNodeRequestType, targetDataNode, dataNodeLocationMap);
-    this.dataNodeResponseStatus = dataNodeResponseStatus;
-  }
-
-  @Override
-  public void onComplete(TSStatus response) {
-    if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      dataNodeResponseStatus.add(response);
-      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
-      LOGGER.info("Successfully {} on DataNode: {}", dataNodeRequestType, targetDataNode);
-    } else {
-      LOGGER.error(
-          "Failed to {} on DataNode {}, {}",
-          dataNodeRequestType,
-          dataNodeLocationMap.get(targetDataNode.getDataNodeId()),
-          response);
-    }
-    countDownLatch.countDown();
-  }
-
-  @Override
-  public void onError(Exception exception) {
-    countDownLatch.countDown();
-    dataNodeResponseStatus.add(
-        new TSStatus(
-            RpcUtils.getStatus(
-                TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
-                dataNodeRequestType
-                    + " error on DataNode: {id="
-                    + targetDataNode.getDataNodeId()
-                    + ", internalEndPoint="
-                    + targetDataNode.getInternalEndPoint()
-                    + "}"
-                    + exception.getMessage())));
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/RollbackSchemaBlackListHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/RollbackSchemaBlackListHandler.java
deleted file mode 100644
index c1ea9b08f1..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/RollbackSchemaBlackListHandler.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-public class RollbackSchemaBlackListHandler extends AbstractRetryHandler
-    implements AsyncMethodCallback<TSStatus> {
-
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(RollbackSchemaBlackListHandler.class);
-
-  private Map<Integer, TSStatus> dataNodeResponseStatusMap;
-
-  public RollbackSchemaBlackListHandler(
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap,
-      Map<Integer, TSStatus> dataNodeResponseStatusMap) {
-    super(DataNodeRequestType.ROLLBACK_SCHEMA_BLACK_LIST, dataNodeLocationMap);
-    this.dataNodeResponseStatusMap = dataNodeResponseStatusMap;
-  }
-
-  public RollbackSchemaBlackListHandler(
-      CountDownLatch countDownLatch,
-      TDataNodeLocation targetDataNode,
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
-    super(
-        countDownLatch,
-        DataNodeRequestType.ROLLBACK_SCHEMA_BLACK_LIST,
-        targetDataNode,
-        dataNodeLocationMap);
-    this.dataNodeResponseStatusMap = new ConcurrentHashMap<>();
-  }
-
-  public Map<Integer, TSStatus> getDataNodeResponseStatusMap() {
-    return dataNodeResponseStatusMap;
-  }
-
-  @Override
-  public void onComplete(TSStatus tsStatus) {
-    dataNodeResponseStatusMap.put(targetDataNode.getDataNodeId(), tsStatus);
-    if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
-      LOGGER.info("Successfully rollback schema black list on DataNode: {}", targetDataNode);
-    } else if (tsStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
-      LOGGER.error(
-          "Failed to rollback schema black list on DataNode {}, {}", targetDataNode, tsStatus);
-    } else {
-      LOGGER.error(
-          "Failed to rollback schema black list on DataNode {}, {}", targetDataNode, tsStatus);
-    }
-    countDownLatch.countDown();
-  }
-
-  @Override
-  public void onError(Exception e) {
-    countDownLatch.countDown();
-    dataNodeResponseStatusMap.put(
-        targetDataNode.getDataNodeId(),
-        new TSStatus(
-            RpcUtils.getStatus(
-                TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
-                "Rollback schema black list error on DataNode: {id="
-                    + targetDataNode.getDataNodeId()
-                    + ", internalEndPoint="
-                    + targetDataNode.getInternalEndPoint()
-                    + "}"
-                    + e.getMessage())));
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetSystemStatusHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetSystemStatusHandler.java
deleted file mode 100644
index 92f00aa766..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetSystemStatusHandler.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-public class SetSystemStatusHandler extends AbstractRetryHandler
-    implements AsyncMethodCallback<TSStatus> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(SetSystemStatusHandler.class);
-
-  private final List<TSStatus> dataNodeResponseStatus;
-
-  public SetSystemStatusHandler(
-      CountDownLatch countDownLatch,
-      DataNodeRequestType requestType,
-      TDataNodeLocation targetDataNode,
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap,
-      List<TSStatus> dataNodeResponseStatus) {
-    super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
-    this.dataNodeResponseStatus = dataNodeResponseStatus;
-  }
-
-  @Override
-  public void onComplete(TSStatus response) {
-    if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      dataNodeResponseStatus.add(response);
-      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
-      LOGGER.info("Successfully Set System Status on DataNode: {}", targetDataNode);
-    } else {
-      dataNodeResponseStatus.add(response);
-      LOGGER.error(
-          "Failed to Set System Status on DataNode {}, {}",
-          dataNodeLocationMap.get(targetDataNode.getDataNodeId()),
-          response);
-    }
-    countDownLatch.countDown();
-  }
-
-  @Override
-  public void onError(Exception exception) {
-    countDownLatch.countDown();
-    dataNodeResponseStatus.add(
-        new TSStatus(
-            RpcUtils.getStatus(
-                TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
-                "Set System Status error on DataNode: {id="
-                    + targetDataNode.getDataNodeId()
-                    + ", internalEndPoint="
-                    + targetDataNode.getInternalEndPoint()
-                    + "}"
-                    + exception.getMessage())));
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetTTLHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetTTLHandler.java
deleted file mode 100644
index 6792a49e7f..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetTTLHandler.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-public class SetTTLHandler extends AbstractRetryHandler implements AsyncMethodCallback<TSStatus> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(SetTTLHandler.class);
-
-  public SetTTLHandler(
-      CountDownLatch countDownLatch,
-      DataNodeRequestType requestType,
-      TDataNodeLocation targetDataNode,
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
-    super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
-  }
-
-  @Override
-  public void onComplete(TSStatus response) {
-    if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
-      LOGGER.info("Successfully SetTTL on DataNode: {}", targetDataNode);
-    } else {
-      LOGGER.error("Failed to SetTTL on DataNode: {}, {}", targetDataNode, response);
-    }
-    countDownLatch.countDown();
-  }
-
-  @Override
-  public void onError(Exception e) {
-    countDownLatch.countDown();
-    LOGGER.error("Failed to SetTTL on DataNode: {}", targetDataNode);
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/TriggerManagementHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/TriggerManagementHandler.java
deleted file mode 100644
index 00d8047bdf..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/TriggerManagementHandler.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-public class TriggerManagementHandler extends AbstractRetryHandler
-    implements AsyncMethodCallback<TSStatus> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(TriggerManagementHandler.class);
-
-  private final List<TSStatus> dataNodeResponseStatus;
-
-  public TriggerManagementHandler(
-      CountDownLatch countDownLatch,
-      DataNodeRequestType requestType,
-      TDataNodeLocation targetDataNode,
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap,
-      List<TSStatus> dataNodeResponseStatus) {
-    super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
-    this.dataNodeResponseStatus = dataNodeResponseStatus;
-  }
-
-  @Override
-  public void onComplete(TSStatus response) {
-    dataNodeResponseStatus.add(response);
-    if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
-      LOGGER.info("Successfully {} on DataNode: {}", dataNodeRequestType, targetDataNode);
-    } else {
-      LOGGER.info("Failed to {} on DataNode: {}", dataNodeRequestType, targetDataNode);
-    }
-    countDownLatch.countDown();
-  }
-
-  @Override
-  public void onError(Exception exception) {
-    dataNodeResponseStatus.add(
-        new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
-            .setMessage(targetDataNode + exception.getMessage()));
-    LOGGER.info("Failed to {} on DataNode: {}", dataNodeRequestType, targetDataNode);
-    countDownLatch.countDown();
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateConfigNodeGroupHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateConfigNodeGroupHandler.java
deleted file mode 100644
index e4cf71b4b1..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateConfigNodeGroupHandler.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-public class UpdateConfigNodeGroupHandler extends AbstractRetryHandler
-    implements AsyncMethodCallback<TSStatus> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(UpdateConfigNodeGroupHandler.class);
-
-  public UpdateConfigNodeGroupHandler(
-      CountDownLatch countDownLatch,
-      DataNodeRequestType requestType,
-      TDataNodeLocation targetDataNode,
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
-    super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
-  }
-
-  @Override
-  public void onComplete(TSStatus status) {
-    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
-      LOGGER.info(
-          "Successfully broadCast the latest configNodeGroup on DataNode: {}", targetDataNode);
-    } else {
-      LOGGER.error(
-          "Failed to broadCast the latest configNodeGroup on DataNode: {}, {}",
-          targetDataNode,
-          status);
-    }
-    countDownLatch.countDown();
-  }
-
-  @Override
-  public void onError(Exception e) {
-    LOGGER.error("BroadCast the latest configNodeGroup on DataNode: {} failed", targetDataNode);
-    countDownLatch.countDown();
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java
deleted file mode 100644
index 30a88953c1..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-public class UpdateRegionRouteMapHandler extends AbstractRetryHandler
-    implements AsyncMethodCallback<TSStatus> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(UpdateRegionRouteMapHandler.class);
-
-  public UpdateRegionRouteMapHandler(
-      CountDownLatch countDownLatch,
-      DataNodeRequestType requestType,
-      TDataNodeLocation targetDataNode,
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
-    super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
-  }
-
-  @Override
-  public void onComplete(TSStatus status) {
-    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
-      LOGGER.info(
-          "Successfully update the RegionRouteMap on DataNode: {}", targetDataNode.getDataNodeId());
-    } else {
-      LOGGER.error("Update RegionRouteMap on DataNode: {} failed", targetDataNode.getDataNodeId());
-    }
-    countDownLatch.countDown();
-  }
-
-  @Override
-  public void onError(Exception e) {
-    LOGGER.error("Update RegionRouteMap on DataNode: {} failed", targetDataNode.getDataNodeId());
-    countDownLatch.countDown();
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/ConfigNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
similarity index 95%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/ConfigNodeHeartbeatHandler.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
index 458267568e..62f2ea3676 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/ConfigNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.client.async.handlers;
+package org.apache.iotdb.confignode.client.async.handlers.heartbeat;
 
 import org.apache.iotdb.confignode.manager.node.ConfigNodeHeartbeatCache;
 import org.apache.iotdb.confignode.manager.node.NodeHeartbeatSample;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
similarity index 97%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index ded73deaa2..fc61134e9d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.client.async.handlers;
+package org.apache.iotdb.confignode.client.async.handlers.heartbeat;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/AbstractAsyncRPCHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/AbstractAsyncRPCHandler.java
new file mode 100644
index 0000000000..7fe5d780f9
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/AbstractAsyncRPCHandler.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.client.async.handlers.rpc;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+public abstract class AbstractAsyncRPCHandler<T> implements AsyncMethodCallback<T> {
+
+  // Type of RPC request; the subclasses in this package put it into their log messages
+  protected final DataNodeRequestType requestType;
+  // Index of request; the subclasses use it as the key when they update the two maps below
+  protected final int requestId;
+  // Target DataNode of this request
+  protected final TDataNodeLocation targetDataNode;
+
+  /**
+   * Map key: The indices of asynchronous RPC requests
+   *
+   * <p>Map value: The target DataNodes of corresponding indices
+   *
+   * <p>A handler removes its own entry (keyed by requestId) once its RPC request succeeds; some
+   * subclasses, e.g. DeleteTimeSeriesRPCHandler, also remove it on a MULTIPLE_ERROR response
+   */
+  protected final Map<Integer, TDataNodeLocation> dataNodeLocationMap;
+
+  /**
+   * Map key: The requestId of each asynchronous RPC request (DataNode ID? TODO confirm)
+   *
+   * <p>Map value: The response of corresponding indices
+   *
+   * <p>A handler stores its response here when its RPC request finishes; when the request fails
+   * with an exception, the subclasses store an EXECUTE_STATEMENT_ERROR response instead
+   */
+  protected final Map<Integer, T> responseMap;
+
+  // Every AsyncHandler invokes countDown once its RPC request has finished, successfully or not
+  protected final CountDownLatch countDownLatch;
+  // targetDataNode rendered as {id=..., internalEndPoint=...}, built once for log/error messages
+  protected final String formattedTargetLocation;
+
+  public AbstractAsyncRPCHandler(
+      DataNodeRequestType requestType,
+      int requestId,
+      TDataNodeLocation targetDataNode,
+      Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+      Map<Integer, T> responseMap,
+      CountDownLatch countDownLatch) {
+    this.requestType = requestType;
+    this.requestId = requestId;
+    this.targetDataNode = targetDataNode;
+    this.formattedTargetLocation =
+        "{id="
+            + targetDataNode.getDataNodeId()
+            + ", internalEndPoint="
+            + targetDataNode.getInternalEndPoint()
+            + "}";
+    // Maps and latch are kept by reference, not copied
+    this.dataNodeLocationMap = dataNodeLocationMap;
+    this.responseMap = responseMap;
+    this.countDownLatch = countDownLatch;
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FlushHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/AsyncTSStatusRPCHandler.java
similarity index 54%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FlushHandler.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/AsyncTSStatusRPCHandler.java
index aa0c366ced..ace2eb58aa 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FlushHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/AsyncTSStatusRPCHandler.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.client.async.handlers;
+package org.apache.iotdb.confignode.client.async.handlers.rpc;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -24,57 +24,65 @@ import org.apache.iotdb.confignode.client.DataNodeRequestType;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
-public class FlushHandler extends AbstractRetryHandler implements AsyncMethodCallback<TSStatus> {
+/** General RPC handler for TSStatus response type */
+public class AsyncTSStatusRPCHandler extends AbstractAsyncRPCHandler<TSStatus> {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(FlushHandler.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(AsyncTSStatusRPCHandler.class);
 
-  private final List<TSStatus> dataNodeResponseStatus;
-
-  public FlushHandler(
-      CountDownLatch countDownLatch,
+  public AsyncTSStatusRPCHandler(
       DataNodeRequestType requestType,
+      int requestId,
       TDataNodeLocation targetDataNode,
       Map<Integer, TDataNodeLocation> dataNodeLocationMap,
-      List<TSStatus> dataNodeResponseStatus) {
-    super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
-    this.dataNodeResponseStatus = dataNodeResponseStatus;
+      Map<Integer, TSStatus> responseMap,
+      CountDownLatch countDownLatch) {
+    super(requestType, requestId, targetDataNode, dataNodeLocationMap, responseMap, countDownLatch);
   }
 
   @Override
   public void onComplete(TSStatus response) {
+    // Put response: stored under requestId for every status code, success or not
+    responseMap.put(requestId, response);
+
     if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      dataNodeResponseStatus.add(response);
-      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
-      LOGGER.info("Successfully Flush on DataNode: {}", targetDataNode);
+      // Remove only if success, so a failed request keeps its entry in dataNodeLocationMap
+      dataNodeLocationMap.remove(requestId);
+      LOGGER.info("Successfully {} on DataNode: {}", requestType, formattedTargetLocation);
     } else {
       LOGGER.error(
-          "Failed to Flush on DataNode {}, {}",
-          dataNodeLocationMap.get(targetDataNode.getDataNodeId()),
+          "Failed to {} on DataNode: {}, response: {}",
+          requestType,
+          formattedTargetLocation,
           response);
     }
+
+    // Always CountDown, and only after both maps above have been updated
     countDownLatch.countDown();
   }
 
   @Override
-  public void onError(Exception exception) {
-    countDownLatch.countDown();
-    dataNodeResponseStatus.add(
+  public void onError(Exception e) {
+    String errorMsg =
+        "Failed to "
+            + requestType
+            + " on DataNode: "
+            + formattedTargetLocation
+            + ", exception: "
+            + e.getMessage();
+    LOGGER.error(errorMsg);
+
+    responseMap.put(
+        requestId,
         new TSStatus(
-            RpcUtils.getStatus(
-                TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
-                "Flush error on DataNode: {id="
-                    + targetDataNode.getDataNodeId()
-                    + ", internalEndPoint="
-                    + targetDataNode.getInternalEndPoint()
-                    + "}"
-                    + exception.getMessage())));
+            RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg)));
+
+    // Always CountDown; done last so the error status is in responseMap before waiters resume
+    countDownLatch.countDown();
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DeleteTimeSeriesRPCHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DeleteTimeSeriesRPCHandler.java
new file mode 100644
index 0000000000..c2d9074d6e
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DeleteTimeSeriesRPCHandler.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.client.async.handlers.rpc;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Variant of {@link AsyncTSStatusRPCHandler} that also removes its dataNodeLocationMap entry when
+ * the DataNode answers MULTIPLE_ERROR, not only on success.
+ */
+public class DeleteTimeSeriesRPCHandler extends AsyncTSStatusRPCHandler {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DeleteTimeSeriesRPCHandler.class);
+
+  public DeleteTimeSeriesRPCHandler(
+      DataNodeRequestType requestType,
+      int requestId,
+      TDataNodeLocation targetDataNode,
+      Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+      Map<Integer, TSStatus> responseMap,
+      CountDownLatch countDownLatch) {
+    super(requestType, requestId, targetDataNode, dataNodeLocationMap, responseMap, countDownLatch);
+  }
+
+  @Override
+  public void onComplete(TSStatus tsStatus) {
+    responseMap.put(requestId, tsStatus);
+    LOGGER.info("{} for {} receives: {}", requestType, requestId, tsStatus);
+    if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      dataNodeLocationMap.remove(requestId);
+      LOGGER.info("Successfully {} on DataNode: {}", requestType, formattedTargetLocation);
+    } else if (tsStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+      dataNodeLocationMap.remove(requestId);
+      LOGGER.error(
+          "Failed to {} on DataNode {}, {}", requestType, formattedTargetLocation, tsStatus);
+    } else {
+      LOGGER.error(
+          "Failed to {} on DataNode {}, {}", requestType, formattedTargetLocation, tsStatus);
+    }
+    countDownLatch.countDown();
+  }
+
+  @Override
+  public void onError(Exception e) {
+    String errorMsg =
+        requestType + " error on DataNode: " + formattedTargetLocation + e.getMessage();
+    LOGGER.error(errorMsg);
+
+    // Store the error status before releasing the latch: any thread waiting on countDownLatch
+    // must already find this request's entry in responseMap when it resumes
+    responseMap.put(
+        requestId,
+        new TSStatus(
+            RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg)));
+    countDownLatch.countDown();
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FetchSchemaBlackLsitHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java
similarity index 55%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FetchSchemaBlackLsitHandler.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java
index 9db74c5f59..90c0f90ab6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FetchSchemaBlackLsitHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.client.async.handlers;
+package org.apache.iotdb.confignode.client.async.handlers.rpc;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -26,53 +26,37 @@ import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
-public class FetchSchemaBlackLsitHandler extends AbstractRetryHandler
-    implements AsyncMethodCallback<TFetchSchemaBlackListResp> {
+public class FetchSchemaBlackListRPCHandler
+    extends AbstractAsyncRPCHandler<TFetchSchemaBlackListResp> {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(FetchSchemaBlackLsitHandler.class);
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(FetchSchemaBlackListRPCHandler.class);
 
-  private Map<Integer, TFetchSchemaBlackListResp> dataNodeResponseMap;
-
-  public FetchSchemaBlackLsitHandler(
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap,
-      Map<Integer, TFetchSchemaBlackListResp> dataNodeResponseMap) {
-    super(DataNodeRequestType.FETCH_SCHEMA_BLACK_LIST, dataNodeLocationMap);
-    this.dataNodeResponseMap = dataNodeResponseMap;
-  }
-
-  public FetchSchemaBlackLsitHandler(
-      CountDownLatch countDownLatch,
+  public FetchSchemaBlackListRPCHandler(
+      DataNodeRequestType requestType,
+      int requestId,
       TDataNodeLocation targetDataNode,
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
-    super(
-        countDownLatch,
-        DataNodeRequestType.FETCH_SCHEMA_BLACK_LIST,
-        targetDataNode,
-        dataNodeLocationMap);
-    this.dataNodeResponseMap = new ConcurrentHashMap<>();
-  }
-
-  public Map<Integer, TFetchSchemaBlackListResp> getDataNodeResponseMap() {
-    return dataNodeResponseMap;
+      Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+      Map<Integer, TFetchSchemaBlackListResp> responseMap,
+      CountDownLatch countDownLatch) {
+    super(requestType, requestId, targetDataNode, dataNodeLocationMap, responseMap, countDownLatch);
   }
 
   @Override
   public void onComplete(TFetchSchemaBlackListResp tFetchSchemaBlackListResp) {
     TSStatus tsStatus = tFetchSchemaBlackListResp.getStatus();
-    dataNodeResponseMap.put(targetDataNode.getDataNodeId(), tFetchSchemaBlackListResp);
+    responseMap.put(requestId, tFetchSchemaBlackListResp);
     if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
+      dataNodeLocationMap.remove(requestId);
       LOGGER.info("Successfully fetch schema black list on DataNode: {}", targetDataNode);
     } else if (tsStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
+      dataNodeLocationMap.remove(requestId);
       LOGGER.error(
           "Failed to fetch schema black list on DataNode {}, {}", targetDataNode, tsStatus);
     } else {
@@ -84,18 +68,20 @@ public class FetchSchemaBlackLsitHandler extends AbstractRetryHandler
 
   @Override
   public void onError(Exception e) {
+    String errorMsg =
+        "Fetch schema black list error on DataNode: {id="
+            + targetDataNode.getDataNodeId()
+            + ", internalEndPoint="
+            + targetDataNode.getInternalEndPoint()
+            + "}"
+            + e.getMessage();
+    LOGGER.error(errorMsg);
+
     countDownLatch.countDown();
     TFetchSchemaBlackListResp resp = new TFetchSchemaBlackListResp();
     resp.setStatus(
         new TSStatus(
-            RpcUtils.getStatus(
-                TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
-                "Fetch schema black list error on DataNode: {id="
-                    + targetDataNode.getDataNodeId()
-                    + ", internalEndPoint="
-                    + targetDataNode.getInternalEndPoint()
-                    + "}"
-                    + e.getMessage())));
-    dataNodeResponseMap.put(targetDataNode.getDataNodeId(), resp);
+            RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg)));
+    responseMap.put(requestId, resp);
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/AbstractDataNodeTask.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/AbstractDataNodeTask.java
deleted file mode 100644
index 10671031f0..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/AbstractDataNodeTask.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.task;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public abstract class AbstractDataNodeTask<T> {
-
-  protected Map<Integer, TDataNodeLocation> dataNodeLocationMap;
-
-  protected Map<Integer, T> dataNodeResponseMap = new ConcurrentHashMap<>();
-
-  /** Must provide thread-safe map */
-  public AbstractDataNodeTask(Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
-    this.dataNodeLocationMap = dataNodeLocationMap;
-  }
-
-  public Map<Integer, TDataNodeLocation> getDataNodeLocationMap() {
-    return dataNodeLocationMap;
-  }
-
-  public abstract DataNodeRequestType getDataNodeRequestType();
-
-  /** Provide the handler for single rpc process */
-  public abstract AbstractRetryHandler getSingleRequestHandler();
-
-  /** Get the final responses from all dataNodes */
-  public Map<Integer, T> getDataNodeResponseMap() {
-    return dataNodeResponseMap;
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/ConstructSchemaBlackListDataNodeTask.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/ConstructSchemaBlackListDataNodeTask.java
deleted file mode 100644
index 9e12a1b61b..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/ConstructSchemaBlackListDataNodeTask.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.task;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
-import org.apache.iotdb.confignode.client.async.handlers.ConstructSchemaBlackListHandler;
-
-import java.util.Map;
-
-public class ConstructSchemaBlackListDataNodeTask extends AbstractDataNodeTask<TSStatus> {
-
-  public ConstructSchemaBlackListDataNodeTask(Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
-    super(dataNodeLocationMap);
-  }
-
-  @Override
-  public DataNodeRequestType getDataNodeRequestType() {
-    return DataNodeRequestType.CONSTRUCT_SCHEMA_BLACK_LIST;
-  }
-
-  @Override
-  public AbstractRetryHandler getSingleRequestHandler() {
-    return new ConstructSchemaBlackListHandler(dataNodeLocationMap, dataNodeResponseMap);
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/DeleteDataForDeleteTimeSeriesDataNodeTask.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/DeleteDataForDeleteTimeSeriesDataNodeTask.java
deleted file mode 100644
index 089068aa51..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/DeleteDataForDeleteTimeSeriesDataNodeTask.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.task;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
-import org.apache.iotdb.confignode.client.async.handlers.DeleteDataForDeleteTimeSeriesHandler;
-
-import java.util.Map;
-
-public class DeleteDataForDeleteTimeSeriesDataNodeTask extends AbstractDataNodeTask<TSStatus> {
-
-  public DeleteDataForDeleteTimeSeriesDataNodeTask(
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
-    super(dataNodeLocationMap);
-  }
-
-  @Override
-  public DataNodeRequestType getDataNodeRequestType() {
-    return DataNodeRequestType.DELETE_DATA_FOR_DELETE_TIMESERIES;
-  }
-
-  @Override
-  public AbstractRetryHandler getSingleRequestHandler() {
-    return new DeleteDataForDeleteTimeSeriesHandler(dataNodeLocationMap, dataNodeResponseMap);
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/DeleteTimeSeriesDataNodeTask.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/DeleteTimeSeriesDataNodeTask.java
deleted file mode 100644
index caf67790c2..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/DeleteTimeSeriesDataNodeTask.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.task;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
-import org.apache.iotdb.confignode.client.async.handlers.DeleteTimeSeriesHandler;
-
-import java.util.Map;
-
-public class DeleteTimeSeriesDataNodeTask extends AbstractDataNodeTask<TSStatus> {
-
-  public DeleteTimeSeriesDataNodeTask(Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
-    super(dataNodeLocationMap);
-  }
-
-  @Override
-  public DataNodeRequestType getDataNodeRequestType() {
-    return DataNodeRequestType.DELETE_TIMESERIES;
-  }
-
-  @Override
-  public AbstractRetryHandler getSingleRequestHandler() {
-    return new DeleteTimeSeriesHandler(dataNodeLocationMap, dataNodeResponseMap);
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/FetchSchemaBlackListDataNodeTask.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/FetchSchemaBlackListDataNodeTask.java
deleted file mode 100644
index 8545b7dd5c..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/FetchSchemaBlackListDataNodeTask.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.task;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
-import org.apache.iotdb.confignode.client.async.handlers.FetchSchemaBlackLsitHandler;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
-
-import java.util.Map;
-
-public class FetchSchemaBlackListDataNodeTask
-    extends AbstractDataNodeTask<TFetchSchemaBlackListResp> {
-
-  public FetchSchemaBlackListDataNodeTask(Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
-    super(dataNodeLocationMap);
-  }
-
-  @Override
-  public DataNodeRequestType getDataNodeRequestType() {
-    return DataNodeRequestType.FETCH_SCHEMA_BLACK_LIST;
-  }
-
-  @Override
-  public AbstractRetryHandler getSingleRequestHandler() {
-    return new FetchSchemaBlackLsitHandler(dataNodeLocationMap, dataNodeResponseMap);
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/InvalidateMatchedSchemaCacheDataNodeTask.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/InvalidateMatchedSchemaCacheDataNodeTask.java
deleted file mode 100644
index 2a08fdcb99..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/InvalidateMatchedSchemaCacheDataNodeTask.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.task;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
-import org.apache.iotdb.confignode.client.async.handlers.InvalidateMatchedSchemaCacheHandler;
-
-import java.util.Map;
-
-public class InvalidateMatchedSchemaCacheDataNodeTask extends AbstractDataNodeTask<TSStatus> {
-
-  public InvalidateMatchedSchemaCacheDataNodeTask(
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
-    super(dataNodeLocationMap);
-  }
-
-  @Override
-  public DataNodeRequestType getDataNodeRequestType() {
-    return DataNodeRequestType.INVALIDATE_MATCHED_SCHEMA_CACHE;
-  }
-
-  @Override
-  public AbstractRetryHandler getSingleRequestHandler() {
-    return new InvalidateMatchedSchemaCacheHandler(dataNodeLocationMap, dataNodeResponseMap);
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/RollbackSchemaBlackListDataNodeTask.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/RollbackSchemaBlackListDataNodeTask.java
deleted file mode 100644
index 3a24ae7898..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/RollbackSchemaBlackListDataNodeTask.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.task;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
-import org.apache.iotdb.confignode.client.async.handlers.RollbackSchemaBlackListHandler;
-
-import java.util.Map;
-
-public class RollbackSchemaBlackListDataNodeTask extends AbstractDataNodeTask<TSStatus> {
-
-  public RollbackSchemaBlackListDataNodeTask(Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
-    super(dataNodeLocationMap);
-  }
-
-  @Override
-  public DataNodeRequestType getDataNodeRequestType() {
-    return DataNodeRequestType.ROLLBACK_SCHEMA_BLACK_LIST;
-  }
-
-  @Override
-  public AbstractRetryHandler getSingleRequestHandler() {
-    return new RollbackSchemaBlackListHandler(dataNodeLocationMap, dataNodeResponseMap);
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/confignode/SyncConfigNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
similarity index 98%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/sync/confignode/SyncConfigNodeClientPool.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
index de74066c1b..557a00f55f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/confignode/SyncConfigNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.client.sync.confignode;
+package org.apache.iotdb.confignode.client.sync;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
similarity index 99%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
index 41a8d2d914..d50210bb4d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.client.sync.datanode;
+package org.apache.iotdb.confignode.client.sync;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java
index d47a3f7424..6b9031c30a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.BadNodeUrlException;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
-import org.apache.iotdb.confignode.client.sync.confignode.SyncConfigNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index 257a68d1ca..7627405067 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -28,8 +28,9 @@ import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.read.CountStorageGroupPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
@@ -216,37 +217,39 @@ public class ClusterSchemaManager {
           TSStatusCode.STORAGE_GROUP_NOT_EXIST,
           "Path [" + new PartialPath(setTTLPlan.getStorageGroupPathPattern()) + "] does not exist");
     }
-    Map<Integer, TDataNodeLocation> dataNodeLocationMaps = new ConcurrentHashMap<>();
+
+    // Map<DataNodeId, TDataNodeLocation>
+    Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
+    // Map<DataNodeId, StorageGroupPatterns>
     Map<Integer, List<String>> dnlToSgMap = new ConcurrentHashMap<>();
     for (String storageGroup : storageSchemaMap.keySet()) {
-
+      // Get related DataNodes
       Set<TDataNodeLocation> dataNodeLocations =
           getPartitionManager()
               .getStorageGroupRelatedDataNodes(storageGroup, TConsensusGroupType.DataRegion);
+
       for (TDataNodeLocation dataNodeLocation : dataNodeLocations) {
-        if (!dataNodeLocationMaps.containsKey(dataNodeLocation.getDataNodeId())) {
-          dataNodeLocationMaps.put(dataNodeLocation.getDataNodeId(), dataNodeLocation);
-          List<String> storageGroups = new ArrayList<>();
-          storageGroups.add(storageGroup);
-          dnlToSgMap.put(dataNodeLocation.getDataNodeId(), storageGroups);
-        } else {
-          List<String> storageGroups = dnlToSgMap.get(dataNodeLocation.getDataNodeId());
-          storageGroups.add(storageGroup);
-          dnlToSgMap.put(dataNodeLocation.getDataNodeId(), storageGroups);
-        }
+        dataNodeLocationMap.putIfAbsent(dataNodeLocation.getDataNodeId(), dataNodeLocation);
+        dnlToSgMap
+            .computeIfAbsent(dataNodeLocation.getDataNodeId(), empty -> new ArrayList<>())
+            .add(storageGroup);
       }
     }
 
-    for (Map.Entry<Integer, List<String>> entry : dnlToSgMap.entrySet()) {
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
-      dataNodeLocationMap.put(entry.getKey(), dataNodeLocationMaps.get(entry.getKey()));
-      AsyncDataNodeClientPool.getInstance()
-          .sendAsyncRequestToDataNodeWithRetry(
-              new TSetTTLReq(entry.getValue(), setTTLPlan.getTTL()),
-              dataNodeLocationMap,
-              DataNodeRequestType.SET_TTL,
-              null);
-    }
+    AsyncClientHandler<TSetTTLReq, TSStatus> clientHandler =
+        new AsyncClientHandler<>(DataNodeRequestType.SET_TTL);
+    dnlToSgMap
+        .keySet()
+        .forEach(
+            dataNodeId -> {
+              TSetTTLReq setTTLReq =
+                  new TSetTTLReq(dnlToSgMap.get(dataNodeId), setTTLPlan.getTTL());
+              clientHandler.putRequest(dataNodeId, setTTLReq);
+              clientHandler.putDataNodeLocation(dataNodeId, dataNodeLocationMap.get(dataNodeId));
+            });
+    // TODO: Check response
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+
     return getConsensusManager().write(setTTLPlan).getStatus();
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
index a1e2ca29ef..150d7175c3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.confignode.manager;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
 import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
 import org.apache.iotdb.confignode.consensus.response.PermissionInfoResp;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 656e5880f6..e57b16e56f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.confignode.manager;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.path.PathPatternTree;
@@ -34,15 +35,15 @@ import org.apache.iotdb.confignode.persistence.ProcedureInfo;
 import org.apache.iotdb.confignode.procedure.Procedure;
 import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
-import org.apache.iotdb.confignode.procedure.impl.AddConfigNodeProcedure;
-import org.apache.iotdb.confignode.procedure.impl.CreateRegionGroupsProcedure;
 import org.apache.iotdb.confignode.procedure.impl.CreateTriggerProcedure;
-import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
-import org.apache.iotdb.confignode.procedure.impl.DeleteTimeSeriesProcedure;
 import org.apache.iotdb.confignode.procedure.impl.DropTriggerProcedure;
-import org.apache.iotdb.confignode.procedure.impl.RegionMigrateProcedure;
-import org.apache.iotdb.confignode.procedure.impl.RemoveConfigNodeProcedure;
-import org.apache.iotdb.confignode.procedure.impl.RemoveDataNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.CreateRegionGroupsProcedure;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.DeleteStorageGroupProcedure;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.DeleteTimeSeriesProcedure;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure;
 import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
 import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
 import org.apache.iotdb.confignode.procedure.store.ConfigProcedureStore;
@@ -212,9 +213,11 @@ public class ProcedureManager {
    *
    * @return SUCCESS_STATUS if all RegionGroups created successfully, CREATE_REGION_ERROR otherwise
    */
-  public TSStatus createRegionGroups(CreateRegionGroupsPlan createRegionGroupsPlan) {
+  public TSStatus createRegionGroups(
+      TConsensusGroupType consensusGroupType, CreateRegionGroupsPlan createRegionGroupsPlan) {
     long procedureId =
-        executor.submitProcedure(new CreateRegionGroupsProcedure(createRegionGroupsPlan));
+        executor.submitProcedure(
+            new CreateRegionGroupsProcedure(consensusGroupType, createRegionGroupsPlan));
     List<TSStatus> statusList = new ArrayList<>();
     boolean isSucceed =
         waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
index 0309ace63a..9a380741ca 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
@@ -22,7 +22,8 @@ package org.apache.iotdb.confignode.manager;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
 import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionPlan;
 import org.apache.iotdb.confignode.consensus.request.write.DropFunctionPlan;
 import org.apache.iotdb.confignode.persistence.UDFInfo;
@@ -34,8 +35,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -81,17 +80,13 @@ public class UDFManager {
       String functionName, String className, List<String> uris) {
     final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         configManager.getNodeManager().getRegisteredDataNodeLocations();
-    final List<TSStatus> dataNodeResponseStatus =
-        Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
     final TCreateFunctionRequest request =
         new TCreateFunctionRequest(functionName, className, uris);
-    AsyncDataNodeClientPool.getInstance()
-        .sendAsyncRequestToDataNodeWithRetry(
-            request,
-            dataNodeLocationMap,
-            DataNodeRequestType.CREATE_FUNCTION,
-            dataNodeResponseStatus);
-    return dataNodeResponseStatus;
+
+    AsyncClientHandler<TCreateFunctionRequest, TSStatus> clientHandler =
+        new AsyncClientHandler<>(DataNodeRequestType.CREATE_FUNCTION, request, dataNodeLocationMap);
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    return clientHandler.getResponseList();
   }
 
   public TSStatus dropFunction(String functionName) {
@@ -113,15 +108,11 @@ public class UDFManager {
   private List<TSStatus> dropFunctionOnDataNodes(String functionName) {
     final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         configManager.getNodeManager().getRegisteredDataNodeLocations();
-    final List<TSStatus> dataNodeResponseStatus =
-        Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
     final TDropFunctionRequest request = new TDropFunctionRequest(functionName);
-    AsyncDataNodeClientPool.getInstance()
-        .sendAsyncRequestToDataNodeWithRetry(
-            request,
-            dataNodeLocationMap,
-            DataNodeRequestType.DROP_FUNCTION,
-            dataNodeResponseStatus);
-    return dataNodeResponseStatus;
+
+    AsyncClientHandler<TDropFunctionRequest, TSStatus> clientHandler =
+        new AsyncClientHandler<>(DataNodeRequestType.DROP_FUNCTION, request, dataNodeLocationMap);
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    return clientHandler.getResponseList();
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 7f01dcde96..902c77be99 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.cluster.NodeStatus;
@@ -30,7 +31,8 @@ import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
@@ -248,12 +250,13 @@ public class LoadManager {
     LOGGER.info("[latestRegionRouteMap] Begin to broadcast RegionRouteMap:");
     long broadcastTime = System.currentTimeMillis();
     printRegionRouteMap(broadcastTime, latestRegionRouteMap);
-    AsyncDataNodeClientPool.getInstance()
-        .sendAsyncRequestToDataNodeWithRetry(
-            new TRegionRouteReq(broadcastTime, latestRegionRouteMap),
-            dataNodeLocationMap,
+
+    AsyncClientHandler<TRegionRouteReq, TSStatus> clientHandler =
+        new AsyncClientHandler<>(
             DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
-            null);
+            new TRegionRouteReq(broadcastTime, latestRegionRouteMap),
+            dataNodeLocationMap);
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
     LOGGER.info("[latestRegionRouteMap] Broadcast the latest RegionRouteMap finished.");
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index f997298e3d..df31b8c922 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -33,12 +33,13 @@ import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.confignode.AsyncConfigNodeHeartbeatClientPool;
-import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeHeartbeatClientPool;
-import org.apache.iotdb.confignode.client.async.handlers.ConfigNodeHeartbeatHandler;
-import org.apache.iotdb.confignode.client.async.handlers.DataNodeHeartbeatHandler;
-import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeHeartbeatClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
+import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeConfigurationPlan;
@@ -74,7 +75,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -463,62 +463,47 @@ public class NodeManager {
   public List<TSStatus> merge() {
     Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         configManager.getNodeManager().getRegisteredDataNodeLocations();
-    List<TSStatus> dataNodeResponseStatus =
-        Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
-    AsyncDataNodeClientPool.getInstance()
-        .sendAsyncRequestToDataNodeWithRetry(
-            null, dataNodeLocationMap, DataNodeRequestType.MERGE, dataNodeResponseStatus);
-    return dataNodeResponseStatus;
+    AsyncClientHandler<Object, TSStatus> clientHandler =
+        new AsyncClientHandler<>(DataNodeRequestType.MERGE, dataNodeLocationMap);
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    return clientHandler.getResponseList();
   }
 
   public List<TSStatus> flush(TFlushReq req) {
     Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         configManager.getNodeManager().getRegisteredDataNodeLocations();
-    List<TSStatus> dataNodeResponseStatus =
-        Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
-    AsyncDataNodeClientPool.getInstance()
-        .sendAsyncRequestToDataNodeWithRetry(
-            req, dataNodeLocationMap, DataNodeRequestType.FLUSH, dataNodeResponseStatus);
-    return dataNodeResponseStatus;
+    AsyncClientHandler<TFlushReq, TSStatus> clientHandler =
+        new AsyncClientHandler<>(DataNodeRequestType.FLUSH, req, dataNodeLocationMap);
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    return clientHandler.getResponseList();
   }
 
   public List<TSStatus> clearCache() {
     Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         configManager.getNodeManager().getRegisteredDataNodeLocations();
-    List<TSStatus> dataNodeResponseStatus =
-        Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
-    AsyncDataNodeClientPool.getInstance()
-        .sendAsyncRequestToDataNodeWithRetry(
-            null, dataNodeLocationMap, DataNodeRequestType.CLEAR_CACHE, dataNodeResponseStatus);
-    return dataNodeResponseStatus;
+    AsyncClientHandler<Object, TSStatus> clientHandler =
+        new AsyncClientHandler<>(DataNodeRequestType.CLEAR_CACHE, dataNodeLocationMap);
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    return clientHandler.getResponseList();
   }
 
   public List<TSStatus> loadConfiguration() {
     Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         configManager.getNodeManager().getRegisteredDataNodeLocations();
-    List<TSStatus> dataNodeResponseStatus =
-        Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
-    AsyncDataNodeClientPool.getInstance()
-        .sendAsyncRequestToDataNodeWithRetry(
-            null,
-            dataNodeLocationMap,
-            DataNodeRequestType.LOAD_CONFIGURATION,
-            dataNodeResponseStatus);
-    return dataNodeResponseStatus;
+    AsyncClientHandler<Object, TSStatus> clientHandler =
+        new AsyncClientHandler<>(DataNodeRequestType.LOAD_CONFIGURATION, dataNodeLocationMap);
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    return clientHandler.getResponseList();
   }
 
   public List<TSStatus> setSystemStatus(String status) {
     Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         configManager.getNodeManager().getRegisteredDataNodeLocations();
-    List<TSStatus> dataNodeResponseStatus =
-        Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
-    AsyncDataNodeClientPool.getInstance()
-        .sendAsyncRequestToDataNodeWithRetry(
-            status,
-            dataNodeLocationMap,
-            DataNodeRequestType.SET_SYSTEM_STATUS,
-            dataNodeResponseStatus);
-    return dataNodeResponseStatus;
+    AsyncClientHandler<String, TSStatus> clientHandler =
+        new AsyncClientHandler<>(
+            DataNodeRequestType.SET_SYSTEM_STATUS, status, dataNodeLocationMap);
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    return clientHandler.getResponseList();
   }
 
   /** Start the heartbeat service */
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 8a666f53b8..29d4098917 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -33,7 +33,7 @@ import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
 import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.read.GetDataPartitionPlan;
@@ -389,7 +389,8 @@ public class PartitionManager {
       if (!allotmentMap.isEmpty()) {
         CreateRegionGroupsPlan createRegionGroupsPlan =
             getLoadManager().allocateRegionGroups(allotmentMap, consensusGroupType);
-        result = getProcedureManager().createRegionGroups(createRegionGroupsPlan);
+        result =
+            getProcedureManager().createRegionGroups(consensusGroupType, createRegionGroupsPlan);
       } else {
         result = RpcUtils.SUCCESS_STATUS;
       }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index c8133dda1a..6862be0a4d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.procedure.env;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
@@ -29,9 +30,10 @@ import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.sync.confignode.SyncConfigNodeClientPool;
-import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
+import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
 import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
 import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DeleteStorageGroupPlan;
@@ -51,10 +53,13 @@ import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
 import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
 import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
 import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
+import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.Binary;
 
@@ -66,7 +71,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -302,12 +306,19 @@ public class ConfigNodeProcedureEnv {
             ConfigNodeRequestType.NOTIFY_REGISTER_SUCCESS);
   }
 
-  /** notify all DataNodes when the capacity of the ConfigNodeGroup is expanded or reduced */
+  /** Notify all DataNodes when the capacity of the ConfigNodeGroup is expanded or reduced */
   public void broadCastTheLatestConfigNodeGroup() {
-    AsyncDataNodeClientPool.getInstance()
-        .broadCastTheLatestConfigNodeGroup(
-            configManager.getNodeManager().getRegisteredDataNodeLocations(),
-            configManager.getNodeManager().getRegisteredConfigNodes());
+    List<TConfigNodeLocation> registeredConfigNodes =
+        configManager.getNodeManager().getRegisteredConfigNodes();
+    AsyncClientHandler<TUpdateConfigNodeGroupReq, TSStatus> clientHandler =
+        new AsyncClientHandler<>(
+            DataNodeRequestType.BROADCAST_LATEST_CONFIG_NODE_GROUP,
+            new TUpdateConfigNodeGroupReq(registeredConfigNodes),
+            configManager.getNodeManager().getRegisteredDataNodeLocations());
+
+    LOG.info("Begin to broadcast the latest configNodeGroup: {}", registeredConfigNodes);
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    LOG.info("Broadcast the latest configNodeGroup finished.");
   }
 
   /**
@@ -327,7 +338,97 @@ public class ConfigNodeProcedureEnv {
    * @return Those RegionReplicas that failed to create
    */
   public Map<TConsensusGroupId, TRegionReplicaSet> doRegionCreation(
+      TConsensusGroupType consensusGroupType, CreateRegionGroupsPlan createRegionGroupsPlan) {
+
+    // Prepare clientHandler
+    AsyncClientHandler<?, TSStatus> clientHandler;
+    switch (consensusGroupType) {
+      case SchemaRegion:
+        clientHandler = getCreateSchemaRegionClientHandler(createRegionGroupsPlan);
+        break;
+      case DataRegion:
+      default:
+        clientHandler = getCreateDataRegionClientHandler(createRegionGroupsPlan);
+        break;
+    }
+    if (clientHandler.getRequestIndices().isEmpty()) {
+      return new HashMap<>();
+    }
+
+    // Send CreateRegion requests to DataNodes
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+
+    // Filter RegionGroups that weren't created successfully
+    int requestId = 0;
+    Map<Integer, TSStatus> responseMap = clientHandler.getResponseMap();
+    Map<TConsensusGroupId, TRegionReplicaSet> failedRegions = new HashMap<>();
+    for (List<TRegionReplicaSet> regionReplicaSets :
+        createRegionGroupsPlan.getRegionGroupMap().values()) {
+      for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
+        for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
+          if (responseMap.get(requestId).getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            failedRegions
+                .computeIfAbsent(
+                    regionReplicaSet.getRegionId(),
+                    empty -> new TRegionReplicaSet().setRegionId(regionReplicaSet.getRegionId()))
+                .addToDataNodeLocations(dataNodeLocation);
+          }
+          requestId += 1;
+        }
+      }
+    }
+    return failedRegions;
+  }
+
+  private AsyncClientHandler<TCreateSchemaRegionReq, TSStatus> getCreateSchemaRegionClientHandler(
       CreateRegionGroupsPlan createRegionGroupsPlan) {
+    AsyncClientHandler<TCreateSchemaRegionReq, TSStatus> clientHandler =
+        new AsyncClientHandler<>(DataNodeRequestType.CREATE_SCHEMA_REGION);
+
+    int requestId = 0;
+    for (Map.Entry<String, List<TRegionReplicaSet>> sgRegionsEntry :
+        createRegionGroupsPlan.getRegionGroupMap().entrySet()) {
+      String storageGroup = sgRegionsEntry.getKey();
+      List<TRegionReplicaSet> regionReplicaSets = sgRegionsEntry.getValue();
+      for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
+        for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
+          clientHandler.putRequest(
+              requestId, genCreateSchemaRegionReq(storageGroup, regionReplicaSet));
+          clientHandler.putDataNodeLocation(requestId, dataNodeLocation);
+          requestId += 1;
+        }
+      }
+    }
+
+    return clientHandler;
+  }
+
+  private AsyncClientHandler<TCreateDataRegionReq, TSStatus> getCreateDataRegionClientHandler(
+      CreateRegionGroupsPlan createRegionGroupsPlan) {
+    Map<String, Long> ttlMap = getTTLMap(createRegionGroupsPlan);
+    AsyncClientHandler<TCreateDataRegionReq, TSStatus> clientHandler =
+        new AsyncClientHandler<>(DataNodeRequestType.CREATE_DATA_REGION);
+
+    int requestId = 0;
+    for (Map.Entry<String, List<TRegionReplicaSet>> sgRegionsEntry :
+        createRegionGroupsPlan.getRegionGroupMap().entrySet()) {
+      String storageGroup = sgRegionsEntry.getKey();
+      List<TRegionReplicaSet> regionReplicaSets = sgRegionsEntry.getValue();
+      long ttl = ttlMap.get(storageGroup);
+      for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
+        for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
+          clientHandler.putRequest(
+              requestId, genCreateDataRegionReq(storageGroup, regionReplicaSet, ttl));
+          clientHandler.putDataNodeLocation(requestId, dataNodeLocation);
+          requestId += 1;
+        }
+      }
+    }
+
+    return clientHandler;
+  }
+
+  private Map<String, Long> getTTLMap(CreateRegionGroupsPlan createRegionGroupsPlan) {
     Map<String, Long> ttlMap = new HashMap<>();
     for (String storageGroup : createRegionGroupsPlan.getRegionGroupMap().keySet()) {
       try {
@@ -335,11 +436,28 @@ public class ConfigNodeProcedureEnv {
             storageGroup,
             getClusterSchemaManager().getStorageGroupSchemaByName(storageGroup).getTTL());
       } catch (StorageGroupNotExistsException e) {
-        // Notice: This line will never
+        // Notice: This line will never be reached since we've checked before
         LOG.error("StorageGroup doesn't exist", e);
       }
     }
-    return AsyncDataNodeClientPool.getInstance().createRegionGroups(createRegionGroupsPlan, ttlMap);
+    return ttlMap;
+  }
+
+  private TCreateSchemaRegionReq genCreateSchemaRegionReq(
+      String storageGroup, TRegionReplicaSet regionReplicaSet) {
+    TCreateSchemaRegionReq req = new TCreateSchemaRegionReq();
+    req.setStorageGroup(storageGroup);
+    req.setRegionReplicaSet(regionReplicaSet);
+    return req;
+  }
+
+  private TCreateDataRegionReq genCreateDataRegionReq(
+      String storageGroup, TRegionReplicaSet regionReplicaSet, long TTL) {
+    TCreateDataRegionReq req = new TCreateDataRegionReq();
+    req.setStorageGroup(storageGroup);
+    req.setRegionReplicaSet(regionReplicaSet);
+    req.setTtl(TTL);
+    return req;
   }
 
   public long getTTL(String storageGroup) throws StorageGroupNotExistsException {
@@ -375,72 +493,58 @@ public class ConfigNodeProcedureEnv {
     NodeManager nodeManager = configManager.getNodeManager();
     final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         nodeManager.getRegisteredDataNodeLocations();
-    final List<TSStatus> dataNodeResponseStatus =
-        Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
     final TCreateTriggerInstanceReq request =
         new TCreateTriggerInstanceReq(
             triggerInformation.serialize(), ByteBuffer.wrap(jarFile.getValues()));
+
+    AsyncClientHandler<TCreateTriggerInstanceReq, TSStatus> clientHandler =
+        new AsyncClientHandler<>(
+            DataNodeRequestType.CREATE_TRIGGER_INSTANCE, request, dataNodeLocationMap);
     // TODO: The request sent to DataNodes which stateful triggerInstance needn't to be created
     // don't set
     // JarFile
-    AsyncDataNodeClientPool.getInstance()
-        .sendAsyncRequestToDataNodeWithRetry(
-            request,
-            dataNodeLocationMap,
-            DataNodeRequestType.CREATE_TRIGGER_INSTANCE,
-            dataNodeResponseStatus);
-    return dataNodeResponseStatus;
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    return clientHandler.getResponseList();
   }
 
   public List<TSStatus> dropTriggerOnDataNodes(String triggerName, boolean needToDeleteJarFile) {
     NodeManager nodeManager = configManager.getNodeManager();
     final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         nodeManager.getRegisteredDataNodeLocations();
-    final List<TSStatus> dataNodeResponseStatus =
-        Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
     final TDropTriggerInstanceReq request =
         new TDropTriggerInstanceReq(triggerName, needToDeleteJarFile);
-    AsyncDataNodeClientPool.getInstance()
-        .sendAsyncRequestToDataNodeWithRetry(
-            request,
-            dataNodeLocationMap,
-            DataNodeRequestType.DROP_TRIGGER_INSTANCE,
-            dataNodeResponseStatus);
-    return dataNodeResponseStatus;
+
+    AsyncClientHandler<TDropTriggerInstanceReq, TSStatus> clientHandler =
+        new AsyncClientHandler<>(
+            DataNodeRequestType.DROP_TRIGGER_INSTANCE, request, dataNodeLocationMap);
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    return clientHandler.getResponseList();
   }
 
   public List<TSStatus> activeTriggerOnDataNodes(String triggerName) {
     NodeManager nodeManager = configManager.getNodeManager();
     final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         nodeManager.getRegisteredDataNodeLocations();
-    final List<TSStatus> dataNodeResponseStatus =
-        Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
     final TActiveTriggerInstanceReq request = new TActiveTriggerInstanceReq(triggerName);
 
-    AsyncDataNodeClientPool.getInstance()
-        .sendAsyncRequestToDataNodeWithRetry(
-            request,
-            dataNodeLocationMap,
-            DataNodeRequestType.ACTIVE_TRIGGER_INSTANCE,
-            dataNodeResponseStatus);
-    return dataNodeResponseStatus;
+    AsyncClientHandler<TActiveTriggerInstanceReq, TSStatus> clientHandler =
+        new AsyncClientHandler<>(
+            DataNodeRequestType.ACTIVE_TRIGGER_INSTANCE, request, dataNodeLocationMap);
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    return clientHandler.getResponseList();
   }
 
   public List<TSStatus> inactiveTriggerOnDataNodes(String triggerName) {
     NodeManager nodeManager = configManager.getNodeManager();
     final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         nodeManager.getRegisteredDataNodeLocations();
-    final List<TSStatus> dataNodeResponseStatus =
-        Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
     final TInactiveTriggerInstanceReq request = new TInactiveTriggerInstanceReq(triggerName);
 
-    AsyncDataNodeClientPool.getInstance()
-        .sendAsyncRequestToDataNodeWithRetry(
-            request,
-            dataNodeLocationMap,
-            DataNodeRequestType.INACTIVE_TRIGGER_INSTANCE,
-            dataNodeResponseStatus);
-    return dataNodeResponseStatus;
+    AsyncClientHandler<TInactiveTriggerInstanceReq, TSStatus> clientHandler =
+        new AsyncClientHandler<>(
+            DataNodeRequestType.INACTIVE_TRIGGER_INSTANCE, request, dataNodeLocationMap);
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    return clientHandler.getResponseList();
   }
 
   public LockQueue getNodeLock() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index bcf5ba1a6e..73e2b65c48 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -26,8 +26,8 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
index a016160087..a9fb7bf404 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.persistence.TriggerInfo;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
 import org.apache.iotdb.confignode.procedure.state.CreateTriggerState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DropTriggerProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DropTriggerProcedure.java
index 7902466653..3d1c764ad5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DropTriggerProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DropTriggerProcedure.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTrigger
 import org.apache.iotdb.confignode.persistence.TriggerInfo;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
 import org.apache.iotdb.confignode.procedure.state.DropTriggerState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AbstractNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
similarity index 94%
rename from confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AbstractNodeProcedure.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
index 1bd5565dc4..b4b35ee77f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AbstractNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
@@ -17,10 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.procedure.impl;
+package org.apache.iotdb.confignode.procedure.impl.node;
 
-import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.StateMachineProcedure;
 import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
 
 import org.slf4j.Logger;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
similarity index 98%
rename from confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
index 82131683f6..fe1dd6fc67 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.procedure.impl;
+package org.apache.iotdb.confignode.procedure.impl.node;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveConfigNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveConfigNodeProcedure.java
similarity index 98%
rename from confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveConfigNodeProcedure.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveConfigNodeProcedure.java
index d63a7c6c20..5e560ce799 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveConfigNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveConfigNodeProcedure.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.procedure.impl;
+package org.apache.iotdb.confignode.procedure.impl.node;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveDataNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
similarity index 97%
rename from confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveDataNodeProcedure.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
index 1743e1f111..f46d6bf4f7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveDataNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.procedure.impl;
+package org.apache.iotdb.confignode.procedure.impl.node;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure;
 import org.apache.iotdb.confignode.procedure.state.RemoveDataNodeState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
similarity index 90%
rename from confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
index 4e7bf3a6ef..10f24259fd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
@@ -16,19 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.procedure.impl;
+package org.apache.iotdb.confignode.procedure.impl.statemachine;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
 import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
 import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
 import org.apache.iotdb.confignode.persistence.partition.RegionCreateTask;
 import org.apache.iotdb.confignode.persistence.partition.RegionDeleteTask;
-import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.state.CreateRegionGroupsState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
@@ -49,6 +49,8 @@ public class CreateRegionGroupsProcedure
 
   private static final Logger LOGGER = LoggerFactory.getLogger(CreateRegionGroupsProcedure.class);
 
+  private TConsensusGroupType consensusGroupType;
+
   private CreateRegionGroupsPlan createRegionGroupsPlan = new CreateRegionGroupsPlan();
 
   /** key: TConsensusGroupId value: Failed RegionReplicas */
@@ -58,13 +60,18 @@ public class CreateRegionGroupsProcedure
     super();
   }
 
-  public CreateRegionGroupsProcedure(CreateRegionGroupsPlan createRegionGroupsPlan) {
+  public CreateRegionGroupsProcedure(
+      TConsensusGroupType consensusGroupType, CreateRegionGroupsPlan createRegionGroupsPlan) {
+    this.consensusGroupType = consensusGroupType;
     this.createRegionGroupsPlan = createRegionGroupsPlan;
   }
 
+  @TestOnly
   public CreateRegionGroupsProcedure(
+      TConsensusGroupType consensusGroupType,
       CreateRegionGroupsPlan createRegionGroupsPlan,
       Map<TConsensusGroupId, TRegionReplicaSet> failedRegionReplicaSets) {
+    this.consensusGroupType = consensusGroupType;
     this.createRegionGroupsPlan = createRegionGroupsPlan;
     this.failedRegionReplicaSets = failedRegionReplicaSets;
   }
@@ -73,7 +80,7 @@ public class CreateRegionGroupsProcedure
   protected Flow executeFromState(ConfigNodeProcedureEnv env, CreateRegionGroupsState state) {
     switch (state) {
       case CREATE_REGION_GROUPS:
-        failedRegionReplicaSets = env.doRegionCreation(createRegionGroupsPlan);
+        failedRegionReplicaSets = env.doRegionCreation(consensusGroupType, createRegionGroupsPlan);
         setNextState(CreateRegionGroupsState.SHUNT_REGION_REPLICAS);
         break;
       case SHUNT_REGION_REPLICAS:
@@ -214,6 +221,7 @@ public class CreateRegionGroupsProcedure
     // must serialize CREATE_REGION_GROUPS.ordinal() firstly
     stream.writeInt(ProcedureFactory.ProcedureType.CREATE_REGION_GROUPS.ordinal());
     super.serialize(stream);
+    stream.writeInt(consensusGroupType.getValue());
     createRegionGroupsPlan.serializeForProcedure(stream);
     stream.writeInt(failedRegionReplicaSets.size());
     failedRegionReplicaSets.forEach(
@@ -226,6 +234,7 @@ public class CreateRegionGroupsProcedure
   @Override
   public void deserialize(ByteBuffer byteBuffer) {
     super.deserialize(byteBuffer);
+    this.consensusGroupType = TConsensusGroupType.findByValue(byteBuffer.getInt());
     try {
       createRegionGroupsPlan.deserializeForProcedure(byteBuffer);
       failedRegionReplicaSets.clear();
@@ -244,21 +253,17 @@ public class CreateRegionGroupsProcedure
   }
 
   @Override
-  public boolean equals(Object that) {
-    if (that instanceof CreateRegionGroupsProcedure) {
-      CreateRegionGroupsProcedure thatProc = (CreateRegionGroupsProcedure) that;
-      return thatProc.getProcId() == this.getProcId()
-          && thatProc.getState() == this.getState()
-          && thatProc.createRegionGroupsPlan.equals(this.createRegionGroupsPlan)
-          && thatProc.failedRegionReplicaSets.equals(this.failedRegionReplicaSets);
-    }
-    return false;
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    CreateRegionGroupsProcedure that = (CreateRegionGroupsProcedure) o;
+    return consensusGroupType == that.consensusGroupType
+        && createRegionGroupsPlan.equals(that.createRegionGroupsPlan)
+        && failedRegionReplicaSets.equals(that.failedRegionReplicaSets);
   }
 
   @Override
   public int hashCode() {
-    int result = createRegionGroupsPlan.hashCode();
-    result = 31 * result + Objects.hash(failedRegionReplicaSets);
-    return result;
+    return Objects.hash(consensusGroupType, createRegionGroupsPlan, failedRegionReplicaSets);
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteStorageGroupProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/DeleteStorageGroupProcedure.java
similarity index 98%
rename from confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteStorageGroupProcedure.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/DeleteStorageGroupProcedure.java
index 8f464ad141..4bc9eaa436 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteStorageGroupProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/DeleteStorageGroupProcedure.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.procedure.impl;
+package org.apache.iotdb.confignode.procedure.impl.statemachine;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -26,7 +26,6 @@ import org.apache.iotdb.commons.utils.ThriftConfigNodeSerDeUtils;
 import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
 import org.apache.iotdb.confignode.consensus.request.write.storagegroup.PreDeleteStorageGroupPlan;
 import org.apache.iotdb.confignode.persistence.partition.RegionDeleteTask;
-import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteTimeSeriesProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/DeleteTimeSeriesProcedure.java
similarity index 88%
rename from confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteTimeSeriesProcedure.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/DeleteTimeSeriesProcedure.java
index 57f2b776c5..645865efdc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteTimeSeriesProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/DeleteTimeSeriesProcedure.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.procedure.impl;
+package org.apache.iotdb.confignode.procedure.impl.statemachine;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -26,14 +26,9 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
-import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.task.ConstructSchemaBlackListDataNodeTask;
-import org.apache.iotdb.confignode.client.async.task.DeleteDataForDeleteTimeSeriesDataNodeTask;
-import org.apache.iotdb.confignode.client.async.task.DeleteTimeSeriesDataNodeTask;
-import org.apache.iotdb.confignode.client.async.task.FetchSchemaBlackListDataNodeTask;
-import org.apache.iotdb.confignode.client.async.task.InvalidateMatchedSchemaCacheDataNodeTask;
-import org.apache.iotdb.confignode.client.async.task.RollbackSchemaBlackListDataNodeTask;
-import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
@@ -151,21 +146,16 @@ public class DeleteTimeSeriesProcedure
             // construct request and send
             Map<Integer, TDataNodeLocation> dataNodeLocationMap = new HashMap<>();
             dataNodeLocationMap.put(dataNodeLocation.getDataNodeId(), dataNodeLocation);
-            ConstructSchemaBlackListDataNodeTask dataNodeTask =
-                new ConstructSchemaBlackListDataNodeTask(dataNodeLocationMap);
-            AsyncDataNodeClientPool.getInstance()
-                .sendAsyncRequestToDataNodeWithRetry(
+
+            AsyncClientHandler<TConstructSchemaBlackListReq, TSStatus> clientHandler =
+                new AsyncClientHandler<>(
+                    DataNodeRequestType.CONSTRUCT_SCHEMA_BLACK_LIST,
                     new TConstructSchemaBlackListReq(consensusGroupIdList, patternTreeBytes),
-                    dataNodeTask);
-            dataNodeTask
-                .getDataNodeResponseMap()
-                .forEach(
-                    (k, v) -> {
-                      if (v.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-                        responseMap.put(k, v);
-                      }
-                    });
-            return dataNodeTask.getDataNodeResponseMap();
+                    dataNodeLocationMap);
+            AsyncDataNodeClientPool.getInstance()
+                .sendAsyncRequestToDataNodeWithRetry(clientHandler);
+            responseMap.putAll(clientHandler.getResponseMap());
+            return clientHandler.getResponseMap();
           }
         };
     constructBlackListTask.execute();
@@ -184,12 +174,13 @@ public class DeleteTimeSeriesProcedure
   private void invalidateCache(ConfigNodeProcedureEnv env) {
     Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
-    InvalidateMatchedSchemaCacheDataNodeTask dataNodeTask =
-        new InvalidateMatchedSchemaCacheDataNodeTask(dataNodeLocationMap);
-    AsyncDataNodeClientPool.getInstance()
-        .sendAsyncRequestToDataNodeWithRetry(
-            new TInvalidateMatchedSchemaCacheReq(patternTreeBytes), dataNodeTask);
-    Map<Integer, TSStatus> statusMap = dataNodeTask.getDataNodeResponseMap();
+    AsyncClientHandler<TInvalidateMatchedSchemaCacheReq, TSStatus> clientHandler =
+        new AsyncClientHandler<>(
+            DataNodeRequestType.INVALIDATE_MATCHED_SCHEMA_CACHE,
+            new TInvalidateMatchedSchemaCacheReq(patternTreeBytes),
+            dataNodeLocationMap);
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
     for (TSStatus status : statusMap.values()) {
       // all dataNodes must clear the related schema cache
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -279,15 +270,17 @@ public class DeleteTimeSeriesProcedure
               TDataNodeLocation dataNodeLocation, List<TConsensusGroupId> consensusGroupIdList) {
             Map<Integer, TDataNodeLocation> dataNodeLocationMap = new HashMap<>();
             dataNodeLocationMap.put(dataNodeLocation.getDataNodeId(), dataNodeLocation);
-            DeleteDataForDeleteTimeSeriesDataNodeTask dataNodeTask =
-                new DeleteDataForDeleteTimeSeriesDataNodeTask(dataNodeLocationMap);
-            AsyncDataNodeClientPool.getInstance()
-                .sendAsyncRequestToDataNodeWithRetry(
+            AsyncClientHandler<TDeleteDataForDeleteTimeSeriesReq, TSStatus> clientHandler =
+                new AsyncClientHandler<>(
+                    DataNodeRequestType.DELETE_DATA_FOR_DELETE_TIMESERIES,
                     new TDeleteDataForDeleteTimeSeriesReq(
                         new ArrayList<>(consensusGroupIdList),
                         preparePatternTreeBytesData(patternTree)),
-                    dataNodeTask);
-            return dataNodeTask.getDataNodeResponseMap();
+                    dataNodeLocationMap);
+            AsyncDataNodeClientPool.getInstance()
+                .sendAsyncRequestToDataNodeWithRetry(clientHandler);
+            responseMap.putAll(clientHandler.getResponseMap());
+            return clientHandler.getResponseMap();
           }
         };
     deleteDataTask.setExecuteOnAllReplicaset(true);
@@ -305,21 +298,16 @@ public class DeleteTimeSeriesProcedure
               TDataNodeLocation dataNodeLocation, List<TConsensusGroupId> consensusGroupIdList) {
             Map<Integer, TDataNodeLocation> dataNodeLocationMap = new HashMap<>();
             dataNodeLocationMap.put(dataNodeLocation.getDataNodeId(), dataNodeLocation);
-            FetchSchemaBlackListDataNodeTask dataNodeTask =
-                new FetchSchemaBlackListDataNodeTask(dataNodeLocationMap);
-            AsyncDataNodeClientPool.getInstance()
-                .sendAsyncRequestToDataNodeWithRetry(
+            AsyncClientHandler<TFetchSchemaBlackListReq, TFetchSchemaBlackListResp> clientHandler =
+                new AsyncClientHandler<>(
+                    DataNodeRequestType.FETCH_SCHEMA_BLACK_LIST,
                     new TFetchSchemaBlackListReq(consensusGroupIdList, patternTreeBytes),
-                    dataNodeTask);
-            Map<Integer, TFetchSchemaBlackListResp> respMap = dataNodeTask.getDataNodeResponseMap();
+                    dataNodeLocationMap);
+            AsyncDataNodeClientPool.getInstance()
+                .sendAsyncRequestToDataNodeWithRetry(clientHandler);
+            responseMap.putAll(clientHandler.getResponseMap());
             Map<Integer, TSStatus> statusMap = new HashMap<>();
-            respMap.forEach(
-                (k, v) -> {
-                  if (v.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-                    responseMap.put(k, v);
-                  }
-                  statusMap.put(k, v.getStatus());
-                });
+            clientHandler.getResponseMap().forEach((k, v) -> statusMap.put(k, v.getStatus()));
             return statusMap;
           }
         };
@@ -352,12 +340,15 @@ public class DeleteTimeSeriesProcedure
               TDataNodeLocation dataNodeLocation, List<TConsensusGroupId> consensusGroupIdList) {
             Map<Integer, TDataNodeLocation> dataNodeLocationMap = new HashMap<>();
             dataNodeLocationMap.put(dataNodeLocation.getDataNodeId(), dataNodeLocation);
-            DeleteTimeSeriesDataNodeTask dataNodeTask =
-                new DeleteTimeSeriesDataNodeTask(dataNodeLocationMap);
+            AsyncClientHandler<TDeleteTimeSeriesReq, TSStatus> clientHandler =
+                new AsyncClientHandler<>(
+                    DataNodeRequestType.DELETE_TIMESERIES,
+                    new TDeleteTimeSeriesReq(consensusGroupIdList, patternTreeBytes),
+                    dataNodeLocationMap);
             AsyncDataNodeClientPool.getInstance()
-                .sendAsyncRequestToDataNodeWithRetry(
-                    new TDeleteTimeSeriesReq(consensusGroupIdList, patternTreeBytes), dataNodeTask);
-            return dataNodeTask.getDataNodeResponseMap();
+                .sendAsyncRequestToDataNodeWithRetry(clientHandler);
+            responseMap.putAll(clientHandler.getResponseMap());
+            return clientHandler.getResponseMap();
           }
         };
     deleteTimeSeriesTask.execute();
@@ -424,13 +415,15 @@ public class DeleteTimeSeriesProcedure
               TDataNodeLocation dataNodeLocation, List<TConsensusGroupId> consensusGroupIdList) {
             Map<Integer, TDataNodeLocation> dataNodeLocationMap = new HashMap<>();
             dataNodeLocationMap.put(dataNodeLocation.getDataNodeId(), dataNodeLocation);
-            RollbackSchemaBlackListDataNodeTask dataNodeTask =
-                new RollbackSchemaBlackListDataNodeTask(dataNodeLocationMap);
-            AsyncDataNodeClientPool.getInstance()
-                .sendAsyncRequestToDataNodeWithRetry(
+            AsyncClientHandler<TRollbackSchemaBlackListReq, TSStatus> clientHandler =
+                new AsyncClientHandler<>(
+                    DataNodeRequestType.ROLLBACK_SCHEMA_BLACK_LIST,
                     new TRollbackSchemaBlackListReq(consensusGroupIdList, patternTreeBytes),
-                    dataNodeTask);
-            return dataNodeTask.getDataNodeResponseMap();
+                    dataNodeLocationMap);
+            AsyncDataNodeClientPool.getInstance()
+                .sendAsyncRequestToDataNodeWithRetry(clientHandler);
+            responseMap.putAll(clientHandler.getResponseMap());
+            return clientHandler.getResponseMap();
           }
         };
     rollbackStateTask.execute();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
similarity index 98%
rename from confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
index 6d2f87d972..6d4343c084 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
@@ -17,14 +17,13 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.procedure.impl;
+package org.apache.iotdb.confignode.procedure.impl.statemachine;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
-import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/StateMachineProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/StateMachineProcedure.java
similarity index 98%
rename from confignode/src/main/java/org/apache/iotdb/confignode/procedure/StateMachineProcedure.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/StateMachineProcedure.java
index efda7c4ed8..0178431bd1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/StateMachineProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/StateMachineProcedure.java
@@ -17,8 +17,9 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.procedure;
+package org.apache.iotdb.confignode.procedure.impl.statemachine;
 
+import org.apache.iotdb.confignode.procedure.Procedure;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 218d976a45..6951d3cb0e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -20,15 +20,15 @@
 package org.apache.iotdb.confignode.procedure.store;
 
 import org.apache.iotdb.confignode.procedure.Procedure;
-import org.apache.iotdb.confignode.procedure.impl.AddConfigNodeProcedure;
-import org.apache.iotdb.confignode.procedure.impl.CreateRegionGroupsProcedure;
 import org.apache.iotdb.confignode.procedure.impl.CreateTriggerProcedure;
-import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
-import org.apache.iotdb.confignode.procedure.impl.DeleteTimeSeriesProcedure;
 import org.apache.iotdb.confignode.procedure.impl.DropTriggerProcedure;
-import org.apache.iotdb.confignode.procedure.impl.RegionMigrateProcedure;
-import org.apache.iotdb.confignode.procedure.impl.RemoveConfigNodeProcedure;
-import org.apache.iotdb.confignode.procedure.impl.RemoveDataNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.CreateRegionGroupsProcedure;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.DeleteStorageGroupProcedure;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.DeleteTimeSeriesProcedure;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index 3d736db4f2..6d5411c47f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
 import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
 import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
 import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
-import org.apache.iotdb.confignode.client.sync.confignode.SyncConfigNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 0ba97cb955..92ad262529 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -82,8 +82,8 @@ import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTrigger
 import org.apache.iotdb.confignode.persistence.partition.RegionCreateTask;
 import org.apache.iotdb.confignode.persistence.partition.RegionDeleteTask;
 import org.apache.iotdb.confignode.procedure.Procedure;
-import org.apache.iotdb.confignode.procedure.impl.CreateRegionGroupsProcedure;
-import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.CreateRegionGroupsProcedure;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.DeleteStorageGroupProcedure;
 import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
@@ -709,7 +709,8 @@ public class ConfigPhysicalPlanSerDeTest {
     createRegionGroupsPlan.addRegionGroup("root.sg0", dataRegionSet);
     createRegionGroupsPlan.addRegionGroup("root.sg1", schemaRegionSet);
     CreateRegionGroupsProcedure procedure0 =
-        new CreateRegionGroupsProcedure(createRegionGroupsPlan, failedRegions);
+        new CreateRegionGroupsProcedure(
+            TConsensusGroupType.DataRegion, createRegionGroupsPlan, failedRegions);
 
     updateProcedurePlan0.setProcedure(procedure0);
     updateProcedurePlan1 =
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleSTMProcedure.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleSTMProcedure.java
index 86161619b3..25a39b04cf 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleSTMProcedure.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleSTMProcedure.java
@@ -19,11 +19,11 @@
 
 package org.apache.iotdb.confignode.procedure.entity;
 
-import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
 import org.apache.iotdb.confignode.procedure.env.TestProcEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.StateMachineProcedure;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckSTMProcedure.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckSTMProcedure.java
index 71a43580d5..1761a90337 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckSTMProcedure.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckSTMProcedure.java
@@ -19,11 +19,11 @@
 
 package org.apache.iotdb.confignode.procedure.entity;
 
-import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
 import org.apache.iotdb.confignode.procedure.env.TestProcEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.StateMachineProcedure;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java
index 365e753e6f..fbadc74ec6 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java
@@ -20,10 +20,12 @@
 package org.apache.iotdb.confignode.procedure.impl;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.CreateRegionGroupsProcedure;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
@@ -91,7 +93,8 @@ public class CreateRegionGroupsProcedureTest {
     createRegionGroupsPlan.addRegionGroup("root.sg1", schemaRegionSet);
 
     CreateRegionGroupsProcedure procedure0 =
-        new CreateRegionGroupsProcedure(createRegionGroupsPlan, failedRegions0);
+        new CreateRegionGroupsProcedure(
+            TConsensusGroupType.DataRegion, createRegionGroupsPlan, failedRegions0);
     PublicBAOS byteArrayOutputStream = new PublicBAOS();
     DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
 
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/DeleteStorageGroupProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/DeleteStorageGroupProcedureTest.java
index 9698f19882..db37e59002 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/DeleteStorageGroupProcedureTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/DeleteStorageGroupProcedureTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.confignode.procedure.impl;
 
+import org.apache.iotdb.confignode.procedure.impl.statemachine.DeleteStorageGroupProcedure;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/DeleteTimeSeriesProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/DeleteTimeSeriesProcedureTest.java
index 189f855f54..983893f3d0 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/DeleteTimeSeriesProcedureTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/DeleteTimeSeriesProcedureTest.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.procedure.impl;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.DeleteTimeSeriesProcedure;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 
 import org.junit.Assert;