You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/10/11 02:12:54 UTC
[iotdb] branch master updated: [IOTDB-4396] Refactor AbstractRetryHandler in ConfigNode (#7534)
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e8bbc70c5b [IOTDB-4396] Refactor AbstractRetryHandler in ConfigNode (#7534)
e8bbc70c5b is described below
commit e8bbc70c5be333284c41110f88d8466b220a7e59
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Tue Oct 11 10:12:48 2022 +0800
[IOTDB-4396] Refactor AbstractRetryHandler in ConfigNode (#7534)
---
.../AsyncConfigNodeHeartbeatClientPool.java | 4 +-
.../client/async/AsyncDataNodeClientPool.java | 284 +++++++++++
.../AsyncDataNodeHeartbeatClientPool.java | 4 +-
.../async/datanode/AsyncDataNodeClientPool.java | 545 ---------------------
.../async/handlers/AbstractRetryHandler.java | 75 ---
.../client/async/handlers/AsyncClientHandler.java | 201 ++++++++
.../client/async/handlers/ClearCacheHandler.java | 81 ---
.../handlers/ConstructSchemaBlackListHandler.java | 95 ----
.../client/async/handlers/CreateRegionHandler.java | 91 ----
.../DeleteDataForDeleteTimeSeriesHandler.java | 103 ----
.../async/handlers/DeleteTimeSeriesHandler.java | 93 ----
.../async/handlers/FunctionManagementHandler.java | 72 ---
.../InvalidateMatchedSchemaCacheHandler.java | 95 ----
.../async/handlers/LoadConfigurationHandler.java | 82 ----
.../client/async/handlers/MergeHandler.java | 83 ----
.../handlers/RollbackSchemaBlackListHandler.java | 99 ----
.../async/handlers/SetSystemStatusHandler.java | 82 ----
.../client/async/handlers/SetTTLHandler.java | 61 ---
.../async/handlers/TriggerManagementHandler.java | 72 ---
.../handlers/UpdateConfigNodeGroupHandler.java | 66 ---
.../handlers/UpdateRegionRouteMapHandler.java | 63 ---
.../ConfigNodeHeartbeatHandler.java | 2 +-
.../{ => heartbeat}/DataNodeHeartbeatHandler.java | 2 +-
.../handlers/rpc/AbstractAsyncRPCHandler.java | 84 ++++
.../AsyncTSStatusRPCHandler.java} | 64 +--
.../handlers/rpc/DeleteTimeSeriesRPCHandler.java | 84 ++++
.../FetchSchemaBlackListRPCHandler.java} | 66 +--
.../client/async/task/AbstractDataNodeTask.java | 53 --
.../task/ConstructSchemaBlackListDataNodeTask.java | 45 --
.../DeleteDataForDeleteTimeSeriesDataNodeTask.java | 46 --
.../async/task/DeleteTimeSeriesDataNodeTask.java | 45 --
.../task/FetchSchemaBlackListDataNodeTask.java | 46 --
.../InvalidateMatchedSchemaCacheDataNodeTask.java | 46 --
.../task/RollbackSchemaBlackListDataNodeTask.java | 45 --
.../{confignode => }/SyncConfigNodeClientPool.java | 2 +-
.../{datanode => }/SyncDataNodeClientPool.java | 2 +-
.../confignode/conf/ConfigNodeRemoveCheck.java | 2 +-
.../confignode/manager/ClusterSchemaManager.java | 51 +-
.../confignode/manager/PermissionManager.java | 2 +-
.../iotdb/confignode/manager/ProcedureManager.java | 21 +-
.../iotdb/confignode/manager/UDFManager.java | 33 +-
.../iotdb/confignode/manager/load/LoadManager.java | 15 +-
.../iotdb/confignode/manager/node/NodeManager.java | 71 ++-
.../manager/partition/PartitionManager.java | 5 +-
.../procedure/env/ConfigNodeProcedureEnv.java | 198 ++++++--
.../procedure/env/DataNodeRemoveHandler.java | 4 +-
.../procedure/impl/CreateTriggerProcedure.java | 1 +
.../procedure/impl/DropTriggerProcedure.java | 1 +
.../impl/{ => node}/AbstractNodeProcedure.java | 4 +-
.../impl/{ => node}/AddConfigNodeProcedure.java | 2 +-
.../impl/{ => node}/RemoveConfigNodeProcedure.java | 2 +-
.../impl/{ => node}/RemoveDataNodeProcedure.java | 3 +-
.../CreateRegionGroupsProcedure.java | 37 +-
.../DeleteStorageGroupProcedure.java | 3 +-
.../DeleteTimeSeriesProcedure.java | 111 ++---
.../{ => statemachine}/RegionMigrateProcedure.java | 3 +-
.../statemachine}/StateMachineProcedure.java | 3 +-
.../procedure/store/ProcedureFactory.java | 14 +-
.../iotdb/confignode/service/ConfigNode.java | 2 +-
.../request/ConfigPhysicalPlanSerDeTest.java | 7 +-
.../procedure/entity/SimpleSTMProcedure.java | 2 +-
.../procedure/entity/StuckSTMProcedure.java | 2 +-
.../impl/CreateRegionGroupsProcedureTest.java | 5 +-
.../impl/DeleteStorageGroupProcedureTest.java | 1 +
.../impl/DeleteTimeSeriesProcedureTest.java | 1 +
65 files changed, 1074 insertions(+), 2515 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/confignode/AsyncConfigNodeHeartbeatClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
similarity index 94%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/async/confignode/AsyncConfigNodeHeartbeatClientPool.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
index 37dd3a5555..8dd67da315 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/confignode/AsyncConfigNodeHeartbeatClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
@@ -16,13 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.client.async.confignode;
+package org.apache.iotdb.confignode.client.async;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncConfigNodeHeartbeatServiceClient;
-import org.apache.iotdb.confignode.client.async.handlers.ConfigNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
public class AsyncConfigNodeHeartbeatClientPool {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
new file mode 100644
index 0000000000..98fe826e93
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.client.async;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TFlushReq;
+import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
+import org.apache.iotdb.confignode.client.async.handlers.rpc.AsyncTSStatusRPCHandler;
+import org.apache.iotdb.confignode.client.async.handlers.rpc.DeleteTimeSeriesRPCHandler;
+import org.apache.iotdb.confignode.client.async.handlers.rpc.FetchSchemaBlackListRPCHandler;
+import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteTimeSeriesReq;
+import org.apache.iotdb.mpp.rpc.thrift.TDeleteTimeSeriesReq;
+import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionRequest;
+import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
+import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
+import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
+import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Asynchronously send RPC requests to DataNodes. See mpp.thrift for more details. */
+public class AsyncDataNodeClientPool {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AsyncDataNodeClientPool.class);
+
+ private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> clientManager;
+
+ private static final int MAX_RETRY_NUM = 6;
+
+ private AsyncDataNodeClientPool() {
+ clientManager =
+ new IClientManager.Factory<TEndPoint, AsyncDataNodeInternalServiceClient>()
+ .createClientManager(
+ new ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory());
+ }
+
+ /**
+ * Send asynchronous requests to the specified DataNodes
+ *
+ * <p>Notice: The DataNodes that failed to receive the requests will be reconnected
+ *
+ * @param clientHandler <RequestType, ResponseType> which will also contain the result
+ */
+ public void sendAsyncRequestToDataNodeWithRetry(AsyncClientHandler<?, ?> clientHandler) {
+ if (clientHandler.getRequestIndices().isEmpty()) {
+ return;
+ }
+
+ DataNodeRequestType requestType = clientHandler.getRequestType();
+ for (int retry = 0; retry < MAX_RETRY_NUM; retry++) {
+ // Always Reset CountDownLatch first
+ clientHandler.resetCountDownLatch();
+
+ // Send requests to all targetDataNodes
+ for (int requestId : clientHandler.getRequestIndices()) {
+ TDataNodeLocation targetDataNode = clientHandler.getDataNodeLocation(requestId);
+ sendAsyncRequestToDataNode(clientHandler, requestId, targetDataNode, retry);
+ }
+
+ // Wait for this batch of asynchronous RPC requests finish
+ try {
+ clientHandler.getCountDownLatch().await();
+ } catch (InterruptedException e) {
+ LOGGER.error("Interrupted during {} on ConfigNode", requestType);
+ }
+
+ // Check if there is a DataNode that fails to execute the request, and retry if there exists
+ if (clientHandler.getRequestIndices().isEmpty()) {
+ return;
+ }
+ }
+ }
+
+ private void sendAsyncRequestToDataNode(
+ AsyncClientHandler<?, ?> clientHandler,
+ int requestId,
+ TDataNodeLocation targetDataNode,
+ int retryCount) {
+
+ try {
+ AsyncDataNodeInternalServiceClient client;
+ client = clientManager.borrowClient(targetDataNode.getInternalEndPoint());
+
+ switch (clientHandler.getRequestType()) {
+ case SET_TTL:
+ client.setTTL(
+ (TSetTTLReq) clientHandler.getRequest(requestId),
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
+ case CREATE_DATA_REGION:
+ client.createDataRegion(
+ (TCreateDataRegionReq) clientHandler.getRequest(requestId),
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
+ case CREATE_SCHEMA_REGION:
+ client.createSchemaRegion(
+ (TCreateSchemaRegionReq) clientHandler.getRequest(requestId),
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
+ case CREATE_FUNCTION:
+ client.createFunction(
+ (TCreateFunctionRequest) clientHandler.getRequest(requestId),
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
+ case DROP_FUNCTION:
+ client.dropFunction(
+ (TDropFunctionRequest) clientHandler.getRequest(requestId),
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
+ case CREATE_TRIGGER_INSTANCE:
+ client.createTriggerInstance(
+ (TCreateTriggerInstanceReq) clientHandler.getRequest(requestId),
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
+ case DROP_TRIGGER_INSTANCE:
+ client.dropTriggerInstance(
+ (TDropTriggerInstanceReq) clientHandler.getRequest(requestId),
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
+ case ACTIVE_TRIGGER_INSTANCE:
+ client.activeTriggerInstance(
+ (TActiveTriggerInstanceReq) clientHandler.getRequest(requestId),
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
+ case INACTIVE_TRIGGER_INSTANCE:
+ client.inactiveTriggerInstance(
+ (TInactiveTriggerInstanceReq) clientHandler.getRequest(requestId),
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
+ case MERGE:
+ case FULL_MERGE:
+ client.merge(
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
+ case FLUSH:
+ client.flush(
+ (TFlushReq) clientHandler.getRequest(requestId),
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
+ case CLEAR_CACHE:
+ client.clearCache(
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
+ case LOAD_CONFIGURATION:
+ client.loadConfiguration(
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
+ case SET_SYSTEM_STATUS:
+ client.setSystemStatus(
+ (String) clientHandler.getRequest(requestId),
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
+ case UPDATE_REGION_ROUTE_MAP:
+ client.updateRegionCache(
+ (TRegionRouteReq) clientHandler.getRequest(requestId),
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
+ case BROADCAST_LATEST_CONFIG_NODE_GROUP:
+ client.updateConfigNodeGroup(
+ (TUpdateConfigNodeGroupReq) clientHandler.getRequest(requestId),
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
+ case CONSTRUCT_SCHEMA_BLACK_LIST:
+ client.constructSchemaBlackList(
+ (TConstructSchemaBlackListReq) clientHandler.getRequest(requestId),
+ (DeleteTimeSeriesRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
+ case ROLLBACK_SCHEMA_BLACK_LIST:
+ client.rollbackSchemaBlackList(
+ (TRollbackSchemaBlackListReq) clientHandler.getRequest(requestId),
+ (DeleteTimeSeriesRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
+ case FETCH_SCHEMA_BLACK_LIST:
+ client.fetchSchemaBlackList(
+ (TFetchSchemaBlackListReq) clientHandler.getRequest(requestId),
+ (FetchSchemaBlackListRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
+ case INVALIDATE_MATCHED_SCHEMA_CACHE:
+ client.invalidateMatchedSchemaCache(
+ (TInvalidateMatchedSchemaCacheReq) clientHandler.getRequest(requestId),
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
+ case DELETE_DATA_FOR_DELETE_TIMESERIES:
+ client.deleteDataForDeleteTimeSeries(
+ (TDeleteDataForDeleteTimeSeriesReq) clientHandler.getRequest(requestId),
+ (DeleteTimeSeriesRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
+ case DELETE_TIMESERIES:
+ client.deleteTimeSeries(
+ (TDeleteTimeSeriesReq) clientHandler.getRequest(requestId),
+ (DeleteTimeSeriesRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
+ default:
+ LOGGER.error(
+ "Unexpected DataNode Request Type: {} when sendAsyncRequestToDataNode",
+ clientHandler.getRequestType());
+ }
+ } catch (Exception e) {
+ LOGGER.warn(
+ "{} failed on DataNode {}, because {}, retrying {}...",
+ clientHandler.getRequestType(),
+ targetDataNode.getInternalEndPoint(),
+ e.getMessage(),
+ retryCount);
+ }
+ }
+
+ /**
+ * Always call this interface when a DataNode is restarted or removed
+ *
+ * @param endPoint The specific DataNode
+ */
+ public void resetClient(TEndPoint endPoint) {
+ clientManager.clear(endPoint);
+ }
+
+ // TODO: Is the ClientPool must be a singleton?
+ private static class ClientPoolHolder {
+
+ private static final AsyncDataNodeClientPool INSTANCE = new AsyncDataNodeClientPool();
+
+ private ClientPoolHolder() {
+ // Empty constructor
+ }
+ }
+
+ public static AsyncDataNodeClientPool getInstance() {
+ return ClientPoolHolder.INSTANCE;
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeHeartbeatClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
similarity index 94%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeHeartbeatClientPool.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
index b1a51a505e..a42616c722 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeHeartbeatClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
@@ -16,13 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.client.async.datanode;
+package org.apache.iotdb.confignode.client.async;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncDataNodeHeartbeatServiceClient;
-import org.apache.iotdb.confignode.client.async.handlers.DataNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
/** Asynchronously send RPC requests to DataNodes. See mpp.thrift for more details. */
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
deleted file mode 100644
index 746fed9db4..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
+++ /dev/null
@@ -1,545 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.client.async.datanode;
-
-import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TFlushReq;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
-import org.apache.iotdb.commons.client.ClientPoolFactory;
-import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
-import org.apache.iotdb.confignode.client.async.handlers.ClearCacheHandler;
-import org.apache.iotdb.confignode.client.async.handlers.ConstructSchemaBlackListHandler;
-import org.apache.iotdb.confignode.client.async.handlers.CreateRegionHandler;
-import org.apache.iotdb.confignode.client.async.handlers.DeleteDataForDeleteTimeSeriesHandler;
-import org.apache.iotdb.confignode.client.async.handlers.DeleteTimeSeriesHandler;
-import org.apache.iotdb.confignode.client.async.handlers.FetchSchemaBlackLsitHandler;
-import org.apache.iotdb.confignode.client.async.handlers.FlushHandler;
-import org.apache.iotdb.confignode.client.async.handlers.FunctionManagementHandler;
-import org.apache.iotdb.confignode.client.async.handlers.InvalidateMatchedSchemaCacheHandler;
-import org.apache.iotdb.confignode.client.async.handlers.LoadConfigurationHandler;
-import org.apache.iotdb.confignode.client.async.handlers.MergeHandler;
-import org.apache.iotdb.confignode.client.async.handlers.RollbackSchemaBlackListHandler;
-import org.apache.iotdb.confignode.client.async.handlers.SetSystemStatusHandler;
-import org.apache.iotdb.confignode.client.async.handlers.SetTTLHandler;
-import org.apache.iotdb.confignode.client.async.handlers.TriggerManagementHandler;
-import org.apache.iotdb.confignode.client.async.handlers.UpdateConfigNodeGroupHandler;
-import org.apache.iotdb.confignode.client.async.handlers.UpdateRegionRouteMapHandler;
-import org.apache.iotdb.confignode.client.async.task.AbstractDataNodeTask;
-import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
-import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteTimeSeriesReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDeleteTimeSeriesReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionRequest;
-import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
-import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
-import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
-import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
-import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-/** Asynchronously send RPC requests to DataNodes. See mpp.thrift for more details. */
-public class AsyncDataNodeClientPool {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(AsyncDataNodeClientPool.class);
-
- private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> clientManager;
-
- private static final int MAX_RETRY_NUM = 6;
-
- private AsyncDataNodeClientPool() {
- clientManager =
- new IClientManager.Factory<TEndPoint, AsyncDataNodeInternalServiceClient>()
- .createClientManager(
- new ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory());
- }
-
- /**
- * Send asynchronize requests to the specific DataNodes, and reconnect the DataNode that failed to
- * receive the requests
- *
- * @param req request
- * @param dataNodeLocationMap Map<DataNodeId, TDataNodeLocation>
- * @param requestType DataNodeRequestType
- * @param dataNodeResponseStatus response list.Used by CREATE_FUNCTION,DROP_FUNCTION and FLUSH
- */
- public void sendAsyncRequestToDataNodeWithRetry(
- Object req,
- Map<Integer, TDataNodeLocation> dataNodeLocationMap,
- DataNodeRequestType requestType,
- List<TSStatus> dataNodeResponseStatus) {
- if (dataNodeLocationMap.isEmpty()) {
- return;
- }
- for (int retry = 0; retry < MAX_RETRY_NUM; retry++) {
- CountDownLatch countDownLatch = new CountDownLatch(dataNodeLocationMap.size());
- for (TDataNodeLocation targetDataNode : dataNodeLocationMap.values()) {
- AbstractRetryHandler handler;
- switch (requestType) {
- case SET_TTL:
- handler =
- new SetTTLHandler(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
- break;
- case CREATE_FUNCTION:
- case DROP_FUNCTION:
- handler =
- new FunctionManagementHandler(
- countDownLatch,
- requestType,
- targetDataNode,
- dataNodeLocationMap,
- dataNodeResponseStatus);
- break;
- case CREATE_TRIGGER_INSTANCE:
- case DROP_TRIGGER_INSTANCE:
- case ACTIVE_TRIGGER_INSTANCE:
- case INACTIVE_TRIGGER_INSTANCE:
- handler =
- new TriggerManagementHandler(
- countDownLatch,
- requestType,
- targetDataNode,
- dataNodeLocationMap,
- dataNodeResponseStatus);
- break;
- case FULL_MERGE:
- case MERGE:
- handler =
- new MergeHandler(
- countDownLatch,
- requestType,
- targetDataNode,
- dataNodeLocationMap,
- dataNodeResponseStatus);
- break;
- case FLUSH:
- handler =
- new FlushHandler(
- countDownLatch,
- requestType,
- targetDataNode,
- dataNodeLocationMap,
- dataNodeResponseStatus);
- break;
- case CLEAR_CACHE:
- handler =
- new ClearCacheHandler(
- countDownLatch,
- requestType,
- targetDataNode,
- dataNodeLocationMap,
- dataNodeResponseStatus);
- break;
- case LOAD_CONFIGURATION:
- handler =
- new LoadConfigurationHandler(
- countDownLatch,
- requestType,
- targetDataNode,
- dataNodeLocationMap,
- dataNodeResponseStatus);
- break;
- case SET_SYSTEM_STATUS:
- handler =
- new SetSystemStatusHandler(
- countDownLatch,
- requestType,
- targetDataNode,
- dataNodeLocationMap,
- dataNodeResponseStatus);
- break;
- case UPDATE_REGION_ROUTE_MAP:
- handler =
- new UpdateRegionRouteMapHandler(
- countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
- break;
- case BROADCAST_LATEST_CONFIG_NODE_GROUP:
- handler =
- new UpdateConfigNodeGroupHandler(
- countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
- break;
- case CONSTRUCT_SCHEMA_BLACK_LIST:
- handler =
- new ConstructSchemaBlackListHandler(
- countDownLatch, targetDataNode, dataNodeLocationMap);
- break;
- case ROLLBACK_SCHEMA_BLACK_LIST:
- handler =
- new RollbackSchemaBlackListHandler(
- countDownLatch, targetDataNode, dataNodeLocationMap);
- break;
- case FETCH_SCHEMA_BLACK_LIST:
- handler =
- new FetchSchemaBlackLsitHandler(
- countDownLatch, targetDataNode, dataNodeLocationMap);
- break;
- case INVALIDATE_MATCHED_SCHEMA_CACHE:
- handler =
- new InvalidateMatchedSchemaCacheHandler(
- countDownLatch, targetDataNode, dataNodeLocationMap);
- break;
- case DELETE_DATA_FOR_DELETE_TIMESERIES:
- handler =
- new DeleteDataForDeleteTimeSeriesHandler(
- countDownLatch, targetDataNode, dataNodeLocationMap);
- break;
- case DELETE_TIMESERIES:
- handler =
- new DeleteTimeSeriesHandler(countDownLatch, targetDataNode, dataNodeLocationMap);
- break;
- default:
- return;
- }
- sendAsyncRequestToDataNode(targetDataNode, req, handler, retry);
- }
- try {
- countDownLatch.await();
- } catch (InterruptedException e) {
- LOGGER.error("Interrupted during {} on ConfigNode", requestType);
- }
- // Check if there is a node that fails to send the request, and retry if there is one
- if (dataNodeLocationMap.isEmpty()) {
- break;
- }
- }
- }
-
- public void sendAsyncRequestToDataNodeWithRetry(
- Object req, AbstractDataNodeTask<?> dataNodeTask) {
- Map<Integer, TDataNodeLocation> dataNodeLocationMap = dataNodeTask.getDataNodeLocationMap();
- if (dataNodeLocationMap.isEmpty()) {
- return;
- }
- for (int retry = 0; retry < MAX_RETRY_NUM; retry++) {
- CountDownLatch countDownLatch = new CountDownLatch(dataNodeLocationMap.size());
- for (TDataNodeLocation targetDataNode : dataNodeLocationMap.values()) {
- AbstractRetryHandler handler = dataNodeTask.getSingleRequestHandler();
- handler.setCountDownLatch(countDownLatch);
- handler.setTargetDataNode(targetDataNode);
- sendAsyncRequestToDataNode(targetDataNode, req, handler, retry);
- }
-
- try {
- countDownLatch.await();
- } catch (InterruptedException e) {
- LOGGER.error("Interrupted during {} on ConfigNode", dataNodeTask.getDataNodeRequestType());
- }
- // Check if there is a node that fails to send the request, and retry if there is one
- if (dataNodeLocationMap.isEmpty()) {
- break;
- }
- }
- }
-
- public void sendAsyncRequestToDataNode(
- TDataNodeLocation dataNodeLocation,
- Object req,
- AbstractRetryHandler handler,
- int retryCount) {
- AsyncDataNodeInternalServiceClient client;
- try {
- client = clientManager.borrowClient(dataNodeLocation.getInternalEndPoint());
- switch (handler.getDataNodeRequestType()) {
- case SET_TTL:
- client.setTTL((TSetTTLReq) req, (SetTTLHandler) handler);
- break;
- case CREATE_DATA_REGION:
- client.createDataRegion((TCreateDataRegionReq) req, (CreateRegionHandler) handler);
- break;
- case CREATE_SCHEMA_REGION:
- client.createSchemaRegion((TCreateSchemaRegionReq) req, (CreateRegionHandler) handler);
- break;
- case CREATE_FUNCTION:
- client.createFunction((TCreateFunctionRequest) req, (FunctionManagementHandler) handler);
- break;
- case DROP_FUNCTION:
- client.dropFunction((TDropFunctionRequest) req, (FunctionManagementHandler) handler);
- break;
- case CREATE_TRIGGER_INSTANCE:
- client.createTriggerInstance(
- (TCreateTriggerInstanceReq) req, (TriggerManagementHandler) handler);
- break;
- case DROP_TRIGGER_INSTANCE:
- client.dropTriggerInstance(
- (TDropTriggerInstanceReq) req, (TriggerManagementHandler) handler);
- break;
- case ACTIVE_TRIGGER_INSTANCE:
- client.activeTriggerInstance(
- (TActiveTriggerInstanceReq) req, (TriggerManagementHandler) handler);
- break;
- case INACTIVE_TRIGGER_INSTANCE:
- client.inactiveTriggerInstance(
- (TInactiveTriggerInstanceReq) req, (TriggerManagementHandler) handler);
- break;
- case MERGE:
- case FULL_MERGE:
- client.merge((MergeHandler) handler);
- break;
- case FLUSH:
- client.flush((TFlushReq) req, (FlushHandler) handler);
- break;
- case CLEAR_CACHE:
- client.clearCache((ClearCacheHandler) handler);
- break;
- case LOAD_CONFIGURATION:
- client.loadConfiguration((LoadConfigurationHandler) handler);
- break;
- case SET_SYSTEM_STATUS:
- client.setSystemStatus((String) req, (SetSystemStatusHandler) handler);
- break;
- case UPDATE_REGION_ROUTE_MAP:
- client.updateRegionCache((TRegionRouteReq) req, (UpdateRegionRouteMapHandler) handler);
- break;
- case BROADCAST_LATEST_CONFIG_NODE_GROUP:
- client.updateConfigNodeGroup(
- (TUpdateConfigNodeGroupReq) req, (UpdateConfigNodeGroupHandler) handler);
- break;
- case CONSTRUCT_SCHEMA_BLACK_LIST:
- client.constructSchemaBlackList(
- (TConstructSchemaBlackListReq) req, (ConstructSchemaBlackListHandler) handler);
- break;
- case ROLLBACK_SCHEMA_BLACK_LIST:
- client.rollbackSchemaBlackList(
- (TRollbackSchemaBlackListReq) req, (RollbackSchemaBlackListHandler) handler);
- break;
- case FETCH_SCHEMA_BLACK_LIST:
- client.fetchSchemaBlackList(
- (TFetchSchemaBlackListReq) req, (FetchSchemaBlackLsitHandler) handler);
- break;
- case INVALIDATE_MATCHED_SCHEMA_CACHE:
- client.invalidateMatchedSchemaCache(
- (TInvalidateMatchedSchemaCacheReq) req,
- (InvalidateMatchedSchemaCacheHandler) handler);
- break;
- case DELETE_DATA_FOR_DELETE_TIMESERIES:
- client.deleteDataForDeleteTimeSeries(
- (TDeleteDataForDeleteTimeSeriesReq) req,
- (DeleteDataForDeleteTimeSeriesHandler) handler);
- break;
- case DELETE_TIMESERIES:
- client.deleteTimeSeries((TDeleteTimeSeriesReq) req, (DeleteTimeSeriesHandler) handler);
- break;
- default:
- LOGGER.error(
- "Unexpected DataNode Request Type: {} when sendAsyncRequestToDataNode",
- handler.getDataNodeRequestType());
- }
- } catch (Exception e) {
- LOGGER.warn(
- "{} failed on ConfigNode {}, because {}, retrying {}...",
- handler.getDataNodeRequestType(),
- dataNodeLocation.getInternalEndPoint(),
- e.getMessage(),
- retryCount);
- }
- }
-
- /**
- * Execute CreateRegionGroupsPlan asynchronously
- *
- * @param ttlMap Map<StorageGroupName, TTL>
- * @return Those RegionReplicas that failed to create
- */
- public Map<TConsensusGroupId, TRegionReplicaSet> createRegionGroups(
- CreateRegionGroupsPlan createRegionGroupsPlan, Map<String, Long> ttlMap) {
- // Because different requests will be sent to the same node when createRegions,
- // so for CreateRegions use Map<index, TDataNodeLocation>
- Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
- int index = 0;
- // Count the DataNodes to be sent
- for (List<TRegionReplicaSet> regionReplicaSets :
- createRegionGroupsPlan.getRegionGroupMap().values()) {
- for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
- for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
- dataNodeLocationMap.put(index++, dataNodeLocation);
- }
- }
- }
- if (dataNodeLocationMap.isEmpty()) {
- return new HashMap<>();
- }
- for (int retry = 0; retry < MAX_RETRY_NUM; retry++) {
- index = 0;
- CountDownLatch countDownLatch = new CountDownLatch(dataNodeLocationMap.size());
- for (Map.Entry<String, List<TRegionReplicaSet>> entry :
- createRegionGroupsPlan.getRegionGroupMap().entrySet()) {
- // Enumerate each RegionReplicaSet
- for (TRegionReplicaSet regionReplicaSet : entry.getValue()) {
- // Enumerate each Region
- for (TDataNodeLocation targetDataNode : regionReplicaSet.getDataNodeLocations()) {
- if (dataNodeLocationMap.containsKey(index)) {
- AbstractRetryHandler handler;
- switch (regionReplicaSet.getRegionId().getType()) {
- case SchemaRegion:
- handler =
- new CreateRegionHandler(
- countDownLatch,
- DataNodeRequestType.CREATE_SCHEMA_REGION,
- regionReplicaSet.regionId,
- targetDataNode,
- dataNodeLocationMap,
- index++);
- sendAsyncRequestToDataNode(
- targetDataNode,
- genCreateSchemaRegionReq(entry.getKey(), regionReplicaSet),
- handler,
- retry);
- break;
- case DataRegion:
- handler =
- new CreateRegionHandler(
- countDownLatch,
- DataNodeRequestType.CREATE_DATA_REGION,
- regionReplicaSet.regionId,
- targetDataNode,
- dataNodeLocationMap,
- index++);
- sendAsyncRequestToDataNode(
- targetDataNode,
- genCreateDataRegionReq(
- entry.getKey(), regionReplicaSet, ttlMap.get(entry.getKey())),
- handler,
- retry);
- break;
- default:
- break;
- }
- } else {
- index++;
- }
- }
- }
- }
- try {
- countDownLatch.await();
- } catch (InterruptedException e) {
- LOGGER.error("Interrupted during createRegions on ConfigNode");
- }
- // Check if there is a node that fails to send the request, and
- // retry if there is one
- if (dataNodeLocationMap.isEmpty()) {
- break;
- }
- }
-
- // Filter RegionGroups that weren't created successfully
- index = 0;
- Map<TConsensusGroupId, TRegionReplicaSet> failedRegions = new HashMap<>();
- for (List<TRegionReplicaSet> regionReplicaSets :
- createRegionGroupsPlan.getRegionGroupMap().values()) {
- for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
- for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
- if (dataNodeLocationMap.containsKey(index)) {
- failedRegions
- .computeIfAbsent(
- regionReplicaSet.getRegionId(),
- empty -> new TRegionReplicaSet().setRegionId(regionReplicaSet.getRegionId()))
- .addToDataNodeLocations(dataNodeLocation);
- }
- index += 1;
- }
- }
- }
- return failedRegions;
- }
-
- private TCreateSchemaRegionReq genCreateSchemaRegionReq(
- String storageGroup, TRegionReplicaSet regionReplicaSet) {
- TCreateSchemaRegionReq req = new TCreateSchemaRegionReq();
- req.setStorageGroup(storageGroup);
- req.setRegionReplicaSet(regionReplicaSet);
- return req;
- }
-
- private TCreateDataRegionReq genCreateDataRegionReq(
- String storageGroup, TRegionReplicaSet regionReplicaSet, long TTL) {
- TCreateDataRegionReq req = new TCreateDataRegionReq();
- req.setStorageGroup(storageGroup);
- req.setRegionReplicaSet(regionReplicaSet);
- req.setTtl(TTL);
- return req;
- }
-
- /**
- * notify all DataNodes when the capacity of the ConfigNodeGroup is expanded or reduced
- *
- * @param registeredDataNodeLocationMap Map<Integer, TDataNodeLocation>
- * @param registeredConfigNodes List<TConfigNodeLocation>
- */
- public void broadCastTheLatestConfigNodeGroup(
- Map<Integer, TDataNodeLocation> registeredDataNodeLocationMap,
- List<TConfigNodeLocation> registeredConfigNodes) {
- if (registeredDataNodeLocationMap != null) {
- TUpdateConfigNodeGroupReq updateConfigNodeGroupReq =
- new TUpdateConfigNodeGroupReq(registeredConfigNodes);
- LOGGER.info("Begin to broadcast the latest configNodeGroup: {}", registeredConfigNodes);
- sendAsyncRequestToDataNodeWithRetry(
- updateConfigNodeGroupReq,
- registeredDataNodeLocationMap,
- DataNodeRequestType.BROADCAST_LATEST_CONFIG_NODE_GROUP,
- null);
- LOGGER.info("Broadcast the latest configNodeGroup finished.");
- }
- }
-
- /**
- * Always call this interface when a DataNode is restarted or removed
- *
- * @param endPoint The specific DataNode
- */
- public void resetClient(TEndPoint endPoint) {
- clientManager.clear(endPoint);
- }
-
- // TODO: Is the ClientPool must be a singleton?
- private static class ClientPoolHolder {
-
- private static final AsyncDataNodeClientPool INSTANCE = new AsyncDataNodeClientPool();
-
- private ClientPoolHolder() {
- // Empty constructor
- }
- }
-
- public static AsyncDataNodeClientPool getInstance() {
- return ClientPoolHolder.INSTANCE;
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AbstractRetryHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AbstractRetryHandler.java
deleted file mode 100644
index b3a5fd9328..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AbstractRetryHandler.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-public abstract class AbstractRetryHandler {
-
- protected CountDownLatch countDownLatch;
-
- /**
- * Map<DataNodeId, TDataNodeLocation> The DataNode that successfully execute the request will be
- * removed from this list
- */
- protected Map<Integer, TDataNodeLocation> dataNodeLocationMap;
- /** Request type to DataNode */
- protected DataNodeRequestType dataNodeRequestType;
- /** Target DataNode */
- protected TDataNodeLocation targetDataNode;
-
- public AbstractRetryHandler(
- CountDownLatch countDownLatch,
- DataNodeRequestType dataNodeRequestType,
- TDataNodeLocation targetDataNode,
- Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
- this.countDownLatch = countDownLatch;
- this.dataNodeLocationMap = dataNodeLocationMap;
- this.dataNodeRequestType = dataNodeRequestType;
- this.targetDataNode = targetDataNode;
- }
-
- public AbstractRetryHandler(
- DataNodeRequestType dataNodeRequestType,
- Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
- this.dataNodeLocationMap = dataNodeLocationMap;
- this.dataNodeRequestType = dataNodeRequestType;
- }
-
- public DataNodeRequestType getDataNodeRequestType() {
- return dataNodeRequestType;
- }
-
- public Map<Integer, TDataNodeLocation> getDataNodeLocationMap() {
- return dataNodeLocationMap;
- }
-
- public void setCountDownLatch(CountDownLatch countDownLatch) {
- this.countDownLatch = countDownLatch;
- }
-
- public void setTargetDataNode(TDataNodeLocation targetDataNode) {
- this.targetDataNode = targetDataNode;
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
new file mode 100644
index 0000000000..60c7878fe4
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.client.async.handlers;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.handlers.rpc.AbstractAsyncRPCHandler;
+import org.apache.iotdb.confignode.client.async.handlers.rpc.AsyncTSStatusRPCHandler;
+import org.apache.iotdb.confignode.client.async.handlers.rpc.DeleteTimeSeriesRPCHandler;
+import org.apache.iotdb.confignode.client.async.handlers.rpc.FetchSchemaBlackListRPCHandler;
+import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Asynchronous Client handler
+ *
+ * @param <Q> ClassName of RPC request
+ * @param <R> ClassName of RPC response
+ */
+public class AsyncClientHandler<Q, R> {
+
+ // Type of RPC request
+ protected final DataNodeRequestType requestType;
+
+ /**
+ * Map key: The indices of asynchronous RPC requests
+ *
+ * <p>Map value: The corresponding RPC request
+ */
+ private final Map<Integer, Q> requestMap;
+
+ /**
+ * Map key: The indices of asynchronous RPC requests
+ *
+ * <p>Map value: The target DataNodes of corresponding indices
+ *
+ * <p>All kinds of AsyncHandler will remove its targetDataNode from the dataNodeLocationMap only
+ * if its corresponding RPC request success
+ */
+ private final Map<Integer, TDataNodeLocation> dataNodeLocationMap;
+
+ /**
+ * Map key: The indices(targetDataNode's ID) of asynchronous RPC requests
+ *
+ * <p>Map value: The response of corresponding indices
+ *
+ * <p>All kinds of AsyncHandler will add response to the responseMap after its corresponding RPC
+ * request finished
+ */
+ private final Map<Integer, R> responseMap;
+
+ private CountDownLatch countDownLatch;
+
+ /** Custom constructor */
+ public AsyncClientHandler(DataNodeRequestType requestType) {
+ this.requestType = requestType;
+ this.requestMap = new ConcurrentHashMap<>();
+ this.dataNodeLocationMap = new ConcurrentHashMap<>();
+ this.responseMap = new ConcurrentHashMap<>();
+ }
+
+ public void putRequest(int requestId, Q request) {
+ requestMap.put(requestId, request);
+ }
+
+ public void putDataNodeLocation(int requestId, TDataNodeLocation dataNodeLocation) {
+ dataNodeLocationMap.put(requestId, dataNodeLocation);
+ }
+
+ /** Constructor for null requests */
+ public AsyncClientHandler(
+ DataNodeRequestType requestType, Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
+ this.requestType = requestType;
+ this.dataNodeLocationMap = dataNodeLocationMap;
+
+ this.requestMap = new ConcurrentHashMap<>();
+ this.responseMap = new ConcurrentHashMap<>();
+ }
+
+ /** Constructor for unique request */
+ public AsyncClientHandler(
+ DataNodeRequestType requestType,
+ Q request,
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
+ this.requestType = requestType;
+ this.dataNodeLocationMap = dataNodeLocationMap;
+
+ this.requestMap = new ConcurrentHashMap<>();
+ this.dataNodeLocationMap
+ .keySet()
+ .forEach(dataNodeId -> this.requestMap.put(dataNodeId, request));
+
+ this.responseMap = new ConcurrentHashMap<>();
+ }
+
+ public DataNodeRequestType getRequestType() {
+ return requestType;
+ }
+
+ public List<Integer> getRequestIndices() {
+ return new ArrayList<>(dataNodeLocationMap.keySet());
+ }
+
+ public Q getRequest(int requestId) {
+ return requestMap.get(requestId);
+ }
+
+ public TDataNodeLocation getDataNodeLocation(int requestId) {
+ return dataNodeLocationMap.get(requestId);
+ }
+
+ public List<R> getResponseList() {
+ return new ArrayList<>(responseMap.values());
+ }
+
+ public Map<Integer, R> getResponseMap() {
+ return responseMap;
+ }
+
+ /** Always reset CountDownLatch before retry */
+ public void resetCountDownLatch() {
+ countDownLatch = new CountDownLatch(dataNodeLocationMap.size());
+ }
+
+ public CountDownLatch getCountDownLatch() {
+ return countDownLatch;
+ }
+
+ public AbstractAsyncRPCHandler<?> createAsyncRPCHandler(
+ int requestId, TDataNodeLocation targetDataNode) {
+ switch (requestType) {
+ case CONSTRUCT_SCHEMA_BLACK_LIST:
+ case ROLLBACK_SCHEMA_BLACK_LIST:
+ case DELETE_DATA_FOR_DELETE_TIMESERIES:
+ case DELETE_TIMESERIES:
+ return new DeleteTimeSeriesRPCHandler(
+ requestType,
+ requestId,
+ targetDataNode,
+ dataNodeLocationMap,
+ (Map<Integer, TSStatus>) responseMap,
+ countDownLatch);
+ case FETCH_SCHEMA_BLACK_LIST:
+ return new FetchSchemaBlackListRPCHandler(
+ requestType,
+ requestId,
+ targetDataNode,
+ dataNodeLocationMap,
+ (Map<Integer, TFetchSchemaBlackListResp>) responseMap,
+ countDownLatch);
+ case SET_TTL:
+ case CREATE_DATA_REGION:
+ case CREATE_SCHEMA_REGION:
+ case CREATE_FUNCTION:
+ case DROP_FUNCTION:
+ case CREATE_TRIGGER_INSTANCE:
+ case DROP_TRIGGER_INSTANCE:
+ case ACTIVE_TRIGGER_INSTANCE:
+ case INACTIVE_TRIGGER_INSTANCE:
+ case MERGE:
+ case FULL_MERGE:
+ case FLUSH:
+ case CLEAR_CACHE:
+ case LOAD_CONFIGURATION:
+ case SET_SYSTEM_STATUS:
+ case UPDATE_REGION_ROUTE_MAP:
+ case BROADCAST_LATEST_CONFIG_NODE_GROUP:
+ case INVALIDATE_MATCHED_SCHEMA_CACHE:
+ default:
+ return new AsyncTSStatusRPCHandler(
+ requestType,
+ requestId,
+ targetDataNode,
+ dataNodeLocationMap,
+ (Map<Integer, TSStatus>) responseMap,
+ countDownLatch);
+ }
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/ClearCacheHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/ClearCacheHandler.java
deleted file mode 100644
index 67a02843e9..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/ClearCacheHandler.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-public class ClearCacheHandler extends AbstractRetryHandler
- implements AsyncMethodCallback<TSStatus> {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(ClearCacheHandler.class);
-
- private final List<TSStatus> dataNodeResponseStatus;
-
- public ClearCacheHandler(
- CountDownLatch countDownLatch,
- DataNodeRequestType requestType,
- TDataNodeLocation targetDataNode,
- Map<Integer, TDataNodeLocation> dataNodeLocationMap,
- List<TSStatus> dataNodeResponseStatus) {
- super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
- this.dataNodeResponseStatus = dataNodeResponseStatus;
- }
-
- @Override
- public void onComplete(TSStatus response) {
- if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataNodeResponseStatus.add(response);
- dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
- LOGGER.info("Successfully Clear Cache on DataNode: {}", targetDataNode);
- } else {
- LOGGER.error(
- "Failed to Clear Cache on DataNode {}, {}",
- dataNodeLocationMap.get(targetDataNode.getDataNodeId()),
- response);
- }
- countDownLatch.countDown();
- }
-
- @Override
- public void onError(Exception exception) {
- countDownLatch.countDown();
- dataNodeResponseStatus.add(
- new TSStatus(
- RpcUtils.getStatus(
- TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
- "Clear Cache error on DataNode: {id="
- + targetDataNode.getDataNodeId()
- + ", internalEndPoint="
- + targetDataNode.getInternalEndPoint()
- + "}"
- + exception.getMessage())));
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/ConstructSchemaBlackListHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/ConstructSchemaBlackListHandler.java
deleted file mode 100644
index bce6a47d5b..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/ConstructSchemaBlackListHandler.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-public class ConstructSchemaBlackListHandler extends AbstractRetryHandler
- implements AsyncMethodCallback<TSStatus> {
-
- private static final Logger LOGGER =
- LoggerFactory.getLogger(ConstructSchemaBlackListHandler.class);
-
- private Map<Integer, TSStatus> dataNodeResponseStatusMap;
-
- public ConstructSchemaBlackListHandler(
- Map<Integer, TDataNodeLocation> dataNodeLocationMap,
- Map<Integer, TSStatus> dataNodeResponseStatusMap) {
- super(DataNodeRequestType.CONSTRUCT_SCHEMA_BLACK_LIST, dataNodeLocationMap);
- this.dataNodeResponseStatusMap = dataNodeResponseStatusMap;
- }
-
- public ConstructSchemaBlackListHandler(
- CountDownLatch countDownLatch,
- TDataNodeLocation targetDataNode,
- Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
- super(
- countDownLatch,
- DataNodeRequestType.CONSTRUCT_SCHEMA_BLACK_LIST,
- targetDataNode,
- dataNodeLocationMap);
- this.dataNodeResponseStatusMap = new ConcurrentHashMap<>();
- }
-
- @Override
- public void onComplete(TSStatus tsStatus) {
- dataNodeResponseStatusMap.put(targetDataNode.getDataNodeId(), tsStatus);
- if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
- LOGGER.info("Successfully construct schema black list on DataNode: {}", targetDataNode);
- } else if (tsStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
- LOGGER.error(
- "Failed to construct schema black list on DataNode {}, {}", targetDataNode, tsStatus);
- } else {
- LOGGER.error(
- "Failed to construct schema black list on DataNode {}, {}", targetDataNode, tsStatus);
- }
- countDownLatch.countDown();
- }
-
- @Override
- public void onError(Exception e) {
- countDownLatch.countDown();
- dataNodeResponseStatusMap.put(
- targetDataNode.dataNodeId,
- new TSStatus(
- RpcUtils.getStatus(
- TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
- "Construct schema black list error on DataNode: {id="
- + targetDataNode.getDataNodeId()
- + ", internalEndPoint="
- + targetDataNode.getInternalEndPoint()
- + "}"
- + e.getMessage())));
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/CreateRegionHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/CreateRegionHandler.java
deleted file mode 100644
index 3cef7ed55a..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/CreateRegionHandler.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.consensus.ConsensusGroupId;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-/** Only use CreateRegionHandler when the LoadManager wants to create Regions */
-public class CreateRegionHandler extends AbstractRetryHandler
- implements AsyncMethodCallback<TSStatus> {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(CreateRegionHandler.class);
-
- // The index of dataNodeLocations
- // We use Index instead of DataNodeId because it is possible to send multiple createRegion
- // requests to the same DataNode
- private final int index;
- // Used for Logger
- private final TConsensusGroupId consensusGroupId;
-
- public CreateRegionHandler(
- CountDownLatch latch,
- DataNodeRequestType requestType,
- TConsensusGroupId consensusGroupId,
- TDataNodeLocation targetDataNode,
- Map<Integer, TDataNodeLocation> dataNodeLocationMap,
- int index) {
- super(latch, requestType, targetDataNode, dataNodeLocationMap);
- this.consensusGroupId = consensusGroupId;
- this.index = index;
- }
-
- @Override
- public void onComplete(TSStatus tsStatus) {
- if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataNodeLocationMap.remove(index);
- LOGGER.info(
- String.format(
- "Successfully create %s on DataNode: %s",
- ConsensusGroupId.formatTConsensusGroupId(consensusGroupId), targetDataNode));
- } else {
- LOGGER.error(
- String.format(
- "Create %s on DataNode: %s failed, %s",
- ConsensusGroupId.formatTConsensusGroupId(consensusGroupId),
- targetDataNode,
- tsStatus));
- }
- countDownLatch.countDown();
- }
-
- @Override
- public void onError(Exception e) {
- LOGGER.error(
- String.format(
- "Create %s on DataNode: %s failed, %s",
- ConsensusGroupId.formatTConsensusGroupId(consensusGroupId), targetDataNode, e));
- countDownLatch.countDown();
- }
-
- public TConsensusGroupId getConsensusGroupId() {
- return consensusGroupId;
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DeleteDataForDeleteTimeSeriesHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DeleteDataForDeleteTimeSeriesHandler.java
deleted file mode 100644
index d72fee586a..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DeleteDataForDeleteTimeSeriesHandler.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-public class DeleteDataForDeleteTimeSeriesHandler extends AbstractRetryHandler
- implements AsyncMethodCallback<TSStatus> {
-
- private static final Logger LOGGER =
- LoggerFactory.getLogger(DeleteDataForDeleteTimeSeriesHandler.class);
-
- private Map<Integer, TSStatus> dataNodeResponseStatusMap;
-
- public DeleteDataForDeleteTimeSeriesHandler(
- Map<Integer, TDataNodeLocation> dataNodeLocationMap,
- Map<Integer, TSStatus> dataNodeResponseStatusMap) {
- super(DataNodeRequestType.DELETE_DATA_FOR_DELETE_TIMESERIES, dataNodeLocationMap);
- this.dataNodeResponseStatusMap = dataNodeResponseStatusMap;
- }
-
- public DeleteDataForDeleteTimeSeriesHandler(
- CountDownLatch countDownLatch,
- TDataNodeLocation targetDataNode,
- Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
- super(
- countDownLatch,
- DataNodeRequestType.DELETE_DATA_FOR_DELETE_TIMESERIES,
- targetDataNode,
- dataNodeLocationMap);
- this.dataNodeResponseStatusMap = new ConcurrentHashMap<>();
- }
-
- public Map<Integer, TSStatus> getDataNodeResponseStatusMap() {
- return dataNodeResponseStatusMap;
- }
-
- @Override
- public void onComplete(TSStatus tsStatus) {
- dataNodeResponseStatusMap.put(targetDataNode.getDataNodeId(), tsStatus);
- if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
- LOGGER.info("Successfully delete data for delete timeseries on DataNode: {}", targetDataNode);
- } else if (tsStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
- LOGGER.error(
- "Failed to delete data for delete timeseries on DataNode {}, {}",
- targetDataNode,
- tsStatus);
- } else {
- LOGGER.error(
- "Failed to delete data for delete timeseries on DataNode {}, {}",
- targetDataNode,
- tsStatus);
- }
- countDownLatch.countDown();
- }
-
- @Override
- public void onError(Exception e) {
- countDownLatch.countDown();
- dataNodeResponseStatusMap.put(
- targetDataNode.dataNodeId,
- new TSStatus(
- RpcUtils.getStatus(
- TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
- "Delete data for delete timeseries error on DataNode: {id="
- + targetDataNode.getDataNodeId()
- + ", internalEndPoint="
- + targetDataNode.getInternalEndPoint()
- + "}"
- + e.getMessage())));
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DeleteTimeSeriesHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DeleteTimeSeriesHandler.java
deleted file mode 100644
index e7c0cea47b..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DeleteTimeSeriesHandler.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-public class DeleteTimeSeriesHandler extends AbstractRetryHandler
- implements AsyncMethodCallback<TSStatus> {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(DeleteTimeSeriesHandler.class);
-
- private Map<Integer, TSStatus> dataNodeResponseStatusMap;
-
- public DeleteTimeSeriesHandler(
- Map<Integer, TDataNodeLocation> dataNodeLocationMap,
- Map<Integer, TSStatus> dataNodeResponseStatusMap) {
- super(DataNodeRequestType.DELETE_TIMESERIES, dataNodeLocationMap);
- this.dataNodeResponseStatusMap = dataNodeResponseStatusMap;
- }
-
- public DeleteTimeSeriesHandler(
- CountDownLatch countDownLatch,
- TDataNodeLocation targetDataNode,
- Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
- super(
- countDownLatch, DataNodeRequestType.DELETE_TIMESERIES, targetDataNode, dataNodeLocationMap);
- dataNodeResponseStatusMap = new ConcurrentHashMap<>();
- }
-
- public Map<Integer, TSStatus> getDataNodeResponseStatusMap() {
- return dataNodeResponseStatusMap;
- }
-
- @Override
- public void onComplete(TSStatus tsStatus) {
- dataNodeResponseStatusMap.put(targetDataNode.getDataNodeId(), tsStatus);
- if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
- LOGGER.info("Successfully delete timeseries on DataNode: {}", targetDataNode);
- } else if (tsStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
- LOGGER.error("Failed to delete timeseries on DataNode {}, {}", targetDataNode, tsStatus);
- } else {
- LOGGER.error("Failed to delete timeseries on DataNode {}, {}", targetDataNode, tsStatus);
- }
- countDownLatch.countDown();
- }
-
- @Override
- public void onError(Exception e) {
- countDownLatch.countDown();
- dataNodeResponseStatusMap.put(
- targetDataNode.dataNodeId,
- new TSStatus(
- RpcUtils.getStatus(
- TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
- "Delete timeseries error on DataNode: {id="
- + targetDataNode.getDataNodeId()
- + ", internalEndPoint="
- + targetDataNode.getInternalEndPoint()
- + "}"
- + e.getMessage())));
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FunctionManagementHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FunctionManagementHandler.java
deleted file mode 100644
index e48545a8ae..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FunctionManagementHandler.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-public class FunctionManagementHandler extends AbstractRetryHandler
- implements AsyncMethodCallback<TSStatus> {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(FunctionManagementHandler.class);
-
- private final List<TSStatus> dataNodeResponseStatus;
-
- public FunctionManagementHandler(
- CountDownLatch countDownLatch,
- DataNodeRequestType requestType,
- TDataNodeLocation targetDataNode,
- Map<Integer, TDataNodeLocation> dataNodeLocationMap,
- List<TSStatus> dataNodeResponseStatus) {
- super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
- this.dataNodeResponseStatus = dataNodeResponseStatus;
- }
-
- @Override
- public void onComplete(TSStatus response) {
- if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataNodeResponseStatus.add(response);
- dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
- LOGGER.info("Successfully {} on DataNode: {}", dataNodeRequestType, targetDataNode);
- } else {
- LOGGER.info("Failed to {} on DataNode: {}", dataNodeRequestType, targetDataNode);
- }
- countDownLatch.countDown();
- }
-
- @Override
- public void onError(Exception exception) {
- dataNodeResponseStatus.add(
- new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
- .setMessage(targetDataNode + exception.getMessage()));
- LOGGER.info("Failed to {} on DataNode: {}", dataNodeRequestType, targetDataNode);
- countDownLatch.countDown();
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/InvalidateMatchedSchemaCacheHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/InvalidateMatchedSchemaCacheHandler.java
deleted file mode 100644
index b620560fad..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/InvalidateMatchedSchemaCacheHandler.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-public class InvalidateMatchedSchemaCacheHandler extends AbstractRetryHandler
- implements AsyncMethodCallback<TSStatus> {
-
- private static final Logger LOGGER =
- LoggerFactory.getLogger(InvalidateMatchedSchemaCacheHandler.class);
-
- private Map<Integer, TSStatus> dataNodeResponseStatusMap;
-
- public InvalidateMatchedSchemaCacheHandler(
- Map<Integer, TDataNodeLocation> dataNodeLocationMap,
- Map<Integer, TSStatus> dataNodeResponseStatusMap) {
- super(DataNodeRequestType.INVALIDATE_MATCHED_SCHEMA_CACHE, dataNodeLocationMap);
- this.dataNodeResponseStatusMap = dataNodeResponseStatusMap;
- }
-
- public InvalidateMatchedSchemaCacheHandler(
- CountDownLatch countDownLatch,
- TDataNodeLocation targetDataNode,
- Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
- super(
- countDownLatch,
- DataNodeRequestType.INVALIDATE_MATCHED_SCHEMA_CACHE,
- targetDataNode,
- dataNodeLocationMap);
- this.dataNodeResponseStatusMap = new ConcurrentHashMap<>();
- }
-
- public Map<Integer, TSStatus> getDataNodeResponseStatusMap() {
- return dataNodeResponseStatusMap;
- }
-
- @Override
- public void onComplete(TSStatus tsStatus) {
- dataNodeResponseStatusMap.put(targetDataNode.getDataNodeId(), tsStatus);
- if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
- LOGGER.info("Successfully invalidate matched schema cache on DataNode: {}", targetDataNode);
- } else {
- LOGGER.error(
- "Failed to invalidate matched schema cache on DataNode {}, {}", targetDataNode, tsStatus);
- }
- countDownLatch.countDown();
- }
-
- @Override
- public void onError(Exception e) {
- countDownLatch.countDown();
- dataNodeResponseStatusMap.put(
- targetDataNode.getDataNodeId(),
- new TSStatus(
- RpcUtils.getStatus(
- TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
- "Invalidate matched schema cache error on DataNode: {id="
- + targetDataNode.getDataNodeId()
- + ", internalEndPoint="
- + targetDataNode.getInternalEndPoint()
- + "}"
- + e.getMessage())));
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/LoadConfigurationHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/LoadConfigurationHandler.java
deleted file mode 100644
index 198ce3f5f6..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/LoadConfigurationHandler.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-public class LoadConfigurationHandler extends AbstractRetryHandler
- implements AsyncMethodCallback<TSStatus> {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(LoadConfigurationHandler.class);
-
- private final List<TSStatus> dataNodeResponseStatus;
-
- public LoadConfigurationHandler(
- CountDownLatch countDownLatch,
- DataNodeRequestType requestType,
- TDataNodeLocation targetDataNode,
- Map<Integer, TDataNodeLocation> dataNodeLocationMap,
- List<TSStatus> dataNodeResponseStatus) {
- super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
- this.dataNodeResponseStatus = dataNodeResponseStatus;
- }
-
- @Override
- public void onComplete(TSStatus response) {
- if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataNodeResponseStatus.add(response);
- dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
- LOGGER.info("Successfully Load Configuration on DataNode: {}", targetDataNode);
- } else {
- dataNodeResponseStatus.add(response);
- LOGGER.error(
- "Failed to Load Configuration on DataNode {}, {}",
- dataNodeLocationMap.get(targetDataNode.getDataNodeId()),
- response);
- }
- countDownLatch.countDown();
- }
-
- @Override
- public void onError(Exception exception) {
- countDownLatch.countDown();
- dataNodeResponseStatus.add(
- new TSStatus(
- RpcUtils.getStatus(
- TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
- "Load Configuration error on DataNode: {id="
- + targetDataNode.getDataNodeId()
- + ", internalEndPoint="
- + targetDataNode.getInternalEndPoint()
- + "}"
- + exception.getMessage())));
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/MergeHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/MergeHandler.java
deleted file mode 100644
index 5ed81310de..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/MergeHandler.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-public class MergeHandler extends AbstractRetryHandler implements AsyncMethodCallback<TSStatus> {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(MergeHandler.class);
-
- private final List<TSStatus> dataNodeResponseStatus;
-
- public MergeHandler(
- CountDownLatch countDownLatch,
- DataNodeRequestType dataNodeRequestType,
- TDataNodeLocation targetDataNode,
- Map<Integer, TDataNodeLocation> dataNodeLocationMap,
- List<TSStatus> dataNodeResponseStatus) {
- super(countDownLatch, dataNodeRequestType, targetDataNode, dataNodeLocationMap);
- this.dataNodeResponseStatus = dataNodeResponseStatus;
- }
-
- @Override
- public void onComplete(TSStatus response) {
- if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataNodeResponseStatus.add(response);
- dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
- LOGGER.info("Successfully {} on DataNode: {}", dataNodeRequestType, targetDataNode);
- } else {
- LOGGER.error(
- "Failed to {} on DataNode {}, {}",
- dataNodeRequestType,
- dataNodeLocationMap.get(targetDataNode.getDataNodeId()),
- response);
- }
- countDownLatch.countDown();
- }
-
- @Override
- public void onError(Exception exception) {
- countDownLatch.countDown();
- dataNodeResponseStatus.add(
- new TSStatus(
- RpcUtils.getStatus(
- TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
- dataNodeRequestType
- + " error on DataNode: {id="
- + targetDataNode.getDataNodeId()
- + ", internalEndPoint="
- + targetDataNode.getInternalEndPoint()
- + "}"
- + exception.getMessage())));
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/RollbackSchemaBlackListHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/RollbackSchemaBlackListHandler.java
deleted file mode 100644
index c1ea9b08f1..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/RollbackSchemaBlackListHandler.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-public class RollbackSchemaBlackListHandler extends AbstractRetryHandler
- implements AsyncMethodCallback<TSStatus> {
-
- private static final Logger LOGGER =
- LoggerFactory.getLogger(RollbackSchemaBlackListHandler.class);
-
- private Map<Integer, TSStatus> dataNodeResponseStatusMap;
-
- public RollbackSchemaBlackListHandler(
- Map<Integer, TDataNodeLocation> dataNodeLocationMap,
- Map<Integer, TSStatus> dataNodeResponseStatusMap) {
- super(DataNodeRequestType.ROLLBACK_SCHEMA_BLACK_LIST, dataNodeLocationMap);
- this.dataNodeResponseStatusMap = dataNodeResponseStatusMap;
- }
-
- public RollbackSchemaBlackListHandler(
- CountDownLatch countDownLatch,
- TDataNodeLocation targetDataNode,
- Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
- super(
- countDownLatch,
- DataNodeRequestType.ROLLBACK_SCHEMA_BLACK_LIST,
- targetDataNode,
- dataNodeLocationMap);
- this.dataNodeResponseStatusMap = new ConcurrentHashMap<>();
- }
-
- public Map<Integer, TSStatus> getDataNodeResponseStatusMap() {
- return dataNodeResponseStatusMap;
- }
-
- @Override
- public void onComplete(TSStatus tsStatus) {
- dataNodeResponseStatusMap.put(targetDataNode.getDataNodeId(), tsStatus);
- if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
- LOGGER.info("Successfully rollback schema black list on DataNode: {}", targetDataNode);
- } else if (tsStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
- LOGGER.error(
- "Failed to rollback schema black list on DataNode {}, {}", targetDataNode, tsStatus);
- } else {
- LOGGER.error(
- "Failed to rollback schema black list on DataNode {}, {}", targetDataNode, tsStatus);
- }
- countDownLatch.countDown();
- }
-
- @Override
- public void onError(Exception e) {
- countDownLatch.countDown();
- dataNodeResponseStatusMap.put(
- targetDataNode.getDataNodeId(),
- new TSStatus(
- RpcUtils.getStatus(
- TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
- "Rollback schema black list error on DataNode: {id="
- + targetDataNode.getDataNodeId()
- + ", internalEndPoint="
- + targetDataNode.getInternalEndPoint()
- + "}"
- + e.getMessage())));
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetSystemStatusHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetSystemStatusHandler.java
deleted file mode 100644
index 92f00aa766..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetSystemStatusHandler.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-public class SetSystemStatusHandler extends AbstractRetryHandler
- implements AsyncMethodCallback<TSStatus> {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(SetSystemStatusHandler.class);
-
- private final List<TSStatus> dataNodeResponseStatus;
-
- public SetSystemStatusHandler(
- CountDownLatch countDownLatch,
- DataNodeRequestType requestType,
- TDataNodeLocation targetDataNode,
- Map<Integer, TDataNodeLocation> dataNodeLocationMap,
- List<TSStatus> dataNodeResponseStatus) {
- super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
- this.dataNodeResponseStatus = dataNodeResponseStatus;
- }
-
- @Override
- public void onComplete(TSStatus response) {
- if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataNodeResponseStatus.add(response);
- dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
- LOGGER.info("Successfully Set System Status on DataNode: {}", targetDataNode);
- } else {
- dataNodeResponseStatus.add(response);
- LOGGER.error(
- "Failed to Set System Status on DataNode {}, {}",
- dataNodeLocationMap.get(targetDataNode.getDataNodeId()),
- response);
- }
- countDownLatch.countDown();
- }
-
- @Override
- public void onError(Exception exception) {
- countDownLatch.countDown();
- dataNodeResponseStatus.add(
- new TSStatus(
- RpcUtils.getStatus(
- TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
- "Set System Status error on DataNode: {id="
- + targetDataNode.getDataNodeId()
- + ", internalEndPoint="
- + targetDataNode.getInternalEndPoint()
- + "}"
- + exception.getMessage())));
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetTTLHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetTTLHandler.java
deleted file mode 100644
index 6792a49e7f..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetTTLHandler.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-public class SetTTLHandler extends AbstractRetryHandler implements AsyncMethodCallback<TSStatus> {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(SetTTLHandler.class);
-
- public SetTTLHandler(
- CountDownLatch countDownLatch,
- DataNodeRequestType requestType,
- TDataNodeLocation targetDataNode,
- Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
- super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
- }
-
- @Override
- public void onComplete(TSStatus response) {
- if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
- LOGGER.info("Successfully SetTTL on DataNode: {}", targetDataNode);
- } else {
- LOGGER.error("Failed to SetTTL on DataNode: {}, {}", targetDataNode, response);
- }
- countDownLatch.countDown();
- }
-
- @Override
- public void onError(Exception e) {
- countDownLatch.countDown();
- LOGGER.error("Failed to SetTTL on DataNode: {}", targetDataNode);
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/TriggerManagementHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/TriggerManagementHandler.java
deleted file mode 100644
index 00d8047bdf..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/TriggerManagementHandler.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-public class TriggerManagementHandler extends AbstractRetryHandler
- implements AsyncMethodCallback<TSStatus> {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(TriggerManagementHandler.class);
-
- private final List<TSStatus> dataNodeResponseStatus;
-
- public TriggerManagementHandler(
- CountDownLatch countDownLatch,
- DataNodeRequestType requestType,
- TDataNodeLocation targetDataNode,
- Map<Integer, TDataNodeLocation> dataNodeLocationMap,
- List<TSStatus> dataNodeResponseStatus) {
- super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
- this.dataNodeResponseStatus = dataNodeResponseStatus;
- }
-
- @Override
- public void onComplete(TSStatus response) {
- dataNodeResponseStatus.add(response);
- if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
- LOGGER.info("Successfully {} on DataNode: {}", dataNodeRequestType, targetDataNode);
- } else {
- LOGGER.info("Failed to {} on DataNode: {}", dataNodeRequestType, targetDataNode);
- }
- countDownLatch.countDown();
- }
-
- @Override
- public void onError(Exception exception) {
- dataNodeResponseStatus.add(
- new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
- .setMessage(targetDataNode + exception.getMessage()));
- LOGGER.info("Failed to {} on DataNode: {}", dataNodeRequestType, targetDataNode);
- countDownLatch.countDown();
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateConfigNodeGroupHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateConfigNodeGroupHandler.java
deleted file mode 100644
index e4cf71b4b1..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateConfigNodeGroupHandler.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-public class UpdateConfigNodeGroupHandler extends AbstractRetryHandler
- implements AsyncMethodCallback<TSStatus> {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(UpdateConfigNodeGroupHandler.class);
-
- public UpdateConfigNodeGroupHandler(
- CountDownLatch countDownLatch,
- DataNodeRequestType requestType,
- TDataNodeLocation targetDataNode,
- Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
- super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
- }
-
- @Override
- public void onComplete(TSStatus status) {
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
- LOGGER.info(
- "Successfully broadCast the latest configNodeGroup on DataNode: {}", targetDataNode);
- } else {
- LOGGER.error(
- "Failed to broadCast the latest configNodeGroup on DataNode: {}, {}",
- targetDataNode,
- status);
- }
- countDownLatch.countDown();
- }
-
- @Override
- public void onError(Exception e) {
- LOGGER.error("BroadCast the latest configNodeGroup on DataNode: {} failed", targetDataNode);
- countDownLatch.countDown();
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java
deleted file mode 100644
index 30a88953c1..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.client.async.handlers;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-public class UpdateRegionRouteMapHandler extends AbstractRetryHandler
- implements AsyncMethodCallback<TSStatus> {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(UpdateRegionRouteMapHandler.class);
-
- public UpdateRegionRouteMapHandler(
- CountDownLatch countDownLatch,
- DataNodeRequestType requestType,
- TDataNodeLocation targetDataNode,
- Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
- super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
- }
-
- @Override
- public void onComplete(TSStatus status) {
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
- LOGGER.info(
- "Successfully update the RegionRouteMap on DataNode: {}", targetDataNode.getDataNodeId());
- } else {
- LOGGER.error("Update RegionRouteMap on DataNode: {} failed", targetDataNode.getDataNodeId());
- }
- countDownLatch.countDown();
- }
-
- @Override
- public void onError(Exception e) {
- LOGGER.error("Update RegionRouteMap on DataNode: {} failed", targetDataNode.getDataNodeId());
- countDownLatch.countDown();
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/ConfigNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
similarity index 95%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/ConfigNodeHeartbeatHandler.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
index 458267568e..62f2ea3676 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/ConfigNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.client.async.handlers;
+package org.apache.iotdb.confignode.client.async.handlers.heartbeat;
import org.apache.iotdb.confignode.manager.node.ConfigNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.node.NodeHeartbeatSample;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
similarity index 97%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index ded73deaa2..fc61134e9d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.client.async.handlers;
+package org.apache.iotdb.confignode.client.async.handlers.heartbeat;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/AbstractAsyncRPCHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/AbstractAsyncRPCHandler.java
new file mode 100644
index 0000000000..7fe5d780f9
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/AbstractAsyncRPCHandler.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.client.async.handlers.rpc;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+public abstract class AbstractAsyncRPCHandler<T> implements AsyncMethodCallback<T> {
+
+ // Type of RPC request
+ protected final DataNodeRequestType requestType;
+ // Index of request
+ protected final int requestId;
+ // Target DataNode
+ protected final TDataNodeLocation targetDataNode;
+
+ /**
+ * Map key: The indices of asynchronous RPC requests
+ *
+ * <p>Map value: The target DataNodes of corresponding indices
+ *
+ * <p>All kinds of AsyncHandler will remove its targetDataNode from the dataNodeLocationMap only
+ * if its corresponding RPC request success
+ */
+ protected final Map<Integer, TDataNodeLocation> dataNodeLocationMap;
+
+ /**
+ * Map key: The indices(targetDataNode's ID) of asynchronous RPC requests
+ *
+ * <p>Map value: The response of corresponding indices
+ *
+ * <p>All kinds of AsyncHandler will add response to the responseMap after its corresponding RPC
+ * request finished
+ */
+ protected final Map<Integer, T> responseMap;
+
+ // All kinds of AsyncHandler will invoke countDown after its corresponding RPC request finished
+ protected final CountDownLatch countDownLatch;
+
+ protected final String formattedTargetLocation;
+
+ public AbstractAsyncRPCHandler(
+ DataNodeRequestType requestType,
+ int requestId,
+ TDataNodeLocation targetDataNode,
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+ Map<Integer, T> responseMap,
+ CountDownLatch countDownLatch) {
+ this.requestType = requestType;
+ this.requestId = requestId;
+ this.targetDataNode = targetDataNode;
+ this.formattedTargetLocation =
+ "{id="
+ + targetDataNode.getDataNodeId()
+ + ", internalEndPoint="
+ + targetDataNode.getInternalEndPoint()
+ + "}";
+
+ this.dataNodeLocationMap = dataNodeLocationMap;
+ this.responseMap = responseMap;
+ this.countDownLatch = countDownLatch;
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FlushHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/AsyncTSStatusRPCHandler.java
similarity index 54%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FlushHandler.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/AsyncTSStatusRPCHandler.java
index aa0c366ced..ace2eb58aa 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FlushHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/AsyncTSStatusRPCHandler.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.client.async.handlers;
+package org.apache.iotdb.confignode.client.async.handlers.rpc;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -24,57 +24,65 @@ import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
-public class FlushHandler extends AbstractRetryHandler implements AsyncMethodCallback<TSStatus> {
+/** General RPC handler for TSStatus response type */
+public class AsyncTSStatusRPCHandler extends AbstractAsyncRPCHandler<TSStatus> {
- private static final Logger LOGGER = LoggerFactory.getLogger(FlushHandler.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(AsyncTSStatusRPCHandler.class);
- private final List<TSStatus> dataNodeResponseStatus;
-
- public FlushHandler(
- CountDownLatch countDownLatch,
+ public AsyncTSStatusRPCHandler(
DataNodeRequestType requestType,
+ int requestId,
TDataNodeLocation targetDataNode,
Map<Integer, TDataNodeLocation> dataNodeLocationMap,
- List<TSStatus> dataNodeResponseStatus) {
- super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
- this.dataNodeResponseStatus = dataNodeResponseStatus;
+ Map<Integer, TSStatus> responseMap,
+ CountDownLatch countDownLatch) {
+ super(requestType, requestId, targetDataNode, dataNodeLocationMap, responseMap, countDownLatch);
}
@Override
public void onComplete(TSStatus response) {
+ // Put response
+ responseMap.put(requestId, response);
+
if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataNodeResponseStatus.add(response);
- dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
- LOGGER.info("Successfully Flush on DataNode: {}", targetDataNode);
+ // Remove only if success
+ dataNodeLocationMap.remove(requestId);
+ LOGGER.info("Successfully {} on DataNode: {}", requestType, formattedTargetLocation);
} else {
LOGGER.error(
- "Failed to Flush on DataNode {}, {}",
- dataNodeLocationMap.get(targetDataNode.getDataNodeId()),
+ "Failed to {} on DataNode: {}, response: {}",
+ requestType,
+ formattedTargetLocation,
response);
}
+
+ // Always CountDown
countDownLatch.countDown();
}
@Override
- public void onError(Exception exception) {
- countDownLatch.countDown();
- dataNodeResponseStatus.add(
+ public void onError(Exception e) {
+ String errorMsg =
+ "Failed to "
+ + requestType
+ + " on DataNode: "
+ + formattedTargetLocation
+ + ", exception: "
+ + e.getMessage();
+ LOGGER.error(errorMsg);
+
+ responseMap.put(
+ requestId,
new TSStatus(
- RpcUtils.getStatus(
- TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
- "Flush error on DataNode: {id="
- + targetDataNode.getDataNodeId()
- + ", internalEndPoint="
- + targetDataNode.getInternalEndPoint()
- + "}"
- + exception.getMessage())));
+ RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg)));
+
+ // Always CountDown
+ countDownLatch.countDown();
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DeleteTimeSeriesRPCHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DeleteTimeSeriesRPCHandler.java
new file mode 100644
index 0000000000..c2d9074d6e
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DeleteTimeSeriesRPCHandler.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.client.async.handlers.rpc;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+public class DeleteTimeSeriesRPCHandler extends AsyncTSStatusRPCHandler {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DeleteTimeSeriesRPCHandler.class);
+
+ public DeleteTimeSeriesRPCHandler(
+ DataNodeRequestType requestType,
+ int requestId,
+ TDataNodeLocation targetDataNode,
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+ Map<Integer, TSStatus> responseMap,
+ CountDownLatch countDownLatch) {
+ super(requestType, requestId, targetDataNode, dataNodeLocationMap, responseMap, countDownLatch);
+ }
+
+ @Override
+ public void onComplete(TSStatus tsStatus) {
+ responseMap.put(requestId, tsStatus);
+ LOGGER.info("{} for {} receives: {}", requestType, requestId, tsStatus);
+ if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ dataNodeLocationMap.remove(requestId);
+ LOGGER.info("Successfully {} on DataNode: {}", requestType, formattedTargetLocation);
+ } else if (tsStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ dataNodeLocationMap.remove(requestId);
+ LOGGER.error(
+ "Failed to {} on DataNode {}, {}", requestType, formattedTargetLocation, tsStatus);
+ } else {
+ LOGGER.error(
+ "Failed to {} on DataNode {}, {}", requestType, formattedTargetLocation, tsStatus);
+ }
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void onError(Exception e) {
+ String errorMsg =
+ requestType
+ + " error on DataNode: {id="
+ + targetDataNode.getDataNodeId()
+ + ", internalEndPoint="
+ + targetDataNode.getInternalEndPoint()
+ + "}"
+ + e.getMessage();
+ LOGGER.error(errorMsg);
+
+ countDownLatch.countDown();
+ responseMap.put(
+ requestId,
+ new TSStatus(
+ RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg)));
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FetchSchemaBlackLsitHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java
similarity index 55%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FetchSchemaBlackLsitHandler.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java
index 9db74c5f59..90c0f90ab6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FetchSchemaBlackLsitHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.confignode.client.async.handlers;
+package org.apache.iotdb.confignode.client.async.handlers.rpc;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -26,53 +26,37 @@ import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
-public class FetchSchemaBlackLsitHandler extends AbstractRetryHandler
- implements AsyncMethodCallback<TFetchSchemaBlackListResp> {
+public class FetchSchemaBlackListRPCHandler
+ extends AbstractAsyncRPCHandler<TFetchSchemaBlackListResp> {
- private static final Logger LOGGER = LoggerFactory.getLogger(FetchSchemaBlackLsitHandler.class);
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(FetchSchemaBlackListRPCHandler.class);
- private Map<Integer, TFetchSchemaBlackListResp> dataNodeResponseMap;
-
- public FetchSchemaBlackLsitHandler(
- Map<Integer, TDataNodeLocation> dataNodeLocationMap,
- Map<Integer, TFetchSchemaBlackListResp> dataNodeResponseMap) {
- super(DataNodeRequestType.FETCH_SCHEMA_BLACK_LIST, dataNodeLocationMap);
- this.dataNodeResponseMap = dataNodeResponseMap;
- }
-
- public FetchSchemaBlackLsitHandler(
- CountDownLatch countDownLatch,
+ public FetchSchemaBlackListRPCHandler(
+ DataNodeRequestType requestType,
+ int requestId,
TDataNodeLocation targetDataNode,
- Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
- super(
- countDownLatch,
- DataNodeRequestType.FETCH_SCHEMA_BLACK_LIST,
- targetDataNode,
- dataNodeLocationMap);
- this.dataNodeResponseMap = new ConcurrentHashMap<>();
- }
-
- public Map<Integer, TFetchSchemaBlackListResp> getDataNodeResponseMap() {
- return dataNodeResponseMap;
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+ Map<Integer, TFetchSchemaBlackListResp> responseMap,
+ CountDownLatch countDownLatch) {
+ super(requestType, requestId, targetDataNode, dataNodeLocationMap, responseMap, countDownLatch);
}
@Override
public void onComplete(TFetchSchemaBlackListResp tFetchSchemaBlackListResp) {
TSStatus tsStatus = tFetchSchemaBlackListResp.getStatus();
- dataNodeResponseMap.put(targetDataNode.getDataNodeId(), tFetchSchemaBlackListResp);
+ responseMap.put(requestId, tFetchSchemaBlackListResp);
if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
+ dataNodeLocationMap.remove(requestId);
LOGGER.info("Successfully fetch schema black list on DataNode: {}", targetDataNode);
} else if (tsStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
+ dataNodeLocationMap.remove(requestId);
LOGGER.error(
"Failed to fetch schema black list on DataNode {}, {}", targetDataNode, tsStatus);
} else {
@@ -84,18 +68,20 @@ public class FetchSchemaBlackLsitHandler extends AbstractRetryHandler
@Override
public void onError(Exception e) {
+ String errorMsg =
+ "Fetch schema black list error on DataNode: {id="
+ + targetDataNode.getDataNodeId()
+ + ", internalEndPoint="
+ + targetDataNode.getInternalEndPoint()
+ + "}"
+ + e.getMessage();
+ LOGGER.error(errorMsg);
+
countDownLatch.countDown();
TFetchSchemaBlackListResp resp = new TFetchSchemaBlackListResp();
resp.setStatus(
new TSStatus(
- RpcUtils.getStatus(
- TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
- "Fetch schema black list error on DataNode: {id="
- + targetDataNode.getDataNodeId()
- + ", internalEndPoint="
- + targetDataNode.getInternalEndPoint()
- + "}"
- + e.getMessage())));
- dataNodeResponseMap.put(targetDataNode.getDataNodeId(), resp);
+ RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg)));
+ responseMap.put(requestId, resp);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/AbstractDataNodeTask.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/AbstractDataNodeTask.java
deleted file mode 100644
index 10671031f0..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/AbstractDataNodeTask.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.task;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public abstract class AbstractDataNodeTask<T> {
-
- protected Map<Integer, TDataNodeLocation> dataNodeLocationMap;
-
- protected Map<Integer, T> dataNodeResponseMap = new ConcurrentHashMap<>();
-
- /** Must provide thread-safe map */
- public AbstractDataNodeTask(Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
- this.dataNodeLocationMap = dataNodeLocationMap;
- }
-
- public Map<Integer, TDataNodeLocation> getDataNodeLocationMap() {
- return dataNodeLocationMap;
- }
-
- public abstract DataNodeRequestType getDataNodeRequestType();
-
- /** Provide the handler for single rpc process */
- public abstract AbstractRetryHandler getSingleRequestHandler();
-
- /** Get the final responses from all dataNodes */
- public Map<Integer, T> getDataNodeResponseMap() {
- return dataNodeResponseMap;
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/ConstructSchemaBlackListDataNodeTask.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/ConstructSchemaBlackListDataNodeTask.java
deleted file mode 100644
index 9e12a1b61b..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/ConstructSchemaBlackListDataNodeTask.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.task;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
-import org.apache.iotdb.confignode.client.async.handlers.ConstructSchemaBlackListHandler;
-
-import java.util.Map;
-
-public class ConstructSchemaBlackListDataNodeTask extends AbstractDataNodeTask<TSStatus> {
-
- public ConstructSchemaBlackListDataNodeTask(Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
- super(dataNodeLocationMap);
- }
-
- @Override
- public DataNodeRequestType getDataNodeRequestType() {
- return DataNodeRequestType.CONSTRUCT_SCHEMA_BLACK_LIST;
- }
-
- @Override
- public AbstractRetryHandler getSingleRequestHandler() {
- return new ConstructSchemaBlackListHandler(dataNodeLocationMap, dataNodeResponseMap);
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/DeleteDataForDeleteTimeSeriesDataNodeTask.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/DeleteDataForDeleteTimeSeriesDataNodeTask.java
deleted file mode 100644
index 089068aa51..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/DeleteDataForDeleteTimeSeriesDataNodeTask.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.task;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
-import org.apache.iotdb.confignode.client.async.handlers.DeleteDataForDeleteTimeSeriesHandler;
-
-import java.util.Map;
-
-public class DeleteDataForDeleteTimeSeriesDataNodeTask extends AbstractDataNodeTask<TSStatus> {
-
- public DeleteDataForDeleteTimeSeriesDataNodeTask(
- Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
- super(dataNodeLocationMap);
- }
-
- @Override
- public DataNodeRequestType getDataNodeRequestType() {
- return DataNodeRequestType.DELETE_DATA_FOR_DELETE_TIMESERIES;
- }
-
- @Override
- public AbstractRetryHandler getSingleRequestHandler() {
- return new DeleteDataForDeleteTimeSeriesHandler(dataNodeLocationMap, dataNodeResponseMap);
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/DeleteTimeSeriesDataNodeTask.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/DeleteTimeSeriesDataNodeTask.java
deleted file mode 100644
index caf67790c2..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/DeleteTimeSeriesDataNodeTask.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.task;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
-import org.apache.iotdb.confignode.client.async.handlers.DeleteTimeSeriesHandler;
-
-import java.util.Map;
-
-public class DeleteTimeSeriesDataNodeTask extends AbstractDataNodeTask<TSStatus> {
-
- public DeleteTimeSeriesDataNodeTask(Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
- super(dataNodeLocationMap);
- }
-
- @Override
- public DataNodeRequestType getDataNodeRequestType() {
- return DataNodeRequestType.DELETE_TIMESERIES;
- }
-
- @Override
- public AbstractRetryHandler getSingleRequestHandler() {
- return new DeleteTimeSeriesHandler(dataNodeLocationMap, dataNodeResponseMap);
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/FetchSchemaBlackListDataNodeTask.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/FetchSchemaBlackListDataNodeTask.java
deleted file mode 100644
index 8545b7dd5c..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/FetchSchemaBlackListDataNodeTask.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.task;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
-import org.apache.iotdb.confignode.client.async.handlers.FetchSchemaBlackLsitHandler;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
-
-import java.util.Map;
-
-public class FetchSchemaBlackListDataNodeTask
- extends AbstractDataNodeTask<TFetchSchemaBlackListResp> {
-
- public FetchSchemaBlackListDataNodeTask(Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
- super(dataNodeLocationMap);
- }
-
- @Override
- public DataNodeRequestType getDataNodeRequestType() {
- return DataNodeRequestType.FETCH_SCHEMA_BLACK_LIST;
- }
-
- @Override
- public AbstractRetryHandler getSingleRequestHandler() {
- return new FetchSchemaBlackLsitHandler(dataNodeLocationMap, dataNodeResponseMap);
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/InvalidateMatchedSchemaCacheDataNodeTask.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/InvalidateMatchedSchemaCacheDataNodeTask.java
deleted file mode 100644
index 2a08fdcb99..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/InvalidateMatchedSchemaCacheDataNodeTask.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.task;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
-import org.apache.iotdb.confignode.client.async.handlers.InvalidateMatchedSchemaCacheHandler;
-
-import java.util.Map;
-
-public class InvalidateMatchedSchemaCacheDataNodeTask extends AbstractDataNodeTask<TSStatus> {
-
- public InvalidateMatchedSchemaCacheDataNodeTask(
- Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
- super(dataNodeLocationMap);
- }
-
- @Override
- public DataNodeRequestType getDataNodeRequestType() {
- return DataNodeRequestType.INVALIDATE_MATCHED_SCHEMA_CACHE;
- }
-
- @Override
- public AbstractRetryHandler getSingleRequestHandler() {
- return new InvalidateMatchedSchemaCacheHandler(dataNodeLocationMap, dataNodeResponseMap);
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/RollbackSchemaBlackListDataNodeTask.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/RollbackSchemaBlackListDataNodeTask.java
deleted file mode 100644
index 3a24ae7898..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/task/RollbackSchemaBlackListDataNodeTask.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.async.task;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
-import org.apache.iotdb.confignode.client.async.handlers.RollbackSchemaBlackListHandler;
-
-import java.util.Map;
-
-public class RollbackSchemaBlackListDataNodeTask extends AbstractDataNodeTask<TSStatus> {
-
- public RollbackSchemaBlackListDataNodeTask(Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
- super(dataNodeLocationMap);
- }
-
- @Override
- public DataNodeRequestType getDataNodeRequestType() {
- return DataNodeRequestType.ROLLBACK_SCHEMA_BLACK_LIST;
- }
-
- @Override
- public AbstractRetryHandler getSingleRequestHandler() {
- return new RollbackSchemaBlackListHandler(dataNodeLocationMap, dataNodeResponseMap);
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/confignode/SyncConfigNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
similarity index 98%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/sync/confignode/SyncConfigNodeClientPool.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
index de74066c1b..557a00f55f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/confignode/SyncConfigNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.client.sync.confignode;
+package org.apache.iotdb.confignode.client.sync;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
similarity index 99%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
index 41a8d2d914..d50210bb4d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.client.sync.datanode;
+package org.apache.iotdb.confignode.client.sync;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java
index d47a3f7424..6b9031c30a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
-import org.apache.iotdb.confignode.client.sync.confignode.SyncConfigNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index 257a68d1ca..7627405067 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -28,8 +28,9 @@ import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.CountStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
@@ -216,37 +217,39 @@ public class ClusterSchemaManager {
TSStatusCode.STORAGE_GROUP_NOT_EXIST,
"Path [" + new PartialPath(setTTLPlan.getStorageGroupPathPattern()) + "] does not exist");
}
- Map<Integer, TDataNodeLocation> dataNodeLocationMaps = new ConcurrentHashMap<>();
+
+ // Map<DataNodeId, TDataNodeLocation>
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
+ // Map<DataNodeId, StorageGroupPatterns>
Map<Integer, List<String>> dnlToSgMap = new ConcurrentHashMap<>();
for (String storageGroup : storageSchemaMap.keySet()) {
-
+ // Get related DataNodes
Set<TDataNodeLocation> dataNodeLocations =
getPartitionManager()
.getStorageGroupRelatedDataNodes(storageGroup, TConsensusGroupType.DataRegion);
+
for (TDataNodeLocation dataNodeLocation : dataNodeLocations) {
- if (!dataNodeLocationMaps.containsKey(dataNodeLocation.getDataNodeId())) {
- dataNodeLocationMaps.put(dataNodeLocation.getDataNodeId(), dataNodeLocation);
- List<String> storageGroups = new ArrayList<>();
- storageGroups.add(storageGroup);
- dnlToSgMap.put(dataNodeLocation.getDataNodeId(), storageGroups);
- } else {
- List<String> storageGroups = dnlToSgMap.get(dataNodeLocation.getDataNodeId());
- storageGroups.add(storageGroup);
- dnlToSgMap.put(dataNodeLocation.getDataNodeId(), storageGroups);
- }
+ dataNodeLocationMap.putIfAbsent(dataNodeLocation.getDataNodeId(), dataNodeLocation);
+ dnlToSgMap
+ .computeIfAbsent(dataNodeLocation.getDataNodeId(), empty -> new ArrayList<>())
+ .add(storageGroup);
}
}
- for (Map.Entry<Integer, List<String>> entry : dnlToSgMap.entrySet()) {
- Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
- dataNodeLocationMap.put(entry.getKey(), dataNodeLocationMaps.get(entry.getKey()));
- AsyncDataNodeClientPool.getInstance()
- .sendAsyncRequestToDataNodeWithRetry(
- new TSetTTLReq(entry.getValue(), setTTLPlan.getTTL()),
- dataNodeLocationMap,
- DataNodeRequestType.SET_TTL,
- null);
- }
+ AsyncClientHandler<TSetTTLReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(DataNodeRequestType.SET_TTL);
+ dnlToSgMap
+ .keySet()
+ .forEach(
+ dataNodeId -> {
+ TSetTTLReq setTTLReq =
+ new TSetTTLReq(dnlToSgMap.get(dataNodeId), setTTLPlan.getTTL());
+ clientHandler.putRequest(dataNodeId, setTTLReq);
+ clientHandler.putDataNodeLocation(dataNodeId, dataNodeLocationMap.get(dataNodeId));
+ });
+ // TODO: Check response
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+
return getConsensusManager().write(setTTLPlan).getStatus();
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
index a1e2ca29ef..150d7175c3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
import org.apache.iotdb.confignode.consensus.response.PermissionInfoResp;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 656e5880f6..e57b16e56f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.confignode.manager;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.path.PathPatternTree;
@@ -34,15 +35,15 @@ import org.apache.iotdb.confignode.persistence.ProcedureInfo;
import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
-import org.apache.iotdb.confignode.procedure.impl.AddConfigNodeProcedure;
-import org.apache.iotdb.confignode.procedure.impl.CreateRegionGroupsProcedure;
import org.apache.iotdb.confignode.procedure.impl.CreateTriggerProcedure;
-import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
-import org.apache.iotdb.confignode.procedure.impl.DeleteTimeSeriesProcedure;
import org.apache.iotdb.confignode.procedure.impl.DropTriggerProcedure;
-import org.apache.iotdb.confignode.procedure.impl.RegionMigrateProcedure;
-import org.apache.iotdb.confignode.procedure.impl.RemoveConfigNodeProcedure;
-import org.apache.iotdb.confignode.procedure.impl.RemoveDataNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.CreateRegionGroupsProcedure;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.DeleteStorageGroupProcedure;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.DeleteTimeSeriesProcedure;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure;
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
import org.apache.iotdb.confignode.procedure.store.ConfigProcedureStore;
@@ -212,9 +213,11 @@ public class ProcedureManager {
*
* @return SUCCESS_STATUS if all RegionGroups created successfully, CREATE_REGION_ERROR otherwise
*/
- public TSStatus createRegionGroups(CreateRegionGroupsPlan createRegionGroupsPlan) {
+ public TSStatus createRegionGroups(
+ TConsensusGroupType consensusGroupType, CreateRegionGroupsPlan createRegionGroupsPlan) {
long procedureId =
- executor.submitProcedure(new CreateRegionGroupsProcedure(createRegionGroupsPlan));
+ executor.submitProcedure(
+ new CreateRegionGroupsProcedure(consensusGroupType, createRegionGroupsPlan));
List<TSStatus> statusList = new ArrayList<>();
boolean isSucceed =
waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
index 0309ace63a..9a380741ca 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
@@ -22,7 +22,8 @@ package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionPlan;
import org.apache.iotdb.confignode.consensus.request.write.DropFunctionPlan;
import org.apache.iotdb.confignode.persistence.UDFInfo;
@@ -34,8 +35,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -81,17 +80,13 @@ public class UDFManager {
String functionName, String className, List<String> uris) {
final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
configManager.getNodeManager().getRegisteredDataNodeLocations();
- final List<TSStatus> dataNodeResponseStatus =
- Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
final TCreateFunctionRequest request =
new TCreateFunctionRequest(functionName, className, uris);
- AsyncDataNodeClientPool.getInstance()
- .sendAsyncRequestToDataNodeWithRetry(
- request,
- dataNodeLocationMap,
- DataNodeRequestType.CREATE_FUNCTION,
- dataNodeResponseStatus);
- return dataNodeResponseStatus;
+
+ AsyncClientHandler<TCreateFunctionRequest, TSStatus> clientHandler =
+ new AsyncClientHandler<>(DataNodeRequestType.CREATE_FUNCTION, request, dataNodeLocationMap);
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ return clientHandler.getResponseList();
}
public TSStatus dropFunction(String functionName) {
@@ -113,15 +108,11 @@ public class UDFManager {
private List<TSStatus> dropFunctionOnDataNodes(String functionName) {
final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
configManager.getNodeManager().getRegisteredDataNodeLocations();
- final List<TSStatus> dataNodeResponseStatus =
- Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
final TDropFunctionRequest request = new TDropFunctionRequest(functionName);
- AsyncDataNodeClientPool.getInstance()
- .sendAsyncRequestToDataNodeWithRetry(
- request,
- dataNodeLocationMap,
- DataNodeRequestType.DROP_FUNCTION,
- dataNodeResponseStatus);
- return dataNodeResponseStatus;
+
+ AsyncClientHandler<TDropFunctionRequest, TSStatus> clientHandler =
+ new AsyncClientHandler<>(DataNodeRequestType.DROP_FUNCTION, request, dataNodeLocationMap);
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ return clientHandler.getResponseList();
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 7f01dcde96..902c77be99 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.cluster.NodeStatus;
@@ -30,7 +31,8 @@ import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
@@ -248,12 +250,13 @@ public class LoadManager {
LOGGER.info("[latestRegionRouteMap] Begin to broadcast RegionRouteMap:");
long broadcastTime = System.currentTimeMillis();
printRegionRouteMap(broadcastTime, latestRegionRouteMap);
- AsyncDataNodeClientPool.getInstance()
- .sendAsyncRequestToDataNodeWithRetry(
- new TRegionRouteReq(broadcastTime, latestRegionRouteMap),
- dataNodeLocationMap,
+
+ AsyncClientHandler<TRegionRouteReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(
DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
- null);
+ new TRegionRouteReq(broadcastTime, latestRegionRouteMap),
+ dataNodeLocationMap);
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
LOGGER.info("[latestRegionRouteMap] Broadcast the latest RegionRouteMap finished.");
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index f997298e3d..df31b8c922 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -33,12 +33,13 @@ import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.confignode.AsyncConfigNodeHeartbeatClientPool;
-import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeHeartbeatClientPool;
-import org.apache.iotdb.confignode.client.async.handlers.ConfigNodeHeartbeatHandler;
-import org.apache.iotdb.confignode.client.async.handlers.DataNodeHeartbeatHandler;
-import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeHeartbeatClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
+import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeConfigurationPlan;
@@ -74,7 +75,6 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -463,62 +463,47 @@ public class NodeManager {
public List<TSStatus> merge() {
Map<Integer, TDataNodeLocation> dataNodeLocationMap =
configManager.getNodeManager().getRegisteredDataNodeLocations();
- List<TSStatus> dataNodeResponseStatus =
- Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
- AsyncDataNodeClientPool.getInstance()
- .sendAsyncRequestToDataNodeWithRetry(
- null, dataNodeLocationMap, DataNodeRequestType.MERGE, dataNodeResponseStatus);
- return dataNodeResponseStatus;
+ AsyncClientHandler<Object, TSStatus> clientHandler =
+ new AsyncClientHandler<>(DataNodeRequestType.MERGE, dataNodeLocationMap);
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ return clientHandler.getResponseList();
}
public List<TSStatus> flush(TFlushReq req) {
Map<Integer, TDataNodeLocation> dataNodeLocationMap =
configManager.getNodeManager().getRegisteredDataNodeLocations();
- List<TSStatus> dataNodeResponseStatus =
- Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
- AsyncDataNodeClientPool.getInstance()
- .sendAsyncRequestToDataNodeWithRetry(
- req, dataNodeLocationMap, DataNodeRequestType.FLUSH, dataNodeResponseStatus);
- return dataNodeResponseStatus;
+ AsyncClientHandler<TFlushReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(DataNodeRequestType.FLUSH, req, dataNodeLocationMap);
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ return clientHandler.getResponseList();
}
public List<TSStatus> clearCache() {
Map<Integer, TDataNodeLocation> dataNodeLocationMap =
configManager.getNodeManager().getRegisteredDataNodeLocations();
- List<TSStatus> dataNodeResponseStatus =
- Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
- AsyncDataNodeClientPool.getInstance()
- .sendAsyncRequestToDataNodeWithRetry(
- null, dataNodeLocationMap, DataNodeRequestType.CLEAR_CACHE, dataNodeResponseStatus);
- return dataNodeResponseStatus;
+ AsyncClientHandler<Object, TSStatus> clientHandler =
+ new AsyncClientHandler<>(DataNodeRequestType.CLEAR_CACHE, dataNodeLocationMap);
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ return clientHandler.getResponseList();
}
public List<TSStatus> loadConfiguration() {
Map<Integer, TDataNodeLocation> dataNodeLocationMap =
configManager.getNodeManager().getRegisteredDataNodeLocations();
- List<TSStatus> dataNodeResponseStatus =
- Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
- AsyncDataNodeClientPool.getInstance()
- .sendAsyncRequestToDataNodeWithRetry(
- null,
- dataNodeLocationMap,
- DataNodeRequestType.LOAD_CONFIGURATION,
- dataNodeResponseStatus);
- return dataNodeResponseStatus;
+ AsyncClientHandler<Object, TSStatus> clientHandler =
+ new AsyncClientHandler<>(DataNodeRequestType.LOAD_CONFIGURATION, dataNodeLocationMap);
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ return clientHandler.getResponseList();
}
public List<TSStatus> setSystemStatus(String status) {
Map<Integer, TDataNodeLocation> dataNodeLocationMap =
configManager.getNodeManager().getRegisteredDataNodeLocations();
- List<TSStatus> dataNodeResponseStatus =
- Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
- AsyncDataNodeClientPool.getInstance()
- .sendAsyncRequestToDataNodeWithRetry(
- status,
- dataNodeLocationMap,
- DataNodeRequestType.SET_SYSTEM_STATUS,
- dataNodeResponseStatus);
- return dataNodeResponseStatus;
+ AsyncClientHandler<String, TSStatus> clientHandler =
+ new AsyncClientHandler<>(
+ DataNodeRequestType.SET_SYSTEM_STATUS, status, dataNodeLocationMap);
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ return clientHandler.getResponseList();
}
/** Start the heartbeat service */
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 8a666f53b8..29d4098917 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -33,7 +33,7 @@ import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.GetDataPartitionPlan;
@@ -389,7 +389,8 @@ public class PartitionManager {
if (!allotmentMap.isEmpty()) {
CreateRegionGroupsPlan createRegionGroupsPlan =
getLoadManager().allocateRegionGroups(allotmentMap, consensusGroupType);
- result = getProcedureManager().createRegionGroups(createRegionGroupsPlan);
+ result =
+ getProcedureManager().createRegionGroups(consensusGroupType, createRegionGroupsPlan);
} else {
result = RpcUtils.SUCCESS_STATUS;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index c8133dda1a..6862be0a4d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.procedure.env;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
@@ -29,9 +30,10 @@ import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.sync.confignode.SyncConfigNodeClientPool;
-import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
+import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DeleteStorageGroupPlan;
@@ -51,10 +53,13 @@ import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
+import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -66,7 +71,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -302,12 +306,19 @@ public class ConfigNodeProcedureEnv {
ConfigNodeRequestType.NOTIFY_REGISTER_SUCCESS);
}
- /** notify all DataNodes when the capacity of the ConfigNodeGroup is expanded or reduced */
+ /** Notify all DataNodes when the capacity of the ConfigNodeGroup is expanded or reduced */
public void broadCastTheLatestConfigNodeGroup() {
- AsyncDataNodeClientPool.getInstance()
- .broadCastTheLatestConfigNodeGroup(
- configManager.getNodeManager().getRegisteredDataNodeLocations(),
- configManager.getNodeManager().getRegisteredConfigNodes());
+ List<TConfigNodeLocation> registeredConfigNodes =
+ configManager.getNodeManager().getRegisteredConfigNodes();
+ AsyncClientHandler<TUpdateConfigNodeGroupReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(
+ DataNodeRequestType.BROADCAST_LATEST_CONFIG_NODE_GROUP,
+ new TUpdateConfigNodeGroupReq(registeredConfigNodes),
+ configManager.getNodeManager().getRegisteredDataNodeLocations());
+
+ LOG.info("Begin to broadcast the latest configNodeGroup: {}", registeredConfigNodes);
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ LOG.info("Broadcast the latest configNodeGroup finished.");
}
/**
@@ -327,7 +338,97 @@ public class ConfigNodeProcedureEnv {
* @return Those RegionReplicas that failed to create
*/
public Map<TConsensusGroupId, TRegionReplicaSet> doRegionCreation(
+ TConsensusGroupType consensusGroupType, CreateRegionGroupsPlan createRegionGroupsPlan) {
+
+ // Prepare clientHandler
+ AsyncClientHandler<?, TSStatus> clientHandler;
+ switch (consensusGroupType) {
+ case SchemaRegion:
+ clientHandler = getCreateSchemaRegionClientHandler(createRegionGroupsPlan);
+ break;
+ case DataRegion:
+ default:
+ clientHandler = getCreateDataRegionClientHandler(createRegionGroupsPlan);
+ break;
+ }
+ if (clientHandler.getRequestIndices().isEmpty()) {
+ return new HashMap<>();
+ }
+
+ // Send CreateRegion requests to DataNodes
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+
+ // Filter RegionGroups that weren't created successfully
+ int requestId = 0;
+ Map<Integer, TSStatus> responseMap = clientHandler.getResponseMap();
+ Map<TConsensusGroupId, TRegionReplicaSet> failedRegions = new HashMap<>();
+ for (List<TRegionReplicaSet> regionReplicaSets :
+ createRegionGroupsPlan.getRegionGroupMap().values()) {
+ for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
+ for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
+ if (responseMap.get(requestId).getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ failedRegions
+ .computeIfAbsent(
+ regionReplicaSet.getRegionId(),
+ empty -> new TRegionReplicaSet().setRegionId(regionReplicaSet.getRegionId()))
+ .addToDataNodeLocations(dataNodeLocation);
+ }
+ requestId += 1;
+ }
+ }
+ }
+ return failedRegions;
+ }
+
+ private AsyncClientHandler<TCreateSchemaRegionReq, TSStatus> getCreateSchemaRegionClientHandler(
CreateRegionGroupsPlan createRegionGroupsPlan) {
+ AsyncClientHandler<TCreateSchemaRegionReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(DataNodeRequestType.CREATE_SCHEMA_REGION);
+
+ int requestId = 0;
+ for (Map.Entry<String, List<TRegionReplicaSet>> sgRegionsEntry :
+ createRegionGroupsPlan.getRegionGroupMap().entrySet()) {
+ String storageGroup = sgRegionsEntry.getKey();
+ List<TRegionReplicaSet> regionReplicaSets = sgRegionsEntry.getValue();
+ for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
+ for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
+ clientHandler.putRequest(
+ requestId, genCreateSchemaRegionReq(storageGroup, regionReplicaSet));
+ clientHandler.putDataNodeLocation(requestId, dataNodeLocation);
+ requestId += 1;
+ }
+ }
+ }
+
+ return clientHandler;
+ }
+
+ private AsyncClientHandler<TCreateDataRegionReq, TSStatus> getCreateDataRegionClientHandler(
+ CreateRegionGroupsPlan createRegionGroupsPlan) {
+ Map<String, Long> ttlMap = getTTLMap(createRegionGroupsPlan);
+ AsyncClientHandler<TCreateDataRegionReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(DataNodeRequestType.CREATE_DATA_REGION);
+
+ int requestId = 0;
+ for (Map.Entry<String, List<TRegionReplicaSet>> sgRegionsEntry :
+ createRegionGroupsPlan.getRegionGroupMap().entrySet()) {
+ String storageGroup = sgRegionsEntry.getKey();
+ List<TRegionReplicaSet> regionReplicaSets = sgRegionsEntry.getValue();
+ long ttl = ttlMap.get(storageGroup);
+ for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
+ for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
+ clientHandler.putRequest(
+ requestId, genCreateDataRegionReq(storageGroup, regionReplicaSet, ttl));
+ clientHandler.putDataNodeLocation(requestId, dataNodeLocation);
+ requestId += 1;
+ }
+ }
+ }
+
+ return clientHandler;
+ }
+
+ private Map<String, Long> getTTLMap(CreateRegionGroupsPlan createRegionGroupsPlan) {
Map<String, Long> ttlMap = new HashMap<>();
for (String storageGroup : createRegionGroupsPlan.getRegionGroupMap().keySet()) {
try {
@@ -335,11 +436,28 @@ public class ConfigNodeProcedureEnv {
storageGroup,
getClusterSchemaManager().getStorageGroupSchemaByName(storageGroup).getTTL());
} catch (StorageGroupNotExistsException e) {
- // Notice: This line will never
+ // Notice: This line will never reach since we've checked before
LOG.error("StorageGroup doesn't exist", e);
}
}
- return AsyncDataNodeClientPool.getInstance().createRegionGroups(createRegionGroupsPlan, ttlMap);
+ return ttlMap;
+ }
+
+ private TCreateSchemaRegionReq genCreateSchemaRegionReq(
+ String storageGroup, TRegionReplicaSet regionReplicaSet) {
+ TCreateSchemaRegionReq req = new TCreateSchemaRegionReq();
+ req.setStorageGroup(storageGroup);
+ req.setRegionReplicaSet(regionReplicaSet);
+ return req;
+ }
+
+ private TCreateDataRegionReq genCreateDataRegionReq(
+ String storageGroup, TRegionReplicaSet regionReplicaSet, long TTL) {
+ TCreateDataRegionReq req = new TCreateDataRegionReq();
+ req.setStorageGroup(storageGroup);
+ req.setRegionReplicaSet(regionReplicaSet);
+ req.setTtl(TTL);
+ return req;
}
public long getTTL(String storageGroup) throws StorageGroupNotExistsException {
@@ -375,72 +493,58 @@ public class ConfigNodeProcedureEnv {
NodeManager nodeManager = configManager.getNodeManager();
final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
nodeManager.getRegisteredDataNodeLocations();
- final List<TSStatus> dataNodeResponseStatus =
- Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
final TCreateTriggerInstanceReq request =
new TCreateTriggerInstanceReq(
triggerInformation.serialize(), ByteBuffer.wrap(jarFile.getValues()));
+
+ AsyncClientHandler<TCreateTriggerInstanceReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(
+ DataNodeRequestType.CREATE_TRIGGER_INSTANCE, request, dataNodeLocationMap);
// TODO: The request sent to DataNodes which stateful triggerInstance needn't to be created
// don't set
// JarFile
- AsyncDataNodeClientPool.getInstance()
- .sendAsyncRequestToDataNodeWithRetry(
- request,
- dataNodeLocationMap,
- DataNodeRequestType.CREATE_TRIGGER_INSTANCE,
- dataNodeResponseStatus);
- return dataNodeResponseStatus;
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ return clientHandler.getResponseList();
}
public List<TSStatus> dropTriggerOnDataNodes(String triggerName, boolean needToDeleteJarFile) {
NodeManager nodeManager = configManager.getNodeManager();
final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
nodeManager.getRegisteredDataNodeLocations();
- final List<TSStatus> dataNodeResponseStatus =
- Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
final TDropTriggerInstanceReq request =
new TDropTriggerInstanceReq(triggerName, needToDeleteJarFile);
- AsyncDataNodeClientPool.getInstance()
- .sendAsyncRequestToDataNodeWithRetry(
- request,
- dataNodeLocationMap,
- DataNodeRequestType.DROP_TRIGGER_INSTANCE,
- dataNodeResponseStatus);
- return dataNodeResponseStatus;
+
+ AsyncClientHandler<TDropTriggerInstanceReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(
+ DataNodeRequestType.DROP_TRIGGER_INSTANCE, request, dataNodeLocationMap);
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ return clientHandler.getResponseList();
}
public List<TSStatus> activeTriggerOnDataNodes(String triggerName) {
NodeManager nodeManager = configManager.getNodeManager();
final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
nodeManager.getRegisteredDataNodeLocations();
- final List<TSStatus> dataNodeResponseStatus =
- Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
final TActiveTriggerInstanceReq request = new TActiveTriggerInstanceReq(triggerName);
- AsyncDataNodeClientPool.getInstance()
- .sendAsyncRequestToDataNodeWithRetry(
- request,
- dataNodeLocationMap,
- DataNodeRequestType.ACTIVE_TRIGGER_INSTANCE,
- dataNodeResponseStatus);
- return dataNodeResponseStatus;
+ AsyncClientHandler<TActiveTriggerInstanceReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(
+ DataNodeRequestType.ACTIVE_TRIGGER_INSTANCE, request, dataNodeLocationMap);
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ return clientHandler.getResponseList();
}
public List<TSStatus> inactiveTriggerOnDataNodes(String triggerName) {
NodeManager nodeManager = configManager.getNodeManager();
final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
nodeManager.getRegisteredDataNodeLocations();
- final List<TSStatus> dataNodeResponseStatus =
- Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
final TInactiveTriggerInstanceReq request = new TInactiveTriggerInstanceReq(triggerName);
- AsyncDataNodeClientPool.getInstance()
- .sendAsyncRequestToDataNodeWithRetry(
- request,
- dataNodeLocationMap,
- DataNodeRequestType.INACTIVE_TRIGGER_INSTANCE,
- dataNodeResponseStatus);
- return dataNodeResponseStatus;
+ AsyncClientHandler<TInactiveTriggerInstanceReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(
+ DataNodeRequestType.INACTIVE_TRIGGER_INSTANCE, request, dataNodeLocationMap);
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ return clientHandler.getResponseList();
}
public LockQueue getNodeLock() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index bcf5ba1a6e..73e2b65c48 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -26,8 +26,8 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
index a016160087..a9fb7bf404 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.persistence.TriggerInfo;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
import org.apache.iotdb.confignode.procedure.state.CreateTriggerState;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DropTriggerProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DropTriggerProcedure.java
index 7902466653..3d1c764ad5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DropTriggerProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DropTriggerProcedure.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTrigger
import org.apache.iotdb.confignode.persistence.TriggerInfo;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
import org.apache.iotdb.confignode.procedure.state.DropTriggerState;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AbstractNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
similarity index 94%
rename from confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AbstractNodeProcedure.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
index 1bd5565dc4..b4b35ee77f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AbstractNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
@@ -17,10 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.confignode.procedure.impl;
+package org.apache.iotdb.confignode.procedure.impl.node;
-import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
import org.slf4j.Logger;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
similarity index 98%
rename from confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
index 82131683f6..fe1dd6fc67 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.confignode.procedure.impl;
+package org.apache.iotdb.confignode.procedure.impl.node;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveConfigNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveConfigNodeProcedure.java
similarity index 98%
rename from confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveConfigNodeProcedure.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveConfigNodeProcedure.java
index d63a7c6c20..5e560ce799 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveConfigNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveConfigNodeProcedure.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.confignode.procedure.impl;
+package org.apache.iotdb.confignode.procedure.impl.node;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveDataNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
similarity index 97%
rename from confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveDataNodeProcedure.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
index 1743e1f111..f46d6bf4f7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveDataNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.confignode.procedure.impl;
+package org.apache.iotdb.confignode.procedure.impl.node;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure;
import org.apache.iotdb.confignode.procedure.state.RemoveDataNodeState;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
similarity index 90%
rename from confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
index 4e7bf3a6ef..10f24259fd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
@@ -16,19 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.procedure.impl;
+package org.apache.iotdb.confignode.procedure.impl.statemachine;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
import org.apache.iotdb.confignode.persistence.partition.RegionCreateTask;
import org.apache.iotdb.confignode.persistence.partition.RegionDeleteTask;
-import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.state.CreateRegionGroupsState;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
@@ -49,6 +49,8 @@ public class CreateRegionGroupsProcedure
private static final Logger LOGGER = LoggerFactory.getLogger(CreateRegionGroupsProcedure.class);
+ private TConsensusGroupType consensusGroupType;
+
private CreateRegionGroupsPlan createRegionGroupsPlan = new CreateRegionGroupsPlan();
/** key: TConsensusGroupId value: Failed RegionReplicas */
@@ -58,13 +60,18 @@ public class CreateRegionGroupsProcedure
super();
}
- public CreateRegionGroupsProcedure(CreateRegionGroupsPlan createRegionGroupsPlan) {
+ public CreateRegionGroupsProcedure(
+ TConsensusGroupType consensusGroupType, CreateRegionGroupsPlan createRegionGroupsPlan) {
+ this.consensusGroupType = consensusGroupType;
this.createRegionGroupsPlan = createRegionGroupsPlan;
}
+ @TestOnly
public CreateRegionGroupsProcedure(
+ TConsensusGroupType consensusGroupType,
CreateRegionGroupsPlan createRegionGroupsPlan,
Map<TConsensusGroupId, TRegionReplicaSet> failedRegionReplicaSets) {
+ this.consensusGroupType = consensusGroupType;
this.createRegionGroupsPlan = createRegionGroupsPlan;
this.failedRegionReplicaSets = failedRegionReplicaSets;
}
@@ -73,7 +80,7 @@ public class CreateRegionGroupsProcedure
protected Flow executeFromState(ConfigNodeProcedureEnv env, CreateRegionGroupsState state) {
switch (state) {
case CREATE_REGION_GROUPS:
- failedRegionReplicaSets = env.doRegionCreation(createRegionGroupsPlan);
+ failedRegionReplicaSets = env.doRegionCreation(consensusGroupType, createRegionGroupsPlan);
setNextState(CreateRegionGroupsState.SHUNT_REGION_REPLICAS);
break;
case SHUNT_REGION_REPLICAS:
@@ -214,6 +221,7 @@ public class CreateRegionGroupsProcedure
// must serialize CREATE_REGION_GROUPS.ordinal() firstly
stream.writeInt(ProcedureFactory.ProcedureType.CREATE_REGION_GROUPS.ordinal());
super.serialize(stream);
+ stream.writeInt(consensusGroupType.getValue());
createRegionGroupsPlan.serializeForProcedure(stream);
stream.writeInt(failedRegionReplicaSets.size());
failedRegionReplicaSets.forEach(
@@ -226,6 +234,7 @@ public class CreateRegionGroupsProcedure
@Override
public void deserialize(ByteBuffer byteBuffer) {
super.deserialize(byteBuffer);
+ this.consensusGroupType = TConsensusGroupType.findByValue(byteBuffer.getInt());
try {
createRegionGroupsPlan.deserializeForProcedure(byteBuffer);
failedRegionReplicaSets.clear();
@@ -244,21 +253,17 @@ public class CreateRegionGroupsProcedure
}
@Override
- public boolean equals(Object that) {
- if (that instanceof CreateRegionGroupsProcedure) {
- CreateRegionGroupsProcedure thatProc = (CreateRegionGroupsProcedure) that;
- return thatProc.getProcId() == this.getProcId()
- && thatProc.getState() == this.getState()
- && thatProc.createRegionGroupsPlan.equals(this.createRegionGroupsPlan)
- && thatProc.failedRegionReplicaSets.equals(this.failedRegionReplicaSets);
- }
- return false;
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ CreateRegionGroupsProcedure that = (CreateRegionGroupsProcedure) o;
+ return consensusGroupType == that.consensusGroupType
+ && createRegionGroupsPlan.equals(that.createRegionGroupsPlan)
+ && failedRegionReplicaSets.equals(that.failedRegionReplicaSets);
}
@Override
public int hashCode() {
- int result = createRegionGroupsPlan.hashCode();
- result = 31 * result + Objects.hash(failedRegionReplicaSets);
- return result;
+ return Objects.hash(consensusGroupType, createRegionGroupsPlan, failedRegionReplicaSets);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteStorageGroupProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/DeleteStorageGroupProcedure.java
similarity index 98%
rename from confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteStorageGroupProcedure.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/DeleteStorageGroupProcedure.java
index 8f464ad141..4bc9eaa436 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteStorageGroupProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/DeleteStorageGroupProcedure.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.confignode.procedure.impl;
+package org.apache.iotdb.confignode.procedure.impl.statemachine;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -26,7 +26,6 @@ import org.apache.iotdb.commons.utils.ThriftConfigNodeSerDeUtils;
import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.PreDeleteStorageGroupPlan;
import org.apache.iotdb.confignode.persistence.partition.RegionDeleteTask;
-import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteTimeSeriesProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/DeleteTimeSeriesProcedure.java
similarity index 88%
rename from confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteTimeSeriesProcedure.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/DeleteTimeSeriesProcedure.java
index 57f2b776c5..645865efdc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteTimeSeriesProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/DeleteTimeSeriesProcedure.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.confignode.procedure.impl;
+package org.apache.iotdb.confignode.procedure.impl.statemachine;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -26,14 +26,9 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
-import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.task.ConstructSchemaBlackListDataNodeTask;
-import org.apache.iotdb.confignode.client.async.task.DeleteDataForDeleteTimeSeriesDataNodeTask;
-import org.apache.iotdb.confignode.client.async.task.DeleteTimeSeriesDataNodeTask;
-import org.apache.iotdb.confignode.client.async.task.FetchSchemaBlackListDataNodeTask;
-import org.apache.iotdb.confignode.client.async.task.InvalidateMatchedSchemaCacheDataNodeTask;
-import org.apache.iotdb.confignode.client.async.task.RollbackSchemaBlackListDataNodeTask;
-import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
@@ -151,21 +146,16 @@ public class DeleteTimeSeriesProcedure
// construct request and send
Map<Integer, TDataNodeLocation> dataNodeLocationMap = new HashMap<>();
dataNodeLocationMap.put(dataNodeLocation.getDataNodeId(), dataNodeLocation);
- ConstructSchemaBlackListDataNodeTask dataNodeTask =
- new ConstructSchemaBlackListDataNodeTask(dataNodeLocationMap);
- AsyncDataNodeClientPool.getInstance()
- .sendAsyncRequestToDataNodeWithRetry(
+
+ AsyncClientHandler<TConstructSchemaBlackListReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(
+ DataNodeRequestType.CONSTRUCT_SCHEMA_BLACK_LIST,
new TConstructSchemaBlackListReq(consensusGroupIdList, patternTreeBytes),
- dataNodeTask);
- dataNodeTask
- .getDataNodeResponseMap()
- .forEach(
- (k, v) -> {
- if (v.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- responseMap.put(k, v);
- }
- });
- return dataNodeTask.getDataNodeResponseMap();
+ dataNodeLocationMap);
+ AsyncDataNodeClientPool.getInstance()
+ .sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ responseMap.putAll(clientHandler.getResponseMap());
+ return clientHandler.getResponseMap();
}
};
constructBlackListTask.execute();
@@ -184,12 +174,13 @@ public class DeleteTimeSeriesProcedure
private void invalidateCache(ConfigNodeProcedureEnv env) {
Map<Integer, TDataNodeLocation> dataNodeLocationMap =
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
- InvalidateMatchedSchemaCacheDataNodeTask dataNodeTask =
- new InvalidateMatchedSchemaCacheDataNodeTask(dataNodeLocationMap);
- AsyncDataNodeClientPool.getInstance()
- .sendAsyncRequestToDataNodeWithRetry(
- new TInvalidateMatchedSchemaCacheReq(patternTreeBytes), dataNodeTask);
- Map<Integer, TSStatus> statusMap = dataNodeTask.getDataNodeResponseMap();
+ AsyncClientHandler<TInvalidateMatchedSchemaCacheReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(
+ DataNodeRequestType.INVALIDATE_MATCHED_SCHEMA_CACHE,
+ new TInvalidateMatchedSchemaCacheReq(patternTreeBytes),
+ dataNodeLocationMap);
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
for (TSStatus status : statusMap.values()) {
// all dataNodes must clear the related schema cache
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -279,15 +270,17 @@ public class DeleteTimeSeriesProcedure
TDataNodeLocation dataNodeLocation, List<TConsensusGroupId> consensusGroupIdList) {
Map<Integer, TDataNodeLocation> dataNodeLocationMap = new HashMap<>();
dataNodeLocationMap.put(dataNodeLocation.getDataNodeId(), dataNodeLocation);
- DeleteDataForDeleteTimeSeriesDataNodeTask dataNodeTask =
- new DeleteDataForDeleteTimeSeriesDataNodeTask(dataNodeLocationMap);
- AsyncDataNodeClientPool.getInstance()
- .sendAsyncRequestToDataNodeWithRetry(
+ AsyncClientHandler<TDeleteDataForDeleteTimeSeriesReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(
+ DataNodeRequestType.DELETE_DATA_FOR_DELETE_TIMESERIES,
new TDeleteDataForDeleteTimeSeriesReq(
new ArrayList<>(consensusGroupIdList),
preparePatternTreeBytesData(patternTree)),
- dataNodeTask);
- return dataNodeTask.getDataNodeResponseMap();
+ dataNodeLocationMap);
+ AsyncDataNodeClientPool.getInstance()
+ .sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ responseMap.putAll(clientHandler.getResponseMap());
+ return clientHandler.getResponseMap();
}
};
deleteDataTask.setExecuteOnAllReplicaset(true);
@@ -305,21 +298,16 @@ public class DeleteTimeSeriesProcedure
TDataNodeLocation dataNodeLocation, List<TConsensusGroupId> consensusGroupIdList) {
Map<Integer, TDataNodeLocation> dataNodeLocationMap = new HashMap<>();
dataNodeLocationMap.put(dataNodeLocation.getDataNodeId(), dataNodeLocation);
- FetchSchemaBlackListDataNodeTask dataNodeTask =
- new FetchSchemaBlackListDataNodeTask(dataNodeLocationMap);
- AsyncDataNodeClientPool.getInstance()
- .sendAsyncRequestToDataNodeWithRetry(
+ AsyncClientHandler<TFetchSchemaBlackListReq, TFetchSchemaBlackListResp> clientHandler =
+ new AsyncClientHandler<>(
+ DataNodeRequestType.FETCH_SCHEMA_BLACK_LIST,
new TFetchSchemaBlackListReq(consensusGroupIdList, patternTreeBytes),
- dataNodeTask);
- Map<Integer, TFetchSchemaBlackListResp> respMap = dataNodeTask.getDataNodeResponseMap();
+ dataNodeLocationMap);
+ AsyncDataNodeClientPool.getInstance()
+ .sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ responseMap.putAll(clientHandler.getResponseMap());
Map<Integer, TSStatus> statusMap = new HashMap<>();
- respMap.forEach(
- (k, v) -> {
- if (v.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- responseMap.put(k, v);
- }
- statusMap.put(k, v.getStatus());
- });
+ clientHandler.getResponseMap().forEach((k, v) -> statusMap.put(k, v.getStatus()));
return statusMap;
}
};
@@ -352,12 +340,15 @@ public class DeleteTimeSeriesProcedure
TDataNodeLocation dataNodeLocation, List<TConsensusGroupId> consensusGroupIdList) {
Map<Integer, TDataNodeLocation> dataNodeLocationMap = new HashMap<>();
dataNodeLocationMap.put(dataNodeLocation.getDataNodeId(), dataNodeLocation);
- DeleteTimeSeriesDataNodeTask dataNodeTask =
- new DeleteTimeSeriesDataNodeTask(dataNodeLocationMap);
+ AsyncClientHandler<TDeleteTimeSeriesReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(
+ DataNodeRequestType.DELETE_TIMESERIES,
+ new TDeleteTimeSeriesReq(consensusGroupIdList, patternTreeBytes),
+ dataNodeLocationMap);
AsyncDataNodeClientPool.getInstance()
- .sendAsyncRequestToDataNodeWithRetry(
- new TDeleteTimeSeriesReq(consensusGroupIdList, patternTreeBytes), dataNodeTask);
- return dataNodeTask.getDataNodeResponseMap();
+ .sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ responseMap.putAll(clientHandler.getResponseMap());
+ return clientHandler.getResponseMap();
}
};
deleteTimeSeriesTask.execute();
@@ -424,13 +415,15 @@ public class DeleteTimeSeriesProcedure
TDataNodeLocation dataNodeLocation, List<TConsensusGroupId> consensusGroupIdList) {
Map<Integer, TDataNodeLocation> dataNodeLocationMap = new HashMap<>();
dataNodeLocationMap.put(dataNodeLocation.getDataNodeId(), dataNodeLocation);
- RollbackSchemaBlackListDataNodeTask dataNodeTask =
- new RollbackSchemaBlackListDataNodeTask(dataNodeLocationMap);
- AsyncDataNodeClientPool.getInstance()
- .sendAsyncRequestToDataNodeWithRetry(
+ AsyncClientHandler<TRollbackSchemaBlackListReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(
+ DataNodeRequestType.ROLLBACK_SCHEMA_BLACK_LIST,
new TRollbackSchemaBlackListReq(consensusGroupIdList, patternTreeBytes),
- dataNodeTask);
- return dataNodeTask.getDataNodeResponseMap();
+ dataNodeLocationMap);
+ AsyncDataNodeClientPool.getInstance()
+ .sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ responseMap.putAll(clientHandler.getResponseMap());
+ return clientHandler.getResponseMap();
}
};
rollbackStateTask.execute();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
similarity index 98%
rename from confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
index 6d2f87d972..6d4343c084 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
@@ -17,14 +17,13 @@
* under the License.
*/
-package org.apache.iotdb.confignode.procedure.impl;
+package org.apache.iotdb.confignode.procedure.impl.statemachine;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
-import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/StateMachineProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/StateMachineProcedure.java
similarity index 98%
rename from confignode/src/main/java/org/apache/iotdb/confignode/procedure/StateMachineProcedure.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/StateMachineProcedure.java
index efda7c4ed8..0178431bd1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/StateMachineProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/StateMachineProcedure.java
@@ -17,8 +17,9 @@
* under the License.
*/
-package org.apache.iotdb.confignode.procedure;
+package org.apache.iotdb.confignode.procedure.impl.statemachine;
+import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 218d976a45..6951d3cb0e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -20,15 +20,15 @@
package org.apache.iotdb.confignode.procedure.store;
import org.apache.iotdb.confignode.procedure.Procedure;
-import org.apache.iotdb.confignode.procedure.impl.AddConfigNodeProcedure;
-import org.apache.iotdb.confignode.procedure.impl.CreateRegionGroupsProcedure;
import org.apache.iotdb.confignode.procedure.impl.CreateTriggerProcedure;
-import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
-import org.apache.iotdb.confignode.procedure.impl.DeleteTimeSeriesProcedure;
import org.apache.iotdb.confignode.procedure.impl.DropTriggerProcedure;
-import org.apache.iotdb.confignode.procedure.impl.RegionMigrateProcedure;
-import org.apache.iotdb.confignode.procedure.impl.RemoveConfigNodeProcedure;
-import org.apache.iotdb.confignode.procedure.impl.RemoveDataNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.CreateRegionGroupsProcedure;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.DeleteStorageGroupProcedure;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.DeleteTimeSeriesProcedure;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index 3d736db4f2..6d5411c47f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
-import org.apache.iotdb.confignode.client.sync.confignode.SyncConfigNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 0ba97cb955..92ad262529 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -82,8 +82,8 @@ import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTrigger
import org.apache.iotdb.confignode.persistence.partition.RegionCreateTask;
import org.apache.iotdb.confignode.persistence.partition.RegionDeleteTask;
import org.apache.iotdb.confignode.procedure.Procedure;
-import org.apache.iotdb.confignode.procedure.impl.CreateRegionGroupsProcedure;
-import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.CreateRegionGroupsProcedure;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.DeleteStorageGroupProcedure;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
@@ -709,7 +709,8 @@ public class ConfigPhysicalPlanSerDeTest {
createRegionGroupsPlan.addRegionGroup("root.sg0", dataRegionSet);
createRegionGroupsPlan.addRegionGroup("root.sg1", schemaRegionSet);
CreateRegionGroupsProcedure procedure0 =
- new CreateRegionGroupsProcedure(createRegionGroupsPlan, failedRegions);
+ new CreateRegionGroupsProcedure(
+ TConsensusGroupType.DataRegion, createRegionGroupsPlan, failedRegions);
updateProcedurePlan0.setProcedure(procedure0);
updateProcedurePlan1 =
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleSTMProcedure.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleSTMProcedure.java
index 86161619b3..25a39b04cf 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleSTMProcedure.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleSTMProcedure.java
@@ -19,11 +19,11 @@
package org.apache.iotdb.confignode.procedure.entity;
-import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.env.TestProcEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.StateMachineProcedure;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckSTMProcedure.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckSTMProcedure.java
index 71a43580d5..1761a90337 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckSTMProcedure.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckSTMProcedure.java
@@ -19,11 +19,11 @@
package org.apache.iotdb.confignode.procedure.entity;
-import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.env.TestProcEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.StateMachineProcedure;
import java.io.DataOutputStream;
import java.io.IOException;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java
index 365e753e6f..fbadc74ec6 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java
@@ -20,10 +20,12 @@
package org.apache.iotdb.confignode.procedure.impl;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.CreateRegionGroupsProcedure;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -91,7 +93,8 @@ public class CreateRegionGroupsProcedureTest {
createRegionGroupsPlan.addRegionGroup("root.sg1", schemaRegionSet);
CreateRegionGroupsProcedure procedure0 =
- new CreateRegionGroupsProcedure(createRegionGroupsPlan, failedRegions0);
+ new CreateRegionGroupsProcedure(
+ TConsensusGroupType.DataRegion, createRegionGroupsPlan, failedRegions0);
PublicBAOS byteArrayOutputStream = new PublicBAOS();
DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/DeleteStorageGroupProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/DeleteStorageGroupProcedureTest.java
index 9698f19882..db37e59002 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/DeleteStorageGroupProcedureTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/DeleteStorageGroupProcedureTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.confignode.procedure.impl;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.DeleteStorageGroupProcedure;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/DeleteTimeSeriesProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/DeleteTimeSeriesProcedureTest.java
index 189f855f54..983893f3d0 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/DeleteTimeSeriesProcedureTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/DeleteTimeSeriesProcedureTest.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.procedure.impl;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.DeleteTimeSeriesProcedure;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.junit.Assert;